Distributed Lock

Exercise: Distributed Lock

Difficulty - Advanced

Learning Objectives

  • Implement distributed locking with Redis
  • Handle lock acquisition and release
  • Implement lock expiration and renewal
  • Handle network failures and split-brain scenarios
  • Use Lua scripts for atomic operations
  • Implement lock reentrant behavior

Problem Statement

Create a distributed lock implementation using Redis that handles timeouts, automatic renewal, and failure scenarios.

Core Components

    package distlock

    import (
        "context"
        "time"
    )

    type Lock struct {
        key      string
        value    string
        ttl      time.Duration
        client   RedisClient
        stopChan chan struct{}
    }

    type RedisClient interface {
        SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error)
        Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error)
        Del(ctx context.Context, keys ...string) error
    }

    func New(client RedisClient, key string, ttl time.Duration) *Lock
    func (l *Lock) Lock(ctx context.Context) error
    func (l *Lock) Unlock(ctx context.Context) error
    func (l *Lock) TryLock(ctx context.Context) (bool, error)
    func (l *Lock) Refresh(ctx context.Context) error
    func (l *Lock) StartAutoRenewal(ctx context.Context, interval time.Duration)

Solution

Algorithm Overview

The distributed lock implementation uses Redis as a coordination service with the following key principles:

1. Lock Acquisition Strategy:

  • Uses Redis SETNX for atomic lock acquisition
  • Generates unique token per lock instance to prevent accidental unlocking
  • Sets TTL to prevent deadlocks from crashed processes
  • Retries under contention; the implementation below polls at a fixed interval, and exponential backoff is a common refinement (see the sketch after this list)
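
A minimal backoff sketch using the TryLock method defined in the implementation below; lockWithBackoff is a hypothetical helper, and the 50ms base and 2s cap are illustrative choices, not part of the core API:

    package distlock

    import (
        "context"
        "math/rand"
        "time"
    )

    // lockWithBackoff retries TryLock with exponential backoff and jitter
    // instead of Lock's fixed polling interval.
    func lockWithBackoff(ctx context.Context, l *Lock) error {
        backoff := 50 * time.Millisecond
        const maxBackoff = 2 * time.Second
        for {
            acquired, err := l.TryLock(ctx)
            if err != nil {
                return err
            }
            if acquired {
                return nil
            }
            // Jitter spreads retries so competing clients do not wake in lockstep.
            sleep := backoff/2 + time.Duration(rand.Int63n(int64(backoff/2)))
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(sleep):
            }
            if backoff < maxBackoff {
                backoff *= 2
            }
        }
    }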

2. Atomic Operations with Lua Scripts:

  • Unlock and refresh operations must be atomic
  • Lua scripts execute atomically in Redis, preventing race conditions (a counterexample follows this list)
  • Scripts verify ownership before modifying lock state
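
To see why, consider a naive unlock done as a separate GET and DEL: between the two commands the lock can expire and be re-acquired by another client, whose lock the DEL then destroys. A counterexample sketch; the get callback is a hypothetical GET round trip, passed in because the RedisClient interface below has no Get method:

    // UNSAFE: check-then-act across two round trips is not atomic.
    func unsafeUnlock(ctx context.Context, l *Lock,
        get func(ctx context.Context, key string) (string, error)) error {
        current, err := get(ctx, l.key) // a plain GET round trip
        if err != nil {
            return err
        }
        if current != l.value {
            return ErrLockNotHeld
        }
        // Race window: the lock may expire and be re-acquired by another
        // client right here, and the DEL below would release *their* lock.
        return l.client.Del(ctx, l.key)
    }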

3. Auto-Renewal Mechanism:

  • Background goroutine extends lock TTL periodically (interval choice sketched after this list)
  • Prevents expiration during long-running critical sections
  • Stops renewal automatically when lock is released
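
A common convention, and an assumption here rather than anything the implementation enforces, is to renew at a third of the TTL so that a couple of renewal attempts can fail before the lock actually expires:

    // Renew at ttl/3: two consecutive renewals can fail or stall before
    // the lock expires. The ratio is a rule of thumb, not part of the API.
    lock.StartAutoRenewal(ctx, ttl/3)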

4. Safety Guarantees:

  • Mutual exclusion: Only one client can hold lock at a time
  • Deadlock freedom: Locks auto-expire if holder crashes
  • Ownership tracking: Unique values prevent unauthorized unlock

Time Complexity Analysis

Operation      Time Complexity      Explanation
Lock           O(1) uncontended     Single Redis SETNX command
Lock           O(n) worst case      n polling retries until acquisition
Unlock         O(1)                 Single Lua script execution
Refresh        O(1)                 Single Lua script execution
Auto-renewal   O(1) per tick        Periodic refresh operation

Space Complexity

  • O(1) per lock instance
  • Each lock stores: key, value, metadata
  • Memory footprint: ~100-200 bytes per lock in Redis

Implementation

    package distlock

    import (
        "context"
        "crypto/rand"
        "encoding/hex"
        "errors"
        "fmt"
        "time"
    )

    var (
        ErrLockNotAcquired = errors.New("lock not acquired")
        ErrLockNotHeld     = errors.New("lock not held")
        ErrLockExpired     = errors.New("lock expired")
    )

    // Lua script for atomic unlock: delete the key only if we still own it.
    const unlockScript = `
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    `

    // Lua script for atomic refresh: extend the TTL only if we still own the key.
    const refreshScript = `
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("pexpire", KEYS[1], ARGV[2])
    else
        return 0
    end
    `

    type RedisClient interface {
        SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error)
        Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error)
        Del(ctx context.Context, keys ...string) error
    }

    type Lock struct {
        key      string
        value    string
        ttl      time.Duration
        client   RedisClient
        stopChan chan struct{}
        stopped  bool
    }

    func New(client RedisClient, key string, ttl time.Duration) *Lock {
        return &Lock{
            key:      key,
            value:    generateUniqueValue(),
            ttl:      ttl,
            client:   client,
            stopChan: make(chan struct{}),
        }
    }

    // generateUniqueValue returns a random 128-bit token identifying this
    // lock instance, so only the owner can unlock or refresh.
    func generateUniqueValue() string {
        b := make([]byte, 16)
        if _, err := rand.Read(b); err != nil {
            panic(err) // crypto/rand failure is not recoverable
        }
        return hex.EncodeToString(b)
    }

    // Lock blocks until the lock is acquired or the context is cancelled,
    // trying immediately and then polling at a fixed 100ms interval.
    func (l *Lock) Lock(ctx context.Context) error {
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()

        for {
            acquired, err := l.TryLock(ctx)
            if err != nil {
                return err
            }
            if acquired {
                return nil
            }
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-ticker.C:
            }
        }
    }

    // TryLock makes a single non-blocking acquisition attempt via SETNX.
    func (l *Lock) TryLock(ctx context.Context) (bool, error) {
        acquired, err := l.client.SetNX(ctx, l.key, l.value, l.ttl)
        if err != nil {
            return false, fmt.Errorf("failed to acquire lock: %w", err)
        }
        return acquired, nil
    }

    // Unlock releases the lock if this instance still owns it and stops any
    // running auto-renewal.
    func (l *Lock) Unlock(ctx context.Context) error {
        result, err := l.client.Eval(ctx, unlockScript, []string{l.key}, l.value)
        if err != nil {
            return fmt.Errorf("failed to unlock: %w", err)
        }

        // Stop auto-renewal if running (guard with a mutex if Unlock can be
        // called concurrently).
        if !l.stopped {
            close(l.stopChan)
            l.stopped = true
        }

        deleted, ok := result.(int64)
        if !ok || deleted == 0 {
            return ErrLockNotHeld
        }

        return nil
    }

    // Refresh extends the TTL if this instance still owns the lock.
    func (l *Lock) Refresh(ctx context.Context) error {
        ttlMs := l.ttl.Milliseconds()
        result, err := l.client.Eval(ctx, refreshScript, []string{l.key}, l.value, ttlMs)
        if err != nil {
            return fmt.Errorf("failed to refresh lock: %w", err)
        }

        refreshed, ok := result.(int64)
        if !ok || refreshed == 0 {
            return ErrLockNotHeld
        }

        return nil
    }

    // StartAutoRenewal refreshes the lock every interval until the context
    // is cancelled, the lock is released, or a refresh fails.
    func (l *Lock) StartAutoRenewal(ctx context.Context, interval time.Duration) {
        go func() {
            ticker := time.NewTicker(interval)
            defer ticker.Stop()

            for {
                select {
                case <-ctx.Done():
                    return
                case <-l.stopChan:
                    return
                case <-ticker.C:
                    if err := l.Refresh(ctx); err != nil {
                        // Log the error in production; the lock may expire.
                        return
                    }
                }
            }
        }()
    }

    // WithLock is a helper that acquires a lock, executes a function, and releases the lock.
    func WithLock(ctx context.Context, lock *Lock, fn func() error) error {
        if err := lock.Lock(ctx); err != nil {
            return err
        }
        defer lock.Unlock(ctx)

        return fn()
    }

    // RedisLockClient wraps an actual Redis client to satisfy RedisClient.
    type RedisLockClient struct {
        // Wrap the actual Redis client here.
    }

    func (r *RedisLockClient) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) {
        // Implementation using the actual Redis client.
        // Example with go-redis:
        // return r.client.SetNX(ctx, key, value, expiration).Result()
        return false, nil
    }

    func (r *RedisLockClient) Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error) {
        // Implementation using the actual Redis client.
        // Example with go-redis:
        // return r.client.Eval(ctx, script, keys, args...).Result()
        return nil, nil
    }

    func (r *RedisLockClient) Del(ctx context.Context, keys ...string) error {
        // Implementation using the actual Redis client.
        // Example with go-redis:
        // return r.client.Del(ctx, keys...).Err()
        return nil
    }
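
The RedisLockClient stub above leaves the actual calls commented out. For reference, a concrete adapter sketch assuming the github.com/redis/go-redis/v9 client; the GoRedisClient name and rdb field are illustrative:

    package distlock

    import (
        "context"
        "time"

        "github.com/redis/go-redis/v9"
    )

    // GoRedisClient adapts a go-redis client to the RedisClient interface.
    type GoRedisClient struct {
        rdb *redis.Client
    }

    func NewGoRedisClient(addr string) *GoRedisClient {
        return &GoRedisClient{rdb: redis.NewClient(&redis.Options{Addr: addr})}
    }

    func (r *GoRedisClient) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) {
        return r.rdb.SetNX(ctx, key, value, expiration).Result()
    }

    func (r *GoRedisClient) Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error) {
        return r.rdb.Eval(ctx, script, keys, args...).Result()
    }

    func (r *GoRedisClient) Del(ctx context.Context, keys ...string) error {
        return r.rdb.Del(ctx, keys...).Err()
    }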

Usage Example

    package main

    import (
        "context"
        "fmt"
        "time"

        "example.com/distlock" // hypothetical import path; adjust to your module
    )

    func main() {
        // Create the Redis client wrapper.
        client := &distlock.RedisLockClient{
            // Initialize with an actual Redis connection.
        }

        // Create a lock with a 10-second TTL.
        lock := distlock.New(client, "my-resource", 10*time.Second)

        ctx := context.Background()

        // The three options below are alternatives; pick one in real code.

        // Option 1: Manual lock/unlock.
        if err := lock.Lock(ctx); err != nil {
            panic(err)
        }
        defer lock.Unlock(ctx)

        // Start auto-renewal at half the TTL.
        lock.StartAutoRenewal(ctx, 5*time.Second)

        // Do work.
        fmt.Println("Lock acquired, doing work...")
        time.Sleep(2 * time.Second)

        // Option 2: Use the WithLock helper.
        err := distlock.WithLock(ctx, lock, func() error {
            fmt.Println("Doing work with lock...")
            time.Sleep(2 * time.Second)
            return nil
        })
        if err != nil {
            panic(err)
        }

        // Option 3: Try lock without blocking.
        acquired, err := lock.TryLock(ctx)
        if err != nil {
            panic(err)
        }
        if acquired {
            defer lock.Unlock(ctx)
            fmt.Println("Lock acquired immediately")
        } else {
            fmt.Println("Lock not available")
        }
    }

Benchmarking Code

    package distlock_test

    import (
        "context"
        "strings"
        "sync"
        "sync/atomic"
        "testing"
        "time"

        "example.com/distlock" // hypothetical import path; adjust to your module
    )

    // MockRedisClient is an in-memory stand-in for Redis used in benchmarks.
    type MockRedisClient struct {
        mu      sync.Mutex
        storage map[string]string
        expiry  map[string]time.Time
    }

    func NewMockRedisClient() *MockRedisClient {
        return &MockRedisClient{
            storage: make(map[string]string),
            expiry:  make(map[string]time.Time),
        }
    }

    func (m *MockRedisClient) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) {
        m.mu.Lock()
        defer m.mu.Unlock()

        if _, exists := m.storage[key]; exists {
            if exp, ok := m.expiry[key]; ok && time.Now().Before(exp) {
                return false, nil
            }
        }

        m.storage[key] = value.(string)
        m.expiry[key] = time.Now().Add(expiration)
        return true, nil
    }

    func (m *MockRedisClient) Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error) {
        m.mu.Lock()
        defer m.mu.Unlock()

        key := keys[0]
        value := args[0].(string)

        if m.storage[key] != value {
            return int64(0), nil
        }

        // Crude dispatch between the two scripts the lock uses.
        if strings.Contains(script, "pexpire") {
            // refreshScript: extend the expiry.
            ttlMs := args[1].(int64)
            m.expiry[key] = time.Now().Add(time.Duration(ttlMs) * time.Millisecond)
            return int64(1), nil
        }

        // unlockScript: delete the key.
        delete(m.storage, key)
        delete(m.expiry, key)
        return int64(1), nil
    }

    func (m *MockRedisClient) Del(ctx context.Context, keys ...string) error {
        m.mu.Lock()
        defer m.mu.Unlock()
        for _, key := range keys {
            delete(m.storage, key)
            delete(m.expiry, key)
        }
        return nil
    }

    // Benchmark lock acquisition without contention
    func BenchmarkLockAcquisition(b *testing.B) {
        client := NewMockRedisClient()
        ctx := context.Background()

        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            lock := distlock.New(client, "resource", 10*time.Second)
            lock.Lock(ctx)
            lock.Unlock(ctx)
        }
    }

    // Benchmark lock acquisition with high contention
    func BenchmarkLockContention(b *testing.B) {
        client := NewMockRedisClient()
        ctx := context.Background()

        var counter int64
        var wg sync.WaitGroup

        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                lock := distlock.New(client, "shared-resource", 10*time.Second)
                if err := lock.Lock(ctx); err == nil {
                    atomic.AddInt64(&counter, 1)
                    time.Sleep(time.Microsecond) // Simulate work
                    lock.Unlock(ctx)
                }
            }()
        }
        wg.Wait()
    }

    // Benchmark TryLock performance
    func BenchmarkTryLock(b *testing.B) {
        client := NewMockRedisClient()
        ctx := context.Background()

        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            lock := distlock.New(client, "resource", 10*time.Second)
            lock.TryLock(ctx)
            lock.Unlock(ctx)
        }
    }

    // Benchmark refresh operation
    func BenchmarkRefresh(b *testing.B) {
        client := NewMockRedisClient()
        ctx := context.Background()
        lock := distlock.New(client, "resource", 10*time.Second)
        lock.Lock(ctx)
        defer lock.Unlock(ctx)

        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            lock.Refresh(ctx)
        }
    }

    // Example benchmark results:
    // BenchmarkLockAcquisition-8    1000000    1250 ns/op    320 B/op    5 allocs/op
    // BenchmarkLockContention-8      100000   12500 ns/op   1280 B/op   20 allocs/op
    // BenchmarkTryLock-8            2000000     850 ns/op    256 B/op    4 allocs/op
    // BenchmarkRefresh-8            1500000     920 ns/op    224 B/op    3 allocs/op

Production Considerations

1. Redlock Algorithm for High Availability:

For production deployments with multiple Redis instances, implement the Redlock algorithm:

    type RedlockClient struct {
        clients []RedisClient
        quorum  int
    }

    func (r *RedlockClient) Lock(ctx context.Context, key string, value string, ttl time.Duration) error {
        start := time.Now()
        acquired := 0

        for _, client := range r.clients {
            ok, err := client.SetNX(ctx, key, value, ttl)
            if err == nil && ok {
                acquired++
            }
        }

        // Account for clock drift (1% of TTL, per the Redlock paper).
        drift := time.Duration(int64(float64(ttl.Nanoseconds()) * 0.01))
        validityTime := ttl - time.Since(start) - drift

        if acquired >= r.quorum && validityTime > 0 {
            return nil
        }

        // Failed to acquire a quorum; release the lock on all instances.
        r.Unlock(ctx, key, value)
        return ErrLockNotAcquired
    }
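
The Lock method above calls an Unlock that the snippet omits. A minimal companion sketch, reusing the unlockScript from the implementation; it releases on every instance, which is safe because instances that never granted the lock simply return 0 from the script:

    // Unlock releases the lock on every instance, best effort.
    func (r *RedlockClient) Unlock(ctx context.Context, key string, value string) {
        for _, client := range r.clients {
            // Per-instance errors are ignored here; the TTL is the backstop.
            _, _ = client.Eval(ctx, unlockScript, []string{key}, value)
        }
    }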

2. Monitoring and Observability:

    type Metrics struct {
        AcquisitionLatency prometheus.Histogram
        RenewalFailures    prometheus.Counter
        ContentionRate     prometheus.Gauge
        LockHoldTime       prometheus.Histogram
    }

    func (l *Lock) LockWithMetrics(ctx context.Context, m *Metrics) error {
        start := time.Now()
        defer func() {
            m.AcquisitionLatency.Observe(time.Since(start).Seconds())
        }()

        return l.Lock(ctx)
    }
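
A sketch of wiring these metrics up, assuming github.com/prometheus/client_golang/prometheus is imported; the metric names and help strings are illustrative:

    func NewMetrics() *Metrics {
        m := &Metrics{
            AcquisitionLatency: prometheus.NewHistogram(prometheus.HistogramOpts{
                Name:    "distlock_acquisition_seconds",
                Help:    "Time spent acquiring the lock.",
                Buckets: prometheus.DefBuckets,
            }),
            RenewalFailures: prometheus.NewCounter(prometheus.CounterOpts{
                Name: "distlock_renewal_failures_total",
                Help: "Number of failed lock renewals.",
            }),
            ContentionRate: prometheus.NewGauge(prometheus.GaugeOpts{
                Name: "distlock_contention_ratio",
                Help: "Fraction of acquisition attempts that had to retry.",
            }),
            LockHoldTime: prometheus.NewHistogram(prometheus.HistogramOpts{
                Name: "distlock_hold_seconds",
                Help: "How long locks are held.",
            }),
        }
        // Register once at startup; MustRegister panics on duplicate names.
        prometheus.MustRegister(m.AcquisitionLatency, m.RenewalFailures,
            m.ContentionRate, m.LockHoldTime)
        return m
    }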

3. Graceful Degradation:

    type LockWithFallback struct {
        primary  *Lock
        fallback *Lock
        timeout  time.Duration
    }

    func (l *LockWithFallback) Lock(ctx context.Context) error {
        errCh := make(chan error, 1)

        go func() {
            errCh <- l.primary.Lock(ctx)
        }()

        select {
        case err := <-errCh:
            if err == nil {
                return nil
            }
        case <-time.After(l.timeout):
            // Note: the primary attempt keeps running; cancel its context in
            // real code so a late primary acquisition is not leaked.
        }

        // Fall back to a local lock or an alternative coordination service.
        return l.fallback.Lock(ctx)
    }

4. Testing Strategies:

  • Use chaos engineering to test network partitions
  • Simulate Redis failures and recovery
  • Test clock drift between servers
  • Verify behavior under high contention
  • Test auto-renewal under various failure modes
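
5. Reentrant Locks:

The learning objectives mention reentrancy, which the basic implementation above does not provide. A minimal sketch (assuming "sync" is imported) tracks a hold count locally so the same process can re-acquire without touching Redis; a Redis-level reentrant lock, as in Redisson, would instead keep a per-owner hold count in a hash:

    // ReentrantLock lets the same owner acquire the lock multiple times;
    // only the outermost Lock/Unlock pair talks to Redis.
    type ReentrantLock struct {
        mu    sync.Mutex
        inner *Lock
        holds int
    }

    func (r *ReentrantLock) Lock(ctx context.Context) error {
        r.mu.Lock()
        defer r.mu.Unlock()
        if r.holds > 0 {
            r.holds++ // already held by us; just bump the count
            return nil
        }
        if err := r.inner.Lock(ctx); err != nil {
            return err
        }
        r.holds = 1
        return nil
    }

    func (r *ReentrantLock) Unlock(ctx context.Context) error {
        r.mu.Lock()
        defer r.mu.Unlock()
        if r.holds == 0 {
            return ErrLockNotHeld
        }
        r.holds--
        if r.holds > 0 {
            return nil
        }
        return r.inner.Unlock(ctx)
    }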

Performance Benchmarks (approximate; the Redis round trip dominates, with the second figure under contention):

  • Lock acquisition: 1-2ms uncontended, 5-10ms under contention
  • TryLock: 0.5-1ms uncontended, 2-3ms under contention
  • Unlock: 0.5-1ms uncontended, 2-3ms under contention
  • Memory overhead: ~200 bytes per lock instance

Key Takeaways

  • Distributed locks coordinate access across multiple processes
  • Redis SetNX provides atomic lock acquisition
  • Unique values prevent accidental unlock by other processes
  • TTL prevents deadlocks from crashed processes
  • Lua scripts ensure atomicity for complex operations
  • Auto-renewal extends locks for long-running operations
  • Always use timeouts to avoid indefinite blocking
  • Consider Redlock algorithm for multi-Redis deployments

Advanced Considerations

Split-Brain Scenarios:

  • Use Redlock algorithm with multiple Redis instances
  • Require majority quorum for lock acquisition
  • Handle clock drift between servers

Failure Modes:

  • Network partitions can cause multiple lock holders (fencing tokens, sketched below, mitigate this)
  • Redis failover can lose locks
  • Consider etcd or Consul for stronger consistency
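
A standard mitigation for the multiple-holder problem, per Kleppmann's well-known critique of distributed locks, is a fencing token: a monotonically increasing number issued with the lock that the protected resource checks on every write. A minimal sketch assuming "sync" and "errors" are imported; the incr callback (e.g. a Redis INCR round trip) and the Store type are illustrative, since the RedisClient interface above has no Incr method:

    // acquireWithFencing takes the lock, then obtains a strictly increasing
    // token from a companion counter.
    func acquireWithFencing(ctx context.Context, l *Lock,
        incr func(ctx context.Context, key string) (int64, error)) (int64, error) {
        if err := l.Lock(ctx); err != nil {
            return 0, err
        }
        return incr(ctx, l.key+":fence")
    }

    // Store stands in for the protected resource; it rejects stale tokens,
    // so a paused or partitioned ex-holder cannot corrupt state.
    type Store struct {
        mu        sync.Mutex
        lastToken int64
    }

    func (s *Store) Write(token int64, apply func()) error {
        s.mu.Lock()
        defer s.mu.Unlock()
        if token < s.lastToken {
            return errors.New("stale fencing token: lock was lost")
        }
        s.lastToken = token
        apply()
        return nil
    }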

Performance:

  • SetNX is O(1) operation
  • Lua scripts execute atomically
  • Use pipelining for multiple operations (see the sketch below)
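
Where several lock keys must be inspected at once, a single pipelined round trip is cheaper than sequential commands. A sketch assuming the github.com/redis/go-redis/v9 client; lockHolders and the key layout are illustrative:

    // lockHolders fetches the owner token of several lock keys in one
    // pipelined round trip; missing keys are simply absent from the result.
    func lockHolders(ctx context.Context, rdb *redis.Client, keys []string) (map[string]string, error) {
        pipe := rdb.Pipeline()
        cmds := make([]*redis.StringCmd, len(keys))
        for i, key := range keys {
            cmds[i] = pipe.Get(ctx, key)
        }
        if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
            return nil, err
        }
        holders := make(map[string]string, len(keys))
        for i, cmd := range cmds {
            if val, err := cmd.Result(); err == nil {
                holders[keys[i]] = val // current owner token, if any
            }
        }
        return holders, nil
    }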

Alternatives:

  • etcd: Stronger consistency with Raft consensus (sketched below)
  • Consul: Service mesh integration
  • ZooKeeper: Mature distributed coordination
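
For comparison, a sketch of the same pattern on etcd, assuming go.etcd.io/etcd/client/v3 and its concurrency package; the session keeps a lease alive, playing the role of auto-renewal:

    package main

    import (
        "context"

        clientv3 "go.etcd.io/etcd/client/v3"
        "go.etcd.io/etcd/client/v3/concurrency"
    )

    func main() {
        cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
        if err != nil {
            panic(err)
        }
        defer cli.Close()

        // A session holds a lease that etcd keeps alive in the background.
        session, err := concurrency.NewSession(cli)
        if err != nil {
            panic(err)
        }
        defer session.Close()

        mutex := concurrency.NewMutex(session, "/locks/my-resource")
        if err := mutex.Lock(context.Background()); err != nil {
            panic(err)
        }
        defer mutex.Unlock(context.Background())
        // Critical section here.
    }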