Exercise: TCP Connection Manager

Difficulty: Intermediate

Learning Objectives

  • Implement connection pooling for TCP clients
  • Master connection lifecycle management
  • Handle connection health checking and recycling
  • Implement connection limits and timeouts
  • Build thread-safe connection acquisition/release
  • Handle graceful shutdown of connection pools

Problem Statement

Create a production-ready TCP connection pool that efficiently manages connections to remote servers, implements health checking, and handles connection failures gracefully. The pool should support:

  1. Connection pooling: Reuse connections to reduce overhead
  2. Connection limits: Maximum connections per pool
  3. Health checking: Validate connections before use
  4. Idle timeout: Close idle connections automatically
  5. Connection lifetime: Maximum connection age
  6. Thread-safe operations: Concurrent acquisition/release
  7. Graceful shutdown: Clean pool termination

Data Structures

package connpool

import (
    "context"
    "net"
    "sync"
    "time"
)

// Connection wraps a network connection with metadata
type Connection struct {
    Conn      net.Conn
    createdAt time.Time
    lastUsed  time.Time
    inUse     bool
}

// Pool manages a pool of network connections
type Pool struct {
    address         string
    minConns        int
    maxConns        int
    maxIdleTime     time.Duration
    maxLifetime     time.Duration
    dialTimeout     time.Duration
    healthCheckFunc HealthCheckFunc
    connections     []*Connection
    mu              sync.RWMutex
    closed          bool
    closeChan       chan struct{}
    wg              sync.WaitGroup
}

// Config holds pool configuration
type Config struct {
    Address         string
    MinConns        int
    MaxConns        int
    MaxIdleTime     time.Duration
    MaxLifetime     time.Duration
    DialTimeout     time.Duration
    HealthCheckFunc HealthCheckFunc
}

// HealthCheckFunc validates a connection
type HealthCheckFunc func(conn net.Conn) error

// Stats holds pool statistics
type Stats struct {
    TotalConns   int
    IdleConns    int
    ActiveConns  int
    WaitingCount int
    HitCount     int64
    MissCount    int64
    TimeoutCount int64
}

Function Signatures

// NewPool creates a new connection pool
func NewPool(config Config) (*Pool, error)

// Get acquires a connection from the pool
func (p *Pool) Get(ctx context.Context) (*Connection, error)

// Put returns a connection to the pool
func (p *Pool) Put(conn *Connection) error

// Close closes the pool and all connections
func (p *Pool) Close() error

// Stats returns current pool statistics
func (p *Pool) Stats() Stats

// healthCheck validates a connection
func (p *Pool) healthCheck(conn *Connection) bool

// createConnection creates a new connection
func (p *Pool) createConnection() (*Connection, error)

// reaper removes idle and expired connections
func (p *Pool) reaper()

// maintainMinConnections ensures minimum connections
func (p *Pool) maintainMinConnections()

Example Usage

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "time"

    "connpool" // adjust to your module's import path
)

func main() {
    // Configure connection pool
    config := connpool.Config{
        Address:     "localhost:6379",
        MinConns:    2,
        MaxConns:    10,
        MaxIdleTime: 5 * time.Minute,
        MaxLifetime: 30 * time.Minute,
        DialTimeout: 5 * time.Second,
        HealthCheckFunc: func(conn net.Conn) error {
            // Simple ping-pong health check with a 2-second deadline
            if err := conn.SetDeadline(time.Now().Add(2 * time.Second)); err != nil {
                return err
            }
            defer conn.SetDeadline(time.Time{}) // clear the deadline afterwards

            if _, err := conn.Write([]byte("PING\r\n")); err != nil {
                return err
            }

            // Assumes a Redis-style server that answers "+PONG\r\n" (7 bytes).
            // Read the whole reply so no bytes are left for the next caller.
            buf := make([]byte, 7)
            _, err := io.ReadFull(conn, buf)
            return err
        },
    }

    // Create pool
    pool, err := connpool.NewPool(config)
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Close()

    // Acquire a connection
    ctx := context.Background()
    conn, err := pool.Get(ctx)
    if err != nil {
        // Return instead of log.Fatal so the deferred pool.Close runs
        log.Printf("Failed to get connection: %v", err)
        return
    }

    // Use the connection
    _, err = conn.Conn.Write([]byte("GET key\r\n"))
    if err != nil {
        log.Printf("Write error: %v", err)
        // Close the broken socket, then release its slot; the health
        // check evicts the closed connection on the next Get
        conn.Conn.Close()
        pool.Put(conn)
        return
    }

    // Read response
    buf := make([]byte, 1024)
    n, err := conn.Conn.Read(buf)
    if err != nil {
        log.Printf("Read error: %v", err)
        conn.Conn.Close()
        pool.Put(conn)
        return
    }

    fmt.Printf("Response: %s\n", buf[:n])

    // Return connection to pool
    if err := pool.Put(conn); err != nil {
        log.Printf("Failed to return connection: %v", err)
    }

    // View pool statistics
    stats := pool.Stats()
    fmt.Printf("Pool stats: %+v\n", stats)
}

Requirements

  1. Pool must maintain minimum connections at all times
  2. Never exceed maximum connection limit
  3. Implement connection health checking before use
  4. Remove idle connections exceeding idle timeout
  5. Remove connections exceeding maximum lifetime
  6. Support context-based timeouts for acquisition
  7. Thread-safe connection management

Solution

package connpool

import (
    "context"
    "errors"
    "fmt"
    "net"
    "sync"
    "sync/atomic"
    "time"
)

var (
    ErrPoolClosed  = errors.New("pool is closed")
    ErrTimeout     = errors.New("timeout acquiring connection")
    ErrMaxConns    = errors.New("maximum connections reached")
    ErrInvalidConn = errors.New("invalid connection")
)

// Connection wraps a network connection with metadata
type Connection struct {
    Conn      net.Conn
    createdAt time.Time
    lastUsed  time.Time
    inUse     bool
}

// Pool manages a pool of network connections
type Pool struct {
    address         string
    minConns        int
    maxConns        int
    maxIdleTime     time.Duration
    maxLifetime     time.Duration
    dialTimeout     time.Duration
    healthCheckFunc HealthCheckFunc
    connections     []*Connection
    mu              sync.RWMutex
    cond            *sync.Cond
    closed          bool
    closeChan       chan struct{}
    wg              sync.WaitGroup

    // Statistics
    hitCount     int64
    missCount    int64
    timeoutCount int64
}

// Config holds pool configuration
type Config struct {
    Address         string
    MinConns        int
    MaxConns        int
    MaxIdleTime     time.Duration
    MaxLifetime     time.Duration
    DialTimeout     time.Duration
    HealthCheckFunc HealthCheckFunc
}

// HealthCheckFunc validates a connection
type HealthCheckFunc func(conn net.Conn) error

// Stats holds pool statistics
type Stats struct {
    TotalConns   int
    IdleConns    int
    ActiveConns  int
    HitCount     int64
    MissCount    int64
    TimeoutCount int64
}

// NewPool creates a new connection pool
func NewPool(config Config) (*Pool, error) {
    // Apply defaults for unset or out-of-range values
    if config.MinConns < 0 {
        config.MinConns = 0
    }
    if config.MaxConns <= 0 {
        config.MaxConns = 10
    }
    if config.MinConns > config.MaxConns {
        config.MinConns = config.MaxConns
    }
    if config.MaxIdleTime <= 0 {
        config.MaxIdleTime = 5 * time.Minute
    }
    if config.MaxLifetime <= 0 {
        config.MaxLifetime = 30 * time.Minute
    }
    if config.DialTimeout <= 0 {
        config.DialTimeout = 5 * time.Second
    }

    pool := &Pool{
        address:         config.Address,
        minConns:        config.MinConns,
        maxConns:        config.MaxConns,
        maxIdleTime:     config.MaxIdleTime,
        maxLifetime:     config.MaxLifetime,
        dialTimeout:     config.DialTimeout,
        healthCheckFunc: config.HealthCheckFunc,
        connections:     make([]*Connection, 0, config.MaxConns),
        closeChan:       make(chan struct{}),
    }

    // The condition variable shares the pool mutex (write-lock side)
    pool.cond = sync.NewCond(&pool.mu)

    // Initialize minimum connections
    for i := 0; i < config.MinConns; i++ {
        conn, err := pool.createConnection()
        if err != nil {
            // Close any connections created so far. No workers are
            // running yet, so Close returns without waiting.
            pool.Close()
            return nil, fmt.Errorf("failed to create initial connection: %w", err)
        }
        pool.connections = append(pool.connections, conn)
    }

    // Start background workers
    pool.wg.Add(2)
    go pool.reaper()
    go pool.maintainMinConnections()

    return pool, nil
}

// Get acquires a connection from the pool. It blocks until a connection
// is available, the context is done, or the pool is closed.
func (p *Pool) Get(ctx context.Context) (*Connection, error) {
    // Wake all waiters when ctx is done so a blocked Get can observe the
    // cancellation (context.AfterFunc requires Go 1.21 or newer)
    stop := context.AfterFunc(ctx, func() {
        p.mu.Lock()
        defer p.mu.Unlock()
        p.cond.Broadcast()
    })
    defer stop()

    p.mu.Lock()
    defer p.mu.Unlock()

    for {
        // Check if pool is closed (re-checked after every wake-up)
        if p.closed {
            return nil, ErrPoolClosed
        }

        // Try to find a healthy idle connection. The health check runs
        // while the lock is held, so it should be fast.
        for i := 0; i < len(p.connections); {
            conn := p.connections[i]
            if conn.inUse {
                i++
                continue
            }
            if p.isConnectionValid(conn) {
                conn.inUse = true
                conn.lastUsed = time.Now()
                atomic.AddInt64(&p.hitCount, 1)
                return conn, nil
            }
            // Connection is invalid: remove it and re-examine index i
            p.removeConnection(i)
        }

        // No idle connection found, try to create a new one
        if len(p.connections) < p.maxConns {
            atomic.AddInt64(&p.missCount, 1)

            // Reserve the slot before unlocking so concurrent callers
            // cannot push the pool past maxConns while we dial
            slot := &Connection{inUse: true}
            p.connections = append(p.connections, slot)

            p.mu.Unlock()
            conn, err := p.createConnection()
            p.mu.Lock()

            if err != nil || p.closed {
                // Give the reserved slot back (Close may already have
                // dropped the whole slice) and wake one waiter
                for i, c := range p.connections {
                    if c == slot {
                        p.removeConnection(i)
                        break
                    }
                }
                p.cond.Signal()
                if err != nil {
                    return nil, err
                }
                conn.Conn.Close()
                return nil, ErrPoolClosed
            }

            slot.Conn = conn.Conn
            slot.createdAt = conn.createdAt
            slot.lastUsed = time.Now()
            return slot, nil
        }

        // Pool is full: give up if the context is done, otherwise wait
        // for Put, Close, or the AfterFunc above to wake us
        if err := ctx.Err(); err != nil {
            atomic.AddInt64(&p.timeoutCount, 1)
            return nil, err
        }
        p.cond.Wait()
    }
}

// Put returns a connection to the pool
func (p *Pool) Put(conn *Connection) error {
    if conn == nil || conn.Conn == nil {
        return ErrInvalidConn
    }

    p.mu.Lock()
    defer p.mu.Unlock()

    if p.closed {
        conn.Conn.Close()
        return ErrPoolClosed
    }

    // Find the connection in the pool
    found := false
    for _, c := range p.connections {
        if c == conn {
            found = true
            break
        }
    }

    if !found {
        conn.Conn.Close()
        return ErrInvalidConn
    }

    // Reject a second Put of a connection that is already idle
    if !conn.inUse {
        return ErrInvalidConn
    }

    // Mark as not in use
    conn.inUse = false
    conn.lastUsed = time.Now()

    // Signal one waiting goroutine
    p.cond.Signal()

    return nil
}

// Close closes the pool and all connections
func (p *Pool) Close() error {
    p.mu.Lock()
    if p.closed {
        p.mu.Unlock()
        return ErrPoolClosed
    }

    p.closed = true
    close(p.closeChan)

    // Close all connections, including any still checked out
    for _, conn := range p.connections {
        if conn.Conn != nil {
            conn.Conn.Close()
        }
    }
    p.connections = nil

    // Wake up all waiting goroutines
    p.cond.Broadcast()
    p.mu.Unlock()

    // Wait for background workers to finish
    p.wg.Wait()

    return nil
}

// Stats returns current pool statistics
func (p *Pool) Stats() Stats {
    p.mu.RLock()
    defer p.mu.RUnlock()

    stats := Stats{
        TotalConns:   len(p.connections),
        HitCount:     atomic.LoadInt64(&p.hitCount),
        MissCount:    atomic.LoadInt64(&p.missCount),
        TimeoutCount: atomic.LoadInt64(&p.timeoutCount),
    }

    for _, conn := range p.connections {
        if conn.inUse {
            stats.ActiveConns++
        } else {
            stats.IdleConns++
        }
    }

    return stats
}

// createConnection dials a new connection. It does not touch pool state,
// so it is safe to call without holding the lock.
func (p *Pool) createConnection() (*Connection, error) {
    dialer := &net.Dialer{
        Timeout: p.dialTimeout,
    }

    conn, err := dialer.Dial("tcp", p.address)
    if err != nil {
        return nil, fmt.Errorf("failed to dial: %w", err)
    }

    now := time.Now()
    return &Connection{
        Conn:      conn,
        createdAt: now,
        lastUsed:  now,
        inUse:     false,
    }, nil
}

// isConnectionValid checks if a connection is still valid
func (p *Pool) isConnectionValid(conn *Connection) bool {
    // Check lifetime
    if time.Since(conn.createdAt) > p.maxLifetime {
        return false
    }

    // Check idle time
    if time.Since(conn.lastUsed) > p.maxIdleTime {
        return false
    }

    // Run health check if provided
    if p.healthCheckFunc != nil {
        if err := p.healthCheckFunc(conn.Conn); err != nil {
            return false
        }
    }

    return true
}

// removeConnection closes and removes the connection at index.
// The caller must hold p.mu.
func (p *Pool) removeConnection(index int) {
    conn := p.connections[index]
    if conn.Conn != nil {
        conn.Conn.Close()
    }

    // Remove from slice and clear the vacated tail element so the
    // removed connection can be garbage collected
    last := len(p.connections) - 1
    copy(p.connections[index:], p.connections[index+1:])
    p.connections[last] = nil
    p.connections = p.connections[:last]
}

// reaper removes idle and expired connections
func (p *Pool) reaper() {
    defer p.wg.Done()

    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-p.closeChan:
            return
        case <-ticker.C:
            p.mu.Lock()

            // Remove invalid idle connections
            i := 0
            for i < len(p.connections) {
                conn := p.connections[i]

                if !conn.inUse && !p.isConnectionValid(conn) {
                    // Only remove if we have more than minimum connections
                    if len(p.connections) > p.minConns {
                        p.removeConnection(i)
                        continue // index i now holds the next element
                    }
                }
                i++
            }

            p.mu.Unlock()
        }
    }
}

// maintainMinConnections ensures minimum connections
func (p *Pool) maintainMinConnections() {
    defer p.wg.Done()

    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-p.closeChan:
            return
        case <-ticker.C:
            p.mu.Lock()

            // Create connections if below minimum
            for !p.closed && len(p.connections) < p.minConns {
                // Dial without holding the lock
                p.mu.Unlock()
                conn, err := p.createConnection()
                p.mu.Lock()

                if err != nil {
                    // Dial failed; try again on the next tick
                    break
                }
                if p.closed || len(p.connections) >= p.maxConns {
                    // Pool was closed or filled up while dialing
                    conn.Conn.Close()
                    break
                }

                p.connections = append(p.connections, conn)
                p.cond.Signal() // a new idle connection is available
            }

            p.mu.Unlock()
        }
    }
}

Explanation

Pool Structure:

  • address: Target server address
  • minConns/maxConns: Connection limits
  • maxIdleTime: Maximum idle duration before recycling
  • maxLifetime: Maximum connection age
  • connections: Slice of managed connections
  • cond: Condition variable for blocking/signaling
  • closeChan: Signal channel for shutdown

NewPool:

  • Validates and sets default configuration
  • Creates initial minimum connections
  • Starts background workers
  • Returns error if initial connections fail

Get:

  • Locks pool and searches for idle connection
  • Validates connection health before returning
  • Creates new connection if pool not full
  • Waits on condition variable if pool full
  • Supports context cancellation for timeout
  • Updates statistics
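
Blocking on a sync.Cond while still honoring a context is the subtle part of Get. The following is a minimal sketch of one way to do it, assuming Go 1.21 or newer for context.AfterFunc. The slots type, newSlots, and acquireWithin are hypothetical names used only for illustration; they stand in for the pool's capacity and are not part of the exercise's API.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// slots is a hypothetical counting gate standing in for pool capacity.
type slots struct {
    mu   sync.Mutex
    cond *sync.Cond
    free int
}

func newSlots(n int) *slots {
    s := &slots{free: n}
    s.cond = sync.NewCond(&s.mu)
    return s
}

// acquire blocks until a slot is free or ctx is done.
func (s *slots) acquire(ctx context.Context) error {
    // When ctx is done, wake every waiter so it can re-check ctx.Err().
    // Taking the lock first means a waiter is either parked in Wait or
    // has not yet checked ctx.Err(), so the wake-up cannot be lost.
    stop := context.AfterFunc(ctx, func() {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.cond.Broadcast()
    })
    defer stop()

    s.mu.Lock()
    defer s.mu.Unlock()
    for s.free == 0 {
        if err := ctx.Err(); err != nil {
            return err
        }
        s.cond.Wait()
    }
    s.free--
    return nil
}

// release frees one slot and wakes a single waiter.
func (s *slots) release() {
    s.mu.Lock()
    s.free++
    s.mu.Unlock()
    s.cond.Signal()
}

// acquireWithin is a convenience wrapper that bounds the wait by d.
func acquireWithin(s *slots, d time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), d)
    defer cancel()
    return s.acquire(ctx)
}

func main() {
    s := newSlots(1)
    fmt.Println(acquireWithin(s, 50*time.Millisecond)) // slot is free
    fmt.Println(acquireWithin(s, 50*time.Millisecond)) // gate is full, wait times out
    s.release()
    fmt.Println(acquireWithin(s, 50*time.Millisecond)) // free again
}
```

Calling Wait from the goroutine that holds the lock keeps the lock and unlock calls paired; spawning a helper goroutine just to call Wait would break that pairing.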

Put:

  • Validates connection belongs to pool
  • Marks connection as not in use
  • Updates last used timestamp
  • Signals waiting goroutines via condition variable
  • Closes connection if pool is closed

Connection Validation:

  • Checks connection lifetime
  • Checks idle duration
  • Runs health check function if provided
  • Returns false if any check fails
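
The time-based checks are pure arithmetic, so they can be isolated and tested without a network. The sketch below is illustrative only: expired is a hypothetical helper, and treating a zero limit as "disabled" is an assumption of this sketch (the solution substitutes default limits instead).

```go
package main

import (
    "fmt"
    "time"
)

// expired reports whether a connection of the given age and idle time has
// outlived either limit. A zero or negative limit disables that check
// (an assumption of this sketch).
func expired(age, idle, maxLifetime, maxIdle time.Duration) bool {
    if maxLifetime > 0 && age > maxLifetime {
        return true
    }
    if maxIdle > 0 && idle > maxIdle {
        return true
    }
    return false
}

func main() {
    created := time.Now().Add(-45 * time.Minute)
    lastUsed := time.Now().Add(-1 * time.Minute)

    // 45 minutes old against a 30-minute lifetime: reported as expired
    fmt.Println(expired(time.Since(created), time.Since(lastUsed),
        30*time.Minute, 5*time.Minute))
}
```

Running these cheap checks before the health check function avoids a network round trip for connections that would be discarded anyway.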

Reaper:

  • Runs periodically
  • Removes invalid idle connections
  • Respects minimum connection count
  • Stops on pool closure
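
The sweep the reaper performs can be sketched as an in-place filter over the connection slice. This is a simplified illustration, not the solution's code: entry and sweep are hypothetical names, and validity is a precomputed flag rather than a live health check.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for a pooled connection.
type entry struct {
    id    int
    inUse bool
    valid bool
}

// sweep removes idle, invalid entries in place but never shrinks the
// slice below min. It returns the kept entries and the removed ones.
func sweep(entries []*entry, min int) (kept, removed []*entry) {
    kept = entries[:0] // reuse the backing array
    remaining := len(entries)
    for _, e := range entries {
        if !e.inUse && !e.valid && remaining > min {
            removed = append(removed, e)
            remaining--
            continue
        }
        kept = append(kept, e)
    }
    // Clear the tail so dropped pointers can be garbage collected.
    for i := len(kept); i < len(entries); i++ {
        entries[i] = nil
    }
    return kept, removed
}

func main() {
    pool := []*entry{
        {id: 1, valid: true},
        {id: 2, valid: false},
        {id: 3, inUse: true, valid: false},
        {id: 4, valid: false},
    }
    kept, removed := sweep(pool, 3)
    fmt.Println(len(kept), len(removed))
}
```

With a minimum of 3, only entry 2 is removed: entry 3 is in use, and removing entry 4 would drop the pool below its minimum.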

Maintainer:

  • Runs periodically
  • Creates connections if below minimum
  • Handles creation errors gracefully
  • Stops on pool closure

Statistics:

  • Hit count: Reused existing connection
  • Miss count: Created new connection
  • Timeout count: Failed to acquire within timeout
  • Active/idle connection counts
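
The hit and miss counters are most useful as a ratio. A small sketch follows, assuming a local Stats type that mirrors the counter fields above; hitRatio is a hypothetical helper, not part of the exercise's API.

```go
package main

import "fmt"

// Stats mirrors the counter fields of the pool's Stats struct.
type Stats struct {
    HitCount     int64
    MissCount    int64
    TimeoutCount int64
}

// hitRatio returns the fraction of acquisitions served by an existing
// connection, or 0 when nothing has been acquired yet.
func hitRatio(s Stats) float64 {
    total := s.HitCount + s.MissCount
    if total == 0 {
        return 0
    }
    return float64(s.HitCount) / float64(total)
}

func main() {
    s := Stats{HitCount: 90, MissCount: 10, TimeoutCount: 2}
    fmt.Printf("hit ratio: %.2f\n", hitRatio(s))
}
```

A persistently low ratio suggests connections are being recycled faster than they are reused, which may point to idle or lifetime limits that are too short for the workload.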

Connection Pool Patterns

Pattern 1: Per-Request Connection

func handleRequest(pool *Pool) error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    conn, err := pool.Get(ctx)
    if err != nil {
        return err
    }
    defer pool.Put(conn)

    // Use connection
    return nil
}

Pattern 2: Connection Wrapping

type PooledConn struct {
    *Connection
    pool *Pool
}

// Close returns the connection to the pool instead of closing the socket
func (pc *PooledConn) Close() error {
    return pc.pool.Put(pc.Connection)
}

// Usage
conn, err := pool.Get(ctx)
if err != nil {
    return err
}
pc := &PooledConn{Connection: conn, pool: pool}
defer pc.Close()

Pattern 3: Lazy Initialization

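var (
    poolOnce sync.Once
    pool     *Pool
    poolErr  error
)

// getPool creates the shared pool on first use. config is assumed to be
// a package-level Config. The error from NewPool is kept so callers
// never receive a nil pool silently.
func getPool() (*Pool, error) {
    poolOnce.Do(func() {
        pool, poolErr = NewPool(config)
    })
    return pool, poolErr
}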

Pattern 4: Multi-Pool Manager

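type PoolManager struct {
    pools map[string]*Pool
    mu    sync.RWMutex
}

// GetPool returns the pool for address, creating it on first use
func (pm *PoolManager) GetPool(address string) (*Pool, error) {
    pm.mu.RLock()
    pool, exists := pm.pools[address]
    pm.mu.RUnlock()

    if exists {
        return pool, nil
    }

    // Create new pool
    pm.mu.Lock()
    defer pm.mu.Unlock()

    // Re-check: another goroutine may have created it between the locks
    if pool, exists := pm.pools[address]; exists {
        return pool, nil
    }

    pool, err := NewPool(Config{Address: address})
    if err != nil {
        return nil, err
    }

    if pm.pools == nil {
        pm.pools = make(map[string]*Pool) // allow a zero-value PoolManager
    }
    pm.pools[address] = pool
    return pool, nil
}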

Best Practices

1. Always use context with timeout:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

conn, err := pool.Get(ctx)
if err != nil {
    return err
}
defer pool.Put(conn)

2. Handle connection errors gracefully:

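conn, err := pool.Get(ctx)
if err != nil {
    return err
}

_, err = conn.Conn.Write(data)
if err != nil {
    // Close the broken socket, then release its slot so the pool does
    // not leak capacity; validation on a later Get evicts the closed
    // connection (assuming a health check is configured)
    conn.Conn.Close()
    pool.Put(conn)
    return err
}

pool.Put(conn) // Return healthy connection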
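
Closing the socket by itself does not update the pool's bookkeeping, so the pool still counts the connection against its limit. One possible extension is a Discard operation that closes the connection and removes it in a single step. The sketch below is hypothetical: miniPool, newMiniPool, Discard, and fakeConn are illustrative names and are not part of the exercise's API. A real pool would also wake a waiting Get after freeing the slot.

```go
package main

import (
    "fmt"
    "io"
    "sync"
)

// miniPool is a hypothetical, stripped-down pool that only tracks members.
type miniPool struct {
    mu    sync.Mutex
    conns []io.Closer
}

func newMiniPool(cs ...io.Closer) *miniPool {
    return &miniPool{conns: cs}
}

// Discard closes c and removes it from the pool so its slot is freed.
// It reports whether c was a member.
func (p *miniPool) Discard(c io.Closer) bool {
    p.mu.Lock()
    defer p.mu.Unlock()
    for i, member := range p.conns {
        if member == c {
            p.conns = append(p.conns[:i], p.conns[i+1:]...)
            c.Close()
            return true
        }
    }
    return false
}

// fakeConn records whether Close was called.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error {
    f.closed = true
    return nil
}

func main() {
    bad := &fakeConn{}
    p := newMiniPool(&fakeConn{}, bad)
    fmt.Println(p.Discard(bad), bad.closed, len(p.conns))
}
```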

3. Implement health checks:

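healthCheck := func(conn net.Conn) error {
    if err := conn.SetDeadline(time.Now().Add(2 * time.Second)); err != nil {
        return err
    }
    defer conn.SetDeadline(time.Time{}) // clear the deadline afterwards

    // Send ping
    if _, err := conn.Write([]byte("PING\r\n")); err != nil {
        return err
    }

    // Wait for pong. Assumes a Redis-style "+PONG\r\n" reply (7 bytes);
    // io.ReadFull consumes all of it so nothing is left in the stream
    buf := make([]byte, 7)
    _, err := io.ReadFull(conn, buf)
    return err
}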

4. Monitor pool metrics:

func monitorPool(pool *Pool) {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    var lastTimeouts int64
    for range ticker.C {
        stats := pool.Stats()
        log.Printf("Pool stats: %+v", stats)

        // TimeoutCount is cumulative, so alert on the increase per interval
        if delta := stats.TimeoutCount - lastTimeouts; delta > 100 {
            log.Printf("High timeout count: %d in the last minute", delta)
        }
        lastTimeouts = stats.TimeoutCount
    }
}

Test Cases

package connpool

import (
    "context"
    "errors"
    "net"
    "sync"
    "testing"
    "time"
)

// startTestServer starts a TCP listener on a random local port and returns
// its address. It accepts connections and closes them immediately, which
// is enough for tests that only exercise the pool's bookkeeping.
func startTestServer(t *testing.T) string {
    t.Helper()

    listener, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        t.Fatal(err)
    }
    t.Cleanup(func() { listener.Close() })

    go func() {
        for {
            conn, err := listener.Accept()
            if err != nil {
                return // listener closed
            }
            conn.Close()
        }
    }()

    return listener.Addr().String()
}

func TestPool_BasicOperations(t *testing.T) {
    pool, err := NewPool(Config{
        Address:     startTestServer(t),
        MinConns:    2,
        MaxConns:    5,
        MaxIdleTime: 1 * time.Minute,
        MaxLifetime: 5 * time.Minute,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        t.Fatal(err)
    }
    defer pool.Close()

    conn, err := pool.Get(context.Background())
    if err != nil {
        t.Fatalf("Failed to get connection: %v", err)
    }

    if err := pool.Put(conn); err != nil {
        t.Errorf("Failed to return connection: %v", err)
    }
}

func TestPool_MaxConnections(t *testing.T) {
    pool, err := NewPool(Config{
        Address:  startTestServer(t),
        MinConns: 0,
        MaxConns: 2,
    })
    if err != nil {
        t.Fatal(err)
    }
    defer pool.Close()

    ctx := context.Background()

    // Get max connections
    conn1, err := pool.Get(ctx)
    if err != nil {
        t.Fatal(err)
    }
    conn2, err := pool.Get(ctx)
    if err != nil {
        t.Fatal(err)
    }

    // Try to get third connection with short timeout
    ctx3, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
    defer cancel()

    if _, err := pool.Get(ctx3); !errors.Is(err, context.DeadlineExceeded) {
        t.Errorf("Expected timeout, got: %v", err)
    }

    // Return connections
    pool.Put(conn1)
    pool.Put(conn2)
}

func TestPool_ConcurrentAccess(t *testing.T) {
    pool, err := NewPool(Config{
        Address:  startTestServer(t),
        MinConns: 2,
        MaxConns: 10,
    })
    if err != nil {
        t.Fatal(err)
    }
    defer pool.Close()

    var wg sync.WaitGroup
    errCh := make(chan error, 50)

    // Concurrent gets and puts
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            conn, err := pool.Get(context.Background())
            if err != nil {
                errCh <- err
                return
            }

            time.Sleep(10 * time.Millisecond)

            if err := pool.Put(conn); err != nil {
                errCh <- err
            }
        }()
    }

    wg.Wait()
    close(errCh)

    for err := range errCh {
        t.Errorf("Concurrent access error: %v", err)
    }
}

func TestPool_HealthCheck(t *testing.T) {
    healthCheckCalled := false

    pool, err := NewPool(Config{
        Address:  startTestServer(t),
        MinConns: 1,
        MaxConns: 5,
        HealthCheckFunc: func(conn net.Conn) error {
            healthCheckCalled = true
            return nil
        },
    })
    if err != nil {
        t.Fatal(err)
    }
    defer pool.Close()

    ctx := context.Background()
    conn, err := pool.Get(ctx)
    if err != nil {
        t.Fatal(err)
    }

    pool.Put(conn)

    // Get again to trigger health check on the idle connection
    _, err = pool.Get(ctx)
    if err != nil {
        t.Fatal(err)
    }

    if !healthCheckCalled {
        t.Error("Health check was not called")
    }
}

func TestPool_Stats(t *testing.T) {
    pool, err := NewPool(Config{
        Address:  startTestServer(t),
        MinConns: 2,
        MaxConns: 5,
    })
    if err != nil {
        t.Fatal(err)
    }
    defer pool.Close()

    stats := pool.Stats()
    if stats.TotalConns != 2 {
        t.Errorf("Expected 2 total connections, got %d", stats.TotalConns)
    }

    conn, err := pool.Get(context.Background())
    if err != nil {
        t.Fatal(err)
    }

    stats = pool.Stats()
    if stats.ActiveConns != 1 {
        t.Errorf("Expected 1 active connection, got %d", stats.ActiveConns)
    }

    pool.Put(conn)

    stats = pool.Stats()
    if stats.IdleConns != 2 {
        t.Errorf("Expected 2 idle connections, got %d", stats.IdleConns)
    }
}

Bonus Challenges

  1. Connection Affinity: Pin connections to specific goroutines
  2. Load Balancing: Distribute connections across multiple servers
  3. Circuit Breaker: Stop creating connections after repeated failures
  4. Metrics Export: Export metrics to Prometheus
  5. Connection Warming: Pre-establish connections on startup

Key Takeaways

  • Connection pooling reduces overhead and improves performance
  • Health checking ensures connection validity
  • Lifecycle management prevents resource leaks
  • Condition variables enable efficient blocking/signaling
  • Background workers maintain pool health
  • Context support enables timeout and cancellation

Connection pooling is essential for high-performance networked applications. Production pools must handle connection failures, implement proper timeouts, and maintain connection health automatically.