Advanced Concurrency Patterns
Exercise Overview
Build a high-performance concurrent data processing system that demonstrates advanced Go concurrency patterns. You'll implement fan-in/fan-out, pipeline processing, worker pools, and proper cancellation techniques.
Learning Objectives
- Implement fan-in and fan-out patterns for parallel processing
- Build pipeline processing with stage-wise concurrency
- Create advanced worker pools with dynamic scaling
- Handle graceful shutdown and cancellation properly
- Apply context propagation through concurrent operations
- Use select statements for complex channel operations
- Implement timeout and error handling in concurrent systems
The Problem - Concurrent Data Pipeline
You need to process streaming data from multiple sources, apply transformations, and output results. The system must handle high throughput, be resilient to failures, and support graceful shutdown.
Initial Code
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

// TODO: Implement data source interface
type DataSource interface {
	Stream(ctx context.Context) <-chan DataItem
}

// TODO: Optimize data structure
type DataItem struct {
	ID        int
	Value     float64
	Source    string
	Timestamp time.Time
}

// TODO: Implement fan-out pattern
type FanOut struct {
	input    <-chan DataItem
	outputs  []chan DataItem
	workerWg sync.WaitGroup
}

// TODO: Implement fan-in pattern
type FanIn struct {
	inputs   []<-chan DataItem
	output   chan DataItem
	workerWg sync.WaitGroup
}

// TODO: Implement pipeline stage
type PipelineStage struct {
	name      string
	input     <-chan DataItem
	output    chan DataItem
	process   func(DataItem) DataItem
	batchSize int
}

// TODO: Implement worker pool with scaling
type WorkerPool struct {
	taskChan chan Task
	workerWg sync.WaitGroup
	workers  int
	results  chan Result
	ctx      context.Context
	cancel   context.CancelFunc
}

type Task struct {
	ID      int
	Data    DataItem
	Process func(DataItem) Result
}

type Result struct {
	TaskID int
	Data   DataItem
	Error  error
}

// TODO: Implement concurrent data processor
type ConcurrentProcessor struct {
	sources    []DataSource
	stages     []*PipelineStage
	workerPool *WorkerPool
	fanOut     *FanOut
	fanIn      *FanIn
	output     chan DataItem
	ctx        context.Context
	cancel     context.CancelFunc
}

// TODO: Implement sample data sources
type FileDataSource struct {
	filename  string
	data      []DataItem
	rateLimit time.Duration
}

type NetworkDataSource struct {
	url       string
	rateLimit time.Duration
}

func (fds *FileDataSource) Stream(ctx context.Context) <-chan DataItem {
	// TODO: Implement file data source with proper cancellation
	ch := make(chan DataItem)
	return ch
}

func (nds *NetworkDataSource) Stream(ctx context.Context) <-chan DataItem {
	// TODO: Implement network data source with timeout and retry
	ch := make(chan DataItem)
	return ch
}

// TODO: Implement processing functions
func ValidateData(item DataItem) DataItem {
	// TODO: Add data validation
	return item
}

func TransformData(item DataItem) DataItem {
	// TODO: Add data transformation
	return item
}

func EnrichData(item DataItem) DataItem {
	// TODO: Add data enrichment
	return item
}

func main() {
	// TODO: Build and run concurrent processing pipeline
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	processor := &ConcurrentProcessor{
		ctx:    ctx,
		cancel: cancel,
	}

	// Start processing
	if err := processor.Start(); err != nil {
		log.Fatal(err)
	}

	// Handle graceful shutdown
	handleSignals(processor)
}
Tasks
Task 1: Implement Fan-Out Pattern
Create a fan-out pattern that distributes input to multiple output channels:
type FanOut struct {
	input    <-chan DataItem
	outputs  []chan DataItem
	workerWg sync.WaitGroup
	ctx      context.Context
}

func NewFanOut(ctx context.Context, input <-chan DataItem, outputCount int) *FanOut {
	outputs := make([]chan DataItem, outputCount)
	for i := range outputs {
		outputs[i] = make(chan DataItem, 100) // Buffered channels to absorb bursts
	}

	return &FanOut{
		input:   input,
		outputs: outputs,
		ctx:     ctx,
	}
}

func (fo *FanOut) Start() {
	for i, output := range fo.outputs {
		fo.workerWg.Add(1)
		go fo.distribute(i, output)
	}
}

func (fo *FanOut) distribute(workerID int, output chan DataItem) {
	defer fo.workerWg.Done()
	defer close(output)

	for {
		select {
		case <-fo.ctx.Done():
			return
		case item, ok := <-fo.input:
			if !ok {
				return
			}
			select {
			case output <- item:
			case <-fo.ctx.Done():
				return
			}
		}
	}
}

func (fo *FanOut) GetOutputs() []<-chan DataItem {
	outputs := make([]<-chan DataItem, len(fo.outputs))
	for i, output := range fo.outputs {
		outputs[i] = output
	}
	return outputs
}

func (fo *FanOut) Wait() {
	fo.workerWg.Wait()
}
Task 2: Implement Fan-In Pattern
Create a fan-in pattern that merges multiple input channels:
type FanIn struct {
	inputs   []<-chan DataItem
	output   chan DataItem
	workerWg sync.WaitGroup
	ctx      context.Context
}

func NewFanIn(ctx context.Context, inputs []<-chan DataItem) *FanIn {
	return &FanIn{
		inputs: inputs,
		output: make(chan DataItem, 100),
		ctx:    ctx,
	}
}

func (fi *FanIn) Start() {
	for i, input := range fi.inputs {
		fi.workerWg.Add(1)
		go fi.collect(i, input)
	}

	// Close the merged output once all collectors have finished
	go func() {
		fi.workerWg.Wait()
		close(fi.output)
	}()
}

func (fi *FanIn) collect(workerID int, input <-chan DataItem) {
	defer fi.workerWg.Done()

	for {
		select {
		case <-fi.ctx.Done():
			return
		case item, ok := <-input:
			if !ok {
				return
			}
			select {
			case fi.output <- item:
			case <-fi.ctx.Done():
				return
			}
		}
	}
}

func (fi *FanIn) Output() <-chan DataItem {
	return fi.output
}

func (fi *FanIn) Wait() {
	fi.workerWg.Wait()
}
Task 3: Implement Pipeline Stages
Build pipeline stages with concurrent processing:
type PipelineStage struct {
	name      string
	input     <-chan DataItem
	output    chan DataItem
	process   func(DataItem) (DataItem, error)
	errorChan chan error
	workerWg  sync.WaitGroup
	workers   int
	ctx       context.Context
}

func NewPipelineStage(ctx context.Context, name string, input <-chan DataItem, workers int, process func(DataItem) (DataItem, error)) *PipelineStage {
	return &PipelineStage{
		name:      name,
		input:     input,
		output:    make(chan DataItem, 100),
		process:   process,
		errorChan: make(chan error, 10),
		workers:   workers,
		ctx:       ctx,
	}
}

func (ps *PipelineStage) Start() {
	for i := 0; i < ps.workers; i++ {
		ps.workerWg.Add(1)
		go ps.worker(i)
	}

	// Close output and error channels once all workers have finished
	go func() {
		ps.workerWg.Wait()
		close(ps.output)
		close(ps.errorChan)
	}()
}

func (ps *PipelineStage) worker(workerID int) {
	defer ps.workerWg.Done()

	for {
		select {
		case <-ps.ctx.Done():
			return
		case item, ok := <-ps.input:
			if !ok {
				return
			}

			// Process item
			result, err := ps.process(item)
			if err != nil {
				select {
				case ps.errorChan <- fmt.Errorf("stage %s worker %d: %w", ps.name, workerID, err):
				case <-ps.ctx.Done():
					return
				}
				continue
			}

			select {
			case ps.output <- result:
			case <-ps.ctx.Done():
				return
			}
		}
	}
}

func (ps *PipelineStage) Output() <-chan DataItem {
	return ps.output
}

func (ps *PipelineStage) Errors() <-chan error {
	return ps.errorChan
}

func (ps *PipelineStage) Wait() {
	ps.workerWg.Wait()
}
Task 4: Implement Dynamic Worker Pool
Create a worker pool that can scale based on load:
type WorkerPool struct {
	taskChan   chan Task
	resultChan chan Result
	workerWg   sync.WaitGroup
	mu         sync.RWMutex
	workers    []*Worker
	minWorkers int
	maxWorkers int
	ctx        context.Context
	cancel     context.CancelFunc
	metrics    *PoolMetrics
}

type Worker struct {
	id         int
	taskChan   <-chan Task
	resultChan chan<- Result
	quit       chan struct{}
	busy       bool
	lastActive time.Time
	mu         sync.RWMutex
}

type PoolMetrics struct {
	TasksSubmitted int64
	TasksCompleted int64
	TasksFailed    int64
	ActiveWorkers  int64
	TotalWorkers   int64
}

func NewWorkerPool(minWorkers, maxWorkers int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())

	return &WorkerPool{
		taskChan:   make(chan Task, 1000),
		resultChan: make(chan Result, 1000),
		workers:    make([]*Worker, 0),
		minWorkers: minWorkers,
		maxWorkers: maxWorkers,
		ctx:        ctx,
		cancel:     cancel,
		metrics:    &PoolMetrics{},
	}
}

func (wp *WorkerPool) Start() {
	// Start minimum workers
	for i := 0; i < wp.minWorkers; i++ {
		wp.addWorker()
	}

	// Start autoscaler
	go wp.autoscaler()

	// Results are consumed by the caller via Results(); a collector
	// goroutine could be added here to update TasksCompleted/TasksFailed.
}

func (wp *WorkerPool) addWorker() {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	if len(wp.workers) >= wp.maxWorkers {
		return
	}

	workerID := len(wp.workers)
	worker := &Worker{
		id:         workerID,
		taskChan:   wp.taskChan,
		resultChan: wp.resultChan,
		quit:       make(chan struct{}),
		lastActive: time.Now(),
	}

	wp.workers = append(wp.workers, worker)
	atomic.AddInt64(&wp.metrics.TotalWorkers, 1)

	wp.workerWg.Add(1)
	go worker.run(wp.ctx, &wp.workerWg)
}

func (w *Worker) run(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case <-w.quit:
			return
		case task := <-w.taskChan:
			w.setBusy(true)

			// Process task
			result := task.Process(task.Data)
			result.TaskID = task.ID

			select {
			case w.resultChan <- result:
			case <-ctx.Done():
				return
			}

			w.setBusy(false)
		}
	}
}

func (w *Worker) setBusy(busy bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.busy = busy
	w.lastActive = time.Now()
}

func (w *Worker) isBusy() bool {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.busy
}

func (wp *WorkerPool) autoscaler() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-wp.ctx.Done():
			return
		case <-ticker.C:
			wp.adjustWorkers()
		}
	}
}

func (wp *WorkerPool) adjustWorkers() {
	// Take a snapshot under the read lock, then release it before calling
	// addWorker/removeWorker, which acquire the write lock.
	wp.mu.RLock()
	busyCount := 0
	idleCount := 0
	for _, worker := range wp.workers {
		if worker.isBusy() {
			busyCount++
		} else {
			idleCount++
		}
	}
	total := len(wp.workers)
	queued := len(wp.taskChan)
	wp.mu.RUnlock()

	// Scale up if all workers are busy and the queue is growing
	if busyCount == total && total < wp.maxWorkers && queued > 10 {
		wp.addWorker()
		log.Printf("Scaling up: now %d workers", total+1)
	}

	// Scale down if we have idle workers and the minimum is still satisfied
	if idleCount > 2 && total > wp.minWorkers {
		wp.removeWorker()
		log.Printf("Scaling down: now %d workers", total-1)
	}
}

func (wp *WorkerPool) removeWorker() {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	if len(wp.workers) <= wp.minWorkers {
		return
	}

	// Find an idle worker and stop it
	for i, worker := range wp.workers {
		if !worker.isBusy() {
			close(worker.quit)
			wp.workers = append(wp.workers[:i], wp.workers[i+1:]...)
			atomic.AddInt64(&wp.metrics.TotalWorkers, -1)
			break
		}
	}
}

func (wp *WorkerPool) Submit(task Task) {
	atomic.AddInt64(&wp.metrics.TasksSubmitted, 1)

	select {
	case wp.taskChan <- task:
	case <-wp.ctx.Done():
		return
	}
}

func (wp *WorkerPool) Results() <-chan Result {
	return wp.resultChan
}

func (wp *WorkerPool) GetMetrics() PoolMetrics {
	return PoolMetrics{
		TasksSubmitted: atomic.LoadInt64(&wp.metrics.TasksSubmitted),
		TasksCompleted: atomic.LoadInt64(&wp.metrics.TasksCompleted),
		TasksFailed:    atomic.LoadInt64(&wp.metrics.TasksFailed),
		ActiveWorkers:  atomic.LoadInt64(&wp.metrics.ActiveWorkers),
		TotalWorkers:   atomic.LoadInt64(&wp.metrics.TotalWorkers),
	}
}

func (wp *WorkerPool) Shutdown() {
	wp.cancel()
	wp.workerWg.Wait()
	close(wp.resultChan)
}
Task 5: Implement Context Propagation
Ensure proper context propagation through the pipeline:
type ContextKey string

const (
	RequestIDKey ContextKey = "request_id"
	TraceIDKey   ContextKey = "trace_id"
)

type ProcessingContext struct {
	RequestID string
	TraceID   string
	StartTime time.Time
	Metadata  map[string]interface{}
}

func (pc *ProcessingContext) ToContext(ctx context.Context) context.Context {
	ctx = context.WithValue(ctx, RequestIDKey, pc.RequestID)
	ctx = context.WithValue(ctx, TraceIDKey, pc.TraceID)
	return ctx
}

func ExtractProcessingContext(ctx context.Context) *ProcessingContext {
	requestID, _ := ctx.Value(RequestIDKey).(string)
	traceID, _ := ctx.Value(TraceIDKey).(string)

	return &ProcessingContext{
		RequestID: requestID,
		TraceID:   traceID,
	}
}

func ContextAwareProcess(ctx context.Context, item DataItem) (DataItem, error) {
	// Extract context information
	pc := ExtractProcessingContext(ctx)

	// Add context metadata to item
	if pc.RequestID != "" {
		// Add request ID to item for tracing
	}

	// Process with timeout
	processCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Simulate work
	select {
	case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
		return item, nil
	case <-processCtx.Done():
		return DataItem{}, processCtx.Err()
	}
}
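For orientation, here is a minimal usage sketch showing how a ProcessingContext is attached to a context before processing and then recovered inside ContextAwareProcess. The wrapper name processWithTracing and the example ID values are assumptions for illustration, not part of the exercise code:

// Hypothetical wiring: attach tracing values to the context, then call a
// context-aware processing function that can read them back downstream.
func processWithTracing(ctx context.Context, item DataItem) (DataItem, error) {
	pc := &ProcessingContext{
		RequestID: "req-123",   // assumed example value
		TraceID:   "trace-456", // assumed example value
		StartTime: time.Now(),
	}
	ctx = pc.ToContext(ctx)

	// Downstream code only sees context.Context and can still recover the IDs.
	return ContextAwareProcess(ctx, item)
}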
Solution Approach
Complete Implementation:
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

type DataItem struct {
	ID        int                    `json:"id"`
	Value     float64                `json:"value"`
	Source    string                 `json:"source"`
	Timestamp time.Time              `json:"timestamp"`
	Metadata  map[string]interface{} `json:"metadata"`
}

type DataSource interface {
	Stream(ctx context.Context) <-chan DataItem
}

// FileDataSource implementation
type FileDataSource struct {
	filename  string
	rateLimit time.Duration
}

func (fds *FileDataSource) Stream(ctx context.Context) <-chan DataItem {
	ch := make(chan DataItem, 10)

	go func() {
		defer close(ch)

		ticker := time.NewTicker(fds.rateLimit)
		defer ticker.Stop()

		itemID := 0
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				item := DataItem{
					ID:        itemID,
					Value:     rand.Float64() * 100,
					Source:    "file:" + fds.filename,
					Timestamp: time.Now(),
					Metadata: map[string]interface{}{
						"type": "generated",
					},
				}

				select {
				case ch <- item:
					itemID++
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return ch
}

// NetworkDataSource implementation
type NetworkDataSource struct {
	url       string
	rateLimit time.Duration
}

func (nds *NetworkDataSource) Stream(ctx context.Context) <-chan DataItem {
	ch := make(chan DataItem, 10)

	go func() {
		defer close(ch)

		ticker := time.NewTicker(nds.rateLimit)
		defer ticker.Stop()

		itemID := 1000
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Simulate network latency
				time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)

				item := DataItem{
					ID:        itemID,
					Value:     rand.Float64() * 200,
					Source:    "network:" + nds.url,
					Timestamp: time.Now(),
					Metadata: map[string]interface{}{
						"type": "fetched",
					},
				}

				select {
				case ch <- item:
					itemID++
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return ch
}

// FanOut implementation
type FanOut struct {
	input    <-chan DataItem
	outputs  []chan DataItem
	workerWg sync.WaitGroup
	ctx      context.Context
}

func NewFanOut(ctx context.Context, input <-chan DataItem, outputCount int) *FanOut {
	outputs := make([]chan DataItem, outputCount)
	for i := range outputs {
		outputs[i] = make(chan DataItem, 100)
	}

	return &FanOut{
		input:   input,
		outputs: outputs,
		ctx:     ctx,
	}
}

func (fo *FanOut) Start() {
	for i, output := range fo.outputs {
		fo.workerWg.Add(1)
		go fo.distribute(i, output)
	}
}

func (fo *FanOut) distribute(workerID int, output chan DataItem) {
	defer fo.workerWg.Done()
	defer close(output)

	for {
		select {
		case <-fo.ctx.Done():
			return
		case item, ok := <-fo.input:
			if !ok {
				return
			}
			select {
			case output <- item:
			case <-fo.ctx.Done():
				return
			}
		}
	}
}

func (fo *FanOut) GetOutputs() []<-chan DataItem {
	outputs := make([]<-chan DataItem, len(fo.outputs))
	for i, output := range fo.outputs {
		outputs[i] = output
	}
	return outputs
}

func (fo *FanOut) Wait() {
	fo.workerWg.Wait()
}

// FanIn implementation
type FanIn struct {
	inputs   []<-chan DataItem
	output   chan DataItem
	workerWg sync.WaitGroup
	ctx      context.Context
}

func NewFanIn(ctx context.Context, inputs []<-chan DataItem) *FanIn {
	return &FanIn{
		inputs: inputs,
		output: make(chan DataItem, 100),
		ctx:    ctx,
	}
}

func (fi *FanIn) Start() {
	for i, input := range fi.inputs {
		fi.workerWg.Add(1)
		go fi.collect(i, input)
	}

	// Close the merged output once all collectors have finished
	go func() {
		fi.workerWg.Wait()
		close(fi.output)
	}()
}

func (fi *FanIn) collect(workerID int, input <-chan DataItem) {
	defer fi.workerWg.Done()

	for {
		select {
		case <-fi.ctx.Done():
			return
		case item, ok := <-input:
			if !ok {
				return
			}
			select {
			case fi.output <- item:
			case <-fi.ctx.Done():
				return
			}
		}
	}
}

func (fi *FanIn) Output() <-chan DataItem {
	return fi.output
}

func (fi *FanIn) Wait() {
	fi.workerWg.Wait()
}

// Processing functions
func ValidateData(item DataItem) (DataItem, error) {
	if item.Value < 0 {
		return item, fmt.Errorf("invalid negative value: %f", item.Value)
	}

	// Add validation metadata
	if item.Metadata == nil {
		item.Metadata = make(map[string]interface{})
	}
	item.Metadata["validated"] = true
	item.Metadata["validation_time"] = time.Now()

	return item, nil
}

func TransformData(item DataItem) (DataItem, error) {
	// Apply transformation
	item.Value = item.Value * 2.0

	// Add transformation metadata
	if item.Metadata == nil {
		item.Metadata = make(map[string]interface{})
	}
	item.Metadata["transformed"] = true
	item.Metadata["transform_factor"] = 2.0
	item.Metadata["transform_time"] = time.Now()

	return item, nil
}

func EnrichData(item DataItem) (DataItem, error) {
	// Add enrichment
	if item.Metadata == nil {
		item.Metadata = make(map[string]interface{})
	}
	item.Metadata["enriched"] = true
	item.Metadata["category"] = "processed"
	item.Metadata["priority"] = "normal"
	item.Metadata["enrich_time"] = time.Now()

	return item, nil
}

// PipelineStage implementation
type PipelineStage struct {
	name      string
	input     <-chan DataItem
	output    chan DataItem
	process   func(DataItem) (DataItem, error)
	errorChan chan error
	workerWg  sync.WaitGroup
	workers   int
	ctx       context.Context
	metrics   *StageMetrics
}

type StageMetrics struct {
	Processed int64
	Errors    int64
	Duration  int64
}

func NewPipelineStage(ctx context.Context, name string, input <-chan DataItem, workers int, process func(DataItem) (DataItem, error)) *PipelineStage {
	return &PipelineStage{
		name:      name,
		input:     input,
		output:    make(chan DataItem, 100),
		process:   process,
		errorChan: make(chan error, 10),
		workers:   workers,
		ctx:       ctx,
		metrics:   &StageMetrics{},
	}
}

func (ps *PipelineStage) Start() {
	for i := 0; i < ps.workers; i++ {
		ps.workerWg.Add(1)
		go ps.worker(i)
	}

	// Close output and error channels once all workers have finished
	go func() {
		ps.workerWg.Wait()
		close(ps.output)
		close(ps.errorChan)
	}()
}

func (ps *PipelineStage) worker(workerID int) {
	defer ps.workerWg.Done()

	for {
		select {
		case <-ps.ctx.Done():
			return
		case item, ok := <-ps.input:
			if !ok {
				return
			}

			start := time.Now()
			result, err := ps.process(item)
			duration := time.Since(start)

			atomic.AddInt64(&ps.metrics.Processed, 1)
			atomic.AddInt64(&ps.metrics.Duration, duration.Nanoseconds())

			if err != nil {
				atomic.AddInt64(&ps.metrics.Errors, 1)
				select {
				case ps.errorChan <- fmt.Errorf("stage %s worker %d: %w", ps.name, workerID, err):
				case <-ps.ctx.Done():
					return
				}
				continue
			}

			select {
			case ps.output <- result:
			case <-ps.ctx.Done():
				return
			}
		}
	}
}

func (ps *PipelineStage) Output() <-chan DataItem {
	return ps.output
}

func (ps *PipelineStage) Errors() <-chan error {
	return ps.errorChan
}

func (ps *PipelineStage) Wait() {
	ps.workerWg.Wait()
}

func (ps *PipelineStage) GetMetrics() StageMetrics {
	return StageMetrics{
		Processed: atomic.LoadInt64(&ps.metrics.Processed),
		Errors:    atomic.LoadInt64(&ps.metrics.Errors),
		Duration:  atomic.LoadInt64(&ps.metrics.Duration),
	}
}

// ConcurrentProcessor implementation
type ConcurrentProcessor struct {
	sources []DataSource
	stages  []*PipelineStage
	fanOut  *FanOut
	fanIn   *FanIn
	output  chan DataItem
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	stats   *ProcessorStats
}

type ProcessorStats struct {
	ItemsProcessed int64
	TotalErrors    int64
	StartTime      time.Time
}

func NewConcurrentProcessor() *ConcurrentProcessor {
	ctx, cancel := context.WithCancel(context.Background())

	return &ConcurrentProcessor{
		ctx:    ctx,
		cancel: cancel,
		output: make(chan DataItem, 100),
		stats:  &ProcessorStats{StartTime: time.Now()},
	}
}

func (cp *ConcurrentProcessor) AddSource(source DataSource) {
	cp.sources = append(cp.sources, source)
}

func (cp *ConcurrentProcessor) BuildPipeline() {
	// Create source channels
	sourceChannels := make([]<-chan DataItem, len(cp.sources))
	for i, source := range cp.sources {
		sourceChannels[i] = source.Stream(cp.ctx)
	}

	// Fan-in all sources into a single stream
	cp.fanIn = NewFanIn(cp.ctx, sourceChannels)
	cp.fanIn.Start()

	// Fan-out the merged stream across the CPUs
	cp.fanOut = NewFanOut(cp.ctx, cp.fanIn.Output(), runtime.NumCPU())
	cp.fanOut.Start()

	// Run one validation stage per fan-out branch so every branch is drained
	stageInputs := cp.fanOut.GetOutputs()
	validationOutputs := make([]<-chan DataItem, len(stageInputs))
	for i, input := range stageInputs {
		stage := NewPipelineStage(cp.ctx, fmt.Sprintf("validation-%d", i), input, 2, ValidateData)
		cp.stages = append(cp.stages, stage)
		stage.Start()
		validationOutputs[i] = stage.Output()
	}

	// Merge the validated branches back into one stream
	validationFanIn := NewFanIn(cp.ctx, validationOutputs)
	validationFanIn.Start()

	// Transformation stage
	transformStage := NewPipelineStage(cp.ctx, "transform", validationFanIn.Output(), 2, TransformData)
	cp.stages = append(cp.stages, transformStage)
	transformStage.Start()

	// Enrichment stage
	enrichmentStage := NewPipelineStage(cp.ctx, "enrichment", transformStage.Output(), 2, EnrichData)
	cp.stages = append(cp.stages, enrichmentStage)
	enrichmentStage.Start()

	// Start result collector
	cp.wg.Add(1)
	go cp.collectResults(enrichmentStage.Output())
}

func (cp *ConcurrentProcessor) collectResults(input <-chan DataItem) {
	defer cp.wg.Done()
	defer close(cp.output)

	for {
		select {
		case <-cp.ctx.Done():
			return
		case item, ok := <-input:
			if !ok {
				return
			}
			atomic.AddInt64(&cp.stats.ItemsProcessed, 1)

			select {
			case cp.output <- item:
			case <-cp.ctx.Done():
				return
			}
		}
	}
}

func (cp *ConcurrentProcessor) Start() error {
	if len(cp.sources) == 0 {
		return fmt.Errorf("no data sources configured")
	}

	cp.BuildPipeline()

	// Start monitoring
	cp.wg.Add(1)
	go cp.monitor()

	return nil
}

func (cp *ConcurrentProcessor) monitor() {
	defer cp.wg.Done()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-cp.ctx.Done():
			return
		case <-ticker.C:
			cp.logStats()
		}
	}
}

func (cp *ConcurrentProcessor) logStats() {
	processed := atomic.LoadInt64(&cp.stats.ItemsProcessed)
	errors := atomic.LoadInt64(&cp.stats.TotalErrors)
	duration := time.Since(cp.stats.StartTime)
	rate := float64(processed) / duration.Seconds()

	log.Printf("Processor Stats - Processed: %d, Errors: %d, Rate: %.2f items/sec, Duration: %v",
		processed, errors, rate, duration)

	// Log stage metrics
	for _, stage := range cp.stages {
		metrics := stage.GetMetrics()
		avgDuration := time.Duration(0)
		if metrics.Processed > 0 {
			avgDuration = time.Duration(metrics.Duration / metrics.Processed)
		}
		log.Printf("Stage %s - Processed: %d, Errors: %d, Avg Duration: %v",
			stage.name, metrics.Processed, metrics.Errors, avgDuration)
	}
}

func (cp *ConcurrentProcessor) Output() <-chan DataItem {
	return cp.output
}

func (cp *ConcurrentProcessor) Shutdown() {
	cp.cancel()

	// Wait for all components to finish
	cp.fanIn.Wait()
	cp.fanOut.Wait()
	for _, stage := range cp.stages {
		stage.Wait()
	}
	cp.wg.Wait()

	log.Println("Concurrent processor shutdown complete")
}

func handleSignals(processor *ConcurrentProcessor) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	<-sigChan
	log.Println("Received shutdown signal, shutting down gracefully...")
	processor.Shutdown()
}

func main() {
	log.Printf("Starting concurrent processor with %d CPUs", runtime.NumCPU())

	// Create processor
	processor := NewConcurrentProcessor()

	// Add data sources
	processor.AddSource(&FileDataSource{
		filename:  "data1.txt",
		rateLimit: 10 * time.Millisecond,
	})

	processor.AddSource(&FileDataSource{
		filename:  "data2.txt",
		rateLimit: 15 * time.Millisecond,
	})

	processor.AddSource(&NetworkDataSource{
		url:       "api.example.com",
		rateLimit: 20 * time.Millisecond,
	})

	// Start processing
	if err := processor.Start(); err != nil {
		log.Fatal(err)
	}

	// Process results
	go func() {
		for item := range processor.Output() {
			log.Printf("Processed item: ID=%d, Value=%.2f, Source=%s",
				item.ID, item.Value, item.Source)
		}
	}()

	// Handle graceful shutdown
	handleSignals(processor)
}
Testing Your Solution
1. Basic Functionality Test
# Run the processor
go run main.go

# Test with different data rates
# Modify rateLimit values in data sources to see how the system handles varying load
2. Concurrency Testing
# Test with many sources
# Add more data sources to test fan-in scalability

# Test with different worker counts
# Modify the number of workers in pipeline stages
3. Graceful Shutdown Test
# Send SIGINT signal
kill -INT <pid>

# Verify all goroutines are cleaned up properly
# Check for resource leaks
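One lightweight way to check for leaks is to compare runtime.NumGoroutine() before startup and after shutdown. The sketch below assumes the solution's types; the sleep durations and the helper name checkGoroutineLeak are arbitrary choices, not part of the exercise:

// Rough leak check: the goroutine count should return close to the baseline
// once Shutdown has completed.
func checkGoroutineLeak() {
	baseline := runtime.NumGoroutine()

	processor := NewConcurrentProcessor()
	processor.AddSource(&FileDataSource{filename: "data1.txt", rateLimit: 10 * time.Millisecond})
	if err := processor.Start(); err != nil {
		log.Fatal(err)
	}

	// Drain results so the pipeline is never blocked on its output channel.
	go func() {
		for range processor.Output() {
		}
	}()

	time.Sleep(2 * time.Second) // let the pipeline run briefly
	processor.Shutdown()
	time.Sleep(100 * time.Millisecond) // give exiting goroutines time to unwind

	log.Printf("goroutines before: %d, after shutdown: %d", baseline, runtime.NumGoroutine())
}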
4. Performance Testing
# Monitor CPU usage
# Check goroutine count with runtime.NumGoroutine()
# Measure throughput
# Measure latency distribution
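For a rough throughput number, you can count items coming off the output channel over a fixed window. This is a sketch using the solution's types; the helper name measureThroughput and the idea of shutting down after the window are assumptions:

// Rough throughput measurement: count output items over a fixed window.
func measureThroughput(processor *ConcurrentProcessor, window time.Duration) {
	var count int64
	done := make(chan struct{})

	go func() {
		defer close(done)
		for range processor.Output() {
			atomic.AddInt64(&count, 1)
		}
	}()

	time.Sleep(window)
	processor.Shutdown()
	<-done

	log.Printf("throughput: %.2f items/sec over %v",
		float64(atomic.LoadInt64(&count))/window.Seconds(), window)
}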
Extension Challenges
- Add backpressure handling - Implement flow control to prevent memory buildup
- Add batch processing - Process items in batches for better throughput (see the sketch after this list)
- Add priority queues - Implement priority-based processing
- Add distributed processing - Scale across multiple machines
- Add persistence - Handle system restarts and state recovery
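As a starting point for the batching challenge, here is a minimal sketch of a stage that groups items into slices of up to batchSize and flushes a partial batch after flushInterval. The names batchItems, batchSize, and flushInterval are illustrative assumptions, not part of the exercise code:

// batchItems groups incoming items into slices of up to batchSize,
// flushing a partial batch after flushInterval so items never stall.
func batchItems(ctx context.Context, input <-chan DataItem, batchSize int, flushInterval time.Duration) <-chan []DataItem {
	output := make(chan []DataItem, 10)

	go func() {
		defer close(output)

		batch := make([]DataItem, 0, batchSize)
		ticker := time.NewTicker(flushInterval)
		defer ticker.Stop()

		flush := func() {
			if len(batch) == 0 {
				return
			}
			select {
			case output <- batch:
				batch = make([]DataItem, 0, batchSize)
			case <-ctx.Done():
			}
		}

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				flush()
			case item, ok := <-input:
				if !ok {
					flush() // emit whatever is left before closing
					return
				}
				batch = append(batch, item)
				if len(batch) >= batchSize {
					flush()
				}
			}
		}
	}()

	return output
}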
Key Takeaways
- Fan-out pattern distributes work across multiple workers for parallel processing
- Fan-in pattern combines results from multiple concurrent operations
- Pipeline stages enable step-by-step processing with concurrent workers
- Context cancellation ensures graceful shutdown and resource cleanup
- Buffered channels help smooth out processing rate variations
- Worker pools provide efficient resource utilization
- Monitoring and metrics are essential for understanding system behavior
This exercise demonstrates advanced Go concurrency patterns that are essential for building high-performance, scalable concurrent systems.