WebSocket¶
FastHTTP supports WebSocket connections for real-time bidirectional communication.
WebSocket routes are defined with the @app.ws() decorator — same declarative style as HTTP routes, but the handler receives a WebSocket connection object instead of a Response.
Quick Start¶
Connect to a WebSocket echo server, send a message, and print the response:
from fasthttp import FastHTTP, WebSocket
app = FastHTTP()
@app.ws(url="wss://echo.websocket.org")
async def echo(ws: WebSocket) -> None:
await ws.send("Hello from fasthttp!")
msg = await ws.recv()
print(f"Received: {msg}")
if __name__ == "__main__":
app.run()
Run it:
Output:
INFO | fasthttp | FastHTTP started
INFO | fasthttp | Running 1 routes
INFO | fasthttp | WebSocket connections: 1
INFO | fasthttp | WebSocket connecting: wss://echo.websocket.org
INFO | fasthttp | ✔ WebSocket connected: wss://echo.websocket.org
Received: Hello from fasthttp!
INFO | fasthttp | WebSocket closed: wss://echo.websocket.org
INFO | fasthttp | Done in 0.53s
@app.ws() Signature¶
@app.ws(
url: str,
*,
reconnect: bool = False,
max_retries: int = 0,
tags: list[str] | None = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
url |
str |
— | WebSocket endpoint, must start with ws:// or wss://. |
reconnect |
bool |
False |
Automatically reconnect when the connection drops. |
max_retries |
int |
0 |
Max reconnect attempts (ignored when reconnect=False). -1 = unlimited. |
tags |
list[str] |
None |
Tags for grouping and filtering routes. |
Sending Messages¶
await ws.send("Hello") # text
await ws.send(b"\x00\xff") # binary
await ws.send({"key": "value"}) # dict/list — auto-serialized to JSON
await ws.send_str("Hello") # explicit text
await ws.send_bytes(b"\x00\xff") # explicit binary
send() accepts str, bytes, dict, or list. Dictionaries and lists are automatically serialized to JSON via orjson.
Receiving Messages¶
Single message¶
msg = await ws.recv() # -> WebSocketMessage
msg = await ws.recv_str() # -> str (raises TypeError on binary)
msg = await ws.recv_bytes() # -> bytes (raises TypeError on text)
WebSocketMessage object¶
msg = await ws.recv()
msg.text # str | None — the message as text (None if binary)
msg.data # str | bytes — the raw underlying data
msg.json() # dict | list — parsed JSON body
str(msg) # str representation
Streaming with async for¶
@app.ws(url="wss://stream.binance.com:9443/ws/btcusdt@trade")
async def trades(ws: WebSocket) -> None:
async for msg in ws:
trade = msg.json()
print(f"BTC price: {trade['p']}")
The loop exits cleanly when the connection is closed.
Connection Properties¶
ws.closed # bool — is the connection closed?
ws.local_address # (host, port) — local endpoint
ws.remote_address # (host, port) — remote server endpoint
Connection Lifecycle¶
Manual close¶
Standard WebSocket close codes:
| Code | Meaning |
|---|---|
1000 |
Normal closure |
1001 |
Going away (server restart, client navigation) |
1008 |
Policy violation |
1011 |
Unexpected server error |
Ping / Pong¶
Heartbeat is handled automatically by the underlying websockets library.
Auto-Reconnect¶
When reconnect=True, the client reconnects with exponential backoff (2^attempt seconds, capped at 30s) if the connection drops.
@app.ws(url="wss://ws-feed.exchange.com", reconnect=True, max_retries=5)
async def market_data(ws: WebSocket) -> None:
"""Reconnect up to 5 times with backoff."""
async for msg in ws:
process(msg)
Set max_retries=-1 for unlimited reconnection attempts.
Running with HTTP Routes¶
WebSocket and HTTP routes run together. HTTP requests complete first, then the app stays alive until all WebSocket connections are closed or interrupted.
from fasthttp import FastHTTP, WebSocket
from fasthttp.response import Response
app = FastHTTP()
@app.get(url="https://api.example.com/status")
async def health(resp: Response) -> dict:
return resp.json()
@app.ws(url="wss://stream.example.com/feed")
async def feed(ws: WebSocket) -> None:
async for msg in ws:
print(msg)
if __name__ == "__main__":
app.run() # HTTP completes first, then WS stays alive until Ctrl+C
Error Handling¶
from fasthttp.websocket import WebSocketError, WebSocketConnectionError
async def safe_handler(ws: WebSocket) -> None:
try:
await ws.send("ping")
msg = await ws.recv()
except WebSocketConnectionError:
print("Connection lost")
except WebSocketError as e:
print(f"WebSocket error: {e}")
Dependencies¶
WebSocket support uses the websockets library. It is included as a core dependency — no extra install step needed.
Summary¶
| Feature | Available |
|---|---|
| Text / binary / JSON messages | ✅ |
async for iteration |
✅ |
| Auto-reconnect with backoff | ✅ |
| Ping / Pong | ✅ |
Works with app.run() |
✅ |
| Works with HTTP routes | ✅ |
| Type-annotated handlers | ✅ |