Exercise: TCP Connection Manager
Difficulty - Intermediate
Learning Objectives
- Implement connection pooling for TCP clients
- Master connection lifecycle management
- Handle connection health checking and recycling
- Implement connection limits and timeouts
- Build thread-safe connection acquisition/release
- Handle graceful shutdown of connection pools
Problem Statement
Create a production-ready TCP connection pool that efficiently manages connections to remote servers, implements health checking, and handles connection failures gracefully. The pool should support:
- Connection pooling: Reuse connections to reduce overhead
- Connection limits: Maximum connections per pool
- Health checking: Validate connections before use
- Idle timeout: Close idle connections automatically
- Connection lifetime: Maximum connection age
- Thread-safe operations: Concurrent acquisition/release
- Graceful shutdown: Clean pool termination
Data Structures
1package connpool
2
3import (
4 "context"
5 "net"
6 "sync"
7 "time"
8)
9
10// Connection wraps a network connection with metadata
11type Connection struct {
12 Conn net.Conn
13 createdAt time.Time
14 lastUsed time.Time
15 inUse bool
16}
17
18// Pool manages a pool of network connections
19type Pool struct {
20 address string
21 minConns int
22 maxConns int
23 maxIdleTime time.Duration
24 maxLifetime time.Duration
25 dialTimeout time.Duration
26 healthCheckFunc HealthCheckFunc
27 connections []*Connection
28 mu sync.RWMutex
29 closed bool
30 closeChan chan struct{}
31 wg sync.WaitGroup
32}
33
34// Config holds pool configuration
35type Config struct {
36 Address string
37 MinConns int
38 MaxConns int
39 MaxIdleTime time.Duration
40 MaxLifetime time.Duration
41 DialTimeout time.Duration
42 HealthCheckFunc HealthCheckFunc
43}
44
45// HealthCheckFunc validates a connection
46type HealthCheckFunc func(conn net.Conn) error
47
48// Stats holds pool statistics
49type Stats struct {
50 TotalConns int
51 IdleConns int
52 ActiveConns int
53 WaitingCount int
54 HitCount int64
55 MissCount int64
56 TimeoutCount int64
57}
Function Signatures
1// NewPool creates a new connection pool
2func NewPool(config Config)
3
4// Get acquires a connection from the pool
5func Get(ctx context.Context)
6
7// Put returns a connection to the pool
8func Put(conn *Connection) error
9
10// Close closes the pool and all connections
11func Close() error
12
13// Stats returns current pool statistics
14func Stats() Stats
15
16// healthCheck validates a connection
17func healthCheck(conn *Connection) bool
18
19// createConnection creates a new connection
20func createConnection()
21
22// reaper removes idle and expired connections
23func reaper()
24
25// maintainMinConnections ensures minimum connections
26func maintainMinConnections()
Example Usage
1package main
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "net"
8 "time"
9 "connpool"
10)
11
12func main() {
13 // Configure connection pool
14 config := connpool.Config{
15 Address: "localhost:6379",
16 MinConns: 2,
17 MaxConns: 10,
18 MaxIdleTime: 5 * time.Minute,
19 MaxLifetime: 30 * time.Minute,
20 DialTimeout: 5 * time.Second,
21 HealthCheckFunc: func(conn net.Conn) error {
22 // Simple ping-pong health check
23 conn.SetDeadline(time.Now().Add(2 * time.Second))
24 defer conn.SetDeadline(time.Time{})
25
26 _, err := conn.Write([]byte("PING\r\n"))
27 if err != nil {
28 return err
29 }
30
31 buf := make([]byte, 5)
32 _, err = conn.Read(buf)
33 return err
34 },
35 }
36
37 // Create pool
38 pool, err := connpool.NewPool(config)
39 if err != nil {
40 log.Fatal(err)
41 }
42 defer pool.Close()
43
44 // Use connection
45 ctx := context.Background()
46 conn, err := pool.Get(ctx)
47 if err != nil {
48 log.Fatal(err)
49 }
50
51 // Use the connection
52 _, err = conn.Conn.Write([]byte("GET key\r\n"))
53 if err != nil {
54 log.Printf("Write error: %v", err)
55 conn.Conn.Close() // Don't return bad connection
56 return
57 }
58
59 // Read response
60 buf := make([]byte, 1024)
61 n, err := conn.Conn.Read(buf)
62 if err != nil {
63 log.Printf("Read error: %v", err)
64 conn.Conn.Close()
65 return
66 }
67
68 fmt.Printf("Response: %s\n", buf[:n])
69
70 // Return connection to pool
71 if err := pool.Put(conn); err != nil {
72 log.Printf("Failed to return connection: %v", err)
73 }
74
75 // View pool statistics
76 stats := pool.Stats()
77 fmt.Printf("Pool stats: %+v\n", stats)
78}
Requirements
- Pool must maintain minimum connections at all times
- Never exceed maximum connection limit
- Implement connection health checking before use
- Remove idle connections exceeding idle timeout
- Remove connections exceeding maximum lifetime
- Support context-based timeouts for acquisition
- Thread-safe connection management
Solution
Click to see the complete solution
1package connpool
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "net"
8 "sync"
9 "sync/atomic"
10 "time"
11)
12
13var (
14 ErrPoolClosed = errors.New("pool is closed")
15 ErrTimeout = errors.New("timeout acquiring connection")
16 ErrMaxConns = errors.New("maximum connections reached")
17 ErrInvalidConn = errors.New("invalid connection")
18)
19
20// Connection wraps a network connection with metadata
21type Connection struct {
22 Conn net.Conn
23 createdAt time.Time
24 lastUsed time.Time
25 inUse bool
26}
27
28// Pool manages a pool of network connections
29type Pool struct {
30 address string
31 minConns int
32 maxConns int
33 maxIdleTime time.Duration
34 maxLifetime time.Duration
35 dialTimeout time.Duration
36 healthCheckFunc HealthCheckFunc
37 connections []*Connection
38 mu sync.RWMutex
39 cond *sync.Cond
40 closed bool
41 closeChan chan struct{}
42 wg sync.WaitGroup
43
44 // Statistics
45 hitCount int64
46 missCount int64
47 timeoutCount int64
48}
49
50// Config holds pool configuration
51type Config struct {
52 Address string
53 MinConns int
54 MaxConns int
55 MaxIdleTime time.Duration
56 MaxLifetime time.Duration
57 DialTimeout time.Duration
58 HealthCheckFunc HealthCheckFunc
59}
60
61// HealthCheckFunc validates a connection
62type HealthCheckFunc func(conn net.Conn) error
63
64// Stats holds pool statistics
65type Stats struct {
66 TotalConns int
67 IdleConns int
68 ActiveConns int
69 HitCount int64
70 MissCount int64
71 TimeoutCount int64
72}
73
74// NewPool creates a new connection pool
75func NewPool(config Config) {
76 if config.MinConns < 0 {
77 config.MinConns = 0
78 }
79 if config.MaxConns <= 0 {
80 config.MaxConns = 10
81 }
82 if config.MinConns > config.MaxConns {
83 config.MinConns = config.MaxConns
84 }
85 if config.MaxIdleTime <= 0 {
86 config.MaxIdleTime = 5 * time.Minute
87 }
88 if config.MaxLifetime <= 0 {
89 config.MaxLifetime = 30 * time.Minute
90 }
91 if config.DialTimeout <= 0 {
92 config.DialTimeout = 5 * time.Second
93 }
94
95 pool := &Pool{
96 address: config.Address,
97 minConns: config.MinConns,
98 maxConns: config.MaxConns,
99 maxIdleTime: config.MaxIdleTime,
100 maxLifetime: config.MaxLifetime,
101 dialTimeout: config.DialTimeout,
102 healthCheckFunc: config.HealthCheckFunc,
103 connections: make([]*Connection, 0, config.MaxConns),
104 closeChan: make(chan struct{}),
105 }
106
107 pool.cond = sync.NewCond(&pool.mu)
108
109 // Initialize minimum connections
110 for i := 0; i < config.MinConns; i++ {
111 conn, err := pool.createConnection()
112 if err != nil {
113 // Close any connections created so far
114 pool.Close()
115 return nil, fmt.Errorf("failed to create initial connection: %w", err)
116 }
117 pool.connections = append(pool.connections, conn)
118 }
119
120 // Start background workers
121 pool.wg.Add(2)
122 go pool.reaper()
123 go pool.maintainMinConnections()
124
125 return pool, nil
126}
127
128// Get acquires a connection from the pool
129func Get(ctx context.Context) {
130 p.mu.Lock()
131 defer p.mu.Unlock()
132
133 // Check if pool is closed
134 if p.closed {
135 return nil, ErrPoolClosed
136 }
137
138 for {
139 // Try to find an idle connection
140 for i, conn := range p.connections {
141 if !conn.inUse {
142 // Check if connection is still healthy
143 if p.isConnectionValid(conn) {
144 conn.inUse = true
145 conn.lastUsed = time.Now()
146 atomic.AddInt64(&p.hitCount, 1)
147 return conn, nil
148 }
149
150 // Connection is invalid, remove it
151 p.removeConnection(i)
152 break
153 }
154 }
155
156 // No idle connection found, try to create a new one
157 if len(p.connections) < p.maxConns {
158 atomic.AddInt64(&p.missCount, 1)
159 p.mu.Unlock()
160
161 conn, err := p.createConnection()
162 if err != nil {
163 p.mu.Lock()
164 return nil, err
165 }
166
167 p.mu.Lock()
168 if p.closed {
169 conn.Conn.Close()
170 return nil, ErrPoolClosed
171 }
172
173 conn.inUse = true
174 conn.lastUsed = time.Now()
175 p.connections = append(p.connections, conn)
176 return conn, nil
177 }
178
179 // Pool is full, wait for a connection to be released
180 select {
181 case <-ctx.Done():
182 atomic.AddInt64(&p.timeoutCount, 1)
183 return nil, ctx.Err()
184 default:
185 }
186
187 // Wait with timeout
188 done := make(chan struct{})
189 go func() {
190 p.cond.Wait()
191 close(done)
192 }()
193
194 p.mu.Unlock()
195 select {
196 case <-ctx.Done():
197 p.mu.Lock()
198 p.cond.Signal() // Wake up another waiter
199 atomic.AddInt64(&p.timeoutCount, 1)
200 return nil, ctx.Err()
201 case <-done:
202 p.mu.Lock()
203 }
204
205 // Check if pool was closed while waiting
206 if p.closed {
207 return nil, ErrPoolClosed
208 }
209 }
210}
211
212// Put returns a connection to the pool
213func Put(conn *Connection) error {
214 if conn == nil {
215 return ErrInvalidConn
216 }
217
218 p.mu.Lock()
219 defer p.mu.Unlock()
220
221 if p.closed {
222 conn.Conn.Close()
223 return ErrPoolClosed
224 }
225
226 // Find the connection in the pool
227 found := false
228 for _, c := range p.connections {
229 if c == conn {
230 found = true
231 break
232 }
233 }
234
235 if !found {
236 conn.Conn.Close()
237 return ErrInvalidConn
238 }
239
240 // Mark as not in use
241 conn.inUse = false
242 conn.lastUsed = time.Now()
243
244 // Signal waiting goroutines
245 p.cond.Signal()
246
247 return nil
248}
249
250// Close closes the pool and all connections
251func Close() error {
252 p.mu.Lock()
253 if p.closed {
254 p.mu.Unlock()
255 return ErrPoolClosed
256 }
257
258 p.closed = true
259 close(p.closeChan)
260
261 // Close all connections
262 for _, conn := range p.connections {
263 if conn.Conn != nil {
264 conn.Conn.Close()
265 }
266 }
267 p.connections = nil
268
269 // Wake up all waiting goroutines
270 p.cond.Broadcast()
271 p.mu.Unlock()
272
273 // Wait for background workers to finish
274 p.wg.Wait()
275
276 return nil
277}
278
279// Stats returns current pool statistics
280func Stats() Stats {
281 p.mu.RLock()
282 defer p.mu.RUnlock()
283
284 stats := Stats{
285 TotalConns: len(p.connections),
286 HitCount: atomic.LoadInt64(&p.hitCount),
287 MissCount: atomic.LoadInt64(&p.missCount),
288 TimeoutCount: atomic.LoadInt64(&p.timeoutCount),
289 }
290
291 for _, conn := range p.connections {
292 if conn.inUse {
293 stats.ActiveConns++
294 } else {
295 stats.IdleConns++
296 }
297 }
298
299 return stats
300}
301
302// createConnection creates a new connection
303func createConnection() {
304 dialer := &net.Dialer{
305 Timeout: p.dialTimeout,
306 }
307
308 conn, err := dialer.Dial("tcp", p.address)
309 if err != nil {
310 return nil, fmt.Errorf("failed to dial: %w", err)
311 }
312
313 return &Connection{
314 Conn: conn,
315 createdAt: time.Now(),
316 lastUsed: time.Now(),
317 inUse: false,
318 }, nil
319}
320
321// isConnectionValid checks if a connection is still valid
322func isConnectionValid(conn *Connection) bool {
323 // Check lifetime
324 if time.Since(conn.createdAt) > p.maxLifetime {
325 return false
326 }
327
328 // Check idle time
329 if time.Since(conn.lastUsed) > p.maxIdleTime {
330 return false
331 }
332
333 // Run health check if provided
334 if p.healthCheckFunc != nil {
335 if err := p.healthCheckFunc(conn.Conn); err != nil {
336 return false
337 }
338 }
339
340 return true
341}
342
343// removeConnection removes a connection from the pool
344func removeConnection(index int) {
345 conn := p.connections[index]
346 if conn.Conn != nil {
347 conn.Conn.Close()
348 }
349
350 // Remove from slice
351 p.connections = append(p.connections[:index], p.connections[index+1:]...)
352}
353
354// reaper removes idle and expired connections
355func reaper() {
356 defer p.wg.Done()
357
358 ticker := time.NewTicker(30 * time.Second)
359 defer ticker.Stop()
360
361 for {
362 select {
363 case <-p.closeChan:
364 return
365 case <-ticker.C:
366 p.mu.Lock()
367
368 // Remove invalid idle connections
369 i := 0
370 for i < len(p.connections) {
371 conn := p.connections[i]
372
373 if !conn.inUse && !p.isConnectionValid(conn) {
374 // Only remove if we have more than minimum connections
375 if len(p.connections) > p.minConns {
376 p.removeConnection(i)
377 continue
378 }
379 }
380 i++
381 }
382
383 p.mu.Unlock()
384 }
385 }
386}
387
388// maintainMinConnections ensures minimum connections
389func maintainMinConnections() {
390 defer p.wg.Done()
391
392 ticker := time.NewTicker(10 * time.Second)
393 defer ticker.Stop()
394
395 for {
396 select {
397 case <-p.closeChan:
398 return
399 case <-ticker.C:
400 p.mu.Lock()
401
402 // Create connections if below minimum
403 for len(p.connections) < p.minConns {
404 p.mu.Unlock()
405
406 conn, err := p.createConnection()
407 if err != nil {
408 // Log error and try again later
409 p.mu.Lock()
410 break
411 }
412
413 p.mu.Lock()
414 if p.closed {
415 conn.Conn.Close()
416 break
417 }
418
419 p.connections = append(p.connections, conn)
420 }
421
422 p.mu.Unlock()
423 }
424 }
425}
Explanation
Pool Structure:
address: Target server addressminConns/maxConns: Connection limitsmaxIdleTime: Maximum idle duration before recyclingmaxLifetime: Maximum connection ageconnections: Slice of managed connectionscond: Condition variable for blocking/signalingcloseChan: Signal channel for shutdown
NewPool:
- Validates and sets default configuration
- Creates initial minimum connections
- Starts background workers
- Returns error if initial connections fail
Get:
- Locks pool and searches for idle connection
- Validates connection health before returning
- Creates new connection if pool not full
- Waits on condition variable if pool full
- Supports context cancellation for timeout
- Updates statistics
Put:
- Validates connection belongs to pool
- Marks connection as not in use
- Updates last used timestamp
- Signals waiting goroutines via condition variable
- Closes connection if pool is closed
Connection Validation:
- Checks connection lifetime
- Checks idle duration
- Runs health check function if provided
- Returns false if any check fails
Reaper:
- Runs periodically
- Removes invalid idle connections
- Respects minimum connection count
- Stops on pool closure
Maintainer:
- Runs periodically
- Creates connections if below minimum
- Handles creation errors gracefully
- Stops on pool closure
Statistics:
- Hit count: Reused existing connection
- Miss count: Created new connection
- Timeout count: Failed to acquire within timeout
- Active/idle connection counts
Connection Pool Patterns
Pattern 1: Per-Request Connection
1func handleRequest(pool *Pool) error {
2 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
3 defer cancel()
4
5 conn, err := pool.Get(ctx)
6 if err != nil {
7 return err
8 }
9 defer pool.Put(conn)
10
11 // Use connection
12 return nil
13}
Pattern 2: Connection Wrapping
1type PooledConn struct {
2 *Connection
3 pool *Pool
4}
5
6func Close() error {
7 return pc.pool.Put(pc.Connection)
8}
9
10// Usage
11conn, _ := pool.Get(ctx)
12pc := &PooledConn{Connection: conn, pool: pool}
13defer pc.Close()
Pattern 3: Lazy Initialization
1var (
2 poolOnce sync.Once
3 pool *Pool
4)
5
6func getPool() *Pool {
7 poolOnce.Do(func() {
8 pool, _ = NewPool(config)
9 })
10 return pool
11}
Pattern 4: Multi-Pool Manager
1type PoolManager struct {
2 pools map[string]*Pool
3 mu sync.RWMutex
4}
5
6func GetPool(address string) {
7 pm.mu.RLock()
8 pool, exists := pm.pools[address]
9 pm.mu.RUnlock()
10
11 if exists {
12 return pool, nil
13 }
14
15 // Create new pool
16 pm.mu.Lock()
17 defer pm.mu.Unlock()
18
19 pool, err := NewPool(Config{Address: address})
20 if err != nil {
21 return nil, err
22 }
23
24 pm.pools[address] = pool
25 return pool, nil
26}
Best Practices
1. Always use context with timeout:
1ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
2defer cancel()
3
4conn, err := pool.Get(ctx)
5if err != nil {
6 return err
7}
8defer pool.Put(conn)
2. Handle connection errors gracefully:
1conn, err := pool.Get(ctx)
2if err != nil {
3 return err
4}
5
6_, err = conn.Conn.Write(data)
7if err != nil {
8 conn.Conn.Close() // Don't return bad connection
9 return err
10}
11
12pool.Put(conn) // Return healthy connection
3. Implement health checks:
1healthCheck := func(conn net.Conn) error {
2 conn.SetDeadline(time.Now().Add(2 * time.Second))
3 defer conn.SetDeadline(time.Time{})
4
5 // Send ping
6 _, err := conn.Write([]byte("PING\r\n"))
7 if err != nil {
8 return err
9 }
10
11 // Wait for pong
12 buf := make([]byte, 5)
13 _, err = conn.Read(buf)
14 return err
15}
4. Monitor pool metrics:
1func monitorPool(pool *Pool) {
2 ticker := time.NewTicker(1 * time.Minute)
3 defer ticker.Stop()
4
5 for range ticker.C {
6 stats := pool.Stats()
7 log.Printf("Pool stats: %+v", stats)
8
9 // Alert on issues
10 if stats.TimeoutCount > 100 {
11 log.Printf("High timeout count: %d", stats.TimeoutCount)
12 }
13 }
14}
Test Cases
1package connpool
2
3import (
4 "context"
5 "net"
6 "sync"
7 "testing"
8 "time"
9)
10
11func TestPool_BasicOperations(t *testing.T) {
12 // Start test server
13 listener, err := net.Listen("tcp", "127.0.0.1:0")
14 if err != nil {
15 t.Fatal(err)
16 }
17 defer listener.Close()
18
19 go func() {
20 for {
21 conn, err := listener.Accept()
22 if err != nil {
23 return
24 }
25 conn.Close()
26 }
27 }()
28
29 config := Config{
30 Address: listener.Addr().String(),
31 MinConns: 2,
32 MaxConns: 5,
33 MaxIdleTime: 1 * time.Minute,
34 MaxLifetime: 5 * time.Minute,
35 DialTimeout: 5 * time.Second,
36 }
37
38 pool, err := NewPool(config)
39 if err != nil {
40 t.Fatal(err)
41 }
42 defer pool.Close()
43
44 ctx := context.Background()
45 conn, err := pool.Get(ctx)
46 if err != nil {
47 t.Fatalf("Failed to get connection: %v", err)
48 }
49
50 if err := pool.Put(conn); err != nil {
51 t.Errorf("Failed to return connection: %v", err)
52 }
53}
54
55func TestPool_MaxConnections(t *testing.T) {
56 listener, err := net.Listen("tcp", "127.0.0.1:0")
57 if err != nil {
58 t.Fatal(err)
59 }
60 defer listener.Close()
61
62 go func() {
63 for {
64 conn, err := listener.Accept()
65 if err != nil {
66 return
67 }
68 go func(c net.Conn) {
69 time.Sleep(100 * time.Millisecond)
70 c.Close()
71 }(conn)
72 }
73 }()
74
75 config := Config{
76 Address: listener.Addr().String(),
77 MinConns: 0,
78 MaxConns: 2,
79 }
80
81 pool, err := NewPool(config)
82 if err != nil {
83 t.Fatal(err)
84 }
85 defer pool.Close()
86
87 ctx := context.Background()
88
89 // Get max connections
90 conn1, _ := pool.Get(ctx)
91 conn2, _ := pool.Get(ctx)
92
93 // Try to get third connection with short timeout
94 ctx3, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
95 defer cancel()
96
97 _, err = pool.Get(ctx3)
98 if err != context.DeadlineExceeded {
99 t.Errorf("Expected timeout, got: %v", err)
100 }
101
102 // Return connections
103 pool.Put(conn1)
104 pool.Put(conn2)
105}
106
107func TestPool_ConcurrentAccess(t *testing.T) {
108 listener, err := net.Listen("tcp", "127.0.0.1:0")
109 if err != nil {
110 t.Fatal(err)
111 }
112 defer listener.Close()
113
114 go func() {
115 for {
116 conn, err := listener.Accept()
117 if err != nil {
118 return
119 }
120 conn.Close()
121 }
122 }()
123
124 config := Config{
125 Address: listener.Addr().String(),
126 MinConns: 2,
127 MaxConns: 10,
128 }
129
130 pool, err := NewPool(config)
131 if err != nil {
132 t.Fatal(err)
133 }
134 defer pool.Close()
135
136 var wg sync.WaitGroup
137 errors := make(chan error, 50)
138
139 // Concurrent gets and puts
140 for i := 0; i < 50; i++ {
141 wg.Add(1)
142 go func() {
143 defer wg.Done()
144
145 ctx := context.Background()
146 conn, err := pool.Get(ctx)
147 if err != nil {
148 errors <- err
149 return
150 }
151
152 time.Sleep(10 * time.Millisecond)
153
154 if err := pool.Put(conn); err != nil {
155 errors <- err
156 }
157 }()
158 }
159
160 wg.Wait()
161 close(errors)
162
163 for err := range errors {
164 t.Errorf("Concurrent access error: %v", err)
165 }
166}
167
168func TestPool_HealthCheck(t *testing.T) {
169 listener, err := net.Listen("tcp", "127.0.0.1:0")
170 if err != nil {
171 t.Fatal(err)
172 }
173 defer listener.Close()
174
175 go func() {
176 for {
177 conn, err := listener.Accept()
178 if err != nil {
179 return
180 }
181 conn.Close()
182 }
183 }()
184
185 healthCheckCalled := false
186
187 config := Config{
188 Address: listener.Addr().String(),
189 MinConns: 1,
190 MaxConns: 5,
191 HealthCheckFunc: func(conn net.Conn) error {
192 healthCheckCalled = true
193 return nil
194 },
195 }
196
197 pool, err := NewPool(config)
198 if err != nil {
199 t.Fatal(err)
200 }
201 defer pool.Close()
202
203 ctx := context.Background()
204 conn, err := pool.Get(ctx)
205 if err != nil {
206 t.Fatal(err)
207 }
208
209 pool.Put(conn)
210
211 // Get again to trigger health check
212 _, err = pool.Get(ctx)
213 if err != nil {
214 t.Fatal(err)
215 }
216
217 if !healthCheckCalled {
218 t.Error("Health check was not called")
219 }
220}
221
222func TestPool_Stats(t *testing.T) {
223 listener, err := net.Listen("tcp", "127.0.0.1:0")
224 if err != nil {
225 t.Fatal(err)
226 }
227 defer listener.Close()
228
229 go func() {
230 for {
231 conn, err := listener.Accept()
232 if err != nil {
233 return
234 }
235 conn.Close()
236 }
237 }()
238
239 config := Config{
240 Address: listener.Addr().String(),
241 MinConns: 2,
242 MaxConns: 5,
243 }
244
245 pool, err := NewPool(config)
246 if err != nil {
247 t.Fatal(err)
248 }
249 defer pool.Close()
250
251 stats := pool.Stats()
252 if stats.TotalConns != 2 {
253 t.Errorf("Expected 2 total connections, got %d", stats.TotalConns)
254 }
255
256 ctx := context.Background()
257 conn, _ := pool.Get(ctx)
258
259 stats = pool.Stats()
260 if stats.ActiveConns != 1 {
261 t.Errorf("Expected 1 active connection, got %d", stats.ActiveConns)
262 }
263
264 pool.Put(conn)
265
266 stats = pool.Stats()
267 if stats.IdleConns != 2 {
268 t.Errorf("Expected 2 idle connections, got %d", stats.IdleConns)
269 }
270}
Bonus Challenges
- Connection Affinity: Pin connections to specific goroutines
- Load Balancing: Distribute connections across multiple servers
- Circuit Breaker: Stop creating connections after repeated failures
- Metrics Export: Export metrics to Prometheus
- Connection Warming: Pre-establish connections on startup
Key Takeaways
- Connection pooling reduces overhead and improves performance
- Health checking ensures connection validity
- Lifecycle management prevents resource leaks
- Condition variables enable efficient blocking/signaling
- Background workers maintain pool health
- Context support enables timeout and cancellation
Connection pooling is essential for high-performance networked applications. Production pools must handle connection failures, implement proper timeouts, and maintain connection health automatically.