Skip to content

API Reference

ciuf.Engine

The main entry point. Connects to a database, discovers the schema, and manages the DAG cache.

from ciuf import Engine

engine = Engine("postgresql://user:pass@localhost/mydb")

Constructor

Engine(
    db_connstring: str,
    max_memory_mb: float | None = None,
    ttl_seconds: float | None = None,
)
Parameter Type Default Description
db_connstring str SQLAlchemy connection string
max_memory_mb float \| None None LRU eviction threshold in MB. When total cache exceeds this limit, least-recently-used tables are evicted.
ttl_seconds float \| None None TTL per cached result. Tables not accessed within this window are evicted on the next query.

On construction, CIUF connects to the database, inspects all tables, and creates a TableNode for each one. No data is loaded until the first query.

Write event methods

on_insert(table, row)

Notify CIUF that a row was inserted. Propagates the delta incrementally through the DAG.

engine.on_insert("orders", {
    "id": 12345,
    "amount": 99.0,
    "customer_id": 42,
    "status": "completed",
})
Parameter Type Description
table str Table name (must match a table discovered at init)
row dict Column → value dict for the inserted row

on_update(table, new, old=None)

Notify CIUF that a row was updated. The old state is looked up from the cache automatically.

engine.on_update("customers",
    new={"id": 42, "plan": "pro", "name": "Acme Corp"},
    old={"id": 42, "plan": "free", "name": "Acme Corp"},  # optional, for call-site clarity
)
Parameter Type Description
table str Table name
new dict Column → value dict for the updated row (new state)
old dict \| None Unused; accepted for call-site documentation clarity

on_delete(table, row)

Notify CIUF that a row was deleted.

engine.on_delete("orders", {"id": 12345})
Parameter Type Description
table str Table name
row dict Dict containing at least the primary key column(s) of the deleted row

Properties

engine.tables

Returns a dict of {table_name: TableNode} for all tables discovered at init.

print(engine.tables.keys())  # dict_keys(['orders', 'customers', 'products'])

engine.total_memory_mb()

Returns the current total memory used by all cached table data, in MB.

engine.dispose()

Releases database connections and clears internal state. Call when shutting down.


ciuf.from_sql

Parse a SQL query and build a CIUF DAG automatically. Returns a QueryResult.

from ciuf import Engine, from_sql

engine = Engine("postgresql://user:pass@localhost/mydb")
result = from_sql(engine, """
    SELECT orders.id, orders.amount, customers.name
    FROM orders
    JOIN customers ON orders.customer_id = customers.id
    WHERE customers.plan = 'pro'
""")

df = result.query()  # pandas DataFrame

Signature

from_sql(
    engine: Engine,
    sql: str,
    dialect: str = "postgres",
) -> QueryResult
Parameter Type Default Description
engine Engine CIUF Engine connected to the database
sql str SQL SELECT statement
dialect str "postgres" SQL dialect for parsing. Supported: postgres, sqlite, mysql, oracle, mssql

Supported SQL constructs

Construct Supported
SELECT col1, col2, ...
SELECT *
Column aliases (AS)
FROM table
INNER JOIN / LEFT JOIN
WHERE with =, !=, <, >, <=, >=
WHERE with IN, BETWEEN, LIKE
WHERE with AND / OR
GROUP BY with COUNT, SUM, AVG, MIN, MAX
Subqueries ❌ (v1 scope)
CTEs (WITH ...) ❌ (v1 scope)
Window functions ❌ (v1 scope)
UNION / INTERSECT ❌ (v1 scope)

ciuf.QueryResult

Returned by from_sql. Wraps a DAG node and exposes a .query() method.

result = from_sql(engine, "SELECT ...")
df = result.query()   # returns pandas DataFrame
node = result.node    # the underlying DAG node (SelectNode or JoinNode)

Methods

result.query() → pd.DataFrame

Returns the cached result as a pandas DataFrame. The first call builds the cache from the database; subsequent calls return from memory.

result.node

The underlying DAG node (SelectNode, JoinNode, etc.). Used for advanced inspection.


Low-level node API

For advanced use cases, you can build the DAG manually:

from ciuf import Engine, FilterNode, FilterType, Condition, Operator, DialectsEnum, SelectNode

engine = Engine("postgresql://...")
table = engine.tables["orders"]

filter_node = FilterNode(
    table=table,
    lib_engine=engine,
    filter_type=FilterType.CONDITION,
    condition=Condition(
        column="status",
        operator=Operator.EQUAL,
        values=["completed"],
        dialect=DialectsEnum.POSTGRESQL,
    ),
)
select_node = SelectNode(
    parent=table,
    query_filter=filter_node,
    columns=["id", "amount", "customer_id"],
)

df = select_node.query()

See Architecture for the full DAG model and node types.