Microservices with Go

📖 Prerequisite: Web Development - Understanding HTTP servers and routing is essential for building microservices

Consider a large restaurant where instead of one massive kitchen trying to cook everything, you have specialized stations: a grill station, a salad station, a dessert station, and a beverage station. Each station operates independently, has its own staff, and focuses on doing one thing exceptionally well. This is the essence of microservices architecture - breaking down a monolithic application into small, specialized services that work together.

Microservices architecture decomposes applications into small, independent services that communicate over networks. This guide covers building production-ready microservices in Go, including service discovery, communication patterns, distributed transactions, and deployment strategies.

💡 Key Takeaway: Microservices trade the simplicity of a single deployment for the flexibility of independent, scalable services that can be developed, deployed, and maintained separately.

Microservices Architecture Principles

Think of microservices like building a city with specialized districts rather than one massive building. Each district has a clear purpose, its own resources, and connects to others through well-defined roads.

These principles guide where to draw service boundaries and help you judge whether the architecture is worth its cost.

When to Use Microservices:

  • ✅ Large teams that need to deploy independently
  • ✅ Different parts of the system have vastly different scaling requirements
  • ✅ You need to use different technologies for different problems
  • ✅ System complexity justifies the overhead of a distributed system

When NOT to Use Microservices:

  • ❌ Small team: a monolith is faster to build
  • ❌ Startup/MVP: distributed systems add enormous complexity
  • ❌ Simple CRUD application: microservices are overkill
  • ❌ No in-house expertise in distributed systems, monitoring, or orchestration

⚠️ Important: The microservices approach introduces significant operational complexity. Start with a monolith and break it into services only when you have clear pain points that microservices can solve.

Critical Trade-offs:

  • Monolith: Simple deployment, easy debugging, but scales as one unit
  • Microservices: Independent scaling and deployment, but complex operations, network latency, data consistency challenges

Real-world Example: Netflix evolved from a monolithic DVD rental system to thousands of microservices handling recommendations, billing, video streaming, and user profiles independently. This allowed them to scale different parts of their system based on specific needs.

Core Principles

package main

import (
	"fmt"
)

func main() {
	fmt.Println("Microservices Architecture Principles")
	fmt.Println("====================================")
	fmt.Println()

	// Each principle pairs a short definition with a concrete example.
	principles := []struct {
		name        string
		description string
		example     string
	}{
		{
			"Single Responsibility",
			"Each service handles one business capability",
			"UserService manages only user-related operations",
		},
		{
			"Decentralized Data",
			"Each service owns its database",
			"OrderService has its own order database",
		},
		{
			"Independent Deployment",
			"Services deploy independently",
			"Update ProductService without touching OrderService",
		},
		{
			"Fault Isolation",
			"Failure in one service doesn't crash others",
			"Payment failure doesn't prevent browsing",
		},
		{
			"Technology Diversity",
			"Services can use different tech stacks",
			"UserService in Go, RecommendationService in Python",
		},
		{
			"API-First Design",
			"Services communicate via well-defined APIs",
			"REST, gRPC, or message queues",
		},
		{
			"Stateless Services",
			"Services don't maintain session state",
			"All state in databases or caches",
		},
		{
			"Observable",
			"Services expose health, metrics, logs",
			"Prometheus metrics, structured logging",
		},
	}

	for i, p := range principles {
		fmt.Printf("%d. %s\n", i+1, p.name)
		fmt.Printf("   Definition: %s\n", p.description)
		fmt.Printf("   Example:    %s\n\n", p.example)
	}

	fmt.Println("Benefits:")
	fmt.Println("- Scalability: Scale services independently")
	fmt.Println("- Resilience: Isolated failures")
	fmt.Println("- Flexibility: Choose best tech per service")
	fmt.Println("- Team autonomy: Teams own services end-to-end")
	fmt.Println()
	fmt.Println("Challenges:")
	fmt.Println("- Distributed complexity")
	fmt.Println("- Data consistency")
	fmt.Println("- Testing difficulty")
	fmt.Println("- Operational overhead")
}
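
The "Observable" principle is the easiest one to show concretely. The following is a minimal sketch of a health endpoint using only the standard library. The `/health` path, the `healthStatus` response shape, and the `probe` helper are illustrative assumptions, not a standard that any particular monitoring tool requires.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// healthStatus is a hypothetical response shape; adapt it to your monitoring stack.
type healthStatus struct {
	Status  string `json:"status"`
	Service string `json:"service"`
}

// healthHandler returns a handler that reports the named service as healthy.
func healthHandler(service string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		json.NewEncoder(w).Encode(healthStatus{Status: "ok", Service: service})
	}
}

// probe calls the handler in-process and returns the status code and decoded body.
func probe(service string) (int, healthStatus, error) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/health", nil)
	healthHandler(service)(rec, req)

	var body healthStatus
	err := json.NewDecoder(rec.Body).Decode(&body)
	return rec.Code, body, err
}

func main() {
	code, body, err := probe("user-service")
	fmt.Println(code, body.Status, body.Service, err)
}
```

In a real service the handler would be mounted with `http.Handle("/health", healthHandler("user-service"))` and would usually also check dependencies such as the database connection before reporting "ok".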

Service Boundaries

package main

import (
	"fmt"
)

// Example: E-commerce microservices

// Service describes one microservice: what it owns, what it exposes,
// and which other services it calls.
type Service struct {
	Name           string
	Responsibility string
	Database       string
	APIs           []string
	Dependencies   []string
}

func main() {
	services := []Service{
		{
			Name:           "UserService",
			Responsibility: "User authentication and profile management",
			Database:       "users_db",
			APIs:           []string{"POST /users", "GET /users/:id", "PUT /users/:id"},
			Dependencies:   []string{},
		},
		{
			Name:           "ProductService",
			Responsibility: "Product catalog and inventory",
			Database:       "products_db",
			APIs:           []string{"GET /products", "POST /products", "PUT /products/:id/stock"},
			Dependencies:   []string{},
		},
		{
			Name:           "OrderService",
			Responsibility: "Order processing and management",
			Database:       "orders_db",
			APIs:           []string{"POST /orders", "GET /orders/:id", "GET /users/:id/orders"},
			Dependencies:   []string{"UserService", "ProductService", "PaymentService"},
		},
		{
			Name:           "PaymentService",
			Responsibility: "Payment processing",
			Database:       "payments_db",
			APIs:           []string{"POST /payments", "GET /payments/:id"},
			// OrderService calls PaymentService, not the other way around.
			// Keeping this list empty avoids a circular dependency.
			Dependencies: []string{},
		},
		{
			Name:           "NotificationService",
			Responsibility: "Email and SMS notifications",
			Database:       "notifications_db",
			APIs:           []string{"POST /notifications"},
			Dependencies:   []string{},
		},
	}

	fmt.Println("E-commerce Microservices Architecture")
	fmt.Println("======================================")
	fmt.Println()

	for _, svc := range services {
		fmt.Printf("Service: %s\n", svc.Name)
		fmt.Printf("  Role: %s\n", svc.Responsibility)
		fmt.Printf("  Database: %s\n", svc.Database)
		fmt.Printf("  APIs: %v\n", svc.APIs)
		if len(svc.Dependencies) > 0 {
			fmt.Printf("  Depends on: %v\n", svc.Dependencies)
		}
		fmt.Println()
	}
}

Service Discovery

Imagine you're in a large shopping mall looking for a specific store. Instead of wandering around randomly, you check the directory map that shows exactly where each store is located and whether it's open. Service discovery works the same way - it's a directory that helps services find and communicate with each other dynamically.

Service discovery solves the fundamental problem: "How does Service A know where to find Service B when Service B might be running on multiple servers, scaling up and down, or even moving between data centers?"

Consul Service Discovery

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	consul "github.com/hashicorp/consul/api"
)

// ServiceRegistry wraps a Consul client for registration and discovery.
type ServiceRegistry struct {
	client *consul.Client
}

// NewServiceRegistry connects to the Consul agent at consulAddr.
func NewServiceRegistry(consulAddr string) (*ServiceRegistry, error) {
	config := consul.DefaultConfig()
	config.Address = consulAddr

	client, err := consul.NewClient(config)
	if err != nil {
		return nil, err
	}

	return &ServiceRegistry{client: client}, nil
}

// Register registers the service with Consul, including an HTTP health check.
func (sr *ServiceRegistry) Register(
	serviceID, serviceName, address string,
	port int,
	tags []string,
) error {
	registration := &consul.AgentServiceRegistration{
		ID:      serviceID,
		Name:    serviceName,
		Address: address,
		Port:    port,
		Tags:    tags,
		Check: &consul.AgentServiceCheck{
			HTTP:     fmt.Sprintf("http://%s:%d/health", address, port),
			Interval: "10s",
			Timeout:  "3s",
			// Consul's documented minimum for this setting is one minute.
			DeregisterCriticalServiceAfter: "1m",
		},
	}

	return sr.client.Agent().ServiceRegister(registration)
}

// Deregister removes the service from Consul.
func (sr *ServiceRegistry) Deregister(serviceID string) error {
	return sr.client.Agent().ServiceDeregister(serviceID)
}

// Discover returns the instances of a service that pass their health checks.
func (sr *ServiceRegistry) Discover(serviceName string) ([]*consul.ServiceEntry, error) {
	services, _, err := sr.client.Health().Service(serviceName, "", true, nil)
	return services, err
}

// GetServiceEndpoint returns the host:port of one healthy instance.
func (sr *ServiceRegistry) GetServiceEndpoint(serviceName string) (string, error) {
	services, err := sr.Discover(serviceName)
	if err != nil {
		return "", err
	}

	if len(services) == 0 {
		return "", fmt.Errorf("no healthy instances of %s found", serviceName)
	}

	// Pick the first healthy instance. This is not load balancing;
	// production code should rotate or randomize across instances.
	service := services[0]
	endpoint := fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port)

	return endpoint, nil
}

// Example usage
func main() {
	registry, err := NewServiceRegistry("localhost:8500")
	if err != nil {
		log.Fatal(err)
	}

	// Register service
	err = registry.Register(
		"user-service-1",
		"user-service",
		"localhost",
		8080,
		[]string{"api", "v1"},
	)
	if err != nil {
		log.Fatal(err)
	}

	// Deregister on shutdown
	defer registry.Deregister("user-service-1")

	// Discover service
	endpoint, err := registry.GetServiceEndpoint("order-service")
	if err != nil {
		log.Printf("Discovery failed: %v", err)
	} else {
		log.Printf("Order service endpoint: %s", endpoint)
	}

	// Keep the service running until it is interrupted, so that the
	// deferred Deregister call runs on shutdown.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
	<-sig
}
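
The Consul example always returns the first healthy instance, so every caller hits the same server. The following is a minimal sketch of round-robin selection over a list of endpoints, using only the standard library. The `roundRobin` type and its `Next` method are hypothetical names for illustration and are not part of the Consul API.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// roundRobin cycles through a fixed list of endpoints.
// The atomic counter makes Next safe to call from multiple goroutines.
type roundRobin struct {
	endpoints []string
	next      atomic.Uint64
}

// newRoundRobin copies the endpoint list so later changes by the caller have no effect.
func newRoundRobin(endpoints []string) *roundRobin {
	return &roundRobin{endpoints: append([]string(nil), endpoints...)}
}

// Next returns the next endpoint in rotation, or an error if the list is empty.
func (rr *roundRobin) Next() (string, error) {
	if len(rr.endpoints) == 0 {
		return "", errors.New("no endpoints available")
	}
	n := rr.next.Add(1) - 1 // 0, 1, 2, ... across calls
	return rr.endpoints[n%uint64(len(rr.endpoints))], nil
}

func main() {
	rr := newRoundRobin([]string{"10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080"})
	for i := 0; i < 4; i++ {
		endpoint, _ := rr.Next()
		fmt.Println(endpoint) // the fourth call wraps around to the first endpoint
	}
}
```

In practice the endpoint list would be rebuilt from fresh discovery results at some interval, since instances come and go.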

etcd Service Discovery

package discovery

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// EtcdRegistry registers and discovers services using etcd keys and leases.
type EtcdRegistry struct {
	client *clientv3.Client
	lease  clientv3.Lease
}

// NewEtcdRegistry connects to the given etcd endpoints.
func NewEtcdRegistry(endpoints []string) (*EtcdRegistry, error) {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, err
	}

	return &EtcdRegistry{
		client: client,
		lease:  clientv3.NewLease(client),
	}, nil
}

// Register stores the service under serviceKey with a lease of ttl seconds
// and keeps the lease alive in the background. The key disappears once
// keep-alives stop, so ctx must live as long as the registration should.
func (er *EtcdRegistry) Register(ctx context.Context, serviceKey, serviceValue string, ttl int64) error {
	// Create a lease
	leaseResp, err := er.lease.Grant(ctx, ttl)
	if err != nil {
		return err
	}

	// Put service info with lease
	_, err = er.client.Put(ctx, serviceKey, serviceValue, clientv3.WithLease(leaseResp.ID))
	if err != nil {
		return err
	}

	// Keep alive
	ch, err := er.lease.KeepAlive(ctx, leaseResp.ID)
	if err != nil {
		return err
	}

	// Drain keep-alive responses so the channel does not fill up
	go func() {
		for range ch {
			// Keep alive response
		}
	}()

	return nil
}

// Discover returns every key/value pair stored under servicePrefix.
func (er *EtcdRegistry) Discover(ctx context.Context, servicePrefix string) (map[string]string, error) {
	resp, err := er.client.Get(ctx, servicePrefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	services := make(map[string]string)
	for _, kv := range resp.Kvs {
		services[string(kv.Key)] = string(kv.Value)
	}

	return services, nil
}

// Watch streams changes to keys under servicePrefix.
func (er *EtcdRegistry) Watch(ctx context.Context, servicePrefix string) clientv3.WatchChan {
	return er.client.Watch(ctx, servicePrefix, clientv3.WithPrefix())
}

// Close releases the etcd client connection.
func (er *EtcdRegistry) Close() error {
	return er.client.Close()
}
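
The key idea in the etcd example is the lease: a registration survives only while keep-alives continue to arrive. The following is a toy in-memory sketch of that behavior with an injected clock so it runs deterministically. It is not how etcd is implemented, it is not safe for concurrent use, and `leaseRegistry`, `KeepAlive`, and `Lookup` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// entry is one registered service together with its lease deadline.
type entry struct {
	value   string
	ttl     time.Duration
	expires time.Time
}

// leaseRegistry is a toy stand-in for a lease-based registry.
type leaseRegistry struct {
	now     func() time.Time // injected clock keeps the example deterministic
	entries map[string]entry
}

func newLeaseRegistry(now func() time.Time) *leaseRegistry {
	return &leaseRegistry{now: now, entries: make(map[string]entry)}
}

// Register stores a key that expires after ttl unless KeepAlive is called.
func (r *leaseRegistry) Register(key, value string, ttl time.Duration) {
	r.entries[key] = entry{value: value, ttl: ttl, expires: r.now().Add(ttl)}
}

// KeepAlive extends a live lease by its TTL. It reports false if the lease is gone.
func (r *leaseRegistry) KeepAlive(key string) bool {
	e, ok := r.entries[key]
	if !ok || !r.now().Before(e.expires) {
		delete(r.entries, key)
		return false
	}
	e.expires = r.now().Add(e.ttl)
	r.entries[key] = e
	return true
}

// Lookup returns the value only while the lease is still live.
func (r *leaseRegistry) Lookup(key string) (string, bool) {
	e, ok := r.entries[key]
	if !ok || !r.now().Before(e.expires) {
		return "", false
	}
	return e.value, true
}

// runLeaseDemo registers a service, renews it once, then lets the lease lapse.
func runLeaseDemo() (renewed, aliveAfterRenewal, aliveAfterLapse bool) {
	clock := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	reg := newLeaseRegistry(func() time.Time { return clock })
	reg.Register("/services/user/1", "10.0.0.1:8080", 10*time.Second)

	clock = clock.Add(8 * time.Second) // still inside the first TTL window
	renewed = reg.KeepAlive("/services/user/1")

	clock = clock.Add(8 * time.Second) // past the original deadline, inside the renewed one
	_, aliveAfterRenewal = reg.Lookup("/services/user/1")

	clock = clock.Add(11 * time.Second) // no keep-alive arrived in time
	_, aliveAfterLapse = reg.Lookup("/services/user/1")
	return
}

func main() {
	fmt.Println(runLeaseDemo())
}
```

The practical consequence is that a crashed service needs no explicit deregistration: its entry vanishes once the TTL passes.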

Inter-Service Communication

Think of inter-service communication like different departments in a large company communicating. Some departments send formal memos, others have direct phone lines, and some use an internal messaging system. The choice depends on urgency, reliability needs, and the nature of the communication.

Services communicate via REST, gRPC, or message queues. The right choice depends on your specific needs:

  • REST APIs: Best for public-facing services and simple HTTP-based communication
  • gRPC: Ideal for internal service-to-service communication with high performance needs
  • Message Queues: Suited to asynchronous communication and decoupling services

Communication Patterns Overview

package main

import (
	"fmt"
	"strings"
)

// CommunicationPattern summarizes one way for services to talk to each other.
type CommunicationPattern struct {
	Name     string
	Protocol string
	Style    string
	UseCases []string
	Pros     []string
	Cons     []string
}

func main() {
	patterns := []CommunicationPattern{
		{
			Name:     "Synchronous HTTP/REST",
			Protocol: "HTTP/1.1 or HTTP/2",
			Style:    "Request-Response",
			UseCases: []string{
				"Public APIs",
				"Simple CRUD operations",
				"External integrations",
			},
			Pros: []string{
				"Universal compatibility",
				"Easy debugging",
				"Browser support",
				"Human readable",
			},
			Cons: []string{
				"Higher latency",
				"Text-based overhead",
				"No bidirectional streaming (HTTP/1.1)",
			},
		},
		{
			Name:     "gRPC",
			Protocol: "HTTP/2",
			Style:    "Request-Response or Streaming",
			UseCases: []string{
				"Internal service-to-service",
				"High-performance APIs",
				"Real-time bidirectional streaming",
			},
			Pros: []string{
				"Binary protocol (faster)",
				"Strong typing with Protobuf",
				"Bidirectional streaming",
				"Code generation",
			},
			Cons: []string{
				"Limited browser support (needs gRPC-Web)",
				"Learning curve",
				"Binary format harder to debug",
			},
		},
		{
			Name:     "Message Queues",
			Protocol: "AMQP, MQTT, or proprietary",
			Style:    "Publish-Subscribe or Point-to-Point",
			UseCases: []string{
				"Event-driven architecture",
				"Async processing",
				"Fire-and-forget operations",
			},
			Pros: []string{
				"Decoupling services",
				"At-least-once delivery (with acks and persistence)",
				"Load leveling",
				"Replay capability (log-based brokers such as Kafka)",
			},
			Cons: []string{
				"Eventual consistency",
				"Additional infrastructure",
				"Complexity in debugging",
			},
		},
		{
			Name:     "GraphQL",
			Protocol: "HTTP",
			Style:    "Query-based",
			UseCases: []string{
				"Client-specific data needs",
				"Aggregating multiple services",
				"Mobile and web frontends",
			},
			Pros: []string{
				"Flexible queries",
				"Avoid over-fetching",
				"Single endpoint",
				"Strong typing",
			},
			Cons: []string{
				"Complex caching",
				"N+1 query problem",
				"Learning curve",
			},
		},
	}

	fmt.Println("Inter-Service Communication Patterns")
	fmt.Println(strings.Repeat("=", 70))
	fmt.Println()

	for _, pattern := range patterns {
		fmt.Printf("Pattern: %s\n", pattern.Name)
		fmt.Printf("Protocol: %s\n", pattern.Protocol)
		fmt.Printf("Style: %s\n\n", pattern.Style)

		fmt.Println("Use Cases:")
		for _, uc := range pattern.UseCases {
			fmt.Printf("  • %s\n", uc)
		}
		fmt.Println()

		fmt.Println("Advantages:")
		for _, pro := range pattern.Pros {
			fmt.Printf("  ✓ %s\n", pro)
		}
		fmt.Println()

		fmt.Println("Disadvantages:")
		for _, con := range pattern.Cons {
			fmt.Printf("  ✗ %s\n", con)
		}
		fmt.Println()
		fmt.Println(strings.Repeat("-", 70))
		fmt.Println()
	}

	fmt.Println("Recommendation:")
	fmt.Println("  • Internal services: gRPC for performance")
	fmt.Println("  • Public APIs: REST for compatibility")
	fmt.Println("  • Events/notifications: Message queues for reliability")
	fmt.Println("  • Flexible client needs: GraphQL for data aggregation")
}

gRPC Communication

// user.proto
syntax = "proto3";

package user;
option go_package = "github.com/yourproject/proto/user";

message User {
  string id = 1;
  string username = 2;
  string email = 3;
  string role = 4;
}

message GetUserRequest {
  string id = 1;
}

message GetUserResponse {
  User user = 1;
}

message CreateUserRequest {
  string username = 1;
  string email = 2;
  string password = 3;
}

message CreateUserResponse {
  User user = 1;
}

service UserService {
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
}

// main.go
package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "github.com/yourproject/proto/user"
)

// Server implementation
type userServiceServer struct {
	pb.UnimplementedUserServiceServer
}

// GetUser returns a hard-coded user; a real service would query its database.
func (s *userServiceServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) {
	// Fetch user from database
	user := &pb.User{
		Id:       req.Id,
		Username: "john_doe",
		Email:    "john@example.com",
		Role:     "user",
	}

	return &pb.GetUserResponse{User: user}, nil
}

// CreateUser echoes the request back with a placeholder ID.
func (s *userServiceServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.CreateUserResponse, error) {
	// Create user in database
	user := &pb.User{
		Id:       "new-id",
		Username: req.Username,
		Email:    req.Email,
		Role:     "user",
	}

	return &pb.CreateUserResponse{User: user}, nil
}

// gRPC server
func runServer() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterUserServiceServer(s, &userServiceServer{})

	log.Println("User service listening on :50051")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

// gRPC client. The connection is unencrypted, which is acceptable only for
// local development; use TLS credentials in production.
func createClient() (pb.UserServiceClient, *grpc.ClientConn, error) {
	conn, err := grpc.Dial(
		"localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return nil, nil, err
	}

	client := pb.NewUserServiceClient(conn)
	return client, conn, nil
}

func main() {
	// Start server in goroutine
	go runServer()

	// Wait for server to start
	time.Sleep(1 * time.Second)

	// Create client
	client, conn, err := createClient()
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Call service with a deadline so a slow server cannot block the caller forever
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	resp, err := client.GetUser(ctx, &pb.GetUserRequest{Id: "123"})
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("User: %+v\n", resp.User)
}

HTTP REST Communication

package client

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// HTTPClient is a small JSON-over-HTTP client with connection pooling.
type HTTPClient struct {
	baseURL    string
	httpClient *http.Client
}

func NewHTTPClient(baseURL string) *HTTPClient {
	return &HTTPClient{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: 10 * time.Second,
			Transport: &http.Transport{
				MaxIdleConns:        100,
				MaxIdleConnsPerHost: 10,
				IdleConnTimeout:     90 * time.Second,
			},
		},
	}
}

// Get fetches path and decodes the JSON response body into result.
func (c *HTTPClient) Get(ctx context.Context, path string, result interface{}) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
	if err != nil {
		return err
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
	}

	return json.NewDecoder(resp.Body).Decode(result)
}

// Post sends body as JSON and, if result is non-nil, decodes the response into it.
func (c *HTTPClient) Post(ctx context.Context, path string, body, result interface{}) error {
	jsonData, err := json.Marshal(body)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		c.baseURL+path,
		bytes.NewReader(jsonData),
	)
	if err != nil {
		return err
	}

	req.Header.Set("Content-Type", "application/json")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		respBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(respBody))
	}

	if result != nil {
		return json.NewDecoder(resp.Body).Decode(result)
	}

	return nil
}

// Example: Order service calling User service
type UserServiceClient struct {
	client *HTTPClient
}

func NewUserServiceClient(baseURL string) *UserServiceClient {
	return &UserServiceClient{
		client: NewHTTPClient(baseURL),
	}
}

type User struct {
	ID       string `json:"id"`
	Username string `json:"username"`
	Email    string `json:"email"`
}

// GetUser fetches a user by ID. It returns nil on error instead of a
// partially filled struct.
func (c *UserServiceClient) GetUser(ctx context.Context, userID string) (*User, error) {
	var user User
	if err := c.client.Get(ctx, "/users/"+userID, &user); err != nil {
		return nil, err
	}
	return &user, nil
}

// CreateUser creates a user and returns the record the service sends back.
func (c *UserServiceClient) CreateUser(ctx context.Context, username, email string) (*User, error) {
	body := map[string]string{
		"username": username,
		"email":    email,
	}

	var user User
	if err := c.client.Post(ctx, "/users", body, &user); err != nil {
		return nil, err
	}
	return &user, nil
}
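
The client above makes a single attempt per call, but requests between services often fail transiently. The following is a minimal sketch of retrying an idempotent GET with exponential backoff, using only the standard library. `getWithRetry`, `getOnce`, and the retry policy (retry on transport errors and 5xx, double the delay each time) are assumptions chosen for illustration, not part of the original client.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

// getOnce performs one GET. retryable is true for transport errors and 5xx responses.
func getOnce(ctx context.Context, client *http.Client, url string) (body []byte, retryable bool, err error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, false, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, true, err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 500 {
		return nil, true, fmt.Errorf("server error: HTTP %d", resp.StatusCode)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, false, fmt.Errorf("HTTP %d", resp.StatusCode)
	}
	body, err = io.ReadAll(resp.Body)
	return body, false, err
}

// getWithRetry retries retryable failures, doubling the delay after each attempt.
// It returns the body and the number of attempts made. Use it only for idempotent calls.
func getWithRetry(ctx context.Context, client *http.Client, url string, maxAttempts int, baseDelay time.Duration) ([]byte, int, error) {
	delay := baseDelay
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		body, retryable, err := getOnce(ctx, client, url)
		if err == nil {
			return body, attempt, nil
		}
		if !retryable {
			return nil, attempt, err
		}
		lastErr = err
		if attempt == maxAttempts {
			break
		}
		select {
		case <-time.After(delay):
			delay *= 2
		case <-ctx.Done():
			return nil, attempt, ctx.Err()
		}
	}
	return nil, maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// runRetryDemo starts a local test server that fails twice with 503, then succeeds.
func runRetryDemo() (string, int, error) {
	var calls atomic.Int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if calls.Add(1) <= 2 {
			http.Error(w, "temporarily unavailable", http.StatusServiceUnavailable)
			return
		}
		fmt.Fprint(w, "ok")
	}))
	defer srv.Close()

	body, attempts, err := getWithRetry(context.Background(), srv.Client(), srv.URL, 5, 10*time.Millisecond)
	return string(body), attempts, err
}

func main() {
	body, attempts, err := runRetryDemo()
	fmt.Println(body, attempts, err)
}
```

Retrying a POST this way is unsafe unless the receiving service deduplicates requests, because the first attempt may have succeeded before the response was lost.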

Message Queue Communication

package messaging

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	// Maintained successor to the archived github.com/streadway/amqp.
	amqp "github.com/rabbitmq/amqp091-go"
)

// Event represents a domain event
type Event struct {
	ID        string                 `json:"id"`
	Type      string                 `json:"type"`
	Timestamp time.Time              `json:"timestamp"`
	Data      map[string]interface{} `json:"data"`
	Metadata  map[string]string      `json:"metadata"`
}

// MessageBroker handles message publishing and consumption
type MessageBroker struct {
	conn    *amqp.Connection
	channel *amqp.Channel
}

func NewMessageBroker(url string) (*MessageBroker, error) {
	conn, err := amqp.Dial(url)
	if err != nil {
		return nil, err
	}

	channel, err := conn.Channel()
	if err != nil {
		conn.Close() // do not leak the connection when the channel fails
		return nil, err
	}

	return &MessageBroker{
		conn:    conn,
		channel: channel,
	}, nil
}

// DeclareExchange creates an exchange for pub/sub
func (mb *MessageBroker) DeclareExchange(name, kind string) error {
	return mb.channel.ExchangeDeclare(
		name,  // name
		kind,  // type: direct, fanout, topic, headers
		true,  // durable
		false, // auto-deleted
		false, // internal
		false, // no-wait
		nil,   // arguments
	)
}

// Publish publishes an event to an exchange
func (mb *MessageBroker) Publish(ctx context.Context, exchange, routingKey string, event Event) error {
	body, err := json.Marshal(event)
	if err != nil {
		return err
	}

	return mb.channel.PublishWithContext(
		ctx,
		exchange,   // exchange
		routingKey, // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType:  "application/json",
			Body:         body,
			DeliveryMode: amqp.Persistent,
			Timestamp:    time.Now(),
		},
	)
}

// Subscribe subscribes to events from an exchange
func (mb *MessageBroker) Subscribe(
	exchange, queueName, routingKey string,
	handler func(Event) error,
) error {
	// Declare queue
	queue, err := mb.channel.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {
		return err
	}

	// Bind queue to exchange
	err = mb.channel.QueueBind(
		queue.Name, // queue name
		routingKey, // routing key
		exchange,   // exchange
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		return err
	}

	// Consume messages
	msgs, err := mb.channel.Consume(
		queue.Name, // queue
		"",         // consumer
		false,      // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	if err != nil {
		return err
	}

	// Process messages in goroutine
	go func() {
		for msg := range msgs {
			var event Event
			if err := json.Unmarshal(msg.Body, &event); err != nil {
				log.Printf("Failed to unmarshal event: %v", err)
				msg.Nack(false, false) // Discard malformed message
				continue
			}

			if err := handler(event); err != nil {
				log.Printf("Handler error: %v", err)
				// Requeue on error. A message that always fails is redelivered
				// indefinitely; use a dead-letter exchange in production.
				msg.Nack(false, true)
			} else {
				msg.Ack(false) // Acknowledge success
			}
		}
	}()

	return nil
}

func (mb *MessageBroker) Close() error {
	if err := mb.channel.Close(); err != nil {
		return err
	}
	return mb.conn.Close()
}

// Example: Order service publishes events
type OrderService struct {
	broker *MessageBroker
}

func NewOrderService(broker *MessageBroker) *OrderService {
	return &OrderService{broker: broker}
}

func (svc *OrderService) CreateOrder(ctx context.Context, userID, productID string) error {
	// Create order in database (placeholder ID for the example)
	orderID := "order-123"

	// Publish event (placeholder event ID; generate a unique one in real code)
	event := Event{
		ID:        "event-456",
		Type:      "order.created",
		Timestamp: time.Now(),
		Data: map[string]interface{}{
			"order_id":   orderID,
			"user_id":    userID,
			"product_id": productID,
		},
		Metadata: map[string]string{
			"service": "order-service",
		},
	}

	return svc.broker.Publish(ctx, "orders", "order.created", event)
}

// Example: Notification service subscribes to events
type NotificationService struct {
	broker *MessageBroker
}

func NewNotificationService(broker *MessageBroker) *NotificationService {
	return &NotificationService{broker: broker}
}

func (ns *NotificationService) Start() error {
	// The "order.*" wildcard binding only works on a topic exchange.
	if err := ns.broker.DeclareExchange("orders", "topic"); err != nil {
		return err
	}

	return ns.broker.Subscribe(
		"orders",
		"notification-queue",
		"order.*",
		ns.handleOrderEvent,
	)
}

// stringField reads a string value from the event data without panicking
// when the key is missing or has a different type.
func stringField(event Event, key string) (string, error) {
	value, ok := event.Data[key].(string)
	if !ok {
		return "", fmt.Errorf("event %s: missing or non-string field %q", event.ID, key)
	}
	return value, nil
}

func (ns *NotificationService) handleOrderEvent(event Event) error {
	log.Printf("Processing event: %s", event.Type)

	switch event.Type {
	case "order.created":
		userID, err := stringField(event, "user_id")
		if err != nil {
			return err
		}
		orderID, err := stringField(event, "order_id")
		if err != nil {
			return err
		}
		return ns.sendOrderConfirmation(userID, orderID)

	case "order.completed":
		userID, err := stringField(event, "user_id")
		if err != nil {
			return err
		}
		return ns.sendCompletionNotification(userID)

	default:
		log.Printf("Unknown event type: %s", event.Type)
	}

	return nil
}

func (ns *NotificationService) sendOrderConfirmation(userID, orderID string) error {
	log.Printf("Sending order confirmation to user %s for order %s", userID, orderID)
	// Send email/SMS
	return nil
}

func (ns *NotificationService) sendCompletionNotification(userID string) error {
	log.Printf("Sending completion notification to user %s", userID)
	return nil
}
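
Because the subscriber requeues a message when its handler fails, the same event can be delivered more than once. The following is a minimal sketch of an idempotent wrapper that skips event IDs it has already processed. The `dedupe` type is a hypothetical helper, the trimmed-down `Event` carries only the fields the sketch needs, and the in-memory set is an assumption for brevity: a real service would persist processed IDs.

```go
package main

import (
	"fmt"
	"sync"
)

// Event mirrors only the fields this sketch needs.
type Event struct {
	ID   string
	Type string
}

// dedupe wraps a handler so each event ID is processed successfully at most once.
type dedupe struct {
	mu   sync.Mutex
	seen map[string]struct{}
	next func(Event) error
}

func newDedupe(next func(Event) error) *dedupe {
	return &dedupe{seen: make(map[string]struct{}), next: next}
}

// Handle runs the wrapped handler unless the event was already processed.
// The lock is held for the whole call, which serializes handling but rules
// out two concurrent deliveries of the same event both running the handler.
func (d *dedupe) Handle(e Event) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	if _, done := d.seen[e.ID]; done {
		return nil // duplicate delivery: acknowledge without repeating side effects
	}
	if err := d.next(e); err != nil {
		return err // not marked as seen, so a redelivery will try again
	}
	d.seen[e.ID] = struct{}{}
	return nil
}

// runDedupeDemo delivers three messages, one of them a redelivery,
// and returns how many times the wrapped handler actually ran.
func runDedupeDemo() int {
	sent := 0
	h := newDedupe(func(e Event) error {
		sent++
		return nil
	})

	h.Handle(Event{ID: "evt-1", Type: "order.created"})
	h.Handle(Event{ID: "evt-1", Type: "order.created"}) // redelivery of the same event
	h.Handle(Event{ID: "evt-2", Type: "order.created"})
	return sent
}

func main() {
	fmt.Println(runDedupeDemo())
}
```

This only works if publishers assign a unique ID to every event, so fixed placeholder IDs would defeat it.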

API Composition and BFF Pattern

API composition aggregates data from multiple microservices to reduce client complexity. The Backend for Frontend (BFF) pattern creates specialized backends optimized for different client types (mobile, web, IoT).

Think of BFF like having different waiters in a restaurant - one waiter specializes in explaining menu items to tourists (web clients), while another efficiently serves regulars who know exactly what they want (mobile clients).

💡 Key Takeaway: BFF pattern eliminates the "one-size-fits-all" API problem by providing client-specific APIs that aggregate and transform data optimally for each platform.

API Composition Example

package composition

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// HTTPClient and NewHTTPClient are the REST client from the HTTP REST
// Communication section; this example assumes they are available in this package.

// CompositeResponse aggregates data from multiple services
type CompositeResponse struct {
	User    *User    `json:"user"`
	Orders  []Order  `json:"orders"`
	Profile *Profile `json:"profile"`
	Stats   *Stats   `json:"stats"`
	Errors  []string `json:"errors,omitempty"`
}

type User struct {
	ID       string `json:"id"`
	Username string `json:"username"`
	Email    string `json:"email"`
}

type Order struct {
	ID          string  `json:"id"`
	ProductName string  `json:"product_name"`
	Total       float64 `json:"total"`
	Status      string  `json:"status"`
}

type Profile struct {
	Bio       string   `json:"bio"`
	Interests []string `json:"interests"`
}

type Stats struct {
	TotalOrders   int     `json:"total_orders"`
	TotalSpent    float64 `json:"total_spent"`
	MemberSince   string  `json:"member_since"`
	LoyaltyPoints int     `json:"loyalty_points"`
}

// APIComposer aggregates data from multiple services
type APIComposer struct {
	userClient    *HTTPClient
	orderClient   *HTTPClient
	profileClient *HTTPClient
	timeout       time.Duration
}

func NewAPIComposer(timeout time.Duration) *APIComposer {
	return &APIComposer{
		userClient:    NewHTTPClient("http://user-service:8080"),
		orderClient:   NewHTTPClient("http://order-service:8081"),
		profileClient: NewHTTPClient("http://profile-service:8082"),
		timeout:       timeout,
	}
}

// GetUserDashboard aggregates user data from multiple services in parallel.
// A failing service adds an entry to Errors instead of failing the whole call.
func (ac *APIComposer) GetUserDashboard(ctx context.Context, userID string) (*CompositeResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, ac.timeout)
	defer cancel()

	result := &CompositeResponse{
		Errors: []string{},
	}

	// Use WaitGroup for concurrent requests; the mutex guards result
	var wg sync.WaitGroup
	var mu sync.Mutex

	// Fetch user info
	wg.Add(1)
	go func() {
		defer wg.Done()
		user, err := ac.fetchUser(ctx, userID)
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("user service: %v", err))
		} else {
			result.User = user
		}
	}()

	// Fetch orders
	wg.Add(1)
	go func() {
		defer wg.Done()
		orders, err := ac.fetchOrders(ctx, userID)
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("order service: %v", err))
		} else {
			result.Orders = orders
		}
	}()

	// Fetch profile
	wg.Add(1)
	go func() {
		defer wg.Done()
		profile, err := ac.fetchProfile(ctx, userID)
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("profile service: %v", err))
		} else {
			result.Profile = profile
		}
	}()

	// Fetch stats
	wg.Add(1)
	go func() {
		defer wg.Done()
		stats, err := ac.fetchStats(ctx, userID)
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("stats calculation: %v", err))
		} else {
			result.Stats = stats
		}
	}()

	wg.Wait()

	return result, nil
}

func (ac *APIComposer) fetchUser(ctx context.Context, userID string) (*User, error) {
	var user User
	err := ac.userClient.Get(ctx, fmt.Sprintf("/users/%s", userID), &user)
	return &user, err
}

func (ac *APIComposer) fetchOrders(ctx context.Context, userID string) ([]Order, error) {
	var orders []Order
	err := ac.orderClient.Get(ctx, fmt.Sprintf("/users/%s/orders", userID), &orders)
	return orders, err
}

func (ac *APIComposer) fetchProfile(ctx context.Context, userID string) (*Profile, error) {
	var profile Profile
	err := ac.profileClient.Get(ctx, fmt.Sprintf("/profiles/%s", userID), &profile)
	return &profile, err
}

func (ac *APIComposer) fetchStats(ctx context.Context, userID string) (*Stats, error) {
	var stats Stats
	err := ac.orderClient.Get(ctx, fmt.Sprintf("/users/%s/stats", userID), &stats)
	return &stats, err
}

// HTTP Handler for composition endpoint
func (ac *APIComposer) HandleDashboard(w http.ResponseWriter, r *http.Request) {
	userID := r.URL.Query().Get("user_id")
	if userID == "" {
		http.Error(w, "user_id required", http.StatusBadRequest)
		return
	}

	response, err := ac.GetUserDashboard(r.Context(), userID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

BFF Pattern Implementation

package bff

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// APIComposer, CompositeResponse, User, Order, Profile, and Stats come from the
// API composition example above and are assumed to be available in this package.

// MobileBFF provides bandwidth-optimized API for mobile clients
type MobileBFF struct {
	composer *APIComposer
}

func NewMobileBFF(composer *APIComposer) *MobileBFF {
	return &MobileBFF{composer: composer}
}

// Mobile response is streamlined
type MobileDashboardResponse struct {
	UserName      string         `json:"user_name"`
	RecentOrders  []MobileOrder  `json:"recent_orders"`
	QuickStats    MobileStats    `json:"quick_stats"`
	Notifications []Notification `json:"notifications"`
}

type MobileOrder struct {
	ID     string  `json:"id"`
	Title  string  `json:"title"`
	Total  float64 `json:"total"`
	Status string  `json:"status"`
}

type MobileStats struct {
	OrderCount int `json:"order_count"`
	Points     int `json:"points"`
}

type Notification struct {
	Message string `json:"message"`
	Type    string `json:"type"`
}

func (bff *MobileBFF) HandleDashboard(w http.ResponseWriter, r *http.Request) {
	userID := r.URL.Query().Get("user_id")
	if userID == "" {
		http.Error(w, "user_id required", http.StatusBadRequest)
		return
	}

	// Fetch full composite data
	data, err := bff.composer.GetUserDashboard(r.Context(), userID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Transform to mobile-optimized format.
	// GetUserDashboard records per-service failures in Errors instead of
	// returning them, so User and Stats may be nil here.
	response := MobileDashboardResponse{
		RecentOrders:  bff.transformOrders(data.Orders),
		Notifications: bff.buildNotifications(data),
	}
	if data.User != nil {
		response.UserName = data.User.Username
	}
	if data.Stats != nil {
		response.QuickStats = MobileStats{
			OrderCount: data.Stats.TotalOrders,
			Points:     data.Stats.LoyaltyPoints,
		}
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

func (bff *MobileBFF) transformOrders(orders []Order) []MobileOrder {
	// Return only the first 5 orders with minimal data
	mobileOrders := make([]MobileOrder, 0, 5)
	for i, order := range orders {
		if i >= 5 {
			break
		}
		mobileOrders = append(mobileOrders, MobileOrder{
			ID:     order.ID,
			Title:  order.ProductName,
			Total:  order.Total,
			Status: order.Status,
		})
	}
	return mobileOrders
}

func (bff *MobileBFF) buildNotifications(data *CompositeResponse) []Notification {
	notifications := []Notification{}

	// Check for pending orders
	for _, order := range data.Orders {
		if order.Status == "pending" {
			notifications = append(notifications, Notification{
				Message: fmt.Sprintf("Order %s is being processed", order.ID),
				Type:    "info",
			})
		}
	}

	// Check loyalty points milestone (Stats is nil if the stats call failed)
	if data.Stats != nil && data.Stats.LoyaltyPoints >= 1000 {
		notifications = append(notifications, Notification{
			Message: "You've earned 1000 loyalty points!",
			Type:    "achievement",
		})
	}

	return notifications
}

// WebBFF provides feature-rich API for web clients
type WebBFF struct {
	composer *APIComposer
}

func NewWebBFF(composer *APIComposer) *WebBFF {
	return &WebBFF{composer: composer}
}

// Web response includes full details
type WebDashboardResponse struct {
	User            *User           `json:"user"`
	Orders          []EnrichedOrder `json:"orders"`
	Profile         *Profile        `json:"profile"`
	Stats           *DetailedStats  `json:"stats"`
	Recommendations []Product       `json:"recommendations"`
}

type EnrichedOrder struct {
	Order
	EstimatedDelivery time.Time `json:"estimated_delivery"`
	TrackingURL       string    `json:"tracking_url"`
}

type DetailedStats struct {
	Stats
	SpendingTrend []SpendingDataPoint `json:"spending_trend"`
	TopCategories []CategorySpending  `json:"top_categories"`
}

type SpendingDataPoint struct {
	Month  string  `json:"month"`
	Amount float64 `json:"amount"`
}

type CategorySpending struct {
	Category string  `json:"category"`
	Amount   float64 `json:"amount"`
}

type Product struct {
	ID    string  `json:"id"`
	Name  string  `json:"name"`
	Price float64 `json:"price"`
}

func (bff *WebBFF) HandleDashboard(w http.ResponseWriter, r *http.Request) {
	userID := r.URL.Query().Get("user_id")
	if userID == "" {
		http.Error(w, "user_id required", http.StatusBadRequest)
		return
	}

	data, err := bff.composer.GetUserDashboard(r.Context(), userID)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Recommendations are optional: on error the dashboard is served without them.
	recommendations, _ := bff.fetchRecommendations(r.Context(), userID)

	response := WebDashboardResponse{
		User:            data.User,
		Orders:          bff.enrichOrders(data.Orders),
		Profile:         data.Profile,
		Stats:           bff.enrichStats(data.Stats),
		Recommendations: recommendations,
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

func (bff *WebBFF) enrichOrders(orders []Order) []EnrichedOrder {
	enriched := make([]EnrichedOrder, len(orders))
	for i, order := range orders {
		enriched[i] = EnrichedOrder{
			Order:             order,
			EstimatedDelivery: time.Now().Add(3 * 24 * time.Hour),
			TrackingURL:       fmt.Sprintf("https://tracking.example.com/%s", order.ID),
		}
	}
	return enriched
}

func (bff *WebBFF) enrichStats(stats *Stats) *DetailedStats {
	// Stats is nil when the stats call failed; pass that through as "no stats".
	if stats == nil {
		return nil
	}
	// Placeholder trend and category data for the example
	return &DetailedStats{
		Stats: *stats,
		SpendingTrend: []SpendingDataPoint{
			{Month: "Jan", Amount: 120.50},
			{Month: "Feb", Amount: 89.20},
			{Month: "Mar", Amount: 145.75},
		},
		TopCategories: []CategorySpending{
			{Category: "Electronics", Amount: 500.00},
			{Category: "Books", Amount: 125.00},
		},
	}
}

func (bff *WebBFF) fetchRecommendations(ctx context.Context, userID string) ([]Product, error) {
	// Placeholder: a real implementation would call a recommendation service
	return []Product{
		{ID: "prod-1", Name: "Recommended Product", Price: 29.99},
	}, nil
}

API Gateway Pattern

An API Gateway provides a single entry point for clients and handles cross-cutting concerns such as authentication, rate limiting, and request routing. It acts as a reception desk for your microservices, directing requests and managing security.

💡 Key Takeaway: An API Gateway simplifies client communication by providing a unified interface while hiding the complexity of your microservices architecture.

📖 Complete Guide: For comprehensive coverage of API Gateway patterns including implementation details, authentication strategies, rate limiting, load balancing, circuit breaking, and production best practices, see API Gateway Patterns.

Key API Gateway Features

Core Capabilities:

  • Request Routing: Direct requests to appropriate microservices
  • Authentication & Authorization: Centralized security enforcement
  • Rate Limiting: Protect services from overload
  • Load Balancing: Distribute traffic across service instances
  • Circuit Breaking: Prevent cascading failures
  • Request/Response Transformation: Adapt protocols and formats
  • Aggregation: Combine multiple service responses

Common Patterns:

  • Single Entry Point: Unified API surface for clients
  • Protocol Translation: REST ↔ gRPC ↔ GraphQL
  • Service Composition: Aggregate responses from multiple services
  • Edge Security: SSL termination, WAF integration
  • Analytics & Monitoring: Request tracking, response metrics
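The rate limiting capability listed above is often built on a token bucket. The following is a minimal sketch using only the standard library, not a production limiter: the names `tokenBucket`, `newTokenBucket`, and `rateLimit` are hypothetical, and a single shared bucket is assumed for simplicity.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// tokenBucket permits bursts of up to capacity requests and refills
// at refillPerSec tokens per second. (Hypothetical type for illustration.)
type tokenBucket struct {
	mu           sync.Mutex
	capacity     float64
	tokens       float64
	refillPerSec float64
	last         time.Time
}

func newTokenBucket(capacity, refillPerSec float64) *tokenBucket {
	return &tokenBucket{
		capacity:     capacity,
		tokens:       capacity,
		refillPerSec: refillPerSec,
		last:         time.Now(),
	}
}

// allow consumes one token if available and reports whether the request may proceed.
func (b *tokenBucket) allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Add the tokens earned since the last call, capped at capacity.
	now := time.Now()
	b.tokens += now.Sub(b.last).Seconds() * b.refillPerSec
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	b.last = now

	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// rateLimit wraps next and answers 429 Too Many Requests once the bucket is empty.
func rateLimit(b *tokenBucket, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !b.allow() {
			http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

func main() {
	backend := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})
	limited := rateLimit(newTokenBucket(2, 1), backend)

	// Three back-to-back requests against a bucket that holds two tokens.
	for i := 1; i <= 3; i++ {
		rec := httptest.NewRecorder()
		limited.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/orders", nil))
		fmt.Printf("request %d -> %d\n", i, rec.Code)
	}
}
```

A real gateway would usually keep one bucket per client (keyed by API key or IP address) rather than one global bucket, and many Go services use the `golang.org/x/time/rate` package instead of a hand-written limiter.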

Integration Example

In microservices architecture, the API Gateway sits between clients and backend services:

Client → API Gateway → Multiple Microservices
          

For detailed implementation examples, middleware patterns, and production configurations, see the dedicated API Gateway tutorial.
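To make the request-routing role concrete, here is a minimal sketch built on the standard library's reverse proxy. It is an illustration only, not the implementation from the dedicated tutorial: the route prefixes and the `route`, `matchRoute`, and `newGateway` names are assumptions, and a test server stands in for a real backend.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
	"strings"
)

// route maps a URL path prefix to a backend base URL (hypothetical type).
type route struct {
	prefix string
	target string
}

// matchRoute returns the backend of the first route whose prefix matches path.
func matchRoute(routes []route, path string) (string, bool) {
	for _, r := range routes {
		if strings.HasPrefix(path, r.prefix) {
			return r.target, true
		}
	}
	return "", false
}

// newGateway builds a handler that proxies each request to the matching backend
// and answers 404 when no route matches.
func newGateway(routes []route) (http.Handler, error) {
	proxies := make(map[string]*httputil.ReverseProxy, len(routes))
	for _, r := range routes {
		u, err := url.Parse(r.target)
		if err != nil {
			return nil, err
		}
		proxies[r.target] = httputil.NewSingleHostReverseProxy(u)
	}
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		target, ok := matchRoute(routes, req.URL.Path)
		if !ok {
			http.NotFound(w, req)
			return
		}
		proxies[target].ServeHTTP(w, req)
	}), nil
}

func main() {
	// Stand-in backend so the sketch runs without real services.
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "user backend saw %s", r.URL.Path)
	}))
	defer backend.Close()

	gw, err := newGateway([]route{{prefix: "/users/", target: backend.URL}})
	if err != nil {
		log.Fatal(err)
	}

	// Send one request through the gateway and show what came back.
	rec := httptest.NewRecorder()
	gw.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/users/42", nil))
	fmt.Println(rec.Code, rec.Body.String())
}
```

In a deployed gateway the handler would be passed to `http.ListenAndServe`, and cross-cutting middleware such as authentication would wrap it before routing takes place.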

Distributed Transactions with Saga

Imagine you're planning a vacation that involves booking a flight, reserving a hotel, and renting a car. If any of these bookings fails, you need to cancel the others to avoid paying for services you can't use. A saga pattern works exactly like this travel planning - it coordinates multiple services and can roll back changes if something goes wrong.

Traditional database transactions don't work across multiple services. The Saga pattern manages distributed transactions by breaking them into a series of smaller local transactions, each paired with a compensating action that undoes its changes if a later step fails.

⚠️ Important: Sagas provide eventual consistency, not immediate consistency. There will be brief periods where the system is in an intermediate state during the saga execution.

Saga Orchestration

// package main so the example runs directly; use a library package name
// such as saga when importing this code from a service.
package main

import (
	"context"
	"fmt"
	"log"
)

// SagaStep pairs a forward action with the compensating action that undoes it.
type SagaStep struct {
	Name string
	// Execute performs the step and returns data the compensation will need.
	Execute func(ctx context.Context, data interface{}) (interface{}, error)
	// Compensate undoes the step using the data returned by Execute.
	Compensate     func(ctx context.Context, data interface{}) error
	ExecuteData    interface{}
	CompensateData interface{}
}

type Saga struct {
	steps         []SagaStep
	executedSteps []int
}

func NewSaga() *Saga {
	return &Saga{
		steps:         []SagaStep{},
		executedSteps: []int{},
	}
}

func (s *Saga) AddStep(step SagaStep) {
	s.steps = append(s.steps, step)
}

// Execute runs the steps in order. On the first failure it compensates
// every step that already completed and returns the error.
func (s *Saga) Execute(ctx context.Context) error {
	for i, step := range s.steps {
		log.Printf("Executing step: %s", step.Name)

		result, err := step.Execute(ctx, step.ExecuteData)
		if err != nil {
			log.Printf("Step %s failed: %v", step.Name, err)
			s.compensate(ctx)
			return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
		}

		// Store result for compensation if needed
		s.steps[i].CompensateData = result
		s.executedSteps = append(s.executedSteps, i)

		log.Printf("Step %s completed successfully", step.Name)
	}

	log.Println("Saga completed successfully")
	return nil
}

func (s *Saga) compensate(ctx context.Context) {
	log.Println("Starting compensation...")

	// Compensate in reverse order
	for i := len(s.executedSteps) - 1; i >= 0; i-- {
		stepIndex := s.executedSteps[i]
		step := s.steps[stepIndex]

		log.Printf("Compensating step: %s", step.Name)

		if err := step.Compensate(ctx, step.CompensateData); err != nil {
			log.Printf("Compensation failed for %s: %v", step.Name, err)
			// Log but continue compensating other steps
		}
	}

	log.Println("Compensation completed")
}

// Example: Order creation saga
type OrderSaga struct {
	orderID   string
	userID    string
	productID string
	quantity  int
}

func CreateOrderSaga(userID, productID string, quantity int) *Saga {
	saga := NewSaga()

	// Step 1: Reserve inventory
	saga.AddStep(SagaStep{
		Name: "ReserveInventory",
		Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
			// Call inventory service
			log.Printf("Reserving %d units of product %s", quantity, productID)
			// Return reservation ID
			return "reservation-123", nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			reservationID := data.(string)
			log.Printf("Releasing inventory reservation %s", reservationID)
			// Call inventory service to release
			return nil
		},
	})

	// Step 2: Process payment
	saga.AddStep(SagaStep{
		Name: "ProcessPayment",
		Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
			log.Printf("Processing payment for user %s", userID)
			// Call payment service
			return "payment-456", nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			paymentID := data.(string)
			log.Printf("Refunding payment %s", paymentID)
			// Call payment service to refund
			return nil
		},
	})

	// Step 3: Create order
	saga.AddStep(SagaStep{
		Name: "CreateOrder",
		Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
			log.Printf("Creating order for user %s", userID)
			// Call order service
			return "order-789", nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			orderID := data.(string)
			log.Printf("Cancelling order %s", orderID)
			// Call order service to cancel
			return nil
		},
	})

	// Step 4: Send notification
	saga.AddStep(SagaStep{
		Name: "SendNotification",
		Execute: func(ctx context.Context, data interface{}) (interface{}, error) {
			log.Printf("Sending order confirmation to user %s", userID)
			// Call notification service
			return nil, nil
		},
		Compensate: func(ctx context.Context, data interface{}) error {
			// Notifications don't need compensation
			return nil
		},
	})

	return saga
}

func main() {
	ctx := context.Background()

	saga := CreateOrderSaga("user-123", "product-456", 2)

	if err := saga.Execute(ctx); err != nil {
		log.Printf("Order creation failed: %v", err)
	}
}
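The failure path is where a saga does its real work: when a step fails, the steps that already completed are compensated in reverse order. The sketch below isolates that behavior in a compact, self-contained form. The names `step`, `runSaga`, and the `trail` of recorded actions are hypothetical and chosen for illustration; they are not part of the orchestrator listing.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a forward action plus the action that undoes it (hypothetical type).
type step struct {
	name string
	do   func() error
	undo func() error
}

// errDown simulates a downstream service outage.
var errDown = errors.New("order service unavailable")

// succeed is a placeholder action that always works.
func succeed() error { return nil }

// runSaga executes steps in order. On the first failure it undoes the
// completed steps in reverse order. The returned trail records every action.
func runSaga(steps []step) ([]string, error) {
	var trail []string
	for i, s := range steps {
		if err := s.do(); err != nil {
			trail = append(trail, "fail:"+s.name)
			for j := i - 1; j >= 0; j-- {
				if uerr := steps[j].undo(); uerr != nil {
					// Keep going: one failed compensation must not block the rest.
					trail = append(trail, "undo-failed:"+steps[j].name)
					continue
				}
				trail = append(trail, "undo:"+steps[j].name)
			}
			return trail, fmt.Errorf("saga failed at %s: %w", s.name, err)
		}
		trail = append(trail, "do:"+s.name)
	}
	return trail, nil
}

func main() {
	steps := []step{
		{name: "ReserveInventory", do: succeed, undo: succeed},
		{name: "ProcessPayment", do: succeed, undo: succeed},
		{name: "CreateOrder", do: func() error { return errDown }, undo: succeed},
	}

	trail, err := runSaga(steps)
	for _, action := range trail {
		fmt.Println(action)
	}
	fmt.Println("error:", err)
}
```

Because the third step fails, the payment is refunded before the inventory reservation is released, mirroring the reverse-order loop in the orchestrator's compensation logic.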

Circuit Breakers and Retries

Think of a circuit breaker like the safety mechanisms in your home's electrical system. When too much current flows through a circuit, the breaker trips to prevent damage to your appliances. In microservices, when a service starts failing repeatedly, the circuit breaker "trips" to prevent cascading failures throughout your system.

Circuit breakers prevent cascading failures, and retries handle transient errors.

Common Pitfalls:

  • Setting retry limits too low can cause unnecessary failures
  • Not implementing exponential backoff can overwhelm struggling services
  • Forgetting to configure timeouts can cause cascading delays

💡 Key Takeaway: Circuit breakers and retries work together to create resilient systems that can handle partial failures without crashing completely.
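Two of the pitfalls above, missing backoff and missing timeouts, can be addressed in one retry helper. The sketch below is one possible shape under stated assumptions: the `backoffDelay`, `withJitter`, and `retry` names, the doubling factor, and the half-to-full jitter range are illustrative choices, not a prescribed policy.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// backoffDelay returns the un-jittered wait before retry number attempt
// (1-based): base doubled (attempt-1) times, capped at max.
func backoffDelay(base, max time.Duration, attempt int) time.Duration {
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// withJitter picks a random delay in [d/2, d] so that many clients
// retrying at once do not hit the struggling service in lockstep.
func withJitter(d time.Duration) time.Duration {
	if d <= 0 {
		return 0
	}
	half := d / 2
	return half + time.Duration(rand.Int63n(int64(d-half)+1))
}

// retry calls fn up to maxAttempts times. Each attempt gets its own
// timeout, and the wait between attempts grows exponentially with jitter.
func retry(ctx context.Context, maxAttempts int, perAttempt, base, max time.Duration,
	fn func(context.Context) error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		attemptCtx, cancel := context.WithTimeout(ctx, perAttempt)
		err = fn(attemptCtx)
		cancel()
		if err == nil {
			return nil
		}
		if attempt == maxAttempts {
			break
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(withJitter(backoffDelay(base, max, attempt))):
		}
	}
	return fmt.Errorf("all %d attempts failed: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := retry(context.Background(), 4, 50*time.Millisecond,
		10*time.Millisecond, 100*time.Millisecond,
		func(ctx context.Context) error {
			calls++
			if calls < 3 {
				return errors.New("temporary error") // simulated transient failure
			}
			return nil
		})
	fmt.Println("calls:", calls, "err:", err)
}
```

The per-attempt context only helps if `fn` passes it to the underlying call (for example an HTTP request built with `http.NewRequestWithContext`); a function that ignores the context will not be interrupted by the timeout.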

Circuit Breaker Implementation

// package main so the example runs directly; use a library package name
// such as resilience when importing this code from a service.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type CircuitState int

const (
	StateClosed CircuitState = iota
	StateOpen
	StateHalfOpen
)

type CircuitBreaker struct {
	name          string
	maxFailures   int           // consecutive failures that trip the breaker
	timeout       time.Duration // stored but not enforced in this example
	resetTimeout  time.Duration // how long to stay open before probing again
	state         CircuitState
	failures      int
	lastFailTime  time.Time
	successCount  int
	halfOpenMax   int // successes required in half-open state to close
	mu            sync.RWMutex
	onStateChange func(from, to CircuitState)
}

func NewCircuitBreaker(name string, maxFailures int, timeout, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		name:         name,
		maxFailures:  maxFailures,
		timeout:      timeout,
		resetTimeout: resetTimeout,
		state:        StateClosed,
		halfOpenMax:  3,
	}
}

// Execute runs fn unless the breaker is open, then records the outcome.
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error {
	if err := cb.beforeRequest(); err != nil {
		return err
	}

	err := fn()

	cb.afterRequest(err)

	return err
}

func (cb *CircuitBreaker) beforeRequest() error {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case StateOpen:
		// After resetTimeout, let a trial request through
		if time.Since(cb.lastFailTime) > cb.resetTimeout {
			cb.setState(StateHalfOpen)
			return nil
		}
		return fmt.Errorf("circuit breaker %s is open", cb.name)

	case StateHalfOpen:
		if cb.successCount >= cb.halfOpenMax {
			cb.setState(StateClosed)
		}
		return nil

	default:
		return nil
	}
}

func (cb *CircuitBreaker) afterRequest(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err != nil {
		cb.failures++
		cb.lastFailTime = time.Now()

		// Any failure while half-open reopens the circuit immediately
		if cb.state == StateHalfOpen {
			cb.setState(StateOpen)
		} else if cb.failures >= cb.maxFailures {
			cb.setState(StateOpen)
		}
	} else {
		if cb.state == StateHalfOpen {
			cb.successCount++
			if cb.successCount >= cb.halfOpenMax {
				cb.setState(StateClosed)
			}
		} else {
			cb.failures = 0
		}
	}
}

// setState must be called with cb.mu held.
func (cb *CircuitBreaker) setState(newState CircuitState) {
	oldState := cb.state
	cb.state = newState
	cb.failures = 0
	cb.successCount = 0

	fmt.Printf("Circuit breaker %s: %v -> %v\n", cb.name, stateString(oldState), stateString(newState))

	if cb.onStateChange != nil {
		cb.onStateChange(oldState, newState)
	}
}

func stateString(state CircuitState) string {
	switch state {
	case StateClosed:
		return "CLOSED"
	case StateOpen:
		return "OPEN"
	case StateHalfOpen:
		return "HALF_OPEN"
	default:
		return "UNKNOWN"
	}
}

func (cb *CircuitBreaker) State() CircuitState {
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	return cb.state
}

// Retry with exponential backoff
type RetryConfig struct {
	MaxAttempts  int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

func Retry(ctx context.Context, config RetryConfig, fn func() error) error {
	var err error
	delay := config.InitialDelay

	for attempt := 1; attempt <= config.MaxAttempts; attempt++ {
		err = fn()
		if err == nil {
			return nil
		}

		if attempt == config.MaxAttempts {
			break
		}

		// Report the delay that is about to be waited
		fmt.Printf("Attempt %d failed, retrying in %v\n", attempt, delay)

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}

		// Grow the delay for the next attempt, capped at MaxDelay
		delay = time.Duration(float64(delay) * config.Multiplier)
		if delay > config.MaxDelay {
			delay = config.MaxDelay
		}
	}

	return fmt.Errorf("max retries exceeded: %w", err)
}

// Example usage
func main() {
	cb := NewCircuitBreaker("user-service", 3, 5*time.Second, 10*time.Second)

	// Simulate service calls. The breaker opens after the third failure; with a
	// 10s reset timeout it stays open for the rest of this 10-request loop.
	for i := 0; i < 10; i++ {
		err := cb.Execute(context.Background(), func() error {
			// Simulate failing service
			if i < 5 {
				return errors.New("service unavailable")
			}
			return nil
		})

		if err != nil {
			fmt.Printf("Request %d failed: %v\n", i, err)
		} else {
			fmt.Printf("Request %d succeeded\n", i)
		}

		time.Sleep(1 * time.Second)
	}

	// Retry example
	retryConfig := RetryConfig{
		MaxAttempts:  3,
		InitialDelay: 100 * time.Millisecond,
		MaxDelay:     1 * time.Second,
		Multiplier:   2.0,
	}

	err := Retry(context.Background(), retryConfig, func() error {
		// Simulate operation
		return errors.New("temporary error")
	})

	if err != nil {
		fmt.Printf("Operation failed: %v\n", err)
	}
}

Health Checks and Service Registration

Imagine you're organizing a conference and need to know which speakers are ready and available to present. You'd periodically check in with each speaker to confirm they're prepared and their equipment is working. Health checks in microservices work the same way - they continuously verify that services are running correctly and ready to handle traffic.

Health checks ensure services are ready to handle traffic.

When to use Liveness vs Readiness Checks:

  • Liveness: "Is the service alive?" - Restart if this fails
  • Readiness: "Is the service ready to accept traffic?" - Don't send requests if this fails

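Real-world Example: Kubernetes uses both liveness and readiness probes. A service might be alive but not ready, for example while it is still warming a cache or waiting for a database connection. In that case Kubernetes keeps the container running but withholds traffic until the readiness probe passes, so requests are not sent to an instance that can't handle them.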
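The "alive but not ready" state can be sketched with a single atomic flag that startup code flips once initialization finishes. This is a simplified illustration: the `readiness` type, its handler names, and the `probe` helper are hypothetical, and `atomic.Bool` assumes Go 1.19 or newer.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// readiness tracks whether startup work (for example cache warm-up)
// has finished. Hypothetical type for illustration.
type readiness struct {
	ready atomic.Bool
}

// liveHandler answers 200 as long as the process can serve HTTP at all.
func (r *readiness) liveHandler(w http.ResponseWriter, _ *http.Request) {
	w.WriteHeader(http.StatusOK)
}

// readyHandler answers 503 until the service has been marked ready.
func (r *readiness) readyHandler(w http.ResponseWriter, _ *http.Request) {
	if !r.ready.Load() {
		http.Error(w, "starting up", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
}

// probe calls a handler the way an orchestrator's HTTP probe would
// and returns the status code it observed.
func probe(h http.HandlerFunc) int {
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code
}

func main() {
	var r readiness

	// During startup: alive, but traffic should be withheld.
	fmt.Println("live:", probe(r.liveHandler), "ready:", probe(r.readyHandler))

	// Startup work finished: mark the service ready.
	r.ready.Store(true)
	fmt.Println("live:", probe(r.liveHandler), "ready:", probe(r.readyHandler))
}
```

The same flag can be cleared again during graceful shutdown so the orchestrator stops routing new requests before the process exits.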

Health Check Implementation

// package main so the example runs directly; use a library package name
// such as health when importing this code from a service.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	// sql.Open("postgres", ...) needs a registered driver; lib/pq is assumed here.
	_ "github.com/lib/pq"
)

type HealthStatus string

const (
	StatusHealthy   HealthStatus = "healthy"
	StatusUnhealthy HealthStatus = "unhealthy"
	StatusDegraded  HealthStatus = "degraded"
)

type ComponentHealth struct {
	Status    HealthStatus `json:"status"`
	Message   string       `json:"message,omitempty"`
	Timestamp time.Time    `json:"timestamp"`
}

type HealthCheck struct {
	mu         sync.RWMutex
	components map[string]ComponentHealth
	db         *sql.DB
}

func NewHealthCheck(db *sql.DB) *HealthCheck {
	return &HealthCheck{
		components: make(map[string]ComponentHealth),
		db:         db,
	}
}

// Check probes every dependency, stores the results, and returns a copy of them.
func (hc *HealthCheck) Check(ctx context.Context) map[string]ComponentHealth {
	// Run the checks before taking the lock so slow dependencies
	// do not block concurrent readers.
	results := map[string]ComponentHealth{
		// Check database
		"database": hc.checkDatabase(ctx),
		// Check external services
		"user_service": hc.checkExternalService(ctx, "http://user-service:8080/health"),
		// Check Redis
		// "redis": hc.checkRedis(ctx),
	}

	hc.mu.Lock()
	defer hc.mu.Unlock()
	for name, health := range results {
		hc.components[name] = health
	}

	// Return the local copy rather than the shared map
	return results
}

func (hc *HealthCheck) checkDatabase(ctx context.Context) ComponentHealth {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	err := hc.db.PingContext(ctx)
	if err != nil {
		return ComponentHealth{
			Status:    StatusUnhealthy,
			Message:   err.Error(),
			Timestamp: time.Now(),
		}
	}

	return ComponentHealth{
		Status:    StatusHealthy,
		Timestamp: time.Now(),
	}
}

func (hc *HealthCheck) checkExternalService(ctx context.Context, url string) ComponentHealth {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return ComponentHealth{
			Status:    StatusUnhealthy,
			Message:   err.Error(),
			Timestamp: time.Now(),
		}
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return ComponentHealth{
			Status:    StatusUnhealthy,
			Message:   err.Error(),
			Timestamp: time.Now(),
		}
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return ComponentHealth{
			Status:    StatusUnhealthy,
			Message:   fmt.Sprintf("HTTP %d", resp.StatusCode),
			Timestamp: time.Now(),
		}
	}

	return ComponentHealth{
		Status:    StatusHealthy,
		Timestamp: time.Now(),
	}
}

// OverallStatus summarizes the most recent Check: unhealthy wins over degraded.
func (hc *HealthCheck) OverallStatus() HealthStatus {
	hc.mu.RLock()
	defer hc.mu.RUnlock()

	hasUnhealthy := false
	hasDegraded := false

	for _, comp := range hc.components {
		switch comp.Status {
		case StatusUnhealthy:
			hasUnhealthy = true
		case StatusDegraded:
			hasDegraded = true
		}
	}

	if hasUnhealthy {
		return StatusUnhealthy
	}
	if hasDegraded {
		return StatusDegraded
	}
	return StatusHealthy
}

// HTTP handlers
func (hc *HealthCheck) LivenessHandler(w http.ResponseWriter, r *http.Request) {
	// Liveness: Is the service running?
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("alive"))
}

func (hc *HealthCheck) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
	// Readiness: Is the service ready to accept traffic?
	components := hc.Check(r.Context())
	overall := hc.OverallStatus()

	status := http.StatusOK
	if overall != StatusHealthy {
		status = http.StatusServiceUnavailable
	}

	response := map[string]interface{}{
		"status":     overall,
		"components": components,
		"timestamp":  time.Now(),
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(response)
}

func main() {
	// Setup database
	db, err := sql.Open("postgres", "connection-string")
	if err != nil {
		log.Fatalf("open database: %v", err)
	}
	defer db.Close()

	// Create health checker
	healthCheck := NewHealthCheck(db)

	// Setup routes
	http.HandleFunc("/health/live", healthCheck.LivenessHandler)
	http.HandleFunc("/health/ready", healthCheck.ReadinessHandler)

	log.Fatal(http.ListenAndServe(":8080", nil))
}

Distributed Tracing

Imagine you're tracking a package through a complex delivery system. At each checkpoint, the package is scanned, creating a complete trail of its journey. Distributed tracing works the same way - it follows a request as it travels through multiple services, creating a detailed map of its path and identifying any delays or errors.

Distributed tracing tracks requests across microservices.

Common Pitfalls:

  • Too much tracing data can overwhelm your monitoring system
  • Missing context propagation breaks the trace chain
  • Inconsistent sampling can miss important issues

💡 Key Takeaway: Distributed tracing is essential for debugging microservices, but start with a sampling strategy that captures enough detail without overwhelming your systems.
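To see what "context propagation" means on the wire, the sketch below forwards a W3C `traceparent` header by hand using only the standard library. It is a teaching aid under stated assumptions: the `traceID`, `childTraceparent`, and `propagate` helpers are hypothetical, and an OpenTelemetry setup would normally delegate this job to a configured propagator instead.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"log"
	"net/http"
	"strings"
)

// traceID extracts the trace-id field from a traceparent value laid out as
// version-traceid-parentid-flags, or returns "" if the value is malformed.
func traceID(tp string) string {
	parts := strings.Split(tp, "-")
	if len(parts) != 4 || len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return ""
	}
	return parts[1]
}

// childTraceparent keeps the incoming trace ID and flags but substitutes a
// fresh span ID, which represents this service's own span.
func childTraceparent(incoming string) (string, error) {
	if traceID(incoming) == "" {
		return "", fmt.Errorf("malformed traceparent %q", incoming)
	}
	parts := strings.Split(incoming, "-")

	spanID := make([]byte, 8)
	if _, err := rand.Read(spanID); err != nil {
		return "", err
	}
	parts[2] = hex.EncodeToString(spanID)
	return strings.Join(parts, "-"), nil
}

// propagate copies trace context from an inbound request onto an outbound
// one. It reports false when there was no valid context to forward, in
// which case the downstream service would start a new, unrelated trace.
func propagate(in, out *http.Request) bool {
	tp, err := childTraceparent(in.Header.Get("traceparent"))
	if err != nil {
		return false
	}
	out.Header.Set("traceparent", tp)
	return true
}

func main() {
	in, err := http.NewRequest(http.MethodGet, "http://order-service/orders", nil)
	if err != nil {
		log.Fatal(err)
	}
	in.Header.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")

	out, err := http.NewRequest(http.MethodGet, "http://user-service/users/123", nil)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("forwarded:", propagate(in, out))
	fmt.Println("inbound: ", in.Header.Get("traceparent"))
	fmt.Println("outbound:", out.Header.Get("traceparent"))
}
```

Both headers share the same trace ID, which is what lets a tracing backend stitch the two services' spans into one trace; dropping the header on any hop is the "broken trace chain" pitfall.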

OpenTelemetry Tracing

package tracing

import (
	"context"
	"encoding/json"
	"net/http"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	// Note: this Jaeger exporter package has been deprecated upstream in
	// favor of OTLP exporters; it is kept here to match the example.
	"go.opentelemetry.io/otel/exporters/jaeger"
	"go.opentelemetry.io/otel/sdk/resource"
	tracesdk "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

// User, Product, Order, callUserService, and callProductService are assumed
// to be defined elsewhere in the service.

// InitTracer registers a global tracer provider that exports spans to Jaeger.
// The caller should shut the returned provider down on exit to flush spans.
func InitTracer(serviceName, jaegerEndpoint string) (*tracesdk.TracerProvider, error) {
	// Create Jaeger exporter
	exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerEndpoint)))
	if err != nil {
		return nil, err
	}

	// Create tracer provider
	tp := tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exporter),
		tracesdk.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(serviceName),
		)),
	)

	otel.SetTracerProvider(tp)

	return tp, nil
}

// Example: Tracing HTTP handler
func TracedHandler(w http.ResponseWriter, r *http.Request) {
	tracer := otel.Tracer("order-service")

	ctx, span := tracer.Start(r.Context(), "HandleOrder")
	defer span.End()

	// Add attributes
	span.SetAttributes(
		attribute.String("user.id", "user-123"),
		attribute.String("order.id", "order-456"),
	)

	// Call other services with traced context
	user, err := getUserWithTrace(ctx, "user-123")
	if err != nil {
		span.RecordError(err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	product, err := getProductWithTrace(ctx, "product-789")
	if err != nil {
		span.RecordError(err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Process order
	order := processOrder(ctx, user, product)

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(order)
}

func getUserWithTrace(ctx context.Context, userID string) (*User, error) {
	tracer := otel.Tracer("order-service")
	ctx, span := tracer.Start(ctx, "GetUser")
	defer span.End()

	span.SetAttributes(attribute.String("user.id", userID))

	// Make HTTP request with trace context
	// Headers will include trace information
	user, err := callUserService(ctx, userID)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}

	return user, nil
}

func getProductWithTrace(ctx context.Context, productID string) (*Product, error) {
	tracer := otel.Tracer("order-service")
	ctx, span := tracer.Start(ctx, "GetProduct")
	defer span.End()

	span.SetAttributes(attribute.String("product.id", productID))

	product, err := callProductService(ctx, productID)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}

	return product, nil
}

func processOrder(ctx context.Context, user *User, product *Product) *Order {
	tracer := otel.Tracer("order-service")
	_, span := tracer.Start(ctx, "ProcessOrder")
	defer span.End()

	// Business logic
	order := &Order{
		ID:        "order-new",
		UserID:    user.ID,
		ProductID: product.ID,
	}

	return order
}

Practice Exercises

Exercise 1: E-Commerce Microservices Architecture

🎯 Learning Objectives:

  • Design and implement a complete microservices architecture from scratch
  • Understand service boundaries and responsibility separation
  • Implement inter-service communication patterns
  • Set up service discovery and API gateway patterns
  • Apply microservices design patterns and best practices

⏱️ Time Estimate: 180-240 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: A retail startup needs to build a scalable e-commerce platform that can handle Black Friday traffic spikes. Microservices architecture enables independent scaling, deployment, and fault isolation across different business domains.

Task: Create a production-ready e-commerce system using microservices architecture that demonstrates core patterns and real-world implementation challenges.

Core Services to Implement:

  • User Service: Authentication, authorization, profile management, JWT token generation
  • Product Service: Product catalog, inventory management, search and filtering
  • Order Service: Order processing workflow, order status tracking, order history
  • Payment Service: Payment processing, payment method management, transaction logging
  • Notification Service: Email notifications, SMS alerts, push notifications
  • API Gateway: Request routing, authentication, rate limiting, request aggregation

Infrastructure Components:

  • Service Discovery: Consul integration with health checks and service registration
  • Inter-Service Communication: gRPC for synchronous calls, message queues for async
  • Circuit Breakers: Fault tolerance and graceful degradation
  • Health Checks: Comprehensive health monitoring for all services
  • Distributed Tracing: Request tracking across service boundaries
  • Configuration Management: Externalized configuration with environment overrides

Advanced Features:

  • Database per service pattern with proper data isolation
  • Event-driven communication for eventual consistency
  • API versioning and backward compatibility
  • Load testing and performance optimization
  • Monitoring and observability setup
  • Container orchestration with Docker Compose

Solution Outline

Architecture:
- API Gateway
- User Service
- Product Service
- Order Service
- Payment Service
- Notification Service
- Consul
- Jaeger

Communication Flow:
1. Client -> API Gateway
2. API Gateway -> User/Product/Order Services
3. Order Service -> Payment Service
4. Payment Service -> Notification Service

Each service:
- gRPC server implementation
- HTTP health endpoints
- Consul registration
- OpenTelemetry tracing
- Circuit breakers for external calls
- Structured logging

Exercise 2: Distributed Transactions with Saga Pattern

🎯 Learning Objectives:

  • Understand the challenges of distributed transactions in microservices
  • Implement saga pattern for managing long-running business transactions
  • Design compensation actions for rolling back failed operations
  • Compare saga orchestration and choreography patterns
  • Handle transaction state persistence and recovery

⏱️ Time Estimate: 120-150 minutes
📊 Difficulty: Advanced
🌍 Real-World Context: An e-commerce platform needs to handle complex order processing that spans multiple services. Traditional ACID transactions don't work across services, requiring saga pattern for maintaining data consistency.

Task: Implement a comprehensive saga pattern solution for order processing that ensures data consistency across multiple microservices without using distributed transactions.

Core Saga Components:

  • Saga Orchestrator: Central coordinator managing transaction lifecycle
  • Compensation Actions: Rollback operations for each transaction step
  • State Management: Persistent saga state tracking and recovery
  • Timeout Handling: Automatic rollback for long-running operations
  • Retry Logic: Configurable retry policies for failed operations

Order Processing Workflow:

  1. Inventory Reservation: Reserve items and lock stock
  2. Payment Validation: Validate payment method and process payment
  3. Order Creation: Create order record with status tracking
  4. Shipping Arrangement: Schedule shipping and generate tracking
  5. Notification Dispatch: Send confirmation emails and SMS
  6. Loyalty Points: Award customer loyalty points

Failure Scenarios:

  • Payment gateway timeout
  • Inventory out of stock after reservation
  • Shipping service unavailable
  • Email delivery failures
  • Loyalty service errors

Advanced Patterns:

  • Event-driven saga choreography
  • Event sourcing for saga events
  • Saga monitoring and observability
  • Manual intervention for failed sagas
  • Performance optimization and batch processing

Solution: See the saga implementation in this article.

Extend the saga example to include:

  • Persistent saga state
  • Saga coordinator service
  • Event sourcing for saga steps
  • Monitoring dashboard
  • Retry logic for failed steps
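As a starting point for the extensions above, the core of an orchestrated saga can be sketched in a few lines. This is a simplified illustration rather than the article's implementation: the `Step` type, `runSaga`, and the step names are hypothetical, state is kept in memory only, and `errors.Join` assumes Go 1.20 or newer.

```go
package main

import (
	"errors"
	"fmt"
)

// errShippingUnavailable is a stand-in failure for the demo.
var errShippingUnavailable = errors.New("shipping unavailable")

// Step pairs a forward action with the compensation that undoes it.
type Step struct {
	Name       string
	Action     func() error
	Compensate func() error
}

// runSaga executes steps in order. On the first failure it runs the
// compensations of the already completed steps in reverse order.
func runSaga(steps []Step) error {
	var completed []Step
	for _, step := range steps {
		if err := step.Action(); err != nil {
			for i := len(completed) - 1; i >= 0; i-- {
				if cerr := completed[i].Compensate(); cerr != nil {
					// A real orchestrator would persist this for retry
					// or manual intervention.
					err = errors.Join(err, fmt.Errorf("compensate %s: %w", completed[i].Name, cerr))
				}
			}
			return fmt.Errorf("saga failed at %s: %w", step.Name, err)
		}
		completed = append(completed, step)
	}
	return nil
}

func main() {
	var trail []string
	record := func(msg string) func() error {
		return func() error { trail = append(trail, msg); return nil }
	}
	steps := []Step{
		{"reserve-inventory", record("reserve"), record("release")},
		{"charge-payment", record("charge"), record("refund")},
		{"arrange-shipping", func() error { return errShippingUnavailable }, record("cancel-shipping")},
	}
	fmt.Println(runSaga(steps))
	fmt.Println(trail) // forward actions, then compensations newest first
}
```

Only steps that completed are compensated, so the failed shipping step is never "cancelled". Persisting `completed` after every step is what would let a restarted orchestrator resume or roll back.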

Exercise 3: Service Discovery and Load Balancing

🎯 Learning Objectives:

  • Build a comprehensive service discovery system from scratch
  • Implement health checking mechanisms with configurable intervals
  • Design load balancing algorithms for optimal traffic distribution
  • Create service registration and deregistration workflows
  • Handle service metadata and versioning for blue-green deployments

⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Intermediate
🌍 Real-World Context: A cloud-native application with dozens of microservices needs dynamic service discovery to handle container orchestration, auto-scaling, and blue-green deployments. Service discovery enables services to find and communicate with each other without hardcoded configurations.

Task: Create a production-ready service discovery system that manages service registration, health monitoring, and intelligent load balancing across dynamic service instances.

Core Discovery Components:

  • Service Registry: Central repository of available service instances
  • Health Checker: Active and passive health monitoring with custom endpoints
  • Load Balancer: Selectable algorithms (see Load Balancing Algorithms below)
  • Service Watcher: Real-time notifications for service changes
  • Metadata Manager: Service versioning, tags, and custom attributes

Health Monitoring Features:

  • Configurable health check intervals and timeouts
  • HTTP, TCP, and gRPC health check protocols
  • Gradual traffic drain for service shutdowns
  • Automatic service deregistration for unhealthy instances
  • Circuit breaker integration for failing services

Load Balancing Algorithms:

  • Round Robin: Even distribution across healthy instances
  • Weighted Round Robin: Based on instance capacity or metadata
  • Least Connections: Route to instance with fewest active connections
  • Random: Simple random selection for testing
  • IP Hash: Session affinity for stateful services
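Two of the algorithms listed above, least connections and IP hash, can be sketched as follows. The `Balancer` and `Backend` types are hypothetical names for this example, and the sketch assumes the balancer always holds at least one backend.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// Backend is one instance behind the balancer.
type Backend struct {
	Addr   string
	active int // in-flight requests, guarded by Balancer.mu
}

// Balancer holds a fixed, non-empty set of backends.
type Balancer struct {
	mu       sync.Mutex
	backends []*Backend
}

func NewBalancer(addrs ...string) *Balancer {
	b := &Balancer{}
	for _, a := range addrs {
		b.backends = append(b.backends, &Backend{Addr: a})
	}
	return b
}

// LeastConnections picks the backend with the fewest in-flight requests and
// returns a release function the caller must invoke when the request ends.
func (b *Balancer) LeastConnections() (addr string, release func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	best := b.backends[0]
	for _, be := range b.backends[1:] {
		if be.active < best.active {
			best = be
		}
	}
	best.active++
	return best.Addr, func() {
		b.mu.Lock()
		best.active--
		b.mu.Unlock()
	}
}

// IPHash maps a client IP to a stable backend (session affinity).
func (b *Balancer) IPHash(clientIP string) string {
	h := fnv.New32a()
	h.Write([]byte(clientIP))
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.backends[int(h.Sum32()%uint32(len(b.backends)))].Addr
}

func main() {
	lb := NewBalancer("10.0.0.1:9000", "10.0.0.2:9000")
	addr, release := lb.LeastConnections()
	fmt.Println("least connections picked", addr)
	release()
	fmt.Println("client 192.168.1.5 maps to", lb.IPHash("192.168.1.5"))
}
```

Note that plain modulo hashing remaps most clients whenever the backend set changes; consistent hashing is the usual remedy when that matters.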

Advanced Features:

  • Service versioning and traffic splitting
  • Geographic and region-based routing
  • Service dependencies and topology mapping
  • Performance metrics and analytics
  • Integration with container orchestrators
  • DNS and API-based discovery interfaces

Solution

// run
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"sort"
	"sync"
	"time"
)

// Service represents a registered service instance
type Service struct {
	ID       string            `json:"id"`
	Name     string            `json:"name"`
	Address  string            `json:"address"`
	Port     int               `json:"port"`
	Version  string            `json:"version"`
	Tags     []string          `json:"tags"`
	Metadata map[string]string `json:"metadata"`
	Health   HealthStatus      `json:"health"`
	LastSeen time.Time         `json:"last_seen"`
}

type HealthStatus string

const (
	HealthHealthy   HealthStatus = "healthy"
	HealthUnhealthy HealthStatus = "unhealthy"
	HealthUnknown   HealthStatus = "unknown"
)

// ServiceRegistry manages service registration and discovery
type ServiceRegistry struct {
	mu       sync.RWMutex
	services map[string]map[string]*Service // service-name -> service-id -> service
	watchers map[string][]chan *ServiceEvent
	rrNext   map[string]int // service-name -> next round-robin index
}

type ServiceEvent struct {
	Type    string   `json:"type"` // "register", "deregister", "health_change"
	Service *Service `json:"service"`
}

func NewServiceRegistry() *ServiceRegistry {
	registry := &ServiceRegistry{
		services: make(map[string]map[string]*Service),
		watchers: make(map[string][]chan *ServiceEvent),
		rrNext:   make(map[string]int),
	}

	// Start background health check
	go registry.healthCheckLoop()

	return registry
}

// Register a new service instance
func (sr *ServiceRegistry) Register(service *Service) error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	if sr.services[service.Name] == nil {
		sr.services[service.Name] = make(map[string]*Service)
	}

	service.LastSeen = time.Now()
	service.Health = HealthHealthy
	sr.services[service.Name][service.ID] = service

	log.Printf("Registered service: %s (%s) at %s:%d", service.Name, service.ID, service.Address, service.Port)

	// Notify watchers
	sr.notifyWatchers(service.Name, &ServiceEvent{
		Type:    "register",
		Service: service,
	})

	return nil
}

// Deregister a service instance
func (sr *ServiceRegistry) Deregister(serviceName, serviceID string) error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	if instances, ok := sr.services[serviceName]; ok {
		if service, ok := instances[serviceID]; ok {
			delete(instances, serviceID)

			log.Printf("Deregistered service: %s/%s", serviceName, serviceID)

			// Notify watchers
			sr.notifyWatchers(serviceName, &ServiceEvent{
				Type:    "deregister",
				Service: service,
			})

			return nil
		}
	}

	return fmt.Errorf("service not found: %s/%s", serviceName, serviceID)
}

// Discover returns all healthy instances of a service, sorted by ID
func (sr *ServiceRegistry) Discover(serviceName string) ([]*Service, error) {
	sr.mu.RLock()
	defer sr.mu.RUnlock()

	instances, ok := sr.services[serviceName]
	if !ok || len(instances) == 0 {
		return nil, fmt.Errorf("no instances found for service: %s", serviceName)
	}

	var healthy []*Service
	for _, service := range instances {
		if service.Health == HealthHealthy {
			// Return a copy so callers never race with the health checker
			snapshot := *service
			healthy = append(healthy, &snapshot)
		}
	}

	if len(healthy) == 0 {
		return nil, fmt.Errorf("no healthy instances found for service: %s", serviceName)
	}

	// Map iteration order is random; sort so callers see a stable order
	sort.Slice(healthy, func(i, j int) bool { return healthy[i].ID < healthy[j].ID })

	return healthy, nil
}

// GetInstance returns a single instance using load balancing
func (sr *ServiceRegistry) GetInstance(serviceName string, strategy string) (*Service, error) {
	instances, err := sr.Discover(serviceName)
	if err != nil {
		return nil, err
	}

	switch strategy {
	case "round-robin":
		// Discover returns a stable order, so a per-service counter
		// cycles through the healthy instances one by one
		sr.mu.Lock()
		idx := sr.rrNext[serviceName] % len(instances)
		sr.rrNext[serviceName] = idx + 1
		sr.mu.Unlock()
		return instances[idx], nil
	case "random":
		return instances[rand.Intn(len(instances))], nil
	default:
		return instances[0], nil
	}
}

// Heartbeat updates the last seen time for a service
func (sr *ServiceRegistry) Heartbeat(serviceName, serviceID string) error {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	if instances, ok := sr.services[serviceName]; ok {
		if service, ok := instances[serviceID]; ok {
			service.LastSeen = time.Now()
			if service.Health != HealthHealthy {
				service.Health = HealthHealthy
				log.Printf("Service %s/%s is now healthy", serviceName, serviceID)

				// Notify watchers
				sr.notifyWatchers(serviceName, &ServiceEvent{
					Type:    "health_change",
					Service: service,
				})
			}
			return nil
		}
	}

	return fmt.Errorf("service not found: %s/%s", serviceName, serviceID)
}

// Watch returns a channel that receives service events
func (sr *ServiceRegistry) Watch(serviceName string) <-chan *ServiceEvent {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	ch := make(chan *ServiceEvent, 10)
	sr.watchers[serviceName] = append(sr.watchers[serviceName], ch)

	return ch
}

// notifyWatchers must be called with sr.mu held
func (sr *ServiceRegistry) notifyWatchers(serviceName string, event *ServiceEvent) {
	for _, ch := range sr.watchers[serviceName] {
		select {
		case ch <- event:
		default:
			// Channel full, skip
		}
	}
}

// Health check loop
func (sr *ServiceRegistry) healthCheckLoop() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		sr.mu.Lock()

		for serviceName, instances := range sr.services {
			for serviceID, service := range instances {
				// Check if service is stale
				if time.Since(service.LastSeen) > 30*time.Second {
					if service.Health != HealthUnhealthy {
						service.Health = HealthUnhealthy
						log.Printf("Service %s/%s is unhealthy", serviceName, serviceID)

						// Notify watchers
						sr.notifyWatchers(serviceName, &ServiceEvent{
							Type:    "health_change",
							Service: service,
						})
					}

					// Remove if unhealthy for too long
					if time.Since(service.LastSeen) > 60*time.Second {
						delete(instances, serviceID)
						log.Printf("Removed stale service: %s/%s", serviceName, serviceID)

						sr.notifyWatchers(serviceName, &ServiceEvent{
							Type:    "deregister",
							Service: service,
						})
					}
				}
			}
		}

		sr.mu.Unlock()
	}
}

// HTTP API for service registry
type RegistryAPI struct {
	registry *ServiceRegistry
}

func NewRegistryAPI(registry *ServiceRegistry) *RegistryAPI {
	return &RegistryAPI{registry: registry}
}

func (api *RegistryAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.URL.Path {
	case "/register":
		api.handleRegister(w, r)
	case "/deregister":
		api.handleDeregister(w, r)
	case "/discover":
		api.handleDiscover(w, r)
	case "/heartbeat":
		api.handleHeartbeat(w, r)
	default:
		http.NotFound(w, r)
	}
}

func (api *RegistryAPI) handleRegister(w http.ResponseWriter, r *http.Request) {
	var service Service
	if err := json.NewDecoder(r.Body).Decode(&service); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if service.Name == "" || service.ID == "" {
		http.Error(w, "name and id are required", http.StatusBadRequest)
		return
	}

	if err := api.registry.Register(&service); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(service)
}

func (api *RegistryAPI) handleDeregister(w http.ResponseWriter, r *http.Request) {
	serviceName := r.URL.Query().Get("name")
	serviceID := r.URL.Query().Get("id")

	if err := api.registry.Deregister(serviceName, serviceID); err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	w.WriteHeader(http.StatusOK)
}

func (api *RegistryAPI) handleDiscover(w http.ResponseWriter, r *http.Request) {
	serviceName := r.URL.Query().Get("name")

	instances, err := api.registry.Discover(serviceName)
	if err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(instances)
}

func (api *RegistryAPI) handleHeartbeat(w http.ResponseWriter, r *http.Request) {
	serviceName := r.URL.Query().Get("name")
	serviceID := r.URL.Query().Get("id")

	if err := api.registry.Heartbeat(serviceName, serviceID); err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	w.WriteHeader(http.StatusOK)
}

// Example service that registers itself
type ExampleService struct {
	id       string
	name     string
	port     int
	registry *ServiceRegistry
	stopCh   chan struct{}
}

func NewExampleService(name string, port int, registry *ServiceRegistry) *ExampleService {
	return &ExampleService{
		// Include the port so instances created in the same instant get distinct IDs
		id:       fmt.Sprintf("%s-%d-%d", name, port, time.Now().UnixNano()),
		name:     name,
		port:     port,
		registry: registry,
		stopCh:   make(chan struct{}),
	}
}

func (s *ExampleService) Start() {
	// Register service
	service := &Service{
		ID:      s.id,
		Name:    s.name,
		Address: "localhost",
		Port:    s.port,
		Version: "1.0.0",
		Tags:    []string{"production"},
		Metadata: map[string]string{
			"region": "us-east-1",
		},
	}

	if err := s.registry.Register(service); err != nil {
		log.Printf("Service %s failed to register: %v", s.name, err)
		return
	}

	// Send heartbeats
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if err := s.registry.Heartbeat(s.name, s.id); err != nil {
					log.Printf("Heartbeat failed for %s/%s: %v", s.name, s.id, err)
				}
			case <-s.stopCh:
				return
			}
		}
	}()

	// Simple HTTP server setup
	// See also: [Web Development](/02-standard-library/02-web-development) for comprehensive HTTP server patterns
	// Each instance gets its own mux: registering "/health" on the shared
	// http.DefaultServeMux more than once would panic.
	mux := http.NewServeMux()
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})

	log.Printf("Service %s starting on port %d", s.name, s.port)
	if err := http.ListenAndServe(fmt.Sprintf(":%d", s.port), mux); err != nil {
		log.Printf("Service %s stopped: %v", s.name, err)
	}
}

func (s *ExampleService) Stop() {
	close(s.stopCh)
	if err := s.registry.Deregister(s.name, s.id); err != nil {
		log.Printf("Deregister failed for %s/%s: %v", s.name, s.id, err)
	}
}

func main() {
	registry := NewServiceRegistry()

	// Start registry API
	api := NewRegistryAPI(registry)
	go func() {
		log.Println("Service Registry API starting on :8500")
		log.Fatal(http.ListenAndServe(":8500", api))
	}()

	// Start multiple instances of a service
	for i := 0; i < 3; i++ {
		service := NewExampleService("api-service", 9000+i, registry)
		go service.Start()
	}

	// Wait for services to register
	time.Sleep(2 * time.Second)

	// Demonstrate service discovery
	instances, err := registry.Discover("api-service")
	if err != nil {
		log.Fatalf("Discovery failed: %v", err)
	}
	fmt.Printf("\nDiscovered %d instances of api-service:\n", len(instances))
	for _, instance := range instances {
		fmt.Printf("  - %s at %s:%d (%s)\n",
			instance.ID, instance.Address, instance.Port, instance.Health)
	}

	// Demonstrate round-robin load balancing
	for i := 0; i < 3; i++ {
		if instance, err := registry.GetInstance("api-service", "round-robin"); err == nil {
			fmt.Printf("Round-robin pick %d: %s:%d\n", i+1, instance.Address, instance.Port)
		}
	}

	// Watch for service changes
	watch := registry.Watch("api-service")
	go func() {
		for event := range watch {
			fmt.Printf("Service event: %s - %s/%s\n", event.Type, event.Service.Name, event.Service.ID)
		}
	}()

	fmt.Println("\nService Registry running. Press Ctrl+C to exit.")
	fmt.Println("Test endpoints:")
	fmt.Println("  curl 'http://localhost:8500/discover?name=api-service'")
	fmt.Println("  curl 'http://localhost:8500/heartbeat?name=api-service&id=<service-id>'")

	select {}
}

Key Features:

  1. Service Registration: Register service instances with metadata
  2. Health Checks: Automatic health monitoring via heartbeats
  3. Service Discovery: Find all healthy instances by service name
  4. Load Balancing: Round-robin and random strategies
  5. Automatic Cleanup: Remove stale/unhealthy services
  6. Watch Mechanism: Subscribe to service changes in real-time
  7. HTTP API: RESTful API for registration and discovery
  8. Metadata Support: Version, tags, and custom metadata
  9. Concurrency Safe: Registry state is guarded by a read-write mutex
  10. Heartbeat-Based Liveness: Instances that stop sending heartbeats are marked unhealthy after 30 seconds and removed after 60 seconds

Exercise 4: Distributed Tracing and Observability

🎯 Learning Objectives:

  • Implement comprehensive distributed tracing across service boundaries
  • Understand trace context propagation and span relationships
  • Create meaningful spans for database queries, external calls, and business operations
  • Set up observability stack with Jaeger and OpenTelemetry
  • Analyze performance bottlenecks and service dependencies

⏱️ Time Estimate: 90-120 minutes
📊 Difficulty: Intermediate
🌍 Real-World Context: A microservices application experiencing performance issues needs distributed tracing to identify bottlenecks, understand service dependencies, and debug requests across multiple services. Without proper tracing, debugging production issues becomes nearly impossible.

Task: Build a complete distributed tracing system that provides end-to-end visibility into request flows across microservices with meaningful instrumentation and analysis capabilities.

Core Tracing Components:

  • Trace Propagation: Context propagation across HTTP and gRPC boundaries
  • Span Creation: Automatic and manual span instrumentation
  • OpenTelemetry Integration: Vendor-neutral tracing implementation
  • Jaeger Backend: Trace collection, storage, and visualization
  • Custom Attributes: Business-relevant metadata for debugging

Instrumentation Targets:

  • HTTP Requests: Incoming and outgoing HTTP calls with full context
  • Database Operations: Query execution, connection pool usage, slow queries
  • External API Calls: Third-party service interactions with retry tracking
  • Message Processing: Queue operations and async processing
  • Business Operations: Order processing, payment flows, user interactions

Trace Analysis Features:

  • Service dependency mapping and topology visualization
  • Performance bottleneck identification and hot path analysis
  • Error tracking and exception correlation across services
  • Custom dashboards for business-critical operations
  • Alerting on performance degradation and error rates

Advanced Patterns:

  • Trace sampling strategies for high-traffic systems
  • Baggage propagation for cross-service context
  • Custom span events and structured logging integration
  • Performance regression detection
  • SLA monitoring and compliance tracking
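One of the sampling strategies mentioned above, deterministic ratio-based head sampling, can be sketched as follows. The function name `shouldSample` and the choice of FNV hashing are assumptions for this example, not the algorithm used by OpenTelemetry or Jaeger.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shouldSample makes a deterministic sampling decision from the trace ID.
// The same ID always yields the same answer, so every service that sees
// the ID keeps or drops the whole trace together.
func shouldSample(traceID string, ratio float64) bool {
	if ratio >= 1 {
		return true
	}
	if ratio <= 0 {
		return false
	}
	h := fnv.New64a()
	h.Write([]byte(traceID))
	// Use the top 53 bits of the hash to get a value in [0, 1).
	point := float64(h.Sum64()>>11) / float64(1<<53)
	return point < ratio
}

func main() {
	sampled := 0
	for i := 0; i < 10000; i++ {
		if shouldSample(fmt.Sprintf("trace-%d", i), 0.1) {
			sampled++
		}
	}
	fmt.Printf("sampled %d of 10000 traces at ratio 0.1\n", sampled)
}
```

Because the decision depends only on the trace ID, a trace kept at a low ratio is also kept at every higher ratio, which makes it safe to raise the ratio on one service without producing partial traces.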

Solution

// run
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"sync"
	"time"
)

// ctxKey is a private type for context keys so they cannot collide with
// keys set by other packages
type ctxKey string

const (
	keyTraceID      ctxKey = "trace_id"
	keySpanID       ctxKey = "span_id"
	keyParentSpanID ctxKey = "parent_span_id"
	keySpan         ctxKey = "span"
)

// Span represents a unit of work in a trace
type Span struct {
	TraceID    string                 `json:"trace_id"`
	SpanID     string                 `json:"span_id"`
	ParentID   string                 `json:"parent_id,omitempty"`
	Name       string                 `json:"name"`
	StartTime  time.Time              `json:"start_time"`
	EndTime    time.Time              `json:"end_time,omitempty"`
	Duration   time.Duration          `json:"duration,omitempty"`
	Attributes map[string]interface{} `json:"attributes"`
	Events     []SpanEvent            `json:"events"`
	Status     SpanStatus             `json:"status"`
}

type SpanEvent struct {
	Name       string                 `json:"name"`
	Timestamp  time.Time              `json:"timestamp"`
	Attributes map[string]interface{} `json:"attributes,omitempty"`
}

type SpanStatus struct {
	Code    string `json:"code"` // "OK", "ERROR"
	Message string `json:"message,omitempty"`
}

// Tracer manages span creation and context propagation
type Tracer struct {
	serviceName string

	mu    sync.Mutex // guards spans; handlers finish spans concurrently
	spans []*Span
}

func NewTracer(serviceName string) *Tracer {
	return &Tracer{
		serviceName: serviceName,
		spans:       []*Span{},
	}
}

// StartSpan creates a new span and returns it with a derived context
func (t *Tracer) StartSpan(ctx context.Context, name string) (*Span, context.Context) {
	span := &Span{
		TraceID:   getTraceIDFromContext(ctx),
		SpanID:    generateID(),
		ParentID:  getSpanIDFromContext(ctx),
		Name:      name,
		StartTime: time.Now(),
		Attributes: map[string]interface{}{
			"service.name": t.serviceName,
		},
		Events: []SpanEvent{},
		Status: SpanStatus{Code: "OK"},
	}

	if span.TraceID == "" {
		span.TraceID = generateID()
	}

	// Add span to context
	ctx = context.WithValue(ctx, keyTraceID, span.TraceID)
	ctx = context.WithValue(ctx, keySpanID, span.SpanID)
	ctx = context.WithValue(ctx, keySpan, span)

	return span, ctx
}

// EndSpan finalizes a span
func (t *Tracer) EndSpan(span *Span) {
	span.EndTime = time.Now()
	span.Duration = span.EndTime.Sub(span.StartTime)

	t.mu.Lock()
	t.spans = append(t.spans, span)
	t.mu.Unlock()

	log.Printf("[TRACE] %s - %s (trace=%s span=%s parent=%s duration=%s)",
		t.serviceName, span.Name, shortID(span.TraceID), shortID(span.SpanID),
		shortID(span.ParentID), span.Duration)
}

// shortID truncates an ID for log output without panicking on short input
func shortID(id string) string {
	if len(id) > 8 {
		return id[:8]
	}
	return id
}

// SetSpanAttribute adds a custom attribute to a span
func SetSpanAttribute(span *Span, key string, value interface{}) {
	if span != nil {
		span.Attributes[key] = value
	}
}

// AddSpanEvent adds an event to a span
func AddSpanEvent(span *Span, name string, attributes map[string]interface{}) {
	if span != nil {
		span.Events = append(span.Events, SpanEvent{
			Name:       name,
			Timestamp:  time.Now(),
			Attributes: attributes,
		})
	}
}

// RecordError records an error in a span
func RecordError(span *Span, err error) {
	if span != nil && err != nil {
		span.Status.Code = "ERROR"
		span.Status.Message = err.Error()
		span.Attributes["error"] = true
		span.Attributes["error.message"] = err.Error()

		AddSpanEvent(span, "exception", map[string]interface{}{
			"exception.message": err.Error(),
			"exception.type":    "error",
		})
	}
}

// PropagateContext injects trace context into HTTP headers
func PropagateContext(ctx context.Context, header http.Header) {
	traceID := getTraceIDFromContext(ctx)
	spanID := getSpanIDFromContext(ctx)

	if traceID != "" {
		header.Set("X-Trace-ID", traceID)
	}
	if spanID != "" {
		header.Set("X-Parent-Span-ID", spanID)
	}
}

// ExtractContext extracts trace context from HTTP headers
func ExtractContext(ctx context.Context, header http.Header) context.Context {
	traceID := header.Get("X-Trace-ID")
	parentSpanID := header.Get("X-Parent-Span-ID")

	if traceID != "" {
		ctx = context.WithValue(ctx, keyTraceID, traceID)
	}
	if parentSpanID != "" {
		ctx = context.WithValue(ctx, keyParentSpanID, parentSpanID)
	}

	return ctx
}

// Helper functions
func getTraceIDFromContext(ctx context.Context) string {
	if traceID, ok := ctx.Value(keyTraceID).(string); ok {
		return traceID
	}
	return ""
}

// getSpanIDFromContext prefers the local span and falls back to the
// parent span ID received from the upstream service
func getSpanIDFromContext(ctx context.Context) string {
	if spanID, ok := ctx.Value(keySpanID).(string); ok {
		return spanID
	}
	if parentSpanID, ok := ctx.Value(keyParentSpanID).(string); ok {
		return parentSpanID
	}
	return ""
}

func generateID() string {
	const charset = "abcdef0123456789"
	b := make([]byte, 16)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}

// Instrumented HTTP Client
type InstrumentedClient struct {
	tracer *Tracer
	client *http.Client
}

func NewInstrumentedClient(tracer *Tracer) *InstrumentedClient {
	return &InstrumentedClient{
		tracer: tracer,
		client: &http.Client{Timeout: 10 * time.Second},
	}
}

// Get issues a traced GET request; the caller must close the response body
func (c *InstrumentedClient) Get(ctx context.Context, url string) (*http.Response, error) {
	span, ctx := c.tracer.StartSpan(ctx, "HTTP GET "+url)
	defer c.tracer.EndSpan(span)

	SetSpanAttribute(span, "http.method", "GET")
	SetSpanAttribute(span, "http.url", url)

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		RecordError(span, err)
		return nil, err
	}

	// Propagate trace context
	PropagateContext(ctx, req.Header)

	resp, err := c.client.Do(req)
	if err != nil {
		RecordError(span, err)
		return nil, err
	}

	SetSpanAttribute(span, "http.status_code", resp.StatusCode)

	if resp.StatusCode >= 400 {
		RecordError(span, fmt.Errorf("HTTP error: %d", resp.StatusCode))
	}

	return resp, nil
}

// Example Services
type ServiceA struct {
	tracer *Tracer
	client *InstrumentedClient
}

func NewServiceA() *ServiceA {
	tracer := NewTracer("service-a")
	return &ServiceA{
		tracer: tracer,
		client: NewInstrumentedClient(tracer),
	}
}

func (s *ServiceA) HandleRequest(w http.ResponseWriter, r *http.Request) {
	// Extract trace context from incoming request
	ctx := ExtractContext(r.Context(), r.Header)

	span, ctx := s.tracer.StartSpan(ctx, "ServiceA.HandleRequest")
	defer s.tracer.EndSpan(span)

	SetSpanAttribute(span, "http.method", r.Method)
	SetSpanAttribute(span, "http.path", r.URL.Path)
	SetSpanAttribute(span, "http.remote_addr", r.RemoteAddr)

	// Simulate database query
	dbSpan, _ := s.tracer.StartSpan(ctx, "Database Query")
	SetSpanAttribute(dbSpan, "db.system", "postgresql")
	SetSpanAttribute(dbSpan, "db.query", "SELECT * FROM users WHERE id = $1")

	time.Sleep(20 * time.Millisecond) // Simulate DB query

	AddSpanEvent(dbSpan, "query.executed", map[string]interface{}{
		"rows.affected": 1,
	})

	s.tracer.EndSpan(dbSpan)

	// Call Service B. Pass the request context (not the database span's
	// context) so the HTTP span is a sibling of the database span.
	resp, err := s.client.Get(ctx, "http://localhost:8082/api")
	if err != nil {
		RecordError(span, err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		RecordError(span, err)
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}

	response := map[string]interface{}{
		"service":  "A",
		"trace_id": span.TraceID,
		"data":     string(body),
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

type ServiceB struct {
	tracer *Tracer
}

func NewServiceB() *ServiceB {
	return &ServiceB{
		tracer: NewTracer("service-b"),
	}
}

func (s *ServiceB) HandleRequest(w http.ResponseWriter, r *http.Request) {
	// Extract trace context
	ctx := ExtractContext(r.Context(), r.Header)

	span, ctx := s.tracer.StartSpan(ctx, "ServiceB.HandleRequest")
	defer s.tracer.EndSpan(span)

	SetSpanAttribute(span, "http.method", r.Method)
	SetSpanAttribute(span, "http.path", r.URL.Path)

	// Simulate processing
	processingSpan, _ := s.tracer.StartSpan(ctx, "ProcessData")
	SetSpanAttribute(processingSpan, "data.size", 1024)

	time.Sleep(30 * time.Millisecond) // Simulate processing

	// Simulate cache lookup
	AddSpanEvent(processingSpan, "cache.lookup", map[string]interface{}{
		"cache.hit": true,
		"cache.key": "user:123",
	})

	s.tracer.EndSpan(processingSpan)

	response := map[string]interface{}{
		"service":  "B",
		"trace_id": span.TraceID,
		"message":  "Processed by Service B",
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

func main() {
	serviceA := NewServiceA()
	serviceB := NewServiceB()

	// Service A on port 8081
	go func() {
		mux := http.NewServeMux()
		mux.HandleFunc("/api", serviceA.HandleRequest)
		log.Println("Service A starting on :8081")
		log.Fatal(http.ListenAndServe(":8081", mux))
	}()

	// Service B on port 8082
	go func() {
		mux := http.NewServeMux()
		mux.HandleFunc("/api", serviceB.HandleRequest)
		log.Println("Service B starting on :8082")
		log.Fatal(http.ListenAndServe(":8082", mux))
	}()

	// Wait for services to start
	time.Sleep(1 * time.Second)

	fmt.Println("\nDistributed Tracing Demo")
	fmt.Println("========================")
	fmt.Println("\nTest the traced request:")
	fmt.Println("  curl http://localhost:8081/api")
	fmt.Println("\nTrace propagation:")
	fmt.Println("  Service A -> Database -> Service B")
	fmt.Println("\nWatch the console for trace logs showing:")
	fmt.Println("  - Trace IDs propagating across services")
	fmt.Println("  - Span hierarchy")
	fmt.Println("  - Operation durations")
	fmt.Println("  - Custom attributes and events")

	select {}
}

Key Tracing Concepts:

  1. Trace Context Propagation: TraceID and SpanID passed via HTTP headers
  2. Span Hierarchy: Parent-child relationships between operations
  3. Custom Attributes: Add metadata to spans for debugging
  4. Events: Record significant moments within a span
  5. Error Tracking: Capture and propagate errors through traces
  6. Performance Metrics: Automatic duration tracking
  7. Service Topology: Understand service dependencies
  8. Request Flow: Track requests across multiple services
  9. Instrumentation: Wrap HTTP clients and database calls
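  10. Lightweight Design: Spans are held in memory and logged on completion; a production setup would export them through OpenTelemetry to a backend such as Jaeger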

Trace Visualization:

Request Flow:
ServiceA.HandleRequest
├── Database Query
│   └── query.executed event
└── HTTP GET http://localhost:8082/api
    └── ServiceB.HandleRequest
        └── ProcessData
            └── cache.lookup event
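A hierarchy like the one above can be reconstructed from finished spans using only their span and parent IDs. The following is a minimal sketch: `SpanRecord` and `renderTree` are hypothetical names, siblings are ordered by name for readability, and spans whose parent is unknown are treated as roots.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// SpanRecord is the subset of span fields needed to rebuild the hierarchy.
type SpanRecord struct {
	SpanID   string
	ParentID string
	Name     string
}

// renderTree returns an indented outline of the spans, roots first.
func renderTree(spans []SpanRecord) string {
	known := map[string]bool{}
	for _, s := range spans {
		known[s.SpanID] = true
	}

	children := map[string][]SpanRecord{}
	for _, s := range spans {
		if s.SpanID == "" {
			continue // cannot place a span without an ID
		}
		parent := s.ParentID
		if !known[parent] {
			parent = "" // parent not in this batch: treat as a root
		}
		children[parent] = append(children[parent], s)
	}

	var sb strings.Builder
	var walk func(parent string, depth int)
	walk = func(parent string, depth int) {
		kids := children[parent]
		sort.Slice(kids, func(i, j int) bool { return kids[i].Name < kids[j].Name })
		for _, k := range kids {
			sb.WriteString(strings.Repeat("  ", depth) + k.Name + "\n")
			walk(k.SpanID, depth+1)
		}
	}
	walk("", 0)
	return sb.String()
}

func main() {
	fmt.Print(renderTree([]SpanRecord{
		{SpanID: "c", ParentID: "b", Name: "ServiceB.HandleRequest"},
		{SpanID: "a", ParentID: "", Name: "ServiceA.HandleRequest"},
		{SpanID: "b", ParentID: "a", Name: "HTTP GET"},
		{SpanID: "d", ParentID: "a", Name: "Database Query"},
	}))
}
```

Tracing backends perform the same grouping after collecting spans from every service, which is why each span only needs to carry its own ID, its parent's ID, and the shared trace ID.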

Exercise 5: Resilience Patterns and Circuit Breakers

🎯 Learning Objectives:

  • Implement circuit breaker pattern for fault tolerance
  • Design fallback mechanisms for graceful degradation
  • Understand failure detection and recovery patterns
  • Build retry logic with exponential backoff
  • Create comprehensive resilience monitoring and alerting

⏱️ Time Estimate: 75-90 minutes
📊 Difficulty: Intermediate
🌍 Real-World Context: A microservices application experiencing cascading failures needs circuit breakers to prevent single service failures from bringing down the entire system. Proper resilience patterns ensure system stability during partial outages and performance degradation.

Task: Implement a comprehensive resilience patterns library that protects microservices from cascading failures and provides graceful degradation when dependencies are unavailable.

Core Resilience Patterns:

  • Circuit Breaker: Prevent calls to failing services with automatic recovery
  • Retry Mechanism: Configurable retry policies with exponential backoff
  • Fallback Handlers: Alternative responses when primary service fails
  • Bulkhead Isolation: Resource isolation to prevent resource exhaustion
  • Timeout Management: Request timeout handling and cancellation
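The retry mechanism from the list above can be sketched as a capped exponential backoff. `Policy`, `backoffDelay`, and `retry` are hypothetical names; the sketch assumes `MaxAttempts` is at least 1 and leaves out jitter and context cancellation, both of which a real client would normally add.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a stand-in error for the demo.
var errTransient = errors.New("transient failure")

// Policy configures the retry loop.
type Policy struct {
	MaxAttempts int           // total calls, including the first one
	Base        time.Duration // delay before the first retry
	Max         time.Duration // upper bound for any single delay
}

// backoffDelay returns base * 2^attempt, capped at maxDelay.
// attempt is zero-based: 0 is the delay before the first retry.
func backoffDelay(attempt int, base, maxDelay time.Duration) time.Duration {
	d := base
	if d > maxDelay {
		return maxDelay
	}
	for i := 0; i < attempt; i++ {
		if d > maxDelay/2 {
			return maxDelay // doubling would exceed the cap
		}
		d *= 2
	}
	return d
}

// retry calls fn until it succeeds or the attempts are exhausted,
// sleeping with exponential backoff between attempts.
func retry(p Policy, fn func() error) error {
	var err error
	for attempt := 0; attempt < p.MaxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt < p.MaxAttempts-1 {
			time.Sleep(backoffDelay(attempt, p.Base, p.Max))
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", p.MaxAttempts, err)
}

func main() {
	calls := 0
	err := retry(Policy{MaxAttempts: 4, Base: 10 * time.Millisecond, Max: 50 * time.Millisecond}, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

Retries should only wrap idempotent operations, and they belong inside the circuit breaker rather than around it, so that a tripped breaker stops the retry loop instead of being hammered by it.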

Circuit Breaker States:

  • Closed: Normal operation with failure tracking
  • Open: Rejecting all requests with immediate fallback
  • Half-Open: Limited requests to test service recovery
  • Automatic Transitions: State changes based on success/failure rates

Monitoring and Observability:

  • Real-time circuit breaker state monitoring
  • Success/failure rate metrics and trends
  • Response time distributions and percentiles
  • Fallback usage tracking and effectiveness
  • Alert thresholds for performance degradation

Advanced Features:

  • Circuit breaker configuration per service endpoint
  • Dynamic threshold adjustment based on traffic patterns
  • Integration with service discovery for automatic setup
  • Circuit breaker dashboards and operational insights
  • Load shedding and admission control during high load
  • Distributed circuit breaker coordination
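Bulkhead isolation and load shedding can both be sketched with a buffered channel used as a counting semaphore. `Bulkhead` and `ErrBulkheadFull` are hypothetical names for this example; the sketch rejects immediately when no slot is free instead of queueing the caller.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrBulkheadFull is returned when every slot is in use.
var ErrBulkheadFull = errors.New("bulkhead full")

// Bulkhead limits the number of concurrent calls to one dependency so a
// slow dependency cannot exhaust resources shared with other calls.
type Bulkhead struct {
	slots chan struct{}
}

func NewBulkhead(maxConcurrent int) *Bulkhead {
	return &Bulkhead{slots: make(chan struct{}, maxConcurrent)}
}

// Execute runs fn if a slot is free and sheds the call otherwise.
func (b *Bulkhead) Execute(fn func() error) error {
	select {
	case b.slots <- struct{}{}:
		defer func() { <-b.slots }() // free the slot when fn returns
		return fn()
	default:
		return ErrBulkheadFull
	}
}

func main() {
	payments := NewBulkhead(1)

	err := payments.Execute(func() error {
		// While this call holds the only slot, a second call is rejected.
		nested := payments.Execute(func() error { return nil })
		fmt.Println("nested call:", nested)
		return nil
	})
	fmt.Println("outer call:", err)
}
```

Giving each downstream dependency its own bulkhead keeps a stall in one of them from starving calls to the others.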
Solution
// run
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

// CircuitState represents the state of the circuit breaker
type CircuitState string

const (
	StateClosed   CircuitState = "CLOSED"    // Normal operation
	StateOpen     CircuitState = "OPEN"      // Failing, reject requests
	StateHalfOpen CircuitState = "HALF_OPEN" // Testing if service recovered
)

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
	name           string
	maxFailures    int
	resetTimeout   time.Duration
	halfOpenMaxReq int

	mu               sync.RWMutex
	state            CircuitState
	failures         int // consecutive failures; reset on any success
	successes        int // successes in the current half-open window
	lastFailureTime  time.Time
	lastStateChange  time.Time
	halfOpenRequests int // requests admitted in the current half-open window

	// Metrics
	totalRequests  int64 // requests admitted (rejected requests are not counted)
	totalSuccesses int64
	totalFailures  int64
	totalRejected  int64
}

// CircuitBreakerConfig configures a circuit breaker; unset fields get defaults
type CircuitBreakerConfig struct {
	Name           string
	MaxFailures    int           // Number of consecutive failures before opening
	ResetTimeout   time.Duration // Time before attempting half-open
	HalfOpenMaxReq int           // Max requests allowed in half-open state
}

// NewCircuitBreaker creates a breaker in the CLOSED state
func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker {
	if config.MaxFailures <= 0 {
		config.MaxFailures = 5
	}
	if config.ResetTimeout <= 0 {
		config.ResetTimeout = 60 * time.Second
	}
	if config.HalfOpenMaxReq <= 0 {
		config.HalfOpenMaxReq = 3
	}

	return &CircuitBreaker{
		name:            config.Name,
		maxFailures:     config.MaxFailures,
		resetTimeout:    config.ResetTimeout,
		halfOpenMaxReq:  config.HalfOpenMaxReq,
		state:           StateClosed,
		lastStateChange: time.Now(),
	}
}

// Execute runs the function with circuit breaker protection
func (cb *CircuitBreaker) Execute(fn func() error) error {
	cb.mu.Lock()

	// Check if we should attempt recovery
	if cb.state == StateOpen {
		if time.Since(cb.lastFailureTime) > cb.resetTimeout {
			cb.setState(StateHalfOpen)
			log.Printf("[%s] Circuit breaker entering HALF-OPEN state", cb.name)
		}
	}

	// Check state
	switch cb.state {
	case StateOpen:
		cb.totalRejected++
		cb.mu.Unlock()
		return ErrCircuitOpen

	case StateHalfOpen:
		if cb.halfOpenRequests >= cb.halfOpenMaxReq {
			cb.totalRejected++
			cb.mu.Unlock()
			return ErrTooManyRequests
		}
		cb.halfOpenRequests++
	}

	cb.totalRequests++
	cb.mu.Unlock()

	// Execute the function without holding the lock so that slow calls
	// do not block other requests
	err := fn()

	// Record result
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err != nil {
		cb.onFailure()
		return err
	}

	cb.onSuccess()
	return nil
}

// onSuccess records a successful call. The caller must hold cb.mu.
func (cb *CircuitBreaker) onSuccess() {
	cb.totalSuccesses++
	cb.failures = 0

	if cb.state == StateHalfOpen {
		cb.successes++
		if cb.successes >= cb.halfOpenMaxReq {
			cb.setState(StateClosed)
			log.Printf("[%s] Circuit breaker CLOSED after successful recovery", cb.name)
		}
	}
}

// onFailure records a failed call. The caller must hold cb.mu.
func (cb *CircuitBreaker) onFailure() {
	cb.totalFailures++
	cb.failures++
	cb.lastFailureTime = time.Now()

	switch cb.state {
	case StateClosed:
		if cb.failures >= cb.maxFailures {
			cb.setState(StateOpen)
			log.Printf("[%s] Circuit breaker OPENED after %d failures", cb.name, cb.failures)
		}

	case StateHalfOpen:
		cb.setState(StateOpen)
		log.Printf("[%s] Circuit breaker OPENED during recovery attempt", cb.name)
	}
}

// setState changes the state. The caller must hold cb.mu.
func (cb *CircuitBreaker) setState(state CircuitState) {
	cb.state = state
	cb.lastStateChange = time.Now()

	if state == StateHalfOpen {
		cb.halfOpenRequests = 0
		cb.successes = 0
	}
}

// GetState returns the current state
func (cb *CircuitBreaker) GetState() CircuitState {
	cb.mu.RLock()
	defer cb.mu.RUnlock()
	return cb.state
}

// GetMetrics returns circuit breaker metrics
func (cb *CircuitBreaker) GetMetrics() CircuitBreakerMetrics {
	cb.mu.RLock()
	defer cb.mu.RUnlock()

	// Success rate is a percentage of admitted requests
	var successRate float64
	if cb.totalRequests > 0 {
		successRate = float64(cb.totalSuccesses) / float64(cb.totalRequests) * 100
	}

	return CircuitBreakerMetrics{
		Name:            cb.name,
		State:           cb.state,
		TotalRequests:   cb.totalRequests,
		TotalSuccesses:  cb.totalSuccesses,
		TotalFailures:   cb.totalFailures,
		TotalRejected:   cb.totalRejected,
		SuccessRate:     successRate,
		LastStateChange: cb.lastStateChange,
	}
}

type CircuitBreakerMetrics struct {
	Name            string       `json:"name"`
	State           CircuitState `json:"state"`
	TotalRequests   int64        `json:"total_requests"`
	TotalSuccesses  int64        `json:"total_successes"`
	TotalFailures   int64        `json:"total_failures"`
	TotalRejected   int64        `json:"total_rejected"`
	SuccessRate     float64      `json:"success_rate"`
	LastStateChange time.Time    `json:"last_state_change"`
}

var (
	ErrCircuitOpen        = errors.New("circuit breaker is open")
	ErrTooManyRequests    = errors.New("too many requests in half-open state")
	ErrServiceUnavailable = errors.New("service unavailable")
)

// Example HTTP client with circuit breaker
type ResilientHTTPClient struct {
	breaker *CircuitBreaker
	client  *http.Client
	baseURL string
}

func NewResilientHTTPClient(name string, baseURL string) *ResilientHTTPClient {
	return &ResilientHTTPClient{
		breaker: NewCircuitBreaker(CircuitBreakerConfig{
			Name:           name,
			MaxFailures:    3,
			ResetTimeout:   10 * time.Second,
			HalfOpenMaxReq: 2,
		}),
		client:  &http.Client{Timeout: 5 * time.Second},
		baseURL: baseURL,
	}
}

// Get sends a GET request through the circuit breaker. If the breaker
// rejects the call, Get returns the fallback response with a nil error.
func (c *ResilientHTTPClient) Get(ctx context.Context, path string) (string, error) {
	var response string

	err := c.breaker.Execute(func() error {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
		if err != nil {
			return err
		}

		resp, err := c.client.Do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()

		// Only 5xx responses count as failures for the breaker
		if resp.StatusCode >= 500 {
			return fmt.Errorf("server error: %d", resp.StatusCode)
		}

		response = fmt.Sprintf("Status: %d", resp.StatusCode)
		return nil
	})

	if err != nil {
		// Fallback mechanism
		if errors.Is(err, ErrCircuitOpen) || errors.Is(err, ErrTooManyRequests) {
			log.Printf("Using fallback response due to: %v", err)
			return c.fallback(), nil
		}
		return "", err
	}

	return response, nil
}

func (c *ResilientHTTPClient) fallback() string {
	return "Fallback response: Service temporarily unavailable"
}

func (c *ResilientHTTPClient) GetMetrics() CircuitBreakerMetrics {
	return c.breaker.GetMetrics()
}

// Test service that can be toggled between healthy and unhealthy
type TestService struct {
	mu        sync.Mutex
	healthy   bool
	failCount int
}

func NewTestService() *TestService {
	return &TestService{healthy: true}
}

func (s *TestService) SetHealthy(healthy bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.healthy = healthy
	if healthy {
		s.failCount = 0
	}
}

func (s *TestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.mu.Lock()
	healthy := s.healthy
	s.failCount++

	// Auto-recover an unhealthy service once failCount exceeds 5. Note that
	// failCount counts every request since the last reset, not only failed
	// ones, and the request that triggers recovery still gets the error
	// response. The check runs under the lock to avoid a data race.
	recovered := !s.healthy && s.failCount > 5
	if recovered {
		s.healthy = true
		s.failCount = 0
	}
	s.mu.Unlock()

	if recovered {
		log.Println("[TestService] Auto-recovering")
	}

	if !healthy {
		log.Println("[TestService] Returning error response")
		http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
		return
	}

	log.Println("[TestService] Returning success response")
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("OK"))
}

func main() {
	// Start test service
	testService := NewTestService()
	go func() {
		http.Handle("/api", testService)
		log.Println("Test service starting on :9000")
		if err := http.ListenAndServe(":9000", nil); err != nil {
			log.Fatalf("Test service failed: %v", err)
		}
	}()

	time.Sleep(1 * time.Second)

	// Create resilient client
	client := NewResilientHTTPClient("test-service", "http://localhost:9000")

	// Simulate requests
	fmt.Println("\n=== Circuit Breaker Demo ===")
	fmt.Println()

	// Phase 1: Healthy service
	fmt.Println("Phase 1: Service is healthy")
	for i := 0; i < 3; i++ {
		resp, err := client.Get(context.Background(), "/api")
		fmt.Printf("Request %d: %q (err: %v)\n", i+1, resp, err)
		time.Sleep(500 * time.Millisecond)
	}
	fmt.Printf("Circuit state: %s\n\n", client.breaker.GetState())

	// Phase 2: Make service unhealthy
	fmt.Println("Phase 2: Service becomes unhealthy")
	testService.SetHealthy(false)

	for i := 0; i < 5; i++ {
		resp, err := client.Get(context.Background(), "/api")
		fmt.Printf("Request %d: %q (err: %v)\n", i+1, resp, err)
		fmt.Printf("  Circuit state: %s\n", client.breaker.GetState())
		time.Sleep(500 * time.Millisecond)
	}

	// Phase 3: Wait for half-open
	fmt.Println("\nPhase 3: Waiting for circuit to enter HALF-OPEN...")
	time.Sleep(11 * time.Second)

	for i := 0; i < 3; i++ {
		resp, err := client.Get(context.Background(), "/api")
		fmt.Printf("Request %d: %q (err: %v)\n", i+1, resp, err)
		fmt.Printf("  Circuit state: %s\n", client.breaker.GetState())
		time.Sleep(1 * time.Second)
	}

	// Print final metrics
	metrics := client.GetMetrics()
	fmt.Println("\n=== Final Metrics ===")
	fmt.Printf("State: %s\n", metrics.State)
	fmt.Printf("Total Requests: %d\n", metrics.TotalRequests)
	fmt.Printf("Total Successes: %d\n", metrics.TotalSuccesses)
	fmt.Printf("Total Failures: %d\n", metrics.TotalFailures)
	fmt.Printf("Total Rejected: %d\n", metrics.TotalRejected)
	fmt.Printf("Success Rate: %.2f%%\n", metrics.SuccessRate)

	fmt.Println("\nCircuit Breaker States:")
	fmt.Println("  CLOSED: Normal operation, all requests go through")
	fmt.Println("  OPEN: Too many failures, reject all requests")
	fmt.Println("  HALF-OPEN: Testing recovery, allow limited requests")
}

Key Circuit Breaker Concepts:

  1. Three States: Closed, Open, Half-Open
  2. Failure Threshold: Open circuit after N consecutive failures
  3. Reset Timeout: Time to wait before attempting recovery
  4. Half-Open Testing: Limited requests to test service recovery
  5. Automatic Transitions: State changes based on success/failure patterns
  6. Fallback Mechanism: Provide degraded functionality when circuit is open
  7. Metrics Tracking: Monitor request counts, success rates, state changes
  8. Thread-Safe: Concurrent request handling with proper locking
  9. Configurable: Adjust thresholds and timeouts per service
  10. Failure Isolation: Failing fast keeps one unhealthy dependency from cascading failures to its callers

State Transitions:

CLOSED --[consecutive failures >= MaxFailures]--> OPEN
OPEN --[ResetTimeout elapsed, checked on the next request]--> HALF-OPEN
HALF-OPEN --[HalfOpenMaxReq successful requests]--> CLOSED
HALF-OPEN --[any failure]--> OPEN
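
The transitions above can also be written as a lookup table, which makes it easy to check that no unintended transition exists. The following is a small illustrative sketch rather than part of the solution: the `State` and `Event` types, the event names, and the `next` function are assumptions introduced here, and the table only models which transitions are legal, not the counting or timing that triggers them.

```go
package main

import "fmt"

// State is one of the three circuit breaker states.
type State string

const (
	Closed   State = "CLOSED"
	Open     State = "OPEN"
	HalfOpen State = "HALF-OPEN"
)

// Event is a hypothetical name for the condition that triggers a transition.
type Event string

const (
	ThresholdReached Event = "failures >= threshold"
	TimeoutElapsed   Event = "timeout elapsed"
	ProbesSucceeded  Event = "success" // enough trial requests succeeded
	ProbeFailed      Event = "failure"
)

// transitions lists every legal state change; anything absent is a no-op.
var transitions = map[State]map[Event]State{
	Closed:   {ThresholdReached: Open},
	Open:     {TimeoutElapsed: HalfOpen},
	HalfOpen: {ProbesSucceeded: Closed, ProbeFailed: Open},
}

// next returns the state that follows s when e occurs, or s itself when
// the table defines no transition for that pair.
func next(s State, e Event) State {
	if to, ok := transitions[s][e]; ok {
		return to
	}
	return s
}

func main() {
	// Walk through a failure, a failed recovery attempt, and a successful one.
	s := Closed
	events := []Event{ThresholdReached, TimeoutElapsed, ProbeFailed, TimeoutElapsed, ProbesSucceeded}
	for _, e := range events {
		to := next(s, e)
		fmt.Printf("%s --[%s]--> %s\n", s, e, to)
		s = to
	}
}
```

Keeping the table separate from the counters means the legal transitions can be unit-tested without timers or HTTP calls.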

Summary

Key Takeaways

  1. Service Design

    • Single responsibility per service
    • Decentralized data management
    • Independent deployment
    • API-first design
  2. Communication

    • Use gRPC for internal services
    • REST for external APIs
    • Async messaging for events
    • Implement retries and circuit breakers
  3. Resilience

    • Circuit breakers prevent cascading failures
    • Retries handle transient errors
    • Health checks ensure service readiness
    • Graceful degradation
  4. Observability

    • Distributed tracing across services
    • Centralized logging
    • Metrics and monitoring
    • Service mesh for traffic management
  5. Production

    • Service discovery for dynamic routing
    • API gateway for single entry point
    • Saga pattern for distributed transactions
    • Configuration management