Pub/Sub Message Broker

Exercise: Pub/Sub Message Broker

Difficulty - Intermediate

Learning Objectives

  • Implement publish/subscribe messaging patterns
  • Manage concurrent subscribers safely
  • Handle slow/blocked subscribers gracefully
  • Design topic-based routing systems
  • Implement wildcard topic matching
  • Build message filtering and transformation

Problem Statement

Create an in-memory publish/subscribe message broker that allows publishers to send messages to topics and subscribers to receive messages from topics they're interested in. This pattern is fundamental in distributed systems, event-driven architectures, microservices communication, and real-time applications. Your implementation should handle concurrency, backpressure, and flexible topic routing.

Requirements

1. Broker Core

Implement a Broker type that:

  • Manages subscribers for multiple topics
  • Routes messages from publishers to subscribers
  • Supports concurrent publishing and subscribing
  • Provides thread-safe operations
  • Gracefully handles subscriber cleanup
  • Returns subscription handles for unsubscribing

Example Usage:

1broker := NewBroker()
2sub := broker.Subscribe("events.user.login")
3broker.Publish("events.user.login", "user123 logged in")
4msg := <-sub  // Receives "user123 logged in"

2. Subscribe Operations

Support flexible subscription patterns:

  • Subscribe to exact topic names
  • Return a channel for receiving messages
  • Allow multiple subscribers per topic
  • Configure subscriber buffer size
  • Automatically create topics on first subscription
  • Support concurrent subscriptions

Example Usage:

1// Multiple subscribers on same topic
2sub1 := broker.Subscribe("orders")
3sub2 := broker.Subscribe("orders")
4// Both sub1 and sub2 receive all messages to "orders"
5
6// Configure buffer size
7sub := broker.SubscribeBuffered("logs", 100)

3. Publish Operations

Deliver messages to all subscribers:

  • Publish messages to specific topics
  • Broadcast to all subscribers of that topic
  • Handle slow subscribers without blocking publisher
  • Support any message type
  • Return number of subscribers notified
  • Never block on individual subscriber

Example Usage:

1type UserEvent struct {
2    UserID string
3    Action string
4}
5
6event := UserEvent{UserID: "123", Action: "login"}
7count := broker.Publish("events.user", event)
8fmt.Printf("Delivered to %d subscribers\n", count)

4. Unsubscribe Operations

Clean up subscriptions properly:

  • Unsubscribe using subscription channel
  • Close subscriber channel on unsubscribe
  • Remove from broker's routing table
  • Support unsubscribing from multiple topics
  • Handle already-closed subscriptions gracefully
  • Prevent memory leaks from abandoned subscriptions

Example Usage:

1sub := broker.Subscribe("events")
2// Use subscription...
3broker.Unsubscribe("events", sub)
4// sub channel is now closed

5. Wildcard Topic Matching

Support pattern-based subscriptions:

  • Use hierarchical topic names
  • Support * for single-level wildcard
  • Support # for multi-level wildcard
  • Match messages to all matching patterns
  • Efficient matching algorithm
  • Standard MQTT-style topic matching

Example Usage:

1// Subscribe to all user events
2sub := broker.SubscribePattern("events.user.*")
3// Receives: events.user.login, events.user.logout, etc.
4
5// Subscribe to all events
6sub := broker.SubscribePattern("events.#")
7// Receives: events.user.login, events.system.startup, etc.

6. Subscriber Options

Configure subscriber behavior:

  • Set buffer size
  • Choose blocking vs dropping strategy for slow consumers
  • Add message filters
  • Transform messages before delivery
  • Set subscriber priority
  • Configure automatic unsubscribe conditions

Example Usage:

1opts := SubscriberOptions{
2    BufferSize: 50,
3    DropOnFull: true,  // Drop messages instead of blocking
4    Filter: func(msg Message) bool {
5        return msg.Priority == "high"
6    },
7}
8sub := broker.SubscribeWithOptions("alerts", opts)

Type Signatures

 1package pubsub
 2
 3import "sync"
 4
 5// Message represents a published message
 6type Message struct {
 7    Topic   string
 8    Data    interface{}
 9    ID      string
10    Timestamp time.Time
11}
12
13// Subscriber is a channel for receiving messages
14type Subscriber chan Message
15
16// FilterFunc determines if a message should be delivered
17type FilterFunc func(Message) bool
18
19// SubscriberOptions configures subscriber behavior
20type SubscriberOptions struct {
21    BufferSize int
22    DropOnFull bool
23    Filter     FilterFunc
24}
25
26// Broker manages pub/sub message routing
27type Broker struct {
28    mu          sync.RWMutex
29    subscribers map[string][]Subscriber
30    patterns    map[string][]Subscriber  // For wildcard subscriptions
31}
32
33// Core functions
34func NewBroker() *Broker
35func Subscribe(topic string) Subscriber
36func SubscribeBuffered(topic string, size int) Subscriber
37func SubscribeWithOptions(topic string, opts SubscriberOptions) Subscriber
38func Publish(topic string, data interface{}) int
39func PublishMessage(msg Message) int
40func Unsubscribe(topic string, sub Subscriber)
41
42// Pattern matching
43func SubscribePattern(pattern string) Subscriber
44func MatchesTopic(pattern, topic string) bool
45
46// Utility functions
47func Topics() []string
48func SubscriberCount(topic string) int
49func Close()  // Close all subscriptions

Example: Event-Driven System

Here's a realistic example using pub/sub for system events:

 1package main
 2
 3import (
 4    "fmt"
 5    "log"
 6    "time"
 7    "your-module/pubsub"
 8)
 9
10type UserEvent struct {
11    UserID string
12    Action string
13    Time   time.Time
14}
15
16type SystemEvent struct {
17    Component string
18    Status    string
19}
20
21func main() {
22    broker := pubsub.NewBroker()
23
24    // Analytics subscriber - gets all events
25    analytics := broker.SubscribePattern("events.#")
26    go func() {
27        for msg := range analytics {
28            log.Printf("[Analytics] Received: %s", msg.Topic)
29            // Send to analytics system
30        }
31    }()
32
33    // User service - only user events
34    userService := broker.SubscribePattern("events.user.*")
35    go func() {
36        for msg := range userService {
37            event := msg.Data.(UserEvent)
38            log.Printf("[UserService] %s: %s", event.UserID, event.Action)
39            // Update user statistics
40        }
41    }()
42
43    // Alert service - high-priority only
44    opts := pubsub.SubscriberOptions{
45        BufferSize: 10,
46        Filter: func(msg pubsub.Message) bool {
47            // Only alert on logout events
48            if user, ok := msg.Data.(UserEvent); ok {
49                return user.Action == "logout"
50            }
51            return false
52        },
53    }
54    alerts := broker.SubscribeWithOptions("events.user.*", opts)
55    go func() {
56        for msg := range alerts {
57            event := msg.Data.(UserEvent)
58            log.Printf("[Alerts] User %s logged out", event.UserID)
59            // Send alert
60        }
61    }()
62
63    // Publish events
64    broker.Publish("events.user.login", UserEvent{
65        UserID: "user123",
66        Action: "login",
67        Time:   time.Now(),
68    })
69
70    broker.Publish("events.user.logout", UserEvent{
71        UserID: "user456",
72        Action: "logout",
73        Time:   time.Now(),
74    })
75
76    broker.Publish("events.system.startup", SystemEvent{
77        Component: "database",
78        Status:    "ready",
79    })
80
81    time.Sleep(100 * time.Millisecond)
82    broker.Close()
83}

Test Cases

Your implementation should pass these scenarios:

  1// Test basic pub/sub
  2func TestBasicPubSub() {
  3    broker := NewBroker()
  4    sub := broker.Subscribe("test")
  5
  6    go broker.Publish("test", "hello")
  7
  8    msg := <-sub
  9    // msg.Data should be "hello"
 10    // msg.Topic should be "test"
 11}
 12
 13// Test multiple subscribers
 14func TestMultipleSubscribers() {
 15    broker := NewBroker()
 16    sub1 := broker.Subscribe("news")
 17    sub2 := broker.Subscribe("news")
 18
 19    broker.Publish("news", "breaking")
 20
 21    msg1 := <-sub1
 22    msg2 := <-sub2
 23    // Both should receive "breaking"
 24}
 25
 26// Test no subscribers
 27func TestNoSubscribers() {
 28    broker := NewBroker()
 29    count := broker.Publish("empty", "data")
 30    // count should be 0
 31}
 32
 33// Test unsubscribe
 34func TestUnsubscribe() {
 35    broker := NewBroker()
 36    sub := broker.Subscribe("topic")
 37
 38    broker.Unsubscribe("topic", sub)
 39
 40    _, ok := <-sub
 41    // ok should be false
 42}
 43
 44// Test slow subscriber
 45func TestSlowSubscriber() {
 46    broker := NewBroker()
 47    sub := broker.SubscribeBuffered("topic", 2)
 48
 49    // Fill buffer
 50    broker.Publish("topic", 1)
 51    broker.Publish("topic", 2)
 52
 53    // This should not block publisher
 54    start := time.Now()
 55    broker.Publish("topic", 3)
 56    elapsed := time.Since(start)
 57
 58    // Should complete quickly
 59    // Without proper handling, would block forever
 60}
 61
 62// Test wildcard single level
 63func TestWildcardSingleLevel() {
 64    broker := NewBroker()
 65    sub := broker.SubscribePattern("events.*.created")
 66
 67    broker.Publish("events.user.created", "data1")
 68    broker.Publish("events.order.created", "data2")
 69    broker.Publish("events.user.deleted", "data3")  // Shouldn't match
 70
 71    msg1 := <-sub  // Should receive "data1"
 72    msg2 := <-sub  // Should receive "data2"
 73
 74    select {
 75    case <-sub:
 76        // Should not receive anything else
 77        t.Error("Received unexpected message")
 78    case <-time.After(50 * time.Millisecond):
 79        // Correct - no more messages
 80    }
 81}
 82
 83// Test wildcard multi level
 84func TestWildcardMultiLevel() {
 85    broker := NewBroker()
 86    sub := broker.SubscribePattern("events.#")
 87
 88    broker.Publish("events.user", "data1")
 89    broker.Publish("events.user.login", "data2")
 90    broker.Publish("events.system.startup.complete", "data3")
 91    broker.Publish("other.topic", "data4")  // Shouldn't match
 92
 93    received := 0
 94    timeout := time.After(100 * time.Millisecond)
 95    for received < 3 {
 96        select {
 97        case <-sub:
 98            received++
 99        case <-timeout:
100            break
101        }
102    }
103    // received should be 3
104}
105
106// Test filter function
107func TestMessageFilter() {
108    broker := NewBroker()
109    opts := SubscriberOptions{
110        Filter: func(msg Message) bool {
111            return msg.Data.(int) > 5
112        },
113    }
114    sub := broker.SubscribeWithOptions("numbers", opts)
115
116    broker.Publish("numbers", 3)  // Filtered out
117    broker.Publish("numbers", 7)  // Passes filter
118    broker.Publish("numbers", 2)  // Filtered out
119    broker.Publish("numbers", 9)  // Passes filter
120
121    msg1 := <-sub
122    msg2 := <-sub
123    // Should only receive 7 and 9
124}
125
126// Test concurrent publishing
127func TestConcurrentPublish() {
128    broker := NewBroker()
129    sub := broker.SubscribeBuffered("topic", 1000)
130
131    var wg sync.WaitGroup
132    for i := 0; i < 100; i++ {
133        wg.Add(1)
134        go func(val int) {
135            defer wg.Done()
136            broker.Publish("topic", val)
137        }(i)
138    }
139
140    wg.Wait()
141
142    count := 0
143    timeout := time.After(100 * time.Millisecond)
144    for {
145        select {
146        case <-sub:
147            count++
148        case <-timeout:
149            goto done
150        }
151    }
152    done:
153    // count should be 100
154}

Common Pitfalls

⚠️ Watch out for these common mistakes:

  1. Publisher blocking: Using unbuffered channels or blocking sends causes publishers to hang on slow subscribers
  2. Memory leaks: Not closing subscriber channels or removing from maps causes goroutine/memory leaks
  3. Race conditions: Concurrent map access without locks causes panics
  4. Sending to closed channel: Attempting to publish after subscriber unsubscribed causes panic
  5. Wildcard matching bugs: Incorrect pattern matching logic fails to match valid topics
  6. Not copying subscriber lists: Iterating over map while modifying it causes race conditions
  7. Infinite buffering: Unbounded buffers can cause memory exhaustion with fast publishers

Hints

💡 Hint 1: Non-Blocking Publish

Use select with default to avoid blocking on full channels:

1for _, sub := range subscribers {
2    select {
3    case sub <- msg:
4        count++
5    default:
6        // Subscriber full, skip or handle
7    }
8}
💡 Hint 2: Safe Subscriber Iteration

Copy the subscriber slice before iterating:

1b.mu.RLock()
2subs := make([]Subscriber, len(b.subscribers[topic]))
3copy(subs, b.subscribers[topic])
4b.mu.RUnlock()
5
6// Now iterate without holding lock
7for _, sub := range subs {
8    sub <- msg
9}
💡 Hint 3: Wildcard Matching

Implement topic matching with string splitting:

 1func matchesTopic(pattern, topic string) bool {
 2    patternParts := strings.Split(pattern, ".")
 3    topicParts := strings.Split(topic, ".")
 4
 5    for i, p := range patternParts {
 6        if p == "#" {
 7            return true  // Match everything after
 8        }
 9        if i >= len(topicParts) {
10            return false
11        }
12        if p != "*" && p != topicParts[i] {
13            return false
14        }
15    }
16    return len(patternParts) == len(topicParts)
17}
💡 Hint 4: Safe Unsubscribe

Remove from slice safely:

1for i, s := range b.subscribers[topic] {
2    if s == sub {
3        close(s)
4        // Remove by swapping with last element
5        b.subscribers[topic][i] = b.subscribers[topic][len(b.subscribers[topic])-1]
6        b.subscribers[topic] = b.subscribers[topic][:len(b.subscribers[topic])-1]
7        break
8    }
9}

Solution

Click to see the solution
  1package pubsub
  2
  3import (
  4    "strings"
  5    "sync"
  6    "time"
  7)
  8
  9type Message struct {
 10    Topic     string
 11    Data      interface{}
 12    ID        string
 13    Timestamp time.Time
 14}
 15
 16type Subscriber chan Message
 17
 18type FilterFunc func(Message) bool
 19
 20type SubscriberOptions struct {
 21    BufferSize int
 22    DropOnFull bool
 23    Filter     FilterFunc
 24}
 25
 26type subscription struct {
 27    ch      Subscriber
 28    options SubscriberOptions
 29}
 30
 31type Broker struct {
 32    mu          sync.RWMutex
 33    subscribers map[string][]*subscription
 34    patterns    map[string][]*subscription
 35    nextID      int
 36}
 37
 38func NewBroker() *Broker {
 39    return &Broker{
 40        subscribers: make(map[string][]*subscription),
 41        patterns:    make(map[string][]*subscription),
 42    }
 43}
 44
 45func Subscribe(topic string) Subscriber {
 46    return b.SubscribeBuffered(topic, 10)
 47}
 48
 49func SubscribeBuffered(topic string, size int) Subscriber {
 50    return b.SubscribeWithOptions(topic, SubscriberOptions{
 51        BufferSize: size,
 52        DropOnFull: true,
 53    })
 54}
 55
 56func SubscribeWithOptions(topic string, opts SubscriberOptions) Subscriber {
 57    b.mu.Lock()
 58    defer b.mu.Unlock()
 59
 60    sub := &subscription{
 61        ch:      make(Subscriber, opts.BufferSize),
 62        options: opts,
 63    }
 64
 65    b.subscribers[topic] = append(b.subscribers[topic], sub)
 66
 67    return sub.ch
 68}
 69
 70func SubscribePattern(pattern string) Subscriber {
 71    b.mu.Lock()
 72    defer b.mu.Unlock()
 73
 74    sub := &subscription{
 75        ch: make(Subscriber, 10),
 76        options: SubscriberOptions{
 77            BufferSize: 10,
 78            DropOnFull: true,
 79        },
 80    }
 81
 82    b.patterns[pattern] = append(b.patterns[pattern], sub)
 83
 84    return sub.ch
 85}
 86
 87func Publish(topic string, data interface{}) int {
 88    msg := Message{
 89        Topic:     topic,
 90        Data:      data,
 91        Timestamp: time.Now(),
 92    }
 93    return b.PublishMessage(msg)
 94}
 95
 96func PublishMessage(msg Message) int {
 97    b.mu.RLock()
 98
 99    // Collect exact match subscribers
100    exactSubs := make([]*subscription, len(b.subscribers[msg.Topic]))
101    copy(exactSubs, b.subscribers[msg.Topic])
102
103    // Collect pattern match subscribers
104    var patternSubs []*subscription
105    for pattern, subs := range b.patterns {
106        if b.matchesTopic(pattern, msg.Topic) {
107            patternSubs = append(patternSubs, subs...)
108        }
109    }
110
111    b.mu.RUnlock()
112
113    // Deliver to all subscribers
114    allSubs := append(exactSubs, patternSubs...)
115    count := 0
116
117    for _, sub := range allSubs {
118        // Apply filter if configured
119        if sub.options.Filter != nil && !sub.options.Filter(msg) {
120            continue
121        }
122
123        // Non-blocking send
124        select {
125        case sub.ch <- msg:
126            count++
127        default:
128            if !sub.options.DropOnFull {
129                // Try to send with timeout
130                select {
131                case sub.ch <- msg:
132                    count++
133                case <-time.After(10 * time.Millisecond):
134                    // Subscriber too slow, skip
135                }
136            }
137            // If DropOnFull is true, just skip
138        }
139    }
140
141    return count
142}
143
144func Unsubscribe(topic string, sub Subscriber) {
145    b.mu.Lock()
146    defer b.mu.Unlock()
147
148    // Remove from exact subscriptions
149    if subs, ok := b.subscribers[topic]; ok {
150        for i, s := range subs {
151            if s.ch == sub {
152                close(s.ch)
153                b.subscribers[topic] = append(subs[:i], subs[i+1:]...)
154                if len(b.subscribers[topic]) == 0 {
155                    delete(b.subscribers, topic)
156                }
157                return
158            }
159        }
160    }
161
162    // Remove from pattern subscriptions
163    for pattern, subs := range b.patterns {
164        for i, s := range subs {
165            if s.ch == sub {
166                close(s.ch)
167                b.patterns[pattern] = append(subs[:i], subs[i+1:]...)
168                if len(b.patterns[pattern]) == 0 {
169                    delete(b.patterns, pattern)
170                }
171                return
172            }
173        }
174    }
175}
176
177func matchesTopic(pattern, topic string) bool {
178    patternParts := strings.Split(pattern, ".")
179    topicParts := strings.Split(topic, ".")
180
181    pIndex := 0
182    tIndex := 0
183
184    for pIndex < len(patternParts) && tIndex < len(topicParts) {
185        if patternParts[pIndex] == "#" {
186            return true
187        }
188        if patternParts[pIndex] == "*" {
189            pIndex++
190            tIndex++
191            continue
192        }
193        if patternParts[pIndex] != topicParts[tIndex] {
194            return false
195        }
196        pIndex++
197        tIndex++
198    }
199
200    // Handle trailing #
201    if pIndex < len(patternParts) && patternParts[pIndex] == "#" {
202        return true
203    }
204
205    return pIndex == len(patternParts) && tIndex == len(topicParts)
206}
207
208func Topics() []string {
209    b.mu.RLock()
210    defer b.mu.RUnlock()
211
212    topics := make([]string, 0, len(b.subscribers))
213    for topic := range b.subscribers {
214        topics = append(topics, topic)
215    }
216    return topics
217}
218
219func SubscriberCount(topic string) int {
220    b.mu.RLock()
221    defer b.mu.RUnlock()
222
223    return len(b.subscribers[topic])
224}
225
226func Close() {
227    b.mu.Lock()
228    defer b.mu.Unlock()
229
230    // Close all exact subscriptions
231    for topic, subs := range b.subscribers {
232        for _, sub := range subs {
233            close(sub.ch)
234        }
235        delete(b.subscribers, topic)
236    }
237
238    // Close all pattern subscriptions
239    for pattern, subs := range b.patterns {
240        for _, sub := range subs {
241            close(sub.ch)
242        }
243        delete(b.patterns, pattern)
244    }
245}

Key Takeaways

  • Pub/sub decouples publishers from subscribers
  • Non-blocking sends prevent slow subscribers from blocking publishers
  • Wildcard patterns enable flexible topic routing
  • Thread safety is essential for concurrent access
  • Message filtering reduces unnecessary processing
  • Proper cleanup prevents memory and goroutine leaks