Distributed Caching with Redis

Why This Matters - The Performance Revolution

Consider a bustling e-commerce platform during a flash sale event. Thousands of customers are simultaneously browsing products, checking prices, and adding items to carts. Without proper caching, each request would hammer the database, causing response times to skyrocket from milliseconds to seconds. Customers would abandon their carts, and revenue would plummet.

This is where distributed caching becomes the unsung hero of modern applications. It acts like a super-fast memory layer that keeps frequently accessed data close to your users, transforming sluggish applications into lightning-fast experiences that delight customers.

Real-world impact: Twitter has used Redis clusters to serve user timelines with sub-millisecond latency while processing on the order of 500 million tweets per day. Reddit handles millions of page views by caching comment threads and user data. Instagram serves billions of image metadata requests through Redis, making user experiences feel instantaneous.

Without distributed caching, these platforms would need thousands of additional database servers and still couldn't reach the same performance levels. The economics are compelling: caching can cut database load (and the hardware spend that goes with it) by 80-90% while improving read latency by 10-100x.

Learning Objectives

By the end of this article, you will be able to:

  1. Choose appropriate caching patterns for different use cases and avoid common pitfalls
  2. Implement production-ready Redis clients with proper connection pooling and error handling
  3. Leverage Redis data structures effectively
  4. Design cache-aside, write-through, and write-behind patterns with proper invalidation
  5. Build distributed caching systems using consistent hashing and Redis Cluster
  6. Apply performance optimization techniques like pipelining and Lua scripts
  7. Monitor and troubleshoot Redis performance in production environments

Core Concepts - Understanding the Caching Landscape

The Caching Dilemma: Speed vs Consistency

Before diving into implementations, let's understand the fundamental trade-offs that every caching system must balance:

Speed: cached data is served straight from memory, orders of magnitude faster than a database query.
Consistency: cached data may be stale, whereas the database always holds the current value.

💡 Critical Insight: The right caching strategy isn't about choosing speed OR consistency—it's about finding the sweet spot for your specific use case. Some data can be slightly stale for massive performance gains, while other data must always be current even if it's slower.
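One way to make that sweet spot concrete is to give each kind of data an explicit staleness budget, i.e. how far behind the database it is allowed to lag. A small illustrative sketch (the categories and numbers below are assumptions, not recommendations):

// Illustrative staleness budgets: how long each kind of data may lag the
// database before anyone notices. These values are assumptions for discussion.
var stalenessBudget = map[string]time.Duration{
    "product_catalog": 10 * time.Minute, // batch-updated, read constantly
    "user_profile":    5 * time.Minute,  // read-heavy, written rarely
    "session":         30 * time.Second, // needs to feel live, tolerates a little lag
    "account_balance": 0,                // never cache: always read the source of truth
}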

When to Cache

Perfect Caching Candidates:

  • User Profile Data: Changes infrequently, read constantly
  • Product Catalog: Updates in batches, browsed continuously
  • API Responses: Computationally expensive, same inputs produce same outputs
  • Session Data: Frequently accessed during user interactions
  • Configuration: Rarely changes, constantly read
// Perfect for caching:
type UserProfile struct {
    ID        string       `json:"id"`
    Name      string       `json:"name"`
    Email     string       `json:"email"`
    Settings  UserSettings `json:"settings"`
    LastLogin time.Time    `json:"last_login"`
}
// Read 1000x/day, updated maybe 1x/month
// 99.9% cache hit rate achievable

Poor Caching Candidates:

  • Real-time Stock Prices: Change constantly, staleness is unacceptable
  • Bank Account Balances: Must always be accurate for compliance
  • High-frequency Trading Data: Milliseconds of staleness cause huge losses
  • User Password Hashes: Security-sensitive, rarely accessed
// BAD for caching:
type StockPrice struct {
    Symbol    string    `json:"symbol"`
    Price     float64   `json:"price"`
    Timestamp time.Time `json:"timestamp"`
    Volume    int64     `json:"volume"`
}
// Changes multiple times per second
// Stale data causes financial losses
// Cache invalidation overhead exceeds benefit

⚠️ Important: Not all data should be cached. Cache data that is read more often than written, where slight staleness is acceptable, and where performance improvements justify the added complexity.

Practical Examples - Building Production-Ready Caching Systems

Redis Fundamentals: Beyond Simple Key-Value

Redis isn't just a simple key-value store—it's a multi-model database with sophisticated data structures that enable complex operations to be performed atomically and efficiently.

Production-Ready Redis Client

package redis

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
)

type RedisClient struct {
    rdb    *redis.Client
    config *Config
}

type Config struct {
    Addr         string        // Redis server address
    Password     string        // Redis password
    DB           int           // Database number
    PoolSize     int           // Connection pool size
    MinIdleConns int           // Minimum idle connections
    MaxRetries   int           // Maximum retry attempts
    DialTimeout  time.Duration // Connection timeout
    ReadTimeout  time.Duration // Read timeout
    WriteTimeout time.Duration // Write timeout
}

func NewRedisClient(config *Config) (*RedisClient, error) {
    rdb := redis.NewClient(&redis.Options{
        Addr:     config.Addr,
        Password: config.Password,
        DB:       config.DB,

        // Connection pool settings for production performance
        PoolSize:        config.PoolSize,     // Max connections in pool
        MinIdleConns:    config.MinIdleConns, // Min idle connections to maintain
        MaxIdleConns:    config.PoolSize / 2, // Max idle connections
        ConnMaxIdleTime: 5 * time.Minute,     // Close idle connections after 5 min
        ConnMaxLifetime: 30 * time.Minute,    // Reconnect after 30 min
        PoolTimeout:     4 * time.Second,     // Wait time for a free connection

        // Performance and reliability settings
        DialTimeout:  config.DialTimeout,  // Connection establishment timeout
        ReadTimeout:  config.ReadTimeout,  // Read operation timeout
        WriteTimeout: config.WriteTimeout, // Write operation timeout

        // Retry settings
        MaxRetries:      config.MaxRetries,      // Maximum retry attempts
        MinRetryBackoff: 8 * time.Millisecond,   // Initial retry backoff
        MaxRetryBackoff: 512 * time.Millisecond, // Maximum retry backoff
    })

    // Test connection immediately
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := rdb.Ping(ctx).Err(); err != nil {
        return nil, fmt.Errorf("failed to connect to Redis: %w", err)
    }

    client := &RedisClient{
        rdb:    rdb,
        config: config,
    }

    log.Printf("Successfully connected to Redis at %s (db %d)", config.Addr, config.DB)
    return client, nil
}

// Enhanced Get with metrics and error handling
func (rc *RedisClient) Get(ctx context.Context, key string) (string, error) {
    start := time.Now()

    val, err := rc.rdb.Get(ctx, key).Result()

    // Record metrics
    rc.recordLatency("GET", time.Since(start))

    if err == redis.Nil {
        // Key not found - this is expected behavior, not an error
        return "", nil
    }

    if err != nil {
        rc.recordError("GET", err)
        return "", fmt.Errorf("Redis GET failed for key %s: %w", key, err)
    }

    rc.recordHit("GET")
    return val, nil
}

// Enhanced Set with TTL and metrics
func (rc *RedisClient) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
    start := time.Now()

    err := rc.rdb.Set(ctx, key, value, ttl).Err()

    rc.recordLatency("SET", time.Since(start))

    if err != nil {
        rc.recordError("SET", err)
        return fmt.Errorf("Redis SET failed for key %s: %w", key, err)
    }

    rc.recordOperation("SET")
    return nil
}

// Atomic Get-And-Set operation: returns the existing value, or sets the new one
func (rc *RedisClient) GetAndSet(ctx context.Context, key string, value interface{}, ttl time.Duration) (string, bool, error) {
    // Use Lua script for atomic get-and-set
    script := redis.NewScript(`
        local current_value = redis.call('GET', KEYS[1])
        if current_value == false then
            redis.call('SET', KEYS[1], ARGV[1], 'EX', ARGV[2])
            -- false converts to a nil array element in the reply
            return {false, 1}
        else
            return {current_value, 0}
        end
    `)

    result, err := script.Run(ctx, rc.rdb, []string{key}, value, int(ttl.Seconds())).Result()
    if err != nil {
        return "", false, fmt.Errorf("GET-AND-SET failed: %w", err)
    }

    // Parse the result array: [current_value, was_set]
    resultSlice, ok := result.([]interface{})
    if !ok || len(resultSlice) != 2 {
        return "", false, fmt.Errorf("unexpected script result format")
    }

    var currentVal string
    if resultSlice[0] != nil {
        currentVal, ok = resultSlice[0].(string)
        if !ok {
            return "", false, fmt.Errorf("failed to parse current value")
        }
    }

    wasSet, ok := resultSlice[1].(int64)
    if !ok {
        return "", false, fmt.Errorf("failed to parse was_set flag")
    }

    return currentVal, wasSet == 1, nil
}

// Metrics recording methods
func (rc *RedisClient) recordLatency(operation string, duration time.Duration) {
    // Record operation latency for monitoring
    log.Printf("Redis %s took %v", operation, duration)
}

func (rc *RedisClient) recordHit(operation string) {
    // Record cache hit
    log.Printf("Redis %s cache hit", operation)
}

func (rc *RedisClient) recordError(operation string, err error) {
    // Record cache error
    log.Printf("Redis %s error: %v", operation, err)
}

func (rc *RedisClient) recordOperation(operation string) {
    // Record successful operation
    log.Printf("Redis %s operation successful", operation)
}
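The invalidation examples later in the article call a Delete method on this client that the listing above doesn't define. A minimal sketch in the same style, reusing the metrics helpers (the method itself is an assumption, not part of the original listing):

// Delete removes a key; hypothetical helper assumed by the invalidation
// examples later in the article.
func (rc *RedisClient) Delete(ctx context.Context, key string) error {
    start := time.Now()

    err := rc.rdb.Del(ctx, key).Err()
    rc.recordLatency("DEL", time.Since(start))

    if err != nil {
        rc.recordError("DEL", err)
        return fmt.Errorf("Redis DEL failed for key %s: %w", key, err)
    }

    rc.recordOperation("DEL")
    return nil
}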

Redis Data Structures: The Right Tool for Every Job

Redis provides multiple data structures that excel at different use cases. Choosing the right structure is crucial for performance and simplicity.

Strings: Simple but Powerful

// String operations for basic caching
type StringCache struct {
    client *RedisClient
}

func NewStringCache(client *RedisClient) *StringCache {
    return &StringCache{client: client}
}

// Cache JSON objects as strings
func (sc *StringCache) CacheUser(ctx context.Context, user *User) error {
    // Serialize user to JSON
    userData, err := json.Marshal(user)
    if err != nil {
        return fmt.Errorf("failed to serialize user: %w", err)
    }

    // Cache with 30-minute TTL
    key := fmt.Sprintf("user:%s", user.ID)
    return sc.client.Set(ctx, key, userData, 30*time.Minute)
}

func (sc *StringCache) GetUser(ctx context.Context, userID string) (*User, error) {
    key := fmt.Sprintf("user:%s", userID)
    userData, err := sc.client.Get(ctx, key)
    if err != nil {
        return nil, fmt.Errorf("failed to get user from cache: %w", err)
    }

    if userData == "" {
        return nil, nil // Cache miss
    }

    var user User
    if err := json.Unmarshal([]byte(userData), &user); err != nil {
        return nil, fmt.Errorf("failed to deserialize user: %w", err)
    }

    return &user, nil
}

// Atomic counters for rate limiting and analytics
func (sc *StringCache) IncrementAPIRate(ctx context.Context, clientIP string, window time.Duration) (int64, error) {
    // Use IP as key with expiration
    key := fmt.Sprintf("rate_limit:%s", clientIP)

    // Increment and set expiration atomically
    script := redis.NewScript(`
        local current = redis.call('INCR', KEYS[1])
        if current == 1 then
            redis.call('EXPIRE', KEYS[1], ARGV[1])
        end
        return current
    `)

    result, err := script.Run(ctx, sc.client.rdb, []string{key}, int(window.Seconds())).Result()
    if err != nil {
        return 0, fmt.Errorf("failed to increment rate limit: %w", err)
    }

    return result.(int64), nil
}

// Distributed lock implementation
func (sc *StringCache) AcquireLock(ctx context.Context, lockKey string, ttl time.Duration) (bool, error) {
    // Use SET NX EX for atomic lock acquisition
    result, err := sc.client.rdb.SetNX(ctx, lockKey, "locked", ttl).Result()
    if err != nil {
        return false, fmt.Errorf("failed to acquire lock: %w", err)
    }

    return result, nil
}
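The string and hash examples marshal a User value whose definition never appears in the article. A minimal sketch covering just the fields they touch (the field set is an assumption):

// Hypothetical User type matching the fields the caching examples use.
type User struct {
    ID        string    `json:"id"`
    Name      string    `json:"name"`
    Email     string    `json:"email"`
    Tier      string    `json:"tier"`
    CreatedAt time.Time `json:"created_at"`
    LastLogin time.Time `json:"last_login"`
}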

Hashes: Perfect for Object Caching

// Hash operations for structured data caching
type HashCache struct {
    client *RedisClient
}

func NewHashCache(client *RedisClient) *HashCache {
    return &HashCache{client: client}
}

// Cache user profile as hash
func (hc *HashCache) CacheUserProfile(ctx context.Context, user *User) error {
    key := fmt.Sprintf("user_profile:%s", user.ID)

    // Store user fields as hash
    fields := map[string]interface{}{
        "name":       user.Name,
        "email":      user.Email,
        "created_at": user.CreatedAt.Unix(),
        "last_login": user.LastLogin.Unix(),
        "tier":       user.Tier,
    }

    // Set all fields in one operation (HSET accepts multiple fields; HMSET is deprecated)
    err := hc.client.rdb.HSet(ctx, key, fields).Err()
    if err != nil {
        return fmt.Errorf("failed to cache user profile hash: %w", err)
    }

    // Set TTL on entire hash
    return hc.client.rdb.Expire(ctx, key, 24*time.Hour).Err()
}

// Get specific fields from user profile
func (hc *HashCache) GetUserProfileFields(ctx context.Context, userID string, fields []string) (map[string]string, error) {
    key := fmt.Sprintf("user_profile:%s", userID)

    // Get only requested fields
    result, err := hc.client.rdb.HMGet(ctx, key, fields...).Result()
    if err != nil {
        return nil, fmt.Errorf("failed to get user profile fields: %w", err)
    }

    // Build result map
    profile := make(map[string]string)
    for i, field := range fields {
        if result[i] != nil {
            profile[field] = result[i].(string)
        }
    }

    return profile, nil
}

// Update specific fields without affecting others
func (hc *HashCache) UpdateUserLogin(ctx context.Context, userID string) error {
    key := fmt.Sprintf("user_profile:%s", userID)
    now := time.Now().Unix()

    // Update last login field only
    err := hc.client.rdb.HSet(ctx, key, "last_login", now).Err()
    if err != nil {
        return fmt.Errorf("failed to update user login: %w", err)
    }

    // Increment login count atomically
    _, err = hc.client.rdb.HIncrBy(ctx, key, "login_count", 1).Result()
    return err
}

// Atomic field increment for counters
func (hc *HashCache) IncrementUserMetric(ctx context.Context, userID, metric string) (int64, error) {
    key := fmt.Sprintf("user_metrics:%s", userID)

    // Increment metric and set TTL if first time
    script := redis.NewScript(`
        local new_value = redis.call('HINCRBY', KEYS[1], ARGV[1], ARGV[2])
        local exists = redis.call('HEXISTS', KEYS[1], 'created_at')
        if exists == 0 then
            redis.call('HSET', KEYS[1], 'created_at', ARGV[3])
            redis.call('EXPIRE', KEYS[1], ARGV[4])
        end
        return new_value
    `)

    result, err := script.Run(ctx, hc.client.rdb,
        []string{key},
        metric,
        1,
        time.Now().Unix(),
        int((24 * time.Hour).Seconds())).Result()

    if err != nil {
        return 0, fmt.Errorf("failed to increment user metric: %w", err)
    }

    return result.(int64), nil
}

Sorted Sets: Power Rankings and Leaderboards

// Sorted set operations for leaderboards and scoring
type SortedSetCache struct {
    client *RedisClient
}

func NewSortedSetCache(client *RedisClient) *SortedSetCache {
    return &SortedSetCache{client: client}
}

// Game leaderboard implementation
func (ssc *SortedSetCache) UpdatePlayerScore(ctx context.Context, gameID, playerID string, score float64) error {
    key := fmt.Sprintf("leaderboard:%s", gameID)

    // Add player to sorted set with score
    _, err := ssc.client.rdb.ZAdd(ctx, key, redis.Z{
        Score:  score,
        Member: playerID,
    }).Result()

    if err != nil {
        return fmt.Errorf("failed to update player score: %w", err)
    }

    // Set TTL on leaderboard
    return ssc.client.rdb.Expire(ctx, key, 7*24*time.Hour).Err()
}

// Get top N players with rankings
func (ssc *SortedSetCache) GetTopPlayers(ctx context.Context, gameID string, n int64) ([]PlayerRank, error) {
    key := fmt.Sprintf("leaderboard:%s", gameID)

    // Get top N players with scores
    results, err := ssc.client.rdb.ZRevRangeWithScores(ctx, key, 0, n-1).Result()
    if err != nil {
        return nil, fmt.Errorf("failed to get top players: %w", err)
    }

    players := make([]PlayerRank, len(results))
    for i, result := range results {
        rank := i + 1
        playerID := result.Member.(string)
        score := result.Score

        // Get player details from hash
        playerKey := fmt.Sprintf("player:%s", playerID)
        playerData, err := ssc.client.rdb.HGetAll(ctx, playerKey).Result()
        if err != nil {
            log.Printf("Failed to get player details for %s: %v", playerID, err)
            continue
        }

        players[i] = PlayerRank{
            Rank:     rank,
            PlayerID: playerID,
            Score:    score,
            Name:     playerData["name"],
            Avatar:   playerData["avatar"],
        }
    }

    return players, nil
}

// Get player's rank and position
func (ssc *SortedSetCache) GetPlayerRank(ctx context.Context, gameID, playerID string) (*PlayerRank, error) {
    key := fmt.Sprintf("leaderboard:%s", gameID)

    // Get player's rank
    rank, err := ssc.client.rdb.ZRevRank(ctx, key, playerID).Result()
    if err != nil {
        if err == redis.Nil {
            return nil, nil // Player not in leaderboard
        }
        return nil, fmt.Errorf("failed to get player rank: %w", err)
    }

    // Get player's score
    score, err := ssc.client.rdb.ZScore(ctx, key, playerID).Result()
    if err != nil {
        return nil, fmt.Errorf("failed to get player score: %w", err)
    }

    // Get player details
    playerKey := fmt.Sprintf("player:%s", playerID)
    playerData, err := ssc.client.rdb.HGetAll(ctx, playerKey).Result()
    if err != nil {
        log.Printf("Failed to get player details: %v", err)
    }

    return &PlayerRank{
        Rank:     int(rank) + 1, // Convert to 1-based ranking
        PlayerID: playerID,
        Score:    score,
        Name:     playerData["name"],
        Avatar:   playerData["avatar"],
    }, nil
}

// Page through leaderboard efficiently
func (ssc *SortedSetCache) GetLeaderboardPage(ctx context.Context, gameID string, page, pageSize int64) ([]PlayerRank, error) {
    if page < 1 {
        page = 1
    }

    key := fmt.Sprintf("leaderboard:%s", gameID)

    // Calculate start and end indices (pages are 1-based)
    start := (page - 1) * pageSize
    end := start + pageSize - 1

    // Get players for this page
    results, err := ssc.client.rdb.ZRevRangeWithScores(ctx, key, start, end).Result()
    if err != nil {
        return nil, fmt.Errorf("failed to get leaderboard page: %w", err)
    }

    players := make([]PlayerRank, len(results))
    for i, result := range results {
        rank := start + int64(i) + 1
        playerID := result.Member.(string)
        score := result.Score

        players[i] = PlayerRank{
            Rank:     int(rank),
            PlayerID: playerID,
            Score:    score,
        }
    }

    return players, nil
}
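The leaderboard methods above build PlayerRank values, but the type itself never appears in the article. A minimal sketch matching the fields used above (the field set is an assumption):

// Hypothetical PlayerRank type assumed by the leaderboard examples.
type PlayerRank struct {
    Rank     int     `json:"rank"`
    PlayerID string  `json:"player_id"`
    Score    float64 `json:"score"`
    Name     string  `json:"name"`
    Avatar   string  `json:"avatar"`
}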

Lists: Queues and Time-Series Data

// List operations for queues and recent activity feeds
type ListCache struct {
    client *RedisClient
}

func NewListCache(client *RedisClient) *ListCache {
    return &ListCache{client: client}
}

// Recent activity feed implementation
func (lc *ListCache) AddActivity(ctx context.Context, userID string, activity *Activity) error {
    // Serialize activity
    activityData, err := json.Marshal(activity)
    if err != nil {
        return fmt.Errorf("failed to serialize activity: %w", err)
    }

    // Add to user's activity feed
    key := fmt.Sprintf("activity_feed:%s", userID)

    err = lc.client.rdb.LPush(ctx, key, activityData).Err()
    if err != nil {
        return fmt.Errorf("failed to add activity: %w", err)
    }

    // Trim list to keep only the most recent 100 activities
    err = lc.client.rdb.LTrim(ctx, key, 0, 99).Err()
    if err != nil {
        return fmt.Errorf("failed to trim activity feed: %w", err)
    }

    // Set TTL on the entire list
    return lc.client.rdb.Expire(ctx, key, 7*24*time.Hour).Err()
}

// Get recent activities for user
func (lc *ListCache) GetRecentActivities(ctx context.Context, userID string, limit int64) ([]*Activity, error) {
    key := fmt.Sprintf("activity_feed:%s", userID)

    // Get recent activities
    start := -limit      // Get last 'limit' items
    end := int64(-1)     // Until the end

    activityData, err := lc.client.rdb.LRange(ctx, key, start, end).Result()
    if err != nil {
        return nil, fmt.Errorf("failed to get recent activities: %w", err)
    }

    activities := make([]*Activity, 0, len(activityData))

    // Reverse order to show most recent first
    for i := len(activityData) - 1; i >= 0; i-- {
        var activity Activity
        if err := json.Unmarshal([]byte(activityData[i]), &activity); err != nil {
            log.Printf("Failed to unmarshal activity %d: %v", i, err)
            continue
        }
        activities = append(activities, &activity)
    }

    return activities, nil
}

// Work queue implementation with blocking operations
func (lc *ListCache) EnqueueTask(ctx context.Context, queueName string, task *Task) error {
    // Serialize task
    taskData, err := json.Marshal(task)
    if err != nil {
        return fmt.Errorf("failed to serialize task: %w", err)
    }

    // Add to queue
    key := fmt.Sprintf("task_queue:%s", queueName)

    err = lc.client.rdb.RPush(ctx, key, taskData).Err()
    if err != nil {
        return fmt.Errorf("failed to enqueue task: %w", err)
    }

    return nil
}

// Blocking dequeue for worker processes
func (lc *ListCache) DequeueTask(ctx context.Context, queueName string, timeout time.Duration) (*Task, error) {
    key := fmt.Sprintf("task_queue:%s", queueName)

    // Blocking pop from front of list
    result, err := lc.client.rdb.BLPop(ctx, timeout, key).Result()
    if err != nil {
        if err == redis.Nil {
            return nil, nil // Timeout, no task available
        }
        return nil, fmt.Errorf("failed to dequeue task: %w", err)
    }

    // BLPOP returns [key, value]
    if len(result) != 2 {
        return nil, fmt.Errorf("unexpected BLPOP result format")
    }

    taskData := result[1]
    var task Task
    if err := json.Unmarshal([]byte(taskData), &task); err != nil {
        return nil, fmt.Errorf("failed to unmarshal task: %w", err)
    }

    return &task, nil
}
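The feed and queue examples above serialize Activity and Task values that the article never defines. Minimal sketches with hypothetical fields (real payloads will carry application-specific data):

// Hypothetical Activity and Task types assumed by the examples above.
type Activity struct {
    Type      string    `json:"type"`
    Message   string    `json:"message"`
    CreatedAt time.Time `json:"created_at"`
}

type Task struct {
    ID        string                 `json:"id"`
    Type      string                 `json:"type"`
    Payload   map[string]interface{} `json:"payload"`
    CreatedAt time.Time              `json:"created_at"`
}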

Common Patterns and Pitfalls - Learning from Production Experience

Pattern 1: Cache-Aside

This is the most common and versatile caching pattern. Think of it like looking up a word in the dictionary - you check your personal notes first, and if it's not there, you look it up in the big dictionary and write it down for next time.

type CacheAsideCache struct {
    redis    *RedisClient
    database Database
    ttl      time.Duration
}

func NewCacheAsideCache(redis *RedisClient, db Database, ttl time.Duration) *CacheAsideCache {
    return &CacheAsideCache{
        redis:    redis,
        database: db,
        ttl:      ttl,
    }
}

// Get implements the cache-aside pattern: read the cache, fall back to the database
func (cac *CacheAsideCache) Get(ctx context.Context, key string) (interface{}, error) {
    // 1. Try cache first
    cached, err := cac.redis.Get(ctx, key)
    if err == nil && cached != "" {
        // Cache hit - return cached data
        var data interface{}
        if err := json.Unmarshal([]byte(cached), &data); err != nil {
            log.Printf("Failed to unmarshal cached data for key %s: %v", key, err)
        } else {
            return data, nil
        }
    }

    // 2. Cache miss - fetch from database
    data, err := cac.database.Get(ctx, key)
    if err != nil {
        return nil, fmt.Errorf("database fetch failed: %w", err)
    }

    // 3. Populate cache for future requests
    go func() {
        // Fire-and-forget cache population
        if serialized, err := json.Marshal(data); err == nil {
            cac.redis.Set(context.Background(), key, serialized, cac.ttl)
        }
    }()

    return data, nil
}

// Invalidate cache on data updates
func (cac *CacheAsideCache) Invalidate(ctx context.Context, key string) error {
    return cac.redis.Delete(ctx, key)
}

// Update with cache invalidation
func (cac *CacheAsideCache) Set(ctx context.Context, key string, value interface{}) error {
    // Update database first
    if err := cac.database.Set(ctx, key, value); err != nil {
        return fmt.Errorf("database update failed: %w", err)
    }

    // Invalidate cache
    return cac.Invalidate(ctx, key)
}
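All of the pattern implementations in this section, plus the multi-level cache and cache warmer later on, call into a Database dependency the article never shows. A minimal interface inferred from those call sites; treat the method set as an assumption about your own data-access layer:

// Hypothetical Database interface inferred from the calls the caching
// patterns make; your real data-access layer will differ.
type Database interface {
    Get(ctx context.Context, key string) (interface{}, error)
    Set(ctx context.Context, key string, value interface{}) error
    Delete(ctx context.Context, key string) error
    Increment(ctx context.Context, key string, delta int64) error
    GetFrequentKeys(ctx context.Context, limit int) ([]string, error)
}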

Pattern 2: Write-Through for Critical Data

Use this when you need guaranteed cache consistency and can tolerate the extra write latency.

type WriteThroughCache struct {
    redis    *RedisClient
    database Database
    ttl      time.Duration
}

func NewWriteThroughCache(redis *RedisClient, db Database, ttl time.Duration) *WriteThroughCache {
    return &WriteThroughCache{
        redis:    redis,
        database: db,
        ttl:      ttl,
    }
}

// Set writes to both database and cache synchronously
func (wtc *WriteThroughCache) Set(ctx context.Context, key string, value interface{}) error {
    // Serialize value for cache
    serialized, err := json.Marshal(value)
    if err != nil {
        return fmt.Errorf("failed to serialize value: %w", err)
    }

    // Write to database first
    if err := wtc.database.Set(ctx, key, value); err != nil {
        return fmt.Errorf("database write failed: %w", err)
    }

    // Write to cache
    if err := wtc.redis.Set(ctx, key, serialized, wtc.ttl); err != nil {
        // Cache write failed, but database succeeded
        // Log error but don't fail the operation
        log.Printf("Cache write failed for key %s: %v", key, err)
    }

    return nil
}

// Get always reads from cache first, then database on miss
func (wtc *WriteThroughCache) Get(ctx context.Context, key string) (interface{}, error) {
    // Try cache first
    cached, err := wtc.redis.Get(ctx, key)
    if err == nil && cached != "" {
        var value interface{}
        if err := json.Unmarshal([]byte(cached), &value); err == nil {
            return value, nil
        } else {
            log.Printf("Failed to unmarshal cached data for key %s: %v", key, err)
        }
    }

    // Cache miss - fetch from database
    value, err := wtc.database.Get(ctx, key)
    if err != nil {
        return nil, fmt.Errorf("database fetch failed: %w", err)
    }

    // Populate cache for future reads
    go func() {
        if serialized, err := json.Marshal(value); err == nil {
            wtc.redis.Set(context.Background(), key, serialized, wtc.ttl)
        }
    }()

    return value, nil
}

Pattern 3: Write-Behind for High Throughput

Perfect for high-write scenarios where you can tolerate slight data staleness. Think of it like writing changes in a notebook throughout the day and updating the master ledger once at night.

type WriteBehindCache struct {
    redis         *RedisClient
    database      Database
    ttl           time.Duration
    batchSize     int
    flushInterval time.Duration
    buffer        chan WriteOperation
    stopCh        chan struct{}
    wg            sync.WaitGroup
}

type WriteOperation struct {
    Key   string
    Value interface{}
    Op    string // "SET", "DELETE", "INCR"
}

func NewWriteBehindCache(redis *RedisClient, db Database, ttl time.Duration, batchSize int, flushInterval time.Duration) *WriteBehindCache {
    wbc := &WriteBehindCache{
        redis:         redis,
        database:      db,
        ttl:           ttl,
        batchSize:     batchSize,
        flushInterval: flushInterval,
        buffer:        make(chan WriteOperation, 1000),
        stopCh:        make(chan struct{}),
    }

    // Start background flusher
    wbc.wg.Add(1)
    go wbc.backgroundFlusher()

    return wbc
}

// Set writes to cache immediately, queues database write
func (wbc *WriteBehindCache) Set(ctx context.Context, key string, value interface{}) error {
    // Write to cache immediately
    serialized, err := json.Marshal(value)
    if err != nil {
        return fmt.Errorf("failed to serialize value: %w", err)
    }

    if err := wbc.redis.Set(ctx, key, serialized, wbc.ttl); err != nil {
        return fmt.Errorf("cache write failed: %w", err)
    }

    // Queue database write
    select {
    case wbc.buffer <- WriteOperation{Key: key, Value: value, Op: "SET"}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    default:
        // Buffer full - return error or handle overflow
        return fmt.Errorf("write buffer is full")
    }
}

// backgroundFlusher processes queued writes
func (wbc *WriteBehindCache) backgroundFlusher() {
    defer wbc.wg.Done()

    ticker := time.NewTicker(wbc.flushInterval)
    defer ticker.Stop()

    batch := make([]WriteOperation, 0, wbc.batchSize)

    for {
        select {
        case op := <-wbc.buffer:
            batch = append(batch, op)
            if len(batch) >= wbc.batchSize {
                wbc.flushBatch(batch)
                batch = batch[:0] // Reset slice
            }

        case <-ticker.C:
            if len(batch) > 0 {
                wbc.flushBatch(batch)
                batch = batch[:0]
            }

        case <-wbc.stopCh:
            // Flush remaining operations before exiting
            if len(batch) > 0 {
                wbc.flushBatch(batch)
            }
            return
        }
    }
}

// flushBatch writes operations to database
func (wbc *WriteBehindCache) flushBatch(batch []WriteOperation) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    for _, op := range batch {
        switch op.Op {
        case "SET":
            if err := wbc.database.Set(ctx, op.Key, op.Value); err != nil {
                log.Printf("Failed to write %s to database: %v", op.Key, err)
            }
        case "DELETE":
            if err := wbc.database.Delete(ctx, op.Key); err != nil {
                log.Printf("Failed to delete %s from database: %v", op.Key, err)
            }
        case "INCR":
            // Handle increment operations
            if err := wbc.database.Increment(ctx, op.Key, op.Value.(int64)); err != nil {
                log.Printf("Failed to increment %s in database: %v", op.Key, err)
            }
        }
    }
}

func (wbc *WriteBehindCache) Close() error {
    close(wbc.stopCh)
    wbc.wg.Wait()
    return nil
}

Common Pitfalls and Solutions

Pitfall 1: Cache Stampede

When cache expires, multiple requests try to populate it simultaneously, overwhelming the backend system.

// Solution: Single flight pattern with lock-based protection
type StampedeProtection struct {
    cache *RedisClient
    locks map[string]*sync.Mutex
    mu    sync.RWMutex
}

func NewStampedeProtection(cache *RedisClient) *StampedeProtection {
    return &StampedeProtection{
        cache: cache,
        locks: make(map[string]*sync.Mutex),
    }
}

func (sp *StampedeProtection) GetWithProtection(ctx context.Context, key string, loader func() (interface{}, error), ttl time.Duration) (interface{}, error) {
    // Try cache first
    cached, err := sp.cache.Get(ctx, key)
    if err == nil && cached != "" {
        var data interface{}
        if err := json.Unmarshal([]byte(cached), &data); err == nil {
            return data, nil
        }
    }

    // Get or create lock for this key
    sp.mu.Lock()
    lock, exists := sp.locks[key]
    if !exists {
        lock = &sync.Mutex{}
        sp.locks[key] = lock
    }
    sp.mu.Unlock()

    // Acquire lock
    lock.Lock()
    defer lock.Unlock()

    // Double-check cache after acquiring lock
    cached, err = sp.cache.Get(ctx, key)
    if err == nil && cached != "" {
        var data interface{}
        if err := json.Unmarshal([]byte(cached), &data); err == nil {
            return data, nil
        }
    }

    // Load data
    data, err := loader()
    if err != nil {
        return nil, err
    }

    // Populate cache
    serialized, _ := json.Marshal(data)
    go sp.cache.Set(context.Background(), key, serialized, ttl)

    return data, nil
}

Pitfall 2: Poor Cache Key Design

Inconsistent cache keys lead to duplicate data and cache misses.

// BAD: Inconsistent key patterns
userKey1 := "user:123"
userKey2 := "123:user"
userKey3 := "user_profile_123"

// GOOD: Consistent, predictable key patterns
const (
    UserKeyPrefix      = "user:"
    UserProfilePrefix  = "user_profile:"
    UserSettingsPrefix = "user_settings:"
)

func UserKey(userID string) string {
    return fmt.Sprintf("%s%s", UserKeyPrefix, userID)
}

func UserProfileKey(userID string) string {
    return fmt.Sprintf("%s%s", UserProfilePrefix, userID)
}

// Use constants to avoid typos and ensure consistency

Pitfall 3: No TTL Management

Without proper TTL management, cache grows indefinitely and memory fills up.

// Solution: Automatic TTL based on data type and access patterns
type TTLManager struct {
    DefaultTTL time.Duration
    UserTTL    time.Duration
    ConfigTTL  time.Duration
    SessionTTL time.Duration
}

func NewTTLManager() *TTLManager {
    return &TTLManager{
        DefaultTTL: 5 * time.Minute,
        UserTTL:    30 * time.Minute,
        ConfigTTL:  1 * time.Hour,
        SessionTTL: 24 * time.Hour,
    }
}

func (tm *TTLManager) GetTTL(dataType string) time.Duration {
    switch dataType {
    case "user":
        return tm.UserTTL
    case "config":
        return tm.ConfigTTL
    case "session":
        return tm.SessionTTL
    default:
        return tm.DefaultTTL
    }
}

Integration and Mastery - Building Complete Caching Systems

Multi-Level Caching Architecture

Combine local and distributed caching for optimal performance:

type MultiLevelCache struct {
    localCache *LocalCache  // In-process L1 cache
    redisCache *RedisClient // Distributed L2 cache
    database   Database     // L3 data source
    localTTL   time.Duration
    redisTTL   time.Duration
}

func NewMultiLevelCache(redis *RedisClient, db Database) *MultiLevelCache {
    return &MultiLevelCache{
        localCache: NewLocalCache(1000), // 1000 items max
        redisCache: redis,
        database:   db,
        localTTL:   5 * time.Minute,  // Local cache TTL
        redisTTL:   30 * time.Minute, // Redis cache TTL
    }
}

func (mlc *MultiLevelCache) Get(ctx context.Context, key string) (interface{}, error) {
    // L1: Check local cache
    if value, found := mlc.localCache.Get(key); found {
        return value, nil
    }

    // L2: Check Redis cache
    cached, err := mlc.redisCache.Get(ctx, key)
    if err == nil && cached != "" {
        var value interface{}
        if err := json.Unmarshal([]byte(cached), &value); err == nil {
            // Promote to local cache
            mlc.localCache.Set(key, value, mlc.localTTL)
            return value, nil
        }
    }

    // L3: Fetch from database
    value, err := mlc.database.Get(ctx, key)
    if err != nil {
        return nil, fmt.Errorf("database fetch failed: %w", err)
    }

    // Populate both cache levels
    serialized, _ := json.Marshal(value)

    // Update Redis
    go mlc.redisCache.Set(context.Background(), key, serialized, mlc.redisTTL)

    // Update local cache
    mlc.localCache.Set(key, value, mlc.localTTL)

    return value, nil
}
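NewMultiLevelCache constructs a LocalCache that isn't defined anywhere in the article. A deliberately simple sketch, assuming a TTL-aware map with a crude size cap; a production L1 cache would use a proper LRU implementation instead:

// Hypothetical in-process L1 cache used by MultiLevelCache above.
type LocalCache struct {
    mu      sync.RWMutex
    maxSize int
    items   map[string]localItem
}

type localItem struct {
    value     interface{}
    expiresAt time.Time
}

func NewLocalCache(maxSize int) *LocalCache {
    return &LocalCache{maxSize: maxSize, items: make(map[string]localItem)}
}

func (lc *LocalCache) Get(key string) (interface{}, bool) {
    lc.mu.RLock()
    defer lc.mu.RUnlock()

    item, ok := lc.items[key]
    if !ok || time.Now().After(item.expiresAt) {
        return nil, false
    }
    return item.value, true
}

func (lc *LocalCache) Set(key string, value interface{}, ttl time.Duration) {
    lc.mu.Lock()
    defer lc.mu.Unlock()

    // Crude eviction: drop an arbitrary entry when the cap is reached.
    if len(lc.items) >= lc.maxSize {
        for k := range lc.items {
            delete(lc.items, k)
            break
        }
    }
    lc.items[key] = localItem{value: value, expiresAt: time.Now().Add(ttl)}
}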

Cache Warming and Population

type CacheWarmer struct {
    redis    *RedisClient
    database Database
    workers  int
}

func NewCacheWarmer(redis *RedisClient, db Database, workers int) *CacheWarmer {
    return &CacheWarmer{
        redis:    redis,
        database: db,
        workers:  workers,
    }
}

// Warm cache with frequently accessed data
func (cw *CacheWarmer) WarmCache(ctx context.Context) error {
    // Get list of frequently accessed keys
    frequentKeys, err := cw.database.GetFrequentKeys(ctx, 1000)
    if err != nil {
        return fmt.Errorf("failed to get frequent keys: %w", err)
    }

    // Process keys in parallel
    keyChan := make(chan string, len(frequentKeys))
    for _, key := range frequentKeys {
        keyChan <- key
    }
    close(keyChan)

    var wg sync.WaitGroup
    for i := 0; i < cw.workers; i++ {
        wg.Add(1)
        go cw.worker(ctx, keyChan, &wg)
    }

    wg.Wait()
    return nil
}

func (cw *CacheWarmer) worker(ctx context.Context, keyChan <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()

    for key := range keyChan {
        select {
        case <-ctx.Done():
            return
        default:
            // Load data from database
            data, err := cw.database.Get(ctx, key)
            if err != nil {
                log.Printf("Failed to load data for key %s: %v", key, err)
                continue
            }

            // Cache the data
            serialized, err := json.Marshal(data)
            if err != nil {
                log.Printf("Failed to serialize data for key %s: %v", key, err)
                continue
            }

            // Set appropriate TTL based on data type
            ttl := cw.getTTLForData(key)
            if err := cw.redis.Set(ctx, key, serialized, ttl); err != nil {
                log.Printf("Failed to cache data for key %s: %v", key, err)
            }
        }
    }
}
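The worker above calls cw.getTTLForData, which the listing doesn't define. One plausible sketch keyed off the key-prefix convention used earlier in the article (the prefixes and durations are assumptions; it also needs the strings package imported):

// Hypothetical TTL selection based on key prefix; adjust to your key scheme.
func (cw *CacheWarmer) getTTLForData(key string) time.Duration {
    switch {
    case strings.HasPrefix(key, "user:"):
        return 30 * time.Minute
    case strings.HasPrefix(key, "config:"):
        return 1 * time.Hour
    case strings.HasPrefix(key, "session:"):
        return 24 * time.Hour
    default:
        return 5 * time.Minute
    }
}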

Practice Exercises

Exercise 1: Implement Exactly-Once Cache Pattern

Build a cache that prevents duplicate cache population and handles concurrent access safely.

Solution:
package exercise

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type ExactlyOnceCache struct {
    cache      map[string]*cacheEntry
    locks      map[string]*sync.Mutex
    mu         sync.RWMutex
    defaultTTL time.Duration
}

type cacheEntry struct {
    value     interface{}
    expiresAt time.Time
    loading   bool
    cond      *sync.Cond
}

func NewExactlyOnceCache(defaultTTL time.Duration) *ExactlyOnceCache {
    return &ExactlyOnceCache{
        cache:      make(map[string]*cacheEntry),
        locks:      make(map[string]*sync.Mutex),
        defaultTTL: defaultTTL,
    }
}

func (eoc *ExactlyOnceCache) GetOrLoad(ctx context.Context, key string, loader func() (interface{}, error)) (interface{}, error) {
    eoc.mu.RLock()
    entry, exists := eoc.cache[key]
    eoc.mu.RUnlock()

    // Fast path: entry exists and not expired
    if exists && !entry.expiresAt.Before(time.Now()) {
        return entry.value, nil
    }

    // Get or create lock for this key
    eoc.mu.Lock()
    lock, lockExists := eoc.locks[key]
    if !lockExists {
        lock = &sync.Mutex{}
        eoc.locks[key] = lock
    }
    eoc.mu.Unlock()

    // Acquire lock
    lock.Lock()
    defer lock.Unlock()

    // Double-check after acquiring lock
    eoc.mu.RLock()
    entry, exists = eoc.cache[key]
    eoc.mu.RUnlock()

    if exists && !entry.expiresAt.Before(time.Now()) {
        return entry.value, nil
    }

    // Create or update entry for loading
    if !exists {
        entry = &cacheEntry{
            expiresAt: time.Now().Add(eoc.defaultTTL),
            cond:      sync.NewCond(&sync.Mutex{}),
        }
        eoc.mu.Lock()
        eoc.cache[key] = entry
        eoc.mu.Unlock()
    }

    // Handle concurrent loading
    if entry.loading {
        // Wait for other goroutine to finish loading
        entry.cond.L.Lock()
        for entry.loading {
            entry.cond.Wait()
        }
        entry.cond.L.Unlock()

        // Check if loading succeeded
        if !entry.expiresAt.Before(time.Now()) {
            return entry.value, nil
        }
        return nil, fmt.Errorf("cache loading failed")
    }

    // This goroutine will load the value
    entry.cond.L.Lock()
    entry.loading = true

    // Load value
    value, err := loader()

    // Update entry
    eoc.mu.Lock()
    if err != nil {
        // Loading failed - remove entry
        delete(eoc.cache, key)
        delete(eoc.locks, key)
    } else {
        entry.value = value
        entry.expiresAt = time.Now().Add(eoc.defaultTTL)
    }
    eoc.mu.Unlock()

    // Signal waiting goroutines
    entry.loading = false
    entry.cond.Broadcast()
    entry.cond.L.Unlock()

    if err != nil {
        return nil, err
    }

    return value, nil
}

Exercise 2: Build Intelligent Cache Eviction

Implement a cache with multiple eviction policies and automatic cleanup.

Solution:
package exercise

import (
    "container/heap"
    "sync"
    "time"
)

type EvictionPolicy int

const (
    EvictLRU EvictionPolicy = iota
    EvictLFU
    EvictTTL
)

type IntelligentCache struct {
    items    map[string]*cacheItem
    lruQueue *lruQueue
    lfuHeap  lfuHeap
    maxSize  int
    policy   EvictionPolicy
    mu       sync.RWMutex
}

type cacheItem struct {
    key       string
    value     interface{}
    accessAt  time.Time
    frequency int
    expiresAt time.Time
    lruIndex  int
}

type lruQueue struct {
    keys  []string
    index map[string]int
}

type lfuItem struct {
    key       string
    frequency int
    index     int
}

type lfuHeap []*lfuItem

func NewIntelligentCache(maxSize int, policy EvictionPolicy) *IntelligentCache {
    cache := &IntelligentCache{
        items:    make(map[string]*cacheItem),
        lruQueue: &lruQueue{index: make(map[string]int)},
        lfuHeap:  make(lfuHeap, 0),
        maxSize:  maxSize,
        policy:   policy,
    }
    heap.Init(&cache.lfuHeap)
    return cache
}

func (ic *IntelligentCache) Set(key string, value interface{}, ttl time.Duration) {
    ic.mu.Lock()
    defer ic.mu.Unlock()

    now := time.Now()
    expiresAt := now.Add(ttl)

    if item, exists := ic.items[key]; exists {
        // Update existing item
        item.value = value
        item.accessAt = now
        item.frequency++
        item.expiresAt = expiresAt

        ic.updateStructures(item, key)
    } else {
        // Add new item
        item := &cacheItem{
            key:       key,
            value:     value,
            accessAt:  now,
            frequency: 1,
            expiresAt: expiresAt,
        }

        ic.items[key] = item

        // Check if eviction needed
        if len(ic.items) > ic.maxSize {
            ic.evict()
        }

        ic.addToStructures(item)
    }
}

func (ic *IntelligentCache) Get(key string) (interface{}, bool) {
    ic.mu.Lock()
    defer ic.mu.Unlock()

    item, exists := ic.items[key]
    if !exists {
        return nil, false
    }

    // Check expiration
    if time.Now().After(item.expiresAt) {
        ic.removeItem(key)
        return nil, false
    }

    // Update access information
    now := time.Now()
    item.accessAt = now
    item.frequency++

    ic.updateStructures(item, key)

    return item.value, true
}

func (ic *IntelligentCache) evict() {
    switch ic.policy {
    case EvictLRU:
        ic.evictLRU()
    case EvictLFU:
        ic.evictLFU()
    case EvictTTL:
        ic.evictTTL()
    }
}

func (ic *IntelligentCache) evictLRU() {
    if len(ic.lruQueue.keys) == 0 {
        return
    }

    // Remove least recently used item
    key := ic.lruQueue.keys[0]
    ic.removeItem(key)
}

func (ic *IntelligentCache) evictLFU() {
    if len(ic.lfuHeap) == 0 {
        return
    }

    // Remove least frequently used item
    item := heap.Pop(&ic.lfuHeap).(*lfuItem)
    ic.removeItem(item.key)
}

func (ic *IntelligentCache) evictTTL() {
    now := time.Now()
    oldestKey := ""
    oldestTime := now

    for key, item := range ic.items {
        if item.expiresAt.Before(oldestTime) {
            oldestTime = item.expiresAt
            oldestKey = key
        }
    }

    if oldestKey != "" {
        ic.removeItem(oldestKey)
    }
}
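The solution above leans on several helpers it never defines: the lruQueue maintenance, the heap.Interface methods for lfuHeap, and addToStructures, updateStructures, and removeItem. One possible implementation, kept simple rather than optimal (the linear scans over the heap are fine for an exercise but not for a hot path):

// lruQueue keeps keys ordered from least to most recently used.
func (q *lruQueue) touch(key string) {
    q.remove(key)
    q.keys = append(q.keys, key)
    q.index[key] = len(q.keys) - 1
}

func (q *lruQueue) remove(key string) {
    if idx, ok := q.index[key]; ok {
        q.keys = append(q.keys[:idx], q.keys[idx+1:]...)
        delete(q.index, key)
        for i := idx; i < len(q.keys); i++ {
            q.index[q.keys[i]] = i
        }
    }
}

// heap.Interface implementation for lfuHeap (min-heap on access frequency).
func (h lfuHeap) Len() int           { return len(h) }
func (h lfuHeap) Less(i, j int) bool { return h[i].frequency < h[j].frequency }
func (h lfuHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i]; h[i].index = i; h[j].index = j }
func (h *lfuHeap) Push(x interface{}) {
    item := x.(*lfuItem)
    item.index = len(*h)
    *h = append(*h, item)
}
func (h *lfuHeap) Pop() interface{} {
    old := *h
    n := len(old)
    item := old[n-1]
    *h = old[:n-1]
    return item
}

func (ic *IntelligentCache) addToStructures(item *cacheItem) {
    ic.lruQueue.touch(item.key)
    heap.Push(&ic.lfuHeap, &lfuItem{key: item.key, frequency: item.frequency})
}

func (ic *IntelligentCache) updateStructures(item *cacheItem, key string) {
    ic.lruQueue.touch(key)
    // Linear scan to refresh the frequency entry; acceptable for a teaching example.
    for _, li := range ic.lfuHeap {
        if li.key == key {
            li.frequency = item.frequency
            heap.Fix(&ic.lfuHeap, li.index)
            return
        }
    }
    heap.Push(&ic.lfuHeap, &lfuItem{key: key, frequency: item.frequency})
}

func (ic *IntelligentCache) removeItem(key string) {
    delete(ic.items, key)
    ic.lruQueue.remove(key)
    for _, li := range ic.lfuHeap {
        if li.key == key {
            heap.Remove(&ic.lfuHeap, li.index)
            break
        }
    }
}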

Exercise 3: Implement Distributed Lock Manager

Build a distributed lock system using Redis with proper lock renewal and deadlock prevention.

Solution:
package exercise

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

type DistributedLock struct {
    redis   *RedisClient
    key     string
    value   string
    ttl     time.Duration
    renewCh chan struct{}
    stopCh  chan struct{}
}

func NewDistributedLock(redis *RedisClient, key string, ttl time.Duration) *DistributedLock {
    // Generate unique lock value
    valueBytes := make([]byte, 16)
    rand.Read(valueBytes)
    value := hex.EncodeToString(valueBytes)

    return &DistributedLock{
        redis:   redis,
        key:     key,
        value:   value,
        ttl:     ttl,
        renewCh: make(chan struct{}),
        stopCh:  make(chan struct{}),
    }
}

func (dl *DistributedLock) Acquire(ctx context.Context) (bool, error) {
    // Try to acquire lock with SET NX EX
    success, err := dl.redis.rdb.SetNX(ctx, dl.key, dl.value, dl.ttl).Result()
    if err != nil {
        return false, fmt.Errorf("failed to acquire lock: %w", err)
    }

    if !success {
        return false, nil // Lock not available
    }

    // Start renewal goroutine
    go dl.renewLock()

    return true, nil
}

func (dl *DistributedLock) renewLock() {
    ticker := time.NewTicker(dl.ttl / 3) // Renew at 1/3 of TTL
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // Renew lock using Lua script
            script := redis.NewScript(`
                if redis.call('GET', KEYS[1]) == ARGV[1] then
                    return redis.call('EXPIRE', KEYS[1], ARGV[2])
                else
                    return 0
                end
            `)

            result, err := script.Run(context.Background(), dl.redis.rdb,
                []string{dl.key}, dl.value, int(dl.ttl.Seconds())).Result()

            if err != nil || result.(int64) == 0 {
                // Lock lost or renewal failed - stop renewing
                return
            }

        case <-dl.stopCh:
            return
        }
    }
}

func (dl *DistributedLock) Release(ctx context.Context) error {
    close(dl.stopCh)

    // Release lock using Lua script to ensure atomicity
    script := redis.NewScript(`
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        else
            return 0
        end
    `)

    result, err := script.Run(ctx, dl.redis.rdb, []string{dl.key}, dl.value).Result()
    if err != nil {
        return fmt.Errorf("failed to release lock: %w", err)
    }

    if result.(int64) == 0 {
        return fmt.Errorf("lock not owned by this instance")
    }

    return nil
}


Exercise 4: Implement Redis Cluster with Consistent Hashing

Build a Redis cluster client that uses consistent hashing for automatic failover and load balancing across multiple Redis nodes.

Requirements:

  1. Consistent hashing implementation for key distribution
  2. Automatic failover when nodes fail
  3. Health checking and node discovery
  4. Support for adding/removing nodes dynamically
  5. Connection pooling for each node
Solution:
package exercise

import (
    "context"
    "crypto/md5"
    "encoding/binary"
    "fmt"
    "log"
    "sort"
    "sync"
    "time"

    "github.com/redis/go-redis/v9"
)

// ConsistentHash implements consistent hashing for distributing keys
type ConsistentHash struct {
    circle       map[uint32]string
    sortedHashes []uint32
    nodes        map[string]*redis.Client
    virtualNodes int
    mu           sync.RWMutex
}

// RedisClusterClient manages connections to multiple Redis nodes
type RedisClusterClient struct {
    hash          *ConsistentHash
    healthChecker *HealthChecker
    config        *ClusterConfig
}

type ClusterConfig struct {
    Nodes               []NodeConfig
    VirtualNodes        int
    HealthCheckInterval time.Duration
    MaxRetries          int
    DialTimeout         time.Duration
    ReadTimeout         time.Duration
    WriteTimeout        time.Duration
}

type NodeConfig struct {
    Addr     string
    Password string
    DB       int
}

func NewConsistentHash(virtualNodes int) *ConsistentHash {
    return &ConsistentHash{
        circle:       make(map[uint32]string),
        sortedHashes: make([]uint32, 0),
        nodes:        make(map[string]*redis.Client),
        virtualNodes: virtualNodes,
    }
}

// AddNode adds a Redis node to the consistent hash ring
func (ch *ConsistentHash) AddNode(addr string, client *redis.Client) {
    ch.mu.Lock()
    defer ch.mu.Unlock()

    // Add virtual nodes to the ring
    for i := 0; i < ch.virtualNodes; i++ {
        hash := ch.hashKey(fmt.Sprintf("%s:%d", addr, i))
        ch.circle[hash] = addr
        ch.sortedHashes = append(ch.sortedHashes, hash)
    }

    // Sort hashes for binary search
    sort.Slice(ch.sortedHashes, func(i, j int) bool {
        return ch.sortedHashes[i] < ch.sortedHashes[j]
    })

    ch.nodes[addr] = client
    log.Printf("Added node %s to consistent hash ring with %d virtual nodes", addr, ch.virtualNodes)
}

// RemoveNode removes a Redis node from the consistent hash ring
func (ch *ConsistentHash) RemoveNode(addr string) {
    ch.mu.Lock()
    defer ch.mu.Unlock()

    // Remove virtual nodes from the ring
    for i := 0; i < ch.virtualNodes; i++ {
        hash := ch.hashKey(fmt.Sprintf("%s:%d", addr, i))
        delete(ch.circle, hash)
    }

    // Rebuild sorted hashes
    ch.sortedHashes = make([]uint32, 0, len(ch.circle))
    for hash := range ch.circle {
        ch.sortedHashes = append(ch.sortedHashes, hash)
    }
    sort.Slice(ch.sortedHashes, func(i, j int) bool {
        return ch.sortedHashes[i] < ch.sortedHashes[j]
    })

    // Close client connection
    if client, exists := ch.nodes[addr]; exists {
        client.Close()
        delete(ch.nodes, addr)
    }

    log.Printf("Removed node %s from consistent hash ring", addr)
}

// GetNode returns the Redis client for a given key
func (ch *ConsistentHash) GetNode(key string) (*redis.Client, error) {
    ch.mu.RLock()
    defer ch.mu.RUnlock()

    if len(ch.circle) == 0 {
        return nil, fmt.Errorf("no nodes available")
    }

    hash := ch.hashKey(key)

    // Binary search to find the node
    idx := sort.Search(len(ch.sortedHashes), func(i int) bool {
        return ch.sortedHashes[i] >= hash
    })

    // Wrap around if necessary
    if idx >= len(ch.sortedHashes) {
        idx = 0
    }

    nodeAddr := ch.circle[ch.sortedHashes[idx]]
    client, exists := ch.nodes[nodeAddr]
    if !exists {
        return nil, fmt.Errorf("node %s not found", nodeAddr)
    }

    return client, nil
}

// hashKey generates a hash for a key using MD5
func (ch *ConsistentHash) hashKey(key string) uint32 {
    hasher := md5.New()
    hasher.Write([]byte(key))
    hashBytes := hasher.Sum(nil)
    return binary.BigEndian.Uint32(hashBytes)
}

// GetAllNodes returns all active Redis clients
func (ch *ConsistentHash) GetAllNodes() []*redis.Client {
    ch.mu.RLock()
    defer ch.mu.RUnlock()

    clients := make([]*redis.Client, 0, len(ch.nodes))
    for _, client := range ch.nodes {
        clients = append(clients, client)
    }
    return clients
}

// HealthChecker monitors node health
type HealthChecker struct {
    cluster  *RedisClusterClient
    interval time.Duration
    stopCh   chan struct{}
}

func NewHealthChecker(cluster *RedisClusterClient, interval time.Duration) *HealthChecker {
    return &HealthChecker{
        cluster:  cluster,
        interval: interval,
        stopCh:   make(chan struct{}),
    }
}

// Start begins health checking
func (hc *HealthChecker) Start() {
    go hc.healthCheckLoop()
}

// Stop stops health checking
func (hc *HealthChecker) Stop() {
    close(hc.stopCh)
}

func (hc *HealthChecker) healthCheckLoop() {
    ticker := time.NewTicker(hc.interval)
    defer ticker.Stop()

    for {
        select {
        case <-hc.stopCh:
            return
        case <-ticker.C:
            hc.checkAllNodes()
        }
    }
}

func (hc *HealthChecker) checkAllNodes() {
    clients := hc.cluster.hash.GetAllNodes()

    for _, client := range clients {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        err := client.Ping(ctx).Err()
        cancel()

        if err != nil {
            log.Printf("Health check failed for node: %v", err)
            // Remove unhealthy node
            // hc.cluster.hash.RemoveNode(addr)
        }
    }
}

func NewRedisClusterClient(config *ClusterConfig) (*RedisClusterClient, error) {
    hash := NewConsistentHash(config.VirtualNodes)

    // Create clients for all nodes
    for _, nodeConfig := range config.Nodes {
        client := redis.NewClient(&redis.Options{
            Addr:         nodeConfig.Addr,
            Password:     nodeConfig.Password,
            DB:           nodeConfig.DB,
            DialTimeout:  config.DialTimeout,
            ReadTimeout:  config.ReadTimeout,
            WriteTimeout: config.WriteTimeout,
            MaxRetries:   config.MaxRetries,
            PoolSize:     10,
            MinIdleConns: 5,
        })

        // Test connection
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        err := client.Ping(ctx).Err()
        cancel()

        if err != nil {
            log.Printf("Failed to connect to node %s: %v", nodeConfig.Addr, err)
            continue
        }

        hash.AddNode(nodeConfig.Addr, client)
    }

    if len(hash.nodes) == 0 {
        return nil, fmt.Errorf("no Redis nodes available")
    }

    cluster := &RedisClusterClient{
        hash:   hash,
        config: config,
    }

    // Start health checker
    cluster.healthChecker = NewHealthChecker(cluster, config.HealthCheckInterval)
    cluster.healthChecker.Start()

    log.Printf("Redis cluster client initialized with %d nodes", len(hash.nodes))
    return cluster, nil
}

// Set sets a key-value pair in the cluster
func (rcc *RedisClusterClient) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
    client, err := rcc.hash.GetNode(key)
    if err != nil {
        return fmt.Errorf("failed to get node for key %s: %w", key, err)
    }

    return client.Set(ctx, key, value, ttl).Err()
}

// Get retrieves a value from the cluster
func (rcc *RedisClusterClient) Get(ctx context.Context, key string) (string, error) {
    client, err := rcc.hash.GetNode(key)
    if err != nil {
        return "", fmt.Errorf("failed to get node for key %s: %w", key, err)
    }

    val, err := client.Get(ctx, key).Result()
    if err == redis.Nil {
        return "", nil
    }
    return val, err
}

// Delete removes a key from the cluster
func (rcc *RedisClusterClient) Delete(ctx context.Context, key string) error {
    client, err := rcc.hash.GetNode(key)
    if err != nil {
        return fmt.Errorf("failed to get node for key %s: %w", key, err)
    }

    return client.Del(ctx, key).Err()
}

// MGet retrieves multiple keys from the cluster
func (rcc *RedisClusterClient) MGet(ctx context.Context, keys []string) (map[string]string, error) {
    // Group keys by node
    nodeKeys := make(map[*redis.Client][]string)
    keyToNode := make(map[string]*redis.Client)

    for _, key := range keys {
        client, err := rcc.hash.GetNode(key)
        if err != nil {
            log.Printf("Failed to get node for key %s: %v", key, err)
            continue
        }

        nodeKeys[client] = append(nodeKeys[client], key)
        keyToNode[key] = client
    }

    // Execute MGet for each node in parallel
    var wg sync.WaitGroup
    resultCh := make(chan map[string]string, len(nodeKeys))
    errorCh := make(chan error, len(nodeKeys))

    for client, nodeKeyList := range nodeKeys {
        wg.Add(1)
        go func(c *redis.Client, keys []string) {
            defer wg.Done()

            results, err := c.MGet(ctx, keys...).Result()
            if err != nil {
                errorCh <- err
                return
            }

            resultMap := make(map[string]string)
            for i, key := range keys {
                if results[i] != nil {
                    resultMap[key] = results[i].(string)
                }
            }
            resultCh <- resultMap
        }(client, nodeKeyList)
    }

    wg.Wait()
    close(resultCh)
    close(errorCh)

    // Check for errors
    if len(errorCh) > 0 {
        return nil, <-errorCh
    }

    // Merge results
    finalResults := make(map[string]string)
    for result := range resultCh {
        for k, v := range result {
            finalResults[k] = v
        }
    }

    return finalResults, nil
}

// Pipeline executes multiple commands in a pipeline
func (rcc *RedisClusterClient) Pipeline(ctx context.Context, commands []PipelineCommand) error {
358    // Group commands by node
359    nodeCommands := make(map[*redis.Client][]PipelineCommand)
360
361    for _, cmd := range commands {
362        client, err := rcc.hash.GetNode(cmd.Key)
363        if err != nil {
364            log.Printf("Failed to get node for key %s: %v", cmd.Key, err)
365            continue
366        }
367
368        nodeCommands[client] = append(nodeCommands[client], cmd)
369    }
370
371    // Execute pipelines for each node
372    var wg sync.WaitGroup
373    errorCh := make(chan error, len(nodeCommands))
374
375    for client, cmds := range nodeCommands {
376        wg.Add(1)
377        go func(c *redis.Client, commands []PipelineCommand) {
378            defer wg.Done()
379
380            pipe := c.Pipeline()
381            for _, cmd := range commands {
382                switch cmd.Op {
383                case "SET":
384                    pipe.Set(ctx, cmd.Key, cmd.Value, cmd.TTL)
385                case "GET":
386                    pipe.Get(ctx, cmd.Key)
387                case "DEL":
388                    pipe.Del(ctx, cmd.Key)
389                }
390            }
391
392            _, err := pipe.Exec(ctx)
393            if err != nil && err != redis.Nil { // a pipelined GET miss surfaces as redis.Nil
394                errorCh <- err
395            }
396        }(client, cmds)
397    }
398
399    wg.Wait()
400    close(errorCh)
401
402    if len(errorCh) > 0 {
403        return <-errorCh
404    }
405
406    return nil
407}
408
409type PipelineCommand struct {
410    Op    string
411    Key   string
412    Value interface{}
413    TTL   time.Duration
414}
415
416// Close closes all Redis connections
417func (rcc *RedisClusterClient) Close() error {
418    rcc.healthChecker.Stop()
419
420    clients := rcc.hash.GetAllNodes()
421    for _, client := range clients {
422        if err := client.Close(); err != nil {
423            log.Printf("Error closing Redis client: %v", err)
424        }
425    }
426
427    return nil
428}
429
430// Example usage
431func ExampleRedisCluster() {
432    config := &ClusterConfig{
433        Nodes: []NodeConfig{
434            {Addr: "localhost:6379", Password: "", DB: 0},
435            {Addr: "localhost:6380", Password: "", DB: 0},
436            {Addr: "localhost:6381", Password: "", DB: 0},
437        },
438        VirtualNodes:        150,
439        HealthCheckInterval: 10 * time.Second,
440        MaxRetries:          3,
441        DialTimeout:         5 * time.Second,
442        ReadTimeout:         3 * time.Second,
443        WriteTimeout:        3 * time.Second,
444    }
445
446    cluster, err := NewRedisClusterClient(config)
447    if err != nil {
448        log.Fatal(err)
449    }
450    defer cluster.Close()
451
452    ctx := context.Background()
453
454    // Set values
455    for i := 0; i < 100; i++ {
456        key := fmt.Sprintf("key:%d", i)
457        value := fmt.Sprintf("value:%d", i)
458        if err := cluster.Set(ctx, key, value, 1*time.Hour); err != nil {
459            log.Printf("Failed to set %s: %v", key, err)
460        }
461    }
462
463    // Get values
464    val, err := cluster.Get(ctx, "key:42")
465    if err != nil {
466        log.Printf("Failed to get key:42: %v", err)
467    } else {
468        log.Printf("key:42 = %s", val)
469    }
470
471    // MGet example
472    keys := []string{"key:1", "key:2", "key:3", "key:4", "key:5"}
473    results, err := cluster.MGet(ctx, keys)
474    if err != nil {
475        log.Printf("MGet failed: %v", err)
476    } else {
477        log.Printf("MGet results: %+v", results)
478    }
479}

Exercise 5: Build Cache-Aside Pattern with Cache Warming and Probabilistic Early Expiration

Implement an advanced cache-aside pattern that includes cache warming, probabilistic early expiration to prevent the thundering herd problem, and stale-while-revalidate serving for better availability.

Requirements:

  1. Cache-aside pattern with automatic population
  2. Cache warming on startup
  3. Probabilistic early expiration (XFetch algorithm)
  4. Stale-while-revalidate for better availability
  5. Background refresh for hot keys
  6. Metrics and monitoring

Solution:
  1package exercise
  2
  3import (
  4    "context"
  5    "encoding/json"
  6    "fmt"
  7    "log"
  8    "math"
  9    "math/rand"
 10    "sync"
 11    "time"
 12
 13    "github.com/redis/go-redis/v9"
 14)
 15
 16// AdvancedCache implements cache-aside with probabilistic early expiration
 17type AdvancedCache struct {
 18    redis          *redis.Client
 19    database       Database
 20    metrics        *CacheMetrics
 21    refreshQueue   chan string
 22    hotKeys        *HotKeyTracker
 23    config         *AdvancedCacheConfig
 24    stopCh         chan struct{}
 25    wg             sync.WaitGroup
 26}
 27
 28type AdvancedCacheConfig struct {
 29    DefaultTTL          time.Duration
 30    StaleWhileRevalidate time.Duration
 31    Beta                float64  // XFetch beta parameter (typically 1.0)
 32    HotKeyThreshold     int      // Access count threshold for hot keys
 33    RefreshWorkers      int      // Number of background refresh workers
 34    WarmupKeys          []string // Keys to warm up on startup
 35}
 36
 37type Database interface {
 38    Get(ctx context.Context, key string) (interface{}, error)
 39    Set(ctx context.Context, key string, value interface{}) error
 40}
 41
 42type CacheMetrics struct {
 43    hits       int64
 44    misses     int64
 45    refreshes  int64
 46    errors     int64
 47    mu         sync.RWMutex
 48}
 49
 50type HotKeyTracker struct {
 51    accessCounts map[string]*AccessInfo
 52    threshold    int
 53    mu           sync.RWMutex
 54}
 55
 56type AccessInfo struct {
 57    Count      int
 58    LastAccess time.Time
 59}
 60
 61type CacheEntry struct {
 62    Value      interface{}
 63    CachedAt   time.Time
 64    ExpiresAt  time.Time
 65    IsStale    bool
 66}
 67
 68func NewAdvancedCache(rdb *redis.Client, db Database, config *AdvancedCacheConfig) *AdvancedCache {
 69    cache := &AdvancedCache{
 70        redis:        rdb,
 71        database:     db,
 72        metrics:      &CacheMetrics{},
 73        refreshQueue: make(chan string, 1000),
 74        hotKeys: &HotKeyTracker{
 75            accessCounts: make(map[string]*AccessInfo),
 76            threshold:    config.HotKeyThreshold,
 77        },
 78        config: config,
 79        stopCh: make(chan struct{}),
 80    }
 81
 82    // Start background refresh workers
 83    for i := 0; i < config.RefreshWorkers; i++ {
 84        cache.wg.Add(1)
 85        go cache.refreshWorker()
 86    }
 87
 88    // Start hot key monitor
 89    cache.wg.Add(1)
 90    go cache.hotKeyMonitor()
 91
 92    // Perform cache warming
 93    if len(config.WarmupKeys) > 0 {
 94        cache.warmCache()
 95    }
 96
 97    return cache
 98}
 99
100// Get retrieves a value with probabilistic early expiration
101func (ac *AdvancedCache) Get(ctx context.Context, key string) (interface{}, error) {
102    // Track hot keys
103    ac.hotKeys.RecordAccess(key)
104
105    // Try to get from cache with metadata
106    entry, err := ac.getCacheEntry(ctx, key)
107    if err != nil && err != redis.Nil {
108        ac.metrics.RecordError()
109        log.Printf("Redis error for key %s: %v", key, err)
110    }
111
112    // Cache hit - check if we should probabilistically refresh
113    if entry != nil && !entry.IsStale {
114        ac.metrics.RecordHit()
115
116        // Probabilistic early refresh (simplified XFetch-style check)
117        if ac.shouldRefreshEarly(entry) {
118            // Trigger background refresh
119            select {
120            case ac.refreshQueue <- key:
121                ac.metrics.RecordRefresh()
122            default:
123                // Queue full, skip refresh
124            }
125        }
126
127        return entry.Value, nil
128    }
129
130    // Cache miss or stale - decide whether to serve stale data
131    if entry != nil && entry.IsStale {
132        // Serve stale data while revalidating in background
133        select {
134        case ac.refreshQueue <- key:
135            ac.metrics.RecordRefresh()
136        default:
137        }
138
139        log.Printf("Serving stale data for key %s", key)
140        return entry.Value, nil
141    }
142
143    // Cache miss - fetch from database
144    ac.metrics.RecordMiss()
145    return ac.fetchAndCache(ctx, key)
146}
147
148// getCacheEntry retrieves cache entry with metadata
149func (ac *AdvancedCache) getCacheEntry(ctx context.Context, key string) (*CacheEntry, error) {
150    // Get cache data with metadata
151    data, err := ac.redis.HGetAll(ctx, ac.metaKey(key)).Result()
152    if err != nil || len(data) == 0 { // Redis errors are treated as cache misses here
153        return nil, redis.Nil
154    }
155
156    var entry CacheEntry
157
158    // Parse cached value
159    if valueStr, exists := data["value"]; exists {
160        if err := json.Unmarshal([]byte(valueStr), &entry.Value); err != nil {
161            return nil, fmt.Errorf("failed to unmarshal value: %w", err)
162        }
163    }
164
165    // Parse timestamps
166    if cachedAtStr, exists := data["cached_at"]; exists {
167        var cachedAtUnix int64
168        fmt.Sscanf(cachedAtStr, "%d", &cachedAtUnix)
169        entry.CachedAt = time.Unix(cachedAtUnix, 0)
170    }
171
172    if expiresAtStr, exists := data["expires_at"]; exists {
173        var expiresAtUnix int64
174        fmt.Sscanf(expiresAtStr, "%d", &expiresAtUnix)
175        entry.ExpiresAt = time.Unix(expiresAtUnix, 0)
176    }
177
178    // Check if stale
179    now := time.Now()
180    if now.After(entry.ExpiresAt) {
181        // Check if within stale-while-revalidate window
182        if now.Before(entry.ExpiresAt.Add(ac.config.StaleWhileRevalidate)) {
183            entry.IsStale = true
184            return &entry, nil
185        }
186        // Too old, treat as miss
187        return nil, redis.Nil
188    }
189
190    return &entry, nil
191}
192
193// shouldRefreshEarly implements a simplified, XFetch-style probabilistic early expiration
194func (ac *AdvancedCache) shouldRefreshEarly(entry *CacheEntry) bool {
195    now := time.Now()
196    delta := entry.ExpiresAt.Sub(now).Seconds()
197
198    if delta <= 0 {
199        return true // Already expired
200    }
201
202    // Calculate time since cached
203    age := now.Sub(entry.CachedAt).Seconds()
204
205    // Simplified formula: probability = beta * age / ttl grows linearly with age.
206    // The canonical XFetch test uses an exponential draw; see the note after this solution.
207    ttl := entry.ExpiresAt.Sub(entry.CachedAt).Seconds()
208    probability := ac.config.Beta * age / ttl
209
210    // Random decision
211    return rand.Float64() < probability
212}
213
214// fetchAndCache fetches data from database and caches it
215func (ac *AdvancedCache) fetchAndCache(ctx context.Context, key string) (interface{}, error) {
216    // Fetch from database
217    value, err := ac.database.Get(ctx, key)
218    if err != nil {
219        return nil, fmt.Errorf("database fetch failed: %w", err)
220    }
221
222    // Cache the value with metadata
223    go ac.cacheValue(context.Background(), key, value)
224
225    return value, nil
226}
227
228// cacheValue stores value in cache with metadata
229func (ac *AdvancedCache) cacheValue(ctx context.Context, key string, value interface{}) {
230    // Serialize value
231    valueData, err := json.Marshal(value)
232    if err != nil {
233        log.Printf("Failed to marshal value for key %s: %v", key, err)
234        return
235    }
236
237    now := time.Now()
238    expiresAt := now.Add(ac.config.DefaultTTL)
239
240    // Store with metadata
241    data := map[string]interface{}{
242        "value":      string(valueData),
243        "cached_at":  now.Unix(),
244        "expires_at": expiresAt.Unix(),
245    }
246
247    pipe := ac.redis.Pipeline()
248    pipe.HMSet(ctx, ac.metaKey(key), data)
249    pipe.Expire(ctx, ac.metaKey(key), ac.config.DefaultTTL+ac.config.StaleWhileRevalidate)
250
251    if _, err := pipe.Exec(ctx); err != nil {
252        log.Printf("Failed to cache value for key %s: %v", key, err)
253    }
254}
255
256// refreshWorker processes background refresh queue
257func (ac *AdvancedCache) refreshWorker() {
258    defer ac.wg.Done()
259
260    for {
261        select {
262        case key := <-ac.refreshQueue:
263            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
264            value, err := ac.database.Get(ctx, key)
265            cancel()
266
267            if err != nil {
268                log.Printf("Background refresh failed for key %s: %v", key, err)
269                continue
270            }
271
272            ac.cacheValue(context.Background(), key, value)
273            log.Printf("Background refreshed key %s", key)
274
275        case <-ac.stopCh:
276            return
277        }
278    }
279}
280
281// hotKeyMonitor tracks and refreshes hot keys proactively
282func (ac *AdvancedCache) hotKeyMonitor() {
283    defer ac.wg.Done()
284
285    ticker := time.NewTicker(10 * time.Second)
286    defer ticker.Stop()
287
288    for {
289        select {
290        case <-ticker.C:
291            ac.refreshHotKeys()
292
293        case <-ac.stopCh:
294            return
295        }
296    }
297}
298
299// refreshHotKeys proactively refreshes frequently accessed keys
300func (ac *AdvancedCache) refreshHotKeys() {
301    hotKeys := ac.hotKeys.GetHotKeys()
302
303    for _, key := range hotKeys {
304        select {
305        case ac.refreshQueue <- key:
306            log.Printf("Proactively refreshing hot key: %s", key)
307        default:
308            // Queue full, skip
309        }
310    }
311}
312
313// warmCache performs initial cache warming
314func (ac *AdvancedCache) warmCache() {
315    log.Printf("Starting cache warming for %d keys", len(ac.config.WarmupKeys))
316
317    var wg sync.WaitGroup
318    semaphore := make(chan struct{}, 10) // Limit concurrent warming operations
319
320    for _, key := range ac.config.WarmupKeys {
321        wg.Add(1)
322        go func(k string) {
323            defer wg.Done()
324
325            semaphore <- struct{}{}
326            defer func() { <-semaphore }()
327
328            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
329            defer cancel()
330
331            value, err := ac.database.Get(ctx, k)
332            if err != nil {
333                log.Printf("Failed to warm key %s: %v", k, err)
334                return
335            }
336
337            ac.cacheValue(context.Background(), k, value)
338            log.Printf("Warmed key %s", k)
339        }(key)
340    }
341
342    wg.Wait()
343    log.Printf("Cache warming completed")
344}
345
346// metaKey generates cache key with metadata prefix
347func (ac *AdvancedCache) metaKey(key string) string {
348    return fmt.Sprintf("cache:meta:%s", key)
349}
350
351// RecordAccess tracks key access for hot key detection
352func (hkt *HotKeyTracker) RecordAccess(key string) {
353    hkt.mu.Lock()
354    defer hkt.mu.Unlock()
355
356    info, exists := hkt.accessCounts[key]
357    if !exists {
358        info = &AccessInfo{}
359        hkt.accessCounts[key] = info
360    }
361
362    info.Count++
363    info.LastAccess = time.Now()
364
365    // Cleanup old entries periodically
366    if len(hkt.accessCounts) > 10000 {
367        hkt.cleanup()
368    }
369}
370
371// GetHotKeys returns keys exceeding access threshold
372func (hkt *HotKeyTracker) GetHotKeys() []string {
373    hkt.mu.RLock()
374    defer hkt.mu.RUnlock()
375
376    var hotKeys []string
377    cutoff := time.Now().Add(-1 * time.Minute)
378
379    for key, info := range hkt.accessCounts {
380        if info.Count >= hkt.threshold && info.LastAccess.After(cutoff) {
381            hotKeys = append(hotKeys, key)
382        }
383    }
384
385    return hotKeys
386}
387
388// cleanup removes stale access tracking entries
389func (hkt *HotKeyTracker) cleanup() {
390    cutoff := time.Now().Add(-5 * time.Minute)
391
392    for key, info := range hkt.accessCounts {
393        if info.LastAccess.Before(cutoff) {
394            delete(hkt.accessCounts, key)
395        }
396    }
397}
398
399// Metrics methods
400func (cm *CacheMetrics) RecordHit() {
401    cm.mu.Lock()
402    cm.hits++
403    cm.mu.Unlock()
404}
405
406func (cm *CacheMetrics) RecordMiss() {
407    cm.mu.Lock()
408    cm.misses++
409    cm.mu.Unlock()
410}
411
412func (cm *CacheMetrics) RecordRefresh() {
413    cm.mu.Lock()
414    cm.refreshes++
415    cm.mu.Unlock()
416}
417
418func (cm *CacheMetrics) RecordError() {
419    cm.mu.Lock()
420    cm.errors++
421    cm.mu.Unlock()
422}
423
424func (cm *CacheMetrics) GetStats() map[string]int64 {
425    cm.mu.RLock()
426    defer cm.mu.RUnlock()
427
428    total := cm.hits + cm.misses
429    hitRate := float64(0)
430    if total > 0 {
431        hitRate = float64(cm.hits) / float64(total) * 100
432    }
433
434    return map[string]int64{
435        "hits":      cm.hits,
436        "misses":    cm.misses,
437        "refreshes": cm.refreshes,
438        "errors":    cm.errors,
439        "hit_rate":  int64(math.Round(hitRate)),
440    }
441}
442
443// Close stops all background workers
444func (ac *AdvancedCache) Close() error {
445    close(ac.stopCh)
446    ac.wg.Wait()
447    return nil
448}
449
450// Example usage
451func ExampleAdvancedCache() {
452    rdb := redis.NewClient(&redis.Options{
453        Addr: "localhost:6379",
454    })
455
456    // Mock database: supply a real Database implementation; a nil db panics on a cache miss
457    var db Database
458
459    config := &AdvancedCacheConfig{
460        DefaultTTL:           30 * time.Minute,
461        StaleWhileRevalidate: 5 * time.Minute,
462        Beta:                 1.0,
463        HotKeyThreshold:      100,
464        RefreshWorkers:       3,
465        WarmupKeys:           []string{"popular:product:1", "popular:product:2"},
466    }
467
468    cache := NewAdvancedCache(rdb, db, config)
469    defer cache.Close()
470
471    ctx := context.Background()
472
473    // Get value (will cache miss and fetch from database)
474    value, err := cache.Get(ctx, "user:12345")
475    if err != nil {
476        log.Printf("Failed to get value: %v", err)
477    } else {
478        log.Printf("Value: %+v", value)
479    }
480
481    // Subsequent gets will hit cache
482    for i := 0; i < 10; i++ {
483        value, _ = cache.Get(ctx, "user:12345")
484    }
485
486    // Print metrics
487    stats := cache.metrics.GetStats()
488    log.Printf("Cache stats: %+v", stats)
489}
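
A note on shouldRefreshEarly above: it uses a simplified linear age/TTL probability rather than the formula from the XFetch paper ("Optimal Probabilistic Cache Stampede Prevention"). A sketch of the canonical check follows; delta is the measured time it takes to recompute the value, which the solution does not track, so the signature here is illustrative.

package exercise

import (
    "math"
    "math/rand"
    "time"
)

// shouldRefreshEarlyXFetch sketches the canonical XFetch test: refresh when
// now - delta*beta*ln(rand()) >= expiresAt. delta is the observed recompute
// time for the value and beta tunes aggressiveness (1.0 is the usual default).
func shouldRefreshEarlyXFetch(expiresAt time.Time, delta time.Duration, beta float64) bool {
    r := rand.Float64() // uniform in [0, 1)
    if r == 0 {
        r = math.SmallestNonzeroFloat64 // avoid ln(0)
    }
    // ln(r) is negative, so this advances "now" by a random, exponentially
    // distributed amount proportional to delta*beta.
    early := time.Duration(float64(delta) * beta * -math.Log(r))
    return !time.Now().Add(early).Before(expiresAt)
}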


Summary

Key Takeaways

  1. Choose the Right Pattern: Cache-aside for general use, write-through for consistency, write-behind for high throughput
  2. Select Appropriate Data Structures: Strings for simple values, hashes for objects, lists for queues, sorted sets for rankings
  3. Handle Cache Stampedes: Use single-flight patterns and lock-based protection (see the single-flight sketch after this list)
  4. Implement Proper TTL: Set appropriate expiration times and manage memory effectively
  5. Monitor Performance: Track hit rates, latency, and error rates
  6. Design for Failure: Build resilient systems that can handle cache unavailability
  7. Use Pipelining: Batch commands to amortize per-command network round trips
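
For takeaway 3, a minimal single-flight sketch using golang.org/x/sync/singleflight is shown below. The sync.Map stands in for Redis and loadFromDB is a placeholder for a real query, so treat it as an illustration of the pattern rather than a drop-in implementation.

package cache

import (
    "context"
    "sync"

    "golang.org/x/sync/singleflight"
)

// group collapses concurrent loads of the same key into a single call, so a
// cache miss under heavy traffic triggers one backend hit instead of a stampede.
var (
    group singleflight.Group
    local sync.Map // stand-in for Redis in this sketch
)

// loadFromDB is a placeholder for the real database query.
func loadFromDB(ctx context.Context, id string) (string, error) {
    return "user-" + id, nil
}

// GetUser checks the cache first and funnels all concurrent misses for the
// same id through one loader call.
func GetUser(ctx context.Context, id string) (string, error) {
    key := "user:" + id
    if v, ok := local.Load(key); ok {
        return v.(string), nil
    }
    v, err, _ := group.Do(key, func() (interface{}, error) {
        u, err := loadFromDB(ctx, id)
        if err != nil {
            return nil, err
        }
        local.Store(key, u)
        return u, nil
    })
    if err != nil {
        return "", err
    }
    return v.(string), nil
}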

Production Checklist

  • Connection Pooling: Configure appropriate pool sizes and timeouts (see the configuration sketch after this checklist)
  • TTL Management: Set reasonable expiration times for different data types
  • Error Handling: Handle Redis failures gracefully with fallbacks
  • Monitoring: Track hit rates, latency, memory usage, and error rates
  • Cache Invalidation: Implement proper invalidation strategies
  • Serialization: Use efficient formats (for example, MessagePack or protobuf instead of verbose JSON for large or hot payloads)
  • Security: Implement authentication and encryption for sensitive data
  • Testing: Test cache behavior, expiration, and failure scenarios
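
For the connection-pooling item, one possible starting point with go-redis v9 is sketched below; the numbers are illustrative, not prescriptive, and should be tuned against your own latency and pool-wait metrics.

package cache

import (
    "time"

    "github.com/redis/go-redis/v9"
)

// newClient builds a client with explicit pool and timeout settings instead of
// relying on library defaults. Adjust the values to your workload.
func newClient(addr string) *redis.Client {
    return redis.NewClient(&redis.Options{
        Addr:            addr,
        PoolSize:        20,              // max connections per node
        MinIdleConns:    5,               // keep warm connections ready
        PoolTimeout:     4 * time.Second, // how long a caller waits for a free connection
        ConnMaxIdleTime: 5 * time.Minute, // recycle idle connections
        DialTimeout:     5 * time.Second,
        ReadTimeout:     3 * time.Second,
        WriteTimeout:    3 * time.Second,
        MaxRetries:      3,
    })
}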

When to Use Each Pattern

Cache-Aside Pattern (minimal read-path sketch after this list):

  • General-purpose caching
  • Data that changes frequently
  • When you want control over cache population
  • Most web applications and APIs
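
A minimal cache-aside read path can be as small as the sketch below; Product, the key format, the 10-minute TTL, and loadProduct are illustrative placeholders.

package cache

import (
    "context"
    "encoding/json"
    "time"

    "github.com/redis/go-redis/v9"
)

type Product struct {
    ID   string `json:"id"`
    Name string `json:"name"`
}

// GetProduct shows the bare cache-aside read path: check Redis, fall back to
// the database on a miss, then populate the cache best-effort.
func GetProduct(ctx context.Context, rdb *redis.Client, id string) (*Product, error) {
    key := "product:" + id
    if data, err := rdb.Get(ctx, key).Bytes(); err == nil {
        var p Product
        if json.Unmarshal(data, &p) == nil {
            return &p, nil // cache hit
        }
    }
    p, err := loadProduct(ctx, id) // cache miss: read from the database
    if err != nil {
        return nil, err
    }
    if data, err := json.Marshal(p); err == nil {
        rdb.Set(ctx, key, data, 10*time.Minute)
    }
    return p, nil
}

// loadProduct is a stand-in for the real database query.
func loadProduct(ctx context.Context, id string) (*Product, error) {
    return &Product{ID: id, Name: "example"}, nil
}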

Write-Through Pattern (sketch after this list):

  • Data consistency is critical
  • Read-heavy data that must immediately reflect every write
  • Configuration and user preference data
  • When you can tolerate slightly slower writes
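
A minimal write-through sketch is shown below, assuming a saveSettings database helper and a settings:<userID> key format (both placeholders): the database write happens first, and the call only succeeds once the cache has been updated as well.

package cache

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

// WriteThroughSet persists to the database and then updates the cache in the
// same call path, so readers never see data older than the last successful write.
func WriteThroughSet(ctx context.Context, rdb *redis.Client, userID string, settings map[string]string) error {
    if err := saveSettings(ctx, userID, settings); err != nil {
        return fmt.Errorf("database write failed: %w", err)
    }
    data, err := json.Marshal(settings)
    if err != nil {
        return err
    }
    if err := rdb.Set(ctx, "settings:"+userID, data, 24*time.Hour).Err(); err != nil {
        return fmt.Errorf("cache write failed: %w", err)
    }
    return nil
}

// saveSettings is a stand-in for the real database write.
func saveSettings(ctx context.Context, userID string, settings map[string]string) error {
    return nil
}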

Write-Behind Pattern (sketch after this list):

  • Very high write throughput requirements
  • Bulk operations and analytics
  • Logging and telemetry data
  • When you can tolerate eventual consistency
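
A minimal write-behind sketch follows: writes are acknowledged as soon as the cache is updated and are flushed to the database by a background goroutine. The writeOp queue, the one-hour TTL, and the persist function are illustrative; a production version also needs batching, retries, and backpressure.

package cache

import (
    "context"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
)

type writeOp struct {
    key   string
    value string
}

// writeBehindCache acknowledges writes after the cache update and drains the
// queued database writes from a background goroutine (see flushLoop).
type writeBehindCache struct {
    rdb     *redis.Client
    writes  chan writeOp
    persist func(ctx context.Context, key, value string) error // real database writer goes here
}

// Set updates the cache and enqueues the database write; if the queue is full,
// the call blocks until the flusher catches up.
func (c *writeBehindCache) Set(ctx context.Context, key, value string) error {
    if err := c.rdb.Set(ctx, key, value, time.Hour).Err(); err != nil {
        return err
    }
    c.writes <- writeOp{key: key, value: value}
    return nil
}

// flushLoop drains queued writes into the database; failures are only logged
// here, so real systems should add retries or a dead-letter queue.
func (c *writeBehindCache) flushLoop(ctx context.Context) {
    for op := range c.writes {
        if err := c.persist(ctx, op.key, op.value); err != nil {
            log.Printf("write-behind flush failed for %s: %v", op.key, err)
        }
    }
}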

💡 Final Insight: Caching is not a silver bullet, but it's one of the most effective ways to improve application performance. The key is understanding your data access patterns and choosing the right strategy for each use case.

Next Steps

  1. Practice: Build a complete caching system for a real application
  2. Experiment: Try different Redis data structures and patterns
  3. Deepen Knowledge: Read Redis documentation and best practices
  4. Production Readiness: Learn about Redis clustering, sentinel, and scaling
  5. Advanced Topics: Study distributed caching patterns and consistency models

Remember: Caching is about trade-offs. Understand what you're optimizing for and choose the right approach for your specific needs.