Database Architecture
Understanding GoPie's hybrid PostgreSQL and DuckDB architecture
GoPie employs a hybrid database architecture combining PostgreSQL for transactional operations and metadata management with DuckDB for high-performance analytical queries. This design provides the best of both worlds: ACID compliance for critical operations and blazing-fast analytics for large-scale data processing.
Architecture Overview
Dual Database System
When Each Database is Used
PostgreSQL
- Metadata Storage: Dataset info, schemas, descriptions
- User Management: Authentication, authorization, roles
- Transactional Data: Real-time updates, ACID compliance
- System State: Configuration, settings, audit trails
DuckDB
- Analytical Queries: Complex aggregations, window functions
- Large Scans: Full table scans, filtering large datasets
- Joins: Multi-table joins, especially on large tables
- Time Series: Date-based aggregations, trend analysis
PostgreSQL Implementation
Schema Design
Core Tables
-- Organizations and multi-tenancy
CREATE TABLE organizations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    settings JSONB DEFAULT '{}'
);

-- User management
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    organization_id UUID REFERENCES organizations(id),
    role VARCHAR(50) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Dataset metadata
CREATE TABLE datasets (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    organization_id UUID REFERENCES organizations(id),
    name VARCHAR(255) NOT NULL,
    description TEXT,
    schema JSONB NOT NULL,
    storage_path VARCHAR(500),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    deleted_at TIMESTAMPTZ,          -- soft delete; used by the partial index below
    duckdb_synced_at TIMESTAMPTZ,    -- last sync into DuckDB (see Data Synchronization)
    UNIQUE(organization_id, name)
);

-- Query history and caching
CREATE TABLE queries (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    dataset_id UUID REFERENCES datasets(id),
    user_id UUID REFERENCES users(id),
    query_text TEXT NOT NULL,
    query_type VARCHAR(50),
    execution_time_ms INTEGER,
    result_cache_key VARCHAR(255),
    created_at TIMESTAMPTZ DEFAULT NOW()
);
Performance Optimizations
Indexing Strategy
-- Multi-column indexes for common queries
CREATE INDEX idx_datasets_org_name
    ON datasets(organization_id, name);

CREATE INDEX idx_queries_user_created
    ON queries(user_id, created_at DESC);

-- Partial indexes for filtered queries
CREATE INDEX idx_active_datasets
    ON datasets(organization_id)
    WHERE deleted_at IS NULL;

-- GIN indexes for JSONB queries
CREATE INDEX idx_dataset_schema
    ON datasets USING GIN (schema);
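To verify that these indexes are actually used, the relevant query plans can be checked with EXPLAIN. A minimal sketch using asyncpg, with a placeholder DSN and organization id; the exact plan output depends on table size and statistics:

import asyncio
import asyncpg

DSN = "postgresql://gopie@localhost/gopie"  # placeholder connection string

async def show_plan() -> None:
    conn = await asyncpg.connect(DSN)
    try:
        # EXPLAIN returns one text row per plan line in the "QUERY PLAN" column
        plan = await conn.fetch(
            "EXPLAIN (ANALYZE, BUFFERS) "
            "SELECT id, name FROM datasets "
            "WHERE organization_id = '00000000-0000-0000-0000-000000000000' "
            "AND deleted_at IS NULL"
        )
        for line in plan:
            print(line["QUERY PLAN"])
    finally:
        await conn.close()

asyncio.run(show_plan())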
Connection Pooling
# PgBouncer configuration
[databases]
gopie = host=localhost dbname=gopie pool_mode=transaction
[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 25
reserve_pool_size = 5
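One client-side caveat with transaction pooling: server-side prepared statements cannot be reused across pooled transactions. A minimal sketch of an application pool that accounts for this, assuming asyncpg and a PgBouncer listener on port 6432 (both illustrative, not GoPie's exact configuration):

import asyncio
import asyncpg

async def main() -> None:
    # Disable asyncpg's prepared-statement cache when connecting through
    # PgBouncer in transaction pool mode.
    pool = await asyncpg.create_pool(
        "postgresql://gopie@localhost:6432/gopie",
        min_size=5,
        max_size=25,
        statement_cache_size=0,
    )
    async with pool.acquire() as conn:
        count = await conn.fetchval("SELECT COUNT(*) FROM datasets")
        print(f"datasets: {count}")
    await pool.close()

asyncio.run(main())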
Transactional Patterns
ACID Compliance
async def create_dataset_with_schema(dataset_info: dict):
    async with db.transaction():
        # Create dataset record (fetchrow returns the inserted row via RETURNING *)
        dataset = await db.fetchrow(
            """
            INSERT INTO datasets (organization_id, name, schema)
            VALUES ($1, $2, $3)
            RETURNING *
            """,
            dataset_info['org_id'],
            dataset_info['name'],
            dataset_info['schema']
        )

        # Create associated permissions
        await db.execute(
            """
            INSERT INTO dataset_permissions (dataset_id, role, permissions)
            VALUES ($1, 'owner', '["read", "write", "delete"]')
            """,
            dataset['id']
        )

        # Log the action
        await db.execute(
            """
            INSERT INTO audit_logs (action, resource_id, user_id)
            VALUES ('dataset.create', $1, $2)
            """,
            dataset['id'],
            current_user_id
        )

    return dataset
DuckDB Implementation
Embedded Architecture
In-Process Analytics
import duckdb

class DuckDBManager:
    def __init__(self):
        # In-process, in-memory DuckDB instance
        self.conn = duckdb.connect(':memory:')
        self.setup_extensions()

    def setup_extensions(self):
        # Enable useful extensions for remote files and Parquet
        self.conn.execute("INSTALL httpfs")
        self.conn.execute("LOAD httpfs")
        self.conn.execute("INSTALL parquet")
        self.conn.execute("LOAD parquet")
File-Based Storage
# Persistent DuckDB for large datasets
conn = duckdb.connect('/data/duckdb/analytics.db')
# Configure memory and threads
conn.execute("SET memory_limit='8GB'")
conn.execute("SET threads=4")
Data Loading Strategies
Direct Parquet Queries
-- Query Parquet files directly without loading
SELECT
    customer_id,
    SUM(amount) as total_sales,
    COUNT(*) as transaction_count
FROM read_parquet('s3://bucket/sales/*.parquet')
WHERE order_date >= '2024-01-01'
GROUP BY customer_id
ORDER BY total_sales DESC
LIMIT 100;
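Reading directly from S3 requires the httpfs extension plus region and credentials before read_parquet can reach the bucket. A hedged sketch of running the same query from Python, with placeholder bucket and credentials:

import duckdb

conn = duckdb.connect()
conn.execute("INSTALL httpfs")
conn.execute("LOAD httpfs")
# Placeholders: set these from your secret manager or environment
conn.execute("SET s3_region='us-east-1'")
conn.execute("SET s3_access_key_id='...'")
conn.execute("SET s3_secret_access_key='...'")

rows = conn.execute("""
    SELECT customer_id, SUM(amount) AS total_sales, COUNT(*) AS transaction_count
    FROM read_parquet('s3://bucket/sales/*.parquet')
    WHERE order_date >= '2024-01-01'
    GROUP BY customer_id
    ORDER BY total_sales DESC
    LIMIT 100
""").fetchall()
print(rows[:5])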
Materialized Tables and Views
-- Load S3 Parquet data into a local DuckDB table
CREATE TABLE sales AS
SELECT * FROM read_parquet('s3://bucket/sales/*.parquet');

-- Create view for complex transformations
CREATE VIEW sales_summary AS
SELECT
    DATE_TRUNC('month', order_date) as month,
    product_category,
    SUM(amount) as revenue,
    COUNT(DISTINCT customer_id) as unique_customers
FROM sales
GROUP BY 1, 2;
Query Optimization
Columnar Storage Benefits
# Efficient column scanning
query = """
    SELECT
        region,
        AVG(temperature) as avg_temp,
        MAX(temperature) as max_temp
    FROM weather_data
    WHERE station_type = 'AIRPORT'
    GROUP BY region
"""
# DuckDB only reads required columns
# Automatic SIMD vectorization
# Parallel execution
Advanced Analytics
-- Window functions for time series: 7-day moving average of daily totals
WITH daily_sales AS (
    SELECT
        date,
        SUM(amount) as total,
        AVG(SUM(amount)) OVER (
            ORDER BY date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) as moving_avg_7d
    FROM transactions
    GROUP BY date
)
SELECT * FROM daily_sales
WHERE date >= CURRENT_DATE - INTERVAL '30 days';
-- Complex joins with optimization
SELECT
    c.customer_name,
    p.product_name,
    SUM(o.quantity * o.price) as total_spent
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id
WHERE o.order_date >= '2024-01-01'
GROUP BY c.customer_name, p.product_name
HAVING SUM(o.quantity * o.price) > 1000
ORDER BY total_spent DESC;
Query Router
Decision Logic
class QueryRouter:
    def route_query(self, query: Query) -> Engine:
        # Parse query to understand requirements
        parsed = self.parse_query(query)

        # Metadata queries go to PostgreSQL
        if parsed.is_metadata_query:
            return PostgreSQLEngine()

        # Small datasets can use PostgreSQL
        if parsed.estimated_rows < 10000:
            return PostgreSQLEngine()

        # Analytical queries use DuckDB
        if parsed.has_aggregations or parsed.has_window_functions:
            return DuckDBEngine()

        # Large scans use DuckDB
        if parsed.is_full_table_scan:
            return DuckDBEngine()

        # Default to PostgreSQL for OLTP
        return PostgreSQLEngine()
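The router depends on a parse step that is not shown here. As a rough illustration of the kind of heuristics involved (a simplified sketch, not GoPie's actual parser), keyword checks over the SQL text can approximate the same signals; the row estimate is assumed to come from stored dataset metadata:

import re
from dataclasses import dataclass

@dataclass
class ParsedQuery:
    is_metadata_query: bool
    estimated_rows: int
    has_aggregations: bool
    has_window_functions: bool
    is_full_table_scan: bool

# Tables that live only in PostgreSQL (illustrative list)
METADATA_TABLES = {"datasets", "users", "organizations", "queries"}

def parse_query(sql: str, estimated_rows: int) -> ParsedQuery:
    # Crude keyword heuristics; a production parser would build a real AST.
    lowered = sql.lower()
    tables = set(re.findall(r"\bfrom\s+([a-z_][a-z0-9_]*)", lowered))
    return ParsedQuery(
        is_metadata_query=bool(tables) and tables <= METADATA_TABLES,
        estimated_rows=estimated_rows,
        has_aggregations=bool(re.search(r"\bgroup\s+by\b|\b(sum|avg|count|min|max)\s*\(", lowered)),
        has_window_functions=bool(re.search(r"\bover\s*\(", lowered)),
        is_full_table_scan="where" not in lowered,
    )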
Query Translation
import re

class QueryTranslator:
    def translate_for_duckdb(self, pg_query: str) -> str:
        # DuckDB accepts most PostgreSQL syntax directly (including ||
        # string concatenation), so only rewrite constructs it handles
        # differently.
        query = pg_query

        # Convert "= ANY(ARRAY[...])" membership tests to IN (...)
        query = re.sub(r"=\s*ANY\s*\(ARRAY\[(.*?)\]\)",
                       r"IN (\1)", query)

        # Normalize date functions
        query = query.replace("NOW()", "CURRENT_TIMESTAMP")

        return query
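A quick usage check of the rewrite rules (expected output shown as a comment):

translator = QueryTranslator()
pg_sql = "SELECT * FROM events WHERE type = ANY(ARRAY['click', 'view']) AND created_at < NOW()"
print(translator.translate_for_duckdb(pg_sql))
# SELECT * FROM events WHERE type IN ('click', 'view') AND created_at < CURRENT_TIMESTAMP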
Data Synchronization
Change Data Capture
class DataSync:
    async def sync_dataset_to_duckdb(self, dataset_id: str):
        # Get dataset metadata from PostgreSQL
        dataset = await self.pg.fetch_one(
            "SELECT * FROM datasets WHERE id = $1",
            dataset_id
        )

        # Load data into DuckDB (quote the table name: UUIDs contain hyphens)
        self.duckdb.execute(f"""
            CREATE TABLE "dataset_{dataset_id}" AS
            SELECT * FROM read_parquet('{dataset['storage_path']}')
        """)

        # Create indexes for common queries
        self.create_duckdb_indexes(dataset_id, dataset['schema'])

        # Update sync status
        await self.pg.execute(
            "UPDATE datasets SET duckdb_synced_at = NOW() WHERE id = $1",
            dataset_id
        )
Real-time Updates
# Use PostgreSQL for real-time data
async def record_event(event: dict):
    await pg.execute("""
        INSERT INTO events (type, data, created_at)
        VALUES ($1, $2, NOW())
    """, event['type'], json.dumps(event['data']))

# Batch sync to DuckDB for analytics
async def sync_events_batch():
    # Export recent events
    events = await pg.fetch("""
        SELECT * FROM events
        WHERE created_at > NOW() - INTERVAL '1 hour'
        AND NOT synced
    """)

    # Bulk insert to DuckDB
    duckdb.executemany(
        "INSERT INTO events_analytics VALUES (?, ?, ?)",
        events
    )

    # Mark as synced
    await pg.execute("""
        UPDATE events SET synced = true
        WHERE created_at > NOW() - INTERVAL '1 hour'
    """)
Performance Characteristics
PostgreSQL Performance
Strengths
- OLTP Operations: <10ms for simple queries
- Index Lookups: O(log n) B-tree performance
- Concurrent Writes: MVCC for high concurrency
- JSON Operations: Efficient JSONB indexing
Limitations
- Full table scans on large tables
- Complex analytical queries
- Large aggregations
- Limited parallelization
DuckDB Performance
Strengths
- Columnar Scans: 10-100x faster than row stores
- Vectorization: SIMD CPU instructions
- Parallel Execution: Multi-core utilization
- Compression: 10:1 typical compression ratios
Benchmarks
# 100M row table performance
Simple Aggregation:
    PostgreSQL: 45 seconds
    DuckDB: 0.8 seconds

Complex Join (3 tables):
    PostgreSQL: 120 seconds
    DuckDB: 3.2 seconds

Window Functions:
    PostgreSQL: 89 seconds
    DuckDB: 1.5 seconds
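These numbers are illustrative and vary with hardware, indexes, and configuration. A minimal sketch of how such a comparison could be timed, assuming the same sales table is loaded in both engines and using placeholder connection details:

import asyncio
import time

import asyncpg
import duckdb

SQL = "SELECT customer_id, SUM(amount) FROM sales GROUP BY customer_id"

async def time_postgres(dsn: str) -> float:
    conn = await asyncpg.connect(dsn)
    try:
        start = time.perf_counter()
        await conn.fetch(SQL)
        return time.perf_counter() - start
    finally:
        await conn.close()

def time_duckdb(path: str) -> float:
    conn = duckdb.connect(path)
    start = time.perf_counter()
    conn.execute(SQL).fetchall()
    return time.perf_counter() - start

pg_s = asyncio.run(time_postgres("postgresql://gopie@localhost/gopie"))
duck_s = time_duckdb("/data/duckdb/analytics.db")
print(f"PostgreSQL: {pg_s:.2f}s  DuckDB: {duck_s:.2f}s")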
Storage Management
Tiered Storage
class StorageManager:
    def determine_storage_tier(self, dataset: Dataset) -> StorageTier:
        # Hot data in PostgreSQL
        if dataset.access_frequency > 100:
            return PostgreSQLTier()

        # Warm data in DuckDB
        if dataset.size < 10_000_000:
            return DuckDBTier()

        # Cold data in S3/Parquet
        return S3ParquetTier()
Compression Strategies
-- DuckDB automatic compression
CREATE TABLE sales_compressed AS
SELECT * FROM read_csv('sales.csv', AUTO_DETECT=TRUE);

-- Inspect which compression scheme was chosen for each column segment
SELECT
    column_name,
    segment_type,
    compression,
    count
FROM pragma_storage_info('sales_compressed');
-- Overall on-disk size is reported by: PRAGMA database_size;
Backup and Recovery
PostgreSQL Backup
# postgresql.conf: continuous WAL archiving to S3
archive_mode = on
archive_command = 'aws s3 cp %p s3://backup/wal/%f'

# Shell: daily base backups
pg_basebackup -D /backup/base -Ft -z -P
DuckDB Backup
from datetime import date

import duckdb

# Export to Parquet for backup
def backup_duckdb_table(table_name: str):
    duckdb.execute(f"""
        COPY {table_name}
        TO 's3://backup/duckdb/{table_name}_{date.today()}.parquet'
        (FORMAT PARQUET, COMPRESSION ZSTD)
    """)
Best Practices
Query Optimization
- Use Appropriate Engine: Let the router decide
- Partition Large Tables: Time-based partitioning
- Maintain Statistics: Regular ANALYZE
- Monitor Performance: Track slow queries
Data Modeling
- Normalize in PostgreSQL: 3NF for transactions
- Denormalize in DuckDB: Star schema for analytics
- Use Appropriate Types: Optimize storage
- Document Schemas: Clear column descriptions
Future Enhancements
Planned Features
- Automatic query rewriting
- Intelligent caching layer
- Cross-database joins
- GPU acceleration for DuckDB
Research Areas
- Lakehouse architecture
- Real-time OLAP
- Distributed DuckDB
- Hybrid transactional/analytical processing
Next Steps
- Learn about MCP Servers for AI integration
- Review Data Pipeline for processing details
- Explore Multi-tenancy for isolation strategies