Advanced Queries¶
This page covers CUBRID-specific query constructs: hierarchical queries (CONNECT BY), MERGE statement, and click counter functions.
Hierarchical Queries (CONNECT BY)¶
CUBRID supports Oracle-style hierarchical queries for traversing tree-structured data. The sqlalchemy_cubrid package provides a complete set of constructs for building these queries.
Basic example¶
Given an employees table with a self-referencing manager_id column:
from sqlalchemy import Table, Column, Integer, String, MetaData
metadata = MetaData()
emp = Table(
"employees", metadata,
Column("id", Integer, primary_key=True),
Column("name", String(100)),
Column("manager_id", Integer),
)
Query the hierarchy:
from sqlalchemy_cubrid import HierarchicalSelect, prior, level_col
stmt = HierarchicalSelect(
emp,
columns=[emp.c.id, emp.c.name, level_col()],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
)
with engine.connect() as conn:
result = conn.execute(stmt)
for row in result:
print(" " * (row[2] - 1) + row[1])
Generated SQL:
SELECT employees.id, employees.name, LEVEL
FROM employees
START WITH employees.manager_id IS NULL
CONNECT BY PRIOR employees.id = employees.manager_id
HierarchicalSelect parameters¶
| Parameter | Type | Description |
|---|---|---|
table |
Table | The source table |
columns |
list | Columns to select |
connect_by |
expression | The CONNECT BY condition (must include prior()) |
start_with |
expression | Root row filter condition |
where |
expression | Additional WHERE filter (applied before traversal) |
order_siblings_by |
list | Order siblings at each level |
nocycle |
bool | Prevent infinite loops in cyclic data |
prior()¶
Marks a column as the "prior" (parent) side of the hierarchical relationship:
from sqlalchemy_cubrid import prior
# Parent's id matches child's manager_id
connect_by = (prior(emp.c.id) == emp.c.manager_id)
level_col()¶
Returns the LEVEL pseudo-column, which indicates the depth in the hierarchy (1 for root):
from sqlalchemy_cubrid import level_col
stmt = HierarchicalSelect(
emp,
columns=[emp.c.name, level_col()],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
)
sys_connect_by_path()¶
Builds a path string from root to current node:
from sqlalchemy_cubrid import sys_connect_by_path
stmt = HierarchicalSelect(
emp,
columns=[
emp.c.name,
sys_connect_by_path(emp.c.name, "/"),
],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
)
Generated SQL includes:
Note
CUBRID requires the separator to be a string literal, not a bind parameter. The dialect handles this automatically.
connect_by_root()¶
Returns the root node's column value for each row in the hierarchy:
from sqlalchemy_cubrid import connect_by_root
stmt = HierarchicalSelect(
emp,
columns=[
emp.c.name,
connect_by_root(emp.c.name), # Root employee name
],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
)
connect_by_isleaf()¶
Returns 1 if the current row is a leaf node (has no children), 0 otherwise:
from sqlalchemy_cubrid import connect_by_isleaf
stmt = HierarchicalSelect(
emp,
columns=[emp.c.name, connect_by_isleaf()],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
)
connect_by_iscycle()¶
Returns 1 if the current row causes a cycle. Requires nocycle=True:
from sqlalchemy_cubrid import connect_by_iscycle
stmt = HierarchicalSelect(
emp,
columns=[emp.c.name, connect_by_iscycle()],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
nocycle=True,
)
NOCYCLE¶
Prevents infinite loops when data contains cycles:
stmt = HierarchicalSelect(
emp,
columns=[emp.c.name, level_col()],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
nocycle=True,
)
Generated SQL:
ORDER SIBLINGS BY¶
Order rows at the same level of the hierarchy:
stmt = HierarchicalSelect(
emp,
columns=[emp.c.name, level_col()],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
order_siblings_by=[emp.c.name],
)
Generated SQL:
rownum()¶
Returns the ROWNUM pseudo-column, which provides sequential row numbers in the result set starting from 1. Available in both hierarchical and regular queries in CUBRID:
from sqlalchemy_cubrid import rownum
stmt = HierarchicalSelect(
emp,
columns=[rownum(), emp.c.name, level_col()],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
)
Generated SQL includes:
WHERE clause¶
Filter rows before the hierarchical traversal:
stmt = HierarchicalSelect(
emp,
columns=[emp.c.name, level_col()],
connect_by=(prior(emp.c.id) == emp.c.manager_id),
start_with=(emp.c.manager_id == None),
where=(emp.c.name != "Intern"),
)
MERGE Statement¶
CUBRID supports the SQL MERGE statement for conditional insert/update (upsert based on a source table or subquery).
Basic example¶
from sqlalchemy_cubrid import Merge
stmt = (
Merge(target_table)
.using(source_table)
.on(target_table.c.id == source_table.c.id)
.when_matched_then_update({
target_table.c.name: source_table.c.name,
target_table.c.value: source_table.c.value,
})
.when_not_matched_then_insert({
target_table.c.id: source_table.c.id,
target_table.c.name: source_table.c.name,
target_table.c.value: source_table.c.value,
})
)
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
Generated SQL:
MERGE INTO target_table
USING source_table
ON (target_table.id = source_table.id)
WHEN MATCHED THEN UPDATE SET
target_table.name = source_table.name,
target_table.value = source_table.value
WHEN NOT MATCHED THEN INSERT (target_table.id, target_table.name, target_table.value)
VALUES (source_table.id, source_table.name, source_table.value)
Merge API¶
| Method | Description |
|---|---|
Merge(target) |
Create a MERGE targeting the given table |
.using(source) |
Set the source table or subquery |
.on(condition) |
Set the join condition |
.when_matched_then_update(dict, condition=) |
SET clause for matched rows (optional AND condition) |
.when_matched_then_delete(condition=) |
DELETE matched rows (optional AND condition) |
.when_not_matched_then_insert(dict, condition=) |
INSERT clause for unmatched rows (optional AND condition) |
WHEN MATCHED THEN DELETE¶
Delete matched rows instead of updating them:
stmt = (
Merge(target_table)
.using(source_table)
.on(target_table.c.id == source_table.c.id)
.when_matched_then_delete()
)
Generated SQL:
MERGE INTO target_table USING source_table
ON (target_table.id = source_table.id)
WHEN MATCHED THEN DELETE
Conditional WHEN clauses¶
Add conditions to WHEN clauses using the condition= parameter:
stmt = (
Merge(target_table)
.using(source_table)
.on(target_table.c.id == source_table.c.id)
.when_matched_then_update(
{target_table.c.name: source_table.c.name},
condition=source_table.c.active == 1,
)
.when_matched_then_delete(
condition=source_table.c.active == 0,
)
.when_not_matched_then_insert(
{target_table.c.id: source_table.c.id,
target_table.c.name: source_table.c.name},
)
)
Generated SQL:
MERGE INTO target_table USING source_table
ON (target_table.id = source_table.id)
WHEN MATCHED AND source_table.active = ? THEN UPDATE SET ...
WHEN MATCHED AND source_table.active = ? THEN DELETE
WHEN NOT MATCHED THEN INSERT (...) VALUES (...)
Update only (no insert)¶
stmt = (
Merge(target_table)
.using(source_table)
.on(target_table.c.id == source_table.c.id)
.when_matched_then_update({
target_table.c.name: source_table.c.name,
})
)
Insert only (no update)¶
stmt = (
Merge(target_table)
.using(source_table)
.on(target_table.c.id == source_table.c.id)
.when_not_matched_then_insert({
target_table.c.id: source_table.c.id,
target_table.c.name: source_table.c.name,
})
)
Click Counter Functions¶
CUBRID provides atomic increment and decrement functions that work within SELECT statements. These are unique to CUBRID and are used for implementing view counters, like counts, etc.
INCR()¶
Atomically increments an integer column by 1 and returns the value before the increment:
from sqlalchemy import select
from sqlalchemy_cubrid import incr
stmt = select(incr(articles.c.view_count)).where(articles.c.id == 42)
with engine.connect() as conn:
result = conn.execute(stmt)
old_count = result.scalar()
conn.commit()
Generated SQL:
DECR()¶
Atomically decrements an integer column by 1 and returns the value before the decrement:
from sqlalchemy_cubrid import decr
stmt = select(decr(articles.c.view_count)).where(articles.c.id == 42)
Click counter constraints¶
Warning
- Click counters only work on
SMALLINT,INT, andBIGINTcolumns. - The result set must contain exactly one row.
- Click counters combine a SELECT and an UPDATE into a single atomic operation.
Built-in Functions¶
The dialect registers CUBRID-specific built-in functions as GenericFunction classes for proper type inference:
NVL / IFNULL¶
from sqlalchemy import func, select
# NVL(expr, default) -- returns default when expr is NULL
stmt = select(func.nvl(users.c.nickname, users.c.name))
# IFNULL(expr, default) -- alias for NVL
stmt = select(func.ifnull(users.c.nickname, "Anonymous"))
NVL2¶
# NVL2(expr, not_null_val, null_val)
stmt = select(func.nvl2(users.c.email, "Has email", "No email"))
DECODE¶
# DECODE(expr, search1, result1, ..., default)
stmt = select(func.decode(users.c.status, 1, "Active", 2, "Inactive", "Unknown"))
IF¶
GROUP_CONCAT¶
# GROUP_CONCAT(expr) -- concatenate values in a group
stmt = select(func.group_concat(users.c.name)).group_by(users.c.department)
Import Reference¶
All query constructs are available from the top-level package: