API Reference
ciuf.Engine
The main entry point. Connects to a database, discovers the schema, and manages the DAG cache.
Constructor
| Parameter | Type | Default | Description |
|---|---|---|---|
| db_connstring | str | required | SQLAlchemy connection string |
| max_memory_mb | float \| None | None | LRU eviction threshold in MB. When total cache exceeds this limit, least-recently-used tables are evicted. |
| ttl_seconds | float \| None | None | TTL per cached result. Tables not accessed within this window are evicted on the next query. |
On construction, CIUF connects to the database, inspects all tables, and creates a TableNode for each one. No data is loaded until the first query.
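The two eviction parameters in the constructor table can be pictured with a small toy model. This is only a sketch of LRU and TTL eviction in general, not CIUF's actual cache: `ToyTableCache`, `touch`, and the injectable `clock` are hypothetical names introduced for illustration, and table sizes are supplied by hand.

```python
import time
from collections import OrderedDict


class ToyTableCache:
    """Illustrative stand-in only; not CIUF's actual cache."""

    def __init__(self, max_memory_mb=None, ttl_seconds=None, clock=time.monotonic):
        self.max_memory_mb = max_memory_mb
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        # table name -> (size in MB, last access time); order tracks recency
        self._entries = OrderedDict()

    def total_memory_mb(self):
        return sum(size for size, _ in self._entries.values())

    def touch(self, table, size_mb):
        """Record an access to `table`, then apply both eviction rules."""
        now = self.clock()
        self._entries[table] = (size_mb, now)
        self._entries.move_to_end(table)  # most recently used goes last
        self._evict(now)

    def _evict(self, now):
        # TTL rule: drop tables not accessed within the window.
        if self.ttl_seconds is not None:
            stale = [t for t, (_, seen) in self._entries.items()
                     if now - seen > self.ttl_seconds]
            for table in stale:
                del self._entries[table]
        # LRU rule: drop least-recently-used tables until under the limit.
        if self.max_memory_mb is not None:
            while self._entries and self.total_memory_mb() > self.max_memory_mb:
                self._entries.popitem(last=False)

    def cached_tables(self):
        return list(self._entries)


cache = ToyTableCache(max_memory_mb=100)
cache.touch("orders", 60)
cache.touch("customers", 30)
cache.touch("orders", 60)      # orders is now the most recently used
cache.touch("invoices", 40)    # total is 130 MB > 100, so customers is evicted
print(cache.cached_tables())   # ['orders', 'invoices']
```

In this toy model, eviction runs only when a table is accessed, which mirrors the "evicted on the next query" wording for ttl_seconds. How CIUF measures table size and when exactly it checks the limits are not specified here.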
Write event methods
on_insert(table, row)
Notify CIUF that a row was inserted. Propagates the delta incrementally through the DAG.
engine.on_insert("orders", {
    "id": 12345,
    "amount": 99.0,
    "customer_id": 42,
    "status": "completed",
})
| Parameter | Type | Description |
|---|---|---|
| table | str | Table name (must match a table discovered at init) |
| row | dict | Column → value dict for the inserted row |
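To make "propagates the delta incrementally" concrete, here is a minimal sketch of the general idea: an inserted row is tested against a filter and, if it passes, forwarded to a downstream aggregate, without rescanning the rows already cached. `ToyFilteredView` and `ToySum` are hypothetical classes written for this illustration; they are not CIUF node types and say nothing about how CIUF implements propagation internally.

```python
class ToyFilteredView:
    """Hypothetical cached result of a filtered SELECT over one table."""

    def __init__(self, predicate):
        self.predicate = predicate
        self.rows = []          # cached result, kept up to date by deltas

    def on_insert(self, row):
        # Only the new row is examined; existing cached rows are untouched.
        if self.predicate(row):
            self.rows.append(row)
            return [row]        # delta passed on to downstream nodes
        return []               # nothing changes downstream


class ToySum:
    """Hypothetical downstream aggregate fed by a view's deltas."""

    def __init__(self, column):
        self.column = column
        self.total = 0.0

    def apply(self, delta):
        for row in delta:
            self.total += row[self.column]


completed = ToyFilteredView(lambda r: r["status"] == "completed")
revenue = ToySum("amount")

for row in [
    {"id": 12345, "amount": 99.0, "customer_id": 42, "status": "completed"},
    {"id": 12346, "amount": 10.0, "customer_id": 7, "status": "pending"},
]:
    revenue.apply(completed.on_insert(row))

print(len(completed.rows), revenue.total)  # 1 99.0
```

The cost of each insert in this sketch depends on the size of the delta rather than the size of the cached table, which is the usual motivation for incremental maintenance.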
on_update(table, new, old=None)
Notify CIUF that a row was updated. The old state is looked up from the cache automatically.
engine.on_update("customers",
    new={"id": 42, "plan": "pro", "name": "Acme Corp"},
    old={"id": 42, "plan": "free", "name": "Acme Corp"},  # optional, for call-site clarity
)
| Parameter | Type | Description |
|---|---|---|
| table | str | Table name |
| new | dict | Column → value dict for the updated row (new state) |
| old | dict \| None | Unused; accepted for call-site documentation clarity |
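Because the old argument is unused, the previous state has to come from the cache. One way to picture that lookup is a sketch that assumes cached rows are keyed by a single primary key column named "id". `apply_update` is a hypothetical helper, not a CIUF function.

```python
def apply_update(cached_rows, new, pk="id"):
    """Replace the cached row whose primary key matches `new`.

    Returns (old, new) so a caller could forward the change as a
    "remove old, add new" delta. Hypothetical helper, not a CIUF function.
    """
    old = cached_rows.get(new[pk])   # old state comes from the cache
    cached_rows[new[pk]] = new
    return old, new


customers = {42: {"id": 42, "plan": "free", "name": "Acme Corp"}}
old, new = apply_update(customers, {"id": 42, "plan": "pro", "name": "Acme Corp"})
print(old["plan"], "->", new["plan"])  # free -> pro
```

With both states in hand, a downstream filter such as plan = 'pro' can tell that the row has just entered its result set.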
on_delete(table, row)
Notify CIUF that a row was deleted.
| Parameter | Type | Description |
|---|---|---|
| table | str | Table name |
| row | dict | Dict containing at least the primary key column(s) of the deleted row |
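The row argument needs only the primary key because the key is enough to locate the cached row. The sketch below illustrates that idea with a plain dict keyed by a tuple of primary key values, which also covers composite keys. `apply_delete` and its `pk_columns` parameter are hypothetical and do not describe CIUF's internal storage.

```python
def apply_delete(cached_rows, row, pk_columns=("id",)):
    """Remove the cached row identified by the primary key values in `row`.

    `cached_rows` maps a tuple of primary key values to the full row.
    Returns the removed row, or None if it was not cached.
    """
    missing = [c for c in pk_columns if c not in row]
    if missing:
        raise KeyError(f"row is missing primary key column(s): {missing}")
    key = tuple(row[c] for c in pk_columns)
    return cached_rows.pop(key, None)


orders = {(12345,): {"id": 12345, "amount": 99.0, "status": "completed"}}
removed = apply_delete(orders, {"id": 12345})   # primary key alone is enough
print(removed["amount"], len(orders))           # 99.0 0
```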
Properties and methods
engine.tables
Returns a dict of {table_name: TableNode} for all tables discovered at init.
engine.total_memory_mb()
Returns the current total memory used by all cached table data, in MB.
engine.dispose()
Releases database connections and clears internal state. Call when shutting down.
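One way to make sure dispose() runs at shutdown, even when an exception is raised, is a small context manager. This is a sketch: `disposing` is a hypothetical helper (the standard `contextlib.closing` calls `close()`, not `dispose()`), and `FakeEngine` is a stand-in so the example runs without a database. Whether Engine itself supports the `with` statement is not stated in this reference.

```python
from contextlib import contextmanager


@contextmanager
def disposing(resource):
    """Yield `resource`, then call its dispose() even if the body raises."""
    try:
        yield resource
    finally:
        resource.dispose()


class FakeEngine:
    """Stand-in so the sketch runs without a database; not ciuf.Engine."""

    def __init__(self):
        self.disposed = False

    def dispose(self):
        self.disposed = True


fake = FakeEngine()
with disposing(fake) as engine:
    pass  # run queries and forward write events here
print(fake.disposed)  # True
```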
ciuf.from_sql
Parse a SQL query and build a CIUF DAG automatically. Returns a QueryResult.
from ciuf import Engine, from_sql
engine = Engine("postgresql://user:pass@localhost/mydb")
result = from_sql(engine, """
    SELECT orders.id, orders.amount, customers.name
    FROM orders
    JOIN customers ON orders.customer_id = customers.id
    WHERE customers.plan = 'pro'
""")
df = result.query()  # pandas DataFrame
Signature
| Parameter | Type | Default | Description |
|---|---|---|---|
| engine | Engine | required | CIUF Engine connected to the database |
| sql | str | required | SQL SELECT statement |
| dialect | str | "postgres" | SQL dialect for parsing. Supported: postgres, sqlite, mysql, oracle, mssql |
Supported SQL constructs
| Construct | Supported |
|---|---|
| SELECT col1, col2, ... | ✅ |
| SELECT * | ✅ |
| Column aliases (AS) | ✅ |
| FROM table | ✅ |
| INNER JOIN / LEFT JOIN | ✅ |
| WHERE with =, !=, <, >, <=, >= | ✅ |
| WHERE with IN, BETWEEN, LIKE | ✅ |
| WHERE with AND / OR | ✅ |
| GROUP BY with COUNT, SUM, AVG, MIN, MAX | ✅ |
| Subqueries | ❌ (v1 scope) |
| CTEs (WITH ...) | ❌ (v1 scope) |
| Window functions | ❌ (v1 scope) |
| UNION / INTERSECT | ❌ (v1 scope) |
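If queries come from users or configuration, a rough pre-flight check can flag the unsupported constructs before they reach from_sql. The sketch below is a keyword heuristic written for illustration, not part of CIUF: `find_unsupported` is a hypothetical helper, it does not parse SQL, and it can misfire on keywords inside string literals or comments. How from_sql itself reports unsupported input is not described in this reference.

```python
import re

# Keyword patterns for the constructs marked unsupported in the table above.
UNSUPPORTED_PATTERNS = {
    "CTE (WITH ...)": re.compile(r"^\s*WITH\b", re.IGNORECASE),
    "window function": re.compile(r"\bOVER\s*\(", re.IGNORECASE),
    "UNION / INTERSECT": re.compile(r"\b(UNION|INTERSECT)\b", re.IGNORECASE),
    "subquery": re.compile(r"\(\s*SELECT\b", re.IGNORECASE),
}


def find_unsupported(sql):
    """Return labels of unsupported constructs that `sql` appears to use."""
    return [label for label, pattern in UNSUPPORTED_PATTERNS.items()
            if pattern.search(sql)]


print(find_unsupported("SELECT id FROM orders WHERE status = 'completed'"))
# []
print(find_unsupported("SELECT id FROM a UNION SELECT id FROM b"))
# ['UNION / INTERSECT']
```

A CTE is reported as both a CTE and a subquery by this heuristic, since its body is a parenthesized SELECT.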
ciuf.QueryResult
Returned by from_sql. Wraps a DAG node and exposes a .query() method.
result = from_sql(engine, "SELECT ...")
df = result.query() # returns pandas DataFrame
node = result.node # the underlying DAG node (SelectNode or JoinNode)
Methods
result.query() → pd.DataFrame
Returns the cached result as a pandas DataFrame. The first call builds the cache from the database; subsequent calls return from memory.
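The "build on first call, serve from memory afterwards" behaviour is a form of lazy memoization. A minimal sketch of that pattern follows, with a counter standing in for the database round trip. `LazyResult` and `load_from_db` are hypothetical names, and the sketch ignores eviction and write events, which in CIUF also affect what is held in memory.

```python
class LazyResult:
    """Hypothetical stand-in for a cached query result."""

    _UNSET = object()

    def __init__(self, loader):
        self._loader = loader        # callable that does the expensive fetch
        self._cached = self._UNSET

    def query(self):
        if self._cached is self._UNSET:
            self._cached = self._loader()   # first call: build the cache
        return self._cached                 # later calls: served from memory


calls = []


def load_from_db():
    calls.append("hit")              # stands in for a database round trip
    return [{"id": 12345, "amount": 99.0}]


result = LazyResult(load_from_db)
first = result.query()
second = result.query()
print(len(calls), first is second)   # 1 True
```

This sketch hands back the same object on every call, so a caller that mutates it would also change the cached copy. Whether CIUF returns a copy or a shared DataFrame is not stated here.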
result.node
The underlying DAG node (SelectNode, JoinNode, etc.). Used for advanced inspection.
Low-level node API
For advanced use cases, you can build the DAG manually:
from ciuf import Engine, FilterNode, FilterType, Condition, Operator, DialectsEnum, SelectNode
engine = Engine("postgresql://...")
table = engine.tables["orders"]
filter_node = FilterNode(
    table=table,
    lib_engine=engine,
    filter_type=FilterType.CONDITION,
    condition=Condition(
        column="status",
        operator=Operator.EQUAL,
        values=["completed"],
        dialect=DialectsEnum.POSTGRESQL,
    ),
)
select_node = SelectNode(
    parent=table,
    query_filter=filter_node,
    columns=["id", "amount", "customer_id"],
)
df = select_node.query()
See Architecture for the full DAG model and node types.