Exercise: Event Sourcing Implementation
Difficulty - Advanced
Learning Objectives
- Understand event sourcing principles
- Implement event store for persistence
- Build event replay and projection
- Practice CQRS pattern
- Handle event versioning
Problem Statement
Create an event sourcing system for a banking application that tracks account transactions through events.
Implementation
1package eventsourcing
2
3import (
4 "encoding/json"
5 "fmt"
6 "sync"
7 "time"
8)
9
// Event represents a domain event recorded against a single aggregate.
type Event struct {
	ID            string                 `json:"id"`             // identifier of this event
	AggregateID   string                 `json:"aggregate_id"`   // aggregate the event belongs to
	AggregateType string                 `json:"aggregate_type"` // e.g. "BankAccount"
	EventType     string                 `json:"event_type"`     // e.g. "MoneyDeposited"
	Version       int                    `json:"version"`        // position within the aggregate's stream
	Data          map[string]interface{} `json:"data"`           // event-type-specific payload
	Timestamp     time.Time              `json:"timestamp"`      // when the event was created
}
20
21// EventStore persists and retrieves events
22type EventStore struct {
23 mu sync.RWMutex
24 events map[string][]Event // aggregateID -> events
25}
26
27func NewEventStore() *EventStore {
28 return &EventStore{
29 events: make(map[string][]Event),
30 }
31}
32
33// Append adds an event to the store
34func Append(event Event) error {
35 es.mu.Lock()
36 defer es.mu.Unlock()
37
38 // Validate version
39 existing := es.events[event.AggregateID]
40 if len(existing) > 0 && existing[len(existing)-1].Version >= event.Version {
41 return fmt.Errorf("version conflict: expected version > %d", existing[len(existing)-1].Version)
42 }
43
44 es.events[event.AggregateID] = append(es.events[event.AggregateID], event)
45 return nil
46}
47
48// GetEvents retrieves all events for an aggregate
49func GetEvents(aggregateID string) {
50 es.mu.RLock()
51 defer es.mu.RUnlock()
52
53 events, exists := es.events[aggregateID]
54 if !exists {
55 return []Event{}, nil
56 }
57
58 // Return a copy
59 result := make([]Event, len(events))
60 copy(result, events)
61 return result, nil
62}
63
64// GetEventsSince retrieves events after a specific version
65func GetEventsSince(aggregateID string, version int) {
66 es.mu.RLock()
67 defer es.mu.RUnlock()
68
69 events := es.events[aggregateID]
70 result := make([]Event, 0)
71
72 for _, event := range events {
73 if event.Version > version {
74 result = append(result, event)
75 }
76 }
77
78 return result, nil
79}
80
81// BankAccount is an aggregate root
82type BankAccount struct {
83 ID string
84 Owner string
85 Balance float64
86 Version int
87 events []Event
88}
89
90// NewBankAccount creates a new bank account
91func NewBankAccount(id, owner string, initialDeposit float64) *BankAccount {
92 account := &BankAccount{
93 ID: id,
94 Owner: owner,
95 Version: 0,
96 events: make([]Event, 0),
97 }
98
99 // Apply account created event
100 event := Event{
101 ID: fmt.Sprintf("evt-%d", time.Now().UnixNano()),
102 AggregateID: id,
103 AggregateType: "BankAccount",
104 EventType: "AccountCreated",
105 Version: 1,
106 Data: map[string]interface{}{
107 "owner": owner,
108 "initial_deposit": initialDeposit,
109 },
110 Timestamp: time.Now(),
111 }
112
113 account.applyAccountCreated(event)
114 account.events = append(account.events, event)
115
116 return account
117}
118
119// Deposit adds money to the account
120func Deposit(amount float64) error {
121 if amount <= 0 {
122 return fmt.Errorf("deposit amount must be positive")
123 }
124
125 event := Event{
126 ID: fmt.Sprintf("evt-%d", time.Now().UnixNano()),
127 AggregateID: ba.ID,
128 AggregateType: "BankAccount",
129 EventType: "MoneyDeposited",
130 Version: ba.Version + 1,
131 Data: map[string]interface{}{
132 "amount": amount,
133 },
134 Timestamp: time.Now(),
135 }
136
137 ba.applyMoneyDeposited(event)
138 ba.events = append(ba.events, event)
139
140 return nil
141}
142
143// Withdraw removes money from the account
144func Withdraw(amount float64) error {
145 if amount <= 0 {
146 return fmt.Errorf("withdrawal amount must be positive")
147 }
148
149 if ba.Balance < amount {
150 return fmt.Errorf("insufficient funds: balance=%.2f, withdrawal=%.2f", ba.Balance, amount)
151 }
152
153 event := Event{
154 ID: fmt.Sprintf("evt-%d", time.Now().UnixNano()),
155 AggregateID: ba.ID,
156 AggregateType: "BankAccount",
157 EventType: "MoneyWithdrawn",
158 Version: ba.Version + 1,
159 Data: map[string]interface{}{
160 "amount": amount,
161 },
162 Timestamp: time.Now(),
163 }
164
165 ba.applyMoneyWithdrawn(event)
166 ba.events = append(ba.events, event)
167
168 return nil
169}
170
171// GetUncommittedEvents returns events not yet persisted
172func GetUncommittedEvents() []Event {
173 return ba.events
174}
175
176// ClearUncommittedEvents clears the event buffer
177func ClearUncommittedEvents() {
178 ba.events = make([]Event, 0)
179}
180
181// LoadFromHistory reconstructs state from events
182func LoadFromHistory(events []Event) {
183 for _, event := range events {
184 switch event.EventType {
185 case "AccountCreated":
186 ba.applyAccountCreated(event)
187 case "MoneyDeposited":
188 ba.applyMoneyDeposited(event)
189 case "MoneyWithdrawn":
190 ba.applyMoneyWithdrawn(event)
191 }
192 }
193}
194
195// Event application methods
196func applyAccountCreated(event Event) {
197 ba.ID = event.AggregateID
198 ba.Owner = event.Data["owner"].(string)
199 ba.Balance = event.Data["initial_deposit"].(float64)
200 ba.Version = event.Version
201}
202
203func applyMoneyDeposited(event Event) {
204 amount := event.Data["amount"].(float64)
205 ba.Balance += amount
206 ba.Version = event.Version
207}
208
209func applyMoneyWithdrawn(event Event) {
210 amount := event.Data["amount"].(float64)
211 ba.Balance -= amount
212 ba.Version = event.Version
213}
214
215// Repository handles aggregate persistence
216type Repository struct {
217 eventStore *EventStore
218}
219
220func NewRepository(eventStore *EventStore) *Repository {
221 return &Repository{
222 eventStore: eventStore,
223 }
224}
225
226// Save persists an aggregate's events
227func Save(account *BankAccount) error {
228 events := account.GetUncommittedEvents()
229
230 for _, event := range events {
231 if err := r.eventStore.Append(event); err != nil {
232 return err
233 }
234 }
235
236 account.ClearUncommittedEvents()
237 return nil
238}
239
240// Load reconstructs an aggregate from events
241func Load(id string) {
242 events, err := r.eventStore.GetEvents(id)
243 if err != nil {
244 return nil, err
245 }
246
247 if len(events) == 0 {
248 return nil, fmt.Errorf("aggregate not found: %s", id)
249 }
250
251 account := &BankAccount{}
252 account.LoadFromHistory(events)
253
254 return account, nil
255}
256
// AccountProjection builds read models (AccountView values) from events.
//
// Construct it with NewAccountProjection so that the underlying map is
// initialized.
type AccountProjection struct {
	mu       sync.RWMutex           // guards accounts
	accounts map[string]AccountView // aggregateID -> view
}

// AccountView is the read model of a single account.
type AccountView struct {
	ID               string
	Owner            string
	Balance          float64
	TransactionCount int       // number of projected events, creation included
	LastTransaction  time.Time // Timestamp of the last projected event
}

// NewAccountProjection returns an AccountProjection with no accounts.
func NewAccountProjection() *AccountProjection {
	return &AccountProjection{
		accounts: make(map[string]AccountView),
	}
}
276
277// Project processes events to build read model
278func Project(event Event) {
279 ap.mu.Lock()
280 defer ap.mu.Unlock()
281
282 view := ap.accounts[event.AggregateID]
283
284 switch event.EventType {
285 case "AccountCreated":
286 view.ID = event.AggregateID
287 view.Owner = event.Data["owner"].(string)
288 view.Balance = event.Data["initial_deposit"].(float64)
289 view.TransactionCount = 1
290 view.LastTransaction = event.Timestamp
291
292 case "MoneyDeposited":
293 amount := event.Data["amount"].(float64)
294 view.Balance += amount
295 view.TransactionCount++
296 view.LastTransaction = event.Timestamp
297
298 case "MoneyWithdrawn":
299 amount := event.Data["amount"].(float64)
300 view.Balance -= amount
301 view.TransactionCount++
302 view.LastTransaction = event.Timestamp
303 }
304
305 ap.accounts[event.AggregateID] = view
306}
307
308// GetAccount retrieves an account view
309func GetAccount(id string) {
310 ap.mu.RLock()
311 defer ap.mu.RUnlock()
312
313 view, exists := ap.accounts[id]
314 if !exists {
315 return AccountView{}, fmt.Errorf("account not found")
316 }
317
318 return view, nil
319}
Key Takeaways
- Immutable Events: Events are never modified, only appended
- Event Replay: Reconstruct state by replaying events
- CQRS: Separate the write side (aggregates that emit events) from the read side (projections built from those events)
- Event Versioning: Track a per-aggregate event version to detect conflicting writes (optimistic concurrency)
- Audit Trail: Complete history of all state changes
Related Topics
- Event-Driven Architecture - Event patterns
- Distributed Systems - Distributed patterns
- Microservices - Microservice architecture