Learning Objectives
By the end of this tutorial, you will be able to:
- Design resilient distributed systems using proven patterns like circuit breakers and sagas
- Implement consensus algorithms for consistent distributed state management
- Build fault-tolerant services that survive node failures and network partitions
- Apply CAP theorem trade-offs to make informed architecture decisions
- Create service mesh patterns for microservice communication and observability
- Handle distributed transactions using compensation and event sourcing patterns
Hook - The Global Scale Challenge
Imagine running a global restaurant chain with hundreds of locations. Each restaurant needs to coordinate with others, share information, and continue operating even when some locations face problems. This is exactly what distributed systems do for software - they help multiple computers work together as a team.
Real-World Impact:
- Netflix processes 7 billion requests/day using circuit breakers and bulkhead patterns
- Uber uses sagas to coordinate rides across 50+ microservices
- Kubernetes depends on etcd consensus for cluster state management
Modern applications rarely run on a single machine. Whether you're building microservices, handling millions of users, or processing terabytes of data, you need distributed systems patterns. Go's concurrency primitives, strong standard library, and compiled nature make it ideal for distributed systems.
Core Concepts - Distributed Systems Challenges
Real-World Examples
Netflix - Resilience at scale:
// Circuit breaker prevents cascade failures
// Before: One failing service crashes entire system
// After: Circuit breaker isolates failures, 99.99% uptime
Netflix processes 7 billion requests/day using circuit breakers and bulkhead patterns. When one recommendation service fails, users can still stream movies.
Uber - Consistent data across microservices:
// Saga pattern for distributed transactions
// Before: Inconsistent data across services
// After: Eventual consistency with compensation
Uber uses sagas to coordinate rides across 50+ microservices. When payment fails after ride completion, the system automatically cancels the ride and processes refunds.
Kubernetes - Distributed consensus:
// etcd uses Raft consensus for cluster state
// Before: Split-brain scenarios, data loss
// After: Strong consistency, automatic leader election
Every Kubernetes cluster depends on distributed consensus. When the master node fails, etcd ensures a new leader is elected without losing cluster state.
💡 Key Takeaway: Distributed systems patterns aren't just theoretical concepts - they solve real business problems of scale, reliability, and performance that affect millions of users daily.
Distributed Systems Challenges
Think of distributed systems like managing a team spread across different offices. Each person can have their own opinion, communication can fail, and not everyone receives messages at the same time. This creates unique challenges you never see in single-machine applications.
The CAP Theorem - You can have only 2 of 3:
- Consistency: All nodes see the same data simultaneously
- Availability: Every request gets a response
- Partition Tolerance: System works despite network failures
⚠️ Important: Network partitions WILL happen in distributed systems. You must choose between consistency and availability during those partitions.
┌─────────────────────────────────────┐
│             CAP THEOREM             │
│                                     │
│             Consistency             │
│                 /\                  │
│                /  \                 │
│               /    \                │
│              /      \               │
│             / Choose \              │
│            /  Any 2   \             │
│           /            \            │
│          /______________\           │
│   Availability      Partition       │
│                     Tolerance       │
└─────────────────────────────────────┘
CA: Traditional RDBMS
CP: etcd, Consul, ZooKeeper
AP: Cassandra, DynamoDB, Riak
Common Distributed Systems Problems:
- Network failures - Messages lost, delayed, or duplicated (see the timeout sketch after this list)
- Partial failures - Some nodes work while others fail
- Clock skew - Time differs across machines
- Concurrency - Race conditions at scale
- Data consistency - Keeping replicas in sync
- Cascading failures - One failure triggers others
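Most of these problems first surface as remote calls that hang or fail. A useful baseline, before adopting any of the heavier patterns below, is to put a deadline on every network call so a lost or delayed response becomes a fast, explicit error. A minimal sketch using only the standard library (the URL is a placeholder):

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

func main() {
    // Bound the call: a lost or delayed response becomes a fast, explicit error.
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://example.com/health", nil)
    if err != nil {
        panic(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("request failed (timeout or network error):", err)
        return
    }
    defer resp.Body.Close()

    fmt.Println("status:", resp.Status)
}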
When to Use Distributed Patterns
✅ Use Distributed Patterns For:
- High availability - 99.9%+ uptime requirements
- Scalability - Handle millions of requests
- Fault tolerance - Survive node failures
- Geographic distribution - Serve users globally with low latency
- Data replication - Backup and disaster recovery
- Microservices - Decouple services for independent development
❌ Not Needed For:
- Single server apps - Monoliths under light load
- Prototypes - Premature distribution adds complexity
- Low traffic - < 100 req/sec can run on one box
- Simple CRUD - Standard database is enough
Real-World Example:
When Twitter started, they used a single database. As they grew to millions of users, they had to adopt distributed patterns. But they didn't start there - they evolved to distributed systems when the business need became clear.
Decision Tree:
Can your system run on one machine?
├─ Yes → Do you need high availability?
│ ├─ Yes → ✅ USE DISTRIBUTED PATTERNS
│ └─ No → ❌ KEEP IT SIMPLE
└─ No → Is network partitioning likely?
├─ Yes → ✅ USE DISTRIBUTED PATTERNS
└─ No → Do you have multiple datacenters?
├─ Yes → ✅ USE DISTRIBUTED PATTERNS
└─ No → ⚠️ START SIMPLE, ADD AS NEEDED
Now let's dive into the core patterns that solve these challenges. We'll start with understanding the fundamental tradeoffs in distributed systems.
CAP Theorem and Consistency Models
Understanding CAP in Practice
Consider managing a chain of coffee shops. When a customer buys coffee, you need to decide how to handle inventory updates. This is exactly the CAP tradeoff in action:
- CP: If communication fails between shops, stop selling coffee until connectivity is restored. Better to say "out of stock" than sell something you can't track.
- AP: Always accept sales, even if shops can't communicate. You might oversell, but customers can always buy coffee.
Let's see this in code:
package main

import (
    "fmt"
    "sync"
)

// Example: Different consistency models

// CP System: Consistent + Partition-tolerant
type CPStore struct {
    mu    sync.RWMutex
    data  map[string]string
    alive bool
}

func (s *CPStore) Write(key, value string) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if !s.alive {
        return fmt.Errorf("node unavailable")
    }

    s.data[key] = value
    return nil
}

func (s *CPStore) Read(key string) (string, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    if !s.alive {
        return "", fmt.Errorf("node unavailable")
    }

    return s.data[key], nil
}

// AP System: Available + Partition-tolerant
type APStore struct {
    mu          sync.RWMutex
    data        map[string]string
    vectorClock map[string]int
}

func (s *APStore) Write(key, value string) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    // Always accept writes
    s.data[key] = value
    s.vectorClock[key]++
    return nil
}

func (s *APStore) Read(key string) (string, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    // Always return data, even if potentially stale
    return s.data[key], nil
}

func main() {
    // CP System: Prioritizes consistency over availability
    cpStore := &CPStore{
        data:  make(map[string]string),
        alive: true,
    }

    cpStore.Write("user:1", "Alice")
    fmt.Println("CP Store:", cpStore.data)

    // Simulate partition
    cpStore.alive = false
    err := cpStore.Write("user:2", "Bob")
    fmt.Printf("CP Write during partition: %v\n", err) // Error!

    // AP System: Prioritizes availability over consistency
    apStore := &APStore{
        data:        make(map[string]string),
        vectorClock: make(map[string]int),
    }

    apStore.Write("user:1", "Alice")
    fmt.Println("AP Store:", apStore.data)

    // Always accepts writes
    apStore.Write("user:2", "Bob")
    fmt.Printf("AP Write during partition: Success\n")
}
Real-World Examples:
- Banking: When ATM networks are down, banks block transactions rather than risk incorrect balances
- Social Media: Facebook shows you content even if some servers are down - better to show slightly outdated content than nothing
- E-commerce: Amazon accepts orders even if inventory systems are partitioned, but payment processing remains consistent
Consistency Models
Think about consistency models like updating contact information across multiple devices. How quickly should changes propagate? This depends on your needs:
- Strong Consistency: Like editing a Google Doc together - everyone sees changes instantly
- Eventual Consistency: Like syncing photos to the cloud - they appear on all devices eventually
- Causal Consistency: Like commenting on a social media post - replies appear after the original post
package main

import (
    "fmt"
    "sync"
    "time"
)

// Strong Consistency: All reads see latest write immediately
type StronglyConsistent struct {
    mu    sync.RWMutex
    data  map[string]string
    nodes []*StronglyConsistent
}

func (sc *StronglyConsistent) Write(key, value string) error {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    // Write to all replicas synchronously
    for _, node := range sc.nodes {
        node.mu.Lock()
        node.data[key] = value
        node.mu.Unlock()
    }

    sc.data[key] = value
    return nil
}

// Eventual Consistency: Reads may return stale data
type EventuallyConsistent struct {
    mu   sync.RWMutex
    data map[string]string
}

func (ec *EventuallyConsistent) Write(key, value string) {
    ec.mu.Lock()
    defer ec.mu.Unlock()
    ec.data[key] = value
}

func (ec *EventuallyConsistent) AsyncReplicate(target *EventuallyConsistent, key, value string) {
    // Replicate asynchronously
    go func() {
        time.Sleep(10 * time.Millisecond) // Simulate network delay
        target.mu.Lock()
        target.data[key] = value
        target.mu.Unlock()
    }()
}

// Causal Consistency: Causally related writes seen in order
type CausallyConsistent struct {
    mu          sync.RWMutex
    data        map[string]string
    vectorClock map[string]int
}

func (cc *CausallyConsistent) Write(key, value string, dependencies map[string]int) {
    cc.mu.Lock()
    defer cc.mu.Unlock()

    // Wait for dependencies to be satisfied
    for depKey, depVersion := range dependencies {
        for cc.vectorClock[depKey] < depVersion {
            cc.mu.Unlock()
            time.Sleep(1 * time.Millisecond)
            cc.mu.Lock()
        }
    }

    cc.data[key] = value
    cc.vectorClock[key]++
}

func main() {
    fmt.Println("Consistency Models Demo")

    // Strong Consistency
    strong := &StronglyConsistent{data: make(map[string]string)}
    strong.Write("key", "value")
    fmt.Println("Strong:", strong.data) // Always up-to-date

    // Eventual Consistency
    eventual1 := &EventuallyConsistent{data: make(map[string]string)}
    eventual2 := &EventuallyConsistent{data: make(map[string]string)}
    eventual1.Write("key", "value")
    eventual1.AsyncReplicate(eventual2, "key", "value")
    fmt.Println("Eventual:", eventual1.data)
    time.Sleep(20 * time.Millisecond)
    fmt.Println("Eventual:", eventual2.data)

    // Causal Consistency
    causal := &CausallyConsistent{
        data:        make(map[string]string),
        vectorClock: make(map[string]int),
    }
    causal.Write("a", "1", nil)
    causal.Write("b", "2", map[string]int{"a": 1}) // b depends on a
    fmt.Println("Causal:", causal.data)
}
When to Use Each Consistency Model:
💡 Strong Consistency: Use when data correctness is critical
- Financial transactions
- Inventory management
- Configuration settings
💡 Eventual Consistency: Use when availability is more important than immediacy
- Social media feeds
- Analytics data
- Cache systems
💡 Causal Consistency: Use when operation order matters
- Collaborative editing
- Social media
- Workflow systems
Now that we understand the fundamental tradeoffs, let's explore practical patterns for building distributed systems. We'll start with service discovery - how services find each other in a distributed environment.
Service Discovery
At a huge conference center with hundreds of rooms, how do you find which room has the session you want to attend? You look at the directory board that shows all sessions and their locations. This is exactly what service discovery does for microservices.
Basic Service Registry
Real-World Example: Netflix's Eureka service registry helps thousands of microservices find each other. When a new instance starts up, it registers itself. When it shuts down gracefully, it deregisters. And crucially, if it crashes, the registry detects the failure through missing heartbeats.
package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

// ServiceInstance represents a single service instance
type ServiceInstance struct {
    ID       string
    Name     string
    Host     string
    Port     int
    Healthy  bool
    LastSeen time.Time
}

// ServiceRegistry manages service discovery
type ServiceRegistry struct {
    mu        sync.RWMutex
    services  map[string][]*ServiceInstance // name -> instances
    heartbeat time.Duration
}

func NewServiceRegistry(heartbeat time.Duration) *ServiceRegistry {
    sr := &ServiceRegistry{
        services:  make(map[string][]*ServiceInstance),
        heartbeat: heartbeat,
    }

    // Background health checker
    go sr.healthCheck()

    return sr
}

// Register adds a service instance
func (sr *ServiceRegistry) Register(instance *ServiceInstance) {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    instance.Healthy = true
    instance.LastSeen = time.Now()

    sr.services[instance.Name] = append(sr.services[instance.Name], instance)
}

// Deregister removes a service instance
func (sr *ServiceRegistry) Deregister(name, id string) {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    instances := sr.services[name]
    for i, instance := range instances {
        if instance.ID == id {
            sr.services[name] = append(instances[:i], instances[i+1:]...)
            break
        }
    }
}

// Discover finds healthy instances of a service
func (sr *ServiceRegistry) Discover(name string) ([]*ServiceInstance, error) {
    sr.mu.RLock()
    defer sr.mu.RUnlock()

    instances := sr.services[name]
    if len(instances) == 0 {
        return nil, errors.New("service not found")
    }

    // Return only healthy instances
    var healthy []*ServiceInstance
    for _, instance := range instances {
        if instance.Healthy {
            healthy = append(healthy, instance)
        }
    }

    if len(healthy) == 0 {
        return nil, errors.New("no healthy instances")
    }

    return healthy, nil
}

// Heartbeat updates instance health
func (sr *ServiceRegistry) Heartbeat(name, id string) error {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    instances := sr.services[name]
    for _, instance := range instances {
        if instance.ID == id {
            instance.LastSeen = time.Now()
            instance.Healthy = true
            return nil
        }
    }

    return errors.New("instance not found")
}

// healthCheck marks instances unhealthy if no heartbeat
func (sr *ServiceRegistry) healthCheck() {
    ticker := time.NewTicker(sr.heartbeat / 2)
    defer ticker.Stop()

    for range ticker.C {
        sr.mu.Lock()
        now := time.Now()

        for _, instances := range sr.services {
            for _, instance := range instances {
                if now.Sub(instance.LastSeen) > sr.heartbeat {
                    instance.Healthy = false
                }
            }
        }

        sr.mu.Unlock()
    }
}

func main() {
    registry := NewServiceRegistry(5 * time.Second)

    // Register service instances
    registry.Register(&ServiceInstance{
        ID:   "api-1",
        Name: "api-service",
        Host: "10.0.0.1",
        Port: 8080,
    })

    registry.Register(&ServiceInstance{
        ID:   "api-2",
        Name: "api-service",
        Host: "10.0.0.2",
        Port: 8080,
    })

    // Discover service
    instances, err := registry.Discover("api-service")
    if err != nil {
        panic(err)
    }

    fmt.Printf("Found %d instances:\n", len(instances))
    for _, inst := range instances {
        fmt.Printf("  %s: %s:%d (healthy=%v)\n",
            inst.ID, inst.Host, inst.Port, inst.Healthy)
    }

    // Simulate heartbeats
    go func() {
        ticker := time.NewTicker(2 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            registry.Heartbeat("api-service", "api-1")
            registry.Heartbeat("api-service", "api-2")
        }
    }()

    time.Sleep(10 * time.Second)
}
⚠️ Common Pitfalls in Service Discovery:
- Split brain: Multiple nodes think they're the leader
- Stale instances: Registry keeps dead services
- Network partitions: Services can't reach the registry
- Registration storms: All services restart at once (a jittered-registration sketch follows this list)
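A common mitigation for the last pitfall is to add random jitter before registering (and before retries), so a fleet-wide restart does not hit the registry at the same instant. A minimal sketch building on the ServiceRegistry and ServiceInstance types from the example above; the jitter window is an arbitrary example value:

// registerWithJitter spreads registrations over a random window so a mass
// restart does not hit the registry all at once.
// Requires "math/rand" and "time" in addition to the imports above.
func registerWithJitter(registry *ServiceRegistry, instance *ServiceInstance) {
    delay := time.Duration(rand.Intn(2000)) * time.Millisecond // up to 2s of jitter
    time.Sleep(delay)
    registry.Register(instance)
}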
Now let's look at load balancing strategies - once you discover services, how do you distribute requests among them?
Load Balancing Strategies
Think of load balancing like managing checkout lines at a busy supermarket. You need to distribute customers among cashiers efficiently. Some cashiers are faster, some customers have loyalty cards and always use the same cashier, and sometimes you just send people to the shortest line.
When to Use Each Strategy:
💡 Round-Robin: Great for equal-capacity servers with similar request patterns
- Web servers serving static content
- API gateways with uniform endpoint performance
- Load testing environments
💡 Weighted Round-Robin: Perfect when servers have different capacities
- Mixed infrastructure
- Gradual migration from old to new hardware
- Cloud environments with different instance sizes
💡 Consistent Hashing: Essential for maintaining session state
- Shopping carts
- Caching systems
- Database sharding
package main

import (
    "fmt"
    "hash/fnv"
    "math/rand"
    "sync/atomic"
)

// ServiceInstance mirrors the type from the service discovery example
type ServiceInstance struct {
    ID   string
    Host string
    Port int
}

// LoadBalancer selects service instances
type LoadBalancer interface {
    Next(instances []*ServiceInstance) *ServiceInstance
}

// RoundRobin load balancer
type RoundRobin struct {
    counter atomic.Uint64
}

func (rr *RoundRobin) Next(instances []*ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    index := rr.counter.Add(1) % uint64(len(instances))
    return instances[index]
}

// Random load balancer
type Random struct{}

func (r *Random) Next(instances []*ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    return instances[rand.Intn(len(instances))]
}

// Weighted round-robin
type WeightedRoundRobin struct {
    weights map[string]int
    counter atomic.Uint64
}

func NewWeightedRoundRobin() *WeightedRoundRobin {
    return &WeightedRoundRobin{
        weights: make(map[string]int),
    }
}

func (wrr *WeightedRoundRobin) SetWeight(id string, weight int) {
    wrr.weights[id] = weight
}

func (wrr *WeightedRoundRobin) Next(instances []*ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    // Simple weighted selection
    totalWeight := 0
    for _, inst := range instances {
        weight := wrr.weights[inst.ID]
        if weight == 0 {
            weight = 1
        }
        totalWeight += weight
    }

    target := int(wrr.counter.Add(1) % uint64(totalWeight))
    cumulative := 0

    for _, inst := range instances {
        weight := wrr.weights[inst.ID]
        if weight == 0 {
            weight = 1
        }
        cumulative += weight

        if target < cumulative {
            return inst
        }
    }

    return instances[0]
}

// Consistent hashing (simplified: a production version would use a hash ring
// with virtual nodes so that adding or removing instances moves few keys)
type ConsistentHash struct{}

func (ch *ConsistentHash) NextForKey(instances []*ServiceInstance, key string) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    hash := ch.hash(key)
    index := hash % uint32(len(instances))
    return instances[index]
}

func (ch *ConsistentHash) hash(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

func main() {
    instances := []*ServiceInstance{
        {ID: "1", Host: "10.0.0.1", Port: 8080},
        {ID: "2", Host: "10.0.0.2", Port: 8080},
        {ID: "3", Host: "10.0.0.3", Port: 8080},
    }

    // Round-robin
    rr := &RoundRobin{}
    fmt.Println("Round-robin:")
    for i := 0; i < 5; i++ {
        inst := rr.Next(instances)
        fmt.Printf("  %s\n", inst.ID)
    }

    // Weighted round-robin
    wrr := NewWeightedRoundRobin()
    wrr.SetWeight("1", 1)
    wrr.SetWeight("2", 2)
    wrr.SetWeight("3", 1)
    fmt.Println("\nWeighted round-robin:")
    for i := 0; i < 8; i++ {
        inst := wrr.Next(instances)
        fmt.Printf("  %s\n", inst.ID)
    }

    // Consistent hashing
    ch := &ConsistentHash{}
    fmt.Println("\nConsistent hashing:")
    keys := []string{"user-123", "user-456", "user-123", "user-789"}
    for _, key := range keys {
        inst := ch.NextForKey(instances, key)
        fmt.Printf("  %s -> %s\n", key, inst.ID)
    }
}
Now let's explore one of the most important patterns in distributed systems - the circuit breaker. This pattern prevents cascading failures, which can bring down entire systems.
Circuit Breaker Pattern
The circuit breaker prevents cascading failures by stopping requests to failing services.
Real-World Analogy: Think of circuit breakers like the electrical circuit breakers in your home. When a circuit overloads, the breaker trips to prevent damage to your appliances. After a while, it tries to reset itself to see if the problem is fixed.
Real-World Example: During Black Friday sales, Amazon's payment processing service came under extreme load. Circuit breakers prevented the payment failures from taking down the entire website. Users could still browse products and add to cart - they just got a friendly "try again later" message at checkout.
package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

// State represents circuit breaker state
type State int

const (
    StateClosed   State = iota // Normal operation
    StateOpen                  // Failing, reject requests
    StateHalfOpen              // Testing if service recovered
)

func (s State) String() string {
    return [...]string{"CLOSED", "OPEN", "HALF-OPEN"}[s]
}

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
    mu sync.Mutex

    state         State
    failureCount  int
    successCount  int
    lastFailTime  time.Time
    lastStateTime time.Time

    // Configuration
    maxFailures int
    timeout     time.Duration
    halfOpenMax int
}

// NewCircuitBreaker creates a circuit breaker
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:       StateClosed,
        maxFailures: maxFailures,
        timeout:     timeout,
        halfOpenMax: 3, // Try 3 requests in half-open
    }
}

// Call executes a function with circuit breaker protection
func (cb *CircuitBreaker) Call(fn func() error) error {
    cb.mu.Lock()

    // Check if circuit is open
    if cb.state == StateOpen {
        // Check if timeout elapsed
        if time.Since(cb.lastStateTime) > cb.timeout {
            cb.state = StateHalfOpen
            cb.successCount = 0
            cb.lastStateTime = time.Now()
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is OPEN")
        }
    }

    // In half-open, limit concurrent requests
    if cb.state == StateHalfOpen && cb.successCount >= cb.halfOpenMax {
        cb.mu.Unlock()
        return errors.New("circuit breaker is HALF-OPEN")
    }

    cb.mu.Unlock()

    // Execute function
    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.onFailure()
        return err
    }

    cb.onSuccess()
    return nil
}

func (cb *CircuitBreaker) onSuccess() {
    cb.failureCount = 0

    if cb.state == StateHalfOpen {
        cb.successCount++

        // After enough successes, close circuit
        if cb.successCount >= cb.halfOpenMax {
            cb.state = StateClosed
            cb.lastStateTime = time.Now()
        }
    }
}

func (cb *CircuitBreaker) onFailure() {
    cb.failureCount++
    cb.lastFailTime = time.Now()

    if cb.state == StateHalfOpen {
        // Any failure in half-open -> back to open
        cb.state = StateOpen
        cb.lastStateTime = time.Now()
        return
    }

    // Too many failures -> open circuit
    if cb.failureCount >= cb.maxFailures {
        cb.state = StateOpen
        cb.lastStateTime = time.Now()
    }
}

// State returns current state
func (cb *CircuitBreaker) State() State {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    return cb.state
}

func main() {
    cb := NewCircuitBreaker(3, 5*time.Second)

    // Simulate service calls
    callService := func() error {
        // Simulate failing service
        if time.Now().Unix()%2 == 0 {
            return errors.New("service error")
        }
        return nil
    }

    for i := 0; i < 10; i++ {
        err := cb.Call(callService)
        state := cb.State()

        if err != nil {
            fmt.Printf("Call %d: ERROR (%v) - State: %s\n", i+1, err, state)
        } else {
            fmt.Printf("Call %d: SUCCESS - State: %s\n", i+1, state)
        }

        time.Sleep(500 * time.Millisecond)
    }
}
When to Use Circuit Breakers:
✅ Essential for:
- External API calls
- Database connections
- Critical microservices
- Rate-limited services
⚠️ Common Pitfalls:
- Setting thresholds too low: False positives, unnecessary circuit breaking
- Setting thresholds too high: Too much damage before circuit breaks
- No monitoring: You don't know when circuits are breaking
- No fallback behavior: Users get errors instead of degraded service (a fallback sketch follows this list)
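For the last pitfall, the usual fix is to wrap the protected call so that an open circuit degrades to a cached or default response instead of an error. A hedged sketch using the CircuitBreaker from the example above; fetchRecommendations and getCachedRecommendations are hypothetical helpers:

// recommendationsWithFallback returns cached data when the circuit is open
// or the call fails, instead of surfacing an error to the user.
func recommendationsWithFallback(cb *CircuitBreaker, userID string) []string {
    var recs []string
    err := cb.Call(func() error {
        var callErr error
        recs, callErr = fetchRecommendations(userID) // hypothetical remote call
        return callErr
    })
    if err != nil {
        // Circuit open or call failed: degrade gracefully.
        return getCachedRecommendations(userID) // hypothetical cache lookup
    }
    return recs
}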
Let's look at a more advanced circuit breaker with metrics to understand how it behaves in production.
Circuit Breaker with Metrics
package main

import (
    "fmt"
    "sync"
    "time"
)

// CircuitBreaker, NewCircuitBreaker, and State are the types from the
// previous circuit breaker example.

// CircuitBreakerWithMetrics tracks detailed metrics
type CircuitBreakerWithMetrics struct {
    *CircuitBreaker
    mu sync.RWMutex

    // Metrics
    totalCalls    int64
    successCalls  int64
    failureCalls  int64
    rejectedCalls int64
    stateChanges  map[string]int64
}

func NewCircuitBreakerWithMetrics(maxFailures int, timeout time.Duration) *CircuitBreakerWithMetrics {
    return &CircuitBreakerWithMetrics{
        CircuitBreaker: NewCircuitBreaker(maxFailures, timeout),
        stateChanges:   make(map[string]int64),
    }
}

func (cbm *CircuitBreakerWithMetrics) Call(fn func() error) error {
    cbm.mu.Lock()
    cbm.totalCalls++
    cbm.mu.Unlock()

    oldState := cbm.CircuitBreaker.State()

    err := cbm.CircuitBreaker.Call(fn)

    newState := cbm.CircuitBreaker.State()

    cbm.mu.Lock()
    defer cbm.mu.Unlock()

    if err != nil {
        if err.Error() == "circuit breaker is OPEN" || err.Error() == "circuit breaker is HALF-OPEN" {
            cbm.rejectedCalls++
        } else {
            cbm.failureCalls++
        }
    } else {
        cbm.successCalls++
    }

    // Track state changes
    if oldState != newState {
        transition := fmt.Sprintf("%s->%s", oldState, newState)
        cbm.stateChanges[transition]++
    }

    return err
}

func (cbm *CircuitBreakerWithMetrics) Metrics() map[string]interface{} {
    cbm.mu.RLock()
    defer cbm.mu.RUnlock()

    successRate := float64(0)
    if cbm.totalCalls > 0 {
        successRate = float64(cbm.successCalls) / float64(cbm.totalCalls) * 100
    }

    return map[string]interface{}{
        "total_calls":    cbm.totalCalls,
        "success_calls":  cbm.successCalls,
        "failure_calls":  cbm.failureCalls,
        "rejected_calls": cbm.rejectedCalls,
        "success_rate":   fmt.Sprintf("%.2f%%", successRate),
        "current_state":  cbm.CircuitBreaker.State().String(),
        "state_changes":  cbm.stateChanges,
    }
}

func main() {
    cb := NewCircuitBreakerWithMetrics(3, 2*time.Second)

    // Simulate varying service reliability
    for i := 0; i < 20; i++ {
        err := cb.Call(func() error {
            // Fail 60% of the time
            if i%5 < 3 {
                return fmt.Errorf("service error")
            }
            return nil
        })

        if err != nil {
            fmt.Printf("Call %d: FAILED (%v)\n", i+1, err)
        } else {
            fmt.Printf("Call %d: SUCCESS\n", i+1)
        }

        time.Sleep(300 * time.Millisecond)
    }

    // Print metrics
    fmt.Println("\nMetrics:")
    metrics := cb.Metrics()
    for k, v := range metrics {
        fmt.Printf("  %s: %v\n", k, v)
    }
}
Raft Consensus Basics
Raft is a consensus algorithm that ensures distributed systems agree on state even with failures.
Simplified Raft Leader Election
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type NodeState int

const (
    Follower NodeState = iota
    Candidate
    Leader
)

func (s NodeState) String() string {
    return [...]string{"FOLLOWER", "CANDIDATE", "LEADER"}[s]
}

// RaftNode represents a node in a Raft cluster
type RaftNode struct {
    id          int
    state       NodeState
    currentTerm int
    votedFor    int
    voteCount   int

    // Cluster
    peers []*RaftNode

    // Timeouts
    electionTimeout  time.Duration
    heartbeatTimeout time.Duration
    lastHeartbeat    time.Time

    mu sync.Mutex
}

func NewRaftNode(id int, peers []*RaftNode) *RaftNode {
    return &RaftNode{
        id:               id,
        state:            Follower,
        currentTerm:      0,
        votedFor:         -1,
        peers:            peers,
        electionTimeout:  time.Duration(150+rand.Intn(150)) * time.Millisecond,
        heartbeatTimeout: 50 * time.Millisecond,
        lastHeartbeat:    time.Now(),
    }
}

// Start begins the node's operation
func (n *RaftNode) Start() {
    go n.run()
}

func (n *RaftNode) run() {
    for {
        n.mu.Lock()
        state := n.state
        n.mu.Unlock()

        switch state {
        case Follower:
            n.runFollower()
        case Candidate:
            n.runCandidate()
        case Leader:
            n.runLeader()
        }
    }
}

func (n *RaftNode) runFollower() {
    timeout := time.After(n.electionTimeout)

    for {
        n.mu.Lock()
        if n.state != Follower {
            n.mu.Unlock()
            return
        }
        n.mu.Unlock()

        select {
        case <-timeout:
            // No heartbeat from leader, become candidate
            n.mu.Lock()
            n.state = Candidate
            n.mu.Unlock()
            fmt.Printf("Node %d: Follower -> Candidate\n", n.id)
            return
        case <-time.After(10 * time.Millisecond):
            // Check if heartbeat received recently
            n.mu.Lock()
            if time.Since(n.lastHeartbeat) > n.electionTimeout {
                n.state = Candidate
                n.mu.Unlock()
                return
            }
            n.mu.Unlock()
        }
    }
}

func (n *RaftNode) runCandidate() {
    n.mu.Lock()
    n.currentTerm++
    n.votedFor = n.id
    n.voteCount = 1 // Vote for self
    term := n.currentTerm
    n.mu.Unlock()

    fmt.Printf("Node %d: Starting election for term %d\n", n.id, term)

    // Request votes from peers
    for _, peer := range n.peers {
        if peer.id == n.id {
            continue
        }

        go func(p *RaftNode) {
            if p.RequestVote(n.id, term) {
                n.mu.Lock()
                n.voteCount++
                voteCount := n.voteCount
                n.mu.Unlock()

                // Majority?
                if voteCount > len(n.peers)/2 {
                    n.mu.Lock()
                    if n.state == Candidate {
                        n.state = Leader
                        fmt.Printf("Node %d: Candidate -> Leader (%d votes)\n",
                            n.id, voteCount)
                    }
                    n.mu.Unlock()
                }
            }
        }(peer)
    }

    // Election timeout
    time.Sleep(n.electionTimeout)

    n.mu.Lock()
    if n.state == Candidate {
        // Election failed, retry
        fmt.Printf("Node %d: Election failed, retrying...\n", n.id)
        n.state = Follower
        n.votedFor = -1
    }
    n.mu.Unlock()
}

func (n *RaftNode) runLeader() {
    fmt.Printf("Node %d: Acting as leader\n", n.id)

    ticker := time.NewTicker(n.heartbeatTimeout)
    defer ticker.Stop()

    for {
        n.mu.Lock()
        if n.state != Leader {
            n.mu.Unlock()
            return
        }
        term := n.currentTerm
        n.mu.Unlock()

        // Send heartbeats
        for _, peer := range n.peers {
            if peer.id == n.id {
                continue
            }

            go peer.Heartbeat(n.id, term)
        }

        <-ticker.C
    }
}

// RequestVote handles vote requests
func (n *RaftNode) RequestVote(candidateID, term int) bool {
    n.mu.Lock()
    defer n.mu.Unlock()

    // Reject candidates with a stale term
    if n.currentTerm > term {
        return false
    }

    if n.currentTerm < term {
        n.currentTerm = term
        n.votedFor = -1
        n.state = Follower
    }

    // Grant the vote if we haven't voted this term, or already voted for this candidate
    if n.votedFor == -1 || n.votedFor == candidateID {
        n.votedFor = candidateID
        return true
    }

    return false
}

// Heartbeat handles heartbeat from leader
func (n *RaftNode) Heartbeat(leaderID, term int) {
    n.mu.Lock()
    defer n.mu.Unlock()

    if term >= n.currentTerm {
        n.currentTerm = term
        n.state = Follower
        n.lastHeartbeat = time.Now()
    }
}

func main() {
    // Create a cluster of 5 nodes. Allocate the slice first so every node can
    // hold a reference to the shared peer list, then initialize each node.
    nodes := make([]*RaftNode, 5)
    for i := range nodes {
        nodes[i] = NewRaftNode(i, nodes)
    }

    // Start all nodes
    for _, node := range nodes {
        node.Start()
    }

    // Run for 10 seconds
    time.Sleep(10 * time.Second)
}
Distributed Locks
Redis-Based Distributed Lock
package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "github.com/google/uuid"
)

// DistributedLock implements distributed locking
type DistributedLock struct {
    key   string
    value string
    ttl   time.Duration
}

// NewDistributedLock creates a new lock
func NewDistributedLock(key string, ttl time.Duration) *DistributedLock {
    return &DistributedLock{
        key:   key,
        value: uuid.New().String(), // Unique token
        ttl:   ttl,
    }
}

// Lock acquires the lock
func (dl *DistributedLock) Lock(ctx context.Context) error {
    deadline, ok := ctx.Deadline()
    if !ok {
        deadline = time.Now().Add(10 * time.Second)
    }

    for {
        // Try to acquire lock
        if dl.tryLock() {
            return nil
        }

        // Check timeout
        if time.Now().After(deadline) {
            return errors.New("lock acquisition timeout")
        }

        // Retry after a short delay
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(100 * time.Millisecond):
            // Retry
        }
    }
}

func (dl *DistributedLock) tryLock() bool {
    // Simulated SET NX
    // In a real implementation, use Redis SET key value NX EX seconds
    // For now, use a simple in-memory map

    lockStore.mu.Lock()
    defer lockStore.mu.Unlock()

    if existing, ok := lockStore.locks[dl.key]; ok {
        // Check if expired
        if time.Now().Before(existing.expiry) {
            return false
        }
    }

    // Acquire lock
    lockStore.locks[dl.key] = &lockEntry{
        value:  dl.value,
        expiry: time.Now().Add(dl.ttl),
    }

    return true
}

// Unlock releases the lock
func (dl *DistributedLock) Unlock() error {
    lockStore.mu.Lock()
    defer lockStore.mu.Unlock()

    existing, ok := lockStore.locks[dl.key]
    if !ok {
        return errors.New("lock not held")
    }

    // Only unlock if we own it
    if existing.value != dl.value {
        return errors.New("lock owned by another process")
    }

    delete(lockStore.locks, dl.key)
    return nil
}

// Extend extends the lock TTL
func (dl *DistributedLock) Extend(duration time.Duration) error {
    lockStore.mu.Lock()
    defer lockStore.mu.Unlock()

    existing, ok := lockStore.locks[dl.key]
    if !ok {
        return errors.New("lock not held")
    }

    if existing.value != dl.value {
        return errors.New("lock owned by another process")
    }

    existing.expiry = time.Now().Add(duration)
    return nil
}

// Simple in-memory lock store
type lockEntry struct {
    value  string
    expiry time.Time
}

var lockStore = struct {
    mu    sync.Mutex
    locks map[string]*lockEntry
}{
    locks: make(map[string]*lockEntry),
}

func main() {
    lock := NewDistributedLock("resource:123", 5*time.Second)

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Acquire lock
    if err := lock.Lock(ctx); err != nil {
        panic(err)
    }
    fmt.Println("Lock acquired")

    // Do work
    time.Sleep(2 * time.Second)

    // Extend lock
    if err := lock.Extend(5 * time.Second); err != nil {
        panic(err)
    }
    fmt.Println("Lock extended")

    // Do more work
    time.Sleep(2 * time.Second)

    // Release lock
    if err := lock.Unlock(); err != nil {
        panic(err)
    }
    fmt.Println("Lock released")
}
Further Reading
Books
- Designing Data-Intensive Applications by Martin Kleppmann
- Building Microservices by Sam Newman
- Release It! by Michael Nygard
Tools
- etcd - Distributed key-value store using Raft
- Consul - Service mesh and service discovery
- Istio - Service mesh for Kubernetes
Integration and Mastery
Building distributed systems is fundamentally different from building monolithic applications. The patterns and techniques covered in this tutorial represent years of hard-won lessons from production systems serving millions of users. This section synthesizes these lessons into actionable guidance for designing, implementing, and operating distributed systems in Go.
Key Takeaways and Best Practices
1. Embrace Failure as the Normal Case
In distributed systems, failures are not exceptional - they're inevitable. Design every component assuming that:
- Network calls will timeout or fail intermittently
- Servers will crash at the worst possible moment
- Network partitions will split your cluster unexpectedly
- Clocks will drift and messages will arrive out of order
// Always implement timeouts and retries
// (retry.Do below is from a retry helper such as github.com/avast/retry-go)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var result Response
err := retry.Do(
    func() error {
        return makeRequest(ctx, &result)
    },
    retry.Attempts(3),
    retry.Delay(100*time.Millisecond),
    retry.MaxDelay(1*time.Second),
    retry.DelayType(retry.BackOffDelay),
)
2. Start Simple, Scale Gradually
The patterns in this tutorial solve real problems, but they add complexity. Follow this progression:
Phase 1 - Single Service (0-10K users):
- Start with a well-structured monolith
- Use simple HTTP/gRPC for external APIs
- Single database with proper indexing
- Basic health checks and logging
Phase 2 - Fault Tolerance (10K-100K users):
- Add circuit breakers to prevent cascade failures
- Implement structured logging and basic metrics
- Add load balancers for high availability
- Use connection pools and timeouts everywhere
Phase 3 - Distribution (100K-1M users):
- Split into microservices based on team boundaries
- Implement service discovery (Consul, etcd)
- Add distributed tracing (OpenTelemetry)
- Consider event-driven architecture
Phase 4 - Scale and Resilience (1M+ users):
- Implement consensus for critical state (Raft, Paxos)
- Use saga patterns for distributed transactions
- Deploy service mesh for advanced traffic management
- Implement chaos engineering practices
3. Consistency Model Alignment
Choose consistency models based on business requirements, not technical preferences:
// Example: Banking requires strong consistency
type AccountTransfer struct {
    // Use distributed transactions or consensus
    // Cannot have "split-brain" money creation
}

// Example: Social media feed is eventually consistent
type UserFeed struct {
    // Can tolerate temporary inconsistency
    // Higher availability is more valuable
}

// Example: Shopping cart is session-consistent
type ShoppingCart struct {
    // User sees their own updates immediately
    // Other users don't need real-time view
}
Matching Patterns to Requirements:
- Strong Consistency Needed: Use Raft/Paxos consensus, distributed locks, or two-phase commit
- Eventual Consistency Acceptable: Use message queues, event sourcing, or CRDT data structures
- Session Consistency Required: Use sticky sessions, read-your-writes guarantees, or client-side caching
Common Pitfalls and How to Avoid Them
Pitfall 1: Distributed Transactions Without Timeouts
// ❌ Bad: Can deadlock indefinitely
func transferMoney(from, to string, amount int) error {
    lock1 := acquireLock(from) // What if this hangs?
    lock2 := acquireLock(to)
    defer releaseLock(from, to)
    // ... transfer logic
}

// ✅ Good: Always use timeouts and proper lock ordering
func transferMoney(ctx context.Context, from, to string, amount int) error {
    // Order locks alphabetically to prevent deadlock
    first, second := from, to
    if from > to {
        first, second = to, from
    }

    lock1, err := acquireLockWithTimeout(ctx, first, 5*time.Second)
    if err != nil {
        return fmt.Errorf("failed to acquire first lock: %w", err)
    }
    defer lock1.Release()

    lock2, err := acquireLockWithTimeout(ctx, second, 5*time.Second)
    if err != nil {
        return fmt.Errorf("failed to acquire second lock: %w", err)
    }
    defer lock2.Release()

    return performTransfer(ctx, from, to, amount)
}
Pitfall 2: Ignoring Idempotency
Distributed systems often retry operations. If your operations aren't idempotent, retries cause duplicate actions:
// ❌ Bad: Retrying this creates duplicate charges
func chargeCustomer(customerID string, amount int) error {
    return db.Exec("INSERT INTO charges (customer_id, amount) VALUES (?, ?)",
        customerID, amount)
}

// ✅ Good: Use idempotency keys
func chargeCustomer(customerID string, amount int, idempotencyKey string) error {
    // Try to insert with unique key constraint
    err := db.Exec(`
        INSERT INTO charges (idempotency_key, customer_id, amount)
        VALUES (?, ?, ?)
        ON CONFLICT (idempotency_key) DO NOTHING`,
        idempotencyKey, customerID, amount)

    if err != nil {
        return fmt.Errorf("charge failed: %w", err)
    }

    // Return existing charge if key already existed
    return nil
}
Pitfall 3: Cascading Failures
One slow service can bring down your entire system:
// ❌ Bad: No protection against slow dependencies
func getRecommendations(userID string) ([]Product, error) {
    profile := fetchUserProfile(userID)     // 5s timeout
    history := fetchPurchaseHistory(userID) // 5s timeout
    trending := fetchTrendingProducts()     // 5s timeout
    // Total worst case: 15 seconds!
}

// ✅ Good: Use circuit breakers and parallel execution
func getRecommendations(ctx context.Context, userID string) ([]Product, error) {
    // Set overall deadline
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()

    var profile UserProfile
    var history []Purchase
    var trending []Product

    // Execute in parallel with circuit breakers
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        var err error
        profile, err = profileBreaker.Execute(func() (UserProfile, error) {
            return fetchUserProfile(ctx, userID)
        })
        if err != nil {
            // Use cached or default profile
            profile = getDefaultProfile()
        }
        return nil // Don't fail overall request
    })

    g.Go(func() error {
        var err error
        history, err = historyBreaker.Execute(func() ([]Purchase, error) {
            return fetchPurchaseHistory(ctx, userID)
        })
        if err != nil {
            history = []Purchase{} // Empty history is acceptable
        }
        return nil
    })

    g.Go(func() error {
        var err error
        trending, err = trendingBreaker.Execute(func() ([]Product, error) {
            return fetchTrendingProducts(ctx)
        })
        if err != nil {
            trending = getCachedTrending() // Use cache
        }
        return nil
    })

    g.Wait() // Wait for all to complete

    return generateRecommendations(profile, history, trending), nil
}
Pitfall 4: Split-Brain Scenarios
Network partitions can create multiple leaders:
// ✅ Always use odd number of nodes for quorum
type ClusterConfig struct {
    Nodes []string // Must be 3, 5, 7, etc.
}

// ✅ Require majority votes for leadership
func electLeader(votes map[string]int, totalNodes int) (string, error) {
    quorum := totalNodes/2 + 1

    for candidate, voteCount := range votes {
        if voteCount >= quorum {
            return candidate, nil
        }
    }

    return "", errors.New("no candidate achieved quorum")
}

// ✅ Use fencing tokens to prevent stale leaders
type LeaderLease struct {
    Token   int64 // Monotonically increasing
    Expires time.Time
}

func executeLeaderAction(lease LeaderLease, action func() error) error {
    // Check lease is still valid
    if time.Now().After(lease.Expires) {
        return errors.New("lease expired")
    }

    // Include token in all operations
    // Storage layer rejects operations with stale tokens
    return action()
}
When to Use Distributed Patterns
Use Service Discovery When:
- You have more than 3-5 services
- Service instances are ephemeral (containers, auto-scaling)
- You need dynamic configuration updates
- Health checking is critical to your reliability
Use Circuit Breakers When:
- You depend on external services (APIs, databases)
- Failures can cascade through your system
- You need graceful degradation
- Response time SLAs are important
Use Consensus Algorithms When:
- Strong consistency is non-negotiable (financial data, inventory)
- You need distributed locks or leader election
- Data loss is unacceptable
- You can tolerate reduced availability during partitions
Use Saga Patterns When:
- Transactions span multiple microservices
- Two-phase commit is too slow or complex
- Eventual consistency is acceptable
- You need to maintain service autonomy
Use Distributed Locks When:
- Multiple processes need exclusive access to resources
- You're implementing leader election
- Preventing duplicate work is critical
- Lock duration is short (seconds, not minutes)
Don't Use Distributed Systems When:
- A monolith with proper architecture would suffice
- Your team lacks distributed systems expertise
- The added complexity isn't justified by scale
- Network latency would hurt user experience
Production Deployment Considerations
Observability is Non-Negotiable
Distributed systems are impossible to debug without proper observability:
// Implement the three pillars: Logs, Metrics, Traces

// 1. Structured Logging with Context
import "go.uber.org/zap"

logger.Info("processing request",
    zap.String("request_id", reqID),
    zap.String("user_id", userID),
    zap.Duration("duration", elapsed),
    zap.Int("status_code", statusCode),
)

// 2. Metrics with Labels
import "github.com/prometheus/client_golang/prometheus"

requestDuration := prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Name:    "http_request_duration_seconds",
        Help:    "HTTP request latency distribution",
        Buckets: prometheus.DefBuckets,
    },
    []string{"method", "endpoint", "status"},
)

// 3. Distributed Tracing
import "go.opentelemetry.io/otel"

ctx, span := tracer.Start(ctx, "process-order")
defer span.End()

span.SetAttributes(
    attribute.String("order.id", orderID),
    attribute.Int("order.items", len(items)),
)
Gradual Rollouts
Deploy distributed systems changes gradually:
// Feature flags for controlled rollouts
type FeatureFlags struct {
    UseNewConsensusAlgorithm bool // 0% → 1% → 5% → 50% → 100%
    EnableCircuitBreaker     bool
    RolloutPercentage        int
}

func shouldUseNewFeature(userID string, rolloutPercentage int) bool {
    // Consistent hashing ensures same user always gets same result
    hash := crc32.ChecksumIEEE([]byte(userID))
    return int(hash%100) < rolloutPercentage
}

// Canary deployments: Route small percentage to new version
// Blue-green deployments: Switch all traffic at once (with quick rollback)
// Rolling deployments: Gradually replace old instances
Capacity Planning
Distributed systems scale differently than monoliths:
// Calculate required capacity with safety margins

// Base calculation
expectedRPS := 10000
avgLatency := 50 * time.Millisecond
concurrency := float64(expectedRPS) * avgLatency.Seconds()

// Apply safety factors
safetyFactor := 2.0 // Handle 2x traffic spikes
concurrency *= safetyFactor

// Account for inefficiencies
clusterEfficiency := 0.7 // 70% efficiency (N+2 redundancy, upgrades, etc.)
requiredInstances := int(math.Ceil(concurrency / (1000 * clusterEfficiency)))

// Always use odd number for consensus
if requiredInstances%2 == 0 {
    requiredInstances++
}

fmt.Printf("Required instances: %d\n", requiredInstances)
Disaster Recovery
Plan for worst-case scenarios:
- Multi-Region Deployment: Survive entire region failures
- Regular Backups: Automate backups with restore testing
- Runbooks: Document failure scenarios and recovery steps
- Chaos Engineering: Regularly test failure scenarios
- Circuit Breaking: Fail fast and isolate failures
// Example: Multi-region failover
type RegionConfig struct {
    Primary   string   // us-east-1
    Fallbacks []string // [us-west-2, eu-west-1]
}

func makeRequest(ctx context.Context, regions RegionConfig) (Response, error) {
    // Try primary first
    resp, err := makeRegionRequest(ctx, regions.Primary)
    if err == nil {
        return resp, nil
    }

    // Fallback to other regions
    for _, region := range regions.Fallbacks {
        resp, err := makeRegionRequest(ctx, region)
        if err == nil {
            // Log for monitoring
            logger.Warn("primary region failed, using fallback",
                zap.String("primary", regions.Primary),
                zap.String("fallback", region),
            )
            return resp, nil
        }
    }

    return Response{}, errors.New("all regions unavailable")
}
Final Thoughts
Distributed systems are powerful but complex. The patterns in this tutorial - circuit breakers, consensus algorithms, service discovery, and sagas - are your toolkit for managing this complexity. Remember:
- Start Simple: Don't over-engineer. Most systems don't need distributed consensus.
- Fail Gracefully: Design for failure, not just success.
- Monitor Everything: You can't fix what you can't see.
- Test Failures: Use chaos engineering to validate resilience.
- Document Trade-offs: Every distributed systems decision involves trade-offs - document them.
The distributed systems landscape is vast, and this tutorial covers foundational patterns. As you build production systems, you'll encounter scenarios requiring deeper expertise in specific areas. The Further Reading section provides resources for diving deeper into topics like distributed databases, stream processing, and consensus protocols.
Most importantly, remember that distributed systems exist to solve business problems. Before reaching for these patterns, ask yourself: "Does the business value justify the added complexity?" Sometimes the best distributed system is a well-designed monolith.
Practice Exercises
Exercise 1: Implement a Saga Pattern
🎯 Learning Objectives:
- Master distributed transaction patterns without two-phase commit
- Implement compensation-based rollback mechanisms
- Learn to handle partial failures in distributed systems
- Design stateful workflow coordinators
🌍 Real-World Context:
Saga patterns are essential for microservices architectures where traditional ACID transactions don't work across service boundaries. Companies like Uber use sagas to coordinate ride-booking across payment, routing, and notification services. When any step fails, compensation actions ensure system consistency without locking resources across services, which is crucial for high-throughput distributed systems.
⏱️ Time Estimate: 60-90 minutes
📊 Difficulty: Advanced
Build a saga coordinator for distributed transactions with compensation. Your implementation should execute steps sequentially, track progress, handle partial failures gracefully, and automatically compensate completed steps when failures occur. This pattern is critical for maintaining data consistency across microservices without distributed locks.
Solution
package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

// Step represents a saga step
type Step struct {
    Name       string
    Execute    func(ctx context.Context) error
    Compensate func(ctx context.Context) error
}

// Saga coordinates distributed transactions
type Saga struct {
    steps    []Step
    executed []int // Track executed step indices
    mu       sync.Mutex
}

func NewSaga() *Saga {
    return &Saga{
        executed: make([]int, 0),
    }
}

// AddStep adds a step to the saga
func (s *Saga) AddStep(name string, execute, compensate func(ctx context.Context) error) {
    s.steps = append(s.steps, Step{
        Name:       name,
        Execute:    execute,
        Compensate: compensate,
    })
}

// Execute runs the saga
func (s *Saga) Execute(ctx context.Context) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    // Execute each step
    for i, step := range s.steps {
        fmt.Printf("Executing step: %s\n", step.Name)

        if err := step.Execute(ctx); err != nil {
            fmt.Printf("Step %s failed: %v\n", step.Name, err)

            // Compensate in reverse order
            if cerr := s.compensate(ctx); cerr != nil {
                return fmt.Errorf("compensation failed: %w", cerr)
            }

            return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
        }

        s.executed = append(s.executed, i)
        fmt.Printf("Step %s completed\n", step.Name)
    }

    fmt.Println("Saga completed successfully")
    return nil
}

func (s *Saga) compensate(ctx context.Context) error {
    fmt.Println("Starting compensation...")

    // Compensate in reverse order
    for i := len(s.executed) - 1; i >= 0; i-- {
        stepIdx := s.executed[i]
        step := s.steps[stepIdx]

        fmt.Printf("Compensating step: %s\n", step.Name)

        if err := step.Compensate(ctx); err != nil {
            return fmt.Errorf("compensation failed for step %s: %w", step.Name, err)
        }

        fmt.Printf("Step %s compensated\n", step.Name)
    }

    s.executed = nil
    fmt.Println("Compensation completed")
    return nil
}

// Example: Order processing saga
func main() {
    saga := NewSaga()

    // Step 1: Reserve inventory
    var inventoryReserved bool
    saga.AddStep("ReserveInventory",
        func(ctx context.Context) error {
            fmt.Println("  Reserving inventory...")
            inventoryReserved = true
            return nil
        },
        func(ctx context.Context) error {
            if inventoryReserved {
                fmt.Println("  Releasing inventory...")
                inventoryReserved = false
            }
            return nil
        },
    )

    // Step 2: Charge payment
    var paymentCharged bool
    saga.AddStep("ChargePayment",
        func(ctx context.Context) error {
            fmt.Println("  Charging payment...")
            paymentCharged = true
            return nil
        },
        func(ctx context.Context) error {
            if paymentCharged {
                fmt.Println("  Refunding payment...")
                paymentCharged = false
            }
            return nil
        },
    )

    // Step 3: Create shipment
    saga.AddStep("CreateShipment",
        func(ctx context.Context) error {
            fmt.Println("  Creating shipment...")
            return errors.New("shipping service unavailable")
        },
        func(ctx context.Context) error {
            fmt.Println("  Canceling shipment...")
            return nil
        },
    )

    // Execute saga
    ctx := context.Background()
    if err := saga.Execute(ctx); err != nil {
        fmt.Printf("\nSaga failed: %v\n", err)
        fmt.Printf("Inventory reserved: %v\n", inventoryReserved)
        fmt.Printf("Payment charged: %v\n", paymentCharged)
    }
}
Output:
Executing step: ReserveInventory
Reserving inventory...
Step ReserveInventory completed
Executing step: ChargePayment
Charging payment...
Step ChargePayment completed
Executing step: CreateShipment
Creating shipment...
Step CreateShipment failed: shipping service unavailable
Starting compensation...
Compensating step: ChargePayment
Refunding payment...
Step ChargePayment compensated
Compensating step: ReserveInventory
Releasing inventory...
Step ReserveInventory compensated
Compensation completed
Saga failed: saga failed at step CreateShipment: shipping service unavailable
Inventory reserved: false
Payment charged: false
Exercise 2: Build a Bulkhead Pattern Implementation
🎯 Learning Objectives:
- Implement resource isolation patterns for fault tolerance
- Design concurrent request management with backpressure
- Learn semaphore-based resource allocation
- Build metrics and monitoring for capacity management
🌍 Real-World Context:
The bulkhead pattern prevents cascading failures by isolating resources, just like watertight compartments in ships prevent sinking when one section floods. Netflix uses bulkheads extensively to isolate different parts of their streaming service - when the recommendation service fails, video playback continues unaffected. This pattern is crucial for systems where partial functionality is better than complete outage.
⏱️ Time Estimate: 75-90 minutes
📊 Difficulty: Advanced
Implement the bulkhead pattern to isolate resources and prevent cascading failures. Your implementation should create separate resource pools for different services, limit concurrent requests per pool, queue overflow requests gracefully, and provide comprehensive metrics for monitoring resource utilization and identifying bottlenecks.
Solution
1package main
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8 "time"
9)
10
11// Bulkhead isolates resources to prevent cascading failures
12type Bulkhead struct {
13 name string
14 maxWorkers int
15 queueSize int
16
17 workers chan struct{} // Semaphore for workers
18 queue chan *task
19 wg sync.WaitGroup
20
21 // Metrics
22 mu sync.RWMutex
23 activeWorkers int
24 queuedTasks int
25 completedTasks int64
26 rejectedTasks int64
27}
28
29type task struct {
30 fn func(context.Context) error
31 result chan error
32}
33
34// NewBulkhead creates a new bulkhead
35func NewBulkhead(name string, maxWorkers, queueSize int) *Bulkhead {
36 b := &Bulkhead{
37 name: name,
38 maxWorkers: maxWorkers,
39 queueSize: queueSize,
40 workers: make(chan struct{}, maxWorkers),
41 queue: make(chan *task, queueSize),
42 }
43
44 // Start worker pool
45 for i := 0; i < maxWorkers; i++ {
46 b.wg.Add(1)
47 go b.worker()
48 }
49
50 return b
51}
52
53// Execute runs a function with bulkhead protection
54func Execute(ctx context.Context, fn func(context.Context) error) error {
55 // Create task
56 t := &task{
57 fn: fn,
58 result: make(chan error, 1),
59 }
60
61 // Try to queue
62 select {
63 case b.queue <- t:
64 b.mu.Lock()
65 b.queuedTasks++
66 b.mu.Unlock()
67
68 // Wait for result
69 select {
70 case err := <-t.result:
71 return err
72 case <-ctx.Done():
73 return ctx.Err()
74 }
75
76 default:
77 // Queue full
78 b.mu.Lock()
79 b.rejectedTasks++
80 b.mu.Unlock()
81 return errors.New("bulkhead queue full")
82 }
83}
84
85 func (b *Bulkhead) worker() {
86 defer b.wg.Done()
87
88 for t := range b.queue {
89 b.mu.Lock()
90 b.activeWorkers++
91 b.queuedTasks--
92 b.mu.Unlock()
93
94 // Execute task
95 ctx := context.Background()
96 err := t.fn(ctx)
97 t.result <- err
98
99 b.mu.Lock()
100 b.activeWorkers--
101 b.completedTasks++
102 b.mu.Unlock()
103 }
104}
105
106// Metrics returns current bulkhead metrics
107 func (b *Bulkhead) Metrics() map[string]interface{} {
108 b.mu.RLock()
109 defer b.mu.RUnlock()
110
111 return map[string]interface{}{
112 "name": b.name,
113 "max_workers": b.maxWorkers,
114 "active_workers": b.activeWorkers,
115 "queued_tasks": b.queuedTasks,
116 "completed_tasks": b.completedTasks,
117 "rejected_tasks": b.rejectedTasks,
118 }
119}
120
121// Close shuts down the bulkhead
122 func (b *Bulkhead) Close() {
123 close(b.queue)
124 b.wg.Wait()
125}
126
127func main() {
128 // Create separate bulkheads for different services
129 paymentBulkhead := NewBulkhead("payment", 3, 5)
130 shippingBulkhead := NewBulkhead("shipping", 2, 3)
131
132 defer paymentBulkhead.Close()
133 defer shippingBulkhead.Close()
134
135 // Simulate load
136 var wg sync.WaitGroup
137
138 // Payment service
139 for i := 0; i < 10; i++ {
140 wg.Add(1)
141 go func(id int) {
142 defer wg.Done()
143
144 err := paymentBulkhead.Execute(context.Background(), func(ctx context.Context) error {
145 fmt.Printf("Payment %d: Processing...\n", id)
146 time.Sleep(100 * time.Millisecond)
147 return nil
148 })
149
150 if err != nil {
151 fmt.Printf("Payment %d: REJECTED\n", id, err)
152 }
153 }(i)
154 }
155
156 // Shipping service
157 for i := 0; i < 15; i++ {
158 wg.Add(1)
159 go func(id int) {
160 defer wg.Done()
161
162 err := shippingBulkhead.Execute(context.Background(), func(ctx context.Context) error {
163 fmt.Printf("Shipping %d: Processing...\n", id)
164 time.Sleep(500 * time.Millisecond)
165 return nil
166 })
167
168 if err != nil {
169 fmt.Printf("Shipping %d: REJECTED\n", id, err)
170 }
171 }(i)
172 }
173
174 wg.Wait()
175
176 // Print metrics
177 fmt.Println("\n=== Bulkhead Metrics ===")
178 fmt.Printf("Payment: %+v\n", paymentBulkhead.Metrics())
179 fmt.Printf("Shipping: %+v\n", shippingBulkhead.Metrics())
180}
Key Points:
- Separate resource pools prevent one slow service from affecting others
- Queue provides buffer for bursty traffic
- Metrics help identify bottlenecks
- Failed requests fail fast instead of hanging
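The last point can be made explicit with a per-call deadline. Below is a minimal, hedged sketch (the executeWithTimeout helper and the processPayment callback are illustrative, not part of the solution above; it assumes the Bulkhead type as defined there). Note that the timeout only bounds how long the caller waits: the worker still runs the submitted function to completion with its own background context.

func executeWithTimeout(b *Bulkhead, timeout time.Duration, fn func(context.Context) error) error {
	// Bound how long the caller waits for a queue slot and a result.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return b.Execute(ctx, fn)
}

// Example: give up after 200ms instead of waiting on a saturated pool.
// err := executeWithTimeout(paymentBulkhead, 200*time.Millisecond, processPayment)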
Exercise 3: Vector Clock Implementation
🎯 Learning Objectives:
- Master causal consistency and happened-before relationships
- Implement distributed event ordering without synchronized clocks
- Learn to detect concurrent vs causally related events
- Design clock merging and comparison algorithms
🌍 Real-World Context:
Vector clocks are fundamental to distributed systems where physical clock synchronization is impossible or unreliable. Amazon Dynamo uses vector clocks to resolve conflicts in eventually consistent storage, while collaborative editing tools like Google Docs use similar concepts to maintain consistency across distributed users. Understanding vector clocks is essential for building systems that need to reason about event causality without global time.
⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Expert
Implement vector clocks for tracking causality in distributed systems. Your implementation should track happened-before relationships between events, detect concurrent events, merge clocks from different nodes, and provide comparison operations for ordering events causally. This is a core building block for eventually consistent systems and collaborative applications.
Solution with Explanation
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7)
8
9// VectorClock tracks causality in distributed systems
10type VectorClock struct {
11 mu sync.RWMutex
12 clock map[string]int
13 nodeID string
14}
15
16// NewVectorClock creates a new vector clock for a node
17func NewVectorClock(nodeID string) *VectorClock {
18 return &VectorClock{
19 clock: make(map[string]int),
20 nodeID: nodeID,
21 }
22}
23
24// Tick increments this node's clock
25 func (vc *VectorClock) Tick() {
26 vc.mu.Lock()
27 defer vc.mu.Unlock()
28
29 vc.clock[vc.nodeID]++
30}
31
32// Update merges another clock into this one
33 func (vc *VectorClock) Update(other *VectorClock) {
34 vc.mu.Lock()
35 defer vc.mu.Unlock()
36
37 other.mu.RLock()
38 defer other.mu.RUnlock()
39
40 // Take max of each node's counter
41 for node, timestamp := range other.clock {
42 if vc.clock[node] < timestamp {
43 vc.clock[node] = timestamp
44 }
45 }
46
47 // Increment our own counter
48 vc.clock[vc.nodeID]++
49}
50
51 // Ordering describes the relationship between two clocks
52type Ordering int
53
54const (
55 Before Ordering = iota // vc happened before other
56 After // vc happened after other
57 Concurrent // vc and other are concurrent
58)
59
60 func (o Ordering) String() string {
61 switch o {
62 case Before:
63 return "BEFORE"
64 case After:
65 return "AFTER"
66 case Concurrent:
67 return "CONCURRENT"
68 default:
69 return "UNKNOWN"
70 }
71}
72
73// Compare compares this clock with another
74 func (vc *VectorClock) Compare(other *VectorClock) Ordering {
75 vc.mu.RLock()
76 defer vc.mu.RUnlock()
77
78 other.mu.RLock()
79 defer other.mu.RUnlock()
80
81 allLessOrEqual := true
82 allGreaterOrEqual := true
83
84 // Collect all node IDs
85 nodes := make(map[string]bool)
86 for node := range vc.clock {
87 nodes[node] = true
88 }
89 for node := range other.clock {
90 nodes[node] = true
91 }
92
93 // Compare each node's timestamp
94 for node := range nodes {
95 vcTime := vc.clock[node]
96 otherTime := other.clock[node]
97
98 if vcTime > otherTime {
99 allLessOrEqual = false
100 }
101 if vcTime < otherTime {
102 allGreaterOrEqual = false
103 }
104 }
105
106 // Determine ordering
107 if allLessOrEqual && !allGreaterOrEqual {
108 return Before
109 }
110 if allGreaterOrEqual && !allLessOrEqual {
111 return After
112 }
113 if allLessOrEqual && allGreaterOrEqual {
114 // Clocks are identical (same causal history); treat as Before for ordering
115 return Before
116 }
117
118 return Concurrent
119}
120
121// Copy creates a deep copy of the clock
122 func (vc *VectorClock) Copy() *VectorClock {
123 vc.mu.RLock()
124 defer vc.mu.RUnlock()
125
126 newClock := NewVectorClock(vc.nodeID)
127 for node, timestamp := range vc.clock {
128 newClock.clock[node] = timestamp
129 }
130
131 return newClock
132}
133
134// String returns a string representation
135 func (vc *VectorClock) String() string {
136 vc.mu.RLock()
137 defer vc.mu.RUnlock()
138
139 return fmt.Sprintf("%v", vc.clock)
140}
141
142// Event represents a distributed system event
143type Event struct {
144 NodeID string
145 Action string
146 Clock *VectorClock
147}
148
149func main() {
150 // Create three nodes
151 nodeA := NewVectorClock("A")
152 nodeB := NewVectorClock("B")
153 nodeC := NewVectorClock("C")
154
155 fmt.Println("=== Distributed System Event Ordering ===\n")
156
157 // Event 1: Node A sends message
158 fmt.Println("Event 1: Node A sends message")
159 nodeA.Tick()
160 fmt.Printf(" Node A clock: %s\n", nodeA)
161
162 // Event 2: Node B receives message from A
163 fmt.Println("\nEvent 2: Node B receives from A")
164 nodeB.Update(nodeA)
165 fmt.Printf(" Node B clock: %s\n", nodeB)
166
167 // Event 3: Node A does local operation
168 fmt.Println("\nEvent 3: Node A local operation")
169 nodeA.Tick()
170 fmt.Printf(" Node A clock: %s\n", nodeA)
171
172 // Event 4: Node C does local operation
173 fmt.Println("\nEvent 4: Node C local operation")
174 nodeC.Tick()
175 fmt.Printf(" Node C clock: %s\n", nodeC)
176
177 // Event 5: Node C receives from B
178 fmt.Println("\nEvent 5: Node C receives from B")
179 nodeC.Update(nodeB)
180 fmt.Printf(" Node C clock: %s\n", nodeC)
181
182 // Compare clocks
183 fmt.Println("\n=== Clock Comparisons ===\n")
184
185 // Save states for comparison
186 stateA1 := NewVectorClock("A")
187 stateA1.clock["A"] = 1
188 stateB2 := NewVectorClock("B")
189 stateB2.clock["A"] = 1
190 stateB2.clock["B"] = 1
191 stateA3 := NewVectorClock("A")
192 stateA3.clock["A"] = 2
193 stateC4 := NewVectorClock("C")
194 stateC4.clock["C"] = 1
195
196 fmt.Printf("A1 %s vs B2 %s: %s\n",
197 stateA1, stateB2, stateA1.Compare(stateB2))
198
199 fmt.Printf("B2 %s vs C5 %s: %s\n",
200 stateB2, nodeC, stateB2.Compare(nodeC))
201
202 fmt.Printf("A3 %s vs C4 %s: %s\n",
203 stateA3, stateC4, stateA3.Compare(stateC4))
204
205 // Demonstrate causality detection
206 fmt.Println("\n=== Causality Detection ===\n")
207
208 // Create two concurrent events
209 node1 := NewVectorClock("Node1")
210 node2 := NewVectorClock("Node2")
211
212 // Node1 event
213 node1.Tick()
214 event1 := node1.Copy()
215
216 // Node2 event
217 node2.Tick()
218 event2 := node2.Copy()
219
220 // Node1 receives from Node2
221 node1.Update(node2)
222 event3 := node1.Copy()
223
224 fmt.Printf("Event 1: %s\n", event1)
225 fmt.Printf("Event 2: %s\n", event2)
226 fmt.Printf("Event 3: %s\n", event3)
227
228 fmt.Printf("\nEvent1 vs Event2: %s\n",
229 event1.Compare(event2))
230 fmt.Printf("Event1 vs Event3: %s\n",
231 event1.Compare(event3))
232 fmt.Printf("Event2 vs Event3: %s\n",
233 event2.Compare(event3))
234}
Explanation:
Vector Clock Properties:
-
Causality Tracking
- Each node maintains a clock
- Local events: increment own counter
- Send message: include vector clock
- Receive message: merge clocks, then increment
-
Happened-Before Relation
- Event A happened before B if every component of A's clock is ≤ B's clock, with strict inequality in at least one component
- Events are concurrent if: neither A ≤ B nor B ≤ A
-
Properties
- If A → B, then VectorClock(A) < VectorClock(B)
- If VectorClock(A) < VectorClock(B), then A → B
- If VectorClock(A) || VectorClock(B), no causal relationship
Algorithm:
On local event at node N:
VectorClock[N]++
On send message from node N to node M:
VectorClock[N]++
Include VectorClock in message
On receive message at node M from node N:
For each node i:
VectorClock[i] = max(VectorClock[i], ReceivedClock[i])
VectorClock[M]++
Comparison Rules:
- A < B: All components A[i] ≤ B[i], and at least one A[i] < B[i]
- A > B: All components A[i] ≥ B[i], and at least one A[i] > B[i]
- A || B: Neither A < B nor A > B
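To make these rules concrete, here is a small fragment (a hedged example using the VectorClock type from the solution above, meant to be appended inside its main function) that compares three hand-built clocks:

a := NewVectorClock("A")
a.clock["A"], a.clock["B"] = 2, 1 // {A:2, B:1}

b := NewVectorClock("B")
b.clock["A"], b.clock["B"] = 1, 1 // {A:1, B:1}

c := NewVectorClock("C")
c.clock["C"] = 1 // {C:1}

fmt.Println(a.Compare(b)) // AFTER: every component of a >= b, and a[A] > b[A]
fmt.Println(b.Compare(a)) // BEFORE
fmt.Println(a.Compare(c)) // CONCURRENT: a[A] > c[A] but a[C] < c[C]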
Real-World Use Cases:
-
Distributed Databases
- Detect conflicts in multi-master replication
- Amazon Dynamo and Riak use vector clocks (Cassandra relies on timestamps instead)
- Resolve concurrent writes
-
Version Control
- Git's commit DAG encodes a similar happened-before relationship
- Detect merge conflicts
- Track file version lineage
-
Message Ordering
- Ensure causal delivery of messages
- Prevent reading effects before causes
- Implement causal consistency
Advantages:
- Detect causality without synchronized clocks
- Identify concurrent operations
- No single point of failure
- Works with network partitions
Limitations:
- Space grows with number of nodes: O(N)
- Cannot garbage collect old entries easily
- Doesn't provide total ordering
- Not suitable for thousands of nodes
Optimizations:
- Dotted Version Vectors: Reduce size
- Interval Tree Clocks: Compact representation
- Pruning: Remove inactive nodes
Exercise 4: Distributed Lock Service
🎯 Learning Objectives:
- Implement distributed locking mechanisms for coordination
- Master lease-based locking and renewal strategies
- Learn to handle network partitions and split-brain scenarios
- Design lock acquisition with timeout and retry logic
🌍 Real-World Context:
Distributed locks are essential for coordinating access to shared resources across multiple nodes. Apache ZooKeeper and etcd provide distributed locking primitives used by systems like Kafka for leader election and by databases for coordinating schema migrations. Understanding distributed locks is crucial for building systems that need exclusive access to resources in a distributed environment, such as coordinating database migrations or managing access to limited resources.
⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Expert
Build a distributed lock service with lease-based locking, automatic renewal, and proper handling of network partitions. Your implementation should include lock acquisition with timeouts, automatic lease renewal to prevent deadlock, proper lock release mechanisms, and strategies for detecting and handling split-brain scenarios where multiple nodes believe they hold the same lock.
Requirements:
- Implement lease-based locking with TTL
- Add automatic lease renewal mechanism
- Handle network partitions gracefully
- Implement lock fencing for safety
- Add metrics for lock acquisition/release tracking
Solution with Explanation
1// run
2package main
3
4import (
5 "context"
6 "errors"
7 "fmt"
8 "sync"
9 "time"
10
11 "github.com/google/uuid"
12)
13
14// DistributedLockService provides distributed locking capabilities
15type DistributedLockService struct {
16 mu sync.RWMutex
17 locks map[string]*LockEntry
18
19 // Configuration
20 defaultTTL time.Duration
21 clockSkew time.Duration
22}
23
24// LockEntry represents a held lock
25type LockEntry struct {
26 LockID string
27 OwnerID string
28 ExpiresAt time.Time
29 Version int64
30 CreatedAt time.Time
31 LastRenewed time.Time
32}
33
34// NewDistributedLockService creates a new lock service
35func NewDistributedLockService(defaultTTL time.Duration) *DistributedLockService {
36 return &DistributedLockService{
37 locks: make(map[string]*LockEntry),
38 defaultTTL: defaultTTL,
39 clockSkew: 100 * time.Millisecond, // Account for clock skew
40 }
41}
42
43// LockRequest represents a lock acquisition request
44type LockRequest struct {
45 ResourceID string
46 OwnerID string
47 TTL time.Duration // Optional, uses default if 0
48 WaitTime time.Duration // Max time to wait for lock
49 RetryDelay time.Duration // Delay between retries
50}
51
52// LockResult represents the result of a lock operation
53type LockResult struct {
54 LockID string
55 Acquired bool
56 ExpiresAt time.Time
57 Version int64
58}
59
60// AcquireLock attempts to acquire a distributed lock
61 func (dls *DistributedLockService) AcquireLock(ctx context.Context, req LockRequest) (*LockResult, error) {
62 if req.TTL == 0 {
63 req.TTL = dls.defaultTTL
64 }
65 if req.RetryDelay == 0 {
66 req.RetryDelay = 100 * time.Millisecond
67 }
68
69 lockID := uuid.New().String()
70 now := time.Now()
71 timeout := now.Add(req.WaitTime)
72
73 for {
74 dls.mu.Lock()
75
76 // Check if lock exists and is still valid
77 if existing, exists := dls.locks[req.ResourceID]; exists {
78 // Check if lock has expired
79 if existing.ExpiresAt.Add(dls.clockSkew).After(now) {
80 // Lock is still valid
81 dls.mu.Unlock()
82
83 // Check if we should retry
84 if time.Now().After(timeout) {
85 return nil, errors.New("lock acquisition timeout")
86 }
87
88 // Wait before retrying
89 select {
90 case <-time.After(req.RetryDelay):
91 continue
92 case <-ctx.Done():
93 return nil, ctx.Err()
94 }
95 }
96 // Lock has expired, we can acquire it
97 }
98
99 // Acquire the lock
100 entry := &LockEntry{
101 LockID: lockID,
102 OwnerID: req.OwnerID,
103 ExpiresAt: now.Add(req.TTL),
104 Version: time.Now().UnixNano(),
105 CreatedAt: now,
106 LastRenewed: now,
107 }
108
109 dls.locks[req.ResourceID] = entry
110 dls.mu.Unlock()
111
112 return &LockResult{
113 LockID: lockID,
114 Acquired: true,
115 ExpiresAt: entry.ExpiresAt,
116 Version: entry.Version,
117 }, nil
118 }
119}
120
121// RenewLock extends the lease of an existing lock
122 func (dls *DistributedLockService) RenewLock(resourceID, lockID string, ttl time.Duration) error {
123 dls.mu.Lock()
124 defer dls.mu.Unlock()
125
126 entry, exists := dls.locks[resourceID]
127 if !exists {
128 return errors.New("lock not found")
129 }
130
131 if entry.LockID != lockID {
132 return errors.New("lock not owned by this client")
133 }
134
135 // Check if lock has already expired
136 if time.Now().After(entry.ExpiresAt) {
137 return errors.New("lock has expired")
138 }
139
140 // Renew the lock
141 entry.ExpiresAt = time.Now().Add(ttl)
142 entry.Version++
143 entry.LastRenewed = time.Now()
144
145 return nil
146}
147
148// ReleaseLock releases a distributed lock
149 func (dls *DistributedLockService) ReleaseLock(resourceID, lockID string) error {
150 dls.mu.Lock()
151 defer dls.mu.Unlock()
152
153 entry, exists := dls.locks[resourceID]
154 if !exists {
155 return errors.New("lock not found")
156 }
157
158 if entry.LockID != lockID {
159 return errors.New("lock not owned by this client")
160 }
161
162 delete(dls.locks, resourceID)
163 return nil
164}
165
166// IsLocked checks if a resource is currently locked
167 func (dls *DistributedLockService) IsLocked(resourceID string) bool {
168 dls.mu.RLock()
169 defer dls.mu.RUnlock()
170
171 entry, exists := dls.locks[resourceID]
172 if !exists {
173 return false
174 }
175
176 // Check if lock has expired
177 return time.Now().Before(entry.ExpiresAt.Add(dls.clockSkew))
178}
179
180// GetLockInfo returns information about a lock
181 func (dls *DistributedLockService) GetLockInfo(resourceID string) (*LockEntry, error) {
182 dls.mu.RLock()
183 defer dls.mu.RUnlock()
184
185 entry, exists := dls.locks[resourceID]
186 if !exists {
187 return nil, errors.New("lock not found")
188 }
189
190 // Return a copy to prevent external modification
191 return &LockEntry{
192 LockID: entry.LockID,
193 OwnerID: entry.OwnerID,
194 ExpiresAt: entry.ExpiresAt,
195 Version: entry.Version,
196 CreatedAt: entry.CreatedAt,
197 LastRenewed: entry.LastRenewed,
198 }, nil
199}
200
201// CleanupExpiredLocks removes expired locks
202 func (dls *DistributedLockService) CleanupExpiredLocks() {
203 dls.mu.Lock()
204 defer dls.mu.Unlock()
205
206 now := time.Now()
207 for resourceID, entry := range dls.locks {
208 if now.After(entry.ExpiresAt.Add(dls.clockSkew)) {
209 delete(dls.locks, resourceID)
210 }
211 }
212}
213
214// LockManager manages lock lifecycle with automatic renewal
215type LockManager struct {
216 service *DistributedLockService
217 ownerID string
218 locksHeld map[string]*ManagedLock
219 mu sync.RWMutex
220 stopChan chan struct{}
221}
222
223// ManagedLock represents a managed lock with automatic renewal
224type ManagedLock struct {
225 ResourceID string
226 LockID string
227 ExpiresAt time.Time
228 cancelRenew context.CancelFunc
229}
230
231// NewLockManager creates a new lock manager
232func NewLockManager(service *DistributedLockService, ownerID string) *LockManager {
233 lm := &LockManager{
234 service: service,
235 ownerID: ownerID,
236 locksHeld: make(map[string]*ManagedLock),
237 stopChan: make(chan struct{}),
238 }
239
240 // Start cleanup routine
241 go lm.cleanupRoutine()
242
243 return lm
244}
245
246// AcquireLock acquires a managed lock with automatic renewal
247 func (lm *LockManager) AcquireLock(ctx context.Context, resourceID string, ttl time.Duration) (*ManagedLock, error) {
248 req := LockRequest{
249 ResourceID: resourceID,
250 OwnerID: lm.ownerID,
251 TTL: ttl,
252 WaitTime: 30 * time.Second,
253 RetryDelay: 100 * time.Millisecond,
254 }
255
256 result, err := lm.service.AcquireLock(ctx, req)
257 if err != nil {
258 return nil, err
259 }
260
261 if !result.Acquired {
262 return nil, errors.New("failed to acquire lock")
263 }
264
265 // Create managed lock with automatic renewal
266 renewCtx, cancelRenew := context.WithCancel(context.Background())
267 managedLock := &ManagedLock{
268 ResourceID: resourceID,
269 LockID: result.LockID,
270 ExpiresAt: result.ExpiresAt,
271 cancelRenew: cancelRenew,
272 }
273
274 // Start renewal routine
275 go lm.renewalRoutine(renewCtx, managedLock, ttl/3) // Renew at 1/3 of TTL
276
277 lm.mu.Lock()
278 lm.locksHeld[resourceID] = managedLock
279 lm.mu.Unlock()
280
281 return managedLock, nil
282}
283
284// ReleaseLock releases a managed lock
285 func (lm *LockManager) ReleaseLock(managedLock *ManagedLock) error {
286 // Stop renewal
287 managedLock.cancelRenew()
288
289 // Release the lock
290 err := lm.service.ReleaseLock(managedLock.ResourceID, managedLock.LockID)
291
292 // Remove from held locks
293 lm.mu.Lock()
294 delete(lm.locksHeld, managedLock.ResourceID)
295 lm.mu.Unlock()
296
297 return err
298}
299
300// renewalRoutine automatically renews locks
301 func (lm *LockManager) renewalRoutine(ctx context.Context, managedLock *ManagedLock, renewInterval time.Duration) {
302 ticker := time.NewTicker(renewInterval)
303 defer ticker.Stop()
304
305 for {
306 select {
307 case <-ctx.Done():
308 return
309 case <-ticker.C:
310 // Check if lock is still valid
311 info, err := lm.service.GetLockInfo(managedLock.ResourceID)
312 if err != nil || info.LockID != managedLock.LockID {
313 // Lock is no longer valid, stop renewal
314 return
315 }
316
317 // Renew the lock
318 err = lm.service.RenewLock(managedLock.ResourceID, managedLock.LockID, renewInterval*3)
319 if err != nil {
320 // Renewal failed, stop renewal
321 return
322 }
323
324 // Update expiration time
325 newInfo, _ := lm.service.GetLockInfo(managedLock.ResourceID)
326 if newInfo != nil {
327 managedLock.ExpiresAt = newInfo.ExpiresAt
328 }
329 }
330 }
331}
332
333// cleanupRoutine periodically cleans up expired locks
334 func (lm *LockManager) cleanupRoutine() {
335 ticker := time.NewTicker(10 * time.Second)
336 defer ticker.Stop()
337
338 for {
339 select {
340 case <-lm.stopChan:
341 return
342 case <-ticker.C:
343 lm.service.CleanupExpiredLocks()
344 }
345 }
346}
347
348// Stop stops the lock manager
349 func (lm *LockManager) Stop() {
350 close(lm.stopChan)
351
352 lm.mu.Lock()
353 defer lm.mu.Unlock()
354
355 // Release all held locks
356 for _, managedLock := range lm.locksHeld {
357 managedLock.cancelRenew()
358 lm.service.ReleaseLock(managedLock.ResourceID, managedLock.LockID)
359 }
360 lm.locksHeld = make(map[string]*ManagedLock)
361}
362
363// GetStats returns lock service statistics
364 func (dls *DistributedLockService) GetStats() map[string]interface{} {
365 dls.mu.RLock()
366 defer dls.mu.RUnlock()
367
368 activeLocks := 0
369 expiredLocks := 0
370 now := time.Now()
371
372 for _, entry := range dls.locks {
373 if now.Before(entry.ExpiresAt) {
374 activeLocks++
375 } else {
376 expiredLocks++
377 }
378 }
379
380 return map[string]interface{}{
381 "total_locks": len(dls.locks),
382 "active_locks": activeLocks,
383 "expired_locks": expiredLocks,
384 "default_ttl": dls.defaultTTL,
385 }
386}
387
388func main() {
389 // Create distributed lock service
390 lockService := NewDistributedLockService(5 * time.Second)
391
392 // Create lock manager for a client
393 lockManager := NewLockManager(lockService, "client-123")
394 defer lockManager.Stop()
395
396 fmt.Println("=== Distributed Lock Service Demo ===\n")
397
398 // Scenario 1: Basic lock acquisition and release
399 fmt.Println("Scenario 1: Basic Lock Operations")
400 ctx := context.Background()
401
402 lock1, err := lockManager.AcquireLock(ctx, "resource-1", 3*time.Second)
403 if err != nil {
404 panic(err)
405 }
406 fmt.Printf("Acquired lock for resource-1: %s\n",
407 lock1.LockID, lock1.ExpiresAt.Format("15:04:05"))
408
109 // Try to acquire the same lock as a second client, against the same
110 // shared lock service (a separate service instance would have its own state)
411 req := LockRequest{
412 ResourceID: "resource-1",
413 OwnerID: "client-456",
414 TTL: 3 * time.Second,
415 WaitTime: 2 * time.Second,
416 }
417
118 result, err := lockService.AcquireLock(ctx, req)
419 if err != nil {
420 fmt.Printf("Client-456 failed to acquire lock: %v\n", err)
421 } else if !result.Acquired {
422 fmt.Println("Client-456: Lock not available")
423 }
424
425 // Release the lock
426 err = lockManager.ReleaseLock(lock1)
427 if err != nil {
428 panic(err)
429 }
430 fmt.Println("Released lock for resource-1")
431
432 // Scenario 2: Lock expiration and reclamation
433 fmt.Println("\nScenario 2: Lock Expiration")
434 expiringRes, err := lockService.AcquireLock(ctx, LockRequest{ResourceID: "resource-2", OwnerID: "client-123", TTL: 1 * time.Second, WaitTime: time.Second}) // acquired directly, so no automatic renewal
435 if err != nil {
436 panic(err)
437 }
438 fmt.Printf("Acquired expiring lock: %s\n", expiringLock.LockID)
439
440 // Wait for expiration
441 time.Sleep(2 * time.Second)
442
443 // Now another client should be able to acquire it
444 result, err = lockService.AcquireLock(ctx, LockRequest{ResourceID: "resource-2", OwnerID: "client-456", TTL: 3 * time.Second, WaitTime: 2 * time.Second})
445 if err != nil {
446 panic(err)
447 }
448 if result.Acquired {
449 fmt.Printf("Client-456 acquired expired lock: %s\n", result.LockID)
450 }
451
452 // Scenario 3: Lock renewal
453 fmt.Println("\nScenario 3: Automatic Lock Renewal")
454 renewableLock, err := lockManager.AcquireLock(ctx, "resource-3", 2*time.Second)
455 if err != nil {
456 panic(err)
457 }
458 fmt.Printf("Acquired renewable lock: %s\n", renewableLock.LockID)
459
460 // Let it renew automatically
461 time.Sleep(4 * time.Second)
462
463 // Check if lock is still held
464 isLocked := lockService.IsLocked("resource-3")
465 fmt.Printf("Lock still held after 4 seconds: %v\n", isLocked)
466
467 // Print final statistics
468 fmt.Println("\n=== Lock Service Statistics ===")
469 stats := lockService.GetStats()
470 for k, v := range stats {
471 fmt.Printf("%s: %v\n", k, v)
472 }
473}
Explanation:
Distributed Lock Service Design:
-
Lease-Based Locking: Locks have TTL to prevent deadlocks. If a client crashes, the lock automatically expires.
-
Automatic Renewal: LockManager automatically renews locks before expiration to prevent accidental loss.
-
Clock Skew Handling: Accounts for network delays and clock differences between nodes.
-
Lock Verification: Clients must provide the lock ID to release or renew, preventing accidental releases.
-
Cleanup Mechanism: Periodic cleanup removes expired locks to prevent memory leaks.
Key Components:
-
DistributedLockService: Core lock management with acquire, renew, release operations.
-
LockManager: Higher-level abstraction with automatic renewal and lifecycle management.
-
ManagedLock: Represents a lock with automatic renewal capabilities.
Safety Features:
-
Fencing Tokens: the lock's Version field acts as a monotonically increasing token that downstream resources can check to reject writes from stale lock holders (see the sketch after this list).
-
Expiration Handling: Automatic cleanup prevents permanent lock accumulation.
-
Retry Logic: a fixed retry delay between lock acquisition attempts; production systems typically add exponential backoff with jitter.
-
Context Support: Proper cancellation and timeout handling.
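Below is a hedged sketch of how a downstream resource could enforce fencing. The FencedStore type is illustrative and not part of the solution above; it treats the lock's Version value as the fencing token.

// Illustrative only: a store that rejects writes carrying a stale fencing token.
type FencedStore struct {
	mu           sync.Mutex
	highestToken int64
	data         map[string]string
}

func (s *FencedStore) Write(token int64, key, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if token < s.highestToken {
		// A newer lock holder has already written; this client's lease is stale.
		return errors.New("stale fencing token: write rejected")
	}
	s.highestToken = token
	s.data[key] = value
	return nil
}

// Usage (illustrative): store := &FencedStore{data: make(map[string]string)}
// err := store.Write(result.Version, "schema-version", "v2")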
Real-World Considerations:
-
Split-Brain Prevention: lease expiration and clock skew tolerance bound how long two clients can both believe they hold a lock; a real deployment would back the service with a replicated store to survive partitions.
-
Performance: In-memory storage with efficient locking using sync.RWMutex.
-
Monitoring: Built-in metrics for lock acquisition, expiration, and renewal tracking.
-
Extensibility: Can be extended to use external storage for persistence.
Use Cases:
- Leader election in distributed systems (sketched below)
- Coordinating database schema migrations
- Managing access to limited resources
- Preventing concurrent execution of critical operations
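As a hedged illustration of the first use case, a candidate process could campaign for leadership by competing for a well-known lock through the LockManager above. The runForLeadership helper and the "cluster-leader" resource name are assumptions for this sketch, not part of the solution.

func runForLeadership(ctx context.Context, lm *LockManager, lead func(context.Context)) {
	for ctx.Err() == nil {
		lock, err := lm.AcquireLock(ctx, "cluster-leader", 10*time.Second)
		if err != nil {
			time.Sleep(time.Second) // lost the election; back off and campaign again
			continue
		}
		lead(ctx)                // act as leader while the lock auto-renews
		_ = lm.ReleaseLock(lock) // step down and let another candidate win
	}
}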
Exercise 5: Consistent Hashing Ring
🎯 Learning Objectives:
- Implement consistent hashing for load distribution
- Master virtual nodes for improved key distribution
- Learn to handle node addition and removal with minimal data movement
- Design ring-based data partitioning systems
🌍 Real-World Context:
Consistent hashing is fundamental to distributed systems like Amazon DynamoDB and Cassandra for data partitioning. Content delivery networks use consistent hashing to cache content across edge servers efficiently. When you watch Netflix, consistent hashing ensures your request hits the same cache server that has the content you need, minimizing cache misses and improving performance.
⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Expert
Build a consistent hashing ring implementation that distributes keys across nodes with minimal remapping when nodes are added or removed. Your implementation should include virtual nodes for better load distribution, support for weighted nodes, efficient key lookup operations, and mechanisms for handling node failures and additions gracefully.
Requirements:
- Implement the core consistent hashing ring algorithm
- Add virtual nodes for improved distribution
- Support weighted nodes
- Handle node addition/removal with minimal key movement
- Include load balancing metrics and statistics
Solution with Explanation
1// run
2package main
3
4import (
5 "crypto/sha1"
6 "fmt"
7 "sort"
8 "strconv"
9 "sync"
10)
11
12// Node represents a server in the consistent hash ring
13type Node struct {
14 ID string
15 Weight int // Higher weight = more virtual nodes
16 ActualID string // For virtual nodes, points to actual node
17}
18
19// ConsistentHashRing implements consistent hashing
20type ConsistentHashRing struct {
21 nodes map[uint32]*Node // Hash -> Node mapping
22 sortedKeys []uint32 // Sorted hash keys for binary search
23 nodeMap map[string]*Node // Node ID -> Node mapping
24 mu sync.RWMutex
25}
26
27// NewConsistentHashRing creates a new consistent hash ring
28func NewConsistentHashRing() *ConsistentHashRing {
29 return &ConsistentHashRing{
30 nodes: make(map[uint32]*Node),
31 nodeMap: make(map[string]*Node),
32 }
33}
34
35// AddNode adds a node to the hash ring
36 func (chr *ConsistentHashRing) AddNode(nodeID string, weight int) {
37 chr.mu.Lock()
38 defer chr.mu.Unlock()
39
40 // Remove existing node if present
41 chr.removeNodeUnsafe(nodeID)
42
43 node := &Node{
44 ID: nodeID,
45 Weight: weight,
46 ActualID: nodeID,
47 }
48
49 // Add virtual nodes based on weight
50 virtualCount := weight
51 if virtualCount < 1 {
52 virtualCount = 1
53 }
54
55 for i := 0; i < virtualCount; i++ {
56 virtualNodeID := fmt.Sprintf("%s#%d", nodeID, i)
57 hash := chr.hashKey(virtualNodeID)
58
59 virtualNode := &Node{
60 ID: virtualNodeID,
61 Weight: weight,
62 ActualID: nodeID,
63 }
64
65 chr.nodes[hash] = virtualNode
66 chr.sortedKeys = append(chr.sortedKeys, hash)
67 }
68
69 chr.nodeMap[nodeID] = node
70 chr.sortKeysUnsafe()
71}
72
73// RemoveNode removes a node from the hash ring
74 func (chr *ConsistentHashRing) RemoveNode(nodeID string) {
75 chr.mu.Lock()
76 defer chr.mu.Unlock()
77 chr.removeNodeUnsafe(nodeID)
78}
79
80// removeNodeUnsafe removes a node without locking
81 func (chr *ConsistentHashRing) removeNodeUnsafe(nodeID string) {
82 node, exists := chr.nodeMap[nodeID]
83 if !exists {
84 return
85 }
86
87 // Remove all virtual nodes for this actual node
88 virtualCount := node.Weight
89 if virtualCount < 1 {
90 virtualCount = 1
91 }
92
93 for i := 0; i < virtualCount; i++ {
94 virtualNodeID := fmt.Sprintf("%s#%d", nodeID, i)
95 hash := chr.hashKey(virtualNodeID)
96 delete(chr.nodes, hash)
97
98 // Remove from sorted keys
99 for j, key := range chr.sortedKeys {
100 if key == hash {
101 chr.sortedKeys = append(chr.sortedKeys[:j], chr.sortedKeys[j+1:]...)
102 break
103 }
104 }
105 }
106
107 delete(chr.nodeMap, nodeID)
108}
109
110// GetNode returns the node responsible for the given key
111 func (chr *ConsistentHashRing) GetNode(key string) *Node {
112 chr.mu.RLock()
113 defer chr.mu.RUnlock()
114
115 if len(chr.nodes) == 0 {
116 return nil
117 }
118
119 hash := chr.hashKey(key)
120
121 // Find the first node with hash >= key hash
122 idx := chr.searchKey(hash)
123
124 // If no node found, wrap around to the first node
125 if idx == len(chr.sortedKeys) {
126 idx = 0
127 }
128
129 virtualNode := chr.nodes[chr.sortedKeys[idx]]
130
131 // Return the actual node
132 return chr.nodeMap[virtualNode.ActualID]
133}
134
135// GetNodes returns the N nodes responsible for the given key
136 func (chr *ConsistentHashRing) GetNodes(key string, count int) []*Node {
137 chr.mu.RLock()
138 defer chr.mu.RUnlock()
139
140 if len(chr.nodes) == 0 {
141 return nil
142 }
143
144 if count <= 0 {
145 return nil
146 }
147
148 hash := chr.hashKey(key)
149 idx := chr.searchKey(hash)
150
151 if idx == len(chr.sortedKeys) {
152 idx = 0
153 }
154
155 result := make([]*Node, 0, count)
156 seen := make(map[string]bool)
157
158 // Collect N unique actual nodes
159 for len(result) < count && len(seen) < len(chr.nodeMap) {
160 if idx >= len(chr.sortedKeys) {
161 idx = 0
162 }
163
164 virtualNode := chr.nodes[chr.sortedKeys[idx]]
165 actualNode := chr.nodeMap[virtualNode.ActualID]
166
167 if !seen[actualNode.ID] {
168 result = append(result, actualNode)
169 seen[actualNode.ID] = true
170 }
171
172 idx++
173 }
174
175 return result
176}
177
178// hashKey computes SHA1 hash of a key
179 func (chr *ConsistentHashRing) hashKey(key string) uint32 {
180 hash := sha1.Sum([]byte(key))
181
182 // Use first 4 bytes of SHA1 hash
183 return uint32(hash[0])<<24 | uint32(hash[1])<<16 | uint32(hash[2])<<8 | uint32(hash[3])
184}
185
186// sortKeysUnsafe sorts the hash keys
187 func (chr *ConsistentHashRing) sortKeysUnsafe() {
188 sort.Slice(chr.sortedKeys, func(i, j int) bool {
189 return chr.sortedKeys[i] < chr.sortedKeys[j]
190 })
191}
192
193// searchKey finds the index of the first key >= target
194 func (chr *ConsistentHashRing) searchKey(target uint32) int {
195 return sort.Search(len(chr.sortedKeys), func(i int) bool {
196 return chr.sortedKeys[i] >= target
197 })
198}
199
200 // GetAllNodes returns every physical node currently in the ring
201 func (chr *ConsistentHashRing) GetAllNodes() []*Node {
202 chr.mu.RLock()
203 defer chr.mu.RUnlock()
204
205 result := make([]*Node, 0, len(chr.nodeMap))
206 for _, node := range chr.nodeMap {
207 result = append(result, node)
208 }
209
210 return result
211}
212
213// GetNodeCount returns the number of nodes in the ring
214 func (chr *ConsistentHashRing) GetNodeCount() int {
215 chr.mu.RLock()
216 defer chr.mu.RUnlock()
217 return len(chr.nodeMap)
218}
219
220// GetDistribution returns the distribution of keys across nodes
221 func (chr *ConsistentHashRing) GetDistribution(sampleKeys []string) map[string]int {
222 // No outer lock here: GetNode takes the read lock itself, and
223 // sync.RWMutex read locks must not be acquired recursively.
224
225 distribution := make(map[string]int)
226
227 for _, key := range sampleKeys {
228 node := chr.GetNode(key)
229 if node != nil {
230 distribution[node.ID]++
231 }
232 }
233
234 return distribution
235}
236
237// Stats returns statistics about the hash ring
238 func (chr *ConsistentHashRing) Stats() map[string]interface{} {
239 chr.mu.RLock()
240 defer chr.mu.RUnlock()
241
242 weightSum := 0
243 for _, node := range chr.nodeMap {
244 weightSum += node.Weight
245 }
246
247 return map[string]interface{}{
248 "actual_nodes": len(chr.nodeMap),
249 "virtual_nodes": len(chr.nodes),
250 "total_weight": weightSum,
251 "average_weight": float64(weightSum) / float64(len(chr.nodeMap)),
252 }
253}
254
255// LoadBalancer demonstrates using consistent hashing for load balancing
256type LoadBalancer struct {
257 ring *ConsistentHashRing
258}
259
260// NewLoadBalancer creates a new load balancer
261func NewLoadBalancer() *LoadBalancer {
262 return &LoadBalancer{
263 ring: NewConsistentHashRing(),
264 }
265}
266
267// AddServer adds a server to the load balancer
268 func (lb *LoadBalancer) AddServer(serverID string, capacity int) {
269 lb.ring.AddNode(serverID, capacity)
270}
271
272// RemoveServer removes a server from the load balancer
273 func (lb *LoadBalancer) RemoveServer(serverID string) {
274 lb.ring.RemoveNode(serverID)
275}
276
277// RouteRequest routes a request to the appropriate server
278 func (lb *LoadBalancer) RouteRequest(requestID string) string {
279 node := lb.ring.GetNode(requestID)
280 if node == nil {
281 return ""
282 }
283 return node.ID
284}
285
286// RouteForReplication routes a request to multiple servers for replication
287 func (lb *LoadBalancer) RouteForReplication(requestID string, replicationFactor int) []string {
288 nodes := lb.ring.GetNodes(requestID, replicationFactor)
289 serverIDs := make([]string, len(nodes))
290 for i, node := range nodes {
291 serverIDs[i] = node.ID
292 }
293 return serverIDs
294}
295
296func main() {
297 fmt.Println("=== Consistent Hashing Ring Demo ===\n")
298
299 // Create a consistent hash ring
300 ring := NewConsistentHashRing()
301
302 // Add nodes with different weights
303 nodes := []struct {
304 id string
305 weight int
306 }{
307 {"server-a", 3}, // Higher capacity
308 {"server-b", 2}, // Medium capacity
309 {"server-c", 1}, // Lower capacity
310 {"server-d", 2},
311 {"server-e", 1},
312 }
313
314 fmt.Println("Adding nodes to the ring:")
315 for _, node := range nodes {
316 ring.AddNode(node.id, node.weight)
317 fmt.Printf(" Added %s\n", node.id, node.weight)
318 }
319
320 // Print ring statistics
321 stats := ring.Stats()
322 fmt.Printf("\nRing Statistics: %+v\n\n", stats)
323
324 // Test key distribution
325 fmt.Println("Testing key distribution:")
326 testKeys := []string{
327 "user-123", "user-456", "user-789", "user-abc", "user-def",
328 "order-1", "order-2", "order-3", "order-4", "order-5",
329 "product-a", "product-b", "product-c", "product-d", "product-e",
330 "session-x", "session-y", "session-z", "session-1", "session-2",
331 }
332
333 distribution := ring.GetDistribution(testKeys)
334 for serverID, count := range distribution {
335 fmt.Printf(" %s: %d keys\n", serverID, count, float64(count)/float64(len(testKeys))*100)
336 }
337
338 // Test replication
339 fmt.Println("\nTesting replication:")
340 for _, key := range testKeys[:5] {
341 nodes := ring.GetNodes(key, 3)
342 nodeIDs := make([]string, len(nodes))
343 for i, node := range nodes {
344 nodeIDs[i] = node.ID
345 }
346 fmt.Printf(" %s -> %v\n", key, nodeIDs)
347 }
348
349 // Test node removal and key remapping
350 fmt.Println("\nTesting node removal:")
351 removedKey := testKeys[0]
352 originalNode := ring.GetNode(removedKey)
353 fmt.Printf("Key '%s' initially maps to: %s\n", removedKey, originalNode.ID)
354
355 // Remove a node
356 ring.RemoveNode("server-c")
357 fmt.Println("Removed server-c")
358
359 // Check where the key maps now
360 newNode := ring.GetNode(removedKey)
361 fmt.Printf("Key '%s' now maps to: %s\n", removedKey, newNode.ID)
362
363 // Key movement: with consistent hashing, only keys that previously
364 // mapped to server-c are remapped after its removal; every other key
365 // keeps its original node. Measuring the exact movement requires
366 // recording each key's mapping before the removal and comparing it
367 // with the mapping afterwards (see the sketch in the explanation below).
368
369
370 // Demonstrate load balancer usage
371 fmt.Println("\n=== Load Balancer Demo ===")
372 lb := NewLoadBalancer()
373
374 // Add servers with different capacities
375 lb.AddServer("web-server-1", 3)
376 lb.AddServer("web-server-2", 2)
377 lb.AddServer("web-server-3", 1)
378
379 // Route requests
380 requests := []string{
381 "req-001", "req-002", "req-003", "req-004", "req-005",
382 "req-006", "req-007", "req-008", "req-009", "req-010",
383 }
384
385 fmt.Println("Routing requests:")
386 for _, req := range requests {
387 server := lb.RouteRequest(req)
388 fmt.Printf(" %s -> %s\n", req, server)
389 }
390
391 // Show replication routing
392 fmt.Println("\nReplication routing:")
393 for _, req := range requests[:3] {
394 servers := lb.RouteForReplication(req, 2)
395 fmt.Printf(" %s -> %v\n", req, servers)
396 }
397
398 // Test load balancing with many requests
399 fmt.Println("\nLoad distribution with 1000 requests:")
400 requestCount := 1000
401 requestDist := make(map[string]int)
402
403 for i := 0; i < requestCount; i++ {
404 reqID := "req-" + strconv.Itoa(i)
405 server := lb.RouteRequest(reqID)
406 requestDist[server]++
407 }
408
409 for server, count := range requestDist {
410 percentage := float64(count) / float64(requestCount) * 100
411 fmt.Printf(" %s: %d requests\n", server, count, percentage)
412 }
413
414 // Final statistics
415 fmt.Println("\n=== Final Ring Statistics ===")
416 finalStats := ring.Stats()
417 for k, v := range finalStats {
418 fmt.Printf("%s: %v\n", k, v)
419 }
420}
Explanation:
Consistent Hashing Algorithm:
-
Ring Structure: Nodes and keys are placed on a virtual ring using hash values.
-
Virtual Nodes: Each physical node has multiple virtual nodes for better distribution.
-
Key Lookup: Find the first node clockwise from the key's hash position.
-
Minimal Remapping: When nodes are added/removed, only keys in affected ranges are remapped.
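The minimal-remapping claim can be checked empirically. The following hedged sketch (the countMovedKeys helper is illustrative and built only on the ConsistentHashRing methods above) records each key's node, removes a node, and counts how many keys changed owner; with the five-node ring from the demo, only keys owned by the removed node should count as moved.

func countMovedKeys(ring *ConsistentHashRing, keys []string, removeID string) int {
	// Record the owner of every key before the topology change.
	before := make(map[string]string, len(keys))
	for _, k := range keys {
		if n := ring.GetNode(k); n != nil {
			before[k] = n.ID
		}
	}

	ring.RemoveNode(removeID)

	// Count keys whose owner changed after the removal.
	moved := 0
	for _, k := range keys {
		if n := ring.GetNode(k); n != nil && n.ID != before[k] {
			moved++
		}
	}
	return moved
}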
Key Components:
-
ConsistentHashRing: Core implementation with ring management and key lookup.
-
Node Structure: Represents both physical and virtual nodes with weights.
-
LoadBalancer: Higher-level abstraction demonstrating practical usage.
Algorithm Details:
-
Hash Function: Uses SHA1 for uniform distribution of keys and nodes.
-
Binary Search: Efficient key lookup with O(log N) complexity.
-
Weight Distribution: Higher weight nodes get more virtual nodes, proportionally more keys.
-
Replication Support: Get multiple nodes for the same key for data redundancy.
Load Balancing Benefits:
-
Uniform Distribution: Virtual nodes ensure even key distribution.
-
Scalability: Adding/removing nodes affects minimal key ranges.
-
Fault Tolerance: Node failures only affect keys mapped to that node.
-
Weighted Routing: Higher capacity servers handle proportionally more requests.
Real-World Applications:
-
Distributed Caches: Memcached, Redis clusters for data partitioning.
-
Databases: Cassandra, DynamoDB for data distribution across nodes.
-
CDNs: Content distribution across edge servers.
-
Load Balancers: Request routing to backend servers.
Advantages:
- Scalability: Easy to add/remove nodes
- Performance: O(log N) lookup time
- Flexibility: Supports weighted nodes
- Reliability: Minimal disruption during topology changes
Use Cases:
- Session affinity in web applications
- Cache key distribution
- Database sharding
- Content distribution networks
Summary
In this comprehensive tutorial, you've mastered the essential patterns and techniques for building production-ready distributed systems in Go. From understanding the fundamental challenges posed by the CAP theorem to implementing sophisticated patterns like circuit breakers, consensus algorithms, and distributed locks, you now have the tools to design resilient systems that operate at scale.
Key Concepts Mastered:
-
CAP Theorem and Trade-offs: Understanding that consistency, availability, and partition tolerance cannot all be achieved simultaneously, and making informed architectural decisions based on business requirements.
-
Service Discovery: Implementing dynamic service registration and health checking using patterns that enable microservices to find and communicate with each other reliably.
-
Circuit Breaker Pattern: Building fault-tolerant systems that fail fast and gracefully degrade when dependencies are unhealthy, preventing cascade failures across distributed services.
-
Raft Consensus: Implementing strong consistency through distributed consensus, enabling leader election and replicated state machines that survive node failures.
-
Distributed Locks: Coordinating exclusive access to resources across multiple processes using lock leasing, timeouts, and fencing tokens to prevent split-brain scenarios.
-
Consistent Hashing: Distributing load evenly across nodes while minimizing key remapping during topology changes, essential for caching systems and databases.
Production-Ready Patterns:
- Always implement timeouts and retry logic with exponential backoff (see the sketch after this list)
- Use idempotency keys to make operations safe to retry
- Apply circuit breakers to protect against cascade failures
- Design for eventual consistency when strong consistency isn't required
- Implement comprehensive observability with logs, metrics, and traces
- Deploy changes gradually using feature flags and canary deployments
- Plan for disaster recovery with multi-region failover strategies
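The first two items can be combined in one small helper. The sketch below is illustrative, not code from the exercises: the retryWithBackoff function and its parameters are assumptions, and it assumes the same imports used in the lock exercise (context, time, math/rand, and github.com/google/uuid). It retries an operation with exponential backoff plus jitter and reuses a single idempotency key across attempts so retries stay safe.

func retryWithBackoff(ctx context.Context, attempts int, op func(ctx context.Context, idempotencyKey string) error) error {
	key := uuid.New().String()        // one idempotency key reused across attempts
	backoff := 100 * time.Millisecond // initial delay, doubled after each failure
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(ctx, key); err == nil {
			return nil
		}
		jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
		select {
		case <-time.After(backoff + jitter):
			backoff *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}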
When to Apply These Patterns:
Distributed systems patterns solve real problems but add significant complexity. Use them when:
- Scale requirements exceed single-machine capabilities
- High availability is critical to your business
- You need to coordinate state across multiple services
- Network partitions and failures are expected
However, remember that many systems don't need these patterns. Start with a well-designed monolith and adopt distributed systems patterns only when the business value justifies the added operational complexity.
Next Steps:
To deepen your expertise:
- Practice the exercises to solidify your understanding
- Study the referenced papers and books for theoretical foundations
- Experiment with production tools like etcd, Consul, and Istio
- Apply chaos engineering to test your systems' resilience
- Learn from production incidents and document lessons learned
Distributed systems engineering is a journey of continuous learning. The patterns you've learned here are foundational, but each production system will teach you new lessons about scale, reliability, and the inevitable surprises that come with coordinating multiple computers across unreliable networks.
Build systems that embrace failure, monitor everything, and always ask: "Does the complexity justify the benefits?" Your future self - and your operations team - will thank you.