Skip to content

操作指南:流式响应

使用 StreamingResponse 实现 SSE、NDJSON 和 CSV 流式传输的模式。


1. Server-Sent Events(SSE)

用于实时通知和进度更新。

python
import asyncio
from collections.abc import AsyncIterator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def sse_generator(topic: str) -> AsyncIterator[str]:
    for i in range(5):
        yield f"data: {{'topic': '{topic}', 'count': {i}}}\n\n"
        await asyncio.sleep(1)
    yield "data: {\"done\": true}\n\n"

@app.get("/events/{topic}")
async def stream_events(topic: str) -> StreamingResponse:
    return StreamingResponse(
        sse_generator(topic),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # 禁用 nginx 代理缓冲
        },
    )

SSE 格式:每条消息以 data: ...\n\n(两个换行符)结尾。


2. NDJSON(换行符分隔的 JSON)

用于流式传输大量数据。

python
import json
from collections.abc import AsyncIterator

async def ndjson_generator(items: list[dict[str, object]]) -> AsyncIterator[str]:
    for item in items:
        yield json.dumps(item, ensure_ascii=False) + "\n"

@app.get("/export/items")
async def export_items() -> StreamingResponse:
    items = fetch_all_items()
    return StreamingResponse(
        ndjson_generator(items),
        media_type="application/x-ndjson",
        headers={"Content-Disposition": "attachment; filename=items.ndjson"},
    )

3. CSV 流式传输

python
import csv
import io
from collections.abc import Iterator

def csv_generator(rows: list[dict[str, object]]) -> Iterator[str]:
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=["id", "name", "email"])
    writer.writeheader()
    yield buf.getvalue()
    buf.seek(0)
    buf.truncate()

    for row in rows:
        writer.writerow(row)
        yield buf.getvalue()
        buf.seek(0)
        buf.truncate()

@app.get("/export/users.csv")
def export_users_csv() -> StreamingResponse:
    rows = fetch_all_users()
    return StreamingResponse(
        csv_generator(rows),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"},
    )

4. response_model 不可用

StreamingResponseresponse_model 参数不兼容。该参数会出现在 OpenAPI schema 中,但不会执行任何验证。

python
# ❌ 不要在 StreamingResponse 上指定 response_model
@app.get("/stream", response_model=SomeModel)
def stream() -> StreamingResponse: ...

# ✅ 省略 response_model
@app.get("/stream")
def stream() -> StreamingResponse: ...

5. 与 middleware 共存

nene2 的 RequestIdMiddlewareSecurityHeadersMiddleware 也适用于 StreamingResponse。即使在流式传输期间,响应头也会包含 X-Request-Id


6. 测试

client.stream() 上下文中,不能使用 r.text / r.content(会引发 ResponseNotRead)。使用 r.iter_lines() / r.iter_text() / r.iter_bytes() 收集数据块。

python
def test_sse_stream() -> None:
    with TestClient(app, raise_server_exceptions=False) as client:
        with client.stream("GET", "/events/test") as r:
            assert r.status_code == 200
            assert "text/event-stream" in r.headers["content-type"]
            lines = []
            for line in r.iter_lines():
                lines.append(line)
                if len(lines) >= 3:
                    break
            assert any("data:" in line for line in lines)

def test_csv_stream() -> None:
    with TestClient(app) as client:
        with client.stream("GET", "/export/users.csv") as r:
            assert r.status_code == 200
            # ✅ 使用 iter_text() 收集文本
            content = "".join(r.iter_text())
            # ❌ r.text 会引发 ResponseNotRead
        lines = [l for l in content.splitlines() if l]
        assert lines[0] == "id,name,email"

根据 MIT 许可证发布。