Storage Configuration
Setting up and configuring storage systems for GoPie
GoPie uses S3-compatible object storage for storing datasets, exports, and other files. This guide covers storage setup and configuration options.
Storage Architecture
Overview
GoPie's storage system handles:
- Dataset files - CSV, JSON, Parquet, Excel files
- Export files - Query results and reports
- Temporary files - Processing and cache files
- User uploads - Direct file uploads
MinIO Setup (Development)
Docker Deployment
# docker-compose.yml
services:
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000" # S3 API
      - "9001:9001" # Web Console
    environment:
      # Development-only credentials; never reuse them outside a local setup.
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
      # NOTE: MINIO_DEFAULT_BUCKETS is only honoured by the bitnami/minio image.
      # The official minio/minio image ignores it, so buckets are created
      # explicitly with `mc mb` (see Initial Setup).
    volumes:
      - minio_data:/data
    healthcheck:
      # Recent minio/minio images do not ship curl; use the bundled mc client.
      test: ["CMD", "mc", "ready", "local"]
      interval: 30s
      timeout: 20s
      retries: 3

# Named volumes referenced by a service must be declared at the top level.
volumes:
  minio_data:
Initial Setup
# Install MinIO client
brew install minio-mc # macOS
# or
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
# Configure MinIO client
mc alias set local http://localhost:9000 minioadmin minioadmin
# Create buckets
mc mb local/gopie-datasets
mc mb local/gopie-exports
mc mb local/gopie-temp
# Set bucket policies
mc anonymous set download local/gopie-datasets # Public read
mc anonymous set none local/gopie-exports # Private
Bucket Structure
gopie-datasets/
├── project-uuid/
│ ├── dataset-uuid/
│ │ ├── original/
│ │ │ └── data.csv
│ │ ├── processed/
│ │ │ ├── data.parquet
│ │ │ └── metadata.json
│ │ └── samples/
│ │ └── sample_100.csv
│ └── dataset-uuid-2/
│ └── ...
gopie-exports/
├── user-uuid/
│ ├── export-uuid/
│ │ ├── results.csv
│ │ └── query.sql
│ └── export-uuid-2/
│ └── ...
gopie-temp/
├── upload-uuid/
│ └── processing/
└── cache/
AWS S3 Configuration (Production)
IAM Setup
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:GetObjectVersion",
        "s3:PutObjectAcl",
        "s3:GetObjectAcl"
      ],
      "Resource": [
        "arn:aws:s3:::gopie-datasets/*",
        "arn:aws:s3:::gopie-exports/*",
        "arn:aws:s3:::gopie-temp/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation",
        "s3:ListBucketVersions"
      ],
      "Resource": [
        "arn:aws:s3:::gopie-datasets",
        "arn:aws:s3:::gopie-exports",
        "arn:aws:s3:::gopie-temp"
      ]
    }
  ]
}
Bucket Configuration
# Create buckets
aws s3 mb s3://gopie-datasets --region us-east-1
aws s3 mb s3://gopie-exports --region us-east-1
aws s3 mb s3://gopie-temp --region us-east-1
# Enable versioning
aws s3api put-bucket-versioning \
--bucket gopie-datasets \
--versioning-configuration Status=Enabled
# Configure lifecycle rules
aws s3api put-bucket-lifecycle-configuration \
--bucket gopie-temp \
--lifecycle-configuration file://lifecycle.json
Lifecycle Rules
{
  "Rules": [
    {
      "ID": "DeleteTempFiles",
      "Status": "Enabled",
      "Filter": {
        "Prefix": "temp/"
      },
      "Expiration": {
        "Days": 1
      }
    },
    {
      "ID": "TransitionOldExports",
      "Status": "Enabled",
      "Filter": {
        "Prefix": "exports/"
      },
      "Transitions": [
        {
          "Days": 30,
          "StorageClass": "STANDARD_IA"
        },
        {
          "Days": 90,
          "StorageClass": "GLACIER"
        }
      ]
    },
    {
      "ID": "DeleteOldVersions",
      "Status": "Enabled",
      "Filter": {
        "Prefix": ""
      },
      "NoncurrentVersionExpiration": {
        "NoncurrentDays": 30
      }
    }
  ]
}
Storage Client Configuration
Go Backend
// server/infrastructure/storage/s3_client.go
package storage
import (
"context"
"io"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
// S3Config holds the connection settings used to build an S3Client.
type S3Config struct {
	Endpoint       string // custom endpoint URL; when empty, NewS3Client's resolver reports "endpoint not found"
	AccessKey      string // access key ID returned by the credentials provider
	SecretKey      string // secret access key returned by the credentials provider
	Region         string // region used for the SDK config and as the signing region
	BucketName     string // bucket stored on the resulting S3Client
	UseSSL         bool   // NOTE(review): not read by NewS3Client in this snippet — confirm where it is consumed
	ForcePathStyle bool   // Required for MinIO
}
// S3Client pairs an SDK S3 client with the single bucket its methods operate on.
type S3Client struct {
	client     *s3.Client // underlying aws-sdk-go-v2 client
	bucketName string     // bucket used for every request issued through this client
}
// NewS3Client builds an S3Client from cfg.
//
// Credentials are taken verbatim from cfg.AccessKey and cfg.SecretKey, and
// path-style addressing is switched on when cfg.ForcePathStyle is set. Any
// error from loading the SDK configuration is returned unchanged.
func NewS3Client(cfg S3Config) (*S3Client, error) {
	// Custom resolver for MinIO: with an endpoint configured, every request is
	// sent to that fixed URL; without one, "endpoint not found" is reported.
	resolveEndpoint := func(service, region string, options ...interface{}) (aws.Endpoint, error) {
		if cfg.Endpoint == "" {
			return aws.Endpoint{}, &aws.EndpointNotFoundError{}
		}
		endpoint := aws.Endpoint{
			URL:               cfg.Endpoint,
			SigningRegion:     cfg.Region,
			HostnameImmutable: true,
		}
		return endpoint, nil
	}

	// Static credentials sourced from the supplied configuration.
	staticCredentials := func(ctx context.Context) (aws.Credentials, error) {
		creds := aws.Credentials{
			AccessKeyID:     cfg.AccessKey,
			SecretAccessKey: cfg.SecretKey,
		}
		return creds, nil
	}

	awsCfg, err := config.LoadDefaultConfig(
		context.Background(),
		config.WithRegion(cfg.Region),
		config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(resolveEndpoint)),
		config.WithCredentialsProvider(aws.CredentialsProviderFunc(staticCredentials)),
	)
	if err != nil {
		return nil, err
	}

	applyPathStyle := func(o *s3.Options) {
		o.UsePathStyle = cfg.ForcePathStyle
	}
	wrapped := &S3Client{
		bucketName: cfg.BucketName,
		client:     s3.NewFromConfig(awsCfg, applyPathStyle),
	}
	return wrapped, nil
}
// UploadFile stores the contents of reader under key in the client's bucket
// with the given contentType. It issues a single PutObject request and returns
// that request's error, if any. No progress reporting is performed.
func (c *S3Client) UploadFile(ctx context.Context, key string, reader io.Reader, contentType string) error {
	input := &s3.PutObjectInput{
		Bucket:      &c.bucketName,
		Key:         &key,
		Body:        reader,
		ContentType: &contentType,
	}
	if _, err := c.client.PutObject(ctx, input); err != nil {
		return err
	}
	return nil
}
// Generate presigned URL for upload
func (c *S3Client) GetPresignedUploadURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
presigner := s3.NewPresignedClient(c.client)
request, err := presigner.PresignedPutObject(ctx, &s3.PutObjectInput{
Bucket: &c.bucketName,
Key: &key,
}, s3.WithPresignOptions(s3.PresignOptions{
Expires: expiry,
}))
if err != nil {
return "", err
}
return request.URL, nil
}Python Chat Server
# chat-server/app/services/storage_service.py
import boto3
from botocore.client import Config
from typing import BinaryIO, Optional
import os
class StorageService:
def __init__(self):
self.s3_client = boto3.client(
's3',
endpoint_url=os.getenv('S3_ENDPOINT'),
aws_access_key_id=os.getenv('S3_ACCESS_KEY'),
aws_secret_access_key=os.getenv('S3_SECRET_KEY'),
region_name=os.getenv('S3_REGION', 'us-east-1'),
config=Config(
signature_version='s3v4',
s3={'addressing_style': 'path'} # For MinIO
)
)
self.bucket_name = os.getenv('S3_BUCKET_NAME')
async def upload_file(
self,
file: BinaryIO,
key: str,
content_type: Optional[str] = None
) -> str:
"""Upload file to S3"""
extra_args = {}
if content_type:
extra_args['ContentType'] = content_type
self.s3_client.upload_fileobj(
file,
self.bucket_name,
key,
ExtraArgs=extra_args
)
return f"s3://{self.bucket_name}/{key}"
async def download_file(self, key: str) -> bytes:
"""Download file from S3"""
response = self.s3_client.get_object(
Bucket=self.bucket_name,
Key=key
)
return response['Body'].read()
def generate_presigned_url(
self,
key: str,
expiration: int = 3600,
http_method: str = 'GET'
) -> str:
"""Generate presigned URL"""
return self.s3_client.generate_presigned_url(
ClientMethod='get_object' if http_method == 'GET' else 'put_object',
Params={'Bucket': self.bucket_name, 'Key': key},
ExpiresIn=expiration
)File Upload Strategies
Direct Upload (Small Files)
// web/src/services/storage.ts
/**
 * Sends `file` and its owning `projectId` to `/api/upload` as multipart form
 * data and resolves with the `fileId` from the JSON response.
 *
 * @throws Error('Upload failed') when the response status is not OK.
 */
export async function uploadFile(file: File, projectId: string): Promise<string> {
  const payload = new FormData();
  payload.append('file', file);
  payload.append('projectId', projectId);

  const res = await fetch('/api/upload', { method: 'POST', body: payload });
  if (res.ok) {
    const { fileId } = await res.json();
    return fileId;
  }

  throw new Error('Upload failed');
}
Multipart Upload (Large Files)
// server/application/services/multipart_upload.go
// InitiateMultipartUpload starts a multipart upload for key in the service's
// bucket and returns the SDK's CreateMultipartUpload result unchanged.
func (s *StorageService) InitiateMultipartUpload(ctx context.Context, key string) (*s3.CreateMultipartUploadOutput, error) {
	input := s3.CreateMultipartUploadInput{
		Key:    &key,
		Bucket: &s.bucketName,
	}
	return s.client.CreateMultipartUpload(ctx, &input)
}
// UploadPart sends data as part partNumber of the multipart upload identified
// by uploadId for key, and returns the SDK's UploadPart result unchanged.
func (s *StorageService) UploadPart(ctx context.Context, key string, uploadId string, partNumber int32, data io.Reader) (*s3.UploadPartOutput, error) {
	input := s3.UploadPartInput{
		Body:       data,
		PartNumber: &partNumber,
		UploadId:   &uploadId,
		Key:        &key,
		Bucket:     &s.bucketName,
	}
	return s.client.UploadPart(ctx, &input)
}
func (s *StorageService) CompleteMultipartUpload(ctx context.Context, key string, uploadId string, parts []types.CompletedPart) error {
_, err := s.client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: &s.bucketName,
Key: &key,
UploadId: &uploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: parts,
},
})
return err
}Presigned URL Upload
// web/src/services/presigned-upload.ts
/**
 * Uploads `file` directly to object storage using a presigned URL.
 *
 * Steps: (1) ask the backend for a presigned URL, (2) PUT the file to that
 * URL, (3) tell the backend the upload finished. Each step's HTTP status is
 * checked so a failed upload is never confirmed.
 *
 * @returns the `fileId` issued by the backend.
 * @throws Error when any of the three requests returns a non-OK status.
 */
export async function uploadWithPresignedUrl(file: File, projectId: string) {
  // Get presigned URL from backend
  const presignResponse = await fetch('/api/upload/presigned', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      fileName: file.name,
      fileSize: file.size,
      contentType: file.type,
      projectId,
    }),
  });
  if (!presignResponse.ok) {
    throw new Error(`Failed to get presigned URL (HTTP ${presignResponse.status})`);
  }
  const { uploadUrl, fileId } = await presignResponse.json();

  // Upload directly to S3
  const uploadResponse = await fetch(uploadUrl, {
    method: 'PUT',
    body: file,
    headers: {
      'Content-Type': file.type,
    },
  });
  if (!uploadResponse.ok) {
    throw new Error(`Upload to storage failed (HTTP ${uploadResponse.status})`);
  }

  // Confirm upload only after the object was actually stored
  const confirmResponse = await fetch('/api/upload/confirm', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ fileId }),
  });
  if (!confirmResponse.ok) {
    throw new Error(`Upload confirmation failed (HTTP ${confirmResponse.status})`);
  }

  return fileId;
}
Data Processing Pipeline
File Processing Workflow
# chat-server/app/services/file_processor.py
import io
from typing import Any, Dict, List

import pandas as pd
import pyarrow.parquet as pq
class FileProcessor:
def __init__(self, storage_service: StorageService):
self.storage = storage_service
async def process_dataset(self, file_key: str, dataset_id: str) -> Dict[str, Any]:
"""Process uploaded dataset file"""
# Download file
file_data = await self.storage.download_file(file_key)
# Detect file type and read
if file_key.endswith('.csv'):
df = pd.read_csv(io.BytesIO(file_data))
elif file_key.endswith('.json'):
df = pd.read_json(io.BytesIO(file_data))
elif file_key.endswith('.parquet'):
df = pd.read_parquet(io.BytesIO(file_data))
elif file_key.endswith(('.xlsx', '.xls')):
df = pd.read_excel(io.BytesIO(file_data))
else:
raise ValueError(f"Unsupported file type: {file_key}")
# Generate metadata
metadata = {
'row_count': len(df),
'column_count': len(df.columns),
'columns': self._analyze_columns(df),
'file_size': len(file_data),
'memory_usage': df.memory_usage(deep=True).sum()
}
# Convert to Parquet for efficient storage
parquet_buffer = io.BytesIO()
df.to_parquet(parquet_buffer, engine='pyarrow', compression='snappy')
parquet_buffer.seek(0)
# Upload processed file
processed_key = f"processed/{dataset_id}/data.parquet"
await self.storage.upload_file(
parquet_buffer,
processed_key,
'application/octet-stream'
)
# Create sample for preview
sample_df = df.head(100)
sample_buffer = io.BytesIO()
sample_df.to_csv(sample_buffer, index=False)
sample_buffer.seek(0)
sample_key = f"samples/{dataset_id}/sample_100.csv"
await self.storage.upload_file(
sample_buffer,
sample_key,
'text/csv'
)
return {
'metadata': metadata,
'processed_file': processed_key,
'sample_file': sample_key
}
def _analyze_columns(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
"""Analyze DataFrame columns"""
columns = []
for col in df.columns:
col_data = {
'name': col,
'dtype': str(df[col].dtype),
'null_count': df[col].isnull().sum(),
'unique_count': df[col].nunique(),
'sample_values': df[col].dropna().head(5).tolist()
}
# Additional statistics for numeric columns
if pd.api.types.is_numeric_dtype(df[col]):
col_data['stats'] = {
'min': float(df[col].min()),
'max': float(df[col].max()),
'mean': float(df[col].mean()),
'median': float(df[col].median()),
'std': float(df[col].std())
}
columns.append(col_data)
return columnsPerformance Optimization
Caching Strategy
// server/infrastructure/storage/cache.go
// CachedStorage reads files through a Redis cache that sits in front of S3.
type CachedStorage struct {
	storage S3Client      // backing store consulted when the cache lookup fails
	cache   *redis.Client // Redis client holding cached file contents keyed by object key
	ttl     time.Duration // expiry applied to every cache entry written by GetFile
}
func (cs *CachedStorage) GetFile(ctx context.Context, key string) ([]byte, error) {
// Check cache first
cached, err := cs.cache.Get(ctx, key).Bytes()
if err == nil {
return cached, nil
}
// Fetch from S3
data, err := cs.storage.DownloadFile(ctx, key)
if err != nil {
return nil, err
}
// Cache for future requests
cs.cache.Set(ctx, key, data, cs.ttl)
return data, nil
}CDN Integration
# nginx.conf for CDN
location /datasets/ {
proxy_pass http://minio:9000/gopie-datasets/;
proxy_cache datasets_cache;
proxy_cache_valid 200 1d;
proxy_cache_use_stale error timeout updating http_500 http_502 http_503 http_504;
add_header X-Cache-Status $upstream_cache_status;
# CORS headers
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, HEAD, OPTIONS';
}
Security Configuration
Encryption at Rest
# Enable S3 bucket encryption
aws s3api put-bucket-encryption \
--bucket gopie-datasets \
--server-side-encryption-configuration '{
"Rules": [{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "AES256"
}
}]
}'
Access Control
// Bucket policy for public datasets
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "PublicReadGetObject",
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::gopie-datasets/public/*"
},
{
"Sid": "DenyUnencryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::gopie-datasets/*",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}
}
]
}
File Validation
// server/domain/validators/file_validator.go
// FileValidator holds the limits applied to uploaded files.
type FileValidator struct {
	maxSize      int64    // NOTE(review): not read by Validate in this snippet — confirm where the size limit is enforced
	allowedTypes []string // substrings matched against the detected content type
}
func (v *FileValidator) Validate(file io.Reader, filename string) error {
// Check file size
buf := make([]byte, 512)
n, err := file.Read(buf)
if err != nil {
return err
}
// Detect content type
contentType := http.DetectContentType(buf[:n])
// Validate content type
allowed := false
for _, t := range v.allowedTypes {
if strings.Contains(contentType, t) {
allowed = true
break
}
}
if !allowed {
return fmt.Errorf("file type %s not allowed", contentType)
}
// Check for malicious content
if err := v.scanForMalware(buf); err != nil {
return err
}
return nil
}Monitoring and Alerts
Storage Metrics
# chat-server/app/monitoring/storage_metrics.py
from prometheus_client import Counter, Histogram, Gauge
# Define metrics
# Counter of upload attempts, labelled by bucket and by outcome status.
storage_upload_total = Counter(
    'storage_upload_total',
    'Total number of file uploads',
    ['bucket', 'status']
)
# Histogram of upload durations in seconds, labelled by bucket.
storage_upload_duration = Histogram(
    'storage_upload_duration_seconds',
    'File upload duration',
    ['bucket']
)
# Gauge of bucket size in bytes, labelled by bucket.
storage_bucket_size = Gauge(
    'storage_bucket_size_bytes',
    'Total size of bucket in bytes',
    ['bucket']
)
# Track metrics
async def track_upload(bucket: str, size: int, duration: float, success: bool):
storage_upload_total.labels(
bucket=bucket,
status='success' if success else 'failure'
).inc()
if success:
storage_upload_duration.labels(bucket=bucket).observe(duration)
storage_bucket_size.labels(bucket=bucket).inc(size)CloudWatch Alarms
# terraform/storage_alarms.tf
# Fires when the bucket returns more than 10 client (4xx) errors in each of
# two consecutive 5-minute periods.
resource "aws_cloudwatch_metric_alarm" "s3_high_error_rate" {
  alarm_name          = "gopie-s3-high-error-rate"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "4xxErrors"
  namespace           = "AWS/S3"
  period              = 300
  statistic           = "Sum"
  threshold           = 10
  alarm_description   = "S3 bucket experiencing high error rate"

  dimensions = {
    BucketName = "gopie-datasets"
    # S3 request metrics are published per metrics configuration, so the
    # FilterId dimension is required. "EntireBucket" must match the ID of a
    # request-metrics configuration enabled on the bucket.
    FilterId = "EntireBucket"
  }
}
# Fires when the daily average size of the bucket's Standard-class objects
# exceeds 1 TiB.
resource "aws_cloudwatch_metric_alarm" "s3_storage_limit" {
  alarm_name          = "gopie-s3-storage-limit"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "BucketSizeBytes"
  namespace           = "AWS/S3"
  period              = 86400
  statistic           = "Average"
  threshold           = 1099511627776 # 1TB
  alarm_description   = "S3 bucket approaching storage limit"

  dimensions = {
    BucketName = "gopie-datasets"
    # BucketSizeBytes is reported per storage class; without StorageType the
    # alarm matches no metric and stays in INSUFFICIENT_DATA.
    StorageType = "StandardStorage"
  }
}
Disaster Recovery
Cross-Region Replication
# Enable versioning (required for replication)
aws s3api put-bucket-versioning \
--bucket gopie-datasets \
--versioning-configuration Status=Enabled
# Configure replication
aws s3api put-bucket-replication \
--bucket gopie-datasets \
--replication-configuration file://replication.json
Backup Strategy
# k8s/cronjob-backup.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: storage-backup
spec:
  schedule: "0 2 * * *" # Daily at 2 AM
  jobTemplate:
    spec:
      template:
        spec:
          # Job pods must set restartPolicy to OnFailure or Never; the pod
          # default (Always) is rejected by the API server.
          restartPolicy: OnFailure
          containers:
            - name: backup
              image: amazon/aws-cli
              command:
                - /bin/sh
                - -c
                - |
                  # Stop at the first failing command so a failed sync is not
                  # followed by a manifest that looks like a good backup.
                  set -e

                  # Sync to backup bucket
                  aws s3 sync s3://gopie-datasets s3://gopie-datasets-backup \
                    --delete \
                    --storage-class GLACIER

                  # Create backup manifest
                  aws s3 ls s3://gopie-datasets --recursive > /tmp/manifest.txt
                  aws s3 cp /tmp/manifest.txt s3://gopie-datasets-backup/manifests/$(date +%Y%m%d).txt
Next Steps
- Set up AI Providers
- Configure Docker Deployment
- Implement Monitoring
- Review Scaling Strategies