Exercise: Channel Basics
Difficulty - Beginner
📖 Background: This exercise builds upon the comprehensive concurrency coverage in Concurrency in Go. You should be familiar with goroutines and basic channel concepts before starting this exercise.
Learning Objectives
- Master Go channels for goroutine communication
- Understand buffered vs unbuffered channels
- Learn channel direction
- Practice common channel patterns
- Handle channel closing correctly
Problem Statement
Create a ChannelUtils package that demonstrates fundamental channel operations and patterns. You'll implement common concurrency patterns using channels.
Implement these functions:
- Generator: Create a channel that generates numbers
- Pipeline: Chain multiple processing stages using channels
- FanOut: Distribute work across multiple workers
- FanIn: Merge results from multiple channels into one
- Timeout: Implement timeout using channels and select
Function Signatures
1package channelutils
2
3import "time"
4
5// Generator returns a channel that emits numbers from start to end
6func Generator(start, end int) <-chan int
7
8// Pipeline takes input channel, applies function, returns output channel
9func Pipeline(input <-chan int, fn func(int) int) <-chan int
10
11// FanOut distributes input across n worker goroutines
12func FanOut(input <-chan int, n int, fn func(int) int) []<-chan int
13
14// FanIn merges multiple channels into one
15func FanIn(channels ...<-chan int) <-chan int
16
17// WithTimeout wraps a channel operation with timeout
18func WithTimeout(ch <-chan int, timeout time.Duration)
Example Usage
1package main
2
3import (
4 "fmt"
5 "time"
6 "channelutils"
7)
8
9func main() {
10 // Generator pattern
11 numbers := channelutils.Generator(1, 5)
12 for num := range numbers {
13 fmt.Printf("Generated: %d\n", num)
14 }
15 // Output: 1, 2, 3, 4, 5
16
17 // Pipeline pattern
18 input := channelutils.Generator(1, 5)
19 squared := channelutils.Pipeline(input, func(n int) int {
20 return n * n
21 })
22
23 for result := range squared {
24 fmt.Printf("Squared: %d\n", result)
25 }
26 // Output: 1, 4, 9, 16, 25
27
28 // Fan-out pattern
29 input = channelutils.Generator(1, 10)
30 workers := channelutils.FanOut(input, 3, func(n int) int {
31 time.Sleep(100 * time.Millisecond)
32 return n * 2
33 })
34
35 // Fan-in pattern
36 merged := channelutils.FanIn(workers...)
37 for result := range merged {
38 fmt.Printf("Result: %d\n", result)
39 }
40
41 // Timeout pattern
42 slowChan := make(chan int)
43 go func() {
44 time.Sleep(2 * time.Second)
45 slowChan <- 42
46 }()
47
48 value, err := channelutils.WithTimeout(slowChan, 1*time.Second)
49 if err != nil {
50 fmt.Println("Operation timed out")
51 } else {
52 fmt.Printf("Received: %d\n", value)
53 }
54}
Requirements
- Generators should close channels when done
- Pipeline functions should propagate closure
- FanOut should launch goroutines for parallel processing
- FanIn should wait for all input channels to close
- WithTimeout should return error on timeout
Solution
Click to see the complete solution
1package channelutils
2
3import (
4 "errors"
5 "sync"
6 "time"
7)
8
9// Generator returns a channel that emits numbers from start to end
10func Generator(start, end int) <-chan int {
11 out := make(chan int)
12
13 go func() {
14 defer close(out)
15 for i := start; i <= end; i++ {
16 out <- i
17 }
18 }()
19
20 return out
21}
22
23// Pipeline takes input channel, applies function, returns output channel
24func Pipeline(input <-chan int, fn func(int) int) <-chan int {
25 out := make(chan int)
26
27 go func() {
28 defer close(out)
29 for value := range input {
30 out <- fn(value)
31 }
32 }()
33
34 return out
35}
36
37// FanOut distributes input across n worker goroutines
38func FanOut(input <-chan int, n int, fn func(int) int) []<-chan int {
39 channels := make([]<-chan int, n)
40
41 for i := 0; i < n; i++ {
42 out := make(chan int)
43 channels[i] = out
44
45 go func(ch chan<- int) {
46 defer close(ch)
47 for value := range input {
48 ch <- fn(value)
49 }
50 }(out)
51 }
52
53 return channels
54}
55
56// FanIn merges multiple channels into one
57func FanIn(channels ...<-chan int) <-chan int {
58 out := make(chan int)
59 var wg sync.WaitGroup
60
61 // Function to copy values from channel to out
62 output := func(ch <-chan int) {
63 defer wg.Done()
64 for value := range ch {
65 out <- value
66 }
67 }
68
69 // Start a goroutine for each input channel
70 wg.Add(len(channels))
71 for _, ch := range channels {
72 go output(ch)
73 }
74
75 // Close out channel when all inputs are closed
76 go func() {
77 wg.Wait()
78 close(out)
79 }()
80
81 return out
82}
83
84// WithTimeout wraps a channel operation with timeout
85func WithTimeout(ch <-chan int, timeout time.Duration) {
86 select {
87 case value := <-ch:
88 return value, nil
89 case <-time.After(timeout):
90 return 0, errors.New("operation timed out")
91 }
92}
Explanation
Generator:
- Creates unbuffered channel for output
- Launches goroutine that sends values
- Closes channel after sending all values
- Returns receive-only channel
Pipeline:
- Takes input channel and transformation function
- Creates output channel
- Reads from input, applies function, sends to output
- Closes output when input is exhausted
- Enables composable data processing
FanOut:
- Creates n output channels
- Launches n goroutines
- Each worker reads from shared input channel
- Distributes work across workers automatically
- Each worker closes its own output channel
FanIn:
- Creates single output channel
- Uses WaitGroup to track active goroutines
- Launches goroutine for each input channel
- Each goroutine forwards values to output
- Closes output only after all inputs close
WithTimeout:
- Uses
selectstatement with timeout time.After()creates timeout channel- Returns value if received before timeout
- Returns error if timeout expires first
Channel Patterns Explained
1. Generator Pattern:
1// Produces values on a channel
2func generate(values ...int) <-chan int {
3 out := make(chan int)
4 go func() {
5 defer close(out)
6 for _, v := range values {
7 out <- v
8 }
9 }()
10 return out
11}
2. Pipeline Pattern:
1// Chain multiple stages
2numbers := generate(1, 2, 3)
3squared := square(numbers)
4doubled := double(squared)
5
6for result := range doubled {
7 fmt.Println(result)
8}
3. Fan-Out Pattern:
1// Distribute work to multiple workers
2input := generate(1, 2, 3, 4, 5)
3workers := []<-chan int{
4 worker(input),
5 worker(input),
6 worker(input),
7}
4. Fan-In Pattern:
1// Merge results from multiple sources
2merged := merge(workers...)
3for result := range merged {
4 fmt.Println(result)
5}
5. Select Pattern:
1select {
2case msg := <-ch1:
3 fmt.Println("Received from ch1:", msg)
4case msg := <-ch2:
5 fmt.Println("Received from ch2:", msg)
6case <-time.After(1 * time.Second):
7 fmt.Println("Timeout")
8}
Channel Best Practices
1. Close channels from sender:
1// Good
2go func() {
3 defer close(ch)
4 for _, item := range items {
5 ch <- item
6 }
7}()
8
9// Bad: closing from receiver
2. Use buffered channels to avoid blocking:
1// Unbuffered: blocks until receiver ready
2ch := make(chan int)
3
4// Buffered: allows n sends without blocking
5ch := make(chan int, 10)
3. Check for closed channels:
1value, ok := <-ch
2if !ok {
3 // Channel closed
4}
5
6// Or use range
7for value := range ch {
8 // Process value
9}
4. Avoid goroutine leaks:
1// Ensure goroutines can exit
2ctx, cancel := context.WithCancel(context.Background())
3defer cancel()
4
5go func() {
6 for {
7 select {
8 case <-ctx.Done():
9 return // Exit goroutine
10 case value := <-ch:
11 // Process value
12 }
13 }
14}()
Testing Concurrent Code
1func TestGenerator(t *testing.T) {
2 ch := channelutils.Generator(1, 5)
3
4 expected := []int{1, 2, 3, 4, 5}
5 var received []int
6
7 for value := range ch {
8 received = append(received, value)
9 }
10
11 if len(received) != len(expected) {
12 t.Fatalf("Expected %d values, got %d", len(expected), len(received))
13 }
14
15 for i, v := range received {
16 if v != expected[i] {
17 t.Errorf("Index %d: expected %d, got %d", i, expected[i], v)
18 }
19 }
20}
21
22func TestWithTimeout(t *testing.T) {
23 // Test success case
24 ch := make(chan int, 1)
25 ch <- 42
26
27 value, err := channelutils.WithTimeout(ch, 1*time.Second)
28 if err != nil {
29 t.Fatalf("Expected no error, got %v", err)
30 }
31 if value != 42 {
32 t.Errorf("Expected 42, got %d", value)
33 }
34
35 // Test timeout case
36 slowCh := make(chan int)
37 _, err = channelutils.WithTimeout(slowCh, 100*time.Millisecond)
38 if err == nil {
39 t.Error("Expected timeout error")
40 }
41}
Test Cases
1package channelutils
2
3import (
4 "testing"
5 "time"
6)
7
8func TestGenerator(t *testing.T) {
9 ch := Generator(1, 3)
10
11 expected := []int{1, 2, 3}
12 var got []int
13
14 for value := range ch {
15 got = append(got, value)
16 }
17
18 if len(got) != len(expected) {
19 t.Fatalf("Length mismatch: expected %d, got %d", len(expected), len(got))
20 }
21
22 for i := range expected {
23 if got[i] != expected[i] {
24 t.Errorf("Index %d: expected %d, got %d", i, expected[i], got[i])
25 }
26 }
27}
28
29func TestPipeline(t *testing.T) {
30 input := Generator(1, 3)
31 output := Pipeline(input, func(n int) int {
32 return n * 2
33 })
34
35 expected := []int{2, 4, 6}
36 var got []int
37
38 for value := range output {
39 got = append(got, value)
40 }
41
42 if len(got) != len(expected) {
43 t.Fatalf("Length mismatch: expected %d, got %d", len(expected), len(got))
44 }
45}
46
47func TestFanIn(t *testing.T) {
48 ch1 := make(chan int, 2)
49 ch2 := make(chan int, 2)
50
51 ch1 <- 1
52 ch1 <- 2
53 close(ch1)
54
55 ch2 <- 3
56 ch2 <- 4
57 close(ch2)
58
59 merged := FanIn(ch1, ch2)
60
61 var count int
62 for range merged {
63 count++
64 }
65
66 if count != 4 {
67 t.Errorf("Expected 4 values, got %d", count)
68 }
69}
70
71func TestWithTimeout_Success(t *testing.T) {
72 ch := make(chan int, 1)
73 ch <- 42
74
75 value, err := WithTimeout(ch, 1*time.Second)
76 if err != nil {
77 t.Fatalf("Unexpected error: %v", err)
78 }
79
80 if value != 42 {
81 t.Errorf("Expected 42, got %d", value)
82 }
83}
84
85func TestWithTimeout_Timeout(t *testing.T) {
86 ch := make(chan int)
87
88 _, err := WithTimeout(ch, 100*time.Millisecond)
89 if err == nil {
90 t.Error("Expected timeout error")
91 }
92}
Bonus Challenges
- Broadcast: Send same value to multiple channels
1func Broadcast(value int, n int) []<-chan int
- Buffer: Implement a buffered pipeline stage
1func Buffer(input <-chan int, size int) <-chan int
- Rate Limiter: Limit throughput using channels
1func RateLimit(input <-chan int, rate time.Duration) <-chan int
- Worker Pool: Reusable worker pool pattern
1type WorkerPool struct {
2 workers int
3 jobs chan func()
4}
5
6func NewWorkerPool(workers int) *WorkerPool
7func Submit(job func())
Key Takeaways
- Channels are typed - each channel carries values of one type
- Unbuffered channels block until both sender and receiver are ready
- Buffered channels block only when buffer is full/empty
- Close channels from sender to signal completion
- Range over channels automatically stops when channel closes
- Select allows non-blocking operations with timeouts
- Direction matters - use
<-chanandchan<-for safety
Channels are Go's primary concurrency primitive. Understanding channel patterns and proper channel usage is essential for writing concurrent Go programs. Always ensure goroutines can exit to avoid leaks.