Why This Matters - The Foundation of High-Performance Systems
Concurrency is not just a feature in Go - it's the core philosophy that makes Go uniquely suited for building scalable, high-performance systems. Unlike languages that bolt concurrency on as an afterthought, Go was designed from day one with concurrency as a first-class citizen.
Real-world impact: Every modern application needs to handle multiple things simultaneously - web servers serving thousands of requests, data pipelines processing streams, microservices coordinating across services. Go's concurrency model makes these scenarios not just possible, but elegant and efficient.
Business value: Mastering Go's concurrency enables you to:
- Build high-throughput web services that scale with load
- Create responsive applications that never block on single operations
- Process data streams efficiently with pipelines and workers
- Coordinate distributed systems with reliable communication patterns
- Maximize hardware utilization across multiple CPU cores
Performance advantage: Go's lightweight goroutines and efficient channels mean you can handle hundreds of thousands of concurrent operations where other languages would require complex thread pools or async frameworks.
Learning Objectives
By the end of this tutorial, you will be able to:
- Create and manage goroutines for concurrent execution
- Design communication patterns using channels for safe data sharing
- Implement complex coordination with select statements and timeouts
- Apply synchronization patterns like mutexes and WaitGroups
- Build worker pools and pipelines for scalable processing
- Avoid common concurrency pitfalls like race conditions and deadlocks
- Use context for cancellation and timeout management
- Apply production-ready patterns for concurrent systems
Core Concepts - Understanding Go's Concurrency Model
The Philosophy: Communicating Sequential Processes
Go's concurrency is based on CSP - Communicating Sequential Processes, a mathematical model from 1978 by Tony Hoare. Instead of sharing memory through locks, Go encourages sharing memory by communicating through channels.
The mantra: "Do not communicate by sharing memory; instead, share memory by communicating."
Why this matters: Traditional shared-memory concurrency creates:
- Race conditions - unpredictable behavior when multiple goroutines access shared data
- Complex lock hierarchies - deadlocks when goroutines wait for each other
- Performance bottlenecks - contention when many goroutines compete for the same locks
- Hard to reason about - execution order becomes unpredictable
Go's CSP approach provides:
- Safe communication - channels are thread-safe by design
- Clear ownership - data flows from producer to consumer
- Composable patterns - pipelines, fan-in/fan-out, worker pools
- Deterministic behavior - no race conditions when using channels correctly
The Coffee Shop Analogy
Consider a busy coffee shop during morning rush:
Sequential approach (one barista, one task at a time):
- Customer 1: Order → Make coffee → Serve (4 minutes)
- Customer 2: Order → Make coffee → Serve (4 minutes)
- Customer 3: Order → Make coffee → Serve (4 minutes)
- Total time: 12 minutes for 3 customers
Concurrent approach (one barista, overlapping tasks):
- Start Customer 1's order → Start coffee brewing
- Take Customer 2's order while Customer 1's coffee brews
- Finish Customer 1's coffee → Serve Customer 1
- Start Customer 2's coffee → Take Customer 3's order
- Total time: ~6 minutes for 3 customers
Parallel approach (three baristas working together):
- Barista 1: Customer 1 (4 minutes)
- Barista 2: Customer 2 (4 minutes)
- Barista 3: Customer 3 (4 minutes)
- Total time: 4 minutes for 3 customers
This demonstrates:
- Concurrency: Managing multiple tasks that may run at overlapping times
- Parallelism: Actually running multiple tasks simultaneously
- Go's model: Goroutines are concurrent by design, parallel when resources allow
Goroutines vs Traditional Threads
Go's goroutines are fundamentally different from traditional threads:
| Feature | Traditional Threads | Goroutines |
|---|---|---|
| Memory usage | ~1-2MB stack per thread | ~2KB initial stack (grows/shrinks dynamically) |
| Creation cost | Expensive system call | ~100x faster (runtime managed) |
| Scheduling | OS scheduler (preemptive) | Go runtime (cooperative + preemptive) |
| Context switching | ~1-2µs (kernel mode) | ~200ns (user space) |
| Count | Thousands at most | Millions easily |
| Communication | Shared memory + locks | Channels (type-safe) |
Why this matters: A single Go program can easily handle hundreds of thousands of concurrent operations while using minimal memory, making it perfect for microservices and high-throughput systems.
The Go scheduler: Go uses an M:N scheduler, where M goroutines are multiplexed onto N OS threads. This means:
- Goroutines blocked on I/O don't block OS threads
- CPU-intensive goroutines are automatically distributed across cores
- The runtime handles all scheduling complexity for you
Understanding the M:N Scheduler in Detail:
Go's scheduler is one of its most powerful features, operating at the user-space level for maximum efficiency:
The GMP Model:
- G (Goroutine): The actual goroutine structure with stack, instruction pointer, and scheduling information
- M (Machine): An OS thread that executes goroutines
- P (Processor): A logical processor representing resources needed to execute Go code; limited to GOMAXPROCS
How it works:
- Each P has a local run queue of goroutines ready to execute
- M's (OS threads) must acquire a P to run goroutines
- When a goroutine blocks on a system call, the M releases its P so other goroutines can continue
- Work-stealing between P's ensures load balancing
Preemption and fairness:
- Since Go 1.14, goroutines can be preempted at function calls and certain loop conditions
- This prevents a single CPU-bound goroutine from monopolizing a P
- Network I/O uses non-blocking system calls, allowing other goroutines to run during waits
Performance implications:
- Creating millions of goroutines is practical due to small memory footprint
- Context switching between goroutines is 10x-100x faster than thread switching
- Go runtime automatically scales parallelism based on available CPU cores
- GOMAXPROCS controls the number of P's (defaults to number of CPUs)
Example demonstrating scheduler efficiency:
1// run
2package main
3
4import (
5 "fmt"
6 "runtime"
7 "sync"
8 "time"
9)
10
11func main() {
12 // Display scheduler configuration
13 fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
14 fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
15 fmt.Printf("NumGoroutine (start): %d\n\n", runtime.NumGoroutine())
16
17 // Create many goroutines to demonstrate lightweight nature
18 const numGoroutines = 10000
19 var wg sync.WaitGroup
20 start := time.Now()
21
22 for i := 1; i <= numGoroutines; i++ {
23 wg.Add(1)
24 go func(id int) {
25 defer wg.Done()
26 // Simulate minimal work
27 time.Sleep(time.Millisecond * 10)
28 }(i)
29 }
30
31 fmt.Printf("Created %d goroutines in %v\n", numGoroutines, time.Since(start))
32 fmt.Printf("NumGoroutine (peak): %d\n", runtime.NumGoroutine())
33
34 wg.Wait()
35 elapsed := time.Since(start)
36
37 fmt.Printf("\nCompleted %d goroutines in %v\n", numGoroutines, elapsed)
38 fmt.Printf("Average per goroutine: %v\n", elapsed/time.Duration(numGoroutines))
39 fmt.Printf("NumGoroutine (end): %d\n", runtime.NumGoroutine())
40}
Key insights from this example:
- Creating 10,000 goroutines takes mere microseconds
- Peak memory usage remains low despite thousands of concurrent goroutines
- The scheduler efficiently multiplexes goroutines across available CPU cores
- Cleanup is automatic when goroutines complete
Channels: Type-Safe Communication
Channels are Go's way of enabling safe communication between goroutines:
Key properties:
- Type-safe: Channels have a specific type (e.g.,
chan int,chan string) - Synchronization: Send/receive operations synchronize goroutines
- Buffered or unbuffered: Control blocking behavior
- Directional: Can be send-only (
chan<-) or receive-only (<-chan) - First-class values: Can be passed to functions, stored in structs, etc.
Channel operations:
1ch := make(chan int) // Create unbuffered channel
2ch := make(chan int, 10) // Create buffered channel (capacity 10)
3ch <- 42 // Send value to channel (blocks if full)
4value := <-ch // Receive value from channel (blocks if empty)
5value, ok := <-ch // Receive with closed check
6close(ch) // Close channel (no more sends allowed)
Unbuffered vs Buffered Channels - Deep Dive:
Understanding the difference between unbuffered and buffered channels is critical for designing correct concurrent systems:
Unbuffered channels (make(chan T)):
- Synchronous communication: Send blocks until receiver is ready
- Guaranteed delivery: Sender knows the value was received
- Use cases: Handshakes, acknowledgments, strict synchronization
Buffered channels (make(chan T, capacity)):
- Asynchronous communication: Send only blocks when buffer is full
- Decoupling: Sender can continue without waiting for receiver
- Use cases: Work queues, rate limiting, smoothing bursts
Practical comparison:
1// run
2package main
3
4import (
5 "fmt"
6 "time"
7)
8
9func demonstrateUnbuffered() {
10 fmt.Println("=== Unbuffered Channel Demo ===")
11 ch := make(chan string)
12
13 // Sender goroutine
14 go func() {
15 fmt.Println("Sender: Preparing to send...")
16 ch <- "message"
17 fmt.Println("Sender: Sent! (receiver must have received)")
18 }()
19
20 // Main goroutine receives
21 time.Sleep(time.Second) // Delay to show blocking behavior
22 fmt.Println("Receiver: Ready to receive")
23 msg := <-ch
24 fmt.Printf("Receiver: Got '%s'\n\n", msg)
25}
26
27func demonstrateBuffered() {
28 fmt.Println("=== Buffered Channel Demo ===")
29 ch := make(chan string, 2) // Buffer size 2
30
31 // Sender can send multiple values without blocking
32 fmt.Println("Sender: Sending multiple values...")
33 ch <- "first"
34 fmt.Println("Sender: Sent 'first' (didn't block)")
35 ch <- "second"
36 fmt.Println("Sender: Sent 'second' (didn't block)")
37
38 // Buffer is now full - next send would block
39 fmt.Println("Sender: Buffer is full, would block on next send")
40
41 // Receive values later
42 time.Sleep(time.Second)
43 fmt.Println("\nReceiver: Reading values...")
44 fmt.Printf("Receiver: Got '%s'\n", <-ch)
45 fmt.Printf("Receiver: Got '%s'\n\n", <-ch)
46}
47
48func demonstrateChannelDirections() {
49 fmt.Println("=== Channel Directions Demo ===")
50
51 // Send-only channel parameter
52 sendOnly := func(ch chan<- int, value int) {
53 ch <- value
54 // Cannot receive: val := <-ch // Compile error
55 }
56
57 // Receive-only channel parameter
58 receiveOnly := func(ch <-chan int) int {
59 return <-ch
60 // Cannot send: ch <- 42 // Compile error
61 }
62
63 ch := make(chan int, 1)
64 sendOnly(ch, 42)
65 result := receiveOnly(ch)
66 fmt.Printf("Sent and received: %d\n", result)
67}
68
69func main() {
70 demonstrateUnbuffered()
71 demonstrateBuffered()
72 demonstrateChannelDirections()
73}
Channel closing semantics:
Closing channels signals "no more values" and enables important patterns:
- Closed channels never block on receive: Receive returns zero value and
falseforok - Range over channels: Automatically stops when channel is closed
- Multiple receivers: All can detect closure
- Cannot send to closed channel: Causes panic
- Cannot close nil channel: Causes panic
- Closing already-closed channel: Causes panic
Pattern: Broadcast shutdown signal:
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "time"
8)
9
10func worker(id int, shutdown <-chan struct{}, wg *sync.WaitGroup) {
11 defer wg.Done()
12
13 for {
14 select {
15 case <-shutdown:
16 fmt.Printf("Worker %d: Received shutdown signal\n", id)
17 return
18 default:
19 // Do work
20 fmt.Printf("Worker %d: Working...\n", id)
21 time.Sleep(time.Millisecond * 300)
22 }
23 }
24}
25
26func main() {
27 shutdown := make(chan struct{})
28 var wg sync.WaitGroup
29
30 // Start 3 workers
31 for i := 1; i <= 3; i++ {
32 wg.Add(1)
33 go worker(i, shutdown, &wg)
34 }
35
36 // Let them work
37 time.Sleep(time.Second)
38
39 // Broadcast shutdown by closing channel
40 fmt.Println("\nMain: Broadcasting shutdown signal...")
41 close(shutdown)
42
43 wg.Wait()
44 fmt.Println("Main: All workers stopped")
45}
Why use struct{} for signals?
- Zero memory footprint (empty struct occupies 0 bytes)
- Clear intent: channel carries signal, not data
- Common Go idiom for coordination
Practical Examples - From Basics to Mastery
Example 1: Basic Goroutine Creation and Synchronization
Let's start with simple concurrent execution:
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "time"
8)
9
10// Simple task that simulates work
11func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
12 // Defer to signal when done
13 defer wg.Done()
14
15 fmt.Printf("Worker %d starting\n", id)
16
17 // Process jobs until channel is closed
18 for job := range jobs {
19 fmt.Printf("Worker %d processing job %d\n", id, job)
20
21 // Simulate work
22 time.Sleep(time.Millisecond * 100)
23
24 // Send result
25 result := job * job // Square the number
26 results <- result
27 }
28
29 fmt.Printf("Worker %d finished\n", id)
30}
31
32func main() {
33 // Create channels for jobs and results
34 jobs := make(chan int, 10)
35 results := make(chan int, 10)
36
37 var wg sync.WaitGroup
38
39 // Start 3 worker goroutines
40 numWorkers := 3
41 for i := 1; i <= numWorkers; i++ {
42 wg.Add(1) // Add worker to waitgroup
43 go worker(i, jobs, results, &wg)
44 }
45
46 fmt.Printf("Started %d workers\n", numWorkers)
47
48 // Send 5 jobs
49 numJobs := 5
50 for j := 1; j <= numJobs; j++ {
51 jobs <- j
52 fmt.Printf("Sent job %d to workers\n", j)
53 }
54
55 close(jobs) // Signal no more jobs
56
57 // Wait for all workers to finish in a separate goroutine
58 go func() {
59 wg.Wait()
60 close(results)
61 }()
62
63 // Collect results
64 fmt.Println("Collecting results:")
65 for result := range results {
66 fmt.Printf("Received result: %d\n", result)
67 }
68
69 fmt.Println("All workers completed")
70}
What this demonstrates:
- Goroutine creation with
gokeyword for concurrent execution - Channel communication for safe data passing between goroutines
- WaitGroup synchronization to wait for all goroutines to finish
- Channel closing to signal completion of work
- Buffered channels to prevent blocking during initial job distribution
- Range over channels to process values until channel is closed
Key concepts illustrated:
- Goroutines are lightweight and easy to create
- Channels provide thread-safe communication
- WaitGroups coordinate multiple goroutines
- Buffering prevents deadlocks in producer-consumer patterns
- Closing channels signals completion
Example 2: Select Statement for Multiple Channels
The select statement is Go's concurrency multiplexer:
1// run
2package main
3
4import (
5 "fmt"
6 "time"
7)
8
9func main() {
10 // Create multiple channels
11 ch1 := make(chan string)
12 ch2 := make(chan string)
13 quit := make(chan bool)
14
15 // Goroutine 1: sends to ch1
16 go func() {
17 for i := 1; i <= 3; i++ {
18 time.Sleep(1000 * time.Millisecond)
19 ch1 <- fmt.Sprintf("Channel 1 message %d", i)
20 }
21 }()
22
23 // Goroutine 2: sends to ch2
24 go func() {
25 for i := 1; i <= 3; i++ {
26 time.Sleep(1500 * time.Millisecond)
27 ch2 <- fmt.Sprintf("Channel 2 message %d", i)
28 }
29 }()
30
31 // Goroutine 3: signals quit after 5 seconds
32 go func() {
33 time.Sleep(5 * time.Second)
34 quit <- true
35 }()
36
37 // Main goroutine: select from multiple channels
38 fmt.Println("Starting select loop...")
39 messageCount := 0
40
41 for {
42 select {
43 case msg1 := <-ch1:
44 fmt.Printf("Received from ch1: %s\n", msg1)
45 messageCount++
46
47 case msg2 := <-ch2:
48 fmt.Printf("Received from ch2: %s\n", msg2)
49 messageCount++
50
51 case <-quit:
52 fmt.Printf("Quit signal received after %d messages\n", messageCount)
53 return
54
55 case <-time.After(2 * time.Second):
56 fmt.Println("Timeout: no message received in 2 seconds")
57 }
58 }
59}
What this demonstrates:
- Select statement for multiplexing multiple channels
- Non-blocking communication with multiple concurrent sources
- Timeout handling using
time.After() - Graceful shutdown with quit channel
- Case selection - select picks the first ready channel
Key patterns:
- Select blocks until one case can proceed
- If multiple cases are ready, one is chosen at random
- Timeout cases prevent indefinite blocking
- Quit channels enable graceful shutdown
Example 3: Pipeline Pattern with Multiple Stages
Let's build a data processing pipeline:
1// run
2package main
3
4import (
5 "fmt"
6 "math/rand"
7 "sync"
8 "time"
9)
10
11// Stage 1: Generate numbers
12func generator(nums []int, out chan<- int, wg *sync.WaitGroup) {
13 defer wg.Done()
14 defer close(out)
15
16 for _, num := range nums {
17 out <- num
18 fmt.Printf("Generated: %d\n", num)
19 time.Sleep(time.Millisecond * 100)
20 }
21 fmt.Println("Generator finished")
22}
23
24// Stage 2: Square numbers
25func squarer(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
26 defer wg.Done()
27 defer close(out)
28
29 for num := range in {
30 squared := num * num
31 out <- squared
32 fmt.Printf("Squared %d -> %d\n", num, squared)
33 time.Sleep(time.Millisecond * 50)
34 }
35 fmt.Println("Squarer finished")
36}
37
38// Stage 3: Filter results
39func filter(in <-chan int, out chan<- int, threshold int, wg *sync.WaitGroup) {
40 defer wg.Done()
41 defer close(out)
42
43 for num := range in {
44 if num > threshold {
45 out <- num
46 fmt.Printf("Filtered: %d (> %d)\n", num, threshold)
47 } else {
48 fmt.Printf("Rejected: %d (<= %d)\n", num, threshold)
49 }
50 }
51 fmt.Println("Filter finished")
52}
53
54func main() {
55 // Input data
56 numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
57
58 // Create pipeline stages
59 stage1 := make(chan int) // generator -> squarer
60 stage2 := make(chan int) // squarer -> filter
61 stage3 := make(chan int) // filter -> main
62
63 var wg sync.WaitGroup
64
65 fmt.Println("Starting pipeline with threshold of 50")
66
67 // Start pipeline stages
68 wg.Add(3)
69
70 go generator(numbers, stage1, &wg)
71 go squarer(stage1, stage2, &wg)
72 go filter(stage2, stage3, 50, &wg)
73
74 // Collect final results
75 go func() {
76 wg.Wait()
77 fmt.Println("Pipeline completed")
78 }()
79
80 // Process results
81 fmt.Println("\nFinal results:")
82 count := 0
83 for result := range stage3 {
84 count++
85 fmt.Printf("Result %d: %d\n", count, result)
86 }
87
88 if count == 0 {
89 fmt.Println("No results above threshold")
90 } else {
91 fmt.Printf("\nTotal results: %d\n", count)
92 }
93}
What this demonstrates:
- Pipeline pattern with data flowing through multiple stages
- Channel coordination between different processing stages
- Graceful shutdown with WaitGroups and channel closing
- Filtering logic integrated into pipeline flow
- Defer for cleanup ensures channels are closed properly
Advanced concepts illustrated:
- Each stage runs concurrently
- Backpressure naturally managed by unbuffered channels
- Pipeline termination signaling through channel closing
- Stages can run at different speeds
Example 4: Advanced Worker Pool with Context
Let's build a production-ready worker pool:
1// run
2package main
3
4import (
5 "context"
6 "fmt"
7 "math/rand"
8 "sync"
9 "time"
10)
11
12// Job represents work to be done
13type Job struct {
14 ID int
15 Data string
16 Duration time.Duration
17}
18
19// Result represents job completion
20type Result struct {
21 JobID int
22 Success bool
23 Duration time.Duration
24 Error error
25}
26
27// Worker processes jobs from a channel
28type Worker struct {
29 id int
30 jobChan <-chan Job
31 resultChan chan<- Result
32 ctx context.Context
33}
34
35func NewWorker(id int, jobChan <-chan Job, resultChan chan<- Result, ctx context.Context) *Worker {
36 return &Worker{
37 id: id,
38 jobChan: jobChan,
39 resultChan: resultChan,
40 ctx: ctx,
41 }
42}
43
44func (w *Worker) Start() {
45 fmt.Printf("Worker %d starting\n", w.id)
46
47 for {
48 select {
49 case job, ok := <-w.jobChan:
50 if !ok {
51 fmt.Printf("Worker %d: job channel closed, shutting down\n", w.id)
52 return
53 }
54
55 // Process job with context awareness
56 result := w.processJob(job)
57 result.JobID = job.ID
58
59 select {
60 case w.resultChan <- result:
61 // Result sent successfully
62 case <-w.ctx.Done():
63 fmt.Printf("Worker %d: context cancelled while sending result\n", w.id)
64 return
65 }
66
67 case <-w.ctx.Done():
68 fmt.Printf("Worker %d: context cancelled, shutting down\n", w.id)
69 return
70 }
71 }
72}
73
74func (w *Worker) processJob(job Job) Result {
75 startTime := time.Now()
76
77 // Simulate job processing with potential cancellation
78 processingDone := make(chan bool)
79
80 go func() {
81 // Simulate work
82 time.Sleep(job.Duration)
83
84 // Simulate occasional failure (10% chance)
85 if rand.Intn(10) == 0 {
86 processingDone <- false
87 } else {
88 processingDone <- true
89 }
90 }()
91
92 select {
93 case success := <-processingDone:
94 duration := time.Since(startTime)
95 if success {
96 return Result{
97 Success: true,
98 Duration: duration,
99 }
100 }
101 return Result{
102 Success: false,
103 Duration: duration,
104 Error: fmt.Errorf("job %d failed during processing", job.ID),
105 }
106
107 case <-w.ctx.Done():
108 return Result{
109 Success: false,
110 Duration: time.Since(startTime),
111 Error: fmt.Errorf("job cancelled by context"),
112 }
113 }
114}
115
116// WorkerPool manages multiple workers
117type WorkerPool struct {
118 workers []*Worker
119 jobChan chan Job
120 resultChan chan Result
121 ctx context.Context
122 cancel context.CancelFunc
123 wg sync.WaitGroup
124}
125
126func NewWorkerPool(numWorkers, jobQueueSize int) *WorkerPool {
127 ctx, cancel := context.WithCancel(context.Background())
128
129 return &WorkerPool{
130 jobChan: make(chan Job, jobQueueSize),
131 resultChan: make(chan Result, jobQueueSize),
132 ctx: ctx,
133 cancel: cancel,
134 }
135}
136
137func (wp *WorkerPool) Start(numWorkers int) {
138 fmt.Printf("Starting worker pool with %d workers\n", numWorkers)
139
140 // Create and start workers
141 wp.workers = make([]*Worker, 0, numWorkers)
142 for i := 1; i <= numWorkers; i++ {
143 worker := NewWorker(i, wp.jobChan, wp.resultChan, wp.ctx)
144 wp.workers = append(wp.workers, worker)
145
146 wp.wg.Add(1)
147 go func() {
148 defer wp.wg.Done()
149 worker.Start()
150 }()
151 }
152}
153
154func (wp *WorkerPool) Submit(job Job) bool {
155 select {
156 case wp.jobChan <- job:
157 return true
158 case <-wp.ctx.Done():
159 fmt.Printf("Pool shutting down, cannot submit job %d\n", job.ID)
160 return false
161 }
162}
163
164func (wp *WorkerPool) Shutdown(timeout time.Duration) {
165 fmt.Println("Initiating worker pool shutdown...")
166
167 // Close job channel to signal no more jobs
168 close(wp.jobChan)
169
170 // Wait for workers to finish with timeout
171 done := make(chan struct{})
172 go func() {
173 wp.wg.Wait()
174 close(done)
175 }()
176
177 select {
178 case <-done:
179 fmt.Println("All workers completed gracefully")
180 case <-time.After(timeout):
181 fmt.Println("Shutdown timed out, forcing termination")
182 wp.cancel()
183 }
184
185 close(wp.resultChan)
186}
187
188func main() {
189 rand.Seed(time.Now().UnixNano())
190
191 // Create worker pool
192 pool := NewWorkerPool(3, 10) // 3 workers, queue size 10
193 pool.Start(3)
194
195 // Submit jobs
196 jobs := make([]Job, 10)
197 for i := 0; i < 10; i++ {
198 jobs[i] = Job{
199 ID: i + 1,
200 Data: fmt.Sprintf("job-data-%d", i+1),
201 Duration: time.Duration(rand.Intn(200)+100) * time.Millisecond,
202 }
203 }
204
205 // Submit jobs concurrently
206 fmt.Println("Submitting jobs...")
207 for _, job := range jobs {
208 if pool.Submit(job) {
209 fmt.Printf("Submitted job %d\n", job.ID)
210 }
211 time.Sleep(time.Millisecond * 50)
212 }
213
214 // Process results
215 fmt.Println("\nProcessing results:")
216 successCount := 0
217 errorCount := 0
218
219 go func() {
220 time.Sleep(3 * time.Second)
221 pool.Shutdown(2 * time.Second)
222 }()
223
224 for result := range pool.resultChan {
225 if result.Success {
226 fmt.Printf("✓ Job %d completed in %v\n", result.JobID, result.Duration)
227 successCount++
228 } else {
229 fmt.Printf("✗ Job %d failed: %v\n", result.JobID, result.Error)
230 errorCount++
231 }
232 }
233
234 // Print statistics
235 fmt.Printf("\n=== Job Statistics ===\n")
236 fmt.Printf("Successful: %d\n", successCount)
237 fmt.Printf("Failed: %d\n", errorCount)
238 fmt.Printf("Total: %d\n", successCount+errorCount)
239}
What this demonstrates:
- Context-based cancellation for graceful shutdown
- Worker pool pattern for managing concurrent resources
- Error handling in concurrent environments
- Resource cleanup and timeout management
- Non-blocking job submission with select statements
- Graceful shutdown with proper cleanup
Production-ready patterns:
- Context propagation through all levels
- Graceful shutdown with timeouts
- Comprehensive error handling
- Resource lifecycle management
- Statistics and monitoring integration
Example 5: Fan-Out Fan-In Pattern
This pattern distributes work across multiple goroutines (fan-out) and collects results (fan-in):
1// run
2package main
3
4import (
5 "fmt"
6 "math/rand"
7 "sync"
8 "time"
9)
10
11// Task represents work to be done
12type Task struct {
13 ID int
14 Value int
15}
16
17// Process tasks with variable complexity
18func worker(id int, tasks <-chan Task, results chan<- int, wg *sync.WaitGroup) {
19 defer wg.Done()
20
21 for task := range tasks {
22 // Simulate variable processing time
23 processingTime := time.Duration(rand.Intn(100)+50) * time.Millisecond
24 time.Sleep(processingTime)
25
26 // Process task (square the value)
27 result := task.Value * task.Value
28 fmt.Printf("Worker %d processed task %d: %d -> %d\n", id, task.ID, task.Value, result)
29
30 results <- result
31 }
32
33 fmt.Printf("Worker %d finished\n", id)
34}
35
36// Fan-out: distribute tasks to multiple workers
37func fanOut(tasks []Task, numWorkers int) <-chan int {
38 taskChan := make(chan Task, len(tasks))
39 resultChan := make(chan int, len(tasks))
40
41 var wg sync.WaitGroup
42
43 // Start workers (fan-out)
44 fmt.Printf("Starting %d workers (fan-out)...\n", numWorkers)
45 for i := 1; i <= numWorkers; i++ {
46 wg.Add(1)
47 go worker(i, taskChan, resultChan, &wg)
48 }
49
50 // Send tasks to workers
51 go func() {
52 for _, task := range tasks {
53 taskChan <- task
54 }
55 close(taskChan)
56 }()
57
58 // Close results channel when all workers finish
59 go func() {
60 wg.Wait()
61 close(resultChan)
62 fmt.Println("All workers completed")
63 }()
64
65 return resultChan
66}
67
68// Fan-in: collect results from multiple channels into one
69func fanIn(channels ...<-chan int) <-chan int {
70 out := make(chan int)
71 var wg sync.WaitGroup
72
73 // Start a goroutine for each input channel
74 for i, ch := range channels {
75 wg.Add(1)
76 go func(channelID int, c <-chan int) {
77 defer wg.Done()
78 for val := range c {
79 fmt.Printf("Fan-in from channel %d: %d\n", channelID, val)
80 out <- val
81 }
82 }(i+1, ch)
83 }
84
85 // Close output channel when all inputs are exhausted
86 go func() {
87 wg.Wait()
88 close(out)
89 fmt.Println("Fan-in completed")
90 }()
91
92 return out
93}
94
95func main() {
96 rand.Seed(time.Now().UnixNano())
97
98 // Create tasks
99 tasks := make([]Task, 20)
100 for i := 0; i < 20; i++ {
101 tasks[i] = Task{
102 ID: i + 1,
103 Value: rand.Intn(10) + 1,
104 }
105 }
106
107 fmt.Println("=== Fan-Out Pattern Demo ===")
108 fmt.Printf("Processing %d tasks with 4 workers\n\n", len(tasks))
109
110 // Fan-out: distribute to 4 workers
111 results := fanOut(tasks, 4)
112
113 // Collect and sum results
114 fmt.Println("\nCollecting results...")
115 sum := 0
116 count := 0
117 for result := range results {
118 sum += result
119 count++
120 }
121
122 fmt.Printf("\n=== Results ===\n")
123 fmt.Printf("Processed tasks: %d\n", count)
124 fmt.Printf("Sum of squared values: %d\n", sum)
125
126 // Demonstrate fan-in pattern
127 fmt.Println("\n=== Fan-In Pattern Demo ===")
128
129 // Create multiple result channels
130 ch1 := make(chan int, 3)
131 ch2 := make(chan int, 3)
132 ch3 := make(chan int, 3)
133
134 // Send values to channels
135 go func() {
136 for i := 1; i <= 3; i++ {
137 ch1 <- i * 10
138 time.Sleep(time.Millisecond * 100)
139 }
140 close(ch1)
141 }()
142
143 go func() {
144 for i := 1; i <= 3; i++ {
145 ch2 <- i * 20
146 time.Sleep(time.Millisecond * 150)
147 }
148 close(ch2)
149 }()
150
151 go func() {
152 for i := 1; i <= 3; i++ {
153 ch3 <- i * 30
154 time.Sleep(time.Millisecond * 200)
155 }
156 close(ch3)
157 }()
158
159 // Fan-in: merge channels
160 merged := fanIn(ch1, ch2, ch3)
161
162 // Collect merged results
163 fmt.Println("Collecting merged results...")
164 mergedSum := 0
165 mergedCount := 0
166 for val := range merged {
167 mergedSum += val
168 mergedCount++
169 }
170
171 fmt.Printf("\n=== Fan-In Results ===\n")
172 fmt.Printf("Total values: %d\n", mergedCount)
173 fmt.Printf("Sum: %d\n", mergedSum)
174}
What this demonstrates:
- Fan-out pattern for distributing work to multiple goroutines
- Fan-in pattern for collecting results from multiple sources
- Dynamic workload distribution across workers
- Result aggregation from concurrent operations
- Pattern composition for complex workflows
Advanced Synchronization Primitives
Beyond channels, Go provides powerful synchronization tools in the sync package for scenarios where channels aren't the best fit:
sync.Mutex and sync.RWMutex
When to use mutexes over channels:
- Protecting shared state accessed by multiple goroutines
- When synchronization is simpler than passing data through channels
- Caching scenarios where reads vastly outnumber writes
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "time"
8)
9
10// Counter with mutex protection
11type Counter struct {
12 mu sync.Mutex
13 value int
14}
15
16func (c *Counter) Increment() {
17 c.mu.Lock()
18 c.value++
19 c.mu.Unlock()
20}
21
22func (c *Counter) Value() int {
23 c.mu.Lock()
24 defer c.mu.Unlock()
25 return c.value
26}
27
28// Cache with RWMutex for read-heavy workloads
29type Cache struct {
30 mu sync.RWMutex
31 data map[string]string
32}
33
34func NewCache() *Cache {
35 return &Cache{
36 data: make(map[string]string),
37 }
38}
39
40func (c *Cache) Get(key string) (string, bool) {
41 c.mu.RLock() // Multiple readers can acquire this simultaneously
42 defer c.mu.RUnlock()
43 val, ok := c.data[key]
44 return val, ok
45}
46
47func (c *Cache) Set(key, value string) {
48 c.mu.Lock() // Exclusive lock for writing
49 defer c.mu.Unlock()
50 c.data[key] = value
51}
52
53func main() {
54 fmt.Println("=== Mutex Demo ===")
55 counter := &Counter{}
56 var wg sync.WaitGroup
57
58 // 10 goroutines incrementing concurrently
59 for i := 0; i < 10; i++ {
60 wg.Add(1)
61 go func(id int) {
62 defer wg.Done()
63 for j := 0; j < 100; j++ {
64 counter.Increment()
65 }
66 fmt.Printf("Goroutine %d completed\n", id)
67 }(i)
68 }
69
70 wg.Wait()
71 fmt.Printf("Final counter value: %d (expected: 1000)\n\n", counter.Value())
72
73 // RWMutex demo
74 fmt.Println("=== RWMutex Cache Demo ===")
75 cache := NewCache()
76
77 // Writer goroutine
78 go func() {
79 for i := 0; i < 5; i++ {
80 cache.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
81 fmt.Printf("Writer: Set key%d\n", i)
82 time.Sleep(time.Millisecond * 100)
83 }
84 }()
85
86 // Multiple reader goroutines
87 for i := 0; i < 3; i++ {
88 go func(id int) {
89 for j := 0; j < 5; j++ {
90 if val, ok := cache.Get(fmt.Sprintf("key%d", j)); ok {
91 fmt.Printf("Reader %d: Got key%d=%s\n", id, j, val)
92 }
93 time.Sleep(time.Millisecond * 50)
94 }
95 }(i)
96 }
97
98 time.Sleep(time.Second)
99}
sync.Once - Execute Exactly Once
Perfect for lazy initialization and singleton patterns:
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7)
8
9type Database struct {
10 connection string
11}
12
13var (
14 dbInstance *Database
15 once sync.Once
16)
17
18func GetDatabase() *Database {
19 once.Do(func() {
20 fmt.Println("Initializing database (expensive operation)...")
21 dbInstance = &Database{connection: "db://localhost:5432"}
22 })
23 return dbInstance
24}
25
26func main() {
27 var wg sync.WaitGroup
28
29 // Multiple goroutines trying to get database
30 for i := 1; i <= 5; i++ {
31 wg.Add(1)
32 go func(id int) {
33 defer wg.Done()
34 db := GetDatabase()
35 fmt.Printf("Goroutine %d: Got database %v\n", id, db)
36 }(i)
37 }
38
39 wg.Wait()
40 fmt.Println("\nDatabase initialization happened exactly once")
41}
sync.Cond - Condition Variables
For complex coordination where goroutines wait for specific conditions:
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "time"
8)
9
10type Queue struct {
11 items []int
12 mu sync.Mutex
13 cond *sync.Cond
14}
15
16func NewQueue() *Queue {
17 q := &Queue{}
18 q.cond = sync.NewCond(&q.mu)
19 return q
20}
21
22func (q *Queue) Enqueue(item int) {
23 q.mu.Lock()
24 q.items = append(q.items, item)
25 fmt.Printf("Enqueued: %d (queue size: %d)\n", item, len(q.items))
26 q.cond.Signal() // Wake up one waiting goroutine
27 q.mu.Unlock()
28}
29
30func (q *Queue) Dequeue() int {
31 q.mu.Lock()
32 defer q.mu.Unlock()
33
34 // Wait until items are available
35 for len(q.items) == 0 {
36 fmt.Println("Queue empty, waiting...")
37 q.cond.Wait() // Releases lock and waits
38 }
39
40 item := q.items[0]
41 q.items = q.items[1:]
42 fmt.Printf("Dequeued: %d (queue size: %d)\n", item, len(q.items))
43 return item
44}
45
46func main() {
47 queue := NewQueue()
48 var wg sync.WaitGroup
49
50 // Consumer goroutines
51 for i := 1; i <= 3; i++ {
52 wg.Add(1)
53 go func(id int) {
54 defer wg.Done()
55 for j := 0; j < 3; j++ {
56 item := queue.Dequeue()
57 fmt.Printf("Consumer %d processed: %d\n", id, item)
58 time.Sleep(time.Millisecond * 100)
59 }
60 }(i)
61 }
62
63 // Producer goroutine
64 go func() {
65 for i := 1; i <= 9; i++ {
66 time.Sleep(time.Millisecond * 50)
67 queue.Enqueue(i)
68 }
69 }()
70
71 wg.Wait()
72 fmt.Println("All items processed")
73}
sync/atomic - Lock-Free Operations
For high-performance counters and flags without mutex overhead:
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "sync/atomic"
8)
9
10type AtomicCounter struct {
11 value int64
12}
13
14func (c *AtomicCounter) Increment() {
15 atomic.AddInt64(&c.value, 1)
16}
17
18func (c *AtomicCounter) Value() int64 {
19 return atomic.LoadInt64(&c.value)
20}
21
22func main() {
23 counter := &AtomicCounter{}
24 var wg sync.WaitGroup
25
26 // 100 goroutines incrementing 1000 times each
27 for i := 0; i < 100; i++ {
28 wg.Add(1)
29 go func() {
30 defer wg.Done()
31 for j := 0; j < 1000; j++ {
32 counter.Increment()
33 }
34 }()
35 }
36
37 wg.Wait()
38 fmt.Printf("Final atomic counter: %d (expected: 100000)\n", counter.Value())
39
40 // Compare-and-swap example
41 var flag int32
42 success := atomic.CompareAndSwapInt32(&flag, 0, 1)
43 fmt.Printf("\nCAS from 0 to 1: %v (flag is now %d)\n", success, atomic.LoadInt32(&flag))
44
45 success = atomic.CompareAndSwapInt32(&flag, 0, 2)
46 fmt.Printf("CAS from 0 to 2: %v (flag is still %d)\n", success, atomic.LoadInt32(&flag))
47}
When to use each synchronization primitive:
| Primitive | Use Case | Performance |
|---|---|---|
| Channels | Communication, coordination, pipelines | Moderate (copying overhead) |
| Mutex | Simple shared state protection | Good |
| RWMutex | Read-heavy workloads | Excellent for reads |
| Once | Lazy initialization, singletons | Excellent (zero overhead after first call) |
| Cond | Complex waiting conditions | Good |
| Atomic | Counters, flags, lock-free algorithms | Excellent (no locks) |
Common Patterns and Best Practices
Pattern 1: Timeout and Cancellation
1// run
2package main
3
4import (
5 "context"
6 "fmt"
7 "time"
8)
9
10func doWork(ctx context.Context, task string) error {
11 // Simulate long-running work
12 workDone := make(chan bool)
13
14 go func() {
15 fmt.Printf("Starting task: %s\n", task)
16 time.Sleep(2 * time.Second)
17 workDone <- true
18 }()
19
20 select {
21 case <-workDone:
22 fmt.Printf("Task completed: %s\n", task)
23 return nil
24
25 case <-ctx.Done():
26 fmt.Printf("Task cancelled: %s\n", task)
27 return ctx.Err()
28
29 case <-time.After(1 * time.Second):
30 fmt.Printf("Task timed out: %s\n", task)
31 return fmt.Errorf("timeout exceeded")
32 }
33}
34
35func main() {
36 // Example 1: Timeout using context
37 fmt.Println("=== Example 1: Context with Timeout ===")
38 ctx1, cancel1 := context.WithTimeout(context.Background(), 500*time.Millisecond)
39 defer cancel1()
40
41 err := doWork(ctx1, "quick-task")
42 if err != nil {
43 fmt.Printf("Error: %v\n", err)
44 }
45
46 time.Sleep(1 * time.Second)
47
48 // Example 2: Manual cancellation
49 fmt.Println("\n=== Example 2: Manual Cancellation ===")
50 ctx2, cancel2 := context.WithCancel(context.Background())
51
52 go func() {
53 time.Sleep(500 * time.Millisecond)
54 fmt.Println("Sending cancellation signal...")
55 cancel2()
56 }()
57
58 err = doWork(ctx2, "cancellable-task")
59 if err != nil {
60 fmt.Printf("Error: %v\n", err)
61 }
62
63 // Example 3: Select with timeout
64 fmt.Println("\n=== Example 3: Select with Timeout ===")
65 ch := make(chan string, 1)
66
67 go func() {
68 time.Sleep(2 * time.Second)
69 ch <- "result"
70 }()
71
72 select {
73 case result := <-ch:
74 fmt.Printf("Received: %s\n", result)
75 case <-time.After(1 * time.Second):
76 fmt.Println("Operation timed out")
77 }
78}
Pattern 2: Rate Limiting
1// run
2package main
3
4import (
5 "fmt"
6 "time"
7)
8
9// Simple rate limiter using channels
10func rateLimiter() {
11 // Allow 3 requests per second
12 requests := make(chan int, 5)
13 limiter := time.Tick(time.Second / 3) // 3 requests per second
14
15 // Simulate incoming requests
16 for i := 1; i <= 5; i++ {
17 requests <- i
18 }
19 close(requests)
20
21 fmt.Println("=== Rate Limiter Demo ===")
22 fmt.Println("Processing 5 requests at 3 requests/second...")
23
24 for req := range requests {
25 <-limiter // Wait for rate limiter
26 fmt.Printf("Processing request %d at %s\n", req, time.Now().Format("15:04:05.000"))
27 }
28
29 fmt.Println("\n=== Token Bucket Rate Limiter ===")
30
31 // Token bucket implementation
32 tokenBucket := make(chan struct{}, 3) // Bucket capacity: 3 tokens
33
34 // Fill bucket with initial tokens
35 for i := 0; i < 3; i++ {
36 tokenBucket <- struct{}{}
37 }
38
39 // Refill tokens periodically
40 go func() {
41 ticker := time.NewTicker(time.Second)
42 defer ticker.Stop()
43
44 for range ticker.C {
45 select {
46 case tokenBucket <- struct{}{}:
47 fmt.Println("Added token to bucket")
48 default:
49 // Bucket is full, skip
50 }
51 }
52 }()
53
54 // Process requests using token bucket
55 for i := 1; i <= 6; i++ {
56 go func(reqID int) {
57 <-tokenBucket // Wait for token
58 fmt.Printf("Request %d processed at %s\n", reqID, time.Now().Format("15:04:05.000"))
59 }(i)
60
61 time.Sleep(time.Millisecond * 300)
62 }
63
64 time.Sleep(3 * time.Second)
65}
66
67func main() {
68 rateLimiter()
69}
Pattern 3: Semaphore for Resource Limiting
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "time"
8)
9
10// Semaphore limits concurrent access to a resource
11type Semaphore chan struct{}
12
13func NewSemaphore(maxConcurrency int) Semaphore {
14 return make(Semaphore, maxConcurrency)
15}
16
17func (s Semaphore) Acquire() {
18 s <- struct{}{}
19}
20
21func (s Semaphore) Release() {
22 <-s
23}
24
25func worker(id int, sem Semaphore, wg *sync.WaitGroup) {
26 defer wg.Done()
27
28 fmt.Printf("Worker %d waiting for semaphore...\n", id)
29 sem.Acquire()
30 defer sem.Release()
31
32 fmt.Printf("Worker %d acquired semaphore, starting work\n", id)
33
34 // Simulate work
35 time.Sleep(time.Second)
36
37 fmt.Printf("Worker %d finished work, releasing semaphore\n", id)
38}
39
40func main() {
41 fmt.Println("=== Semaphore Pattern Demo ===")
42 fmt.Println("Max concurrent workers: 3")
43 fmt.Println("Total workers: 6")
44 fmt.Println()
45
46 // Create semaphore allowing 3 concurrent workers
47 sem := NewSemaphore(3)
48
49 var wg sync.WaitGroup
50
51 // Start 6 workers, but only 3 can run concurrently
52 for i := 1; i <= 6; i++ {
53 wg.Add(1)
54 go worker(i, sem, &wg)
55 time.Sleep(time.Millisecond * 100)
56 }
57
58 wg.Wait()
59 fmt.Println("\nAll workers completed")
60}
Common Pitfalls and How to Avoid Them
Pitfall 1: Goroutine Leaks
1// run
2package main
3
4import (
5 "fmt"
6 "time"
7)
8
9// ❌ WRONG: Goroutine leak
10func leakExample() {
11 ch := make(chan int)
12
13 go func() {
14 val := <-ch // This goroutine will block forever
15 fmt.Println("Received:", val)
16 }()
17
18 // Forgot to send to channel or close it
19 // Goroutine leaks - never exits
20 fmt.Println("Function returned, but goroutine is still running!")
21}
22
23// ✅ CORRECT: Ensure goroutine cleanup
24func fixedExample() {
25 ch := make(chan int)
26 done := make(chan struct{})
27
28 go func() {
29 defer close(done)
30
31 select {
32 case val := <-ch:
33 fmt.Println("Received:", val)
34 case <-time.After(2 * time.Second):
35 fmt.Println("Timeout waiting for value")
36 }
37 }()
38
39 // Send value within reasonable time
40 time.Sleep(500 * time.Millisecond)
41 ch <- 42
42
43 <-done // Wait for goroutine to finish
44 fmt.Println("Goroutine completed properly")
45}
46
47func main() {
48 fmt.Println("=== Goroutine Leak Example ===")
49 leakExample()
50
51 time.Sleep(1 * time.Second)
52
53 fmt.Println("\n=== Fixed Example ===")
54 fixedExample()
55}
Pitfall 2: Race Conditions
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "time"
8)
9
10// ❌ WRONG: Race condition
11type UnsafeCounter struct {
12 value int
13}
14
15func (c *UnsafeCounter) Increment() {
16 c.value++ // Race condition!
17}
18
19func (c *UnsafeCounter) Value() int {
20 return c.value
21}
22
23// ✅ CORRECT: Thread-safe with mutex
24type SafeCounter struct {
25 mu sync.Mutex
26 value int
27}
28
29func (c *SafeCounter) Increment() {
30 c.mu.Lock()
31 c.value++
32 c.mu.Unlock()
33}
34
35func (c *SafeCounter) Value() int {
36 c.mu.Lock()
37 defer c.mu.Unlock()
38 return c.value
39}
40
41func main() {
42 fmt.Println("=== Race Condition Demo ===")
43
44 // Unsafe counter
45 unsafeCounter := &UnsafeCounter{}
46 var wg1 sync.WaitGroup
47
48 for i := 0; i < 1000; i++ {
49 wg1.Add(1)
50 go func() {
51 defer wg1.Done()
52 unsafeCounter.Increment()
53 }()
54 }
55
56 wg1.Wait()
57 fmt.Printf("Unsafe counter (expected 1000): %d\n", unsafeCounter.Value())
58
59 // Safe counter
60 safeCounter := &SafeCounter{}
61 var wg2 sync.WaitGroup
62
63 for i := 0; i < 1000; i++ {
64 wg2.Add(1)
65 go func() {
66 defer wg2.Done()
67 safeCounter.Increment()
68 }()
69 }
70
71 wg2.Wait()
72 fmt.Printf("Safe counter (expected 1000): %d\n", safeCounter.Value())
73}
Pitfall 3: Deadlocks
1// run
2package main
3
4import (
5 "fmt"
6 "time"
7)
8
9// ❌ WRONG: Deadlock with unbuffered channel
10func deadlockExample() {
11 fmt.Println("=== Deadlock Example ===")
12 ch := make(chan int) // Unbuffered
13
14 // This would deadlock if uncommented:
15 // ch <- 1 // Blocks waiting for receiver
16 // val := <-ch // Never reached
17
18 fmt.Println("Skipping deadlock example to avoid hanging...")
19}
20
21// ✅ CORRECT: Multiple solutions
22func fixedExamples() {
23 // Solution 1: Buffered channel
24 fmt.Println("\n=== Solution 1: Buffered Channel ===")
25 ch1 := make(chan int, 1) // Buffer size 1
26 ch1 <- 1 // Doesn't block
27 val := <-ch1 // Receives the value
28 fmt.Printf("Received: %d\n", val)
29
30 // Solution 2: Separate goroutines
31 fmt.Println("\n=== Solution 2: Separate Goroutines ===")
32 ch2 := make(chan int)
33
34 go func() {
35 ch2 <- 42 // Send in goroutine
36 }()
37
38 val2 := <-ch2 // Receive in main goroutine
39 fmt.Printf("Received: %d\n", val2)
40
41 // Solution 3: Select with timeout
42 fmt.Println("\n=== Solution 3: Select with Timeout ===")
43 ch3 := make(chan int)
44
45 select {
46 case ch3 <- 100:
47 fmt.Println("Sent successfully")
48 case <-time.After(time.Second):
49 fmt.Println("Send timed out (expected)")
50 }
51}
52
53func main() {
54 deadlockExample()
55 fixedExamples()
56}
Practice Exercises
Exercise 1: Concurrent Number Processing
Learning Objectives: Master basic goroutine creation, channel communication, and synchronization patterns.
Difficulty: Beginner
Real-World Context: Web servers often need to process multiple requests concurrently. This exercise demonstrates the fundamental patterns for coordinating concurrent work and collecting results.
Task: Create a program that:
- Processes a list of numbers concurrently using multiple goroutines
- Calculates the square of each number with simulated processing time
- Collects results using channels with proper synchronization
- Reports total processing time and results
Requirements:
- Use at least 3 worker goroutines
- Process a list of 10 numbers
- Use channels for communication
- Use WaitGroup for synchronization
- Calculate and display the sum of all squared values
Show Solution
1// run
2package main
3
4import (
5 "fmt"
6 "math/rand"
7 "sync"
8 "time"
9)
10
11func processNumber(num int, results chan<- int, wg *sync.WaitGroup) {
12 defer wg.Done()
13
14 // Simulate variable processing time
15 processingTime := time.Duration(rand.Intn(100)+50) * time.Millisecond
16 time.Sleep(processingTime)
17
18 square := num * num
19 results <- square
20 fmt.Printf("Processed %d -> %d (took %v)\n", num, square, processingTime)
21}
22
23func main() {
24 rand.Seed(time.Now().UnixNano())
25
26 numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
27
28 // Create channels and waitgroup
29 results := make(chan int, len(numbers))
30 var wg sync.WaitGroup
31
32 startTime := time.Now()
33
34 // Start goroutines
35 fmt.Println("Starting concurrent number processing...")
36 for _, num := range numbers {
37 wg.Add(1)
38 go processNumber(num, results, &wg)
39 }
40
41 // Close results channel after all goroutines finish
42 go func() {
43 wg.Wait()
44 close(results)
45 }()
46
47 // Collect results
48 fmt.Println("\nCollecting results:")
49 sum := 0
50 count := 0
51 for result := range results {
52 sum += result
53 count++
54 fmt.Printf("Result %d: %d (running sum: %d)\n", count, result, sum)
55 }
56
57 elapsed := time.Since(startTime)
58
59 fmt.Printf("\n=== Summary ===\n")
60 fmt.Printf("Processed %d numbers\n", count)
61 fmt.Printf("Sum of squares: %d\n", sum)
62 fmt.Printf("Total time: %v\n", elapsed)
63 fmt.Printf("Average time per number: %v\n", elapsed/time.Duration(count))
64}
Key Concepts:
- Goroutines for concurrent execution
- Buffered channels to prevent blocking
- WaitGroup for synchronization
- Defer for cleanup
- Channel closing to signal completion
Exercise 2: Producer-Consumer Pipeline
Learning Objectives: Build a multi-stage pipeline with buffered channels and understand backpressure management.
Difficulty: Intermediate
Real-World Context: Data processing pipelines are fundamental to stream processing, ETL systems, and real-time analytics. This exercise teaches how to build efficient data flow systems.
Task: Implement a pipeline with three stages:
- Generator: Produces numbers from 1 to 20
- Transformer: Multiplies each number by 2 and adds 10
- Collector: Filters numbers greater than 30 and calculates average
Requirements:
- Use separate goroutines for each stage
- Use buffered channels between stages
- Implement graceful shutdown when generator completes
- Report statistics (count, average, processing time)
Show Solution
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "time"
8)
9
10// Stage 1: Generate numbers
11func generator(count int, out chan<- int, wg *sync.WaitGroup) {
12 defer wg.Done()
13 defer close(out)
14
15 fmt.Println("Generator: Starting...")
16 for i := 1; i <= count; i++ {
17 out <- i
18 fmt.Printf("Generator: Produced %d\n", i)
19 time.Sleep(time.Millisecond * 50)
20 }
21 fmt.Println("Generator: Completed")
22}
23
24// Stage 2: Transform data
25func transformer(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
26 defer wg.Done()
27 defer close(out)
28
29 fmt.Println("Transformer: Starting...")
30 for num := range in {
31 // Transform: multiply by 2 and add 10
32 transformed := num*2 + 10
33 out <- transformed
34 fmt.Printf("Transformer: %d -> %d\n", num, transformed)
35 time.Sleep(time.Millisecond * 30)
36 }
37 fmt.Println("Transformer: Completed")
38}
39
40// Stage 3: Collect and filter results
41func collector(in <-chan int, threshold int, wg *sync.WaitGroup) {
42 defer wg.Done()
43
44 fmt.Printf("Collector: Starting (filtering > %d)...\n", threshold)
45
46 var filtered []int
47 sum := 0
48
49 for num := range in {
50 if num > threshold {
51 filtered = append(filtered, num)
52 sum += num
53 fmt.Printf("Collector: Accepted %d\n", num)
54 } else {
55 fmt.Printf("Collector: Rejected %d\n", num)
56 }
57 time.Sleep(time.Millisecond * 20)
58 }
59
60 fmt.Println("\nCollector: Completed")
61 fmt.Printf("\n=== Results ===\n")
62 fmt.Printf("Filtered count: %d\n", len(filtered))
63 fmt.Printf("Filtered values: %v\n", filtered)
64 fmt.Printf("Sum: %d\n", sum)
65 if len(filtered) > 0 {
66 avg := float64(sum) / float64(len(filtered))
67 fmt.Printf("Average: %.2f\n", avg)
68 }
69}
70
71func main() {
72 startTime := time.Now()
73
74 // Create pipeline channels
75 stage1 := make(chan int, 5) // generator -> transformer
76 stage2 := make(chan int, 5) // transformer -> collector
77
78 var wg sync.WaitGroup
79
80 fmt.Println("=== Starting 3-Stage Pipeline ===\n")
81
82 // Start pipeline stages
83 wg.Add(3)
84
85 go generator(20, stage1, &wg)
86 go transformer(stage1, stage2, &wg)
87 go collector(stage2, 30, &wg)
88
89 // Wait for pipeline to complete
90 wg.Wait()
91
92 elapsed := time.Since(startTime)
93 fmt.Printf("\nTotal pipeline time: %v\n", elapsed)
94}
Key Concepts:
- Multi-stage pipeline architecture
- Buffered channels for throughput
- Cascading channel closure for cleanup
- Goroutine synchronization with WaitGroup
- Data transformation and filtering
Exercise 3: Worker Pool with Dynamic Jobs
Learning Objectives: Implement a worker pool that efficiently distributes work across multiple goroutines with load balancing.
Difficulty: Intermediate
Real-World Context: Worker pools are essential for building scalable web servers, task queues, and batch processing systems. This exercise demonstrates efficient resource utilization and load distribution.
Task: Build a worker pool that:
- Creates a fixed number of workers (e.g., 4 workers)
- Processes jobs with varying complexity (simulated by different sleep times)
- Tracks which worker processes each job
- Reports statistics (jobs per worker, total time, throughput)
Requirements:
- Use a job channel for distributing work
- Use a result channel for collecting outcomes
- Implement graceful shutdown
- Track and report worker statistics
Show Solution
1// run
2package main
3
4import (
5 "fmt"
6 "math/rand"
7 "sync"
8 "time"
9)
10
11type Job struct {
12 ID int
13 Complexity int // Represents processing time in milliseconds
14}
15
16type Result struct {
17 JobID int
18 WorkerID int
19 Duration time.Duration
20}
21
22func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
23 defer wg.Done()
24
25 fmt.Printf("Worker %d: Started\n", id)
26 jobsProcessed := 0
27
28 for job := range jobs {
29 startTime := time.Now()
30
31 // Simulate work based on complexity
32 time.Sleep(time.Duration(job.Complexity) * time.Millisecond)
33
34 duration := time.Since(startTime)
35 jobsProcessed++
36
37 fmt.Printf("Worker %d: Completed job %d (complexity: %dms, actual: %v)\n",
38 id, job.ID, job.Complexity, duration)
39
40 results <- Result{
41 JobID: job.ID,
42 WorkerID: id,
43 Duration: duration,
44 }
45 }
46
47 fmt.Printf("Worker %d: Finished (%d jobs processed)\n", id, jobsProcessed)
48}
49
50func main() {
51 rand.Seed(time.Now().UnixNano())
52
53 const numWorkers = 4
54 const numJobs = 20
55
56 // Create channels
57 jobs := make(chan Job, numJobs)
58 results := make(chan Result, numJobs)
59
60 var wg sync.WaitGroup
61
62 // Start workers
63 fmt.Printf("=== Starting Worker Pool (%d workers) ===\n\n", numWorkers)
64 startTime := time.Now()
65
66 for i := 1; i <= numWorkers; i++ {
67 wg.Add(1)
68 go worker(i, jobs, results, &wg)
69 }
70
71 // Submit jobs
72 fmt.Println("Submitting jobs...")
73 go func() {
74 for i := 1; i <= numJobs; i++ {
75 job := Job{
76 ID: i,
77 Complexity: rand.Intn(100) + 50, // 50-150ms
78 }
79 jobs <- job
80 fmt.Printf("Submitted job %d (complexity: %dms)\n", job.ID, job.Complexity)
81 }
82 close(jobs)
83 fmt.Println("All jobs submitted\n")
84 }()
85
86 // Close results channel when all workers finish
87 go func() {
88 wg.Wait()
89 close(results)
90 }()
91
92 // Collect results and compute statistics
93 workerStats := make(map[int]int)
94 totalDuration := time.Duration(0)
95 jobCount := 0
96
97 for result := range results {
98 workerStats[result.WorkerID]++
99 totalDuration += result.Duration
100 jobCount++
101 }
102
103 elapsed := time.Since(startTime)
104
105 // Print statistics
106 fmt.Printf("\n=== Worker Pool Statistics ===\n")
107 fmt.Printf("Total time: %v\n", elapsed)
108 fmt.Printf("Jobs processed: %d\n", jobCount)
109 fmt.Printf("Average job duration: %v\n", totalDuration/time.Duration(jobCount))
110 fmt.Printf("Throughput: %.2f jobs/second\n", float64(jobCount)/elapsed.Seconds())
111
112 fmt.Println("\nJobs per worker:")
113 for workerID := 1; workerID <= numWorkers; workerID++ {
114 count := workerStats[workerID]
115 fmt.Printf(" Worker %d: %d jobs (%.1f%%)\n",
116 workerID, count, float64(count)/float64(jobCount)*100)
117 }
118}
Key Concepts:
- Worker pool pattern for resource management
- Job distribution across workers
- Result collection and statistics
- Graceful shutdown with channel closing
- Load balancing through channel communication
Exercise 4: Context-Aware Operations
Learning Objectives: Master context propagation for cancellation, timeouts, and deadline management in concurrent systems.
Difficulty: Advanced
Real-World Context: Production systems must handle graceful shutdown, request timeouts, and operation cancellation. This exercise demonstrates how to build responsive concurrent systems that respect cancellation signals.
Task: Implement a system that:
- Processes multiple long-running operations concurrently
- Uses context for timeout management (operations should timeout after 2 seconds)
- Implements manual cancellation triggered after processing 5 operations
- Reports success/failure/timeout statistics
Requirements:
- Use context.WithTimeout for operation timeouts
- Use context.WithCancel for manual cancellation
- Implement at least 10 operations with variable durations
- Track and report statistics for each outcome type
Show Solution
1// run
2package main
3
4import (
5 "context"
6 "fmt"
7 "math/rand"
8 "sync"
9 "time"
10)
11
12type Operation struct {
13 ID int
14 Duration time.Duration
15}
16
17type Result struct {
18 OpID int
19 Status string // "success", "timeout", "cancelled"
20 Duration time.Duration
21 Error error
22}
23
24func executeOperation(ctx context.Context, op Operation) Result {
25 startTime := time.Now()
26
27 // Create a channel for operation completion
28 done := make(chan bool)
29
30 go func() {
31 // Simulate work
32 time.Sleep(op.Duration)
33 done <- true
34 }()
35
36 // Wait for completion, timeout, or cancellation
37 select {
38 case <-done:
39 return Result{
40 OpID: op.ID,
41 Status: "success",
42 Duration: time.Since(startTime),
43 }
44
45 case <-ctx.Done():
46 if ctx.Err() == context.DeadlineExceeded {
47 return Result{
48 OpID: op.ID,
49 Status: "timeout",
50 Duration: time.Since(startTime),
51 Error: ctx.Err(),
52 }
53 }
54 return Result{
55 OpID: op.ID,
56 Status: "cancelled",
57 Duration: time.Since(startTime),
58 Error: ctx.Err(),
59 }
60 }
61}
62
63func main() {
64 rand.Seed(time.Now().UnixNano())
65
66 // Create operations with variable durations
67 operations := make([]Operation, 15)
68 for i := 0; i < 15; i++ {
69 operations[i] = Operation{
70 ID: i + 1,
71 Duration: time.Duration(rand.Intn(3000)+500) * time.Millisecond,
72 }
73 }
74
75 // Create context with timeout
76 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
77 defer cancel()
78
79 results := make(chan Result, len(operations))
80 var wg sync.WaitGroup
81
82 fmt.Println("=== Context-Aware Operations Demo ===")
83 fmt.Printf("Starting %d operations with 2s timeout per operation\n", len(operations))
84 fmt.Println("Global context will cancel after 5 seconds\n")
85
86 startTime := time.Now()
87
88 // Launch operations
89 for _, op := range operations {
90 wg.Add(1)
91
92 go func(operation Operation) {
93 defer wg.Done()
94
95 // Create operation-specific timeout context
96 opCtx, opCancel := context.WithTimeout(ctx, 2*time.Second)
97 defer opCancel()
98
99 fmt.Printf("Operation %d: Starting (duration: %v)\n",
100 operation.ID, operation.Duration)
101
102 result := executeOperation(opCtx, operation)
103 results <- result
104
105 switch result.Status {
106 case "success":
107 fmt.Printf("Operation %d: ✓ Success (took %v)\n",
108 operation.ID, result.Duration)
109 case "timeout":
110 fmt.Printf("Operation %d: ⏱ Timeout (took %v)\n",
111 operation.ID, result.Duration)
112 case "cancelled":
113 fmt.Printf("Operation %d: ✗ Cancelled (took %v)\n",
114 operation.ID, result.Duration)
115 }
116 }(op)
117
118 time.Sleep(time.Millisecond * 100) // Stagger operation starts
119 }
120
121 // Wait for all operations to complete
122 go func() {
123 wg.Wait()
124 close(results)
125 }()
126
127 // Collect results
128 stats := map[string]int{
129 "success": 0,
130 "timeout": 0,
131 "cancelled": 0,
132 }
133
134 for result := range results {
135 stats[result.Status]++
136 }
137
138 elapsed := time.Since(startTime)
139
140 // Print statistics
141 fmt.Printf("\n=== Results ===\n")
142 fmt.Printf("Total time: %v\n", elapsed)
143 fmt.Printf("Operations completed: %d\n", stats["success"])
144 fmt.Printf("Operations timed out: %d\n", stats["timeout"])
145 fmt.Printf("Operations cancelled: %d\n", stats["cancelled"])
146 fmt.Printf("Total operations: %d\n", len(operations))
147}
Key Concepts:
- Context propagation for cancellation
- Timeout management with context.WithTimeout
- Cascading cancellation signals
- Distinguishing between timeout and manual cancellation
- Resource cleanup with defer
Exercise 5: Message Broker Pattern
Learning Objectives: Build a message broker that demonstrates pub-sub patterns with multiple publishers and subscribers.
Difficulty: Advanced
Real-World Context: Message brokers are central to event-driven architectures, microservices communication, and distributed systems. This exercise demonstrates building a simple but functional message distribution system.
Task: Implement a message broker that:
- Accepts messages from multiple publishers
- Distributes messages to multiple subscribers based on topics
- Supports topic-based subscription filtering
- Implements graceful shutdown for all components
Requirements:
- Support at least 3 topics: "orders", "payments", "notifications"
- Create 2 publishers sending messages to different topics
- Create 3 subscribers with different topic interests
- Track message delivery statistics per subscriber
Show Solution
1// run
2package main
3
4import (
5 "fmt"
6 "sync"
7 "time"
8)
9
10type Message struct {
11 Topic string
12 Payload string
13 SentAt time.Time
14}
15
16type Subscriber struct {
17 ID int
18 Topics map[string]bool
19 Messages chan Message
20}
21
22func NewSubscriber(id int, topics []string) *Subscriber {
23 topicMap := make(map[string]bool)
24 for _, topic := range topics {
25 topicMap[topic] = true
26 }
27
28 return &Subscriber{
29 ID: id,
30 Topics: topicMap,
31 Messages: make(chan Message, 10),
32 }
33}
34
35func (s *Subscriber) Start(wg *sync.WaitGroup) {
36 defer wg.Done()
37
38 fmt.Printf("Subscriber %d: Started (topics: ", s.ID)
39 first := true
40 for topic := range s.Topics {
41 if !first {
42 fmt.Print(", ")
43 }
44 fmt.Print(topic)
45 first = false
46 }
47 fmt.Println(")")
48
49 messageCount := 0
50
51 for msg := range s.Messages {
52 messageCount++
53 fmt.Printf("Subscriber %d: Received [%s] %s\n", s.ID, msg.Topic, msg.Payload)
54 time.Sleep(time.Millisecond * 50) // Simulate processing
55 }
56
57 fmt.Printf("Subscriber %d: Stopped (received %d messages)\n", s.ID, messageCount)
58}
59
60type MessageBroker struct {
61 subscribers []*Subscriber
62 incoming chan Message
63 mu sync.RWMutex
64}
65
66func NewMessageBroker() *MessageBroker {
67 return &MessageBroker{
68 subscribers: make([]*Subscriber, 0),
69 incoming: make(chan Message, 50),
70 }
71}
72
73func (mb *MessageBroker) Subscribe(subscriber *Subscriber) {
74 mb.mu.Lock()
75 defer mb.mu.Unlock()
76
77 mb.subscribers = append(mb.subscribers, subscriber)
78 fmt.Printf("Broker: Registered subscriber %d\n", subscriber.ID)
79}
80
81func (mb *MessageBroker) Start(wg *sync.WaitGroup) {
82 defer wg.Done()
83
84 fmt.Println("Broker: Started")
85 messagesRouted := 0
86
87 for msg := range mb.incoming {
88 messagesRouted++
89 fmt.Printf("Broker: Routing [%s] %s\n", msg.Topic, msg.Payload)
90
91 mb.mu.RLock()
92 for _, sub := range mb.subscribers {
93 if sub.Topics[msg.Topic] {
94 select {
95 case sub.Messages <- msg:
96 // Message delivered
97 default:
98 fmt.Printf("Broker: Warning - Subscriber %d channel full\n", sub.ID)
99 }
100 }
101 }
102 mb.mu.RUnlock()
103 }
104
105 // Close all subscriber channels
106 mb.mu.Lock()
107 for _, sub := range mb.subscribers {
108 close(sub.Messages)
109 }
110 mb.mu.Unlock()
111
112 fmt.Printf("Broker: Stopped (routed %d messages)\n", messagesRouted)
113}
114
115func (mb *MessageBroker) Publish(msg Message) {
116 mb.incoming <- msg
117}
118
119func (mb *MessageBroker) Shutdown() {
120 close(mb.incoming)
121}
122
123func publisher(id int, broker *MessageBroker, topics []string, count int, wg *sync.WaitGroup) {
124 defer wg.Done()
125
126 fmt.Printf("Publisher %d: Started\n", id)
127
128 for i := 1; i <= count; i++ {
129 topic := topics[i%len(topics)]
130 msg := Message{
131 Topic: topic,
132 Payload: fmt.Sprintf("Message %d from Publisher %d", i, id),
133 SentAt: time.Now(),
134 }
135
136 broker.Publish(msg)
137 fmt.Printf("Publisher %d: Published [%s] %s\n", id, msg.Topic, msg.Payload)
138
139 time.Sleep(time.Millisecond * 200)
140 }
141
142 fmt.Printf("Publisher %d: Finished\n", id)
143}
144
145func main() {
146 fmt.Println("=== Message Broker Demo ===\n")
147
148 // Create broker
149 broker := NewMessageBroker()
150
151 // Create subscribers with different topic interests
152 sub1 := NewSubscriber(1, []string{"orders", "payments"})
153 sub2 := NewSubscriber(2, []string{"payments", "notifications"})
154 sub3 := NewSubscriber(3, []string{"orders", "notifications"})
155
156 // Register subscribers
157 broker.Subscribe(sub1)
158 broker.Subscribe(sub2)
159 broker.Subscribe(sub3)
160
161 var wg sync.WaitGroup
162
163 // Start broker
164 wg.Add(1)
165 go broker.Start(&wg)
166
167 // Start subscribers
168 wg.Add(3)
169 go sub1.Start(&wg)
170 go sub2.Start(&wg)
171 go sub3.Start(&wg)
172
173 // Give broker and subscribers time to start
174 time.Sleep(time.Millisecond * 100)
175
176 // Start publishers
177 fmt.Println()
178 wg.Add(2)
179 go publisher(1, broker, []string{"orders", "payments"}, 6, &wg)
180 go publisher(2, broker, []string{"notifications", "orders"}, 6, &wg)
181
182 // Wait for publishers to finish
183 time.Sleep(3 * time.Second)
184
185 // Shutdown broker
186 fmt.Println("\nInitiating shutdown...")
187 broker.Shutdown()
188
189 // Wait for all goroutines to complete
190 wg.Wait()
191
192 fmt.Println("\n=== Demo Complete ===")
193}
Key Concepts:
- Pub-sub messaging pattern
- Topic-based message filtering
- Multiple concurrent publishers and subscribers
- Channel-based message distribution
- Graceful shutdown with proper cleanup
- Non-blocking send to prevent slow subscribers from blocking broker
Summary
Key Takeaways
Goroutines provide:
- Lightweight concurrency with minimal memory overhead (~2KB initial stack)
- Easy creation with simple
gokeyword syntax - Automatic scheduling by Go runtime across multiple CPU cores
- Channel-based communication for safe data sharing
Channels enable:
- Thread-safe communication between goroutines without explicit locks
- Synchronization through send/receive operations
- Composable patterns like pipelines, fan-out/fan-in, and worker pools
- Backpressure management through buffering and blocking behavior
Select statement provides:
- Multiplexing multiple channel operations
- Non-blocking operations with default case
- Timeout handling with time.After()
- Graceful shutdown patterns
Concurrency patterns:
- Worker pools for managing fixed numbers of goroutines
- Pipelines for data processing stages
- Fan-out/fan-in for distributing and collecting work
- Context-aware cancellation for graceful shutdown and timeouts
- Rate limiting for controlling resource consumption
- Semaphores for limiting concurrent access
Best practices:
- Always close channels to signal completion
- Use WaitGroups for waiting on multiple goroutines
- Prefer channels over shared memory and mutexes
- Handle context cancellation for responsive systems
- Avoid goroutine leaks by ensuring all goroutines can exit
- Use buffered channels for producer-consumer patterns to prevent blocking
- Protect shared state with mutexes when channels aren't practical
Next Steps
Continue your Go learning journey with these essential topics:
- Error Handling Patterns - Learn to handle errors in concurrent contexts
- Advanced Synchronization - Master sync.Once, sync.Cond, and atomic operations
- Performance Optimization - Learn to profile and optimize concurrent code
- Testing Concurrent Code - Master testing patterns for concurrent systems
- Production Patterns - Circuit breakers, rate limiting, and distributed coordination
Production Readiness
You now have the foundation for building production-ready concurrent Go applications. The patterns and concepts covered here are used extensively in:
- High-throughput web services serving thousands of concurrent requests
- Real-time data processing and stream processing systems
- Microservice architectures with inter-service communication
- Background job processing and task queue systems
- Distributed systems with proper coordination patterns
Remember: Go's concurrency model is designed for communication over shared memory. Master channels and goroutines, and you'll write scalable, efficient concurrent systems that can handle millions of operations.