Exercise: Service Mesh Implementation
Difficulty - Advanced
Learning Objectives
- Understand service mesh architecture patterns
- Implement service discovery and registration
- Build client-side load balancing
- Practice health checking and circuit breaking
- Implement distributed tracing
Problem Statement
Create a lightweight service mesh that provides service discovery, client-side load balancing, health checking, and circuit breaking for microservices.
Implementation
package servicemesh

import (
	"context"
	"fmt"
	"math/rand"
	"net/http"
	"sync"
	"time"
)

// Service describes one registered instance of a named service.
type Service struct {
	// Name is the logical service name; the registry groups instances by it.
	Name string
	// Address is the host part used when building the health-check URL.
	Address string
	// Port is the port part used when building the health-check URL.
	Port int
	// Health is the last recorded health state. The zero value is the empty
	// string, which is none of the constants below, so an instance is only
	// returned by discovery once its Health has been set to Healthy.
	Health HealthStatus
	// Metadata holds free-form key/value labels. Nothing in this listing
	// reads it.
	Metadata map[string]string
}

// HealthStatus is the health state recorded for a Service instance.
type HealthStatus string

// Possible HealthStatus values.
const (
	Healthy   HealthStatus = "healthy"
	Unhealthy HealthStatus = "unhealthy"
	Unknown   HealthStatus = "unknown"
)

28// ServiceRegistry manages service registration and discovery
29type ServiceRegistry struct {
30 mu sync.RWMutex
31 services map[string][]*Service
32 watchers map[string][]chan *Service
33}
34
35func NewServiceRegistry() *ServiceRegistry {
36 return &ServiceRegistry{
37 services: make(map[string][]*Service),
38 watchers: make(map[string][]chan *Service),
39 }
40}
41
42// Register adds a service to the registry
43func Register(service *Service) error {
44 sr.mu.Lock()
45 defer sr.mu.Unlock()
46
47 if _, exists := sr.services[service.Name]; !exists {
48 sr.services[service.Name] = make([]*Service, 0)
49 }
50
51 sr.services[service.Name] = append(sr.services[service.Name], service)
52
53 // Notify watchers
54 if watchers, ok := sr.watchers[service.Name]; ok {
55 for _, watcher := range watchers {
56 select {
57 case watcher <- service:
58 default:
59 }
60 }
61 }
62
63 return nil
64}
65
66// Deregister removes a service from the registry
67func Deregister(name, address string, port int) error {
68 sr.mu.Lock()
69 defer sr.mu.Unlock()
70
71 services, exists := sr.services[name]
72 if !exists {
73 return fmt.Errorf("service not found: %s", name)
74 }
75
76 filtered := make([]*Service, 0)
77 for _, s := range services {
78 if s.Address != address || s.Port != port {
79 filtered = append(filtered, s)
80 }
81 }
82
83 sr.services[name] = filtered
84 return nil
85}
86
87// Discover finds all healthy instances of a service
88func Discover(name string) {
89 sr.mu.RLock()
90 defer sr.mu.RUnlock()
91
92 services, exists := sr.services[name]
93 if !exists {
94 return nil, fmt.Errorf("service not found: %s", name)
95 }
96
97 healthy := make([]*Service, 0)
98 for _, s := range services {
99 if s.Health == Healthy {
100 healthy = append(healthy, s)
101 }
102 }
103
104 if len(healthy) == 0 {
105 return nil, fmt.Errorf("no healthy instances for service: %s", name)
106 }
107
108 return healthy, nil
109}
110
111// Watch subscribes to service changes
112func Watch(name string) <-chan *Service {
113 sr.mu.Lock()
114 defer sr.mu.Unlock()
115
116 ch := make(chan *Service, 10)
117 sr.watchers[name] = append(sr.watchers[name], ch)
118 return ch
119}
120
121// LoadBalancer provides client-side load balancing
122type LoadBalancer struct {
123 strategy string // round-robin, random, least-connections
124 registry *ServiceRegistry
125 counters map[string]int
126 mu sync.Mutex
127}
128
129func NewLoadBalancer(strategy string, registry *ServiceRegistry) *LoadBalancer {
130 return &LoadBalancer{
131 strategy: strategy,
132 registry: registry,
133 counters: make(map[string]int),
134 }
135}
136
137// SelectInstance chooses a service instance based on strategy
138func SelectInstance(serviceName string) {
139 instances, err := lb.registry.Discover(serviceName)
140 if err != nil {
141 return nil, err
142 }
143
144 if len(instances) == 0 {
145 return nil, fmt.Errorf("no instances available for %s", serviceName)
146 }
147
148 switch lb.strategy {
149 case "round-robin":
150 lb.mu.Lock()
151 idx := lb.counters[serviceName] % len(instances)
152 lb.counters[serviceName]++
153 lb.mu.Unlock()
154 return instances[idx], nil
155
156 case "random":
157 idx := time.Now().UnixNano() % int64(len(instances))
158 return instances[idx], nil
159
160 default:
161 return instances[0], nil
162 }
163}
164
165// HealthChecker monitors service health
166type HealthChecker struct {
167 registry *ServiceRegistry
168 interval time.Duration
169 timeout time.Duration
170}
171
172func NewHealthChecker(registry *ServiceRegistry, interval, timeout time.Duration) *HealthChecker {
173 return &HealthChecker{
174 registry: registry,
175 interval: interval,
176 timeout: timeout,
177 }
178}
179
180// Start begins health checking
181func Start(ctx context.Context) {
182 ticker := time.NewTicker(hc.interval)
183 defer ticker.Stop()
184
185 for {
186 select {
187 case <-ctx.Done():
188 return
189 case <-ticker.C:
190 hc.checkAllServices()
191 }
192 }
193}
194
195func checkAllServices() {
196 hc.registry.mu.RLock()
197 services := make([]*Service, 0)
198 for _, serviceList := range hc.registry.services {
199 services = append(services, serviceList...)
200 }
201 hc.registry.mu.RUnlock()
202
203 for _, service := range services {
204 go hc.checkService(service)
205 }
206}
207
208func checkService(service *Service) {
209 ctx, cancel := context.WithTimeout(context.Background(), hc.timeout)
210 defer cancel()
211
212 url := fmt.Sprintf("http://%s:%d/health", service.Address, service.Port)
213 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
214 if err != nil {
215 service.Health = Unhealthy
216 return
217 }
218
219 resp, err := http.DefaultClient.Do(req)
220 if err != nil || resp.StatusCode != http.StatusOK {
221 service.Health = Unhealthy
222 return
223 }
224 defer resp.Body.Close()
225
226 service.Health = Healthy
227}
228
// CircuitBreaker prevents cascading failures by rejecting calls once a
// protected operation has failed repeatedly.
//
// States:
//   - "closed": calls run normally; consecutive failures are counted.
//   - "open": calls are rejected without running until timeout has elapsed
//     since the last executed call.
//   - "half-open": one trial call is allowed through; success closes the
//     circuit, failure re-opens it.
type CircuitBreaker struct {
	mu          sync.Mutex
	failures    int           // consecutive failures since the last success or trial reset
	threshold   int           // consecutive failures that open the circuit
	timeout     time.Duration // how long the circuit stays open before a trial call
	state       string        // "closed", "open" or "half-open"
	lastAttempt time.Time     // when the protected function last returned
}

// NewCircuitBreaker returns a closed CircuitBreaker that opens after
// threshold consecutive failures and permits a trial call once timeout has
// elapsed since the last executed call.
func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		threshold: threshold,
		timeout:   timeout,
		state:     "closed",
	}
}

// Call executes fn with circuit breaker protection.
//
// While the circuit is open and the timeout has not elapsed, fn is not
// invoked and an error is returned. Otherwise fn runs and its error, if any,
// is returned unchanged. fn runs while the breaker's mutex is held, so calls
// through one breaker are serialised and fn must not call back into the same
// breaker.
func (cb *CircuitBreaker) Call(fn func() error) error {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if cb.state == "open" {
		if time.Since(cb.lastAttempt) <= cb.timeout {
			return fmt.Errorf("circuit breaker is open")
		}
		// The open period has elapsed: let this call through as a trial.
		cb.state = "half-open"
		cb.failures = 0
	}

	err := fn()
	cb.lastAttempt = time.Now()

	if err != nil {
		cb.failures++
		// A failed trial call re-opens the circuit immediately; otherwise
		// the breaker would stay half-open and keep passing traffic to a
		// dependency that is still failing.
		if cb.state == "half-open" || cb.failures >= cb.threshold {
			cb.state = "open"
		}
		return err
	}

	// Success: reset the failure count and close the circuit. The state is
	// either "closed" already or "half-open" here.
	cb.failures = 0
	cb.state = "closed"

	return nil
}
Key Takeaways
- Service Discovery: Central registry for dynamic service location
- Load Balancing: Distribute traffic across healthy instances
- Health Checking: Monitor service availability continuously
- Circuit Breaker: Prevent cascading failures
- Observability: Track service health and performance
Related Topics
- Service Mesh - Main service mesh tutorial
- Microservices - Microservice architecture
- Distributed Systems - Distributed patterns