📖 Prerequisite: Web Development - Understanding HTTP servers and routing is essential for building microservices
Consider a large restaurant where instead of one massive kitchen trying to cook everything, you have specialized stations: a grill station, a salad station, a dessert station, and a beverage station. Each station operates independently, has its own staff, and focuses on doing one thing exceptionally well. This is the essence of microservices architecture - breaking down a monolithic application into small, specialized services that work together.
Microservices architecture decomposes applications into small, independent services that communicate over networks. This guide covers building production-ready microservices in Go, including service discovery, communication patterns, distributed transactions, and deployment strategies.
💡 Key Takeaway: Microservices trade the simplicity of a single deployment for the flexibility of independent, scalable services that can be developed, deployed, and maintained separately.
Microservices Architecture Principles
Think of microservices like building a city with specialized districts rather than one massive building. Each district has a clear purpose, its own resources, and connects to others through well-defined roads.
The principles below determine where service boundaries fall and how services are allowed to depend on one another.
When to Use Microservices:
- ✅ Large teams needing independent deployment
- ✅ Different parts of the system have vastly different scaling requirements
- ✅ Need to experiment with different technologies for different problems
- ✅ System complexity justifies the distributed system overhead
When NOT to Use Microservices:
- ❌ Small team - monolith is faster to build
- ❌ Startup/MVP - distributed systems add enormous complexity
- ❌ Simple CRUD application - microservices are overkill
- ❌ Team lacks expertise in distributed systems, monitoring, and orchestration
⚠️ Important: The microservices approach introduces significant operational complexity. Start with a monolith and break it into services only when you have clear pain points that microservices can solve.
Critical Trade-offs:
- Monolith: Simple deployment, easy debugging, but scales as one unit
- Microservices: Independent scaling and deployment, but complex operations, network latency, data consistency challenges
Real-world Example: Netflix evolved from a monolithic DVD rental system to hundreds of microservices handling recommendations, billing, video streaming, and user profiles independently. This allowed them to scale each part of the system according to its own load.
Core Principles
package main

import (
	"fmt"
)

func main() {
	fmt.Println("Microservices Architecture Principles")
	fmt.Println("====================================")
	fmt.Println()

	// Each entry pairs a principle with a one-line definition and an example.
	principles := []struct {
		name        string
		description string
		example     string
	}{
		{
			"Single Responsibility",
			"Each service handles one business capability",
			"UserService manages only user-related operations",
		},
		{
			"Decentralized Data",
			"Each service owns its database",
			"OrderService has its own order database",
		},
		{
			"Independent Deployment",
			"Services deploy independently",
			"Update ProductService without touching OrderService",
		},
		{
			"Fault Isolation",
			"Failure in one service doesn't crash others",
			"Payment failure doesn't prevent browsing",
		},
		{
			"Technology Diversity",
			"Services can use different tech stacks",
			"UserService in Go, RecommendationService in Python",
		},
		{
			"API-First Design",
			"Services communicate via well-defined APIs",
			"REST, gRPC, or message queues",
		},
		{
			"Stateless Services",
			"Services don't maintain session state",
			"All state in databases or caches",
		},
		{
			"Observable",
			"Services expose health, metrics, logs",
			"Prometheus metrics, structured logging",
		},
	}

	for i, p := range principles {
		fmt.Printf("%d. %s\n", i+1, p.name)
		fmt.Printf("   Definition: %s\n", p.description)
		fmt.Printf("   Example: %s\n\n", p.example)
	}

	fmt.Println("Benefits:")
	fmt.Println("- Scalability: Scale services independently")
	fmt.Println("- Resilience: Isolated failures")
	fmt.Println("- Flexibility: Choose best tech per service")
	fmt.Println("- Team autonomy: Teams own services end-to-end")
	fmt.Println("\nChallenges:")
	fmt.Println("- Distributed complexity")
	fmt.Println("- Data consistency")
	fmt.Println("- Testing difficulty")
	fmt.Println("- Operational overhead")
}
Service Boundaries
package main

import (
	"fmt"
)

// Example: E-commerce microservices

// Service describes one microservice: what it owns and what it depends on.
type Service struct {
	Name           string
	Responsibility string
	Database       string
	APIs           []string
	Dependencies   []string
}

func main() {
	services := []Service{
		{
			Name:           "UserService",
			Responsibility: "User authentication and profile management",
			Database:       "users_db",
			APIs:           []string{"POST /users", "GET /users/:id", "PUT /users/:id"},
			Dependencies:   []string{},
		},
		{
			Name:           "ProductService",
			Responsibility: "Product catalog and inventory",
			Database:       "products_db",
			APIs:           []string{"GET /products", "POST /products", "PUT /products/:id/stock"},
			Dependencies:   []string{},
		},
		{
			Name:           "OrderService",
			Responsibility: "Order processing and management",
			Database:       "orders_db",
			APIs:           []string{"POST /orders", "GET /orders/:id", "GET /users/:id/orders"},
			Dependencies:   []string{"UserService", "ProductService", "PaymentService"},
		},
		{
			Name:           "PaymentService",
			Responsibility: "Payment processing",
			Database:       "payments_db",
			APIs:           []string{"POST /payments", "GET /payments/:id"},
			Dependencies:   []string{"OrderService"},
		},
		{
			Name:           "NotificationService",
			Responsibility: "Email and SMS notifications",
			Database:       "notifications_db",
			APIs:           []string{"POST /notifications"},
			Dependencies:   []string{},
		},
	}

	fmt.Println("E-commerce Microservices Architecture")
	fmt.Println("======================================")
	fmt.Println()

	for _, svc := range services {
		fmt.Printf("Service: %s\n", svc.Name)
		fmt.Printf("  Role: %s\n", svc.Responsibility)
		fmt.Printf("  Database: %s\n", svc.Database)
		fmt.Printf("  APIs: %v\n", svc.APIs)
		if len(svc.Dependencies) > 0 {
			fmt.Printf("  Depends on: %v\n", svc.Dependencies)
		}
		fmt.Println()
	}
}
Service Discovery
Imagine you're in a large shopping mall looking for a specific store. Instead of wandering around randomly, you check the directory map that shows exactly where each store is located and whether it's open. Service discovery works the same way - it's a directory that helps services find and communicate with each other dynamically.
Service discovery solves the fundamental problem: "How does Service A know where to find Service B when Service B might be running on multiple servers, scaling up and down, or even moving between data centers?"
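Before looking at real backends, the idea can be sketched with an in-memory directory. This is only an illustration under stated assumptions: the `Registry`, `Instance`, and `Pick` names are invented for this sketch, health is a plain boolean rather than the result of a probe, and nothing here talks to Consul or etcd.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Instance is one running copy of a service (hypothetical type for this sketch).
type Instance struct {
	ID      string
	Addr    string
	Healthy bool
}

// Registry is an in-memory stand-in for a discovery backend.
type Registry struct {
	mu        sync.Mutex
	instances map[string][]Instance // service name -> registered instances
	next      map[string]int        // service name -> round-robin cursor
}

func NewRegistry() *Registry {
	return &Registry{
		instances: make(map[string][]Instance),
		next:      make(map[string]int),
	}
}

// Register adds an instance under a service name.
func (r *Registry) Register(service string, inst Instance) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.instances[service] = append(r.instances[service], inst)
}

// Pick returns the address of the next healthy instance, rotating on each call.
func (r *Registry) Pick(service string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	var healthy []Instance
	for _, inst := range r.instances[service] {
		if inst.Healthy {
			healthy = append(healthy, inst)
		}
	}
	if len(healthy) == 0 {
		return "", errors.New("no healthy instances of " + service)
	}
	inst := healthy[r.next[service]%len(healthy)]
	r.next[service]++
	return inst.Addr, nil
}

func main() {
	reg := NewRegistry()
	reg.Register("order-service", Instance{ID: "a", Addr: "10.0.0.1:8081", Healthy: true})
	reg.Register("order-service", Instance{ID: "b", Addr: "10.0.0.2:8081", Healthy: false})
	reg.Register("order-service", Instance{ID: "c", Addr: "10.0.0.3:8081", Healthy: true})

	// Callers ask for a service by name and never hard-code an address.
	for i := 0; i < 3; i++ {
		addr, err := reg.Pick("order-service")
		fmt.Println(addr, err)
	}
}
```

The unhealthy instance is skipped and the two healthy ones alternate. A production registry adds what this sketch omits: health probes, deregistration, and replication across nodes.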
Consul Service Discovery
package main

import (
	"fmt"
	"log"
	"time"

	consul "github.com/hashicorp/consul/api"
)

// ServiceRegistry wraps a Consul client for registration and lookup.
type ServiceRegistry struct {
	client *consul.Client
}

func NewServiceRegistry(consulAddr string) (*ServiceRegistry, error) {
	config := consul.DefaultConfig()
	config.Address = consulAddr

	client, err := consul.NewClient(config)
	if err != nil {
		return nil, err
	}

	return &ServiceRegistry{client: client}, nil
}

// Register registers the service with Consul, including an HTTP health check.
func (sr *ServiceRegistry) Register(
	serviceID, serviceName, address string,
	port int,
	tags []string,
) error {
	registration := &consul.AgentServiceRegistration{
		ID:      serviceID,
		Name:    serviceName,
		Address: address,
		Port:    port,
		Tags:    tags,
		Check: &consul.AgentServiceCheck{
			HTTP:                           fmt.Sprintf("http://%s:%d/health", address, port),
			Interval:                       "10s",
			Timeout:                        "3s",
			DeregisterCriticalServiceAfter: "30s",
		},
	}

	return sr.client.Agent().ServiceRegister(registration)
}

// Deregister removes the service from Consul.
func (sr *ServiceRegistry) Deregister(serviceID string) error {
	return sr.client.Agent().ServiceDeregister(serviceID)
}

// Discover returns only the instances whose health checks are passing.
func (sr *ServiceRegistry) Discover(serviceName string) ([]*consul.ServiceEntry, error) {
	services, _, err := sr.client.Health().Service(serviceName, "", true, nil)
	return services, err
}

// GetServiceEndpoint returns the host:port of one healthy instance.
func (sr *ServiceRegistry) GetServiceEndpoint(serviceName string) (string, error) {
	services, err := sr.Discover(serviceName)
	if err != nil {
		return "", err
	}

	if len(services) == 0 {
		return "", fmt.Errorf("no healthy instances of %s found", serviceName)
	}

	// Always picks the first healthy instance. This is not load balancing;
	// rotate or randomize across the slice to spread traffic.
	service := services[0]
	endpoint := fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port)

	return endpoint, nil
}

// Example usage
func main() {
	registry, err := NewServiceRegistry("localhost:8500")
	if err != nil {
		log.Fatal(err)
	}

	// Register service
	err = registry.Register(
		"user-service-1",
		"user-service",
		"localhost",
		8080,
		[]string{"api", "v1"},
	)
	if err != nil {
		log.Fatal(err)
	}

	// Deregister on shutdown
	defer registry.Deregister("user-service-1")

	// Discover service
	endpoint, err := registry.GetServiceEndpoint("order-service")
	if err != nil {
		log.Printf("Discovery failed: %v", err)
	} else {
		log.Printf("Order service endpoint: %s", endpoint)
	}

	// Keep service running
	time.Sleep(1 * time.Hour)
}
etcd Service Discovery
package discovery

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// EtcdRegistry registers services as leased keys in etcd.
type EtcdRegistry struct {
	client *clientv3.Client
	lease  clientv3.Lease
}

func NewEtcdRegistry(endpoints []string) (*EtcdRegistry, error) {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, err
	}

	return &EtcdRegistry{
		client: client,
		lease:  clientv3.NewLease(client),
	}, nil
}

// Register stores the service under a lease with the given TTL (in seconds).
// The lease is renewed in the background until ctx is cancelled; after that
// the key expires and the instance disappears from discovery.
func (er *EtcdRegistry) Register(ctx context.Context, serviceKey, serviceValue string, ttl int64) error {
	// Create a lease
	leaseResp, err := er.lease.Grant(ctx, ttl)
	if err != nil {
		return err
	}

	// Put service info with lease
	_, err = er.client.Put(ctx, serviceKey, serviceValue, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		return err
	}

	// Keep alive
	ch, err := er.lease.KeepAlive(ctx, leaseResp.ID)
	if err != nil {
		return err
	}

	// Drain keep-alive responses so the channel does not fill up
	go func() {
		for range ch {
			// Keep alive response
		}
	}()

	return nil
}

// Discover returns every key/value pair stored under servicePrefix.
func (er *EtcdRegistry) Discover(ctx context.Context, servicePrefix string) (map[string]string, error) {
	resp, err := er.client.Get(ctx, servicePrefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	services := make(map[string]string)
	for _, kv := range resp.Kvs {
		services[string(kv.Key)] = string(kv.Value)
	}

	return services, nil
}

// Watch streams changes (registrations and expirations) under servicePrefix.
func (er *EtcdRegistry) Watch(ctx context.Context, servicePrefix string) clientv3.WatchChan {
	return er.client.Watch(ctx, servicePrefix, clientv3.WithPrefix())
}

func (er *EtcdRegistry) Close() error {
	return er.client.Close()
}
Inter-Service Communication
Think of inter-service communication like different departments in a large company communicating. Some departments send formal memos, others have direct phone lines, and some use an internal messaging system. The choice depends on urgency, reliability needs, and the nature of the communication.
Services communicate via REST, gRPC, or message queues. The right choice depends on your specific needs:
- REST APIs: Best for public-facing services and simple HTTP-based communication
- gRPC: Ideal for internal service-to-service communication with high performance needs
- Message Queues: Well suited to asynchronous communication and decoupling services
Communication Patterns Overview
package main

import (
	"fmt"
	"strings"
)

// CommunicationPattern summarizes one way services can talk to each other.
type CommunicationPattern struct {
	Name     string
	Protocol string
	Style    string
	UseCases []string
	Pros     []string
	Cons     []string
}

func main() {
	patterns := []CommunicationPattern{
		{
			Name:     "Synchronous HTTP/REST",
			Protocol: "HTTP/1.1 or HTTP/2",
			Style:    "Request-Response",
			UseCases: []string{
				"Public APIs",
				"Simple CRUD operations",
				"External integrations",
			},
			Pros: []string{
				"Universal compatibility",
				"Easy debugging",
				"Browser support",
				"Human readable",
			},
			Cons: []string{
				"Higher latency",
				"Text-based overhead",
				"No bidirectional streaming (HTTP/1.1)",
			},
		},
		{
			Name:     "gRPC",
			Protocol: "HTTP/2",
			Style:    "Request-Response or Streaming",
			UseCases: []string{
				"Internal service-to-service",
				"High-performance APIs",
				"Real-time bidirectional streaming",
			},
			Pros: []string{
				"Binary protocol (faster)",
				"Strong typing with Protobuf",
				"Bidirectional streaming",
				"Code generation",
			},
			Cons: []string{
				"Not browser-friendly",
				"Learning curve",
				"Binary format harder to debug",
			},
		},
		{
			Name:     "Message Queues",
			Protocol: "AMQP, MQTT, or proprietary",
			Style:    "Publish-Subscribe or Point-to-Point",
			UseCases: []string{
				"Event-driven architecture",
				"Async processing",
				"Fire-and-forget operations",
			},
			Pros: []string{
				"Decoupling services",
				"At-least-once delivery (with acks and persistence)",
				"Load leveling",
				"Replay capability (log-based brokers such as Kafka)",
			},
			Cons: []string{
				"Eventual consistency",
				"Additional infrastructure",
				"Complexity in debugging",
			},
		},
		{
			Name:     "GraphQL",
			Protocol: "HTTP",
			Style:    "Query-based",
			UseCases: []string{
				"Client-specific data needs",
				"Aggregating multiple services",
				"Mobile and web frontends",
			},
			Pros: []string{
				"Flexible queries",
				"Avoid over-fetching",
				"Single endpoint",
				"Strong typing",
			},
			Cons: []string{
				"Complex caching",
				"N+1 query problem",
				"Learning curve",
			},
		},
	}

	fmt.Println("Inter-Service Communication Patterns")
	fmt.Println(strings.Repeat("=", 70))
	fmt.Println()

	for _, pattern := range patterns {
		fmt.Printf("Pattern: %s\n", pattern.Name)
		fmt.Printf("Protocol: %s\n", pattern.Protocol)
		fmt.Printf("Style: %s\n\n", pattern.Style)

		fmt.Println("Use Cases:")
		for _, uc := range pattern.UseCases {
			fmt.Printf("  • %s\n", uc)
		}
		fmt.Println()

		fmt.Println("Advantages:")
		for _, pro := range pattern.Pros {
			fmt.Printf("  ✓ %s\n", pro)
		}
		fmt.Println()

		fmt.Println("Disadvantages:")
		for _, con := range pattern.Cons {
			fmt.Printf("  ✗ %s\n", con)
		}
		fmt.Println()
		// Separator line between patterns
		fmt.Println(strings.Repeat("-", 70))
		fmt.Println()
	}

	fmt.Println("Recommendation:")
	fmt.Println("  • Internal services: gRPC for performance")
	fmt.Println("  • Public APIs: REST for compatibility")
	fmt.Println("  • Events/notifications: Message queues for reliability")
	fmt.Println("  • Flexible client needs: GraphQL for data aggregation")
}
gRPC Communication
// user.proto
syntax = "proto3";

package user;
option go_package = "github.com/yourproject/proto/user";

message User {
  string id = 1;
  string username = 2;
  string email = 3;
  string role = 4;
}

message GetUserRequest {
  string id = 1;
}

message GetUserResponse {
  User user = 1;
}

message CreateUserRequest {
  string username = 1;
  string email = 2;
  string password = 3;
}

message CreateUserResponse {
  User user = 1;
}

service UserService {
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}
// Go server and client for the UserService defined in user.proto
package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "github.com/yourproject/proto/user"
)

// Server implementation
type userServiceServer struct {
	pb.UnimplementedUserServiceServer
}

func (s *userServiceServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
	// Fetch user from database (hard-coded here for illustration)
	user := &pb.User{
		Id:       req.Id,
		Username: "john_doe",
		Email:    "john@example.com",
		Role:     "user",
	}

	return &pb.GetUserResponse{User: user}, nil
}

func (s *userServiceServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
	// Create user in database (hard-coded ID for illustration)
	user := &pb.User{
		Id:       "new-id",
		Username: req.Username,
		Email:    req.Email,
		Role:     "user",
	}

	return &pb.CreateUserResponse{User: user}, nil
}

// gRPC server
func runServer() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterUserServiceServer(s, &userServiceServer{})

	log.Println("User service listening on :50051")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

// gRPC client. The connection is unencrypted, which is acceptable only for
// local development; use TLS credentials between real services.
func createClient() (pb.UserServiceClient, *grpc.ClientConn, error) {
	conn, err := grpc.Dial(
		"localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return nil, nil, err
	}

	client := pb.NewUserServiceClient(conn)
	return client, conn, nil
}

func main() {
	// Start server in goroutine
	go runServer()

	// Crude wait for the server to start; fine for a demo only
	time.Sleep(1 * time.Second)

	// Create client
	client, conn, err := createClient()
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Call service
	ctx := context.Background()
	resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: "123"})
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("User: %+v\n", resp.User)
}
HTTP REST Communication
package client

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"
)

// HTTPClient is a small JSON-over-HTTP client with pooled connections.
type HTTPClient struct {
	baseURL    string
	httpClient *http.Client
}

func NewHTTPClient(baseURL string) *HTTPClient {
	return &HTTPClient{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: 10 * time.Second,
			Transport: &http.Transport{
				MaxIdleConns:        100,
				MaxIdleConnsPerHost: 10,
				IdleConnTimeout:     90 * time.Second,
			},
		},
	}
}

// Get issues a GET request and decodes a 200 JSON response into result.
func (c *HTTPClient) Get(ctx context.Context, path string, result interface{}) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
	if err != nil {
		return err
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
	}

	return json.NewDecoder(resp.Body).Decode(result)
}

// Post sends body as JSON and, if result is non-nil, decodes the response into it.
func (c *HTTPClient) Post(ctx context.Context, path string, body, result interface{}) error {
	jsonData, err := json.Marshal(body)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		c.baseURL+path,
		bytes.NewReader(jsonData),
	)
	if err != nil {
		return err
	}

	req.Header.Set("Content-Type", "application/json")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		respBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(respBody))
	}

	if result != nil {
		return json.NewDecoder(resp.Body).Decode(result)
	}

	return nil
}

// Example: Order service calling User service
type UserServiceClient struct {
	client *HTTPClient
}

func NewUserServiceClient(baseURL string) *UserServiceClient {
	return &UserServiceClient{
		client: NewHTTPClient(baseURL),
	}
}

type User struct {
	ID       string `json:"id"`
	Username string `json:"username"`
	Email    string `json:"email"`
}

func (c *UserServiceClient) GetUser(ctx context.Context, userID string) (*User, error) {
	var user User
	// Escape the ID so characters such as "/" cannot alter the request path.
	if err := c.client.Get(ctx, "/users/"+url.PathEscape(userID), &user); err != nil {
		return nil, err
	}
	return &user, nil
}

func (c *UserServiceClient) CreateUser(ctx context.Context, username, email string) (*User, error) {
	body := map[string]string{
		"username": username,
		"email":    email,
	}

	var user User
	if err := c.client.Post(ctx, "/users", body, &user); err != nil {
		return nil, err
	}
	return &user, nil
}
Message Queue Communication
package messaging

import (
	"context"
	"encoding/json"
	"log"
	"time"

	// streadway/amqp is no longer maintained; github.com/rabbitmq/amqp091-go
	// is its maintained successor with a compatible API.
	"github.com/streadway/amqp"
)

// Event represents a domain event
type Event struct {
	ID        string                 `json:"id"`
	Type      string                 `json:"type"`
	Timestamp time.Time              `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
	Metadata  map[string]string      `json:"metadata"`
}

// MessageBroker handles message publishing and consumption
type MessageBroker struct {
	conn    *amqp.Connection
	channel *amqp.Channel
}

func NewMessageBroker(url string) (*MessageBroker, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, err
	}

	channel, err := conn.Channel()
	if err != nil {
		conn.Close() // do not leak the connection when the channel cannot be opened
		return nil, err
	}

	return &MessageBroker{
		conn:    conn,
		channel: channel,
	}, nil
}

// DeclareExchange creates an exchange for pub/sub
func (mb *MessageBroker) DeclareExchange(name, kind string) error {
	return mb.channel.ExchangeDeclare(
		name,  // name
		kind,  // type: direct, fanout, topic, headers
		true,  // durable
		false, // auto-deleted
		false, // internal
		false, // no-wait
		nil,   // arguments
	)
}

// Publish publishes an event to an exchange
func (mb *MessageBroker) Publish(exchange, routingKey string, event Event) error {
	body, err := json.Marshal(event)
	if err != nil {
		return err
	}

	return mb.channel.Publish(
		exchange,   // exchange
		routingKey, // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType:  "application/json",
			Body:         body,
			DeliveryMode: amqp.Persistent,
			Timestamp:    time.Now(),
		},
	)
}

// Subscribe subscribes to events from an exchange
func (mb *MessageBroker) Subscribe(
	exchange, queueName, routingKey string,
	handler func(Event) error,
) error {
	// Declare queue
	queue, err := mb.channel.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {
		return err
	}

	// Bind queue to exchange
	err = mb.channel.QueueBind(
		queue.Name, // queue name
		routingKey, // routing key
		exchange,   // exchange
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		return err
	}

	// Consume messages
	msgs, err := mb.channel.Consume(
		queue.Name, // queue
		"",         // consumer
		false,      // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	if err != nil {
		return err
	}

	// Process messages in goroutine
	go func() {
		for msg := range msgs {
			var event Event
			if err := json.Unmarshal(msg.Body, &event); err != nil {
				log.Printf("Failed to unmarshal event: %v", err)
				msg.Nack(false, false) // Discard malformed message
				continue
			}

			if err := handler(event); err != nil {
				log.Printf("Handler error: %v", err)
				// Requeue on error. Without a retry limit or dead-letter
				// queue, a permanently failing message is redelivered forever.
				msg.Nack(false, true)
			} else {
				msg.Ack(false) // Acknowledge success
			}
		}
	}()

	return nil
}

func (mb *MessageBroker) Close() error {
	if err := mb.channel.Close(); err != nil {
		return err
	}
	return mb.conn.Close()
}

// Example: Order service publishes events
type OrderService struct {
	broker *MessageBroker
}

func NewOrderService(broker *MessageBroker) *OrderService {
	return &OrderService{broker: broker}
}

func (os *OrderService) CreateOrder(ctx context.Context, userID, productID string) error {
	// Create order in database (hard-coded ID for illustration)
	orderID := "order-123"

	// Publish event
	event := Event{
		ID:        "event-456",
		Type:      "order.created",
		Timestamp: time.Now(),
		Data: map[string]interface{}{
			"order_id":   orderID,
			"user_id":    userID,
			"product_id": productID,
		},
		Metadata: map[string]string{
			"service": "order-service",
		},
	}

	return os.broker.Publish("orders", "order.created", event)
}

// Example: Notification service subscribes to events
type NotificationService struct {
	broker *MessageBroker
}

func NewNotificationService(broker *MessageBroker) *NotificationService {
	return &NotificationService{broker: broker}
}

func (ns *NotificationService) Start() error {
	return ns.broker.Subscribe(
		"orders",
		"notification-queue",
		"order.*",
		ns.handleOrderEvent,
	)
}

func (ns *NotificationService) handleOrderEvent(event Event) error {
	log.Printf("Processing event: %s", event.Type)

	switch event.Type {
	case "order.created":
		// Checked assertions: a missing or non-string field must not panic.
		userID, okUser := event.Data["user_id"].(string)
		orderID, okOrder := event.Data["order_id"].(string)
		if !okUser || !okOrder {
			// A retry cannot fix a malformed payload, so log and drop it.
			log.Printf("Event %s: missing user_id or order_id", event.ID)
			return nil
		}
		return ns.sendOrderConfirmation(userID, orderID)

	case "order.completed":
		userID, ok := event.Data["user_id"].(string)
		if !ok {
			log.Printf("Event %s: missing user_id", event.ID)
			return nil
		}
		return ns.sendCompletionNotification(userID)

	default:
		log.Printf("Unknown event type: %s", event.Type)
	}

	return nil
}

func (ns *NotificationService) sendOrderConfirmation(userID, orderID string) error {
	log.Printf("Sending order confirmation to user %s for order %s", userID, orderID)
	// Send email/SMS
	return nil
}

func (ns *NotificationService) sendCompletionNotification(userID string) error {
	log.Printf("Sending completion notification to user %s", userID)
	return nil
}
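Because the subscriber above acknowledges manually and requeues on failure, the same event can be delivered more than once, so handlers should be idempotent. One common approach is to remember processed event IDs, sketched below. The `Deduper` type is an assumption for illustration, it keeps IDs in process memory only, and a real service would typically persist them (for example in the same database transaction as the side effect).

```go
package main

import (
	"fmt"
	"sync"
)

// Event is trimmed to the fields this sketch needs.
type Event struct {
	ID   string
	Type string
}

// Deduper remembers processed event IDs so redelivered messages are skipped.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]bool)}
}

// Wrap returns a handler that runs next at most once per successfully
// processed event ID. A failed call is not recorded, so a retry can succeed.
// The lock is held during next, which serializes handlers for simplicity.
func (d *Deduper) Wrap(next func(Event) error) func(Event) error {
	return func(e Event) error {
		d.mu.Lock()
		defer d.mu.Unlock()

		if d.seen[e.ID] {
			return nil // duplicate: report success without repeating side effects
		}
		if err := next(e); err != nil {
			return err
		}
		d.seen[e.ID] = true
		return nil
	}
}

func main() {
	sent := 0
	handler := NewDeduper().Wrap(func(e Event) error {
		sent++ // stands in for sending an email or SMS
		return nil
	})

	// The broker redelivers event-456; the notification goes out only once.
	handler(Event{ID: "event-456", Type: "order.created"})
	handler(Event{ID: "event-456", Type: "order.created"})
	fmt.Println("notifications sent:", sent)
}
```

A handler built this way can be passed to a subscribe function unchanged, since it keeps the same `func(Event) error` shape.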
API Composition and BFF Pattern
API composition aggregates data from multiple microservices to reduce client complexity. The Backend for Frontend (BFF) pattern creates specialized backends optimized for different client types (mobile, web, IoT).
Think of BFF like having different waiters in a restaurant - one waiter specializes in explaining menu items to tourists (web clients), while another efficiently serves regulars who know exactly what they want (mobile clients).
💡 Key Takeaway: BFF pattern eliminates the "one-size-fits-all" API problem by providing client-specific APIs that aggregate and transform data optimally for each platform.
API Composition Example
package composition

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// CompositeResponse aggregates data from multiple services
type CompositeResponse struct {
	User    *User    `json:"user"`
	Orders  []Order  `json:"orders"`
	Profile *Profile `json:"profile"`
	Stats   *Stats   `json:"stats"`
	Errors  []string `json:"errors,omitempty"`
}

type User struct {
	ID       string `json:"id"`
	Username string `json:"username"`
	Email    string `json:"email"`
}

type Order struct {
	ID          string  `json:"id"`
	ProductName string  `json:"product_name"`
	Total       float64 `json:"total"`
	Status      string  `json:"status"`
}

type Profile struct {
	Bio       string   `json:"bio"`
	Interests []string `json:"interests"`
}

type Stats struct {
	TotalOrders   int     `json:"total_orders"`
	TotalSpent    float64 `json:"total_spent"`
	MemberSince   string  `json:"member_since"`
	LoyaltyPoints int     `json:"loyalty_points"`
}

// APIComposer aggregates data from multiple services.
// HTTPClient is the client type from the HTTP REST Communication section.
type APIComposer struct {
	userClient    *HTTPClient
	orderClient   *HTTPClient
	profileClient *HTTPClient
	timeout       time.Duration
}

func NewAPIComposer(timeout time.Duration) *APIComposer {
	return &APIComposer{
		userClient:    NewHTTPClient("http://user-service:8080"),
		orderClient:   NewHTTPClient("http://order-service:8081"),
		profileClient: NewHTTPClient("http://profile-service:8082"),
		timeout:       timeout,
	}
}

// GetUserDashboard aggregates user data from multiple services in parallel
func (ac *APIComposer) GetUserDashboard(ctx context.Context, userID string) (*CompositeResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, ac.timeout)
	defer cancel()

	result := &CompositeResponse{
		Errors: []string{},
	}

	// Use WaitGroup for concurrent requests; the mutex guards result
	var wg sync.WaitGroup
	var mu sync.Mutex

	// Fetch user info
	wg.Add(1)
	go func() {
		defer wg.Done()
		user, err := ac.fetchUser(ctx, userID)
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("user service: %v", err))
		} else {
			result.User = user
		}
	}()

	// Fetch orders
	wg.Add(1)
	go func() {
		defer wg.Done()
		orders, err := ac.fetchOrders(ctx, userID)
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("order service: %v", err))
		} else {
			result.Orders = orders
		}
	}()

	// Fetch profile
	wg.Add(1)
	go func() {
		defer wg.Done()
		profile, err := ac.fetchProfile(ctx, userID)
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("profile service: %v", err))
		} else {
			result.Profile = profile
		}
	}()

	// Fetch stats
	wg.Add(1)
	go func() {
		defer wg.Done()
		stats, err := ac.fetchStats(ctx, userID)
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("stats calculation: %v", err))
		} else {
			result.Stats = stats
		}
	}()

	wg.Wait()

	return result, nil
}

func (ac *APIComposer) fetchUser(ctx context.Context, userID string) (*User, error) {
	var user User
	err := ac.userClient.Get(ctx, fmt.Sprintf("/users/%s", userID), &user)
	return &user, err
}

func (ac *APIComposer) fetchOrders(ctx context.Context, userID string) ([]Order, error) {
	var orders []Order
	err := ac.orderClient.Get(ctx, fmt.Sprintf("/users/%s/orders", userID), &orders)
	return orders, err
}

func (ac *APIComposer) fetchProfile(ctx context.Context, userID string) (*Profile, error) {
	var profile Profile
	err := ac.profileClient.Get(ctx, fmt.Sprintf("/profiles/%s", userID), &profile)
	return &profile, err
}

func (ac *APIComposer) fetchStats(ctx context.Context, userID string) (*Stats, error) {
	var stats Stats
	err := ac.orderClient.Get(ctx, fmt.Sprintf("/users/%s/stats", userID), &stats)
	return &stats, err
}

// HTTP Handler for composition endpoint
func (ac *APIComposer) HandleDashboard(w http.ResponseWriter, r *http.Request) {
	userID := r.URL.Query().Get("user_id")
	if userID == "" {
		http.Error(w, "user_id required", http.StatusBadRequest)
		return
	}

	response, err := ac.GetUserDashboard(r.Context(), userID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
BFF Pattern Implementation
package bff

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// APIComposer, CompositeResponse, User, Profile, Order, and Stats are assumed
// to come from the API composition example above.

// MobileBFF provides bandwidth-optimized API for mobile clients
type MobileBFF struct {
	composer *APIComposer
}

func NewMobileBFF(composer *APIComposer) *MobileBFF {
	return &MobileBFF{composer: composer}
}

// Mobile response is streamlined
type MobileDashboardResponse struct {
	UserName      string         `json:"user_name"`
	RecentOrders  []MobileOrder  `json:"recent_orders"`
	QuickStats    MobileStats    `json:"quick_stats"`
	Notifications []Notification `json:"notifications"`
}

type MobileOrder struct {
	ID     string  `json:"id"`
	Title  string  `json:"title"`
	Total  float64 `json:"total"`
	Status string  `json:"status"`
}

type MobileStats struct {
	OrderCount int `json:"order_count"`
	Points     int `json:"points"`
}

type Notification struct {
	Message string `json:"message"`
	Type    string `json:"type"`
}

func (bff *MobileBFF) HandleDashboard(w http.ResponseWriter, r *http.Request) {
	userID := r.URL.Query().Get("user_id")
	if userID == "" {
		http.Error(w, "user_id required", http.StatusBadRequest)
		return
	}

	// Fetch full composite data
	data, err := bff.composer.GetUserDashboard(r.Context(), userID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Transform to mobile-optimized format
	response := MobileDashboardResponse{
		UserName:     data.User.Username,
		RecentOrders: bff.transformOrders(data.Orders),
		QuickStats: MobileStats{
			OrderCount: data.Stats.TotalOrders,
			Points:     data.Stats.LoyaltyPoints,
		},
		Notifications: bff.buildNotifications(data),
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

func (bff *MobileBFF) transformOrders(orders []Order) []MobileOrder {
	// Return only recent 5 orders with minimal data
	mobileOrders := make([]MobileOrder, 0, 5)
	for i, order := range orders {
		if i >= 5 {
			break
		}
		mobileOrders = append(mobileOrders, MobileOrder{
			ID:     order.ID,
			Title:  order.ProductName,
			Total:  order.Total,
			Status: order.Status,
		})
	}
	return mobileOrders
}

func (bff *MobileBFF) buildNotifications(data *CompositeResponse) []Notification {
	notifications := []Notification{}

	// Check for pending orders
	for _, order := range data.Orders {
		if order.Status == "pending" {
			notifications = append(notifications, Notification{
				Message: fmt.Sprintf("Order %s is being processed", order.ID),
				Type:    "info",
			})
		}
	}

	// Check loyalty points milestone
	if data.Stats.LoyaltyPoints >= 1000 {
		notifications = append(notifications, Notification{
			Message: "You've earned 1000 loyalty points!",
			Type:    "achievement",
		})
	}

	return notifications
}

// WebBFF provides feature-rich API for web clients
type WebBFF struct {
	composer *APIComposer
}

func NewWebBFF(composer *APIComposer) *WebBFF {
	return &WebBFF{composer: composer}
}

// Web response includes full details
type WebDashboardResponse struct {
	User            *User           `json:"user"`
	Orders          []EnrichedOrder `json:"orders"`
	Profile         *Profile        `json:"profile"`
	Stats           *DetailedStats  `json:"stats"`
	Recommendations []Product       `json:"recommendations"`
}

type EnrichedOrder struct {
	Order
	EstimatedDelivery time.Time `json:"estimated_delivery"`
	TrackingURL       string    `json:"tracking_url"`
}

type DetailedStats struct {
	Stats
	SpendingTrend []SpendingDataPoint `json:"spending_trend"`
	TopCategories []CategorySpending  `json:"top_categories"`
}

type SpendingDataPoint struct {
	Month  string  `json:"month"`
	Amount float64 `json:"amount"`
}

type CategorySpending struct {
	Category string  `json:"category"`
	Amount   float64 `json:"amount"`
}

type Product struct {
	ID    string  `json:"id"`
	Name  string  `json:"name"`
	Price float64 `json:"price"`
}

func (bff *WebBFF) HandleDashboard(w http.ResponseWriter, r *http.Request) {
	userID := r.URL.Query().Get("user_id")
	if userID == "" {
		http.Error(w, "user_id required", http.StatusBadRequest)
		return
	}

	data, err := bff.composer.GetUserDashboard(r.Context(), userID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Recommendations are optional: on error the dashboard is served without them
	recommendations, _ := bff.fetchRecommendations(r.Context(), userID)

	response := WebDashboardResponse{
		User:            data.User,
		Orders:          bff.enrichOrders(data.Orders),
		Profile:         data.Profile,
		Stats:           bff.enrichStats(data.Stats),
		Recommendations: recommendations,
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

// enrichOrders adds delivery and tracking details (placeholder values here)
func (bff *WebBFF) enrichOrders(orders []Order) []EnrichedOrder {
	enriched := make([]EnrichedOrder, len(orders))
	for i, order := range orders {
		enriched[i] = EnrichedOrder{
			Order:             order,
			EstimatedDelivery: time.Now().Add(3 * 24 * time.Hour),
			TrackingURL:       fmt.Sprintf("https://tracking.example.com/%s", order.ID),
		}
	}
	return enriched
}

// enrichStats adds spending breakdowns (hard-coded sample data here)
func (bff *WebBFF) enrichStats(stats *Stats) *DetailedStats {
	return &DetailedStats{
		Stats: *stats,
		SpendingTrend: []SpendingDataPoint{
			{Month: "Jan", Amount: 120.50},
			{Month: "Feb", Amount: 89.20},
			{Month: "Mar", Amount: 145.75},
		},
		TopCategories: []CategorySpending{
			{Category: "Electronics", Amount: 500.00},
			{Category: "Books", Amount: 125.00},
		},
	}
}

// fetchRecommendations returns a stub result; a real BFF would call a recommendation service
func (bff *WebBFF) fetchRecommendations(ctx context.Context, userID string) ([]Product, error) {
	return []Product{
		{ID: "prod-1", Name: "Recommended Product", Price: 29.99},
	}, nil
}
API Gateway Pattern
An API Gateway provides a single entry point for clients and handles cross-cutting concerns such as authentication, rate limiting, and request routing. It acts as a reception desk for your microservices, directing requests and managing security.
💡 Key Takeaway: An API Gateway simplifies client communication by providing a unified interface while hiding the complexity of your microservices architecture.
📖 Complete Guide: For comprehensive coverage of API Gateway patterns including implementation details, authentication strategies, rate limiting, load balancing, circuit breaking, and production best practices, see API Gateway Patterns.
Key API Gateway Features
Core Capabilities:
- Request Routing: Direct requests to appropriate microservices
- Authentication & Authorization: Centralized security enforcement
- Rate Limiting: Protect services from overload
- Load Balancing: Distribute traffic across service instances
- Circuit Breaking: Prevent cascading failures
- Request/Response Transformation: Adapt protocols and formats
- Aggregation: Combine multiple service responses
Common Patterns:
- Single Entry Point: Unified API surface for clients
- Protocol Translation: REST ↔ gRPC ↔ GraphQL
- Service Composition: Aggregate responses from multiple services
- Edge Security: SSL termination, WAF integration
- Analytics & Monitoring: Request tracking, response metrics
Integration Example
In a microservices architecture, the API Gateway sits between clients and backend services:
Client → API Gateway → Multiple Microservices
For detailed implementation examples, middleware patterns, and production configurations, see the dedicated API Gateway tutorial.
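The Client → API Gateway → Microservices flow can be sketched with the standard library's reverse proxy. This is a minimal illustration under stated assumptions, not a production gateway: the route table, the backend hostnames, and the `resolveBackend` and `statusFor` helpers are invented for the example, and authentication, rate limiting, and circuit breaking are left out.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
	"strings"
)

// routes maps a path prefix to a backend base URL.
// The prefixes and hostnames are hypothetical.
var routes = map[string]string{
	"/users/":    "http://user-service:8080",
	"/orders/":   "http://order-service:8080",
	"/products/": "http://product-service:8080",
}

// resolveBackend returns the backend registered for the longest matching prefix.
func resolveBackend(path string) (string, bool) {
	best, target := "", ""
	for prefix, backend := range routes {
		if strings.HasPrefix(path, prefix) && len(prefix) > len(best) {
			best, target = prefix, backend
		}
	}
	return target, best != ""
}

// gatewayHandler forwards each request to the backend selected by its path.
func gatewayHandler(w http.ResponseWriter, r *http.Request) {
	backend, ok := resolveBackend(r.URL.Path)
	if !ok {
		http.Error(w, "no route", http.StatusNotFound)
		return
	}
	target, err := url.Parse(backend)
	if err != nil {
		http.Error(w, "bad backend URL", http.StatusInternalServerError)
		return
	}
	httputil.NewSingleHostReverseProxy(target).ServeHTTP(w, r)
}

// statusFor calls the gateway in-process and returns the status code it wrote.
func statusFor(path string) int {
	rec := httptest.NewRecorder()
	gatewayHandler(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	backend, ok := resolveBackend("/orders/42")
	fmt.Println(backend, ok)
	// Unrouted paths are rejected at the gateway without touching any backend.
	fmt.Println(statusFor("/unknown"))
}
```

To serve real traffic, the handler would be mounted with something like `http.ListenAndServe(":8000", http.HandlerFunc(gatewayHandler))`, with the backend addresses coming from service discovery rather than a static map.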
Distributed Transactions with Saga
Imagine you're planning a vacation that involves booking a flight, reserving a hotel, and renting a car. If any of these bookings fails, you need to cancel the others to avoid paying for services you can't use. A saga pattern works exactly like this travel planning - it coordinates multiple services and can roll back changes if something goes wrong.
Traditional database transactions don't work across multiple services. The Saga pattern manages distributed transactions across microservices by breaking them into a series of smaller local transactions, each paired with a compensating action that undoes its changes if a later step fails.
⚠️ Important: Sagas provide eventual consistency, not immediate consistency. There will be brief periods where the system is in an intermediate state during the saga execution.
Saga Orchestration
package main

import (
	"context"
	"fmt"
	"log"
)

// SagaStep pairs a forward action with the compensating action that undoes it.
// Execute returns a result that is later passed to Compensate.
type SagaStep struct {
	Name           string
	Execute        func(ctx context.Context, data interface{}) (interface{}, error)
	Compensate     func(ctx context.Context, data interface{}) error
	ExecuteData    interface{}
	CompensateData interface{}
}

type Saga struct {
	steps         []SagaStep
	executedSteps []int
}

func NewSaga() *Saga {
	return &Saga{
		steps:         []SagaStep{},
		executedSteps: []int{},
	}
}

func (s *Saga) AddStep(step SagaStep) {
	s.steps = append(s.steps, step)
}

// Execute runs the steps in order and compensates completed steps on failure
func (s *Saga) Execute(ctx context.Context) error {
	for i, step := range s.steps {
		log.Printf("Executing step: %s", step.Name)

		result, err := step.Execute(ctx, step.ExecuteData)
		if err != nil {
			log.Printf("Step %s failed: %v", step.Name, err)
			s.compensate(ctx)
			return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
		}

		// Store result for compensation if needed
		s.steps[i].CompensateData = result
		s.executedSteps = append(s.executedSteps, i)

		log.Printf("Step %s completed successfully", step.Name)
	}

	log.Println("Saga completed successfully")
	return nil
}

func (s *Saga) compensate(ctx context.Context) {
	log.Println("Starting compensation...")

	// Compensate in reverse order
	for i := len(s.executedSteps) - 1; i >= 0; i-- {
		stepIndex := s.executedSteps[i]
		step := s.steps[stepIndex]

		log.Printf("Compensating step: %s", step.Name)

		if err := step.Compensate(ctx, step.CompensateData); err != nil {
			log.Printf("Compensation failed for %s: %v", step.Name, err)
			// Log but continue compensating other steps
		}
	}

	log.Println("Compensation completed")
}

// Example: Order creation saga
type OrderSaga struct {
	orderID   string
	userID    string
	productID string
	quantity  int
}

func CreateOrderSaga(userID, productID string, quantity int) *Saga {
	saga := NewSaga()

	// Step 1: Reserve inventory
	saga.AddStep(SagaStep{
		Name: "ReserveInventory",
		Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
			// Call inventory service
			log.Printf("Reserving %d units of product %s", quantity, productID)
			// Return reservation ID
			return "reservation-123", nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			reservationID := data.(string)
			log.Printf("Releasing inventory reservation %s", reservationID)
			// Call inventory service to release
			return nil
		},
	})

	// Step 2: Process payment
	saga.AddStep(SagaStep{
		Name: "ProcessPayment",
		Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
			log.Printf("Processing payment for user %s", userID)
			// Call payment service
			return "payment-456", nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			paymentID := data.(string)
			log.Printf("Refunding payment %s", paymentID)
			// Call payment service to refund
			return nil
		},
	})

	// Step 3: Create order
	saga.AddStep(SagaStep{
		Name: "CreateOrder",
		Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
			log.Printf("Creating order for user %s", userID)
			// Call order service
			return "order-789", nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			orderID := data.(string)
			log.Printf("Cancelling order %s", orderID)
			// Call order service to cancel
			return nil
		},
	})

	// Step 4: Send notification
	saga.AddStep(SagaStep{
		Name: "SendNotification",
		Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
			log.Printf("Sending order confirmation to user %s", userID)
			// Call notification service
			return nil, nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			// Notifications don't need compensation
			return nil
		},
	})

	return saga
}

func main() {
	ctx := context.Background()

	saga := CreateOrderSaga("user-123", "product-456", 2)

	if err := saga.Execute(ctx); err != nil {
		log.Printf("Order creation failed: %v", err)
	}
}
Circuit Breakers and Retries
Think of a circuit breaker like the safety mechanisms in your home's electrical system. When too much current flows through a circuit, the breaker trips to prevent damage to your appliances. In microservices, when a service starts failing repeatedly, the circuit breaker "trips" to prevent cascading failures throughout your system.
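Circuit breakers prevent cascading failures by rejecting calls to a service that keeps failing, while retries handle transient errors that are likely to succeed on a second attempt.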
Common Pitfalls:
- Setting retry limits too low can cause unnecessary failures
- Not implementing exponential backoff can overwhelm struggling services
- Forgetting to configure timeouts can cause cascading delays
💡 Key Takeaway: Circuit breakers and retries work together to create resilient systems that can handle partial failures without crashing completely.
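The timeout pitfall listed above can be illustrated with a per-call deadline. The following is a minimal sketch: `callWithTimeout`, `slowService`, and `demoTimeouts` are hypothetical helpers written for this example, and the latencies are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// callWithTimeout runs fn under a deadline so a slow dependency
// cannot hold the caller indefinitely.
func callWithTimeout(parent context.Context, timeout time.Duration, fn func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine can always finish
	go func() { done <- fn(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// slowService simulates a dependency that answers after the given latency
// and stops early when its context is cancelled.
func slowService(latency time.Duration) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(latency):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demoTimeouts reports whether a slow call hit its deadline and
// what error, if any, a fast call returned.
func demoTimeouts() (timedOut bool, fastErr error) {
	slowErr := callWithTimeout(context.Background(), 50*time.Millisecond, slowService(500*time.Millisecond))
	fastErr = callWithTimeout(context.Background(), 500*time.Millisecond, slowService(10*time.Millisecond))
	return errors.Is(slowErr, context.DeadlineExceeded), fastErr
}

func main() {
	timedOut, fastErr := demoTimeouts()
	fmt.Println("slow call timed out:", timedOut)
	fmt.Println("fast call error:", fastErr)
}
```

Passing the derived context into the call matters: it lets the downstream work stop as soon as the deadline passes instead of continuing in the background.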
Circuit Breaker Implementation
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type CircuitState int

const (
	StateClosed CircuitState = iota
	StateOpen
	StateHalfOpen
)

type CircuitBreaker struct {
	name          string
	maxFailures   int           // consecutive failures that trip the breaker
	timeout       time.Duration // not enforced in this example; fn should apply its own deadline
	resetTimeout  time.Duration // how long to stay open before probing again
	state         CircuitState
	failures      int
	lastFailTime  time.Time
	successCount  int
	halfOpenMax   int // successes required in half-open state before closing
	mu            sync.RWMutex
	onStateChange func(from, to CircuitState)
}

func NewCircuitBreaker(name string, maxFailures int, timeout, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		name:         name,
		maxFailures:  maxFailures,
		timeout:      timeout,
		resetTimeout: resetTimeout,
		state:        StateClosed,
		halfOpenMax:  3,
	}
}

// Execute runs fn unless the breaker is open, then records the outcome
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error {
	// Skip the call if the caller has already given up
	if err := ctx.Err(); err != nil {
		return err
	}

	if err := cb.beforeRequest(); err != nil {
		return err
	}

	err := fn()

	cb.afterRequest(err)

	return err
}

func (cb *CircuitBreaker) beforeRequest() error {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case StateOpen:
		// After the reset timeout, let a trial request through
		if time.Since(cb.lastFailTime) > cb.resetTimeout {
			cb.setState(StateHalfOpen)
			return nil
		}
		return fmt.Errorf("circuit breaker %s is open", cb.name)

	case StateHalfOpen:
		if cb.successCount >= cb.halfOpenMax {
			cb.setState(StateClosed)
		}
		return nil

	default:
		return nil
	}
}

func (cb *CircuitBreaker) afterRequest(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err != nil {
		cb.failures++
		cb.lastFailTime = time.Now()

		if cb.state == StateHalfOpen {
			// Any failure while probing reopens the circuit
			cb.setState(StateOpen)
		} else if cb.failures >= cb.maxFailures {
			cb.setState(StateOpen)
		}
	} else {
		if cb.state == StateHalfOpen {
			cb.successCount++
			if cb.successCount >= cb.halfOpenMax {
				cb.setState(StateClosed)
			}
		} else {
			cb.failures = 0
		}
	}
}

// setState must be called with cb.mu held
func (cb *CircuitBreaker) setState(newState CircuitState) {
	oldState := cb.state
	cb.state = newState
	cb.failures = 0
	cb.successCount = 0

	fmt.Printf("Circuit breaker %s: %v -> %v\n", cb.name, stateString(oldState), stateString(newState))

	if cb.onStateChange != nil {
		cb.onStateChange(oldState, newState)
	}
}

func stateString(state CircuitState) string {
	switch state {
	case StateClosed:
		return "CLOSED"
	case StateOpen:
		return "OPEN"
	case StateHalfOpen:
		return "HALF_OPEN"
	default:
		return "UNKNOWN"
	}
}

func (cb *CircuitBreaker) State() CircuitState {
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	return cb.state
}

// Retry with exponential backoff
type RetryConfig struct {
	MaxAttempts  int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

func Retry(ctx context.Context, config RetryConfig, fn func() error) error {
	var err error
	delay := config.InitialDelay

	for attempt := 1; attempt <= config.MaxAttempts; attempt++ {
		err = fn()
		if err == nil {
			return nil
		}

		if attempt == config.MaxAttempts {
			break
		}

		fmt.Printf("Attempt %d failed, retrying in %v\n", attempt, delay)

		// Wait for the delay, but stop early if the context is cancelled
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}

		// Grow the delay exponentially, capped at MaxDelay
		delay = time.Duration(float64(delay) * config.Multiplier)
		if delay > config.MaxDelay {
			delay = config.MaxDelay
		}
	}

	return fmt.Errorf("max retries exceeded: %w", err)
}

// Example usage
func main() {
	cb := NewCircuitBreaker("user-service", 3, 5*time.Second, 10*time.Second)

	// Simulate service calls
	for i := 0; i < 10; i++ {
		err := cb.Execute(context.Background(), func() error {
			// Simulate failing service
			if i < 5 {
				return errors.New("service unavailable")
			}
			return nil
		})

		if err != nil {
			fmt.Printf("Request %d failed: %v\n", i, err)
		} else {
			fmt.Printf("Request %d succeeded\n", i)
		}

		time.Sleep(1 * time.Second)
	}

	// Retry example
	retryConfig := RetryConfig{
		MaxAttempts:  3,
		InitialDelay: 100 * time.Millisecond,
		MaxDelay:     1 * time.Second,
		Multiplier:   2.0,
	}

	err := Retry(context.Background(), retryConfig, func() error {
		// Simulate operation
		return errors.New("temporary error")
	})

	if err != nil {
		fmt.Printf("Operation failed: %v\n", err)
	}
}
Health Checks and Service Registration
Imagine you're organizing a conference and need to know which speakers are ready and available to present. You'd periodically check in with each speaker to confirm they're prepared and their equipment is working. Health checks in microservices work the same way - they continuously verify that services are running correctly and ready to handle traffic.
Health checks ensure services are ready to handle traffic.
When to use Liveness vs Readiness Checks:
- Liveness: "Is the service alive?" - Restart if this fails
- Readiness: "Is the service ready to accept traffic?" - Don't send requests if this fails
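Real-world Example: Kubernetes uses both liveness and readiness probes. A service might be alive but not yet ready, for example while it is still starting up or waiting on a dependency. The readiness probe keeps traffic away from such an instance until it can handle requests.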
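The difference between the two checks can be shown in a few lines. This is a simplified sketch, assuming a single `ready` flag that startup code flips once initialization has finished; the handler names and the `probe` helper are hypothetical, and `atomic.Bool` requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// ready is set to true once startup work (migrations, cache warm-up, ...) is done.
var ready atomic.Bool

// livenessHandler answers 200 as long as the process can serve HTTP at all.
func livenessHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}

// readinessHandler answers 503 until the service has finished starting up.
func readinessHandler(w http.ResponseWriter, r *http.Request) {
	if !ready.Load() {
		http.Error(w, "starting up", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
}

// probe calls a handler in-process and returns the status code it wrote.
func probe(h http.HandlerFunc) int {
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code
}

func main() {
	// Alive but not ready: an orchestrator would keep the instance out of rotation.
	fmt.Println("live:", probe(livenessHandler), "ready:", probe(readinessHandler))

	ready.Store(true)

	// Alive and ready: traffic can now be routed to the instance.
	fmt.Println("live:", probe(livenessHandler), "ready:", probe(readinessHandler))
}
```

Running it prints `live: 200 ready: 503` followed by `live: 200 ready: 200`.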
Health Check Implementation
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"
)

type HealthStatus string

const (
	StatusHealthy   HealthStatus = "healthy"
	StatusUnhealthy HealthStatus = "unhealthy"
	StatusDegraded  HealthStatus = "degraded"
)

type ComponentHealth struct {
	Status    HealthStatus `json:"status"`
	Message   string       `json:"message,omitempty"`
	Timestamp time.Time    `json:"timestamp"`
}

type HealthCheck struct {
	mu         sync.RWMutex
	components map[string]ComponentHealth
	db         *sql.DB
}

func NewHealthCheck(db *sql.DB) *HealthCheck {
	return &HealthCheck{
		components: make(map[string]ComponentHealth),
		db:         db,
	}
}

// Check probes every dependency and returns a snapshot of the results
func (hc *HealthCheck) Check(ctx context.Context) map[string]ComponentHealth {
	// Run the probes before taking the lock so slow checks don't block readers
	database := hc.checkDatabase(ctx)
	userService := hc.checkExternalService(ctx, "http://user-service:8080/health")

	hc.mu.Lock()
	defer hc.mu.Unlock()

	// Check database
	hc.components["database"] = database

	// Check external services
	hc.components["user_service"] = userService

	// Check Redis
	// hc.components["redis"] = hc.checkRedis(ctx)

	// Return a copy so callers never read the shared map without the lock
	snapshot := make(map[string]ComponentHealth, len(hc.components))
	for name, health := range hc.components {
		snapshot[name] = health
	}
	return snapshot
}

func (hc *HealthCheck) checkDatabase(ctx context.Context) ComponentHealth {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	err := hc.db.PingContext(ctx)
	if err != nil {
		return ComponentHealth{
			Status:    StatusUnhealthy,
			Message:   err.Error(),
			Timestamp: time.Now(),
		}
	}

	return ComponentHealth{
		Status:    StatusHealthy,
		Timestamp: time.Now(),
	}
}

func (hc *HealthCheck) checkExternalService(ctx context.Context, url string) ComponentHealth {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return ComponentHealth{
			Status:    StatusUnhealthy,
			Message:   err.Error(),
			Timestamp: time.Now(),
		}
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return ComponentHealth{
			Status:    StatusUnhealthy,
			Message:   err.Error(),
			Timestamp: time.Now(),
		}
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return ComponentHealth{
			Status:    StatusUnhealthy,
			Message:   fmt.Sprintf("HTTP %d", resp.StatusCode),
			Timestamp: time.Now(),
		}
	}

	return ComponentHealth{
		Status:    StatusHealthy,
		Timestamp: time.Now(),
	}
}

// OverallStatus reports the worst status among the last recorded checks
func (hc *HealthCheck) OverallStatus() HealthStatus {
	hc.mu.RLock()
	defer hc.mu.RUnlock()

	hasUnhealthy := false
	hasDegraded := false

	for _, comp := range hc.components {
		switch comp.Status {
		case StatusUnhealthy:
			hasUnhealthy = true
		case StatusDegraded:
			hasDegraded = true
		}
	}

	if hasUnhealthy {
		return StatusUnhealthy
	}
	if hasDegraded {
		return StatusDegraded
	}
	return StatusHealthy
}

// HTTP handlers
func (hc *HealthCheck) LivenessHandler(w http.ResponseWriter, r *http.Request) {
	// Liveness: Is the service running?
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("alive"))
}

func (hc *HealthCheck) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
	// Readiness: Is the service ready to accept traffic?
	components := hc.Check(r.Context())
	overall := hc.OverallStatus()

	status := http.StatusOK
	if overall != StatusHealthy {
		status = http.StatusServiceUnavailable
	}

	response := map[string]interface{}{
		"status":     overall,
		"components": components,
		"timestamp":  time.Now(),
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(response)
}

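func main() {
	// Setup database
	// A PostgreSQL driver (for example github.com/lib/pq) must be imported for
	// its side effects so that the "postgres" driver name is registered.
	db, err := sql.Open("postgres", "connection-string")
	if err != nil {
		fmt.Printf("failed to open database: %v\n", err)
		return
	}
	defer db.Close()

	// Create health checker
	healthCheck := NewHealthCheck(db)

	// Setup routes
	http.HandleFunc("/health/live", healthCheck.LivenessHandler)
	http.HandleFunc("/health/ready", healthCheck.ReadinessHandler)

	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Printf("server stopped: %v\n", err)
	}
}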
Distributed Tracing
Imagine you're tracking a package through a complex delivery system. At each checkpoint, the package is scanned, creating a complete trail of its journey. Distributed tracing works the same way - it follows a request as it travels through multiple services, creating a detailed map of its path and identifying any delays or errors.
Distributed tracing tracks requests across microservices.
Common Pitfalls:
- Too much tracing data can overwhelm your monitoring system
- Missing context propagation breaks the trace chain
- Inconsistent sampling can miss important issues
💡 Key Takeaway: Distributed tracing is essential for debugging microservices, but start with a sampling strategy that captures enough detail without overwhelming your systems.
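The context-propagation pitfall is easier to see with the header itself. The sketch below hand-rolls the W3C `traceparent` header (`version-traceid-spanid-flags`) purely for illustration; the `TraceContext` type and the helper names are assumptions of this example, validation is deliberately minimal, and in practice the OpenTelemetry SDK's propagators do this work.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
	"strings"
)

// TraceContext holds the fields carried by a traceparent header.
type TraceContext struct {
	TraceID string // 32 hex characters, shared by every span in the trace
	SpanID  string // 16 hex characters, the caller's span
	Sampled bool
}

// formatTraceparent renders the header value for an outgoing request.
func formatTraceparent(tc TraceContext) string {
	flags := "00"
	if tc.Sampled {
		flags = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s", tc.TraceID, tc.SpanID, flags)
}

// parseTraceparent reads a header value. It returns false when the value is
// missing or malformed, in which case the receiver would start a new trace
// and the chain would be broken.
func parseTraceparent(v string) (TraceContext, bool) {
	parts := strings.Split(v, "-")
	if len(parts) != 4 || parts[0] != "00" ||
		len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return TraceContext{}, false
	}
	flags, err := strconv.ParseUint(parts[3], 16, 8)
	if err != nil {
		return TraceContext{}, false
	}
	// The lowest flag bit marks the trace as sampled.
	return TraceContext{TraceID: parts[1], SpanID: parts[2], Sampled: flags&1 == 1}, true
}

// inject copies the trace context onto outgoing request headers.
func inject(h http.Header, tc TraceContext) {
	h.Set("traceparent", formatTraceparent(tc))
}

// extract reads the trace context from incoming request headers.
func extract(h http.Header) (TraceContext, bool) {
	return parseTraceparent(h.Get("traceparent"))
}

func main() {
	outgoing := http.Header{}
	inject(outgoing, TraceContext{
		TraceID: "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:  "00f067aa0ba902b7",
		Sampled: true,
	})

	// The downstream service sees the same trace ID and can attach its spans to it.
	tc, ok := extract(outgoing)
	fmt.Println(tc.TraceID, tc.Sampled, ok)

	// A request that was forwarded without the header cannot be linked to the trace.
	_, ok = extract(http.Header{})
	fmt.Println(ok)
}
```

Because the sampled flag travels with the trace ID, every service on the path can honor the same sampling decision, which also helps avoid the inconsistent-sampling pitfall.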
OpenTelemetry Tracing
package tracing

import (
	"context"
	"encoding/json"
	"net/http"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/sdk/resource"
	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

// User, Product, Order, callUserService, and callProductService are assumed
// to be defined elsewhere in the service.

// InitTracer installs a global tracer provider that exports spans to Jaeger.
// Call Shutdown on the returned provider at exit to flush buffered spans.
// Note: newer OpenTelemetry releases deprecate this Jaeger exporter in favor of OTLP.
func InitTracer(serviceName, jaegerEndpoint string) (*tracesdk.TracerProvider, error) {
	// Create Jaeger exporter
	exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerEndpoint)))
	if err != nil {
		return nil, err
	}

	// Create tracer provider
	tp := tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exporter),
		tracesdk.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(serviceName),
		)),
	)

	otel.SetTracerProvider(tp)

	return tp, nil
}

// Example: Tracing HTTP handler
func TracedHandler(w http.ResponseWriter, r *http.Request) {
	tracer := otel.Tracer("order-service")

	ctx, span := tracer.Start(r.Context(), "HandleOrder")
	defer span.End()

	// Add attributes
	span.SetAttributes(
		attribute.String("user.id", "user-123"),
		attribute.String("order.id", "order-456"),
	)

	// Call other services with traced context
	user, err := getUserWithTrace(ctx, "user-123")
	if err != nil {
		span.RecordError(err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	product, err := getProductWithTrace(ctx, "product-789")
	if err != nil {
		span.RecordError(err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Process order
	order := processOrder(ctx, user, product)

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(order)
}

func getUserWithTrace(ctx context.Context, userID string) (*User, error) {
	tracer := otel.Tracer("order-service")
	ctx, span := tracer.Start(ctx, "GetUser")
	defer span.End()

	span.SetAttributes(attribute.String("user.id", userID))

	// Make HTTP request with trace context
	// Headers will include trace information
	user, err := callUserService(ctx, userID)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}

	return user, nil
}

func getProductWithTrace(ctx context.Context, productID string) (*Product, error) {
	tracer := otel.Tracer("order-service")
	ctx, span := tracer.Start(ctx, "GetProduct")
	defer span.End()

	span.SetAttributes(attribute.String("product.id", productID))

	product, err := callProductService(ctx, productID)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}

	return product, nil
}

func processOrder(ctx context.Context, user *User, product *Product) *Order {
	tracer := otel.Tracer("order-service")
	_, span := tracer.Start(ctx, "ProcessOrder")
	defer span.End()

	// Business logic
	order := &Order{
		ID:        "order-new",
		UserID:    user.ID,
		ProductID: product.ID,
	}

	return order
}
Practice Exercises
Exercise 1: E-Commerce Microservices Architecture
🎯 Learning Objectives:
- Design and implement a complete microservices architecture from scratch
- Understand service boundaries and responsibility separation
- Implement inter-service communication patterns
- Set up service discovery and API gateway patterns
- Apply microservices design patterns and best practices
⏱️ Time Estimate: 180-240 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: A retail startup needs to build a scalable e-commerce platform that can handle Black Friday traffic spikes. Microservices architecture enables independent scaling, deployment, and fault isolation across different business domains.
Task: Create a production-ready e-commerce system using microservices architecture that demonstrates core patterns and real-world implementation challenges.
Core Services to Implement:
- User Service: Authentication, authorization, profile management, JWT token generation
- Product Service: Product catalog, inventory management, search and filtering
- Order Service: Order processing workflow, order status tracking, order history
- Payment Service: Payment processing, payment method management, transaction logging
- Notification Service: Email notifications, SMS alerts, push notifications
- API Gateway: Request routing, authentication, rate limiting, request aggregation
Infrastructure Components:
- Service Discovery: Consul integration with health checks and service registration
- Inter-Service Communication: gRPC for synchronous calls, message queues for async
- Circuit Breakers: Fault tolerance and graceful degradation
- Health Checks: Comprehensive health monitoring for all services
- Distributed Tracing: Request tracking across service boundaries
- Configuration Management: Externalized configuration with environment overrides
Advanced Features:
- Database per service pattern with proper data isolation
- Event-driven communication for eventual consistency
- API versioning and backward compatibility
- Load testing and performance optimization
- Monitoring and observability setup
- Container orchestration with Docker Compose
Solution Outline
Architecture:
- API Gateway
- User Service
- Product Service
- Order Service
- Payment Service
- Notification Service
- Consul
- Jaeger
Communication Flow:
1. Client -> API Gateway
2. API Gateway -> User/Product/Order Services
3. Order Service -> Payment Service
4. Payment Service -> Notification Service
Each service:
- gRPC server implementation
- HTTP health endpoints
- Consul registration
- OpenTelemetry tracing
- Circuit breakers for external calls
- Structured logging
Exercise 2: Distributed Transactions with Saga Pattern
🎯 Learning Objectives:
- Understand the challenges of distributed transactions in microservices
- Implement saga pattern for managing long-running business transactions
- Design compensation actions for rolling back failed operations
- Compare saga orchestration and choreography patterns
- Handle transaction state persistence and recovery
⏱️ Time Estimate: 120-150 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: An e-commerce platform needs to handle complex order processing that spans multiple services. Traditional ACID transactions don't work across services, requiring saga pattern for maintaining data consistency.
Task: Implement a comprehensive saga pattern solution for order processing that ensures data consistency across multiple microservices without relying on distributed ACID transactions.
Core Saga Components:
- Saga Orchestrator: Central coordinator managing transaction lifecycle
- Compensation Actions: Rollback operations for each transaction step
- State Management: Persistent saga state tracking and recovery
- Timeout Handling: Automatic rollback for long-running operations
- Retry Logic: Configurable retry policies for failed operations
Order Processing Workflow:
- Inventory Reservation: Reserve items and lock stock
- Payment Validation: Validate payment method and process payment
- Order Creation: Create order record with status tracking
- Shipping Arrangement: Schedule shipping and generate tracking
- Notification Dispatch: Send confirmation emails and SMS
- Loyalty Points: Award customer loyalty points
Failure Scenarios:
- Payment gateway timeout
- Inventory out of stock after reservation
- Shipping service unavailable
- Email delivery failures
- Loyalty service errors
Advanced Patterns:
- Event-driven saga choreography
- Event sourcing for saga events
- Saga monitoring and observability
- Manual intervention for failed sagas
- Performance optimization and batch processing
Solution: See the Saga Orchestration implementation earlier in this article.
Extend that saga example to include:
- Persistent saga state
- Saga coordinator service
- Event sourcing for saga steps
- Monitoring dashboard
- Retry logic for failed steps
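As a starting point for the persistent-state item, the idea is to record which steps have completed so that a restarted coordinator can resume instead of starting over. The sketch below is one possible shape, not a reference solution: `SagaState`, `memoryStore`, `nextStep`, and `resumeDemo` are hypothetical names, and the in-memory map stands in for a durable store such as a database table.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SagaState is the record persisted after every completed step.
type SagaState struct {
	SagaID    string   `json:"saga_id"`
	Completed []string `json:"completed"` // step names, in execution order
}

// memoryStore keeps serialized state in memory; a real coordinator
// would write to durable storage instead.
type memoryStore map[string][]byte

func (m memoryStore) Save(st SagaState) error {
	b, err := json.Marshal(st)
	if err != nil {
		return err
	}
	m[st.SagaID] = b
	return nil
}

func (m memoryStore) Load(id string) (SagaState, error) {
	var st SagaState
	b, ok := m[id]
	if !ok {
		return st, fmt.Errorf("saga %s not found", id)
	}
	err := json.Unmarshal(b, &st)
	return st, err
}

// nextStep returns the first step not yet recorded as completed.
func nextStep(steps []string, st SagaState) (string, bool) {
	if len(st.Completed) >= len(steps) {
		return "", false
	}
	return steps[len(st.Completed)], true
}

// resumeDemo saves progress after two steps, reloads it as a restarted
// coordinator would, and reports where execution should continue.
func resumeDemo() (string, error) {
	steps := []string{"ReserveInventory", "ProcessPayment", "CreateOrder", "SendNotification"}
	store := memoryStore{}

	if err := store.Save(SagaState{SagaID: "saga-1", Completed: steps[:2]}); err != nil {
		return "", err
	}

	st, err := store.Load("saga-1")
	if err != nil {
		return "", err
	}
	next, _ := nextStep(steps, st)
	return next, nil
}

func main() {
	next, err := resumeDemo()
	fmt.Println(next, err)
}
```

The same record can also drive compensation after a crash: the `Completed` list, read in reverse, is exactly the set of steps that need to be undone.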
Exercise 3: Service Discovery and Load Balancing
🎯 Learning Objectives:
- Build a comprehensive service discovery system from scratch
- Implement health checking mechanisms with configurable intervals
- Design load balancing algorithms for optimal traffic distribution
- Create service registration and deregistration workflows
- Handle service metadata and versioning for blue-green deployments
⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Intermediate
🌍 Real-World Context: A cloud-native application with dozens of microservices needs dynamic service discovery to handle container orchestration, auto-scaling, and blue-green deployments. Service discovery enables services to find and communicate with each other without hardcoded configurations.
Task: Create a production-ready service discovery system that manages service registration, health monitoring, and intelligent load balancing across dynamic service instances.
Core Discovery Components:
- Service Registry: Central repository of available service instances
- Health Checker: Active and passive health monitoring with custom endpoints
- Load Balancer: Multiple selection algorithms (listed under Load Balancing Algorithms below)
- Service Watcher: Real-time notifications for service changes
- Metadata Manager: Service versioning, tags, and custom attributes
Health Monitoring Features:
- Configurable health check intervals and timeouts
- HTTP, TCP, and gRPC health check protocols
- Gradual traffic drain for service shutdowns
- Automatic service deregistration for unhealthy instances
- Circuit breaker integration for failing services
Load Balancing Algorithms:
- Round Robin: Even distribution across healthy instances
- Weighted Round Robin: Based on instance capacity or metadata
- Least Connections: Route to instance with fewest active connections
- Random: Simple random selection for testing
- IP Hash: Session affinity for stateful services
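Two of the algorithms above fit in a few lines each. The following is a rough sketch, assuming a simple `Instance` type with an address and an active-connection counter; the type, the function names, and the sample addresses are hypothetical and independent of the solution code.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Instance is a healthy backend that can receive traffic.
type Instance struct {
	Address     string
	ActiveConns int
}

// pickByIPHash maps a client IP to an instance. The same IP always lands on
// the same instance as long as the instance list does not change.
func pickByIPHash(instances []Instance, clientIP string) (Instance, bool) {
	if len(instances) == 0 {
		return Instance{}, false
	}
	h := fnv.New32a()
	h.Write([]byte(clientIP))
	return instances[int(h.Sum32()%uint32(len(instances)))], true
}

// pickLeastConnections returns the instance with the fewest active connections.
func pickLeastConnections(instances []Instance) (Instance, bool) {
	if len(instances) == 0 {
		return Instance{}, false
	}
	best := instances[0]
	for _, inst := range instances[1:] {
		if inst.ActiveConns < best.ActiveConns {
			best = inst
		}
	}
	return best, true
}

func main() {
	pool := []Instance{
		{Address: "10.0.0.1:8080", ActiveConns: 4},
		{Address: "10.0.0.2:8080", ActiveConns: 1},
		{Address: "10.0.0.3:8080", ActiveConns: 7},
	}

	first, _ := pickByIPHash(pool, "203.0.113.9")
	second, _ := pickByIPHash(pool, "203.0.113.9")
	fmt.Println("sticky:", first.Address == second.Address)

	least, _ := pickLeastConnections(pool)
	fmt.Println("least connections:", least.Address)
}
```

One design caveat: plain modulo hashing reshuffles most clients whenever an instance is added or removed, so services that depend on strong session affinity often use consistent hashing instead.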
Advanced Features:
- Service versioning and traffic splitting
- Geographic and region-based routing
- Service dependencies and topology mapping
- Performance metrics and analytics
- Integration with container orchestrators
- DNS and API-based discovery interfaces
Solution
// run
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"sort"
	"sync"
	"time"
)

// Service represents a registered service instance
type Service struct {
	ID       string            `json:"id"`
	Name     string            `json:"name"`
	Address  string            `json:"address"`
	Port     int               `json:"port"`
	Version  string            `json:"version"`
	Tags     []string          `json:"tags"`
	Metadata map[string]string `json:"metadata"`
	Health   HealthStatus      `json:"health"`
	LastSeen time.Time         `json:"last_seen"`
}

type HealthStatus string

const (
	HealthHealthy   HealthStatus = "healthy"
	HealthUnhealthy HealthStatus = "unhealthy"
	HealthUnknown   HealthStatus = "unknown"
)

// ServiceRegistry manages service registration and discovery
type ServiceRegistry struct {
	mu       sync.RWMutex
	services map[string]map[string]*Service // service-name -> service-id -> service
	watchers map[string][]chan *ServiceEvent
	rrNext   map[string]uint64 // service-name -> round-robin cursor
}

type ServiceEvent struct {
	Type    string   `json:"type"` // "register", "deregister", "health_change"
	Service *Service `json:"service"`
}

func NewServiceRegistry() *ServiceRegistry {
	registry := &ServiceRegistry{
		services: make(map[string]map[string]*Service),
		watchers: make(map[string][]chan *ServiceEvent),
		rrNext:   make(map[string]uint64),
	}

	// Start background health check
	go registry.healthCheckLoop()

	return registry
}

// Register a new service instance
func (sr *ServiceRegistry) Register(service *Service) error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	if sr.services[service.Name] == nil {
		sr.services[service.Name] = make(map[string]*Service)
	}

	service.LastSeen = time.Now()
	service.Health = HealthHealthy
	sr.services[service.Name][service.ID] = service

	log.Printf("Registered service: %s/%s at %s:%d", service.Name, service.ID, service.Address, service.Port)

	// Notify watchers
	sr.notifyWatchers(service.Name, &ServiceEvent{
		Type:    "register",
		Service: service,
	})

	return nil
}

// Deregister a service instance
func (sr *ServiceRegistry) Deregister(serviceName, serviceID string) error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	if instances, ok := sr.services[serviceName]; ok {
		if service, ok := instances[serviceID]; ok {
			delete(instances, serviceID)

			log.Printf("Deregistered service: %s/%s", serviceName, serviceID)

			// Notify watchers
			sr.notifyWatchers(serviceName, &ServiceEvent{
				Type:    "deregister",
				Service: service,
			})

			return nil
		}
	}

	return fmt.Errorf("service not found: %s/%s", serviceName, serviceID)
}

// Discover returns all healthy instances of a service, sorted by ID
func (sr *ServiceRegistry) Discover(serviceName string) ([]*Service, error) {
	sr.mu.RLock()
	defer sr.mu.RUnlock()

	instances, ok := sr.services[serviceName]
	if !ok || len(instances) == 0 {
		return nil, fmt.Errorf("no instances found for service: %s", serviceName)
	}

	var healthy []*Service
	for _, service := range instances {
		if service.Health == HealthHealthy {
			// Return a copy so callers never race with registry updates
			snapshot := *service
			healthy = append(healthy, &snapshot)
		}
	}

	if len(healthy) == 0 {
		return nil, fmt.Errorf("no healthy instances found for service: %s", serviceName)
	}

	// Map iteration order is random; sort so round-robin sees a stable order
	sort.Slice(healthy, func(i, j int) bool { return healthy[i].ID < healthy[j].ID })

	return healthy, nil
}

// GetInstance returns a single healthy instance chosen by the given strategy
func (sr *ServiceRegistry) GetInstance(serviceName string, strategy string) (*Service, error) {
	instances, err := sr.Discover(serviceName)
	if err != nil {
		return nil, err
	}

	switch strategy {
	case "round-robin":
		// Advance a per-service cursor so successive calls rotate through instances
		sr.mu.Lock()
		i := sr.rrNext[serviceName] % uint64(len(instances))
		sr.rrNext[serviceName]++
		sr.mu.Unlock()
		return instances[i], nil
	case "random":
		return instances[rand.Intn(len(instances))], nil
	default:
		return instances[0], nil
	}
}

// Heartbeat updates the last seen time for a service
func (sr *ServiceRegistry) Heartbeat(serviceName, serviceID string) error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	if instances, ok := sr.services[serviceName]; ok {
		if service, ok := instances[serviceID]; ok {
			service.LastSeen = time.Now()
			if service.Health != HealthHealthy {
				service.Health = HealthHealthy
				log.Printf("Service %s/%s is now healthy", serviceName, serviceID)

				// Notify watchers
				sr.notifyWatchers(serviceName, &ServiceEvent{
					Type:    "health_change",
					Service: service,
				})
			}
			return nil
		}
	}

	return fmt.Errorf("service not found: %s/%s", serviceName, serviceID)
}

// Watch returns a channel that receives service events
func (sr *ServiceRegistry) Watch(serviceName string) <-chan *ServiceEvent {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	ch := make(chan *ServiceEvent, 10)
	sr.watchers[serviceName] = append(sr.watchers[serviceName], ch)

	return ch
}

// notifyWatchers must be called with sr.mu held
func (sr *ServiceRegistry) notifyWatchers(serviceName string, event *ServiceEvent) {
	for _, ch := range sr.watchers[serviceName] {
		select {
		case ch <- event:
		default:
			// Channel full, skip so a slow watcher cannot block the registry
		}
	}
}

// healthCheckLoop marks instances unhealthy after 30s without a heartbeat
// and removes them after 60s
func (sr *ServiceRegistry) healthCheckLoop() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		sr.mu.Lock()

		for serviceName, instances := range sr.services {
			for serviceID, service := range instances {
				// Check if service is stale
				if time.Since(service.LastSeen) > 30*time.Second {
					if service.Health != HealthUnhealthy {
						service.Health = HealthUnhealthy
						log.Printf("Service %s/%s is unhealthy", serviceName, serviceID)

						// Notify watchers
						sr.notifyWatchers(serviceName, &ServiceEvent{
							Type:    "health_change",
							Service: service,
						})
					}

					// Remove if unhealthy for too long
					if time.Since(service.LastSeen) > 60*time.Second {
						delete(instances, serviceID)
						log.Printf("Removed stale service: %s/%s", serviceName, serviceID)

						sr.notifyWatchers(serviceName, &ServiceEvent{
							Type:    "deregister",
							Service: service,
						})
					}
				}
			}
		}

		sr.mu.Unlock()
	}
}

// HTTP API for service registry
type RegistryAPI struct {
	registry *ServiceRegistry
}

func NewRegistryAPI(registry *ServiceRegistry) *RegistryAPI {
	return &RegistryAPI{registry: registry}
}

func (api *RegistryAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.URL.Path {
	case "/register":
		api.handleRegister(w, r)
	case "/deregister":
		api.handleDeregister(w, r)
	case "/discover":
		api.handleDiscover(w, r)
	case "/heartbeat":
		api.handleHeartbeat(w, r)
	default:
		http.NotFound(w, r)
	}
}

func (api *RegistryAPI) handleRegister(w http.ResponseWriter, r *http.Request) {
	var service Service
	if err := json.NewDecoder(r.Body).Decode(&service); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	if err := api.registry.Register(&service); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Headers must be set before WriteHeader
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(service)
}

func (api *RegistryAPI) handleDeregister(w http.ResponseWriter, r *http.Request) {
	serviceName := r.URL.Query().Get("name")
	serviceID := r.URL.Query().Get("id")

	if err := api.registry.Deregister(serviceName, serviceID); err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	w.WriteHeader(http.StatusOK)
}

func (api *RegistryAPI) handleDiscover(w http.ResponseWriter, r *http.Request) {
	serviceName := r.URL.Query().Get("name")

	instances, err := api.registry.Discover(serviceName)
	if err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(instances)
}

func (api *RegistryAPI) handleHeartbeat(w http.ResponseWriter, r *http.Request) {
	serviceName := r.URL.Query().Get("name")
	serviceID := r.URL.Query().Get("id")

	if err := api.registry.Heartbeat(serviceName, serviceID); err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	w.WriteHeader(http.StatusOK)
}

// Example service that registers itself
type ExampleService struct {
	id       string
	name     string
	port     int
	registry *ServiceRegistry
	stopCh   chan struct{}
}

func NewExampleService(name string, port int, registry *ServiceRegistry) *ExampleService {
	return &ExampleService{
		id:       fmt.Sprintf("%s-%d", name, time.Now().UnixNano()),
		name:     name,
		port:     port,
		registry: registry,
		stopCh:   make(chan struct{}),
	}
}

func (s *ExampleService) Start() {
	// Register service
	service := &Service{
		ID:      s.id,
		Name:    s.name,
		Address: "localhost",
		Port:    s.port,
		Version: "1.0.0",
		Tags:    []string{"production"},
		Metadata: map[string]string{
			"region": "us-east-1",
		},
	}

	if err := s.registry.Register(service); err != nil {
		log.Printf("Service %s failed to register: %v", s.name, err)
		return
	}

	// Send heartbeats
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if err := s.registry.Heartbeat(s.name, s.id); err != nil {
					log.Printf("Heartbeat failed for %s/%s: %v", s.name, s.id, err)
				}
			case <-s.stopCh:
				return
			}
		}
	}()

	// Simple HTTP server setup
	// See also: [Web Development](/02-standard-library/02-web-development) for comprehensive HTTP server patterns
	// Each instance gets its own mux: registering /health on the shared
	// http.DefaultServeMux more than once would panic
	mux := http.NewServeMux()
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})

	log.Printf("Service %s starting on port %d", s.name, s.port)
	if err := http.ListenAndServe(fmt.Sprintf(":%d", s.port), mux); err != nil {
		log.Printf("Service %s stopped: %v", s.name, err)
	}
}

func (s *ExampleService) Stop() {
	close(s.stopCh)
	s.registry.Deregister(s.name, s.id)
}

func main() {
	registry := NewServiceRegistry()

	// Start registry API
	api := NewRegistryAPI(registry)
	go func() {
		log.Println("Service Registry API starting on :8500")
		if err := http.ListenAndServe(":8500", api); err != nil {
			log.Fatalf("Registry API failed: %v", err)
		}
	}()

	// Start multiple instances of a service
	for i := 0; i < 3; i++ {
		service := NewExampleService("api-service", 9000+i, registry)
		go service.Start()
	}

	// Wait for services to register
	time.Sleep(2 * time.Second)

	// Demonstrate service discovery
	instances, err := registry.Discover("api-service")
	if err != nil {
		log.Printf("Discovery failed: %v", err)
	}
	fmt.Printf("\nDiscovered %d instances of api-service:\n", len(instances))
	for _, instance := range instances {
		fmt.Printf("  - %s at %s:%d [%s]\n",
			instance.ID, instance.Address, instance.Port, instance.Health)
	}

	// Watch for service changes
	watch := registry.Watch("api-service")
	go func() {
		for event := range watch {
			fmt.Printf("Service event: %s - %s/%s\n", event.Type, event.Service.Name, event.Service.ID)
		}
	}()

	fmt.Println("\nService Registry running. Press Ctrl+C to exit.")
	fmt.Println("Test endpoints:")
	fmt.Println("  curl http://localhost:8500/discover?name=api-service")
	fmt.Println("  curl http://localhost:8500/heartbeat?name=api-service&id=<service-id>")

	select {}
}
Key Features:
- Service Registration: Register service instances with metadata
- Health Checks: Automatic health monitoring via heartbeats
- Service Discovery: Find all healthy instances by service name
- Load Balancing: Round-robin and random strategies
- Automatic Cleanup: Remove stale/unhealthy services
- Watch Mechanism: Subscribe to service changes in real-time
- HTTP API: RESTful API for registration and discovery
- Metadata Support: Version, tags, and custom metadata
- Concurrency Safe: Registry state is guarded by a sync.RWMutex
- Heartbeat-Based Liveness: Instances that stop sending heartbeats are marked unhealthy after 30 seconds and removed after 60 seconds
Exercise 4: Distributed Tracing and Observability
🎯 Learning Objectives:
- Implement comprehensive distributed tracing across service boundaries
- Understand trace context propagation and span relationships
- Create meaningful spans for database queries, external calls, and business operations
- Set up observability stack with Jaeger and OpenTelemetry
- Analyze performance bottlenecks and service dependencies
⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Intermediate
🌍 Real-World Context: A microservices application experiencing performance issues needs distributed tracing to identify bottlenecks, understand service dependencies, and debug requests across multiple services. Without proper tracing, debugging production issues becomes nearly impossible.
Task: Build a complete distributed tracing system that provides end-to-end visibility into request flows across microservices with meaningful instrumentation and analysis capabilities.
Core Tracing Components:
- Trace Propagation: Context propagation across HTTP and gRPC boundaries
- Span Creation: Automatic and manual span instrumentation
- OpenTelemetry Integration: Vendor-neutral tracing implementation
- Jaeger Backend: Trace collection, storage, and visualization
- Custom Attributes: Business-relevant metadata for debugging
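For the trace propagation component, one standard carrier is the W3C Trace Context `traceparent` header, which packs a version, a 32-hex-character trace ID, a 16-hex-character parent span ID, and a flags byte into one value. The sketch below handles version `00` only; the `TraceContext` type and function names are hypothetical, and the solution later in this exercise uses its own custom headers rather than this format.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// TraceContext holds the fields carried by a version-00 traceparent header.
type TraceContext struct {
	TraceID  string // 32 lowercase hex characters
	ParentID string // 16 lowercase hex characters
	Sampled  bool
}

// Header renders the context as "00-<trace-id>-<parent-id>-<flags>".
func (tc TraceContext) Header() string {
	flags := "00"
	if tc.Sampled {
		flags = "01"
	}
	return "00-" + tc.TraceID + "-" + tc.ParentID + "-" + flags
}

// isHex reports whether s is exactly n lowercase hex characters.
func isHex(s string, n int) bool {
	if len(s) != n || s != strings.ToLower(s) {
		return false
	}
	_, err := hex.DecodeString(s)
	return err == nil
}

// ParseTraceparent validates and decodes a version-00 traceparent value.
func ParseTraceparent(h string) (TraceContext, error) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return TraceContext{}, errors.New("unsupported traceparent format")
	}
	if !isHex(parts[1], 32) || !isHex(parts[2], 16) || !isHex(parts[3], 2) {
		return TraceContext{}, errors.New("malformed traceparent field")
	}
	if parts[1] == strings.Repeat("0", 32) || parts[2] == strings.Repeat("0", 16) {
		return TraceContext{}, errors.New("all-zero ID is invalid")
	}
	flags, _ := hex.DecodeString(parts[3]) // already validated above
	return TraceContext{
		TraceID:  parts[1],
		ParentID: parts[2],
		Sampled:  flags[0]&1 == 1,
	}, nil
}

func main() {
	tc, err := ParseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("trace=%s parent=%s sampled=%v\n", tc.TraceID, tc.ParentID, tc.Sampled)
	fmt.Println(tc.Header())
}
```

Using a standard header means services instrumented with different libraries can still join the same trace.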
Instrumentation Targets:
- HTTP Requests: Incoming and outgoing HTTP calls with full context
- Database Operations: Query execution, connection pool usage, slow queries
- External API Calls: Third-party service interactions with retry tracking
- Message Processing: Queue operations and async processing
- Business Operations: Order processing, payment flows, user interactions
Trace Analysis Features:
- Service dependency mapping and topology visualization
- Performance bottleneck identification and hot path analysis
- Error tracking and exception correlation across services
- Custom dashboards for business-critical operations
- Alerting on performance degradation and error rates
Advanced Patterns:
- Trace sampling strategies for high-traffic systems
- Baggage propagation for cross-service context
- Custom span events and structured logging integration
- Performance regression detection
- SLA monitoring and compliance tracking
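The sampling strategy in the list above can be illustrated with a ratio-based sampler that derives its decision from the trace ID, so every service reaches the same verdict for a given trace without coordination. This is a sketch under assumptions: `shouldSample` is a hypothetical helper, it expects hex trace IDs of at least 14 characters, and it is not the exact algorithm of any particular tracing library.

```go
package main

import (
	"fmt"
	"strconv"
)

// shouldSample keeps roughly `ratio` of all traces. It interprets the first
// 56 bits of the hex trace ID as a number and compares it to a threshold,
// so the decision is deterministic for a given trace ID.
func shouldSample(traceID string, ratio float64) bool {
	if ratio <= 0 {
		return false
	}
	if ratio >= 1 {
		return true
	}
	if len(traceID) < 14 {
		return false // too short to derive a decision; drop by default
	}
	v, err := strconv.ParseUint(traceID[:14], 16, 64)
	if err != nil {
		return false // not hex; drop by default
	}
	threshold := uint64(ratio * float64(uint64(1)<<56))
	return v < threshold
}

func main() {
	ids := []string{"00000000000000ff", "7fffffffffffffff", "80000000000000aa", "ffffffffffffffff"}
	for _, id := range ids {
		fmt.Printf("%s sampled at 50%%: %v\n", id, shouldSample(id, 0.5))
	}
}
```

Because the decision depends only on the trace ID, a trace is either recorded by every participating service or by none, which avoids partial traces.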
Solution
// run
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"sync"
	"time"
)

// Span represents a unit of work in a trace
type Span struct {
	TraceID    string                 `json:"trace_id"`
	SpanID     string                 `json:"span_id"`
	ParentID   string                 `json:"parent_id,omitempty"`
	Name       string                 `json:"name"`
	StartTime  time.Time              `json:"start_time"`
	EndTime    time.Time              `json:"end_time,omitempty"`
	Duration   time.Duration          `json:"duration,omitempty"`
	Attributes map[string]interface{} `json:"attributes"`
	Events     []SpanEvent            `json:"events"`
	Status     SpanStatus             `json:"status"`
}

type SpanEvent struct {
	Name       string                 `json:"name"`
	Timestamp  time.Time              `json:"timestamp"`
	Attributes map[string]interface{} `json:"attributes,omitempty"`
}

type SpanStatus struct {
	Code    string `json:"code"` // "OK", "ERROR"
	Message string `json:"message,omitempty"`
}

// ctxKey is a private type for context keys, which avoids collisions with
// keys set by other packages
type ctxKey string

const (
	traceIDKey      ctxKey = "trace_id"
	spanIDKey       ctxKey = "span_id"
	parentSpanIDKey ctxKey = "parent_span_id"
	spanKey         ctxKey = "span"
)

// Tracer manages span creation and context propagation
type Tracer struct {
	serviceName string
	mu          sync.Mutex // guards spans; handlers end spans concurrently
	spans       []*Span
}

func NewTracer(serviceName string) *Tracer {
	return &Tracer{
		serviceName: serviceName,
		spans:       []*Span{},
	}
}

// StartSpan creates a new span and returns it with a derived context
func (t *Tracer) StartSpan(ctx context.Context, name string) (*Span, context.Context) {
	span := &Span{
		TraceID:   getTraceIDFromContext(ctx),
		SpanID:    generateID(),
		ParentID:  getSpanIDFromContext(ctx),
		Name:      name,
		StartTime: time.Now(),
		Attributes: map[string]interface{}{
			"service.name": t.serviceName,
		},
		Events: []SpanEvent{},
		Status: SpanStatus{Code: "OK"},
	}

	// No trace in the context: this span starts a new trace
	if span.TraceID == "" {
		span.TraceID = generateID()
	}

	// Add span to context
	ctx = context.WithValue(ctx, traceIDKey, span.TraceID)
	ctx = context.WithValue(ctx, spanIDKey, span.SpanID)
	ctx = context.WithValue(ctx, spanKey, span)

	return span, ctx
}

// EndSpan finalizes a span
func (t *Tracer) EndSpan(span *Span) {
	span.EndTime = time.Now()
	span.Duration = span.EndTime.Sub(span.StartTime)

	t.mu.Lock()
	t.spans = append(t.spans, span)
	t.mu.Unlock()

	log.Printf("[TRACE] %s - %s (trace=%s span=%s duration=%v)",
		t.serviceName, span.Name, shortID(span.TraceID), shortID(span.SpanID), span.Duration)
}

// shortID truncates an ID for logging; incoming header IDs may be short
func shortID(id string) string {
	if len(id) > 8 {
		return id[:8]
	}
	return id
}

// SetSpanAttribute adds a custom attribute to a span
func SetSpanAttribute(span *Span, key string, value interface{}) {
	if span != nil {
		span.Attributes[key] = value
	}
}

// AddSpanEvent adds an event to a span
func AddSpanEvent(span *Span, name string, attributes map[string]interface{}) {
	if span != nil {
		span.Events = append(span.Events, SpanEvent{
			Name:       name,
			Timestamp:  time.Now(),
			Attributes: attributes,
		})
	}
}

// RecordError records an error in a span
func RecordError(span *Span, err error) {
	if span != nil && err != nil {
		span.Status.Code = "ERROR"
		span.Status.Message = err.Error()
		span.Attributes["error"] = true
		span.Attributes["error.message"] = err.Error()

		AddSpanEvent(span, "exception", map[string]interface{}{
			"exception.message": err.Error(),
			"exception.type":    "error",
		})
	}
}

// PropagateContext injects trace context into HTTP headers
func PropagateContext(ctx context.Context, header http.Header) {
	traceID := getTraceIDFromContext(ctx)
	spanID := getSpanIDFromContext(ctx)

	if traceID != "" {
		header.Set("X-Trace-ID", traceID)
	}
	if spanID != "" {
		header.Set("X-Parent-Span-ID", spanID)
	}
}

// ExtractContext extracts trace context from HTTP headers
func ExtractContext(ctx context.Context, header http.Header) context.Context {
	traceID := header.Get("X-Trace-ID")
	parentSpanID := header.Get("X-Parent-Span-ID")

	if traceID != "" {
		ctx = context.WithValue(ctx, traceIDKey, traceID)
	}
	if parentSpanID != "" {
		ctx = context.WithValue(ctx, parentSpanIDKey, parentSpanID)
	}

	return ctx
}

// Helper functions
func getTraceIDFromContext(ctx context.Context) string {
	if traceID, ok := ctx.Value(traceIDKey).(string); ok {
		return traceID
	}
	return ""
}

// getSpanIDFromContext prefers the local active span and falls back to the
// parent span ID received from an upstream service
func getSpanIDFromContext(ctx context.Context) string {
	if spanID, ok := ctx.Value(spanIDKey).(string); ok {
		return spanID
	}
	if parentSpanID, ok := ctx.Value(parentSpanIDKey).(string); ok {
		return parentSpanID
	}
	return ""
}

// generateID returns a random 16-character hex string
func generateID() string {
	const charset = "abcdef0123456789"
	b := make([]byte, 16)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}

// Instrumented HTTP Client
type InstrumentedClient struct {
	tracer *Tracer
	client *http.Client
}

func NewInstrumentedClient(tracer *Tracer) *InstrumentedClient {
	return &InstrumentedClient{
		tracer: tracer,
		client: &http.Client{Timeout: 10 * time.Second},
	}
}

// Get issues a traced GET request; the caller must close the response body
func (c *InstrumentedClient) Get(ctx context.Context, url string) (*http.Response, error) {
	span, ctx := c.tracer.StartSpan(ctx, "HTTP GET "+url)
	defer c.tracer.EndSpan(span)

	SetSpanAttribute(span, "http.method", "GET")
	SetSpanAttribute(span, "http.url", url)

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		RecordError(span, err)
		return nil, err
	}

	// Propagate trace context
	PropagateContext(ctx, req.Header)

	resp, err := c.client.Do(req)
	if err != nil {
		RecordError(span, err)
		return nil, err
	}

	SetSpanAttribute(span, "http.status_code", resp.StatusCode)

	if resp.StatusCode >= 400 {
		RecordError(span, fmt.Errorf("HTTP error: %d", resp.StatusCode))
	}

	return resp, nil
}

// Example Services
type ServiceA struct {
	tracer *Tracer
	client *InstrumentedClient
}

func NewServiceA() *ServiceA {
	tracer := NewTracer("service-a")
	return &ServiceA{
		tracer: tracer,
		client: NewInstrumentedClient(tracer),
	}
}

func (s *ServiceA) HandleRequest(w http.ResponseWriter, r *http.Request) {
	// Extract trace context from incoming request
	ctx := ExtractContext(r.Context(), r.Header)

	span, ctx := s.tracer.StartSpan(ctx, "ServiceA.HandleRequest")
	defer s.tracer.EndSpan(span)

	SetSpanAttribute(span, "http.method", r.Method)
	SetSpanAttribute(span, "http.path", r.URL.Path)
	SetSpanAttribute(span, "http.remote_addr", r.RemoteAddr)

	// Simulate database query
	dbSpan, _ := s.tracer.StartSpan(ctx, "Database Query")
	SetSpanAttribute(dbSpan, "db.system", "postgresql")
	SetSpanAttribute(dbSpan, "db.query", "SELECT * FROM users WHERE id = $1")

	time.Sleep(20 * time.Millisecond) // Simulate DB query

	AddSpanEvent(dbSpan, "query.executed", map[string]interface{}{
		"rows.affected": 1,
	})

	s.tracer.EndSpan(dbSpan)

	// Call Service B with the request span's context, so the outgoing call is
	// a sibling of the database span rather than a child of a finished span
	resp, err := s.client.Get(ctx, "http://localhost:8082/api")
	if err != nil {
		RecordError(span, err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		RecordError(span, err)
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}

	response := map[string]interface{}{
		"service":  "A",
		"trace_id": span.TraceID,
		"data":     string(body),
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

type ServiceB struct {
	tracer *Tracer
}

func NewServiceB() *ServiceB {
	return &ServiceB{
		tracer: NewTracer("service-b"),
	}
}

func (s *ServiceB) HandleRequest(w http.ResponseWriter, r *http.Request) {
	// Extract trace context
	ctx := ExtractContext(r.Context(), r.Header)

	span, ctx := s.tracer.StartSpan(ctx, "ServiceB.HandleRequest")
	defer s.tracer.EndSpan(span)

	SetSpanAttribute(span, "http.method", r.Method)
	SetSpanAttribute(span, "http.path", r.URL.Path)

	// Simulate processing
	processingSpan, _ := s.tracer.StartSpan(ctx, "ProcessData")
	SetSpanAttribute(processingSpan, "data.size", 1024)

	time.Sleep(30 * time.Millisecond) // Simulate processing

	// Simulate cache lookup
	AddSpanEvent(processingSpan, "cache.lookup", map[string]interface{}{
		"cache.hit": true,
		"cache.key": "user:123",
	})

	s.tracer.EndSpan(processingSpan)

	response := map[string]interface{}{
		"service":  "B",
		"trace_id": span.TraceID,
		"message":  "Processed by Service B",
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

func main() {
	serviceA := NewServiceA()
	serviceB := NewServiceB()

	// Service A on port 8081
	go func() {
		http.HandleFunc("/api", serviceA.HandleRequest)
		log.Println("Service A starting on :8081")
		if err := http.ListenAndServe(":8081", nil); err != nil {
			log.Fatalf("Service A failed: %v", err)
		}
	}()

	// Service B on port 8082
	go func() {
		mux := http.NewServeMux()
		mux.HandleFunc("/api", serviceB.HandleRequest)
		log.Println("Service B starting on :8082")
		if err := http.ListenAndServe(":8082", mux); err != nil {
			log.Fatalf("Service B failed: %v", err)
		}
	}()

	// Wait for services to start
	time.Sleep(1 * time.Second)

	fmt.Println("\nDistributed Tracing Demo")
	fmt.Println("========================")
	fmt.Println("\nTest the traced request:")
	fmt.Println("  curl http://localhost:8081/api")
	fmt.Println("\nTrace propagation:")
	fmt.Println("  Service A -> Database -> Service B")
	fmt.Println("\nWatch the console for trace logs showing:")
	fmt.Println("  - Trace IDs propagating across services")
	fmt.Println("  - Span hierarchy")
	fmt.Println("  - Operation durations")
	fmt.Println("  - Custom attributes and events")

	select {}
}
Key Tracing Concepts:
- Trace Context Propagation: The trace ID and the caller's span ID travel in the X-Trace-ID and X-Parent-Span-ID HTTP headers
- Span Hierarchy: Parent-child relationships between operations
- Custom Attributes: Add metadata to spans for debugging
- Events: Record significant moments within a span
- Error Tracking: Capture and propagate errors through traces
- Performance Metrics: Automatic duration tracking
- Service Topology: Understand service dependencies
- Request Flow: Track requests across multiple services
- Instrumentation: Wrap HTTP clients and database calls
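- Lightweight: Spans are plain structs that are logged when they end; a production system would export them to a backend such as Jaeger through OpenTelemetry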
Trace Visualization:
Request Flow:
ServiceA.HandleRequest
├── Database Query
│   └── query.executed event
└── HTTP GET http://localhost:8082/api
    └── ServiceB.HandleRequest
        └── ProcessData
            └── cache.lookup event
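A hierarchy like the one above can be rebuilt from a flat list of finished spans by grouping them on their parent IDs. The following is a rough sketch: `SpanRecord` and `RenderTree` are hypothetical names, span IDs are assumed to be non-empty and free of cycles, and spans whose parent is missing from the input are simply not printed.

```go
package main

import (
	"fmt"
	"strings"
)

// SpanRecord is a hypothetical, trimmed-down span used only for rendering.
type SpanRecord struct {
	SpanID, ParentID, Name string
}

// RenderTree prints spans as an indented tree. Root spans have an empty
// ParentID; children keep the order in which they appear in the input.
func RenderTree(spans []SpanRecord) string {
	children := make(map[string][]SpanRecord)
	for _, s := range spans {
		children[s.ParentID] = append(children[s.ParentID], s)
	}

	var b strings.Builder
	var walk func(parentID string, depth int)
	walk = func(parentID string, depth int) {
		for _, s := range children[parentID] {
			b.WriteString(strings.Repeat("  ", depth) + s.Name + "\n")
			walk(s.SpanID, depth+1)
		}
	}
	walk("", 0)
	return b.String()
}

func main() {
	fmt.Print(RenderTree([]SpanRecord{
		{SpanID: "a", Name: "ServiceA.HandleRequest"},
		{SpanID: "b", ParentID: "a", Name: "Database Query"},
		{SpanID: "c", ParentID: "a", Name: "HTTP GET http://localhost:8082/api"},
		{SpanID: "d", ParentID: "c", Name: "ServiceB.HandleRequest"},
		{SpanID: "e", ParentID: "d", Name: "ProcessData"},
	}))
}
```

Tracing backends perform this kind of grouping across services, which is why every span must carry both its trace ID and its parent span ID.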
Exercise 5: Resilience Patterns and Circuit Breakers
🎯 Learning Objectives:
- Implement circuit breaker pattern for fault tolerance
- Design fallback mechanisms for graceful degradation
- Understand failure detection and recovery patterns
- Build retry logic with exponential backoff
- Create comprehensive resilience monitoring and alerting
⏱️ Time Estimate: 75-90 minutes
📊 Difficulty: Intermediate
🌍 Real-World Context: A microservices application experiencing cascading failures needs circuit breakers to prevent single service failures from bringing down the entire system. Proper resilience patterns ensure system stability during partial outages and performance degradation.
Task: Implement a comprehensive resilience patterns library that protects microservices from cascading failures and provides graceful degradation when dependencies are unavailable.
Core Resilience Patterns:
- Circuit Breaker: Prevent calls to failing services with automatic recovery
- Retry Mechanism: Configurable retry policies with exponential backoff
- Fallback Handlers: Alternative responses when primary service fails
- Bulkhead Isolation: Resource isolation to prevent resource exhaustion
- Timeout Management: Request timeout handling and cancellation
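The retry mechanism listed above could be sketched as follows. All names here (`backoffDelay`, `Retry`, `Sleeper`, `NoSleep`, `ErrTransient`) are hypothetical; the delay doubles per attempt up to a cap, and the sleep function is injectable so the logic can be exercised without waiting. A production version would typically also add random jitter and honor context cancellation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTransient is a placeholder error for the demo below.
var ErrTransient = errors.New("transient failure")

// backoffDelay returns base * 2^attempt, capped at max. attempt starts at 0.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// Sleeper lets callers swap out time.Sleep, for example in tests.
type Sleeper func(time.Duration)

// NoSleep skips waiting entirely.
var NoSleep Sleeper = func(time.Duration) {}

// Retry calls fn up to `attempts` times, sleeping with exponential backoff
// between failures. It returns nil on the first success.
func Retry(attempts int, base, max time.Duration, sleep Sleeper, fn func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			sleep(backoffDelay(i, base, max))
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	err := Retry(5, 50*time.Millisecond, time.Second, time.Sleep, func() error {
		calls++
		if calls < 3 {
			return ErrTransient
		}
		return nil
	})
	fmt.Printf("calls=%d err=%v\n", calls, err)
}
```

Capping the delay keeps a long outage from pushing waits to unreasonable lengths, and wrapping the last error with `%w` lets callers still inspect it with `errors.Is`.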
Circuit Breaker States:
- Closed: Normal operation with failure tracking
- Open: Rejecting all requests with immediate fallback
- Half-Open: Limited requests to test service recovery
- Automatic Transitions: State changes based on success/failure rates
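The state transitions described above fit in a small pure function. This is an illustrative sketch with hypothetical `State` and `Event` names; it captures only which transitions are legal, not the counters and timers that decide when each event fires.

```go
package main

import "fmt"

type State string

const (
	Closed   State = "CLOSED"
	Open     State = "OPEN"
	HalfOpen State = "HALF_OPEN"
)

type Event string

const (
	FailureThresholdReached Event = "failure_threshold_reached"
	ResetTimeoutElapsed     Event = "reset_timeout_elapsed"
	ProbeSucceeded          Event = "probe_succeeded" // enough trial requests succeeded
	ProbeFailed             Event = "probe_failed"
)

// Next returns the state that follows s when event e occurs.
// Events that do not apply to the current state leave it unchanged.
func Next(s State, e Event) State {
	switch {
	case s == Closed && e == FailureThresholdReached:
		return Open
	case s == Open && e == ResetTimeoutElapsed:
		return HalfOpen
	case s == HalfOpen && e == ProbeSucceeded:
		return Closed
	case s == HalfOpen && e == ProbeFailed:
		return Open
	}
	return s
}

func main() {
	s := Closed
	for _, e := range []Event{FailureThresholdReached, ResetTimeoutElapsed, ProbeFailed, ResetTimeoutElapsed, ProbeSucceeded} {
		next := Next(s, e)
		fmt.Printf("%s --%s--> %s\n", s, e, next)
		s = next
	}
}
```

Keeping the transition table separate from the bookkeeping makes it easy to test every path, including the half-open failure path that reopens the circuit.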
Monitoring and Observability:
- Real-time circuit breaker state monitoring
- Success/failure rate metrics and trends
- Response time distributions and percentiles
- Fallback usage tracking and effectiveness
- Alert thresholds for performance degradation
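For the response time percentiles mentioned above, one simple option is the nearest-rank method over a window of recorded latencies. The sketch below assumes latencies in milliseconds held in memory; `Percentile` is a hypothetical helper, and systems with high request volume usually rely on histograms or streaming estimators instead of sorting raw samples.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// Percentile returns the p-th percentile (0 < p <= 100) of samples using
// the nearest-rank method: the value at position ceil(p/100 * n) in sorted
// order. It returns 0 for an empty input and does not modify samples.
func Percentile(samples []float64, p float64) float64 {
	if len(samples) == 0 {
		return 0
	}
	sorted := append([]float64(nil), samples...)
	sort.Float64s(sorted)

	rank := int(math.Ceil(p / 100 * float64(len(sorted))))
	if rank < 1 {
		rank = 1
	}
	if rank > len(sorted) {
		rank = len(sorted)
	}
	return sorted[rank-1]
}

func main() {
	latenciesMs := []float64{12, 15, 11, 240, 14, 13, 18, 16, 17, 19}
	for _, p := range []float64{50, 95, 99} {
		fmt.Printf("p%.0f = %.0f ms\n", p, Percentile(latenciesMs, p))
	}
}
```

Tail percentiles such as p95 and p99 expose slow outliers that an average would hide, which is why they are common alerting signals.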
Advanced Features:
- Circuit breaker configuration per service endpoint
- Dynamic threshold adjustment based on traffic patterns
- Integration with service discovery for automatic setup
- Circuit breaker dashboards and operational insights
- Load shedding and admission control during high load
- Distributed circuit breaker coordination
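Load shedding and bulkhead isolation can both be approximated with a bounded concurrency limiter: once a fixed number of calls are in flight, further calls are rejected immediately instead of queueing. This is a minimal sketch with hypothetical names (`Limiter`, `ErrOverloaded`); it uses a buffered channel as a counting semaphore.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrOverloaded is returned when no capacity is available.
var ErrOverloaded = errors.New("overloaded: request shed")

// Limiter admits at most cap(l) concurrent calls.
type Limiter chan struct{}

func NewLimiter(maxInFlight int) Limiter {
	return make(Limiter, maxInFlight)
}

// Do runs fn if a slot is free and releases the slot afterwards.
// If all slots are taken it returns ErrOverloaded without running fn.
func (l Limiter) Do(fn func() error) error {
	select {
	case l <- struct{}{}:
		defer func() { <-l }()
		return fn()
	default:
		return ErrOverloaded
	}
}

func main() {
	lim := NewLimiter(1)

	// The nested call runs while the outer call still holds the only slot,
	// so it is shed.
	err := lim.Do(func() error {
		inner := lim.Do(func() error { return nil })
		fmt.Println("inner call:", inner)
		return nil
	})
	fmt.Println("outer call:", err)
}
```

Giving each downstream dependency its own limiter keeps one slow dependency from consuming every goroutine or connection in the caller.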
Solution
// run
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

// CircuitState represents the state of the circuit breaker
type CircuitState string

const (
	StateClosed   CircuitState = "CLOSED"    // Normal operation
	StateOpen     CircuitState = "OPEN"      // Failing, reject requests
	StateHalfOpen CircuitState = "HALF_OPEN" // Testing if service recovered
)

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
	name           string
	maxFailures    int
	resetTimeout   time.Duration
	halfOpenMaxReq int

	mu               sync.RWMutex
	state            CircuitState
	failures         int
	successes        int
	lastFailureTime  time.Time
	lastStateChange  time.Time
	halfOpenRequests int

	// Metrics
	totalRequests  int64
	totalSuccesses int64
	totalFailures  int64
	totalRejected  int64
}

// Config for circuit breaker
type CircuitBreakerConfig struct {
	Name           string
	MaxFailures    int           // Number of failures before opening
	ResetTimeout   time.Duration // Time before attempting half-open
	HalfOpenMaxReq int           // Max requests allowed in half-open state
}

func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker {
	if config.MaxFailures == 0 {
		config.MaxFailures = 5
	}
	if config.ResetTimeout == 0 {
		config.ResetTimeout = 60 * time.Second
	}
	if config.HalfOpenMaxReq == 0 {
		config.HalfOpenMaxReq = 3
	}

	return &CircuitBreaker{
		name:            config.Name,
		maxFailures:     config.MaxFailures,
		resetTimeout:    config.ResetTimeout,
		halfOpenMaxReq:  config.HalfOpenMaxReq,
		state:           StateClosed,
		lastStateChange: time.Now(),
	}
}

// Execute runs the function with circuit breaker protection
func (cb *CircuitBreaker) Execute(fn func() error) error {
	cb.mu.Lock()

	// Check if we should attempt recovery
	if cb.state == StateOpen {
		if time.Since(cb.lastFailureTime) > cb.resetTimeout {
			cb.setState(StateHalfOpen)
			log.Printf("[%s] Circuit breaker entering HALF-OPEN state", cb.name)
		}
	}

	// Check state
	switch cb.state {
	case StateOpen:
		cb.totalRejected++
		cb.mu.Unlock()
		return ErrCircuitOpen

	case StateHalfOpen:
		if cb.halfOpenRequests >= cb.halfOpenMaxReq {
			cb.totalRejected++
			cb.mu.Unlock()
			return ErrTooManyRequests
		}
		cb.halfOpenRequests++
	}

	cb.totalRequests++
	cb.mu.Unlock()

	// Execute the function without holding the lock
	err := fn()

	// Record result
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err != nil {
		cb.onFailure()
		return err
	}

	cb.onSuccess()
	return nil
}

// onSuccess must be called with cb.mu held
func (cb *CircuitBreaker) onSuccess() {
	cb.totalSuccesses++
	cb.failures = 0

	switch cb.state {
	case StateHalfOpen:
		cb.successes++
		if cb.successes >= cb.halfOpenMaxReq {
			cb.setState(StateClosed)
			log.Printf("[%s] Circuit breaker CLOSED after successful recovery", cb.name)
		}
	}
}

// onFailure must be called with cb.mu held
func (cb *CircuitBreaker) onFailure() {
	cb.totalFailures++
	cb.failures++
	cb.lastFailureTime = time.Now()

	switch cb.state {
	case StateClosed:
		if cb.failures >= cb.maxFailures {
			cb.setState(StateOpen)
			log.Printf("[%s] Circuit breaker OPENED after %d failures", cb.name, cb.failures)
		}

	case StateHalfOpen:
		cb.setState(StateOpen)
		log.Printf("[%s] Circuit breaker OPENED during recovery attempt", cb.name)
	}
}

// setState must be called with cb.mu held
func (cb *CircuitBreaker) setState(state CircuitState) {
	cb.state = state
	cb.lastStateChange = time.Now()

	if state == StateHalfOpen {
		cb.halfOpenRequests = 0
		cb.successes = 0
	}
}

// GetState returns the current state
func (cb *CircuitBreaker) GetState() CircuitState {
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	return cb.state
}

// GetMetrics returns circuit breaker metrics
func (cb *CircuitBreaker) GetMetrics() CircuitBreakerMetrics {
	cb.mu.RLock()
	defer cb.mu.RUnlock()

	var successRate float64
	if cb.totalRequests > 0 {
		successRate = float64(cb.totalSuccesses) / float64(cb.totalRequests) * 100
	}

	return CircuitBreakerMetrics{
		Name:            cb.name,
		State:           cb.state,
		TotalRequests:   cb.totalRequests,
		TotalSuccesses:  cb.totalSuccesses,
		TotalFailures:   cb.totalFailures,
		TotalRejected:   cb.totalRejected,
		SuccessRate:     successRate,
		LastStateChange: cb.lastStateChange,
	}
}

type CircuitBreakerMetrics struct {
	Name            string       `json:"name"`
	State           CircuitState `json:"state"`
	TotalRequests   int64        `json:"total_requests"`
	TotalSuccesses  int64        `json:"total_successes"`
	TotalFailures   int64        `json:"total_failures"`
	TotalRejected   int64        `json:"total_rejected"`
	SuccessRate     float64      `json:"success_rate"`
	LastStateChange time.Time    `json:"last_state_change"`
}

var (
	ErrCircuitOpen        = errors.New("circuit breaker is open")
	ErrTooManyRequests    = errors.New("too many requests in half-open state")
	ErrServiceUnavailable = errors.New("service unavailable")
)

// Example HTTP client with circuit breaker
type ResilientHTTPClient struct {
	breaker *CircuitBreaker
	client  *http.Client
	baseURL string
}

func NewResilientHTTPClient(name string, baseURL string) *ResilientHTTPClient {
	return &ResilientHTTPClient{
		breaker: NewCircuitBreaker(CircuitBreakerConfig{
			Name:           name,
			MaxFailures:    3,
			ResetTimeout:   10 * time.Second,
			HalfOpenMaxReq: 2,
		}),
		client:  &http.Client{Timeout: 5 * time.Second},
		baseURL: baseURL,
	}
}

// Get performs a GET through the circuit breaker and falls back to a
// canned response when the breaker rejects the call
func (c *ResilientHTTPClient) Get(ctx context.Context, path string) (string, error) {
	var response string

	err := c.breaker.Execute(func() error {
		req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+path, nil)
		if err != nil {
			return err
		}

		resp, err := c.client.Do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()

		// Only server errors count as failures for the breaker
		if resp.StatusCode >= 500 {
			return fmt.Errorf("server error: %d", resp.StatusCode)
		}

		response = fmt.Sprintf("Status: %d", resp.StatusCode)
		return nil
	})

	if err != nil {
		// Fallback mechanism
		if errors.Is(err, ErrCircuitOpen) || errors.Is(err, ErrTooManyRequests) {
			log.Printf("Using fallback response due to: %v", err)
			return c.fallback(), nil
		}
		return "", err
	}

	return response, nil
}

func (c *ResilientHTTPClient) fallback() string {
	return "Fallback response: Service temporarily unavailable"
}

func (c *ResilientHTTPClient) GetMetrics() CircuitBreakerMetrics {
	return c.breaker.GetMetrics()
}

// Test service that can be toggled between healthy and unhealthy
type TestService struct {
	mu        sync.Mutex
	healthy   bool
	failCount int // requests counted since the last reset; guarded by mu
}

func NewTestService() *TestService {
	return &TestService{healthy: true}
}

func (s *TestService) SetHealthy(healthy bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.healthy = healthy
	if healthy {
		s.failCount = 0
	}
}

func (s *TestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Read and update all shared state under the lock to avoid a data race.
	s.mu.Lock()
	healthy := s.healthy
	s.failCount++
	// Auto-recover once more than 5 requests have been counted. The request
	// that triggers recovery still receives the error response.
	recovered := !healthy && s.failCount > 5
	if recovered {
		s.healthy = true
		s.failCount = 0
	}
	s.mu.Unlock()

	if recovered {
		log.Println("[TestService] Auto-recovering")
	}

	if !healthy {
		log.Println("[TestService] Returning error response")
		http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
		return
	}

	log.Println("[TestService] Returning success response")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("OK"))
}

func main() {
	// Start test service
	testService := NewTestService()
	go func() {
		http.Handle("/api", testService)
		log.Println("Test service starting on :9000")
		if err := http.ListenAndServe(":9000", nil); err != nil {
			log.Fatalf("test service stopped: %v", err)
		}
	}()

	// Give the listener a moment to start before sending requests.
	time.Sleep(1 * time.Second)

	// Create resilient client
	client := NewResilientHTTPClient("test-service", "http://localhost:9000")

	// Simulate requests
	fmt.Println("\n=== Circuit Breaker Demo ===")
	fmt.Println()

	// Phase 1: Healthy service
	fmt.Println("Phase 1: Service is healthy")
	for i := 0; i < 3; i++ {
		resp, err := client.Get(context.Background(), "/api")
		fmt.Printf("Request %d: resp=%q err=%v\n", i+1, resp, err)
		time.Sleep(500 * time.Millisecond)
	}
	fmt.Printf("Circuit state: %s\n\n", client.breaker.GetState())

	// Phase 2: Make service unhealthy
	fmt.Println("Phase 2: Service becomes unhealthy")
	testService.SetHealthy(false)

	for i := 0; i < 5; i++ {
		resp, err := client.Get(context.Background(), "/api")
		fmt.Printf("Request %d: resp=%q err=%v\n", i+1, resp, err)
		fmt.Printf(" Circuit state: %s\n", client.breaker.GetState())
		time.Sleep(500 * time.Millisecond)
	}

	// Phase 3: Wait for half-open (just past the 10s ResetTimeout)
	fmt.Println("\nPhase 3: Waiting for circuit to enter HALF-OPEN...")
	time.Sleep(11 * time.Second)

	for i := 0; i < 3; i++ {
		resp, err := client.Get(context.Background(), "/api")
		fmt.Printf("Request %d: resp=%q err=%v\n", i+1, resp, err)
		fmt.Printf(" Circuit state: %s\n", client.breaker.GetState())
		time.Sleep(1 * time.Second)
	}

	// Print final metrics
	metrics := client.GetMetrics()
	fmt.Println("\n=== Final Metrics ===")
	fmt.Printf("State: %s\n", metrics.State)
	fmt.Printf("Total Requests: %d\n", metrics.TotalRequests)
	fmt.Printf("Total Successes: %d\n", metrics.TotalSuccesses)
	fmt.Printf("Total Failures: %d\n", metrics.TotalFailures)
	fmt.Printf("Total Rejected: %d\n", metrics.TotalRejected)
	fmt.Printf("Success Rate: %.2f%%\n", metrics.SuccessRate)

	fmt.Println("\nCircuit Breaker States:")
	fmt.Println(" CLOSED: Normal operation, all requests go through")
	fmt.Println(" OPEN: Too many failures, reject all requests")
	fmt.Println(" HALF-OPEN: Testing recovery, allow limited requests")
}
Key Circuit Breaker Concepts:
- Three States: Closed, Open, Half-Open
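- Failure Threshold: Open the circuit after N consecutive failures (MaxFailures, 3 in the demo)
- Reset Timeout: Time to stay open before attempting recovery (ResetTimeout, 10 seconds in the demo)
- Half-Open Testing: Allow a limited number of probe requests to test service recovery (HalfOpenMaxReq, 2 in the demo)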
- Automatic Transitions: State changes based on success/failure patterns
- Fallback Mechanism: Provide degraded functionality when circuit is open
- Metrics Tracking: Monitor request counts, success rates, state changes
- Thread-Safe: Concurrent request handling with proper locking
- Configurable: Adjust thresholds and timeouts per service
- Production Ready: Fails fast so one unhealthy dependency does not cause cascading failures across services
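The fallback concept in the list above can be isolated from the HTTP client. The following is a minimal sketch, not the breaker implementation itself: withFallback, errCircuitOpen, and errUpstream are hypothetical names introduced for illustration, with errCircuitOpen standing in for ErrCircuitOpen. It shows the fallback being used only when the breaker rejected the call, and why errors.Is is a safer check than == once an error has been wrapped with context.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels for this sketch; errCircuitOpen mirrors ErrCircuitOpen.
var (
	errCircuitOpen = errors.New("circuit breaker is open")
	errUpstream    = errors.New("upstream returned 500")
)

// withFallback runs call and substitutes fallback only when the breaker
// rejected the request; every other error is passed through unchanged.
func withFallback(call func() (string, error), fallback string) (string, error) {
	resp, err := call()
	if errors.Is(err, errCircuitOpen) {
		return fallback, nil
	}
	return resp, err
}

func main() {
	// The rejection is wrapped with context, so a plain == comparison would miss it.
	rejected := func() (string, error) {
		return "", fmt.Errorf("GET /api: %w", errCircuitOpen)
	}
	failed := func() (string, error) { return "", errUpstream }

	resp, err := withFallback(rejected, "cached value")
	fmt.Printf("%q %v\n", resp, err) // "cached value" <nil>

	resp, err = withFallback(failed, "cached value")
	fmt.Printf("%q %v\n", resp, err) // "" upstream returned 500
}
```

Passing real upstream errors through, rather than masking them with the fallback, keeps genuine failures visible to callers and to the breaker's failure count.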
State Transitions:
CLOSED --[failures >= threshold]--> OPEN
OPEN --[timeout elapsed]--> HALF-OPEN
HALF-OPEN --[success]--> CLOSED
HALF-OPEN --[failure]--> OPEN
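The four transitions above can be written as a pure function, which makes them easy to test apart from locking and timers. This is a sketch under assumptions: State, Event, and next are hypothetical names, and the real breaker tracks failure counts and timeouts internally rather than receiving them as arguments.

```go
package main

import "fmt"

// State is a hypothetical stand-in for the CircuitState type used earlier.
type State int

const (
	Closed State = iota
	Open
	HalfOpen
)

func (s State) String() string {
	switch s {
	case Closed:
		return "CLOSED"
	case Open:
		return "OPEN"
	case HalfOpen:
		return "HALF-OPEN"
	}
	return "UNKNOWN"
}

// Event is what the breaker just observed.
type Event int

const (
	Success Event = iota
	Failure
	TimeoutElapsed
)

// next returns the state that follows ev. failures is the consecutive failure
// count including the current event; threshold is the failure threshold.
func next(s State, ev Event, failures, threshold int) State {
	switch {
	case s == Closed && ev == Failure && failures >= threshold:
		return Open
	case s == Open && ev == TimeoutElapsed:
		return HalfOpen
	case s == HalfOpen && ev == Success:
		return Closed
	case s == HalfOpen && ev == Failure:
		return Open
	}
	return s // every other combination leaves the state unchanged
}

func main() {
	s := Closed
	s = next(s, Failure, 3, 3)
	fmt.Println(s) // OPEN
	s = next(s, TimeoutElapsed, 0, 3)
	fmt.Println(s) // HALF-OPEN
	s = next(s, Success, 0, 3)
	fmt.Println(s) // CLOSED
}
```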
Further Reading
- Microservices Patterns
- Building Microservices
- gRPC Documentation
- Consul Service Discovery
- OpenTelemetry Go
- Circuit Breaker Pattern
- Saga Pattern
Summary
Key Takeaways
- Service Design
  - Single responsibility per service
  - Decentralized data management
  - Independent deployment
  - API-first design
- Communication
  - Use gRPC for internal services
  - REST for external APIs
  - Async messaging for events
  - Implement retries and circuit breakers
- Resilience
  - Circuit breakers prevent cascading failures
  - Retries handle transient errors
  - Health checks ensure service readiness
  - Graceful degradation
- Observability
  - Distributed tracing across services
  - Centralized logging
  - Metrics and monitoring
  - Service mesh for traffic management
- Production
  - Service discovery for dynamic routing
  - API gateway for single entry point
  - Saga pattern for distributed transactions
  - Configuration management
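The takeaways pair circuit breakers with retries for transient errors. As a rough illustration of the retry half, here is a minimal sketch of retrying with exponential backoff. The names retry, flakyDemo, and errTransient are hypothetical, and the attempt count and delays are arbitrary values chosen for the demo. In practice a retry loop like this would usually wrap a call that already goes through the breaker, so retries stop once the circuit opens.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error standing in for a temporary failure.
var errTransient = errors.New("transient failure")

// retry calls op up to attempts times, doubling the delay after each failure.
// It stops early if ctx is cancelled while waiting.
func retry(ctx context.Context, attempts int, base time.Duration, op func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-time.After(delay):
			delay *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("after %d attempts: %w", attempts, err)
}

// flakyDemo runs retry against an operation that fails twice, then succeeds.
func flakyDemo() (calls int, err error) {
	err = retry(context.Background(), 4, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := flakyDemo()
	fmt.Println(calls, err) // 3 <nil>
}
```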