LangChain Integration
Connect AgentExchange to a LangChain agent with typed tools, a ReAct executor, and WebSocket streaming
This guide shows you how to connect AgentExchange to a LangChain agent in under 15 minutes. You will wrap each SDK method as a typed LangChain Tool, wire them into an AgentExecutor, and optionally stream live prices over WebSocket into a shared price cache the agent can query without extra HTTP calls.
What You Get
After following this guide your LangChain agent will be able to:
- Fetch live prices for any of 600+ Binance trading pairs
- Place, monitor, and cancel market / limit / stop-loss / take-profit orders
- Read account balances, open positions, and portfolio summary
- Pull performance analytics (Sharpe ratio, drawdown, win rate)
- Stream real-time prices and order notifications over WebSocket
- Reset its trading session to restart a strategy cleanly
All on simulated funds backed by real Binance market data.
Prerequisites
| Requirement | Version |
|---|---|
| Python | 3.10+ |
langchain | latest (pip install langchain) |
langchain-openai or langchain-anthropic | latest |
agentexchange | pip install agentexchange or pip install -e sdk/ |
| AgentExchange server | running (see quickstart) |
Start the platform if you haven't already:
docker compose up -d
curl http://localhost:8000/health
# {"status":"ok"}
Step 1 — Register an Account
curl -s -X POST http://localhost:8000/api/v1/auth/register \
-H "Content-Type: application/json" \
-d '{"display_name": "MyLangChainBot", "starting_balance": "10000.00"}'
{
"account_id": "a1b2c3d4-...",
"api_key": "ak_live_...",
"api_secret": "sk_live_...",
"starting_balance": "10000.00"
}
Save api_secret now — it is shown only once.
Store credentials in .env (never commit):
AGENTEXCHANGE_API_KEY=ak_live_...
AGENTEXCHANGE_API_SECRET=sk_live_...
AGENTEXCHANGE_BASE_URL=http://localhost:8000
AGENTEXCHANGE_WS_URL=ws://localhost:8000
OPENAI_API_KEY=sk-...
Step 2 — Create the SDK Client
# trading_agent/client.py
import os
from agentexchange import AgentExchangeClient
client = AgentExchangeClient(
api_key=os.environ["AGENTEXCHANGE_API_KEY"],
api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)
AgentExchangeClient is thread-safe — share this single instance across all your tools.
Step 3 — Wrap SDK Methods as LangChain Tools
LangChain tools accept a single str argument (the LLM's input) and return a str. For tools that need multiple parameters, accept a JSON string.
# trading_agent/tools.py
import json
import os
from decimal import Decimal
from typing import Any
from langchain.tools import Tool
from agentexchange import AgentExchangeClient
from agentexchange.exceptions import AgentExchangeError, RateLimitError
client = AgentExchangeClient(
api_key=os.environ["AGENTEXCHANGE_API_KEY"],
api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)
def _safe(fn):
"""Wrap a tool function so exceptions become LLM-readable error strings."""
def wrapper(raw: str) -> str:
try:
return fn(raw)
except RateLimitError as exc:
return f"ERROR RateLimitError: {exc}. Wait until the rate-limit window resets, then retry."
except AgentExchangeError as exc:
return f"ERROR {exc.code}: {exc}"
except Exception as exc:
return f"ERROR unexpected: {exc}"
wrapper.__name__ = fn.__name__
return wrapper
# ---------------------------------------------------------------------------
# Market data tools
# ---------------------------------------------------------------------------
@_safe
def get_price(symbol: str) -> str:
price = client.get_price(symbol.strip().upper())
return json.dumps({
"symbol": price.symbol,
"price": str(price.price),
"timestamp": price.timestamp.isoformat(),
})
@_safe
def get_all_prices(_: str) -> str:
prices = client.get_all_prices()
return json.dumps([
{"symbol": p.symbol, "price": str(p.price)}
for p in prices
])
@_safe
def get_ticker(symbol: str) -> str:
t = client.get_ticker(symbol.strip().upper())
return json.dumps({
"symbol": t.symbol,
"open": str(t.open), "high": str(t.high),
"low": str(t.low), "close": str(t.close),
"volume": str(t.volume), "change_pct": str(t.change_pct),
})
@_safe
def get_candles(raw: str) -> str:
"""Input: JSON string with keys symbol (required), interval (default 1m), limit (default 100)."""
params: dict[str, Any] = json.loads(raw)
candles = client.get_candles(
symbol=params["symbol"].upper(),
interval=params.get("interval", "1m"),
limit=int(params.get("limit", 100)),
)
return json.dumps([
{
"open_time": c.open_time.isoformat(),
"open": str(c.open), "high": str(c.high),
"low": str(c.low), "close": str(c.close),
"volume": str(c.volume),
}
for c in candles
])
# ---------------------------------------------------------------------------
# Trading tools
# ---------------------------------------------------------------------------
@_safe
def place_order(raw: str) -> str:
"""Input: JSON with symbol, side, order_type, quantity, price?, trigger_price?"""
params: dict[str, Any] = json.loads(raw)
kwargs: dict[str, Any] = {}
if params.get("price"):
kwargs["price"] = Decimal(params["price"])
if params.get("trigger_price"):
kwargs["trigger_price"] = Decimal(params["trigger_price"])
order = client.place_order(
symbol=params["symbol"].upper(),
side=params["side"],
order_type=params["order_type"],
quantity=Decimal(params["quantity"]),
**kwargs,
)
return json.dumps({
"order_id": str(order.order_id),
"symbol": order.symbol,
"side": order.side,
"status": order.status,
"quantity": str(order.quantity),
"executed_price": str(order.executed_price) if order.executed_price else None,
"slippage_pct": str(order.slippage_pct) if order.slippage_pct else None,
})
@_safe
def cancel_order(order_id: str) -> str:
client.cancel_order(order_id.strip())
return json.dumps({"cancelled": True, "order_id": order_id.strip()})
# ---------------------------------------------------------------------------
# Account tools
# ---------------------------------------------------------------------------
@_safe
def get_balance(_: str) -> str:
balance = client.get_balance()
return json.dumps({
"total_equity_usdt": str(balance.total_equity_usdt),
"balances": [
{"asset": b.asset, "available": str(b.available), "total": str(b.total)}
for b in balance.balances
],
})
@_safe
def get_portfolio(_: str) -> str:
pf = client.get_portfolio()
return json.dumps({
"total_equity": str(pf.total_equity),
"roi_pct": str(pf.roi_pct),
"unrealized_pnl": str(pf.unrealized_pnl),
"realized_pnl": str(pf.realized_pnl),
"available_cash": str(pf.available_cash),
"position_count": pf.position_count,
})
@_safe
def get_performance(period: str) -> str:
p = period.strip() or "all"
perf = client.get_performance(period=p)
return json.dumps({
"period": p,
"sharpe_ratio": str(perf.sharpe_ratio),
"win_rate": str(perf.win_rate),
"max_drawdown_pct": str(perf.max_drawdown_pct),
"total_trades": perf.total_trades,
"profit_factor": str(perf.profit_factor),
})
# ---------------------------------------------------------------------------
# Tool list — pass to AgentExecutor
# ---------------------------------------------------------------------------
AGENTEXCHANGE_TOOLS: list[Tool] = [
Tool(name="get_price", func=get_price, description="Get the current price of a trading pair. Input: symbol string, e.g. 'BTCUSDT'."),
Tool(name="get_all_prices", func=get_all_prices, description="Get prices for all 600+ active trading pairs. Input is ignored — pass 'all'."),
Tool(name="get_ticker", func=get_ticker, description="Get 24h ticker stats (high, low, volume, change_pct). Input: symbol string."),
Tool(name="get_candles", func=get_candles, description="Get OHLCV candles. Input: JSON {symbol, interval, limit}. Intervals: 1m, 5m, 15m, 1h, 4h, 1d."),
Tool(name="place_order", func=place_order, description="Place an order. Input: JSON {symbol, side (buy/sell), order_type (market/limit/stop_loss/take_profit), quantity, price?, trigger_price?}."),
Tool(name="cancel_order", func=cancel_order, description="Cancel an open order. Input: order_id UUID string."),
Tool(name="get_balance", func=get_balance, description="Get all asset balances. Input is ignored."),
Tool(name="get_portfolio", func=get_portfolio, description="Get full portfolio summary: equity, ROI, PnL, cash. Input is ignored."),
Tool(name="get_performance",func=get_performance,description="Get performance analytics (Sharpe, win rate, drawdown). Input: period — '1d', '7d', '30d', or 'all'."),
]
Step 4 — Build the AgentExecutor
# trading_agent/agent.py
import os
from dotenv import load_dotenv
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from trading_agent.tools import AGENTEXCHANGE_TOOLS
load_dotenv()
llm = ChatOpenAI(model="gpt-4o", temperature=0)
SYSTEM_PROMPT = PromptTemplate.from_template(
"""You are a crypto trading agent operating on the AgentExchange simulated exchange.
All funds are virtual — prices are real Binance data, but no real money is at risk.
Guidelines:
- ALWAYS call get_balance before placing any order.
- ALWAYS call get_price before placing any order to verify the current price.
- After opening a position, place a stop_loss order immediately.
- Use get_all_prices (one call) instead of looping get_price when scanning many coins.
- Quantities and prices must be decimal strings in JSON inputs, e.g. "0.01" not 0.01.
- If you receive an ERROR string from a tool, read the error code and follow its guidance:
- INSUFFICIENT_BALANCE → reduce quantity or check balance first
- RATE_LIMIT_EXCEEDED → wait before retrying
- DAILY_LOSS_LIMIT → do not place more orders today
- INVALID_SYMBOL → call list_pairs to find the correct symbol
- PRICE_NOT_AVAILABLE → retry after a few seconds
You have access to these tools:
{tools}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!
Question: {input}
Thought:{agent_scratchpad}"""
)
agent = create_react_agent(llm=llm, tools=AGENTEXCHANGE_TOOLS, prompt=SYSTEM_PROMPT)
executor = AgentExecutor(
agent=agent,
tools=AGENTEXCHANGE_TOOLS,
verbose=True,
max_iterations=15,
handle_parsing_errors=True,
)
Step 5 — Run the Agent
from trading_agent.agent import executor
result = executor.invoke({
"input": (
"Check my balance. "
"Find the coin with the highest 24-hour price change. "
"Buy $200 worth at market price. "
"Then place a stop-loss 5% below the entry price."
)
})
print(result["output"])import time
from trading_agent.agent import executor
tasks = [
"What is my current portfolio value and ROI?",
"Which of my open positions is performing best? Should I take profit?",
"Scan all prices. Pick the strongest momentum coin (highest 24h change > 3%). "
"Buy $150 if I have enough balance and no existing position in that coin.",
]
for task in tasks:
print(f"\n>>> {task}")
result = executor.invoke({"input": task})
print(result["output"])
time.sleep(2)import time
from trading_agent.agent import executor
STRATEGY_PROMPT = """
You are running a momentum strategy. Do the following in order:
1. Call get_all_prices to get all current prices.
2. Call get_ticker for each of BTCUSDT, ETHUSDT, BNBUSDT, SOLUSDT, ADAUSDT.
3. Identify any coin with 24h change_pct > 2%.
4. For each qualifying coin:
a. Check if I already have an open position.
b. If not, and if my available USDT balance > $200, buy $200 worth at market.
c. Immediately place a stop_loss at -5% of the executed price.
5. Report a brief summary of what you did.
"""
while True:
result = executor.invoke({"input": STRATEGY_PROMPT})
print(result["output"])
print("--- sleeping 5 min ---")
time.sleep(300)Step 6 — Structured Tool Inputs with StructuredTool
For complex tools, StructuredTool provides Pydantic validation and cleaner LLM prompting:
from decimal import Decimal
from typing import Literal, Optional
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
from trading_agent.client import client
from agentexchange.exceptions import AgentExchangeError
class PlaceOrderInput(BaseModel):
symbol: str = Field(..., description="Trading pair, e.g. 'BTCUSDT'")
side: Literal["buy", "sell"] = Field(..., description="Direction of the order")
order_type: Literal["market", "limit", "stop_loss", "take_profit"] = Field(...)
quantity: str = Field(..., description="Quantity as decimal string, e.g. '0.01'")
price: Optional[str] = Field(None, description="Limit price (required for limit orders)")
trigger_price: Optional[str] = Field(None, description="Trigger price for stop/TP orders")
def _place_order_structured(
symbol: str, side: str, order_type: str, quantity: str,
price: Optional[str] = None, trigger_price: Optional[str] = None,
) -> str:
try:
kwargs = {}
if price:
kwargs["price"] = Decimal(price)
if trigger_price:
kwargs["trigger_price"] = Decimal(trigger_price)
order = client.place_order(
symbol=symbol.upper(), side=side, order_type=order_type,
quantity=Decimal(quantity), **kwargs,
)
return (
f"Order placed: {order.order_id} | {order.side} {order.quantity} {order.symbol} "
f"@ {order.executed_price or 'pending'} | status={order.status}"
)
except AgentExchangeError as exc:
return f"ERROR {exc.code}: {exc}"
place_order_structured = StructuredTool(
name="place_order",
func=_place_order_structured,
args_schema=PlaceOrderInput,
description="Place a crypto order on AgentExchange. Types: market, limit, stop_loss, take_profit.",
)
Replace the Tool(name="place_order", ...) entry in AGENTEXCHANGE_TOOLS with place_order_structured for automatic schema validation.
Step 7 — WebSocket Streaming
Run a WebSocket client in a background thread so the agent reads from a zero-latency price cache instead of calling get_price over HTTP:
# trading_agent/price_stream.py
import os
import threading
from agentexchange import AgentExchangeWS
latest_prices: dict[str, str] = {}
_lock = threading.Lock()
ws = AgentExchangeWS(
api_key=os.environ["AGENTEXCHANGE_API_KEY"],
base_url=os.environ.get("AGENTEXCHANGE_WS_URL", "ws://localhost:8000"),
)
@ws.on_ticker("BTCUSDT")
def _on_btc(msg):
with _lock:
latest_prices["BTCUSDT"] = msg["data"]["price"]
@ws.on_ticker("ETHUSDT")
def _on_eth(msg):
with _lock:
latest_prices["ETHUSDT"] = msg["data"]["price"]
@ws.on_order_update()
def _on_order(msg):
data = msg["data"]
print(f"[WS] Order {data['order_id']} → {data['status']} @ {data.get('executed_price', 'N/A')}")
def start_stream():
t = threading.Thread(target=ws.run_forever, daemon=True)
t.start()
return t
Add a tool that reads from the cache:
from langchain.tools import Tool
from trading_agent.price_stream import latest_prices, _lock
import json
def get_streamed_price(symbol: str) -> str:
with _lock:
price = latest_prices.get(symbol.strip().upper())
if not price:
return (
f"No streamed price cached for {symbol}. "
"Use get_price instead, or wait a moment for the WS feed to populate."
)
return json.dumps({"symbol": symbol.upper(), "price": price, "source": "websocket"})
streamed_price_tool = Tool(
name="get_streamed_price",
func=get_streamed_price,
description="Get the latest price from the live WS feed (zero HTTP latency). Input: symbol string.",
)
Start the stream before the executor:
from trading_agent.price_stream import start_stream
start_stream() # non-blocking; runs in background thread
Step 8 — Async Agent
For production systems running many concurrent agents:
import asyncio
import os
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import ChatOpenAI
from agentexchange import AsyncAgentExchangeClient
from trading_agent.tools import SYSTEM_PROMPT
async_client = AsyncAgentExchangeClient(
api_key=os.environ["AGENTEXCHANGE_API_KEY"],
api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
)
async def run_agent(task: str) -> str:
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_react_agent(llm=llm, tools=AGENTEXCHANGE_TOOLS, prompt=SYSTEM_PROMPT)
executor = AgentExecutor(agent=agent, tools=AGENTEXCHANGE_TOOLS, max_iterations=15)
result = await executor.ainvoke({"input": task})
return result["output"]
async def run_concurrent_agents():
tasks = [
"What is the current BTC price?",
"What is my portfolio value?",
"List my open orders.",
]
results = await asyncio.gather(*[run_agent(t) for t in tasks])
for task, result in zip(tasks, results):
print(f"Task: {task}\nAnswer: {result}\n")
asyncio.run(run_concurrent_agents())
Error Handling Reference
All tool functions return an ERROR <code>: <message> string when an exception is raised. The LLM reads this and should respond accordingly:
| Error Code | Recommended Agent Behavior |
|---|---|
INSUFFICIENT_BALANCE | Call get_balance, recalculate quantity, retry |
RATE_LIMIT_EXCEEDED | Wait for rate-limit window to reset before retrying |
DAILY_LOSS_LIMIT | Do not place more orders today; report the situation |
INVALID_SYMBOL | Call list_pairs to find the correct symbol, then retry |
INVALID_QUANTITY | Reduce quantity; ensure it meets the pair's minimum |
PRICE_NOT_AVAILABLE | Retry after 2–3 seconds |
CONNECTION_ERROR | Retry with exponential back-off: 1s, 2s, 4s |
INTERNAL_ERROR | Retry with exponential back-off; escalate if persistent |
Troubleshooting
ModuleNotFoundError: No module named 'agentexchange'
pip install -e sdk/
Agent says "I cannot access the API"
- Verify the backend is running:
curl http://localhost:8000/health - Confirm
AGENTEXCHANGE_API_KEYandAGENTEXCHANGE_BASE_URLare set - Ensure no trailing slash on
AGENTEXCHANGE_BASE_URL
PRICE_NOT_AVAILABLE on startup
The Binance WebSocket ingestion service needs ~30 seconds to populate all prices after a cold start. Wait and retry.
Agent constructs malformed place_order JSON
Add an explicit reminder in the system prompt: "All quantity and price fields must be decimal strings in quotes, e.g. "0.01", never bare numbers." Or switch to StructuredTool + Pydantic (Step 6).
Agent exceeds max_iterations
Increase max_iterations on AgentExecutor (default 15 may be low for complex multi-step tasks), or break the task into smaller subtasks.
Next Steps
- CrewAI Guide — multi-agent crew with analyst, trader, and risk manager
- Agent Zero Guide — skill file integration
- OpenClaw Guide —
@openclaw.toolSDK wrappers - SDK Reference — full method list
- API Reference — direct REST endpoint documentation