SQLAlchemy

BemiHQ/bemi-sqlalchemy

Bemi plugs into SQLAlchemy and PostgreSQL to track database changes automatically. It unlocks robust context-aware audit trails and time travel querying inside your application.

This Python package is a recommended SQLAlchemy integration, enabling you to pass application-specific context when performing database changes. This can include context such as the 'where' (API endpoint, worker, etc.), 'who' (user, cron job, etc.), and 'how' behind a change, thereby enriching the information captured by Bemi.

See this example repo for a SQLAlchemy and FastAPI Todo app that automatically tracks and contextualizes all changes.

Prerequisites

  • PostgreSQL 14+
  • SQLAlchemy

Installation

  1. Install the Python package
pip install bemi-sqlalchemy
  2. Generate an Alembic migration file that adds lightweight PostgreSQL triggers, which pass application context along with all data changes into the PostgreSQL replication log
alembic revision -m "Init Bemi"

Then add the following code to the generated migration file:

alembic/versions/a925526dcc3b_init_bemi.py
...
from bemi import Bemi

def upgrade() -> None:
    Bemi.migration_upgrade()

def downgrade() -> None:
    Bemi.migration_downgrade()
  3. Run the Alembic migration
alembic upgrade head

Usage

Now you can easily specify custom application context that will be automatically passed with all data changes:

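from bemi import Bemi
from sqlalchemy import create_engine, text

# DATABASE_URL is your PostgreSQL connection string
engine = create_engine(DATABASE_URL)

Bemi.set_context({"process": "MyWorker"})

# engine.begin() commits on success; raw SQL strings are wrapped in text()
with engine.begin() as connection:
    connection.execute(text("INSERT INTO ..."))
    connection.execute(text("UPDATE ..."))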

Application context:

  • Is bound to the current execution thread within an HTTP request.
  • Is used only with INSERT, UPDATE, DELETE SQL queries performed via SQLAlchemy. Otherwise, it is a no-op.
  • Is passed directly into the PostgreSQL Write-Ahead Log (WAL) together with data changes, without affecting the structure of the database or the SQL queries.

Application context will automatically include the original SQL query that performed data changes, which is generally useful for troubleshooting purposes.
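
To illustrate what "bound to the current execution thread" means in practice, here is a minimal sketch using Python's threading.local. It is not Bemi's implementation; the set_context and get_context functions below are hypothetical stand-ins that only demonstrate the general behavior: context set in one thread is not visible from another.

```python
import threading
from typing import Optional

# Hypothetical thread-bound context store; NOT Bemi's actual internals.
_local = threading.local()


def set_context(context: dict) -> None:
    """Store context for the current thread only."""
    _local.context = context


def get_context() -> Optional[dict]:
    """Return the current thread's context, or None if none was set."""
    return getattr(_local, "context", None)


def context_seen_in_other_thread() -> Optional[dict]:
    """Read the context from a freshly started thread."""
    seen = []
    worker = threading.Thread(target=lambda: seen.append(get_context()))
    worker.start()
    worker.join()
    return seen[0]


set_context({"process": "MyWorker"})
print(get_context())
print(context_seen_in_other_thread())
```

Under this model, a background worker thread would need to set its own context before writing to the database.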

FastAPI

Add a middleware to your FastAPI app to automatically pass application context with all tracked database changes made within an HTTP request:

from bemi import BemiFastAPIMiddleware
from fastapi import FastAPI

app = FastAPI()

app.add_middleware(
    BemiFastAPIMiddleware,
    # current_user is your application's own helper for identifying the caller
    set_context=lambda request: {
        "user_id": current_user(request),
        "endpoint": request.url.path,
        "method": request.method,
    },
)
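
The current_user function in the snippet above is not provided by the package; it stands for whatever your application uses to identify the caller. One possible sketch, assuming an upstream authentication layer has already stored the ID on request.state.user_id (a hypothetical attribute name chosen for this example):

```python
from typing import Any, Optional


def current_user(request: Any) -> Optional[Any]:
    """Hypothetical helper: return the caller's ID, or None if unauthenticated.

    Assumes your own auth layer set request.state.user_id earlier in the
    request lifecycle. Neither the attribute name nor this helper comes
    from Bemi or FastAPI.
    """
    state = getattr(request, "state", None)
    return getattr(state, "user_id", None)
```

Returning None for unauthenticated requests keeps the context dictionary well-formed even when no user is known.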

Database connection

Connect your PostgreSQL source database on bemi.io to start ingesting and storing all data changes stitched together with application-specific context. The database connection details can be securely configured through the dashboard UI in a few seconds.

Once your destination PostgreSQL database has been fully provisioned, you'll see a "Connected" status. You can now test the connection after making database changes in your connected source database:

psql postgres://[USERNAME]:[PASSWORD]@[HOSTNAME]:5432/[DATABASE] -c 'SELECT "primary_key", "table", "operation", "before", "after", "context", "committed_at" FROM changes;'

 primary_key | table | operation |                       before                       |                        after                        |                                          context                                          |      committed_at
-------------+-------+-----------+----------------------------------------------------+-----------------------------------------------------+-------------------------------------------------------------------------------------------+------------------------
          26 | todo  | CREATE    | {}                                                 | {"id": 26, "task": "Sleep", "is_completed": false}  | {"user_id": 187234, "endpoint": "/todo", "method": "POST", "SQL": "INSERT INTO ..."}      | 2023-12-11 17:09:09+00
          27 | todo  | CREATE    | {}                                                 | {"id": 27, "task": "Eat", "is_completed": false}    | {"user_id": 187234, "endpoint": "/todo", "method": "POST", "SQL": "INSERT INTO ..."}      | 2023-12-11 17:09:11+00
          28 | todo  | CREATE    | {}                                                 | {"id": 28, "task": "Repeat", "is_completed": false} | {"user_id": 187234, "endpoint": "/todo", "method": "POST", "SQL": "INSERT INTO ..."}      | 2023-12-11 17:09:13+00
          26 | todo  | UPDATE    | {"id": 26, "task": "Sleep", "is_completed": false} | {"id": 26, "task": "Sleep", "is_completed": true}   | {"user_id": 187234, "endpoint": "/todo/complete", "method": "PUT", "SQL": "UPDATE ..."}   | 2023-12-11 17:09:15+00
          27 | todo  | DELETE    | {"id": 27, "task": "Eat", "is_completed": false}   | {}                                                  | {"user_id": 187234, "endpoint": "/todo/27", "method": "DELETE", "SQL": "DELETE FROM ..."} | 2023-12-11 17:09:18+00

See Destination Database for more details.
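
The before and after columns make it possible to reconstruct a record's state at a past point in time. The following sketch replays change rows in memory to do that. It assumes the rows have already been fetched into Python dictionaries keyed by the column names from the sample output; the record_as_of function and the in-memory data layout are illustrative and not part of the Bemi package.

```python
from datetime import datetime, timezone
from typing import Optional


def record_as_of(changes: list, table: str, primary_key: int,
                 as_of: datetime) -> Optional[dict]:
    """Return the record's state at `as_of`, or None if it did not exist then."""
    relevant = [
        c for c in changes
        if c["table"] == table
        and c["primary_key"] == primary_key
        and c["committed_at"] <= as_of
    ]
    state = None
    # Replay changes oldest-first; the last one committed before `as_of` wins.
    for change in sorted(relevant, key=lambda c: c["committed_at"]):
        state = None if change["operation"] == "DELETE" else change["after"]
    return state


def ts(second: int) -> datetime:
    """Timestamp on 2023-12-11 at 17:09:<second> UTC, as in the sample output."""
    return datetime(2023, 12, 11, 17, 9, second, tzinfo=timezone.utc)


# A subset of the sample rows, written as plain dictionaries (assumed layout).
sleep = {"id": 26, "task": "Sleep", "is_completed": False}
eat = {"id": 27, "task": "Eat", "is_completed": False}
changes = [
    {"primary_key": 26, "table": "todo", "operation": "CREATE",
     "before": {}, "after": sleep, "committed_at": ts(9)},
    {"primary_key": 27, "table": "todo", "operation": "CREATE",
     "before": {}, "after": eat, "committed_at": ts(11)},
    {"primary_key": 26, "table": "todo", "operation": "UPDATE",
     "before": sleep, "after": {**sleep, "is_completed": True},
     "committed_at": ts(15)},
    {"primary_key": 27, "table": "todo", "operation": "DELETE",
     "before": eat, "after": {}, "committed_at": ts(18)},
]

print(record_as_of(changes, "todo", 26, ts(10)))
print(record_as_of(changes, "todo", 26, ts(16)))
print(record_as_of(changes, "todo", 27, ts(20)))
```

For large histories, the same filtering and ordering would more sensibly be done in SQL against the changes table rather than in application memory.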

License

Distributed under the terms of the LGPL-3.0. If you need to modify and distribute the code, please release it to contribute back to the open-source community.