SQLAlchemy Integration
CIUF is compatible with SQLAlchemy 2.x. The connection string CIUF accepts is a standard SQLAlchemy connection URL, and write events can be hooked into the SQLAlchemy session lifecycle.
Connection
Pass any SQLAlchemy-compatible connection string to Engine:
from ciuf import Engine
# PostgreSQL
engine = Engine("postgresql://user:password@localhost:5432/mydb")
# PostgreSQL (psycopg3)
engine = Engine("postgresql+psycopg://user:password@localhost:5432/mydb")
# SQLite (development/testing)
engine = Engine("sqlite:///./local.db")
CIUF calls sqlalchemy.create_engine() internally and uses inspect() to discover the schema on startup.
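As a rough illustration of what schema discovery with `inspect()` looks like in plain SQLAlchemy, the sketch below builds a table-to-columns map for an in-memory SQLite database. It does not show CIUF's actual startup code; the `orders` table and the `schema` variable are assumptions made for the example.

```python
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.pool import StaticPool

# StaticPool keeps a single connection so the in-memory database persists.
sa_engine = create_engine("sqlite:///:memory:", poolclass=StaticPool)

# Create an example table (hypothetical schema, for illustration only).
with sa_engine.begin() as conn:
    conn.execute(
        text(
            "CREATE TABLE orders ("
            "id INTEGER PRIMARY KEY, amount REAL, customer_id INTEGER)"
        )
    )

# Reflect table and column names through the runtime inspection API.
inspector = inspect(sa_engine)
schema = {
    table: [column["name"] for column in inspector.get_columns(table)]
    for table in inspector.get_table_names()
}
print(schema)
```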
Session event hooks
Hook CIUF write events into your SQLAlchemy ORM session so that every flush automatically updates the cache:
from sqlalchemy import event
from sqlalchemy.orm import Session
from ciuf import Engine
ciuf_engine = Engine("postgresql://user:pass@localhost/mydb")
def _row_to_dict(obj):
    # Map each column attribute of the ORM object to its current value
    return {
        c.key: getattr(obj, c.key)
        for c in obj.__mapper__.column_attrs
    }

@event.listens_for(Session, "after_flush")
def sync_ciuf_on_flush(session, flush_context):
    # session.new, session.dirty and session.deleted still hold the
    # pre-flush state when after_flush fires
    for obj in session.new:
        ciuf_engine.on_insert(obj.__tablename__, _row_to_dict(obj))
    for obj in session.dirty:
        ciuf_engine.on_update(obj.__tablename__, new=_row_to_dict(obj))
    for obj in session.deleted:
        ciuf_engine.on_delete(obj.__tablename__, _row_to_dict(obj))
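After this hook is registered, every ORM write that the session flushes, whether through an explicit Session.flush() or the automatic flush in Session.commit(), propagates through the CIUF DAG. The hook runs at flush time, before the transaction commits, so a later rollback is not reported to CIUF by this listener alone.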
Scoped sessions
If you use scoped_session, register the event on the session factory:
from sqlalchemy import create_engine, event
from sqlalchemy.orm import scoped_session, sessionmaker

# Your application's own SQLAlchemy engine
sa_engine = create_engine("postgresql://user:pass@localhost/mydb")
SessionFactory = scoped_session(sessionmaker(bind=sa_engine))

@event.listens_for(SessionFactory, "after_flush")
def sync_ciuf(session, flush_context):
    for obj in session.new:
        ciuf_engine.on_insert(obj.__tablename__, _row_to_dict(obj))
    for obj in session.dirty:
        ciuf_engine.on_update(obj.__tablename__, new=_row_to_dict(obj))
    for obj in session.deleted:
        ciuf_engine.on_delete(obj.__tablename__, _row_to_dict(obj))
FastAPI + SQLAlchemy
A typical FastAPI setup with a per-request session and CIUF:
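from fastapi import FastAPI, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from ciuf import Engine, from_sql

app = FastAPI()
ciuf_engine = Engine("postgresql://user:pass@localhost/mydb")

# Session factory for the per-request ORM session
sa_engine = create_engine("postgresql://user:pass@localhost/mydb")
SessionLocal = sessionmaker(bind=sa_engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/orders/{customer_id}")
def get_orders(customer_id: int, db: Session = Depends(get_db)):
    # customer_id is validated as an int by FastAPI before it is interpolated;
    # never interpolate unvalidated strings into SQL.
    result = from_sql(
        ciuf_engine,
        "SELECT orders.id, orders.amount, products.name "
        "FROM orders JOIN products ON orders.product_id = products.id "
        f"WHERE orders.customer_id = {customer_id}"
    )
    return result.query().to_dict(orient="records")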
Using the Core API (without ORM)
For SQLAlchemy Core (no ORM models), there are no session events to hook into, so notify CIUF manually after each write:
from sqlalchemy import text

with sa_engine.connect() as conn:
    conn.execute(text("INSERT INTO orders (id, amount, customer_id) VALUES (1, 99.0, 42)"))
    conn.commit()

# Manually notify CIUF
ciuf_engine.on_insert("orders", {"id": 1, "amount": 99.0, "customer_id": 42})
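One way to keep the write and the notification together is a small helper that inserts with bound parameters and notifies only after the transaction has committed. This is a minimal sketch, not part of CIUF: `insert_and_notify` and `RecordingNotifier` are hypothetical names, and the notifier stands in for a `ciuf.Engine` instance.

```python
from sqlalchemy import (
    Column, Float, Integer, MetaData, Table, create_engine, insert, select,
)
from sqlalchemy.pool import StaticPool

metadata = MetaData()
orders = Table(
    "orders",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("amount", Float),
    Column("customer_id", Integer),
)


class RecordingNotifier:
    """Hypothetical stand-in for ciuf.Engine; it only records calls."""

    def __init__(self):
        self.calls = []

    def on_insert(self, table, row):
        self.calls.append((table, row))


def insert_and_notify(sa_engine, notifier, table, row):
    # engine.begin() commits on success and rolls back if an error is raised.
    with sa_engine.begin() as conn:
        conn.execute(insert(table).values(**row))
    # Reached only when the transaction committed without raising.
    notifier.on_insert(table.name, row)


sa_engine = create_engine("sqlite:///:memory:", poolclass=StaticPool)
metadata.create_all(sa_engine)
notifier = RecordingNotifier()

insert_and_notify(
    sa_engine, notifier, orders, {"id": 1, "amount": 99.0, "customer_id": 42}
)

# Read the row back to confirm the database and the notification agree.
with sa_engine.connect() as conn:
    stored = [dict(row) for row in conn.execute(select(orders)).mappings()]
print(stored, notifier.calls)
```

Because the notification sits after the `with` block, a failed insert raises before the notifier is called, so the recorded calls never run ahead of the database.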
Connection string reference
| Database | Connection string format |
|---|---|
| PostgreSQL | postgresql://user:pass@host:5432/db |
| PostgreSQL + psycopg3 | postgresql+psycopg://user:pass@host:5432/db |
| PostgreSQL + asyncpg | postgresql+asyncpg://user:pass@host:5432/db |
| SQLite | sqlite:///path/to/file.db |
| SQLite (in-memory) | sqlite:///:memory: |
| MySQL | mysql+pymysql://user:pass@host:3306/db |
See the SQLAlchemy engine configuration documentation for the full reference.
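To check how a connection string from the table will be interpreted before handing it to `Engine`, SQLAlchemy's `make_url` can parse it without opening a connection. The `describe` helper below is a hypothetical convenience written for this example.

```python
from sqlalchemy.engine import make_url


def describe(connection_string):
    """Parse a connection URL and return its main components."""
    url = make_url(connection_string)
    return {
        "backend": url.get_backend_name(),   # part before the "+"
        "drivername": url.drivername,        # full dialect+driver name
        "host": url.host,
        "port": url.port,
        "database": url.database,
        # Safe to log: the password is masked.
        "safe": url.render_as_string(hide_password=True),
    }


pg = describe("postgresql+psycopg://user:pass@host:5432/db")
lite = describe("sqlite:///path/to/file.db")
print(pg)
print(lite)
```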