Message Queue

Exercise: Message Queue

Difficulty - Intermediate

Learning Objectives

  • Implement in-memory message queue
  • Support multiple consumers and producers
  • Handle message acknowledgment
  • Implement priority queuing
  • Add dead letter queue for failures

Problem Statement

Create an in-memory message queue with support for multiple topics, message acknowledgment with retries, and a dead letter queue for messages that repeatedly fail processing.

Core Components

 1package messagequeue
 2
 3import (
 4    "sync"
 5    "time"
 6)
 7
 8type Message struct {
 9    ID        string
10    Topic     string
11    Payload   []byte
12    Priority  int
13    Timestamp time.Time
14    Retries   int
15}
16
17type Queue struct {
18    topics   map[string]*topic
19    mu       sync.RWMutex
20    dlq      *topic
21}
22
23type topic struct {
24    messages chan *Message
25    mu       sync.RWMutex
26}
27
28func New(topicBufferSize int) *Queue
29func Publish(topic string, payload []byte, priority int) error
30func Subscribe(topic string)
31func Ack(msg *Message) error
32func Nack(msg *Message) error
33func DeadLetterQueue() <-chan *Message

Solution

Click to see the solution
  1package messagequeue
  2
  3import (
  4    "errors"
  5    "fmt"
  6    "sync"
  7    "time"
  8)
  9
 10const (
 11    maxRetries = 3
 12)
 13
 14type Message struct {
 15    ID        string
 16    Topic     string
 17    Payload   []byte
 18    Priority  int
 19    Timestamp time.Time
 20    Retries   int
 21}
 22
 23type Queue struct {
 24    topics      map[string]*topic
 25    mu          sync.RWMutex
 26    dlq         chan *Message
 27    bufferSize  int
 28    nextID      int
 29    idMu        sync.Mutex
 30}
 31
 32type topic struct {
 33    messages chan *Message
 34}
 35
 36func New(bufferSize int) *Queue {
 37    return &Queue{
 38        topics:     make(map[string]*topic),
 39        dlq:        make(chan *Message, bufferSize),
 40        bufferSize: bufferSize,
 41    }
 42}
 43
 44func getOrCreateTopic(name string) *topic {
 45    q.mu.Lock()
 46    defer q.mu.Unlock()
 47
 48    if t, exists := q.topics[name]; exists {
 49        return t
 50    }
 51
 52    t := &topic{
 53        messages: make(chan *Message, q.bufferSize),
 54    }
 55    q.topics[name] = t
 56    return t
 57}
 58
 59func nextMessageID() string {
 60    q.idMu.Lock()
 61    defer q.idMu.Unlock()
 62    q.nextID++
 63    return fmt.Sprintf("msg-%d", q.nextID)
 64}
 65
 66func Publish(topicName string, payload []byte, priority int) error {
 67    t := q.getOrCreateTopic(topicName)
 68
 69    msg := &Message{
 70        ID:        q.nextMessageID(),
 71        Topic:     topicName,
 72        Payload:   payload,
 73        Priority:  priority,
 74        Timestamp: time.Now(),
 75        Retries:   0,
 76    }
 77
 78    select {
 79    case t.messages <- msg:
 80        return nil
 81    default:
 82        return errors.New("topic buffer full")
 83    }
 84}
 85
 86func Subscribe(topicName string) {
 87    t := q.getOrCreateTopic(topicName)
 88    return t.messages, nil
 89}
 90
 91func Ack(msg *Message) error {
 92    // Message successfully processed, do nothing
 93    return nil
 94}
 95
 96func Nack(msg *Message) error {
 97    msg.Retries++
 98
 99    if msg.Retries >= maxRetries {
100        // Send to dead letter queue
101        select {
102        case q.dlq <- msg:
103            return nil
104        default:
105            return errors.New("dead letter queue full")
106        }
107    }
108
109    // Requeue message
110    t := q.getOrCreateTopic(msg.Topic)
111    select {
112    case t.messages <- msg:
113        return nil
114    default:
115        return errors.New("failed to requeue message")
116    }
117}
118
119func DeadLetterQueue() <-chan *Message {
120    return q.dlq
121}
122
123func Close() {
124    q.mu.Lock()
125    defer q.mu.Unlock()
126
127    for _, t := range q.topics {
128        close(t.messages)
129    }
130    close(q.dlq)
131}

Key Takeaways

  • Message queues decouple producers and consumers
  • Topics enable pub/sub pattern
  • Acknowledgment ensures reliable delivery
  • Dead letter queue handles permanent failures
  • Priority queuing (carried on each message here, and left as an extension to implement) enables SLA differentiation