Performance Optimization Lab
Exercise Overview
Optimize a high-throughput data processing system that's experiencing performance issues. You'll use Go's profiling tools to identify bottlenecks, analyze memory allocation patterns, and implement performance improvements.
Learning Objectives
- Use Go's built-in profiling tools effectively
- Identify and fix CPU bottlenecks and memory leaks
- Optimize memory allocation and reduce garbage collection pressure
- Implement efficient concurrency patterns
- Apply performance optimization best practices
- Measure and validate performance improvements
The Problem - Slow Data Processing Pipeline
You have a data processing pipeline that processes JSON messages from a message queue, applies transformations, and stores results in a database. The system is experiencing high latency and memory usage under load.
Initial Code
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

// TODO: Profile and optimize this structure
type Message struct {
	ID        string                 `json:"id"`
	Timestamp int64                  `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
	Processed bool                   `json:"processed"`
}

// TODO: Optimize this processor
type MessageProcessor struct {
	inputChan    chan Message
	outputChan   chan ProcessedMessage
	dbPool       *DatabasePool
	transformers []Transformer
	mu           sync.Mutex
	processed    int
	errors       int
}

type ProcessedMessage struct {
	OriginalID string
	Result     map[string]interface{}
	Duration   time.Duration
	Error      error
}

type DatabasePool struct {
	connections []DatabaseConnection
	mu          sync.Mutex
}

type DatabaseConnection struct {
	ID     int
	Active bool
}

type Transformer interface {
	Transform(data map[string]interface{}) (map[string]interface{}, error)
}

// TODO: Optimize these transformers
type JSONTransformer struct {
	name    string
	cache   map[string]interface{}
	cacheMu sync.RWMutex
}

type MathTransformer struct {
	operations []func(float64) float64
}

// TODO: Fix this message generator
type MessageGenerator struct {
	ratePerSecond int
	messageTypes  []string
	running       bool
}

func (mg *MessageGenerator) Start(ctx context.Context) <-chan Message {
	ch := make(chan Message)

	go func() {
		defer close(ch)
		ticker := time.NewTicker(time.Second / time.Duration(mg.ratePerSecond))
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// TODO: Optimize message generation
				message := Message{
					ID:        fmt.Sprintf("msg_%d", rand.Intn(1000000)),
					Timestamp: time.Now().Unix(),
					Data: map[string]interface{}{
						"type":  mg.messageTypes[rand.Intn(len(mg.messageTypes))],
						"value": rand.Float64() * 1000,
						"metadata": map[string]interface{}{
							"source": "generator",
							"tags":   []string{"tag1", "tag2", "tag3"},
						},
					},
				}
				ch <- message
			}
		}
	}()

	return ch
}

// TODO: Optimize this processor
func (mp *MessageProcessor) Start(ctx context.Context) {
	// Create worker pool
	for i := 0; i < 10; i++ {
		go mp.worker(ctx)
	}

	// Process results
	go mp.resultHandler(ctx)
}

func (mp *MessageProcessor) worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-mp.inputChan:
			start := time.Now()

			// Process message
			result, err := mp.processMessage(msg)

			duration := time.Since(start)

			processed := ProcessedMessage{
				OriginalID: msg.ID,
				Result:     result,
				Duration:   duration,
				Error:      err,
			}

			select {
			case mp.outputChan <- processed:
			case <-ctx.Done():
				return
			}

			// Update counters
			mp.mu.Lock()
			if err != nil {
				mp.errors++
			} else {
				mp.processed++
			}
			mp.mu.Unlock()
		}
	}
}

func (mp *MessageProcessor) processMessage(msg Message) (map[string]interface{}, error) {
	// Start from the message payload, not an empty map
	result := msg.Data

	// Apply transformers
	for _, transformer := range mp.transformers {
		transformed, err := transformer.Transform(result)
		if err != nil {
			return nil, err
		}
		result = transformed
	}

	// Simulate database operation
	time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)

	return result, nil
}

func (mp *MessageProcessor) resultHandler(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case result := <-mp.outputChan:
			// TODO: Optimize result handling
			if result.Error != nil {
				log.Printf("Error processing message %s: %v", result.OriginalID, result.Error)
				continue
			}

			// Store in database
			err := mp.dbPool.Store(result)
			if err != nil {
				log.Printf("Failed to store result %s: %v", result.OriginalID, err)
			}
		}
	}
}

func (db *DatabasePool) Store(result ProcessedMessage) error {
	// TODO: Optimize database operations
	db.mu.Lock()
	defer db.mu.Unlock()

	// Find available connection
	for _, conn := range db.connections {
		if conn.Active {
			// Simulate database write
			time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
			return nil
		}
	}

	return fmt.Errorf("no available database connections")
}

// TODO: Optimize transformer implementations
func (jt *JSONTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	jt.cacheMu.Lock()
	defer jt.cacheMu.Unlock()

	// TODO: Fix this cache implementation
	cacheKey := fmt.Sprintf("%v", data)
	if cached, exists := jt.cache[cacheKey]; exists {
		return cached.(map[string]interface{}), nil
	}

	// Transform data
	result := make(map[string]interface{})
	for k, v := range data {
		if jsonStr, err := json.Marshal(v); err == nil {
			result[k+"_json"] = string(jsonStr)
		}
	}

	// Cache result
	jt.cache[cacheKey] = result

	return result, nil
}

func (mt *MathTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	// TODO: Optimize math operations
	result := make(map[string]interface{})

	for k, v := range data {
		if value, ok := v.(float64); ok {
			for _, op := range mt.operations {
				value = op(value)
			}
			result[k+"_math"] = value
		}
	}

	return result, nil
}

func main() {
	// TODO: Add profiling endpoints
	// TODO: Optimize system configuration

	// Initialize components
	generator := &MessageGenerator{
		ratePerSecond: 1000,
		messageTypes:  []string{"user_action", "system_event", "metrics", "logs"},
	}

	dbPool := &DatabasePool{
		connections: make([]DatabaseConnection, 20),
	}
	for i := range dbPool.connections {
		dbPool.connections[i] = DatabaseConnection{ID: i, Active: true}
	}

	transformers := []Transformer{
		&JSONTransformer{
			name:  "json_transformer",
			cache: make(map[string]interface{}),
		},
		&MathTransformer{
			operations: []func(float64) float64{
				func(x float64) float64 { return x * 2 },
				func(x float64) float64 { return x + 1 },
				func(x float64) float64 { return math.Sqrt(x) },
			},
		},
	}

	processor := &MessageProcessor{
		inputChan:    make(chan Message, 1000),
		outputChan:   make(chan ProcessedMessage, 1000),
		dbPool:       dbPool,
		transformers: transformers,
	}

	// Start processing
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	processor.Start(ctx)
	messageChan := generator.Start(ctx)

	// Feed messages to processor
	go func() {
		for msg := range messageChan {
			processor.inputChan <- msg
		}
	}()

	// Monitor performance
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// TODO: Optimize performance monitoring
			var m runtime.MemStats
			runtime.ReadMemStats(&m)

			log.Printf("Processed: %d, Errors: %d, Memory: %d MB, Goroutines: %d",
				processor.processed, processor.errors,
				m.Alloc/1024/1024, runtime.NumGoroutine())
		}
	}
}
Tasks
Task 1: Add Profiling Support
Add profiling endpoints to monitor CPU, memory, and goroutine behavior:
import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the pprof handlers on http.DefaultServeMux
)

func startProfilingServer() {
	go func() {
		log.Println("Profiling server starting on :6060")
		log.Fatal(http.ListenAndServe("localhost:6060", nil))
	}()
}

// Add to main()
startProfilingServer()
Task 2: Identify Memory Allocation Issues
Fix excessive allocations in the message processing pipeline:
// Use object pooling for messages
var messagePool = sync.Pool{
	New: func() interface{} {
		return &Message{}
	},
}

func (mg *MessageGenerator) generateMessage() *Message {
	msg := messagePool.Get().(*Message)
	*msg = Message{
		ID:        fmt.Sprintf("msg_%d", rand.Intn(1000000)),
		Timestamp: time.Now().Unix(),
		Data:      mg.generateData(),
	}
	return msg
}

func releaseMessage(msg *Message) {
	messagePool.Put(msg)
}
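As written, the pooled message still receives a brand-new Data map from mg.generateData() on every call, so most of the allocation cost remains. A sketch of a variant that clears and reuses the pooled map instead (the method name is illustrative; the solution's generateMessage below takes the same approach):

func (mg *MessageGenerator) generateMessageReusingData() *Message {
	msg := messagePool.Get().(*Message)
	if msg.Data == nil {
		msg.Data = make(map[string]interface{}, 4)
	}
	for k := range msg.Data {
		delete(msg.Data, k) // clearing keeps the map's allocated capacity
	}
	msg.ID = fmt.Sprintf("msg_%d", rand.Int63())
	msg.Timestamp = time.Now().Unix()
	msg.Processed = false
	return msg
}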
Task 3: Optimize JSON Transformer
Fix the inefficient caching mechanism in JSONTransformer:
type JSONTransformer struct {
	name    string
	cache   sync.Map  // Use sync.Map for better concurrency
	bufPool sync.Pool // Buffer pool for JSON encoding
}

func (jt *JSONTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	// Use a pre-allocated buffer (requires the "bytes" and "strings" imports)
	buf := jt.bufPool.Get().(*bytes.Buffer)
	defer func() {
		buf.Reset()
		jt.bufPool.Put(buf)
	}()

	result := make(map[string]interface{}, len(data)) // Pre-allocate

	for k, v := range data {
		buf.Reset()
		if err := json.NewEncoder(buf).Encode(v); err == nil {
			result[k+"_json"] = strings.TrimSpace(buf.String())
		}
	}

	return result, nil
}
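One gap in the struct above: bufPool has no New function, so the first Get() would return nil and the type assertion would panic. A constructor sketch closes that gap (the constructor name is illustrative):

func NewJSONTransformer(name string) *JSONTransformer {
	return &JSONTransformer{
		name: name,
		bufPool: sync.Pool{
			New: func() interface{} {
				return bytes.NewBuffer(make([]byte, 0, 1024))
			},
		},
	}
}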
Task 4: Implement Efficient Worker Pool
Optimize the worker pool pattern for better performance:
type OptimizedProcessor struct {
	workers     int
	inputChan   chan *Message
	outputChan  chan *ProcessedMessage
	dbBatch     *BatchProcessor
	transformer *JSONTransformer
	stats       *ProcessorStats // atomic counters; see the solution below
}

func (op *OptimizedProcessor) Start(ctx context.Context) {
	var wg sync.WaitGroup

	// Start workers
	for i := 0; i < op.workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			op.worker(ctx, workerID)
		}(i)
	}

	// Start batch processor
	go op.dbBatch.Start(ctx)

	wg.Wait()
}

func (op *OptimizedProcessor) worker(ctx context.Context, workerID int) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-op.inputChan:
			op.processMessage(msg)
			messagePool.Put(msg) // Return to pool
		}
	}
}
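Worker-pool sizing is workload dependent. For this mixed CPU/IO pipeline a small multiple of the available CPUs is a reasonable starting point (the exact multiplier is an assumption to validate with pprof), using the constructor from the solution below:

// Sketch: size the pool relative to available CPUs, then tune with profiling data.
processor := NewOptimizedProcessor(runtime.GOMAXPROCS(0)*2, dbPool)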
Task 5: Add Batch Processing
Implement efficient batch database operations:
type BatchProcessor struct {
	batchSize int
	timeout   time.Duration
	buffer    []*ProcessedMessage
	bufferMu  sync.Mutex
	dbPool    *DatabasePool
	flushChan chan struct{}
}

func (bp *BatchProcessor) Add(result *ProcessedMessage) {
	bp.bufferMu.Lock()
	defer bp.bufferMu.Unlock()

	bp.buffer = append(bp.buffer, result)

	if len(bp.buffer) >= bp.batchSize {
		select {
		case bp.flushChan <- struct{}{}:
		default:
		}
	}
}

func (bp *BatchProcessor) flush() {
	bp.bufferMu.Lock()
	if len(bp.buffer) == 0 {
		bp.bufferMu.Unlock()
		return
	}

	batch := make([]*ProcessedMessage, len(bp.buffer))
	copy(batch, bp.buffer)
	bp.buffer = bp.buffer[:0] // Reuse slice
	bp.bufferMu.Unlock()

	// Process batch
	bp.dbPool.StoreBatch(batch)
}
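The snippet above flushes only on size; the solution's Start method pairs this with a ticker so a batch is also flushed after the timeout elapses. Both triggers matter: the size trigger bounds memory and maximizes write efficiency under load, while the timeout bounds latency when traffic is light.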
Solution Approach
Complete Optimized Implementation:
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/rand"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// Optimized message structure with better memory layout
type Message struct {
	ID        string                 `json:"id"`
	Timestamp int64                  `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
	Processed bool                   `json:"processed"`
}

type ProcessedMessage struct {
	OriginalID string
	Result     map[string]interface{}
	Duration   time.Duration
	Error      error
}

// Object pools to reduce allocations
var (
	messagePool = sync.Pool{
		New: func() interface{} {
			return &Message{
				Data: make(map[string]interface{}),
			}
		},
	}

	processedMessagePool = sync.Pool{
		New: func() interface{} {
			return &ProcessedMessage{
				Result: make(map[string]interface{}),
			}
		},
	}

	bufferPool = sync.Pool{
		New: func() interface{} {
			return bytes.NewBuffer(make([]byte, 0, 1024))
		},
	}
)

// Optimized message processor with batch processing
type OptimizedProcessor struct {
	workers      int
	inputChan    chan *Message
	outputChan   chan *ProcessedMessage
	dbBatch      *BatchProcessor
	transformers []Transformer
	stats        *ProcessorStats
	workerWg     sync.WaitGroup
}

type ProcessorStats struct {
	Processed int64
	Errors    int64
	Duration  int64 // Total processing time in nanoseconds
}

func (ps *ProcessorStats) AddProcessed(duration time.Duration) {
	atomic.AddInt64(&ps.Processed, 1)
	atomic.AddInt64(&ps.Duration, duration.Nanoseconds())
}

func (ps *ProcessorStats) AddError() {
	atomic.AddInt64(&ps.Errors, 1)
}

func (ps *ProcessorStats) GetStats() (int64, int64, time.Duration) {
	processed := atomic.LoadInt64(&ps.Processed)
	errors := atomic.LoadInt64(&ps.Errors)
	duration := time.Duration(atomic.LoadInt64(&ps.Duration))
	return processed, errors, duration
}

// Efficient batch processor for database operations
type BatchProcessor struct {
	batchSize int
	timeout   time.Duration
	buffer    []*ProcessedMessage
	bufferMu  sync.Mutex
	dbPool    *DatabasePool
	flushChan chan struct{}
}

func NewBatchProcessor(batchSize int, timeout time.Duration, dbPool *DatabasePool) *BatchProcessor {
	return &BatchProcessor{
		batchSize: batchSize,
		timeout:   timeout,
		buffer:    make([]*ProcessedMessage, 0, batchSize),
		dbPool:    dbPool,
		flushChan: make(chan struct{}, 1),
	}
}

func (bp *BatchProcessor) Add(result *ProcessedMessage) {
	bp.bufferMu.Lock()
	bp.buffer = append(bp.buffer, result)
	shouldFlush := len(bp.buffer) >= bp.batchSize
	bp.bufferMu.Unlock()

	if shouldFlush {
		select {
		case bp.flushChan <- struct{}{}:
		default:
		}
	}
}

func (bp *BatchProcessor) flush() {
	bp.bufferMu.Lock()
	if len(bp.buffer) == 0 {
		bp.bufferMu.Unlock()
		return
	}

	// Copy buffer to avoid holding lock during database operation
	batch := make([]*ProcessedMessage, len(bp.buffer))
	copy(batch, bp.buffer)
	bp.buffer = bp.buffer[:0] // Reset slice length but keep capacity
	bp.bufferMu.Unlock()

	// Process batch
	start := time.Now()
	err := bp.dbPool.StoreBatch(batch)
	duration := time.Since(start)

	if err != nil {
		log.Printf("Batch store error for %d messages: %v", len(batch), err)
	} else {
		log.Printf("Stored batch of %d messages in %v", len(batch), duration)
	}

	// Return messages to pool
	for _, msg := range batch {
		processedMessagePool.Put(msg)
	}
}

func (bp *BatchProcessor) Start(ctx context.Context) {
	ticker := time.NewTicker(bp.timeout)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			bp.flush() // Flush remaining messages
			return
		case <-bp.flushChan:
			bp.flush()
		case <-ticker.C:
			bp.flush() // Periodic flush based on timeout
		}
	}
}

// Optimized database connection pool
type DatabasePool struct {
	connections []DatabaseConnection
	mu          sync.RWMutex
	nextConn    int64 // Atomic counter for round-robin
}

type DatabaseConnection struct {
	ID       int
	Active   bool
	lastUsed time.Time
}

func (db *DatabasePool) StoreBatch(messages []*ProcessedMessage) error {
	db.mu.RLock()
	defer db.mu.RUnlock()

	// Use multiple connections for parallel processing
	var wg sync.WaitGroup
	errChan := make(chan error, len(messages))

	for _, msg := range messages {
		// Round-robin connection selection via the atomic counter
		connIndex := int(atomic.AddInt64(&db.nextConn, 1)-1) % len(db.connections)
		conn := &db.connections[connIndex]

		if !conn.Active {
			errChan <- fmt.Errorf("connection %d not active", connIndex)
			continue
		}

		wg.Add(1)
		go func(msg *ProcessedMessage, conn *DatabaseConnection) {
			defer wg.Done()
			// Simulate optimized database write
			time.Sleep(time.Duration(rand.Intn(2)+1) * time.Millisecond)
			conn.lastUsed = time.Now()
		}(msg, conn)
	}

	wg.Wait()
	close(errChan)

	// Check for errors
	for err := range errChan {
		if err != nil {
			return err
		}
	}

	return nil
}

// Transformer is the interface implemented by all transformers.
type Transformer interface {
	Transform(data map[string]interface{}) (map[string]interface{}, error)
}

// Optimized transformer implementations
type OptimizedJSONTransformer struct {
	name  string
	cache sync.Map
}

func (jt *OptimizedJSONTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	result := make(map[string]interface{}, len(data))

	for k, v := range data {
		buf := bufferPool.Get().(*bytes.Buffer)
		buf.Reset()

		if err := json.NewEncoder(buf).Encode(v); err == nil {
			// Trim newline added by Encode
			jsonStr := strings.TrimSpace(buf.String())
			result[k+"_json"] = jsonStr
		}

		bufferPool.Put(buf)
	}

	return result, nil
}

type OptimizedMathTransformer struct {
	operations []func(float64) float64
}

func (mt *OptimizedMathTransformer) Transform(data map[string]interface{}) (map[string]interface{}, error) {
	result := make(map[string]interface{}, len(data))

	for k, v := range data {
		if value, ok := v.(float64); ok {
			// Apply operations in sequence without intermediate allocations
			for _, op := range mt.operations {
				value = op(value)
			}
			result[k+"_math"] = value
		}
	}

	return result, nil
}

// Optimized message generator
type OptimizedMessageGenerator struct {
	ratePerSecond int
	messageTypes  []string
	interval      time.Duration
}

func (mg *OptimizedMessageGenerator) Start(ctx context.Context) <-chan *Message {
	ch := make(chan *Message, 1000) // Buffered channel

	go func() {
		defer close(ch)
		ticker := time.NewTicker(mg.interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				msg := mg.generateMessage()
				select {
				case ch <- msg:
				case <-ctx.Done():
					messagePool.Put(msg)
					return
				default:
					// Drop the message if the channel is full instead of blocking
					messagePool.Put(msg)
				}
			}
		}
	}()

	return ch
}

func (mg *OptimizedMessageGenerator) generateMessage() *Message {
	msg := messagePool.Get().(*Message)

	// Reset message data
	msg.ID = fmt.Sprintf("msg_%d", rand.Int63())
	msg.Timestamp = time.Now().Unix()
	msg.Processed = false

	// Clear and regenerate data, reusing the map's capacity
	for k := range msg.Data {
		delete(msg.Data, k)
	}

	msg.Data["type"] = mg.messageTypes[rand.Intn(len(mg.messageTypes))]
	msg.Data["value"] = rand.Float64() * 1000
	msg.Data["metadata"] = map[string]interface{}{
		"source": "generator",
		"tags":   []string{"tag1", "tag2", "tag3"},
	}

	return msg
}

func NewOptimizedProcessor(workers int, dbPool *DatabasePool) *OptimizedProcessor {
	return &OptimizedProcessor{
		workers:    workers,
		inputChan:  make(chan *Message, workers*10),
		outputChan: make(chan *ProcessedMessage, workers*10),
		dbBatch:    NewBatchProcessor(100, 100*time.Millisecond, dbPool),
		transformers: []Transformer{
			&OptimizedJSONTransformer{name: "json_transformer"},
			&OptimizedMathTransformer{
				operations: []func(float64) float64{
					func(x float64) float64 { return x * 2 },
					func(x float64) float64 { return x + 1 },
					func(x float64) float64 { return math.Sqrt(x) },
				},
			},
		},
		stats: &ProcessorStats{},
	}
}

func (op *OptimizedProcessor) Start(ctx context.Context) {
	// Start batch processor
	go op.dbBatch.Start(ctx)

	// Start workers
	for i := 0; i < op.workers; i++ {
		op.workerWg.Add(1)
		go func(workerID int) {
			defer op.workerWg.Done()
			op.worker(ctx, workerID)
		}(i)
	}

	// Start result handler
	go op.resultHandler(ctx)
}

func (op *OptimizedProcessor) worker(ctx context.Context, workerID int) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-op.inputChan:
			op.processMessage(msg)
		}
	}
}

func (op *OptimizedProcessor) processMessage(msg *Message) {
	start := time.Now()

	result := processedMessagePool.Get().(*ProcessedMessage)
	result.OriginalID = msg.ID
	result.Duration = 0
	result.Error = nil

	// Clear previous result data
	for k := range result.Result {
		delete(result.Result, k)
	}

	// Apply transformers
	var err error
	transformData := msg.Data
	for _, transformer := range op.transformers {
		transformData, err = transformer.Transform(transformData)
		if err != nil {
			result.Error = err
			break
		}
	}

	if result.Error == nil {
		// Copy transformed data to result
		for k, v := range transformData {
			result.Result[k] = v
		}
	}

	result.Duration = time.Since(start)

	// Add to batch processor
	op.dbBatch.Add(result)

	// Update statistics
	if result.Error != nil {
		op.stats.AddError()
	} else {
		op.stats.AddProcessed(result.Duration)
	}

	// Return message to pool
	messagePool.Put(msg)
}

func (op *OptimizedProcessor) resultHandler(ctx context.Context) {
	// Storage is now handled by the batch processor
}

func (op *OptimizedProcessor) InputChan() chan<- *Message {
	return op.inputChan
}

func (op *OptimizedProcessor) GetStats() (int64, int64, time.Duration) {
	return op.stats.GetStats()
}

// Profiling support
func startProfilingServer() {
	go func() {
		log.Println("Profiling server starting on :6060")
		if err := http.ListenAndServe("localhost:6060", nil); err != nil {
			log.Printf("Profiling server error: %v", err)
		}
	}()
}

// Performance monitoring
func startPerformanceMonitor(ctx context.Context, processor *OptimizedProcessor) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	var lastProcessed, lastErrors int64
	lastTime := time.Now()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			processed, errors, duration := processor.GetStats()
			now := time.Now()

			var m runtime.MemStats
			runtime.ReadMemStats(&m)

			// Calculate rates
			timeDiff := now.Sub(lastTime).Seconds()
			processedRate := float64(processed-lastProcessed) / timeDiff
			errorsRate := float64(errors-lastErrors) / timeDiff

			avgDuration := time.Duration(0)
			if processed > 0 {
				avgDuration = duration / time.Duration(processed)
			}

			log.Printf("Performance Stats:\n"+
				"  Messages: %d processed (%.1f/s), %d errors (%.1f/s)\n"+
				"  Avg Duration: %v\n"+
				"  Memory: %d MB allocated, %d MB heap, %d MB stack\n"+
				"  Goroutines: %d, GC cycles: %d",
				processed, processedRate, errors, errorsRate,
				avgDuration,
				m.Alloc/1024/1024, m.HeapAlloc/1024/1024, m.StackInuse/1024/1024,
				runtime.NumGoroutine(), m.NumGC)

			lastProcessed, lastErrors, lastTime = processed, errors, now
		}
	}
}

func main() {
	// Start profiling server
	startProfilingServer()

	// GOMAXPROCS already defaults to NumCPU on modern Go; set it explicitly
	// here only to make the configuration visible.
	runtime.GOMAXPROCS(runtime.NumCPU())

	// Initialize optimized components
	generator := &OptimizedMessageGenerator{
		ratePerSecond: 5000,
		messageTypes:  []string{"user_action", "system_event", "metrics", "logs"},
		interval:      time.Second / 5000,
	}

	dbPool := &DatabasePool{
		connections: make([]DatabaseConnection, 20),
	}
	for i := range dbPool.connections {
		dbPool.connections[i] = DatabaseConnection{
			ID:       i,
			Active:   true,
			lastUsed: time.Now(),
		}
	}

	processor := NewOptimizedProcessor(runtime.NumCPU()*2, dbPool)

	// Cancel the context on Ctrl-C so the pipeline shuts down gracefully
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	processor.Start(ctx)

	// Start performance monitor
	go startPerformanceMonitor(ctx, processor)

	// Start message generation and feed to processor
	messageChan := generator.Start(ctx)

	go func() {
		for msg := range messageChan {
			select {
			case processor.InputChan() <- msg:
			case <-ctx.Done():
				messagePool.Put(msg)
				return
			default:
				// Channel full, drop message
				messagePool.Put(msg)
			}
		}
	}()

	// Keep running until interrupted
	<-ctx.Done()
	log.Println("Shutting down...")
	time.Sleep(1 * time.Second) // Allow in-flight batches to flush
}
Testing and Profiling
1. Baseline Performance Testing
# Start the application
go run main.go

# In another terminal, run profiling commands

# CPU profiling (quote the URL so the shell does not interpret "?")
go tool pprof "http://localhost:6060/debug/pprof/profile?seconds=30"

# Memory profiling
go tool pprof http://localhost:6060/debug/pprof/heap

# Goroutine profiling
go tool pprof http://localhost:6060/debug/pprof/goroutine

# Block profiling
go tool pprof http://localhost:6060/debug/pprof/block
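Note: the block profile stays empty unless sampling is enabled in the program itself, e.g. early in main():

// Enable sampling for the block (and, optionally, mutex) profiles.
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)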
2. Performance Benchmarks
# Run benchmarks
go test -bench=. -benchmem -cpuprofile=cpu.prof -memprofile=mem.prof

# Analyze with pprof
go tool pprof cpu.prof
go tool pprof mem.prof
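The project ships no benchmarks yet, so write one first. A minimal sketch (hypothetical file main_bench_test.go, requires the "testing" package) exercising the solution's transformer:

func BenchmarkJSONTransform(b *testing.B) {
	jt := &OptimizedJSONTransformer{name: "bench"}
	data := map[string]interface{}{
		"type":  "user_action",
		"value": 42.0,
	}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := jt.Transform(data); err != nil {
			b.Fatal(err)
		}
	}
}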
3. Load Testing
# Install the hey load testing tool
go install github.com/rakyll/hey@latest

# Generate load
hey -n 10000 -c 100 http://localhost:8080/api/process
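The pipeline as written exposes no HTTP endpoint on :8080, so this step applies only once you add one. A minimal sketch for main() (the route and port here are assumptions, not part of the original code):

mux := http.NewServeMux()
mux.HandleFunc("/api/process", func(w http.ResponseWriter, r *http.Request) {
	msg := messagePool.Get().(*Message)
	msg.ID = fmt.Sprintf("msg_http_%d", time.Now().UnixNano())
	msg.Timestamp = time.Now().Unix()
	select {
	case processor.InputChan() <- msg:
		w.WriteHeader(http.StatusAccepted)
	default:
		messagePool.Put(msg) // shed load when the queue is full
		http.Error(w, "queue full", http.StatusServiceUnavailable)
	}
})
go http.ListenAndServe(":8080", mux)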
Expected Performance Improvements
After optimization, you should see:
- Memory Usage: 70-80% reduction in allocations
- CPU Usage: 40-50% reduction in CPU time
- Throughput: 2-3x increase in messages per second
- Latency: 60-70% reduction in average processing time
- GC Pressure: Significantly reduced garbage collection frequency
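Treat these as targets rather than guarantees; the actual numbers depend on hardware and workload. To verify them rigorously, capture repeated benchmark runs before and after optimizing (for example, `go test -bench=. -count=10` into two files) and compare them with `benchstat`, which reports only statistically significant deltas instead of single-run noise.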
Extension Challenges
- Add metrics collection - Integrate Prometheus for detailed monitoring
- Implement adaptive scaling - Auto-adjust worker pool size based on load
- Add distributed tracing - Use OpenTelemetry for request tracing
- Implement zero-copy optimizations - Reduce memory allocations further
- Add SIMD optimizations - Use vectorized operations for math transformations
Key Takeaways
- Profiling is essential - Always measure before optimizing
- Object pooling dramatically reduces GC pressure
- Batch processing improves throughput for I/O operations
- Buffer reuse minimizes memory allocations
- Atomic operations are faster than mutexes for simple counters
- Proper channel sizing prevents backpressure issues
- Context cancellation enables graceful shutdown
This exercise teaches systematic performance optimization using Go's profiling tools and best practices for building high-throughput systems.