---
title: WebSocket Client
description: AgentExchangeWS — real-time streaming with decorator-based event handlers
---

`AgentExchangeWS` connects to the platform's WebSocket endpoint and delivers real-time events to your handlers via Python decorators. Instead of polling `get_price()` on every loop iteration, subscribe once and receive updates as they happen.

## Import and Construction

```python
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(
    api_key="ak_live_...",
    base_url="ws://localhost:8000",  # default
)
```

The `api_secret` is not required — WebSocket authentication uses only the `api_key`.

## Decorator-Based Subscriptions

Register handlers before calling `connect()` or `run_forever()`:

```python
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
async def handle_btc(data):
    print(f"BTC: ${data['price']}")

@ws.on_ticker("ETHUSDT")
async def handle_eth(data):
    print(f"ETH: ${data['price']}")

@ws.on_order_update()
async def handle_order(data):
    print(f"Order {data['order_id']} → {data['status']}")

@ws.on_portfolio()
async def handle_portfolio(data):
    print(f"Equity: ${data['total_equity']}")

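# Blocks until the connection closes, reconnecting automatically on drops.
# `await` works here only inside a coroutine; from a plain script,
# call asyncio.run(ws.connect()) instead.
await ws.connect()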
```
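
To make the decorator pattern concrete, here is a minimal sketch of how a handler registry of this kind could work. `HandlerRegistry`, `dispatch`, and the `ticker:BTCUSDT` channel-key format are hypothetical names chosen for illustration; they are not the SDK's internals.

```python
import asyncio
import inspect
from collections import defaultdict
from typing import Any, Callable


class HandlerRegistry:
    """Illustrative only: maps channel keys to handler callables."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[dict], Any]]] = defaultdict(list)

    def on_ticker(self, symbol: str) -> Callable:
        # Returns a decorator that records the handler and hands it back unchanged.
        def decorator(func: Callable[[dict], Any]) -> Callable[[dict], Any]:
            self._handlers[f"ticker:{symbol}"].append(func)
            return func
        return decorator

    async def dispatch(self, channel: str, data: dict) -> int:
        """Call every handler registered for `channel`; return how many ran."""
        handlers = self._handlers.get(channel, [])
        for handler in handlers:
            result = handler(data)
            if inspect.isawaitable(result):  # async handlers return a coroutine
                await result
        return len(handlers)


registry = HandlerRegistry()
seen: list[str] = []


@registry.on_ticker("BTCUSDT")
async def handle_btc(data: dict) -> None:
    seen.append(data["price"])


@registry.on_ticker("BTCUSDT")
def handle_btc_sync(data: dict) -> None:
    seen.append("sync:" + data["price"])


ran = asyncio.run(
    registry.dispatch("ticker:BTCUSDT", {"symbol": "BTCUSDT", "price": "64250.10"})
)
```

Because the decorator only records the function at definition time, handlers registered after the message loop has started may be missed in a design like this one, which is one plausible reason to register everything up front.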

## Available Channels

### `@ws.on_ticker(symbol)`

Receive real-time price updates for a specific trading pair.

```python
@ws.on_ticker("SOLUSDT")
async def on_sol(data):
    # data contains: symbol, price, timestamp
    print(f"{data['symbol']}: ${data['price']}")
```

Subscribe to all 600+ pairs at once using the `"all"` wildcard:

```python
@ws.on_ticker("all")
async def on_any_price(data):
    symbol = data["symbol"]
    price = data["price"]
    # Called for every price update across all pairs
```

> **Warning:**
> The `"all"` wildcard generates very high message volume (600+ pairs × tick frequency). Use it only when you need full market coverage. For most use cases, subscribe to individual symbols.
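
If you do need the wildcard, one way to keep handler work bounded is to throttle updates per symbol before doing anything expensive. The sketch below is a generic wrapper, not an SDK feature; `throttle_per_symbol` and its `clock` parameter are hypothetical names, and the injected clock exists only to make the demonstration deterministic.

```python
import time
from typing import Callable


def throttle_per_symbol(
    handler: Callable[[dict], None],
    min_interval: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[[dict], None]:
    """Wrap `handler` so each symbol is forwarded at most once per `min_interval` seconds."""
    last_forwarded: dict[str, float] = {}

    def wrapper(data: dict) -> None:
        now = clock()
        symbol = data["symbol"]
        previous = last_forwarded.get(symbol)
        if previous is not None and now - previous < min_interval:
            return  # too soon for this symbol: drop the update
        last_forwarded[symbol] = now
        handler(data)

    return wrapper


# Demonstration with a fake clock.
_now = [0.0]
forwarded: list[tuple[str, str]] = []

on_any_price = throttle_per_symbol(
    lambda d: forwarded.append((d["symbol"], d["price"])),
    min_interval=1.0,
    clock=lambda: _now[0],
)

on_any_price({"symbol": "BTCUSDT", "price": "64000"})  # forwarded
on_any_price({"symbol": "BTCUSDT", "price": "64001"})  # dropped, 0s elapsed
on_any_price({"symbol": "ETHUSDT", "price": "3100"})   # forwarded, different symbol
_now[0] = 1.5
on_any_price({"symbol": "BTCUSDT", "price": "64002"})  # forwarded, 1.5s elapsed
```

Since a decorator is an ordinary callable, a wrapper like this could presumably be registered with `ws.on_ticker("all")(on_any_price)`. Note that throttling reduces the work done per message, not the number of messages the server sends.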

### `@ws.on_candles(symbol, interval)`

Receive OHLCV candle updates when a candle closes.

```python
@ws.on_candles("BTCUSDT", "1m")
async def on_btc_1m(data):
    # data contains: symbol, interval, open, high, low, close, volume, open_time
    print(f"1m candle closed at ${data['close']}")

@ws.on_candles("ETHUSDT", "1h")
async def on_eth_1h(data):
    print(f"ETH 1h: O={data['open']} H={data['high']} L={data['low']} C={data['close']}")
```

Supported intervals: `1m`, `5m`, `15m`, `1h`, `4h`, `1d`
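
Because each event represents a closed candle, a handler can feed it straight into a rolling indicator. The following sketch computes a simple moving average of closes; `ClosePriceSMA` is a hypothetical helper, and it assumes `close` arrives as a string or number that `Decimal` can parse.

```python
from collections import deque
from decimal import Decimal
from typing import Optional


class ClosePriceSMA:
    """Simple moving average over the last `window` candle closes."""

    def __init__(self, window: int) -> None:
        self._window = window
        self._closes: deque = deque(maxlen=window)  # oldest close drops off automatically

    def update(self, candle: dict) -> Optional[Decimal]:
        """Add a closed candle; return the SMA once `window` closes are available."""
        # str() first so float inputs do not carry binary rounding noise into Decimal.
        self._closes.append(Decimal(str(candle["close"])))
        if len(self._closes) < self._window:
            return None
        return sum(self._closes) / len(self._closes)


sma3 = ClosePriceSMA(window=3)
results = [sma3.update({"close": c}) for c in ["100", "102", "104", "106"]]
```

Inside an `on_candles` handler, the call would be `sma3.update(data)`; the first two results are `None` because the window is not yet full.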

### `@ws.on_order_update()`

Receive real-time status updates for your orders (filled, partially filled, cancelled, rejected).

```python
@ws.on_order_update()
async def on_order(data):
    # data contains: order_id, symbol, side, status, executed_price, executed_quantity
    print(f"Order {data['order_id']}: {data['status']}")
    if data["status"] == "filled":
        print(f"  Filled at: ${data['executed_price']}")
```
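
As a small worked example of using the fill fields, the sketch below computes the notional value of a filled order. `fill_notional` is a hypothetical helper; it assumes `executed_price` and `executed_quantity` are strings or numbers that `Decimal` can parse, and it only relies on the `"filled"` status shown above.

```python
from decimal import Decimal
from typing import Optional


def fill_notional(update: dict) -> Optional[Decimal]:
    """Return executed_price * executed_quantity for filled orders, else None."""
    if update["status"] != "filled":
        return None
    price = Decimal(str(update["executed_price"]))
    quantity = Decimal(str(update["executed_quantity"]))
    return price * quantity


filled = fill_notional(
    {"order_id": "example-1", "status": "filled",
     "executed_price": "64000.50", "executed_quantity": "0.25"}
)
not_filled = fill_notional({"order_id": "example-2", "status": "rejected"})
```

`Decimal` is used instead of `float` so that the multiplication does not introduce binary rounding error into monetary values.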

### `@ws.on_portfolio()`

Receive portfolio snapshots pushed after each trade execution or balance update.

```python
@ws.on_portfolio()
async def on_portfolio(data):
    # data contains: total_equity, available_cash, unrealized_pnl, roi_pct
    print(f"Equity: ${data['total_equity']}  ROI: {data['roi_pct']}%")
```
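
Portfolio snapshots lend themselves to running risk metrics. Here is a minimal sketch that tracks peak equity and the current drawdown from that peak; `DrawdownTracker` is a hypothetical helper, and it assumes `total_equity` is a string or number that `Decimal` can parse.

```python
from decimal import Decimal


class DrawdownTracker:
    """Tracks peak equity and the current drawdown from that peak, in percent."""

    def __init__(self) -> None:
        self.peak = Decimal("0")
        self.drawdown_pct = Decimal("0")

    def update(self, snapshot: dict) -> Decimal:
        equity = Decimal(str(snapshot["total_equity"]))
        if equity > self.peak:
            self.peak = equity  # new high-water mark
        if self.peak > 0:
            self.drawdown_pct = (self.peak - equity) / self.peak * 100
        return self.drawdown_pct


tracker = DrawdownTracker()
tracker.update({"total_equity": "10000"})
tracker.update({"total_equity": "10500"})
current_drawdown = tracker.update({"total_equity": "9975"})
```

Calling `tracker.update(data)` from an `on_portfolio` handler would keep the figure current after every trade or balance change.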

## Connection Lifecycle

### `await ws.connect()`

Establishes the WebSocket connection and starts the message loop. This is an `async` method that blocks until the connection is closed or an unrecoverable error occurs.

```python
import asyncio
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
async def on_btc(data):
    print(f"BTC: ${data['price']}")

asyncio.run(ws.connect())
```

### `ws.run_forever()`

A synchronous wrapper around `connect()` for use in background threads. Starts an event loop and blocks the calling thread indefinitely.

```python
import threading
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key="ak_live_...")

@ws.on_ticker("BTCUSDT")
def on_btc(data):           # sync handlers are also supported
    print(f"BTC: ${data['price']}")

# Start in a daemon thread — non-blocking
thread = threading.Thread(target=ws.run_forever, daemon=True)
thread.start()

# Your main code continues here
# The WS client runs in the background until the process exits
```

## Auto-Reconnect Behavior

The client reconnects automatically on disconnection:

- Exponential backoff: 1s → 2s → 4s → 8s → ... → 60s maximum
- All channel subscriptions are re-sent to the server after reconnect
- An `AuthenticationError` (invalid API key) stops reconnection permanently
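
The backoff schedule above can be reproduced with a short generator. This is a sketch of the arithmetic only, not the client's reconnect code; `backoff_delays` is a hypothetical name, and whether the real client resets the delay after a successful reconnect or adds jitter is not stated here.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(
    initial: float = 1.0, factor: float = 2.0, maximum: float = 60.0
) -> Iterator[float]:
    """Yield reconnect delays in seconds, doubling each time and capped at `maximum`."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)


first_eight = list(islice(backoff_delays(), 8))
# first_eight == [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With these defaults the cap is reached on the seventh attempt, after roughly a minute of cumulative waiting (1 + 2 + 4 + 8 + 16 + 32 = 63 seconds).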

## Heartbeat Protocol

The server sends application-level ping messages. The client responds automatically:

- Server sends: `{"type": "ping"}`
- Client responds: `{"type": "pong"}`
- If no message is received for 40 seconds, the client closes the stale connection and reconnects

Your handler code does not need to handle pings — this is handled internally.
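
For readers implementing a client in another language, or simply curious about the mechanics, the rules above can be sketched as a small state object. `HeartbeatMonitor` is a hypothetical class, not part of the SDK; the message shapes and the 40-second threshold come from the list above, and the injectable clock is only there to make the demonstration deterministic.

```python
import json
import time
from typing import Callable, Optional

STALE_AFTER_SECONDS = 40.0  # from the heartbeat rules above


class HeartbeatMonitor:
    """Answers pings and reports when the connection has gone quiet."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._last_message_at = clock()

    def on_message(self, raw: str) -> Optional[str]:
        """Record activity; return a pong frame if `raw` is a ping, else None."""
        self._last_message_at = self._clock()  # any message counts as activity
        message = json.loads(raw)
        if message.get("type") == "ping":
            return json.dumps({"type": "pong"})
        return None

    def is_stale(self) -> bool:
        """True once more than STALE_AFTER_SECONDS have passed without a message."""
        return self._clock() - self._last_message_at > STALE_AFTER_SECONDS


_now = [0.0]
monitor = HeartbeatMonitor(clock=lambda: _now[0])
reply = monitor.on_message('{"type": "ping"}')
_now[0] = 39.0
stale_at_39 = monitor.is_stale()
_now[0] = 41.0
stale_at_41 = monitor.is_stale()
```

A real client would send `reply` back over the socket and, once `is_stale()` returns `True`, close the connection and enter its reconnect loop.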

## Complete Example: Streaming Price Cache

This pattern is commonly used with LangChain and CrewAI so that agent tools can read the latest price from memory instead of making a network round-trip on every lookup:

```python
# price_stream.py
import os
import threading
from agentexchange import AgentExchangeWS

# Shared state — written by WS thread, read by agent tools
latest_prices: dict[str, str] = {}
latest_orders: list[dict] = []
_lock = threading.Lock()

ws = AgentExchangeWS(
    api_key=os.environ["AGENTEXCHANGE_API_KEY"],
    base_url=os.environ.get("AGENTEXCHANGE_WS_URL", "ws://localhost:8000"),
)

@ws.on_ticker("BTCUSDT")
def on_btc(data):
    with _lock:
        latest_prices["BTCUSDT"] = data["price"]

@ws.on_ticker("ETHUSDT")
def on_eth(data):
    with _lock:
        latest_prices["ETHUSDT"] = data["price"]

@ws.on_ticker("SOLUSDT")
def on_sol(data):
    with _lock:
        latest_prices["SOLUSDT"] = data["price"]

@ws.on_order_update()
def on_order(data):
    with _lock:
        latest_orders.append(data)
        # Keep only the last 100 updates
        if len(latest_orders) > 100:
            latest_orders.pop(0)
    print(f"[WS] Order {data['order_id']} → {data['status']}")

def start_stream() -> threading.Thread:
    """Start the WebSocket feed in a daemon thread."""
    t = threading.Thread(target=ws.run_forever, daemon=True)
    t.start()
    return t

def get_cached_price(symbol: str) -> str | None:
    """Thread-safe read of the latest cached price."""
    with _lock:
        return latest_prices.get(symbol.upper())
```

Then in your agent setup:

```python
from price_stream import start_stream, get_cached_price

# Start the feed before the agent loop
start_stream()

# Later, in any tool function:
price = get_cached_price("BTCUSDT")
if price:
    print(f"BTC (from cache): ${price}")
```
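
One caveat with any in-memory cache is that entries can go stale while the client is disconnected and backing off. A possible refinement, sketched here under assumptions, is to store a timestamp with each price and refuse to return entries older than a chosen age. `TimestampedPriceCache` and `max_age` are hypothetical names, not part of the SDK.

```python
import threading
import time
from typing import Callable, Optional


class TimestampedPriceCache:
    """Thread-safe price cache that refuses to return entries older than `max_age` seconds."""

    def __init__(self, max_age: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._max_age = max_age
        self._clock = clock
        self._lock = threading.Lock()
        self._entries: dict[str, tuple[str, float]] = {}

    def put(self, symbol: str, price: str) -> None:
        with self._lock:
            self._entries[symbol.upper()] = (price, self._clock())

    def get(self, symbol: str) -> Optional[str]:
        with self._lock:
            entry = self._entries.get(symbol.upper())
            if entry is None:
                return None
            price, stored_at = entry
            if self._clock() - stored_at > self._max_age:
                return None  # too old to trust
            return price


# Demonstration with a fake clock.
_now = [0.0]
cache = TimestampedPriceCache(max_age=5.0, clock=lambda: _now[0])
cache.put("btcusdt", "64250.10")
fresh = cache.get("BTCUSDT")
_now[0] = 6.0
expired = cache.get("BTCUSDT")
```

A tool that receives `None` can then fall back to a REST lookup rather than acting on an outdated price.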

## Complete Example: Async Event Loop

For agents already running inside an async context:

```python
import asyncio
import os
from agentexchange import AgentExchangeWS

ws = AgentExchangeWS(api_key=os.environ["AGENTEXCHANGE_API_KEY"])
price_cache: dict[str, str] = {}

@ws.on_ticker("all")
async def on_price(data):
    price_cache[data["symbol"]] = data["price"]

@ws.on_order_update()
async def on_order(data):
    if data["status"] == "filled":
        print(f"[FILL] {data['symbol']} {data['side']} {data['executed_quantity']} @ {data['executed_price']}")

async def strategy_loop():
    """Run a simple strategy while the WS feed updates the cache."""
    while True:
        btc = price_cache.get("BTCUSDT")
        if btc and float(btc) > 65000:
            print("BTC above $65k — consider taking profit")
        await asyncio.sleep(10)

async def main():
    # Run both the WS client and the strategy loop concurrently
    await asyncio.gather(
        ws.connect(),
        strategy_loop(),
    )

asyncio.run(main())
```

## Constructor Parameters

| Parameter | Default | Notes |
|-----------|---------|-------|
| `api_key` | required | `ak_live_...` format |
| `base_url` | `ws://localhost:8000` | Platform WebSocket URL |

## Further Reading

- [Installation](/docs/sdk/installation) — setup and quick example
- [Sync Client](/docs/sdk/sync-client) — REST methods reference
- [Error Handling](/docs/sdk/error-handling) — `AuthenticationError` and `ConnectionError` details
- [LangChain Guide](/docs/frameworks/langchain) — full WS streaming integration with an agent executor
- [CrewAI Guide](/docs/frameworks/crewai) — WS streaming in a multi-agent crew
