WebSocket Client
AgentExchangeWS — real-time streaming with decorator-based event handlers
AgentExchangeWS connects to the platform's WebSocket endpoint and delivers real-time events to your handlers via Python decorators. Instead of polling get_price() on every loop iteration, subscribe once and receive updates as they happen.
Import and Construction
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(
    api_key="ak_live_...",
    base_url="ws://localhost:8000",  # default
)
The api_secret is not required — WebSocket authentication uses only the api_key.
Decorator-Based Subscriptions
Register handlers before calling connect() or run_forever():
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
async def handle_btc(data):
    print(f"BTC: ${data['price']}")

@ws.on_ticker("ETHUSDT")
async def handle_eth(data):
    print(f"ETH: ${data['price']}")

@ws.on_order_update()
async def handle_order(data):
    print(f"Order {data['order_id']} → {data['status']}")

@ws.on_portfolio()
async def handle_portfolio(data):
    print(f"Equity: ${data['total_equity']}")

# Block forever with auto-reconnect (await must run inside an async function)
await ws.connect()
Available Channels
@ws.on_ticker(symbol)
Receive real-time price updates for a specific trading pair.
@ws.on_ticker("SOLUSDT")
async def on_sol(data):
    # data contains: symbol, price, timestamp
    print(f"{data['symbol']}: ${data['price']}")
Subscribe to all 600+ pairs at once using the "all" wildcard:
@ws.on_ticker("all")
async def on_any_price(data):
    # Called for every price update across all pairs
    symbol = data["symbol"]
    price = data["price"]
The "all" wildcard generates very high message volume (600+ pairs × tick frequency). Use it only when you need full market coverage. For most use cases, subscribe to individual symbols.
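If full coverage is needed but the handler cannot keep up, one option is to drop intermediate updates per symbol. The following is a minimal sketch, not part of the SDK: throttle_per_symbol and its clock parameter are hypothetical helpers, the prices in the demo are made up, and stacking it under @ws.on_ticker("all") assumes the decorator registers whatever callable it receives.

```python
import time
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], None]


def throttle_per_symbol(
    min_interval: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[[Handler], Handler]:
    """Wrap a sync handler so each symbol is handled at most once per `min_interval` seconds."""

    def decorator(handler: Handler) -> Handler:
        last_handled: Dict[str, float] = {}

        def wrapper(data: Dict[str, Any]) -> None:
            now = clock()
            previous = last_handled.get(data["symbol"])
            if previous is not None and now - previous < min_interval:
                return  # too soon for this symbol: drop the update
            last_handled[data["symbol"]] = now
            handler(data)

        return wrapper

    return decorator


# Demo with a fake clock so the behavior is deterministic (hypothetical data).
fake_now = [0.0]
seen = []


@throttle_per_symbol(1.0, clock=lambda: fake_now[0])
def record(data: Dict[str, Any]) -> None:
    seen.append((data["symbol"], data["price"]))


record({"symbol": "BTCUSDT", "price": "64000.10"})  # handled
fake_now[0] = 0.5
record({"symbol": "BTCUSDT", "price": "64000.20"})  # dropped (0.5 s after the last one)
record({"symbol": "ETHUSDT", "price": "3400.00"})   # handled (different symbol)
fake_now[0] = 1.5
record({"symbol": "BTCUSDT", "price": "64000.30"})  # handled (1.5 s after the last one)
print(seen)
```

Dropping updates suits handlers that only care about the most recent price. A handler that must see every tick would need a queue instead.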
@ws.on_candles(symbol, interval)
Receive OHLCV candle updates when a candle closes.
@ws.on_candles("BTCUSDT", "1m")
async def on_btc_1m(data):
    # data contains: symbol, interval, open, high, low, close, volume, open_time
    print(f"1m candle closed at ${data['close']}")

@ws.on_candles("ETHUSDT", "1h")
async def on_eth_1h(data):
    print(f"ETH 1h: O={data['open']} H={data['high']} L={data['low']} C={data['close']}")
Supported intervals: 1m, 5m, 15m, 1h, 4h, 1d
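Because the interval is passed as a plain string, a typo would otherwise only surface when subscribing. A small local check can catch it earlier and also give the candle length in seconds. This sketch assumes the six intervals listed above are the complete set. INTERVAL_SECONDS and interval_to_seconds are hypothetical names, not SDK functions.

```python
# Candle length in seconds for each interval listed in this section.
INTERVAL_SECONDS = {
    "1m": 60,
    "5m": 5 * 60,
    "15m": 15 * 60,
    "1h": 60 * 60,
    "4h": 4 * 60 * 60,
    "1d": 24 * 60 * 60,
}


def interval_to_seconds(interval: str) -> int:
    """Return the candle length in seconds, or raise ValueError for an unlisted interval."""
    try:
        return INTERVAL_SECONDS[interval]
    except KeyError:
        supported = ", ".join(INTERVAL_SECONDS)
        raise ValueError(
            f"Unsupported interval {interval!r}; expected one of: {supported}"
        ) from None


print(interval_to_seconds("15m"))  # 900
print(interval_to_seconds("4h"))   # 14400
```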
@ws.on_order_update()
Receive real-time status updates for your orders (filled, partially filled, cancelled, or rejected).
@ws.on_order_update()
async def on_order(data):
    # data contains: order_id, symbol, side, status, executed_price, executed_quantity
    print(f"Order {data['order_id']}: {data['status']}")
    if data["status"] == "filled":
        print(f"  Filled at: ${data['executed_price']}")
@ws.on_portfolio()
Receive portfolio snapshots pushed after each trade execution or balance update.
@ws.on_portfolio()
async def on_portfolio(data):
    # data contains: total_equity, available_cash, unrealized_pnl, roi_pct
    print(f"Equity: ${data['total_equity']} ROI: {data['roi_pct']}%")
Connection Lifecycle
await ws.connect()
Establishes the WebSocket connection and starts the message loop. This is an async method that blocks until the connection is closed or an unrecoverable error occurs.
import asyncio
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
async def on_btc(data):
    print(f"BTC: ${data['price']}")

asyncio.run(ws.connect())
ws.run_forever()
A synchronous wrapper around connect() for use in background threads. Starts an event loop and blocks the calling thread indefinitely.
import threading
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
def on_btc(data):  # sync handlers are also supported
    print(f"BTC: ${data['price']}")

# Start in a daemon thread (non-blocking)
thread = threading.Thread(target=ws.run_forever, daemon=True)
thread.start()

# Your main code continues here
# The WS client runs in the background until the process exits
Auto-Reconnect Behavior
The client reconnects automatically on disconnection:
- Exponential backoff: 1s → 2s → 4s → 8s → ... → 60s maximum
- All channel subscriptions are re-sent to the server after reconnect
- An AuthenticationError (invalid API key) stops reconnection permanently
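The backoff schedule above can be sketched as a generator. This only illustrates the doubling-with-cap pattern and is not the client's actual implementation. The steps between 8s and 60s are assumed to keep doubling, and the text does not say whether the client resets the delay after a successful reconnect or adds jitter. backoff_delays is a hypothetical name.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(base: float = 1.0, cap: float = 60.0) -> Iterator[float]:
    """Yield reconnect delays in seconds: base, 2*base, 4*base, ... never exceeding `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * 2, cap)


first_eight = list(islice(backoff_delays(), 8))
print(first_eight)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```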
Heartbeat Protocol
The server sends application-level ping messages. The client responds automatically:
- Server sends: {"type": "ping"}
- Client responds: {"type": "pong"}
- If no message is received for 40 seconds, the client closes the stale connection and reconnects
Your handler code does not need to handle pings — this is handled internally.
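For readers implementing the same protocol in another client, the ping/pong exchange and the 40-second staleness check might look roughly like the sketch below. It is an illustration of the rules described above, not the SDK's internal code. HeartbeatTracker and its methods are hypothetical, and the explicit now arguments exist only to make the demo deterministic.

```python
import json
import time
from typing import Optional

STALE_AFTER_SECONDS = 40.0  # timeout stated in this section


class HeartbeatTracker:
    """Tracks the last inbound message and answers application-level pings."""

    def __init__(self, now: Optional[float] = None) -> None:
        self.last_seen = time.monotonic() if now is None else now

    def on_message(self, raw: str, now: Optional[float] = None) -> Optional[str]:
        """Record activity; return the pong frame to send if `raw` is a ping, else None."""
        self.last_seen = time.monotonic() if now is None else now
        message = json.loads(raw)
        if isinstance(message, dict) and message.get("type") == "ping":
            return json.dumps({"type": "pong"})
        return None

    def is_stale(self, now: Optional[float] = None) -> bool:
        """True when nothing has arrived for more than STALE_AFTER_SECONDS."""
        current = time.monotonic() if now is None else now
        return current - self.last_seen > STALE_AFTER_SECONDS


tracker = HeartbeatTracker(now=0.0)
reply = tracker.on_message('{"type": "ping"}', now=5.0)
print(reply)                       # {"type": "pong"}
print(tracker.is_stale(now=44.0))  # False (39 s since the last message)
print(tracker.is_stale(now=46.0))  # True (41 s since the last message)
```

Note that any inbound message, not only a ping, resets the timer. This matches the rule that the connection is considered stale only when no message at all is received.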
Complete Example: Streaming Price Cache
This pattern is commonly used with LangChain and CrewAI so that agent tools read prices from an in-memory cache instead of making a network request on every lookup:
# price_stream.py
import os
import threading
from agentexchange import AgentExchangeWS

# Shared state: written by the WS thread, read by agent tools
latest_prices: dict[str, str] = {}
latest_orders: list[dict] = []
_lock = threading.Lock()

ws = AgentExchangeWS(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    base_url=os.environ.get("AGENTEXCHANGE_WS_URL", "ws://localhost:8000"),
)

@ws.on_ticker("BTCUSDT")
def on_btc(data):
    with _lock:
        latest_prices["BTCUSDT"] = data["price"]

@ws.on_ticker("ETHUSDT")
def on_eth(data):
    with _lock:
        latest_prices["ETHUSDT"] = data["price"]

@ws.on_ticker("SOLUSDT")
def on_sol(data):
    with _lock:
        latest_prices["SOLUSDT"] = data["price"]

@ws.on_order_update()
def on_order(data):
    with _lock:
        latest_orders.append(data)
        # Keep only the last 100 updates
        if len(latest_orders) > 100:
            latest_orders.pop(0)
    print(f"[WS] Order {data['order_id']} → {data['status']}")

def start_stream() -> threading.Thread:
    """Start the WebSocket feed in a daemon thread."""
    t = threading.Thread(target=ws.run_forever, daemon=True)
    t.start()
    return t

def get_cached_price(symbol: str) -> str | None:
    """Thread-safe read of the latest cached price."""
    with _lock:
        return latest_prices.get(symbol.upper())
Then in your agent setup:
from price_stream import start_stream, get_cached_price

# Start the feed before the agent loop
start_stream()

# Later, in any tool function:
price = get_cached_price("BTCUSDT")
if price:
    print(f"BTC (from cache): ${price}")
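One caveat of a plain dictionary cache is that it keeps returning the last known price while the feed is disconnected and backing off. A possible refinement, sketched below, stores a timestamp with each price and treats entries older than max_age as missing. TimestampedPriceCache, max_age, and the 5-second default are illustrative assumptions rather than SDK features, and the price in the demo is made up.

```python
import threading
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TimestampedPriceCache:
    """Thread-safe price cache that hides entries older than `max_age` seconds."""

    def __init__(
        self,
        max_age: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_age = max_age
        self._clock = clock
        self._lock = threading.Lock()
        self._entries: Dict[str, Tuple[str, float]] = {}

    def update(self, data: Dict[str, Any]) -> None:
        """Store the price from a ticker payload together with the current time."""
        with self._lock:
            self._entries[data["symbol"]] = (data["price"], self._clock())

    def get(self, symbol: str) -> Optional[str]:
        """Return the cached price, or None if it is missing or too old."""
        with self._lock:
            entry = self._entries.get(symbol.upper())
        if entry is None:
            return None
        price, stored_at = entry
        if self._clock() - stored_at > self._max_age:
            return None
        return price


# Demo with a fake clock so the result does not depend on real time.
fake_now = [100.0]
cache = TimestampedPriceCache(max_age=5.0, clock=lambda: fake_now[0])
cache.update({"symbol": "BTCUSDT", "price": "64000.10"})

fake_now[0] = 103.0
fresh = cache.get("btcusdt")  # 3 s old: still served
fake_now[0] = 106.0
stale = cache.get("BTCUSDT")  # 6 s old: treated as missing
print(fresh, stale)           # 64000.10 None
```

In the streaming module, the ticker handlers would call cache.update(data) instead of writing to latest_prices directly, and a tool that receives None can fall back to a REST lookup.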
Complete Example: Async Event Loop
For agents already running inside an async context:
import asyncio
import os
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key=os.environ["AGENTEXCHANGE_API_KEY"])
price_cache: dict[str, str] = {}

@ws.on_ticker("all")
async def on_price(data):
    price_cache[data["symbol"]] = data["price"]

@ws.on_order_update()
async def on_order(data):
    if data["status"] == "filled":
        print(f"[FILL] {data['symbol']} {data['side']} {data['executed_quantity']} @ {data['executed_price']}")

async def strategy_loop():
    """Run a simple strategy while the WS feed updates the cache."""
    while True:
        btc = price_cache.get("BTCUSDT")
        if btc and float(btc) > 65000:
            print("BTC above $65k, consider taking profit")
        await asyncio.sleep(10)

async def main():
    # Run both the WS client and the strategy loop concurrently
    await asyncio.gather(
        ws.connect(),
        strategy_loop(),
    )

asyncio.run(main())
Constructor Parameters
| Parameter | Default | Notes |
|---|---|---|
| api_key | required | ak_live_... format |
| base_url | ws://localhost:8000 | Platform WebSocket URL |
Further Reading
- Installation: setup and quick example
- Sync Client: REST methods reference
- Error Handling: AuthenticationError and ConnectionError details
- LangChain Guide: full WS streaming integration with an agent executor
- CrewAI Guide: WS streaming in a multi-agent crew