Exercise: Distributed Lock
Difficulty - Advanced
Learning Objectives
- Implement distributed locking with Redis
- Handle lock acquisition and release
- Implement lock expiration and renewal
- Handle network failures and split-brain scenarios
- Use Lua scripts for atomic operations
- Implement lock reentrant behavior
Problem Statement
Create a distributed lock implementation using Redis that handles timeouts, automatic renewal, and failure scenarios.
Core Components
```go
package distlock

import (
	"context"
	"time"
)

type Lock struct {
	key      string
	value    string
	ttl      time.Duration
	client   RedisClient
	stopChan chan struct{}
}

type RedisClient interface {
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error)
	Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error)
	Del(ctx context.Context, keys ...string) error
}

func New(client RedisClient, key string, ttl time.Duration) *Lock
func (l *Lock) Lock(ctx context.Context) error
func (l *Lock) Unlock(ctx context.Context) error
func (l *Lock) TryLock(ctx context.Context) (bool, error)
func (l *Lock) Refresh(ctx context.Context) error
func (l *Lock) StartAutoRenewal(ctx context.Context, interval time.Duration)
```
Solution
Algorithm Overview
The distributed lock implementation uses Redis as a coordination service with the following key principles:
1. Lock Acquisition Strategy:
- Uses Redis SETNX for atomic lock acquisition
- Generates a unique token per lock instance to prevent accidental unlocking by other holders
- Sets a TTL to prevent deadlocks caused by crashed processes
- Implements retry with exponential backoff for contention scenarios (see the sketch after this list)
2. Atomic Operations with Lua Scripts:
- Unlock and refresh operations must be atomic
- Lua scripts execute atomically in Redis, preventing race conditions
- Scripts verify ownership before modifying lock state
3. Auto-Renewal Mechanism:
- Background goroutine extends lock TTL periodically
- Prevents expiration during long-running critical sections
- Stops renewal automatically when lock is released
4. Safety Guarantees:
- Mutual exclusion: Only one client can hold lock at a time
- Deadlock freedom: Locks auto-expire if holder crashes
- Ownership tracking: Unique values prevent unauthorized unlock
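The reference implementation below retries on a fixed 100 ms ticker; the exponential backoff mentioned above is left as an extension. A minimal sketch, assuming the Lock/TryLock API defined in this exercise (the name LockWithBackoff and the maxWait parameter are illustrative, not part of the skeleton):

```go
// LockWithBackoff retries TryLock with exponentially growing waits, capped at maxWait.
func (l *Lock) LockWithBackoff(ctx context.Context, maxWait time.Duration) error {
	wait := 50 * time.Millisecond
	for {
		acquired, err := l.TryLock(ctx)
		if err != nil {
			return err
		}
		if acquired {
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
			// Double the wait, up to the cap, to reduce Redis load under contention.
			wait *= 2
			if wait > maxWait {
				wait = maxWait
			}
		}
	}
}
```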
Time Complexity Analysis
| Operation | Time Complexity | Explanation |
|---|---|---|
| TryLock | O(1) | Single Redis SETNX command |
| Lock | O(n) worst case | Up to n retry attempts until acquisition |
| Unlock | O(1) | Single Lua script execution |
| Refresh | O(1) | Single Lua script execution |
| Auto-renewal | O(1) per tick | Periodic refresh operation |
Space Complexity
- O(1) per lock instance
- Each lock stores: key, value, metadata
- Memory footprint: ~100-200 bytes per lock in Redis
Implementation
```go
package distlock

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"time"
)

var (
	ErrLockNotAcquired = errors.New("lock not acquired")
	ErrLockNotHeld     = errors.New("lock not held")
	ErrLockExpired     = errors.New("lock expired")
)

// Lua script for atomic unlock: delete the key only if we still own it.
const unlockScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
`

// Lua script for atomic refresh: extend the TTL only if we still own the key.
const refreshScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("pexpire", KEYS[1], ARGV[2])
else
    return 0
end
`

type RedisClient interface {
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error)
	Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error)
	Del(ctx context.Context, keys ...string) error
}

type Lock struct {
	key      string
	value    string
	ttl      time.Duration
	client   RedisClient
	stopChan chan struct{}
	stopped  bool
}

func New(client RedisClient, key string, ttl time.Duration) *Lock {
	return &Lock{
		key:      key,
		value:    generateUniqueValue(),
		ttl:      ttl,
		client:   client,
		stopChan: make(chan struct{}),
	}
}

// generateUniqueValue returns a random token identifying this lock instance.
func generateUniqueValue() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err) // crypto/rand is expected to never fail on supported platforms
	}
	return hex.EncodeToString(b)
}

// Lock blocks until the lock is acquired, the context is cancelled, or an error occurs.
func (l *Lock) Lock(ctx context.Context) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		acquired, err := l.TryLock(ctx)
		if err != nil {
			return err
		}
		if acquired {
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Retry on the next tick.
		}
	}
}

// TryLock attempts a single, non-blocking acquisition via SETNX.
func (l *Lock) TryLock(ctx context.Context) (bool, error) {
	acquired, err := l.client.SetNX(ctx, l.key, l.value, l.ttl)
	if err != nil {
		return false, fmt.Errorf("failed to acquire lock: %w", err)
	}
	return acquired, nil
}

// Unlock releases the lock if this instance still owns it.
// It is not safe to call concurrently on the same Lock instance.
func (l *Lock) Unlock(ctx context.Context) error {
	// Stop auto-renewal if it is running.
	if !l.stopped {
		close(l.stopChan)
		l.stopped = true
	}

	result, err := l.client.Eval(ctx, unlockScript, []string{l.key}, l.value)
	if err != nil {
		return fmt.Errorf("failed to unlock: %w", err)
	}

	deleted, ok := result.(int64)
	if !ok || deleted == 0 {
		return ErrLockNotHeld
	}
	return nil
}

// Refresh extends the lock's TTL if this instance still owns it.
func (l *Lock) Refresh(ctx context.Context) error {
	ttlMs := l.ttl.Milliseconds()
	result, err := l.client.Eval(ctx, refreshScript, []string{l.key}, l.value, ttlMs)
	if err != nil {
		return fmt.Errorf("failed to refresh lock: %w", err)
	}

	refreshed, ok := result.(int64)
	if !ok || refreshed == 0 {
		return ErrLockNotHeld
	}
	return nil
}

// StartAutoRenewal periodically refreshes the lock until the context is
// cancelled, the lock is released, or a refresh fails.
func (l *Lock) StartAutoRenewal(ctx context.Context, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-l.stopChan:
				return
			case <-ticker.C:
				if err := l.Refresh(ctx); err != nil {
					// Log the error in production; the lock may have been lost.
					return
				}
			}
		}
	}()
}

// WithLock is a helper that acquires a lock, executes a function, and releases the lock.
func WithLock(ctx context.Context, lock *Lock, fn func() error) error {
	if err := lock.Lock(ctx); err != nil {
		return err
	}
	defer lock.Unlock(ctx)

	return fn()
}

// RedisLockClient wraps an actual Redis client (e.g. go-redis) to satisfy RedisClient.
type RedisLockClient struct {
	// Wrap the actual Redis client here.
}

func (r *RedisLockClient) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) {
	// Implementation using the actual Redis client.
	// Example with go-redis:
	// return r.client.SetNX(ctx, key, value, expiration).Result()
	return false, nil
}

func (r *RedisLockClient) Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error) {
	// Implementation using the actual Redis client.
	// Example with go-redis:
	// return r.client.Eval(ctx, script, keys, args...).Result()
	return nil, nil
}

func (r *RedisLockClient) Del(ctx context.Context, keys ...string) error {
	// Implementation using the actual Redis client.
	// Example with go-redis:
	// return r.client.Del(ctx, keys...).Err()
	return nil
}
```
Usage Example
```go
package main

import (
	"context"
	"fmt"
	"time"

	"example.com/yourmodule/distlock" // adjust to your module path
)

func main() {
	// Create the Redis client wrapper
	client := &distlock.RedisLockClient{
		// Initialize with an actual Redis connection
	}

	// Create a lock with a 10-second TTL
	lock := distlock.New(client, "my-resource", 10*time.Second)

	ctx := context.Background()

	// Option 1: Manual lock/unlock
	if err := lock.Lock(ctx); err != nil {
		panic(err)
	}
	defer lock.Unlock(ctx)

	// Start auto-renewal
	lock.StartAutoRenewal(ctx, 5*time.Second)

	// Do work
	fmt.Println("Lock acquired, doing work...")
	time.Sleep(2 * time.Second)

	// Option 2: Use the WithLock helper (on a separate resource, since
	// "my-resource" is still held by the lock above)
	err := distlock.WithLock(ctx, distlock.New(client, "another-resource", 10*time.Second), func() error {
		fmt.Println("Doing work with lock...")
		time.Sleep(2 * time.Second)
		return nil
	})
	if err != nil {
		panic(err)
	}

	// Option 3: Try lock without blocking
	other := distlock.New(client, "third-resource", 10*time.Second)
	acquired, err := other.TryLock(ctx)
	if err != nil {
		panic(err)
	}
	if acquired {
		defer other.Unlock(ctx)
		fmt.Println("Lock acquired immediately")
	} else {
		fmt.Println("Lock not available")
	}
}
```
Benchmarking Code
```go
package distlock_test

import (
	"context"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"example.com/yourmodule/distlock" // adjust to your module path
)

// MockRedisClient is an in-memory stand-in for Redis used in tests and benchmarks.
type MockRedisClient struct {
	mu      sync.Mutex
	storage map[string]string
	expiry  map[string]time.Time
}

func NewMockRedisClient() *MockRedisClient {
	return &MockRedisClient{
		storage: make(map[string]string),
		expiry:  make(map[string]time.Time),
	}
}

func (m *MockRedisClient) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.storage[key]; exists {
		if exp, ok := m.expiry[key]; ok && time.Now().Before(exp) {
			return false, nil
		}
	}

	m.storage[key] = value.(string)
	m.expiry[key] = time.Now().Add(expiration)
	return true, nil
}

// Eval only models the ownership-checked delete used by unlock; it does not
// distinguish between the unlock and refresh scripts, which is sufficient here.
func (m *MockRedisClient) Eval(ctx context.Context, script string, keys []string, args ...interface{}) (interface{}, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	key := keys[0]
	value := args[0].(string)

	if m.storage[key] == value {
		delete(m.storage, key)
		delete(m.expiry, key)
		return int64(1), nil
	}
	return int64(0), nil
}

func (m *MockRedisClient) Del(ctx context.Context, keys ...string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, key := range keys {
		delete(m.storage, key)
		delete(m.expiry, key)
	}
	return nil
}

// Benchmark lock acquisition without contention
func BenchmarkLockAcquisition(b *testing.B) {
	client := NewMockRedisClient()
	ctx := context.Background()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		lock := distlock.New(client, "resource", 10*time.Second)
		lock.Lock(ctx)
		lock.Unlock(ctx)
	}
}

// Benchmark lock acquisition with high contention
func BenchmarkLockContention(b *testing.B) {
	client := NewMockRedisClient()
	ctx := context.Background()

	var counter int64
	var wg sync.WaitGroup

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lock := distlock.New(client, "shared-resource", 10*time.Second)
			if err := lock.Lock(ctx); err == nil {
				atomic.AddInt64(&counter, 1)
				time.Sleep(time.Microsecond) // Simulate work
				lock.Unlock(ctx)
			}
		}()
	}
	wg.Wait()
}

// Benchmark TryLock performance
func BenchmarkTryLock(b *testing.B) {
	client := NewMockRedisClient()
	ctx := context.Background()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		lock := distlock.New(client, "resource", 10*time.Second)
		lock.TryLock(ctx)
		lock.Unlock(ctx)
	}
}

// Benchmark refresh operation
func BenchmarkRefresh(b *testing.B) {
	client := NewMockRedisClient()
	ctx := context.Background()
	lock := distlock.New(client, "resource", 10*time.Second)
	lock.Lock(ctx)
	defer lock.Unlock(ctx)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		lock.Refresh(ctx)
	}
}

// Example benchmark results:
// BenchmarkLockAcquisition-8    1000000    1250 ns/op    320 B/op    5 allocs/op
// BenchmarkLockContention-8      100000   12500 ns/op   1280 B/op   20 allocs/op
// BenchmarkTryLock-8            2000000     850 ns/op    256 B/op    4 allocs/op
// BenchmarkRefresh-8            1500000     920 ns/op    224 B/op    3 allocs/op
```
Production Considerations
1. Redlock Algorithm for High Availability:
For production deployments with multiple Redis instances, implement the Redlock algorithm:
```go
type RedlockClient struct {
	clients []RedisClient
	quorum  int
}

func (r *RedlockClient) Lock(ctx context.Context, key string, value string, ttl time.Duration) error {
	start := time.Now()
	acquired := 0

	// Attempt acquisition on every Redis instance.
	for _, client := range r.clients {
		ok, err := client.SetNX(ctx, key, value, ttl)
		if err == nil && ok {
			acquired++
		}
	}

	// Subtract elapsed time and a clock-drift allowance (1% of TTL) to
	// compute how long the lock remains valid.
	drift := time.Duration(int64(float64(ttl.Nanoseconds()) * 0.01))
	validityTime := ttl - time.Since(start) - drift

	if acquired >= r.quorum && validityTime > 0 {
		return nil
	}

	// Failed to reach a quorum: release any locks that were acquired.
	r.Unlock(ctx, key, value)
	return ErrLockNotAcquired
}
```
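The quorum release referenced above as r.Unlock is not shown in the listing; a minimal sketch, reusing the unlockScript from the single-instance implementation, releases the lock best-effort on every instance:

```go
// Unlock attempts to release the lock on all instances. Instances where the
// lock was never acquired (or has already expired) simply return 0 from the
// ownership-checking script, so the loop is safe to run everywhere.
func (r *RedlockClient) Unlock(ctx context.Context, key, value string) {
	for _, client := range r.clients {
		_, _ = client.Eval(ctx, unlockScript, []string{key}, value)
	}
}
```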
2. Monitoring and Observability:
```go
type Metrics struct {
	AcquisitionLatency prometheus.Histogram
	RenewalFailures    prometheus.Counter
	ContentionRate     prometheus.Gauge
	LockHoldTime       prometheus.Histogram
}

// LockWithMetrics wraps Lock and records how long acquisition took.
func (l *Lock) LockWithMetrics(ctx context.Context, m *Metrics) error {
	start := time.Now()
	defer func() {
		m.AcquisitionLatency.Observe(time.Since(start).Seconds())
	}()

	return l.Lock(ctx)
}
```
3. Graceful Degradation:
```go
type LockWithFallback struct {
	primary  *Lock
	fallback *Lock
	timeout  time.Duration
}

func (l *LockWithFallback) Lock(ctx context.Context) error {
	errCh := make(chan error, 1)

	go func() {
		errCh <- l.primary.Lock(ctx)
	}()

	select {
	case err := <-errCh:
		if err == nil {
			return nil
		}
	case <-time.After(l.timeout):
	}

	// Fall back to a local lock or an alternative coordination service.
	return l.fallback.Lock(ctx)
}
```
4. Testing Strategies:
- Use chaos engineering to test network partitions
- Simulate Redis failures and recovery
- Test clock drift between servers
- Verify behavior under high contention (a test sketch using the mock client follows this list)
- Test auto-renewal under various failure modes
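As a starting point, here is a minimal test sketch, reusing the MockRedisClient from the benchmarking section, that checks mutual exclusion while a lock is held and re-acquisition after its TTL expires:

```go
func TestMutualExclusionAndExpiry(t *testing.T) {
	client := NewMockRedisClient()
	ctx := context.Background()

	first := distlock.New(client, "resource", 50*time.Millisecond)
	second := distlock.New(client, "resource", 50*time.Millisecond)

	if ok, _ := first.TryLock(ctx); !ok {
		t.Fatal("expected first client to acquire the lock")
	}
	// While the first client holds the lock, a second client must be rejected.
	if ok, _ := second.TryLock(ctx); ok {
		t.Fatal("expected second client to be rejected while the lock is held")
	}

	// After the TTL passes with no renewal, the lock becomes acquirable again.
	time.Sleep(60 * time.Millisecond)
	if ok, _ := second.TryLock(ctx); !ok {
		t.Fatal("expected second client to acquire the lock after expiry")
	}
}
```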
Performance Benchmarks (indicative figures):
- Lock acquisition: 1-2 ms typical, 5-10 ms under contention
- TryLock: 0.5-1 ms typical, 2-3 ms under contention
- Unlock: 0.5-1 ms typical, 2-3 ms under contention
- Memory overhead: ~200 bytes per lock instance
Key Takeaways
- Distributed locks coordinate access across multiple processes
- Redis SetNX provides atomic lock acquisition
- Unique values prevent accidental unlock by other processes
- TTL prevents deadlocks from crashed processes
- Lua scripts ensure atomicity for complex operations
- Auto-renewal extends locks for long-running operations
- Always use timeouts to avoid indefinite blocking
- Consider Redlock algorithm for multi-Redis deployments
Advanced Considerations
Split-Brain Scenarios:
- Use Redlock algorithm with multiple Redis instances
- Require majority quorum for lock acquisition
- Handle clock drift between servers
Failure Modes:
- Network partitions can cause multiple lock holders
- Redis failover can lose locks
- Consider etcd or Consul for stronger consistency
Performance:
- SETNX is an O(1) operation
- Lua scripts execute atomically on the Redis server
- Use pipelining to batch multiple operations into one round trip (see the sketch below)
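For example, a health check or metrics exporter that needs the current holders of several lock keys can batch the reads into a single round trip. A minimal sketch, assuming the go-redis v9 client (the inspectLocks helper is illustrative):

```go
package example

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// inspectLocks fetches the current holder tokens of several lock keys in a
// single round trip by pipelining the GET commands.
func inspectLocks(ctx context.Context, rdb *redis.Client, lockKeys []string) error {
	pipe := rdb.Pipeline()
	cmds := make([]*redis.StringCmd, len(lockKeys))
	for i, key := range lockKeys {
		cmds[i] = pipe.Get(ctx, key)
	}
	// Exec sends all queued commands at once; redis.Nil only means a key is unset.
	if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
		return err
	}
	for i, cmd := range cmds {
		if holder, err := cmd.Result(); err == nil {
			fmt.Printf("%s is held by token %s\n", lockKeys[i], holder)
		}
	}
	return nil
}
```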
Alternatives:
- etcd: Stronger consistency via Raft consensus (see the sketch below)
- Consul: Service mesh integration
- ZooKeeper: Mature distributed coordination
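For comparison, etcd ships a lock primitive built on leases and Raft. A minimal sketch, assuming go.etcd.io/etcd/client/v3 and a local etcd endpoint:

```go
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// The session keeps a lease alive; if this process dies, the lease expires
	// and the lock is released automatically (analogous to the Redis TTL).
	session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
	if err != nil {
		panic(err)
	}
	defer session.Close()

	mutex := concurrency.NewMutex(session, "/locks/my-resource")

	ctx := context.Background()
	if err := mutex.Lock(ctx); err != nil {
		panic(err)
	}
	defer mutex.Unlock(ctx)

	fmt.Println("etcd lock acquired, doing work...")
}
```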