CrewAI Integration
Build a multi-agent trading crew with analyst, trader, and risk manager roles
This guide shows you how to connect AgentExchange to a CrewAI multi-agent system in under 20 minutes. You will wrap each SDK method as a @tool-decorated function, wire them into a Crew with specialized Agent roles, and define Task objects that orchestrate a full trading strategy — all on simulated funds backed by real-time Binance data.
What You Get
After following this guide your CrewAI crew will be able to:
- Fetch live prices for any of 600+ Binance trading pairs
- Analyze 24-hour ticker stats and OHLCV candles
- Place, monitor, and cancel market / limit / stop-loss / take-profit orders
- Read account balances, open positions, and portfolio summary
- Pull performance analytics (Sharpe ratio, drawdown, win rate)
- Stream real-time prices and order notifications over WebSocket
All on simulated funds — real Binance prices, no real money at risk.
Prerequisites
| Requirement | Version |
|---|---|
| Python | 3.10+ |
crewai | latest (pip install crewai) |
crewai-tools | latest (pip install crewai-tools) |
agentexchange | pip install agentexchange or pip install -e sdk/ |
| AgentExchange server | running (see quickstart) |
docker compose up -d
curl http://localhost:8000/health
# {"status":"ok"}
Step 1 — Register an Account
curl -s -X POST http://localhost:8000/api/v1/auth/register \
-H "Content-Type: application/json" \
-d '{"display_name": "MyCrewBot", "starting_balance": "10000.00"}'
Save api_secret now — it is shown only once.
Store in .env (never commit):
AGENTEXCHANGE_API_KEY=ak_live_...
AGENTEXCHANGE_API_SECRET=sk_live_...
AGENTEXCHANGE_BASE_URL=http://localhost:8000
AGENTEXCHANGE_WS_URL=ws://localhost:8000
OPENAI_API_KEY=sk-...
Step 2 — Create the Shared SDK Client
# trading_crew/client.py
import os
from agentexchange import AgentExchangeClient
client = AgentExchangeClient(
api_key=os.environ["AGENTEXCHANGE_API_KEY"],
api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)
AgentExchangeClient is thread-safe — share this single instance across all tools.
Step 3 — Define Tools with the @tool Decorator
CrewAI tools are Python functions decorated with @tool. The decorator's name and docstring become the description the LLM reads when deciding which tool to call.
# trading_crew/tools.py
import json
import os
from decimal import Decimal
from typing import Optional
from crewai.tools import tool
from agentexchange import AgentExchangeClient
from agentexchange.exceptions import AgentExchangeError, RateLimitError
client = AgentExchangeClient(
api_key=os.environ["AGENTEXCHANGE_API_KEY"],
api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)
def _err(exc: Exception) -> str:
if isinstance(exc, RateLimitError):
return f"ERROR RateLimitError: {exc}. Wait until the rate-limit window resets, then retry."
if isinstance(exc, AgentExchangeError):
return f"ERROR {exc.code}: {exc}"
return f"ERROR unexpected: {exc}"
@tool("get_price")
def get_price(symbol: str) -> str:
"""Get the current live price for a single trading pair.
Args:
symbol: Uppercase trading pair, e.g. BTCUSDT or ETHUSDT.
"""
try:
price = client.get_price(symbol.strip().upper())
return json.dumps({
"symbol": price.symbol,
"price": str(price.price),
"timestamp": price.timestamp.isoformat(),
})
except Exception as exc:
return _err(exc)
@tool("get_all_prices")
def get_all_prices(query: str = "all") -> str:
"""Get current prices for all 600+ active trading pairs at once.
Use this instead of calling get_price in a loop when scanning multiple coins.
"""
try:
prices = client.get_all_prices()
return json.dumps([
{"symbol": p.symbol, "price": str(p.price)}
for p in prices
])
except Exception as exc:
return _err(exc)
@tool("get_ticker")
def get_ticker(symbol: str) -> str:
"""Get 24-hour ticker statistics for a trading pair.
Includes open, high, low, close, volume, and 24h change percentage.
Use this to assess momentum before entering a trade.
Args:
symbol: Uppercase trading pair, e.g. BTCUSDT.
"""
try:
t = client.get_ticker(symbol.strip().upper())
return json.dumps({
"symbol": t.symbol,
"open": str(t.open), "high": str(t.high),
"low": str(t.low), "close": str(t.close),
"volume": str(t.volume), "change_pct": str(t.change_pct),
})
except Exception as exc:
return _err(exc)
@tool("get_candles")
def get_candles(symbol: str, interval: str = "1h", limit: int = 24) -> str:
"""Get OHLCV candle history for technical analysis.
Args:
symbol: Uppercase trading pair, e.g. BTCUSDT.
interval: 1m, 5m, 15m, 1h, 4h, 1d. Default 1h.
limit: Number of candles (1-1000). Default 24.
"""
try:
candles = client.get_candles(
symbol=symbol.strip().upper(), interval=interval, limit=limit,
)
return json.dumps([
{
"open_time": c.open_time.isoformat(),
"open": str(c.open), "high": str(c.high),
"low": str(c.low), "close": str(c.close),
"volume": str(c.volume),
}
for c in candles
])
except Exception as exc:
return _err(exc)
@tool("place_order")
def place_order(
symbol: str, side: str, order_type: str, quantity: str,
price: Optional[str] = None, trigger_price: Optional[str] = None,
) -> str:
"""Place a crypto order on the exchange.
ALWAYS call get_balance and get_price BEFORE placing an order.
Args:
symbol: Uppercase trading pair, e.g. BTCUSDT.
side: "buy" or "sell".
order_type: "market", "limit", "stop_loss", or "take_profit".
quantity: Order size as decimal string, e.g. "0.01".
price: Required for limit orders.
trigger_price: Required for stop_loss and take_profit orders.
Examples:
Market buy: place_order("BTCUSDT", "buy", "market", "0.01")
Limit buy: place_order("BTCUSDT", "buy", "limit", "0.01", price="60000.00")
Stop-loss sell: place_order("BTCUSDT", "sell", "stop_loss", "0.01", trigger_price="58000.00")
"""
try:
kwargs: dict = {}
if price:
kwargs["price"] = Decimal(price)
if trigger_price:
kwargs["trigger_price"] = Decimal(trigger_price)
order = client.place_order(
symbol=symbol.strip().upper(), side=side,
order_type=order_type, quantity=Decimal(quantity), **kwargs,
)
return json.dumps({
"order_id": str(order.order_id),
"symbol": order.symbol, "side": order.side,
"status": order.status, "quantity": str(order.quantity),
"executed_price": str(order.executed_price) if order.executed_price else None,
"slippage_pct": str(order.slippage_pct) if order.slippage_pct else None,
})
except Exception as exc:
return _err(exc)
@tool("get_balance")
def get_balance(query: str = "all") -> str:
"""Get all asset balances — available (free to trade) and total.
ALWAYS call this before placing any order to confirm you have enough funds.
"""
try:
balance = client.get_balance()
return json.dumps({
"total_equity_usdt": str(balance.total_equity_usdt),
"balances": [
{"asset": b.asset, "available": str(b.available), "total": str(b.total)}
for b in balance.balances
],
})
except Exception as exc:
return _err(exc)
@tool("get_portfolio")
def get_portfolio(query: str = "all") -> str:
"""Get full portfolio summary including total equity, PnL, and ROI."""
try:
pf = client.get_portfolio()
return json.dumps({
"total_equity": str(pf.total_equity),
"starting_balance": str(pf.starting_balance),
"roi_pct": str(pf.roi_pct),
"unrealized_pnl": str(pf.unrealized_pnl),
"realized_pnl": str(pf.realized_pnl),
"available_cash": str(pf.available_cash),
"position_count": pf.position_count,
})
except Exception as exc:
return _err(exc)
@tool("get_positions")
def get_positions(query: str = "all") -> str:
"""Get all open positions with unrealized PnL.
No single position may exceed 25% of total equity.
"""
try:
positions = client.get_positions()
return json.dumps([
{
"symbol": p.symbol, "quantity": str(p.quantity),
"avg_entry_price": str(p.avg_entry_price),
"current_price": str(p.current_price),
"unrealized_pnl": str(p.unrealized_pnl),
"unrealized_pnl_pct": str(p.unrealized_pnl_pct),
}
for p in positions
])
except Exception as exc:
return _err(exc)
@tool("get_performance")
def get_performance(period: str = "all") -> str:
"""Get trading performance metrics for the given time period.
Args:
period: "1d", "7d", "30d", or "all". Default "all".
"""
try:
p = period.strip() or "all"
perf = client.get_performance(period=p)
return json.dumps({
"period": p,
"sharpe_ratio": str(perf.sharpe_ratio),
"win_rate": str(perf.win_rate),
"max_drawdown_pct": str(perf.max_drawdown_pct),
"total_trades": perf.total_trades,
"profit_factor": str(perf.profit_factor),
})
except Exception as exc:
return _err(exc)
AGENTEXCHANGE_TOOLS = [
get_price, get_all_prices, get_ticker, get_candles,
place_order, get_balance, get_positions, get_portfolio, get_performance,
]
Step 4 — Define Specialized Agents
CrewAI shines when you split responsibilities across agents with distinct roles. This crew mirrors a real trading desk: one analyst researches the market, one trader executes orders, and one risk manager monitors the portfolio.
# trading_crew/agents.py
import os
from crewai import Agent
from langchain_openai import ChatOpenAI
from trading_crew.tools import (
get_all_prices, get_ticker, get_candles,
get_price, get_balance, get_positions, place_order,
get_portfolio, get_performance,
)
llm = ChatOpenAI(model="gpt-4o", temperature=0, openai_api_key=os.environ["OPENAI_API_KEY"])
market_analyst = Agent(
role="Market Analyst",
goal=(
"Scan the market for the best trading opportunity right now. "
"Identify the single coin with the strongest momentum (highest 24h change_pct > 2% "
"combined with above-average volume). Provide a buy recommendation with a suggested "
"entry quantity (max $200 USDT value) and a stop-loss level 5% below entry."
),
backstory=(
"You are an experienced quant analyst specializing in momentum strategies. "
"You only recommend a trade when the data clearly supports it."
),
tools=[get_all_prices, get_ticker, get_candles],
llm=llm,
verbose=True,
)
trader = Agent(
role="Trader",
goal=(
"Execute the trade recommended by the Market Analyst. "
"Before every order: (1) call get_balance to confirm sufficient USDT, "
"(2) call get_price to verify current price, "
"(3) call get_positions to check you are not already in this symbol. "
"After a buy fills, immediately place a stop_loss at the recommended level."
),
backstory=(
"You are a disciplined execution trader. You follow instructions precisely "
"and always confirm account state before placing orders."
),
tools=[get_price, get_balance, get_positions, place_order],
llm=llm,
verbose=True,
)
risk_manager = Agent(
role="Risk Manager",
goal=(
"After each trading round, evaluate portfolio health. "
"Report: current equity, ROI, open positions with unrealized PnL, "
"7-day Sharpe ratio and max drawdown, and whether the daily loss "
"circuit breaker is close to triggering."
),
backstory=(
"You are a portfolio risk officer. You monitor positions and performance metrics "
"after every trade cycle. You never place orders — your job is oversight."
),
tools=[get_portfolio, get_positions, get_balance, get_performance],
llm=llm,
verbose=True,
)
Step 5 — Define Tasks
Each Task maps to one agent's deliverable. Tasks run sequentially; later tasks reference earlier outputs via context.
# trading_crew/tasks.py
from crewai import Task
from trading_crew.agents import market_analyst, trader, risk_manager
research_task = Task(
description=(
"Scan the market for the best momentum opportunity right now.\n"
"1. Call get_all_prices.\n"
"2. Call get_ticker for BTCUSDT, ETHUSDT, BNBUSDT, SOLUSDT, ADAUSDT, XRPUSDT.\n"
"3. Filter to coins with change_pct > 2%.\n"
"4. For the top candidate, call get_candles with interval='1h' limit=24.\n"
"5. Output: recommended symbol, current price, suggested quantity "
"(max $200 USDT), and stop-loss trigger (5% below entry)."
),
expected_output=(
"A JSON-style recommendation with: symbol, current_price, "
"suggested_quantity, stop_loss_trigger_price, and rationale."
),
agent=market_analyst,
)
execution_task = Task(
description=(
"Execute the trade recommended in the research task.\n"
"1. Read symbol, quantity, and stop-loss from the research output.\n"
"2. Call get_balance — confirm available USDT >= (quantity × current_price × 1.002).\n"
"3. Call get_positions — confirm no existing position in this symbol.\n"
"4. Call get_price for the latest price.\n"
"5. Place a market buy order.\n"
"6. Once filled, place a stop_loss sell at the recommended trigger price.\n"
"7. Report both order IDs, executed buy price, and stop-loss trigger."
),
expected_output=(
"Summary with: buy_order_id, executed_buy_price, quantity, "
"stop_loss_order_id, stop_loss_trigger_price, remaining_usdt_balance."
),
agent=trader,
context=[research_task],
)
risk_review_task = Task(
description=(
"Review portfolio health after the executed trades.\n"
"1. Call get_portfolio for total equity, ROI, and cash.\n"
"2. Call get_positions for all open positions with unrealized PnL.\n"
"3. Call get_performance with period='7d'.\n"
"4. Flag any position whose value exceeds 20% of total equity.\n"
"5. Flag if 7-day max drawdown exceeds 10%.\n"
"6. Output a risk summary report."
),
expected_output=(
"Structured risk report with: total_equity, roi_pct, open_positions, "
"sharpe_ratio_7d, max_drawdown_7d, and any risk flags triggered."
),
agent=risk_manager,
context=[execution_task],
)
Step 6 — Assemble and Run the Crew
# trading_crew/crew.py
from crewai import Crew, Process
from trading_crew.agents import market_analyst, trader, risk_manager
from trading_crew.tasks import research_task, execution_task, risk_review_task
trading_crew = Crew(
agents=[market_analyst, trader, risk_manager],
tasks=[research_task, execution_task, risk_review_task],
process=Process.sequential,
verbose=True,
)
# main.py
from dotenv import load_dotenv
load_dotenv()
from trading_crew.crew import trading_crew
result = trading_crew.kickoff()
print(result)
python main.py
The crew runs in order:
- Analyst scans 600+ prices, picks the best momentum coin, outputs a recommendation.
- Trader confirms balance, checks positions, executes the market buy, places a stop-loss.
- Risk Manager reviews the new portfolio state and flags any violations.
Step 7 — Autonomous Strategy Loop
import time
from dotenv import load_dotenv
load_dotenv()
from crewai import Crew, Process
from trading_crew.agents import market_analyst, trader, risk_manager
from trading_crew.tasks import research_task, execution_task, risk_review_task
INTERVAL_SECONDS = 300 # run every 5 minutes
def run_once() -> str:
# Create a new Crew instance each cycle so tasks start fresh
crew = Crew(
agents=[market_analyst, trader, risk_manager],
tasks=[research_task, execution_task, risk_review_task],
process=Process.sequential,
verbose=False,
)
return str(crew.kickoff())
if __name__ == "__main__":
cycle = 0
while True:
cycle += 1
print(f"\n{'='*60}\nCycle {cycle} — running strategy...\n{'='*60}")
print(run_once())
print(f"\nSleeping {INTERVAL_SECONDS}s...")
time.sleep(INTERVAL_SECONDS)
Create a new Crew instance on each iteration rather than reusing the same one. This ensures tasks start fresh without stale context from previous cycles.
Step 8 — Hierarchical Crew with a Manager
For more complex setups, use Process.hierarchical to add a manager LLM that delegates tasks dynamically:
from crewai import Crew, Process
from langchain_openai import ChatOpenAI
manager_llm = ChatOpenAI(model="gpt-4o", temperature=0)
hierarchical_crew = Crew(
agents=[market_analyst, trader, risk_manager],
tasks=[research_task, execution_task, risk_review_task],
process=Process.hierarchical,
manager_llm=manager_llm,
verbose=True,
)
In hierarchical mode, the manager decides which specialist to invoke and in what order.
Step 9 — WebSocket Streaming
Run a WebSocket client in a background thread and add a tool that reads from the price cache:
# trading_crew/price_stream.py
import os
import threading
from agentexchange import AgentExchangeWS
latest_prices: dict[str, str] = {}
_lock = threading.Lock()
ws = AgentExchangeWS(
api_key=os.environ["AGENTEXCHANGE_API_KEY"],
base_url=os.environ.get("AGENTEXCHANGE_WS_URL", "ws://localhost:8000"),
)
@ws.on_ticker("BTCUSDT")
def _on_btc(msg: dict) -> None:
with _lock:
latest_prices["BTCUSDT"] = msg["data"]["price"]
@ws.on_ticker("ETHUSDT")
def _on_eth(msg: dict) -> None:
with _lock:
latest_prices["ETHUSDT"] = msg["data"]["price"]
@ws.on_order_update()
def _on_order(msg: dict) -> None:
data = msg["data"]
print(f"[WS] Order {data['order_id']} → {data['status']}")
def start_stream() -> threading.Thread:
t = threading.Thread(target=ws.run_forever, daemon=True)
t.start()
return t
# Add to trading_crew/tools.py
from crewai.tools import tool
from trading_crew.price_stream import latest_prices, _lock
@tool("get_streamed_price")
def get_streamed_price(symbol: str) -> str:
"""Get the latest price from the live WebSocket feed (zero HTTP latency).
Falls back gracefully if the symbol has not yet been cached.
Args:
symbol: Uppercase trading pair, e.g. BTCUSDT.
"""
sym = symbol.strip().upper()
with _lock:
price = latest_prices.get(sym)
if not price:
return f"No streamed price cached for {sym}. Use get_price instead."
return json.dumps({"symbol": sym, "price": price, "source": "websocket"})
Start the stream before kicking off the crew:
from trading_crew.price_stream import start_stream
start_stream() # non-blocking daemon thread
result = trading_crew.kickoff()
Error Handling Reference
| Error Code | Recommended Agent Behavior |
|---|---|
INSUFFICIENT_BALANCE | Call get_balance, recalculate quantity, retry |
RATE_LIMIT_EXCEEDED | Stop calling APIs; wait for the rate-limit window to reset |
DAILY_LOSS_LIMIT | Do not place more orders today; report in task output |
INVALID_SYMBOL | Call list_pairs to find the correct symbol, retry |
INVALID_QUANTITY | Reduce quantity; check minimum step size |
ORDER_REJECTED | Check list_open_orders count (max 50) and position limits (max 25%) |
PRICE_NOT_AVAILABLE | Retry after 2–3 seconds |
CONNECTION_ERROR | Retry with exponential back-off: 1s → 2s → 4s |
Project Structure
trading_crew/
├── client.py # Shared AgentExchangeClient instance
├── tools.py # @tool-decorated SDK wrappers
├── price_stream.py # WebSocket background thread + price cache
├── agents.py # Agent role definitions
├── tasks.py # Task definitions
└── crew.py # Crew assembly
main.py # Entry point
.env # API credentials (never commit)
Troubleshooting
ModuleNotFoundError: No module named 'agentexchange'
pip install -e sdk/
Crew exceeds token limits on large get_all_prices responses
Filter the response in the tool before returning:
@tool("get_top_prices")
def get_top_prices(query: str = "all") -> str:
"""Get prices for the top 20 most liquid trading pairs only."""
top_symbols = [
"BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "ADAUSDT",
"XRPUSDT", "DOGEUSDT", "AVAXUSDT", "DOTUSDT", "MATICUSDT",
"LINKUSDT", "LTCUSDT", "UNIUSDT", "ATOMUSDT", "FILUSDT",
"NEARUSDT", "APTUSDT", "ARBUSDT", "OPUSDT", "INJUSDT",
]
try:
prices = client.get_all_prices()
return json.dumps([
{"symbol": p.symbol, "price": str(p.price)}
for p in prices if p.symbol in top_symbols
])
except Exception as exc:
return _err(exc)
Next Steps
- LangChain Guide — single-agent executor with typed tools
- Agent Zero Guide — skill file integration
- OpenClaw Guide —
@openclaw.toolSDK wrappers - SDK Reference — full method list
- API Reference — direct REST endpoint documentation