Why This Matters - The Digital Communication Revolution
Consider a bustling e-commerce platform during Black Friday sales. Thousands of customers are simultaneously placing orders, making payments, and expecting confirmation emails. If each action had to wait for the previous one to complete, the system would grind to a halt, customers would abandon their carts, and revenue would be lost.
This is where message queues become the unsung heroes of modern distributed systems. They act like a sophisticated digital postal service, ensuring that every message is delivered reliably while allowing different parts of your system to work independently, at their own pace.
Real-world impact: Companies like Netflix process billions of messages daily through their streaming platforms, Uber handles millions of ride requests per minute, and Amazon's order processing relies on intricate message routing. Without message queues, these systems would simply not exist at their current scale.
Learning Objectives
By the end of this article, you will be able to:
- Distinguish between message queues and event streams and choose the right tool for each scenario
- Implement production-ready producers and consumers for Kafka, RabbitMQ, and NATS
- Design reliable message patterns with proper error handling and delivery guarantees
- Apply serialization strategies for efficient data transfer
- Build resilient systems that can handle failures and maintain message integrity
- Monitor and troubleshoot message systems in production environments
Core Concepts - Understanding the Messaging Landscape
The Fundamental Difference: Queues vs Streams
Before diving into implementations, let's understand the crucial distinction that many developers overlook:
Message Queues are like postal mail delivery - each message has a single recipient and is deleted after delivery. They're perfect for task distribution and one-time operations.
1// Message Queue Pattern: Point-to-point delivery
2// Think: Send this specific order to the payment processor
3type OrderMessage struct {
4 OrderID string
5 CustomerID string
6 Amount float64
7 Items []OrderItem
8}
9
10// Once the payment processor handles this, it's gone forever
11// Each order is processed exactly once by one consumer
Event Streams are like newspaper subscriptions - multiple readers can see the same content, and it's preserved for future reference. They excel at audit trails and analytics.
1// Event Stream Pattern: Publish-subscribe with retention
2// Think: Record every price change for analytics and reporting
3type PriceChangeEvent struct {
4 ProductID string
5 OldPrice float64
6 NewPrice float64
7 ChangedAt time.Time
8 ChangedBy string
9}
10
11// Multiple services can read this independently:
12// - Analytics service tracks price trends
13// - Audit service logs all changes
14// - Notification service alerts customers of discounts
💡 Critical Insight: The choice between queues and streams isn't technical - it's architectural. Ask yourself: "Do I need to deliver this message once to one recipient, or preserve it forever for multiple readers?"
When to Use Each Pattern
Choose Message Queues when:
- You need to distribute work among multiple workers
- Messages represent commands to be executed
- Order matters within a single conversation
- You need guaranteed delivery with retry logic
- Each message should be processed exactly once
1// Perfect for queues:
2tasks := []string{
3 "send_welcome_email",
4 "process_payment",
5 "update_inventory",
6 "generate_invoice",
7}
8// Each task executed once by an available worker
Choose Event Streams when:
- You need to maintain a complete history of events
- Multiple services need to react to the same events
- You're implementing event sourcing or CQRS
- Audit trails and replay capabilities are required
- Real-time analytics are important
1// Perfect for streams:
2events := []string{
3 "user_registered",
4 "profile_updated",
5 "password_changed",
6 "account_deleted",
7}
8// Multiple services: analytics, notifications, audit, etc.
Practical Examples - Building Production-Ready Message Systems
Apache Kafka: The Distributed Event Log
Kafka isn't just a messaging system - it's a distributed commit log that provides the backbone for data-intensive applications. Think of it as a massive, replicated journal where every event is recorded in chronological order.
Core Kafka Architecture
1package kafka
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "time"
8
9 "github.com/confluentinc/confluent-kafka-go/v2/kafka"
10)
11
12/*
13Kafka Architecture Visualized:
14
15Topic: orders
16├── Partition 0: [msg0, msg1, msg2, ...] offset: 0, 1, 2, ...
17├── Partition 1: [msg3, msg4, msg5, ...] offset: 0, 1, 2, ...
18└── Partition 2: [msg6, msg7, msg8, ...] offset: 0, 1, 2, ...
19
20Real-world meaning:
21- Topic: A category of events
22- Partition: Parallel processing unit
23- Offset: Unique ID for each message within a partition
24- Ordering: Guaranteed within a partition
25- Parallelism: Multiple partitions enable concurrent processing
26*/
Production-Ready Kafka Producer
1type Producer struct {
2 producer *kafka.Producer
3 topic string
4}
5
6func NewProducer(brokers, topic string) {
7 config := &kafka.ConfigMap{
8 "bootstrap.servers": brokers,
9
10 // Performance tuning for high throughput
11 "compression.type": "snappy", // Compress messages to save bandwidth
12 "batch.size": 16384, // Batch messages for efficiency
13 "linger.ms": 10, // Wait up to 10ms to batch
14 "acks": "all", // Wait for all replicas
15
16 // Reliability and durability
17 "enable.idempotence": true, // Prevent duplicates
18 "max.in.flight.requests.per.connection": 5,
19 "retries": 10, // Retry on transient failures
20
21 // Monitoring and observability
22 "statistics.interval.ms": 60000, // Emit stats every minute
23 }
24
25 producer, err := kafka.NewProducer(config)
26 if err != nil {
27 return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
28 }
29
30 return &Producer{
31 producer: producer,
32 topic: topic,
33 }, nil
34}
35
36// ProduceEvent sends an event with proper partitioning
37func ProduceEvent(key string, event interface{}) error {
38 // Serialize event to JSON
39 data, err := json.Marshal(event)
40 if err != nil {
41 return fmt.Errorf("failed to serialize event: %w", err)
42 }
43
44 // Create message with partitioning key
45 // This ensures all events for the same entity go to the same partition
46 // Maintaining order for related events
47 message := &kafka.Message{
48 TopicPartition: kafka.TopicPartition{
49 Topic: &p.topic,
50 Partition: kafka.PartitionAny, // Let Kafka decide based on key
51 },
52 Key: []byte(key), // Partitioning key
53 Value: data, // Event data
54 Headers: []kafka.Header{
55 {Key: "timestamp", Value: []byte(time.Now().Format(time.RFC3339))},
56 {Key: "version", Value: []byte("1.0")},
57 },
58 }
59
60 // Produce asynchronously with delivery confirmation
61 deliveryChan := make(chan kafka.Event, 1)
62 err = p.producer.Produce(message, deliveryChan)
63 if err != nil {
64 return fmt.Errorf("failed to produce message: %w", err)
65 }
66
67 // Wait for delivery report
68 select {
69 case e := <-deliveryChan:
70 m := e.(*kafka.Message)
71 if m.TopicPartition.Error != nil {
72 return fmt.Errorf("message delivery failed: %w", m.TopicPartition.Error)
73 }
74 log.Printf("Event delivered to %s [partition %d] at offset %v",
75 *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
76 return nil
77
78 case <-time.After(10 * time.Second):
79 return fmt.Errorf("message delivery timeout")
80 }
81}
82
83// Example: Processing user orders
84type OrderEvent struct {
85 OrderID string `json:"order_id"`
86 CustomerID string `json:"customer_id"`
87 Amount float64 `json:"amount"`
88 Timestamp time.Time `json:"timestamp"`
89}
90
91func PublishOrder(order OrderEvent) error {
92 // Use customer_id as partitioning key
93 // This ensures all orders for a customer are processed in order
94 return p.ProduceEvent(order.CustomerID, order)
95}
Production-Ready Kafka Consumer
1type Consumer struct {
2 consumer *kafka.Consumer
3 handler func(*kafka.Message) error
4 maxRetries int
5}
6
7func NewConsumer(brokers, groupID, topic string, handler func(*kafka.Message) error, maxRetries int) {
8 config := &kafka.ConfigMap{
9 "bootstrap.servers": brokers,
10 "group.id": groupID,
11
12 // Consumer behavior for reliability
13 "auto.offset.reset": "earliest", // Start from beginning if no offset stored
14 "enable.auto.commit": false, // Manual commit for control
15 "session.timeout.ms": 30000, // 30 seconds
16 "heartbeat.interval.ms": 3000, // 3 seconds
17
18 // Performance tuning
19 "fetch.min.bytes": 1024, // Wait for at least 1KB
20 "fetch.wait.max.ms": 500, // But don't wait more than 500ms
21 "max.partition.fetch.bytes": 1048576, // 1MB per partition max
22 }
23
24 consumer, err := kafka.NewConsumer(config)
25 if err != nil {
26 return nil, fmt.Errorf("failed to create Kafka consumer: %w", err)
27 }
28
29 // Subscribe to topic
30 err = consumer.SubscribeTopics([]string{topic}, nil)
31 if err != nil {
32 consumer.Close()
33 return nil, fmt.Errorf("failed to subscribe to topic: %w", err)
34 }
35
36 return &Consumer{
37 consumer: consumer,
38 handler: handler,
39 maxRetries: maxRetries,
40 }, nil
41}
42
43// Consume starts the message consumption loop with retry logic
44func Consume(ctx context.Context) error {
45 for {
46 select {
47 case <-ctx.Done():
48 return ctx.Err()
49 default:
50 msg, err := c.consumer.ReadMessage(5 * time.Second)
51 if err != nil {
52 // Timeout is expected, continue loop
53 if err.(kafka.Error).Code() == kafka.ErrTimedOut {
54 continue
55 }
56 log.Printf("Consumer error: %v", err)
57 continue
58 }
59
60 // Process message with retry logic
61 if err := c.processMessageWithRetry(ctx, msg); err != nil {
62 log.Printf("Failed to process message after %d retries: %v", c.maxRetries, err)
63 // Send to dead letter queue or handle appropriately
64 continue
65 }
66 }
67 }
68}
69
70// processMessageWithRetry handles message processing with exponential backoff
71func processMessageWithRetry(ctx context.Context, msg *kafka.Message) error {
72 var lastErr error
73
74 for attempt := 0; attempt <= c.maxRetries; attempt++ {
75 if attempt > 0 {
76 // Exponential backoff: 1s, 2s, 4s, 8s, 16s
77 backoff := time.Duration(1<<uint(attempt-1)) * time.Second
78 if backoff > 30*time.Second {
79 backoff = 30 * time.Second // Cap at 30 seconds
80 }
81
82 log.Printf("Retry attempt %d for message at offset %v", attempt, msg.TopicPartition.Offset)
83 select {
84 case <-time.After(backoff):
85 case <-ctx.Done():
86 return ctx.Err()
87 }
88 }
89
90 // Process the message
91 if err := c.handler(msg); err != nil {
92 lastErr = err
93 log.Printf("Processing attempt %d failed: %v", attempt+1, err)
94 continue
95 }
96
97 // Success - commit the offset
98 _, err := c.consumer.CommitMessage(msg)
99 if err != nil {
100 return fmt.Errorf("failed to commit message: %w", err)
101 }
102
103 log.Printf("Successfully processed and committed message at offset %v", msg.TopicPartition.Offset)
104 return nil
105 }
106
107 return fmt.Errorf("message processing failed after %d attempts: %w", c.maxRetries, lastErr)
108}
109
110// Example: Processing order events
111func NewOrderProcessor() *Consumer {
112 handler := func(msg *kafka.Message) error {
113 var order OrderEvent
114 if err := json.Unmarshal(msg.Value, &order); err != nil {
115 return fmt.Errorf("failed to unmarshal order event: %w", err)
116 }
117
118 // Process the order
119 log.Printf("Processing order %s for customer %s, amount $%.2f",
120 order.OrderID, order.CustomerID, order.Amount)
121
122 // Simulate processing that might fail
123 if time.Now().UnixNano()%10 == 0 { // 10% failure rate for demo
124 return fmt.Errorf("simulated processing failure")
125 }
126
127 // Update inventory, send notifications, etc.
128 return processOrder(order)
129 }
130
131 return &Consumer{
132 handler: handler,
133 maxRetries: 5,
134 }
135}
RabbitMQ: The Intelligent Message Router
RabbitMQ excels at sophisticated routing scenarios where you need precise control over message delivery. Think of it as an advanced postal service that can sort, prioritize, and deliver messages with incredible precision.
Exchange Patterns: The Power of Smart Routing
1package rabbitmq
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "time"
9
10 amqp "github.com/rabbitmq/amqp091-go"
11)
12
13type RabbitClient struct {
14 conn *amqp.Connection
15 channel *amqp.Channel
16}
17
18func NewRabbitClient(url string) {
19 conn, err := amqp.Dial(url)
20 if err != nil {
21 return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
22 }
23
24 ch, err := conn.Channel()
25 if err != nil {
26 conn.Close()
27 return nil, fmt.Errorf("failed to open RabbitMQ channel: %w", err)
28 }
29
30 // Set QoS to limit unacknowledged messages
31 err = ch.Qos(
32 10, // prefetch count
33 0, // prefetch size
34 false, // global
35 )
36 if err != nil {
37 ch.Close()
38 conn.Close()
39 return nil, fmt.Errorf("failed to set QoS: %w", err)
40 }
41
42 return &RabbitClient{
43 conn: conn,
44 channel: ch,
45 }, nil
46}
47
48// Exchange patterns explained with real-world examples
49func SetupExchangePatterns() error {
50 /*
51 Exchange Types Explained:
52
53 1. Direct Exchange: Like specific mailboxes
54 - Message routing: exact key match
55 - Use case: Send different types of notifications to specific handlers
56 - Example: "error" messages go to error handling service
57
58 2. Fanout Exchange: Like town crier announcements
59 - Message routing: broadcast to all queues
60 - Use case: Notify multiple services of the same event
61 - Example: System maintenance notifications
62
63 3. Topic Exchange: Like smart mail sorting
64 - Message routing: pattern matching with wildcards
65 - Use case: Route messages based on hierarchical categories
66 - Example: "user.created" and "user.updated" go to user service
67 */
68
69 // Direct Exchange for targeted messaging
70 err := rc.channel.ExchangeDeclare(
71 "notifications.direct", // exchange name
72 "direct", // exchange type
73 true, // durable
74 false, // auto-delete
75 false, // internal
76 false, // no-wait
77 nil, // arguments
78 )
79 if err != nil {
80 return fmt.Errorf("failed to declare direct exchange: %w", err)
81 }
82
83 // Fanout Exchange for broadcasting
84 err = rc.channel.ExchangeDeclare(
85 "notifications.broadcast",
86 "fanout",
87 true,
88 false,
89 false,
90 false,
91 nil,
92 )
93 if err != nil {
94 return fmt.Errorf("failed to declare fanout exchange: %w", err)
95 }
96
97 // Topic Exchange for pattern-based routing
98 err = rc.channel.ExchangeDeclare(
99 "events.topic",
100 "topic",
101 true,
102 false,
103 false,
104 false,
105 nil,
106 )
107 if err != nil {
108 return fmt.Errorf("failed to declare topic exchange: %w", err)
109 }
110
111 return nil
112}
113
114// Direct Exchange Usage: Targeted Notifications
115func SendDirectNotification(routingKey string, notification Notification) error {
116 body, err := json.Marshal(notification)
117 if err != nil {
118 return fmt.Errorf("failed to marshal notification: %w", err)
119 }
120
121 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
122 defer cancel()
123
124 err = rc.channel.PublishWithContext(
125 ctx,
126 "notifications.direct", // exchange
127 routingKey, // routing key
128 false, // mandatory
129 false, // immediate
130 amqp.Publishing{
131 ContentType: "application/json",
132 DeliveryMode: amqp.Persistent, // Ensure message survives restart
133 Timestamp: time.Now(),
134 Body: body,
135 },
136 )
137 if err != nil {
138 return fmt.Errorf("failed to publish notification: %w", err)
139 }
140
141 log.Printf("Sent %s notification via direct exchange", routingKey)
142 return nil
143}
144
145// Topic Exchange Usage: Event-Driven Architecture
146func PublishEvent(eventType string, eventData interface{}) error {
147 // Create routing key pattern like "user.created", "order.completed", "payment.failed"
148 routingKey := fmt.Sprintf("%s.%s", eventSource(eventData), eventType)
149
150 body, err := json.Marshal(eventData)
151 if err != nil {
152 return fmt.Errorf("failed to marshal event: %w", err)
153 }
154
155 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
156 defer cancel()
157
158 err = rc.channel.PublishWithContext(
159 ctx,
160 "events.topic",
161 routingKey,
162 false,
163 false,
164 amqp.Publishing{
165 ContentType: "application/json",
166 DeliveryMode: amqp.Persistent,
167 Timestamp: time.Now(),
168 Body: body,
169 Headers: map[string]interface{}{
170 "event_id": generateEventID(),
171 "event_type": eventType,
172 "timestamp": time.Now().Unix(),
173 },
174 },
175 )
176 if err != nil {
177 return fmt.Errorf("failed to publish event: %w", err)
178 }
179
180 log.Printf("Published event: %s", routingKey)
181 return nil
182}
183
184// Setup consumers with different routing patterns
185func SetupEventConsumers() error {
186 /*
187 Pattern Matching Examples:
188
189 user.* -> Matches: user.created, user.updated, user.deleted
190 *.created -> Matches: user.created, order.created, payment.created
191 order.* -> Matches: order.created, order.updated, order.completed
192 *.payment.* -> Matches: user.payment.completed, order.payment.failed
193 */
194
195 // User service consumes all user events
196 userQueue, err := rc.setupQueueForPatterns("user.service", []string{
197 "user.*", // Match all user events
198 })
199 if err != nil {
200 return err
201 }
202
203 // Analytics service consumes all creation events
204 analyticsQueue, err := rc.setupQueueForPatterns("analytics.service", []string{
205 "*.created", // Match all creation events
206 "*.deleted", // And all deletion events
207 })
208 if err != nil {
209 return err
210 }
211
212 // Notification service consumes specific events
213 notificationQueue, err := rc.setupQueueForPatterns("notification.service", []string{
214 "order.completed", // Order completion notifications
215 "payment.failed", // Payment failure alerts
216 "user.created", // Welcome emails
217 })
218 if err != nil {
219 return err
220 }
221
222 // Start consumers
223 go rc.consumeQueue(userQueue, rc.handleUserEvent)
224 go rc.consumeQueue(analyticsQueue, rc.handleAnalyticsEvent)
225 go rc.consumeQueue(notificationQueue, rc.handleNotificationEvent)
226
227 return nil
228}
229
230func setupQueueForPatterns(queueName string, patterns []string) {
231 // Declare the queue
232 queue, err := rc.channel.QueueDeclare(
233 queueName, // name
234 true, // durable
235 false, // delete when unused
236 false, // exclusive
237 false, // no-wait
238 amqp.Table{
239 "x-message-ttl": 86400000, // 24 hours TTL
240 "x-max-length": 10000, // Max queue length
241 "x-dead-letter-exchange": "dlx", // Dead letter exchange
242 },
243 )
244 if err != nil {
245 return amqp.Queue{}, fmt.Errorf("failed to declare queue %s: %w", queueName, err)
246 }
247
248 // Bind the queue to the topic exchange with each pattern
249 for _, pattern := range patterns {
250 err = rc.channel.QueueBind(
251 queue.Name, // queue name
252 pattern, // routing key pattern
253 "events.topic", // exchange
254 false,
255 nil,
256 )
257 if err != nil {
258 return amqp.Queue{}, fmt.Errorf("failed to bind queue %s to pattern %s: %w", queueName, pattern, err)
259 }
260 }
261
262 return queue, nil
263}
NATS: The High-Speed Messenger
NATS prioritizes simplicity and performance, making it perfect for microservices that need lightning-fast messaging without the complexity of Kafka or RabbitMQ.
Core NATS Implementation
1package nats
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "time"
9
10 "github.com/nats-io/nats.go"
11 "github.com/nats-io/nats.go/jetstream"
12)
13
14type NATSClient struct {
15 conn *nats.Conn
16 js jetstream.JetStream
17}
18
19func NewNATSClient(url string) {
20 // Connect with reconnection settings
21 conn, err := nats.Connect(url,
22 nats.MaxReconnects(10), // Try to reconnect 10 times
23 nats.ReconnectWait(2*time.Second), // Wait 2 seconds between reconnects
24 nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
25 log.Printf("NATS disconnected: %v", err)
26 }),
27 nats.ReconnectHandler(func(nc *nats.Conn) {
28 log.Printf("NATS reconnected to %s", nc.ConnectedUrl())
29 }),
30 nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
31 log.Printf("NATS error: %v", err)
32 }),
33 )
34 if err != nil {
35 return nil, fmt.Errorf("failed to connect to NATS: %w", err)
36 }
37
38 // Initialize JetStream for persistence
39 js, err := jetstream.New(conn)
40 if err != nil {
41 conn.Close()
42 return nil, fmt.Errorf("failed to initialize JetStream: %w", err)
43 }
44
45 return &NATSClient{
46 conn: conn,
47 js: js,
48 }, nil
49}
50
51// Simple pub/sub for high-speed messaging
52func PublishEvent(subject string, event interface{}) error {
53 data, err := json.Marshal(event)
54 if err != nil {
55 return fmt.Errorf("failed to marshal event: %w", err)
56 }
57
58 // Simple publish - fire and forget for maximum speed
59 err = nc.conn.Publish(subject, data)
60 if err != nil {
61 return fmt.Errorf("failed to publish to subject %s: %w", subject, err)
62 }
63
64 return nil
65}
66
67// Request-Reply pattern for synchronous communication
68func Request(subject string, request interface{}, timeout time.Duration) {
69 data, err := json.Marshal(request)
70 if err != nil {
71 return nil, fmt.Errorf("failed to marshal request: %w", err)
72 }
73
74 // Send request and wait for response
75 msg, err := nc.conn.Request(subject, data, timeout)
76 if err != nil {
77 return nil, fmt.Errorf("request to %s failed: %w", subject, err)
78 }
79
80 var response interface{}
81 err = json.Unmarshal(msg.Data, &response)
82 if err != nil {
83 return nil, fmt.Errorf("failed to unmarshal response: %w", err)
84 }
85
86 return response, nil
87}
88
89// Set up a service that responds to requests
90func Respond(subject string, handler func(interface{})) error {
91 _, err := nc.conn.Subscribe(subject, func(msg *nats.Msg) {
92 var request interface{}
93 if err := json.Unmarshal(msg.Data, &request); err != nil {
94 log.Printf("Failed to unmarshal request on %s: %v", subject, err)
95 return
96 }
97
98 // Process the request
99 response, err := handler(request)
100 if err != nil {
101 log.Printf("Handler error on %s: %v", subject, err)
102 return
103 }
104
105 // Send response
106 responseData, err := json.Marshal(response)
107 if err != nil {
108 log.Printf("Failed to marshal response on %s: %v", subject, err)
109 return
110 }
111
112 msg.Respond(responseData)
113 })
114 if err != nil {
115 return fmt.Errorf("failed to subscribe to %s: %w", subject, err)
116 }
117
118 return nil
119}
120
121// JetStream for persistent messaging
122func CreatePersistentStream(streamName string, subjects []string) error {
123 stream, err := nc.js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
124 Name: streamName,
125 Subjects: subjects,
126
127 // Retention policy - keep messages for 24 hours or up to 1GB
128 Retention: jetstream.LimitsPolicy,
129 MaxAge: 24 * time.Hour,
130 MaxBytes: 1024 * 1024 * 1024, // 1GB
131
132 // Storage and replication
133 Storage: jetstream.FileStorage,
134 Replicas: 3, // Replicate across 3 nodes for HA
135 })
136 if err != nil {
137 return fmt.Errorf("failed to create stream %s: %w", streamName, err)
138 }
139
140 log.Printf("Created persistent stream: %s with config: %+v", stream.CachedInfo())
141 return nil
142}
143
144// Publish to persistent stream with acknowledgment
145func PublishPersistent(subject string, event interface{}) error {
146 data, err := json.Marshal(event)
147 if err != nil {
148 return fmt.Errorf("failed to marshal event: %w", err)
149 }
150
151 // Publish with acknowledgment for durability
152 _, err = nc.js.Publish(context.Background(), subject, data)
153 if err != nil {
154 return fmt.Errorf("failed to publish persistent message: %w", err)
155 }
156
157 return nil
158}
159
160// Create durable consumer for reliable processing
161func CreateDurableConsumer(stream, consumer string, handler func([]byte) error) error {
162 _, err := nc.js.CreateOrUpdateConsumer(context.Background(), stream, jetstream.ConsumerConfig{
163 Name: consumer,
164 Durable: true,
165 FilterSubject: ">", // All subjects in stream
166 AckPolicy: jetstream.AckExplicitPolicy,
167 AckWait: 30 * time.Second,
168 MaxDeliver: 3, // Retry up to 3 times
169 })
170 if err != nil {
171 return fmt.Errorf("failed to create consumer %s: %w", consumer, err)
172 }
173
174 // Start consuming
175 _, err = nc.js.Consume(func(msg jetstream.Msg) {
176 if err := handler(msg.Data()); err != nil {
177 log.Printf("Handler error for consumer %s: %v", consumer, err)
178 msg.Nak() // Negative acknowledgment
179 return
180 }
181 msg.Ack() // Acknowledge successful processing
182 })
183 if err != nil {
184 return fmt.Errorf("failed to start consumer %s: %w", consumer, err)
185 }
186
187 return nil
188}
Common Patterns and Pitfalls - Learning from Real Experience
Pattern 1: Exactly-Once Processing with Idempotency
Problem: Network issues and retries can cause duplicate message processing, leading to data inconsistency.
Solution: Implement idempotency to ensure processing the same message multiple times produces the same result.
1type IdempotentProcessor struct {
2 processedCache map[string]bool
3 mu sync.RWMutex
4 ttl time.Duration
5}
6
7func NewIdempotentProcessor() *IdempotentProcessor {
8 processor := &IdempotentProcessor{
9 processedCache: make(map[string]bool),
10 ttl: 24 * time.Hour,
11 }
12
13 // Start cleanup goroutine
14 go processor.cleanup()
15 return processor
16}
17
18func ProcessWithIdempotency(msgID string, handler func() error) error {
19 ip.mu.RLock()
20 if processed, exists := ip.processedCache[msgID]; exists && processed {
21 ip.mu.RUnlock()
22 log.Printf("Message %s already processed, skipping", msgID)
23 return nil
24 }
25 ip.mu.RUnlock()
26
27 // Acquire write lock for the actual processing
28 ip.mu.Lock()
29 defer ip.mu.Unlock()
30
31 // Double-check after acquiring write lock
32 if processed, exists := ip.processedCache[msgID]; exists && processed {
33 return nil
34 }
35
36 // Process the message
37 if err := handler(); err != nil {
38 return fmt.Errorf("processing failed for message %s: %w", msgID, err)
39 }
40
41 // Mark as processed
42 ip.processedCache[msgID] = true
43 log.Printf("Successfully processed message %s", msgID)
44 return nil
45}
46
47func cleanup() {
48 ticker := time.NewTicker(ip.ttl / 2)
49 for range ticker.C {
50 ip.mu.Lock()
51 // In production, you'd track timestamps for proper cleanup
52 if len(ip.processedCache) > 1000000 { // Simple size-based cleanup
53 // Clear old entries - implement proper time-based cleanup in production
54 ip.processedCache = make(map[string]bool)
55 log.Printf("Cleared processed messages cache")
56 }
57 ip.mu.Unlock()
58 }
59}
60
61// Usage example:
62func ProcessOrderMessage(msg *kafka.Message) error {
63 // Generate unique ID from message metadata
64 msgID := fmt.Sprintf("%s-%d-%d",
65 *msg.TopicPartition.Topic,
66 msg.TopicPartition.Partition,
67 msg.TopicPartition.Offset)
68
69 processor := NewIdempotentProcessor()
70
71 return processor.ProcessWithIdempotency(msgID, func() error {
72 var order OrderEvent
73 if err := json.Unmarshal(msg.Value, &order); err != nil {
74 return err
75 }
76
77 // Process order
78 return processOrder(order)
79 })
80}
Pattern 2: Dead Letter Queues for Failed Messages
Problem: Messages that repeatedly fail can block processing and create infinite retry loops.
Solution: Route failed messages to a dead letter queue for later analysis and manual intervention.
1type DeadLetterHandler struct {
2 producer *kafka.Producer
3 dlqTopic string
4}
5
6func NewDeadLetterHandler(brokers string) {
7 producer, err := kafka.NewProducer(&kafka.ConfigMap{
8 "bootstrap.servers": brokers,
9 "acks": "all",
10 })
11 if err != nil {
12 return nil, err
13 }
14
15 return &DeadLetterHandler{
16 producer: producer,
17 dlqTopic: "orders.dlq",
18 }, nil
19}
20
21func SendToDLQ(originalMsg *kafka.Message, processingErr error, retryCount int) error {
22 dlqMessage := DLQMessage{
23 OriginalMessage: originalMsg.Value,
24 Error: processingErr.Error(),
25 RetryCount: retryCount,
26 OriginalTopic: *originalMsg.TopicPartition.Topic,
27 OriginalOffset: originalMsg.TopicPartition.Offset,
28 FailedAt: time.Now(),
29 }
30
31 dlqData, err := json.Marshal(dlqMessage)
32 if err != nil {
33 return fmt.Errorf("failed to marshal DLQ message: %w", err)
34 }
35
36 err = dlh.producer.Produce(&kafka.Message{
37 TopicPartition: kafka.TopicPartition{
38 Topic: &dlh.dlqTopic,
39 Partition: kafka.PartitionAny,
40 },
41 Value: dlqData,
42 Headers: []kafka.Header{
43 {Key: "original_topic", Value: []byte(*originalMsg.TopicPartition.Topic)},
44 {Key: "original_offset", Value: []byte(fmt.Sprintf("%d", originalMsg.TopicPartition.Offset))},
45 {Key: "failure_reason", Value: []byte(processingErr.Error())},
46 {Key: "failed_at", Value: []byte(time.Now().Format(time.RFC3339))},
47 },
48 }, nil)
49 if err != nil {
50 return fmt.Errorf("failed to send message to DLQ: %w", err)
51 }
52
53 log.Printf("Sent message to DLQ: %v", processingErr)
54 return nil
55}
56
57type DLQMessage struct {
58 OriginalMessage []byte `json:"original_message"`
59 Error string `json:"error"`
60 RetryCount int `json:"retry_count"`
61 OriginalTopic string `json:"original_topic"`
62 OriginalOffset int64 `json:"original_offset"`
63 FailedAt time.Time `json:"failed_at"`
64}
Common Pitfalls to Avoid
Pitfall 1: Poor Partition Key Selection
1// BAD: All messages go to same partition -> no parallelism
2key := "constant_key"
3
4// BAD: Random key -> related messages scattered across partitions
5key := fmt.Sprintf("random_%d", rand.Int())
6
7// GOOD: Business-meaningful key -> related messages stick together
8key := order.CustomerID // All orders for customer stay together
9key := deviceID // All events from device stay together
10key = userID // All user actions stay together
Pitfall 2: Ignoring Consumer Lag
1// Monitor consumer lag to detect processing issues
2func MonitorLag(ctx context.Context) {
3 ticker := time.NewTicker(30 * time.Second)
4 defer ticker.Stop()
5
6 for {
7 select {
8 case <-ctx.Done():
9 return
10 case <-ticker.C:
11 lag, err := c.getConsumerLag()
12 if err != nil {
13 log.Printf("Failed to get consumer lag: %v", err)
14 continue
15 }
16
17 if lag > 10000 { // Alert if lag exceeds 10k messages
18 log.Printf("ALERT: Consumer lag is high: %d messages", lag)
19 // Trigger alerts, scale consumers, or investigate
20 }
21 }
22 }
23}
Pitfall 3: No Backpressure Handling
1// Implement backpressure to prevent overwhelming consumers
2func ProcessWithBackpressure(ctx context.Context, handler func(*kafka.Message) error) {
3 semaphore := make(chan struct{}, 10) // Process max 10 messages concurrently
4
5 for {
6 select {
7 case <-ctx.Done():
8 return
9 default:
10 msg, err := c.consumer.ReadMessage(5 * time.Second)
11 if err != nil {
12 continue
13 }
14
15 // Acquire semaphore slot
16 semaphore <- struct{}{}
17
18 go func(m *kafka.Message) {
19 defer func() { <-semaphore }()
20
21 if err := handler(m); err == nil {
22 c.consumer.CommitMessage(m)
23 }
24 }(msg)
25 }
26 }
27}
Integration and Mastery - Building a Complete Message-Driven System
Multi-System Integration Pattern
Here's how to combine Kafka, RabbitMQ, and NATS in a real e-commerce system:
1type ECommerceMessageBus struct {
2 kafkaProducer *kafka.Producer // Event sourcing and analytics
3 rabbitClient *RabbitClient // Business process orchestration
4 natsClient *NATSClient // Real-time notifications
5}
6
7func NewECommerceMessageBus() {
8 // Initialize Kafka for event sourcing
9 kafkaProducer, err := kafka.NewProducer(&kafka.ConfigMap{
10 "bootstrap.servers": "kafka:9092",
11 "acks": "all",
12 })
13 if err != nil {
14 return nil, fmt.Errorf("failed to initialize Kafka: %w", err)
15 }
16
17 // Initialize RabbitMQ for business processes
18 rabbitClient, err := NewRabbitClient("amqp://guest:guest@rabbitmq:5672/")
19 if err != nil {
20 return nil, fmt.Errorf("failed to initialize RabbitMQ: %w", err)
21 }
22
23 // Initialize NATS for real-time messaging
24 natsClient, err := NewNATSClient("nats://nats:4222")
25 if err != nil {
26 return nil, fmt.Errorf("failed to initialize NATS: %w", err)
27 }
28
29 mb := &ECommerceMessageBus{
30 kafkaProducer: kafkaProducer,
31 rabbitClient: rabbitClient,
32 natsClient: natsClient,
33 }
34
35 // Setup message flows
36 if err := mb.setupMessageFlows(); err != nil {
37 return nil, fmt.Errorf("failed to setup message flows: %w", err)
38 }
39
40 return mb, nil
41}
42
43func setupMessageFlows() error {
44 // Setup RabbitMQ exchanges for different notification types
45 err := mb.rabbitClient.SetupExchangePatterns()
46 if err != nil {
47 return err
48 }
49
50 // Setup NATS subjects for real-time updates
51 go mb.setupRealtimeNotifications()
52
53 return nil
54}
55
56// Process order through multiple message systems
57func ProcessOrder(order Order) error {
58 // 1. Publish order event to Kafka for analytics and audit
59 orderEvent := OrderEvent{
60 OrderID: order.ID,
61 CustomerID: order.CustomerID,
62 Amount: order.Total,
63 Timestamp: time.Now(),
64 Status: "created",
65 }
66
67 // Use customer ID as partitioning key
68 err := mb.publishKafkaEvent("order.created", order.CustomerID, orderEvent)
69 if err != nil {
70 return fmt.Errorf("failed to publish order event: %w", err)
71 }
72
73 // 2. Send immediate notification via NATS for real-time UI updates
74 realtimeUpdate := RealtimeOrderUpdate{
75 OrderID: order.ID,
76 Status: "pending",
77 Message: "Order received and processing",
78 }
79
80 err = mb.natsClient.PublishEvent(fmt.Sprintf("orders.updates.%s", order.CustomerID), realtimeUpdate)
81 if err != nil {
82 log.Printf("Failed to publish real-time update: %v", err)
83 // Non-critical, continue processing
84 }
85
86 // 3. Route business process via RabbitMQ
87 businessProcess := OrderProcess{
88 OrderID: order.ID,
89 ProcessType: "order_fulfillment",
90 Priority: order.getPriority(),
91 CreatedAt: time.Now(),
92 }
93
94 err = mb.rabbitClient.SendDirectNotification("order_processing", businessProcess)
95 if err != nil {
96 return fmt.Errorf("failed to route business process: %w", err)
97 }
98
99 return nil
100}
101
102// Handle payment completion across all systems
103func HandlePaymentCompleted(payment PaymentCompleted) error {
104 // 1. Record payment event in Kafka for audit trail
105 paymentEvent := PaymentEvent{
106 PaymentID: payment.ID,
107 OrderID: payment.OrderID,
108 Amount: payment.Amount,
109 Status: "completed",
110 Timestamp: time.Now(),
111 }
112
113 err := mb.publishKafkaEvent("payment.completed", payment.OrderID, paymentEvent)
114 if err != nil {
115 return fmt.Errorf("failed to record payment event: %w", err)
116 }
117
118 // 2. Trigger order fulfillment via RabbitMQ
119 fulfillment := OrderFulfillment{
120 OrderID: payment.OrderID,
121 PaymentID: payment.ID,
122 Status: "paid",
123 ProcessAt: time.Now(),
124 }
125
126 err = mb.rabbitClient.PublishEvent("order.fulfilled", fulfillment)
127 if err != nil {
128 return fmt.Errorf("failed to trigger fulfillment: %w", err)
129 }
130
131 // 3. Send real-time confirmation to customer
132 confirmation := OrderConfirmation{
133 OrderID: payment.OrderID,
134 Status: "paid",
135 Message: "Payment received, preparing your order",
136 }
137
138 customerID := payment.Order.CustomerID // Get from order lookup
139 err = mb.natsClient.PublishEvent(fmt.Sprintf("orders.confirmation.%s", customerID), confirmation)
140 if err != nil {
141 log.Printf("Failed to send real-time confirmation: %v", err)
142 }
143
144 // 4. Send email notification via RabbitMQ direct exchange
145 notification := Notification{
146 Type: "order_confirmation",
147 Recipient: payment.Order.CustomerEmail,
148 OrderID: payment.OrderID,
149 Amount: payment.Amount,
150 }
151
152 err = mb.rabbitClient.SendDirectNotification("email", notification)
153 if err != nil {
154 return fmt.Errorf("failed to queue email notification: %w", err)
155 }
156
157 return nil
158}
159
160func publishKafkaEvent(eventType, key string, eventData interface{}) error {
161 data, err := json.Marshal(eventData)
162 if err != nil {
163 return err
164 }
165
166 err = mb.kafkaProducer.Produce(&kafka.Message{
167 TopicPartition: kafka.TopicPartition{
168 Topic: func() *string { t := "events"; return &t }(),
169 Partition: kafka.PartitionAny,
170 },
171 Key: []byte(key),
172 Value: data,
173 }, nil)
174
175 return err
176}
Monitoring and Observability
1type MessageBusMetrics struct {
2 // Kafka metrics
3 kafkaMessagesProduced int64
4 kafkaMessagesConsumed int64
5 kafkaProducerErrors int64
6 kafkaConsumerErrors int64
7
8 // RabbitMQ metrics
9 rabbitMessagesRouted int64
10 rabbitRoutingErrors int64
11
12 // NATS metrics
13 natsMessagesPublished int64
14 natsRequestsHandled int64
15 natsErrors int64
16
17 // Overall system health
18 totalLatency time.Duration
19 errorRate float64
20 throughput float64
21}
22
23func MonitorPerformance(ctx context.Context) {
24 metrics := &MessageBusMetrics{}
25
26 ticker := time.NewTicker(10 * time.Second)
27 defer ticker.Stop()
28
29 for {
30 select {
31 case <-ctx.Done():
32 return
33 case <-ticker.C:
34 mb.collectMetrics(metrics)
35 mb.reportMetrics(metrics)
36 mb.checkHealthThresholds(metrics)
37 }
38 }
39}
40
41func collectMetrics(metrics *MessageBusMetrics) {
42 // Collect Kafka metrics
43 kafkaStats := mb.kafkaProducer.GetStats()
44 metrics.kafkaMessagesProduced = kafkaStats.MsgsSent
45 // ... collect other metrics
46
47 // Collect RabbitMQ metrics
48 // Collect NATS metrics
49}
50
51func checkHealthThresholds(metrics *MessageBusMetrics) {
52 // Alert on high error rates
53 if metrics.errorRate > 0.05 { // 5% error rate
54 log.Printf("ALERT: High error rate detected: %.2f%%", metrics.errorRate*100)
55 }
56
57 // Alert on high latency
58 if metrics.totalLatency > 5*time.Second {
59 log.Printf("ALERT: High message latency detected: %v", metrics.totalLatency)
60 }
61
62 // Alert on low throughput
63 if metrics.throughput < 100 { // Less than 100 messages/second
64 log.Printf("ALERT: Low throughput detected: %.2f msg/sec", metrics.throughput)
65 }
66}
Practice Exercises
Exercise 1: Implement Exactly-Once Processing
Build a message processor that handles duplicate messages gracefully. You'll need to:
- Create a message processor with idempotency tracking
- Implement retry logic with exponential backoff
- Add dead letter queue handling
- Test with message duplication scenarios
Show Solution
1package exercise
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "time"
8)
9
10type ExactlyOnceProcessor struct {
11 processed map[string]time.Time
12 mu sync.RWMutex
13 maxAge time.Duration
14}
15
16func NewExactlyOnceProcessor(maxAge time.Duration) *ExactlyOnceProcessor {
17 proc := &ExactlyOnceProcessor{
18 processed: make(map[string]time.Time),
19 maxAge: maxAge,
20 }
21
22 // Start cleanup goroutine
23 go proc.cleanup()
24 return proc
25}
26
27func ProcessMessage(msgID string, handler func() error) error {
28 // Check if already processed
29 eop.mu.RLock()
30 if processedTime, exists := eop.processed[msgID]; exists {
31 eop.mu.RUnlock()
32 if time.Since(processedTime) < eop.maxAge {
33 log.Printf("Message %s already processed, skipping", msgID)
34 return nil
35 }
36 // Expired entry, allow processing
37 }
38 eop.mu.RUnlock()
39
40 // Acquire write lock
41 eop.mu.Lock()
42 defer eop.mu.Unlock()
43
44 // Double-check after acquiring write lock
45 if processedTime, exists := eop.processed[msgID]; exists {
46 if time.Since(processedTime) < eop.maxAge {
47 return nil
48 }
49 }
50
51 // Process the message with retries
52 return eop.processWithRetries(msgID, handler)
53}
54
55func processWithRetries(msgID string, handler func() error) error {
56 maxRetries := 3
57 baseBackoff := 100 * time.Millisecond
58
59 for attempt := 0; attempt <= maxRetries; attempt++ {
60 if attempt > 0 {
61 backoff := time.Duration(1<<uint(attempt-1)) * baseBackoff
62 if backoff > 5*time.Second {
63 backoff = 5 * time.Second
64 }
65 log.Printf("Retry attempt %d for message %s", attempt+1, msgID)
66 time.Sleep(backoff)
67 }
68
69 if err := handler(); err != nil {
70 log.Printf("Processing attempt %d failed for message %s: %v", attempt+1, msgID, err)
71 if attempt == maxRetries {
72 return fmt.Errorf("message processing failed after %d attempts: %w", maxRetries, err)
73 }
74 continue
75 }
76
77 // Mark as processed
78 eop.processed[msgID] = time.Now()
79 log.Printf("Successfully processed message %s", msgID)
80 return nil
81 }
82
83 return nil
84}
85
86func cleanup() {
87 ticker := time.NewTicker(eop.maxAge / 4)
88 for range ticker.C {
89 eop.mu.Lock()
90 now := time.Now()
91 for msgID, processedTime := range eop.processed {
92 if now.Sub(processedTime) > eop.maxAge {
93 delete(eop.processed, msgID)
94 }
95 }
96 eop.mu.Unlock()
97 }
98}
Exercise 2: Build Multi-Protocol Message Router
Create a message router that can:
- Route messages between Kafka, RabbitMQ, and NATS
- Transform messages for different target systems
- Handle message format conversions
- Provide monitoring and error handling
Show Solution
1package exercise
2
3type MessageRouter struct {
4 kafka *KafkaClient
5 rabbit *RabbitClient
6 nats *NATSClient
7 routes []Route
8 metrics *RouterMetrics
9}
10
11type Route struct {
12 From string // "kafka", "rabbitmq", "nats"
13 To string // Target system
14 Topic string // Source topic/subject
15 Transform func([]byte) // Message transformation
16 Filter func([]byte) bool // Message filtering
17}
18
19func NewMessageRouter(kafka *KafkaClient, rabbit *RabbitClient, nats *NATSClient) *MessageRouter {
20 return &MessageRouter{
21 kafka: kafka,
22 rabbit: rabbit,
23 nats: nats,
24 routes: make([]Route, 0),
25 metrics: NewRouterMetrics(),
26 }
27}
28
29func AddRoute(route Route) {
30 mr.routes = append(mr.routes, route)
31}
32
33func Start(ctx context.Context) error {
34 for _, route := range mr.routes {
35 switch route.From {
36 case "kafka":
37 go mr.startKafkaRoute(ctx, route)
38 case "rabbitmq":
39 go mr.startRabbitRoute(ctx, route)
40 case "nats":
41 go mr.startNatsRoute(ctx, route)
42 }
43 }
44 return nil
45}
46
47func routeMessage(msg []byte, route Route) error {
48 // Apply filter
49 if route.Filter != nil && !route.Filter(msg) {
50 return nil // Skip message
51 }
52
53 // Apply transformation
54 if route.Transform != nil {
55 transformed, err := route.Transform(msg)
56 if err != nil {
57 mr.metrics.recordTransformationError()
58 return fmt.Errorf("transformation failed: %w", err)
59 }
60 msg = transformed
61 }
62
63 // Route to target system
64 switch route.To {
65 case "kafka":
66 return mr.kafka.Publish(route.Topic, msg)
67 case "rabbitmq":
68 return mr.rabbit.Publish(route.Topic, msg)
69 case "nats":
70 return mr.nats.Publish(route.Topic, msg)
71 default:
72 return fmt.Errorf("unknown target system: %s", route.To)
73 }
74}
Exercise 3: Implement Circuit Breaker for Message Systems
Build a circuit breaker that protects downstream systems when message systems become unavailable.
Show Solution
1package exercise
2
3type MessageCircuitBreaker struct {
4 maxFailures int
5 resetTimeout time.Duration
6 state CircuitState
7 failures int
8 lastFailure time.Time
9 mu sync.RWMutex
10}
11
12type CircuitState int
13
14const (
15 StateClosed CircuitState = iota
16 StateOpen
17 StateHalfOpen
18)
19
20func NewMessageCircuitBreaker(maxFailures int, resetTimeout time.Duration) *MessageCircuitBreaker {
21 return &MessageCircuitBreaker{
22 maxFailures: maxFailures,
23 resetTimeout: resetTimeout,
24 state: StateClosed,
25 }
26}
27
28func Execute(fn func() error) error {
29 if !mcb.canExecute() {
30 return fmt.Errorf("circuit breaker is open")
31 }
32
33 err := fn()
34 mcb.recordResult(err)
35 return err
36}
37
38func canExecute() bool {
39 mcb.mu.RLock()
40 defer mcb.mu.RUnlock()
41
42 switch mcb.state {
43 case StateClosed:
44 return true
45 case StateOpen:
46 if time.Since(mcb.lastFailure) > mcb.resetTimeout {
47 mcb.state = StateHalfOpen
48 return true
49 }
50 return false
51 case StateHalfOpen:
52 return true
53 default:
54 return false
55 }
56}
57
58func recordResult(err error) {
59 mcb.mu.Lock()
60 defer mcb.mu.Unlock()
61
62 if err != nil {
63 mcb.failures++
64 mcb.lastFailure = time.Now()
65
66 if mcb.failures >= mcb.maxFailures {
67 mcb.state = StateOpen
68 }
69 } else {
70 if mcb.state == StateHalfOpen {
71 mcb.state = StateClosed
72 }
73 mcb.failures = 0
74 }
75}
Further Reading
- Designing Data-Intensive Applications by Martin Kleppmann
- Kafka: The Definitive Guide
- RabbitMQ in Action
- Enterprise Integration Patterns
- Building Microservices by Sam Newman
Exercise 4: Build Event Sourcing System with Kafka
Implement an event sourcing system that captures all state changes as events and provides event replay capabilities.
Requirements:
- Event store using Kafka with proper partitioning
- Event replay functionality for rebuilding state
- Snapshot mechanism to optimize replay
- Multiple projections (read models) from same event stream
Show Solution
1package exercise
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "sync"
9 "time"
10
11 "github.com/confluentinc/confluent-kafka-go/v2/kafka"
12)
13
14// Event represents a domain event in the system
15type Event struct {
16 ID string `json:"id"`
17 AggregateID string `json:"aggregate_id"`
18 EventType string `json:"event_type"`
19 Version int64 `json:"version"`
20 Timestamp time.Time `json:"timestamp"`
21 Data map[string]interface{} `json:"data"`
22 Metadata map[string]string `json:"metadata"`
23}
24
25// EventStore manages event persistence and retrieval
26type EventStore struct {
27 producer *kafka.Producer
28 topic string
29 snapshots map[string]*Snapshot
30 snapshotMutex sync.RWMutex
31}
32
33// Snapshot represents a point-in-time state snapshot
34type Snapshot struct {
35 AggregateID string `json:"aggregate_id"`
36 Version int64 `json:"version"`
37 Timestamp time.Time `json:"timestamp"`
38 State map[string]interface{} `json:"state"`
39}
40
41func NewEventStore(brokers, topic string) (*EventStore, error) {
42 producer, err := kafka.NewProducer(&kafka.ConfigMap{
43 "bootstrap.servers": brokers,
44 "acks": "all",
45 "enable.idempotence": true,
46 "compression.type": "snappy",
47 })
48 if err != nil {
49 return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
50 }
51
52 return &EventStore{
53 producer: producer,
54 topic: topic,
55 snapshots: make(map[string]*Snapshot),
56 }, nil
57}
58
59// AppendEvent stores a new event in the event stream
60func (es *EventStore) AppendEvent(ctx context.Context, event *Event) error {
61 // Set event metadata
62 event.Timestamp = time.Now()
63 if event.ID == "" {
64 event.ID = generateEventID()
65 }
66
67 // Serialize event
68 eventData, err := json.Marshal(event)
69 if err != nil {
70 return fmt.Errorf("failed to serialize event: %w", err)
71 }
72
73 // Publish to Kafka using aggregate ID as partition key
74 // This ensures all events for same aggregate are in same partition (ordering)
75 message := &kafka.Message{
76 TopicPartition: kafka.TopicPartition{
77 Topic: &es.topic,
78 Partition: kafka.PartitionAny,
79 },
80 Key: []byte(event.AggregateID),
81 Value: eventData,
82 Headers: []kafka.Header{
83 {Key: "event_type", Value: []byte(event.EventType)},
84 {Key: "event_id", Value: []byte(event.ID)},
85 {Key: "version", Value: []byte(fmt.Sprintf("%d", event.Version))},
86 },
87 }
88
89 deliveryChan := make(chan kafka.Event, 1)
90 err = es.producer.Produce(message, deliveryChan)
91 if err != nil {
92 return fmt.Errorf("failed to produce event: %w", err)
93 }
94
95 // Wait for delivery confirmation
96 select {
97 case e := <-deliveryChan:
98 m := e.(*kafka.Message)
99 if m.TopicPartition.Error != nil {
100 return fmt.Errorf("event delivery failed: %w", m.TopicPartition.Error)
101 }
102 log.Printf("Event %s appended at offset %v", event.ID, m.TopicPartition.Offset)
103 return nil
104
105 case <-time.After(10 * time.Second):
106 return fmt.Errorf("event delivery timeout")
107 }
108}
109
110// GetEvents retrieves all events for an aggregate from a specific version
111func (es *EventStore) GetEvents(ctx context.Context, aggregateID string, fromVersion int64) ([]*Event, error) {
112 // Check if we have a recent snapshot
113 es.snapshotMutex.RLock()
114 snapshot, hasSnapshot := es.snapshots[aggregateID]
115 es.snapshotMutex.RUnlock()
116
117 if hasSnapshot && snapshot.Version >= fromVersion {
118 fromVersion = snapshot.Version + 1
119 log.Printf("Using snapshot at version %d for aggregate %s", snapshot.Version, aggregateID)
120 }
121
122 // Create consumer to read events
123 consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
124 "bootstrap.servers": es.producer.String(),
125 "group.id": fmt.Sprintf("replay-%s", aggregateID),
126 "auto.offset.reset": "earliest",
127 })
128 if err != nil {
129 return nil, fmt.Errorf("failed to create consumer: %w", err)
130 }
131 defer consumer.Close()
132
133 // Subscribe to topic
134 err = consumer.Subscribe(es.topic, nil)
135 if err != nil {
136 return nil, fmt.Errorf("failed to subscribe: %w", err)
137 }
138
139 events := make([]*Event, 0)
140 timeout := time.After(5 * time.Second)
141
142 for {
143 select {
144 case <-ctx.Done():
145 return events, ctx.Err()
146
147 case <-timeout:
148 return events, nil
149
150 default:
151 msg, err := consumer.ReadMessage(100 * time.Millisecond)
152 if err != nil {
153 if err.(kafka.Error).Code() == kafka.ErrTimedOut {
154 continue
155 }
156 log.Printf("Consumer error: %v", err)
157 continue
158 }
159
160 // Parse event
161 var event Event
162 if err := json.Unmarshal(msg.Value, &event); err != nil {
163 log.Printf("Failed to unmarshal event: %v", err)
164 continue
165 }
166
167 // Filter by aggregate ID and version
168 if event.AggregateID == aggregateID && event.Version >= fromVersion {
169 events = append(events, &event)
170 }
171 }
172 }
173}
174
175// CreateSnapshot saves a snapshot of aggregate state
176func (es *EventStore) CreateSnapshot(aggregateID string, version int64, state map[string]interface{}) {
177 snapshot := &Snapshot{
178 AggregateID: aggregateID,
179 Version: version,
180 Timestamp: time.Now(),
181 State: state,
182 }
183
184 es.snapshotMutex.Lock()
185 es.snapshots[aggregateID] = snapshot
186 es.snapshotMutex.Unlock()
187
188 log.Printf("Created snapshot for aggregate %s at version %d", aggregateID, version)
189}
190
191// OrderAggregate represents an order aggregate that uses event sourcing
192type OrderAggregate struct {
193 ID string
194 CustomerID string
195 Items []OrderItem
196 TotalAmount float64
197 Status string
198 Version int64
199 events []*Event
200}
201
202type OrderItem struct {
203 ProductID string `json:"product_id"`
204 Quantity int `json:"quantity"`
205 Price float64 `json:"price"`
206}
207
208// NewOrderAggregate creates a new order aggregate
209func NewOrderAggregate(id, customerID string) *OrderAggregate {
210 return &OrderAggregate{
211 ID: id,
212 CustomerID: customerID,
213 Status: "draft",
214 Items: make([]OrderItem, 0),
215 events: make([]*Event, 0),
216 }
217}
218
219// AddItem adds an item to the order
220func (oa *OrderAggregate) AddItem(productID string, quantity int, price float64) {
221 event := &Event{
222 AggregateID: oa.ID,
223 EventType: "item_added",
224 Version: oa.Version + 1,
225 Data: map[string]interface{}{
226 "product_id": productID,
227 "quantity": quantity,
228 "price": price,
229 },
230 }
231
232 oa.applyEvent(event)
233 oa.events = append(oa.events, event)
234}
235
236// PlaceOrder places the order
237func (oa *OrderAggregate) PlaceOrder() error {
238 if len(oa.Items) == 0 {
239 return fmt.Errorf("cannot place empty order")
240 }
241
242 if oa.Status != "draft" {
243 return fmt.Errorf("order already placed")
244 }
245
246 event := &Event{
247 AggregateID: oa.ID,
248 EventType: "order_placed",
249 Version: oa.Version + 1,
250 Data: map[string]interface{}{
251 "customer_id": oa.CustomerID,
252 "total_amount": oa.TotalAmount,
253 "item_count": len(oa.Items),
254 },
255 }
256
257 oa.applyEvent(event)
258 oa.events = append(oa.events, event)
259 return nil
260}
261
262// applyEvent applies an event to the aggregate state
263func (oa *OrderAggregate) applyEvent(event *Event) {
264 switch event.EventType {
265 case "item_added":
266 item := OrderItem{
267 ProductID: event.Data["product_id"].(string),
268 Quantity: int(event.Data["quantity"].(float64)),
269 Price: event.Data["price"].(float64),
270 }
271 oa.Items = append(oa.Items, item)
272 oa.TotalAmount += item.Price * float64(item.Quantity)
273 oa.Version = event.Version
274
275 case "order_placed":
276 oa.Status = "placed"
277 oa.Version = event.Version
278
279 case "order_shipped":
280 oa.Status = "shipped"
281 oa.Version = event.Version
282
283 case "order_delivered":
284 oa.Status = "delivered"
285 oa.Version = event.Version
286 }
287}
288
289// GetUncommittedEvents returns events that haven't been persisted
290func (oa *OrderAggregate) GetUncommittedEvents() []*Event {
291 return oa.events
292}
293
294// MarkEventsCommitted clears uncommitted events after persistence
295func (oa *OrderAggregate) MarkEventsCommitted() {
296 oa.events = make([]*Event, 0)
297}
298
299// ReplayEvents rebuilds aggregate state from events
300func (oa *OrderAggregate) ReplayEvents(events []*Event) {
301 for _, event := range events {
302 oa.applyEvent(event)
303 }
304}
305
306// Projection represents a read model built from events
307type Projection interface {
308 HandleEvent(event *Event) error
309 GetState() interface{}
310}
311
312// OrderSummaryProjection maintains order summaries
313type OrderSummaryProjection struct {
314 summaries map[string]*OrderSummary
315 mu sync.RWMutex
316}
317
318type OrderSummary struct {
319 OrderID string `json:"order_id"`
320 CustomerID string `json:"customer_id"`
321 ItemCount int `json:"item_count"`
322 TotalAmount float64 `json:"total_amount"`
323 Status string `json:"status"`
324 CreatedAt time.Time `json:"created_at"`
325 UpdatedAt time.Time `json:"updated_at"`
326}
327
328func NewOrderSummaryProjection() *OrderSummaryProjection {
329 return &OrderSummaryProjection{
330 summaries: make(map[string]*OrderSummary),
331 }
332}
333
334func (osp *OrderSummaryProjection) HandleEvent(event *Event) error {
335 osp.mu.Lock()
336 defer osp.mu.Unlock()
337
338 summary, exists := osp.summaries[event.AggregateID]
339 if !exists {
340 summary = &OrderSummary{
341 OrderID: event.AggregateID,
342 CreatedAt: event.Timestamp,
343 }
344 osp.summaries[event.AggregateID] = summary
345 }
346
347 summary.UpdatedAt = event.Timestamp
348
349 switch event.EventType {
350 case "item_added":
351 summary.ItemCount++
352 price := event.Data["price"].(float64)
353 quantity := event.Data["quantity"].(float64)
354 summary.TotalAmount += price * quantity
355
356 case "order_placed":
357 summary.CustomerID = event.Data["customer_id"].(string)
358 summary.Status = "placed"
359
360 case "order_shipped":
361 summary.Status = "shipped"
362
363 case "order_delivered":
364 summary.Status = "delivered"
365 }
366
367 return nil
368}
369
370func (osp *OrderSummaryProjection) GetState() interface{} {
371 osp.mu.RLock()
372 defer osp.mu.RUnlock()
373 return osp.summaries
374}
375
376func (osp *OrderSummaryProjection) GetOrderSummary(orderID string) (*OrderSummary, bool) {
377 osp.mu.RLock()
378 defer osp.mu.RUnlock()
379 summary, exists := osp.summaries[orderID]
380 return summary, exists
381}
382
383// Helper function to generate unique event IDs
384func generateEventID() string {
385 return fmt.Sprintf("evt_%d", time.Now().UnixNano())
386}
387
388// Example usage
389func ExampleEventSourcing() {
390 // Create event store
391 eventStore, err := NewEventStore("localhost:9092", "order-events")
392 if err != nil {
393 log.Fatal(err)
394 }
395
396 // Create order aggregate
397 order := NewOrderAggregate("order-001", "customer-123")
398 order.AddItem("product-1", 2, 29.99)
399 order.AddItem("product-2", 1, 49.99)
400
401 if err := order.PlaceOrder(); err != nil {
402 log.Fatal(err)
403 }
404
405 // Persist events
406 ctx := context.Background()
407 for _, event := range order.GetUncommittedEvents() {
408 if err := eventStore.AppendEvent(ctx, event); err != nil {
409 log.Printf("Failed to append event: %v", err)
410 }
411 }
412 order.MarkEventsCommitted()
413
414 // Create projection
415 projection := NewOrderSummaryProjection()
416
417 // Replay events to build projection
418 events, err := eventStore.GetEvents(ctx, "order-001", 0)
419 if err != nil {
420 log.Fatal(err)
421 }
422
423 for _, event := range events {
424 projection.HandleEvent(event)
425 }
426
427 // Query projection
428 summary, _ := projection.GetOrderSummary("order-001")
429 log.Printf("Order summary: %+v", summary)
430}
Exercise 5: Implement Saga Pattern for Distributed Transactions
Build a saga orchestrator that coordinates distributed transactions across multiple services using compensating actions.
Requirements:
- Saga coordinator that manages transaction steps
- Compensation mechanism for rollback
- State persistence for recovery
- Timeout handling and retry logic
- Support for both orchestration and choreography patterns
Show Solution
1package exercise
2
3import (
4 "context"
6 "fmt"
7 "log"
8 "sync"
9 "time"
10)
11
12// SagaStep represents a single step in a saga transaction
13type SagaStep struct {
14 Name string
15 Action func(context.Context, interface{}) (interface{}, error)
16 Compensation func(context.Context, interface{}) error
17 Timeout time.Duration
18}
19
20// SagaState represents the current state of a saga execution
21type SagaState string
22
23const (
24 SagaStateInitiated SagaState = "initiated"
25 SagaStateInProgress SagaState = "in_progress"
26 SagaStateCompleted SagaState = "completed"
27 SagaStateFailed SagaState = "failed"
28 SagaStateRolledBack SagaState = "rolled_back"
29)
30
31// SagaExecution tracks the execution of a saga
32type SagaExecution struct {
33 ID string
34 State SagaState
35 CurrentStep int
36 CompletedSteps []StepResult
37 StartTime time.Time
38 EndTime time.Time
39 Error string
40 mu sync.RWMutex
41}
42
43type StepResult struct {
44 StepName string
45 Success bool
46 Result interface{}
47 Error string
48 Timestamp time.Time
49}
50
51// SagaOrchestrator coordinates saga executions
52type SagaOrchestrator struct {
53 steps []SagaStep
54 executions map[string]*SagaExecution
55 messageClient MessageClient
56 stateStore StateStore
57 mu sync.RWMutex
58}
59
60// MessageClient interface for publishing saga events
61type MessageClient interface {
62 Publish(topic string, message interface{}) error
63 Subscribe(topic string, handler func([]byte) error) error
64}
65
66// StateStore interface for persisting saga state
67type StateStore interface {
68 Save(id string, execution *SagaExecution) error
69 Load(id string) (*SagaExecution, error)
70 Delete(id string) error
71}
72
73func NewSagaOrchestrator(steps []SagaStep, msgClient MessageClient, stateStore StateStore) *SagaOrchestrator {
74 return &SagaOrchestrator{
75 steps: steps,
76 executions: make(map[string]*SagaExecution),
77 messageClient: msgClient,
78 stateStore: stateStore,
79 }
80}
81
82// ExecuteSaga starts a new saga execution
83func (so *SagaOrchestrator) ExecuteSaga(ctx context.Context, sagaID string, initialData interface{}) error {
84 // Create saga execution
85 execution := &SagaExecution{
86 ID: sagaID,
87 State: SagaStateInitiated,
88 CurrentStep: 0,
89 CompletedSteps: make([]StepResult, 0),
90 StartTime: time.Now(),
91 }
92
93 // Store execution
94 so.mu.Lock()
95 so.executions[sagaID] = execution
96 so.mu.Unlock()
97
98 // Persist state
99 if err := so.stateStore.Save(sagaID, execution); err != nil {
100 return fmt.Errorf("failed to persist saga state: %w", err)
101 }
102
103 // Publish saga started event
104 so.publishEvent("saga.started", map[string]interface{}{
105 "saga_id": sagaID,
106 "start_time": execution.StartTime,
107 })
108
109 // Execute saga steps
110 return so.executeSteps(ctx, execution, initialData)
111}
112
113// executeSteps runs all saga steps sequentially
114func (so *SagaOrchestrator) executeSteps(ctx context.Context, execution *SagaExecution, data interface{}) error {
115 execution.mu.Lock()
116 execution.State = SagaStateInProgress
117 execution.mu.Unlock()
118
119 currentData := data
120
121 for i, step := range so.steps {
122 execution.mu.Lock()
123 execution.CurrentStep = i
124 execution.mu.Unlock()
125
126 // Execute step with timeout
127 stepCtx, cancel := context.WithTimeout(ctx, step.Timeout)
128 result, err := so.executeStepWithRetry(stepCtx, step, currentData)
129 cancel()
130
131 stepResult := StepResult{
132 StepName: step.Name,
133 Success: err == nil,
134 Result: result,
135 Timestamp: time.Now(),
136 }
137
138 if err != nil {
139 stepResult.Error = err.Error()
140 execution.mu.Lock()
141 execution.CompletedSteps = append(execution.CompletedSteps, stepResult)
142 execution.State = SagaStateFailed
143 execution.Error = err.Error()
144 execution.EndTime = time.Now()
145 execution.mu.Unlock()
146
147 // Persist failed state
148 so.stateStore.Save(execution.ID, execution)
149
150 // Publish failure event
151 so.publishEvent("saga.step.failed", map[string]interface{}{
152 "saga_id": execution.ID,
153 "step_name": step.Name,
154 "error": err.Error(),
155 })
156
157 // Execute compensations for completed steps
158 log.Printf("Saga %s failed at step %s, starting rollback", execution.ID, step.Name)
159 return so.rollback(ctx, execution)
160 }
161
162 // Step succeeded
163 execution.mu.Lock()
164 execution.CompletedSteps = append(execution.CompletedSteps, stepResult)
165 execution.mu.Unlock()
166
167 // Persist state after each step
168 so.stateStore.Save(execution.ID, execution)
169
170 // Publish step completed event
171 so.publishEvent("saga.step.completed", map[string]interface{}{
172 "saga_id": execution.ID,
173 "step_name": step.Name,
174 "step_num": i + 1,
175 "total": len(so.steps),
176 })
177
178 currentData = result
179 }
180
181 // All steps completed successfully
182 execution.mu.Lock()
183 execution.State = SagaStateCompleted
184 execution.EndTime = time.Now()
185 execution.mu.Unlock()
186
187 so.stateStore.Save(execution.ID, execution)
188
189 // Publish completion event
190 so.publishEvent("saga.completed", map[string]interface{}{
191 "saga_id": execution.ID,
192 "duration": execution.EndTime.Sub(execution.StartTime).Seconds(),
193 })
194
195 log.Printf("Saga %s completed successfully in %v", execution.ID, execution.EndTime.Sub(execution.StartTime))
196 return nil
197}
198
199// executeStepWithRetry executes a step with retry logic
200func (so *SagaOrchestrator) executeStepWithRetry(ctx context.Context, step SagaStep, data interface{}) (interface{}, error) {
201 maxRetries := 3
202 baseBackoff := 100 * time.Millisecond
203
204 var lastErr error
205 for attempt := 0; attempt <= maxRetries; attempt++ {
206 if attempt > 0 {
207 backoff := time.Duration(1<<uint(attempt-1)) * baseBackoff
208 if backoff > 5*time.Second {
209 backoff = 5 * time.Second
210 }
211
212 log.Printf("Retrying step %s, attempt %d/%d", step.Name, attempt+1, maxRetries+1)
213 select {
214 case <-time.After(backoff):
215 case <-ctx.Done():
216 return nil, ctx.Err()
217 }
218 }
219
220 result, err := step.Action(ctx, data)
221 if err == nil {
222 return result, nil
223 }
224
225 lastErr = err
226 log.Printf("Step %s failed (attempt %d): %v", step.Name, attempt+1, err)
227 }
228
229 return nil, fmt.Errorf("step %s failed after %d attempts: %w", step.Name, maxRetries+1, lastErr)
230}
231
232// rollback executes compensation actions in reverse order
233func (so *SagaOrchestrator) rollback(ctx context.Context, execution *SagaExecution) error {
234 log.Printf("Starting rollback for saga %s", execution.ID)
235
236 execution.mu.RLock()
237 completedSteps := make([]StepResult, len(execution.CompletedSteps))
238 copy(completedSteps, execution.CompletedSteps)
239 execution.mu.RUnlock()
240
241 // Compensate in reverse order
242 for i := len(completedSteps) - 1; i >= 0; i-- {
243 stepResult := completedSteps[i]
244 if !stepResult.Success {
245 continue // Skip failed steps
246 }
247
248 step := so.steps[i]
249 if step.Compensation == nil {
250 log.Printf("No compensation defined for step %s", step.Name)
251 continue
252 }
253
254 log.Printf("Compensating step %s", step.Name)
255
256 // Execute compensation with timeout
257 compensateCtx, cancel := context.WithTimeout(ctx, step.Timeout)
258 err := step.Compensation(compensateCtx, stepResult.Result)
259 cancel()
260
261 if err != nil {
262 log.Printf("Compensation failed for step %s: %v", step.Name, err)
263 // Continue with other compensations even if one fails
264 so.publishEvent("saga.compensation.failed", map[string]interface{}{
265 "saga_id": execution.ID,
266 "step_name": step.Name,
267 "error": err.Error(),
268 })
269 } else {
270 so.publishEvent("saga.compensation.completed", map[string]interface{}{
271 "saga_id": execution.ID,
272 "step_name": step.Name,
273 })
274 }
275 }
276
277 execution.mu.Lock()
278 execution.State = SagaStateRolledBack
279 execution.EndTime = time.Now()
280 execution.mu.Unlock()
281
282 so.stateStore.Save(execution.ID, execution)
283
284 so.publishEvent("saga.rolled_back", map[string]interface{}{
285 "saga_id": execution.ID,
286 "duration": execution.EndTime.Sub(execution.StartTime).Seconds(),
287 })
288
289 return fmt.Errorf("saga rolled back: %s", execution.Error)
290}
291
292// publishEvent publishes a saga event
293func (so *SagaOrchestrator) publishEvent(eventType string, data map[string]interface{}) {
294 event := map[string]interface{}{
295 "event_type": eventType,
296 "timestamp": time.Now(),
297 "data": data,
298 }
299
300 if err := so.messageClient.Publish("saga-events", event); err != nil {
301 log.Printf("Failed to publish event %s: %v", eventType, err)
302 }
303}
304
305// GetSagaStatus returns the current status of a saga
306func (so *SagaOrchestrator) GetSagaStatus(sagaID string) (*SagaExecution, error) {
307 so.mu.RLock()
308 execution, exists := so.executions[sagaID]
309 so.mu.RUnlock()
310
311 if !exists {
312 // Try loading from state store
313 return so.stateStore.Load(sagaID)
314 }
315
316 return execution, nil
317}
318
319// Example: Order fulfillment saga
320func ExampleOrderFulfillmentSaga() {
321 // Define saga steps
322 steps := []SagaStep{
323 {
324 Name: "reserve_inventory",
325 Timeout: 5 * time.Second,
326 Action: func(ctx context.Context, data interface{}) (interface{}, error) {
327 order := data.(map[string]interface{})
328 log.Printf("Reserving inventory for order %s", order["order_id"])
329 // Simulate inventory reservation
330 return map[string]interface{}{
331 "reservation_id": "inv-12345",
332 "order_id": order["order_id"],
333 }, nil
334 },
335 Compensation: func(ctx context.Context, data interface{}) error {
336 result := data.(map[string]interface{})
337 log.Printf("Releasing inventory reservation %s", result["reservation_id"])
338 // Simulate inventory release
339 return nil
340 },
341 },
342 {
343 Name: "process_payment",
344 Timeout: 10 * time.Second,
345 Action: func(ctx context.Context, data interface{}) (interface{}, error) {
346 result := data.(map[string]interface{})
347 log.Printf("Processing payment for order %s", result["order_id"])
348 // Simulate payment processing
349 return map[string]interface{}{
350 "payment_id": "pay-67890",
351 "order_id": result["order_id"],
352 }, nil
353 },
354 Compensation: func(ctx context.Context, data interface{}) error {
355 result := data.(map[string]interface{})
356 log.Printf("Refunding payment %s", result["payment_id"])
357 // Simulate payment refund
358 return nil
359 },
360 },
361 {
362 Name: "create_shipment",
363 Timeout: 5 * time.Second,
364 Action: func(ctx context.Context, data interface{}) (interface{}, error) {
365 result := data.(map[string]interface{})
366 log.Printf("Creating shipment for order %s", result["order_id"])
367 // Simulate shipment creation
368 return map[string]interface{}{
369 "shipment_id": "ship-11111",
370 "order_id": result["order_id"],
371 }, nil
372 },
373 Compensation: func(ctx context.Context, data interface{}) error {
374 result := data.(map[string]interface{})
375 log.Printf("Canceling shipment %s", result["shipment_id"])
376 // Simulate shipment cancellation
377 return nil
378 },
379 },
380 {
381 Name: "send_confirmation",
382 Timeout: 3 * time.Second,
383 Action: func(ctx context.Context, data interface{}) (interface{}, error) {
384 result := data.(map[string]interface{})
385 log.Printf("Sending confirmation for order %s", result["order_id"])
386 // Simulate sending confirmation email
387 return result, nil
388 },
389 Compensation: nil, // No compensation needed for notification
390 },
391 }
392
393 // Create saga orchestrator (with mock implementations)
394 var msgClient MessageClient // nil here: wire in a real broker client (or an in-memory stub like the sketch after this solution) before running
395 var stateStore StateStore // nil here: wire in a real persistence layer, otherwise ExecuteSaga will panic on the first Save
396
397 orchestrator := NewSagaOrchestrator(steps, msgClient, stateStore)
398
399 // Execute saga
400 ctx := context.Background()
401 orderData := map[string]interface{}{
402 "order_id": "order-12345",
403 "customer_id": "cust-67890",
404 "amount": 99.99,
405 }
406
407 if err := orchestrator.ExecuteSaga(ctx, "saga-order-12345", orderData); err != nil {
408 log.Printf("Saga failed: %v", err)
409 }
410}
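The example above leaves msgClient and stateStore as nil interfaces, so ExecuteSaga would panic the moment it tries to persist state. Below is a minimal in-memory sketch of both interfaces so the saga can be exercised end to end without a real broker or database; the names InMemoryMessageClient and InMemoryStateStore are illustrative, and the sketch assumes the same package plus an "encoding/json" import.

// InMemoryMessageClient delivers messages synchronously to in-process subscribers.
type InMemoryMessageClient struct {
	mu       sync.Mutex
	handlers map[string][]func([]byte) error
}

func NewInMemoryMessageClient() *InMemoryMessageClient {
	return &InMemoryMessageClient{handlers: make(map[string][]func([]byte) error)}
}

// Publish marshals the message to JSON and hands it to every subscriber of the topic.
// A real broker would do this asynchronously and durably.
func (c *InMemoryMessageClient) Publish(topic string, message interface{}) error {
	payload, err := json.Marshal(message)
	if err != nil {
		return err
	}
	c.mu.Lock()
	handlers := make([]func([]byte) error, len(c.handlers[topic]))
	copy(handlers, c.handlers[topic])
	c.mu.Unlock()

	for _, handler := range handlers {
		if err := handler(payload); err != nil {
			log.Printf("handler error on topic %s: %v", topic, err)
		}
	}
	return nil
}

func (c *InMemoryMessageClient) Subscribe(topic string, handler func([]byte) error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handlers[topic] = append(c.handlers[topic], handler)
	return nil
}

// InMemoryStateStore keeps saga executions in a map; a restart loses all state,
// which is exactly what a real implementation must avoid.
type InMemoryStateStore struct {
	mu    sync.RWMutex
	sagas map[string]*SagaExecution
}

func NewInMemoryStateStore() *InMemoryStateStore {
	return &InMemoryStateStore{sagas: make(map[string]*SagaExecution)}
}

func (s *InMemoryStateStore) Save(id string, execution *SagaExecution) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sagas[id] = execution
	return nil
}

func (s *InMemoryStateStore) Load(id string) (*SagaExecution, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	execution, ok := s.sagas[id]
	if !ok {
		return nil, fmt.Errorf("saga %s not found", id)
	}
	return execution, nil
}

func (s *InMemoryStateStore) Delete(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.sagas, id)
	return nil
}

With these stubs, the example becomes runnable by constructing them in place of the nil variables, e.g. NewSagaOrchestrator(steps, NewInMemoryMessageClient(), NewInMemoryStateStore()). Swap them for a durable broker client and persistent store before relying on recovery after a crash.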
Summary
Key Takeaways
- Choose the Right Tool: Kafka for event sourcing and analytics, RabbitMQ for complex routing, NATS for lightweight, low-latency messaging
- Exactly-Once Processing: Implement idempotency tracking to handle duplicate deliveries gracefully (see the sketch after this list)
- Error Handling: Use dead letter queues and circuit breakers for resilience
- Monitoring: Track latency, throughput, and error rates across all systems
- Testing: Test failure scenarios, not just happy paths
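To make the exactly-once point concrete, here is a minimal sketch of idempotency tracking, assuming every message carries a unique ID. The in-memory map stands in for what would normally be a Redis set or a database table, and IdempotentHandler is an illustrative name rather than an API from any of the brokers covered here.

// IdempotentHandler remembers which message IDs have been handled so that
// redelivered duplicates are acknowledged without rerunning the business logic.
type IdempotentHandler struct {
	mu        sync.Mutex
	processed map[string]struct{}    // message IDs already handled
	handle    func(msg []byte) error // the actual business logic
}

func NewIdempotentHandler(handle func([]byte) error) *IdempotentHandler {
	return &IdempotentHandler{processed: make(map[string]struct{}), handle: handle}
}

func (h *IdempotentHandler) Process(messageID string, msg []byte) error {
	h.mu.Lock()
	if _, seen := h.processed[messageID]; seen {
		h.mu.Unlock()
		return nil // duplicate delivery: already handled
	}
	h.mu.Unlock()

	if err := h.handle(msg); err != nil {
		return err // not recorded, so the broker can redeliver and we retry
	}

	h.mu.Lock()
	h.processed[messageID] = struct{}{}
	h.mu.Unlock()
	return nil
}

Note that the duplicate check and the final record are separate steps; a production implementation would use an atomic set-if-absent (for example Redis SETNX) so that two concurrent deliveries of the same ID cannot both slip through.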
Production Checklist
- Message Serialization: Use efficient formats like Protobuf for high-volume systems
- Retry Logic: Implement exponential backoff with maximum attempts
- Dead Letter Queues: Route messages that repeatedly fail processing to a dead-letter queue for later inspection (a sketch follows this checklist)
- Circuit Breakers: Prevent cascade failures when systems are down
- Monitoring: Track consumer lag, error rates, and throughput
- Security: Implement authentication and encryption for sensitive messages
- Testing: Test failure scenarios, network partitions, and high load
- Documentation: Document message schemas and routing patterns
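The retry and dead-letter items above fit together naturally: retry transient failures with exponential backoff, and only dead-letter a message once the retry budget is exhausted. Below is a hedged sketch; Publisher, ProcessWithDLQ, and the DLQ topic are illustrative placeholders rather than the API of any specific broker.

// Publisher is the minimal capability needed to park a message on a dead-letter topic.
type Publisher interface {
	Publish(topic string, payload []byte) error
}

// ProcessWithDLQ retries the handler with exponential backoff and routes the
// message to the dead-letter topic if every attempt fails.
func ProcessWithDLQ(ctx context.Context, pub Publisher, dlqTopic string, msg []byte,
	handle func(context.Context, []byte) error) error {

	const maxAttempts = 3
	backoff := 200 * time.Millisecond

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = handle(ctx, msg); lastErr == nil {
			return nil
		}
		log.Printf("attempt %d/%d failed: %v", attempt, maxAttempts, lastErr)

		if attempt == maxAttempts {
			break
		}
		select {
		case <-time.After(backoff):
			backoff *= 2 // exponential backoff before the next attempt
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Park the message instead of blocking the main queue, so it can be
	// inspected and replayed once the underlying problem is fixed.
	if err := pub.Publish(dlqTopic, msg); err != nil {
		return fmt.Errorf("failed to dead-letter message: %v (original error: %w)", err, lastErr)
	}
	return fmt.Errorf("message dead-lettered after %d attempts: %w", maxAttempts, lastErr)
}

Alerting on the depth of the dead-letter topic is then a cheap way to cover the monitoring item as well.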
When to Use Each System
Use Kafka when:
- You need to preserve complete event history
- Multiple consumers need to read events independently
- High throughput is critical
- You're implementing event sourcing or CQRS
Use RabbitMQ when:
- You need complex routing patterns
- Guaranteed delivery is critical
- Message ordering within conversations matters
- You have diverse consumer requirements
Use NATS when:
- Simplicity and performance are priorities
- You need sub-millisecond latency
- Message durability isn't critical
- You have microservices with lightweight communication needs
💡 Final Insight: The best message systems use multiple technologies for different use cases. Don't force one tool to solve all problems—understand the trade-offs and choose the right tool for each specific need.
Next Steps
- Practice: Build a small e-commerce system using all three message brokers
- Deepen Knowledge: Read the official documentation for Kafka, RabbitMQ, and NATS
- Production Readiness: Learn about monitoring, scaling, and operational aspects
- Advanced Patterns: Study CQRS, event sourcing, and saga patterns
- Cloud Services: Explore managed offerings such as Amazon SQS/SNS, Google Pub/Sub, Azure Service Bus, and Confluent Cloud
Remember: Message systems are the backbone of distributed systems. Master them, and you can build systems that scale to millions of users while maintaining reliability and performance.