Why This Matters - The Performance Revolution
Consider a bustling e-commerce platform during a flash sale event. Thousands of customers are simultaneously browsing products, checking prices, and adding items to carts. Without proper caching, each request would hammer the database, causing response times to skyrocket from milliseconds to seconds. Customers would abandon their carts, and revenue would plummet.
This is where distributed caching becomes the unsung hero of modern applications. It acts like a super-fast memory layer that keeps frequently accessed data close to your users, transforming sluggish applications into lightning-fast experiences that delight customers.
Real-world impact: Companies like Twitter process 500 million tweets per day using Redis clusters to serve user timelines with sub-millisecond latency. Reddit handles millions of page views by caching comment threads and user data. Instagram serves billions of image metadata requests through Redis, making user experiences feel instantaneous.
Without distributed caching, these platforms would require thousands of additional database servers and still couldn't achieve the same performance levels. The economics are staggering - caching can reduce database costs by 80-90% while improving performance by 10-100x.
Learning Objectives
By the end of this article, you will be able to:
- Choose appropriate caching patterns for different use cases and avoid common pitfalls
- Implement production-ready Redis clients with proper connection pooling and error handling
- Leverage Redis data structures effectively
- Design cache-aside, write-through, and write-behind patterns with proper invalidation
- Build distributed caching systems using consistent hashing and Redis Cluster
- Apply performance optimization techniques like pipelining and Lua scripts
- Monitor and troubleshoot Redis performance in production environments
Core Concepts - Understanding Caching Landscape
The Caching Dilemma: Speed vs Consistency
Before diving into implementations, let's understand the fundamental trade-offs that every caching system must balance:
Speed: cached data is served from memory, orders of magnitude faster than a round trip to the database.
Consistency: cached data may be stale, while the database remains the authoritative, always-current source.
💡 Critical Insight: The right caching strategy isn't about choosing speed OR consistency—it's about finding the sweet spot for your specific use case. Some data can be slightly stale for massive performance gains, while other data must always be current even if it's slower.
When to Cache
Perfect Caching Candidates:
- User Profile Data: Changes infrequently, read constantly
- Product Catalog: Updates in batches, browsed continuously
- API Responses: Computationally expensive, same inputs produce same outputs
- Session Data: Frequently accessed during user interactions
- Configuration: Rarely changes, constantly read
1// Perfect for caching:
2type UserProfile struct {
3 ID string `json:"id"`
4 Name string `json:"name"`
5 Email string `json:"email"`
6 Settings UserSettings `json:"settings"`
7 LastLogin time.Time `json:"last_login"`
8}
9// Read 1000x/day, updated maybe 1x/month
10// 99.9% cache hit rate achievable
Poor Caching Candidates:
- Real-time Stock Prices: Change constantly, staleness is unacceptable
- Bank Account Balances: Must always be accurate for compliance
- High-frequency Trading Data: Milliseconds of staleness cause huge losses
- User Password Hashes: Security-sensitive, rarely accessed
1// BAD for caching:
2type StockPrice struct {
3 Symbol string `json:"symbol"`
4 Price float64 `json:"price"`
5 Timestamp time.Time `json:"timestamp"`
6 Volume int64 `json:"volume"`
7}
8// Changes multiple times per second
9// Stale data causes financial losses
10// Cache invalidation overhead exceeds benefit
⚠️ Important: Not all data should be cached. Cache data that is read more often than written, where slight staleness is acceptable, and where performance improvements justify the added complexity.
Practical Examples - Building Production-Ready Caching Systems
Redis Fundamentals: Beyond Simple Key-Value
Redis isn't just a simple key-value store—it's a multi-model database with sophisticated data structures that enable complex operations to be performed atomically and efficiently.
Production-Ready Redis Client
1package redis
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "time"
8
9 "github.com/redis/go-redis/v9"
10)
11
12type RedisClient struct {
13 rdb *redis.Client
14 config *Config
15}
16
17type Config struct {
18 Addr string // Redis server address
19 Password string // Redis password
20 DB int // Database number
21 PoolSize int // Connection pool size
22 MinIdleConns int // Minimum idle connections
23 MaxRetries int // Maximum retry attempts
24 DialTimeout time.Duration // Connection timeout
25 ReadTimeout time.Duration // Read timeout
26 WriteTimeout time.Duration // Write timeout
27}
28
29func NewRedisClient(config *Config) (*RedisClient, error) {
30 rdb := redis.NewClient(&redis.Options{
31 Addr: config.Addr,
32 Password: config.Password,
33 DB: config.DB,
34
35 // Connection pool settings for production performance
36 PoolSize: config.PoolSize, // Max connections in pool
37 MinIdleConns: config.MinIdleConns, // Min idle connections to maintain
38 MaxIdleConns: config.PoolSize / 2, // Max idle connections
39 ConnMaxIdleTime: 5 * time.Minute, // Close idle connections after 5 min
40 ConnMaxLifetime: 30 * time.Minute, // Reconnect after 30 min
41 PoolTimeout: 4 * time.Second, // Wait time for connection
42
43 // Performance and reliability settings
44 DialTimeout: config.DialTimeout, // Connection establishment timeout
45 ReadTimeout: config.ReadTimeout, // Read operation timeout
46 WriteTimeout: config.WriteTimeout, // Write operation timeout
47
48 // Retry settings
49 MaxRetries: config.MaxRetries, // Maximum retry attempts
50 MinRetryBackoff: 8 * time.Millisecond, // Initial retry backoff
51 MaxRetryBackoff: 512 * time.Millisecond, // Maximum retry backoff
52
53 // Note: ReadOnly and RouteByLatency are options on redis.ClusterOptions,
54 // not redis.Options, so they are omitted for this single-node client.
55 // (They control read-only replica routing in cluster mode.)
56 })
57
58 // Test connection immediately
59 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
60 defer cancel()
61
62 if err := rdb.Ping(ctx).Err(); err != nil {
63 return nil, fmt.Errorf("failed to connect to Redis: %w", err)
64 }
65
66 client := &RedisClient{
67 rdb: rdb,
68 config: config,
69 }
70
71 log.Printf("Successfully connected to Redis at %s (DB %d)", config.Addr, config.DB)
72 return client, nil
73}
74
75// Enhanced Get with metrics and error handling
76func (rc *RedisClient) Get(ctx context.Context, key string) (string, error) {
77 start := time.Now()
78
79 val, err := rc.rdb.Get(ctx, key).Result()
80
81 // Record metrics
82 rc.recordLatency("GET", time.Since(start))
83
84 if err == redis.Nil {
85 // Key not found - this is expected behavior, not an error
86 return "", nil
87 }
88
89 if err != nil {
90 rc.recordError("GET", err)
91 return "", fmt.Errorf("Redis GET failed for key %s: %w", key, err)
92 }
93
94 rc.recordHit("GET")
95 return val, nil
96}
97
98// Enhanced Set with TTL and metrics
99func (rc *RedisClient) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
100 start := time.Now()
101
102 err := rc.rdb.Set(ctx, key, value, ttl).Err()
103
104 rc.recordLatency("SET", time.Since(start))
105
106 if err != nil {
107 rc.recordError("SET", err)
108 return fmt.Errorf("Redis SET failed for key %s: %w", key, err)
109 }
110
111 rc.recordOperation("SET")
112 return nil
113}
114
115// Atomic get-and-set implemented with a Lua script
116func (rc *RedisClient) GetAndSet(ctx context.Context, key string, value interface{}, ttl time.Duration) (string, bool, error) {
117 // Use Lua script for atomic get-and-set
118 script := redis.NewScript(`
119 local current_value = redis.call('GET', KEYS[1])
120 if current_value == false then
121 redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
122 return {false, 1} -- a nil element would truncate the reply table
123 else
124 return {current_value, 0}
125 end
126 `)
127
128 result, err := script.Run(ctx, rc.rdb, []string{key}, value, int(ttl.Seconds())).Result()
129 if err != nil {
130 return "", false, fmt.Errorf("GET-AND-SET failed: %w", err)
131 }
132
133 // Parse the result array: [current_value, was_set]
134 resultSlice, ok := result.([]interface{})
135 if !ok || len(resultSlice) != 2 {
136 return "", false, fmt.Errorf("unexpected script result format")
137 }
138
139 var currentVal string
140 if resultSlice[0] != nil {
141 currentVal, ok = resultSlice[0].(string)
142 if !ok {
143 return "", false, fmt.Errorf("failed to parse current value")
144 }
145 }
146
147 wasSet, ok := resultSlice[1].(int64)
148 if !ok {
149 return "", false, fmt.Errorf("failed to parse was_set flag")
150 }
151
152 return currentVal, wasSet == 1, nil
153}
154
155// Metrics recording methods
156func (rc *RedisClient) recordLatency(operation string, duration time.Duration) {
157 // Record operation latency for monitoring
158 log.Printf("Redis %s took %v", operation, duration)
159}
160
161func (rc *RedisClient) recordHit(operation string) {
162 // Record cache hit
163 log.Printf("Redis %s cache hit", operation)
164}
165
166func (rc *RedisClient) recordError(operation string, err error) {
167 // Record cache error
168 log.Printf("Redis %s error: %v", operation, err)
169}
170
171func (rc *RedisClient) recordOperation(operation string) {
172 // Record successful operation
173 log.Printf("Redis %s operation successful", operation)
174}
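Before moving on, here is a quick usage sketch. It assumes it lives in the same package as the client above (so it can reach the unexported rdb field), and the address, keys, and pool sizes are illustrative. It also shows pipelining, one of the optimization techniques from the learning objectives, which batches many commands into a single network round trip.

// ExampleUsage wires up the client and demonstrates Get, Set, and pipelining.
func ExampleUsage() error {
	client, err := NewRedisClient(&Config{
		Addr:         "localhost:6379", // assumed local Redis instance
		PoolSize:     20,
		MinIdleConns: 5,
		MaxRetries:   3,
		DialTimeout:  5 * time.Second,
		ReadTimeout:  3 * time.Second,
		WriteTimeout: 3 * time.Second,
	})
	if err != nil {
		return err
	}

	ctx := context.Background()
	if err := client.Set(ctx, "greeting", "hello", time.Minute); err != nil {
		return err
	}
	val, err := client.Get(ctx, "greeting")
	if err != nil {
		return err
	}
	log.Printf("greeting = %q", val)

	// Pipelining: queue many commands and flush them in one round trip.
	pipe := client.rdb.Pipeline()
	for i := 0; i < 10; i++ {
		pipe.Set(ctx, fmt.Sprintf("bulk:%d", i), i, time.Minute)
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return err
	}
	return nil
}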
Redis Data Structures: The Right Tool for Every Job
Redis provides multiple data structures that excel at different use cases. Choosing the right structure is crucial for performance and simplicity.
Strings: Simple but Powerful
1// String operations for basic caching
2type StringCache struct {
3 client *RedisClient
4}
5
6func NewStringCache(client *RedisClient) *StringCache {
7 return &StringCache{client: client}
8}
9
10// Cache JSON objects as strings
11func (sc *StringCache) CacheUser(ctx context.Context, user *User) error {
12 // Serialize user to JSON
13 userData, err := json.Marshal(user)
14 if err != nil {
15 return fmt.Errorf("failed to serialize user: %w", err)
16 }
17
18 // Cache with 30-minute TTL
19 key := fmt.Sprintf("user:%s", user.ID)
20 return sc.client.Set(ctx, key, userData, 30*time.Minute)
21}
22
23func (sc *StringCache) GetUser(ctx context.Context, userID string) (*User, error) {
24 key := fmt.Sprintf("user:%s", userID)
25 userData, err := sc.client.Get(ctx, key)
26 if err != nil {
27 return nil, fmt.Errorf("failed to get user from cache: %w", err)
28 }
29
30 if userData == "" {
31 return nil, nil // Cache miss
32 }
33
34 var user User
35 if err := json.Unmarshal([]byte(userData), &user); err != nil {
36 return nil, fmt.Errorf("failed to deserialize user: %w", err)
37 }
38
39 return &user, nil
40}
41
42// Atomic counters for rate limiting and analytics
43func (sc *StringCache) IncrementAPIRate(ctx context.Context, clientIP string, window time.Duration) (int64, error) {
44 // Use IP as key with expiration
45 key := fmt.Sprintf("rate_limit:%s", clientIP)
46
47 // Increment and set expiration atomically
48 script := redis.NewScript(`
49 local current = redis.call('INCR', KEYS[1])
50 if current == 1 then
51 redis.call('EXPIRE', KEYS[1], ARGV[1])
52 end
53 return current
54 `)
55
56 result, err := script.Run(ctx, sc.client.rdb, []string{key}, int(window.Seconds())).Result()
57 if err != nil {
58 return 0, fmt.Errorf("failed to increment rate limit: %w", err)
59 }
60
61 return result.(int64), nil
62}
63
64// Distributed lock implementation
65func (sc *StringCache) AcquireLock(ctx context.Context, lockKey string, ttl time.Duration) (bool, error) {
66 // Use SET NX EX for atomic lock acquisition
67 result, err := sc.client.rdb.SetNX(ctx, lockKey, "locked", ttl).Result()
68 if err != nil {
69 return false, fmt.Errorf("failed to acquire lock: %w", err)
70 }
71
72 return result, nil
73}
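As a usage sketch, the atomic counter above can back a simple HTTP rate limiter. This assumes the same package with net/http imported, and an illustrative limit; note that r.RemoteAddr includes the port, so a production version would extract the real client IP.

// RateLimitMiddleware rejects clients that exceed maxRequests per window.
func RateLimitMiddleware(sc *StringCache, maxRequests int64, window time.Duration, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		count, err := sc.IncrementAPIRate(r.Context(), r.RemoteAddr, window)
		if err != nil {
			// Fail open: if Redis is unavailable, let the request through.
			next.ServeHTTP(w, r)
			return
		}
		if count > maxRequests {
			http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}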
Hashes: Perfect for Object Caching
1// Hash operations for structured data caching
2type HashCache struct {
3 client *RedisClient
4}
5
6func NewHashCache(client *RedisClient) *HashCache {
7 return &HashCache{client: client}
8}
9
10// Cache user profile as hash
11func (hc *HashCache) CacheUserProfile(ctx context.Context, user *User) error {
12 key := fmt.Sprintf("user_profile:%s", user.ID)
13
14 // Store user fields as hash
15 fields := map[string]interface{}{
16 "name": user.Name,
17 "email": user.Email,
18 "created_at": user.CreatedAt.Unix(),
19 "last_login": user.LastLogin.Unix(),
20 "tier": user.Tier,
21 }
22
23 // Set all fields in one operation
24 err := hc.client.rdb.HMSet(ctx, key, fields).Err()
25 if err != nil {
26 return fmt.Errorf("failed to cache user profile hash: %w", err)
27 }
28
29 // Set TTL on entire hash
30 return hc.client.rdb.Expire(ctx, key, 24*time.Hour).Err()
31}
32
33// Get specific fields from user profile
34func (hc *HashCache) GetUserProfileFields(ctx context.Context, userID string, fields []string) (map[string]string, error) {
35 key := fmt.Sprintf("user_profile:%s", userID)
36
37 // Get only requested fields
38 result, err := hc.client.rdb.HMGet(ctx, key, fields...).Result()
39 if err != nil {
40 return nil, fmt.Errorf("failed to get user profile fields: %w", err)
41 }
42
43 // Build result map
44 profile := make(map[string]string)
45 for i, field := range fields {
46 if result[i] != nil {
47 profile[field] = result[i].(string)
48 }
49 }
50
51 return profile, nil
52}
53
54// Update specific fields without affecting others
55func (hc *HashCache) UpdateUserLogin(ctx context.Context, userID string) error {
56 key := fmt.Sprintf("user_profile:%s", userID)
57 now := time.Now().Unix()
58
59 // Update last login field only
60 err := hc.client.rdb.HSet(ctx, key, "last_login", now).Err()
61 if err != nil {
62 return fmt.Errorf("failed to update user login: %w", err)
63 }
64
65 // Increment login count atomically
66 _, err = hc.client.rdb.HIncrBy(ctx, key, "login_count", 1).Result()
67 return err
68}
69
70// Atomic field increment for counters
71func (hc *HashCache) IncrementUserMetric(ctx context.Context, userID, metric string) (int64, error) {
72 key := fmt.Sprintf("user_metrics:%s", userID)
73
74 // Increment metric and set TTL if first time
75 script := redis.NewScript(`
76 local new_value = redis.call('HINCRBY', KEYS[1], ARGV[1], ARGV[2])
77 local exists = redis.call('HEXISTS', KEYS[1], 'created_at')
78 if exists == 0 then
79 redis.call('HSET', KEYS[1], 'created_at', ARGV[3])
80 redis.call('EXPIRE', KEYS[1], ARGV[4])
81 end
82 return new_value
83 `)
84
85 result, err := script.Run(ctx, hc.client.rdb,
86 []string{key},
87 metric,
88 1,
89 time.Now().Unix(),
90 int((24 * time.Hour).Seconds())).Result()
91
92 if err != nil {
93 return 0, fmt.Errorf("failed to increment user metric: %w", err)
94 }
95
96 return result.(int64), nil
97}
Sorted Sets: Power Rankings and Leaderboards
1// Sorted set operations for leaderboards and scoring
2type SortedSetCache struct {
3 client *RedisClient
4}
5
6func NewSortedSetCache(client *RedisClient) *SortedSetCache {
7 return &SortedSetCache{client: client}
8}
9
10// Game leaderboard implementation
11func (ssc *SortedSetCache) UpdatePlayerScore(ctx context.Context, gameID, playerID string, score float64) error {
12 key := fmt.Sprintf("leaderboard:%s", gameID)
13
14 // Add player to sorted set with score
15 _, err := ssc.client.rdb.ZAdd(ctx, key, redis.Z{
16 Score: score,
17 Member: playerID,
18 }).Result()
19
20 if err != nil {
21 return fmt.Errorf("failed to update player score: %w", err)
22 }
23
24 // Set TTL on leaderboard
25 return ssc.client.rdb.Expire(ctx, key, 7*24*time.Hour).Err()
26}
27
28// Get top N players with rankings
29func (ssc *SortedSetCache) GetTopPlayers(ctx context.Context, gameID string, n int64) ([]PlayerRank, error) {
30 key := fmt.Sprintf("leaderboard:%s", gameID)
31
32 // Get top N players with scores
33 results, err := ssc.client.rdb.ZRevRangeWithScores(ctx, key, 0, n-1).Result()
34 if err != nil {
35 return nil, fmt.Errorf("failed to get top players: %w", err)
36 }
37
38 players := make([]PlayerRank, len(results))
39 for i, result := range results {
40 rank := i + 1
41 playerID := result.Member.(string)
42 score := result.Score
43
44 // Get player details from hash
45 playerKey := fmt.Sprintf("player:%s", playerID)
46 playerData, err := ssc.client.rdb.HGetAll(ctx, playerKey).Result()
47 if err != nil {
48 log.Printf("Failed to get player details for %s: %v", playerID, err)
49 continue
50 }
51
52 players[i] = PlayerRank{
53 Rank: rank,
54 PlayerID: playerID,
55 Score: score,
56 Name: playerData["name"],
57 Avatar: playerData["avatar"],
58 }
59 }
60
61 return players, nil
62}
63
64// Get player's rank and position
65func (ssc *SortedSetCache) GetPlayerRank(ctx context.Context, gameID, playerID string) (*PlayerRank, error) {
66 key := fmt.Sprintf("leaderboard:%s", gameID)
67
68 // Get player's rank
69 rank, err := ssc.client.rdb.ZRevRank(ctx, key, playerID).Result()
70 if err != nil {
71 if err == redis.Nil {
72 return nil, nil // Player not in leaderboard
73 }
74 return nil, fmt.Errorf("failed to get player rank: %w", err)
75 }
76
77 // Get player's score
78 score, err := ssc.client.rdb.ZScore(ctx, key, playerID).Result()
79 if err != nil {
80 return nil, fmt.Errorf("failed to get player score: %w", err)
81 }
82
83 // Get player details
84 playerKey := fmt.Sprintf("player:%s", playerID)
85 playerData, err := ssc.client.rdb.HGetAll(ctx, playerKey).Result()
86 if err != nil {
87 log.Printf("Failed to get player details: %v", err)
88 }
89
90 return &PlayerRank{
91 Rank: int(rank) + 1, // Convert to 1-based ranking
92 PlayerID: playerID,
93 Score: score,
94 Name: playerData["name"],
95 Avatar: playerData["avatar"],
96 }, nil
97}
98
99// Page through leaderboard efficiently
100func (ssc *SortedSetCache) GetLeaderboardPage(ctx context.Context, gameID string, page, pageSize int64) ([]PlayerRank, error) {
101 if page < 1 {
102 page = 1
103 }
104
105 key := fmt.Sprintf("leaderboard:%s", gameID)
106
107 // Calculate start and end indices
108 start := (page - 1) * pageSize
109 end := start + pageSize - 1
110
111 // Get players for this page
112 results, err := ssc.client.rdb.ZRevRangeWithScores(ctx, key, start, end).Result()
113 if err != nil {
114 return nil, fmt.Errorf("failed to get leaderboard page: %w", err)
115 }
116
117 players := make([]PlayerRank, len(results))
118 for i, result := range results {
119 rank := start + int64(i) + 1
120 playerID := result.Member.(string)
121 score := result.Score
122
123 players[i] = PlayerRank{
124 Rank: int(rank),
125 PlayerID: playerID,
126 Score: score,
127 }
128 }
129
130 return players, nil
131}
Lists: Queues and Time-Series Data
1// List operations for queues and recent activity feeds
2type ListCache struct {
3 client *RedisClient
4}
5
6func NewListCache(client *RedisClient) *ListCache {
7 return &ListCache{client: client}
8}
9
10// Recent activity feed implementation
11func (lc *ListCache) AddActivity(ctx context.Context, userID string, activity *Activity) error {
12 // Serialize activity
13 activityData, err := json.Marshal(activity)
14 if err != nil {
15 return fmt.Errorf("failed to serialize activity: %w", err)
16 }
17
18 // Add to user's activity feed
19 key := fmt.Sprintf("activity_feed:%s", userID)
20
21 err = lc.client.rdb.LPush(ctx, key, activityData).Err()
22 if err != nil {
23 return fmt.Errorf("failed to add activity: %w", err)
24 }
25
26 // Trim list to keep only most recent 100 activities
27 err = lc.client.rdb.LTrim(ctx, key, 0, 99).Err()
28 if err != nil {
29 return fmt.Errorf("failed to trim activity feed: %w", err)
30 }
31
32 // Set TTL on the entire list
33 return lc.client.rdb.Expire(ctx, key, 7*24*time.Hour).Err()
34}
35
36// Get recent activities for user
37func (lc *ListCache) GetRecentActivities(ctx context.Context, userID string, limit int64) ([]*Activity, error) {
38 key := fmt.Sprintf("activity_feed:%s", userID)
39
40 // Get recent activities
41 start := -limit // Get last 'limit' items
42 end := -1 // Until the end
43
44 activityData, err := lc.client.rdb.LRange(ctx, key, start, end).Result()
45 if err != nil {
46 return nil, fmt.Errorf("failed to get recent activities: %w", err)
47 }
48
49 activities := make([]*Activity, 0, len(activityData))
50
51 // Reverse order to show most recent first
52 for i := len(activityData) - 1; i >= 0; i-- {
53 var activity Activity
54 if err := json.Unmarshal([]byte(activityData[i]), &activity); err != nil {
55 log.Printf("Failed to unmarshal activity %d: %v", i, err)
56 continue
57 }
58 activities = append(activities, &activity)
59 }
60
61 return activities, nil
62}
63
64// Work queue implementation with blocking operations
65func (lc *ListCache) EnqueueTask(ctx context.Context, queueName string, task *Task) error {
66 // Serialize task
67 taskData, err := json.Marshal(task)
68 if err != nil {
69 return fmt.Errorf("failed to serialize task: %w", err)
70 }
71
72 // Add to queue
73 key := fmt.Sprintf("task_queue:%s", queueName)
74
75 err = lc.client.rdb.RPush(ctx, key, taskData).Err()
76 if err != nil {
77 return fmt.Errorf("failed to enqueue task: %w", err)
78 }
79
80 return nil
81}
82
83// Blocking dequeue for worker processes
84func (lc *ListCache) DequeueTask(ctx context.Context, queueName string, timeout time.Duration) (*Task, error) {
85 key := fmt.Sprintf("task_queue:%s", queueName)
86
87 // Blocking pop from front of list
88 result, err := lc.client.rdb.BLPop(ctx, timeout, key).Result()
89 if err != nil {
90 if err == redis.Nil {
91 return nil, nil // Timeout, no task available
92 }
93 return nil, fmt.Errorf("failed to dequeue task: %w", err)
94 }
95
96 // BLPOP returns [key, value]
97 if len(result) != 2 {
98 return nil, fmt.Errorf("unexpected BLPOP result format")
99 }
100
101 taskData := result[1]
102 var task Task
103 if err := json.Unmarshal([]byte(taskData), &task); err != nil {
104 return nil, fmt.Errorf("failed to unmarshal task: %w", err)
105 }
106
107 return &task, nil
108}
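To close the loop on the queue, here is a minimal worker sketch in the same package. The handler function and the 5-second poll timeout are placeholders you would tune for your workload.

// RunWorker consumes tasks from queueName until ctx is cancelled.
func RunWorker(ctx context.Context, lc *ListCache, queueName string, handle func(*Task) error) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		// Block for up to 5 seconds waiting for work.
		task, err := lc.DequeueTask(ctx, queueName, 5*time.Second)
		if err != nil {
			log.Printf("dequeue failed: %v", err)
			time.Sleep(time.Second) // back off before retrying
			continue
		}
		if task == nil {
			continue // timeout, poll again
		}

		if err := handle(task); err != nil {
			log.Printf("task failed: %v", err)
			// A production worker would re-enqueue or dead-letter the task here.
		}
	}
}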
Common Patterns and Pitfalls - Learning from Production Experience
Pattern 1: Cache-Aside
This is the most common and versatile caching pattern. Think of it like looking up a word in the dictionary - you check your personal notes first, and if it's not there, you look it up in the big dictionary and write it down for next time.
1type CacheAsideCache struct {
2 redis *RedisClient
3 database Database
4 ttl time.Duration
5}
6
7func NewCacheAsideCache(redis *RedisClient, db Database, ttl time.Duration) *CacheAsideCache {
8 return &CacheAsideCache{
9 redis: redis,
10 database: db,
11 ttl: ttl,
12 }
13}
14
15// Get implements the cache-aside read path: cache first, then database, then repopulate
16func (cac *CacheAsideCache) Get(ctx context.Context, key string) (interface{}, error) {
17 // 1. Try cache first
18 cached, err := cac.redis.Get(ctx, key)
19 if err == nil && cached != "" {
20 // Cache hit - return cached data
21 var data interface{}
22 if err := json.Unmarshal([]byte(cached), &data); err != nil {
23 log.Printf("Failed to unmarshal cached data for key %s: %v", key, err)
24 } else {
25 return data, nil
26 }
27 }
28
29 // 2. Cache miss - fetch from database
30 data, err := cac.database.Get(ctx, key)
31 if err != nil {
32 return nil, fmt.Errorf("database fetch failed: %w", err)
33 }
34
35 // 3. Populate cache for future requests
36 go func() {
37 // Fire-and-forget cache population
38 if serialized, err := json.Marshal(data); err == nil {
39 cac.redis.Set(context.Background(), key, serialized, cac.ttl)
40 }
41 }()
42
43 return data, nil
44}
45
46// Invalidate cache on data updates
47func (cac *CacheAsideCache) Invalidate(ctx context.Context, key string) error {
48 return cac.redis.Delete(ctx, key)
49}
50
51// Update with cache invalidation
52func (cac *CacheAsideCache) Set(ctx context.Context, key string, value interface{}) error {
53 // Update database first
54 if err := cac.database.Set(ctx, key, value); err != nil {
55 return fmt.Errorf("database update failed: %w", err)
56 }
57
58 // Invalidate cache
59 return cac.Invalidate(ctx, key)
60}
Pattern 2: Write-Through for Critical Data
Use this when you need guaranteed cache consistency and can tolerate the extra write latency.
1type WriteThroughCache struct {
2 redis *RedisClient
3 database Database
4 ttl time.Duration
5}
6
7func NewWriteThroughCache(redis *RedisClient, db Database, ttl time.Duration) *WriteThroughCache {
8 return &WriteThroughCache{
9 redis: redis,
10 database: db,
11 ttl: ttl,
12 }
13}
14
15// Set writes to both database and cache synchronously
16func (wtc *WriteThroughCache) Set(ctx context.Context, key string, value interface{}) error {
17 // Serialize value for cache
18 serialized, err := json.Marshal(value)
19 if err != nil {
20 return fmt.Errorf("failed to serialize value: %w", err)
21 }
22
23 // Write to database first
24 if err := wtc.database.Set(ctx, key, value); err != nil {
25 return fmt.Errorf("database write failed: %w", err)
26 }
27
28 // Write to cache
29 if err := wtc.redis.Set(ctx, key, serialized, wtc.ttl); err != nil {
30 // Cache write failed, but database succeeded
31 // Log error but don't fail the operation
32 log.Printf("Cache write failed for key %s: %v", key, err)
33 }
34
35 return nil
36}
37
38// Get always reads from cache first, then database on miss
39func (wtc *WriteThroughCache) Get(ctx context.Context, key string) (interface{}, error) {
40 // Try cache first
41 cached, err := wtc.redis.Get(ctx, key)
42 if err == nil && cached != "" {
43 var value interface{}
44 if err := json.Unmarshal([]byte(cached), &value); err == nil {
45 return value, nil
46 }
47 log.Printf("Failed to unmarshal cached data for key %s: %v", key, err)
48 }
49
50 // Cache miss - fetch from database
51 value, err := wtc.database.Get(ctx, key)
52 if err != nil {
53 return nil, fmt.Errorf("database fetch failed: %w", err)
54 }
55
56 // Populate cache for future reads
57 go func() {
58 if serialized, err := json.Marshal(value); err == nil {
59 wtc.redis.Set(context.Background(), key, serialized, wtc.ttl)
60 }
61 }()
62
63 return value, nil
64}
Pattern 3: Write-Behind for High Throughput
Perfect for high-write scenarios where you can tolerate slight data staleness. Think of it like writing changes in a notebook throughout the day and updating the master ledger once at night.
1type WriteBehindCache struct {
2 redis *RedisClient
3 database Database
4 ttl time.Duration
5 batchSize int
6 flushInterval time.Duration
7 buffer chan WriteOperation
8 stopCh chan struct{}
9 wg sync.WaitGroup
10}
11
12type WriteOperation struct {
13 Key string
14 Value interface{}
15 Op string // "SET", "DELETE", "INCR"
16}
17
18func NewWriteBehindCache(redis *RedisClient, db Database, ttl time.Duration, batchSize int, flushInterval time.Duration) *WriteBehindCache {
19 wbc := &WriteBehindCache{
20 redis: redis,
21 database: db,
22 ttl: ttl,
23 batchSize: batchSize,
24 flushInterval: flushInterval,
25 buffer: make(chan WriteOperation, 1000),
26 stopCh: make(chan struct{}),
27 }
28
29 // Start background flusher
30 wbc.wg.Add(1)
31 go wbc.backgroundFlusher()
32
33 return wbc
34}
35
36// Set writes to cache immediately, queues database write
37func (wbc *WriteBehindCache) Set(ctx context.Context, key string, value interface{}) error {
38 // Write to cache immediately
39 serialized, err := json.Marshal(value)
40 if err != nil {
41 return fmt.Errorf("failed to serialize value: %w", err)
42 }
43
44 if err := wbc.redis.Set(ctx, key, serialized, wbc.ttl); err != nil {
45 return fmt.Errorf("cache write failed: %w", err)
46 }
47
48 // Queue database write
49 select {
50 case wbc.buffer <- WriteOperation{Key: key, Value: value, Op: "SET"}:
51 return nil
52 case <-ctx.Done():
53 return ctx.Err()
54 default:
55 // Buffer full - return error or handle overflow
56 return fmt.Errorf("write buffer is full")
57 }
58}
59
60// backgroundFlusher processes queued writes
61func (wbc *WriteBehindCache) backgroundFlusher() {
62 defer wbc.wg.Done()
63
64 ticker := time.NewTicker(wbc.flushInterval)
65 defer ticker.Stop()
66
67 batch := make([]WriteOperation, 0, wbc.batchSize)
68
69 for {
70 select {
71 case op := <-wbc.buffer:
72 batch = append(batch, op)
73 if len(batch) >= wbc.batchSize {
74 wbc.flushBatch(batch)
75 batch = batch[:0] // Reset slice
76 }
77
78 case <-ticker.C:
79 if len(batch) > 0 {
80 wbc.flushBatch(batch)
81 batch = batch[:0]
82 }
83
84 case <-wbc.stopCh:
85 // Flush remaining operations before exiting
86 if len(batch) > 0 {
87 wbc.flushBatch(batch)
88 }
89 return
90 }
91 }
92}
93
94// flushBatch writes operations to database
95func (wbc *WriteBehindCache) flushBatch(batch []WriteOperation) {
96 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
97 defer cancel()
98
99 for _, op := range batch {
100 switch op.Op {
101 case "SET":
102 if err := wbc.database.Set(ctx, op.Key, op.Value); err != nil {
103 log.Printf("Failed to write %s to database: %v", op.Key, err)
104 }
105 case "DELETE":
106 if err := wbc.database.Delete(ctx, op.Key); err != nil {
107 log.Printf("Failed to delete %s from database: %v", op.Key, err)
108 }
109 case "INCR":
110 // Handle increment operations
111 if err := wbc.database.Increment(ctx, op.Key, op.Value.(int64)); err != nil {
112 log.Printf("Failed to increment %s in database: %v", op.Key, err)
113 }
114 }
115 }
116}
117
118func (wbc *WriteBehindCache) Close() error {
119 close(wbc.stopCh)
120 wbc.wg.Wait()
121 return nil
122}
Common Pitfalls and Solutions
Pitfall 1: Cache Stampede
When cache expires, multiple requests try to populate it simultaneously, overwhelming the backend system.
1// Solution: Single flight pattern with lock-based protection
2type StampedeProtection struct {
3 cache *RedisClient
4 locks map[string]*sync.Mutex
5 mu sync.RWMutex
6}
7
8func (sp *StampedeProtection) GetWithProtection(ctx context.Context, key string, loader func() (interface{}, error), ttl time.Duration) (interface{}, error) {
9 // Try cache first
10 cached, err := sp.cache.Get(ctx, key)
11 if err == nil && cached != "" {
12 var data interface{}
13 if err := json.Unmarshal([]byte(cached), &data); err == nil {
14 return data, nil
15 }
16 }
17
18 // Get or create lock for this key
19 sp.mu.Lock()
20 lock, exists := sp.locks[key]
21 if !exists {
22 lock = &sync.Mutex{}
23 sp.locks[key] = lock
24 }
25 sp.mu.Unlock()
26
27 // Acquire lock
28 lock.Lock()
29 defer lock.Unlock()
30
31 // Double-check cache after acquiring lock
32 cached, err = sp.cache.Get(ctx, key)
33 if err == nil && cached != "" {
34 var data interface{}
35 if err := json.Unmarshal([]byte(cached), &data); err == nil {
36 return data, nil
37 }
38 }
39
40 // Load data
41 data, err := loader()
42 if err != nil {
43 return nil, err
44 }
45
46 // Populate cache
47 serialized, _ := json.Marshal(data)
48 go sp.cache.Set(context.Background(), key, serialized, ttl)
49
50 return data, nil
51}
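Two notes on the sketch above: the locks map must be initialized with make before use, and entries are never removed, so the map grows with the key space. If an extra dependency is acceptable, golang.org/x/sync/singleflight handles request collapsing with less bookkeeping. A minimal sketch in the same package, assuming that module is imported:

// import "golang.org/x/sync/singleflight" is assumed.
type SingleFlightCache struct {
	cache *RedisClient
	group singleflight.Group
	ttl   time.Duration
}

// Get collapses concurrent loads of the same key into a single loader call.
func (sfc *SingleFlightCache) Get(ctx context.Context, key string, loader func() (interface{}, error)) (interface{}, error) {
	// Try the cache first.
	if cached, err := sfc.cache.Get(ctx, key); err == nil && cached != "" {
		var data interface{}
		if err := json.Unmarshal([]byte(cached), &data); err == nil {
			return data, nil
		}
	}

	// All concurrent callers for this key share the result of one load.
	v, err, _ := sfc.group.Do(key, func() (interface{}, error) {
		data, err := loader()
		if err != nil {
			return nil, err
		}
		if serialized, err := json.Marshal(data); err == nil {
			_ = sfc.cache.Set(context.Background(), key, serialized, sfc.ttl)
		}
		return data, nil
	})
	return v, err
}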
Pitfall 2: Poor Cache Key Design
Inconsistent cache keys lead to duplicate data and cache misses.
1// BAD: Inconsistent key patterns
2userKey1 := "user:123"
3userKey2 := "123:user"
4userKey3 := "user_profile_123"
5
6// GOOD: Consistent, predictable key patterns
7const (
8 UserKeyPrefix = "user:"
9 UserProfilePrefix = "user_profile:"
10 UserSettingsPrefix = "user_settings:"
11)
12
13func UserKey(userID string) string {
14 return fmt.Sprintf("%s%s", UserKeyPrefix, userID)
15}
16
17func UserProfileKey(userID string) string {
18 return fmt.Sprintf("%s%s", UserProfilePrefix, userID)
19}
20
21// Use constants to avoid typos and ensure consistency
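A related trick, sketched below using the same constants: embed a cache schema version in the key, so a change to the serialized format (or a bulk invalidation) only requires bumping one constant while the old keys age out via TTL.

// CacheSchemaVersion is bumped whenever the cached representation changes;
// reads simply miss on the new prefix and stale keys expire on their own.
const CacheSchemaVersion = "v1"

func VersionedUserKey(userID string) string {
	return fmt.Sprintf("%s:%s%s", CacheSchemaVersion, UserKeyPrefix, userID)
}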
Pitfall 3: No TTL Management
Without proper TTL management, cache grows indefinitely and memory fills up.
1// Solution: Automatic TTL based on data type and access patterns
2type TTLManager struct {
3 DefaultTTL time.Duration
4 UserTTL time.Duration
5 ConfigTTL time.Duration
6 SessionTTL time.Duration
7}
8
9func NewTTLManager() *TTLManager {
10 return &TTLManager{
11 DefaultTTL: 5 * time.Minute,
12 UserTTL: 30 * time.Minute,
13 ConfigTTL: 1 * time.Hour,
14 SessionTTL: 24 * time.Hour,
15 }
16}
17
18func (tm *TTLManager) GetTTL(dataType string) time.Duration {
19 switch dataType {
20 case "user":
21 return tm.UserTTL
22 case "config":
23 return tm.ConfigTTL
24 case "session":
25 return tm.SessionTTL
26 default:
27 return tm.DefaultTTL
28 }
29}
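One refinement worth adding: if many keys are written at the same moment with the same TTL, they also expire at the same moment and produce a synchronized wave of misses. A small random jitter spreads expirations out; the ±10% below is an arbitrary choice, and math/rand is assumed to be imported.

// GetTTLWithJitter returns the base TTL for a data type randomized by roughly
// ±10% so that keys written together do not all expire together.
func (tm *TTLManager) GetTTLWithJitter(dataType string) time.Duration {
	base := tm.GetTTL(dataType)
	if base <= 0 {
		return base
	}
	jitter := time.Duration(rand.Int63n(int64(base / 5))) // 0-20% of base
	return base - base/10 + jitter                        // result is base ±10%
}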
Integration and Mastery - Building Complete Caching Systems
Multi-Level Caching Architecture
Combine local and distributed caching for optimal performance:
1type MultiLevelCache struct {
2 localCache *LocalCache // In-process L1 cache
3 redisCache *RedisClient // Distributed L2 cache
4 database Database // L3 data source
5 localTTL time.Duration
6 redisTTL time.Duration
7}
8
9func NewMultiLevelCache(redis *RedisClient, db Database) *MultiLevelCache {
10 return &MultiLevelCache{
11 localCache: NewLocalCache(1000), // 1000 items max
12 redisCache: redis,
13 database: db,
14 localTTL: 5 * time.Minute, // Local cache TTL
15 redisTTL: 30 * time.Minute, // Redis cache TTL
16 }
17}
18
19func (mlc *MultiLevelCache) Get(ctx context.Context, key string) (interface{}, error) {
20 // L1: Check local cache
21 if value, found := mlc.localCache.Get(key); found {
22 return value, nil
23 }
24
25 // L2: Check Redis cache
26 cached, err := mlc.redisCache.Get(ctx, key)
27 if err == nil && cached != "" {
28 var value interface{}
29 if err := json.Unmarshal([]byte(cached), &value); err == nil {
30 // Promote to local cache
31 mlc.localCache.Set(key, value, mlc.localTTL)
32 return value, nil
33 }
34 }
35
36 // L3: Fetch from database
37 value, err := mlc.database.Get(ctx, key)
38 if err != nil {
39 return nil, fmt.Errorf("database fetch failed: %w", err)
40 }
41
42 // Populate both cache levels
43 serialized, _ := json.Marshal(value)
44
45 // Update Redis
46 go mlc.redisCache.Set(context.Background(), key, serialized, mlc.redisTTL)
47
48 // Update local cache
49 mlc.localCache.Set(key, value, mlc.localTTL)
50
51 return value, nil
52}
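The LocalCache used above is not shown. A minimal sketch of an in-process cache with per-entry TTLs and a crude size cap follows; a production version would use a proper LRU implementation rather than evicting an arbitrary entry when full.

type localEntry struct {
	value     interface{}
	expiresAt time.Time
}

type LocalCache struct {
	mu      sync.RWMutex
	items   map[string]localEntry
	maxSize int
}

func NewLocalCache(maxSize int) *LocalCache {
	return &LocalCache{items: make(map[string]localEntry), maxSize: maxSize}
}

func (lc *LocalCache) Get(key string) (interface{}, bool) {
	lc.mu.RLock()
	entry, ok := lc.items[key]
	lc.mu.RUnlock()
	if !ok || time.Now().After(entry.expiresAt) {
		return nil, false
	}
	return entry.value, true
}

func (lc *LocalCache) Set(key string, value interface{}, ttl time.Duration) {
	lc.mu.Lock()
	defer lc.mu.Unlock()
	// Crude eviction: drop an arbitrary entry when full (not LRU).
	if len(lc.items) >= lc.maxSize {
		for k := range lc.items {
			delete(lc.items, k)
			break
		}
	}
	lc.items[key] = localEntry{value: value, expiresAt: time.Now().Add(ttl)}
}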
Cache Warming and Population
1type CacheWarmer struct {
2 redis *RedisClient
3 database Database
4 workers int
5}
6
7func NewCacheWarmer(redis *RedisClient, db Database, workers int) *CacheWarmer {
8 return &CacheWarmer{
9 redis: redis,
10 database: db,
11 workers: workers,
12 }
13}
14
15// Warm cache with frequently accessed data
16func (cw *CacheWarmer) WarmCache(ctx context.Context) error {
17 // Get list of frequently accessed keys
18 frequentKeys, err := cw.database.GetFrequentKeys(ctx, 1000)
19 if err != nil {
20 return fmt.Errorf("failed to get frequent keys: %w", err)
21 }
22
23 // Process keys in parallel
24 keyChan := make(chan string, len(frequentKeys))
25 for _, key := range frequentKeys {
26 keyChan <- key
27 }
28 close(keyChan)
29
30 var wg sync.WaitGroup
31 for i := 0; i < cw.workers; i++ {
32 wg.Add(1)
33 go cw.worker(ctx, keyChan, &wg)
34 }
35
36 wg.Wait()
37 return nil
38}
39
40func (cw *CacheWarmer) worker(ctx context.Context, keyChan <-chan string, wg *sync.WaitGroup) {
41 defer wg.Done()
42
43 for key := range keyChan {
44 select {
45 case <-ctx.Done():
46 return
47 default:
48 // Load data from database
49 data, err := cw.database.Get(ctx, key)
50 if err != nil {
51 log.Printf("Failed to load data for key %s: %v", key, err)
52 continue
53 }
54
55 // Cache the data
56 serialized, err := json.Marshal(data)
57 if err != nil {
58 log.Printf("Failed to serialize data for key %s: %v", key, err)
59 continue
60 }
61
62 // Set appropriate TTL based on data type
63 ttl := cw.getTTLForData(key)
64 if err := cw.redis.Set(ctx, key, serialized, ttl); err != nil {
65 log.Printf("Failed to cache data for key %s: %v", key, err)
66 }
67 }
68 }
69}
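The getTTLForData helper referenced above is not shown either; a plausible sketch keyed off the key prefix (the prefixes are assumptions matching the earlier examples, and strings must be imported) looks like this:

// getTTLForData picks a TTL from the key's prefix; prefixes are illustrative.
func (cw *CacheWarmer) getTTLForData(key string) time.Duration {
	switch {
	case strings.HasPrefix(key, "user:"):
		return 30 * time.Minute
	case strings.HasPrefix(key, "config:"):
		return 1 * time.Hour
	case strings.HasPrefix(key, "session:"):
		return 24 * time.Hour
	default:
		return 5 * time.Minute
	}
}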
Practice Exercises
Exercise 1: Implement Exactly-Once Cache Pattern
Build a cache that prevents duplicate cache population and handles concurrent access safely.
Solution:
1package exercise
2
3import (
4 "context"
5 "sync"
6 "time"
7)
8
9type ExactlyOnceCache struct {
10 cache map[string]*cacheEntry
11 locks map[string]*sync.Mutex
12 mu sync.RWMutex
13 defaultTTL time.Duration
14}
15
16type cacheEntry struct {
17 value interface{}
18 expiresAt time.Time
19 loading bool
20 cond *sync.Cond
21}
22
23func NewExactlyOnceCache(defaultTTL time.Duration) *ExactlyOnceCache {
24 return &ExactlyOnceCache{
25 cache: make(map[string]*cacheEntry),
26 locks: make(map[string]*sync.Mutex),
27 defaultTTL: defaultTTL,
28 }
29}
30
31func (eoc *ExactlyOnceCache) GetOrLoad(ctx context.Context, key string, loader func() (interface{}, error)) (interface{}, error) {
32 eoc.mu.RLock()
33 entry, exists := eoc.cache[key]
34 eoc.mu.RUnlock()
35
36 // Fast path: entry exists and not expired
37 if exists && !entry.expiresAt.Before(time.Now()) {
38 return entry.value, nil
39 }
40
41 // Get or create lock for this key
42 eoc.mu.Lock()
43 lock, lockExists := eoc.locks[key]
44 if !lockExists {
45 lock = &sync.Mutex{}
46 eoc.locks[key] = lock
47 }
48 eoc.mu.Unlock()
49
50 // Acquire lock
51 lock.Lock()
52 defer lock.Unlock()
53
54 // Double-check after acquiring lock
55 eoc.mu.RLock()
56 entry, exists = eoc.cache[key]
57 eoc.mu.RUnlock()
58
59 if exists && !entry.expiresAt.Before(time.Now()) {
60 return entry.value, nil
61 }
62
63 // Create or update entry for loading
64 if !exists {
65 entry = &cacheEntry{
66 expiresAt: time.Now().Add(eoc.defaultTTL),
67 cond: sync.NewCond(&sync.Mutex{}),
68 }
69 eoc.mu.Lock()
70 eoc.cache[key] = entry
71 eoc.mu.Unlock()
72 }
73
74 // Handle concurrent loading
75 if entry.loading {
76 // Wait for other goroutine to finish loading
77 entry.cond.L.Lock()
78 for entry.loading {
79 entry.cond.Wait()
80 }
81 entry.cond.L.Unlock()
82
83 // Check if loading succeeded
84 if !entry.expiresAt.Before(time.Now()) {
85 return entry.value, nil
86 }
87 return nil, fmt.Errorf("cache loading failed")
88 }
89
90 // This goroutine will load the value
91 entry.loading = true
92 entry.cond.L.Lock()
93
94 // Load value
95 value, err := loader()
96
97 // Update entry
98 eoc.mu.Lock()
99 if err != nil {
100 // Loading failed - remove entry
101 delete(eoc.cache, key)
102 delete(eoc.locks, key)
103 } else {
104 entry.value = value
105 entry.expiresAt = time.Now().Add(eoc.defaultTTL)
106 }
107 eoc.mu.Unlock()
108
109 // Signal waiting goroutines
110 entry.loading = false
111 entry.cond.Broadcast()
112 entry.cond.L.Unlock()
113
114 if err != nil {
115 return nil, err
116 }
117
118 return value, nil
119}
Exercise 2: Build Intelligent Cache Eviction
Implement a cache with multiple eviction policies and automatic cleanup.
Solution:
1package exercise
2
3import (
4 "container/heap"
5 "sync"
6 "time"
7)
8
9type EvictionPolicy int
10
11const (
12 EvictLRU EvictionPolicy = iota
13 EvictLFU
14 EvictTTL
15)
16
17type IntelligentCache struct {
18 items map[string]*cacheItem
19 lruQueue *lruQueue
20 lfuHeap *lfuHeap
21 maxSize int
22 policy EvictionPolicy
23 mu sync.RWMutex
24}
25
26type cacheItem struct {
27 key string
28 value interface{}
29 accessAt time.Time
30 frequency int
31 expiresAt time.Time
32 lruIndex int
33}
34
35type lruQueue struct {
36 keys []string
37 index map[string]int
38}
39
40type lfuItem struct {
41 key string
42 frequency int
43 index int
44}
45
46type lfuHeap []*lfuItem
47
48func NewIntelligentCache(maxSize int, policy EvictionPolicy) *IntelligentCache {
49 cache := &IntelligentCache{
50 items: make(map[string]*cacheItem),
51 lruQueue: &lruQueue{index: make(map[string]int)},
52 lfuHeap: make(lfuHeap, 0),
53 maxSize: maxSize,
54 policy: policy,
55 }
56 heap.Init(&cache.lfuHeap)
57 return cache
58}
59
60func (ic *IntelligentCache) Set(key string, value interface{}, ttl time.Duration) {
61 ic.mu.Lock()
62 defer ic.mu.Unlock()
63
64 now := time.Now()
65 expiresAt := now.Add(ttl)
66
67 if item, exists := ic.items[key]; exists {
68 // Update existing item
69 item.value = value
70 item.accessAt = now
71 item.frequency++
72 item.expiresAt = expiresAt
73
74 ic.updateStructures(item, key)
75 } else {
76 // Add new item
77 item := &cacheItem{
78 key: key,
79 value: value,
80 accessAt: now,
81 frequency: 1,
82 expiresAt: expiresAt,
83 }
84
85 ic.items[key] = item
86
87 // Check if eviction needed
88 if len(ic.items) > ic.maxSize {
89 ic.evict()
90 }
91
92 ic.addToStructures(item)
93 }
94}
95
96func (ic *IntelligentCache) Get(key string) (interface{}, bool) {
97 ic.mu.Lock()
98 defer ic.mu.Unlock()
99
100 item, exists := ic.items[key]
101 if !exists {
102 return nil, false
103 }
104
105 // Check expiration
106 if time.Now().After(item.expiresAt) {
107 ic.removeItem(key)
108 return nil, false
109 }
110
111 // Update access information
112 now := time.Now()
113 item.accessAt = now
114 item.frequency++
115
116 ic.updateStructures(item, key)
117
118 return item.value, true
119}
120
121func (ic *IntelligentCache) evict() {
122 switch ic.policy {
123 case EvictLRU:
124 ic.evictLRU()
125 case EvictLFU:
126 ic.evictLFU()
127 case EvictTTL:
128 ic.evictTTL()
129 }
130}
131
132func (ic *IntelligentCache) evictLRU() {
133 if len(ic.lruQueue.keys) == 0 {
134 return
135 }
136
137 // Remove least recently used item
138 key := ic.lruQueue.keys[0]
139 ic.removeItem(key)
140}
141
142func (ic *IntelligentCache) evictLFU() {
143 if len(ic.lfuHeap) == 0 {
144 return
145 }
146
147 // Remove least frequently used item
148 item := heap.Pop(&ic.lfuHeap).(*lfuItem)
149 ic.removeItem(item.key)
150}
151
152func (ic *IntelligentCache) evictTTL() {
153 now := time.Now()
154 oldestKey := ""
155 oldestTime := now
156
157 for key, item := range ic.items {
158 if item.expiresAt.Before(oldestTime) {
159 oldestTime = item.expiresAt
160 oldestKey = key
161 }
162 }
163
164 if oldestKey != "" {
165 ic.removeItem(oldestKey)
166 }
167}
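The solution above relies on heap.Init and heap.Pop, which require lfuHeap to implement heap.Interface, and it references updateStructures, addToStructures, and removeItem helpers that are not shown. A sketch of the heap methods follows; the helpers are left as part of the exercise.

func (h lfuHeap) Len() int           { return len(h) }
func (h lfuHeap) Less(i, j int) bool { return h[i].frequency < h[j].frequency }

func (h lfuHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}

func (h *lfuHeap) Push(x interface{}) {
	item := x.(*lfuItem)
	item.index = len(*h)
	*h = append(*h, item)
}

func (h *lfuHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // avoid holding a reference to the popped item
	*h = old[:n-1]
	return item
}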
Exercise 3: Implement Distributed Lock Manager
Build a distributed lock system using Redis with proper lock renewal and deadlock prevention.
Solution:
1package exercise
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/hex"
7 "fmt"
8 "time"
9)
10
11type DistributedLock struct {
12 redis *RedisClient
13 key string
14 value string
15 ttl time.Duration
16 renewCh chan struct{}
17 stopCh chan struct{}
18}
19
20func NewDistributedLock(redis *RedisClient, key string, ttl time.Duration) *DistributedLock {
21 // Generate unique lock value
22 valueBytes := make([]byte, 16)
23 rand.Read(valueBytes)
24 value := hex.EncodeToString(valueBytes)
25
26 return &DistributedLock{
27 redis: redis,
28 key: key,
29 value: value,
30 ttl: ttl,
31 renewCh: make(chan struct{}),
32 stopCh: make(chan struct{}),
33 }
34}
35
36func (dl *DistributedLock) Acquire(ctx context.Context) (bool, error) {
37 // Try to acquire lock with SET NX EX
38 success, err := dl.redis.rdb.SetNX(ctx, dl.key, dl.value, dl.ttl).Result()
39 if err != nil {
40 return false, fmt.Errorf("failed to acquire lock: %w", err)
41 }
42
43 if !success {
44 return false, nil // Lock not available
45 }
46
47 // Start renewal goroutine
48 go dl.renewLock()
49
50 return true, nil
51}
52
53func (dl *DistributedLock) renewLock() {
54 ticker := time.NewTicker(dl.ttl / 3) // Renew at 1/3 of TTL
55 defer ticker.Stop()
56
57 for {
58 select {
59 case <-ticker.C:
60 // Renew lock using Lua script
61 script := redis.NewScript(`
62 if redis.call('GET', KEYS[1]) == ARGV[1] then
63 return redis.call('EXPIRE', KEYS[1], ARGV[2])
64 else
65 return 0
66 end
67 `)
68
69 result, err := script.Run(context.Background(), dl.redis.rdb,
70 []string{dl.key}, dl.value, int(dl.ttl.Seconds())).Result()
71
72 if err != nil || result.(int64) == 0 {
73 // Lock lost or renewal failed
74 close(dl.stopCh)
75 return
76 }
77
78 case <-dl.stopCh:
79 return
80 }
81 }
82}
83
84func (dl *DistributedLock) Release(ctx context.Context) error {
85 close(dl.stopCh)
86
87 // Release lock using Lua script to ensure atomicity
88 script := redis.NewScript(`
89 if redis.call('GET', KEYS[1]) == ARGV[1] then
90 return redis.call('DEL', KEYS[1])
91 else
92 return 0
93 end
94 `)
95
96 result, err := script.Run(ctx, dl.redis.rdb, []string{dl.key}, dl.value).Result()
97 if err != nil {
98 return fmt.Errorf("failed to release lock: %w", err)
99 }
100
101 if result.(int64) == 0 {
102 return fmt.Errorf("lock not owned by this instance")
103 }
104
105 return nil
106}
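A usage sketch, with an illustrative key and TTL: acquire the lock, do the exclusive work only if acquisition succeeded, and always release. Note that both renewLock and Release close stopCh, so a production version should guard that close with sync.Once to avoid a double-close panic.

func ExampleLockUsage(ctx context.Context, client *RedisClient) error {
	// "locks:nightly-report" and the 30-second TTL are illustrative values.
	lock := NewDistributedLock(client, "locks:nightly-report", 30*time.Second)

	acquired, err := lock.Acquire(ctx)
	if err != nil {
		return err
	}
	if !acquired {
		return fmt.Errorf("another instance holds the lock")
	}
	defer func() { _ = lock.Release(context.Background()) }()

	// ... perform the work that must not run concurrently ...
	return nil
}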
Further Reading
- Redis Documentation
- Redis Best Practices
- Caching Strategies and Patterns
- go-redis Documentation
- Designing Data-Intensive Applications by Martin Kleppmann
Exercise 4: Implement Redis Cluster with Consistent Hashing
Build a Redis cluster client that uses consistent hashing for automatic failover and load balancing across multiple Redis nodes.
Requirements:
- Consistent hashing implementation for key distribution
- Automatic failover when nodes fail
- Health checking and node discovery
- Support for adding/removing nodes dynamically
- Connection pooling for each node
Solution:
1package exercise
2
3import (
4 "context"
5 "crypto/md5"
6 "encoding/binary"
7 "fmt"
8 "log"
9 "sort"
10 "sync"
11 "time"
12
13 "github.com/redis/go-redis/v9"
14)
15
16// ConsistentHash implements consistent hashing for distributing keys
17type ConsistentHash struct {
18 circle map[uint32]string
19 sortedHashes []uint32
20 nodes map[string]*redis.Client
21 virtualNodes int
22 mu sync.RWMutex
23}
24
25// RedisClusterClient manages connections to multiple Redis nodes
26type RedisClusterClient struct {
27 hash *ConsistentHash
28 healthChecker *HealthChecker
29 config *ClusterConfig
30}
31
32type ClusterConfig struct {
33 Nodes []NodeConfig
34 VirtualNodes int
35 HealthCheckInterval time.Duration
36 MaxRetries int
37 DialTimeout time.Duration
38 ReadTimeout time.Duration
39 WriteTimeout time.Duration
40}
41
42type NodeConfig struct {
43 Addr string
44 Password string
45 DB int
46}
47
48func NewConsistentHash(virtualNodes int) *ConsistentHash {
49 return &ConsistentHash{
50 circle: make(map[uint32]string),
51 sortedHashes: make([]uint32, 0),
52 nodes: make(map[string]*redis.Client),
53 virtualNodes: virtualNodes,
54 }
55}
56
57// AddNode adds a Redis node to the consistent hash ring
58func (ch *ConsistentHash) AddNode(addr string, client *redis.Client) {
59 ch.mu.Lock()
60 defer ch.mu.Unlock()
61
62 // Add virtual nodes to the ring
63 for i := 0; i < ch.virtualNodes; i++ {
64 hash := ch.hashKey(fmt.Sprintf("%s:%d", addr, i))
65 ch.circle[hash] = addr
66 ch.sortedHashes = append(ch.sortedHashes, hash)
67 }
68
69 // Sort hashes for binary search
70 sort.Slice(ch.sortedHashes, func(i, j int) bool {
71 return ch.sortedHashes[i] < ch.sortedHashes[j]
72 })
73
74 ch.nodes[addr] = client
75 log.Printf("Added node %s to consistent hash ring with %d virtual nodes", addr, ch.virtualNodes)
76}
77
78// RemoveNode removes a Redis node from the consistent hash ring
79func (ch *ConsistentHash) RemoveNode(addr string) {
80 ch.mu.Lock()
81 defer ch.mu.Unlock()
82
83 // Remove virtual nodes from the ring
84 for i := 0; i < ch.virtualNodes; i++ {
85 hash := ch.hashKey(fmt.Sprintf("%s:%d", addr, i))
86 delete(ch.circle, hash)
87 }
88
89 // Rebuild sorted hashes
90 ch.sortedHashes = make([]uint32, 0, len(ch.circle))
91 for hash := range ch.circle {
92 ch.sortedHashes = append(ch.sortedHashes, hash)
93 }
94 sort.Slice(ch.sortedHashes, func(i, j int) bool {
95 return ch.sortedHashes[i] < ch.sortedHashes[j]
96 })
97
98 // Close client connection
99 if client, exists := ch.nodes[addr]; exists {
100 client.Close()
101 delete(ch.nodes, addr)
102 }
103
104 log.Printf("Removed node %s from consistent hash ring", addr)
105}
106
107// GetNode returns the Redis client for a given key
108func (ch *ConsistentHash) GetNode(key string) (*redis.Client, error) {
109 ch.mu.RLock()
110 defer ch.mu.RUnlock()
111
112 if len(ch.circle) == 0 {
113 return nil, fmt.Errorf("no nodes available")
114 }
115
116 hash := ch.hashKey(key)
117
118 // Binary search to find the node
119 idx := sort.Search(len(ch.sortedHashes), func(i int) bool {
120 return ch.sortedHashes[i] >= hash
121 })
122
123 // Wrap around if necessary
124 if idx >= len(ch.sortedHashes) {
125 idx = 0
126 }
127
128 nodeAddr := ch.circle[ch.sortedHashes[idx]]
129 client, exists := ch.nodes[nodeAddr]
130 if !exists {
131 return nil, fmt.Errorf("node %s not found", nodeAddr)
132 }
133
134 return client, nil
135}
136
137// hashKey generates a hash for a key using MD5
138func (ch *ConsistentHash) hashKey(key string) uint32 {
139 hasher := md5.New()
140 hasher.Write([]byte(key))
141 hashBytes := hasher.Sum(nil)
142 return binary.BigEndian.Uint32(hashBytes)
143}
144
145// GetAllNodes returns all active Redis clients
146func (ch *ConsistentHash) GetAllNodes() []*redis.Client {
147 ch.mu.RLock()
148 defer ch.mu.RUnlock()
149
150 clients := make([]*redis.Client, 0, len(ch.nodes))
151 for _, client := range ch.nodes {
152 clients = append(clients, client)
153 }
154 return clients
155}
156
157// HealthChecker monitors node health
158type HealthChecker struct {
159 cluster *RedisClusterClient
160 interval time.Duration
161 stopCh chan struct{}
162}
163
164func NewHealthChecker(cluster *RedisClusterClient, interval time.Duration) *HealthChecker {
165 return &HealthChecker{
166 cluster: cluster,
167 interval: interval,
168 stopCh: make(chan struct{}),
169 }
170}
171
172// Start begins health checking
173func (hc *HealthChecker) Start() {
174 go hc.healthCheckLoop()
175}
176
177// Stop stops health checking
178func (hc *HealthChecker) Stop() {
179 close(hc.stopCh)
180}
181
182func (hc *HealthChecker) healthCheckLoop() {
183 ticker := time.NewTicker(hc.interval)
184 defer ticker.Stop()
185
186 for {
187 select {
188 case <-hc.stopCh:
189 return
190 case <-ticker.C:
191 hc.checkAllNodes()
192 }
193 }
194}
195
196func (hc *HealthChecker) checkAllNodes() {
197 clients := hc.cluster.hash.GetAllNodes()
198
199 for _, client := range clients {
200 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
201 err := client.Ping(ctx).Err()
202 cancel()
203
204 if err != nil {
205 log.Printf("Health check failed for node: %v", err)
206 // Remove unhealthy node
207 // hc.cluster.hash.RemoveNode(addr)
208 }
209 }
210}
211
212func NewRedisClusterClient(config *ClusterConfig) (*RedisClusterClient, error) {
213 hash := NewConsistentHash(config.VirtualNodes)
214
215 // Create clients for all nodes
216 for _, nodeConfig := range config.Nodes {
217 client := redis.NewClient(&redis.Options{
218 Addr: nodeConfig.Addr,
219 Password: nodeConfig.Password,
220 DB: nodeConfig.DB,
221 DialTimeout: config.DialTimeout,
222 ReadTimeout: config.ReadTimeout,
223 WriteTimeout: config.WriteTimeout,
224 MaxRetries: config.MaxRetries,
225 PoolSize: 10,
226 MinIdleConns: 5,
227 })
228
229 // Test connection
230 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
231 err := client.Ping(ctx).Err()
232 cancel()
233
234 if err != nil {
235 log.Printf("Failed to connect to node %s: %v", nodeConfig.Addr, err)
236 continue
237 }
238
239 hash.AddNode(nodeConfig.Addr, client)
240 }
241
242 if len(hash.nodes) == 0 {
243 return nil, fmt.Errorf("no Redis nodes available")
244 }
245
246 cluster := &RedisClusterClient{
247 hash: hash,
248 config: config,
249 }
250
251 // Start health checker
252 cluster.healthChecker = NewHealthChecker(cluster, config.HealthCheckInterval)
253 cluster.healthChecker.Start()
254
255 log.Printf("Redis cluster client initialized with %d nodes", len(hash.nodes))
256 return cluster, nil
257}
258
259// Set sets a key-value pair in the cluster
260func (rcc *RedisClusterClient) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
261 client, err := rcc.hash.GetNode(key)
262 if err != nil {
263 return fmt.Errorf("failed to get node for key %s: %w", key, err)
264 }
265
266 return client.Set(ctx, key, value, ttl).Err()
267}
268
269// Get retrieves a value from the cluster
270func (rcc *RedisClusterClient) Get(ctx context.Context, key string) (string, error) {
271 client, err := rcc.hash.GetNode(key)
272 if err != nil {
273 return "", fmt.Errorf("failed to get node for key %s: %w", key, err)
274 }
275
276 val, err := client.Get(ctx, key).Result()
277 if err == redis.Nil {
278 return "", nil
279 }
280 return val, err
281}
282
283// Delete removes a key from the cluster
284func (rcc *RedisClusterClient) Delete(ctx context.Context, key string) error {
285 client, err := rcc.hash.GetNode(key)
286 if err != nil {
287 return fmt.Errorf("failed to get node for key %s: %w", key, err)
288 }
289
290 return client.Del(ctx, key).Err()
291}
292
293// MGet retrieves multiple keys from the cluster
294func (rcc *RedisClusterClient) MGet(ctx context.Context, keys []string) (map[string]string, error) {
295 // Group keys by node
296 nodeKeys := make(map[*redis.Client][]string)
297 keyToNode := make(map[string]*redis.Client)
298
299 for _, key := range keys {
300 client, err := rcc.hash.GetNode(key)
301 if err != nil {
302 log.Printf("Failed to get node for key %s: %v", key, err)
303 continue
304 }
305
306 nodeKeys[client] = append(nodeKeys[client], key)
307 keyToNode[key] = client
308 }
309
310 // Execute MGet for each node in parallel
311 var wg sync.WaitGroup
312 resultCh := make(chan map[string]string, len(nodeKeys))
313 errorCh := make(chan error, len(nodeKeys))
314
315 for client, nodeKeyList := range nodeKeys {
316 wg.Add(1)
317 go func(c *redis.Client, keys []string) {
318 defer wg.Done()
319
320 results, err := c.MGet(ctx, keys...).Result()
321 if err != nil {
322 errorCh <- err
323 return
324 }
325
326 resultMap := make(map[string]string)
327 for i, key := range keys {
328 if results[i] != nil {
329 resultMap[key] = results[i].(string)
330 }
331 }
332 resultCh <- resultMap
333 }(client, nodeKeyList)
334 }
335
336 wg.Wait()
337 close(resultCh)
338 close(errorCh)
339
340 // Check for errors
341 if len(errorCh) > 0 {
342 return nil, <-errorCh
343 }
344
345 // Merge results
346 finalResults := make(map[string]string)
347 for result := range resultCh {
348 for k, v := range result {
349 finalResults[k] = v
350 }
351 }
352
353 return finalResults, nil
354}
355
356// Pipeline executes multiple commands in a pipeline
357func (rcc *RedisClusterClient) Pipeline(ctx context.Context, commands []PipelineCommand) error {
358 // Group commands by node
359 nodeCommands := make(map[*redis.Client][]PipelineCommand)
360
361 for _, cmd := range commands {
362 client, err := rcc.hash.GetNode(cmd.Key)
363 if err != nil {
364 log.Printf("Failed to get node for key %s: %v", cmd.Key, err)
365 continue
366 }
367
368 nodeCommands[client] = append(nodeCommands[client], cmd)
369 }
370
371 // Execute pipelines for each node
372 var wg sync.WaitGroup
373 errorCh := make(chan error, len(nodeCommands))
374
375 for client, cmds := range nodeCommands {
376 wg.Add(1)
377 go func(c *redis.Client, commands []PipelineCommand) {
378 defer wg.Done()
379
380 pipe := c.Pipeline()
381 for _, cmd := range commands {
382 switch cmd.Op {
383 case "SET":
384 pipe.Set(ctx, cmd.Key, cmd.Value, cmd.TTL)
385 case "GET":
386 pipe.Get(ctx, cmd.Key)
387 case "DEL":
388 pipe.Del(ctx, cmd.Key)
389 }
390 }
391
392 _, err := pipe.Exec(ctx)
393 if err != nil {
394 errorCh <- err
395 }
396 }(client, cmds)
397 }
398
399 wg.Wait()
400 close(errorCh)
401
402 if len(errorCh) > 0 {
403 return <-errorCh
404 }
405
406 return nil
407}
408
409type PipelineCommand struct {
410 Op string
411 Key string
412 Value interface{}
413 TTL time.Duration
414}
415
416// Close closes all Redis connections
417func (rcc *RedisClusterClient) Close() error {
418 rcc.healthChecker.Stop()
419
420 clients := rcc.hash.GetAllNodes()
421 for _, client := range clients {
422 if err := client.Close(); err != nil {
423 log.Printf("Error closing Redis client: %v", err)
424 }
425 }
426
427 return nil
428}
429
430// Example usage
431func ExampleRedisCluster() {
432 config := &ClusterConfig{
433 Nodes: []NodeConfig{
434 {Addr: "localhost:6379", Password: "", DB: 0},
435 {Addr: "localhost:6380", Password: "", DB: 0},
436 {Addr: "localhost:6381", Password: "", DB: 0},
437 },
438 VirtualNodes: 150,
439 HealthCheckInterval: 10 * time.Second,
440 MaxRetries: 3,
441 DialTimeout: 5 * time.Second,
442 ReadTimeout: 3 * time.Second,
443 WriteTimeout: 3 * time.Second,
444 }
445
446 cluster, err := NewRedisClusterClient(config)
447 if err != nil {
448 log.Fatal(err)
449 }
450 defer cluster.Close()
451
452 ctx := context.Background()
453
454 // Set values
455 for i := 0; i < 100; i++ {
456 key := fmt.Sprintf("key:%d", i)
457 value := fmt.Sprintf("value:%d", i)
458 if err := cluster.Set(ctx, key, value, 1*time.Hour); err != nil {
459 log.Printf("Failed to set %s: %v", key, err)
460 }
461 }
462
463 // Get values
464 val, err := cluster.Get(ctx, "key:42")
465 if err != nil {
466 log.Printf("Failed to get key:42: %v", err)
467 } else {
468 log.Printf("key:42 = %s", val)
469 }
470
471 // MGet example
472 keys := []string{"key:1", "key:2", "key:3", "key:4", "key:5"}
473 results, err := cluster.MGet(ctx, keys)
474 if err != nil {
475 log.Printf("MGet failed: %v", err)
476 } else {
477 log.Printf("MGet results: %+v", results)
478 }
479}
Exercise 5: Build Cache-Aside Pattern with Cache Warming and Probabilistic Early Expiration
Implement an advanced cache-aside pattern that includes cache warming, probabilistic early expiration to prevent thundering herd, and automatic stale-while-revalidate.
Requirements:
- Cache-aside pattern with automatic population
- Cache warming on startup
- Probabilistic early expiration (XFetch algorithm)
- Stale-while-revalidate for better availability
- Background refresh for hot keys
- Metrics and monitoring
Solution:
1package exercise
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "math"
9 "math/rand"
10 "sync"
11 "time"
12
13 "github.com/redis/go-redis/v9"
14)
15
16// AdvancedCache implements cache-aside with probabilistic early expiration
17type AdvancedCache struct {
18 redis *redis.Client
19 database Database
20 metrics *CacheMetrics
21 refreshQueue chan string
22 hotKeys *HotKeyTracker
23 config *AdvancedCacheConfig
24 stopCh chan struct{}
25 wg sync.WaitGroup
26}
27
28type AdvancedCacheConfig struct {
29 DefaultTTL time.Duration
30 StaleWhileRevalidate time.Duration
31 Beta float64 // XFetch beta parameter (typically 1.0)
32 HotKeyThreshold int // Access count threshold for hot keys
33 RefreshWorkers int // Number of background refresh workers
34 WarmupKeys []string // Keys to warm up on startup
35}
36
37type Database interface {
38 Get(ctx context.Context, key string) (interface{}, error)
39 Set(ctx context.Context, key string, value interface{}) error
40}
41
42type CacheMetrics struct {
43 hits int64
44 misses int64
45 refreshes int64
46 errors int64
47 mu sync.RWMutex
48}
49
50type HotKeyTracker struct {
51 accessCounts map[string]*AccessInfo
52 threshold int
53 mu sync.RWMutex
54}
55
56type AccessInfo struct {
57 Count int
58 LastAccess time.Time
59}
60
61type CacheEntry struct {
62 Value interface{}
63 CachedAt time.Time
64 ExpiresAt time.Time
65 IsStale bool
66}
67
68func NewAdvancedCache(rdb *redis.Client, db Database, config *AdvancedCacheConfig) *AdvancedCache {
69 cache := &AdvancedCache{
70 redis: rdb,
71 database: db,
72 metrics: &CacheMetrics{},
73 refreshQueue: make(chan string, 1000),
74 hotKeys: &HotKeyTracker{
75 accessCounts: make(map[string]*AccessInfo),
76 threshold: config.HotKeyThreshold,
77 },
78 config: config,
79 stopCh: make(chan struct{}),
80 }
81
82 // Start background refresh workers
83 for i := 0; i < config.RefreshWorkers; i++ {
84 cache.wg.Add(1)
85 go cache.refreshWorker()
86 }
87
88 // Start hot key monitor
89 cache.wg.Add(1)
90 go cache.hotKeyMonitor()
91
92 // Perform cache warming
93 if len(config.WarmupKeys) > 0 {
94 cache.warmCache()
95 }
96
97 return cache
98}
99
100// Get retrieves a value with probabilistic early expiration
101func (ac *AdvancedCache) Get(ctx context.Context, key string) (interface{}, error) {
102 // Track hot keys
103 ac.hotKeys.RecordAccess(key)
104
105 // Try to get from cache with metadata
106 entry, err := ac.getCacheEntry(ctx, key)
107 if err != nil && err != redis.Nil {
108 ac.metrics.RecordError()
109 log.Printf("Redis error for key %s: %v", key, err)
110 }
111
112 // Cache hit - check if we should probabilistically refresh
113 if entry != nil && !entry.IsStale {
114 ac.metrics.RecordHit()
115
116 // Implement XFetch probabilistic early expiration
117 if ac.shouldRefreshEarly(entry) {
118 // Trigger background refresh
119 select {
120 case ac.refreshQueue <- key:
121 ac.metrics.RecordRefresh()
122 default:
123 // Queue full, skip refresh
124 }
125 }
126
127 return entry.Value, nil
128 }
129
130 // Cache miss or stale - decide whether to serve stale data
131 if entry != nil && entry.IsStale {
132 // Serve stale data while revalidating in background
133 select {
134 case ac.refreshQueue <- key:
135 ac.metrics.RecordRefresh()
136 default:
137 }
138
139 log.Printf("Serving stale data for key %s", key)
140 return entry.Value, nil
141 }
142
143 // Cache miss - fetch from database
144 ac.metrics.RecordMiss()
145 return ac.fetchAndCache(ctx, key)
146}
147
148// getCacheEntry retrieves cache entry with metadata
149func (ac *AdvancedCache) getCacheEntry(ctx context.Context, key string) (*CacheEntry, error) {
150 // Get cache data with metadata
151 data, err := ac.redis.HGetAll(ctx, ac.metaKey(key)).Result()
152 if err != nil || len(data) == 0 {
153 return nil, redis.Nil
154 }
155
156 var entry CacheEntry
157
158 // Parse cached value
159 if valueStr, exists := data["value"]; exists {
160 if err := json.Unmarshal([]byte(valueStr), &entry.Value); err != nil {
161 return nil, fmt.Errorf("failed to unmarshal value: %w", err)
162 }
163 }
164
165 // Parse timestamps
166 if cachedAtStr, exists := data["cached_at"]; exists {
167 var cachedAtUnix int64
168 fmt.Sscanf(cachedAtStr, "%d", &cachedAtUnix)
169 entry.CachedAt = time.Unix(cachedAtUnix, 0)
170 }
171
172 if expiresAtStr, exists := data["expires_at"]; exists {
173 var expiresAtUnix int64
174 fmt.Sscanf(expiresAtStr, "%d", &expiresAtUnix)
175 entry.ExpiresAt = time.Unix(expiresAtUnix, 0)
176 }
177
178 // Check if stale
179 now := time.Now()
180 if now.After(entry.ExpiresAt) {
181 // Check if within stale-while-revalidate window
182 if now.Before(entry.ExpiresAt.Add(ac.config.StaleWhileRevalidate)) {
183 entry.IsStale = true
184 return &entry, nil
185 }
186 // Too old, treat as miss
187 return nil, redis.Nil
188 }
189
190 return &entry, nil
191}
192
193// shouldRefreshEarly implements XFetch probabilistic early expiration
194func (ac *AdvancedCache) shouldRefreshEarly(entry *CacheEntry) bool {
195 now := time.Now()
196 delta := entry.ExpiresAt.Sub(now).Seconds()
197
198 if delta <= 0 {
199 return true // Already expired
200 }
201
202 // Calculate time since cached
203 age := now.Sub(entry.CachedAt).Seconds()
204
205 // Simplified linear take on XFetch: probability = beta * age / ttl
206 // (refresh becomes more likely as the entry ages toward expiry)
207 ttl := entry.ExpiresAt.Sub(entry.CachedAt).Seconds()
208 probability := ac.config.Beta * age / ttl
209
210 // Random decision
211 return rand.Float64() < probability
212}
213
214// fetchAndCache fetches data from database and caches it
215func (ac *AdvancedCache) fetchAndCache(ctx context.Context, key string) (interface{}, error) {
216 // Fetch from database
217 value, err := ac.database.Get(ctx, key)
218 if err != nil {
219 return nil, fmt.Errorf("database fetch failed: %w", err)
220 }
221
222 // Cache the value with metadata
223 go ac.cacheValue(context.Background(), key, value)
224
225 return value, nil
226}
227
228// cacheValue stores value in cache with metadata
229func (ac *AdvancedCache) cacheValue(ctx context.Context, key string, value interface{}) {
230 // Serialize value
231 valueData, err := json.Marshal(value)
232 if err != nil {
233 log.Printf("Failed to marshal value for key %s: %v", key, err)
234 return
235 }
236
237 now := time.Now()
238 expiresAt := now.Add(ac.config.DefaultTTL)
239
240 // Store with metadata
241 data := map[string]interface{}{
242 "value": string(valueData),
243 "cached_at": now.Unix(),
244 "expires_at": expiresAt.Unix(),
245 }
246
247 pipe := ac.redis.Pipeline()
248 pipe.HMSet(ctx, ac.metaKey(key), data)
249 pipe.Expire(ctx, ac.metaKey(key), ac.config.DefaultTTL+ac.config.StaleWhileRevalidate)
250
251 if _, err := pipe.Exec(ctx); err != nil {
252 log.Printf("Failed to cache value for key %s: %v", key, err)
253 }
254}
255
256// refreshWorker processes background refresh queue
257func (ac *AdvancedCache) refreshWorker() {
258 defer ac.wg.Done()
259
260 for {
261 select {
262 case key := <-ac.refreshQueue:
263 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
264 value, err := ac.database.Get(ctx, key)
265 cancel()
266
267 if err != nil {
268 log.Printf("Background refresh failed for key %s: %v", key, err)
269 continue
270 }
271
272 ac.cacheValue(context.Background(), key, value)
273 log.Printf("Background refreshed key %s", key)
274
275 case <-ac.stopCh:
276 return
277 }
278 }
279}
280
281// hotKeyMonitor tracks and refreshes hot keys proactively
282func (ac *AdvancedCache) hotKeyMonitor() {
283 defer ac.wg.Done()
284
285 ticker := time.NewTicker(10 * time.Second)
286 defer ticker.Stop()
287
288 for {
289 select {
290 case <-ticker.C:
291 ac.refreshHotKeys()
292
293 case <-ac.stopCh:
294 return
295 }
296 }
297}
298
299// refreshHotKeys proactively refreshes frequently accessed keys
300func (ac *AdvancedCache) refreshHotKeys() {
301 hotKeys := ac.hotKeys.GetHotKeys()
302
303 for _, key := range hotKeys {
304 select {
305 case ac.refreshQueue <- key:
306 log.Printf("Proactively refreshing hot key: %s", key)
307 default:
308 // Queue full, skip
309 }
310 }
311}
312
313// warmCache performs initial cache warming
314func (ac *AdvancedCache) warmCache() {
315 log.Printf("Starting cache warming for %d keys", len(ac.config.WarmupKeys))
316
317 var wg sync.WaitGroup
318 semaphore := make(chan struct{}, 10) // Limit concurrent warming operations
319
320 for _, key := range ac.config.WarmupKeys {
321 wg.Add(1)
322 go func(k string) {
323 defer wg.Done()
324
325 semaphore <- struct{}{}
326 defer func() { <-semaphore }()
327
328 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
329 defer cancel()
330
331 value, err := ac.database.Get(ctx, k)
332 if err != nil {
333 log.Printf("Failed to warm key %s: %v", k, err)
334 return
335 }
336
337 ac.cacheValue(context.Background(), k, value)
338 log.Printf("Warmed key %s", k)
339 }(key)
340 }
341
342 wg.Wait()
343 log.Printf("Cache warming completed")
344}
345
346// metaKey generates cache key with metadata prefix
347func (ac *AdvancedCache) metaKey(key string) string {
348 return fmt.Sprintf("cache:meta:%s", key)
349}
350
351// RecordAccess tracks key access for hot key detection
352func (hkt *HotKeyTracker) RecordAccess(key string) {
353 hkt.mu.Lock()
354 defer hkt.mu.Unlock()
355
356 info, exists := hkt.accessCounts[key]
357 if !exists {
358 info = &AccessInfo{}
359 hkt.accessCounts[key] = info
360 }
361
362 info.Count++
363 info.LastAccess = time.Now()
364
365 // Cleanup old entries periodically
366 if len(hkt.accessCounts) > 10000 {
367 hkt.cleanup()
368 }
369}
370
371// GetHotKeys returns keys exceeding access threshold
372func (hkt *HotKeyTracker) GetHotKeys() []string {
373 hkt.mu.RLock()
374 defer hkt.mu.RUnlock()
375
376 var hotKeys []string
377 cutoff := time.Now().Add(-1 * time.Minute)
378
379 for key, info := range hkt.accessCounts {
380 if info.Count >= hkt.threshold && info.LastAccess.After(cutoff) {
381 hotKeys = append(hotKeys, key)
382 }
383 }
384
385 return hotKeys
386}
387
388// cleanup removes stale access tracking entries
389func (hkt *HotKeyTracker) cleanup() {
390 cutoff := time.Now().Add(-5 * time.Minute)
391
392 for key, info := range hkt.accessCounts {
393 if info.LastAccess.Before(cutoff) {
394 delete(hkt.accessCounts, key)
395 }
396 }
397}
398
399// Metrics methods
400func (cm *CacheMetrics) RecordHit() {
401 cm.mu.Lock()
402 cm.hits++
403 cm.mu.Unlock()
404}
405
406func (cm *CacheMetrics) RecordMiss() {
407 cm.mu.Lock()
408 cm.misses++
409 cm.mu.Unlock()
410}
411
412func (cm *CacheMetrics) RecordRefresh() {
413 cm.mu.Lock()
414 cm.refreshes++
415 cm.mu.Unlock()
416}
417
418func (cm *CacheMetrics) RecordError() {
419 cm.mu.Lock()
420 cm.errors++
421 cm.mu.Unlock()
422}
423
424func (cm *CacheMetrics) GetStats() map[string]int64 {
425 cm.mu.RLock()
426 defer cm.mu.RUnlock()
427
428 total := cm.hits + cm.misses
429 hitRate := float64(0)
430 if total > 0 {
431 hitRate = float64(cm.hits) / float64(total) * 100
432 }
433
434 return map[string]int64{
435 "hits": cm.hits,
436 "misses": cm.misses,
437 "refreshes": cm.refreshes,
438 "errors": cm.errors,
439 "hit_rate": int64(math.Round(hitRate)),
440 }
441}
442
443// Close stops all background workers
444func (ac *AdvancedCache) Close() error {
445 close(ac.stopCh)
446 ac.wg.Wait()
447 return nil
448}
449
450// Example usage
451func ExampleAdvancedCache() {
452 rdb := redis.NewClient(&redis.Options{
453 Addr: "localhost:6379",
454 })
455
456 // Supply a real Database implementation (see the stub after this listing)
457 var db Database
458
459 config := &AdvancedCacheConfig{
460 DefaultTTL: 30 * time.Minute,
461 StaleWhileRevalidate: 5 * time.Minute,
462 Beta: 1.0,
463 HotKeyThreshold: 100,
464 RefreshWorkers: 3,
465 WarmupKeys: []string{"popular:product:1", "popular:product:2"},
466 }
467
468 cache := NewAdvancedCache(rdb, db, config)
469 defer cache.Close()
470
471 ctx := context.Background()
472
473 // Get value (will cache miss and fetch from database)
474 value, err := cache.Get(ctx, "user:12345")
475 if err != nil {
476 log.Printf("Failed to get value: %v", err)
477 } else {
478 log.Printf("Value: %+v", value)
479 }
480
481 // Subsequent gets will hit cache
482 for i := 0; i < 10; i++ {
483 value, _ = cache.Get(ctx, "user:12345")
484 }
485
486 // Print metrics
487 stats := cache.metrics.GetStats()
488 log.Printf("Cache stats: %+v", stats)
489}
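As written, ExampleAdvancedCache declares `var db Database` without an implementation, so the first cache miss would panic on a nil interface. A minimal in-memory stub along these lines (the `InMemoryDB` type and its seed data are illustrative, not part of the solution) makes the example runnable:

```go
package exercise

import (
	"context"
	"fmt"
	"sync"
)

// InMemoryDB is a toy Database implementation backed by a map,
// handy for exercising AdvancedCache locally.
type InMemoryDB struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func NewInMemoryDB() *InMemoryDB {
	return &InMemoryDB{data: map[string]interface{}{
		"user:12345":        map[string]string{"name": "Alice"},
		"popular:product:1": map[string]string{"title": "Widget"},
		"popular:product:2": map[string]string{"title": "Gadget"},
	}}
}

func (m *InMemoryDB) Get(ctx context.Context, key string) (interface{}, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if v, ok := m.data[key]; ok {
		return v, nil
	}
	return nil, fmt.Errorf("not found: %s", key)
}

func (m *InMemoryDB) Set(ctx context.Context, key string, value interface{}) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = value
	return nil
}
```

With this in place, replacing `var db Database` with `db := NewInMemoryDB()` in the example resolves cache misses against the map instead of panicking.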
Summary
Key Takeaways
- Choose the Right Pattern: Cache-aside for general use, write-through for consistency, write-behind for high throughput
- Select Appropriate Data Structures: Strings for simple values, hashes for objects, lists for queues, sorted sets for rankings
- Handle Cache Stampedes: Use single-flight patterns and lock-based protection (see the singleflight sketch after this list)
- Implement Proper TTL: Set appropriate expiration times and manage memory effectively
- Monitor Performance: Track hit rates, latency, and error rates
- Design for Failure: Build resilient systems that can handle cache unavailability
- Use Pipelining: Batch operations for better performance
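The cache-stampede takeaway above is commonly implemented in Go with golang.org/x/sync/singleflight, which collapses concurrent misses for the same key into a single loader call. A minimal sketch, assuming you supply the `loadFromDB` function (the package name and TTL are illustrative):

```go
package cacheexample

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
	"golang.org/x/sync/singleflight"
)

var group singleflight.Group

// GetWithSingleFlight collapses concurrent misses for the same key into a
// single database load, so an expiring hot key cannot stampede the database.
func GetWithSingleFlight(ctx context.Context, rdb *redis.Client, key string,
	loadFromDB func(context.Context, string) (string, error)) (string, error) {

	if val, err := rdb.Get(ctx, key).Result(); err == nil {
		return val, nil // cache hit
	}

	// Every concurrent caller for this key shares one loader invocation.
	v, err, _ := group.Do(key, func() (interface{}, error) {
		val, err := loadFromDB(ctx, key)
		if err != nil {
			return nil, err
		}
		// Best-effort cache fill; the 10-minute TTL is illustrative.
		_ = rdb.Set(ctx, key, val, 10*time.Minute).Err()
		return val, nil
	})
	if err != nil {
		return "", err
	}
	return v.(string), nil
}
```

While one goroutine loads the key, every other caller for that key blocks on `group.Do` and shares the result, so an expiring hot key produces one database query instead of hundreds.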
Production Checklist
- Connection Pooling: Configure appropriate pool sizes and timeouts (see the client options sketch after this checklist)
- TTL Management: Set reasonable expiration times for different data types
- Error Handling: Handle Redis failures gracefully with fallbacks
- Monitoring: Track hit rates, latency, memory usage, and error rates
- Cache Invalidation: Implement proper invalidation strategies
- Serialization: Use efficient formats (JSON for readability; MessagePack or protobuf when payload size matters)
- Security: Implement authentication and encryption for sensitive data
- Testing: Test cache behavior, expiration, and failure scenarios
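For the connection-pooling and timeout items on this checklist, go-redis exposes the relevant knobs directly on `redis.Options`. The numbers below are illustrative starting points, not universal recommendations:

```go
package cacheexample

import (
	"time"

	"github.com/redis/go-redis/v9"
)

// newPooledClient groups the pool- and timeout-related options from the
// checklist in one place; tune the numbers for your own workload.
func newPooledClient(addr string) *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:         addr,
		PoolSize:     50,              // max simultaneous connections per node
		MinIdleConns: 10,              // keep warm connections ready
		DialTimeout:  5 * time.Second, // connection establishment
		ReadTimeout:  3 * time.Second, // per-command read deadline
		WriteTimeout: 3 * time.Second, // per-command write deadline
		PoolTimeout:  4 * time.Second, // how long to wait for a free connection
		MaxRetries:   3,               // retries for transient failures
	})
}
```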
When to Use Each Pattern
Cache-Aside Pattern:
- General-purpose caching
- Read-heavy workloads where brief staleness is acceptable
- When you want control over cache population
- Most web applications and APIs
Write-Through Pattern:
- Data consistency is critical
- Read-heavy workloads that must always see the latest write
- Configuration and user preference data
- When you can tolerate slightly slower writes
Write-Behind Pattern:
- Very high write throughput requirements
- Bulk operations and analytics
- Logging and telemetry data
- When you can tolerate eventual consistency (see the write-path sketch after this list)
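To make the trade-off concrete, here is a compact sketch of the two write paths; the `store` interface, the buffered channel standing in for the write-behind queue, and the one-hour TTL are all illustrative assumptions:

```go
package cacheexample

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// store is the minimal database interface assumed by this sketch.
type store interface {
	Set(ctx context.Context, key, value string) error
}

// writeThrough acknowledges the write only after BOTH the database and
// the cache succeed: readers always see fresh data, but every write
// pays the extra round trip.
func writeThrough(ctx context.Context, rdb *redis.Client, db store, key, value string) error {
	if err := db.Set(ctx, key, value); err != nil {
		return err
	}
	return rdb.Set(ctx, key, value, time.Hour).Err()
}

// writeBehind acknowledges the write after the cache update and queues
// the database write for a background worker, trading consistency for
// write throughput.
func writeBehind(ctx context.Context, rdb *redis.Client, db store,
	queue chan<- func(), key, value string) error {
	if err := rdb.Set(ctx, key, value, time.Hour).Err(); err != nil {
		return err
	}
	queue <- func() { _ = db.Set(context.Background(), key, value) } // drained by a worker
	return nil
}
```

The design difference is where the caller stops waiting: write-through returns only after the database write succeeds, while write-behind returns as soon as the cache is updated and leaves the database write to a background worker that drains the queue.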
💡 Final Insight: Caching is not a silver bullet, but it's one of the most effective ways to improve application performance. The key is understanding your data access patterns and choosing the right strategy for each use case.
Next Steps
- Practice: Build a complete caching system for a real application
- Experiment: Try different Redis data structures and patterns
- Deepen Knowledge: Read Redis documentation and best practices
- Production Readiness: Learn about Redis clustering, sentinel, and scaling
- Advanced Topics: Study distributed caching patterns and consistency models
Remember: Caching is about trade-offs. Understand what you're optimizing for and choose the right approach for your specific needs.