Service Mesh Implementation

Exercise: Service Mesh Implementation

Difficulty: Advanced

Learning Objectives

  • Understand service mesh architecture patterns
  • Implement service discovery and registration
  • Build client-side load balancing
  • Practice health checking and circuit breaking
  • Implement distributed tracing (an extension exercise; the reference implementation below does not include it)

Problem Statement

Create a lightweight service mesh that provides service discovery, client-side load balancing, health checking, and circuit breaking for microservices.

Implementation

  1package servicemesh
  2
import (
	"context"
	"fmt"
	"math/rand"
	"net/http"
	"sync"
	"time"
)
 10
 11// Service represents a registered service
 12type Service struct {
 13	Name     string
 14	Address  string
 15	Port     int
 16	Health   HealthStatus
 17	Metadata map[string]string
 18}
 19
 20type HealthStatus string
 21
 22const (
 23	Healthy   HealthStatus = "healthy"
 24	Unhealthy HealthStatus = "unhealthy"
 25	Unknown   HealthStatus = "unknown"
 26)
 27
 28// ServiceRegistry manages service registration and discovery
 29type ServiceRegistry struct {
 30	mu       sync.RWMutex
 31	services map[string][]*Service
 32	watchers map[string][]chan *Service
 33}
 34
 35func NewServiceRegistry() *ServiceRegistry {
 36	return &ServiceRegistry{
 37		services: make(map[string][]*Service),
 38		watchers: make(map[string][]chan *Service),
 39	}
 40}
 41
 42// Register adds a service to the registry
 43func Register(service *Service) error {
 44	sr.mu.Lock()
 45	defer sr.mu.Unlock()
 46
 47	if _, exists := sr.services[service.Name]; !exists {
 48		sr.services[service.Name] = make([]*Service, 0)
 49	}
 50
 51	sr.services[service.Name] = append(sr.services[service.Name], service)
 52
 53	// Notify watchers
 54	if watchers, ok := sr.watchers[service.Name]; ok {
 55		for _, watcher := range watchers {
 56			select {
 57			case watcher <- service:
 58			default:
 59			}
 60		}
 61	}
 62
 63	return nil
 64}
 65
 66// Deregister removes a service from the registry
 67func Deregister(name, address string, port int) error {
 68	sr.mu.Lock()
 69	defer sr.mu.Unlock()
 70
 71	services, exists := sr.services[name]
 72	if !exists {
 73		return fmt.Errorf("service not found: %s", name)
 74	}
 75
 76	filtered := make([]*Service, 0)
 77	for _, s := range services {
 78		if s.Address != address || s.Port != port {
 79			filtered = append(filtered, s)
 80		}
 81	}
 82
 83	sr.services[name] = filtered
 84	return nil
 85}
 86
 87// Discover finds all healthy instances of a service
 88func Discover(name string) {
 89	sr.mu.RLock()
 90	defer sr.mu.RUnlock()
 91
 92	services, exists := sr.services[name]
 93	if !exists {
 94		return nil, fmt.Errorf("service not found: %s", name)
 95	}
 96
 97	healthy := make([]*Service, 0)
 98	for _, s := range services {
 99		if s.Health == Healthy {
100			healthy = append(healthy, s)
101		}
102	}
103
104	if len(healthy) == 0 {
105		return nil, fmt.Errorf("no healthy instances for service: %s", name)
106	}
107
108	return healthy, nil
109}
110
111// Watch subscribes to service changes
112func Watch(name string) <-chan *Service {
113	sr.mu.Lock()
114	defer sr.mu.Unlock()
115
116	ch := make(chan *Service, 10)
117	sr.watchers[name] = append(sr.watchers[name], ch)
118	return ch
119}
120
121// LoadBalancer provides client-side load balancing
122type LoadBalancer struct {
123	strategy string // round-robin, random, least-connections
124	registry *ServiceRegistry
125	counters map[string]int
126	mu       sync.Mutex
127}
128
129func NewLoadBalancer(strategy string, registry *ServiceRegistry) *LoadBalancer {
130	return &LoadBalancer{
131		strategy: strategy,
132		registry: registry,
133		counters: make(map[string]int),
134	}
135}
136
137// SelectInstance chooses a service instance based on strategy
138func SelectInstance(serviceName string) {
139	instances, err := lb.registry.Discover(serviceName)
140	if err != nil {
141		return nil, err
142	}
143
144	if len(instances) == 0 {
145		return nil, fmt.Errorf("no instances available for %s", serviceName)
146	}
147
148	switch lb.strategy {
149	case "round-robin":
150		lb.mu.Lock()
151		idx := lb.counters[serviceName] % len(instances)
152		lb.counters[serviceName]++
153		lb.mu.Unlock()
154		return instances[idx], nil
155
156	case "random":
157		idx := time.Now().UnixNano() % int64(len(instances))
158		return instances[idx], nil
159
160	default:
161		return instances[0], nil
162	}
163}
164
165// HealthChecker monitors service health
166type HealthChecker struct {
167	registry *ServiceRegistry
168	interval time.Duration
169	timeout  time.Duration
170}
171
172func NewHealthChecker(registry *ServiceRegistry, interval, timeout time.Duration) *HealthChecker {
173	return &HealthChecker{
174		registry: registry,
175		interval: interval,
176		timeout:  timeout,
177	}
178}
179
180// Start begins health checking
181func Start(ctx context.Context) {
182	ticker := time.NewTicker(hc.interval)
183	defer ticker.Stop()
184
185	for {
186		select {
187		case <-ctx.Done():
188			return
189		case <-ticker.C:
190			hc.checkAllServices()
191		}
192	}
193}
194
195func checkAllServices() {
196	hc.registry.mu.RLock()
197	services := make([]*Service, 0)
198	for _, serviceList := range hc.registry.services {
199		services = append(services, serviceList...)
200	}
201	hc.registry.mu.RUnlock()
202
203	for _, service := range services {
204		go hc.checkService(service)
205	}
206}
207
208func checkService(service *Service) {
209	ctx, cancel := context.WithTimeout(context.Background(), hc.timeout)
210	defer cancel()
211
212	url := fmt.Sprintf("http://%s:%d/health", service.Address, service.Port)
213	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
214	if err != nil {
215		service.Health = Unhealthy
216		return
217	}
218
219	resp, err := http.DefaultClient.Do(req)
220	if err != nil || resp.StatusCode != http.StatusOK {
221		service.Health = Unhealthy
222		return
223	}
224	defer resp.Body.Close()
225
226	service.Health = Healthy
227}
228
// CircuitBreaker prevents cascading failures by failing fast once a
// dependency has returned threshold consecutive errors.
//
// States:
//   - "closed": calls pass through and consecutive failures are counted.
//   - "open": calls are rejected until timeout has elapsed since the last
//     completed call.
//   - "half-open": exactly one trial call is let through. Success closes the
//     breaker; failure re-opens it.
//
// The protected function runs without the internal lock held, so slow calls
// do not serialize other callers and re-entrant calls do not deadlock.
type CircuitBreaker struct {
	mu          sync.Mutex
	failures    int           // consecutive failures while closed
	threshold   int           // failures that trip the breaker
	timeout     time.Duration // how long to stay open before a trial call
	state       string        // closed, open, half-open
	lastAttempt time.Time     // when the most recent call completed
	probing     bool          // a half-open trial call is in flight
}

// NewCircuitBreaker returns a closed breaker that opens after threshold
// consecutive failures and allows a trial call once timeout has elapsed.
func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		threshold: threshold,
		timeout:   timeout,
		state:     "closed",
	}
}

// Call executes fn with circuit breaker protection. When the breaker rejects
// the call it returns an error with the text "circuit breaker is open" and
// does not invoke fn. Otherwise it returns fn's error. A panic in fn is
// counted as a failure and then propagated.
func (cb *CircuitBreaker) Call(fn func() error) error {
	trial, err := cb.admit()
	if err != nil {
		return err
	}

	// Record a panic as a failure so a half-open trial slot is never leaked.
	panicked := true
	defer func() {
		if panicked {
			cb.record(fmt.Errorf("circuit breaker: call panicked"), trial)
		}
	}()

	err = fn()
	panicked = false
	cb.record(err, trial)
	return err
}

// admit decides whether a call may proceed and reports whether it is the
// single half-open trial call.
func (cb *CircuitBreaker) admit() (trial bool, err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case "open":
		if time.Since(cb.lastAttempt) <= cb.timeout {
			return false, fmt.Errorf("circuit breaker is open")
		}
		cb.state = "half-open"
		cb.failures = 0
		cb.probing = true
		return true, nil
	case "half-open":
		// Only one trial at a time; everyone else still sees an open breaker.
		if cb.probing {
			return false, fmt.Errorf("circuit breaker is open")
		}
		cb.probing = true
		return true, nil
	default:
		return false, nil
	}
}

// record updates the breaker state with the outcome of a completed call.
func (cb *CircuitBreaker) record(err error, trial bool) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.lastAttempt = time.Now()

	if trial {
		// The trial alone decides the next state. A failed trial re-opens the
		// breaker regardless of threshold.
		cb.probing = false
		cb.failures = 0
		if err != nil {
			cb.state = "open"
		} else {
			cb.state = "closed"
		}
		return
	}

	if err == nil {
		cb.failures = 0
		return
	}

	cb.failures++
	// A late result from a call admitted before the breaker tripped must not
	// override an open or half-open breaker.
	if cb.state != "open" && cb.state != "half-open" && cb.failures >= cb.threshold {
		cb.state = "open"
	}
}
262	// Execute function
263	err := fn()
264	cb.lastAttempt = time.Now()
265
266	if err != nil {
267		cb.failures++
268		if cb.failures >= cb.threshold {
269			cb.state = "open"
270		}
271		return err
272	}
273
274	// Success - reset
275	cb.failures = 0
276	if cb.state == "half-open" {
277		cb.state = "closed"
278	}
279
280	return nil
281}

Key Takeaways

  1. Service Discovery: Central registry for dynamic service location
  2. Load Balancing: Distribute traffic across healthy instances
  3. Health Checking: Monitor service availability continuously
  4. Circuit Breaker: Prevent cascading failures
  5. Observability: Track service health continuously; performance metrics and distributed tracing are natural extensions of this design