Consider running a restaurant kitchen. Instead of hiring full-time chefs who sit around when there are no customers, you hire freelance chefs who appear instantly when orders come in, cook the food, and disappear when done. You only pay for the actual cooking time! This is exactly how serverless computing works - your code appears on demand, executes, and you only pay for the exact execution time.
Serverless computing allows you to run code without managing servers. This article covers deploying Go applications as serverless functions on AWS Lambda, Knative, and other platforms with production-ready patterns.
Introduction to Serverless
Serverless computing provides automatic scaling, pay-per-use pricing, and zero server management. It represents the highest level of abstraction in cloud-native computing - you write code, the platform handles everything else.
💡 Key Takeaway: Serverless isn't actually "no servers" - it's "no server management." The cloud provider handles all infrastructure, patching, scaling, and availability. You focus purely on business logic.
Why Serverless in Cloud-Native
Traditional Cloud-Native:
You manage: Container images, deployments, replicas, resource limits, autoscaling policies
Platform manages: Hardware, networking, orchestration
Serverless:
You manage: Just your code
Platform manages: Everything else - scaling, availability, infrastructure, runtime
When Serverless Makes Sense:
- Event-driven workloads: Respond to S3 uploads, database changes, API requests
- Variable load: Traffic spikes from 0 to 1000 requests/sec unpredictably
- Cost optimization: Workloads that run infrequently
- Fast iteration: Ship features without infrastructure complexity
Real-World Example: A photo processing service that generates thumbnails when users upload images. With serverless, you pay nothing when no photos are being uploaded, but can instantly handle thousands of concurrent uploads during peak hours.
Cloud-Native Serverless Benefits:
- Automatic scaling from zero: Scale down to zero during idle periods, scale up to handle millions of requests
- Built-in fault tolerance: Platform automatically retries, handles failures, distributes across AZs
- Focus on business logic: No Dockerfiles, Kubernetes manifests, or infrastructure code needed
- Regional availability: Deploy globally with a single command
⚠️ Important: While serverless scales automatically, you still need to design for concurrency. Shared resources like databases must handle concurrent access from multiple function instances.
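One common guard inside a single function instance is a counting semaphore that caps how many goroutines hit a shared resource at once. A minimal stdlib sketch (the limit of 5 and the simulated write are illustrative, not prescribed by any platform):

```go
package main

import (
	"fmt"
	"sync"
)

// maxConcurrent caps simultaneous access to the shared resource
// (e.g., a database with a small connection budget).
const maxConcurrent = 5

func processAll(items []string) int {
	sem := make(chan struct{}, maxConcurrent) // counting semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0

	for _, item := range items {
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it

			// simulate a write for item "it"
			mu.Lock()
			processed++
			mu.Unlock()
		}(item)
	}

	wg.Wait()
	return processed
}

func main() {
	items := make([]string, 20)
	for i := range items {
		items[i] = fmt.Sprintf("item-%d", i)
	}
	fmt.Println(processAll(items))
}
```

The same idea applies across instances: set the function's reserved concurrency (or the database's connection limit) so the aggregate load stays bounded.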
Serverless Benefits
Cost Efficiency:
- Pay only for execution time
- No idle server costs - $0 when not running
- Automatic scaling - no over-provisioning needed
- Example: API that runs 1 hour/day costs ~$0.20/month vs ~$30/month for smallest container
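The ~$0.20 figure falls out of Lambda's pay-per-duration model. A quick sanity check in Go, assuming the published x86 price of roughly $0.0000166667 per GB-second and ignoring per-request charges:

```go
package main

import "fmt"

// monthlyCost estimates Lambda compute cost for a function that is
// busy for secondsPerDay at the given memory allocation.
func monthlyCost(memoryGB, secondsPerDay, days float64) float64 {
	const pricePerGBSecond = 0.0000166667 // assumed current x86 price
	gbSeconds := memoryGB * secondsPerDay * days
	return gbSeconds * pricePerGBSecond
}

func main() {
	// 128 MB function busy 1 hour/day over a 30-day month:
	// 0.125 GB * 3600 s * 30 = 13,500 GB-seconds
	cost := monthlyCost(0.125, 3600, 30)
	fmt.Printf("~$%.2f/month\n", cost)
}
```

In practice the monthly free tier (400,000 GB-seconds) would cover this workload entirely, so the real bill could be $0.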
Operational Simplicity:
- No server management - no SSH, no patching, no capacity planning
- Automatic patching - runtime updates handled by platform
- Built-in high availability - runs across multiple AZs automatically
- No Dockerfiles or Kubernetes manifests to maintain
Rapid Development:
- Focus on business logic - write pure Go functions
- Fast deployment - seconds from code to production
- Easy integration - native integrations with cloud services
- Instant rollback - deploy new versions instantly, rollback just as fast
Go for Serverless
Think of Go as a high-performance race car for serverless functions. While other languages are like family sedans that take time to warm up, Go starts instantly and runs efficiently at high speeds.
// Why Go is ideal for serverless

/*
Advantages:
1. Fast cold starts
2. Small binary size
3. Low memory footprint
4. Built-in concurrency
5. Native compilation
6. Strong typing

Challenges:
1. Larger binaries than interpreted languages
2. No dynamic loading
3. Must compile for Linux AMD64/ARM64
*/
Real-World Performance: A Go Lambda function processing images typically completes in 200-500ms with 128MB memory, while a Python equivalent might need 512MB and take 1-2 seconds for the same task.
Serverless Platforms
Choosing a serverless platform is like choosing a delivery service for your packages. Each has different strengths, coverage areas, and pricing models.
// Platform comparison

/*
AWS Lambda:
- Most mature
- Broad service integration
- Multiple runtime options
- Complex pricing

Google Cloud Functions:
- Simple interface
- Good for GCP integration
- Limited runtime options

Azure Functions:
- Microsoft ecosystem
- Good Windows support
- Hybrid cloud scenarios

Knative:
- Kubernetes-native
- Portable across clouds
- More control
- Self-hosted option

OpenFaaS:
- Self-hosted
- Docker-based
- Kubernetes/Docker Swarm
- Full control
*/
When to use AWS Lambda vs Knative:
- AWS Lambda: Best for AWS-centric workloads, rapid prototyping, and when you want zero infrastructure management
- Knative: Best for multi-cloud strategies, custom runtime requirements, and when you need more control over the execution environment
AWS Lambda with Go
AWS Lambda is the most popular serverless platform, and Go's performance characteristics make it an excellent match for Lambda's execution model.
Basic Lambda Function
Let's start with a simple "Hello World" Lambda function. This is the serverless equivalent of a traditional web API endpoint - it receives a request, processes it, and returns a response.
// main.go
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-lambda-go/lambda"
)

type Request struct {
	Name string `json:"name"`
}

type Response struct {
	Message string `json:"message"`
}

func handler(ctx context.Context, req Request) (Response, error) {
	message := fmt.Sprintf("Hello, %s!", req.Name)
	return Response{Message: message}, nil
}

func main() {
	lambda.Start(handler)
}
Building and Deploying:
# Build for Linux
GOOS=linux GOARCH=amd64 go build -o bootstrap main.go

# Create deployment package
zip function.zip bootstrap

# Deploy with AWS CLI
aws lambda create-function \
  --function-name hello-go \
  --runtime provided.al2 \
  --handler bootstrap \
  --zip-file fileb://function.zip \
  --role arn:aws:iam::ACCOUNT_ID:role/lambda-role
💡 Key Takeaway: The bootstrap filename is required for Go Lambda functions using the provided.al2 runtime. AWS looks for this executable to run when your function is invoked.
Lambda with Dependencies
Real-world Lambda functions almost always need to interact with other AWS services. Here's where Go's concurrency and performance really shine compared to interpreted languages.
package main

import (
	"context"
	"errors"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)

type User struct {
	ID    string `json:"id"`
	Name  string `json:"name"`
	Email string `json:"email"`
}

var (
	sess *session.Session
	db   *dynamodb.DynamoDB
)

func init() {
	// Initialize AWS session once per cold start
	sess = session.Must(session.NewSession())
	db = dynamodb.New(sess)
}

func handler(ctx context.Context, event map[string]interface{}) (User, error) {
	userID, ok := event["userId"].(string)
	if !ok {
		return User{}, errors.New("userId is required")
	}

	// Query DynamoDB
	result, err := db.GetItemWithContext(ctx, &dynamodb.GetItemInput{
		TableName: aws.String("Users"),
		Key: map[string]*dynamodb.AttributeValue{
			"ID": {
				S: aws.String(userID),
			},
		},
	})
	if err != nil {
		return User{}, err
	}

	// Parse result
	var user User
	err = dynamodbattribute.UnmarshalMap(result.Item, &user)
	return user, err
}

func main() {
	lambda.Start(handler)
}
⚠️ Important: Always initialize external clients in the init() function or as global variables. This connection reuse across invocations is crucial for performance - initializing clients on every request would make your function much slower and more expensive.
Error Handling
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/aws/aws-lambda-go/lambda"
)

var ErrNotFound = errors.New("not found")

type CustomError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

func (e *CustomError) Error() string {
	return fmt.Sprintf("%s: %s", e.Code, e.Message)
}

func processEmail(email string) error {
	// Business logic (omitted)
	return nil
}

func handler(ctx context.Context, event map[string]interface{}) (interface{}, error) {
	// Validate input
	email, ok := event["email"].(string)
	if !ok || email == "" {
		return nil, &CustomError{
			Code:    "INVALID_INPUT",
			Message: "email is required",
		}
	}

	// Business logic
	if err := processEmail(email); err != nil {
		// Return different error types
		if errors.Is(err, ErrNotFound) {
			return nil, &CustomError{
				Code:    "NOT_FOUND",
				Message: "user not found",
			}
		}
		return nil, &CustomError{
			Code:    "INTERNAL_ERROR",
			Message: err.Error(),
		}
	}

	return map[string]string{"status": "success"}, nil
}

func main() {
	lambda.Start(handler)
}
Environment Variables and Configuration
package main

import (
	"context"
	"os"

	"github.com/aws/aws-lambda-go/lambda"
)

type Config struct {
	DatabaseURL string
	APIKey      string
	Environment string
}

func loadConfig() *Config {
	return &Config{
		DatabaseURL: os.Getenv("DATABASE_URL"),
		APIKey:      os.Getenv("API_KEY"),
		Environment: os.Getenv("ENVIRONMENT"),
	}
}

var config *Config

func init() {
	config = loadConfig()
}

func handler(ctx context.Context, event map[string]interface{}) (map[string]string, error) {
	// Use configuration
	if config.Environment == "production" {
		// Production logic
	}

	return map[string]string{"env": config.Environment}, nil
}

func main() {
	lambda.Start(handler)
}
Lambda Triggers and Events
AWS Lambda supports various event sources.
API Gateway Integration
package main

import (
	"context"
	"encoding/json"
	"net/http"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type RequestBody struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
	// Parse request body
	var body RequestBody
	if err := json.Unmarshal([]byte(request.Body), &body); err != nil {
		return events.APIGatewayProxyResponse{
			StatusCode: http.StatusBadRequest,
			Body:       `{"error": "invalid request body"}`,
		}, nil
	}

	// Access path parameters
	userID := request.PathParameters["id"]

	// Access query parameters
	filter := request.QueryStringParameters["filter"]

	// Access headers
	authToken := request.Headers["Authorization"]

	// Business logic
	result := map[string]interface{}{
		"userId":  userID,
		"filter":  filter,
		"hasAuth": authToken != "",
		"body":    body,
	}

	responseBody, _ := json.Marshal(result)

	return events.APIGatewayProxyResponse{
		StatusCode: http.StatusOK,
		Headers: map[string]string{
			"Content-Type": "application/json",
		},
		Body: string(responseBody),
	}, nil
}

func main() {
	lambda.Start(handler)
}
S3 Event Handler
package main

import (
	"context"
	"fmt"
	"io"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

var s3Client *s3.S3

func init() {
	sess := session.Must(session.NewSession())
	s3Client = s3.New(sess)
}

func processFile(r io.Reader) error {
	// File processing logic (omitted)
	return nil
}

func handler(ctx context.Context, s3Event events.S3Event) error {
	for _, record := range s3Event.Records {
		bucket := record.S3.Bucket.Name
		key := record.S3.Object.Key

		fmt.Printf("Processing file: s3://%s/%s\n", bucket, key)

		// Get object
		result, err := s3Client.GetObjectWithContext(ctx, &s3.GetObjectInput{
			Bucket: aws.String(bucket),
			Key:    aws.String(key),
		})
		if err != nil {
			return err
		}
		defer result.Body.Close()

		// Process file
		if err := processFile(result.Body); err != nil {
			return err
		}

		fmt.Printf("Successfully processed: %s\n", key)
	}

	return nil
}

func main() {
	lambda.Start(handler)
}
DynamoDB Streams
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, event events.DynamoDBEvent) error {
	for _, record := range event.Records {
		fmt.Printf("Event: %s\n", record.EventName)

		switch record.EventName {
		case "INSERT":
			handleInsert(record.Change.NewImage)
		case "MODIFY":
			handleModify(record.Change.OldImage, record.Change.NewImage)
		case "REMOVE":
			handleRemove(record.Change.OldImage)
		}
	}

	return nil
}

func handleInsert(newImage map[string]events.DynamoDBAttributeValue) {
	// Process new record
	userID := newImage["ID"].String()
	fmt.Printf("New user: %s\n", userID)
}

func handleModify(oldImage, newImage map[string]events.DynamoDBAttributeValue) {
	// Process update
	fmt.Println("User updated")
}

func handleRemove(oldImage map[string]events.DynamoDBAttributeValue) {
	// Process deletion
	fmt.Println("User deleted")
}

func main() {
	lambda.Start(handler)
}
SQS Message Processing
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type Message struct {
	Type string                 `json:"type"`
	Data map[string]interface{} `json:"data"`
}

func handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	for _, record := range sqsEvent.Records {
		// Parse message
		var msg Message
		if err := json.Unmarshal([]byte(record.Body), &msg); err != nil {
			fmt.Printf("Failed to parse message: %v\n", err)
			continue
		}

		// Process based on type
		if err := processMessage(msg); err != nil {
			// Return error to retry
			return err
		}
	}

	return nil
}

func processMessage(msg Message) error {
	switch msg.Type {
	case "user_created":
		return handleUserCreated(msg.Data)
	case "user_updated":
		return handleUserUpdated(msg.Data)
	default:
		fmt.Printf("Unknown message type: %s\n", msg.Type)
	}
	return nil
}

func handleUserCreated(data map[string]interface{}) error {
	// Creation side effects (omitted)
	return nil
}

func handleUserUpdated(data map[string]interface{}) error {
	// Update side effects (omitted)
	return nil
}

func main() {
	lambda.Start(handler)
}
EventBridge Scheduled Events
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, event events.CloudWatchEvent) error {
	fmt.Printf("Scheduled execution at: %s\n", time.Now())

	// Perform scheduled task
	if err := performCleanup(); err != nil {
		return err
	}

	if err := generateReports(); err != nil {
		return err
	}

	return nil
}

func performCleanup() error {
	// Cleanup logic
	fmt.Println("Performing cleanup...")
	return nil
}

func generateReports() error {
	// Report generation
	fmt.Println("Generating reports...")
	return nil
}

func main() {
	lambda.Start(handler)
}
Cold Start Optimization
Cold starts are like waiting for a delivery truck to warm up its engine before making a delivery. The first request after idle time takes longer while AWS prepares your function. With Go, this "warm-up" time is minimal, but we can optimize it even further.
Minimizing Binary Size
Smaller binaries download faster and use less memory, reducing cold start time and cost.
# Optimize build
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
  -ldflags="-s -w" \
  -trimpath \
  -o bootstrap main.go

# Further compress with UPX
upx --best --lzma bootstrap

# Result: 30-50% size reduction
Real-World Impact: A 15MB binary vs 30MB can reduce cold start time by ~200ms and monthly costs by $5-10 for high-traffic functions.
💡 Key Takeaway: Use CGO_ENABLED=0 to disable CGO and create a static binary. This eliminates C library dependencies and ensures portability across Lambda environments.
Global Variable Initialization
Think of global variables like pre-loading your delivery truck with all the tools you'll need before starting deliveries. This avoids wasting time loading tools for each delivery.
package main

import (
	"context"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

var (
	// Initialize once, reuse across invocations
	sess *session.Session
	db   *dynamodb.DynamoDB
)

func init() {
	// This runs once per cold start
	sess = session.Must(session.NewSession())
	db = dynamodb.New(sess)
}

func handler(ctx context.Context, event map[string]interface{}) error {
	// Reuse initialized clients
	// No cold start penalty on warm invocations
	_, err := db.GetItemWithContext(ctx, &dynamodb.GetItemInput{
		// ...
	})
	return err
}

func main() {
	lambda.Start(handler)
}
⚠️ Important: Don't initialize everything in init(). Only initialize what you know will be used in most invocations. For rarely-used clients, consider lazy initialization to keep cold starts fast.
Connection Pooling
package main

import (
	"context"
	"database/sql"
	"os"
	"time"

	"github.com/aws/aws-lambda-go/lambda"
	_ "github.com/lib/pq"
)

var db *sql.DB

func init() {
	var err error
	db, err = sql.Open("postgres", os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}

	// Configure connection pool for Lambda
	db.SetMaxOpenConns(5) // Low for serverless
	db.SetMaxIdleConns(2)
	db.SetConnMaxLifetime(5 * time.Minute)
}

func handler(ctx context.Context, event map[string]interface{}) error {
	// Use pooled connection
	rows, err := db.QueryContext(ctx, "SELECT * FROM users")
	if err != nil {
		return err
	}
	defer rows.Close()
	return rows.Err()
}

func main() {
	lambda.Start(handler)
}
Provisioned Concurrency
Provisioned Concurrency is like keeping a fleet of delivery trucks always running and ready, instead of starting them only when orders arrive. No waiting, but higher fuel costs.
# CloudFormation template (excerpt)
Resources:
  MyFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: my-function
      Runtime: provided.al2
      Handler: bootstrap
      Code:
        S3Bucket: my-bucket
        S3Key: function.zip

  MyFunctionVersion:
    Type: AWS::Lambda::Version
    Properties:
      FunctionName: !Ref MyFunction

  MyFunctionAlias:
    Type: AWS::Lambda::Alias
    Properties:
      FunctionName: !Ref MyFunction
      # Provisioned concurrency requires a published version, not $LATEST
      FunctionVersion: !GetAtt MyFunctionVersion.Version
      Name: live
      ProvisionedConcurrencyConfig:
        ProvisionedConcurrentExecutions: 5  # Always warm

# Provisioned Concurrency:
# - Keeps instances warm
# - No cold starts
# - Costs more
# - Use for latency-sensitive APIs
Real-World Example: An e-commerce checkout API used provisioned concurrency with 10 instances to handle payment processing. This eliminated cold starts during peak shopping seasons, reducing checkout latency from 800ms to 200ms, but increased monthly costs by ~$200.
When to use Provisioned Concurrency:
- APIs with strict latency requirements
- Functions with unpredictable traffic spikes
- Critical business functions where performance outweighs cost
- Functions with high initialization overhead
When to avoid:
- Background jobs and batch processing
- Functions with consistent, predictable traffic
- Cost-sensitive applications
- Development and testing environments
Lazy Initialization
package main

import (
	"context"
	"sync"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

var (
	s3Client *s3.S3
	once     sync.Once
)

func getS3Client() *s3.S3 {
	once.Do(func() {
		sess := session.Must(session.NewSession())
		s3Client = s3.New(sess)
	})
	return s3Client
}

func handler(ctx context.Context, event map[string]interface{}) error {
	// Initialize only when needed
	// Reduces cold start if not all code paths need S3
	client := getS3Client()

	_, err := client.ListBuckets(&s3.ListBucketsInput{})
	return err
}

func main() {
	lambda.Start(handler)
}
Knative Functions
Knative is a Kubernetes-based serverless platform that brings request-driven autoscaling - including scale-to-zero - to any Kubernetes cluster.
Knative Function
// main.go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func handler(w http.ResponseWriter, r *http.Request) {
	name := r.URL.Query().Get("name")
	if name == "" {
		name = "World"
	}

	fmt.Fprintf(w, "Hello, %s!\n", name)
}

func main() {
	http.HandleFunc("/", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
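Because Knative scales pods down (eventually to zero), Kubernetes sends SIGTERM before removing an instance. A graceful-shutdown variant of the service above - the short demo timeout is only there so this sketch exits on its own when run outside a cluster:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// newServer builds the HTTP server separately so it can be shut down cleanly.
func newServer(port string) *http.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello from Knative!")
	})
	return &http.Server{Addr: ":" + port, Handler: mux}
}

func main() {
	srv := newServer("8080")
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Println(err)
		}
	}()

	// Kubernetes sends SIGTERM before scaling the pod down
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGTERM, os.Interrupt)
	select {
	case <-stop:
	case <-time.After(200 * time.Millisecond): // demo only: exit without a signal
	}

	// Finish in-flight requests before the process exits
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Printf("shutdown error: %v", err)
	}
}
```

In a real deployment the `time.After` branch would be removed; the server simply blocks until the signal arrives.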
Deployment:
# service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: hello-go
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: "0"
        autoscaling.knative.dev/maxScale: "10"
    spec:
      containers:
      - image: myregistry/hello-go:latest
        ports:
        - containerPort: 8080
        env:
        - name: TARGET
          value: "Go"
# Deploy to Knative
kubectl apply -f service.yaml

# Get URL
kubectl get ksvc hello-go
Knative with CloudEvents
package main

import (
	"context"
	"fmt"
	"log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

type UserCreated struct {
	UserID string `json:"userId"`
	Email  string `json:"email"`
}

func processUser(data UserCreated) error {
	// Business logic (omitted)
	return nil
}

func receiver(ctx context.Context, event cloudevents.Event) error {
	fmt.Printf("Received event: %s\n", event.Type())

	var data UserCreated
	if err := event.DataAs(&data); err != nil {
		return err
	}

	fmt.Printf("User created: %s (%s)\n", data.UserID, data.Email)

	// Process event
	return processUser(data)
}

func main() {
	c, err := cloudevents.NewClientHTTP()
	if err != nil {
		log.Fatal(err)
	}

	if err := c.StartReceiver(context.Background(), receiver); err != nil {
		log.Fatal(err)
	}
}
Knative Autoscaling
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: autoscale-go
spec:
  template:
    metadata:
      annotations:
        # Scale to zero after 30s
        autoscaling.knative.dev/scale-to-zero-pod-retention-period: "30s"
        # Target 100 concurrent requests per pod
        autoscaling.knative.dev/target: "100"
        # Minimum instances
        autoscaling.knative.dev/minScale: "1"
        # Maximum instances
        autoscaling.knative.dev/maxScale: "100"
        # Scale down gradually
        autoscaling.knative.dev/scale-down-delay: "15m"
    spec:
      containers:
      - image: myregistry/autoscale-go:latest
        resources:
          limits:
            cpu: "1000m"
            memory: "256Mi"
          requests:
            cpu: "100m"
            memory: "64Mi"
OpenFaaS Deployment
OpenFaaS is a self-hosted serverless framework.
OpenFaaS Function
// handler.go
package function

import (
	"encoding/json"
	"fmt"
)

type Request struct {
	Name string `json:"name"`
}

type Response struct {
	Message string `json:"message"`
}

func Handle(req []byte) ([]byte, error) {
	var request Request
	if err := json.Unmarshal(req, &request); err != nil {
		return nil, err
	}

	response := Response{
		Message: fmt.Sprintf("Hello, %s!", request.Name),
	}

	return json.Marshal(response)
}
Function Configuration:
# stack.yml
provider:
  name: openfaas
  gateway: http://localhost:8080

functions:
  hello-go:
    lang: go
    handler: ./hello-go
    image: myregistry/hello-go:latest
    environment:
      write_debug: true
    limits:
      memory: "128Mi"
      cpu: "0.5"
    requests:
      memory: "64Mi"
      cpu: "0.1"
    labels:
      com.openfaas.scale.min: "1"
      com.openfaas.scale.max: "10"
Deployment:
# Build
faas-cli build -f stack.yml

# Push to registry
faas-cli push -f stack.yml

# Deploy
faas-cli deploy -f stack.yml

# Invoke
echo '{"name":"OpenFaaS"}' | faas-cli invoke hello-go
OpenFaaS with Secrets
package function

import (
	"os"
)

func Handle(req []byte) ([]byte, error) {
	// Read secret from mounted file
	apiKey, err := os.ReadFile("/var/openfaas/secrets/api-key")
	if err != nil {
		return nil, err
	}

	// Use secret
	_ = apiKey

	return []byte("success"), nil
}
Azure and Google Cloud Functions
Azure Functions
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

type Request struct {
	Name string `json:"name"`
}

type Response struct {
	Message string `json:"message"`
}

func handler(w http.ResponseWriter, r *http.Request) {
	var req Request
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	resp := Response{
		Message: fmt.Sprintf("Hello, %s!", req.Name),
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(resp)
}

func main() {
	// Azure Functions custom handlers pass the port via this variable
	port := os.Getenv("FUNCTIONS_CUSTOMHANDLER_PORT")
	if port == "" {
		port = "8080"
	}

	http.HandleFunc("/api/hello", handler)
	log.Fatal(http.ListenAndServe(":"+port, nil))
}
Google Cloud Functions
package hello

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type Request struct {
	Name string `json:"name"`
}

// HelloHTTP is an HTTP Cloud Function
func HelloHTTP(w http.ResponseWriter, r *http.Request) {
	var req Request
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request", http.StatusBadRequest)
		return
	}

	fmt.Fprintf(w, "Hello, %s!", req.Name)
}
Deployment:
gcloud functions deploy hello-go \
  --runtime go121 \
  --trigger-http \
  --allow-unauthenticated \
  --entry-point HelloHTTP
Production Patterns
Function Composition
Function composition is like an assembly line where each worker performs a specific task and passes the result to the next worker. This creates modular, reusable serverless workflows.
package main

import (
	"context"
	"encoding/json"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	awslambda "github.com/aws/aws-sdk-go/service/lambda"
)

var lambdaClient *awslambda.Lambda

func init() {
	sess := session.Must(session.NewSession())
	lambdaClient = awslambda.New(sess)
}

func handler(ctx context.Context, event map[string]interface{}) (interface{}, error) {
	// Step 1: Validate
	validatePayload, _ := json.Marshal(event)
	validateResp, err := lambdaClient.InvokeWithContext(ctx, &awslambda.InvokeInput{
		FunctionName: aws.String("validate-function"),
		Payload:      validatePayload,
	})
	if err != nil {
		return nil, err
	}

	// Step 2: Process
	processResp, err := lambdaClient.InvokeWithContext(ctx, &awslambda.InvokeInput{
		FunctionName: aws.String("process-function"),
		Payload:      validateResp.Payload,
	})
	if err != nil {
		return nil, err
	}

	// Step 3: Store
	storeResp, err := lambdaClient.InvokeWithContext(ctx, &awslambda.InvokeInput{
		FunctionName: aws.String("store-function"),
		Payload:      processResp.Payload,
	})
	if err != nil {
		return nil, err
	}

	var result interface{}
	if err := json.Unmarshal(storeResp.Payload, &result); err != nil {
		return nil, err
	}

	return result, nil
}

func main() {
	lambda.Start(handler)
}
Common Pitfalls in Function Composition:
- Chained failures: If one function fails, the entire chain fails
- Payload size limits: Lambda responses have 6MB limits
- Cold start accumulation: Each function in the chain can have cold starts
- Error handling complexity: Need to handle failures at each step
Alternative: Consider using AWS Step Functions for complex workflows. They provide built-in error handling, retries, and visual workflow monitoring.
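To make the Step Functions alternative concrete, here is a sketch that renders the same validate → process → store chain as an Amazon States Language definition (the function ARNs are hypothetical placeholders):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stateMachineJSON renders a minimal Amazon States Language definition
// chaining the three functions from the composition example.
func stateMachineJSON() string {
	task := func(arn string) map[string]interface{} {
		return map[string]interface{}{
			"Type":     "Task",
			"Resource": arn,
			// Step Functions handles transient-failure retries for us
			"Retry": []map[string]interface{}{{
				"ErrorEquals":     []string{"States.TaskFailed"},
				"IntervalSeconds": 1,
				"MaxAttempts":     3,
				"BackoffRate":     2.0,
			}},
		}
	}

	validate := task("arn:aws:lambda:us-east-1:123456789012:function:validate-function")
	validate["Next"] = "Process"
	process := task("arn:aws:lambda:us-east-1:123456789012:function:process-function")
	process["Next"] = "Store"
	store := task("arn:aws:lambda:us-east-1:123456789012:function:store-function")
	store["End"] = true

	def := map[string]interface{}{
		"Comment": "validate -> process -> store",
		"StartAt": "Validate",
		"States": map[string]interface{}{
			"Validate": validate,
			"Process":  process,
			"Store":    store,
		},
	}

	out, err := json.MarshalIndent(def, "", "  ")
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	fmt.Println(stateMachineJSON())
}
```

Each Task state's retry policy replaces the hand-rolled error handling that the orchestrator Lambda would otherwise need.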
Retry and Error Handling
package main

import (
	"context"
	"errors"
	"time"

	"github.com/aws/aws-lambda-go/lambda"
)

type RetriableError struct {
	error
}

func (e *RetriableError) Error() string {
	return e.error.Error()
}

func callExternalAPI() error {
	// External call (omitted)
	return nil
}

func handler(ctx context.Context, event map[string]interface{}) error {
	err := processEvent(event)

	if err != nil {
		// Determine if error is retriable
		if isRetriable(err) {
			// Return error - Lambda will retry
			return &RetriableError{err}
		}

		// Non-retriable error - send to DLQ
		return err
	}

	return nil
}

func processEvent(event map[string]interface{}) error {
	// Implement exponential backoff for external calls
	maxRetries := 3
	baseDelay := 100 * time.Millisecond

	for attempt := 0; attempt < maxRetries; attempt++ {
		err := callExternalAPI()
		if err == nil {
			return nil
		}

		if !isRetriable(err) {
			return err
		}

		delay := baseDelay * time.Duration(1<<uint(attempt))
		time.Sleep(delay)
	}

	return errors.New("max retries exceeded")
}

func isRetriable(err error) bool {
	// Check if error is transient
	// Network errors, rate limits, etc.
	return true
}

func main() {
	lambda.Start(handler)
}
Observability
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-lambda-go/lambdacontext"
	"github.com/aws/aws-xray-sdk-go/xray"
)

func queryDatabase(ctx context.Context) error {
	// Database call (omitted)
	return nil
}

func handler(ctx context.Context, event map[string]interface{}) error {
	// Get Lambda context
	lc, _ := lambdacontext.FromContext(ctx)

	log.Printf("Request ID: %s", lc.AwsRequestID)
	log.Printf("Function ARN: %s", lc.InvokedFunctionArn)
	log.Printf("Memory Limit: %d MB", lambdacontext.MemoryLimitInMB)

	// X-Ray tracing
	ctx, seg := xray.BeginSubsegment(ctx, "business-logic")
	defer seg.Close(nil)

	// Add metadata
	seg.AddMetadata("event", event)

	// Process
	if err := processEvent(ctx, event); err != nil {
		seg.AddError(err)
		return err
	}

	return nil
}

func processEvent(ctx context.Context, event map[string]interface{}) error {
	// Traced database call
	err := xray.Capture(ctx, "database-query", func(ctx context.Context) error {
		return queryDatabase(ctx)
	})

	return err
}

func main() {
	lambda.Start(handler)
}
Practice Exercises
Exercise 1: Build a Serverless API
Objective: Create a complete CRUD API using Lambda, API Gateway, and DynamoDB.
Requirements:
- Create/Read/Update/Delete endpoints
- Input validation
- Error handling
- DynamoDB integration
- CloudFormation/SAM template
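For the template requirement, a minimal SAM sketch wiring one Go function to API Gateway and a DynamoDB table (names and paths are illustrative):

```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  UsersTable:
    Type: AWS::Serverless::SimpleTable
    Properties:
      PrimaryKey:
        Name: id
        Type: String
  UserFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: bootstrap
      Runtime: provided.al2
      CodeUri: .
      Environment:
        Variables:
          TABLE_NAME: !Ref UsersTable
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref UsersTable
      Events:
        Api:
          Type: Api
          Properties:
            Path: /users/{id}
            Method: ANY
```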
Solution
1// handlers/user.go
2package handlers
3
4import (
5 "context"
6 "encoding/json"
7 "net/http"
8
9 "github.com/aws/aws-lambda-go/events"
10 "github.com/aws/aws-sdk-go/aws"
11 "github.com/aws/aws-sdk-go/service/dynamodb"
12 "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
13 "github.com/google/uuid"
14)
15
16type User struct {
17 ID string `json:"id" dynamodbav:"id"`
18 Name string `json:"name" dynamodbav:"name"`
19 Email string `json:"email" dynamodbav:"email"`
20}
21
22type UserHandler struct {
23 db *dynamodb.DynamoDB
24 tableName string
25}
26
27func NewUserHandler(db *dynamodb.DynamoDB, tableName string) *UserHandler {
28 return &UserHandler{
29 db: db,
30 tableName: tableName,
31 }
32}
33
34func Create(ctx context.Context, req events.APIGatewayProxyRequest) {
35 var user User
36 if err := json.Unmarshal([]byte(req.Body), &user); err != nil {
37 return response(http.StatusBadRequest, map[string]string{"error": "invalid request"}), nil
38 }
39
40 // Validate
41 if user.Name == "" || user.Email == "" {
42 return response(http.StatusBadRequest, map[string]string{"error": "name and email required"}), nil
43 }
44
45 user.ID = uuid.New().String()
46
47 // Save to DynamoDB
48 item, _ := dynamodbattribute.MarshalMap(user)
49 _, err := h.db.PutItemWithContext(ctx, &dynamodb.PutItemInput{
50 TableName: aws.String(h.tableName),
51 Item: item,
52 })
53 if err != nil {
54 return response(http.StatusInternalServerError, map[string]string{"error": "failed to create user"}), nil
55 }
56
57 return response(http.StatusCreated, user), nil
58}
59
60func Get(ctx context.Context, req events.APIGatewayProxyRequest) {
61 id := req.PathParameters["id"]
62
63 result, err := h.db.GetItemWithContext(ctx, &dynamodb.GetItemInput{
64 TableName: aws.String(h.tableName),
65 Key: map[string]*dynamodb.AttributeValue{
66 "id": {S: aws.String(id)},
67 },
68 })
69 if err != nil {
70 return response(http.StatusInternalServerError, map[string]string{"error": "failed to get user"}), nil
71 }
72
73 if result.Item == nil {
74 return response(http.StatusNotFound, map[string]string{"error": "user not found"}), nil
75 }
76
77 var user User
78 dynamodbattribute.UnmarshalMap(result.Item, &user)
79
80 return response(http.StatusOK, user), nil
81}
82
func (h *UserHandler) Update(ctx context.Context, req events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
84 id := req.PathParameters["id"]
85
	var updates map[string]string
	if err := json.Unmarshal([]byte(req.Body), &updates); err != nil {
		return response(http.StatusBadRequest, map[string]string{"error": "invalid request"}), nil
	}

	// Build the update expression from the supplied fields; collecting the
	// clauses first avoids a dangling comma when only one field is present
	var exprParts []string
	exprAttrValues := make(map[string]*dynamodb.AttributeValue)
	exprAttrNames := make(map[string]*string)

	if name, ok := updates["name"]; ok {
		exprParts = append(exprParts, "#n = :name")
		exprAttrValues[":name"] = &dynamodb.AttributeValue{S: aws.String(name)}
		exprAttrNames["#n"] = aws.String("name") // "name" is a DynamoDB reserved word
	}

	if email, ok := updates["email"]; ok {
		exprParts = append(exprParts, "email = :email")
		exprAttrValues[":email"] = &dynamodb.AttributeValue{S: aws.String(email)}
	}

	if len(exprParts) == 0 {
		return response(http.StatusBadRequest, map[string]string{"error": "no updatable fields"}), nil
	}

	input := &dynamodb.UpdateItemInput{
		TableName: aws.String(h.tableName),
		Key: map[string]*dynamodb.AttributeValue{
			"id": {S: aws.String(id)},
		},
		UpdateExpression:          aws.String("SET " + strings.Join(exprParts, ", ")), // requires "strings" in imports
		ExpressionAttributeValues: exprAttrValues,
	}
	// Only attach attribute names that are actually used; DynamoDB rejects unused ones
	if len(exprAttrNames) > 0 {
		input.ExpressionAttributeNames = exprAttrNames
	}
	_, err := h.db.UpdateItemWithContext(ctx, input)
114 if err != nil {
115 return response(http.StatusInternalServerError, map[string]string{"error": "failed to update user"}), nil
116 }
117
118 return response(http.StatusOK, map[string]string{"status": "updated"}), nil
119}
120
func (h *UserHandler) Delete(ctx context.Context, req events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
122 id := req.PathParameters["id"]
123
124 _, err := h.db.DeleteItemWithContext(ctx, &dynamodb.DeleteItemInput{
125 TableName: aws.String(h.tableName),
126 Key: map[string]*dynamodb.AttributeValue{
127 "id": {S: aws.String(id)},
128 },
129 })
130 if err != nil {
131 return response(http.StatusInternalServerError, map[string]string{"error": "failed to delete user"}), nil
132 }
133
134 return response(http.StatusNoContent, nil), nil
135}
136
137func response(statusCode int, body interface{}) events.APIGatewayProxyResponse {
138 bodyBytes, _ := json.Marshal(body)
139
140 return events.APIGatewayProxyResponse{
141 StatusCode: statusCode,
142 Headers: map[string]string{
143 "Content-Type": "application/json",
144 },
145 Body: string(bodyBytes),
146 }
147}
1# SAM template
2AWSTemplateFormatVersion: '2010-09-09'
3Transform: AWS::Serverless-2016-10-31
4
5Resources:
6 UsersTable:
7 Type: AWS::DynamoDB::Table
8 Properties:
9 TableName: Users
10 AttributeDefinitions:
11 - AttributeName: id
12 AttributeType: S
13 KeySchema:
14 - AttributeName: id
15 KeyType: HASH
16 BillingMode: PAY_PER_REQUEST
17
18 CreateUserFunction:
19 Type: AWS::Serverless::Function
20 Properties:
21 Runtime: provided.al2
22 Handler: bootstrap
23 CodeUri: ./build/create
24 Events:
25 Create:
26 Type: Api
27 Properties:
28 Path: /users
29 Method: POST
30 Environment:
31 Variables:
32 TABLE_NAME: !Ref UsersTable
33 Policies:
34 - DynamoDBCrudPolicy:
35 TableName: !Ref UsersTable
Exercise 2: Implement Event-Driven Processing
Objective: Build an image processing pipeline with S3 triggers.
Solution
1package main
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "image"
8 "image/jpeg"
9 "image/png"
10
11 "github.com/aws/aws-lambda-go/events"
12 "github.com/aws/aws-lambda-go/lambda"
13 "github.com/aws/aws-sdk-go/aws"
14 "github.com/aws/aws-sdk-go/aws/session"
15 "github.com/aws/aws-sdk-go/service/s3"
16 "github.com/nfnt/resize"
17)
18
19var s3Client *s3.S3
20
21func init() {
22 sess := session.Must(session.NewSession())
23 s3Client = s3.New(sess)
24}
25
26func handler(ctx context.Context, s3Event events.S3Event) error {
27 for _, record := range s3Event.Records {
28 bucket := record.S3.Bucket.Name
29 key := record.S3.Object.Key
30
31 // Download image
32 result, err := s3Client.GetObjectWithContext(ctx, &s3.GetObjectInput{
33 Bucket: aws.String(bucket),
34 Key: aws.String(key),
35 })
36 if err != nil {
37 return err
38 }
39
40 // Decode image
41 img, format, err := image.Decode(result.Body)
42 result.Body.Close()
43 if err != nil {
44 return err
45 }
46
47 // Generate thumbnails
48 sizes := []uint{100, 200, 400}
49 for _, size := range sizes {
50 thumbnail := resize.Resize(size, 0, img, resize.Lanczos3)
51
		// Encode; skip formats we cannot re-encode rather than uploading an empty body
		var buf bytes.Buffer
		switch format {
		case "jpeg":
			if err := jpeg.Encode(&buf, thumbnail, &jpeg.Options{Quality: 85}); err != nil {
				return err
			}
		case "png":
			if err := png.Encode(&buf, thumbnail); err != nil {
				return err
			}
		default:
			continue
		}
60
61 // Upload thumbnail
62 thumbnailKey := fmt.Sprintf("thumbnails/%dx/%s", size, key)
63 _, err = s3Client.PutObjectWithContext(ctx, &s3.PutObjectInput{
64 Bucket: aws.String(bucket),
65 Key: aws.String(thumbnailKey),
66 Body: bytes.NewReader(buf.Bytes()),
67 ContentType: aws.String(fmt.Sprintf("image/%s", format)),
68 })
69 if err != nil {
70 return err
71 }
72 }
73 }
74
75 return nil
76}
77
78func main() {
79 lambda.Start(handler)
80}
Exercise 3: Serverless Data Pipeline
Objective: Build a complete serverless data processing pipeline using AWS Lambda, S3, SQS, and DynamoDB with error handling and monitoring.
Requirements:
- Implement multi-stage data pipeline
- Error handling with Dead Letter Queues
- Idempotent processing with deduplication
- CloudWatch metrics and alarms
- Step Functions orchestration
Solution with Explanation
1// run
2// ingestion/main.go - S3 upload trigger
3package main
4
5import (
6 "context"
7 "encoding/json"
8 "fmt"
	"log"
	"os"

11 "github.com/aws/aws-lambda-go/events"
12 "github.com/aws/aws-lambda-go/lambda"
13 "github.com/aws/aws-sdk-go/aws"
14 "github.com/aws/aws-sdk-go/aws/session"
15 "github.com/aws/aws-sdk-go/service/s3"
16 "github.com/aws/aws-sdk-go/service/sqs"
17)
18
19type FileMetadata struct {
20 FileID string `json:"file_id"`
21 Bucket string `json:"bucket"`
22 Key string `json:"key"`
23 Size int64 `json:"size"`
24 ContentType string `json:"content_type"`
25 UploadTime string `json:"upload_time"`
26}
27
28var (
29 s3Client *s3.S3
30 sqsClient *sqs.SQS
31 queueURL string
32)
33
34func init() {
35 sess := session.Must(session.NewSession())
36 s3Client = s3.New(sess)
37 sqsClient = sqs.New(sess)
38 queueURL = os.Getenv("PROCESSING_QUEUE_URL")
39}
40
41func handler(ctx context.Context, s3Event events.S3Event) error {
42 for _, record := range s3Event.Records {
43 bucket := record.S3.Bucket.Name
44 key := record.S3.Object.Key
45 size := record.S3.Object.Size
46
		log.Printf("Processing S3 event: s3://%s/%s (%d bytes)", bucket, key, size)
48
49 // Get object metadata
50 headOutput, err := s3Client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
51 Bucket: aws.String(bucket),
52 Key: aws.String(key),
53 })
54 if err != nil {
55 log.Printf("Error getting object metadata: %v", err)
56 return err
57 }
58
59 // Create file metadata
60 metadata := FileMetadata{
61 FileID: record.S3.Object.ETag,
62 Bucket: bucket,
63 Key: key,
64 Size: size,
65 ContentType: aws.StringValue(headOutput.ContentType),
66 UploadTime: record.EventTime.String(),
67 }
68
69 // Send to processing queue
70 messageBody, _ := json.Marshal(metadata)
71 _, err = sqsClient.SendMessageWithContext(ctx, &sqs.SendMessageInput{
72 QueueUrl: aws.String(queueURL),
73 MessageBody: aws.String(string(messageBody)),
74 MessageAttributes: map[string]*sqs.MessageAttributeValue{
75 "FileSize": {
76 DataType: aws.String("Number"),
77 StringValue: aws.String(fmt.Sprintf("%d", size)),
78 },
79 "ContentType": {
80 DataType: aws.String("String"),
81 StringValue: aws.String(metadata.ContentType),
82 },
83 },
84 // Deduplication
85 MessageDeduplicationId: aws.String(metadata.FileID),
86 MessageGroupId: aws.String("file-processing"),
87 })
88 if err != nil {
89 log.Printf("Error sending message to queue: %v", err)
90 return err
91 }
92
93 log.Printf("Queued file for processing: %s", metadata.FileID)
94 }
95
96 return nil
97}
98
99func main() {
100 lambda.Start(handler)
101}
1// run
2// processing/main.go - SQS message processor
3package main
4
5import (
7 "context"
8 "encoding/json"
9 "fmt"
10 "io"
11 "log"
12 "os"
13 "time"
14
15 "github.com/aws/aws-lambda-go/events"
16 "github.com/aws/aws-lambda-go/lambda"
17 "github.com/aws/aws-sdk-go/aws"
18 "github.com/aws/aws-sdk-go/aws/session"
19 "github.com/aws/aws-sdk-go/service/cloudwatch"
20 "github.com/aws/aws-sdk-go/service/dynamodb"
21 "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
22 "github.com/aws/aws-sdk-go/service/s3"
23)

// FileMetadata mirrors the message body produced by the ingestion function;
// each Lambda is built as a separate binary, so the type is declared here too.
type FileMetadata struct {
	FileID      string `json:"file_id"`
	Bucket      string `json:"bucket"`
	Key         string `json:"key"`
	Size        int64  `json:"size"`
	ContentType string `json:"content_type"`
	UploadTime  string `json:"upload_time"`
}

25type ProcessedRecord struct {
26 FileID string `json:"file_id" dynamodbav:"file_id"`
27 ProcessedAt time.Time `json:"processed_at" dynamodbav:"processed_at"`
28 Status string `json:"status" dynamodbav:"status"`
29 RecordCount int `json:"record_count" dynamodbav:"record_count"`
30 ErrorMessage string `json:"error_message,omitempty" dynamodbav:"error_message,omitempty"`
31 ProcessingTimeMs int64 `json:"processing_time_ms" dynamodbav:"processing_time_ms"`
32}
33
34var (
35 s3Client *s3.S3
36 dbClient *dynamodb.DynamoDB
37 cwClient *cloudwatch.CloudWatch
38 tableName string
39)
40
41func init() {
42 sess := session.Must(session.NewSession())
43 s3Client = s3.New(sess)
44 dbClient = dynamodb.New(sess)
45 cwClient = cloudwatch.New(sess)
46 tableName = os.Getenv("PROCESSED_TABLE")
47}
48
func handler(ctx context.Context, sqsEvent events.SQSEvent) (events.SQSEventResponse, error) {
50 batchItemFailures := []events.SQSBatchItemFailure{}
51
52 for _, message := range sqsEvent.Records {
53 if err := processMessage(ctx, message); err != nil {
54 log.Printf("Failed to process message %s: %v", message.MessageId, err)
55 batchItemFailures = append(batchItemFailures, events.SQSBatchItemFailure{
56 ItemIdentifier: message.MessageId,
57 })
58 }
59 }
60
61 return events.SQSEventResponse{
62 BatchItemFailures: batchItemFailures,
63 }, nil
64}
65
66func processMessage(ctx context.Context, message events.SQSMessage) error {
67 start := time.Now()
68
69 var metadata FileMetadata
70 if err := json.Unmarshal([]byte(message.Body), &metadata); err != nil {
71 return fmt.Errorf("failed to unmarshal message: %w", err)
72 }
73
74 // Check if already processed
75 exists, err := checkIfProcessed(ctx, metadata.FileID)
76 if err != nil {
77 return err
78 }
79 if exists {
80 log.Printf("File %s already processed, skipping", metadata.FileID)
81 return nil
82 }
83
84 // Download and process file
85 result, err := s3Client.GetObjectWithContext(ctx, &s3.GetObjectInput{
86 Bucket: aws.String(metadata.Bucket),
87 Key: aws.String(metadata.Key),
88 })
89 if err != nil {
90 return recordFailure(ctx, metadata.FileID, err, time.Since(start))
91 }
92 defer result.Body.Close()
93
94 // Process file contents
95 data, err := io.ReadAll(result.Body)
96 if err != nil {
97 return recordFailure(ctx, metadata.FileID, err, time.Since(start))
98 }
99
100 // Parse and transform data
101 records, err := processData(data, metadata.ContentType)
102 if err != nil {
103 return recordFailure(ctx, metadata.FileID, err, time.Since(start))
104 }
105
106 // Store processed record
107 processedRecord := ProcessedRecord{
108 FileID: metadata.FileID,
109 ProcessedAt: time.Now(),
110 Status: "completed",
111 RecordCount: len(records),
112 ProcessingTimeMs: time.Since(start).Milliseconds(),
113 }
114
115 item, _ := dynamodbattribute.MarshalMap(processedRecord)
116 _, err = dbClient.PutItemWithContext(ctx, &dynamodb.PutItemInput{
117 TableName: aws.String(tableName),
118 Item: item,
119 ConditionExpression: aws.String("attribute_not_exists(file_id)"),
120 })
121 if err != nil {
122 return err
123 }
124
125 // Publish metrics
126 publishMetrics(ctx, processedRecord)
127
128 log.Printf("Successfully processed file %s: %d records in %dms",
129 metadata.FileID, len(records), processedRecord.ProcessingTimeMs)
130
131 return nil
132}
133
func checkIfProcessed(ctx context.Context, fileID string) (bool, error) {
135 result, err := dbClient.GetItemWithContext(ctx, &dynamodb.GetItemInput{
136 TableName: aws.String(tableName),
137 Key: map[string]*dynamodb.AttributeValue{
138 "file_id": {S: aws.String(fileID)},
139 },
140 })
141 if err != nil {
142 return false, err
143 }
144
145 return result.Item != nil, nil
146}
147
func processData(data []byte, contentType string) ([]map[string]interface{}, error) {
149 // Implement data parsing based on content type
150 var records []map[string]interface{}
151
152 switch contentType {
153 case "application/json":
154 if err := json.Unmarshal(data, &records); err != nil {
155 return nil, err
156 }
157 case "text/csv":
		// CSV parsing logic; parseCSV is left as an exercise and must
		// return []map[string]interface{}
		records = parseCSV(data)
160 default:
161 return nil, fmt.Errorf("unsupported content type: %s", contentType)
162 }
163
164 return records, nil
165}
166
167func recordFailure(ctx context.Context, fileID string, err error, duration time.Duration) error {
168 processedRecord := ProcessedRecord{
169 FileID: fileID,
170 ProcessedAt: time.Now(),
171 Status: "failed",
172 ErrorMessage: err.Error(),
173 ProcessingTimeMs: duration.Milliseconds(),
174 }
175
176 item, _ := dynamodbattribute.MarshalMap(processedRecord)
177 dbClient.PutItemWithContext(ctx, &dynamodb.PutItemInput{
178 TableName: aws.String(tableName),
179 Item: item,
180 })
181
182 publishMetrics(ctx, processedRecord)
183
184 return err
185}
186
187func publishMetrics(ctx context.Context, record ProcessedRecord) {
188 namespace := "DataPipeline"
189
190 metrics := []*cloudwatch.MetricDatum{
191 {
192 MetricName: aws.String("ProcessingTime"),
193 Value: aws.Float64(float64(record.ProcessingTimeMs)),
194 Unit: aws.String("Milliseconds"),
195 Timestamp: aws.Time(time.Now()),
196 },
197 {
198 MetricName: aws.String("RecordCount"),
199 Value: aws.Float64(float64(record.RecordCount)),
200 Unit: aws.String("Count"),
201 Timestamp: aws.Time(time.Now()),
202 },
203 }
204
205 if record.Status == "failed" {
206 metrics = append(metrics, &cloudwatch.MetricDatum{
207 MetricName: aws.String("ProcessingErrors"),
208 Value: aws.Float64(1),
209 Unit: aws.String("Count"),
210 Timestamp: aws.Time(time.Now()),
211 })
212 }
213
214 cwClient.PutMetricDataWithContext(ctx, &cloudwatch.PutMetricDataInput{
215 Namespace: aws.String(namespace),
216 MetricData: metrics,
217 })
218}
219
220func main() {
221 lambda.Start(handler)
222}
1# run
2# SAM template for serverless data pipeline
3AWSTemplateFormatVersion: '2010-09-09'
4Transform: AWS::Serverless-2016-10-31
5
6Description: Serverless Data Processing Pipeline
7
8Globals:
9 Function:
10 Timeout: 300
11 MemorySize: 512
12 Runtime: provided.al2
13 Environment:
14 Variables:
15 LOG_LEVEL: info
16
17Resources:
18 # S3 Bucket for data ingestion
19 DataBucket:
20 Type: AWS::S3::Bucket
21 Properties:
22 BucketName: !Sub '${AWS::StackName}-data-bucket'
23 NotificationConfiguration:
24 LambdaConfigurations:
25 - Event: s3:ObjectCreated:*
26 Function: !GetAtt IngestionFunction.Arn
27 Filter:
28 S3Key:
29 Rules:
30 - Name: prefix
31 Value: incoming/
32
33 # SQS Queue for processing
34 ProcessingQueue:
35 Type: AWS::SQS::Queue
36 Properties:
      QueueName: !Sub '${AWS::StackName}-processing-queue.fifo'  # FIFO queue names must end in .fifo
38 FifoQueue: true
39 ContentBasedDeduplication: true
40 MessageRetentionPeriod: 1209600 # 14 days
41 VisibilityTimeout: 900 # 15 minutes
42 RedrivePolicy:
43 deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
44 maxReceiveCount: 3
45
46 DeadLetterQueue:
47 Type: AWS::SQS::Queue
48 Properties:
      QueueName: !Sub '${AWS::StackName}-dlq.fifo'  # a FIFO queue's DLQ must also be FIFO
50 FifoQueue: true
51 MessageRetentionPeriod: 1209600
52
53 # DynamoDB Table for processed records
54 ProcessedTable:
55 Type: AWS::DynamoDB::Table
56 Properties:
57 TableName: !Sub '${AWS::StackName}-processed'
      AttributeDefinitions:
        - AttributeName: file_id
          AttributeType: S
      # file_id alone is the primary key, so the processor's GetItem and
      # conditional PutItem (attribute_not_exists) work for deduplication
      KeySchema:
        - AttributeName: file_id
          KeyType: HASH
68 BillingMode: PAY_PER_REQUEST
69 StreamSpecification:
70 StreamViewType: NEW_AND_OLD_IMAGES
71 TimeToLiveSpecification:
72 AttributeName: ttl
73 Enabled: true
74
75 # Ingestion Lambda Function
76 IngestionFunction:
77 Type: AWS::Serverless::Function
78 Properties:
79 CodeUri: ./build/ingestion
80 Handler: bootstrap
81 Environment:
82 Variables:
83 PROCESSING_QUEUE_URL: !Ref ProcessingQueue
84 Policies:
85 - S3ReadPolicy:
86 BucketName: !Ref DataBucket
87 - SQSSendMessagePolicy:
88 QueueName: !GetAtt ProcessingQueue.QueueName
89
90 IngestionFunctionPermission:
91 Type: AWS::Lambda::Permission
92 Properties:
93 FunctionName: !Ref IngestionFunction
94 Action: lambda:InvokeFunction
95 Principal: s3.amazonaws.com
96 SourceArn: !GetAtt DataBucket.Arn
97
98 # Processing Lambda Function
99 ProcessingFunction:
100 Type: AWS::Serverless::Function
101 Properties:
102 CodeUri: ./build/processing
103 Handler: bootstrap
104 ReservedConcurrentExecutions: 10
105 Environment:
106 Variables:
107 PROCESSED_TABLE: !Ref ProcessedTable
108 Events:
109 SQSEvent:
110 Type: SQS
111 Properties:
112 Queue: !GetAtt ProcessingQueue.Arn
113 BatchSize: 10
114 FunctionResponseTypes:
115 - ReportBatchItemFailures
116 Policies:
117 - S3ReadPolicy:
118 BucketName: !Ref DataBucket
119 - DynamoDBCrudPolicy:
120 TableName: !Ref ProcessedTable
121 - CloudWatchPutMetricPolicy: {}
122
123 # CloudWatch Alarms
124 ProcessingErrorAlarm:
125 Type: AWS::CloudWatch::Alarm
126 Properties:
127 AlarmName: !Sub '${AWS::StackName}-processing-errors'
128 AlarmDescription: Alert on processing errors
129 MetricName: ProcessingErrors
130 Namespace: DataPipeline
131 Statistic: Sum
132 Period: 300
133 EvaluationPeriods: 1
134 Threshold: 5
135 ComparisonOperator: GreaterThanThreshold
136 TreatMissingData: notBreaching
137
138 DLQAlarm:
139 Type: AWS::CloudWatch::Alarm
140 Properties:
141 AlarmName: !Sub '${AWS::StackName}-dlq-messages'
142 AlarmDescription: Alert on messages in DLQ
143 MetricName: ApproximateNumberOfMessagesVisible
144 Namespace: AWS/SQS
145 Dimensions:
146 - Name: QueueName
147 Value: !GetAtt DeadLetterQueue.QueueName
148 Statistic: Average
149 Period: 300
150 EvaluationPeriods: 1
151 Threshold: 1
152 ComparisonOperator: GreaterThanOrEqualToThreshold
153
154 # Dashboard
155 PipelineDashboard:
156 Type: AWS::CloudWatch::Dashboard
157 Properties:
158 DashboardName: !Sub '${AWS::StackName}-pipeline'
159 DashboardBody: !Sub |
160 {
161 "widgets": [
162 {
163 "type": "metric",
164 "properties": {
165 "metrics": [
166 ["AWS/Lambda", "Invocations", {"stat": "Sum", "label": "Ingestion"}],
167 [".", ".", {"stat": "Sum", "label": "Processing"}]
168 ],
169 "period": 300,
170 "stat": "Sum",
171 "region": "${AWS::Region}",
172 "title": "Lambda Invocations"
173 }
174 },
175 {
176 "type": "metric",
177 "properties": {
178 "metrics": [
179 ["DataPipeline", "ProcessingTime", {"stat": "Average"}],
180 [".", "RecordCount", {"stat": "Sum"}],
181 [".", "ProcessingErrors", {"stat": "Sum"}]
182 ],
183 "period": 300,
184 "region": "${AWS::Region}",
185 "title": "Pipeline Metrics"
186 }
187 }
188 ]
189 }
190
191Outputs:
192 DataBucketName:
193 Description: S3 bucket for data ingestion
194 Value: !Ref DataBucket
195
196 ProcessingQueueURL:
197 Description: SQS queue URL
198 Value: !Ref ProcessingQueue
199
200 ProcessedTableName:
201 Description: DynamoDB table name
202 Value: !Ref ProcessedTable
Explanation:
This solution implements a production-grade serverless data pipeline with:
- Multi-Stage Processing:
  - S3 trigger for ingestion
  - SQS for reliable async processing
  - DynamoDB for state tracking
- Error Handling:
  - Dead Letter Queue for failed messages
  - Batch item failures for partial retries
  - CloudWatch alarms for monitoring
- Idempotency:
  - Message deduplication in FIFO queue
  - DynamoDB conditional writes
  - File ID-based tracking
- Observability:
  - CloudWatch metrics for processing time, record count, errors
  - Custom dashboard for pipeline visibility
  - Structured logging with context
- Production Features:
  - Reserved concurrency to control costs
  - TTL for automatic data cleanup
  - Batch processing for efficiency
  - Comprehensive IAM policies
The pipeline handles data ingestion, processing, and storage with automatic retries, error handling, and monitoring suitable for production workloads.
Deployment:
1# Build functions
2GOOS=linux GOARCH=amd64 go build -o build/ingestion/bootstrap ./ingestion
3GOOS=linux GOARCH=amd64 go build -o build/processing/bootstrap ./processing
4
5# Deploy with SAM
6sam build
7sam deploy --guided
Exercise 4: Serverless API with Authentication and Rate Limiting
Learning Objective: Build a production-ready serverless REST API with JWT authentication, rate limiting, and comprehensive error handling. This exercise teaches you to implement security best practices, protect against abuse, and create robust API endpoints that can handle high-traffic scenarios while maintaining performance and security.
Real-World Context: Production APIs at companies like Stripe, Twilio, and GitHub process millions of requests daily with strict authentication and rate limiting requirements. Understanding how to implement these patterns in a serverless environment is crucial for building secure, scalable APIs that prevent abuse while maintaining excellent performance. JWT authentication provides stateless auth that scales horizontally, while rate limiting prevents API abuse and ensures fair resource allocation across users.
Difficulty: Advanced | Time Estimate: 60-75 minutes
Objective: Create a serverless API with JWT authentication, per-user rate limiting using DynamoDB, request validation, and comprehensive error handling.
Requirements:
- JWT token generation and validation
- Rate limiting per API key/user
- Request validation middleware
- CORS handling
- Comprehensive error responses
- API versioning support
Solution with Explanation
1// run
2// auth/jwt.go - JWT token handling
3package auth
4
5import (
7 "fmt"
8 "time"
9
10 "github.com/golang-jwt/jwt/v5"
11)
12
13var jwtSecret = []byte("your-secret-key") // Use AWS Secrets Manager in production
14
15type Claims struct {
16 UserID string `json:"user_id"`
17 Email string `json:"email"`
18 Roles []string `json:"roles"`
19 jwt.RegisteredClaims
20}
21
22// GenerateToken creates a new JWT token
23func GenerateToken(userID, email string, roles []string) (string, error) {
24 claims := &Claims{
25 UserID: userID,
26 Email: email,
27 Roles: roles,
28 RegisteredClaims: jwt.RegisteredClaims{
29 ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)),
30 IssuedAt: jwt.NewNumericDate(time.Now()),
31 Issuer: "serverless-api",
32 },
33 }
34
35 token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
36 return token.SignedString(jwtSecret)
37}
38
39// ValidateToken validates and parses a JWT token
40func ValidateToken(tokenString string) (*Claims, error) {
41 token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
42 if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
43 return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
44 }
45 return jwtSecret, nil
46 })
47
48 if err != nil {
49 return nil, err
50 }
51
52 if claims, ok := token.Claims.(*Claims); ok && token.Valid {
53 return claims, nil
54 }
55
56 return nil, fmt.Errorf("invalid token")
57}
58
59// HasRole checks if user has required role
60func (c *Claims) HasRole(role string) bool {
61 for _, r := range c.Roles {
62 if r == role {
63 return true
64 }
65 }
66 return false
67}
1// run
2// ratelimit/limiter.go - DynamoDB-based rate limiter
3package ratelimit
4
5import (
6 "context"
7 "fmt"
8 "time"
9
10 "github.com/aws/aws-sdk-go-v2/aws"
11 "github.com/aws/aws-sdk-go-v2/service/dynamodb"
12 "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
13)
14
15type RateLimiter struct {
16 client *dynamodb.Client
17 tableName string
18}
19
20type RateLimit struct {
21 Key string `dynamodbav:"key"`
22 RequestCount int `dynamodbav:"request_count"`
23 WindowStart time.Time `dynamodbav:"window_start"`
24 TTL int64 `dynamodbav:"ttl"`
25}
26
27func NewRateLimiter(client *dynamodb.Client, tableName string) *RateLimiter {
28 return &RateLimiter{
29 client: client,
30 tableName: tableName,
31 }
32}
33
34// CheckLimit checks if request is within rate limit
35func (rl *RateLimiter) CheckLimit(ctx context.Context, userID string, limit int, window time.Duration) (bool, int, error) {
36 now := time.Now()
37 windowKey := fmt.Sprintf("%s:%d", userID, now.Unix()/int64(window.Seconds()))
38
39 // Try to increment counter
40 result, err := rl.client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
41 TableName: aws.String(rl.tableName),
42 Key: map[string]types.AttributeValue{
43 "key": &types.AttributeValueMemberS{Value: windowKey},
44 },
45 UpdateExpression: aws.String("ADD request_count :inc SET window_start = if_not_exists(window_start, :now), #ttl = :ttl"),
46 ExpressionAttributeNames: map[string]string{
47 "#ttl": "ttl",
48 },
49 ExpressionAttributeValues: map[string]types.AttributeValue{
50 ":inc": &types.AttributeValueMemberN{Value: "1"},
51 ":now": &types.AttributeValueMemberS{Value: now.Format(time.RFC3339)},
52 ":ttl": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", now.Add(window).Unix())},
53 },
54 ReturnValues: types.ReturnValueAllNew,
55 })
56
57 if err != nil {
58 return false, 0, err
59 }
60
61 // Parse request count
62 countAttr := result.Attributes["request_count"]
63 if countAttr == nil {
64 return true, 0, nil
65 }
66
67 var count int
68 if n, ok := countAttr.(*types.AttributeValueMemberN); ok {
69 fmt.Sscanf(n.Value, "%d", &count)
70 }
71
72 remaining := limit - count
73 if remaining < 0 {
74 remaining = 0
75 }
76
77 allowed := count <= limit
78 return allowed, remaining, nil
79}
1// run
2// main.go - Lambda handler with authentication and rate limiting
3package main
4
5import (
6 "context"
7 "encoding/json"
8 "fmt"
9 "net/http"
10 "os"
	"strings"
	"time"

	// Plus this module's auth and ratelimit packages; the import paths
	// depend on your module name, e.g.:
	// "example.com/yourapp/auth"
	// "example.com/yourapp/ratelimit"

13 "github.com/aws/aws-lambda-go/events"
14 "github.com/aws/aws-lambda-go/lambda"
15 "github.com/aws/aws-sdk-go-v2/config"
16 "github.com/aws/aws-sdk-go-v2/service/dynamodb"
17)
18
19var (
20 dbClient *dynamodb.Client
21 rateLimiter *ratelimit.RateLimiter
22 rateLimitTbl string
23)
24
25func init() {
26 cfg, _ := config.LoadDefaultConfig(context.Background())
27 dbClient = dynamodb.NewFromConfig(cfg)
28 rateLimitTbl = os.Getenv("RATE_LIMIT_TABLE")
29 rateLimiter = ratelimit.NewRateLimiter(dbClient, rateLimitTbl)
30}
31
32type APIResponse struct {
33 StatusCode int `json:"statusCode"`
34 Body string `json:"body"`
35 Headers map[string]string `json:"headers"`
36}
37
38// Middleware chain
39type Middleware func(events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, bool)
40
41func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
42 // Apply middleware chain
43 middlewares := []Middleware{
44 corsMiddleware,
45 authMiddleware,
46 rateLimitMiddleware,
47 }
48
49 for _, mw := range middlewares {
50 if response, shouldReturn := mw(request); shouldReturn {
51 return response, nil
52 }
53 }
54
55 // Route request
56 return routeRequest(ctx, request)
57}
58
59// corsMiddleware handles CORS
60func corsMiddleware(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, bool) {
61 if request.HTTPMethod == "OPTIONS" {
62 return events.APIGatewayProxyResponse{
63 StatusCode: http.StatusOK,
64 Headers: getCORSHeaders(),
65 Body: "",
66 }, true
67 }
68 return events.APIGatewayProxyResponse{}, false
69}
70
71// authMiddleware validates JWT token
72func authMiddleware(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, bool) {
73 // Skip auth for public endpoints
74 if strings.HasPrefix(request.Path, "/public") {
75 return events.APIGatewayProxyResponse{}, false
76 }
77
78 authHeader := request.Headers["Authorization"]
79 if authHeader == "" {
80 return errorResponse(http.StatusUnauthorized, "Missing authorization header"), true
81 }
82
83 // Extract token
84 parts := strings.Split(authHeader, " ")
85 if len(parts) != 2 || parts[0] != "Bearer" {
86 return errorResponse(http.StatusUnauthorized, "Invalid authorization header"), true
87 }
88
89 // Validate token
90 claims, err := auth.ValidateToken(parts[1])
91 if err != nil {
92 return errorResponse(http.StatusUnauthorized, "Invalid token"), true
93 }
94
	// Store claims on the request (illustrative only: the request is passed by
	// value, so real code would return the claims or carry them in a context.Context)
	request.RequestContext.Authorizer = map[string]interface{}{
		"claims": claims,
	}
99
100 return events.APIGatewayProxyResponse{}, false
101}
102
103// rateLimitMiddleware enforces rate limits
104func rateLimitMiddleware(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, bool) {
105 // Get user from context
106 var userID string
	// Use a name other than "auth" here to avoid shadowing the auth package
	if v, ok := request.RequestContext.Authorizer["claims"]; ok {
		if claims, ok := v.(*auth.Claims); ok {
			userID = claims.UserID
		}
	}
112
113 if userID == "" {
114 userID = request.RequestContext.Identity.SourceIP
115 }
116
117 // Check rate limit: 100 requests per minute
118 ctx := context.Background()
119 allowed, remaining, err := rateLimiter.CheckLimit(ctx, userID, 100, 1*time.Minute)
120 if err != nil {
121 return errorResponse(http.StatusInternalServerError, "Rate limit check failed"), true
122 }
123
124 if !allowed {
125 response := errorResponse(http.StatusTooManyRequests, "Rate limit exceeded")
126 response.Headers["X-RateLimit-Limit"] = "100"
127 response.Headers["X-RateLimit-Remaining"] = "0"
128 response.Headers["Retry-After"] = "60"
129 return response, true
130 }
131
	// Record the remaining quota; in this simplified chain the routed handler
	// would copy these onto the final response's headers
	request.Headers["X-RateLimit-Limit"] = "100"
	request.Headers["X-RateLimit-Remaining"] = fmt.Sprintf("%d", remaining)
135
136 return events.APIGatewayProxyResponse{}, false
137}
138
139// routeRequest routes to appropriate handler
140func routeRequest(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
141 method := request.HTTPMethod
142 path := request.Path
143
144 // API versioning
145 if strings.HasPrefix(path, "/v1") {
146 return routeV1(ctx, method, strings.TrimPrefix(path, "/v1"), request)
147 }
148
149 return errorResponse(http.StatusNotFound, "Endpoint not found"), nil
150}
151
152func routeV1(ctx context.Context, method, path string, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
153 switch {
154 case path == "/users" && method == "GET":
155 return listUsers(ctx, request)
156 case path == "/users" && method == "POST":
157 return createUser(ctx, request)
158 case strings.HasPrefix(path, "/users/") && method == "GET":
159 userID := strings.TrimPrefix(path, "/users/")
160 return getUser(ctx, userID, request)
161 case strings.HasPrefix(path, "/users/") && method == "PUT":
162 userID := strings.TrimPrefix(path, "/users/")
163 return updateUser(ctx, userID, request)
164 case strings.HasPrefix(path, "/users/") && method == "DELETE":
165 userID := strings.TrimPrefix(path, "/users/")
166 return deleteUser(ctx, userID, request)
167 default:
168 return errorResponse(http.StatusNotFound, "Endpoint not found"), nil
169 }
170}
171
172// Handler implementations
173func listUsers(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
174 users := []map[string]interface{}{
175 {"id": "1", "name": "Alice", "email": "alice@example.com"},
176 {"id": "2", "name": "Bob", "email": "bob@example.com"},
177 }
178
179 return jsonResponse(http.StatusOK, map[string]interface{}{
180 "users": users,
181 "total": len(users),
182 }), nil
183}
184
185func createUser(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
186 var input map[string]interface{}
187 if err := json.Unmarshal([]byte(request.Body), &input); err != nil {
188 return errorResponse(http.StatusBadRequest, "Invalid request body"), nil
189 }
190
191 // Validate required fields
192 if input["name"] == nil || input["email"] == nil {
193 return errorResponse(http.StatusBadRequest, "Name and email are required"), nil
194 }
195
196 // Create user (simulated)
197 user := map[string]interface{}{
198 "id": "123",
199 "name": input["name"],
200 "email": input["email"],
201 }
202
203 return jsonResponse(http.StatusCreated, user), nil
204}
205
206func getUser(ctx context.Context, userID string, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
207 user := map[string]interface{}{
208 "id": userID,
209 "name": "John Doe",
210 "email": "john@example.com",
211 }
212
213 return jsonResponse(http.StatusOK, user), nil
214}
215
216func updateUser(ctx context.Context, userID string, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
217 var input map[string]interface{}
218 if err := json.Unmarshal([]byte(request.Body), &input); err != nil {
219 return errorResponse(http.StatusBadRequest, "Invalid request body"), nil
220 }
221
222 user := map[string]interface{}{
223 "id": userID,
224 "name": input["name"],
225 "email": input["email"],
226 }
227
228 return jsonResponse(http.StatusOK, user), nil
229}
230
231func deleteUser(ctx context.Context, userID string, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
232	return events.APIGatewayProxyResponse{StatusCode: http.StatusNoContent, Headers: getCORSHeaders()}, nil // a 204 response must not carry a body
233}
234
235// Helper functions
236func jsonResponse(statusCode int, body interface{}) events.APIGatewayProxyResponse {
237	bodyBytes, _ := json.Marshal(body) // marshal error ignored: body is always a JSON-serializable map here
238 return events.APIGatewayProxyResponse{
239 StatusCode: statusCode,
240 Headers: getCORSHeaders(),
241 Body: string(bodyBytes),
242 }
243}
244
245func errorResponse(statusCode int, message string) events.APIGatewayProxyResponse {
246 return jsonResponse(statusCode, map[string]interface{}{
247 "error": map[string]interface{}{
248 "code": statusCode,
249 "message": message,
250 },
251 })
252}
253
254func getCORSHeaders() map[string]string {
255 return map[string]string{
256 "Content-Type": "application/json",
257 "Access-Control-Allow-Origin": "*",
258 "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
259 "Access-Control-Allow-Headers": "Content-Type, Authorization",
260 }
261}
262
263func main() {
264 lambda.Start(handler)
265}
Explanation:
This exercise demonstrates building a production-ready serverless API with enterprise features:
- JWT Authentication: Stateless token-based auth that scales horizontally
- Rate Limiting: DynamoDB-based per-user rate limiting to prevent abuse
- Middleware Chain: Clean separation of concerns for auth, CORS, and rate limiting
- API Versioning: /v1 prefix allows backward-compatible changes
- Comprehensive Error Handling: Proper HTTP status codes and error messages
- CORS Support: Handles preflight requests and CORS headers
- Request Validation: Validates input before processing
- Production Patterns: TTL for automatic cleanup, atomic counters, proper headers
Key Patterns:
- Middleware pattern for reusable request processing
- DynamoDB atomic counters for distributed rate limiting
- JWT claims stored in request context for downstream handlers
- Proper HTTP semantics (status codes, headers, methods)
- Security best practices (token validation, input validation)
Real-World Usage: This pattern is used by API platforms like Stripe, Twilio, and SendGrid to handle millions of authenticated requests with rate limiting and comprehensive error handling.
Exercise 5: Multi-Stage Serverless Workflow with Step Functions
Learning Objective: Master complex serverless orchestration by building a multi-stage workflow using AWS Step Functions. This exercise teaches you to coordinate multiple Lambda functions, handle long-running processes, implement retry and error handling at the workflow level, and create resilient distributed systems that can handle failures gracefully.
Real-World Context: Complex business processes like order fulfillment, video transcoding, and data processing pipelines at companies like Amazon, Netflix, and Adobe use Step Functions to orchestrate multiple services. These workflows can run for hours or days, survive service failures, and coordinate dozens of steps with complex branching logic. Understanding Step Functions is essential for building enterprise-grade serverless applications that go beyond simple request-response patterns.
Difficulty: Advanced | Time Estimate: 75-90 minutes
Objective: Build a complete serverless workflow using Step Functions to orchestrate order processing with payment, inventory, shipping, and notification steps.
Requirements:
- Define Step Functions state machine
- Implement Lambda functions for each step
- Handle errors and retries at workflow level
- Implement compensation logic (rollback)
- Track workflow state and history
- Integration with SNS for notifications
1// run
2// statemachine.json - Step Functions definition
3{
4 "Comment": "Order Processing Workflow",
5 "StartAt": "ValidateOrder",
6 "States": {
7 "ValidateOrder": {
8 "Type": "Task",
9 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:validate-order",
10 "ResultPath": "$.validation",
11 "Retry": [
12 {
13 "ErrorEquals": ["States.TaskFailed"],
14 "IntervalSeconds": 2,
15 "MaxAttempts": 3,
16 "BackoffRate": 2.0
17 }
18 ],
19 "Catch": [
20 {
21 "ErrorEquals": ["ValidationError"],
22 "ResultPath": "$.error",
23 "Next": "OrderValidationFailed"
24 }
25 ],
26 "Next": "ProcessPayment"
27 },
28 "ProcessPayment": {
29 "Type": "Task",
30 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:process-payment",
31 "ResultPath": "$.payment",
32 "Retry": [
33 {
34 "ErrorEquals": ["PaymentGatewayError"],
35 "IntervalSeconds": 5,
36 "MaxAttempts": 2,
37 "BackoffRate": 2.0
38 }
39 ],
40 "Catch": [
41 {
42 "ErrorEquals": ["PaymentDeclined", "InsufficientFunds"],
43 "ResultPath": "$.error",
44 "Next": "PaymentFailed"
45 }
46 ],
47 "Next": "ReserveInventory"
48 },
49 "ReserveInventory": {
50 "Type": "Task",
51 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:reserve-inventory",
52 "ResultPath": "$.inventory",
53 "Catch": [
54 {
55 "ErrorEquals": ["OutOfStock"],
56 "ResultPath": "$.error",
57 "Next": "RefundPayment"
58 }
59 ],
60 "Next": "CreateShipment"
61 },
62 "CreateShipment": {
63 "Type": "Task",
64 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:create-shipment",
65 "ResultPath": "$.shipment",
66 "Catch": [
67 {
68 "ErrorEquals": ["ShippingError"],
69 "ResultPath": "$.error",
70 "Next": "ReleaseInventory"
71 }
72 ],
73 "Next": "SendConfirmation"
74 },
75 "SendConfirmation": {
76 "Type": "Task",
77 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:send-confirmation",
78 "ResultPath": "$.notification",
79 "Retry": [
80 {
81 "ErrorEquals": ["States.TaskFailed"],
82 "IntervalSeconds": 10,
83 "MaxAttempts": 3
84 }
85 ],
86 "Next": "OrderSuccess"
87 },
88 "OrderSuccess": {
89 "Type": "Succeed"
90 },
91 "OrderValidationFailed": {
92 "Type": "Task",
93 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:send-notification",
94 "Parameters": {
95 "type": "order_validation_failed",
96 "order.$": "$.order",
97 "error.$": "$.error"
98 },
99 "Next": "OrderFailed"
100 },
101 "PaymentFailed": {
102 "Type": "Task",
103 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:send-notification",
104 "Parameters": {
105 "type": "payment_failed",
106 "order.$": "$.order",
107 "error.$": "$.error"
108 },
109 "Next": "OrderFailed"
110 },
111 "RefundPayment": {
112 "Type": "Task",
113 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:refund-payment",
114 "Parameters": {
115 "payment_id.$": "$.payment.id"
116 },
117 "ResultPath": "$.refund",
118 "Next": "NotifyOutOfStock"
119 },
120 "ReleaseInventory": {
121 "Type": "Task",
122 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:release-inventory",
123 "Parameters": {
124 "reservation_id.$": "$.inventory.reservation_id"
125 },
126 "ResultPath": "$.release",
127 "Next": "RefundPayment"
128 },
129 "NotifyOutOfStock": {
130 "Type": "Task",
131 "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:send-notification",
132 "Parameters": {
133 "type": "out_of_stock",
134 "order.$": "$.order"
135 },
136 "Next": "OrderFailed"
137 },
138 "OrderFailed": {
139 "Type": "Fail",
140 "Cause": "Order processing failed"
141 }
142 }
143}
1// run
2// validate/main.go - Order validation function
3package main
4
5import (
6 "context"
7	"fmt"
8	// validation failures are reported via fmt.Errorf below, so the "errors" package is not imported
9
10 "github.com/aws/aws-lambda-go/lambda"
11)
12
13type Order struct {
14 OrderID string `json:"order_id"`
15 CustomerID string `json:"customer_id"`
16 Items []Item `json:"items"`
17 TotalAmount float64 `json:"total_amount"`
18}
19
20type Item struct {
21 ProductID string `json:"product_id"`
22 Quantity int `json:"quantity"`
23 Price float64 `json:"price"`
24}
25
26type ValidationResult struct {
27 Valid bool `json:"valid"`
28 Errors []string `json:"errors,omitempty"`
29}
30
31func handler(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error) {
32	// Extract order (an unchecked assertion panics on malformed input; Lambda surfaces that as a task failure)
33	orderData := input["order"].(map[string]interface{})
34
35 errors := []string{}
36
37 // Validate customer ID
38 if orderData["customer_id"] == nil || orderData["customer_id"] == "" {
39 errors = append(errors, "customer_id is required")
40 }
41
42 // Validate items
43 items, ok := orderData["items"].([]interface{})
44 if !ok || len(items) == 0 {
45 errors = append(errors, "order must have at least one item")
46 }
47
48 // Validate total amount
49 totalAmount, ok := orderData["total_amount"].(float64)
50 if !ok || totalAmount <= 0 {
51 errors = append(errors, "total_amount must be positive")
52 }
53
54 if len(errors) > 0 {
55 return nil, fmt.Errorf("ValidationError: %v", errors)
56 }
57
58 return map[string]interface{}{
59 "valid": true,
60 "order_id": orderData["order_id"],
61 }, nil
62}
63
64func main() {
65 lambda.Start(handler)
66}
1// run
2// payment/main.go - Payment processing function
3package main
4
5import (
6 "context"
7 "fmt"
8 "math/rand"
9 "time"
10
11 "github.com/aws/aws-lambda-go/lambda"
12 "github.com/google/uuid"
13)
14
15type PaymentRequest struct {
16 OrderID string `json:"order_id"`
17 CustomerID string `json:"customer_id"`
18 Amount float64 `json:"amount"`
19 PaymentMethod string `json:"payment_method"`
20}
21
22type PaymentResult struct {
23 PaymentID string `json:"id"`
24 Status string `json:"status"`
25 TransactionID string `json:"transaction_id"`
26 ProcessedAt time.Time `json:"processed_at"`
27}
28
29func handler(ctx context.Context, input map[string]interface{}) (PaymentResult, error) {
30	fmt.Printf("Processing payment for order: %v\n", input["order"])
31
32 // Simulate payment processing
33 time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
34
35 // Simulate 10% payment failures
36 if rand.Float64() < 0.1 {
37 return PaymentResult{}, fmt.Errorf("PaymentDeclined: Card declined")
38 }
39
40 // Process payment
41 result := PaymentResult{
42 PaymentID: uuid.New().String(),
43 Status: "completed",
44 TransactionID: fmt.Sprintf("TXN-%d", time.Now().Unix()),
45 ProcessedAt: time.Now(),
46 }
47
48 fmt.Printf("Payment processed: %+v\n", result)
49 return result, nil
50}
51
52func main() {
53 lambda.Start(handler)
54}
1// run
2// inventory/main.go - Inventory reservation function
3package main
4
5import (
6 "context"
7 "fmt"
8 "math/rand"
9 "time"
10
11 "github.com/aws/aws-lambda-go/lambda"
12 "github.com/google/uuid"
13)
14
15type InventoryReservation struct {
16 ReservationID string `json:"reservation_id"`
17 Items []ReservedItem `json:"items"`
18 ReservedAt time.Time `json:"reserved_at"`
19 ExpiresAt time.Time `json:"expires_at"`
20}
21
22type ReservedItem struct {
23 ProductID string `json:"product_id"`
24 Quantity int `json:"quantity"`
25}
26
27func handler(ctx context.Context, input map[string]interface{}) (InventoryReservation, error) {
28 order := input["order"].(map[string]interface{})
29 items := order["items"].([]interface{})
30
31 // Simulate inventory check
32 time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
33
34 // Simulate 5% out of stock
35 if rand.Float64() < 0.05 {
36 return InventoryReservation{}, fmt.Errorf("OutOfStock: One or more items out of stock")
37 }
38
39 // Reserve inventory
40 reservedItems := []ReservedItem{}
41 for _, item := range items {
42 itemMap := item.(map[string]interface{})
43 reservedItems = append(reservedItems, ReservedItem{
44 ProductID: itemMap["product_id"].(string),
45 Quantity: int(itemMap["quantity"].(float64)),
46 })
47 }
48
49 reservation := InventoryReservation{
50 ReservationID: uuid.New().String(),
51 Items: reservedItems,
52 ReservedAt: time.Now(),
53 ExpiresAt: time.Now().Add(30 * time.Minute),
54 }
55
56 fmt.Printf("Inventory reserved: %+v\n", reservation)
57 return reservation, nil
58}
59
60func main() {
61 lambda.Start(handler)
62}
1// run
2// shipment/main.go - Shipment creation function
3package main
4
5import (
6 "context"
7 "fmt"
8 "time"
9
10 "github.com/aws/aws-lambda-go/lambda"
11 "github.com/google/uuid"
12)
13
14type Shipment struct {
15 ShipmentID string `json:"shipment_id"`
16 TrackingNumber string `json:"tracking_number"`
17 Carrier string `json:"carrier"`
18 EstimatedDelivery time.Time `json:"estimated_delivery"`
19 CreatedAt time.Time `json:"created_at"`
20}
21
22func handler(ctx context.Context, input map[string]interface{}) (Shipment, error) {
23 order := input["order"].(map[string]interface{})
24 inventory := input["inventory"].(map[string]interface{})
25
26 fmt.Printf("Creating shipment for order: %v, reservation: %v\n",
27 order["order_id"], inventory["reservation_id"])
28
29 // Create shipment
30 shipment := Shipment{
31 ShipmentID: uuid.New().String(),
32 TrackingNumber: fmt.Sprintf("TRACK-%d", time.Now().Unix()),
33 Carrier: "FastShip",
34 EstimatedDelivery: time.Now().Add(72 * time.Hour),
35 CreatedAt: time.Now(),
36 }
37
38 fmt.Printf("Shipment created: %+v\n", shipment)
39 return shipment, nil
40}
41
42func main() {
43 lambda.Start(handler)
44}
1// run
2// notification/main.go - Notification function
3package main
4
5import (
6	"context"
7	"encoding/json"
8	"fmt"
9	"github.com/aws/aws-lambda-go/lambda"
10	"github.com/aws/aws-sdk-go-v2/aws"
11	"github.com/aws/aws-sdk-go-v2/config"
12	"github.com/aws/aws-sdk-go-v2/service/sns"
13	"github.com/aws/aws-sdk-go-v2/service/sns/types"
14)
15
16var (
17 snsClient *sns.Client
18 topicARN string
19)
20
21func init() {
22	cfg, _ := config.LoadDefaultConfig(context.Background()) // error ignored for brevity; handle it in production code
23 snsClient = sns.NewFromConfig(cfg)
24 topicARN = "arn:aws:sns:REGION:ACCOUNT:order-notifications"
25}
26
27func handler(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error) {
28 notificationType := input["type"].(string)
29
30 var message string
31 var subject string
32
33 switch notificationType {
34 case "order_success":
35 order := input["order"].(map[string]interface{})
36 shipment := input["shipment"].(map[string]interface{})
37 message = fmt.Sprintf("Your order %s has been shipped! Tracking: %s",
38 order["order_id"], shipment["tracking_number"])
39 subject = "Order Shipped"
40
41 case "payment_failed":
42 order := input["order"].(map[string]interface{})
43 message = fmt.Sprintf("Payment failed for order %s", order["order_id"])
44 subject = "Payment Failed"
45
46 case "out_of_stock":
47 order := input["order"].(map[string]interface{})
48 message = fmt.Sprintf("Order %s cancelled - items out of stock", order["order_id"])
49 subject = "Order Cancelled"
50
51 default:
52 message = "Order notification"
53 subject = "Order Update"
54 }
55
56	// Send SNS notification; serialize the full input for a debug log
57	if payload, err := json.Marshal(input); err == nil { fmt.Printf("Notification payload: %s\n", payload) }
58 _, err := snsClient.Publish(ctx, &sns.PublishInput{
59 TopicArn: aws.String(topicARN),
60 Subject: aws.String(subject),
61 Message: aws.String(message),
62 MessageAttributes: map[string]types.MessageAttributeValue{
63 "type": {
64 DataType: aws.String("String"),
65 StringValue: aws.String(notificationType),
66 },
67 },
68 })
69
70 if err != nil {
71 return nil, err
72 }
73
74 fmt.Printf("Notification sent: %s - %s\n", subject, message)
75 return map[string]interface{}{
76 "notification_sent": true,
77 "type": notificationType,
78 }, nil
79}
80
81func main() {
82 lambda.Start(handler)
83}
Explanation:
This exercise demonstrates complex serverless orchestration with Step Functions:
- State Machine Definition: JSON DSL defining the complete workflow
- Error Handling: Catch blocks for specific errors, retry policies for transient failures
- Compensation Logic: Automatic rollback (refund payment, release inventory) on failure
- Multiple Paths: Success path and various failure paths with appropriate notifications
- Integration: Coordinates multiple Lambda functions and SNS
- Workflow State: Step Functions maintains state across long-running processes
- Observability: Complete execution history and CloudWatch integration
Key Patterns:
- Saga pattern for distributed transactions with compensation
- Retry with exponential backoff for transient failures
- Error-specific handling for different failure scenarios
- State passing between functions via Step Functions
- Parallel execution capability (can be added with Parallel state)
Real-World Benefits:
- Handles failures gracefully without data inconsistency
- Visual workflow editor for non-developers
- Automatic retries and error handling
- Built-in logging and monitoring
- Can run for up to 1 year
- Survives Lambda failures and redeployments
Production Usage: This pattern is essential for:
- Order processing systems
- ETL pipelines
- Video processing workflows
- Multi-step approval processes
- Complex business workflows
Summary
Serverless computing with Go offers excellent performance and cost efficiency:
Key Takeaways
- Fast Cold Starts: Go's compiled nature provides sub-100ms cold starts
- Multiple Platforms: AWS Lambda, Knative, OpenFaaS, Azure, GCP
- Event-Driven: Integrates with various event sources
- Cost Effective: Pay per invocation, no idle costs
- Production Ready: Retry mechanisms, monitoring, error handling
- Optimization: Binary size reduction, connection pooling, provisioned concurrency
Best Practices
- Initialize outside handler for connection reuse
- Optimize binary size with build flags
- Use environment variables for configuration
- Implement proper error handling with retry logic
- Enable monitoring with X-Ray or OpenTelemetry
- Consider provisioned concurrency for latency-critical apps
- Use appropriate event sources for your use case