Performance Optimization Lab

Exercise Overview

Optimize a high-throughput data processing system that's experiencing performance issues. You'll use Go's profiling tools to identify bottlenecks, analyze memory allocation patterns, and implement performance improvements.

Learning Objectives

  • Use Go's built-in profiling tools effectively
  • Identify and fix CPU bottlenecks and memory leaks
  • Optimize memory allocation and reduce garbage collection pressure
  • Implement efficient concurrency patterns
  • Apply performance optimization best practices
  • Measure and validate performance improvements

The Problem - Slow Data Processing Pipeline

You have a data processing pipeline that processes JSON messages from a message queue, applies transformations, and stores results in a database. The system is experiencing high latency and memory usage under load.

Initial Code

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

// TODO: Profile and optimize this structure
type Message struct {
	ID        string                 `json:"id"`
	Timestamp int64                  `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
	Processed bool                   `json:"processed"`
}

// TODO: Optimize this processor
type MessageProcessor struct {
	inputChan    chan Message
	outputChan   chan ProcessedMessage
	dbPool       *DatabasePool
	transformers []Transformer
	mu           sync.Mutex
	processed    int
	errors       int
}

type ProcessedMessage struct {
	OriginalID string
	Result     map[string]interface{}
	Duration   time.Duration
	Error      error
}

type DatabasePool struct {
	connections []DatabaseConnection
	mu          sync.Mutex
}

type DatabaseConnection struct {
	ID     int
	Active bool
}

type Transformer interface {
	Transform(data map[string]interface{}) (map[string]interface{}, error)
}

// TODO: Optimize these transformers
type JSONTransformer struct {
	name    string
	cache   map[string]interface{}
	cacheMu sync.RWMutex
}

type MathTransformer struct {
	operations []func(float64) float64
}

// TODO: Fix this message generator
type MessageGenerator struct {
	ratePerSecond int
	messageTypes  []string
	running       bool
}

func (mg *MessageGenerator) Start(ctx context.Context) <-chan Message {
	ch := make(chan Message)

	go func() {
		defer close(ch)
		ticker := time.NewTicker(time.Second / time.Duration(mg.ratePerSecond))
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// TODO: Optimize message generation
				message := Message{
					ID:        fmt.Sprintf("msg_%d", rand.Intn(1000000)),
					Timestamp: time.Now().Unix(),
					Data: map[string]interface{}{
						"type":  mg.messageTypes[rand.Intn(len(mg.messageTypes))],
						"value": rand.Float64() * 1000,
						"metadata": map[string]interface{}{
							"source": "generator",
							"tags":   []string{"tag1", "tag2", "tag3"},
						},
					},
				}
				ch <- message
			}
		}
	}()

	return ch
}

// TODO: Optimize this processor
func (mp *MessageProcessor) Start(ctx context.Context) {
	// Create worker pool
	for i := 0; i < 10; i++ {
		go mp.worker(ctx)
	}

	// Process results
	go mp.resultHandler(ctx)
}

func (mp *MessageProcessor) worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-mp.inputChan:
			start := time.Now()

			// Process message
			result, err := mp.processMessage(msg)

			duration := time.Since(start)

			processed := ProcessedMessage{
				OriginalID: msg.ID,
				Result:     result,
				Duration:   duration,
				Error:      err,
			}

			select {
			case mp.outputChan <- processed:
			case <-ctx.Done():
				return
			}

			// Update counters
			mp.mu.Lock()
			if err != nil {
				mp.errors++
			} else {
				mp.processed++
			}
			mp.mu.Unlock()
		}
	}
}

func (mp *MessageProcessor) processMessage(msg Message) (map[string]interface{}, error) {
	result := msg.Data

	// Apply transformers
	for _, transformer := range mp.transformers {
		transformed, err := transformer.Transform(result)
		if err != nil {
			return nil, err
		}
		result = transformed
	}

	// Simulate database operation
	time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)

	return result, nil
}

func (mp *MessageProcessor) resultHandler(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case result := <-mp.outputChan:
			// TODO: Optimize result handling
			if result.Error != nil {
				log.Printf("Error processing message %s: %v", result.OriginalID, result.Error)
				continue
			}

			// Store in database
			err := mp.dbPool.Store(result)
			if err != nil {
				log.Printf("Failed to store result %s: %v", result.OriginalID, err)
			}
		}
	}
}

func (db *DatabasePool) Store(result ProcessedMessage) error {
	// TODO: Optimize database operations
	db.mu.Lock()
	defer db.mu.Unlock()

	// Find available connection
	for _, conn := range db.connections {
		if conn.Active {
			// Simulate database write
			time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
			return nil
		}
	}

	return fmt.Errorf("no available database connections")
}

// TODO: Optimize transformer implementations
func (jt *JSONTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	jt.cacheMu.Lock()
	defer jt.cacheMu.Unlock()

	// TODO: Fix this cache implementation
	cacheKey := fmt.Sprintf("%v", data)
	if cached, exists := jt.cache[cacheKey]; exists {
		return cached.(map[string]interface{}), nil
	}

	// Transform data
	result := make(map[string]interface{})
	for k, v := range data {
		if jsonStr, err := json.Marshal(v); err == nil {
			result[k+"_json"] = string(jsonStr)
		}
	}

	// Cache result
	jt.cache[cacheKey] = result

	return result, nil
}

func (mt *MathTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	// TODO: Optimize math operations
	result := make(map[string]interface{})

	for k, v := range data {
		if value, ok := v.(float64); ok {
			for _, op := range mt.operations {
				value = op(value)
			}
			result[k+"_math"] = value
		}
	}

	return result, nil
}

func main() {
	// TODO: Add profiling endpoints
	// TODO: Optimize system configuration

	// Initialize components
	generator := &MessageGenerator{
		ratePerSecond: 1000,
		messageTypes:  []string{"user_action", "system_event", "metrics", "logs"},
	}

	dbPool := &DatabasePool{
		connections: make([]DatabaseConnection, 20),
	}
	for i := range dbPool.connections {
		dbPool.connections[i] = DatabaseConnection{ID: i, Active: true}
	}

	transformers := []Transformer{
		&JSONTransformer{
			name:  "json_transformer",
			cache: make(map[string]interface{}),
		},
		&MathTransformer{
			operations: []func(float64) float64{
				func(x float64) float64 { return x * 2 },
				func(x float64) float64 { return x + 1 },
				func(x float64) float64 { return math.Sqrt(x) },
			},
		},
	}

	processor := &MessageProcessor{
		inputChan:    make(chan Message, 1000),
		outputChan:   make(chan ProcessedMessage, 1000),
		dbPool:       dbPool,
		transformers: transformers,
	}

	// Start processing
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	processor.Start(ctx)
	messageChan := generator.Start(ctx)

	// Feed messages to processor
	go func() {
		for msg := range messageChan {
			processor.inputChan <- msg
		}
	}()

	// Monitor performance
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// TODO: Optimize performance monitoring
		var m runtime.MemStats
		runtime.ReadMemStats(&m)

		log.Printf("Processed: %d, Errors: %d, Memory: %d MB, Goroutines: %d",
			processor.processed, processor.errors,
			m.Alloc/1024/1024, runtime.NumGoroutine())
	}
}

Tasks

Task 1: Add Profiling Support

Add profiling endpoints to monitor CPU, memory, and goroutine behavior:

import (
	_ "net/http/pprof"
	"net/http"
)

func startProfilingServer() {
	go func() {
		log.Println("Profiling server starting on :6060")
		log.Fatal(http.ListenAndServe("localhost:6060", nil))
	}()
}

// Add to main()
startProfilingServer()
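
The blank import registers the pprof handlers on http.DefaultServeMux, which is why a nil handler is passed to ListenAndServe. Once the server is up, the index page at http://localhost:6060/debug/pprof/ (viewable in a browser or with curl) lists every available profile.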

Task 2: Identify Memory Allocation Issues

Fix excessive allocations in the message processing pipeline:

// Use object pooling for messages
var messagePool = sync.Pool{
	New: func() interface{} {
		return &Message{}
	},
}

func (mg *MessageGenerator) generateMessage() *Message {
	msg := messagePool.Get().(*Message)
	*msg = Message{
		ID:        fmt.Sprintf("msg_%d", rand.Intn(1000000)),
		Timestamp: time.Now().Unix(),
		Data:      mg.generateData(),
	}
	return msg
}

func releaseMessage(msg *Message) {
	messagePool.Put(msg)
}
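
A caveat worth knowing: sync.Pool makes no retention guarantees. Pooled objects can be reclaimed during any GC cycle, so Get must be treated as possibly returning a brand-new value from New, and any object handed to Put must be fully reset or safely overwritten before reuse, as generateMessage does above.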

Task 3: Optimize JSON Transformer

Fix the inefficient caching mechanism in JSONTransformer:

type JSONTransformer struct {
	name    string
	cache   sync.Map  // Use sync.Map for better concurrency
	bufPool sync.Pool // Buffer pool for JSON encoding
}

func (jt *JSONTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	// Use pre-allocated buffer
	buf := jt.bufPool.Get().(*bytes.Buffer)
	defer func() {
		buf.Reset()
		jt.bufPool.Put(buf)
	}()

	result := make(map[string]interface{}, len(data)) // Pre-allocate

	for k, v := range data {
		buf.Reset()
		if err := json.NewEncoder(buf).Encode(v); err == nil {
			result[k+"_json"] = strings.TrimSpace(buf.String())
		}
	}

	return result, nil
}
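
Note that sync.Pool.Get returns the result of the pool's New function when the pool is empty, and nil if no New was configured, so the type assertion above would panic without one. A minimal construction sketch (the 1 KB starting capacity is an assumption, not from the original):

jt := &JSONTransformer{
	name: "json_transformer",
	bufPool: sync.Pool{
		New: func() interface{} {
			// Start with 1 KB capacity; the buffer grows as needed and is reused.
			return bytes.NewBuffer(make([]byte, 0, 1024))
		},
	},
}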

Task 4: Implement Efficient Worker Pool

Optimize the worker pool pattern for better performance:

type OptimizedProcessor struct {
	workers     int
	inputChan   chan *Message
	outputChan  chan *ProcessedMessage
	dbBatch     *BatchProcessor
	transformer *JSONTransformer
	stats       *ProcessorStats // atomic counters, see sketch below
}

func (op *OptimizedProcessor) Start(ctx context.Context) {
	var wg sync.WaitGroup

	// Start workers
	for i := 0; i < op.workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			op.worker(ctx, workerID)
		}(i)
	}

	// Start batch processor
	go op.dbBatch.Start(ctx)

	wg.Wait()
}

func (op *OptimizedProcessor) worker(ctx context.Context, workerID int) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-op.inputChan:
			op.processMessage(msg)
			messagePool.Put(msg) // Return to pool
		}
	}
}
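
There is no atomic.Stats in the standard library, so the stats field above assumes a small custom counter type. A minimal sketch using sync/atomic (the full version in the solution below also tracks cumulative duration):

// ProcessorStats holds lock-free counters shared by all workers.
type ProcessorStats struct {
	Processed int64
	Errors    int64
}

func (ps *ProcessorStats) AddProcessed() { atomic.AddInt64(&ps.Processed, 1) }
func (ps *ProcessorStats) AddError()     { atomic.AddInt64(&ps.Errors, 1) }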

Task 5: Add Batch Processing

Implement efficient batch database operations:

type BatchProcessor struct {
	batchSize int
	timeout   time.Duration
	buffer    []*ProcessedMessage
	bufferMu  sync.Mutex
	dbPool    *DatabasePool
	flushChan chan struct{}
}

func (bp *BatchProcessor) Add(result *ProcessedMessage) {
	bp.bufferMu.Lock()
	defer bp.bufferMu.Unlock()

	bp.buffer = append(bp.buffer, result)

	if len(bp.buffer) >= bp.batchSize {
		select {
		case bp.flushChan <- struct{}{}:
		default:
		}
	}
}

func (bp *BatchProcessor) flush() {
	bp.bufferMu.Lock()
	if len(bp.buffer) == 0 {
		bp.bufferMu.Unlock()
		return
	}

	batch := make([]*ProcessedMessage, len(bp.buffer))
	copy(batch, bp.buffer)
	bp.buffer = bp.buffer[:0] // Reuse slice
	bp.bufferMu.Unlock()

	// Process batch
	bp.dbPool.StoreBatch(batch)
}
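
Size-triggered flushes alone would let a partial batch sit indefinitely under light load, so Add needs to be paired with a loop that also flushes on a timeout ticker and on shutdown; see the Start method of BatchProcessor in the solution below.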

Solution Approach

Complete Optimized Implementation:

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/rand"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// Optimized message structure with better memory layout
type Message struct {
	ID        string                 `json:"id"`
	Timestamp int64                  `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
	Processed bool                   `json:"processed"`
}

type ProcessedMessage struct {
	OriginalID string
	Result     map[string]interface{}
	Duration   time.Duration
	Error      error
}

// Object pools to reduce allocations
var (
	messagePool = sync.Pool{
		New: func() interface{} {
			return &Message{
				Data: make(map[string]interface{}),
			}
		},
	}

	processedMessagePool = sync.Pool{
		New: func() interface{} {
			return &ProcessedMessage{
				Result: make(map[string]interface{}),
			}
		},
	}

	bufferPool = sync.Pool{
		New: func() interface{} {
			return bytes.NewBuffer(make([]byte, 0, 1024))
		},
	}
)

// Optimized message processor with batch processing
type OptimizedProcessor struct {
	workers      int
	inputChan    chan *Message
	outputChan   chan *ProcessedMessage
	dbBatch      *BatchProcessor
	transformers []Transformer
	stats        *ProcessorStats
	workerWg     sync.WaitGroup
}

type ProcessorStats struct {
	Processed int64
	Errors    int64
	Duration  int64 // Total processing time in nanoseconds
}

func (ps *ProcessorStats) AddProcessed(duration time.Duration) {
	atomic.AddInt64(&ps.Processed, 1)
	atomic.AddInt64(&ps.Duration, duration.Nanoseconds())
}

func (ps *ProcessorStats) AddError() {
	atomic.AddInt64(&ps.Errors, 1)
}

func (ps *ProcessorStats) GetStats() (int64, int64, time.Duration) {
	processed := atomic.LoadInt64(&ps.Processed)
	errors := atomic.LoadInt64(&ps.Errors)
	duration := time.Duration(atomic.LoadInt64(&ps.Duration))
	return processed, errors, duration
}

// Efficient batch processor for database operations
type BatchProcessor struct {
	batchSize int
	timeout   time.Duration
	buffer    []*ProcessedMessage
	bufferMu  sync.Mutex
	dbPool    *DatabasePool
	flushChan chan struct{}
	ctx       context.Context
	cancel    context.CancelFunc
}

func NewBatchProcessor(batchSize int, timeout time.Duration, dbPool *DatabasePool) *BatchProcessor {
	ctx, cancel := context.WithCancel(context.Background())
	return &BatchProcessor{
		batchSize: batchSize,
		timeout:   timeout,
		buffer:    make([]*ProcessedMessage, 0, batchSize),
		dbPool:    dbPool,
		flushChan: make(chan struct{}, 1),
		ctx:       ctx,
		cancel:    cancel,
	}
}

func (bp *BatchProcessor) Add(result *ProcessedMessage) {
	bp.bufferMu.Lock()
	bp.buffer = append(bp.buffer, result)
	shouldFlush := len(bp.buffer) >= bp.batchSize
	bp.bufferMu.Unlock()

	if shouldFlush {
		select {
		case bp.flushChan <- struct{}{}:
		default:
		}
	}
}

func (bp *BatchProcessor) flush() {
	bp.bufferMu.Lock()
	if len(bp.buffer) == 0 {
		bp.bufferMu.Unlock()
		return
	}

	// Copy buffer to avoid holding lock during database operation
	batch := make([]*ProcessedMessage, len(bp.buffer))
	copy(batch, bp.buffer)
	bp.buffer = bp.buffer[:0] // Reset slice length but keep capacity
	bp.bufferMu.Unlock()

	// Process batch
	start := time.Now()
	err := bp.dbPool.StoreBatch(batch)
	duration := time.Since(start)

	if err != nil {
		log.Printf("Batch store error for %d messages: %v", len(batch), err)
	} else {
		log.Printf("Stored batch of %d messages in %v", len(batch), duration)
	}

	// Return messages to pool
	for _, msg := range batch {
		processedMessagePool.Put(msg)
	}
}

func (bp *BatchProcessor) Start(ctx context.Context) {
	ticker := time.NewTicker(bp.timeout)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			bp.flush() // Flush remaining messages
			return
		case <-bp.flushChan:
			bp.flush()
		case <-ticker.C:
			bp.flush() // Periodic flush based on timeout
		}
	}
}

// Optimized database connection pool
type DatabasePool struct {
	connections []DatabaseConnection
	mu          sync.RWMutex
	nextConn    int64 // Atomic counter for round-robin
}

type DatabaseConnection struct {
	ID       int
	Active   bool
	lastUsed time.Time
}

func (db *DatabasePool) StoreBatch(messages []*ProcessedMessage) error {
	db.mu.RLock()
	defer db.mu.RUnlock()

	// Use multiple connections for parallel processing
	var wg sync.WaitGroup
	errChan := make(chan error, len(messages))

	for _, msg := range messages {
		// Round-robin connection selection via the atomic counter
		connIndex := int(atomic.AddInt64(&db.nextConn, 1)-1) % len(db.connections)
		conn := &db.connections[connIndex]

		if !conn.Active {
			errChan <- fmt.Errorf("connection %d not active", connIndex)
			continue
		}

		wg.Add(1)
		go func(msg *ProcessedMessage, conn *DatabaseConnection) {
			defer wg.Done()
			// Simulate optimized database write
			time.Sleep(time.Duration(rand.Intn(2)+1) * time.Millisecond)
			conn.lastUsed = time.Now()
		}(msg, conn)
	}

	wg.Wait()
	close(errChan)

	// Check for errors
	for err := range errChan {
		if err != nil {
			return err
		}
	}

	return nil
}

// Optimized transformer implementations
type Transformer interface {
	Transform(data map[string]interface{}) (map[string]interface{}, error)
}

type OptimizedJSONTransformer struct {
	name  string
	cache sync.Map
}

func (jt *OptimizedJSONTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	result := make(map[string]interface{}, len(data))

	for k, v := range data {
		buf := bufferPool.Get().(*bytes.Buffer)
		buf.Reset()

		if err := json.NewEncoder(buf).Encode(v); err == nil {
			// Trim newline added by Encode
			jsonStr := strings.TrimSpace(buf.String())
			result[k+"_json"] = jsonStr
		}

		bufferPool.Put(buf)
	}

	return result, nil
}

type OptimizedMathTransformer struct {
	operations []func(float64) float64
}

func (mt *OptimizedMathTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	result := make(map[string]interface{}, len(data))

	for k, v := range data {
		if value, ok := v.(float64); ok {
			// Apply operations in-place to reduce allocations
			for _, op := range mt.operations {
				value = op(value)
			}
			result[k+"_math"] = value
		}
	}

	return result, nil
}

// Optimized message generator
type OptimizedMessageGenerator struct {
	ratePerSecond int
	messageTypes  []string
	interval      time.Duration
}

func (mg *OptimizedMessageGenerator) Start(ctx context.Context) <-chan *Message {
	ch := make(chan *Message, 1000) // Buffered channel

	go func() {
		defer close(ch)
		ticker := time.NewTicker(mg.interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				msg := mg.generateMessage()
				select {
				case ch <- msg:
				case <-ctx.Done():
					messagePool.Put(msg)
					return
				default:
					// Drop message if channel is full to prevent backpressure
					messagePool.Put(msg)
				}
			}
		}
	}()

	return ch
}

func (mg *OptimizedMessageGenerator) generateMessage() *Message {
	msg := messagePool.Get().(*Message)

	// Reset message data
	msg.ID = fmt.Sprintf("msg_%d", rand.Int63())
	msg.Timestamp = time.Now().Unix()
	msg.Processed = false

	// Clear and regenerate data
	for k := range msg.Data {
		delete(msg.Data, k)
	}

	msg.Data["type"] = mg.messageTypes[rand.Intn(len(mg.messageTypes))]
	msg.Data["value"] = rand.Float64() * 1000
	msg.Data["metadata"] = map[string]interface{}{
		"source": "generator",
		"tags":   []string{"tag1", "tag2", "tag3"},
	}

	return msg
}

func NewOptimizedProcessor(workers int, dbPool *DatabasePool) *OptimizedProcessor {
	return &OptimizedProcessor{
		workers:    workers,
		inputChan:  make(chan *Message, workers*10),
		outputChan: make(chan *ProcessedMessage, workers*10),
		dbBatch:    NewBatchProcessor(100, 100*time.Millisecond, dbPool),
		transformers: []Transformer{
			&OptimizedJSONTransformer{name: "json_transformer"},
			&OptimizedMathTransformer{
				operations: []func(float64) float64{
					func(x float64) float64 { return x * 2 },
					func(x float64) float64 { return x + 1 },
					func(x float64) float64 { return math.Sqrt(x) },
				},
			},
		},
		stats: &ProcessorStats{},
	}
}

func (op *OptimizedProcessor) Start(ctx context.Context) {
	// Start batch processor
	go op.dbBatch.Start(ctx)

	// Start workers
	for i := 0; i < op.workers; i++ {
		op.workerWg.Add(1)
		go func(workerID int) {
			defer op.workerWg.Done()
			op.worker(ctx, workerID)
		}(i)
	}

	// Results are handed to the batch processor inside processMessage,
	// so no separate result handler goroutine is needed.
}

func (op *OptimizedProcessor) worker(ctx context.Context, workerID int) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-op.inputChan:
			op.processMessage(msg)
		}
	}
}

func (op *OptimizedProcessor) processMessage(msg *Message) {
	start := time.Now()

	result := processedMessagePool.Get().(*ProcessedMessage)
	result.OriginalID = msg.ID
	result.Duration = 0
	result.Error = nil

	// Clear previous result data
	for k := range result.Result {
		delete(result.Result, k)
	}

	// Apply transformers
	var err error
	transformData := msg.Data
	for _, transformer := range op.transformers {
		transformData, err = transformer.Transform(transformData)
		if err != nil {
			result.Error = err
			break
		}
	}

	if result.Error == nil {
		// Copy transformed data to result
		for k, v := range transformData {
			result.Result[k] = v
		}
	}

	result.Duration = time.Since(start)

	// Add to batch processor
	op.dbBatch.Add(result)

	// Update statistics
	if result.Error != nil {
		op.stats.AddError()
	} else {
		op.stats.AddProcessed(result.Duration)
	}

	// Return message to pool
	messagePool.Put(msg)
}

func (op *OptimizedProcessor) InputChan() chan<- *Message {
	return op.inputChan
}

func (op *OptimizedProcessor) GetStats() (int64, int64, time.Duration) {
	return op.stats.GetStats()
}

// Profiling support
func startProfilingServer() {
	go func() {
		log.Println("Profiling server starting on :6060")
		if err := http.ListenAndServe("localhost:6060", nil); err != nil {
			log.Printf("Profiling server error: %v", err)
		}
	}()
}

// Performance monitoring
func startPerformanceMonitor(ctx context.Context, processor *OptimizedProcessor) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	var lastProcessed, lastErrors int64
	lastTime := time.Now()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			processed, errors, duration := processor.GetStats()
			now := time.Now()

			var m runtime.MemStats
			runtime.ReadMemStats(&m)

			// Calculate rates
			timeDiff := now.Sub(lastTime).Seconds()
			processedRate := float64(processed-lastProcessed) / timeDiff
			errorsRate := float64(errors-lastErrors) / timeDiff

			avgDuration := time.Duration(0)
			if processed > 0 {
				avgDuration = duration / time.Duration(processed)
			}

			log.Printf("Performance Stats:\n"+
				"  Messages: %d processed (%.1f/s), %d errors (%.1f/s)\n"+
				"  Avg Duration: %v\n"+
				"  Memory: %d MB allocated, %d MB heap, %d MB stack\n"+
				"  Goroutines: %d, GC cycles: %d",
				processed, processedRate, errors, errorsRate,
				avgDuration,
				m.Alloc/1024/1024, m.HeapAlloc/1024/1024, m.StackInuse/1024/1024,
				runtime.NumGoroutine(), m.NumGC)

			lastProcessed, lastErrors, lastTime = processed, errors, now
		}
	}
}

func main() {
	// Start profiling server
	startProfilingServer()

	// Configure runtime (NumCPU is the default since Go 1.5; set explicitly
	// only if you need to override it)
	runtime.GOMAXPROCS(runtime.NumCPU())

	// Initialize optimized components
	generator := &OptimizedMessageGenerator{
		ratePerSecond: 5000,
		messageTypes:  []string{"user_action", "system_event", "metrics", "logs"},
		interval:      time.Second / time.Duration(5000),
	}

	dbPool := &DatabasePool{
		connections: make([]DatabaseConnection, 20),
	}
	for i := range dbPool.connections {
		dbPool.connections[i] = DatabaseConnection{
			ID:       i,
			Active:   true,
			lastUsed: time.Now(),
		}
	}

	processor := NewOptimizedProcessor(runtime.NumCPU()*2, dbPool)

	// Start processing; the context is cancelled on Ctrl-C for a graceful shutdown
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	processor.Start(ctx)

	// Start performance monitor
	go startPerformanceMonitor(ctx, processor)

	// Start message generation and feed to processor
	messageChan := generator.Start(ctx)

	go func() {
		for msg := range messageChan {
			select {
			case processor.InputChan() <- msg:
			case <-ctx.Done():
				messagePool.Put(msg)
				return
			default:
				// Channel full, drop message
				messagePool.Put(msg)
			}
		}
	}()

	// Block until interrupted
	<-ctx.Done()
	log.Println("Shutting down...")
	time.Sleep(1 * time.Second) // Allow in-flight batches to flush
}

Testing and Profiling

1. Baseline Performance Testing

# Start the application
go run main.go

# In another terminal, run profiling commands

# CPU profiling (30-second sample)
go tool pprof "http://localhost:6060/debug/pprof/profile?seconds=30"

# Memory profiling
go tool pprof http://localhost:6060/debug/pprof/heap

# Goroutine profiling
go tool pprof http://localhost:6060/debug/pprof/goroutine

# Block profiling (empty unless sampling is enabled; see note below)
go tool pprof http://localhost:6060/debug/pprof/block
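
Note that the block (and mutex) profiles record nothing by default: the runtime only samples contention once it is enabled inside the application. A minimal sketch:

import "runtime"

func init() {
	// Record every blocking event and every contended mutex.
	// Use a higher rate/fraction in production to reduce overhead.
	runtime.SetBlockProfileRate(1)
	runtime.SetMutexProfileFraction(1)
}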

2. Performance Benchmarks

# Run benchmarks
go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof

# Analyze with pprof
go tool pprof cpu.prof
go tool pprof mem.prof
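
The lab itself ships no benchmarks, so here is a minimal sketch (a hypothetical bench_test.go in the same package) exercising the pooled-versus-fresh allocation path from Task 2; run it with the command above and compare the allocs/op columns:

package main

import (
	"sync"
	"testing"
)

var benchPool = sync.Pool{
	New: func() interface{} {
		return &Message{Data: make(map[string]interface{})}
	},
}

// Allocates a new message and inner map on every iteration.
func BenchmarkFreshMessage(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		m := &Message{ID: "msg", Data: map[string]interface{}{"value": 1.0}}
		_ = m
	}
}

// Reuses a pooled message; should show far fewer allocations per op.
func BenchmarkPooledMessage(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		m := benchPool.Get().(*Message)
		m.ID = "msg"
		m.Data["value"] = 1.0
		delete(m.Data, "value") // reset before returning to the pool
		benchPool.Put(m)
	}
}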

3. Load Testing

# Install hey load testing tool
go install github.com/rakyll/hey@latest

# Generate load (assumes you have exposed an HTTP ingestion endpoint;
# the pipeline above is fed by an internal generator, so adapt the URL)
hey -n 10000 -c 100 http://localhost:8080/api/process

Expected Performance Improvements

After optimization, you should see:

  1. Memory Usage: 70-80% reduction in allocations
  2. CPU Usage: 40-50% reduction in CPU time
  3. Throughput: 2-3x increase in messages per second
  4. Latency: 60-70% reduction in average processing time
  5. GC Pressure: Significantly reduced garbage collection frequency
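
Treat these figures as rough targets rather than guarantees: actual gains depend on hardware, the message mix, and the simulated I/O latencies. To compare runs rigorously, save benchmark output before and after optimizing and diff it with benchstat (from golang.org/x/perf/cmd/benchstat).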

Extension Challenges

  1. Add metrics collection - Integrate Prometheus for detailed monitoring
  2. Implement adaptive scaling - Auto-adjust worker pool size based on load
  3. Add distributed tracing - Use OpenTelemetry for request tracing
  4. Implement zero-copy optimizations - Reduce memory allocations further
  5. Add SIMD optimizations - Use vectorized operations for math transformations

Key Takeaways

  • Profiling is essential - Always measure before optimizing
  • Object pooling dramatically reduces GC pressure
  • Batch processing improves throughput for I/O operations
  • Buffer reuse minimizes memory allocations
  • Atomic operations are faster than mutexes for simple counters (see the sketch after this list)
  • Proper channel sizing prevents backpressure issues
  • Context cancellation enables graceful shutdown
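
To make the counter takeaway concrete, a minimal comparison sketch: both types are safe under concurrency, but the atomic version avoids lock overhead for a plain increment.

import (
	"sync"
	"sync/atomic"
)

// MutexCounter serializes every increment through a lock.
type MutexCounter struct {
	mu sync.Mutex
	n  int64
}

func (c *MutexCounter) Inc() { c.mu.Lock(); c.n++; c.mu.Unlock() }

// AtomicCounter increments with a single lock-free atomic operation.
type AtomicCounter struct{ n int64 }

func (c *AtomicCounter) Inc() { atomic.AddInt64(&c.n, 1) }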

This exercise teaches systematic performance optimization using Go's profiling tools and best practices for building high-throughput systems.