Event-Driven Systems

Think of a busy newsroom where journalists write stories, editors review them, and publishers distribute them - all happening continuously and independently. No journalist needs to know who will read an article or how it will be distributed. They write the story, and the system ensures it reaches the right people. Event-driven architecture works the same way: services produce events without knowing who will consume them.

Event-driven architecture enables loosely coupled systems where services communicate through events. This comprehensive guide covers event sourcing, CQRS, event stores, projections, sagas, and building production-ready event-driven systems in Go.

💡 Key Takeaway: Event-driven architecture transforms your system from a chain of direct dependencies into a responsive ecosystem where services react to what happens, rather than being told what to do.

Event-Driven Architecture Fundamentals

Think of events like a newspaper's daily publication. Each article is an immutable fact about something that happened. Readers can subscribe to the sections they care about, and new articles can be added without changing how existing readers work.

Event-driven systems react to events asynchronously, enabling scalable and decoupled architectures.

Why Event-Driven Architecture:

  • Decoupling: Services don't need to know about each other, only about events
  • Scalability: Async processing allows handling bursts of traffic
  • Auditability: Events provide a complete history of what happened
  • Flexibility: Easy to add new consumers without modifying producers
  • Resilience: A failure in one consumer is less likely to cascade, because producers do not wait on it
  • Time Travel: Reconstruct past states for debugging and analysis

Key Patterns:

  • Event Sourcing: Store all changes as a sequence of events
  • CQRS: Separate write models from read models
  • Saga: Coordinate distributed transactions across services using events
  • Event Streaming: Continuous flow of events for real-time processing
  • Event Notification: Notify interested parties when events occur

Trade-offs:

  • ✅ Extreme scalability and flexibility
  • ✅ Natural audit logging and compliance
  • ✅ Temporal queries and point-in-time recovery
  • ❌ Eventual consistency challenges
  • ❌ Increased system complexity
  • ❌ Event versioning and schema evolution
  • ❌ Debugging distributed flows is harder

⚠️ Important: Event-driven systems require a mindset shift from imperative to reactive. Be prepared for more complex debugging and monitoring challenges.

Real-world Example: Uber uses event-driven architecture to coordinate millions of rides. When a driver accepts a ride, that event triggers updates to passenger apps, payment processing, and driver dispatch systems - all without direct coupling between these services.
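
The decoupling described above can be sketched with a small in-memory event bus. This is an illustrative sketch only: the `Bus`, `Handler`, and `Event` names and the `RideAccepted` event type are assumptions made for the example, and delivery is synchronous to keep the output deterministic, whereas a production system would usually publish through a broker and deliver asynchronously.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical minimal event: a type name plus a payload.
type Event struct {
	Type    string
	Payload string
}

// Handler reacts to a single event.
type Handler func(Event)

// Bus is an illustrative in-memory event bus.
type Bus struct {
	mu       sync.RWMutex
	handlers map[string][]Handler
}

func NewBus() *Bus {
	return &Bus{handlers: make(map[string][]Handler)}
}

// Subscribe registers a handler for one event type.
func (b *Bus) Subscribe(eventType string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[eventType] = append(b.handlers[eventType], h)
}

// Publish delivers the event to every subscriber of its type, in
// subscription order, and returns how many handlers were invoked.
func (b *Bus) Publish(e Event) int {
	b.mu.RLock()
	hs := append([]Handler(nil), b.handlers[e.Type]...) // copy so handlers run without the lock
	b.mu.RUnlock()
	for _, h := range hs {
		h(e)
	}
	return len(hs)
}

func main() {
	bus := NewBus()
	bus.Subscribe("RideAccepted", func(e Event) { fmt.Println("notify passenger:", e.Payload) })
	bus.Subscribe("RideAccepted", func(e Event) { fmt.Println("start billing:", e.Payload) })

	// The producer only knows the event, not the consumers.
	n := bus.Publish(Event{Type: "RideAccepted", Payload: "ride-42"})
	fmt.Println("handlers invoked:", n)
}
```

Adding a third consumer means one more `Subscribe` call; the code that publishes `RideAccepted` does not change.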

Core Concepts

package main

import (
	"fmt"
	"time"
)

// Event represents something that happened in the domain.
type Event struct {
	ID            string            // unique event identifier
	Type          string            // past-tense name, e.g. "OrderCreated"
	AggregateID   string            // ID of the aggregate instance
	AggregateType string            // kind of aggregate, e.g. "Order"
	Data          interface{}       // event payload
	Metadata      map[string]string // optional context such as correlation IDs
	Timestamp     time.Time         // when the event occurred
	Version       int               // position in the aggregate's stream
}

func main() {
	fmt.Println("Event-Driven Architecture Concepts")
	fmt.Println("==================================")
	fmt.Println()

	// A slice keeps the output order stable; ranging over a map would not.
	concepts := []struct{ name, description string }{
		{"Event", "Immutable fact about something that happened"},
		{"Command", "Intent to change state"},
		{"Aggregate", "Cluster of domain objects treated as a unit"},
		{"Event Store", "Append-only log of events"},
		{"Projection", "Materialized view built from events"},
		{"Event Handler", "Processes events and updates state"},
		{"Event Bus", "Routes events to subscribers"},
		{"Snapshot", "Point-in-time state for optimization"},
	}

	for _, c := range concepts {
		fmt.Printf("%-15s: %s\n", c.name, c.description)
	}

	fmt.Println("\nEvent vs Command:")
	fmt.Println("=================")
	fmt.Println("Command: CreateOrder       (imperative)")
	fmt.Println("Event:   OrderCreated      (past tense)")
	fmt.Println()
	fmt.Println("Command: UpdateInventory   (imperative)")
	fmt.Println("Event:   InventoryUpdated  (past tense)")

	fmt.Println("\nBenefits:")
	fmt.Println("- Audit trail: Complete history of changes")
	fmt.Println("- Temporal queries: State at any point in time")
	fmt.Println("- Event replay: Rebuild state from events")
	fmt.Println("- Decoupling: Services react to events independently")
	fmt.Println("- Scalability: Async processing, parallel handlers")
	fmt.Println("- Flexibility: Add new features without changing existing code")

	// Example events for a single order, all relative to one base time.
	now := time.Now()
	events := []Event{
		{
			ID:            "evt-1",
			Type:          "OrderCreated",
			AggregateID:   "order-123",
			AggregateType: "Order",
			Data: map[string]interface{}{
				"customer_id": "cust-456",
				"total":       99.99,
			},
			Timestamp: now,
			Version:   1,
		},
		{
			ID:            "evt-2",
			Type:          "OrderPaid",
			AggregateID:   "order-123",
			AggregateType: "Order",
			Data: map[string]interface{}{
				"payment_id": "pay-789",
				"amount":     99.99,
			},
			Timestamp: now.Add(1 * time.Minute),
			Version:   2,
		},
		{
			ID:            "evt-3",
			Type:          "OrderShipped",
			AggregateID:   "order-123",
			AggregateType: "Order",
			Data: map[string]interface{}{
				"tracking_number": "TRK123456",
			},
			Timestamp: now.Add(1 * time.Hour),
			Version:   3,
		},
	}

	fmt.Println("\nEvent Stream Example:")
	for _, evt := range events {
		fmt.Printf("  v%d %s: %s (aggregate: %s)\n",
			evt.Version, evt.Timestamp.Format("15:04:05"),
			evt.Type, evt.AggregateID)
	}

	fmt.Println("\nEvent Characteristics:")
	fmt.Println("- Immutable: Events never change once created")
	fmt.Println("- Ordered: Events have a sequence/version number")
	fmt.Println("- Past tense: Events describe what happened")
	fmt.Println("- Self-contained: Events carry all necessary data")
	fmt.Println("- Timestamped: Events know when they occurred")
}

Event Sourcing

Imagine keeping a complete diary of your life instead of just remembering your current situation. Your diary contains every significant event that happened to you, in chronological order. If someone asks about your current job, you don't just tell them - you can trace back through your diary to see all the events that led to where you are today. This is exactly how event sourcing works.

Instead of storing just the current state of data, event sourcing stores every state change as an immutable event in an append-only sequence. The current state is derived by replaying these events in order.

Common Pitfalls:

  • Event stores can grow very large over time
  • Rebuilding state from many events can be slow
  • Schema evolution requires careful planning
  • Debugging requires understanding event flows, not just current state

💡 Key Takeaway: Event sourcing provides a complete audit trail and enables powerful time-travel queries, but requires careful planning for performance and storage.
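
A time-travel query is just a replay that stops early. The sketch below assumes a hypothetical `BalanceChanged` event with amounts in integer cents and a stream already sorted by time; `BalanceAt`, `sampleEvents`, `start`, and `day` are names invented for the example.

```go
package main

import (
	"fmt"
	"time"
)

// BalanceChanged is a hypothetical event recording a signed change in cents.
type BalanceChanged struct {
	DeltaCents int64
	OccurredAt time.Time
}

const day = 24 * time.Hour

// start is an arbitrary fixed instant so the example is deterministic.
var start = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// sampleEvents returns a small stream sorted by OccurredAt.
func sampleEvents() []BalanceChanged {
	return []BalanceChanged{
		{DeltaCents: 10000, OccurredAt: start},
		{DeltaCents: -2500, OccurredAt: start.Add(day)},
		{DeltaCents: 4000, OccurredAt: start.Add(2 * day)},
	}
}

// BalanceAt replays events in order and returns the balance as of the given
// instant. Events that occurred after that instant are ignored.
func BalanceAt(events []BalanceChanged, at time.Time) int64 {
	var balance int64
	for _, e := range events {
		if e.OccurredAt.After(at) {
			break // the stream is sorted, so nothing later can apply
		}
		balance += e.DeltaCents
	}
	return balance
}

func main() {
	events := sampleEvents()
	fmt.Println(BalanceAt(events, start.Add(day+day/2))) // 7500: first two events only
	fmt.Println(BalanceAt(events, start.Add(3*day)))     // 11500: all three events
}
```

The same event list answers both "what is the balance now" and "what was it a day and a half in", which a table holding only the current balance could not do.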

When to Use Event Sourcing:

  • Financial systems requiring complete audit trails
  • Systems needing temporal queries (state at any point in time)
  • Complex domains with multiple state transitions
  • Systems requiring event replay for bug fixes or migrations
  • Regulatory compliance requiring immutable audit logs

When NOT to Use Event Sourcing:

  • Simple CRUD applications
  • Systems with very high sustained write throughput (for example, well above 100k events/second), where append and replay costs dominate
  • When eventual consistency is unacceptable
  • Teams unfamiliar with event-driven patterns

Event Sourcing Implementation

package main

import (
	"errors"
	"fmt"
	"math"
	"time"
)

// Event is the interface every domain event implements.
type Event interface {
	EventType() string
	AggregateID() string
	EventVersion() int
	OccurredAt() time.Time
}

// BaseEvent carries the fields shared by all events.
// ID holds the aggregate ID, not a per-event identifier.
type BaseEvent struct {
	Type      string
	ID        string
	Version   int
	Timestamp time.Time
}

func (e BaseEvent) EventType() string     { return e.Type }
func (e BaseEvent) AggregateID() string   { return e.ID }
func (e BaseEvent) EventVersion() int     { return e.Version }
func (e BaseEvent) OccurredAt() time.Time { return e.Timestamp }

// Order is the aggregate. Its exported fields are derived from events;
// the events slice holds changes that have not been persisted yet.
type Order struct {
	ID         string
	CustomerID string
	Items      []OrderItem
	Total      float64
	Status     string
	Version    int
	events     []Event
}

type OrderItem struct {
	ProductID string
	Quantity  int
	Price     float64
}

// Order events
type OrderCreatedEvent struct {
	BaseEvent
	CustomerID string
	Items      []OrderItem
	Total      float64
}

type OrderPaidEvent struct {
	BaseEvent
	PaymentID string
	Amount    float64
}

type OrderShippedEvent struct {
	BaseEvent
	TrackingNumber string
}

type OrderCancelledEvent struct {
	BaseEvent
	Reason string
}

// NewOrder validates the input and raises OrderCreatedEvent.
func NewOrder(id, customerID string, items []OrderItem) (*Order, error) {
	if customerID == "" {
		return nil, errors.New("customer ID required")
	}
	if len(items) == 0 {
		return nil, errors.New("at least one item required")
	}

	// Calculate total
	var total float64
	for _, item := range items {
		total += item.Price * float64(item.Quantity)
	}

	// All state, including the ID, is set by applying the event so that
	// a replayed order ends up identical to a freshly created one.
	order := &Order{}

	event := OrderCreatedEvent{
		BaseEvent: BaseEvent{
			Type:      "OrderCreated",
			ID:        id,
			Version:   1,
			Timestamp: time.Now(),
		},
		CustomerID: customerID,
		Items:      items,
		Total:      total,
	}

	order.Apply(event)
	order.RecordEvent(event)

	return order, nil
}

func (o *Order) Pay(paymentID string, amount float64) error {
	if o.Status != "pending" {
		return errors.New("order must be pending to pay")
	}
	// Compare with a half-cent tolerance. float64 keeps the example short;
	// production code should store money as integer minor units.
	if math.Abs(amount-o.Total) > 0.005 {
		return errors.New("payment amount must match order total")
	}

	event := OrderPaidEvent{
		BaseEvent: BaseEvent{
			Type:      "OrderPaid",
			ID:        o.ID,
			Version:   o.Version + 1,
			Timestamp: time.Now(),
		},
		PaymentID: paymentID,
		Amount:    amount,
	}

	o.Apply(event)
	o.RecordEvent(event)

	return nil
}

func (o *Order) Ship(trackingNumber string) error {
	if o.Status != "paid" {
		return errors.New("order must be paid to ship")
	}

	event := OrderShippedEvent{
		BaseEvent: BaseEvent{
			Type:      "OrderShipped",
			ID:        o.ID,
			Version:   o.Version + 1,
			Timestamp: time.Now(),
		},
		TrackingNumber: trackingNumber,
	}

	o.Apply(event)
	o.RecordEvent(event)

	return nil
}

func (o *Order) Cancel(reason string) error {
	if o.Status == "shipped" || o.Status == "delivered" {
		return errors.New("cannot cancel shipped or delivered order")
	}
	if o.Status == "cancelled" {
		return errors.New("order is already cancelled")
	}

	event := OrderCancelledEvent{
		BaseEvent: BaseEvent{
			Type:      "OrderCancelled",
			ID:        o.ID,
			Version:   o.Version + 1,
			Timestamp: time.Now(),
		},
		Reason: reason,
	}

	o.Apply(event)
	o.RecordEvent(event)

	return nil
}

// Apply mutates state from an event. It contains no validation, because
// events describe facts that have already happened.
func (o *Order) Apply(event Event) {
	switch e := event.(type) {
	case OrderCreatedEvent:
		o.ID = e.ID
		o.CustomerID = e.CustomerID
		o.Items = e.Items
		o.Total = e.Total
		o.Status = "pending"
		o.Version = e.Version

	case OrderPaidEvent:
		o.Status = "paid"
		o.Version = e.Version

	case OrderShippedEvent:
		o.Status = "shipped"
		o.Version = e.Version

	case OrderCancelledEvent:
		o.Status = "cancelled"
		o.Version = e.Version
	}
}

func (o *Order) RecordEvent(event Event) {
	o.events = append(o.events, event)
}

func (o *Order) GetUncommittedEvents() []Event {
	return o.events
}

func (o *Order) ClearUncommittedEvents() {
	o.events = nil
}

// RebuildOrder reconstructs an order by replaying its events in order.
func RebuildOrder(events []Event) *Order {
	order := &Order{}
	for _, event := range events {
		order.Apply(event)
	}
	return order
}

// Example usage
func main() {
	// Create order
	items := []OrderItem{
		{ProductID: "prod-1", Quantity: 2, Price: 29.99},
		{ProductID: "prod-2", Quantity: 1, Price: 49.99},
	}

	order, err := NewOrder("order-123", "cust-456", items)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Order created: %s (status=%s, total=$%.2f)\n",
		order.ID, order.Status, order.Total)

	// Pay for order
	err = order.Pay("pay-789", order.Total)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Order paid: %s (status=%s)\n", order.ID, order.Status)

	// Ship order
	err = order.Ship("TRK123456")
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Order shipped: %s (status=%s)\n", order.ID, order.Status)

	// Get uncommitted events
	events := order.GetUncommittedEvents()
	fmt.Printf("\nGenerated %d events:\n", len(events))
	for _, evt := range events {
		fmt.Printf("  - %s (v%d)\n", evt.EventType(), evt.EventVersion())
	}

	// Rebuild order from events
	rebuilt := RebuildOrder(events)
	fmt.Printf("\nRebuilt order: ID=%s, Status=%s, Total=$%.2f, Version=%d\n",
		rebuilt.ID, rebuilt.Status, rebuilt.Total, rebuilt.Version)
}

Advanced Event Sourcing Patterns

Snapshots for Performance Optimization

When an aggregate has thousands of events, replaying all of them becomes slow. Snapshots solve this by storing the aggregate's state at a specific version, allowing you to load the snapshot and then replay only events after that point.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Snapshot represents aggregate state at a specific version.
type Snapshot struct {
	AggregateID   string          `json:"aggregate_id"`
	AggregateType string          `json:"aggregate_type"`
	Version       int             `json:"version"`
	State         json.RawMessage `json:"state"`
	Timestamp     time.Time       `json:"timestamp"`
}

// Account aggregate with snapshot support.
type Account struct {
	ID           string
	Balance      float64
	Transactions int
	Version      int
	LastUpdated  time.Time
}

// accountState is the serialized form of an Account inside a snapshot.
// A typed struct avoids unchecked type assertions when decoding.
type accountState struct {
	ID           string    `json:"id"`
	Balance      float64   `json:"balance"`
	Transactions int       `json:"transactions"`
	LastUpdated  time.Time `json:"last_updated"`
}

type Event interface {
	EventType() string
	AggregateID() string
	EventVersion() int
}

type DepositedEvent struct {
	ID        string
	Amount    float64
	Version   int
	Timestamp time.Time
}

func (e DepositedEvent) EventType() string   { return "Deposited" }
func (e DepositedEvent) AggregateID() string { return e.ID }
func (e DepositedEvent) EventVersion() int   { return e.Version }

type WithdrawnEvent struct {
	ID        string
	Amount    float64
	Version   int
	Timestamp time.Time
}

func (e WithdrawnEvent) EventType() string   { return "Withdrawn" }
func (e WithdrawnEvent) AggregateID() string { return e.ID }
func (e WithdrawnEvent) EventVersion() int   { return e.Version }

// CreateSnapshot serializes the current state at the current version.
func (a *Account) CreateSnapshot() (*Snapshot, error) {
	stateData, err := json.Marshal(accountState{
		ID:           a.ID,
		Balance:      a.Balance,
		Transactions: a.Transactions,
		LastUpdated:  a.LastUpdated,
	})
	if err != nil {
		return nil, fmt.Errorf("encode snapshot state: %w", err)
	}

	return &Snapshot{
		AggregateID:   a.ID,
		AggregateType: "Account",
		Version:       a.Version,
		State:         stateData,
		Timestamp:     time.Now(),
	}, nil
}

// LoadAccountFromSnapshot restores an account, including LastUpdated.
func LoadAccountFromSnapshot(snapshot *Snapshot) (*Account, error) {
	var state accountState
	if err := json.Unmarshal(snapshot.State, &state); err != nil {
		return nil, fmt.Errorf("decode snapshot state: %w", err)
	}

	return &Account{
		ID:           state.ID,
		Balance:      state.Balance,
		Transactions: state.Transactions,
		Version:      snapshot.Version,
		LastUpdated:  state.LastUpdated,
	}, nil
}

// Apply updates the account from an event. LastUpdated comes from the
// event itself so that replaying the stream always yields the same state.
func (a *Account) Apply(event Event) {
	switch e := event.(type) {
	case DepositedEvent:
		a.Balance += e.Amount
		a.Transactions++
		a.Version = e.Version
		a.LastUpdated = e.Timestamp

	case WithdrawnEvent:
		a.Balance -= e.Amount
		a.Transactions++
		a.Version = e.Version
		a.LastUpdated = e.Timestamp
	}
}

// SnapshotStrategy takes a snapshot every N events.
type SnapshotStrategy struct {
	snapshotInterval int
}

func NewSnapshotStrategy(interval int) *SnapshotStrategy {
	return &SnapshotStrategy{snapshotInterval: interval}
}

// ShouldTakeSnapshot reports whether currentVersion falls on the interval.
// A non-positive interval disables snapshots instead of dividing by zero.
func (s *SnapshotStrategy) ShouldTakeSnapshot(currentVersion int) bool {
	if s.snapshotInterval <= 0 {
		return false
	}
	return currentVersion%s.snapshotInterval == 0
}

func main() {
	fmt.Println("Event Sourcing with Snapshots")
	fmt.Println("=============================")
	fmt.Println()

	account := &Account{ID: "acc-123", Balance: 0, Version: 0}
	strategy := NewSnapshotStrategy(100) // Snapshot every 100 events

	// Simulate many transactions: deposits on even versions, withdrawals on odd.
	base := time.Now()
	events := make([]Event, 0, 250)
	for i := 1; i <= 250; i++ {
		occurredAt := base.Add(time.Duration(i) * time.Second)
		var event Event
		if i%2 == 0 {
			event = DepositedEvent{ID: account.ID, Amount: 100.0, Version: i, Timestamp: occurredAt}
		} else {
			event = WithdrawnEvent{ID: account.ID, Amount: 50.0, Version: i, Timestamp: occurredAt}
		}
		events = append(events, event)
		account.Apply(event)

		// Check if we should take a snapshot
		if strategy.ShouldTakeSnapshot(i) {
			snapshot, err := account.CreateSnapshot()
			if err != nil {
				fmt.Printf("Error: %v\n", err)
				return
			}
			fmt.Printf("📸 Snapshot taken at version %d (balance: $%.2f)\n",
				snapshot.Version, account.Balance)
		}
	}

	fmt.Printf("\nFinal state: Balance=$%.2f, Transactions=%d, Version=%d\n",
		account.Balance, account.Transactions, account.Version)

	// Demonstrate snapshot loading
	fmt.Println("\n--- Snapshot Recovery Demonstration ---")

	// Take final snapshot
	finalSnapshot, err := account.CreateSnapshot()
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Load from snapshot (much faster than replaying 250 events)
	recovered, err := LoadAccountFromSnapshot(finalSnapshot)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("Recovered from snapshot: Balance=$%.2f, Version=%d\n",
		recovered.Balance, recovered.Version)

	// Only events after the snapshot need replaying. Versions start at 1,
	// so the event with version v sits at index v-1 and the tail starts at
	// index finalSnapshot.Version.
	eventsAfterSnapshot := events[finalSnapshot.Version:]
	fmt.Printf("Only need to replay %d events instead of %d\n",
		len(eventsAfterSnapshot), len(events))
}
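
The demo above snapshots at the final version, so no events remain to replay. The more common case is a snapshot that lags the stream, where recovery means loading the snapshot and replaying only the tail. The following is a minimal sketch of that path; `Deposited`, `AccountSnapshot`, `Restore`, and `buildStream` are hypothetical names, and amounts are integer cents to keep the arithmetic exact.

```go
package main

import "fmt"

// Deposited is a hypothetical event; Version is its 1-based stream position.
type Deposited struct {
	Version     int
	AmountCents int64
}

// AccountSnapshot captures the balance as of a given version.
type AccountSnapshot struct {
	Version      int
	BalanceCents int64
}

// Restore starts from the snapshot and replays only events newer than it.
// A nil snapshot means "replay from the beginning". It returns the balance,
// the resulting version, and how many events were replayed.
func Restore(snap *AccountSnapshot, stream []Deposited) (balance int64, version, replayed int) {
	if snap != nil {
		balance, version = snap.BalanceCents, snap.Version
	}
	for _, e := range stream {
		if e.Version <= version {
			continue // already reflected in the snapshot
		}
		balance += e.AmountCents
		version = e.Version
		replayed++
	}
	return balance, version, replayed
}

// buildStream returns n deposits of 100 cents each, versions 1..n.
func buildStream(n int) []Deposited {
	stream := make([]Deposited, 0, n)
	for v := 1; v <= n; v++ {
		stream = append(stream, Deposited{Version: v, AmountCents: 100})
	}
	return stream
}

func main() {
	stream := buildStream(250)

	// Snapshot taken at version 200, when the balance was 200 * 100 cents.
	snap := &AccountSnapshot{Version: 200, BalanceCents: 20000}

	balance, version, replayed := Restore(snap, stream)
	fmt.Printf("with snapshot: balance=%d version=%d replayed=%d\n", balance, version, replayed)

	balance, version, replayed = Restore(nil, stream)
	fmt.Printf("full replay:   balance=%d version=%d replayed=%d\n", balance, version, replayed)
}
```

Both paths reach the same state; the snapshot path replays 50 events instead of 250. A real store would also fetch only the events after the snapshot version rather than filtering the whole stream in memory.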

Event Upcasting for Schema Evolution

As your system evolves, event schemas change. Event upcasting allows you to transform old event formats to new formats automatically.

package main

import (
	"encoding/json"
	"fmt"
)

// Version 1 of OrderCreated event
type OrderCreatedV1 struct {
	OrderID    string  `json:"order_id"`
	CustomerID string  `json:"customer_id"`
	Total      float64 `json:"total"`
	Version    int     `json:"version"`
}

// Version 2 adds currency and tax information
type OrderCreatedV2 struct {
	OrderID    string  `json:"order_id"`
	CustomerID string  `json:"customer_id"`
	Subtotal   float64 `json:"subtotal"`
	Tax        float64 `json:"tax"`
	Total      float64 `json:"total"`
	Currency   string  `json:"currency"`
	Version    int     `json:"version"`
}

// EventEnvelope wraps the stored payload with its schema version.
type EventEnvelope struct {
	EventType     string          `json:"event_type"`
	SchemaVersion int             `json:"schema_version"`
	Data          json.RawMessage `json:"data"`
}

// EventUpcaster converts a payload from one schema version to the next.
type EventUpcaster interface {
	Upcast(data json.RawMessage) (json.RawMessage, error)
	FromVersion() int
	ToVersion() int
}

// Upcast OrderCreated from V1 to V2
type OrderCreatedV1ToV2Upcaster struct{}

func (u OrderCreatedV1ToV2Upcaster) FromVersion() int { return 1 }
func (u OrderCreatedV1ToV2Upcaster) ToVersion() int   { return 2 }

func (u OrderCreatedV1ToV2Upcaster) Upcast(data json.RawMessage) (json.RawMessage, error) {
	var v1 OrderCreatedV1
	if err := json.Unmarshal(data, &v1); err != nil {
		return nil, err
	}

	// V1 stored only the tax-inclusive total. Assume a 10% tax rate on the
	// subtotal, so subtotal = total / 1.1 and tax is the remainder.
	subtotal := v1.Total / 1.1

	v2 := OrderCreatedV2{
		OrderID:    v1.OrderID,
		CustomerID: v1.CustomerID,
		Subtotal:   subtotal,
		Tax:        v1.Total - subtotal,
		Total:      v1.Total,
		Currency:   "USD", // Default currency
		Version:    v1.Version,
	}

	return json.Marshal(v2)
}

// UpcasterRegistry holds the upcasters for each event type.
type UpcasterRegistry struct {
	upcasters map[string][]EventUpcaster
}

func NewUpcasterRegistry() *UpcasterRegistry {
	return &UpcasterRegistry{
		upcasters: make(map[string][]EventUpcaster),
	}
}

func (r *UpcasterRegistry) Register(eventType string, upcaster EventUpcaster) {
	r.upcasters[eventType] = append(r.upcasters[eventType], upcaster)
}

// Upcast applies upcasters one version step at a time until the payload
// reaches targetVersion.
func (r *UpcasterRegistry) Upcast(envelope EventEnvelope, targetVersion int) (json.RawMessage, error) {
	currentVersion := envelope.SchemaVersion
	currentData := envelope.Data

	for currentVersion < targetVersion {
		upcasters := r.upcasters[envelope.EventType]

		var applied bool
		for _, upcaster := range upcasters {
			if upcaster.FromVersion() != currentVersion {
				continue
			}
			// Guard against a misconfigured upcaster that would loop forever.
			if upcaster.ToVersion() <= currentVersion {
				return nil, fmt.Errorf("upcaster for %s does not advance past version %d",
					envelope.EventType, currentVersion)
			}
			upcastedData, err := upcaster.Upcast(currentData)
			if err != nil {
				return nil, err
			}
			currentData = upcastedData
			currentVersion = upcaster.ToVersion()
			applied = true
			break
		}

		if !applied {
			return nil, fmt.Errorf("no upcaster found for %s from version %d",
				envelope.EventType, currentVersion)
		}
	}

	return currentData, nil
}

func main() {
	fmt.Println("Event Schema Evolution with Upcasting")
	fmt.Println("=====================================")
	fmt.Println()

	// Create registry and register upcasters
	registry := NewUpcasterRegistry()
	registry.Register("OrderCreated", OrderCreatedV1ToV2Upcaster{})

	// Old event in V1 format
	v1Event := OrderCreatedV1{
		OrderID:    "order-123",
		CustomerID: "cust-456",
		Total:      100.00,
		Version:    1,
	}

	v1Data, err := json.Marshal(v1Event)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	envelope := EventEnvelope{
		EventType:     "OrderCreated",
		SchemaVersion: 1,
		Data:          v1Data,
	}

	fmt.Println("Original V1 Event:")
	fmt.Printf("%s\n", v1Data)

	// Upcast to V2
	v2Data, err := registry.Upcast(envelope, 2)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Println("\nUpcasted to V2:")
	fmt.Printf("%s\n", v2Data)

	// Verify the upcasted event
	var v2Event OrderCreatedV2
	if err := json.Unmarshal(v2Data, &v2Event); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Println("\nParsed V2 Event:")
	fmt.Printf("  Order ID: %s\n", v2Event.OrderID)
	fmt.Printf("  Subtotal: $%.2f\n", v2Event.Subtotal)
	fmt.Printf("  Tax: $%.2f\n", v2Event.Tax)
	fmt.Printf("  Total: $%.2f\n", v2Event.Total)
	fmt.Printf("  Currency: %s\n", v2Event.Currency)
}

CQRS Pattern

Think of a library where you have different sections for different purposes. The reference desk is optimized for quickly finding information, while the processing room is optimized for cataloging new books. Each system is designed for its specific task, even though they're dealing with the same collection.

CQRS (Command Query Responsibility Segregation) separates the part of your system that changes data (the write model) from the part that reads data (the read model). This allows you to optimize and scale each side independently.

When to Use CQRS vs Traditional CRUD:

  • Use CQRS: Complex business logic, different read/write patterns, high scalability needs, multiple read models
  • Use Traditional CRUD: Simple CRUD operations, low traffic, tight coupling between reads and writes is acceptable

⚠️ Important: CQRS adds complexity to your system. Don't use it unless you have specific performance or scalability requirements that justify the additional complexity.
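
Before the database-backed version, the split can be seen in a few lines of in-memory Go. This is a simplified sketch: `OrderPlaced`, `WriteModel`, and `SpendByCustomer` are hypothetical names, amounts are integer cents, and the projection step runs inline, whereas a real system would usually run it asynchronously from an event bus.

```go
package main

import "fmt"

// OrderPlaced is a hypothetical event emitted by the write side.
type OrderPlaced struct {
	OrderID    string
	CustomerID string
	TotalCents int64
}

// WriteModel validates commands and records events; it never serves queries.
type WriteModel struct {
	log []OrderPlaced
}

// PlaceOrder is the command handler. It returns the event it recorded.
func (w *WriteModel) PlaceOrder(orderID, customerID string, totalCents int64) (OrderPlaced, error) {
	if totalCents <= 0 {
		return OrderPlaced{}, fmt.Errorf("order %s: total must be positive", orderID)
	}
	e := OrderPlaced{OrderID: orderID, CustomerID: customerID, TotalCents: totalCents}
	w.log = append(w.log, e)
	return e, nil
}

// SpendByCustomer is a read model shaped for one query: total spend per customer.
type SpendByCustomer map[string]int64

// Project folds one event into the read model.
func (r SpendByCustomer) Project(e OrderPlaced) {
	r[e.CustomerID] += e.TotalCents
}

func main() {
	write := &WriteModel{}
	read := SpendByCustomer{}

	for i, total := range []int64{1500, 2500} {
		e, err := write.PlaceOrder(fmt.Sprintf("order-%d", i+1), "cust-1", total)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		read.Project(e) // usually asynchronous, which is where eventual consistency comes from
	}

	// The query side answers from its own structure without touching the write model.
	fmt.Println("cust-1 total spend (cents):", read["cust-1"])
}
```

The write side owns validation and the event log; the read side owns a structure shaped for exactly one question. A second read model only needs another `Project` implementation over the same events.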

CQRS Implementation

package cqrs

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"
)

// Order, OrderItem, Event, NewOrder, RebuildOrder, and the Order*Event types
// come from the event sourcing example and are assumed to live in this package.

// EventStore and EventBus are the ports the command side depends on. Their
// signatures here are assumptions inferred from how the handlers below use them.
type EventStore interface {
	// SaveEvents appends events if the stream is still at expectedVersion.
	SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error
	GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
}

type EventBus interface {
	Publish(ctx context.Context, event Event) error
}

// Command side

type Command interface {
	CommandType() string
}

type CreateOrderCommand struct {
	OrderID    string
	CustomerID string
	Items      []OrderItem
}

func (c CreateOrderCommand) CommandType() string { return "CreateOrder" }

type PayOrderCommand struct {
	OrderID   string
	PaymentID string
	Amount    float64
}

func (c PayOrderCommand) CommandType() string { return "PayOrder" }

// Command handler
type CommandHandler interface {
	Handle(ctx context.Context, cmd Command) error
}

type OrderCommandHandler struct {
	eventStore EventStore
	eventBus   EventBus
}

func NewOrderCommandHandler(store EventStore, bus EventBus) *OrderCommandHandler {
	return &OrderCommandHandler{
		eventStore: store,
		eventBus:   bus,
	}
}

func (h *OrderCommandHandler) Handle(ctx context.Context, cmd Command) error {
	switch c := cmd.(type) {
	case CreateOrderCommand:
		return h.handleCreateOrder(ctx, c)
	case PayOrderCommand:
		return h.handlePayOrder(ctx, c)
	default:
		// %T is safe even when cmd is nil, unlike calling a method on it.
		return fmt.Errorf("unknown command type %T", cmd)
	}
}

// publish sends each event to the bus and stops at the first failure.
// The events are already persisted at this point, so a production system
// would typically retry or use an outbox rather than lose them.
func (h *OrderCommandHandler) publish(ctx context.Context, events []Event) error {
	for _, event := range events {
		if err := h.eventBus.Publish(ctx, event); err != nil {
			return fmt.Errorf("publish %s: %w", event.EventType(), err)
		}
	}
	return nil
}

func (h *OrderCommandHandler) handleCreateOrder(ctx context.Context, cmd CreateOrderCommand) error {
	// Create aggregate
	order, err := NewOrder(cmd.OrderID, cmd.CustomerID, cmd.Items)
	if err != nil {
		return err
	}

	// Save events; a new stream is expected to be at version 0.
	events := order.GetUncommittedEvents()
	if err := h.eventStore.SaveEvents(ctx, cmd.OrderID, events, 0); err != nil {
		return err
	}

	// Publish events
	if err := h.publish(ctx, events); err != nil {
		return err
	}

	order.ClearUncommittedEvents()
	return nil
}

func (h *OrderCommandHandler) handlePayOrder(ctx context.Context, cmd PayOrderCommand) error {
	// Load aggregate from events
	events, err := h.eventStore.GetEvents(ctx, cmd.OrderID)
	if err != nil {
		return err
	}

	order := RebuildOrder(events)

	// Remember the loaded version for the optimistic concurrency check.
	expectedVersion := order.Version

	// Execute command
	if err := order.Pay(cmd.PaymentID, cmd.Amount); err != nil {
		return err
	}

	// Save new events
	newEvents := order.GetUncommittedEvents()
	if err := h.eventStore.SaveEvents(ctx, cmd.OrderID, newEvents, expectedVersion); err != nil {
		return err
	}

	// Publish events
	if err := h.publish(ctx, newEvents); err != nil {
		return err
	}

	order.ClearUncommittedEvents()
	return nil
}

// Query side

type OrderReadModel struct {
	ID             string
	CustomerID     string
	Items          []OrderItem
	Total          float64
	Status         string
	PaymentID      string
	TrackingNumber string
	CreatedAt      time.Time
	UpdatedAt      time.Time
}

type QueryHandler interface {
	Handle(ctx context.Context, query interface{}) (interface{}, error)
}

type OrderQueryHandler struct {
	db *sql.DB
}

func NewOrderQueryHandler(db *sql.DB) *OrderQueryHandler {
	return &OrderQueryHandler{db: db}
}

type GetOrderQuery struct {
	OrderID string
}

type ListOrdersQuery struct {
	CustomerID string
	Status     string
	Limit      int
	Offset     int
}

func (q *OrderQueryHandler) Handle(ctx context.Context, query interface{}) (interface{}, error) {
	switch qry := query.(type) {
	case GetOrderQuery:
		return q.getOrder(ctx, qry)
	case ListOrdersQuery:
		return q.listOrders(ctx, qry)
	default:
		return nil, fmt.Errorf("unknown query type %T", query)
	}
}

func (q *OrderQueryHandler) getOrder(ctx context.Context, query GetOrderQuery) (*OrderReadModel, error) {
	row := q.db.QueryRowContext(ctx, `
		SELECT id, customer_id, items, total, status, payment_id, tracking_number, created_at, updated_at
		FROM order_read_model
		WHERE id = $1
	`, query.OrderID)

	var order OrderReadModel
	var itemsJSON []byte
	// payment_id and tracking_number stay NULL until the order is paid or
	// shipped, so scan them into nullable values.
	var paymentID, trackingNumber sql.NullString

	err := row.Scan(
		&order.ID,
		&order.CustomerID,
		&itemsJSON,
		&order.Total,
		&order.Status,
		&paymentID,
		&trackingNumber,
		&order.CreatedAt,
		&order.UpdatedAt,
	)
	if err != nil {
		return nil, err
	}

	order.PaymentID = paymentID.String
	order.TrackingNumber = trackingNumber.String

	if err := json.Unmarshal(itemsJSON, &order.Items); err != nil {
		return nil, fmt.Errorf("decode items for order %s: %w", order.ID, err)
	}

	return &order, nil
}

func (q *OrderQueryHandler) listOrders(ctx context.Context, query ListOrdersQuery) ([]*OrderReadModel, error) {
	rows, err := q.db.QueryContext(ctx, `
		SELECT id, customer_id, items, total, status, created_at
		FROM order_read_model
		WHERE customer_id = $1 AND status = $2
		ORDER BY created_at DESC
		LIMIT $3 OFFSET $4
	`, query.CustomerID, query.Status, query.Limit, query.Offset)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var orders []*OrderReadModel
	for rows.Next() {
		var order OrderReadModel
		var itemsJSON []byte

		// Return scan errors instead of silently dropping rows.
		if err := rows.Scan(
			&order.ID,
			&order.CustomerID,
			&itemsJSON,
			&order.Total,
			&order.Status,
			&order.CreatedAt,
		); err != nil {
			return nil, err
		}

		if err := json.Unmarshal(itemsJSON, &order.Items); err != nil {
			return nil, fmt.Errorf("decode items for order %s: %w", order.ID, err)
		}
		orders = append(orders, &order)
	}

	// rows.Next returns false on both end-of-data and error; check which.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return orders, nil
}

// OrderProjection keeps order_read_model in sync with the event stream.
type OrderProjection struct {
	db *sql.DB
}

func NewOrderProjection(db *sql.DB) *OrderProjection {
	return &OrderProjection{db: db}
}

func (p *OrderProjection) HandleEvent(ctx context.Context, event Event) error {
	switch e := event.(type) {
	case OrderCreatedEvent:
		return p.handleOrderCreated(ctx, e)
	case OrderPaidEvent:
		return p.handleOrderPaid(ctx, e)
	case OrderShippedEvent:
		return p.handleOrderShipped(ctx, e)
	case OrderCancelledEvent:
		return p.handleOrderCancelled(ctx, e)
	}
	// Event types this projection does not care about are ignored.
	return nil
}

func (p *OrderProjection) handleOrderCreated(ctx context.Context, event OrderCreatedEvent) error {
	itemsJSON, err := json.Marshal(event.Items)
	if err != nil {
		return fmt.Errorf("encode items for order %s: %w", event.ID, err)
	}

	_, err = p.db.ExecContext(ctx, `
		INSERT INTO order_read_model
		(id, customer_id, items, total, status, created_at, updated_at)
		VALUES ($1, $2, $3, $4, $5, $6, $7)
	`, event.ID, event.CustomerID, itemsJSON, event.Total, "pending", event.Timestamp, event.Timestamp)

	return err
}

func (p *OrderProjection) handleOrderPaid(ctx context.Context, event OrderPaidEvent) error {
	_, err := p.db.ExecContext(ctx, `
		UPDATE order_read_model
		SET status = $1, payment_id = $2, updated_at = $3
		WHERE id = $4
	`, "paid", event.PaymentID, event.Timestamp, event.ID)

	return err
}

func (p *OrderProjection) handleOrderShipped(ctx context.Context, event OrderShippedEvent) error {
	_, err := p.db.ExecContext(ctx, `
		UPDATE order_read_model
		SET status = $1, tracking_number = $2, updated_at = $3
		WHERE id = $4
	`, "shipped", event.TrackingNumber, event.Timestamp, event.ID)

	return err
}

func (p *OrderProjection) handleOrderCancelled(ctx context.Context, event OrderCancelledEvent) error {
	_, err := p.db.ExecContext(ctx, `
		UPDATE order_read_model
		SET status = $1, updated_at = $2
		WHERE id = $3
	`, "cancelled", event.Timestamp, event.ID)

	return err
}

Multiple Read Models for Different Use Cases

A key benefit of CQRS is that a single event stream can feed several read models, each shaped for a different query pattern:

// run
package main

import (
	"fmt"
	"sort"
	"time"
)

// Different read models for different purposes

// 1. Order Summary View - Optimized for list views
type OrderSummaryView struct {
	OrderID    string
	CustomerID string
	Total      float64
	Status     string
	OrderDate  time.Time
	ItemCount  int
}

// 2. Order Details View - Optimized for detail pages
type OrderDetailsView struct {
	OrderID        string
	CustomerID     string
	CustomerName   string
	Items          []OrderItemDetail
	Subtotal       float64
	Tax            float64
	Total          float64
	Status         string
	PaymentMethod  string
	ShippingAddr   string
	TrackingNumber string
	OrderDate      time.Time
	ShipDate       *time.Time
}

type OrderItemDetail struct {
	ProductID   string
	ProductName string
	Quantity    int
	UnitPrice   float64
	TotalPrice  float64
}

// 3. Customer Order History View - Optimized for customer pages
type CustomerOrderHistoryView struct {
	CustomerID   string
	CustomerName string
	TotalOrders  int
	TotalSpent   float64
	AverageOrder float64
	RecentOrders []RecentOrder
	TopProducts  []ProductPurchase
}

type RecentOrder struct {
	OrderID   string
	Total     float64
	Status    string
	OrderDate time.Time
}

type ProductPurchase struct {
	ProductID      string
	ProductName    string
	TimesPurchased int
	TotalSpent     float64
}

// 4. Analytics View - Optimized for reporting
type SalesAnalyticsView struct {
	Period            string
	TotalOrders       int
	TotalRevenue      float64
	AverageOrderValue float64
	OrdersByStatus    map[string]int
	TopCustomers      []CustomerSales
	TopProducts       []ProductSales
}

type CustomerSales struct {
	CustomerID   string
	CustomerName string
	OrderCount   int
	TotalSpent   float64
}

type ProductSales struct {
	ProductID   string
	ProductName string
	UnitsSold   int
	Revenue     float64
}

// Projection manager builds all read models from events
type MultiViewProjection struct {
	summaries map[string]*OrderSummaryView
	details   map[string]*OrderDetailsView
	customers map[string]*CustomerOrderHistoryView
	analytics *SalesAnalyticsView
}

func NewMultiViewProjection() *MultiViewProjection {
	return &MultiViewProjection{
		summaries: make(map[string]*OrderSummaryView),
		details:   make(map[string]*OrderDetailsView),
		customers: make(map[string]*CustomerOrderHistoryView),
		analytics: &SalesAnalyticsView{
			OrdersByStatus: make(map[string]int),
			TopCustomers:   make([]CustomerSales, 0),
			TopProducts:    make([]ProductSales, 0),
		},
	}
}

type OrderEvent interface {
	GetOrderID() string
}

type OrderCreatedEvent struct {
	OrderID      string
	CustomerID   string
	CustomerName string
	Items        []OrderItemDetail
	Subtotal     float64
	Tax          float64
	Total        float64
	OrderDate    time.Time
}

func (e OrderCreatedEvent) GetOrderID() string { return e.OrderID }

func (p *MultiViewProjection) HandleOrderCreated(event OrderCreatedEvent) {
	// 1. Update summary view
	p.summaries[event.OrderID] = &OrderSummaryView{
		OrderID:    event.OrderID,
		CustomerID: event.CustomerID,
		Total:      event.Total,
		Status:     "pending",
		OrderDate:  event.OrderDate,
		ItemCount:  len(event.Items),
	}

	// 2. Update details view
	p.details[event.OrderID] = &OrderDetailsView{
		OrderID:      event.OrderID,
		CustomerID:   event.CustomerID,
		CustomerName: event.CustomerName,
		Items:        event.Items,
		Subtotal:     event.Subtotal,
		Tax:          event.Tax,
		Total:        event.Total,
		Status:       "pending",
		OrderDate:    event.OrderDate,
	}

	// 3. Update customer history view, creating it on the first order
	customer, exists := p.customers[event.CustomerID]
	if !exists {
		customer = &CustomerOrderHistoryView{
			CustomerID:   event.CustomerID,
			CustomerName: event.CustomerName,
		}
		p.customers[event.CustomerID] = customer
	}

	customer.TotalOrders++
	customer.TotalSpent += event.Total
	customer.AverageOrder = customer.TotalSpent / float64(customer.TotalOrders)

	// Prepend so the most recently processed order is listed first
	customer.RecentOrders = append([]RecentOrder{{
		OrderID:   event.OrderID,
		Total:     event.Total,
		Status:    "pending",
		OrderDate: event.OrderDate,
	}}, customer.RecentOrders...)

	// Update top products. This runs for a customer's first order too, so
	// the items of that order are not lost.
	for _, item := range event.Items {
		found := false
		for i := range customer.TopProducts {
			if customer.TopProducts[i].ProductID == item.ProductID {
				customer.TopProducts[i].TimesPurchased++
				customer.TopProducts[i].TotalSpent += item.TotalPrice
				found = true
				break
			}
		}
		if !found {
			customer.TopProducts = append(customer.TopProducts, ProductPurchase{
				ProductID:      item.ProductID,
				ProductName:    item.ProductName,
				TimesPurchased: 1,
				TotalSpent:     item.TotalPrice,
			})
		}
	}

	// 4. Update analytics view
	p.analytics.TotalOrders++
	p.analytics.TotalRevenue += event.Total
	p.analytics.AverageOrderValue = p.analytics.TotalRevenue / float64(p.analytics.TotalOrders)
	p.analytics.OrdersByStatus["pending"]++
}

func main() {
	fmt.Println("CQRS: Multiple Read Models")
	fmt.Println("===========================")
	fmt.Println()

	projection := NewMultiViewProjection()

	// Simulate order creation events
	events := []OrderCreatedEvent{
		{
			OrderID:      "order-1",
			CustomerID:   "cust-1",
			CustomerName: "Alice Smith",
			Items: []OrderItemDetail{
				{ProductID: "prod-1", ProductName: "Laptop", Quantity: 1, UnitPrice: 999.99, TotalPrice: 999.99},
				{ProductID: "prod-2", ProductName: "Mouse", Quantity: 2, UnitPrice: 29.99, TotalPrice: 59.98},
			},
			Subtotal:  1059.97,
			Tax:       105.99,
			Total:     1165.96,
			OrderDate: time.Now(),
		},
		{
			OrderID:      "order-2",
			CustomerID:   "cust-1",
			CustomerName: "Alice Smith",
			Items: []OrderItemDetail{
				{ProductID: "prod-3", ProductName: "Keyboard", Quantity: 1, UnitPrice: 149.99, TotalPrice: 149.99},
			},
			Subtotal:  149.99,
			Tax:       15.00,
			Total:     164.99,
			OrderDate: time.Now().Add(-24 * time.Hour),
		},
		{
			OrderID:      "order-3",
			CustomerID:   "cust-2",
			CustomerName: "Bob Johnson",
			Items: []OrderItemDetail{
				{ProductID: "prod-1", ProductName: "Laptop", Quantity: 1, UnitPrice: 999.99, TotalPrice: 999.99},
			},
			Subtotal:  999.99,
			Tax:       100.00,
			Total:     1099.99,
			OrderDate: time.Now().Add(-2 * time.Hour),
		},
	}

	// Process events
	for _, event := range events {
		projection.HandleOrderCreated(event)
	}

	// Query 1: Order summary view (sorted by ID, since map order is random)
	fmt.Println("1. Order Summary View (for list pages):")
	orderIDs := make([]string, 0, len(projection.summaries))
	for id := range projection.summaries {
		orderIDs = append(orderIDs, id)
	}
	sort.Strings(orderIDs)
	for _, id := range orderIDs {
		summary := projection.summaries[id]
		fmt.Printf("   Order %s: Customer=%s, Total=$%.2f, Items=%d, Status=%s\n",
			summary.OrderID, summary.CustomerID, summary.Total, summary.ItemCount, summary.Status)
	}

	// Query 2: Order details view
	fmt.Println("\n2. Order Details View (for detail pages):")
	if details, exists := projection.details["order-1"]; exists {
		fmt.Printf("   Order: %s\n", details.OrderID)
		fmt.Printf("   Customer: %s\n", details.CustomerName)
		fmt.Printf("   Items:\n")
		for _, item := range details.Items {
			fmt.Printf("     - %s (x%d) @ $%.2f = $%.2f\n",
				item.ProductName, item.Quantity, item.UnitPrice, item.TotalPrice)
		}
		fmt.Printf("   Subtotal: $%.2f\n", details.Subtotal)
		fmt.Printf("   Tax: $%.2f\n", details.Tax)
		fmt.Printf("   Total: $%.2f\n", details.Total)
	}

	// Query 3: Customer history view
	fmt.Println("\n3. Customer Order History View (for customer pages):")
	if history, exists := projection.customers["cust-1"]; exists {
		fmt.Printf("   Customer: %s\n", history.CustomerName)
		fmt.Printf("   Total Orders: %d\n", history.TotalOrders)
		fmt.Printf("   Total Spent: $%.2f\n", history.TotalSpent)
		fmt.Printf("   Average Order: $%.2f\n", history.AverageOrder)
		fmt.Printf("   Recent Orders:\n")
		for _, order := range history.RecentOrders {
			fmt.Printf("     - %s: $%.2f (%s)\n", order.OrderID, order.Total, order.Status)
		}
		fmt.Printf("   Top Products:\n")

		// Sort by times purchased; the stable sort keeps ties in insertion order
		sort.SliceStable(history.TopProducts, func(i, j int) bool {
			return history.TopProducts[i].TimesPurchased > history.TopProducts[j].TimesPurchased
		})

		for _, product := range history.TopProducts {
			fmt.Printf("     - %s: purchased %d times, spent $%.2f\n",
				product.ProductName, product.TimesPurchased, product.TotalSpent)
		}
	}

	// Query 4: Analytics view
	fmt.Println("\n4. Sales Analytics View (for reporting):")
	fmt.Printf("   Total Orders: %d\n", projection.analytics.TotalOrders)
	fmt.Printf("   Total Revenue: $%.2f\n", projection.analytics.TotalRevenue)
	fmt.Printf("   Average Order Value: $%.2f\n", projection.analytics.AverageOrderValue)
	fmt.Printf("   Orders by Status:\n")
	for status, count := range projection.analytics.OrdersByStatus {
		fmt.Printf("     - %s: %d\n", status, count)
	}

	fmt.Println("\n💡 Each view is optimized for its specific use case!")
	fmt.Println("   - Summary: Fast list queries")
	fmt.Println("   - Details: Complete order information")
	fmt.Println("   - Customer History: Customer-centric insights")
	fmt.Println("   - Analytics: Business intelligence")
}

Event Store Implementation

Imagine a historian's archive where every event is recorded chronologically and never altered. New events are always added to the end of the timeline, creating a complete, immutable record of history. This is exactly how an event store works - it's an append-only log of all events that have occurred in your system.

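An event store persists events in an append-only log: existing records are never updated or deleted, and each aggregate's events are ordered by version.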

Real-world Example: Financial trading systems use event stores to maintain a complete audit trail of all trades. Every buy, sell, cancel, and modify operation is recorded as an immutable event, ensuring complete traceability and regulatory compliance.

💡 Key Takeaway: Because an event store is append-only, it gives you a complete audit trail and enables powerful features like event replay and temporal queries. Consistency within an aggregate comes from version checks at write time (optimistic concurrency), which requires different thinking than update-in-place databases.

PostgreSQL Event Store

package eventstore

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// ErrConcurrencyConflict is returned by SaveEvents when the aggregate's stored
// version does not match the version the caller expected.
var ErrConcurrencyConflict = errors.New("concurrency conflict: version mismatch")

type EventStore interface {
	SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error
	GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
	GetEventsSince(ctx context.Context, aggregateID string, version int) ([]Event, error)
	GetAllEvents(ctx context.Context, limit, offset int) ([]Event, error)
}

type PostgresEventStore struct {
	db *sql.DB
}

func NewPostgresEventStore(db *sql.DB) *PostgresEventStore {
	return &PostgresEventStore{db: db}
}

type StoredEvent struct {
	ID            int64
	AggregateID   string
	AggregateType string
	EventType     string
	EventData     json.RawMessage
	Metadata      json.RawMessage
	Version       int
	CreatedAt     time.Time
}

func (s *PostgresEventStore) SaveEvents(
	ctx context.Context,
	aggregateID string,
	events []Event,
	expectedVersion int,
) error {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded

	// Check current version for optimistic concurrency control.
	// COALESCE guarantees exactly one row, even for a new aggregate.
	var currentVersion int
	err = tx.QueryRowContext(ctx, `
		SELECT COALESCE(MAX(version), 0)
		FROM events
		WHERE aggregate_id = $1
	`, aggregateID).Scan(&currentVersion)
	if err != nil {
		return err
	}

	if currentVersion != expectedVersion {
		return fmt.Errorf("%w: expected %d, found %d",
			ErrConcurrencyConflict, expectedVersion, currentVersion)
	}

	// Insert events. Two writers can both pass the check above; the
	// UNIQUE (aggregate_id, version) constraint then rejects the loser.
	// The aggregate type is hardcoded because this example only stores orders.
	for _, event := range events {
		eventData, err := json.Marshal(event)
		if err != nil {
			return err
		}

		_, err = tx.ExecContext(ctx, `
			INSERT INTO events
			(aggregate_id, aggregate_type, event_type, event_data, version, created_at)
			VALUES ($1, $2, $3, $4, $5, $6)
		`, aggregateID, "Order", event.EventType(), eventData, event.EventVersion(), event.OccurredAt())
		if err != nil {
			return err
		}
	}

	return tx.Commit()
}

func (s *PostgresEventStore) GetEvents(ctx context.Context, aggregateID string) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT event_type, event_data
		FROM events
		WHERE aggregate_id = $1
		ORDER BY version ASC
	`, aggregateID)
	if err != nil {
		return nil, err
	}
	return s.scanEvents(rows)
}

func (s *PostgresEventStore) GetEventsSince(
	ctx context.Context,
	aggregateID string,
	version int,
) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT event_type, event_data
		FROM events
		WHERE aggregate_id = $1 AND version > $2
		ORDER BY version ASC
	`, aggregateID, version)
	if err != nil {
		return nil, err
	}
	return s.scanEvents(rows)
}

func (s *PostgresEventStore) GetAllEvents(ctx context.Context, limit, offset int) ([]Event, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT event_type, event_data
		FROM events
		ORDER BY id ASC
		LIMIT $1 OFFSET $2
	`, limit, offset)
	if err != nil {
		return nil, err
	}
	return s.scanEvents(rows)
}

// scanEvents reads (event_type, event_data) rows, closes them, and reports
// any error hit during scanning, decoding, or iteration.
func (s *PostgresEventStore) scanEvents(rows *sql.Rows) ([]Event, error) {
	defer rows.Close()

	var events []Event
	for rows.Next() {
		var eventType string
		var eventData json.RawMessage

		if err := rows.Scan(&eventType, &eventData); err != nil {
			return nil, err
		}

		event, err := s.deserializeEvent(eventType, eventData)
		if err != nil {
			return nil, err
		}
		events = append(events, event)
	}

	return events, rows.Err()
}

// deserializeEvent decodes stored JSON into the concrete event type.
// Unknown types and malformed payloads are errors rather than silent skips,
// so a replay can never quietly lose events.
func (s *PostgresEventStore) deserializeEvent(eventType string, data json.RawMessage) (Event, error) {
	var (
		event Event
		err   error
	)

	switch eventType {
	case "OrderCreated":
		var e OrderCreatedEvent
		err = json.Unmarshal(data, &e)
		event = e

	case "OrderPaid":
		var e OrderPaidEvent
		err = json.Unmarshal(data, &e)
		event = e

	case "OrderShipped":
		var e OrderShippedEvent
		err = json.Unmarshal(data, &e)
		event = e

	case "OrderCancelled":
		var e OrderCancelledEvent
		err = json.Unmarshal(data, &e)
		event = e

	default:
		return nil, fmt.Errorf("unknown event type %q", eventType)
	}

	if err != nil {
		return nil, fmt.Errorf("decode %s: %w", eventType, err)
	}
	return event, nil
}

// Snapshot support for performance
type Snapshot struct {
	AggregateID string
	State       json.RawMessage
	Version     int
	CreatedAt   time.Time
}

func (s *PostgresEventStore) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO snapshots (aggregate_id, state, version, created_at)
		VALUES ($1, $2, $3, $4)
		ON CONFLICT (aggregate_id) DO UPDATE
		SET state = $2, version = $3, created_at = $4
	`, snapshot.AggregateID, snapshot.State, snapshot.Version, snapshot.CreatedAt)

	return err
}

// GetSnapshot returns (nil, nil) when the aggregate has no snapshot yet.
func (s *PostgresEventStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) {
	var snapshot Snapshot
	err := s.db.QueryRowContext(ctx, `
		SELECT aggregate_id, state, version, created_at
		FROM snapshots
		WHERE aggregate_id = $1
	`, aggregateID).Scan(&snapshot.AggregateID, &snapshot.State, &snapshot.Version, &snapshot.CreatedAt)

	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return &snapshot, nil
}

// Database schema
const schema = `
CREATE TABLE IF NOT EXISTS events (
	id BIGSERIAL PRIMARY KEY,
	aggregate_id VARCHAR(255) NOT NULL,
	aggregate_type VARCHAR(255) NOT NULL,
	event_type VARCHAR(255) NOT NULL,
	event_data JSONB NOT NULL,
	metadata JSONB,
	version INTEGER NOT NULL,
	created_at TIMESTAMP NOT NULL,
	UNIQUE (aggregate_id, version)
);

-- The UNIQUE constraint above already creates an index on
-- (aggregate_id, version), so this explicit index is optional.
CREATE INDEX IF NOT EXISTS idx_events_aggregate ON events(aggregate_id, version);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at);

CREATE TABLE IF NOT EXISTS snapshots (
	aggregate_id VARCHAR(255) PRIMARY KEY,
	state JSONB NOT NULL,
	version INTEGER NOT NULL,
	created_at TIMESTAMP NOT NULL
);
`
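
The snapshot table pays off when an aggregate is loaded: instead of replaying every event, a caller can start from the snapshot and apply only the events with a higher version, which is what GetEventsSince returns. The sketch below shows that flow in memory. ItemAdded, OrderSnapshot, OrderState, Rehydrate, and EventsSince are hypothetical stand-ins invented for this illustration, and amounts are in cents to sidestep float rounding.

```go
package main

import "fmt"

// ItemAdded is a hypothetical event; Version is its position in the stream.
type ItemAdded struct {
	Version    int
	PriceCents int
}

// OrderSnapshot is a hypothetical saved state at a given version.
type OrderSnapshot struct {
	Version    int
	TotalCents int
}

// OrderState is the aggregate state rebuilt from events.
type OrderState struct {
	Version    int
	TotalCents int
}

// Rehydrate rebuilds state from an optional snapshot plus later events.
func Rehydrate(snap *OrderSnapshot, events []ItemAdded) OrderState {
	var state OrderState
	if snap != nil {
		state = OrderState{Version: snap.Version, TotalCents: snap.TotalCents}
	}
	for _, e := range events {
		if e.Version <= state.Version {
			continue // already reflected in the snapshot
		}
		state.TotalCents += e.PriceCents
		state.Version = e.Version
	}
	return state
}

// EventsSince mimics a "version > N" query against the event store.
func EventsSince(all []ItemAdded, version int) []ItemAdded {
	var out []ItemAdded
	for _, e := range all {
		if e.Version > version {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	all := []ItemAdded{{1, 1000}, {2, 250}, {3, 4999}, {4, 100}, {5, 1}}

	// Full replay from the first event.
	full := Rehydrate(nil, all)

	// Snapshot taken at version 3, then only events 4 and 5 are applied.
	snap := &OrderSnapshot{Version: 3, TotalCents: 6249}
	fast := Rehydrate(snap, EventsSince(all, snap.Version))

	fmt.Println(full == fast, fast.Version, fast.TotalCents) // true 5 6350
}
```

Both paths arrive at the same state. The snapshot only changes how many events have to be read, which is why a snapshot can be deleted or rebuilt at any time without losing information.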

Event Bus and Message Brokers

Think of an event bus like a city's postal service. When you send a letter, you don't need to know exactly who will receive it or how it will get there. The postal service handles routing the letter to all interested recipients who have signed up for that type of mail.

An event bus distributes published events to the subscribers that have registered for them.

When to Use Different Message Brokers:

  • In-Memory: Simple applications, testing, single process
  • RabbitMQ: Complex routing, reliable delivery, message acknowledgments
  • Apache Kafka: High throughput, event streaming, replay capabilities
  • Redis Pub/Sub: Simple pub/sub, high speed, lightweight
  • NATS: Cloud-native, high performance, geographic distribution

⚠️ Important: Choose your message broker based on your reliability needs. In-memory buses don't persist messages, so if a service is down when an event is published, that event is lost forever. Redis Pub/Sub behaves the same way for subscribers that are disconnected at publish time.
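
The warning above is easy to reproduce. The following sketch uses a deliberately tiny bus (TinyBus is a hypothetical type for this illustration, not the bus implemented in the next section) to show that an event published while no consumer is subscribed simply disappears.

```go
package main

import "fmt"

// TinyBus is a hypothetical, non-persistent bus: Publish hands the payload
// to whoever is subscribed right now and stores nothing.
type TinyBus struct {
	handlers map[string][]func(payload string)
}

func NewTinyBus() *TinyBus {
	return &TinyBus{handlers: make(map[string][]func(payload string))}
}

func (b *TinyBus) Subscribe(eventType string, h func(payload string)) {
	b.handlers[eventType] = append(b.handlers[eventType], h)
}

// Publish returns how many handlers received the event.
func (b *TinyBus) Publish(eventType, payload string) int {
	for _, h := range b.handlers[eventType] {
		h(payload)
	}
	return len(b.handlers[eventType])
}

func main() {
	bus := NewTinyBus()
	var received []string

	// The consumer is "down": nothing is subscribed yet.
	fmt.Println("delivered:", bus.Publish("OrderCreated", "order-1"))

	// The consumer comes up and subscribes.
	bus.Subscribe("OrderCreated", func(p string) { received = append(received, p) })
	fmt.Println("delivered:", bus.Publish("OrderCreated", "order-2"))

	// order-1 was never stored, so it cannot be redelivered.
	fmt.Println("received:", received)
}
```

A durable broker avoids this by holding the message in a queue or log until a consumer acknowledges it, at the cost of extra infrastructure and latency.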

In-Memory Event Bus

package eventbus

import (
	"context"
	"log"
	"sync"
)

type EventHandler func(ctx context.Context, event Event) error

type EventBus interface {
	Subscribe(eventType string, handler EventHandler)
	Publish(ctx context.Context, event Event) error
}

// Wildcard is the event type that subscribes a handler to every event.
const Wildcard = "*"

type InMemoryEventBus struct {
	mu       sync.RWMutex
	handlers map[string][]EventHandler
}

func NewInMemoryEventBus() *InMemoryEventBus {
	return &InMemoryEventBus{
		handlers: make(map[string][]EventHandler),
	}
}

func (bus *InMemoryEventBus) Subscribe(eventType string, handler EventHandler) {
	bus.mu.Lock()
	defer bus.mu.Unlock()

	bus.handlers[eventType] = append(bus.handlers[eventType], handler)
}

// Publish runs every matching handler concurrently and waits for all of them.
// Handler errors are logged, not returned, so one failing subscriber cannot
// block the publisher or the other subscribers.
func (bus *InMemoryEventBus) Publish(ctx context.Context, event Event) error {
	// Copy the matching handlers under the read lock so that a concurrent
	// Subscribe cannot race with dispatch.
	bus.mu.RLock()
	typed := bus.handlers[event.EventType()]
	all := bus.handlers[Wildcard]
	handlers := make([]EventHandler, 0, len(typed)+len(all))
	handlers = append(handlers, typed...)
	handlers = append(handlers, all...)
	bus.mu.RUnlock()

	var wg sync.WaitGroup
	errCh := make(chan error, len(handlers))

	for _, handler := range handlers {
		wg.Add(1)
		go func(h EventHandler) {
			defer wg.Done()
			if err := h(ctx, event); err != nil {
				errCh <- err
			}
		}(handler)
	}

	wg.Wait()
	close(errCh)

	// Log errors but don't fail the publish
	for err := range errCh {
		log.Printf("Event handler error: %v", err)
	}

	return nil
}

RabbitMQ Event Bus

package eventbus

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

// EventDecoder turns an event type (the routing key) and a JSON body back
// into a concrete Event. encoding/json cannot decode into the Event
// interface directly, because it has no way to pick the concrete type.
type EventDecoder func(eventType string, body []byte) (Event, error)

type RabbitMQEventBus struct {
	conn     *amqp.Connection
	channel  *amqp.Channel
	exchange string

	// Decode must be set before calling Subscribe.
	Decode EventDecoder
}

func NewRabbitMQEventBus(url, exchange string) (*RabbitMQEventBus, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, fmt.Errorf("dial broker: %w", err)
	}

	channel, err := conn.Channel()
	if err != nil {
		conn.Close() // don't leak the connection
		return nil, fmt.Errorf("open channel: %w", err)
	}

	// Declare exchange
	err = channel.ExchangeDeclare(
		exchange,
		"topic",
		true,  // durable
		false, // auto-delete
		false, // internal
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		channel.Close()
		conn.Close()
		return nil, fmt.Errorf("declare exchange: %w", err)
	}

	return &RabbitMQEventBus{
		conn:     conn,
		channel:  channel,
		exchange: exchange,
	}, nil
}

func (bus *RabbitMQEventBus) Publish(ctx context.Context, event Event) error {
	data, err := json.Marshal(event)
	if err != nil {
		return err
	}

	return bus.channel.PublishWithContext(
		ctx,
		bus.exchange,
		event.EventType(), // routing key
		false,             // mandatory
		false,             // immediate
		amqp.Publishing{
			ContentType:  "application/json",
			DeliveryMode: amqp.Persistent, // survive a broker restart
			Body:         data,
		},
	)
}

// Subscribe takes a queue name in addition to the event type, so this type
// does not satisfy the EventBus interface as written.
func (bus *RabbitMQEventBus) Subscribe(queue, eventType string, handler EventHandler) error {
	if bus.Decode == nil {
		return errors.New("rabbitmq event bus: Decode is not set")
	}

	// Declare queue
	q, err := bus.channel.QueueDeclare(
		queue,
		true,  // durable
		false, // auto-delete
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return err
	}

	// Bind queue to exchange
	err = bus.channel.QueueBind(
		q.Name,
		eventType, // binding key
		bus.exchange,
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return err
	}

	// Consume messages
	msgs, err := bus.channel.Consume(
		q.Name,
		"",    // consumer tag (server-generated)
		false, // auto-ack: off, we ack manually below
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return err
	}

	go func() {
		for msg := range msgs {
			event, err := bus.Decode(msg.RoutingKey, msg.Body)
			if err != nil {
				msg.Nack(false, false) // undecodable: retrying cannot help
				continue
			}

			if err := handler(context.Background(), event); err != nil {
				// Requeue once. A message that fails again is rejected
				// (and dead-lettered, if the queue is configured for it)
				// instead of being redelivered forever.
				msg.Nack(false, !msg.Redelivered)
			} else {
				msg.Ack(false)
			}
		}
	}()

	return nil
}

func (bus *RabbitMQEventBus) Close() error {
	if err := bus.channel.Close(); err != nil {
		bus.conn.Close()
		return err
	}
	return bus.conn.Close()
}

Production Patterns

Think of event-driven systems in production like a live television broadcast. Everything needs to work perfectly, but when things go wrong, you need backup plans, retry mechanisms, and the ability to replay segments if something was missed.

Production event-driven systems require robust error handling, monitoring, and replay capabilities.

Common Pitfalls in Production:

  • Not handling duplicate events (idempotency)
  • Losing events during system failures
  • Projections getting out of sync with events
  • Schema evolution breaking existing consumers
  • Monitoring that doesn't capture async failures
  • Cascading failures when consumers fail

💡 Key Takeaway: Production event-driven systems fail in ways that traditional systems don't. Plan for failure, implement comprehensive monitoring, and design for recovery from the start.
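
Of the pitfalls above, schema evolution is the one that tends to surface long after the original code was written, because old events stay in the store forever. One common defence is to version each payload and convert old versions to the current shape when reading (often called upcasting). The sketch below assumes a hypothetical envelope format with type, version, and data fields, and a hypothetical change from a float amount (v1) to amount_cents plus currency (v2). None of these names come from the rest of this guide, and defaulting old events to "USD" is purely an assumption made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// envelope is a hypothetical wire format that carries a schema version.
type envelope struct {
	Type    string          `json:"type"`
	Version int             `json:"version"`
	Data    json.RawMessage `json:"data"`
}

// orderPaidV1 is the old payload shape.
type orderPaidV1 struct {
	Amount float64 `json:"amount"`
}

// OrderPaidV2 is the current payload shape that handlers work with.
type OrderPaidV2 struct {
	AmountCents int64  `json:"amount_cents"`
	Currency    string `json:"currency"`
}

// DecodeOrderPaid accepts any known version and always returns the v2 shape.
func DecodeOrderPaid(raw []byte) (OrderPaidV2, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return OrderPaidV2{}, fmt.Errorf("decode envelope: %w", err)
	}

	switch env.Version {
	case 1:
		var v1 orderPaidV1
		if err := json.Unmarshal(env.Data, &v1); err != nil {
			return OrderPaidV2{}, fmt.Errorf("decode v1: %w", err)
		}
		// Upcast: v1 had no currency, so this sketch assumes USD.
		return OrderPaidV2{
			AmountCents: int64(math.Round(v1.Amount * 100)),
			Currency:    "USD",
		}, nil
	case 2:
		var v2 OrderPaidV2
		if err := json.Unmarshal(env.Data, &v2); err != nil {
			return OrderPaidV2{}, fmt.Errorf("decode v2: %w", err)
		}
		return v2, nil
	default:
		return OrderPaidV2{}, fmt.Errorf("unsupported OrderPaid version %d", env.Version)
	}
}

func main() {
	old := []byte(`{"type":"OrderPaid","version":1,"data":{"amount":19.99}}`)
	cur := []byte(`{"type":"OrderPaid","version":2,"data":{"amount_cents":500,"currency":"EUR"}}`)

	a, _ := DecodeOrderPaid(old)
	b, _ := DecodeOrderPaid(cur)
	fmt.Println(a.AmountCents, a.Currency) // 1999 USD
	fmt.Println(b.AmountCents, b.Currency) // 500 EUR
}
```

Handlers only ever see the latest shape, so adding a v3 later means adding one case here rather than touching every consumer.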

Event Replay and Projection Rebuilding

package projection

import (
	"context"
	"fmt"
	"log"
	"time"
)

type ProjectionManager struct {
	eventStore  EventStore
	projections []Projection
}

type Projection interface {
	Name() string
	HandleEvent(ctx context.Context, event Event) error
	Reset(ctx context.Context) error
}

func NewProjectionManager(store EventStore) *ProjectionManager {
	return &ProjectionManager{
		eventStore:  store,
		projections: []Projection{},
	}
}

func (pm *ProjectionManager) RegisterProjection(projection Projection) {
	pm.projections = append(pm.projections, projection)
}

// Rebuild all projections from events
func (pm *ProjectionManager) RebuildAll(ctx context.Context) error {
	for _, projection := range pm.projections {
		if err := pm.Rebuild(ctx, projection); err != nil {
			return err
		}
	}
	return nil
}

func (pm *ProjectionManager) Rebuild(ctx context.Context, projection Projection) error {
	start := time.Now()
	log.Printf("Rebuilding projection: %s", projection.Name())

	// Reset projection
	if err := projection.Reset(ctx); err != nil {
		return fmt.Errorf("failed to reset projection %s: %w", projection.Name(), err)
	}

	// Replay all events in batches. OFFSET paging makes the database skip
	// over earlier rows on every batch; for a large store, paging on the
	// last seen event ID scales better.
	offset := 0
	const batchSize = 100

	for {
		events, err := pm.eventStore.GetAllEvents(ctx, batchSize, offset)
		if err != nil {
			return err
		}

		if len(events) == 0 {
			break
		}

		for i, event := range events {
			if err := projection.HandleEvent(ctx, event); err != nil {
				// Stop at the first failure: carrying on would report
				// success for a read model that is missing data.
				return fmt.Errorf("projection %s failed at event %d: %w",
					projection.Name(), offset+i, err)
			}
		}

		offset += len(events)
		log.Printf("Processed %d events for projection %s", offset, projection.Name())

		if len(events) < batchSize {
			break
		}
	}

	log.Printf("Projection %s rebuilt successfully in %v", projection.Name(), time.Since(start))
	return nil
}

// Start continuous projection.
// This assumes the bus treats "*" as a wildcard subscription; a bus that
// matches event types exactly will never invoke these handlers.
func (pm *ProjectionManager) Start(ctx context.Context, eventBus EventBus) {
	for _, projection := range pm.projections {
		p := projection
		eventBus.Subscribe("*", func(ctx context.Context, event Event) error {
			return p.HandleEvent(ctx, event)
		})
	}
}

Idempotency and Deduplication

package idempotency

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

type IdempotencyChecker struct {
	db *sql.DB
}

func NewIdempotencyChecker(db *sql.DB) *IdempotencyChecker {
	return &IdempotencyChecker{db: db}
}

func (ic *IdempotencyChecker) IsProcessed(ctx context.Context, eventID string) (bool, error) {
	// EXISTS stops at the first match, so it is cheaper than COUNT(*).
	var processed bool
	err := ic.db.QueryRowContext(ctx, `
		SELECT EXISTS (SELECT 1 FROM processed_events WHERE event_id = $1)
	`, eventID).Scan(&processed)
	if err != nil {
		return false, err
	}

	return processed, nil
}

func (ic *IdempotencyChecker) MarkProcessed(ctx context.Context, eventID string) error {
	_, err := ic.db.ExecContext(ctx, `
		INSERT INTO processed_events (event_id, processed_at)
		VALUES ($1, $2)
		ON CONFLICT (event_id) DO NOTHING
	`, eventID, time.Now())

	return err
}

// Idempotent event handler wrapper.
//
// The check, the handler, and the mark are three separate steps, so this
// wrapper narrows the duplicate window but does not close it: two concurrent
// deliveries can both pass the check, and a crash between the handler and
// MarkProcessed causes a re-run on redelivery. Handlers with side effects
// that must not repeat should write their result and the processed_events
// row in the same database transaction.
func MakeIdempotent(checker *IdempotencyChecker, handler EventHandler) EventHandler {
	return func(ctx context.Context, event Event) error {
		eventID := fmt.Sprintf("%s-%s-%d", event.AggregateID(), event.EventType(), event.EventVersion())

		processed, err := checker.IsProcessed(ctx, eventID)
		if err != nil {
			return err
		}

		if processed {
			return nil // Already processed
		}

		if err := handler(ctx, event); err != nil {
			return err
		}

		return checker.MarkProcessed(ctx, eventID)
	}
}

Saga Pattern for Distributed Transactions

A saga is a sequence of local transactions where each transaction updates data within a single service. If one transaction fails, the saga executes compensating transactions to undo the impact of the preceding transactions.

// run
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Saga state
type SagaState string

const (
	SagaStatePending    SagaState = "pending"
	SagaStateRunning    SagaState = "running"
	SagaStateCompleted  SagaState = "completed"
	SagaStateFailed     SagaState = "failed"
	SagaStateRolledBack SagaState = "rolled_back"
)

// Saga step
type SagaStep struct {
	Name       string
	Execute    func(context.Context, *SagaContext) error
	Compensate func(context.Context, *SagaContext) error
	MaxRetries int
}

// Saga context
type SagaContext struct {
	Data map[string]interface{}
	mu   sync.RWMutex
}

func NewSagaContext() *SagaContext {
	return &SagaContext{
		Data: make(map[string]interface{}),
	}
}

func (c *SagaContext) Set(key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.Data[key] = value
}

func (c *SagaContext) Get(key string) (interface{}, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	val, ok := c.Data[key]
	return val, ok
}

// Saga instance
type Saga struct {
	ID             string
	Steps          []SagaStep
	Context        *SagaContext
	State          SagaState
	CompletedSteps int
	CurrentStep    int
	Error          error
	StartTime      time.Time
	CompletionTime time.Time
}

func NewSaga(id string, steps []SagaStep) *Saga {
	return &Saga{
		ID:      id,
		Steps:   steps,
		Context: NewSagaContext(),
		State:   SagaStatePending,
	}
}

// Saga orchestrator
type SagaOrchestrator struct {
	sagas map[string]*Saga
	mu    sync.RWMutex
}

func NewSagaOrchestrator() *SagaOrchestrator {
	return &SagaOrchestrator{
		sagas: make(map[string]*Saga),
	}
}

// Execute runs the steps in order. On the first failure it compensates the
// completed steps in reverse and returns the error of the failed step.
func (o *SagaOrchestrator) Execute(ctx context.Context, saga *Saga) error {
	o.mu.Lock()
	o.sagas[saga.ID] = saga
	o.mu.Unlock()

	saga.State = SagaStateRunning
	saga.StartTime = time.Now()

	fmt.Printf("Starting saga: %s\n", saga.ID)

	// Execute steps
	for i, step := range saga.Steps {
		saga.CurrentStep = i
		fmt.Printf("  Executing step %d: %s\n", i+1, step.Name)

		if err := o.executeStepWithRetry(ctx, saga, step); err != nil {
			saga.Error = err
			saga.State = SagaStateFailed
			fmt.Printf("  ✗ Step failed: %v\n", err)

			// Compensate
			fmt.Println("  Starting compensation...")
			if compErr := o.compensate(ctx, saga); compErr != nil {
				// The saga stays in the failed state and needs manual attention.
				fmt.Printf("  ✗ Compensation failed: %v\n", compErr)
				return fmt.Errorf("step %s failed (%v) and %w", step.Name, err, compErr)
			}

			saga.State = SagaStateRolledBack
			saga.CompletionTime = time.Now()
			return err
		}

		saga.CompletedSteps++
		fmt.Printf("  ✓ Step completed: %s\n", step.Name)
	}

	saga.State = SagaStateCompleted
	saga.CompletionTime = time.Now()
	fmt.Printf("✓ Saga completed: %s (duration: %v)\n",
		saga.ID, saga.CompletionTime.Sub(saga.StartTime))

	return nil
}

func (o *SagaOrchestrator) executeStepWithRetry(
	ctx context.Context,
	saga *Saga,
	step SagaStep,
) error {
	var err error
	for attempt := 0; attempt <= step.MaxRetries; attempt++ {
		if attempt > 0 {
			fmt.Printf("    Retry attempt %d/%d\n", attempt, step.MaxRetries)

			// Linear backoff that stops early if the context is cancelled.
			select {
			case <-time.After(time.Duration(attempt) * time.Second):
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		err = step.Execute(ctx, saga.Context)
		if err == nil {
			return nil
		}
	}
	return err
}

func (o *SagaOrchestrator) compensate(ctx context.Context, saga *Saga) error {
	// Compensate in reverse order
	for i := saga.CompletedSteps - 1; i >= 0; i-- {
		step := saga.Steps[i]
		fmt.Printf("  Compensating step: %s\n", step.Name)

		if step.Compensate != nil {
			if err := step.Compensate(ctx, saga.Context); err != nil {
				return fmt.Errorf("compensation failed for %s: %w", step.Name, err)
			}
			fmt.Printf("  ✓ Compensated: %s\n", step.Name)
		}
	}
	return nil
}

func (o *SagaOrchestrator) GetSaga(id string) *Saga {
	o.mu.RLock()
	defer o.mu.RUnlock()
	return o.sagas[id]
}

// Simulated services
type InventoryService struct {
	stock map[string]int
	mu    sync.Mutex
}

func NewInventoryService() *InventoryService {
	return &InventoryService{
		stock: map[string]int{
			"product-1": 100,
			"product-2": 50,
		},
	}
}

func (s *InventoryService) ReserveInventory(productID string, quantity int) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	available := s.stock[productID]
	if available < quantity {
		return fmt.Errorf("insufficient inventory: need %d, have %d", quantity, available)
	}

	s.stock[productID] -= quantity
	return nil
}

func (s *InventoryService) ReleaseInventory(productID string, quantity int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stock[productID] += quantity
}

type PaymentService struct {
	successRate float64
}

func NewPaymentService(successRate float64) *PaymentService {
	return &PaymentService{successRate: successRate}
}

func (s *PaymentService) ProcessPayment(orderID string, amount float64) (string, error) {
	// Simulate payment processing
	time.Sleep(100 * time.Millisecond)

	// Simulate random failures. The value is in 0..99, so >= makes a
	// successRate of 0.9 fail for 90..99, roughly 10% of calls.
	if time.Now().UnixNano()%100 >= int64(s.successRate*100) {
224		return "", errors.New("payment processing failed")
225	}
226
227	paymentID := fmt.Sprintf("pay-%s", orderID)
228	return paymentID, nil
229}
230
231func (s *PaymentService) RefundPayment(paymentID string) error {
232	time.Sleep(50 * time.Millisecond)
233	return nil
234}
235
236type ShippingService struct{}
237
238func NewShippingService() *ShippingService {
239	return &ShippingService{}
240}
241
242func (s *ShippingService) ShipOrder(orderID string) (string, error) {
243	time.Sleep(100 * time.Millisecond)
244	trackingID := fmt.Sprintf("track-%s", orderID)
245	return trackingID, nil
246}
247
248func (s *ShippingService) CancelShipment(trackingID string) error {
249	time.Sleep(50 * time.Millisecond)
250	return nil
251}
252
253func main() {
254	fmt.Println("Saga Orchestration Pattern")
255	fmt.Println("===========================\n")
256
257	// Initialize services
258	inventory := NewInventoryService()
259	payment := NewPaymentService(0.8) // 80% success rate
260	shipping := NewShippingService()
261	orchestrator := NewSagaOrchestrator()
262
263	// Order details
264	orderID := "order-123"
265	productID := "product-1"
266	quantity := 5
267	amount := 99.99
268
269	// Define saga steps
270	steps := []SagaStep{
271		{
272			Name:       "Reserve Inventory",
273			MaxRetries: 2,
274			Execute: func(ctx context.Context, sagaCtx *SagaContext) error {
275				if err := inventory.ReserveInventory(productID, quantity); err != nil {
276					return err
277				}
278				sagaCtx.Set("product_id", productID)
279				sagaCtx.Set("quantity", quantity)
280				return nil
281			},
282			Compensate: func(ctx context.Context, sagaCtx *SagaContext) error {
283				productID, _ := sagaCtx.Get("product_id")
284				quantity, _ := sagaCtx.Get("quantity")
285				inventory.ReleaseInventory(productID.(string), quantity.(int))
286				return nil
287			},
288		},
289		{
290			Name:       "Process Payment",
291			MaxRetries: 3,
292			Execute: func(ctx context.Context, sagaCtx *SagaContext) error {
293				paymentID, err := payment.ProcessPayment(orderID, amount)
294				if err != nil {
295					return err
296				}
297				sagaCtx.Set("payment_id", paymentID)
298				return nil
299			},
300			Compensate: func(ctx context.Context, sagaCtx *SagaContext) error {
301				paymentID, ok := sagaCtx.Get("payment_id")
302				if !ok {
303					return nil
304				}
305				return payment.RefundPayment(paymentID.(string))
306			},
307		},
308		{
309			Name:       "Ship Order",
310			MaxRetries: 2,
311			Execute: func(ctx context.Context, sagaCtx *SagaContext) error {
312				trackingID, err := shipping.ShipOrder(orderID)
313				if err != nil {
314					return err
315				}
316				sagaCtx.Set("tracking_id", trackingID)
317				return nil
318			},
319			Compensate: func(ctx context.Context, sagaCtx *SagaContext) error {
320				trackingID, ok := sagaCtx.Get("tracking_id")
321				if !ok {
322					return nil
323				}
324				return shipping.CancelShipment(trackingID.(string))
325			},
326		},
327	}
328
329	// Create and execute saga
330	saga := NewSaga(orderID, steps)
331	ctx := context.Background()
332
333	if err := orchestrator.Execute(ctx, saga); err != nil {
334		fmt.Printf("\n✗ Saga failed: %v\n", err)
335		fmt.Printf("State: %s\n", saga.State)
336	} else {
337		fmt.Printf("\n✓ Order fulfilled successfully\n")
338
339		if trackingID, ok := saga.Context.Get("tracking_id"); ok {
340			fmt.Printf("Tracking ID: %s\n", trackingID)
341		}
342	}
343
344	// Display final saga state
345	fmt.Printf("\nSaga Summary:\n")
346	fmt.Printf("  ID: %s\n", saga.ID)
347	fmt.Printf("  State: %s\n", saga.State)
348	fmt.Printf("  Completed Steps: %d/%d\n", saga.CompletedSteps, len(saga.Steps))
349	fmt.Printf("  Duration: %v\n", saga.CompletionTime.Sub(saga.StartTime))
350}

Monitoring and Observability

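Because producers and consumers never call each other directly, a stalled or failing consumer does not show up as a failed request. Event-driven systems therefore need monitoring built around the event flow itself: how many events of each type were processed, how long processing took, how often it failed, and when each event type was last seen.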

// run
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Event and EventHandler are minimal definitions so this example compiles
// on its own; they only include what the monitoring code needs.
type Event interface {
	EventType() string
}

type EventHandler func(ctx context.Context, event Event) error

// EventMetrics collects per-event-type counters and timings.
// processingTimes grows without bound, which is fine for a demo only.
type EventMetrics struct {
	mu                sync.RWMutex
	eventCounts       map[string]int64
	processingTimes   map[string][]time.Duration
	failureCounts     map[string]int64
	lastProcessedTime map[string]time.Time
}

func NewEventMetrics() *EventMetrics {
	return &EventMetrics{
		eventCounts:       make(map[string]int64),
		processingTimes:   make(map[string][]time.Duration),
		failureCounts:     make(map[string]int64),
		lastProcessedTime: make(map[string]time.Time),
	}
}

func (m *EventMetrics) RecordEvent(eventType string, duration time.Duration, success bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.eventCounts[eventType]++
	m.processingTimes[eventType] = append(m.processingTimes[eventType], duration)
	m.lastProcessedTime[eventType] = time.Now()

	if !success {
		m.failureCounts[eventType]++
	}
}

func (m *EventMetrics) GetStats(eventType string) map[string]interface{} {
	m.mu.RLock()
	defer m.mu.RUnlock()

	times := m.processingTimes[eventType]
	if len(times) == 0 {
		return map[string]interface{}{
			"count":          int64(0),
			"failures":       int64(0),
			"avg_time":       "0ms",
			"last_processed": "never",
		}
	}

	var total time.Duration
	for _, d := range times {
		total += d
	}
	avg := total / time.Duration(len(times))

	return map[string]interface{}{
		"count":          m.eventCounts[eventType],
		"failures":       m.failureCounts[eventType],
		"avg_time":       avg.String(),
		"last_processed": m.lastProcessedTime[eventType].Format(time.RFC3339),
	}
}

// WithMetrics wraps a handler so every call is timed and recorded
func WithMetrics(metrics *EventMetrics, handler EventHandler) EventHandler {
	return func(ctx context.Context, event Event) error {
		start := time.Now()
		err := handler(ctx, event)
		duration := time.Since(start)

		metrics.RecordEvent(event.EventType(), duration, err == nil)

		if err != nil {
			fmt.Printf("❌ Event processing failed: %s (took %v)\n", event.EventType(), duration)
		}

		return err
	}
}

// HealthChecker derives a health status per event type from the metrics
type HealthChecker struct {
	metrics *EventMetrics
}

func NewHealthChecker(metrics *EventMetrics) *HealthChecker {
	return &HealthChecker{metrics: metrics}
}

func (h *HealthChecker) CheckHealth() map[string]string {
	h.metrics.mu.RLock()
	defer h.metrics.mu.RUnlock()

	health := make(map[string]string)

	for eventType, lastTime := range h.metrics.lastProcessedTime {
		timeSince := time.Since(lastTime)

		if timeSince > 5*time.Minute {
			health[eventType] = "unhealthy: no events in 5 minutes"
		} else {
			// eventCounts is at least 1 for any type that has a lastProcessedTime
			failureRate := float64(h.metrics.failureCounts[eventType]) / float64(h.metrics.eventCounts[eventType])
			if failureRate > 0.1 {
				health[eventType] = fmt.Sprintf("degraded: %.1f%% failure rate", failureRate*100)
			} else {
				health[eventType] = "healthy"
			}
		}
	}

	return health
}

func main() {
	fmt.Println("Event-Driven System Monitoring")
	fmt.Println("===============================")
	fmt.Println()

	metrics := NewEventMetrics()
	healthChecker := NewHealthChecker(metrics)

	// Simulate event processing
	eventTypes := []string{"OrderCreated", "OrderPaid", "OrderShipped"}

	for i := 0; i < 100; i++ {
		eventType := eventTypes[i%len(eventTypes)]

		// Simulate processing
		start := time.Now()
		success := i%10 != 0 // 10% failure rate overall
		time.Sleep(time.Duration(10+i%50) * time.Millisecond)
		duration := time.Since(start)

		metrics.RecordEvent(eventType, duration, success)
	}

	// Display metrics
	fmt.Println("Event Processing Statistics:")
	for _, eventType := range eventTypes {
		stats := metrics.GetStats(eventType)
		fmt.Printf("\n%s:\n", eventType)
		for key, value := range stats {
			fmt.Printf("  %s: %v\n", key, value)
		}
	}

	// Health check, printed in a fixed order
	fmt.Println("\nSystem Health:")
	health := healthChecker.CheckHealth()
	for _, eventType := range eventTypes {
		fmt.Printf("  %s: %s\n", eventType, health[eventType])
	}
}

Practice Exercises

Exercise 1: Event Sourced Banking System

🎯 Learning Objectives:

  • Understand event sourcing fundamentals and implementation patterns
  • Design domain events that capture business state changes
  • Build event stores with optimistic concurrency control
  • Implement aggregate rebuilding from event streams
  • Create business invariants through event validation

⏱️ Time Estimate: 120-150 minutes
📊 Difficulty: Intermediate
🌍 Real-World Context: A financial services company needs a banking system with complete audit trails and regulatory compliance. Event sourcing provides immutable transaction history and enables temporal queries for business analysis and regulatory reporting.

Task: Build a comprehensive event-sourced banking system that demonstrates core event sourcing concepts while handling real-world banking scenarios and business rules.

Core Event Sourcing Components:

  • Domain Events: AccountOpened, MoneyDeposited, MoneyWithdrawn, AccountClosed, OverdraftOccurred
  • Event Store: Optimistic concurrency with version checking and persistence
  • Aggregate Root: Account entity with business logic and event application
  • Event Rebuilding: State reconstruction from event history
  • Snapshot Strategy: Performance optimization for long-lived aggregates

Business Rules to Implement:

  • Account opening validation and minimum balance requirements
  • Overdraft protection with configurable limits
  • Daily transaction limits and fraud detection
  • Account closure with balance validation
  • Interest calculation and fee application events
  • Account suspension and reactivation workflows

Advanced Features:

  • Event versioning and schema evolution
  • Temporal queries for account history at any point in time
  • Event replay for bug fixes and business logic updates
  • Integration with external systems through event publishing
  • Audit trail and compliance reporting capabilities
  • Performance monitoring and optimization strategies

Complete Solution

// run
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Event interface
type Event interface {
	EventType() string
	AggregateID() string
	EventVersion() int
	OccurredAt() time.Time
}

// Base event
type BaseEvent struct {
	Type      string
	ID        string
	Version   int
	Timestamp time.Time
}

func (e BaseEvent) EventType() string     { return e.Type }
func (e BaseEvent) AggregateID() string   { return e.ID }
func (e BaseEvent) EventVersion() int     { return e.Version }
func (e BaseEvent) OccurredAt() time.Time { return e.Timestamp }

// Account events
type AccountOpenedEvent struct {
	BaseEvent
	AccountHolder  string
	InitialBalance float64
}

type MoneyDepositedEvent struct {
	BaseEvent
	Amount      float64
	Description string
}

type MoneyWithdrawnEvent struct {
	BaseEvent
	Amount      float64
	Description string
}

type AccountClosedEvent struct {
	BaseEvent
	Reason string
}

// Account aggregate. float64 keeps the example short; real systems usually
// store money as integer minor units to avoid rounding errors.
type Account struct {
	ID            string
	AccountHolder string
	Balance       float64
	Status        string
	Version       int
	events        []Event // uncommitted events
}

// Create new account
func OpenAccount(id, holder string, initialBalance float64) (*Account, error) {
	if holder == "" {
		return nil, errors.New("account holder required")
	}
	if initialBalance < 0 {
		return nil, errors.New("initial balance cannot be negative")
	}

	account := &Account{ID: id, Version: 0}

	event := AccountOpenedEvent{
		BaseEvent: BaseEvent{
			Type:      "AccountOpened",
			ID:        id,
			Version:   1,
			Timestamp: time.Now(),
		},
		AccountHolder:  holder,
		InitialBalance: initialBalance,
	}

	account.Apply(event)
	account.RecordEvent(event)

	return account, nil
}

// Deposit money
func (a *Account) Deposit(amount float64, description string) error {
	if a.Status != "active" {
		return errors.New("account is not active")
	}
	if amount <= 0 {
		return errors.New("amount must be positive")
	}

	event := MoneyDepositedEvent{
		BaseEvent: BaseEvent{
			Type:      "MoneyDeposited",
			ID:        a.ID,
			Version:   a.Version + 1,
			Timestamp: time.Now(),
		},
		Amount:      amount,
		Description: description,
	}

	a.Apply(event)
	a.RecordEvent(event)

	return nil
}

// Withdraw money
func (a *Account) Withdraw(amount float64, description string) error {
	if a.Status != "active" {
		return errors.New("account is not active")
	}
	if amount <= 0 {
		return errors.New("amount must be positive")
	}
	if a.Balance < amount {
		return errors.New("insufficient funds")
	}

	event := MoneyWithdrawnEvent{
		BaseEvent: BaseEvent{
			Type:      "MoneyWithdrawn",
			ID:        a.ID,
			Version:   a.Version + 1,
			Timestamp: time.Now(),
		},
		Amount:      amount,
		Description: description,
	}

	a.Apply(event)
	a.RecordEvent(event)

	return nil
}

// Close account
func (a *Account) Close(reason string) error {
	if a.Status != "active" {
		return errors.New("account is not active")
	}
	if a.Balance > 0 {
		return errors.New("cannot close account with positive balance")
	}

	event := AccountClosedEvent{
		BaseEvent: BaseEvent{
			Type:      "AccountClosed",
			ID:        a.ID,
			Version:   a.Version + 1,
			Timestamp: time.Now(),
		},
		Reason: reason,
	}

	a.Apply(event)
	a.RecordEvent(event)

	return nil
}

// Apply updates state from an event. It must not validate or fail:
// events are facts that already happened.
func (a *Account) Apply(event Event) {
	switch e := event.(type) {
	case AccountOpenedEvent:
		a.ID = e.ID // needed so an account rebuilt from events knows its ID
		a.AccountHolder = e.AccountHolder
		a.Balance = e.InitialBalance
		a.Status = "active"
		a.Version = e.Version

	case MoneyDepositedEvent:
		a.Balance += e.Amount
		a.Version = e.Version

	case MoneyWithdrawnEvent:
		a.Balance -= e.Amount
		a.Version = e.Version

	case AccountClosedEvent:
		a.Status = "closed"
		a.Version = e.Version
	}
}

func (a *Account) RecordEvent(event Event) {
	a.events = append(a.events, event)
}

func (a *Account) GetUncommittedEvents() []Event {
	return a.events
}

func (a *Account) ClearUncommittedEvents() {
	a.events = nil
}

// Rebuild account from events
func RebuildAccount(events []Event) *Account {
	account := &Account{}
	for _, event := range events {
		account.Apply(event)
	}
	return account
}

// In-memory event store
type InMemoryEventStore struct {
	events map[string][]Event
	mu     sync.RWMutex
}

func NewInMemoryEventStore() *InMemoryEventStore {
	return &InMemoryEventStore{
		events: make(map[string][]Event),
	}
}

// SaveEvents appends events only if the stream is still at expectedVersion
// (optimistic concurrency control)
func (s *InMemoryEventStore) SaveEvents(aggregateID string, events []Event, expectedVersion int) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	currentEvents := s.events[aggregateID]
	if len(currentEvents) != expectedVersion {
		return errors.New("concurrency conflict")
	}

	s.events[aggregateID] = append(currentEvents, events...)
	return nil
}

// GetEvents returns a copy so callers cannot modify the stored stream
func (s *InMemoryEventStore) GetEvents(aggregateID string) []Event {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]Event(nil), s.events[aggregateID]...)
}

func main() {
	fmt.Println("Event Sourcing: Banking Account")
	fmt.Println("================================")
	fmt.Println()

	store := NewInMemoryEventStore()

	// save persists the uncommitted events. The expected version is the
	// aggregate version before those events were applied.
	save := func(a *Account) error {
		pending := a.GetUncommittedEvents()
		if err := store.SaveEvents(a.ID, pending, a.Version-len(pending)); err != nil {
			return err
		}
		a.ClearUncommittedEvents()
		return nil
	}

	// Open account
	account, err := OpenAccount("acc-123", "John Doe", 1000.0)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("✓ Account opened: %s (holder=%s, balance=$%.2f)\n",
		account.ID, account.AccountHolder, account.Balance)
	if err := save(account); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Deposit
	if err := account.Deposit(500.0, "Salary"); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("✓ Deposited $500.00 (new balance=$%.2f)\n", account.Balance)
	if err := save(account); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Withdraw
	if err := account.Withdraw(200.0, "ATM withdrawal"); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}
	fmt.Printf("✓ Withdrew $200.00 (new balance=$%.2f)\n", account.Balance)
	if err := save(account); err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	// Get all events for the account
	events := store.GetEvents("acc-123")
	fmt.Printf("\nEvent History (%d events):\n", len(events))
	for i, evt := range events {
		fmt.Printf("  %d. %s (v%d) at %s\n",
			i+1, evt.EventType(), evt.EventVersion(),
			evt.OccurredAt().Format("15:04:05"))
	}

	// Rebuild account from events
	fmt.Println("\nRebuilding account from events...")
	rebuilt := RebuildAccount(events)
	fmt.Printf("✓ Rebuilt: %s (%s), Balance: $%.2f, Status: %s, Version: %d\n",
		rebuilt.ID, rebuilt.AccountHolder, rebuilt.Balance, rebuilt.Status, rebuilt.Version)

	// Test overdraft protection
	fmt.Println("\nTesting overdraft protection...")
	err = rebuilt.Withdraw(5000.0, "Large withdrawal")
	if err != nil {
		fmt.Printf("✗ Withdrawal blocked: %v\n", err)
	}
}

Exercise 2: CQRS Order Management System

🎯 Learning Objectives:

  • Design separate command and query models for different performance requirements
  • Implement event-driven projections for real-time read model updates
  • Create optimized read models for different query patterns
  • Understand eventual consistency and trade-offs in CQRS systems
  • Build scalable architectures with read/write separation

⏱️ Time Estimate: 120-150 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: An e-commerce platform experiences high read-to-write ratios during sales events. CQRS enables independent scaling of command and query sides while providing optimized read models for different customer journeys and business reporting needs.

Task: Build a comprehensive order management system using the CQRS pattern that demonstrates read/write model separation, event-driven projections, and optimized query handling for different use cases.

Command Side Components:

  • Command Handlers: CreateOrder, UpdateOrder, CancelOrder, AddOrderItem, RemoveOrderItem
  • Aggregate Root: Order entity with business logic and state transitions
  • Event Publishing: Domain events for projection updates
  • Validation Framework: Business rules and command validation
  • Command Bus: Asynchronous command processing with retries

Query Side Components:

  • Read Models: OrderListView, OrderDetailsView, OrderStatistics, CustomerOrderHistory
  • Projections: Event handlers that update read models
  • Query Handlers: Optimized queries for different read models
  • Materialized Views: Pre-computed data for common queries
  • Caching Layer: Performance optimization for frequent queries

Advanced Features:

  • Event versioning and schema evolution for read models
  • Snapshot strategies for large datasets
  • Query optimization with indexing strategies
  • Real-time updates through WebSockets
  • Multi-tenant data isolation
  • Performance monitoring and query optimization
  • Integration with search engines for full-text search
  • Data consistency validation and repair mechanisms

Complete Solution

  1// run
  2package main
  3
  4import (
  5	"context"
  6	"encoding/json"
  7	"fmt"
  8	"sync"
  9	"time"
 10)
 11
 12// Events
 13type Event interface {
 14	EventType() string
 15	AggregateID() string
 16}
 17
 18type OrderCreatedEvent struct {
 19	OrderID    string
 20	CustomerID string
 21	Items      []OrderItem
 22	Total      float64
 23	Timestamp  time.Time
 24}
 25
 26func (e OrderCreatedEvent) EventType() string   { return "OrderCreated" }
 27func (e OrderCreatedEvent) AggregateID() string { return e.OrderID }
 28
 29type OrderUpdatedEvent struct {
 30	OrderID   string
 31	Items     []OrderItem
 32	Total     float64
 33	Timestamp time.Time
 34}
 35
 36func (e OrderUpdatedEvent) EventType() string   { return "OrderUpdated" }
 37func (e OrderUpdatedEvent) AggregateID() string { return e.OrderID }
 38
 39type OrderCancelledEvent struct {
 40	OrderID   string
 41	Reason    string
 42	Timestamp time.Time
 43}
 44
 45func (e OrderCancelledEvent) EventType() string   { return "OrderCancelled" }
 46func (e OrderCancelledEvent) AggregateID() string { return e.OrderID }
 47
 48// Domain models
 49type OrderItem struct {
 50	ProductID string  `json:"product_id"`
 51	Quantity  int     `json:"quantity"`
 52	Price     float64 `json:"price"`
 53}
 54
 55type Order struct {
 56	ID         string
 57	CustomerID string
 58	Items      []OrderItem
 59	Total      float64
 60	Status     string
 61	CreatedAt  time.Time
 62	events     []Event
 63}
 64
 65// Commands
 66type CreateOrderCommand struct {
 67	OrderID    string
 68	CustomerID string
 69	Items      []OrderItem
 70}
 71
 72type UpdateOrderCommand struct {
 73	OrderID string
 74	Items   []OrderItem
 75}
 76
 77type CancelOrderCommand struct {
 78	OrderID string
 79	Reason  string
 80}
 81
 82// Command Handler
 83type OrderCommandHandler struct {
 84	eventStore EventStore
 85	eventBus   EventBus
 86}
 87
 88func NewOrderCommandHandler(store EventStore, bus EventBus) *OrderCommandHandler {
 89	return &OrderCommandHandler{
 90		eventStore: store,
 91		eventBus:   bus,
 92	}
 93}
 94
 95func (h *OrderCommandHandler) CreateOrder(cmd CreateOrderCommand) error {
 96	// Calculate total
 97	var total float64
 98	for _, item := range cmd.Items {
 99		total += item.Price * float64(item.Quantity)
100	}
101
102	// Create event
103	event := OrderCreatedEvent{
104		OrderID:    cmd.OrderID,
105		CustomerID: cmd.CustomerID,
106		Items:      cmd.Items,
107		Total:      total,
108		Timestamp:  time.Now(),
109	}
110
111	// Save to event store
112	if err := h.eventStore.Save(event); err != nil {
113		return err
114	}
115
116	// Publish to event bus
117	h.eventBus.Publish(event)
118
119	return nil
120}
121
122func (h *OrderCommandHandler) UpdateOrder(cmd UpdateOrderCommand) error {
123	var total float64
124	for _, item := range cmd.Items {
125		total += item.Price * float64(item.Quantity)
126	}
127
128	event := OrderUpdatedEvent{
129		OrderID:   cmd.OrderID,
130		Items:     cmd.Items,
131		Total:     total,
132		Timestamp: time.Now(),
133	}
134
135	if err := h.eventStore.Save(event); err != nil {
136		return err
137	}
138
139	h.eventBus.Publish(event)
140	return nil
141}
142
143func (h *OrderCommandHandler) CancelOrder(cmd CancelOrderCommand) error {
144	event := OrderCancelledEvent{
145		OrderID:   cmd.OrderID,
146		Reason:    cmd.Reason,
147		Timestamp: time.Now(),
148	}
149
150	if err := h.eventStore.Save(event); err != nil {
151		return err
152	}
153
154	h.eventBus.Publish(event)
155	return nil
156}
157
158// Read Models
159type OrderListView struct {
160	OrderID    string    `json:"order_id"`
161	CustomerID string    `json:"customer_id"`
162	Total      float64   `json:"total"`
163	Status     string    `json:"status"`
164	CreatedAt  time.Time `json:"created_at"`
165}
166
167type OrderDetailsView struct {
168	OrderID    string      `json:"order_id"`
169	CustomerID string      `json:"customer_id"`
170	Items      []OrderItem `json:"items"`
171	Total      float64     `json:"total"`
172	Status     string      `json:"status"`
173	CreatedAt  time.Time   `json:"created_at"`
174	UpdatedAt  time.Time   `json:"updated_at"`
175}
176
177type OrderStatistics struct {
178	TotalOrders     int     `json:"total_orders"`
179	ActiveOrders    int     `json:"active_orders"`
180	CancelledOrders int     `json:"cancelled_orders"`
181	TotalRevenue    float64 `json:"total_revenue"`
182}
183
184// Query Handler
185type OrderQueryHandler struct {
186	listStore   map[string]*OrderListView
187	detailStore map[string]*OrderDetailsView
188	stats       *OrderStatistics
189	mu          sync.RWMutex
190}
191
192func NewOrderQueryHandler() *OrderQueryHandler {
193	return &OrderQueryHandler{
194		listStore:   make(map[string]*OrderListView),
195		detailStore: make(map[string]*OrderDetailsView),
196		stats:       &OrderStatistics{},
197	}
198}
199
200func (q *OrderQueryHandler) GetOrderList() []*OrderListView {
201	q.mu.RLock()
202	defer q.mu.RUnlock()
203
204	list := make([]*OrderListView, 0, len(q.listStore))
205	for _, view := range q.listStore {
206		list = append(list, view)
207	}
208	return list
209}
210
211func (q *OrderQueryHandler) GetOrderDetails(orderID string) *OrderDetailsView {
212	q.mu.RLock()
213	defer q.mu.RUnlock()
214	return q.detailStore[orderID]
215}
216
217func (q *OrderQueryHandler) GetStatistics() *OrderStatistics {
218	q.mu.RLock()
219	defer q.mu.RUnlock()
220	return q.stats
221}
222
223// Projection
224type OrderProjection struct {
225	queryHandler *OrderQueryHandler
226}
227
228func NewOrderProjection(handler *OrderQueryHandler) *OrderProjection {
229	return &OrderProjection{queryHandler: handler}
230}
231
232func (p *OrderProjection) HandleEvent(event Event) {
233	p.queryHandler.mu.Lock()
234	defer p.queryHandler.mu.Unlock()
235
236	switch e := event.(type) {
237	case OrderCreatedEvent:
238		// Update list view
239		p.queryHandler.listStore[e.OrderID] = &OrderListView{
240			OrderID:    e.OrderID,
241			CustomerID: e.CustomerID,
242			Total:      e.Total,
243			Status:     "active",
244			CreatedAt:  e.Timestamp,
245		}
246
247		// Update details view
248		p.queryHandler.detailStore[e.OrderID] = &OrderDetailsView{
249			OrderID:    e.OrderID,
250			CustomerID: e.CustomerID,
251			Items:      e.Items,
252			Total:      e.Total,
253			Status:     "active",
254			CreatedAt:  e.Timestamp,
255			UpdatedAt:  e.Timestamp,
256		}
257
258		// Update statistics
259		p.queryHandler.stats.TotalOrders++
260		p.queryHandler.stats.ActiveOrders++
261		p.queryHandler.stats.TotalRevenue += e.Total
262
263	case OrderUpdatedEvent:
264		if details, ok := p.queryHandler.detailStore[e.OrderID]; ok {
265			oldTotal := details.Total
266			details.Items = e.Items
267			details.Total = e.Total
268			details.UpdatedAt = e.Timestamp
269
270			// Update list view
271			if list, ok := p.queryHandler.listStore[e.OrderID]; ok {
272				list.Total = e.Total
273			}
274
275			// Update statistics
276			p.queryHandler.stats.TotalRevenue += (e.Total - oldTotal)
277		}
278
279	case OrderCancelledEvent:
280		if details, ok := p.queryHandler.detailStore[e.OrderID]; ok {
281			details.Status = "cancelled"
282			details.UpdatedAt = e.Timestamp
283
284			// Update list view
285			if list, ok := p.queryHandler.listStore[e.OrderID]; ok {
286				list.Status = "cancelled"
287			}
288
289			// Update statistics
290			p.queryHandler.stats.ActiveOrders--
291			p.queryHandler.stats.CancelledOrders++
292			p.queryHandler.stats.TotalRevenue -= details.Total
293		}
294	}
295}
296
297// Infrastructure
298type EventStore interface {
299	Save(event Event) error
300	GetAll() []Event
301}
302
303type InMemoryEventStore struct {
304	events []Event
305	mu     sync.Mutex
306}
307
308func NewInMemoryEventStore() *InMemoryEventStore {
309	return &InMemoryEventStore{events: make([]Event, 0)}
310}
311
312func (s *InMemoryEventStore) Save(event Event) error {
313	s.mu.Lock()
314	defer s.mu.Unlock()
315	s.events = append(s.events, event)
316	return nil
317}
318
319func (s *InMemoryEventStore) GetAll() []Event {
320	s.mu.Lock()
321	defer s.mu.Unlock()
	return s.events
}

// EventBus delivers published events to every subscribed handler.
type EventBus interface {
	Publish(event Event)
	Subscribe(handler func(Event))
}

// InMemoryEventBus is a process-local EventBus guarded by a read-write mutex.
type InMemoryEventBus struct {
	handlers []func(Event)
	mu       sync.RWMutex
}

func NewInMemoryEventBus() *InMemoryEventBus {
	return &InMemoryEventBus{handlers: make([]func(Event), 0)}
}

// Publish runs each handler in its own goroutine, so delivery is asynchronous
// and events published close together may be handled out of order.
func (b *InMemoryEventBus) Publish(event Event) {
	b.mu.RLock()
	defer b.mu.RUnlock()

	for _, handler := range b.handlers {
		go handler(event)
	}
}

// Subscribe registers a handler that receives every subsequently published event.
func (b *InMemoryEventBus) Subscribe(handler func(Event)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers = append(b.handlers, handler)
}

func main() {
	fmt.Println("CQRS Pattern: Order Management")
	fmt.Println("===============================")
	fmt.Println()

	// Setup
	eventStore := NewInMemoryEventStore()
	eventBus := NewInMemoryEventBus()
	commandHandler := NewOrderCommandHandler(eventStore, eventBus)
	queryHandler := NewOrderQueryHandler()
	projection := NewOrderProjection(queryHandler)

	// Subscribe projection to events
	eventBus.Subscribe(projection.HandleEvent)

	// Command: Create orders
	fmt.Println("Creating orders...")
	commandHandler.CreateOrder(CreateOrderCommand{
		OrderID:    "order-1",
		CustomerID: "customer-1",
		Items: []OrderItem{
			{ProductID: "prod-1", Quantity: 2, Price: 29.99},
			{ProductID: "prod-2", Quantity: 1, Price: 49.99},
		},
	})

	commandHandler.CreateOrder(CreateOrderCommand{
		OrderID:    "order-2",
		CustomerID: "customer-2",
		Items: []OrderItem{
			{ProductID: "prod-3", Quantity: 1, Price: 99.99},
		},
	})

	// The read model is eventually consistent: handlers run asynchronously,
	// so this demo pauses before querying. Sleeping is a demo shortcut only.
	time.Sleep(100 * time.Millisecond)

	// Query: List orders
	fmt.Println("\nOrder List:")
	for _, order := range queryHandler.GetOrderList() {
		fmt.Printf("  %s: Customer=%s, Total=$%.2f, Status=%s\n",
			order.OrderID, order.CustomerID, order.Total, order.Status)
	}

	// Query: Order details
	fmt.Println("\nOrder Details:")
	details := queryHandler.GetOrderDetails("order-1")
	if details != nil {
		if data, err := json.MarshalIndent(details, "", "  "); err != nil {
			fmt.Println("  failed to marshal order details:", err)
		} else {
			fmt.Println(string(data))
		}
	}

	// Command: Update order
	fmt.Println("\nUpdating order-1...")
	commandHandler.UpdateOrder(UpdateOrderCommand{
		OrderID: "order-1",
		Items: []OrderItem{
			{ProductID: "prod-1", Quantity: 3, Price: 29.99},
		},
	})

	time.Sleep(100 * time.Millisecond) // Let the projection apply the update

	// Command: Cancel order
	fmt.Println("\nCancelling order-2...")
	commandHandler.CancelOrder(CancelOrderCommand{
		OrderID: "order-2",
		Reason:  "Customer request",
	})

	time.Sleep(100 * time.Millisecond) // Let the projection apply the cancellation

	// Query: Statistics
	fmt.Println("\nOrder Statistics:")
	stats := queryHandler.GetStatistics()
	if data, err := json.MarshalIndent(stats, "", "  "); err != nil {
		fmt.Println("  failed to marshal statistics:", err)
	} else {
		fmt.Println(string(data))
	}

	// Show event stream
	fmt.Println("\nEvent Stream:")
	for i, event := range eventStore.GetAll() {
		fmt.Printf("  %d. %s (aggregate: %s)\n", i+1, event.EventType(), event.AggregateID())
	}
}
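
The listing above pauses with time.Sleep so that asynchronous handlers can finish before the read model is queried. One alternative is to let the bus track in-flight handlers. The following is a minimal sketch, not the article's implementation: WaitingBus, Wait, and countEvents are hypothetical names, and Event is reduced to a string so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is simplified to a string for this sketch (assumption);
// the article's Event is an interface.
type Event string

// WaitingBus is a hypothetical variant of InMemoryEventBus that
// counts handlers that are still running.
type WaitingBus struct {
	mu       sync.RWMutex
	handlers []func(Event)
	inFlight sync.WaitGroup
}

func (b *WaitingBus) Subscribe(h func(Event)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers = append(b.handlers, h)
}

// Publish starts one goroutine per handler and registers it with the WaitGroup.
func (b *WaitingBus) Publish(e Event) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, h := range b.handlers {
		b.inFlight.Add(1)
		go func(h func(Event)) {
			defer b.inFlight.Done()
			h(e)
		}(h)
	}
}

// Wait blocks until every handler started by earlier Publish calls has returned.
func (b *WaitingBus) Wait() { b.inFlight.Wait() }

// countEvents publishes n events and reports how many the single handler saw.
func countEvents(n int) int {
	bus := &WaitingBus{}
	var mu sync.Mutex
	seen := 0
	bus.Subscribe(func(Event) {
		mu.Lock()
		seen++
		mu.Unlock()
	})
	for i := 0; i < n; i++ {
		bus.Publish(Event(fmt.Sprintf("event-%d", i)))
	}
	bus.Wait() // deterministic replacement for time.Sleep
	return seen
}

func main() {
	fmt.Println("handled:", countEvents(5))
}
```

Wait only makes the demo deterministic. Handlers still run concurrently, so this sketch gives no ordering guarantee between events.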

Exercise 3: Distributed Saga for Order Fulfillment

🎯 Learning Objectives:

  • Implement distributed transaction management using the saga pattern
  • Design compensation actions for complex business workflows
  • Build saga orchestrators with state persistence and recovery
  • Handle timeouts, retries, and failure scenarios gracefully
  • Coordinate multiple services in distributed systems

⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: An e-commerce platform needs to coordinate order fulfillment across inventory, payment, shipping, and notification services. Distributed sagas ensure data consistency across services without using distributed transactions, which don't scale well in microservices architectures.

Task: Create a comprehensive saga orchestration system for order fulfillment that coordinates multiple services while ensuring data consistency and providing reliable failure recovery mechanisms.

Core Saga Components:

  • Saga Orchestrator: Central coordinator managing workflow state and execution
  • Saga Steps: Inventory reservation, payment processing, order shipping, notification delivery
  • Compensation Actions: Inventory release, payment refund, shipment cancellation
  • State Persistence: Reliable saga state storage with crash recovery
  • Timeout Management: Automatic timeout handling for long-running operations

Orchestration Workflow:

  1. Order Validation: Validate order data and check customer eligibility
  2. Inventory Reservation: Reserve products and lock stock levels
  3. Payment Processing: Charge customer payment method
  4. Order Creation: Create official order record with tracking
  5. Shipping Arrangement: Schedule shipment and generate tracking numbers
  6. Notification Dispatch: Send order confirmation and tracking updates
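
As a starting point for the workflow above, an orchestrator can be sketched as a list of steps, each paired with the compensation that undoes it. This is a simplified illustration under stated assumptions: Step, RunSaga, and demoSaga are hypothetical names, the service calls are stubbed with functions, and state persistence and timeouts are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// Step pairs a forward action with the compensation that undoes it.
// Compensate may be nil for steps with nothing to undo.
type Step struct {
	Name       string
	Execute    func() error
	Compensate func() error
}

// RunSaga executes steps in order. When a step fails, it compensates the
// already completed steps in reverse order and returns a trace of what happened.
func RunSaga(steps []Step) (trace []string, err error) {
	for i, step := range steps {
		if err = step.Execute(); err != nil {
			trace = append(trace, "failed:"+step.Name)
			for j := i - 1; j >= 0; j-- {
				if steps[j].Compensate == nil {
					continue
				}
				if cerr := steps[j].Compensate(); cerr != nil {
					// A real orchestrator would retry or flag this for manual intervention.
					trace = append(trace, "compensation-failed:"+steps[j].Name)
					continue
				}
				trace = append(trace, "compensated:"+steps[j].Name)
			}
			return trace, fmt.Errorf("saga aborted at %s: %w", step.Name, err)
		}
		trace = append(trace, "done:"+step.Name)
	}
	return trace, nil
}

func succeed() error { return nil }

// demoSaga runs a shortened order workflow; the payment step fails on request.
func demoSaga(paymentFails bool) ([]string, error) {
	steps := []Step{
		{Name: "validate-order", Execute: succeed},
		{Name: "reserve-inventory", Execute: succeed, Compensate: succeed},
		{Name: "process-payment", Execute: func() error {
			if paymentFails {
				return errors.New("card declined")
			}
			return nil
		}, Compensate: succeed},
		{Name: "arrange-shipping", Execute: succeed, Compensate: succeed},
	}
	return RunSaga(steps)
}

func main() {
	trace, err := demoSaga(true)
	for _, entry := range trace {
		fmt.Println(entry)
	}
	fmt.Println("error:", err)
}
```

Compensating in reverse order mirrors how the forward steps built on each other: the inventory reservation is released only after every later step has been undone.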

Failure Scenarios and Recovery:

  • Payment gateway timeouts and retry logic
  • Inventory out-of-stock after reservation
  • Shipping service unavailability
  • Notification system failures
  • Network partitions and service unavailability
  • Database connection issues and consistency problems
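
Several of these scenarios, such as gateway timeouts, come down to retrying a call with a deadline. The sketch below shows one way to do that with the standard context package. Retry, flaky, and demoRetry are hypothetical helpers, and the timeout and backoff values are arbitrary choices for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Retry calls op up to attempts times. Each call gets its own timeout, and the
// wait between attempts doubles. It returns the number of calls made.
func Retry(ctx context.Context, attempts int, perCall, backoff time.Duration,
	op func(context.Context) error) (int, error) {
	var err error
	for i := 1; i <= attempts; i++ {
		callCtx, cancel := context.WithTimeout(ctx, perCall)
		err = op(callCtx)
		cancel()
		if err == nil {
			return i, nil
		}
		if i == attempts {
			break
		}
		select {
		case <-time.After(backoff):
			backoff *= 2
		case <-ctx.Done():
			return i, ctx.Err()
		}
	}
	return attempts, fmt.Errorf("gave up after %d attempts: %w", attempts, err)
}

// flaky returns an operation that fails its first n calls, standing in for
// an unreliable payment gateway.
func flaky(n int) func(context.Context) error {
	calls := 0
	return func(_ context.Context) error {
		calls++
		if calls <= n {
			return errors.New("gateway timeout")
		}
		return nil
	}
}

// demoRetry wires flaky into Retry with short durations suitable for a demo.
func demoRetry(failures, attempts int) (int, error) {
	return Retry(context.Background(), attempts, 50*time.Millisecond, time.Millisecond, flaky(failures))
}

func main() {
	calls, err := demoRetry(2, 5)
	fmt.Println("calls:", calls, "error:", err)
}
```

Retrying is only safe when the remote operation is idempotent. A payment call, for example, would typically carry an idempotency key so that a retried charge is not applied twice.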

Advanced Patterns:

  • Event-driven saga choreography vs orchestration
  • Saga monitoring and observability dashboards
  • Manual intervention for failed sagas
  • Performance optimization with parallel execution
  • Integration with external monitoring systems
  • Saga analytics and business intelligence

Complete Solution

See the saga implementation in the Production Patterns section above.

Exercise 4: Event Store with Replay Capabilities

🎯 Learning Objectives:

  • Build a production-ready event store with persistence
  • Implement event replay for debugging and migration
  • Create event versioning strategies
  • Handle concurrent writes with optimistic locking
  • Design efficient event streaming

⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: Financial systems require complete audit trails with the ability to replay events for regulatory compliance and bug investigation.

Task: Implement a robust event store with a PostgreSQL backend that supports event replay, versioning, and concurrent access.

Requirements:

  • Append-only event log with versioning
  • Optimistic concurrency control
  • Event replay by aggregate or time range
  • Snapshot support for performance
  • Event stream pagination
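
Before adding PostgreSQL, the first three requirements can be prototyped in memory. The sketch below is an assumption-laden illustration: VersionedStore, StoredEvent, and ErrVersionConflict are hypothetical names, and a mutex stands in for what a database would usually enforce with a uniqueness constraint on the aggregate ID and version.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrVersionConflict signals that another writer appended to the stream first.
var ErrVersionConflict = errors.New("version conflict")

// StoredEvent is a minimal event record for this sketch.
type StoredEvent struct {
	AggregateID string
	Version     int
	Type        string
}

// VersionedStore is an in-memory, append-only store keyed by aggregate ID.
type VersionedStore struct {
	mu      sync.Mutex
	streams map[string][]StoredEvent
}

func NewVersionedStore() *VersionedStore {
	return &VersionedStore{streams: make(map[string][]StoredEvent)}
}

// Append adds events only if the stream is still at expectedVersion,
// which is the optimistic concurrency check.
func (s *VersionedStore) Append(aggregateID string, expectedVersion int, types ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	current := len(s.streams[aggregateID])
	if current != expectedVersion {
		return fmt.Errorf("%w: expected %d, stream is at %d", ErrVersionConflict, expectedVersion, current)
	}
	for i, t := range types {
		s.streams[aggregateID] = append(s.streams[aggregateID],
			StoredEvent{AggregateID: aggregateID, Version: current + i + 1, Type: t})
	}
	return nil
}

// Replay returns a copy of the events with Version greater than fromVersion.
func (s *VersionedStore) Replay(aggregateID string, fromVersion int) []StoredEvent {
	s.mu.Lock()
	defer s.mu.Unlock()
	stream := s.streams[aggregateID]
	if fromVersion < 0 {
		fromVersion = 0
	}
	if fromVersion >= len(stream) {
		return nil
	}
	return append([]StoredEvent(nil), stream[fromVersion:]...)
}

func main() {
	store := NewVersionedStore()
	fmt.Println(store.Append("order-1", 0, "OrderCreated"))
	fmt.Println(store.Append("order-1", 0, "OrderUpdated")) // stale writer
	fmt.Println(store.Replay("order-1", 0))
}
```

A caller that receives ErrVersionConflict would typically reload the aggregate, reapply its command, and try again with the new version. Replaying from a snapshot's version instead of 0 is how snapshots shorten rebuilds.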

Complete Solution

See the PostgreSQL Event Store implementation in the main article content above.

Exercise 5: Multi-View CQRS with Real-Time Updates

🎯 Learning Objectives:

  • Create multiple optimized read models
  • Implement real-time projection updates
  • Design efficient query patterns
  • Handle eventual consistency
  • Build scalable read architectures

⏱️ Time Estimate: 120-150 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: E-commerce platforms need different views for customers, administrators, and analytics - each optimized for specific queries.

Task: Build a CQRS system with multiple read models (list view, detail view, analytics view) that update in real time as events occur.

Requirements:

  • Multiple specialized read models
  • Event-driven projection updates
  • Query optimization for each view
  • Real-time synchronization
  • Performance monitoring
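
The core idea behind the first two requirements is that one event stream feeds several independent projections. The sketch below illustrates this with two views. OrderEvent, ListView, StatsView, and Project are hypothetical names, and amounts are kept in integer cents to avoid floating-point drift.

```go
package main

import "fmt"

// OrderEvent is a minimal event for this sketch (assumption).
type OrderEvent struct {
	Type       string // "OrderCreated" or "OrderCancelled"
	OrderID    string
	TotalCents int
}

// Projection is anything that can update itself from an event.
type Projection interface {
	Apply(e OrderEvent)
}

// ListView answers "what is the status of each order?".
type ListView struct {
	Status map[string]string
}

func (v *ListView) Apply(e OrderEvent) {
	switch e.Type {
	case "OrderCreated":
		v.Status[e.OrderID] = "pending"
	case "OrderCancelled":
		v.Status[e.OrderID] = "cancelled"
	}
}

// StatsView answers aggregate questions without scanning orders.
type StatsView struct {
	Active       int
	Cancelled    int
	RevenueCents int
	totals       map[string]int // per-order totals, needed to undo revenue on cancel
}

func (v *StatsView) Apply(e OrderEvent) {
	switch e.Type {
	case "OrderCreated":
		v.totals[e.OrderID] = e.TotalCents
		v.Active++
		v.RevenueCents += e.TotalCents
	case "OrderCancelled":
		total, known := v.totals[e.OrderID]
		if !known {
			return // unknown or already cancelled order
		}
		delete(v.totals, e.OrderID)
		v.Active--
		v.Cancelled++
		v.RevenueCents -= total
	}
}

// Project feeds every event to every view, in stream order.
func Project(events []OrderEvent, views ...Projection) {
	for _, e := range events {
		for _, v := range views {
			v.Apply(e)
		}
	}
}

// demoViews builds both views from the same three events.
func demoViews() (*ListView, *StatsView) {
	list := &ListView{Status: map[string]string{}}
	stats := &StatsView{totals: map[string]int{}}
	Project([]OrderEvent{
		{Type: "OrderCreated", OrderID: "order-1", TotalCents: 10997},
		{Type: "OrderCreated", OrderID: "order-2", TotalCents: 9999},
		{Type: "OrderCancelled", OrderID: "order-2"},
	}, list, stats)
	return list, stats
}

func main() {
	list, stats := demoViews()
	fmt.Println(list.Status, stats.Active, stats.Cancelled, stats.RevenueCents)
}
```

Because each view only depends on the event stream, a new view can be added later and populated by replaying the stored events.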

Complete Solution

See the Multiple Read Models implementation in the main article content above.

Summary

Key Takeaways

  1. Event Sourcing

    • Store all state changes as events
    • Enable time travel and audit trails
    • Rebuild state by replaying events
    • Use snapshots for performance
  2. CQRS

    • Separate write and read models
    • Optimize each for its purpose
    • Use projections to build read models
    • Handle eventual consistency
  3. Event Store

    • Append-only event log
    • Optimistic concurrency control
    • Support snapshots
    • Efficient event retrieval
  4. Event Bus

    • Publish events to subscribers
    • Support multiple handlers
    • Handle failures gracefully
    • Consider ordering guarantees
  5. Production Readiness

    • Implement idempotency
    • Support event replay
    • Monitor projections
    • Handle schema evolution
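
Idempotency is the item on this list that most often needs code of its own, because most event buses can deliver the same event more than once. A minimal sketch, assuming each event carries a unique ID: IdempotentHandler, Delivery, and demoDedupe are hypothetical names, and the seen set lives in memory here, whereas a production system would persist it together with the state it protects.

```go
package main

import (
	"fmt"
	"sync"
)

// Delivery is one delivery attempt of an event (assumption for this sketch).
type Delivery struct {
	EventID string
	Amount  int
}

// IdempotentHandler applies each event ID at most once.
type IdempotentHandler struct {
	mu     sync.Mutex
	seen   map[string]struct{}
	handle func(Delivery)
}

func NewIdempotentHandler(handle func(Delivery)) *IdempotentHandler {
	return &IdempotentHandler{seen: make(map[string]struct{}), handle: handle}
}

// Handle reports whether the delivery was applied (true) or skipped as a duplicate (false).
func (h *IdempotentHandler) Handle(d Delivery) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, duplicate := h.seen[d.EventID]; duplicate {
		return false
	}
	h.handle(d)
	h.seen[d.EventID] = struct{}{}
	return true
}

// demoDedupe delivers three messages, one of them a redelivery of evt-1.
func demoDedupe() (applied, balance int) {
	h := NewIdempotentHandler(func(d Delivery) { balance += d.Amount })
	for _, d := range []Delivery{
		{EventID: "evt-1", Amount: 100},
		{EventID: "evt-2", Amount: 50},
		{EventID: "evt-1", Amount: 100}, // redelivery
	} {
		if h.Handle(d) {
			applied++
		}
	}
	return applied, balance
}

func main() {
	applied, balance := demoDedupe()
	fmt.Println("applied:", applied, "balance:", balance)
}
```

The same check also makes event replay safe for handlers with side effects, since replayed events carry IDs that have already been recorded.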

Next Steps:

  • Explore advanced testing techniques for event-driven systems
  • Learn about chaos engineering for distributed systems
  • Study production monitoring and observability
  • Investigate cloud-native event streaming platforms

You've now worked through the core patterns of event-driven architecture in Go, and you're ready to apply them to scalable, resilient distributed systems!