Saga Orchestrator

Exercise: Saga Orchestrator

Difficulty: Advanced

Learning Objectives

  • Understand the saga pattern for distributed transactions
  • Implement orchestration-based sagas
  • Handle compensation and rollback
  • Practice failure recovery
  • Build idempotent operations

Problem Statement

Create a saga orchestrator for an e-commerce order-processing system that coordinates multiple services (inventory, payment, and shipping), undoing completed work when a later step fails.

Implementation

  1package saga
  2
  3import (
  4	"context"
  5	"fmt"
  6	"sync"
  7	"time"
  8)
  9
 10// SagaStatus represents the current state of a saga
 11type SagaStatus string
 12
 13const (
 14	StatusPending     SagaStatus = "pending"
 15	StatusInProgress  SagaStatus = "in_progress"
 16	StatusCompleted   SagaStatus = "completed"
 17	StatusFailed      SagaStatus = "failed"
 18	StatusCompensating SagaStatus = "compensating"
 19	StatusCompensated SagaStatus = "compensated"
 20)
 21
 22// Step represents a single step in a saga
 23type Step struct {
 24	Name        string
 25	Action      func(ctx context.Context, data map[string]interface{}) error
 26	Compensation func(ctx context.Context, data map[string]interface{}) error
 27}
 28
 29// Saga coordinates a distributed transaction
 30type Saga struct {
 31	ID              string
 32	Steps           []Step
 33	Status          SagaStatus
 34	CurrentStep     int
 35	Data            map[string]interface{}
 36	CompletedSteps  []string
 37	mu              sync.Mutex
 38}
 39
 40// NewSaga creates a new saga
 41func NewSaga(id string) *Saga {
 42	return &Saga{
 43		ID:             id,
 44		Steps:          make([]Step, 0),
 45		Status:         StatusPending,
 46		Data:           make(map[string]interface{}),
 47		CompletedSteps: make([]string, 0),
 48	}
 49}
 50
 51// AddStep adds a step to the saga
 52func AddStep(step Step) {
 53	s.Steps = append(s.Steps, step)
 54}
 55
 56// Execute runs the saga
 57func Execute(ctx context.Context) error {
 58	s.mu.Lock()
 59	s.Status = StatusInProgress
 60	s.mu.Unlock()
 61
 62	for i, step := range s.Steps {
 63		s.mu.Lock()
 64		s.CurrentStep = i
 65		s.mu.Unlock()
 66
 67		// Execute step
 68		if err := step.Action(ctx, s.Data); err != nil {
 69			// Step failed - compensate
 70			s.mu.Lock()
 71			s.Status = StatusCompensating
 72			s.mu.Unlock()
 73
 74			if compErr := s.compensate(ctx); compErr != nil {
 75				s.mu.Lock()
 76				s.Status = StatusFailed
 77				s.mu.Unlock()
 78				return fmt.Errorf("saga failed and compensation failed: %w", compErr)
 79			}
 80
 81			s.mu.Lock()
 82			s.Status = StatusCompensated
 83			s.mu.Unlock()
 84			return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
 85		}
 86
 87		// Mark step as completed
 88		s.mu.Lock()
 89		s.CompletedSteps = append(s.CompletedSteps, step.Name)
 90		s.mu.Unlock()
 91	}
 92
 93	s.mu.Lock()
 94	s.Status = StatusCompleted
 95	s.mu.Unlock()
 96
 97	return nil
 98}
 99
100// compensate executes compensation for completed steps in reverse order
101func compensate(ctx context.Context) error {
102	// Compensate in reverse order
103	for i := len(s.CompletedSteps) - 1; i >= 0; i-- {
104		stepName := s.CompletedSteps[i]
105
106		// Find the step
107		var step *Step
108		for j := range s.Steps {
109			if s.Steps[j].Name == stepName {
110				step = &s.Steps[j]
111				break
112			}
113		}
114
115		if step == nil || step.Compensation == nil {
116			continue
117		}
118
119		// Execute compensation
120		if err := step.Compensation(ctx, s.Data); err != nil {
121			return fmt.Errorf("compensation failed for step %s: %w", stepName, err)
122		}
123	}
124
125	return nil
126}
127
128// Orchestrator manages multiple sagas
129type Orchestrator struct {
130	mu    sync.RWMutex
131	sagas map[string]*Saga
132}
133
134func NewOrchestrator() *Orchestrator {
135	return &Orchestrator{
136		sagas: make(map[string]*Saga),
137	}
138}
139
140// CreateSaga creates and registers a new saga
141func CreateSaga(id string) *Saga {
142	o.mu.Lock()
143	defer o.mu.Unlock()
144
145	saga := NewSaga(id)
146	o.sagas[id] = saga
147
148	return saga
149}
150
151// ExecuteSaga runs a saga
152func ExecuteSaga(ctx context.Context, id string) error {
153	o.mu.RLock()
154	saga, exists := o.sagas[id]
155	o.mu.RUnlock()
156
157	if !exists {
158		return fmt.Errorf("saga not found: %s", id)
159	}
160
161	return saga.Execute(ctx)
162}
163
164// GetSaga retrieves a saga
165func GetSaga(id string) {
166	o.mu.RLock()
167	defer o.mu.RUnlock()
168
169	saga, exists := o.sagas[id]
170	if !exists {
171		return nil, fmt.Errorf("saga not found: %s", id)
172	}
173
174	return saga, nil
175}
176
// Example: Order Processing Saga

// OrderService is an in-memory stand-in for the inventory, payment, and
// shipping services an order saga coordinates. It records only the latest
// status string per order ID; the map is guarded by mu.
type OrderService struct {
	orders map[string]string // orderID -> latest status
	mu     sync.Mutex
}

// NewOrderService returns an OrderService with no recorded orders.
func NewOrderService() *OrderService {
	return &OrderService{
		orders: make(map[string]string),
	}
}

// setStatus records status as the latest state of orderID.
func (svc *OrderService) setStatus(orderID, status string) {
	svc.mu.Lock()
	defer svc.mu.Unlock()
	svc.orders[orderID] = status
}

// sagaString returns data[key] as a string. It returns an error, rather than
// panicking, when the key is missing or holds another type.
func sagaString(data map[string]interface{}, key string) (string, error) {
	v, ok := data[key].(string)
	if !ok {
		return "", fmt.Errorf("saga data %q: expected string, got %T", key, data[key])
	}
	return v, nil
}

// sagaInt returns data[key] as an int, or an error if it is missing or not an int.
func sagaInt(data map[string]interface{}, key string) (int, error) {
	v, ok := data[key].(int)
	if !ok {
		return 0, fmt.Errorf("saga data %q: expected int, got %T", key, data[key])
	}
	return v, nil
}

// sagaAmount returns data[key] as a float64, accepting float64 or int values.
func sagaAmount(data map[string]interface{}, key string) (float64, error) {
	switch v := data[key].(type) {
	case float64:
		return v, nil
	case int:
		return float64(v), nil
	default:
		return 0, fmt.Errorf("saga data %q: expected float64, got %T", key, data[key])
	}
}

// ReserveInventory is the forward action for reserving stock. It reads
// "order_id" and "product_id" (strings) and "quantity" (int) from data, and
// fails with "insufficient inventory" when quantity exceeds 100.
func (svc *OrderService) ReserveInventory(ctx context.Context, data map[string]interface{}) error {
	// Forward actions honour cancellation. The compensations below do not
	// check ctx, so a rollback triggered by cancellation can still run.
	if err := ctx.Err(); err != nil {
		return err
	}
	orderID, err := sagaString(data, "order_id")
	if err != nil {
		return err
	}
	productID, err := sagaString(data, "product_id")
	if err != nil {
		return err
	}
	quantity, err := sagaInt(data, "quantity")
	if err != nil {
		return err
	}

	fmt.Printf("Reserving inventory: order=%s, product=%s, qty=%d\n", orderID, productID, quantity)

	// Simulate inventory check
	if quantity > 100 {
		return fmt.Errorf("insufficient inventory")
	}

	svc.setStatus(orderID, "inventory_reserved")
	return nil
}

// ReleaseInventory compensates ReserveInventory for data["order_id"].
func (svc *OrderService) ReleaseInventory(ctx context.Context, data map[string]interface{}) error {
	orderID, err := sagaString(data, "order_id")
	if err != nil {
		return err
	}
	fmt.Printf("Releasing inventory for order: %s\n", orderID)

	svc.setStatus(orderID, "inventory_released")
	return nil
}

// ProcessPayment is the forward action for charging the order. It reads
// "order_id" (string) and "amount" (float64 or int), and fails with
// "payment declined" when amount exceeds 1000.
func (svc *OrderService) ProcessPayment(ctx context.Context, data map[string]interface{}) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	orderID, err := sagaString(data, "order_id")
	if err != nil {
		return err
	}
	amount, err := sagaAmount(data, "amount")
	if err != nil {
		return err
	}

	fmt.Printf("Processing payment: order=%s, amount=%.2f\n", orderID, amount)

	// Simulate payment processing
	if amount > 1000 {
		return fmt.Errorf("payment declined")
	}

	svc.setStatus(orderID, "payment_processed")
	return nil
}

// RefundPayment compensates ProcessPayment for data["order_id"].
func (svc *OrderService) RefundPayment(ctx context.Context, data map[string]interface{}) error {
	orderID, err := sagaString(data, "order_id")
	if err != nil {
		return err
	}
	fmt.Printf("Refunding payment for order: %s\n", orderID)

	svc.setStatus(orderID, "payment_refunded")
	return nil
}

// ArrangeShipping is the forward action for shipping. It reads "order_id"
// and "address" (strings) from data.
func (svc *OrderService) ArrangeShipping(ctx context.Context, data map[string]interface{}) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	orderID, err := sagaString(data, "order_id")
	if err != nil {
		return err
	}
	address, err := sagaString(data, "address")
	if err != nil {
		return err
	}

	fmt.Printf("Arranging shipping: order=%s, address=%s\n", orderID, address)

	svc.setStatus(orderID, "shipping_arranged")
	return nil
}

// CancelShipping compensates ArrangeShipping for data["order_id"].
func (svc *OrderService) CancelShipping(ctx context.Context, data map[string]interface{}) error {
	orderID, err := sagaString(data, "order_id")
	if err != nil {
		return err
	}
	fmt.Printf("Canceling shipping for order: %s\n", orderID)

	svc.setStatus(orderID, "shipping_canceled")
	return nil
}
271
272// Example usage
273func Example() {
274	orchestrator := NewOrchestrator()
275	orderService := NewOrderService()
276
277	// Create order saga
278	saga := orchestrator.CreateSaga("order-123")
279
280	// Add steps
281	saga.AddStep(Step{
282		Name:         "reserve_inventory",
283		Action:       orderService.ReserveInventory,
284		Compensation: orderService.ReleaseInventory,
285	})
286
287	saga.AddStep(Step{
288		Name:         "process_payment",
289		Action:       orderService.ProcessPayment,
290		Compensation: orderService.RefundPayment,
291	})
292
293	saga.AddStep(Step{
294		Name:         "arrange_shipping",
295		Action:       orderService.ArrangeShipping,
296		Compensation: orderService.CancelShipping,
297	})
298
299	// Set saga data
300	saga.Data = map[string]interface{}{
301		"order_id":   "order-123",
302		"product_id": "prod-456",
303		"quantity":   5,
304		"amount":     99.99,
305		"address":    "123 Main St",
306	}
307
308	// Execute saga
309	ctx := context.Background()
310	if err := orchestrator.ExecuteSaga(ctx, "order-123"); err != nil {
311		fmt.Printf("Saga failed: %v\n", err)
312	} else {
313		fmt.Println("Saga completed successfully")
314	}
315
316	// Check saga status
317	if completedSaga, err := orchestrator.GetSaga("order-123"); err == nil {
318		fmt.Printf("Status: %s\n", completedSaga.Status)
319	}
320}

Key Takeaways

  1. Orchestration: Central coordinator manages saga execution
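  2. Compensation: Give every step that has side effects a compensating action that undoes it
  3. Idempotency: Steps and compensations should be safe to re-run, so retries never apply an effect twice
  4. Failure Handling: On failure, compensate the already-completed steps in reverse order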
  5. State Tracking: Maintain clear saga state and progress