Exercise: Vector Clock for Distributed Causality
Difficulty - Advanced
Estimated Time - 3-5 hours
Learning Objectives
- Understand the problem of time and causality in distributed systems
- Implement vector clocks to track happened-before relationships
- Detect concurrent events and conflicts in distributed systems
- Apply vector clocks to build eventually consistent systems
- Master version control and conflict resolution in replicated data
- Design distributed systems with causal consistency
Problem Statement
In distributed systems, physical clocks are unreliable for determining the order of events. Network delays, clock skew, and clock drift make it impossible to rely on wall-clock time. Yet, we often need to know: Did event A happen before event B? Are these events concurrent and potentially conflicting?
Vector clocks provide a logical clock mechanism that captures causality relationships between events across distributed processes. Each process maintains a vector of counters, and by comparing vector timestamps, we can definitively determine if events are causally related or concurrent.
This is crucial for distributed databases, version control systems, distributed caching, and any system that needs to maintain causal consistency or detect conflicts.
Real-World Scenario:
Consider a distributed note-taking application like Google Docs or Notion. Multiple users can edit the same document simultaneously from different locations. Without proper causality tracking:
// Without vector clocks: cannot determine event ordering
type NaiveEdit struct {
    text      string
    timestamp time.Time // Wall-clock time is unreliable!
}

func (e1 *NaiveEdit) HappenedBefore(e2 *NaiveEdit) bool {
    // WRONG: clock skew makes this unreliable
    return e1.timestamp.Before(e2.timestamp)
}

// With vector clocks: precise causality tracking
type Edit struct {
    text   string
    vector VectorClock
}

func (e1 *Edit) HappenedBefore(e2 *Edit) bool {
    // CORRECT: logical clocks capture causality
    return e1.vector.HappenedBefore(e2.vector)
}

func (e1 *Edit) ConcurrentWith(e2 *Edit) bool {
    // Can detect conflicts!
    return e1.vector.ConcurrentWith(e2.vector)
}
Requirements
Functional Requirements
- Vector Clock Implementation
  - Maintain a vector of logical clocks
  - Increment the local clock on local events
  - Update the vector clock on message send and receive
  - Support an arbitrary number of processes
- Comparison Operations
  - Implement HappenedBefore(other): returns true if this event causally precedes other
  - Implement HappenedAfter(other): returns true if this event causally follows other
  - Implement ConcurrentWith(other): returns true if the events are concurrent
  - Implement Equal(other): returns true if the vector clocks are identical
- Clock Operations
  - Increment(processID): increment the local process counter
  - Merge(other): take the component-wise maximum for received messages
  - Copy(): create an independent copy of the vector clock
  - String(): human-readable representation
- Versioned Data Structures
  - Implement a versioned key-value store using vector clocks
  - Track all versions of each key
  - Detect and store concurrent conflicting updates
  - Provide a conflict resolution API
- Event Tracking
  - Log events with vector clock timestamps
  - Query events by causality relationship
  - Visualize causality graphs
  - Detect anomalies
Non-Functional Requirements
- Correctness: Must accurately represent happened-before relationships
- Efficiency: O(n) space per clock where n is number of processes
- Scalability: Support 10-1000 processes efficiently
- Thread-Safety: Safe for concurrent access
- Serialization: Support JSON/binary encoding for network transmission
Background and Theory
Vector clocks were developed independently by Colin Fidge and Friedemann Mattern in the late 1980s to extend Lamport's logical clocks with the ability to detect concurrency.
The Happened-Before Relation
In distributed systems, we define the "happened-before" relation as:
- Local events: If a and b are events in the same process, and a occurs before b, then a → b
- Message passing: If a is a send event and b is the corresponding receive event, then a → b
- Transitivity: If a → b and b → c, then a → c
Events that are not related by happened-before are concurrent.
How Vector Clocks Work
Each process P maintains a vector clock VC[P] where:
- VC[P][P] is P's logical clock
- VC[P][Q] is P's knowledge of Q's logical clock
Rules:
- Local event: Process P increments VC[P][P]
- Send message: Process P increments VC[P][P] and sends VC[P] with message
- Receive message: Process Q merges: VC[Q][i] = max(VC[Q][i], VC_msg[i]) for all i, then increments VC[Q][Q]
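In code, these three rules reduce to a handful of map operations. A minimal sketch in Go, with locking omitted (the full thread-safe version appears in the solution later in this exercise):

p := "A" // this process's ID
clock := map[string]int{}

// Rule 1: local event, bump our own entry
clock[p]++

// Rule 2: send, bump our own entry, then attach a copy to the message
clock[p]++
msgClock := make(map[string]int, len(clock))
for q, c := range clock {
    msgClock[q] = c
}

// Rule 3: receive, component-wise max with the message clock, then bump our entry
for q, c := range msgClock {
    if c > clock[q] {
        clock[q] = c
    }
}
clock[p]++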
Comparison:
- VC1 < VC2 if ∀i: VC1[i] ≤ VC2[i] and ∃j: VC1[j] < VC2[j]
- VC1 || VC2 if neither VC1 < VC2 nor VC2 < VC1
Example
Process A: [1,0,0] → [2,0,0] ──m1──┐
                                   ↓
Process B:   [0,1,0] → [2,2,0] → [2,3,0] ──m2──┐
                                               ↓
Process C:                [0,0,1] → [2,3,2]

Walking through the clocks: A performs a local event ([1,0,0]) and then sends m1 ([2,0,0]). B performs a local event ([0,1,0]); on receiving m1 it merges and increments ([2,2,0]), then sends m2 ([2,3,0]). C performs a local event ([0,0,1]); on receiving m2 it merges and increments ([2,3,2]).
The send event at A ([2,0,0]) happened before the receive event at C ([2,3,2]) because:
- Every component of [2,0,0] is ≤ the corresponding component of [2,3,2], and at least one is strictly less
- Causally, A's message reached B, and B's message to C carried A's entries forward
- Transitivity establishes the happened-before relationship
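To make the comparison concrete, here is a quick check of the example using the VectorClock API built later in this exercise (Set and HappenedBefore):

sendAtA := NewVectorClock()
sendAtA.Set("A", 2) // A's send event: [2,0,0]

recvAtC := NewVectorClock()
recvAtC.Set("A", 2) // C's receive event: [2,3,2]
recvAtC.Set("B", 3)
recvAtC.Set("C", 2)

fmt.Println(sendAtA.HappenedBefore(recvAtC)) // true: all components ≤, at least one strictly <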
Use Cases
- Amazon Dynamo: Uses vector clocks for multi-master replication
- Riak: Distributed key-value store with vector clock versioning
- Voldemort: Dynamo-style distributed key-value store with vector clock versioning
- Git: the commit DAG encodes causal history (Git does not use vector clocks, but relies on the same happened-before idea)
- CRDTs: Conflict-free replicated data types use vector clocks
- Distributed Debugging: Reconstruct causality of events
- Consistent Snapshots: Determine global consistent states
Implementation Challenges
Challenge 1: Growing Vector Size
As the number of processes increases, vector clocks grow linearly in size. For systems with thousands of nodes, this becomes expensive.
Solution Approach:
- Use sparse representation instead of dense array
- Implement clock pruning: remove entries for dead processes (see the sketch after this list)
- Consider bounded vector clocks for approximation
- Use process IDs efficiently
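As an illustration of pruning, a minimal sketch that drops entries for processes reported dead. Here deadProcesses is a hypothetical set supplied by a failure detector, and pruning is only safe if a dead process can never produce events again:

// Hedged sketch: prune entries for processes a failure detector reports dead.
// If a "dead" process ever writes again, causality information is lost.
func (vc *VectorClock) Prune(deadProcesses map[string]bool) {
    vc.mu.Lock()
    defer vc.mu.Unlock()
    for pid := range vc.clock {
        if deadProcesses[pid] {
            delete(vc.clock, pid)
        }
    }
}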
Challenge 2: Clock Merging Semantics
When receiving a message, the merge operation must be atomic and correct to maintain causality invariants.
Solution Approach:
- Take component-wise maximum: VC[i] = max(local[i], received[i])
- Always increment local counter after merge
- Use locks or atomic operations for thread safety
- Validate merged clocks maintain monotonicity
Challenge 3: Conflict Resolution
When detecting concurrent updates, the system must have a strategy to resolve them.
Solution Approach:
- Store all concurrent versions
- Let application provide resolution function
- Use last-write-wins with a process ID tiebreaker (see the sketch after this list)
- Implement semantic conflict resolution
- Allow manual conflict resolution
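For example, a deterministic last-write-wins rule might break ties between concurrent versions like this. pickWinner and the owner-ID parameters are illustrative, not part of the solution below (the solution's Version type does not record who wrote a version):

// Hedged sketch: deterministic last-write-wins between two versions.
// aOwner and bOwner are the IDs of the processes that wrote each version.
func pickWinner(a, b *Version, aOwner, bOwner string) *Version {
    if a.Clock.HappenedAfter(b.Clock) {
        return a // a causally supersedes b
    }
    if b.Clock.HappenedAfter(a.Clock) {
        return b
    }
    // Truly concurrent: use an arbitrary but deterministic tiebreaker
    // so every replica picks the same winner.
    if aOwner > bOwner {
        return a
    }
    return b
}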
Challenge 4: Serialization Efficiency
Vector clocks must be sent over network frequently, so encoding size matters.
Solution Approach:
- Use binary encoding instead of JSON
- Delta compression: send only changed entries (see the sketch after this list)
- Run-length encoding for sparse vectors
- Compress with standard algorithms
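Delta compression can be sketched as sending only the entries that changed since the last clock a peer acknowledged; lastAcked here is a hypothetical per-peer snapshot maintained by the sender:

// Hedged sketch: compute the delta between the current clock and the last
// clock a peer acknowledged, so only changed entries go on the wire.
func delta(current, lastAcked map[string]int) map[string]int {
    d := make(map[string]int)
    for pid, count := range current {
        if count > lastAcked[pid] {
            d[pid] = count
        }
    }
    return d
}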
Hints
Hint 1: Data Structure Design
Use a map for sparse representation, which is efficient when the number of active processes is small relative to the total process space:
type VectorClock struct {
    clock map[string]int // processID -> counter
    mu    sync.RWMutex   // Thread safety
}

func NewVectorClock() *VectorClock {
    return &VectorClock{
        clock: make(map[string]int),
    }
}
This gives O(k) space, where k is the number of active processes, rather than O(n) for the total process space.
Hint 2: Comparison Logic
To determine if VC1 happened before VC2:
func (vc *VectorClock) HappenedBefore(other *VectorClock) bool {
    lessOrEqual := true
    strictlyLess := false

    // Check all entries in both clocks
    allKeys := getAllKeys(vc, other)

    for _, key := range allKeys {
        v1 := vc.clock[key] // 0 if not present
        v2 := other.clock[key]

        if v1 > v2 {
            lessOrEqual = false
            break
        }
        if v1 < v2 {
            strictlyLess = true
        }
    }

    return lessOrEqual && strictlyLess
}
The key insight: all components must be ≤, and at least one must be strictly <.
Hint 3: Message Passing Protocol
Implement three operations for distributed communication:
// Before sending a message
func (vc *VectorClock) PrepareSend(processID string) *VectorClock {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    vc.clock[processID]++

    // Copy inline: calling vc.Copy() here would deadlock, because Copy
    // takes the read lock while we already hold the write lock.
    snapshot := NewVectorClock()
    for pid, count := range vc.clock {
        snapshot.clock[pid] = count
    }
    return snapshot // send the snapshot with the message
}

// Upon receiving a message (msgClock is the copy carried by the message)
func (vc *VectorClock) ReceiveMsg(processID string, msgClock *VectorClock) {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    // Merge: take the component-wise max
    for pid, count := range msgClock.clock {
        if count > vc.clock[pid] {
            vc.clock[pid] = count
        }
    }

    // Increment the local clock
    vc.clock[processID]++
}

// On a local event
func (vc *VectorClock) LocalEvent(processID string) {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    vc.clock[processID]++
}
Hint 4: Versioned Storage
Store multiple versions with their vector clocks to detect conflicts:
type Version struct {
    value interface{}
    clock *VectorClock
}

type VersionedStore struct {
    data map[string][]*Version // key -> list of versions
    mu   sync.RWMutex
}

func (vs *VersionedStore) Put(key string, value interface{}, clock *VectorClock) {
    vs.mu.Lock()
    defer vs.mu.Unlock()

    // Remove versions that are superseded
    versions := vs.data[key]
    newVersions := []*Version{}

    for _, v := range versions {
        // Keep if concurrent or happened after
        if clock.ConcurrentWith(v.clock) || v.clock.HappenedAfter(clock) {
            newVersions = append(newVersions, v)
        }
    }

    // Add the new version
    newVersions = append(newVersions, &Version{value, clock.Copy()})
    vs.data[key] = newVersions
}
Hint 5: Efficient Serialization
Implement custom binary encoding for network efficiency:
func (vc *VectorClock) MarshalBinary() ([]byte, error) {
    vc.mu.RLock()
    defer vc.mu.RUnlock()

    buf := new(bytes.Buffer)

    // Write the number of entries
    binary.Write(buf, binary.LittleEndian, uint32(len(vc.clock)))

    // Write each entry
    for pid, count := range vc.clock {
        // Write the process ID length, then its bytes
        binary.Write(buf, binary.LittleEndian, uint16(len(pid)))
        buf.WriteString(pid)

        // Write the counter value
        binary.Write(buf, binary.LittleEndian, uint64(count))
    }

    return buf.Bytes(), nil
}
This is much more compact than JSON for large vectors.
Solution
Approach
This implementation provides a complete vector clock system with:
- Core VectorClock: Thread-safe vector clock with all comparison operations
- VersionedStore: Key-value store that tracks all versions and detects conflicts
- Process Simulation: Framework for simulating distributed processes with message passing
- Event Log: Tracking and querying causality relationships between events
- Conflict Resolution: Strategies for handling concurrent updates
The implementation emphasizes correctness and clarity while maintaining good performance.
Implementation
package vectorclock

import (
    "bytes"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "io"
    "sort"
    "strings"
    "sync"
)

// VectorClock represents a logical clock for a distributed system
type VectorClock struct {
    clock map[string]int
    mu    sync.RWMutex
}

// NewVectorClock creates a new vector clock
func NewVectorClock() *VectorClock {
    return &VectorClock{
        clock: make(map[string]int),
    }
}

// NewVectorClockWithCapacity creates a vector clock with initial capacity
func NewVectorClockWithCapacity(capacity int) *VectorClock {
    return &VectorClock{
        clock: make(map[string]int, capacity),
    }
}

// Increment increments the clock for the specified process
func (vc *VectorClock) Increment(processID string) {
    vc.mu.Lock()
    defer vc.mu.Unlock()
    vc.clock[processID]++
}

// Get returns the clock value for a process
func (vc *VectorClock) Get(processID string) int {
    vc.mu.RLock()
    defer vc.mu.RUnlock()
    return vc.clock[processID]
}

// Set sets the clock value for a process
func (vc *VectorClock) Set(processID string, value int) {
    vc.mu.Lock()
    defer vc.mu.Unlock()
    vc.clock[processID] = value
}

// Merge merges another vector clock by taking the component-wise maximum
func (vc *VectorClock) Merge(other *VectorClock) {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    other.mu.RLock()
    defer other.mu.RUnlock()

    for pid, count := range other.clock {
        if count > vc.clock[pid] {
            vc.clock[pid] = count
        }
    }
}

// Copy creates a deep copy of the vector clock
func (vc *VectorClock) Copy() *VectorClock {
    vc.mu.RLock()
    defer vc.mu.RUnlock()

    newVC := NewVectorClockWithCapacity(len(vc.clock))
    for pid, count := range vc.clock {
        newVC.clock[pid] = count
    }
    return newVC
}

// HappenedBefore returns true if this clock happened before other.
// Note: comparisons lock both clocks; a production version would acquire
// the two locks in a consistent global order to rule out deadlock under
// heavy concurrent writes.
func (vc *VectorClock) HappenedBefore(other *VectorClock) bool {
    vc.mu.RLock()
    defer vc.mu.RUnlock()

    other.mu.RLock()
    defer other.mu.RUnlock()

    // Get all process IDs from both clocks
    allKeys := vc.getAllKeys(other)

    strictlyLess := false

    for _, key := range allKeys {
        v1 := vc.clock[key]
        v2 := other.clock[key]

        if v1 > v2 {
            // If any component is greater, not happened-before
            return false
        }
        if v1 < v2 {
            // At least one component must be strictly less
            strictlyLess = true
        }
    }

    // Every component was less-or-equal; require strictly less in at least one
    return strictlyLess
}

// HappenedAfter returns true if this clock happened after other
func (vc *VectorClock) HappenedAfter(other *VectorClock) bool {
    return other.HappenedBefore(vc)
}

// ConcurrentWith returns true if clocks are concurrent
func (vc *VectorClock) ConcurrentWith(other *VectorClock) bool {
    return !vc.HappenedBefore(other) && !vc.HappenedAfter(other) && !vc.Equal(other)
}

// Equal returns true if vector clocks are identical
func (vc *VectorClock) Equal(other *VectorClock) bool {
    vc.mu.RLock()
    defer vc.mu.RUnlock()

    other.mu.RLock()
    defer other.mu.RUnlock()

    // Get all process IDs from both clocks
    allKeys := vc.getAllKeys(other)

    for _, key := range allKeys {
        if vc.clock[key] != other.clock[key] {
            return false
        }
    }

    return true
}

// getAllKeys returns all unique keys from both clocks.
// Callers must already hold both locks.
func (vc *VectorClock) getAllKeys(other *VectorClock) []string {
    keyMap := make(map[string]bool)
    for key := range vc.clock {
        keyMap[key] = true
    }
    for key := range other.clock {
        keyMap[key] = true
    }

    keys := make([]string, 0, len(keyMap))
    for key := range keyMap {
        keys = append(keys, key)
    }
    return keys
}

// String returns a string representation of the vector clock
func (vc *VectorClock) String() string {
    vc.mu.RLock()
    defer vc.mu.RUnlock()

    if len(vc.clock) == 0 {
        return "{}"
    }

    // Sort keys for consistent output
    keys := make([]string, 0, len(vc.clock))
    for k := range vc.clock {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    parts := make([]string, 0, len(keys))
    for _, k := range keys {
        parts = append(parts, fmt.Sprintf("%s:%d", k, vc.clock[k]))
    }

    return "{" + strings.Join(parts, ", ") + "}"
}

// MarshalJSON implements json.Marshaler
func (vc *VectorClock) MarshalJSON() ([]byte, error) {
    vc.mu.RLock()
    defer vc.mu.RUnlock()
    return json.Marshal(vc.clock)
}

// UnmarshalJSON implements json.Unmarshaler
func (vc *VectorClock) UnmarshalJSON(data []byte) error {
    vc.mu.Lock()
    defer vc.mu.Unlock()
    return json.Unmarshal(data, &vc.clock)
}

// MarshalBinary implements encoding.BinaryMarshaler for efficient serialization
func (vc *VectorClock) MarshalBinary() ([]byte, error) {
    vc.mu.RLock()
    defer vc.mu.RUnlock()

    buf := new(bytes.Buffer)

    // Write the number of entries
    if err := binary.Write(buf, binary.LittleEndian, uint32(len(vc.clock))); err != nil {
        return nil, err
    }

    // Write each entry
    for pid, count := range vc.clock {
        // Write the process ID length and bytes
        if err := binary.Write(buf, binary.LittleEndian, uint16(len(pid))); err != nil {
            return nil, err
        }
        if _, err := buf.WriteString(pid); err != nil {
            return nil, err
        }

        // Write the counter value
        if err := binary.Write(buf, binary.LittleEndian, uint64(count)); err != nil {
            return nil, err
        }
    }

    return buf.Bytes(), nil
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (vc *VectorClock) UnmarshalBinary(data []byte) error {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    buf := bytes.NewReader(data)

    // Read the number of entries
    var numEntries uint32
    if err := binary.Read(buf, binary.LittleEndian, &numEntries); err != nil {
        return err
    }

    vc.clock = make(map[string]int, numEntries)

    // Read each entry
    for i := uint32(0); i < numEntries; i++ {
        // Read the process ID length
        var pidLen uint16
        if err := binary.Read(buf, binary.LittleEndian, &pidLen); err != nil {
            return err
        }

        // Read the process ID (io.ReadFull guarantees a complete read)
        pidBytes := make([]byte, pidLen)
        if _, err := io.ReadFull(buf, pidBytes); err != nil {
            return err
        }
        pid := string(pidBytes)

        // Read the counter value
        var count uint64
        if err := binary.Read(buf, binary.LittleEndian, &count); err != nil {
            return err
        }

        vc.clock[pid] = int(count)
    }

    return nil
}

// Size returns the number of entries in the vector clock
func (vc *VectorClock) Size() int {
    vc.mu.RLock()
    defer vc.mu.RUnlock()
    return len(vc.clock)
}

// Version represents a versioned value with its vector clock
type Version struct {
    Value interface{}
    Clock *VectorClock
}

// VersionedStore is a key-value store with vector clock versioning
type VersionedStore struct {
    data map[string][]*Version
    mu   sync.RWMutex
}

// NewVersionedStore creates a new versioned store
func NewVersionedStore() *VersionedStore {
    return &VersionedStore{
        data: make(map[string][]*Version),
    }
}

// Put stores a value with a vector clock.
// Automatically prunes superseded versions.
func (vs *VersionedStore) Put(key string, value interface{}, clock *VectorClock) {
    vs.mu.Lock()
    defer vs.mu.Unlock()

    versions := vs.data[key]
    newVersions := []*Version{}

    // Keep only concurrent or newer versions
    for _, v := range versions {
        // Keep if concurrent or happened after
        if clock.ConcurrentWith(v.Clock) || v.Clock.HappenedAfter(clock) {
            newVersions = append(newVersions, v)
        }
        // Discard if happened before
    }

    // Add the new version
    newVersions = append(newVersions, &Version{
        Value: value,
        Clock: clock.Copy(),
    })

    vs.data[key] = newVersions
}

// Get retrieves all current versions of a key
func (vs *VersionedStore) Get(key string) []*Version {
    vs.mu.RLock()
    defer vs.mu.RUnlock()

    versions := vs.data[key]
    if versions == nil {
        return nil
    }

    // Return copies to prevent external modification
    result := make([]*Version, len(versions))
    for i, v := range versions {
        result[i] = &Version{
            Value: v.Value,
            Clock: v.Clock.Copy(),
        }
    }
    return result
}

// HasConflict returns true if there are multiple concurrent versions
func (vs *VersionedStore) HasConflict(key string) bool {
    vs.mu.RLock()
    defer vs.mu.RUnlock()
    return len(vs.data[key]) > 1
}

// GetConflicts returns keys with conflicts
func (vs *VersionedStore) GetConflicts() []string {
    vs.mu.RLock()
    defer vs.mu.RUnlock()

    conflicts := []string{}
    for key, versions := range vs.data {
        if len(versions) > 1 {
            conflicts = append(conflicts, key)
        }
    }
    return conflicts
}

// Resolve resolves conflicts using a resolution function
func (vs *VersionedStore) Resolve(key string, resolver func([]*Version) (interface{}, *VectorClock)) error {
    vs.mu.Lock()
    defer vs.mu.Unlock()

    versions := vs.data[key]
    if len(versions) <= 1 {
        return fmt.Errorf("no conflict for key %s", key)
    }

    // Call the resolver
    value, clock := resolver(versions)

    // Replace with the resolved version
    vs.data[key] = []*Version{{Value: value, Clock: clock}}
    return nil
}

// Event represents an event in the system
type Event struct {
    ProcessID   string
    Description string
    Clock       *VectorClock
    Timestamp   int64
}

// EventLog tracks events with vector clocks
type EventLog struct {
    events []*Event
    mu     sync.RWMutex
}

// NewEventLog creates a new event log
func NewEventLog() *EventLog {
    return &EventLog{
        events: make([]*Event, 0),
    }
}

// Log adds an event to the log
func (el *EventLog) Log(event *Event) {
    el.mu.Lock()
    defer el.mu.Unlock()
    el.events = append(el.events, event)
}

// GetEvents returns all events
func (el *EventLog) GetEvents() []*Event {
    el.mu.RLock()
    defer el.mu.RUnlock()

    result := make([]*Event, len(el.events))
    copy(result, el.events)
    return result
}

// GetCausalHistory returns all events that causally precede the given event
func (el *EventLog) GetCausalHistory(event *Event) []*Event {
    el.mu.RLock()
    defer el.mu.RUnlock()

    history := []*Event{}
    for _, e := range el.events {
        if e.Clock.HappenedBefore(event.Clock) {
            history = append(history, e)
        }
    }
    return history
}

// GetConcurrentEvents returns events concurrent with the given event
func (el *EventLog) GetConcurrentEvents(event *Event) []*Event {
    el.mu.RLock()
    defer el.mu.RUnlock()

    concurrent := []*Event{}
    for _, e := range el.events {
        if e != event && e.Clock.ConcurrentWith(event.Clock) {
            concurrent = append(concurrent, e)
        }
    }
    return concurrent
}

// Process represents a distributed process with a vector clock
type Process struct {
    ID    string
    Clock *VectorClock
    Store *VersionedStore
    Log   *EventLog
    mu    sync.Mutex
}

// NewProcess creates a new process
func NewProcess(id string) *Process {
    return &Process{
        ID:    id,
        Clock: NewVectorClock(),
        Store: NewVersionedStore(),
        Log:   NewEventLog(),
    }
}

// LocalEvent records a local event
func (p *Process) LocalEvent(description string) *Event {
    p.mu.Lock()
    defer p.mu.Unlock()

    p.Clock.Increment(p.ID)

    event := &Event{
        ProcessID:   p.ID,
        Description: description,
        Clock:       p.Clock.Copy(),
    }

    p.Log.Log(event)
    return event
}

// PrepareSend prepares a message to send
func (p *Process) PrepareSend(description string) (*Message, *Event) {
    p.mu.Lock()
    defer p.mu.Unlock()

    p.Clock.Increment(p.ID)

    event := &Event{
        ProcessID:   p.ID,
        Description: description,
        Clock:       p.Clock.Copy(),
    }

    p.Log.Log(event)

    msg := &Message{
        From:  p.ID,
        Clock: p.Clock.Copy(),
        Data:  description,
    }

    return msg, event
}

// Receive processes a received message
func (p *Process) Receive(msg *Message, description string) *Event {
    p.mu.Lock()
    defer p.mu.Unlock()

    // Merge the incoming clock
    p.Clock.Merge(msg.Clock)

    // Increment the local clock
    p.Clock.Increment(p.ID)

    event := &Event{
        ProcessID:   p.ID,
        Description: fmt.Sprintf("received from %s: %s", msg.From, description),
        Clock:       p.Clock.Copy(),
    }

    p.Log.Log(event)
    return event
}

// PutValue stores a value in the versioned store
func (p *Process) PutValue(key string, value interface{}) {
    p.mu.Lock()
    defer p.mu.Unlock()

    p.Clock.Increment(p.ID)
    p.Store.Put(key, value, p.Clock.Copy())
}

// GetValue retrieves all current versions of a key from the store
func (p *Process) GetValue(key string) []*Version {
    return p.Store.Get(key)
}

// Message represents a message between processes
type Message struct {
    From  string
    Clock *VectorClock
    Data  string
}

// ConflictResolver defines the interface for conflict resolution
type ConflictResolver interface {
    Resolve(versions []*Version) (interface{}, *VectorClock)
}

// LastWriteWinsResolver resolves conflicts by taking a version that causally
// dominates the others. Truly concurrent versions have no happened-after
// winner, so this sketch falls back to the first version; a production LWW
// resolver would add a deterministic tiebreaker (e.g., by process ID).
type LastWriteWinsResolver struct{}

func (r *LastWriteWinsResolver) Resolve(versions []*Version) (interface{}, *VectorClock) {
    if len(versions) == 0 {
        return nil, NewVectorClock()
    }

    // Find a version with the "latest" clock
    latest := versions[0]
    for _, v := range versions[1:] {
        if v.Clock.HappenedAfter(latest.Clock) {
            latest = v
        }
    }

    return latest.Value, latest.Clock
}

// MergeAllResolver merges all concurrent versions
type MergeAllResolver struct {
    MergeFn func(values []interface{}) interface{}
}

func (r *MergeAllResolver) Resolve(versions []*Version) (interface{}, *VectorClock) {
    // Extract the values
    values := make([]interface{}, len(versions))
    for i, v := range versions {
        values[i] = v.Value
    }

    // Merge the values using the custom function
    merged := r.MergeFn(values)

    // Create the merged clock: component-wise max of all version clocks
    mergedClock := NewVectorClock()
    for _, v := range versions {
        mergedClock.Merge(v.Clock)
    }

    return merged, mergedClock
}
Testing
package vectorclock

import (
    "testing"
)

func TestVectorClockComparison(t *testing.T) {
    vc1 := NewVectorClock()
    vc1.Set("A", 2)
    vc1.Set("B", 1)
    vc1.Set("C", 0)

    vc2 := NewVectorClock()
    vc2.Set("A", 3)
    vc2.Set("B", 2)
    vc2.Set("C", 1)

    // vc1 happened before vc2
    if !vc1.HappenedBefore(vc2) {
        t.Error("vc1 should happen before vc2")
    }

    if vc2.HappenedBefore(vc1) {
        t.Error("vc2 should not happen before vc1")
    }

    if vc1.ConcurrentWith(vc2) {
        t.Error("vc1 and vc2 should not be concurrent")
    }
}

func TestConcurrentClocks(t *testing.T) {
    vc1 := NewVectorClock()
    vc1.Set("A", 2)
    vc1.Set("B", 1)

    vc2 := NewVectorClock()
    vc2.Set("A", 1)
    vc2.Set("B", 2)

    // Concurrent: vc1[A] > vc2[A] but vc1[B] < vc2[B]
    if !vc1.ConcurrentWith(vc2) {
        t.Error("vc1 and vc2 should be concurrent")
    }

    if vc1.HappenedBefore(vc2) || vc1.HappenedAfter(vc2) {
        t.Error("concurrent clocks should not have happened-before relationship")
    }
}

func TestMessagePassing(t *testing.T) {
    // Create three processes
    p1 := NewProcess("P1")
    p2 := NewProcess("P2")
    p3 := NewProcess("P3")

    // P1: local event
    p1.LocalEvent("start")

    // P1 -> P2: send message
    msg12, _ := p1.PrepareSend("hello from P1")
    p2.Receive(msg12, "message from P1")

    // P2 -> P3: send message
    msg23, _ := p2.PrepareSend("hello from P2")
    p3.Receive(msg23, "message from P2")

    // Verify causality: P1's first event happened before P3's receive
    p1Events := p1.Log.GetEvents()
    p3Events := p3.Log.GetEvents()

    if !p1Events[0].Clock.HappenedBefore(p3Events[0].Clock) {
        t.Error("P1's event should causally precede P3's event")
    }
}

func TestVersionedStore(t *testing.T) {
    store := NewVersionedStore()

    // Create two concurrent updates
    vc1 := NewVectorClock()
    vc1.Set("A", 1)
    vc1.Set("B", 0)

    vc2 := NewVectorClock()
    vc2.Set("A", 0)
    vc2.Set("B", 1)

    store.Put("key1", "value1", vc1)
    store.Put("key1", "value2", vc2)

    // Should have a conflict
    if !store.HasConflict("key1") {
        t.Error("Should detect conflict for concurrent updates")
    }

    versions := store.Get("key1")
    if len(versions) != 2 {
        t.Errorf("Expected 2 versions, got %d", len(versions))
    }

    // Now add a version that supersedes both
    vc3 := NewVectorClock()
    vc3.Set("A", 2)
    vc3.Set("B", 2)

    store.Put("key1", "value3", vc3)

    // Should no longer have a conflict
    if store.HasConflict("key1") {
        t.Error("Should not have conflict after superseding update")
    }

    versions = store.Get("key1")
    if len(versions) != 1 {
        t.Errorf("Expected 1 version after supersede, got %d", len(versions))
    }
}

func TestConflictResolution(t *testing.T) {
    store := NewVersionedStore()

    // Create concurrent updates
    vc1 := NewVectorClock()
    vc1.Set("A", 1)

    vc2 := NewVectorClock()
    vc2.Set("B", 1)

    store.Put("key1", 10, vc1)
    store.Put("key1", 20, vc2)

    // Resolve by summing
    resolver := &MergeAllResolver{
        MergeFn: func(values []interface{}) interface{} {
            sum := 0
            for _, v := range values {
                sum += v.(int)
            }
            return sum
        },
    }

    value, clock := resolver.Resolve(store.Get("key1"))

    if value.(int) != 30 {
        t.Errorf("Expected merged value 30, got %d", value.(int))
    }

    // Apply the resolution
    store.Put("key1", value, clock)

    if store.HasConflict("key1") {
        t.Error("Should not have conflict after resolution")
    }
}

func TestSerialization(t *testing.T) {
    vc := NewVectorClock()
    vc.Set("P1", 5)
    vc.Set("P2", 3)
    vc.Set("P3", 7)

    // Binary serialization
    data, err := vc.MarshalBinary()
    if err != nil {
        t.Fatalf("Failed to marshal: %v", err)
    }

    vc2 := NewVectorClock()
    if err := vc2.UnmarshalBinary(data); err != nil {
        t.Fatalf("Failed to unmarshal: %v", err)
    }

    if !vc.Equal(vc2) {
        t.Error("Deserialized clock should equal original")
    }

    // JSON serialization
    jsonData, err := vc.MarshalJSON()
    if err != nil {
        t.Fatalf("Failed to marshal JSON: %v", err)
    }

    vc3 := NewVectorClock()
    if err := vc3.UnmarshalJSON(jsonData); err != nil {
        t.Fatalf("Failed to unmarshal JSON: %v", err)
    }

    if !vc.Equal(vc3) {
        t.Error("JSON deserialized clock should equal original")
    }
}
Usage Example
package main

import (
    "fmt"

    "vectorclock"
)

func main() {
    // Create distributed processes
    alice := vectorclock.NewProcess("Alice")
    bob := vectorclock.NewProcess("Bob")
    charlie := vectorclock.NewProcess("Charlie")

    fmt.Println("=== Distributed Execution Example ===\n")

    // Alice: local event
    e1 := alice.LocalEvent("write file v1")
    fmt.Printf("Alice: %s - Clock: %s\n", e1.Description, e1.Clock)

    // Alice sends a message to Bob
    msg1, e2 := alice.PrepareSend("file update notification")
    fmt.Printf("Alice: send message to Bob - Clock: %s\n", e2.Clock)

    // Bob receives and processes
    e3 := bob.Receive(msg1, "notification from Alice")
    fmt.Printf("Bob: %s - Clock: %s\n", e3.Description, e3.Clock)

    // Bob does local work
    e4 := bob.LocalEvent("process update")
    fmt.Printf("Bob: %s - Clock: %s\n", e4.Description, e4.Clock)

    // Bob sends to Charlie
    msg2, e5 := bob.PrepareSend("processed update")
    fmt.Printf("Bob: send message to Charlie - Clock: %s\n", e5.Clock)

    // Charlie receives
    e6 := charlie.Receive(msg2, "update from Bob")
    fmt.Printf("Charlie: %s - Clock: %s\n", e6.Description, e6.Clock)

    // Check causality
    fmt.Println("\n=== Causality Analysis ===\n")
    if e1.Clock.HappenedBefore(e6.Clock) {
        fmt.Println("Alice's write HAPPENED BEFORE Charlie's receive ✓")
    }

    fmt.Println("\n=== Versioned Store Example ===\n")

    // Simulate concurrent edits on two replicas
    alice.PutValue("document.txt", "Alice's version")
    bob.PutValue("document.txt", "Bob's version")

    // Replicate Bob's versions into Alice's store: each process has its
    // own store, so the conflict only becomes visible after replication
    for _, v := range bob.GetValue("document.txt") {
        alice.Store.Put("document.txt", v.Value, v.Clock)
    }

    // Check for conflicts
    if alice.Store.HasConflict("document.txt") {
        fmt.Println("CONFLICT detected in document.txt!")
        versions := alice.Store.Get("document.txt")
        fmt.Printf("Found %d concurrent versions:\n", len(versions))
        for i, v := range versions {
            fmt.Printf("  Version %d: %v (clock %s)\n", i+1, v.Value, v.Clock)
        }
    }

    // Resolve the conflict
    fmt.Println("\n=== Conflict Resolution ===\n")
    resolver := &vectorclock.MergeAllResolver{
        MergeFn: func(values []interface{}) interface{} {
            // Merge by concatenation
            merged := ""
            for _, v := range values {
                merged += v.(string) + "; "
            }
            return merged
        },
    }

    value, clock := resolver.Resolve(alice.Store.Get("document.txt"))
    alice.Store.Put("document.txt", value, clock)

    fmt.Printf("Resolved value: %v\n", value)
    fmt.Printf("Resolved clock: %s\n", clock)

    // Display the event logs
    fmt.Println("\n=== Event Causality Graph ===\n")
    for _, event := range alice.Log.GetEvents() {
        fmt.Printf("[%s] %s: %s\n", event.ProcessID, event.Clock, event.Description)
    }
    for _, event := range bob.Log.GetEvents() {
        fmt.Printf("[%s] %s: %s\n", event.ProcessID, event.Clock, event.Description)
    }
    for _, event := range charlie.Log.GetEvents() {
        fmt.Printf("[%s] %s: %s\n", event.ProcessID, event.Clock, event.Description)
    }
}
Testing Your Solution
1. Basic Clock Operations
// Test increment, copy, equality
vc := NewVectorClock()
vc.Increment("P1")
vc2 := vc.Copy()
// Verify equal and independent
2. Happened-Before Relation
// Test that A → B is correctly identified
// Test that concurrent events are detected
// Test transitivity: if A → B and B → C, then A → C
3. Message Passing Protocol
// Simulate processes exchanging messages
// Verify clocks are properly updated
// Check causality is preserved
4. Concurrent Updates
// Create two processes with concurrent operations
// Verify operations are detected as concurrent
// Test conflict detection in versioned store
5. Serialization
// Test binary and JSON serialization
// Verify round-trip consistency
// Compare sizes of different encodings
Edge Cases to Test
- Empty vector clocks (see the test sketch after this list)
- Single process (also covered in the sketch)
- Large number of processes
- Clock overflow
- Sparse vs dense clocks
- Clock pruning after process death
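As a starting point, hedged sketches of tests for the first two edge cases, written against the solution's API:

func TestEmptyClocks(t *testing.T) {
    vc1 := NewVectorClock()
    vc2 := NewVectorClock()

    // Two empty clocks are equal, so neither happened before the other
    // and they are not concurrent.
    if !vc1.Equal(vc2) {
        t.Error("empty clocks should be equal")
    }
    if vc1.HappenedBefore(vc2) || vc1.ConcurrentWith(vc2) {
        t.Error("empty clocks are equal, not ordered or concurrent")
    }
}

func TestSingleProcess(t *testing.T) {
    vc1 := NewVectorClock()
    vc1.Increment("P1")
    vc2 := vc1.Copy()
    vc2.Increment("P1")

    // With a single process, events are totally ordered.
    if !vc1.HappenedBefore(vc2) {
        t.Error("earlier single-process event should happen before the later one")
    }
}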
Bonus Challenges
- Dotted Version Vectors: implement DVVs, which are more efficient than standard vector clocks for client-server systems
- Interval Tree Clocks: implement ITCs, which support dynamic process creation without coordination
- Causal Broadcast: implement causal-order message delivery using vector clocks (see the sketch after this list)
- Snapshot Algorithm: use vector clocks to capture consistent global snapshots
- CRDT Integration: build Conflict-Free Replicated Data Types using vector clocks
- Clock Pruning: implement automatic garbage collection of entries for dead processes
- Visualization: build a web UI to visualize causality graphs and concurrent operations
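For the causal broadcast challenge, the classic delivery condition (from the Birman-Schiper-Stephenson scheme, where clocks count messages sent rather than all events) is: a message from process j stamped with VC_msg is deliverable at process i once VC_msg[j] == VC_i[j] + 1 and VC_msg[k] <= VC_i[k] for every k != j. A minimal sketch of that predicate, assuming map-based clocks:

// Hedged sketch: causal delivery condition for a message from process `from`.
// local is the receiver's current clock; msg is the clock stamped on the message.
func deliverable(from string, msg, local map[string]int) bool {
    for pid, count := range msg {
        if pid == from {
            // Must be the very next message from the sender
            if count != local[pid]+1 {
                return false
            }
        } else if count > local[pid] {
            // A causal dependency has not been delivered yet
            return false
        }
    }
    return true
}

Undeliverable messages are buffered and rechecked whenever another message is delivered.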
Key Takeaways
- Vector clocks accurately capture causality in distributed systems without physical time
- Happened-before relation can be definitively determined by comparing vector clocks
- Concurrent events are detected when neither clock happened-before the other
- Versioned storage with vector clocks enables detecting and resolving conflicts
- O(n) space per clock where n is number of processes
- Message passing requires incrementing before send and merging on receive
- Real-world systems like Dynamo and Riak use vector clocks for eventual consistency
- Trade-offs exist between accuracy, space overhead, and complexity
- Conflict resolution requires application-specific strategies
- Causality tracking is fundamental to distributed systems, debugging, and consistency
References
- Friedemann Mattern, "Virtual Time and Global States of Distributed Systems" (1989)
- Colin Fidge, "Timestamps in Message-Passing Systems That Preserve the Partial Ordering" (1988)
- Giuseppe DeCandia et al., "Dynamo: Amazon's Highly Available Key-value Store" (2007)
- Riak documentation: vector clock implementation in production
- Martin Kleppmann, "Designing Data-Intensive Applications", Chapter 5 on Replication
- Leslie Lamport, "Time, Clocks, and the Ordering of Events in a Distributed System" (1978), the foundation for logical clocks
- Marc Shapiro et al., "Conflict-Free Replicated Data Types" (2011)
- Maarten van Steen and Andrew S. Tanenbaum, "Distributed Systems"