Advanced Concurrency Patterns

Exercise Overview

Build a high-performance concurrent data processing system that demonstrates advanced Go concurrency patterns. You'll implement fan-in/fan-out, pipeline processing, worker pools, and proper cancellation techniques.

Learning Objectives

  • Implement fan-in and fan-out patterns for parallel processing
  • Build pipeline processing with stage-wise concurrency
  • Create advanced worker pools with dynamic scaling
  • Handle graceful shutdown and cancellation properly
  • Apply context propagation through concurrent operations
  • Use select statements for complex channel operations
  • Implement timeout and error handling in concurrent systems

The Problem - Concurrent Data Pipeline

You need to process streaming data from multiple sources, apply transformations, and output results. The system must handle high throughput, be resilient to failures, and support graceful shutdown.

Initial Code

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

// TODO: Implement data source interface
type DataSource interface {
	Stream(ctx context.Context) <-chan DataItem
}

// TODO: Optimize data structure
type DataItem struct {
	ID        int
	Value     float64
	Source    string
	Timestamp time.Time
}

// TODO: Implement fan-out pattern
type FanOut struct {
	input    <-chan DataItem
	outputs  []chan DataItem
	workerWg sync.WaitGroup
}

// TODO: Implement fan-in pattern
type FanIn struct {
	inputs   []<-chan DataItem
	output   chan DataItem
	workerWg sync.WaitGroup
}

// TODO: Implement pipeline stage
type PipelineStage struct {
	name      string
	input     <-chan DataItem
	output    chan DataItem
	process   func(DataItem) DataItem
	batchSize int
}

// TODO: Implement worker pool with scaling
type WorkerPool struct {
	taskChan chan Task
	workerWg sync.WaitGroup
	workers  int
	results  chan Result
	ctx      context.Context
	cancel   context.CancelFunc
}

type Task struct {
	ID      int
	Data    DataItem
	Process func(DataItem) Result
}

type Result struct {
	TaskID int
	Data   DataItem
	Error  error
}

// TODO: Implement concurrent data processor
type ConcurrentProcessor struct {
	sources    []DataSource
	stages     []*PipelineStage
	workerPool *WorkerPool
	fanOut     *FanOut
	fanIn      *FanIn
	output     chan DataItem
	ctx        context.Context
	cancel     context.CancelFunc
}

// TODO: Implement sample data sources
type FileDataSource struct {
	filename  string
	data      []DataItem
	rateLimit time.Duration
}

type NetworkDataSource struct {
	url       string
	rateLimit time.Duration
}

func (fds *FileDataSource) Stream(ctx context.Context) <-chan DataItem {
	// TODO: Implement file data source with proper cancellation
	ch := make(chan DataItem)
	return ch
}

func (nds *NetworkDataSource) Stream(ctx context.Context) <-chan DataItem {
	// TODO: Implement network data source with timeout and retry
	ch := make(chan DataItem)
	return ch
}

// TODO: Implement processing functions
func ValidateData(item DataItem) DataItem {
	// TODO: Add data validation
	return item
}

func TransformData(item DataItem) DataItem {
	// TODO: Add data transformation
	return item
}

func EnrichData(item DataItem) DataItem {
	// TODO: Add data enrichment
	return item
}

func main() {
	// TODO: Build and run concurrent processing pipeline
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	processor := &ConcurrentProcessor{
		ctx:    ctx,
		cancel: cancel,
	}

	// Start processing
	if err := processor.Start(); err != nil {
		log.Fatal(err)
	}

	// Handle graceful shutdown
	handleSignals(processor)
}

Tasks

Task 1: Implement Fan-Out Pattern

Create a fan-out pattern that distributes input to multiple output channels:

type FanOut struct {
	input    <-chan DataItem
	outputs  []chan DataItem
	workerWg sync.WaitGroup
	ctx      context.Context
}

func NewFanOut(ctx context.Context, input <-chan DataItem, outputCount int) *FanOut {
	outputs := make([]chan DataItem, outputCount)
	for i := range outputs {
		outputs[i] = make(chan DataItem, 100) // Buffered channels
	}

	return &FanOut{
		input:   input,
		outputs: outputs,
		ctx:     ctx,
	}
}

func (fo *FanOut) Start() {
	for i, output := range fo.outputs {
		fo.workerWg.Add(1)
		go fo.distribute(i, output)
	}
}

func (fo *FanOut) distribute(workerID int, output chan DataItem) {
	defer fo.workerWg.Done()
	defer close(output)

	for {
		select {
		case <-fo.ctx.Done():
			return
		case item, ok := <-fo.input:
			if !ok {
				return
			}
			select {
			case output <- item:
			case <-fo.ctx.Done():
				return
			}
		}
	}
}

func (fo *FanOut) GetOutputs() []<-chan DataItem {
	outputs := make([]<-chan DataItem, len(fo.outputs))
	for i, output := range fo.outputs {
		outputs[i] = output
	}
	return outputs
}

func (fo *FanOut) Wait() {
	fo.workerWg.Wait()
}

Task 2: Implement Fan-In Pattern

Create a fan-in pattern that merges multiple input channels:

type FanIn struct {
	inputs   []<-chan DataItem
	output   chan DataItem
	workerWg sync.WaitGroup
	ctx      context.Context
}

func NewFanIn(ctx context.Context, inputs []<-chan DataItem) *FanIn {
	return &FanIn{
		inputs: inputs,
		output: make(chan DataItem, 100),
		ctx:    ctx,
	}
}

func (fi *FanIn) Start() {
	for i, input := range fi.inputs {
		fi.workerWg.Add(1)
		go fi.collect(i, input)
	}

	// Close the output once every collector has finished
	go func() {
		fi.workerWg.Wait()
		close(fi.output)
	}()
}

func (fi *FanIn) collect(workerID int, input <-chan DataItem) {
	defer fi.workerWg.Done()

	for {
		select {
		case <-fi.ctx.Done():
			return
		case item, ok := <-input:
			if !ok {
				return
			}
			select {
			case fi.output <- item:
			case <-fi.ctx.Done():
				return
			}
		}
	}
}

func (fi *FanIn) Output() <-chan DataItem {
	return fi.output
}

Task 3: Implement Pipeline Stages

Build pipeline stages with concurrent processing:

type PipelineStage struct {
	name      string
	input     <-chan DataItem
	output    chan DataItem
	process   func(DataItem) (DataItem, error)
	errorChan chan error
	workerWg  sync.WaitGroup
	workers   int
	ctx       context.Context
}

func NewPipelineStage(ctx context.Context, name string, input <-chan DataItem, workers int, process func(DataItem) (DataItem, error)) *PipelineStage {
	return &PipelineStage{
		name:      name,
		input:     input,
		output:    make(chan DataItem, 100),
		process:   process,
		errorChan: make(chan error, 10),
		workers:   workers,
		ctx:       ctx,
	}
}

func (ps *PipelineStage) Start() {
	for i := 0; i < ps.workers; i++ {
		ps.workerWg.Add(1)
		go ps.worker(i)
	}

	// Close the output and error channels once all workers have finished
	go func() {
		ps.workerWg.Wait()
		close(ps.output)
		close(ps.errorChan)
	}()
}

func (ps *PipelineStage) worker(workerID int) {
	defer ps.workerWg.Done()

	for {
		select {
		case <-ps.ctx.Done():
			return
		case item, ok := <-ps.input:
			if !ok {
				return
			}

			// Process item
			result, err := ps.process(item)
			if err != nil {
				select {
				case ps.errorChan <- fmt.Errorf("stage %s worker %d: %w", ps.name, workerID, err):
				case <-ps.ctx.Done():
					return
				}
				continue
			}

			select {
			case ps.output <- result:
			case <-ps.ctx.Done():
				return
			}
		}
	}
}

func (ps *PipelineStage) Output() <-chan DataItem {
	return ps.output
}

func (ps *PipelineStage) Errors() <-chan error {
	return ps.errorChan
}

func (ps *PipelineStage) Wait() {
	ps.workerWg.Wait()
}

Task 4: Implement Dynamic Worker Pool

Create a worker pool that can scale based on load:

type WorkerPool struct {
	taskChan   chan Task
	resultChan chan Result
	workerWg   sync.WaitGroup
	mu         sync.RWMutex
	workers    []*Worker
	minWorkers int
	maxWorkers int
	ctx        context.Context
	cancel     context.CancelFunc
	metrics    *PoolMetrics
}

type Worker struct {
	id         int
	taskChan   <-chan Task
	resultChan chan<- Result
	quit       chan struct{}
	busy       bool
	lastActive time.Time
	mu         sync.RWMutex
}

type PoolMetrics struct {
	TasksSubmitted int64
	TasksCompleted int64
	TasksFailed    int64
	ActiveWorkers  int64
	TotalWorkers   int64
}

func NewWorkerPool(minWorkers, maxWorkers int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())

	return &WorkerPool{
		taskChan:   make(chan Task, 1000),
		resultChan: make(chan Result, 1000),
		workers:    make([]*Worker, 0),
		minWorkers: minWorkers,
		maxWorkers: maxWorkers,
		ctx:        ctx,
		cancel:     cancel,
		metrics:    &PoolMetrics{},
	}
}

func (wp *WorkerPool) Start() {
	// Start the minimum number of workers
	for i := 0; i < wp.minWorkers; i++ {
		wp.addWorker()
	}

	// Start the autoscaler; results are consumed by the caller via Results()
	go wp.autoscaler()
}

func (wp *WorkerPool) addWorker() {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	if len(wp.workers) >= wp.maxWorkers {
		return
	}

	workerID := len(wp.workers)
	worker := &Worker{
		id:         workerID,
		taskChan:   wp.taskChan,
		resultChan: wp.resultChan,
		quit:       make(chan struct{}),
		lastActive: time.Now(),
	}

	wp.workers = append(wp.workers, worker)
	atomic.AddInt64(&wp.metrics.TotalWorkers, 1)

	wp.workerWg.Add(1)
	go worker.run(wp.ctx, &wp.workerWg)
}

func (w *Worker) run(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case <-w.quit:
			return
		case task := <-w.taskChan:
			w.setBusy(true)

			// Process task
			result := task.Process(task.Data)
			result.TaskID = task.ID

			select {
			case w.resultChan <- result:
			case <-ctx.Done():
				return
			}

			w.setBusy(false)
		}
	}
}

func (w *Worker) setBusy(busy bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.busy = busy
	w.lastActive = time.Now()
}

func (w *Worker) isBusy() bool {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.busy
}

func (wp *WorkerPool) autoscaler() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-wp.ctx.Done():
			return
		case <-ticker.C:
			wp.adjustWorkers()
		}
	}
}

func (wp *WorkerPool) adjustWorkers() {
	// Take a snapshot under the read lock, then release it before calling
	// addWorker/removeWorker, which need the write lock.
	wp.mu.RLock()
	busyCount := 0
	idleCount := 0
	for _, worker := range wp.workers {
		if worker.isBusy() {
			busyCount++
		} else {
			idleCount++
		}
	}
	workerCount := len(wp.workers)
	queueLen := len(wp.taskChan)
	wp.mu.RUnlock()

	// Scale up if all workers are busy and the queue is backing up
	if busyCount == workerCount && workerCount < wp.maxWorkers && queueLen > 10 {
		wp.addWorker()
		log.Printf("Scaling up: now have %d workers", workerCount+1)
	}

	// Scale down if several workers are idle and we are above the minimum
	if idleCount > 2 && workerCount > wp.minWorkers {
		wp.removeWorker()
		log.Printf("Scaling down: now have %d workers", workerCount-1)
	}
}

func (wp *WorkerPool) removeWorker() {
	wp.mu.Lock()
	defer wp.mu.Unlock()

	if len(wp.workers) <= wp.minWorkers {
		return
	}

	// Find an idle worker
	for i, worker := range wp.workers {
		if !worker.isBusy() {
			close(worker.quit)
			wp.workers = append(wp.workers[:i], wp.workers[i+1:]...)
			atomic.AddInt64(&wp.metrics.TotalWorkers, -1)
			break
		}
	}
}

func (wp *WorkerPool) Submit(task Task) {
	atomic.AddInt64(&wp.metrics.TasksSubmitted, 1)

	select {
	case wp.taskChan <- task:
	case <-wp.ctx.Done():
		return
	}
}

func (wp *WorkerPool) Results() <-chan Result {
	return wp.resultChan
}

func (wp *WorkerPool) GetMetrics() PoolMetrics {
	return PoolMetrics{
		TasksSubmitted: atomic.LoadInt64(&wp.metrics.TasksSubmitted),
		TasksCompleted: atomic.LoadInt64(&wp.metrics.TasksCompleted),
		TasksFailed:    atomic.LoadInt64(&wp.metrics.TasksFailed),
		ActiveWorkers:  atomic.LoadInt64(&wp.metrics.ActiveWorkers),
		TotalWorkers:   atomic.LoadInt64(&wp.metrics.TotalWorkers),
	}
}

func (wp *WorkerPool) Shutdown() {
	wp.cancel()
	wp.workerWg.Wait()
	close(wp.resultChan)
}

Task 5: Implement Context Propagation

Ensure proper context propagation through the pipeline:

type ContextKey string

const (
	RequestIDKey ContextKey = "request_id"
	TraceIDKey   ContextKey = "trace_id"
)

type ProcessingContext struct {
	RequestID string
	TraceID   string
	StartTime time.Time
	Metadata  map[string]interface{}
}

func (pc *ProcessingContext) ToContext(ctx context.Context) context.Context {
	ctx = context.WithValue(ctx, RequestIDKey, pc.RequestID)
	ctx = context.WithValue(ctx, TraceIDKey, pc.TraceID)
	return ctx
}

func ExtractProcessingContext(ctx context.Context) *ProcessingContext {
	requestID, _ := ctx.Value(RequestIDKey).(string)
	traceID, _ := ctx.Value(TraceIDKey).(string)

	return &ProcessingContext{
		RequestID: requestID,
		TraceID:   traceID,
	}
}

func ContextAwareProcess(ctx context.Context, item DataItem) (DataItem, error) {
	// Extract context information
	pc := ExtractProcessingContext(ctx)

	// Add context metadata to item
	if pc.RequestID != "" {
		// Add request ID to item for tracing
	}

	// Process with timeout
	processCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Simulate work
	select {
	case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
		return item, nil
	case <-processCtx.Done():
		return DataItem{}, processCtx.Err()
	}
}

Solution Approach

Complete Implementation:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

type DataItem struct {
	ID        int                    `json:"id"`
	Value     float64                `json:"value"`
	Source    string                 `json:"source"`
	Timestamp time.Time              `json:"timestamp"`
	Metadata  map[string]interface{} `json:"metadata"`
}

type DataSource interface {
	Stream(ctx context.Context) <-chan DataItem
}

// FileDataSource implementation
type FileDataSource struct {
	filename  string
	rateLimit time.Duration
}

func (fds *FileDataSource) Stream(ctx context.Context) <-chan DataItem {
	ch := make(chan DataItem, 10)

	go func() {
		defer close(ch)

		ticker := time.NewTicker(fds.rateLimit)
		defer ticker.Stop()

		itemID := 0
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				item := DataItem{
					ID:        itemID,
					Value:     rand.Float64() * 100,
					Source:    "file:" + fds.filename,
					Timestamp: time.Now(),
					Metadata: map[string]interface{}{
						"type": "generated",
					},
				}

				select {
				case ch <- item:
					itemID++
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return ch
}

// NetworkDataSource implementation
type NetworkDataSource struct {
	url       string
	rateLimit time.Duration
}

func (nds *NetworkDataSource) Stream(ctx context.Context) <-chan DataItem {
	ch := make(chan DataItem, 10)

	go func() {
		defer close(ch)

		ticker := time.NewTicker(nds.rateLimit)
		defer ticker.Stop()

		itemID := 1000
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Simulate network latency
				time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)

				item := DataItem{
					ID:        itemID,
					Value:     rand.Float64() * 200,
					Source:    "network:" + nds.url,
					Timestamp: time.Now(),
					Metadata: map[string]interface{}{
						"type": "fetched",
					},
				}

				select {
				case ch <- item:
					itemID++
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return ch
}

// FanOut implementation
type FanOut struct {
	input    <-chan DataItem
	outputs  []chan DataItem
	workerWg sync.WaitGroup
	ctx      context.Context
}

func NewFanOut(ctx context.Context, input <-chan DataItem, outputCount int) *FanOut {
	outputs := make([]chan DataItem, outputCount)
	for i := range outputs {
		outputs[i] = make(chan DataItem, 100)
	}

	return &FanOut{
		input:   input,
		outputs: outputs,
		ctx:     ctx,
	}
}

func (fo *FanOut) Start() {
	for i, output := range fo.outputs {
		fo.workerWg.Add(1)
		go fo.distribute(i, output)
	}
}

func (fo *FanOut) distribute(workerID int, output chan DataItem) {
	defer fo.workerWg.Done()
	defer close(output)

	for {
		select {
		case <-fo.ctx.Done():
			return
		case item, ok := <-fo.input:
			if !ok {
				return
			}
			select {
			case output <- item:
			case <-fo.ctx.Done():
				return
			}
		}
	}
}

func (fo *FanOut) GetOutputs() []<-chan DataItem {
	outputs := make([]<-chan DataItem, len(fo.outputs))
	for i, output := range fo.outputs {
		outputs[i] = output
	}
	return outputs
}

func (fo *FanOut) Wait() {
	fo.workerWg.Wait()
}

// FanIn implementation
type FanIn struct {
	inputs   []<-chan DataItem
	output   chan DataItem
	workerWg sync.WaitGroup
	ctx      context.Context
}

func NewFanIn(ctx context.Context, inputs []<-chan DataItem) *FanIn {
	return &FanIn{
		inputs: inputs,
		output: make(chan DataItem, 100),
		ctx:    ctx,
	}
}

func (fi *FanIn) Start() {
	for i, input := range fi.inputs {
		fi.workerWg.Add(1)
		go fi.collect(i, input)
	}

	// Close the output once every collector has finished
	go func() {
		fi.workerWg.Wait()
		close(fi.output)
	}()
}

func (fi *FanIn) collect(workerID int, input <-chan DataItem) {
	defer fi.workerWg.Done()

	for {
		select {
		case <-fi.ctx.Done():
			return
		case item, ok := <-input:
			if !ok {
				return
			}
			select {
			case fi.output <- item:
			case <-fi.ctx.Done():
				return
			}
		}
	}
}

func (fi *FanIn) Output() <-chan DataItem {
	return fi.output
}

func (fi *FanIn) Wait() {
	fi.workerWg.Wait()
}

// Processing functions
func ValidateData(item DataItem) (DataItem, error) {
	if item.Value < 0 {
		return item, fmt.Errorf("invalid negative value: %f", item.Value)
	}

	// Add validation metadata
	if item.Metadata == nil {
		item.Metadata = make(map[string]interface{})
	}
	item.Metadata["validated"] = true
	item.Metadata["validation_time"] = time.Now()

	return item, nil
}

func TransformData(item DataItem) (DataItem, error) {
	// Apply transformation
	item.Value = item.Value * 2.0

	// Add transformation metadata
	if item.Metadata == nil {
		item.Metadata = make(map[string]interface{})
	}
	item.Metadata["transformed"] = true
	item.Metadata["transform_factor"] = 2.0
	item.Metadata["transform_time"] = time.Now()

	return item, nil
}

func EnrichData(item DataItem) (DataItem, error) {
	// Add enrichment
	if item.Metadata == nil {
		item.Metadata = make(map[string]interface{})
	}
	item.Metadata["enriched"] = true
	item.Metadata["category"] = "processed"
	item.Metadata["priority"] = "normal"
	item.Metadata["enrich_time"] = time.Now()

	return item, nil
}

// PipelineStage implementation
type PipelineStage struct {
	name      string
	input     <-chan DataItem
	output    chan DataItem
	process   func(DataItem) (DataItem, error)
	errorChan chan error
	workerWg  sync.WaitGroup
	workers   int
	ctx       context.Context
	metrics   *StageMetrics
}

type StageMetrics struct {
	Processed int64
	Errors    int64
	Duration  int64
}

func NewPipelineStage(ctx context.Context, name string, input <-chan DataItem, workers int, process func(DataItem) (DataItem, error)) *PipelineStage {
	return &PipelineStage{
		name:      name,
		input:     input,
		output:    make(chan DataItem, 100),
		process:   process,
		errorChan: make(chan error, 10),
		workers:   workers,
		ctx:       ctx,
		metrics:   &StageMetrics{},
	}
}

func (ps *PipelineStage) Start() {
	for i := 0; i < ps.workers; i++ {
		ps.workerWg.Add(1)
		go ps.worker(i)
	}

	// Close the output and error channels once all workers have finished
	go func() {
		ps.workerWg.Wait()
		close(ps.output)
		close(ps.errorChan)
	}()
}

func (ps *PipelineStage) worker(workerID int) {
	defer ps.workerWg.Done()

	for {
		select {
		case <-ps.ctx.Done():
			return
		case item, ok := <-ps.input:
			if !ok {
				return
			}

			start := time.Now()
			result, err := ps.process(item)
			duration := time.Since(start)

			atomic.AddInt64(&ps.metrics.Processed, 1)
			atomic.AddInt64(&ps.metrics.Duration, duration.Nanoseconds())

			if err != nil {
				atomic.AddInt64(&ps.metrics.Errors, 1)
				select {
				case ps.errorChan <- fmt.Errorf("stage %s worker %d: %w", ps.name, workerID, err):
				case <-ps.ctx.Done():
					return
				}
				continue
			}

			select {
			case ps.output <- result:
			case <-ps.ctx.Done():
				return
			}
		}
	}
}

func (ps *PipelineStage) Output() <-chan DataItem {
	return ps.output
}

func (ps *PipelineStage) Errors() <-chan error {
	return ps.errorChan
}

func (ps *PipelineStage) Wait() {
	ps.workerWg.Wait()
}

func (ps *PipelineStage) GetMetrics() StageMetrics {
	return StageMetrics{
		Processed: atomic.LoadInt64(&ps.metrics.Processed),
		Errors:    atomic.LoadInt64(&ps.metrics.Errors),
		Duration:  atomic.LoadInt64(&ps.metrics.Duration),
	}
}

// ConcurrentProcessor implementation
type ConcurrentProcessor struct {
	sources []DataSource
	stages  []*PipelineStage
	fanOut  *FanOut
	fanIn   *FanIn
	output  chan DataItem
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	stats   *ProcessorStats
}

type ProcessorStats struct {
	ItemsProcessed int64
	TotalErrors    int64
	StartTime      time.Time
}

func NewConcurrentProcessor() *ConcurrentProcessor {
	ctx, cancel := context.WithCancel(context.Background())

	return &ConcurrentProcessor{
		ctx:    ctx,
		cancel: cancel,
		output: make(chan DataItem, 100),
		stats:  &ProcessorStats{StartTime: time.Now()},
	}
}

func (cp *ConcurrentProcessor) AddSource(source DataSource) {
	cp.sources = append(cp.sources, source)
}

func (cp *ConcurrentProcessor) BuildPipeline() {
	// Create source channels
	sourceChannels := make([]<-chan DataItem, len(cp.sources))
	for i, source := range cp.sources {
		sourceChannels[i] = source.Stream(cp.ctx)
	}

	// Fan-in all sources into a single stream
	cp.fanIn = NewFanIn(cp.ctx, sourceChannels)
	cp.fanIn.Start()

	// Fan-out the merged stream: one validation branch per CPU
	cp.fanOut = NewFanOut(cp.ctx, cp.fanIn.Output(), runtime.NumCPU())
	cp.fanOut.Start()

	stageInputs := cp.fanOut.GetOutputs()

	// Validation stage: one branch per fan-out output, so every output is consumed
	validationOutputs := make([]<-chan DataItem, len(stageInputs))
	for i, input := range stageInputs {
		stage := NewPipelineStage(cp.ctx, fmt.Sprintf("validation-%d", i), input, 2, ValidateData)
		cp.stages = append(cp.stages, stage)
		stage.Start()
		validationOutputs[i] = stage.Output()
	}

	// Merge the validation branches back into a single stream
	validationFanIn := NewFanIn(cp.ctx, validationOutputs)
	validationFanIn.Start()

	// Transformation stage
	transformStage := NewPipelineStage(cp.ctx, "transform", validationFanIn.Output(), 2, TransformData)
	cp.stages = append(cp.stages, transformStage)
	transformStage.Start()

	// Enrichment stage
	enrichmentStage := NewPipelineStage(cp.ctx, "enrichment", transformStage.Output(), 2, EnrichData)
	cp.stages = append(cp.stages, enrichmentStage)
	enrichmentStage.Start()

	// Drain stage errors so workers never block on a full error channel
	for _, stage := range cp.stages {
		cp.wg.Add(1)
		go cp.drainErrors(stage)
	}

	// Start result collector
	cp.wg.Add(1)
	go cp.collectResults(enrichmentStage.Output())
}

func (cp *ConcurrentProcessor) drainErrors(stage *PipelineStage) {
	defer cp.wg.Done()

	for err := range stage.Errors() {
		atomic.AddInt64(&cp.stats.TotalErrors, 1)
		log.Printf("pipeline error: %v", err)
	}
}

func (cp *ConcurrentProcessor) collectResults(input <-chan DataItem) {
	defer cp.wg.Done()
	defer close(cp.output)

	for {
		select {
		case <-cp.ctx.Done():
			return
		case item, ok := <-input:
			if !ok {
				return
			}
			atomic.AddInt64(&cp.stats.ItemsProcessed, 1)

			select {
			case cp.output <- item:
			case <-cp.ctx.Done():
				return
			}
		}
	}
}

func (cp *ConcurrentProcessor) Start() error {
	if len(cp.sources) == 0 {
		return fmt.Errorf("no data sources configured")
	}

	cp.BuildPipeline()

	// Start monitoring
	cp.wg.Add(1)
	go cp.monitor()

	return nil
}

func (cp *ConcurrentProcessor) monitor() {
	defer cp.wg.Done()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-cp.ctx.Done():
			return
		case <-ticker.C:
			cp.logStats()
		}
	}
}

func (cp *ConcurrentProcessor) logStats() {
	processed := atomic.LoadInt64(&cp.stats.ItemsProcessed)
	errors := atomic.LoadInt64(&cp.stats.TotalErrors)
	duration := time.Since(cp.stats.StartTime)
	rate := float64(processed) / duration.Seconds()

	log.Printf("Processor Stats - Processed: %d, Errors: %d, Rate: %.2f items/sec, Duration: %v",
		processed, errors, rate, duration)

	// Log stage metrics
	for _, stage := range cp.stages {
		metrics := stage.GetMetrics()
		avgDuration := time.Duration(0)
		if metrics.Processed > 0 {
			avgDuration = time.Duration(metrics.Duration / metrics.Processed)
		}
		log.Printf("Stage %s - Processed: %d, Errors: %d, Avg Duration: %v",
			stage.name, metrics.Processed, metrics.Errors, avgDuration)
	}
}

func (cp *ConcurrentProcessor) Output() <-chan DataItem {
	return cp.output
}

func (cp *ConcurrentProcessor) Shutdown() {
	cp.cancel()

	// Wait for all components to finish
	cp.fanIn.Wait()
	cp.fanOut.Wait()
	for _, stage := range cp.stages {
		stage.Wait()
	}
	cp.wg.Wait()

	log.Println("Concurrent processor shutdown complete")
}

func handleSignals(processor *ConcurrentProcessor) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	<-sigChan
	log.Println("Received shutdown signal, shutting down gracefully...")
	processor.Shutdown()
}

func main() {
	log.Printf("Starting concurrent processor with %d CPUs", runtime.NumCPU())

	// Create processor
	processor := NewConcurrentProcessor()

	// Add data sources
	processor.AddSource(&FileDataSource{
		filename:  "data1.txt",
		rateLimit: 10 * time.Millisecond,
	})

	processor.AddSource(&FileDataSource{
		filename:  "data2.txt",
		rateLimit: 15 * time.Millisecond,
	})

	processor.AddSource(&NetworkDataSource{
		url:       "api.example.com",
		rateLimit: 20 * time.Millisecond,
	})

	// Start processing
	if err := processor.Start(); err != nil {
		log.Fatal(err)
	}

	// Process results
	go func() {
		for item := range processor.Output() {
			log.Printf("Processed item: ID=%d, Value=%.2f, Source=%s",
				item.ID, item.Value, item.Source)
		}
	}()

	// Handle graceful shutdown
	handleSignals(processor)
}

Testing Your Solution

1. Basic Functionality Test

# Run the processor
go run main.go

# Test with different data rates
# Modify rateLimit values in data sources to see how the system handles varying load

2. Concurrency Testing

# Test with many sources
# Add more data sources to test fan-in scalability

# Test with different worker counts
# Modify the number of workers in pipeline stages

3. Graceful Shutdown Test

# Send SIGINT signal
kill -INT <pid>

# Verify all goroutines are cleaned up properly
# Check for resource leaks
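
If you prefer to drive shutdown through a context instead of the handleSignals helper, the sketch below uses signal.NotifyContext (Go 1.16+). The Run function is a hypothetical stand-in for starting the pipeline and blocking until cancellation; it is not part of the exercise code.

package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
	"time"
)

// Run stands in for starting the pipeline and blocking until the context
// is canceled (hypothetical placeholder, not part of the exercise code).
func Run(ctx context.Context) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil // graceful exit once the signal arrives
		case <-ticker.C:
			log.Println("working...")
		}
	}
}

func main() {
	// ctx is canceled automatically on SIGINT or SIGTERM
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	if err := Run(ctx); err != nil {
		log.Fatal(err)
	}
	log.Println("shutdown complete")
}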

4. Performance Testing

# Monitor CPU usage
# Check goroutine count with runtime.NumGoroutine()
# Measure throughput
# Measure latency distribution
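
To make those measurements concrete, here is a minimal self-monitoring helper you could drop into the same package main as the processor; the itemsProcessed counter and the logging interval are illustrative assumptions, not part of the exercise code.

package main

import (
	"log"
	"runtime"
	"sync/atomic"
	"time"
)

// itemsProcessed is a hypothetical counter; increment it wherever the
// pipeline emits a finished item.
var itemsProcessed int64

// monitorRuntime logs the goroutine count and recent throughput once per interval.
func monitorRuntime(interval time.Duration) {
	var last int64
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for range ticker.C {
		current := atomic.LoadInt64(&itemsProcessed)
		log.Printf("goroutines=%d throughput=%.1f items/sec",
			runtime.NumGoroutine(),
			float64(current-last)/interval.Seconds())
		last = current
	}
}

Run it with go monitorRuntime(5 * time.Second) from main; a goroutine count that keeps growing after shutdown usually points to a leaked worker or an unclosed channel.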

Extension Challenges

  1. Add backpressure handling - Implement flow control to prevent memory buildup (a minimal sketch follows this list)
  2. Add batch processing - Process items in batches for better throughput
  3. Add priority queues - Implement priority-based processing
  4. Add distributed processing - Scale across multiple machines
  5. Add persistence - Handle system restarts and state recovery
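
For the first challenge, the simplest form of backpressure is already built in: a bounded channel blocks the producer when the consumer falls behind. The sketch below goes one step further with a timed send that sheds load instead of blocking forever. It is a helper you could add next to the data sources in the same package (which already imports context and time); the wait budget is an arbitrary illustrative choice, and it assumes the DataItem type from this exercise.

// trySend attempts to hand an item downstream within a bounded wait.
// It returns false when the buffer stays full (so the caller can drop,
// retry, or record a "shed" metric) or when the pipeline is shutting down.
func trySend(ctx context.Context, out chan<- DataItem, item DataItem, wait time.Duration) bool {
	timer := time.NewTimer(wait)
	defer timer.Stop()

	select {
	case out <- item:
		return true // accepted within the wait budget
	case <-timer.C:
		return false // downstream saturated; shed the item
	case <-ctx.Done():
		return false // shutting down
	}
}

Blocking sends are usually the right default; a timed send like this is worth it when you would rather drop or buffer items elsewhere than stall an upstream source.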

Key Takeaways

  • Fan-out pattern distributes work across multiple workers for parallel processing
  • Fan-in pattern combines results from multiple concurrent operations
  • Pipeline stages enable step-by-step processing with concurrent workers
  • Context cancellation ensures graceful shutdown and resource cleanup
  • Buffered channels help smooth out processing rate variations
  • Worker pools provide efficient resource utilization
  • Monitoring and metrics are essential for understanding system behavior

This exercise demonstrates advanced Go concurrency patterns that are essential for building high-performance, scalable concurrent systems.