Message Queues and Event Streaming

Why This Matters - The Digital Communication Revolution

Consider a bustling e-commerce platform during Black Friday sales. Thousands of customers are simultaneously placing orders, making payments, and expecting confirmation emails. If each action had to wait for the previous one to complete, the system would grind to a halt, customers would abandon their carts, and revenue would be lost.

This is where message queues become the unsung heroes of modern distributed systems. They act like a sophisticated digital postal service, ensuring that every message is delivered reliably while allowing different parts of your system to work independently, at their own pace.

Real-world impact: Large platforms such as Netflix, Uber, and Amazon rely on messaging infrastructure to decouple workloads like streaming telemetry, ride dispatch, and order processing. Without asynchronous messaging, operating these systems at their current scale would be far harder.

Learning Objectives

By the end of this article, you will be able to:

  1. Distinguish between message queues and event streams and choose the right tool for each scenario
  2. Implement production-ready producers and consumers for Kafka, RabbitMQ, and NATS
  3. Design reliable message patterns with proper error handling and delivery guarantees
  4. Apply serialization strategies for efficient data transfer
  5. Build resilient systems that can handle failures and maintain message integrity
  6. Monitor and troubleshoot message systems in production environments

Core Concepts - Understanding the Messaging Landscape

The Fundamental Difference: Queues vs Streams

Before diving into implementations, let's understand the crucial distinction that many developers overlook:

Message Queues are like postal mail delivery - each message goes to a single recipient and is removed from the queue once that recipient acknowledges it. They're well suited to task distribution and one-time operations.

// Message Queue Pattern: Point-to-point delivery
// Think: Send this specific order to the payment processor
type OrderMessage struct {
    OrderID    string
    CustomerID string
    Amount     float64
    Items      []OrderItem // OrderItem is assumed to be defined elsewhere
}

// Once the payment processor acknowledges this message, the broker removes it.
// Each order is handled by a single consumer. Most brokers deliver at least
// once, so the handler should tolerate an occasional redelivery.

Event Streams are like newspaper subscriptions - multiple readers can see the same content, and it's preserved for future reference. They excel at audit trails and analytics.

// Event Stream Pattern: Publish-subscribe with retention
// Think: Record every price change for analytics and reporting
type PriceChangeEvent struct {
    ProductID string
    OldPrice  float64
    NewPrice  float64
    ChangedAt time.Time
    ChangedBy string
}

// Multiple services can read this independently:
// - Analytics service tracks price trends
// - Audit service logs all changes
// - Notification service alerts customers of discounts

💡 Critical Insight: The choice between queues and streams is less about technology than about architecture. Ask yourself: "Do I need to deliver this message once to one recipient, or retain it so that multiple readers can consume and replay it?"
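To make the distinction concrete, here is a minimal in-memory sketch of the two delivery models. The `Queue` and `Log` types are hypothetical illustrations, not the API of any broker: the queue removes a message when it is received, while the log keeps every record and lets each named reader advance its own offset.

```go
package main

import "fmt"

// Queue models point-to-point delivery: a message is removed once received.
type Queue struct{ items []string }

func (q *Queue) Send(m string) { q.items = append(q.items, m) }

// Receive pops the oldest message; ok is false when the queue is empty.
func (q *Queue) Receive() (m string, ok bool) {
	if len(q.items) == 0 {
		return "", false
	}
	m, q.items = q.items[0], q.items[1:]
	return m, true
}

// Log models an event stream: records are retained and each reader
// tracks its own position.
type Log struct {
	records []string
	offsets map[string]int // reader name -> next offset to read
}

func NewLog() *Log { return &Log{offsets: make(map[string]int)} }

func (l *Log) Append(r string) { l.records = append(l.records, r) }

// Read returns the next record for the named reader without deleting it.
func (l *Log) Read(reader string) (r string, ok bool) {
	off := l.offsets[reader]
	if off >= len(l.records) {
		return "", false
	}
	l.offsets[reader] = off + 1
	return l.records[off], true
}

func main() {
	q := &Queue{}
	q.Send("order-1")
	m, _ := q.Receive()
	_, again := q.Receive()
	fmt.Println(m, again) // the second Receive finds nothing

	l := NewLog()
	l.Append("price-changed")
	a, _ := l.Read("analytics")
	b, _ := l.Read("audit")
	fmt.Println(a, b) // both readers see the same record
}
```

Real brokers add acknowledgments, persistence, and retention limits on top of these two shapes, but the core difference is the one shown here: who owns the read position, and whether reading consumes the message.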

When to Use Each Pattern

Choose Message Queues when:

  • You need to distribute work among multiple workers
  • Messages represent commands to be executed
  • Order matters within a single conversation
  • You need guaranteed delivery with retry logic
  • Each message should be processed exactly once

// Perfect for queues:
tasks := []string{
    "send_welcome_email",
    "process_payment",
    "update_inventory",
    "generate_invoice",
}
// Each task executed once by an available worker
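The work-distribution case can be sketched with nothing more than a Go channel standing in for the queue. This is an illustration of the competing-consumers idea only; `runWorkers` is a hypothetical helper, and a real broker would add acknowledgments and redelivery on failure.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers distributes tasks across n workers through a shared channel
// and returns how many times each task was handled.
func runWorkers(tasks []string, n int) map[string]int {
	queue := make(chan string)
	handled := make(map[string]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// A value received from the channel is gone for every other worker.
			for task := range queue {
				mu.Lock()
				handled[task]++
				mu.Unlock()
			}
		}()
	}

	for _, t := range tasks {
		queue <- t
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	tasks := []string{"send_welcome_email", "process_payment", "update_inventory", "generate_invoice"}
	for task, count := range runWorkers(tasks, 3) {
		fmt.Printf("%s handled %d time(s)\n", task, count)
	}
}
```

Which worker picks up which task varies between runs, but every task is handled by exactly one of them.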

Choose Event Streams when:

  • You need to maintain a complete history of events
  • Multiple services need to react to the same events
  • You're implementing event sourcing or CQRS
  • Audit trails and replay capabilities are required
  • Real-time analytics are important

// Perfect for streams:
events := []string{
    "user_registered",
    "profile_updated",
    "password_changed",
    "account_deleted",
}
// Multiple services: analytics, notifications, audit, etc.

Practical Examples - Building Production-Ready Message Systems

Apache Kafka: The Distributed Event Log

Kafka isn't just a messaging system - it's a distributed commit log that provides the backbone for data-intensive applications. Think of it as a massive, replicated journal in which every event is appended to a partition in arrival order. Ordering is guaranteed within a partition, not across the whole topic.

Core Kafka Architecture

package kafka

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

/*
Kafka Architecture Visualized:

Topic: orders
├── Partition 0: [msg0, msg1, msg2, ...]  offset: 0, 1, 2, ...
├── Partition 1: [msg3, msg4, msg5, ...]  offset: 0, 1, 2, ...
└── Partition 2: [msg6, msg7, msg8, ...]  offset: 0, 1, 2, ...

Real-world meaning:
- Topic: A category of events
- Partition: Parallel processing unit
- Offset: Sequential position of a message within its partition
- Ordering: Guaranteed within a partition
- Parallelism: Multiple partitions enable concurrent processing
*/
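Keyed messages land on a partition chosen by hashing the key, which is why all events for one entity stay in order. The sketch below shows the idea with FNV-1a from the standard library. The `partitionFor` function is hypothetical, and the hash is an assumption made for illustration: real Kafka clients use their own partitioners, so the partition numbers here will not match what a broker assigns.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition index in [0, numPartitions).
// FNV-1a is used purely for illustration; real clients use their own hash.
// numPartitions must be greater than zero.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always maps to the same partition.
	for _, key := range []string{"customer-1", "customer-2", "customer-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 3))
	}
}
```

One consequence worth remembering: because the result depends on the partition count, adding partitions to a topic changes where existing keys map, which breaks per-key ordering across the change.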

Production-Ready Kafka Producer

type Producer struct {
    producer *kafka.Producer
    topic    string
}

// NewProducer creates a producer tuned for durability and throughput.
func NewProducer(brokers, topic string) (*Producer, error) {
    config := &kafka.ConfigMap{
        "bootstrap.servers": brokers,

        // Performance tuning for high throughput
        "compression.type": "snappy", // Compress messages to save bandwidth
        "batch.size":       16384,    // Batch messages for efficiency
        "linger.ms":        10,       // Wait up to 10ms to batch
        "acks":             "all",    // Wait for all in-sync replicas

        // Reliability and durability
        "enable.idempotence":                    true, // Prevent duplicates caused by producer retries
        "max.in.flight.requests.per.connection": 5,
        "retries":                               10, // Retry on transient failures

        // Monitoring and observability
        "statistics.interval.ms": 60000, // Emit stats every minute
    }

    producer, err := kafka.NewProducer(config)
    if err != nil {
        return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
    }

    return &Producer{
        producer: producer,
        topic:    topic,
    }, nil
}

// ProduceEvent sends an event with proper partitioning and blocks until
// the broker confirms delivery or the timeout expires.
func (p *Producer) ProduceEvent(key string, event interface{}) error {
    // Serialize event to JSON
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to serialize event: %w", err)
    }

    // Create message with partitioning key.
    // All events with the same key go to the same partition,
    // which keeps related events in order.
    message := &kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &p.topic,
            Partition: kafka.PartitionAny, // Let the partitioner decide based on key
        },
        Key:   []byte(key), // Partitioning key
        Value: data,        // Event data
        Headers: []kafka.Header{
            {Key: "timestamp", Value: []byte(time.Now().Format(time.RFC3339))},
            {Key: "version", Value: []byte("1.0")},
        },
    }

    // Enqueue the message; the delivery report arrives on deliveryChan
    deliveryChan := make(chan kafka.Event, 1)
    err = p.producer.Produce(message, deliveryChan)
    if err != nil {
        return fmt.Errorf("failed to produce message: %w", err)
    }

    // Wait for delivery report
    select {
    case e := <-deliveryChan:
        m, ok := e.(*kafka.Message)
        if !ok {
            return fmt.Errorf("unexpected delivery event: %v", e)
        }
        if m.TopicPartition.Error != nil {
            return fmt.Errorf("message delivery failed: %w", m.TopicPartition.Error)
        }
        log.Printf("Event delivered to %s [partition %d] at offset %v",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
        return nil

    case <-time.After(10 * time.Second):
        return fmt.Errorf("message delivery timeout")
    }
}

// Example: Processing user orders
type OrderEvent struct {
    OrderID    string    `json:"order_id"`
    CustomerID string    `json:"customer_id"`
    Amount     float64   `json:"amount"`
    Timestamp  time.Time `json:"timestamp"`
}

func (p *Producer) PublishOrder(order OrderEvent) error {
    // Use customer_id as partitioning key
    // This ensures all orders for a customer are processed in order
    return p.ProduceEvent(order.CustomerID, order)
}

Production-Ready Kafka Consumer

type Consumer struct {
    consumer   *kafka.Consumer
    handler    func(*kafka.Message) error
    maxRetries int
}

// NewConsumer creates a consumer in the given group and subscribes it to topic.
func NewConsumer(brokers, groupID, topic string, handler func(*kafka.Message) error, maxRetries int) (*Consumer, error) {
    config := &kafka.ConfigMap{
        "bootstrap.servers": brokers,
        "group.id":          groupID,

        // Consumer behavior for reliability
        "auto.offset.reset":     "earliest", // Start from beginning if no offset stored
        "enable.auto.commit":    false,      // Manual commit for control
        "session.timeout.ms":    30000,      // 30 seconds
        "heartbeat.interval.ms": 3000,       // 3 seconds

        // Performance tuning
        "fetch.min.bytes":           1024,    // Wait for at least 1KB
        "fetch.wait.max.ms":         500,     // But don't wait more than 500ms
        "max.partition.fetch.bytes": 1048576, // 1MB per partition max
    }

    consumer, err := kafka.NewConsumer(config)
    if err != nil {
        return nil, fmt.Errorf("failed to create Kafka consumer: %w", err)
    }

    // Subscribe to topic
    err = consumer.SubscribeTopics([]string{topic}, nil)
    if err != nil {
        consumer.Close()
        return nil, fmt.Errorf("failed to subscribe to topic: %w", err)
    }

    return &Consumer{
        consumer:   consumer,
        handler:    handler,
        maxRetries: maxRetries,
    }, nil
}

// Consume starts the message consumption loop with retry logic
func (c *Consumer) Consume(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            msg, err := c.consumer.ReadMessage(5 * time.Second)
            if err != nil {
                // A timeout just means no message arrived; keep polling
                if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
                    continue
                }
                log.Printf("Consumer error: %v", err)
                continue
            }

            // Process message with retry logic
            if err := c.processMessageWithRetry(ctx, msg); err != nil {
                log.Printf("Failed to process message after %d retries: %v", c.maxRetries, err)
                // Send to dead letter queue or handle appropriately.
                // Note: the offset is not committed here, but committing a later
                // message in this partition will move past this one.
                continue
            }
        }
    }
}

// processMessageWithRetry handles message processing with exponential backoff
func (c *Consumer) processMessageWithRetry(ctx context.Context, msg *kafka.Message) error {
    var lastErr error

    for attempt := 0; attempt <= c.maxRetries; attempt++ {
        if attempt > 0 {
            // Exponential backoff: 1s, 2s, 4s, 8s, 16s
            backoff := time.Duration(1<<uint(attempt-1)) * time.Second
            if backoff > 30*time.Second {
                backoff = 30 * time.Second // Cap at 30 seconds
            }

            log.Printf("Retry attempt %d for message at offset %v", attempt, msg.TopicPartition.Offset)
            select {
            case <-time.After(backoff):
            case <-ctx.Done():
                return ctx.Err()
            }
        }

        // Process the message
        if err := c.handler(msg); err != nil {
            lastErr = err
            log.Printf("Processing attempt %d failed: %v", attempt+1, err)
            continue
        }

        // Success - commit the offset
        _, err := c.consumer.CommitMessage(msg)
        if err != nil {
            return fmt.Errorf("failed to commit message: %w", err)
        }

        log.Printf("Successfully processed and committed message at offset %v", msg.TopicPartition.Offset)
        return nil
    }

    // One initial attempt plus maxRetries retries
    return fmt.Errorf("message processing failed after %d attempts: %w", c.maxRetries+1, lastErr)
}

// Example: Processing order events
func NewOrderProcessor(brokers, groupID, topic string) (*Consumer, error) {
    handler := func(msg *kafka.Message) error {
        var order OrderEvent
        if err := json.Unmarshal(msg.Value, &order); err != nil {
            return fmt.Errorf("failed to unmarshal order event: %w", err)
        }

        // Process the order
        log.Printf("Processing order %s for customer %s, amount $%.2f",
            order.OrderID, order.CustomerID, order.Amount)

        // Simulate processing that might fail
        if time.Now().UnixNano()%10 == 0 { // Roughly 10% failure rate for demo
            return fmt.Errorf("simulated processing failure")
        }

        // Update inventory, send notifications, etc.
        // processOrder is assumed to be defined elsewhere.
        return processOrder(order)
    }

    // Go through NewConsumer so the underlying Kafka consumer is initialized
    return NewConsumer(brokers, groupID, topic, handler, 5)
}
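The retry delays used by the consumer can be isolated into a small pure function, which makes the schedule easy to test. This is a sketch of capped exponential backoff; `backoffFor` is a hypothetical helper name, and it doubles in a loop rather than shifting so that a large attempt count cannot overflow past the cap.

```go
package main

import (
	"fmt"
	"time"
)

// backoffFor returns the delay before retry number attempt (1-based):
// base, 2*base, 4*base, ... capped at limit.
func backoffFor(attempt int, base, limit time.Duration) time.Duration {
	if attempt < 1 {
		return 0
	}
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= limit {
			return limit
		}
	}
	if d > limit {
		return limit
	}
	return d
}

func main() {
	// With a 1s base and 30s cap: 1s, 2s, 4s, 8s, 16s, then 30s.
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Println(attempt, backoffFor(attempt, time.Second, 30*time.Second))
	}
}
```

Many production systems also add random jitter to each delay so that consumers that failed together do not retry together; that is left out here to keep the schedule deterministic.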

RabbitMQ: The Intelligent Message Router

RabbitMQ excels at sophisticated routing scenarios where you need precise control over message delivery. Think of it as an advanced postal service that can sort, prioritize, and deliver messages with incredible precision.

Exchange Patterns: The Power of Smart Routing

package rabbitmq

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

type RabbitClient struct {
    conn    *amqp.Connection
    channel *amqp.Channel
}

func NewRabbitClient(url string) (*RabbitClient, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
    }

    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, fmt.Errorf("failed to open RabbitMQ channel: %w", err)
    }

    // Set QoS to limit unacknowledged messages
    err = ch.Qos(
        10,    // prefetch count
        0,     // prefetch size
        false, // global
    )
    if err != nil {
        ch.Close()
        conn.Close()
        return nil, fmt.Errorf("failed to set QoS: %w", err)
    }

    return &RabbitClient{
        conn:    conn,
        channel: ch,
    }, nil
}

// Exchange patterns explained with real-world examples
func (rc *RabbitClient) SetupExchangePatterns() error {
    /*
    Exchange Types Explained:

    1. Direct Exchange: Like specific mailboxes
       - Message routing: exact key match
       - Use case: Send different types of notifications to specific handlers
       - Example: "error" messages go to error handling service

    2. Fanout Exchange: Like town crier announcements
       - Message routing: broadcast to all queues
       - Use case: Notify multiple services of the same event
       - Example: System maintenance notifications

    3. Topic Exchange: Like smart mail sorting
       - Message routing: pattern matching with wildcards
       - Use case: Route messages based on hierarchical categories
       - Example: "user.created" and "user.updated" go to user service
    */

    // Direct Exchange for targeted messaging
    err := rc.channel.ExchangeDeclare(
        "notifications.direct", // exchange name
        "direct",               // exchange type
        true,                   // durable
        false,                  // auto-delete
        false,                  // internal
        false,                  // no-wait
        nil,                    // arguments
    )
    if err != nil {
        return fmt.Errorf("failed to declare direct exchange: %w", err)
    }

    // Fanout Exchange for broadcasting
    err = rc.channel.ExchangeDeclare(
        "notifications.broadcast",
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return fmt.Errorf("failed to declare fanout exchange: %w", err)
    }

    // Topic Exchange for pattern-based routing
    err = rc.channel.ExchangeDeclare(
        "events.topic",
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return fmt.Errorf("failed to declare topic exchange: %w", err)
    }

    return nil
}

// Direct Exchange Usage: Targeted Notifications
// The Notification type is assumed to be defined elsewhere.
func (rc *RabbitClient) SendDirectNotification(routingKey string, notification Notification) error {
    body, err := json.Marshal(notification)
    if err != nil {
        return fmt.Errorf("failed to marshal notification: %w", err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    err = rc.channel.PublishWithContext(
        ctx,
        "notifications.direct", // exchange
        routingKey,             // routing key
        false,                  // mandatory (false: unroutable messages are dropped)
        false,                  // immediate
        amqp.Publishing{
            ContentType:  "application/json",
            DeliveryMode: amqp.Persistent, // Persist the message in durable queues across broker restarts
            Timestamp:    time.Now(),
            Body:         body,
        },
    )
    if err != nil {
        return fmt.Errorf("failed to publish notification: %w", err)
    }

    log.Printf("Sent %s notification via direct exchange", routingKey)
    return nil
}

// Topic Exchange Usage: Event-Driven Architecture
// eventSource and generateEventID are helpers assumed to be defined elsewhere.
func (rc *RabbitClient) PublishEvent(eventType string, eventData interface{}) error {
    // Create routing key pattern like "user.created", "order.completed", "payment.failed"
    routingKey := fmt.Sprintf("%s.%s", eventSource(eventData), eventType)

    body, err := json.Marshal(eventData)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    err = rc.channel.PublishWithContext(
        ctx,
        "events.topic",
        routingKey,
        false,
        false,
        amqp.Publishing{
            ContentType:  "application/json",
            DeliveryMode: amqp.Persistent,
            Timestamp:    time.Now(),
            Body:         body,
            Headers: amqp.Table{
                "event_id":   generateEventID(),
                "event_type": eventType,
                "timestamp":  time.Now().Unix(),
            },
        },
    )
    if err != nil {
        return fmt.Errorf("failed to publish event: %w", err)
    }

    log.Printf("Published event: %s", routingKey)
    return nil
}

// Setup consumers with different routing patterns
// consumeQueue and the handle*Event methods are assumed to be defined elsewhere.
func (rc *RabbitClient) SetupEventConsumers() error {
    /*
    Pattern Matching Examples ("*" matches exactly one word):

    user.*      -> Matches: user.created, user.updated, user.deleted
    *.created   -> Matches: user.created, order.created, payment.created
    order.*     -> Matches: order.created, order.updated, order.completed
    *.payment.* -> Matches: user.payment.completed, order.payment.failed
    */

    // User service consumes all user events
    userQueue, err := rc.setupQueueForPatterns("user.service", []string{
        "user.*", // Match all user events
    })
    if err != nil {
        return err
    }

    // Analytics service consumes all creation and deletion events
    analyticsQueue, err := rc.setupQueueForPatterns("analytics.service", []string{
        "*.created", // Match all creation events
        "*.deleted", // And all deletion events
    })
    if err != nil {
        return err
    }

    // Notification service consumes specific events
    notificationQueue, err := rc.setupQueueForPatterns("notification.service", []string{
        "order.completed", // Order completion notifications
        "payment.failed",  // Payment failure alerts
        "user.created",    // Welcome emails
    })
    if err != nil {
        return err
    }

    // Start consumers
    go rc.consumeQueue(userQueue, rc.handleUserEvent)
    go rc.consumeQueue(analyticsQueue, rc.handleAnalyticsEvent)
    go rc.consumeQueue(notificationQueue, rc.handleNotificationEvent)

    return nil
}

func (rc *RabbitClient) setupQueueForPatterns(queueName string, patterns []string) (amqp.Queue, error) {
    // Declare the queue
    queue, err := rc.channel.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        amqp.Table{
            "x-message-ttl":          86400000, // 24 hours TTL
            "x-max-length":           10000,    // Max queue length
            "x-dead-letter-exchange": "dlx",    // Dead letter exchange (must be declared separately)
        },
    )
    if err != nil {
        return amqp.Queue{}, fmt.Errorf("failed to declare queue %s: %w", queueName, err)
    }

    // Bind the queue to the topic exchange with each pattern
    for _, pattern := range patterns {
        err = rc.channel.QueueBind(
            queue.Name,     // queue name
            pattern,        // routing key pattern
            "events.topic", // exchange
            false,          // no-wait
            nil,            // arguments
        )
        if err != nil {
            return amqp.Queue{}, fmt.Errorf("failed to bind queue %s to pattern %s: %w", queueName, pattern, err)
        }
    }

    return queue, nil
}
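To see why a binding such as `user.*` matches `user.created` but not `user.profile.updated`, it can help to write the matching rule out. The sketch below follows the AMQP topic convention, in which routing keys are dot-separated words, `*` matches exactly one word, and `#` matches zero or more. `matchTopic` is a hypothetical function written for this article; the broker performs this matching itself, and this is not how RabbitMQ implements it internally.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether routingKey matches an AMQP-style topic pattern,
// where "*" matches exactly one word and "#" matches zero or more words.
func matchTopic(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" either absorbs nothing, or absorbs one word and tries again.
		if matchWords(pattern[1:], key) {
			return true
		}
		return len(key) > 0 && matchWords(pattern, key[1:])
	case "*":
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

func main() {
	fmt.Println(matchTopic("user.*", "user.created"))         // one word after "user"
	fmt.Println(matchTopic("user.*", "user.profile.updated")) // two words: no match
	fmt.Println(matchTopic("user.#", "user.profile.updated")) // "#" spans both words
}
```

A binding that should catch every event under a prefix regardless of depth needs `#` rather than `*`.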

NATS: The High-Speed Messenger

NATS prioritizes simplicity and performance, making it perfect for microservices that need lightning-fast messaging without the complexity of Kafka or RabbitMQ.

Core NATS Implementation

package nats

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

type NATSClient struct {
    conn *nats.Conn
    js   jetstream.JetStream
}

func NewNATSClient(url string) (*NATSClient, error) {
    // Connect with reconnection settings
    conn, err := nats.Connect(url,
        nats.MaxReconnects(10),            // Try to reconnect 10 times
        nats.ReconnectWait(2*time.Second), // Wait 2 seconds between reconnects
        nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
            log.Printf("NATS disconnected: %v", err)
        }),
        nats.ReconnectHandler(func(nc *nats.Conn) {
            log.Printf("NATS reconnected to %s", nc.ConnectedUrl())
        }),
        nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
            log.Printf("NATS error: %v", err)
        }),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to connect to NATS: %w", err)
    }

    // Initialize JetStream for persistence
    js, err := jetstream.New(conn)
    if err != nil {
        conn.Close()
        return nil, fmt.Errorf("failed to initialize JetStream: %w", err)
    }

    return &NATSClient{
        conn: conn,
        js:   js,
    }, nil
}

// Simple pub/sub for high-speed messaging
func (nc *NATSClient) PublishEvent(subject string, event interface{}) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    // Simple publish - fire and forget for maximum speed.
    // Core NATS is at-most-once: if no subscriber is listening, the message is lost.
    err = nc.conn.Publish(subject, data)
    if err != nil {
        return fmt.Errorf("failed to publish to subject %s: %w", subject, err)
    }

    return nil
}

// Request-Reply pattern for synchronous communication
func (nc *NATSClient) Request(subject string, request interface{}, timeout time.Duration) (interface{}, error) {
    data, err := json.Marshal(request)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal request: %w", err)
    }

    // Send request and wait for response
    msg, err := nc.conn.Request(subject, data, timeout)
    if err != nil {
        return nil, fmt.Errorf("request to %s failed: %w", subject, err)
    }

    var response interface{}
    err = json.Unmarshal(msg.Data, &response)
    if err != nil {
        return nil, fmt.Errorf("failed to unmarshal response: %w", err)
    }

    return response, nil
}

// Set up a service that responds to requests
func (nc *NATSClient) Respond(subject string, handler func(interface{}) (interface{}, error)) error {
    _, err := nc.conn.Subscribe(subject, func(msg *nats.Msg) {
        var request interface{}
        if err := json.Unmarshal(msg.Data, &request); err != nil {
            log.Printf("Failed to unmarshal request on %s: %v", subject, err)
            return
        }

        // Process the request
        response, err := handler(request)
        if err != nil {
            log.Printf("Handler error on %s: %v", subject, err)
            return
        }

        // Send response
        responseData, err := json.Marshal(response)
        if err != nil {
            log.Printf("Failed to marshal response on %s: %v", subject, err)
            return
        }

        if err := msg.Respond(responseData); err != nil {
            log.Printf("Failed to send response on %s: %v", subject, err)
        }
    })
    if err != nil {
        return fmt.Errorf("failed to subscribe to %s: %w", subject, err)
    }

    return nil
}

// JetStream for persistent messaging
func (nc *NATSClient) CreatePersistentStream(streamName string, subjects []string) error {
    stream, err := nc.js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
        Name:     streamName,
        Subjects: subjects,

        // Retention policy - keep messages for 24 hours or up to 1GB
        Retention: jetstream.LimitsPolicy,
        MaxAge:    24 * time.Hour,
        MaxBytes:  1024 * 1024 * 1024, // 1GB

        // Storage and replication
        Storage:  jetstream.FileStorage,
        Replicas: 3, // Replicate across 3 nodes for HA (requires a cluster of at least 3 servers)
    })
    if err != nil {
        return fmt.Errorf("failed to create stream %s: %w", streamName, err)
    }

    log.Printf("Created persistent stream %s with config: %+v", streamName, stream.CachedInfo().Config)
    return nil
}

// Publish to persistent stream with acknowledgment
func (nc *NATSClient) PublishPersistent(subject string, event interface{}) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal event: %w", err)
    }

    // Publish with acknowledgment for durability
    _, err = nc.js.Publish(context.Background(), subject, data)
    if err != nil {
        return fmt.Errorf("failed to publish persistent message: %w", err)
    }

    return nil
}

// Create durable consumer for reliable processing
func (nc *NATSClient) CreateDurableConsumer(stream, consumer string, handler func([]byte) error) error {
    cons, err := nc.js.CreateOrUpdateConsumer(context.Background(), stream, jetstream.ConsumerConfig{
        Name:          consumer,
        Durable:       consumer, // Durable takes the consumer name, not a bool
        FilterSubject: ">",      // All subjects in stream
        AckPolicy:     jetstream.AckExplicitPolicy,
        AckWait:       30 * time.Second,
        MaxDeliver:    3, // Deliver each message at most 3 times in total
    })
    if err != nil {
        return fmt.Errorf("failed to create consumer %s: %w", consumer, err)
    }

    // Start consuming. Consume is a method on the consumer, not on JetStream.
    // The returned ConsumeContext is discarded here; keep it if you need to
    // stop consumption later.
    _, err = cons.Consume(func(msg jetstream.Msg) {
        if err := handler(msg.Data()); err != nil {
            log.Printf("Handler error for consumer %s: %v", consumer, err)
            msg.Nak() // Negative acknowledgment
            return
        }
        msg.Ack() // Acknowledge successful processing
    })
    if err != nil {
        return fmt.Errorf("failed to start consumer %s: %w", consumer, err)
    }

    return nil
}
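The interaction between explicit acknowledgment and a delivery limit is easier to reason about in isolation. The following sketch simulates it in memory: a handler error stands in for a negative acknowledgment, and the message is redelivered until it is acknowledged or the limit is reached. `deliver` and `errTransient` are hypothetical names, and nothing here calls the NATS API; a real server also waits between redeliveries.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient stands in for any recoverable handler failure.
var errTransient = errors.New("transient failure")

// deliver simulates explicit-ack redelivery: the handler is invoked until it
// returns nil (ack) or maxDeliver deliveries have been made. It returns the
// number of deliveries and whether the message was acknowledged.
func deliver(payload []byte, maxDeliver int, handler func([]byte) error) (attempts int, acked bool) {
	for attempts < maxDeliver {
		attempts++
		if err := handler(payload); err == nil {
			return attempts, true // ack: stop redelivering
		}
		// nak: loop around and redeliver
	}
	return attempts, false
}

func main() {
	calls := 0
	flaky := func([]byte) error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}
	attempts, acked := deliver([]byte("order-1"), 3, flaky)
	fmt.Println(attempts, acked) // succeeds on the third and final delivery
}
```

Note that the limit counts total deliveries, not retries: with a limit of 3, a message that fails every time is handed to the handler three times and then given up on.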

Common Patterns and Pitfalls - Learning from Real Experience

Pattern 1: Exactly-Once Processing with Idempotency

Problem: Network issues and retries can cause duplicate message processing, leading to data inconsistency.

Solution: Assume at-least-once delivery and make processing idempotent, so that handling the same message multiple times produces the same result as handling it once.

// Requires the "sync" package in addition to the imports shown earlier.
type IdempotentProcessor struct {
    processedCache map[string]bool
    mu             sync.RWMutex
    ttl            time.Duration
}

func NewIdempotentProcessor() *IdempotentProcessor {
    processor := &IdempotentProcessor{
        processedCache: make(map[string]bool),
        ttl:            24 * time.Hour,
    }

    // Start cleanup goroutine
    go processor.cleanup()
    return processor
}

func (ip *IdempotentProcessor) ProcessWithIdempotency(msgID string, handler func() error) error {
    ip.mu.RLock()
    if processed, exists := ip.processedCache[msgID]; exists && processed {
        ip.mu.RUnlock()
        log.Printf("Message %s already processed, skipping", msgID)
        return nil
    }
    ip.mu.RUnlock()

    // Acquire write lock for the actual processing.
    // Holding the lock while the handler runs serializes all processing;
    // that keeps this example simple but limits throughput.
    ip.mu.Lock()
    defer ip.mu.Unlock()

    // Double-check after acquiring write lock
    if processed, exists := ip.processedCache[msgID]; exists && processed {
        return nil
    }

    // Process the message
    if err := handler(); err != nil {
        return fmt.Errorf("processing failed for message %s: %w", msgID, err)
    }

    // Mark as processed
    ip.processedCache[msgID] = true
    log.Printf("Successfully processed message %s", msgID)
    return nil
}

func (ip *IdempotentProcessor) cleanup() {
    ticker := time.NewTicker(ip.ttl / 2)
    for range ticker.C {
        ip.mu.Lock()
        // In production, you'd track timestamps for proper cleanup
        if len(ip.processedCache) > 1000000 { // Simple size-based cleanup
            // Clear old entries - implement proper time-based cleanup in production
            ip.processedCache = make(map[string]bool)
            log.Printf("Cleared processed messages cache")
        }
        ip.mu.Unlock()
    }
}

// A single shared processor: creating a new one per message would start
// with an empty cache every time and never detect a duplicate.
var orderProcessor = NewIdempotentProcessor()

// Usage example:
func ProcessOrderMessage(msg *kafka.Message) error {
    // Generate unique ID from message metadata.
    // This detects redelivery of the same record; duplicates written by a
    // producer get different offsets and need a business-level ID instead.
    msgID := fmt.Sprintf("%s-%d-%d",
        *msg.TopicPartition.Topic,
        msg.TopicPartition.Partition,
        msg.TopicPartition.Offset)

    return orderProcessor.ProcessWithIdempotency(msgID, func() error {
        var order OrderEvent
        if err := json.Unmarshal(msg.Value, &order); err != nil {
            return err
        }

        // Process order
        return processOrder(order)
    })
}
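The cleanup routine above notes that a production version would track timestamps rather than clearing the whole cache. One possible shape for that is sketched below, assuming an in-memory set is acceptable. `TTLSet` and `fakeClock` are hypothetical names introduced here, and the clock is injected so that expiry can be exercised without sleeping.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeClock is a manually advanced clock used in place of time.Now.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

// TTLSet remembers message IDs for a limited time.
type TTLSet struct {
	mu   sync.Mutex
	seen map[string]time.Time // message ID -> time it was recorded
	ttl  time.Duration
	now  func() time.Time
}

func NewTTLSet(ttl time.Duration, now func() time.Time) *TTLSet {
	return &TTLSet{seen: make(map[string]time.Time), ttl: ttl, now: now}
}

// MarkIfNew records id and returns true if it was not seen within the TTL.
func (s *TTLSet) MarkIfNew(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	t := s.now()
	if at, ok := s.seen[id]; ok && t.Sub(at) < s.ttl {
		return false
	}
	s.seen[id] = t
	return true
}

// Sweep deletes expired entries and returns how many were removed.
func (s *TTLSet) Sweep() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	t := s.now()
	removed := 0
	for id, at := range s.seen {
		if t.Sub(at) >= s.ttl {
			delete(s.seen, id)
			removed++
		}
	}
	return removed
}

func main() {
	clock := &fakeClock{}
	set := NewTTLSet(time.Hour, clock.Now)
	fmt.Println(set.MarkIfNew("orders-0-42")) // first sighting
	fmt.Println(set.MarkIfNew("orders-0-42")) // duplicate within the TTL
	clock.Advance(2 * time.Hour)
	fmt.Println(set.Sweep()) // the entry has expired and is removed
}
```

Two caveats apply to this sketch. It marks an ID before any processing happens, so a caller that wants retries after a failed handler would need to remove the ID again. And like the original cache, it lives in one process: a restart or a second consumer instance starts with no memory of what was processed, so durable deduplication needs a shared store.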

Pattern 2: Dead Letter Queues for Failed Messages

Problem: Messages that repeatedly fail can block processing and create infinite retry loops.

Solution: Route failed messages to a dead letter queue for later analysis and manual intervention.

type DeadLetterHandler struct {
    producer *kafka.Producer
    dlqTopic string
}

func NewDeadLetterHandler(brokers string) (*DeadLetterHandler, error) {
    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": brokers,
        "acks":              "all",
    })
    if err != nil {
        return nil, err
    }

    return &DeadLetterHandler{
        producer: producer,
        dlqTopic: "orders.dlq",
    }, nil
}

func (dlh *DeadLetterHandler) SendToDLQ(originalMsg *kafka.Message, processingErr error, retryCount int) error {
    dlqMessage := DLQMessage{
        OriginalMessage: originalMsg.Value,
        Error:           processingErr.Error(),
        RetryCount:      retryCount,
        OriginalTopic:   *originalMsg.TopicPartition.Topic,
        OriginalOffset:  int64(originalMsg.TopicPartition.Offset), // kafka.Offset -> int64
        FailedAt:        time.Now(),
    }

    dlqData, err := json.Marshal(dlqMessage)
    if err != nil {
        return fmt.Errorf("failed to marshal DLQ message: %w", err)
    }

    // Produce only enqueues the message. With a nil delivery channel, the
    // delivery report arrives on dlh.producer.Events(), which should be drained.
    err = dlh.producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &dlh.dlqTopic,
            Partition: kafka.PartitionAny,
        },
        Value: dlqData,
        Headers: []kafka.Header{
            {Key: "original_topic", Value: []byte(*originalMsg.TopicPartition.Topic)},
            {Key: "original_offset", Value: []byte(fmt.Sprintf("%d", originalMsg.TopicPartition.Offset))},
            {Key: "failure_reason", Value: []byte(processingErr.Error())},
            {Key: "failed_at", Value: []byte(time.Now().Format(time.RFC3339))},
        },
    }, nil)
    if err != nil {
        return fmt.Errorf("failed to send message to DLQ: %w", err)
    }

    log.Printf("Sent message to DLQ: %v", processingErr)
    return nil
}

type DLQMessage struct {
    OriginalMessage []byte    `json:"original_message"`
    Error           string    `json:"error"`
    RetryCount      int       `json:"retry_count"`
    OriginalTopic   string    `json:"original_topic"`
    OriginalOffset  int64     `json:"original_offset"`
    FailedAt        time.Time `json:"failed_at"`
}
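
SendToDLQ takes a retryCount, but the listing does not show where that count comes from. A minimal sketch of one way to connect retries to the DLQ follows. The helper name processWithDLQ and the sendToDLQ callback (standing in for SendToDLQ) are assumptions made for illustration, not part of the code above:

```go
package main

import "fmt"

// processWithDLQ is a hypothetical helper. It calls handler up to maxAttempts
// times; if every attempt fails, it hands the payload to sendToDLQ together
// with the last error and the number of attempts made.
func processWithDLQ(
	payload []byte,
	maxAttempts int,
	handler func([]byte) error,
	sendToDLQ func(payload []byte, lastErr error, attempts int) error,
) error {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = handler(payload); lastErr == nil {
			return nil // processed successfully, nothing goes to the DLQ
		}
	}

	// All attempts failed: park the message so the main flow can move on.
	if err := sendToDLQ(payload, lastErr, maxAttempts); err != nil {
		return fmt.Errorf("DLQ publish failed after %d attempts: %w", maxAttempts, err)
	}
	// Returning nil tells the caller it is safe to commit the offset,
	// because the message now lives in the DLQ.
	return nil
}
```

Returning nil after a successful DLQ publish is a design choice: the consumer can commit and continue, which is what keeps a poison message from blocking the partition. A real handler would usually also sleep between attempts.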

Common Pitfalls to Avoid

Pitfall 1: Poor Partition Key Selection

// BAD: All messages go to same partition -> no parallelism
key := "constant_key"

// BAD: Random key -> related messages scattered across partitions
key = fmt.Sprintf("random_%d", rand.Int())

// GOOD: Business-meaningful key -> related messages stick together
key = order.CustomerID // All orders for customer stay together
key = deviceID         // All events from device stay together
key = userID           // All user actions stay together
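
To see why the key matters, it helps to look at how a key is turned into a partition number. The sketch below is illustrative only: partitionFor is a hypothetical function, and it uses FNV-1a because that hash ships with Go's standard library. Real Kafka clients use their own hash functions (the Java client uses murmur2, for example), so the partition numbers here will not match a real cluster.

```go
package main

import "hash/fnv"

// partitionFor maps a key to a partition by hashing it and taking the
// remainder. numPartitions must be greater than zero.
// FNV-1a is used purely for illustration; it is not Kafka's partitioner.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}
```

Because the mapping is deterministic, every message keyed by the same customer ID lands in the same partition and is consumed in order. A constant key sends everything to one partition, and a random key spreads related messages across all of them.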

Pitfall 2: Ignoring Consumer Lag

// Monitor consumer lag to detect processing issues.
// KafkaConsumer is an assumed name for the wrapper type that holds the
// *kafka.Consumer and provides getConsumerLag.
func (c *KafkaConsumer) MonitorLag(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            lag, err := c.getConsumerLag()
            if err != nil {
                log.Printf("Failed to get consumer lag: %v", err)
                continue
            }

            if lag > 10000 { // Alert if lag exceeds 10k messages
                log.Printf("ALERT: Consumer lag is high: %d messages", lag)
                // Trigger alerts, scale consumers, or investigate
            }
        }
    }
}
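
MonitorLag relies on a getConsumerLag helper that is not shown. Lag for a partition is the high watermark (the offset the next produced message will get) minus the group's committed offset (the next offset it will read). The sketch below shows only that arithmetic; the type and function names are hypothetical. With confluent-kafka-go, the inputs would typically come from the consumer's QueryWatermarkOffsets and Committed methods.

```go
package main

// partitionOffsets holds the two numbers lag is derived from for one partition.
type partitionOffsets struct {
	HighWatermark int64 // offset the next produced message will receive
	Committed     int64 // next offset the group will read; negative if nothing is committed
}

// totalLag sums (high watermark - committed offset) over all partitions.
func totalLag(parts []partitionOffsets) int64 {
	var lag int64
	for _, p := range parts {
		committed := p.Committed
		if committed < 0 {
			// Assumption: a group with no commit yet is treated as
			// starting from offset 0, which can overestimate lag on
			// topics whose oldest messages have been deleted.
			committed = 0
		}
		if d := p.HighWatermark - committed; d > 0 {
			lag += d
		}
	}
	return lag
}
```

Summing across partitions gives one number to alert on, but a single stuck partition can hide behind healthy ones, so per-partition lag is worth recording as well.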

Pitfall 3: No Backpressure Handling

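// Implement backpressure to prevent overwhelming consumers.
// KafkaConsumer is an assumed name for the wrapper type that holds the *kafka.Consumer.
func (c *KafkaConsumer) ProcessWithBackpressure(ctx context.Context, handler func(*kafka.Message) error) {
    semaphore := make(chan struct{}, 10) // Process max 10 messages concurrently

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        msg, err := c.consumer.ReadMessage(5 * time.Second)
        if err != nil {
            // Timeouts are expected when the topic is idle; log anything else
            if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
                log.Printf("Read error: %v", err)
            }
            continue
        }

        // Acquire semaphore slot. This blocks while 10 handlers are in
        // flight (the backpressure), but still honors cancellation.
        select {
        case semaphore <- struct{}{}:
        case <-ctx.Done():
            return
        }

        go func(m *kafka.Message) {
            defer func() { <-semaphore }()

            if err := handler(m); err != nil {
                log.Printf("Handler failed at offset %v: %v", m.TopicPartition.Offset, err)
                return
            }
            // Caveat: goroutines can finish out of order, so a later offset
            // may be committed before an earlier message has succeeded.
            if _, err := c.consumer.CommitMessage(m); err != nil {
                log.Printf("Commit failed: %v", err)
            }
        }(msg)
    }
}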

Integration and Mastery - Building a Complete Message-Driven System

Multi-System Integration Pattern

Here's how to combine Kafka, RabbitMQ, and NATS in a real e-commerce system:

type ECommerceMessageBus struct {
    kafkaProducer *kafka.Producer // Event sourcing and analytics
    rabbitClient  *RabbitClient   // Business process orchestration
    natsClient    *NATSClient     // Real-time notifications
}

func NewECommerceMessageBus() (*ECommerceMessageBus, error) {
    // Initialize Kafka for event sourcing
    kafkaProducer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "kafka:9092",
        "acks":              "all",
    })
    if err != nil {
        return nil, fmt.Errorf("failed to initialize Kafka: %w", err)
    }

    // Initialize RabbitMQ for business processes
    rabbitClient, err := NewRabbitClient("amqp://guest:guest@rabbitmq:5672/")
    if err != nil {
        kafkaProducer.Close() // don't leak the producer on partial failure
        return nil, fmt.Errorf("failed to initialize RabbitMQ: %w", err)
    }

    // Initialize NATS for real-time messaging
    natsClient, err := NewNATSClient("nats://nats:4222")
    if err != nil {
        kafkaProducer.Close()
        return nil, fmt.Errorf("failed to initialize NATS: %w", err)
    }

    mb := &ECommerceMessageBus{
        kafkaProducer: kafkaProducer,
        rabbitClient:  rabbitClient,
        natsClient:    natsClient,
    }

    // Setup message flows
    if err := mb.setupMessageFlows(); err != nil {
        kafkaProducer.Close()
        return nil, fmt.Errorf("failed to setup message flows: %w", err)
    }

    return mb, nil
}

func (mb *ECommerceMessageBus) setupMessageFlows() error {
    // Setup RabbitMQ exchanges for different notification types
    err := mb.rabbitClient.SetupExchangePatterns()
    if err != nil {
        return err
    }

    // Setup NATS subjects for real-time updates
    go mb.setupRealtimeNotifications()

    return nil
}

// Process order through multiple message systems
func (mb *ECommerceMessageBus) ProcessOrder(order Order) error {
    // 1. Publish order event to Kafka for analytics and audit
    orderEvent := OrderEvent{
        OrderID:    order.ID,
        CustomerID: order.CustomerID,
        Amount:     order.Total,
        Timestamp:  time.Now(),
        Status:     "created",
    }

    // Use customer ID as partitioning key
    err := mb.publishKafkaEvent("order.created", order.CustomerID, orderEvent)
    if err != nil {
        return fmt.Errorf("failed to publish order event: %w", err)
    }

    // 2. Send immediate notification via NATS for real-time UI updates
    realtimeUpdate := RealtimeOrderUpdate{
        OrderID: order.ID,
        Status:  "pending",
        Message: "Order received and processing",
    }

    err = mb.natsClient.PublishEvent(fmt.Sprintf("orders.updates.%s", order.CustomerID), realtimeUpdate)
    if err != nil {
        log.Printf("Failed to publish real-time update: %v", err)
        // Non-critical, continue processing
    }

    // 3. Route business process via RabbitMQ
    businessProcess := OrderProcess{
        OrderID:     order.ID,
        ProcessType: "order_fulfillment",
        Priority:    order.getPriority(),
        CreatedAt:   time.Now(),
    }

    err = mb.rabbitClient.SendDirectNotification("order_processing", businessProcess)
    if err != nil {
        return fmt.Errorf("failed to route business process: %w", err)
    }

    return nil
}

// Handle payment completion across all systems
func (mb *ECommerceMessageBus) HandlePaymentCompleted(payment PaymentCompleted) error {
    // 1. Record payment event in Kafka for audit trail
    paymentEvent := PaymentEvent{
        PaymentID: payment.ID,
        OrderID:   payment.OrderID,
        Amount:    payment.Amount,
        Status:    "completed",
        Timestamp: time.Now(),
    }

    err := mb.publishKafkaEvent("payment.completed", payment.OrderID, paymentEvent)
    if err != nil {
        return fmt.Errorf("failed to record payment event: %w", err)
    }

    // 2. Trigger order fulfillment via RabbitMQ
    fulfillment := OrderFulfillment{
        OrderID:   payment.OrderID,
        PaymentID: payment.ID,
        Status:    "paid",
        ProcessAt: time.Now(),
    }

    err = mb.rabbitClient.PublishEvent("order.fulfilled", fulfillment)
    if err != nil {
        return fmt.Errorf("failed to trigger fulfillment: %w", err)
    }

    // 3. Send real-time confirmation to customer
    confirmation := OrderConfirmation{
        OrderID: payment.OrderID,
        Status:  "paid",
        Message: "Payment received, preparing your order",
    }

    customerID := payment.Order.CustomerID // Get from order lookup
    err = mb.natsClient.PublishEvent(fmt.Sprintf("orders.confirmation.%s", customerID), confirmation)
    if err != nil {
        // Non-critical, continue processing
        log.Printf("Failed to send real-time confirmation: %v", err)
    }

    // 4. Send email notification via RabbitMQ direct exchange
    notification := Notification{
        Type:      "order_confirmation",
        Recipient: payment.Order.CustomerEmail,
        OrderID:   payment.OrderID,
        Amount:    payment.Amount,
    }

    err = mb.rabbitClient.SendDirectNotification("email", notification)
    if err != nil {
        return fmt.Errorf("failed to queue email notification: %w", err)
    }

    return nil
}

func (mb *ECommerceMessageBus) publishKafkaEvent(eventType, key string, eventData interface{}) error {
    data, err := json.Marshal(eventData)
    if err != nil {
        return err
    }

    topic := "events"
    return mb.kafkaProducer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: kafka.PartitionAny,
        },
        Key:   []byte(key),
        Value: data,
        // All events share one topic, so carry the type as a header for consumers to filter on
        Headers: []kafka.Header{{Key: "event_type", Value: []byte(eventType)}},
    }, nil)
}

Monitoring and Observability

type MessageBusMetrics struct {
    // Kafka metrics
    kafkaMessagesProduced int64
    kafkaMessagesConsumed int64
    kafkaProducerErrors   int64
    kafkaConsumerErrors   int64

    // RabbitMQ metrics
    rabbitMessagesRouted int64
    rabbitRoutingErrors  int64

    // NATS metrics
    natsMessagesPublished int64
    natsRequestsHandled   int64
    natsErrors            int64

    // Overall system health
    totalLatency time.Duration
    errorRate    float64
    throughput   float64
}

func (mb *ECommerceMessageBus) MonitorPerformance(ctx context.Context) {
    metrics := &MessageBusMetrics{}

    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            mb.collectMetrics(metrics)
            mb.reportMetrics(metrics)
            mb.checkHealthThresholds(metrics)
        }
    }
}

func (mb *ECommerceMessageBus) collectMetrics(metrics *MessageBusMetrics) {
    // Collect Kafka metrics.
    // confluent-kafka-go has no GetStats() method on the producer. Instead,
    // set "statistics.interval.ms" in the ConfigMap and read *kafka.Stats
    // events (a JSON string) from mb.kafkaProducer.Events(). Parse the fields
    // you need there (for example "txmsgs", the total messages transmitted)
    // and store them where this function can copy them into metrics.
    // ... collect other metrics

    // Collect RabbitMQ metrics
    // Collect NATS metrics
}

func (mb *ECommerceMessageBus) checkHealthThresholds(metrics *MessageBusMetrics) {
    // Alert on high error rates
    if metrics.errorRate > 0.05 { // 5% error rate
        log.Printf("ALERT: High error rate detected: %.2f%%", metrics.errorRate*100)
    }

    // Alert on high latency
    if metrics.totalLatency > 5*time.Second {
        log.Printf("ALERT: High message latency detected: %v", metrics.totalLatency)
    }

    // Alert on low throughput
    if metrics.throughput < 100 { // Less than 100 messages/second
        log.Printf("ALERT: Low throughput detected: %.2f msg/sec", metrics.throughput)
    }
}
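
The thresholds above read errorRate and throughput, but the listing does not show how those two values are derived. One simple approach, sketched here with hypothetical names (counters, rates), is to take two snapshots of cumulative counters and divide the differences:

```go
package main

import "time"

// counters is a hypothetical snapshot of cumulative totals across the bus.
type counters struct {
	Messages int64 // messages handled so far
	Errors   int64 // failures so far
}

// rates derives throughput (messages per second) and error rate (errors per
// message) from two snapshots taken `elapsed` apart. Both results are zero
// when there is nothing to divide by.
func rates(prev, curr counters, elapsed time.Duration) (throughput, errorRate float64) {
	msgs := curr.Messages - prev.Messages
	errs := curr.Errors - prev.Errors

	if elapsed > 0 {
		throughput = float64(msgs) / elapsed.Seconds()
	}
	if msgs > 0 {
		errorRate = float64(errs) / float64(msgs)
	}
	return throughput, errorRate
}
```

With the 10-second ticker used by MonitorPerformance, 1,000 new messages and 20 new errors between ticks would give a throughput of 100 msg/sec and an error rate of 2%, which is below the 5% alert threshold.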

Practice Exercises

Exercise 1: Implement Exactly-Once Processing

Build a message processor that handles duplicate messages gracefully. You'll need to:

  1. Create a message processor with idempotency tracking
  2. Implement retry logic with exponential backoff
  3. Add dead letter queue handling
  4. Test with message duplication scenarios
Solution:

package exercise

import (
    "fmt"
    "log"
    "sync"
    "time"
)

type ExactlyOnceProcessor struct {
    processed map[string]time.Time
    mu        sync.RWMutex
    maxAge    time.Duration
}

func NewExactlyOnceProcessor(maxAge time.Duration) *ExactlyOnceProcessor {
    proc := &ExactlyOnceProcessor{
        processed: make(map[string]time.Time),
        maxAge:    maxAge,
    }

    // Start cleanup goroutine
    go proc.cleanup()
    return proc
}

func (eop *ExactlyOnceProcessor) ProcessMessage(msgID string, handler func() error) error {
    // Check if already processed. The read lock is released exactly once,
    // before the result is inspected.
    eop.mu.RLock()
    processedTime, exists := eop.processed[msgID]
    eop.mu.RUnlock()

    if exists && time.Since(processedTime) < eop.maxAge {
        log.Printf("Message %s already processed, skipping", msgID)
        return nil
    }
    // Unseen or expired entry, allow processing

    // Acquire write lock. It is held across retries, so messages are
    // processed one at a time.
    eop.mu.Lock()
    defer eop.mu.Unlock()

    // Double-check after acquiring write lock
    if processedTime, exists := eop.processed[msgID]; exists {
        if time.Since(processedTime) < eop.maxAge {
            return nil
        }
    }

    // Process the message with retries
    return eop.processWithRetries(msgID, handler)
}

// processWithRetries must be called with eop.mu held for writing.
func (eop *ExactlyOnceProcessor) processWithRetries(msgID string, handler func() error) error {
    maxRetries := 3
    baseBackoff := 100 * time.Millisecond

    for attempt := 0; attempt <= maxRetries; attempt++ {
        if attempt > 0 {
            // Exponential backoff: 100ms, 200ms, 400ms, capped at 5s
            backoff := time.Duration(1<<uint(attempt-1)) * baseBackoff
            if backoff > 5*time.Second {
                backoff = 5 * time.Second
            }
            log.Printf("Retry %d of %d for message %s", attempt, maxRetries, msgID)
            time.Sleep(backoff)
        }

        if err := handler(); err != nil {
            log.Printf("Processing attempt %d failed for message %s: %v", attempt+1, msgID, err)
            if attempt == maxRetries {
                return fmt.Errorf("message processing failed after %d attempts: %w", maxRetries+1, err)
            }
            continue
        }

        // Mark as processed
        eop.processed[msgID] = time.Now()
        log.Printf("Successfully processed message %s", msgID)
        return nil
    }

    return nil
}

func (eop *ExactlyOnceProcessor) cleanup() {
    ticker := time.NewTicker(eop.maxAge / 4)
    defer ticker.Stop()
    for range ticker.C {
        eop.mu.Lock()
        now := time.Now()
        for msgID, processedTime := range eop.processed {
            if now.Sub(processedTime) > eop.maxAge {
                delete(eop.processed, msgID)
            }
        }
        eop.mu.Unlock()
    }
}
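
The retry loop in this solution computes its delay inline. Pulling that calculation into a small function makes the schedule easy to test on its own. The sketch below uses a hypothetical name, backoffFor, and reproduces the same doubling-with-cap schedule:

```go
package main

import "time"

// backoffFor returns the delay before retry number `retry` (1-based):
// base, 2*base, 4*base, and so on, capped at max.
func backoffFor(retry int, base, max time.Duration) time.Duration {
	if retry < 1 {
		return 0 // the first attempt is not a retry and has no delay
	}
	d := base
	for i := 1; i < retry; i++ {
		d *= 2
		if d >= max {
			return max // stop doubling early so d can never overflow
		}
	}
	if d > max {
		return max
	}
	return d
}
```

With a base of 100ms and a cap of 5s, retries 1 through 3 wait 100ms, 200ms, and 400ms, and anything from retry 7 onward waits the full 5s. Production systems often add random jitter on top so that many consumers do not retry in lockstep.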

Exercise 2: Build Multi-Protocol Message Router

Create a message router that can:

  1. Route messages between Kafka, RabbitMQ, and NATS
  2. Transform messages for different target systems
  3. Handle message format conversions
  4. Provide monitoring and error handling
Solution:

package exercise

import (
    "context"
    "fmt"
)

type MessageRouter struct {
    kafka   *KafkaClient
    rabbit  *RabbitClient
    nats    *NATSClient
    routes  []Route
    metrics *RouterMetrics
}

type Route struct {
    From      string                       // "kafka", "rabbitmq", "nats"
    To        string                       // Target system
    Topic     string                       // Source topic/subject (also used as the target name below)
    Transform func([]byte) ([]byte, error) // Message transformation
    Filter    func([]byte) bool            // Message filtering
}

func NewMessageRouter(kafka *KafkaClient, rabbit *RabbitClient, nats *NATSClient) *MessageRouter {
    return &MessageRouter{
        kafka:   kafka,
        rabbit:  rabbit,
        nats:    nats,
        routes:  make([]Route, 0),
        metrics: NewRouterMetrics(),
    }
}

func (mr *MessageRouter) AddRoute(route Route) {
    mr.routes = append(mr.routes, route)
}

func (mr *MessageRouter) Start(ctx context.Context) error {
    for _, route := range mr.routes {
        switch route.From {
        case "kafka":
            go mr.startKafkaRoute(ctx, route)
        case "rabbitmq":
            go mr.startRabbitRoute(ctx, route)
        case "nats":
            go mr.startNatsRoute(ctx, route)
        default:
            return fmt.Errorf("unknown source system: %s", route.From)
        }
    }
    return nil
}

func (mr *MessageRouter) routeMessage(msg []byte, route Route) error {
    // Apply filter
    if route.Filter != nil && !route.Filter(msg) {
        return nil // Skip message
    }

    // Apply transformation
    if route.Transform != nil {
        transformed, err := route.Transform(msg)
        if err != nil {
            mr.metrics.recordTransformationError()
            return fmt.Errorf("transformation failed: %w", err)
        }
        msg = transformed
    }

    // Route to target system
    switch route.To {
    case "kafka":
        return mr.kafka.Publish(route.Topic, msg)
    case "rabbitmq":
        return mr.rabbit.Publish(route.Topic, msg)
    case "nats":
        return mr.nats.Publish(route.Topic, msg)
    default:
        return fmt.Errorf("unknown target system: %s", route.To)
    }
}
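
The router leaves Transform and Filter up to the caller. As an illustration of what might be plugged in, here is a minimal sketch of one of each. The envelope format and both function names are assumptions made for this example:

```go
package main

import "encoding/json"

// envelope is a hypothetical wrapper format for routed messages.
type envelope struct {
	Source  string          `json:"source"`
	Payload json.RawMessage `json:"payload"`
}

// wrapWithSource returns a Transform that wraps a JSON payload in an
// envelope recording which system the message came from. json.Marshal
// rejects payloads that are not valid JSON, so bad input surfaces as an error.
func wrapWithSource(source string) func([]byte) ([]byte, error) {
	return func(msg []byte) ([]byte, error) {
		return json.Marshal(envelope{Source: source, Payload: msg})
	}
}

// onlyJSONObjects is a Filter that keeps JSON objects and drops everything
// else (arrays, scalars, null, and malformed input).
func onlyJSONObjects(msg []byte) bool {
	var obj map[string]json.RawMessage
	return json.Unmarshal(msg, &obj) == nil && obj != nil
}
```

A route could then be declared with Transform set to wrapWithSource("kafka") and Filter set to onlyJSONObjects, so that only well-formed objects cross between systems and each one records its origin.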

Exercise 3: Implement Circuit Breaker for Message Systems

Build a circuit breaker that protects downstream systems when message systems become unavailable.

Solution:

package exercise

import (
    "fmt"
    "sync"
    "time"
)

type MessageCircuitBreaker struct {
    maxFailures  int
    resetTimeout time.Duration
    state        CircuitState
    failures     int
    lastFailure  time.Time
    mu           sync.Mutex
}

type CircuitState int

const (
    StateClosed CircuitState = iota
    StateOpen
    StateHalfOpen
)

func NewMessageCircuitBreaker(maxFailures int, resetTimeout time.Duration) *MessageCircuitBreaker {
    return &MessageCircuitBreaker{
        maxFailures:  maxFailures,
        resetTimeout: resetTimeout,
        state:        StateClosed,
    }
}

func (mcb *MessageCircuitBreaker) Execute(fn func() error) error {
    if !mcb.canExecute() {
        return fmt.Errorf("circuit breaker is open")
    }

    err := fn()
    mcb.recordResult(err)
    return err
}

func (mcb *MessageCircuitBreaker) canExecute() bool {
    // A full lock is required: this method may change mcb.state,
    // which is not safe under a read lock.
    mcb.mu.Lock()
    defer mcb.mu.Unlock()

    switch mcb.state {
    case StateClosed:
        return true
    case StateOpen:
        if time.Since(mcb.lastFailure) > mcb.resetTimeout {
            mcb.state = StateHalfOpen
            return true
        }
        return false
    case StateHalfOpen:
        return true
    default:
        return false
    }
}

func (mcb *MessageCircuitBreaker) recordResult(err error) {
    mcb.mu.Lock()
    defer mcb.mu.Unlock()

    if err != nil {
        mcb.failures++
        mcb.lastFailure = time.Now()

        // In half-open state the failure count is still at the threshold,
        // so a single failed probe reopens the circuit.
        if mcb.failures >= mcb.maxFailures {
            mcb.state = StateOpen
        }
    } else {
        if mcb.state == StateHalfOpen {
            mcb.state = StateClosed
        }
        mcb.failures = 0
    }
}

Further Reading

Exercise 4: Build Event Sourcing System with Kafka

Implement an event sourcing system that captures all state changes as events and provides event replay capabilities.

Requirements:

  1. Event store using Kafka with proper partitioning
  2. Event replay functionality for rebuilding state
  3. Snapshot mechanism to optimize replay
  4. Multiple projections (read models) from same event stream
Solution:

package exercise

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Event represents a domain event in the system
type Event struct {
    ID          string                 `json:"id"`
    AggregateID string                 `json:"aggregate_id"`
    EventType   string                 `json:"event_type"`
    Version     int64                  `json:"version"`
    Timestamp   time.Time              `json:"timestamp"`
    Data        map[string]interface{} `json:"data"`
    Metadata    map[string]string      `json:"metadata"`
}

// EventStore manages event persistence and retrieval
type EventStore struct {
    producer      *kafka.Producer
    brokers       string // kept so replay consumers can reach the same cluster
    topic         string
    snapshots     map[string]*Snapshot
    snapshotMutex sync.RWMutex
}

// Snapshot represents a point-in-time state snapshot
type Snapshot struct {
    AggregateID string                 `json:"aggregate_id"`
    Version     int64                  `json:"version"`
    Timestamp   time.Time              `json:"timestamp"`
    State       map[string]interface{} `json:"state"`
}

func NewEventStore(brokers, topic string) (*EventStore, error) {
    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":  brokers,
        "acks":               "all",
        "enable.idempotence": true,
        "compression.type":   "snappy",
    })
    if err != nil {
        return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
    }

    return &EventStore{
        producer:  producer,
        brokers:   brokers,
        topic:     topic,
        snapshots: make(map[string]*Snapshot),
    }, nil
}

// AppendEvent stores a new event in the event stream
func (es *EventStore) AppendEvent(ctx context.Context, event *Event) error {
    // Set event metadata
    event.Timestamp = time.Now()
    if event.ID == "" {
        event.ID = generateEventID()
    }

    // Serialize event
    eventData, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to serialize event: %w", err)
    }

    // Publish to Kafka using aggregate ID as partition key
    // This ensures all events for same aggregate are in same partition (ordering)
    message := &kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &es.topic,
            Partition: kafka.PartitionAny,
        },
        Key:   []byte(event.AggregateID),
        Value: eventData,
        Headers: []kafka.Header{
            {Key: "event_type", Value: []byte(event.EventType)},
            {Key: "event_id", Value: []byte(event.ID)},
            {Key: "version", Value: []byte(fmt.Sprintf("%d", event.Version))},
        },
    }

    deliveryChan := make(chan kafka.Event, 1)
    err = es.producer.Produce(message, deliveryChan)
    if err != nil {
        return fmt.Errorf("failed to produce event: %w", err)
    }

    // Wait for delivery confirmation
    select {
    case e := <-deliveryChan:
        m := e.(*kafka.Message)
        if m.TopicPartition.Error != nil {
            return fmt.Errorf("event delivery failed: %w", m.TopicPartition.Error)
        }
        log.Printf("Event %s appended at offset %v", event.ID, m.TopicPartition.Offset)
        return nil

    case <-ctx.Done():
        return ctx.Err()

    case <-time.After(10 * time.Second):
        return fmt.Errorf("event delivery timeout")
    }
}

// GetEvents retrieves all events for an aggregate from a specific version
func (es *EventStore) GetEvents(ctx context.Context, aggregateID string, fromVersion int64) ([]*Event, error) {
    // Check if we have a recent snapshot
    es.snapshotMutex.RLock()
    snapshot, hasSnapshot := es.snapshots[aggregateID]
    es.snapshotMutex.RUnlock()

    if hasSnapshot && snapshot.Version >= fromVersion {
        fromVersion = snapshot.Version + 1
        log.Printf("Using snapshot at version %d for aggregate %s", snapshot.Version, aggregateID)
    }

    // Create consumer to read events.
    // producer.String() returns the client handle name, not the broker list,
    // so the brokers saved in NewEventStore are used instead.
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  es.brokers,
        "group.id":           fmt.Sprintf("replay-%s", aggregateID),
        "auto.offset.reset":  "earliest",
        "enable.auto.commit": false, // never commit, so every replay starts from the beginning
    })
    if err != nil {
        return nil, fmt.Errorf("failed to create consumer: %w", err)
    }
    defer consumer.Close()

    // Subscribe to topic
    err = consumer.Subscribe(es.topic, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to subscribe: %w", err)
    }

    events := make([]*Event, 0)
    // Simplification: read for a fixed 5 seconds rather than until the end of the log
    timeout := time.After(5 * time.Second)

    for {
        select {
        case <-ctx.Done():
            return events, ctx.Err()

        case <-timeout:
            return events, nil

        default:
            msg, err := consumer.ReadMessage(100 * time.Millisecond)
            if err != nil {
                // Checked assertion: not every error is guaranteed to be a kafka.Error
                if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
                    continue
                }
                log.Printf("Consumer error: %v", err)
                continue
            }

            // Parse event
            var event Event
            if err := json.Unmarshal(msg.Value, &event); err != nil {
                log.Printf("Failed to unmarshal event: %v", err)
                continue
            }

            // Filter by aggregate ID and version
            if event.AggregateID == aggregateID && event.Version >= fromVersion {
                events = append(events, &event)
            }
        }
    }
}

// CreateSnapshot saves a snapshot of aggregate state
func (es *EventStore) CreateSnapshot(aggregateID string, version int64, state map[string]interface{}) {
    snapshot := &Snapshot{
        AggregateID: aggregateID,
        Version:     version,
        Timestamp:   time.Now(),
        State:       state,
    }

    es.snapshotMutex.Lock()
    es.snapshots[aggregateID] = snapshot
    es.snapshotMutex.Unlock()

    log.Printf("Created snapshot for aggregate %s at version %d", aggregateID, version)
}

// OrderAggregate represents an order aggregate that uses event sourcing
type OrderAggregate struct {
    ID          string
    CustomerID  string
    Items       []OrderItem
    TotalAmount float64
    Status      string
    Version     int64
    events      []*Event
}

type OrderItem struct {
    ProductID string  `json:"product_id"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}

// NewOrderAggregate creates a new order aggregate
func NewOrderAggregate(id, customerID string) *OrderAggregate {
    return &OrderAggregate{
        ID:         id,
        CustomerID: customerID,
        Status:     "draft",
        Items:      make([]OrderItem, 0),
        events:     make([]*Event, 0),
    }
}

// AddItem adds an item to the order
func (oa *OrderAggregate) AddItem(productID string, quantity int, price float64) {
    event := &Event{
        AggregateID: oa.ID,
        EventType:   "item_added",
        Version:     oa.Version + 1,
        Data: map[string]interface{}{
            "product_id": productID,
            // Stored as float64 to match what encoding/json produces when an
            // event is read back from Kafka. applyEvent asserts float64, so an
            // int here would panic on the in-memory path.
            "quantity": float64(quantity),
            "price":    price,
        },
    }

    oa.applyEvent(event)
    oa.events = append(oa.events, event)
}

// PlaceOrder places the order
func (oa *OrderAggregate) PlaceOrder() error {
    if len(oa.Items) == 0 {
        return fmt.Errorf("cannot place empty order")
    }

    if oa.Status != "draft" {
        return fmt.Errorf("order already placed")
    }

    event := &Event{
        AggregateID: oa.ID,
        EventType:   "order_placed",
        Version:     oa.Version + 1,
        Data: map[string]interface{}{
            "customer_id":  oa.CustomerID,
            "total_amount": oa.TotalAmount,
            "item_count":   len(oa.Items),
        },
    }

    oa.applyEvent(event)
    oa.events = append(oa.events, event)
    return nil
}

// applyEvent applies an event to the aggregate state
func (oa *OrderAggregate) applyEvent(event *Event) {
    switch event.EventType {
    case "item_added":
        item := OrderItem{
            ProductID: event.Data["product_id"].(string),
            Quantity:  int(event.Data["quantity"].(float64)),
            Price:     event.Data["price"].(float64),
        }
        oa.Items = append(oa.Items, item)
        oa.TotalAmount += item.Price * float64(item.Quantity)
        oa.Version = event.Version

    case "order_placed":
        oa.Status = "placed"
        oa.Version = event.Version

    case "order_shipped":
        oa.Status = "shipped"
        oa.Version = event.Version

    case "order_delivered":
        oa.Status = "delivered"
        oa.Version = event.Version
    }
}

// GetUncommittedEvents returns events that haven't been persisted
func (oa *OrderAggregate) GetUncommittedEvents() []*Event {
    return oa.events
}

// MarkEventsCommitted clears uncommitted events after persistence
func (oa *OrderAggregate) MarkEventsCommitted() {
    oa.events = make([]*Event, 0)
}

// ReplayEvents rebuilds aggregate state from events
func (oa *OrderAggregate) ReplayEvents(events []*Event) {
    for _, event := range events {
        oa.applyEvent(event)
    }
}
305
306// Projection represents a read model built from events
307type Projection interface {
308    HandleEvent(event *Event) error
309    GetState() interface{}
310}
311
312// OrderSummaryProjection maintains order summaries
313type OrderSummaryProjection struct {
314    summaries map[string]*OrderSummary
315    mu        sync.RWMutex
316}
317
318type OrderSummary struct {
319    OrderID     string    `json:"order_id"`
320    CustomerID  string    `json:"customer_id"`
321    ItemCount   int       `json:"item_count"`
322    TotalAmount float64   `json:"total_amount"`
323    Status      string    `json:"status"`
324    CreatedAt   time.Time `json:"created_at"`
325    UpdatedAt   time.Time `json:"updated_at"`
326}
327
328func NewOrderSummaryProjection() *OrderSummaryProjection {
329    return &OrderSummaryProjection{
330        summaries: make(map[string]*OrderSummary),
331    }
332}
333
334func (osp *OrderSummaryProjection) HandleEvent(event *Event) error {
335    osp.mu.Lock()
336    defer osp.mu.Unlock()
337
338    summary, exists := osp.summaries[event.AggregateID]
339    if !exists {
340        summary = &OrderSummary{
341            OrderID:   event.AggregateID,
342            CreatedAt: event.Timestamp,
343        }
344        osp.summaries[event.AggregateID] = summary
345    }
346
347    summary.UpdatedAt = event.Timestamp
348
349    switch event.EventType {
350    case "item_added":
351        summary.ItemCount++
352        price := event.Data["price"].(float64)
353        quantity := event.Data["quantity"].(float64)
354        summary.TotalAmount += price * quantity
355
356    case "order_placed":
357        summary.CustomerID = event.Data["customer_id"].(string)
358        summary.Status = "placed"
359
360    case "order_shipped":
361        summary.Status = "shipped"
362
363    case "order_delivered":
364        summary.Status = "delivered"
365    }
366
367    return nil
368}
369
370func (osp *OrderSummaryProjection) GetState() interface{} {
371    osp.mu.RLock()
372    defer osp.mu.RUnlock()
373    return osp.summaries
374}
375
376func (osp *OrderSummaryProjection) GetOrderSummary(orderID string) (*OrderSummary, bool) {
377    osp.mu.RLock()
378    defer osp.mu.RUnlock()
379    summary, exists := osp.summaries[orderID]
380    return summary, exists
381}
382
383// Helper function to generate unique event IDs
384func generateEventID() string {
385    return fmt.Sprintf("evt_%d", time.Now().UnixNano())
386}
387
// Example usage
func ExampleEventSourcing() {
    // Create event store
    eventStore, err := NewEventStore("localhost:9092", "order-events")
    if err != nil {
        log.Fatal(err)
    }

    // Create order aggregate
    order := NewOrderAggregate("order-001", "customer-123")
    order.AddItem("product-1", 2, 29.99)
    order.AddItem("product-2", 1, 49.99)

    if err := order.PlaceOrder(); err != nil {
        log.Fatal(err)
    }

    // Persist events
    ctx := context.Background()
    for _, event := range order.GetUncommittedEvents() {
        if err := eventStore.AppendEvent(ctx, event); err != nil {
            // Stop here: marking events committed after a failed append would lose them
            log.Fatalf("Failed to append event: %v", err)
        }
    }
    order.MarkEventsCommitted()

    // Create projection
    projection := NewOrderSummaryProjection()

    // Replay events to build projection
    events, err := eventStore.GetEvents(ctx, "order-001", 0)
    if err != nil {
        log.Fatal(err)
    }

    for _, event := range events {
        if err := projection.HandleEvent(event); err != nil {
            log.Printf("Failed to apply event to projection: %v", err)
        }
    }

    // Query projection
    summary, ok := projection.GetOrderSummary("order-001")
    if !ok {
        log.Fatal("order-001 not found in projection")
    }
    log.Printf("Order summary: %+v", summary)
}

Exercise 5: Implement Saga Pattern for Distributed Transactions

Build a saga orchestrator that coordinates distributed transactions across multiple services using compensating actions.

Requirements:

  1. Saga coordinator that manages transaction steps
  2. Compensation mechanism for rollback
  3. State persistence for recovery
  4. Timeout handling and retry logic
  5. Support for both orchestration and choreography patterns

Solution:

package exercise

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// SagaStep represents a single step in a saga transaction
type SagaStep struct {
    Name         string
    Action       func(context.Context, interface{}) (interface{}, error)
    Compensation func(context.Context, interface{}) error
    Timeout      time.Duration // budget for the step, including all retries
}

// SagaState represents the current state of a saga execution
type SagaState string

const (
    SagaStateInitiated  SagaState = "initiated"
    SagaStateInProgress SagaState = "in_progress"
    SagaStateCompleted  SagaState = "completed"
    SagaStateFailed     SagaState = "failed"
    SagaStateRolledBack SagaState = "rolled_back"
)

// SagaExecution tracks the execution of a saga
type SagaExecution struct {
    ID             string
    State          SagaState
    CurrentStep    int
    CompletedSteps []StepResult
    StartTime      time.Time
    EndTime        time.Time
    Error          string
    mu             sync.RWMutex
}

// StepResult records the outcome of one attempted step
type StepResult struct {
    StepName  string
    Success   bool
    Result    interface{}
    Error     string
    Timestamp time.Time
}

// SagaOrchestrator coordinates saga executions
type SagaOrchestrator struct {
    steps         []SagaStep
    executions    map[string]*SagaExecution
    messageClient MessageClient
    stateStore    StateStore
    mu            sync.RWMutex
}

// MessageClient interface for publishing saga events
type MessageClient interface {
    Publish(topic string, message interface{}) error
    Subscribe(topic string, handler func([]byte) error) error
}

// StateStore interface for persisting saga state
type StateStore interface {
    Save(id string, execution *SagaExecution) error
    Load(id string) (*SagaExecution, error)
    Delete(id string) error
}

func NewSagaOrchestrator(steps []SagaStep, msgClient MessageClient, stateStore StateStore) *SagaOrchestrator {
    return &SagaOrchestrator{
        steps:         steps,
        executions:    make(map[string]*SagaExecution),
        messageClient: msgClient,
        stateStore:    stateStore,
    }
}

// ExecuteSaga starts a new saga execution
func (so *SagaOrchestrator) ExecuteSaga(ctx context.Context, sagaID string, initialData interface{}) error {
    // Create saga execution
    execution := &SagaExecution{
        ID:             sagaID,
        State:          SagaStateInitiated,
        CurrentStep:    0,
        CompletedSteps: make([]StepResult, 0),
        StartTime:      time.Now(),
    }

    // Store execution
    so.mu.Lock()
    so.executions[sagaID] = execution
    so.mu.Unlock()

    // Persist state
    if err := so.stateStore.Save(sagaID, execution); err != nil {
        return fmt.Errorf("failed to persist saga state: %w", err)
    }

    // Publish saga started event
    so.publishEvent("saga.started", map[string]interface{}{
        "saga_id":    sagaID,
        "start_time": execution.StartTime,
    })

    // Execute saga steps
    return so.executeSteps(ctx, execution, initialData)
}

// executeSteps runs all saga steps sequentially
func (so *SagaOrchestrator) executeSteps(ctx context.Context, execution *SagaExecution, data interface{}) error {
    execution.mu.Lock()
    execution.State = SagaStateInProgress
    execution.mu.Unlock()

    currentData := data

    for i, step := range so.steps {
        execution.mu.Lock()
        execution.CurrentStep = i
        execution.mu.Unlock()

        // Execute step with timeout (the timeout covers all retries of this step)
        stepCtx, cancel := context.WithTimeout(ctx, step.Timeout)
        result, err := so.executeStepWithRetry(stepCtx, step, currentData)
        cancel()

        stepResult := StepResult{
            StepName:  step.Name,
            Success:   err == nil,
            Result:    result,
            Timestamp: time.Now(),
        }

        if err != nil {
            stepResult.Error = err.Error()
            execution.mu.Lock()
            execution.CompletedSteps = append(execution.CompletedSteps, stepResult)
            execution.State = SagaStateFailed
            execution.Error = err.Error()
            execution.EndTime = time.Now()
            execution.mu.Unlock()

            // Persist failed state
            if saveErr := so.stateStore.Save(execution.ID, execution); saveErr != nil {
                log.Printf("Failed to persist state for saga %s: %v", execution.ID, saveErr)
            }

            // Publish failure event
            so.publishEvent("saga.step.failed", map[string]interface{}{
                "saga_id":   execution.ID,
                "step_name": step.Name,
                "error":     err.Error(),
            })

            // Execute compensations for completed steps
            log.Printf("Saga %s failed at step %s, starting rollback", execution.ID, step.Name)
            return so.rollback(ctx, execution)
        }

        // Step succeeded
        execution.mu.Lock()
        execution.CompletedSteps = append(execution.CompletedSteps, stepResult)
        execution.mu.Unlock()

        // Persist state after each step
        if saveErr := so.stateStore.Save(execution.ID, execution); saveErr != nil {
            log.Printf("Failed to persist state for saga %s: %v", execution.ID, saveErr)
        }

        // Publish step completed event
        so.publishEvent("saga.step.completed", map[string]interface{}{
            "saga_id":   execution.ID,
            "step_name": step.Name,
            "step_num":  i + 1,
            "total":     len(so.steps),
        })

        // The output of each step is the input of the next
        currentData = result
    }

    // All steps completed successfully
    execution.mu.Lock()
    execution.State = SagaStateCompleted
    execution.EndTime = time.Now()
    execution.mu.Unlock()

    if saveErr := so.stateStore.Save(execution.ID, execution); saveErr != nil {
        log.Printf("Failed to persist state for saga %s: %v", execution.ID, saveErr)
    }

    // Publish completion event
    so.publishEvent("saga.completed", map[string]interface{}{
        "saga_id":  execution.ID,
        "duration": execution.EndTime.Sub(execution.StartTime).Seconds(),
    })

    log.Printf("Saga %s completed successfully in %v", execution.ID, execution.EndTime.Sub(execution.StartTime))
    return nil
}

// executeStepWithRetry executes a step with retry logic
func (so *SagaOrchestrator) executeStepWithRetry(ctx context.Context, step SagaStep, data interface{}) (interface{}, error) {
    maxRetries := 3
    baseBackoff := 100 * time.Millisecond

    var lastErr error
    for attempt := 0; attempt <= maxRetries; attempt++ {
        if attempt > 0 {
            // Exponential backoff: 100ms, 200ms, 400ms, capped at 5s
            backoff := time.Duration(1<<uint(attempt-1)) * baseBackoff
            if backoff > 5*time.Second {
                backoff = 5 * time.Second
            }

            log.Printf("Retrying step %s, attempt %d/%d", step.Name, attempt+1, maxRetries+1)
            select {
            case <-time.After(backoff):
            case <-ctx.Done():
                return nil, fmt.Errorf("step %s timed out or was canceled while waiting to retry: %w", step.Name, ctx.Err())
            }
        }

        result, err := step.Action(ctx, data)
        if err == nil {
            return result, nil
        }

        lastErr = err
        log.Printf("Step %s failed (attempt %d): %v", step.Name, attempt+1, err)
    }

    return nil, fmt.Errorf("step %s failed after %d attempts: %w", step.Name, maxRetries+1, lastErr)
}

// rollback executes compensation actions in reverse order
func (so *SagaOrchestrator) rollback(ctx context.Context, execution *SagaExecution) error {
    log.Printf("Starting rollback for saga %s", execution.ID)

    execution.mu.RLock()
    completedSteps := make([]StepResult, len(execution.CompletedSteps))
    copy(completedSteps, execution.CompletedSteps)
    execution.mu.RUnlock()

    compensationFailures := 0

    // Compensate in reverse order
    for i := len(completedSteps) - 1; i >= 0; i-- {
        stepResult := completedSteps[i]
        if !stepResult.Success {
            continue // Skip failed steps
        }

        // CompletedSteps is appended in execution order, so index i matches so.steps[i]
        step := so.steps[i]
        if step.Compensation == nil {
            log.Printf("No compensation defined for step %s", step.Name)
            continue
        }

        log.Printf("Compensating step %s", step.Name)

        // Execute compensation with timeout. The caller's ctx may already be
        // canceled or expired (often the reason the saga failed), so the
        // compensation context is derived from context.Background() instead.
        // On Go 1.21+, context.WithoutCancel(ctx) also keeps the request values.
        compensateCtx, cancel := context.WithTimeout(context.Background(), step.Timeout)
        err := step.Compensation(compensateCtx, stepResult.Result)
        cancel()

        if err != nil {
            compensationFailures++
            log.Printf("Compensation failed for step %s: %v", step.Name, err)
            // Continue with other compensations even if one fails
            so.publishEvent("saga.compensation.failed", map[string]interface{}{
                "saga_id":   execution.ID,
                "step_name": step.Name,
                "error":     err.Error(),
            })
        } else {
            so.publishEvent("saga.compensation.completed", map[string]interface{}{
                "saga_id":   execution.ID,
                "step_name": step.Name,
            })
        }
    }

    // Only report rolled_back when every compensation succeeded; otherwise the
    // state stays "failed" so the saga can be picked up for manual recovery.
    execution.mu.Lock()
    if compensationFailures == 0 {
        execution.State = SagaStateRolledBack
    }
    execution.EndTime = time.Now()
    execution.mu.Unlock()

    if saveErr := so.stateStore.Save(execution.ID, execution); saveErr != nil {
        log.Printf("Failed to persist state for saga %s: %v", execution.ID, saveErr)
    }

    if compensationFailures > 0 {
        return fmt.Errorf("saga rollback incomplete, %d compensation(s) failed: %s", compensationFailures, execution.Error)
    }

    so.publishEvent("saga.rolled_back", map[string]interface{}{
        "saga_id":  execution.ID,
        "duration": execution.EndTime.Sub(execution.StartTime).Seconds(),
    })

    return fmt.Errorf("saga rolled back: %s", execution.Error)
}

// publishEvent publishes a saga event
func (so *SagaOrchestrator) publishEvent(eventType string, data map[string]interface{}) {
    event := map[string]interface{}{
        "event_type": eventType,
        "timestamp":  time.Now(),
        "data":       data,
    }

    // Publishing is best effort: a failure is logged but does not fail the saga
    if err := so.messageClient.Publish("saga-events", event); err != nil {
        log.Printf("Failed to publish event %s: %v", eventType, err)
    }
}

// GetSagaStatus returns the current status of a saga
func (so *SagaOrchestrator) GetSagaStatus(sagaID string) (*SagaExecution, error) {
    so.mu.RLock()
    execution, exists := so.executions[sagaID]
    so.mu.RUnlock()

    if !exists {
        // Try loading from state store
        return so.stateStore.Load(sagaID)
    }

    return execution, nil
}

// Example: Order fulfillment saga
func ExampleOrderFulfillmentSaga() {
    // Define saga steps
    steps := []SagaStep{
        {
            Name:    "reserve_inventory",
            Timeout: 5 * time.Second,
            Action: func(ctx context.Context, data interface{}) (interface{}, error) {
                order := data.(map[string]interface{})
                log.Printf("Reserving inventory for order %s", order["order_id"])
                // Simulate inventory reservation
                return map[string]interface{}{
                    "reservation_id": "inv-12345",
                    "order_id":       order["order_id"],
                }, nil
            },
            Compensation: func(ctx context.Context, data interface{}) error {
                result := data.(map[string]interface{})
                log.Printf("Releasing inventory reservation %s", result["reservation_id"])
                // Simulate inventory release
                return nil
            },
        },
        {
            Name:    "process_payment",
            Timeout: 10 * time.Second,
            Action: func(ctx context.Context, data interface{}) (interface{}, error) {
                result := data.(map[string]interface{})
                log.Printf("Processing payment for order %s", result["order_id"])
                // Simulate payment processing
                return map[string]interface{}{
                    "payment_id": "pay-67890",
                    "order_id":   result["order_id"],
                }, nil
            },
            Compensation: func(ctx context.Context, data interface{}) error {
                result := data.(map[string]interface{})
                log.Printf("Refunding payment %s", result["payment_id"])
                // Simulate payment refund
                return nil
            },
        },
        {
            Name:    "create_shipment",
            Timeout: 5 * time.Second,
            Action: func(ctx context.Context, data interface{}) (interface{}, error) {
                result := data.(map[string]interface{})
                log.Printf("Creating shipment for order %s", result["order_id"])
                // Simulate shipment creation
                return map[string]interface{}{
                    "shipment_id": "ship-11111",
                    "order_id":    result["order_id"],
                }, nil
            },
            Compensation: func(ctx context.Context, data interface{}) error {
                result := data.(map[string]interface{})
                log.Printf("Canceling shipment %s", result["shipment_id"])
                // Simulate shipment cancellation
                return nil
            },
        },
        {
            Name:    "send_confirmation",
            Timeout: 3 * time.Second,
            Action: func(ctx context.Context, data interface{}) (interface{}, error) {
                result := data.(map[string]interface{})
                log.Printf("Sending confirmation for order %s", result["order_id"])
                // Simulate sending confirmation email
                return result, nil
            },
            Compensation: nil, // No compensation needed for notification
        },
    }

    // Create saga orchestrator.
    // These two variables are nil placeholders, not mocks: assign real
    // implementations before running, because ExecuteSaga calls Save and
    // Publish on them and a method call on a nil interface panics.
    var msgClient MessageClient // Implement with actual message broker
    var stateStore StateStore   // Implement with actual persistence

    orchestrator := NewSagaOrchestrator(steps, msgClient, stateStore)

    // Execute saga
    ctx := context.Background()
    orderData := map[string]interface{}{
        "order_id":    "order-12345",
        "customer_id": "cust-67890",
        "amount":      99.99,
    }

    if err := orchestrator.ExecuteSaga(ctx, "saga-order-12345", orderData); err != nil {
        log.Printf("Saga failed: %v", err)
    }
}
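
Requirement 5 also asks for choreography, which the orchestrator in the solution does not cover. The following is a minimal sketch of the idea, not part of the original solution: `EventBus`, `RunChoreography`, `DemoChoreography`, and the topic names are hypothetical, and the bus is an in-memory, synchronous stand-in for a real broker. Each service only knows which events it reacts to and which it emits, and the compensating action (releasing inventory) is itself triggered by an event.

```go
package main

import "fmt"

// EventBus is a hypothetical in-memory, synchronous pub/sub bus used only for
// illustration; a real system would publish through Kafka, RabbitMQ, or NATS.
type EventBus struct {
	handlers map[string][]func(orderID string)
}

func NewEventBus() *EventBus {
	return &EventBus{handlers: make(map[string][]func(string))}
}

// Subscribe registers a handler for a topic.
func (b *EventBus) Subscribe(topic string, h func(orderID string)) {
	b.handlers[topic] = append(b.handlers[topic], h)
}

// Publish calls every handler for the topic in registration order.
func (b *EventBus) Publish(topic, orderID string) {
	for _, h := range b.handlers[topic] {
		h(orderID)
	}
}

// RunChoreography wires three services together purely through events and
// returns the ordered list of actions they took. There is no coordinator.
func RunChoreography(orderID string, paymentSucceeds bool) []string {
	bus := NewEventBus()
	var trace []string

	// Inventory service: reserves stock on order.created and releases it
	// (the compensating action) on payment.failed.
	bus.Subscribe("order.created", func(id string) {
		trace = append(trace, "inventory reserved")
		bus.Publish("inventory.reserved", id)
	})
	bus.Subscribe("payment.failed", func(id string) {
		trace = append(trace, "inventory released")
	})

	// Payment service: charges once inventory is reserved.
	bus.Subscribe("inventory.reserved", func(id string) {
		if paymentSucceeds {
			trace = append(trace, "payment processed")
			bus.Publish("payment.processed", id)
			return
		}
		trace = append(trace, "payment failed")
		bus.Publish("payment.failed", id)
	})

	// Shipping service: ships once payment is processed.
	bus.Subscribe("payment.processed", func(id string) {
		trace = append(trace, "shipment created")
	})

	bus.Publish("order.created", orderID)
	return trace
}

// DemoChoreography prints the actions taken when the payment step fails.
func DemoChoreography() {
	fmt.Println(RunChoreography("order-12345", false))
}
```

The trade-off against orchestration is visibility: no single component holds the saga state, so tracing a failed order means correlating events across services, while the orchestrator keeps that state in one `SagaExecution`.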


Summary

Key Takeaways

  1. Choose the Right Tool: Kafka for event sourcing and analytics, RabbitMQ for complex routing, NATS for high-speed simplicity
  2. Exactly-Once Processing: Brokers typically deliver at least once, so implement idempotency tracking to make duplicate deliveries harmless
  3. Error Handling: Use dead letter queues and circuit breakers for resilience
  4. Monitoring: Track latency, throughput, and error rates across all systems
  5. Testing: Test failure scenarios, not just happy paths
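
The idempotency tracking from takeaway 2 can be sketched as follows. This is a minimal illustration under stated assumptions: `IdempotentProcessor`, `Process`, and `ErrMissingID` are hypothetical names, every message is assumed to carry a stable unique ID, and the in-memory map stands in for the durable store (a database table or Redis set with expiry) that a production system would normally use.

```go
package main

import (
	"errors"
	"sync"
)

// ErrMissingID is returned when a message has no ID to deduplicate on.
var ErrMissingID = errors.New("message ID is required for deduplication")

// IdempotentProcessor remembers which message IDs were processed so that
// redelivered messages are skipped. The map is an in-memory stand-in for a
// durable store and is an assumption of this sketch.
type IdempotentProcessor struct {
	mu        sync.Mutex
	processed map[string]struct{}
}

func NewIdempotentProcessor() *IdempotentProcessor {
	return &IdempotentProcessor{processed: make(map[string]struct{})}
}

// Process runs handler at most once per successfully handled message ID.
// It returns true when the handler ran and succeeded, and false for a
// duplicate. A failed handler leaves the ID unrecorded, so a later
// redelivery of the same message is retried rather than dropped.
func (p *IdempotentProcessor) Process(id string, handler func() error) (bool, error) {
	if id == "" {
		return false, ErrMissingID
	}

	// Holding the lock while the handler runs keeps the sketch simple but
	// serializes processing; a real implementation would lock per ID.
	p.mu.Lock()
	defer p.mu.Unlock()

	if _, seen := p.processed[id]; seen {
		return false, nil
	}
	if err := handler(); err != nil {
		return false, err
	}
	p.processed[id] = struct{}{}
	return true, nil
}
```

Recording the ID only after the handler succeeds trades a possible repeat of a half-finished handler for never losing a message, which is why the handler's own side effects should also be safe to repeat.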

Production Checklist

  • Message Serialization: Use efficient formats like Protobuf for high-volume systems
  • Retry Logic: Implement exponential backoff with maximum attempts
  • Dead Letter Queues: Handle messages that repeatedly fail processing
  • Circuit Breakers: Prevent cascade failures when systems are down
  • Monitoring: Track consumer lag, error rates, and throughput
  • Security: Implement authentication and encryption for sensitive messages
  • Testing: Test failure scenarios, network partitions, and high load
  • Documentation: Document message schemas and routing patterns
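
As one illustration of the circuit breaker item in the checklist, here is a minimal sketch. The names `CircuitBreaker`, `NewCircuitBreaker`, `Call`, and `ErrCircuitOpen` are hypothetical, and the policy shown (open after a fixed number of consecutive failures, allow a trial call once a cooldown has elapsed) is only one of several common designs.

```go
package main

import (
	"errors"
	"sync"
	"time"
)

// ErrCircuitOpen is returned when the breaker rejects a call without running it.
var ErrCircuitOpen = errors.New("circuit breaker is open")

// CircuitBreaker opens after maxFailures consecutive failures and allows a
// trial call again once cooldown has elapsed since the last failure.
type CircuitBreaker struct {
	mu          sync.Mutex
	maxFailures int
	cooldown    time.Duration
	failures    int
	openedAt    time.Time
}

func NewCircuitBreaker(maxFailures int, cooldown time.Duration) *CircuitBreaker {
	return &CircuitBreaker{maxFailures: maxFailures, cooldown: cooldown}
}

// Call runs fn unless the breaker is open. A success resets the failure
// count; a failure increments it and (re)opens the breaker at the threshold.
func (cb *CircuitBreaker) Call(fn func() error) error {
	cb.mu.Lock()
	open := cb.failures >= cb.maxFailures && time.Since(cb.openedAt) < cb.cooldown
	cb.mu.Unlock()
	if open {
		return ErrCircuitOpen
	}

	// The lock is not held while fn runs, so slow downstream calls do not
	// block other callers. Several trial calls may therefore run at once
	// after the cooldown; a stricter half-open state would allow only one.
	err := fn()

	cb.mu.Lock()
	defer cb.mu.Unlock()
	if err != nil {
		cb.failures++
		if cb.failures >= cb.maxFailures {
			cb.openedAt = time.Now()
		}
		return err
	}
	cb.failures = 0
	return nil
}
```

A consumer would wrap each downstream call, for example `breaker.Call(func() error { return sendToPaymentService(msg) })` where `sendToPaymentService` is a placeholder, and route messages rejected with `ErrCircuitOpen` to a retry or dead letter queue instead of hammering a service that is already down.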

When to Use Each System

Use Kafka when:

  • You need to preserve complete event history
  • Multiple consumers need to read events independently
  • High throughput is critical
  • You're implementing event sourcing or CQRS

Use RabbitMQ when:

  • You need complex routing patterns
  • Per-message delivery guarantees (publisher confirms and consumer acknowledgements) are critical
  • Message ordering within conversations matters
  • You have diverse consumer requirements

Use NATS when:

  • Simplicity and performance are priorities
  • You need sub-millisecond latency
  • Message durability isn't critical (core NATS delivers at most once; JetStream adds persistence when you need it)
  • You have microservices with lightweight communication needs

💡 Final Insight: Mature messaging architectures often combine several technologies, each serving a different use case. Don't force one tool to solve every problem. Understand the trade-offs and choose the right tool for each specific need.

Next Steps

  1. Practice: Build a small e-commerce system using all three message brokers
  2. Deepen Knowledge: Read the official documentation for Kafka, RabbitMQ, and NATS
  3. Production Readiness: Learn about monitoring, scaling, and operational aspects
  4. Advanced Patterns: Study CQRS, event sourcing, and saga patterns
  5. Cloud Services: Explore managed message services such as Amazon SQS, Google Cloud Pub/Sub, and Azure Service Bus

Remember: Message systems are the backbone of distributed systems. Master them, and you can build systems that scale to millions of users while maintaining reliability and performance.