SQLAlchemy Integration

CIUF is compatible with SQLAlchemy 2.x. It accepts any standard SQLAlchemy connection URL, and CIUF write events can be hooked into the SQLAlchemy session lifecycle.

Connection

Pass any SQLAlchemy-compatible connection string to Engine:

from ciuf import Engine

# PostgreSQL
engine = Engine("postgresql://user:password@localhost:5432/mydb")

# PostgreSQL (psycopg3)
engine = Engine("postgresql+psycopg://user:password@localhost:5432/mydb")

# SQLite (development/testing)
engine = Engine("sqlite:///./local.db")

CIUF calls sqlalchemy.create_engine() internally and uses inspect() to discover the schema on startup.
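For reference, the schema discovery CIUF performs on startup can be reproduced directly with SQLAlchemy alone. The sketch below (using a throwaway in-memory SQLite database and a made-up orders table) shows the create_engine() plus inspect() pattern:

```python
from sqlalchemy import create_engine, inspect, text

# Throwaway in-memory database standing in for your real one.
engine = create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)"))

# inspect() exposes the reflected schema: table names, columns, types, etc.
inspector = inspect(engine)
tables = inspector.get_table_names()
columns = [c["name"] for c in inspector.get_columns("orders")]
print(tables, columns)  # ['orders'] ['id', 'amount']
```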

Session event hooks

Hook CIUF write events into your SQLAlchemy ORM session so that every flush automatically updates the cache:

from sqlalchemy import event
from sqlalchemy.orm import Session
from ciuf import Engine

ciuf_engine = Engine("postgresql://user:pass@localhost/mydb")

def _row_to_dict(obj):
    return {
        c.key: getattr(obj, c.key)
        for c in obj.__mapper__.column_attrs
    }

@event.listens_for(Session, "after_flush")
def sync_ciuf_on_flush(session, flush_context):
    for obj in session.new:
        ciuf_engine.on_insert(obj.__tablename__, _row_to_dict(obj))
    for obj in session.dirty:
        # session.dirty can contain objects with no net changes;
        # is_modified() filters those out.
        if session.is_modified(obj, include_collections=False):
            ciuf_engine.on_update(obj.__tablename__, new=_row_to_dict(obj))
    for obj in session.deleted:
        ciuf_engine.on_delete(obj.__tablename__, _row_to_dict(obj))

After this hook is registered, every ORM write that goes through a Session.flush() or Session.commit() automatically propagates through the CIUF DAG.

Scoped sessions

If you use scoped_session, register the event on the session factory:

from sqlalchemy.orm import scoped_session, sessionmaker

SessionFactory = scoped_session(sessionmaker(bind=sa_engine))

@event.listens_for(SessionFactory, "after_flush")
def sync_ciuf(session, flush_context):
    for obj in session.new:
        ciuf_engine.on_insert(obj.__tablename__, _row_to_dict(obj))
    for obj in session.dirty:
        if session.is_modified(obj, include_collections=False):
            ciuf_engine.on_update(obj.__tablename__, new=_row_to_dict(obj))
    for obj in session.deleted:
        ciuf_engine.on_delete(obj.__tablename__, _row_to_dict(obj))

FastAPI + SQLAlchemy

A typical FastAPI setup with a per-request session and CIUF:

from fastapi import FastAPI, Depends
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from ciuf import Engine, from_sql

app = FastAPI()
ciuf_engine = Engine("postgresql://user:pass@localhost/mydb")

sa_engine = create_engine("postgresql://user:pass@localhost/mydb")
SessionLocal = sessionmaker(bind=sa_engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/orders/{customer_id}")
def get_orders(customer_id: int, db: Session = Depends(get_db)):
    # FastAPI validates customer_id as an int via the path type annotation,
    # so the interpolation below cannot inject SQL. Never interpolate
    # unvalidated strings into a query.
    result = from_sql(
        ciuf_engine,
        f"SELECT orders.id, orders.amount, products.name "
        f"FROM orders JOIN products ON orders.product_id = products.id "
        f"WHERE orders.customer_id = {customer_id}"
    )
    return result.query().to_dict(orient="records")

Using the Core API (without ORM)

For SQLAlchemy Core (no ORM models), there is no flush event to hook, so call the write-event methods manually after each statement:

from sqlalchemy import text

with sa_engine.connect() as conn:
    conn.execute(text("INSERT INTO orders (id, amount, customer_id) VALUES (1, 99.0, 42)"))
    conn.commit()
    # Manually notify CIUF
    ciuf_engine.on_insert("orders", {"id": 1, "amount": 99.0, "customer_id": 42})
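One way to keep the manual notification from drifting out of sync with the statement is to build the row dict once and use it for both the INSERT and the event. A runnable sketch with an in-memory SQLite database and a stub recorder standing in for ciuf.Engine (CIUF itself is not imported here):

```python
from sqlalchemy import create_engine, text

class StubCiufEngine:
    """Records write events; stands in for ciuf.Engine in this sketch."""
    def __init__(self):
        self.events = []

    def on_insert(self, table, row):
        self.events.append(("insert", table, row))

ciuf_engine = StubCiufEngine()
sa_engine = create_engine("sqlite:///:memory:")

def insert_and_notify(conn, table, row):
    # One row dict drives both the bound-parameter INSERT and the CIUF event.
    cols = ", ".join(row)
    params = ", ".join(f":{k}" for k in row)
    conn.execute(text(f"INSERT INTO {table} ({cols}) VALUES ({params})"), row)
    ciuf_engine.on_insert(table, row)

with sa_engine.begin() as conn:
    conn.execute(text("CREATE TABLE orders (id INTEGER, amount REAL, customer_id INTEGER)"))
    insert_and_notify(conn, "orders", {"id": 1, "amount": 99.0, "customer_id": 42})

print(ciuf_engine.events)
```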

Connection string reference

Database                 Connection string format
PostgreSQL               postgresql://user:pass@host:5432/db
PostgreSQL + psycopg3    postgresql+psycopg://user:pass@host:5432/db
PostgreSQL + asyncpg     postgresql+asyncpg://user:pass@host:5432/db
SQLite                   sqlite:///path/to/file.db
SQLite (in-memory)       sqlite:///:memory:
MySQL                    mysql+pymysql://user:pass@host:3306/db

See SQLAlchemy engine configuration for full reference.
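When credentials contain special characters, building the URL programmatically with SQLAlchemy's URL.create() avoids hand-escaping. A small sketch with made-up credentials:

```python
from sqlalchemy.engine import URL

# URL.create() percent-encodes special characters in the password for you.
url = URL.create(
    "postgresql+psycopg",
    username="user",
    password="p@ssword",  # "@" would break a hand-written URL
    host="localhost",
    port=5432,
    database="mydb",
)
print(url.render_as_string(hide_password=False))
# postgresql+psycopg://user:p%40ssword@localhost:5432/mydb
```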