Exercise: Pub/Sub Message Broker
Difficulty - Intermediate
Learning Objectives
- Implement publish/subscribe messaging patterns
- Manage concurrent subscribers safely
- Handle slow/blocked subscribers gracefully
- Design topic-based routing systems
- Implement wildcard topic matching
- Build message filtering and transformation
Problem Statement
Create an in-memory publish/subscribe message broker that allows publishers to send messages to topics and subscribers to receive messages from topics they're interested in. This pattern is fundamental in distributed systems, event-driven architectures, microservices communication, and real-time applications. Your implementation should handle concurrency, backpressure, and flexible topic routing.
Requirements
1. Broker Core
Implement a Broker type that:
- Manages subscribers for multiple topics
- Routes messages from publishers to subscribers
- Supports concurrent publishing and subscribing
- Provides thread-safe operations
- Gracefully handles subscriber cleanup
- Returns subscription handles for unsubscribing
Example Usage:
1broker := NewBroker()
2sub := broker.Subscribe("events.user.login")
3broker.Publish("events.user.login", "user123 logged in")
4msg := <-sub // Receives "user123 logged in"
2. Subscribe Operations
Support flexible subscription patterns:
- Subscribe to exact topic names
- Return a channel for receiving messages
- Allow multiple subscribers per topic
- Configure subscriber buffer size
- Automatically create topics on first subscription
- Support concurrent subscriptions
Example Usage:
1// Multiple subscribers on same topic
2sub1 := broker.Subscribe("orders")
3sub2 := broker.Subscribe("orders")
4// Both sub1 and sub2 receive all messages to "orders"
5
6// Configure buffer size
7sub := broker.SubscribeBuffered("logs", 100)
3. Publish Operations
Deliver messages to all subscribers:
- Publish messages to specific topics
- Broadcast to all subscribers of that topic
- Handle slow subscribers without blocking publisher
- Support any message type
- Return number of subscribers notified
- Never block on individual subscriber
Example Usage:
1type UserEvent struct {
2 UserID string
3 Action string
4}
5
6event := UserEvent{UserID: "123", Action: "login"}
7count := broker.Publish("events.user", event)
8fmt.Printf("Delivered to %d subscribers\n", count)
4. Unsubscribe Operations
Clean up subscriptions properly:
- Unsubscribe using subscription channel
- Close subscriber channel on unsubscribe
- Remove from broker's routing table
- Support unsubscribing from multiple topics
- Handle already-closed subscriptions gracefully
- Prevent memory leaks from abandoned subscriptions
Example Usage:
1sub := broker.Subscribe("events")
2// Use subscription...
3broker.Unsubscribe("events", sub)
4// sub channel is now closed
5. Wildcard Topic Matching
Support pattern-based subscriptions:
- Use hierarchical topic names
- Support
*for single-level wildcard - Support
#for multi-level wildcard - Match messages to all matching patterns
- Efficient matching algorithm
- Standard MQTT-style topic matching
Example Usage:
1// Subscribe to all user events
2sub := broker.SubscribePattern("events.user.*")
3// Receives: events.user.login, events.user.logout, etc.
4
5// Subscribe to all events
6sub := broker.SubscribePattern("events.#")
7// Receives: events.user.login, events.system.startup, etc.
6. Subscriber Options
Configure subscriber behavior:
- Set buffer size
- Choose blocking vs dropping strategy for slow consumers
- Add message filters
- Transform messages before delivery
- Set subscriber priority
- Configure automatic unsubscribe conditions
Example Usage:
1opts := SubscriberOptions{
2 BufferSize: 50,
3 DropOnFull: true, // Drop messages instead of blocking
4 Filter: func(msg Message) bool {
5 return msg.Priority == "high"
6 },
7}
8sub := broker.SubscribeWithOptions("alerts", opts)
Type Signatures
1package pubsub
2
3import "sync"
4
5// Message represents a published message
6type Message struct {
7 Topic string
8 Data interface{}
9 ID string
10 Timestamp time.Time
11}
12
13// Subscriber is a channel for receiving messages
14type Subscriber chan Message
15
16// FilterFunc determines if a message should be delivered
17type FilterFunc func(Message) bool
18
19// SubscriberOptions configures subscriber behavior
20type SubscriberOptions struct {
21 BufferSize int
22 DropOnFull bool
23 Filter FilterFunc
24}
25
26// Broker manages pub/sub message routing
27type Broker struct {
28 mu sync.RWMutex
29 subscribers map[string][]Subscriber
30 patterns map[string][]Subscriber // For wildcard subscriptions
31}
32
33// Core functions
34func NewBroker() *Broker
35func Subscribe(topic string) Subscriber
36func SubscribeBuffered(topic string, size int) Subscriber
37func SubscribeWithOptions(topic string, opts SubscriberOptions) Subscriber
38func Publish(topic string, data interface{}) int
39func PublishMessage(msg Message) int
40func Unsubscribe(topic string, sub Subscriber)
41
42// Pattern matching
43func SubscribePattern(pattern string) Subscriber
44func MatchesTopic(pattern, topic string) bool
45
46// Utility functions
47func Topics() []string
48func SubscriberCount(topic string) int
49func Close() // Close all subscriptions
Example: Event-Driven System
Here's a realistic example using pub/sub for system events:
1package main
2
3import (
4 "fmt"
5 "log"
6 "time"
7 "your-module/pubsub"
8)
9
10type UserEvent struct {
11 UserID string
12 Action string
13 Time time.Time
14}
15
16type SystemEvent struct {
17 Component string
18 Status string
19}
20
21func main() {
22 broker := pubsub.NewBroker()
23
24 // Analytics subscriber - gets all events
25 analytics := broker.SubscribePattern("events.#")
26 go func() {
27 for msg := range analytics {
28 log.Printf("[Analytics] Received: %s", msg.Topic)
29 // Send to analytics system
30 }
31 }()
32
33 // User service - only user events
34 userService := broker.SubscribePattern("events.user.*")
35 go func() {
36 for msg := range userService {
37 event := msg.Data.(UserEvent)
38 log.Printf("[UserService] %s: %s", event.UserID, event.Action)
39 // Update user statistics
40 }
41 }()
42
43 // Alert service - high-priority only
44 opts := pubsub.SubscriberOptions{
45 BufferSize: 10,
46 Filter: func(msg pubsub.Message) bool {
47 // Only alert on logout events
48 if user, ok := msg.Data.(UserEvent); ok {
49 return user.Action == "logout"
50 }
51 return false
52 },
53 }
54 alerts := broker.SubscribeWithOptions("events.user.*", opts)
55 go func() {
56 for msg := range alerts {
57 event := msg.Data.(UserEvent)
58 log.Printf("[Alerts] User %s logged out", event.UserID)
59 // Send alert
60 }
61 }()
62
63 // Publish events
64 broker.Publish("events.user.login", UserEvent{
65 UserID: "user123",
66 Action: "login",
67 Time: time.Now(),
68 })
69
70 broker.Publish("events.user.logout", UserEvent{
71 UserID: "user456",
72 Action: "logout",
73 Time: time.Now(),
74 })
75
76 broker.Publish("events.system.startup", SystemEvent{
77 Component: "database",
78 Status: "ready",
79 })
80
81 time.Sleep(100 * time.Millisecond)
82 broker.Close()
83}
Test Cases
Your implementation should pass these scenarios:
1// Test basic pub/sub
2func TestBasicPubSub() {
3 broker := NewBroker()
4 sub := broker.Subscribe("test")
5
6 go broker.Publish("test", "hello")
7
8 msg := <-sub
9 // msg.Data should be "hello"
10 // msg.Topic should be "test"
11}
12
13// Test multiple subscribers
14func TestMultipleSubscribers() {
15 broker := NewBroker()
16 sub1 := broker.Subscribe("news")
17 sub2 := broker.Subscribe("news")
18
19 broker.Publish("news", "breaking")
20
21 msg1 := <-sub1
22 msg2 := <-sub2
23 // Both should receive "breaking"
24}
25
26// Test no subscribers
27func TestNoSubscribers() {
28 broker := NewBroker()
29 count := broker.Publish("empty", "data")
30 // count should be 0
31}
32
33// Test unsubscribe
34func TestUnsubscribe() {
35 broker := NewBroker()
36 sub := broker.Subscribe("topic")
37
38 broker.Unsubscribe("topic", sub)
39
40 _, ok := <-sub
41 // ok should be false
42}
43
44// Test slow subscriber
45func TestSlowSubscriber() {
46 broker := NewBroker()
47 sub := broker.SubscribeBuffered("topic", 2)
48
49 // Fill buffer
50 broker.Publish("topic", 1)
51 broker.Publish("topic", 2)
52
53 // This should not block publisher
54 start := time.Now()
55 broker.Publish("topic", 3)
56 elapsed := time.Since(start)
57
58 // Should complete quickly
59 // Without proper handling, would block forever
60}
61
62// Test wildcard single level
63func TestWildcardSingleLevel() {
64 broker := NewBroker()
65 sub := broker.SubscribePattern("events.*.created")
66
67 broker.Publish("events.user.created", "data1")
68 broker.Publish("events.order.created", "data2")
69 broker.Publish("events.user.deleted", "data3") // Shouldn't match
70
71 msg1 := <-sub // Should receive "data1"
72 msg2 := <-sub // Should receive "data2"
73
74 select {
75 case <-sub:
76 // Should not receive anything else
77 t.Error("Received unexpected message")
78 case <-time.After(50 * time.Millisecond):
79 // Correct - no more messages
80 }
81}
82
83// Test wildcard multi level
84func TestWildcardMultiLevel() {
85 broker := NewBroker()
86 sub := broker.SubscribePattern("events.#")
87
88 broker.Publish("events.user", "data1")
89 broker.Publish("events.user.login", "data2")
90 broker.Publish("events.system.startup.complete", "data3")
91 broker.Publish("other.topic", "data4") // Shouldn't match
92
93 received := 0
94 timeout := time.After(100 * time.Millisecond)
95 for received < 3 {
96 select {
97 case <-sub:
98 received++
99 case <-timeout:
100 break
101 }
102 }
103 // received should be 3
104}
105
106// Test filter function
107func TestMessageFilter() {
108 broker := NewBroker()
109 opts := SubscriberOptions{
110 Filter: func(msg Message) bool {
111 return msg.Data.(int) > 5
112 },
113 }
114 sub := broker.SubscribeWithOptions("numbers", opts)
115
116 broker.Publish("numbers", 3) // Filtered out
117 broker.Publish("numbers", 7) // Passes filter
118 broker.Publish("numbers", 2) // Filtered out
119 broker.Publish("numbers", 9) // Passes filter
120
121 msg1 := <-sub
122 msg2 := <-sub
123 // Should only receive 7 and 9
124}
125
126// Test concurrent publishing
127func TestConcurrentPublish() {
128 broker := NewBroker()
129 sub := broker.SubscribeBuffered("topic", 1000)
130
131 var wg sync.WaitGroup
132 for i := 0; i < 100; i++ {
133 wg.Add(1)
134 go func(val int) {
135 defer wg.Done()
136 broker.Publish("topic", val)
137 }(i)
138 }
139
140 wg.Wait()
141
142 count := 0
143 timeout := time.After(100 * time.Millisecond)
144 for {
145 select {
146 case <-sub:
147 count++
148 case <-timeout:
149 goto done
150 }
151 }
152 done:
153 // count should be 100
154}
Common Pitfalls
⚠️ Watch out for these common mistakes:
- Publisher blocking: Using unbuffered channels or blocking sends causes publishers to hang on slow subscribers
- Memory leaks: Not closing subscriber channels or removing from maps causes goroutine/memory leaks
- Race conditions: Concurrent map access without locks causes panics
- Sending to closed channel: Attempting to publish after subscriber unsubscribed causes panic
- Wildcard matching bugs: Incorrect pattern matching logic fails to match valid topics
- Not copying subscriber lists: Iterating over map while modifying it causes race conditions
- Infinite buffering: Unbounded buffers can cause memory exhaustion with fast publishers
Hints
💡 Hint 1: Non-Blocking Publish
Use select with default to avoid blocking on full channels:
1for _, sub := range subscribers {
2 select {
3 case sub <- msg:
4 count++
5 default:
6 // Subscriber full, skip or handle
7 }
8}
💡 Hint 2: Safe Subscriber Iteration
Copy the subscriber slice before iterating:
1b.mu.RLock()
2subs := make([]Subscriber, len(b.subscribers[topic]))
3copy(subs, b.subscribers[topic])
4b.mu.RUnlock()
5
6// Now iterate without holding lock
7for _, sub := range subs {
8 sub <- msg
9}
💡 Hint 3: Wildcard Matching
Implement topic matching with string splitting:
1func matchesTopic(pattern, topic string) bool {
2 patternParts := strings.Split(pattern, ".")
3 topicParts := strings.Split(topic, ".")
4
5 for i, p := range patternParts {
6 if p == "#" {
7 return true // Match everything after
8 }
9 if i >= len(topicParts) {
10 return false
11 }
12 if p != "*" && p != topicParts[i] {
13 return false
14 }
15 }
16 return len(patternParts) == len(topicParts)
17}
💡 Hint 4: Safe Unsubscribe
Remove from slice safely:
1for i, s := range b.subscribers[topic] {
2 if s == sub {
3 close(s)
4 // Remove by swapping with last element
5 b.subscribers[topic][i] = b.subscribers[topic][len(b.subscribers[topic])-1]
6 b.subscribers[topic] = b.subscribers[topic][:len(b.subscribers[topic])-1]
7 break
8 }
9}
Solution
Click to see the solution
1package pubsub
2
3import (
4 "strings"
5 "sync"
6 "time"
7)
8
9type Message struct {
10 Topic string
11 Data interface{}
12 ID string
13 Timestamp time.Time
14}
15
16type Subscriber chan Message
17
18type FilterFunc func(Message) bool
19
20type SubscriberOptions struct {
21 BufferSize int
22 DropOnFull bool
23 Filter FilterFunc
24}
25
26type subscription struct {
27 ch Subscriber
28 options SubscriberOptions
29}
30
31type Broker struct {
32 mu sync.RWMutex
33 subscribers map[string][]*subscription
34 patterns map[string][]*subscription
35 nextID int
36}
37
38func NewBroker() *Broker {
39 return &Broker{
40 subscribers: make(map[string][]*subscription),
41 patterns: make(map[string][]*subscription),
42 }
43}
44
45func Subscribe(topic string) Subscriber {
46 return b.SubscribeBuffered(topic, 10)
47}
48
49func SubscribeBuffered(topic string, size int) Subscriber {
50 return b.SubscribeWithOptions(topic, SubscriberOptions{
51 BufferSize: size,
52 DropOnFull: true,
53 })
54}
55
56func SubscribeWithOptions(topic string, opts SubscriberOptions) Subscriber {
57 b.mu.Lock()
58 defer b.mu.Unlock()
59
60 sub := &subscription{
61 ch: make(Subscriber, opts.BufferSize),
62 options: opts,
63 }
64
65 b.subscribers[topic] = append(b.subscribers[topic], sub)
66
67 return sub.ch
68}
69
70func SubscribePattern(pattern string) Subscriber {
71 b.mu.Lock()
72 defer b.mu.Unlock()
73
74 sub := &subscription{
75 ch: make(Subscriber, 10),
76 options: SubscriberOptions{
77 BufferSize: 10,
78 DropOnFull: true,
79 },
80 }
81
82 b.patterns[pattern] = append(b.patterns[pattern], sub)
83
84 return sub.ch
85}
86
87func Publish(topic string, data interface{}) int {
88 msg := Message{
89 Topic: topic,
90 Data: data,
91 Timestamp: time.Now(),
92 }
93 return b.PublishMessage(msg)
94}
95
96func PublishMessage(msg Message) int {
97 b.mu.RLock()
98
99 // Collect exact match subscribers
100 exactSubs := make([]*subscription, len(b.subscribers[msg.Topic]))
101 copy(exactSubs, b.subscribers[msg.Topic])
102
103 // Collect pattern match subscribers
104 var patternSubs []*subscription
105 for pattern, subs := range b.patterns {
106 if b.matchesTopic(pattern, msg.Topic) {
107 patternSubs = append(patternSubs, subs...)
108 }
109 }
110
111 b.mu.RUnlock()
112
113 // Deliver to all subscribers
114 allSubs := append(exactSubs, patternSubs...)
115 count := 0
116
117 for _, sub := range allSubs {
118 // Apply filter if configured
119 if sub.options.Filter != nil && !sub.options.Filter(msg) {
120 continue
121 }
122
123 // Non-blocking send
124 select {
125 case sub.ch <- msg:
126 count++
127 default:
128 if !sub.options.DropOnFull {
129 // Try to send with timeout
130 select {
131 case sub.ch <- msg:
132 count++
133 case <-time.After(10 * time.Millisecond):
134 // Subscriber too slow, skip
135 }
136 }
137 // If DropOnFull is true, just skip
138 }
139 }
140
141 return count
142}
143
144func Unsubscribe(topic string, sub Subscriber) {
145 b.mu.Lock()
146 defer b.mu.Unlock()
147
148 // Remove from exact subscriptions
149 if subs, ok := b.subscribers[topic]; ok {
150 for i, s := range subs {
151 if s.ch == sub {
152 close(s.ch)
153 b.subscribers[topic] = append(subs[:i], subs[i+1:]...)
154 if len(b.subscribers[topic]) == 0 {
155 delete(b.subscribers, topic)
156 }
157 return
158 }
159 }
160 }
161
162 // Remove from pattern subscriptions
163 for pattern, subs := range b.patterns {
164 for i, s := range subs {
165 if s.ch == sub {
166 close(s.ch)
167 b.patterns[pattern] = append(subs[:i], subs[i+1:]...)
168 if len(b.patterns[pattern]) == 0 {
169 delete(b.patterns, pattern)
170 }
171 return
172 }
173 }
174 }
175}
176
177func matchesTopic(pattern, topic string) bool {
178 patternParts := strings.Split(pattern, ".")
179 topicParts := strings.Split(topic, ".")
180
181 pIndex := 0
182 tIndex := 0
183
184 for pIndex < len(patternParts) && tIndex < len(topicParts) {
185 if patternParts[pIndex] == "#" {
186 return true
187 }
188 if patternParts[pIndex] == "*" {
189 pIndex++
190 tIndex++
191 continue
192 }
193 if patternParts[pIndex] != topicParts[tIndex] {
194 return false
195 }
196 pIndex++
197 tIndex++
198 }
199
200 // Handle trailing #
201 if pIndex < len(patternParts) && patternParts[pIndex] == "#" {
202 return true
203 }
204
205 return pIndex == len(patternParts) && tIndex == len(topicParts)
206}
207
208func Topics() []string {
209 b.mu.RLock()
210 defer b.mu.RUnlock()
211
212 topics := make([]string, 0, len(b.subscribers))
213 for topic := range b.subscribers {
214 topics = append(topics, topic)
215 }
216 return topics
217}
218
219func SubscriberCount(topic string) int {
220 b.mu.RLock()
221 defer b.mu.RUnlock()
222
223 return len(b.subscribers[topic])
224}
225
226func Close() {
227 b.mu.Lock()
228 defer b.mu.Unlock()
229
230 // Close all exact subscriptions
231 for topic, subs := range b.subscribers {
232 for _, sub := range subs {
233 close(sub.ch)
234 }
235 delete(b.subscribers, topic)
236 }
237
238 // Close all pattern subscriptions
239 for pattern, subs := range b.patterns {
240 for _, sub := range subs {
241 close(sub.ch)
242 }
243 delete(b.patterns, pattern)
244 }
245}
Key Takeaways
- Pub/sub decouples publishers from subscribers
- Non-blocking sends prevent slow subscribers from blocking publishers
- Wildcard patterns enable flexible topic routing
- Thread safety is essential for concurrent access
- Message filtering reduces unnecessary processing
- Proper cleanup prevents memory and goroutine leaks