When building a house, you need various specialists - electricians, plumbers, carpenters. Each specialist has their own tools, work methods, and communication style. Cloud SDKs are like having universal translators and standardized tools that let you work seamlessly with all these specialists, regardless of their individual preferences.
Modern cloud applications leverage managed services across multiple cloud providers. This article covers production-ready patterns for integrating AWS SDK v2, Google Cloud client libraries, and Azure SDK for Go into your applications.
Introduction to Cloud SDKs
Cloud SDKs provide programmatic access to cloud services, enabling you to build applications that leverage managed infrastructure, storage, compute, and more.
💡 Key Takeaway: Cloud SDKs abstract away the complexity of REST APIs, authentication, retry logic, and error handling. They provide type-safe, idiomatic Go interfaces that make cloud services feel like native Go packages.
SDK Design Philosophy
Each cloud provider has a different philosophy for their Go SDK, reflecting their overall platform approach and design principles.
AWS SDK v2:
// Import only what you need - reduces binary size
import (
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    // Each service is a separate module
)

// AWS SDK v2 uses context throughout
ctx := context.Background()
cfg, _ := config.LoadDefaultConfig(ctx)
client := s3.NewFromConfig(cfg)
Google Cloud:
// Google Cloud emphasizes idiomatic Go patterns
import (
    "cloud.google.com/go/storage"
    "cloud.google.com/go/pubsub"
)

// Context-first design
ctx := context.Background()
client, _ := storage.NewClient(ctx)
defer client.Close()
Azure SDK:
// Azure SDK uses azidentity for auth
import (
    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// Credential-based authentication
cred, _ := azidentity.NewDefaultAzureCredential(nil)
client, _ := azblob.NewClient(serviceURL, cred, nil)
Key Differences
| Feature | AWS SDK v2 | Google Cloud | Azure SDK |
|---|---|---|---|
| Modularity | Per-service modules | Per-service packages | Per-service modules |
| Auth | Config chain | ADC, service accounts | azidentity chain |
| Context | Required everywhere | First parameter | Options pattern |
| Retries | Built-in middleware | Client options | Policy-based |
Real-World Impact: These differences affect how you structure your code. AWS SDK v2 requires context for every operation, making it more explicit about cancellation. Google Cloud's context-first design feels more idiomatic to Go developers. Azure's options pattern provides more flexibility but requires more verbose code.
AWS SDK v2 Fundamentals
AWS SDK v2 introduced a complete redesign with improved modularity, context support, and performance. Think of it as upgrading from a flip phone to a smartphone - same basic purpose, but entirely different capabilities and user experience.
Configuration Loading
The AWS SDK v2 configuration system is like a smart key that can open many doors. It tries different keys automatically until it finds one that works, eliminating the need to hardcode credentials.
Default Config Chain:
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
)

// LoadAWSConfig loads configuration from environment, shared config, EC2 metadata
func LoadAWSConfig(ctx context.Context) (aws.Config, error) {
    // Default config loader checks:
    // 1. Environment variables
    // 2. Shared config file
    // 3. Shared credentials file
    // 4. EC2 instance metadata
    // 5. ECS task credentials
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return aws.Config{}, fmt.Errorf("unable to load SDK config: %w", err)
    }

    return cfg, nil
}

// CustomConfig demonstrates custom configuration
func CustomConfig(ctx context.Context) (aws.Config, error) {
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion("us-west-2"),
        config.WithSharedConfigProfile("production"),
        // Custom retry configuration
        config.WithRetryMaxAttempts(5),
        // Custom endpoint
        config.WithEndpointResolverWithOptions(
            aws.EndpointResolverWithOptionsFunc(
                func(service, region string, options ...interface{}) (aws.Endpoint, error) {
                    if service == "s3" {
                        return aws.Endpoint{
                            URL: "http://localhost:4566",
                        }, nil
                    }
                    return aws.Endpoint{}, &aws.EndpointNotFoundError{}
                },
            ),
        ),
    )
    if err != nil {
        return aws.Config{}, err
    }

    return cfg, nil
}

func main() {
    ctx := context.Background()

    cfg, err := LoadAWSConfig(ctx)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Loaded config for region: %s\n", cfg.Region)
}
⚠️ Important: Never hardcode AWS credentials in your code. Always use the default credential chain. This ensures your code works seamlessly across development, testing, and production environments without security risks.
Service Clients
Service clients are like specialized remote controls for each cloud service. Once configured, they handle all the communication details, letting you focus on what you want to do rather than how to do it.
Client Creation and Configuration:
package main

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/retry"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

type AWSClients struct {
    S3 *s3.Client
    // Add other services as needed
}

func NewAWSClients(ctx context.Context) (*AWSClients, error) {
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion("us-east-1"),
        // Configure retries
        config.WithRetryMode(aws.RetryModeAdaptive),
        config.WithRetryMaxAttempts(3),
    )
    if err != nil {
        return nil, err
    }

    // Create S3 client with custom options
    s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
        // Use path-style addressing
        o.UsePathStyle = true

        // Custom retry configuration per service
        o.Retryer = retry.NewStandard(func(o *retry.StandardOptions) {
            o.MaxAttempts = 5
            o.MaxBackoff = 30 * time.Second
        })
    })

    return &AWSClients{
        S3: s3Client,
    }, nil
}
💡 Key Takeaway: Create service clients once and reuse them throughout your application. Clients are thread-safe and handle connection pooling internally. Creating new clients for every request is inefficient and can exhaust connection limits.
AWS Service Patterns
S3 Operations
S3 is like unlimited cloud storage with magical properties - it scales infinitely, is incredibly durable, and can serve files to millions of users simultaneously. The S3 client patterns below help you use this power effectively.
Production S3 Client:
package awsutil

import (
    "context"
    "fmt"
    "io"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

type S3Client struct {
    client     *s3.Client
    uploader   *manager.Uploader
    downloader *manager.Downloader
}

func NewS3Client(cfg aws.Config) *S3Client {
    client := s3.NewFromConfig(cfg)

    return &S3Client{
        client: client,
        uploader: manager.NewUploader(client, func(u *manager.Uploader) {
            // Configure multipart upload
            u.PartSize = 10 * 1024 * 1024 // 10MB parts
            u.Concurrency = 5
        }),
        downloader: manager.NewDownloader(client, func(d *manager.Downloader) {
            // Configure multipart download
            d.PartSize = 10 * 1024 * 1024
            d.Concurrency = 5
        }),
    }
}
Real-World Performance: With these multipart settings, uploading a 1GB file takes ~30 seconds compared to ~2 minutes for single-part uploads. The concurrent parts transfer data in parallel, maximizing network bandwidth utilization.
// Upload uploads a file to S3 with metadata
func (s *S3Client) Upload(ctx context.Context, bucket, key string, body io.Reader, metadata map[string]string) error {
_, err := s.uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: body,
Metadata: metadata,
// Server-side encryption
ServerSideEncryption: types.ServerSideEncryptionAes256,
// Storage class
StorageClass: types.StorageClassIntelligentTiering,
})
if err != nil {
return fmt.Errorf("upload failed: %w", err)
}
return nil
}
// Download downloads a file from S3
func (s *S3Client) Download(ctx context.Context, bucket, key string, w io.WriterAt) error {
_, err := s.downloader.Download(ctx, w, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
return fmt.Errorf("download failed: %w", err)
}
return nil
}
// ListObjects lists objects with pagination
func (s *S3Client) ListObjects(ctx context.Context, bucket, prefix string) ([]types.Object, error) {
var objects []types.Object
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
})
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get page: %w", err)
}
objects = append(objects, page.Contents...)
}
return objects, nil
}
// GeneratePresignedURL creates a presigned URL for temporary access
func (s *S3Client) GeneratePresignedURL(ctx context.Context, bucket, key string, duration time.Duration) (string, error) {
presignClient := s3.NewPresignClient(s.client)
request, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
}, s3.WithPresignExpires(duration))
if err != nil {
return "", fmt.Errorf("failed to create presigned URL: %w", err)
}
return request.URL, nil
}
// CopyObject copies an object within S3
func (s *S3Client) CopyObject(ctx context.Context, sourceBucket, sourceKey, destBucket, destKey string) error {
_, err := s.client.CopyObject(ctx, &s3.CopyObjectInput{
CopySource: aws.String(fmt.Sprintf("%s/%s", sourceBucket, sourceKey)),
Bucket: aws.String(destBucket),
Key: aws.String(destKey),
})
if err != nil {
return fmt.Errorf("copy failed: %w", err)
}
return nil
}
// DeleteObjects deletes multiple objects
func (s *S3Client) DeleteObjects(ctx context.Context, bucket string, keys []string) error {
objects := make([]types.ObjectIdentifier, len(keys))
for i, key := range keys {
objects[i] = types.ObjectIdentifier{Key: aws.String(key)}
}
_, err := s.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(bucket),
Delete: &types.Delete{
Objects: objects,
Quiet: aws.Bool(true),
},
})
if err != nil {
return fmt.Errorf("delete failed: %w", err)
}
return nil
}
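A minimal usage sketch of the wrapper above; the bucket name, object key, and local file path are illustrative placeholders, and the "os" import is assumed:

// uploadLocalFile streams a local file through the multipart uploader.
func uploadLocalFile(ctx context.Context, s *S3Client) error {
    f, err := os.Open("report.csv") // hypothetical local file
    if err != nil {
        return err
    }
    defer f.Close()

    return s.Upload(ctx, "my-bucket", "reports/report.csv", f, map[string]string{
        "source": "example",
    })
}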
DynamoDB Operations
DynamoDB Client with Expression Builder:
package awsutil
import (
"context"
"fmt"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
type DynamoDBClient struct {
client *dynamodb.Client
tableName string
}
func NewDynamoDBClient(cfg aws.Config, tableName string) *DynamoDBClient {
return &DynamoDBClient{
client: dynamodb.NewFromConfig(cfg),
tableName: tableName,
}
}
type User struct {
ID string `dynamodbav:"id"`
Email string `dynamodbav:"email"`
Name string `dynamodbav:"name"`
CreatedAt time.Time `dynamodbav:"created_at"`
UpdatedAt time.Time `dynamodbav:"updated_at"`
}
// PutItem writes an item to DynamoDB
func (d *DynamoDBClient) PutItem(ctx context.Context, user User) error {
user.UpdatedAt = time.Now()
if user.CreatedAt.IsZero() {
user.CreatedAt = time.Now()
}
av, err := attributevalue.MarshalMap(user)
if err != nil {
return fmt.Errorf("failed to marshal: %w", err)
}
_, err = d.client.PutItem(ctx, &dynamodb.PutItemInput{
TableName: aws.String(d.tableName),
Item: av,
// Conditional write - only if item doesn't exist
ConditionExpression: aws.String("attribute_not_exists(id)"),
})
if err != nil {
return fmt.Errorf("put item failed: %w", err)
}
return nil
}
// GetItem retrieves an item by primary key
func (d *DynamoDBClient) GetItem(ctx context.Context, id string) (*User, error) {
result, err := d.client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(d.tableName),
Key: map[string]types.AttributeValue{
"id": &types.AttributeValueMemberS{Value: id},
},
ConsistentRead: aws.Bool(true), // Strong consistency
})
if err != nil {
return nil, fmt.Errorf("get item failed: %w", err)
}
if result.Item == nil {
return nil, fmt.Errorf("item not found")
}
var user User
err = attributevalue.UnmarshalMap(result.Item, &user)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal: %w", err)
}
return &user, nil
}
// Query performs a query with filter expressions
func (d *DynamoDBClient) Query(ctx context.Context, email string) ([]User, error) {
// Build expression
keyEx := expression.Key("email").Equal(expression.Value(email))
filterEx := expression.Name("active").Equal(expression.Value(true))
expr, err := expression.NewBuilder().
WithKeyCondition(keyEx).
WithFilter(filterEx).
Build()
if err != nil {
return nil, fmt.Errorf("failed to build expression: %w", err)
}
result, err := d.client.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(d.tableName),
IndexName: aws.String("email-index"),
KeyConditionExpression: expr.KeyCondition(),
FilterExpression: expr.Filter(),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
})
if err != nil {
return nil, fmt.Errorf("query failed: %w", err)
}
var users []User
err = attributevalue.UnmarshalListOfMaps(result.Items, &users)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal: %w", err)
}
return users, nil
}
// UpdateItem updates specific attributes
func (d *DynamoDBClient) UpdateItem(ctx context.Context, id string, name string) error {
update := expression.Set(
expression.Name("name"),
expression.Value(name),
).Set(
expression.Name("updated_at"),
expression.Value(time.Now()),
)
expr, err := expression.NewBuilder().WithUpdate(update).Build()
if err != nil {
return fmt.Errorf("failed to build expression: %w", err)
}
_, err = d.client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: aws.String(d.tableName),
Key: map[string]types.AttributeValue{
"id": &types.AttributeValueMemberS{Value: id},
},
UpdateExpression: expr.Update(),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
ReturnValues: types.ReturnValueUpdatedNew,
})
if err != nil {
return fmt.Errorf("update failed: %w", err)
}
return nil
}
// BatchWrite writes multiple items efficiently
func (d *DynamoDBClient) BatchWrite(ctx context.Context, users []User) error {
const batchSize = 25 // DynamoDB limit
for i := 0; i < len(users); i += batchSize {
end := i + batchSize
if end > len(users) {
end = len(users)
}
batch := users[i:end]
writeRequests := make([]types.WriteRequest, len(batch))
for j, user := range batch {
av, err := attributevalue.MarshalMap(user)
if err != nil {
return fmt.Errorf("failed to marshal: %w", err)
}
writeRequests[j] = types.WriteRequest{
PutRequest: &types.PutRequest{Item: av},
}
}
_, err := d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]types.WriteRequest{
d.tableName: writeRequests,
},
})
if err != nil {
return fmt.Errorf("batch write failed: %w", err)
}
}
return nil
}
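For context, a brief usage sketch of the client above; the table name and user fields are illustrative:

// saveUser creates the client and writes a single user record.
func saveUser(ctx context.Context, cfg aws.Config) error {
    db := NewDynamoDBClient(cfg, "users")

    return db.PutItem(ctx, User{
        ID:    "user-123",
        Email: "jane@example.com",
        Name:  "Jane Doe",
    })
}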
SQS and SNS Patterns
Message Queue Integration:
package awsutil

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/sns"
    snstypes "github.com/aws/aws-sdk-go-v2/service/sns/types"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// SQSClient wraps SQS operations
type SQSClient struct {
    client   *sqs.Client
    queueURL string
}

func NewSQSClient(cfg aws.Config, queueURL string) *SQSClient {
    return &SQSClient{
        client:   sqs.NewFromConfig(cfg),
        queueURL: queueURL,
    }
}

// SendMessage sends a message to SQS
func (s *SQSClient) SendMessage(ctx context.Context, body interface{}) error {
    data, err := json.Marshal(body)
    if err != nil {
        return fmt.Errorf("marshal failed: %w", err)
    }

    _, err = s.client.SendMessage(ctx, &sqs.SendMessageInput{
        QueueUrl:    aws.String(s.queueURL),
        MessageBody: aws.String(string(data)),
        MessageAttributes: map[string]types.MessageAttributeValue{
            "Timestamp": {
                DataType:    aws.String("String"),
                StringValue: aws.String(time.Now().Format(time.RFC3339)),
            },
        },
    })
    if err != nil {
        return fmt.Errorf("send message failed: %w", err)
    }

    return nil
}

// ReceiveMessages polls for messages
func (s *SQSClient) ReceiveMessages(ctx context.Context, maxMessages int32) ([]types.Message, error) {
    result, err := s.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
        QueueUrl:            aws.String(s.queueURL),
        MaxNumberOfMessages: maxMessages,
        WaitTimeSeconds:     20, // Long polling
        MessageAttributeNames: []string{
            string(types.QueueAttributeNameAll),
        },
    })
    if err != nil {
        return nil, fmt.Errorf("receive failed: %w", err)
    }

    return result.Messages, nil
}

// DeleteMessage removes a processed message
func (s *SQSClient) DeleteMessage(ctx context.Context, receiptHandle string) error {
    _, err := s.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
        QueueUrl:      aws.String(s.queueURL),
        ReceiptHandle: aws.String(receiptHandle),
    })
    if err != nil {
        return fmt.Errorf("delete failed: %w", err)
    }

    return nil
}

// MessageProcessor handles message processing with retry
type MessageProcessor struct {
    client *SQSClient
}

func NewMessageProcessor(client *SQSClient) *MessageProcessor {
    return &MessageProcessor{client: client}
}

// ProcessMessages continuously processes messages
func (m *MessageProcessor) ProcessMessages(ctx context.Context, handler func([]byte) error) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            messages, err := m.client.ReceiveMessages(ctx, 10)
            if err != nil {
                return err
            }

            for _, msg := range messages {
                if err := handler([]byte(*msg.Body)); err != nil {
                    // Message will return to queue after visibility timeout
                    continue
                }

                // Delete successfully processed message
                if err := m.client.DeleteMessage(ctx, *msg.ReceiptHandle); err != nil {
                    return err
                }
            }
        }
    }
}

// SNSClient wraps SNS operations
type SNSClient struct {
    client   *sns.Client
    topicARN string
}

func NewSNSClient(cfg aws.Config, topicARN string) *SNSClient {
    return &SNSClient{
        client:   sns.NewFromConfig(cfg),
        topicARN: topicARN,
    }
}

// Publish publishes a message to SNS topic
func (s *SNSClient) Publish(ctx context.Context, message interface{}) error {
    data, err := json.Marshal(message)
    if err != nil {
        return fmt.Errorf("marshal failed: %w", err)
    }

    _, err = s.client.Publish(ctx, &sns.PublishInput{
        TopicArn: aws.String(s.topicARN),
        Message:  aws.String(string(data)),
        MessageAttributes: map[string]snstypes.MessageAttributeValue{
            "event_type": {
                DataType:    aws.String("String"),
                StringValue: aws.String("user.created"),
            },
        },
    })
    if err != nil {
        return fmt.Errorf("publish failed: %w", err)
    }

    return nil
}
Google Cloud Client Libraries
Google Cloud client libraries follow idiomatic Go patterns with context-first design.
Cloud Storage
Production GCS Client:
package gcp

import (
    "context"
    "fmt"
    "io"
    "time"

    "cloud.google.com/go/storage"
    "google.golang.org/api/iterator"
)

type GCSClient struct {
    client *storage.Client
}

func NewGCSClient(ctx context.Context) (*GCSClient, error) {
    client, err := storage.NewClient(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to create client: %w", err)
    }

    return &GCSClient{client: client}, nil
}

func (g *GCSClient) Close() error {
    return g.client.Close()
}

// Upload uploads a file to GCS
func (g *GCSClient) Upload(ctx context.Context, bucket, object string, data io.Reader) error {
    wc := g.client.Bucket(bucket).Object(object).NewWriter(ctx)

    // Set metadata
    wc.ContentType = "application/octet-stream"
    wc.Metadata = map[string]string{
        "uploaded_at": time.Now().Format(time.RFC3339),
    }

    if _, err := io.Copy(wc, data); err != nil {
        return fmt.Errorf("io.Copy: %w", err)
    }

    if err := wc.Close(); err != nil {
        return fmt.Errorf("Writer.Close: %w", err)
    }

    return nil
}

// Download downloads a file from GCS
func (g *GCSClient) Download(ctx context.Context, bucket, object string) ([]byte, error) {
    rc, err := g.client.Bucket(bucket).Object(object).NewReader(ctx)
    if err != nil {
        return nil, fmt.Errorf("Object(%q).NewReader: %w", object, err)
    }
    defer rc.Close()

    data, err := io.ReadAll(rc)
    if err != nil {
        return nil, fmt.Errorf("io.ReadAll: %w", err)
    }

    return data, nil
}

// List lists objects in a bucket with prefix
func (g *GCSClient) List(ctx context.Context, bucket, prefix string) ([]string, error) {
    var objects []string

    it := g.client.Bucket(bucket).Objects(ctx, &storage.Query{
        Prefix: prefix,
    })

    for {
        attrs, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return nil, fmt.Errorf("Bucket(%q).Objects: %w", bucket, err)
        }
        objects = append(objects, attrs.Name)
    }

    return objects, nil
}

// SignedURL generates a signed URL for temporary access
func (g *GCSClient) SignedURL(ctx context.Context, bucket, object string, duration time.Duration) (string, error) {
    opts := &storage.SignedURLOptions{
        Method:  "GET",
        Expires: time.Now().Add(duration),
    }

    url, err := g.client.Bucket(bucket).SignedURL(object, opts)
    if err != nil {
        return "", fmt.Errorf("SignedURL: %w", err)
    }

    return url, nil
}
Pub/Sub
Production Pub/Sub Client:
package gcp

import (
    "context"
    "fmt"

    "cloud.google.com/go/pubsub"
)

type PubSubClient struct {
    client *pubsub.Client
}

func NewPubSubClient(ctx context.Context, projectID string) (*PubSubClient, error) {
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        return nil, fmt.Errorf("pubsub.NewClient: %w", err)
    }

    return &PubSubClient{client: client}, nil
}

func (p *PubSubClient) Close() error {
    return p.client.Close()
}

// Publish publishes messages to a topic
func (p *PubSubClient) Publish(ctx context.Context, topicID string, data []byte) error {
    topic := p.client.Topic(topicID)
    defer topic.Stop()

    result := topic.Publish(ctx, &pubsub.Message{
        Data: data,
        Attributes: map[string]string{
            "origin": "go-service",
        },
    })

    // Block until the result is returned
    _, err := result.Get(ctx)
    if err != nil {
        return fmt.Errorf("Get: %w", err)
    }

    return nil
}

// BatchPublish publishes multiple messages efficiently
func (p *PubSubClient) BatchPublish(ctx context.Context, topicID string, messages [][]byte) error {
    topic := p.client.Topic(topicID)
    defer topic.Stop()

    var results []*pubsub.PublishResult
    for _, data := range messages {
        result := topic.Publish(ctx, &pubsub.Message{Data: data})
        results = append(results, result)
    }

    // Wait for all publishes to complete
    for _, result := range results {
        if _, err := result.Get(ctx); err != nil {
            return fmt.Errorf("Get: %w", err)
        }
    }

    return nil
}

// Subscribe subscribes to a subscription with concurrent processing
func (p *PubSubClient) Subscribe(ctx context.Context, subscriptionID string, handler func([]byte) error) error {
    sub := p.client.Subscription(subscriptionID)

    // Configure subscription
    sub.ReceiveSettings.MaxOutstandingMessages = 100
    sub.ReceiveSettings.NumGoroutines = 10

    err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        if err := handler(msg.Data); err != nil {
            msg.Nack() // Redelivery
            return
        }
        msg.Ack()
    })
    if err != nil {
        return fmt.Errorf("Receive: %w", err)
    }

    return nil
}
Firestore
Firestore Document Operations:
package gcp

import (
    "context"
    "fmt"
    "time"

    "cloud.google.com/go/firestore"
    "google.golang.org/api/iterator"
)

type FirestoreClient struct {
    client *firestore.Client
}

func NewFirestoreClient(ctx context.Context, projectID string) (*FirestoreClient, error) {
    client, err := firestore.NewClient(ctx, projectID)
    if err != nil {
        return nil, fmt.Errorf("firestore.NewClient: %w", err)
    }

    return &FirestoreClient{client: client}, nil
}

func (f *FirestoreClient) Close() error {
    return f.client.Close()
}

type User struct {
    Name      string    `firestore:"name"`
    Email     string    `firestore:"email"`
    CreatedAt time.Time `firestore:"created_at"`
}

// Create creates a new document
func (f *FirestoreClient) Create(ctx context.Context, collection, docID string, data interface{}) error {
    _, err := f.client.Collection(collection).Doc(docID).Set(ctx, data)
    if err != nil {
        return fmt.Errorf("Set: %w", err)
    }

    return nil
}

// Get retrieves a document
func (f *FirestoreClient) Get(ctx context.Context, collection, docID string) (*User, error) {
    doc, err := f.client.Collection(collection).Doc(docID).Get(ctx)
    if err != nil {
        return nil, fmt.Errorf("Get: %w", err)
    }

    var user User
    if err := doc.DataTo(&user); err != nil {
        return nil, fmt.Errorf("DataTo: %w", err)
    }

    return &user, nil
}

// Query performs a complex query
func (f *FirestoreClient) Query(ctx context.Context, collection string) ([]User, error) {
    iter := f.client.Collection(collection).
        Where("active", "==", true).
        OrderBy("created_at", firestore.Desc).
        Limit(100).
        Documents(ctx)

    var users []User
    for {
        doc, err := iter.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return nil, fmt.Errorf("iterator.Next: %w", err)
        }

        var user User
        if err := doc.DataTo(&user); err != nil {
            return nil, fmt.Errorf("DataTo: %w", err)
        }
        users = append(users, user)
    }

    return users, nil
}

// Transaction performs an atomic transaction
func (f *FirestoreClient) Transaction(ctx context.Context) error {
    return f.client.RunTransaction(ctx, func(ctx context.Context, tx *firestore.Transaction) error {
        // Read
        docRef := f.client.Collection("counters").Doc("page_views")
        doc, err := tx.Get(docRef)
        if err != nil {
            return err
        }

        var counter struct {
            Count int64 `firestore:"count"`
        }
        if err := doc.DataTo(&counter); err != nil {
            return err
        }

        // Write
        return tx.Set(docRef, map[string]interface{}{
            "count": counter.Count + 1,
        })
    })
}
Azure SDK for Go
Azure SDK uses azidentity for authentication and follows a credential-based pattern.
Azure Blob Storage
Production Blob Client:
package azure

import (
    "context"
    "fmt"
    "io"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

type BlobClient struct {
    client *azblob.Client
}

func NewBlobClient(accountURL string) (*BlobClient, error) {
    // Use DefaultAzureCredential for auth chain
    cred, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        return nil, fmt.Errorf("failed to create credential: %w", err)
    }

    client, err := azblob.NewClient(accountURL, cred, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to create client: %w", err)
    }

    return &BlobClient{client: client}, nil
}

// Upload uploads a blob
func (b *BlobClient) Upload(ctx context.Context, container, blob string, data io.Reader) error {
    _, err := b.client.UploadStream(ctx, container, blob, data, nil)
    if err != nil {
        return fmt.Errorf("UploadStream: %w", err)
    }

    return nil
}

// Download downloads a blob
func (b *BlobClient) Download(ctx context.Context, container, blob string) ([]byte, error) {
    resp, err := b.client.DownloadStream(ctx, container, blob, nil)
    if err != nil {
        return nil, fmt.Errorf("DownloadStream: %w", err)
    }

    data, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, fmt.Errorf("ReadAll: %w", err)
    }

    return data, nil
}

// List lists blobs in a container
func (b *BlobClient) List(ctx context.Context, container string) ([]string, error) {
    var blobs []string

    pager := b.client.NewListBlobsFlatPager(container, nil)
    for pager.More() {
        page, err := pager.NextPage(ctx)
        if err != nil {
            return nil, fmt.Errorf("NextPage: %w", err)
        }

        for _, blob := range page.Segment.BlobItems {
            blobs = append(blobs, *blob.Name)
        }
    }

    return blobs, nil
}
Azure Service Bus
Service Bus Messaging:
package azure

import (
    "context"
    "fmt"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

type ServiceBusClient struct {
    client *azservicebus.Client
}

func NewServiceBusClient(namespace string) (*ServiceBusClient, error) {
    cred, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        return nil, err
    }

    client, err := azservicebus.NewClient(namespace, cred, nil)
    if err != nil {
        return nil, err
    }

    return &ServiceBusClient{client: client}, nil
}

func (s *ServiceBusClient) Close(ctx context.Context) error {
    return s.client.Close(ctx)
}

// SendMessage sends a message to a queue
func (s *ServiceBusClient) SendMessage(ctx context.Context, queueName string, message []byte) error {
    sender, err := s.client.NewSender(queueName, nil)
    if err != nil {
        return fmt.Errorf("NewSender: %w", err)
    }
    defer sender.Close(ctx)

    err = sender.SendMessage(ctx, &azservicebus.Message{
        Body: message,
    }, nil)
    if err != nil {
        return fmt.Errorf("SendMessage: %w", err)
    }

    return nil
}

// ReceiveMessages receives messages from a queue
func (s *ServiceBusClient) ReceiveMessages(ctx context.Context, queueName string) error {
    receiver, err := s.client.NewReceiverForQueue(queueName, nil)
    if err != nil {
        return fmt.Errorf("NewReceiverForQueue: %w", err)
    }
    defer receiver.Close(ctx)

    messages, err := receiver.ReceiveMessages(ctx, 10, nil)
    if err != nil {
        return fmt.Errorf("ReceiveMessages: %w", err)
    }

    for _, msg := range messages {
        fmt.Printf("Received: %s\n", string(msg.Body))

        // Complete the message
        err := receiver.CompleteMessage(ctx, msg, nil)
        if err != nil {
            return fmt.Errorf("CompleteMessage: %w", err)
        }
    }

    return nil
}
Authentication Strategies
Authentication is like showing different types of ID to enter a building. You might use a keycard, a temporary visitor pass, or a guest badge. Cloud SDKs automatically try different authentication methods until one works.
AWS IAM Roles and Profiles
Environment-Based Authentication:
package auth

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
    "github.com/aws/aws-sdk-go-v2/service/sts"
)

// LoadConfig loads AWS config with various auth strategies
func LoadConfig(ctx context.Context, profile string) (aws.Config, error) {
    if profile != "" {
        // Use specific profile
        return config.LoadDefaultConfig(ctx,
            config.WithSharedConfigProfile(profile),
        )
    }

    // Use default credential chain
    return config.LoadDefaultConfig(ctx)
}

// AssumeRole assumes an IAM role for cross-account access
func AssumeRole(ctx context.Context, roleARN string) (aws.Config, error) {
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return aws.Config{}, err
    }

    stsClient := sts.NewFromConfig(cfg)

    creds := stscreds.NewAssumeRoleProvider(stsClient, roleARN, func(o *stscreds.AssumeRoleOptions) {
        o.RoleSessionName = "go-application"
    })

    cfg.Credentials = aws.NewCredentialsCache(creds)

    return cfg, nil
}

// StaticCredentials creates config from static credentials
func StaticCredentials(ctx context.Context, accessKey, secretKey, region string) (aws.Config, error) {
    return config.LoadDefaultConfig(ctx,
        config.WithCredentialsProvider(
            credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
        ),
        config.WithRegion(region),
    )
}
Real-World Example: A microservices application running on ECS uses different authentication strategies:
- Local development: AWS profiles from ~/.aws/credentials
- CI/CD pipeline: Environment variables from secure secrets
- Production: IAM roles attached to ECS tasks
- Cross-account access: Assume role to access shared services
💡 Key Takeaway: The AWS credential chain automatically handles all these scenarios. Your code doesn't need to change - just configure the environment appropriately.
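A minimal sketch of that idea: the loading call never changes, only the surrounding environment does (the environment variables named in the comments are standard AWS ones):

// Identical in every environment:
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
    log.Fatal(err)
}

// What the chain resolves depends on where this runs:
//   Local development: AWS_PROFILE / ~/.aws/credentials
//   CI/CD pipeline:    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from secrets
//   ECS / EC2:         task role or instance profile via the metadata endpoint
client := s3.NewFromConfig(cfg)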
Google Cloud Service Accounts
GCP Authentication:
package auth

import (
    "context"

    "cloud.google.com/go/storage"
    "google.golang.org/api/option"
)

// DefaultAuth uses Application Default Credentials
func DefaultAuth(ctx context.Context) (*storage.Client, error) {
    // Checks:
    // 1. GOOGLE_APPLICATION_CREDENTIALS env var
    // 2. gcloud auth application-default login
    // 3. GCE/GKE metadata service
    return storage.NewClient(ctx)
}

// ServiceAccountAuth uses service account key file
func ServiceAccountAuth(ctx context.Context, keyFile string) (*storage.Client, error) {
    return storage.NewClient(ctx,
        option.WithCredentialsFile(keyFile),
    )
}

// ServiceAccountJSON uses service account JSON bytes
func ServiceAccountJSON(ctx context.Context, jsonKey []byte) (*storage.Client, error) {
    return storage.NewClient(ctx,
        option.WithCredentialsJSON(jsonKey),
    )
}
Azure Managed Identity
Azure Authentication:
package auth

import (
    "fmt"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// ManagedIdentityAuth uses Azure Managed Identity
func ManagedIdentityAuth(accountURL string) (*azblob.Client, error) {
    cred, err := azidentity.NewManagedIdentityCredential(nil)
    if err != nil {
        return nil, fmt.Errorf("NewManagedIdentityCredential: %w", err)
    }

    return azblob.NewClient(accountURL, cred, nil)
}

// DefaultAuth uses credential chain
func DefaultAuth(accountURL string) (*azblob.Client, error) {
    // Tries in order:
    // 1. Environment variables
    // 2. Managed Identity
    // 3. Azure CLI credentials
    cred, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        return nil, err
    }

    return azblob.NewClient(accountURL, cred, nil)
}
Error Handling and Retries
Network failures are like delivery trucks getting stuck in traffic - sometimes you need to try a different route or wait for the traffic to clear. Proper error handling and retry logic make your cloud applications resilient to temporary failures.
AWS SDK v2 Retry Strategies
Configurable Retry Behavior:
package retry

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/retry"
    "github.com/aws/aws-sdk-go-v2/config"
)

// ConfigureRetries configures retry behavior
func ConfigureRetries(ctx context.Context) (aws.Config, error) {
    return config.LoadDefaultConfig(ctx,
        // Standard retry mode
        config.WithRetryMode(aws.RetryModeStandard),

        // Adaptive mode adjusts based on service responses
        // config.WithRetryMode(aws.RetryModeAdaptive),

        config.WithRetryMaxAttempts(5),
    )
}

// CustomRetryer implements custom retry logic
type CustomRetryer struct {
    standard *retry.Standard
}

func NewCustomRetryer() *CustomRetryer {
    return &CustomRetryer{
        standard: retry.NewStandard(func(o *retry.StandardOptions) {
            o.MaxAttempts = 5
            o.MaxBackoff = 30 * time.Second
        }),
    }
}

func (r *CustomRetryer) IsErrorRetryable(err error) bool {
    // Custom logic to determine if error is retryable
    return r.standard.IsErrorRetryable(err)
}

func (r *CustomRetryer) MaxAttempts() int {
    return r.standard.MaxAttempts()
}

func (r *CustomRetryer) RetryDelay(attempt int, err error) (time.Duration, error) {
    // Custom backoff strategy
    delay := time.Duration(attempt) * time.Second
    if delay > 30*time.Second {
        delay = 30 * time.Second
    }
    return delay, nil
}

func (r *CustomRetryer) GetRetryToken(ctx context.Context, opErr error) (func(error) error, error) {
    return r.standard.GetRetryToken(ctx, opErr)
}

func (r *CustomRetryer) GetInitialToken() func(error) error {
    return r.standard.GetInitialToken()
}
Common Pitfalls in Error Handling:
- Treating all errors equally: Some errors should never be retried
- No exponential backoff: Fixed delays can overwhelm services during outages
- Missing context cancellation: Retries should respect context cancellation
- Inadequate logging: Not logging retry attempts makes debugging difficult
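A generic retry helper addressing these pitfalls might look like the sketch below. This is not part of the AWS SDK; the retryable-error check is left to the caller, and the imports for context, log, and time are assumed:

// retryWithBackoff retries fn with capped exponential backoff and stops
// as soon as the context is cancelled. Each failed attempt is logged.
func retryWithBackoff(ctx context.Context, maxAttempts int, isRetryable func(error) bool, fn func() error) error {
    delay := 500 * time.Millisecond
    const maxDelay = 30 * time.Second

    var err error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = fn(); err == nil || !isRetryable(err) {
            return err
        }
        log.Printf("attempt %d failed: %v (retrying in %s)", attempt, err, delay)

        select {
        case <-ctx.Done():
            return ctx.Err() // respect cancellation between attempts
        case <-time.After(delay):
        }

        delay *= 2
        if delay > maxDelay {
            delay = maxDelay
        }
    }
    return err
}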
When to use Standard vs Adaptive retry mode:
- Standard mode: Predictable retry behavior, good for most applications
- Adaptive mode: Adjusts based on service-side response, better for high-throughput applications that can handle variable latency
Error Classification
Production Error Handling:
package errorutil

import (
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/aws/smithy-go"
    "google.golang.org/api/googleapi"
)

// AWS Error Handling
func HandleAWSError(err error) {
    var ae smithy.APIError
    if errors.As(err, &ae) {
        log.Printf("AWS API Error: %s - %s", ae.ErrorCode(), ae.ErrorMessage())

        switch ae.ErrorCode() {
        case "NoSuchBucket":
            // Handle missing bucket
        case "AccessDenied":
            // Handle permission error
        case "InvalidAccessKeyId":
            // Handle auth error
        }
    }
}

// GCP Error Handling
func HandleGCPError(err error) {
    var gerr *googleapi.Error
    if errors.As(err, &gerr) {
        log.Printf("GCP Error: %d - %s", gerr.Code, gerr.Message)

        switch gerr.Code {
        case 404:
            // Handle not found
        case 403:
            // Handle permission denied
        case 429:
            // Handle rate limit
        }
    }
}

// Circuit Breaker for SDK calls
type CircuitBreaker struct {
    maxFailures int
    resetTime   time.Duration
    failures    int
    lastFailure time.Time
    state       string // "closed", "open", "half-open"
}

func NewCircuitBreaker(maxFailures int, resetTime time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures: maxFailures,
        resetTime:   resetTime,
        state:       "closed",
    }
}

func (cb *CircuitBreaker) Call(fn func() error) error {
    if cb.state == "open" {
        if time.Since(cb.lastFailure) > cb.resetTime {
            cb.state = "half-open"
        } else {
            return fmt.Errorf("circuit breaker is open")
        }
    }

    err := fn()
    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()

        if cb.failures >= cb.maxFailures {
            cb.state = "open"
        }
        return err
    }

    if cb.state == "half-open" {
        cb.state = "closed"
        cb.failures = 0
    }

    return nil
}
Multi-Cloud Abstraction
Multi-cloud abstraction is like having a universal remote control that works with any TV brand. Instead of learning different controls for Samsung, LG, and Sony, you have one interface that translates your commands appropriately for each device.
Storage Abstraction Layer
Cloud-Agnostic Storage Interface:
package storage

import (
    "context"
    "io"
    "strings"
    "time"

    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    // awsutil and gcp refer to the wrapper packages defined earlier in this article.
)

// Storage defines cloud-agnostic storage interface
type Storage interface {
    Upload(ctx context.Context, path string, data io.Reader) error
    Download(ctx context.Context, path string) ([]byte, error)
    Delete(ctx context.Context, path string) error
    List(ctx context.Context, prefix string) ([]string, error)
    GenerateURL(ctx context.Context, path string, expiry time.Duration) (string, error)
}

// S3Storage implements Storage for AWS S3
type S3Storage struct {
    client *awsutil.S3Client
    bucket string
}

func NewS3Storage(client *awsutil.S3Client, bucket string) *S3Storage {
    return &S3Storage{
        client: client,
        bucket: bucket,
    }
}

func (s *S3Storage) Upload(ctx context.Context, path string, data io.Reader) error {
    return s.client.Upload(ctx, s.bucket, path, data, nil)
}

func (s *S3Storage) Download(ctx context.Context, path string) ([]byte, error) {
    // The S3 downloader needs an io.WriterAt, so use the manager's in-memory buffer
    buf := manager.NewWriteAtBuffer(nil)
    err := s.client.Download(ctx, s.bucket, path, buf)
    return buf.Bytes(), err
}

func (s *S3Storage) Delete(ctx context.Context, path string) error {
    return s.client.DeleteObjects(ctx, s.bucket, []string{path})
}

func (s *S3Storage) List(ctx context.Context, prefix string) ([]string, error) {
    objects, err := s.client.ListObjects(ctx, s.bucket, prefix)
    if err != nil {
        return nil, err
    }

    var paths []string
    for _, obj := range objects {
        paths = append(paths, *obj.Key)
    }
    return paths, nil
}

func (s *S3Storage) GenerateURL(ctx context.Context, path string, expiry time.Duration) (string, error) {
    return s.client.GeneratePresignedURL(ctx, s.bucket, path, expiry)
}

// GCSStorage implements Storage for Google Cloud Storage
type GCSStorage struct {
    client *gcp.GCSClient
    bucket string
}

func NewGCSStorage(client *gcp.GCSClient, bucket string) *GCSStorage {
    return &GCSStorage{
        client: client,
        bucket: bucket,
    }
}

func (g *GCSStorage) Upload(ctx context.Context, path string, data io.Reader) error {
    return g.client.Upload(ctx, g.bucket, path, data)
}

func (g *GCSStorage) Download(ctx context.Context, path string) ([]byte, error) {
    return g.client.Download(ctx, g.bucket, path)
}

func (g *GCSStorage) Delete(ctx context.Context, path string) error {
    // Implement delete
    return nil
}

func (g *GCSStorage) List(ctx context.Context, prefix string) ([]string, error) {
    return g.client.List(ctx, g.bucket, prefix)
}

func (g *GCSStorage) GenerateURL(ctx context.Context, path string, expiry time.Duration) (string, error) {
    return g.client.SignedURL(ctx, g.bucket, path, expiry)
}

// Usage example
func Example() {
    ctx := context.Background()

    // Use S3
    var storage Storage
    s3Client := // ... create S3 client
    storage = NewS3Storage(s3Client, "my-bucket")

    // Or use GCS
    gcsClient := // ... create GCS client
    storage = NewGCSStorage(gcsClient, "my-bucket")

    // Use same interface
    storage.Upload(ctx, "file.txt", strings.NewReader("data"))
}
Multi-Provider Message Queue Abstraction
Queue Abstraction for SQS, Pub/Sub, and Service Bus:
package queue

import (
    "context"
    "time"
)

// Message represents a queue message
type Message struct {
    ID            string
    Body          []byte
    Attributes    map[string]string
    ReceiptHandle string
    PublishTime   time.Time
}

// Queue defines cloud-agnostic message queue interface
type Queue interface {
    Send(ctx context.Context, body []byte, attributes map[string]string) error
    SendBatch(ctx context.Context, messages []Message) error
    Receive(ctx context.Context, maxMessages int) ([]Message, error)
    Delete(ctx context.Context, receiptHandle string) error
    DeleteBatch(ctx context.Context, receiptHandles []string) error
    ChangeVisibility(ctx context.Context, receiptHandle string, timeout time.Duration) error
}

// SQSQueue implements Queue for AWS SQS
type SQSQueue struct {
    client   *awsutil.SQSClient
    queueURL string
}

func NewSQSQueue(client *awsutil.SQSClient, queueURL string) *SQSQueue {
    return &SQSQueue{
        client:   client,
        queueURL: queueURL,
    }
}

func (q *SQSQueue) Send(ctx context.Context, body []byte, attributes map[string]string) error {
    // Convert attributes to SQS format
    return q.client.SendMessage(ctx, body)
}

func (q *SQSQueue) Receive(ctx context.Context, maxMessages int) ([]Message, error) {
    sqsMessages, err := q.client.ReceiveMessages(ctx, int32(maxMessages))
    if err != nil {
        return nil, err
    }

    messages := make([]Message, len(sqsMessages))
    for i, msg := range sqsMessages {
        messages[i] = Message{
            ID:            *msg.MessageId,
            Body:          []byte(*msg.Body),
            ReceiptHandle: *msg.ReceiptHandle,
        }
    }
    return messages, nil
}

// PubSubQueue implements Queue for Google Cloud Pub/Sub
type PubSubQueue struct {
    client         *gcp.PubSubClient
    topicID        string
    subscriptionID string
}

func NewPubSubQueue(client *gcp.PubSubClient, topicID, subscriptionID string) *PubSubQueue {
    return &PubSubQueue{
        client:         client,
        topicID:        topicID,
        subscriptionID: subscriptionID,
    }
}

func (q *PubSubQueue) Send(ctx context.Context, body []byte, attributes map[string]string) error {
    return q.client.Publish(ctx, q.topicID, body)
}

func (q *PubSubQueue) Receive(ctx context.Context, maxMessages int) ([]Message, error) {
    // Pub/Sub uses pull subscriptions
    var messages []Message
    // Implementation details...
    return messages, nil
}

// ServiceBusQueue implements Queue for Azure Service Bus
type ServiceBusQueue struct {
    client    *azure.ServiceBusClient
    queueName string
}

func NewServiceBusQueue(client *azure.ServiceBusClient, queueName string) *ServiceBusQueue {
    return &ServiceBusQueue{
        client:    client,
        queueName: queueName,
    }
}

func (q *ServiceBusQueue) Send(ctx context.Context, body []byte, attributes map[string]string) error {
    return q.client.SendMessage(ctx, q.queueName, body)
}

func (q *ServiceBusQueue) Receive(ctx context.Context, maxMessages int) ([]Message, error) {
    // Azure Service Bus implementation
    var messages []Message
    // Implementation details...
    return messages, nil
}
Database Abstraction Layer
Multi-Cloud Database Interface:
package database

import (
    "context"
)

// Document represents a database document
type Document map[string]interface{}

// QueryResult represents query results
type QueryResult struct {
    Items     []Document
    NextToken string
    Count     int
}

// Database defines cloud-agnostic NoSQL database interface
type Database interface {
    Put(ctx context.Context, table string, item Document) error
    Get(ctx context.Context, table string, key Document) (Document, error)
    Query(ctx context.Context, table string, filter Document) (*QueryResult, error)
    Update(ctx context.Context, table string, key Document, updates Document) error
    Delete(ctx context.Context, table string, key Document) error
    BatchWrite(ctx context.Context, table string, items []Document) error
}

// DynamoDBDatabase implements Database for AWS DynamoDB
type DynamoDBDatabase struct {
    client *awsutil.DynamoDBClient
}

func NewDynamoDBDatabase(client *awsutil.DynamoDBClient) *DynamoDBDatabase {
    return &DynamoDBDatabase{client: client}
}

func (db *DynamoDBDatabase) Put(ctx context.Context, table string, item Document) error {
    // Convert Document to DynamoDB AttributeValue
    // Use PutItem API
    return nil
}

func (db *DynamoDBDatabase) Query(ctx context.Context, table string, filter Document) (*QueryResult, error) {
    // Build DynamoDB query expression
    // Execute query with pagination
    return &QueryResult{}, nil
}

// FirestoreDatabase implements Database for Google Cloud Firestore
type FirestoreDatabase struct {
    client *gcp.FirestoreClient
}

func NewFirestoreDatabase(client *gcp.FirestoreClient) *FirestoreDatabase {
    return &FirestoreDatabase{client: client}
}

func (db *FirestoreDatabase) Put(ctx context.Context, table string, item Document) error {
    // Use Firestore Set operation
    return db.client.Create(ctx, table, item["id"].(string), item)
}

func (db *FirestoreDatabase) Query(ctx context.Context, table string, filter Document) (*QueryResult, error) {
    // Build Firestore query
    // Execute with filters
    return &QueryResult{}, nil
}

// CosmosDBDatabase implements Database for Azure Cosmos DB
type CosmosDBDatabase struct {
    // Azure Cosmos DB client
}

func NewCosmosDBDatabase() *CosmosDBDatabase {
    return &CosmosDBDatabase{}
}

func (db *CosmosDBDatabase) Put(ctx context.Context, table string, item Document) error {
    // Use Cosmos DB upsert
    return nil
}

func (db *CosmosDBDatabase) Query(ctx context.Context, table string, filter Document) (*QueryResult, error) {
    // Execute SQL query on Cosmos DB
    return &QueryResult{}, nil
}
Real-World Multi-Cloud Strategy: A financial services company uses this abstraction to run the same application on AWS (primary), GCP (disaster recovery), and Azure (regulatory compliance). The abstraction layer allows them to:
- Switch providers with minimal code changes
- Run multi-region deployments across different clouds
- Avoid vendor lock-in
- Optimize costs by using the cheapest provider for each service
💡 Key Takeaway: Multi-cloud abstraction adds complexity but provides flexibility. Only implement it if you have genuine multi-cloud requirements. For most applications, optimizing for a single cloud provider yields better performance and lower costs.
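If you do adopt the abstraction, provider selection can be reduced to a small factory. A hedged sketch, assuming the constructors defined above and a STORAGE_PROVIDER environment variable of your own choosing:

// newStorage picks an implementation at startup based on configuration.
// The environment variable name and bucket name are illustrative.
func newStorage(s3Client *awsutil.S3Client, gcsClient *gcp.GCSClient) Storage {
    switch os.Getenv("STORAGE_PROVIDER") {
    case "gcs":
        return NewGCSStorage(gcsClient, "my-bucket")
    default:
        return NewS3Storage(s3Client, "my-bucket")
    }
}

The rest of the application only sees the Storage interface, so switching providers is a configuration change rather than a code change.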
Production Best Practices
Connection Pooling
Connection pooling is like having a dedicated team of delivery workers always ready, instead of hiring new people for each delivery. This eliminates the overhead of constantly establishing new connections.
Reuse Clients Across Requests:
package best

import (
    "context"
    "sync"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// ClientPool manages reusable SDK clients
type ClientPool struct {
    s3Client *s3.Client
    once     sync.Once
}

func (p *ClientPool) GetS3Client(ctx context.Context) (*s3.Client, error) {
    var err error
    p.once.Do(func() {
        var cfg aws.Config
        cfg, err = config.LoadDefaultConfig(ctx)
        if err != nil {
            return
        }
        p.s3Client = s3.NewFromConfig(cfg)
    })

    return p.s3Client, err
}

// Global client pool
var globalPool = &ClientPool{}

func GetS3Client(ctx context.Context) (*s3.Client, error) {
    return globalPool.GetS3Client(ctx)
}
Real-World Impact: A web application that reuses S3 clients saw a 90% reduction in API latency and 80% fewer connection errors after implementing proper client reuse.
⚠️ Important: Never store credentials in global variables. Only store the configured clients. The credential chain should be evaluated at runtime to handle different environments securely.
Metrics and Monitoring
Instrument SDK Calls:
package monitoring

import (
    "context"
    "time"

    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    sdkCallDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "cloud_sdk_call_duration_seconds",
            Help:    "Duration of cloud SDK calls",
            Buckets: prometheus.DefBuckets,
        },
        []string{"service", "operation", "status"},
    )

    sdkCallTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "cloud_sdk_calls_total",
            Help: "Total number of cloud SDK calls",
        },
        []string{"service", "operation", "status"},
    )
)

// InstrumentedS3Client wraps S3 client with metrics
type InstrumentedS3Client struct {
    client *s3.Client
}

func (i *InstrumentedS3Client) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
    start := time.Now()

    output, err := i.client.GetObject(ctx, input)

    status := "success"
    if err != nil {
        status = "error"
    }

    duration := time.Since(start).Seconds()
    sdkCallDuration.WithLabelValues("s3", "GetObject", status).Observe(duration)
    sdkCallTotal.WithLabelValues("s3", "GetObject", status).Inc()

    return output, err
}
Cost Optimization
Cloud services are like utility bills - you pay for what you use. Smart optimization can reduce your bill by 50-90% while maintaining or even improving performance.
Reduce API Calls:
package optimization

import (
    "context"
    "io"
    "sync"
    "time"
)

// Storage is the cloud-agnostic interface from the multi-cloud section.

// CachedStorage adds caching layer to storage
type CachedStorage struct {
    storage Storage
    cache   map[string]cachedItem
    mu      sync.RWMutex
    ttl     time.Duration
}

type cachedItem struct {
    data      []byte
    expiresAt time.Time
}

func NewCachedStorage(storage Storage, ttl time.Duration) *CachedStorage {
    return &CachedStorage{
        storage: storage,
        cache:   make(map[string]cachedItem),
        ttl:     ttl,
    }
}

func (c *CachedStorage) Download(ctx context.Context, path string) ([]byte, error) {
    // Check cache
    c.mu.RLock()
    if item, ok := c.cache[path]; ok && time.Now().Before(item.expiresAt) {
        c.mu.RUnlock()
        return item.data, nil
    }
    c.mu.RUnlock()

    // Fetch from storage
    data, err := c.storage.Download(ctx, path)
    if err != nil {
        return nil, err
    }

    // Update cache
    c.mu.Lock()
    c.cache[path] = cachedItem{
        data:      data,
        expiresAt: time.Now().Add(c.ttl),
    }
    c.mu.Unlock()

    return data, nil
}

// BatchUpload groups operations to reduce API calls
func BatchUpload(ctx context.Context, storage Storage, files map[string]io.Reader) error {
    errCh := make(chan error, len(files))
    sem := make(chan struct{}, 10) // Limit concurrency

    for path, data := range files {
        sem <- struct{}{}
        go func(p string, d io.Reader) {
            defer func() { <-sem }()
            errCh <- storage.Upload(ctx, p, d)
        }(path, data)
    }

    // Wait for all uploads
    for i := 0; i < len(files); i++ {
        if err := <-errCh; err != nil {
            return err
        }
    }

    return nil
}
Real-World Savings: An image processing service implemented these optimizations:
- Caching: Reduced S3 GET requests by 80%
- Batch operations: Reduced DynamoDB writes by 90%
- Overall savings: $1,200/month reduction in AWS costs
- Performance improvement: 60% faster response times due to cache hits
Cost Optimization vs Performance Trade-offs:
- More caching: Lower costs, faster reads, but potential stale data
- Larger batch sizes: Fewer API calls but higher memory usage
- Aggressive timeouts: Lower costs but more failures
- Compression: Lower transfer costs but higher CPU usage
💡 Key Takeaway: Always measure before and after optimizations. Sometimes the "optimization" actually increases costs due to increased complexity or memory usage.
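One way to measure is to count cache hits and misses alongside the SDK metrics from the monitoring section. A sketch assuming the Prometheus client shown earlier; the metric names are illustrative:

var (
    cacheHits = promauto.NewCounter(prometheus.CounterOpts{
        Name: "storage_cache_hits_total",
        Help: "Downloads served from the in-memory cache",
    })
    cacheMisses = promauto.NewCounter(prometheus.CounterOpts{
        Name: "storage_cache_misses_total",
        Help: "Downloads that fell through to cloud storage",
    })
)

// Inside CachedStorage.Download: call cacheHits.Inc() when the cached item is
// returned and cacheMisses.Inc() before falling through to c.storage.Download.
// The hit ratio tells you whether the cache actually pays for its memory.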
IAM and Authentication Patterns
Identity and Access Management (IAM) is like the security system for your cloud applications. Just as a building has key cards, security guards, and visitor logs, cloud IAM provides authentication (who you are) and authorization (what you can do).
Cross-Account Access Patterns
AWS Cross-Account Access with Assume Role:
package iam

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials/stscreds"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    "github.com/aws/aws-sdk-go-v2/service/sts"
)

// CrossAccountClient manages cross-account access
type CrossAccountClient struct {
    accounts map[string]aws.Config
}

func NewCrossAccountClient() *CrossAccountClient {
    return &CrossAccountClient{
        accounts: make(map[string]aws.Config),
    }
}

// AssumeRole creates credentials for cross-account access
func AssumeRole(ctx context.Context, roleARN, externalID string, duration time.Duration) (aws.Config, error) {
    // Load default config
    cfg, err := config.LoadDefaultConfig(ctx)
    if err != nil {
        return aws.Config{}, fmt.Errorf("failed to load config: %w", err)
    }

    // Create STS client
    stsClient := sts.NewFromConfig(cfg)

    // Create assume role provider
    provider := stscreds.NewAssumeRoleProvider(stsClient, roleARN, func(o *stscreds.AssumeRoleOptions) {
        o.RoleSessionName = "go-application"
        o.ExternalID = aws.String(externalID)
        o.Duration = duration
    })

    // Create new config with assumed role credentials
    cfg.Credentials = aws.NewCredentialsCache(provider)

    return cfg, nil
}

// GetS3ClientForAccount gets S3 client for specific account
func (c *CrossAccountClient) GetS3ClientForAccount(ctx context.Context, accountID, roleARN, externalID string) (*s3.Client, error) {
    // Check cache first
    if cfg, ok := c.accounts[accountID]; ok {
        return s3.NewFromConfig(cfg), nil
    }

    // Assume role
    cfg, err := AssumeRole(ctx, roleARN, externalID, 1*time.Hour)
    if err != nil {
        return nil, fmt.Errorf("failed to assume role: %w", err)
    }

    // Cache configuration
    c.accounts[accountID] = cfg

    return s3.NewFromConfig(cfg), nil
}

// RefreshCredentials refreshes expired credentials
func (c *CrossAccountClient) RefreshCredentials(ctx context.Context, accountID, roleARN, externalID string) error {
    cfg, err := AssumeRole(ctx, roleARN, externalID, 1*time.Hour)
    if err != nil {
        return err
    }

    c.accounts[accountID] = cfg
    return nil
}
Real-World Scenario: A SaaS platform stores customer data in separate AWS accounts for isolation. The application uses cross-account access to:
- Read/write customer data from isolated accounts
- Maintain security boundaries between customers
- Simplify billing and cost tracking per customer
- Meet compliance requirements for data isolation
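A short usage sketch of the cross-account client above; the account ID, role ARN, external ID, bucket, and key are placeholders:

// readCustomerObject fetches an object from an isolated customer account.
func readCustomerObject(ctx context.Context, cc *CrossAccountClient) error {
    client, err := cc.GetS3ClientForAccount(ctx,
        "123456789012",
        "arn:aws:iam::123456789012:role/customer-data-access",
        "example-external-id",
    )
    if err != nil {
        return err
    }

    _, err = client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String("customer-bucket"),
        Key:    aws.String("data/export.json"),
    })
    return err
}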
Service Account Management
Google Cloud Service Account Patterns:
1package iam
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7
8 "cloud.google.com/go/storage"
9 "golang.org/x/oauth2/google"
10 "google.golang.org/api/option"
11)
12
13// ServiceAccountManager manages GCP service accounts
14type ServiceAccountManager struct {
15 projectID string
16}
17
18func NewServiceAccountManager(projectID string) *ServiceAccountManager {
19 return &ServiceAccountManager{projectID: projectID}
20}
21
22// CreateClientWithServiceAccount creates client using service account
23func CreateClientWithServiceAccount(ctx context.Context, keyJSON []byte) (*storage.Client, error) {
24 creds, err := google.CredentialsFromJSON(ctx, keyJSON, storage.ScopeReadWrite)
25 if err != nil {
26 return nil, fmt.Errorf("failed to create credentials: %w", err)
27 }
28
29 client, err := storage.NewClient(ctx, option.WithCredentials(creds))
30 if err != nil {
31 return nil, fmt.Errorf("failed to create client: %w", err)
32 }
33
34 return client, nil
35}
36
37// ImpersonateServiceAccount impersonates a service account
38func ImpersonateServiceAccount(ctx context.Context, targetServiceAccount string) (*storage.Client, error) {
39 // Load default credentials
40 creds, err := google.FindDefaultCredentials(ctx, storage.ScopeReadWrite)
41 if err != nil {
42 return nil, err
43 }
44
45 // Create impersonation configuration
46 ts, err := google.ImpersonateTokenSource(
47 ctx,
48 creds.TokenSource,
49 targetServiceAccount,
50 []string{storage.ScopeReadWrite},
51 )
52 if err != nil {
53 return nil, err
54 }
55
56 client, err := storage.NewClient(ctx, option.WithTokenSource(ts))
57 if err != nil {
58 return nil, err
59 }
60
61 return client, nil
62}
63
64// RotateServiceAccountKey demonstrates key rotation pattern
65 func (sam *ServiceAccountManager) RotateServiceAccountKey(ctx context.Context, oldKeyJSON, newKeyJSON []byte) error {
66 // Create new client with new key
67 newClient, err := sam.CreateClientWithServiceAccount(ctx, newKeyJSON)
68 if err != nil {
69 return fmt.Errorf("failed to create client with new key: %w", err)
70 }
71 defer newClient.Close()
72
73 // Test new credentials
74 bucket := newClient.Bucket("test-bucket")
75 if _, err := bucket.Attrs(ctx); err != nil {
76 return fmt.Errorf("new credentials test failed: %w", err)
77 }
78
79 // Old credentials can now be safely deleted
80 return nil
81}
Workload Identity Federation
Azure Managed Identity with Workload Identity:
1package iam
2
3import (
4 "context"
5 "fmt"
6
7 "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
8 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
9)
10
11// WorkloadIdentityClient manages workload identity
12type WorkloadIdentityClient struct {
13 credential *azidentity.WorkloadIdentityCredential
14}
15
16func NewWorkloadIdentityClient(ctx context.Context, tenantID, clientID, tokenFilePath string) (*WorkloadIdentityClient, error) {
17 cred, err := azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{
18 TenantID: tenantID,
19 ClientID: clientID,
20 TokenFilePath: tokenFilePath,
21 })
22 if err != nil {
23 return nil, fmt.Errorf("failed to create workload identity credential: %w", err)
24 }
25
26 return &WorkloadIdentityClient{credential: cred}, nil
27}
28
29// GetBlobClient creates blob client with workload identity
30 func (w *WorkloadIdentityClient) GetBlobClient(ctx context.Context, accountURL string) (*azblob.Client, error) {
31 client, err := azblob.NewClient(accountURL, w.credential, nil)
32 if err != nil {
33 return nil, fmt.Errorf("failed to create blob client: %w", err)
34 }
35
36 return client, nil
37}
38
39// ChainedCredentials demonstrates credential chaining
40func ChainedCredentials(ctx context.Context) (*azidentity.ChainedTokenCredential, error) {
41 // Construct each credential via its constructor (zero-value structs are not usable);
42 // constructor errors are ignored here for brevity, check them in production
43 mi, _ := azidentity.NewManagedIdentityCredential(nil) // Azure VMs, App Service, AKS
44 env, _ := azidentity.NewEnvironmentCredential(nil)    // environment variables
45 cli, _ := azidentity.NewAzureCLICredential(nil)       // local Azure CLI login
46
47 // Try Managed Identity, then environment variables, then Azure CLI
48 cred, err := azidentity.NewChainedTokenCredential([]azidentity.TokenCredential{mi, env, cli,
49 }, nil)
50
51 if err != nil {
52 return nil, fmt.Errorf("failed to create chained credential: %w", err)
53 }
54
55 return cred, nil
56}
Production Best Practice: Use workload identity instead of storing credentials as secrets. This eliminates credential leakage risks, simplifies rotation, and provides built-in credential lifecycle management.
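On AKS, the workload identity webhook injects the tenant ID, client ID, and token file path as environment variables, so wiring the credential is mostly reading configuration. A minimal sketch in the same package, assuming the standard AZURE_* variables and a placeholder storage account URL:
package iam

import (
	"context"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// blobClientFromPodIdentity builds a blob client from the environment that the
// AKS workload identity webhook injects; the account URL is a placeholder.
func blobClientFromPodIdentity(ctx context.Context) (*azblob.Client, error) {
	wi, err := NewWorkloadIdentityClient(ctx,
		os.Getenv("AZURE_TENANT_ID"),
		os.Getenv("AZURE_CLIENT_ID"),
		os.Getenv("AZURE_FEDERATED_TOKEN_FILE"),
	)
	if err != nil {
		return nil, err
	}
	return wi.GetBlobClient(ctx, "https://mystorageaccount.blob.core.windows.net")
}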
Network and VPC Configurations
Network configuration is like designing the plumbing and electrical systems for a building. You need to plan how data flows between services, where firewalls should be, and how to connect to external systems.
VPC Endpoint Configuration
AWS VPC Endpoint for S3 Access:
1package network
2
3import (
4 "context"
5 "io"
6
7 "github.com/aws/aws-sdk-go-v2/aws"
8 "github.com/aws/aws-sdk-go-v2/config"
9 "github.com/aws/aws-sdk-go-v2/service/s3"
10)
11
12// VPCEndpointClient configures S3 access via VPC endpoint
13type VPCEndpointClient struct {
14 client *s3.Client
15}
16
17func NewVPCEndpointClient(ctx context.Context, vpcEndpointURL string) (*VPCEndpointClient, error) {
18 cfg, err := config.LoadDefaultConfig(ctx)
19 if err != nil {
20 return nil, err
21 }
22
23 // Configure S3 to use VPC endpoint
24 client := s3.NewFromConfig(cfg, func(o *s3.Options) {
25 // Use custom endpoint for VPC
26 o.BaseEndpoint = aws.String(vpcEndpointURL)
27 // Force path-style addressing for VPC endpoints
28 o.UsePathStyle = true
29 })
30
31 return &VPCEndpointClient{client: client}, nil
32}
33
34// AccessS3ViaVPC accesses S3 through VPC endpoint
35 func (v *VPCEndpointClient) AccessS3ViaVPC(ctx context.Context, bucket, key string) ([]byte, error) {
36 // This request goes through VPC endpoint, not public internet
37 result, err := v.client.GetObject(ctx, &s3.GetObjectInput{
38 Bucket: aws.String(bucket),
39 Key: aws.String(key),
40 })
41 if err != nil {
42 return nil, err
43 }
44 defer result.Body.Close()
45
46 return io.ReadAll(result.Body)
47}
Cost Impact: Gateway endpoints for S3 and DynamoDB are free and remove NAT gateway data-processing charges, while interface endpoints trade a small hourly and per-GB fee for private connectivity to other services. For high-volume applications this can save thousands of dollars monthly while improving security by keeping traffic off the public internet.
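A short usage sketch of the client above; the interface-endpoint URL, bucket, and key are placeholders (S3 gateway endpoints are transparent via route tables and need no endpoint override at all):
package network

import (
	"context"
	"fmt"
)

// readViaEndpoint reads one object through a VPC interface endpoint.
// Every identifier below is a placeholder for your own environment.
func readViaEndpoint(ctx context.Context) error {
	client, err := NewVPCEndpointClient(ctx,
		"https://bucket.vpce-0123456789abcdef0.s3.us-east-1.vpce.amazonaws.com")
	if err != nil {
		return err
	}

	data, err := client.AccessS3ViaVPC(ctx, "internal-reports", "2024/q1-costs.csv")
	if err != nil {
		return err
	}

	fmt.Printf("read %d bytes over the VPC endpoint\n", len(data))
	return nil
}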
Private Service Connect
Google Cloud Private Service Connect:
1package network
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "cloud.google.com/go/storage"
8 "google.golang.org/api/option"
9)
10
11// PrivateServiceConnectClient uses Private Service Connect
12type PrivateServiceConnectClient struct {
13 client *storage.Client
14}
15
16func NewPrivateServiceConnectClient(ctx context.Context, endpoint string) (*PrivateServiceConnectClient, error) {
17 // Configure client to use private endpoint
18 client, err := storage.NewClient(ctx,
19 option.WithEndpoint(endpoint),
20 )
21 if err != nil {
22 return nil, fmt.Errorf("failed to create client: %w", err)
23 }
24
25 return &PrivateServiceConnectClient{client: client}, nil
26}
27
28// AccessStoragePrivately accesses storage via private network
29 func (p *PrivateServiceConnectClient) AccessStoragePrivately(ctx context.Context, bucket, object string) ([]byte, error) {
30 // Traffic goes through Private Service Connect
31 rc, err := p.client.Bucket(bucket).Object(object).NewReader(ctx)
32 if err != nil {
33 return nil, err
34 }
35 defer rc.Close()
36
37 return io.ReadAll(rc)
38}
Network Security Groups
Azure NSG-Aware Client Configuration:
1package network
2
3import (
4 "context"
5 "net"
6 "net/http"
7 "time"
8
9 "github.com/Azure/azure-sdk-for-go/sdk/azcore"
10 "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
11 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
12)
13
14// SecureNetworkClient configures secure network access
15type SecureNetworkClient struct {
16 client *azblob.Client
17}
18
19func NewSecureNetworkClient(ctx context.Context, accountURL string) (*SecureNetworkClient, error) {
20 cred, err := azidentity.NewDefaultAzureCredential(nil)
21 if err != nil {
22 return nil, err
23 }
24
25 // Configure HTTP client with network restrictions
26 httpClient := &http.Client{
27 Transport: &http.Transport{
28 DialContext: (&net.Dialer{
29 Timeout: 30 * time.Second,
30 KeepAlive: 30 * time.Second,
31 }).DialContext,
32 MaxIdleConns: 100,
33 IdleConnTimeout: 90 * time.Second,
34 TLSHandshakeTimeout: 10 * time.Second,
35 ExpectContinueTimeout: 1 * time.Second,
36 // Disable HTTP/2 for better connection pooling control
37 ForceAttemptHTTP2: false,
38 },
39 Timeout: 60 * time.Second,
40 }
41
42 client, err := azblob.NewClient(accountURL, cred, &azblob.ClientOptions{
43 ClientOptions: azcore.ClientOptions{
44 Transport: httpClient,
45 },
46 })
47 if err != nil {
48 return nil, err
49 }
50
51 return &SecureNetworkClient{client: client}, nil
52}
Database Migration Patterns
Database migration is like moving from an apartment to a house. You need to plan what to move, how to move it, and ensure nothing is lost in the process while minimizing downtime.
SQL to NoSQL Migration
PostgreSQL to DynamoDB Migration Pattern:
1package migration
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log"
8 "time"
9
10 "github.com/aws/aws-sdk-go-v2/aws"
11 "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
12 "github.com/aws/aws-sdk-go-v2/service/dynamodb"
13 "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
14 )
13
14// Migrator handles database migration
15type Migrator struct {
16 sourceDB *sql.DB
17 targetDB *dynamodb.Client
18 batchSize int
19 tableName string
20}
21
22func NewMigrator(sourceDB *sql.DB, targetDB *dynamodb.Client, tableName string) *Migrator {
23 return &Migrator{
24 sourceDB: sourceDB,
25 targetDB: targetDB,
26 batchSize: 25, // DynamoDB batch limit
27 tableName: tableName,
28 }
29}
30
31// MigrateUsers migrates users from PostgreSQL to DynamoDB
32 func (m *Migrator) MigrateUsers(ctx context.Context) error {
33 // Query source database
34 rows, err := m.sourceDB.QueryContext(ctx,
35 "SELECT id, email, name, created_at, metadata FROM users ORDER BY id")
36 if err != nil {
37 return fmt.Errorf("query failed: %w", err)
38 }
39 defer rows.Close()
40
41 var batch []map[string]interface{}
42 count := 0
43
44 for rows.Next() {
45 var id, email, name string
46 var createdAt time.Time
47 var metadata []byte
48
49 if err := rows.Scan(&id, &email, &name, &createdAt, &metadata); err != nil {
50 return err
51 }
52
53 // Transform to DynamoDB format
54 item := map[string]interface{}{
55 "id": id,
56 "email": email,
57 "name": name,
58 "created_at": createdAt.Unix(),
59 "metadata": string(metadata),
60 "migrated_at": time.Now().Unix(),
61 }
62
63 batch = append(batch, item)
64
65 // Write batch when full
66 if len(batch) >= m.batchSize {
67 if err := m.writeBatch(ctx, batch); err != nil {
68 return err
69 }
70 count += len(batch)
71 log.Printf("Migrated %d records", count)
72 batch = batch[:0]
73 }
74 }
75
76 // Write remaining items
77 if len(batch) > 0 {
78 if err := m.writeBatch(ctx, batch); err != nil {
79 return err
80 }
81 count += len(batch)
82 }
83
84 log.Printf("Migration complete: %d total records", count)
85 return nil
86}
87
88// writeBatch writes a batch to DynamoDB
89 func (m *Migrator) writeBatch(ctx context.Context, items []map[string]interface{}) error {
90 writeRequests := make([]types.WriteRequest, len(items))
91
92 for i, item := range items {
93 av, err := attributevalue.MarshalMap(item)
94 if err != nil {
95 return err
96 }
97
98 writeRequests[i] = types.WriteRequest{
99 PutRequest: &types.PutRequest{Item: av},
100 }
101 }
102
103 _, err := m.targetDB.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
104 RequestItems: map[string][]types.WriteRequest{
105 m.tableName: writeRequests,
106 },
107 })
108
109 return err
110}
111
112// DualWrite writes to both databases during migration
113 func (m *Migrator) DualWrite(ctx context.Context, user User) error {
114 // Write to PostgreSQL
115 _, err := m.sourceDB.ExecContext(ctx,
116 "INSERT INTO users (id, email, name, created_at) VALUES ($1, $2, $3, $4)",
117 user.ID, user.Email, user.Name, user.CreatedAt)
118 if err != nil {
119 return fmt.Errorf("postgres write failed: %w", err)
120 }
121
122 // Write to DynamoDB
123 av, err := attributevalue.MarshalMap(user)
124 if err != nil {
125 return err
126 }
127
128 _, err = m.targetDB.PutItem(ctx, &dynamodb.PutItemInput{
129 TableName: aws.String(m.tableName),
130 Item: av,
131 })
132 if err != nil {
133 log.Printf("WARNING: DynamoDB write failed: %v", err)
134 // Don't fail the request - log for later reconciliation
135 }
136
137 return nil
138}
139
140// VerifyMigration verifies data consistency
141 func (m *Migrator) VerifyMigration(ctx context.Context) error {
142 // Count records in source
143 var sourceCount int
144 err := m.sourceDB.QueryRowContext(ctx, "SELECT COUNT(*) FROM users").Scan(&sourceCount)
145 if err != nil {
146 return err
147 }
148
149 // Count records in target
150 result, err := m.targetDB.Scan(ctx, &dynamodb.ScanInput{
151 TableName: aws.String(m.tableName),
152 Select: types.SelectCount,
153 })
154 if err != nil {
155 return err
156 }
157
158 targetCount := int(result.Count)
159
160 if sourceCount != targetCount {
161 return fmt.Errorf("count mismatch: source=%d, target=%d", sourceCount, targetCount)
162 }
163
164 log.Printf("Verification successful: %d records in both databases", sourceCount)
165 return nil
166}
Migration Strategy: The dual-write pattern enables zero-downtime migration (a minimal read-switch sketch follows the phases):
- Phase 1: Migrate existing data in batches
- Phase 2: Dual-write new records to both databases
- Phase 3: Verify data consistency
- Phase 4: Switch reads to new database
- Phase 5: Stop writes to old database
- Phase 6: Decommission old database
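A minimal sketch of the Phase 4 read switch, in the same package as the Migrator above. The UserStore wrapper, its helper queries, and the assumption that User has ID, Email, Name, and CreatedAt fields (as DualWrite implies) are all illustrative; the flag would normally come from configuration or a feature-flag service:
package migration

import (
	"context"
	"sync/atomic"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// UserStore routes reads during the cutover; writes always go through DualWrite.
type UserStore struct {
	migrator       *Migrator
	readFromTarget atomic.Bool // flip to true in Phase 4
}

// EnableTargetReads switches reads to DynamoDB (Phase 4).
func (s *UserStore) EnableTargetReads() { s.readFromTarget.Store(true) }

// SaveUser dual-writes (Phase 2) so either store can serve reads.
func (s *UserStore) SaveUser(ctx context.Context, u User) error {
	return s.migrator.DualWrite(ctx, u)
}

// LoadUser reads from whichever store is currently authoritative.
func (s *UserStore) LoadUser(ctx context.Context, id string) (User, error) {
	if !s.readFromTarget.Load() {
		// Phases 1-3: PostgreSQL is still the source of truth
		var u User
		err := s.migrator.sourceDB.QueryRowContext(ctx,
			"SELECT id, email, name, created_at FROM users WHERE id = $1", id).
			Scan(&u.ID, &u.Email, &u.Name, &u.CreatedAt)
		return u, err
	}

	// Phase 4 onward: read from DynamoDB
	out, err := s.migrator.targetDB.GetItem(ctx, &dynamodb.GetItemInput{
		TableName: aws.String(s.migrator.tableName),
		Key: map[string]types.AttributeValue{
			"id": &types.AttributeValueMemberS{Value: id},
		},
	})
	if err != nil {
		return User{}, err
	}

	var u User
	err = attributevalue.UnmarshalMap(out.Item, &u)
	return u, err
}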
NoSQL to NoSQL Migration
DynamoDB to Firestore Migration:
1package migration
2
3import (
4 "context"
5 "fmt"
6 "log"
7
8 "cloud.google.com/go/firestore"
9 "github.com/aws/aws-sdk-go-v2/aws"
10 "github.com/aws/aws-sdk-go-v2/service/dynamodb"
11 "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
12 )
10
11// NoSQLMigrator migrates between NoSQL databases
12type NoSQLMigrator struct {
13 sourceDynamoDB *dynamodb.Client
14 targetFirestore *firestore.Client
15}
16
17func NewNoSQLMigrator(dynamo *dynamodb.Client, fs *firestore.Client) *NoSQLMigrator {
18 return &NoSQLMigrator{
19 sourceDynamoDB: dynamo,
20 targetFirestore: fs,
21 }
22}
23
24// MigrateTable migrates entire DynamoDB table to Firestore
25 func (n *NoSQLMigrator) MigrateTable(ctx context.Context, dynamoTable, firestoreCollection string) error {
26 // Scan DynamoDB table
27 paginator := dynamodb.NewScanPaginator(n.sourceDynamoDB, &dynamodb.ScanInput{
28 TableName: aws.String(dynamoTable),
29 })
30
31 count := 0
32 for paginator.HasMorePages() {
33 page, err := paginator.NextPage(ctx)
34 if err != nil {
35 return fmt.Errorf("scan failed: %w", err)
36 }
37
38 // Convert and write to Firestore
39 for _, item := range page.Items {
40 doc := convertDynamoToFirestore(item)
41
42 docID := doc["id"].(string)
43 _, err := n.targetFirestore.Collection(firestoreCollection).Doc(docID).Set(ctx, doc)
44 if err != nil {
45 return fmt.Errorf("firestore write failed: %w", err)
46 }
47 count++
48 }
49
50 log.Printf("Migrated %d documents", count)
51 }
52
53 return nil
54}
55
56func convertDynamoToFirestore(item map[string]types.AttributeValue) map[string]interface{} {
57 // Convert DynamoDB attribute values to Firestore format
58 doc := make(map[string]interface{})
59
60 for key, value := range item {
61 switch v := value.(type) {
62 case *types.AttributeValueMemberS:
63 doc[key] = v.Value
64 case *types.AttributeValueMemberN:
65 // Convert number string to actual number
66 doc[key] = v.Value
67 case *types.AttributeValueMemberBOOL:
68 doc[key] = v.Value
69 // Add other type conversions as needed
70 }
71 }
72
73 return doc
74}
Monitoring and Observability Integration
Monitoring is like installing security cameras and sensors throughout a building. You need to know what's happening, detect problems early, and diagnose issues quickly when they occur.
SDK Metrics and Tracing
Comprehensive SDK Instrumentation:
1package observability
2
3import (
4 "context"
5 "errors"
6 "time"
7
8 "github.com/aws/aws-sdk-go-v2/aws"
9 "github.com/aws/smithy-go"
8 "github.com/aws/aws-sdk-go-v2/service/s3"
9 "github.com/prometheus/client_golang/prometheus"
10 "github.com/prometheus/client_golang/prometheus/promauto"
11 "go.opentelemetry.io/otel"
12 "go.opentelemetry.io/otel/attribute"
13 "go.opentelemetry.io/otel/trace"
14)
15
16var (
17 // Prometheus metrics
18 sdkCallDuration = promauto.NewHistogramVec(
19 prometheus.HistogramOpts{
20 Name: "cloud_sdk_duration_seconds",
21 Help: "Duration of cloud SDK calls",
22 Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
23 },
24 []string{"provider", "service", "operation", "status"},
25 )
26
27 sdkCallsTotal = promauto.NewCounterVec(
28 prometheus.CounterOpts{
29 Name: "cloud_sdk_calls_total",
30 Help: "Total number of cloud SDK calls",
31 },
32 []string{"provider", "service", "operation", "status"},
33 )
34
35 sdkErrorsTotal = promauto.NewCounterVec(
36 prometheus.CounterOpts{
37 Name: "cloud_sdk_errors_total",
38 Help: "Total number of cloud SDK errors",
39 },
40 []string{"provider", "service", "operation", "error_type"},
41 )
42
43 sdkRetries = promauto.NewCounterVec(
44 prometheus.CounterOpts{
45 Name: "cloud_sdk_retries_total",
46 Help: "Total number of SDK operation retries",
47 },
48 []string{"provider", "service", "operation"},
49 )
50
51 sdkThrottles = promauto.NewCounterVec(
52 prometheus.CounterOpts{
53 Name: "cloud_sdk_throttles_total",
54 Help: "Total number of SDK throttle errors",
55 },
56 []string{"provider", "service", "operation"},
57 )
58)
59
60// InstrumentedS3Client wraps S3 client with observability
61type InstrumentedS3Client struct {
62 client *s3.Client
63 tracer trace.Tracer
64}
65
66func NewInstrumentedS3Client(client *s3.Client) *InstrumentedS3Client {
67 return &InstrumentedS3Client{
68 client: client,
69 tracer: otel.Tracer("aws-s3"),
70 }
71}
72
73// GetObject gets object with full instrumentation
74 func (i *InstrumentedS3Client) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
75 // Start trace span
76 ctx, span := i.tracer.Start(ctx, "s3.GetObject",
77 trace.WithAttributes(
78 attribute.String("bucket", bucket),
79 attribute.String("key", key),
80 ),
81 )
82 defer span.End()
83
84 // Record metrics
85 start := time.Now()
86 status := "success"
87 var errorType string
88
89 // Execute operation
90 result, err := i.client.GetObject(ctx, &s3.GetObjectInput{
91 Bucket: aws.String(bucket),
92 Key: aws.String(key),
93 })
94
95 duration := time.Since(start).Seconds()
96
97 // Handle errors
98 if err != nil {
99 status = "error"
100 errorType = classifyError(err)
101
102 span.RecordError(err)
103
104 // Track specific error types
105 if isThrottleError(err) {
106 sdkThrottles.WithLabelValues("aws", "s3", "GetObject").Inc()
107 }
108
109 sdkErrorsTotal.WithLabelValues("aws", "s3", "GetObject", errorType).Inc()
110 }
111
112 // Record metrics
113 sdkCallDuration.WithLabelValues("aws", "s3", "GetObject", status).Observe(duration)
114 sdkCallsTotal.WithLabelValues("aws", "s3", "GetObject", status).Inc()
115
116 // Add span attributes
117 span.SetAttributes(
118 attribute.Float64("duration_ms", duration*1000),
119 attribute.String("status", status),
120 )
121
122 if result != nil {
123 span.SetAttributes(
124 attribute.Int64("content_length", result.ContentLength),
125 )
126 }
127
128 return result, err
129}
130
131func classifyError(err error) string {
132 // Classify AWS errors
133 var ae smithy.APIError
134 if errors.As(err, &ae) {
135 return ae.ErrorCode()
136 }
137 return "unknown"
138}
139
140func isThrottleError(err error) bool {
141 var ae smithy.APIError
142 if errors.As(err, &ae) {
143 code := ae.ErrorCode()
144 return code == "ThrottlingException" ||
145 code == "ProvisionedThroughputExceededException" ||
146 code == "RequestLimitExceeded"
147 }
148 return false
149}
CloudWatch/Stackdriver Integration
Custom Metrics Publishing:
1package observability
2
3import (
4 "context"
5 "time"
6
7 "github.com/aws/aws-sdk-go-v2/aws"
8 "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
9 "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
10)
11
12// MetricsPublisher publishes custom metrics
13type MetricsPublisher struct {
14 cwClient *cloudwatch.Client
15 namespace string
16}
17
18func NewMetricsPublisher(client *cloudwatch.Client, namespace string) *MetricsPublisher {
19 return &MetricsPublisher{
20 cwClient: client,
21 namespace: namespace,
22 }
23}
24
25// PublishSDKMetrics publishes SDK performance metrics
26 func (m *MetricsPublisher) PublishSDKMetrics(ctx context.Context, service, operation string, duration time.Duration, success bool) error {
27 metricData := []types.MetricDatum{
28 {
29 MetricName: aws.String("SDKCallDuration"),
30 Value: aws.Float64(duration.Seconds()),
31 Unit: types.StandardUnitSeconds,
32 Timestamp: aws.Time(time.Now()),
33 Dimensions: []types.Dimension{
34 {Name: aws.String("Service"), Value: aws.String(service)},
35 {Name: aws.String("Operation"), Value: aws.String(operation)},
36 },
37 },
38 {
39 MetricName: aws.String("SDKCallSuccess"),
40 Value: aws.Float64(boolToFloat(success)),
41 Unit: types.StandardUnitCount,
42 Timestamp: aws.Time(time.Now()),
43 Dimensions: []types.Dimension{
44 {Name: aws.String("Service"), Value: aws.String(service)},
45 {Name: aws.String("Operation"), Value: aws.String(operation)},
46 },
47 },
48 }
49
50 _, err := m.cwClient.PutMetricData(ctx, &cloudwatch.PutMetricDataInput{
51 Namespace: aws.String(m.namespace),
52 MetricData: metricData,
53 })
54
55 return err
56}
57
58// PublishBatchMetrics publishes metrics in batch for efficiency
59 func (m *MetricsPublisher) PublishBatchMetrics(ctx context.Context, metrics []types.MetricDatum) error {
60 // CloudWatch allows up to 20 metrics per request
61 const batchSize = 20
62
63 for i := 0; i < len(metrics); i += batchSize {
64 end := i + batchSize
65 if end > len(metrics) {
66 end = len(metrics)
67 }
68
69 batch := metrics[i:end]
70 _, err := m.cwClient.PutMetricData(ctx, &cloudwatch.PutMetricDataInput{
71 Namespace: aws.String(m.namespace),
72 MetricData: batch,
73 })
74 if err != nil {
75 return err
76 }
77 }
78
79 return nil
80}
81
82func boolToFloat(b bool) float64 {
83 if b {
84 return 1.0
85 }
86 return 0.0
87}
Disaster Recovery and Failover Strategies
Disaster recovery is like having fire extinguishers, emergency exits, and backup generators. You hope you never need them, but when disaster strikes, they're invaluable.
Multi-Region Failover
Automated Failover Pattern:
1package dr
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "sync"
8 "time"
8
9 "github.com/aws/aws-sdk-go-v2/aws"
10 "github.com/aws/aws-sdk-go-v2/service/s3"
11)
12
13// RegionConfig represents a region configuration
14type RegionConfig struct {
15 Region string
16 Endpoint string
17 Priority int
18}
19
20// MultiRegionClient handles multi-region failover
21type MultiRegionClient struct {
22 regions []RegionConfig
23 currentRegion int
24 healthCheck func(context.Context, *s3.Client) error
25 mu sync.RWMutex
26 clients map[string]*s3.Client
27}
28
29func NewMultiRegionClient(regions []RegionConfig, healthCheck func(context.Context, *s3.Client) error) *MultiRegionClient {
30 return &MultiRegionClient{
31 regions: regions,
32 healthCheck: healthCheck,
33 clients: make(map[string]*s3.Client),
34 }
35}
36
37// GetObject retrieves object with automatic failover
38 func (m *MultiRegionClient) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
39 m.mu.RLock()
40 region := m.regions[m.currentRegion]
41 client := m.clients[region.Region]
42 m.mu.RUnlock()
43
44 // Try primary region
45 result, err := client.GetObject(ctx, &s3.GetObjectInput{
46 Bucket: aws.String(bucket),
47 Key: aws.String(key),
48 })
49
50 if err == nil {
51 return result, nil
52 }
53
54 // Failover to next region
55 log.Printf("Primary region %s failed: %v", region.Region, err)
56 return m.failover(ctx, bucket, key)
57}
58
59// failover attempts to use backup regions
60 func (m *MultiRegionClient) failover(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
61 m.mu.Lock()
62 defer m.mu.Unlock()
63
64 // Try each region in priority order
65 for i := 1; i < len(m.regions); i++ {
66 nextIdx := (m.currentRegion + i) % len(m.regions)
67 region := m.regions[nextIdx]
68 client := m.clients[region.Region]
69
70 // Health check
71 if err := m.healthCheck(ctx, client); err != nil {
72 log.Printf("Region %s health check failed: %v", region.Region, err)
73 continue
74 }
75
76 // Try operation
77 result, err := client.GetObject(ctx, &s3.GetObjectInput{
78 Bucket: aws.String(bucket),
79 Key: aws.String(key),
80 })
81
82 if err == nil {
83 // Update current region
84 m.currentRegion = nextIdx
85 log.Printf("Failed over to region: %s", region.Region)
86 return result, nil
87 }
88
89 log.Printf("Region %s failed: %v", region.Region, err)
90 }
91
92 return nil, fmt.Errorf("all regions failed")
93}
94
95// MonitorHealth continuously monitors region health
96 func (m *MultiRegionClient) MonitorHealth(ctx context.Context, interval time.Duration) {
97 ticker := time.NewTicker(interval)
98 defer ticker.Stop()
99
100 for {
101 select {
102 case <-ctx.Done():
103 return
104 case <-ticker.C:
105 m.checkAllRegions(ctx)
106 }
107 }
108}
109
110 func (m *MultiRegionClient) checkAllRegions(ctx context.Context) {
111 for i, region := range m.regions {
112 client := m.clients[region.Region]
113
114 err := m.healthCheck(ctx, client)
115 if err != nil {
116 log.Printf("Region %s unhealthy: %v", region.Region, err)
117
118 // If current region is unhealthy, trigger failover
119 m.mu.RLock()
120 if i == m.currentRegion {
121 m.mu.RUnlock()
122 m.triggerFailover(ctx)
123 continue
124 }
125 m.mu.RUnlock()
126 }
127 }
128}
129
130 func (m *MultiRegionClient) triggerFailover(ctx context.Context) {
131 m.mu.Lock()
132 defer m.mu.Unlock()
133
134 // Find next healthy region
135 for i := 1; i < len(m.regions); i++ {
136 nextIdx := (m.currentRegion + i) % len(m.regions)
137 region := m.regions[nextIdx]
138 client := m.clients[region.Region]
139
140 if err := m.healthCheck(ctx, client); err == nil {
141 m.currentRegion = nextIdx
142 log.Printf("Proactively failed over to region: %s", region.Region)
143 return
144 }
145 }
146}
Cross-Cloud Backup Strategy
Automated Cross-Cloud Backup:
1package dr
2
3import (
4 "context"
5 "fmt"
6 "bytes"
7 "log"
8 "time"
8)
9
10// BackupManager manages cross-cloud backups
11type BackupManager struct {
12 primary Storage
13 backup Storage
14 scheduler *time.Ticker
15}
16
17func NewBackupManager(primary, backup Storage, interval time.Duration) *BackupManager {
18 return &BackupManager{
19 primary: primary,
20 backup: backup,
21 scheduler: time.NewTicker(interval),
22 }
23}
24
25// StartContinuousBackup starts continuous backup process
26 func (bm *BackupManager) StartContinuousBackup(ctx context.Context) error {
27 for {
28 select {
29 case <-ctx.Done():
30 return ctx.Err()
31 case <-bm.scheduler.C:
32 if err := bm.performBackup(ctx); err != nil {
33 log.Printf("Backup failed: %v", err)
34 }
35 }
36 }
37}
38
39// performBackup backs up data from primary to backup storage
40 func (bm *BackupManager) performBackup(ctx context.Context) error {
41 // List all files in primary
42 files, err := bm.primary.List(ctx, "")
43 if err != nil {
44 return fmt.Errorf("failed to list files: %w", err)
45 }
46
47 log.Printf("Starting backup of %d files", len(files))
48
49 for _, file := range files {
50 // Download from primary
51 data, err := bm.primary.Download(ctx, file)
52 if err != nil {
53 log.Printf("Failed to download %s: %v", file, err)
54 continue
55 }
56
57 // Upload to backup
58 reader := bytes.NewReader(data)
59 if err := bm.backup.Upload(ctx, file, reader); err != nil {
60 log.Printf("Failed to backup %s: %v", file, err)
61 continue
62 }
63
64 log.Printf("Backed up: %s", file)
65 }
66
67 return nil
68}
69
70// VerifyBackup verifies backup integrity
71 func (bm *BackupManager) VerifyBackup(ctx context.Context, sampleSize int) error {
72 // Get sample of files
73 files, err := bm.primary.List(ctx, "")
74 if err != nil {
75 return err
76 }
77
78 // Verify random sample
79 for i := 0; i < sampleSize && i < len(files); i++ {
80 file := files[i]
81
82 // Download from both storages
83 primaryData, err := bm.primary.Download(ctx, file)
84 if err != nil {
85 return fmt.Errorf("primary download failed: %w", err)
86 }
87
88 backupData, err := bm.backup.Download(ctx, file)
89 if err != nil {
90 return fmt.Errorf("backup download failed: %w", err)
91 }
92
93 // Compare checksums
94 if !bytes.Equal(primaryData, backupData) {
95 return fmt.Errorf("data mismatch for %s", file)
96 }
97 }
98
99 log.Printf("Backup verification successful for %d files", sampleSize)
100 return nil
101}
102
103// RestoreFromBackup restores data from backup to primary
104 func (bm *BackupManager) RestoreFromBackup(ctx context.Context, prefix string) error {
105 files, err := bm.backup.List(ctx, prefix)
106 if err != nil {
107 return err
108 }
109
110 log.Printf("Restoring %d files from backup", len(files))
111
112 for _, file := range files {
113 data, err := bm.backup.Download(ctx, file)
114 if err != nil {
115 return err
116 }
117
118 reader := bytes.NewReader(data)
119 if err := bm.primary.Upload(ctx, file, reader); err != nil {
120 return err
121 }
122
123 log.Printf("Restored: %s", file)
124 }
125
126 return nil
127}
RTO and RPO Targets (a small RPO-check sketch follows the list):
- Recovery Time Objective (RTO): Maximum acceptable downtime
- Multi-region failover: < 1 minute
- Cross-cloud restore: < 15 minutes
- Recovery Point Objective (RPO): Maximum acceptable data loss
- Continuous replication: < 1 second
- Hourly backups: < 1 hour
- Daily backups: < 24 hours
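One way to make the RPO target operational is to record the timestamp of the last successful backup pass and alert when the lag exceeds the target. A minimal sketch in the same package; the tracker and its wiring into performBackup are hypothetical:
package dr

import (
	"fmt"
	"sync/atomic"
	"time"
)

// rpoTracker records the last successful backup pass; performBackup would call
// MarkSuccess after a clean run (hypothetical wiring, not shown above).
type rpoTracker struct {
	lastSuccess atomic.Int64 // unix seconds
}

func (t *rpoTracker) MarkSuccess() { t.lastSuccess.Store(time.Now().Unix()) }

// CheckRPO returns an error when the backup lag exceeds the target, which is
// the signal to page someone or fail a readiness check.
func (t *rpoTracker) CheckRPO(target time.Duration) error {
	last := time.Unix(t.lastSuccess.Load(), 0)
	if lag := time.Since(last); lag > target {
		return fmt.Errorf("RPO violated: last successful backup %s ago (target %s)",
			lag.Round(time.Second), target)
	}
	return nil
}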
💡 Key Takeaway: Disaster recovery isn't just about having backups—it's about regularly testing your recovery procedures. Schedule disaster recovery drills quarterly to ensure your team knows how to execute the plan under pressure.
Further Reading
- AWS SDK for Go v2
- Google Cloud Go Libraries
- Azure SDK for Go
- AWS Well-Architected Framework
- Google Cloud Architecture Center
- Azure Architecture Center
Practice Exercises
Exercise 1: Multi-Cloud File Manager
Learning Objective: Master multi-cloud storage abstraction and understand how to build resilient systems that can work across different cloud providers. This exercise teaches you to create a unified interface that works seamlessly across AWS S3, Google Cloud Storage, and Azure Blob Storage while implementing consistent patterns for file operations.
Real-World Context: Multi-cloud strategies are essential for enterprise applications that require high availability and vendor diversification. Companies like Netflix and Spotify use multi-cloud storage to avoid vendor lock-in and ensure their services remain available even when an entire cloud region experiences an outage. This pattern is particularly valuable for disaster recovery, cost optimization, and compliance requirements.
Difficulty: Intermediate | Time Estimate: 45-60 minutes
Objective: Build a file manager that works with AWS S3, Google Cloud Storage, and Azure Blob Storage using a unified interface.
Solution with Explanation
1package main
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "log"
8 "strings"
9 "time"
10)
11
12// Storage interface for cloud-agnostic operations
13type Storage interface {
14 Upload(ctx context.Context, path string, data io.Reader) error
15 Download(ctx context.Context, path string) ([]byte, error)
16 List(ctx context.Context, prefix string) ([]string, error)
17 Delete(ctx context.Context, path string) error
18}
19
20// FileManager provides high-level file operations
21type FileManager struct {
22 primary Storage
23 secondary Storage
24 mirror bool
25}
26
27func NewFileManager(primary, secondary Storage, mirror bool) *FileManager {
28 return &FileManager{
29 primary: primary,
30 secondary: secondary,
31 mirror: mirror,
32 }
33}
34
35// Upload uploads to primary and optionally mirrors to secondary
36 func (fm *FileManager) Upload(ctx context.Context, path string, data []byte) error {
37 reader := strings.NewReader(string(data))
38
39 // Upload to primary
40 if err := fm.primary.Upload(ctx, path, reader); err != nil {
41 return fmt.Errorf("primary upload failed: %w", err)
42 }
43
44 // Mirror to secondary if enabled
45 if fm.mirror && fm.secondary != nil {
46 reader.Seek(0, io.SeekStart)
47 if err := fm.secondary.Upload(ctx, path, reader); err != nil {
48 log.Printf("WARNING: secondary upload failed: %v", err)
49 // Don't fail the operation
50 }
51 }
52
53 return nil
54}
55
56// Download tries primary first, falls back to secondary
57 func (fm *FileManager) Download(ctx context.Context, path string) ([]byte, error) {
58 data, err := fm.primary.Download(ctx, path)
59 if err == nil {
60 return data, nil
61 }
62
63 log.Printf("Primary download failed, trying secondary: %v", err)
64
65 if fm.secondary != nil {
66 return fm.secondary.Download(ctx, path)
67 }
68
69 return nil, fmt.Errorf("download failed from all sources: %w", err)
70}
71
72// Sync synchronizes files from primary to secondary
73 func (fm *FileManager) Sync(ctx context.Context, prefix string) error {
74 if fm.secondary == nil {
75 return fmt.Errorf("no secondary storage configured")
76 }
77
78 // List files in primary
79 files, err := fm.primary.List(ctx, prefix)
80 if err != nil {
81 return fmt.Errorf("failed to list primary: %w", err)
82 }
83
84 // Copy each file to secondary
85 for _, file := range files {
86 data, err := fm.primary.Download(ctx, file)
87 if err != nil {
88 log.Printf("Failed to download %s: %v", file, err)
89 continue
90 }
91
92 reader := strings.NewReader(string(data))
93 if err := fm.secondary.Upload(ctx, file, reader); err != nil {
94 log.Printf("Failed to upload %s to secondary: %v", file, err)
95 continue
96 }
97
98 log.Printf("Synced: %s", file)
99 }
100
101 return nil
102}
103
104// CleanupOldFiles deletes files older than retention period
105 func (fm *FileManager) CleanupOldFiles(ctx context.Context, prefix string, retention time.Duration) error {
106 files, err := fm.primary.List(ctx, prefix)
107 if err != nil {
108 return fmt.Errorf("failed to list files: %w", err)
109 }
110
111 cutoff := time.Now().Add(-retention)
112
113 for _, file := range files {
114 // In production, you'd check file modification time
115 // This is a simplified example
116 log.Printf("Would delete %s if older than %s", file, cutoff)
117 // fm.primary.Delete(ctx, file)
118 }
119
120 return nil
121}
122
123func main() {
124 ctx := context.Background()
125
126 // Initialize storage clients
127 var s3Storage Storage // = NewS3Storage(...)
128 var gcsStorage Storage // = NewGCSStorage(...)
129
130 // Create file manager with S3 primary and GCS secondary
131 fm := NewFileManager(s3Storage, gcsStorage, true)
132
133 // Upload with automatic mirroring
134 data := []byte("Hello, multi-cloud world!")
135 if err := fm.Upload(ctx, "test/hello.txt", data); err != nil {
136 log.Fatal(err)
137 }
138
139 // Download with automatic failover
140 downloaded, err := fm.Download(ctx, "test/hello.txt")
141 if err != nil {
142 log.Fatal(err)
143 }
144
145 fmt.Printf("Downloaded: %s\n", string(downloaded))
146
147 // Sync from primary to secondary
148 if err := fm.Sync(ctx, "test/"); err != nil {
149 log.Printf("Sync failed: %v", err)
150 }
151}
Exercise 2: Event-Driven Processing Pipeline
Learning Objective: Understand event-driven architecture patterns by building a scalable processing pipeline that responds to cloud storage events. This exercise teaches you to implement asynchronous processing workflows using message queues, event sources, and worker pools that can handle high-throughput data processing scenarios.
Real-World Context: Event-driven architectures are the backbone of modern data processing systems at companies like Airbnb, Uber, and Dropbox. When users upload files, these systems automatically trigger workflows for virus scanning, thumbnail generation, metadata extraction, and notifications. This pattern enables horizontal scaling, loose coupling between services, and fault-tolerant processing that can handle millions of events per day.
Difficulty: Intermediate | Time Estimate: 60-75 minutes
Objective: Create an event-driven pipeline using AWS S3 events, SQS, and Lambda-like processing.
Solution
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "strings"
9 "sync"
10 "time"
11
12 "github.com/aws/aws-sdk-go-v2/service/sqs/types"
10)
11
12// S3Event represents an S3 event notification
13type S3Event struct {
14 EventName string `json:"eventName"`
15 Bucket string `json:"bucket"`
16 Key string `json:"key"`
17 Size int64 `json:"size"`
18 Timestamp time.Time `json:"timestamp"`
19}
20
21// ProcessorFunc processes S3 events
22type ProcessorFunc func(context.Context, S3Event) error
23
24// EventPipeline orchestrates event processing
25type EventPipeline struct {
26 sqsClient *SQSClient
27 s3Client *S3Client
28 processors []ProcessorFunc
29 workers int
30}
31
32func NewEventPipeline(sqs *SQSClient, s3 *S3Client, workers int) *EventPipeline {
33 return &EventPipeline{
34 sqsClient: sqs,
35 s3Client: s3,
36 workers: workers,
37 }
38}
39
40// RegisterProcessor adds a processor to the pipeline
41 func (ep *EventPipeline) RegisterProcessor(processor ProcessorFunc) {
42 ep.processors = append(ep.processors, processor)
43}
44
45// Start begins processing events
46 func (ep *EventPipeline) Start(ctx context.Context) error {
47 var wg sync.WaitGroup
48
49 // Start worker goroutines
50 for i := 0; i < ep.workers; i++ {
51 wg.Add(1)
52 go func(workerID int) {
53 defer wg.Done()
54 ep.worker(ctx, workerID)
55 }(i)
56 }
57
58 wg.Wait()
59 return nil
60}
61
62// worker processes messages from SQS
63 func (ep *EventPipeline) worker(ctx context.Context, workerID int) {
64 log.Printf("Worker %d started", workerID)
65
66 for {
67 select {
68 case <-ctx.Done():
69 log.Printf("Worker %d stopping", workerID)
70 return
71 default:
72 messages, err := ep.sqsClient.ReceiveMessages(ctx, 1)
73 if err != nil {
74 log.Printf("Worker %d: receive error: %v", workerID, err)
75 time.Sleep(1 * time.Second)
76 continue
77 }
78
79 for _, msg := range messages {
80 if err := ep.processMessage(ctx, msg); err != nil {
81 log.Printf("Worker %d: process error: %v", workerID, err)
82 continue
83 }
84
85 // Delete message after successful processing
86 if err := ep.sqsClient.DeleteMessage(ctx, *msg.ReceiptHandle); err != nil {
87 log.Printf("Worker %d: delete error: %v", workerID, err)
88 }
89 }
90 }
91 }
92}
93
94// processMessage processes a single SQS message
95 func (ep *EventPipeline) processMessage(ctx context.Context, msg types.Message) error {
96 // Parse S3 event
97 var event S3Event
98 if err := json.Unmarshal([]byte(*msg.Body), &event); err != nil {
99 return fmt.Errorf("failed to unmarshal event: %w", err)
100 }
101
102 log.Printf("Processing: %s/%s (%d bytes)", event.Bucket, event.Key, event.Size)
103
104 // Run all processors
105 for i, processor := range ep.processors {
106 if err := processor(ctx, event); err != nil {
107 return fmt.Errorf("processor %d failed: %w", i, err)
108 }
109 }
110
111 return nil
112}
113
114// ImageProcessor validates and processes image files
115func ImageProcessor(ctx context.Context, event S3Event) error {
116 // Skip non-image files
117 if !strings.HasSuffix(event.Key, ".jpg") &&
118 !strings.HasSuffix(event.Key, ".png") {
119 return nil
120 }
121
122 log.Printf("Processing image: %s", event.Key)
123
124 // In production, you would:
125 // 1. Download image from S3
126 // 2. Generate thumbnail
127 // 3. Upload thumbnail to S3
128 // 4. Update database with metadata
129
130 time.Sleep(100 * time.Millisecond) // Simulate processing
131 return nil
132}
133
134// MetadataProcessor extracts and stores file metadata
135func MetadataProcessor(ctx context.Context, event S3Event) error {
136 log.Printf("Extracting metadata: %s", event.Key)
137
138 metadata := map[string]interface{}{
139 "key": event.Key,
140 "size": event.Size,
141 "timestamp": event.Timestamp,
142 "event": event.EventName,
143 }
144
145 // Store in database
146 data, _ := json.Marshal(metadata)
147 log.Printf("Metadata: %s", string(data))
148
149 return nil
150}
151
152// VirusScanProcessor scans files for malware
153func VirusScanProcessor(ctx context.Context, event S3Event) error {
154 // Only scan files smaller than 100MB
155 if event.Size > 100*1024*1024 {
156 log.Printf("Skipping large file: %s", event.Key)
157 return nil
158 }
159
160 log.Printf("Scanning: %s", event.Key)
161
162 // Simulate virus scan
163 time.Sleep(50 * time.Millisecond)
164
165 // Tag file as scanned
166 log.Printf("File clean: %s", event.Key)
167 return nil
168}
169
170// NotificationProcessor sends notifications for important events
171func NotificationProcessor(ctx context.Context, event S3Event) error {
172 // Only notify for large files
173 if event.Size < 10*1024*1024 {
174 return nil
175 }
176
177 log.Printf("NOTIFICATION: Large file uploaded: %s (%d bytes)",
178 event.Key, event.Size)
179
180 // Send email, Slack message, etc.
181 return nil
182}
183
184func main() {
185 ctx, cancel := context.WithCancel(context.Background())
186 defer cancel()
187
188 // Initialize clients
189 sqsClient := &SQSClient{} // NewSQSClient(...)
190 s3Client := &S3Client{} // NewS3Client(...)
191
192 // Create pipeline
193 pipeline := NewEventPipeline(sqsClient, s3Client, 5)
194
195 // Register processors
196 pipeline.RegisterProcessor(ImageProcessor)
197 pipeline.RegisterProcessor(MetadataProcessor)
198 pipeline.RegisterProcessor(VirusScanProcessor)
199 pipeline.RegisterProcessor(NotificationProcessor)
200
201 // Start processing
202 log.Println("Starting event pipeline...")
203 if err := pipeline.Start(ctx); err != nil {
204 log.Fatal(err)
205 }
206}
Explanation: This exercise demonstrates building an event-driven pipeline using multiple AWS services. The pipeline consumes S3 events from SQS, processes them through multiple processors, and handles errors gracefully. Key patterns include concurrent processing with workers, retry logic, and processor chaining for complex workflows.
Exercise 3: S3 File Manager with Metadata
Learning Objective: Master advanced S3 operations including custom metadata management, multipart uploads, and search functionality. This exercise teaches you to build a production-ready file management system that leverages S3's metadata capabilities for organizing and retrieving files efficiently.
Real-World Context: Content management systems and document platforms like Box, Dropbox, and Google Drive rely heavily on metadata-driven file organization. Metadata enables powerful search capabilities, automatic file categorization, access control, and workflow automation. Understanding how to effectively use S3 metadata is crucial for building scalable document management, media libraries, and data lakes that can handle millions of files with efficient retrieval patterns.
Difficulty: Intermediate | Time Estimate: 50-65 minutes
Objective: Build an S3 file manager that handles uploads, downloads, and custom metadata management with search capabilities.
Solution with Explanation
1// run
2package main
3
4import (
5 "context"
6 "fmt"
7 "time"
11)
12
13// FileManager manages S3 files with metadata
14type FileManager struct {
15 bucket string
16 // client *s3.Client // Simulated for demonstration
17}
18
19type FileMetadata struct {
20 Key string `json:"key"`
21 Size int64 `json:"size"`
22 ContentType string `json:"content_type"`
23 Tags map[string]string `json:"tags"`
24 UploadedAt time.Time `json:"uploaded_at"`
25 UploadedBy string `json:"uploaded_by"`
26}
27
28func NewFileManager(bucket string) *FileManager {
29 return &FileManager{bucket: bucket}
30}
31
32// Upload simulates uploading file with metadata
33 func (fm *FileManager) Upload(ctx context.Context, key string, size int64, contentType string, tags map[string]string, userID string) (*FileMetadata, error) {
34 metadata := &FileMetadata{
35 Key: key,
36 Size: size,
37 ContentType: contentType,
38 Tags: tags,
39 UploadedAt: time.Now(),
40 UploadedBy: userID,
41 }
42
43 // In production:
44 // 1. Upload file to S3 using PutObject
45 // 2. Set object metadata and tags
46 // 3. Store metadata in DynamoDB for fast querying
47
48 fmt.Printf("Uploaded: %s (%d bytes) by %s\n", key, size, userID)
49 fmt.Printf("Tags: %v\n", tags)
50
51 return metadata, nil
52}
53
54// SearchByTag searches files by tag
55 func (fm *FileManager) SearchByTag(ctx context.Context, tagKey, tagValue string) ([]*FileMetadata, error) {
56 // In production:
57 // 1. Query DynamoDB index on tag
58 // 2. Or use S3 Select with metadata
59
60 // Simulated results
61 results := []*FileMetadata{
62 {
63 Key: "documents/report-2024.pdf",
64 Size: 1024000,
65 ContentType: "application/pdf",
66 Tags: map[string]string{"department": "finance", "year": "2024"},
67 UploadedAt: time.Now().Add(-24 * time.Hour),
68 UploadedBy: "user-123",
69 },
70 }
71
72 fmt.Printf("Found %d files with tag %s=%s\n", len(results), tagKey, tagValue)
73 return results, nil
74}
75
76// UpdateTags updates file tags
77 func (fm *FileManager) UpdateTags(ctx context.Context, key string, tags map[string]string) error {
78 // In production:
79 // Use PutObjectTagging API
80
81 fmt.Printf("Updated tags for %s: %v\n", key, tags)
82 return nil
83}
84
85// GenerateDownloadURL generates presigned download URL
86 func (fm *FileManager) GenerateDownloadURL(ctx context.Context, key string, expiry time.Duration) (string, error) {
87 // In production:
88 // Use s3.NewPresignClient and PresignGetObject
89
90 url := fmt.Sprintf("https://%s.s3.amazonaws.com/%s?X-Amz-Expires=%d", fm.bucket, key, int(expiry.Seconds()))
91 fmt.Printf("Generated download URL (valid %s): %s\n", expiry, url)
92 return url, nil
93}
94
95// ListByPrefix lists files by prefix with pagination
96 func (fm *FileManager) ListByPrefix(ctx context.Context, prefix string, maxResults int) ([]*FileMetadata, error) {
97 // In production:
98 // Use ListObjectsV2Paginator
99
100 files := []*FileMetadata{
101 {
102 Key: prefix + "file1.txt",
103 Size: 1024,
104 ContentType: "text/plain",
105 Tags: map[string]string{"category": "documents"},
106 UploadedAt: time.Now(),
107 UploadedBy: "user-456",
108 },
109 {
110 Key: prefix + "file2.txt",
111 Size: 2048,
112 ContentType: "text/plain",
113 Tags: map[string]string{"category": "documents"},
114 UploadedAt: time.Now(),
115 UploadedBy: "user-456",
116 },
117 }
118
119 fmt.Printf("Listed %d files with prefix '%s'\n", len(files), prefix)
120 return files, nil
121}
122
123func main() {
124 ctx := context.Background()
125 fm := NewFileManager("my-files-bucket")
126
127 // Upload file with tags
128 metadata, _ := fm.Upload(ctx, "reports/quarterly-2024.pdf", 5242880,
129 "application/pdf",
130 map[string]string{
131 "department": "finance",
132 "year": "2024",
133 "quarter": "Q1",
134 },
135 "user-123")
136
137 // Search by tag
138 results, _ := fm.SearchByTag(ctx, "department", "finance")
139 fmt.Printf("\nSearch results: %d files\n", len(results))
140
141 // Update tags
142 fm.UpdateTags(ctx, metadata.Key, map[string]string{
143 "department": "finance",
144 "year": "2024",
145 "quarter": "Q1",
146 "reviewed": "true",
147 })
148
149 // Generate download URL
150 fm.GenerateDownloadURL(ctx, metadata.Key, 1*time.Hour)
151
152 // List files by prefix
153 files, _ := fm.ListByPrefix(ctx, "reports/", 10)
154 for _, f := range files {
155 fmt.Printf(" - %s (%d bytes)\n", f.Key, f.Size)
156 }
157}
Explanation: This exercise shows S3 file management with rich metadata. It demonstrates uploading files with custom tags, searching by tags, updating metadata, generating presigned URLs for downloads, and listing files with pagination. In production, this pattern combines S3 for storage with DynamoDB for fast metadata queries.
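For the presigned-URL step, the real SDK call is short. A sketch of what GenerateDownloadURL could do with an actual *s3.Client, using the SDK's presign client (bucket and key names are placeholders):
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// presignedGetURL returns a time-limited GET URL for one object.
func presignedGetURL(ctx context.Context, client *s3.Client, bucket, key string, expiry time.Duration) (string, error) {
	presigner := s3.NewPresignClient(client)
	req, err := presigner.PresignGetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}, s3.WithPresignExpires(expiry))
	if err != nil {
		return "", err
	}
	return req.URL, nil
}

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	// Placeholder bucket and key
	url, err := presignedGetURL(ctx, client, "my-files-bucket", "reports/quarterly-2024.pdf", time.Hour)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("presigned URL:", url)
}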
Exercise 4: DynamoDB CRUD Application
Learning Objective: Master DynamoDB programming patterns including expression builders, conditional writes, transactions, and efficient query design. This exercise teaches you to build a complete data access layer that handles complex operations while maintaining performance and consistency in a NoSQL environment.
Real-World Context: DynamoDB powers some of the largest applications in the world including Netflix's recommendation engine, Airbnb's booking system, and Lyft's ride matching platform. Understanding DynamoDB's unique approach to data modeling, consistency models, and performance optimization is essential for building scalable applications that can handle millions of requests per day with single-digit millisecond latency.
Difficulty: Advanced | Time Estimate: 70-90 minutes
Objective: Build a complete CRUD application using DynamoDB with expression builders, transactions, and conditional writes.
Solution with Explanation
1// run
2package main
3
4import (
5 "context"
6 "fmt"
8 "time"
9)
10
11// User represents a user document
12type User struct {
13 ID string `json:"id"`
14 Email string `json:"email"`
15 Name string `json:"name"`
16 Role string `json:"role"`
17 Active bool `json:"active"`
18 CreatedAt time.Time `json:"created_at"`
19 UpdatedAt time.Time `json:"updated_at"`
20 Version int `json:"version"` // For optimistic locking
21}
22
23// UserRepository handles DynamoDB operations
24type UserRepository struct {
25 tableName string
26 // client *dynamodb.Client // Simulated for demonstration
27}
28
29func NewUserRepository(tableName string) *UserRepository {
30 return &UserRepository{tableName: tableName}
31}
32
33// Create creates a new user with conditional write
34 func (r *UserRepository) Create(ctx context.Context, user *User) error {
35 now := time.Now()
36 user.CreatedAt = now
37 user.UpdatedAt = now
38 user.Version = 1
39
40 // In production:
41 // 1. Marshal user to attribute map
42 // 2. Use PutItem with condition: attribute_not_exists(id)
43 // 3. Handle ConditionalCheckFailedException
44
45 fmt.Printf("Created user: %s (%s)\n", user.Name, user.Email)
46 return nil
47}
48
49// Get retrieves a user by ID
50 func (r *UserRepository) Get(ctx context.Context, id string) (*User, error) {
51 // In production:
52 // 1. Use GetItem with primary key
53 // 2. Set ConsistentRead for strong consistency
54 // 3. Unmarshal result to User struct
55
56 user := &User{
57 ID: id,
58 Email: "user@example.com",
59 Name: "John Doe",
60 Role: "admin",
61 Active: true,
62 CreatedAt: time.Now().Add(-30 * 24 * time.Hour),
63 UpdatedAt: time.Now(),
64 Version: 1,
65 }
66
67 fmt.Printf("Retrieved user: %s\n", user.Name)
68 return user, nil
69}
70
71// Update updates user with optimistic locking
72 func (r *UserRepository) Update(ctx context.Context, user *User) error {
73 // In production:
74 // 1. Build update expression for changed fields
75 // 2. Use UpdateItem with condition: version = current_version
76 // 3. Increment version number
77 // 4. Handle version mismatch
78
79 oldVersion := user.Version
80 user.Version++
81 user.UpdatedAt = time.Now()
82
83 fmt.Printf("Updated user %s (version %d -> %d)\n", user.ID, oldVersion, user.Version)
84 return nil
85}
86
87// UpdateEmail updates just the email using expression builder
88 func (r *UserRepository) UpdateEmail(ctx context.Context, id, newEmail string) error {
89 // In production:
90 // Use expression.Set() to build:
91 // SET email = :email, updated_at = :now, version = version + 1
92
93 fmt.Printf("Updated email for user %s to %s\n", id, newEmail)
94 return nil
95}
96
97// Delete deletes a user
98 func (r *UserRepository) Delete(ctx context.Context, id string) error {
99 // In production:
100 // Option 1: Soft delete - set active = false
101 // Option 2: Hard delete - use DeleteItem
102
103 fmt.Printf("Deleted user %s\n", id)
104 return nil
105}
106
107// Query queries users by role
108 func (r *UserRepository) QueryByRole(ctx context.Context, role string) ([]*User, error) {
109 // In production:
110 // 1. Use Query on GSI
111 // 2. Build key condition expression
112 // 3. Optionally add filter expression
113 // 4. Handle pagination with ExclusiveStartKey
114
115 users := []*User{
116 {
117 ID: "user-1",
118 Name: "Alice",
119 Email: "alice@example.com",
120 Role: role,
121 Active: true,
122 },
123 {
124 ID: "user-2",
125 Name: "Bob",
126 Email: "bob@example.com",
127 Role: role,
128 Active: true,
129 },
130 }
131
132 fmt.Printf("Found %d users with role '%s'\n", len(users), role)
133 return users, nil
134}
135
136// BatchCreate creates multiple users in batch
137 func (r *UserRepository) BatchCreate(ctx context.Context, users []*User) error {
138 // In production:
139 // 1. Split into batches of 25
140 // 2. Use BatchWriteItem
141 // 3. Handle unprocessed items
142 // 4. Implement retry logic
143
144 fmt.Printf("Batch created %d users\n", len(users))
145 return nil
146}
147
148// TransactionalUpdate performs transaction across multiple items
149 func (r *UserRepository) TransactionalUpdate(ctx context.Context, fromUserID, toUserID string, amount int) error {
150 // In production:
151 // Use TransactWriteItems to:
152 // 1. Decrement balance from source user
153 // 2. Increment balance for destination user
154 // 3. All-or-nothing guarantee
155
156 fmt.Printf("Transaction: Transfer %d from %s to %s\n", amount, fromUserID, toUserID)
157 return nil
158}
159
160func main() {
161 ctx := context.Background()
162 repo := NewUserRepository("users")
163
164 // Create user
165 newUser := &User{
166 ID: "user-123",
167 Email: "john@example.com",
168 Name: "John Doe",
169 Role: "admin",
170 Active: true,
171 }
172 repo.Create(ctx, newUser)
173
174 // Get user
175 user, _ := repo.Get(ctx, "user-123")
176 fmt.Printf("User: %+v\n", user)
177
178 // Update user
179 user.Name = "John Smith"
180 repo.Update(ctx, user)
181
182 // Update specific field
183 repo.UpdateEmail(ctx, "user-123", "john.smith@example.com")
184
185 // Query by role
186 admins, _ := repo.QueryByRole(ctx, "admin")
187 fmt.Printf("Found %d admins\n", len(admins))
188
189 // Batch create
190 users := []*User{
191 {ID: "user-201", Email: "user1@example.com", Name: "User 1", Role: "user"},
192 {ID: "user-202", Email: "user2@example.com", Name: "User 2", Role: "user"},
193 {ID: "user-203", Email: "user3@example.com", Name: "User 3", Role: "user"},
194 }
195 repo.BatchCreate(ctx, users)
196
197 // Transactional update
198 repo.TransactionalUpdate(ctx, "user-123", "user-456", 100)
199
200 // Soft delete
201 repo.Delete(ctx, "user-123")
202}
Explanation: This exercise demonstrates complete DynamoDB CRUD operations with production patterns. It includes conditional writes for data integrity, optimistic locking using version numbers, expression builders for flexible updates, querying with GSI, batch operations for efficiency, and transactions for atomic multi-item updates. These patterns ensure data consistency and optimal performance.
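The optimistic-locking update described here maps to a conditional UpdateItem. A sketch using the SDK's expression builder, assuming a table keyed by id with email, updated_at, and version attributes as in the User struct above (the table and function names are illustrative):
package userrepo

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// updateEmailWithLock bumps version only if nobody else changed the item first.
func updateEmailWithLock(ctx context.Context, client *dynamodb.Client, table, id, newEmail string, currentVersion int) error {
	update := expression.
		Set(expression.Name("email"), expression.Value(newEmail)).
		Set(expression.Name("updated_at"), expression.Value(time.Now().Unix())).
		Add(expression.Name("version"), expression.Value(1))
	cond := expression.Name("version").Equal(expression.Value(currentVersion))

	expr, err := expression.NewBuilder().WithUpdate(update).WithCondition(cond).Build()
	if err != nil {
		return err
	}

	_, err = client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
		TableName:                 aws.String(table),
		Key:                       map[string]types.AttributeValue{"id": &types.AttributeValueMemberS{Value: id}},
		UpdateExpression:          expr.Update(),
		ConditionExpression:       expr.Condition(),
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
	})

	// A condition failure means another writer won the race: reload and retry.
	var ccf *types.ConditionalCheckFailedException
	if errors.As(err, &ccf) {
		return fmt.Errorf("version conflict: reload user %s and retry", id)
	}
	return err
}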
Exercise 5: SQS Message Processor
Learning Objective: Master enterprise message processing patterns using SQS including dead letter queues, message batching, and visibility timeout management. This exercise teaches you to build robust, fault-tolerant message processing systems that can handle high-throughput workloads while preventing message loss.
Real-World Context: Message queues are the backbone of microservices architectures at companies like Amazon, Uber, and Netflix. SQS processes billions of messages daily for critical workflows like order processing, inventory updates, and user notifications. Understanding how to implement reliable message processing with proper error handling, retry logic, and monitoring is essential for building distributed systems that can scale and maintain data consistency.
Difficulty: Advanced | Time Estimate: 65-80 minutes
Objective: Build a reliable SQS message processor with dead letter queues, batching, and visibility timeout management.
Solution with Explanation
1// run
2package main
3
4import (
5 "context"
6 "encoding/json"
7 "fmt"
8 "log"
9 "sync"
10 "time"
11)
12
13// Message represents an SQS message
14type Message struct {
15 ID string `json:"id"`
16 Type string `json:"type"`
17 Body map[string]interface{} `json:"body"`
18 ReceiptHandle string `json:"-"` // Not serialized
19 RetryCount int `json:"retry_count"`
20}
21
22// MessageProcessor processes SQS messages
23type MessageProcessor struct {
24 queueURL string
25 dlqURL string
26 maxRetries int
27 workers int
28 // client *sqs.Client // Simulated for demonstration
29}
30
31func NewMessageProcessor(queueURL, dlqURL string, workers int) *MessageProcessor {
32 return &MessageProcessor{
33 queueURL: queueURL,
34 dlqURL: dlqURL,
35 maxRetries: 3,
36 workers: workers,
37 }
38}
39
40// Start starts message processing with multiple workers
41 func (mp *MessageProcessor) Start(ctx context.Context) error {
42 var wg sync.WaitGroup
43
44 for i := 0; i < mp.workers; i++ {
45 wg.Add(1)
46 go func(workerID int) {
47 defer wg.Done()
48 mp.worker(ctx, workerID)
49 }(i)
50 }
51
52 wg.Wait()
53 return nil
54}
55
56// worker processes messages in a loop
57 func (mp *MessageProcessor) worker(ctx context.Context, workerID int) {
58 fmt.Printf("Worker %d started\n", workerID)
59
60 for {
61 select {
62 case <-ctx.Done():
63 fmt.Printf("Worker %d stopping\n", workerID)
64 return
65 default:
66 // In production:
67 // Use ReceiveMessage with:
68 // - MaxNumberOfMessages: 10
69 // - WaitTimeSeconds: 20
70 // - VisibilityTimeout: 30
71 messages := mp.receiveMessages(ctx, 10)
72
73 for _, msg := range messages {
74 mp.processMessage(ctx, workerID, msg)
75 }
76 }
77 }
78}
79
80// receiveMessages simulates receiving messages from SQS
81 func (mp *MessageProcessor) receiveMessages(ctx context.Context, maxMessages int) []*Message {
82 // Simulated messages
83 time.Sleep(100 * time.Millisecond) // Simulate long polling
84
85 return []*Message{
86 {
87 ID: "msg-001",
88 Type: "order.created",
89 Body: map[string]interface{}{"order_id": "12345", "amount": 99.99},
90 ReceiptHandle: "receipt-001",
91 RetryCount: 0,
92 },
93 }
94}
95
96// processMessage processes a single message
97 func (mp *MessageProcessor) processMessage(ctx context.Context, workerID int, msg *Message) {
98 fmt.Printf("Worker %d processing message: %s (%s)\n", workerID, msg.ID, msg.Type)
99
100 // Extend visibility timeout for long-running tasks
101 go mp.extendVisibilityTimeout(ctx, msg)
102
103 // Process message
104 err := mp.handleMessage(ctx, msg)
105 if err != nil {
106 fmt.Printf("Worker %d: Processing failed: %v\n", workerID, err)
107 mp.handleFailure(ctx, msg, err)
108 return
109 }
110
111 // Delete message after successful processing
112 mp.deleteMessage(ctx, msg)
113 fmt.Printf("Worker %d: Message %s processed successfully\n", workerID, msg.ID)
114}
115
116// handleMessage handles message based on type
117 func (mp *MessageProcessor) handleMessage(ctx context.Context, msg *Message) error {
118 switch msg.Type {
119 case "order.created":
120 return mp.processOrder(ctx, msg.Body)
121 case "user.registered":
122 return mp.processUserRegistration(ctx, msg.Body)
123 case "payment.completed":
124 return mp.processPayment(ctx, msg.Body)
125 default:
126 return fmt.Errorf("unknown message type: %s", msg.Type)
127 }
128}
129
130func processOrder(ctx context.Context, body map[string]interface{}) error {
131 orderID := body["order_id"]
132 fmt.Printf("Processing order: %v\n", orderID)
133 time.Sleep(50 * time.Millisecond) // Simulate processing
134 return nil
135}
136
137func processUserRegistration(ctx context.Context, body map[string]interface{}) error {
138 fmt.Println("Processing user registration")
139 return nil
140}
141
142func processPayment(ctx context.Context, body map[string]interface{}) error {
143 fmt.Println("Processing payment")
144 return nil
145}
146
147// extendVisibilityTimeout extends message visibility for long tasks
148func extendVisibilityTimeout(ctx context.Context, msg *Message) {
149 ticker := time.NewTicker(20 * time.Second)
150 defer ticker.Stop()
151
152 for {
153 select {
154 case <-ctx.Done():
155 return
156 case <-ticker.C:
157 // In production:
158 // Use ChangeMessageVisibility to extend timeout
159 fmt.Printf("Extending visibility timeout for message %s\n", msg.ID)
160 }
161 }
162}
163
164// handleFailure handles message processing failure
165func handleFailure(ctx context.Context, msg *Message, err error) {
166 msg.RetryCount++
167
168 if msg.RetryCount >= mp.maxRetries {
169 // Send to dead letter queue
170 fmt.Printf("Max retries exceeded for message %s, sending to DLQ\n", msg.ID)
171 mp.sendToDLQ(ctx, msg, err)
172 mp.deleteMessage(ctx, msg)
173 } else {
174 // Return to queue with delay
175 fmt.Printf("Retry %d/%d for message %s\n", msg.RetryCount, mp.maxRetries, msg.ID)
176 // Message will become visible again after visibility timeout expires
177 }
178}
179
180// sendToDLQ sends failed message to dead letter queue
181func sendToDLQ(ctx context.Context, msg *Message, err error) {
182 // In production:
183 // 1. Add failure metadata to message attributes
184 // 2. Send to DLQ using SendMessage
185 // 3. Include original error and retry count
186
187 dlqMessage := map[string]interface{}{
188 "original_message": msg,
189 "error": err.Error(),
190 "failed_at": time.Now(),
191 "retry_count": msg.RetryCount,
192 }
193
194 data, _ := json.MarshalIndent(dlqMessage, "", " ")
195 fmt.Printf("DLQ Message:\n%s\n", string(data))
196}
197
198// deleteMessage deletes processed message
199func deleteMessage(ctx context.Context, msg *Message) {
200 // In production:
201 // Use DeleteMessage with ReceiptHandle
202 fmt.Printf("Deleted message: %s\n", msg.ID)
203}
204
205// BatchDelete deletes multiple messages efficiently
206func BatchDelete(ctx context.Context, messages []*Message) error {
207 // In production:
208 // Use DeleteMessageBatch
209 fmt.Printf("Batch deleted %d messages\n", len(messages))
210 return nil
211}
212
213func main() {
214 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
215 defer cancel()
216
217 processor := NewMessageProcessor(
218 "https://sqs.us-east-1.amazonaws.com/123456/my-queue",
219 "https://sqs.us-east-1.amazonaws.com/123456/my-dlq",
220 3, // 3 workers
221 )
222
223 fmt.Println("Starting SQS message processor...")
224 if err := processor.Start(ctx); err != nil {
225 log.Fatal(err)
226 }
227
228 fmt.Println("Message processor stopped")
229}
Explanation: This exercise demonstrates production-ready SQS message processing: multiple workers for parallel consumption, long polling for efficiency, visibility timeout extension for long-running tasks, retry logic with a configurable limit, a dead letter queue for messages that keep failing, and batch operations for throughput. Key patterns include graceful shutdown, error handling, and message lifecycle management.
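The receive, delete, and visibility calls above are simulated. As a rough guide only, here is a minimal sketch of what the equivalent calls look like with the aws-sdk-go-v2 sqs package; the queue URL is a placeholder, error handling is reduced to logging, and the processing step is just a print.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := sqs.NewFromConfig(cfg)
	queueURL := "https://sqs.us-east-1.amazonaws.com/123456/my-queue" // placeholder

	// Long-poll for up to 10 messages; anything not deleted becomes
	// visible again once the visibility timeout expires.
	out, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(queueURL),
		MaxNumberOfMessages: 10,
		WaitTimeSeconds:     20,
		VisibilityTimeout:   30,
	})
	if err != nil {
		log.Fatal(err)
	}

	for _, m := range out.Messages {
		// Buy extra time for a long-running message.
		if _, err := client.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
			QueueUrl:          aws.String(queueURL),
			ReceiptHandle:     m.ReceiptHandle,
			VisibilityTimeout: 60,
		}); err != nil {
			log.Printf("visibility change failed: %v", err)
		}

		fmt.Println("processing:", aws.ToString(m.Body))

		// Delete only after processing succeeds.
		if _, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
			QueueUrl:      aws.String(queueURL),
			ReceiptHandle: m.ReceiptHandle,
		}); err != nil {
			log.Printf("delete failed: %v", err)
		}
	}
}

Deleting only after successful processing is what makes the visibility timeout act as an implicit retry mechanism: a message that was received but never deleted simply reappears for another worker.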
Summary
Cloud SDK integration enables Go applications to leverage managed cloud services effectively.
Key Takeaways
- SDK Selection: AWS SDK v2 is modular, Google Cloud is idiomatic, Azure's Track 2 SDKs build on azidentity and azcore
- Authentication: Use credential chains for flexibility
- Error Handling: Implement retries, circuit breakers, and proper error classification
- Abstraction: Create cloud-agnostic interfaces for portability (see the sketch after this list)
- Performance: Reuse clients, implement connection pooling, use batch operations
- Monitoring: Instrument SDK calls with metrics and distributed tracing
- Cost Optimization: Cache responses, batch operations, use appropriate storage classes
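To make the abstraction takeaway concrete, here is a minimal sketch of a cloud-agnostic object-store interface with an S3-backed adapter. ObjectStore, s3Store, and their method set are illustrative names invented for this sketch, not an established API; a GCS or Azure Blob adapter would implement the same interface, so application code never imports a provider SDK directly.

package store

import (
	"bytes"
	"context"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// ObjectStore is the small, provider-neutral interface the rest of
// the application depends on.
type ObjectStore interface {
	Put(ctx context.Context, key string, data []byte) error
	Get(ctx context.Context, key string) ([]byte, error)
}

// s3Store adapts the AWS SDK to ObjectStore.
type s3Store struct {
	client *s3.Client
	bucket string
}

// Compile-time check that the adapter satisfies the interface.
var _ ObjectStore = (*s3Store)(nil)

func (s *s3Store) Put(ctx context.Context, key string, data []byte) error {
	_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
		Body:   bytes.NewReader(data),
	})
	return err
}

func (s *s3Store) Get(ctx context.Context, key string) ([]byte, error) {
	out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, err
	}
	defer out.Body.Close()
	return io.ReadAll(out.Body)
}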
Best Practices
- Always use context for cancellation and timeouts
- Reuse SDK clients across requests
- Implement exponential backoff for retries (see the retryer sketch after this list)
- Use credential chains instead of hardcoded credentials
- Monitor SDK call metrics for performance and cost
- Implement graceful degradation with fallbacks
- Test with cloud emulators
- Use batch operations to reduce API calls and costs
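For the exponential backoff bullet above, here is a minimal sketch of tuning the AWS SDK v2 standard retryer, which already applies exponential backoff with jitter; the attempt count and backoff ceiling are illustrative values, not recommendations.

package main

import (
	"context"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/aws/retry"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
	ctx := context.Background()

	// Wrap the standard retryer with a higher attempt limit and a
	// backoff ceiling; these values are examples, not defaults.
	cfg, err := config.LoadDefaultConfig(ctx,
		config.WithRetryer(func() aws.Retryer {
			return retry.AddWithMaxBackoffDelay(
				retry.AddWithMaxAttempts(retry.NewStandard(), 5),
				30*time.Second,
			)
		}),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Create the client once and reuse it across requests.
	_ = sqs.NewFromConfig(cfg)
}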