Implementar um repository SQLAlchemy
Este guia mostra como escrever um SqlAlchemyXxxRepository que usa SqlAlchemyQueryExecutor para persistência com SQL raw — sem ORM, sem mágica.
Pré-requisitos
- Um domínio já scaffolado (
entity.py,repository.py,use_case.py,handler.py) - Um app nene2 rodando usando
AppSettingscomDB_ADAPTER=sqlite(oumysql/pgsql)
1. Criar sqlalchemy_repository.py
Helper de schema
Defina ensure_schema() no topo do arquivo. Chame-o uma vez na inicialização a partir de create_app().
from nene2.database import DatabaseQueryExecutorInterface
def ensure_schema(executor: DatabaseQueryExecutorInterface) -> None:
executor.write(
"""
CREATE TABLE IF NOT EXISTS books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
author TEXT NOT NULL,
isbn TEXT NOT NULL,
published_year INTEGER NOT NULL
)
"""
)Coloque
ensure_schema()emsqlalchemy_repository.py(não em umschema.pyseparado), para que a definição do schema fique ao lado do repository que possui a tabela. Para apps multi-domínio com um arquivo de schema compartilhado, mantenha umensure_schema()por domínio e chame todos a partir decreate_app().
Helper de linha para entidade
fetch_one / fetch_all retornam dict[str, Any]. Use um método estático privado para centralizar o mapeamento e manter cada método de query enxuto.
from typing import Any
from .entity import Book
from .repository import BookRepositoryInterface
class SqlAlchemyBookRepository(BookRepositoryInterface):
def __init__(self, executor: DatabaseQueryExecutorInterface) -> None:
self._executor = executor
@staticmethod
def _to_book(row: dict[str, Any]) -> Book:
return Book(
id=row["id"],
title=row["title"],
author=row["author"],
isbn=row["isbn"],
published_year=row["published_year"],
)Use
dict[str, Any]— nãodict[str, object].fetch_one()/fetch_all()retornamdict[str, Any], entãorow["id"]éAnyque é atribuível aintsobmypy --strictsem nenhum cast. Usardict[str, object]requer# type: ignore[call-overload]e dispara errosunused-ignoresubsequentes.
Implementação completa
class SqlAlchemyBookRepository(BookRepositoryInterface):
def __init__(self, executor: DatabaseQueryExecutorInterface) -> None:
self._executor = executor
def find_all(self, limit: int, offset: int) -> list[Book]:
rows = self._executor.fetch_all(
"SELECT id, title, author, isbn, published_year FROM books "
"ORDER BY id LIMIT :limit OFFSET :offset",
{"limit": limit, "offset": offset},
)
return [self._to_book(row) for row in rows]
def count_all(self) -> int:
row = self._executor.fetch_one("SELECT COUNT(*) AS cnt FROM books")
return int(row["cnt"]) if row else 0
def find_by_id(self, book_id: int) -> Book | None:
row = self._executor.fetch_one(
"SELECT id, title, author, isbn, published_year FROM books WHERE id = :id",
{"id": book_id},
)
return self._to_book(row) if row else None
def save(self, book: Book) -> Book:
new_id = self._executor.write(
"INSERT INTO books (title, author, isbn, published_year) "
"VALUES (:title, :author, :isbn, :published_year)",
{"title": book.title, "author": book.author,
"isbn": book.isbn, "published_year": book.published_year},
)
return Book(id=new_id, title=book.title, author=book.author,
isbn=book.isbn, published_year=book.published_year)
def update(self, book: Book) -> Book:
self._executor.write(
"UPDATE books SET title=:title, author=:author, isbn=:isbn, "
"published_year=:published_year WHERE id=:id",
{"title": book.title, "author": book.author,
"isbn": book.isbn, "published_year": book.published_year, "id": book.id},
)
return book
def delete(self, book_id: int) -> None:
self._executor.write("DELETE FROM books WHERE id = :id", {"id": book_id})
@staticmethod
def _to_book(row: dict[str, Any]) -> Book:
return Book(
id=row["id"],
title=row["title"],
author=row["author"],
isbn=row["isbn"],
published_year=row["published_year"],
)2. Conectar ao create_app()
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
from nene2.config import AppSettings
from nene2.database import SqlAlchemyQueryExecutor
from bookshelf.repository import BookRepositoryInterface, InMemoryBookRepository
from bookshelf.sqlalchemy_repository import SqlAlchemyBookRepository, ensure_schema
def _build_repository(cfg: AppSettings) -> BookRepositoryInterface:
if cfg.db_adapter == "sqlite":
is_memory = cfg.db_name == ":memory:"
engine = create_engine(
cfg.db_url,
connect_args={"check_same_thread": False},
poolclass=StaticPool if is_memory else None,
)
executor = SqlAlchemyQueryExecutor(engine)
ensure_schema(executor) # cria a tabela na primeira execução
return SqlAlchemyBookRepository(executor)
return InMemoryBookRepository() # fallback para testes / desenvolvimento localEnvolva o branch if/else em uma função helper como
_build_repository()que retorna o tipo da interface. É mais limpo do que declararrepo: BookRepositoryInterfaceantes de um bloco if/else emcreate_app()— ambas as abordagens satisfazemmypy --strict, mas o helper mantémcreate_app()legível.Se preferir o branch inline, declare o tipo primeiro:
pythonrepo: BookRepositoryInterface if cfg.db_adapter == "sqlite": repo = SqlAlchemyBookRepository(executor) else: repo = InMemoryBookRepository()
StaticPoolé necessário para bancos SQLite em memória (DB_NAME=:memory:) para evitar que o SQLAlchemy abra múltiplas conexões — cada uma veria um banco vazio. SQLite baseado em arquivo e outros adaptadores não precisam disso.
3. Valor de retorno de write()
executor.write() retorna:
| Operação | Valor de retorno |
|---|---|
INSERT | lastrowid — a chave primária inteira auto-gerada da nova linha |
UPDATE / DELETE | rowcount — número de linhas afetadas (0 se nada correspondeu) |
Use lastrowid para reconstruir a entidade após INSERT:
new_id = self._executor.write("INSERT INTO ...", {...})
return Book(id=new_id, ...)Use rowcount para detectar linhas ausentes em UPDATE / DELETE:
affected = self._executor.write("UPDATE books SET ... WHERE id = :id", {"id": book_id})
if affected == 0:
raise BookNotFoundException(book_id)
lastrowidé garantido como umintpositivo para INSERTs de linha única no SQLite, MySQL e PostgreSQL. É0para INSERTs multi-linha ou quando a tabela não tem colunaAUTOINCREMENT/SERIAL— evite esses padrões se precisar do ID de volta.
4. Entidades com campos datetime
Quando sua entidade tem um campo created_at: datetime suportado por um DEFAULT CURRENT_TIMESTAMP gerado pelo banco, use parse_db_datetime() de nene2.database.
Por que é necessário
O SQLite armazena CURRENT_TIMESTAMP como uma string simples ("2026-05-20 12:34:56"), não como um objeto Python datetime. datetime.fromisoformat() faz parse da string mas retorna um datetime naive (sem timezone), então a resposta JSON vaza um timestamp ambíguo. parse_db_datetime() trata todos os três casos de forma transparente:
| Driver | Valor bruto | Após parse_db_datetime() |
|---|---|---|
| SQLite | "2026-05-20 12:34:56" (str) | datetime(…, tzinfo=UTC) |
| MySQL/PostgreSQL | objeto datetime naive | datetime(…, tzinfo=UTC) |
| MySQL/PostgreSQL | objeto datetime aware | inalterado |
Schema
"created_at DATETIME DEFAULT CURRENT_TIMESTAMP"Padrão SELECT-after-INSERT
Após write() você só recebe de volta o lastrowid, não o created_at gerado pelo DB. Faça um segundo fetch_one() para recuperar a linha completa:
from datetime import datetime
from typing import Any
from nene2.database import DatabaseQueryExecutorInterface, parse_db_datetime
from .entity import Post
def _to_post(row: dict[str, Any]) -> Post:
return Post(
id=row["id"],
title=row["title"],
body=row["body"],
created_at=parse_db_datetime(row["created_at"]),
)
class SqlAlchemyPostRepository(PostRepositoryInterface):
def save(self, title: str, body: str) -> Post:
new_id = self._executor.write(
"INSERT INTO posts (title, body) VALUES (:title, :body)",
{"title": title, "body": body},
)
row = self._executor.fetch_one(
"SELECT id, title, body, created_at FROM posts WHERE id = :id",
{"id": new_id},
)
if row is None:
raise RuntimeError(f"Row {new_id} not found after INSERT into posts")
return _to_post(row)O guard
if row is None: raise RuntimeError(...)é necessário porquefetch_one()retornadict | None. A linha realmente não pode serNonelogo após INSERT — o guard existe para satisfazer o verificador de tipos. PrefiraRuntimeErroraassert:asserté removido porpython -Oe sinalizado pela regra S101 do ruff em código não-teste.
Repository InMemory com datetime
O InMemoryXxxRepository deve gerar o timestamp em Python:
from datetime import datetime, timezone
def save(self, title: str, body: str) -> Post:
now = datetime.now(timezone.utc)
post = Post(id=self._next_id, title=title, body=body, created_at=now)
self._store[self._next_id] = post
self._next_id += 1
return postSerialização JSON
datetime.isoformat() em um datetime aware UTC produz "2026-05-20T12:34:56+00:00". Retorne-o como string no dict de resposta:
def _post_dict(post: Post) -> dict[str, object]:
return {
"id": post.id,
"title": post.title,
"body": post.body,
"created_at": post.created_at.isoformat(), # "2026-05-20T12:34:56+00:00"
}5. Recursos aninhados — validação de propriedade no DELETE
Quando um recurso é aninhado sob um pai (ex: DELETE /posts/{post_id}/comments/{comment_id}), sempre valide que o filho pertence ao pai no UseCase, não apenas no banco de dados.
Errado — ignora post_id
# handler
@router.delete("/posts/{post_id}/comments/{comment_id}", status_code=204)
async def delete_comment(post_id: int, comment_id: int) -> None:
delete_use_case.execute(DeleteCommentInput(comment_id)) # post_id não utilizado!Isso permite que DELETE /posts/1/comments/5 delete o comentário 5 mesmo quando ele pertence ao post 2.
Correto — valide propriedade no UseCase
# use_case.py
@dataclass(frozen=True, slots=True)
class DeleteCommentInput:
post_id: int
comment_id: int
class DeleteCommentUseCase:
def execute(self, input_: DeleteCommentInput) -> None:
comment = self._repository.find_by_id(input_.comment_id)
if comment is None or comment.post_id != input_.post_id:
raise CommentNotFoundException(input_.comment_id)
self._repository.delete(input_.comment_id)
# handler
@router.delete("/posts/{post_id}/comments/{comment_id}", status_code=204)
async def delete_comment(post_id: int, comment_id: int) -> None:
delete_use_case.execute(DeleteCommentInput(post_id, comment_id))O mesmo padrão se aplica a GET e PUT em recursos aninhados: sempre passe
post_idpara o UseCase e verifiquecomment.post_id == input_.post_id.
6. Use InMemoryXxxRepository nos testes
Nunca faça mock do banco de dados. Use a implementação em memória para testes unitários:
from bookshelf.repository import InMemoryBookRepository
from bookshelf.use_case import CreateBookUseCase, CreateBookInput
def test_create_book() -> None:
repo = InMemoryBookRepository()
use_case = CreateBookUseCase(repo)
book = use_case.execute(CreateBookInput(
title="Clean Code", author="Robert C. Martin",
isbn="978-0132350884", published_year=2008,
))
assert book.id == 1
assert book.title == "Clean Code"Para testes de repository SQLAlchemy, use um engine SQLite em memória:
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
from nene2.database import SqlAlchemyQueryExecutor
from bookshelf.sqlalchemy_repository import SqlAlchemyBookRepository, ensure_schema
def _make_repo() -> SqlAlchemyBookRepository:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
executor = SqlAlchemyQueryExecutor(engine)
ensure_schema(executor)
return SqlAlchemyBookRepository(executor)7. Operações de escrita múltipla atômicas com transactional()
Quando um UseCase precisa escrever em múltiplas tabelas atomicamente, use SqlAlchemyTransactionManager.transactional() junto com métodos _in_tx do repository.
Definir métodos _in_tx na interface
Adicione métodos dedicados que aceitam um parâmetro executor explícito. Esses são chamados apenas dentro de um callback transactional() — nunca fora.
from nene2.database import DatabaseQueryExecutorInterface
from abc import ABC, abstractmethod
class AccountRepositoryInterface(ABC):
@abstractmethod
def find_by_id(self, account_id: int) -> Account | None: ...
# variantes _in_tx — executor é fornecido pelo callback transactional()
@abstractmethod
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None: ...
@abstractmethod
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None: ...Implementar métodos _in_tx no repository SQLAlchemy
Os métodos _in_tx usam o executor passado em vez de self._executor, para que compartilhem a mesma conexão e participem da mesma transação.
class SqlAlchemyAccountRepository(AccountRepositoryInterface):
def __init__(self, executor: SqlAlchemyQueryExecutor) -> None:
self._executor = executor
def find_by_id(self, account_id: int) -> Account | None:
row = self._executor.fetch_one(
"SELECT id, name, balance_cents FROM accounts WHERE id = :id",
{"id": account_id},
)
return self._to_entity(row) if row else None
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None:
row = executor.fetch_one(
"SELECT id, name, balance_cents FROM accounts WHERE id = :id",
{"id": account_id},
)
return self._to_entity(row) if row else None
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None:
executor.write(
"UPDATE accounts SET balance_cents = balance_cents + :delta WHERE id = :id",
{"delta": delta_cents, "id": account_id},
)Conectar o UseCase com SqlAlchemyTransactionManager
from nene2.database import SqlAlchemyTransactionManager
engine = create_engine(cfg.db_url, connect_args={"check_same_thread": False})
transaction_manager = SqlAlchemyTransactionManager(engine)
transfer_use_case = TransferUseCase(transaction_manager, account_repo, transfer_repo)Implementar InMemory _in_tx para testes unitários
A implementação InMemory ignora o executor — as operações vão diretamente para o store em memória. InMemoryTransactionManager chama o callback imediatamente com um executor no-op.
from nene2.database import DatabaseQueryExecutorInterface, DatabaseTransactionManagerInterface
from collections.abc import Callable
class InMemoryAccountRepository(AccountRepositoryInterface):
def find_by_id_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int
) -> Account | None:
return self._accounts.get(account_id)
def update_balance_in_tx(
self, executor: DatabaseQueryExecutorInterface, account_id: int, delta_cents: int
) -> None:
account = self._accounts[account_id]
self._accounts[account_id] = Account(
id=account.id, name=account.name, balance_cents=account.balance_cents + delta_cents
)
class _NoOpExecutor(DatabaseQueryExecutorInterface):
def fetch_all(self, sql: str, params: dict[str, object] | None = None) -> list[dict[str, object]]:
return []
def fetch_one(self, sql: str, params: dict[str, object] | None = None) -> dict[str, object] | None:
return None
def write(self, sql: str, params: dict[str, object] | None = None) -> int:
return 0
class InMemoryTransactionManager(DatabaseTransactionManagerInterface):
def transactional[T](self, callback: Callable[[DatabaseQueryExecutorInterface], T]) -> T:
return callback(_NoOpExecutor())
def begin(self) -> None: pass
def commit(self) -> None: pass
def rollback(self) -> None: passRollback em exceção:
SqlAlchemyTransactionManager.transactional()usaengine.begin()— qualquer exceção dentro do callback dispara um rollback automático. Exceções de domínio (AccountNotFoundException, etc.) propagam normalmente após o rollback.
6. Usando MySQL 8
Pacotes necessários
O MySQL 8 usa autenticação caching_sha2_password por padrão. Instale ambos pymysql e cryptography — sem cryptography a conexão falha com Authentication plugin 'caching_sha2_password' is not supported.
uv add pymysql cryptographyURL de conexão
url = f"mysql+pymysql://{user}:{password}@{host}:{port}/{name}"
engine = create_engine(url, pool_pre_ping=True)pool_pre_ping=True é recomendado para MySQL — testa a conexão antes do uso para lidar com conexões inativas após o wait_timeout do servidor.
Health check
DatabaseHealthCheck recebe um SqlAlchemyQueryExecutor, não o engine diretamente:
from nene2.database import DatabaseHealthCheck, SqlAlchemyQueryExecutor
executor = SqlAlchemyQueryExecutor(engine)
health = DatabaseHealthCheck(executor) # ← executor, não engine
app.add_api_route("/health", health.check, methods=["GET"])Diferença de tipo de CURRENT_TIMESTAMP
O SQLite retorna CURRENT_TIMESTAMP como str; o MySQL retorna um objeto datetime naive. Use parse_db_datetime() em _to_entity() para tratar ambos de forma transparente:
from nene2.database import parse_db_datetime
def _to_entity(row: dict[str, Any]) -> Product:
return Product(
id=int(row["id"]),
created_at=parse_db_datetime(row["created_at"]), # funciona para SQLite e MySQL
)Exemplo Docker Compose
services:
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: mydb
MYSQL_USER: appuser
MYSQL_PASSWORD: apppass
ports:
- "3310:3306"
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-uappuser", "-papppass"]
interval: 5s
timeout: 5s
retries: 10
app:
build: .
depends_on:
mysql:
condition: service_healthy