Why This Matters - The Data Backbone of Modern Applications
Every time you upload a photo to Instagram, stream a movie on Netflix, or save a document to Dropbox, you're using cloud object storage. This technology has revolutionized how we store and access data at scale, making it possible to build applications that serve billions of users without managing physical storage infrastructure.
Real-World Impact: Companies like Spotify and Netflix store petabytes of media files in object storage, serving millions of concurrent users while paying only for the storage they actually use. This scalability and cost-effectiveness has enabled entirely new business models and transformed digital content delivery.
The Engineering Challenge: Traditional file systems can't handle modern application requirements. When you need to store billions of objects, serve them globally with low latency, and ensure 99.999999999% durability, you need a fundamentally different approach. Object storage provides exactly that - a massive, effectively unlimited digital warehouse where every item has a unique address and can be retrieved from anywhere in the world.
Learning Objectives
By the end of this tutorial, you will be able to:
🎯 Core Understanding: Explain how object storage differs from traditional file systems and when to use each
🏗️ Implementation Skills: Build production-ready storage clients for AWS S3, Google Cloud Storage, and Azure Blob Storage
🔧 Advanced Patterns: Implement multipart uploads, presigned URLs, versioning, and lifecycle management
🚀 Production Mastery: Design scalable, secure, and cost-effective storage solutions for real-world applications
Core Concepts - Understanding Object Storage
Object storage represents a paradigm shift from traditional file systems. Think of it as the difference between a local library and Amazon's global fulfillment network.
Key Architectural Differences
Traditional File Systems:
- Hierarchical structure with folders and files
- Limited by physical disk capacity
- Designed for frequent modifications
- Local or network-attached access only
Object Storage:
- Flat namespace with unique object keys
- Virtually infinite scalability
- Optimized for write-once, read-many patterns
- Global access via HTTP/REST APIs
The Object Model:
// Object Storage Structure
type Object struct {
    Key          string            // Unique identifier
    Data         io.Reader         // The actual content
    Metadata     map[string]string // Custom key-value pairs
    ContentType  string            // MIME type for the content
    StorageClass string            // Performance/cost tier
    ETag         string            // Content hash for integrity
    LastModified time.Time         // Modification timestamp
}
Storage Classes: The Performance-Cost Spectrum
Cloud providers offer several storage classes, each optimized for a different access pattern:
Frequent Access (Standard): Low latency, highest storage cost - for active user content
Infrequent Access: Same low latency, lower storage cost plus per-GB retrieval fees - for backups and rarely read data
Archive: Retrievals take minutes to hours, very low storage cost - for compliance and long-term retention data
Cost Impact Example: Storing 1TB (approximate monthly cost):
- Standard: ~$23/month
- Infrequent Access: ~$12/month
- Glacier: ~$4/month
Choosing the right storage class can save 80%+ on storage costs while maintaining required performance.
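To make that comparison concrete, here is a minimal sketch that estimates monthly cost per storage class. The per-GB prices are illustrative values matching the rough figures above, not official pricing, which varies by region and provider.

// run
package main

import "fmt"

// Illustrative per-GB monthly prices in USD (assumptions, not official pricing).
var monthlyPricePerGB = map[string]float64{
    "standard":          0.023,
    "infrequent-access": 0.0125,
    "archive":           0.004,
}

// estimateMonthlyCost returns the approximate monthly storage cost in USD.
func estimateMonthlyCost(class string, sizeGB float64) float64 {
    return monthlyPricePerGB[class] * sizeGB
}

func main() {
    for _, class := range []string{"standard", "infrequent-access", "archive"} {
        fmt.Printf("1 TB in %-17s ≈ $%.2f/month\n", class, estimateMonthlyCost(class, 1024))
    }
}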
Practical Examples - Building Production Storage Systems
Let's start with a simple S3 client and progressively add production features.
Basic S3 Operations
// run
package main

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// S3Client provides a simplified interface for S3 operations
type S3Client struct {
    client     *s3.Client
    uploader   *manager.Uploader
    downloader *manager.Downloader
    bucket     string
}

// NewS3Client creates a new S3 client with production defaults
func NewS3Client(ctx context.Context, bucket string) (*S3Client, error) {
    // Load AWS configuration from environment/credentials
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to load AWS config: %w", err)
    }

    client := s3.NewFromConfig(cfg)

    // Configure uploader with optimized settings for production
    uploader := manager.NewUploader(client, func(u *manager.Uploader) {
        u.PartSize = 16 * 1024 * 1024 // 16MB parts for better performance
        u.Concurrency = 5             // Parallel uploads
    })

    // Configure downloader with optimized settings
    downloader := manager.NewDownloader(client, func(d *manager.Downloader) {
        d.PartSize = 16 * 1024 * 1024
        d.Concurrency = 5
    })

    return &S3Client{
        client:     client,
        uploader:   uploader,
        downloader: downloader,
        bucket:     bucket,
    }, nil
}

// Upload stores data in S3 with automatic multipart handling
func (s *S3Client) Upload(ctx context.Context, key string, data io.Reader) error {
    input := &s3.PutObjectInput{
        Bucket:      aws.String(s.bucket),
        Key:         aws.String(key),
        Body:        data,
        ContentType: aws.String("application/octet-stream"),
    }

    // The uploader switches to multipart uploads for payloads larger than PartSize
    _, err := s.uploader.Upload(ctx, input)
    if err != nil {
        return fmt.Errorf("upload failed for key %s: %w", key, err)
    }

    return nil
}

// Download retrieves data from S3 with automatic multipart handling
func (s *S3Client) Download(ctx context.Context, key string) ([]byte, error) {
    // Use write-at buffer for memory efficiency
    buf := manager.NewWriteAtBuffer([]byte{})

    _, err := s.downloader.Download(ctx, buf, &s3.GetObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        return nil, fmt.Errorf("download failed for key %s: %w", key, err)
    }

    return buf.Bytes(), nil
}

// List returns object keys with pagination support
func (s *S3Client) List(ctx context.Context, prefix string) ([]string, error) {
    var objects []string

    paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
        Bucket: aws.String(s.bucket),
        Prefix: aws.String(prefix),
    })

    for paginator.HasMorePages() {
        page, err := paginator.NextPage(ctx)
        if err != nil {
            return nil, fmt.Errorf("failed to list objects: %w", err)
        }

        for _, obj := range page.Contents {
            objects = append(objects, *obj.Key)
        }
    }

    return objects, nil
}

func main() {
    ctx := context.Background()

    // Initialize S3 client
    client, err := NewS3Client(ctx, "my-production-bucket")
    if err != nil {
        log.Fatal("Failed to create S3 client:", err)
    }

    // Example: Upload a file
    data := bytes.NewReader([]byte("Hello, Cloud Storage!"))
    err = client.Upload(ctx, "examples/hello.txt", data)
    if err != nil {
        log.Fatal("Upload failed:", err)
    }
    fmt.Println("✅ File uploaded successfully")

    // Example: List files
    files, err := client.List(ctx, "examples/")
    if err != nil {
        log.Fatal("List failed:", err)
    }
    fmt.Printf("✅ Found %d files in examples/\n", len(files))

    // Example: Download a file
    content, err := client.Download(ctx, "examples/hello.txt")
    if err != nil {
        log.Fatal("Download failed:", err)
    }
    fmt.Printf("✅ Downloaded content: %s\n", string(content))
}
Enhanced S3 Client with Production Features
// run
package main

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// ProductionS3Client extends basic functionality with enterprise features
type ProductionS3Client struct {
    *S3Client
}

// UploadOptions configures upload behavior
type UploadOptions struct {
    ContentType  string
    CacheControl string
    StorageClass types.StorageClass
    Metadata     map[string]string
    Encrypt      bool
}

// UploadWithOptions provides enhanced upload capabilities
func (p *ProductionS3Client) UploadWithOptions(ctx context.Context, key string, data io.Reader, opts UploadOptions) error {
    input := &s3.PutObjectInput{
        Bucket:      aws.String(p.bucket),
        Key:         aws.String(key),
        Body:        data,
        ContentType: aws.String(opts.ContentType),
    }

    // Set storage class if specified
    if opts.StorageClass != "" {
        input.StorageClass = opts.StorageClass
    }

    // Set cache control if specified
    if opts.CacheControl != "" {
        input.CacheControl = aws.String(opts.CacheControl)
    }

    // Set metadata if provided
    if len(opts.Metadata) > 0 {
        input.Metadata = opts.Metadata
    }

    // Enable server-side encryption if requested
    if opts.Encrypt {
        input.ServerSideEncryption = types.ServerSideEncryptionAes256
    }

    _, err := p.uploader.Upload(ctx, input)
    return err
}

// GeneratePresignedURL creates a temporary URL for direct access
func (p *ProductionS3Client) GeneratePresignedURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
    presignClient := s3.NewPresignClient(p.client)

    request, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(p.bucket),
        Key:    aws.String(key),
    }, s3.WithPresignExpires(expiry))
    if err != nil {
        return "", fmt.Errorf("failed to generate presigned URL: %w", err)
    }

    return request.URL, nil
}

// GeneratePresignedUploadURL creates a URL for direct browser uploads
func (p *ProductionS3Client) GeneratePresignedUploadURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
    presignClient := s3.NewPresignClient(p.client)

    request, err := presignClient.PresignPutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(p.bucket),
        Key:    aws.String(key),
    }, s3.WithPresignExpires(expiry))
    if err != nil {
        return "", fmt.Errorf("failed to generate presigned upload URL: %w", err)
    }

    return request.URL, nil
}

// CopyObject creates a copy of an object within S3
func (p *ProductionS3Client) CopyObject(ctx context.Context, sourceKey, destKey string) error {
    copySource := fmt.Sprintf("%s/%s", p.bucket, sourceKey)

    _, err := p.client.CopyObject(ctx, &s3.CopyObjectInput{
        Bucket:     aws.String(p.bucket),
        CopySource: aws.String(copySource),
        Key:        aws.String(destKey),
    })

    return err
}

// DeleteObjects efficiently deletes multiple objects
func (p *ProductionS3Client) DeleteObjects(ctx context.Context, keys []string) error {
    if len(keys) == 0 {
        return nil
    }

    // S3 allows deleting up to 1000 objects per request
    const batchSize = 1000

    for i := 0; i < len(keys); i += batchSize {
        end := i + batchSize
        if end > len(keys) {
            end = len(keys)
        }

        batch := keys[i:end]
        objects := make([]types.ObjectIdentifier, len(batch))

        for j, key := range batch {
            objects[j] = types.ObjectIdentifier{Key: aws.String(key)}
        }

        _, err := p.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
            Bucket: aws.String(p.bucket),
            Delete: &types.Delete{
                Objects: objects,
            },
        })
        if err != nil {
            return fmt.Errorf("failed to delete batch %d-%d: %w", i, end-1, err)
        }
    }

    return nil
}

func main() {
    ctx := context.Background()

    // Initialize production S3 client
    client, err := NewS3Client(ctx, "my-production-bucket")
    if err != nil {
        panic(err)
    }

    prodClient := &ProductionS3Client{S3Client: client}

    // Example: Upload with options
    data := bytes.NewReader([]byte("Production file with metadata"))
    err = prodClient.UploadWithOptions(ctx, "production/example.txt", data, UploadOptions{
        ContentType:  "text/plain",
        CacheControl: "max-age=3600",
        StorageClass: types.StorageClassStandardIa, // Infrequent access
        Metadata: map[string]string{
            "environment": "production",
            "owner":       "data-team",
        },
        Encrypt: true,
    })
    if err != nil {
        panic(err)
    }
    fmt.Println("✅ File uploaded with production options")

    // Example: Generate presigned URL
    url, err := prodClient.GeneratePresignedURL(ctx, "production/example.txt", 15*time.Minute)
    if err != nil {
        panic(err)
    }
    fmt.Printf("✅ Presigned URL: %s\n", url)

    // Example: Generate presigned upload URL
    uploadURL, err := prodClient.GeneratePresignedUploadURL(ctx, "uploads/user-file.jpg", 1*time.Hour)
    if err != nil {
        panic(err)
    }
    fmt.Printf("✅ Presigned upload URL: %s\n", uploadURL)
}
Common Patterns and Pitfalls - Production Experience
Pattern 1: Direct Browser Uploads
Problem: Handling file uploads through your server creates bandwidth costs and scaling bottlenecks.
Solution: Use presigned URLs to let browsers upload directly to cloud storage.
// UploadHandler exposes storage operations over HTTP
type UploadHandler struct {
    storage *ProductionS3Client
}

// DirectUploadHandler generates presigned URLs for browser uploads
func (h *UploadHandler) DirectUploadHandler(w http.ResponseWriter, r *http.Request) {
    // Generate a unique key with timestamp and random ID
    key := fmt.Sprintf("uploads/%d/%s",
        time.Now().Unix(),
        generateSecureID())

    // Generate presigned upload URL
    uploadURL, err := h.storage.GeneratePresignedUploadURL(r.Context(), key, time.Hour)
    if err != nil {
        http.Error(w, "Failed to generate upload URL", http.StatusInternalServerError)
        return
    }

    // Return upload configuration to the browser
    response := map[string]interface{}{
        "upload_url": uploadURL,
        "key":        key,
        "expires_at": time.Now().Add(time.Hour),
        "max_size":   100 * 1024 * 1024, // 100MB limit
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}
Pattern 2: Intelligent Storage Class Selection
Problem: Storing all data in Standard storage is expensive and unnecessary.
Solution: Automatically move data to cheaper storage classes based on access patterns.
// IntelligentStorageManager handles storage class transitions
type IntelligentStorageManager struct {
    client *ProductionS3Client
}

// AutoClassify determines the optimal storage class based on access patterns.
// GetObjectMetadata, getAccessFrequency, and TransitionObject are helpers assumed elsewhere.
func (m *IntelligentStorageManager) AutoClassify(ctx context.Context, key string) error {
    // Get object metadata
    metadata, err := m.client.GetObjectMetadata(ctx, key)
    if err != nil {
        return err
    }

    // Implement your business logic here
    daysSinceCreation := time.Since(metadata.LastModified).Hours() / 24
    accessFrequency := m.getAccessFrequency(key)

    var newClass types.StorageClass

    switch {
    case daysSinceCreation < 30 && accessFrequency > 10:
        newClass = types.StorageClassStandard
    case daysSinceCreation < 90 && accessFrequency > 1:
        newClass = types.StorageClassStandardIa
    case daysSinceCreation < 365:
        newClass = types.StorageClassGlacier
    default:
        newClass = types.StorageClassDeepArchive
    }

    // Only transition if the class would change
    if metadata.StorageClass == newClass {
        return nil
    }

    return m.client.TransitionObject(ctx, key, newClass)
}
Common Pitfalls to Avoid
❌ Ignoring Data Transfer Costs: Moving data between regions or out of cloud storage can be expensive. Always check data transfer pricing.
❌ Forgetting Lifecycle Policies: Old data accumulates and costs spiral. Implement automatic deletion or archival.
❌ Using Wrong Storage Class: Storing frequently accessed data in Glacier class leads to high retrieval costs and delays.
❌ Not Implementing Retries: Network failures are common. Implement retries with exponential backoff (see the sketch after this list).
❌ Ignoring Security: Never make objects public by default. Use presigned URLs for temporary access.
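As a concrete illustration of the retry pitfall, here is a minimal exponential-backoff sketch. The helper name, attempt count, and base delay are illustrative choices, not SDK settings; note that the AWS SDK also ships its own configurable retryer, so in practice you would often tune that instead of wrapping calls yourself.

// run
package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// retryWithBackoff retries op with exponential backoff plus simple jitter.
func retryWithBackoff(ctx context.Context, attempts int, base time.Duration, op func() error) error {
    var err error
    for i := 0; i < attempts; i++ {
        if err = op(); err == nil {
            return nil
        }

        // Delay doubles each attempt (1x, 2x, 4x, ...) with a little jitter added.
        delay := base*time.Duration(1<<i) + time.Duration(rand.Int63n(int64(base)))

        select {
        case <-time.After(delay):
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    return fmt.Errorf("all %d attempts failed: %w", attempts, err)
}

func main() {
    calls := 0
    // Simulate a flaky storage call that succeeds on the third attempt.
    err := retryWithBackoff(context.Background(), 5, 100*time.Millisecond, func() error {
        calls++
        if calls < 3 {
            return errors.New("transient network error")
        }
        return nil
    })
    fmt.Printf("succeeded after %d calls, err=%v\n", calls, err)
}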
Integration and Mastery - Building Real-World Systems
Real-World Example: Social Media Storage System
Let's build a simplified version of the media storage pipeline used by large social platforms:
// run
package main

import (
    "bytes"
    "context"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "io"
    "log"
    "path/filepath"
    "strings"
    "time"

    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// MediaStorageSystem handles all storage operations for a social media platform
type MediaStorageSystem struct {
    client         *ProductionS3Client
    imageProcessor *ImageProcessor
    cdnManager     *CDNManager
}

// ImageProcessor is a placeholder for a real image-resizing service.
type ImageProcessor struct{}

func NewImageProcessor() *ImageProcessor { return &ImageProcessor{} }

// Resize would decode and resize the image in a real implementation.
func (ip *ImageProcessor) Resize(data []byte, size string) ([]byte, error) {
    return data, nil
}

// CDNManager is a placeholder for a real CDN integration.
type CDNManager struct {
    bucket string
}

func NewCDNManager(bucket string) *CDNManager { return &CDNManager{bucket: bucket} }

// GetURLs returns the CDN URLs that serve the given object key.
func (c *CDNManager) GetURLs(objectKey string) []string {
    return []string{fmt.Sprintf("https://cdn.example.com/%s", objectKey)}
}

// NewMediaStorageSystem creates a complete media storage solution
func NewMediaStorageSystem(bucket string) (*MediaStorageSystem, error) {
    ctx := context.Background()

    s3Client, err := NewS3Client(ctx, bucket)
    if err != nil {
        return nil, err
    }

    prodClient := &ProductionS3Client{S3Client: s3Client}

    return &MediaStorageSystem{
        client:         prodClient,
        imageProcessor: NewImageProcessor(),
        cdnManager:     NewCDNManager(bucket),
    }, nil
}

// UploadMedia handles the complete media upload workflow
func (m *MediaStorageSystem) UploadMedia(ctx context.Context, userID string,
    file io.Reader, filename, contentType string) (*MediaUploadResult, error) {

    // Generate secure object key
    objectKey := m.generateObjectKey(userID, filename)

    // Determine optimal storage class based on content type
    storageClass := m.determineStorageClass(contentType)

    // Upload original file
    err := m.client.UploadWithOptions(ctx, objectKey, file, UploadOptions{
        ContentType:  contentType,
        StorageClass: storageClass,
        Metadata: map[string]string{
            "user_id":  userID,
            "filename": filename,
            "uploaded": time.Now().Format(time.RFC3339),
        },
        Encrypt: true,
    })
    if err != nil {
        return nil, fmt.Errorf("upload failed: %w", err)
    }

    // Process image if it's an image file
    if strings.HasPrefix(contentType, "image/") {
        go m.processImage(ctx, objectKey, contentType)
    }

    // Generate CDN URLs
    cdnURLs := m.cdnManager.GetURLs(objectKey)

    return &MediaUploadResult{
        ObjectKey:    objectKey,
        CDNURLs:      cdnURLs,
        StorageClass: string(storageClass),
        UploadedAt:   time.Now(),
    }, nil
}

// processImage creates multiple sizes for images
func (m *MediaStorageSystem) processImage(ctx context.Context, originalKey, contentType string) {
    // Download original
    originalData, err := m.client.Download(ctx, originalKey)
    if err != nil {
        log.Printf("Failed to download original for processing: %v", err)
        return
    }

    // Generate different sizes
    sizes := []struct {
        name  string
        size  string
        class types.StorageClass
    }{
        {"thumbnail", "200x200", types.StorageClassStandard}, // Hot data
        {"medium", "800x600", types.StorageClassStandardIa},  // Warm data
        {"large", "1920x1080", types.StorageClassGlacier},    // Cold data
    }

    for _, sizeInfo := range sizes {
        resizedData, err := m.imageProcessor.Resize(originalData, sizeInfo.size)
        if err != nil {
            log.Printf("Failed to resize image for %s: %v", sizeInfo.name, err)
            continue
        }

        // Upload resized version
        resizedKey := fmt.Sprintf("%s.%s",
            strings.TrimSuffix(originalKey, filepath.Ext(originalKey)),
            sizeInfo.name)

        err = m.client.UploadWithOptions(ctx, resizedKey,
            bytes.NewReader(resizedData), UploadOptions{
                ContentType:  contentType,
                StorageClass: sizeInfo.class,
                Metadata: map[string]string{
                    "derived_from": originalKey,
                    "size":         sizeInfo.name,
                },
            })
        if err != nil {
            log.Printf("Failed to upload resized image %s: %v", sizeInfo.name, err)
        }
    }
}

// generateObjectKey creates a secure, unique object key
func (m *MediaStorageSystem) generateObjectKey(userID, filename string) string {
    timestamp := time.Now().Format("2006/01/02")
    hash := sha256.Sum256([]byte(fmt.Sprintf("%s-%s-%d",
        userID, filename, time.Now().UnixNano())))

    return fmt.Sprintf("users/%s/%s/%s%s",
        userID,
        timestamp,
        hex.EncodeToString(hash[:8]),
        filepath.Ext(filename))
}

// determineStorageClass selects optimal storage based on content type
func (m *MediaStorageSystem) determineStorageClass(contentType string) types.StorageClass {
    switch {
    case strings.HasPrefix(contentType, "image/"), strings.HasPrefix(contentType, "video/"):
        return types.StorageClassStandardIa // Media files are typically read infrequently
    case strings.HasPrefix(contentType, "text/"):
        return types.StorageClassStandard // Text files are often accessed
    default:
        return types.StorageClassStandard
    }
}

// MediaUploadResult contains upload metadata
type MediaUploadResult struct {
    ObjectKey    string    `json:"object_key"`
    CDNURLs      []string  `json:"cdn_urls"`
    StorageClass string    `json:"storage_class"`
    UploadedAt   time.Time `json:"uploaded_at"`
}

func main() {
    // Example usage
    storage, err := NewMediaStorageSystem("social-media-bucket")
    if err != nil {
        panic(err)
    }

    // Simulate uploading a user photo
    fileData := strings.NewReader("fake image data for demonstration")

    result, err := storage.UploadMedia(context.Background(),
        "user123", fileData, "profile.jpg", "image/jpeg")
    if err != nil {
        panic(err)
    }

    fmt.Printf("✅ Media uploaded successfully:\n")
    fmt.Printf("   Object Key: %s\n", result.ObjectKey)
    fmt.Printf("   Storage Class: %s\n", result.StorageClass)
    fmt.Printf("   CDN URLs: %v\n", result.CDNURLs)
}
Advanced Features: Multi-Cloud Storage
For maximum reliability, implement multi-cloud storage with automatic failover:
// MultiCloudStorage provides redundancy across cloud providers
type MultiCloudStorage struct {
    primary   StorageProvider
    secondary StorageProvider
    config    MultiCloudConfig
}

type StorageProvider interface {
    Upload(ctx context.Context, key string, data io.Reader) error
    Download(ctx context.Context, key string) ([]byte, error)
    Exists(ctx context.Context, key string) (bool, error)
}

// Upload with replication to the secondary provider
func (m *MultiCloudStorage) Upload(ctx context.Context, key string, data io.Reader) error {
    // Upload to primary
    err := m.primary.Upload(ctx, key, data)
    if err != nil {
        return fmt.Errorf("primary upload failed: %w", err)
    }

    // Replicate to secondary in background
    go func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
        defer cancel()

        // Re-read the object from primary for the secondary upload
        data, err := m.primary.Download(ctx, key)
        if err != nil {
            log.Printf("Failed to read from primary for replication: %v", err)
            return
        }

        // Upload to secondary
        if err := m.secondary.Upload(ctx, key, bytes.NewReader(data)); err != nil {
            log.Printf("Failed to replicate to secondary: %v", err)
        }
    }()

    return nil
}

// Download with fallback to the secondary provider
func (m *MultiCloudStorage) Download(ctx context.Context, key string) ([]byte, error) {
    // Try primary first
    data, err := m.primary.Download(ctx, key)
    if err == nil {
        return data, nil
    }

    log.Printf("Primary download failed, trying secondary: %v", err)

    // Fallback to secondary
    return m.secondary.Download(ctx, key)
}
Advanced Multi-Cloud Storage Strategies
Cloud Provider Abstraction Layer
Building a unified interface across different cloud providers enables flexibility and reduces vendor lock-in:
package storage

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
    "time"
)

// UnifiedStorage provides a cloud-agnostic storage interface
type UnifiedStorage interface {
    Upload(ctx context.Context, bucket, key string, data io.Reader, opts *UploadOptions) error
    Download(ctx context.Context, bucket, key string) ([]byte, error)
    Delete(ctx context.Context, bucket, key string) error
    List(ctx context.Context, bucket, prefix string, limit int) ([]ObjectInfo, error)
    Copy(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string) error
    GetMetadata(ctx context.Context, bucket, key string) (*ObjectMetadata, error)
    GeneratePresignedURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)
}

type UploadOptions struct {
    ContentType          string
    ContentEncoding      string
    CacheControl         string
    Metadata             map[string]string
    StorageClass         string
    ServerSideEncryption bool
}

type ObjectInfo struct {
    Key          string
    Size         int64
    LastModified time.Time
    ETag         string
    StorageClass string
}

type ObjectMetadata struct {
    ContentType   string
    ContentLength int64
    LastModified  time.Time
    ETag          string
    Metadata      map[string]string
    StorageClass  string
    VersionID     string
}

// MultiCloudStorage manages multiple storage providers with failover
type MultiCloudStorage struct {
    primary   UnifiedStorage
    secondary UnifiedStorage
    tertiary  UnifiedStorage
    strategy  ReplicationStrategy
    monitor   *StorageMonitor
}

type ReplicationStrategy string

const (
    StrategyPrimaryOnly    ReplicationStrategy = "primary_only"
    StrategyAsyncReplicate ReplicationStrategy = "async_replicate"
    StrategySyncReplicate  ReplicationStrategy = "sync_replicate"
    StrategyReadFailover   ReplicationStrategy = "read_failover"
)

func NewMultiCloudStorage(primary, secondary, tertiary UnifiedStorage, strategy ReplicationStrategy) *MultiCloudStorage {
    return &MultiCloudStorage{
        primary:   primary,
        secondary: secondary,
        tertiary:  tertiary,
        strategy:  strategy,
        monitor:   NewStorageMonitor(),
    }
}

// Upload with intelligent replication based on strategy
func (m *MultiCloudStorage) Upload(ctx context.Context, bucket, key string, data io.Reader, opts *UploadOptions) error {
    // For replication, we need to buffer the data
    var buf *bytes.Buffer
    if m.strategy == StrategySyncReplicate || m.strategy == StrategyAsyncReplicate {
        buf = &bytes.Buffer{}
        data = io.TeeReader(data, buf)
    }

    // Upload to primary
    start := time.Now()
    err := m.primary.Upload(ctx, bucket, key, data, opts)
    m.monitor.RecordOperation("primary", "upload", time.Since(start), err)

    if err != nil {
        return fmt.Errorf("primary upload failed: %w", err)
    }

    // Handle replication based on strategy
    switch m.strategy {
    case StrategySyncReplicate:
        // Synchronous replication to all providers
        var errs []error

        if m.secondary != nil {
            if err := m.secondary.Upload(ctx, bucket, key, bytes.NewReader(buf.Bytes()), opts); err != nil {
                errs = append(errs, fmt.Errorf("secondary: %w", err))
            }
        }

        if m.tertiary != nil {
            if err := m.tertiary.Upload(ctx, bucket, key, bytes.NewReader(buf.Bytes()), opts); err != nil {
                errs = append(errs, fmt.Errorf("tertiary: %w", err))
            }
        }

        if len(errs) > 0 {
            return fmt.Errorf("replication errors: %v", errs)
        }

    case StrategyAsyncReplicate:
        // Asynchronous replication
        go m.asyncReplicate(bucket, key, buf.Bytes(), opts)
    }

    return nil
}

func (m *MultiCloudStorage) asyncReplicate(bucket, key string, data []byte, opts *UploadOptions) {
    ctx := context.Background()

    if m.secondary != nil {
        start := time.Now()
        err := m.secondary.Upload(ctx, bucket, key, bytes.NewReader(data), opts)
        m.monitor.RecordOperation("secondary", "upload", time.Since(start), err)
    }

    if m.tertiary != nil {
        start := time.Now()
        err := m.tertiary.Upload(ctx, bucket, key, bytes.NewReader(data), opts)
        m.monitor.RecordOperation("tertiary", "upload", time.Since(start), err)
    }
}

// Download with automatic failover
func (m *MultiCloudStorage) Download(ctx context.Context, bucket, key string) ([]byte, error) {
    // Try primary first
    start := time.Now()
    data, err := m.primary.Download(ctx, bucket, key)
    m.monitor.RecordOperation("primary", "download", time.Since(start), err)

    if err == nil {
        return data, nil
    }

    // Failover to secondary
    if m.secondary != nil {
        start = time.Now()
        data, err = m.secondary.Download(ctx, bucket, key)
        m.monitor.RecordOperation("secondary", "download", time.Since(start), err)

        if err == nil {
            // Async heal: restore to primary
            go m.healPrimary(bucket, key, data)
            return data, nil
        }
    }

    // Failover to tertiary
    if m.tertiary != nil {
        start = time.Now()
        data, err = m.tertiary.Download(ctx, bucket, key)
        m.monitor.RecordOperation("tertiary", "download", time.Since(start), err)

        if err == nil {
            // Async heal: restore to primary and secondary
            go m.healAll(bucket, key, data)
            return data, nil
        }
    }

    return nil, fmt.Errorf("all providers failed to download %s/%s", bucket, key)
}

func (m *MultiCloudStorage) healPrimary(bucket, key string, data []byte) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    opts := &UploadOptions{ContentType: "application/octet-stream"}
    if err := m.primary.Upload(ctx, bucket, key, bytes.NewReader(data), opts); err != nil {
        log.Printf("Failed to heal primary for %s/%s: %v", bucket, key, err)
    }
}

func (m *MultiCloudStorage) healAll(bucket, key string, data []byte) {
    m.healPrimary(bucket, key, data)

    if m.secondary != nil {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
        defer cancel()

        opts := &UploadOptions{ContentType: "application/octet-stream"}
        if err := m.secondary.Upload(ctx, bucket, key, bytes.NewReader(data), opts); err != nil {
            log.Printf("Failed to heal secondary for %s/%s: %v", bucket, key, err)
        }
    }
}

// GetHealthStatus returns the health of all storage providers
func (m *MultiCloudStorage) GetHealthStatus() map[string]ProviderHealth {
    return m.monitor.GetProviderHealth()
}

// StorageMonitor tracks health and performance metrics
type StorageMonitor struct {
    metrics map[string]*ProviderMetrics
    mu      sync.RWMutex
}

type ProviderMetrics struct {
    TotalOperations int64
    SuccessCount    int64
    ErrorCount      int64
    TotalLatency    time.Duration
    LastError       error
    LastErrorTime   time.Time
}

type ProviderHealth struct {
    Provider      string
    IsHealthy     bool
    ErrorRate     float64
    AvgLatency    time.Duration
    LastError     string
    LastErrorTime time.Time
}

func NewStorageMonitor() *StorageMonitor {
    return &StorageMonitor{
        metrics: make(map[string]*ProviderMetrics),
    }
}

func (sm *StorageMonitor) RecordOperation(provider, operation string, latency time.Duration, err error) {
    sm.mu.Lock()
    defer sm.mu.Unlock()

    key := fmt.Sprintf("%s:%s", provider, operation)
    if sm.metrics[key] == nil {
        sm.metrics[key] = &ProviderMetrics{}
    }

    m := sm.metrics[key]
    m.TotalOperations++
    m.TotalLatency += latency

    if err != nil {
        m.ErrorCount++
        m.LastError = err
        m.LastErrorTime = time.Now()
    } else {
        m.SuccessCount++
    }
}

func (sm *StorageMonitor) GetProviderHealth() map[string]ProviderHealth {
    sm.mu.RLock()
    defer sm.mu.RUnlock()

    health := make(map[string]ProviderHealth)

    for key, metrics := range sm.metrics {
        parts := strings.Split(key, ":")
        provider := parts[0]

        if _, exists := health[provider]; !exists {
            errorRate := 0.0
            if metrics.TotalOperations > 0 {
                errorRate = float64(metrics.ErrorCount) / float64(metrics.TotalOperations)
            }

            avgLatency := time.Duration(0)
            if metrics.SuccessCount > 0 {
                avgLatency = metrics.TotalLatency / time.Duration(metrics.SuccessCount)
            }

            lastError := ""
            if metrics.LastError != nil {
                lastError = metrics.LastError.Error()
            }

            health[provider] = ProviderHealth{
                Provider:      provider,
                IsHealthy:     errorRate < 0.1, // Less than 10% error rate
                ErrorRate:     errorRate,
                AvgLatency:    avgLatency,
                LastError:     lastError,
                LastErrorTime: metrics.LastErrorTime,
            }
        }
    }

    return health
}
Performance Optimization Techniques
Intelligent Caching Strategy
Implement multi-tier caching to reduce cloud storage costs and improve performance:
// run
package main

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "os"
    "path/filepath"
    "sync"
    "time"
)

// CachedStorage wraps cloud storage with intelligent caching
type CachedStorage struct {
    backend   CloudStorage
    memCache  *MemoryCache
    diskCache *DiskCache
    config    CacheConfig
}

type CacheConfig struct {
    MemoryCacheSize  int64 // Bytes
    DiskCacheSize    int64 // Bytes
    MemoryTTL        time.Duration
    DiskTTL          time.Duration
    CacheableMinSize int64 // Don't cache files smaller than this
    CacheableMaxSize int64 // Don't cache files larger than this
}

type CloudStorage interface {
    Get(ctx context.Context, key string) ([]byte, error)
    Put(ctx context.Context, key string, data []byte) error
}

func NewCachedStorage(backend CloudStorage, config CacheConfig) *CachedStorage {
    return &CachedStorage{
        backend:   backend,
        memCache:  NewMemoryCache(config.MemoryCacheSize),
        diskCache: NewDiskCache(config.DiskCacheSize),
        config:    config,
    }
}

// Get retrieves data with multi-tier caching
func (c *CachedStorage) Get(ctx context.Context, key string) ([]byte, error) {
    // Try memory cache first
    if data, found := c.memCache.Get(key); found {
        return data, nil
    }

    // Try disk cache
    if data, found := c.diskCache.Get(key); found {
        // Promote to memory cache
        c.memCache.Set(key, data, c.config.MemoryTTL)
        return data, nil
    }

    // Fetch from backend storage
    data, err := c.backend.Get(ctx, key)
    if err != nil {
        return nil, err
    }

    // Cache if size is appropriate
    size := int64(len(data))
    if size >= c.config.CacheableMinSize && size <= c.config.CacheableMaxSize {
        // Cache in memory and on disk
        c.memCache.Set(key, data, c.config.MemoryTTL)
        c.diskCache.Set(key, data, c.config.DiskTTL)
    }

    return data, nil
}

// Put stores data and invalidates the cache
func (c *CachedStorage) Put(ctx context.Context, key string, data []byte) error {
    // Store in backend
    if err := c.backend.Put(ctx, key, data); err != nil {
        return err
    }

    // Invalidate caches
    c.memCache.Delete(key)
    c.diskCache.Delete(key)

    return nil
}

// MemoryCache implements an LRU cache in memory
type MemoryCache struct {
    maxSize     int64
    currentSize int64
    items       map[string]*cacheItem
    lru         *lruList
    mu          sync.RWMutex
}

type cacheItem struct {
    key       string
    data      []byte
    size      int64
    expiresAt time.Time
    lruNode   *lruNode
}

type lruList struct {
    head *lruNode
    tail *lruNode
}

type lruNode struct {
    key  string
    prev *lruNode
    next *lruNode
}

func NewMemoryCache(maxSize int64) *MemoryCache {
    return &MemoryCache{
        maxSize: maxSize,
        items:   make(map[string]*cacheItem),
        lru:     &lruList{},
    }
}

func (mc *MemoryCache) Get(key string) ([]byte, bool) {
    mc.mu.Lock()
    defer mc.mu.Unlock()

    item, exists := mc.items[key]
    if !exists {
        return nil, false
    }

    // Check expiration
    if time.Now().After(item.expiresAt) {
        mc.deleteItem(key)
        return nil, false
    }

    // Move to front of LRU list
    mc.lru.moveToFront(item.lruNode)

    return item.data, true
}

func (mc *MemoryCache) Set(key string, data []byte, ttl time.Duration) {
    mc.mu.Lock()
    defer mc.mu.Unlock()

    size := int64(len(data))

    // Evict if necessary
    for mc.currentSize+size > mc.maxSize && len(mc.items) > 0 {
        // Remove least recently used
        if mc.lru.tail != nil {
            mc.deleteItem(mc.lru.tail.key)
        }
    }

    // Create new item
    node := &lruNode{key: key}
    item := &cacheItem{
        key:       key,
        data:      data,
        size:      size,
        expiresAt: time.Now().Add(ttl),
        lruNode:   node,
    }

    mc.items[key] = item
    mc.currentSize += size
    mc.lru.addToFront(node)
}

func (mc *MemoryCache) Delete(key string) {
    mc.mu.Lock()
    defer mc.mu.Unlock()
    mc.deleteItem(key)
}

func (mc *MemoryCache) deleteItem(key string) {
    item, exists := mc.items[key]
    if !exists {
        return
    }

    delete(mc.items, key)
    mc.currentSize -= item.size
    mc.lru.remove(item.lruNode)
}

// LRU list operations
func (l *lruList) addToFront(node *lruNode) {
    if l.head == nil {
        l.head = node
        l.tail = node
        return
    }

    node.next = l.head
    l.head.prev = node
    l.head = node
}

func (l *lruList) moveToFront(node *lruNode) {
    if node == l.head {
        return
    }

    // Remove from current position
    if node.prev != nil {
        node.prev.next = node.next
    }
    if node.next != nil {
        node.next.prev = node.prev
    }
    if node == l.tail {
        l.tail = node.prev
    }

    // Add to front
    node.prev = nil
    node.next = l.head
    l.head.prev = node
    l.head = node
}

func (l *lruList) remove(node *lruNode) {
    if node.prev != nil {
        node.prev.next = node.next
    } else {
        l.head = node.next
    }

    if node.next != nil {
        node.next.prev = node.prev
    } else {
        l.tail = node.prev
    }
}

// DiskCache implements a disk-based cache
type DiskCache struct {
    maxSize     int64
    currentSize int64
    baseDir     string
    items       map[string]*diskCacheItem
    mu          sync.RWMutex
}

type diskCacheItem struct {
    key       string
    path      string
    size      int64
    expiresAt time.Time
}

func NewDiskCache(maxSize int64) *DiskCache {
    baseDir := filepath.Join(os.TempDir(), "storage_cache")
    os.MkdirAll(baseDir, 0755)

    return &DiskCache{
        maxSize: maxSize,
        baseDir: baseDir,
        items:   make(map[string]*diskCacheItem),
    }
}

func (dc *DiskCache) Get(key string) ([]byte, bool) {
    dc.mu.RLock()
    item, exists := dc.items[key]
    dc.mu.RUnlock()

    if !exists {
        return nil, false
    }

    // Check expiration
    if time.Now().After(item.expiresAt) {
        dc.Delete(key)
        return nil, false
    }

    // Read from disk
    data, err := os.ReadFile(item.path)
    if err != nil {
        dc.Delete(key)
        return nil, false
    }

    return data, true
}

func (dc *DiskCache) Set(key string, data []byte, ttl time.Duration) {
    size := int64(len(data))

    dc.mu.Lock()
    defer dc.mu.Unlock()

    // Evict if necessary
    for dc.currentSize+size > dc.maxSize && len(dc.items) > 0 {
        // Remove the item closest to expiration
        var oldestKey string
        var oldestTime time.Time
        for k, v := range dc.items {
            if oldestTime.IsZero() || v.expiresAt.Before(oldestTime) {
                oldestKey = k
                oldestTime = v.expiresAt
            }
        }
        if oldestKey != "" {
            dc.deleteItem(oldestKey)
        }
    }

    // Write to disk
    hash := sha256.Sum256([]byte(key))
    filename := hex.EncodeToString(hash[:])
    path := filepath.Join(dc.baseDir, filename)

    if err := os.WriteFile(path, data, 0644); err != nil {
        return
    }

    // Store metadata
    dc.items[key] = &diskCacheItem{
        key:       key,
        path:      path,
        size:      size,
        expiresAt: time.Now().Add(ttl),
    }
    dc.currentSize += size
}

func (dc *DiskCache) Delete(key string) {
    dc.mu.Lock()
    defer dc.mu.Unlock()
    dc.deleteItem(key)
}

func (dc *DiskCache) deleteItem(key string) {
    item, exists := dc.items[key]
    if !exists {
        return
    }

    os.Remove(item.path)
    delete(dc.items, key)
    dc.currentSize -= item.size
}

func main() {
    // Example usage
    backend := &SimpleCloudStorage{}
    config := CacheConfig{
        MemoryCacheSize:  100 * 1024 * 1024,  // 100MB
        DiskCacheSize:    1024 * 1024 * 1024, // 1GB
        MemoryTTL:        5 * time.Minute,
        DiskTTL:          1 * time.Hour,
        CacheableMinSize: 1024,             // 1KB
        CacheableMaxSize: 10 * 1024 * 1024, // 10MB
    }

    storage := NewCachedStorage(backend, config)

    ctx := context.Background()

    // First get - from backend
    data1, err := storage.Get(ctx, "example-key")
    if err != nil {
        fmt.Printf("Error: %v\n", err)
    } else {
        fmt.Printf("Retrieved %d bytes (from backend)\n", len(data1))
    }

    // Second get - from cache
    data2, err := storage.Get(ctx, "example-key")
    if err != nil {
        fmt.Printf("Error: %v\n", err)
    } else {
        fmt.Printf("Retrieved %d bytes (from cache)\n", len(data2))
    }
}

// Simple backend for demonstration
type SimpleCloudStorage struct{}

func (s *SimpleCloudStorage) Get(ctx context.Context, key string) ([]byte, error) {
    return []byte("example data from cloud storage"), nil
}

func (s *SimpleCloudStorage) Put(ctx context.Context, key string, data []byte) error {
    return nil
}
Security and Compliance Best Practices
Encryption and Access Control
Implement comprehensive security for cloud storage:
package security

import (
    "bytes"
    "context"
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "crypto/sha256"
    "fmt"
    "io"
    "sync"
    "time"

    "golang.org/x/crypto/pbkdf2"
)

// SecureStorage wraps cloud storage with encryption and access control.
// UnifiedStorage and UploadOptions are the types from the abstraction layer above.
type SecureStorage struct {
    backend       UnifiedStorage
    encryptionKey []byte
    accessControl *AccessControl
    auditLog      *AuditLog
}

type AccessControl struct {
    policies map[string]*AccessPolicy
    mu       sync.RWMutex
}

type AccessPolicy struct {
    AllowedUsers []string
    AllowedRoles []string
    Permissions  []Permission
    ExpiresAt    *time.Time
}

type Permission string

const (
    PermissionRead   Permission = "read"
    PermissionWrite  Permission = "write"
    PermissionDelete Permission = "delete"
)

func NewSecureStorage(backend UnifiedStorage, masterKey string) *SecureStorage {
    // Derive encryption key from master key
    salt := []byte("cloud-storage-salt")
    encryptionKey := pbkdf2.Key([]byte(masterKey), salt, 100000, 32, sha256.New)

    return &SecureStorage{
        backend:       backend,
        encryptionKey: encryptionKey,
        accessControl: NewAccessControl(),
        auditLog:      NewAuditLog(),
    }
}

// UploadEncrypted uploads with client-side encryption
func (s *SecureStorage) UploadEncrypted(ctx context.Context, bucket, key string, data io.Reader, userID string) error {
    // Check permissions
    if !s.accessControl.HasPermission(userID, key, PermissionWrite) {
        s.auditLog.LogAccess(userID, key, "upload", false, "permission denied")
        return fmt.Errorf("access denied")
    }

    // Read data
    plaintext, err := io.ReadAll(data)
    if err != nil {
        return err
    }

    // Encrypt data
    ciphertext, err := s.encrypt(plaintext)
    if err != nil {
        return err
    }

    // Upload encrypted data
    opts := &UploadOptions{
        ContentType: "application/octet-stream",
        Metadata: map[string]string{
            "encrypted": "true",
            "user_id":   userID,
            "timestamp": time.Now().Format(time.RFC3339),
        },
        ServerSideEncryption: true,
    }

    err = s.backend.Upload(ctx, bucket, key, bytes.NewReader(ciphertext), opts)

    // Audit log
    s.auditLog.LogAccess(userID, key, "upload", err == nil, "")

    return err
}

// DownloadDecrypted downloads and decrypts
func (s *SecureStorage) DownloadDecrypted(ctx context.Context, bucket, key string, userID string) ([]byte, error) {
    // Check permissions
    if !s.accessControl.HasPermission(userID, key, PermissionRead) {
        s.auditLog.LogAccess(userID, key, "download", false, "permission denied")
        return nil, fmt.Errorf("access denied")
    }

    // Download encrypted data
    ciphertext, err := s.backend.Download(ctx, bucket, key)
    if err != nil {
        s.auditLog.LogAccess(userID, key, "download", false, err.Error())
        return nil, err
    }

    // Decrypt data
    plaintext, err := s.decrypt(ciphertext)
    if err != nil {
        return nil, err
    }

    // Audit log
    s.auditLog.LogAccess(userID, key, "download", true, "")

    return plaintext, nil
}

// encrypt uses AES-GCM for authenticated encryption
func (s *SecureStorage) encrypt(plaintext []byte) ([]byte, error) {
    block, err := aes.NewCipher(s.encryptionKey)
    if err != nil {
        return nil, err
    }

    gcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }

    nonce := make([]byte, gcm.NonceSize())
    if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
        return nil, err
    }

    return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// decrypt using AES-GCM
func (s *SecureStorage) decrypt(ciphertext []byte) ([]byte, error) {
    block, err := aes.NewCipher(s.encryptionKey)
    if err != nil {
        return nil, err
    }

    gcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }

    nonceSize := gcm.NonceSize()
    if len(ciphertext) < nonceSize {
        return nil, fmt.Errorf("ciphertext too short")
    }

    nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
    return gcm.Open(nil, nonce, ciphertext, nil)
}

// NewAccessControl creates an access control manager
func NewAccessControl() *AccessControl {
    return &AccessControl{
        policies: make(map[string]*AccessPolicy),
    }
}

// SetPolicy sets the access policy for a resource
func (ac *AccessControl) SetPolicy(resource string, policy *AccessPolicy) {
    ac.mu.Lock()
    defer ac.mu.Unlock()
    ac.policies[resource] = policy
}

// HasPermission checks whether a user has permission for a resource
func (ac *AccessControl) HasPermission(userID, resource string, perm Permission) bool {
    ac.mu.RLock()
    defer ac.mu.RUnlock()

    policy, exists := ac.policies[resource]
    if !exists {
        return false
    }

    // Check expiration
    if policy.ExpiresAt != nil && time.Now().After(*policy.ExpiresAt) {
        return false
    }

    // Check user permission
    for _, allowedUser := range policy.AllowedUsers {
        if allowedUser == userID {
            for _, p := range policy.Permissions {
                if p == perm {
                    return true
                }
            }
        }
    }

    return false
}

// AuditLog tracks all access attempts
type AuditLog struct {
    entries []AuditEntry
    mu      sync.Mutex
}

type AuditEntry struct {
    Timestamp time.Time
    UserID    string
    Resource  string
    Action    string
    Success   bool
    Error     string
}

func NewAuditLog() *AuditLog {
    return &AuditLog{
        entries: make([]AuditEntry, 0),
    }
}

func (al *AuditLog) LogAccess(userID, resource, action string, success bool, errorMsg string) {
    al.mu.Lock()
    defer al.mu.Unlock()

    entry := AuditEntry{
        Timestamp: time.Now(),
        UserID:    userID,
        Resource:  resource,
        Action:    action,
        Success:   success,
        Error:     errorMsg,
    }

    al.entries = append(al.entries, entry)

    // In production, persist to a database or log aggregation system
}

func (al *AuditLog) GetRecentEntries(limit int) []AuditEntry {
    al.mu.Lock()
    defer al.mu.Unlock()

    if len(al.entries) < limit {
        return al.entries
    }

    return al.entries[len(al.entries)-limit:]
}
S3-Compatible Storage and Alternative Providers
While AWS S3 is the most popular object storage service, many applications benefit from S3-compatible alternatives like MinIO, DigitalOcean Spaces, Backblaze B2, and Wasabi. These services offer cost savings, on-premises deployment options, and geographic flexibility while maintaining S3 API compatibility.
Why S3-Compatible Storage Matters
Cost Optimization: Alternative providers like Wasabi and Backblaze B2 can be substantially cheaper than AWS S3 for many workloads, with reduced or no egress fees.
On-Premises Control: MinIO enables you to run S3-compatible storage in your own data center, providing full control over data sovereignty and compliance requirements.
Multi-Cloud Strategy: Using S3-compatible APIs allows migration between providers with little or no application code change (see the endpoint-override sketch after this list).
Edge Deployment: MinIO can run on edge devices and IoT gateways, enabling distributed storage architectures that aren't possible with cloud-only solutions.
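Because these providers speak the S3 API, the same aws-sdk-go-v2 client from the earlier examples can usually be pointed at them just by overriding the endpoint. A minimal sketch follows; the endpoint, credentials, and bucket are placeholders for a local MinIO server, and the BaseEndpoint option assumes a reasonably recent SDK version.

// run
package main

import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    ctx := context.Background()

    // Static credentials and region are placeholders for a local MinIO instance.
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion("us-east-1"),
        config.WithCredentialsProvider(
            credentials.NewStaticCredentialsProvider("minioadmin", "minioadmin", "")),
    )
    if err != nil {
        log.Fatal(err)
    }

    // Point the standard S3 client at the S3-compatible endpoint.
    client := s3.NewFromConfig(cfg, func(o *s3.Options) {
        o.BaseEndpoint = aws.String("http://localhost:9000")
        o.UsePathStyle = true // MinIO and many compatibles expect path-style addressing
    })

    // The rest of the application code is unchanged from the AWS examples.
    buckets, err := client.ListBuckets(ctx, &s3.ListBucketsInput{})
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("found %d buckets", len(buckets.Buckets))
}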
MinIO: Self-Hosted S3-Compatible Storage
MinIO is a high-performance, distributed object storage server that implements the S3 API. It's perfect for:
- Development and testing environments
- On-premises enterprise storage
- Hybrid cloud architectures
- Edge computing scenarios
// run
package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net/url"
    "strings"
    "time"

    "github.com/minio/minio-go/v7"
    "github.com/minio/minio-go/v7/pkg/credentials"
)

// MinIOClient wraps MinIO operations with production patterns
type MinIOClient struct {
    client     *minio.Client
    endpoint   string
    bucketName string
}

// NewMinIOClient creates a new MinIO client
func NewMinIOClient(endpoint, accessKey, secretKey, bucketName string, useSSL bool) (*MinIOClient, error) {
    client, err := minio.New(endpoint, &minio.Options{
        Creds:  credentials.NewStaticV4(accessKey, secretKey, ""),
        Secure: useSSL,
    })
    if err != nil {
        return nil, fmt.Errorf("failed to create MinIO client: %w", err)
    }

    return &MinIOClient{
        client:     client,
        endpoint:   endpoint,
        bucketName: bucketName,
    }, nil
}

// EnsureBucket creates the bucket if it doesn't exist
func (m *MinIOClient) EnsureBucket(ctx context.Context) error {
    exists, err := m.client.BucketExists(ctx, m.bucketName)
    if err != nil {
        return fmt.Errorf("failed to check bucket: %w", err)
    }

    if !exists {
        err = m.client.MakeBucket(ctx, m.bucketName, minio.MakeBucketOptions{
            Region:        "us-east-1",
            ObjectLocking: false,
        })
        if err != nil {
            return fmt.Errorf("failed to create bucket: %w", err)
        }
        log.Printf("Created bucket: %s", m.bucketName)
    }

    return nil
}

// Upload uploads an object to MinIO
func (m *MinIOClient) Upload(ctx context.Context, objectName string, data io.Reader, size int64, contentType string) error {
    _, err := m.client.PutObject(
        ctx,
        m.bucketName,
        objectName,
        data,
        size,
        minio.PutObjectOptions{
            ContentType: contentType,
            UserMetadata: map[string]string{
                "uploaded-at": time.Now().Format(time.RFC3339),
            },
        },
    )
    if err != nil {
        return fmt.Errorf("failed to upload: %w", err)
    }

    log.Printf("Uploaded %s (%d bytes)", objectName, size)
    return nil
}

// Download downloads an object from MinIO
func (m *MinIOClient) Download(ctx context.Context, objectName string) ([]byte, error) {
    object, err := m.client.GetObject(ctx, m.bucketName, objectName, minio.GetObjectOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to get object: %w", err)
    }
    defer object.Close()

    data, err := io.ReadAll(object)
    if err != nil {
        return nil, fmt.Errorf("failed to read object: %w", err)
    }

    return data, nil
}

// GetPresignedURL generates a presigned URL for direct uploads/downloads
func (m *MinIOClient) GetPresignedURL(ctx context.Context, objectName string, expires time.Duration, forUpload bool) (string, error) {
    var presignedURL *url.URL
    var err error

    if forUpload {
        presignedURL, err = m.client.PresignedPutObject(ctx, m.bucketName, objectName, expires)
    } else {
        presignedURL, err = m.client.PresignedGetObject(ctx, m.bucketName, objectName, expires, nil)
    }

    if err != nil {
        return "", fmt.Errorf("failed to generate presigned URL: %w", err)
    }

    return presignedURL.String(), nil
}

// ListObjects lists objects in the bucket with pagination
func (m *MinIOClient) ListObjects(ctx context.Context, prefix string, maxResults int) ([]ObjectInfo, error) {
    var objects []ObjectInfo

    opts := minio.ListObjectsOptions{
        Prefix:    prefix,
        Recursive: true,
        MaxKeys:   maxResults,
    }

    for object := range m.client.ListObjects(ctx, m.bucketName, opts) {
        if object.Err != nil {
            return nil, fmt.Errorf("error listing objects: %w", object.Err)
        }

        objects = append(objects, ObjectInfo{
            Key:          object.Key,
            Size:         object.Size,
            LastModified: object.LastModified,
            ContentType:  object.ContentType,
            ETag:         object.ETag,
        })
    }

    return objects, nil
}

// DeleteObject removes an object from storage
func (m *MinIOClient) DeleteObject(ctx context.Context, objectName string) error {
    err := m.client.RemoveObject(ctx, m.bucketName, objectName, minio.RemoveObjectOptions{})
    if err != nil {
        return fmt.Errorf("failed to delete object: %w", err)
    }

    log.Printf("Deleted object: %s", objectName)
    return nil
}

type ObjectInfo struct {
    Key          string
    Size         int64
    LastModified time.Time
    ContentType  string
    ETag         string
}

func main() {
    // Example usage with MinIO
    ctx := context.Background()

    // Connect to MinIO (use your MinIO server details)
    client, err := NewMinIOClient(
        "localhost:9000", // MinIO endpoint
        "minioadmin",     // Access key
        "minioadmin",     // Secret key
        "my-bucket",      // Bucket name
        false,            // Use SSL (plain HTTP for local development)
    )
    if err != nil {
        log.Fatal(err)
    }

    // Ensure bucket exists
    if err := client.EnsureBucket(ctx); err != nil {
        log.Fatal(err)
    }

    // Upload a file
    content := strings.NewReader("Hello from MinIO!")
    err = client.Upload(ctx, "test.txt", content, int64(content.Len()), "text/plain")
    if err != nil {
        log.Fatal(err)
    }

    // Generate presigned URL for download
    presigned, err := client.GetPresignedURL(ctx, "test.txt", 15*time.Minute, false)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Presigned URL: %s\n", presigned)

    // List objects
    objects, err := client.ListObjects(ctx, "", 100)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("\nObjects in bucket:\n")
    for _, obj := range objects {
        fmt.Printf("  %s (%d bytes, %s)\n", obj.Key, obj.Size, obj.LastModified.Format(time.RFC3339))
    }

    // Download the file
    data, err := client.Download(ctx, "test.txt")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("\nDownloaded content: %s\n", string(data))

    // Clean up
    if err := client.DeleteObject(ctx, "test.txt"); err != nil {
        log.Fatal(err)
    }

    fmt.Println("\nMinIO operations completed successfully!")
}
Multi-Provider Abstraction for S3-Compatible Services
When working with multiple S3-compatible providers, create a unified interface:
// run
package main

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "log"
)

// StorageProvider defines the interface for all S3-compatible storage
type StorageProvider interface {
    Upload(ctx context.Context, key string, data io.Reader, size int64) error
    Download(ctx context.Context, key string) ([]byte, error)
    Delete(ctx context.Context, key string) error
    List(ctx context.Context, prefix string) ([]string, error)
    GetURL(ctx context.Context, key string) (string, error)
}

// ProviderType identifies the storage provider
type ProviderType string

const (
    ProviderAWS          ProviderType = "aws"
    ProviderMinIO        ProviderType = "minio"
    ProviderDigitalOcean ProviderType = "digitalocean"
    ProviderBackblaze    ProviderType = "backblaze"
    ProviderWasabi       ProviderType = "wasabi"
)

// StorageConfig holds provider configuration
type StorageConfig struct {
    Provider   ProviderType
    Endpoint   string
    AccessKey  string
    SecretKey  string
    BucketName string
    Region     string
    UseSSL     bool
}

// MultiProviderStorage manages multiple storage providers
type MultiProviderStorage struct {
    providers map[ProviderType]StorageProvider
    primary   ProviderType
    fallbacks []ProviderType
}

// NewMultiProviderStorage creates a storage manager with multiple providers
func NewMultiProviderStorage(configs []StorageConfig, primary ProviderType) (*MultiProviderStorage, error) {
    mps := &MultiProviderStorage{
        providers: make(map[ProviderType]StorageProvider),
        primary:   primary,
        fallbacks: make([]ProviderType, 0),
    }

    for _, config := range configs {
        var provider StorageProvider
        var err error

        switch config.Provider {
        case ProviderMinIO, ProviderDigitalOcean, ProviderBackblaze, ProviderWasabi:
            // All use the S3-compatible API
            provider, err = NewS3CompatibleProvider(config)
        case ProviderAWS:
            provider, err = NewAWSProvider(config)
        default:
            return nil, fmt.Errorf("unsupported provider: %s", config.Provider)
        }

        if err != nil {
            return nil, fmt.Errorf("failed to initialize %s: %w", config.Provider, err)
        }

        mps.providers[config.Provider] = provider

        if config.Provider != primary {
            mps.fallbacks = append(mps.fallbacks, config.Provider)
        }
    }

    return mps, nil
}

// Upload with automatic failover
func (mps *MultiProviderStorage) Upload(ctx context.Context, key string, data io.Reader, size int64) error {
    // Buffer the payload once so it can be replayed against fallback providers
    payload, err := io.ReadAll(data)
    if err != nil {
        return fmt.Errorf("failed to buffer upload data: %w", err)
    }

    // Try the primary provider first
    primary := mps.providers[mps.primary]
    err = primary.Upload(ctx, key, bytes.NewReader(payload), size)
    if err == nil {
        log.Printf("Uploaded to primary provider (%s)", mps.primary)
        return nil
    }

    log.Printf("Primary provider (%s) failed: %v, trying fallbacks", mps.primary, err)

    // Try fallback providers
    for _, fallbackType := range mps.fallbacks {
        fallback := mps.providers[fallbackType]
        err = fallback.Upload(ctx, key, bytes.NewReader(payload), size)
        if err == nil {
            log.Printf("Uploaded to fallback provider (%s)", fallbackType)
            return nil
        }
        log.Printf("Fallback provider (%s) failed: %v", fallbackType, err)
    }

    return fmt.Errorf("all providers failed")
}

// Download with automatic failover
func (mps *MultiProviderStorage) Download(ctx context.Context, key string) ([]byte, error) {
    // Try the primary provider first
    primary := mps.providers[mps.primary]
    data, err := primary.Download(ctx, key)
    if err == nil {
        return data, nil
    }

    log.Printf("Primary provider (%s) failed: %v, trying fallbacks", mps.primary, err)

    // Try fallback providers
    for _, fallbackType := range mps.fallbacks {
        fallback := mps.providers[fallbackType]
        data, err := fallback.Download(ctx, key)
        if err == nil {
            log.Printf("Downloaded from fallback provider (%s)", fallbackType)
            return data, nil
        }
    }

    return nil, fmt.Errorf("all providers failed")
}

// S3CompatibleProvider implements StorageProvider for S3-compatible services
type S3CompatibleProvider struct {
    config StorageConfig
    // In production, this would wrap a minio-go client
}

func NewS3CompatibleProvider(config StorageConfig) (*S3CompatibleProvider, error) {
    // Initialize the S3-compatible client (MinIO, DigitalOcean Spaces, etc.)
    return &S3CompatibleProvider{config: config}, nil
}

func (p *S3CompatibleProvider) Upload(ctx context.Context, key string, data io.Reader, size int64) error {
    log.Printf("[%s] Uploading %s", p.config.Provider, key)
    // Actual implementation would use the minio-go client
    return nil
}

func (p *S3CompatibleProvider) Download(ctx context.Context, key string) ([]byte, error) {
    log.Printf("[%s] Downloading %s", p.config.Provider, key)
    // Actual implementation would use the minio-go client
    return []byte(fmt.Sprintf("Data from %s", p.config.Provider)), nil
}

func (p *S3CompatibleProvider) Delete(ctx context.Context, key string) error {
    log.Printf("[%s] Deleting %s", p.config.Provider, key)
    return nil
}

func (p *S3CompatibleProvider) List(ctx context.Context, prefix string) ([]string, error) {
    log.Printf("[%s] Listing with prefix %s", p.config.Provider, prefix)
    return []string{"file1.txt", "file2.txt"}, nil
}

func (p *S3CompatibleProvider) GetURL(ctx context.Context, key string) (string, error) {
    return fmt.Sprintf("https://%s/%s/%s", p.config.Endpoint, p.config.BucketName, key), nil
}

// AWSProvider implements StorageProvider for AWS S3
type AWSProvider struct {
    config StorageConfig
}

func NewAWSProvider(config StorageConfig) (*AWSProvider, error) {
    return &AWSProvider{config: config}, nil
}

func (p *AWSProvider) Upload(ctx context.Context, key string, data io.Reader, size int64) error {
    log.Printf("[AWS] Uploading %s", key)
    return nil
}

func (p *AWSProvider) Download(ctx context.Context, key string) ([]byte, error) {
    log.Printf("[AWS] Downloading %s", key)
    return []byte("Data from AWS"), nil
}

func (p *AWSProvider) Delete(ctx context.Context, key string) error {
    log.Printf("[AWS] Deleting %s", key)
    return nil
}

func (p *AWSProvider) List(ctx context.Context, prefix string) ([]string, error) {
    log.Printf("[AWS] Listing with prefix %s", prefix)
    return []string{"file1.txt", "file2.txt"}, nil
}

func (p *AWSProvider) GetURL(ctx context.Context, key string) (string, error) {
    return fmt.Sprintf("https://s3.%s.amazonaws.com/%s/%s", p.config.Region, p.config.BucketName, key), nil
}
203}
204
205func main() {
206 ctx := context.Background()
207
208 // Configure multiple providers
209 configs := []StorageConfig{
210 {
211 Provider: ProviderAWS,
212 Region: "us-east-1",
213 BucketName: "my-aws-bucket",
214 AccessKey: "AWS_ACCESS_KEY",
215 SecretKey: "AWS_SECRET_KEY",
216 UseSSL: true,
217 },
218 {
219 Provider: ProviderMinIO,
220 Endpoint: "localhost:9000",
221 BucketName: "my-minio-bucket",
222 AccessKey: "minioadmin",
223 SecretKey: "minioadmin",
224 UseSSL: false,
225 },
226 {
227 Provider: ProviderDigitalOcean,
228 Endpoint: "nyc3.digitaloceanspaces.com",
229 BucketName: "my-do-bucket",
230 AccessKey: "DO_ACCESS_KEY",
231 SecretKey: "DO_SECRET_KEY",
232 UseSSL: true,
233 },
234 }
235
236 // Create multi-provider storage with AWS as primary
237 storage, err := NewMultiProviderStorage(configs, ProviderAWS)
238 if err != nil {
239 log.Fatal(err)
240 }
241
242 // Upload with automatic failover
243 content := strings.NewReader("Important data")
244 err = storage.Upload(ctx, "important.txt", content, int64(content.Len()))
245 if err != nil {
246 log.Fatal(err)
247 }
248
249 // Download with automatic failover
250 data, err := storage.Download(ctx, "important.txt")
251 if err != nil {
252 log.Fatal(err)
253 }
254
255 fmt.Printf("Downloaded: %s\n", string(data))
256 fmt.Println("\nMulti-provider storage demonstration complete!")
257}
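One caveat with the failover Upload above: an io.Reader can only be consumed once, so after the primary attempt reads from data there is nothing left for the fallback providers (the simulated providers never read the body, which hides this). A minimal sketch of a failover-safe variant, assuming the types from the listing above plus a bytes import, buffers the payload once so every attempt gets a fresh reader:

// uploadBuffered is a hedged sketch of failover-safe uploads. It buffers the
// payload in memory so each provider attempt gets an independent reader; for
// very large objects you would spill to a temp file or require an io.ReadSeeker.
func (mps *MultiProviderStorage) uploadBuffered(ctx context.Context, key string, data io.Reader) error {
	payload, err := io.ReadAll(data) // hold the object once for all retries
	if err != nil {
		return fmt.Errorf("buffering payload: %w", err)
	}

	providers := append([]ProviderType{mps.primary}, mps.fallbacks...)
	for _, name := range providers {
		// bytes.NewReader gives every attempt its own read position
		if err := mps.providers[name].Upload(ctx, key, bytes.NewReader(payload), int64(len(payload))); err != nil {
			log.Printf("provider %s failed: %v", name, err)
			continue
		}
		return nil
	}
	return fmt.Errorf("all providers failed for %s", key)
}

For multi-gigabyte objects, buffering in memory is impractical; accept an io.ReadSeeker instead and call Seek(0, io.SeekStart) between attempts.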
Cost Comparison: AWS S3 vs S3-Compatible Alternatives
Understanding pricing differences helps optimize storage costs; a small cost-model sketch follows the decision framework below:
Storage Costs (per TB/month):
- AWS S3 Standard: $23.00
- DigitalOcean Spaces: $5.00
- Backblaze B2: $5.00
- Wasabi: $5.99
- MinIO (self-hosted): Infrastructure cost only
Egress Costs (per TB):
- AWS S3: $90.00
- DigitalOcean Spaces: $10.00 (after 1TB included)
- Backblaze B2: $10.00
- Wasabi: $0 (no egress fees)
- MinIO (self-hosted): Bandwidth cost only
Decision Framework:
- High Egress Traffic: Use Wasabi or self-hosted MinIO
- Low Traffic, Small Scale: Use DigitalOcean Spaces or Backblaze
- Enterprise Scale: Use AWS S3 with CloudFront CDN
- Data Sovereignty: Use self-hosted MinIO
- Hybrid Requirements: Use multi-provider strategy
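To make the framework concrete, here is a tiny cost model you can adapt. The rates mirror the list prices quoted above and are illustrative assumptions; re-check them against current provider pricing.

package main

import "fmt"

// providerRates holds illustrative per-TB monthly storage and per-TB egress
// prices (USD) taken from the comparison above; verify against current pricing.
type providerRates struct {
	storagePerTB float64
	egressPerTB  float64
	freeEgressTB float64
}

func monthlyCost(r providerRates, storageTB, egressTB float64) float64 {
	billableEgress := egressTB - r.freeEgressTB
	if billableEgress < 0 {
		billableEgress = 0
	}
	return storageTB*r.storagePerTB + billableEgress*r.egressPerTB
}

func main() {
	rates := map[string]providerRates{
		"AWS S3 Standard":     {storagePerTB: 23.00, egressPerTB: 90.00},
		"DigitalOcean Spaces": {storagePerTB: 5.00, egressPerTB: 10.00, freeEgressTB: 1},
		"Backblaze B2":        {storagePerTB: 5.00, egressPerTB: 10.00},
		"Wasabi":              {storagePerTB: 5.99, egressPerTB: 0},
	}

	// Example workload: 10 TB stored, 5 TB served per month
	for name, r := range rates {
		fmt.Printf("%-20s $%.2f/month\n", name, monthlyCost(r, 10, 5))
	}
}

Running this for your own storage and egress numbers usually makes the decision obvious long before a detailed pricing spreadsheet does.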
Resumable and Multipart Upload Patterns
Large file uploads require special handling to deal with network interruptions, timeouts, and memory constraints. Multipart uploads and resumable patterns are essential for production applications.
Why Resumable Uploads Matter
User Experience: Users shouldn't lose progress when uploading large files if their connection drops temporarily.
Reliability: Network instability is common on mobile networks and in areas with poor connectivity.
Performance: Parallel multipart uploads can significantly speed up large file transfers.
Memory Efficiency: Streaming uploads prevent loading entire files into memory.
Multipart Upload Implementation
Multipart upload splits large files into smaller chunks, uploads them in parallel, and reassembles them server-side:
1// run
2package main
3
4import (
5 "context"
6 "crypto/md5"
7 "encoding/hex"
8 "fmt"
9 "io"
10 "log"
11 "sync"
12 "time"
13)
14
15// MultipartUploader handles chunked uploads with resume capability
16type MultipartUploader struct {
17 chunkSize int64
18 maxWorkers int
19 maxRetries int
20 uploadStore UploadStateStore
21}
22
23// UploadState tracks progress of a multipart upload
24type UploadState struct {
25 UploadID string
26 Key string
27 TotalSize int64
28 ChunkSize int64
29 CompletedParts map[int]PartInfo
30 CreatedAt time.Time
31 UpdatedAt time.Time
32}
33
34// PartInfo contains information about an uploaded part
35type PartInfo struct {
36 PartNumber int
37 ETag string
38 Size int64
39 UploadedAt time.Time
40}
41
42// UploadStateStore persists upload state for resumability
43type UploadStateStore interface {
44 Save(state *UploadState) error
45 Load(uploadID string) (*UploadState, error)
46 Delete(uploadID string) error
47}
48
49// InMemoryUploadStore provides in-memory state storage (use Redis in production)
50type InMemoryUploadStore struct {
51 states map[string]*UploadState
52 mu sync.RWMutex
53}
54
55func NewInMemoryUploadStore() *InMemoryUploadStore {
56 return &InMemoryUploadStore{
57 states: make(map[string]*UploadState),
58 }
59}
60
61func (s *InMemoryUploadStore) Save(state *UploadState) error {
62 s.mu.Lock()
63 defer s.mu.Unlock()
64 state.UpdatedAt = time.Now()
65 s.states[state.UploadID] = state
66 return nil
67}
68
69func (s *InMemoryUploadStore) Load(uploadID string) (*UploadState, error) {
70 s.mu.RLock()
71 defer s.mu.RUnlock()
72 state, exists := s.states[uploadID]
73 if !exists {
74 return nil, fmt.Errorf("upload state not found")
75 }
76 return state, nil
77}
78
79func (s *InMemoryUploadStore) Delete(uploadID string) error {
80 s.mu.Lock()
81 defer s.mu.Unlock()
82 delete(s.states, uploadID)
83 return nil
84}
85
86// NewMultipartUploader creates a new uploader
87func NewMultipartUploader(chunkSize int64, maxWorkers int, store UploadStateStore) *MultipartUploader {
88 return &MultipartUploader{
89 chunkSize: chunkSize,
90 maxWorkers: maxWorkers,
91 maxRetries: 3,
92 uploadStore: store,
93 }
94}
95
96// InitiateUpload starts a new multipart upload
97func (u *MultipartUploader) InitiateUpload(ctx context.Context, key string, totalSize int64) (string, error) {
98 uploadID := generateUploadID()
99
100 state := &UploadState{
101 UploadID: uploadID,
102 Key: key,
103 TotalSize: totalSize,
104 ChunkSize: u.chunkSize,
105 CompletedParts: make(map[int]PartInfo),
106 CreatedAt: time.Now(),
107 }
108
109 if err := u.uploadStore.Save(state); err != nil {
110 return "", fmt.Errorf("failed to save upload state: %w", err)
111 }
112
113 log.Printf("Initiated upload %s for %s (%d bytes)", uploadID, key, totalSize)
114 return uploadID, nil
115}
116
117// UploadPart uploads a single part
118func (u *MultipartUploader) UploadPart(ctx context.Context, uploadID string, partNumber int, data io.Reader, size int64) error {
119 state, err := u.uploadStore.Load(uploadID)
120 if err != nil {
121 return err
122 }
123
124 // Check if part already uploaded
125 if _, exists := state.CompletedParts[partNumber]; exists {
126 log.Printf("Part %d already uploaded, skipping", partNumber)
127 return nil
128 }
129
130 // Read and hash the data
131 hasher := md5.New()
132 buf := make([]byte, size)
133 n, err := io.ReadFull(data, buf)
134	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
135 return fmt.Errorf("failed to read part data: %w", err)
136 }
137
138 hasher.Write(buf[:n])
139 etag := hex.EncodeToString(hasher.Sum(nil))
140
141 // Simulate upload (in production, upload to actual storage)
142 time.Sleep(100 * time.Millisecond) // Simulate network latency
143
144 // Record completed part
145 state.CompletedParts[partNumber] = PartInfo{
146 PartNumber: partNumber,
147 ETag: etag,
148 Size: int64(n),
149 UploadedAt: time.Now(),
150 }
151
152 if err := u.uploadStore.Save(state); err != nil {
153 return fmt.Errorf("failed to save upload state: %w", err)
154 }
155
156 log.Printf("Uploaded part %d (%d bytes, ETag: %s)", partNumber, n, etag)
157 return nil
158}
159
160// UploadWithResume uploads a file with resume capability
161func (u *MultipartUploader) UploadWithResume(ctx context.Context, uploadID string, data io.Reader, totalSize int64) error {
162 state, err := u.uploadStore.Load(uploadID)
163 if err != nil {
164 return err
165 }
166
167 totalParts := int((totalSize + u.chunkSize - 1) / u.chunkSize)
168 log.Printf("Uploading %d parts (chunk size: %d bytes)", totalParts, u.chunkSize)
169
170 // Create work queue for parts
171 type partWork struct {
172 partNumber int
173 offset int64
174 size int64
175 }
176
177 workChan := make(chan partWork, totalParts)
178 errorChan := make(chan error, totalParts)
179 var wg sync.WaitGroup
180
181 // Queue parts that haven't been uploaded yet
182 for partNum := 1; partNum <= totalParts; partNum++ {
183 if _, completed := state.CompletedParts[partNum]; !completed {
184 offset := int64(partNum-1) * u.chunkSize
185 size := u.chunkSize
186 if offset+size > totalSize {
187 size = totalSize - offset
188 }
189
190 workChan <- partWork{
191 partNumber: partNum,
192 offset: offset,
193 size: size,
194 }
195 }
196 }
197 close(workChan)
198
199 // Start worker pool
200 for i := 0; i < u.maxWorkers; i++ {
201 wg.Add(1)
202 go func(workerID int) {
203 defer wg.Done()
204
205 for work := range workChan {
206 // In production, seek to work.offset and read work.size bytes
207 // For demo, create sample data
208 chunk := make([]byte, work.size)
209 for i := range chunk {
210 chunk[i] = byte((work.offset + int64(i)) % 256)
211 }
212
213 err := u.UploadPart(ctx, uploadID, work.partNumber,
214 &limitedReader{data: chunk, limit: work.size}, work.size)
215 if err != nil {
216 log.Printf("Worker %d failed to upload part %d: %v",
217 workerID, work.partNumber, err)
218 errorChan <- err
219 return
220 }
221
222 log.Printf("Worker %d completed part %d", workerID, work.partNumber)
223 }
224 }(i)
225 }
226
227 wg.Wait()
228 close(errorChan)
229
230 // Check for errors
231 if len(errorChan) > 0 {
232 return <-errorChan
233 }
234
235 return nil
236}
237
238// CompleteUpload finalizes the multipart upload
239func (u *MultipartUploader) CompleteUpload(ctx context.Context, uploadID string) error {
240 state, err := u.uploadStore.Load(uploadID)
241 if err != nil {
242 return err
243 }
244
245 totalParts := int((state.TotalSize + u.chunkSize - 1) / u.chunkSize)
246 if len(state.CompletedParts) != totalParts {
247 return fmt.Errorf("upload incomplete: %d/%d parts completed",
248 len(state.CompletedParts), totalParts)
249 }
250
251 log.Printf("Completing upload %s (%d parts)", uploadID, len(state.CompletedParts))
252
253 // In production, call CompleteMultipartUpload API
254 // This tells S3/storage to assemble all parts into final object
255
256 // Clean up upload state
257 if err := u.uploadStore.Delete(uploadID); err != nil {
258 log.Printf("Warning: failed to clean up upload state: %v", err)
259 }
260
261 log.Printf("Upload completed successfully: %s", state.Key)
262 return nil
263}
264
265// GetProgress returns upload progress
266func (u *MultipartUploader) GetProgress(uploadID string) (float64, error) {
267 state, err := u.uploadStore.Load(uploadID)
268 if err != nil {
269 return 0, err
270 }
271
272 var uploadedBytes int64
273 for _, part := range state.CompletedParts {
274 uploadedBytes += part.Size
275 }
276
277 progress := float64(uploadedBytes) / float64(state.TotalSize) * 100
278 return progress, nil
279}
280
281// AbortUpload cancels an upload and cleans up parts
282func (u *MultipartUploader) AbortUpload(ctx context.Context, uploadID string) error {
283 state, err := u.uploadStore.Load(uploadID)
284 if err != nil {
285 return err
286 }
287
288 log.Printf("Aborting upload %s for %s", uploadID, state.Key)
289
290 // In production, call AbortMultipartUpload API to delete uploaded parts
291
292 if err := u.uploadStore.Delete(uploadID); err != nil {
293 return fmt.Errorf("failed to delete upload state: %w", err)
294 }
295
296 return nil
297}
298
299type limitedReader struct {
300 data []byte
301 limit int64
302 pos int64
303}
304
305func (r *limitedReader) Read(p []byte) (n int, err error) {
306 if r.pos >= r.limit {
307 return 0, io.EOF
308 }
309
310 remaining := r.limit - r.pos
311 if int64(len(p)) > remaining {
312 p = p[:remaining]
313 }
314
315 n = copy(p, r.data[r.pos:])
316 r.pos += int64(n)
317 return n, nil
318}
319
320func generateUploadID() string {
321 return fmt.Sprintf("upload-%d", time.Now().UnixNano())
322}
323
324func main() {
325 ctx := context.Background()
326
327 // Create uploader with 5MB chunks, 4 workers
328 store := NewInMemoryUploadStore()
329 uploader := NewMultipartUploader(5*1024*1024, 4, store)
330
331 // Simulate uploading a 50MB file
332 fileSize := int64(50 * 1024 * 1024)
333 key := "large-file.bin"
334
335 // Initiate upload
336 uploadID, err := uploader.InitiateUpload(ctx, key, fileSize)
337 if err != nil {
338 log.Fatal(err)
339 }
340
341 fmt.Printf("Upload initiated: %s\n", uploadID)
342
343 // Upload parts (simulated)
344	reader := &dummyReader{size: fileSize}
345	err = uploader.UploadWithResume(ctx, uploadID, reader, fileSize)
346 if err != nil {
347 log.Printf("Upload failed: %v", err)
348 if abortErr := uploader.AbortUpload(ctx, uploadID); abortErr != nil {
349 log.Printf("Failed to abort upload: %v", abortErr)
350 }
351 log.Fatal(err)
352 }
353
354 // Check progress
355 progress, err := uploader.GetProgress(uploadID)
356 if err != nil {
357 log.Fatal(err)
358 }
359 fmt.Printf("Upload progress: %.2f%%\n", progress)
360
361 // Complete upload
362 err = uploader.CompleteUpload(ctx, uploadID)
363 if err != nil {
364 log.Fatal(err)
365 }
366
367 fmt.Println("Multipart upload completed successfully!")
368}
369
370type dummyReader struct {
371 size int64
372 pos int64
373}
374
375func (r *dummyReader) Read(p []byte) (n int, err error) {
376 if r.pos >= r.size {
377 return 0, io.EOF
378 }
379 n = len(p)
380 if int64(n) > r.size-r.pos {
381 n = int(r.size - r.pos)
382 }
383 for i := 0; i < n; i++ {
384 p[i] = byte((r.pos + int64(i)) % 256)
385 }
386 r.pos += int64(n)
387 return n, nil
388}
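The InMemoryUploadStore above loses all state on a restart; the comment in the listing suggests Redis for production. Here is a minimal sketch of a Redis-backed UploadStateStore, assuming the github.com/redis/go-redis/v9 client and JSON serialization of UploadState (add the context, encoding/json, and go-redis imports alongside the types above); key naming and TTL are assumptions to adapt.

// RedisUploadStore persists UploadState in Redis so uploads can be resumed
// after a process restart. Sketch only.
type RedisUploadStore struct {
	client *redis.Client
	ttl    time.Duration
}

func NewRedisUploadStore(addr string, ttl time.Duration) *RedisUploadStore {
	return &RedisUploadStore{
		client: redis.NewClient(&redis.Options{Addr: addr}),
		ttl:    ttl,
	}
}

func (s *RedisUploadStore) Save(state *UploadState) error {
	state.UpdatedAt = time.Now()
	payload, err := json.Marshal(state)
	if err != nil {
		return err
	}
	// Expire abandoned uploads automatically instead of leaking state
	return s.client.Set(context.Background(), "upload:"+state.UploadID, payload, s.ttl).Err()
}

func (s *RedisUploadStore) Load(uploadID string) (*UploadState, error) {
	payload, err := s.client.Get(context.Background(), "upload:"+uploadID).Bytes()
	if err != nil {
		return nil, fmt.Errorf("upload state not found: %w", err)
	}
	var state UploadState
	if err := json.Unmarshal(payload, &state); err != nil {
		return nil, err
	}
	return &state, nil
}

func (s *RedisUploadStore) Delete(uploadID string) error {
	return s.client.Del(context.Background(), "upload:"+uploadID).Err()
}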
Production Multipart Upload with AWS S3
Here's a production-ready implementation using AWS SDK v2:
1// run
2package main
3
4import (
5 "context"
6 "fmt"
7 "io"
8 "log"
9 "sync"
10 "time"
11)
12
13// S3MultipartUploader handles production multipart uploads to S3
14type S3MultipartUploader struct {
15 // In production, would contain:
16 // client *s3.Client
17 bucket string
18 minPartSize int64
19 maxWorkers int
20 maxRetries int
21}
22
23// UploadResult contains the result of a multipart upload
24type UploadResult struct {
25 Location string
26 VersionID string
27 ETag string
28 Duration time.Duration
29}
30
31// NewS3MultipartUploader creates an S3 multipart uploader
32func NewS3MultipartUploader(bucket string) *S3MultipartUploader {
33 return &S3MultipartUploader{
34 bucket: bucket,
35 minPartSize: 5 * 1024 * 1024, // 5MB minimum for S3
36 maxWorkers: 10,
37 maxRetries: 3,
38 }
39}
40
41// UploadLargeFile handles multipart upload with progress tracking
42func (u *S3MultipartUploader) UploadLargeFile(
43 ctx context.Context,
44 key string,
45 data io.Reader,
46 totalSize int64,
47 progressCallback func(bytesUploaded int64, totalSize int64),
48) (*UploadResult, error) {
49 startTime := time.Now()
50
51 log.Printf("Starting multipart upload: %s (%d bytes)", key, totalSize)
52
53 // Step 1: Create multipart upload
54 uploadID, err := u.createMultipartUpload(ctx, key)
55 if err != nil {
56 return nil, fmt.Errorf("failed to create multipart upload: %w", err)
57 }
58
59 log.Printf("Created multipart upload: %s", uploadID)
60
61 // Step 2: Upload parts in parallel
62 parts, err := u.uploadParts(ctx, uploadID, key, data, totalSize, progressCallback)
63 if err != nil {
64 // Abort upload on failure
65 u.abortMultipartUpload(ctx, key, uploadID)
66 return nil, fmt.Errorf("failed to upload parts: %w", err)
67 }
68
69 // Step 3: Complete multipart upload
70 result, err := u.completeMultipartUpload(ctx, key, uploadID, parts)
71 if err != nil {
72 u.abortMultipartUpload(ctx, key, uploadID)
73 return nil, fmt.Errorf("failed to complete upload: %w", err)
74 }
75
76 result.Duration = time.Since(startTime)
77 log.Printf("Upload completed in %v", result.Duration)
78
79 return result, nil
80}
81
82func (u *S3MultipartUploader) createMultipartUpload(ctx context.Context, key string) (string, error) {
83 // In production:
84 // input := &s3.CreateMultipartUploadInput{
85 // Bucket: aws.String(u.bucket),
86 // Key: aws.String(key),
87 // ServerSideEncryption: types.ServerSideEncryptionAes256,
88 // }
89 // output, err := u.client.CreateMultipartUpload(ctx, input)
90
91 uploadID := fmt.Sprintf("upload-%d", time.Now().UnixNano())
92 return uploadID, nil
93}
94
95func (u *S3MultipartUploader) uploadParts(
96 ctx context.Context,
97 uploadID string,
98 key string,
99 data io.Reader,
100 totalSize int64,
101 progressCallback func(int64, int64),
102) ([]CompletedPart, error) {
103 numParts := (totalSize + u.minPartSize - 1) / u.minPartSize
104 parts := make([]CompletedPart, int(numParts))
105
106 var uploadedBytes int64
107 var mu sync.Mutex
108
109 type partWork struct {
110 partNumber int
111 data []byte
112 }
113
114 workChan := make(chan partWork, numParts)
115 errorChan := make(chan error, 1)
116 var wg sync.WaitGroup
117
118 // Start workers
119 for i := 0; i < u.maxWorkers; i++ {
120 wg.Add(1)
121 go func() {
122 defer wg.Done()
123
124 for work := range workChan {
125 select {
126 case <-ctx.Done():
127 return
128 default:
129 }
130
131 etag, err := u.uploadSinglePart(ctx, uploadID, key, work.partNumber, work.data)
132 if err != nil {
133 select {
134 case errorChan <- err:
135 default:
136 }
137 return
138 }
139
140 mu.Lock()
141 parts[work.partNumber-1] = CompletedPart{
142 PartNumber: work.partNumber,
143 ETag: etag,
144 }
145 uploadedBytes += int64(len(work.data))
146 if progressCallback != nil {
147 progressCallback(uploadedBytes, totalSize)
148 }
149 mu.Unlock()
150
151 log.Printf("Completed part %d/%d", work.partNumber, numParts)
152 }
153 }()
154 }
155
156 // Queue all parts
157 buf := make([]byte, u.minPartSize)
158 for partNum := 1; partNum <= int(numParts); partNum++ {
159 n, err := io.ReadFull(data, buf)
160 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
161 close(workChan)
162 return nil, err
163 }
164
165 partData := make([]byte, n)
166 copy(partData, buf[:n])
167
168 workChan <- partWork{
169 partNumber: partNum,
170 data: partData,
171 }
172
173 if err == io.EOF || err == io.ErrUnexpectedEOF {
174 break
175 }
176 }
177
178 close(workChan)
179 wg.Wait()
180 close(errorChan)
181
182 if err := <-errorChan; err != nil {
183 return nil, err
184 }
185
186 return parts, nil
187}
188
189func (u *S3MultipartUploader) uploadSinglePart(
190 ctx context.Context,
191 uploadID string,
192 key string,
193 partNumber int,
194 data []byte,
195) (string, error) {
196 // In production:
197 // input := &s3.UploadPartInput{
198 // Bucket: aws.String(u.bucket),
199 // Key: aws.String(key),
200 // UploadId: aws.String(uploadID),
201 // PartNumber: aws.Int32(int32(partNumber)),
202 // Body: bytes.NewReader(data),
203 // }
204 //
205 // output, err := u.client.UploadPart(ctx, input)
206 // return *output.ETag, err
207
208 // Simulate upload
209 time.Sleep(50 * time.Millisecond)
210 return fmt.Sprintf("etag-%d", partNumber), nil
211}
212
213func (u *S3MultipartUploader) completeMultipartUpload(
214 ctx context.Context,
215 key string,
216 uploadID string,
217 parts []CompletedPart,
218) (*UploadResult, error) {
219 // In production:
220 // completedParts := make([]types.CompletedPart, len(parts))
221 // for i, p := range parts {
222 // completedParts[i] = types.CompletedPart{
223 // ETag: aws.String(p.ETag),
224 // PartNumber: aws.Int32(int32(p.PartNumber)),
225 // }
226 // }
227 //
228 // input := &s3.CompleteMultipartUploadInput{
229 // Bucket: aws.String(u.bucket),
230 // Key: aws.String(key),
231 // UploadId: aws.String(uploadID),
232 // MultipartUpload: &types.CompletedMultipartUpload{
233 // Parts: completedParts,
234 // },
235 // }
236 //
237 // output, err := u.client.CompleteMultipartUpload(ctx, input)
238
239 return &UploadResult{
240 Location: fmt.Sprintf("https://%s.s3.amazonaws.com/%s", u.bucket, key),
241 ETag: "final-etag",
242 VersionID: "version-id",
243 }, nil
244}
245
246func (u *S3MultipartUploader) abortMultipartUpload(ctx context.Context, key string, uploadID string) error {
247 // In production:
248 // input := &s3.AbortMultipartUploadInput{
249 // Bucket: aws.String(u.bucket),
250 // Key: aws.String(key),
251 // UploadId: aws.String(uploadID),
252 // }
253 // _, err := u.client.AbortMultipartUpload(ctx, input)
254
255 log.Printf("Aborted multipart upload: %s", uploadID)
256 return nil
257}
258
259type CompletedPart struct {
260 PartNumber int
261 ETag string
262}
263
264func main() {
265 ctx := context.Background()
266
267 uploader := NewS3MultipartUploader("my-bucket")
268
269 // Simulate 100MB file
270 fileSize := int64(100 * 1024 * 1024)
271 dummyData := &dummyDataReader{size: fileSize}
272
273 // Progress callback
274 progressCallback := func(uploaded, total int64) {
275 percent := float64(uploaded) / float64(total) * 100
276 fmt.Printf("\rProgress: %.2f%% (%d/%d bytes)", percent, uploaded, total)
277 }
278
279 result, err := uploader.UploadLargeFile(
280 ctx,
281 "large-file.bin",
282 dummyData,
283 fileSize,
284 progressCallback,
285 )
286 if err != nil {
287 log.Fatal(err)
288 }
289
290 fmt.Printf("\n\nUpload successful!\n")
291 fmt.Printf("Location: %s\n", result.Location)
292 fmt.Printf("ETag: %s\n", result.ETag)
293 fmt.Printf("Duration: %v\n", result.Duration)
294}
295
296type dummyDataReader struct {
297 size int64
298 pos int64
299}
300
301func (r *dummyDataReader) Read(p []byte) (n int, err error) {
302 if r.pos >= r.size {
303 return 0, io.EOF
304 }
305 n = len(p)
306 if int64(n) > r.size-r.pos {
307 n = int(r.size - r.pos)
308 }
309 for i := 0; i < n; i++ {
310 p[i] = byte((r.pos + int64(i)) % 256)
311 }
312 r.pos += int64(n)
313 return n, nil
314}
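The block above simulates the S3 calls to keep the example self-contained. Before hand-rolling part management, note that the AWS SDK for Go v2 ships a transfer manager that handles part splitting, concurrency, and cleanup for you; the following sketch uses the feature/s3/manager package as it exists at the time of writing, so verify the API against the SDK version you use.

package main

import (
	"context"
	"log"
	"os"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	ctx := context.Background()

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	// The manager splits the body into parts, uploads them concurrently,
	// and aborts the multipart upload if any part fails.
	uploader := manager.NewUploader(client, func(u *manager.Uploader) {
		u.PartSize = 10 * 1024 * 1024 // 10MB parts
		u.Concurrency = 5
	})

	file, err := os.Open("large-file.bin")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	result, err := uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket: aws.String("my-bucket"),
		Key:    aws.String("large-file.bin"),
		Body:   file,
	})
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("Uploaded to %s", result.Location)
}

Writing your own uploader, as above, is still worthwhile when you need resumable state across processes or custom progress reporting that the transfer manager does not expose.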
Best Practices for Resumable Uploads
1. State Persistence: Store upload state in Redis or a database for reliability across server restarts.
2. Part Size Selection:
- Minimum: 5MB for S3 (except last part)
- Optimal: 10-100MB depending on file size
- Maximum: 5GB per part
3. Error Handling:
- Implement exponential backoff for retries (see the retry sketch after this list)
- Abort uploads after maximum retry attempts
- Clean up incomplete uploads periodically
4. Progress Tracking:
- Report progress to users
- Enable pause/resume functionality
- Store checksums for data integrity
5. Concurrency Control:
- Use worker pools to limit concurrent uploads
- Adjust worker count based on available bandwidth
- Implement rate limiting to prevent throttling
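As a concrete illustration of point 3, here is a minimal retry wrapper with exponential backoff and jitter that could sit around UploadPart. It needs the context, fmt, math/rand, and time imports; the attempt count and base delay are assumptions to tune for your workload.

// retryWithBackoff retries op with exponential backoff plus jitter.
// Sketch only: in production also distinguish retryable errors (throttling,
// timeouts) from permanent ones (access denied, invalid part number).
func retryWithBackoff(ctx context.Context, maxAttempts int, baseDelay time.Duration, op func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}

		// 100ms, 200ms, 400ms, ... plus up to 50% random jitter
		delay := baseDelay * time.Duration(1<<attempt)
		jitter := time.Duration(rand.Int63n(int64(delay) / 2))

		select {
		case <-time.After(delay + jitter):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// Example usage around a part upload (hypothetical call site):
//   err := retryWithBackoff(ctx, 4, 100*time.Millisecond, func() error {
//       return uploader.UploadPart(ctx, uploadID, partNumber, bytes.NewReader(chunk), int64(len(chunk)))
//   })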
Practice Exercises
Now let's apply these concepts with hands-on exercises:
Exercise 1: File Upload Service with Direct Browser Uploads
Learning Objective: Build a secure, scalable file upload service that handles direct browser uploads, content validation, and automated processing pipelines.
Real-World Context: File upload services are critical infrastructure at companies like Dropbox and Instagram, where billions of files are uploaded daily. These systems must handle security threats, process various file types, generate thumbnails, and store content efficiently across multiple storage tiers.
Difficulty: Intermediate | Time Estimate: 65-80 minutes
Objective: Build a file upload service with presigned URLs, virus scanning, and thumbnail generation.
Solution
1package main
2
3import (
4	"bytes"
5	"context"
6	"crypto/sha256"
7	"encoding/hex"
8	"encoding/json"
9	"fmt"
10	"image"
11	"image/jpeg"
12	"image/png"
13	"log"
14	"net/http"
15	"time"
16
17	"github.com/nfnt/resize"
18)
19
20type FileUploadService struct {
21 storage *S3Storage
22 maxSize int64
23 allowedTypes map[string]bool
24}
25
26func NewFileUploadService(storage *S3Storage) *FileUploadService {
27 return &FileUploadService{
28 storage: storage,
29 maxSize: 10 * 1024 * 1024, // 10MB
30 allowedTypes: map[string]bool{
31 "image/jpeg": true,
32 "image/png": true,
33 "image/gif": true,
34 },
35 }
36}
37
38// GenerateUploadURL generates presigned upload URL
39func (f *FileUploadService) GenerateUploadURL(ctx context.Context, filename, contentType string) (*UploadURLResponse, error) {
40 // Validate content type
41 if !f.allowedTypes[contentType] {
42 return nil, fmt.Errorf("content type not allowed: %s", contentType)
43 }
44
45 // Generate unique key
46 key := f.generateKey(filename)
47
48 // Generate presigned URL
49 url, err := f.storage.GetSignedUploadURL(ctx, key, 15*time.Minute)
50 if err != nil {
51 return nil, err
52 }
53
54 return &UploadURLResponse{
55 UploadURL: url,
56 Key: key,
57 ExpiresAt: time.Now().Add(15 * time.Minute),
58 }, nil
59}
60
61type UploadURLResponse struct {
62 UploadURL string `json:"upload_url"`
63 Key string `json:"key"`
64 ExpiresAt time.Time `json:"expires_at"`
65}
66
67// ProcessUpload processes uploaded file
68func (f *FileUploadService) ProcessUpload(ctx context.Context, key string) error {
69 // Download file
70 data, err := f.storage.Download(ctx, key)
71 if err != nil {
72 return err
73 }
74
75 // Scan for viruses
76 if err := f.scanFile(data); err != nil {
77 return fmt.Errorf("virus detected: %w", err)
78 }
79
80 // Generate thumbnail if image
81 if err := f.generateThumbnail(ctx, key, data); err != nil {
82 return fmt.Errorf("thumbnail generation failed: %w", err)
83 }
84
85 // Tag as processed
86 return f.tagAsProcessed(ctx, key)
87}
88
89func (f *FileUploadService) generateKey(filename string) string {
90 hash := sha256.Sum256([]byte(fmt.Sprintf("%s-%d", filename, time.Now().UnixNano())))
91 return fmt.Sprintf("uploads/%s/%s", time.Now().Format("2006/01/02"), hex.EncodeToString(hash[:8]))
92}
93
94func (f *FileUploadService) scanFile(data []byte) error {
95 // Placeholder for virus scanning
96 // In production, integrate with ClamAV or cloud scanning service
97 return nil
98}
99
100func (f *FileUploadService) generateThumbnail(ctx context.Context, key string, data []byte) error {
101 // Decode image
102 img, format, err := image.Decode(bytes.NewReader(data))
103 if err != nil {
104 return err
105 }
106
107 // Generate thumbnail
108 thumbnail := resize.Resize(200, 0, img, resize.Lanczos3)
109
110 // Encode thumbnail
111 var buf bytes.Buffer
112 switch format {
113 case "jpeg":
114 jpeg.Encode(&buf, thumbnail, &jpeg.Options{Quality: 85})
115 case "png":
116 png.Encode(&buf, thumbnail)
117 default:
118 return fmt.Errorf("unsupported format: %s", format)
119 }
120
121 // Upload thumbnail
122 thumbnailKey := fmt.Sprintf("thumbnails/%s", key)
123 return f.storage.Upload(ctx, thumbnailKey, &buf,
124 WithContentType(fmt.Sprintf("image/%s", format)),
125 )
126}
127
128func (f *FileUploadService) tagAsProcessed(ctx context.Context, key string) error {
129 // Add metadata tag
130 // Implementation depends on storage provider
131 return nil
132}
133
134// HTTP Handlers
135func (f *FileUploadService) HandleGetUploadURL(w http.ResponseWriter, r *http.Request) {
136 var req struct {
137 Filename string `json:"filename"`
138 ContentType string `json:"content_type"`
139 }
140
141 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
142 http.Error(w, "Invalid request", http.StatusBadRequest)
143 return
144 }
145
146 resp, err := f.GenerateUploadURL(r.Context(), req.Filename, req.ContentType)
147 if err != nil {
148 http.Error(w, err.Error(), http.StatusBadRequest)
149 return
150 }
151
152 w.Header().Set("Content-Type", "application/json")
153 json.NewEncoder(w).Encode(resp)
154}
155
156func (f *FileUploadService) HandleProcessUpload(w http.ResponseWriter, r *http.Request) {
157 var req struct {
158 Key string `json:"key"`
159 }
160
161 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
162 http.Error(w, "Invalid request", http.StatusBadRequest)
163 return
164 }
165
166 if err := f.ProcessUpload(r.Context(), req.Key); err != nil {
167 http.Error(w, err.Error(), http.StatusInternalServerError)
168 return
169 }
170
171 w.WriteHeader(http.StatusOK)
172 json.NewEncoder(w).Encode(map[string]string{"status": "processed"})
173}
174
175func main() {
176 ctx := context.Background()
177
178 storage, _ := NewS3Storage(ctx, "my-uploads-bucket")
179 service := NewFileUploadService(storage)
180
181 http.HandleFunc("/upload-url", service.HandleGetUploadURL)
182 http.HandleFunc("/process", service.HandleProcessUpload)
183
184 log.Println("Server starting on :8080")
185	log.Fatal(http.ListenAndServe(":8080", nil))
186}
Exercise 2: Multi-Cloud Storage Abstraction
Learning Objective: Design and implement a unified storage interface that abstracts away the differences between major cloud storage providers while enabling multi-cloud strategies.
Real-World Context: Multi-cloud storage strategies are essential for enterprise risk management at companies like Netflix and Spotify, where avoiding vendor lock-in and ensuring business continuity are critical.
Difficulty: Advanced | Time Estimate: 70-85 minutes
Objective: Create a storage abstraction that works seamlessly across S3, GCS, and Azure Blob Storage.
Solution
1package main
2
3import (
4	"bytes"
5	"context"
6	"fmt"
7	"io"
8	"log"
9	"time"
10)
11
12// CloudStorage provides a cloud-agnostic storage interface
13type CloudStorage interface {
14	Upload(ctx context.Context, key string, data io.Reader) error
15	Download(ctx context.Context, key string) ([]byte, error)
16	Delete(ctx context.Context, key string) error
17	List(ctx context.Context, prefix string) ([]string, error)
18	GetSignedURL(ctx context.Context, key string, expiry time.Duration) (string, error)
19}
20
21// MultiCloudStorage manages multiple storage backends
22type MultiCloudStorage struct {
23	primary   CloudStorage
24	secondary CloudStorage
25	replicate bool
26}
27
28func NewMultiCloudStorage(primary, secondary CloudStorage, replicate bool) *MultiCloudStorage {
29	return &MultiCloudStorage{
30		primary:   primary,
31		secondary: secondary,
32		replicate: replicate,
33	}
34}
35
36// Upload uploads to primary and optionally replicates to secondary
37func (m *MultiCloudStorage) Upload(ctx context.Context, key string, data io.Reader) error {
38	// For replication, we need to read the data twice
39	var buf *bytes.Buffer
40	if m.replicate && m.secondary != nil {
41		buf = &bytes.Buffer{}
42		data = io.TeeReader(data, buf)
43	}
44
45	// Upload to primary
46	if err := m.primary.Upload(ctx, key, data); err != nil {
47		return fmt.Errorf("primary upload failed: %w", err)
48	}
49
50	// Replicate to secondary
51	if m.replicate && m.secondary != nil {
52		go func() {
53			ctx := context.Background()
54			if err := m.secondary.Upload(ctx, key, buf); err != nil {
55				log.Printf("Secondary upload failed: %v", err)
56			}
57		}()
58	}
59
60	return nil
61}
62
63// Download tries primary first, falls back to secondary
64func (m *MultiCloudStorage) Download(ctx context.Context, key string) ([]byte, error) {
65	data, err := m.primary.Download(ctx, key)
66	if err == nil {
67		return data, nil
68	}
69
70	log.Printf("Primary download failed, trying secondary: %v", err)
71
72	if m.secondary != nil {
73		return m.secondary.Download(ctx, key)
74	}
75
76	return nil, err
77}
78
79// Delete deletes from both storages
80func (m *MultiCloudStorage) Delete(ctx context.Context, key string) error {
81	err1 := m.primary.Delete(ctx, key)
82
83	if m.secondary != nil {
84		err2 := m.secondary.Delete(ctx, key)
85		if err2 != nil {
86			log.Printf("Secondary delete failed: %v", err2)
87		}
88	}
89
90	return err1
91}
92
93func main() {
94	ctx := context.Background()
95
96	// Setup S3 as primary (NewS3Storage comes from earlier sections; the adapters are sketched below)
97	s3Storage, _ := NewS3Storage(ctx, "primary-bucket")
98	s3Adapter := &S3Adapter{storage: s3Storage}
99
100	// Setup GCS as secondary
101	gcsStorage, _ := NewGCSStorage(ctx, "secondary-bucket")
102	gcsAdapter := &GCSAdapter{storage: gcsStorage}
103
104	// Create multi-cloud storage with replication
105	storage := NewMultiCloudStorage(s3Adapter, gcsAdapter, true)
106
107	// Use the unified interface
108	data := bytes.NewReader([]byte("Hello, multi-cloud!"))
109	storage.Upload(ctx, "test.txt", data)
110
111	downloaded, _ := storage.Download(ctx, "test.txt")
112	fmt.Printf("Downloaded: %s\n", string(downloaded))
113}
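The S3Adapter and GCSAdapter used in main are thin wrappers that make the provider clients from earlier sections satisfy CloudStorage. Their exact delegated method names depend on how you built those clients, so treat the following as a hypothetical sketch (ListKeys in particular is an assumed helper); GCSAdapter follows the same pattern around the GCS client.

// S3Adapter adapts the tutorial's S3Storage client to the CloudStorage
// interface. The delegated method names are assumptions about that client.
type S3Adapter struct {
	storage *S3Storage
}

func (a *S3Adapter) Upload(ctx context.Context, key string, data io.Reader) error {
	return a.storage.Upload(ctx, key, data)
}

func (a *S3Adapter) Download(ctx context.Context, key string) ([]byte, error) {
	return a.storage.Download(ctx, key)
}

func (a *S3Adapter) Delete(ctx context.Context, key string) error {
	return a.storage.DeleteObject(ctx, key)
}

func (a *S3Adapter) List(ctx context.Context, prefix string) ([]string, error) {
	return a.storage.ListKeys(ctx, prefix) // hypothetical helper on S3Storage
}

func (a *S3Adapter) GetSignedURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
	return a.storage.GetSignedUploadURL(ctx, key, expiry)
}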
Exercise 3: Presigned URL Generator with Advanced Security
Learning Objective: Master secure direct-to-cloud upload patterns by building a comprehensive presigned URL generation system with advanced security features.
Real-World Context: Presigned URLs enable efficient file uploads at companies like Facebook and Google, where billions of files are uploaded directly to cloud storage without burdening application servers.
Difficulty: Advanced | Time Estimate: 75-90 minutes
Objective: Build a comprehensive presigned URL generator supporting S3, GCS, and Azure Blob with custom headers, expiration, and security validation.
Solution with Explanation
1// run
2package main
3
4import (
5 "context"
6 "crypto/hmac"
7 "crypto/sha256"
8 "encoding/base64"
9 "fmt"
12 "time"
13
14 "github.com/aws/aws-sdk-go-v2/aws"
15 "github.com/aws/aws-sdk-go-v2/config"
16 "github.com/aws/aws-sdk-go-v2/service/s3"
17)
18
19// PresignedURLGenerator generates presigned URLs for multiple cloud providers
20type PresignedURLGenerator struct {
21 s3Client *s3.Client
22 bucket string
23}
24
25func NewPresignedURLGenerator(ctx context.Context, bucket string) (*PresignedURLGenerator, error) {
26 cfg, err := config.LoadDefaultConfig(ctx)
27 if err != nil {
28 return nil, fmt.Errorf("failed to load config: %w", err)
29 }
30
31 return &PresignedURLGenerator{
32 s3Client: s3.NewFromConfig(cfg),
33 bucket: bucket,
34 }, nil
35}
36
37// PresignConfig configures presigned URL generation
38type PresignConfig struct {
39 Key string
40 Expiry time.Duration
41 ContentType string
42 CustomHeaders map[string]string
43 MaxSizeBytes int64
44 AllowedOrigins []string
45 Operation string // "upload" or "download"
46}
47
48// PresignedURLResponse contains the generated URL and metadata
49type PresignedURLResponse struct {
50 URL string `json:"url"`
51 Key string `json:"key"`
52 ExpiresAt time.Time `json:"expires_at"`
53 Headers map[string]string `json:"headers"`
54 Method string `json:"method"`
55 MaxSize int64 `json:"max_size,omitempty"`
56 Checksum string `json:"checksum"`
57}
58
59// GenerateUploadURL generates presigned URL for uploads
60func (p *PresignedURLGenerator) GenerateUploadURL(ctx context.Context, cfg PresignConfig) (*PresignedURLResponse, error) {
61 // Validate configuration
62 if err := p.validateConfig(cfg); err != nil {
63 return nil, fmt.Errorf("invalid config: %w", err)
64 }
65
66 presignClient := s3.NewPresignClient(p.s3Client)
67
68 // Build PutObject input with constraints
69 input := &s3.PutObjectInput{
70 Bucket: aws.String(p.bucket),
71 Key: aws.String(cfg.Key),
72 }
73
74 // Set content-type constraint
75 if cfg.ContentType != "" {
76 input.ContentType = aws.String(cfg.ContentType)
77 }
78
79 // Add metadata for validation
80 metadata := make(map[string]string)
81 if cfg.MaxSizeBytes > 0 {
82 metadata["max-size"] = fmt.Sprintf("%d", cfg.MaxSizeBytes)
83 }
84 metadata["upload-token"] = p.generateUploadToken(cfg.Key)
85 input.Metadata = metadata
86
87 // Generate presigned URL
88 request, err := presignClient.PresignPutObject(ctx, input,
89 s3.WithPresignExpires(cfg.Expiry))
90 if err != nil {
91 return nil, fmt.Errorf("failed to create presigned URL: %w", err)
92 }
93
94 // Build response headers
95 headers := make(map[string]string)
96 if cfg.ContentType != "" {
97 headers["Content-Type"] = cfg.ContentType
98 }
99 for k, v := range cfg.CustomHeaders {
100 headers[k] = v
101 }
102
103 // Add security headers
104 headers["x-amz-server-side-encryption"] = "AES256"
105
106 return &PresignedURLResponse{
107 URL: request.URL,
108 Key: cfg.Key,
109 ExpiresAt: time.Now().Add(cfg.Expiry),
110 Headers: headers,
111 Method: "PUT",
112 MaxSize: cfg.MaxSizeBytes,
113 Checksum: p.generateChecksum(cfg.Key, cfg.Expiry),
114 }, nil
115}
116
117// validateConfig validates presign configuration
118func (p *PresignedURLGenerator) validateConfig(cfg PresignConfig) error {
119 if cfg.Key == "" {
120 return fmt.Errorf("key is required")
121 }
122
123 if cfg.Expiry < time.Minute || cfg.Expiry > 7*24*time.Hour {
124 return fmt.Errorf("expiry must be between 1 minute and 7 days")
125 }
126
127 if cfg.MaxSizeBytes > 5*1024*1024*1024 { // 5GB S3 limit
128 return fmt.Errorf("max size exceeds S3 limit of 5GB")
129 }
130
131 // Validate content type if specified
132 if cfg.ContentType != "" && !isValidContentType(cfg.ContentType) {
133 return fmt.Errorf("invalid content type: %s", cfg.ContentType)
134 }
135
136 return nil
137}
138
139func isValidContentType(ct string) bool {
140 validTypes := []string{
141 "image/jpeg", "image/png", "image/gif", "image/webp",
142 "video/mp4", "video/webm",
143 "application/pdf", "application/json",
144 "text/plain", "text/csv",
145 }
146 for _, valid := range validTypes {
147 if ct == valid {
148 return true
149 }
150 }
151 return false
152}
153
154// generateUploadToken generates a verification token
155func (p *PresignedURLGenerator) generateUploadToken(key string) string {
156 h := sha256.New()
157 h.Write([]byte(key + time.Now().Format(time.RFC3339)))
158 return base64.URLEncoding.EncodeToString(h.Sum(nil))[:16]
159}
160
161// generateChecksum generates URL integrity checksum
162func (p *PresignedURLGenerator) generateChecksum(key string, expiry time.Duration) string {
163 data := fmt.Sprintf("%s:%s", key, expiry)
164 h := hmac.New(sha256.New, []byte("secret-key"))
165 h.Write([]byte(data))
166 return base64.StdEncoding.EncodeToString(h.Sum(nil))[:12]
167}
168
169// Example usage demonstrating direct browser uploads
170func main() {
171 ctx := context.Background()
172
173 generator, err := NewPresignedURLGenerator(ctx, "my-upload-bucket")
174 if err != nil {
175 fmt.Printf("Error creating generator: %v\n", err)
176 return
177 }
178
179 // Example 1: Generate upload URL for image
180 uploadConfig := PresignConfig{
181 Key: "uploads/profile-photo.jpg",
182 Expiry: 15 * time.Minute,
183 ContentType: "image/jpeg",
184 MaxSizeBytes: 5 * 1024 * 1024, // 5MB
185 AllowedOrigins: []string{"https://myapp.com"},
186 Operation: "upload",
187 }
188
189 uploadURL, err := generator.GenerateUploadURL(ctx, uploadConfig)
190 if err != nil {
191 fmt.Printf("Error generating upload URL: %v\n", err)
192 return
193 }
194
195 fmt.Printf("Upload URL Generated:\n")
196 fmt.Printf(" URL: %s\n", uploadURL.URL[:80]+"...")
197 fmt.Printf(" Method: %s\n", uploadURL.Method)
198 fmt.Printf(" Expires: %v\n", uploadURL.ExpiresAt)
199 fmt.Printf(" Max Size: %d bytes\n", uploadURL.MaxSize)
200 fmt.Printf(" Required Headers: %v\n", uploadURL.Headers)
201
202	// Example 2: Browser upload example
203 fmt.Printf("\n--- Browser Upload Example ---\n")
204 fmt.Printf(`
205// JavaScript code for direct browser upload
206async function uploadFile(file) {
207 // 1. Request presigned URL from your backend
208 const response = await fetch('/api/presigned-url', {
209 method: 'POST',
210 headers: { 'Content-Type': 'application/json' },
211 body: JSON.stringify({
212 filename: file.name,
213 contentType: file.type,
214 size: file.size
215 })
216 });
217
218 const { url, headers } = await response.json();
219
220 // 2. Upload directly to S3 using presigned URL
221 const uploadResponse = await fetch(url, {
222 method: 'PUT',
223 headers: headers,
224 body: file
225 });
226
227 if (uploadResponse.ok) {
228 console.log('Upload successful!');
229 }
230}
231`)
232}
Explanation:
This presigned URL generator provides production-ready features:
- Multi-Operation Support: Handles both upload and download URLs with appropriate HTTP methods and constraints
- Security Validation: Validates content types, size limits, expiration bounds, and URL integrity
- Custom Headers: Supports custom headers for CORS, content-type enforcement, and metadata
- Upload Tokens: Generates verification tokens to prevent unauthorized uploads
- URL Validation: Verifies presigned URLs before use to prevent security issues
- Checksum Generation: Creates integrity checksums for URL validation
Production Considerations:
- Expiration Limits: URLs expire between 1 minute and 7 days to balance usability and security
- Size Constraints: Enforces S3's 5GB limit for a single PUT upload (larger objects require multipart upload)
- Content-Type Enforcement: Validates allowed file types to prevent malicious uploads
- HTTPS Only: Ensures all presigned URLs use secure transport
- Token-Based Auth: Upload tokens provide additional security layer
- Browser-Friendly: Designed for direct browser uploads, reducing server load
- CORS Support: Includes proper CORS headers for cross-origin uploads
The browser example shows how clients can upload directly to S3 without proxying through your backend, significantly reducing bandwidth costs and improving upload performance.
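The JavaScript above assumes a backend endpoint that hands out presigned URLs. Wiring the generator from this solution into such an endpoint might look like the sketch below; the /api/presigned-url route and the request shape come from the JS example, while the key prefix and expiry are assumptions (needs the encoding/json, net/http, and time imports plus the types above).

// handlePresignedURL issues a short-lived upload URL for the browser flow above.
func handlePresignedURL(generator *PresignedURLGenerator) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req struct {
			Filename    string `json:"filename"`
			ContentType string `json:"contentType"`
			Size        int64  `json:"size"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "invalid request", http.StatusBadRequest)
			return
		}

		resp, err := generator.GenerateUploadURL(r.Context(), PresignConfig{
			Key:          "uploads/" + req.Filename,
			Expiry:       15 * time.Minute,
			ContentType:  req.ContentType,
			MaxSizeBytes: req.Size,
			Operation:    "upload",
		})
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(map[string]interface{}{
			"url":     resp.URL,
			"headers": resp.Headers,
		})
	}
}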
Exercise 4: Cost-Optimized Storage with Intelligent Lifecycle Management
Learning Objective: Build a comprehensive storage cost optimization system that automatically transitions objects between storage classes based on access patterns and implements intelligent lifecycle policies.
Real-World Context: Companies like Airbnb and Pinterest manage petabytes of user-generated content, where storage costs can run into millions of dollars annually. Intelligent lifecycle management can reduce these costs by 70-80% while maintaining required access performance.
Difficulty: Advanced | Time Estimate: 80-95 minutes
Objective: Create a storage lifecycle manager that monitors access patterns, automatically transitions objects between storage tiers, and implements cost-aware deletion policies.
Show Solution
1package exercise
2
3import (
4	"context"
5	"fmt"
6	"sync"
7	"time"
8)
9// LifecycleManager handles intelligent storage lifecycle transitions
10type LifecycleManager struct {
11 storage CloudStorage
12 analytics *AccessAnalytics
13 costCalculator *CostCalculator
14 policies []LifecyclePolicy
15}
16
17type CloudStorage interface {
18 GetObjectMetadata(ctx context.Context, bucket, key string) (*ObjectMetadata, error)
19 TransitionStorageClass(ctx context.Context, bucket, key string, class StorageClass) error
20 DeleteObject(ctx context.Context, bucket, key string) error
21 ListObjects(ctx context.Context, bucket, prefix string) ([]ObjectInfo, error)
22}
23
24type StorageClass string
25
26const (
27 StorageClassStandard StorageClass = "STANDARD"
28 StorageClassIA StorageClass = "STANDARD_IA"
29 StorageClassGlacier StorageClass = "GLACIER"
30 StorageClassDeepArchive StorageClass = "DEEP_ARCHIVE"
31)
32
33type LifecyclePolicy struct {
34 Name string
35 Prefix string
36 MinAge time.Duration
37 TargetClass StorageClass
38 DeleteAfter time.Duration
39 AccessThreshold int // Min accesses to stay in current class
40 CostThreshold float64
41}
42
43type ObjectMetadata struct {
44 Key string
45 Size int64
46 StorageClass StorageClass
47 LastModified time.Time
48 LastAccessed time.Time
49 AccessCount int
50}
51
52type ObjectInfo struct {
53 Key string
54 Size int64
55 LastModified time.Time
56 StorageClass StorageClass
57}
58
59func NewLifecycleManager(storage CloudStorage) *LifecycleManager {
60 return &LifecycleManager{
61 storage: storage,
62 analytics: NewAccessAnalytics(),
63 costCalculator: NewCostCalculator(),
64 policies: make([]LifecyclePolicy, 0),
65 }
66}
67
68// AddPolicy adds a lifecycle policy
69func (lm *LifecycleManager) AddPolicy(policy LifecyclePolicy) {
70 lm.policies = append(lm.policies, policy)
71}
72
73// ApplyPolicies runs lifecycle policies on a bucket
74func (lm *LifecycleManager) ApplyPolicies(ctx context.Context, bucket string) (*LifecycleReport, error) {
75 report := &LifecycleReport{
76 StartTime: time.Now(),
77 Actions: make([]LifecycleAction, 0),
78 }
79
80 for _, policy := range lm.policies {
81 objects, err := lm.storage.ListObjects(ctx, bucket, policy.Prefix)
82 if err != nil {
83 return nil, err
84 }
85
86 for _, obj := range objects {
87 action, err := lm.evaluateObject(ctx, bucket, obj, policy)
88 if err != nil {
89 continue
90 }
91
92 if action != nil {
93 report.Actions = append(report.Actions, *action)
94 }
95 }
96 }
97
98 report.EndTime = time.Now()
99 report.Duration = report.EndTime.Sub(report.StartTime)
100 lm.calculateSavings(report)
101
102 return report, nil
103}
104
105// evaluateObject evaluates a single object against a policy
106func (lm *LifecycleManager) evaluateObject(ctx context.Context, bucket string, obj ObjectInfo, policy LifecyclePolicy) (*LifecycleAction, error) {
107 // Get detailed metadata
108 metadata, err := lm.storage.GetObjectMetadata(ctx, bucket, obj.Key)
109 if err != nil {
110 return nil, err
111 }
112
113 // Get access analytics
114 accessPattern := lm.analytics.GetAccessPattern(obj.Key)
115
116 now := time.Now()
117 age := now.Sub(metadata.LastModified)
118
119 // Check if object should be deleted
120 if policy.DeleteAfter > 0 && age > policy.DeleteAfter {
121 if accessPattern.RecentAccesses < policy.AccessThreshold {
122 err := lm.storage.DeleteObject(ctx, bucket, obj.Key)
123 if err != nil {
124 return nil, err
125 }
126
127 return &LifecycleAction{
128 ObjectKey: obj.Key,
129 Action: ActionDelete,
130 OldClass: metadata.StorageClass,
131 Size: metadata.Size,
132 EstimatedSavings: lm.costCalculator.CalculateDeleteSavings(metadata),
133 }, nil
134 }
135 }
136
137 // Check if object should transition
138 if age > policy.MinAge && metadata.StorageClass != policy.TargetClass {
139 // Calculate cost benefit
140 currentCost := lm.costCalculator.CalculateMonthlyCost(metadata.Size, metadata.StorageClass, accessPattern)
141 newCost := lm.costCalculator.CalculateMonthlyCost(metadata.Size, policy.TargetClass, accessPattern)
142
143 if newCost < currentCost {
144 err := lm.storage.TransitionStorageClass(ctx, bucket, obj.Key, policy.TargetClass)
145 if err != nil {
146 return nil, err
147 }
148
149 return &LifecycleAction{
150 ObjectKey: obj.Key,
151 Action: ActionTransition,
152 OldClass: metadata.StorageClass,
153 NewClass: policy.TargetClass,
154 Size: metadata.Size,
155 EstimatedSavings: currentCost - newCost,
156 }, nil
157 }
158 }
159
160 return nil, nil
161}
162
163type LifecycleAction struct {
164 ObjectKey string
165 Action ActionType
166 OldClass StorageClass
167 NewClass StorageClass
168 Size int64
169 EstimatedSavings float64
170}
171
172type ActionType string
173
174const (
175 ActionTransition ActionType = "transition"
176 ActionDelete ActionType = "delete"
177)
178
179type LifecycleReport struct {
180 StartTime time.Time
181 EndTime time.Time
182 Duration time.Duration
183 Actions []LifecycleAction
184 TotalSavings float64
185 ObjectsTransitioned int
186 ObjectsDeleted int
187 BytesFreed int64
188}
189
190func (lm *LifecycleManager) calculateSavings(report *LifecycleReport) {
191 totalSavings := 0.0
192 objectsTransitioned := 0
193 objectsDeleted := 0
194 bytesFreed := int64(0)
195
196 for _, action := range report.Actions {
197 totalSavings += action.EstimatedSavings
198
199 switch action.Action {
200 case ActionTransition:
201 objectsTransitioned++
202 case ActionDelete:
203 objectsDeleted++
204 bytesFreed += action.Size
205 }
206 }
207
208 report.TotalSavings = totalSavings
209 report.ObjectsTransitioned = objectsTransitioned
210 report.ObjectsDeleted = objectsDeleted
211 report.BytesFreed = bytesFreed
212}
213
214// AccessAnalytics tracks access patterns
215type AccessAnalytics struct {
216 patterns map[string]*AccessPattern
217 mu sync.RWMutex
218}
219
220type AccessPattern struct {
221 TotalAccesses int
222 RecentAccesses int // Last 30 days
223 LastAccess time.Time
224 AvgAccessRate float64 // Accesses per day
225}
226
227func NewAccessAnalytics() *AccessAnalytics {
228 return &AccessAnalytics{
229 patterns: make(map[string]*AccessPattern),
230 }
231}
232
233func (aa *AccessAnalytics) GetAccessPattern(key string) *AccessPattern {
234 aa.mu.RLock()
235 defer aa.mu.RUnlock()
236
237 pattern, exists := aa.patterns[key]
238 if !exists {
239 return &AccessPattern{
240 TotalAccesses: 0,
241 RecentAccesses: 0,
242 LastAccess: time.Time{},
243 AvgAccessRate: 0,
244 }
245 }
246
247 return pattern
248}
249
250func (aa *AccessAnalytics) RecordAccess(key string) {
251 aa.mu.Lock()
252 defer aa.mu.Unlock()
253
254 pattern, exists := aa.patterns[key]
255 if !exists {
256 pattern = &AccessPattern{}
257 aa.patterns[key] = pattern
258 }
259
260 pattern.TotalAccesses++
261 pattern.RecentAccesses++
262 pattern.LastAccess = time.Now()
263
264 // Update average access rate
265 // Simple calculation - in production, use more sophisticated analytics
266 pattern.AvgAccessRate = float64(pattern.TotalAccesses) / 30.0
267}
268
269// CostCalculator calculates storage costs
270type CostCalculator struct {
271 // Cost per GB per month
272 storageCosts map[StorageClass]float64
273
274 // Cost per 1000 requests
275 requestCosts map[StorageClass]float64
276}
277
278func NewCostCalculator() *CostCalculator {
279 return &CostCalculator{
280 storageCosts: map[StorageClass]float64{
281 StorageClassStandard: 0.023,
282 StorageClassIA: 0.0125,
283 StorageClassGlacier: 0.004,
284 StorageClassDeepArchive: 0.00099,
285 },
286 requestCosts: map[StorageClass]float64{
287 StorageClassStandard: 0.0004,
288 StorageClassIA: 0.001,
289 StorageClassGlacier: 0.05,
290 StorageClassDeepArchive: 0.10,
291 },
292 }
293}
294
295func (cc *CostCalculator) CalculateMonthlyCost(sizeBytes int64, class StorageClass, pattern *AccessPattern) float64 {
296 sizeGB := float64(sizeBytes) / (1024 * 1024 * 1024)
297
298 // Storage cost
299 storageCost := sizeGB * cc.storageCosts[class]
300
301 // Request cost (estimated based on access pattern)
302 requestCost := (pattern.AvgAccessRate * 30 / 1000) * cc.requestCosts[class]
303
304 return storageCost + requestCost
305}
306
307func (cc *CostCalculator) CalculateDeleteSavings(metadata *ObjectMetadata) float64 {
308 sizeGB := float64(metadata.Size) / (1024 * 1024 * 1024)
309 return sizeGB * cc.storageCosts[metadata.StorageClass]
310}
311
312// Example usage
313func main() {
314 storage := &MockCloudStorage{}
315 manager := NewLifecycleManager(storage)
316
317 // Add policies
318 manager.AddPolicy(LifecyclePolicy{
319 Name: "transition-to-ia",
320 Prefix: "images/",
321 MinAge: 30 * 24 * time.Hour, // 30 days
322 TargetClass: StorageClassIA,
323 AccessThreshold: 10,
324 })
325
326 manager.AddPolicy(LifecyclePolicy{
327 Name: "archive-old-logs",
328 Prefix: "logs/",
329 MinAge: 90 * 24 * time.Hour, // 90 days
330 TargetClass: StorageClassGlacier,
331 DeleteAfter: 365 * 24 * time.Hour, // 1 year
332 AccessThreshold: 0,
333 })
334
335 // Apply policies
336 ctx := context.Background()
337 report, err := manager.ApplyPolicies(ctx, "my-bucket")
338 if err != nil {
339 fmt.Printf("Error: %v\n", err)
340 return
341 }
342
343 // Print report
344 fmt.Printf("Lifecycle Management Report\n")
345 fmt.Printf("Duration: %v\n", report.Duration)
346 fmt.Printf("Objects Transitioned: %d\n", report.ObjectsTransitioned)
347 fmt.Printf("Objects Deleted: %d\n", report.ObjectsDeleted)
348 fmt.Printf("Bytes Freed: %d GB\n", report.BytesFreed/(1024*1024*1024))
349 fmt.Printf("Estimated Monthly Savings: $%.2f\n", report.TotalSavings)
350}
351
352type MockCloudStorage struct{}
353
354func (m *MockCloudStorage) GetObjectMetadata(ctx context.Context, bucket, key string) (*ObjectMetadata, error) {
355 return &ObjectMetadata{
356 Key: key,
357 Size: 1024 * 1024 * 100, // 100MB
358 StorageClass: StorageClassStandard,
359 LastModified: time.Now().Add(-60 * 24 * time.Hour),
360 }, nil
361}
362
363func (m *MockCloudStorage) TransitionStorageClass(ctx context.Context, bucket, key string, class StorageClass) error {
364 return nil
365}
366
367func (m *MockCloudStorage) DeleteObject(ctx context.Context, bucket, key string) error {
368 return nil
369}
370
371func (m *MockCloudStorage) ListObjects(ctx context.Context, bucket, prefix string) ([]ObjectInfo, error) {
372 return []ObjectInfo{
373 {Key: "example.jpg", Size: 1024 * 1024 * 100, LastModified: time.Now().Add(-60 * 24 * time.Hour)},
374 }, nil
375}
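To sanity-check the transition logic, you can exercise the cost model on its own. A quick usage sketch, assuming the CostCalculator, AccessPattern, and StorageClass definitions above are in scope; the access rate and object size are illustrative.

// compareClasses prints the estimated monthly cost of keeping a 50GB object
// in each storage class for an object read roughly twice a day.
func compareClasses() {
	cc := NewCostCalculator()
	pattern := &AccessPattern{AvgAccessRate: 2}       // ~2 requests/day
	size := int64(50) * 1024 * 1024 * 1024            // 50GB

	for _, class := range []StorageClass{
		StorageClassStandard, StorageClassIA, StorageClassGlacier, StorageClassDeepArchive,
	} {
		fmt.Printf("%-14s $%.4f/month\n", class, cc.CalculateMonthlyCost(size, class, pattern))
	}
}

For rarely accessed data the archive tiers win by a wide margin, but the per-request rates show why frequently read objects should stay in Standard even though its per-GB price is the highest.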
Key Learning Points:
- Access Pattern Analysis: Tracks object access patterns to inform lifecycle decisions
- Cost Calculation: Computes actual costs for different storage classes including request costs
- Intelligent Transitions: Makes data-driven decisions about storage class transitions
- Policy-Based Management: Flexible policy engine for different data types and use cases
- Reporting: Comprehensive reporting on actions taken and cost savings achieved
- Production Patterns: Implements asynchronous processing, proper error handling, and monitoring
Exercise 5: Resilient Multi-Region Storage with Automatic Failover
Learning Objective: Design and implement a production-grade multi-region storage system with automatic failover, data replication, and geo-distributed reads.
Real-World Context: Global applications like GitHub and Zoom require storage systems that can survive regional outages, provide low-latency access worldwide, and maintain data consistency across regions.
Difficulty: Expert | Time Estimate: 95-120 minutes
Objective: Build a complete multi-region storage system with replication strategies, health monitoring, automatic failover, and conflict resolution.
Show Solution
1package exercise
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8)
9
10// MultiRegionStorage manages storage across multiple geographic regions
11type MultiRegionStorage struct {
12 regions map[string]*RegionConfig
13 primary string
14 replication ReplicationStrategy
15 healthMonitor *HealthMonitor
16 failoverMgr *FailoverManager
17 mu sync.RWMutex
18}
19
20type RegionConfig struct {
21 Name string
22 Storage CloudStorage
23 Endpoint string
24 IsHealthy bool
25 IsPrimary bool
26 Latency time.Duration
27 ErrorRate float64
28}
29
30type ReplicationStrategy string
31
32const (
33 ReplicationSync ReplicationStrategy = "sync"
34 ReplicationAsync ReplicationStrategy = "async"
35 ReplicationPrimaryOnly ReplicationStrategy = "primary_only"
36)
37
func NewMultiRegionStorage(primaryRegion string, replication ReplicationStrategy) *MultiRegionStorage {
    mrs := &MultiRegionStorage{
        regions:     make(map[string]*RegionConfig),
        primary:     primaryRegion,
        replication: replication,
    }

    mrs.healthMonitor = NewHealthMonitor(mrs)
    mrs.failoverMgr = NewFailoverManager(mrs)

    // Start background health monitoring
    go mrs.healthMonitor.Start(context.Background())

    return mrs
}

// AddRegion registers a new storage region
func (mrs *MultiRegionStorage) AddRegion(region *RegionConfig) {
    mrs.mu.Lock()
    defer mrs.mu.Unlock()

    region.IsPrimary = (region.Name == mrs.primary)
    region.IsHealthy = true
    mrs.regions[region.Name] = region
}

// Upload with multi-region replication
func (mrs *MultiRegionStorage) Upload(ctx context.Context, key string, data []byte) error {
    mrs.mu.RLock()
    primaryRegion := mrs.regions[mrs.primary]
    mrs.mu.RUnlock()

    if primaryRegion == nil || !primaryRegion.IsHealthy {
        // Primary unavailable, fail over to a healthy secondary
        return mrs.uploadWithFailover(ctx, key, data)
    }

    // Upload to primary
    start := time.Now()
    err := primaryRegion.Storage.Put(ctx, key, data)

    // Update region metrics under the write lock so we don't race with the health monitor
    mrs.mu.Lock()
    primaryRegion.Latency = time.Since(start)
    if err != nil {
        primaryRegion.ErrorRate += 0.1
    }
    mrs.mu.Unlock()

    if err != nil {
        return mrs.uploadWithFailover(ctx, key, data)
    }

    // Handle replication based on strategy
    switch mrs.replication {
    case ReplicationSync:
        return mrs.replicateSync(ctx, key, data, primaryRegion.Name)
    case ReplicationAsync:
        go mrs.replicateAsync(key, data, primaryRegion.Name)
        return nil
    case ReplicationPrimaryOnly:
        return nil
    }

    return nil
}

// uploadWithFailover attempts upload to healthy regions
func (mrs *MultiRegionStorage) uploadWithFailover(ctx context.Context, key string, data []byte) error {
    mrs.mu.RLock()
    defer mrs.mu.RUnlock()

    // Try each healthy region
    for _, region := range mrs.regions {
        if !region.IsHealthy || region.Name == mrs.primary {
            continue
        }

        err := region.Storage.Put(ctx, key, data)
        if err == nil {
            // Successful upload to secondary
            fmt.Printf("Failover upload to %s successful\n", region.Name)

            // Async replicate to other regions
            go mrs.replicateAsync(key, data, region.Name)

            return nil
        }
    }

    return fmt.Errorf("all regions unavailable")
}

// replicateSync synchronously replicates to all regions
func (mrs *MultiRegionStorage) replicateSync(ctx context.Context, key string, data []byte, excludeRegion string) error {
    mrs.mu.RLock()
    defer mrs.mu.RUnlock()

    var wg sync.WaitGroup
    errChan := make(chan error, len(mrs.regions))

    for name, region := range mrs.regions {
        if name == excludeRegion || !region.IsHealthy {
            continue
        }

        wg.Add(1)
        go func(r *RegionConfig) {
            defer wg.Done()

            err := r.Storage.Put(ctx, key, data)
            if err != nil {
                errChan <- fmt.Errorf("%s: %w", r.Name, err)
            }
        }(region)
    }

    wg.Wait()
    close(errChan)

    // Collect errors
    var errs []error
    for err := range errChan {
        errs = append(errs, err)
    }

    if len(errs) > 0 {
        return fmt.Errorf("replication errors: %v", errs)
    }

    return nil
}

// replicateAsync asynchronously replicates to all regions
func (mrs *MultiRegionStorage) replicateAsync(key string, data []byte, excludeRegion string) {
    ctx := context.Background()

    mrs.mu.RLock()
    defer mrs.mu.RUnlock()

    for name, region := range mrs.regions {
        if name == excludeRegion || !region.IsHealthy {
            continue
        }

        go func(r *RegionConfig) {
            if err := r.Storage.Put(ctx, key, data); err != nil {
                fmt.Printf("Async replication to %s failed: %v\n", r.Name, err)
            }
        }(region)
    }
}

// Download with intelligent region selection
func (mrs *MultiRegionStorage) Download(ctx context.Context, key string) ([]byte, error) {
    // Select best region based on health and latency
    region := mrs.selectBestRegion()

    if region == nil {
        return nil, fmt.Errorf("no healthy regions available")
    }

    start := time.Now()
    data, err := region.Storage.Get(ctx, key)

    // Update region metrics under the write lock so we don't race with the health monitor
    mrs.mu.Lock()
    region.Latency = time.Since(start)
    if err != nil {
        region.ErrorRate += 0.1
    } else {
        region.ErrorRate *= 0.9 // Decay error rate on success
    }
    mrs.mu.Unlock()

    if err != nil {
        // Try other regions
        return mrs.downloadWithFailover(ctx, key, region.Name)
    }

    return data, nil
}

// downloadWithFailover tries other regions on failure
func (mrs *MultiRegionStorage) downloadWithFailover(ctx context.Context, key string, excludeRegion string) ([]byte, error) {
    mrs.mu.RLock()
    defer mrs.mu.RUnlock()

    for name, region := range mrs.regions {
        if name == excludeRegion || !region.IsHealthy {
            continue
        }

        data, err := region.Storage.Get(ctx, key)
        if err == nil {
            return data, nil
        }
    }

    return nil, fmt.Errorf("object not found in any region")
}

// selectBestRegion chooses the optimal region for reads
func (mrs *MultiRegionStorage) selectBestRegion() *RegionConfig {
    mrs.mu.RLock()
    defer mrs.mu.RUnlock()

    var best *RegionConfig
    bestScore := -1.0

    for _, region := range mrs.regions {
        if !region.IsHealthy {
            continue
        }

        // Calculate score based on latency and error rate
        latencyScore := 1.0 - (float64(region.Latency) / float64(time.Second))
        errorScore := 1.0 - region.ErrorRate

        score := (latencyScore * 0.6) + (errorScore * 0.4)

        // Prefer primary region slightly
        if region.IsPrimary {
            score *= 1.1
        }

        if best == nil || score > bestScore {
            best = region
            bestScore = score
        }
    }

    return best
}

// HealthMonitor monitors region health
type HealthMonitor struct {
    storage  *MultiRegionStorage
    interval time.Duration
}

func NewHealthMonitor(storage *MultiRegionStorage) *HealthMonitor {
    return &HealthMonitor{
        storage:  storage,
        interval: 30 * time.Second,
    }
}

func (hm *HealthMonitor) Start(ctx context.Context) {
    ticker := time.NewTicker(hm.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            hm.checkHealth()
        }
    }
}

func (hm *HealthMonitor) checkHealth() {
    hm.storage.mu.RLock()
    regions := make([]*RegionConfig, 0, len(hm.storage.regions))
    for _, region := range hm.storage.regions {
        regions = append(regions, region)
    }
    hm.storage.mu.RUnlock()

    for _, region := range regions {
        go func(r *RegionConfig) {
            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
            defer cancel()

            // Perform health check (e.g., HEAD request to known object)
            start := time.Now()
            _, err := r.Storage.Get(ctx, "_health_check")
            latency := time.Since(start)

            hm.storage.mu.Lock()
            defer hm.storage.mu.Unlock()

            if err != nil {
                r.IsHealthy = false
                r.ErrorRate += 0.2
            } else {
                r.IsHealthy = true
                r.Latency = latency
                r.ErrorRate *= 0.8
            }
        }(region)
    }
}

// FailoverManager handles automatic failovers
type FailoverManager struct {
    storage         *MultiRegionStorage
    failoverHistory []FailoverEvent
    mu              sync.Mutex
}

type FailoverEvent struct {
    Timestamp  time.Time
    FromRegion string
    ToRegion   string
    Reason     string
}

func NewFailoverManager(storage *MultiRegionStorage) *FailoverManager {
    return &FailoverManager{
        storage:         storage,
        failoverHistory: make([]FailoverEvent, 0),
    }
}

func (fm *FailoverManager) TriggerFailover(fromRegion, toRegion, reason string) error {
    fm.mu.Lock()
    defer fm.mu.Unlock()

    fm.storage.mu.Lock()
    defer fm.storage.mu.Unlock()

    targetRegion := fm.storage.regions[toRegion]
    if targetRegion == nil || !targetRegion.IsHealthy {
        return fmt.Errorf("target region %s not available", toRegion)
    }

    // Update primary
    oldPrimary := fm.storage.regions[fm.storage.primary]
    if oldPrimary != nil {
        oldPrimary.IsPrimary = false
    }

    fm.storage.primary = toRegion
    targetRegion.IsPrimary = true

    // Record failover
    event := FailoverEvent{
        Timestamp:  time.Now(),
        FromRegion: fromRegion,
        ToRegion:   toRegion,
        Reason:     reason,
    }
    fm.failoverHistory = append(fm.failoverHistory, event)

    fmt.Printf("Failover: %s -> %s (reason: %s)\n", fromRegion, toRegion, reason)

    return nil
}

func (fm *FailoverManager) GetFailoverHistory() []FailoverEvent {
    fm.mu.Lock()
    defer fm.mu.Unlock()

    // Return a copy so callers cannot mutate the internal history
    history := make([]FailoverEvent, len(fm.failoverHistory))
    copy(history, fm.failoverHistory)
    return history
}

// Example usage
func main() {
    storage := NewMultiRegionStorage("us-east-1", ReplicationAsync)

    // Add regions
    storage.AddRegion(&RegionConfig{
        Name:     "us-east-1",
        Storage:  &MockStorage{},
        Endpoint: "https://us-east-1.storage.example.com",
    })

    storage.AddRegion(&RegionConfig{
        Name:     "us-west-2",
        Storage:  &MockStorage{},
        Endpoint: "https://us-west-2.storage.example.com",
    })

    storage.AddRegion(&RegionConfig{
        Name:     "eu-west-1",
        Storage:  &MockStorage{},
        Endpoint: "https://eu-west-1.storage.example.com",
    })

    ctx := context.Background()

    // Upload
    err := storage.Upload(ctx, "example.jpg", []byte("data"))
    if err != nil {
        fmt.Printf("Upload failed: %v\n", err)
    } else {
        fmt.Println("Upload successful with multi-region replication")
    }

    // Download from best region
    data, err := storage.Download(ctx, "example.jpg")
    if err != nil {
        fmt.Printf("Download failed: %v\n", err)
    } else {
        fmt.Printf("Downloaded %d bytes from optimal region\n", len(data))
    }
}

type MockStorage struct{}

func (m *MockStorage) Get(ctx context.Context, key string) ([]byte, error) {
    return []byte("mock data"), nil
}

func (m *MockStorage) Put(ctx context.Context, key string, data []byte) error {
    return nil
}
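One thing the example never shows is how FailoverManager.TriggerFailover actually gets called. The sketch below is one possible wiring, assumed to live in the same package as the solution above; the candidate-selection policy and the reason string are illustrative, not part of the exercise solution:

// promoteHealthiestSecondary promotes the first healthy secondary it finds.
// Sketch only: assumes the MultiRegionStorage/FailoverManager types above and
// picks a candidate with a deliberately naive policy.
func promoteHealthiestSecondary(mrs *MultiRegionStorage) {
    mrs.mu.RLock()
    oldPrimary := mrs.primary
    candidate := ""
    for name, region := range mrs.regions {
        if name != oldPrimary && region.IsHealthy {
            candidate = name
            break
        }
    }
    mrs.mu.RUnlock()

    if candidate == "" {
        fmt.Println("no healthy secondary available for failover")
        return
    }

    if err := mrs.failoverMgr.TriggerFailover(oldPrimary, candidate, "primary failed health checks"); err != nil {
        fmt.Printf("failover not performed: %v\n", err)
    }
}

In production this would be driven by the health monitor's results (or an operator runbook) rather than called ad hoc, and the outcome would feed alerting.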
Key Learning Points:
- Multi-Region Architecture: Manages storage across geographic regions with configurable replication
- Intelligent Region Selection: Scores each region on latency, error rate, and health to pick the best read target
- Automatic Failover: Detects region failures and reroutes uploads and downloads to healthy regions
- Replication Strategies: Supports sync, async, and primary-only replication modes
- Health Monitoring: Periodic health checks take failing regions out of rotation and restore them when they recover
- Production Patterns: Failover history tracking, per-region latency and error metrics, and concurrency-safe state updates
Further Reading
- AWS S3 Documentation
- Google Cloud Storage Documentation
- Azure Blob Storage Documentation
- S3 Best Practices
- Cloud Storage Security
- Multi-Cloud Architecture Patterns
Summary
Key Takeaways
🎯 Core Concepts to Master:
- Object Storage Fundamentals: Flat namespace, HTTP-based access, infinite scalability
- Storage Classes: Choose Standard, IA, or Glacier based on access patterns and cost requirements
- Direct Browser Uploads: Use presigned URLs to eliminate server bandwidth bottlenecks (see the sketch after this list)
- Multipart Uploads: Handle large files efficiently with parallel uploads and resume capability
- Multi-Cloud Strategies: Implement redundancy across providers for maximum reliability
- Security Practices: Encryption, access control, and secure URL generation
- Cost Optimization: Lifecycle policies, storage class selection, and data transfer management
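To make the presigned-URL takeaway concrete, here is a minimal sketch using the aws-sdk-go-v2 packages already used in this tutorial. The bucket name, object key, and 15-minute expiry are placeholders, not recommendations:

// run
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    ctx := context.Background()

    // Load region and credentials from the environment or shared config.
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatal(err)
    }

    client := s3.NewFromConfig(cfg)
    presigner := s3.NewPresignClient(client)

    // Presign a PUT so the browser can upload directly to S3.
    // "my-app-uploads" and the key are hypothetical values for this sketch.
    req, err := presigner.PresignPutObject(ctx, &s3.PutObjectInput{
        Bucket:      aws.String("my-app-uploads"),
        Key:         aws.String("user-42/avatar.png"),
        ContentType: aws.String("image/png"), // the client must send a matching Content-Type header
    }, s3.WithPresignExpires(15*time.Minute))
    if err != nil {
        log.Fatal(err)
    }

    // Hand this URL to the client; it stops working after the expiry.
    fmt.Println("Upload URL:", req.URL)
}

The same pattern applies to downloads via PresignGetObject; keep the expiry as short as your upload or download flow allows.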
Production Best Practices
⚠️ Critical Success Factors:
- Always use presigned URLs for direct client uploads—reduces server load and improves user experience
- Implement automatic lifecycle policies—data accumulates and costs spiral without automated management
- Choose storage classes wisely—match access patterns to cost optimization
- Enable encryption by default—protect data at rest with server-side or client-side encryption
- Monitor costs and usage proactively—surprises are expensive at cloud scale
- Implement robust retry logic with exponential backoff; network failures are inevitable (a sketch follows this list)
- Use CDN integration for frequently accessed content—dramatically improves user experience
- Design for multi-cloud—avoid vendor lock-in and ensure business continuity
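As a provider-agnostic sketch of the retry guidance above (the helper name, attempt count, and base delay are illustrative; the AWS, GCS, and Azure SDKs also ship configurable retryers that you should prefer when available):

// run
package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// retryWithBackoff retries op with exponential backoff and jitter until it
// succeeds, the context is cancelled, or maxAttempts is exhausted.
func retryWithBackoff(ctx context.Context, maxAttempts int, op func() error) error {
    backoff := 200 * time.Millisecond
    var lastErr error

    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if lastErr = op(); lastErr == nil {
            return nil
        }

        // Add jitter so many clients don't retry in lockstep.
        sleep := backoff + time.Duration(rand.Int63n(int64(backoff/2)))
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(sleep):
        }
        backoff *= 2 // exponential growth: 200ms, 400ms, 800ms, ...
    }
    return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
    calls := 0
    err := retryWithBackoff(context.Background(), 5, func() error {
        calls++
        if calls < 3 {
            return errors.New("transient network error") // simulate two failures
        }
        return nil
    })
    fmt.Println("calls:", calls, "err:", err)
}

Only retry operations that are safe to repeat; re-PUTting the same bytes to the same key or re-uploading a multipart part is generally fine, while non-idempotent application logic needs more care.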
When to Use Cloud Object Storage
- Unstructured data: Images, videos, documents, backups, log files
- Large-scale storage: Terabytes to petabytes of data with predictable access patterns
- Global distribution: Content that needs to be accessible worldwide with low latency
- Archive requirements: Long-term retention with compliance needs and automatic lifecycle management
- Variable workloads: Applications with unpredictable growth patterns requiring elastic scalability
- Data lakes: Analytics and machine learning datasets requiring massively parallel processing
When NOT to Use Cloud Object Storage
- Transactional databases: Need ACID properties, low-latency random access, and complex queries
- Frequently modified files: Each change rewrites the entire object (and adds a version when versioning is enabled), increasing cost and latency
- Real-time processing: High latency compared to local storage or databases
- File system operations: No support for directories, renames, appends, or locking
- Small, frequently accessed files: Better served by databases or caching layers
Next Steps in Your Learning Journey
- Study CDN integration: Learn how CloudFront, Cloud CDN, and Azure CDN integrate with object storage
- Explore data processing: Understand how to process data stored in object storage using services like AWS Lambda
- Master monitoring and cost management: Learn tools and techniques for optimizing cloud storage costs
- Implement advanced security: Study IAM policies, VPC endpoints, and access control patterns
- Build real applications: Apply these patterns to build actual production systems
Remember: Cloud object storage is not just a replacement for file systems; it is a fundamentally different paradigm designed for massive scale, global distribution, and cost-effective storage of unstructured data. Master these patterns to build the next generation of cloud-native applications.