Database Architecture

Understanding GoPie's hybrid PostgreSQL and DuckDB architecture

GoPie employs a hybrid database architecture combining PostgreSQL for transactional operations and metadata management with DuckDB for high-performance analytical queries. This design pairs ACID guarantees for critical operations with fast, columnar analytics over large datasets.

Architecture Overview

Dual Database System

PostgreSQL acts as the system of record for metadata and transactional state, while DuckDB runs as an embedded analytical engine; a query router (described below) directs each request to the engine best suited for it.

When Each Database is Used

PostgreSQL

  • Metadata Storage: Dataset info, schemas, descriptions
  • User Management: Authentication, authorization, roles
  • Transactional Data: Real-time updates, ACID compliance
  • System State: Configuration, settings, audit trails

DuckDB

  • Analytical Queries: Complex aggregations, window functions
  • Large Scans: Full table scans, filtering large datasets
  • Joins: Multi-table joins, especially on large tables
  • Time Series: Date-based aggregations, trend analysis
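
To make the split concrete, the sketch below shows both paths side by side: a metadata lookup answered by PostgreSQL and an aggregation pushed to DuckDB. The connection objects and query shapes are illustrative, not GoPie's actual API.

import uuid

import asyncpg
import duckdb

async def describe_dataset(pg_pool: asyncpg.Pool, dataset_id: uuid.UUID) -> dict:
    # Metadata: a point lookup served by PostgreSQL
    row = await pg_pool.fetchrow(
        "SELECT name, description, schema FROM datasets WHERE id = $1",
        dataset_id,
    )
    return dict(row)

def monthly_revenue(duck: duckdb.DuckDBPyConnection) -> list:
    # Analytics: a columnar scan and aggregation handled by DuckDB
    return duck.execute("""
        SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS revenue
        FROM read_parquet('s3://bucket/sales/*.parquet')
        GROUP BY 1
        ORDER BY 1
    """).fetchall()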

PostgreSQL Implementation

Schema Design

Core Tables

-- Organizations and multi-tenancy
CREATE TABLE organizations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    settings JSONB DEFAULT '{}'
);

-- User management
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    organization_id UUID REFERENCES organizations(id),
    role VARCHAR(50) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Dataset metadata
CREATE TABLE datasets (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    organization_id UUID REFERENCES organizations(id),
    name VARCHAR(255) NOT NULL,
    description TEXT,
    schema JSONB NOT NULL,
    storage_path VARCHAR(500),
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    deleted_at TIMESTAMPTZ,            -- soft-delete marker (see partial index below)
    duckdb_synced_at TIMESTAMPTZ,      -- last sync into DuckDB (see Data Synchronization)
    UNIQUE(organization_id, name)
);

-- Query history and caching
CREATE TABLE queries (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    dataset_id UUID REFERENCES datasets(id),
    user_id UUID REFERENCES users(id),
    query_text TEXT NOT NULL,
    query_type VARCHAR(50),
    execution_time_ms INTEGER,
    result_cache_key VARCHAR(255),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

Performance Optimizations

Indexing Strategy

-- Multi-column indexes for common queries
CREATE INDEX idx_datasets_org_name 
    ON datasets(organization_id, name);

CREATE INDEX idx_queries_user_created 
    ON queries(user_id, created_at DESC);

-- Partial indexes for filtered queries
CREATE INDEX idx_active_datasets 
    ON datasets(organization_id) 
    WHERE deleted_at IS NULL;

-- GIN indexes for JSONB queries
CREATE INDEX idx_dataset_schema 
    ON datasets USING GIN (schema);

Connection Pooling

# PgBouncer configuration
[databases]
gopie = host=localhost dbname=gopie pool_mode=transaction

[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 25
reserve_pool_size = 5
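
On the application side, connections go through PgBouncer rather than straight to PostgreSQL. A minimal sketch with asyncpg follows; the DSN and pool sizes are placeholders, and statement_cache_size=0 is needed because transaction pooling cannot reuse server-side prepared statements across connections.

import asyncpg

async def init_pg_pool() -> asyncpg.Pool:
    # Connect to PgBouncer (default port 6432), not the PostgreSQL port
    return await asyncpg.create_pool(
        dsn="postgresql://gopie:secret@localhost:6432/gopie",  # placeholder credentials
        min_size=5,
        max_size=25,                # stays within default_pool_size above
        statement_cache_size=0,     # required with pool_mode = transaction
    )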

Transactional Patterns

ACID Compliance

async def create_dataset_with_schema(dataset_info: dict, current_user_id: str):
    async with db.transaction():
        # Create dataset record and return the inserted row
        dataset = await db.fetch_one(
            """
            INSERT INTO datasets (organization_id, name, schema)
            VALUES ($1, $2, $3)
            RETURNING *
            """,
            dataset_info['org_id'],
            dataset_info['name'],
            dataset_info['schema']
        )
        
        # Create associated permissions
        await db.execute(
            """
            INSERT INTO dataset_permissions (dataset_id, role, permissions)
            VALUES ($1, 'owner', '["read", "write", "delete"]')
            """,
            dataset['id']
        )
        
        # Log the action
        await db.execute(
            """
            INSERT INTO audit_logs (action, resource_id, user_id)
            VALUES ('dataset.create', $1, $2)
            """,
            dataset['id'],
            current_user_id
        )
        
        return dataset

DuckDB Implementation

Embedded Architecture

In-Process Analytics

import duckdb

class DuckDBManager:
    def __init__(self):
        self.conn = duckdb.connect(':memory:')
        self.setup_extensions()
    
    def setup_extensions(self):
        # Enable useful extensions
        self.conn.execute("INSTALL httpfs")
        self.conn.execute("LOAD httpfs")
        self.conn.execute("INSTALL parquet")
        self.conn.execute("LOAD parquet")

File-Based Storage

# Persistent DuckDB for large datasets
conn = duckdb.connect('/data/duckdb/analytics.db')

# Configure memory and threads
conn.execute("SET memory_limit='8GB'")
conn.execute("SET threads=4")

Data Loading Strategies

Direct Parquet Queries

-- Query Parquet files directly without loading
SELECT 
    customer_id,
    SUM(amount) as total_sales,
    COUNT(*) as transaction_count
FROM read_parquet('s3://bucket/sales/*.parquet')
WHERE order_date >= '2024-01-01'
GROUP BY customer_id
ORDER BY total_sales DESC
LIMIT 100;
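
Reading s3:// paths requires the httpfs extension (loaded in DuckDBManager above) plus S3 credentials. A minimal sketch with static credentials; the region and keys are placeholders and would normally come from the environment or a secrets store.

import duckdb

conn = duckdb.connect()
conn.execute("INSTALL httpfs")
conn.execute("LOAD httpfs")

# Placeholder credentials for the S3 bucket holding the Parquet files
conn.execute("SET s3_region='us-east-1'")
conn.execute("SET s3_access_key_id='AKIA...'")
conn.execute("SET s3_secret_access_key='...'")

top_customers = conn.execute("""
    SELECT customer_id, SUM(amount) AS total_sales
    FROM read_parquet('s3://bucket/sales/*.parquet')
    WHERE order_date >= '2024-01-01'
    GROUP BY customer_id
    ORDER BY total_sales DESC
    LIMIT 100
""").fetchall()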

External Tables

-- Materialize S3 Parquet data into a local DuckDB table
CREATE TABLE sales AS 
SELECT * FROM read_parquet('s3://bucket/sales/*.parquet');

-- Create view for complex transformations
CREATE VIEW sales_summary AS
SELECT 
    DATE_TRUNC('month', order_date) as month,
    product_category,
    SUM(amount) as revenue,
    COUNT(DISTINCT customer_id) as unique_customers
FROM sales
GROUP BY 1, 2;

Query Optimization

Columnar Storage Benefits

# Efficient column scanning
query = """
SELECT 
    region,
    AVG(temperature) as avg_temp,
    MAX(temperature) as max_temp
FROM weather_data
WHERE station_type = 'AIRPORT'
GROUP BY region
"""

# DuckDB only reads required columns
# Automatic SIMD vectorization
# Parallel execution

Advanced Analytics

-- Window functions for time series
-- (aggregate per day first, then apply the window over the daily totals)
WITH daily_sales AS (
    SELECT 
        date,
        SUM(amount) as total
    FROM transactions
    GROUP BY date
)
SELECT 
    date,
    total,
    AVG(total) OVER (
        ORDER BY date 
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_7d
FROM daily_sales
WHERE date >= CURRENT_DATE - INTERVAL '30 days';

-- Complex joins with optimization
SELECT 
    c.customer_name,
    p.product_name,
    SUM(o.quantity * o.price) as total_spent
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id
WHERE o.order_date >= '2024-01-01'
GROUP BY c.customer_name, p.product_name
HAVING SUM(o.quantity * o.price) > 1000
ORDER BY total_spent DESC;

Query Router

Decision Logic

class QueryRouter:
    def route_query(self, query: Query) -> Engine:
        # Parse query to understand requirements
        parsed = self.parse_query(query)
        
        # Metadata queries go to PostgreSQL
        if parsed.is_metadata_query:
            return PostgreSQLEngine()
        
        # Small datasets can use PostgreSQL
        if parsed.estimated_rows < 10000:
            return PostgreSQLEngine()
        
        # Analytical queries use DuckDB
        if parsed.has_aggregations or parsed.has_window_functions:
            return DuckDBEngine()
        
        # Large scans use DuckDB
        if parsed.is_full_table_scan:
            return DuckDBEngine()
        
        # Default to PostgreSQL for OLTP
        return PostgreSQLEngine()

Query Translation

import re

class QueryTranslator:
    def translate_for_duckdb(self, pg_query: str) -> str:
        # Handle PostgreSQL-specific syntax with simple textual rewrites
        # (a production translator would operate on a parsed AST)
        query = pg_query
        
        # Convert string concatenation: a || b  ->  CONCAT(a, b)
        query = re.sub(r"(\S+)\s*\|\|\s*(\S+)",
                       r"CONCAT(\1, \2)", query)
        
        # Convert array membership: col = ANY(ARRAY[...])  ->  col IN (...)
        query = re.sub(r"=\s*ANY\s*\(ARRAY\[(.*?)\]\)", 
                       r"IN (\1)", query)
        
        # Handle date functions
        query = query.replace("NOW()", "CURRENT_TIMESTAMP")
        
        return query

Data Synchronization

Change Data Capture

class DataSync:
    async def sync_dataset_to_duckdb(self, dataset_id: str):
        # Get dataset metadata from PostgreSQL
        dataset = await self.pg.fetch_one(
            "SELECT * FROM datasets WHERE id = $1",
            dataset_id
        )
        
        # Load data into DuckDB (quote the identifier: UUIDs contain hyphens)
        self.duckdb.execute(f"""
            CREATE TABLE "dataset_{dataset_id}" AS
            SELECT * FROM read_parquet('{dataset['storage_path']}')
        """)
        
        # Create indexes for common queries
        self.create_duckdb_indexes(dataset_id, dataset['schema'])
        
        # Update sync status
        await self.pg.execute(
            "UPDATE datasets SET duckdb_synced_at = NOW() WHERE id = $1",
            dataset_id
        )

Real-time Updates

# Use PostgreSQL for real-time data
import json

async def record_event(event: dict):
    await pg.execute("""
        INSERT INTO events (type, data, created_at)
        VALUES ($1, $2, NOW())
    """, event['type'], json.dumps(event['data']))

# Batch sync to DuckDB for analytics
async def sync_events_batch():
    # Export recent, not-yet-synced events
    events = await pg.fetch("""
        SELECT type, data, created_at FROM events 
        WHERE created_at > NOW() - INTERVAL '1 hour'
        AND NOT synced
    """)
    
    # Bulk insert to DuckDB (convert records to plain tuples)
    duckdb.executemany(
        "INSERT INTO events_analytics VALUES (?, ?, ?)",
        [tuple(e) for e in events]
    )
    
    # Mark the exported window as synced (same filter as the export)
    await pg.execute("""
        UPDATE events SET synced = true 
        WHERE created_at > NOW() - INTERVAL '1 hour'
        AND NOT synced
    """)

Performance Characteristics

PostgreSQL Performance

Strengths

  • OLTP Operations: <10ms for simple queries
  • Index Lookups: O(log n) B-tree performance
  • Concurrent Writes: MVCC for high concurrency
  • JSON Operations: Efficient JSONB indexing

Limitations

  • Full table scans on large tables
  • Complex analytical queries
  • Large aggregations
  • Limited parallelization

DuckDB Performance

Strengths

  • Columnar Scans: 10-100x faster than row stores
  • Vectorization: SIMD CPU instructions
  • Parallel Execution: Multi-core utilization
  • Compression: 10:1 typical compression ratios

Benchmarks

# 100M row table performance
Simple Aggregation:
  PostgreSQL: 45 seconds
  DuckDB: 0.8 seconds

Complex Join (3 tables):
  PostgreSQL: 120 seconds
  DuckDB: 3.2 seconds

Window Functions:
  PostgreSQL: 89 seconds
  DuckDB: 1.5 seconds

Storage Management

Tiered Storage

class StorageManager:
    def determine_storage_tier(self, dataset: Dataset) -> StorageTier:
        # Hot data in PostgreSQL
        if dataset.access_frequency > 100:
            return PostgreSQLTier()
        
        # Warm data in DuckDB
        if dataset.size < 10_000_000:
            return DuckDBTier()
        
        # Cold data in S3/Parquet
        return S3ParquetTier()

Compression Strategies

-- DuckDB automatic compression
CREATE TABLE sales_compressed AS
SELECT * FROM read_csv('sales.csv', AUTO_DETECT=TRUE);

-- Check compression ratio (introspection columns vary by DuckDB version)
SELECT 
    table_name,
    compressed_size,
    actual_size,
    actual_size::FLOAT / compressed_size AS compression_ratio
FROM duckdb_tables()
WHERE table_name = 'sales_compressed';

Backup and Recovery

PostgreSQL Backup

# postgresql.conf: continuous WAL archiving
archive_mode = on
archive_command = 'aws s3 cp %p s3://backup/wal/%f'

# Daily base backups
pg_basebackup -D /backup/base -Ft -z -P

DuckDB Backup

# Export to Parquet for backup
import duckdb
from datetime import date

def backup_duckdb_table(table_name: str):
    duckdb.execute(f"""
        COPY {table_name} 
        TO 's3://backup/duckdb/{table_name}_{date.today()}.parquet'
        (FORMAT PARQUET, COMPRESSION ZSTD)
    """)

Best Practices

Query Optimization

  1. Use Appropriate Engine: Let the router decide
  2. Partition Large Tables: Time-based partitioning
  3. Maintain Statistics: Regular ANALYZE
  4. Monitor Performance: Track slow queries (see the maintenance sketch below)
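
For items 3 and 4, a minimal maintenance sketch is shown below; it assumes the pg_stat_statements extension is installed and uses the PostgreSQL 13+ column names.

async def maintenance_pass(pg_pool):
    # Refresh planner statistics on the hot metadata tables
    await pg_pool.execute("ANALYZE datasets")
    await pg_pool.execute("ANALYZE queries")

    # Surface the slowest statements (requires pg_stat_statements)
    return await pg_pool.fetch("""
        SELECT query, calls, mean_exec_time
        FROM pg_stat_statements
        ORDER BY mean_exec_time DESC
        LIMIT 20
    """)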

Data Modeling

  1. Normalize in PostgreSQL: 3NF for transactions
  2. Denormalize in DuckDB: Star schema for analytics (see the sketch after this list)
  3. Use Appropriate Types: Optimize storage
  4. Document Schemas: Clear column descriptions
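
For item 2, denormalization in DuckDB usually means joining the fact and dimension tables once into a wide table so analytical queries avoid repeated joins. A sketch using the illustrative tables from the join example above:

import duckdb

conn = duckdb.connect('/data/duckdb/analytics.db')

# One-time join into a wide, analytics-friendly table; later queries
# scan only the columns they need from this single table.
conn.execute("""
    CREATE OR REPLACE TABLE order_facts AS
    SELECT
        o.order_date,
        c.customer_name,
        p.product_name,
        o.quantity,
        o.price,
        o.quantity * o.price AS line_total
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    JOIN products p ON o.product_id = p.id
""")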

Future Enhancements

Planned Features

  • Automatic query rewriting
  • Intelligent caching layer
  • Cross-database joins
  • GPU acceleration for DuckDB

Research Areas

  • Lakehouse architecture
  • Real-time OLAP
  • Distributed DuckDB
  • Hybrid transactional/analytical processing

Next Steps