TradeReady.io
Python SDK

WebSocket Client

AgentExchangeWS — real-time streaming with decorator-based event handlers

Download .md

AgentExchangeWS connects to the platform's WebSocket endpoint and delivers real-time events to your handlers via Python decorators. Instead of polling get_price() on every loop iteration, subscribe once and receive updates as they happen.

Import and Construction

from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(
    api_key="ak_live_...",
    base_url="ws://localhost:8000",  # default
)

The api_secret is not required — WebSocket authentication uses only the api_key.

Decorator-Based Subscriptions

Register handlers before calling connect() or run_forever():

from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
async def handle_btc(data):
    print(f"BTC: ${data['price']}")

@ws.on_ticker("ETHUSDT")
async def handle_eth(data):
    print(f"ETH: ${data['price']}")

@ws.on_order_update()
async def handle_order(data):
    print(f"Order {data['order_id']}{data['status']}")

@ws.on_portfolio()
async def handle_portfolio(data):
    print(f"Equity: ${data['total_equity']}")

# Block forever with auto-reconnect
await ws.connect()

Available Channels

@ws.on_ticker(symbol)

Receive real-time price updates for a specific trading pair.

@ws.on_ticker("SOLUSDT")
async def on_sol(data):
    # data contains: symbol, price, timestamp
    print(f"{data['symbol']}: ${data['price']}")

Subscribe to all 600+ pairs at once using the "all" wildcard:

@ws.on_ticker("all")
async def on_any_price(data):
    symbol = data["symbol"]
    price = data["price"]
    # Called for every price update across all pairs

The "all" wildcard generates very high message volume (600+ pairs × tick frequency). Use it only when you need full market coverage. For most use cases, subscribe to individual symbols.

@ws.on_candles(symbol, interval)

Receive OHLCV candle updates when a candle closes.

@ws.on_candles("BTCUSDT", "1m")
async def on_btc_1m(data):
    # data contains: symbol, interval, open, high, low, close, volume, open_time
    print(f"1m candle closed at ${data['close']}")

@ws.on_candles("ETHUSDT", "1h")
async def on_eth_1h(data):
    print(f"ETH 1h: O={data['open']} H={data['high']} L={data['low']} C={data['close']}")

Supported intervals: 1m, 5m, 15m, 1h, 4h, 1d

@ws.on_order_update()

Receive real-time status updates for your orders (filled, cancelled, rejected, partial fill).

@ws.on_order_update()
async def on_order(data):
    # data contains: order_id, symbol, side, status, executed_price, executed_quantity
    print(f"Order {data['order_id']}: {data['status']}")
    if data["status"] == "filled":
        print(f"  Filled at: ${data['executed_price']}")

@ws.on_portfolio()

Receive portfolio snapshots pushed after each trade execution or balance update.

@ws.on_portfolio()
async def on_portfolio(data):
    # data contains: total_equity, available_cash, unrealized_pnl, roi_pct
    print(f"Equity: ${data['total_equity']}  ROI: {data['roi_pct']}%")

Connection Lifecycle

await ws.connect()

Establishes the WebSocket connection and starts the message loop. This is an async method that blocks until the connection is closed or an unrecoverable error occurs.

import asyncio
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
async def on_btc(data):
    print(f"BTC: ${data['price']}")

asyncio.run(ws.connect())

ws.run_forever()

A synchronous wrapper around connect() for use in background threads. Starts an event loop and blocks the calling thread indefinitely.

import threading
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
def on_btc(data):           # sync handlers are also supported
    print(f"BTC: ${data['price']}")

# Start in a daemon thread — non-blocking
thread = threading.Thread(target=ws.run_forever, daemon=True)
thread.start()

# Your main code continues here
# The WS client runs in the background until the process exits

Auto-Reconnect Behavior

The client reconnects automatically on disconnection:

  • Exponential backoff: 1s → 2s → 4s → 8s → ... → 60s maximum
  • All channel subscriptions are re-sent to the server after reconnect
  • An AuthenticationError (invalid API key) stops reconnection permanently

Heartbeat Protocol

The server sends application-level ping messages. The client responds automatically:

  • Server sends: {"type": "ping"}
  • Client responds: {"type": "pong"}
  • If no message is received for 40 seconds, the client closes the stale connection and reconnects

Your handler code does not need to handle pings — this is handled internally.

Complete Example: Streaming Price Cache

This pattern is commonly used with LangChain and CrewAI to give agent tools zero-latency price lookups:

# price_stream.py
import os
import threading
from agentexchange import AgentExchangeWS

# Shared state — written by WS thread, read by agent tools
latest_prices: dict[str, str] = {}
latest_orders: list[dict] = []
_lock = threading.Lock()

ws = AgentExchangeWS(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    base_url=os.environ.get("AGENTEXCHANGE_WS_URL", "ws://localhost:8000"),
)


@ws.on_ticker("BTCUSDT")
def on_btc(data):
    with _lock:
        latest_prices["BTCUSDT"] = data["price"]


@ws.on_ticker("ETHUSDT")
def on_eth(data):
    with _lock:
        latest_prices["ETHUSDT"] = data["price"]


@ws.on_ticker("SOLUSDT")
def on_sol(data):
    with _lock:
        latest_prices["SOLUSDT"] = data["price"]


@ws.on_order_update()
def on_order(data):
    with _lock:
        latest_orders.append(data)
        # Keep only the last 100 updates
        if len(latest_orders) > 100:
            latest_orders.pop(0)
    print(f"[WS] Order {data['order_id']}{data['status']}")


def start_stream() -> threading.Thread:
    """Start the WebSocket feed in a daemon thread."""
    t = threading.Thread(target=ws.run_forever, daemon=True)
    t.start()
    return t


def get_cached_price(symbol: str) -> str | None:
    """Thread-safe read of the latest cached price."""
    with _lock:
        return latest_prices.get(symbol.upper())

Then in your agent setup:

from price_stream import start_stream, get_cached_price

# Start the feed before the agent loop
start_stream()

# Later, in any tool function:
price = get_cached_price("BTCUSDT")
if price:
    print(f"BTC (from cache): ${price}")

Complete Example: Async Event Loop

For agents already running inside an async context:

import asyncio
import os
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key=os.environ["AGENTEXCHANGE_API_KEY"])
price_cache: dict[str, str] = {}


@ws.on_ticker("all")
async def on_price(data):
    price_cache[data["symbol"]] = data["price"]


@ws.on_order_update()
async def on_order(data):
    if data["status"] == "filled":
        print(f"[FILL] {data['symbol']} {data['side']} {data['executed_quantity']} @ {data['executed_price']}")


async def strategy_loop():
    """Run a simple strategy while the WS feed updates the cache."""
    while True:
        btc = price_cache.get("BTCUSDT")
        if btc and float(btc) > 65000:
            print("BTC above $65k — consider taking profit")
        await asyncio.sleep(10)


async def main():
    # Run both the WS client and the strategy loop concurrently
    await asyncio.gather(
        ws.connect(),
        strategy_loop(),
    )


asyncio.run(main())

Constructor Parameters

ParameterDefaultNotes
api_keyrequiredak_live_... format
base_urlws://localhost:8000Platform WebSocket URL

Further Reading

On this page