TradeReady.io
Framework Guides

CrewAI Integration

Build a multi-agent trading crew with analyst, trader, and risk manager roles

Download .md

This guide shows you how to connect AgentExchange to a CrewAI multi-agent system in under 20 minutes. You will wrap each SDK method as a @tool-decorated function, wire them into a Crew with specialized Agent roles, and define Task objects that orchestrate a full trading strategy — all on simulated funds backed by real-time Binance data.

What You Get

After following this guide your CrewAI crew will be able to:

  • Fetch live prices for any of 600+ Binance trading pairs
  • Analyze 24-hour ticker stats and OHLCV candles
  • Place, monitor, and cancel market / limit / stop-loss / take-profit orders
  • Read account balances, open positions, and portfolio summary
  • Pull performance analytics (Sharpe ratio, drawdown, win rate)
  • Stream real-time prices and order notifications over WebSocket

All on simulated funds — real Binance prices, no real money at risk.

Prerequisites

RequirementVersion
Python3.10+
crewailatest (pip install crewai)
crewai-toolslatest (pip install crewai-tools)
agentexchangepip install agentexchange or pip install -e sdk/
AgentExchange serverrunning (see quickstart)
docker compose up -d
curl http://localhost:8000/health
# {"status":"ok"}

Step 1 — Register an Account

curl -s -X POST http://localhost:8000/api/v1/auth/register \
  -H "Content-Type: application/json" \
  -d '{"display_name": "MyCrewBot", "starting_balance": "10000.00"}'

Save api_secret now — it is shown only once.

Store in .env (never commit):

AGENTEXCHANGE_API_KEY=ak_live_...
AGENTEXCHANGE_API_SECRET=sk_live_...
AGENTEXCHANGE_BASE_URL=http://localhost:8000
AGENTEXCHANGE_WS_URL=ws://localhost:8000
OPENAI_API_KEY=sk-...

Step 2 — Create the Shared SDK Client

# trading_crew/client.py
import os
from agentexchange import AgentExchangeClient

client = AgentExchangeClient(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
    base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)

AgentExchangeClient is thread-safe — share this single instance across all tools.


Step 3 — Define Tools with the @tool Decorator

CrewAI tools are Python functions decorated with @tool. The decorator's name and docstring become the description the LLM reads when deciding which tool to call.

# trading_crew/tools.py
import json
import os
from decimal import Decimal
from typing import Optional

from crewai.tools import tool

from agentexchange import AgentExchangeClient
from agentexchange.exceptions import AgentExchangeError, RateLimitError

client = AgentExchangeClient(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
    base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)


def _err(exc: Exception) -> str:
    if isinstance(exc, RateLimitError):
        return f"ERROR RateLimitError: {exc}. Wait until the rate-limit window resets, then retry."
    if isinstance(exc, AgentExchangeError):
        return f"ERROR {exc.code}: {exc}"
    return f"ERROR unexpected: {exc}"


@tool("get_price")
def get_price(symbol: str) -> str:
    """Get the current live price for a single trading pair.

    Args:
        symbol: Uppercase trading pair, e.g. BTCUSDT or ETHUSDT.
    """
    try:
        price = client.get_price(symbol.strip().upper())
        return json.dumps({
            "symbol": price.symbol,
            "price": str(price.price),
            "timestamp": price.timestamp.isoformat(),
        })
    except Exception as exc:
        return _err(exc)


@tool("get_all_prices")
def get_all_prices(query: str = "all") -> str:
    """Get current prices for all 600+ active trading pairs at once.

    Use this instead of calling get_price in a loop when scanning multiple coins.
    """
    try:
        prices = client.get_all_prices()
        return json.dumps([
            {"symbol": p.symbol, "price": str(p.price)}
            for p in prices
        ])
    except Exception as exc:
        return _err(exc)


@tool("get_ticker")
def get_ticker(symbol: str) -> str:
    """Get 24-hour ticker statistics for a trading pair.

    Includes open, high, low, close, volume, and 24h change percentage.
    Use this to assess momentum before entering a trade.

    Args:
        symbol: Uppercase trading pair, e.g. BTCUSDT.
    """
    try:
        t = client.get_ticker(symbol.strip().upper())
        return json.dumps({
            "symbol": t.symbol,
            "open": str(t.open), "high": str(t.high),
            "low": str(t.low), "close": str(t.close),
            "volume": str(t.volume), "change_pct": str(t.change_pct),
        })
    except Exception as exc:
        return _err(exc)


@tool("get_candles")
def get_candles(symbol: str, interval: str = "1h", limit: int = 24) -> str:
    """Get OHLCV candle history for technical analysis.

    Args:
        symbol:   Uppercase trading pair, e.g. BTCUSDT.
        interval: 1m, 5m, 15m, 1h, 4h, 1d. Default 1h.
        limit:    Number of candles (1-1000). Default 24.
    """
    try:
        candles = client.get_candles(
            symbol=symbol.strip().upper(), interval=interval, limit=limit,
        )
        return json.dumps([
            {
                "open_time": c.open_time.isoformat(),
                "open": str(c.open), "high": str(c.high),
                "low": str(c.low), "close": str(c.close),
                "volume": str(c.volume),
            }
            for c in candles
        ])
    except Exception as exc:
        return _err(exc)


@tool("place_order")
def place_order(
    symbol: str, side: str, order_type: str, quantity: str,
    price: Optional[str] = None, trigger_price: Optional[str] = None,
) -> str:
    """Place a crypto order on the exchange.

    ALWAYS call get_balance and get_price BEFORE placing an order.

    Args:
        symbol:        Uppercase trading pair, e.g. BTCUSDT.
        side:          "buy" or "sell".
        order_type:    "market", "limit", "stop_loss", or "take_profit".
        quantity:      Order size as decimal string, e.g. "0.01".
        price:         Required for limit orders.
        trigger_price: Required for stop_loss and take_profit orders.

    Examples:
        Market buy:     place_order("BTCUSDT", "buy",  "market",    "0.01")
        Limit buy:      place_order("BTCUSDT", "buy",  "limit",     "0.01", price="60000.00")
        Stop-loss sell: place_order("BTCUSDT", "sell", "stop_loss", "0.01", trigger_price="58000.00")
    """
    try:
        kwargs: dict = {}
        if price:
            kwargs["price"] = Decimal(price)
        if trigger_price:
            kwargs["trigger_price"] = Decimal(trigger_price)
        order = client.place_order(
            symbol=symbol.strip().upper(), side=side,
            order_type=order_type, quantity=Decimal(quantity), **kwargs,
        )
        return json.dumps({
            "order_id": str(order.order_id),
            "symbol": order.symbol, "side": order.side,
            "status": order.status, "quantity": str(order.quantity),
            "executed_price": str(order.executed_price) if order.executed_price else None,
            "slippage_pct": str(order.slippage_pct) if order.slippage_pct else None,
        })
    except Exception as exc:
        return _err(exc)


@tool("get_balance")
def get_balance(query: str = "all") -> str:
    """Get all asset balances — available (free to trade) and total.

    ALWAYS call this before placing any order to confirm you have enough funds.
    """
    try:
        balance = client.get_balance()
        return json.dumps({
            "total_equity_usdt": str(balance.total_equity_usdt),
            "balances": [
                {"asset": b.asset, "available": str(b.available), "total": str(b.total)}
                for b in balance.balances
            ],
        })
    except Exception as exc:
        return _err(exc)


@tool("get_portfolio")
def get_portfolio(query: str = "all") -> str:
    """Get full portfolio summary including total equity, PnL, and ROI."""
    try:
        pf = client.get_portfolio()
        return json.dumps({
            "total_equity": str(pf.total_equity),
            "starting_balance": str(pf.starting_balance),
            "roi_pct": str(pf.roi_pct),
            "unrealized_pnl": str(pf.unrealized_pnl),
            "realized_pnl": str(pf.realized_pnl),
            "available_cash": str(pf.available_cash),
            "position_count": pf.position_count,
        })
    except Exception as exc:
        return _err(exc)


@tool("get_positions")
def get_positions(query: str = "all") -> str:
    """Get all open positions with unrealized PnL.

    No single position may exceed 25% of total equity.
    """
    try:
        positions = client.get_positions()
        return json.dumps([
            {
                "symbol": p.symbol, "quantity": str(p.quantity),
                "avg_entry_price": str(p.avg_entry_price),
                "current_price": str(p.current_price),
                "unrealized_pnl": str(p.unrealized_pnl),
                "unrealized_pnl_pct": str(p.unrealized_pnl_pct),
            }
            for p in positions
        ])
    except Exception as exc:
        return _err(exc)


@tool("get_performance")
def get_performance(period: str = "all") -> str:
    """Get trading performance metrics for the given time period.

    Args:
        period: "1d", "7d", "30d", or "all". Default "all".
    """
    try:
        p = period.strip() or "all"
        perf = client.get_performance(period=p)
        return json.dumps({
            "period": p,
            "sharpe_ratio": str(perf.sharpe_ratio),
            "win_rate": str(perf.win_rate),
            "max_drawdown_pct": str(perf.max_drawdown_pct),
            "total_trades": perf.total_trades,
            "profit_factor": str(perf.profit_factor),
        })
    except Exception as exc:
        return _err(exc)


AGENTEXCHANGE_TOOLS = [
    get_price, get_all_prices, get_ticker, get_candles,
    place_order, get_balance, get_positions, get_portfolio, get_performance,
]

Step 4 — Define Specialized Agents

CrewAI shines when you split responsibilities across agents with distinct roles. This crew mirrors a real trading desk: one analyst researches the market, one trader executes orders, and one risk manager monitors the portfolio.

# trading_crew/agents.py
import os
from crewai import Agent
from langchain_openai import ChatOpenAI

from trading_crew.tools import (
    get_all_prices, get_ticker, get_candles,
    get_price, get_balance, get_positions, place_order,
    get_portfolio, get_performance,
)

llm = ChatOpenAI(model="gpt-4o", temperature=0, openai_api_key=os.environ["OPENAI_API_KEY"])

market_analyst = Agent(
    role="Market Analyst",
    goal=(
        "Scan the market for the best trading opportunity right now. "
        "Identify the single coin with the strongest momentum (highest 24h change_pct > 2% "
        "combined with above-average volume). Provide a buy recommendation with a suggested "
        "entry quantity (max $200 USDT value) and a stop-loss level 5% below entry."
    ),
    backstory=(
        "You are an experienced quant analyst specializing in momentum strategies. "
        "You only recommend a trade when the data clearly supports it."
    ),
    tools=[get_all_prices, get_ticker, get_candles],
    llm=llm,
    verbose=True,
)

trader = Agent(
    role="Trader",
    goal=(
        "Execute the trade recommended by the Market Analyst. "
        "Before every order: (1) call get_balance to confirm sufficient USDT, "
        "(2) call get_price to verify current price, "
        "(3) call get_positions to check you are not already in this symbol. "
        "After a buy fills, immediately place a stop_loss at the recommended level."
    ),
    backstory=(
        "You are a disciplined execution trader. You follow instructions precisely "
        "and always confirm account state before placing orders."
    ),
    tools=[get_price, get_balance, get_positions, place_order],
    llm=llm,
    verbose=True,
)

risk_manager = Agent(
    role="Risk Manager",
    goal=(
        "After each trading round, evaluate portfolio health. "
        "Report: current equity, ROI, open positions with unrealized PnL, "
        "7-day Sharpe ratio and max drawdown, and whether the daily loss "
        "circuit breaker is close to triggering."
    ),
    backstory=(
        "You are a portfolio risk officer. You monitor positions and performance metrics "
        "after every trade cycle. You never place orders — your job is oversight."
    ),
    tools=[get_portfolio, get_positions, get_balance, get_performance],
    llm=llm,
    verbose=True,
)

Step 5 — Define Tasks

Each Task maps to one agent's deliverable. Tasks run sequentially; later tasks reference earlier outputs via context.

# trading_crew/tasks.py
from crewai import Task
from trading_crew.agents import market_analyst, trader, risk_manager

research_task = Task(
    description=(
        "Scan the market for the best momentum opportunity right now.\n"
        "1. Call get_all_prices.\n"
        "2. Call get_ticker for BTCUSDT, ETHUSDT, BNBUSDT, SOLUSDT, ADAUSDT, XRPUSDT.\n"
        "3. Filter to coins with change_pct > 2%.\n"
        "4. For the top candidate, call get_candles with interval='1h' limit=24.\n"
        "5. Output: recommended symbol, current price, suggested quantity "
        "(max $200 USDT), and stop-loss trigger (5% below entry)."
    ),
    expected_output=(
        "A JSON-style recommendation with: symbol, current_price, "
        "suggested_quantity, stop_loss_trigger_price, and rationale."
    ),
    agent=market_analyst,
)

execution_task = Task(
    description=(
        "Execute the trade recommended in the research task.\n"
        "1. Read symbol, quantity, and stop-loss from the research output.\n"
        "2. Call get_balance — confirm available USDT >= (quantity × current_price × 1.002).\n"
        "3. Call get_positions — confirm no existing position in this symbol.\n"
        "4. Call get_price for the latest price.\n"
        "5. Place a market buy order.\n"
        "6. Once filled, place a stop_loss sell at the recommended trigger price.\n"
        "7. Report both order IDs, executed buy price, and stop-loss trigger."
    ),
    expected_output=(
        "Summary with: buy_order_id, executed_buy_price, quantity, "
        "stop_loss_order_id, stop_loss_trigger_price, remaining_usdt_balance."
    ),
    agent=trader,
    context=[research_task],
)

risk_review_task = Task(
    description=(
        "Review portfolio health after the executed trades.\n"
        "1. Call get_portfolio for total equity, ROI, and cash.\n"
        "2. Call get_positions for all open positions with unrealized PnL.\n"
        "3. Call get_performance with period='7d'.\n"
        "4. Flag any position whose value exceeds 20% of total equity.\n"
        "5. Flag if 7-day max drawdown exceeds 10%.\n"
        "6. Output a risk summary report."
    ),
    expected_output=(
        "Structured risk report with: total_equity, roi_pct, open_positions, "
        "sharpe_ratio_7d, max_drawdown_7d, and any risk flags triggered."
    ),
    agent=risk_manager,
    context=[execution_task],
)

Step 6 — Assemble and Run the Crew

# trading_crew/crew.py
from crewai import Crew, Process
from trading_crew.agents import market_analyst, trader, risk_manager
from trading_crew.tasks import research_task, execution_task, risk_review_task

trading_crew = Crew(
    agents=[market_analyst, trader, risk_manager],
    tasks=[research_task, execution_task, risk_review_task],
    process=Process.sequential,
    verbose=True,
)
# main.py
from dotenv import load_dotenv
load_dotenv()

from trading_crew.crew import trading_crew

result = trading_crew.kickoff()
print(result)
python main.py

The crew runs in order:

  1. Analyst scans 600+ prices, picks the best momentum coin, outputs a recommendation.
  2. Trader confirms balance, checks positions, executes the market buy, places a stop-loss.
  3. Risk Manager reviews the new portfolio state and flags any violations.

Step 7 — Autonomous Strategy Loop

import time
from dotenv import load_dotenv
load_dotenv()

from crewai import Crew, Process
from trading_crew.agents import market_analyst, trader, risk_manager
from trading_crew.tasks import research_task, execution_task, risk_review_task

INTERVAL_SECONDS = 300  # run every 5 minutes


def run_once() -> str:
    # Create a new Crew instance each cycle so tasks start fresh
    crew = Crew(
        agents=[market_analyst, trader, risk_manager],
        tasks=[research_task, execution_task, risk_review_task],
        process=Process.sequential,
        verbose=False,
    )
    return str(crew.kickoff())


if __name__ == "__main__":
    cycle = 0
    while True:
        cycle += 1
        print(f"\n{'='*60}\nCycle {cycle} — running strategy...\n{'='*60}")
        print(run_once())
        print(f"\nSleeping {INTERVAL_SECONDS}s...")
        time.sleep(INTERVAL_SECONDS)

Create a new Crew instance on each iteration rather than reusing the same one. This ensures tasks start fresh without stale context from previous cycles.


Step 8 — Hierarchical Crew with a Manager

For more complex setups, use Process.hierarchical to add a manager LLM that delegates tasks dynamically:

from crewai import Crew, Process
from langchain_openai import ChatOpenAI

manager_llm = ChatOpenAI(model="gpt-4o", temperature=0)

hierarchical_crew = Crew(
    agents=[market_analyst, trader, risk_manager],
    tasks=[research_task, execution_task, risk_review_task],
    process=Process.hierarchical,
    manager_llm=manager_llm,
    verbose=True,
)

In hierarchical mode, the manager decides which specialist to invoke and in what order.


Step 9 — WebSocket Streaming

Run a WebSocket client in a background thread and add a tool that reads from the price cache:

# trading_crew/price_stream.py
import os
import threading
from agentexchange import AgentExchangeWS

latest_prices: dict[str, str] = {}
_lock = threading.Lock()

ws = AgentExchangeWS(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    base_url=os.environ.get("AGENTEXCHANGE_WS_URL", "ws://localhost:8000"),
)


@ws.on_ticker("BTCUSDT")
def _on_btc(msg: dict) -> None:
    with _lock:
        latest_prices["BTCUSDT"] = msg["data"]["price"]


@ws.on_ticker("ETHUSDT")
def _on_eth(msg: dict) -> None:
    with _lock:
        latest_prices["ETHUSDT"] = msg["data"]["price"]


@ws.on_order_update()
def _on_order(msg: dict) -> None:
    data = msg["data"]
    print(f"[WS] Order {data['order_id']}{data['status']}")


def start_stream() -> threading.Thread:
    t = threading.Thread(target=ws.run_forever, daemon=True)
    t.start()
    return t
# Add to trading_crew/tools.py
from crewai.tools import tool
from trading_crew.price_stream import latest_prices, _lock

@tool("get_streamed_price")
def get_streamed_price(symbol: str) -> str:
    """Get the latest price from the live WebSocket feed (zero HTTP latency).

    Falls back gracefully if the symbol has not yet been cached.

    Args:
        symbol: Uppercase trading pair, e.g. BTCUSDT.
    """
    sym = symbol.strip().upper()
    with _lock:
        price = latest_prices.get(sym)
    if not price:
        return f"No streamed price cached for {sym}. Use get_price instead."
    return json.dumps({"symbol": sym, "price": price, "source": "websocket"})

Start the stream before kicking off the crew:

from trading_crew.price_stream import start_stream

start_stream()  # non-blocking daemon thread
result = trading_crew.kickoff()

Error Handling Reference

Error CodeRecommended Agent Behavior
INSUFFICIENT_BALANCECall get_balance, recalculate quantity, retry
RATE_LIMIT_EXCEEDEDStop calling APIs; wait for the rate-limit window to reset
DAILY_LOSS_LIMITDo not place more orders today; report in task output
INVALID_SYMBOLCall list_pairs to find the correct symbol, retry
INVALID_QUANTITYReduce quantity; check minimum step size
ORDER_REJECTEDCheck list_open_orders count (max 50) and position limits (max 25%)
PRICE_NOT_AVAILABLERetry after 2–3 seconds
CONNECTION_ERRORRetry with exponential back-off: 1s → 2s → 4s

Project Structure

trading_crew/
├── client.py         # Shared AgentExchangeClient instance
├── tools.py          # @tool-decorated SDK wrappers
├── price_stream.py   # WebSocket background thread + price cache
├── agents.py         # Agent role definitions
├── tasks.py          # Task definitions
└── crew.py           # Crew assembly
main.py               # Entry point
.env                  # API credentials (never commit)

Troubleshooting

ModuleNotFoundError: No module named 'agentexchange'

pip install -e sdk/

Crew exceeds token limits on large get_all_prices responses

Filter the response in the tool before returning:

@tool("get_top_prices")
def get_top_prices(query: str = "all") -> str:
    """Get prices for the top 20 most liquid trading pairs only."""
    top_symbols = [
        "BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "ADAUSDT",
        "XRPUSDT", "DOGEUSDT", "AVAXUSDT", "DOTUSDT", "MATICUSDT",
        "LINKUSDT", "LTCUSDT", "UNIUSDT", "ATOMUSDT", "FILUSDT",
        "NEARUSDT", "APTUSDT", "ARBUSDT", "OPUSDT", "INJUSDT",
    ]
    try:
        prices = client.get_all_prices()
        return json.dumps([
            {"symbol": p.symbol, "price": str(p.price)}
            for p in prices if p.symbol in top_symbols
        ])
    except Exception as exc:
        return _err(exc)

Next Steps

On this page