Exercise: Saga Orchestrator
Difficulty: Advanced
Learning Objectives
- Understand the saga pattern for distributed transactions
- Implement orchestration-based sagas
- Handle compensation and rollback
- Practice failure recovery
- Build idempotent operations
Problem Statement
Create a saga orchestrator for an e-commerce order-processing system. It coordinates multiple services (inventory, payment, and shipping) and undoes already-completed work when a later step fails.
Implementation
1package saga
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8)
9
// SagaStatus represents the current state of a saga.
type SagaStatus string

// Lifecycle states of a Saga. A saga starts Pending and becomes InProgress
// when executed. It then ends in one of three states:
//   - Completed: every step succeeded.
//   - Compensated: a step failed and every completed step was undone.
//   - Failed: a step failed and a compensation failed as well.
const (
	StatusPending      SagaStatus = "pending"
	StatusInProgress   SagaStatus = "in_progress"
	StatusCompleted    SagaStatus = "completed"
	StatusFailed       SagaStatus = "failed"
	StatusCompensating SagaStatus = "compensating"
	StatusCompensated  SagaStatus = "compensated"
)

// Step is a single unit of work in a saga. It has two parts:
//   - Action performs the work.
//   - Compensation, which may be nil, undoes the work after a later step fails.
//
// Both receive the saga's shared Data map.
type Step struct {
	Name         string
	Action       func(ctx context.Context, data map[string]interface{}) error
	Compensation func(ctx context.Context, data map[string]interface{}) error
}

// Saga coordinates a distributed transaction as an ordered list of steps.
//
// Execute updates Status, CurrentStep, CompletedSteps, StartedAt and
// FinishedAt while holding mu. Data is handed to every Action and
// Compensation as-is; the saga does not lock it.
type Saga struct {
	ID             string
	Steps          []Step
	Status         SagaStatus
	CurrentStep    int
	Data           map[string]interface{}
	CompletedSteps []string

	// StartedAt is set when Execute starts running steps. FinishedAt is set
	// when the saga reaches a terminal status. Both stay zero until then.
	StartedAt  time.Time
	FinishedAt time.Time

	mu sync.Mutex
}

// NewSaga returns a pending saga with the given id, no steps and empty data.
func NewSaga(id string) *Saga {
	return &Saga{
		ID:             id,
		Steps:          make([]Step, 0),
		Status:         StatusPending,
		Data:           make(map[string]interface{}),
		CompletedSteps: make([]string, 0),
	}
}

// AddStep appends step to the saga. Steps run in the order they were added.
func (s *Saga) AddStep(step Step) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.Steps = append(s.Steps, step)
}

// Execute runs the saga's steps in order.
//
// Outcomes:
//   - Every step succeeds: the saga ends in StatusCompleted and Execute
//     returns nil.
//   - A step fails, ctx is already done before a step starts, or a step has
//     no Action: the completed steps are compensated in reverse order. The
//     saga ends in StatusCompensated and the error wraps the step's failure.
//   - A compensation also fails: the saga ends in StatusFailed. The error
//     wraps the compensation error and names the original failure.
//
// A saga runs at most once. Calling Execute on a saga that is not
// StatusPending returns an error without running any step, so side effects
// such as payments are never repeated.
func (s *Saga) Execute(ctx context.Context) error {
	s.mu.Lock()
	if s.Status != StatusPending {
		id, status := s.ID, s.Status
		s.mu.Unlock()
		return fmt.Errorf("saga %s cannot be executed from status %s", id, status)
	}
	s.Status = StatusInProgress
	s.StartedAt = time.Now()
	// Snapshot under the lock so a concurrent AddStep cannot race the loop.
	steps := append([]Step(nil), s.Steps...)
	s.mu.Unlock()

	for i, step := range steps {
		s.mu.Lock()
		s.CurrentStep = i
		s.mu.Unlock()

		if err := s.runStep(ctx, step); err != nil {
			return s.abort(ctx, step.Name, err)
		}

		s.mu.Lock()
		s.CompletedSteps = append(s.CompletedSteps, step.Name)
		s.mu.Unlock()
	}

	s.finish(StatusCompleted)
	return nil
}

// runStep runs step.Action. It reports an error instead when ctx is already
// done or the step has no Action.
func (s *Saga) runStep(ctx context.Context, step Step) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if step.Action == nil {
		return fmt.Errorf("step %s has no action", step.Name)
	}
	return step.Action(ctx, s.Data)
}

// abort rolls back after failedStep failed with cause. It records the
// resulting terminal status and builds the error returned by Execute.
func (s *Saga) abort(ctx context.Context, failedStep string, cause error) error {
	s.mu.Lock()
	s.Status = StatusCompensating
	s.mu.Unlock()

	if compErr := s.compensate(ctx); compErr != nil {
		s.finish(StatusFailed)
		// Wrap the compensation error but keep the failure that triggered
		// the rollback in the message; otherwise it would be lost.
		return fmt.Errorf("saga failed at step %s (%v) and compensation failed: %w", failedStep, cause, compErr)
	}

	s.finish(StatusCompensated)
	return fmt.Errorf("saga failed at step %s: %w", failedStep, cause)
}

// finish records a terminal status and the time it was reached.
func (s *Saga) finish(status SagaStatus) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.Status = status
	s.FinishedAt = time.Now()
}

// compensate runs the Compensation of each completed step, newest first.
// Steps without a Compensation are skipped. It stops at, and returns, the
// first compensation error.
func (s *Saga) compensate(ctx context.Context) error {
	s.mu.Lock()
	completed := append([]string(nil), s.CompletedSteps...)
	steps := append([]Step(nil), s.Steps...)
	s.mu.Unlock()

	for i := len(completed) - 1; i >= 0; i-- {
		name := completed[i]
		step := completedStep(steps, i, name)
		if step == nil || step.Compensation == nil {
			continue
		}
		if err := step.Compensation(ctx, s.Data); err != nil {
			return fmt.Errorf("compensation failed for step %s: %w", name, err)
		}
	}
	return nil
}

// completedStep returns the step that produced the i-th CompletedSteps entry.
//
// Execute records completions in step order, so steps[i] normally matches.
// Matching by position first means steps that share a name are each
// compensated once. The search by name is a fallback for when
// CompletedSteps no longer lines up with Steps.
func completedStep(steps []Step, i int, name string) *Step {
	if i < len(steps) && steps[i].Name == name {
		return &steps[i]
	}
	for j := range steps {
		if steps[j].Name == name {
			return &steps[j]
		}
	}
	return nil
}

128// Orchestrator manages multiple sagas
129type Orchestrator struct {
130 mu sync.RWMutex
131 sagas map[string]*Saga
132}
133
134func NewOrchestrator() *Orchestrator {
135 return &Orchestrator{
136 sagas: make(map[string]*Saga),
137 }
138}
139
140// CreateSaga creates and registers a new saga
141func CreateSaga(id string) *Saga {
142 o.mu.Lock()
143 defer o.mu.Unlock()
144
145 saga := NewSaga(id)
146 o.sagas[id] = saga
147
148 return saga
149}
150
151// ExecuteSaga runs a saga
152func ExecuteSaga(ctx context.Context, id string) error {
153 o.mu.RLock()
154 saga, exists := o.sagas[id]
155 o.mu.RUnlock()
156
157 if !exists {
158 return fmt.Errorf("saga not found: %s", id)
159 }
160
161 return saga.Execute(ctx)
162}
163
164// GetSaga retrieves a saga
165func GetSaga(id string) {
166 o.mu.RLock()
167 defer o.mu.RUnlock()
168
169 saga, exists := o.sagas[id]
170 if !exists {
171 return nil, fmt.Errorf("saga not found: %s", id)
172 }
173
174 return saga, nil
175}
176
// Example: Order Processing Saga

// OrderService simulates the inventory, payment and shipping services that an
// order saga coordinates. It stores only the latest status string per order.
// The zero value is ready to use.
type OrderService struct {
	orders map[string]string // orderID -> latest status
	mu     sync.Mutex
}

// NewOrderService returns an OrderService with no recorded orders.
func NewOrderService() *OrderService {
	return &OrderService{
		orders: make(map[string]string),
	}
}

// setStatus records status as the latest state of orderID.
func (svc *OrderService) setStatus(orderID, status string) {
	svc.mu.Lock()
	defer svc.mu.Unlock()
	if svc.orders == nil {
		svc.orders = make(map[string]string)
	}
	svc.orders[orderID] = status
}

// stringField, intField and floatField read a typed value from saga data.
// They return an error instead of panicking when the key is missing or holds
// a value of another type.
func stringField(data map[string]interface{}, key string) (string, error) {
	v, ok := data[key].(string)
	if !ok {
		return "", fmt.Errorf("saga data %q: want string, got %T", key, data[key])
	}
	return v, nil
}

func intField(data map[string]interface{}, key string) (int, error) {
	v, ok := data[key].(int)
	if !ok {
		return 0, fmt.Errorf("saga data %q: want int, got %T", key, data[key])
	}
	return v, nil
}

func floatField(data map[string]interface{}, key string) (float64, error) {
	v, ok := data[key].(float64)
	if !ok {
		return 0, fmt.Errorf("saga data %q: want float64, got %T", key, data[key])
	}
	return v, nil
}

// ReserveInventory reserves data["quantity"] units of data["product_id"] for
// data["order_id"]. The simulated check rejects quantities above 100.
// ctx is not consulted by this simulation.
func (svc *OrderService) ReserveInventory(ctx context.Context, data map[string]interface{}) error {
	orderID, err := stringField(data, "order_id")
	if err != nil {
		return err
	}
	productID, err := stringField(data, "product_id")
	if err != nil {
		return err
	}
	quantity, err := intField(data, "quantity")
	if err != nil {
		return err
	}

	fmt.Printf("Reserving inventory: order=%s, product=%s, qty=%d\n", orderID, productID, quantity)

	// Simulate inventory check
	if quantity > 100 {
		return fmt.Errorf("insufficient inventory")
	}

	svc.setStatus(orderID, "inventory_reserved")
	return nil
}

// ReleaseInventory is the compensation for ReserveInventory. It marks the
// order's inventory as released.
func (svc *OrderService) ReleaseInventory(ctx context.Context, data map[string]interface{}) error {
	orderID, err := stringField(data, "order_id")
	if err != nil {
		return err
	}
	fmt.Printf("Releasing inventory for order: %s\n", orderID)

	svc.setStatus(orderID, "inventory_released")
	return nil
}

// ProcessPayment charges data["amount"] for data["order_id"]. The simulated
// processor declines amounts above 1000.
func (svc *OrderService) ProcessPayment(ctx context.Context, data map[string]interface{}) error {
	orderID, err := stringField(data, "order_id")
	if err != nil {
		return err
	}
	amount, err := floatField(data, "amount")
	if err != nil {
		return err
	}

	fmt.Printf("Processing payment: order=%s, amount=%.2f\n", orderID, amount)

	// Simulate payment processing
	if amount > 1000 {
		return fmt.Errorf("payment declined")
	}

	svc.setStatus(orderID, "payment_processed")
	return nil
}

// RefundPayment is the compensation for ProcessPayment. It marks the order's
// payment as refunded.
func (svc *OrderService) RefundPayment(ctx context.Context, data map[string]interface{}) error {
	orderID, err := stringField(data, "order_id")
	if err != nil {
		return err
	}
	fmt.Printf("Refunding payment for order: %s\n", orderID)

	svc.setStatus(orderID, "payment_refunded")
	return nil
}

// ArrangeShipping schedules delivery of data["order_id"] to data["address"].
func (svc *OrderService) ArrangeShipping(ctx context.Context, data map[string]interface{}) error {
	orderID, err := stringField(data, "order_id")
	if err != nil {
		return err
	}
	address, err := stringField(data, "address")
	if err != nil {
		return err
	}

	fmt.Printf("Arranging shipping: order=%s, address=%s\n", orderID, address)

	svc.setStatus(orderID, "shipping_arranged")
	return nil
}

// CancelShipping is the compensation for ArrangeShipping. It marks the
// order's shipping as canceled.
func (svc *OrderService) CancelShipping(ctx context.Context, data map[string]interface{}) error {
	orderID, err := stringField(data, "order_id")
	if err != nil {
		return err
	}
	fmt.Printf("Canceling shipping for order: %s\n", orderID)

	svc.setStatus(orderID, "shipping_canceled")
	return nil
}

272// Example usage
273func Example() {
274 orchestrator := NewOrchestrator()
275 orderService := NewOrderService()
276
277 // Create order saga
278 saga := orchestrator.CreateSaga("order-123")
279
280 // Add steps
281 saga.AddStep(Step{
282 Name: "reserve_inventory",
283 Action: orderService.ReserveInventory,
284 Compensation: orderService.ReleaseInventory,
285 })
286
287 saga.AddStep(Step{
288 Name: "process_payment",
289 Action: orderService.ProcessPayment,
290 Compensation: orderService.RefundPayment,
291 })
292
293 saga.AddStep(Step{
294 Name: "arrange_shipping",
295 Action: orderService.ArrangeShipping,
296 Compensation: orderService.CancelShipping,
297 })
298
299 // Set saga data
300 saga.Data = map[string]interface{}{
301 "order_id": "order-123",
302 "product_id": "prod-456",
303 "quantity": 5,
304 "amount": 99.99,
305 "address": "123 Main St",
306 }
307
308 // Execute saga
309 ctx := context.Background()
310 if err := orchestrator.ExecuteSaga(ctx, "order-123"); err != nil {
311 fmt.Printf("Saga failed: %v\n", err)
312 } else {
313 fmt.Println("Saga completed successfully")
314 }
315
316 // Check saga status
317 if completedSaga, err := orchestrator.GetSaga("order-123"); err == nil {
318 fmt.Printf("Status: %s\n", completedSaga.Status)
319 }
320}
Key Takeaways
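- Orchestration: A central coordinator drives the saga's execution.
- Compensation: Give every step that has side effects a compensating action that undoes it; steps without one are skipped during rollback.
- Idempotency: Steps and compensations should be safe to re-run.
- Failure Handling: On failure, compensate the completed steps in reverse order.
- State Tracking: Record the saga's status and progress so failures can be diagnosed and recovered from.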
Related Topics
- Distributed Systems - Distributed patterns
- Microservices - Microservice architecture
- Event-Driven - Event patterns