Event Sourcing Implementation

Exercise: Event Sourcing Implementation

Difficulty - Advanced

Learning Objectives

  • Understand event sourcing principles
  • Implement an event store for persistence
  • Build event replay and projection
  • Practice the CQRS pattern
  • Handle event versioning

Problem Statement

Create an event sourcing system for a banking application that tracks account transactions through events.

Implementation

  1package eventsourcing
  2
  3import (
  4	"encoding/json"
  5	"fmt"
  6	"sync"
  7	"time"
  8)
  9
 10// Event represents a domain event
 11type Event struct {
 12	ID            string                 `json:"id"`
 13	AggregateID   string                 `json:"aggregate_id"`
 14	AggregateType string                 `json:"aggregate_type"`
 15	EventType     string                 `json:"event_type"`
 16	Version       int                    `json:"version"`
 17	Data          map[string]interface{} `json:"data"`
 18	Timestamp     time.Time              `json:"timestamp"`
 19}
 20
 21// EventStore persists and retrieves events
 22type EventStore struct {
 23	mu     sync.RWMutex
 24	events map[string][]Event // aggregateID -> events
 25}
 26
 27func NewEventStore() *EventStore {
 28	return &EventStore{
 29		events: make(map[string][]Event),
 30	}
 31}
 32
 33// Append adds an event to the store
 34func Append(event Event) error {
 35	es.mu.Lock()
 36	defer es.mu.Unlock()
 37
 38	// Validate version
 39	existing := es.events[event.AggregateID]
 40	if len(existing) > 0 && existing[len(existing)-1].Version >= event.Version {
 41		return fmt.Errorf("version conflict: expected version > %d", existing[len(existing)-1].Version)
 42	}
 43
 44	es.events[event.AggregateID] = append(es.events[event.AggregateID], event)
 45	return nil
 46}
 47
 48// GetEvents retrieves all events for an aggregate
 49func GetEvents(aggregateID string) {
 50	es.mu.RLock()
 51	defer es.mu.RUnlock()
 52
 53	events, exists := es.events[aggregateID]
 54	if !exists {
 55		return []Event{}, nil
 56	}
 57
 58	// Return a copy
 59	result := make([]Event, len(events))
 60	copy(result, events)
 61	return result, nil
 62}
 63
 64// GetEventsSince retrieves events after a specific version
 65func GetEventsSince(aggregateID string, version int) {
 66	es.mu.RLock()
 67	defer es.mu.RUnlock()
 68
 69	events := es.events[aggregateID]
 70	result := make([]Event, 0)
 71
 72	for _, event := range events {
 73		if event.Version > version {
 74			result = append(result, event)
 75		}
 76	}
 77
 78	return result, nil
 79}
 80
 81// BankAccount is an aggregate root
 82type BankAccount struct {
 83	ID      string
 84	Owner   string
 85	Balance float64
 86	Version int
 87	events  []Event
 88}
 89
 90// NewBankAccount creates a new bank account
 91func NewBankAccount(id, owner string, initialDeposit float64) *BankAccount {
 92	account := &BankAccount{
 93		ID:      id,
 94		Owner:   owner,
 95		Version: 0,
 96		events:  make([]Event, 0),
 97	}
 98
 99	// Apply account created event
100	event := Event{
101		ID:            fmt.Sprintf("evt-%d", time.Now().UnixNano()),
102		AggregateID:   id,
103		AggregateType: "BankAccount",
104		EventType:     "AccountCreated",
105		Version:       1,
106		Data: map[string]interface{}{
107			"owner":           owner,
108			"initial_deposit": initialDeposit,
109		},
110		Timestamp: time.Now(),
111	}
112
113	account.applyAccountCreated(event)
114	account.events = append(account.events, event)
115
116	return account
117}
118
119// Deposit adds money to the account
120func Deposit(amount float64) error {
121	if amount <= 0 {
122		return fmt.Errorf("deposit amount must be positive")
123	}
124
125	event := Event{
126		ID:            fmt.Sprintf("evt-%d", time.Now().UnixNano()),
127		AggregateID:   ba.ID,
128		AggregateType: "BankAccount",
129		EventType:     "MoneyDeposited",
130		Version:       ba.Version + 1,
131		Data: map[string]interface{}{
132			"amount": amount,
133		},
134		Timestamp: time.Now(),
135	}
136
137	ba.applyMoneyDeposited(event)
138	ba.events = append(ba.events, event)
139
140	return nil
141}
142
143// Withdraw removes money from the account
144func Withdraw(amount float64) error {
145	if amount <= 0 {
146		return fmt.Errorf("withdrawal amount must be positive")
147	}
148
149	if ba.Balance < amount {
150		return fmt.Errorf("insufficient funds: balance=%.2f, withdrawal=%.2f", ba.Balance, amount)
151	}
152
153	event := Event{
154		ID:            fmt.Sprintf("evt-%d", time.Now().UnixNano()),
155		AggregateID:   ba.ID,
156		AggregateType: "BankAccount",
157		EventType:     "MoneyWithdrawn",
158		Version:       ba.Version + 1,
159		Data: map[string]interface{}{
160			"amount": amount,
161		},
162		Timestamp: time.Now(),
163	}
164
165	ba.applyMoneyWithdrawn(event)
166	ba.events = append(ba.events, event)
167
168	return nil
169}
170
171// GetUncommittedEvents returns events not yet persisted
172func GetUncommittedEvents() []Event {
173	return ba.events
174}
175
176// ClearUncommittedEvents clears the event buffer
177func ClearUncommittedEvents() {
178	ba.events = make([]Event, 0)
179}
180
181// LoadFromHistory reconstructs state from events
182func LoadFromHistory(events []Event) {
183	for _, event := range events {
184		switch event.EventType {
185		case "AccountCreated":
186			ba.applyAccountCreated(event)
187		case "MoneyDeposited":
188			ba.applyMoneyDeposited(event)
189		case "MoneyWithdrawn":
190			ba.applyMoneyWithdrawn(event)
191		}
192	}
193}
194
195// Event application methods
196func applyAccountCreated(event Event) {
197	ba.ID = event.AggregateID
198	ba.Owner = event.Data["owner"].(string)
199	ba.Balance = event.Data["initial_deposit"].(float64)
200	ba.Version = event.Version
201}
202
203func applyMoneyDeposited(event Event) {
204	amount := event.Data["amount"].(float64)
205	ba.Balance += amount
206	ba.Version = event.Version
207}
208
209func applyMoneyWithdrawn(event Event) {
210	amount := event.Data["amount"].(float64)
211	ba.Balance -= amount
212	ba.Version = event.Version
213}
214
215// Repository handles aggregate persistence
216type Repository struct {
217	eventStore *EventStore
218}
219
220func NewRepository(eventStore *EventStore) *Repository {
221	return &Repository{
222		eventStore: eventStore,
223	}
224}
225
226// Save persists an aggregate's events
227func Save(account *BankAccount) error {
228	events := account.GetUncommittedEvents()
229
230	for _, event := range events {
231		if err := r.eventStore.Append(event); err != nil {
232			return err
233		}
234	}
235
236	account.ClearUncommittedEvents()
237	return nil
238}
239
240// Load reconstructs an aggregate from events
241func Load(id string) {
242	events, err := r.eventStore.GetEvents(id)
243	if err != nil {
244		return nil, err
245	}
246
247	if len(events) == 0 {
248		return nil, fmt.Errorf("aggregate not found: %s", id)
249	}
250
251	account := &BankAccount{}
252	account.LoadFromHistory(events)
253
254	return account, nil
255}
256
257// Projection builds read models from events
258type AccountProjection struct {
259	mu       sync.RWMutex
260	accounts map[string]AccountView
261}
262
// AccountView is the read-side summary of one account.
type AccountView struct {
	// ID is the account's aggregate ID.
	ID string
	// Owner is the account owner's name.
	Owner string
	// Balance is the account balance after the events seen so far.
	Balance float64
	// TransactionCount is the number of events reflected in the view,
	// counting the account's creation.
	TransactionCount int
	// LastTransaction is the timestamp of the most recent event reflected
	// in the view.
	LastTransaction time.Time
}
270
271func NewAccountProjection() *AccountProjection {
272	return &AccountProjection{
273		accounts: make(map[string]AccountView),
274	}
275}
276
277// Project processes events to build read model
278func Project(event Event) {
279	ap.mu.Lock()
280	defer ap.mu.Unlock()
281
282	view := ap.accounts[event.AggregateID]
283
284	switch event.EventType {
285	case "AccountCreated":
286		view.ID = event.AggregateID
287		view.Owner = event.Data["owner"].(string)
288		view.Balance = event.Data["initial_deposit"].(float64)
289		view.TransactionCount = 1
290		view.LastTransaction = event.Timestamp
291
292	case "MoneyDeposited":
293		amount := event.Data["amount"].(float64)
294		view.Balance += amount
295		view.TransactionCount++
296		view.LastTransaction = event.Timestamp
297
298	case "MoneyWithdrawn":
299		amount := event.Data["amount"].(float64)
300		view.Balance -= amount
301		view.TransactionCount++
302		view.LastTransaction = event.Timestamp
303	}
304
305	ap.accounts[event.AggregateID] = view
306}
307
308// GetAccount retrieves an account view
309func GetAccount(id string) {
310	ap.mu.RLock()
311	defer ap.mu.RUnlock()
312
313	view, exists := ap.accounts[id]
314	if !exists {
315		return AccountView{}, fmt.Errorf("account not found")
316	}
317
318	return view, nil
319}

Key Takeaways

  1. Immutable Events: Events are never modified, only appended
  2. Event Replay: Reconstruct state by replaying events
  3. CQRS: Separate the write side (commands that append events) from the read side (projections that answer queries)
  4. Version Control: Track event versions for optimistic concurrency
  5. Audit Trail: Complete history of all state changes