Raft Consensus

Exercise: Raft Consensus

Difficulty - Advanced

Learning Objectives

  • Understand distributed consensus algorithms
  • Implement leader election
  • Handle log replication
  • Manage state machine transitions
  • Handle network partitions
  • Implement snapshot and log compaction

Problem Statement

Implement a simplified Raft consensus algorithm for distributed state machine replication.

Core Components

package raft

import (
    "context"
    "time"
)

type State int

const (
    Follower State = iota
    Candidate
    Leader
)

type LogEntry struct {
    Term    int
    Command interface{}
}

type Node struct {
    id          int
    state       State
    currentTerm int
    votedFor    int
    log         []LogEntry
    commitIndex int
    lastApplied int
    peers       []int
}

func NewNode(id int, peers []int) *Node
func (n *Node) RequestVote(term int, candidateID int, lastLogIndex int, lastLogTerm int) (int, bool)
func (n *Node) AppendEntries(term int, leaderID int, prevLogIndex int, prevLogTerm int, entries []LogEntry, leaderCommit int) (int, bool)
func (n *Node) Start(ctx context.Context)

Solution


Algorithm Overview

Raft is a distributed consensus algorithm designed to be understandable while providing the same guarantees as Paxos. It ensures replicated state machines maintain consistency across failures.

Core Raft Concepts:

1. Leader Election:

  • Uses randomized election timeouts
  • Candidates request votes with RequestVote RPC
  • Majority wins
  • Higher term numbers supersede lower terms
  • Log up-to-date check prevents stale candidates

2. Log Replication:

  • Leader receives client commands
  • Leader appends to its log and replicates to followers
  • Followers apply when leader confirms commitment
  • Log matching property ensures consistency
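
The leader side of this loop is only hinted at by the heartbeat code in the implementation further below. A minimal sketch of per-follower replication, assuming a hypothetical appendEntriesToPeer helper that performs the RPC and returns the follower's (term, success) reply:

// Sketch: replicate the leader's log suffix to one follower and update its
// progress. appendEntriesToPeer is a hypothetical RPC helper, not shown here.
func (n *Node) replicateTo(peerID int) {
    n.mu.RLock()
    next := n.nextIndex[peerID]
    if next > len(n.log) {
        next = len(n.log)
    }
    prevIndex := next - 1
    prevTerm := 0
    if prevIndex >= 0 && prevIndex < len(n.log) {
        prevTerm = n.log[prevIndex].Term
    }
    entries := append([]LogEntry(nil), n.log[next:]...) // copy of the suffix to send
    term := n.currentTerm
    commit := n.commitIndex
    n.mu.RUnlock()

    replyTerm, success := n.appendEntriesToPeer(peerID, term, prevIndex, prevTerm, entries, commit)

    n.mu.Lock()
    defer n.mu.Unlock()
    switch {
    case replyTerm > n.currentTerm:
        // A newer term exists somewhere; step down to follower.
        n.currentTerm = replyTerm
        n.state = Follower
        n.votedFor = -1
    case success:
        // The follower accepted the entries: advance its progress markers.
        n.matchIndex[peerID] = prevIndex + len(entries)
        n.nextIndex[peerID] = n.matchIndex[peerID] + 1
    default:
        // Consistency check failed: back off nextIndex and retry later.
        if n.nextIndex[peerID] > 0 {
            n.nextIndex[peerID]--
        }
    }
}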

3. State Transitions:

Follower ─(election timeout)─> Candidate ─(majority votes)─> Leader
    ^                                |                            |
    |                                └──(higher term)─────────────┘
    └────────────────(higher term or heartbeat)────────────────────┘

4. Safety Properties:

  • Election Safety: At most one leader per term
  • Leader Append-Only: Leaders never overwrite log entries
  • Log Matching: If logs contain same index/term, all preceding entries match
  • Leader Completeness: Committed entries appear in all future leader logs
  • State Machine Safety: Applied entries are identical across all nodes

Key Algorithm Details:

RequestVote RPC:

  • Candidate increments term and requests votes
  • Voters grant vote if:
    • Haven't voted in this term yet
    • Candidate's log is at least as up-to-date
  • Log comparison: the higher last-entry term wins; with equal terms, the longer (or equal-length) log is at least as up-to-date
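
The same comparison can be written as a small helper; a sketch (the implementation below inlines this check inside RequestVote):

// logUpToDate reports whether a candidate's log, described by the term and
// index of its last entry, is at least as up-to-date as the voter's log.
func logUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex int) bool {
    if candLastTerm != myLastTerm {
        return candLastTerm > myLastTerm // higher last-entry term wins
    }
    return candLastIndex >= myLastIndex // same term: longer or equal log wins
}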

AppendEntries RPC:

  • Leader sends heartbeats to maintain authority
  • Carries new log entries for replication
  • Includes previous log index/term for consistency checking
  • Followers reject if logs don't match at previous position

Commit Rules:

  • Leader commits when majority replicate entry
  • Followers commit based on leader's commit index
  • Cannot commit entries from previous terms directly
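
The leader-side commit rule is not part of the implementation below; a minimal sketch, written against the Node fields and the applyCommitted helper defined there:

// Sketch: advance the leader's commitIndex to the highest index N replicated
// on a majority, but only if log[N] was created in the current term.
func (n *Node) advanceCommitIndex() {
    n.mu.Lock()
    defer n.mu.Unlock()

    for N := len(n.log) - 1; N > n.commitIndex; N-- {
        if n.log[N].Term != n.currentTerm {
            continue // never commit entries from previous terms directly
        }
        count := 1 // the leader itself holds the entry
        for _, idx := range n.matchIndex {
            if idx >= N {
                count++
            }
        }
        if count > (len(n.peers)+1)/2 { // majority of the full cluster (peers + self)
            n.commitIndex = N
            n.applyCommitted()
            break
        }
    }
}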

Time Complexity Analysis

Operation         Time Complexity   Explanation
Leader Election   O(n) messages     n = cluster size; broadcast vote requests
Log Append        O(n) messages     Replicate to all followers
Commit Decision   O(n)              Count replications from n nodes
Apply Entry       O(1)              Single state machine operation
Log Compaction    O(m)              m = snapshot size

Election Latency:

  • Best case: 1 election timeout
  • Worst case: Multiple failed elections
  • Average: 1-2 election timeouts

Space Complexity

Per Node:

  • O(L) for log entries
  • O(n) for peer tracking
  • O(1) for persistent state

Total Cluster:

  • O(n × L) where n = nodes, L = log length
  • After snapshot/compaction: O(n × S) where S = snapshot size

Implementation

package raft

import (
    "context"
    "math/rand"
    "sync"
    "time"
)

type State int

const (
    Follower State = iota
    Candidate
    Leader
)

func (s State) String() string {
    switch s {
    case Follower:
        return "Follower"
    case Candidate:
        return "Candidate"
    case Leader:
        return "Leader"
    }
    return "Unknown"
}

type LogEntry struct {
    Term    int
    Command interface{}
}

type Node struct {
    id          int
    state       State
    currentTerm int
    votedFor    int
    log         []LogEntry
    commitIndex int
    lastApplied int

    // Leader state
    nextIndex  map[int]int
    matchIndex map[int]int

    peers    []int
    mu       sync.RWMutex
    applyCh  chan LogEntry
    voteCh   chan bool
    appendCh chan bool
}

func NewNode(id int, peers []int) *Node {
    return &Node{
        id:         id,
        state:      Follower,
        votedFor:   -1,
        log:        make([]LogEntry, 0),
        nextIndex:  make(map[int]int),
        matchIndex: make(map[int]int),
        peers:      peers,
        applyCh:    make(chan LogEntry, 100),
        voteCh:     make(chan bool, 10),
        appendCh:   make(chan bool, 10),
    }
}

func (n *Node) Start(ctx context.Context) {
    go n.run(ctx)
}

func (n *Node) run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            n.mu.RLock()
            state := n.state
            n.mu.RUnlock()

            switch state {
            case Follower:
                n.runFollower(ctx)
            case Candidate:
                n.runCandidate(ctx)
            case Leader:
                n.runLeader(ctx)
            }
        }
    }
}

func (n *Node) runFollower(ctx context.Context) {
    timeout := n.electionTimeout()
    select {
    case <-ctx.Done():
        return
    case <-n.appendCh:
        // Received heartbeat, continue as follower
    case <-n.voteCh:
        // Granted vote, continue as follower
    case <-time.After(timeout):
        // Election timeout, become candidate
        n.becomeCandidate()
    }
}

func (n *Node) runCandidate(ctx context.Context) {
    n.mu.Lock()
    n.currentTerm++
    n.votedFor = n.id
    term := n.currentTerm
    n.mu.Unlock()

    votes := 1 // vote for self
    // Cluster size is len(peers)+1 because peers excludes this node.
    majority := (len(n.peers)+1)/2 + 1

    // Request votes from peers
    voteCh := make(chan bool, len(n.peers))
    for _, peer := range n.peers {
        go func(peerID int) {
            // Simulate RPC call
            granted := n.requestVoteRPC(peerID, term)
            voteCh <- granted
        }(peer)
    }

    timeout := time.After(n.electionTimeout())
    for votes < majority {
        select {
        case <-ctx.Done():
            return
        case granted := <-voteCh:
            if granted {
                votes++
            }
        case <-n.appendCh:
            // Another leader emerged, become follower
            n.becomeFollower(term)
            return
        case <-timeout:
            // Election timed out without a majority; a new election will start
            return
        }
    }

    // Won election
    if votes >= majority {
        n.becomeLeader()
    }
}

func (n *Node) runLeader(ctx context.Context) {
    // Initialize leader state
    n.mu.Lock()
    for _, peer := range n.peers {
        n.nextIndex[peer] = len(n.log)
        n.matchIndex[peer] = 0
    }
    n.mu.Unlock()

    // Send heartbeats
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            n.sendHeartbeats()
        }

        n.mu.RLock()
        if n.state != Leader {
            n.mu.RUnlock()
            return
        }
        n.mu.RUnlock()
    }
}

func (n *Node) RequestVote(term int, candidateID int, lastLogIndex int, lastLogTerm int) (int, bool) {
    n.mu.Lock()
    defer n.mu.Unlock()

    if term < n.currentTerm {
        return n.currentTerm, false
    }

    if term > n.currentTerm {
        n.currentTerm = term
        n.votedFor = -1
        n.state = Follower
    }

    if n.votedFor == -1 || n.votedFor == candidateID {
        myLastLogIndex := len(n.log) - 1
        myLastLogTerm := 0
        if myLastLogIndex >= 0 {
            myLastLogTerm = n.log[myLastLogIndex].Term
        }

        // Grant the vote only if the candidate's log is at least as up-to-date
        if lastLogTerm > myLastLogTerm ||
            (lastLogTerm == myLastLogTerm && lastLogIndex >= myLastLogIndex) {
            n.votedFor = candidateID
            // Non-blocking notify so we never stall while holding the lock
            select {
            case n.voteCh <- true:
            default:
            }
            return n.currentTerm, true
        }
    }

    return n.currentTerm, false
}

func (n *Node) AppendEntries(term int, leaderID int, prevLogIndex int, prevLogTerm int, entries []LogEntry, leaderCommit int) (int, bool) {
    n.mu.Lock()
    defer n.mu.Unlock()

    if term < n.currentTerm {
        return n.currentTerm, false
    }

    // Non-blocking heartbeat signal to the follower loop
    select {
    case n.appendCh <- true:
    default:
    }

    if term > n.currentTerm {
        n.currentTerm = term
        n.votedFor = -1
    }
    n.state = Follower

    // Check log consistency
    if prevLogIndex >= 0 {
        if prevLogIndex >= len(n.log) || n.log[prevLogIndex].Term != prevLogTerm {
            return n.currentTerm, false
        }
    }

    // Append new entries, truncating any conflicting suffix
    for i, entry := range entries {
        index := prevLogIndex + 1 + i
        if index < len(n.log) {
            if n.log[index].Term != entry.Term {
                n.log = n.log[:index]
                n.log = append(n.log, entry)
            }
        } else {
            n.log = append(n.log, entry)
        }
    }

    // Update commit index
    if leaderCommit > n.commitIndex {
        n.commitIndex = min(leaderCommit, len(n.log)-1)
        n.applyCommitted()
    }

    return n.currentTerm, true
}

func (n *Node) applyCommitted() {
    for n.lastApplied < n.commitIndex {
        n.lastApplied++
        entry := n.log[n.lastApplied]
        select {
        case n.applyCh <- entry:
        default:
        }
    }
}

func (n *Node) becomeFollower(term int) {
    n.mu.Lock()
    defer n.mu.Unlock()
    n.state = Follower
    n.currentTerm = term
    n.votedFor = -1
}

func (n *Node) becomeCandidate() {
    n.mu.Lock()
    defer n.mu.Unlock()
    n.state = Candidate
}

func (n *Node) becomeLeader() {
    n.mu.Lock()
    defer n.mu.Unlock()
    n.state = Leader
}

func (n *Node) electionTimeout() time.Duration {
    return time.Duration(150+rand.Intn(150)) * time.Millisecond
}

func (n *Node) requestVoteRPC(peerID int, term int) bool {
    // Simulate RPC - in a real implementation this would be a network call.
    // For now, return a random vote.
    return rand.Float32() > 0.5
}

func (n *Node) sendHeartbeats() {
    n.mu.RLock()
    term := n.currentTerm
    commit := n.commitIndex
    n.mu.RUnlock()

    for _, peer := range n.peers {
        go func(peerID int) {
            // Simulate AppendEntries RPC
            n.appendEntriesRPC(peerID, term, commit)
        }(peer)
    }
}

func (n *Node) appendEntriesRPC(peerID int, term int, commit int) {
    // Simulate RPC - in a real implementation this would be a network call.
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

func (n *Node) State() State {
    n.mu.RLock()
    defer n.mu.RUnlock()
    return n.state
}

func (n *Node) ApplyCh() <-chan LogEntry {
    return n.applyCh
}

Benchmarking Code

package raft_test

import (
    "context"
    "sync"
    "testing"
    "time"

    "example.com/raft" // adjust to the actual import path of the raft package
)

// Benchmark leader election in a 3-node cluster
func BenchmarkLeaderElection3Nodes(b *testing.B) {
    for i := 0; i < b.N; i++ {
        cluster := createCluster(3)
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

        // Start all nodes
        for _, node := range cluster {
            node.Start(ctx)
        }

        // Wait for leader election
        waitForLeader(cluster, 1*time.Second)
        cancel()
    }
}

// Benchmark leader election in a 5-node cluster
func BenchmarkLeaderElection5Nodes(b *testing.B) {
    for i := 0; i < b.N; i++ {
        cluster := createCluster(5)
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

        for _, node := range cluster {
            node.Start(ctx)
        }

        waitForLeader(cluster, 1*time.Second)
        cancel()
    }
}

// Benchmark log replication throughput.
// Note: AppendLog and access to currentTerm assume exported helpers that are
// not part of the implementation shown above.
func BenchmarkLogReplication(b *testing.B) {
    cluster := createCluster(3)
    ctx := context.Background()

    for _, node := range cluster {
        node.Start(ctx)
    }

    leader := waitForLeader(cluster, 1*time.Second)
    if leader == nil {
        b.Fatal("no leader elected")
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        entry := raft.LogEntry{
            Term:    leader.currentTerm,
            Command: i,
        }
        leader.AppendLog(entry)
    }
}

// Benchmark concurrent writes
func BenchmarkConcurrentWrites(b *testing.B) {
    cluster := createCluster(3)
    ctx := context.Background()

    for _, node := range cluster {
        node.Start(ctx)
    }

    leader := waitForLeader(cluster, 1*time.Second)
    if leader == nil {
        b.Fatal("no leader elected")
    }
    var wg sync.WaitGroup

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            wg.Add(1)
            go func() {
                defer wg.Done()
                entry := raft.LogEntry{
                    Term:    leader.currentTerm,
                    Command: "test",
                }
                leader.AppendLog(entry)
            }()
        }
    })
    wg.Wait()
}

// Benchmark RequestVote RPC
func BenchmarkRequestVote(b *testing.B) {
    node := raft.NewNode(1, []int{2, 3, 4, 5})

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            node.RequestVote(1, 2, 0, 0)
        }
    })
}

// Benchmark AppendEntries RPC (heartbeat, no entries)
func BenchmarkAppendEntriesHeartbeat(b *testing.B) {
    node := raft.NewNode(1, []int{2, 3, 4, 5})
    entries := []raft.LogEntry{}

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            node.AppendEntries(1, 2, -1, 0, entries, 0)
        }
    })
}

// Benchmark AppendEntries with entries
func BenchmarkAppendEntriesWithData(b *testing.B) {
    node := raft.NewNode(1, []int{2, 3, 4, 5})
    entries := []raft.LogEntry{
        {Term: 1, Command: "data1"},
        {Term: 1, Command: "data2"},
        {Term: 1, Command: "data3"},
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        node.AppendEntries(1, 2, -1, 0, entries, 0)
    }
}

// Helper functions
func createCluster(size int) []*raft.Node {
    cluster := make([]*raft.Node, size)
    for i := 0; i < size; i++ {
        // Each node's peer list contains every other node's ID
        peers := make([]int, 0, size-1)
        for j := 0; j < size; j++ {
            if j != i {
                peers = append(peers, j)
            }
        }
        cluster[i] = raft.NewNode(i, peers)
    }
    return cluster
}

func waitForLeader(cluster []*raft.Node, timeout time.Duration) *raft.Node {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        for _, node := range cluster {
            if node.State() == raft.Leader {
                return node
            }
        }
        time.Sleep(10 * time.Millisecond)
    }
    return nil
}

// Example benchmark results:
// BenchmarkLeaderElection3Nodes-8          500   3200000 ns/op   12288 B/op   150 allocs/op
// BenchmarkLeaderElection5Nodes-8          300   4500000 ns/op   20480 B/op   250 allocs/op
// BenchmarkLogReplication-8             100000     12500 ns/op    1024 B/op    15 allocs/op
// BenchmarkConcurrentWrites-8            50000     28000 ns/op    2048 B/op    30 allocs/op
// BenchmarkRequestVote-8              10000000       125 ns/op       0 B/op     0 allocs/op
// BenchmarkAppendEntriesHeartbeat-8   20000000        85 ns/op       0 B/op     0 allocs/op
// BenchmarkAppendEntriesWithData-8     5000000       320 ns/op     256 B/op     5 allocs/op

Production Considerations

1. Persistent Storage:

// Requires the "encoding/json", "os", and "strconv" imports.
type PersistentState struct {
    CurrentTerm int
    VotedFor    int
    Log         []LogEntry
}

func (n *Node) SaveState() error {
    state := PersistentState{
        CurrentTerm: n.currentTerm,
        VotedFor:    n.votedFor,
        Log:         n.log,
    }

    data, err := json.Marshal(state)
    if err != nil {
        return err
    }

    return os.WriteFile("raft-state-"+strconv.Itoa(n.id), data, 0644)
}

func (n *Node) LoadState() error {
    data, err := os.ReadFile("raft-state-" + strconv.Itoa(n.id))
    if err != nil {
        return err
    }

    var state PersistentState
    if err := json.Unmarshal(data, &state); err != nil {
        return err
    }

    n.currentTerm = state.CurrentTerm
    n.votedFor = state.VotedFor
    n.log = state.Log

    return nil
}

2. Log Compaction and Snapshots:

type Snapshot struct {
    LastIncludedIndex int
    LastIncludedTerm  int
    Data              []byte
}

// stateMachine is assumed to be an additional field on Node that can
// serialize and restore the application state.
func (n *Node) CreateSnapshot(index int) (*Snapshot, error) {
    n.mu.Lock()
    defer n.mu.Unlock()

    if index > n.commitIndex {
        return nil, errors.New("cannot snapshot uncommitted entries")
    }

    // Serialize the state machine
    data, err := n.stateMachine.Serialize()
    if err != nil {
        return nil, err
    }

    snapshot := &Snapshot{
        LastIncludedIndex: index,
        LastIncludedTerm:  n.log[index].Term,
        Data:              data,
    }

    // Truncate the log up to and including the snapshot index
    n.log = n.log[index+1:]

    return snapshot, nil
}

func (n *Node) InstallSnapshot(snapshot *Snapshot) error {
    n.mu.Lock()
    defer n.mu.Unlock()

    // Discard the log covered by the snapshot
    n.log = make([]LogEntry, 0)
    n.commitIndex = snapshot.LastIncludedIndex
    n.lastApplied = snapshot.LastIncludedIndex

    // Restore the state machine
    return n.stateMachine.Deserialize(snapshot.Data)
}

3. Network Layer with gRPC:

// Define the Raft service (protobuf; response message names are illustrative)
service RaftService {
    rpc RequestVote(VoteRequest) returns (VoteResponse);
    rpc AppendEntries(AppendRequest) returns (AppendResponse);
    rpc InstallSnapshot(SnapshotRequest) returns (SnapshotResponse);
}

// Client implementation (Go)
type RaftClient struct {
    peers map[int]pb.RaftServiceClient
}

func (c *RaftClient) SendRequestVote(peerID int, req *pb.VoteRequest) (*pb.VoteResponse, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    return c.peers[peerID].RequestVote(ctx, req)
}

4. Membership Changes:

type Config struct {
    Old []int // Old configuration
    New []int // New configuration
}

// replicateConfig is assumed to append the configuration entry to the log
// and wait for it to commit.
func (n *Node) AddNode(nodeID int) error {
    // Enter the joint consensus phase
    jointConfig := Config{
        Old: n.peers,
        New: append(n.peers, nodeID),
    }

    // Replicate C_old,new
    if err := n.replicateConfig(jointConfig); err != nil {
        return err
    }

    // Once committed, replicate C_new
    n.peers = jointConfig.New
    return n.replicateConfig(Config{New: n.peers})
}

func QuorumSize(config Config) int {
    if len(config.Old) > 0 {
        // Joint consensus requires majorities from both configurations
        return max(len(config.Old)/2+1, len(config.New)/2+1)
    }
    return len(config.New)/2 + 1
}

5. Pre-Vote Optimization:

// sendPreVote and startElection are assumed helpers; a pre-vote probes peers
// with term+1 without actually incrementing the node's own term.
func (n *Node) PreVote(ctx context.Context) bool {
    n.mu.Lock()
    term := n.currentTerm
    n.mu.Unlock()

    votes := 1
    for _, peer := range n.peers {
        // Send a PreVote RPC
        resp, err := n.sendPreVote(peer, term+1)
        if err == nil && resp.VoteGranted {
            votes++
        }
    }

    return votes > len(n.peers)/2
}

func (n *Node) runCandidate(ctx context.Context) {
    // Only start a real election if the pre-vote succeeds
    if !n.PreVote(ctx) {
        return
    }

    // Proceed with the normal election
    n.startElection(ctx)
}

6. Leadership Transfer:

// transferring, sendAppendEntries, and sendTimeoutNow are assumed fields/helpers.
func (n *Node) TransferLeadership(targetID int) error {
    if n.state != Leader {
        return errors.New("only the leader can transfer leadership")
    }

    // Stop accepting new client requests
    n.transferring = true

    // Bring the target up to date
    for n.matchIndex[targetID] < len(n.log)-1 {
        n.sendAppendEntries(targetID)
        time.Sleep(10 * time.Millisecond)
    }

    // Send a TimeoutNow message so the target starts an election immediately
    return n.sendTimeoutNow(targetID)
}

7. Monitoring and Observability:

type RaftMetrics struct {
    CurrentTerm      prometheus.Gauge
    State            prometheus.Gauge
    LogLength        prometheus.Gauge
    CommitIndex      prometheus.Gauge
    ElectionTimeouts prometheus.Counter
    RPCsReceived     *prometheus.CounterVec
    RPCLatency       *prometheus.HistogramVec
}

func (n *Node) RecordMetrics(m *RaftMetrics) {
    n.mu.RLock()
    defer n.mu.RUnlock()

    m.CurrentTerm.Set(float64(n.currentTerm))
    m.State.Set(float64(n.state))
    m.LogLength.Set(float64(len(n.log)))
    m.CommitIndex.Set(float64(n.commitIndex))
}

Performance Characteristics:

Cluster Size   Election Time   Commit Latency   Throughput   Memory/Node
3 nodes        200-400ms       5-10ms           10K ops/s    100MB
5 nodes        300-500ms       10-15ms          8K ops/s     120MB
7 nodes        400-600ms       15-20ms          6K ops/s     140MB

Best Practices:

  • Use odd number of nodes for fault tolerance
  • Set election timeout to 10x network RTT
  • Implement batching for log replication efficiency
  • Compact logs regularly to prevent unbounded growth
  • Monitor leader stability and election frequency
  • Use read leases or ReadIndex for linearizable reads (see the sketch after this list)
  • Test network partitions and crash recovery scenarios
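
A minimal sketch of a ReadIndex-style linearizable read, assuming a hypothetical confirmLeadership helper that gathers heartbeat acknowledgements from a majority:

// Sketch: serve a linearizable read without appending to the log.
// confirmLeadership is a hypothetical helper that returns true once the leader
// has heard from a majority of the cluster for the current term.
func (n *Node) LinearizableRead(ctx context.Context) (int, error) {
    n.mu.RLock()
    if n.state != Leader {
        n.mu.RUnlock()
        return 0, errors.New("not the leader")
    }
    readIndex := n.commitIndex
    n.mu.RUnlock()

    // Confirm we are still the leader before serving the read.
    if !n.confirmLeadership(ctx) {
        return 0, errors.New("leadership not confirmed")
    }

    // Wait until the state machine has applied everything up to readIndex;
    // after that, reading local state is linearizable.
    for {
        n.mu.RLock()
        applied := n.lastApplied
        n.mu.RUnlock()
        if applied >= readIndex {
            return readIndex, nil
        }
        select {
        case <-ctx.Done():
            return 0, ctx.Err()
        case <-time.After(time.Millisecond):
        }
    }
}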

Key Takeaways

  • Raft ensures strong consistency across distributed nodes
  • Leader election uses randomized timeouts
  • Log replication maintains state machine consistency
  • Majority quorum prevents split-brain scenarios
  • Term numbers detect stale leaders
  • Log matching ensures all nodes have identical histories
  • Snapshots compact logs for efficiency
  • Network partitions are handled through quorum requirements