Change Data Capture (CDC)

ParticleDB supports Change Data Capture (CDC) through changefeeds that stream INSERT, UPDATE, and DELETE events from tables to external systems in real time. Each change event carries a monotonically increasing sequence number for ordering and resumption.

Stream changes to an Apache Kafka topic:

CREATE CHANGEFEED orders_feed
FOR TABLE orders
INTO KAFKA 'broker:9092/orders-topic';

The broker address and topic are specified in a single string, separated by /. In the example above, the broker is broker:9092 and the topic is orders-topic.
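
To make the sink string format concrete, here is a minimal Python sketch that splits such a string into its two parts. The helper name split_kafka_sink is hypothetical and only mirrors the format described above; it is not part of ParticleDB.

```python
def split_kafka_sink(sink: str) -> tuple[str, str]:
    """Split a 'host:port/topic' sink string into (broker, topic)."""
    # partition() splits on the first "/", so the broker's "host:port" stays intact.
    broker, sep, topic = sink.partition("/")
    if not sep or not broker or not topic:
        raise ValueError(f"expected 'broker/topic', got {sink!r}")
    return broker, topic


broker, topic = split_kafka_sink("broker:9092/orders-topic")
print(broker, topic)  # broker:9092 orders-topic
```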

Send changes as HTTP POST requests to a webhook endpoint:

CREATE CHANGEFEED audit_feed
FOR TABLE users
INTO WEBHOOK 'https://api.example.com/hook';
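
On the receiving side, a webhook endpoint only needs to accept POST requests and decode the JSON body. The sketch below uses Python's standard library. It assumes the body is either a single JSON event or a JSON array of events, and that a 2xx response signals success; neither detail is specified here, so treat both as assumptions. The names events_from_body, HookHandler, and make_server are illustrative.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # events kept in memory, for illustration only


def events_from_body(body: bytes) -> list:
    """Decode a POST body into a list of event dicts.

    Assumption: the body is one JSON event or a JSON array of events.
    """
    payload = json.loads(body)
    return payload if isinstance(payload, list) else [payload]


class HookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            received.extend(events_from_body(self.rfile.read(length)))
        except ValueError:
            # Malformed JSON (or undecodable bytes): reject the request.
            self.send_response(400)
        else:
            self.send_response(200)
        self.end_headers()


def make_server(port: int = 8080) -> HTTPServer:
    """Build (but do not start) a local server that uses HookHandler."""
    return HTTPServer(("127.0.0.1", port), HookHandler)
```

Calling make_server().serve_forever() starts the receiver locally; a production endpoint would also need TLS and authentication.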

Changefeeds accept optional configuration via a WITH clause:

CREATE CHANGEFEED detailed_feed
FOR TABLE orders
INTO KAFKA 'broker:9092/orders-topic'
WITH (
include_old_values = true,
batch_size = 500,
flush_interval_ms = 2000,
filter = 'status = ''shipped'''
);

| Option | Default | Description |
|---|---|---|
| include_old_values | false | Include the old row image on UPDATE and DELETE events |
| batch_size | 100 | Number of events to batch before sending |
| flush_interval_ms | 1000 | Maximum time (ms) to wait before flushing an incomplete batch |
| filter | none | SQL WHERE-style expression to filter events |

Temporarily stop a changefeed without dropping it. The changefeed retains its position and resumes from where it left off:

PAUSE CHANGEFEED orders_feed;
RESUME CHANGEFEED orders_feed;

List all changefeeds with their current status, table, and delivery statistics:

SHOW CHANGEFEEDS;

The output includes:

| Column | Description |
|---|---|
| name | Changefeed name |
| table | Watched table |
| state | Running, Paused, or Error |
| last_sequence | Last delivered sequence number |
| events_delivered | Total events delivered since creation |

Remove a changefeed that is no longer needed:

DROP CHANGEFEED orders_feed;

Each change event is serialized as JSON with the following structure:

{
"sequence": 42,
"timestamp": "2025-03-15T10:30:00Z",
"table": "orders",
"operation": "INSERT",
"key": {"id": 1001},
"new_row": {"id": 1001, "customer": "acme", "amount": 250.00},
"old_row": null
}

| Field | Presence | Description |
|---|---|---|
| sequence | Always | Monotonic sequence number for ordering and resumption |
| timestamp | Always | Wall-clock time when the change was captured |
| table | Always | Name of the modified table |
| operation | Always | INSERT, UPDATE, or DELETE |
| key | Always | Primary key value(s) of the affected row |
| new_row | INSERT, UPDATE | The new row image |
| old_row | UPDATE, DELETE | The old row image (only when include_old_values is enabled) |
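
As an illustration of how a consumer might use these fields, the following Python sketch applies events to an in-memory copy of a table and uses sequence to skip anything it has already applied. The Replica class is hypothetical, and the duplicate check assumes a sink may redeliver events after a restart, which is not stated here.

```python
import json


class Replica:
    """In-memory copy of one table, keyed by primary key."""

    def __init__(self):
        self.rows = {}
        self.last_sequence = None  # highest sequence applied so far

    def apply(self, event: dict) -> bool:
        """Apply one change event; return False if it was already applied."""
        seq = event["sequence"]
        if self.last_sequence is not None and seq <= self.last_sequence:
            return False  # stale or duplicate event, ignore it
        # Dicts are unhashable, so use a canonical JSON string of the key.
        pk = json.dumps(event["key"], sort_keys=True)
        if event["operation"] == "DELETE":
            self.rows.pop(pk, None)
        else:  # INSERT and UPDATE both carry the new row image
            self.rows[pk] = event["new_row"]
        self.last_sequence = seq
        return True


replica = Replica()
replica.apply({
    "sequence": 42,
    "timestamp": "2025-03-15T10:30:00Z",
    "table": "orders",
    "operation": "INSERT",
    "key": {"id": 1001},
    "new_row": {"id": 1001, "customer": "acme", "amount": 250.00},
    "old_row": None,
})
print(replica.last_sequence)  # 42
```

Persisting last_sequence alongside the replicated rows would let such a consumer pick up where it stopped.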

For Kafka sinks, the message key is formatted as table:pk_json to enable partition routing by primary key.
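
A rough Python sketch of how such a key could be built from an event's table and key fields follows. The exact JSON encoding ParticleDB uses (whitespace, key order) is not specified here; compact output with sorted keys is an assumption, and kafka_message_key is a hypothetical helper.

```python
import json


def kafka_message_key(table: str, key: dict) -> str:
    """Build a 'table:pk_json' message key."""
    # Assumption: compact JSON with sorted keys; the real encoding may differ.
    pk_json = json.dumps(key, sort_keys=True, separators=(",", ":"))
    return f"{table}:{pk_json}"


print(kafka_message_key("orders", {"id": 1001}))  # orders:{"id":1001}
```

Because every event for a given row carries the same key, a key-hashing partitioner sends those events to the same partition, which keeps them in order relative to each other.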

Each changefeed runs as a background task that:

  1. Tails the WAL via a WalTailer to capture DML events in order.
  2. Batches events up to batch_size or until flush_interval_ms elapses.
  3. Delivers to the sink (Kafka producer or HTTP POST).
  4. Tracks position via the last_sequence counter for resumption after pause or restart.
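
The batching and position-tracking steps above can be sketched in Python as follows. This is not ParticleDB's implementation: the Batcher class, its deliver callback, and the choice to measure the flush interval from the oldest buffered event are all assumptions made for illustration.

```python
import time


class Batcher:
    """Buffer events; flush at batch_size or once flush_interval_ms has passed."""

    def __init__(self, deliver, batch_size=100, flush_interval_ms=1000,
                 clock=time.monotonic):
        self.deliver = deliver            # callable that sends a list of events
        self.batch_size = batch_size
        self.flush_interval = flush_interval_ms / 1000.0
        self.clock = clock                # injectable so the timing can be tested
        self.buffer = []
        self.first_at = None              # arrival time of the oldest buffered event
        self.last_sequence = None         # position after the last delivered batch

    def add(self, event):
        if not self.buffer:
            self.first_at = self.clock()
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def tick(self):
        """Call periodically; flushes an incomplete batch that has waited long enough."""
        if self.buffer and self.clock() - self.first_at >= self.flush_interval:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        self.deliver(self.buffer)
        # Advance the position only after the sink call has returned.
        self.last_sequence = self.buffer[-1]["sequence"]
        self.buffer = []
        self.first_at = None
```

Advancing last_sequence only after delivery returns means a failed send leaves the position unchanged, so the same batch can be retried.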

The changefeed manager orchestrates the lifecycle of all active feeds, handling creation, pause/resume, error recovery, and teardown.