Think of a busy newsroom where journalists write stories, editors review them, and publishers distribute them - all happening continuously and independently. Each journalist doesn't need to know who will read their article or how it will be distributed. They just write the story, and the system ensures it reaches the right people. This is how event-driven architecture works - services produce events without knowing who will consume them.
Event-driven architecture enables loosely coupled systems where services communicate through events. This comprehensive guide covers event sourcing, CQRS, event stores, projections, sagas, and building production-ready event-driven systems in Go.
💡 Key Takeaway: Event-driven architecture transforms your system from a chain of direct dependencies into a responsive ecosystem where services react to what happens, rather than being told what to do.
Event-Driven Architecture Fundamentals
Think of events like a newspaper's daily publication. Each article is an immutable fact about something that happened. Readers can subscribe to the sections they care about, and new articles can be added without changing how existing readers work.
Event-driven systems react to events asynchronously, enabling scalable and decoupled architectures.
Why Event-Driven Architecture:
- Decoupling: Services don't need to know about each other, only about events
- Scalability: Async processing allows handling bursts of traffic
- Auditability: Events provide a complete history of what happened
- Flexibility: Easy to add new consumers without modifying producers
- Resilience: Failure in one service doesn't cascade to others
- Time Travel: Reconstruct past states for debugging and analysis
Key Patterns:
- Event Sourcing: Store all changes as a sequence of events
- CQRS: Separate write models from read models
- Saga: Coordinate distributed transactions across services using events
- Event Streaming: Continuous flow of events for real-time processing
- Event Notification: Notify interested parties when events occur
Trade-offs:
- ✅ Extreme scalability and flexibility
- ✅ Natural audit logging and compliance
- ✅ Temporal queries and point-in-time recovery
- ❌ Eventual consistency challenges
- ❌ Increased system complexity
- ❌ Event versioning and schema evolution require ongoing effort
- ❌ Debugging distributed flows is harder
⚠️ Important: Event-driven systems require a mindset shift from imperative to reactive. Be prepared for more complex debugging and monitoring challenges.
Real-world Example: Uber uses event-driven architecture to coordinate millions of rides. When a driver accepts a ride, that event triggers updates to passenger apps, payment processing, and driver dispatch systems - all without direct coupling between these services.
Core Concepts
// run
package main

import (
    "fmt"
    "time"
)

// Event represents something that happened
type Event struct {
    ID            string
    Type          string
    AggregateID   string
    AggregateType string
    Data          interface{}
    Metadata      map[string]string
    Timestamp     time.Time
    Version       int
}

func main() {
    fmt.Println("Event-Driven Architecture Concepts")
    fmt.Println("==================================\n")

    concepts := map[string]string{
        "Event":         "Immutable fact about something that happened",
        "Command":       "Intent to change state",
        "Aggregate":     "Cluster of domain objects treated as a unit",
        "Event Store":   "Append-only log of events",
        "Projection":    "Materialized view built from events",
        "Event Handler": "Processes events and updates state",
        "Event Bus":     "Routes events to subscribers",
        "Snapshot":      "Point-in-time state for optimization",
    }

    for concept, description := range concepts {
        fmt.Printf("%-15s: %s\n", concept, description)
    }

    fmt.Println("\nEvent vs Command:")
    fmt.Println("=================")
    fmt.Println("Command: CreateOrder (imperative)")
    fmt.Println("Event: OrderCreated (past tense)")
    fmt.Println()
    fmt.Println("Command: UpdateInventory (imperative)")
    fmt.Println("Event: InventoryUpdated (past tense)")

    fmt.Println("\nBenefits:")
    fmt.Println("- Audit trail: Complete history of changes")
    fmt.Println("- Temporal queries: State at any point in time")
    fmt.Println("- Event replay: Rebuild state from events")
    fmt.Println("- Decoupling: Services react to events independently")
    fmt.Println("- Scalability: Async processing, parallel handlers")
    fmt.Println("- Flexibility: Add new features without changing existing code")

    // Example events
    events := []Event{
        {
            ID:            "evt-1",
            Type:          "OrderCreated",
            AggregateID:   "order-123",
            AggregateType: "Order",
            Data: map[string]interface{}{
                "customer_id": "cust-456",
                "total":       99.99,
            },
            Timestamp: time.Now(),
            Version:   1,
        },
        {
            ID:            "evt-2",
            Type:          "OrderPaid",
            AggregateID:   "order-123",
            AggregateType: "Order",
            Data: map[string]interface{}{
                "payment_id": "pay-789",
                "amount":     99.99,
            },
            Timestamp: time.Now().Add(1 * time.Minute),
            Version:   2,
        },
        {
            ID:            "evt-3",
            Type:          "OrderShipped",
            AggregateID:   "order-123",
            AggregateType: "Order",
            Data: map[string]interface{}{
                "tracking_number": "TRK123456",
            },
            Timestamp: time.Now().Add(1 * time.Hour),
            Version:   3,
        },
    }

    fmt.Println("\nEvent Stream Example:")
    for _, evt := range events {
        fmt.Printf("  v%d %s: %s (aggregate: %s)\n",
            evt.Version, evt.Timestamp.Format("15:04:05"),
            evt.Type, evt.AggregateID)
    }

    fmt.Println("\nEvent Characteristics:")
    fmt.Println("- Immutable: Events never change once created")
    fmt.Println("- Ordered: Events have a sequence/version number")
    fmt.Println("- Past tense: Events describe what happened")
    fmt.Println("- Self-contained: Events carry all necessary data")
    fmt.Println("- Timestamped: Events know when they occurred")
}
Event Sourcing
Imagine keeping a complete diary of your life instead of just remembering your current situation. Your diary contains every significant event that happened to you, in chronological order. If someone asks about your current job, you don't just tell them - you can trace back through your diary to see all the events that led to where you are today. This is exactly how event sourcing works.
Instead of storing just the current state of data, event sourcing stores every change as an immutable event. The current state is derived by replaying these events.
Event sourcing stores state changes as a sequence of events rather than current state.
Common Pitfalls:
- Event stores can grow very large over time
- Rebuilding state from many events can be slow
- Schema evolution requires careful planning
- Debugging requires understanding event flows, not just current state
💡 Key Takeaway: Event sourcing provides a complete audit trail and enables powerful time-travel queries, but requires careful planning for performance and storage.
When to Use Event Sourcing:
- Financial systems requiring complete audit trails
- Systems needing temporal queries (state at any point in time)
- Complex domains with multiple state transitions
- Systems requiring event replay for bug fixes or migrations
- Regulatory compliance requiring immutable audit logs
When NOT to Use Event Sourcing:
- Simple CRUD applications
- Systems with high write throughput (>100k events/second)
- When eventual consistency is unacceptable
- Teams unfamiliar with event-driven patterns
Event Sourcing Implementation
package eventsourcing

import (
    "errors"
    "fmt"
    "time"
)

// Base event interface
type Event interface {
    EventType() string
    AggregateID() string
    EventVersion() int
    OccurredAt() time.Time
}

// Base event struct
type BaseEvent struct {
    Type      string
    ID        string
    Version   int
    Timestamp time.Time
}

func (e BaseEvent) EventType() string     { return e.Type }
func (e BaseEvent) AggregateID() string   { return e.ID }
func (e BaseEvent) EventVersion() int     { return e.Version }
func (e BaseEvent) OccurredAt() time.Time { return e.Timestamp }

// Order aggregate
type Order struct {
    ID         string
    CustomerID string
    Items      []OrderItem
    Total      float64
    Status     string
    Version    int
    events     []Event
}

type OrderItem struct {
    ProductID string
    Quantity  int
    Price     float64
}

// Order events
type OrderCreatedEvent struct {
    BaseEvent
    CustomerID string
    Items      []OrderItem
    Total      float64
}

type OrderPaidEvent struct {
    BaseEvent
    PaymentID string
    Amount    float64
}

type OrderShippedEvent struct {
    BaseEvent
    TrackingNumber string
}

type OrderCancelledEvent struct {
    BaseEvent
    Reason string
}

// Aggregate methods
func NewOrder(id, customerID string, items []OrderItem) (*Order, error) {
    if customerID == "" {
        return nil, errors.New("customer ID required")
    }
    if len(items) == 0 {
        return nil, errors.New("at least one item required")
    }

    // Calculate total
    var total float64
    for _, item := range items {
        total += item.Price * float64(item.Quantity)
    }

    order := &Order{
        ID:      id,
        Version: 0,
    }

    // Raise event
    event := OrderCreatedEvent{
        BaseEvent: BaseEvent{
            Type:      "OrderCreated",
            ID:        id,
            Version:   1,
            Timestamp: time.Now(),
        },
        CustomerID: customerID,
        Items:      items,
        Total:      total,
    }

    order.Apply(event)
    order.RecordEvent(event)

    return order, nil
}

func (o *Order) Pay(paymentID string, amount float64) error {
    if o.Status != "pending" {
        return errors.New("order must be pending to pay")
    }
    if amount != o.Total {
        return errors.New("payment amount must match order total")
    }

    event := OrderPaidEvent{
        BaseEvent: BaseEvent{
            Type:      "OrderPaid",
            ID:        o.ID,
            Version:   o.Version + 1,
            Timestamp: time.Now(),
        },
        PaymentID: paymentID,
        Amount:    amount,
    }

    o.Apply(event)
    o.RecordEvent(event)

    return nil
}

func (o *Order) Ship(trackingNumber string) error {
    if o.Status != "paid" {
        return errors.New("order must be paid to ship")
    }

    event := OrderShippedEvent{
        BaseEvent: BaseEvent{
            Type:      "OrderShipped",
            ID:        o.ID,
            Version:   o.Version + 1,
            Timestamp: time.Now(),
        },
        TrackingNumber: trackingNumber,
    }

    o.Apply(event)
    o.RecordEvent(event)

    return nil
}

func (o *Order) Cancel(reason string) error {
    if o.Status == "shipped" || o.Status == "delivered" {
        return errors.New("cannot cancel shipped or delivered order")
    }

    event := OrderCancelledEvent{
        BaseEvent: BaseEvent{
            Type:      "OrderCancelled",
            ID:        o.ID,
            Version:   o.Version + 1,
            Timestamp: time.Now(),
        },
        Reason: reason,
    }

    o.Apply(event)
    o.RecordEvent(event)

    return nil
}

// Apply events to update state
func (o *Order) Apply(event Event) {
    switch e := event.(type) {
    case OrderCreatedEvent:
        o.ID = e.ID // restore the aggregate ID when rebuilding from events
        o.CustomerID = e.CustomerID
        o.Items = e.Items
        o.Total = e.Total
        o.Status = "pending"
        o.Version = e.Version

    case OrderPaidEvent:
        o.Status = "paid"
        o.Version = e.Version

    case OrderShippedEvent:
        o.Status = "shipped"
        o.Version = e.Version

    case OrderCancelledEvent:
        o.Status = "cancelled"
        o.Version = e.Version
    }
}

func (o *Order) RecordEvent(event Event) {
    o.events = append(o.events, event)
}

func (o *Order) GetUncommittedEvents() []Event {
    return o.events
}

func (o *Order) ClearUncommittedEvents() {
    o.events = nil
}

// Rebuild from events
func RebuildOrder(events []Event) *Order {
    order := &Order{}
    for _, event := range events {
        order.Apply(event)
    }
    return order
}

// Example usage
func main() {
    // Create order
    items := []OrderItem{
        {ProductID: "prod-1", Quantity: 2, Price: 29.99},
        {ProductID: "prod-2", Quantity: 1, Price: 49.99},
    }

    order, err := NewOrder("order-123", "cust-456", items)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    fmt.Printf("Order created: %s (status=%s, total=$%.2f)\n",
        order.ID, order.Status, order.Total)

    // Pay for order
    err = order.Pay("pay-789", order.Total)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    fmt.Printf("Order paid: %s (status=%s)\n", order.ID, order.Status)

    // Ship order
    err = order.Ship("TRK123456")
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    fmt.Printf("Order shipped: %s (status=%s)\n", order.ID, order.Status)

    // Get uncommitted events
    events := order.GetUncommittedEvents()
    fmt.Printf("\nGenerated %d events:\n", len(events))
    for _, evt := range events {
        fmt.Printf("  - %s (v%d)\n", evt.EventType(), evt.EventVersion())
    }

    // Rebuild order from events
    rebuilt := RebuildOrder(events)
    fmt.Printf("\nRebuilt order: Status=%s, Total=$%.2f, Version=%d\n",
        rebuilt.Status, rebuilt.Total, rebuilt.Version)
}
Advanced Event Sourcing Patterns
Snapshots for Performance Optimization
When an aggregate has thousands of events, replaying all of them becomes slow. Snapshots solve this by storing the aggregate's state at a specific version, allowing you to load the snapshot and then replay only events after that point.
// run
package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// Snapshot represents aggregate state at a specific version
type Snapshot struct {
    AggregateID   string          `json:"aggregate_id"`
    AggregateType string          `json:"aggregate_type"`
    Version       int             `json:"version"`
    State         json.RawMessage `json:"state"`
    Timestamp     time.Time       `json:"timestamp"`
}

// Account aggregate with snapshot support
type Account struct {
    ID                string
    Balance           float64
    Transactions      int
    Version           int
    LastUpdated       time.Time
    uncommittedEvents []Event
}

type Event interface {
    EventType() string
    AggregateID() string
    EventVersion() int
}

type DepositedEvent struct {
    ID      string
    Amount  float64
    Version int
}

func (e DepositedEvent) EventType() string   { return "Deposited" }
func (e DepositedEvent) AggregateID() string { return e.ID }
func (e DepositedEvent) EventVersion() int   { return e.Version }

type WithdrawnEvent struct {
    ID      string
    Amount  float64
    Version int
}

func (e WithdrawnEvent) EventType() string   { return "Withdrawn" }
func (e WithdrawnEvent) AggregateID() string { return e.ID }
func (e WithdrawnEvent) EventVersion() int   { return e.Version }

// Create snapshot from current state
func (a *Account) CreateSnapshot() (*Snapshot, error) {
    stateData, err := json.Marshal(map[string]interface{}{
        "id":           a.ID,
        "balance":      a.Balance,
        "transactions": a.Transactions,
        "last_updated": a.LastUpdated,
    })

    if err != nil {
        return nil, err
    }

    return &Snapshot{
        AggregateID:   a.ID,
        AggregateType: "Account",
        Version:       a.Version,
        State:         stateData,
        Timestamp:     time.Now(),
    }, nil
}

// Load from snapshot
func LoadAccountFromSnapshot(snapshot *Snapshot) (*Account, error) {
    var state map[string]interface{}
    if err := json.Unmarshal(snapshot.State, &state); err != nil {
        return nil, err
    }

    return &Account{
        ID:           state["id"].(string),
        Balance:      state["balance"].(float64),
        Transactions: int(state["transactions"].(float64)),
        Version:      snapshot.Version,
    }, nil
}

// Apply event to account
func (a *Account) Apply(event Event) {
    switch e := event.(type) {
    case DepositedEvent:
        a.Balance += e.Amount
        a.Transactions++
        a.Version = e.Version
        a.LastUpdated = time.Now()

    case WithdrawnEvent:
        a.Balance -= e.Amount
        a.Transactions++
        a.Version = e.Version
        a.LastUpdated = time.Now()
    }
}

// Snapshot strategy: Take snapshot every N events
type SnapshotStrategy struct {
    snapshotInterval int
}

func NewSnapshotStrategy(interval int) *SnapshotStrategy {
    return &SnapshotStrategy{snapshotInterval: interval}
}

func (s *SnapshotStrategy) ShouldTakeSnapshot(currentVersion int) bool {
    return currentVersion%s.snapshotInterval == 0
}

func main() {
    fmt.Println("Event Sourcing with Snapshots")
    fmt.Println("=============================\n")

    account := &Account{ID: "acc-123", Balance: 0, Version: 0}
    strategy := NewSnapshotStrategy(100) // Snapshot every 100 events

    // Simulate many transactions
    events := make([]Event, 0)
    for i := 1; i <= 250; i++ {
        var event Event
        if i%2 == 0 {
            event = DepositedEvent{
                ID:      account.ID,
                Amount:  100.0,
                Version: i,
            }
        } else {
            event = WithdrawnEvent{
                ID:      account.ID,
                Amount:  50.0,
                Version: i,
            }
        }
        events = append(events, event)
        account.Apply(event)

        // Check if we should take a snapshot
        if strategy.ShouldTakeSnapshot(i) {
            snapshot, _ := account.CreateSnapshot()
            fmt.Printf("📸 Snapshot taken at version %d (balance: $%.2f)\n",
                snapshot.Version, account.Balance)
        }
    }

    fmt.Printf("\nFinal state: Balance=$%.2f, Transactions=%d, Version=%d\n",
        account.Balance, account.Transactions, account.Version)

    // Demonstrate snapshot loading
    fmt.Println("\n--- Snapshot Recovery Demonstration ---")

    // Take final snapshot
    finalSnapshot, _ := account.CreateSnapshot()

    // Load from snapshot (much faster than replaying 250 events)
    recovered, _ := LoadAccountFromSnapshot(finalSnapshot)
    fmt.Printf("Recovered from snapshot: Balance=$%.2f, Version=%d\n",
        recovered.Balance, recovered.Version)

    // Only need to replay events after the snapshot
    eventsAfterSnapshot := events[finalSnapshot.Version:]
    fmt.Printf("Only need to replay %d events instead of %d\n",
        len(eventsAfterSnapshot), len(events))
}
Event Upcasting for Schema Evolution
As your system evolves, event schemas change. Event upcasting allows you to transform old event formats to new formats automatically.
// run
package main

import (
    "encoding/json"
    "fmt"
)

// Version 1 of OrderCreated event
type OrderCreatedV1 struct {
    OrderID    string  `json:"order_id"`
    CustomerID string  `json:"customer_id"`
    Total      float64 `json:"total"`
    Version    int     `json:"version"`
}

// Version 2 adds currency and tax information
type OrderCreatedV2 struct {
    OrderID    string  `json:"order_id"`
    CustomerID string  `json:"customer_id"`
    Subtotal   float64 `json:"subtotal"`
    Tax        float64 `json:"tax"`
    Total      float64 `json:"total"`
    Currency   string  `json:"currency"`
    Version    int     `json:"version"`
}

// Event metadata tracks schema version
type EventEnvelope struct {
    EventType     string          `json:"event_type"`
    SchemaVersion int             `json:"schema_version"`
    Data          json.RawMessage `json:"data"`
}

// Upcaster interface
type EventUpcaster interface {
    Upcast(data json.RawMessage) (json.RawMessage, error)
    FromVersion() int
    ToVersion() int
}

// Upcast OrderCreated from V1 to V2
type OrderCreatedV1ToV2Upcaster struct{}

func (u OrderCreatedV1ToV2Upcaster) FromVersion() int { return 1 }
func (u OrderCreatedV1ToV2Upcaster) ToVersion() int   { return 2 }

func (u OrderCreatedV1ToV2Upcaster) Upcast(data json.RawMessage) (json.RawMessage, error) {
    var v1 OrderCreatedV1
    if err := json.Unmarshal(data, &v1); err != nil {
        return nil, err
    }

    // Transform V1 to V2 with defaults
    v2 := OrderCreatedV2{
        OrderID:    v1.OrderID,
        CustomerID: v1.CustomerID,
        Subtotal:   v1.Total * 0.9, // Assume 10% tax
        Tax:        v1.Total * 0.1,
        Total:      v1.Total,
        Currency:   "USD", // Default currency
        Version:    v1.Version,
    }

    return json.Marshal(v2)
}

// Upcaster registry
type UpcasterRegistry struct {
    upcasters map[string][]EventUpcaster
}

func NewUpcasterRegistry() *UpcasterRegistry {
    return &UpcasterRegistry{
        upcasters: make(map[string][]EventUpcaster),
    }
}

func (r *UpcasterRegistry) Register(eventType string, upcaster EventUpcaster) {
    r.upcasters[eventType] = append(r.upcasters[eventType], upcaster)
}

func (r *UpcasterRegistry) Upcast(envelope EventEnvelope, targetVersion int) (json.RawMessage, error) {
    currentVersion := envelope.SchemaVersion
    currentData := envelope.Data

    // Apply upcasters sequentially
    for currentVersion < targetVersion {
        upcasters := r.upcasters[envelope.EventType]

        var applied bool
        for _, upcaster := range upcasters {
            if upcaster.FromVersion() == currentVersion {
                upcastedData, err := upcaster.Upcast(currentData)
                if err != nil {
                    return nil, err
                }
                currentData = upcastedData
                currentVersion = upcaster.ToVersion()
                applied = true
                break
            }
        }

        if !applied {
            return nil, fmt.Errorf("no upcaster found from version %d", currentVersion)
        }
    }

    return currentData, nil
}

func main() {
    fmt.Println("Event Schema Evolution with Upcasting")
    fmt.Println("=====================================\n")

    // Create registry and register upcasters
    registry := NewUpcasterRegistry()
    registry.Register("OrderCreated", OrderCreatedV1ToV2Upcaster{})

    // Old event in V1 format
    v1Event := OrderCreatedV1{
        OrderID:    "order-123",
        CustomerID: "cust-456",
        Total:      100.00,
        Version:    1,
    }

    v1Data, _ := json.Marshal(v1Event)
    envelope := EventEnvelope{
        EventType:     "OrderCreated",
        SchemaVersion: 1,
        Data:          v1Data,
    }

    fmt.Println("Original V1 Event:")
    fmt.Printf("%s\n", v1Data)

    // Upcast to V2
    v2Data, err := registry.Upcast(envelope, 2)
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    fmt.Println("\nUpcasted to V2:")
    fmt.Printf("%s\n", v2Data)

    // Verify the upcasted event
    var v2Event OrderCreatedV2
    json.Unmarshal(v2Data, &v2Event)

    fmt.Println("\nParsed V2 Event:")
    fmt.Printf("  Order ID: %s\n", v2Event.OrderID)
    fmt.Printf("  Subtotal: $%.2f\n", v2Event.Subtotal)
    fmt.Printf("  Tax: $%.2f\n", v2Event.Tax)
    fmt.Printf("  Total: $%.2f\n", v2Event.Total)
    fmt.Printf("  Currency: %s\n", v2Event.Currency)
}
CQRS Pattern
Think of a library where you have different sections for different purposes. The reference desk is optimized for quickly finding information, while the processing room is optimized for cataloging new books. Each system is designed for its specific task, even though they're dealing with the same collection.
CQRS separates the part of your system that changes data from the part that reads data. This allows you to optimize each side independently.
CQRS separates read and write models for optimized performance and scalability.
When to Use CQRS vs Traditional CRUD:
- Use CQRS: Complex business logic, different read/write patterns, high scalability needs, multiple read models
- Use Traditional CRUD: Simple CRUD operations, low traffic, tight coupling between reads and writes is acceptable
⚠️ Important: CQRS adds complexity to your system. Don't use it unless you have specific performance or scalability requirements that justify the additional complexity.
CQRS Implementation
package cqrs

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "time"
)

// Command side
//
// This snippet reuses the Order aggregate, OrderItem, the Order*Event types,
// and the EventStore and EventBus interfaces defined in the other examples.

type Command interface {
    CommandType() string
}

type CreateOrderCommand struct {
    OrderID    string
    CustomerID string
    Items      []OrderItem
}

func (c CreateOrderCommand) CommandType() string { return "CreateOrder" }

type PayOrderCommand struct {
    OrderID   string
    PaymentID string
    Amount    float64
}

func (c PayOrderCommand) CommandType() string { return "PayOrder" }

// Command handler
type CommandHandler interface {
    Handle(ctx context.Context, cmd Command) error
}

type OrderCommandHandler struct {
    eventStore EventStore
    eventBus   EventBus
}

func NewOrderCommandHandler(store EventStore, bus EventBus) *OrderCommandHandler {
    return &OrderCommandHandler{
        eventStore: store,
        eventBus:   bus,
    }
}

func (h *OrderCommandHandler) Handle(ctx context.Context, cmd Command) error {
    switch c := cmd.(type) {
    case CreateOrderCommand:
        return h.handleCreateOrder(ctx, c)
    case PayOrderCommand:
        return h.handlePayOrder(ctx, c)
    default:
        return fmt.Errorf("unknown command: %s", c.CommandType())
    }
}

func (h *OrderCommandHandler) handleCreateOrder(ctx context.Context, cmd CreateOrderCommand) error {
    // Create aggregate
    order, err := NewOrder(cmd.OrderID, cmd.CustomerID, cmd.Items)
    if err != nil {
        return err
    }

    // Save events
    events := order.GetUncommittedEvents()
    if err := h.eventStore.SaveEvents(ctx, cmd.OrderID, events, 0); err != nil {
        return err
    }

    // Publish events
    for _, event := range events {
        h.eventBus.Publish(ctx, event)
    }

    order.ClearUncommittedEvents()
    return nil
}

func (h *OrderCommandHandler) handlePayOrder(ctx context.Context, cmd PayOrderCommand) error {
    // Load aggregate from events
    events, err := h.eventStore.GetEvents(ctx, cmd.OrderID)
    if err != nil {
        return err
    }

    order := RebuildOrder(events)

    // Execute command
    if err := order.Pay(cmd.PaymentID, cmd.Amount); err != nil {
        return err
    }

    // Save new events
    newEvents := order.GetUncommittedEvents()
    if err := h.eventStore.SaveEvents(ctx, cmd.OrderID, newEvents, order.Version-len(newEvents)); err != nil {
        return err
    }

    // Publish events
    for _, event := range newEvents {
        h.eventBus.Publish(ctx, event)
    }

    order.ClearUncommittedEvents()
    return nil
}

// Query side

type OrderReadModel struct {
    ID             string
    CustomerID     string
    Items          []OrderItem
    Total          float64
    Status         string
    PaymentID      string
    TrackingNumber string
    CreatedAt      time.Time
    UpdatedAt      time.Time
}

type QueryHandler interface {
    Handle(ctx context.Context, query interface{}) (interface{}, error)
}

type OrderQueryHandler struct {
    db *sql.DB
}

func NewOrderQueryHandler(db *sql.DB) *OrderQueryHandler {
    return &OrderQueryHandler{db: db}
}

type GetOrderQuery struct {
    OrderID string
}

type ListOrdersQuery struct {
    CustomerID string
    Status     string
    Limit      int
    Offset     int
}

func (q *OrderQueryHandler) Handle(ctx context.Context, query interface{}) (interface{}, error) {
    switch qry := query.(type) {
    case GetOrderQuery:
        return q.getOrder(ctx, qry)
    case ListOrdersQuery:
        return q.listOrders(ctx, qry)
    default:
        return nil, fmt.Errorf("unknown query type")
    }
}

func (q *OrderQueryHandler) getOrder(ctx context.Context, query GetOrderQuery) (*OrderReadModel, error) {
    row := q.db.QueryRowContext(ctx, `
        SELECT id, customer_id, items, total, status, payment_id, tracking_number, created_at, updated_at
        FROM order_read_model
        WHERE id = $1
    `, query.OrderID)

    var order OrderReadModel
    var itemsJSON []byte

    err := row.Scan(
        &order.ID,
        &order.CustomerID,
        &itemsJSON,
        &order.Total,
        &order.Status,
        &order.PaymentID,
        &order.TrackingNumber,
        &order.CreatedAt,
        &order.UpdatedAt,
    )

    if err != nil {
        return nil, err
    }

    json.Unmarshal(itemsJSON, &order.Items)

    return &order, nil
}

func (q *OrderQueryHandler) listOrders(ctx context.Context, query ListOrdersQuery) ([]*OrderReadModel, error) {
    rows, err := q.db.QueryContext(ctx, `
        SELECT id, customer_id, items, total, status, created_at
        FROM order_read_model
        WHERE customer_id = $1 AND status = $2
        ORDER BY created_at DESC
        LIMIT $3 OFFSET $4
    `, query.CustomerID, query.Status, query.Limit, query.Offset)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var orders []*OrderReadModel
    for rows.Next() {
        var order OrderReadModel
        var itemsJSON []byte

        if err := rows.Scan(
            &order.ID,
            &order.CustomerID,
            &itemsJSON,
            &order.Total,
            &order.Status,
            &order.CreatedAt,
        ); err != nil {
            continue
        }

        json.Unmarshal(itemsJSON, &order.Items)
        orders = append(orders, &order)
    }

    return orders, nil
}

// Projection builder
type OrderProjection struct {
    db *sql.DB
}

func NewOrderProjection(db *sql.DB) *OrderProjection {
    return &OrderProjection{db: db}
}

func (p *OrderProjection) HandleEvent(ctx context.Context, event Event) error {
    switch e := event.(type) {
    case OrderCreatedEvent:
        return p.handleOrderCreated(ctx, e)
    case OrderPaidEvent:
        return p.handleOrderPaid(ctx, e)
    case OrderShippedEvent:
        return p.handleOrderShipped(ctx, e)
    case OrderCancelledEvent:
        return p.handleOrderCancelled(ctx, e)
    }
    return nil
}

func (p *OrderProjection) handleOrderCreated(ctx context.Context, event OrderCreatedEvent) error {
    itemsJSON, _ := json.Marshal(event.Items)

    _, err := p.db.ExecContext(ctx, `
        INSERT INTO order_read_model
        (id, customer_id, items, total, status, created_at, updated_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
    `, event.ID, event.CustomerID, itemsJSON, event.Total, "pending", event.Timestamp, event.Timestamp)

    return err
}

func (p *OrderProjection) handleOrderPaid(ctx context.Context, event OrderPaidEvent) error {
    _, err := p.db.ExecContext(ctx, `
        UPDATE order_read_model
        SET status = $1, payment_id = $2, updated_at = $3
        WHERE id = $4
    `, "paid", event.PaymentID, event.Timestamp, event.ID)

    return err
}

func (p *OrderProjection) handleOrderShipped(ctx context.Context, event OrderShippedEvent) error {
    _, err := p.db.ExecContext(ctx, `
        UPDATE order_read_model
        SET status = $1, tracking_number = $2, updated_at = $3
        WHERE id = $4
    `, "shipped", event.TrackingNumber, event.Timestamp, event.ID)

    return err
}

func (p *OrderProjection) handleOrderCancelled(ctx context.Context, event OrderCancelledEvent) error {
    _, err := p.db.ExecContext(ctx, `
        UPDATE order_read_model
        SET status = $1, updated_at = $2
        WHERE id = $3
    `, "cancelled", event.Timestamp, event.ID)

    return err
}
Multiple Read Models for Different Use Cases
One of CQRS's powerful features is creating multiple read models optimized for different query patterns:
// run
package main

import (
    "fmt"
    "sort"
    "time"
)

// Different read models for different purposes

// 1. Order Summary View - Optimized for list views
type OrderSummaryView struct {
    OrderID    string
    CustomerID string
    Total      float64
    Status     string
    OrderDate  time.Time
    ItemCount  int
}

// 2. Order Details View - Optimized for detail pages
type OrderDetailsView struct {
    OrderID        string
    CustomerID     string
    CustomerName   string
    Items          []OrderItemDetail
    Subtotal       float64
    Tax            float64
    Total          float64
    Status         string
    PaymentMethod  string
    ShippingAddr   string
    TrackingNumber string
    OrderDate      time.Time
    ShipDate       *time.Time
}

type OrderItemDetail struct {
    ProductID   string
    ProductName string
    Quantity    int
    UnitPrice   float64
    TotalPrice  float64
}

// 3. Customer Order History View - Optimized for customer pages
type CustomerOrderHistoryView struct {
    CustomerID   string
    CustomerName string
    TotalOrders  int
    TotalSpent   float64
    AverageOrder float64
    RecentOrders []RecentOrder
    TopProducts  []ProductPurchase
}

type RecentOrder struct {
    OrderID   string
    Total     float64
    Status    string
    OrderDate time.Time
}

type ProductPurchase struct {
    ProductID      string
    ProductName    string
    TimesPurchased int
    TotalSpent     float64
}

// 4. Analytics View - Optimized for reporting
type SalesAnalyticsView struct {
    Period            string
    TotalOrders       int
    TotalRevenue      float64
    AverageOrderValue float64
    OrdersByStatus    map[string]int
    TopCustomers      []CustomerSales
    TopProducts       []ProductSales
}

type CustomerSales struct {
    CustomerID   string
    CustomerName string
    OrderCount   int
    TotalSpent   float64
}

type ProductSales struct {
    ProductID   string
    ProductName string
    UnitsSold   int
    Revenue     float64
}

// Projection manager builds all read models from events
type MultiViewProjection struct {
    summaries map[string]*OrderSummaryView
    details   map[string]*OrderDetailsView
    customers map[string]*CustomerOrderHistoryView
    analytics *SalesAnalyticsView
}

func NewMultiViewProjection() *MultiViewProjection {
    return &MultiViewProjection{
        summaries: make(map[string]*OrderSummaryView),
        details:   make(map[string]*OrderDetailsView),
        customers: make(map[string]*CustomerOrderHistoryView),
        analytics: &SalesAnalyticsView{
            OrdersByStatus: make(map[string]int),
            TopCustomers:   make([]CustomerSales, 0),
            TopProducts:    make([]ProductSales, 0),
        },
    }
}

type OrderEvent interface {
    GetOrderID() string
}

type OrderCreatedEvent struct {
    OrderID      string
    CustomerID   string
    CustomerName string
    Items        []OrderItemDetail
    Subtotal     float64
    Tax          float64
    Total        float64
    OrderDate    time.Time
}

func (e OrderCreatedEvent) GetOrderID() string { return e.OrderID }

func (p *MultiViewProjection) HandleOrderCreated(event OrderCreatedEvent) {
    // 1. Update summary view
    p.summaries[event.OrderID] = &OrderSummaryView{
        OrderID:    event.OrderID,
        CustomerID: event.CustomerID,
        Total:      event.Total,
        Status:     "pending",
        OrderDate:  event.OrderDate,
        ItemCount:  len(event.Items),
    }

    // 2. Update details view
    p.details[event.OrderID] = &OrderDetailsView{
        OrderID:      event.OrderID,
        CustomerID:   event.CustomerID,
        CustomerName: event.CustomerName,
        Items:        event.Items,
        Subtotal:     event.Subtotal,
        Tax:          event.Tax,
        Total:        event.Total,
        Status:       "pending",
        OrderDate:    event.OrderDate,
    }

    // 3. Update customer history view
    if customer, exists := p.customers[event.CustomerID]; exists {
        customer.TotalOrders++
        customer.TotalSpent += event.Total
        customer.AverageOrder = customer.TotalSpent / float64(customer.TotalOrders)
        customer.RecentOrders = append([]RecentOrder{{
            OrderID:   event.OrderID,
            Total:     event.Total,
            Status:    "pending",
            OrderDate: event.OrderDate,
        }}, customer.RecentOrders...)

        // Update top products
        for _, item := range event.Items {
            found := false
            for i := range customer.TopProducts {
                if customer.TopProducts[i].ProductID == item.ProductID {
                    customer.TopProducts[i].TimesPurchased++
                    customer.TopProducts[i].TotalSpent += item.TotalPrice
                    found = true
                    break
                }
            }
            if !found {
                customer.TopProducts = append(customer.TopProducts, ProductPurchase{
                    ProductID:      item.ProductID,
                    ProductName:    item.ProductName,
                    TimesPurchased: 1,
                    TotalSpent:     item.TotalPrice,
                })
            }
        }
    } else {
        p.customers[event.CustomerID] = &CustomerOrderHistoryView{
            CustomerID:   event.CustomerID,
            CustomerName: event.CustomerName,
            TotalOrders:  1,
            TotalSpent:   event.Total,
            AverageOrder: event.Total,
            RecentOrders: []RecentOrder{{
                OrderID:   event.OrderID,
                Total:     event.Total,
                Status:    "pending",
                OrderDate: event.OrderDate,
            }},
            TopProducts: make([]ProductPurchase, 0),
        }
    }

    // 4. Update analytics view
    p.analytics.TotalOrders++
    p.analytics.TotalRevenue += event.Total
    p.analytics.AverageOrderValue = p.analytics.TotalRevenue / float64(p.analytics.TotalOrders)
    p.analytics.OrdersByStatus["pending"]++
}

func main() {
    fmt.Println("CQRS: Multiple Read Models")
    fmt.Println("===========================\n")

    projection := NewMultiViewProjection()

    // Simulate order creation events
    events := []OrderCreatedEvent{
        {
            OrderID:      "order-1",
            CustomerID:   "cust-1",
            CustomerName: "Alice Smith",
            Items: []OrderItemDetail{
                {ProductID: "prod-1", ProductName: "Laptop", Quantity: 1, UnitPrice: 999.99, TotalPrice: 999.99},
                {ProductID: "prod-2", ProductName: "Mouse", Quantity: 2, UnitPrice: 29.99, TotalPrice: 59.98},
            },
            Subtotal:  1059.97,
            Tax:       105.99,
            Total:     1165.96,
            OrderDate: time.Now(),
        },
        {
            OrderID:      "order-2",
            CustomerID:   "cust-1",
            CustomerName: "Alice Smith",
            Items: []OrderItemDetail{
                {ProductID: "prod-3", ProductName: "Keyboard", Quantity: 1, UnitPrice: 149.99, TotalPrice: 149.99},
            },
            Subtotal:  149.99,
            Tax:       15.00,
            Total:     164.99,
            OrderDate: time.Now().Add(-24 * time.Hour),
        },
        {
            OrderID:      "order-3",
            CustomerID:   "cust-2",
            CustomerName: "Bob Johnson",
            Items: []OrderItemDetail{
                {ProductID: "prod-1", ProductName: "Laptop", Quantity: 1, UnitPrice: 999.99, TotalPrice: 999.99},
            },
            Subtotal:  999.99,
            Tax:       100.00,
            Total:     1099.99,
            OrderDate: time.Now().Add(-2 * time.Hour),
        },
    }

    // Process events
    for _, event := range events {
        projection.HandleOrderCreated(event)
    }

    // Query 1: Order summary view
    fmt.Println("1. Order Summary View (for list pages):")
    for _, summary := range projection.summaries {
        fmt.Printf("   Order %s: Customer=%s, Total=$%.2f, Items=%d, Status=%s\n",
            summary.OrderID, summary.CustomerID, summary.Total, summary.ItemCount, summary.Status)
    }

    // Query 2: Order details view
    fmt.Println("\n2. Order Details View (for detail pages):")
    if details, exists := projection.details["order-1"]; exists {
        fmt.Printf("   Order: %s\n", details.OrderID)
        fmt.Printf("   Customer: %s\n", details.CustomerName)
        fmt.Printf("   Items:\n")
        for _, item := range details.Items {
            fmt.Printf("     - %s (x%d) @ $%.2f = $%.2f\n",
                item.ProductName, item.Quantity, item.UnitPrice, item.TotalPrice)
        }
        fmt.Printf("   Subtotal: $%.2f\n", details.Subtotal)
        fmt.Printf("   Tax: $%.2f\n", details.Tax)
        fmt.Printf("   Total: $%.2f\n", details.Total)
    }

    // Query 3: Customer history view
    fmt.Println("\n3. Customer Order History View (for customer pages):")
    if history, exists := projection.customers["cust-1"]; exists {
        fmt.Printf("   Customer: %s\n", history.CustomerName)
        fmt.Printf("   Total Orders: %d\n", history.TotalOrders)
        fmt.Printf("   Total Spent: $%.2f\n", history.TotalSpent)
        fmt.Printf("   Average Order: $%.2f\n", history.AverageOrder)
        fmt.Printf("   Recent Orders:\n")
        for _, order := range history.RecentOrders {
            fmt.Printf("     - %s: $%.2f (%s)\n", order.OrderID, order.Total, order.Status)
        }
        fmt.Printf("   Top Products:\n")

        // Sort by times purchased
        sort.Slice(history.TopProducts, func(i, j int) bool {
            return history.TopProducts[i].TimesPurchased > history.TopProducts[j].TimesPurchased
        })

        for _, product := range history.TopProducts {
            fmt.Printf("     - %s: purchased %d times, spent $%.2f\n",
                product.ProductName, product.TimesPurchased, product.TotalSpent)
        }
    }

    // Query 4: Analytics view
    fmt.Println("\n4. Sales Analytics View (for reporting):")
    fmt.Printf("   Total Orders: %d\n", projection.analytics.TotalOrders)
    fmt.Printf("   Total Revenue: $%.2f\n", projection.analytics.TotalRevenue)
    fmt.Printf("   Average Order Value: $%.2f\n", projection.analytics.AverageOrderValue)
    fmt.Printf("   Orders by Status:\n")
    for status, count := range projection.analytics.OrdersByStatus {
        fmt.Printf("     - %s: %d\n", status, count)
    }

    fmt.Println("\n💡 Each view is optimized for its specific use case!")
    fmt.Println("   - Summary: Fast list queries")
    fmt.Println("   - Details: Complete order information")
    fmt.Println("   - Customer History: Customer-centric insights")
    fmt.Println("   - Analytics: Business intelligence")
}
Event Store Implementation
Imagine a historian's archive where every event is recorded chronologically and never altered. New events are always added to the end of the timeline, creating a complete, immutable record of history. This is exactly how an event store works - it's an append-only log of all events that have occurred in your system.
Event store persists events in an append-only log.
Real-world Example: Financial trading systems use event stores to maintain a complete audit trail of all trades. Every buy, sell, cancel, and modify operation is recorded as an immutable event, ensuring complete traceability and regulatory compliance.
💡 Key Takeaway: The append-only nature of event stores provides strong consistency guarantees and enables powerful features like event replay and temporal queries, but requires different thinking than traditional databases.
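To make the temporal-query idea concrete, here is a small sketch (reusing the Order aggregate and Event interface from the event sourcing example above; the helper name is illustrative) that reconstructs an aggregate's state as of an arbitrary moment by replaying only the events recorded up to that point:

// OrderAsOf rebuilds an order's state as it existed at a given instant by
// replaying only the events that occurred at or before that time. Events are
// assumed to arrive ordered by version, as the event store returns them.
func OrderAsOf(events []Event, at time.Time) *Order {
    order := &Order{}
    for _, event := range events {
        if event.OccurredAt().After(at) {
            break // later events had not happened yet at the requested time
        }
        order.Apply(event)
    }
    return order
}

// Hypothetical usage: the state of order-123 as of 24 hours ago.
//   events, _ := store.GetEvents(ctx, "order-123")
//   past := OrderAsOf(events, time.Now().Add(-24*time.Hour))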
PostgreSQL Event Store
package eventstore

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "time"
)

// Event and the concrete Order*Event types are reused from the
// event sourcing example.

type EventStore interface {
    SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error
    GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
    GetEventsSince(ctx context.Context, aggregateID string, version int) ([]Event, error)
    GetAllEvents(ctx context.Context, limit, offset int) ([]Event, error)
}

type PostgresEventStore struct {
    db *sql.DB
}

func NewPostgresEventStore(db *sql.DB) *PostgresEventStore {
    return &PostgresEventStore{db: db}
}

type StoredEvent struct {
    ID            int64
    AggregateID   string
    AggregateType string
    EventType     string
    EventData     json.RawMessage
    Metadata      json.RawMessage
    Version       int
    CreatedAt     time.Time
}

func (s *PostgresEventStore) SaveEvents(
    ctx context.Context,
    aggregateID string,
    events []Event,
    expectedVersion int,
) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Check current version for optimistic concurrency control
    var currentVersion int
    err = tx.QueryRowContext(ctx, `
        SELECT COALESCE(MAX(version), 0)
        FROM events
        WHERE aggregate_id = $1
    `, aggregateID).Scan(&currentVersion)

    if err != nil && err != sql.ErrNoRows {
        return err
    }

    if currentVersion != expectedVersion {
        return errors.New("concurrency conflict: version mismatch")
    }

    // Insert events
    for _, event := range events {
        eventData, err := json.Marshal(event)
        if err != nil {
            return err
        }

        _, err = tx.ExecContext(ctx, `
            INSERT INTO events
            (aggregate_id, aggregate_type, event_type, event_data, version, created_at)
            VALUES ($1, $2, $3, $4, $5, $6)
        `, aggregateID, "Order", event.EventType(), eventData, event.EventVersion(), event.OccurredAt())

        if err != nil {
            return err
        }
    }

    return tx.Commit()
}

func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
    rows, err := s.db.QueryContext(ctx, `
        SELECT event_type, event_data, version, created_at
        FROM events
        WHERE aggregate_id = $1
        ORDER BY version ASC
    `, aggregateID)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []Event
    for rows.Next() {
        var eventType string
        var eventData json.RawMessage
        var version int
        var createdAt time.Time

        if err := rows.Scan(&eventType, &eventData, &version, &createdAt); err != nil {
            return nil, err
        }

        event := s.deserializeEvent(eventType, eventData, version, createdAt)
        if event != nil {
            events = append(events, event)
        }
    }

    return events, nil
}

func (s *PostgresEventStore) GetEventsSince(
    ctx context.Context,
    aggregateID string,
    version int,
) ([]Event, error) {
    rows, err := s.db.QueryContext(ctx, `
        SELECT event_type, event_data, version, created_at
        FROM events
        WHERE aggregate_id = $1 AND version > $2
        ORDER BY version ASC
    `, aggregateID, version)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []Event
    for rows.Next() {
        var eventType string
        var eventData json.RawMessage
        var ver int
        var createdAt time.Time

        if err := rows.Scan(&eventType, &eventData, &ver, &createdAt); err != nil {
            return nil, err
        }

        event := s.deserializeEvent(eventType, eventData, ver, createdAt)
        if event != nil {
            events = append(events, event)
        }
    }

    return events, nil
}

func (s *PostgresEventStore) GetAllEvents(ctx context.Context, limit, offset int) ([]Event, error) {
    rows, err := s.db.QueryContext(ctx, `
        SELECT aggregate_id, event_type, event_data, version, created_at
        FROM events
        ORDER BY id ASC
        LIMIT $1 OFFSET $2
    `, limit, offset)

    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []Event
    for rows.Next() {
        var aggregateID, eventType string
        var eventData json.RawMessage
        var version int
        var createdAt time.Time

        if err := rows.Scan(&aggregateID, &eventType, &eventData, &version, &createdAt); err != nil {
            return nil, err
        }

        event := s.deserializeEvent(eventType, eventData, version, createdAt)
        if event != nil {
            events = append(events, event)
        }
    }

    return events, nil
}

func (s *PostgresEventStore) deserializeEvent(
    eventType string,
    data json.RawMessage,
    version int,
    timestamp time.Time,
) Event {
    switch eventType {
    case "OrderCreated":
        var event OrderCreatedEvent
        json.Unmarshal(data, &event)
        return event

    case "OrderPaid":
        var event OrderPaidEvent
        json.Unmarshal(data, &event)
        return event

    case "OrderShipped":
        var event OrderShippedEvent
        json.Unmarshal(data, &event)
        return event

    case "OrderCancelled":
        var event OrderCancelledEvent
        json.Unmarshal(data, &event)
        return event

    default:
        return nil
    }
}

// Snapshot support for performance
type Snapshot struct {
    AggregateID string
    State       json.RawMessage
    Version     int
    CreatedAt   time.Time
}

func (s *PostgresEventStore) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
    _, err := s.db.ExecContext(ctx, `
        INSERT INTO snapshots (aggregate_id, state, version, created_at)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (aggregate_id) DO UPDATE
        SET state = $2, version = $3, created_at = $4
    `, snapshot.AggregateID, snapshot.State, snapshot.Version, snapshot.CreatedAt)

    return err
}

func (s *PostgresEventStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) {
    var snapshot Snapshot
    err := s.db.QueryRowContext(ctx, `
        SELECT aggregate_id, state, version, created_at
        FROM snapshots
        WHERE aggregate_id = $1
    `, aggregateID).Scan(&snapshot.AggregateID, &snapshot.State, &snapshot.Version, &snapshot.CreatedAt)

    if err == sql.ErrNoRows {
        return nil, nil
    }

    return &snapshot, err
}

// Database schema
const schema = `
CREATE TABLE IF NOT EXISTS events (
    id BIGSERIAL PRIMARY KEY,
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB,
    version INTEGER NOT NULL,
    created_at TIMESTAMP NOT NULL,
    UNIQUE (aggregate_id, version)
);

CREATE INDEX IF NOT EXISTS idx_events_aggregate ON events(aggregate_id, version);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at);

CREATE TABLE IF NOT EXISTS snapshots (
    aggregate_id VARCHAR(255) PRIMARY KEY,
    state JSONB NOT NULL,
    version INTEGER NOT NULL,
    created_at TIMESTAMP NOT NULL
);
`
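To tie the snapshot table and GetEventsSince together, here is a sketch of a typical load path: start from the latest snapshot when one exists, then replay only the newer events. It assumes the Order aggregate from the event sourcing example and that snapshots were produced with json.Marshal(order); the helper name is illustrative.

// LoadOrder restores an Order from the store: snapshot first (if any), then
// only the events recorded after the snapshot's version.
func LoadOrder(ctx context.Context, store *PostgresEventStore, aggregateID string) (*Order, error) {
    order := &Order{}
    fromVersion := 0

    snapshot, err := store.GetSnapshot(ctx, aggregateID)
    if err != nil {
        return nil, err
    }
    if snapshot != nil {
        if err := json.Unmarshal(snapshot.State, order); err != nil {
            return nil, err
        }
        fromVersion = snapshot.Version
    }

    events, err := store.GetEventsSince(ctx, aggregateID, fromVersion)
    if err != nil {
        return nil, err
    }
    for _, event := range events {
        order.Apply(event)
    }

    return order, nil
}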
Event Bus and Message Brokers
Think of an event bus like a city's postal service. When you send a letter, you don't need to know exactly who will receive it or how it will get there. The postal service handles routing the letter to all interested recipients who have signed up for that type of mail.
Event bus distributes events to subscribers for processing.
When to Use Different Message Brokers:
- In-Memory: Simple applications, testing, single process
- RabbitMQ: Complex routing, reliable delivery, message acknowledgments
- Apache Kafka: High throughput, event streaming, replay capabilities
- Redis Pub/Sub: Simple pub/sub, high speed, lightweight
- NATS: Cloud-native, high performance, geographic distribution
⚠️ Important: Choose your message broker based on your reliability needs. In-memory buses don't persist messages, so if a service is down when an event is published, that event is lost forever.
In-Memory Event Bus
package eventbus

import (
    "context"
    "log"
    "sync"
)

type EventHandler func(ctx context.Context, event Event) error

type EventBus interface {
    Subscribe(eventType string, handler EventHandler)
    Publish(ctx context.Context, event Event) error
}

type InMemoryEventBus struct {
    mu       sync.RWMutex
    handlers map[string][]EventHandler
}

func NewInMemoryEventBus() *InMemoryEventBus {
    return &InMemoryEventBus{
        handlers: make(map[string][]EventHandler),
    }
}

func (bus *InMemoryEventBus) Subscribe(eventType string, handler EventHandler) {
    bus.mu.Lock()
    defer bus.mu.Unlock()

    bus.handlers[eventType] = append(bus.handlers[eventType], handler)
}

func (bus *InMemoryEventBus) Publish(ctx context.Context, event Event) error {
    bus.mu.RLock()
    handlers := bus.handlers[event.EventType()]
    bus.mu.RUnlock()

    var wg sync.WaitGroup
    errors := make(chan error, len(handlers))

    for _, handler := range handlers {
        wg.Add(1)
        go func(h EventHandler) {
            defer wg.Done()
            if err := h(ctx, event); err != nil {
                errors <- err
            }
        }(handler)
    }

    wg.Wait()
    close(errors)

    // Log errors but don't fail the publish
    for err := range errors {
        log.Printf("Event handler error: %v", err)
    }

    return nil
}
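A quick usage sketch for the in-memory bus (reusing the BaseEvent and OrderCreatedEvent types from the event sourcing example; the handler is a stand-in for real work such as sending a confirmation email):

func ExampleInMemoryBus() {
    bus := NewInMemoryEventBus()

    // React to OrderCreated events.
    bus.Subscribe("OrderCreated", func(ctx context.Context, event Event) error {
        log.Printf("sending confirmation for aggregate %s", event.AggregateID())
        return nil
    })

    event := OrderCreatedEvent{
        BaseEvent: BaseEvent{Type: "OrderCreated", ID: "order-123", Version: 1},
    }

    // Publish fans the event out to all subscribed handlers concurrently.
    _ = bus.Publish(context.Background(), event)
}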
RabbitMQ Event Bus
package eventbus

import (
    "context"
    "encoding/json"

    amqp "github.com/rabbitmq/amqp091-go"
)

type RabbitMQEventBus struct {
    conn     *amqp.Connection
    channel  *amqp.Channel
    exchange string
}

func NewRabbitMQEventBus(url, exchange string) (*RabbitMQEventBus, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, err
    }

    channel, err := conn.Channel()
    if err != nil {
        return nil, err
    }

    // Declare exchange
    err = channel.ExchangeDeclare(
        exchange,
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return nil, err
    }

    return &RabbitMQEventBus{
        conn:     conn,
        channel:  channel,
        exchange: exchange,
    }, nil
}

func (bus *RabbitMQEventBus) Publish(ctx context.Context, event Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }

    return bus.channel.PublishWithContext(
        ctx,
        bus.exchange,
        event.EventType(), // routing key
        false,
        false,
        amqp.Publishing{
            ContentType: "application/json",
            Body:        data,
        },
    )
}

func (bus *RabbitMQEventBus) Subscribe(queue, eventType string, handler EventHandler) error {
    // Declare queue
    q, err := bus.channel.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Bind queue to exchange
    err = bus.channel.QueueBind(
        q.Name,
        eventType,
        bus.exchange,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    // Consume messages
    msgs, err := bus.channel.Consume(
        q.Name,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        return err
    }

    go func() {
        for msg := range msgs {
            // NOTE: Event is an interface, so a real consumer needs to pick a
            // concrete event struct to unmarshal into (e.g. via a registry
            // keyed on the routing key). Shown simplified here.
            var event Event
            if err := json.Unmarshal(msg.Body, &event); err != nil {
                msg.Nack(false, false)
                continue
            }

            if err := handler(context.Background(), event); err != nil {
                msg.Nack(false, true) // requeue
            } else {
                msg.Ack(false)
            }
        }
    }()

    return nil
}

func (bus *RabbitMQEventBus) Close() error {
    bus.channel.Close()
    return bus.conn.Close()
}
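For the Kafka option mentioned above, a publisher can be sketched with the third-party github.com/segmentio/kafka-go client (the broker address and topic name below are placeholders, and the consumer side is omitted). Keying messages by aggregate ID keeps all events for one aggregate in a single partition, which preserves their relative order:

package eventbus

import (
    "context"
    "encoding/json"

    "github.com/segmentio/kafka-go"
)

type KafkaEventBus struct {
    writer *kafka.Writer
}

func NewKafkaEventBus(brokerAddr, topic string) *KafkaEventBus {
    return &KafkaEventBus{
        writer: &kafka.Writer{
            Addr:     kafka.TCP(brokerAddr),
            Topic:    topic,
            Balancer: &kafka.Hash{}, // partition by message key
        },
    }
}

func (bus *KafkaEventBus) Publish(ctx context.Context, event Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }

    return bus.writer.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.AggregateID()), // same aggregate -> same partition
        Value: data,
    })
}

func (bus *KafkaEventBus) Close() error {
    return bus.writer.Close()
}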
Production Patterns
Think of event-driven systems in production like a live television broadcast. Everything needs to work perfectly, but when things go wrong, you need backup plans, retry mechanisms, and the ability to replay segments if something was missed.
Production event-driven systems require robust error handling, monitoring, and replay capabilities.
Common Pitfalls in Production:
- Not handling duplicate events (idempotency)
- Losing events during system failures
- Projections getting out of sync with events
- Schema evolution breaking existing consumers
- Monitoring that doesn't capture async failures
- Cascading failures when consumers fail
💡 Key Takeaway: Production event-driven systems fail in ways that traditional systems don't. Plan for failure, implement comprehensive monitoring, and design for recovery from the start.
Event Replay and Projection Rebuilding
package projection

import (
    "context"
    "fmt"
    "log"
)

type ProjectionManager struct {
    eventStore  EventStore
    projections []Projection
}

type Projection interface {
    Name() string
    HandleEvent(ctx context.Context, event Event) error
    Reset(ctx context.Context) error
}

func NewProjectionManager(store EventStore) *ProjectionManager {
    return &ProjectionManager{
        eventStore:  store,
        projections: []Projection{},
    }
}

func (pm *ProjectionManager) RegisterProjection(projection Projection) {
    pm.projections = append(pm.projections, projection)
}

// Rebuild all projections from events
func (pm *ProjectionManager) RebuildAll(ctx context.Context) error {
    for _, projection := range pm.projections {
        if err := pm.Rebuild(ctx, projection); err != nil {
            return err
        }
    }
    return nil
}

func (pm *ProjectionManager) Rebuild(ctx context.Context, projection Projection) error {
    log.Printf("Rebuilding projection: %s", projection.Name())

    // Reset projection
    if err := projection.Reset(ctx); err != nil {
        return fmt.Errorf("failed to reset projection: %w", err)
    }

    // Replay all events
    offset := 0
    batchSize := 100

    for {
        events, err := pm.eventStore.GetAllEvents(ctx, batchSize, offset)
        if err != nil {
            return err
        }

        if len(events) == 0 {
            break
        }

        for _, event := range events {
            if err := projection.HandleEvent(ctx, event); err != nil {
                log.Printf("Error handling event in projection %s: %v", projection.Name(), err)
            }
        }

        offset += len(events)
        log.Printf("Processed %d events for projection %s", offset, projection.Name())

        if len(events) < batchSize {
            break
        }
    }

    log.Printf("Projection %s rebuilt successfully", projection.Name())
    return nil
}

// Start continuous projection
func (pm *ProjectionManager) Start(ctx context.Context, eventBus EventBus) {
    for _, projection := range pm.projections {
        p := projection
        // Assumes the event bus supports a "*" wildcard subscription that
        // matches every event type; the in-memory bus shown earlier would need
        // that added (or one Subscribe call per event type).
        eventBus.Subscribe("*", func(ctx context.Context, event Event) error {
            return p.HandleEvent(ctx, event)
        })
    }
}
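For reference, a minimal concrete projection satisfying the Projection interface above might look like the sketch below: an in-memory count of how many orders have reached each status, keyed off the event type names used in the earlier Order examples (not safe for concurrent use as written):

// OrderStatusCounts is a small in-memory projection: it counts how many
// orders have reached each status. Reset clears the counts so the projection
// can be rebuilt from scratch by the ProjectionManager.
type OrderStatusCounts struct {
    counts map[string]int
}

func NewOrderStatusCounts() *OrderStatusCounts {
    return &OrderStatusCounts{counts: make(map[string]int)}
}

func (p *OrderStatusCounts) Name() string { return "order_status_counts" }

func (p *OrderStatusCounts) Reset(ctx context.Context) error {
    p.counts = make(map[string]int)
    return nil
}

func (p *OrderStatusCounts) HandleEvent(ctx context.Context, event Event) error {
    // Map event types to the status they imply; unknown events are ignored.
    switch event.EventType() {
    case "OrderCreated":
        p.counts["pending"]++
    case "OrderPaid":
        p.counts["paid"]++
    case "OrderShipped":
        p.counts["shipped"]++
    case "OrderCancelled":
        p.counts["cancelled"]++
    }
    return nil
}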
Idempotency and Deduplication
1package idempotency
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "time"
8)
9
10type IdempotencyChecker struct {
11 db *sql.DB
12}
13
14func NewIdempotencyChecker(db *sql.DB) *IdempotencyChecker {
15 return &IdempotencyChecker{db: db}
16}
17
18func (ic *IdempotencyChecker) IsProcessed(ctx context.Context, eventID string) (bool, error) {
19 var count int
20 err := ic.db.QueryRowContext(ctx, `
21 SELECT COUNT(*) FROM processed_events WHERE event_id = $1
22 `, eventID).Scan(&count)
23
24 if err != nil {
25 return false, err
26 }
27
28 return count > 0, nil
29}
30
31func (ic *IdempotencyChecker) MarkProcessed(ctx context.Context, eventID string) error {
32 _, err := ic.db.ExecContext(ctx, `
33 INSERT INTO processed_events (event_id, processed_at)
34 VALUES ($1, $2)
35 ON CONFLICT (event_id) DO NOTHING
36 `, eventID, time.Now())
37
38 return err
39}
40
41// Idempotent event handler wrapper
42func MakeIdempotent(checker *IdempotencyChecker, handler EventHandler) EventHandler {
43 return func(ctx context.Context, event Event) error {
44 eventID := event.AggregateID() + "-" + event.EventType() + "-" + fmt.Sprint(event.EventVersion())
45
46 processed, err := checker.IsProcessed(ctx, eventID)
47 if err != nil {
48 return err
49 }
50
51 if processed {
52 return nil // Already processed
53 }
54
55 if err := handler(ctx, event); err != nil {
56 return err
57 }
58
59 return checker.MarkProcessed(ctx, eventID)
60 }
61}
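The checker assumes a processed_events table with the columns used in the queries above; a minimal PostgreSQL schema sketch (the EnsureSchema helper and the TIMESTAMPTZ column type are assumptions):

// DDL matching the queries used by IdempotencyChecker above; event_id as the
// primary key is what makes ON CONFLICT (event_id) DO NOTHING work.
const createProcessedEventsTable = `
CREATE TABLE IF NOT EXISTS processed_events (
    event_id     TEXT PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL
)`

func EnsureSchema(ctx context.Context, db *sql.DB) error {
	_, err := db.ExecContext(ctx, createProcessedEventsTable)
	return err
}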
Saga Pattern for Distributed Transactions
A saga is a sequence of local transactions where each transaction updates data within a single service. If one transaction fails, the saga executes compensating transactions to undo the impact of the preceding transactions.
1// run
2package main
3
4import (
5 "context"
6 "errors"
7 "fmt"
8 "sync"
9 "time"
10)
11
12// Saga state
13type SagaState string
14
15const (
16 SagaStatePending SagaState = "pending"
17 SagaStateRunning SagaState = "running"
18 SagaStateCompleted SagaState = "completed"
19 SagaStateFailed SagaState = "failed"
20 SagaStateRolledBack SagaState = "rolled_back"
21)
22
23// Saga step
24type SagaStep struct {
25 Name string
26 Execute func(context.Context, *SagaContext) error
27 Compensate func(context.Context, *SagaContext) error
28 MaxRetries int
29}
30
31// Saga context
32type SagaContext struct {
33 Data map[string]interface{}
34 mu sync.RWMutex
35}
36
37func NewSagaContext() *SagaContext {
38 return &SagaContext{
39 Data: make(map[string]interface{}),
40 }
41}
42
43func (c *SagaContext) Set(key string, value interface{}) {
44 c.mu.Lock()
45 defer c.mu.Unlock()
46 c.Data[key] = value
47}
48
49func (c *SagaContext) Get(key string) (interface{}, bool) {
50 c.mu.RLock()
51 defer c.mu.RUnlock()
52 val, ok := c.Data[key]
53 return val, ok
54}
55
56// Saga instance
57type Saga struct {
58 ID string
59 Steps []SagaStep
60 Context *SagaContext
61 State SagaState
62 CompletedSteps int
63 CurrentStep int
64 Error error
65 StartTime time.Time
66 CompletionTime time.Time
67}
68
69func NewSaga(id string, steps []SagaStep) *Saga {
70 return &Saga{
71 ID: id,
72 Steps: steps,
73 Context: NewSagaContext(),
74 State: SagaStatePending,
75 }
76}
77
78// Saga orchestrator
79type SagaOrchestrator struct {
80 sagas map[string]*Saga
81 mu sync.RWMutex
82}
83
84func NewSagaOrchestrator() *SagaOrchestrator {
85 return &SagaOrchestrator{
86 sagas: make(map[string]*Saga),
87 }
88}
89
90func (o *SagaOrchestrator) Execute(ctx context.Context, saga *Saga) error {
91 o.mu.Lock()
92 o.sagas[saga.ID] = saga
93 o.mu.Unlock()
94
95 saga.State = SagaStateRunning
96 saga.StartTime = time.Now()
97
98 fmt.Printf("Starting saga: %s\n", saga.ID)
99
100 // Execute steps
101 for i, step := range saga.Steps {
102 saga.CurrentStep = i
103 fmt.Printf(" Executing step %d: %s\n", i+1, step.Name)
104
105 if err := o.executeStepWithRetry(ctx, saga, step); err != nil {
106 saga.Error = err
107 saga.State = SagaStateFailed
108 fmt.Printf(" ✗ Step failed: %v\n", err)
109
110 // Compensate
111 fmt.Println(" Starting compensation...")
112 if err := o.compensate(ctx, saga); err != nil {
113 fmt.Printf(" ✗ Compensation failed: %v\n", err)
114 return err
115 }
116
117 saga.State = SagaStateRolledBack
118 saga.CompletionTime = time.Now()
119 return err
120 }
121
122 saga.CompletedSteps++
123 fmt.Printf(" ✓ Step completed: %s\n", step.Name)
124 }
125
126 saga.State = SagaStateCompleted
127 saga.CompletionTime = time.Now()
128 fmt.Printf("✓ Saga completed: %s (duration: %v)\n",
129 saga.ID, saga.CompletionTime.Sub(saga.StartTime))
130
131 return nil
132}
133
134func (o *SagaOrchestrator) executeStepWithRetry(
135 ctx context.Context,
136 saga *Saga,
137 step SagaStep,
138) error {
139 var err error
140 for attempt := 0; attempt <= step.MaxRetries; attempt++ {
141 if attempt > 0 {
142 fmt.Printf(" Retry attempt %d/%d\n", attempt, step.MaxRetries)
143 time.Sleep(time.Duration(attempt) * time.Second)
144 }
145
146 err = step.Execute(ctx, saga.Context)
147 if err == nil {
148 return nil
149 }
150 }
151 return err
152}
153
154func (o *SagaOrchestrator) compensate(ctx context.Context, saga *Saga) error {
155 // Compensate in reverse order
156 for i := saga.CompletedSteps - 1; i >= 0; i-- {
157 step := saga.Steps[i]
158 fmt.Printf(" Compensating step: %s\n", step.Name)
159
160 if step.Compensate != nil {
161 if err := step.Compensate(ctx, saga.Context); err != nil {
162 return fmt.Errorf("compensation failed for %s: %w", step.Name, err)
163 }
164 fmt.Printf(" ✓ Compensated: %s\n", step.Name)
165 }
166 }
167 return nil
168}
169
170func (o *SagaOrchestrator) GetSaga(id string) *Saga {
171 o.mu.RLock()
172 defer o.mu.RUnlock()
173 return o.sagas[id]
174}
175
176// Simulated services
177type InventoryService struct {
178 stock map[string]int
179 mu sync.Mutex
180}
181
182func NewInventoryService() *InventoryService {
183 return &InventoryService{
184 stock: map[string]int{
185 "product-1": 100,
186 "product-2": 50,
187 },
188 }
189}
190
191func (s *InventoryService) ReserveInventory(productID string, quantity int) error {
192 s.mu.Lock()
193 defer s.mu.Unlock()
194
195 available := s.stock[productID]
196 if available < quantity {
197 return fmt.Errorf("insufficient inventory: need %d, have %d", quantity, available)
198 }
199
200 s.stock[productID] -= quantity
201 return nil
202}
203
204func (s *InventoryService) ReleaseInventory(productID string, quantity int) {
205 s.mu.Lock()
206 defer s.mu.Unlock()
207 s.stock[productID] += quantity
208}
209
210type PaymentService struct {
211 successRate float64
212}
213
214func NewPaymentService(successRate float64) *PaymentService {
215 return &PaymentService{successRate: successRate}
216}
217
218func (s *PaymentService) ProcessPayment(orderID string, amount float64) (string, error) {
219 // Simulate payment processing
220 time.Sleep(100 * time.Millisecond)
221
222 // Simulate random failures
223 if time.Now().UnixNano()%100 > int64(s.successRate*100) {
224 return "", errors.New("payment processing failed")
225 }
226
227 paymentID := fmt.Sprintf("pay-%s", orderID)
228 return paymentID, nil
229}
230
231func (s *PaymentService) RefundPayment(paymentID string) error {
232 time.Sleep(50 * time.Millisecond)
233 return nil
234}
235
236type ShippingService struct{}
237
238func NewShippingService() *ShippingService {
239 return &ShippingService{}
240}
241
242func (s *ShippingService) ShipOrder(orderID string) (string, error) {
243 time.Sleep(100 * time.Millisecond)
244 trackingID := fmt.Sprintf("track-%s", orderID)
245 return trackingID, nil
246}
247
248func (s *ShippingService) CancelShipment(trackingID string) error {
249 time.Sleep(50 * time.Millisecond)
250 return nil
251}
252
253func main() {
254 fmt.Println("Saga Orchestration Pattern")
255 fmt.Println("===========================\n")
256
257 // Initialize services
258 inventory := NewInventoryService()
259 payment := NewPaymentService(0.8) // 80% success rate
260 shipping := NewShippingService()
261 orchestrator := NewSagaOrchestrator()
262
263 // Order details
264 orderID := "order-123"
265 productID := "product-1"
266 quantity := 5
267 amount := 99.99
268
269 // Define saga steps
270 steps := []SagaStep{
271 {
272 Name: "Reserve Inventory",
273 MaxRetries: 2,
274 Execute: func(ctx context.Context, sagaCtx *SagaContext) error {
275 if err := inventory.ReserveInventory(productID, quantity); err != nil {
276 return err
277 }
278 sagaCtx.Set("product_id", productID)
279 sagaCtx.Set("quantity", quantity)
280 return nil
281 },
282 Compensate: func(ctx context.Context, sagaCtx *SagaContext) error {
283 productID, _ := sagaCtx.Get("product_id")
284 quantity, _ := sagaCtx.Get("quantity")
285 inventory.ReleaseInventory(productID.(string), quantity.(int))
286 return nil
287 },
288 },
289 {
290 Name: "Process Payment",
291 MaxRetries: 3,
292 Execute: func(ctx context.Context, sagaCtx *SagaContext) error {
293 paymentID, err := payment.ProcessPayment(orderID, amount)
294 if err != nil {
295 return err
296 }
297 sagaCtx.Set("payment_id", paymentID)
298 return nil
299 },
300 Compensate: func(ctx context.Context, sagaCtx *SagaContext) error {
301 paymentID, ok := sagaCtx.Get("payment_id")
302 if !ok {
303 return nil
304 }
305 return payment.RefundPayment(paymentID.(string))
306 },
307 },
308 {
309 Name: "Ship Order",
310 MaxRetries: 2,
311 Execute: func(ctx context.Context, sagaCtx *SagaContext) error {
312 trackingID, err := shipping.ShipOrder(orderID)
313 if err != nil {
314 return err
315 }
316 sagaCtx.Set("tracking_id", trackingID)
317 return nil
318 },
319 Compensate: func(ctx context.Context, sagaCtx *SagaContext) error {
320 trackingID, ok := sagaCtx.Get("tracking_id")
321 if !ok {
322 return nil
323 }
324 return shipping.CancelShipment(trackingID.(string))
325 },
326 },
327 }
328
329 // Create and execute saga
330 saga := NewSaga(orderID, steps)
331 ctx := context.Background()
332
333 if err := orchestrator.Execute(ctx, saga); err != nil {
334 fmt.Printf("\n✗ Saga failed: %v\n", err)
335 fmt.Printf("State: %s\n", saga.State)
336 } else {
337 fmt.Printf("\n✓ Order fulfilled successfully\n")
338
339 if trackingID, ok := saga.Context.Get("tracking_id"); ok {
340 fmt.Printf("Tracking ID: %s\n", trackingID)
341 }
342 }
343
344 // Display final saga state
345 fmt.Printf("\nSaga Summary:\n")
346 fmt.Printf(" ID: %s\n", saga.ID)
347 fmt.Printf(" State: %s\n", saga.State)
348 fmt.Printf(" Completed Steps: %d/%d\n", saga.CompletedSteps, len(saga.Steps))
349 fmt.Printf(" Duration: %v\n", saga.CompletionTime.Sub(saga.StartTime))
350}
Monitoring and Observability
Event-driven systems require specialized monitoring strategies to track event flows, detect failures, and ensure system health.
1// run
2package main
3
4import (
5 "context"
6 "fmt"
7 "sync"
8 "time"
9)
10
11// Event metrics
12type EventMetrics struct {
13 mu sync.RWMutex
14 eventCounts map[string]int64
15 processingTimes map[string][]time.Duration
16 failureCounts map[string]int64
17 lastProcessedTime map[string]time.Time
18}
19
20func NewEventMetrics() *EventMetrics {
21 return &EventMetrics{
22 eventCounts: make(map[string]int64),
23 processingTimes: make(map[string][]time.Duration),
24 failureCounts: make(map[string]int64),
25 lastProcessedTime: make(map[string]time.Time),
26 }
27}
28
29func (m *EventMetrics) RecordEvent(eventType string, duration time.Duration, success bool) {
30 m.mu.Lock()
31 defer m.mu.Unlock()
32
33 m.eventCounts[eventType]++
34 m.processingTimes[eventType] = append(m.processingTimes[eventType], duration)
35 m.lastProcessedTime[eventType] = time.Now()
36
37 if !success {
38 m.failureCounts[eventType]++
39 }
40}
41
42func (m *EventMetrics) GetStats(eventType string) map[string]interface{} {
43 m.mu.RLock()
44 defer m.mu.RUnlock()
45
46 times := m.processingTimes[eventType]
47 if len(times) == 0 {
48 return map[string]interface{}{
49 "count": 0,
50 "failures": 0,
51 "avg_time": "0ms",
52 "last_processed": "never",
53 }
54 }
55
56 var total time.Duration
57 for _, d := range times {
58 total += d
59 }
60 avg := total / time.Duration(len(times))
61
62 return map[string]interface{}{
63 "count": m.eventCounts[eventType],
64 "failures": m.failureCounts[eventType],
65 "avg_time": avg.String(),
66 "last_processed": m.lastProcessedTime[eventType].Format(time.RFC3339),
67 }
68}
69
70// Monitored event handler wrapper
71func WithMetrics(metrics *EventMetrics, handler EventHandler) EventHandler {
72 return func(ctx context.Context, event Event) error {
73 start := time.Now()
74 err := handler(ctx, event)
75 duration := time.Since(start)
76
77 metrics.RecordEvent(event.EventType(), duration, err == nil)
78
79 if err != nil {
80 fmt.Printf("❌ Event processing failed: %s (took %v)\n", event.EventType(), duration)
81 }
82
83 return err
84 }
85}
86
87// Health check for event processing
88type HealthChecker struct {
89 metrics *EventMetrics
90}
91
92func NewHealthChecker(metrics *EventMetrics) *HealthChecker {
93 return &HealthChecker{metrics: metrics}
94}
95
96func (h *HealthChecker) CheckHealth() map[string]string {
97 h.metrics.mu.RLock()
98 defer h.metrics.mu.RUnlock()
99
100 health := make(map[string]string)
101
102 for eventType, lastTime := range h.metrics.lastProcessedTime {
103 timeSince := time.Since(lastTime)
104
105 if timeSince > 5*time.Minute {
106 health[eventType] = "unhealthy: no events in 5 minutes"
107 } else {
108 failureRate := float64(h.metrics.failureCounts[eventType]) / float64(h.metrics.eventCounts[eventType])
109 if failureRate > 0.1 {
110 health[eventType] = fmt.Sprintf("degraded: %.1f%% failure rate", failureRate*100)
111 } else {
112 health[eventType] = "healthy"
113 }
114 }
115 }
116
117 return health
118}
119
120func main() {
121 fmt.Println("Event-Driven System Monitoring")
122 fmt.Println("===============================\n")
123
124 metrics := NewEventMetrics()
125 healthChecker := NewHealthChecker(metrics)
126
127 // Simulate event processing
128 eventTypes := []string{"OrderCreated", "OrderPaid", "OrderShipped"}
129
130 for i := 0; i < 100; i++ {
131 eventType := eventTypes[i%len(eventTypes)]
132
133 // Simulate processing
134 start := time.Now()
135 success := i%10 != 0 // 10% failure rate
136 time.Sleep(time.Duration(10+i%50) * time.Millisecond)
137 duration := time.Since(start)
138
139 metrics.RecordEvent(eventType, duration, success)
140 }
141
142 // Display metrics
143 fmt.Println("Event Processing Statistics:")
144 for _, eventType := range eventTypes {
145 stats := metrics.GetStats(eventType)
146 fmt.Printf("\n%s:\n", eventType)
147 for key, value := range stats {
148 fmt.Printf(" %s: %v\n", key, value)
149 }
150 }
151
152 // Health check
153 fmt.Println("\nSystem Health:")
154 health := healthChecker.CheckHealth()
155 for eventType, status := range health {
156 fmt.Printf(" %s: %s\n", eventType, status)
157 }
158}
159
160// Event and EventHandler mirror the handler signature used throughout this
161// guide; they are defined here so the example compiles on its own.
162type Event interface {
163 EventType() string
164}
165
166type EventHandler func(ctx context.Context, event Event) error
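To make these numbers visible outside the process, the health and stats maps can be exposed over HTTP; a minimal sketch using only the standard library, assuming the EventMetrics and HealthChecker types above plus imports of net/http and encoding/json (the /healthz and /stats paths are arbitrary choices; in production you would more likely export to Prometheus or a similar system):

func serveMonitoring(addr string, metrics *EventMetrics, health *HealthChecker, eventTypes []string) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(health.CheckHealth())
	})
	mux.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
		out := make(map[string]map[string]interface{}, len(eventTypes))
		for _, eventType := range eventTypes {
			out[eventType] = metrics.GetStats(eventType)
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(out)
	})
	return http.ListenAndServe(addr, mux)
}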
Further Reading
- Event Sourcing Pattern - Martin Fowler
- CQRS Pattern - Martin Fowler
- Event Store Documentation
- Domain-Driven Design - Eric Evans
- Versioning in an Event Sourced System - Greg Young
- Building Event-Driven Microservices - Adam Bellemare
- Enterprise Integration Patterns - Gregor Hohpe
- Reactive Messaging Patterns
Practice Exercises
Exercise 1: Event Sourced Banking System
🎯 Learning Objectives:
- Understand event sourcing fundamentals and implementation patterns
- Design domain events that capture business state changes
- Build event stores with optimistic concurrency control
- Implement aggregate rebuilding from event streams
- Create business invariants through event validation
⏱️ Time Estimate: 120-150 minutes
📊 Difficulty: Intermediate
🌍 Real-World Context: A financial services company needs a banking system with complete audit trails and regulatory compliance. Event sourcing provides immutable transaction history and enables temporal queries for business analysis and regulatory reporting.
Task: Build a comprehensive event-sourced banking system that demonstrates core event sourcing concepts while handling real-world banking scenarios and business rules.
Core Event Sourcing Components:
- Domain Events: AccountOpened, MoneyDeposited, MoneyWithdrawn, AccountClosed, OverdraftOccurred
- Event Store: Optimistic concurrency with version checking and persistence
- Aggregate Root: Account entity with business logic and event application
- Event Rebuilding: State reconstruction from event history
- Snapshot Strategy: Performance optimization for long-lived aggregates
Business Rules to Implement:
- Account opening validation and minimum balance requirements
- Overdraft protection with configurable limits
- Daily transaction limits and fraud detection
- Account closure with balance validation
- Interest calculation and fee application events
- Account suspension and reactivation workflows
Advanced Features:
- Event versioning and schema evolution
- Temporal queries for account history at any point in time
- Event replay for bug fixes and business logic updates
- Integration with external systems through event publishing
- Audit trail and compliance reporting capabilities
- Performance monitoring and optimization strategies
Complete Solution
1// run
2package main
3
4import (
5 "errors"
6 "fmt"
7 "sync"
8 "time"
9)
10
11// Event interface
12type Event interface {
13 EventType() string
14 AggregateID() string
15 EventVersion() int
16 OccurredAt() time.Time
17}
18
19// Base event
20type BaseEvent struct {
21 Type string
22 ID string
23 Version int
24 Timestamp time.Time
25}
26
27func (e BaseEvent) EventType() string { return e.Type }
28func (e BaseEvent) AggregateID() string { return e.ID }
29func (e BaseEvent) EventVersion() int { return e.Version }
30func (e BaseEvent) OccurredAt() time.Time { return e.Timestamp }
31
32// Account events
33type AccountOpenedEvent struct {
34 BaseEvent
35 AccountHolder string
36 InitialBalance float64
37}
38
39type MoneyDepositedEvent struct {
40 BaseEvent
41 Amount float64
42 Description string
43}
44
45type MoneyWithdrawnEvent struct {
46 BaseEvent
47 Amount float64
48 Description string
49}
50
51type AccountClosedEvent struct {
52 BaseEvent
53 Reason string
54}
55
56// Account aggregate
57type Account struct {
58 ID string
59 AccountHolder string
60 Balance float64
61 Status string
62 Version int
63 events []Event
64}
65
66// Create new account
67func OpenAccount(id, holder string, initialBalance float64) (*Account, error) {
68 if holder == "" {
69 return nil, errors.New("account holder required")
70 }
71 if initialBalance < 0 {
72 return nil, errors.New("initial balance cannot be negative")
73 }
74
75 account := &Account{ID: id, Version: 0}
76
77 event := AccountOpenedEvent{
78 BaseEvent: BaseEvent{
79 Type: "AccountOpened",
80 ID: id,
81 Version: 1,
82 Timestamp: time.Now(),
83 },
84 AccountHolder: holder,
85 InitialBalance: initialBalance,
86 }
87
88 account.Apply(event)
89 account.RecordEvent(event)
90
91 return account, nil
92}
93
94// Deposit money
95func (a *Account) Deposit(amount float64, description string) error {
96 if a.Status != "active" {
97 return errors.New("account is not active")
98 }
99 if amount <= 0 {
100 return errors.New("amount must be positive")
101 }
102
103 event := MoneyDepositedEvent{
104 BaseEvent: BaseEvent{
105 Type: "MoneyDeposited",
106 ID: a.ID,
107 Version: a.Version + 1,
108 Timestamp: time.Now(),
109 },
110 Amount: amount,
111 Description: description,
112 }
113
114 a.Apply(event)
115 a.RecordEvent(event)
116
117 return nil
118}
119
120// Withdraw money
121func (a *Account) Withdraw(amount float64, description string) error {
122 if a.Status != "active" {
123 return errors.New("account is not active")
124 }
125 if amount <= 0 {
126 return errors.New("amount must be positive")
127 }
128 if a.Balance < amount {
129 return errors.New("insufficient funds")
130 }
131
132 event := MoneyWithdrawnEvent{
133 BaseEvent: BaseEvent{
134 Type: "MoneyWithdrawn",
135 ID: a.ID,
136 Version: a.Version + 1,
137 Timestamp: time.Now(),
138 },
139 Amount: amount,
140 Description: description,
141 }
142
143 a.Apply(event)
144 a.RecordEvent(event)
145
146 return nil
147}
148
149// Close account
150func (a *Account) Close(reason string) error {
151 if a.Status != "active" {
152 return errors.New("account is not active")
153 }
154 if a.Balance > 0 {
155 return errors.New("cannot close account with positive balance")
156 }
157
158 event := AccountClosedEvent{
159 BaseEvent: BaseEvent{
160 Type: "AccountClosed",
161 ID: a.ID,
162 Version: a.Version + 1,
163 Timestamp: time.Now(),
164 },
165 Reason: reason,
166 }
167
168 a.Apply(event)
169 a.RecordEvent(event)
170
171 return nil
172}
173
174// Apply event to update state
175func (a *Account) Apply(event Event) {
176 switch e := event.(type) {
177 case AccountOpenedEvent:
178 a.ID, a.AccountHolder = e.ID, e.AccountHolder // restore the aggregate ID when rebuilding
179 a.Balance = e.InitialBalance
180 a.Status = "active"
181 a.Version = e.Version
182
183 case MoneyDepositedEvent:
184 a.Balance += e.Amount
185 a.Version = e.Version
186
187 case MoneyWithdrawnEvent:
188 a.Balance -= e.Amount
189 a.Version = e.Version
190
191 case AccountClosedEvent:
192 a.Status = "closed"
193 a.Version = e.Version
194 }
195}
196
197func (a *Account) RecordEvent(event Event) {
198 a.events = append(a.events, event)
199}
200
201func (a *Account) GetUncommittedEvents() []Event {
202 return a.events
203}
204
205func (a *Account) ClearUncommittedEvents() {
206 a.events = nil
207}
208
209// Rebuild account from events
210func RebuildAccount(events []Event) *Account {
211 account := &Account{}
212 for _, event := range events {
213 account.Apply(event)
214 }
215 return account
216}
217
218// In-memory event store
219type InMemoryEventStore struct {
220 events map[string][]Event
221 mu sync.RWMutex
222}
223
224func NewInMemoryEventStore() *InMemoryEventStore {
225 return &InMemoryEventStore{
226 events: make(map[string][]Event),
227 }
228}
229
230func (s *InMemoryEventStore) SaveEvents(aggregateID string, events []Event, expectedVersion int) error {
231 s.mu.Lock()
232 defer s.mu.Unlock()
233
234 currentEvents := s.events[aggregateID]
235 if len(currentEvents) != expectedVersion {
236 return errors.New("concurrency conflict")
237 }
238
239 s.events[aggregateID] = append(currentEvents, events...)
240 return nil
241}
242
243func (s *InMemoryEventStore) GetEvents(aggregateID string) []Event {
244 s.mu.RLock()
245 defer s.mu.RUnlock()
246 return s.events[aggregateID]
247}
248
249func main() {
250 fmt.Println("Event Sourcing: Banking Account")
251 fmt.Println("================================\n")
252
253 store := NewInMemoryEventStore()
254
255 // Open account
256 account, err := OpenAccount("acc-123", "John Doe", 1000.0)
257 if err != nil {
258 fmt.Printf("Error: %v\n", err)
259 return
260 }
261 fmt.Printf("✓ Account opened: %s (holder=%s, balance=$%.2f)\n",
262 account.ID, account.AccountHolder, account.Balance)
263
264 // Save events
265 store.SaveEvents(account.ID, account.GetUncommittedEvents(), 0)
266 account.ClearUncommittedEvents()
267
268 // Deposit
269 account.Deposit(500.0, "Salary")
270 fmt.Printf("✓ Deposited $500.00 (new balance=$%.2f)\n", account.Balance)
271
272 // Save events
273 store.SaveEvents(account.ID, account.GetUncommittedEvents(), account.Version-1)
274 account.ClearUncommittedEvents()
275
276 // Withdraw
277 account.Withdraw(200.0, "ATM withdrawal")
278 fmt.Printf("✓ Withdrew $200.00 (new balance=$%.2f)\n", account.Balance)
279
280 // Save events
281 store.SaveEvents(account.ID, account.GetUncommittedEvents(), account.Version-1)
282 account.ClearUncommittedEvents()
283
284 // Get all events for the account
285 events := store.GetEvents("acc-123")
286 fmt.Printf("\nEvent History (%d events):\n", len(events))
287 for i, evt := range events {
288 fmt.Printf(" %d. %s (v%d) at %s\n",
289 i+1, evt.EventType(), evt.EventVersion(),
290 evt.OccurredAt().Format("15:04:05"))
291 }
292
293 // Rebuild account from events
294 fmt.Println("\nRebuilding account from events...")
295 rebuilt := RebuildAccount(events)
296 fmt.Printf("✓ Rebuilt: %s, Balance: $%.2f, Status: %s, Version: %d\n",
297 rebuilt.AccountHolder, rebuilt.Balance, rebuilt.Status, rebuilt.Version)
298
299 // Test overdraft protection
300 fmt.Println("\nTesting overdraft protection...")
301 err = rebuilt.Withdraw(5000.0, "Large withdrawal")
302 if err != nil {
303 fmt.Printf("✗ Withdrawal blocked: %v\n", err)
304 }
305}
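The solution above replays every event on each rebuild; the snapshot strategy listed in the requirements can be layered on top. A minimal sketch, assuming the Account and Event types from the solution (AccountSnapshot, TakeSnapshot, and RebuildFromSnapshot are hypothetical names, not part of the solution above):

// AccountSnapshot captures aggregate state at a known version so replay can
// start from the snapshot instead of the first event.
type AccountSnapshot struct {
	ID            string
	AccountHolder string
	Balance       float64
	Status        string
	Version       int
}

func TakeSnapshot(a *Account) AccountSnapshot {
	return AccountSnapshot{
		ID:            a.ID,
		AccountHolder: a.AccountHolder,
		Balance:       a.Balance,
		Status:        a.Status,
		Version:       a.Version,
	}
}

// RebuildFromSnapshot applies only the events recorded after the snapshot.
func RebuildFromSnapshot(snap AccountSnapshot, events []Event) *Account {
	account := &Account{
		ID:            snap.ID,
		AccountHolder: snap.AccountHolder,
		Balance:       snap.Balance,
		Status:        snap.Status,
		Version:       snap.Version,
	}
	for _, event := range events {
		if event.EventVersion() <= snap.Version {
			continue // already reflected in the snapshot
		}
		account.Apply(event)
	}
	return account
}

A typical policy is to take a snapshot every N events (for example every 100) and store it alongside the event stream; correctness never depends on the snapshot, because the full history can always be replayed.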
Exercise 2: CQRS Order Management System
🎯 Learning Objectives:
- Design separate command and query models for different performance requirements
- Implement event-driven projections for real-time read model updates
- Create optimized read models for different query patterns
- Understand eventual consistency and trade-offs in CQRS systems
- Build scalable architectures with read/write separation
⏱️ Time Estimate: 120-150 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: An e-commerce platform experiences high read-to-write ratios during sales events. CQRS enables independent scaling of command and query sides while providing optimized read models for different customer journeys and business reporting needs.
Task: Build a comprehensive order management system using CQRS pattern that demonstrates read/write model separation, event-driven projections, and optimized query handling for different use cases.
Command Side Components:
- Command Handlers: CreateOrder, UpdateOrder, CancelOrder, AddOrderItem, RemoveOrderItem
- Aggregate Root: Order entity with business logic and state transitions
- Event Publishing: Domain events for projection updates
- Validation Framework: Business rules and command validation
- Command Bus: Asynchronous command processing with retries
Query Side Components:
- Read Models: OrderListView, OrderDetailsView, OrderStatistics, CustomerOrderHistory
- Projections: Event handlers that update read models
- Query Handlers: Optimized queries for different read models
- Materialized Views: Pre-computed data for common queries
- Caching Layer: Performance optimization for frequent queries
Advanced Features:
- Event versioning and schema evolution for read models
- Snapshot strategies for large datasets
- Query optimization with indexing strategies
- Real-time updates through websockets
- Multi-tenant data isolation
- Performance monitoring and query optimization
- Integration with search engines for full-text search
- Data consistency validation and repair mechanisms
Complete Solution
1// run
2package main
3
4import (
5 "context"
6 "encoding/json"
7 "fmt"
8 "sync"
9 "time"
10)
11
12// Events
13type Event interface {
14 EventType() string
15 AggregateID() string
16}
17
18type OrderCreatedEvent struct {
19 OrderID string
20 CustomerID string
21 Items []OrderItem
22 Total float64
23 Timestamp time.Time
24}
25
26func (e OrderCreatedEvent) EventType() string { return "OrderCreated" }
27func (e OrderCreatedEvent) AggregateID() string { return e.OrderID }
28
29type OrderUpdatedEvent struct {
30 OrderID string
31 Items []OrderItem
32 Total float64
33 Timestamp time.Time
34}
35
36func (e OrderUpdatedEvent) EventType() string { return "OrderUpdated" }
37func (e OrderUpdatedEvent) AggregateID() string { return e.OrderID }
38
39type OrderCancelledEvent struct {
40 OrderID string
41 Reason string
42 Timestamp time.Time
43}
44
45func (e OrderCancelledEvent) EventType() string { return "OrderCancelled" }
46func (e OrderCancelledEvent) AggregateID() string { return e.OrderID }
47
48// Domain models
49type OrderItem struct {
50 ProductID string `json:"product_id"`
51 Quantity int `json:"quantity"`
52 Price float64 `json:"price"`
53}
54
55type Order struct {
56 ID string
57 CustomerID string
58 Items []OrderItem
59 Total float64
60 Status string
61 CreatedAt time.Time
62 events []Event
63}
64
65// Commands
66type CreateOrderCommand struct {
67 OrderID string
68 CustomerID string
69 Items []OrderItem
70}
71
72type UpdateOrderCommand struct {
73 OrderID string
74 Items []OrderItem
75}
76
77type CancelOrderCommand struct {
78 OrderID string
79 Reason string
80}
81
82// Command Handler
83type OrderCommandHandler struct {
84 eventStore EventStore
85 eventBus EventBus
86}
87
88func NewOrderCommandHandler(store EventStore, bus EventBus) *OrderCommandHandler {
89 return &OrderCommandHandler{
90 eventStore: store,
91 eventBus: bus,
92 }
93}
94
95func (h *OrderCommandHandler) CreateOrder(cmd CreateOrderCommand) error {
96 // Calculate total
97 var total float64
98 for _, item := range cmd.Items {
99 total += item.Price * float64(item.Quantity)
100 }
101
102 // Create event
103 event := OrderCreatedEvent{
104 OrderID: cmd.OrderID,
105 CustomerID: cmd.CustomerID,
106 Items: cmd.Items,
107 Total: total,
108 Timestamp: time.Now(),
109 }
110
111 // Save to event store
112 if err := h.eventStore.Save(event); err != nil {
113 return err
114 }
115
116 // Publish to event bus
117 h.eventBus.Publish(event)
118
119 return nil
120}
121
122func (h *OrderCommandHandler) UpdateOrder(cmd UpdateOrderCommand) error {
123 var total float64
124 for _, item := range cmd.Items {
125 total += item.Price * float64(item.Quantity)
126 }
127
128 event := OrderUpdatedEvent{
129 OrderID: cmd.OrderID,
130 Items: cmd.Items,
131 Total: total,
132 Timestamp: time.Now(),
133 }
134
135 if err := h.eventStore.Save(event); err != nil {
136 return err
137 }
138
139 h.eventBus.Publish(event)
140 return nil
141}
142
143func (h *OrderCommandHandler) CancelOrder(cmd CancelOrderCommand) error {
144 event := OrderCancelledEvent{
145 OrderID: cmd.OrderID,
146 Reason: cmd.Reason,
147 Timestamp: time.Now(),
148 }
149
150 if err := h.eventStore.Save(event); err != nil {
151 return err
152 }
153
154 h.eventBus.Publish(event)
155 return nil
156}
157
158// Read Models
159type OrderListView struct {
160 OrderID string `json:"order_id"`
161 CustomerID string `json:"customer_id"`
162 Total float64 `json:"total"`
163 Status string `json:"status"`
164 CreatedAt time.Time `json:"created_at"`
165}
166
167type OrderDetailsView struct {
168 OrderID string `json:"order_id"`
169 CustomerID string `json:"customer_id"`
170 Items []OrderItem `json:"items"`
171 Total float64 `json:"total"`
172 Status string `json:"status"`
173 CreatedAt time.Time `json:"created_at"`
174 UpdatedAt time.Time `json:"updated_at"`
175}
176
177type OrderStatistics struct {
178 TotalOrders int `json:"total_orders"`
179 ActiveOrders int `json:"active_orders"`
180 CancelledOrders int `json:"cancelled_orders"`
181 TotalRevenue float64 `json:"total_revenue"`
182}
183
184// Query Handler
185type OrderQueryHandler struct {
186 listStore map[string]*OrderListView
187 detailStore map[string]*OrderDetailsView
188 stats *OrderStatistics
189 mu sync.RWMutex
190}
191
192func NewOrderQueryHandler() *OrderQueryHandler {
193 return &OrderQueryHandler{
194 listStore: make(map[string]*OrderListView),
195 detailStore: make(map[string]*OrderDetailsView),
196 stats: &OrderStatistics{},
197 }
198}
199
200func (q *OrderQueryHandler) GetOrderList() []*OrderListView {
201 q.mu.RLock()
202 defer q.mu.RUnlock()
203
204 list := make([]*OrderListView, 0, len(q.listStore))
205 for _, view := range q.listStore {
206 list = append(list, view)
207 }
208 return list
209}
210
211func (q *OrderQueryHandler) GetOrderDetails(orderID string) *OrderDetailsView {
212 q.mu.RLock()
213 defer q.mu.RUnlock()
214 return q.detailStore[orderID]
215}
216
217func (q *OrderQueryHandler) GetStatistics() *OrderStatistics {
218 q.mu.RLock()
219 defer q.mu.RUnlock()
220 return q.stats
221}
222
223// Projection
224type OrderProjection struct {
225 queryHandler *OrderQueryHandler
226}
227
228func NewOrderProjection(handler *OrderQueryHandler) *OrderProjection {
229 return &OrderProjection{queryHandler: handler}
230}
231
232func (p *OrderProjection) HandleEvent(event Event) {
233 p.queryHandler.mu.Lock()
234 defer p.queryHandler.mu.Unlock()
235
236 switch e := event.(type) {
237 case OrderCreatedEvent:
238 // Update list view
239 p.queryHandler.listStore[e.OrderID] = &OrderListView{
240 OrderID: e.OrderID,
241 CustomerID: e.CustomerID,
242 Total: e.Total,
243 Status: "active",
244 CreatedAt: e.Timestamp,
245 }
246
247 // Update details view
248 p.queryHandler.detailStore[e.OrderID] = &OrderDetailsView{
249 OrderID: e.OrderID,
250 CustomerID: e.CustomerID,
251 Items: e.Items,
252 Total: e.Total,
253 Status: "active",
254 CreatedAt: e.Timestamp,
255 UpdatedAt: e.Timestamp,
256 }
257
258 // Update statistics
259 p.queryHandler.stats.TotalOrders++
260 p.queryHandler.stats.ActiveOrders++
261 p.queryHandler.stats.TotalRevenue += e.Total
262
263 case OrderUpdatedEvent:
264 if details, ok := p.queryHandler.detailStore[e.OrderID]; ok {
265 oldTotal := details.Total
266 details.Items = e.Items
267 details.Total = e.Total
268 details.UpdatedAt = e.Timestamp
269
270 // Update list view
271 if list, ok := p.queryHandler.listStore[e.OrderID]; ok {
272 list.Total = e.Total
273 }
274
275 // Update statistics
276 p.queryHandler.stats.TotalRevenue += (e.Total - oldTotal)
277 }
278
279 case OrderCancelledEvent:
280 if details, ok := p.queryHandler.detailStore[e.OrderID]; ok {
281 details.Status = "cancelled"
282 details.UpdatedAt = e.Timestamp
283
284 // Update list view
285 if list, ok := p.queryHandler.listStore[e.OrderID]; ok {
286 list.Status = "cancelled"
287 }
288
289 // Update statistics
290 p.queryHandler.stats.ActiveOrders--
291 p.queryHandler.stats.CancelledOrders++
292 p.queryHandler.stats.TotalRevenue -= details.Total
293 }
294 }
295}
296
297// Infrastructure
298type EventStore interface {
299 Save(event Event) error
300 GetAll() []Event
301}
302
303type InMemoryEventStore struct {
304 events []Event
305 mu sync.Mutex
306}
307
308func NewInMemoryEventStore() *InMemoryEventStore {
309 return &InMemoryEventStore{events: make([]Event, 0)}
310}
311
312func (s *InMemoryEventStore) Save(event Event) error {
313 s.mu.Lock()
314 defer s.mu.Unlock()
315 s.events = append(s.events, event)
316 return nil
317}
318
319func (s *InMemoryEventStore) GetAll() []Event {
320 s.mu.Lock()
321 defer s.mu.Unlock()
322 return s.events
323}
324
325type EventBus interface {
326 Publish(event Event)
327 Subscribe(handler func(Event))
328}
329
330type InMemoryEventBus struct {
331 handlers []func(Event)
332 mu sync.RWMutex
333}
334
335func NewInMemoryEventBus() *InMemoryEventBus {
336 return &InMemoryEventBus{handlers: make([]func(Event), 0)}
337}
338
339func (b *InMemoryEventBus) Publish(event Event) {
340 b.mu.RLock()
341 defer b.mu.RUnlock()
342
343 for _, handler := range b.handlers {
344 go handler(event)
345 }
346}
347
348func (b *InMemoryEventBus) Subscribe(handler func(Event)) {
349 b.mu.Lock()
350 defer b.mu.Unlock()
351 b.handlers = append(b.handlers, handler)
352}
353
354func main() {
355 fmt.Println("CQRS Pattern: Order Management")
356 fmt.Println("===============================\n")
357
358 // Setup
359 eventStore := NewInMemoryEventStore()
360 eventBus := NewInMemoryEventBus()
361 commandHandler := NewOrderCommandHandler(eventStore, eventBus)
362 queryHandler := NewOrderQueryHandler()
363 projection := NewOrderProjection(queryHandler)
364
365 // Subscribe projection to events
366 eventBus.Subscribe(projection.HandleEvent)
367
368 // Command: Create orders
369 fmt.Println("Creating orders...")
370 commandHandler.CreateOrder(CreateOrderCommand{
371 OrderID: "order-1",
372 CustomerID: "customer-1",
373 Items: []OrderItem{
374 {ProductID: "prod-1", Quantity: 2, Price: 29.99},
375 {ProductID: "prod-2", Quantity: 1, Price: 49.99},
376 },
377 })
378
379 commandHandler.CreateOrder(CreateOrderCommand{
380 OrderID: "order-2",
381 CustomerID: "customer-2",
382 Items: []OrderItem{
383 {ProductID: "prod-3", Quantity: 1, Price: 99.99},
384 },
385 })
386
387 time.Sleep(100 * time.Millisecond) // Wait for async processing
388
389 // Query: List orders
390 fmt.Println("\nOrder List:")
391 for _, order := range queryHandler.GetOrderList() {
392 fmt.Printf(" %s: Customer=%s, Total=$%.2f, Status=%s\n",
393 order.OrderID, order.CustomerID, order.Total, order.Status)
394 }
395
396 // Query: Order details
397 fmt.Println("\nOrder Details:")
398 details := queryHandler.GetOrderDetails("order-1")
399 if details != nil {
400 data, _ := json.MarshalIndent(details, "", " ")
401 fmt.Println(string(data))
402 }
403
404 // Command: Update order
405 fmt.Println("\nUpdating order-1...")
406 commandHandler.UpdateOrder(UpdateOrderCommand{
407 OrderID: "order-1",
408 Items: []OrderItem{
409 {ProductID: "prod-1", Quantity: 3, Price: 29.99},
410 },
411 })
412
413 time.Sleep(100 * time.Millisecond)
414
415 // Command: Cancel order
416 fmt.Println("\nCancelling order-2...")
417 commandHandler.CancelOrder(CancelOrderCommand{
418 OrderID: "order-2",
419 Reason: "Customer request",
420 })
421
422 time.Sleep(100 * time.Millisecond)
423
424 // Query: Statistics
425 fmt.Println("\nOrder Statistics:")
426 stats := queryHandler.GetStatistics()
427 data, _ := json.MarshalIndent(stats, "", " ")
428 fmt.Println(string(data))
429
430 // Show event stream
431 fmt.Println("\nEvent Stream:")
432 for i, event := range eventStore.GetAll() {
433 fmt.Printf(" %d. %s (aggregate: %s)\n", i+1, event.EventType(), event.AggregateID())
434 }
435}
Exercise 3: Distributed Saga for Order Fulfillment
🎯 Learning Objectives:
- Implement distributed transaction management using saga pattern
- Design compensation actions for complex business workflows
- Build saga orchestrators with state persistence and recovery
- Handle timeouts, retries, and failure scenarios gracefully
- Coordinate multiple services in distributed systems
⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: An e-commerce platform needs to coordinate order fulfillment across inventory, payment, shipping, and notification services. Distributed sagas ensure data consistency across services without using distributed transactions, which don't scale well in microservices architectures.
Task: Create a comprehensive saga orchestration system for order fulfillment that coordinates multiple services while ensuring data consistency and providing reliable failure recovery mechanisms.
Core Saga Components:
- Saga Orchestrator: Central coordinator managing workflow state and execution
- Saga Steps: Inventory reservation, payment processing, order shipping, notification delivery
- Compensation Actions: Inventory release, payment refund, shipment cancellation
- State Persistence: Reliable saga state storage with crash recovery
- Timeout Management: Automatic timeout handling for long-running operations
Orchestration Workflow:
- Order Validation: Validate order data and check customer eligibility
- Inventory Reservation: Reserve products and lock stock levels
- Payment Processing: Charge customer payment method
- Order Creation: Create official order record with tracking
- Shipping Arrangement: Schedule shipment and generate tracking numbers
- Notification Dispatch: Send order confirmation and tracking updates
Failure Scenarios and Recovery:
- Payment gateway timeouts and retry logic
- Inventory out-of-stock after reservation
- Shipping service unavailability
- Notification system failures
- Network partitions and service unavailability
- Database connection issues and consistency problems
Advanced Patterns:
- Event-driven saga choreography vs orchestration
- Saga monitoring and observability dashboards
- Manual intervention for failed sagas
- Performance optimization with parallel execution
- Integration with external monitoring systems
- Saga analytics and business intelligence
Complete Solution
See the saga implementation in the main article content above (already included in the Production Patterns section).
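One gap between that implementation and the requirements listed here is timeout management. A minimal sketch of a per-step deadline, assuming the SagaStep and SagaContext types from the saga example (the timeout parameter is an addition, not part of the original orchestrator):

// executeWithTimeout runs a single step under a deadline; a zero timeout
// means no limit. The orchestrator shown earlier would call this instead of
// invoking step.Execute directly.
func executeWithTimeout(ctx context.Context, step SagaStep, sagaCtx *SagaContext, timeout time.Duration) error {
	if timeout <= 0 {
		return step.Execute(ctx, sagaCtx)
	}

	stepCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	done := make(chan error, 1)
	go func() { done <- step.Execute(stepCtx, sagaCtx) }()

	select {
	case err := <-done:
		return err
	case <-stepCtx.Done():
		return fmt.Errorf("step %s timed out after %v: %w", step.Name, timeout, stepCtx.Err())
	}
}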
Exercise 4: Event Store with Replay Capabilities
🎯 Learning Objectives:
- Build production-ready event store with persistence
- Implement event replay for debugging and migration
- Create event versioning strategies
- Handle concurrent writes with optimistic locking
- Design efficient event streaming
⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: Financial systems require complete audit trails with ability to replay events for regulatory compliance and bug investigation.
Task: Implement a robust event store with PostgreSQL backend that supports event replay, versioning, and concurrent access.
Requirements:
- Append-only event log with versioning
- Optimistic concurrency control
- Event replay by aggregate or time range
- Snapshot support for performance
- Event stream pagination
Complete Solution
See the PostgreSQL Event Store implementation in the main article content above.
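The heart of the optimistic-concurrency requirement is an append that fails when another writer got there first. A minimal sketch against PostgreSQL (the events table layout, package name, and error wording are assumptions, not the article's exact schema):

package eventstore

import (
	"context"
	"database/sql"
	"fmt"
)

// AppendEvent inserts one event at expectedVersion+1. The insert only happens
// if the aggregate is still at expectedVersion; a UNIQUE(aggregate_id, version)
// constraint catches writers that race past this check.
func AppendEvent(ctx context.Context, db *sql.DB, aggregateID, eventType string, data []byte, expectedVersion int) error {
	res, err := db.ExecContext(ctx, `
        INSERT INTO events (aggregate_id, version, event_type, data, created_at)
        SELECT $1, $2, $3, $4, NOW()
        WHERE (SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1) = $5
    `, aggregateID, expectedVersion+1, eventType, data, expectedVersion)
	if err != nil {
		return err
	}
	if n, _ := res.RowsAffected(); n == 0 {
		return fmt.Errorf("concurrency conflict: aggregate %s is not at version %d", aggregateID, expectedVersion)
	}
	return nil
}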
Exercise 5: Multi-View CQRS with Real-Time Updates
🎯 Learning Objectives:
- Create multiple optimized read models
- Implement real-time projection updates
- Design efficient query patterns
- Handle eventual consistency
- Build scalable read architectures
⏱️ Time Estimate: 120-150 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: E-commerce platforms need different views for customers, administrators, and analytics - each optimized for specific queries.
Task: Build a CQRS system with multiple read models (list view, detail view, analytics view) that update in real-time as events occur.
Requirements:
- Multiple specialized read models
- Event-driven projection updates
- Query optimization for each view
- Real-time synchronization
- Performance monitoring
Complete Solution
See the Multiple Read Models implementation in the main article content above.
Summary
Key Takeaways
- Event Sourcing
  - Store all state changes as events
  - Enable time travel and audit trails
  - Rebuild state by replaying events
  - Use snapshots for performance
- CQRS
  - Separate write and read models
  - Optimize each for its purpose
  - Use projections to build read models
  - Handle eventual consistency
- Event Store
  - Append-only event log
  - Optimistic concurrency control
  - Support snapshots
  - Efficient event retrieval
- Event Bus
  - Publish events to subscribers
  - Support multiple handlers
  - Handle failures gracefully
  - Consider ordering guarantees
- Production Readiness
  - Implement idempotency
  - Support event replay
  - Monitor projections
  - Handle schema evolution
Next Steps:
- Explore advanced testing techniques for event-driven systems
- Learn about chaos engineering for distributed systems
- Study production monitoring and observability
- Investigate cloud-native event streaming platforms
You've mastered event-driven architecture in Go - you're now ready to build scalable, resilient distributed systems!