ストリーミングレスポンスとは
ChatGPTが文字をリアルタイムに流すように表示する機能、これがストリーミングレスポンスです。LLMは最初のトークンを生成してから最後のトークンを生成するまでに時間がかかります。ストリーミングなしでは全て生成し終わるまでユーザーは何も見えません。ストリーミングを使うと生成されたトークンを即時にユーザーに届けられます。
長い回答(たとえば500〜1000トークン)では待機時間の差が体感的に大きく、UXに直結します。
SSE(Server-Sent Events)の仕組み
LLMストリーミングの標準的な実装方法がSSE(Server-Sent Events)です。
SSEはHTTP上でサーバーからクライアントへ一方向にデータを継続的に送り続けるプロトコルです。一度コネクションを確立すると、サーバーがデータを送り続けられます。WebSocketと違い、クライアントからの送信は別途HTTPリクエストで行います。
SSEのデータフォーマットは以下のようなシンプルなテキストです。
data: こんにちは
data: 、今日は
data: いい天気
data: ですね。
data: [DONE]
各行は data: プレフィックスから始まり、空行でイベントの区切りを表します。[DONE] はストリームの終了を示すChatGPT互換の慣習です。
Next.jsでの実装
App Router(Next.js 13以降)を使った実装例です。
// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";
import { NextRequest } from "next/server";
const client = new Anthropic();
export async function POST(req: NextRequest) {
const { message } = await req.json();
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
try {
const response = await client.messages.create({
model: "claude-opus-4-5",
max_tokens: 1024,
messages: [{ role: "user", content: message }],
stream: true,
});
for await (const chunk of response) {
if (
chunk.type === "content_block_delta" &&
chunk.delta.type === "text_delta"
) {
const data = `data: ${JSON.stringify({ text: chunk.delta.text })}\n\n`;
controller.enqueue(encoder.encode(data));
}
}
controller.enqueue(encoder.encode("data: [DONE]\n\n"));
controller.close();
} catch (error) {
controller.error(error);
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
フロントエンド側はEventSourceAPIか fetch + ReadableStreamで受信します。
// クライアント側
async function streamChat(message: string) {
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") return;
const parsed = JSON.parse(data);
console.log(parsed.text); // UIに追記する処理
}
}
}
}
FastAPIでの実装
PythonのFastAPIを使う場合は StreamingResponse を使います。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json
app = FastAPI()
client = anthropic.Anthropic()
@app.post("/chat")
async def chat(request: dict):
message = request["message"]
async def generate():
with client.messages.stream(
model="claude-opus-4-5",
max_tokens=1024,
messages=[{"role": "user", "content": message}]
) as stream:
for text in stream.text_stream:
data = json.dumps({"text": text})
yield f"data: {data}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
注意点とエラーハンドリング
途中でコネクションが切れた場合
ユーザーがページを離れたりネットワークが切断されると、SSEのコネクションが切れます。サーバー側は書き込みエラーで検知できますが、APIへのリクエストは続いてしまう場合があります。
AbortControllerを使ってフロントエンドからキャンセルシグナルを送るか、SSEの close イベントを使ってバックエンドのストリームを終了させる設計が必要です。
ネットワークエラーと再接続
EventSource APIはネットワークエラー時に自動で再接続しますが、fetch + ReadableStreamは自動再接続しません。長い会話でのドロップに備えてリトライロジックの実装が必要な場合があります。
Vercelなどのサーバーレス環境
Vercel(Edge Functions除く)のサーバーレス環境では、レスポンスのストリーミングに制限があります。Edge Runtimeを使うか、別途専用サーバーを立てる選択が必要です。Next.js App RouterのEdge Runtimeでのストリーミングは問題なく動作します。
まとめ
LLMのストリーミングはSSEプロトコルで実装するのが標準です。Next.jsではReadableStream、FastAPIではStreamingResponseを使うことで実装できます。コネクション切断の処理とサーバーレス環境の制約を意識した設計が、本番環境での安定動作につながります。