Storage Configuration

Setting up and configuring storage systems for GoPie

GoPie uses S3-compatible object storage for storing datasets, exports, and other files. This guide covers storage setup and configuration options.

Storage Architecture

Overview

GoPie's storage system handles:

  • Dataset files - CSV, JSON, Parquet, Excel files
  • Export files - Query results and reports
  • Temporary files - Processing and cache files
  • User uploads - Direct file uploads

MinIO Setup (Development)

Docker Deployment

# docker-compose.yml
# Local MinIO instance exposing the S3 API (9000) and the web console (9001).
services:
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"  # S3 API
      - "9001:9001"  # Web Console
    environment:
      # Development-only credentials; never reuse them outside a local setup.
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
      # NOTE(review): MINIO_DEFAULT_BUCKETS is a Bitnami-image convention and
      # appears to be ignored by the official minio/minio image; the buckets
      # are created explicitly with `mc mb` in the Initial Setup step.
      MINIO_DEFAULT_BUCKETS: gopie-datasets:public,gopie-exports:private
    volumes:
      - minio_data:/data
    healthcheck:
      # Recent minio/minio images no longer ship curl; `mc ready` is the
      # health probe bundled with the image.
      test: ["CMD", "mc", "ready", "local"]
      interval: 30s
      timeout: 20s
      retries: 3

# Named volumes referenced by a service must be declared at the top level,
# otherwise Compose rejects the file.
volumes:
  minio_data:

Initial Setup

# Install MinIO client
brew install minio-mc  # macOS
# or (Linux): download the binary and place it on PATH so that the
# `mc ...` commands below resolve
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
sudo mv mc /usr/local/bin/mc

# Configure MinIO client (credentials match the docker-compose defaults)
mc alias set local http://localhost:9000 minioadmin minioadmin

# Create buckets (--ignore-existing keeps the step safe to re-run)
mc mb --ignore-existing local/gopie-datasets
mc mb --ignore-existing local/gopie-exports
mc mb --ignore-existing local/gopie-temp

# Set bucket policies
mc anonymous set download local/gopie-datasets  # Public read
mc anonymous set none local/gopie-exports       # Private

Bucket Structure

gopie-datasets/
├── project-uuid/
│   ├── dataset-uuid/
│   │   ├── original/
│   │   │   └── data.csv
│   │   ├── processed/
│   │   │   ├── data.parquet
│   │   │   └── metadata.json
│   │   └── samples/
│   │       └── sample_100.csv
│   └── dataset-uuid-2/
│       └── ...

gopie-exports/
├── user-uuid/
│   ├── export-uuid/
│   │   ├── results.csv
│   │   └── query.sql
│   └── export-uuid-2/
│       └── ...

gopie-temp/
├── upload-uuid/
│   └── processing/
└── cache/

AWS S3 Configuration (Production)

IAM Setup

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket",
        "s3:GetBucketLocation",
        "s3:GetObjectVersion",
        "s3:PutObjectAcl",
        "s3:GetObjectAcl"
      ],
      "Resource": [
        "arn:aws:s3:::gopie-datasets/*",
        "arn:aws:s3:::gopie-exports/*",
        "arn:aws:s3:::gopie-temp/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation",
        "s3:ListBucketVersions"
      ],
      "Resource": [
        "arn:aws:s3:::gopie-datasets",
        "arn:aws:s3:::gopie-exports",
        "arn:aws:s3:::gopie-temp"
      ]
    }
  ]
}

Bucket Configuration

# Create buckets
# One bucket per storage role (datasets, exports, temp), all in us-east-1.
aws s3 mb s3://gopie-datasets --region us-east-1
aws s3 mb s3://gopie-exports --region us-east-1
aws s3 mb s3://gopie-temp --region us-east-1

# Enable versioning
# Only gopie-datasets is versioned here.
aws s3api put-bucket-versioning \
  --bucket gopie-datasets \
  --versioning-configuration Status=Enabled

# Configure lifecycle rules
# Applies the rules stored in ./lifecycle.json to gopie-temp.
# NOTE(review): presumably lifecycle.json is the document shown under
# "Lifecycle Rules"; its prefixes ("temp/", "exports/") only match keys that
# start with those strings inside this bucket — confirm against the key layout.
aws s3api put-bucket-lifecycle-configuration \
  --bucket gopie-temp \
  --lifecycle-configuration file://lifecycle.json

Lifecycle Rules

{
  "Rules": [
    {
      "ID": "DeleteTempFiles",
      "Status": "Enabled",
      "Prefix": "temp/",
      "Expiration": {
        "Days": 1
      }
    },
    {
      "ID": "TransitionOldExports",
      "Status": "Enabled",
      "Prefix": "exports/",
      "Transitions": [
        {
          "Days": 30,
          "StorageClass": "STANDARD_IA"
        },
        {
          "Days": 90,
          "StorageClass": "GLACIER"
        }
      ]
    },
    {
      "ID": "DeleteOldVersions",
      "Status": "Enabled",
      "Prefix": "",
      "NoncurrentVersionExpiration": {
        "NoncurrentDays": 30
      }
    }
  ]
}

Storage Client Configuration

Go Backend

// server/infrastructure/storage/s3_client.go
package storage

import (
    "context"
    "io"
    "time"
    
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// S3Config carries the connection settings that NewS3Client uses to build a
// client for one bucket of an S3-compatible store.
type S3Config struct {
    // Endpoint is a custom service URL (for example a MinIO server).
    // Leave it empty when talking to AWS S3 itself.
    Endpoint        string
    // AccessKey and SecretKey are the static credentials used to sign requests.
    AccessKey       string
    SecretKey       string
    // Region is passed to the SDK as the region and used as the signing
    // region for the custom endpoint.
    Region          string
    // BucketName is the bucket every S3Client method operates on.
    BucketName      string
    // UseSSL is not read by NewS3Client in this file.
    // NOTE(review): presumably the scheme is carried by Endpoint — confirm.
    UseSSL          bool
    // ForcePathStyle is copied to the client's UsePathStyle option.
    ForcePathStyle  bool  // Required for MinIO
}

// S3Client wraps an SDK S3 client together with the single bucket that all of
// its methods target.
type S3Client struct {
    // client is the underlying AWS SDK v2 S3 client.
    client     *s3.Client
    // bucketName is sent as the Bucket of every request.
    bucketName string
}

// NewS3Client builds an S3Client bound to cfg.BucketName.
//
// Optional settings are applied only when they are actually provided:
//   - cfg.Endpoint: when non-empty, every request is routed to that URL
//     (MinIO or another S3-compatible service); when empty, the SDK's normal
//     AWS endpoint resolution is used.
//   - cfg.AccessKey / cfg.SecretKey: when an access key is set, the pair is
//     used as static credentials; when it is empty, the SDK's default
//     credential chain (environment, shared config, instance/role
//     credentials) stays in effect instead of being overridden by blank keys.
//
// cfg.ForcePathStyle is forwarded to the client's UsePathStyle option.
// cfg.UseSSL is not consulted here. Any error from loading the SDK
// configuration is returned unchanged.
func NewS3Client(cfg S3Config) (*S3Client, error) {
    loadOpts := []func(*config.LoadOptions) error{
        config.WithRegion(cfg.Region),
    }

    // Custom resolver for MinIO / S3-compatible endpoints.
    if cfg.Endpoint != "" {
        customResolver := aws.EndpointResolverWithOptionsFunc(
            func(service, region string, options ...interface{}) (aws.Endpoint, error) {
                return aws.Endpoint{
                    URL:               cfg.Endpoint,
                    SigningRegion:     cfg.Region,
                    HostnameImmutable: true,
                }, nil
            },
        )
        loadOpts = append(loadOpts, config.WithEndpointResolverWithOptions(customResolver))
    }

    // Only override the default credential chain when static keys were given;
    // signing with empty keys would make every request fail on AWS.
    if cfg.AccessKey != "" {
        loadOpts = append(loadOpts, config.WithCredentialsProvider(
            aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) {
                return aws.Credentials{
                    AccessKeyID:     cfg.AccessKey,
                    SecretAccessKey: cfg.SecretKey,
                }, nil
            }),
        ))
    }

    awsCfg, err := config.LoadDefaultConfig(context.Background(), loadOpts...)
    if err != nil {
        return nil, err
    }

    // Create S3 client
    client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
        o.UsePathStyle = cfg.ForcePathStyle
    })

    return &S3Client{
        client:     client,
        bucketName: cfg.BucketName,
    }, nil
}

// UploadFile writes the contents of reader to the client's bucket under key,
// recording contentType as the object's Content-Type. The data is sent with a
// single PutObject call; no progress reporting is performed. The error from
// PutObject, if any, is returned as-is.
func (c *S3Client) UploadFile(ctx context.Context, key string, reader io.Reader, contentType string) error {
    input := s3.PutObjectInput{
        Bucket:      &c.bucketName,
        Key:         &key,
        Body:        reader,
        ContentType: &contentType,
    }
    if _, putErr := c.client.PutObject(ctx, &input); putErr != nil {
        return putErr
    }
    return nil
}

// GetPresignedUploadURL returns a presigned URL that allows an HTTP PUT of
// the object key in the client's bucket. The URL stops being valid after
// expiry. Errors from the presigner are returned unchanged.
func (c *S3Client) GetPresignedUploadURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
    // The SDK names are NewPresignClient / PresignPutObject; the expiry is
    // supplied through the WithPresignExpires option.
    presigner := s3.NewPresignClient(c.client)

    request, err := presigner.PresignPutObject(ctx, &s3.PutObjectInput{
        Bucket: &c.bucketName,
        Key:    &key,
    }, s3.WithPresignExpires(expiry))
    if err != nil {
        return "", err
    }

    return request.URL, nil
}

Python Chat Server

# chat-server/app/services/storage_service.py
import boto3
from botocore.client import Config
from typing import BinaryIO, Optional
import os

class StorageService:
    """Thin wrapper around a boto3 S3 client bound to a single bucket.

    Connection settings are read from the environment: S3_ENDPOINT,
    S3_ACCESS_KEY, S3_SECRET_KEY, S3_REGION (default ``us-east-1``) and
    S3_BUCKET_NAME.
    """

    def __init__(self):
        self.s3_client = boto3.client(
            's3',
            endpoint_url=os.getenv('S3_ENDPOINT'),
            aws_access_key_id=os.getenv('S3_ACCESS_KEY'),
            aws_secret_access_key=os.getenv('S3_SECRET_KEY'),
            region_name=os.getenv('S3_REGION', 'us-east-1'),
            config=Config(
                signature_version='s3v4',
                s3={'addressing_style': 'path'}  # For MinIO
            )
        )
        self.bucket_name = os.getenv('S3_BUCKET_NAME')

    async def _run_blocking(self, func, *args, **kwargs):
        """Run a blocking callable in the default executor and await its result.

        boto3 calls are synchronous; awaiting them directly inside a coroutine
        would stall the event loop for the whole transfer.
        """
        import asyncio
        import functools

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, functools.partial(func, *args, **kwargs)
        )

    async def upload_file(
        self,
        file: BinaryIO,
        key: str,
        content_type: Optional[str] = None
    ) -> str:
        """Upload a file-like object to the bucket under ``key``.

        Args:
            file: Binary file-like object to read the payload from.
            key: Object key inside the bucket.
            content_type: Optional Content-Type stored with the object.

        Returns:
            The ``s3://bucket/key`` URI of the uploaded object.
        """
        extra_args = {}
        if content_type:
            extra_args['ContentType'] = content_type

        await self._run_blocking(
            self.s3_client.upload_fileobj,
            file,
            self.bucket_name,
            key,
            ExtraArgs=extra_args
        )

        return f"s3://{self.bucket_name}/{key}"

    async def download_file(self, key: str) -> bytes:
        """Download the object stored under ``key`` and return its bytes."""

        def _fetch() -> bytes:
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=key
            )
            body = response['Body']
            try:
                return body.read()
            finally:
                # Release the underlying HTTP connection back to the pool.
                body.close()

        return await self._run_blocking(_fetch)

    def generate_presigned_url(
        self,
        key: str,
        expiration: int = 3600,
        http_method: str = 'GET'
    ) -> str:
        """Generate a presigned URL for ``key``.

        Args:
            key: Object key inside the bucket.
            expiration: Lifetime of the URL in seconds.
            http_method: ``'GET'`` (any letter case) yields a download URL;
                every other value yields an upload (``put_object``) URL.
        """
        client_method = 'get_object' if http_method.upper() == 'GET' else 'put_object'
        return self.s3_client.generate_presigned_url(
            ClientMethod=client_method,
            Params={'Bucket': self.bucket_name, 'Key': key},
            ExpiresIn=expiration
        )

File Upload Strategies

Direct Upload (Small Files)

// web/src/services/storage.ts
/**
 * Sends `file` and `projectId` to `/api/upload` as multipart form data.
 *
 * @returns the `fileId` field of the JSON response.
 * @throws Error('Upload failed') when the response status is not OK.
 */
export async function uploadFile(file: File, projectId: string): Promise<string> {
  const payload = new FormData();
  payload.append('file', file);
  payload.append('projectId', projectId);

  const res = await fetch('/api/upload', { method: 'POST', body: payload });
  if (!res.ok) {
    throw new Error('Upload failed');
  }

  const { fileId } = await res.json();
  return fileId;
}

Multipart Upload (Large Files)

// server/application/services/multipart_upload.go
// InitiateMultipartUpload starts a multipart upload for key in the service's
// bucket and returns the SDK output and error of CreateMultipartUpload
// unchanged.
func (s *StorageService) InitiateMultipartUpload(ctx context.Context, key string) (*s3.CreateMultipartUploadOutput, error) {
    input := &s3.CreateMultipartUploadInput{
        Bucket: &s.bucketName,
        Key:    &key,
    }
    return s.client.CreateMultipartUpload(ctx, input)
}

// UploadPart sends data as part number partNumber of the multipart upload
// identified by uploadId for key, returning the SDK's UploadPart result and
// error unchanged.
func (s *StorageService) UploadPart(ctx context.Context, key string, uploadId string, partNumber int32, data io.Reader) (*s3.UploadPartOutput, error) {
    input := &s3.UploadPartInput{
        Bucket:     &s.bucketName,
        Key:        &key,
        UploadId:   &uploadId,
        PartNumber: &partNumber,
        Body:       data,
    }
    return s.client.UploadPart(ctx, input)
}

// CompleteMultipartUpload finalizes the multipart upload identified by
// uploadId for key, using parts as the list of completed parts. Only the
// error from the SDK call is returned; the output is discarded.
func (s *StorageService) CompleteMultipartUpload(ctx context.Context, key string, uploadId string, parts []types.CompletedPart) error {
    completed := types.CompletedMultipartUpload{Parts: parts}
    input := &s3.CompleteMultipartUploadInput{
        Bucket:          &s.bucketName,
        Key:             &key,
        UploadId:        &uploadId,
        MultipartUpload: &completed,
    }
    if _, completeErr := s.client.CompleteMultipartUpload(ctx, input); completeErr != nil {
        return completeErr
    }
    return nil
}

Presigned URL Upload

// web/src/services/presigned-upload.ts
/**
 * Uploads `file` straight to object storage through a presigned URL.
 *
 * Steps: (1) ask the backend for a presigned URL, (2) PUT the file to that
 * URL, (3) tell the backend the upload finished. Each step must succeed
 * before the next one runs, so a failed presign or a failed PUT is never
 * confirmed as a completed upload.
 *
 * @returns the `fileId` issued by the backend.
 * @throws Error when any of the three requests returns a non-OK status.
 */
export async function uploadWithPresignedUrl(file: File, projectId: string) {
  // Get presigned URL from backend
  const response = await fetch('/api/upload/presigned', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      fileName: file.name,
      fileSize: file.size,
      contentType: file.type,
      projectId,
    }),
  });

  if (!response.ok) {
    throw new Error(`Failed to obtain presigned URL (HTTP ${response.status})`);
  }

  const { uploadUrl, fileId } = await response.json();

  // Upload directly to S3
  const uploadResponse = await fetch(uploadUrl, {
    method: 'PUT',
    body: file,
    headers: {
      'Content-Type': file.type,
    },
  });

  if (!uploadResponse.ok) {
    throw new Error(`Upload to storage failed (HTTP ${uploadResponse.status})`);
  }

  // Confirm upload
  const confirmResponse = await fetch('/api/upload/confirm', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ fileId }),
  });

  if (!confirmResponse.ok) {
    throw new Error(`Upload confirmation failed (HTTP ${confirmResponse.status})`);
  }

  return fileId;
}

Data Processing Pipeline

File Processing Workflow

# chat-server/app/services/file_processor.py
import io
from typing import Any, Dict, List

import pandas as pd
import pyarrow.parquet as pq

class FileProcessor:
    """Converts an uploaded dataset file into a Parquet copy, a CSV sample
    and a metadata dictionary."""

    def __init__(self, storage_service: "StorageService"):
        # The annotation is quoted because StorageService is not imported in
        # this module; an unquoted name would raise NameError at import time.
        self.storage = storage_service

    async def process_dataset(self, file_key: str, dataset_id: str) -> Dict[str, Any]:
        """Process an uploaded dataset file.

        Downloads ``file_key``, parses it according to its extension
        (matched case-insensitively), uploads a snappy-compressed Parquet
        copy and a CSV sample of the first 100 rows, and returns the
        metadata together with the keys of the two uploaded objects.

        Raises:
            ValueError: if the extension is not csv, json, parquet, xlsx or xls.
        """
        # Download file
        file_data = await self.storage.download_file(file_key)

        # Detect file type and read
        source = io.BytesIO(file_data)
        lowered_key = file_key.lower()
        if lowered_key.endswith('.csv'):
            df = pd.read_csv(source)
        elif lowered_key.endswith('.json'):
            df = pd.read_json(source)
        elif lowered_key.endswith('.parquet'):
            df = pd.read_parquet(source)
        elif lowered_key.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(source)
        else:
            raise ValueError(f"Unsupported file type: {file_key}")

        # Generate metadata
        metadata = {
            'row_count': len(df),
            'column_count': len(df.columns),
            'columns': self._analyze_columns(df),
            'file_size': len(file_data),
            # int() turns the numpy scalar into a plain, serializable int.
            'memory_usage': int(df.memory_usage(deep=True).sum())
        }

        # Convert to Parquet for efficient storage
        parquet_buffer = io.BytesIO()
        df.to_parquet(parquet_buffer, engine='pyarrow', compression='snappy')
        parquet_buffer.seek(0)

        # Upload processed file
        processed_key = f"processed/{dataset_id}/data.parquet"
        await self.storage.upload_file(
            parquet_buffer,
            processed_key,
            'application/octet-stream'
        )

        # Create sample for preview
        sample_df = df.head(100)
        sample_buffer = io.BytesIO()
        sample_df.to_csv(sample_buffer, index=False)
        sample_buffer.seek(0)

        sample_key = f"samples/{dataset_id}/sample_100.csv"
        await self.storage.upload_file(
            sample_buffer,
            sample_key,
            'text/csv'
        )

        return {
            'metadata': metadata,
            'processed_file': processed_key,
            'sample_file': sample_key
        }

    @staticmethod
    def _finite_or_none(value):
        """Return ``value`` as a float, or None when it is NaN or infinite.

        Statistics such as the standard deviation of a single-row column are
        NaN, which cannot be represented in strict JSON.
        """
        import math

        number = float(value)
        return number if math.isfinite(number) else None

    def _analyze_columns(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Describe each DataFrame column.

        Every entry holds the column name, dtype string, null count, number
        of distinct non-null values and up to five non-null sample values.
        Numeric columns additionally get a ``stats`` dict with min, max,
        mean, median and std (None where the statistic is undefined).
        """
        columns = []

        for col in df.columns:
            series = df[col]
            col_data = {
                'name': col,
                'dtype': str(series.dtype),
                # Plain ints instead of numpy scalars keep the result serializable.
                'null_count': int(series.isnull().sum()),
                'unique_count': int(series.nunique()),
                'sample_values': series.dropna().head(5).tolist()
            }

            # Additional statistics for numeric columns
            if pd.api.types.is_numeric_dtype(series):
                col_data['stats'] = {
                    'min': self._finite_or_none(series.min()),
                    'max': self._finite_or_none(series.max()),
                    'mean': self._finite_or_none(series.mean()),
                    'median': self._finite_or_none(series.median()),
                    'std': self._finite_or_none(series.std())
                }

            columns.append(col_data)

        return columns

Performance Optimization

Caching Strategy

// server/infrastructure/storage/cache.go
// CachedStorage puts a Redis cache in front of an S3Client; GetFile consults
// the cache before the object store.
type CachedStorage struct {
    // storage is the backing object store, held by value.
    // NOTE(review): GetFile calls storage.DownloadFile, which is not among the
    // S3Client methods shown in this guide — confirm it exists.
    storage S3Client
    // cache is the Redis client used for lookups and writes.
    cache   *redis.Client
    // ttl is the expiry passed to every cache write.
    ttl     time.Duration
}

// GetFile returns the bytes stored under key, preferring the cache.
//
// Any cache read error is treated as a miss and falls through to the object
// store. A download error is returned to the caller. After a successful
// download the bytes are written to the cache with cs.ttl; the result of that
// write is not checked, so caching is best-effort.
func (cs *CachedStorage) GetFile(ctx context.Context, key string) ([]byte, error) {
    if hit, cacheErr := cs.cache.Get(ctx, key).Bytes(); cacheErr == nil {
        return hit, nil
    }

    payload, err := cs.storage.DownloadFile(ctx, key)
    if err != nil {
        return nil, err
    }

    // Populate the cache for later readers; failures here are ignored.
    cs.cache.Set(ctx, key, payload, cs.ttl)

    return payload, nil
}

CDN Integration

# nginx.conf for CDN
# Serves /datasets/* from the gopie-datasets bucket on the MinIO upstream and
# caches the responses.
location /datasets/ {
    # The trailing slash replaces the /datasets/ prefix with /gopie-datasets/.
    proxy_pass http://minio:9000/gopie-datasets/;
    # Uses the "datasets_cache" zone, which is not defined in this snippet and
    # must be declared elsewhere (proxy_cache_path).
    proxy_cache datasets_cache;
    # Successful (200) responses are cached for one day.
    proxy_cache_valid 200 1d;
    # Serve a stale cached copy when the upstream errors, times out, is being
    # refreshed, or answers with one of the listed 5xx codes.
    proxy_cache_use_stale error timeout updating http_500 http_502 http_503 http_504;
    # Expose the cache result ($upstream_cache_status) for debugging.
    add_header X-Cache-Status $upstream_cache_status;
    
    # CORS headers
    # Any origin may read; only GET, HEAD and OPTIONS are advertised.
    add_header 'Access-Control-Allow-Origin' '*';
    add_header 'Access-Control-Allow-Methods' 'GET, HEAD, OPTIONS';
}

Security Configuration

Encryption at Rest

# Enable S3 bucket encryption
# Sets AES256 as the default server-side encryption for gopie-datasets.
# Only this bucket is configured here; gopie-exports and gopie-temp are not
# touched by this command.
aws s3api put-bucket-encryption \
  --bucket gopie-datasets \
  --server-side-encryption-configuration '{
    "Rules": [{
      "ApplyServerSideEncryptionByDefault": {
        "SSEAlgorithm": "AES256"
      }
    }]
  }'

Access Control

// Bucket policy for public datasets
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "PublicReadGetObject",
      "Effect": "Allow",
      "Principal": "*",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::gopie-datasets/public/*"
    },
    {
      "Sid": "DenyUnencryptedObjectUploads",
      "Effect": "Deny",
      "Principal": "*",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::gopie-datasets/*",
      "Condition": {
        "StringNotEquals": {
          "s3:x-amz-server-side-encryption": "AES256"
        }
      }
    }
  ]
}

File Validation

// server/domain/validators/file_validator.go
// FileValidator holds the limits applied to uploaded files.
type FileValidator struct {
    // maxSize is not read by Validate as shown in this guide.
    // NOTE(review): presumably the maximum accepted size in bytes, enforced
    // elsewhere — confirm.
    maxSize      int64
    // allowedTypes lists substrings matched against the detected content type.
    allowedTypes []string
}

// Validate sniffs the beginning of file and rejects it when its detected
// content type is not allowed or when the malware scan fails.
//
// Up to 512 bytes (the amount http.DetectContentType inspects) are read from
// file; shorter files are accepted and only the bytes actually read are
// examined. filename is used in error messages only.
//
// NOTE: the bytes read here are consumed from file; a caller that goes on to
// store the stream must rewind or re-open it. v.maxSize is not enforced here.
//
// It returns an error wrapping io.EOF for an empty file, a wrapped read error
// for any other read failure, a "file type ... not allowed" error when no
// entry of v.allowedTypes is contained in the detected type, or the error
// reported by scanForMalware.
func (v *FileValidator) Validate(file io.Reader, filename string) error {
    // Read the sniffing prefix. io.ReadFull keeps reading until the buffer is
    // full or the stream ends, so a reader that returns short chunks still
    // yields a complete prefix.
    buf := make([]byte, 512)
    n, err := io.ReadFull(file, buf)
    switch {
    case err == io.EOF:
        return fmt.Errorf("file %q is empty: %w", filename, err)
    case err != nil && err != io.ErrUnexpectedEOF:
        return fmt.Errorf("reading %q: %w", filename, err)
    }
    head := buf[:n]

    // Detect content type
    contentType := http.DetectContentType(head)

    // Validate content type
    allowed := false
    for _, t := range v.allowedTypes {
        if strings.Contains(contentType, t) {
            allowed = true
            break
        }
    }

    if !allowed {
        return fmt.Errorf("file type %s not allowed", contentType)
    }

    // Check for malicious content; pass only the bytes that were read, not
    // the zero padding at the end of buf.
    if err := v.scanForMalware(head); err != nil {
        return err
    }

    return nil
}

Monitoring and Alerts

Storage Metrics

# chat-server/app/monitoring/storage_metrics.py
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
# Counter of upload attempts, labelled by bucket and outcome status.
storage_upload_total = Counter(
    'storage_upload_total',
    'Total number of file uploads',
    ['bucket', 'status']
)

# Histogram of upload durations in seconds, labelled by bucket.
storage_upload_duration = Histogram(
    'storage_upload_duration_seconds',
    'File upload duration',
    ['bucket']
)

# Gauge of bucket size in bytes, labelled by bucket. track_upload only ever
# increments it, so deletions and overwrites are not reflected by this module.
storage_bucket_size = Gauge(
    'storage_bucket_size_bytes',
    'Total size of bucket in bytes',
    ['bucket']
)

# Track metrics
async def track_upload(bucket: str, size: int, duration: float, success: bool):
    """Record the outcome of one upload in the storage metrics.

    Every call increments the upload counter for ``bucket`` with status
    ``success`` or ``failure``. Only successful uploads also observe
    ``duration`` in the duration histogram and add ``size`` to the bucket
    size gauge.
    """
    outcome = 'success' if success else 'failure'
    storage_upload_total.labels(bucket=bucket, status=outcome).inc()

    if not success:
        return

    storage_upload_duration.labels(bucket=bucket).observe(duration)
    storage_bucket_size.labels(bucket=bucket).inc(size)

CloudWatch Alarms

# terraform/storage_alarms.tf
# Fires when gopie-datasets returns more than 10 4xx responses in each of two
# consecutive 5-minute periods.
resource "aws_cloudwatch_metric_alarm" "s3_high_error_rate" {
  alarm_name          = "gopie-s3-high-error-rate"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "4xxErrors"
  namespace           = "AWS/S3"
  period              = "300"
  statistic           = "Sum"
  threshold           = "10"
  alarm_description   = "S3 bucket experiencing high error rate"
  
  dimensions = {
    BucketName = "gopie-datasets"
    # S3 request metrics are published per metrics configuration, so the
    # FilterId dimension is required; without it the alarm never receives
    # data. "EntireBucket" must match the id of a request-metrics
    # configuration enabled on the bucket.
    FilterId   = "EntireBucket"
  }
}

# Fires when the daily average size of gopie-datasets (Standard storage)
# exceeds 1 TiB.
resource "aws_cloudwatch_metric_alarm" "s3_storage_limit" {
  alarm_name          = "gopie-s3-storage-limit"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "BucketSizeBytes"
  namespace           = "AWS/S3"
  period              = "86400"
  statistic           = "Average"
  threshold           = "1099511627776"  # 1TB
  alarm_description   = "S3 bucket approaching storage limit"
  
  dimensions = {
    BucketName  = "gopie-datasets"
    # BucketSizeBytes is reported per storage class; the StorageType
    # dimension is required or the alarm stays in INSUFFICIENT_DATA.
    StorageType = "StandardStorage"
  }
}

Disaster Recovery

Cross-Region Replication

# Enable versioning (required for replication)
# Same command as in "Bucket Configuration"; repeating it is harmless.
aws s3api put-bucket-versioning \
  --bucket gopie-datasets \
  --versioning-configuration Status=Enabled

# Configure replication
# Applies the rules stored in ./replication.json, which is not shown in this
# guide.
# NOTE(review): the destination bucket presumably needs versioning enabled
# and an IAM role for replication as well — confirm before applying.
aws s3api put-bucket-replication \
  --bucket gopie-datasets \
  --replication-configuration file://replication.json

Backup Strategy

# k8s/cronjob-backup.yaml
# Nightly job that mirrors gopie-datasets into gopie-datasets-backup and
# stores a listing of the source bucket as a dated manifest.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: storage-backup
spec:
  schedule: "0 2 * * *"  # Daily at 2 AM
  jobTemplate:
    spec:
      template:
        spec:
          # Job pods must set restartPolicy to OnFailure or Never; the pod
          # default (Always) is rejected by the API server.
          restartPolicy: OnFailure
          containers:
          - name: backup
            image: amazon/aws-cli
            command:
            - /bin/sh
            - -c
            - |
              # Stop at the first failing command so that a failed sync is
              # reported as a failed job instead of being masked by the
              # manifest upload.
              set -e

              # Sync to backup bucket
              # NOTE: --delete also removes objects from the backup once they
              # disappear from the source bucket.
              aws s3 sync s3://gopie-datasets s3://gopie-datasets-backup \
                --delete \
                --storage-class GLACIER
              
              # Create backup manifest
              aws s3 ls s3://gopie-datasets --recursive > /tmp/manifest.txt
              aws s3 cp /tmp/manifest.txt s3://gopie-datasets-backup/manifests/$(date +%Y%m%d).txt

Next Steps