Streaming LLM Responses

Why Streaming?
Instead of waiting for complete generation, stream tokens as they are produced:
- Better UX: Users see immediate response
- Lower perceived latency: First token appears quickly
- Flexibility: User can stop generation early

Server-Sent Events (SSE)
Standard protocol for streaming from server to client.
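On the wire, an SSE response is plain text: each event is a `data:` line followed by a blank line. For the token stream below, the payload looks like:

```
data: {"token": "Hello"}

data: {"token": " world"}

data: [DONE]

```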

Server Implementation (FastAPI)
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.get("/chat")   # GET so browser EventSource clients (GET-only) can connect
@app.post("/chat")
async def chat(prompt: str):
    async def generate():
        # llm is assumed to be your model wrapper exposing a token iterator
        for token in llm.generate_stream(prompt):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )
```

Client Implementation (JavaScript)
```javascript
// EventSource always issues GET requests, so the prompt goes in the query string
const eventSource = new EventSource("/chat?prompt=Hello");

eventSource.onmessage = function (event) {
  if (event.data === "[DONE]") {
    eventSource.close();
    return;
  }
  const data = JSON.parse(event.data);
  document.getElementById("output").textContent += data.token;
};
```

Python Client
```python
import json

import httpx

# Assumes the server above is running locally; prompt is a query parameter
with httpx.stream(
    "POST", "http://localhost:8000/chat", params={"prompt": "Hello"}
) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            payload = line[len("data: "):]
            if payload == "[DONE]":
                break
            data = json.loads(payload)
            print(data["token"], end="", flush=True)
```

OpenAI-Style Streaming
```python
from openai import OpenAI

client = OpenAI()
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
```

Key Streaming Metrics

| Metric | Description | Target |
|--------|-------------|--------|
| TTFT | Time to First Token: delay from request to first token | < 500 ms |
| TPOT | Time Per Output Token: average time to produce each subsequent token | < 50 ms |
| ITL | Inter-Token Latency: gap between consecutive tokens | Low variance |
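These metrics can be measured on the client over any token iterator. A minimal sketch (the `measure_streaming` helper is illustrative, not a standard API):

```python
import time

def measure_streaming(tokens):
    """Return (TTFT, average TPOT) in seconds for a token iterator."""
    start = time.perf_counter()
    ttft = None
    inter_token_gaps = []
    last = start
    for _ in tokens:
        now = time.perf_counter()
        if ttft is None:
            ttft = now - start          # time to first token
        else:
            inter_token_gaps.append(now - last)  # gap since previous token
        last = now
    tpot = sum(inter_token_gaps) / len(inter_token_gaps) if inter_token_gaps else 0.0
    return ttft, tpot
```

Computing the variance of `inter_token_gaps` in the same loop would give the ITL figure as well.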

WebSocket Alternative
For bidirectional real-time communication:
```python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/chat")
async def chat_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            prompt = await websocket.receive_text()
            for token in llm.generate_stream(prompt):
                await websocket.send_text(token)
    except WebSocketDisconnect:
        pass  # client disconnected; end the session cleanly
```

Best Practices
- Handle connection drops gracefully
- Consider buffering (send every N tokens)
- Implement backpressure for slow clients
- Add heartbeats for long generations
- Log complete generations for debugging
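The buffering suggestion can be sketched as a generator that groups tokens into batches before framing them as SSE events (the `buffered_events` helper is illustrative):

```python
import json

def buffered_events(tokens, buffer_size=5):
    """Group tokens into batches of buffer_size and frame each batch as an SSE event."""
    buffer = []
    for token in tokens:
        buffer.append(token)
        if len(buffer) >= buffer_size:
            yield f"data: {json.dumps({'token': ''.join(buffer)})}\n\n"
            buffer.clear()
    if buffer:  # flush any remaining tokens
        yield f"data: {json.dumps({'token': ''.join(buffer)})}\n\n"
    yield "data: [DONE]\n\n"
```

Larger batches reduce per-event overhead at the cost of slightly choppier output; tune `buffer_size` against the TPOT target above.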
