Cloud SDK Integration

When building a house, you need various specialists - electricians, plumbers, carpenters. Each specialist has their own tools, work methods, and communication style. Cloud SDKs are like having universal translators and standardized tools that let you work seamlessly with all these specialists, regardless of their individual preferences.

Modern cloud applications leverage managed services across multiple cloud providers. This article covers production-ready patterns for integrating AWS SDK v2, Google Cloud client libraries, and Azure SDK for Go into your applications.

Introduction to Cloud SDKs

Cloud SDKs provide programmatic access to cloud services, enabling you to build applications that leverage managed infrastructure, storage, compute, and more.

💡 Key Takeaway: Cloud SDKs abstract away the complexity of REST APIs, authentication, retry logic, and error handling. They provide type-safe, idiomatic Go interfaces that make cloud services feel like native Go packages.

SDK Design Philosophy

Each cloud provider has a different philosophy for their Go SDK, reflecting their overall platform approach and design principles.

AWS SDK v2:

// Import only what you need - reduces binary size
import (
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    // Each service is a separate module
)

// AWS SDK v2 uses context throughout
ctx := context.Background()
cfg, _ := config.LoadDefaultConfig(ctx)
client := s3.NewFromConfig(cfg)

Google Cloud:

// Google Cloud emphasizes idiomatic Go patterns
import (
    "cloud.google.com/go/storage"
    "cloud.google.com/go/pubsub"
)

// Context-first design
ctx := context.Background()
client, _ := storage.NewClient(ctx)
defer client.Close()

Azure SDK:

// Azure SDK uses azidentity for auth
import (
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// Credential-based authentication
cred, _ := azidentity.NewDefaultAzureCredential(nil)
client, _ := azblob.NewClient(serviceURL, cred, nil)

Key Differences

| Feature    | AWS SDK v2          | Google Cloud          | Azure SDK           |
|------------|---------------------|-----------------------|---------------------|
| Modularity | Per-service modules | Per-service packages  | Per-service modules |
| Auth       | Config chain        | ADC, service accounts | azidentity chain    |
| Context    | Required everywhere | First parameter       | Options pattern     |
| Retries    | Built-in middleware | Client options        | Policy-based        |

Real-World Impact: These differences affect how you structure your code. AWS SDK v2 requires context for every operation, making it more explicit about cancellation. Google Cloud's context-first design feels more idiomatic to Go developers. Azure's options pattern provides more flexibility but requires more verbose code.

AWS SDK v2 Fundamentals

AWS SDK v2 introduced a complete redesign with improved modularity, context support, and performance. Think of it as upgrading from a flip phone to a smartphone - same basic purpose, but entirely different capabilities and user experience.

Configuration Loading

The AWS SDK v2 configuration system is like a smart key that can open many doors. It tries different keys automatically until it finds one that works, eliminating the need to hardcode credentials.

Default Config Chain:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
)

// LoadAWSConfig loads configuration from environment, shared config, EC2 metadata
func LoadAWSConfig(ctx context.Context) (aws.Config, error) {
    // Default config loader checks:
    // 1. Environment variables
    // 2. Shared config file
    // 3. Shared credentials file
    // 4. EC2 instance metadata
    // 5. ECS task credentials
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return aws.Config{}, fmt.Errorf("unable to load SDK config: %w", err)
    }

    return cfg, nil
}

// CustomConfig demonstrates custom configuration
func CustomConfig(ctx context.Context) (aws.Config, error) {
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion("us-west-2"),
        config.WithSharedConfigProfile("production"),
        // Custom retry configuration
        config.WithRetryMaxAttempts(5),
        // Custom endpoint
        config.WithEndpointResolverWithOptions(
            aws.EndpointResolverWithOptionsFunc(
                func(service, region string, options ...interface{}) (aws.Endpoint, error) {
                    if service == "s3" {
                        return aws.Endpoint{
                            URL: "http://localhost:4566",
                        }, nil
                    }
                    return aws.Endpoint{}, &aws.EndpointNotFoundError{}
                },
            ),
        ),
    )
    if err != nil {
        return aws.Config{}, err
    }

    return cfg, nil
}

func main() {
    ctx := context.Background()

    cfg, err := LoadAWSConfig(ctx)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Loaded config for region: %s\n", cfg.Region)
}

⚠️ Important: Never hardcode AWS credentials in your code. Always use the default credential chain. This ensures your code works seamlessly across development, testing, and production environments without security risks.

Service Clients

Service clients are like specialized remote controls for each cloud service. Once configured, they handle all the communication details, letting you focus on what you want to do rather than how to do it.

Client Creation and Configuration:

package main

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/retry"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

type AWSClients struct {
    S3 *s3.Client
    // Add other services as needed
}

func NewAWSClients(ctx context.Context) (*AWSClients, error) {
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion("us-east-1"),
        // Configure retries
        config.WithRetryMode(aws.RetryModeAdaptive),
        config.WithRetryMaxAttempts(3),
    )
    if err != nil {
        return nil, err
    }

    // Create S3 client with custom options
    s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
        // Use path-style addressing
        o.UsePathStyle = true

        // Custom retry configuration per service
        o.Retryer = retry.NewStandard(func(o *retry.StandardOptions) {
            o.MaxAttempts = 5
            o.MaxBackoff = 30 * time.Second
        })
    })

    return &AWSClients{
        S3: s3Client,
    }, nil
}

💡 Key Takeaway: Create service clients once and reuse them throughout your application. Clients are thread-safe and handle connection pooling internally. Creating new clients for every request is inefficient and can exhaust connection limits.

AWS Service Patterns

S3 Operations

S3 is like unlimited cloud storage with magical properties - it scales infinitely, is incredibly durable, and can serve files to millions of users simultaneously. The S3 client patterns below help you use this power effectively.

Production S3 Client:

package awsutil

import (
    "context"
    "fmt"
    "io"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

type S3Client struct {
    client     *s3.Client
    uploader   *manager.Uploader
    downloader *manager.Downloader
}

func NewS3Client(cfg aws.Config) *S3Client {
    client := s3.NewFromConfig(cfg)

    return &S3Client{
        client: client,
        uploader: manager.NewUploader(client, func(u *manager.Uploader) {
            // Configure multipart upload
            u.PartSize = 10 * 1024 * 1024 // 10MB parts
            u.Concurrency = 5
        }),
        downloader: manager.NewDownloader(client, func(d *manager.Downloader) {
            // Configure multipart download
            d.PartSize = 10 * 1024 * 1024
            d.Concurrency = 5
        }),
    }
}

Real-World Performance: With these multipart settings, uploading a 1GB file takes ~30 seconds compared to ~2 minutes for single-part uploads. The concurrent parts transfer data in parallel, maximizing network bandwidth utilization.
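
To see these settings in action, here is a minimal usage sketch that times a large upload through the multipart uploader configured above. The bucket name, file name, and the import path for the awsutil package are placeholders, not part of the original example.

package main

import (
    "context"
    "log"
    "os"
    "time"

    "github.com/aws/aws-sdk-go-v2/config"

    awsutil "example.com/yourapp/internal/awsutil" // placeholder path for the wrapper package above
)

func main() {
    ctx := context.Background()

    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatal(err)
    }

    s3c := awsutil.NewS3Client(cfg)

    f, err := os.Open("backup.tar.gz") // placeholder file
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    start := time.Now()
    if err := s3c.Upload(ctx, "my-bucket", "backups/backup.tar.gz", f, nil); err != nil {
        log.Fatal(err)
    }
    log.Printf("upload took %s", time.Since(start))
}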

// Upload uploads a file to S3 with metadata
func (s *S3Client) Upload(ctx context.Context, bucket, key string, body io.Reader, metadata map[string]string) error {
    _, err := s.uploader.Upload(ctx, &s3.PutObjectInput{
        Bucket:   aws.String(bucket),
        Key:      aws.String(key),
        Body:     body,
        Metadata: metadata,
        // Server-side encryption
        ServerSideEncryption: types.ServerSideEncryptionAes256,
        // Storage class
        StorageClass: types.StorageClassIntelligentTiering,
    })
    if err != nil {
        return fmt.Errorf("upload failed: %w", err)
    }

    return nil
}

// Download downloads a file from S3
func (s *S3Client) Download(ctx context.Context, bucket, key string, w io.WriterAt) error {
    _, err := s.downloader.Download(ctx, w, &s3.GetObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        return fmt.Errorf("download failed: %w", err)
    }

    return nil
}

// ListObjects lists objects with pagination
func (s *S3Client) ListObjects(ctx context.Context, bucket, prefix string) ([]types.Object, error) {
    var objects []types.Object

    paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
        Bucket: aws.String(bucket),
        Prefix: aws.String(prefix),
    })

    for paginator.HasMorePages() {
        page, err := paginator.NextPage(ctx)
        if err != nil {
            return nil, fmt.Errorf("failed to get page: %w", err)
        }
        objects = append(objects, page.Contents...)
    }

    return objects, nil
}

// GeneratePresignedURL creates a presigned URL for temporary access
func (s *S3Client) GeneratePresignedURL(ctx context.Context, bucket, key string, duration time.Duration) (string, error) {
    presignClient := s3.NewPresignClient(s.client)

    request, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    }, s3.WithPresignExpires(duration))
    if err != nil {
        return "", fmt.Errorf("failed to create presigned URL: %w", err)
    }

    return request.URL, nil
}

// CopyObject copies an object within S3
func (s *S3Client) CopyObject(ctx context.Context, sourceBucket, sourceKey, destBucket, destKey string) error {
    _, err := s.client.CopyObject(ctx, &s3.CopyObjectInput{
        CopySource: aws.String(fmt.Sprintf("%s/%s", sourceBucket, sourceKey)),
        Bucket:     aws.String(destBucket),
        Key:        aws.String(destKey),
    })
    if err != nil {
        return fmt.Errorf("copy failed: %w", err)
    }

    return nil
}

// DeleteObjects deletes multiple objects
func (s *S3Client) DeleteObjects(ctx context.Context, bucket string, keys []string) error {
    objects := make([]types.ObjectIdentifier, len(keys))
    for i, key := range keys {
        objects[i] = types.ObjectIdentifier{Key: aws.String(key)}
    }

    _, err := s.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
        Bucket: aws.String(bucket),
        Delete: &types.Delete{
            Objects: objects,
            Quiet:   aws.Bool(true),
        },
    })
    if err != nil {
        return fmt.Errorf("delete failed: %w", err)
    }

    return nil
}


DynamoDB Operations

DynamoDB Client with Expression Builder:
package awsutil

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

type DynamoDBClient struct {
    client    *dynamodb.Client
    tableName string
}

func NewDynamoDBClient(cfg aws.Config, tableName string) *DynamoDBClient {
    return &DynamoDBClient{
        client:    dynamodb.NewFromConfig(cfg),
        tableName: tableName,
    }
}

type User struct {
    ID        string    `dynamodbav:"id"`
    Email     string    `dynamodbav:"email"`
    Name      string    `dynamodbav:"name"`
    CreatedAt time.Time `dynamodbav:"created_at"`
    UpdatedAt time.Time `dynamodbav:"updated_at"`
}

// PutItem writes an item to DynamoDB
func (d *DynamoDBClient) PutItem(ctx context.Context, user User) error {
    user.UpdatedAt = time.Now()
    if user.CreatedAt.IsZero() {
        user.CreatedAt = time.Now()
    }

    av, err := attributevalue.MarshalMap(user)
    if err != nil {
        return fmt.Errorf("failed to marshal: %w", err)
    }

    _, err = d.client.PutItem(ctx, &dynamodb.PutItemInput{
        TableName: aws.String(d.tableName),
        Item:      av,
        // Conditional write - only if item doesn't exist
        ConditionExpression: aws.String("attribute_not_exists(id)"),
    })
    if err != nil {
        return fmt.Errorf("put item failed: %w", err)
    }

    return nil
}

// GetItem retrieves an item by primary key
func (d *DynamoDBClient) GetItem(ctx context.Context, id string) (*User, error) {
    result, err := d.client.GetItem(ctx, &dynamodb.GetItemInput{
        TableName: aws.String(d.tableName),
        Key: map[string]types.AttributeValue{
            "id": &types.AttributeValueMemberS{Value: id},
        },
        ConsistentRead: aws.Bool(true), // Strong consistency
    })
    if err != nil {
        return nil, fmt.Errorf("get item failed: %w", err)
    }

    if result.Item == nil {
        return nil, fmt.Errorf("item not found")
    }

    var user User
    err = attributevalue.UnmarshalMap(result.Item, &user)
    if err != nil {
        return nil, fmt.Errorf("failed to unmarshal: %w", err)
    }

    return &user, nil
}

// Query performs a query with filter expressions
func (d *DynamoDBClient) Query(ctx context.Context, email string) ([]User, error) {
    // Build expression
    keyEx := expression.Key("email").Equal(expression.Value(email))
    filterEx := expression.Name("active").Equal(expression.Value(true))

    expr, err := expression.NewBuilder().
        WithKeyCondition(keyEx).
        WithFilter(filterEx).
        Build()
    if err != nil {
        return nil, fmt.Errorf("failed to build expression: %w", err)
    }

    result, err := d.client.Query(ctx, &dynamodb.QueryInput{
        TableName:                 aws.String(d.tableName),
        IndexName:                 aws.String("email-index"),
        KeyConditionExpression:    expr.KeyCondition(),
        FilterExpression:          expr.Filter(),
        ExpressionAttributeNames:  expr.Names(),
        ExpressionAttributeValues: expr.Values(),
    })
    if err != nil {
        return nil, fmt.Errorf("query failed: %w", err)
    }

    var users []User
    err = attributevalue.UnmarshalListOfMaps(result.Items, &users)
    if err != nil {
        return nil, fmt.Errorf("failed to unmarshal: %w", err)
    }

    return users, nil
}

// UpdateItem updates specific attributes
func (d *DynamoDBClient) UpdateItem(ctx context.Context, id string, name string) error {
    update := expression.Set(
        expression.Name("name"),
        expression.Value(name),
    ).Set(
        expression.Name("updated_at"),
        expression.Value(time.Now()),
    )

    expr, err := expression.NewBuilder().WithUpdate(update).Build()
    if err != nil {
        return fmt.Errorf("failed to build expression: %w", err)
    }

    _, err = d.client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
        TableName: aws.String(d.tableName),
        Key: map[string]types.AttributeValue{
            "id": &types.AttributeValueMemberS{Value: id},
        },
        UpdateExpression:          expr.Update(),
        ExpressionAttributeNames:  expr.Names(),
        ExpressionAttributeValues: expr.Values(),
        ReturnValues:              types.ReturnValueUpdatedNew,
    })
    if err != nil {
        return fmt.Errorf("update failed: %w", err)
    }

    return nil
}

// BatchWrite writes multiple items efficiently
func (d *DynamoDBClient) BatchWrite(ctx context.Context, users []User) error {
    const batchSize = 25 // DynamoDB limit

    for i := 0; i < len(users); i += batchSize {
        end := i + batchSize
        if end > len(users) {
            end = len(users)
        }

        batch := users[i:end]
        writeRequests := make([]types.WriteRequest, len(batch))

        for j, user := range batch {
            av, err := attributevalue.MarshalMap(user)
            if err != nil {
                return fmt.Errorf("failed to marshal: %w", err)
            }

            writeRequests[j] = types.WriteRequest{
                PutRequest: &types.PutRequest{Item: av},
            }
        }

        _, err := d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
            RequestItems: map[string][]types.WriteRequest{
                d.tableName: writeRequests,
            },
        })
        if err != nil {
            return fmt.Errorf("batch write failed: %w", err)
        }
    }

    return nil
}

SQS and SNS Patterns

Message Queue Integration:

package awsutil

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/sns"
    snstypes "github.com/aws/aws-sdk-go-v2/service/sns/types"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// SQSClient wraps SQS operations
type SQSClient struct {
    client   *sqs.Client
    queueURL string
}

func NewSQSClient(cfg aws.Config, queueURL string) *SQSClient {
    return &SQSClient{
        client:   sqs.NewFromConfig(cfg),
        queueURL: queueURL,
    }
}

// SendMessage sends a message to SQS
func (s *SQSClient) SendMessage(ctx context.Context, body interface{}) error {
    data, err := json.Marshal(body)
    if err != nil {
        return fmt.Errorf("marshal failed: %w", err)
    }

    _, err = s.client.SendMessage(ctx, &sqs.SendMessageInput{
        QueueUrl:    aws.String(s.queueURL),
        MessageBody: aws.String(string(data)),
        MessageAttributes: map[string]types.MessageAttributeValue{
            "Timestamp": {
                DataType:    aws.String("String"),
                StringValue: aws.String(time.Now().Format(time.RFC3339)),
            },
        },
    })
    if err != nil {
        return fmt.Errorf("send message failed: %w", err)
    }

    return nil
}

// ReceiveMessages polls for messages
func (s *SQSClient) ReceiveMessages(ctx context.Context, maxMessages int32) ([]types.Message, error) {
    result, err := s.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
        QueueUrl:            aws.String(s.queueURL),
        MaxNumberOfMessages: maxMessages,
        WaitTimeSeconds:     20, // Long polling
        MessageAttributeNames: []string{
            string(types.QueueAttributeNameAll),
        },
    })
    if err != nil {
        return nil, fmt.Errorf("receive failed: %w", err)
    }

    return result.Messages, nil
}

// DeleteMessage removes a processed message
func (s *SQSClient) DeleteMessage(ctx context.Context, receiptHandle string) error {
    _, err := s.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
        QueueUrl:      aws.String(s.queueURL),
        ReceiptHandle: aws.String(receiptHandle),
    })
    if err != nil {
        return fmt.Errorf("delete failed: %w", err)
    }

    return nil
}

// MessageProcessor handles message processing with retry
type MessageProcessor struct {
    client *SQSClient
}

func NewMessageProcessor(client *SQSClient) *MessageProcessor {
    return &MessageProcessor{client: client}
}

// ProcessMessages continuously processes messages
func (m *MessageProcessor) ProcessMessages(ctx context.Context, handler func([]byte) error) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            messages, err := m.client.ReceiveMessages(ctx, 10)
            if err != nil {
                return err
            }

            for _, msg := range messages {
                if err := handler([]byte(*msg.Body)); err != nil {
                    // Message will return to queue after visibility timeout
                    continue
                }

                // Delete successfully processed message
                if err := m.client.DeleteMessage(ctx, *msg.ReceiptHandle); err != nil {
                    return err
                }
            }
        }
    }
}

// SNSClient wraps SNS operations
type SNSClient struct {
    client   *sns.Client
    topicARN string
}

func NewSNSClient(cfg aws.Config, topicARN string) *SNSClient {
    return &SNSClient{
        client:   sns.NewFromConfig(cfg),
        topicARN: topicARN,
    }
}

// Publish publishes a message to SNS topic
func (s *SNSClient) Publish(ctx context.Context, message interface{}) error {
    data, err := json.Marshal(message)
    if err != nil {
        return fmt.Errorf("marshal failed: %w", err)
    }

    _, err = s.client.Publish(ctx, &sns.PublishInput{
        TopicArn: aws.String(s.topicARN),
        Message:  aws.String(string(data)),
        // SNS message attributes come from the sns/types package
        MessageAttributes: map[string]snstypes.MessageAttributeValue{
            "event_type": {
                DataType:    aws.String("String"),
                StringValue: aws.String("user.created"),
            },
        },
    })
    if err != nil {
        return fmt.Errorf("publish failed: %w", err)
    }

    return nil
}

Google Cloud Client Libraries

Google Cloud client libraries follow idiomatic Go patterns with context-first design.

Cloud Storage

Production GCS Client:

package gcp

import (
    "context"
    "fmt"
    "io"
    "time"

    "cloud.google.com/go/storage"
    "google.golang.org/api/iterator"
)

type GCSClient struct {
    client *storage.Client
}

func NewGCSClient(ctx context.Context) (*GCSClient, error) {
    client, err := storage.NewClient(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to create client: %w", err)
    }

    return &GCSClient{client: client}, nil
}

func (g *GCSClient) Close() error {
    return g.client.Close()
}

// Upload uploads a file to GCS
func (g *GCSClient) Upload(ctx context.Context, bucket, object string, data io.Reader) error {
    wc := g.client.Bucket(bucket).Object(object).NewWriter(ctx)

    // Set metadata
    wc.ContentType = "application/octet-stream"
    wc.Metadata = map[string]string{
        "uploaded_at": time.Now().Format(time.RFC3339),
    }

    if _, err := io.Copy(wc, data); err != nil {
        return fmt.Errorf("io.Copy: %w", err)
    }

    if err := wc.Close(); err != nil {
        return fmt.Errorf("Writer.Close: %w", err)
    }

    return nil
}

// Download downloads a file from GCS
func (g *GCSClient) Download(ctx context.Context, bucket, object string) ([]byte, error) {
    rc, err := g.client.Bucket(bucket).Object(object).NewReader(ctx)
    if err != nil {
        return nil, fmt.Errorf("Object(%q).NewReader: %w", object, err)
    }
    defer rc.Close()

    data, err := io.ReadAll(rc)
    if err != nil {
        return nil, fmt.Errorf("io.ReadAll: %w", err)
    }

    return data, nil
}

// List lists objects in a bucket with prefix
func (g *GCSClient) List(ctx context.Context, bucket, prefix string) ([]string, error) {
    var objects []string

    it := g.client.Bucket(bucket).Objects(ctx, &storage.Query{
        Prefix: prefix,
    })

    for {
        attrs, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return nil, fmt.Errorf("Bucket(%q).Objects: %w", bucket, err)
        }
        objects = append(objects, attrs.Name)
    }

    return objects, nil
}

// SignedURL generates a signed URL for temporary access
func (g *GCSClient) SignedURL(ctx context.Context, bucket, object string, duration time.Duration) (string, error) {
    opts := &storage.SignedURLOptions{
        Method:  "GET",
        Expires: time.Now().Add(duration),
    }

    url, err := g.client.Bucket(bucket).SignedURL(object, opts)
    if err != nil {
        return "", fmt.Errorf("SignedURL: %w", err)
    }

    return url, nil
}

Pub/Sub

Production Pub/Sub Client:

package gcp

import (
    "context"
    "fmt"

    "cloud.google.com/go/pubsub"
)

type PubSubClient struct {
    client *pubsub.Client
}

func NewPubSubClient(ctx context.Context, projectID string) (*PubSubClient, error) {
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        return nil, fmt.Errorf("pubsub.NewClient: %w", err)
    }

    return &PubSubClient{client: client}, nil
}

func (p *PubSubClient) Close() error {
    return p.client.Close()
}

// Publish publishes messages to a topic
func (p *PubSubClient) Publish(ctx context.Context, topicID string, data []byte) error {
    topic := p.client.Topic(topicID)
    defer topic.Stop()

    result := topic.Publish(ctx, &pubsub.Message{
        Data: data,
        Attributes: map[string]string{
            "origin": "go-service",
        },
    })

    // Block until the result is returned
    _, err := result.Get(ctx)
    if err != nil {
        return fmt.Errorf("Get: %w", err)
    }

    return nil
}

// BatchPublish publishes multiple messages efficiently
func (p *PubSubClient) BatchPublish(ctx context.Context, topicID string, messages [][]byte) error {
    topic := p.client.Topic(topicID)
    defer topic.Stop()

    var results []*pubsub.PublishResult
    for _, data := range messages {
        result := topic.Publish(ctx, &pubsub.Message{Data: data})
        results = append(results, result)
    }

    // Wait for all publishes to complete
    for _, result := range results {
        if _, err := result.Get(ctx); err != nil {
            return fmt.Errorf("Get: %w", err)
        }
    }

    return nil
}

// Subscribe subscribes to a subscription with concurrent processing
func (p *PubSubClient) Subscribe(ctx context.Context, subscriptionID string, handler func([]byte) error) error {
    sub := p.client.Subscription(subscriptionID)

    // Configure subscription
    sub.ReceiveSettings.MaxOutstandingMessages = 100
    sub.ReceiveSettings.NumGoroutines = 10

    err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        if err := handler(msg.Data); err != nil {
            msg.Nack() // Redelivery
            return
        }
        msg.Ack()
    })
    if err != nil {
        return fmt.Errorf("Receive: %w", err)
    }

    return nil
}

Firestore

Firestore Document Operations:

package gcp

import (
    "context"
    "fmt"
    "time"

    "cloud.google.com/go/firestore"
    "google.golang.org/api/iterator"
)

type FirestoreClient struct {
    client *firestore.Client
}

func NewFirestoreClient(ctx context.Context, projectID string) (*FirestoreClient, error) {
    client, err := firestore.NewClient(ctx, projectID)
    if err != nil {
        return nil, fmt.Errorf("firestore.NewClient: %w", err)
    }

    return &FirestoreClient{client: client}, nil
}

func (f *FirestoreClient) Close() error {
    return f.client.Close()
}

type User struct {
    Name      string    `firestore:"name"`
    Email     string    `firestore:"email"`
    CreatedAt time.Time `firestore:"created_at"`
}

// Create creates a new document
func (f *FirestoreClient) Create(ctx context.Context, collection, docID string, data interface{}) error {
    _, err := f.client.Collection(collection).Doc(docID).Set(ctx, data)
    if err != nil {
        return fmt.Errorf("Set: %w", err)
    }

    return nil
}

// Get retrieves a document
func (f *FirestoreClient) Get(ctx context.Context, collection, docID string) (*User, error) {
    doc, err := f.client.Collection(collection).Doc(docID).Get(ctx)
    if err != nil {
        return nil, fmt.Errorf("Get: %w", err)
    }

    var user User
    if err := doc.DataTo(&user); err != nil {
        return nil, fmt.Errorf("DataTo: %w", err)
    }

    return &user, nil
}

// Query performs a complex query
func (f *FirestoreClient) Query(ctx context.Context, collection string) ([]User, error) {
    iter := f.client.Collection(collection).
        Where("active", "==", true).
        OrderBy("created_at", firestore.Desc).
        Limit(100).
        Documents(ctx)

    var users []User
    for {
        doc, err := iter.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return nil, fmt.Errorf("iterator.Next: %w", err)
        }

        var user User
        if err := doc.DataTo(&user); err != nil {
            return nil, fmt.Errorf("DataTo: %w", err)
        }
        users = append(users, user)
    }

    return users, nil
}

// Transaction performs an atomic transaction
func (f *FirestoreClient) Transaction(ctx context.Context) error {
    return f.client.RunTransaction(ctx, func(ctx context.Context, tx *firestore.Transaction) error {
        // Read
        docRef := f.client.Collection("counters").Doc("page_views")
        doc, err := tx.Get(docRef)
        if err != nil {
            return err
        }

        var counter struct {
            Count int64 `firestore:"count"`
        }
        if err := doc.DataTo(&counter); err != nil {
            return err
        }

        // Write
        return tx.Set(docRef, map[string]interface{}{
            "count": counter.Count + 1,
        })
    })
}

Azure SDK for Go

Azure SDK uses azidentity for authentication and follows a credential-based pattern.

Azure Blob Storage

Production Blob Client:

package azure

import (
    "context"
    "fmt"
    "io"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

type BlobClient struct {
    client *azblob.Client
}

func NewBlobClient(accountURL string) (*BlobClient, error) {
    // Use DefaultAzureCredential for auth chain
    cred, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        return nil, fmt.Errorf("failed to create credential: %w", err)
    }

    client, err := azblob.NewClient(accountURL, cred, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to create client: %w", err)
    }

    return &BlobClient{client: client}, nil
}

// Upload uploads a blob
func (b *BlobClient) Upload(ctx context.Context, container, blob string, data io.Reader) error {
    _, err := b.client.UploadStream(ctx, container, blob, data, nil)
    if err != nil {
        return fmt.Errorf("UploadStream: %w", err)
    }

    return nil
}

// Download downloads a blob
func (b *BlobClient) Download(ctx context.Context, container, blob string) ([]byte, error) {
    resp, err := b.client.DownloadStream(ctx, container, blob, nil)
    if err != nil {
        return nil, fmt.Errorf("DownloadStream: %w", err)
    }
    defer resp.Body.Close()

    data, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, fmt.Errorf("ReadAll: %w", err)
    }

    return data, nil
}

// List lists blobs in a container
func (b *BlobClient) List(ctx context.Context, container string) ([]string, error) {
    var blobs []string

    pager := b.client.NewListBlobsFlatPager(container, nil)
    for pager.More() {
        page, err := pager.NextPage(ctx)
        if err != nil {
            return nil, fmt.Errorf("NextPage: %w", err)
        }

        for _, blob := range page.Segment.BlobItems {
            blobs = append(blobs, *blob.Name)
        }
    }

    return blobs, nil
}

Azure Service Bus

Service Bus Messaging:

package azure

import (
    "context"
    "fmt"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

type ServiceBusClient struct {
    client *azservicebus.Client
}

func NewServiceBusClient(namespace string) (*ServiceBusClient, error) {
    cred, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        return nil, err
    }

    client, err := azservicebus.NewClient(namespace, cred, nil)
    if err != nil {
        return nil, err
    }

    return &ServiceBusClient{client: client}, nil
}

func (s *ServiceBusClient) Close(ctx context.Context) error {
    return s.client.Close(ctx)
}

// SendMessage sends a message to a queue
func (s *ServiceBusClient) SendMessage(ctx context.Context, queueName string, message []byte) error {
    sender, err := s.client.NewSender(queueName, nil)
    if err != nil {
        return fmt.Errorf("NewSender: %w", err)
    }
    defer sender.Close(ctx)

    err = sender.SendMessage(ctx, &azservicebus.Message{
        Body: message,
    }, nil)
    if err != nil {
        return fmt.Errorf("SendMessage: %w", err)
    }

    return nil
}

// ReceiveMessages receives messages from a queue
func (s *ServiceBusClient) ReceiveMessages(ctx context.Context, queueName string) error {
    receiver, err := s.client.NewReceiverForQueue(queueName, nil)
    if err != nil {
        return fmt.Errorf("NewReceiverForQueue: %w", err)
    }
    defer receiver.Close(ctx)

    messages, err := receiver.ReceiveMessages(ctx, 10, nil)
    if err != nil {
        return fmt.Errorf("ReceiveMessages: %w", err)
    }

    for _, msg := range messages {
        fmt.Printf("Received: %s\n", string(msg.Body))

        // Complete the message
        err := receiver.CompleteMessage(ctx, msg, nil)
        if err != nil {
            return fmt.Errorf("CompleteMessage: %w", err)
        }
    }

    return nil
}

Authentication Strategies

Authentication is like showing different types of ID to enter a building. You might use a keycard, a temporary visitor pass, or a guest badge. Cloud SDKs automatically try different authentication methods until one works.

AWS IAM Roles and Profiles

Environment-Based Authentication:

package auth

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
    "github.com/aws/aws-sdk-go-v2/service/sts"
)

// LoadConfig loads AWS config with various auth strategies
func LoadConfig(ctx context.Context, profile string) (aws.Config, error) {
    if profile != "" {
        // Use specific profile
        return config.LoadDefaultConfig(ctx,
            config.WithSharedConfigProfile(profile),
        )
    }

    // Use default credential chain
    return config.LoadDefaultConfig(ctx)
}

// AssumeRole assumes an IAM role for cross-account access
func AssumeRole(ctx context.Context, roleARN string) (aws.Config, error) {
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return aws.Config{}, err
    }

    stsClient := sts.NewFromConfig(cfg)

    creds := stscreds.NewAssumeRoleProvider(stsClient, roleARN, func(o *stscreds.AssumeRoleOptions) {
        o.RoleSessionName = "go-application"
    })

    cfg.Credentials = aws.NewCredentialsCache(creds)

    return cfg, nil
}

// StaticCredentials creates config from static credentials
func StaticCredentials(ctx context.Context, accessKey, secretKey, region string) (aws.Config, error) {
    return config.LoadDefaultConfig(ctx,
        config.WithCredentialsProvider(
            credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
        ),
        config.WithRegion(region),
    )
}

Real-World Example: A microservices application running on ECS uses different authentication strategies:

  • Local development: AWS profiles from ~/.aws/credentials
  • CI/CD pipeline: Environment variables from secure secrets
  • Production: IAM roles attached to ECS tasks
  • Cross-account access: Assume role to access shared services

💡 Key Takeaway: The AWS credential chain automatically handles all these scenarios. Your code doesn't need to change - just configure the environment appropriately.
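
As a minimal sketch of that idea, the snippet below lets one environment variable decide whether a named profile is used, assuming the LoadConfig helper above. The APP_AWS_PROFILE variable name and the import path are hypothetical.

package main

import (
    "context"
    "log"
    "os"

    auth "example.com/yourapp/internal/auth" // placeholder path for the auth helpers above
)

func main() {
    ctx := context.Background()

    // Locally, APP_AWS_PROFILE might name a shared-config profile; in CI and
    // production it stays empty so the default chain (env vars, task role,
    // instance role) resolves credentials on its own.
    profile := os.Getenv("APP_AWS_PROFILE")

    cfg, err := auth.LoadConfig(ctx, profile)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("resolved region: %s", cfg.Region)
}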

Google Cloud Service Accounts

GCP Authentication:

package auth

import (
    "context"

    "cloud.google.com/go/storage"
    "google.golang.org/api/option"
)

// DefaultAuth uses Application Default Credentials
func DefaultAuth(ctx context.Context) (*storage.Client, error) {
    // Checks:
    // 1. GOOGLE_APPLICATION_CREDENTIALS env var
    // 2. gcloud auth application-default login
    // 3. GCE/GKE metadata service
    return storage.NewClient(ctx)
}

// ServiceAccountAuth uses service account key file
func ServiceAccountAuth(ctx context.Context, keyFile string) (*storage.Client, error) {
    return storage.NewClient(ctx,
        option.WithCredentialsFile(keyFile),
    )
}

// ServiceAccountJSON uses service account JSON bytes
func ServiceAccountJSON(ctx context.Context, jsonKey []byte) (*storage.Client, error) {
    return storage.NewClient(ctx,
        option.WithCredentialsJSON(jsonKey),
    )
}

Azure Managed Identity

Azure Authentication:

package auth

import (
    "fmt"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// ManagedIdentityAuth uses Azure Managed Identity
func ManagedIdentityAuth(accountURL string) (*azblob.Client, error) {
    cred, err := azidentity.NewManagedIdentityCredential(nil)
    if err != nil {
        return nil, fmt.Errorf("NewManagedIdentityCredential: %w", err)
    }

    return azblob.NewClient(accountURL, cred, nil)
}

// DefaultAuth uses credential chain
func DefaultAuth(accountURL string) (*azblob.Client, error) {
    // Tries in order:
    // 1. Environment variables
    // 2. Managed Identity
    // 3. Azure CLI credentials
    cred, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        return nil, err
    }

    return azblob.NewClient(accountURL, cred, nil)
}

Error Handling and Retries

Network failures are like delivery trucks getting stuck in traffic - sometimes you need to try a different route or wait for the traffic to clear. Proper error handling and retry logic make your cloud applications resilient to temporary failures.

AWS SDK v2 Retry Strategies

Configurable Retry Behavior:

package retry

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/retry"
    "github.com/aws/aws-sdk-go-v2/config"
)

// ConfigureRetries configures retry behavior
func ConfigureRetries(ctx context.Context) (aws.Config, error) {
    return config.LoadDefaultConfig(ctx,
        // Standard retry mode
        config.WithRetryMode(aws.RetryModeStandard),

        // Adaptive mode adjusts based on service responses
        // config.WithRetryMode(aws.RetryModeAdaptive),

        config.WithRetryMaxAttempts(5),
    )
}

// CustomRetryer implements custom retry logic on top of the standard retryer
type CustomRetryer struct {
    standard *retry.Standard
}

func NewCustomRetryer() *CustomRetryer {
    return &CustomRetryer{
        standard: retry.NewStandard(func(o *retry.StandardOptions) {
            o.MaxAttempts = 5
            o.MaxBackoff = 30 * time.Second
        }),
    }
}

func (r *CustomRetryer) IsErrorRetryable(err error) bool {
    // Custom logic to determine if error is retryable
    return r.standard.IsErrorRetryable(err)
}

func (r *CustomRetryer) MaxAttempts() int {
    return r.standard.MaxAttempts()
}

func (r *CustomRetryer) RetryDelay(attempt int, err error) (time.Duration, error) {
    // Custom backoff strategy
    delay := time.Duration(attempt) * time.Second
    if delay > 30*time.Second {
        delay = 30 * time.Second
    }
    return delay, nil
}

func (r *CustomRetryer) GetRetryToken(ctx context.Context, err error) (func(error) error, error) {
    return r.standard.GetRetryToken(ctx, err)
}

func (r *CustomRetryer) GetInitialToken() func(error) error {
    return r.standard.GetInitialToken()
}

Common Pitfalls in Error Handling:

  • Treating all errors equally: Some errors should never be retried
  • No exponential backoff: Fixed delays can overwhelm services during outages
  • Missing context cancellation: Retries should respect context cancellation
  • Inadequate logging: Not logging retry attempts makes debugging difficult

When to use Standard vs Adaptive retry mode:

  • Standard mode: Predictable retry behavior, good for most applications
  • Adaptive mode: Adjusts based on service-side response, better for high-throughput applications that can handle variable latency
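
To make the backoff and cancellation points concrete, here is a minimal, illustrative sketch of an application-level retry helper with exponential backoff, jitter, and context awareness. The package name, function name, and parameters are assumptions, not part of any SDK.

package retryutil

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

// DoWithBackoff retries fn up to maxAttempts, doubling the delay each attempt
// and adding full jitter, while respecting ctx cancellation between attempts.
func DoWithBackoff(ctx context.Context, maxAttempts int, base time.Duration, fn func() error) error {
    var err error
    delay := base

    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = fn(); err == nil {
            return nil
        }

        if attempt == maxAttempts {
            break
        }

        // Full jitter: sleep a random duration in [0, delay).
        sleep := time.Duration(rand.Int63n(int64(delay)))

        select {
        case <-ctx.Done():
            return ctx.Err() // stop retrying when the caller gives up
        case <-time.After(sleep):
        }

        delay *= 2
    }

    return fmt.Errorf("all %d attempts failed: %w", maxAttempts, err)
}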

Error Classification

Production Error Handling:

package errorutil

import (
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/aws/smithy-go"
    "google.golang.org/api/googleapi"
)

// AWS Error Handling
func HandleAWSError(err error) {
    var ae smithy.APIError
    if errors.As(err, &ae) {
        log.Printf("AWS API Error: %s - %s", ae.ErrorCode(), ae.ErrorMessage())

        switch ae.ErrorCode() {
        case "NoSuchBucket":
            // Handle missing bucket
        case "AccessDenied":
            // Handle permission error
        case "InvalidAccessKeyId":
            // Handle auth error
        }
    }
}

// GCP Error Handling
func HandleGCPError(err error) {
    var gerr *googleapi.Error
    if errors.As(err, &gerr) {
        log.Printf("GCP Error: %d - %s", gerr.Code, gerr.Message)

        switch gerr.Code {
        case 404:
            // Handle not found
        case 403:
            // Handle permission denied
        case 429:
            // Handle rate limit
        }
    }
}

// Circuit Breaker for SDK calls
type CircuitBreaker struct {
    maxFailures int
    resetTime   time.Duration
    failures    int
    lastFailure time.Time
    state       string // "closed", "open", "half-open"
}

func NewCircuitBreaker(maxFailures int, resetTime time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures: maxFailures,
        resetTime:   resetTime,
        state:       "closed",
    }
}

func (cb *CircuitBreaker) Call(fn func() error) error {
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.resetTime {
            cb.state = "half-open"
        } else {
            return fmt.Errorf("circuit breaker is open")
        }
    }

    err := fn()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()

        if cb.failures >= cb.maxFailures {
            cb.state = "open"
        }
        return err
    }

    if cb.state == "half-open" {
        cb.state = "closed"
        cb.failures = 0
    }

    return nil
}

Multi-Cloud Abstraction

Multi-cloud abstraction is like having a universal remote control that works with any TV brand. Instead of learning different controls for Samsung, LG, and Sony, you have one interface that translates your commands appropriately for each device.

Storage Abstraction Layer

Cloud-Agnostic Storage Interface:

package storage

import (
    "context"
    "io"
    "strings"
    "time"

    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    // awsutil and gcp refer to the wrapper packages defined earlier in this article;
    // import them from wherever they live in your module.
)

// Storage defines cloud-agnostic storage interface
type Storage interface {
    Upload(ctx context.Context, path string, data io.Reader) error
    Download(ctx context.Context, path string) ([]byte, error)
    Delete(ctx context.Context, path string) error
    List(ctx context.Context, prefix string) ([]string, error)
    GenerateURL(ctx context.Context, path string, expiry time.Duration) (string, error)
}

// S3Storage implements Storage for AWS S3
type S3Storage struct {
    client *awsutil.S3Client
    bucket string
}

func NewS3Storage(client *awsutil.S3Client, bucket string) *S3Storage {
    return &S3Storage{
        client: client,
        bucket: bucket,
    }
}

func (s *S3Storage) Upload(ctx context.Context, path string, data io.Reader) error {
    return s.client.Upload(ctx, s.bucket, path, data, nil)
}

func (s *S3Storage) Download(ctx context.Context, path string) ([]byte, error) {
    // S3Client.Download expects an io.WriterAt, so use the manager's in-memory buffer
    buf := manager.NewWriteAtBuffer(nil)
    err := s.client.Download(ctx, s.bucket, path, buf)
    return buf.Bytes(), err
}

func (s *S3Storage) Delete(ctx context.Context, path string) error {
    return s.client.DeleteObjects(ctx, s.bucket, []string{path})
}

func (s *S3Storage) List(ctx context.Context, prefix string) ([]string, error) {
    objects, err := s.client.ListObjects(ctx, s.bucket, prefix)
    if err != nil {
        return nil, err
    }

    var paths []string
    for _, obj := range objects {
        paths = append(paths, *obj.Key)
    }
    return paths, nil
}

func (s *S3Storage) GenerateURL(ctx context.Context, path string, expiry time.Duration) (string, error) {
    return s.client.GeneratePresignedURL(ctx, s.bucket, path, expiry)
}

// GCSStorage implements Storage for Google Cloud Storage
type GCSStorage struct {
    client *gcp.GCSClient
    bucket string
}

func NewGCSStorage(client *gcp.GCSClient, bucket string) *GCSStorage {
    return &GCSStorage{
        client: client,
        bucket: bucket,
    }
}

func (g *GCSStorage) Upload(ctx context.Context, path string, data io.Reader) error {
    return g.client.Upload(ctx, g.bucket, path, data)
}

func (g *GCSStorage) Download(ctx context.Context, path string) ([]byte, error) {
    return g.client.Download(ctx, g.bucket, path)
}

func (g *GCSStorage) Delete(ctx context.Context, path string) error {
    // Implement delete
    return nil
}

func (g *GCSStorage) List(ctx context.Context, prefix string) ([]string, error) {
    return g.client.List(ctx, g.bucket, prefix)
}

func (g *GCSStorage) GenerateURL(ctx context.Context, path string, expiry time.Duration) (string, error) {
    return g.client.SignedURL(ctx, g.bucket, path, expiry)
}

// Usage example
func Example() {
    ctx := context.Background()

    var store Storage

    // Use S3
    var s3Client *awsutil.S3Client // ... create S3 client as shown earlier
    store = NewS3Storage(s3Client, "my-bucket")

    // Or use GCS
    var gcsClient *gcp.GCSClient // ... create GCS client as shown earlier
    store = NewGCSStorage(gcsClient, "my-bucket")

    // Use same interface
    store.Upload(ctx, "file.txt", strings.NewReader("data"))
}

Multi-Provider Message Queue Abstraction

Queue Abstraction for SQS, Pub/Sub, and Service Bus:

package queue

import (
    "context"
    "time"
)

// Message represents a queue message
type Message struct {
    ID            string
    Body          []byte
    Attributes    map[string]string
    ReceiptHandle string
    PublishTime   time.Time
}

// Queue defines cloud-agnostic message queue interface
type Queue interface {
    Send(ctx context.Context, body []byte, attributes map[string]string) error
    SendBatch(ctx context.Context, messages []Message) error
    Receive(ctx context.Context, maxMessages int) ([]Message, error)
    Delete(ctx context.Context, receiptHandle string) error
    DeleteBatch(ctx context.Context, receiptHandles []string) error
    ChangeVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error
}

// SQSQueue implements Queue for AWS SQS
type SQSQueue struct {
    client   *awsutil.SQSClient
    queueURL string
}

func NewSQSQueue(client *awsutil.SQSClient, queueURL string) *SQSQueue {
    return &SQSQueue{
        client:   client,
        queueURL: queueURL,
    }
}

func (q *SQSQueue) Send(ctx context.Context, body []byte, attributes map[string]string) error {
    // Convert attributes to SQS format
    return q.client.SendMessage(ctx, body)
}

func (q *SQSQueue) Receive(ctx context.Context, maxMessages int) ([]Message, error) {
    sqsMessages, err := q.client.ReceiveMessages(ctx, int32(maxMessages))
    if err != nil {
        return nil, err
    }

    messages := make([]Message, len(sqsMessages))
    for i, msg := range sqsMessages {
        messages[i] = Message{
            ID:            *msg.MessageId,
            Body:          []byte(*msg.Body),
            ReceiptHandle: *msg.ReceiptHandle,
        }
    }
    return messages, nil
}

// PubSubQueue implements Queue for Google Cloud Pub/Sub
type PubSubQueue struct {
    client         *gcp.PubSubClient
    topicID        string
    subscriptionID string
}

func NewPubSubQueue(client *gcp.PubSubClient, topicID, subscriptionID string) *PubSubQueue {
    return &PubSubQueue{
        client:         client,
        topicID:        topicID,
        subscriptionID: subscriptionID,
    }
}

func (q *PubSubQueue) Send(ctx context.Context, body []byte, attributes map[string]string) error {
    return q.client.Publish(ctx, q.topicID, body)
}

func (q *PubSubQueue) Receive(ctx context.Context, maxMessages int) ([]Message, error) {
    // Pub/Sub uses pull subscriptions
    var messages []Message
    // Implementation details...
    return messages, nil
}

// ServiceBusQueue implements Queue for Azure Service Bus
type ServiceBusQueue struct {
    client    *azure.ServiceBusClient
    queueName string
}

func NewServiceBusQueue(client *azure.ServiceBusClient, queueName string) *ServiceBusQueue {
    return &ServiceBusQueue{
        client:    client,
        queueName: queueName,
    }
}

func (q *ServiceBusQueue) Send(ctx context.Context, body []byte, attributes map[string]string) error {
    return q.client.SendMessage(ctx, q.queueName, body)
}

func (q *ServiceBusQueue) Receive(ctx context.Context, maxMessages int) ([]Message, error) {
    // Azure Service Bus implementation
    var messages []Message
    // Implementation details...
    return messages, nil
}

Database Abstraction Layer

Multi-Cloud Database Interface:

package database

import (
    "context"
)

// Document represents a database document
type Document map[string]interface{}

// QueryResult represents query results
type QueryResult struct {
    Items     []Document
    NextToken string
    Count     int
}

// Database defines cloud-agnostic NoSQL database interface
type Database interface {
    Put(ctx context.Context, table string, item Document) error
    Get(ctx context.Context, table string, key Document) (Document, error)
    Query(ctx context.Context, table string, filter Document) (*QueryResult, error)
    Update(ctx context.Context, table string, key Document, updates Document) error
    Delete(ctx context.Context, table string, key Document) error
    BatchWrite(ctx context.Context, table string, items []Document) error
}

// DynamoDBDatabase implements Database for AWS DynamoDB
type DynamoDBDatabase struct {
    client *awsutil.DynamoDBClient
}

func NewDynamoDBDatabase(client *awsutil.DynamoDBClient) *DynamoDBDatabase {
    return &DynamoDBDatabase{client: client}
}

func (db *DynamoDBDatabase) Put(ctx context.Context, table string, item Document) error {
    // Convert Document to DynamoDB AttributeValue
    // Use PutItem API
    return nil
}

func (db *DynamoDBDatabase) Query(ctx context.Context, table string, filter Document) (*QueryResult, error) {
    // Build DynamoDB query expression
    // Execute query with pagination
    return &QueryResult{}, nil
}

// FirestoreDatabase implements Database for Google Cloud Firestore
type FirestoreDatabase struct {
    client *gcp.FirestoreClient
}

func NewFirestoreDatabase(client *gcp.FirestoreClient) *FirestoreDatabase {
    return &FirestoreDatabase{client: client}
}

func (db *FirestoreDatabase) Put(ctx context.Context, table string, item Document) error {
    // Use Firestore Set operation
    return db.client.Create(ctx, table, item["id"].(string), item)
}

func (db *FirestoreDatabase) Query(ctx context.Context, table string, filter Document) (*QueryResult, error) {
    // Build Firestore query
    // Execute with filters
    return &QueryResult{}, nil
}

// CosmosDBDatabase implements Database for Azure Cosmos DB
type CosmosDBDatabase struct {
    // Azure Cosmos DB client
}

func NewCosmosDBDatabase() *CosmosDBDatabase {
    return &CosmosDBDatabase{}
}

func (db *CosmosDBDatabase) Put(ctx context.Context, table string, item Document) error {
    // Use Cosmos DB upsert
    return nil
}

func (db *CosmosDBDatabase) Query(ctx context.Context, table string, filter Document) (*QueryResult, error) {
    // Execute SQL query on Cosmos DB
    return &QueryResult{}, nil
}

Real-World Multi-Cloud Strategy: A financial services company uses this abstraction to run the same application on AWS (primary), GCP (disaster recovery), and Azure (regulatory compliance). The abstraction layer allows them to:

  • Switch providers with minimal code changes
  • Run multi-region deployments across different clouds
  • Avoid vendor lock-in
  • Optimize costs by using the cheapest provider for each service

💡 Key Takeaway: Multi-cloud abstraction adds complexity but provides flexibility. Only implement it if you have genuine multi-cloud requirements. For most applications, optimizing for a single cloud provider yields better performance and lower costs.
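
If you do need that flexibility, a small factory keeps the provider decision in one place. The sketch below assumes the Storage interface and the S3/GCS implementations defined above; the provider values and constructor wiring are illustrative only.

package storage

import "fmt"

// NewStorageFromConfig picks an implementation based on a configuration value
// such as an environment variable (e.g. PROVIDER=aws or PROVIDER=gcp).
func NewStorageFromConfig(provider, bucket string, s3Client *awsutil.S3Client, gcsClient *gcp.GCSClient) (Storage, error) {
    switch provider {
    case "aws":
        return NewS3Storage(s3Client, bucket), nil
    case "gcp":
        return NewGCSStorage(gcsClient, bucket), nil
    default:
        return nil, fmt.Errorf("unknown storage provider %q", provider)
    }
}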

Production Best Practices

Connection Pooling

Connection pooling is like having a dedicated team of delivery workers always ready, instead of hiring new people for each delivery. This eliminates the overhead of constantly establishing new connections.

Reuse Clients Across Requests:

package best

import (
    "context"
    "sync"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// ClientPool manages reusable SDK clients
type ClientPool struct {
    s3Client *s3.Client
    once     sync.Once
}

func (p *ClientPool) GetS3Client(ctx context.Context) (*s3.Client, error) {
    var err error
    p.once.Do(func() {
        var cfg aws.Config
        cfg, err = config.LoadDefaultConfig(ctx)
        if err != nil {
            return
        }
        p.s3Client = s3.NewFromConfig(cfg)
    })

    return p.s3Client, err
}

// Global client pool
var globalPool = &ClientPool{}

func GetS3Client(ctx context.Context) (*s3.Client, error) {
    return globalPool.GetS3Client(ctx)
}

Real-World Impact: A web application that reuses S3 clients saw a 90% reduction in API latency and 80% fewer connection errors after implementing proper client reuse.

⚠️ Important: Never store credentials in global variables. Only store the configured clients. The credential chain should be evaluated at runtime to handle different environments securely.

Metrics and Monitoring

Instrument SDK Calls:

package monitoring

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    sdkCallDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "cloud_sdk_call_duration_seconds",
            Help:    "Duration of cloud SDK calls",
            Buckets: prometheus.DefBuckets,
        },
        []string{"service", "operation", "status"},
    )

    sdkCallTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "cloud_sdk_calls_total",
            Help: "Total number of cloud SDK calls",
        },
        []string{"service", "operation", "status"},
    )
)

// InstrumentedS3Client wraps S3 client with metrics
type InstrumentedS3Client struct {
    client *s3.Client
}

func (i *InstrumentedS3Client) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
    start := time.Now()

    output, err := i.client.GetObject(ctx, input)

    status := "success"
    if err != nil {
        status = "error"
    }

    duration := time.Since(start).Seconds()
    sdkCallDuration.WithLabelValues("s3", "GetObject", status).Observe(duration)
    sdkCallTotal.WithLabelValues("s3", "GetObject", status).Inc()

    return output, err
}

Cost Optimization

Cloud services are like utility bills - you pay for what you use. Smart optimization can reduce your bill by 50-90% while maintaining or even improving performance.

Reduce API Calls:

 1package optimization
 2
 3import (
 4    "context"
 5    "sync"
 6    "time"
 7)
 8
 9// CachedStorage adds caching layer to storage
10type CachedStorage struct {
11    storage Storage
12    cache   map[string]cachedItem
13    mu      sync.RWMutex
14    ttl     time.Duration
15}
16
17type cachedItem struct {
18    data      []byte
19    expiresAt time.Time
20}
21
22func NewCachedStorage(storage Storage, ttl time.Duration) *CachedStorage {
23    return &CachedStorage{
24        storage: storage,
25        cache:   make(map[string]cachedItem),
26        ttl:     ttl,
27    }
28}
29
30func (c *CachedStorage) Download(ctx context.Context, path string) ([]byte, error) {
31    // Check cache
32    c.mu.RLock()
33    if item, ok := c.cache[path]; ok && time.Now().Before(item.expiresAt) {
34        c.mu.RUnlock()
35        return item.data, nil
36    }
37    c.mu.RUnlock()
38
39    // Fetch from storage
40    data, err := c.storage.Download(ctx, path)
41    if err != nil {
42        return nil, err
43    }
44
45    // Update cache
46    c.mu.Lock()
47    c.cache[path] = cachedItem{
48        data:      data,
49        expiresAt: time.Now().Add(c.ttl),
50    }
51    c.mu.Unlock()
52
53    return data, nil
54}
55
56// BatchUpload uploads multiple files concurrently with bounded parallelism
57func BatchUpload(ctx context.Context, storage Storage, files map[string]io.Reader) error {
58    errCh := make(chan error, len(files))
59    sem := make(chan struct{}, 10) // Limit concurrency
60
61    for path, data := range files {
62        sem <- struct{}{}
63        go func(p string, d io.Reader) {
64            defer func() { <-sem }()
65            errCh <- storage.Upload(ctx, p, d)
66        }(path, data)
67    }
68
69    // Wait for all uploads
70    for i := 0; i < len(files); i++ {
71        if err := <-errCh; err != nil {
72            return err
73        }
74    }
75
76    return nil
77}

Real-World Savings: An image processing service implemented these optimizations:

  • Caching: Reduced S3 GET requests by 80%
  • Batch operations: Reduced DynamoDB writes by 90%
  • Overall savings: $1,200/month reduction in AWS costs
  • Performance improvement: 60% faster response times due to cache hits

Cost Optimization vs Performance Trade-offs:

  • More caching: Lower costs, faster reads, but potential stale data
  • Larger batch sizes: Fewer API calls but higher memory usage
  • Aggressive timeouts: Lower costs but more failures
  • Compression: Lower transfer costs but higher CPU usage

💡 Key Takeaway: Always measure before and after optimizations. Sometimes the "optimization" actually increases costs due to increased complexity or memory usage.
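
One way to make that measurement concrete is to count cache hits and misses right next to the code being optimized and compare the ratio (and the provider's request bill) before and after rollout. The sketch below is a hedged extension of the CachedStorage type above; the metric names and the DownloadInstrumented helper are illustrative, not part of any SDK.

 1package optimization
 2
 3import (
 4    "context"
 5
 6    "github.com/prometheus/client_golang/prometheus"
 7    "github.com/prometheus/client_golang/prometheus/promauto"
 8)
 9
10// Illustrative counters: the hit/miss ratio shows how many provider API
11// calls the cache is actually saving.
12var (
13    cacheHits   = promauto.NewCounter(prometheus.CounterOpts{Name: "storage_cache_hits_total"})
14    cacheMisses = promauto.NewCounter(prometheus.CounterOpts{Name: "storage_cache_misses_total"})
15)
16
17// DownloadInstrumented records whether a path was already cached before
18// delegating to the Download method defined above. The check is approximate:
19// an expired entry still counts as a hit here.
20func (c *CachedStorage) DownloadInstrumented(ctx context.Context, path string) ([]byte, error) {
21    c.mu.RLock()
22    _, cached := c.cache[path]
23    c.mu.RUnlock()
24
25    if cached {
26        cacheHits.Inc()
27    } else {
28        cacheMisses.Inc()
29    }
30    return c.Download(ctx, path)
31}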

IAM and Authentication Patterns

Identity and Access Management (IAM) is like the security system for your cloud applications. Just as a building has key cards, security guards, and visitor logs, cloud IAM provides authentication (who you are) and authorization (what you can do).

Cross-Account Access Patterns

AWS Cross-Account Access with Assume Role:

 1package iam
 2
 3import (
 4    "context"
 5    "fmt"
 6    "time"
 7
 8    "github.com/aws/aws-sdk-go-v2/aws"
 9    "github.com/aws/aws-sdk-go-v2/config"
10    "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
11    "github.com/aws/aws-sdk-go-v2/service/sts"
12    "github.com/aws/aws-sdk-go-v2/service/s3"
13)
14
15// CrossAccountClient manages cross-account access
16type CrossAccountClient struct {
17    accounts map[string]aws.Config
18}
19
20func NewCrossAccountClient() *CrossAccountClient {
21    return &CrossAccountClient{
22        accounts: make(map[string]aws.Config),
23    }
24}
25
26// AssumeRole creates credentials for cross-account access
27func AssumeRole(ctx context.Context, roleARN, externalID string, duration time.Duration) (aws.Config, error) {
28    // Load default config
29    cfg, err := config.LoadDefaultConfig(ctx)
30    if err != nil {
31        return aws.Config{}, fmt.Errorf("failed to load config: %w", err)
32    }
33
34    // Create STS client
35    stsClient := sts.NewFromConfig(cfg)
36
37    // Create assume role provider
38    provider := stscreds.NewAssumeRoleProvider(stsClient, roleARN, func(o *stscreds.AssumeRoleOptions) {
39        o.RoleSessionName = "go-application"
40        o.ExternalID = aws.String(externalID)
41        o.Duration = duration
42    })
43
44    // Create new config with assumed role credentials
45    cfg.Credentials = aws.NewCredentialsCache(provider)
46
47    return cfg, nil
48}
49
50// GetS3ClientForAccount gets S3 client for specific account
51func (c *CrossAccountClient) GetS3ClientForAccount(ctx context.Context, accountID, roleARN, externalID string) (*s3.Client, error) {
52    // Check cache first
53    if cfg, ok := c.accounts[accountID]; ok {
54        return s3.NewFromConfig(cfg), nil
55    }
56
57    // Assume role
58    cfg, err := AssumeRole(ctx, roleARN, externalID, 1*time.Hour)
59    if err != nil {
60        return nil, fmt.Errorf("failed to assume role: %w", err)
61    }
62
63    // Cache configuration
64    c.accounts[accountID] = cfg
65
66    return s3.NewFromConfig(cfg), nil
67}
68
69// RefreshCredentials refreshes expired credentials
70func (c *CrossAccountClient) RefreshCredentials(ctx context.Context, accountID, roleARN, externalID string) error {
71    cfg, err := AssumeRole(ctx, roleARN, externalID, 1*time.Hour)
72    if err != nil {
73        return err
74    }
75
76    c.accounts[accountID] = cfg
77    return nil
78}

Real-World Scenario: A SaaS platform stores customer data in separate AWS accounts for isolation. The application uses cross-account access to do the following (a per-tenant usage sketch follows the list):

  • Read/write customer data from isolated accounts
  • Maintain security boundaries between customers
  • Simplify billing and cost tracking per customer
  • Meet compliance requirements for data isolation
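
A usage sketch for that scenario, assuming the CrossAccountClient above: the TenantRole mapping and the ReadTenantObject helper are illustrative application-side names, not part of the AWS SDK.

 1package iam
 2
 3import (
 4    "context"
 5    "fmt"
 6    "io"
 7
 8    "github.com/aws/aws-sdk-go-v2/aws"
 9    "github.com/aws/aws-sdk-go-v2/service/s3"
10)
11
12// TenantRole maps a customer account to the role the platform assumes for it.
13// Where the mapping lives (config file, database) is an application concern.
14type TenantRole struct {
15    AccountID  string
16    RoleARN    string
17    ExternalID string
18}
19
20// ReadTenantObject fetches an object from a tenant's isolated account using
21// the CrossAccountClient defined above.
22func ReadTenantObject(ctx context.Context, c *CrossAccountClient, t TenantRole, bucket, key string) ([]byte, error) {
23    client, err := c.GetS3ClientForAccount(ctx, t.AccountID, t.RoleARN, t.ExternalID)
24    if err != nil {
25        return nil, fmt.Errorf("assume role for tenant %s: %w", t.AccountID, err)
26    }
27
28    out, err := client.GetObject(ctx, &s3.GetObjectInput{
29        Bucket: aws.String(bucket),
30        Key:    aws.String(key),
31    })
32    if err != nil {
33        return nil, err
34    }
35    defer out.Body.Close()
36
37    return io.ReadAll(out.Body)
38}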

Service Account Management

Google Cloud Service Account Patterns:

 1package iam
 2
 3import (
 4    "context"
 5    "encoding/json"
 6    "fmt"
 7
 8    "cloud.google.com/go/storage"
 9    "golang.org/x/oauth2/google"
10    "google.golang.org/api/option"
11)
12
13// ServiceAccountManager manages GCP service accounts
14type ServiceAccountManager struct {
15    projectID string
16}
17
18func NewServiceAccountManager(projectID string) *ServiceAccountManager {
19    return &ServiceAccountManager{projectID: projectID}
20}
21
22// CreateClientWithServiceAccount creates client using service account
23func (sam *ServiceAccountManager) CreateClientWithServiceAccount(ctx context.Context, keyJSON []byte) (*storage.Client, error) {
24    creds, err := google.CredentialsFromJSON(ctx, keyJSON, storage.ScopeReadWrite)
25    if err != nil {
26        return nil, fmt.Errorf("failed to create credentials: %w", err)
27    }
28
29    client, err := storage.NewClient(ctx, option.WithCredentials(creds))
30    if err != nil {
31        return nil, fmt.Errorf("failed to create client: %w", err)
32    }
33
34    return client, nil
35}
36
37// ImpersonateServiceAccount impersonates a service account
38func (sam *ServiceAccountManager) ImpersonateServiceAccount(ctx context.Context, targetServiceAccount string) (*storage.Client, error) {
39    // Load Application Default Credentials as the base identity
40    creds, err := google.FindDefaultCredentials(ctx, storage.ScopeReadWrite)
41    if err != nil {
42        return nil, err
43    }
44
45    // Impersonate the target service account via google.golang.org/api/impersonate
46    ts, err := impersonate.CredentialsTokenSource(ctx,
47        impersonate.CredentialsConfig{
48            TargetPrincipal: targetServiceAccount,
49            Scopes:          []string{storage.ScopeReadWrite},
50        },
51        option.WithCredentials(creds),
52    )
53    if err != nil {
54        return nil, err
55    }
56
57    client, err := storage.NewClient(ctx, option.WithTokenSource(ts))
58    if err != nil {
59        return nil, err
60    }
61    return client, nil
62}
63
64// RotateServiceAccountKey demonstrates key rotation pattern
65func (sam *ServiceAccountManager) RotateServiceAccountKey(ctx context.Context, oldKeyJSON, newKeyJSON []byte) error {
66    // Create new client with new key
67    newClient, err := sam.CreateClientWithServiceAccount(ctx, newKeyJSON)
68    if err != nil {
69        return fmt.Errorf("failed to create client with new key: %w", err)
70    }
71    defer newClient.Close()
72
73    // Test new credentials
74    bucket := newClient.Bucket("test-bucket")
75    if _, err := bucket.Attrs(ctx); err != nil {
76        return fmt.Errorf("new credentials test failed: %w", err)
77    }
78
79    // Old credentials can now be safely deleted
80    return nil
81}

Workload Identity Federation

Azure Managed Identity with Workload Identity:

 1package iam
 2
 3import (
 4    "context"
 5    "fmt"
 6    "github.com/Azure/azure-sdk-for-go/sdk/azcore"
 7    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
 8    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
 9)
10
11// WorkloadIdentityClient manages workload identity
12type WorkloadIdentityClient struct {
13    credential *azidentity.WorkloadIdentityCredential
14}
15
16func NewWorkloadIdentityClient(ctx context.Context, tenantID, clientID, tokenFilePath string) (*WorkloadIdentityClient, error) {
17    cred, err := azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{
18        TenantID:      tenantID,
19        ClientID:      clientID,
20        TokenFilePath: tokenFilePath,
21    })
22    if err != nil {
23        return nil, fmt.Errorf("failed to create workload identity credential: %w", err)
24    }
25
26    return &WorkloadIdentityClient{credential: cred}, nil
27}
28
29// GetBlobClient creates blob client with workload identity
30func (w *WorkloadIdentityClient) GetBlobClient(ctx context.Context, accountURL string) (*azblob.Client, error) {
31    client, err := azblob.NewClient(accountURL, w.credential, nil)
32    if err != nil {
33        return nil, fmt.Errorf("failed to create blob client: %w", err)
34    }
35
36    return client, nil
37}
38
39// ChainedCredentials demonstrates credential chaining
40func ChainedCredentials(ctx context.Context) (*azidentity.ChainedTokenCredential, error) {
41    var sources []azcore.TokenCredential // credentials that could be constructed, in priority order
42    if managed, err := azidentity.NewManagedIdentityCredential(nil); err == nil {
43        sources = append(sources, managed) // First try: Managed Identity (Azure VMs, App Service, AKS)
44    }
45    if env, err := azidentity.NewEnvironmentCredential(nil); err == nil {
46        sources = append(sources, env) // Second try: environment variables
47    }
48    if cli, err := azidentity.NewAzureCLICredential(nil); err == nil {
49        sources = append(sources, cli) // Third try: Azure CLI credentials
50    }
51    cred, err := azidentity.NewChainedTokenCredential(sources, nil)
52    if err != nil {
53        return nil, fmt.Errorf("failed to create chained credential: %w", err)
54    }
55    return cred, nil
56}

Production Best Practice: Use workload identity instead of storing credentials as secrets. This eliminates credential leakage risks, simplifies rotation, and provides built-in credential lifecycle management.
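
As a small, hedged sketch of what that looks like on AKS: the workload identity webhook injects AZURE_TENANT_ID, AZURE_CLIENT_ID, and AZURE_FEDERATED_TOKEN_FILE into the pod, so the credential can be built from the environment rather than from a mounted secret. The helper name below is illustrative and simply reuses the constructor defined earlier.

 1package iam
 2
 3import (
 4    "context"
 5    "fmt"
 6    "os"
 7)
 8
 9// NewWorkloadIdentityClientFromEnv builds the client from the environment
10// variables injected by the AKS workload identity webhook, so no client
11// secret has to be stored, mounted, or rotated by the application.
12func NewWorkloadIdentityClientFromEnv(ctx context.Context) (*WorkloadIdentityClient, error) {
13    tenantID := os.Getenv("AZURE_TENANT_ID")
14    clientID := os.Getenv("AZURE_CLIENT_ID")
15    tokenFile := os.Getenv("AZURE_FEDERATED_TOKEN_FILE")
16
17    if tenantID == "" || clientID == "" || tokenFile == "" {
18        return nil, fmt.Errorf("workload identity environment variables are not set")
19    }
20
21    return NewWorkloadIdentityClient(ctx, tenantID, clientID, tokenFile)
22}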

Network and VPC Configurations

Network configuration is like designing the plumbing and electrical systems for a building. You need to plan how data flows between services, where firewalls should be, and how to connect to external systems.

VPC Endpoint Configuration

AWS VPC Endpoint for S3 Access:

 1package network
 2
 3import (
 4    "context"
 5    "fmt"
 6
 7    "github.com/aws/aws-sdk-go-v2/aws"
 8    "github.com/aws/aws-sdk-go-v2/config"
 9    "github.com/aws/aws-sdk-go-v2/service/s3"
10)
11
12// VPCEndpointClient configures S3 access via VPC endpoint
13type VPCEndpointClient struct {
14    client *s3.Client
15}
16
17func NewVPCEndpointClient(ctx context.Context, vpcEndpointURL string) (*VPCEndpointClient, error) {
18    cfg, err := config.LoadDefaultConfig(ctx)
19    if err != nil {
20        return nil, err
21    }
22
23    // Configure S3 to use VPC endpoint
24    client := s3.NewFromConfig(cfg, func(o *s3.Options) {
25        // Use custom endpoint for VPC
26        o.BaseEndpoint = aws.String(vpcEndpointURL)
27        // Force path-style addressing for VPC endpoints
28        o.UsePathStyle = true
29    })
30
31    return &VPCEndpointClient{client: client}, nil
32}
33
34// AccessS3ViaVPC accesses S3 through VPC endpoint
35func (v *VPCEndpointClient) AccessS3ViaVPC(ctx context.Context, bucket, key string) ([]byte, error) {
36    // This request goes through VPC endpoint, not public internet
37    result, err := v.client.GetObject(ctx, &s3.GetObjectInput{
38        Bucket: aws.String(bucket),
39        Key:    aws.String(key),
40    })
41    if err != nil {
42        return nil, err
43    }
44    defer result.Body.Close()
45
46    return io.ReadAll(result.Body)
47}

Cost Impact: VPC endpoints eliminate data transfer costs for traffic between your VPC and AWS services. For high-volume applications, this can save thousands of dollars monthly while improving security by keeping traffic off the public internet.

Private Service Connect

Google Cloud Private Service Connect:

 1package network
 2
 3import (
 4    "context"
 5    "fmt"
 6    "io"
 7    "cloud.google.com/go/storage"
 8    "google.golang.org/api/option"
 9)
10
11// PrivateServiceConnectClient uses Private Service Connect
12type PrivateServiceConnectClient struct {
13    client *storage.Client
14}
15
16func NewPrivateServiceConnectClient(ctx context.Context, endpoint string) (*PrivateServiceConnectClient, error) {
17    // Configure client to use private endpoint
18    client, err := storage.NewClient(ctx,
19        option.WithEndpoint(endpoint),
20    )
21    if err != nil {
22        return nil, fmt.Errorf("failed to create client: %w", err)
23    }
24
25    return &PrivateServiceConnectClient{client: client}, nil
26}
27
28// AccessStoragePrivately accesses storage via private network
29func (p *PrivateServiceConnectClient) AccessStoragePrivately(ctx context.Context, bucket, object string) ([]byte, error) {
30    // Traffic goes through Private Service Connect
31    rc, err := p.client.Bucket(bucket).Object(object).NewReader(ctx)
32    if err != nil {
33        return nil, err
34    }
35    defer rc.Close()
36
37    return io.ReadAll(rc)
38}

Network Security Groups

Azure NSG-Aware Client Configuration:

 1package network
 2
 3import (
 4    "context"
 5    "fmt"
 6    "net"
 7    "net/http"
 8    "time"
 9
10    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
11    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
12)
13
14// SecureNetworkClient configures secure network access
15type SecureNetworkClient struct {
16    client *azblob.Client
17}
18
19func NewSecureNetworkClient(ctx context.Context, accountURL string) (*SecureNetworkClient, error) {
20    cred, err := azidentity.NewDefaultAzureCredential(nil)
21    if err != nil {
22        return nil, err
23    }
24
25    // Configure HTTP client with network restrictions
26    httpClient := &http.Client{
27        Transport: &http.Transport{
28            DialContext: (&net.Dialer{
29                Timeout:   30 * time.Second,
30                KeepAlive: 30 * time.Second,
31            }).DialContext,
32            MaxIdleConns:          100,
33            IdleConnTimeout:       90 * time.Second,
34            TLSHandshakeTimeout:   10 * time.Second,
35            ExpectContinueTimeout: 1 * time.Second,
36            // Disable HTTP/2 for better connection pooling control
37            ForceAttemptHTTP2: false,
38        },
39        Timeout: 60 * time.Second,
40    }
41
42    client, err := azblob.NewClient(accountURL, cred, &azblob.ClientOptions{
43        ClientOptions: azcore.ClientOptions{
44            Transport: httpClient,
45        },
46    })
47    if err != nil {
48        return nil, err
49    }
50
51    return &SecureNetworkClient{client: client}, nil
52}

Database Migration Patterns

Database migration is like moving from an apartment to a house. You need to plan what to move, how to move it, and ensure nothing is lost in the process while minimizing downtime.

SQL to NoSQL Migration

PostgreSQL to DynamoDB Migration Pattern:

  1package migration
  2
  3import (
  4    "context"
  5    "database/sql"
  6    "fmt"
  7    "log"
  8    "time"
  9    "github.com/aws/aws-sdk-go-v2/aws"
 10    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
 11    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
 12    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
 13)
 14// Migrator handles database migration
 15type Migrator struct {
 16    sourceDB    *sql.DB
 17    targetDB    *dynamodb.Client
 18    batchSize   int
 19    tableName   string
 20}
 21
 22func NewMigrator(sourceDB *sql.DB, targetDB *dynamodb.Client, tableName string) *Migrator {
 23    return &Migrator{
 24        sourceDB:  sourceDB,
 25        targetDB:  targetDB,
 26        batchSize: 25, // DynamoDB batch limit
 27        tableName: tableName,
 28    }
 29}
 30
 31// MigrateUsers migrates users from PostgreSQL to DynamoDB
 32func (m *Migrator) MigrateUsers(ctx context.Context) error {
 33    // Query source database
 34    rows, err := m.sourceDB.QueryContext(ctx,
 35        "SELECT id, email, name, created_at, metadata FROM users ORDER BY id")
 36    if err != nil {
 37        return fmt.Errorf("query failed: %w", err)
 38    }
 39    defer rows.Close()
 40
 41    var batch []map[string]interface{}
 42    count := 0
 43
 44    for rows.Next() {
 45        var id, email, name string
 46        var createdAt time.Time
 47        var metadata []byte
 48
 49        if err := rows.Scan(&id, &email, &name, &createdAt, &metadata); err != nil {
 50            return err
 51        }
 52
 53        // Transform to DynamoDB format
 54        item := map[string]interface{}{
 55            "id":         id,
 56            "email":      email,
 57            "name":       name,
 58            "created_at": createdAt.Unix(),
 59            "metadata":   string(metadata),
 60            "migrated_at": time.Now().Unix(),
 61        }
 62
 63        batch = append(batch, item)
 64
 65        // Write batch when full
 66        if len(batch) >= m.batchSize {
 67            if err := m.writeBatch(ctx, batch); err != nil {
 68                return err
 69            }
 70            count += len(batch)
 71            log.Printf("Migrated %d records", count)
 72            batch = batch[:0]
 73        }
 74    }
 75
 76    // Write remaining items
 77    if len(batch) > 0 {
 78        if err := m.writeBatch(ctx, batch); err != nil {
 79            return err
 80        }
 81        count += len(batch)
 82    }
 83
 84    log.Printf("Migration complete: %d total records", count)
 85    return nil
 86}
 87
 88// writeBatch writes a batch to DynamoDB
 89func (m *Migrator) writeBatch(ctx context.Context, items []map[string]interface{}) error {
 90    writeRequests := make([]types.WriteRequest, len(items))
 91
 92    for i, item := range items {
 93        av, err := attributevalue.MarshalMap(item)
 94        if err != nil {
 95            return err
 96        }
 97
 98        writeRequests[i] = types.WriteRequest{
 99            PutRequest: &types.PutRequest{Item: av},
100        }
101    }
102
103    _, err := m.targetDB.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
104        RequestItems: map[string][]types.WriteRequest{
105            m.tableName: writeRequests,
106        },
107    })
108
109    return err
110}
111
112// DualWrite writes a new record to both databases during migration (User is the application's user model)
113func (m *Migrator) DualWrite(ctx context.Context, user User) error {
114    // Write to PostgreSQL
115    _, err := m.sourceDB.ExecContext(ctx,
116        "INSERT INTO users (id, email, name, created_at) VALUES ($1, $2, $3, $4)",
117        user.ID, user.Email, user.Name, user.CreatedAt)
118    if err != nil {
119        return fmt.Errorf("postgres write failed: %w", err)
120    }
121
122    // Write to DynamoDB
123    av, err := attributevalue.MarshalMap(user)
124    if err != nil {
125        return err
126    }
127
128    _, err = m.targetDB.PutItem(ctx, &dynamodb.PutItemInput{
129        TableName: aws.String(m.tableName),
130        Item:      av,
131    })
132    if err != nil {
133        log.Printf("WARNING: DynamoDB write failed: %v", err)
134        // Don't fail the request - log for later reconciliation
135    }
136
137    return nil
138}
139
140// VerifyMigration verifies data consistency
141func (m *Migrator) VerifyMigration(ctx context.Context) error {
142    // Count records in source
143    var sourceCount int
144    err := m.sourceDB.QueryRowContext(ctx, "SELECT COUNT(*) FROM users").Scan(&sourceCount)
145    if err != nil {
146        return err
147    }
148
149    // Count records in target
150    result, err := m.targetDB.Scan(ctx, &dynamodb.ScanInput{
151        TableName: aws.String(m.tableName),
152        Select:    types.SelectCount,
153    })
154    if err != nil {
155        return err
156    }
157
158    targetCount := int(result.Count)
159
160    if sourceCount != targetCount {
161        return fmt.Errorf("count mismatch: source=%d, target=%d", sourceCount, targetCount)
162    }
163
164    log.Printf("Verification successful: %d records in both databases", sourceCount)
165    return nil
166}

Migration Strategy: The dual-write pattern enables zero-downtime migration; a read-switchover sketch for Phase 4 follows the phase list:

  1. Phase 1: Migrate existing data in batches
  2. Phase 2: Dual-write new records to both databases
  3. Phase 3: Verify data consistency
  4. Phase 4: Switch reads to new database
  5. Phase 5: Stop writes to old database
  6. Phase 6: Decommission old database
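
For Phase 4, the read path is easiest to switch (and roll back) behind a flag. A minimal sketch under the assumptions of the Migrator above: the readFromDynamo flag and ReadUser method are illustrative, and the User fields mirror those used in the DualWrite example.

 1package migration
 2
 3import (
 4    "context"
 5    "fmt"
 6    "sync/atomic"
 7
 8    "github.com/aws/aws-sdk-go-v2/aws"
 9    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
10    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
11)
12
13// readFromDynamo is the Phase 4 switch: set it to 1 to serve reads from
14// DynamoDB, or back to 0 to roll back to PostgreSQL instantly.
15var readFromDynamo atomic.Int32
16
17// ReadUser serves a read from whichever store is currently active.
18func (m *Migrator) ReadUser(ctx context.Context, id string) (*User, error) {
19    if readFromDynamo.Load() == 0 {
20        // Pre-switch path: PostgreSQL is still the source of truth
21        u := &User{}
22        err := m.sourceDB.QueryRowContext(ctx,
23            "SELECT id, email, name FROM users WHERE id = $1", id).
24            Scan(&u.ID, &u.Email, &u.Name)
25        return u, err
26    }
27
28    // Post-switch path: DynamoDB serves reads
29    out, err := m.targetDB.GetItem(ctx, &dynamodb.GetItemInput{
30        TableName: aws.String(m.tableName),
31        Key: map[string]types.AttributeValue{
32            "id": &types.AttributeValueMemberS{Value: id},
33        },
34    })
35    if err != nil {
36        return nil, err
37    }
38    if len(out.Item) == 0 {
39        return nil, fmt.Errorf("user %s not found", id)
40    }
41
42    u := &User{ID: id}
43    if v, ok := out.Item["email"].(*types.AttributeValueMemberS); ok {
44        u.Email = v.Value
45    }
46    if v, ok := out.Item["name"].(*types.AttributeValueMemberS); ok {
47        u.Name = v.Value
48    }
49    return u, nil
50}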

NoSQL to NoSQL Migration

DynamoDB to Firestore Migration:

 1package migration
 2
 3import (
 4    "context"
 5    "fmt"
 6    "log"
 7    "cloud.google.com/go/firestore"
 8    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
 9    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
10)
11// NoSQLMigrator migrates between NoSQL databases
12type NoSQLMigrator struct {
13    sourceDynamoDB *dynamodb.Client
14    targetFirestore *firestore.Client
15}
16
17func NewNoSQLMigrator(dynamo *dynamodb.Client, fs *firestore.Client) *NoSQLMigrator {
18    return &NoSQLMigrator{
19        sourceDynamoDB: dynamo,
20        targetFirestore: fs,
21    }
22}
23
24// MigrateTable migrates entire DynamoDB table to Firestore
25func (n *NoSQLMigrator) MigrateTable(ctx context.Context, dynamoTable, firestoreCollection string) error {
26    // Scan DynamoDB table
27    paginator := dynamodb.NewScanPaginator(n.sourceDynamoDB, &dynamodb.ScanInput{
28        TableName: &dynamoTable,
29    })
30
31    count := 0
32    for paginator.HasMorePages() {
33        page, err := paginator.NextPage(ctx)
34        if err != nil {
35            return fmt.Errorf("scan failed: %w", err)
36        }
37
38        // Convert and write to Firestore
39        for _, item := range page.Items {
40            doc := convertDynamoToFirestore(item)
41
42            docID := doc["id"].(string)
43            _, err := n.targetFirestore.Collection(firestoreCollection).Doc(docID).Set(ctx, doc)
44            if err != nil {
45                return fmt.Errorf("firestore write failed: %w", err)
46            }
47            count++
48        }
49
50        log.Printf("Migrated %d documents", count)
51    }
52
53    return nil
54}
55
56func convertDynamoToFirestore(item map[string]types.AttributeValue) map[string]interface{} {
57    // Convert DynamoDB attribute values to Firestore format
58    doc := make(map[string]interface{})
59
60    for key, value := range item {
61        switch v := value.(type) {
62        case *types.AttributeValueMemberS:
63            doc[key] = v.Value
64        case *types.AttributeValueMemberN:
65            // DynamoDB numbers arrive as strings; convert with strconv if needed
66            doc[key] = v.Value
67        case *types.AttributeValueMemberBOOL:
68            doc[key] = v.Value
69        // Add other type conversions as needed
70        }
71    }
72
73    return doc
74}

Monitoring and Observability Integration

Monitoring is like installing security cameras and sensors throughout a building. You need to know what's happening, detect problems early, and diagnose issues quickly when they occur.

SDK Metrics and Tracing

Comprehensive SDK Instrumentation:

  1package observability
  2
  3import (
  4    "context"
  5    "time"
  6
  7    "github.com/aws/aws-sdk-go-v2/aws"
  8    "github.com/aws/aws-sdk-go-v2/service/s3"
  9    "github.com/prometheus/client_golang/prometheus"
 10    "github.com/prometheus/client_golang/prometheus/promauto"
 11    "go.opentelemetry.io/otel"
 12    "go.opentelemetry.io/otel/attribute"
 13    "go.opentelemetry.io/otel/trace"
 14    "github.com/aws/smithy-go"
 15)
 16var (
 17    // Prometheus metrics
 18    sdkCallDuration = promauto.NewHistogramVec(
 19        prometheus.HistogramOpts{
 20            Name:    "cloud_sdk_duration_seconds",
 21            Help:    "Duration of cloud SDK calls",
 22            Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
 23        },
 24        []string{"provider", "service", "operation", "status"},
 25    )
 26
 27    sdkCallsTotal = promauto.NewCounterVec(
 28        prometheus.CounterOpts{
 29            Name: "cloud_sdk_calls_total",
 30            Help: "Total number of cloud SDK calls",
 31        },
 32        []string{"provider", "service", "operation", "status"},
 33    )
 34
 35    sdkErrorsTotal = promauto.NewCounterVec(
 36        prometheus.CounterOpts{
 37            Name: "cloud_sdk_errors_total",
 38            Help: "Total number of cloud SDK errors",
 39        },
 40        []string{"provider", "service", "operation", "error_type"},
 41    )
 42
 43    sdkRetries = promauto.NewCounterVec(
 44        prometheus.CounterOpts{
 45            Name: "cloud_sdk_retries_total",
 46            Help: "Total number of SDK operation retries",
 47        },
 48        []string{"provider", "service", "operation"},
 49    )
 50
 51    sdkThrottles = promauto.NewCounterVec(
 52        prometheus.CounterOpts{
 53            Name: "cloud_sdk_throttles_total",
 54            Help: "Total number of SDK throttle errors",
 55        },
 56        []string{"provider", "service", "operation"},
 57    )
 58)
 59
 60// InstrumentedS3Client wraps S3 client with observability
 61type InstrumentedS3Client struct {
 62    client *s3.Client
 63    tracer trace.Tracer
 64}
 65
 66func NewInstrumentedS3Client(client *s3.Client) *InstrumentedS3Client {
 67    return &InstrumentedS3Client{
 68        client: client,
 69        tracer: otel.Tracer("aws-s3"),
 70    }
 71}
 72
 73// GetObject gets object with full instrumentation
 74func (i *InstrumentedS3Client) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
 75    // Start trace span
 76    ctx, span := i.tracer.Start(ctx, "s3.GetObject",
 77        trace.WithAttributes(
 78            attribute.String("bucket", bucket),
 79            attribute.String("key", key),
 80        ),
 81    )
 82    defer span.End()
 83
 84    // Record metrics
 85    start := time.Now()
 86    status := "success"
 87    var errorType string
 88
 89    // Execute operation
 90    result, err := i.client.GetObject(ctx, &s3.GetObjectInput{
 91        Bucket: aws.String(bucket),
 92        Key:    aws.String(key),
 93    })
 94
 95    duration := time.Since(start).Seconds()
 96
 97    // Handle errors
 98    if err != nil {
 99        status = "error"
100        errorType = classifyError(err)
101
102        span.RecordError(err)
103
104        // Track specific error types
105        if isThrottleError(err) {
106            sdkThrottles.WithLabelValues("aws", "s3", "GetObject").Inc()
107        }
108
109        sdkErrorsTotal.WithLabelValues("aws", "s3", "GetObject", errorType).Inc()
110    }
111
112    // Record metrics
113    sdkCallDuration.WithLabelValues("aws", "s3", "GetObject", status).Observe(duration)
114    sdkCallsTotal.WithLabelValues("aws", "s3", "GetObject", status).Inc()
115
116    // Add span attributes
117    span.SetAttributes(
118        attribute.Float64("duration_ms", duration*1000),
119        attribute.String("status", status),
120    )
121
122    if result != nil {
123        span.SetAttributes(
124            attribute.Int64("content_length", result.ContentLength),
125        )
126    }
127
128    return result, err
129}
130
131func classifyError(err error) string {
132    // Classify AWS errors
133    var ae smithy.APIError
134    if errors.As(err, &ae) {
135        return ae.ErrorCode()
136    }
137    return "unknown"
138}
139
140func isThrottleError(err error) bool {
141    var ae smithy.APIError
142    if errors.As(err, &ae) {
143        code := ae.ErrorCode()
144        return code == "ThrottlingException" ||
145               code == "ProvisionedThroughputExceededException" ||
146               code == "RequestLimitExceeded"
147    }
148    return false
149}

CloudWatch/Stackdriver Integration

Custom Metrics Publishing:

 1package observability
 2
 3import (
 4    "context"
 5    "time"
 6
 7    "github.com/aws/aws-sdk-go-v2/aws"
 8    "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
 9    "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
10)
11
12// MetricsPublisher publishes custom metrics
13type MetricsPublisher struct {
14    cwClient  *cloudwatch.Client
15    namespace string
16}
17
18func NewMetricsPublisher(client *cloudwatch.Client, namespace string) *MetricsPublisher {
19    return &MetricsPublisher{
20        cwClient:  client,
21        namespace: namespace,
22    }
23}
24
25// PublishSDKMetrics publishes SDK performance metrics
26func (m *MetricsPublisher) PublishSDKMetrics(ctx context.Context, service, operation string, duration time.Duration, success bool) error {
27    metricData := []types.MetricDatum{
28        {
29            MetricName: aws.String("SDKCallDuration"),
30            Value:      aws.Float64(duration.Seconds()),
31            Unit:       types.StandardUnitSeconds,
32            Timestamp:  aws.Time(time.Now()),
33            Dimensions: []types.Dimension{
34                {Name: aws.String("Service"), Value: aws.String(service)},
35                {Name: aws.String("Operation"), Value: aws.String(operation)},
36            },
37        },
38        {
39            MetricName: aws.String("SDKCallSuccess"),
40            Value:      aws.Float64(boolToFloat(success)),
41            Unit:       types.StandardUnitCount,
42            Timestamp:  aws.Time(time.Now()),
43            Dimensions: []types.Dimension{
44                {Name: aws.String("Service"), Value: aws.String(service)},
45                {Name: aws.String("Operation"), Value: aws.String(operation)},
46            },
47        },
48    }
49
50    _, err := m.cwClient.PutMetricData(ctx, &cloudwatch.PutMetricDataInput{
51        Namespace:  aws.String(m.namespace),
52        MetricData: metricData,
53    })
54
55    return err
56}
57
58// PublishBatchMetrics publishes metrics in batch for efficiency
59func (m *MetricsPublisher) PublishBatchMetrics(ctx context.Context, metrics []types.MetricDatum) error {
60    // CloudWatch allows up to 20 metrics per request
61    const batchSize = 20
62
63    for i := 0; i < len(metrics); i += batchSize {
64        end := i + batchSize
65        if end > len(metrics) {
66            end = len(metrics)
67        }
68
69        batch := metrics[i:end]
70        _, err := m.cwClient.PutMetricData(ctx, &cloudwatch.PutMetricDataInput{
71            Namespace:  aws.String(m.namespace),
72            MetricData: batch,
73        })
74        if err != nil {
75            return err
76        }
77    }
78
79    return nil
80}
81
82func boolToFloat(b bool) float64 {
83    if b {
84        return 1.0
85    }
86    return 0.0
87}

Disaster Recovery and Failover Strategies

Disaster recovery is like having fire extinguishers, emergency exits, and backup generators. You hope you never need them, but when disaster strikes, they're invaluable.

Multi-Region Failover

Automated Failover Pattern:

  1package dr
  2
  3import (
  4    "context"
  5    "fmt"
  6    "sync"
  7    "time"
  8
  9    "github.com/aws/aws-sdk-go-v2/aws"
 10    "github.com/aws/aws-sdk-go-v2/service/s3"
 11)
 12
 13// RegionConfig represents a region configuration
 14type RegionConfig struct {
 15    Region   string
 16    Endpoint string
 17    Priority int
 18}
 19
 20// MultiRegionClient handles multi-region failover
 21type MultiRegionClient struct {
 22    regions        []RegionConfig
 23    currentRegion  int
 24    healthCheck    func(context.Context, *s3.Client) error
 25    mu             sync.RWMutex
 26    clients        map[string]*s3.Client
 27}
 28
 29func NewMultiRegionClient(regions []RegionConfig, healthCheck func(context.Context, *s3.Client) error) *MultiRegionClient {
 30    return &MultiRegionClient{
 31        regions:     regions,
 32        healthCheck: healthCheck,
 33        clients:     make(map[string]*s3.Client), // callers must register one client per region before use
 34    }
 35}
 36
 37// GetObject retrieves object with automatic failover
 38func (m *MultiRegionClient) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
 39    m.mu.RLock()
 40    region := m.regions[m.currentRegion]
 41    client := m.clients[region.Region]
 42    m.mu.RUnlock()
 43
 44    // Try primary region
 45    result, err := client.GetObject(ctx, &s3.GetObjectInput{
 46        Bucket: aws.String(bucket),
 47        Key:    aws.String(key),
 48    })
 49
 50    if err == nil {
 51        return result, nil
 52    }
 53
 54    // Failover to next region
 55    log.Printf("Primary region %s failed: %v", region.Region, err)
 56    return m.failover(ctx, bucket, key)
 57}
 58
 59// failover attempts to use backup regions
 60func (m *MultiRegionClient) failover(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
 61    m.mu.Lock()
 62    defer m.mu.Unlock()
 63
 64    // Try each region in priority order
 65    for i := 1; i < len(m.regions); i++ {
 66        nextIdx := (m.currentRegion + i) % len(m.regions)
 67        region := m.regions[nextIdx]
 68        client := m.clients[region.Region]
 69
 70        // Health check
 71        if err := m.healthCheck(ctx, client); err != nil {
 72            log.Printf("Region %s health check failed: %v", region.Region, err)
 73            continue
 74        }
 75
 76        // Try operation
 77        result, err := client.GetObject(ctx, &s3.GetObjectInput{
 78            Bucket: aws.String(bucket),
 79            Key:    aws.String(key),
 80        })
 81
 82        if err == nil {
 83            // Update current region
 84            m.currentRegion = nextIdx
 85            log.Printf("Failed over to region: %s", region.Region)
 86            return result, nil
 87        }
 88
 89        log.Printf("Region %s failed: %v", region.Region, err)
 90    }
 91
 92    return nil, fmt.Errorf("all regions failed")
 93}
 94
 95// MonitorHealth continuously monitors region health
 96func (m *MultiRegionClient) MonitorHealth(ctx context.Context, interval time.Duration) {
 97    ticker := time.NewTicker(interval)
 98    defer ticker.Stop()
 99
100    for {
101        select {
102        case <-ctx.Done():
103            return
104        case <-ticker.C:
105            m.checkAllRegions(ctx)
106        }
107    }
108}
109
110func (m *MultiRegionClient) checkAllRegions(ctx context.Context) {
111    for i, region := range m.regions {
112        client := m.clients[region.Region]
113
114        err := m.healthCheck(ctx, client)
115        if err != nil {
116            log.Printf("Region %s unhealthy: %v", region.Region, err)
117
118            // If current region is unhealthy, trigger failover
119            m.mu.RLock()
120            if i == m.currentRegion {
121                m.mu.RUnlock()
122                m.triggerFailover(ctx)
123                continue
124            }
125            m.mu.RUnlock()
126        }
127    }
128}
129
130func (m *MultiRegionClient) triggerFailover(ctx context.Context) {
131    m.mu.Lock()
132    defer m.mu.Unlock()
133
134    // Find next healthy region
135    for i := 1; i < len(m.regions); i++ {
136        nextIdx := (m.currentRegion + i) % len(m.regions)
137        region := m.regions[nextIdx]
138        client := m.clients[region.Region]
139
140        if err := m.healthCheck(ctx, client); err == nil {
141            m.currentRegion = nextIdx
142            log.Printf("Proactively failed over to region: %s", region.Region)
143            return
144        }
145    }
146}

Cross-Cloud Backup Strategy

Automated Cross-Cloud Backup:

  1package dr
  2
  3import (
  4    "context"
  5    "fmt"
  6    "io"
  7    "time"
  8)
  9
 10// BackupManager manages cross-cloud backups
 11type BackupManager struct {
 12    primary   Storage
 13    backup    Storage
 14    scheduler *time.Ticker
 15}
 16
 17func NewBackupManager(primary, backup Storage, interval time.Duration) *BackupManager {
 18    return &BackupManager{
 19        primary:   primary,
 20        backup:    backup,
 21        scheduler: time.NewTicker(interval),
 22    }
 23}
 24
 25// StartContinuousBackup starts continuous backup process
 26func (bm *BackupManager) StartContinuousBackup(ctx context.Context) error {
 27    for {
 28        select {
 29        case <-ctx.Done():
 30            return ctx.Err()
 31        case <-bm.scheduler.C:
 32            if err := bm.performBackup(ctx); err != nil {
 33                log.Printf("Backup failed: %v", err)
 34            }
 35        }
 36    }
 37}
 38
 39// performBackup backs up data from primary to backup storage
 40func (bm *BackupManager) performBackup(ctx context.Context) error {
 41    // List all files in primary
 42    files, err := bm.primary.List(ctx, "")
 43    if err != nil {
 44        return fmt.Errorf("failed to list files: %w", err)
 45    }
 46
 47    log.Printf("Starting backup of %d files", len(files))
 48
 49    for _, file := range files {
 50        // Download from primary
 51        data, err := bm.primary.Download(ctx, file)
 52        if err != nil {
 53            log.Printf("Failed to download %s: %v", file, err)
 54            continue
 55        }
 56
 57        // Upload to backup
 58        reader := bytes.NewReader(data)
 59        if err := bm.backup.Upload(ctx, file, reader); err != nil {
 60            log.Printf("Failed to backup %s: %v", file, err)
 61            continue
 62        }
 63
 64        log.Printf("Backed up: %s", file)
 65    }
 66
 67    return nil
 68}
 69
 70// VerifyBackup verifies backup integrity
 71func (bm *BackupManager) VerifyBackup(ctx context.Context, sampleSize int) error {
 72    // Get sample of files
 73    files, err := bm.primary.List(ctx, "")
 74    if err != nil {
 75        return err
 76    }
 77
 78    // Verify a sample of files (the first sampleSize entries here)
 79    for i := 0; i < sampleSize && i < len(files); i++ {
 80        file := files[i]
 81
 82        // Download from both storages
 83        primaryData, err := bm.primary.Download(ctx, file)
 84        if err != nil {
 85            return fmt.Errorf("primary download failed: %w", err)
 86        }
 87
 88        backupData, err := bm.backup.Download(ctx, file)
 89        if err != nil {
 90            return fmt.Errorf("backup download failed: %w", err)
 91        }
 92
 93        // Compare contents byte-for-byte
 94        if !bytes.Equal(primaryData, backupData) {
 95            return fmt.Errorf("data mismatch for %s", file)
 96        }
 97    }
 98
 99    log.Printf("Backup verification successful for %d files", sampleSize)
100    return nil
101}
102
103// RestoreFromBackup restores data from backup to primary
104func (bm *BackupManager) RestoreFromBackup(ctx context.Context, prefix string) error {
105    files, err := bm.backup.List(ctx, prefix)
106    if err != nil {
107        return err
108    }
109
110    log.Printf("Restoring %d files from backup", len(files))
111
112    for _, file := range files {
113        data, err := bm.backup.Download(ctx, file)
114        if err != nil {
115            return err
116        }
117
118        reader := bytes.NewReader(data)
119        if err := bm.primary.Upload(ctx, file, reader); err != nil {
120            return err
121        }
122
123        log.Printf("Restored: %s", file)
124    }
125
126    return nil
127}

RTO and RPO Targets (a target-check sketch follows this list):

  • Recovery Time Objective (RTO): Maximum acceptable downtime
    • Multi-region failover: < 1 minute
    • Cross-cloud restore: < 15 minutes
  • Recovery Point Objective (RPO): Maximum acceptable data loss
    • Continuous replication: < 1 second
    • Hourly backups: < 1 hour
    • Daily backups: < 24 hours
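
These targets are easier to honor when they are encoded next to the backup logic and checked continuously. A minimal sketch with assumed names (RecoveryObjectives, CheckRPO); wiring it to the BackupManager's last successful run timestamp is left to the application.

 1package dr
 2
 3import (
 4    "log"
 5    "time"
 6)
 7
 8// RecoveryObjectives captures the targets agreed with the business.
 9type RecoveryObjectives struct {
10    RTO time.Duration // maximum acceptable downtime
11    RPO time.Duration // maximum acceptable data-loss window
12}
13
14// CheckRPO compares the age of the last successful backup against the RPO
15// target and reports whether the objective is currently being met.
16func (o RecoveryObjectives) CheckRPO(lastBackup time.Time) bool {
17    lag := time.Since(lastBackup)
18    if lag > o.RPO {
19        log.Printf("RPO violated: last successful backup was %s ago (target %s)", lag, o.RPO)
20        return false
21    }
22    return true
23}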

💡 Key Takeaway: Disaster recovery isn't just about having backups—it's about regularly testing your recovery procedures. Schedule disaster recovery drills quarterly to ensure your team knows how to execute the plan under pressure.

Practice Exercises

Exercise 1: Multi-Cloud File Manager

Learning Objective: Master multi-cloud storage abstraction and understand how to build resilient systems that can work across different cloud providers. This exercise teaches you to create a unified interface that works seamlessly across AWS S3, Google Cloud Storage, and Azure Blob Storage while implementing consistent patterns for file operations.

Real-World Context: Multi-cloud strategies are essential for enterprise applications that require high availability and vendor diversification. Companies like Netflix and Spotify use multi-cloud storage to avoid vendor lock-in and ensure their services remain available even when an entire cloud region experiences an outage. This pattern is particularly valuable for disaster recovery, cost optimization, and compliance requirements.

Difficulty: Intermediate | Time Estimate: 45-60 minutes

Objective: Build a file manager that works with AWS S3, Google Cloud Storage, and Azure Blob Storage using a unified interface.

Solution with Explanation
  1package main
  2
  3import (
  4    "context"
  5    "fmt"
  6    "io"
  7    "log"
  8    "strings"
  9    "time"
 10)
 11
 12// Storage interface for cloud-agnostic operations
 13type Storage interface {
 14    Upload(ctx context.Context, path string, data io.Reader) error
 15    Download(ctx context.Context, path string) ([]byte, error)
 16    List(ctx context.Context, prefix string) ([]string, error)
 17    Delete(ctx context.Context, path string) error
 18}
 19
 20// FileManager provides high-level file operations
 21type FileManager struct {
 22    primary   Storage
 23    secondary Storage
 24    mirror    bool
 25}
 26
 27func NewFileManager(primary, secondary Storage, mirror bool) *FileManager {
 28    return &FileManager{
 29        primary:   primary,
 30        secondary: secondary,
 31        mirror:    mirror,
 32    }
 33}
 34
 35// Upload uploads to primary and optionally mirrors to secondary
 36func (fm *FileManager) Upload(ctx context.Context, path string, data []byte) error {
 37    reader := strings.NewReader(string(data))
 38
 39    // Upload to primary
 40    if err := fm.primary.Upload(ctx, path, reader); err != nil {
 41        return fmt.Errorf("primary upload failed: %w", err)
 42    }
 43
 44    // Mirror to secondary if enabled
 45    if fm.mirror && fm.secondary != nil {
 46        reader.Seek(0, io.SeekStart)
 47        if err := fm.secondary.Upload(ctx, path, reader); err != nil {
 48            log.Printf("WARNING: secondary upload failed: %v", err)
 49            // Don't fail the operation
 50        }
 51    }
 52
 53    return nil
 54}
 55
 56// Download tries primary first, falls back to secondary
 57func (fm *FileManager) Download(ctx context.Context, path string) ([]byte, error) {
 58    data, err := fm.primary.Download(ctx, path)
 59    if err == nil {
 60        return data, nil
 61    }
 62
 63    log.Printf("Primary download failed, trying secondary: %v", err)
 64
 65    if fm.secondary != nil {
 66        return fm.secondary.Download(ctx, path)
 67    }
 68
 69    return nil, fmt.Errorf("download failed from all sources: %w", err)
 70}
 71
 72// Sync synchronizes files from primary to secondary
 73func (fm *FileManager) Sync(ctx context.Context, prefix string) error {
 74    if fm.secondary == nil {
 75        return fmt.Errorf("no secondary storage configured")
 76    }
 77
 78    // List files in primary
 79    files, err := fm.primary.List(ctx, prefix)
 80    if err != nil {
 81        return fmt.Errorf("failed to list primary: %w", err)
 82    }
 83
 84    // Copy each file to secondary
 85    for _, file := range files {
 86        data, err := fm.primary.Download(ctx, file)
 87        if err != nil {
 88            log.Printf("Failed to download %s: %v", file, err)
 89            continue
 90        }
 91
 92        reader := strings.NewReader(string(data))
 93        if err := fm.secondary.Upload(ctx, file, reader); err != nil {
 94            log.Printf("Failed to upload %s to secondary: %v", file, err)
 95            continue
 96        }
 97
 98        log.Printf("Synced: %s", file)
 99    }
100
101    return nil
102}
103
104// CleanupOldFiles deletes files older than retention period
105func (fm *FileManager) CleanupOldFiles(ctx context.Context, prefix string, retention time.Duration) error {
106    files, err := fm.primary.List(ctx, prefix)
107    if err != nil {
108        return fmt.Errorf("failed to list files: %w", err)
109    }
110
111    cutoff := time.Now().Add(-retention)
112
113    for _, file := range files {
114        // In production, you'd check file modification time
115        // This is a simplified example
116        log.Printf("Would delete %s if older than %s", file, cutoff)
117        // fm.primary.Delete(ctx, file)
118    }
119
120    return nil
121}
122
123func main() {
124    ctx := context.Background()
125
126    // Initialize storage clients (replace these nil placeholders with real implementations before running)
127    var s3Storage Storage   // = NewS3Storage(...)
128    var gcsStorage Storage  // = NewGCSStorage(...)
129
130    // Create file manager with S3 primary and GCS secondary
131    fm := NewFileManager(s3Storage, gcsStorage, true)
132
133    // Upload with automatic mirroring
134    data := []byte("Hello, multi-cloud world!")
135    if err := fm.Upload(ctx, "test/hello.txt", data); err != nil {
136        log.Fatal(err)
137    }
138
139    // Download with automatic failover
140    downloaded, err := fm.Download(ctx, "test/hello.txt")
141    if err != nil {
142        log.Fatal(err)
143    }
144
145    fmt.Printf("Downloaded: %s\n", string(downloaded))
146
147    // Sync from primary to secondary
148    if err := fm.Sync(ctx, "test/"); err != nil {
149        log.Printf("Sync failed: %v", err)
150    }
151}

Exercise 2: Event-Driven Processing Pipeline

Learning Objective: Understand event-driven architecture patterns by building a scalable processing pipeline that responds to cloud storage events. This exercise teaches you to implement asynchronous processing workflows using message queues, event sources, and worker pools that can handle high-throughput data processing scenarios.

Real-World Context: Event-driven architectures are the backbone of modern data processing systems at companies like Airbnb, Uber, and Dropbox. When users upload files, these systems automatically trigger workflows for virus scanning, thumbnail generation, metadata extraction, and notifications. This pattern enables horizontal scaling, loose coupling between services, and fault-tolerant processing that can handle millions of events per day.

Difficulty: Intermediate | Time Estimate: 60-75 minutes

Objective: Create an event-driven pipeline using AWS S3 events, SQS, and Lambda-like processing.

Solution
  1package main
  2
  3import (
  4    "context"
  5    "encoding/json"
  6    "fmt"
  7    "log"
  8    "sync"
  9    "time"
 10)
 11
 12// S3Event represents an S3 event notification
 13type S3Event struct {
 14    EventName string    `json:"eventName"`
 15    Bucket    string    `json:"bucket"`
 16    Key       string    `json:"key"`
 17    Size      int64     `json:"size"`
 18    Timestamp time.Time `json:"timestamp"`
 19}
 20
 21// ProcessorFunc processes S3 events
 22type ProcessorFunc func(context.Context, S3Event) error
 23
 24// EventPipeline orchestrates event processing
 25type EventPipeline struct {
 26    sqsClient  *SQSClient
 27    s3Client   *S3Client
 28    processors []ProcessorFunc
 29    workers    int
 30}
 31
 32func NewEventPipeline(sqs *SQSClient, s3 *S3Client, workers int) *EventPipeline {
 33    return &EventPipeline{
 34        sqsClient: sqs,
 35        s3Client:  s3,
 36        workers:   workers,
 37    }
 38}
 39
 40// RegisterProcessor adds a processor to the pipeline
 41func (ep *EventPipeline) RegisterProcessor(processor ProcessorFunc) {
 42    ep.processors = append(ep.processors, processor)
 43}
 44
 45// Start begins processing events
 46func (ep *EventPipeline) Start(ctx context.Context) error {
 47    var wg sync.WaitGroup
 48
 49    // Start worker goroutines
 50    for i := 0; i < ep.workers; i++ {
 51        wg.Add(1)
 52        go func(workerID int) {
 53            defer wg.Done()
 54            ep.worker(ctx, workerID)
 55        }(i)
 56    }
 57
 58    wg.Wait()
 59    return nil
 60}
 61
 62// worker processes messages from SQS
 63func (ep *EventPipeline) worker(ctx context.Context, workerID int) {
 64    log.Printf("Worker %d started", workerID)
 65
 66    for {
 67        select {
 68        case <-ctx.Done():
 69            log.Printf("Worker %d stopping", workerID)
 70            return
 71        default:
 72            messages, err := ep.sqsClient.ReceiveMessages(ctx, 1)
 73            if err != nil {
 74                log.Printf("Worker %d: receive error: %v", workerID, err)
 75                time.Sleep(1 * time.Second)
 76                continue
 77            }
 78
 79            for _, msg := range messages {
 80                if err := ep.processMessage(ctx, msg); err != nil {
 81                    log.Printf("Worker %d: process error: %v", workerID, err)
 82                    continue
 83                }
 84
 85                // Delete message after successful processing
 86                if err := ep.sqsClient.DeleteMessage(ctx, *msg.ReceiptHandle); err != nil {
 87                    log.Printf("Worker %d: delete error: %v", workerID, err)
 88                }
 89            }
 90        }
 91    }
 92}
 93
 94// processMessage processes a single SQS message (types.Message is the SQS types package's message type)
 95func (ep *EventPipeline) processMessage(ctx context.Context, msg types.Message) error {
 96    // Parse S3 event
 97    var event S3Event
 98    if err := json.Unmarshal([]byte(*msg.Body), &event); err != nil {
 99        return fmt.Errorf("failed to unmarshal event: %w", err)
100    }
101
102    log.Printf("Processing: %s/%s", event.Bucket, event.Key, event.Size)
103
104    // Run all processors
105    for i, processor := range ep.processors {
106        if err := processor(ctx, event); err != nil {
107            return fmt.Errorf("processor %d failed: %w", i, err)
108        }
109    }
110
111    return nil
112}
113
114// ImageProcessor validates and processes image files
115func ImageProcessor(ctx context.Context, event S3Event) error {
116    // Skip non-image files
117    if !strings.HasSuffix(event.Key, ".jpg") &&
118       !strings.HasSuffix(event.Key, ".png") {
119        return nil
120    }
121
122    log.Printf("Processing image: %s", event.Key)
123
124    // In production, you would:
125    // 1. Download image from S3
126    // 2. Generate thumbnail
127    // 3. Upload thumbnail to S3
128    // 4. Update database with metadata
129
130    time.Sleep(100 * time.Millisecond) // Simulate processing
131    return nil
132}
133
134// MetadataProcessor extracts and stores file metadata
135func MetadataProcessor(ctx context.Context, event S3Event) error {
136    log.Printf("Extracting metadata: %s", event.Key)
137
138    metadata := map[string]interface{}{
139        "key":       event.Key,
140        "size":      event.Size,
141        "timestamp": event.Timestamp,
142        "event":     event.EventName,
143    }
144
145    // Store in database
146    data, _ := json.Marshal(metadata)
147    log.Printf("Metadata: %s", string(data))
148
149    return nil
150}
151
152// VirusScanProcessor scans files for malware
153func VirusScanProcessor(ctx context.Context, event S3Event) error {
154    // Only scan files smaller than 100MB
155    if event.Size > 100*1024*1024 {
156        log.Printf("Skipping large file: %s", event.Key)
157        return nil
158    }
159
160    log.Printf("Scanning: %s", event.Key)
161
162    // Simulate virus scan
163    time.Sleep(50 * time.Millisecond)
164
165    // Tag file as scanned
166    log.Printf("File clean: %s", event.Key)
167    return nil
168}
169
170// NotificationProcessor sends notifications for important events
171func NotificationProcessor(ctx context.Context, event S3Event) error {
172    // Only notify for large files
173    if event.Size < 10*1024*1024 {
174        return nil
175    }
176
177    log.Printf("NOTIFICATION: Large file uploaded: %s",
178        event.Key, event.Size)
179
180    // Send email, Slack message, etc.
181    return nil
182}
183
184func main() {
185    ctx, cancel := context.WithCancel(context.Background())
186    defer cancel()
187
188    // Initialize clients
189    sqsClient := &SQSClient{} // NewSQSClient(...)
190    s3Client := &S3Client{}   // NewS3Client(...)
191
192    // Create pipeline
193    pipeline := NewEventPipeline(sqsClient, s3Client, 5)
194
195    // Register processors
196    pipeline.RegisterProcessor(ImageProcessor)
197    pipeline.RegisterProcessor(MetadataProcessor)
198    pipeline.RegisterProcessor(VirusScanProcessor)
199    pipeline.RegisterProcessor(NotificationProcessor)
200
201    // Start processing
202    log.Println("Starting event pipeline...")
203    if err := pipeline.Start(ctx); err != nil {
204        log.Fatal(err)
205    }
206}

Explanation: This exercise demonstrates building an event-driven pipeline using multiple AWS services. The pipeline consumes S3 events from SQS, processes them through multiple processors, and handles errors gracefully. Key patterns include concurrent processing with a worker pool, deleting each SQS message only after it is processed successfully, and processor chaining for complex workflows.

Exercise 3: S3 File Manager with Metadata

Learning Objective: Master advanced S3 operations including custom metadata management, multipart uploads, and search functionality. This exercise teaches you to build a production-ready file management system that leverages S3's metadata capabilities for organizing and retrieving files efficiently.

Real-World Context: Content management systems and document platforms like Box, Dropbox, and Google Drive rely heavily on metadata-driven file organization. Metadata enables powerful search capabilities, automatic file categorization, access control, and workflow automation. Understanding how to effectively use S3 metadata is crucial for building scalable document management, media libraries, and data lakes that can handle millions of files with efficient retrieval patterns.

Difficulty: Intermediate | Time Estimate: 50-65 minutes

Objective: Build an S3 file manager that handles uploads, downloads, and custom metadata management with search capabilities.

Solution with Explanation
  1// run
  2package main
  3
  4import (
  5	"context"
  6	"encoding/json"
  7	"fmt"
  8	"log"
  9	"strings"
 10	"time"
 11)
 12
 13// FileManager manages S3 files with metadata
 14type FileManager struct {
 15	bucket string
 16	// client *s3.Client // Simulated for demonstration
 17}
 18
 19type FileMetadata struct {
 20	Key         string            `json:"key"`
 21	Size        int64             `json:"size"`
 22	ContentType string            `json:"content_type"`
 23	Tags        map[string]string `json:"tags"`
 24	UploadedAt  time.Time         `json:"uploaded_at"`
 25	UploadedBy  string            `json:"uploaded_by"`
 26}
 27
 28func NewFileManager(bucket string) *FileManager {
 29	return &FileManager{bucket: bucket}
 30}
 31
 32// Upload simulates uploading file with metadata
 33func (fm *FileManager) Upload(ctx context.Context, key string, size int64, contentType string, tags map[string]string, userID string) (*FileMetadata, error) {
 34	metadata := &FileMetadata{
 35		Key:         key,
 36		Size:        size,
 37		ContentType: contentType,
 38		Tags:        tags,
 39		UploadedAt:  time.Now(),
 40		UploadedBy:  userID,
 41	}
 42
 43	// In production:
 44	// 1. Upload file to S3 using PutObject
 45	// 2. Set object metadata and tags
 46	// 3. Store metadata in DynamoDB for fast querying
 47
 48	fmt.Printf("Uploaded: %s by %s\n", key, size, userID)
 49	fmt.Printf("Tags: %v\n", tags)
 50
 51	return metadata, nil
 52}
 53
 54// SearchByTag searches files by tag
 55func (fm *FileManager) SearchByTag(ctx context.Context, tagKey, tagValue string) ([]*FileMetadata, error) {
 56	// In production:
 57	// 1. Query DynamoDB index on tag
 58	// 2. Or use S3 Select with metadata
 59
 60	// Simulated results
 61	results := []*FileMetadata{
 62		{
 63			Key:         "documents/report-2024.pdf",
 64			Size:        1024000,
 65			ContentType: "application/pdf",
 66			Tags:        map[string]string{"department": "finance", "year": "2024"},
 67			UploadedAt:  time.Now().Add(-24 * time.Hour),
 68			UploadedBy:  "user-123",
 69		},
 70	}
 71
 72	fmt.Printf("Found %d files with tag %s=%s\n", len(results), tagKey, tagValue)
 73	return results, nil
 74}
 75
 76// UpdateTags updates file tags
 77func (fm *FileManager) UpdateTags(ctx context.Context, key string, tags map[string]string) error {
 78	// In production:
 79	// Use PutObjectTagging API
 80
 81	fmt.Printf("Updated tags for %s: %v\n", key, tags)
 82	return nil
 83}
 84
 85// GenerateDownloadURL generates presigned download URL
 86func (fm *FileManager) GenerateDownloadURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
 87	// In production:
 88	// Use s3.NewPresignClient and PresignGetObject
 89
 90	url := fmt.Sprintf("https://%s.s3.amazonaws.com/%s?X-Amz-Expires=%d", fm.bucket, key, int(expiry.Seconds()))
 91	fmt.Printf("Generated download URL: %s\n", expiry, url)
 92	return url, nil
 93}
 94
 95// ListByPrefix lists files by prefix with pagination
 96func (fm *FileManager) ListByPrefix(ctx context.Context, prefix string, maxResults int) ([]*FileMetadata, error) {
 97	// In production:
 98	// Use ListObjectsV2Paginator
 99
100	files := []*FileMetadata{
101		{
102			Key:         prefix + "file1.txt",
103			Size:        1024,
104			ContentType: "text/plain",
105			Tags:        map[string]string{"category": "documents"},
106			UploadedAt:  time.Now(),
107			UploadedBy:  "user-456",
108		},
109		{
110			Key:         prefix + "file2.txt",
111			Size:        2048,
112			ContentType: "text/plain",
113			Tags:        map[string]string{"category": "documents"},
114			UploadedAt:  time.Now(),
115			UploadedBy:  "user-456",
116		},
117	}
118
119	fmt.Printf("Listed %d files with prefix '%s'\n", len(files), prefix)
120	return files, nil
121}
122
123func main() {
124	ctx := context.Background()
125	fm := NewFileManager("my-files-bucket")
126
127	// Upload file with tags
128	metadata, _ := fm.Upload(ctx, "reports/quarterly-2024.pdf", 5242880,
129		"application/pdf",
130		map[string]string{
131			"department": "finance",
132			"year":       "2024",
133			"quarter":    "Q1",
134		},
135		"user-123")
136
137	// Search by tag
138	results, _ := fm.SearchByTag(ctx, "department", "finance")
139	fmt.Printf("\nSearch results: %d files\n", len(results))
140
141	// Update tags
142	fm.UpdateTags(ctx, metadata.Key, map[string]string{
143		"department": "finance",
144		"year":       "2024",
145		"quarter":    "Q1",
146		"reviewed":   "true",
147	})
148
149	// Generate download URL
150	url, _ := fm.GenerateDownloadURL(ctx, metadata.Key, 1*time.Hour)
151
152	// List files by prefix
153	files, _ := fm.ListByPrefix(ctx, "reports/", 10)
154	for _, f := range files {
155		fmt.Printf("  - %s\n", f.Key, f.Size)
156	}
157}

Explanation: This exercise shows S3 file management with rich metadata. It demonstrates uploading files with custom tags, searching by tags, updating metadata, generating presigned URLs for downloads, and listing files with pagination. In production, this pattern combines S3 for storage with DynamoDB for fast metadata queries.
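
For reference, here is a minimal sketch of how the presigned-URL step could look against the real AWS SDK v2, using s3.NewPresignClient and PresignGetObject. The bucket and key mirror the exercise; error handling is reduced to log.Fatal for brevity, and the exact option names should be checked against the SDK version you are using.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    ctx := context.Background()

    // Load the default credential chain and create an S3 client
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatal(err)
    }
    client := s3.NewFromConfig(cfg)

    // Presign a GetObject request so callers can download the file
    // directly from S3 for the next hour
    presigner := s3.NewPresignClient(client)
    req, err := presigner.PresignGetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String("my-files-bucket"), // bucket/key from the exercise
        Key:    aws.String("reports/quarterly-2024.pdf"),
    }, s3.WithPresignExpires(1*time.Hour))
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Presigned URL:", req.URL)
}

The generated URL embeds the signature and expiry, so it can be handed to a browser or another service without sharing AWS credentials.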

Exercise 4: DynamoDB CRUD Application

Learning Objective: Master DynamoDB programming patterns including expression builders, conditional writes, transactions, and efficient query design. This exercise teaches you to build a complete data access layer that handles complex operations while maintaining performance and consistency in a NoSQL environment.

Real-World Context: DynamoDB powers some of the largest applications in the world including Netflix's recommendation engine, Airbnb's booking system, and Lyft's ride matching platform. Understanding DynamoDB's unique approach to data modeling, consistency models, and performance optimization is essential for building scalable applications that can handle millions of requests per day with single-digit millisecond latency.

Difficulty: Advanced | Time Estimate: 70-90 minutes

Objective: Build a complete CRUD application using DynamoDB with expression builders, transactions, and conditional writes.

Solution with Explanation
// run
package main

import (
    "context"
    "fmt"
    "time"
)

// User represents a user document
type User struct {
    ID        string    `json:"id"`
    Email     string    `json:"email"`
    Name      string    `json:"name"`
    Role      string    `json:"role"`
    Active    bool      `json:"active"`
    CreatedAt time.Time `json:"created_at"`
    UpdatedAt time.Time `json:"updated_at"`
    Version   int       `json:"version"` // For optimistic locking
}

// UserRepository handles DynamoDB operations
type UserRepository struct {
    tableName string
    // client *dynamodb.Client // Simulated for demonstration
}

func NewUserRepository(tableName string) *UserRepository {
    return &UserRepository{tableName: tableName}
}

// Create creates a new user with a conditional write
func (r *UserRepository) Create(ctx context.Context, user *User) error {
    now := time.Now()
    user.CreatedAt = now
    user.UpdatedAt = now
    user.Version = 1

    // In production:
    // 1. Marshal user to attribute map
    // 2. Use PutItem with condition: attribute_not_exists(id)
    // 3. Handle ConditionalCheckFailedException

    fmt.Printf("Created user: %s (%s)\n", user.Name, user.Email)
    return nil
}

// Get retrieves a user by ID
func (r *UserRepository) Get(ctx context.Context, id string) (*User, error) {
    // In production:
    // 1. Use GetItem with primary key
    // 2. Set ConsistentRead for strong consistency
    // 3. Unmarshal result to User struct

    user := &User{
        ID:        id,
        Email:     "user@example.com",
        Name:      "John Doe",
        Role:      "admin",
        Active:    true,
        CreatedAt: time.Now().Add(-30 * 24 * time.Hour),
        UpdatedAt: time.Now(),
        Version:   1,
    }

    fmt.Printf("Retrieved user: %s\n", user.Name)
    return user, nil
}

// Update updates a user with optimistic locking
func (r *UserRepository) Update(ctx context.Context, user *User) error {
    // In production:
    // 1. Build update expression for changed fields
    // 2. Use UpdateItem with condition: version = current_version
    // 3. Increment version number
    // 4. Handle version mismatch

    oldVersion := user.Version
    user.Version++
    user.UpdatedAt = time.Now()

    fmt.Printf("Updated user %s (version %d -> %d)\n", user.ID, oldVersion, user.Version)
    return nil
}

// UpdateEmail updates just the email using the expression builder
func (r *UserRepository) UpdateEmail(ctx context.Context, id, newEmail string) error {
    // In production:
    // Use expression.Set() to build:
    // SET email = :email, updated_at = :now, version = version + 1

    fmt.Printf("Updated email for user %s to %s\n", id, newEmail)
    return nil
}

// Delete deletes a user
func (r *UserRepository) Delete(ctx context.Context, id string) error {
    // In production:
    // Option 1: Soft delete - set active = false
    // Option 2: Hard delete - use DeleteItem

    fmt.Printf("Deleted user %s\n", id)
    return nil
}

// QueryByRole queries users by role
func (r *UserRepository) QueryByRole(ctx context.Context, role string) ([]*User, error) {
    // In production:
    // 1. Use Query on GSI
    // 2. Build key condition expression
    // 3. Optionally add filter expression
    // 4. Handle pagination with ExclusiveStartKey

    users := []*User{
        {
            ID:     "user-1",
            Name:   "Alice",
            Email:  "alice@example.com",
            Role:   role,
            Active: true,
        },
        {
            ID:     "user-2",
            Name:   "Bob",
            Email:  "bob@example.com",
            Role:   role,
            Active: true,
        },
    }

    fmt.Printf("Found %d users with role '%s'\n", len(users), role)
    return users, nil
}

// BatchCreate creates multiple users in batch
func (r *UserRepository) BatchCreate(ctx context.Context, users []*User) error {
    // In production:
    // 1. Split into batches of 25
    // 2. Use BatchWriteItem
    // 3. Handle unprocessed items
    // 4. Implement retry logic

    fmt.Printf("Batch created %d users\n", len(users))
    return nil
}

// TransactionalUpdate performs a transaction across multiple items
func (r *UserRepository) TransactionalUpdate(ctx context.Context, fromUserID, toUserID string, amount int) error {
    // In production:
    // Use TransactWriteItems to:
    // 1. Decrement balance from source user
    // 2. Increment balance for destination user
    // 3. All-or-nothing guarantee

    fmt.Printf("Transaction: Transfer %d from %s to %s\n", amount, fromUserID, toUserID)
    return nil
}

func main() {
    ctx := context.Background()
    repo := NewUserRepository("users")

    // Create user
    newUser := &User{
        ID:     "user-123",
        Email:  "john@example.com",
        Name:   "John Doe",
        Role:   "admin",
        Active: true,
    }
    repo.Create(ctx, newUser)

    // Get user
    user, _ := repo.Get(ctx, "user-123")
    fmt.Printf("User: %+v\n", user)

    // Update user
    user.Name = "John Smith"
    repo.Update(ctx, user)

    // Update specific field
    repo.UpdateEmail(ctx, "user-123", "john.smith@example.com")

    // Query by role
    admins, _ := repo.QueryByRole(ctx, "admin")
    fmt.Printf("Found %d admins\n", len(admins))

    // Batch create
    users := []*User{
        {ID: "user-201", Email: "user1@example.com", Name: "User 1", Role: "user"},
        {ID: "user-202", Email: "user2@example.com", Name: "User 2", Role: "user"},
        {ID: "user-203", Email: "user3@example.com", Name: "User 3", Role: "user"},
    }
    repo.BatchCreate(ctx, users)

    // Transactional update
    repo.TransactionalUpdate(ctx, "user-123", "user-456", 100)

    // Soft delete
    repo.Delete(ctx, "user-123")
}

Explanation: This exercise demonstrates complete DynamoDB CRUD operations with production patterns. It includes conditional writes for data integrity, optimistic locking using version numbers, expression builders for flexible updates, querying with GSI, batch operations for efficiency, and transactions for atomic multi-item updates. These patterns ensure data consistency and optimal performance.
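
To make the UpdateEmail step concrete, here is a sketch of a version-checked update using the SDK's expression builder package (github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression). The users table, user-123 key, and attribute names come from the exercise; treat the exact builder calls as an assumption to verify against the SDK version you use.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

func main() {
    ctx := context.Background()
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatal(err)
    }
    client := dynamodb.NewFromConfig(cfg)

    currentVersion := 1

    // SET email = :email, updated_at = :now, version = version + 1
    update := expression.
        Set(expression.Name("email"), expression.Value("john.smith@example.com")).
        Set(expression.Name("updated_at"), expression.Value(time.Now().Format(time.RFC3339))).
        Set(expression.Name("version"), expression.Name("version").Plus(expression.Value(1)))

    // Optimistic lock: only apply the update if the stored version matches
    cond := expression.Name("version").Equal(expression.Value(currentVersion))

    expr, err := expression.NewBuilder().WithUpdate(update).WithCondition(cond).Build()
    if err != nil {
        log.Fatal(err)
    }

    _, err = client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
        TableName:                 aws.String("users"),
        Key:                       map[string]types.AttributeValue{"id": &types.AttributeValueMemberS{Value: "user-123"}},
        UpdateExpression:          expr.Update(),
        ConditionExpression:       expr.Condition(),
        ExpressionAttributeNames:  expr.Names(),
        ExpressionAttributeValues: expr.Values(),
    })
    if err != nil {
        // A ConditionalCheckFailedException here means another writer bumped the version first
        log.Fatal(err)
    }
    fmt.Println("Email updated")
}

The builder generates the expression strings and placeholder maps for you, which avoids hand-writing attribute names and keeps the condition and update in sync.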

Exercise 5: SQS Message Processor

Learning Objective: Master enterprise message processing patterns using SQS including dead letter queues, message batching, and visibility timeout management. This exercise teaches you to build robust, fault-tolerant message processing systems that can handle high-throughput workloads while preventing message loss.

Real-World Context: Message queues are the backbone of microservices architectures at companies like Amazon, Uber, and Netflix. SQS processes billions of messages daily for critical workflows like order processing, inventory updates, and user notifications. Understanding how to implement reliable message processing with proper error handling, retry logic, and monitoring is essential for building distributed systems that can scale and maintain data consistency.

Difficulty: Advanced | Time Estimate: 65-80 minutes

Objective: Build a reliable SQS message processor with dead letter queues, batching, and visibility timeout management.

Solution with Explanation
// run
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"
)

// Message represents an SQS message
type Message struct {
    ID            string                 `json:"id"`
    Type          string                 `json:"type"`
    Body          map[string]interface{} `json:"body"`
    ReceiptHandle string                 `json:"-"` // Not serialized
    RetryCount    int                    `json:"retry_count"`
}

// MessageProcessor processes SQS messages
type MessageProcessor struct {
    queueURL   string
    dlqURL     string
    maxRetries int
    workers    int
    // client *sqs.Client // Simulated for demonstration
}

func NewMessageProcessor(queueURL, dlqURL string, workers int) *MessageProcessor {
    return &MessageProcessor{
        queueURL:   queueURL,
        dlqURL:     dlqURL,
        maxRetries: 3,
        workers:    workers,
    }
}

// Start starts message processing with multiple workers
func (mp *MessageProcessor) Start(ctx context.Context) error {
    var wg sync.WaitGroup

    for i := 0; i < mp.workers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            mp.worker(ctx, workerID)
        }(i)
    }

    wg.Wait()
    return nil
}

// worker processes messages in a loop
func (mp *MessageProcessor) worker(ctx context.Context, workerID int) {
    fmt.Printf("Worker %d started\n", workerID)

    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d stopping\n", workerID)
            return
        default:
            // In production:
            // Use ReceiveMessage with:
            // - MaxNumberOfMessages: 10
            // - WaitTimeSeconds: 20
            // - VisibilityTimeout: 30
            messages := mp.receiveMessages(ctx, 10)

            for _, msg := range messages {
                mp.processMessage(ctx, workerID, msg)
            }
        }
    }
}

// receiveMessages simulates receiving messages from SQS
func (mp *MessageProcessor) receiveMessages(ctx context.Context, maxMessages int) []*Message {
    // Simulated messages
    time.Sleep(100 * time.Millisecond) // Simulate long polling

    return []*Message{
        {
            ID:            "msg-001",
            Type:          "order.created",
            Body:          map[string]interface{}{"order_id": "12345", "amount": 99.99},
            ReceiptHandle: "receipt-001",
            RetryCount:    0,
        },
    }
}

// processMessage processes a single message
func (mp *MessageProcessor) processMessage(ctx context.Context, workerID int, msg *Message) {
    fmt.Printf("Worker %d processing message %s (%s)\n", workerID, msg.ID, msg.Type)

    // Extend visibility timeout for long-running tasks;
    // stop the extender as soon as this message is done
    msgCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    go mp.extendVisibilityTimeout(msgCtx, msg)

    // Process message
    err := mp.handleMessage(ctx, msg)
    if err != nil {
        fmt.Printf("Worker %d: Processing failed: %v\n", workerID, err)
        mp.handleFailure(ctx, msg, err)
        return
    }

    // Delete message after successful processing
    mp.deleteMessage(ctx, msg)
    fmt.Printf("Worker %d: Message %s processed successfully\n", workerID, msg.ID)
}

// handleMessage handles a message based on its type
func (mp *MessageProcessor) handleMessage(ctx context.Context, msg *Message) error {
    switch msg.Type {
    case "order.created":
        return mp.processOrder(ctx, msg.Body)
    case "user.registered":
        return mp.processUserRegistration(ctx, msg.Body)
    case "payment.completed":
        return mp.processPayment(ctx, msg.Body)
    default:
        return fmt.Errorf("unknown message type: %s", msg.Type)
    }
}

func (mp *MessageProcessor) processOrder(ctx context.Context, body map[string]interface{}) error {
    orderID := body["order_id"]
    fmt.Printf("Processing order: %v\n", orderID)
    time.Sleep(50 * time.Millisecond) // Simulate processing
    return nil
}

func (mp *MessageProcessor) processUserRegistration(ctx context.Context, body map[string]interface{}) error {
    fmt.Println("Processing user registration")
    return nil
}

func (mp *MessageProcessor) processPayment(ctx context.Context, body map[string]interface{}) error {
    fmt.Println("Processing payment")
    return nil
}

// extendVisibilityTimeout extends message visibility for long tasks
func (mp *MessageProcessor) extendVisibilityTimeout(ctx context.Context, msg *Message) {
    ticker := time.NewTicker(20 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // In production:
            // Use ChangeMessageVisibility to extend timeout
            fmt.Printf("Extending visibility timeout for message %s\n", msg.ID)
        }
    }
}

// handleFailure handles message processing failure
func (mp *MessageProcessor) handleFailure(ctx context.Context, msg *Message, err error) {
    msg.RetryCount++

    if msg.RetryCount >= mp.maxRetries {
        // Send to dead letter queue
        fmt.Printf("Max retries exceeded for message %s, sending to DLQ\n", msg.ID)
        mp.sendToDLQ(ctx, msg, err)
        mp.deleteMessage(ctx, msg)
    } else {
        // Return to queue with delay
        fmt.Printf("Retry %d/%d for message %s\n", msg.RetryCount, mp.maxRetries, msg.ID)
        // Message will become visible again after visibility timeout expires
    }
}

// sendToDLQ sends a failed message to the dead letter queue
func (mp *MessageProcessor) sendToDLQ(ctx context.Context, msg *Message, err error) {
    // In production:
    // 1. Add failure metadata to message attributes
    // 2. Send to DLQ using SendMessage
    // 3. Include original error and retry count

    dlqMessage := map[string]interface{}{
        "original_message": msg,
        "error":            err.Error(),
        "failed_at":        time.Now(),
        "retry_count":      msg.RetryCount,
    }

    data, _ := json.MarshalIndent(dlqMessage, "", "  ")
    fmt.Printf("DLQ Message:\n%s\n", string(data))
}

// deleteMessage deletes a processed message
func (mp *MessageProcessor) deleteMessage(ctx context.Context, msg *Message) {
    // In production:
    // Use DeleteMessage with ReceiptHandle
    fmt.Printf("Deleted message: %s\n", msg.ID)
}

// BatchDelete deletes multiple messages efficiently
func (mp *MessageProcessor) BatchDelete(ctx context.Context, messages []*Message) error {
    // In production:
    // Use DeleteMessageBatch
    fmt.Printf("Batch deleted %d messages\n", len(messages))
    return nil
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    processor := NewMessageProcessor(
        "https://sqs.us-east-1.amazonaws.com/123456/my-queue",
        "https://sqs.us-east-1.amazonaws.com/123456/my-dlq",
        3, // 3 workers
    )

    fmt.Println("Starting SQS message processor...")
    if err := processor.Start(ctx); err != nil {
        log.Fatal(err)
    }

    fmt.Println("Message processor stopped")
}

Explanation: This exercise demonstrates production-ready SQS message processing with multiple workers for parallel processing, long polling for efficiency, visibility timeout extension for long-running tasks, retry logic with configurable limits, dead letter queue for failed messages, and batch operations for optimal throughput. Key patterns include graceful shutdown, error handling, and message lifecycle management.
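
The simulated receiveMessages call maps onto a real long-polling loop roughly like the sketch below, which uses ReceiveMessage with WaitTimeSeconds for long polling and DeleteMessage after processing. The queue URL is the placeholder from the exercise, and a production consumer would wrap this in the retry and DLQ handling shown above.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
    ctx := context.Background()
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        log.Fatal(err)
    }
    client := sqs.NewFromConfig(cfg)

    queueURL := "https://sqs.us-east-1.amazonaws.com/123456/my-queue" // placeholder from the exercise

    // Long-poll for up to 10 messages; each received message is hidden
    // from other consumers for 30 seconds
    out, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
        QueueUrl:            aws.String(queueURL),
        MaxNumberOfMessages: 10,
        WaitTimeSeconds:     20,
        VisibilityTimeout:   30,
    })
    if err != nil {
        log.Fatal(err)
    }

    for _, msg := range out.Messages {
        fmt.Println("Received:", aws.ToString(msg.Body))

        // Delete only after successful processing; otherwise the message
        // reappears once the visibility timeout expires
        if _, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
            QueueUrl:      aws.String(queueURL),
            ReceiptHandle: msg.ReceiptHandle,
        }); err != nil {
            log.Printf("delete failed: %v", err)
        }
    }
}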

Summary

Cloud SDK integration enables Go applications to leverage managed cloud services effectively.

Key Takeaways

  1. SDK Selection: AWS SDK v2 is modular, Google Cloud is idiomatic, Azure uses Track 2
  2. Authentication: Use credential chains for flexibility
  3. Error Handling: Implement retries, circuit breakers, and proper error classification
  4. Abstraction: Create cloud-agnostic interfaces for portability
  5. Performance: Reuse clients, implement connection pooling, use batch operations
  6. Monitoring: Instrument SDK calls with metrics and distributed tracing
  7. Cost Optimization: Cache responses, batch operations, use appropriate storage classes

Best Practices

  1. Always use context for cancellation and timeouts
  2. Reuse SDK clients across requests
  3. Implement exponential backoff for retries (a minimal sketch follows this list)
  4. Use credential chains instead of hardcoded credentials
  5. Monitor SDK call metrics for performance and cost
  6. Implement graceful degradation with fallbacks
  7. Test with cloud emulators
  8. Use batch operations to reduce API calls and costs
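
As a companion to best practice 3, here is a minimal sketch of exponential backoff with full jitter for application-level retries around operations the SDK's built-in retryer does not cover. The retry helper and its parameters are illustrative, not part of any SDK.

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// retry calls fn up to maxAttempts times, sleeping for an exponentially
// growing, jittered delay between attempts and honoring ctx cancellation.
func retry(ctx context.Context, maxAttempts int, baseDelay time.Duration, fn func() error) error {
    var err error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if err = fn(); err == nil {
            return nil
        }

        // Full jitter: sleep a random duration in [0, baseDelay * 2^attempt)
        backoff := baseDelay << attempt
        sleep := time.Duration(rand.Int63n(int64(backoff)))

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(sleep):
        }
    }
    return fmt.Errorf("all %d attempts failed: %w", maxAttempts, err)
}

func main() {
    ctx := context.Background()

    calls := 0
    err := retry(ctx, 5, 100*time.Millisecond, func() error {
        calls++
        if calls < 3 {
            return errors.New("transient error") // simulated throttling
        }
        return nil
    })

    fmt.Printf("succeeded after %d calls, err=%v\n", calls, err)
}

Jitter spreads retries out so that many clients recovering from the same throttling event do not hammer the service in lockstep.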