Query Engine

The ParticleDB query engine executes physical plans using vectorized operators over Apache Arrow columnar arrays. Execution is parallelized across chunks via the Rayon thread pool, and multiple optimization tiers — from zone-level precomputed aggregates down to monomorphized SIMD loops — minimize the work performed per row.

SQL text
  → Parse → AST → Logical Plan
  → Optimizer (pushdown, folding, join reorder)
  → Physical Plan
  → Vectorized Execution (Arrow RecordBatch chunks, 8,192 rows each)
  → Result batches → wire format (PG / gRPC / HTTP / RESP / WS)

Each physical operator processes data in chunks of 8,192 rows. At 8 bytes per value, one column chunk occupies 64 KB — sized to fit in the CPU L1 data cache. This chunk size balances per-chunk overhead against cache efficiency.
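The sizing arithmetic is straightforward; a minimal sketch (constants taken from this page, names illustrative):

```rust
// Chunk-size arithmetic from the text: 8,192 rows of 8-byte values.
const CHUNK_ROWS: usize = 8192;
const BYTES_PER_VALUE: usize = 8; // one i64/f64 value

fn chunk_bytes() -> usize {
    CHUNK_ROWS * BYTES_PER_VALUE
}

fn main() {
    // 8192 rows * 8 B = 65,536 B = 64 KB, sized to fit in a typical L1 data cache.
    assert_eq!(chunk_bytes(), 64 * 1024);
}
```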

Repeated identical queries hit a plan cache keyed on normalized SQL text. The cache stores the compiled physical plan so parsing and optimization are skipped on subsequent executions. The cache is invalidated on DDL changes (CREATE TABLE, DROP TABLE, CREATE INDEX).

A query result cache sits on top of the plan cache for fully deterministic queries: identical SQL text returns the cached result set in O(1) without re-execution. The result cache is invalidated on any DML operation that touches the referenced tables.
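A minimal sketch of that caching scheme, assuming a normalize-then-lookup key and per-table invalidation; `ResultCache` and `normalize` are illustrative names, not ParticleDB's actual types:

```rust
use std::collections::HashMap;

// Hypothetical result cache keyed on normalized SQL text.
// Each entry remembers which tables it reads so DML can evict it.
struct ResultCache {
    entries: HashMap<String, (Vec<String> /* tables read */, String /* result */)>,
}

impl ResultCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    // O(1) hit for a previously executed, identical (post-normalization) query.
    fn get(&self, sql: &str) -> Option<&String> {
        self.entries.get(&normalize(sql)).map(|(_, r)| r)
    }

    fn put(&mut self, sql: &str, tables: Vec<String>, result: String) {
        self.entries.insert(normalize(sql), (tables, result));
    }

    // Any DML touching `table` evicts every cached result that references it.
    fn invalidate_table(&mut self, table: &str) {
        self.entries
            .retain(|_, (tables, _)| !tables.iter().any(|t| t == table));
    }
}

// Illustrative normalization: collapse whitespace, lowercase.
fn normalize(sql: &str) -> String {
    sql.split_whitespace().collect::<Vec<_>>().join(" ").to_lowercase()
}
```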

The engine uses cost-based dispatch to select the fastest aggregation strategy for each query, based on the GROUP BY column type, the number of distinct groups, the sort order, and the number of aggregate functions.

Queries without GROUP BY (e.g., SELECT SUM(x) FROM t) resolve through a tiered fast path:

  1. Running stats — O(1) lookup from precomputed table-wide statistics.
  2. Zone aggregate — O(chunks) summation of per-chunk zone map statistics.
  3. Raw-value accumulation — a tight loop over the raw values (for &v in vals) that LLVM auto-vectorizes into SIMD instructions, bypassing Arrow compute-kernel overhead.
  4. COUNT(*) — resolved from metadata in ~20 µs, with no row scan.
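The tier ordering above can be sketched as a single dispatch function. This is an illustration only: `TableStats` and `ZoneMap` are hypothetical stand-ins for the engine's metadata structures.

```rust
// Hypothetical metadata carried alongside a table and its chunks.
struct TableStats { sum: Option<i64> } // tier 1: precomputed running stats
struct ZoneMap { sum: i64 }            // tier 2: per-chunk zone statistics

fn sum_fast_path(stats: &TableStats, zones: &[ZoneMap], vals: &[i64]) -> i64 {
    // Tier 1: O(1) lookup from table-wide running stats.
    if let Some(s) = stats.sum {
        return s;
    }
    // Tier 2: O(chunks) summation of zone-map statistics.
    if !zones.is_empty() {
        return zones.iter().map(|z| z.sum).sum();
    }
    // Tier 3: tight raw-value loop that LLVM can auto-vectorize.
    let mut acc = 0i64;
    for &v in vals {
        acc += v;
    }
    acc
}
```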

For GROUP BY columns with a small number of distinct values (groups × aggs ≤ 8), the engine builds a boolean mask per group value and accumulates each aggregate using bit-scan iteration over the mask. This strategy is used for dictionary-encoded string columns below the cardinality threshold.
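A hedged sketch of the mask-plus-bit-scan idea, restricted to a single 64-row word for brevity (the function name and layout are illustrative):

```rust
// One boolean mask per group value, then bit-scan iteration
// (trailing_zeros + clear-lowest-bit) visits only the set bits.
fn sum_per_group(keys: &[u8], vals: &[i64], num_groups: usize) -> Vec<i64> {
    assert!(keys.len() <= 64, "sketch handles one 64-row word");
    let mut sums = vec![0i64; num_groups];
    for g in 0..num_groups {
        // Build this group's bitmask over the chunk.
        let mut mask: u64 = 0;
        for (i, &k) in keys.iter().enumerate() {
            if k as usize == g {
                mask |= 1 << i;
            }
        }
        // Bit-scan: pop the lowest set bit until the mask is empty.
        let mut m = mask;
        while m != 0 {
            let i = m.trailing_zeros() as usize;
            sums[g] += vals[i];
            m &= m - 1; // clear lowest set bit
        }
    }
    sums
}
```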

When the GROUP BY column is an integer with a bounded range, the engine allocates a dense accumulator array indexed directly by the group key value. Each row maps to its accumulator slot in O(1) — no hashing, no collision handling.
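A minimal sketch of dense-array aggregation under those assumptions (bounded key range known up front; function name illustrative):

```rust
// Dense-array GROUP BY SUM for a bounded integer key:
// the key value indexes the accumulator directly, so there is
// no hashing and no collision handling.
fn group_sum_dense(keys: &[i64], vals: &[i64], min: i64, max: i64) -> Vec<i64> {
    let width = (max - min + 1) as usize;
    let mut acc = vec![0i64; width];
    for (&k, &v) in keys.iter().zip(vals) {
        acc[(k - min) as usize] += v; // O(1) slot lookup per row
    }
    acc
}
```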

Variants:

Strategy            | Condition
Global dense        | Per-thread memory ≤ 64 MB
Range-limited dense | Sorted data with per-thread range ≤ 32 MB
Radix-partition hash| Unsorted high-cardinality exceeding dense thresholds

Range-limited dense is especially effective for naturally sorted columns (e.g., session_id = row_index / 20). Each Rayon thread accumulates into a small dense array spanning only its local [min, max] range, then results are merged via parallel tree reduction with into_par_iter().reduce() in log2(N) levels.
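The merge step can be sketched without Rayon as a sequential pairwise tree reduction; the real engine does this in parallel with into_par_iter().reduce(), and this sketch simplifies the range-limited case by assuming equal-length per-thread partials:

```rust
// Merge two per-thread partial accumulator arrays element-wise.
fn merge(a: Vec<i64>, b: Vec<i64>) -> Vec<i64> {
    a.into_iter().zip(b).map(|(x, y)| x + y).collect()
}

// Pairwise tree reduction: each pass halves the number of partials,
// so N partials are merged in log2(N) levels.
fn tree_reduce(mut parts: Vec<Vec<i64>>) -> Vec<i64> {
    while parts.len() > 1 {
        let mut next = Vec::with_capacity((parts.len() + 1) / 2);
        let mut it = parts.into_iter();
        while let Some(a) = it.next() {
            match it.next() {
                Some(b) => next.push(merge(a, b)),
                None => next.push(a), // odd one out carries to the next level
            }
        }
        parts = next;
    }
    parts.pop().unwrap_or_default()
}
```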

For unsorted high-cardinality GROUP BY columns that exceed dense-array memory budgets, the engine uses radix-partitioned open-addressing hash tables (CompactPart). Each partition fits in cache, and partitions are merged in parallel using a field_merge vector of pre-resolved AccAction operations.
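The partitioning step itself (not CompactPart, which is ParticleDB-specific) can be illustrated with a low-bits radix split; each partition's hash table then stays small enough to be cache-resident:

```rust
// Split keys into 2^bits partitions by their low bits, so each
// partition can be aggregated in its own cache-resident hash table.
fn radix_partition(keys: &[u64], bits: u32) -> Vec<Vec<u64>> {
    let parts = 1usize << bits;
    let mask = parts as u64 - 1;
    let mut out = vec![Vec::new(); parts];
    for &k in keys {
        out[(k & mask) as usize].push(k);
    }
    out
}
```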

Dictionary-encoded columns use the raw integer dictionary key as a direct array index into accumulators — no hash table, no string comparison. The u8 key slice is accessed directly for maximum cache efficiency.

The engine fuses filter evaluation and aggregate accumulation into a single pass over the data, eliminating intermediate RecordBatch materialization.

Before scanning rows, each chunk is classified using zone map min/max values:

Classification | Action
ALL            | Every row passes — use zone-level stats
NONE           | No rows pass — skip entirely
PARTIAL        | Some rows pass — scan with mask

This classification runs before any row-level work, and for many filtered aggregates the majority of chunks resolve as ALL or NONE.
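For a predicate of the form x > c, the classification reduces to two comparisons against the chunk's zone-map min/max; a minimal sketch (names illustrative):

```rust
#[derive(Debug, PartialEq)]
enum ZoneClass {
    All,     // every row passes: answer from zone-level stats
    None,    // no row passes: skip the chunk entirely
    Partial, // mixed: scan the chunk with a row mask
}

// Classify one chunk for the predicate `x > c` using its zone map.
fn classify_gt(zone_min: i64, zone_max: i64, c: i64) -> ZoneClass {
    if zone_min > c {
        ZoneClass::All
    } else if zone_max <= c {
        ZoneClass::None
    } else {
        ZoneClass::Partial
    }
}
```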

Filter predicates are dispatched once through a macro-generated monomorphization layer. Each comparison operator (LT, GT, LE, GE, EQ, NE) generates a specialized closure that compiles to a direct SIMD comparison instruction, with no per-element match dispatch.

For compound AND filters, a nested macro generates 36 monomorphized variants (6 × 6 operator pairs), enabling LLVM to fully auto-vectorize the fused filter+aggregate loop.
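A hedged sketch of the macro-generated monomorphization pattern: each operator stamps out its own specialized kernel, so the inner loop contains a bare comparison that LLVM can vectorize. The macro and function names here are illustrative, not ParticleDB's.

```rust
// One specialized comparison kernel per operator: the operator is a
// compile-time token, so the per-element loop has no dispatch at all.
macro_rules! cmp_kernel {
    ($name:ident, $op:tt) => {
        fn $name(vals: &[i64], c: i64, out: &mut [bool]) {
            for (o, &v) in out.iter_mut().zip(vals) {
                *o = v $op c; // straight-line comparison, SIMD-friendly
            }
        }
    };
}

cmp_kernel!(cmp_lt, <);
cmp_kernel!(cmp_gt, >);
cmp_kernel!(cmp_eq, ==);
```

A compound-AND layer would nest the same idea, pairing two operator tokens per variant to produce the 36 fused kernels.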

Memory Streams | Best Path                 | Reason
1-2            | Contiguous flat column    | Hardware prefetcher handles 1-2 streams
3+             | Per-batch (64 KB chunks)  | L1-resident chunks beat 3+ large scans

The engine selects the path automatically based on the number of distinct column references in the filter and aggregate expressions.

When the filter column and aggregate column are identical (e.g., SELECT AVG(rating) FROM t WHERE rating > 3), only one memory stream is needed. The filtered values are accumulated directly without loading a second column, halving memory bandwidth.
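A minimal sketch of that single-stream fused path (function name illustrative): filter and accumulate read the same column once, using a 0/1 mask so the loop stays branchless.

```rust
// Single-stream fused filter + AVG for a query shaped like
// SELECT AVG(rating) FROM t WHERE rating > c: one pass, one column.
fn avg_where_gt(vals: &[f64], c: f64) -> Option<f64> {
    let (mut sum, mut n) = (0.0f64, 0u64);
    for &v in vals {
        let m = (v > c) as u64;  // 0 or 1
        sum += v * m as f64;     // branchless multiply-by-mask accumulate
        n += m;
    }
    (n > 0).then(|| sum / n as f64)
}
```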

On ARM (Apple Silicon), float aggregation uses val * m as f64 where m is 0 or 1. This avoids branch misprediction and costly general-purpose-to-NEON register file transfers that would result from bit-masking approaches like f64::from_bits(...).

For integer aggregation, val & mask.wrapping_neg() provides branchless selection without any floating-point conversion.
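Both branchless tricks fit in a few lines; a sketch with illustrative helper names:

```rust
// Float path: multiply by a 0.0/1.0 mask. No branch, and on ARM no
// general-purpose-to-NEON register transfer from bit-masking.
fn masked_add_f64(acc: f64, val: f64, pass: bool) -> f64 {
    let m = pass as u64; // 0 or 1
    acc + val * m as f64
}

// Integer path: mask.wrapping_neg() turns 1 into all-ones and 0 into
// all-zeros, so the AND keeps or drops val with no float conversion.
fn masked_add_i64(acc: i64, val: i64, pass: bool) -> i64 {
    let mask = pass as i64;
    acc + (val & mask.wrapping_neg())
}
```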

The engine builds a hash table on the smaller side of the join and streams the larger (probe) side batch by batch. This prevents OOM on large joins — only the build side plus one probe batch need to fit in memory at any time.

For inputs exceeding 100K rows, the probe phase runs in parallel using par_iter over probe batches against the shared read-only hash table.

When the join key is Int64, a specialized probe_int64_inner path avoids generic hashing overhead and uses direct integer comparison.

For queries like SELECT SUM(b.amount) FROM a JOIN b ON a.id = b.id, the engine fuses the join probe and aggregation into a single pass using scalar accumulators — no per-group arrays, no intermediate join output materialization.
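A hedged sketch of that fusion, simplified to a COUNT-free SUM with unique build-side keys (duplicate build keys would need per-key multiplicities; names are illustrative):

```rust
use std::collections::HashSet;

// Fused join-probe + SUM for SELECT SUM(b.amount) FROM a JOIN b ON a.id = b.id:
// each probe hit feeds a scalar accumulator directly, so no joined rows
// are ever materialized.
fn fused_join_sum(build_ids: &[i64], probe_ids: &[i64], amounts: &[i64]) -> i64 {
    // Build side: the smaller input becomes a hash set of join keys.
    let build: HashSet<i64> = build_ids.iter().copied().collect();
    let mut sum = 0i64;
    // Probe side streams through batch by batch; only the accumulator grows.
    for (&id, &amt) in probe_ids.iter().zip(amounts) {
        if build.contains(&id) {
            sum += amt;
        }
    }
    sum
}
```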

Built-in functions are evaluated on whole Arrow columns rather than row-by-row:

Function | Vectorized? | Notes
ABS      | Yes         | Arrow compute kernel
UPPER    | Yes         | String array transform
LOWER    | Yes         | String array transform
LENGTH   | Yes         | String array transform
COALESCE | Yes         | Null-aware column merge
This eliminates per-row allocation of Vec<(String, ScalarValue)> intermediaries. Functions not yet vectorized fall back to a row-by-row eval_expr_to_array_fallback path.

When the plan contains a Limit(Sort(input)) pattern, the engine passes the combined limit + offset to Arrow’s lexsort_to_indices for O(N log K) partial sort instead of O(N log N) full sort. This is particularly effective for ORDER BY ... LIMIT queries where K is much smaller than N.
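The O(N log K) idea behind the partial sort can be illustrated with a bounded heap; this is the general Top-K technique, not Arrow's lexsort_to_indices implementation itself:

```rust
use std::collections::BinaryHeap;

// Keep only the K smallest values in a max-heap of size K:
// each of the N elements costs at most O(log K), versus
// O(log N) per element for a full sort.
fn smallest_k(vals: &[i64], k: usize) -> Vec<i64> {
    let mut heap = BinaryHeap::with_capacity(k); // max-heap of the K smallest so far
    for &v in vals {
        if heap.len() < k {
            heap.push(v);
        } else if let Some(&top) = heap.peek() {
            if v < top {
                heap.pop(); // evict the largest of the current K
                heap.push(v);
            }
        }
    }
    heap.into_sorted_vec() // ascending, like ORDER BY ... LIMIT k
}
```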

The Rayon thread pool is used throughout execution:

  • Chunk-level parallelism — scan, filter, and aggregate operators process chunks concurrently.
  • Parallel projection — execute_project uses par_iter for multi-batch inputs.
  • Parallel min/max pre-scan — key range detection uses par_iter().reduce().
  • Parallel tree reduction — dense-array merge across threads uses into_par_iter().reduce() in log2(N) levels instead of sequential accumulation.
  • Parallel join probe — probe batches processed concurrently against the shared build-side hash table.

An is_likely_point_select heuristic classifies incoming queries. Point selects execute on the async Tokio thread, while analytical queries are routed through tokio::task::spawn_blocking to the Rayon pool, preventing long-running OLAP queries from blocking OLTP throughput.