Why This Matters - The Communication Revolution
Consider building a global financial trading platform where millisecond delays can cost millions. Each trade request triggers dozens of microservice calls - market data, risk assessment, portfolio validation, compliance checks, order execution. With traditional REST APIs, each call adds overhead, and latency compounds quickly. During peak trading hours, your system processes 50,000 trades per second, each requiring coordination between 10+ microservices. That's 500,000 internal API calls where every millisecond counts.
Real-world scenario: A major streaming service needs to deliver personalized content to 100 million concurrent users. Each user's homepage requires:
- User profile data from the authentication service
- Viewing history from the analytics service
- Recommendation data from the ML service
- Content metadata from the catalog service
- Playback capabilities from the media service
With REST APIs, this means 5 sequential HTTP requests per user. At 100 million users, that's 500 million HTTP requests. With gRPC's streaming and multiplexing, this can be reduced to a single bidirectional stream per user - a 500x improvement in efficiency.
Learning Objectives
By the end of this article, you will be able to:
- Design high-performance gRPC services using Protocol Buffers for efficient serialization
- Implement unary, server, client, and bidirectional streaming for real-time communication
- Build production-ready interceptors for authentication, logging, and metrics
- Apply advanced patterns including load balancing, retries, and health checking
- Create complete microservice architectures using gRPC for internal communication
- Handle errors gracefully with proper status codes and client-side error handling
Core Concepts - Understanding gRPC
The Performance Imperative
In cloud-native microservices architectures, the communication layer becomes critical:
Traditional REST Challenges:
- Text-based protocols add serialization overhead
- Request/response model inefficient for real-time data
- Connection overhead for each service call
- Limited type safety between services
gRPC Advantages:
- Binary protocol - 3-10x smaller than JSON
- HTTP/2 multiplexing - single connection handles many concurrent requests
- Streaming capabilities - real-time bidirectional communication
- Strongly typed contracts - compile-time validation and code generation
💡 Key Insight: gRPC combines the performance of binary protocols with the developer experience of function calls, making it ideal for high-throughput microservices.
The Protocol Buffer Foundation
Protocol Buffers are the contract that enables different services to communicate with compile-time type safety:
1// user_service.proto - Service contract definition
2syntax = "proto3";
3
4package user;
5
6// Import common types for consistency
7import "google/protobuf/timestamp.proto";
8import "google/protobuf/empty.proto";
9
10option go_package = "github.com/company/microservices/gen/go/user";
11
12// User message definition with comprehensive fields
13message User {
14 uint64 id = 1; // Unique identifier
15 string email = 2; // Unique email address
16 string username = 3; // Display username
17 string first_name = 4; // First name
18 string last_name = 5; // Last name
19
20 // Timestamps for lifecycle management
21 google.protobuf.Timestamp created_at = 6;
22 google.protobuf.Timestamp updated_at = 7;
23 google.protobuf.Timestamp last_login = 8;
24
25 // User status and roles
26 UserStatus status = 9;
27 repeated UserRole roles = 10;
28
29 // Profile information
30 string avatar_url = 11;
31 string bio = 12;
32 map<string, string> preferences = 13;
33}
34
35// Enums for type safety
36enum UserStatus {
37 USER_STATUS_UNSPECIFIED = 0;
38 USER_STATUS_ACTIVE = 1;
39 USER_STATUS_INACTIVE = 2;
40 USER_STATUS_SUSPENDED = 3;
41 USER_STATUS_DELETED = 4;
42}
43
44enum UserRole {
45 USER_ROLE_UNSPECIFIED = 0;
46 USER_ROLE_USER = 1;
47 USER_ROLE_ADMIN = 2;
48 USER_ROLE_MODERATOR = 3;
49}
50
51// Request/Response messages
52message GetUserRequest {
53 oneof identifier {
54 uint64 user_id = 1;
55 string email = 2;
56 string username = 3;
57 }
58}
59
60message CreateUserRequest {
61 string email = 1;
62 string username = 2;
63 string password = 3;
64 string first_name = 4;
65 string last_name = 5;
66}
67
68message UpdateUserRequest {
69 uint64 user_id = 1;
70 // Optional fields for partial updates
71 optional string username = 2;
72 optional string first_name = 3;
73 optional string last_name = 4;
74 optional string bio = 5;
75 map<string, string> preferences = 6;
76}
77
78message ListUsersRequest {
79 uint32 page = 1;
80 uint32 limit = 2;
81 UserStatus status = 3; // Filter by status
82 string search = 4; // Search term
83}
84
85message ListUsersResponse {
86 repeated User users = 1;
87 uint32 total = 2;
88 bool has_more = 3;
89}
90
91// Service definition with comprehensive operations
92service UserService {
93 // Unary operations
94 rpc GetUser(GetUserRequest) returns;
95 rpc CreateUser(CreateUserRequest) returns;
96 rpc UpdateUser(UpdateUserRequest) returns;
97 rpc DeleteUser(GetUserRequest) returns;
98
99 // List operation with pagination
100 rpc ListUsers(ListUsersRequest) returns;
101
102 // Streaming operations
103 rpc StreamUsers(ListUsersRequest) returns;
104 rpc BulkCreateUsers(stream CreateUserRequest) returns;
105 rpc UserActivity(GetUserRequest) returns;
106}
The gRPC Communication Model
1. Unary RPC:
1// Traditional function call pattern
2response, err := client.GetUser(ctx, &user.GetUserRequest{UserId: 123})
2. Server Streaming:
1// Client receives multiple responses from single request
2stream, err := client.StreamUsers(ctx, &user.ListUsersRequest{Limit: 100})
3for {
4 user, err := stream.Recv()
5 if err == io.EOF { break }
6 // Process each user
7}
3. Client Streaming:
1// Client sends multiple requests, gets single response
2stream, err := client.BulkCreateUsers(ctx)
3for _, userData := range usersToCreate {
4 stream.Send(&user.CreateUserRequest{...})
5}
6response, err := stream.CloseAndRecv()
4. Bidirectional Streaming:
1// Both client and server can send messages independently
2stream, err := client.UserActivity(ctx)
3go func() {
4 for activity := range userActivities {
5 stream.Send(&user.UserActivityRequest{...})
6 }
7 stream.CloseSend()
8}()
9
10for {
11 event, err := stream.Recv()
12 if err == io.EOF { break }
13 // Process server events
14}
Practical Examples - Building Production gRPC Services
Example 1: Basic Unary Service
Let's start with a complete user management service:
1// api/user/v1/user.proto - Complete service definition
2syntax = "proto3";
3
4package user.v1;
5
6import "google/protobuf/timestamp.proto";
7import "google/protobuf/empty.proto";
8
9option go_package = "github.com/company/microservices/gen/go/user/v1;userv1";
10
11message User {
12 uint64 id = 1;
13 string email = 2;
14 string username = 3;
15 string first_name = 4;
16 string last_name = 5;
17 google.protobuf.Timestamp created_at = 6;
18 google.protobuf.Timestamp updated_at = 7;
19 UserStatus status = 8;
20}
21
22enum UserStatus {
23 USER_STATUS_UNSPECIFIED = 0;
24 USER_STATUS_ACTIVE = 1;
25 USER_STATUS_INACTIVE = 2;
26}
27
28message GetUserRequest {
29 oneof identifier {
30 uint64 user_id = 1;
31 string email = 2;
32 string username = 3;
33 }
34}
35
36message CreateUserRequest {
37 string email = 1;
38 string username = 2;
39 string password = 3;
40 string first_name = 4;
41 string last_name = 5;
42}
43
44message UpdateUserRequest {
45 uint64 user_id = 1;
46 optional string username = 2;
47 optional string first_name = 3;
48 optional string last_name = 4;
49 optional UserStatus status = 5;
50}
51
52service UserService {
53 rpc GetUser(GetUserRequest) returns;
54 rpc CreateUser(CreateUserRequest) returns;
55 rpc UpdateUser(UpdateUserRequest) returns;
56 rpc DeleteUser(GetUserRequest) returns;
57}
Generate Go code:
1// run
2# Install protoc and Go plugins
3go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
4go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
5
6# Generate Go code from proto files
7protoc --go_out=. --go_opt=paths=source_relative \
8 --go-grpc_out=. --go-grpc_opt=paths=source_relative \
9 api/user/v1/*.proto
Server implementation:
1// cmd/server/main.go - Complete gRPC server
2
3package main
4
5import (
6 "context"
7 "crypto/tls"
8 "database/sql"
9 "fmt"
10 "log"
11 "net"
12 "os"
13 "time"
14
15 "google.golang.org/grpc"
16 "google.golang.org/grpc/codes"
17 "google.golang.org/grpc/credentials"
18 "google.golang.org/grpc/health"
19 "google.golang.org/grpc/health/grpc_health_v1"
20 "google.golang.org/grpc/reflection"
21 "google.golang.org/grpc/status"
22
23 pb "github.com/company/microservices/gen/go/user/v1"
24 "github.com/company/microservices/internal/user/repository"
25 "github.com/company/microservices/internal/user/service"
26)
27
28func main() {
29 // Configuration
30 config := struct {
31 Port string
32 DBHost string
33 DBPort string
34 DBUser string
35 DBPassword string
36 DBName string
37 TLSCert string
38 TLSKey string
39 }{
40 Port: getEnv("PORT", "50051"),
41 DBHost: getEnv("DB_HOST", "localhost"),
42 DBPort: getEnv("DB_PORT", "5432"),
43 DBUser: getEnv("DB_USER", "postgres"),
44 DBPassword: getEnv("DB_PASSWORD", "password"),
45 DBName: getEnv("DB_NAME", "users"),
46 TLSCert: getEnv("TLS_CERT", "server.crt"),
47 TLSKey: getEnv("TLS_KEY", "server.key"),
48 }
49
50 // Database connection
51 db, err := connectDB(config)
52 if err != nil {
53 log.Fatalf("Failed to connect to database: %v", err)
54 }
55 defer db.Close()
56
57 // Repository layer
58 userRepo := repository.NewPostgresRepository(db)
59
60 // Service layer
61 userService := service.NewUserService(userRepo)
62
63 // Create gRPC server with interceptors
64 server := grpc.NewServer(
65 grpc.UnaryInterceptor(loggingUnaryInterceptor),
66 grpc.StreamInterceptor(loggingStreamInterceptor),
67 grpc.ChainUnaryInterceptor(
68 authUnaryInterceptor,
69 metricsUnaryInterceptor,
70 ),
71 )
72
73 // Register services
74 pb.RegisterUserServiceServer(server, userService)
75
76 // Health check service
77 healthServer := health.NewServer()
78 grpc_health_v1.RegisterHealthServer(server, healthServer)
79 healthServer.SetServingStatus("user-service", grpc_health_v1.HealthCheckResponse_SERVING)
80
81 // Enable reflection for development
82 reflection.Register(server)
83
84 // Start server with TLS
85 lis, err := net.Listen("tcp", ":"+config.Port)
86 if err != nil {
87 log.Fatalf("Failed to listen: %v", err)
88 }
89
90 // Load TLS certificates
91 cert, err := tls.LoadX509KeyPair(config.TLSCert, config.TLSKey)
92 if err != nil {
93 log.Fatalf("Failed to load TLS certificates: %v", err)
94 }
95
96 tlsConfig := &tls.Config{
97 Certificates: []tls.Certificate{cert},
98 ClientAuth: tls.VerifyClientCertIfGiven,
99 }
100
101 tlsCreds := credentials.NewTLS(tlsConfig)
102
103 // Create TLS listener
104 tlsLis := tls.NewListener(lis, tlsConfig)
105
106 log.Printf("Starting gRPC server on port %s with TLS", config.Port)
107 if err := server.Serve(tlsLis); err != nil {
108 log.Fatalf("Failed to serve: %v", err)
109 }
110}
111
112// Repository implementation
113type userRepository struct {
114 db *sql.DB
115}
116
117func NewPostgresRepository(db *sql.DB) *userRepository {
118 return &userRepository{db: db}
119}
120
121func GetByID(ctx context.Context, id uint64) {
122 query := `
123 SELECT id, email, username, first_name, last_name,
124 created_at, updated_at, status
125 FROM users
126 WHERE id = $1 AND deleted_at IS NULL
127 `
128
129 user := &pb.User{}
130 var status pb.UserStatus
131 var createdAt, updatedAt time.Time
132
133 err := r.db.QueryRowContext(ctx, query, id).Scan(
134 &user.Id, &user.Email, &user.Username,
135 &user.FirstName, &user.LastName,
136 &createdAt, &updatedAt, &status,
137 )
138
139 if err != nil {
140 if err == sql.ErrNoRows {
141 return nil, status.Error(codes.NotFound, "user not found")
142 }
143 return nil, status.Error(codes.Internal, "database error")
144 }
145
146 user.CreatedAt = timestamppb.New(createdAt)
147 user.UpdatedAt = timestamppb.New(updatedAt)
148 user.Status = status
149
150 return user, nil
151}
152
153func GetByEmail(ctx context.Context, email string) {
154 // Similar implementation for email lookup
155}
156
157func Create(ctx context.Context, req *pb.CreateUserRequest) {
158 query := `
159 INSERT INTO users
160 VALUES, NOW())
161 RETURNING id, created_at, updated_at
162 `
163
164 user := &pb.User{
165 Email: req.Email,
166 Username: req.Username,
167 FirstName: req.FirstName,
168 LastName: req.LastName,
169 Status: pb.UserStatus_USER_STATUS_ACTIVE,
170 }
171
172 // Hash password
173 hashedPassword, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
174 if err != nil {
175 return nil, status.Error(codes.Internal, "failed to hash password")
176 }
177
178 var createdAt, updatedAt time.Time
179 err = r.db.QueryRowContext(ctx, query,
180 req.Email, req.Username, hashedPassword,
181 req.FirstName, req.LastName,
182 ).Scan(&user.Id, &createdAt, &updatedAt)
183
184 if err != nil {
185 if isUniqueViolation(err) {
186 return nil, status.Error(codes.AlreadyExists, "user already exists")
187 }
188 return nil, status.Error(codes.Internal, "failed to create user")
189 }
190
191 user.CreatedAt = timestamppb.New(createdAt)
192 user.UpdatedAt = timestamppb.New(updatedAt)
193
194 return user, nil
195}
196
197// Service implementation
198type userService struct {
199 pb.UnimplementedUserServiceServer
200 repo repository.UserRepository
201}
202
203func NewUserService(repo repository.UserRepository) *userService {
204 return &userService{repo: repo}
205}
206
207func GetUser(ctx context.Context, req *pb.GetUserRequest) {
208 // Validate request
209 if req.GetIdentifier() == nil {
210 return nil, status.Error(codes.InvalidArgument, "identifier is required")
211 }
212
213 switch identifier := req.Identifier.(type) {
214 case *pb.GetUserRequest_UserId:
215 return s.repo.GetByID(ctx, identifier.UserId)
216 case *pb.GetUserRequest_Email:
217 return s.repo.GetByEmail(ctx, identifier.Email)
218 case *pb.GetUserRequest_Username:
219 return s.repo.GetByUsername(ctx, identifier.Username)
220 default:
221 return nil, status.Error(codes.InvalidArgument, "invalid identifier type")
222 }
223}
224
225func CreateUser(ctx context.Context, req *pb.CreateUserRequest) {
226 // Validate request
227 if err := validateCreateUserRequest(req); err != nil {
228 return nil, status.Error(codes.InvalidArgument, err.Error())
229 }
230
231 // Check if user already exists
232 if _, err := s.repo.GetByEmail(ctx, req.Email); err == nil {
233 return nil, status.Error(codes.AlreadyExists, "user with this email already exists")
234 }
235
236 // Create user
237 return s.repo.Create(ctx, req)
238}
239
240func UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) {
241 // Validate request
242 if req.UserId == 0 {
243 return nil, status.Error(codes.InvalidArgument, "user_id is required")
244 }
245
246 // Check if user exists
247 user, err := s.repo.GetByID(ctx, req.UserId)
248 if err != nil {
249 return nil, err
250 }
251
252 // Update fields if provided
253 if req.Username != nil {
254 user.Username = *req.Username
255 }
256 if req.FirstName != nil {
257 user.FirstName = *req.FirstName
258 }
259 if req.LastName != nil {
260 user.LastName = *req.LastName
261 }
262 if req.Status != nil {
263 user.Status = *req.Status
264 }
265
266 // Update in database
267 return s.repo.Update(ctx, user)
268}
269
270func DeleteUser(ctx context.Context, req *pb.GetUserRequest) {
271 // Similar implementation for soft delete
272 return &emptypb.Empty{}, nil
273}
274
275// Helper functions
276func validateCreateUserRequest(req *pb.CreateUserRequest) error {
277 if req.Email == "" {
278 return fmt.Errorf("email is required")
279 }
280 if req.Username == "" {
281 return fmt.Errorf("username is required")
282 }
283 if req.Password == "" {
284 return fmt.Errorf("password is required")
285 }
286 if len(req.Password) < 8 {
287 return fmt.Errorf("password must be at least 8 characters")
288 }
289 return nil
290}
291
292func connectDB(config struct{}) {
293 dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
294 config.DBHost, config.DBPort, config.DBUser, config.DBPassword, config.DBName)
295
296 db, err := sql.Open("postgres", dsn)
297 if err != nil {
298 return nil, err
299 }
300
301 // Test connection
302 if err := db.Ping(); err != nil {
303 return nil, err
304 }
305
306 // Configure connection pool
307 db.SetMaxOpenConns(25)
308 db.SetMaxIdleConns(5)
309 db.SetConnMaxLifetime(5 * time.Minute)
310
311 return db, nil
312}
Example 2: Advanced Streaming Patterns
1// service/streaming.go - Advanced streaming implementations
2
3// Server streaming for real-time user updates
4func StreamUsers(req *pb.ListUsersRequest, stream pb.UserService_StreamUsersServer) error {
5 // Validate request
6 if req.Limit == 0 {
7 req.Limit = 100 // Default limit
8 }
9 if req.Limit > 1000 {
10 return status.Error(codes.InvalidArgument, "limit cannot exceed 1000")
11 }
12
13 // Create database cursor for streaming
14 query := `
15 SELECT id, email, username, first_name, last_name,
16 created_at, updated_at, status
17 FROM users
18 WHERE deleted_at IS NULL
19 `
20 args := []interface{}{}
21
22 if req.Status != pb.UserStatus_USER_STATUS_UNSPECIFIED {
23 query += " AND status = $"
24 args = append(args, int32(req.Status))
25 }
26 if req.Search != "" {
27 query += " AND"
28 searchPattern := "%" + req.Search + "%"
29 args = append(args, searchPattern, searchPattern, searchPattern, searchPattern)
30 }
31 query += " ORDER BY created_at DESC"
32
33 // Execute query with cursor
34 rows, err := s.db.QueryContext(stream.Context(), query, args...)
35 if err != nil {
36 return status.Error(codes.Internal, "database error")
37 }
38 defer rows.Close()
39
40 count := 0
41 for rows.Next() {
42 // Check if client has disconnected
43 select {
44 case <-stream.Context().Done():
45 return stream.Context().Err()
46 default:
47 }
48
49 user := &pb.User{}
50 var status pb.UserStatus
51 var createdAt, updatedAt time.Time
52
53 err := rows.Scan(
54 &user.Id, &user.Email, &user.Username,
55 &user.FirstName, &user.LastName,
56 &createdAt, &updatedAt, &status,
57 )
58 if err != nil {
59 log.Printf("Error scanning row: %v", err)
60 continue
61 }
62
63 user.CreatedAt = timestamppb.New(createdAt)
64 user.UpdatedAt = timestamppb.New(updatedAt)
65 user.Status = status
66
67 // Send user to client
68 if err := stream.Send(user); err != nil {
69 return err
70 }
71
72 count++
73 if count >= int(req.Limit) {
74 break
75 }
76 }
77
78 return nil
79}
80
81// Client streaming for bulk operations
82func BulkCreateUsers(stream pb.UserService_BulkCreateUsersServer) error {
83 var users []*pb.CreateUserRequest
84 var errors []*pb.BulkCreateError
85
86 // Receive all users from client
87 for {
88 req, err := stream.Recv()
89 if err == io.EOF {
90 break
91 }
92 if err != nil {
93 return err
94 }
95
96 // Validate each request
97 if err := validateCreateUserRequest(req); err != nil {
98 errors = append(errors, &pb.BulkCreateError{
99 Email: req.Email,
100 Error: err.Error(),
101 })
102 continue
103 }
104
105 users = append(users, req)
106 }
107
108 // Create users in database
109 var createdUsers []*pb.User
110 for _, req := range users {
111 user, err := s.repo.Create(stream.Context(), req)
112 if err != nil {
113 errors = append(errors, &pb.BulkCreateError{
114 Email: req.Email,
115 Error: err.Error(),
116 })
117 continue
118 }
119 createdUsers = append(createdUsers, user)
120 }
121
122 // Send response with created users and errors
123 response := &pb.ListUsersResponse{
124 Users: createdUsers,
125 Total: uint32(len(createdUsers)),
126 Errors: errors,
127 HasMore: false,
128 }
129
130 return stream.SendAndClose(response)
131}
132
133// Bidirectional streaming for real-time user activity
134func UserActivity(stream pb.UserService_UserActivityServer) error {
135 // Subscribe user to activity updates
136 activityChan := make(chan *pb.UserActivityEvent, 100)
137 defer close(activityChan)
138
139 // Handle incoming requests in goroutine
140 go func() {
141 for {
142 req, err := stream.Recv()
143 if err == io.EOF {
144 break
145 }
146 if err != nil {
147 log.Printf("Error receiving activity request: %v", err)
148 return
149 }
150
151 // Subscribe user to specific events
152 switch req.GetAction() {
153 case pb.UserActivityRequest_SUBSCRIBE_USER_UPDATES:
154 s.activityManager.Subscribe(req.GetUserId(), activityChan)
155 case pb.UserActivityRequest_UNSUBSCRIBE_USER_UPDATES:
156 s.activityManager.Unsubscribe(req.GetUserId(), activityChan)
157 }
158 }
159 }()
160
161 // Send activity events to client
162 for {
163 select {
164 case event := <-activityChan:
165 if err := stream.Send(event); err != nil {
166 return err
167 }
168 case <-stream.Context().Done():
169 return stream.Context().Err()
170 }
171 }
172}
Example 3: Production Interceptors
1// interceptors.go - Production-ready gRPC interceptors
2
3// Authentication interceptor
4func authUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) {
5 // Skip authentication for health check and reflection
6 if info.FullMethod == "/grpc.health.v1.Health/Check" ||
7 info.FullMethod == "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo" {
8 return handler(ctx, req)
9 }
10
11 // Extract token from metadata
12 md, ok := metadata.FromIncomingContext(ctx)
13 if !ok {
14 return nil, status.Error(codes.Unauthenticated, "missing metadata")
15 }
16
17 tokens := md["authorization"]
18 if len(tokens) == 0 {
19 return nil, status.Error(codes.Unauthenticated, "missing authorization token")
20 }
21
22 // Validate token
23 userID, err := validateToken(tokens[0])
24 if err != nil {
25 return nil, status.Error(codes.Unauthenticated, "invalid token")
26 }
27
28 // Add user ID to context
29 newCtx := context.WithValue(ctx, "user_id", userID)
30 return handler(newCtx, req)
31}
32
33// Logging interceptor
34func loggingUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) {
35 start := time.Now()
36
37 // Log request
38 log.Printf("GRPC Request: %s", info.FullMethod)
39
40 // Call handler
41 resp, err := handler(ctx, req)
42
43 // Log response with duration
44 duration := time.Since(start)
45 if err != nil {
46 log.Printf("GRPC Error: %s - %v", info.FullMethod, err, duration)
47 } else {
48 log.Printf("GRPC Response: %s", info.FullMethod, duration)
49 }
50
51 return resp, err
52}
53
54// Metrics interceptor
55func metricsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) {
56 start := time.Now()
57
58 resp, err := handler(ctx, req)
59 duration := time.Since(start)
60
61 // Record metrics
62 method := info.FullMethod
63 if err != nil {
64 grpcErrorCounter.WithLabelValues(method, getErrorCode(err)).Inc()
65 } else {
66 grpcSuccessCounter.WithLabelValues(method).Inc()
67 }
68 grpcDurationHistogram.WithLabelValues(method).Observe(duration.Seconds())
69
70 return resp, err
71}
72
73// Rate limiting interceptor
74func rateLimitUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) {
75 // Extract client ID
76 md, ok := metadata.FromIncomingContext(ctx)
77 if !ok {
78 return nil, status.Error(codes.Unauthenticated, "missing metadata")
79 }
80
81 clientIDs := md["x-client-id"]
82 if len(clientIDs) == 0 {
83 return nil, status.Error(codes.Unauthenticated, "missing client ID")
84 }
85
86 clientID := clientIDs[0]
87
88 // Check rate limit
89 if !rateLimiter.Allow(clientID) {
90 return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
91 }
92
93 return handler(ctx, req)
94}
Example 4: Client Implementation with Best Practices
1// client/user_client.go - Production-ready gRPC client
2
3package client
4
5import (
6 "context"
7 "crypto/tls"
8 "fmt"
9 "time"
10
11 "google.golang.org/grpc"
12 "google.golang.org/grpc/credentials"
13 "google.golang.org/grpc/credentials/insecure"
14 "google.golang.org/grpc/keepalive"
15 "google.golang.org/grpc/retry"
16
17 pb "github.com/company/microservices/gen/go/user/v1"
18)
19
20type UserClient struct {
21 conn *grpc.ClientConn
22 client pb.UserServiceClient
23}
24
25func NewUserClient(addr string, useTLS bool, token string) {
26 // Create connection with retry and keepalive
27 dialOpts := []grpc.DialOption{
28 grpc.WithKeepaliveParams(keepalive.ClientParameters{
29 Time: 10 * time.Second,
30 Timeout: 3 * time.Second,
31 PermitWithoutStream: true,
32 }),
33 grpc.WithUnaryInterceptor(loggingClientInterceptor),
34 grpc.WithStreamInterceptor(loggingStreamInterceptor),
35 grpc.WithDefaultCallOptions(
36 grpc.WaitForReady(true),
37 grpc.MaxRetryRPCBufferSize(1024*1024), // 1MB
38 ),
39 }
40
41 // Configure credentials
42 if useTLS {
43 creds := credentials.NewTLS(&tls.Config{
44 ServerName: "user-service",
45 InsecureSkipVerify: false,
46 })
47 dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
48 } else {
49 dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
50 }
51
52 // Connect with retry
53 conn, err := grpc.Dial(addr, dialOpts...)
54 if err != nil {
55 return nil, fmt.Errorf("failed to connect to user service: %w", err)
56 }
57
58 // Create client with authentication interceptor
59 if token != "" {
60 authInterceptor := tokenAuthInterceptor(token)
61 client := pb.NewUserServiceClient(
62 grpc.NewClientConn(
63 conn.Target(),
64 grpc.WithTransportCredentials(conn.ConnectionState().TransportCredentials),
65 grpc.WithUnaryInterceptor(authInterceptor),
66 ),
67 )
68 return &UserClient{conn: conn, client: client}, nil
69 }
70
71 return &UserClient{
72 conn: conn,
73 client: pb.NewUserServiceClient(conn),
74 }, nil
75}
76
77func GetUser(ctx context.Context, userID uint64) {
78 req := &pb.GetUserRequest{
79 Identifier: &pb.GetUserRequest_UserId{UserId: userID},
80 }
81
82 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
83 defer cancel()
84
85 return c.client.GetUser(ctx, req)
86}
87
88func CreateUser(ctx context.Context, req *pb.CreateUserRequest) {
89 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
90 defer cancel()
91
92 return c.client.CreateUser(ctx, req)
93}
94
95func StreamUsers(ctx context.Context, req *pb.ListUsersRequest) {
96 stream, err := c.client.StreamUsers(ctx, req)
97 if err != nil {
98 return nil, err
99 }
100
101 userChan := make(chan *pb.User, 100)
102
103 go func() {
104 defer close(userChan)
105 for {
106 user, err := stream.Recv()
107 if err == io.EOF {
108 return
109 }
110 if err != nil {
111 log.Printf("Error receiving user: %v", err)
112 return
113 }
114 userChan <- user
115 }
116 }()
117
118 return userChan, nil
119}
120
121func Close() error {
122 return c.conn.Close()
123}
124
125// Client interceptors
126func loggingClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
127 start := time.Now()
128 err := invoker(ctx, method, req, reply, cc, opts...)
129 duration := time.Since(start)
130
131 if err != nil {
132 log.Printf("GRPC Client Error: %s - %v", method, err, duration)
133 } else {
134 log.Printf("GRPC Client Response: %s", method, duration)
135 }
136
137 return err
138}
139
140func tokenAuthInterceptor(token string) grpc.UnaryClientInterceptor {
141 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
142 // Add token to metadata
143 md := metadata.Pairs("authorization", "Bearer "+token)
144 ctx = metadata.NewOutgoingContext(ctx, md)
145 return invoker(ctx, method, req, reply, cc, opts...)
146 }
147}
Common Patterns and Pitfalls
Pattern 1: Connection Pooling and Reuse
1// ✅ GOOD: Use connection pooling and reuse
2type ServiceRegistry struct {
3 connections map[string]*grpc.ClientConn
4 mu sync.RWMutex
5}
6
7func GetConnection(serviceName, address string) {
8 r.mu.RLock()
9 conn, exists := r.connections[serviceName]
10 r.mu.RUnlock()
11
12 if exists {
13 return conn, nil
14 }
15
16 r.mu.Lock()
17 defer r.mu.Unlock()
18
19 // Double-check after acquiring write lock
20 if conn, exists := r.connections[serviceName]; exists {
21 return conn, nil
22 }
23
24 // Create new connection
25 conn, err := grpc.Dial(address, grpc.WithInsecure())
26 if err != nil {
27 return nil, err
28 }
29
30 r.connections[serviceName] = conn
31 return conn, nil
32}
33
34// ❌ BAD: Creating new connections for each request
35func badGetUser(userID uint64) {
36 conn, err := grpc.Dial("localhost:50051") // New connection every call!
37 if err != nil {
38 return nil, err
39 }
40 defer conn.Close() // Immediately closed!
41
42 client := pb.NewUserServiceClient(conn)
43 return client.GetUser(context.Background(), &pb.GetUserRequest{UserId: userID})
44}
Pattern 2: Proper Error Handling
1// ✅ GOOD: Proper gRPC error handling
2func GetUser(ctx context.Context, req *pb.GetUserRequest) {
3 user, err := s.repo.GetByID(ctx, req.GetUserId())
4 if err != nil {
5 switch {
6 case errors.Is(err, repository.ErrNotFound):
7 return nil, status.Error(codes.NotFound, "user not found")
8 case errors.Is(err, repository.ErrInvalidID):
9 return nil, status.Error(codes.InvalidArgument, "invalid user ID")
10 default:
11 // Log unexpected errors but don't expose details
12 log.Printf("Unexpected error getting user %d: %v", req.GetUserId(), err)
13 return nil, status.Error(codes.Internal, "internal error")
14 }
15 }
16 return user, nil
17}
18
19// ❌ BAD: Exposing internal errors
20func GetUserBad(ctx context.Context, req *pb.GetUserRequest) {
21 user, err := s.repo.GetByID(ctx, req.GetUserId())
22 if err != nil {
23 return nil, err // Exposes internal database errors!
24 }
25 return user, nil
26}
Pattern 3: Context Propagation
1// ✅ GOOD: Proper context propagation and cancellation
2func UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) {
3 // Create a child context with timeout
4 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
5 defer cancel()
6
7 // Pass context to all operations
8 user, err := s.repo.GetByID(ctx, req.UserId)
9 if err != nil {
10 return nil, err
11 }
12
13 // Check if user has permission to update this resource
14 userID := ctx.Value("user_id").(uint64)
15 if userID != req.UserId && !isAdmin(userID) {
16 return nil, status.Error(codes.PermissionDenied, "insufficient permissions")
17 }
18
19 return s.repo.Update(ctx, user)
20}
21
22// ❌ BAD: Ignoring context cancellation
23func UpdateUserBad(ctx context.Context, req *pb.UpdateUserRequest) {
24 // Ignoring context passed by client
25 user, err := s.repo.GetByID(context.Background(), req.UserId)
26 if err != nil {
27 return nil, err
28 }
29
30 // Long-running operation without cancellation
31 time.Sleep(30 * time.Second) // Block for 30 seconds!
32
33 return s.repo.Update(context.Background(), user)
34}
Common Pitfalls
1. Not handling streaming errors properly
1// ❌ BAD: Not checking for stream errors
2for {
3 user, _ := stream.Recv() // Ignoring error!
4 processUser(user)
5}
6
7// ✅ GOOD: Proper error handling
8for {
9 user, err := stream.Recv()
10 if err == io.EOF {
11 break // Normal termination
12 }
13 if err != nil {
14 return fmt.Errorf("stream error: %w", err)
15 }
16 processUser(user)
17}
2. Blocking on streaming calls without timeout
1// ❌ BAD: No timeout for streaming
2stream, err := client.StreamUsers(ctx, req)
3if err != nil {
4 return err
5}
6
7for {
8 user, err := stream.Recv() // Could block forever!
9 if err == io.EOF { break }
10 // Process...
11}
12
13// ✅ GOOD: With timeout context
14ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
15defer cancel()
16
17stream, err := client.StreamUsers(ctx, req)
18if err != nil {
19 return err
20}
21
22for {
23 user, err := stream.Recv()
24 if err == io.EOF { break }
25 if err != nil { return err }
26 // Process...
27}
Integration and Mastery
Production Deployment Patterns
1. Load Balancing and Service Discovery:
1// client/resolver.go - Custom resolver for service discovery
2type ServiceDiscoveryResolver struct {
3 serviceName string
4 registry ServiceRegistry
5}
6
7func ResolveNow(o resolver.ResolveNowOptions) {
8 // Trigger when endpoints change
9}
10
11func Close() {
12 // Cleanup resources
13}
14
15// Build with custom resolver
16conn, err := grpc.Dial(
17 "discovery:///"+serviceName,
18 grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
19 grpc.WithResolvers(&ServiceDiscoveryResolver{}),
20)
2. Circuit Breaker Pattern:
1// client/circuit_breaker.go - Circuit breaker for gRPC
2type CircuitBreaker struct {
3 maxFailures int
4 resetTimeout time.Duration
5 failures int
6 lastFailTime time.Time
7 mu sync.Mutex
8 state State
9}
10
11type State int
12
13const (
14 StateClosed State = iota
15 StateOpen
16 StateHalfOpen
17)
18
19func Call(ctx context.Context, call func() error) error {
20 cb.mu.Lock()
21 defer cb.mu.Unlock()
22
23 if cb.state == StateOpen {
24 if time.Since(cb.lastFailTime) > cb.resetTimeout {
25 cb.state = StateHalfOpen
26 } else {
27 return errors.New("circuit breaker is open")
28 }
29 }
30
31 err := call()
32 if err != nil {
33 cb.failures++
34 cb.lastFailTime = time.Now()
35 if cb.failures >= cb.maxFailures {
36 cb.state = StateOpen
37 }
38 return err
39 }
40
41 cb.failures = 0
42 cb.state = StateClosed
43 return nil
44}
Observability Integration
Metrics Collection:
1// metrics/otel.go - OpenTelemetry integration
2func initGRPCMetrics() {
3 metrics := grpc_prometheus.ClientMetrics{
4 ClientHandledCounter: prometheus.NewCounterVec(
5 prometheus.CounterOpts{
6 Name: "grpc_client_handled_total",
7 Help: "Total number of RPCs completed by the client, regardless of success or failure.",
8 },
9 []string{"grpc_service", "grpc_method", "grpc_code"},
10 ),
11 ClientMsgReceived: prometheus.NewHistogramVec(
12 prometheus.HistogramOpts{
13 Name: "grpc_client_msg_received_total",
14 Help: "Total number of RPC messages received by the client.",
15 Buckets: prometheus.DefBuckets,
16 },
17 []string{"grpc_service", "grpc_method"},
18 ),
19 }
20
21 // Register metrics
22 prometheus.MustRegister(
23 metrics.ClientHandledCounter,
24 metrics.ClientMsgReceived,
25 )
26}
Testing gRPC Services
1// tests/user_service_test.go - Comprehensive testing
2
3func TestUserService_GetUser(t *testing.T) {
4 // Setup test database
5 db := setupTestDB(t)
6 defer db.Close()
7
8 repo := repository.NewPostgresRepository(db)
9 service := &userService{repo: repo}
10
11 tests := []struct {
12 name string
13 req *pb.GetUserRequest
14 want *pb.User
15 wantErr bool
16 errCode codes.Code
17 }{
18 {
19 name: "valid user ID",
20 req: &pb.GetUserRequest{
21 Identifier: &pb.GetUserRequest_UserId{UserId: 1},
22 },
23 want: &pb.User{
24 Id: 1,
25 Email: "test@example.com",
26 Username: "testuser",
27 FirstName: "Test",
28 LastName: "User",
29 Status: pb.UserStatus_USER_STATUS_ACTIVE,
30 },
31 wantErr: false,
32 },
33 {
34 name: "non-existent user",
35 req: &pb.GetUserRequest{
36 Identifier: &pb.GetUserRequest_UserId{UserId: 999},
37 },
38 want: nil,
39 wantErr: true,
40 errCode: codes.NotFound,
41 },
42 {
43 name: "invalid request",
44 req: &pb.GetUserRequest{},
45 want: nil,
46 wantErr: true,
47 errCode: codes.InvalidArgument,
48 },
49 }
50
51 for _, tt := range tests {
52 t.Run(tt.name, func(t *testing.T) {
53 got, err := service.GetUser(context.Background(), tt.req)
54
55 if tt.wantErr {
56 require.Error(t, err)
57 st, ok := status.FromError(err)
58 require.True(t, ok)
59 assert.Equal(t, tt.errCode, st.Code())
60 } else {
61 require.NoError(t, err)
62 assert.Equal(t, tt.want.Id, got.Id)
63 assert.Equal(t, tt.want.Email, got.Email)
64 }
65 })
66 }
67}
68
69// Integration test with in-memory gRPC server
70func TestUserService_Integration(t *testing.T) {
71 // Create in-memory server
72 srv := grpc.NewServer()
73 userService := setupUserService(t)
74 pb.RegisterUserServiceServer(srv, userService)
75
76 // Create listener
77 lis := bufconn.Listen(1024 * 1024)
78
79 // Start server in goroutine
80 go func() {
81 if err := srv.Serve(lis); err != nil {
82 log.Fatalf("Server exited with error: %v", err)
83 }
84 }()
85 defer srv.Stop()
86
87 // Create client connection
88 conn, err := grpc.DialContext(
89 context.Background(),
90 "bufnet",
91 grpc.WithContextDialer(bufconn.Dialer),
92 grpc.WithInsecure(),
93 )
94 require.NoError(t, err)
95 defer conn.Close()
96
97 client := pb.NewUserServiceClient(conn)
98
99 // Test create and get user
100 createReq := &pb.CreateUserRequest{
101 Email: "integration@test.com",
102 Username: "integration",
103 Password: "password123",
104 FirstName: "Integration",
105 LastName: "Test",
106 }
107
108 createdUser, err := client.CreateUser(context.Background(), createReq)
109 require.NoError(t, err)
110 assert.NotZero(t, createdUser.Id)
111
112 getReq := &pb.GetUserRequest{
113 Identifier: &pb.GetUserRequest_UserId{UserId: createdUser.Id},
114 }
115
116 retrievedUser, err := client.GetUser(context.Background(), getReq)
117 require.NoError(t, err)
118 assert.Equal(t, createdUser.Id, retrievedUser.Id)
119 assert.Equal(t, createReq.Email, retrievedUser.Email)
120}
Practice Exercises
Exercise 1: Real-Time Notification System
Objective: Build a bidirectional streaming service for real-time notifications.
Requirements:
- Client subscribes to notification types
- Server pushes notifications in real-time
- Client can send acknowledgment receipts
- Handle connection drops and reconnection
- Implement authentication and authorization
Exercise 2: File Upload Service with Progress
Objective: Create a streaming service for large file uploads with progress tracking.
Requirements:
- Client streams file in chunks
- Server provides progress updates
- Support for pause/resume uploads
- File integrity verification
- Concurrent uploads handling
Exercise 3: Load-Balanced gRPC Service Pool
Objective: Implement client-side load balancing across multiple service instances.
Requirements:
- Dynamic service discovery
- Multiple load balancing strategies
- Health checking and failover
- Circuit breaker integration
- Metrics collection
Exercise 4: gRPC Gateway and REST API Integration
Objective: Create a REST gateway that exposes gRPC services as HTTP APIs.
Requirements:
- HTTP/JSON to gRPC translation
- Error code mapping
- Authentication middleware
- OpenAPI documentation generation
- Request/response transformation
Exercise 5: Advanced gRPC Metrics and Observability
Objective: Implement comprehensive observability for gRPC services.
Requirements:
- OpenTelemetry tracing integration
- Custom metrics collection
- Distributed context propagation
- Performance profiling
- Alerting on SLA violations
Advanced Topics - Production gRPC at Scale
Load Balancing Strategies
gRPC supports multiple load balancing strategies for distributing requests across service instances:
1// client/load_balancing.go - Advanced load balancing configurations
2
3package client
4
5import (
6 "context"
7 "fmt"
8 "log"
9 "time"
10
11 "google.golang.org/grpc"
12 "google.golang.org/grpc/balancer"
13 "google.golang.org/grpc/balancer/roundrobin"
14 "google.golang.org/grpc/credentials/insecure"
15 "google.golang.org/grpc/resolver"
16 "google.golang.org/grpc/resolver/manual"
17
18 pb "github.com/company/microservices/gen/go/user/v1"
19)
20
21// Custom resolver for service discovery
22type CustomResolver struct {
23 target resolver.Target
24 cc resolver.ClientConn
25 updateChan chan []resolver.Address
26}
27
28func NewCustomResolver(target resolver.Target, cc resolver.ClientConn) *CustomResolver {
29 r := &CustomResolver{
30 target: target,
31 cc: cc,
32 updateChan: make(chan []resolver.Address, 10),
33 }
34 go r.watcher()
35 return r
36}
37
38func (r *CustomResolver) watcher() {
39 for {
40 select {
41 case addrs := <-r.updateChan:
42 // Update service endpoints dynamically
43 r.cc.UpdateState(resolver.State{
44 Addresses: addrs,
45 })
46 }
47 }
48}
49
50func (r *CustomResolver) UpdateEndpoints(endpoints []string) {
51 var addrs []resolver.Address
52 for _, ep := range endpoints {
53 addrs = append(addrs, resolver.Address{Addr: ep})
54 }
55 r.updateChan <- addrs
56}
57
58func (r *CustomResolver) ResolveNow(o resolver.ResolveNowOptions) {}
59func (r *CustomResolver) Close() {}
60
61// Example: Client with round-robin load balancing
62func NewLoadBalancedClient(serviceEndpoints []string) (*pb.UserServiceClient, error) {
63 // Create manual resolver
64 r := manual.NewBuilderWithScheme("custom")
65
66 // Add service endpoints
67 var addrs []resolver.Address
68 for _, endpoint := range serviceEndpoints {
69 addrs = append(addrs, resolver.Address{
70 Addr: endpoint,
71 })
72 }
73 r.InitialState(resolver.State{Addresses: addrs})
74
75 // Create connection with round-robin load balancing
76 conn, err := grpc.Dial(
77 "custom:///user-service",
78 grpc.WithResolvers(r),
79 grpc.WithTransportCredentials(insecure.NewCredentials()),
80 grpc.WithDefaultServiceConfig(`{
81 "loadBalancingPolicy": "round_robin",
82 "healthCheckConfig": {
83 "serviceName": "user-service"
84 }
85 }`),
86 )
87 if err != nil {
88 return nil, fmt.Errorf("failed to dial: %w", err)
89 }
90
91 client := pb.NewUserServiceClient(conn)
92 return &client, nil
93}
94
95// Example: Pick-first load balancing (fastest connection)
96func NewPickFirstClient(serviceEndpoints []string) (*pb.UserServiceClient, error) {
97 r := manual.NewBuilderWithScheme("pickfirst")
98
99 var addrs []resolver.Address
100 for _, endpoint := range serviceEndpoints {
101 addrs = append(addrs, resolver.Address{Addr: endpoint})
102 }
103 r.InitialState(resolver.State{Addresses: addrs})
104
105 conn, err := grpc.Dial(
106 "pickfirst:///user-service",
107 grpc.WithResolvers(r),
108 grpc.WithTransportCredentials(insecure.NewCredentials()),
109 grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "pick_first"}`),
110 )
111 if err != nil {
112 return nil, fmt.Errorf("failed to dial: %w", err)
113 }
114
115 client := pb.NewUserServiceClient(conn)
116 return &client, nil
117}
Service Discovery Integration
1// discovery/consul.go - Consul-based service discovery
2
3package discovery
4
5import (
6 "context"
7 "fmt"
8 "log"
9 "time"
10
11 consul "github.com/hashicorp/consul/api"
12 "google.golang.org/grpc"
13 "google.golang.org/grpc/resolver"
14)
15
16// ConsulResolver implements gRPC resolver interface
17type ConsulResolver struct {
18 consulClient *consul.Client
19 serviceName string
20 cc resolver.ClientConn
21 stopCh chan struct{}
22}
23
24func NewConsulResolver(consulAddr, serviceName string, cc resolver.ClientConn) (*ConsulResolver, error) {
25 config := consul.DefaultConfig()
26 config.Address = consulAddr
27
28 client, err := consul.NewClient(config)
29 if err != nil {
30 return nil, fmt.Errorf("failed to create consul client: %w", err)
31 }
32
33 cr := &ConsulResolver{
34 consulClient: client,
35 serviceName: serviceName,
36 cc: cc,
37 stopCh: make(chan struct{}),
38 }
39
40 // Start watching for service changes
41 go cr.watcher()
42
43 return cr, nil
44}
45
46func (cr *ConsulResolver) watcher() {
47 ticker := time.NewTicker(10 * time.Second)
48 defer ticker.Stop()
49
50 for {
51 select {
52 case <-ticker.C:
53 if err := cr.updateEndpoints(); err != nil {
54 log.Printf("Failed to update endpoints: %v", err)
55 }
56 case <-cr.stopCh:
57 return
58 }
59 }
60}
61
62func (cr *ConsulResolver) updateEndpoints() error {
63 // Query Consul for healthy service instances
64 services, _, err := cr.consulClient.Health().Service(
65 cr.serviceName,
66 "",
67 true, // only healthy instances
68 nil,
69 )
70 if err != nil {
71 return fmt.Errorf("failed to query consul: %w", err)
72 }
73
74 // Convert to gRPC addresses
75 var addrs []resolver.Address
76 for _, service := range services {
77 addr := resolver.Address{
78 Addr: fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port),
79 Attributes: nil,
80 }
81 addrs = append(addrs, addr)
82 }
83
84 // Update gRPC with new addresses
85 cr.cc.UpdateState(resolver.State{
86 Addresses: addrs,
87 })
88
89 log.Printf("Updated service endpoints: %d instances found", len(addrs))
90 return nil
91}
92
93func (cr *ConsulResolver) ResolveNow(o resolver.ResolveNowOptions) {
94 go cr.updateEndpoints()
95}
96
97func (cr *ConsulResolver) Close() {
98 close(cr.stopCh)
99}
100
101// Register service with Consul
102func RegisterService(consulAddr, serviceID, serviceName, serviceAddr string, port int) error {
103 config := consul.DefaultConfig()
104 config.Address = consulAddr
105
106 client, err := consul.NewClient(config)
107 if err != nil {
108 return fmt.Errorf("failed to create consul client: %w", err)
109 }
110
111 registration := &consul.AgentServiceRegistration{
112 ID: serviceID,
113 Name: serviceName,
114 Address: serviceAddr,
115 Port: port,
116 Check: &consul.AgentServiceCheck{
117 GRPC: fmt.Sprintf("%s:%d", serviceAddr, port),
118 Interval: "10s",
119 DeregisterCriticalServiceAfter: "30s",
120 },
121 }
122
123 return client.Agent().ServiceRegister(registration)
124}
125
126// Deregister service from Consul
127func DeregisterService(consulAddr, serviceID string) error {
128 config := consul.DefaultConfig()
129 config.Address = consulAddr
130
131 client, err := consul.NewClient(config)
132 if err != nil {
133 return fmt.Errorf("failed to create consul client: %w", err)
134 }
135
136 return client.Agent().ServiceDeregister(serviceID)
137}
gRPC-Gateway: Bridging REST and gRPC
1// api/user/v1/user.proto - With HTTP annotations for REST gateway
2
3syntax = "proto3";
4
5package user.v1;
6
7import "google/api/annotations.proto";
8import "google/protobuf/timestamp.proto";
9import "google/protobuf/empty.proto";
10
11option go_package = "github.com/company/microservices/gen/go/user/v1;userv1";
12
13message User {
14 uint64 id = 1;
15 string email = 2;
16 string username = 3;
17 string first_name = 4;
18 string last_name = 5;
19 google.protobuf.Timestamp created_at = 6;
20 google.protobuf.Timestamp updated_at = 7;
21 UserStatus status = 8;
22}
23
24enum UserStatus {
25 USER_STATUS_UNSPECIFIED = 0;
26 USER_STATUS_ACTIVE = 1;
27 USER_STATUS_INACTIVE = 2;
28}
29
30message GetUserRequest {
31 oneof identifier {
32 uint64 user_id = 1;
33 string email = 2;
34 string username = 3;
35 }
36}
37
38message CreateUserRequest {
39 string email = 1;
40 string username = 2;
41 string password = 3;
42 string first_name = 4;
43 string last_name = 5;
44}
45
46message UpdateUserRequest {
47 uint64 user_id = 1;
48 optional string username = 2;
49 optional string first_name = 3;
50 optional string last_name = 4;
51 optional UserStatus status = 5;
52}
53
54// Service with HTTP annotations
55service UserService {
56 rpc GetUser(GetUserRequest) returns (User) {
57 option (google.api.http) = {
58 get: "/v1/users/{user_id}"
59 };
60 }
61
62 rpc CreateUser(CreateUserRequest) returns (User) {
63 option (google.api.http) = {
64 post: "/v1/users"
65 body: "*"
66 };
67 }
68
69 rpc UpdateUser(UpdateUserRequest) returns (User) {
70 option (google.api.http) = {
71 put: "/v1/users/{user_id}"
72 body: "*"
73 };
74 }
75
76 rpc DeleteUser(GetUserRequest) returns (google.protobuf.Empty) {
77 option (google.api.http) = {
78 delete: "/v1/users/{user_id}"
79 };
80 }
81}
1// cmd/gateway/main.go - gRPC-Gateway REST proxy
2
3package main
4
5import (
6 "context"
7 "fmt"
8 "log"
9 "net/http"
10
11 "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
12 "google.golang.org/grpc"
13 "google.golang.org/grpc/credentials/insecure"
14
15 gw "github.com/company/microservices/gen/go/user/v1"
16)
17
18func main() {
19 ctx := context.Background()
20 ctx, cancel := context.WithCancel(ctx)
21 defer cancel()
22
23 // Create gRPC-Gateway mux
24 mux := runtime.NewServeMux(
25 runtime.WithIncomingHeaderMatcher(customHeaderMatcher),
26 runtime.WithErrorHandler(customErrorHandler),
27 )
28
29 // gRPC server endpoint
30 grpcServerEndpoint := "localhost:50051"
31
32 opts := []grpc.DialOption{
33 grpc.WithTransportCredentials(insecure.NewCredentials()),
34 }
35
36 // Register gRPC-Gateway handlers
37 err := gw.RegisterUserServiceHandlerFromEndpoint(ctx, mux, grpcServerEndpoint, opts)
38 if err != nil {
39 log.Fatalf("Failed to register gateway: %v", err)
40 }
41
42 // HTTP server for REST API
43 httpServer := &http.Server{
44 Addr: ":8080",
45 Handler: corsMiddleware(mux),
46 }
47
48 log.Println("Starting REST gateway on :8080")
49 log.Fatal(httpServer.ListenAndServe())
50}
51
52// Custom header matcher for authentication tokens
53func customHeaderMatcher(key string) (string, bool) {
54 switch key {
55 case "Authorization", "X-Request-Id", "X-User-Id":
56 return key, true
57 default:
58 return runtime.DefaultHeaderMatcher(key)
59 }
60}
61
62// Custom error handler for consistent error responses
63func customErrorHandler(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.Marshaler,
64 w http.ResponseWriter, r *http.Request, err error) {
65
66 w.Header().Set("Content-Type", "application/json")
67
68 // Convert gRPC error to HTTP status
69 httpStatus := runtime.HTTPStatusFromCode(grpc.Code(err))
70 w.WriteHeader(httpStatus)
71
72 // Write error response
73 errorResponse := map[string]interface{}{
74 "error": err.Error(),
75 "code": grpc.Code(err).String(),
76 }
77
78 marshaler.NewEncoder(w).Encode(errorResponse)
79}
80
81// CORS middleware for REST API
82func corsMiddleware(h http.Handler) http.Handler {
83 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
84 w.Header().Set("Access-Control-Allow-Origin", "*")
85 w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
86 w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Request-Id")
87
88 if r.Method == "OPTIONS" {
89 w.WriteHeader(http.StatusOK)
90 return
91 }
92
93 h.ServeHTTP(w, r)
94 })
95}
Performance Optimization Techniques
1// performance/pooling.go - Connection and buffer pooling
2
3package performance
4
5import (
6 "sync"
7
8 "google.golang.org/grpc"
9 "google.golang.org/protobuf/proto"
10)
11
12// ConnectionPool manages gRPC connections efficiently
13type ConnectionPool struct {
14 mu sync.RWMutex
15 connections []*grpc.ClientConn
16 current int
17 size int
18}
19
20func NewConnectionPool(target string, size int, opts ...grpc.DialOption) (*ConnectionPool, error) {
21 pool := &ConnectionPool{
22 connections: make([]*grpc.ClientConn, size),
23 size: size,
24 }
25
26 // Create pool of connections
27 for i := 0; i < size; i++ {
28 conn, err := grpc.Dial(target, opts...)
29 if err != nil {
30 // Close previously created connections
31 for j := 0; j < i; j++ {
32 pool.connections[j].Close()
33 }
34 return nil, err
35 }
36 pool.connections[i] = conn
37 }
38
39 return pool, nil
40}
41
42// Get returns next connection in round-robin fashion
43func (p *ConnectionPool) Get() *grpc.ClientConn {
44 p.mu.Lock()
45 defer p.mu.Unlock()
46
47 conn := p.connections[p.current]
48 p.current = (p.current + 1) % p.size
49 return conn
50}
51
52// Close all connections in pool
53func (p *ConnectionPool) Close() error {
54 p.mu.Lock()
55 defer p.mu.Unlock()
56
57 for _, conn := range p.connections {
58 if err := conn.Close(); err != nil {
59 return err
60 }
61 }
62 return nil
63}
64
65// MessagePool reuses protobuf messages to reduce allocations
66type MessagePool struct {
67 pool sync.Pool
68}
69
70func NewMessagePool(newFunc func() proto.Message) *MessagePool {
71 return &MessagePool{
72 pool: sync.Pool{
73 New: func() interface{} {
74 return newFunc()
75 },
76 },
77 }
78}
79
80func (mp *MessagePool) Get() proto.Message {
81 return mp.pool.Get().(proto.Message)
82}
83
84func (mp *MessagePool) Put(msg proto.Message) {
85 // Reset message before returning to pool
86 proto.Reset(msg)
87 mp.pool.Put(msg)
88}
89
90// Example usage with buffer pooling
91type BufferedService struct {
92 userPool *MessagePool
93}
94
95func NewBufferedService() *BufferedService {
96 return &BufferedService{
97 userPool: NewMessagePool(func() proto.Message {
98 return &pb.User{}
99 }),
100 }
101}
102
103func (s *BufferedService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
104 // Get user from pool
105 user := s.userPool.Get().(*pb.User)
106 defer s.userPool.Put(user)
107
108 // Populate user data
109 user.Id = req.GetUserId()
110 user.Email = "user@example.com"
111
112 // Clone for return (pool object will be reset)
113 return proto.Clone(user).(*pb.User), nil
114}
Distributed Tracing with OpenTelemetry
1// tracing/otel.go - OpenTelemetry integration for distributed tracing
2
3package tracing
4
5import (
6 "context"
7 "fmt"
8
9 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
10 "go.opentelemetry.io/otel"
11 "go.opentelemetry.io/otel/attribute"
12 "go.opentelemetry.io/otel/exporters/jaeger"
13 "go.opentelemetry.io/otel/sdk/resource"
14 sdktrace "go.opentelemetry.io/otel/sdk/trace"
15 semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
16 "go.opentelemetry.io/otel/trace"
17 "google.golang.org/grpc"
18)
19
20// InitTracer initializes OpenTelemetry with Jaeger exporter
21func InitTracer(serviceName, jaegerEndpoint string) (*sdktrace.TracerProvider, error) {
22 // Create Jaeger exporter
23 exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerEndpoint)))
24 if err != nil {
25 return nil, fmt.Errorf("failed to create jaeger exporter: %w", err)
26 }
27
28 // Create tracer provider
29 tp := sdktrace.NewTracerProvider(
30 sdktrace.WithBatcher(exporter),
31 sdktrace.WithResource(resource.NewWithAttributes(
32 semconv.SchemaURL,
33 semconv.ServiceNameKey.String(serviceName),
34 )),
35 )
36
37 // Register as global tracer provider
38 otel.SetTracerProvider(tp)
39
40 return tp, nil
41}
42
43// Server with tracing
44func NewTracedServer() *grpc.Server {
45 return grpc.NewServer(
46 grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
47 grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
48 )
49}
50
51// Client with tracing
52func NewTracedClient(target string) (*grpc.ClientConn, error) {
53 return grpc.Dial(
54 target,
55 grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
56 grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
57 )
58}
59
60// Custom span with attributes
61func AddCustomSpan(ctx context.Context, operationName string) (context.Context, trace.Span) {
62 tracer := otel.Tracer("user-service")
63 ctx, span := tracer.Start(ctx, operationName)
64
65 // Add custom attributes
66 span.SetAttributes(
67 attribute.String("service.version", "v1.0.0"),
68 attribute.String("deployment.environment", "production"),
69 )
70
71 return ctx, span
72}
73
74// Example traced service operation
75func (s *userService) GetUserWithTracing(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
76 // Create custom span
77 ctx, span := AddCustomSpan(ctx, "GetUser.Database")
78 defer span.End()
79
80 // Add request details to span
81 span.SetAttributes(
82 attribute.Int64("user.id", int64(req.GetUserId())),
83 )
84
85 // Perform database operation
86 user, err := s.repo.GetByID(ctx, req.GetUserId())
87 if err != nil {
88 span.RecordError(err)
89 return nil, err
90 }
91
92 // Add response details
93 span.SetAttributes(
94 attribute.String("user.email", user.Email),
95 attribute.String("user.status", user.Status.String()),
96 )
97
98 return user, nil
99}
Summary
Key Takeaways
- gRPC provides significant performance benefits over REST for microservice communication
- Protocol Buffers ensure type safety and enable efficient code generation
- Streaming enables real-time communication patterns not possible with traditional HTTP
- Interceptors provide powerful middleware for cross-cutting concerns
- Proper error handling and context management are critical for production services
- Load balancing and service discovery enable horizontal scaling and high availability
- gRPC-Gateway bridges REST and gRPC for backward compatibility
- OpenTelemetry integration provides comprehensive distributed tracing
Production Deployment Checklist
Before deploying gRPC services to production:
✅ Security:
- Enable TLS/mTLS for all connections
- Implement authentication interceptors
- Validate all input data
- Rate limit client connections
✅ Reliability:
- Configure retry policies with exponential backoff
- Implement circuit breakers
- Set appropriate timeouts
- Enable health checking
✅ Performance:
- Use connection pooling
- Enable HTTP/2 multiplexing
- Implement message pooling for high-frequency operations
- Profile and optimize hot paths
✅ Observability:
- Integrate distributed tracing (OpenTelemetry)
- Export metrics (Prometheus)
- Implement structured logging
- Set up alerting on SLA violations
✅ Operations:
- Document all service APIs
- Version your Protocol Buffers
- Implement graceful shutdown
- Plan for backward compatibility
Next Steps
- Explore advanced load balancing patterns including client-side and service mesh integration
- Study gRPC-Gateway for bridging REST and gRPC systems
- Learn about service mesh patterns with Istio and Linkerd
- Investigate gRPC-Web for browser compatibility
- Master OpenTelemetry integration for distributed tracing
Further Reading
- gRPC Official Documentation
- Protocol Buffers Developer Guide
- gRPC-Go Documentation
- OpenTelemetry Go Instrumentation
- gRPC Best Practices
- gRPC-Gateway Documentation
- Consul Service Discovery
You now have the knowledge to build production-ready gRPC services that can handle high-throughput, real-time communication between microservices. The patterns and practices covered here will help you create scalable, maintainable, and observable gRPC architectures.