How-to: domain event patterns

This guide shows how to move the side effects that follow a UseCase (sending email, logging, notifications) out of the UseCase itself, using domain events and FastAPI's BackgroundTasks.


1. Basic pattern: run events asynchronously with BackgroundTasks

FastAPI's BackgroundTasks runs registered functions after the response has been sent, so slow work does not delay the client.

python
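from fastapi import BackgroundTasks, FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()

class CreateUserBody(BaseModel):
    # assumed shape, inferred from the body.name / body.email accesses below
    name: str
    email: str

def send_welcome_email(email: str) -> None:
    # email sending (slow)
    ...

@app.post("/users", status_code=201)
def create_user(body: CreateUserBody, background_tasks: BackgroundTasks) -> JSONResponse:
    # create_user_use_case is defined elsewhere in the application
    user = create_user_use_case(body.name, body.email)
    # queued here, executed only after the response has been sent
    background_tasks.add_task(send_welcome_email, user.email)
    return JSONResponse({"user_id": user.user_id}, status_code=201)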

2. EventBus pattern: publish domain events from the UseCase

Here the UseCase publishes events and handlers subscribe to them. The UseCase stays free of HTTP concerns: it never sees BackgroundTasks.

python
from dataclasses import dataclass
from typing import Any, Callable

# event definition
@dataclass(frozen=True, slots=True)
class UserCreatedEvent:
    user_id: int
    email: str

# EventBus
# the `type` alias statement requires Python 3.12+
type EventHandler = Callable[[Any], None]

class EventBus:
    def __init__(self) -> None:
        self._handlers: dict[type, list[EventHandler]] = {}

    def subscribe(self, event_type: type, handler: EventHandler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def publish(self, event: object) -> None:
        for handler in self._handlers.get(type(event), []):
            handler(event)

event_bus = EventBus()

# UseCase: HTTP-independent
class CreateUserUseCase:
    def __init__(self, event_bus: EventBus) -> None:
        self._event_bus = event_bus

    def execute(self, name: str, email: str) -> User:
        user = User(...)
        self._event_bus.publish(UserCreatedEvent(user.user_id, user.email))
        return user

# register handler
def on_user_created(event: UserCreatedEvent) -> None:
    send_welcome_email(event.email)

event_bus.subscribe(UserCreatedEvent, on_user_created)

3. Combining BackgroundTasks and EventBus

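Use BackgroundTasks in the HTTP handler to run the EventBus handlers in the background. The EventBus in section 2 calls handlers immediately inside publish(), so this variant needs a bus that records pending handler calls instead of running them. collected_events below stands for that recorded list; it is not defined in this guide.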

python
from fastapi import BackgroundTasks, Depends

@app.post("/users", status_code=201)
def create_user(
    body: CreateUserBody,
    background_tasks: BackgroundTasks,
    use_case: CreateUserUseCase = Depends(get_create_user_use_case),
) -> JSONResponse:
    # the UseCase publishes the event synchronously; the EventBus only queues the handler calls
    user = use_case.execute(body.name, body.email)
    # collected_events: the zero-argument handler calls queued during execute()
    # (assumed to be exposed by the EventBus; not defined in this guide)
    for handler_call in collected_events:
        background_tasks.add_task(handler_call)
    return JSONResponse({"user_id": user.user_id}, status_code=201)
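
The route above iterates over `collected_events`, which section 2's EventBus does not provide. One possible way to fill that gap is a bus that records handler calls instead of running them. The sketch below is an assumption, not this guide's implementation: `DeferredEventBus`, `drain()`, and the `log` list are hypothetical names, and a plain loop stands in for `background_tasks.add_task`.

```python
from collections.abc import Callable
from dataclasses import dataclass
from functools import partial
from typing import Any


@dataclass(frozen=True)
class UserCreatedEvent:
    user_id: int
    email: str


class DeferredEventBus:
    """Hypothetical variant: publish() records calls, drain() hands them over."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = {}
        self._pending: list[Callable[[], None]] = []

    def subscribe(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def publish(self, event: object) -> None:
        for handler in self._handlers.get(type(event), []):
            # bind the event now, run the handler later
            self._pending.append(partial(handler, event))

    def drain(self) -> list[Callable[[], None]]:
        # return the recorded calls and start a fresh pending list
        pending, self._pending = self._pending, []
        return pending


log: list[str] = []
bus = DeferredEventBus()
bus.subscribe(UserCreatedEvent, lambda e: log.append(e.email))
bus.publish(UserCreatedEvent(1, "alice@example.com"))
# nothing has run yet: log is still empty at this point

# in the route, this loop would call background_tasks.add_task(call) instead
for call in bus.drain():
    call()
```

With a bus like this, `collected_events` in the route would be the result of `drain()`. If the bus is shared across requests, its pending list mixes events from concurrent requests, so creating one bus per request (for example in the dependency that builds the UseCase) is probably the safer choice.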

4. Verifying events in tests

Tests do not need to wait for BackgroundTasks to finish: under TestClient, background tasks have already run by the time client.post() returns.

python
from fastapi.testclient import TestClient

# under TestClient, BackgroundTasks finish before client.post() returns
executed = []
event_bus.subscribe(UserCreatedEvent, lambda e: executed.append(e))

with TestClient(app) as client:
    r = client.post("/users", json={"name": "Alice", "email": "alice@example.com"})

assert len(executed) == 1  # BackgroundTasks already completed
assert executed[0].email == "alice@example.com"

Caveats

  • EventBus tends to become a module-level global. If handlers accumulate across tests, reset it with an autouse fixture.
  • When publishing events inside a UseCase, pass EventBus as a constructor argument (constructor injection) rather than referencing a global.
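
The first caveat can be sketched without any test framework. The `clear()` method and the `isolated_handlers` context manager below are hypothetical additions, not part of the EventBus shown earlier. Under pytest, the body of the context manager would sit in a fixture declared with `@pytest.fixture(autouse=True)`.

```python
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from typing import Any


class EventBus:
    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[Any], None]]] = {}

    def subscribe(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def publish(self, event: object) -> None:
        for handler in self._handlers.get(type(event), []):
            handler(event)

    def clear(self) -> None:
        """Hypothetical helper: drop every registered handler."""
        self._handlers.clear()


event_bus = EventBus()  # module-level global, as described in the caveat


@contextmanager
def isolated_handlers(bus: EventBus) -> Iterator[EventBus]:
    # start from a clean bus and leave it clean, even if the test body raises
    bus.clear()
    try:
        yield bus
    finally:
        bus.clear()


seen: list[str] = []
for _ in range(2):  # two "tests" sharing the same global bus
    with isolated_handlers(event_bus) as bus:
        bus.subscribe(str, seen.append)
        bus.publish("created")

# each round saw exactly one handler call
print(seen)
```

Without the reset, the second round would have two handlers registered for the same event type and would record the event twice.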

5. dataclass inheritance: required fields after default fields

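If a base class has a field with a default (here, a default_factory), adding a required field in a subclass raises TypeError when the subclass is defined: dataclass fields are ordered base-first, and a field without a default cannot follow one with a default. Marking the defaulted field kw_only=True (Python 3.10+) resolves it.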

python
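from dataclasses import dataclass, field
from datetime import UTC, datetime  # datetime.UTC requires Python 3.11+

# ❌ error: 'order_id' is a required field coming after 'occurred_at'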
@dataclass(frozen=True)
class DomainEvent:
    occurred_at: datetime = field(default_factory=lambda: datetime.now(UTC))

@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: str  # TypeError: non-default argument follows default argument

# ✅ resolved with kw_only=True (Python 3.10+)
@dataclass(frozen=True)
class DomainEvent:
    occurred_at: datetime = field(default_factory=lambda: datetime.now(UTC), kw_only=True)

@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: str  # OK — kw_only fields aren't subject to the MRO ordering constraint
    total_amount: int

With kw_only=True, the field becomes keyword-only in __init__. A subclass's required arguments can be defined as ordinary positional arguments, and occurred_at is treated as optional.

Released under the MIT License.