Exercise: Message Queue
Difficulty - Intermediate
Learning Objectives
- Implement in-memory message queue
- Support multiple consumers and producers
- Handle message acknowledgment
- Implement priority queuing
- Add dead letter queue for failures
Problem Statement
Create an in-memory message queue with support for multiple topics, consumer groups, and message persistence.
Core Components
1package messagequeue
2
3import (
4 "sync"
5 "time"
6)
7
8type Message struct {
9 ID string
10 Topic string
11 Payload []byte
12 Priority int
13 Timestamp time.Time
14 Retries int
15}
16
17type Queue struct {
18 topics map[string]*topic
19 mu sync.RWMutex
20 dlq *topic
21}
22
23type topic struct {
24 messages chan *Message
25 mu sync.RWMutex
26}
27
28func New(topicBufferSize int) *Queue
29func Publish(topic string, payload []byte, priority int) error
30func Subscribe(topic string)
31func Ack(msg *Message) error
32func Nack(msg *Message) error
33func DeadLetterQueue() <-chan *Message
Solution
Click to see the solution
1package messagequeue
2
3import (
4 "errors"
5 "fmt"
6 "sync"
7 "time"
8)
9
10const (
11 maxRetries = 3
12)
13
14type Message struct {
15 ID string
16 Topic string
17 Payload []byte
18 Priority int
19 Timestamp time.Time
20 Retries int
21}
22
23type Queue struct {
24 topics map[string]*topic
25 mu sync.RWMutex
26 dlq chan *Message
27 bufferSize int
28 nextID int
29 idMu sync.Mutex
30}
31
32type topic struct {
33 messages chan *Message
34}
35
36func New(bufferSize int) *Queue {
37 return &Queue{
38 topics: make(map[string]*topic),
39 dlq: make(chan *Message, bufferSize),
40 bufferSize: bufferSize,
41 }
42}
43
44func getOrCreateTopic(name string) *topic {
45 q.mu.Lock()
46 defer q.mu.Unlock()
47
48 if t, exists := q.topics[name]; exists {
49 return t
50 }
51
52 t := &topic{
53 messages: make(chan *Message, q.bufferSize),
54 }
55 q.topics[name] = t
56 return t
57}
58
59func nextMessageID() string {
60 q.idMu.Lock()
61 defer q.idMu.Unlock()
62 q.nextID++
63 return fmt.Sprintf("msg-%d", q.nextID)
64}
65
66func Publish(topicName string, payload []byte, priority int) error {
67 t := q.getOrCreateTopic(topicName)
68
69 msg := &Message{
70 ID: q.nextMessageID(),
71 Topic: topicName,
72 Payload: payload,
73 Priority: priority,
74 Timestamp: time.Now(),
75 Retries: 0,
76 }
77
78 select {
79 case t.messages <- msg:
80 return nil
81 default:
82 return errors.New("topic buffer full")
83 }
84}
85
86func Subscribe(topicName string) {
87 t := q.getOrCreateTopic(topicName)
88 return t.messages, nil
89}
90
91func Ack(msg *Message) error {
92 // Message successfully processed, do nothing
93 return nil
94}
95
96func Nack(msg *Message) error {
97 msg.Retries++
98
99 if msg.Retries >= maxRetries {
100 // Send to dead letter queue
101 select {
102 case q.dlq <- msg:
103 return nil
104 default:
105 return errors.New("dead letter queue full")
106 }
107 }
108
109 // Requeue message
110 t := q.getOrCreateTopic(msg.Topic)
111 select {
112 case t.messages <- msg:
113 return nil
114 default:
115 return errors.New("failed to requeue message")
116 }
117}
118
119func DeadLetterQueue() <-chan *Message {
120 return q.dlq
121}
122
123func Close() {
124 q.mu.Lock()
125 defer q.mu.Unlock()
126
127 for _, t := range q.topics {
128 close(t.messages)
129 }
130 close(q.dlq)
131}
Key Takeaways
- Message queues decouple producers and consumers
- Topics enable pub/sub pattern
- Acknowledgment ensures reliable delivery
- Dead letter queue handles permanent failures
- Priority queuing enables SLA differentiation