API Reference¶
Complete API documentation for DBWarden's FastAPI integration.
For most applications, use the DatabaseHandle pattern instead of
get_session(). Call database_config() and use .async_session
directly in route parameters: no Annotated, Depends, or type
aliases needed. See Session Dependency.
get_session¶
Returns a FastAPI dependency that yields an AsyncSession.
Signature¶
def get_session(
database: str | None = None,
*,
dev: bool = False,
) -> Callable[[], AsyncGenerator[AsyncSession, None]]
Parameters¶
database : str | None, optional
- Database name from DBWarden config
- If None, uses the default database
- Default: None
dev : bool, keyword-only, optional
- If True, uses dev_database_url instead of database_url
- Useful for local development
- Default: False
Returns¶
Callable[[], AsyncGenerator[AsyncSession, None]]
- A dependency function that FastAPI's Depends() can consume
- The dependency yields an AsyncSession for each request
- Sessions are automatically closed after the request
Examples¶
# Default database
SessionDep = Annotated[AsyncSession, Depends(get_session())]
# Specific database
AnalyticsSessionDep = Annotated[AsyncSession, Depends(get_session("analytics"))]
# Dev mode
DevSessionDep = Annotated[AsyncSession, Depends(get_session(dev=True))]
Raises¶
ValueError: If database type is not supportedDBWardenConfigError: If config is not loaded or database not found
migration_context¶
Async context manager for running startup migration checks or migrations.
Signature¶
@asynccontextmanager
async def migration_context(
*,
mode: Literal["migrate", "check"] = "check",
database: str | None = None,
all_databases: bool = False,
dev: bool = False,
strict_translation: bool = False,
with_backup: bool = False,
backup_dir: str | None = None,
verbose: bool = False,
allow_in_production: bool = False,
fail_fast: bool = True,
only_dev: bool = False,
) -> AsyncGenerator[None, None]
Parameters¶
mode : Literal["migrate", "check"], keyword-only, optional
- "check" - Read-only validation (recommended for production)
- "migrate" - Apply pending migrations
- Default: "check"
database : str | None, keyword-only, optional
- Database name to check/migrate
- If None, uses default database
- Default: None
all_databases : bool, keyword-only, optional
- If True, check/migrate all configured databases
- Default: False
dev : bool, keyword-only, optional
- Use dev database URL
- Default: False
strict_translation : bool, keyword-only, optional
- Enable strict SQL translation mode
- Default: False
with_backup : bool, keyword-only, optional
- Create backup before migrations (migrate mode only)
- Default: False
backup_dir : str | None, keyword-only, optional
- Directory for backups
- If None, uses default location
- Default: None
verbose : bool, keyword-only, optional
- Enable detailed logging
- Default: False
allow_in_production : bool, keyword-only, optional
- Allow migrate mode in production environment
- Default: False
fail_fast : bool, keyword-only, optional
- Exit immediately on failure
- If False, logs warning but continues
- Default: True
only_dev : bool, keyword-only, optional
- Only run in development environments
- Skipped if ENVIRONMENT is production
- Default: False
Returns¶
AsyncGenerator[None, None]
- Async context manager for use in FastAPI lifespan
Examples¶
# Check mode (recommended)
@asynccontextmanager
async def lifespan(app: FastAPI):
async with migration_context(mode="check", all_databases=True):
yield
# Migrate mode (dev only)
@asynccontextmanager
async def lifespan(app: FastAPI):
async with migration_context(
mode="migrate",
only_dev=True,
with_backup=True,
):
yield
Raises¶
RuntimeError: If checks fail andfail_fast=TrueValueError: Ifmodeis invalid
check_schema_on_startup¶
Run read-only startup schema checks.
Signature¶
def check_schema_on_startup(
*,
database: str | None = None,
all_databases: bool = False,
dev: bool = False,
strict_translation: bool = False,
only_dev: bool = False,
fail_fast: bool = True,
verbose: bool = False,
) -> list[HealthResult]
Parameters¶
Same as migration_context, except no migration-specific parameters.
Returns¶
list[HealthResult]
- List of health results, one per database checked
- Each HealthResult contains:
- database: str - Database name
- status: str - "ok", "degraded", or "error"
- connected: bool - Whether connection succeeded
- pending_migrations: int - Number of unapplied migrations
- lock_active: bool - Whether migration lock is held
- error: str | None - Error message if failed
Examples¶
@asynccontextmanager
async def lifespan(app: FastAPI):
results = check_schema_on_startup(all_databases=True, fail_fast=True)
for result in results:
print(f"{result.database}: {result.status}")
yield
Raises¶
RuntimeError: If any check fails andfail_fast=True
migrate_on_startup¶
Run migration workflow at startup.
Signature¶
def migrate_on_startup(
*,
database: str | None = None,
all_databases: bool = False,
dev: bool = False,
strict_translation: bool = False,
with_backup: bool = False,
backup_dir: str | None = None,
verbose: bool = False,
allow_in_production: bool = False,
fail_fast: bool = True,
only_dev: bool = False,
) -> None
Parameters¶
Same as migration_context in migrate mode.
Returns¶
None
Examples¶
@asynccontextmanager
async def lifespan(app: FastAPI):
migrate_on_startup(
all_databases=True,
with_backup=True,
only_dev=True,
)
yield
Raises¶
RuntimeError: If migration fails andfail_fast=TrueRuntimeError: If in production andallow_in_production=False
DBWardenHealthRouter¶
Creates a FastAPI APIRouter with health, liveness, and readiness endpoints.
Signature¶
Parameters¶
auth_mode : str, optional
- "open" (default) - no authentication required
- "authenticated" - requires X-API-Key header
- Can also be set via DBWARDEN_HEALTH_AUTH env var
api_key : str | None, optional
- API key for authenticated mode
Returns¶
APIRouter
- Router with health endpoints configured
- Routes:
- GET / - Overall health for all databases
- GET /liveness - Always returns 200 (app is alive)
- GET /readiness - Returns 200 when all databases reachable, 503 otherwise
- GET /{database_name} - Health for specific database
Examples¶
from dbwarden.fastapi import DBWardenHealthRouter
app = FastAPI()
app.include_router(DBWardenHealthRouter(), prefix="/health")
# Now available:
# GET /health/ - All databases
# GET /health/liveness - Liveness probe
# GET /health/readiness - Readiness probe
# GET /health/primary - Specific database
Response Schema¶
{
"status": "ok" | "degraded" | "error",
"databases": [
{
"database": str,
"status": "ok" | "degraded" | "error",
"connected": bool,
"pending_migrations": int,
"lock_active": bool,
"error": str | None
}
]
}
Liveness response:
HTTP Status Codes¶
| Scenario | Status Code |
|---|---|
| All healthy | 200 |
| Degraded (pending migrations) | 200 |
| Database unreachable | 503 |
| Database not found | 404 (per-database route only) |
| App is alive (liveness) | 200 |
| Unauthenticated (auth mode) | 401 |
| Invalid API key (auth mode) | 403 |
DBWardenRouter¶
Creates a FastAPI APIRouter with migration status and execution endpoints.
Signature¶
Parameters¶
auth_mode : str, optional
- "open" - No authentication required
- "authenticated" - Require X-API-Key header
- Default: "open"
api_key : str | None, optional
- API key for authenticated mode
- If None, reads from DBWARDEN_MIGRATE_AUTH env var
- Default: None
Returns¶
APIRouter
- Router with status and migrate endpoints
Endpoints¶
GET /status - Returns per-database migration and seed status for all configured databases.
POST /migrate - Triggers migration execution. Accepts JSON body:
Examples¶
from dbwarden.fastapi import DBWardenRouter
app = FastAPI()
app.include_router(DBWardenRouter(), prefix="/db")
# Now available:
# GET /db/status
# POST /db/migrate
With authentication:
app.include_router(
DBWardenRouter(auth_mode="authenticated", api_key="my-secret-key"),
prefix="/db",
)
Response Schema (GET /status)¶
{
"databases": {
"primary": {
"database": "primary",
"status": "ok" | "degraded" | "error",
"connected": bool,
"pending_migrations": int,
"applied_migrations": int,
"pending_seeds": int,
"applied_seeds": int,
"lock_active": bool,
"error": str | None
}
}
}
Response Schema (POST /migrate)¶
HTTP Status Codes¶
| Scenario | Status Code |
|---|---|
| Status retrieved successfully | 200 |
| Migrate completed | 200 |
| Migrate dry-run | 200 |
| Auth failure | 403 |
| Migration error | 500 |
MetricsRouter¶
Creates a FastAPI APIRouter with a Prometheus metrics endpoint.
Signature¶
Returns¶
APIRouter
- Router with metrics endpoint
Endpoints¶
GET /metrics - Returns Prometheus text-format metrics.
Only active when prometheus_client is installed and DBWARDEN_METRICS=true is set. Returns 404 when disabled.
Examples¶
from dbwarden.fastapi import MetricsRouter
app = FastAPI()
app.include_router(MetricsRouter(), prefix="/metrics")
# Now available:
# GET /metrics
Response format¶
# HELP dbwarden_pending_migrations Number of pending migrations
# TYPE dbwarden_pending_migrations gauge
dbwarden_pending_migrations{database="primary"} 0
# HELP dbwarden_schema_version Current schema version
# TYPE dbwarden_schema_version gauge
dbwarden_schema_version{database="primary"} 5.0
MetricsMiddleware¶
ASGI middleware that refreshes pending-migration gauges on each HTTP request.
Signature¶
Usage¶
from dbwarden.fastapi import MetricsMiddleware, MetricsRouter
app = FastAPI()
app.add_middleware(MetricsMiddleware)
app.include_router(MetricsRouter(), prefix="/metrics")
The middleware also records HTTP request duration via the migration duration histogram.
dispose_engines¶
Close all cached database engines and clients. Should be called during FastAPI shutdown.
Signature¶
Examples¶
from contextlib import asynccontextmanager
from fastapi import FastAPI
from dbwarden.fastapi import dispose_engines
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
dispose_engines()
This closes async and sync session factories, connection pools, and ClickHouse clients for all configured databases.
dbwarden_lifespan¶
Async context manager that handles the full FastAPI engine lifecycle: startup schema validation (or auto-migration), readiness gate, seed application, connection pool warmup, and cleanup on shutdown.
Signature¶
async def dbwarden_lifespan(
app=None,
*,
mode: str = "check", # "check" | "migrate" | "none"
database: str | None = None,
all_databases: bool = False,
dev: bool = False,
strict_translation: bool = False,
with_backup: bool = False,
backup_dir: str | None = None,
verbose: bool = False,
allow_in_production: bool = False,
fail_fast: bool = True,
only_dev: bool = False,
readiness_gate: bool = False,
apply_seeds: bool = False,
pool_warmup: bool = False,
pool_warmup_size: int = 3,
)
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
app |
FastAPI |
None |
FastAPI application instance (optional, for router registration) |
mode |
str |
"check" |
Startup mode: "check" (read-only), "migrate" (auto-apply), "none" (skip) |
database |
str |
None |
Target a single database by name |
all_databases |
bool |
False |
Target all configured databases |
readiness_gate |
bool |
False |
Raise if any database is unreachable after startup checks |
apply_seeds |
bool |
False |
Apply pending seed data after migrations |
pool_warmup |
bool |
False |
Acquire connections before yielding to reduce cold-start latency |
pool_warmup_size |
int |
3 |
Number of connections to acquire during warmup |
The remaining parameters (dev, strict_translation, with_backup, etc.) are identical to
migration_context() and control startup check/migration behavior.
Usage¶
from contextlib import asynccontextmanager
from fastapi import FastAPI
from dbwarden.fastapi import dbwarden_lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
async with dbwarden_lifespan(
app,
mode="check",
readiness_gate=True,
pool_warmup=True,
pool_warmup_size=5,
):
yield
app = FastAPI(lifespan=lifespan)
QueryTracingMiddleware¶
ASGI middleware that emits per-request structured query tracing logs. Tracks query count, total duration, slowest query, and slow query threshold breaches.
Signature¶
Usage¶
from dbwarden.fastapi import QueryTracingMiddleware
app.add_middleware(QueryTracingMiddleware, slow_query_threshold_ms=100)
The middleware monkey-patches SQLAlchemy's Engine.connect around each request to count and time queries. On each response, it logs:
| Field | Description |
|---|---|
path |
Request path |
method |
HTTP method |
request_duration_ms |
Total request time |
query_count |
Number of database queries |
total_query_time_ms |
Cumulative query time |
slowest_query_time_ms |
Duration of the slowest query |
slow_queries |
Count of queries exceeding the threshold |
Slow queries are logged at WARNING level; normal requests at INFO.
PoolMetricsCollector¶
Collects SQLAlchemy connection pool metrics for monitoring.
Signature¶
Methods¶
register(name: str, engine) - Register an engine for metrics collection.
collect() -> dict[str, dict[str, int]] - Collect pool metrics from all registered engines.
Usage¶
from dbwarden.fastapi import PoolMetricsCollector
from sqlalchemy import create_engine
collector = PoolMetricsCollector()
engine = create_engine("postgresql://localhost/db")
collector.register("primary", engine)
metrics = collector.collect()
# {
# "primary": {
# "pool_size": 5,
# "checked_out": 2,
# "overflow": 0,
# "checked_in": 3
# }
# }
override_database¶
Async context manager that temporarily overrides a database URL for testing.
Signature¶
async def override_database(
database: str,
url: str,
*,
run_migrations: bool = False,
verbose: bool = False,
) -> AsyncGenerator[Any, None]
Parameters¶
database : str - Database name to override.
url : str - Temporary database URL.
run_migrations : bool, keyword-only, optional - Run pending migrations after override. Default: False.
verbose : bool, keyword-only, optional - Enable verbose migration output. Default: False.
Usage¶
from dbwarden.fastapi import override_database
async with override_database("primary", "sqlite+aiosqlite:///:memory:",
run_migrations=True):
# Test code here uses the overridden database
...
# Original URL is restored on exit
The original sqlalchemy_url_sync and sqlalchemy_url_async are restored when the context manager exits, even if an exception occurs.
migration_state¶
Async context manager that simulates a specific migration state for testing by inserting tracking records.
Signature¶
async def migration_state(
applied: list[str] | None = None,
database: str | None = None,
) -> AsyncGenerator[None, None]
Parameters¶
applied : list[str] | None, optional - List of version strings to mark as applied. Default: None.
database : str | None, optional - Target database name. Default: None (uses default database).
Usage¶
from dbwarden.fastapi import migration_state
async with migration_state(applied=["0001", "0002"]):
# Database appears to have migrations 0001 and 0002 applied
...
# Tracking records are cleaned up on exit
migration_lock and sync_migration_lock¶
Redis-backed distributed migration lock context managers for coordinating migrations across multiple application instances.
Async signature¶
async def migration_lock(
redis_client: Any,
key: str = "dbwarden_migrate",
ttl: int = 60,
) -> AsyncGenerator[None, None]
Sync signature¶
def sync_migration_lock(
redis_client: Any,
key: str = "dbwarden_migrate",
ttl: int = 60,
) -> Generator[None, None]
Parameters¶
redis_client : Any
- Redis client instance (any library with setnx, expire, delete methods)
key : str, optional
- Redis key for the lock
- Default: "dbwarden_migrate"
ttl : int, optional
- Lock TTL in seconds (auto-expiry)
- Default: 60
Raises¶
LockError: If the lock is already held by another process
Examples¶
import redis.asyncio as aioredis
from contextlib import asynccontextmanager
from dbwarden.fastapi import migration_context, migration_lock
redis_client = aioredis.from_url("redis://localhost:6379")
@asynccontextmanager
async def lifespan(app: FastAPI):
async with migration_lock(redis_client):
async with migration_context(mode="migrate"):
yield
import redis
from dbwarden.fastapi import migration_context, sync_migration_lock
redis_client = redis.from_url("redis://localhost:6379")
@asynccontextmanager
async def lifespan(app: FastAPI):
with sync_migration_lock(redis_client):
async with migration_context(mode="migrate"):
yield
Type Aliases¶
DatabaseHandle Pattern (Recommended)¶
Use .async_session and .sync_session directly: no type aliases needed:
from dbwarden import database_config
primary = database_config(database_name="primary", ...)
@app.get("/users")
async def list_users(session: primary.async_session):
result = await session.execute(select(User))
return result.scalars().all()
SessionDep (Alternative)¶
If you prefer the Annotated pattern, use get_session():
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from dbwarden.fastapi import get_session
SessionDep = Annotated[AsyncSession, Depends(get_session())]
Data Models¶
HealthResult¶
Returned by check_schema_on_startup:
@dataclass
class HealthResult:
database: str # Database name
status: str # "ok", "degraded", or "error"
connected: bool # Connection successful?
pending_migrations: int # Number of unapplied migrations
lock_active: bool # Migration lock held?
error: str | None # Error message if failed
DatabaseHealth¶
Pydantic model for health endpoints:
class DatabaseHealth(BaseModel):
database: str
status: str
connected: bool
pending_migrations: int
lock_active: bool
error: str | None = None
HealthResponse¶
Pydantic model for health endpoints:
Constants¶
Environment Detection¶
DBWarden detects environment from ENVIRONMENT variable:
Development environments:
- dev
- development
- local
- test
- testing
Production environments:
- prod
- production
Used by only_dev and allow_in_production parameters.
Exceptions¶
DBWardenNotInitializedError¶
Raised when DBWarden config hasn't been loaded.
DBWardenDatabaseNotFoundError¶
Raised when specified database name doesn't exist in config.