gRPC Services in Go

Why This Matters - The Communication Revolution

Consider building a global financial trading platform where millisecond delays can cost millions. Each trade request triggers dozens of microservice calls - market data, risk assessment, portfolio validation, compliance checks, order execution. With traditional REST APIs, each call adds overhead, and latency compounds quickly. During peak trading hours, your system processes 50,000 trades per second, each requiring coordination between 10+ microservices. That's 500,000 internal API calls where every millisecond counts.

Real-world scenario: A major streaming service needs to deliver personalized content to 100 million concurrent users. Each user's homepage requires:

  • User profile data from the authentication service
  • Viewing history from the analytics service
  • Recommendation data from the ML service
  • Content metadata from the catalog service
  • Playback capabilities from the media service

With REST APIs, this means 5 sequential HTTP requests per user. At 100 million users, that's 500 million HTTP requests. With gRPC's streaming and multiplexing, this can be reduced to a single bidirectional stream per user - a 500x improvement in efficiency.

Learning Objectives

By the end of this article, you will be able to:

  • Design high-performance gRPC services using Protocol Buffers for efficient serialization
  • Implement unary, server, client, and bidirectional streaming for real-time communication
  • Build production-ready interceptors for authentication, logging, and metrics
  • Apply advanced patterns including load balancing, retries, and health checking
  • Create complete microservice architectures using gRPC for internal communication
  • Handle errors gracefully with proper status codes and client-side error handling

Core Concepts - Understanding gRPC

The Performance Imperative

In cloud-native microservices architectures, the communication layer becomes critical:

Traditional REST Challenges:

  • Text-based protocols add serialization overhead
  • Request/response model inefficient for real-time data
  • Connection overhead for each service call
  • Limited type safety between services

gRPC Advantages:

  • Binary protocol - 3-10x smaller than JSON
  • HTTP/2 multiplexing - single connection handles many concurrent requests
  • Streaming capabilities - real-time bidirectional communication
  • Strongly typed contracts - compile-time validation and code generation

💡 Key Insight: gRPC combines the performance of binary protocols with the developer experience of function calls, making it ideal for high-throughput microservices.

The Protocol Buffer Foundation

Protocol Buffers are the contract that enables different services to communicate with compile-time type safety:

  1// user_service.proto - Service contract definition
  2syntax = "proto3";
  3
  4package user;
  5
  6// Import common types for consistency
  7import "google/protobuf/timestamp.proto";
  8import "google/protobuf/empty.proto";
  9
 10option go_package = "github.com/company/microservices/gen/go/user";
 11
 12// User message definition with comprehensive fields
 13message User {
 14  uint64 id = 1;                    // Unique identifier
 15  string email = 2;                  // Unique email address
 16  string username = 3;               // Display username
 17  string first_name = 4;              // First name
 18  string last_name = 5;               // Last name
 19
 20  // Timestamps for lifecycle management
 21  google.protobuf.Timestamp created_at = 6;
 22  google.protobuf.Timestamp updated_at = 7;
 23  google.protobuf.Timestamp last_login = 8;
 24
 25  // User status and roles
 26  UserStatus status = 9;
 27  repeated UserRole roles = 10;
 28
 29  // Profile information
 30  string avatar_url = 11;
 31  string bio = 12;
 32  map<string, string> preferences = 13;
 33}
 34
 35// Enums for type safety
 36enum UserStatus {
 37  USER_STATUS_UNSPECIFIED = 0;
 38  USER_STATUS_ACTIVE = 1;
 39  USER_STATUS_INACTIVE = 2;
 40  USER_STATUS_SUSPENDED = 3;
 41  USER_STATUS_DELETED = 4;
 42}
 43
 44enum UserRole {
 45  USER_ROLE_UNSPECIFIED = 0;
 46  USER_ROLE_USER = 1;
 47  USER_ROLE_ADMIN = 2;
 48  USER_ROLE_MODERATOR = 3;
 49}
 50
 51// Request/Response messages
 52message GetUserRequest {
 53  oneof identifier {
 54    uint64 user_id = 1;
 55    string email = 2;
 56    string username = 3;
 57  }
 58}
 59
 60message CreateUserRequest {
 61  string email = 1;
 62  string username = 2;
 63  string password = 3;
 64  string first_name = 4;
 65  string last_name = 5;
 66}
 67
 68message UpdateUserRequest {
 69  uint64 user_id = 1;
 70  // Optional fields for partial updates
 71  optional string username = 2;
 72  optional string first_name = 3;
 73  optional string last_name = 4;
 74  optional string bio = 5;
 75  map<string, string> preferences = 6;
 76}
 77
 78message ListUsersRequest {
 79  uint32 page = 1;
 80  uint32 limit = 2;
 81  UserStatus status = 3;  // Filter by status
 82  string search = 4;      // Search term
 83}
 84
 85message ListUsersResponse {
 86  repeated User users = 1;
 87  uint32 total = 2;
 88  bool has_more = 3;
 89}
 90
 91// Service definition with comprehensive operations
 92service UserService {
 93  // Unary operations
 94  rpc GetUser(GetUserRequest) returns;
 95  rpc CreateUser(CreateUserRequest) returns;
 96  rpc UpdateUser(UpdateUserRequest) returns;
 97  rpc DeleteUser(GetUserRequest) returns;
 98
 99  // List operation with pagination
100  rpc ListUsers(ListUsersRequest) returns;
101
102  // Streaming operations
103  rpc StreamUsers(ListUsersRequest) returns;
104  rpc BulkCreateUsers(stream CreateUserRequest) returns;
105  rpc UserActivity(GetUserRequest) returns;
106}

The gRPC Communication Model

1. Unary RPC:

1// Traditional function call pattern
2response, err := client.GetUser(ctx, &user.GetUserRequest{UserId: 123})

2. Server Streaming:

1// Client receives multiple responses from single request
2stream, err := client.StreamUsers(ctx, &user.ListUsersRequest{Limit: 100})
3for {
4    user, err := stream.Recv()
5    if err == io.EOF { break }
6    // Process each user
7}

3. Client Streaming:

1// Client sends multiple requests, gets single response
2stream, err := client.BulkCreateUsers(ctx)
3for _, userData := range usersToCreate {
4    stream.Send(&user.CreateUserRequest{...})
5}
6response, err := stream.CloseAndRecv()

4. Bidirectional Streaming:

 1// Both client and server can send messages independently
 2stream, err := client.UserActivity(ctx)
 3go func() {
 4    for activity := range userActivities {
 5        stream.Send(&user.UserActivityRequest{...})
 6    }
 7    stream.CloseSend()
 8}()
 9
10for {
11    event, err := stream.Recv()
12    if err == io.EOF { break }
13    // Process server events
14}

Practical Examples - Building Production gRPC Services

Example 1: Basic Unary Service

Let's start with a complete user management service:

 1// api/user/v1/user.proto - Complete service definition
 2syntax = "proto3";
 3
 4package user.v1;
 5
 6import "google/protobuf/timestamp.proto";
 7import "google/protobuf/empty.proto";
 8
 9option go_package = "github.com/company/microservices/gen/go/user/v1;userv1";
10
11message User {
12  uint64 id = 1;
13  string email = 2;
14  string username = 3;
15  string first_name = 4;
16  string last_name = 5;
17  google.protobuf.Timestamp created_at = 6;
18  google.protobuf.Timestamp updated_at = 7;
19  UserStatus status = 8;
20}
21
22enum UserStatus {
23  USER_STATUS_UNSPECIFIED = 0;
24  USER_STATUS_ACTIVE = 1;
25  USER_STATUS_INACTIVE = 2;
26}
27
28message GetUserRequest {
29  oneof identifier {
30    uint64 user_id = 1;
31    string email = 2;
32    string username = 3;
33  }
34}
35
36message CreateUserRequest {
37  string email = 1;
38  string username = 2;
39  string password = 3;
40  string first_name = 4;
41  string last_name = 5;
42}
43
44message UpdateUserRequest {
45  uint64 user_id = 1;
46  optional string username = 2;
47  optional string first_name = 3;
48  optional string last_name = 4;
49  optional UserStatus status = 5;
50}
51
52service UserService {
53  rpc GetUser(GetUserRequest) returns;
54  rpc CreateUser(CreateUserRequest) returns;
55  rpc UpdateUser(UpdateUserRequest) returns;
56  rpc DeleteUser(GetUserRequest) returns;
57}

Generate Go code:

1// run
2# Install protoc and Go plugins
3go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
4go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
5
6# Generate Go code from proto files
7protoc --go_out=. --go_opt=paths=source_relative \
8    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
9    api/user/v1/*.proto

Server implementation:

  1// cmd/server/main.go - Complete gRPC server
  2
  3package main
  4
  5import (
  6	"context"
  7	"crypto/tls"
  8	"database/sql"
  9	"fmt"
 10	"log"
 11	"net"
 12	"os"
 13	"time"
 14
 15	"google.golang.org/grpc"
 16	"google.golang.org/grpc/codes"
 17	"google.golang.org/grpc/credentials"
 18	"google.golang.org/grpc/health"
 19	"google.golang.org/grpc/health/grpc_health_v1"
 20	"google.golang.org/grpc/reflection"
 21	"google.golang.org/grpc/status"
 22
 23	pb "github.com/company/microservices/gen/go/user/v1"
 24	"github.com/company/microservices/internal/user/repository"
 25	"github.com/company/microservices/internal/user/service"
 26)
 27
 28func main() {
 29	// Configuration
 30	config := struct {
 31		Port       string
 32		DBHost     string
 33		DBPort     string
 34		DBUser     string
 35		DBPassword string
 36		DBName     string
 37		TLSCert    string
 38		TLSKey     string
 39	}{
 40		Port:       getEnv("PORT", "50051"),
 41		DBHost:     getEnv("DB_HOST", "localhost"),
 42		DBPort:     getEnv("DB_PORT", "5432"),
 43		DBUser:     getEnv("DB_USER", "postgres"),
 44		DBPassword: getEnv("DB_PASSWORD", "password"),
 45		DBName:     getEnv("DB_NAME", "users"),
 46		TLSCert:    getEnv("TLS_CERT", "server.crt"),
 47		TLSKey:     getEnv("TLS_KEY", "server.key"),
 48	}
 49
 50	// Database connection
 51	db, err := connectDB(config)
 52	if err != nil {
 53		log.Fatalf("Failed to connect to database: %v", err)
 54	}
 55	defer db.Close()
 56
 57	// Repository layer
 58	userRepo := repository.NewPostgresRepository(db)
 59
 60	// Service layer
 61	userService := service.NewUserService(userRepo)
 62
 63	// Create gRPC server with interceptors
 64	server := grpc.NewServer(
 65		grpc.UnaryInterceptor(loggingUnaryInterceptor),
 66		grpc.StreamInterceptor(loggingStreamInterceptor),
 67		grpc.ChainUnaryInterceptor(
 68			authUnaryInterceptor,
 69			metricsUnaryInterceptor,
 70		),
 71	)
 72
 73	// Register services
 74	pb.RegisterUserServiceServer(server, userService)
 75
 76	// Health check service
 77	healthServer := health.NewServer()
 78	grpc_health_v1.RegisterHealthServer(server, healthServer)
 79	healthServer.SetServingStatus("user-service", grpc_health_v1.HealthCheckResponse_SERVING)
 80
 81	// Enable reflection for development
 82	reflection.Register(server)
 83
 84	// Start server with TLS
 85	lis, err := net.Listen("tcp", ":"+config.Port)
 86	if err != nil {
 87		log.Fatalf("Failed to listen: %v", err)
 88	}
 89
 90	// Load TLS certificates
 91	cert, err := tls.LoadX509KeyPair(config.TLSCert, config.TLSKey)
 92	if err != nil {
 93		log.Fatalf("Failed to load TLS certificates: %v", err)
 94	}
 95
 96	tlsConfig := &tls.Config{
 97		Certificates: []tls.Certificate{cert},
 98		ClientAuth:   tls.VerifyClientCertIfGiven,
 99	}
100
101	tlsCreds := credentials.NewTLS(tlsConfig)
102
103	// Create TLS listener
104	tlsLis := tls.NewListener(lis, tlsConfig)
105
106	log.Printf("Starting gRPC server on port %s with TLS", config.Port)
107	if err := server.Serve(tlsLis); err != nil {
108		log.Fatalf("Failed to serve: %v", err)
109	}
110}
111
112// Repository implementation
113type userRepository struct {
114	db *sql.DB
115}
116
117func NewPostgresRepository(db *sql.DB) *userRepository {
118	return &userRepository{db: db}
119}
120
121func GetByID(ctx context.Context, id uint64) {
122	query := `
123		SELECT id, email, username, first_name, last_name,
124		       created_at, updated_at, status
125		FROM users
126		WHERE id = $1 AND deleted_at IS NULL
127	`
128
129	user := &pb.User{}
130	var status pb.UserStatus
131	var createdAt, updatedAt time.Time
132
133	err := r.db.QueryRowContext(ctx, query, id).Scan(
134		&user.Id, &user.Email, &user.Username,
135		&user.FirstName, &user.LastName,
136		&createdAt, &updatedAt, &status,
137	)
138
139	if err != nil {
140		if err == sql.ErrNoRows {
141			return nil, status.Error(codes.NotFound, "user not found")
142		}
143		return nil, status.Error(codes.Internal, "database error")
144	}
145
146	user.CreatedAt = timestamppb.New(createdAt)
147	user.UpdatedAt = timestamppb.New(updatedAt)
148	user.Status = status
149
150	return user, nil
151}
152
153func GetByEmail(ctx context.Context, email string) {
154	// Similar implementation for email lookup
155}
156
157func Create(ctx context.Context, req *pb.CreateUserRequest) {
158	query := `
159		INSERT INTO users
160		VALUES, NOW())
161		RETURNING id, created_at, updated_at
162	`
163
164	user := &pb.User{
165		Email:     req.Email,
166		Username:  req.Username,
167		FirstName: req.FirstName,
168		LastName:  req.LastName,
169		Status:    pb.UserStatus_USER_STATUS_ACTIVE,
170	}
171
172	// Hash password
173	hashedPassword, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
174	if err != nil {
175		return nil, status.Error(codes.Internal, "failed to hash password")
176	}
177
178	var createdAt, updatedAt time.Time
179	err = r.db.QueryRowContext(ctx, query,
180		req.Email, req.Username, hashedPassword,
181		req.FirstName, req.LastName,
182	).Scan(&user.Id, &createdAt, &updatedAt)
183
184	if err != nil {
185		if isUniqueViolation(err) {
186			return nil, status.Error(codes.AlreadyExists, "user already exists")
187		}
188		return nil, status.Error(codes.Internal, "failed to create user")
189	}
190
191	user.CreatedAt = timestamppb.New(createdAt)
192	user.UpdatedAt = timestamppb.New(updatedAt)
193
194	return user, nil
195}
196
197// Service implementation
198type userService struct {
199	pb.UnimplementedUserServiceServer
200	repo repository.UserRepository
201}
202
203func NewUserService(repo repository.UserRepository) *userService {
204	return &userService{repo: repo}
205}
206
207func GetUser(ctx context.Context, req *pb.GetUserRequest) {
208	// Validate request
209	if req.GetIdentifier() == nil {
210		return nil, status.Error(codes.InvalidArgument, "identifier is required")
211	}
212
213	switch identifier := req.Identifier.(type) {
214	case *pb.GetUserRequest_UserId:
215		return s.repo.GetByID(ctx, identifier.UserId)
216	case *pb.GetUserRequest_Email:
217		return s.repo.GetByEmail(ctx, identifier.Email)
218	case *pb.GetUserRequest_Username:
219		return s.repo.GetByUsername(ctx, identifier.Username)
220	default:
221		return nil, status.Error(codes.InvalidArgument, "invalid identifier type")
222	}
223}
224
225func CreateUser(ctx context.Context, req *pb.CreateUserRequest) {
226	// Validate request
227	if err := validateCreateUserRequest(req); err != nil {
228		return nil, status.Error(codes.InvalidArgument, err.Error())
229	}
230
231	// Check if user already exists
232	if _, err := s.repo.GetByEmail(ctx, req.Email); err == nil {
233		return nil, status.Error(codes.AlreadyExists, "user with this email already exists")
234	}
235
236	// Create user
237	return s.repo.Create(ctx, req)
238}
239
240func UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) {
241	// Validate request
242	if req.UserId == 0 {
243		return nil, status.Error(codes.InvalidArgument, "user_id is required")
244	}
245
246	// Check if user exists
247	user, err := s.repo.GetByID(ctx, req.UserId)
248	if err != nil {
249		return nil, err
250	}
251
252	// Update fields if provided
253	if req.Username != nil {
254		user.Username = *req.Username
255	}
256	if req.FirstName != nil {
257		user.FirstName = *req.FirstName
258	}
259	if req.LastName != nil {
260		user.LastName = *req.LastName
261	}
262	if req.Status != nil {
263		user.Status = *req.Status
264	}
265
266	// Update in database
267	return s.repo.Update(ctx, user)
268}
269
270func DeleteUser(ctx context.Context, req *pb.GetUserRequest) {
271	// Similar implementation for soft delete
272	return &emptypb.Empty{}, nil
273}
274
275// Helper functions
276func validateCreateUserRequest(req *pb.CreateUserRequest) error {
277	if req.Email == "" {
278		return fmt.Errorf("email is required")
279	}
280	if req.Username == "" {
281		return fmt.Errorf("username is required")
282	}
283	if req.Password == "" {
284		return fmt.Errorf("password is required")
285	}
286	if len(req.Password) < 8 {
287		return fmt.Errorf("password must be at least 8 characters")
288	}
289	return nil
290}
291
292func connectDB(config struct{}) {
293	dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable",
294		config.DBHost, config.DBPort, config.DBUser, config.DBPassword, config.DBName)
295
296	db, err := sql.Open("postgres", dsn)
297	if err != nil {
298		return nil, err
299	}
300
301	// Test connection
302	if err := db.Ping(); err != nil {
303		return nil, err
304	}
305
306	// Configure connection pool
307	db.SetMaxOpenConns(25)
308	db.SetMaxIdleConns(5)
309	db.SetConnMaxLifetime(5 * time.Minute)
310
311	return db, nil
312}

Example 2: Advanced Streaming Patterns

  1// service/streaming.go - Advanced streaming implementations
  2
  3// Server streaming for real-time user updates
  4func StreamUsers(req *pb.ListUsersRequest, stream pb.UserService_StreamUsersServer) error {
  5	// Validate request
  6	if req.Limit == 0 {
  7		req.Limit = 100  // Default limit
  8	}
  9	if req.Limit > 1000 {
 10		return status.Error(codes.InvalidArgument, "limit cannot exceed 1000")
 11	}
 12
 13	// Create database cursor for streaming
 14	query := `
 15		SELECT id, email, username, first_name, last_name,
 16		       created_at, updated_at, status
 17		FROM users
 18		WHERE deleted_at IS NULL
 19	`
 20	args := []interface{}{}
 21
 22	if req.Status != pb.UserStatus_USER_STATUS_UNSPECIFIED {
 23		query += " AND status = $"
 24		args = append(args, int32(req.Status))
 25	}
 26	if req.Search != "" {
 27		query += " AND"
 28		searchPattern := "%" + req.Search + "%"
 29		args = append(args, searchPattern, searchPattern, searchPattern, searchPattern)
 30	}
 31	query += " ORDER BY created_at DESC"
 32
 33	// Execute query with cursor
 34	rows, err := s.db.QueryContext(stream.Context(), query, args...)
 35	if err != nil {
 36		return status.Error(codes.Internal, "database error")
 37	}
 38	defer rows.Close()
 39
 40	count := 0
 41	for rows.Next() {
 42		// Check if client has disconnected
 43		select {
 44		case <-stream.Context().Done():
 45			return stream.Context().Err()
 46		default:
 47		}
 48
 49		user := &pb.User{}
 50		var status pb.UserStatus
 51		var createdAt, updatedAt time.Time
 52
 53		err := rows.Scan(
 54			&user.Id, &user.Email, &user.Username,
 55			&user.FirstName, &user.LastName,
 56			&createdAt, &updatedAt, &status,
 57		)
 58		if err != nil {
 59			log.Printf("Error scanning row: %v", err)
 60			continue
 61		}
 62
 63		user.CreatedAt = timestamppb.New(createdAt)
 64		user.UpdatedAt = timestamppb.New(updatedAt)
 65		user.Status = status
 66
 67		// Send user to client
 68		if err := stream.Send(user); err != nil {
 69			return err
 70		}
 71
 72		count++
 73		if count >= int(req.Limit) {
 74			break
 75		}
 76	}
 77
 78	return nil
 79}
 80
 81// Client streaming for bulk operations
 82func BulkCreateUsers(stream pb.UserService_BulkCreateUsersServer) error {
 83	var users []*pb.CreateUserRequest
 84	var errors []*pb.BulkCreateError
 85
 86	// Receive all users from client
 87	for {
 88		req, err := stream.Recv()
 89		if err == io.EOF {
 90			break
 91		}
 92		if err != nil {
 93			return err
 94		}
 95
 96		// Validate each request
 97		if err := validateCreateUserRequest(req); err != nil {
 98			errors = append(errors, &pb.BulkCreateError{
 99				Email: req.Email,
100				Error: err.Error(),
101			})
102			continue
103		}
104
105		users = append(users, req)
106	}
107
108	// Create users in database
109	var createdUsers []*pb.User
110	for _, req := range users {
111		user, err := s.repo.Create(stream.Context(), req)
112		if err != nil {
113			errors = append(errors, &pb.BulkCreateError{
114				Email: req.Email,
115				Error: err.Error(),
116			})
117			continue
118		}
119		createdUsers = append(createdUsers, user)
120	}
121
122	// Send response with created users and errors
123	response := &pb.ListUsersResponse{
124		Users:    createdUsers,
125		Total:    uint32(len(createdUsers)),
126		Errors:   errors,
127		HasMore:  false,
128	}
129
130	return stream.SendAndClose(response)
131}
132
133// Bidirectional streaming for real-time user activity
134func UserActivity(stream pb.UserService_UserActivityServer) error {
135	// Subscribe user to activity updates
136	activityChan := make(chan *pb.UserActivityEvent, 100)
137	defer close(activityChan)
138
139	// Handle incoming requests in goroutine
140	go func() {
141		for {
142			req, err := stream.Recv()
143			if err == io.EOF {
144				break
145			}
146			if err != nil {
147				log.Printf("Error receiving activity request: %v", err)
148				return
149			}
150
151			// Subscribe user to specific events
152			switch req.GetAction() {
153			case pb.UserActivityRequest_SUBSCRIBE_USER_UPDATES:
154				s.activityManager.Subscribe(req.GetUserId(), activityChan)
155			case pb.UserActivityRequest_UNSUBSCRIBE_USER_UPDATES:
156				s.activityManager.Unsubscribe(req.GetUserId(), activityChan)
157			}
158		}
159	}()
160
161	// Send activity events to client
162	for {
163		select {
164		case event := <-activityChan:
165			if err := stream.Send(event); err != nil {
166				return err
167			}
168		case <-stream.Context().Done():
169			return stream.Context().Err()
170		}
171	}
172}

Example 3: Production Interceptors

 1// interceptors.go - Production-ready gRPC interceptors
 2
 3// Authentication interceptor
 4func authUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) {
 5	// Skip authentication for health check and reflection
 6	if info.FullMethod == "/grpc.health.v1.Health/Check" ||
 7		info.FullMethod == "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo" {
 8		return handler(ctx, req)
 9	}
10
11	// Extract token from metadata
12	md, ok := metadata.FromIncomingContext(ctx)
13	if !ok {
14		return nil, status.Error(codes.Unauthenticated, "missing metadata")
15	}
16
17	tokens := md["authorization"]
18	if len(tokens) == 0 {
19		return nil, status.Error(codes.Unauthenticated, "missing authorization token")
20	}
21
22	// Validate token
23	userID, err := validateToken(tokens[0])
24	if err != nil {
25		return nil, status.Error(codes.Unauthenticated, "invalid token")
26	}
27
28	// Add user ID to context
29	newCtx := context.WithValue(ctx, "user_id", userID)
30	return handler(newCtx, req)
31}
32
33// Logging interceptor
34func loggingUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) {
35	start := time.Now()
36
37	// Log request
38	log.Printf("GRPC Request: %s", info.FullMethod)
39
40	// Call handler
41	resp, err := handler(ctx, req)
42
43	// Log response with duration
44	duration := time.Since(start)
45	if err != nil {
46		log.Printf("GRPC Error: %s - %v", info.FullMethod, err, duration)
47	} else {
48		log.Printf("GRPC Response: %s", info.FullMethod, duration)
49	}
50
51	return resp, err
52}
53
54// Metrics interceptor
55func metricsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) {
56	start := time.Now()
57
58	resp, err := handler(ctx, req)
59	duration := time.Since(start)
60
61	// Record metrics
62	method := info.FullMethod
63	if err != nil {
64		grpcErrorCounter.WithLabelValues(method, getErrorCode(err)).Inc()
65	} else {
66		grpcSuccessCounter.WithLabelValues(method).Inc()
67	}
68	grpcDurationHistogram.WithLabelValues(method).Observe(duration.Seconds())
69
70	return resp, err
71}
72
73// Rate limiting interceptor
74func rateLimitUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) {
75	// Extract client ID
76	md, ok := metadata.FromIncomingContext(ctx)
77	if !ok {
78		return nil, status.Error(codes.Unauthenticated, "missing metadata")
79	}
80
81	clientIDs := md["x-client-id"]
82	if len(clientIDs) == 0 {
83		return nil, status.Error(codes.Unauthenticated, "missing client ID")
84	}
85
86	clientID := clientIDs[0]
87
88	// Check rate limit
89	if !rateLimiter.Allow(clientID) {
90		return nil, status.Error(codes.ResourceExhausted, "rate limit exceeded")
91	}
92
93	return handler(ctx, req)
94}

Example 4: Client Implementation with Best Practices

  1// client/user_client.go - Production-ready gRPC client
  2
  3package client
  4
  5import (
  6	"context"
  7	"crypto/tls"
  8	"fmt"
  9	"time"
 10
 11	"google.golang.org/grpc"
 12	"google.golang.org/grpc/credentials"
 13	"google.golang.org/grpc/credentials/insecure"
 14	"google.golang.org/grpc/keepalive"
 15	"google.golang.org/grpc/retry"
 16
 17	pb "github.com/company/microservices/gen/go/user/v1"
 18)
 19
 20type UserClient struct {
 21	conn   *grpc.ClientConn
 22	client pb.UserServiceClient
 23}
 24
 25func NewUserClient(addr string, useTLS bool, token string) {
 26	// Create connection with retry and keepalive
 27	dialOpts := []grpc.DialOption{
 28		grpc.WithKeepaliveParams(keepalive.ClientParameters{
 29			Time:                10 * time.Second,
 30			Timeout:             3 * time.Second,
 31			PermitWithoutStream: true,
 32		}),
 33		grpc.WithUnaryInterceptor(loggingClientInterceptor),
 34		grpc.WithStreamInterceptor(loggingStreamInterceptor),
 35		grpc.WithDefaultCallOptions(
 36			grpc.WaitForReady(true),
 37			grpc.MaxRetryRPCBufferSize(1024*1024), // 1MB
 38		),
 39	}
 40
 41	// Configure credentials
 42	if useTLS {
 43		creds := credentials.NewTLS(&tls.Config{
 44			ServerName:         "user-service",
 45			InsecureSkipVerify: false,
 46		})
 47		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
 48	} else {
 49		dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
 50	}
 51
 52	// Connect with retry
 53	conn, err := grpc.Dial(addr, dialOpts...)
 54	if err != nil {
 55		return nil, fmt.Errorf("failed to connect to user service: %w", err)
 56	}
 57
 58	// Create client with authentication interceptor
 59	if token != "" {
 60		authInterceptor := tokenAuthInterceptor(token)
 61		client := pb.NewUserServiceClient(
 62			grpc.NewClientConn(
 63				conn.Target(),
 64				grpc.WithTransportCredentials(conn.ConnectionState().TransportCredentials),
 65				grpc.WithUnaryInterceptor(authInterceptor),
 66			),
 67		)
 68		return &UserClient{conn: conn, client: client}, nil
 69	}
 70
 71	return &UserClient{
 72		conn:   conn,
 73		client: pb.NewUserServiceClient(conn),
 74	}, nil
 75}
 76
 77func GetUser(ctx context.Context, userID uint64) {
 78	req := &pb.GetUserRequest{
 79		Identifier: &pb.GetUserRequest_UserId{UserId: userID},
 80	}
 81
 82	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 83	defer cancel()
 84
 85	return c.client.GetUser(ctx, req)
 86}
 87
 88func CreateUser(ctx context.Context, req *pb.CreateUserRequest) {
 89	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 90	defer cancel()
 91
 92	return c.client.CreateUser(ctx, req)
 93}
 94
 95func StreamUsers(ctx context.Context, req *pb.ListUsersRequest) {
 96	stream, err := c.client.StreamUsers(ctx, req)
 97	if err != nil {
 98		return nil, err
 99	}
100
101	userChan := make(chan *pb.User, 100)
102
103	go func() {
104		defer close(userChan)
105		for {
106			user, err := stream.Recv()
107			if err == io.EOF {
108				return
109			}
110			if err != nil {
111				log.Printf("Error receiving user: %v", err)
112				return
113			}
114			userChan <- user
115		}
116	}()
117
118	return userChan, nil
119}
120
121func Close() error {
122	return c.conn.Close()
123}
124
125// Client interceptors
126func loggingClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
127	start := time.Now()
128	err := invoker(ctx, method, req, reply, cc, opts...)
129	duration := time.Since(start)
130
131	if err != nil {
132		log.Printf("GRPC Client Error: %s - %v", method, err, duration)
133	} else {
134		log.Printf("GRPC Client Response: %s", method, duration)
135	}
136
137	return err
138}
139
140func tokenAuthInterceptor(token string) grpc.UnaryClientInterceptor {
141	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
142		// Add token to metadata
143		md := metadata.Pairs("authorization", "Bearer "+token)
144		ctx = metadata.NewOutgoingContext(ctx, md)
145		return invoker(ctx, method, req, reply, cc, opts...)
146	}
147}

Common Patterns and Pitfalls

Pattern 1: Connection Pooling and Reuse

 1// ✅ GOOD: Use connection pooling and reuse
 2type ServiceRegistry struct {
 3	connections map[string]*grpc.ClientConn
 4	mu         sync.RWMutex
 5}
 6
 7func GetConnection(serviceName, address string) {
 8	r.mu.RLock()
 9	conn, exists := r.connections[serviceName]
10	r.mu.RUnlock()
11
12	if exists {
13		return conn, nil
14	}
15
16	r.mu.Lock()
17	defer r.mu.Unlock()
18
19	// Double-check after acquiring write lock
20	if conn, exists := r.connections[serviceName]; exists {
21		return conn, nil
22	}
23
24	// Create new connection
25	conn, err := grpc.Dial(address, grpc.WithInsecure())
26	if err != nil {
27		return nil, err
28	}
29
30	r.connections[serviceName] = conn
31	return conn, nil
32}
33
34// ❌ BAD: Creating new connections for each request
35func badGetUser(userID uint64) {
36	conn, err := grpc.Dial("localhost:50051") // New connection every call!
37	if err != nil {
38		return nil, err
39	}
40	defer conn.Close() // Immediately closed!
41
42	client := pb.NewUserServiceClient(conn)
43	return client.GetUser(context.Background(), &pb.GetUserRequest{UserId: userID})
44}

Pattern 2: Proper Error Handling

 1// ✅ GOOD: Proper gRPC error handling
 2func GetUser(ctx context.Context, req *pb.GetUserRequest) {
 3	user, err := s.repo.GetByID(ctx, req.GetUserId())
 4	if err != nil {
 5		switch {
 6		case errors.Is(err, repository.ErrNotFound):
 7			return nil, status.Error(codes.NotFound, "user not found")
 8		case errors.Is(err, repository.ErrInvalidID):
 9			return nil, status.Error(codes.InvalidArgument, "invalid user ID")
10		default:
11			// Log unexpected errors but don't expose details
12			log.Printf("Unexpected error getting user %d: %v", req.GetUserId(), err)
13			return nil, status.Error(codes.Internal, "internal error")
14		}
15	}
16	return user, nil
17}
18
19// ❌ BAD: Exposing internal errors
20func GetUserBad(ctx context.Context, req *pb.GetUserRequest) {
21	user, err := s.repo.GetByID(ctx, req.GetUserId())
22	if err != nil {
23		return nil, err // Exposes internal database errors!
24	}
25	return user, nil
26}

Pattern 3: Context Propagation

 1// ✅ GOOD: Proper context propagation and cancellation
 2func UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) {
 3	// Create a child context with timeout
 4	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 5	defer cancel()
 6
 7	// Pass context to all operations
 8	user, err := s.repo.GetByID(ctx, req.UserId)
 9	if err != nil {
10		return nil, err
11	}
12
13	// Check if user has permission to update this resource
14	userID := ctx.Value("user_id").(uint64)
15	if userID != req.UserId && !isAdmin(userID) {
16		return nil, status.Error(codes.PermissionDenied, "insufficient permissions")
17	}
18
19	return s.repo.Update(ctx, user)
20}
21
22// ❌ BAD: Ignoring context cancellation
23func UpdateUserBad(ctx context.Context, req *pb.UpdateUserRequest) {
24	// Ignoring context passed by client
25	user, err := s.repo.GetByID(context.Background(), req.UserId)
26	if err != nil {
27		return nil, err
28	}
29
30	// Long-running operation without cancellation
31	time.Sleep(30 * time.Second) // Block for 30 seconds!
32
33	return s.repo.Update(context.Background(), user)
34}

Common Pitfalls

1. Not handling streaming errors properly

 1// ❌ BAD: Not checking for stream errors
 2for {
 3    user, _ := stream.Recv() // Ignoring error!
 4    processUser(user)
 5}
 6
 7// ✅ GOOD: Proper error handling
 8for {
 9    user, err := stream.Recv()
10    if err == io.EOF {
11        break // Normal termination
12    }
13    if err != nil {
14        return fmt.Errorf("stream error: %w", err)
15    }
16    processUser(user)
17}

2. Blocking on streaming calls without timeout

 1// ❌ BAD: No timeout for streaming
 2stream, err := client.StreamUsers(ctx, req)
 3if err != nil {
 4    return err
 5}
 6
 7for {
 8    user, err := stream.Recv() // Could block forever!
 9    if err == io.EOF { break }
10    // Process...
11}
12
13// ✅ GOOD: With timeout context
14ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
15defer cancel()
16
17stream, err := client.StreamUsers(ctx, req)
18if err != nil {
19    return err
20}
21
22for {
23    user, err := stream.Recv()
24    if err == io.EOF { break }
25    if err != nil { return err }
26    // Process...
27}

Integration and Mastery

Production Deployment Patterns

1. Load Balancing and Service Discovery:

 1// client/resolver.go - Custom resolver for service discovery
 2type ServiceDiscoveryResolver struct {
 3	serviceName string
 4	registry   ServiceRegistry
 5}
 6
 7func ResolveNow(o resolver.ResolveNowOptions) {
 8	// Trigger when endpoints change
 9}
10
11func Close() {
12	// Cleanup resources
13}
14
15// Build with custom resolver
16conn, err := grpc.Dial(
17	"discovery:///"+serviceName,
18	grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
19	grpc.WithResolvers(&ServiceDiscoveryResolver{}),
20)

2. Circuit Breaker Pattern:

 1// client/circuit_breaker.go - Circuit breaker for gRPC
 2type CircuitBreaker struct {
 3	maxFailures  int
 4	resetTimeout time.Duration
 5	failures     int
 6	lastFailTime time.Time
 7	mu           sync.Mutex
 8	state        State
 9}
10
11type State int
12
13const (
14	StateClosed State = iota
15	StateOpen
16	StateHalfOpen
17)
18
19func Call(ctx context.Context, call func() error) error {
20	cb.mu.Lock()
21	defer cb.mu.Unlock()
22
23	if cb.state == StateOpen {
24		if time.Since(cb.lastFailTime) > cb.resetTimeout {
25			cb.state = StateHalfOpen
26		} else {
27			return errors.New("circuit breaker is open")
28		}
29	}
30
31	err := call()
32	if err != nil {
33		cb.failures++
34		cb.lastFailTime = time.Now()
35		if cb.failures >= cb.maxFailures {
36			cb.state = StateOpen
37		}
38		return err
39	}
40
41	cb.failures = 0
42	cb.state = StateClosed
43	return nil
44}

Observability Integration

Metrics Collection:

 1// metrics/otel.go - OpenTelemetry integration
 2func initGRPCMetrics() {
 3	metrics := grpc_prometheus.ClientMetrics{
 4		ClientHandledCounter: prometheus.NewCounterVec(
 5			prometheus.CounterOpts{
 6				Name: "grpc_client_handled_total",
 7				Help: "Total number of RPCs completed by the client, regardless of success or failure.",
 8			},
 9			[]string{"grpc_service", "grpc_method", "grpc_code"},
10		),
11		ClientMsgReceived: prometheus.NewHistogramVec(
12			prometheus.HistogramOpts{
13				Name:    "grpc_client_msg_received_total",
14				Help:    "Total number of RPC messages received by the client.",
15				Buckets: prometheus.DefBuckets,
16			},
17			[]string{"grpc_service", "grpc_method"},
18		),
19	}
20
21	// Register metrics
22	prometheus.MustRegister(
23		metrics.ClientHandledCounter,
24		metrics.ClientMsgReceived,
25	)
26}

Testing gRPC Services

  1// tests/user_service_test.go - Comprehensive testing
  2
  3func TestUserService_GetUser(t *testing.T) {
  4	// Setup test database
  5	db := setupTestDB(t)
  6	defer db.Close()
  7
  8	repo := repository.NewPostgresRepository(db)
  9	service := &userService{repo: repo}
 10
 11	tests := []struct {
 12		name    string
 13		req     *pb.GetUserRequest
 14		want    *pb.User
 15		wantErr bool
 16		errCode codes.Code
 17	}{
 18		{
 19			name: "valid user ID",
 20			req: &pb.GetUserRequest{
 21				Identifier: &pb.GetUserRequest_UserId{UserId: 1},
 22			},
 23			want: &pb.User{
 24				Id:        1,
 25				Email:     "test@example.com",
 26				Username:  "testuser",
 27				FirstName: "Test",
 28				LastName:  "User",
 29				Status:    pb.UserStatus_USER_STATUS_ACTIVE,
 30			},
 31			wantErr: false,
 32		},
 33		{
 34			name: "non-existent user",
 35			req: &pb.GetUserRequest{
 36				Identifier: &pb.GetUserRequest_UserId{UserId: 999},
 37			},
 38			want:    nil,
 39			wantErr: true,
 40			errCode: codes.NotFound,
 41		},
 42		{
 43			name:    "invalid request",
 44			req:     &pb.GetUserRequest{},
 45			want:    nil,
 46			wantErr: true,
 47			errCode: codes.InvalidArgument,
 48		},
 49	}
 50
 51	for _, tt := range tests {
 52		t.Run(tt.name, func(t *testing.T) {
 53			got, err := service.GetUser(context.Background(), tt.req)
 54
 55			if tt.wantErr {
 56				require.Error(t, err)
 57				st, ok := status.FromError(err)
 58				require.True(t, ok)
 59				assert.Equal(t, tt.errCode, st.Code())
 60			} else {
 61				require.NoError(t, err)
 62				assert.Equal(t, tt.want.Id, got.Id)
 63				assert.Equal(t, tt.want.Email, got.Email)
 64			}
 65		})
 66	}
 67}
 68
 69// Integration test with in-memory gRPC server
 70func TestUserService_Integration(t *testing.T) {
 71	// Create in-memory server
 72	srv := grpc.NewServer()
 73	userService := setupUserService(t)
 74	pb.RegisterUserServiceServer(srv, userService)
 75
 76	// Create listener
 77	lis := bufconn.Listen(1024 * 1024)
 78
 79	// Start server in goroutine
 80	go func() {
 81		if err := srv.Serve(lis); err != nil {
 82			log.Fatalf("Server exited with error: %v", err)
 83		}
 84	}()
 85	defer srv.Stop()
 86
 87	// Create client connection
 88	conn, err := grpc.DialContext(
 89		context.Background(),
 90		"bufnet",
 91		grpc.WithContextDialer(bufconn.Dialer),
 92		grpc.WithInsecure(),
 93	)
 94	require.NoError(t, err)
 95	defer conn.Close()
 96
 97	client := pb.NewUserServiceClient(conn)
 98
 99	// Test create and get user
100	createReq := &pb.CreateUserRequest{
101		Email:     "integration@test.com",
102		Username:  "integration",
103		Password:  "password123",
104		FirstName: "Integration",
105		LastName:  "Test",
106	}
107
108	createdUser, err := client.CreateUser(context.Background(), createReq)
109	require.NoError(t, err)
110	assert.NotZero(t, createdUser.Id)
111
112	getReq := &pb.GetUserRequest{
113		Identifier: &pb.GetUserRequest_UserId{UserId: createdUser.Id},
114	}
115
116	retrievedUser, err := client.GetUser(context.Background(), getReq)
117	require.NoError(t, err)
118	assert.Equal(t, createdUser.Id, retrievedUser.Id)
119	assert.Equal(t, createReq.Email, retrievedUser.Email)
120}

Practice Exercises

Exercise 1: Real-Time Notification System

Objective: Build a bidirectional streaming service for real-time notifications.

Requirements:

  1. Client subscribes to notification types
  2. Server pushes notifications in real-time
  3. Client can send acknowledgment receipts
  4. Handle connection drops and reconnection
  5. Implement authentication and authorization

Exercise 2: File Upload Service with Progress

Objective: Create a streaming service for large file uploads with progress tracking.

Requirements:

  1. Client streams file in chunks
  2. Server provides progress updates
  3. Support for pause/resume uploads
  4. File integrity verification
  5. Concurrent uploads handling

Exercise 3: Load-Balanced gRPC Service Pool

Objective: Implement client-side load balancing across multiple service instances.

Requirements:

  1. Dynamic service discovery
  2. Multiple load balancing strategies
  3. Health checking and failover
  4. Circuit breaker integration
  5. Metrics collection

Exercise 4: gRPC Gateway and REST API Integration

Objective: Create a REST gateway that exposes gRPC services as HTTP APIs.

Requirements:

  1. HTTP/JSON to gRPC translation
  2. Error code mapping
  3. Authentication middleware
  4. OpenAPI documentation generation
  5. Request/response transformation

Exercise 5: Advanced gRPC Metrics and Observability

Objective: Implement comprehensive observability for gRPC services.

Requirements:

  1. OpenTelemetry tracing integration
  2. Custom metrics collection
  3. Distributed context propagation
  4. Performance profiling
  5. Alerting on SLA violations

Advanced Topics - Production gRPC at Scale

Load Balancing Strategies

gRPC supports multiple load balancing strategies for distributing requests across service instances:

  1// client/load_balancing.go - Advanced load balancing configurations
  2
  3package client
  4
  5import (
  6	"context"
  7	"fmt"
  8	"log"
  9	"time"
 10
 11	"google.golang.org/grpc"
 12	"google.golang.org/grpc/balancer"
 13	"google.golang.org/grpc/balancer/roundrobin"
 14	"google.golang.org/grpc/credentials/insecure"
 15	"google.golang.org/grpc/resolver"
 16	"google.golang.org/grpc/resolver/manual"
 17
 18	pb "github.com/company/microservices/gen/go/user/v1"
 19)
 20
 21// Custom resolver for service discovery
 22type CustomResolver struct {
 23	target     resolver.Target
 24	cc         resolver.ClientConn
 25	updateChan chan []resolver.Address
 26}
 27
 28func NewCustomResolver(target resolver.Target, cc resolver.ClientConn) *CustomResolver {
 29	r := &CustomResolver{
 30		target:     target,
 31		cc:         cc,
 32		updateChan: make(chan []resolver.Address, 10),
 33	}
 34	go r.watcher()
 35	return r
 36}
 37
 38func (r *CustomResolver) watcher() {
 39	for {
 40		select {
 41		case addrs := <-r.updateChan:
 42			// Update service endpoints dynamically
 43			r.cc.UpdateState(resolver.State{
 44				Addresses: addrs,
 45			})
 46		}
 47	}
 48}
 49
 50func (r *CustomResolver) UpdateEndpoints(endpoints []string) {
 51	var addrs []resolver.Address
 52	for _, ep := range endpoints {
 53		addrs = append(addrs, resolver.Address{Addr: ep})
 54	}
 55	r.updateChan <- addrs
 56}
 57
 58func (r *CustomResolver) ResolveNow(o resolver.ResolveNowOptions) {}
 59func (r *CustomResolver) Close()                                 {}
 60
 61// Example: Client with round-robin load balancing
 62func NewLoadBalancedClient(serviceEndpoints []string) (*pb.UserServiceClient, error) {
 63	// Create manual resolver
 64	r := manual.NewBuilderWithScheme("custom")
 65
 66	// Add service endpoints
 67	var addrs []resolver.Address
 68	for _, endpoint := range serviceEndpoints {
 69		addrs = append(addrs, resolver.Address{
 70			Addr: endpoint,
 71		})
 72	}
 73	r.InitialState(resolver.State{Addresses: addrs})
 74
 75	// Create connection with round-robin load balancing
 76	conn, err := grpc.Dial(
 77		"custom:///user-service",
 78		grpc.WithResolvers(r),
 79		grpc.WithTransportCredentials(insecure.NewCredentials()),
 80		grpc.WithDefaultServiceConfig(`{
 81			"loadBalancingPolicy": "round_robin",
 82			"healthCheckConfig": {
 83				"serviceName": "user-service"
 84			}
 85		}`),
 86	)
 87	if err != nil {
 88		return nil, fmt.Errorf("failed to dial: %w", err)
 89	}
 90
 91	client := pb.NewUserServiceClient(conn)
 92	return &client, nil
 93}
 94
 95// Example: Pick-first load balancing (fastest connection)
 96func NewPickFirstClient(serviceEndpoints []string) (*pb.UserServiceClient, error) {
 97	r := manual.NewBuilderWithScheme("pickfirst")
 98
 99	var addrs []resolver.Address
100	for _, endpoint := range serviceEndpoints {
101		addrs = append(addrs, resolver.Address{Addr: endpoint})
102	}
103	r.InitialState(resolver.State{Addresses: addrs})
104
105	conn, err := grpc.Dial(
106		"pickfirst:///user-service",
107		grpc.WithResolvers(r),
108		grpc.WithTransportCredentials(insecure.NewCredentials()),
109		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "pick_first"}`),
110	)
111	if err != nil {
112		return nil, fmt.Errorf("failed to dial: %w", err)
113	}
114
115	client := pb.NewUserServiceClient(conn)
116	return &client, nil
117}

Service Discovery Integration

  1// discovery/consul.go - Consul-based service discovery
  2
  3package discovery
  4
  5import (
  6	"context"
  7	"fmt"
  8	"log"
  9	"time"
 10
 11	consul "github.com/hashicorp/consul/api"
 12	"google.golang.org/grpc"
 13	"google.golang.org/grpc/resolver"
 14)
 15
 16// ConsulResolver implements gRPC resolver interface
 17type ConsulResolver struct {
 18	consulClient *consul.Client
 19	serviceName  string
 20	cc           resolver.ClientConn
 21	stopCh       chan struct{}
 22}
 23
 24func NewConsulResolver(consulAddr, serviceName string, cc resolver.ClientConn) (*ConsulResolver, error) {
 25	config := consul.DefaultConfig()
 26	config.Address = consulAddr
 27
 28	client, err := consul.NewClient(config)
 29	if err != nil {
 30		return nil, fmt.Errorf("failed to create consul client: %w", err)
 31	}
 32
 33	cr := &ConsulResolver{
 34		consulClient: client,
 35		serviceName:  serviceName,
 36		cc:           cc,
 37		stopCh:       make(chan struct{}),
 38	}
 39
 40	// Start watching for service changes
 41	go cr.watcher()
 42
 43	return cr, nil
 44}
 45
 46func (cr *ConsulResolver) watcher() {
 47	ticker := time.NewTicker(10 * time.Second)
 48	defer ticker.Stop()
 49
 50	for {
 51		select {
 52		case <-ticker.C:
 53			if err := cr.updateEndpoints(); err != nil {
 54				log.Printf("Failed to update endpoints: %v", err)
 55			}
 56		case <-cr.stopCh:
 57			return
 58		}
 59	}
 60}
 61
 62func (cr *ConsulResolver) updateEndpoints() error {
 63	// Query Consul for healthy service instances
 64	services, _, err := cr.consulClient.Health().Service(
 65		cr.serviceName,
 66		"",
 67		true, // only healthy instances
 68		nil,
 69	)
 70	if err != nil {
 71		return fmt.Errorf("failed to query consul: %w", err)
 72	}
 73
 74	// Convert to gRPC addresses
 75	var addrs []resolver.Address
 76	for _, service := range services {
 77		addr := resolver.Address{
 78			Addr: fmt.Sprintf("%s:%d", service.Service.Address, service.Service.Port),
 79			Attributes: nil,
 80		}
 81		addrs = append(addrs, addr)
 82	}
 83
 84	// Update gRPC with new addresses
 85	cr.cc.UpdateState(resolver.State{
 86		Addresses: addrs,
 87	})
 88
 89	log.Printf("Updated service endpoints: %d instances found", len(addrs))
 90	return nil
 91}
 92
 93func (cr *ConsulResolver) ResolveNow(o resolver.ResolveNowOptions) {
 94	go cr.updateEndpoints()
 95}
 96
 97func (cr *ConsulResolver) Close() {
 98	close(cr.stopCh)
 99}
100
101// Register service with Consul
102func RegisterService(consulAddr, serviceID, serviceName, serviceAddr string, port int) error {
103	config := consul.DefaultConfig()
104	config.Address = consulAddr
105
106	client, err := consul.NewClient(config)
107	if err != nil {
108		return fmt.Errorf("failed to create consul client: %w", err)
109	}
110
111	registration := &consul.AgentServiceRegistration{
112		ID:      serviceID,
113		Name:    serviceName,
114		Address: serviceAddr,
115		Port:    port,
116		Check: &consul.AgentServiceCheck{
117			GRPC:                           fmt.Sprintf("%s:%d", serviceAddr, port),
118			Interval:                       "10s",
119			DeregisterCriticalServiceAfter: "30s",
120		},
121	}
122
123	return client.Agent().ServiceRegister(registration)
124}
125
126// Deregister service from Consul
127func DeregisterService(consulAddr, serviceID string) error {
128	config := consul.DefaultConfig()
129	config.Address = consulAddr
130
131	client, err := consul.NewClient(config)
132	if err != nil {
133		return fmt.Errorf("failed to create consul client: %w", err)
134	}
135
136	return client.Agent().ServiceDeregister(serviceID)
137}

gRPC-Gateway: Bridging REST and gRPC

 1// api/user/v1/user.proto - With HTTP annotations for REST gateway
 2
 3syntax = "proto3";
 4
 5package user.v1;
 6
 7import "google/api/annotations.proto";
 8import "google/protobuf/timestamp.proto";
 9import "google/protobuf/empty.proto";
10
11option go_package = "github.com/company/microservices/gen/go/user/v1;userv1";
12
13message User {
14  uint64 id = 1;
15  string email = 2;
16  string username = 3;
17  string first_name = 4;
18  string last_name = 5;
19  google.protobuf.Timestamp created_at = 6;
20  google.protobuf.Timestamp updated_at = 7;
21  UserStatus status = 8;
22}
23
24enum UserStatus {
25  USER_STATUS_UNSPECIFIED = 0;
26  USER_STATUS_ACTIVE = 1;
27  USER_STATUS_INACTIVE = 2;
28}
29
30message GetUserRequest {
31  oneof identifier {
32    uint64 user_id = 1;
33    string email = 2;
34    string username = 3;
35  }
36}
37
38message CreateUserRequest {
39  string email = 1;
40  string username = 2;
41  string password = 3;
42  string first_name = 4;
43  string last_name = 5;
44}
45
46message UpdateUserRequest {
47  uint64 user_id = 1;
48  optional string username = 2;
49  optional string first_name = 3;
50  optional string last_name = 4;
51  optional UserStatus status = 5;
52}
53
54// Service with HTTP annotations
55service UserService {
56  rpc GetUser(GetUserRequest) returns (User) {
57    option (google.api.http) = {
58      get: "/v1/users/{user_id}"
59    };
60  }
61
62  rpc CreateUser(CreateUserRequest) returns (User) {
63    option (google.api.http) = {
64      post: "/v1/users"
65      body: "*"
66    };
67  }
68
69  rpc UpdateUser(UpdateUserRequest) returns (User) {
70    option (google.api.http) = {
71      put: "/v1/users/{user_id}"
72      body: "*"
73    };
74  }
75
76  rpc DeleteUser(GetUserRequest) returns (google.protobuf.Empty) {
77    option (google.api.http) = {
78      delete: "/v1/users/{user_id}"
79    };
80  }
81}
 1// cmd/gateway/main.go - gRPC-Gateway REST proxy
 2
 3package main
 4
 5import (
 6	"context"
 7	"fmt"
 8	"log"
 9	"net/http"
10
11	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
12	"google.golang.org/grpc"
13	"google.golang.org/grpc/credentials/insecure"
14
15	gw "github.com/company/microservices/gen/go/user/v1"
16)
17
18func main() {
19	ctx := context.Background()
20	ctx, cancel := context.WithCancel(ctx)
21	defer cancel()
22
23	// Create gRPC-Gateway mux
24	mux := runtime.NewServeMux(
25		runtime.WithIncomingHeaderMatcher(customHeaderMatcher),
26		runtime.WithErrorHandler(customErrorHandler),
27	)
28
29	// gRPC server endpoint
30	grpcServerEndpoint := "localhost:50051"
31
32	opts := []grpc.DialOption{
33		grpc.WithTransportCredentials(insecure.NewCredentials()),
34	}
35
36	// Register gRPC-Gateway handlers
37	err := gw.RegisterUserServiceHandlerFromEndpoint(ctx, mux, grpcServerEndpoint, opts)
38	if err != nil {
39		log.Fatalf("Failed to register gateway: %v", err)
40	}
41
42	// HTTP server for REST API
43	httpServer := &http.Server{
44		Addr:    ":8080",
45		Handler: corsMiddleware(mux),
46	}
47
48	log.Println("Starting REST gateway on :8080")
49	log.Fatal(httpServer.ListenAndServe())
50}
51
52// Custom header matcher for authentication tokens
53func customHeaderMatcher(key string) (string, bool) {
54	switch key {
55	case "Authorization", "X-Request-Id", "X-User-Id":
56		return key, true
57	default:
58		return runtime.DefaultHeaderMatcher(key)
59	}
60}
61
62// Custom error handler for consistent error responses
63func customErrorHandler(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.Marshaler,
64	w http.ResponseWriter, r *http.Request, err error) {
65
66	w.Header().Set("Content-Type", "application/json")
67
68	// Convert gRPC error to HTTP status
69	httpStatus := runtime.HTTPStatusFromCode(grpc.Code(err))
70	w.WriteHeader(httpStatus)
71
72	// Write error response
73	errorResponse := map[string]interface{}{
74		"error": err.Error(),
75		"code":  grpc.Code(err).String(),
76	}
77
78	marshaler.NewEncoder(w).Encode(errorResponse)
79}
80
81// CORS middleware for REST API
82func corsMiddleware(h http.Handler) http.Handler {
83	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
84		w.Header().Set("Access-Control-Allow-Origin", "*")
85		w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
86		w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Request-Id")
87
88		if r.Method == "OPTIONS" {
89			w.WriteHeader(http.StatusOK)
90			return
91		}
92
93		h.ServeHTTP(w, r)
94	})
95}

Performance Optimization Techniques

  1// performance/pooling.go - Connection and buffer pooling
  2
  3package performance
  4
  5import (
  6	"sync"
  7
  8	"google.golang.org/grpc"
  9	"google.golang.org/protobuf/proto"
 10)
 11
 12// ConnectionPool manages gRPC connections efficiently
 13type ConnectionPool struct {
 14	mu          sync.RWMutex
 15	connections []*grpc.ClientConn
 16	current     int
 17	size        int
 18}
 19
 20func NewConnectionPool(target string, size int, opts ...grpc.DialOption) (*ConnectionPool, error) {
 21	pool := &ConnectionPool{
 22		connections: make([]*grpc.ClientConn, size),
 23		size:        size,
 24	}
 25
 26	// Create pool of connections
 27	for i := 0; i < size; i++ {
 28		conn, err := grpc.Dial(target, opts...)
 29		if err != nil {
 30			// Close previously created connections
 31			for j := 0; j < i; j++ {
 32				pool.connections[j].Close()
 33			}
 34			return nil, err
 35		}
 36		pool.connections[i] = conn
 37	}
 38
 39	return pool, nil
 40}
 41
 42// Get returns next connection in round-robin fashion
 43func (p *ConnectionPool) Get() *grpc.ClientConn {
 44	p.mu.Lock()
 45	defer p.mu.Unlock()
 46
 47	conn := p.connections[p.current]
 48	p.current = (p.current + 1) % p.size
 49	return conn
 50}
 51
 52// Close all connections in pool
 53func (p *ConnectionPool) Close() error {
 54	p.mu.Lock()
 55	defer p.mu.Unlock()
 56
 57	for _, conn := range p.connections {
 58		if err := conn.Close(); err != nil {
 59			return err
 60		}
 61	}
 62	return nil
 63}
 64
 65// MessagePool reuses protobuf messages to reduce allocations
 66type MessagePool struct {
 67	pool sync.Pool
 68}
 69
 70func NewMessagePool(newFunc func() proto.Message) *MessagePool {
 71	return &MessagePool{
 72		pool: sync.Pool{
 73			New: func() interface{} {
 74				return newFunc()
 75			},
 76		},
 77	}
 78}
 79
 80func (mp *MessagePool) Get() proto.Message {
 81	return mp.pool.Get().(proto.Message)
 82}
 83
 84func (mp *MessagePool) Put(msg proto.Message) {
 85	// Reset message before returning to pool
 86	proto.Reset(msg)
 87	mp.pool.Put(msg)
 88}
 89
 90// Example usage with buffer pooling
 91type BufferedService struct {
 92	userPool *MessagePool
 93}
 94
 95func NewBufferedService() *BufferedService {
 96	return &BufferedService{
 97		userPool: NewMessagePool(func() proto.Message {
 98			return &pb.User{}
 99		}),
100	}
101}
102
103func (s *BufferedService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
104	// Get user from pool
105	user := s.userPool.Get().(*pb.User)
106	defer s.userPool.Put(user)
107
108	// Populate user data
109	user.Id = req.GetUserId()
110	user.Email = "user@example.com"
111
112	// Clone for return (pool object will be reset)
113	return proto.Clone(user).(*pb.User), nil
114}

Distributed Tracing with OpenTelemetry

 1// tracing/otel.go - OpenTelemetry integration for distributed tracing
 2
 3package tracing
 4
 5import (
 6	"context"
 7	"fmt"
 8
 9	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
10	"go.opentelemetry.io/otel"
11	"go.opentelemetry.io/otel/attribute"
12	"go.opentelemetry.io/otel/exporters/jaeger"
13	"go.opentelemetry.io/otel/sdk/resource"
14	sdktrace "go.opentelemetry.io/otel/sdk/trace"
15	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
16	"go.opentelemetry.io/otel/trace"
17	"google.golang.org/grpc"
18)
19
20// InitTracer initializes OpenTelemetry with Jaeger exporter
21func InitTracer(serviceName, jaegerEndpoint string) (*sdktrace.TracerProvider, error) {
22	// Create Jaeger exporter
23	exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerEndpoint)))
24	if err != nil {
25		return nil, fmt.Errorf("failed to create jaeger exporter: %w", err)
26	}
27
28	// Create tracer provider
29	tp := sdktrace.NewTracerProvider(
30		sdktrace.WithBatcher(exporter),
31		sdktrace.WithResource(resource.NewWithAttributes(
32			semconv.SchemaURL,
33			semconv.ServiceNameKey.String(serviceName),
34		)),
35	)
36
37	// Register as global tracer provider
38	otel.SetTracerProvider(tp)
39
40	return tp, nil
41}
42
43// Server with tracing
44func NewTracedServer() *grpc.Server {
45	return grpc.NewServer(
46		grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
47		grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
48	)
49}
50
51// Client with tracing
52func NewTracedClient(target string) (*grpc.ClientConn, error) {
53	return grpc.Dial(
54		target,
55		grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
56		grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
57	)
58}
59
60// Custom span with attributes
61func AddCustomSpan(ctx context.Context, operationName string) (context.Context, trace.Span) {
62	tracer := otel.Tracer("user-service")
63	ctx, span := tracer.Start(ctx, operationName)
64
65	// Add custom attributes
66	span.SetAttributes(
67		attribute.String("service.version", "v1.0.0"),
68		attribute.String("deployment.environment", "production"),
69	)
70
71	return ctx, span
72}
73
74// Example traced service operation
75func (s *userService) GetUserWithTracing(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
76	// Create custom span
77	ctx, span := AddCustomSpan(ctx, "GetUser.Database")
78	defer span.End()
79
80	// Add request details to span
81	span.SetAttributes(
82		attribute.Int64("user.id", int64(req.GetUserId())),
83	)
84
85	// Perform database operation
86	user, err := s.repo.GetByID(ctx, req.GetUserId())
87	if err != nil {
88		span.RecordError(err)
89		return nil, err
90	}
91
92	// Add response details
93	span.SetAttributes(
94		attribute.String("user.email", user.Email),
95		attribute.String("user.status", user.Status.String()),
96	)
97
98	return user, nil
99}

Summary

Key Takeaways

  1. gRPC provides significant performance benefits over REST for microservice communication
  2. Protocol Buffers ensure type safety and enable efficient code generation
  3. Streaming enables real-time communication patterns not possible with traditional HTTP
  4. Interceptors provide powerful middleware for cross-cutting concerns
  5. Proper error handling and context management are critical for production services
  6. Load balancing and service discovery enable horizontal scaling and high availability
  7. gRPC-Gateway bridges REST and gRPC for backward compatibility
  8. OpenTelemetry integration provides comprehensive distributed tracing

Production Deployment Checklist

Before deploying gRPC services to production:

Security:

  • Enable TLS/mTLS for all connections
  • Implement authentication interceptors
  • Validate all input data
  • Rate limit client connections

Reliability:

  • Configure retry policies with exponential backoff
  • Implement circuit breakers
  • Set appropriate timeouts
  • Enable health checking

Performance:

  • Use connection pooling
  • Enable HTTP/2 multiplexing
  • Implement message pooling for high-frequency operations
  • Profile and optimize hot paths

Observability:

  • Integrate distributed tracing (OpenTelemetry)
  • Export metrics (Prometheus)
  • Implement structured logging
  • Set up alerting on SLA violations

Operations:

  • Document all service APIs
  • Version your Protocol Buffers
  • Implement graceful shutdown
  • Plan for backward compatibility

Next Steps

  • Explore advanced load balancing patterns including client-side and service mesh integration
  • Study gRPC-Gateway for bridging REST and gRPC systems
  • Learn about service mesh patterns with Istio and Linkerd
  • Investigate gRPC-Web for browser compatibility
  • Master OpenTelemetry integration for distributed tracing

Further Reading

You now have the knowledge to build production-ready gRPC services that can handle high-throughput, real-time communication between microservices. The patterns and practices covered here will help you create scalable, maintainable, and observable gRPC architectures.