Skip to content

Python SDK

The ParticleDB Python SDK provides a REST client (zero external dependencies, uses stdlib urllib) and an optional gRPC client. It includes typed helpers for SQL queries, key-value operations, vector search, and RAG pipelines.

Install from PyPI (run in a terminal):
pip install particledb

Zero external dependencies — uses Python stdlib only.

from particledb import ParticleDB
db = ParticleDB(host="localhost", port=8080)
# Health check
print(db.health())
# CREATE TABLE
db.execute("""
CREATE TABLE users (
id BIGINT PRIMARY KEY,
name VARCHAR NOT NULL,
email VARCHAR
)
""")
# INSERT
db.execute(
"INSERT INTO users (id, name, email) VALUES ($1, $2, $3)",
[1, "alice", "alice@example.com"],
)
# SELECT
rows = db.query("SELECT * FROM users WHERE id = $1", [1])
for row in rows:
print(row["name"], row["email"])
# Full result with metadata
result = db.query_result("SELECT * FROM users")
print(f"{result.row_count} rows in {result.elapsed_ms}ms")
from particledb import ParticleDB
db = ParticleDB(
host="localhost",
port=8080,
user="admin",
password="secret",
)
from particledb import GrpcClient
client = GrpcClient(host="localhost", port=50051)
rows = client.query("SELECT * FROM users")
for row in rows:
print(row)
# Batch execute
results = client.batch_execute([
"INSERT INTO users (id, name) VALUES (10, 'dave')",
"INSERT INTO users (id, name) VALUES (11, 'eve')",
])
# Context manager
with GrpcClient(host="localhost", port=50051) as client:
rows = client.query("SELECT 1")
# TLS
client = GrpcClient(host="db.example.com", port=50051, secure=True)
# SELECT -- returns list[dict]
rows = db.query("SELECT * FROM users WHERE id = $1", [1])
# Full result with metadata
result = db.query_result("SELECT * FROM users")
print(f"Columns: {result.columns}, Count: {result.row_count}")
# Single row (returns dict or None)
user = db.query_one("SELECT * FROM users WHERE id = $1", [42])
# Execute DML/DDL -- returns rows affected
affected = db.execute(
"UPDATE users SET name = $1 WHERE id = $2",
["bob", 1],
)
# Bulk insert
db.bulk_insert(
"users",
["id", "name", "email"],
[
[2, "bob", "bob@example.com"],
[3, "charlie", "charlie@example.com"],
],
)

Redis-compatible KV API backed by ParticleDB SQL functions.

db = ParticleDB()
# Set / Get
db.kv.set("session:123", '{"user":"alice"}')
db.kv.set("temp:key", "value", ttl=3600) # expires in 1 hour
val = db.kv.get("session:123")
# Delete
db.kv.delete("session:123")
# Batch operations
db.kv.mset({"key1": "val1", "key2": "val2"})
values = db.kv.mget(["key1", "key2"])
# Atomic increment
db.kv.incr("counter")
db.kv.incr("counter", amount=5)
# Check existence and TTL
db.kv.exists("key1") # True
db.kv.ttl("temp:key") # seconds remaining
# Pattern matching
keys = db.kv.keys("session:*")

Nearest-neighbor similarity search on vector columns.

db = ParticleDB()
# Create a vector index
db.vector.create_index("documents", "embedding", "hnsw", m=16, ef_construction=200)
# Insert a document with a vector
db.vector.insert(
"documents",
{"id": 1, "title": "ParticleDB Overview"},
"embedding",
[0.1, 0.2, 0.3, 0.4],
)
# Search for nearest neighbors
results = db.vector.search(
"documents",
"embedding",
[0.1, 0.15, 0.28, 0.42],
k=5,
columns=["id", "title"],
)
for r in results:
print(f"{r['row']['title']} (distance: {r['distance']:.4f})")
# With a filter
results = db.vector.search(
"documents",
"embedding",
[0.1, 0.2, 0.3, 0.4],
k=10,
filter="category = 'tech'",
)

Combine vector search with text retrieval for question answering.

db = ParticleDB()
# Create a RAG pipeline
db.rag.create_pipeline(
"docs_qa",
table="documents",
vector_col="embedding",
text_col="content",
k=5,
mode="hybrid",
)
# Query using the pipeline
answer = db.rag.query("What is ParticleDB?", pipeline="docs_qa")
print(answer)
# Query without a pipeline (inline config)
answer = db.rag.query(
"How fast is ParticleDB?",
table="benchmarks",
vector_col="embedding",
text_col="content",
k=3,
)
# Ingest documents
db.rag.ingest(
"documents",
"ParticleDB is a high-performance HTAP database.",
[0.1, 0.2, 0.3],
metadata={"source": "docs", "page": 1},
)
from particledb import GrpcClient
with GrpcClient(host="localhost", port=50051) as client:
for event in client.stream_query("users", events=["INSERT", "UPDATE"]):
print(f"{event['event']} on {event['table']}: {event['payload']}")
# List all tables
print(db.tables())
# Get table schema
print(db.table("users"))
| Method | Description |
| --- | --- |
| `query(sql, params=[])` | Execute SELECT, return `list[dict]` |
| `query_result(sql, params=[])` | Execute SELECT, return `QueryResult` with metadata |
| `query_one(sql, params=[])` | Execute SELECT, return first row or `None` |
| `execute(sql, params=[])` | Execute DML/DDL, return `int` rows affected |
| `bulk_insert(table, columns, rows)` | Bulk insert via `POST /bulk` |
| `tables()` | List all tables |
| `table(name)` | Get table schema |
| `health()` | Health check |
| `.kv` | Access KV operations |
| `.vector` | Access Vector operations |
| `.rag` | Access RAG operations |
| Method | Description |
| --- | --- |
| `execute_sql(sql, params=[])` | Execute SQL, return full response `dict` |
| `query(sql, params=[])` | Execute SELECT, return `list[dict]` |
| `batch_execute(statements)` | Execute multiple SQL statements |
| `stream_query(table, events=[...])` | Subscribe to CDC events (generator) |
| `health()` | Health check |
| `close()` | Close the gRPC channel |
| Method | Description |
| --- | --- |
| `set(key, value, ttl=None)` | Store a key-value pair |
| `get(key)` | Retrieve a value |
| `delete(key)` | Delete a key |
| `mset(mapping)` | Set multiple key-value pairs |
| `mget(keys)` | Get multiple values |
| `incr(key, amount=1)` | Atomic increment |
| `exists(key)` | Check key existence |
| `ttl(key)` | Get remaining TTL |
| `keys(pattern="*")` | List keys by pattern |
| Method | Description |
| --- | --- |
| `search(table, column, query_vector, k=10, columns=None, filter=None)` | Nearest-neighbor search |
| `insert(table, data, vector_column, vector)` | Insert row with vector |
| `create_index(table, column, index_type="hnsw", **params)` | Create vector index |
| Method | Description |
| --- | --- |
| `query(question, pipeline=None, table=..., ...)` | RAG query |
| `ingest(table, content, embedding, ...)` | Ingest a document |
| `create_pipeline(name, table=..., ...)` | Create a named pipeline |