Exercise: Vector Clock for Distributed Causality

Difficulty - Advanced

Estimated Time - 3-5 hours

Learning Objectives

  • Understand the problem of time and causality in distributed systems
  • Implement vector clocks to track happened-before relationships
  • Detect concurrent events and conflicts in distributed systems
  • Apply vector clocks to build eventually consistent systems
  • Master version control and conflict resolution in replicated data
  • Design distributed systems with causal consistency

Problem Statement

In distributed systems, physical clocks are unreliable for determining the order of events. Network delays, clock skew, and clock drift make wall-clock time untrustworthy for ordering. Yet we often need to know: did event A happen before event B? Are these events concurrent and potentially conflicting?

Vector clocks provide a logical clock mechanism that captures causality relationships between events across distributed processes. Each process maintains a vector of counters, and by comparing vector timestamps, we can definitively determine if events are causally related or concurrent.

This is crucial for distributed databases, version control systems, distributed caching, and any system that needs to maintain causal consistency or detect conflicts.

Real-World Scenario:

Consider a distributed note-taking application like Google Docs or Notion. Multiple users can edit the same document simultaneously from different locations. Without proper causality tracking:

// Without vector clocks: cannot determine event ordering
type NaiveEdit struct {
    text      string
    timestamp time.Time // Wall-clock time is unreliable!
}

func (e1 *NaiveEdit) HappenedBefore(e2 *NaiveEdit) bool {
    // WRONG: clock skew makes this unreliable
    return e1.timestamp.Before(e2.timestamp)
}

// With vector clocks: precise causality tracking
type Edit struct {
    text   string
    vector *VectorClock
}

func (e1 *Edit) HappenedBefore(e2 *Edit) bool {
    // CORRECT: logical clocks capture causality
    return e1.vector.HappenedBefore(e2.vector)
}

func (e1 *Edit) ConcurrentWith(e2 *Edit) bool {
    // Can detect conflicts!
    return e1.vector.ConcurrentWith(e2.vector)
}

Requirements

Functional Requirements

  1. Vector Clock Implementation

    • Maintain vector of logical clocks
    • Increment local clock on local events
    • Update vector clock on message send/receive
    • Support arbitrary number of processes
  2. Comparison Operations

    • Implement HappenedBefore(other): Returns true if this event causally precedes other
    • Implement HappenedAfter(other): Returns true if this event causally follows other
    • Implement ConcurrentWith(other): Returns true if events are concurrent
    • Implement Equal(other): Returns true if vector clocks are identical
  3. Clock Operations

    • Increment(processID): Increment local process counter
    • Merge(other): Take component-wise maximum for received messages
    • Copy(): Create independent copy of vector clock
    • String(): Human-readable representation
  4. Versioned Data Structures

    • Implement versioned key-value store using vector clocks
    • Track all versions of each key
    • Detect and store concurrent conflicting updates
    • Provide conflict resolution API
  5. Event Tracking

    • Log events with vector clock timestamps
    • Query events by causality relationships
    • Visualize causality graphs
    • Detect anomalies

Non-Functional Requirements

  • Correctness: Must accurately represent happened-before relationships
  • Efficiency: O(n) space per clock where n is number of processes
  • Scalability: Support 10-1000 processes efficiently
  • Thread-Safety: Safe for concurrent access
  • Serialization: Support JSON/binary encoding for network transmission

Background and Theory

Vector clocks were developed independently by Colin Fidge and Friedemann Mattern in 1988 to extend Lamport's logical clocks with the ability to detect concurrency.

The Happened-Before Relation

In distributed systems, we define the "happened-before" relation as:

  1. Local events: If a and b are events in the same process, and a occurs before b, then a → b
  2. Message passing: If a is a send event and b is the corresponding receive event, then a → b
  3. Transitivity: If a → b and b → c, then a → c

Events that are not related by happened-before are concurrent.
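
For a concrete illustration, here is a small sketch using the VectorClock API built later in this exercise (the Set calls construct clocks by hand purely for illustration):

// Hand-built clocks for three causally chained events:
// a on P1, b on P2 after hearing from P1, c on P3 after hearing from P2.
a := NewVectorClock()
a.Set("P1", 1)

b := NewVectorClock()
b.Set("P1", 1)
b.Set("P2", 1)

c := NewVectorClock()
c.Set("P1", 1)
c.Set("P2", 1)
c.Set("P3", 1)

fmt.Println(a.HappenedBefore(b)) // true
fmt.Println(b.HappenedBefore(c)) // true
fmt.Println(a.HappenedBefore(c)) // true: a → b and b → c imply a → c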

How Vector Clocks Work

Each process P maintains a vector clock VC[P] where:

  • VC[P][P] is P's logical clock
  • VC[P][Q] is P's knowledge of Q's logical clock

Rules:

  1. Local event: Process P increments VC[P][P]
  2. Send message: Process P increments VC[P][P] and sends VC[P] with message
  3. Receive message: Process Q merges: VC[Q][i] = max(VC[Q][i], VC_msg[i]) for all i, then increments VC[Q][Q]

Comparison:

  • VC1 < VC2 if ∀i: VC1[i] ≤ VC2[i] and ∃j: VC1[j] < VC2[j]
  • VC1 || VC2 if neither VC1 < VC2 nor VC2 < VC1

Example

Process A:  [1,0,0] → [2,0,0] ──msg──┐
                                     ▼
Process B:  [0,1,0] ─────────► [2,2,0] → [2,3,0] ──msg──┐
                                                        ▼
Process C:  [0,0,1] ────────────────────────────► [2,3,2]

The send event at A with clock [2,0,0] happened before the receive event at C with clock [2,3,2] because:

  • B merged A's clock on receipt: max([0,1,0], [2,0,0]) plus B's own increment gives [2,2,0]
  • B's send carried [2,3,0], which C merged with [0,0,1] and incremented to [2,3,2]
  • Every component of [2,0,0] is ≤ the matching component of [2,3,2], so transitivity yields the causal relationship

Use Cases

  1. Amazon Dynamo: Uses vector clocks for multi-master replication
  2. Riak: Distributed key-value store with vector clock versioning
  3. Cassandra: deliberately opts for last-write-wins timestamps instead of vector clocks, an instructive contrast in eventual-consistency design
  4. Git: its commit DAG records causal history directly, a related technique that does not require explicit vector clocks
  5. CRDTs: Conflict-free replicated data types use vector clocks
  6. Distributed Debugging: Reconstruct causality of events
  7. Consistent Snapshots: Determine global consistent states

Implementation Challenges

Challenge 1: Growing Vector Size

As the number of processes increases, vector clocks grow linearly in size. For systems with thousands of nodes, this becomes expensive.

Solution Approach:

  • Use sparse representation instead of dense array
  • Implement clock pruning: remove entries for dead processes (see the sketch after this list)
  • Consider bounded vector clocks for approximation
  • Use process IDs efficiently
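
A minimal sketch of the pruning idea, assuming the VectorClock type from this exercise; isAlive is a hypothetical stand-in for a membership or failure-detection service:

// Prune removes entries for processes the membership service reports dead.
// Only safe once no in-flight message can still carry a counter for the
// pruned process; otherwise causality information is lost.
func (vc *VectorClock) Prune(isAlive func(processID string) bool) {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    for pid := range vc.clock {
        if !isAlive(pid) {
            delete(vc.clock, pid)
        }
    }
}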

Challenge 2: Clock Merging Semantics

When receiving a message, the merge operation must be atomic and correct to maintain causality invariants.

Solution Approach:

  • Take component-wise maximum: VC[i] = max(local[i], received[i])
  • Always increment local counter after merge
  • Use locks or atomic operations for thread safety (see the sketch after this list)
  • Validate merged clocks maintain monotonicity
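
The key point is that the merge and the subsequent increment must share one critical section, otherwise another goroutine could observe a merged-but-not-incremented clock. A sketch follows; MergeAndIncrement is a hypothetical helper, not part of the exercise API:

// MergeAndIncrement atomically merges a received clock and bumps the
// local counter, so intermediate states are never visible.
func (vc *VectorClock) MergeAndIncrement(processID string, msgClock *VectorClock) {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    msgClock.mu.RLock()
    for pid, count := range msgClock.clock {
        if count > vc.clock[pid] {
            vc.clock[pid] = count
        }
    }
    msgClock.mu.RUnlock()

    vc.clock[processID]++ // still under the same write lock
}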

Challenge 3: Conflict Resolution

When detecting concurrent updates, the system must have a strategy to resolve them.

Solution Approach:

  • Store all concurrent versions
  • Let application provide resolution function
  • Use last-write-wins with process ID tiebreaker (sketched after this list)
  • Implement semantic conflict resolution
  • Allow manual conflict resolution
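
A sketch of last-write-wins with a deterministic tiebreaker; lwwWithTiebreak is hypothetical, and breaking ties on the clock's canonical string form is an illustrative choice (real systems typically carry an origin process ID or timestamp with each version instead):

// lwwWithTiebreak prefers causally newer versions; among concurrent
// versions it picks the one with the lexicographically largest clock
// string, so every replica deterministically chooses the same winner.
func lwwWithTiebreak(versions []*Version) *Version {
    best := versions[0]
    for _, v := range versions[1:] {
        if v.Clock.HappenedAfter(best.Clock) {
            best = v
        } else if v.Clock.ConcurrentWith(best.Clock) &&
            v.Clock.String() > best.Clock.String() {
            best = v
        }
    }
    return best
}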

Challenge 4: Serialization Efficiency

Vector clocks must be sent over network frequently, so encoding size matters.

Solution Approach:

  • Use binary encoding instead of JSON
  • Delta compression: send only changed entries (see the sketch after this list)
  • Run-length encoding for sparse vectors
  • Compress with standard algorithms
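
A sketch of the delta idea (clockDelta is a hypothetical helper): between successive sends to the same peer, transmit only the entries whose counters changed, and let the receiver overlay them on the last full clock it saw from that sender:

// clockDelta returns only the entries that changed since the last
// transmission; the receiver applies them on top of its cached copy of
// the sender's previous clock.
func clockDelta(current, lastSent map[string]int) map[string]int {
    delta := make(map[string]int)
    for pid, count := range current {
        if count != lastSent[pid] {
            delta[pid] = count
        }
    }
    return delta
}

Because counters never decrease (absent pruning), entries never disappear, so deltas need no deletion markers.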

Hints

Hint 1: Data Structure Design

Use a map for sparse representation, which is efficient when number of active processes is small relative to total process space:

type VectorClock struct {
    clock map[string]int // processID -> counter
    mu    sync.RWMutex   // Thread safety
}

func NewVectorClock() *VectorClock {
    return &VectorClock{
        clock: make(map[string]int),
    }
}

This allows O(k) space where k is number of active processes, rather than O(n) for total process space.

Hint 2: Comparison Logic

To determine if VC1 happened before VC2:

func (vc *VectorClock) HappenedBefore(other *VectorClock) bool {
    // (Locking elided in this hint; see the full solution.)
    lessOrEqual := true
    strictlyLess := false

    // Check all entries appearing in either clock
    allKeys := vc.getAllKeys(other)

    for _, key := range allKeys {
        v1 := vc.clock[key]    // 0 if not present
        v2 := other.clock[key] // 0 if not present

        if v1 > v2 {
            lessOrEqual = false
            break
        }
        if v1 < v2 {
            strictlyLess = true
        }
    }

    return lessOrEqual && strictlyLess
}

The key insight: all components must be ≤, and at least one must be strictly <.

Hint 3: Message Passing Protocol

Implement three operations for distributed communication:

// Before sending a message: bump our own entry, then attach a snapshot.
// (Increment and Copy each lock internally; never call Copy while already
// holding the clock's lock, or the RLock inside Copy will deadlock.)
func (vc *VectorClock) PrepareSend(processID string) *VectorClock {
    vc.Increment(processID)
    return vc.Copy() // send the copy with the message
}

// Upon receiving a message
func (vc *VectorClock) ReceiveMsg(processID string, msgClock *VectorClock) {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    // Merge: take component-wise max
    for pid, count := range msgClock.clock {
        if count > vc.clock[pid] {
            vc.clock[pid] = count
        }
    }

    // Increment local clock
    vc.clock[processID]++
}

// On a local event
func (vc *VectorClock) LocalEvent(processID string) {
    vc.mu.Lock()
    defer vc.mu.Unlock()

    vc.clock[processID]++
}

Hint 4: Versioned Storage

Store multiple versions with their vector clocks to detect conflicts:

type Version struct {
    value interface{}
    clock *VectorClock
}

type VersionedStore struct {
    data map[string][]*Version // key -> list of versions
    mu   sync.RWMutex
}

func (vs *VersionedStore) Put(key string, value interface{}, clock *VectorClock) {
    vs.mu.Lock()
    defer vs.mu.Unlock()

    // Remove versions that are superseded
    versions := vs.data[key]
    newVersions := []*Version{}

    for _, v := range versions {
        // Keep if concurrent or happened after
        if clock.ConcurrentWith(v.clock) || v.clock.HappenedAfter(clock) {
            newVersions = append(newVersions, v)
        }
    }

    // Add new version
    newVersions = append(newVersions, &Version{value, clock.Copy()})
    vs.data[key] = newVersions
}

Hint 5: Efficient Serialization

Implement custom binary encoding for network efficiency:

// MarshalBinary produces a compact binary encoding. (Simplified here:
// writes to a bytes.Buffer never fail, so error checks are elided; the
// full solution checks them.)
func (vc *VectorClock) MarshalBinary() ([]byte, error) {
    vc.mu.RLock()
    defer vc.mu.RUnlock()

    buf := new(bytes.Buffer)

    // Write number of entries
    binary.Write(buf, binary.LittleEndian, uint32(len(vc.clock)))

    // Write each entry: ID length, ID bytes, counter value
    for pid, count := range vc.clock {
        binary.Write(buf, binary.LittleEndian, uint16(len(pid)))
        buf.WriteString(pid)
        binary.Write(buf, binary.LittleEndian, uint64(count))
    }

    return buf.Bytes(), nil
}

This is typically more compact than JSON once counters and entry counts grow, and it avoids string parsing on the receive path.
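
A quick way to sanity-check the size claim on your own workload (illustrative numbers only; the winner depends on process-ID length and counter magnitude):

// Compare encoded sizes for 100 processes with large counters.
vc := NewVectorClock()
for i := 0; i < 100; i++ {
    vc.Set(fmt.Sprintf("node-%03d", i), 1000000+i)
}

bin, _ := vc.MarshalBinary()
jsn, _ := vc.MarshalJSON()
fmt.Printf("binary: %d bytes, JSON: %d bytes\n", len(bin), len(jsn))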

Solution


Approach

This implementation provides a complete vector clock system with:

  1. Core VectorClock: Thread-safe vector clock with all comparison operations
  2. VersionedStore: Key-value store that tracks all versions and detects conflicts
  3. Process Simulation: Framework for simulating distributed processes with message passing
  4. Event Log: Tracking and querying causality relationships between events
  5. Conflict Resolution: Strategies for handling concurrent updates

The implementation emphasizes correctness and clarity while maintaining good performance.

Implementation

package vectorclock

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"
)

// VectorClock represents a logical clock for a distributed system
type VectorClock struct {
	clock map[string]int
	mu    sync.RWMutex
}

// NewVectorClock creates a new vector clock
func NewVectorClock() *VectorClock {
	return &VectorClock{
		clock: make(map[string]int),
	}
}

// NewVectorClockWithCapacity creates a vector clock with initial capacity
func NewVectorClockWithCapacity(capacity int) *VectorClock {
	return &VectorClock{
		clock: make(map[string]int, capacity),
	}
}

// Increment increments the clock for the specified process
func (vc *VectorClock) Increment(processID string) {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	vc.clock[processID]++
}

// Get returns the clock value for a process
func (vc *VectorClock) Get(processID string) int {
	vc.mu.RLock()
	defer vc.mu.RUnlock()
	return vc.clock[processID]
}

// Set sets the clock value for a process
func (vc *VectorClock) Set(processID string, value int) {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	vc.clock[processID] = value
}

// Merge merges another vector clock (component-wise maximum).
// In this package merges always flow from a received message clock,
// which is a private copy, so the two locks below cannot deadlock.
func (vc *VectorClock) Merge(other *VectorClock) {
	vc.mu.Lock()
	defer vc.mu.Unlock()

	other.mu.RLock()
	defer other.mu.RUnlock()

	for pid, count := range other.clock {
		if count > vc.clock[pid] {
			vc.clock[pid] = count
		}
	}
}

// Copy creates a deep copy of the vector clock
func (vc *VectorClock) Copy() *VectorClock {
	vc.mu.RLock()
	defer vc.mu.RUnlock()

	newVC := NewVectorClockWithCapacity(len(vc.clock))
	for pid, count := range vc.clock {
		newVC.clock[pid] = count
	}
	return newVC
}

// HappenedBefore returns true if this clock happened before other
func (vc *VectorClock) HappenedBefore(other *VectorClock) bool {
	vc.mu.RLock()
	defer vc.mu.RUnlock()

	other.mu.RLock()
	defer other.mu.RUnlock()

	// Get all process IDs from both clocks
	allKeys := vc.getAllKeys(other)

	strictlyLess := false

	for _, key := range allKeys {
		v1 := vc.clock[key]
		v2 := other.clock[key]

		if v1 > v2 {
			// If any component is greater, not happened-before
			return false
		}
		if v1 < v2 {
			// At least one component must be strictly less
			strictlyLess = true
		}
	}

	// Every component is <= AND at least one is strictly less
	return strictlyLess
}

// HappenedAfter returns true if this clock happened after other
func (vc *VectorClock) HappenedAfter(other *VectorClock) bool {
	return other.HappenedBefore(vc)
}

// ConcurrentWith returns true if clocks are concurrent
func (vc *VectorClock) ConcurrentWith(other *VectorClock) bool {
	return !vc.HappenedBefore(other) && !vc.HappenedAfter(other) && !vc.Equal(other)
}

// Equal returns true if vector clocks are identical
func (vc *VectorClock) Equal(other *VectorClock) bool {
	vc.mu.RLock()
	defer vc.mu.RUnlock()

	other.mu.RLock()
	defer other.mu.RUnlock()

	// Get all process IDs from both clocks
	allKeys := vc.getAllKeys(other)

	for _, key := range allKeys {
		if vc.clock[key] != other.clock[key] {
			return false
		}
	}

	return true
}

// getAllKeys returns all unique keys from both clocks.
// Callers must already hold both clocks' locks.
func (vc *VectorClock) getAllKeys(other *VectorClock) []string {
	keyMap := make(map[string]bool)
	for key := range vc.clock {
		keyMap[key] = true
	}
	for key := range other.clock {
		keyMap[key] = true
	}

	keys := make([]string, 0, len(keyMap))
	for key := range keyMap {
		keys = append(keys, key)
	}
	return keys
}

// String returns a string representation of the vector clock
func (vc *VectorClock) String() string {
	vc.mu.RLock()
	defer vc.mu.RUnlock()

	if len(vc.clock) == 0 {
		return "{}"
	}

	// Sort keys for consistent output
	keys := make([]string, 0, len(vc.clock))
	for k := range vc.clock {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, fmt.Sprintf("%s:%d", k, vc.clock[k]))
	}

	return "{" + strings.Join(parts, ", ") + "}"
}

// MarshalJSON implements json.Marshaler
func (vc *VectorClock) MarshalJSON() ([]byte, error) {
	vc.mu.RLock()
	defer vc.mu.RUnlock()
	return json.Marshal(vc.clock)
}

// UnmarshalJSON implements json.Unmarshaler
func (vc *VectorClock) UnmarshalJSON(data []byte) error {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	vc.clock = make(map[string]int) // replace, don't merge, old entries
	return json.Unmarshal(data, &vc.clock)
}

// MarshalBinary implements encoding.BinaryMarshaler for efficient serialization
func (vc *VectorClock) MarshalBinary() ([]byte, error) {
	vc.mu.RLock()
	defer vc.mu.RUnlock()

	buf := new(bytes.Buffer)

	// Write number of entries
	if err := binary.Write(buf, binary.LittleEndian, uint32(len(vc.clock))); err != nil {
		return nil, err
	}

	// Write each entry
	for pid, count := range vc.clock {
		// Write process ID length and value
		if err := binary.Write(buf, binary.LittleEndian, uint16(len(pid))); err != nil {
			return nil, err
		}
		if _, err := buf.WriteString(pid); err != nil {
			return nil, err
		}

		// Write counter value
		if err := binary.Write(buf, binary.LittleEndian, uint64(count)); err != nil {
			return nil, err
		}
	}

	return buf.Bytes(), nil
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (vc *VectorClock) UnmarshalBinary(data []byte) error {
	vc.mu.Lock()
	defer vc.mu.Unlock()

	buf := bytes.NewReader(data)

	// Read number of entries
	var numEntries uint32
	if err := binary.Read(buf, binary.LittleEndian, &numEntries); err != nil {
		return err
	}

	vc.clock = make(map[string]int, numEntries)

	// Read each entry
	for i := uint32(0); i < numEntries; i++ {
		// Read process ID length
		var pidLen uint16
		if err := binary.Read(buf, binary.LittleEndian, &pidLen); err != nil {
			return err
		}

		// Read process ID (ReadFull guards against short reads)
		pidBytes := make([]byte, pidLen)
		if _, err := io.ReadFull(buf, pidBytes); err != nil {
			return err
		}
		pid := string(pidBytes)

		// Read counter value
		var count uint64
		if err := binary.Read(buf, binary.LittleEndian, &count); err != nil {
			return err
		}

		vc.clock[pid] = int(count)
	}

	return nil
}

// Size returns the number of entries in the vector clock
func (vc *VectorClock) Size() int {
	vc.mu.RLock()
	defer vc.mu.RUnlock()
	return len(vc.clock)
}

// Version represents a versioned value with vector clock
type Version struct {
	Value interface{}
	Clock *VectorClock
}

// VersionedStore is a key-value store with vector clock versioning
type VersionedStore struct {
	data map[string][]*Version
	mu   sync.RWMutex
}

// NewVersionedStore creates a new versioned store
func NewVersionedStore() *VersionedStore {
	return &VersionedStore{
		data: make(map[string][]*Version),
	}
}

// Put stores a value with a vector clock.
// Automatically prunes superseded versions.
func (vs *VersionedStore) Put(key string, value interface{}, clock *VectorClock) {
	vs.mu.Lock()
	defer vs.mu.Unlock()

	versions := vs.data[key]
	newVersions := []*Version{}

	// Keep only concurrent or newer versions
	for _, v := range versions {
		// Keep if concurrent or happened after
		if clock.ConcurrentWith(v.Clock) || v.Clock.HappenedAfter(clock) {
			newVersions = append(newVersions, v)
		}
		// Discard if happened before
	}

	// Add new version
	newVersions = append(newVersions, &Version{
		Value: value,
		Clock: clock.Copy(),
	})

	vs.data[key] = newVersions
}

// Get retrieves all current versions of a key
func (vs *VersionedStore) Get(key string) []*Version {
	vs.mu.RLock()
	defer vs.mu.RUnlock()

	versions := vs.data[key]
	if versions == nil {
		return nil
	}

	// Return copies to prevent external modification
	result := make([]*Version, len(versions))
	for i, v := range versions {
		result[i] = &Version{
			Value: v.Value,
			Clock: v.Clock.Copy(),
		}
	}
	return result
}

// HasConflict returns true if there are multiple concurrent versions
func (vs *VersionedStore) HasConflict(key string) bool {
	vs.mu.RLock()
	defer vs.mu.RUnlock()
	return len(vs.data[key]) > 1
}

// GetConflicts returns keys with conflicts
func (vs *VersionedStore) GetConflicts() []string {
	vs.mu.RLock()
	defer vs.mu.RUnlock()

	conflicts := []string{}
	for key, versions := range vs.data {
		if len(versions) > 1 {
			conflicts = append(conflicts, key)
		}
	}
	return conflicts
}

// Resolve resolves conflicts using a resolution function
func (vs *VersionedStore) Resolve(key string, resolver func([]*Version) (interface{}, *VectorClock)) error {
	vs.mu.Lock()
	defer vs.mu.Unlock()

	versions := vs.data[key]
	if len(versions) <= 1 {
		return fmt.Errorf("no conflict for key %s", key)
	}

	// Call resolver
	value, clock := resolver(versions)

	// Replace with resolved version
	vs.data[key] = []*Version{{Value: value, Clock: clock}}
	return nil
}

// Event represents an event in the system
type Event struct {
	ProcessID   string
	Description string
	Clock       *VectorClock
	Timestamp   int64
}

// EventLog tracks events with vector clocks
type EventLog struct {
	events []*Event
	mu     sync.RWMutex
}

// NewEventLog creates a new event log
func NewEventLog() *EventLog {
	return &EventLog{
		events: make([]*Event, 0),
	}
}

// Log adds an event to the log
func (el *EventLog) Log(event *Event) {
	el.mu.Lock()
	defer el.mu.Unlock()
	el.events = append(el.events, event)
}

// GetEvents returns all events
func (el *EventLog) GetEvents() []*Event {
	el.mu.RLock()
	defer el.mu.RUnlock()

	result := make([]*Event, len(el.events))
	copy(result, el.events)
	return result
}

// GetCausalHistory returns all events that causally precede the given event
func (el *EventLog) GetCausalHistory(event *Event) []*Event {
	el.mu.RLock()
	defer el.mu.RUnlock()

	history := []*Event{}
	for _, e := range el.events {
		if e.Clock.HappenedBefore(event.Clock) {
			history = append(history, e)
		}
	}
	return history
}

// GetConcurrentEvents returns events concurrent with the given event
func (el *EventLog) GetConcurrentEvents(event *Event) []*Event {
	el.mu.RLock()
	defer el.mu.RUnlock()

	concurrent := []*Event{}
	for _, e := range el.events {
		if e != event && e.Clock.ConcurrentWith(event.Clock) {
			concurrent = append(concurrent, e)
		}
	}
	return concurrent
}

// Process represents a distributed process with vector clock
type Process struct {
	ID    string
	Clock *VectorClock
	Store *VersionedStore
	Log   *EventLog
	mu    sync.Mutex
}

// NewProcess creates a new process
func NewProcess(id string) *Process {
	return &Process{
		ID:    id,
		Clock: NewVectorClock(),
		Store: NewVersionedStore(),
		Log:   NewEventLog(),
	}
}

// LocalEvent records a local event
func (p *Process) LocalEvent(description string) *Event {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.Clock.Increment(p.ID)

	event := &Event{
		ProcessID:   p.ID,
		Description: description,
		Clock:       p.Clock.Copy(),
	}

	p.Log.Log(event)
	return event
}

// PrepareSend prepares a message to send
func (p *Process) PrepareSend(description string) (*Message, *Event) {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.Clock.Increment(p.ID)

	event := &Event{
		ProcessID:   p.ID,
		Description: description,
		Clock:       p.Clock.Copy(),
	}

	p.Log.Log(event)

	msg := &Message{
		From:  p.ID,
		Clock: p.Clock.Copy(),
		Data:  description,
	}

	return msg, event
}

// Receive processes a received message
func (p *Process) Receive(msg *Message, description string) *Event {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Merge incoming clock (a private copy carried by the message)
	p.Clock.Merge(msg.Clock)

	// Increment local clock
	p.Clock.Increment(p.ID)

	event := &Event{
		ProcessID:   p.ID,
		Description: fmt.Sprintf("received from %s: %s", msg.From, description),
		Clock:       p.Clock.Copy(),
	}

	p.Log.Log(event)
	return event
}

// PutValue stores a value in the versioned store
func (p *Process) PutValue(key string, value interface{}) {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.Clock.Increment(p.ID)
	p.Store.Put(key, value, p.Clock.Copy())
}

// GetValue retrieves all current versions of a key from the store
func (p *Process) GetValue(key string) []*Version {
	return p.Store.Get(key)
}

// Message represents a message between processes
type Message struct {
	From  string
	Clock *VectorClock
	Data  string
}

// ConflictResolver defines the interface for conflict resolution
type ConflictResolver interface {
	Resolve(versions []*Version) (interface{}, *VectorClock)
}

// LastWriteWinsResolver keeps a version that no other version causally
// supersedes. Among truly concurrent versions it falls back to the first
// one seen; production systems add a deterministic tiebreaker (such as an
// origin process ID) so every replica converges on the same winner.
type LastWriteWinsResolver struct{}

func (r *LastWriteWinsResolver) Resolve(versions []*Version) (interface{}, *VectorClock) {
	if len(versions) == 0 {
		return nil, NewVectorClock()
	}

	// Find a version with a maximal clock
	latest := versions[0]
	for _, v := range versions[1:] {
		if v.Clock.HappenedAfter(latest.Clock) {
			latest = v
		}
	}

	return latest.Value, latest.Clock
}

// MergeAllResolver merges all concurrent versions
type MergeAllResolver struct {
	MergeFn func(values []interface{}) interface{}
}

func (r *MergeAllResolver) Resolve(versions []*Version) (interface{}, *VectorClock) {
	// Extract values
	values := make([]interface{}, len(versions))
	for i, v := range versions {
		values[i] = v.Value
	}

	// Merge values using the custom function
	merged := r.MergeFn(values)

	// Create a merged clock that dominates all inputs
	mergedClock := NewVectorClock()
	for _, v := range versions {
		mergedClock.Merge(v.Clock)
	}

	return merged, mergedClock
}

Testing

package vectorclock

import (
	"testing"
)

func TestVectorClockComparison(t *testing.T) {
	vc1 := NewVectorClock()
	vc1.Set("A", 2)
	vc1.Set("B", 1)
	vc1.Set("C", 0)

	vc2 := NewVectorClock()
	vc2.Set("A", 3)
	vc2.Set("B", 2)
	vc2.Set("C", 1)

	// vc1 happened before vc2
	if !vc1.HappenedBefore(vc2) {
		t.Error("vc1 should happen before vc2")
	}

	if vc2.HappenedBefore(vc1) {
		t.Error("vc2 should not happen before vc1")
	}

	if vc1.ConcurrentWith(vc2) {
		t.Error("vc1 and vc2 should not be concurrent")
	}
}

func TestConcurrentClocks(t *testing.T) {
	vc1 := NewVectorClock()
	vc1.Set("A", 2)
	vc1.Set("B", 1)

	vc2 := NewVectorClock()
	vc2.Set("A", 1)
	vc2.Set("B", 2)

	// Concurrent: vc1[A] > vc2[A] but vc1[B] < vc2[B]
	if !vc1.ConcurrentWith(vc2) {
		t.Error("vc1 and vc2 should be concurrent")
	}

	if vc1.HappenedBefore(vc2) || vc1.HappenedAfter(vc2) {
		t.Error("concurrent clocks should not have happened-before relationship")
	}
}

func TestMessagePassing(t *testing.T) {
	// Create three processes
	p1 := NewProcess("P1")
	p2 := NewProcess("P2")
	p3 := NewProcess("P3")

	// P1: local event
	p1.LocalEvent("start")

	// P1 -> P2: send message
	msg12, _ := p1.PrepareSend("hello from P1")
	p2.Receive(msg12, "message from P1")

	// P2 -> P3: send message
	msg23, _ := p2.PrepareSend("hello from P2")
	p3.Receive(msg23, "message from P2")

	// Verify causality: P1's first event happened before P3's receive
	p1Events := p1.Log.GetEvents()
	p3Events := p3.Log.GetEvents()

	if !p1Events[0].Clock.HappenedBefore(p3Events[0].Clock) {
		t.Error("P1's event should causally precede P3's event")
	}
}

func TestVersionedStore(t *testing.T) {
	store := NewVersionedStore()

	// Create two concurrent updates
	vc1 := NewVectorClock()
	vc1.Set("A", 1)
	vc1.Set("B", 0)

	vc2 := NewVectorClock()
	vc2.Set("A", 0)
	vc2.Set("B", 1)

	store.Put("key1", "value1", vc1)
	store.Put("key1", "value2", vc2)

	// Should have conflict
	if !store.HasConflict("key1") {
		t.Error("Should detect conflict for concurrent updates")
	}

	versions := store.Get("key1")
	if len(versions) != 2 {
		t.Errorf("Expected 2 versions, got %d", len(versions))
	}

	// Now add a version that supersedes both
	vc3 := NewVectorClock()
	vc3.Set("A", 2)
	vc3.Set("B", 2)

	store.Put("key1", "value3", vc3)

	// Should no longer have conflict
	if store.HasConflict("key1") {
		t.Error("Should not have conflict after superseding update")
	}

	versions = store.Get("key1")
	if len(versions) != 1 {
		t.Errorf("Expected 1 version after supersede, got %d", len(versions))
	}
}

func TestConflictResolution(t *testing.T) {
	store := NewVersionedStore()

	// Create concurrent updates
	vc1 := NewVectorClock()
	vc1.Set("A", 1)

	vc2 := NewVectorClock()
	vc2.Set("B", 1)

	store.Put("key1", 10, vc1)
	store.Put("key1", 20, vc2)

	// Resolve by summing
	resolver := &MergeAllResolver{
		MergeFn: func(values []interface{}) interface{} {
			sum := 0
			for _, v := range values {
				sum += v.(int)
			}
			return sum
		},
	}

	value, clock := resolver.Resolve(store.Get("key1"))

	if value.(int) != 30 {
		t.Errorf("Expected merged value 30, got %d", value.(int))
	}

	// Apply resolution
	store.Put("key1", value, clock)

	if store.HasConflict("key1") {
		t.Error("Should not have conflict after resolution")
	}
}

func TestSerialization(t *testing.T) {
	vc := NewVectorClock()
	vc.Set("P1", 5)
	vc.Set("P2", 3)
	vc.Set("P3", 7)

	// Binary serialization
	data, err := vc.MarshalBinary()
	if err != nil {
		t.Fatalf("Failed to marshal: %v", err)
	}

	vc2 := NewVectorClock()
	if err := vc2.UnmarshalBinary(data); err != nil {
		t.Fatalf("Failed to unmarshal: %v", err)
	}

	if !vc.Equal(vc2) {
		t.Error("Deserialized clock should equal original")
	}

	// JSON serialization
	jsonData, err := vc.MarshalJSON()
	if err != nil {
		t.Fatalf("Failed to marshal JSON: %v", err)
	}

	vc3 := NewVectorClock()
	if err := vc3.UnmarshalJSON(jsonData); err != nil {
		t.Fatalf("Failed to unmarshal JSON: %v", err)
	}

	if !vc.Equal(vc3) {
		t.Error("JSON deserialized clock should equal original")
	}
}

Usage Example

package main

import (
	"fmt"
	"vectorclock"
)

func main() {
	// Create distributed processes
	alice := vectorclock.NewProcess("Alice")
	bob := vectorclock.NewProcess("Bob")
	charlie := vectorclock.NewProcess("Charlie")

	fmt.Printf("=== Distributed Execution Example ===\n\n")

	// Alice: local event
	e1 := alice.LocalEvent("write file v1")
	fmt.Printf("Alice: %s - Clock: %s\n", e1.Description, e1.Clock)

	// Alice sends message to Bob
	msg1, e2 := alice.PrepareSend("file update notification")
	fmt.Printf("Alice: send message to Bob - Clock: %s\n", e2.Clock)

	// Bob receives and processes
	e3 := bob.Receive(msg1, "notification from Alice")
	fmt.Printf("Bob: %s - Clock: %s\n", e3.Description, e3.Clock)

	// Bob does local work
	e4 := bob.LocalEvent("process update")
	fmt.Printf("Bob: %s - Clock: %s\n", e4.Description, e4.Clock)

	// Bob sends to Charlie
	msg2, e5 := bob.PrepareSend("processed update")
	fmt.Printf("Bob: send message to Charlie - Clock: %s\n", e5.Clock)

	// Charlie receives
	e6 := charlie.Receive(msg2, "update from Bob")
	fmt.Printf("Charlie: %s - Clock: %s\n", e6.Description, e6.Clock)

	// Check causality
	fmt.Printf("\n=== Causality Analysis ===\n\n")
	if e1.Clock.HappenedBefore(e6.Clock) {
		fmt.Println("Alice's write HAPPENED BEFORE Charlie's receive ✓")
	}

	fmt.Printf("\n=== Versioned Store Example ===\n\n")

	// Simulate concurrent edits on separate replicas
	alice.PutValue("document.txt", "Alice's version")
	bob.PutValue("document.txt", "Bob's version")

	// Replicate Bob's write into Alice's store so both versions meet
	for _, v := range bob.GetValue("document.txt") {
		alice.Store.Put("document.txt", v.Value, v.Clock)
	}

	// Check for conflicts
	if alice.Store.HasConflict("document.txt") {
		fmt.Println("CONFLICT detected in document.txt!")
		versions := alice.Store.Get("document.txt")
		fmt.Printf("Found %d concurrent versions:\n", len(versions))
		for i, v := range versions {
			fmt.Printf("  Version %d: %v (clock %s)\n", i+1, v.Value, v.Clock)
		}
	}

	// Resolve conflict
	fmt.Printf("\n=== Conflict Resolution ===\n\n")
	resolver := &vectorclock.MergeAllResolver{
		MergeFn: func(values []interface{}) interface{} {
			// Merge by concatenation
			merged := ""
			for _, v := range values {
				merged += v.(string) + "; "
			}
			return merged
		},
	}

	value, clock := resolver.Resolve(alice.Store.Get("document.txt"))
	alice.Store.Put("document.txt", value, clock)

	fmt.Printf("Resolved value: %v\n", value)
	fmt.Printf("Resolved clock: %s\n", clock)

	// Display event logs
	fmt.Printf("\n=== Event Causality Graph ===\n\n")
	for _, event := range alice.Log.GetEvents() {
		fmt.Printf("[%s] %s: %s\n", event.ProcessID, event.Clock, event.Description)
	}
	for _, event := range bob.Log.GetEvents() {
		fmt.Printf("[%s] %s: %s\n", event.ProcessID, event.Clock, event.Description)
	}
	for _, event := range charlie.Log.GetEvents() {
		fmt.Printf("[%s] %s: %s\n", event.ProcessID, event.Clock, event.Description)
	}
}

Testing Your Solution

1. Basic Clock Operations

// Test increment, copy, equality
vc := NewVectorClock()
vc.Increment("P1")
vc2 := vc.Copy()
// Verify equal and independent

2. Happened-Before Relation

// Test that A → B is correctly identified
// Test that concurrent events are detected
// Test transitivity: if A → B and B → C, then A → C

3. Message Passing Protocol

// Simulate processes exchanging messages
// Verify clocks are properly updated
// Check causality is preserved

4. Concurrent Updates

// Create two processes with concurrent operations
// Verify operations are detected as concurrent
// Test conflict detection in versioned store

5. Serialization

// Test binary and JSON serialization
// Verify round-trip consistency
// Compare sizes of different encodings

Edge Cases to Test

  • Empty vector clocks (see the test sketch after this list)
  • Single process
  • Large number of processes
  • Clock overflow
  • Sparse vs dense clocks
  • Clock pruning after process death
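
As a concrete starting point for the first edge case, a sketch of an empty-clock test matching the solution's semantics (equal clocks are neither ordered nor concurrent):

func TestEmptyClocks(t *testing.T) {
	a := NewVectorClock()
	b := NewVectorClock()

	if !a.Equal(b) {
		t.Error("two empty clocks should be equal")
	}
	if a.HappenedBefore(b) || b.HappenedBefore(a) {
		t.Error("empty clocks should not be ordered")
	}
	if a.ConcurrentWith(b) {
		t.Error("equal clocks are not concurrent under this implementation's definition")
	}
}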

Bonus Challenges

  1. Dotted Version Vectors: Implement DVVs which are more efficient than standard vector clocks for client-server systems

  2. Interval Tree Clocks: Implement ITCs which support dynamic process creation without coordination

  3. Causal Broadcast: Implement causal order message delivery using vector clocks (a delivery-condition sketch follows this list)

  4. Snapshot Algorithm: Use vector clocks to capture consistent global snapshots

  5. CRDT Integration: Build Conflict-Free Replicated Data Types using vector clocks

  6. Clock Pruning: Implement automatic garbage collection of entries for dead processes

  7. Visualization: Build a web UI to visualize causality graphs and concurrent operations
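
For the causal broadcast challenge (item 3 above), the classic delivery condition from the Birman-Schiper-Stephenson protocol is worth stating: a message from process j carrying clock VC_msg is deliverable at process i once VC_msg[j] = VC_i[j] + 1 and VC_msg[k] ≤ VC_i[k] for every k ≠ j. A minimal sketch, under the assumption that clocks count broadcast messages per process (not every local event, as elsewhere in this exercise); canDeliver is a hypothetical helper:

// canDeliver reports whether msg is the next message we may deliver
// without violating causal order. Locking is elided for brevity.
func canDeliver(local *VectorClock, msg *Message) bool {
	// msg must be the very next broadcast from its sender...
	if msg.Clock.Get(msg.From) != local.Get(msg.From)+1 {
		return false
	}
	// ...and the sender must not have seen anything we have not.
	for pid, count := range msg.Clock.clock {
		if pid != msg.From && count > local.Get(pid) {
			return false
		}
	}
	return true
}

Messages that fail the check are buffered and re-examined after each delivery, since delivering one message can make others deliverable.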

Key Takeaways

  • Vector clocks accurately capture causality in distributed systems without physical time
  • Happened-before relation can be definitively determined by comparing vector clocks
  • Concurrent events are detected when neither clock happened-before the other
  • Versioned storage with vector clocks enables detecting and resolving conflicts
  • O(n) space per clock where n is number of processes
  • Message passing requires incrementing before send and merging on receive
  • Real-world systems like Dynamo and Riak use vector clocks for eventual consistency
  • Trade-offs exist between accuracy, space overhead, and complexity
  • Conflict resolution requires application-specific strategies
  • Causality tracking is fundamental to distributed systems, debugging, and consistency

References

  • Original Paper: "Virtual Time and Global States of Distributed Systems" by Friedemann Mattern
  • Colin Fidge's Paper: "Timestamps in Message-Passing Systems That Preserve the Partial Ordering"
  • Amazon Dynamo Paper: "Dynamo: Amazon's Highly Available Key-value Store"
  • Riak Documentation: Vector clock implementation in production
  • "Designing Data-Intensive Applications" by Martin Kleppmann - Chapter 5 on Replication
  • Lamport's Paper: "Time, Clocks, and the Ordering of Events in a Distributed System" - Foundation for logical clocks
  • CRDTs: Conflict-Free Replicated Data Types using version vectors
  • Distributed Systems: "Distributed Systems" by Maarten van Steen and Andrew Tanenbaum