Distributed Systems Patterns

Learning Objectives

By the end of this tutorial, you will be able to:

  • Design resilient distributed systems using proven patterns like circuit breakers and sagas
  • Implement consensus algorithms for consistent distributed state management
  • Build fault-tolerant services that survive node failures and network partitions
  • Apply CAP theorem trade-offs to make informed architecture decisions
  • Create service mesh patterns for microservice communication and observability
  • Handle distributed transactions using compensation and event sourcing patterns

Hook - The Global Scale Challenge

Imagine running a global restaurant chain with hundreds of locations. Each restaurant needs to coordinate with others, share information, and continue operating even when some locations face problems. This is exactly what distributed systems do for software - they help multiple computers work together as a team.

Real-World Impact:

  • Netflix processes 7 billion requests/day using circuit breakers and bulkhead patterns
  • Uber uses sagas to coordinate rides across 50+ microservices
  • Kubernetes depends on etcd consensus for cluster state management

Modern applications rarely run on a single machine. Whether you're building microservices, handling millions of users, or processing terabytes of data, you need distributed systems patterns. Go's concurrency primitives, strong standard library, and compiled nature make it ideal for distributed systems.

Core Concepts - Distributed Systems Challenges

Real-World Examples

Netflix - Resilience at scale:

// Circuit breaker prevents cascade failures
// Before: One failing service crashes entire system
// After: Circuit breaker isolates failures, 99.99% uptime

Netflix processes 7 billion requests/day using circuit breakers and bulkhead patterns. When one recommendation service fails, users can still stream movies.

Uber - Consistent data across microservices:

// Saga pattern for distributed transactions
// Before: Inconsistent data across services
// After: Eventual consistency with compensation

Uber uses sagas to coordinate rides across 50+ microservices. When payment fails after ride completion, the system automatically cancels the ride and processes refunds.

Kubernetes - Distributed consensus:

// etcd uses Raft consensus for cluster state
// Before: Split-brain scenarios, data loss
// After: Strong consistency, automatic leader election

Every Kubernetes cluster depends on distributed consensus. When the master node fails, etcd ensures a new leader is elected without losing cluster state.

💡 Key Takeaway: Distributed systems patterns aren't just theoretical concepts - they solve real business problems of scale, reliability, and performance that affect millions of users daily.

Distributed Systems Challenges

Think of distributed systems like managing a team spread across different offices. Each person can have their own opinion, communication can fail, and not everyone receives messages at the same time. This creates unique challenges you never see in single-machine applications.

The CAP Theorem - you can guarantee at most two of the three:

  • Consistency: All nodes see the same data simultaneously
  • Availability: Every request gets a response
  • Partition Tolerance: System works despite network failures

⚠️ Important: Network partitions WILL happen in distributed systems. You must choose between consistency and availability during those partitions.

┌─────────────────────────────────┐
│        CAP THEOREM              │
│                                 │
│          Consistency            │
│              /\                 │
│             /  \                │
│            /    \               │
│           /  CA  \              │
│          /        \             │
│         /          \            │
│        /   Choose   \           │
│       /     Any 2    \          │
│      /                \         │
│     /                  \        │
│    /_____________________\      │
│  Availability   Partition       │
│                 Tolerance       │
└─────────────────────────────────┘

CA: Traditional RDBMS
CP: etcd, Consul, ZooKeeper
AP: Cassandra, DynamoDB, Riak

Common Distributed Systems Problems:

  1. Network failures - Messages lost, delayed, or duplicated (see the retry sketch after this list)
  2. Partial failures - Some nodes work while others fail
  3. Clock skew - Time differs across machines
  4. Concurrency - Race conditions at scale
  5. Data consistency - Keeping replicas in sync
  6. Cascading failures - One failure triggers others

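To make the first problem concrete, here is a minimal sketch of handling transient network failures with bounded retries, exponential backoff, and jitter, using only the standard library. The callWithRetry helper and doRequest function are illustrative names for this sketch, not part of any framework.

package main

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// doRequest simulates a flaky network call that fails most of the time.
func doRequest(ctx context.Context) error {
    if rand.Intn(10) < 6 {
        return errors.New("connection reset")
    }
    return nil
}

// callWithRetry retries a transient failure with exponential backoff and jitter.
func callWithRetry(ctx context.Context, attempts int, fn func(context.Context) error) error {
    delay := 100 * time.Millisecond
    var err error
    for i := 0; i < attempts; i++ {
        if err = fn(ctx); err == nil {
            return nil
        }
        // Add jitter so many clients don't retry in lockstep.
        sleep := delay + time.Duration(rand.Intn(50))*time.Millisecond
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(sleep):
        }
        delay *= 2
    }
    return fmt.Errorf("all %d attempts failed: %w", attempts, err)
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    if err := callWithRetry(ctx, 4, doRequest); err != nil {
        fmt.Println("request failed:", err)
        return
    }
    fmt.Println("request succeeded")
}
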
When to Use Distributed Patterns

✅ Use Distributed Patterns For:

  • High availability - 99.9%+ uptime requirements
  • Scalability - Handle millions of requests
  • Fault tolerance - Survive node failures
  • Geographic distribution - Serve users globally with low latency
  • Data replication - Backup and disaster recovery
  • Microservices - Decouple services for independent development

❌ Not Needed For:

  • Single server apps - Monoliths under light load
  • Prototypes - Premature distribution adds complexity
  • Low traffic - < 100 req/sec can run on one box
  • Simple CRUD - Standard database is enough

Real-World Example:
When Twitter started, they used a single database. As they grew to millions of users, they had to adopt distributed patterns. But they didn't start there - they evolved to distributed systems when the business need became clear.

Decision Tree:

Can your system run on one machine?
├─ Yes → Do you need high availability?
│   ├─ Yes → ✅ USE DISTRIBUTED PATTERNS
│   └─ No → ❌ KEEP IT SIMPLE
└─ No → Is network partitioning likely?
    ├─ Yes → ✅ USE DISTRIBUTED PATTERNS
    └─ No → Do you have multiple datacenters?
        ├─ Yes → ✅ USE DISTRIBUTED PATTERNS
        └─ No → ⚠️ START SIMPLE, ADD AS NEEDED

Now let's dive into the core patterns that solve these challenges. We'll start with understanding the fundamental tradeoffs in distributed systems.

CAP Theorem and Consistency Models

Understanding CAP in Practice

Consider managing a chain of coffee shops. When a customer buys coffee, you need to decide how to handle inventory updates. This is exactly the CAP tradeoff in action:

  • CP: If communication fails between shops, stop selling coffee until connectivity is restored. Better to say "out of stock" than sell something you can't track.
  • AP: Always accept sales, even if shops can't communicate. You might oversell, but customers can always buy coffee.

Let's see this in code:

package main

import (
    "fmt"
    "sync"
)

// Example: Different consistency models

// CP System: Consistent + Partition-tolerant
type CPStore struct {
    mu    sync.RWMutex
    data  map[string]string
    alive bool
}

func (s *CPStore) Write(key, value string) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if !s.alive {
        return fmt.Errorf("node unavailable")
    }

    s.data[key] = value
    return nil
}

func (s *CPStore) Read(key string) (string, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    if !s.alive {
        return "", fmt.Errorf("node unavailable")
    }

    return s.data[key], nil
}

// AP System: Available + Partition-tolerant
type APStore struct {
    mu          sync.RWMutex
    data        map[string]string
    vectorClock map[string]int
}

func (s *APStore) Write(key, value string) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    // Always accept writes
    s.data[key] = value
    s.vectorClock[key]++
    return nil
}

func (s *APStore) Read(key string) (string, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    // Always return data, even if potentially stale
    return s.data[key], nil
}

func main() {
    // CP System: Prioritizes consistency over availability
    cpStore := &CPStore{
        data:  make(map[string]string),
        alive: true,
    }

    cpStore.Write("user:1", "Alice")
    fmt.Println("CP Store:", cpStore.data)

    // Simulate partition
    cpStore.alive = false
    err := cpStore.Write("user:2", "Bob")
    fmt.Printf("CP Write during partition: %v\n", err) // Error!

    // AP System: Prioritizes availability over consistency
    apStore := &APStore{
        data:        make(map[string]string),
        vectorClock: make(map[string]int),
    }

    apStore.Write("user:1", "Alice")
    fmt.Println("AP Store:", apStore.data)

    // Always accepts writes
    apStore.Write("user:2", "Bob")
    fmt.Printf("AP Write during partition: Success\n")
}

Real-World Examples:

  • Banking: When ATM networks are down, banks block transactions rather than risk incorrect balances
  • Social Media: Facebook shows you content even if some servers are down - better to show slightly outdated content than nothing
  • E-commerce: Amazon accepts orders even if inventory systems are partitioned, but payment processing remains consistent

Consistency Models

Think about consistency models like updating contact information across multiple devices. How quickly should changes propagate? This depends on your needs:

  • Strong Consistency: Like editing a Google Doc together - everyone sees changes instantly
  • Eventual Consistency: Like syncing photos to the cloud - they appear on all devices eventually
  • Causal Consistency: Like commenting on a social media post - replies appear after the original post

package main

import (
    "fmt"
    "sync"
    "time"
)

// Strong Consistency: All reads see latest write immediately
type StronglyConsistent struct {
    mu    sync.RWMutex
    data  map[string]string
    nodes []*StronglyConsistent
}

func (sc *StronglyConsistent) Write(key, value string) error {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    // Write to all replicas synchronously
    for _, node := range sc.nodes {
        node.mu.Lock()
        node.data[key] = value
        node.mu.Unlock()
    }

    sc.data[key] = value
    return nil
}

// Eventual Consistency: Reads may return stale data
type EventuallyConsistent struct {
    mu   sync.RWMutex
    data map[string]string
}

func (ec *EventuallyConsistent) Write(key, value string) {
    ec.mu.Lock()
    defer ec.mu.Unlock()
    ec.data[key] = value
}

func (ec *EventuallyConsistent) AsyncReplicate(target *EventuallyConsistent, key, value string) {
    // Replicate asynchronously
    go func() {
        time.Sleep(10 * time.Millisecond) // Simulate network delay
        target.mu.Lock()
        target.data[key] = value
        target.mu.Unlock()
    }()
}

// Causal Consistency: Causally related writes seen in order
type CausallyConsistent struct {
    mu          sync.RWMutex
    data        map[string]string
    vectorClock map[string]int
}

func (cc *CausallyConsistent) Write(key, value string, dependencies map[string]int) {
    cc.mu.Lock()
    defer cc.mu.Unlock()

    // Wait for dependencies to be satisfied
    for depKey, depVersion := range dependencies {
        for cc.vectorClock[depKey] < depVersion {
            cc.mu.Unlock()
            time.Sleep(1 * time.Millisecond)
            cc.mu.Lock()
        }
    }

    cc.data[key] = value
    cc.vectorClock[key]++
}

func main() {
    fmt.Println("Consistency Models Demo")

    // Strong Consistency
    strong := &StronglyConsistent{data: make(map[string]string)}
    strong.Write("key", "value")
    fmt.Println("Strong:", strong.data) // Always up-to-date

    // Eventual Consistency
    eventual1 := &EventuallyConsistent{data: make(map[string]string)}
    eventual2 := &EventuallyConsistent{data: make(map[string]string)}
    eventual1.Write("key", "value")
    eventual1.AsyncReplicate(eventual2, "key", "value")
    fmt.Println("Eventual:", eventual1.data)
    time.Sleep(20 * time.Millisecond)
    fmt.Println("Eventual:", eventual2.data)

    // Causal Consistency
    causal := &CausallyConsistent{
        data:        make(map[string]string),
        vectorClock: make(map[string]int),
    }
    causal.Write("a", "1", nil)
    causal.Write("b", "2", map[string]int{"a": 1}) // b depends on a
    fmt.Println("Causal:", causal.data)
}

When to Use Each Consistency Model:

💡 Strong Consistency: Use when data correctness is critical

  • Financial transactions
  • Inventory management
  • Configuration settings

💡 Eventual Consistency: Use when availability is more important than immediacy

  • Social media feeds
  • Analytics data
  • Cache systems

💡 Causal Consistency: Use when operation order matters

  • Collaborative editing
  • Social media
  • Workflow systems

Now that we understand the fundamental tradeoffs, let's explore practical patterns for building distributed systems. We'll start with service discovery - how services find each other in a distributed environment.

Service Discovery

At a huge conference center with hundreds of rooms, how do you find which room has the session you want to attend? You look at the directory board that shows all sessions and their locations. This is exactly what service discovery does for microservices.

Basic Service Registry

Real-World Example: Netflix's Eureka service registry helps thousands of microservices find each other. When a new instance starts up, it registers itself. When it shuts down gracefully, it deregisters. And crucially, if it crashes, the registry detects the failure through missing heartbeats.

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

// ServiceInstance represents a single service instance
type ServiceInstance struct {
    ID       string
    Name     string
    Host     string
    Port     int
    Healthy  bool
    LastSeen time.Time
}

// ServiceRegistry manages service discovery
type ServiceRegistry struct {
    mu        sync.RWMutex
    services  map[string][]*ServiceInstance // name -> instances
    heartbeat time.Duration
}

func NewServiceRegistry(heartbeat time.Duration) *ServiceRegistry {
    sr := &ServiceRegistry{
        services:  make(map[string][]*ServiceInstance),
        heartbeat: heartbeat,
    }

    // Background health checker
    go sr.healthCheck()

    return sr
}

// Register adds a service instance
func (sr *ServiceRegistry) Register(instance *ServiceInstance) {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    instance.Healthy = true
    instance.LastSeen = time.Now()

    sr.services[instance.Name] = append(sr.services[instance.Name], instance)
}

// Deregister removes a service instance
func (sr *ServiceRegistry) Deregister(name, id string) {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    instances := sr.services[name]
    for i, instance := range instances {
        if instance.ID == id {
            sr.services[name] = append(instances[:i], instances[i+1:]...)
            break
        }
    }
}

// Discover finds healthy instances of a service
func (sr *ServiceRegistry) Discover(name string) ([]*ServiceInstance, error) {
    sr.mu.RLock()
    defer sr.mu.RUnlock()

    instances := sr.services[name]
    if len(instances) == 0 {
        return nil, errors.New("service not found")
    }

    // Return only healthy instances
    var healthy []*ServiceInstance
    for _, instance := range instances {
        if instance.Healthy {
            healthy = append(healthy, instance)
        }
    }

    if len(healthy) == 0 {
        return nil, errors.New("no healthy instances")
    }

    return healthy, nil
}

// Heartbeat updates instance health
func (sr *ServiceRegistry) Heartbeat(name, id string) error {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    instances := sr.services[name]
    for _, instance := range instances {
        if instance.ID == id {
            instance.LastSeen = time.Now()
            instance.Healthy = true
            return nil
        }
    }

    return errors.New("instance not found")
}

// healthCheck marks instances unhealthy if no heartbeat
func (sr *ServiceRegistry) healthCheck() {
    ticker := time.NewTicker(sr.heartbeat / 2)
    defer ticker.Stop()

    for range ticker.C {
        sr.mu.Lock()
        now := time.Now()

        for _, instances := range sr.services {
            for _, instance := range instances {
                if now.Sub(instance.LastSeen) > sr.heartbeat {
                    instance.Healthy = false
                }
            }
        }

        sr.mu.Unlock()
    }
}

func main() {
    registry := NewServiceRegistry(5 * time.Second)

    // Register service instances
    registry.Register(&ServiceInstance{
        ID:   "api-1",
        Name: "api-service",
        Host: "10.0.0.1",
        Port: 8080,
    })

    registry.Register(&ServiceInstance{
        ID:   "api-2",
        Name: "api-service",
        Host: "10.0.0.2",
        Port: 8080,
    })

    // Discover service
    instances, err := registry.Discover("api-service")
    if err != nil {
        panic(err)
    }

    fmt.Printf("Found %d instances:\n", len(instances))
    for _, inst := range instances {
        fmt.Printf("  %s: %s:%d (healthy=%t)\n",
            inst.ID, inst.Host, inst.Port, inst.Healthy)
    }

    // Simulate heartbeats
    go func() {
        ticker := time.NewTicker(2 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            registry.Heartbeat("api-service", "api-1")
            registry.Heartbeat("api-service", "api-2")
        }
    }()

    time.Sleep(10 * time.Second)
}

⚠️ Common Pitfalls in Service Discovery:

  • Split brain: Multiple nodes think they're the leader
  • Stale instances: Registry keeps dead services (see the sketch after this list)
  • Network partitions: Services can't reach the registry
  • Registration storms: All services restart at once

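Two simple mitigations, sketched below on top of the registry example above: prune instances that have missed several heartbeat periods instead of keeping them forever, and add random jitter before registration so a fleet-wide restart does not hit the registry at the same instant. The pruneStale and registerWithJitter helpers are hypothetical names introduced here, and the fragment assumes the ServiceRegistry and ServiceInstance types from the previous example plus the "math/rand" package.

// Sketch only: builds on the ServiceRegistry and ServiceInstance types above.
// pruneStale and registerWithJitter are hypothetical helpers; needs "math/rand".

// pruneStale removes instances that have missed `missed` consecutive heartbeat
// periods, so the registry does not accumulate dead services.
func (sr *ServiceRegistry) pruneStale(missed int) {
    sr.mu.Lock()
    defer sr.mu.Unlock()

    cutoff := time.Now().Add(-time.Duration(missed) * sr.heartbeat)
    for name, instances := range sr.services {
        kept := instances[:0]
        for _, inst := range instances {
            if inst.LastSeen.After(cutoff) {
                kept = append(kept, inst)
            }
        }
        sr.services[name] = kept
    }
}

// registerWithJitter waits a random amount of time before registering, so a
// fleet-wide restart does not produce a registration storm.
func registerWithJitter(sr *ServiceRegistry, inst *ServiceInstance, maxJitter time.Duration) {
    if maxJitter > 0 {
        time.Sleep(time.Duration(rand.Int63n(int64(maxJitter))))
    }
    sr.Register(inst)
}
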
Now let's look at load balancing strategies - once you discover services, how do you distribute requests among them?

Load Balancing Strategies

Think of load balancing like managing checkout lines at a busy supermarket. You need to distribute customers among cashiers efficiently. Some cashiers are faster, some customers have loyalty cards and always use the same cashier, and sometimes you just send people to the shortest line.

When to Use Each Strategy:

💡 Round-Robin: Great for equal-capacity servers with similar request patterns

  • Web servers serving static content
  • API gateways with uniform endpoint performance
  • Load testing environments

💡 Weighted Round-Robin: Perfect when servers have different capacities

  • Mixed infrastructure
  • Gradual migration from old to new hardware
  • Cloud environments with different instance sizes

💡 Consistent Hashing: Essential for maintaining session state

  • Shopping carts
  • Caching systems
  • Database sharding

package main

import (
    "fmt"
    "hash/fnv"
    "math/rand"
    "sync/atomic"
)

// ServiceInstance is a minimal stand-in for the type from the service
// discovery example, so this program compiles on its own.
type ServiceInstance struct {
    ID   string
    Host string
    Port int
}

// LoadBalancer selects service instances
type LoadBalancer interface {
    Next(instances []*ServiceInstance) *ServiceInstance
}

// RoundRobin load balancer
type RoundRobin struct {
    counter atomic.Uint64
}

func (rr *RoundRobin) Next(instances []*ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    index := rr.counter.Add(1) % uint64(len(instances))
    return instances[index]
}

// Random load balancer
type Random struct{}

func (r *Random) Next(instances []*ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    return instances[rand.Intn(len(instances))]
}

// Weighted round-robin
type WeightedRoundRobin struct {
    weights map[string]int
    counter atomic.Uint64
}

func NewWeightedRoundRobin() *WeightedRoundRobin {
    return &WeightedRoundRobin{
        weights: make(map[string]int),
    }
}

func (wrr *WeightedRoundRobin) SetWeight(id string, weight int) {
    wrr.weights[id] = weight
}

func (wrr *WeightedRoundRobin) Next(instances []*ServiceInstance) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    // Simple weighted selection
    totalWeight := 0
    for _, inst := range instances {
        weight := wrr.weights[inst.ID]
        if weight == 0 {
            weight = 1
        }
        totalWeight += weight
    }

    target := int(wrr.counter.Add(1) % uint64(totalWeight))
    cumulative := 0

    for _, inst := range instances {
        weight := wrr.weights[inst.ID]
        if weight == 0 {
            weight = 1
        }
        cumulative += weight

        if target < cumulative {
            return inst
        }
    }

    return instances[0]
}

// Consistent hashing
type ConsistentHash struct{}

func (ch *ConsistentHash) NextForKey(instances []*ServiceInstance, key string) *ServiceInstance {
    if len(instances) == 0 {
        return nil
    }

    hash := ch.hash(key)
    index := hash % uint32(len(instances))
    return instances[index]
}

func (ch *ConsistentHash) hash(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}

func main() {
    instances := []*ServiceInstance{
        {ID: "1", Host: "10.0.0.1", Port: 8080},
        {ID: "2", Host: "10.0.0.2", Port: 8080},
        {ID: "3", Host: "10.0.0.3", Port: 8080},
    }

    // Round-robin
    rr := &RoundRobin{}
    fmt.Println("Round-robin:")
    for i := 0; i < 5; i++ {
        inst := rr.Next(instances)
        fmt.Printf("  %s\n", inst.ID)
    }

    // Weighted round-robin
    wrr := NewWeightedRoundRobin()
    wrr.SetWeight("1", 1)
    wrr.SetWeight("2", 2)
    wrr.SetWeight("3", 1)
    fmt.Println("\nWeighted round-robin:")
    for i := 0; i < 8; i++ {
        inst := wrr.Next(instances)
        fmt.Printf("  %s\n", inst.ID)
    }

    // Consistent hashing
    ch := &ConsistentHash{}
    fmt.Println("\nConsistent hashing:")
    keys := []string{"user-123", "user-456", "user-123", "user-789"}
    for _, key := range keys {
        inst := ch.NextForKey(instances, key)
        fmt.Printf("  %s -> %s\n", key, inst.ID)
    }
}

Now let's explore one of the most important patterns in distributed systems - the circuit breaker. This pattern prevents cascading failures, which can bring down entire systems.

Circuit Breaker Pattern

The circuit breaker prevents cascading failures by stopping requests to failing services.

Real-World Analogy: Think of circuit breakers like the electrical circuit breakers in your home. When a circuit overloads, the breaker trips to prevent damage to your appliances. The software version goes one step further: after a cool-down period, it lets a few test requests through to see if the problem is fixed.

Real-World Example: During Black Friday sales, Amazon's payment processing service came under extreme load. Circuit breakers prevented the payment failures from taking down the entire website. Users could still browse products and add to cart - they just got a friendly "try again later" message at checkout.

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

// State represents circuit breaker state
type State int

const (
    StateClosed   State = iota // Normal operation
    StateOpen                  // Failing, reject requests
    StateHalfOpen              // Testing if service recovered
)

func (s State) String() string {
    return [...]string{"CLOSED", "OPEN", "HALF-OPEN"}[s]
}

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
    mu sync.Mutex

    state         State
    failureCount  int
    successCount  int
    lastFailTime  time.Time
    lastStateTime time.Time

    // Configuration
    maxFailures int
    timeout     time.Duration
    halfOpenMax int
}

// NewCircuitBreaker creates a circuit breaker
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        state:       StateClosed,
        maxFailures: maxFailures,
        timeout:     timeout,
        halfOpenMax: 3, // Try 3 requests in half-open
    }
}

// Call executes a function with circuit breaker protection
func (cb *CircuitBreaker) Call(fn func() error) error {
    cb.mu.Lock()

    // Check if circuit is open
    if cb.state == StateOpen {
        // Check if timeout elapsed
        if time.Since(cb.lastStateTime) > cb.timeout {
            cb.state = StateHalfOpen
            cb.successCount = 0
            cb.lastStateTime = time.Now()
        } else {
            cb.mu.Unlock()
            return errors.New("circuit breaker is OPEN")
        }
    }

    // In half-open, limit concurrent requests
    if cb.state == StateHalfOpen && cb.successCount >= cb.halfOpenMax {
        cb.mu.Unlock()
        return errors.New("circuit breaker is HALF-OPEN")
    }

    cb.mu.Unlock()

    // Execute function
    err := fn()

    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.onFailure()
        return err
    }

    cb.onSuccess()
    return nil
}

func (cb *CircuitBreaker) onSuccess() {
    cb.failureCount = 0

    if cb.state == StateHalfOpen {
        cb.successCount++

        // After enough successes, close circuit
        if cb.successCount >= cb.halfOpenMax {
            cb.state = StateClosed
            cb.lastStateTime = time.Now()
        }
    }
}

func (cb *CircuitBreaker) onFailure() {
    cb.failureCount++
    cb.lastFailTime = time.Now()

    if cb.state == StateHalfOpen {
        // Any failure in half-open -> back to open
        cb.state = StateOpen
        cb.lastStateTime = time.Now()
        return
    }

    // Too many failures -> open circuit
    if cb.failureCount >= cb.maxFailures {
        cb.state = StateOpen
        cb.lastStateTime = time.Now()
    }
}

// State returns current state
func (cb *CircuitBreaker) State() State {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    return cb.state
}

func main() {
    cb := NewCircuitBreaker(3, 5*time.Second)

    // Simulate service calls
    callService := func() error {
        // Simulate failing service
        if time.Now().Unix()%2 == 0 {
            return errors.New("service error")
        }
        return nil
    }

    for i := 0; i < 10; i++ {
        err := cb.Call(callService)
        state := cb.State()

        if err != nil {
            fmt.Printf("Call %d: ERROR (%v) - State: %s\n", i+1, err, state)
        } else {
            fmt.Printf("Call %d: SUCCESS - State: %s\n", i+1, state)
        }

        time.Sleep(500 * time.Millisecond)
    }
}

When to Use Circuit Breakers:

Essential for:

  • External API calls
  • Database connections
  • Critical microservices
  • Rate-limited services

⚠️ Common Pitfalls:

  • Setting thresholds too low: False positives, unnecessary circuit breaking
  • Setting thresholds too high: Too much damage before circuit breaks
  • No monitoring: You don't know when circuits are breaking
  • No fallback behavior: Users get errors instead of degraded service (see the fallback sketch after this list)

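One way to avoid the last pitfall is to pair every protected call with a fallback. The fragment below wraps the CircuitBreaker from the example above; callWithFallback and the helpers in the usage comment are illustrative names for this sketch, not a standard API.

// Sketch only: assumes the CircuitBreaker from the example above.
// callWithFallback serves a degraded response instead of an error when the
// circuit is open or the dependency fails.
func callWithFallback(cb *CircuitBreaker, primary func() error, fallback func() error) error {
    if err := cb.Call(primary); err != nil {
        // Degrade gracefully: cached data, defaults, or a friendly message.
        return fallback()
    }
    return nil
}

// Example usage (fetchRecommendations and showCachedRecommendations are
// hypothetical application functions):
//
//   err := callWithFallback(cb,
//       func() error { return fetchRecommendations(userID) },
//       func() error { showCachedRecommendations(userID); return nil },
//   )
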
Let's look at a more advanced circuit breaker with metrics to understand how it behaves in production.

Circuit Breaker with Metrics

package main

import (
    "fmt"
    "sync"
    "time"
)

// CircuitBreakerWithMetrics tracks detailed metrics.
// It embeds the CircuitBreaker from the previous example.
type CircuitBreakerWithMetrics struct {
    *CircuitBreaker
    mu sync.RWMutex

    // Metrics
    totalCalls    int64
    successCalls  int64
    failureCalls  int64
    rejectedCalls int64
    stateChanges  map[string]int64
}

func NewCircuitBreakerWithMetrics(maxFailures int, timeout time.Duration) *CircuitBreakerWithMetrics {
    return &CircuitBreakerWithMetrics{
        CircuitBreaker: NewCircuitBreaker(maxFailures, timeout),
        stateChanges:   make(map[string]int64),
    }
}

func (cbm *CircuitBreakerWithMetrics) Call(fn func() error) error {
    cbm.mu.Lock()
    cbm.totalCalls++
    cbm.mu.Unlock()

    oldState := cbm.CircuitBreaker.State()
    err := cbm.CircuitBreaker.Call(fn)
    newState := cbm.CircuitBreaker.State()

    cbm.mu.Lock()
    defer cbm.mu.Unlock()

    if err != nil {
        if err.Error() == "circuit breaker is OPEN" || err.Error() == "circuit breaker is HALF-OPEN" {
            cbm.rejectedCalls++
        } else {
            cbm.failureCalls++
        }
    } else {
        cbm.successCalls++
    }

    // Track state changes
    if oldState != newState {
        transition := fmt.Sprintf("%s->%s", oldState, newState)
        cbm.stateChanges[transition]++
    }

    return err
}

func (cbm *CircuitBreakerWithMetrics) Metrics() map[string]interface{} {
    cbm.mu.RLock()
    defer cbm.mu.RUnlock()

    successRate := float64(0)
    if cbm.totalCalls > 0 {
        successRate = float64(cbm.successCalls) / float64(cbm.totalCalls) * 100
    }

    return map[string]interface{}{
        "total_calls":    cbm.totalCalls,
        "success_calls":  cbm.successCalls,
        "failure_calls":  cbm.failureCalls,
        "rejected_calls": cbm.rejectedCalls,
        "success_rate":   fmt.Sprintf("%.2f%%", successRate),
        "current_state":  cbm.CircuitBreaker.State().String(),
        "state_changes":  cbm.stateChanges,
    }
}

func main() {
    cb := NewCircuitBreakerWithMetrics(3, 2*time.Second)

    // Simulate varying service reliability
    for i := 0; i < 20; i++ {
        err := cb.Call(func() error {
            // Fail 60% of the time
            if i%5 < 3 {
                return fmt.Errorf("service error")
            }
            return nil
        })

        if err != nil {
            fmt.Printf("Call %d: FAILED (%v)\n", i+1, err)
        } else {
            fmt.Printf("Call %d: SUCCESS\n", i+1)
        }

        time.Sleep(300 * time.Millisecond)
    }

    // Print metrics
    fmt.Println("\nMetrics:")
    metrics := cb.Metrics()
    for k, v := range metrics {
        fmt.Printf("  %s: %v\n", k, v)
    }
}

Raft Consensus Basics

Raft is a consensus algorithm that ensures distributed systems agree on state even with failures.

Simplified Raft Leader Election

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type NodeState int

const (
    Follower NodeState = iota
    Candidate
    Leader
)

func (s NodeState) String() string {
    return [...]string{"FOLLOWER", "CANDIDATE", "LEADER"}[s]
}

// RaftNode represents a node in a Raft cluster
type RaftNode struct {
    id          int
    state       NodeState
    currentTerm int
    votedFor    int
    voteCount   int

    // Cluster
    peers []*RaftNode

    // Timeouts
    electionTimeout  time.Duration
    heartbeatTimeout time.Duration
    lastHeartbeat    time.Time

    mu sync.Mutex
}

func NewRaftNode(id int, peers []*RaftNode) *RaftNode {
    return &RaftNode{
        id:               id,
        state:            Follower,
        currentTerm:      0,
        votedFor:         -1,
        peers:            peers,
        electionTimeout:  time.Duration(150+rand.Intn(150)) * time.Millisecond,
        heartbeatTimeout: 50 * time.Millisecond,
        lastHeartbeat:    time.Now(),
    }
}

// Start begins the node's operation
func (n *RaftNode) Start() {
    go n.run()
}

func (n *RaftNode) run() {
    for {
        n.mu.Lock()
        state := n.state
        n.mu.Unlock()

        switch state {
        case Follower:
            n.runFollower()
        case Candidate:
            n.runCandidate()
        case Leader:
            n.runLeader()
        }
    }
}

func (n *RaftNode) runFollower() {
    timeout := time.After(n.electionTimeout)

    for {
        n.mu.Lock()
        if n.state != Follower {
            n.mu.Unlock()
            return
        }
        n.mu.Unlock()

        select {
        case <-timeout:
            // No heartbeat from leader, become candidate
            n.mu.Lock()
            n.state = Candidate
            n.mu.Unlock()
            fmt.Printf("Node %d: Follower -> Candidate\n", n.id)
            return
        case <-time.After(10 * time.Millisecond):
            // Check if heartbeat received
            n.mu.Lock()
            if time.Since(n.lastHeartbeat) > n.electionTimeout {
                n.state = Candidate
                n.mu.Unlock()
                return
            }
            n.mu.Unlock()
        }
    }
}

func (n *RaftNode) runCandidate() {
    n.mu.Lock()
    n.currentTerm++
    n.votedFor = n.id
    n.voteCount = 1 // Vote for self
    term := n.currentTerm
    n.mu.Unlock()

    fmt.Printf("Node %d: Starting election for term %d\n", n.id, term)

    // Request votes from peers
    for _, peer := range n.peers {
        if peer.id == n.id {
            continue
        }

        go func(p *RaftNode) {
            if p.RequestVote(n.id, term) {
                n.mu.Lock()
                n.voteCount++
                voteCount := n.voteCount
                n.mu.Unlock()

                // Majority?
                if voteCount > len(n.peers)/2 {
                    n.mu.Lock()
                    if n.state == Candidate {
                        n.state = Leader
                        fmt.Printf("Node %d: Candidate -> Leader (votes: %d)\n",
                            n.id, voteCount)
                    }
                    n.mu.Unlock()
                }
            }
        }(peer)
    }

    // Election timeout
    time.Sleep(n.electionTimeout)

    n.mu.Lock()
    if n.state == Candidate {
        // Election failed, retry
        fmt.Printf("Node %d: Election failed, retrying...\n", n.id)
        n.state = Follower
        n.votedFor = -1
    }
    n.mu.Unlock()
}

func (n *RaftNode) runLeader() {
    fmt.Printf("Node %d: Acting as leader\n", n.id)

    ticker := time.NewTicker(n.heartbeatTimeout)
    defer ticker.Stop()

    for {
        n.mu.Lock()
        if n.state != Leader {
            n.mu.Unlock()
            return
        }
        term := n.currentTerm
        n.mu.Unlock()

        // Send heartbeats
        for _, peer := range n.peers {
            if peer.id == n.id {
                continue
            }

            go peer.Heartbeat(n.id, term)
        }

        <-ticker.C
    }
}

// RequestVote handles vote requests
func (n *RaftNode) RequestVote(candidateID, term int) bool {
    n.mu.Lock()
    defer n.mu.Unlock()

    // Reject requests from stale terms
    if n.currentTerm > term {
        return false
    }

    if n.currentTerm < term {
        n.currentTerm = term
        n.votedFor = -1
        n.state = Follower
    }

    // Grant vote if we haven't voted for anyone else in this term
    if n.votedFor == -1 || n.votedFor == candidateID {
        n.votedFor = candidateID
        return true
    }

    return false
}

// Heartbeat handles heartbeat from leader
func (n *RaftNode) Heartbeat(leaderID, term int) {
    n.mu.Lock()
    defer n.mu.Unlock()

    if term >= n.currentTerm {
        n.currentTerm = term
        n.state = Follower
        n.lastHeartbeat = time.Now()
    }
}

func main() {
    // Create a cluster of 5 nodes. Allocate the shared peers slice first, then
    // initialize each node in place so every node sees the same peer list.
    nodes := make([]*RaftNode, 5)
    for i := range nodes {
        nodes[i] = NewRaftNode(i, nodes)
    }

    // Start all nodes
    for _, node := range nodes {
        node.Start()
    }

    // Run for 10 seconds
    time.Sleep(10 * time.Second)
}

Distributed Locks

Redis-Based Distributed Lock

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "github.com/google/uuid"
)

// DistributedLock implements distributed locking
type DistributedLock struct {
    key   string
    value string
    ttl   time.Duration
}

// NewDistributedLock creates a new lock
func NewDistributedLock(key string, ttl time.Duration) *DistributedLock {
    return &DistributedLock{
        key:   key,
        value: uuid.New().String(), // Unique token
        ttl:   ttl,
    }
}

// Lock acquires the lock
func (dl *DistributedLock) Lock(ctx context.Context) error {
    deadline, ok := ctx.Deadline()
    if !ok {
        deadline = time.Now().Add(10 * time.Second)
    }

    for {
        // Try to acquire lock
        if dl.tryLock() {
            return nil
        }

        // Check timeout
        if time.Now().After(deadline) {
            return errors.New("lock acquisition timeout")
        }

        // Retry after a short delay (use exponential backoff in production)
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(100 * time.Millisecond):
            // Retry
        }
    }
}

func (dl *DistributedLock) tryLock() bool {
    // Simulated SET NX
    // In a real implementation, use Redis: SET key value NX EX seconds
    // For now, use a simple in-memory map

    lockStore.mu.Lock()
    defer lockStore.mu.Unlock()

    if existing, ok := lockStore.locks[dl.key]; ok {
        // Check if expired
        if time.Now().Before(existing.expiry) {
            return false
        }
    }

    // Acquire lock
    lockStore.locks[dl.key] = &lockEntry{
        value:  dl.value,
        expiry: time.Now().Add(dl.ttl),
    }

    return true
}

// Unlock releases the lock
func (dl *DistributedLock) Unlock() error {
    lockStore.mu.Lock()
    defer lockStore.mu.Unlock()

    existing, ok := lockStore.locks[dl.key]
    if !ok {
        return errors.New("lock not held")
    }

    // Only unlock if we own it
    if existing.value != dl.value {
        return errors.New("lock owned by another process")
    }

    delete(lockStore.locks, dl.key)
    return nil
}

// Extend extends the lock TTL
func (dl *DistributedLock) Extend(duration time.Duration) error {
    lockStore.mu.Lock()
    defer lockStore.mu.Unlock()

    existing, ok := lockStore.locks[dl.key]
    if !ok {
        return errors.New("lock not held")
    }

    if existing.value != dl.value {
        return errors.New("lock owned by another process")
    }

    existing.expiry = time.Now().Add(duration)
    return nil
}

// Simple in-memory lock store
type lockEntry struct {
    value  string
    expiry time.Time
}

var lockStore = struct {
    mu    sync.Mutex
    locks map[string]*lockEntry
}{
    locks: make(map[string]*lockEntry),
}

func main() {
    lock := NewDistributedLock("resource:123", 5*time.Second)

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Acquire lock
    if err := lock.Lock(ctx); err != nil {
        panic(err)
    }
    fmt.Println("Lock acquired")

    // Do work
    time.Sleep(2 * time.Second)

    // Extend lock
    if err := lock.Extend(5 * time.Second); err != nil {
        panic(err)
    }
    fmt.Println("Lock extended")

    // Do more work
    time.Sleep(2 * time.Second)

    // Release lock
    if err := lock.Unlock(); err != nil {
        panic(err)
    }
    fmt.Println("Lock released")
}

Further Reading

Books

  • Designing Data-Intensive Applications by Martin Kleppmann
  • Building Microservices by Sam Newman
  • Release It! by Michael Nygard

Tools

  • etcd - Distributed key-value store using Raft
  • Consul - Service mesh and service discovery
  • Istio - Service mesh for Kubernetes

Integration and Mastery

Building distributed systems is fundamentally different from building monolithic applications. The patterns and techniques covered in this tutorial represent years of hard-won lessons from production systems serving millions of users. This section synthesizes these lessons into actionable guidance for designing, implementing, and operating distributed systems in Go.

Key Takeaways and Best Practices

1. Embrace Failure as the Normal Case

In distributed systems, failures are not exceptional - they're inevitable. Design every component assuming that:

  • Network calls will timeout or fail intermittently
  • Servers will crash at the worst possible moment
  • Network partitions will split your cluster unexpectedly
  • Clocks will drift and messages will arrive out of order

// Always implement timeouts and retries
// (retry here refers to a helper package such as github.com/avast/retry-go)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var result Response
err := retry.Do(
    func() error {
        return makeRequest(ctx, &result)
    },
    retry.Attempts(3),
    retry.Delay(100*time.Millisecond),
    retry.MaxDelay(1*time.Second),
    retry.DelayType(retry.BackOffDelay),
)

2. Start Simple, Scale Gradually

The patterns in this tutorial solve real problems, but they add complexity. Follow this progression:

Phase 1 - Single Service (0-10K users):

  • Start with a well-structured monolith
  • Use simple HTTP/gRPC for external APIs
  • Single database with proper indexing
  • Basic health checks and logging

Phase 2 - Fault Tolerance (10K-100K users):

  • Add circuit breakers to prevent cascade failures
  • Implement structured logging and basic metrics
  • Add load balancers for high availability
  • Use connection pools and timeouts everywhere

Phase 3 - Distribution (100K-1M users):

  • Split into microservices based on team boundaries
  • Implement service discovery (Consul, etcd)
  • Add distributed tracing (OpenTelemetry)
  • Consider event-driven architecture

Phase 4 - Scale and Resilience (1M+ users):

  • Implement consensus for critical state (Raft, Paxos)
  • Use saga patterns for distributed transactions
  • Deploy service mesh for advanced traffic management
  • Implement chaos engineering practices

3. Consistency Model Alignment

Choose consistency models based on business requirements, not technical preferences:

// Example: Banking requires strong consistency
type AccountTransfer struct {
    // Use distributed transactions or consensus
    // Cannot have "split-brain" money creation
}

// Example: Social media feed is eventually consistent
type UserFeed struct {
    // Can tolerate temporary inconsistency
    // Higher availability is more valuable
}

// Example: Shopping cart is session-consistent
type ShoppingCart struct {
    // User sees their own updates immediately
    // Other users don't need real-time view
}

Matching Patterns to Requirements:

  • Strong Consistency Needed: Use Raft/Paxos consensus, distributed locks, or two-phase commit
  • Eventual Consistency Acceptable: Use message queues, event sourcing, or CRDT data structures
  • Session Consistency Required: Use sticky sessions, read-your-writes guarantees, or client-side caching (see the sketch after this list)

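As one way to provide a read-your-writes guarantee, a client can remember the highest version it has written and only read from a replica that has caught up to that version, falling back to the primary otherwise. The SessionClient and Replica types below are illustrative for this sketch, not a specific library's API.

// A minimal read-your-writes sketch (illustrative types, not a real client library).
type Replica struct {
    version int64 // highest applied write version
    data    map[string]string
}

type SessionClient struct {
    lastWritten int64 // highest version this session has written
    primary     *Replica
    replicas    []*Replica
}

// Read returns a value from any replica that has seen this session's writes,
// falling back to the primary otherwise.
func (c *SessionClient) Read(key string) string {
    for _, r := range c.replicas {
        if r.version >= c.lastWritten {
            return r.data[key]
        }
    }
    return c.primary.data[key]
}

// Write goes to the primary and records the version for later reads.
func (c *SessionClient) Write(key, value string, newVersion int64) {
    c.primary.data[key] = value
    c.primary.version = newVersion
    c.lastWritten = newVersion
}
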
Common Pitfalls and How to Avoid Them

Pitfall 1: Distributed Transactions Without Timeouts

// ❌ Bad: Can deadlock indefinitely
func transferMoney(from, to string, amount int) error {
    lock1 := acquireLock(from) // What if this hangs?
    lock2 := acquireLock(to)
    defer releaseLock(from, to)
    // ... transfer logic
}

// ✅ Good: Always use timeouts and proper lock ordering
func transferMoney(ctx context.Context, from, to string, amount int) error {
    // Order locks alphabetically to prevent deadlock
    first, second := from, to
    if from > to {
        first, second = to, from
    }

    lock1, err := acquireLockWithTimeout(ctx, first, 5*time.Second)
    if err != nil {
        return fmt.Errorf("failed to acquire first lock: %w", err)
    }
    defer lock1.Release()

    lock2, err := acquireLockWithTimeout(ctx, second, 5*time.Second)
    if err != nil {
        return fmt.Errorf("failed to acquire second lock: %w", err)
    }
    defer lock2.Release()

    return performTransfer(ctx, from, to, amount)
}

Pitfall 2: Ignoring Idempotency

Distributed systems often retry operations. If your operations aren't idempotent, retries cause duplicate actions:

// ❌ Bad: Retrying this creates duplicate charges
func chargeCustomer(customerID string, amount int) error {
    return db.Exec("INSERT INTO charges (customer_id, amount) VALUES (?, ?)",
        customerID, amount)
}

// ✅ Good: Use idempotency keys
func chargeCustomer(customerID string, amount int, idempotencyKey string) error {
    // Try to insert with unique key constraint
    err := db.Exec(`
        INSERT INTO charges (idempotency_key, customer_id, amount)
        VALUES (?, ?, ?)
        ON CONFLICT (idempotency_key) DO NOTHING`,
        idempotencyKey, customerID, amount)

    if err != nil {
        return fmt.Errorf("charge failed: %w", err)
    }

    // Return existing charge if key already existed
    return nil
}

Pitfall 3: Cascading Failures

One slow service can bring down your entire system:

// ❌ Bad: No protection against slow dependencies
func getRecommendations(userID string) ([]Product, error) {
    profile := fetchUserProfile(userID)     // 5s timeout
    history := fetchPurchaseHistory(userID) // 5s timeout
    trending := fetchTrendingProducts()     // 5s timeout
    // Total worst case: 15 seconds!
}

// ✅ Good: Use circuit breakers and parallel execution
func getRecommendations(ctx context.Context, userID string) ([]Product, error) {
    // Set overall deadline
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()

    var profile UserProfile
    var history []Purchase
    var trending []Product

    // Execute in parallel with circuit breakers
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        var err error
        profile, err = profileBreaker.Execute(func() (UserProfile, error) {
            return fetchUserProfile(ctx, userID)
        })
        if err != nil {
            // Use cached or default profile
            profile = getDefaultProfile()
        }
        return nil // Don't fail overall request
    })

    g.Go(func() error {
        var err error
        history, err = historyBreaker.Execute(func() ([]Purchase, error) {
            return fetchPurchaseHistory(ctx, userID)
        })
        if err != nil {
            history = []Purchase{} // Empty history is acceptable
        }
        return nil
    })

    g.Go(func() error {
        var err error
        trending, err = trendingBreaker.Execute(func() ([]Product, error) {
            return fetchTrendingProducts(ctx)
        })
        if err != nil {
            trending = getCachedTrending() // Use cache
        }
        return nil
    })

    g.Wait() // Wait for all to complete

    return generateRecommendations(profile, history, trending), nil
}

Pitfall 4: Split-Brain Scenarios

Network partitions can create multiple leaders:

// ✅ Always use odd number of nodes for quorum
type ClusterConfig struct {
    Nodes []string // Must be 3, 5, 7, etc.
}

// ✅ Require majority votes for leadership
func electLeader(votes map[string]int, totalNodes int) (string, error) {
    quorum := totalNodes/2 + 1

    for candidate, voteCount := range votes {
        if voteCount >= quorum {
            return candidate, nil
        }
    }

    return "", errors.New("no candidate achieved quorum")
}

// ✅ Use fencing tokens to prevent stale leaders
type LeaderLease struct {
    Token   int64 // Monotonically increasing
    Expires time.Time
}

func executeLeaderAction(lease LeaderLease, action func() error) error {
    // Check lease is still valid
    if time.Now().After(lease.Expires) {
        return errors.New("lease expired")
    }

    // Include token in all operations
    // Storage layer rejects operations with stale tokens
    return action()
}

When to Use Distributed Patterns

Use Service Discovery When:

  • You have more than 3-5 services
  • Service instances are ephemeral (containers, auto-scaling)
  • You need dynamic configuration updates
  • Health checking is critical to your reliability

Use Circuit Breakers When:

  • You depend on external services (APIs, databases)
  • Failures can cascade through your system
  • You need graceful degradation
  • Response time SLAs are important

Use Consensus Algorithms When:

  • Strong consistency is non-negotiable (financial data, inventory)
  • You need distributed locks or leader election
  • Data loss is unacceptable
  • You can tolerate reduced availability during partitions

Use Saga Patterns When:

  • Transactions span multiple microservices
  • Two-phase commit is too slow or complex
  • Eventual consistency is acceptable
  • You need to maintain service autonomy

Use Distributed Locks When:

  • Multiple processes need exclusive access to resources
  • You're implementing leader election
  • Preventing duplicate work is critical
  • Lock duration is short (seconds, not minutes)

Don't Use Distributed Systems When:

  • A monolith with proper architecture would suffice
  • Your team lacks distributed systems expertise
  • The added complexity isn't justified by scale
  • Network latency would hurt user experience

Production Deployment Considerations

Observability is Non-Negotiable

Distributed systems are impossible to debug without proper observability:

// Implement the three pillars: Logs, Metrics, Traces

// 1. Structured Logging with Context
import "go.uber.org/zap"

logger.Info("processing request",
    zap.String("request_id", reqID),
    zap.String("user_id", userID),
    zap.Duration("duration", elapsed),
    zap.Int("status_code", statusCode),
)

// 2. Metrics with Labels
import "github.com/prometheus/client_golang/prometheus"

requestDuration := prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Name:    "http_request_duration_seconds",
        Help:    "HTTP request latency distribution",
        Buckets: prometheus.DefBuckets,
    },
    []string{"method", "endpoint", "status"},
)

// 3. Distributed Tracing
import "go.opentelemetry.io/otel"

ctx, span := tracer.Start(ctx, "process-order")
defer span.End()

span.SetAttributes(
    attribute.String("order.id", orderID),
    attribute.Int("order.items", len(items)),
)

Gradual Rollouts

Deploy distributed systems changes gradually:

// Feature flags for controlled rollouts
type FeatureFlags struct {
    UseNewConsensusAlgorithm bool // 0% → 1% → 5% → 50% → 100%
    EnableCircuitBreaker     bool
    RolloutPercentage        int
}

func shouldUseNewFeature(userID string, rolloutPercentage int) bool {
    // Consistent hashing ensures same user always gets same result
    hash := crc32.ChecksumIEEE([]byte(userID))
    return int(hash%100) < rolloutPercentage
}

// Canary deployments: Route small percentage to new version
// Blue-green deployments: Switch all traffic at once (with quick rollback)
// Rolling deployments: Gradually replace old instances

Capacity Planning

Distributed systems scale differently than monoliths:

// Calculate required capacity with safety margins

// Base calculation
expectedRPS := 10000
avgLatency := 50 * time.Millisecond
concurrency := float64(expectedRPS) * avgLatency.Seconds()

// Apply safety factors
safetyFactor := 2.0 // Handle 2x traffic spikes
concurrency *= safetyFactor

// Account for inefficiencies
clusterEfficiency := 0.7 // 70% efficiency (N+2 redundancy, upgrades, etc.)
requiredInstances := int(math.Ceil(concurrency / (1000 * clusterEfficiency)))

// Always use odd number for consensus
if requiredInstances%2 == 0 {
    requiredInstances++
}

fmt.Printf("Required instances: %d\n", requiredInstances)

Disaster Recovery

Plan for worst-case scenarios:

  1. Multi-Region Deployment: Survive entire region failures
  2. Regular Backups: Automate backups with restore testing
  3. Runbooks: Document failure scenarios and recovery steps
  4. Chaos Engineering: Regularly test failure scenarios
  5. Circuit Breaking: Fail fast and isolate failures

// Example: Multi-region failover
type RegionConfig struct {
    Primary   string   // us-east-1
    Fallbacks []string // [us-west-2, eu-west-1]
}

func makeRequest(ctx context.Context, regions RegionConfig) (Response, error) {
    // Try primary first
    resp, err := makeRegionRequest(ctx, regions.Primary)
    if err == nil {
        return resp, nil
    }

    // Fallback to other regions
    for _, region := range regions.Fallbacks {
        resp, err := makeRegionRequest(ctx, region)
        if err == nil {
            // Log for monitoring
            logger.Warn("primary region failed, using fallback",
                zap.String("primary", regions.Primary),
                zap.String("fallback", region),
            )
            return resp, nil
        }
    }

    return Response{}, errors.New("all regions unavailable")
}

Final Thoughts

Distributed systems are powerful but complex. The patterns in this tutorial - circuit breakers, consensus algorithms, service discovery, and sagas - are your toolkit for managing this complexity. Remember:

  1. Start Simple: Don't over-engineer. Most systems don't need distributed consensus.
  2. Fail Gracefully: Design for failure, not just success.
  3. Monitor Everything: You can't fix what you can't see.
  4. Test Failures: Use chaos engineering to validate resilience.
  5. Document Trade-offs: Every distributed systems decision involves trade-offs - document them.

The distributed systems landscape is vast, and this tutorial covers foundational patterns. As you build production systems, you'll encounter scenarios requiring deeper expertise in specific areas. The Further Reading section provides resources for diving deeper into topics like distributed databases, stream processing, and consensus protocols.

Most importantly, remember that distributed systems exist to solve business problems. Before reaching for these patterns, ask yourself: "Does the business value justify the added complexity?" Sometimes the best distributed system is a well-designed monolith.

Practice Exercises

Exercise 1: Implement a Saga Pattern

🎯 Learning Objectives:

  • Master distributed transaction patterns without two-phase commit
  • Implement compensation-based rollback mechanisms
  • Learn to handle partial failures in distributed systems
  • Design stateful workflow coordinators

🌍 Real-World Context:
Saga patterns are essential for microservices architectures where traditional ACID transactions don't work across service boundaries. Companies like Uber use sagas to coordinate ride-booking across payment, routing, and notification services. When any step fails, compensation actions ensure system consistency without locking resources across services, which is crucial for high-throughput distributed systems.

⏱️ Time Estimate: 60-90 minutes
📊 Difficulty: Advanced

Build a saga coordinator for distributed transactions with compensation. Your implementation should execute steps sequentially, track progress, handle partial failures gracefully, and automatically compensate completed steps when failures occur. This pattern is critical for maintaining data consistency across microservices without distributed locks.

Solution
  1package main
  2
  3import (
  4    "context"
  5    "errors"
  6    "fmt"
  7    "sync"
  8)
  9
 10// Step represents a saga step
 11type Step struct {
 12    Name       string
 13    Execute    func(ctx context.Context) error
 14    Compensate func(ctx context.Context) error
 15}
 16
 17// Saga coordinates distributed transactions
 18type Saga struct {
 19    steps     []Step
 20    executed  []int // Track executed step indices
 21    mu        sync.Mutex
 22}
 23
 24func NewSaga() *Saga {
 25    return &Saga{
 26        executed: make([]int, 0),
 27    }
 28}
 29
 30// AddStep adds a step to the saga
 31func (s *Saga) AddStep(name string, execute, compensate func(ctx context.Context) error) {
 32    s.steps = append(s.steps, Step{
 33        Name:       name,
 34        Execute:    execute,
 35        Compensate: compensate,
 36    })
 37}
 38
 39// Execute runs the saga
 40func (s *Saga) Execute(ctx context.Context) error {
 41    s.mu.Lock()
 42    defer s.mu.Unlock()
 43
 44    // Execute each step
 45    for i, step := range s.steps {
 46        fmt.Printf("Executing step: %s\n", step.Name)
 47
 48        if err := step.Execute(ctx); err != nil {
 49            fmt.Printf("Step %s failed: %v\n", step.Name, err)
 50
 51            // Compensate in reverse order
 52            if err := s.compensate(ctx); err != nil {
 53                return fmt.Errorf("compensation failed: %w", err)
 54            }
 55
 56            return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
 57        }
 58
 59        s.executed = append(s.executed, i)
 60        fmt.Printf("Step %s completed\n", step.Name)
 61    }
 62
 63    fmt.Println("Saga completed successfully")
 64    return nil
 65}
 66
 67func (s *Saga) compensate(ctx context.Context) error {
 68    fmt.Println("Starting compensation...")
 69
 70    // Compensate in reverse order
 71    for i := len(s.executed) - 1; i >= 0; i-- {
 72        stepIdx := s.executed[i]
 73        step := s.steps[stepIdx]
 74
 75        fmt.Printf("Compensating step: %s\n", step.Name)
 76
 77        if err := step.Compensate(ctx); err != nil {
 78            return fmt.Errorf("compensation failed for step %s: %w", step.Name, err)
 79        }
 80
 81        fmt.Printf("Step %s compensated\n", step.Name)
 82    }
 83
 84    s.executed = nil
 85    fmt.Println("Compensation completed")
 86    return nil
 87}
 88
 89// Example: Order processing saga
 90func main() {
 91    saga := NewSaga()
 92
 93    // Step 1: Reserve inventory
 94    var inventoryReserved bool
 95    saga.AddStep("ReserveInventory",
 96        func(ctx context.Context) error {
 97            fmt.Println("  Reserving inventory...")
 98            inventoryReserved = true
 99            return nil
100        },
101        func(ctx context.Context) error {
102            if inventoryReserved {
103                fmt.Println("  Releasing inventory...")
104                inventoryReserved = false
105            }
106            return nil
107        },
108    )
109
110    // Step 2: Charge payment
111    var paymentCharged bool
112    saga.AddStep("ChargePayment",
113        func(ctx context.Context) error {
114            fmt.Println("  Charging payment...")
115            paymentCharged = true
116            return nil
117        },
118        func(ctx context.Context) error {
119            if paymentCharged {
120                fmt.Println("  Refunding payment...")
121                paymentCharged = false
122            }
123            return nil
124        },
125    )
126
127    // Step 3: Create shipment
128    saga.AddStep("CreateShipment",
129        func(ctx context.Context) error {
130            fmt.Println("  Creating shipment...")
131            return errors.New("shipping service unavailable")
132        },
133        func(ctx context.Context) error {
134            fmt.Println("  Canceling shipment...")
135            return nil
136        },
137    )
138
139    // Execute saga
140    ctx := context.Background()
141    if err := saga.Execute(ctx); err != nil {
142        fmt.Printf("\nSaga failed: %v\n", err)
143        fmt.Printf("Inventory reserved: %v\n", inventoryReserved)
144        fmt.Printf("Payment charged: %v\n", paymentCharged)
145    }
146}

Output:

Executing step: ReserveInventory
  Reserving inventory...
Step ReserveInventory completed
Executing step: ChargePayment
  Charging payment...
Step ChargePayment completed
Executing step: CreateShipment
  Creating shipment...
Step CreateShipment failed: shipping service unavailable
Starting compensation...
Compensating step: ChargePayment
  Refunding payment...
Step ChargePayment compensated
Compensating step: ReserveInventory
  Releasing inventory...
Step ReserveInventory compensated
Compensation completed

Saga failed: saga failed at step CreateShipment: shipping service unavailable
Inventory reserved: false
Payment charged: false

Exercise 2: Build a Bulkhead Pattern Implementation

🎯 Learning Objectives:

  • Implement resource isolation patterns for fault tolerance
  • Design concurrent request management with backpressure
  • Learn semaphore-based resource allocation
  • Build metrics and monitoring for capacity management

🌍 Real-World Context:
The bulkhead pattern prevents cascading failures by isolating resources, just like watertight compartments in ships prevent sinking when one section floods. Netflix uses bulkheads extensively to isolate different parts of their streaming service - when the recommendation service fails, video playback continues unaffected. This pattern is crucial for systems where partial functionality is better than complete outage.

⏱️ Time Estimate: 75-90 minutes
📊 Difficulty: Advanced

Implement the bulkhead pattern to isolate resources and prevent cascading failures. Your implementation should create separate resource pools for different services, limit concurrent requests per pool, queue overflow requests gracefully, and provide comprehensive metrics for monitoring resource utilization and identifying bottlenecks.

Solution
  1package main
  2
  3import (
  4    "context"
  5    "errors"
  6    "fmt"
  7    "sync"
  8    "time"
  9)
 10
 11// Bulkhead isolates resources to prevent cascading failures
 12type Bulkhead struct {
 13    name       string
 14    maxWorkers int
 15    queueSize  int
 16
 17    workers   chan struct{} // Semaphore for workers
 18    queue     chan *task
 19    wg        sync.WaitGroup
 20
 21    // Metrics
 22    mu              sync.RWMutex
 23    activeWorkers   int
 24    queuedTasks     int
 25    completedTasks  int64
 26    rejectedTasks   int64
 27}
 28
 29type task struct {
 30    fn     func(context.Context) error
 31    result chan error
 32}
 33
 34// NewBulkhead creates a new bulkhead
 35func NewBulkhead(name string, maxWorkers, queueSize int) *Bulkhead {
 36    b := &Bulkhead{
 37        name:       name,
 38        maxWorkers: maxWorkers,
 39        queueSize:  queueSize,
 40        workers:    make(chan struct{}, maxWorkers),
 41        queue:      make(chan *task, queueSize),
 42    }
 43
 44    // Start worker pool
 45    for i := 0; i < maxWorkers; i++ {
 46        b.wg.Add(1)
 47        go b.worker()
 48    }
 49
 50    return b
 51}
 52
 53// Execute runs a function with bulkhead protection
 54func (b *Bulkhead) Execute(ctx context.Context, fn func(context.Context) error) error {
 55    // Create task
 56    t := &task{
 57        fn:     fn,
 58        result: make(chan error, 1),
 59    }
 60
 61    // Try to queue
 62    select {
 63    case b.queue <- t:
 64        b.mu.Lock()
 65        b.queuedTasks++
 66        b.mu.Unlock()
 67
 68        // Wait for result
 69        select {
 70        case err := <-t.result:
 71            return err
 72        case <-ctx.Done():
 73            return ctx.Err()
 74        }
 75
 76    default:
 77        // Queue full
 78        b.mu.Lock()
 79        b.rejectedTasks++
 80        b.mu.Unlock()
 81        return errors.New("bulkhead queue full")
 82    }
 83}
 84
 85func (b *Bulkhead) worker() {
 86    defer b.wg.Done()
 87
 88    for t := range b.queue {
 89        b.mu.Lock()
 90        b.activeWorkers++
 91        b.queuedTasks--
 92        b.mu.Unlock()
 93
 94        // Execute task
 95        ctx := context.Background()
 96        err := t.fn(ctx)
 97        t.result <- err
 98
 99        b.mu.Lock()
100        b.activeWorkers--
101        b.completedTasks++
102        b.mu.Unlock()
103    }
104}
105
106// Metrics returns current bulkhead metrics
107func (b *Bulkhead) Metrics() map[string]interface{} {
108    b.mu.RLock()
109    defer b.mu.RUnlock()
110
111    return map[string]interface{}{
112        "name":            b.name,
113        "max_workers":     b.maxWorkers,
114        "active_workers":  b.activeWorkers,
115        "queued_tasks":    b.queuedTasks,
116        "completed_tasks": b.completedTasks,
117        "rejected_tasks":  b.rejectedTasks,
118    }
119}
120
121// Close shuts down the bulkhead
122func (b *Bulkhead) Close() {
123    close(b.queue)
124    b.wg.Wait()
125}
126
127func main() {
128    // Create separate bulkheads for different services
129    paymentBulkhead := NewBulkhead("payment", 3, 5)
130    shippingBulkhead := NewBulkhead("shipping", 2, 3)
131
132    defer paymentBulkhead.Close()
133    defer shippingBulkhead.Close()
134
135    // Simulate load
136    var wg sync.WaitGroup
137
138    // Payment service
139    for i := 0; i < 10; i++ {
140        wg.Add(1)
141        go func(id int) {
142            defer wg.Done()
143
144            err := paymentBulkhead.Execute(context.Background(), func(ctx context.Context) error {
145                fmt.Printf("Payment %d: Processing...\n", id)
146                time.Sleep(100 * time.Millisecond)
147                return nil
148            })
149
150            if err != nil {
151                fmt.Printf("Payment %d: REJECTED (%v)\n", id, err)
152            }
153        }(i)
154    }
155
156    // Shipping service
157    for i := 0; i < 15; i++ {
158        wg.Add(1)
159        go func(id int) {
160            defer wg.Done()
161
162            err := shippingBulkhead.Execute(context.Background(), func(ctx context.Context) error {
163                fmt.Printf("Shipping %d: Processing...\n", id)
164                time.Sleep(500 * time.Millisecond)
165                return nil
166            })
167
168            if err != nil {
169                fmt.Printf("Shipping %d: REJECTED (%v)\n", id, err)
170            }
171        }(i)
172    }
173
174    wg.Wait()
175
176    // Print metrics
177    fmt.Println("\n=== Bulkhead Metrics ===")
178    fmt.Printf("Payment: %+v\n", paymentBulkhead.Metrics())
179    fmt.Printf("Shipping: %+v\n", shippingBulkhead.Metrics())
180}

Key Points:

  • Separate resource pools prevent one slow service from affecting others
  • Queue provides buffer for bursty traffic
  • Metrics help identify bottlenecks
  • Failed requests fail fast instead of hanging

Exercise 3: Vector Clock Implementation

🎯 Learning Objectives:

  • Master causal consistency and happened-before relationships
  • Implement distributed event ordering without synchronized clocks
  • Learn to detect concurrent vs causally related events
  • Design clock merging and comparison algorithms

🌍 Real-World Context:
Vector clocks are fundamental to distributed systems where physical clock synchronization is impossible or unreliable. Amazon Dynamo uses vector clocks to resolve conflicts in eventually consistent storage, while collaborative editing tools like Google Docs use similar concepts to maintain consistency across distributed users. Understanding vector clocks is essential for building systems that need to reason about event causality without global time.

⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Expert

Implement vector clocks for tracking causality in distributed systems. Your implementation should track happened-before relationships between events, detect concurrent events, merge clocks from different nodes, and provide comparison operations for ordering events causally. This is a core building block for eventually consistent systems and collaborative applications.

Solution with Explanation
  1// run
  2package main
  3
  4import (
  5    "fmt"
  6    "sync"
  7)
  8
  9// VectorClock tracks causality in distributed systems
 10type VectorClock struct {
 11    mu      sync.RWMutex
 12    clock   map[string]int
 13    nodeID  string
 14}
 15
 16// NewVectorClock creates a new vector clock for a node
 17func NewVectorClock(nodeID string) *VectorClock {
 18    return &VectorClock{
 19        clock:  make(map[string]int),
 20        nodeID: nodeID,
 21    }
 22}
 23
 24// Tick increments this node's clock
 25func (vc *VectorClock) Tick() {
 26    vc.mu.Lock()
 27    defer vc.mu.Unlock()
 28
 29    vc.clock[vc.nodeID]++
 30}
 31
 32// Update merges another clock into this one
 33func (vc *VectorClock) Update(other *VectorClock) {
 34    vc.mu.Lock()
 35    defer vc.mu.Unlock()
 36
 37    other.mu.RLock()
 38    defer other.mu.RUnlock()
 39
 40    // Take max of each node's counter
 41    for node, timestamp := range other.clock {
 42        if vc.clock[node] < timestamp {
 43            vc.clock[node] = timestamp
 44        }
 45    }
 46
 47    // Increment our own counter
 48    vc.clock[vc.nodeID]++
 49}
 50
 51// Ordering describes the relationship between two clocks
 52type Ordering int
 53
 54const (
 55    Before     Ordering = iota // vc happened before other
 56    After                      // vc happened after other
 57    Concurrent                 // vc and other are concurrent
 58)
 59
 60func (o Ordering) String() string {
 61    switch o {
 62    case Before:
 63        return "BEFORE"
 64    case After:
 65        return "AFTER"
 66    case Concurrent:
 67        return "CONCURRENT"
 68    default:
 69        return "UNKNOWN"
 70    }
 71}
 72
 73// Compare compares this clock with another
 74func (vc *VectorClock) Compare(other *VectorClock) Ordering {
 75    vc.mu.RLock()
 76    defer vc.mu.RUnlock()
 77
 78    other.mu.RLock()
 79    defer other.mu.RUnlock()
 80
 81    allLessOrEqual := true
 82    allGreaterOrEqual := true
 83
 84    // Collect all node IDs
 85    nodes := make(map[string]bool)
 86    for node := range vc.clock {
 87        nodes[node] = true
 88    }
 89    for node := range other.clock {
 90        nodes[node] = true
 91    }
 92
 93    // Compare each node's timestamp
 94    for node := range nodes {
 95        vcTime := vc.clock[node]
 96        otherTime := other.clock[node]
 97
 98        if vcTime > otherTime {
 99            allLessOrEqual = false
100        }
101        if vcTime < otherTime {
102            allGreaterOrEqual = false
103        }
104    }
105
106    // Determine ordering
107    if allLessOrEqual && !allGreaterOrEqual {
108        return Before
109    }
110    if allGreaterOrEqual && !allLessOrEqual {
111        return After
112    }
113    if allLessOrEqual && allGreaterOrEqual {
114        // All equal - technically "Before" but same event
115        return Before
116    }
117
118    return Concurrent
119}
120
121// Copy creates a deep copy of the clock
122func (vc *VectorClock) Copy() *VectorClock {
123    vc.mu.RLock()
124    defer vc.mu.RUnlock()
125
126    newClock := NewVectorClock(vc.nodeID)
127    for node, timestamp := range vc.clock {
128        newClock.clock[node] = timestamp
129    }
130
131    return newClock
132}
133
134// String returns a string representation
135func (vc *VectorClock) String() string {
136    vc.mu.RLock()
137    defer vc.mu.RUnlock()
138
139    return fmt.Sprintf("%v", vc.clock)
140}
141
142// Event represents a distributed system event
143type Event struct {
144    NodeID string
145    Action string
146    Clock  *VectorClock
147}
148
149func main() {
150    // Create three nodes
151    nodeA := NewVectorClock("A")
152    nodeB := NewVectorClock("B")
153    nodeC := NewVectorClock("C")
154
155    fmt.Println("=== Distributed System Event Ordering ===\n")
156
157    // Event 1: Node A sends message
158    fmt.Println("Event 1: Node A sends message")
159    nodeA.Tick()
160    fmt.Printf("  Node A clock: %s\n", nodeA)
161
162    // Event 2: Node B receives message from A
163    fmt.Println("\nEvent 2: Node B receives from A")
164    nodeB.Update(nodeA)
165    fmt.Printf("  Node B clock: %s\n", nodeB)
166
167    // Event 3: Node A does local operation
168    fmt.Println("\nEvent 3: Node A local operation")
169    nodeA.Tick()
170    fmt.Printf("  Node A clock: %s\n", nodeA)
171
172    // Event 4: Node C does local operation
173    fmt.Println("\nEvent 4: Node C local operation")
174    nodeC.Tick()
175    fmt.Printf("  Node C clock: %s\n", nodeC)
176
177    // Event 5: Node C receives from B
178    fmt.Println("\nEvent 5: Node C receives from B")
179    nodeC.Update(nodeB)
180    fmt.Printf("  Node C clock: %s\n", nodeC)
181
182    // Compare clocks
183    fmt.Println("\n=== Clock Comparisons ===\n")
184
185    // Save states for comparison
186    stateA1 := NewVectorClock("A")
187    stateA1.clock["A"] = 1
188    stateB2 := NewVectorClock("B")
189    stateB2.clock["A"] = 1
190    stateB2.clock["B"] = 1
191    stateA3 := NewVectorClock("A")
192    stateA3.clock["A"] = 2
193    stateC4 := NewVectorClock("C")
194    stateC4.clock["C"] = 1
195
196    fmt.Printf("A1 %s vs B2 %s: %s\n",
197        stateA1, stateB2, stateA1.Compare(stateB2))
198
199    fmt.Printf("B2 %s vs C5 %s: %s\n",
200        stateB2, nodeC, stateB2.Compare(nodeC))
201
202    fmt.Printf("A3 %s vs C4 %s: %s\n",
203        stateA3, stateC4, stateA3.Compare(stateC4))
204
205    // Demonstrate causality detection
206    fmt.Println("\n=== Causality Detection ===\n")
207
208    // Create two concurrent events
209    node1 := NewVectorClock("Node1")
210    node2 := NewVectorClock("Node2")
211
212    // Node1 event
213    node1.Tick()
214    event1 := node1.Copy()
215
216    // Node2 event
217    node2.Tick()
218    event2 := node2.Copy()
219
220    // Node1 receives from Node2
221    node1.Update(node2)
222    event3 := node1.Copy()
223
224    fmt.Printf("Event 1: %s\n", event1)
225    fmt.Printf("Event 2: %s\n", event2)
226    fmt.Printf("Event 3: %s\n", event3)
227
228    fmt.Printf("\nEvent1 vs Event2: %s\n",
229        event1.Compare(event2))
230    fmt.Printf("Event1 vs Event3: %s\n",
231        event1.Compare(event3))
232    fmt.Printf("Event2 vs Event3: %s\n",
233        event2.Compare(event3))
234}

Explanation:

Vector Clock Properties:

  1. Causality Tracking

    • Each node maintains a clock
    • Local events: increment own counter
    • Send message: include vector clock
    • Receive message: merge clocks, then increment
  2. Happened-Before Relation

    • Event A happened before B if: A's clock ≤ B's clock componentwise, with at least one entry strictly smaller
    • Events are concurrent if: neither A ≤ B nor B ≤ A
  3. Properties

    • If A → B, then VectorClock(A) < VectorClock(B)
    • If VectorClock(A) < VectorClock(B), then A → B
    • If VectorClock(A) || VectorClock(B), no causal relationship

Algorithm:

On local event at node N:
  VectorClock[N]++

On send message from node N to node M:
  VectorClock[N]++
  Include VectorClock in message

On receive message at node M from node N:
  For each node i:
    VectorClock[i] = max(VectorClock[i], ReceivedClock[i])
  VectorClock[M]++

Comparison Rules:

  • A < B: All components A[i] ≤ B[i], and at least one A[i] < B[i]
  • A > B: All components A[i] ≥ B[i], and at least one A[i] > B[i]
  • A || B: Neither A < B nor A > B
  • Example: {A:1, B:1} < {A:2, B:1}, while {A:1} and {B:1} are concurrent

Real-World Use Cases:

  1. Distributed Databases

    • Detect conflicts in multi-master replication
    • Amazon Dynamo and Riak use vector clocks
    • Resolve concurrent writes
  2. Version Control

    • Git uses a similar concept in its commit history
    • Detect merge conflicts
    • Track file version lineage
  3. Message Ordering

    • Ensure causal delivery of messages
    • Prevent reading effects before causes
    • Implement causal consistency

Advantages:

  • Detect causality without synchronized clocks
  • Identify concurrent operations
  • No single point of failure
  • Works with network partitions

Limitations:

  • Space grows with number of nodes: O(N)
  • Cannot garbage collect old entries easily
  • Doesn't provide total ordering
  • Not suitable for thousands of nodes

Optimizations:

  • Dotted Version Vectors: Reduce size
  • Interval Tree Clocks: Compact representation
  • Pruning: Remove inactive nodes

Exercise 4: Distributed Lock Service

🎯 Learning Objectives:

  • Implement distributed locking mechanisms for coordination
  • Master lease-based locking and renewal strategies
  • Learn to handle network partitions and split-brain scenarios
  • Design lock acquisition with timeout and retry logic

🌍 Real-World Context:
Distributed locks are essential for coordinating access to shared resources across multiple nodes. Apache ZooKeeper and etcd provide distributed locking primitives used by systems like Kafka for leader election and by databases for coordinating schema migrations. Understanding distributed locks is crucial for building systems that need exclusive access to resources in a distributed environment, such as coordinating database migrations or managing access to limited resources.

⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Expert

Build a distributed lock service with lease-based locking, automatic renewal, and proper handling of network partitions. Your implementation should include lock acquisition with timeouts, automatic lease renewal to prevent deadlock, proper lock release mechanisms, and strategies for detecting and handling split-brain scenarios where multiple nodes believe they hold the same lock.

Requirements:

  1. Implement lease-based locking with TTL
  2. Add automatic lease renewal mechanism
  3. Handle network partitions gracefully
  4. Implement lock fencing for safety
  5. Add metrics for lock acquisition/release tracking
Solution with Explanation
  1// run
  2package main
  3
  4import (
  5	"context"
  6	"errors"
  7	"fmt"
  8	"sync"
  9	"time"
 10
 11	"github.com/google/uuid"
 12)
 13
 14// DistributedLockService provides distributed locking capabilities
 15type DistributedLockService struct {
 16	mu    sync.RWMutex
 17	locks map[string]*LockEntry
 18
 19	// Configuration
 20	defaultTTL time.Duration
 21	clockSkew  time.Duration
 22}
 23
 24// LockEntry represents a held lock
 25type LockEntry struct {
 26	LockID      string
 27	OwnerID     string
 28	ExpiresAt   time.Time
 29	Version     int64
 30	CreatedAt   time.Time
 31	LastRenewed time.Time
 32}
 33
 34// NewDistributedLockService creates a new lock service
 35func NewDistributedLockService(defaultTTL time.Duration) *DistributedLockService {
 36	return &DistributedLockService{
 37		locks:      make(map[string]*LockEntry),
 38		defaultTTL: defaultTTL,
 39		clockSkew:  100 * time.Millisecond, // Account for clock skew
 40	}
 41}
 42
 43// LockRequest represents a lock acquisition request
 44type LockRequest struct {
 45	ResourceID string
 46	OwnerID    string
 47	TTL        time.Duration // Optional, uses default if 0
 48	WaitTime   time.Duration // Max time to wait for lock
 49	RetryDelay time.Duration // Delay between retries
 50}
 51
 52// LockResult represents the result of a lock operation
 53type LockResult struct {
 54	LockID    string
 55	Acquired  bool
 56	ExpiresAt time.Time
 57	Version   int64
 58}
 59
 60// AcquireLock attempts to acquire a distributed lock
 61func (dls *DistributedLockService) AcquireLock(ctx context.Context, req LockRequest) (*LockResult, error) {
 62	if req.TTL == 0 {
 63		req.TTL = dls.defaultTTL
 64	}
 65	if req.RetryDelay == 0 {
 66		req.RetryDelay = 100 * time.Millisecond
 67	}
 68
 69	lockID := uuid.New().String()
 70	now := time.Now()
 71	timeout := now.Add(req.WaitTime)
 72
 73	for {
 74		dls.mu.Lock()
 75
 76		// Check if lock exists and is still valid
 77		if existing, exists := dls.locks[req.ResourceID]; exists {
 78			// Check if lock has expired
 79			if existing.ExpiresAt.Add(dls.clockSkew).After(time.Now()) {
 80				// Lock is still valid
 81				dls.mu.Unlock()
 82
 83				// Check if we should retry
 84				if time.Now().After(timeout) {
 85					return nil, errors.New("lock acquisition timeout")
 86				}
 87
 88				// Wait before retrying
 89				select {
 90				case <-time.After(req.RetryDelay):
 91					continue
 92				case <-ctx.Done():
 93					return nil, ctx.Err()
 94				}
 95			}
 96			// Lock has expired, we can acquire it
 97		}
 98
 99		// Acquire the lock
100		entry := &LockEntry{
101			LockID:      lockID,
102			OwnerID:     req.OwnerID,
103			ExpiresAt:   now.Add(req.TTL),
104			Version:     time.Now().UnixNano(),
105			CreatedAt:   now,
106			LastRenewed: now,
107		}
108
109		dls.locks[req.ResourceID] = entry
110		dls.mu.Unlock()
111
112		return &LockResult{
113			LockID:    lockID,
114			Acquired:  true,
115			ExpiresAt: entry.ExpiresAt,
116			Version:   entry.Version,
117		}, nil
118	}
119}
120
121// RenewLock extends the lease of an existing lock
122func (dls *DistributedLockService) RenewLock(resourceID, lockID string, ttl time.Duration) error {
123	dls.mu.Lock()
124	defer dls.mu.Unlock()
125
126	entry, exists := dls.locks[resourceID]
127	if !exists {
128		return errors.New("lock not found")
129	}
130
131	if entry.LockID != lockID {
132		return errors.New("lock not owned by this client")
133	}
134
135	// Check if lock has already expired
136	if time.Now().After(entry.ExpiresAt) {
137		return errors.New("lock has expired")
138	}
139
140	// Renew the lock
141	entry.ExpiresAt = time.Now().Add(ttl)
142	entry.Version++
143	entry.LastRenewed = time.Now()
144
145	return nil
146}
147
148// ReleaseLock releases a distributed lock
149func (dls *DistributedLockService) ReleaseLock(resourceID, lockID string) error {
150	dls.mu.Lock()
151	defer dls.mu.Unlock()
152
153	entry, exists := dls.locks[resourceID]
154	if !exists {
155		return errors.New("lock not found")
156	}
157
158	if entry.LockID != lockID {
159		return errors.New("lock not owned by this client")
160	}
161
162	delete(dls.locks, resourceID)
163	return nil
164}
165
166// IsLocked checks if a resource is currently locked
167func (dls *DistributedLockService) IsLocked(resourceID string) bool {
168	dls.mu.RLock()
169	defer dls.mu.RUnlock()
170
171	entry, exists := dls.locks[resourceID]
172	if !exists {
173		return false
174	}
175
176	// Check if lock has expired
177	return time.Now().Before(entry.ExpiresAt.Add(dls.clockSkew))
178}
179
180// GetLockInfo returns information about a lock
181func (dls *DistributedLockService) GetLockInfo(resourceID string) (*LockEntry, error) {
182	dls.mu.RLock()
183	defer dls.mu.RUnlock()
184
185	entry, exists := dls.locks[resourceID]
186	if !exists {
187		return nil, errors.New("lock not found")
188	}
189
190	// Return a copy to prevent external modification
191	return &LockEntry{
192		LockID:      entry.LockID,
193		OwnerID:     entry.OwnerID,
194		ExpiresAt:   entry.ExpiresAt,
195		Version:     entry.Version,
196		CreatedAt:   entry.CreatedAt,
197		LastRenewed: entry.LastRenewed,
198	}, nil
199}
200
201// CleanupExpiredLocks removes expired locks
202func (dls *DistributedLockService) CleanupExpiredLocks() {
203	dls.mu.Lock()
204	defer dls.mu.Unlock()
205
206	now := time.Now()
207	for resourceID, entry := range dls.locks {
208		if now.After(entry.ExpiresAt.Add(dls.clockSkew)) {
209			delete(dls.locks, resourceID)
210		}
211	}
212}
213
214// LockManager manages lock lifecycle with automatic renewal
215type LockManager struct {
216	service    *DistributedLockService
217	ownerID    string
218	locksHeld  map[string]*ManagedLock
219	mu         sync.RWMutex
220	stopChan   chan struct{}
221}
222
223// ManagedLock represents a managed lock with automatic renewal
224type ManagedLock struct {
225	ResourceID string
226	LockID     string
227	ExpiresAt  time.Time
228	cancelRenew context.CancelFunc
229}
230
231// NewLockManager creates a new lock manager
232func NewLockManager(service *DistributedLockService, ownerID string) *LockManager {
233	lm := &LockManager{
234		service:   service,
235		ownerID:   ownerID,
236		locksHeld: make(map[string]*ManagedLock),
237		stopChan:  make(chan struct{}),
238	}
239
240	// Start cleanup routine
241	go lm.cleanupRoutine()
242
243	return lm
244}
245
246// AcquireLock acquires a managed lock with automatic renewal
247func (lm *LockManager) AcquireLock(ctx context.Context, resourceID string, ttl time.Duration) (*ManagedLock, error) {
248	req := LockRequest{
249		ResourceID: resourceID,
250		OwnerID:    lm.ownerID,
251		TTL:        ttl,
252		WaitTime:   30 * time.Second,
253		RetryDelay: 100 * time.Millisecond,
254	}
255
256	result, err := lm.service.AcquireLock(ctx, req)
257	if err != nil {
258		return nil, err
259	}
260
261	if !result.Acquired {
262		return nil, errors.New("failed to acquire lock")
263	}
264
265	// Create managed lock with automatic renewal
266	renewCtx, cancelRenew := context.WithCancel(context.Background())
267	managedLock := &ManagedLock{
268		ResourceID: resourceID,
269		LockID:     result.LockID,
270		ExpiresAt:  result.ExpiresAt,
271		cancelRenew: cancelRenew,
272	}
273
274	// Start renewal routine
275	go lm.renewalRoutine(renewCtx, managedLock, ttl/3) // Renew at 1/3 of TTL
276
277	lm.mu.Lock()
278	lm.locksHeld[resourceID] = managedLock
279	lm.mu.Unlock()
280
281	return managedLock, nil
282}
283
284// ReleaseLock releases a managed lock
285func (lm *LockManager) ReleaseLock(managedLock *ManagedLock) error {
286	// Stop renewal
287	managedLock.cancelRenew()
288
289	// Release the lock
290	err := lm.service.ReleaseLock(managedLock.ResourceID, managedLock.LockID)
291
292	// Remove from held locks
293	lm.mu.Lock()
294	delete(lm.locksHeld, managedLock.ResourceID)
295	lm.mu.Unlock()
296
297	return err
298}
299
300// renewalRoutine automatically renews locks
301func (lm *LockManager) renewalRoutine(ctx context.Context, managedLock *ManagedLock, renewInterval time.Duration) {
302	ticker := time.NewTicker(renewInterval)
303	defer ticker.Stop()
304
305	for {
306		select {
307		case <-ctx.Done():
308			return
309		case <-ticker.C:
310			// Check if lock is still valid
311			info, err := lm.service.GetLockInfo(managedLock.ResourceID)
312			if err != nil || info.LockID != managedLock.LockID {
313				// Lock is no longer valid, stop renewal
314				return
315			}
316
317			// Renew the lock
318			err = lm.service.RenewLock(managedLock.ResourceID, managedLock.LockID, renewInterval*3)
319			if err != nil {
320				// Renewal failed, stop renewal
321				return
322			}
323
324			// Update expiration time
325			newInfo, _ := lm.service.GetLockInfo(managedLock.ResourceID)
326			if newInfo != nil {
327				managedLock.ExpiresAt = newInfo.ExpiresAt
328			}
329		}
330	}
331}
332
333// cleanupRoutine periodically cleans up expired locks
334func (lm *LockManager) cleanupRoutine() {
335	ticker := time.NewTicker(10 * time.Second)
336	defer ticker.Stop()
337
338	for {
339		select {
340		case <-lm.stopChan:
341			return
342		case <-ticker.C:
343			lm.service.CleanupExpiredLocks()
344		}
345	}
346}
347
348// Stop stops the lock manager
349func (lm *LockManager) Stop() {
350	close(lm.stopChan)
351
352	lm.mu.Lock()
353	defer lm.mu.Unlock()
354
355	// Release all held locks
356	for _, managedLock := range lm.locksHeld {
357		managedLock.cancelRenew()
358		lm.service.ReleaseLock(managedLock.ResourceID, managedLock.LockID)
359	}
360	lm.locksHeld = make(map[string]*ManagedLock)
361}
362
363// GetStats returns lock service statistics
364func (dls *DistributedLockService) GetStats() map[string]interface{} {
365	dls.mu.RLock()
366	defer dls.mu.RUnlock()
367
368	activeLocks := 0
369	expiredLocks := 0
370	now := time.Now()
371
372	for _, entry := range dls.locks {
373		if now.Before(entry.ExpiresAt) {
374			activeLocks++
375		} else {
376			expiredLocks++
377		}
378	}
379
380	return map[string]interface{}{
381		"total_locks":   len(dls.locks),
382		"active_locks":  activeLocks,
383		"expired_locks": expiredLocks,
384		"default_ttl":   dls.defaultTTL,
385	}
386}
387
388func main() {
389	// Create distributed lock service
390	lockService := NewDistributedLockService(5 * time.Second)
391
392	// Create lock manager for a client
393	lockManager := NewLockManager(lockService, "client-123")
394	defer lockManager.Stop()
395
396	fmt.Println("=== Distributed Lock Service Demo ===\n")
397
398	// Scenario 1: Basic lock acquisition and release
399	fmt.Println("Scenario 1: Basic Lock Operations")
400	ctx := context.Background()
401
402	lock1, err := lockManager.AcquireLock(ctx, "resource-1", 3*time.Second)
403	if err != nil {
404		panic(err)
405	}
406	fmt.Printf("Acquired lock for resource-1: %s (expires %s)\n",
407		lock1.LockID, lock1.ExpiresAt.Format("15:04:05"))
408
409	// Try to acquire the same lock from another client
410	lockService2 := lockService // a second client must go through the same lock service
411	req := LockRequest{
412		ResourceID: "resource-1",
413		OwnerID:    "client-456",
414		TTL:        3 * time.Second,
415		WaitTime:   2 * time.Second,
416	}
417
418	result, err := lockService2.AcquireLock(ctx, req)
419	if err != nil {
420		fmt.Printf("Client-456 failed to acquire lock: %v\n", err)
421	} else if !result.Acquired {
422		fmt.Println("Client-456: Lock not available")
423	}
424
425	// Release the lock
426	err = lockManager.ReleaseLock(lock1)
427	if err != nil {
428		panic(err)
429	}
430	fmt.Println("Released lock for resource-1")
431
432	// Scenario 2: Lock expiration and reclamation
433	fmt.Println("\nScenario 2: Lock Expiration")
434	expiringLock, err := lockService.AcquireLock(ctx, LockRequest{ResourceID: "resource-2", OwnerID: "client-123", TTL: 1 * time.Second}) // no auto-renewal, so the lease can expire
435	if err != nil {
436		panic(err)
437	}
438	fmt.Printf("Acquired expiring lock: %s\n", expiringLock.LockID)
439
440	// Wait for expiration
441	time.Sleep(2 * time.Second)
442
443	// Now another client should be able to acquire it
444	result, err = lockService2.AcquireLock(ctx, LockRequest{ResourceID: "resource-2", OwnerID: "client-456", TTL: 3 * time.Second})
445	if err != nil {
446		panic(err)
447	}
448	if result.Acquired {
449		fmt.Printf("Client-456 acquired expired lock: %s\n", result.LockID)
450	}
451
452	// Scenario 3: Lock renewal
453	fmt.Println("\nScenario 3: Automatic Lock Renewal")
454	renewableLock, err := lockManager.AcquireLock(ctx, "resource-3", 2*time.Second)
455	if err != nil {
456		panic(err)
457	}
458	fmt.Printf("Acquired renewable lock: %s\n", renewableLock.LockID)
459
460	// Let it renew automatically
461	time.Sleep(4 * time.Second)
462
463	// Check if lock is still held
464	isLocked := lockService.IsLocked("resource-3")
465	fmt.Printf("Lock still held after 4 seconds: %v\n", isLocked)
466
467	// Print final statistics
468	fmt.Println("\n=== Lock Service Statistics ===")
469	stats := lockService.GetStats()
470	for k, v := range stats {
471		fmt.Printf("%s: %v\n", k, v)
472	}
473}

Explanation:

Distributed Lock Service Design:

  1. Lease-Based Locking: Locks have TTL to prevent deadlocks. If a client crashes, the lock automatically expires.

  2. Automatic Renewal: LockManager automatically renews locks before expiration to prevent accidental loss.

  3. Clock Skew Handling: Accounts for network delays and clock differences between nodes.

  4. Lock Verification: Clients must provide the lock ID to release or renew, preventing accidental releases.

  5. Cleanup Mechanism: Periodic cleanup removes expired locks to prevent memory leaks.

Key Components:

  1. DistributedLockService: Core lock management with acquire, renew, release operations.

  2. LockManager: Higher-level abstraction with automatic renewal and lifecycle management.

  3. ManagedLock: Represents a lock with automatic renewal capabilities.

Safety Features:

  1. Fencing Tokens: Version numbers let downstream systems reject writes from stale lock holders, so operations are applied in the correct order (see the sketch after this list).

  2. Expiration Handling: Automatic cleanup prevents permanent lock accumulation.

  3. Retry Logic: Lock acquisition retries with a configurable delay; exponential backoff would be a natural extension.

  4. Context Support: Proper cancellation and timeout handling.
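
To make the fencing-token idea concrete, here is a minimal, self-contained sketch of a downstream resource that rejects writes carrying a stale token. It is illustrative only: the FencedStore type and its Write method are assumptions for this example, not part of the lock service above, which would pass its lease Version as the token.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// FencedStore is a hypothetical downstream resource that accepts a write
// only if it carries a fencing token at least as new as the highest token
// it has already seen. This is how lease version numbers protect against
// a client that lost its lock but does not know it yet.
type FencedStore struct {
	mu           sync.Mutex
	highestToken int64
	value        string
}

func (s *FencedStore) Write(token int64, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if token < s.highestToken {
		return errors.New("stale fencing token: write rejected")
	}
	s.highestToken = token
	s.value = value
	return nil
}

func main() {
	store := &FencedStore{}

	// Client A wrote with token 100, then stalled (long GC pause or partition)
	// and its lease expired. Client B then acquired the lock with token 101.
	fmt.Println("A writes with token 100:", store.Write(100, "from client A"))
	fmt.Println("B writes with token 101:", store.Write(101, "from client B"))

	// Client A wakes up and retries with its old token: the store rejects it.
	fmt.Println("A retries with token 100:", store.Write(100, "late write from A"))
}

The key design point is that the resource itself enforces ordering, so even a client that lost its lease without noticing cannot clobber newer writes.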

Real-World Considerations:

  1. Split-Brain Mitigation: Clock skew tolerance and lease expiration shrink the window in which two clients can believe they hold the same lock; a production service would also need consensus-backed storage rather than a single in-memory map.

  2. Performance: In-memory storage with efficient locking using sync.RWMutex.

  3. Monitoring: Built-in metrics for lock acquisition, expiration, and renewal tracking.

  4. Extensibility: Can be extended to use external storage for persistence; a possible interface is sketched below.
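
As a rough illustration of that extension point, the sketch below shows one possible shape for a persistence interface the in-memory service could be refactored against. The LockStore name, its methods, and the compare-and-set semantics are assumptions for this example rather than part of the tutorial's implementation; a real etcd- or Redis-backed version would map these operations onto that store's atomic primitives.

package lockstore

import "time"

// LockRecord mirrors the fields a persistent backend would need to keep
// for each lock. The names here are illustrative, not the exact types used
// by the in-memory service above.
type LockRecord struct {
	ResourceID string
	LockID     string
	OwnerID    string
	ExpiresAt  time.Time
	Version    int64
}

// LockStore is a hypothetical persistence interface. The important property
// is atomic compare-and-set behavior, so acquire, renew, and release remain
// safe when many processes talk to the same backend.
type LockStore interface {
	// TryPut stores rec only if no live record exists for rec.ResourceID
	// (or the existing one has expired). It returns false if the lock is held.
	TryPut(rec LockRecord) (bool, error)

	// CompareAndUpdate replaces the stored record only if its LockID and
	// Version match expected, which is what makes renewal and release safe.
	CompareAndUpdate(expected, updated LockRecord) (bool, error)

	// Get returns the current record for a resource, if any.
	Get(resourceID string) (LockRecord, bool, error)

	// Delete removes the record if its LockID matches.
	Delete(resourceID, lockID string) error
}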

Use Cases:

  • Leader election in distributed systems
  • Coordinating database schema migrations
  • Managing access to limited resources
  • Preventing concurrent execution of critical operations

Exercise 5: Consistent Hashing Ring

🎯 Learning Objectives:

  • Implement consistent hashing for load distribution
  • Master virtual nodes for improved key distribution
  • Learn to handle node addition and removal with minimal data movement
  • Design ring-based data partitioning systems

🌍 Real-World Context:
Consistent hashing is fundamental to distributed systems like Amazon DynamoDB and Cassandra for data partitioning. Content delivery networks use consistent hashing to cache content across edge servers efficiently. When you watch Netflix, consistent hashing ensures your request hits the same cache server that has the content you need, minimizing cache misses and improving performance.

⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Expert

Build a consistent hashing ring implementation that distributes keys across nodes with minimal remapping when nodes are added or removed. Your implementation should include virtual nodes for better load distribution, support for weighted nodes, efficient key lookup operations, and mechanisms for handling node failures and additions gracefully.

Requirements:

  1. Implement the core consistent hashing ring algorithm
  2. Add virtual nodes for improved distribution
  3. Support weighted nodes
  4. Handle node addition/removal with minimal key movement
  5. Include load balancing metrics and statistics
Solution with Explanation
  1// run
  2package main
  3
  4import (
  5	"crypto/sha1"
  6	"fmt"
  7	"sort"
  8	"strconv"
  9	"sync"
 10)
 11
 12// Node represents a server in the consistent hash ring
 13type Node struct {
 14	ID       string
 15	Weight   int // Higher weight = more virtual nodes
 16	ActualID string // For virtual nodes, points to actual node
 17}
 18
 19// ConsistentHashRing implements consistent hashing
 20type ConsistentHashRing struct {
 21	nodes      map[uint32]*Node // Hash -> Node mapping
 22	sortedKeys []uint32         // Sorted hash keys for binary search
 23	nodeMap    map[string]*Node // Node ID -> Node mapping
 24	mu         sync.RWMutex
 25}
 26
 27// NewConsistentHashRing creates a new consistent hash ring
 28func NewConsistentHashRing() *ConsistentHashRing {
 29	return &ConsistentHashRing{
 30		nodes:   make(map[uint32]*Node),
 31		nodeMap: make(map[string]*Node),
 32	}
 33}
 34
 35// AddNode adds a node to the hash ring
 36func (chr *ConsistentHashRing) AddNode(nodeID string, weight int) {
 37	chr.mu.Lock()
 38	defer chr.mu.Unlock()
 39
 40	// Remove existing node if present
 41	chr.removeNodeUnsafe(nodeID)
 42
 43	node := &Node{
 44		ID:       nodeID,
 45		Weight:   weight,
 46		ActualID: nodeID,
 47	}
 48
 49	// Add virtual nodes based on weight
 50	virtualCount := weight
 51	if virtualCount < 1 {
 52		virtualCount = 1
 53	}
 54
 55	for i := 0; i < virtualCount; i++ {
 56		virtualNodeID := fmt.Sprintf("%s#%d", nodeID, i)
 57		hash := chr.hashKey(virtualNodeID)
 58
 59		virtualNode := &Node{
 60			ID:       virtualNodeID,
 61			Weight:   weight,
 62			ActualID: nodeID,
 63		}
 64
 65		chr.nodes[hash] = virtualNode
 66		chr.sortedKeys = append(chr.sortedKeys, hash)
 67	}
 68
 69	chr.nodeMap[nodeID] = node
 70	chr.sortKeysUnsafe()
 71}
 72
 73// RemoveNode removes a node from the hash ring
 74func (chr *ConsistentHashRing) RemoveNode(nodeID string) {
 75	chr.mu.Lock()
 76	defer chr.mu.Unlock()
 77	chr.removeNodeUnsafe(nodeID)
 78}
 79
 80// removeNodeUnsafe removes a node without locking
 81func (chr *ConsistentHashRing) removeNodeUnsafe(nodeID string) {
 82	node, exists := chr.nodeMap[nodeID]
 83	if !exists {
 84		return
 85	}
 86
 87	// Remove all virtual nodes for this actual node
 88	virtualCount := node.Weight
 89	if virtualCount < 1 {
 90		virtualCount = 1
 91	}
 92
 93	for i := 0; i < virtualCount; i++ {
 94		virtualNodeID := fmt.Sprintf("%s#%d", nodeID, i)
 95		hash := chr.hashKey(virtualNodeID)
 96		delete(chr.nodes, hash)
 97
 98		// Remove from sorted keys
 99		for j, key := range chr.sortedKeys {
100			if key == hash {
101				chr.sortedKeys = append(chr.sortedKeys[:j], chr.sortedKeys[j+1:]...)
102				break
103			}
104		}
105	}
106
107	delete(chr.nodeMap, nodeID)
108}
109
110// GetNode returns the node responsible for the given key
111func (chr *ConsistentHashRing) GetNode(key string) *Node {
112	chr.mu.RLock()
113	defer chr.mu.RUnlock()
114
115	if len(chr.nodes) == 0 {
116		return nil
117	}
118
119	hash := chr.hashKey(key)
120
121	// Find the first node with hash >= key hash
122	idx := chr.searchKey(hash)
123
124	// If no node found, wrap around to the first node
125	if idx == len(chr.sortedKeys) {
126		idx = 0
127	}
128
129	virtualNode := chr.nodes[chr.sortedKeys[idx]]
130
131	// Return the actual node
132	return chr.nodeMap[virtualNode.ActualID]
133}
134
135// GetNodes returns the N nodes responsible for the given key
136func (chr *ConsistentHashRing) GetNodes(key string, count int) []*Node {
137	chr.mu.RLock()
138	defer chr.mu.RUnlock()
139
140	if len(chr.nodes) == 0 {
141		return nil
142	}
143
144	if count <= 0 {
145		return nil
146	}
147
148	hash := chr.hashKey(key)
149	idx := chr.searchKey(hash)
150
151	if idx == len(chr.sortedKeys) {
152		idx = 0
153	}
154
155	result := make([]*Node, 0, count)
156	seen := make(map[string]bool)
157
158	// Collect N unique actual nodes
159	for len(result) < count && len(seen) < len(chr.nodeMap) {
160		if idx >= len(chr.sortedKeys) {
161			idx = 0
162		}
163
164		virtualNode := chr.nodes[chr.sortedKeys[idx]]
165		actualNode := chr.nodeMap[virtualNode.ActualID]
166
167		if !seen[actualNode.ID] {
168			result = append(result, actualNode)
169			seen[actualNode.ID] = true
170		}
171
172		idx++
173	}
174
175	return result
176}
177
178// hashKey computes SHA1 hash of a key
179func (chr *ConsistentHashRing) hashKey(key string) uint32 {
180	hash := sha1.Sum([]byte(key))
181
182	// Use first 4 bytes of SHA1 hash
183	return uint32(hash[0])<<24 | uint32(hash[1])<<16 | uint32(hash[2])<<8 | uint32(hash[3])
184}
185
186// sortKeysUnsafe sorts the hash keys
187func (chr *ConsistentHashRing) sortKeysUnsafe() {
188	sort.Slice(chr.sortedKeys, func(i, j int) bool {
189		return chr.sortedKeys[i] < chr.sortedKeys[j]
190	})
191}
192
193// searchKey finds the index of the first key >= target
194func (chr *ConsistentHashRing) searchKey(target uint32) int {
195	return sort.Search(len(chr.sortedKeys), func(i int) bool {
196		return chr.sortedKeys[i] >= target
197	})
198}
199
200// AllNodes returns all nodes in the ring (renamed to avoid clashing with GetNodes above)
201func (chr *ConsistentHashRing) AllNodes() []*Node {
202	chr.mu.RLock()
203	defer chr.mu.RUnlock()
204
205	result := make([]*Node, 0, len(chr.nodeMap))
206	for _, node := range chr.nodeMap {
207		result = append(result, node)
208	}
209
210	return result
211}
212
213// GetNodeCount returns the number of nodes in the ring
214func (chr *ConsistentHashRing) GetNodeCount() int {
215	chr.mu.RLock()
216	defer chr.mu.RUnlock()
217	return len(chr.nodeMap)
218}
219
220// GetDistribution returns the distribution of keys across nodes
221func (chr *ConsistentHashRing) GetDistribution(sampleKeys []string) map[string]int {
222	// Note: GetNode takes the read lock itself, so no additional locking here
223	// (re-acquiring RLock recursively could deadlock if a writer is waiting).
224
225	distribution := make(map[string]int)
226
227	for _, key := range sampleKeys {
228		node := chr.GetNode(key)
229		if node != nil {
230			distribution[node.ID]++
231		}
232	}
233
234	return distribution
235}
236
237// Stats returns statistics about the hash ring
238func (chr *ConsistentHashRing) Stats() map[string]interface{} {
239	chr.mu.RLock()
240	defer chr.mu.RUnlock()
241
242	weightSum := 0
243	for _, node := range chr.nodeMap {
244		weightSum += node.Weight
245	}
246
247	return map[string]interface{}{
248		"actual_nodes":    len(chr.nodeMap),
249		"virtual_nodes":   len(chr.nodes),
250		"total_weight":    weightSum,
251		"average_weight":  float64(weightSum) / float64(len(chr.nodeMap)),
252	}
253}
254
255// LoadBalancer demonstrates using consistent hashing for load balancing
256type LoadBalancer struct {
257	ring *ConsistentHashRing
258}
259
260// NewLoadBalancer creates a new load balancer
261func NewLoadBalancer() *LoadBalancer {
262	return &LoadBalancer{
263		ring: NewConsistentHashRing(),
264	}
265}
266
267// AddServer adds a server to the load balancer
268func (lb *LoadBalancer) AddServer(serverID string, capacity int) {
269	lb.ring.AddNode(serverID, capacity)
270}
271
272// RemoveServer removes a server from the load balancer
273func (lb *LoadBalancer) RemoveServer(serverID string) {
274	lb.ring.RemoveNode(serverID)
275}
276
277// RouteRequest routes a request to the appropriate server
278func (lb *LoadBalancer) RouteRequest(requestID string) string {
279	node := lb.ring.GetNode(requestID)
280	if node == nil {
281		return ""
282	}
283	return node.ID
284}
285
286// RouteForReplication routes a request to multiple servers for replication
287func (lb *LoadBalancer) RouteForReplication(requestID string, replicationFactor int) []string {
288	nodes := lb.ring.GetNodes(requestID, replicationFactor)
289	serverIDs := make([]string, len(nodes))
290	for i, node := range nodes {
291		serverIDs[i] = node.ID
292	}
293	return serverIDs
294}
295
296func main() {
297	fmt.Println("=== Consistent Hashing Ring Demo ===\n")
298
299	// Create a consistent hash ring
300	ring := NewConsistentHashRing()
301
302	// Add nodes with different weights
303	nodes := []struct {
304		id     string
305		weight int
306	}{
307		{"server-a", 3}, // Higher capacity
308		{"server-b", 2}, // Medium capacity
309		{"server-c", 1}, // Lower capacity
310		{"server-d", 2},
311		{"server-e", 1},
312	}
313
314	fmt.Println("Adding nodes to the ring:")
315	for _, node := range nodes {
316		ring.AddNode(node.id, node.weight)
317		fmt.Printf("  Added %s (weight %d)\n", node.id, node.weight)
318	}
319
320	// Print ring statistics
321	stats := ring.Stats()
322	fmt.Printf("\nRing Statistics: %+v\n\n", stats)
323
324	// Test key distribution
325	fmt.Println("Testing key distribution:")
326	testKeys := []string{
327		"user-123", "user-456", "user-789", "user-abc", "user-def",
328		"order-1", "order-2", "order-3", "order-4", "order-5",
329		"product-a", "product-b", "product-c", "product-d", "product-e",
330		"session-x", "session-y", "session-z", "session-1", "session-2",
331	}
332
333	distribution := ring.GetDistribution(testKeys)
334	for serverID, count := range distribution {
335		fmt.Printf("  %s: %d keys (%.1f%%)\n", serverID, count, float64(count)/float64(len(testKeys))*100)
336	}
337
338	// Test replication
339	fmt.Println("\nTesting replication:")
340	for _, key := range testKeys[:5] {
341		nodes := ring.GetNodes(key, 3)
342		nodeIDs := make([]string, len(nodes))
343		for i, node := range nodes {
344			nodeIDs[i] = node.ID
345		}
346		fmt.Printf("  %s -> %v\n", key, nodeIDs)
347	}
348
349	// Test node removal and key remapping
350	fmt.Println("\nTesting node removal:")
351	removedKey := testKeys[0]
352	originalNode := ring.GetNode(removedKey)
353	fmt.Printf("Key '%s' initially maps to: %s\n", removedKey, originalNode.ID)
354
355	// Remove a node
356	ring.RemoveNode("server-c")
357	fmt.Println("Removed server-c")
358
359	// Check where the key maps now
360	newNode := ring.GetNode(removedKey)
361	fmt.Printf("Key '%s' now maps to: %s\n", removedKey, newNode.ID)
362
363	// Measuring key movement precisely would require recording each key's
364	// node before removal and comparing; here we simply confirm every key
365	// still resolves to one of the remaining nodes.
366	for _, key := range testKeys {
367		_ = ring.GetNode(key)
368	}
369
370	// Demonstrate load balancer usage
371	fmt.Println("\n=== Load Balancer Demo ===")
372	lb := NewLoadBalancer()
373
374	// Add servers with different capacities
375	lb.AddServer("web-server-1", 3)
376	lb.AddServer("web-server-2", 2)
377	lb.AddServer("web-server-3", 1)
378
379	// Route requests
380	requests := []string{
381		"req-001", "req-002", "req-003", "req-004", "req-005",
382		"req-006", "req-007", "req-008", "req-009", "req-010",
383	}
384
385	fmt.Println("Routing requests:")
386	for _, req := range requests {
387		server := lb.RouteRequest(req)
388		fmt.Printf("  %s -> %s\n", req, server)
389	}
390
391	// Show replication routing
392	fmt.Println("\nReplication routing:")
393	for _, req := range requests[:3] {
394		servers := lb.RouteForReplication(req, 2)
395		fmt.Printf("  %s -> %v\n", req, servers)
396	}
397
398	// Test load balancing with many requests
399	fmt.Println("\nLoad distribution with 1000 requests:")
400	requestCount := 1000
401	requestDist := make(map[string]int)
402
403	for i := 0; i < requestCount; i++ {
404		reqID := "req-" + strconv.Itoa(i)
405		server := lb.RouteRequest(reqID)
406		requestDist[server]++
407	}
408
409	for server, count := range requestDist {
410		percentage := float64(count) / float64(requestCount) * 100
411		fmt.Printf("  %s: %d requests (%.1f%%)\n", server, count, percentage)
412	}
413
414	// Final statistics
415	fmt.Println("\n=== Final Ring Statistics ===")
416	finalStats := ring.Stats()
417	for k, v := range finalStats {
418		fmt.Printf("%s: %v\n", k, v)
419	}
420}

Explanation:

Consistent Hashing Algorithm:

  1. Ring Structure: Nodes and keys are placed on a virtual ring using hash values.

  2. Virtual Nodes: Each physical node has multiple virtual nodes for better distribution.

  3. Key Lookup: Find the first node clockwise from the key's hash position.

  4. Minimal Remapping: When nodes are added/removed, only keys in affected ranges are remapped.

Key Components:

  1. ConsistentHashRing: Core implementation with ring management and key lookup.

  2. Node Structure: Represents both physical and virtual nodes with weights.

  3. LoadBalancer: Higher-level abstraction demonstrating practical usage.

Algorithm Details:

  1. Hash Function: Uses SHA1 for uniform distribution of keys and nodes.

  2. Binary Search: Efficient key lookup with O(log N) complexity.

  3. Weight Distribution: Higher weight nodes get more virtual nodes, proportionally more keys.

  4. Replication Support: Get multiple nodes for the same key for data redundancy.

Load Balancing Benefits:

  1. Uniform Distribution: Virtual nodes ensure even key distribution.

  2. Scalability: Adding/removing nodes affects minimal key ranges.

  3. Fault Tolerance: Node failures only affect keys mapped to that node.

  4. Weighted Routing: Higher capacity servers handle proportionally more requests.

Real-World Applications:

  1. Distributed Caches: Memcached, Redis clusters for data partitioning.

  2. Databases: Cassandra, DynamoDB for data distribution across nodes.

  3. CDNs: Content distribution across edge servers.

  4. Load Balancers: Request routing to backend servers.

Advantages:

  • Scalability: Easy to add/remove nodes
  • Performance: O(log N) lookup time
  • Flexibility: Supports weighted nodes
  • Reliability: Minimal disruption during topology changes

Use Cases:

  • Session affinity in web applications
  • Cache key distribution
  • Database sharding
  • Content distribution networks

Summary

In this comprehensive tutorial, you've mastered the essential patterns and techniques for building production-ready distributed systems in Go. From understanding the fundamental challenges posed by the CAP theorem to implementing sophisticated patterns like circuit breakers, consensus algorithms, and distributed locks, you now have the tools to design resilient systems that operate at scale.

Key Concepts Mastered:

  1. CAP Theorem and Trade-offs: Understanding that consistency, availability, and partition tolerance cannot all be achieved simultaneously, and making informed architectural decisions based on business requirements.

  2. Service Discovery: Implementing dynamic service registration and health checking using patterns that enable microservices to find and communicate with each other reliably.

  3. Circuit Breaker Pattern: Building fault-tolerant systems that fail fast and gracefully degrade when dependencies are unhealthy, preventing cascade failures across distributed services.

  4. Raft Consensus: Implementing strong consistency through distributed consensus, enabling leader election and replicated state machines that survive node failures.

  5. Distributed Locks: Coordinating exclusive access to resources across multiple processes using lock leasing, timeouts, and fencing tokens to prevent split-brain scenarios.

  6. Consistent Hashing: Distributing load evenly across nodes while minimizing key remapping during topology changes, essential for caching systems and databases.

Production-Ready Patterns:

  • Always implement timeouts and retry logic with exponential backoff (a minimal sketch follows this list)
  • Use idempotency keys to make operations safe to retry
  • Apply circuit breakers to protect against cascade failures
  • Design for eventual consistency when strong consistency isn't required
  • Implement comprehensive observability with logs, metrics, and traces
  • Deploy changes gradually using feature flags and canary deployments
  • Plan for disaster recovery with multi-region failover strategies
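
As a concrete reference for the first two bullets, here is a minimal sketch of a retry helper with per-attempt timeouts and exponential backoff plus jitter. The helper name, the 2-second per-attempt deadline, and the jitter scheme are illustrative assumptions; it also presumes the wrapped operation is idempotent (for example, it carries an idempotency key), since otherwise retrying is not safe.

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// retryWithBackoff bounds each attempt with its own deadline and backs off
// exponentially (with jitter) between failures, giving up after the given
// number of attempts or when the parent context is cancelled.
func retryWithBackoff(ctx context.Context, attempts int, base time.Duration, op func(context.Context) error) error {
	var err error
	for i := 0; i < attempts; i++ {
		// Bound each attempt so a hung dependency cannot stall the caller.
		attemptCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
		err = op(attemptCtx)
		cancel()
		if err == nil {
			return nil
		}

		// Exponential backoff with jitter: base << i, plus up to base of noise.
		backoff := (base << uint(i)) + time.Duration(rand.Int63n(int64(base)))
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("all %d attempts failed: %w", attempts, err)
}

func main() {
	calls := 0
	err := retryWithBackoff(context.Background(), 5, 100*time.Millisecond, func(ctx context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("transient failure") // simulate a flaky dependency
		}
		return nil
	})
	fmt.Printf("calls=%d err=%v\n", calls, err)
}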

When to Apply These Patterns:

Distributed systems patterns solve real problems but add significant complexity. Use them when:

  • Scale requirements exceed single-machine capabilities
  • High availability is critical to your business
  • You need to coordinate state across multiple services
  • Network partitions and failures are expected

However, remember that many systems don't need these patterns. Start with a well-designed monolith and adopt distributed systems patterns only when the business value justifies the added operational complexity.

Next Steps:

To deepen your expertise:

  • Practice the exercises to solidify your understanding
  • Study the referenced papers and books for theoretical foundations
  • Experiment with production tools like etcd, Consul, and Istio
  • Apply chaos engineering to test your systems' resilience
  • Learn from production incidents and document lessons learned

Distributed systems engineering is a journey of continuous learning. The patterns you've learned here are foundational, but each production system will teach you new lessons about scale, reliability, and the inevitable surprises that come with coordinating multiple computers across unreliable networks.

Build systems that embrace failure, monitor everything, and always ask: "Does the complexity justify the benefits?" Your future self - and your operations team - will thank you.