---
title: CrewAI Integration
description: Build a multi-agent trading crew with analyst, trader, and risk manager roles
---

This guide shows you how to connect **AgentExchange** to a CrewAI multi-agent system in under 20 minutes. You will wrap each SDK method as a `@tool`-decorated function, wire them into a `Crew` with specialized `Agent` roles, and define `Task` objects that orchestrate a full trading strategy — all on simulated funds backed by real-time Binance data.

## What You Get

After following this guide your CrewAI crew will be able to:

- Fetch live prices for any of 600+ Binance trading pairs
- Analyze 24-hour ticker stats and OHLCV candles
- Place, monitor, and cancel market / limit / stop-loss / take-profit orders
- Read account balances, open positions, and portfolio summary
- Pull performance analytics (Sharpe ratio, drawdown, win rate)
- Stream real-time prices and order notifications over WebSocket

All on simulated funds — real Binance prices, no real money at risk.

## Prerequisites

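| Requirement | Version / Install |
|-------------|-------------------|
| Python | 3.10+ |
| `crewai` | latest (`pip install crewai`) |
| `crewai-tools` | latest (`pip install crewai-tools`) |
| `langchain-openai` | latest (`pip install langchain-openai`), used for `ChatOpenAI` in Step 4 |
| `python-dotenv` | latest (`pip install python-dotenv`), used by `load_dotenv()` in `main.py` |
| `agentexchange` | `pip install agentexchange` or `pip install -e sdk/` |
| AgentExchange server | running (see [quickstart](/docs/quickstart)) |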

```bash
docker compose up -d
curl http://localhost:8000/health
# {"status":"ok"}
```

---

## Step 1 — Register an Account

```bash
curl -s -X POST http://localhost:8000/api/v1/auth/register \
  -H "Content-Type: application/json" \
  -d '{"display_name": "MyCrewBot", "starting_balance": "10000.00"}'
```
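
If you would rather register from Python, the same request can be sketched with the standard library. The endpoint and payload fields come from the `curl` call above; `build_register_request` is a hypothetical helper name, and the response is left unparsed because this guide does not document its full shape.

```python
import json
import urllib.request


def build_register_request(
    base_url: str, display_name: str, starting_balance: str
) -> urllib.request.Request:
    """Build (but do not send) the registration request shown in the curl call."""
    payload = {"display_name": display_name, "starting_balance": starting_balance}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/auth/register",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_register_request("http://localhost:8000", "MyCrewBot", "10000.00")
# To send it against a running server:
#   with urllib.request.urlopen(request, timeout=10) as resp:
#       credentials = json.load(resp)
```

Building the request separately from sending it keeps the payload easy to inspect before any account is created.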

> **Warning:**
> Save `api_secret` now — it is shown only once.

Store the credentials in a `.env` file and never commit it:

```bash
AGENTEXCHANGE_API_KEY=ak_live_...
AGENTEXCHANGE_API_SECRET=sk_live_...
AGENTEXCHANGE_BASE_URL=http://localhost:8000
AGENTEXCHANGE_WS_URL=ws://localhost:8000
OPENAI_API_KEY=sk-...
```
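
A missing variable otherwise surfaces as a bare `KeyError` deep inside an import. As a minimal sketch, a startup check could report every missing name at once. `REQUIRED_VARS`, `missing_env_vars`, and `require_env` are hypothetical names, and the list assumes the two URL variables stay optional because the code in this guide gives them defaults.

```python
import os
from typing import Mapping, Sequence

# Variables the guide reads without a default value.
REQUIRED_VARS = (
    "AGENTEXCHANGE_API_KEY",
    "AGENTEXCHANGE_API_SECRET",
    "OPENAI_API_KEY",
)


def missing_env_vars(
    required: Sequence[str] = REQUIRED_VARS,
    env: Mapping[str, str] = os.environ,
) -> list[str]:
    """Return the required variable names that are unset or blank."""
    return [name for name in required if not env.get(name, "").strip()]


def require_env(env: Mapping[str, str] = os.environ) -> None:
    """Raise one readable error listing every missing variable."""
    missing = missing_env_vars(env=env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling `require_env()` right after `load_dotenv()` in `main.py` would fail fast before any agent or client is constructed.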

---

## Step 2 — Create the Shared SDK Client

```python
# trading_crew/client.py
import os
from agentexchange import AgentExchangeClient

client = AgentExchangeClient(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
    base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)
```

`AgentExchangeClient` is thread-safe — share this single instance across all tools.

---

## Step 3 — Define Tools with the `@tool` Decorator

CrewAI tools are Python functions decorated with `@tool`. The string passed to the decorator becomes the tool's name, and the function's docstring becomes the description the LLM reads when deciding which tool to call.

```python
# trading_crew/tools.py
import json
from decimal import Decimal
from typing import Optional

from crewai.tools import tool

from agentexchange.exceptions import AgentExchangeError, RateLimitError

# Reuse the shared client from Step 2 rather than constructing a second one.
from trading_crew.client import client

def _err(exc: Exception) -> str:
    if isinstance(exc, RateLimitError):
        return f"ERROR RateLimitError: {exc}. Wait until the rate-limit window resets, then retry."
    if isinstance(exc, AgentExchangeError):
        return f"ERROR {exc.code}: {exc}"
    return f"ERROR unexpected: {exc}"

@tool("get_price")
def get_price(symbol: str) -> str:
    """Get the current live price for a single trading pair.

    Args:
        symbol: Uppercase trading pair, e.g. BTCUSDT or ETHUSDT.
    """
    try:
        price = client.get_price(symbol.strip().upper())
        return json.dumps({
            "symbol": price.symbol,
            "price": str(price.price),
            "timestamp": price.timestamp.isoformat(),
        })
    except Exception as exc:
        return _err(exc)

@tool("get_all_prices")
def get_all_prices(query: str = "all") -> str:
    """Get current prices for all 600+ active trading pairs at once.

    Use this instead of calling get_price in a loop when scanning multiple coins.
    """
    try:
        prices = client.get_all_prices()
        return json.dumps([
            {"symbol": p.symbol, "price": str(p.price)}
            for p in prices
        ])
    except Exception as exc:
        return _err(exc)

@tool("get_ticker")
def get_ticker(symbol: str) -> str:
    """Get 24-hour ticker statistics for a trading pair.

    Includes open, high, low, close, volume, and 24h change percentage.
    Use this to assess momentum before entering a trade.

    Args:
        symbol: Uppercase trading pair, e.g. BTCUSDT.
    """
    try:
        t = client.get_ticker(symbol.strip().upper())
        return json.dumps({
            "symbol": t.symbol,
            "open": str(t.open), "high": str(t.high),
            "low": str(t.low), "close": str(t.close),
            "volume": str(t.volume), "change_pct": str(t.change_pct),
        })
    except Exception as exc:
        return _err(exc)

@tool("get_candles")
def get_candles(symbol: str, interval: str = "1h", limit: int = 24) -> str:
    """Get OHLCV candle history for technical analysis.

    Args:
        symbol:   Uppercase trading pair, e.g. BTCUSDT.
        interval: 1m, 5m, 15m, 1h, 4h, 1d. Default 1h.
        limit:    Number of candles (1-1000). Default 24.
    """
    try:
        candles = client.get_candles(
            symbol=symbol.strip().upper(), interval=interval, limit=limit,
        )
        return json.dumps([
            {
                "open_time": c.open_time.isoformat(),
                "open": str(c.open), "high": str(c.high),
                "low": str(c.low), "close": str(c.close),
                "volume": str(c.volume),
            }
            for c in candles
        ])
    except Exception as exc:
        return _err(exc)

@tool("place_order")
def place_order(
    symbol: str, side: str, order_type: str, quantity: str,
    price: Optional[str] = None, trigger_price: Optional[str] = None,
) -> str:
    """Place a crypto order on the exchange.

    ALWAYS call get_balance and get_price BEFORE placing an order.

    Args:
        symbol:        Uppercase trading pair, e.g. BTCUSDT.
        side:          "buy" or "sell".
        order_type:    "market", "limit", "stop_loss", or "take_profit".
        quantity:      Order size as decimal string, e.g. "0.01".
        price:         Required for limit orders.
        trigger_price: Required for stop_loss and take_profit orders.

    Examples:
        Market buy:     place_order("BTCUSDT", "buy",  "market",    "0.01")
        Limit buy:      place_order("BTCUSDT", "buy",  "limit",     "0.01", price="60000.00")
        Stop-loss sell: place_order("BTCUSDT", "sell", "stop_loss", "0.01", trigger_price="58000.00")
    """
    try:
        kwargs: dict = {}
        if price:
            kwargs["price"] = Decimal(price)
        if trigger_price:
            kwargs["trigger_price"] = Decimal(trigger_price)
        order = client.place_order(
            symbol=symbol.strip().upper(), side=side,
            order_type=order_type, quantity=Decimal(quantity), **kwargs,
        )
        return json.dumps({
            "order_id": str(order.order_id),
            "symbol": order.symbol, "side": order.side,
            "status": order.status, "quantity": str(order.quantity),
            "executed_price": str(order.executed_price) if order.executed_price else None,
            "slippage_pct": str(order.slippage_pct) if order.slippage_pct else None,
        })
    except Exception as exc:
        return _err(exc)

@tool("get_balance")
def get_balance(query: str = "all") -> str:
    """Get all asset balances — available (free to trade) and total.

    ALWAYS call this before placing any order to confirm you have enough funds.
    """
    try:
        balance = client.get_balance()
        return json.dumps({
            "total_equity_usdt": str(balance.total_equity_usdt),
            "balances": [
                {"asset": b.asset, "available": str(b.available), "total": str(b.total)}
                for b in balance.balances
            ],
        })
    except Exception as exc:
        return _err(exc)

@tool("get_portfolio")
def get_portfolio(query: str = "all") -> str:
    """Get full portfolio summary including total equity, PnL, and ROI."""
    try:
        pf = client.get_portfolio()
        return json.dumps({
            "total_equity": str(pf.total_equity),
            "starting_balance": str(pf.starting_balance),
            "roi_pct": str(pf.roi_pct),
            "unrealized_pnl": str(pf.unrealized_pnl),
            "realized_pnl": str(pf.realized_pnl),
            "available_cash": str(pf.available_cash),
            "position_count": pf.position_count,
        })
    except Exception as exc:
        return _err(exc)

@tool("get_positions")
def get_positions(query: str = "all") -> str:
    """Get all open positions with unrealized PnL.

    No single position may exceed 25% of total equity.
    """
    try:
        positions = client.get_positions()
        return json.dumps([
            {
                "symbol": p.symbol, "quantity": str(p.quantity),
                "avg_entry_price": str(p.avg_entry_price),
                "current_price": str(p.current_price),
                "unrealized_pnl": str(p.unrealized_pnl),
                "unrealized_pnl_pct": str(p.unrealized_pnl_pct),
            }
            for p in positions
        ])
    except Exception as exc:
        return _err(exc)

@tool("get_performance")
def get_performance(period: str = "all") -> str:
    """Get trading performance metrics for the given time period.

    Args:
        period: "1d", "7d", "30d", or "all". Default "all".
    """
    try:
        p = period.strip() or "all"
        perf = client.get_performance(period=p)
        return json.dumps({
            "period": p,
            "sharpe_ratio": str(perf.sharpe_ratio),
            "win_rate": str(perf.win_rate),
            "max_drawdown_pct": str(perf.max_drawdown_pct),
            "total_trades": perf.total_trades,
            "profit_factor": str(perf.profit_factor),
        })
    except Exception as exc:
        return _err(exc)

AGENTEXCHANGE_TOOLS = [
    get_price, get_all_prices, get_ticker, get_candles,
    place_order, get_balance, get_positions, get_portfolio, get_performance,
]
```
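
The `_err` helper in `tools.py` returns a plain string, so the LLM only sees the error code and message. One optional extension, sketched here with the standard library only, appends a recovery hint taken from the Error Handling Reference section of this guide. `RECOVERY_HINTS` and `format_error` are hypothetical names, and the sketch assumes `exc.code` carries the same codes listed in that table.

```python
# Hints paraphrased from the Error Handling Reference section of this guide.
RECOVERY_HINTS: dict[str, str] = {
    "INSUFFICIENT_BALANCE": "Call get_balance, recalculate quantity, then retry.",
    "RATE_LIMIT_EXCEEDED": "Stop calling APIs until the rate-limit window resets.",
    "DAILY_LOSS_LIMIT": "Do not place more orders today; report this in the task output.",
    "PRICE_NOT_AVAILABLE": "Retry after 2-3 seconds.",
}


def format_error(code: str, message: str) -> str:
    """Format a tool error string, appending a recovery hint when one is known."""
    base = f"ERROR {code}: {message}"
    hint = RECOVERY_HINTS.get(code)
    return f"{base} Hint: {hint}" if hint else base
```

Because the hint travels with the tool output, the agent does not need the recovery rules repeated in every task description.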

---

## Step 4 — Define Specialized Agents

CrewAI shines when you split responsibilities across agents with distinct roles. This crew mirrors a real trading desk: one analyst researches the market, one trader executes orders, and one risk manager monitors the portfolio.

```python
# trading_crew/agents.py
import os
from crewai import Agent
from langchain_openai import ChatOpenAI

from trading_crew.tools import (
    get_all_prices, get_ticker, get_candles,
    get_price, get_balance, get_positions, place_order,
    get_portfolio, get_performance,
)

llm = ChatOpenAI(model="gpt-4o", temperature=0, openai_api_key=os.environ["OPENAI_API_KEY"])

market_analyst = Agent(
    role="Market Analyst",
    goal=(
        "Scan the market for the best trading opportunity right now. "
        "Identify the single coin with the strongest momentum (highest 24h change_pct > 2% "
        "combined with above-average volume). Provide a buy recommendation with a suggested "
        "entry quantity (max $200 USDT value) and a stop-loss level 5% below entry."
    ),
    backstory=(
        "You are an experienced quant analyst specializing in momentum strategies. "
        "You only recommend a trade when the data clearly supports it."
    ),
    tools=[get_all_prices, get_ticker, get_candles],
    llm=llm,
    verbose=True,
)

trader = Agent(
    role="Trader",
    goal=(
        "Execute the trade recommended by the Market Analyst. "
        "Before every order: (1) call get_balance to confirm sufficient USDT, "
        "(2) call get_price to verify current price, "
        "(3) call get_positions to check you are not already in this symbol. "
        "After a buy fills, immediately place a stop_loss at the recommended level."
    ),
    backstory=(
        "You are a disciplined execution trader. You follow instructions precisely "
        "and always confirm account state before placing orders."
    ),
    tools=[get_price, get_balance, get_positions, place_order],
    llm=llm,
    verbose=True,
)

risk_manager = Agent(
    role="Risk Manager",
    goal=(
        "After each trading round, evaluate portfolio health. "
        "Report: current equity, ROI, open positions with unrealized PnL, "
        "7-day Sharpe ratio and max drawdown, and whether the daily loss "
        "circuit breaker is close to triggering."
    ),
    backstory=(
        "You are a portfolio risk officer. You monitor positions and performance metrics "
        "after every trade cycle. You never place orders — your job is oversight."
    ),
    tools=[get_portfolio, get_positions, get_balance, get_performance],
    llm=llm,
    verbose=True,
)
```
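
The analyst's goal caps each trade at $200 USDT and sets the stop-loss 5% below entry. Leaving that arithmetic to the LLM invites rounding mistakes, so it may be worth computing it in Python. The sketch below is illustrative only: `size_order` is a hypothetical helper, and the `0.00001` quantity step is an assumed value, not one taken from the SDK or from exchange rules.

```python
from decimal import ROUND_DOWN, Decimal


def size_order(
    price: Decimal,
    max_notional: Decimal = Decimal("200"),  # analyst's cap in USDT
    stop_pct: Decimal = Decimal("0.05"),     # stop-loss distance below entry
    qty_step: Decimal = Decimal("0.00001"),  # assumed lot step, not from the SDK
) -> tuple[Decimal, Decimal]:
    """Return (quantity, stop_loss_trigger) for a buy at the given price."""
    if price <= 0:
        raise ValueError("price must be positive")
    # Round down so the order value never exceeds the cap.
    quantity = (max_notional / price).quantize(qty_step, rounding=ROUND_DOWN)
    stop_trigger = (price * (Decimal("1") - stop_pct)).quantize(Decimal("0.01"))
    return quantity, stop_trigger


quantity, stop_trigger = size_order(Decimal("60000"))
# quantity == Decimal("0.00333"), stop_trigger == Decimal("57000.00")
```

Both values are `Decimal`, so they can be passed to `place_order` as strings via `str(quantity)` and `str(stop_trigger)` without float rounding.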

---

## Step 5 — Define Tasks

Each `Task` maps to one agent's deliverable. Tasks run sequentially; later tasks reference earlier outputs via `context`.

```python
# trading_crew/tasks.py
from crewai import Task
from trading_crew.agents import market_analyst, trader, risk_manager

research_task = Task(
    description=(
        "Scan the market for the best momentum opportunity right now.\n"
        "1. Call get_all_prices.\n"
        "2. Call get_ticker for BTCUSDT, ETHUSDT, BNBUSDT, SOLUSDT, ADAUSDT, XRPUSDT.\n"
        "3. Filter to coins with change_pct > 2%.\n"
        "4. For the top candidate, call get_candles with interval='1h' limit=24.\n"
        "5. Output: recommended symbol, current price, suggested quantity "
        "(max $200 USDT), and stop-loss trigger (5% below entry)."
    ),
    expected_output=(
        "A JSON-style recommendation with: symbol, current_price, "
        "suggested_quantity, stop_loss_trigger_price, and rationale."
    ),
    agent=market_analyst,
)

execution_task = Task(
    description=(
        "Execute the trade recommended in the research task.\n"
        "1. Read symbol, quantity, and stop-loss from the research output.\n"
        "2. Call get_balance — confirm available USDT >= (quantity × current_price × 1.002).\n"
        "3. Call get_positions — confirm no existing position in this symbol.\n"
        "4. Call get_price for the latest price.\n"
        "5. Place a market buy order.\n"
        "6. Once filled, place a stop_loss sell at the recommended trigger price.\n"
        "7. Report both order IDs, executed buy price, and stop-loss trigger."
    ),
    expected_output=(
        "Summary with: buy_order_id, executed_buy_price, quantity, "
        "stop_loss_order_id, stop_loss_trigger_price, remaining_usdt_balance."
    ),
    agent=trader,
    context=[research_task],
)

risk_review_task = Task(
    description=(
        "Review portfolio health after the executed trades.\n"
        "1. Call get_portfolio for total equity, ROI, and cash.\n"
        "2. Call get_positions for all open positions with unrealized PnL.\n"
        "3. Call get_performance with period='7d'.\n"
        "4. Flag any position whose value exceeds 20% of total equity.\n"
        "5. Flag if 7-day max drawdown exceeds 10%.\n"
        "6. Output a risk summary report."
    ),
    expected_output=(
        "Structured risk report with: total_equity, roi_pct, open_positions, "
        "sharpe_ratio_7d, max_drawdown_7d, and any risk flags triggered."
    ),
    agent=risk_manager,
    context=[execution_task],
)
```
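
The numeric checks written into these task descriptions can also be computed deterministically. The following sketch mirrors the balance rule from the execution task and the two thresholds from the risk review task. `can_afford` and `risk_flags` are hypothetical helpers; the sketch assumes a position's value is its quantity times the current price and that drawdown is expressed in percent (so `10` means 10%).

```python
from decimal import Decimal

FEE_BUFFER = Decimal("1.002")         # multiplier from the execution task
MAX_POSITION_SHARE = Decimal("0.20")  # 20% threshold from the risk review task
MAX_DRAWDOWN_PCT = Decimal("10")      # 10% threshold from the risk review task


def can_afford(available_usdt: Decimal, quantity: Decimal, price: Decimal) -> bool:
    """True when available USDT covers quantity x price x 1.002."""
    return available_usdt >= quantity * price * FEE_BUFFER


def risk_flags(
    position_values: dict[str, Decimal],
    total_equity: Decimal,
    max_drawdown_pct: Decimal,
) -> list[str]:
    """Return readable flags for the two thresholds in the risk review task."""
    flags = []
    for symbol, value in position_values.items():
        if total_equity > 0 and value / total_equity > MAX_POSITION_SHARE:
            flags.append(f"{symbol} exceeds 20% of total equity")
    if max_drawdown_pct > MAX_DRAWDOWN_PCT:
        flags.append("7-day max drawdown exceeds 10%")
    return flags
```

For example, buying 0.00333 BTC at 60000 costs 199.80 USDT, or 200.1996 with the buffer, so a balance of exactly 200 USDT would not pass `can_afford`.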

---

## Step 6 — Assemble and Run the Crew

```python
# trading_crew/crew.py
from crewai import Crew, Process
from trading_crew.agents import market_analyst, trader, risk_manager
from trading_crew.tasks import research_task, execution_task, risk_review_task

trading_crew = Crew(
    agents=[market_analyst, trader, risk_manager],
    tasks=[research_task, execution_task, risk_review_task],
    process=Process.sequential,
    verbose=True,
)
```

```python
# main.py
from dotenv import load_dotenv
load_dotenv()

from trading_crew.crew import trading_crew

result = trading_crew.kickoff()
print(result)
```

```bash
python main.py
```

The crew runs in order:
1. **Analyst** scans 600+ prices, picks the best momentum coin, outputs a recommendation.
2. **Trader** confirms balance, checks positions, executes the market buy, places a stop-loss.
3. **Risk Manager** reviews the new portfolio state and flags any violations.

---

## Step 7 — Autonomous Strategy Loop

```python
import time
from dotenv import load_dotenv
load_dotenv()

from crewai import Crew, Process
from trading_crew.agents import market_analyst, trader, risk_manager
from trading_crew.tasks import research_task, execution_task, risk_review_task

INTERVAL_SECONDS = 300  # run every 5 minutes

def run_once() -> str:
    # Create a new Crew instance each cycle so tasks start fresh
    crew = Crew(
        agents=[market_analyst, trader, risk_manager],
        tasks=[research_task, execution_task, risk_review_task],
        process=Process.sequential,
        verbose=False,
    )
    return str(crew.kickoff())

if __name__ == "__main__":
    cycle = 0
    while True:
        cycle += 1
        print(f"\n{'='*60}\nCycle {cycle} — running strategy...\n{'='*60}")
        print(run_once())
        print(f"\nSleeping {INTERVAL_SECONDS}s...")
        time.sleep(INTERVAL_SECONDS)
```

> **Info:**
> Create a new `Crew` instance on each iteration rather than reusing the same one. This ensures tasks start fresh without stale context from previous cycles.
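
The loop above stops at the first unhandled exception, for example a transient LLM or network error. A more forgiving variant, sketched here under the assumption that a failed cycle should be logged and skipped, could look like this. `run_cycles` is a hypothetical helper; it accepts any zero-argument callable, such as the `run_once` function defined above.

```python
import time
from typing import Callable


def run_cycles(
    run_once: Callable[[], str],
    interval_seconds: float,
    max_cycles: int,
    sleep: Callable[[float], None] = time.sleep,
) -> list[str]:
    """Run the strategy a fixed number of times, surviving per-cycle failures."""
    results: list[str] = []
    for cycle in range(1, max_cycles + 1):
        try:
            results.append(run_once())
        except Exception as exc:  # keep looping; record the failure instead
            results.append(f"cycle {cycle} failed: {exc}")
        if cycle < max_cycles:
            sleep(interval_seconds)
    return results
```

Catching `Exception` rather than `BaseException` means Ctrl+C still interrupts the loop, and the injectable `sleep` makes the function easy to test without waiting.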

---

## Step 8 — Hierarchical Crew with a Manager

For more complex setups, use `Process.hierarchical` to add a manager LLM that delegates tasks dynamically:

```python
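from crewai import Crew, Process
from langchain_openai import ChatOpenAI

from trading_crew.agents import market_analyst, trader, risk_manager
from trading_crew.tasks import research_task, execution_task, risk_review_task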

manager_llm = ChatOpenAI(model="gpt-4o", temperature=0)

hierarchical_crew = Crew(
    agents=[market_analyst, trader, risk_manager],
    tasks=[research_task, execution_task, risk_review_task],
    process=Process.hierarchical,
    manager_llm=manager_llm,
    verbose=True,
)
```

In hierarchical mode, the manager decides which specialist to invoke and in what order.

---

## Step 9 — WebSocket Streaming

Run a WebSocket client in a background thread and add a tool that reads from the price cache:

```python
# trading_crew/price_stream.py
import os
import threading
from agentexchange import AgentExchangeWS

latest_prices: dict[str, str] = {}
_lock = threading.Lock()

ws = AgentExchangeWS(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    base_url=os.environ.get("AGENTEXCHANGE_WS_URL", "ws://localhost:8000"),
)

@ws.on_ticker("BTCUSDT")
def _on_btc(msg: dict) -> None:
    with _lock:
        latest_prices["BTCUSDT"] = msg["data"]["price"]

@ws.on_ticker("ETHUSDT")
def _on_eth(msg: dict) -> None:
    with _lock:
        latest_prices["ETHUSDT"] = msg["data"]["price"]

@ws.on_order_update()
def _on_order(msg: dict) -> None:
    data = msg["data"]
    print(f"[WS] Order {data['order_id']} → {data['status']}")

def start_stream() -> threading.Thread:
    t = threading.Thread(target=ws.run_forever, daemon=True)
    t.start()
    return t
```

```python
# Add to trading_crew/tools.py
from crewai.tools import tool
from trading_crew.price_stream import latest_prices, _lock

@tool("get_streamed_price")
def get_streamed_price(symbol: str) -> str:
    """Get the latest price from the live WebSocket feed (no HTTP round trip).

    Falls back gracefully if the symbol has not yet been cached.

    Args:
        symbol: Uppercase trading pair, e.g. BTCUSDT.
    """
    sym = symbol.strip().upper()
    with _lock:
        price = latest_prices.get(sym)
    if not price:
        return f"No streamed price cached for {sym}. Use get_price instead."
    return json.dumps({"symbol": sym, "price": price, "source": "websocket"})
```

Add `get_streamed_price` to the `tools` list of each agent that should use it, then start the stream before kicking off the crew:

```python
from trading_crew.crew import trading_crew
from trading_crew.price_stream import start_stream

start_stream()  # non-blocking daemon thread
result = trading_crew.kickoff()
```
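
A cached price says nothing about its age, so a dropped WebSocket connection could leave the agent trading on stale data. One possible refinement, sketched with the standard library only, stores an arrival time next to each price and treats old entries as missing. `PriceCache` and the 5-second default are hypothetical choices, not part of the SDK.

```python
import threading
import time
from typing import Callable, Optional


class PriceCache:
    """Thread-safe price cache that also records when each price arrived."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._lock = threading.Lock()
        self._entries: dict[str, tuple[str, float]] = {}

    def update(self, symbol: str, price: str) -> None:
        """Store the latest price together with its arrival time."""
        with self._lock:
            self._entries[symbol] = (price, self._clock())

    def get(self, symbol: str, max_age_seconds: float = 5.0) -> Optional[str]:
        """Return the cached price, or None if it is missing or too old."""
        with self._lock:
            entry = self._entries.get(symbol)
        if entry is None:
            return None
        price, stored_at = entry
        return price if self._clock() - stored_at <= max_age_seconds else None
```

The WebSocket handlers would call `update`, and `get_streamed_price` would fall back to its "use get_price instead" message whenever `get` returns `None`.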

---

## Error Handling Reference

| Error Code | Recommended Agent Behavior |
|------------|---------------------------|
| `INSUFFICIENT_BALANCE` | Call `get_balance`, recalculate quantity, retry |
| `RATE_LIMIT_EXCEEDED` | Stop calling APIs; wait for the rate-limit window to reset |
| `DAILY_LOSS_LIMIT` | Do not place more orders today; report in task output |
| `INVALID_SYMBOL` | Call `get_all_prices` to find the correct symbol, then retry |
| `INVALID_QUANTITY` | Reduce quantity; check minimum step size |
| `ORDER_REJECTED` | Check the open-order count (max 50) and position limits (max 25% of equity) |
| `PRICE_NOT_AVAILABLE` | Retry after 2–3 seconds |
| `CONNECTION_ERROR` | Retry with exponential back-off: 1s → 2s → 4s |
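
The back-off schedule in the last row (1s → 2s → 4s) can be handled in code rather than by the agent. The sketch below is a generic wrapper, not part of the SDK: `retry_with_backoff` is a hypothetical name, and Python's built-in `ConnectionError` stands in for whatever exception the SDK raises for `CONNECTION_ERROR`, which this guide does not show.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call`, retrying on ConnectionError with doubling delays (1s, 2s, 4s)."""
    if max_retries < 0:
        raise ValueError("max_retries must be non-negative")
    for attempt in range(max_retries + 1):
        try:
            return call()
        except ConnectionError:
            if attempt == max_retries:
                raise  # out of retries: surface the original error
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

Inside a tool, a call such as `retry_with_backoff(lambda: client.get_price("BTCUSDT"))` would retry transient failures before `_err` ever reports them to the LLM.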

---

## Project Structure

```
trading_crew/
├── client.py         # Shared AgentExchangeClient instance
├── tools.py          # @tool-decorated SDK wrappers
├── price_stream.py   # WebSocket background thread + price cache
├── agents.py         # Agent role definitions
├── tasks.py          # Task definitions
└── crew.py           # Crew assembly
main.py               # Entry point
.env                  # API credentials (never commit)
```

---

## Troubleshooting

**`ModuleNotFoundError: No module named 'agentexchange'`**

```bash
pip install -e sdk/
```

**Crew exceeds token limits on large `get_all_prices` responses**

Filter the response in the tool before returning:

```python
@tool("get_top_prices")
def get_top_prices(query: str = "all") -> str:
    """Get prices for the top 20 most liquid trading pairs only."""
    top_symbols = [
        "BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "ADAUSDT",
        "XRPUSDT", "DOGEUSDT", "AVAXUSDT", "DOTUSDT", "MATICUSDT",
        "LINKUSDT", "LTCUSDT", "UNIUSDT", "ATOMUSDT", "FILUSDT",
        "NEARUSDT", "APTUSDT", "ARBUSDT", "OPUSDT", "INJUSDT",
    ]
    try:
        prices = client.get_all_prices()
        return json.dumps([
            {"symbol": p.symbol, "price": str(p.price)}
            for p in prices if p.symbol in top_symbols
        ])
    except Exception as exc:
        return _err(exc)
```

---

## Next Steps

- [LangChain Guide](/docs/frameworks/langchain) — single-agent executor with typed tools
- [Agent Zero Guide](/docs/frameworks/agent-zero) — skill file integration
- [OpenClaw Guide](/docs/frameworks/openclaw) — `@openclaw.tool` SDK wrappers
- [SDK Reference](/docs/sdk/sync-client) — full method list
- [API Reference](/docs/api) — direct REST endpoint documentation
