How-to: integrating AsyncUseCase with FastAPI

Basic AsyncUseCaseProtocol implementation

AsyncUseCaseProtocol is a Protocol (structural subtyping), so no inheritance is needed: any class that implements async def execute(self, input_: I) -> O conforms.

python
from dataclasses import dataclass
from nene2.use_case import AsyncUseCaseProtocol


@dataclass(frozen=True, slots=True)
class FetchUserInput:
    user_id: int


@dataclass(frozen=True, slots=True)
class FetchUserOutput:
    user_id: int
    name: str


class FetchUserUseCase:
    async def execute(self, input_: FetchUserInput) -> FetchUserOutput:
        # async work such as an external API call or DB access
        return FetchUserOutput(user_id=input_.user_id, name="Alice")

Integration with FastAPI Depends

Passing a factory function to Depends() is the standard pattern.

python
from fastapi import Depends, FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()


def get_fetch_user_use_case() -> FetchUserUseCase:
    return FetchUserUseCase()


@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    use_case: FetchUserUseCase = Depends(get_fetch_user_use_case),
) -> JSONResponse:
    result = await use_case.execute(FetchUserInput(user_id=user_id))
    return JSONResponse({"user_id": result.user_id, "name": result.name})

DI for a UseCase with external dependencies

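A UseCase that depends on a repository or an external client can receive it through Depends as well: declare the dependency as a parameter of the factory function, and FastAPI resolves the whole chain for each request.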

python
class FetchUserUseCase:
    def __init__(self, repository: UserRepositoryInterface) -> None:
        self._repository = repository

    async def execute(self, input_: FetchUserInput) -> FetchUserOutput:
        user = await self._repository.find_by_id(input_.user_id)
        return FetchUserOutput(user_id=user.id, name=user.name)


def get_user_repository() -> UserRepositoryInterface:
    return InMemoryUserRepository()


def get_fetch_user_use_case(
    repository: UserRepositoryInterface = Depends(get_user_repository),
) -> FetchUserUseCase:
    return FetchUserUseCase(repository)

Concurrent execution

Use asyncio.gather() to run multiple AsyncUseCases concurrently.

python
import asyncio


@app.get("/dashboard")
async def dashboard(
    user_id: int,
    fetch_user: FetchUserUseCase = Depends(get_fetch_user_use_case),
    fetch_stats: FetchStatsUseCase = Depends(get_fetch_stats_use_case),
) -> JSONResponse:
    user, stats = await asyncio.gather(
        fetch_user.execute(FetchUserInput(user_id=user_id)),
        fetch_stats.execute(FetchStatsInput(user_id=user_id)),
    )
    return JSONResponse({"user": user.name, "stats": stats.count})
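The interleaving that asyncio.gather() provides can be observed without FastAPI. In this sketch, `fetch_user` and `fetch_stats` are hypothetical stand-ins for the two use cases, and asyncio.sleep() stands in for an awaited DB or HTTP call.

```python
import asyncio


async def fetch_user(user_id: int, events: list[str]) -> str:
    events.append("user:start")
    await asyncio.sleep(0.05)  # stands in for an awaited DB or HTTP call
    events.append("user:end")
    return f"user-{user_id}"


async def fetch_stats(user_id: int, events: list[str]) -> int:
    events.append("stats:start")
    await asyncio.sleep(0.01)
    events.append("stats:end")
    return 42


async def dashboard(user_id: int) -> tuple[str, int, list[str]]:
    events: list[str] = []
    # gather() returns results in argument order, whichever finishes first.
    user, stats = await asyncio.gather(
        fetch_user(user_id, events),
        fetch_stats(user_id, events),
    )
    return user, stats, events


user, stats, events = asyncio.run(dashboard(7))
print(user, stats, events)
```

Both coroutines start before either one finishes, which is the point of the pattern. By default, the first exception raised by any awaitable propagates to the caller of gather() while the others keep running; passing return_exceptions=True collects exceptions as results instead.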

A note on isinstance()

AsyncUseCaseProtocol is @runtime_checkable, but isinstance() only checks that an execute attribute exists. It does not check the signature, so it cannot distinguish sync from async.

python
# isinstance() returns True for a sync UseCase too (false positive)
isinstance(sync_use_case, AsyncUseCaseProtocol)  # → True

# the correct way to check for async
import inspect
inspect.iscoroutinefunction(use_case.execute)  # → True/False

Type conformance is therefore checked statically with mypy --strict rather than at runtime. See ADR-0010 for details.
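The false positive can be reproduced with the standard library alone. `AsyncExecutable` below is a hypothetical stand-in for AsyncUseCaseProtocol, and `is_async_use_case` is an illustrative helper rather than a nene2 function.

```python
import inspect
from typing import Protocol, runtime_checkable


@runtime_checkable
class AsyncExecutable(Protocol):
    """Hypothetical stand-in for AsyncUseCaseProtocol (an assumption)."""

    async def execute(self, input_: int) -> int: ...


class SyncUseCase:
    def execute(self, input_: int) -> int:
        return input_ * 2


class AsyncUseCase:
    async def execute(self, input_: int) -> int:
        return input_ * 2


def is_async_use_case(candidate: object) -> bool:
    # Looks at the method itself instead of relying on isinstance().
    execute = getattr(candidate, "execute", None)
    return inspect.iscoroutinefunction(execute)


# isinstance() only sees that an `execute` attribute exists.
sync_passes_isinstance = isinstance(SyncUseCase(), AsyncExecutable)
sync_is_async = is_async_use_case(SyncUseCase())
async_is_async = is_async_use_case(AsyncUseCase())
print(sync_passes_isinstance, sync_is_async, async_is_async)
```

A runtime check like this is mainly useful at a boundary such as plugin loading; inside a type-checked codebase, the static check already covers it.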


The blocking problem of sync DB calls

Making a sync DB call (e.g. the SQLAlchemy sync API) inside an async def handler blocks the event loop and stalls other requests.

python
# ❌ a sync DB call inside async def blocks
@app.get("/notes")
async def list_notes() -> JSONResponse:
    notes = session.execute(select(Note)).scalars().all()  # blocks!
    return JSONResponse(...)

Solution 1: run it in a thread pool with run_in_threadpool

python
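from nene2.middleware import run_in_threadpool

@app.get("/notes")
async def list_notes() -> JSONResponse:
    # Run the query and fetch the rows inside the worker thread,
    # so no blocking call is left on the event loop.
    notes = await run_in_threadpool(
        lambda: session.execute(select(Note)).scalars().all()
    )
    return JSONResponse(...)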
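The difference between blocking and offloading can be observed with the standard library alone. This is a sketch under stated assumptions: time.sleep() stands in for a sync DB call, asyncio.to_thread() plays the role that run_in_threadpool plays above, and the handler names are hypothetical.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable

Handler = Callable[[str, list[str]], Awaitable[None]]


async def blocking_handler(name: str, events: list[str]) -> None:
    events.append(f"{name}:start")
    time.sleep(0.05)  # sync call: the event loop cannot switch tasks here
    events.append(f"{name}:end")


async def offloaded_handler(name: str, events: list[str]) -> None:
    events.append(f"{name}:start")
    # The sleep runs in a worker thread, so the loop stays free meanwhile.
    await asyncio.to_thread(time.sleep, 0.05)
    events.append(f"{name}:end")


async def run_pair(handler: Handler) -> list[str]:
    # Simulates two requests arriving at the same time.
    events: list[str] = []
    await asyncio.gather(handler("a", events), handler("b", events))
    return events


blocking_order = asyncio.run(run_pair(blocking_handler))
offloaded_order = asyncio.run(run_pair(offloaded_handler))
print(blocking_order)
print(offloaded_order)
```

With the blocking handler, request "b" cannot even start until "a" has finished; with the offloaded handler, both start immediately.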

Solution 2: use a def (sync) handler

If you use a sync DB driver, declare the handler with plain def instead of async def. FastAPI runs def handlers in a thread pool automatically, so the event loop stays free.

python
# ✅ def handler + sync DB = no problem
@app.get("/notes")
def list_notes() -> JSONResponse:
    notes = session.execute(select(Note)).scalars().all()
    return JSONResponse(...)

Solution 3: migrate to the SQLAlchemy async API

Longer term, consider migrating to SQLAlchemy's async API (AsyncSession).

Released under the MIT License.