Cloud Object Storage

Why This Matters - The Data Backbone of Modern Applications

Every time you upload a photo to Instagram, stream a movie on Netflix, or save a document to Dropbox, you're using cloud object storage. This technology has revolutionized how we store and access data at scale, making it possible to build applications that serve billions of users without managing physical storage infrastructure.

Real-World Impact: Companies like Spotify and Netflix store petabytes of media files in object storage, serving millions of concurrent users while paying only for the storage they actually use. This scalability and cost-effectiveness has enabled entirely new business models and transformed digital content delivery.

The Engineering Challenge: Traditional file systems can't handle modern application requirements. When you need to store billions of objects, serve them globally with low latency, and ensure 99.999999999% durability, you need a fundamentally different approach. Object storage provides exactly that - a massive, virtually unlimited digital warehouse where every item has a unique address and can be retrieved over HTTP from anywhere in the world.

Learning Objectives

By the end of this tutorial, you will be able to:

🎯 Core Understanding: Explain how object storage differs from traditional file systems and when to use each

🏗️ Implementation Skills: Build production-ready storage clients for AWS S3, Google Cloud Storage, and Azure Blob Storage

🔧 Advanced Patterns: Implement multipart uploads, presigned URLs, versioning, and lifecycle management

🚀 Production Mastery: Design scalable, secure, and cost-effective storage solutions for real-world applications

Core Concepts - Understanding Object Storage

Object storage represents a paradigm shift from traditional file systems. Think of it as the difference between a local library and Amazon's global fulfillment network.

Key Architectural Differences

Traditional File Systems:

  • Hierarchical structure with folders and files
  • Limited by physical disk capacity
  • Designed for frequent modifications
  • Local or network-attached access only

Object Storage:

  • Flat namespace with unique object keys (folder-like paths are just key prefixes - see the sketch after this list)
  • Virtually infinite scalability
  • Optimized for write-once, read-many patterns
  • Global access via HTTP/REST APIs
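
Because the namespace is flat, "directories" are an illusion created by key prefixes and delimiters. The short, self-contained sketch below illustrates this with the AWS SDK for Go v2; the bucket name and prefix are placeholders, and credentials are assumed to come from the usual environment chain.

// Minimal sketch: object storage has no real directories. A key such as
// "users/42/avatar.png" is one flat string; listing with a Prefix and
// Delimiter merely simulates folders. Bucket and prefix names are placeholders.
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.TODO())
    if err != nil {
        log.Fatal(err)
    }
    client := s3.NewFromConfig(cfg)

    // "users/42/" looks like a folder, but it is only a shared key prefix.
    out, err := client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{
        Bucket:    aws.String("example-bucket"), // placeholder bucket name
        Prefix:    aws.String("users/42/"),
        Delimiter: aws.String("/"), // groups deeper prefixes so they appear folder-like
    })
    if err != nil {
        log.Fatal(err)
    }
    for _, obj := range out.Contents {
        fmt.Println(*obj.Key) // full flat keys, e.g. users/42/avatar.png
    }
}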

The Object Model:

// Object Storage Structure
type Object struct {
    Key          string            // Unique identifier within the bucket
    Data         io.Reader         // The actual content
    Metadata     map[string]string // Custom key-value pairs
    ContentType  string            // MIME type for the content
    StorageClass string            // Performance/cost tier
    ETag         string            // Content hash for integrity checks
    LastModified time.Time         // Modification timestamp
}

Storage Classes: The Performance-Cost Spectrum

Cloud providers offer different storage classes optimized for different access patterns:

Frequent Access: Low latency, high cost - for active user content
Infrequent Access: Higher latency, lower cost - for backups, archives
Archive: High latency, very low cost - for compliance data

Cost Impact Example: Storing 1TB (approximate monthly cost, AWS S3 US regions):

  • Standard: ~$23/month
  • Infrequent Access: ~$12/month
  • Glacier: ~$4/month

Choosing the right storage class can save 80%+ on storage costs while maintaining required performance.
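
To make the spectrum concrete, here is a tiny self-contained sketch that turns per-GB prices into monthly and yearly totals. The prices are illustrative placeholders, not current quotes from any provider.

// Illustrative only: the per-GB-month prices below are rough placeholders,
// not published pricing. Always check your provider's current price list.
package main

import "fmt"

func main() {
    const sizeGB = 1024.0 // ~1 TB

    classes := []struct {
        name       string
        pricePerGB float64 // USD per GB-month (placeholder values)
    }{
        {"Standard", 0.023},
        {"Infrequent Access", 0.0125},
        {"Archive (Glacier-style)", 0.004},
    }

    for _, c := range classes {
        monthly := sizeGB * c.pricePerGB
        fmt.Printf("%-25s ~$%.2f/month (~$%.0f/year)\n", c.name, monthly, monthly*12)
    }
}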

Practical Examples - Building Production Storage Systems

Let's start with a simple S3 client and progressively add production features.

Basic S3 Operations

// run
package main

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// S3Client provides a simplified interface for S3 operations
type S3Client struct {
    client     *s3.Client
    uploader   *manager.Uploader
    downloader *manager.Downloader
    bucket     string
}

// NewS3Client creates a new S3 client with production defaults
func NewS3Client(ctx context.Context, bucket string) (*S3Client, error) {
    // Load AWS configuration from environment/credentials
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to load AWS config: %w", err)
    }

    client := s3.NewFromConfig(cfg)

    // Configure uploader with optimized settings for production
    uploader := manager.NewUploader(client, func(u *manager.Uploader) {
        u.PartSize = 16 * 1024 * 1024 // 16MB parts for better throughput
        u.Concurrency = 5             // Parallel part uploads
    })

    // Configure downloader with matching settings
    downloader := manager.NewDownloader(client, func(d *manager.Downloader) {
        d.PartSize = 16 * 1024 * 1024
        d.Concurrency = 5
    })

    return &S3Client{
        client:     client,
        uploader:   uploader,
        downloader: downloader,
        bucket:     bucket,
    }, nil
}

// Upload stores data in S3 with automatic multipart handling
func (s *S3Client) Upload(ctx context.Context, key string, data io.Reader) error {
    input := &s3.PutObjectInput{
        Bucket:      aws.String(s.bucket),
        Key:         aws.String(key),
        Body:        data,
        ContentType: aws.String("application/octet-stream"),
    }

    // The uploader switches to multipart automatically for bodies larger than PartSize
    _, err := s.uploader.Upload(ctx, input)
    if err != nil {
        return fmt.Errorf("upload failed for key %s: %w", key, err)
    }

    return nil
}

// Download retrieves data from S3 with automatic multipart handling
func (s *S3Client) Download(ctx context.Context, key string) ([]byte, error) {
    // Use a write-at buffer so parts can be written concurrently
    buf := manager.NewWriteAtBuffer([]byte{})

    _, err := s.downloader.Download(ctx, buf, &s3.GetObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        return nil, fmt.Errorf("download failed for key %s: %w", key, err)
    }

    return buf.Bytes(), nil
}

// List returns object keys under a prefix, handling pagination transparently
func (s *S3Client) List(ctx context.Context, prefix string) ([]string, error) {
    var objects []string

    paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
        Bucket: aws.String(s.bucket),
        Prefix: aws.String(prefix),
    })

    for paginator.HasMorePages() {
        page, err := paginator.NextPage(ctx)
        if err != nil {
            return nil, fmt.Errorf("failed to list objects: %w", err)
        }

        for _, obj := range page.Contents {
            objects = append(objects, *obj.Key)
        }
    }

    return objects, nil
}

func main() {
    ctx := context.Background()

    // Initialize S3 client
    client, err := NewS3Client(ctx, "my-production-bucket")
    if err != nil {
        log.Fatal("Failed to create S3 client:", err)
    }

    // Example: Upload a file
    data := bytes.NewReader([]byte("Hello, Cloud Storage!"))
    err = client.Upload(ctx, "examples/hello.txt", data)
    if err != nil {
        log.Fatal("Upload failed:", err)
    }
    fmt.Println("✅ File uploaded successfully")

    // Example: List files
    files, err := client.List(ctx, "examples/")
    if err != nil {
        log.Fatal("List failed:", err)
    }
    fmt.Printf("✅ Found %d files in examples/\n", len(files))

    // Example: Download a file
    content, err := client.Download(ctx, "examples/hello.txt")
    if err != nil {
        log.Fatal("Download failed:", err)
    }
    fmt.Printf("✅ Downloaded content: %s\n", string(content))
}

Enhanced S3 Client with Production Features

// run
package main

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// ProductionS3Client extends the basic S3Client (from the previous example)
// with enterprise features
type ProductionS3Client struct {
    *S3Client
}

// UploadOptions configures upload behavior
type UploadOptions struct {
    ContentType  string
    CacheControl string
    StorageClass types.StorageClass
    Metadata     map[string]string
    Encrypt      bool
}

// UploadWithOptions provides enhanced upload capabilities
func (p *ProductionS3Client) UploadWithOptions(ctx context.Context, key string, data io.Reader, opts UploadOptions) error {
    input := &s3.PutObjectInput{
        Bucket:      aws.String(p.bucket),
        Key:         aws.String(key),
        Body:        data,
        ContentType: aws.String(opts.ContentType),
    }

    // Set storage class if specified
    if opts.StorageClass != "" {
        input.StorageClass = opts.StorageClass
    }

    // Set cache control if specified
    if opts.CacheControl != "" {
        input.CacheControl = aws.String(opts.CacheControl)
    }

    // Set metadata if provided
    if len(opts.Metadata) > 0 {
        input.Metadata = opts.Metadata
    }

    // Enable server-side encryption if requested
    if opts.Encrypt {
        input.ServerSideEncryption = types.ServerSideEncryptionAes256
    }

    _, err := p.uploader.Upload(ctx, input)
    return err
}

// GeneratePresignedURL creates a temporary URL for direct downloads
func (p *ProductionS3Client) GeneratePresignedURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
    presignClient := s3.NewPresignClient(p.client)

    request, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(p.bucket),
        Key:    aws.String(key),
    }, s3.WithPresignExpires(expiry))
    if err != nil {
        return "", fmt.Errorf("failed to generate presigned URL: %w", err)
    }

    return request.URL, nil
}

// GeneratePresignedUploadURL creates a URL for direct browser uploads
func (p *ProductionS3Client) GeneratePresignedUploadURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
    presignClient := s3.NewPresignClient(p.client)

    request, err := presignClient.PresignPutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(p.bucket),
        Key:    aws.String(key),
    }, s3.WithPresignExpires(expiry))
    if err != nil {
        return "", fmt.Errorf("failed to generate presigned upload URL: %w", err)
    }

    return request.URL, nil
}

// CopyObject creates a copy of an object within the same bucket
func (p *ProductionS3Client) CopyObject(ctx context.Context, sourceKey, destKey string) error {
    copySource := fmt.Sprintf("%s/%s", p.bucket, sourceKey)

    _, err := p.client.CopyObject(ctx, &s3.CopyObjectInput{
        Bucket:     aws.String(p.bucket),
        CopySource: aws.String(copySource),
        Key:        aws.String(destKey),
    })

    return err
}

// DeleteObjects efficiently deletes multiple objects in batches
func (p *ProductionS3Client) DeleteObjects(ctx context.Context, keys []string) error {
    if len(keys) == 0 {
        return nil
    }

    // S3 allows deleting up to 1000 objects per request
    const batchSize = 1000

    for i := 0; i < len(keys); i += batchSize {
        end := i + batchSize
        if end > len(keys) {
            end = len(keys)
        }

        batch := keys[i:end]
        objects := make([]types.ObjectIdentifier, len(batch))

        for j, key := range batch {
            objects[j] = types.ObjectIdentifier{Key: aws.String(key)}
        }

        _, err := p.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
            Bucket: aws.String(p.bucket),
            Delete: &types.Delete{
                Objects: objects,
            },
        })
        if err != nil {
            return fmt.Errorf("failed to delete batch %d-%d: %w", i, end-1, err)
        }
    }

    return nil
}

func main() {
    ctx := context.Background()

    // Initialize production S3 client
    client, err := NewS3Client(ctx, "my-production-bucket")
    if err != nil {
        panic(err)
    }

    prodClient := &ProductionS3Client{S3Client: client}

    // Example: Upload with options
    data := bytes.NewReader([]byte("Production file with metadata"))
    err = prodClient.UploadWithOptions(ctx, "production/example.txt", data, UploadOptions{
        ContentType:  "text/plain",
        CacheControl: "max-age=3600",
        StorageClass: types.StorageClassStandardIa, // Infrequent access
        Metadata: map[string]string{
            "environment": "production",
            "owner":       "data-team",
        },
        Encrypt: true,
    })
    if err != nil {
        panic(err)
    }
    fmt.Println("✅ File uploaded with production options")

    // Example: Generate presigned download URL
    url, err := prodClient.GeneratePresignedURL(ctx, "production/example.txt", 15*time.Minute)
    if err != nil {
        panic(err)
    }
    fmt.Printf("✅ Presigned URL: %s\n", url)

    // Example: Generate presigned upload URL
    uploadURL, err := prodClient.GeneratePresignedUploadURL(ctx, "uploads/user-file.jpg", 1*time.Hour)
    if err != nil {
        panic(err)
    }
    fmt.Printf("✅ Presigned upload URL: %s\n", uploadURL)
}

Common Patterns and Pitfalls - Production Experience

Pattern 1: Direct Browser Uploads

Problem: Handling file uploads through your server creates bandwidth costs and scaling bottlenecks.

Solution: Use presigned URLs to let browsers upload directly to cloud storage.

// UploadHandler holds the storage client used to presign upload URLs
type UploadHandler struct {
    storage *ProductionS3Client
}

// DirectUploadHandler generates presigned URLs for browser uploads
func (h *UploadHandler) DirectUploadHandler(w http.ResponseWriter, r *http.Request) {
    // Generate a unique key from a timestamp and a random ID
    // (generateSecureID is an assumed helper returning a random string)
    key := fmt.Sprintf("uploads/%d/%s",
        time.Now().Unix(),
        generateSecureID())

    // Generate the presigned upload URL
    uploadURL, err := h.storage.GeneratePresignedUploadURL(r.Context(), key, time.Hour)
    if err != nil {
        http.Error(w, "Failed to generate upload URL", http.StatusInternalServerError)
        return
    }

    // Return the upload configuration to the browser
    response := map[string]interface{}{
        "upload_url": uploadURL,
        "key":        key,
        "expires_at": time.Now().Add(time.Hour),
        "max_size":   100 * 1024 * 1024, // 100MB limit (advisory; enforce server-side too)
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}
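
For completeness, here is a minimal sketch of the client side of this pattern - a plain HTTP PUT against the returned upload_url. It is written in Go for consistency (a browser would do the same with fetch), and the helper name is ours rather than part of any SDK.

// uploadViaPresignedURL PUTs the file bytes directly to the presigned URL
// returned by the handler above. No cloud credentials are needed on the
// client; the signature embedded in the URL authorizes the request.
func uploadViaPresignedURL(ctx context.Context, uploadURL string, payload []byte) error {
    req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadURL, bytes.NewReader(payload))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/octet-stream")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("upload failed with status %s", resp.Status)
    }
    return nil
}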

Pattern 2: Intelligent Storage Class Selection

Problem: Storing all data in Standard storage is expensive and unnecessary.

Solution: Automatically move data to cheaper storage classes based on access patterns.

// IntelligentStorageManager handles storage class transitions
type IntelligentStorageManager struct {
    client *ProductionS3Client
}

// AutoClassify determines the optimal storage class based on access patterns.
// GetObjectMetadata, getAccessFrequency, and TransitionObject are helpers not shown here.
func (m *IntelligentStorageManager) AutoClassify(ctx context.Context, key string) error {
    // Get object metadata
    metadata, err := m.client.GetObjectMetadata(ctx, key)
    if err != nil {
        return err
    }

    // Implement your business logic here
    daysSinceCreation := time.Since(metadata.LastModified).Hours() / 24
    accessFrequency := m.getAccessFrequency(key)

    var newClass types.StorageClass

    switch {
    case daysSinceCreation < 30 && accessFrequency > 10:
        newClass = types.StorageClassStandard
    case daysSinceCreation < 90 && accessFrequency > 1:
        newClass = types.StorageClassStandardIa
    case daysSinceCreation < 365:
        newClass = types.StorageClassGlacier
    default:
        newClass = types.StorageClassDeepArchive
    }

    // Only transition if the class would actually change
    if metadata.StorageClass == newClass {
        return nil
    }

    return m.client.TransitionObject(ctx, key, newClass)
}
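
The snippet above assumes a TransitionObject helper that is not shown. One plausible way to implement it, sketched below, is an in-place CopyObject that rewrites the object under the new storage class; for purely age-based rules, S3 bucket lifecycle policies can do the same server-side with no custom code at all.

// Possible implementation of the assumed TransitionObject helper: copy the
// object onto itself with a different storage class. S3 rewrites the object
// under the new class; user metadata is preserved via the COPY directive.
func (p *ProductionS3Client) TransitionObject(ctx context.Context, key string, class types.StorageClass) error {
    _, err := p.client.CopyObject(ctx, &s3.CopyObjectInput{
        Bucket:            aws.String(p.bucket),
        CopySource:        aws.String(p.bucket + "/" + key), // same-bucket, same-key copy
        Key:               aws.String(key),
        StorageClass:      class,
        MetadataDirective: types.MetadataDirectiveCopy, // keep existing user metadata
    })
    if err != nil {
        return fmt.Errorf("failed to transition %s to %s: %w", key, class, err)
    }
    return nil
}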

Common Pitfalls to Avoid

❌ Ignoring Data Transfer Costs: Moving data between regions or out of cloud storage can be expensive. Always check data transfer pricing.

❌ Forgetting Lifecycle Policies: Old data accumulates and costs spiral. Implement automatic deletion or archival.

❌ Using Wrong Storage Class: Storing frequently accessed data in Glacier class leads to high retrieval costs and delays.

❌ Not Implementing Retries: Network failures are common. Implement exponential backoff retries (see the sketch after this list).

❌ Ignoring Security: Never make objects public by default. Use presigned URLs for temporary access.
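
To illustrate the retry point, here is a small generic backoff helper. The attempt count and base delay are arbitrary defaults, and the official cloud SDKs already retry many transient errors internally, so tune this to your client's behavior.

// retryWithBackoff retries op with exponential backoff (base, 2*base, 4*base, ...).
// Production code would usually add random jitter as well.
func retryWithBackoff(ctx context.Context, attempts int, base time.Duration, op func() error) error {
    var err error
    delay := base
    for i := 0; i < attempts; i++ {
        if err = op(); err == nil {
            return nil
        }
        select {
        case <-time.After(delay):
            delay *= 2 // exponential growth between attempts
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return fmt.Errorf("operation failed after %d attempts: %w", attempts, err)
}

A call then looks like retryWithBackoff(ctx, 5, 200*time.Millisecond, func() error { return client.Upload(ctx, key, bytes.NewReader(payload)) }) - note that the reader is recreated inside the closure so every attempt starts from the beginning of the data.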

Integration and Mastery - Building Real-World Systems

Real-World Example: Social Media Storage System

Let's build a complete media storage system of the kind large photo- and video-sharing platforms run:

// run
package main

import (
    "bytes"
    "context"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "io"
    "log"
    "path/filepath"
    "strings"
    "time"

    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// MediaStorageSystem handles all storage operations for a social media platform.
// ImageProcessor and CDNManager are helper components assumed to exist elsewhere.
type MediaStorageSystem struct {
    client         *ProductionS3Client
    imageProcessor *ImageProcessor
    cdnManager     *CDNManager
}

// NewMediaStorageSystem creates a complete media storage solution
func NewMediaStorageSystem(bucket string) (*MediaStorageSystem, error) {
    ctx := context.Background()

    s3Client, err := NewS3Client(ctx, bucket)
    if err != nil {
        return nil, err
    }

    prodClient := &ProductionS3Client{S3Client: s3Client}

    return &MediaStorageSystem{
        client:         prodClient,
        imageProcessor: NewImageProcessor(),
        cdnManager:     NewCDNManager(bucket),
    }, nil
}

// UploadMedia handles the complete media upload workflow
func (m *MediaStorageSystem) UploadMedia(ctx context.Context, userID string,
    file io.Reader, filename, contentType string) (*MediaUploadResult, error) {

    // Generate a secure object key
    objectKey := m.generateObjectKey(userID, filename)

    // Determine the optimal storage class based on content type
    storageClass := m.determineStorageClass(contentType)

    // Upload the original file
    err := m.client.UploadWithOptions(ctx, objectKey, file, UploadOptions{
        ContentType:  contentType,
        StorageClass: storageClass,
        Metadata: map[string]string{
            "user_id":  userID,
            "filename": filename,
            "uploaded": time.Now().Format(time.RFC3339),
        },
        Encrypt: true,
    })
    if err != nil {
        return nil, fmt.Errorf("upload failed: %w", err)
    }

    // Process images asynchronously; use a detached context so processing
    // is not cancelled when the request context ends
    if strings.HasPrefix(contentType, "image/") {
        go m.processImage(context.Background(), objectKey, contentType)
    }

    // Generate CDN URLs
    cdnURLs := m.cdnManager.GetURLs(objectKey)

    return &MediaUploadResult{
        ObjectKey:    objectKey,
        CDNURLs:      cdnURLs,
        StorageClass: string(storageClass),
        UploadedAt:   time.Now(),
    }, nil
}

// processImage creates multiple sizes for images
func (m *MediaStorageSystem) processImage(ctx context.Context, originalKey, contentType string) {
    // Download the original
    originalData, err := m.client.Download(ctx, originalKey)
    if err != nil {
        log.Printf("Failed to download original for processing: %v", err)
        return
    }

    // Generate different sizes
    sizes := []struct {
        name  string
        size  string
        class types.StorageClass
    }{
        {"thumbnail", "200x200", types.StorageClassStandard},  // Hot data
        {"medium", "800x600", types.StorageClassStandardIa},   // Warm data
        {"large", "1920x1080", types.StorageClassGlacier},     // Cold data
    }

    for _, sizeInfo := range sizes {
        resizedData, err := m.imageProcessor.Resize(originalData, sizeInfo.size)
        if err != nil {
            log.Printf("Failed to resize image for %s: %v", sizeInfo.name, err)
            continue
        }

        // Upload the resized version
        resizedKey := fmt.Sprintf("%s.%s",
            strings.TrimSuffix(originalKey, filepath.Ext(originalKey)),
            sizeInfo.name)

        err = m.client.UploadWithOptions(ctx, resizedKey,
            bytes.NewReader(resizedData), UploadOptions{
                ContentType:  contentType,
                StorageClass: sizeInfo.class,
                Metadata: map[string]string{
                    "derived_from": originalKey,
                    "size":         sizeInfo.name,
                },
            })
        if err != nil {
            log.Printf("Failed to upload resized image %s: %v", sizeInfo.name, err)
        }
    }
}

// generateObjectKey creates a secure, unique object key
func (m *MediaStorageSystem) generateObjectKey(userID, filename string) string {
    timestamp := time.Now().Format("2006/01/02")
    hash := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%d",
        userID, filename, time.Now().UnixNano())))

    return fmt.Sprintf("users/%s/%s/%s%s",
        userID,
        timestamp,
        hex.EncodeToString(hash[:8]),
        filepath.Ext(filename))
}

// determineStorageClass selects the optimal storage class based on content type
func (m *MediaStorageSystem) determineStorageClass(contentType string) types.StorageClass {
    switch {
    case strings.HasPrefix(contentType, "image/"), strings.HasPrefix(contentType, "video/"):
        return types.StorageClassStandardIa // Media files are typically read infrequently after upload
    case strings.HasPrefix(contentType, "text/"):
        return types.StorageClassStandard // Text files are often accessed
    default:
        return types.StorageClassStandard
    }
}

// MediaUploadResult contains upload metadata
type MediaUploadResult struct {
    ObjectKey    string    `json:"object_key"`
    CDNURLs      []string  `json:"cdn_urls"`
    StorageClass string    `json:"storage_class"`
    UploadedAt   time.Time `json:"uploaded_at"`
}

func main() {
    // Example usage
    storage, err := NewMediaStorageSystem("social-media-bucket")
    if err != nil {
        panic(err)
    }

    // Simulate uploading a user photo
    fileData := strings.NewReader("fake image data for demonstration")

    result, err := storage.UploadMedia(context.Background(),
        "user123", fileData, "profile.jpg", "image/jpeg")
    if err != nil {
        panic(err)
    }

    fmt.Printf("✅ Media uploaded successfully:\n")
    fmt.Printf("   Object Key: %s\n", result.ObjectKey)
    fmt.Printf("   Storage Class: %s\n", result.StorageClass)
    fmt.Printf("   CDN URLs: %v\n", result.CDNURLs)
}

Advanced Features: Multi-Cloud Storage

For maximum reliability, implement multi-cloud storage with automatic failover:

// MultiCloudStorage provides redundancy across cloud providers
type MultiCloudStorage struct {
    primary   StorageProvider
    secondary StorageProvider
    config    MultiCloudConfig // replication settings (type not shown)
}

// StorageProvider abstracts the minimal operations each cloud backend must support
type StorageProvider interface {
    Upload(ctx context.Context, key string, data io.Reader) error
    Download(ctx context.Context, key string) ([]byte, error)
    Exists(ctx context.Context, key string) (bool, error)
}

// Upload writes to the primary provider and replicates to the secondary in the background
func (m *MultiCloudStorage) Upload(ctx context.Context, key string, data io.Reader) error {
    // Upload to primary
    if err := m.primary.Upload(ctx, key, data); err != nil {
        return fmt.Errorf("primary upload failed: %w", err)
    }

    // Replicate to secondary in the background
    go func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
        defer cancel()

        // Re-read the data from the primary, since the original reader is already consumed
        data, err := m.primary.Download(ctx, key)
        if err != nil {
            log.Printf("Failed to read from primary for replication: %v", err)
            return
        }

        // Upload to secondary
        if err := m.secondary.Upload(ctx, key, bytes.NewReader(data)); err != nil {
            log.Printf("Failed to replicate to secondary: %v", err)
        }
    }()

    return nil
}

// Download tries the primary provider first and falls back to the secondary
func (m *MultiCloudStorage) Download(ctx context.Context, key string) ([]byte, error) {
    // Try primary first
    data, err := m.primary.Download(ctx, key)
    if err == nil {
        return data, nil
    }

    log.Printf("Primary download failed, trying secondary: %v", err)

    // Fall back to secondary
    return m.secondary.Download(ctx, key)
}

Advanced Multi-Cloud Storage Strategies

Cloud Provider Abstraction Layer

Building a unified interface across different cloud providers enables flexibility and reduces vendor lock-in:

  1package storage
  2
  3import (
  4    "context"
  5    "fmt"
  6    "io"
  7    "time"
  8)
  9
 10// UnifiedStorage provides cloud-agnostic storage interface
 11type UnifiedStorage interface {
 12    Upload(ctx context.Context, bucket, key string, data io.Reader, opts *UploadOptions) error
 13    Download(ctx context.Context, bucket, key string) ([]byte, error)
 14    Delete(ctx context.Context, bucket, key string) error
 15    List(ctx context.Context, bucket, prefix string, limit int) ([]ObjectInfo, error)
 16    Copy(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string) error
 17    GetMetadata(ctx context.Context, bucket, key string) (*ObjectMetadata, error)
 18    GeneratePresignedURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)
 19}
 20
 21type UploadOptions struct {
 22    ContentType     string
 23    ContentEncoding string
 24    CacheControl    string
 25    Metadata        map[string]string
 26    StorageClass    string
 27    ServerSideEncryption bool
 28}
 29
 30type ObjectInfo struct {
 31    Key          string
 32    Size         int64
 33    LastModified time.Time
 34    ETag         string
 35    StorageClass string
 36}
 37
 38type ObjectMetadata struct {
 39    ContentType     string
 40    ContentLength   int64
 41    LastModified    time.Time
 42    ETag            string
 43    Metadata        map[string]string
 44    StorageClass    string
 45    VersionID       string
 46}
 47
 48// MultiCloudStorage manages multiple storage providers with failover
 49type MultiCloudStorage struct {
 50    primary   UnifiedStorage
 51    secondary UnifiedStorage
 52    tertiary  UnifiedStorage
 53    strategy  ReplicationStrategy
 54    monitor   *StorageMonitor
 55}
 56
 57type ReplicationStrategy string
 58
 59const (
 60    StrategyPrimaryOnly     ReplicationStrategy = "primary_only"
 61    StrategyAsyncReplicate  ReplicationStrategy = "async_replicate"
 62    StrategySyncReplicate   ReplicationStrategy = "sync_replicate"
 63    StrategyReadFailover    ReplicationStrategy = "read_failover"
 64)
 65
 66func NewMultiCloudStorage(primary, secondary, tertiary UnifiedStorage, strategy ReplicationStrategy) *MultiCloudStorage {
 67    return &MultiCloudStorage{
 68        primary:   primary,
 69        secondary: secondary,
 70        tertiary:  tertiary,
 71        strategy:  strategy,
 72        monitor:   NewStorageMonitor(),
 73    }
 74}
 75
 76// Upload with intelligent replication based on strategy
 77func (m *MultiCloudStorage) Upload(ctx context.Context, bucket, key string, data io.Reader, opts *UploadOptions) error {
 78    // For replication, we need to buffer the data
 79    var buf *bytes.Buffer
 80    if m.strategy == StrategySyncReplicate || m.strategy == StrategyAsyncReplicate {
 81        buf = &bytes.Buffer{}
 82        data = io.TeeReader(data, buf)
 83    }
 84
 85    // Upload to primary
 86    start := time.Now()
 87    err := m.primary.Upload(ctx, bucket, key, data, opts)
 88    m.monitor.RecordOperation("primary", "upload", time.Since(start), err)
 89
 90    if err != nil {
 91        return fmt.Errorf("primary upload failed: %w", err)
 92    }
 93
 94    // Handle replication based on strategy
 95    switch m.strategy {
 96    case StrategySyncReplicate:
 97        // Synchronous replication to all providers
 98        var errs []error
 99
100        if m.secondary != nil {
101            if err := m.secondary.Upload(ctx, bucket, key, bytes.NewReader(buf.Bytes()), opts); err != nil {
102                errs = append(errs, fmt.Errorf("secondary: %w", err))
103            }
104        }
105
106        if m.tertiary != nil {
107            if err := m.tertiary.Upload(ctx, bucket, key, bytes.NewReader(buf.Bytes()), opts); err != nil {
108                errs = append(errs, fmt.Errorf("tertiary: %w", err))
109            }
110        }
111
112        if len(errs) > 0 {
113            return fmt.Errorf("replication errors: %v", errs)
114        }
115
116    case StrategyAsyncReplicate:
117        // Asynchronous replication
118        go m.asyncReplicate(bucket, key, buf.Bytes(), opts)
119    }
120
121    return nil
122}
123
124func (m *MultiCloudStorage) asyncReplicate(bucket, key string, data []byte, opts *UploadOptions) {
125    ctx := context.Background()
126
127    if m.secondary != nil {
128        start := time.Now()
129        err := m.secondary.Upload(ctx, bucket, key, bytes.NewReader(data), opts)
130        m.monitor.RecordOperation("secondary", "upload", time.Since(start), err)
131    }
132
133    if m.tertiary != nil {
134        start := time.Now()
135        err := m.tertiary.Upload(ctx, bucket, key, bytes.NewReader(data), opts)
136        m.monitor.RecordOperation("tertiary", "upload", time.Since(start), err)
137    }
138}
139
140// Download with automatic failover
141func (m *MultiCloudStorage) Download(ctx context.Context, bucket, key string) ([]byte, error) {
142    // Try primary first
143    start := time.Now()
144    data, err := m.primary.Download(ctx, bucket, key)
145    m.monitor.RecordOperation("primary", "download", time.Since(start), err)
146
147    if err == nil {
148        return data, nil
149    }
150
151    // Failover to secondary
152    if m.secondary != nil {
153        start = time.Now()
154        data, err = m.secondary.Download(ctx, bucket, key)
155        m.monitor.RecordOperation("secondary", "download", time.Since(start), err)
156
157        if err == nil {
158            // Async heal: restore to primary
159            go m.healPrimary(bucket, key, data)
160            return data, nil
161        }
162    }
163
164    // Failover to tertiary
165    if m.tertiary != nil {
166        start = time.Now()
167        data, err = m.tertiary.Download(ctx, bucket, key)
168        m.monitor.RecordOperation("tertiary", "download", time.Since(start), err)
169
170        if err == nil {
171            // Async heal: restore to primary and secondary
172            go m.healAll(bucket, key, data)
173            return data, nil
174        }
175    }
176
177    return nil, fmt.Errorf("all providers failed to download %s/%s", bucket, key)
178}
179
180func (m *MultiCloudStorage) healPrimary(bucket, key string, data []byte) {
181    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
182    defer cancel()
183
184    opts := &UploadOptions{ContentType: "application/octet-stream"}
185    if err := m.primary.Upload(ctx, bucket, key, bytes.NewReader(data), opts); err != nil {
186        log.Printf("Failed to heal primary for %s/%s: %v", bucket, key, err)
187    }
188}
189
190func (m *MultiCloudStorage) healAll(bucket, key string, data []byte) {
191    m.healPrimary(bucket, key, data)
192
193    if m.secondary != nil {
194        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
195        defer cancel()
196
197        opts := &UploadOptions{ContentType: "application/octet-stream"}
198        if err := m.secondary.Upload(ctx, bucket, key, bytes.NewReader(data), opts); err != nil {
199            log.Printf("Failed to heal secondary for %s/%s: %v", bucket, key, err)
200        }
201    }
202}
203
204// GetHealthStatus returns health of all storage providers
205func (m *MultiCloudStorage) GetHealthStatus() map[string]ProviderHealth {
206    return m.monitor.GetProviderHealth()
207}
208
209// StorageMonitor tracks health and performance metrics
210type StorageMonitor struct {
211    metrics map[string]*ProviderMetrics
212    mu      sync.RWMutex
213}
214
215type ProviderMetrics struct {
216    TotalOperations int64
217    SuccessCount    int64
218    ErrorCount      int64
219    TotalLatency    time.Duration
220    LastError       error
221    LastErrorTime   time.Time
222}
223
224type ProviderHealth struct {
225    Provider      string
226    IsHealthy     bool
227    ErrorRate     float64
228    AvgLatency    time.Duration
229    LastError     string
230    LastErrorTime time.Time
231}
232
233func NewStorageMonitor() *StorageMonitor {
234    return &StorageMonitor{
235        metrics: make(map[string]*ProviderMetrics),
236    }
237}
238
239func (sm *StorageMonitor) RecordOperation(provider, operation string, latency time.Duration, err error) {
240    sm.mu.Lock()
241    defer sm.mu.Unlock()
242
243    key := fmt.Sprintf("%s:%s", provider, operation)
244    if sm.metrics[key] == nil {
245        sm.metrics[key] = &ProviderMetrics{}
246    }
247
248    m := sm.metrics[key]
249    m.TotalOperations++
250    m.TotalLatency += latency
251
252    if err != nil {
253        m.ErrorCount++
254        m.LastError = err
255        m.LastErrorTime = time.Now()
256    } else {
257        m.SuccessCount++
258    }
259}
260
261func (sm *StorageMonitor) GetProviderHealth() map[string]ProviderHealth {
262    sm.mu.RLock()
263    defer sm.mu.RUnlock()
264
265    health := make(map[string]ProviderHealth)
266
267    for key, metrics := range sm.metrics {
268        parts := strings.Split(key, ":")
269        provider := parts[0]
270
271        if _, exists := health[provider]; !exists {
272            errorRate := 0.0
273            if metrics.TotalOperations > 0 {
274                errorRate = float64(metrics.ErrorCount) / float64(metrics.TotalOperations)
275            }
276
277            avgLatency := time.Duration(0)
278            if metrics.SuccessCount > 0 {
279                avgLatency = metrics.TotalLatency / time.Duration(metrics.SuccessCount)
280            }
281
282            lastError := ""
283            if metrics.LastError != nil {
284                lastError = metrics.LastError.Error()
285            }
286
287            health[provider] = ProviderHealth{
288                Provider:      provider,
289                IsHealthy:     errorRate < 0.1, // Less than 10% error rate
290                ErrorRate:     errorRate,
291                AvgLatency:    avgLatency,
292                LastError:     lastError,
293                LastErrorTime: metrics.LastErrorTime,
294            }
295        }
296    }
297
298    return health
299}

Performance Optimization Techniques

Intelligent Caching Strategy

Implement multi-tier caching to reduce cloud storage costs and improve performance:

  1// run
  2package main
  3
  4import (
  5    "bytes"
  6    "context"
  7    "crypto/sha256"
  8    "encoding/hex"
  9    "fmt"
 10    "io"
 11    "sync"
 12    "time"
 13)
 14
 15// CachedStorage wraps cloud storage with intelligent caching
 16type CachedStorage struct {
 17    backend    CloudStorage
 18    memCache   *MemoryCache
 19    diskCache  *DiskCache
 20    config     CacheConfig
 21}
 22
 23type CacheConfig struct {
 24    MemoryCacheSize   int64         // Bytes
 25    DiskCacheSize     int64         // Bytes
 26    MemoryTTL         time.Duration
 27    DiskTTL           time.Duration
 28    CacheableMinSize  int64         // Don't cache files smaller than this
 29    CacheableMaxSize  int64         // Don't cache files larger than this
 30}
 31
 32type CloudStorage interface {
 33    Get(ctx context.Context, key string) ([]byte, error)
 34    Put(ctx context.Context, key string, data []byte) error
 35}
 36
 37func NewCachedStorage(backend CloudStorage, config CacheConfig) *CachedStorage {
 38    return &CachedStorage{
 39        backend:   backend,
 40        memCache:  NewMemoryCache(config.MemoryCacheSize),
 41        diskCache: NewDiskCache(config.DiskCacheSize),
 42        config:    config,
 43    }
 44}
 45
 46// Get retrieves data with multi-tier caching
 47func (c *CachedStorage) Get(ctx context.Context, key string) ([]byte, error) {
 48    // Try memory cache first
 49    if data, found := c.memCache.Get(key); found {
 50        return data, nil
 51    }
 52
 53    // Try disk cache
 54    if data, found := c.diskCache.Get(key); found {
 55        // Promote to memory cache
 56        c.memCache.Set(key, data, c.config.MemoryTTL)
 57        return data, nil
 58    }
 59
 60    // Fetch from backend storage
 61    data, err := c.backend.Get(ctx, key)
 62    if err != nil {
 63        return nil, err
 64    }
 65
 66    // Cache if size is appropriate
 67    size := int64(len(data))
 68    if size >= c.config.CacheableMinSize && size <= c.config.CacheableMaxSize {
 69        // Cache in memory and disk
 70        c.memCache.Set(key, data, c.config.MemoryTTL)
 71        c.diskCache.Set(key, data, c.config.DiskTTL)
 72    }
 73
 74    return data, nil
 75}
 76
 77// Put stores data and invalidates cache
 78func (c *CachedStorage) Put(ctx context.Context, key string, data []byte) error {
 79    // Store in backend
 80    if err := c.backend.Put(ctx, key, data); err != nil {
 81        return err
 82    }
 83
 84    // Invalidate caches
 85    c.memCache.Delete(key)
 86    c.diskCache.Delete(key)
 87
 88    return nil
 89}
 90
 91// MemoryCache implements LRU cache in memory
 92type MemoryCache struct {
 93    maxSize    int64
 94    currentSize int64
 95    items      map[string]*cacheItem
 96    lru        *lruList
 97    mu         sync.RWMutex
 98}
 99
100type cacheItem struct {
101    key       string
102    data      []byte
103    size      int64
104    expiresAt time.Time
105    lruNode   *lruNode
106}
107
108type lruList struct {
109    head *lruNode
110    tail *lruNode
111}
112
113type lruNode struct {
114    key  string
115    prev *lruNode
116    next *lruNode
117}
118
119func NewMemoryCache(maxSize int64) *MemoryCache {
120    return &MemoryCache{
121        maxSize: maxSize,
122        items:   make(map[string]*cacheItem),
123        lru:     &lruList{},
124    }
125}
126
127func (mc *MemoryCache) Get(key string) ([]byte, bool) {
128    mc.mu.Lock()
129    defer mc.mu.Unlock()
130
131    item, exists := mc.items[key]
132    if !exists {
133        return nil, false
134    }
135
136    // Check expiration
137    if time.Now().After(item.expiresAt) {
138        mc.deleteItem(key)
139        return nil, false
140    }
141
142    // Move to front of LRU list
143    mc.lru.moveToFront(item.lruNode)
144
145    return item.data, true
146}
147
148func (mc *MemoryCache) Set(key string, data []byte, ttl time.Duration) {
149    mc.mu.Lock()
150    defer mc.mu.Unlock()
151
152    size := int64(len(data))
153
154    // Evict if necessary
155    for mc.currentSize+size > mc.maxSize && len(mc.items) > 0 {
156        // Remove least recently used
157        if mc.lru.tail != nil {
158            mc.deleteItem(mc.lru.tail.key)
159        }
160    }
161
162    // Create new item
163    node := &lruNode{key: key}
164    item := &cacheItem{
165        key:       key,
166        data:      data,
167        size:      size,
168        expiresAt: time.Now().Add(ttl),
169        lruNode:   node,
170    }
171
172    mc.items[key] = item
173    mc.currentSize += size
174    mc.lru.addToFront(node)
175}
176
177func (mc *MemoryCache) Delete(key string) {
178    mc.mu.Lock()
179    defer mc.mu.Unlock()
180    mc.deleteItem(key)
181}
182
183func (mc *MemoryCache) deleteItem(key string) {
184    item, exists := mc.items[key]
185    if !exists {
186        return
187    }
188
189    delete(mc.items, key)
190    mc.currentSize -= item.size
191    mc.lru.remove(item.lruNode)
192}
193
194// LRU list operations
195func (l *lruList) addToFront(node *lruNode) {
196    if l.head == nil {
197        l.head = node
198        l.tail = node
199        return
200    }
201
202    node.next = l.head
203    l.head.prev = node
204    l.head = node
205}
206
207func (l *lruList) moveToFront(node *lruNode) {
208    if node == l.head {
209        return
210    }
211
212    // Remove from current position
213    if node.prev != nil {
214        node.prev.next = node.next
215    }
216    if node.next != nil {
217        node.next.prev = node.prev
218    }
219    if node == l.tail {
220        l.tail = node.prev
221    }
222
223    // Add to front
224    node.prev = nil
225    node.next = l.head
226    l.head.prev = node
227    l.head = node
228}
229
230func (l *lruList) remove(node *lruNode) {
231    if node.prev != nil {
232        node.prev.next = node.next
233    } else {
234        l.head = node.next
235    }
236
237    if node.next != nil {
238        node.next.prev = node.prev
239    } else {
240        l.tail = node.prev
241    }
242}
243
244// DiskCache implements disk-based cache
245type DiskCache struct {
246    maxSize     int64
247    currentSize int64
248    baseDir     string
249    items       map[string]*diskCacheItem
250    mu          sync.RWMutex
251}
252
253type diskCacheItem struct {
254    key       string
255    path      string
256    size      int64
257    expiresAt time.Time
258}
259
260func NewDiskCache(maxSize int64) *DiskCache {
261    baseDir := filepath.Join(os.TempDir(), "storage_cache")
262    os.MkdirAll(baseDir, 0755)
263
264    return &DiskCache{
265        maxSize: maxSize,
266        baseDir: baseDir,
267        items:   make(map[string]*diskCacheItem),
268    }
269}
270
271func (dc *DiskCache) Get(key string) ([]byte, bool) {
272    dc.mu.RLock()
273    item, exists := dc.items[key]
274    dc.mu.RUnlock()
275
276    if !exists {
277        return nil, false
278    }
279
280    // Check expiration
281    if time.Now().After(item.expiresAt) {
282        dc.Delete(key)
283        return nil, false
284    }
285
286    // Read from disk
287    data, err := os.ReadFile(item.path)
288    if err != nil {
289        dc.Delete(key)
290        return nil, false
291    }
292
293    return data, true
294}
295
296func (dc *DiskCache) Set(key string, data []byte, ttl time.Duration) {
297    size := int64(len(data))
298
299    dc.mu.Lock()
300    defer dc.mu.Unlock()
301
302    // Evict if necessary
303    for dc.currentSize+size > dc.maxSize && len(dc.items) > 0 {
304        // Remove oldest item
305        var oldestKey string
306        var oldestTime time.Time
307        for k, v := range dc.items {
308            if oldestTime.IsZero() || v.expiresAt.Before(oldestTime) {
309                oldestKey = k
310                oldestTime = v.expiresAt
311            }
312        }
313        if oldestKey != "" {
314            dc.deleteItem(oldestKey)
315        }
316    }
317
318    // Write to disk
319    hash := sha256.Sum256([]byte(key))
320    filename := hex.EncodeToString(hash[:])
321    path := filepath.Join(dc.baseDir, filename)
322
323    if err := os.WriteFile(path, data, 0644); err != nil {
324        return
325    }
326
327    // Store metadata
328    dc.items[key] = &diskCacheItem{
329        key:       key,
330        path:      path,
331        size:      size,
332        expiresAt: time.Now().Add(ttl),
333    }
334    dc.currentSize += size
335}
336
337func (dc *DiskCache) Delete(key string) {
338    dc.mu.Lock()
339    defer dc.mu.Unlock()
340    dc.deleteItem(key)
341}
342
343func (dc *DiskCache) deleteItem(key string) {
344    item, exists := dc.items[key]
345    if !exists {
346        return
347    }
348
349    os.Remove(item.path)
350    delete(dc.items, key)
351    dc.currentSize -= item.size
352}
353
354func main() {
355    // Example usage
356    backend := &SimpleCloudStorage{}
357    config := CacheConfig{
358        MemoryCacheSize:  100 * 1024 * 1024, // 100MB
359        DiskCacheSize:    1024 * 1024 * 1024, // 1GB
360        MemoryTTL:        5 * time.Minute,
361        DiskTTL:          1 * time.Hour,
362        CacheableMinSize: 1024,              // 1KB
363        CacheableMaxSize: 10 * 1024 * 1024, // 10MB
364    }
365
366    storage := NewCachedStorage(backend, config)
367
368    ctx := context.Background()
369
370    // First get - from backend
371    data1, err := storage.Get(ctx, "example-key")
372    if err != nil {
373        fmt.Printf("Error: %v\n", err)
374    } else {
375        fmt.Printf("Retrieved %d bytes (from backend)\n", len(data1))
376    }
377
378    // Second get - from cache
379    data2, err := storage.Get(ctx, "example-key")
380    if err != nil {
381        fmt.Printf("Error: %v\n", err)
382    } else {
383        fmt.Printf("Retrieved %d bytes (from cache)\n", len(data2))
384    }
385}
386
387// Simple backend for demonstration
388type SimpleCloudStorage struct{}
389
390func (s *SimpleCloudStorage) Get(ctx context.Context, key string) ([]byte, error) {
391    return []byte("example data from cloud storage"), nil
392}
393
394func (s *SimpleCloudStorage) Put(ctx context.Context, key string, data []byte) error {
395    return nil
396}

Security and Compliance Best Practices

Encryption and Access Control

Implement comprehensive security for cloud storage:

  1package security
  2
  3import (
  4    "crypto/aes"
  5    "crypto/cipher"
  6    "crypto/rand"
  7    "crypto/sha256"
  8    "encoding/base64"
  9    "fmt"
 10    "io"
 11    "time"
 12
 13    "golang.org/x/crypto/pbkdf2"
 14)
 15
 16// SecureStorage wraps cloud storage with encryption and access control
 17type SecureStorage struct {
 18    backend         CloudStorage
 19    encryptionKey   []byte
 20    accessControl   *AccessControl
 21    auditLog        *AuditLog
 22}
 23
 24type AccessControl struct {
 25    policies map[string]*AccessPolicy
 26    mu       sync.RWMutex
 27}
 28
 29type AccessPolicy struct {
 30    AllowedUsers []string
 31    AllowedRoles []string
 32    Permissions  []Permission
 33    ExpiresAt    *time.Time
 34}
 35
 36type Permission string
 37
 38const (
 39    PermissionRead   Permission = "read"
 40    PermissionWrite  Permission = "write"
 41    PermissionDelete Permission = "delete"
 42)
 43
 44func NewSecureStorage(backend CloudStorage, masterKey string) *SecureStorage {
 45    // Derive encryption key from master key
 46    salt := []byte("cloud-storage-salt")
 47    encryptionKey := pbkdf2.Key([]byte(masterKey), salt, 100000, 32, sha256.New)
 48
 49    return &SecureStorage{
 50        backend:       backend,
 51        encryptionKey: encryptionKey,
 52        accessControl: NewAccessControl(),
 53        auditLog:      NewAuditLog(),
 54    }
 55}
 56
 57// Upload with encryption
 58func (s *SecureStorage) UploadEncrypted(ctx context.Context, bucket, key string, data io.Reader, userID string) error {
 59    // Check permissions
 60    if !s.accessControl.HasPermission(userID, key, PermissionWrite) {
 61        s.auditLog.LogAccess(userID, key, "upload", false, "permission denied")
 62        return fmt.Errorf("access denied")
 63    }
 64
 65    // Read data
 66    plaintext, err := io.ReadAll(data)
 67    if err != nil {
 68        return err
 69    }
 70
 71    // Encrypt data
 72    ciphertext, err := s.encrypt(plaintext)
 73    if err != nil {
 74        return err
 75    }
 76
 77    // Upload encrypted data
 78    opts := &UploadOptions{
 79        ContentType: "application/octet-stream",
 80        Metadata: map[string]string{
 81            "encrypted": "true",
 82            "user_id":   userID,
 83            "timestamp": time.Now().Format(time.RFC3339),
 84        },
 85        ServerSideEncryption: true,
 86    }
 87
 88    err = s.backend.Upload(ctx, bucket, key, bytes.NewReader(ciphertext), opts)
 89
 90    // Audit log
 91    s.auditLog.LogAccess(userID, key, "upload", err == nil, "")
 92
 93    return err
 94}
 95
 96// Download with decryption
 97func (s *SecureStorage) DownloadDecrypted(ctx context.Context, bucket, key string, userID string) ([]byte, error) {
 98    // Check permissions
 99    if !s.accessControl.HasPermission(userID, key, PermissionRead) {
100        s.auditLog.LogAccess(userID, key, "download", false, "permission denied")
101        return nil, fmt.Errorf("access denied")
102    }
103
104    // Download encrypted data
105    ciphertext, err := s.backend.Download(ctx, bucket, key)
106    if err != nil {
107        s.auditLog.LogAccess(userID, key, "download", false, err.Error())
108        return nil, err
109    }
110
111    // Decrypt data
112    plaintext, err := s.decrypt(ciphertext)
113    if err != nil {
114        return nil, err
115    }
116
117    // Audit log
118    s.auditLog.LogAccess(userID, key, "download", true, "")
119
120    return plaintext, nil
121}
122
123// encrypt uses AES-GCM for authenticated encryption
124func (s *SecureStorage) encrypt(plaintext []byte) ([]byte, error) {
125    block, err := aes.NewCipher(s.encryptionKey)
126    if err != nil {
127        return nil, err
128    }
129
130    gcm, err := cipher.NewGCM(block)
131    if err != nil {
132        return nil, err
133    }
134
135    nonce := make([]byte, gcm.NonceSize())
136    if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
137        return nil, err
138    }
139
140    return gcm.Seal(nonce, nonce, plaintext, nil), nil
141}
142
143// decrypt using AES-GCM
144func (s *SecureStorage) decrypt(ciphertext []byte) ([]byte, error) {
145    block, err := aes.NewCipher(s.encryptionKey)
146    if err != nil {
147        return nil, err
148    }
149
150    gcm, err := cipher.NewGCM(block)
151    if err != nil {
152        return nil, err
153    }
154
155    nonceSize := gcm.NonceSize()
156    if len(ciphertext) < nonceSize {
157        return nil, fmt.Errorf("ciphertext too short")
158    }
159
160    nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
161    return gcm.Open(nil, nonce, ciphertext, nil)
162}
163
164// NewAccessControl creates an access control manager
165func NewAccessControl() *AccessControl {
166    return &AccessControl{
167        policies: make(map[string]*AccessPolicy),
168    }
169}
170
171// SetPolicy sets access policy for a resource
172func (ac *AccessControl) SetPolicy(resource string, policy *AccessPolicy) {
173    ac.mu.Lock()
174    defer ac.mu.Unlock()
175    ac.policies[resource] = policy
176}
177
178// HasPermission checks if user has permission for resource
179func (ac *AccessControl) HasPermission(userID, resource string, perm Permission) bool {
180    ac.mu.RLock()
181    defer ac.mu.RUnlock()
182
183    policy, exists := ac.policies[resource]
184    if !exists {
185        return false
186    }
187
188    // Check expiration
189    if policy.ExpiresAt != nil && time.Now().After(*policy.ExpiresAt) {
190        return false
191    }
192
193    // Check user permission
194    for _, allowedUser := range policy.AllowedUsers {
195        if allowedUser == userID {
196            for _, p := range policy.Permissions {
197                if p == perm {
198                    return true
199                }
200            }
201        }
202    }
203
204    return false
205}
206
207// AuditLog tracks all access attempts
208type AuditLog struct {
209    entries []AuditEntry
210    mu      sync.Mutex
211}
212
213type AuditEntry struct {
214    Timestamp time.Time
215    UserID    string
216    Resource  string
217    Action    string
218    Success   bool
219    Error     string
220}
221
222func NewAuditLog() *AuditLog {
223    return &AuditLog{
224        entries: make([]AuditEntry, 0),
225    }
226}
227
228func (al *AuditLog) LogAccess(userID, resource, action string, success bool, errorMsg string) {
229    al.mu.Lock()
230    defer al.mu.Unlock()
231
232    entry := AuditEntry{
233        Timestamp: time.Now(),
234        UserID:    userID,
235        Resource:  resource,
236        Action:    action,
237        Success:   success,
238        Error:     errorMsg,
239    }
240
241    al.entries = append(al.entries, entry)
242
243    // In production, persist to database or log aggregation system
244}
245
246func (al *AuditLog) GetRecentEntries(limit int) []AuditEntry {
247    al.mu.Lock()
248    defer al.mu.Unlock()
249
250    if len(al.entries) < limit {
251        return al.entries
252    }
253
254    return al.entries[len(al.entries)-limit:]
255}

S3-Compatible Storage and Alternative Providers

While AWS S3 is the most popular object storage service, many applications benefit from S3-compatible alternatives like MinIO, DigitalOcean Spaces, Backblaze B2, and Wasabi. These services offer cost savings, on-premises deployment options, and geographic flexibility while maintaining S3 API compatibility.

Why S3-Compatible Storage Matters

Cost Optimization: Alternative providers like Wasabi and Backblaze B2 advertise prices that can be substantially lower than AWS S3 for certain workloads - sometimes cited as around 80% cheaper - and several charge little or nothing for egress.

On-Premises Control: MinIO enables you to run S3-compatible storage in your own data center, providing full control over data sovereignty and compliance requirements.

Multi-Cloud Strategy: Using S3-compatible APIs allows seamless migration between providers without changing application code.

Edge Deployment: MinIO can run on edge devices and IoT gateways, enabling distributed storage architectures that aren't possible with cloud-only solutions.
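
Because these providers speak the S3 wire protocol, switching is often just a configuration change. The sketch below points the regular AWS SDK for Go v2 client at a custom endpoint; the endpoint URL is a placeholder, and the exact option fields (BaseEndpoint, UsePathStyle) can differ between SDK releases, so treat this as a starting point.

// newCompatibleClient builds an S3 client aimed at an S3-compatible service.
// The endpoint is a placeholder; credentials come from the usual config chain.
func newCompatibleClient(ctx context.Context, endpoint string) (*s3.Client, error) {
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return nil, err
    }

    return s3.NewFromConfig(cfg, func(o *s3.Options) {
        o.BaseEndpoint = aws.String(endpoint) // e.g. a MinIO or other S3-compatible endpoint
        o.UsePathStyle = true                 // many S3-compatible services require path-style addressing
    }), nil
}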

MinIO: Self-Hosted S3-Compatible Storage

MinIO is a high-performance, distributed object storage server that implements the S3 API. It's perfect for:

  • Development and testing environments
  • On-premises enterprise storage
  • Hybrid cloud architectures
  • Edge computing scenarios
  1// run
  2package main
  3
  4import (
  5    "context"
  6    "fmt"
  7    "io"
  8    "log"
  9    "strings"
 10    "time"
 11
 12    "github.com/minio/minio-go/v7"
 13    "github.com/minio/minio-go/v7/pkg/credentials"
 14)
 15
 16// MinIOClient wraps MinIO operations with production patterns
 17type MinIOClient struct {
 18    client     *minio.Client
 19    endpoint   string
 20    bucketName string
 21}
 22
 23// NewMinIOClient creates a new MinIO client
 24func NewMinIOClient(endpoint, accessKey, secretKey, bucketName string, useSSL bool) (*MinIOClient, error) {
 25    client, err := minio.New(endpoint, &minio.Options{
 26        Creds:  credentials.NewStaticV4(accessKey, secretKey, ""),
 27        Secure: useSSL,
 28    })
 29    if err != nil {
 30        return nil, fmt.Errorf("failed to create MinIO client: %w", err)
 31    }
 32
 33    return &MinIOClient{
 34        client:     client,
 35        endpoint:   endpoint,
 36        bucketName: bucketName,
 37    }, nil
 38}
 39
 40// EnsureBucket creates bucket if it doesn't exist
 41func (m *MinIOClient) EnsureBucket(ctx context.Context) error {
 42    exists, err := m.client.BucketExists(ctx, m.bucketName)
 43    if err != nil {
 44        return fmt.Errorf("failed to check bucket: %w", err)
 45    }
 46
 47    if !exists {
 48        err = m.client.MakeBucket(ctx, m.bucketName, minio.MakeBucketOptions{
 49            Region:        "us-east-1",
 50            ObjectLocking: false,
 51        })
 52        if err != nil {
 53            return fmt.Errorf("failed to create bucket: %w", err)
 54        }
 55        log.Printf("Created bucket: %s", m.bucketName)
 56    }
 57
 58    return nil
 59}
 60
 61// Upload uploads an object to MinIO
 62func (m *MinIOClient) Upload(ctx context.Context, objectName string, data io.Reader, size int64, contentType string) error {
 63    _, err := m.client.PutObject(
 64        ctx,
 65        m.bucketName,
 66        objectName,
 67        data,
 68        size,
 69        minio.PutObjectOptions{
 70            ContentType: contentType,
 71            UserMetadata: map[string]string{
 72                "uploaded-at": time.Now().Format(time.RFC3339),
 73            },
 74        },
 75    )
 76    if err != nil {
 77        return fmt.Errorf("failed to upload: %w", err)
 78    }
 79
 80    log.Printf("Uploaded %s (%d bytes)", objectName, size)
 81    return nil
 82}
 83
 84// Download downloads an object from MinIO
 85func (m *MinIOClient) Download(ctx context.Context, objectName string) ([]byte, error) {
 86    object, err := m.client.GetObject(ctx, m.bucketName, objectName, minio.GetObjectOptions{})
 87    if err != nil {
 88        return nil, fmt.Errorf("failed to get object: %w", err)
 89    }
 90    defer object.Close()
 91
 92    data, err := io.ReadAll(object)
 93    if err != nil {
 94        return nil, fmt.Errorf("failed to read object: %w", err)
 95    }
 96
 97    return data, nil
 98}
 99
100// GetPresignedURL generates a presigned URL for direct uploads/downloads
101func (m *MinIOClient) GetPresignedURL(ctx context.Context, objectName string, expires time.Duration, forUpload bool) (string, error) {
102    // minio-go's presign helpers return *url.URL, so convert to a string before returning
103    if forUpload {
104        u, err := m.client.PresignedPutObject(ctx, m.bucketName, objectName, expires)
105        if err != nil {
106            return "", fmt.Errorf("failed to generate presigned upload URL: %w", err)
107        }
108        return u.String(), nil
109    }
110
111    u, err := m.client.PresignedGetObject(ctx, m.bucketName, objectName, expires, nil)
112    if err != nil {
113        return "", fmt.Errorf("failed to generate presigned download URL: %w", err)
114    }
115    return u.String(), nil
116}
117
118// ListObjects lists all objects in the bucket with pagination
119func (m *MinIOClient) ListObjects(ctx context.Context, prefix string, maxResults int) ([]ObjectInfo, error) {
120    var objects []ObjectInfo
121
122    opts := minio.ListObjectsOptions{
123        Prefix:    prefix,
124        Recursive: true,
125        MaxKeys:   maxResults,
126    }
127
128    for object := range m.client.ListObjects(ctx, m.bucketName, opts) {
129        if object.Err != nil {
130            return nil, fmt.Errorf("error listing objects: %w", object.Err)
131        }
132
133        objects = append(objects, ObjectInfo{
134            Key:          object.Key,
135            Size:         object.Size,
136            LastModified: object.LastModified,
137            ContentType:  object.ContentType,
138            ETag:         object.ETag,
139        })
140    }
141
142    return objects, nil
143}
144
145// DeleteObject removes an object from storage
146func (m *MinIOClient) DeleteObject(ctx context.Context, objectName string) error {
147    err := m.client.RemoveObject(ctx, m.bucketName, objectName, minio.RemoveObjectOptions{})
148    if err != nil {
149        return fmt.Errorf("failed to delete object: %w", err)
150    }
151
152    log.Printf("Deleted object: %s", objectName)
153    return nil
154}
155
156type ObjectInfo struct {
157    Key          string
158    Size         int64
159    LastModified time.Time
160    ContentType  string
161    ETag         string
162}
163
164func main() {
165    // Example usage with MinIO
166    ctx := context.Background()
167
168    // Connect to MinIO (use your MinIO server details)
169    client, err := NewMinIOClient(
170        "localhost:9000",  // MinIO endpoint
171        "minioadmin",      // Access key
172        "minioadmin",      // Secret key
173        "my-bucket",       // Bucket name
174        false,             // Use SSL
175    )
176    if err != nil {
177        log.Fatal(err)
178    }
179
180    // Ensure bucket exists
181    if err := client.EnsureBucket(ctx); err != nil {
182        log.Fatal(err)
183    }
184
185    // Upload a file
186    content := strings.NewReader("Hello from MinIO!")
187    err = client.Upload(ctx, "test.txt", content, int64(content.Len()), "text/plain")
188    if err != nil {
189        log.Fatal(err)
190    }
191
192    // Generate presigned URL for download
193    url, err := client.GetPresignedURL(ctx, "test.txt", 15*time.Minute, false)
194    if err != nil {
195        log.Fatal(err)
196    }
197    fmt.Printf("Presigned URL: %s\n", url)
198
199    // List objects
200    objects, err := client.ListObjects(ctx, "", 100)
201    if err != nil {
202        log.Fatal(err)
203    }
204
205    fmt.Printf("\nObjects in bucket:\n")
206    for _, obj := range objects {
207        fmt.Printf("  %s (%d bytes, %s)\n", obj.Key, obj.Size, obj.LastModified.Format(time.RFC3339))
208    }
209
210    // Download the file
211    data, err := client.Download(ctx, "test.txt")
212    if err != nil {
213        log.Fatal(err)
214    }
215    fmt.Printf("\nDownloaded content: %s\n", string(data))
216
217    // Clean up
218    if err := client.DeleteObject(ctx, "test.txt"); err != nil {
219        log.Fatal(err)
220    }
221
222    fmt.Println("\nMinIO operations completed successfully!")
223}

Multi-Provider Abstraction for S3-Compatible Services

When working with multiple S3-compatible providers, create a unified interface:

  1// run
  2package main
  3
  4import (
  5    "context"
  6    "fmt"
  7    "io"
  8    "log"
  9    "strings"
 10)
 11
 12// StorageProvider defines the interface for all S3-compatible storage
 13type StorageProvider interface {
 14    Upload(ctx context.Context, key string, data io.Reader, size int64) error
 15    Download(ctx context.Context, key string) ([]byte, error)
 16    Delete(ctx context.Context, key string) error
 17    List(ctx context.Context, prefix string) ([]string, error)
 18    GetURL(ctx context.Context, key string) (string, error)
 19}
 20
 21// ProviderType identifies the storage provider
 22type ProviderType string
 23
 24const (
 25    ProviderAWS            ProviderType = "aws"
 26    ProviderMinIO          ProviderType = "minio"
 27    ProviderDigitalOcean   ProviderType = "digitalocean"
 28    ProviderBackblaze      ProviderType = "backblaze"
 29    ProviderWasabi         ProviderType = "wasabi"
 30)
 31
 32// StorageConfig holds provider configuration
 33type StorageConfig struct {
 34    Provider    ProviderType
 35    Endpoint    string
 36    AccessKey   string
 37    SecretKey   string
 38    BucketName  string
 39    Region      string
 40    UseSSL      bool
 41}
 42
 43// MultiProviderStorage manages multiple storage providers
 44type MultiProviderStorage struct {
 45    providers map[ProviderType]StorageProvider
 46    primary   ProviderType
 47    fallbacks []ProviderType
 48}
 49
 50// NewMultiProviderStorage creates a storage manager with multiple providers
 51func NewMultiProviderStorage(configs []StorageConfig, primary ProviderType) (*MultiProviderStorage, error) {
 52    mps := &MultiProviderStorage{
 53        providers: make(map[ProviderType]StorageProvider),
 54        primary:   primary,
 55        fallbacks: make([]ProviderType, 0),
 56    }
 57
 58    for _, config := range configs {
 59        var provider StorageProvider
 60        var err error
 61
 62        switch config.Provider {
 63        case ProviderMinIO, ProviderDigitalOcean, ProviderBackblaze, ProviderWasabi:
 64            // All use S3-compatible API
 65            provider, err = NewS3CompatibleProvider(config)
 66        case ProviderAWS:
 67            provider, err = NewAWSProvider(config)
 68        default:
 69            return nil, fmt.Errorf("unsupported provider: %s", config.Provider)
 70        }
 71
 72        if err != nil {
 73            return nil, fmt.Errorf("failed to initialize %s: %w", config.Provider, err)
 74        }
 75
 76        mps.providers[config.Provider] = provider
 77
 78        if config.Provider != primary {
 79            mps.fallbacks = append(mps.fallbacks, config.Provider)
 80        }
 81    }
 82
 83    return mps, nil
 84}
 85
 86// Upload with automatic failover
 87func (mps *MultiProviderStorage) Upload(ctx context.Context, key string, data io.Reader, size int64) error {
 88    // Try primary provider first; note data is an io.Reader and can only be read once, so a real failover must buffer or re-seek it (see the replay sketch after this listing)
 89    primary := mps.providers[mps.primary]
 90    err := primary.Upload(ctx, key, data, size)
 91    if err == nil {
 92        log.Printf("Uploaded to primary provider (%s)", mps.primary)
 93        return nil
 94    }
 95
 96    log.Printf("Primary provider (%s) failed: %v, trying fallbacks", mps.primary, err)
 97
 98    // Try fallback providers
 99    for _, fallbackType := range mps.fallbacks {
100        fallback := mps.providers[fallbackType]
101        if err := fallback.Upload(ctx, key, data, size); err == nil {
102            log.Printf("Uploaded to fallback provider (%s)", fallbackType)
103            return nil
104        }
105        log.Printf("Fallback provider (%s) failed: %v", fallbackType, err)
106    }
107
108    return fmt.Errorf("all providers failed")
109}
110
111// Download with automatic failover
112func (mps *MultiProviderStorage) Download(ctx context.Context, key string) ([]byte, error) {
113    // Try primary provider first
114    primary := mps.providers[mps.primary]
115    data, err := primary.Download(ctx, key)
116    if err == nil {
117        return data, nil
118    }
119
120    log.Printf("Primary provider (%s) failed: %v, trying fallbacks", mps.primary, err)
121
122    // Try fallback providers
123    for _, fallbackType := range mps.fallbacks {
124        fallback := mps.providers[fallbackType]
125        data, err := fallback.Download(ctx, key)
126        if err == nil {
127            log.Printf("Downloaded from fallback provider (%s)", fallbackType)
128            return data, nil
129        }
130    }
131
132    return nil, fmt.Errorf("all providers failed")
133}
134
135// S3CompatibleProvider implements StorageProvider for S3-compatible services
136type S3CompatibleProvider struct {
137    config StorageConfig
138    // In production, this would wrap minio-go client
139}
140
141func NewS3CompatibleProvider(config StorageConfig) (*S3CompatibleProvider, error) {
142    // Initialize S3-compatible client (MinIO, DigitalOcean Spaces, etc.)
143    return &S3CompatibleProvider{config: config}, nil
144}
145
146func (p *S3CompatibleProvider) Upload(ctx context.Context, key string, data io.Reader, size int64) error {
147    log.Printf("[%s] Uploading %s", p.config.Provider, key)
148    // Actual implementation would use minio-go client
149    return nil
150}
151
152func (p *S3CompatibleProvider) Download(ctx context.Context, key string) ([]byte, error) {
153    log.Printf("[%s] Downloading %s", p.config.Provider, key)
154    // Actual implementation would use minio-go client
155    return []byte(fmt.Sprintf("Data from %s", p.config.Provider)), nil
156}
157
158func (p *S3CompatibleProvider) Delete(ctx context.Context, key string) error {
159    log.Printf("[%s] Deleting %s", p.config.Provider, key)
160    return nil
161}
162
163func (p *S3CompatibleProvider) List(ctx context.Context, prefix string) ([]string, error) {
164    log.Printf("[%s] Listing with prefix %s", p.config.Provider, prefix)
165    return []string{"file1.txt", "file2.txt"}, nil
166}
167
168func (p *S3CompatibleProvider) GetURL(ctx context.Context, key string) (string, error) {
169    return fmt.Sprintf("https://%s/%s/%s", p.config.Endpoint, p.config.BucketName, key), nil
170}
171
172// AWSProvider implements StorageProvider for AWS S3
173type AWSProvider struct {
174    config StorageConfig
175}
176
177func NewAWSProvider(config StorageConfig) (*AWSProvider, error) {
178    return &AWSProvider{config: config}, nil
179}
180
181func (p *AWSProvider) Upload(ctx context.Context, key string, data io.Reader, size int64) error {
182    log.Printf("[AWS] Uploading %s", key)
183    return nil
184}
185
186func (p *AWSProvider) Download(ctx context.Context, key string) ([]byte, error) {
187    log.Printf("[AWS] Downloading %s", key)
188    return []byte("Data from AWS"), nil
189}
190
191func (p *AWSProvider) Delete(ctx context.Context, key string) error {
192    log.Printf("[AWS] Deleting %s", key)
193    return nil
194}
195
196func (p *AWSProvider) List(ctx context.Context, prefix string) ([]string, error) {
197    log.Printf("[AWS] Listing with prefix %s", prefix)
198    return []string{"file1.txt", "file2.txt"}, nil
199}
200
201func (p *AWSProvider) GetURL(ctx context.Context, key string) (string, error) {
202    return fmt.Sprintf("https://s3.%s.amazonaws.com/%s/%s", p.config.Region, p.config.BucketName, key), nil
203}
204
205func main() {
206    ctx := context.Background()
207
208    // Configure multiple providers
209    configs := []StorageConfig{
210        {
211            Provider:   ProviderAWS,
212            Region:     "us-east-1",
213            BucketName: "my-aws-bucket",
214            AccessKey:  "AWS_ACCESS_KEY",
215            SecretKey:  "AWS_SECRET_KEY",
216            UseSSL:     true,
217        },
218        {
219            Provider:   ProviderMinIO,
220            Endpoint:   "localhost:9000",
221            BucketName: "my-minio-bucket",
222            AccessKey:  "minioadmin",
223            SecretKey:  "minioadmin",
224            UseSSL:     false,
225        },
226        {
227            Provider:   ProviderDigitalOcean,
228            Endpoint:   "nyc3.digitaloceanspaces.com",
229            BucketName: "my-do-bucket",
230            AccessKey:  "DO_ACCESS_KEY",
231            SecretKey:  "DO_SECRET_KEY",
232            UseSSL:     true,
233        },
234    }
235
236    // Create multi-provider storage with AWS as primary
237    storage, err := NewMultiProviderStorage(configs, ProviderAWS)
238    if err != nil {
239        log.Fatal(err)
240    }
241
242    // Upload with automatic failover
243    content := strings.NewReader("Important data")
244    err = storage.Upload(ctx, "important.txt", content, int64(content.Len()))
245    if err != nil {
246        log.Fatal(err)
247    }
248
249    // Download with automatic failover
250    data, err := storage.Download(ctx, "important.txt")
251    if err != nil {
252        log.Fatal(err)
253    }
254
255    fmt.Printf("Downloaded: %s\n", string(data))
256    fmt.Println("\nMulti-provider storage demonstration complete!")
257}
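
One caveat in the failover Upload above: an io.Reader can only be consumed once, so if the primary provider reads part of data before failing, the fallbacks would see a truncated stream. A hedged sketch of one way to handle this for small-to-medium objects, written as an extra method on the same MultiProviderStorage (it assumes "bytes" is added to the listing's imports; very large objects should use multipart uploads or a re-seekable temp file instead of an in-memory buffer):

    // uploadWithReplay buffers the payload once so every provider attempt
    // gets an independent reader positioned at the start of the data.
    func (mps *MultiProviderStorage) uploadWithReplay(ctx context.Context, key string, data io.Reader) error {
        payload, err := io.ReadAll(data)
        if err != nil {
            return fmt.Errorf("failed to buffer payload: %w", err)
        }

        // Primary first, then the configured fallbacks, each with a fresh bytes.Reader.
        providers := append([]ProviderType{mps.primary}, mps.fallbacks...)
        for _, providerType := range providers {
            provider := mps.providers[providerType]
            err := provider.Upload(ctx, key, bytes.NewReader(payload), int64(len(payload)))
            if err == nil {
                log.Printf("Uploaded %s via %s", key, providerType)
                return nil
            }
            log.Printf("Provider %s failed for %s: %v", providerType, key, err)
        }

        return fmt.Errorf("all providers failed for %s", key)
    }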

Cost Comparison: AWS S3 vs S3-Compatible Alternatives

Understanding pricing differences helps optimize storage costs. The figures below are approximate list prices that change over time, so verify against each provider's current pricing page:

Storage Costs (per TB/month):
- AWS S3 Standard:        $23.00
- DigitalOcean Spaces:    $5.00
- Backblaze B2:           $5.00
- Wasabi:                 $5.99
- MinIO (self-hosted):    Infrastructure cost only

Egress Costs (per TB):
- AWS S3:                 $90.00
- DigitalOcean Spaces:    $10.00 (after 1TB included)
- Backblaze B2:           $10.00
- Wasabi:                 $0 (no egress fees)
- MinIO (self-hosted):    Bandwidth cost only

Decision Framework:

  1. High Egress Traffic: Use Wasabi or self-hosted MinIO
  2. Low Traffic, Small Scale: Use DigitalOcean Spaces or Backblaze
  3. Enterprise Scale: Use AWS S3 with CloudFront CDN
  4. Data Sovereignty: Use self-hosted MinIO
  5. Hybrid Requirements: Use multi-provider strategy
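
To make the decision framework concrete, it helps to model a workload end to end. A small illustrative calculation for a hypothetical workload of 10 TB stored and 5 TB of egress per month, using the approximate list prices from the table above:

    package main

    import "fmt"

    // providerPricing holds approximate per-TB list prices; verify against current pricing pages.
    type providerPricing struct {
        name         string
        storagePerTB float64 // USD per TB-month stored
        egressPerTB  float64 // USD per TB transferred out
    }

    func monthlyCost(p providerPricing, storedTB, egressTB float64) float64 {
        return p.storagePerTB*storedTB + p.egressPerTB*egressTB
    }

    func main() {
        // Hypothetical workload: 10 TB stored, 5 TB egress per month.
        storedTB, egressTB := 10.0, 5.0

        providers := []providerPricing{
            {name: "AWS S3 Standard", storagePerTB: 23.00, egressPerTB: 90.00},
            {name: "Backblaze B2", storagePerTB: 5.00, egressPerTB: 10.00},
            {name: "Wasabi", storagePerTB: 5.99, egressPerTB: 0.00},
        }

        for _, p := range providers {
            fmt.Printf("%-16s ~$%.2f/month\n", p.name, monthlyCost(p, storedTB, egressTB))
        }
        // Prints roughly $680 for S3, $100 for B2, and $60 for Wasabi under these assumptions.
    }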

Resumable and Multipart Upload Patterns

Large file uploads require special handling to deal with network interruptions, timeouts, and memory constraints. Multipart uploads and resumable patterns are essential for production applications.

Why Resumable Uploads Matter

User Experience: Users shouldn't lose progress when uploading large files if their connection drops temporarily.

Reliability: Network instability is common on mobile networks and in areas with poor connectivity.

Performance: Parallel multipart uploads can significantly speed up large file transfers.

Memory Efficiency: Streaming uploads prevent loading entire files into memory.

Multipart Upload Implementation

Multipart upload splits large files into smaller chunks, uploads them in parallel, and reassembles them server-side:

  1// run
  2package main
  3
  4import (
  5    "context"
  6    "crypto/md5"
  7    "encoding/hex"
  8    "fmt"
  9    "io"
 10    "log"
 11    "sync"
 12    "time"
 13)
 14
 15// MultipartUploader handles chunked uploads with resume capability
 16type MultipartUploader struct {
 17    chunkSize   int64
 18    maxWorkers  int
 19    maxRetries  int
 20    uploadStore UploadStateStore
 21}
 22
 23// UploadState tracks progress of a multipart upload
 24type UploadState struct {
 25    UploadID       string
 26    Key            string
 27    TotalSize      int64
 28    ChunkSize      int64
 29    CompletedParts map[int]PartInfo
 30    CreatedAt      time.Time
 31    UpdatedAt      time.Time
 32}
 33
 34// PartInfo contains information about an uploaded part
 35type PartInfo struct {
 36    PartNumber int
 37    ETag       string
 38    Size       int64
 39    UploadedAt time.Time
 40}
 41
 42// UploadStateStore persists upload state for resumability
 43type UploadStateStore interface {
 44    Save(state *UploadState) error
 45    Load(uploadID string) (*UploadState, error)
 46    Delete(uploadID string) error
 47}
 48
 49// InMemoryUploadStore provides in-memory state storage (a Redis-backed version is sketched after this listing)
 50type InMemoryUploadStore struct {
 51    states map[string]*UploadState
 52    mu     sync.RWMutex
 53}
 54
 55func NewInMemoryUploadStore() *InMemoryUploadStore {
 56    return &InMemoryUploadStore{
 57        states: make(map[string]*UploadState),
 58    }
 59}
 60
 61func (s *InMemoryUploadStore) Save(state *UploadState) error {
 62    s.mu.Lock()
 63    defer s.mu.Unlock()
 64    state.UpdatedAt = time.Now()
 65    s.states[state.UploadID] = state
 66    return nil
 67}
 68
 69func (s *InMemoryUploadStore) Load(uploadID string) (*UploadState, error) {
 70    s.mu.RLock()
 71    defer s.mu.RUnlock()
 72    state, exists := s.states[uploadID]
 73    if !exists {
 74        return nil, fmt.Errorf("upload state not found")
 75    }
 76    return state, nil
 77}
 78
 79func (s *InMemoryUploadStore) Delete(uploadID string) error {
 80    s.mu.Lock()
 81    defer s.mu.Unlock()
 82    delete(s.states, uploadID)
 83    return nil
 84}
 85
 86// NewMultipartUploader creates a new uploader
 87func NewMultipartUploader(chunkSize int64, maxWorkers int, store UploadStateStore) *MultipartUploader {
 88    return &MultipartUploader{
 89        chunkSize:   chunkSize,
 90        maxWorkers:  maxWorkers,
 91        maxRetries:  3,
 92        uploadStore: store,
 93    }
 94}
 95
 96// InitiateUpload starts a new multipart upload
 97func (u *MultipartUploader) InitiateUpload(ctx context.Context, key string, totalSize int64) (string, error) {
 98    uploadID := generateUploadID()
 99
100    state := &UploadState{
101        UploadID:       uploadID,
102        Key:            key,
103        TotalSize:      totalSize,
104        ChunkSize:      u.chunkSize,
105        CompletedParts: make(map[int]PartInfo),
106        CreatedAt:      time.Now(),
107    }
108
109    if err := u.uploadStore.Save(state); err != nil {
110        return "", fmt.Errorf("failed to save upload state: %w", err)
111    }
112
113    log.Printf("Initiated upload %s for %s (%d bytes)", uploadID, key, totalSize)
114    return uploadID, nil
115}
116
117// UploadPart uploads a single part
118func (u *MultipartUploader) UploadPart(ctx context.Context, uploadID string, partNumber int, data io.Reader, size int64) error {
119    state, err := u.uploadStore.Load(uploadID)
120    if err != nil {
121        return err
122    }
123
124    // Check if part already uploaded
125    if _, exists := state.CompletedParts[partNumber]; exists {
126        log.Printf("Part %d already uploaded, skipping", partNumber)
127        return nil
128    }
129
130    // Read and hash the data
131    hasher := md5.New()
132    buf := make([]byte, size)
133    n, err := io.ReadFull(data, buf)
134    if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
135        return fmt.Errorf("failed to read part data: %w", err)
136    }
137
138    hasher.Write(buf[:n])
139    etag := hex.EncodeToString(hasher.Sum(nil))
140
141    // Simulate upload (in production, upload to actual storage)
142    time.Sleep(100 * time.Millisecond) // Simulate network latency
143
144    // Record completed part
145    state.CompletedParts[partNumber] = PartInfo{
146        PartNumber: partNumber,
147        ETag:       etag,
148        Size:       int64(n),
149        UploadedAt: time.Now(),
150    }
151
152    if err := u.uploadStore.Save(state); err != nil {
153        return fmt.Errorf("failed to save upload state: %w", err)
154    }
155
156    log.Printf("Uploaded part %d (%d bytes, ETag: %s)", partNumber, n, etag)
157    return nil
158}
159
160// UploadWithResume uploads a file with resume capability
161func (u *MultipartUploader) UploadWithResume(ctx context.Context, uploadID string, data io.Reader, totalSize int64) error {
162    state, err := u.uploadStore.Load(uploadID)
163    if err != nil {
164        return err
165    }
166
167    totalParts := int((totalSize + u.chunkSize - 1) / u.chunkSize)
168    log.Printf("Uploading %d parts (chunk size: %d bytes)", totalParts, u.chunkSize)
169
170    // Create work queue for parts
171    type partWork struct {
172        partNumber int
173        offset     int64
174        size       int64
175    }
176
177    workChan := make(chan partWork, totalParts)
178    errorChan := make(chan error, totalParts)
179    var wg sync.WaitGroup
180
181    // Queue parts that haven't been uploaded yet
182    for partNum := 1; partNum <= totalParts; partNum++ {
183        if _, completed := state.CompletedParts[partNum]; !completed {
184            offset := int64(partNum-1) * u.chunkSize
185            size := u.chunkSize
186            if offset+size > totalSize {
187                size = totalSize - offset
188            }
189
190            workChan <- partWork{
191                partNumber: partNum,
192                offset:     offset,
193                size:       size,
194            }
195        }
196    }
197    close(workChan)
198
199    // Start worker pool
200    for i := 0; i < u.maxWorkers; i++ {
201        wg.Add(1)
202        go func(workerID int) {
203            defer wg.Done()
204
205            for work := range workChan {
206                // In production, seek to work.offset and read work.size bytes
207                // For demo, create sample data
208                chunk := make([]byte, work.size)
209                for i := range chunk {
210                    chunk[i] = byte((work.offset + int64(i)) % 256)
211                }
212
213                err := u.UploadPart(ctx, uploadID, work.partNumber,
214                    &limitedReader{data: chunk, limit: work.size}, work.size)
215                if err != nil {
216                    log.Printf("Worker %d failed to upload part %d: %v",
217                        workerID, work.partNumber, err)
218                    errorChan <- err
219                    return
220                }
221
222                log.Printf("Worker %d completed part %d", workerID, work.partNumber)
223            }
224        }(i)
225    }
226
227    wg.Wait()
228    close(errorChan)
229
230    // Check for errors
231    if len(errorChan) > 0 {
232        return <-errorChan
233    }
234
235    return nil
236}
237
238// CompleteUpload finalizes the multipart upload
239func (u *MultipartUploader) CompleteUpload(ctx context.Context, uploadID string) error {
240    state, err := u.uploadStore.Load(uploadID)
241    if err != nil {
242        return err
243    }
244
245    totalParts := int((state.TotalSize + u.chunkSize - 1) / u.chunkSize)
246    if len(state.CompletedParts) != totalParts {
247        return fmt.Errorf("upload incomplete: %d/%d parts completed",
248            len(state.CompletedParts), totalParts)
249    }
250
251    log.Printf("Completing upload %s (%d parts)", uploadID, len(state.CompletedParts))
252
253    // In production, call CompleteMultipartUpload API
254    // This tells S3/storage to assemble all parts into final object
255
256    // Clean up upload state
257    if err := u.uploadStore.Delete(uploadID); err != nil {
258        log.Printf("Warning: failed to clean up upload state: %v", err)
259    }
260
261    log.Printf("Upload completed successfully: %s", state.Key)
262    return nil
263}
264
265// GetProgress returns upload progress
266func (u *MultipartUploader) GetProgress(uploadID string) (float64, error) {
267    state, err := u.uploadStore.Load(uploadID)
268    if err != nil {
269        return 0, err
270    }
271
272    var uploadedBytes int64
273    for _, part := range state.CompletedParts {
274        uploadedBytes += part.Size
275    }
276
277    progress := float64(uploadedBytes) / float64(state.TotalSize) * 100
278    return progress, nil
279}
280
281// AbortUpload cancels an upload and cleans up parts
282func (u *MultipartUploader) AbortUpload(ctx context.Context, uploadID string) error {
283    state, err := u.uploadStore.Load(uploadID)
284    if err != nil {
285        return err
286    }
287
288    log.Printf("Aborting upload %s for %s", uploadID, state.Key)
289
290    // In production, call AbortMultipartUpload API to delete uploaded parts
291
292    if err := u.uploadStore.Delete(uploadID); err != nil {
293        return fmt.Errorf("failed to delete upload state: %w", err)
294    }
295
296    return nil
297}
298
299type limitedReader struct {
300    data  []byte
301    limit int64
302    pos   int64
303}
304
305func (r *limitedReader) Read(p []byte) (n int, err error) {
306    if r.pos >= r.limit {
307        return 0, io.EOF
308    }
309
310    remaining := r.limit - r.pos
311    if int64(len(p)) > remaining {
312        p = p[:remaining]
313    }
314
315    n = copy(p, r.data[r.pos:])
316    r.pos += int64(n)
317    return n, nil
318}
319
320func generateUploadID() string {
321    return fmt.Sprintf("upload-%d", time.Now().UnixNano())
322}
323
324func main() {
325    ctx := context.Background()
326
327    // Create uploader with 5MB chunks, 4 workers
328    store := NewInMemoryUploadStore()
329    uploader := NewMultipartUploader(5*1024*1024, 4, store)
330
331    // Simulate uploading a 50MB file
332    fileSize := int64(50 * 1024 * 1024)
333    key := "large-file.bin"
334
335    // Initiate upload
336    uploadID, err := uploader.InitiateUpload(ctx, key, fileSize)
337    if err != nil {
338        log.Fatal(err)
339    }
340
341    fmt.Printf("Upload initiated: %s\n", uploadID)
342
343    // Upload parts (simulated)
344    dummyReader := &dummyReader{size: fileSize}
345    err = uploader.UploadWithResume(ctx, uploadID, dummyReader, fileSize)
346    if err != nil {
347        log.Printf("Upload failed: %v", err)
348        if abortErr := uploader.AbortUpload(ctx, uploadID); abortErr != nil {
349            log.Printf("Failed to abort upload: %v", abortErr)
350        }
351        log.Fatal(err)
352    }
353
354    // Check progress
355    progress, err := uploader.GetProgress(uploadID)
356    if err != nil {
357        log.Fatal(err)
358    }
359    fmt.Printf("Upload progress: %.2f%%\n", progress)
360
361    // Complete upload
362    err = uploader.CompleteUpload(ctx, uploadID)
363    if err != nil {
364        log.Fatal(err)
365    }
366
367    fmt.Println("Multipart upload completed successfully!")
368}
369
370type dummyReader struct {
371    size int64
372    pos  int64
373}
374
375func (r *dummyReader) Read(p []byte) (n int, err error) {
376    if r.pos >= r.size {
377        return 0, io.EOF
378    }
379    n = len(p)
380    if int64(n) > r.size-r.pos {
381        n = int(r.size - r.pos)
382    }
383    for i := 0; i < n; i++ {
384        p[i] = byte((r.pos + int64(i)) % 256)
385    }
386    r.pos += int64(n)
387    return n, nil
388}
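
The InMemoryUploadStore above loses all progress when the process restarts. Here is a hedged sketch of a Redis-backed UploadStateStore that serializes UploadState as JSON; it assumes the github.com/redis/go-redis/v9 client, and the key prefix and TTL are illustrative choices:

    import (
        "context"
        "encoding/json"
        "fmt"
        "time"

        "github.com/redis/go-redis/v9"
    )

    // RedisUploadStore persists upload state so resumable uploads survive process restarts.
    type RedisUploadStore struct {
        rdb *redis.Client
        ttl time.Duration
    }

    func NewRedisUploadStore(addr string, ttl time.Duration) *RedisUploadStore {
        return &RedisUploadStore{
            rdb: redis.NewClient(&redis.Options{Addr: addr}),
            ttl: ttl,
        }
    }

    func (s *RedisUploadStore) Save(state *UploadState) error {
        state.UpdatedAt = time.Now()
        payload, err := json.Marshal(state)
        if err != nil {
            return err
        }
        // The UploadStateStore interface above takes no context, so Background is used here.
        return s.rdb.Set(context.Background(), "upload:"+state.UploadID, payload, s.ttl).Err()
    }

    func (s *RedisUploadStore) Load(uploadID string) (*UploadState, error) {
        payload, err := s.rdb.Get(context.Background(), "upload:"+uploadID).Bytes()
        if err != nil {
            return nil, fmt.Errorf("upload state not found: %w", err)
        }
        var state UploadState
        if err := json.Unmarshal(payload, &state); err != nil {
            return nil, err
        }
        return &state, nil
    }

    func (s *RedisUploadStore) Delete(uploadID string) error {
        return s.rdb.Del(context.Background(), "upload:"+uploadID).Err()
    }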

Production Multipart Upload with AWS S3

Here's a production-oriented skeleton that follows the AWS SDK v2 multipart call sequence; the real SDK calls are shown in comments so the example runs standalone:

  1// run
  2package main
  3
  4import (
  5    "context"
  6    "fmt"
  7    "io"
  8    "log"
  9    "sync"
 10    "time"
 11)
 12
 13// S3MultipartUploader handles production multipart uploads to S3
 14type S3MultipartUploader struct {
 15    // In production, would contain:
 16    // client *s3.Client
 17    bucket      string
 18    minPartSize int64
 19    maxWorkers  int
 20    maxRetries  int
 21}
 22
 23// UploadResult contains the result of a multipart upload
 24type UploadResult struct {
 25    Location  string
 26    VersionID string
 27    ETag      string
 28    Duration  time.Duration
 29}
 30
 31// NewS3MultipartUploader creates an S3 multipart uploader
 32func NewS3MultipartUploader(bucket string) *S3MultipartUploader {
 33    return &S3MultipartUploader{
 34        bucket:      bucket,
 35        minPartSize: 5 * 1024 * 1024, // 5MB minimum for S3
 36        maxWorkers:  10,
 37        maxRetries:  3,
 38    }
 39}
 40
 41// UploadLargeFile handles multipart upload with progress tracking
 42func (u *S3MultipartUploader) UploadLargeFile(
 43    ctx context.Context,
 44    key string,
 45    data io.Reader,
 46    totalSize int64,
 47    progressCallback func(bytesUploaded int64, totalSize int64),
 48) (*UploadResult, error) {
 49    startTime := time.Now()
 50
 51    log.Printf("Starting multipart upload: %s (%d bytes)", key, totalSize)
 52
 53    // Step 1: Create multipart upload
 54    uploadID, err := u.createMultipartUpload(ctx, key)
 55    if err != nil {
 56        return nil, fmt.Errorf("failed to create multipart upload: %w", err)
 57    }
 58
 59    log.Printf("Created multipart upload: %s", uploadID)
 60
 61    // Step 2: Upload parts in parallel
 62    parts, err := u.uploadParts(ctx, uploadID, key, data, totalSize, progressCallback)
 63    if err != nil {
 64        // Abort upload on failure
 65        u.abortMultipartUpload(ctx, key, uploadID)
 66        return nil, fmt.Errorf("failed to upload parts: %w", err)
 67    }
 68
 69    // Step 3: Complete multipart upload
 70    result, err := u.completeMultipartUpload(ctx, key, uploadID, parts)
 71    if err != nil {
 72        u.abortMultipartUpload(ctx, key, uploadID)
 73        return nil, fmt.Errorf("failed to complete upload: %w", err)
 74    }
 75
 76    result.Duration = time.Since(startTime)
 77    log.Printf("Upload completed in %v", result.Duration)
 78
 79    return result, nil
 80}
 81
 82func (u *S3MultipartUploader) createMultipartUpload(ctx context.Context, key string) (string, error) {
 83    // In production:
 84    // input := &s3.CreateMultipartUploadInput{
 85    //     Bucket: aws.String(u.bucket),
 86    //     Key:    aws.String(key),
 87    //     ServerSideEncryption: types.ServerSideEncryptionAes256,
 88    // }
 89    // output, err := u.client.CreateMultipartUpload(ctx, input)
 90
 91    uploadID := fmt.Sprintf("upload-%d", time.Now().UnixNano())
 92    return uploadID, nil
 93}
 94
 95func (u *S3MultipartUploader) uploadParts(
 96    ctx context.Context,
 97    uploadID string,
 98    key string,
 99    data io.Reader,
100    totalSize int64,
101    progressCallback func(int64, int64),
102) ([]CompletedPart, error) {
103    numParts := (totalSize + u.minPartSize - 1) / u.minPartSize
104    parts := make([]CompletedPart, int(numParts))
105
106    var uploadedBytes int64
107    var mu sync.Mutex
108
109    type partWork struct {
110        partNumber int
111        data       []byte
112    }
113
114    workChan := make(chan partWork, numParts)
115    errorChan := make(chan error, 1)
116    var wg sync.WaitGroup
117
118    // Start workers
119    for i := 0; i < u.maxWorkers; i++ {
120        wg.Add(1)
121        go func() {
122            defer wg.Done()
123
124            for work := range workChan {
125                select {
126                case <-ctx.Done():
127                    return
128                default:
129                }
130
131                etag, err := u.uploadSinglePart(ctx, uploadID, key, work.partNumber, work.data)
132                if err != nil {
133                    select {
134                    case errorChan <- err:
135                    default:
136                    }
137                    return
138                }
139
140                mu.Lock()
141                parts[work.partNumber-1] = CompletedPart{
142                    PartNumber: work.partNumber,
143                    ETag:       etag,
144                }
145                uploadedBytes += int64(len(work.data))
146                if progressCallback != nil {
147                    progressCallback(uploadedBytes, totalSize)
148                }
149                mu.Unlock()
150
151                log.Printf("Completed part %d/%d", work.partNumber, numParts)
152            }
153        }()
154    }
155
156    // Queue all parts
157    buf := make([]byte, u.minPartSize)
158    for partNum := 1; partNum <= int(numParts); partNum++ {
159        n, err := io.ReadFull(data, buf)
160        if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
161            close(workChan)
162            return nil, err
163        }
164
165        partData := make([]byte, n)
166        copy(partData, buf[:n])
167
168        workChan <- partWork{
169            partNumber: partNum,
170            data:       partData,
171        }
172
173        if err == io.EOF || err == io.ErrUnexpectedEOF {
174            break
175        }
176    }
177
178    close(workChan)
179    wg.Wait()
180    close(errorChan)
181
182    if err := <-errorChan; err != nil {
183        return nil, err
184    }
185
186    return parts, nil
187}
188
189func (u *S3MultipartUploader) uploadSinglePart(
190    ctx context.Context,
191    uploadID string,
192    key string,
193    partNumber int,
194    data []byte,
195) (string, error) {
196    // In production:
197    // input := &s3.UploadPartInput{
198    //     Bucket:     aws.String(u.bucket),
199    //     Key:        aws.String(key),
200    //     UploadId:   aws.String(uploadID),
201    //     PartNumber: aws.Int32(int32(partNumber)),
202    //     Body:       bytes.NewReader(data),
203    // }
204    //
205    // output, err := u.client.UploadPart(ctx, input)
206    // return *output.ETag, err
207
208    // Simulate upload
209    time.Sleep(50 * time.Millisecond)
210    return fmt.Sprintf("etag-%d", partNumber), nil
211}
212
213func (u *S3MultipartUploader) completeMultipartUpload(
214    ctx context.Context,
215    key string,
216    uploadID string,
217    parts []CompletedPart,
218) (*UploadResult, error) {
219    // In production:
220    // completedParts := make([]types.CompletedPart, len(parts))
221    // for i, p := range parts {
222    //     completedParts[i] = types.CompletedPart{
223    //         ETag:       aws.String(p.ETag),
224    //         PartNumber: aws.Int32(int32(p.PartNumber)),
225    //     }
226    // }
227    //
228    // input := &s3.CompleteMultipartUploadInput{
229    //     Bucket:   aws.String(u.bucket),
230    //     Key:      aws.String(key),
231    //     UploadId: aws.String(uploadID),
232    //     MultipartUpload: &types.CompletedMultipartUpload{
233    //         Parts: completedParts,
234    //     },
235    // }
236    //
237    // output, err := u.client.CompleteMultipartUpload(ctx, input)
238
239    return &UploadResult{
240        Location:  fmt.Sprintf("https://%s.s3.amazonaws.com/%s", u.bucket, key),
241        ETag:      "final-etag",
242        VersionID: "version-id",
243    }, nil
244}
245
246func (u *S3MultipartUploader) abortMultipartUpload(ctx context.Context, key string, uploadID string) error {
247    // In production:
248    // input := &s3.AbortMultipartUploadInput{
249    //     Bucket:   aws.String(u.bucket),
250    //     Key:      aws.String(key),
251    //     UploadId: aws.String(uploadID),
252    // }
253    // _, err := u.client.AbortMultipartUpload(ctx, input)
254
255    log.Printf("Aborted multipart upload: %s", uploadID)
256    return nil
257}
258
259type CompletedPart struct {
260    PartNumber int
261    ETag       string
262}
263
264func main() {
265    ctx := context.Background()
266
267    uploader := NewS3MultipartUploader("my-bucket")
268
269    // Simulate 100MB file
270    fileSize := int64(100 * 1024 * 1024)
271    dummyData := &dummyDataReader{size: fileSize}
272
273    // Progress callback
274    progressCallback := func(uploaded, total int64) {
275        percent := float64(uploaded) / float64(total) * 100
276        fmt.Printf("\rProgress: %.2f%% (%d/%d bytes)", percent, uploaded, total)
277    }
278
279    result, err := uploader.UploadLargeFile(
280        ctx,
281        "large-file.bin",
282        dummyData,
283        fileSize,
284        progressCallback,
285    )
286    if err != nil {
287        log.Fatal(err)
288    }
289
290    fmt.Printf("\n\nUpload successful!\n")
291    fmt.Printf("Location: %s\n", result.Location)
292    fmt.Printf("ETag: %s\n", result.ETag)
293    fmt.Printf("Duration: %v\n", result.Duration)
294}
295
296type dummyDataReader struct {
297    size int64
298    pos  int64
299}
300
301func (r *dummyDataReader) Read(p []byte) (n int, err error) {
302    if r.pos >= r.size {
303        return 0, io.EOF
304    }
305    n = len(p)
306    if int64(n) > r.size-r.pos {
307        n = int(r.size - r.pos)
308    }
309    for i := 0; i < n; i++ {
310        p[i] = byte((r.pos + int64(i)) % 256)
311    }
312    r.pos += int64(n)
313    return n, nil
314}
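
In many services you don't need to orchestrate the part lifecycle yourself: the AWS SDK for Go v2 ships a transfer manager (feature/s3/manager) that splits the stream, uploads parts concurrently, and completes or aborts the multipart upload for you. A hedged sketch, assuming credentials come from the default config chain and that the bucket and file names are placeholders:

    package main

    import (
        "context"
        "log"
        "os"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/config"
        "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
        "github.com/aws/aws-sdk-go-v2/service/s3"
    )

    func main() {
        ctx := context.Background()

        cfg, err := config.LoadDefaultConfig(ctx)
        if err != nil {
            log.Fatalf("failed to load AWS config: %v", err)
        }

        file, err := os.Open("large-file.bin") // placeholder local file
        if err != nil {
            log.Fatalf("failed to open file: %v", err)
        }
        defer file.Close()

        // The uploader handles chunking, parallel part uploads, and completion/abort.
        uploader := manager.NewUploader(s3.NewFromConfig(cfg), func(u *manager.Uploader) {
            u.PartSize = 10 * 1024 * 1024 // 10MB parts
            u.Concurrency = 4             // parallel part uploads
        })

        result, err := uploader.Upload(ctx, &s3.PutObjectInput{
            Bucket: aws.String("my-bucket"), // placeholder bucket
            Key:    aws.String("large-file.bin"),
            Body:   file,
        })
        if err != nil {
            log.Fatalf("upload failed: %v", err)
        }

        log.Printf("Uploaded to %s", result.Location)
    }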

Best Practices for Resumable Uploads

1. State Persistence: Store upload state in Redis or a database for reliability across server restarts.

2. Part Size Selection (a part-sizing and retry sketch follows this list):

  • Minimum: 5MB for S3 (except last part)
  • Optimal: 10-100MB depending on file size
  • Maximum: 5GB per part

3. Error Handling:

  • Implement exponential backoff for retries
  • Abort uploads after maximum retry attempts
  • Clean up incomplete uploads periodically

4. Progress Tracking:

  • Report progress to users
  • Enable pause/resume functionality
  • Store checksums for data integrity

5. Concurrency Control:

  • Use worker pools to limit concurrent uploads
  • Adjust worker count based on available bandwidth
  • Implement rate limiting to prevent throttling
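
Here is a minimal sketch of two of the points above: choosing a part size that respects S3's 5MB minimum and 10,000-part-per-upload maximum, and retrying a flaky part upload with exponential backoff and jitter (the function passed to the retry helper stands in for an UploadPart call):

    package main

    import (
        "context"
        "fmt"
        "math/rand"
        "time"
    )

    const (
        minPartSize = 5 * 1024 * 1024 // S3 minimum part size (all parts except the last)
        maxParts    = 10000           // S3 maximum number of parts per upload
    )

    // choosePartSize picks the smallest part size that keeps the part count under the S3 limit.
    func choosePartSize(totalSize int64) int64 {
        partSize := int64(minPartSize)
        for totalSize/partSize >= maxParts {
            partSize *= 2
        }
        return partSize
    }

    // retryWithBackoff retries fn with exponential backoff plus random jitter.
    func retryWithBackoff(ctx context.Context, maxRetries int, fn func() error) error {
        var err error
        for attempt := 0; attempt <= maxRetries; attempt++ {
            if err = fn(); err == nil {
                return nil
            }
            backoff := time.Duration(1<<attempt)*time.Second + time.Duration(rand.Intn(500))*time.Millisecond
            select {
            case <-time.After(backoff):
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        return fmt.Errorf("giving up after %d retries: %w", maxRetries, err)
    }

    func main() {
        // A 500GB object needs larger parts to stay under the 10,000-part limit.
        fmt.Printf("part size for 500GB: %d bytes\n", choosePartSize(500*1024*1024*1024))

        // Simulated flaky part upload: fails twice, then succeeds.
        attempts := 0
        err := retryWithBackoff(context.Background(), 3, func() error {
            attempts++
            if attempts < 3 {
                return fmt.Errorf("transient network error")
            }
            return nil
        })
        fmt.Printf("attempts: %d, err: %v\n", attempts, err)
    }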

Practice Exercises

Now let's apply these concepts with hands-on exercises:

Exercise 1: File Upload Service with Direct Browser Uploads

Learning Objective: Build a secure, scalable file upload service that handles direct browser uploads, content validation, and automated processing pipelines.

Real-World Context: File upload services are critical infrastructure at companies like Dropbox and Instagram, where billions of files are uploaded daily. These systems must handle security threats, process various file types, generate thumbnails, and store content efficiently across multiple storage tiers.

Difficulty: Intermediate | Time Estimate: 65-80 minutes

Objective: Build a file upload service with presigned URLs, virus scanning, and thumbnail generation.

Solution
  1package main
  2
  3import (
  4    "bytes"
  5    "context"
  6    "crypto/sha256"
  7    "encoding/hex"
  8    "encoding/json"
  9    "fmt"
 10    "image"
 11    "image/jpeg"
 12    "image/png"
 13    "log"
 14    "net/http"
 15    "time"
 16
 17    "github.com/nfnt/resize"
 18)
 19
 20type FileUploadService struct {
 21    storage    *S3Storage
 22    maxSize    int64
 23    allowedTypes map[string]bool
 24}
 25
 26func NewFileUploadService(storage *S3Storage) *FileUploadService {
 27    return &FileUploadService{
 28        storage:  storage,
 29        maxSize:  10 * 1024 * 1024, // 10MB
 30        allowedTypes: map[string]bool{
 31            "image/jpeg": true,
 32            "image/png":  true,
 33            "image/gif":  true,
 34        },
 35    }
 36}
 37
 38// GenerateUploadURL generates presigned upload URL
 39func (f *FileUploadService) GenerateUploadURL(ctx context.Context, filename, contentType string) (*UploadURLResponse, error) {
 40    // Validate content type
 41    if !f.allowedTypes[contentType] {
 42        return nil, fmt.Errorf("content type not allowed: %s", contentType)
 43    }
 44
 45    // Generate unique key
 46    key := f.generateKey(filename)
 47
 48    // Generate presigned URL
 49    url, err := f.storage.GetSignedUploadURL(ctx, key, 15*time.Minute)
 50    if err != nil {
 51        return nil, err
 52    }
 53
 54    return &UploadURLResponse{
 55        UploadURL: url,
 56        Key:       key,
 57        ExpiresAt: time.Now().Add(15 * time.Minute),
 58    }, nil
 59}
 60
 61type UploadURLResponse struct {
 62    UploadURL string    `json:"upload_url"`
 63    Key       string    `json:"key"`
 64    ExpiresAt time.Time `json:"expires_at"`
 65}
 66
 67// ProcessUpload processes uploaded file
 68func (f *FileUploadService) ProcessUpload(ctx context.Context, key string) error {
 69    // Download file
 70    data, err := f.storage.Download(ctx, key)
 71    if err != nil {
 72        return err
 73    }
 74
 75    // Scan for viruses
 76    if err := f.scanFile(data); err != nil {
 77        return fmt.Errorf("virus detected: %w", err)
 78    }
 79
 80    // Generate thumbnail if image
 81    if err := f.generateThumbnail(ctx, key, data); err != nil {
 82        return fmt.Errorf("thumbnail generation failed: %w", err)
 83    }
 84
 85    // Tag as processed
 86    return f.tagAsProcessed(ctx, key)
 87}
 88
 89func (f *FileUploadService) generateKey(filename string) string {
 90    hash := sha256.Sum256([]byte(fmt.Sprintf("%s-%d", filename, time.Now().UnixNano())))
 91    return fmt.Sprintf("uploads/%s/%s", time.Now().Format("2006/01/02"), hex.EncodeToString(hash[:8]))
 92}
 93
 94func (f *FileUploadService) scanFile(data []byte) error {
 95    // Placeholder for virus scanning
 96    // In production, integrate with ClamAV or cloud scanning service
 97    return nil
 98}
 99
100func (f *FileUploadService) generateThumbnail(ctx context.Context, key string, data []byte) error {
101    // Decode image
102    img, format, err := image.Decode(bytes.NewReader(data))
103    if err != nil {
104        return err
105    }
106
107    // Generate thumbnail
108    thumbnail := resize.Resize(200, 0, img, resize.Lanczos3)
109
110    // Encode thumbnail
111    var buf bytes.Buffer
112    switch format {
113    case "jpeg":
114        jpeg.Encode(&buf, thumbnail, &jpeg.Options{Quality: 85})
115    case "png":
116        png.Encode(&buf, thumbnail)
117    default:
118        return fmt.Errorf("unsupported format: %s", format)
119    }
120
121    // Upload thumbnail
122    thumbnailKey := fmt.Sprintf("thumbnails/%s", key)
123    return f.storage.Upload(ctx, thumbnailKey, &buf,
124        WithContentType(fmt.Sprintf("image/%s", format)),
125    )
126}
127
128func (f *FileUploadService) tagAsProcessed(ctx context.Context, key string) error {
129    // Add metadata tag
130    // Implementation depends on storage provider
131    return nil
132}
133
134// HTTP Handlers
135func (f *FileUploadService) HandleGetUploadURL(w http.ResponseWriter, r *http.Request) {
136    var req struct {
137        Filename    string `json:"filename"`
138        ContentType string `json:"content_type"`
139    }
140
141    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
142        http.Error(w, "Invalid request", http.StatusBadRequest)
143        return
144    }
145
146    resp, err := f.GenerateUploadURL(r.Context(), req.Filename, req.ContentType)
147    if err != nil {
148        http.Error(w, err.Error(), http.StatusBadRequest)
149        return
150    }
151
152    w.Header().Set("Content-Type", "application/json")
153    json.NewEncoder(w).Encode(resp)
154}
155
156func (f *FileUploadService) HandleProcessUpload(w http.ResponseWriter, r *http.Request) {
157    var req struct {
158        Key string `json:"key"`
159    }
160
161    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
162        http.Error(w, "Invalid request", http.StatusBadRequest)
163        return
164    }
165
166    if err := f.ProcessUpload(r.Context(), req.Key); err != nil {
167        http.Error(w, err.Error(), http.StatusInternalServerError)
168        return
169    }
170
171    w.WriteHeader(http.StatusOK)
172    json.NewEncoder(w).Encode(map[string]string{"status": "processed"})
173}
174
175func main() {
176    ctx := context.Background()
177
178    storage, _ := NewS3Storage(ctx, "my-uploads-bucket")
179    service := NewFileUploadService(storage)
180
181    http.HandleFunc("/upload-url", service.HandleGetUploadURL)
182    http.HandleFunc("/process", service.HandleProcessUpload)
183
184    log.Println("Server starting on :8080")
185    http.ListenAndServe(":8080", nil)
186}

Exercise 2: Multi-Cloud Storage Abstraction

Learning Objective: Design and implement a unified storage interface that abstracts away the differences between major cloud storage providers while enabling multi-cloud strategies.

Real-World Context: Multi-cloud storage strategies are essential for enterprise risk management at companies like Netflix and Spotify, where avoiding vendor lock-in and ensuring business continuity are critical.

Difficulty: Advanced | Time Estimate: 70-85 minutes

Objective: Create a storage abstraction that works seamlessly across S3, GCS, and Azure Blob Storage.

Solution
  1package main
  2
  3import (
  4    "bytes"
  5    "context"
  6    "fmt"
  7    "io"
  8    "log"
  9    "time"
 10)
  9
 10// CloudStorage provides cloud-agnostic storage interface
 11type CloudStorage interface {
 12    Upload(ctx context.Context, key string, data io.Reader) error
 13    Download(ctx context.Context, key string) ([]byte, error)
 14    Delete(ctx context.Context, key string) error
 15    List(ctx context.Context, prefix string) ([]string, error)
 16    GetSignedURL(ctx context.Context, key string, expiry time.Duration) (string, error)
 17}
 18
 19// MultiCloudStorage manages multiple storage backends
 20type MultiCloudStorage struct {
 21    primary   CloudStorage
 22    secondary CloudStorage
 23    replicate bool
 24}
 25
 26func NewMultiCloudStorage(primary, secondary CloudStorage, replicate bool) *MultiCloudStorage {
 27    return &MultiCloudStorage{
 28        primary:   primary,
 29        secondary: secondary,
 30        replicate: replicate,
 31    }
 32}
 33
 34// Upload uploads to primary and optionally replicates to secondary
 35func (m *MultiCloudStorage) Upload(ctx context.Context, key string, data io.Reader) error {
 36    // For replication, we need to read data twice
 37    var buf *bytes.Buffer
 38    if m.replicate && m.secondary != nil {
 39        buf = &bytes.Buffer{}
 40        data = io.TeeReader(data, buf)
 41    }
 42
 43    // Upload to primary
 44    if err := m.primary.Upload(ctx, key, data); err != nil {
 45        return fmt.Errorf("primary upload failed: %w", err)
 46    }
 47
 48    // Replicate to secondary
 49    if m.replicate && m.secondary != nil {
 50        go func() {
 51            ctx := context.Background()
 52            if err := m.secondary.Upload(ctx, key, buf); err != nil {
 53                log.Printf("Secondary upload failed: %v", err)
 54            }
 55        }()
 56    }
 57
 58    return nil
 59}
 60
 61// Download tries primary first, falls back to secondary
 62func (m *MultiCloudStorage) Download(ctx context.Context, key string) ([]byte, error) {
 63    data, err := m.primary.Download(ctx, key)
 64    if err == nil {
 65        return data, nil
 66    }
 67
 68    log.Printf("Primary download failed, trying secondary: %v", err)
 69
 70    if m.secondary != nil {
 71        return m.secondary.Download(ctx, key)
 72    }
 73
 74    return nil, err
 75}
 76
 77// Delete deletes from both storages
 78func (m *MultiCloudStorage) Delete(ctx context.Context, key string) error {
 79    err1 := m.primary.Delete(ctx, key)
 80
 81    if m.secondary != nil {
 82        err2 := m.secondary.Delete(ctx, key)
 83        if err2 != nil {
 84            log.Printf("Secondary delete failed: %v", err2)
 85        }
 86    }
 87
 88    return err1
 89}
 90
 91func main() {
 92    ctx := context.Background()
 93
 94    // Setup S3 as primary
 95    s3Storage, _ := NewS3Storage(ctx, "primary-bucket")
 96    s3Adapter := &S3Adapter{storage: s3Storage}
 97
 98    // Setup GCS as secondary
 99    gcsStorage, _ := NewGCSStorage(ctx, "secondary-bucket")
100    gcsAdapter := &GCSAdapter{storage: gcsStorage}
101
102    // Create multi-cloud storage with replication
103    storage := NewMultiCloudStorage(s3Adapter, gcsAdapter, true)
104
105    // Use unified interface
106    data := bytes.NewReader([]byte("Hello, multi-cloud!"))
107    storage.Upload(ctx, "test.txt", data)
108
109    downloaded, _ := storage.Download(ctx, "test.txt")
110    fmt.Printf("Downloaded: %s\n", string(downloaded))
111}

Exercise 3: Presigned URL Generator with Advanced Security

Learning Objective: Master secure direct-to-cloud upload patterns by building a comprehensive presigned URL generation system with advanced security features.

Real-World Context: Presigned URLs enable efficient file uploads at companies like Facebook and Google, where billions of files are uploaded directly to cloud storage without burdening application servers.

Difficulty: Advanced | Time Estimate: 75-90 minutes

Objective: Build a comprehensive presigned URL generator supporting S3, GCS, and Azure Blob with custom headers, expiration, and security validation.

Solution with Explanation
  1// run
  2package main
  3
  4import (
  5	"context"
  6	"crypto/hmac"
  7	"crypto/sha256"
  8	"encoding/base64"
  9	"fmt"
 12	"time"
 13
 14	"github.com/aws/aws-sdk-go-v2/aws"
 15	"github.com/aws/aws-sdk-go-v2/config"
 16	"github.com/aws/aws-sdk-go-v2/service/s3"
 17)
 18
 19// PresignedURLGenerator generates presigned URLs for multiple cloud providers
 20type PresignedURLGenerator struct {
 21	s3Client *s3.Client
 22	bucket   string
 23}
 24
 25func NewPresignedURLGenerator(ctx context.Context, bucket string) (*PresignedURLGenerator, error) {
 26	cfg, err := config.LoadDefaultConfig(ctx)
 27	if err != nil {
 28		return nil, fmt.Errorf("failed to load config: %w", err)
 29	}
 30
 31	return &PresignedURLGenerator{
 32		s3Client: s3.NewFromConfig(cfg),
 33		bucket:   bucket,
 34	}, nil
 35}
 36
 37// PresignConfig configures presigned URL generation
 38type PresignConfig struct {
 39	Key            string
 40	Expiry         time.Duration
 41	ContentType    string
 42	CustomHeaders  map[string]string
 43	MaxSizeBytes   int64
 44	AllowedOrigins []string
 45	Operation      string // "upload" or "download"
 46}
 47
 48// PresignedURLResponse contains the generated URL and metadata
 49type PresignedURLResponse struct {
 50	URL        string            `json:"url"`
 51	Key        string            `json:"key"`
 52	ExpiresAt  time.Time         `json:"expires_at"`
 53	Headers    map[string]string `json:"headers"`
 54	Method     string            `json:"method"`
 55	MaxSize    int64             `json:"max_size,omitempty"`
 56	Checksum   string            `json:"checksum"`
 57}
 58
 59// GenerateUploadURL generates presigned URL for uploads
 60func (p *PresignedURLGenerator) GenerateUploadURL(ctx context.Context, cfg PresignConfig) (*PresignedURLResponse, error) {
 61	// Validate configuration
 62	if err := p.validateConfig(cfg); err != nil {
 63		return nil, fmt.Errorf("invalid config: %w", err)
 64	}
 65
 66	presignClient := s3.NewPresignClient(p.s3Client)
 67
 68	// Build PutObject input with constraints
 69	input := &s3.PutObjectInput{
 70		Bucket: aws.String(p.bucket),
 71		Key:    aws.String(cfg.Key),
 72	}
 73
 74	// Set content-type constraint
 75	if cfg.ContentType != "" {
 76		input.ContentType = aws.String(cfg.ContentType)
 77	}
 78
 79	// Add metadata for validation
 80	metadata := make(map[string]string)
 81	if cfg.MaxSizeBytes > 0 {
 82		metadata["max-size"] = fmt.Sprintf("%d", cfg.MaxSizeBytes)
 83	}
 84	metadata["upload-token"] = p.generateUploadToken(cfg.Key)
 85	input.Metadata = metadata
 86
 87	// Generate presigned URL
 88	request, err := presignClient.PresignPutObject(ctx, input,
 89		s3.WithPresignExpires(cfg.Expiry))
 90	if err != nil {
 91		return nil, fmt.Errorf("failed to create presigned URL: %w", err)
 92	}
 93
 94	// Build response headers
 95	headers := make(map[string]string)
 96	if cfg.ContentType != "" {
 97		headers["Content-Type"] = cfg.ContentType
 98	}
 99	for k, v := range cfg.CustomHeaders {
100		headers[k] = v
101	}
102
103	// Add security headers
104	headers["x-amz-server-side-encryption"] = "AES256"
105
106	return &PresignedURLResponse{
107		URL:       request.URL,
108		Key:       cfg.Key,
109		ExpiresAt: time.Now().Add(cfg.Expiry),
110		Headers:   headers,
111		Method:    "PUT",
112		MaxSize:   cfg.MaxSizeBytes,
113		Checksum:  p.generateChecksum(cfg.Key, cfg.Expiry),
114	}, nil
115}
116
117// validateConfig validates presign configuration
118func (p *PresignedURLGenerator) validateConfig(cfg PresignConfig) error {
119	if cfg.Key == "" {
120		return fmt.Errorf("key is required")
121	}
122
123	if cfg.Expiry < time.Minute || cfg.Expiry > 7*24*time.Hour {
124		return fmt.Errorf("expiry must be between 1 minute and 7 days")
125	}
126
127	if cfg.MaxSizeBytes > 5*1024*1024*1024 { // 5GB S3 limit
128		return fmt.Errorf("max size exceeds S3 limit of 5GB")
129	}
130
131	// Validate content type if specified
132	if cfg.ContentType != "" && !isValidContentType(cfg.ContentType) {
133		return fmt.Errorf("invalid content type: %s", cfg.ContentType)
134	}
135
136	return nil
137}
138
139func isValidContentType(ct string) bool {
140	validTypes := []string{
141		"image/jpeg", "image/png", "image/gif", "image/webp",
142		"video/mp4", "video/webm",
143		"application/pdf", "application/json",
144		"text/plain", "text/csv",
145	}
146	for _, valid := range validTypes {
147		if ct == valid {
148			return true
149		}
150	}
151	return false
152}
153
154// generateUploadToken generates a verification token
155func (p *PresignedURLGenerator) generateUploadToken(key string) string {
156	h := sha256.New()
157	h.Write([]byte(key + time.Now().Format(time.RFC3339)))
158	return base64.URLEncoding.EncodeToString(h.Sum(nil))[:16]
159}
160
161// generateChecksum generates URL integrity checksum
162func (p *PresignedURLGenerator) generateChecksum(key string, expiry time.Duration) string {
163	data := fmt.Sprintf("%s:%s", key, expiry)
164	h := hmac.New(sha256.New, []byte("secret-key"))
165	h.Write([]byte(data))
166	return base64.StdEncoding.EncodeToString(h.Sum(nil))[:12]
167}
168
169// Example usage demonstrating direct browser uploads
170func main() {
171	ctx := context.Background()
172
173	generator, err := NewPresignedURLGenerator(ctx, "my-upload-bucket")
174	if err != nil {
175		fmt.Printf("Error creating generator: %v\n", err)
176		return
177	}
178
179	// Example 1: Generate upload URL for image
180	uploadConfig := PresignConfig{
181		Key:           "uploads/profile-photo.jpg",
182		Expiry:        15 * time.Minute,
183		ContentType:   "image/jpeg",
184		MaxSizeBytes:  5 * 1024 * 1024, // 5MB
185		AllowedOrigins: []string{"https://myapp.com"},
186		Operation:     "upload",
187	}
188
189	uploadURL, err := generator.GenerateUploadURL(ctx, uploadConfig)
190	if err != nil {
191		fmt.Printf("Error generating upload URL: %v\n", err)
192		return
193	}
194
195	fmt.Printf("Upload URL Generated:\n")
196	fmt.Printf("  URL: %s\n", uploadURL.URL[:80]+"...")
197	fmt.Printf("  Method: %s\n", uploadURL.Method)
198	fmt.Printf("  Expires: %v\n", uploadURL.ExpiresAt)
199	fmt.Printf("  Max Size: %d bytes\n", uploadURL.MaxSize)
200	fmt.Printf("  Required Headers: %v\n", uploadURL.Headers)
201
202	// Example 2: Browser upload example
203	fmt.Printf("\n--- Browser Upload Example ---\n")
204	fmt.Printf(`
205// JavaScript code for direct browser upload
206async function uploadFile(file) {
207    // 1. Request presigned URL from your backend
208    const response = await fetch('/api/presigned-url', {
209        method: 'POST',
210        headers: { 'Content-Type': 'application/json' },
211        body: JSON.stringify({
212            filename: file.name,
213            contentType: file.type,
214            size: file.size
215        })
216    });
217
218    const { url, headers } = await response.json();
219
220    // 2. Upload directly to S3 using presigned URL
221    const uploadResponse = await fetch(url, {
222        method: 'PUT',
223        headers: headers,
224        body: file
225    });
226
227    if (uploadResponse.ok) {
228        console.log('Upload successful!');
229    }
230}
231`)
232}

Explanation:

This presigned URL generator provides production-ready features:

  1. Multi-Operation Support: PresignConfig distinguishes upload and download operations; the listing shows the upload path, and a download variant follows the same pattern with PresignGetObject
  2. Configuration Validation: validateConfig enforces key presence, expiration bounds, size limits, and an allow-list of content types before any URL is issued
  3. Custom Headers: Supports custom headers for CORS, content-type enforcement, and server-side encryption
  4. Upload Tokens: Generates verification tokens (stored as object metadata) that the backend can check after upload
  5. Checksum Generation: Creates an HMAC checksum of the key and expiry so clients can detect tampering with the URL response

Production Considerations:

  • Expiration Limits: URLs expire between 1 minute and 7 days to balance usability and security
  • Size Constraints: Enforces S3's 5GB single-object limit
  • Content-Type Enforcement: Validates allowed file types to prevent malicious uploads
  • HTTPS Only: Ensures all presigned URLs use secure transport
  • Token-Based Auth: Upload tokens provide additional security layer
  • Browser-Friendly: Designed for direct browser uploads, reducing server load
  • CORS Support: Includes proper CORS headers for cross-origin uploads

The browser example shows how clients can upload directly to S3 without proxying through your backend, significantly reducing bandwidth costs and improving upload performance.
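
For completeness, the /api/presigned-url endpoint that the JavaScript calls can be a thin handler in front of the generator above. A hedged sketch whose route, JSON field names, and generator variable are chosen to match the browser example; it assumes encoding/json and net/http are added to the listing's imports (time is already there):

    // presignHandler issues short-lived upload URLs to the browser.
    func presignHandler(generator *PresignedURLGenerator) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            var req struct {
                Filename    string `json:"filename"`
                ContentType string `json:"contentType"`
                Size        int64  `json:"size"`
            }
            if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
                http.Error(w, "invalid request", http.StatusBadRequest)
                return
            }

            resp, err := generator.GenerateUploadURL(r.Context(), PresignConfig{
                Key:          "uploads/" + req.Filename, // illustrative key scheme
                Expiry:       15 * time.Minute,
                ContentType:  req.ContentType,
                MaxSizeBytes: req.Size,
                Operation:    "upload",
            })
            if err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }

            // The browser snippet destructures `url` and `headers` from this JSON body.
            w.Header().Set("Content-Type", "application/json")
            json.NewEncoder(w).Encode(map[string]interface{}{
                "url":     resp.URL,
                "headers": resp.Headers,
            })
        }
    }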

Exercise 4: Cost-Optimized Storage with Intelligent Lifecycle Management

Learning Objective: Build a comprehensive storage cost optimization system that automatically transitions objects between storage classes based on access patterns and implements intelligent lifecycle policies.

Real-World Context: Companies like Airbnb and Pinterest manage petabytes of user-generated content, where storage costs can run into millions of dollars annually. Intelligent lifecycle management can reduce these costs by 70-80% while maintaining required access performance.

Difficulty: Advanced | Time Estimate: 80-95 minutes

Objective: Create a storage lifecycle manager that monitors access patterns, automatically transitions objects between storage tiers, and implements cost-aware deletion policies.

Show Solution
  1package exercise
  2
  3import (
  4    "context"
  5    "fmt"
  6    "sync"
  7    "time"
  8)
  9// LifecycleManager handles intelligent storage lifecycle transitions
 10type LifecycleManager struct {
 11    storage        CloudStorage
 12    analytics      *AccessAnalytics
 13    costCalculator *CostCalculator
 14    policies       []LifecyclePolicy
 15}
 16
 17type CloudStorage interface {
 18    GetObjectMetadata(ctx context.Context, bucket, key string) (*ObjectMetadata, error)
 19    TransitionStorageClass(ctx context.Context, bucket, key string, class StorageClass) error
 20    DeleteObject(ctx context.Context, bucket, key string) error
 21    ListObjects(ctx context.Context, bucket, prefix string) ([]ObjectInfo, error)
 22}
 23
 24type StorageClass string
 25
 26const (
 27    StorageClassStandard      StorageClass = "STANDARD"
 28    StorageClassIA            StorageClass = "STANDARD_IA"
 29    StorageClassGlacier       StorageClass = "GLACIER"
 30    StorageClassDeepArchive   StorageClass = "DEEP_ARCHIVE"
 31)
 32
 33type LifecyclePolicy struct {
 34    Name                string
 35    Prefix              string
 36    MinAge              time.Duration
 37    TargetClass         StorageClass
 38    DeleteAfter         time.Duration
 39    AccessThreshold     int // Min accesses to stay in current class
 40    CostThreshold       float64
 41}
 42
 43type ObjectMetadata struct {
 44    Key           string
 45    Size          int64
 46    StorageClass  StorageClass
 47    LastModified  time.Time
 48    LastAccessed  time.Time
 49    AccessCount   int
 50}
 51
 52type ObjectInfo struct {
 53    Key          string
 54    Size         int64
 55    LastModified time.Time
 56    StorageClass StorageClass
 57}
 58
 59func NewLifecycleManager(storage CloudStorage) *LifecycleManager {
 60    return &LifecycleManager{
 61        storage:        storage,
 62        analytics:      NewAccessAnalytics(),
 63        costCalculator: NewCostCalculator(),
 64        policies:       make([]LifecyclePolicy, 0),
 65    }
 66}
 67
 68// AddPolicy adds a lifecycle policy
 69func (lm *LifecycleManager) AddPolicy(policy LifecyclePolicy) {
 70    lm.policies = append(lm.policies, policy)
 71}
 72
 73// ApplyPolicies runs lifecycle policies on a bucket
 74func (lm *LifecycleManager) ApplyPolicies(ctx context.Context, bucket string) (*LifecycleReport, error) {
 75    report := &LifecycleReport{
 76        StartTime: time.Now(),
 77        Actions:   make([]LifecycleAction, 0),
 78    }
 79
 80    for _, policy := range lm.policies {
 81        objects, err := lm.storage.ListObjects(ctx, bucket, policy.Prefix)
 82        if err != nil {
 83            return nil, err
 84        }
 85
 86        for _, obj := range objects {
 87            action, err := lm.evaluateObject(ctx, bucket, obj, policy)
 88            if err != nil {
 89                continue
 90            }
 91
 92            if action != nil {
 93                report.Actions = append(report.Actions, *action)
 94            }
 95        }
 96    }
 97
 98    report.EndTime = time.Now()
 99    report.Duration = report.EndTime.Sub(report.StartTime)
100    lm.calculateSavings(report)
101
102    return report, nil
103}
104
105// evaluateObject evaluates a single object against a policy
106func (lm *LifecycleManager) evaluateObject(ctx context.Context, bucket string, obj ObjectInfo, policy LifecyclePolicy) (*LifecycleAction, error) {
107    // Get detailed metadata
108    metadata, err := lm.storage.GetObjectMetadata(ctx, bucket, obj.Key)
109    if err != nil {
110        return nil, err
111    }
112
113    // Get access analytics
114    accessPattern := lm.analytics.GetAccessPattern(obj.Key)
115
116    now := time.Now()
117    age := now.Sub(metadata.LastModified)
118
119    // Check if object should be deleted
120    if policy.DeleteAfter > 0 && age > policy.DeleteAfter {
121        if accessPattern.RecentAccesses < policy.AccessThreshold {
122            err := lm.storage.DeleteObject(ctx, bucket, obj.Key)
123            if err != nil {
124                return nil, err
125            }
126
127            return &LifecycleAction{
128                ObjectKey:      obj.Key,
129                Action:         ActionDelete,
130                OldClass:       metadata.StorageClass,
131                Size:           metadata.Size,
132                EstimatedSavings: lm.costCalculator.CalculateDeleteSavings(metadata),
133            }, nil
134        }
135    }
136
137    // Check if object should transition
138    if age > policy.MinAge && metadata.StorageClass != policy.TargetClass {
139        // Calculate cost benefit
140        currentCost := lm.costCalculator.CalculateMonthlyCost(metadata.Size, metadata.StorageClass, accessPattern)
141        newCost := lm.costCalculator.CalculateMonthlyCost(metadata.Size, policy.TargetClass, accessPattern)
142
143        if newCost < currentCost {
144            err := lm.storage.TransitionStorageClass(ctx, bucket, obj.Key, policy.TargetClass)
145            if err != nil {
146                return nil, err
147            }
148
149            return &LifecycleAction{
150                ObjectKey:        obj.Key,
151                Action:           ActionTransition,
152                OldClass:         metadata.StorageClass,
153                NewClass:         policy.TargetClass,
154                Size:             metadata.Size,
155                EstimatedSavings: currentCost - newCost,
156            }, nil
157        }
158    }
159
160    return nil, nil
161}
162
163type LifecycleAction struct {
164    ObjectKey        string
165    Action           ActionType
166    OldClass         StorageClass
167    NewClass         StorageClass
168    Size             int64
169    EstimatedSavings float64
170}
171
172type ActionType string
173
174const (
175    ActionTransition ActionType = "transition"
176    ActionDelete     ActionType = "delete"
177)
178
179type LifecycleReport struct {
180    StartTime         time.Time
181    EndTime           time.Time
182    Duration          time.Duration
183    Actions           []LifecycleAction
184    TotalSavings      float64
185    ObjectsTransitioned int
186    ObjectsDeleted    int
187    BytesFreed        int64
188}
189
190func (lm *LifecycleManager) calculateSavings(report *LifecycleReport) {
191    totalSavings := 0.0
192    objectsTransitioned := 0
193    objectsDeleted := 0
194    bytesFreed := int64(0)
195
196    for _, action := range report.Actions {
197        totalSavings += action.EstimatedSavings
198
199        switch action.Action {
200        case ActionTransition:
201            objectsTransitioned++
202        case ActionDelete:
203            objectsDeleted++
204            bytesFreed += action.Size
205        }
206    }
207
208    report.TotalSavings = totalSavings
209    report.ObjectsTransitioned = objectsTransitioned
210    report.ObjectsDeleted = objectsDeleted
211    report.BytesFreed = bytesFreed
212}
213
214// AccessAnalytics tracks access patterns
215type AccessAnalytics struct {
216    patterns map[string]*AccessPattern
217    mu       sync.RWMutex
218}
219
220type AccessPattern struct {
221    TotalAccesses   int
222    RecentAccesses  int // Last 30 days
223    LastAccess      time.Time
224    AvgAccessRate   float64 // Accesses per day
225}
226
227func NewAccessAnalytics() *AccessAnalytics {
228    return &AccessAnalytics{
229        patterns: make(map[string]*AccessPattern),
230    }
231}
232
233func (aa *AccessAnalytics) GetAccessPattern(key string) *AccessPattern {
234    aa.mu.RLock()
235    defer aa.mu.RUnlock()
236
237    pattern, exists := aa.patterns[key]
238    if !exists {
239        return &AccessPattern{
240            TotalAccesses:  0,
241            RecentAccesses: 0,
242            LastAccess:     time.Time{},
243            AvgAccessRate:  0,
244        }
245    }
246
247    return pattern
248}
249
250func (aa *AccessAnalytics) RecordAccess(key string) {
251    aa.mu.Lock()
252    defer aa.mu.Unlock()
253
254    pattern, exists := aa.patterns[key]
255    if !exists {
256        pattern = &AccessPattern{}
257        aa.patterns[key] = pattern
258    }
259
260    pattern.TotalAccesses++
261    pattern.RecentAccesses++
262    pattern.LastAccess = time.Now()
263
264    // Update average access rate
265    // Simple calculation - in production, use more sophisticated analytics
266    pattern.AvgAccessRate = float64(pattern.TotalAccesses) / 30.0
267}
268
269// CostCalculator calculates storage costs
270type CostCalculator struct {
271    // Cost per GB per month
272    storageCosts map[StorageClass]float64
273
274    // Cost per 1000 requests
275    requestCosts map[StorageClass]float64
276}
277
278func NewCostCalculator() *CostCalculator {
279    return &CostCalculator{
280        storageCosts: map[StorageClass]float64{
281            StorageClassStandard:    0.023,
282            StorageClassIA:          0.0125,
283            StorageClassGlacier:     0.004,
284            StorageClassDeepArchive: 0.00099,
285        },
286        requestCosts: map[StorageClass]float64{
287            StorageClassStandard:    0.0004,
288            StorageClassIA:          0.001,
289            StorageClassGlacier:     0.05,
290            StorageClassDeepArchive: 0.10,
291        },
292    }
293}
294
295func (cc *CostCalculator) CalculateMonthlyCost(sizeBytes int64, class StorageClass, pattern *AccessPattern) float64 {
296    sizeGB := float64(sizeBytes) / (1024 * 1024 * 1024)
297
298    // Storage cost
299    storageCost := sizeGB * cc.storageCosts[class]
300
301    // Request cost (estimated based on access pattern)
302    requestCost := (pattern.AvgAccessRate * 30 / 1000) * cc.requestCosts[class]
303
304    return storageCost + requestCost
305}
306
307func (cc *CostCalculator) CalculateDeleteSavings(metadata *ObjectMetadata) float64 {
308    sizeGB := float64(metadata.Size) / (1024 * 1024 * 1024)
309    return sizeGB * cc.storageCosts[metadata.StorageClass]
310}
311
312// Example usage
313func main() {
314    storage := &MockCloudStorage{}
315    manager := NewLifecycleManager(storage)
316
317    // Add policies
318    manager.AddPolicy(LifecyclePolicy{
319        Name:            "transition-to-ia",
320        Prefix:          "images/",
321        MinAge:          30 * 24 * time.Hour, // 30 days
322        TargetClass:     StorageClassIA,
323        AccessThreshold: 10,
324    })
325
326    manager.AddPolicy(LifecyclePolicy{
327        Name:            "archive-old-logs",
328        Prefix:          "logs/",
329        MinAge:          90 * 24 * time.Hour, // 90 days
330        TargetClass:     StorageClassGlacier,
331        DeleteAfter:     365 * 24 * time.Hour, // 1 year
 332        AccessThreshold: 1, // delete only when there have been no recent accesses
333    })
334
335    // Apply policies
336    ctx := context.Background()
337    report, err := manager.ApplyPolicies(ctx, "my-bucket")
338    if err != nil {
339        fmt.Printf("Error: %v\n", err)
340        return
341    }
342
343    // Print report
344    fmt.Printf("Lifecycle Management Report\n")
345    fmt.Printf("Duration: %v\n", report.Duration)
346    fmt.Printf("Objects Transitioned: %d\n", report.ObjectsTransitioned)
347    fmt.Printf("Objects Deleted: %d\n", report.ObjectsDeleted)
348    fmt.Printf("Bytes Freed: %d GB\n", report.BytesFreed/(1024*1024*1024))
349    fmt.Printf("Estimated Monthly Savings: $%.2f\n", report.TotalSavings)
350}
351
352type MockCloudStorage struct{}
353
354func (m *MockCloudStorage) GetObjectMetadata(ctx context.Context, bucket, key string) (*ObjectMetadata, error) {
355    return &ObjectMetadata{
356        Key:          key,
357        Size:         1024 * 1024 * 100, // 100MB
358        StorageClass: StorageClassStandard,
359        LastModified: time.Now().Add(-60 * 24 * time.Hour),
360    }, nil
361}
362
363func (m *MockCloudStorage) TransitionStorageClass(ctx context.Context, bucket, key string, class StorageClass) error {
364    return nil
365}
366
367func (m *MockCloudStorage) DeleteObject(ctx context.Context, bucket, key string) error {
368    return nil
369}
370
371func (m *MockCloudStorage) ListObjects(ctx context.Context, bucket, prefix string) ([]ObjectInfo, error) {
372    return []ObjectInfo{
373        {Key: "example.jpg", Size: 1024 * 1024 * 100, LastModified: time.Now().Add(-60 * 24 * time.Hour)},
374    }, nil
375}

Key Learning Points:

  1. Access Pattern Analysis: Tracks object access patterns to inform lifecycle decisions
  2. Cost Calculation: Computes actual costs for different storage classes including request costs
  3. Intelligent Transitions: Makes data-driven decisions about storage class transitions
  4. Policy-Based Management: Flexible policy engine for different data types and use cases
  5. Reporting: Comprehensive reporting on actions taken and cost savings achieved
  6. Production Patterns: A pluggable storage interface, per-object error isolation during policy runs, and a savings report that can feed cost monitoring
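
The manager above gives you application-level control over transitions, but for purely age-based rules most teams also (or instead) configure the provider's native lifecycle rules, which run server-side at no extra compute cost. Here is a sketch of the archive-old-logs policy expressed as an S3 bucket lifecycle configuration, assuming the classic aws-sdk-go (v1) client; the bucket name and day counts are illustrative.

    package main

    import (
        "log"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3"
    )

    func main() {
        sess := session.Must(session.NewSession())
        svc := s3.New(sess)

        // Transition logs/ objects to Glacier after 90 days and delete them after a year.
        _, err := svc.PutBucketLifecycleConfiguration(&s3.PutBucketLifecycleConfigurationInput{
            Bucket: aws.String("my-bucket"),
            LifecycleConfiguration: &s3.BucketLifecycleConfiguration{
                Rules: []*s3.LifecycleRule{
                    {
                        ID:     aws.String("archive-old-logs"),
                        Status: aws.String("Enabled"),
                        Filter: &s3.LifecycleRuleFilter{Prefix: aws.String("logs/")},
                        Transitions: []*s3.Transition{
                            {Days: aws.Int64(90), StorageClass: aws.String("GLACIER")},
                        },
                        Expiration: &s3.LifecycleExpiration{Days: aws.Int64(365)},
                    },
                },
            },
        })
        if err != nil {
            log.Fatalf("apply lifecycle configuration: %v", err)
        }
        log.Println("lifecycle configuration applied")
    }

Native rules cannot consult your own access analytics, which is where a custom manager like the one above still earns its keep.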

Exercise 5: Resilient Multi-Region Storage with Automatic Failover

Learning Objective: Design and implement a production-grade multi-region storage system with automatic failover, data replication, and geo-distributed reads.

Real-World Context: Global applications like GitHub and Zoom require storage systems that can survive regional outages, provide low-latency access worldwide, and maintain data consistency across regions.

Difficulty: Expert | Time Estimate: 95-120 minutes

Objective: Build a complete multi-region storage system with replication strategies, health monitoring, automatic failover, and conflict resolution.

Show Solution
  1package exercise
  2
  3import (
  4    "context"
  5    "fmt"
  6    "sync"
  7    "time"
  8)
  9
 10// MultiRegionStorage manages storage across multiple geographic regions
 11type MultiRegionStorage struct {
 12    regions        map[string]*RegionConfig
 13    primary        string
 14    replication    ReplicationStrategy
 15    healthMonitor  *HealthMonitor
 16    failoverMgr    *FailoverManager
 17    mu             sync.RWMutex
 18}
 19
 20type RegionConfig struct {
 21    Name      string
 22    Storage   CloudStorage
 23    Endpoint  string
 24    IsHealthy bool
 25    IsPrimary bool
 26    Latency   time.Duration
 27    ErrorRate float64
 28}
 29
 30type ReplicationStrategy string
 31
 32const (
 33    ReplicationSync      ReplicationStrategy = "sync"
 34    ReplicationAsync     ReplicationStrategy = "async"
 35    ReplicationPrimaryOnly ReplicationStrategy = "primary_only"
 36)
 37
 38func NewMultiRegionStorage(primaryRegion string, replication ReplicationStrategy) *MultiRegionStorage {
 39    mrs := &MultiRegionStorage{
 40        regions:     make(map[string]*RegionConfig),
 41        primary:     primaryRegion,
 42        replication: replication,
 43    }
 44
 45    mrs.healthMonitor = NewHealthMonitor(mrs)
 46    mrs.failoverMgr = NewFailoverManager(mrs)
 47
 48    // Start background health monitoring
 49    go mrs.healthMonitor.Start(context.Background())
 50
 51    return mrs
 52}
 53
 54// AddRegion registers a new storage region
 55func (mrs *MultiRegionStorage) AddRegion(region *RegionConfig) {
 56    mrs.mu.Lock()
 57    defer mrs.mu.Unlock()
 58
 59    region.IsPrimary = (region.Name == mrs.primary)
 60    region.IsHealthy = true
 61    mrs.regions[region.Name] = region
 62}
 63
 64// Upload with multi-region replication
 65func (mrs *MultiRegionStorage) Upload(ctx context.Context, key string, data []byte) error {
 66    mrs.mu.RLock()
 67    primaryRegion := mrs.regions[mrs.primary]
 68    mrs.mu.RUnlock()
 69
 70    if primaryRegion == nil || !primaryRegion.IsHealthy {
 71        // Primary unavailable, failover
 72        return mrs.uploadWithFailover(ctx, key, data)
 73    }
 74
 75    // Upload to primary
 76    start := time.Now()
 77    err := primaryRegion.Storage.Put(ctx, key, data)
 78    primaryRegion.Latency = time.Since(start)
 79
 80    if err != nil {
 81        primaryRegion.ErrorRate += 0.1
 82        return mrs.uploadWithFailover(ctx, key, data)
 83    }
 84
 85    // Handle replication based on strategy
 86    switch mrs.replication {
 87    case ReplicationSync:
 88        return mrs.replicateSync(ctx, key, data, primaryRegion.Name)
 89    case ReplicationAsync:
 90        go mrs.replicateAsync(key, data, primaryRegion.Name)
 91        return nil
 92    case ReplicationPrimaryOnly:
 93        return nil
 94    }
 95
 96    return nil
 97}
 98
 99// uploadWithFailover attempts upload to healthy regions
100func (mrs *MultiRegionStorage) uploadWithFailover(ctx context.Context, key string, data []byte) error {
101    mrs.mu.RLock()
102    defer mrs.mu.RUnlock()
103
104    // Try each healthy region
105    for _, region := range mrs.regions {
106        if !region.IsHealthy || region.Name == mrs.primary {
107            continue
108        }
109
110        err := region.Storage.Put(ctx, key, data)
111        if err == nil {
112            // Successful upload to secondary
113            fmt.Printf("Failover upload to %s successful\n", region.Name)
114
115            // Async replicate to other regions
116            go mrs.replicateAsync(key, data, region.Name)
117
118            return nil
119        }
120    }
121
122    return fmt.Errorf("all regions unavailable")
123}
124
125// replicateSync synchronously replicates to all regions
126func (mrs *MultiRegionStorage) replicateSync(ctx context.Context, key string, data []byte, excludeRegion string) error {
127    mrs.mu.RLock()
128    defer mrs.mu.RUnlock()
129
130    var wg sync.WaitGroup
131    errChan := make(chan error, len(mrs.regions))
132
133    for name, region := range mrs.regions {
134        if name == excludeRegion || !region.IsHealthy {
135            continue
136        }
137
138        wg.Add(1)
139        go func(r *RegionConfig) {
140            defer wg.Done()
141
142            err := r.Storage.Put(ctx, key, data)
143            if err != nil {
144                errChan <- fmt.Errorf("%s: %w", r.Name, err)
145            }
146        }(region)
147    }
148
149    wg.Wait()
150    close(errChan)
151
152    // Collect errors
153    var errs []error
154    for err := range errChan {
155        errs = append(errs, err)
156    }
157
158    if len(errs) > 0 {
159        return fmt.Errorf("replication errors: %v", errs)
160    }
161
162    return nil
163}
164
165// replicateAsync asynchronously replicates to all regions
166func (mrs *MultiRegionStorage) replicateAsync(key string, data []byte, excludeRegion string) {
167    ctx := context.Background()
168
169    mrs.mu.RLock()
170    defer mrs.mu.RUnlock()
171
172    for name, region := range mrs.regions {
173        if name == excludeRegion || !region.IsHealthy {
174            continue
175        }
176
177        go func(r *RegionConfig) {
178            if err := r.Storage.Put(ctx, key, data); err != nil {
179                fmt.Printf("Async replication to %s failed: %v\n", r.Name, err)
180            }
181        }(region)
182    }
183}
184
185// Download with intelligent region selection
186func (mrs *MultiRegionStorage) Download(ctx context.Context, key string) ([]byte, error) {
187    // Select best region based on health and latency
188    region := mrs.selectBestRegion()
189
190    if region == nil {
191        return nil, fmt.Errorf("no healthy regions available")
192    }
193
194    start := time.Now()
195    data, err := region.Storage.Get(ctx, key)
196    region.Latency = time.Since(start)
197
198    if err != nil {
199        region.ErrorRate += 0.1
200
201        // Try other regions
202        return mrs.downloadWithFailover(ctx, key, region.Name)
203    }
204
205    region.ErrorRate *= 0.9 // Decay error rate
206    return data, nil
207}
208
209// downloadWithFailover tries other regions on failure
210func (mrs *MultiRegionStorage) downloadWithFailover(ctx context.Context, key string, excludeRegion string) ([]byte, error) {
211    mrs.mu.RLock()
212    defer mrs.mu.RUnlock()
213
214    for name, region := range mrs.regions {
215        if name == excludeRegion || !region.IsHealthy {
216            continue
217        }
218
219        data, err := region.Storage.Get(ctx, key)
220        if err == nil {
221            return data, nil
222        }
223    }
224
225    return nil, fmt.Errorf("object not found in any region")
226}
227
228// selectBestRegion chooses the optimal region for reads
229func (mrs *MultiRegionStorage) selectBestRegion() *RegionConfig {
230    mrs.mu.RLock()
231    defer mrs.mu.RUnlock()
232
233    var best *RegionConfig
234    bestScore := -1.0
235
236    for _, region := range mrs.regions {
237        if !region.IsHealthy {
238            continue
239        }
240
241        // Calculate score based on latency and error rate
242        latencyScore := 1.0 - (float64(region.Latency) / float64(time.Second))
243        errorScore := 1.0 - region.ErrorRate
244
245        score := (latencyScore * 0.6) + (errorScore * 0.4)
246
247        // Prefer primary region slightly
248        if region.IsPrimary {
249            score *= 1.1
250        }
251
252        if best == nil || score > bestScore {
253            best = region
254            bestScore = score
255        }
256    }
257
258    return best
259}
260
261// HealthMonitor monitors region health
262type HealthMonitor struct {
263    storage  *MultiRegionStorage
264    interval time.Duration
265}
266
267func NewHealthMonitor(storage *MultiRegionStorage) *HealthMonitor {
268    return &HealthMonitor{
269        storage:  storage,
270        interval: 30 * time.Second,
271    }
272}
273
274func (hm *HealthMonitor) Start(ctx context.Context) {
275    ticker := time.NewTicker(hm.interval)
276    defer ticker.Stop()
277
278    for {
279        select {
280        case <-ctx.Done():
281            return
282        case <-ticker.C:
283            hm.checkHealth()
284        }
285    }
286}
287
288func (hm *HealthMonitor) checkHealth() {
289    hm.storage.mu.RLock()
290    regions := make([]*RegionConfig, 0, len(hm.storage.regions))
291    for _, region := range hm.storage.regions {
292        regions = append(regions, region)
293    }
294    hm.storage.mu.RUnlock()
295
296    for _, region := range regions {
297        go func(r *RegionConfig) {
298            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
299            defer cancel()
300
301            // Perform health check (e.g., HEAD request to known object)
302            start := time.Now()
303            _, err := r.Storage.Get(ctx, "_health_check")
304            latency := time.Since(start)
305
306            hm.storage.mu.Lock()
307            defer hm.storage.mu.Unlock()
308
309            if err != nil {
310                r.IsHealthy = false
311                r.ErrorRate += 0.2
312            } else {
313                r.IsHealthy = true
314                r.Latency = latency
315                r.ErrorRate *= 0.8
316            }
317        }(region)
318    }
319}
320
321// FailoverManager handles automatic failovers
322type FailoverManager struct {
323    storage         *MultiRegionStorage
324    failoverHistory []FailoverEvent
325    mu              sync.Mutex
326}
327
328type FailoverEvent struct {
329    Timestamp   time.Time
330    FromRegion  string
331    ToRegion    string
332    Reason      string
333}
334
335func NewFailoverManager(storage *MultiRegionStorage) *FailoverManager {
336    return &FailoverManager{
337        storage:         storage,
338        failoverHistory: make([]FailoverEvent, 0),
339    }
340}
341
342func (fm *FailoverManager) TriggerFailover(fromRegion, toRegion, reason string) error {
343    fm.mu.Lock()
344    defer fm.mu.Unlock()
345
346    fm.storage.mu.Lock()
347    defer fm.storage.mu.Unlock()
348
349    targetRegion := fm.storage.regions[toRegion]
350    if targetRegion == nil || !targetRegion.IsHealthy {
351        return fmt.Errorf("target region %s not available", toRegion)
352    }
353
354    // Update primary
355    oldPrimary := fm.storage.regions[fm.storage.primary]
356    if oldPrimary != nil {
357        oldPrimary.IsPrimary = false
358    }
359
360    fm.storage.primary = toRegion
361    targetRegion.IsPrimary = true
362
363    // Record failover
364    event := FailoverEvent{
365        Timestamp:  time.Now(),
366        FromRegion: fromRegion,
367        ToRegion:   toRegion,
368        Reason:     reason,
369    }
370    fm.failoverHistory = append(fm.failoverHistory, event)
371
372    fmt.Printf("Failover: %s -> %s (reason: %s)\n", fromRegion, toRegion, reason)
373
374    return nil
375}
376
377func (fm *FailoverManager) GetFailoverHistory() []FailoverEvent {
378    fm.mu.Lock()
379    defer fm.mu.Unlock()
380    return fm.failoverHistory
381}
382
383// Example usage
384func main() {
385    storage := NewMultiRegionStorage("us-east-1", ReplicationAsync)
386
387    // Add regions
388    storage.AddRegion(&RegionConfig{
389        Name:     "us-east-1",
390        Storage:  &MockStorage{},
391        Endpoint: "https://us-east-1.storage.example.com",
392    })
393
394    storage.AddRegion(&RegionConfig{
395        Name:     "us-west-2",
396        Storage:  &MockStorage{},
397        Endpoint: "https://us-west-2.storage.example.com",
398    })
399
400    storage.AddRegion(&RegionConfig{
401        Name:     "eu-west-1",
402        Storage:  &MockStorage{},
403        Endpoint: "https://eu-west-1.storage.example.com",
404    })
405
406    ctx := context.Background()
407
408    // Upload
409    err := storage.Upload(ctx, "example.jpg", []byte("data"))
410    if err != nil {
411        fmt.Printf("Upload failed: %v\n", err)
412    } else {
413        fmt.Println("Upload successful with multi-region replication")
414    }
415
416    // Download from best region
417    data, err := storage.Download(ctx, "example.jpg")
418    if err != nil {
419        fmt.Printf("Download failed: %v\n", err)
420    } else {
421        fmt.Printf("Downloaded %d bytes from optimal region\n", len(data))
422    }
423}
424
425type MockStorage struct{}
426
427func (m *MockStorage) Get(ctx context.Context, key string) ([]byte, error) {
428    return []byte("mock data"), nil
429}
430
431func (m *MockStorage) Put(ctx context.Context, key string, data []byte) error {
432    return nil
433}
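
Note that this exercise assumes a per-region CloudStorage interface exposing simple Get and Put operations (which MockStorage satisfies); it is intentionally narrower than the metadata-oriented interface from the lifecycle exercise. A minimal sketch of the assumed contract:

    package exercise

    import "context"

    // CloudStorage is the minimal per-region contract assumed by MultiRegionStorage.
    type CloudStorage interface {
        Get(ctx context.Context, key string) ([]byte, error)
        Put(ctx context.Context, key string, data []byte) error
    }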

Key Learning Points:

  1. Multi-Region Architecture: Manages storage across geographic regions with automatic replication
  2. Intelligent Region Selection: Chooses optimal region based on latency, error rates, and health
  3. Automatic Failover: Routes uploads and downloads to healthy regions when the primary fails, with a promotion path for a new primary (see the sketch after this list)
  4. Replication Strategies: Supports sync, async, and primary-only replication modes
  5. Health Monitoring: Periodic background checks that mark failing regions unhealthy and steer reads and writes away from them
  6. Production Patterns: Per-region latency and error-rate tracking, failover history for auditing, and graceful degradation when regions fail
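
The example never actually calls FailoverManager.TriggerFailover, so primary promotion is left to you. One hypothetical way to wire it up, using only the types defined above (the selection rule of "healthy non-primary region with the lowest observed latency" is illustrative):

    // promoteIfPrimaryDown could be called from the health monitor after each check.
    func (mrs *MultiRegionStorage) promoteIfPrimaryDown() {
        mrs.mu.RLock()
        primaryName := mrs.primary
        primary := mrs.regions[primaryName]

        var candidate *RegionConfig
        for _, region := range mrs.regions {
            if region.IsHealthy && !region.IsPrimary {
                if candidate == nil || region.Latency < candidate.Latency {
                    candidate = region
                }
            }
        }
        mrs.mu.RUnlock()

        // TriggerFailover takes its own locks, so call it after releasing the read lock.
        if primary != nil && !primary.IsHealthy && candidate != nil {
            if err := mrs.failoverMgr.TriggerFailover(primaryName, candidate.Name, "primary unhealthy"); err != nil {
                fmt.Printf("failover failed: %v\n", err)
            }
        }
    }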

Summary

Key Takeaways

🎯 Core Concepts to Master:

  1. Object Storage Fundamentals: Flat namespace, HTTP-based access, infinite scalability
  2. Storage Classes: Choose Standard, IA, or Glacier based on access patterns and cost requirements
  3. Direct Browser Uploads: Use presigned URLs to eliminate server bandwidth bottlenecks
  4. Multipart Uploads: Handle large files efficiently with parallel uploads and resume capability
  5. Multi-Cloud Strategies: Implement redundancy across providers for maximum reliability
  6. Security Practices: Encryption, access control, and secure URL generation
  7. Cost Optimization: Lifecycle policies, storage class selection, and data transfer management

Production Best Practices

⚠️ Critical Success Factors:

  1. Always use presigned URLs for direct client uploads—reduces server load and improves user experience
  2. Implement automatic lifecycle policies—data accumulates and costs spiral without automated management
  3. Choose storage classes wisely—match access patterns to cost optimization
  4. Enable encryption by default—protect data at rest with server-side or client-side encryption
  5. Monitor costs and usage proactively—surprises are expensive at cloud scale
  6. Implement robust retry logic with exponential backoff—network failures are inevitable (see the sketch after this list)
  7. Use CDN integration for frequently accessed content—dramatically improves user experience
  8. Design for multi-cloud—avoid vendor lock-in and ensure business continuity
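
Transient failures (throttling, connection resets, regional blips) are routine at object-storage scale, so wrap storage calls in a retry helper. A minimal standard-library sketch with exponential backoff and jitter; the attempt count and base delay are illustrative, and it assumes the wrapped operation is idempotent, as object GET and PUT requests are.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "math/rand"
        "time"
    )

    // retryWithBackoff retries op up to attempts times, doubling the delay each time.
    func retryWithBackoff(ctx context.Context, attempts int, op func() error) error {
        backoff := 100 * time.Millisecond
        var err error

        for i := 0; i < attempts; i++ {
            if err = op(); err == nil {
                return nil
            }

            // Add jitter so many clients don't retry in lockstep.
            sleep := backoff + time.Duration(rand.Int63n(int64(backoff)))
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(sleep):
            }
            backoff *= 2
        }
        return fmt.Errorf("operation failed after %d attempts: %w", attempts, err)
    }

    func main() {
        calls := 0
        err := retryWithBackoff(context.Background(), 5, func() error {
            calls++
            if calls < 3 {
                return errors.New("transient network error")
            }
            return nil
        })
        fmt.Println("calls:", calls, "err:", err)
    }

In production you would also classify errors first and retry only the retryable ones (timeouts, 5xx, throttling), not client errors such as access denied.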

When to Use Cloud Object Storage

  • Unstructured data: Images, videos, documents, backups, log files
  • Large-scale storage: Terabytes to petabytes of data with predictable access patterns
  • Global distribution: Content that needs to be accessible worldwide with low latency
  • Archive requirements: Long-term retention with compliance needs and automatic lifecycle management
  • Variable workloads: Applications with unpredictable growth patterns requiring elastic scalability
  • Data lakes: Analytics and machine learning datasets requiring massively parallel processing

When NOT to Use Cloud Object Storage

  • Transactional databases: Need ACID properties, low-latency random access, and complex queries
  • Frequently modified files: Objects are immutable, so each change rewrites the whole object (and adds a version when versioning is enabled), driving up costs
  • Real-time processing: High latency compared to local storage or databases
  • File system operations: No support for directories, renames, appends, or locking
  • Small, frequently accessed files: Better served by databases or caching layers

Next Steps in Your Learning Journey

  1. Study CDN integration: Learn how CloudFront, Cloud CDN, and Azure CDN integrate with object storage
  2. Explore data processing: Understand how to process data stored in object storage using services like AWS Lambda
  3. Master monitoring and cost management: Learn tools and techniques for optimizing cloud storage costs
  4. Implement advanced security: Study IAM policies, VPC endpoints, and access control patterns
  5. Build real applications: Apply these patterns to build actual production systems

Remember: Cloud object storage is not just a file replacement—it's a completely different paradigm designed for massive scale, global distribution, and cost-effective storage of unstructured data. Master these patterns to build the next generation of cloud-native applications.