TradeReady.io
Framework Guides

LangChain Integration

Connect AgentExchange to a LangChain agent with typed tools, a ReAct executor, and WebSocket streaming

Download .md

This guide shows you how to connect AgentExchange to a LangChain agent in under 15 minutes. You will wrap each SDK method as a typed LangChain Tool, wire them into an AgentExecutor, and optionally stream live prices over WebSocket into a shared price cache the agent can query without extra HTTP calls.

What You Get

After following this guide your LangChain agent will be able to:

  • Fetch live prices for any of 600+ Binance trading pairs
  • Place, monitor, and cancel market / limit / stop-loss / take-profit orders
  • Read account balances, open positions, and portfolio summary
  • Pull performance analytics (Sharpe ratio, drawdown, win rate)
  • Stream real-time prices and order notifications over WebSocket
  • Reset its trading session to restart a strategy cleanly

All on simulated funds backed by real Binance market data.

Prerequisites

RequirementVersion
Python3.10+
langchainlatest (pip install langchain)
langchain-openai or langchain-anthropiclatest
agentexchangepip install agentexchange or pip install -e sdk/
AgentExchange serverrunning (see quickstart)

Start the platform if you haven't already:

docker compose up -d
curl http://localhost:8000/health
# {"status":"ok"}

Step 1 — Register an Account

curl -s -X POST http://localhost:8000/api/v1/auth/register \
  -H "Content-Type: application/json" \
  -d '{"display_name": "MyLangChainBot", "starting_balance": "10000.00"}'
{
  "account_id": "a1b2c3d4-...",
  "api_key": "ak_live_...",
  "api_secret": "sk_live_...",
  "starting_balance": "10000.00"
}

Save api_secret now — it is shown only once.

Store credentials in .env (never commit):

AGENTEXCHANGE_API_KEY=ak_live_...
AGENTEXCHANGE_API_SECRET=sk_live_...
AGENTEXCHANGE_BASE_URL=http://localhost:8000
AGENTEXCHANGE_WS_URL=ws://localhost:8000
OPENAI_API_KEY=sk-...

Step 2 — Create the SDK Client

# trading_agent/client.py
import os
from agentexchange import AgentExchangeClient

client = AgentExchangeClient(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
    base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)

AgentExchangeClient is thread-safe — share this single instance across all your tools.


Step 3 — Wrap SDK Methods as LangChain Tools

LangChain tools accept a single str argument (the LLM's input) and return a str. For tools that need multiple parameters, accept a JSON string.

# trading_agent/tools.py
import json
import os
from decimal import Decimal
from typing import Any

from langchain.tools import Tool

from agentexchange import AgentExchangeClient
from agentexchange.exceptions import AgentExchangeError, RateLimitError

client = AgentExchangeClient(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
    base_url=os.environ.get("AGENTEXCHANGE_BASE_URL", "http://localhost:8000"),
)


def _safe(fn):
    """Wrap a tool function so exceptions become LLM-readable error strings."""
    def wrapper(raw: str) -> str:
        try:
            return fn(raw)
        except RateLimitError as exc:
            return f"ERROR RateLimitError: {exc}. Wait until the rate-limit window resets, then retry."
        except AgentExchangeError as exc:
            return f"ERROR {exc.code}: {exc}"
        except Exception as exc:
            return f"ERROR unexpected: {exc}"
    wrapper.__name__ = fn.__name__
    return wrapper


# ---------------------------------------------------------------------------
# Market data tools
# ---------------------------------------------------------------------------

@_safe
def get_price(symbol: str) -> str:
    price = client.get_price(symbol.strip().upper())
    return json.dumps({
        "symbol": price.symbol,
        "price": str(price.price),
        "timestamp": price.timestamp.isoformat(),
    })


@_safe
def get_all_prices(_: str) -> str:
    prices = client.get_all_prices()
    return json.dumps([
        {"symbol": p.symbol, "price": str(p.price)}
        for p in prices
    ])


@_safe
def get_ticker(symbol: str) -> str:
    t = client.get_ticker(symbol.strip().upper())
    return json.dumps({
        "symbol": t.symbol,
        "open": str(t.open), "high": str(t.high),
        "low": str(t.low), "close": str(t.close),
        "volume": str(t.volume), "change_pct": str(t.change_pct),
    })


@_safe
def get_candles(raw: str) -> str:
    """Input: JSON string with keys symbol (required), interval (default 1m), limit (default 100)."""
    params: dict[str, Any] = json.loads(raw)
    candles = client.get_candles(
        symbol=params["symbol"].upper(),
        interval=params.get("interval", "1m"),
        limit=int(params.get("limit", 100)),
    )
    return json.dumps([
        {
            "open_time": c.open_time.isoformat(),
            "open": str(c.open), "high": str(c.high),
            "low": str(c.low), "close": str(c.close),
            "volume": str(c.volume),
        }
        for c in candles
    ])


# ---------------------------------------------------------------------------
# Trading tools
# ---------------------------------------------------------------------------

@_safe
def place_order(raw: str) -> str:
    """Input: JSON with symbol, side, order_type, quantity, price?, trigger_price?"""
    params: dict[str, Any] = json.loads(raw)
    kwargs: dict[str, Any] = {}
    if params.get("price"):
        kwargs["price"] = Decimal(params["price"])
    if params.get("trigger_price"):
        kwargs["trigger_price"] = Decimal(params["trigger_price"])

    order = client.place_order(
        symbol=params["symbol"].upper(),
        side=params["side"],
        order_type=params["order_type"],
        quantity=Decimal(params["quantity"]),
        **kwargs,
    )
    return json.dumps({
        "order_id": str(order.order_id),
        "symbol": order.symbol,
        "side": order.side,
        "status": order.status,
        "quantity": str(order.quantity),
        "executed_price": str(order.executed_price) if order.executed_price else None,
        "slippage_pct": str(order.slippage_pct) if order.slippage_pct else None,
    })


@_safe
def cancel_order(order_id: str) -> str:
    client.cancel_order(order_id.strip())
    return json.dumps({"cancelled": True, "order_id": order_id.strip()})


# ---------------------------------------------------------------------------
# Account tools
# ---------------------------------------------------------------------------

@_safe
def get_balance(_: str) -> str:
    balance = client.get_balance()
    return json.dumps({
        "total_equity_usdt": str(balance.total_equity_usdt),
        "balances": [
            {"asset": b.asset, "available": str(b.available), "total": str(b.total)}
            for b in balance.balances
        ],
    })


@_safe
def get_portfolio(_: str) -> str:
    pf = client.get_portfolio()
    return json.dumps({
        "total_equity": str(pf.total_equity),
        "roi_pct": str(pf.roi_pct),
        "unrealized_pnl": str(pf.unrealized_pnl),
        "realized_pnl": str(pf.realized_pnl),
        "available_cash": str(pf.available_cash),
        "position_count": pf.position_count,
    })


@_safe
def get_performance(period: str) -> str:
    p = period.strip() or "all"
    perf = client.get_performance(period=p)
    return json.dumps({
        "period": p,
        "sharpe_ratio": str(perf.sharpe_ratio),
        "win_rate": str(perf.win_rate),
        "max_drawdown_pct": str(perf.max_drawdown_pct),
        "total_trades": perf.total_trades,
        "profit_factor": str(perf.profit_factor),
    })


# ---------------------------------------------------------------------------
# Tool list — pass to AgentExecutor
# ---------------------------------------------------------------------------

AGENTEXCHANGE_TOOLS: list[Tool] = [
    Tool(name="get_price",      func=get_price,      description="Get the current price of a trading pair. Input: symbol string, e.g. 'BTCUSDT'."),
    Tool(name="get_all_prices", func=get_all_prices, description="Get prices for all 600+ active trading pairs. Input is ignored — pass 'all'."),
    Tool(name="get_ticker",     func=get_ticker,     description="Get 24h ticker stats (high, low, volume, change_pct). Input: symbol string."),
    Tool(name="get_candles",    func=get_candles,    description="Get OHLCV candles. Input: JSON {symbol, interval, limit}. Intervals: 1m, 5m, 15m, 1h, 4h, 1d."),
    Tool(name="place_order",    func=place_order,    description="Place an order. Input: JSON {symbol, side (buy/sell), order_type (market/limit/stop_loss/take_profit), quantity, price?, trigger_price?}."),
    Tool(name="cancel_order",   func=cancel_order,   description="Cancel an open order. Input: order_id UUID string."),
    Tool(name="get_balance",    func=get_balance,    description="Get all asset balances. Input is ignored."),
    Tool(name="get_portfolio",  func=get_portfolio,  description="Get full portfolio summary: equity, ROI, PnL, cash. Input is ignored."),
    Tool(name="get_performance",func=get_performance,description="Get performance analytics (Sharpe, win rate, drawdown). Input: period — '1d', '7d', '30d', or 'all'."),
]

Step 4 — Build the AgentExecutor

# trading_agent/agent.py
import os
from dotenv import load_dotenv

from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

from trading_agent.tools import AGENTEXCHANGE_TOOLS

load_dotenv()

llm = ChatOpenAI(model="gpt-4o", temperature=0)

SYSTEM_PROMPT = PromptTemplate.from_template(
    """You are a crypto trading agent operating on the AgentExchange simulated exchange.
All funds are virtual — prices are real Binance data, but no real money is at risk.

Guidelines:
- ALWAYS call get_balance before placing any order.
- ALWAYS call get_price before placing any order to verify the current price.
- After opening a position, place a stop_loss order immediately.
- Use get_all_prices (one call) instead of looping get_price when scanning many coins.
- Quantities and prices must be decimal strings in JSON inputs, e.g. "0.01" not 0.01.
- If you receive an ERROR string from a tool, read the error code and follow its guidance:
    - INSUFFICIENT_BALANCE → reduce quantity or check balance first
    - RATE_LIMIT_EXCEEDED  → wait before retrying
    - DAILY_LOSS_LIMIT     → do not place more orders today
    - INVALID_SYMBOL       → call list_pairs to find the correct symbol
    - PRICE_NOT_AVAILABLE  → retry after a few seconds

You have access to these tools:
{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought:{agent_scratchpad}"""
)

agent = create_react_agent(llm=llm, tools=AGENTEXCHANGE_TOOLS, prompt=SYSTEM_PROMPT)

executor = AgentExecutor(
    agent=agent,
    tools=AGENTEXCHANGE_TOOLS,
    verbose=True,
    max_iterations=15,
    handle_parsing_errors=True,
)

Step 5 — Run the Agent

from trading_agent.agent import executor

result = executor.invoke({
    "input": (
        "Check my balance. "
        "Find the coin with the highest 24-hour price change. "
        "Buy $200 worth at market price. "
        "Then place a stop-loss 5% below the entry price."
    )
})
print(result["output"])
import time
from trading_agent.agent import executor

tasks = [
    "What is my current portfolio value and ROI?",
    "Which of my open positions is performing best? Should I take profit?",
    "Scan all prices. Pick the strongest momentum coin (highest 24h change > 3%). "
    "Buy $150 if I have enough balance and no existing position in that coin.",
]

for task in tasks:
    print(f"\n>>> {task}")
    result = executor.invoke({"input": task})
    print(result["output"])
    time.sleep(2)
import time
from trading_agent.agent import executor

STRATEGY_PROMPT = """
You are running a momentum strategy. Do the following in order:
1. Call get_all_prices to get all current prices.
2. Call get_ticker for each of BTCUSDT, ETHUSDT, BNBUSDT, SOLUSDT, ADAUSDT.
3. Identify any coin with 24h change_pct > 2%.
4. For each qualifying coin:
   a. Check if I already have an open position.
   b. If not, and if my available USDT balance > $200, buy $200 worth at market.
   c. Immediately place a stop_loss at -5% of the executed price.
5. Report a brief summary of what you did.
"""

while True:
    result = executor.invoke({"input": STRATEGY_PROMPT})
    print(result["output"])
    print("--- sleeping 5 min ---")
    time.sleep(300)

Step 6 — Structured Tool Inputs with StructuredTool

For complex tools, StructuredTool provides Pydantic validation and cleaner LLM prompting:

from decimal import Decimal
from typing import Literal, Optional
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
from trading_agent.client import client
from agentexchange.exceptions import AgentExchangeError


class PlaceOrderInput(BaseModel):
    symbol: str = Field(..., description="Trading pair, e.g. 'BTCUSDT'")
    side: Literal["buy", "sell"] = Field(..., description="Direction of the order")
    order_type: Literal["market", "limit", "stop_loss", "take_profit"] = Field(...)
    quantity: str = Field(..., description="Quantity as decimal string, e.g. '0.01'")
    price: Optional[str] = Field(None, description="Limit price (required for limit orders)")
    trigger_price: Optional[str] = Field(None, description="Trigger price for stop/TP orders")


def _place_order_structured(
    symbol: str, side: str, order_type: str, quantity: str,
    price: Optional[str] = None, trigger_price: Optional[str] = None,
) -> str:
    try:
        kwargs = {}
        if price:
            kwargs["price"] = Decimal(price)
        if trigger_price:
            kwargs["trigger_price"] = Decimal(trigger_price)
        order = client.place_order(
            symbol=symbol.upper(), side=side, order_type=order_type,
            quantity=Decimal(quantity), **kwargs,
        )
        return (
            f"Order placed: {order.order_id} | {order.side} {order.quantity} {order.symbol} "
            f"@ {order.executed_price or 'pending'} | status={order.status}"
        )
    except AgentExchangeError as exc:
        return f"ERROR {exc.code}: {exc}"


place_order_structured = StructuredTool(
    name="place_order",
    func=_place_order_structured,
    args_schema=PlaceOrderInput,
    description="Place a crypto order on AgentExchange. Types: market, limit, stop_loss, take_profit.",
)

Replace the Tool(name="place_order", ...) entry in AGENTEXCHANGE_TOOLS with place_order_structured for automatic schema validation.


Step 7 — WebSocket Streaming

Run a WebSocket client in a background thread so the agent reads from a zero-latency price cache instead of calling get_price over HTTP:

# trading_agent/price_stream.py
import os
import threading
from agentexchange import AgentExchangeWS

latest_prices: dict[str, str] = {}
_lock = threading.Lock()

ws = AgentExchangeWS(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    base_url=os.environ.get("AGENTEXCHANGE_WS_URL", "ws://localhost:8000"),
)


@ws.on_ticker("BTCUSDT")
def _on_btc(msg):
    with _lock:
        latest_prices["BTCUSDT"] = msg["data"]["price"]


@ws.on_ticker("ETHUSDT")
def _on_eth(msg):
    with _lock:
        latest_prices["ETHUSDT"] = msg["data"]["price"]


@ws.on_order_update()
def _on_order(msg):
    data = msg["data"]
    print(f"[WS] Order {data['order_id']}{data['status']} @ {data.get('executed_price', 'N/A')}")


def start_stream():
    t = threading.Thread(target=ws.run_forever, daemon=True)
    t.start()
    return t

Add a tool that reads from the cache:

from langchain.tools import Tool
from trading_agent.price_stream import latest_prices, _lock
import json

def get_streamed_price(symbol: str) -> str:
    with _lock:
        price = latest_prices.get(symbol.strip().upper())
    if not price:
        return (
            f"No streamed price cached for {symbol}. "
            "Use get_price instead, or wait a moment for the WS feed to populate."
        )
    return json.dumps({"symbol": symbol.upper(), "price": price, "source": "websocket"})

streamed_price_tool = Tool(
    name="get_streamed_price",
    func=get_streamed_price,
    description="Get the latest price from the live WS feed (zero HTTP latency). Input: symbol string.",
)

Start the stream before the executor:

from trading_agent.price_stream import start_stream

start_stream()  # non-blocking; runs in background thread

Step 8 — Async Agent

For production systems running many concurrent agents:

import asyncio
import os
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import ChatOpenAI
from agentexchange import AsyncAgentExchangeClient
from trading_agent.tools import SYSTEM_PROMPT

async_client = AsyncAgentExchangeClient(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    api_secret=os.environ["AGENTEXCHANGE_API_SECRET"],
)


async def run_agent(task: str) -> str:
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    agent = create_react_agent(llm=llm, tools=AGENTEXCHANGE_TOOLS, prompt=SYSTEM_PROMPT)
    executor = AgentExecutor(agent=agent, tools=AGENTEXCHANGE_TOOLS, max_iterations=15)
    result = await executor.ainvoke({"input": task})
    return result["output"]


async def run_concurrent_agents():
    tasks = [
        "What is the current BTC price?",
        "What is my portfolio value?",
        "List my open orders.",
    ]
    results = await asyncio.gather(*[run_agent(t) for t in tasks])
    for task, result in zip(tasks, results):
        print(f"Task: {task}\nAnswer: {result}\n")


asyncio.run(run_concurrent_agents())

Error Handling Reference

All tool functions return an ERROR <code>: <message> string when an exception is raised. The LLM reads this and should respond accordingly:

Error CodeRecommended Agent Behavior
INSUFFICIENT_BALANCECall get_balance, recalculate quantity, retry
RATE_LIMIT_EXCEEDEDWait for rate-limit window to reset before retrying
DAILY_LOSS_LIMITDo not place more orders today; report the situation
INVALID_SYMBOLCall list_pairs to find the correct symbol, then retry
INVALID_QUANTITYReduce quantity; ensure it meets the pair's minimum
PRICE_NOT_AVAILABLERetry after 2–3 seconds
CONNECTION_ERRORRetry with exponential back-off: 1s, 2s, 4s
INTERNAL_ERRORRetry with exponential back-off; escalate if persistent

Troubleshooting

ModuleNotFoundError: No module named 'agentexchange'

pip install -e sdk/

Agent says "I cannot access the API"

  • Verify the backend is running: curl http://localhost:8000/health
  • Confirm AGENTEXCHANGE_API_KEY and AGENTEXCHANGE_BASE_URL are set
  • Ensure no trailing slash on AGENTEXCHANGE_BASE_URL

PRICE_NOT_AVAILABLE on startup

The Binance WebSocket ingestion service needs ~30 seconds to populate all prices after a cold start. Wait and retry.

Agent constructs malformed place_order JSON

Add an explicit reminder in the system prompt: "All quantity and price fields must be decimal strings in quotes, e.g. "0.01", never bare numbers." Or switch to StructuredTool + Pydantic (Step 6).

Agent exceeds max_iterations

Increase max_iterations on AgentExecutor (default 15 may be low for complex multi-step tasks), or break the task into smaller subtasks.


Next Steps

On this page