Channel Basics

Exercise: Channel Basics

Difficulty - Beginner

📖 Background: This exercise builds upon the comprehensive concurrency coverage in Concurrency in Go. You should be familiar with goroutines and basic channel concepts before starting this exercise.

Learning Objectives

  • Master Go channels for goroutine communication
  • Understand buffered vs unbuffered channels
  • Learn channel direction (send-only and receive-only channel types)
  • Practice common channel patterns
  • Handle channel closing correctly

Problem Statement

Create a ChannelUtils package that demonstrates fundamental channel operations and patterns. You'll implement common concurrency patterns using channels.

Implement these functions:

  1. Generator: Create a channel that generates numbers
  2. Pipeline: Chain multiple processing stages using channels
  3. FanOut: Distribute work across multiple workers
  4. FanIn: Merge results from multiple channels into one
  5. Timeout: Implement timeout using channels and select

Function Signatures

package channelutils

import "time"

// Generator returns a channel that emits numbers from start to end (inclusive)
func Generator(start, end int) <-chan int

// Pipeline takes input channel, applies function, returns output channel
func Pipeline(input <-chan int, fn func(int) int) <-chan int

// FanOut distributes input across n worker goroutines
func FanOut(input <-chan int, n int, fn func(int) int) []<-chan int

// FanIn merges multiple channels into one
func FanIn(channels ...<-chan int) <-chan int

// WithTimeout receives one value from ch, or returns an error if the timeout expires first
func WithTimeout(ch <-chan int, timeout time.Duration) (int, error)

Example Usage

package main

import (
    "fmt"
    "time"

    "channelutils" // adjust to your module's import path
)

func main() {
    // Generator pattern
    numbers := channelutils.Generator(1, 5)
    for num := range numbers {
        fmt.Printf("Generated: %d\n", num)
    }
    // Prints "Generated: 1" through "Generated: 5", one per line

    // Pipeline pattern
    input := channelutils.Generator(1, 5)
    squared := channelutils.Pipeline(input, func(n int) int {
        return n * n
    })

    for result := range squared {
        fmt.Printf("Squared: %d\n", result)
    }
    // Prints the squares 1, 4, 9, 16, 25, one per line

    // Fan-out pattern
    input = channelutils.Generator(1, 10)
    workers := channelutils.FanOut(input, 3, func(n int) int {
        time.Sleep(100 * time.Millisecond)
        return n * 2
    })

    // Fan-in pattern
    merged := channelutils.FanIn(workers...)
    for result := range merged {
        fmt.Printf("Result: %d\n", result)
    }
    // Prints the even numbers 2 through 20; the order may vary between runs

    // Timeout pattern
    // Buffer of 1 lets the sender finish even after the receiver has given up
    slowChan := make(chan int, 1)
    go func() {
        time.Sleep(2 * time.Second)
        slowChan <- 42
    }()

    value, err := channelutils.WithTimeout(slowChan, 1*time.Second)
    if err != nil {
        fmt.Println("Operation timed out")
    } else {
        fmt.Printf("Received: %d\n", value)
    }
}

Requirements

  1. Generators should close channels when done
  2. Pipeline functions should propagate closure
  3. FanOut should launch goroutines for parallel processing
  4. FanIn should wait for all input channels to close
  5. WithTimeout should return error on timeout

Solution

package channelutils

import (
    "errors"
    "sync"
    "time"
)

// Generator returns a channel that emits numbers from start to end (inclusive)
func Generator(start, end int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for i := start; i <= end; i++ {
            out <- i
        }
    }()

    return out
}

// Pipeline takes input channel, applies function, returns output channel
func Pipeline(input <-chan int, fn func(int) int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for value := range input {
            out <- fn(value)
        }
    }()

    return out
}

// FanOut distributes input across n worker goroutines
func FanOut(input <-chan int, n int, fn func(int) int) []<-chan int {
    channels := make([]<-chan int, n)

    for i := 0; i < n; i++ {
        out := make(chan int)
        channels[i] = out

        go func(ch chan<- int) {
            defer close(ch)
            for value := range input {
                ch <- fn(value)
            }
        }(out)
    }

    return channels
}

// FanIn merges multiple channels into one
func FanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    // Function to copy values from channel to out
    output := func(ch <-chan int) {
        defer wg.Done()
        for value := range ch {
            out <- value
        }
    }

    // Start a goroutine for each input channel
    wg.Add(len(channels))
    for _, ch := range channels {
        go output(ch)
    }

    // Close out channel when all inputs are closed
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

// WithTimeout receives one value from ch, or returns an error if the timeout expires first
func WithTimeout(ch <-chan int, timeout time.Duration) (int, error) {
    select {
    case value := <-ch:
        return value, nil
    case <-time.After(timeout):
        return 0, errors.New("operation timed out")
    }
}

Explanation

Generator:

  • Creates unbuffered channel for output
  • Launches goroutine that sends values
  • Closes channel after sending all values
  • Returns receive-only channel

Pipeline:

  • Takes input channel and transformation function
  • Creates output channel
  • Reads from input, applies function, sends to output
  • Closes output when input is exhausted
  • Enables composable data processing

FanOut:

  • Creates n output channels
  • Launches n goroutines
  • Each worker reads from shared input channel
  • Each value is received by exactly one worker, whichever is ready first
  • Each worker closes its own output channel

FanIn:

  • Creates single output channel
  • Uses WaitGroup to track active goroutines
  • Launches goroutine for each input channel
  • Each goroutine forwards values to output (arrival order across inputs is not guaranteed)
  • Closes output only after all inputs close

WithTimeout:

  • Uses select statement with timeout
  • time.After() creates timeout channel
  • Returns value if received before timeout
  • Returns error if timeout expires first
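
One edge case the bullets above do not mention: a receive from a closed channel succeeds immediately with the zero value, so a select like the one in WithTimeout would report 0 with no error for a closed channel. The sketch below shows one possible way to tell the three outcomes apart. The names recvWithTimeout, errTimeout, and errClosed are hypothetical and not part of the exercise's API.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// Hypothetical sentinel errors, introduced only for this illustration.
var (
    errTimeout = errors.New("operation timed out")
    errClosed  = errors.New("channel closed")
)

// recvWithTimeout is an illustrative variant of WithTimeout that
// distinguishes a closed channel from a received value.
func recvWithTimeout(ch <-chan int, timeout time.Duration) (int, error) {
    select {
    case value, ok := <-ch:
        if !ok {
            return 0, errClosed // closed and drained: no real value arrived
        }
        return value, nil
    case <-time.After(timeout):
        return 0, errTimeout
    }
}

func main() {
    ready := make(chan int, 1)
    ready <- 42
    fmt.Println(recvWithTimeout(ready, time.Second)) // 42 <nil>

    closed := make(chan int)
    close(closed)
    fmt.Println(recvWithTimeout(closed, time.Second)) // 0 channel closed

    never := make(chan int)
    fmt.Println(recvWithTimeout(never, 50*time.Millisecond)) // 0 operation timed out
}
```

Whether closure should count as an error depends on the caller; the exercise's tests only require the value and timeout cases.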

Channel Patterns Explained

1. Generator Pattern:

// Produces values on a channel
func generate(values ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, v := range values {
            out <- v
        }
    }()
    return out
}

2. Pipeline Pattern:

// Chain multiple stages
// (square and double are stages of type func(<-chan int) <-chan int)
numbers := generate(1, 2, 3)
squared := square(numbers)
doubled := double(squared)

for result := range doubled {
    fmt.Println(result)
}
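
The snippet above relies on square and double stages that are not defined in this section. A minimal runnable sketch follows, assuming both are thin wrappers around a hypothetical helper named stage that mirrors the Pipeline function from the solution.

```go
package main

import "fmt"

// generate emits the given values and then closes its channel.
func generate(values ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, v := range values {
            out <- v
        }
    }()
    return out
}

// stage is a hypothetical helper: it applies fn to every value read
// from in and closes its output once in is exhausted.
func stage(in <-chan int, fn func(int) int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            out <- fn(v)
        }
    }()
    return out
}

// square and double are assumed definitions for the stages used above.
func square(in <-chan int) <-chan int {
    return stage(in, func(n int) int { return n * n })
}

func double(in <-chan int) <-chan int {
    return stage(in, func(n int) int { return n * 2 })
}

func main() {
    for result := range double(square(generate(1, 2, 3))) {
        fmt.Println(result) // prints 2, then 8, then 18
    }
}
```

Because each stage runs in a single goroutine, values leave the pipeline in the same order they entered it.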

3. Fan-Out Pattern:

// Distribute work to multiple workers
// (each worker call starts a goroutine that reads from the shared input)
input := generate(1, 2, 3, 4, 5)
workers := []<-chan int{
    worker(input),
    worker(input),
    worker(input),
}

4. Fan-In Pattern:

// Merge results from multiple sources
merged := merge(workers...)
for result := range merged {
    fmt.Println(result)
}
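
The fan-out and fan-in snippets use worker and merge without defining them. The sketch below fills them in under assumptions: worker is taken to double each value, and merge follows the same WaitGroup approach as FanIn in the solution. Both bodies are illustrative, not the only reasonable definitions.

```go
package main

import (
    "fmt"
    "sync"
)

// generate emits the given values and then closes its channel.
func generate(values ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, v := range values {
            out <- v
        }
    }()
    return out
}

// worker (assumed behavior) doubles each value it takes from the shared input.
func worker(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            out <- v * 2
        }
    }()
    return out
}

// merge forwards every value from every input and closes out
// only after all inputs have been closed.
func merge(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(channels))
    for _, ch := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    input := generate(1, 2, 3, 4, 5)
    workers := []<-chan int{worker(input), worker(input), worker(input)}

    sum, count := 0, 0
    for result := range merge(workers...) {
        sum += result
        count++
    }
    // Arrival order varies between runs, but the totals do not.
    fmt.Println(count, sum) // 5 30
}
```

Checking an order-independent property such as the count or the sum is a convenient way to test fan-out and fan-in code, since the interleaving of results is not deterministic.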

5. Select Pattern:

select {
case msg := <-ch1:
    fmt.Println("Received from ch1:", msg)
case msg := <-ch2:
    fmt.Println("Received from ch2:", msg)
case <-time.After(1 * time.Second):
    fmt.Println("Timeout")
}
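
The select above waits up to one second. Adding a default case instead makes the operation non-blocking: if no channel is ready, default runs immediately. A small sketch, using a hypothetical helper named tryReceive:

```go
package main

import "fmt"

// tryReceive is a hypothetical helper: it returns a value and true if
// one could be received without waiting, and 0 and false otherwise
// (including when ch is closed and empty).
func tryReceive(ch <-chan int) (int, bool) {
    select {
    case v, ok := <-ch:
        return v, ok
    default:
        return 0, false // nothing ready; do not block
    }
}

func main() {
    ch := make(chan int, 1)
    fmt.Println(tryReceive(ch)) // 0 false

    ch <- 7
    fmt.Println(tryReceive(ch)) // 7 true
}
```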

Channel Best Practices

1. Close channels from sender:

// Good: the sending goroutine closes the channel when it is done
go func() {
    defer close(ch)
    for _, item := range items {
        ch <- item
    }
}()

// Bad: closing from the receiver; a later send on the closed channel panics

2. Use buffered channels when senders should not wait for a receiver:

// Unbuffered: a send blocks until a receiver is ready
unbuffered := make(chan int)

// Buffered: allows up to 10 sends without blocking
buffered := make(chan int, 10)
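
To see the difference concretely, the sketch below attempts sends that never block and reports whether each one went through. The helper trySend is hypothetical and exists only for this demonstration.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send and reports
// whether the send completed without blocking.
func trySend(ch chan<- int, v int) bool {
    select {
    case ch <- v:
        return true
    default:
        return false
    }
}

func main() {
    unbuffered := make(chan int)
    fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting

    buffered := make(chan int, 2)
    fmt.Println(trySend(buffered, 1)) // true
    fmt.Println(trySend(buffered, 2)) // true
    fmt.Println(trySend(buffered, 3)) // false: the buffer is full

    fmt.Println(len(buffered), cap(buffered)) // 2 2
}
```

A buffer only postpones blocking; once it fills, a sender waits just as it would on an unbuffered channel.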

3. Check for closed channels:

value, ok := <-ch
if !ok {
    // Channel closed
}

// Or use range
for value := range ch {
    // Process value
}
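
The ok flag matters because a closed channel keeps yielding the zero value, which is indistinguishable from a genuine 0 unless ok is checked. A short sketch, using a hypothetical helper named drain:

```go
package main

import "fmt"

// drain is a hypothetical helper: it collects values until ch is
// closed and empty, using the two-value receive form.
func drain(ch <-chan int) []int {
    var values []int
    for {
        v, ok := <-ch
        if !ok {
            return values
        }
        values = append(values, v)
    }
}

func main() {
    ch := make(chan int, 3)
    ch <- 0 // a genuine zero value
    ch <- 5
    close(ch) // buffered values remain readable after close

    fmt.Println(drain(ch)) // [0 5]

    v, ok := <-ch
    fmt.Println(v, ok) // 0 false
}
```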

4. Avoid goroutine leaks:

// Ensure goroutines can exit (requires the "context" package)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
    for {
        select {
        case <-ctx.Done():
            return // Exit goroutine when the context is canceled
        case value, ok := <-ch:
            if !ok {
                return // Exit goroutine when ch is closed
            }
            _ = value // Process value
        }
    }
}()
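
A complete program built around this idea might look like the sketch below. The helpers consume and sumUntilCancel are hypothetical; the goroutine reports its result on a channel so the caller can confirm that it actually exited.

```go
package main

import (
    "context"
    "fmt"
)

// consume (hypothetical) sums values from ch until ctx is canceled or
// ch is closed, then delivers the sum on the returned channel.
func consume(ctx context.Context, ch <-chan int) <-chan int {
    result := make(chan int, 1) // buffered so the goroutine never blocks on exit
    go func() {
        sum := 0
        defer func() { result <- sum }()
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-ch:
                if !ok {
                    return
                }
                sum += v
            }
        }
    }()
    return result
}

// sumUntilCancel (hypothetical) sends values to a consumer, cancels
// it, and waits for the consumer to report back.
func sumUntilCancel(values ...int) int {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    ch := make(chan int)
    result := consume(ctx, ch)
    for _, v := range values {
        ch <- v // unbuffered: returns once the consumer has taken v
    }
    cancel()        // no more values are coming; tell the goroutine to stop
    return <-result // receiving the sum proves the goroutine exited
}

func main() {
    fmt.Println(sumUntilCancel(1, 2)) // 3
}
```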

Testing Concurrent Code

func TestGenerator(t *testing.T) {
    ch := channelutils.Generator(1, 5)

    expected := []int{1, 2, 3, 4, 5}
    var received []int

    for value := range ch {
        received = append(received, value)
    }

    if len(received) != len(expected) {
        t.Fatalf("Expected %d values, got %d", len(expected), len(received))
    }

    for i, v := range received {
        if v != expected[i] {
            t.Errorf("Index %d: expected %d, got %d", i, expected[i], v)
        }
    }
}

func TestWithTimeout(t *testing.T) {
    // Test success case
    ch := make(chan int, 1)
    ch <- 42

    value, err := channelutils.WithTimeout(ch, 1*time.Second)
    if err != nil {
        t.Fatalf("Expected no error, got %v", err)
    }
    if value != 42 {
        t.Errorf("Expected 42, got %d", value)
    }

    // Test timeout case
    slowCh := make(chan int)
    _, err = channelutils.WithTimeout(slowCh, 100*time.Millisecond)
    if err == nil {
        t.Error("Expected timeout error")
    }
}

Test Cases

package channelutils

import (
    "testing"
    "time"
)

func TestGenerator(t *testing.T) {
    ch := Generator(1, 3)

    expected := []int{1, 2, 3}
    var got []int

    for value := range ch {
        got = append(got, value)
    }

    if len(got) != len(expected) {
        t.Fatalf("Length mismatch: expected %d, got %d", len(expected), len(got))
    }

    for i := range expected {
        if got[i] != expected[i] {
            t.Errorf("Index %d: expected %d, got %d", i, expected[i], got[i])
        }
    }
}

func TestPipeline(t *testing.T) {
    input := Generator(1, 3)
    output := Pipeline(input, func(n int) int {
        return n * 2
    })

    expected := []int{2, 4, 6}
    var got []int

    for value := range output {
        got = append(got, value)
    }

    if len(got) != len(expected) {
        t.Fatalf("Length mismatch: expected %d, got %d", len(expected), len(got))
    }

    // Check the values too, not only how many arrived
    for i := range expected {
        if got[i] != expected[i] {
            t.Errorf("Index %d: expected %d, got %d", i, expected[i], got[i])
        }
    }
}

func TestFanIn(t *testing.T) {
    ch1 := make(chan int, 2)
    ch2 := make(chan int, 2)

    ch1 <- 1
    ch1 <- 2
    close(ch1)

    ch2 <- 3
    ch2 <- 4
    close(ch2)

    merged := FanIn(ch1, ch2)

    var count int
    for range merged {
        count++
    }

    if count != 4 {
        t.Errorf("Expected 4 values, got %d", count)
    }
}

func TestWithTimeout_Success(t *testing.T) {
    ch := make(chan int, 1)
    ch <- 42

    value, err := WithTimeout(ch, 1*time.Second)
    if err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }

    if value != 42 {
        t.Errorf("Expected 42, got %d", value)
    }
}

func TestWithTimeout_Timeout(t *testing.T) {
    ch := make(chan int)

    _, err := WithTimeout(ch, 100*time.Millisecond)
    if err == nil {
        t.Error("Expected timeout error")
    }
}

Bonus Challenges

  1. Broadcast: Send the same value to multiple channels

func Broadcast(value int, n int) []<-chan int

  2. Buffer: Implement a buffered pipeline stage

func Buffer(input <-chan int, size int) <-chan int

  3. Rate Limiter: Limit throughput using channels

func RateLimit(input <-chan int, rate time.Duration) <-chan int

  4. Worker Pool: Reusable worker pool pattern

type WorkerPool struct {
    workers int
    jobs    chan func()
}

func NewWorkerPool(workers int) *WorkerPool
func (p *WorkerPool) Submit(job func())

Key Takeaways

  • Channels are typed - each channel carries values of one type
  • Unbuffered channels block until both sender and receiver are ready
  • Buffered channels block senders only when the buffer is full and receivers only when it is empty
  • Close channels from sender to signal completion
  • Range over channels automatically stops when channel closes
  • Select waits on several channel operations at once; add a default case for non-blocking operations or a time.After case for timeouts
  • Direction matters - use <-chan and chan<- for safety
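
As a brief illustration of the last point, the sketch below passes one bidirectional channel to two functions that each see only one direction. The function names produce and total are hypothetical.

```go
package main

import "fmt"

// produce can only send on out; a receive from out would not compile.
func produce(out chan<- int, n int) {
    defer close(out) // closing a send-only channel is allowed
    for i := 1; i <= n; i++ {
        out <- i
    }
}

// total can only receive from in; a send or close on in would not compile.
func total(in <-chan int) int {
    sum := 0
    for v := range in {
        sum += v
    }
    return sum
}

func main() {
    ch := make(chan int) // bidirectional; converts implicitly to either direction
    go produce(ch, 4)
    fmt.Println(total(ch)) // 10
}
```

Restricting direction in function signatures lets the compiler catch mistakes such as a consumer accidentally closing the channel it reads from.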

Channels are Go's primary mechanism for communication between goroutines. Understanding channel patterns and proper channel usage is essential for writing concurrent Go programs. Always ensure goroutines can exit to avoid leaks.