Exercise: Raft Consensus
Difficulty - Advanced
Learning Objectives
- Understand distributed consensus algorithms
- Implement leader election
- Handle log replication
- Manage state machine transitions
- Handle network partitions
- Implement snapshot and log compaction
Problem Statement
Implement a simplified Raft consensus algorithm for distributed state machine replication.
Core Components
package raft

import (
    "context"
    "time"
)

type State int

const (
    Follower State = iota
    Candidate
    Leader
)

type LogEntry struct {
    Term    int
    Command interface{}
}

type Node struct {
    id          int
    state       State
    currentTerm int
    votedFor    int
    log         []LogEntry
    commitIndex int
    lastApplied int
    peers       []int
}

func NewNode(id int, peers []int) *Node
func (n *Node) RequestVote(term, candidateID, lastLogIndex, lastLogTerm int) (currentTerm int, voteGranted bool)
func (n *Node) AppendEntries(term, leaderID, prevLogIndex, prevLogTerm int, entries []LogEntry, leaderCommit int) (currentTerm int, success bool)
func (n *Node) Start(ctx context.Context)
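A brief usage sketch of the API above, assuming a three-node in-process cluster (the node IDs are illustrative):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

node := NewNode(1, []int{2, 3}) // this node is 1; its peers are 2 and 3
node.Start(ctx)                 // runs the follower/candidate/leader loop until ctx is cancelled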
Solution
Algorithm Overview
Raft is a distributed consensus algorithm designed to be understandable while providing the same guarantees as Paxos. It ensures replicated state machines maintain consistency across failures.
Core Raft Concepts:
1. Leader Election:
- Uses randomized election timeouts
- Candidates request votes with RequestVote RPCs
- A majority of votes wins the election
- Higher term numbers supersede lower terms
- Log up-to-date check prevents stale candidates
2. Log Replication:
- Leader receives client commands
- Leader appends to its log and replicates to followers
- Followers apply when leader confirms commitment
- Log matching property ensures consistency
3. State Transitions:
Follower ──(election timeout)──> Candidate ──(majority of votes)──> Leader
Candidate ──(discovers current leader or higher term)──> Follower
Candidate ──(election timeout / split vote)──> Candidate (new election)
Leader ──(discovers higher term)──> Follower
4. Safety Properties:
- Election Safety: At most one leader per term
- Leader Append-Only: Leaders never overwrite log entries
- Log Matching: If logs contain same index/term, all preceding entries match
- Leader Completeness: Committed entries appear in all future leader logs
- State Machine Safety: Applied entries are identical across all nodes
Key Algorithm Details:
RequestVote RPC:
- Candidate increments term and requests votes
- Voters grant their vote only if:
  - They have not already voted for a different candidate in this term
  - The candidate's log is at least as up-to-date as their own
- Log comparison: Higher term wins, or same term with longer log wins
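A minimal sketch of that comparison, assuming 0-based log indexing and an illustrative helper name:

// isLogUpToDate reports whether a candidate's log, described by its last
// index and term, is at least as up-to-date as this node's log.
func (n *Node) isLogUpToDate(lastLogIndex, lastLogTerm int) bool {
    myLastIndex := len(n.log) - 1
    myLastTerm := 0
    if myLastIndex >= 0 {
        myLastTerm = n.log[myLastIndex].Term
    }
    if lastLogTerm != myLastTerm {
        return lastLogTerm > myLastTerm // higher last term wins
    }
    return lastLogIndex >= myLastIndex // same term: the longer (or equal) log wins
}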
AppendEntries RPC:
- Leader sends heartbeats to maintain authority
- Carries new log entries for replication
- Includes previous log index/term for consistency checking
- Followers reject if logs don't match at previous position
Commit Rules:
- The leader commits an entry once a majority of nodes have replicated it
- Followers advance their commit index based on the leader's commit index
- Entries from previous terms are never committed by counting replicas directly; they become committed only once a current-term entry after them is committed (see the sketch below)
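A sketch of the leader-side commit rule, reusing the matchIndex bookkeeping from the implementation below (the helper name is illustrative and not part of the solution code):

// maybeAdvanceCommit moves commitIndex to the highest index replicated on a
// majority, but only for entries from the leader's current term; earlier-term
// entries become committed indirectly once a current-term entry after them does.
func (n *Node) maybeAdvanceCommit() {
    n.mu.Lock()
    defer n.mu.Unlock()

    for idx := len(n.log) - 1; idx > n.commitIndex; idx-- {
        if n.log[idx].Term != n.currentTerm {
            break // never commit a previous-term entry by counting replicas
        }
        count := 1 // the leader itself has the entry
        for _, peer := range n.peers {
            if n.matchIndex[peer] >= idx {
                count++
            }
        }
        if count > (len(n.peers)+1)/2 { // strict majority of the whole cluster
            n.commitIndex = idx
            n.applyCommitted()
            break
        }
    }
}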
Time Complexity Analysis
| Operation | Time Complexity | Explanation |
|---|---|---|
| Leader Election | O(n) messages | n = cluster size, broadcast vote requests |
| Log Append | O(n) messages | Replicate to all followers |
| Commit Decision | O(n) | Count replications from n nodes |
| Apply Entry | O(1) | Single state machine operation |
| Log Compaction | O(m) | m = snapshot size |
Election Latency:
- Best case: 1 election timeout
- Worst case: Multiple failed elections
- Average: 1-2 election timeouts
Space Complexity
Per Node:
- O(L) for log entries
- O(n) for peer tracking
- O(1) for persistent metadata (currentTerm, votedFor); the log itself is the O(L) term above
Total Cluster:
- O(n × L) where n = nodes, L = log length
- After snapshot/compaction: O(n × S) where S = snapshot size
Implementation
package raft

import (
    "context"
    "math/rand"
    "sync"
    "time"
)

type State int

const (
    Follower State = iota
    Candidate
    Leader
)

func (s State) String() string {
    switch s {
    case Follower:
        return "Follower"
    case Candidate:
        return "Candidate"
    case Leader:
        return "Leader"
    }
    return "Unknown"
}

type LogEntry struct {
    Term    int
    Command interface{}
}

type Node struct {
    id          int
    state       State
    currentTerm int
    votedFor    int
    log         []LogEntry
    commitIndex int
    lastApplied int

    // Leader state
    nextIndex  map[int]int
    matchIndex map[int]int

    peers    []int
    mu       sync.RWMutex
    applyCh  chan LogEntry
    voteCh   chan bool
    appendCh chan bool
}

func NewNode(id int, peers []int) *Node {
    return &Node{
        id:          id,
        state:       Follower,
        votedFor:    -1,
        commitIndex: -1, // 0-based log indexing: -1 means nothing committed yet
        lastApplied: -1, // -1 means nothing applied yet
        log:         make([]LogEntry, 0),
        nextIndex:   make(map[int]int),
        matchIndex:  make(map[int]int),
        peers:       peers,
        applyCh:     make(chan LogEntry, 100),
        voteCh:      make(chan bool, 10),
        appendCh:    make(chan bool, 10),
    }
}

func (n *Node) Start(ctx context.Context) {
    go n.run(ctx)
}

func (n *Node) run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            n.mu.RLock()
            state := n.state
            n.mu.RUnlock()

            switch state {
            case Follower:
                n.runFollower(ctx)
            case Candidate:
                n.runCandidate(ctx)
            case Leader:
                n.runLeader(ctx)
            }
        }
    }
}

func (n *Node) runFollower(ctx context.Context) {
    timeout := n.electionTimeout()
    select {
    case <-ctx.Done():
        return
    case <-n.appendCh:
        // Received heartbeat, continue as follower
    case <-n.voteCh:
        // Granted a vote, continue as follower
    case <-time.After(timeout):
        // Election timeout, become candidate
        n.becomeCandidate()
    }
}

func (n *Node) runCandidate(ctx context.Context) {
    n.mu.Lock()
    n.currentTerm++
    n.votedFor = n.id
    term := n.currentTerm
    n.mu.Unlock()

    votes := 1 // vote for self
    majority := len(n.peers)/2 + 1

    // Request votes from peers
    voteCh := make(chan bool, len(n.peers))
    for _, peer := range n.peers {
        go func(peerID int) {
            // Simulate RPC call
            granted := n.requestVoteRPC(peerID, term)
            voteCh <- granted
        }(peer)
    }

    timeout := n.electionTimeout()
    for votes < majority {
        select {
        case <-ctx.Done():
            return
        case granted := <-voteCh:
            if granted {
                votes++
            }
        case <-n.appendCh:
            // Another leader emerged, become follower
            n.becomeFollower(term)
            return
        case <-time.After(timeout):
            // Election timeout, start a new election
            return
        }
    }

    // Won the election
    if votes >= majority {
        n.becomeLeader()
    }
}

func (n *Node) runLeader(ctx context.Context) {
    // Initialize leader state
    n.mu.Lock()
    for _, peer := range n.peers {
        n.nextIndex[peer] = len(n.log)
        n.matchIndex[peer] = -1 // nothing known to be replicated on this peer yet
    }
    n.mu.Unlock()

    // Send heartbeats
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            n.sendHeartbeats()
        }

        n.mu.RLock()
        if n.state != Leader {
            n.mu.RUnlock()
            return
        }
        n.mu.RUnlock()
    }
}

func (n *Node) RequestVote(term, candidateID, lastLogIndex, lastLogTerm int) (int, bool) {
    n.mu.Lock()
    defer n.mu.Unlock()

    if term < n.currentTerm {
        return n.currentTerm, false
    }

    if term > n.currentTerm {
        n.currentTerm = term
        n.votedFor = -1
        n.state = Follower
    }

    if n.votedFor == -1 || n.votedFor == candidateID {
        myLastLogIndex := len(n.log) - 1
        myLastLogTerm := 0
        if myLastLogIndex >= 0 {
            myLastLogTerm = n.log[myLastLogIndex].Term
        }

        // Grant the vote only if the candidate's log is at least as up-to-date as ours
        if lastLogTerm > myLastLogTerm ||
            (lastLogTerm == myLastLogTerm && lastLogIndex >= myLastLogIndex) {
            n.votedFor = candidateID
            // Non-blocking notification so we never stall while holding the lock
            select {
            case n.voteCh <- true:
            default:
            }
            return n.currentTerm, true
        }
    }

    return n.currentTerm, false
}

func (n *Node) AppendEntries(term, leaderID, prevLogIndex, prevLogTerm int, entries []LogEntry, leaderCommit int) (int, bool) {
    n.mu.Lock()
    defer n.mu.Unlock()

    if term < n.currentTerm {
        return n.currentTerm, false
    }

    // Non-blocking heartbeat notification so we never stall while holding the lock
    select {
    case n.appendCh <- true:
    default:
    }

    if term > n.currentTerm {
        n.currentTerm = term
        n.votedFor = -1
    }
    n.state = Follower

    // Check log consistency
    if prevLogIndex >= 0 {
        if prevLogIndex >= len(n.log) || n.log[prevLogIndex].Term != prevLogTerm {
            return n.currentTerm, false
        }
    }

    // Append new entries, truncating any conflicting suffix
    for i, entry := range entries {
        index := prevLogIndex + 1 + i
        if index < len(n.log) {
            if n.log[index].Term != entry.Term {
                n.log = n.log[:index]
                n.log = append(n.log, entry)
            }
        } else {
            n.log = append(n.log, entry)
        }
    }

    // Update commit index
    if leaderCommit > n.commitIndex {
        n.commitIndex = min(leaderCommit, len(n.log)-1)
        n.applyCommitted()
    }

    return n.currentTerm, true
}

func (n *Node) applyCommitted() {
    for n.lastApplied < n.commitIndex {
        n.lastApplied++
        entry := n.log[n.lastApplied]
        // Simplified: if the apply channel is full, the entry is dropped
        select {
        case n.applyCh <- entry:
        default:
        }
    }
}

func (n *Node) becomeFollower(term int) {
    n.mu.Lock()
    defer n.mu.Unlock()
    n.state = Follower
    n.currentTerm = term
    n.votedFor = -1
}

func (n *Node) becomeCandidate() {
    n.mu.Lock()
    defer n.mu.Unlock()
    n.state = Candidate
}

func (n *Node) becomeLeader() {
    n.mu.Lock()
    defer n.mu.Unlock()
    n.state = Leader
}

func (n *Node) electionTimeout() time.Duration {
    return time.Duration(150+rand.Intn(150)) * time.Millisecond
}

func (n *Node) requestVoteRPC(peerID int, term int) bool {
    // Simulate RPC - in a real implementation, this would be a network call
    // For now, return a random vote
    return rand.Float32() > 0.5
}

func (n *Node) sendHeartbeats() {
    n.mu.RLock()
    term := n.currentTerm
    commit := n.commitIndex
    n.mu.RUnlock()

    for _, peer := range n.peers {
        go func(peerID int) {
            // Simulate AppendEntries RPC
            n.appendEntriesRPC(peerID, term, commit)
        }(peer)
    }
}

func (n *Node) appendEntriesRPC(peerID int, term int, commit int) {
    // Simulate RPC - in a real implementation, this would be a network call
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

func (n *Node) State() State {
    n.mu.RLock()
    defer n.mu.RUnlock()
    return n.state
}

func (n *Node) ApplyCh() <-chan LogEntry {
    return n.applyCh
}
Benchmarking Code
package raft_test

import (
    "context"
    "sync"
    "testing"
    "time"

    // The raft package implemented above; the module path here is assumed.
    "example.com/raft"
)

// Benchmark leader election in 3-node cluster
func BenchmarkLeaderElection3Nodes(b *testing.B) {
    for i := 0; i < b.N; i++ {
        cluster := createCluster(3)
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

        // Start all nodes
        for _, node := range cluster {
            node.Start(ctx)
        }

        // Wait for leader election
        waitForLeader(cluster, 1*time.Second)
        cancel()
    }
}

// Benchmark leader election in 5-node cluster
func BenchmarkLeaderElection5Nodes(b *testing.B) {
    for i := 0; i < b.N; i++ {
        cluster := createCluster(5)
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

        for _, node := range cluster {
            node.Start(ctx)
        }

        waitForLeader(cluster, 1*time.Second)
        cancel()
    }
}

// Benchmark log replication throughput
// NOTE: assumes the raft package exposes an AppendLog method and that the
// leader's current term is readable from the test; the simplified
// implementation above would need those small additions.
func BenchmarkLogReplication(b *testing.B) {
    cluster := createCluster(3)
    ctx := context.Background()

    for _, node := range cluster {
        node.Start(ctx)
    }

    leader := waitForLeader(cluster, 1*time.Second)

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        entry := raft.LogEntry{
            Term:    leader.currentTerm,
            Command: i,
        }
        leader.AppendLog(entry)
    }
}

// Benchmark concurrent writes (same assumptions as BenchmarkLogReplication)
func BenchmarkConcurrentWrites(b *testing.B) {
    cluster := createCluster(3)
    ctx := context.Background()

    for _, node := range cluster {
        node.Start(ctx)
    }

    leader := waitForLeader(cluster, 1*time.Second)
    var wg sync.WaitGroup

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            wg.Add(1)
            go func() {
                defer wg.Done()
                entry := raft.LogEntry{
                    Term:    leader.currentTerm,
                    Command: "test",
                }
                leader.AppendLog(entry)
            }()
        }
    })
    wg.Wait()
}

// Benchmark RequestVote RPC
func BenchmarkRequestVote(b *testing.B) {
    node := raft.NewNode(1, []int{2, 3, 4, 5})

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            node.RequestVote(1, 2, 0, 0)
        }
    })
}

// Benchmark AppendEntries RPC
func BenchmarkAppendEntriesHeartbeat(b *testing.B) {
    node := raft.NewNode(1, []int{2, 3, 4, 5})
    entries := []raft.LogEntry{}

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            node.AppendEntries(1, 2, -1, 0, entries, 0)
        }
    })
}

// Benchmark AppendEntries with entries
func BenchmarkAppendEntriesWithData(b *testing.B) {
    node := raft.NewNode(1, []int{2, 3, 4, 5})
    entries := []raft.LogEntry{
        {Term: 1, Command: "data1"},
        {Term: 1, Command: "data2"},
        {Term: 1, Command: "data3"},
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        node.AppendEntries(1, 2, -1, 0, entries, 0)
    }
}

// Helper functions
func createCluster(size int) []*raft.Node {
    cluster := make([]*raft.Node, size)
    for i := 0; i < size; i++ {
        // Each node's peer list contains every other node's ID, not its own
        peers := make([]int, 0, size-1)
        for j := 0; j < size; j++ {
            if j != i {
                peers = append(peers, j)
            }
        }
        cluster[i] = raft.NewNode(i, peers)
    }
    return cluster
}

func waitForLeader(cluster []*raft.Node, timeout time.Duration) *raft.Node {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        for _, node := range cluster {
            if node.State() == raft.Leader {
                return node
            }
        }
        time.Sleep(10 * time.Millisecond)
    }
    return nil
}

// Example benchmark results:
// BenchmarkLeaderElection3Nodes-8          500    3200000 ns/op   12288 B/op   150 allocs/op
// BenchmarkLeaderElection5Nodes-8          300    4500000 ns/op   20480 B/op   250 allocs/op
// BenchmarkLogReplication-8             100000      12500 ns/op    1024 B/op    15 allocs/op
// BenchmarkConcurrentWrites-8            50000      28000 ns/op    2048 B/op    30 allocs/op
// BenchmarkRequestVote-8              10000000        125 ns/op       0 B/op     0 allocs/op
// BenchmarkAppendEntriesHeartbeat-8   20000000         85 ns/op       0 B/op     0 allocs/op
// BenchmarkAppendEntriesWithData-8     5000000        320 ns/op     256 B/op     5 allocs/op
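These benchmarks run with the standard Go tooling (go test -bench=. -benchmem). The example figures above are illustrative only; actual numbers depend heavily on timeout settings, hardware, and whether the RPC layer is simulated or networked.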
Production Considerations
1. Persistent Storage:
type PersistentState struct {
    CurrentTerm int
    VotedFor    int
    Log         []LogEntry
}

func (n *Node) SaveState() error {
    state := PersistentState{
        CurrentTerm: n.currentTerm,
        VotedFor:    n.votedFor,
        Log:         n.log,
    }

    data, err := json.Marshal(state)
    if err != nil {
        return err
    }

    return os.WriteFile("raft-state-"+strconv.Itoa(n.id), data, 0644)
}

func (n *Node) LoadState() error {
    data, err := os.ReadFile("raft-state-" + strconv.Itoa(n.id))
    if err != nil {
        return err
    }

    var state PersistentState
    if err := json.Unmarshal(data, &state); err != nil {
        return err
    }

    n.currentTerm = state.CurrentTerm
    n.votedFor = state.VotedFor
    n.log = state.Log

    return nil
}
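Raft's safety argument requires currentTerm, votedFor, and the log to be on stable storage before a node responds to any RPC, so a production implementation would append to a write-ahead log and fsync on the critical path rather than rewriting a JSON file, and would treat a missing state file on first boot as a fresh node rather than an error.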
2. Log Compaction and Snapshots:
type Snapshot struct {
    LastIncludedIndex int
    LastIncludedTerm  int
    Data              []byte
}

func (n *Node) CreateSnapshot(index int) (*Snapshot, error) {
    n.mu.Lock()
    defer n.mu.Unlock()

    if index > n.commitIndex {
        return nil, errors.New("cannot snapshot uncommitted entries")
    }

    // Serialize the state machine (stateMachine field assumed on Node)
    data, err := n.stateMachine.Serialize()
    if err != nil {
        return nil, err
    }

    snapshot := &Snapshot{
        LastIncludedIndex: index,
        LastIncludedTerm:  n.log[index].Term,
        Data:              data,
    }

    // Truncate the log up to and including the snapshotted index
    n.log = n.log[index+1:]

    return snapshot, nil
}

func (n *Node) InstallSnapshot(snapshot *Snapshot) error {
    n.mu.Lock()
    defer n.mu.Unlock()

    // Discard the log up to the snapshot
    n.log = make([]LogEntry, 0)
    n.commitIndex = snapshot.LastIncludedIndex
    n.lastApplied = snapshot.LastIncludedIndex

    // Restore the state machine
    return n.stateMachine.Deserialize(snapshot.Data)
}
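Two simplifications to note: after compaction a full implementation addresses the log relative to LastIncludedIndex rather than slicing a 0-based array, and the InstallSnapshot handler in the Raft paper retains any log entries that follow the snapshot when their index and term match, instead of always discarding the entire log.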
3. Network Layer with gRPC:
// Define the Raft service (response message names are illustrative;
// message definitions are omitted for brevity)
service RaftService {
    rpc RequestVote(VoteRequest) returns (VoteResponse);
    rpc AppendEntries(AppendRequest) returns (AppendResponse);
    rpc InstallSnapshot(SnapshotRequest) returns (SnapshotResponse);
}

// Client implementation
type RaftClient struct {
    peers map[int]pb.RaftServiceClient
}

func (c *RaftClient) SendRequestVote(peerID int, req *pb.VoteRequest) (*pb.VoteResponse, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    return c.peers[peerID].RequestVote(ctx, req)
}
4. Membership Changes:
type Config struct {
    Old []int // Old configuration
    New []int // New configuration
}

func (n *Node) AddNode(nodeID int) error {
    // Enter the joint consensus phase (replicateConfig is assumed)
    jointConfig := Config{
        Old: n.peers,
        New: append(n.peers, nodeID),
    }

    // Replicate C_old,new
    if err := n.replicateConfig(jointConfig); err != nil {
        return err
    }

    // Once committed, replicate C_new
    n.peers = jointConfig.New
    return n.replicateConfig(Config{New: n.peers})
}

func QuorumSize(config Config) int {
    if len(config.Old) > 0 {
        // Simplification: joint consensus actually requires a separate
        // majority in both configurations (see the note below)
        return max(len(config.Old)/2+1, len(config.New)/2+1)
    }
    return len(config.New)/2 + 1
}
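Strictly, joint consensus requires a separate majority in C_old and a separate majority in C_new; a single combined count such as QuorumSize above cannot express that, so a complete implementation tracks votes and replication acknowledgements per configuration (or sidesteps the problem by changing membership one server at a time).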
5. Pre-Vote Optimization:
func (n *Node) PreVote(ctx context.Context) bool {
    n.mu.Lock()
    term := n.currentTerm
    n.mu.Unlock()

    votes := 1
    for _, peer := range n.peers {
        // Send a PreVote RPC (sendPreVote is assumed); the term is not incremented yet
        resp, err := n.sendPreVote(peer, term+1)
        if err == nil && resp.VoteGranted {
            votes++
        }
    }

    return votes > len(n.peers)/2
}

func (n *Node) runCandidate(ctx context.Context) {
    // Only start a real election if the PreVote round succeeds
    if !n.PreVote(ctx) {
        return
    }

    // Proceed with the normal election (startElection is assumed)
    n.startElection(ctx)
}
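Without Pre-Vote, a node isolated by a partition keeps timing out and incrementing its term, and on rejoining forces the stable leader to step down; the Pre-Vote round lets a node start a real election (and bump its term) only once a majority indicates it could actually win.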
6. Leadership Transfer:
func (n *Node) TransferLeadership(targetID int) error {
    if n.state != Leader {
        return errors.New("only the leader can transfer leadership")
    }

    // Stop accepting new requests (transferring field assumed on Node)
    n.transferring = true

    // Bring the target up to date
    for n.matchIndex[targetID] < len(n.log)-1 {
        n.sendAppendEntries(targetID)
        time.Sleep(10 * time.Millisecond)
    }

    // Send a TimeoutNow message (sendTimeoutNow is assumed)
    return n.sendTimeoutNow(targetID)
}
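TimeoutNow instructs the target to start an election immediately instead of waiting for its own election timeout; because the leader has just brought the target's log up to date, the target passes the up-to-date check and normally wins in the next term.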
7. Monitoring and Observability:
type RaftMetrics struct {
    CurrentTerm      prometheus.Gauge
    State            prometheus.Gauge
    LogLength        prometheus.Gauge
    CommitIndex      prometheus.Gauge
    ElectionTimeouts prometheus.Counter
    RPCsReceived     *prometheus.CounterVec
    RPCLatency       *prometheus.HistogramVec
}

func (n *Node) RecordMetrics(m *RaftMetrics) {
    n.mu.RLock()
    defer n.mu.RUnlock()

    m.CurrentTerm.Set(float64(n.currentTerm))
    m.State.Set(float64(n.state))
    m.LogLength.Set(float64(len(n.log)))
    m.CommitIndex.Set(float64(n.commitIndex))
}
Performance Characteristics:
| Cluster Size | Election Time | Commit Latency | Throughput | Memory/Node |
|---|---|---|---|---|
| 3 nodes | 200-400ms | 5-10ms | 10K ops/s | 100MB |
| 5 nodes | 300-500ms | 10-15ms | 8K ops/s | 120MB |
| 7 nodes | 400-600ms | 15-20ms | 6K ops/s | 140MB |
Best Practices:
- Use an odd number of nodes for fault tolerance
- Set the election timeout to roughly 10x the network round-trip time
- Batch entries during log replication for efficiency
- Compact logs regularly to prevent unbounded growth
- Monitor leader stability and election frequency
- Use read leases or ReadIndex for linearizable reads (see the sketch after this list)
- Test network partition and crash-recovery scenarios
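A sketch of the ReadIndex approach mentioned above, building on the simplified Node type from the solution; confirmLeadership is an assumed helper that completes one round of heartbeats acknowledged by a majority:

// ReadIndex serves a linearizable read without appending to the log: record the
// current commitIndex, confirm leadership with a majority, then wait until the
// state machine has applied at least up to that index before answering.
func (n *Node) ReadIndex(ctx context.Context) (int, error) {
    n.mu.RLock()
    if n.state != Leader {
        n.mu.RUnlock()
        return 0, errors.New("not the leader")
    }
    readIndex := n.commitIndex
    n.mu.RUnlock()

    if !n.confirmLeadership(ctx) { // assumed helper: majority heartbeat round
        return 0, errors.New("leadership not confirmed")
    }

    // Wait until the state machine has caught up to readIndex
    for {
        n.mu.RLock()
        applied := n.lastApplied
        n.mu.RUnlock()
        if applied >= readIndex {
            return readIndex, nil
        }
        select {
        case <-ctx.Done():
            return 0, ctx.Err()
        case <-time.After(time.Millisecond):
        }
    }
}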
Key Takeaways
- Raft ensures strong consistency across distributed nodes
- Leader election uses randomized timeouts
- Log replication maintains state machine consistency
- Majority quorum prevents split-brain scenarios
- Term numbers detect stale leaders
- Log matching ensures all nodes have identical histories
- Snapshots compact logs for efficiency
- Network partitions are handled through quorum requirements