DevOps Toolchain with Go

Exercise Overview

Build a comprehensive DevOps toolchain for Go applications. You'll create CI/CD pipelines, monitoring systems, logging infrastructure, and automation tools using Go's excellent DevOps ecosystem.

Learning Objectives

  • Design and implement CI/CD pipelines for Go applications
  • Build monitoring and alerting systems with Prometheus
  • Create structured logging and log aggregation
  • Implement infrastructure as code with Go tools
  • Build deployment automation and rollback mechanisms
  • Add configuration management and secret handling
  • Create health checks and readiness probes
  • Implement automated testing and quality gates

The System - DevOps Platform

You'll build a DevOps platform that manages the complete lifecycle of Go applications from development to production.

Initial Structure

// cmd/devops-platform/main.go
package main

// TODO: Implement CI/CD pipeline engine
type PipelineEngine struct {
	// Add pipeline engine implementation
}

// TODO: Implement monitoring system
type MonitoringSystem struct {
	// Add monitoring system implementation
}

// TODO: Implement logging infrastructure
type LoggingInfrastructure struct {
	// Add logging infrastructure implementation
}

// TODO: Implement deployment automation
type DeploymentAutomation struct {
	// Add deployment automation implementation
}

func main() {
	// TODO: Build complete DevOps platform
}

Tasks

Task 1: Build CI/CD Pipeline Engine

Create a flexible CI/CD pipeline system:

// pkg/pipeline/engine.go
package pipeline

import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sync"
	"time"

	"gopkg.in/yaml.v3"
)

type Pipeline struct {
	ID            string            `yaml:"id"`
	Name          string            `yaml:"name"`
	Repository    string            `yaml:"repository"` // Git URL cloned at the start of each run
	Trigger       Trigger           `yaml:"trigger"`
	Environment   Environment       `yaml:"environment"`
	Stages        []Stage           `yaml:"stages"`
	Notifications []Notification    `yaml:"notifications"`
	Variables     map[string]string `yaml:"variables"`
}

type Trigger struct {
	Type     string   `yaml:"type"` // webhook, schedule, manual
	Branches []string `yaml:"branches"`
	Schedule string   `yaml:"schedule"`
	Events   []string `yaml:"events"`
}

type Environment struct {
	Name       string            `yaml:"name"`
	Image      string            `yaml:"image"`
	Variables  map[string]string `yaml:"variables"`
	Services   []Service         `yaml:"services"`
	CachePaths []string          `yaml:"cache_paths"`
	Timeout    time.Duration     `yaml:"timeout"`
}

type Service struct {
	Name    string            `yaml:"name"`
	Image   string            `yaml:"image"`
	Ports   []string          `yaml:"ports"`
	Env     map[string]string `yaml:"env"`
	Command []string          `yaml:"command"`
}

type Stage struct {
	Name      string            `yaml:"name"`
	Image     string            `yaml:"image"`
	Script    []string          `yaml:"script"`
	Artifacts []Artifact        `yaml:"artifacts"`
	DependsOn []string          `yaml:"depends_on"`
	Condition string            `yaml:"condition"`
	Variables map[string]string `yaml:"variables"`
	Timeout   time.Duration     `yaml:"timeout"`
	Retry     int               `yaml:"retry"`
	Parallel  bool              `yaml:"parallel"`
}

type Artifact struct {
	Path   string `yaml:"path"`
	Type   string `yaml:"type"` // file, directory, report
	Expire string `yaml:"expire"`
}

type Notification struct {
	Type   string            `yaml:"type"` // slack, email, webhook
	Target string            `yaml:"target"`
	Events []string          `yaml:"events"`
	Config map[string]string `yaml:"config"`
}

type PipelineExecution struct {
	ID          string            `json:"id"`
	Pipeline    *Pipeline         `json:"pipeline"`
	Status      string            `json:"status"`
	StartedAt   time.Time         `json:"started_at"`
	CompletedAt *time.Time        `json:"completed_at"`
	Commit      string            `json:"commit"`
	Branch      string            `json:"branch"`
	TriggeredBy string            `json:"triggered_by"`
	Variables   map[string]string `json:"variables"`
	Stages      []StageExecution  `json:"stages"`
	Logs        []LogEntry        `json:"logs"`
	Artifacts   []ArtifactFile    `json:"artifacts"`
}

type StageExecution struct {
	Name        string         `json:"name"`
	Status      string         `json:"status"`
	StartedAt   time.Time      `json:"started_at"`
	CompletedAt *time.Time     `json:"completed_at"`
	Duration    string         `json:"duration"`
	Logs        string         `json:"logs"`
	Artifacts   []ArtifactFile `json:"artifacts"`
	Error       string         `json:"error,omitempty"`
}

type LogEntry struct {
	Timestamp time.Time `json:"timestamp"`
	Level     string    `json:"level"`
	Stage     string    `json:"stage"`
	Message   string    `json:"message"`
}

type ArtifactFile struct {
	Name string `json:"name"`
	Path string `json:"path"`
	Size int64  `json:"size"`
	Type string `json:"type"`
}

type PipelineEngine struct {
	pipelines     map[string]*Pipeline
	executions    map[string]*PipelineExecution
	workDir       string
	dockerClient  DockerClient
	gitClient     GitClient
	storageClient StorageClient
	notifier      Notifier
	monitor       Monitor
	mutex         sync.RWMutex
}

type DockerClient interface {
	BuildImage(ctx context.Context, dockerfilePath, tag string, buildArgs map[string]string) error
	RunContainer(ctx context.Context, image string, config ContainerConfig) (string, error)
	StopContainer(ctx context.Context, containerID string) error
	RemoveContainer(ctx context.Context, containerID string) error
	GetContainerLogs(ctx context.Context, containerID string) (string, error)
	CopyFromContainer(ctx context.Context, containerID, srcPath, destPath string) error
}

type GitClient interface {
	CloneRepository(ctx context.Context, repoURL, destPath, branch string) error
	GetLatestCommit(repoPath string) (string, error)
	CreateTag(repoPath, tag, message string) error
	PushTags(repoPath string) error
}

type StorageClient interface {
	UploadFile(ctx context.Context, bucket, key string, data []byte) error
	DownloadFile(ctx context.Context, bucket, key string) ([]byte, error)
	ListFiles(ctx context.Context, bucket, prefix string) ([]string, error)
	DeleteFile(ctx context.Context, bucket, key string) error
}

type Notifier interface {
	SendNotification(ctx context.Context, notification Notification, execution *PipelineExecution) error
}

type Monitor interface {
	RecordMetric(name string, value float64, tags map[string]string)
	RecordEvent(event string, data map[string]interface{})
	IncrementCounter(name string, tags map[string]string)
}

func NewPipelineEngine(workDir string, dockerClient DockerClient, gitClient GitClient, storageClient StorageClient, notifier Notifier, monitor Monitor) *PipelineEngine {
	return &PipelineEngine{
		pipelines:     make(map[string]*Pipeline),
		executions:    make(map[string]*PipelineExecution),
		workDir:       workDir,
		dockerClient:  dockerClient,
		gitClient:     gitClient,
		storageClient: storageClient,
		notifier:      notifier,
		monitor:       monitor,
	}
}

func (pe *PipelineEngine) LoadPipeline(configPath string) error {
	data, err := os.ReadFile(configPath)
	if err != nil {
		return fmt.Errorf("failed to read pipeline config: %w", err)
	}

	var pipeline Pipeline
	if err := yaml.Unmarshal(data, &pipeline); err != nil {
		return fmt.Errorf("failed to parse pipeline config: %w", err)
	}

	pe.mutex.Lock()
	pe.pipelines[pipeline.ID] = &pipeline
	pe.mutex.Unlock()

	log.Printf("Loaded pipeline: %s", pipeline.Name)
	return nil
}

func (pe *PipelineEngine) ExecutePipeline(ctx context.Context, pipelineID string, trigger TriggerInfo) (*PipelineExecution, error) {
	pe.mutex.RLock()
	pipeline, exists := pe.pipelines[pipelineID]
	pe.mutex.RUnlock()

	if !exists {
		return nil, fmt.Errorf("pipeline not found: %s", pipelineID)
	}

	execution := &PipelineExecution{
		ID:          generateExecutionID(),
		Pipeline:    pipeline,
		Status:      "pending",
		StartedAt:   time.Now(),
		Commit:      trigger.Commit,
		Branch:      trigger.Branch,
		TriggeredBy: trigger.TriggeredBy,
		Variables:   make(map[string]string),
		Stages:      make([]StageExecution, len(pipeline.Stages)),
		Logs:        make([]LogEntry, 0),
		Artifacts:   make([]ArtifactFile, 0),
	}

	// Merge pipeline and trigger variables; trigger values win
	for k, v := range pipeline.Variables {
		execution.Variables[k] = v
	}
	for k, v := range trigger.Variables {
		execution.Variables[k] = v
	}

	pe.mutex.Lock()
	pe.executions[execution.ID] = execution
	pe.mutex.Unlock()

	// Start execution in a goroutine
	go pe.runPipeline(ctx, execution)

	// Send notification
	pe.notifier.SendNotification(ctx, Notification{
		Type:   "pipeline_started",
		Target: "default",
	}, execution)

	pe.monitor.RecordEvent("pipeline.started", map[string]interface{}{
		"pipeline_id":  pipelineID,
		"execution_id": execution.ID,
		"trigger":      trigger.TriggeredBy,
		"branch":       trigger.Branch,
	})

	return execution, nil
}

func (pe *PipelineEngine) runPipeline(ctx context.Context, execution *PipelineExecution) {
	defer func() {
		completedAt := time.Now()
		execution.CompletedAt = &completedAt

		if execution.Status == "running" {
			execution.Status = "completed"
		}

		// Send completion notification
		pe.notifier.SendNotification(ctx, Notification{
			Type:   "pipeline_completed",
			Target: "default",
		}, execution)

		pe.monitor.RecordEvent("pipeline.completed", map[string]interface{}{
			"execution_id": execution.ID,
			"status":       execution.Status,
			"duration":     completedAt.Sub(execution.StartedAt).String(),
		})
	}()

	execution.Status = "running"

	// Create workspace
	workspace := filepath.Join(pe.workDir, execution.ID)
	if err := os.MkdirAll(workspace, 0755); err != nil {
		execution.Status = "failed"
		execution.addLog("error", "", fmt.Sprintf("Failed to create workspace: %v", err))
		return
	}
	defer os.RemoveAll(workspace)

	// Clone the pipeline's repository at the triggering branch
	repoPath := filepath.Join(workspace, "repo")
	if err := pe.gitClient.CloneRepository(ctx, execution.Pipeline.Repository, repoPath, execution.Branch); err != nil {
		execution.Status = "failed"
		execution.addLog("error", "", fmt.Sprintf("Failed to clone repository: %v", err))
		return
	}

	// Execute stages
	for i, stage := range execution.Pipeline.Stages {
		stageExec := &execution.Stages[i]
		stageExec.Name = stage.Name
		stageExec.Status = "pending"
		stageExec.StartedAt = time.Now()

		// Check dependencies
		if !pe.checkStageDependencies(stage.DependsOn, execution.Stages) {
			stageExec.Status = "skipped"
			skippedAt := time.Now()
			stageExec.CompletedAt = &skippedAt
			continue
		}

		// Execute stage
		if err := pe.executeStage(ctx, stage, stageExec, execution, repoPath); err != nil {
			stageExec.Status = "failed"
			stageExec.Error = err.Error()
			execution.Status = "failed"
			execution.addLog("error", stage.Name, fmt.Sprintf("Stage failed: %v", err))
			return
		}

		stageExec.Status = "completed"
		completedAt := time.Now()
		stageExec.CompletedAt = &completedAt
		stageExec.Duration = completedAt.Sub(stageExec.StartedAt).String()

		execution.addLog("info", stage.Name, fmt.Sprintf("Stage completed in %s", stageExec.Duration))
	}

	if execution.Status == "running" {
		execution.Status = "completed"
	}

	execution.addLog("info", "", fmt.Sprintf("Pipeline %s", execution.Status))
}

func (pe *PipelineEngine) executeStage(ctx context.Context, stage Stage, stageExec *StageExecution, execution *PipelineExecution, repoPath string) error {
	stageExec.Status = "running"

	// Prepare environment variables: execution-level first, stage-level overrides
	envVars := make(map[string]string)
	for k, v := range execution.Variables {
		envVars[k] = v
	}
	for k, v := range stage.Variables {
		envVars[k] = v
	}

	// Add default environment variables
	envVars["CI"] = "true"
	envVars["PIPELINE_ID"] = execution.ID
	envVars["STAGE_NAME"] = stage.Name
	envVars["WORKSPACE"] = repoPath

	// Resolve the image: stage image, then pipeline default, then build from Dockerfile
	image := stage.Image
	if image == "" {
		image = execution.Pipeline.Environment.Image
	}

	if image == "" {
		// Build from the repository's Dockerfile
		dockerfilePath := filepath.Join(repoPath, "Dockerfile")
		imageTag := fmt.Sprintf("pipeline-%s-%s", execution.ID, stage.Name)

		if err := pe.dockerClient.BuildImage(ctx, dockerfilePath, imageTag, map[string]string{
			"PIPELINE_ID": execution.ID,
			"STAGE_NAME":  stage.Name,
		}); err != nil {
			return fmt.Errorf("failed to build Docker image: %w", err)
		}
		image = imageTag
	}

	// Prepare container configuration
	containerConfig := ContainerConfig{
		Image:      image,
		Env:        envVars,
		WorkingDir: repoPath,
		Mounts: []Mount{
			{Source: repoPath, Destination: "/workspace"},
		},
		Network: "pipeline-network",
	}

	// Run container
	containerID, err := pe.dockerClient.RunContainer(ctx, image, containerConfig)
	if err != nil {
		return fmt.Errorf("failed to run container: %w", err)
	}
	defer pe.dockerClient.RemoveContainer(ctx, containerID)

	// Execute scripts
	for _, script := range stage.Script {
		if err := pe.executeScript(ctx, containerID, script, stageExec); err != nil {
			return fmt.Errorf("script execution failed: %w", err)
		}
	}

	// Collect artifacts; a failed artifact is a warning, not a stage failure
	for _, artifact := range stage.Artifacts {
		if err := pe.collectArtifact(ctx, containerID, artifact, stageExec, execution); err != nil {
			execution.addLog("warning", stage.Name, fmt.Sprintf("Failed to collect artifact %s: %v", artifact.Path, err))
		}
	}

	return nil
}

func (pe *PipelineEngine) executeScript(ctx context.Context, containerID, script string, stageExec *StageExecution) error {
	// This would typically use docker exec to run the script.
	// For demonstration, we simulate script execution.
	stageExec.Logs += fmt.Sprintf("$ %s\n", script)

	// Simulate execution time
	time.Sleep(100 * time.Millisecond)

	// Simulate success
	stageExec.Logs += "Script executed successfully\n"

	return nil
}

func (pe *PipelineEngine) collectArtifact(ctx context.Context, containerID string, artifact Artifact, stageExec *StageExecution, execution *PipelineExecution) error {
	// Create artifact directory
	artifactDir := filepath.Join(pe.workDir, execution.ID, "artifacts", stageExec.Name)
	if err := os.MkdirAll(artifactDir, 0755); err != nil {
		return err
	}

	// Copy from container
	localPath := filepath.Join(artifactDir, filepath.Base(artifact.Path))
	if err := pe.dockerClient.CopyFromContainer(ctx, containerID, artifact.Path, localPath); err != nil {
		return err
	}

	// Get file info
	info, err := os.Stat(localPath)
	if err != nil {
		return err
	}

	// Upload to storage
	data, err := os.ReadFile(localPath)
	if err != nil {
		return err
	}

	storageKey := fmt.Sprintf("pipelines/%s/%s/%s", execution.ID, stageExec.Name, filepath.Base(artifact.Path))
	if err := pe.storageClient.UploadFile(ctx, "artifacts", storageKey, data); err != nil {
		return err
	}

	// Record artifact
	artifactFile := ArtifactFile{
		Name: filepath.Base(artifact.Path),
		Path: storageKey,
		Size: info.Size(),
		Type: artifact.Type,
	}

	stageExec.Artifacts = append(stageExec.Artifacts, artifactFile)
	execution.Artifacts = append(execution.Artifacts, artifactFile)

	return nil
}

func (pe *PipelineEngine) checkStageDependencies(dependencies []string, stages []StageExecution) bool {
	for _, dep := range dependencies {
		found := false
		for _, stage := range stages {
			if stage.Name == dep {
				found = true
				if stage.Status != "completed" {
					return false
				}
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

func (pe *PipelineEngine) GetExecution(executionID string) (*PipelineExecution, error) {
	pe.mutex.RLock()
	defer pe.mutex.RUnlock()

	execution, exists := pe.executions[executionID]
	if !exists {
		return nil, fmt.Errorf("execution not found: %s", executionID)
	}

	return execution, nil
}

func (pe *PipelineEngine) ListExecutions(pipelineID string) []*PipelineExecution {
	pe.mutex.RLock()
	defer pe.mutex.RUnlock()

	var executions []*PipelineExecution
	for _, execution := range pe.executions {
		if pipelineID == "" || execution.Pipeline.ID == pipelineID {
			executions = append(executions, execution)
		}
	}

	return executions
}

func (e *PipelineExecution) addLog(level, stage, message string) {
	e.Logs = append(e.Logs, LogEntry{
		Timestamp: time.Now(),
		Level:     level,
		Stage:     stage,
		Message:   message,
	})
}

type TriggerInfo struct {
	Commit      string            `json:"commit"`
	Branch      string            `json:"branch"`
	TriggeredBy string            `json:"triggered_by"`
	Variables   map[string]string `json:"variables"`
}

type ContainerConfig struct {
	Image      string            `json:"image"`
	Env        map[string]string `json:"env"`
	WorkingDir string            `json:"working_dir"`
	Mounts     []Mount           `json:"mounts"`
	Network    string            `json:"network"`
}

type Mount struct {
	Source      string `json:"source"`
	Destination string `json:"destination"`
}

func generateExecutionID() string {
	return fmt.Sprintf("exec-%d", time.Now().UnixNano())
}
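
With the engine in place, wiring it up means supplying its five collaborators. Below is a minimal sketch using no-op stubs for all of them; the module path example.com/devops-platform and every noop* type are illustrative stand-ins, and any real Docker SDK, go-git, or object-storage client that satisfies the same interfaces slots in identically:

// cmd/pipeline-demo/main.go — a minimal wiring sketch; module path and stubs are illustrative.
package main

import (
	"context"
	"log"

	"example.com/devops-platform/pkg/pipeline"
)

// noopDocker pretends every image build and container run succeeds.
type noopDocker struct{}

func (noopDocker) BuildImage(ctx context.Context, dockerfilePath, tag string, buildArgs map[string]string) error {
	return nil
}
func (noopDocker) RunContainer(ctx context.Context, image string, config pipeline.ContainerConfig) (string, error) {
	return "stub-container", nil
}
func (noopDocker) StopContainer(ctx context.Context, containerID string) error   { return nil }
func (noopDocker) RemoveContainer(ctx context.Context, containerID string) error { return nil }
func (noopDocker) GetContainerLogs(ctx context.Context, containerID string) (string, error) {
	return "", nil
}
func (noopDocker) CopyFromContainer(ctx context.Context, containerID, srcPath, destPath string) error {
	return nil
}

// noopGit skips the actual clone; a go-git-backed client would replace it.
type noopGit struct{}

func (noopGit) CloneRepository(ctx context.Context, repoURL, destPath, branch string) error { return nil }
func (noopGit) GetLatestCommit(repoPath string) (string, error)                             { return "0000000", nil }
func (noopGit) CreateTag(repoPath, tag, message string) error                               { return nil }
func (noopGit) PushTags(repoPath string) error                                              { return nil }

// noopStorage discards artifacts; an S3/GCS client would replace it.
type noopStorage struct{}

func (noopStorage) UploadFile(ctx context.Context, bucket, key string, data []byte) error  { return nil }
func (noopStorage) DownloadFile(ctx context.Context, bucket, key string) ([]byte, error)   { return nil, nil }
func (noopStorage) ListFiles(ctx context.Context, bucket, prefix string) ([]string, error) { return nil, nil }
func (noopStorage) DeleteFile(ctx context.Context, bucket, key string) error               { return nil }

// noopNotifier just logs notifications instead of delivering them.
type noopNotifier struct{}

func (noopNotifier) SendNotification(ctx context.Context, n pipeline.Notification, e *pipeline.PipelineExecution) error {
	log.Printf("notify: %s execution=%s status=%s", n.Type, e.ID, e.Status)
	return nil
}

// noopMonitor drops all metrics and events.
type noopMonitor struct{}

func (noopMonitor) RecordMetric(name string, value float64, tags map[string]string) {}
func (noopMonitor) RecordEvent(event string, data map[string]interface{})           {}
func (noopMonitor) IncrementCounter(name string, tags map[string]string)            {}

func main() {
	engine := pipeline.NewPipelineEngine("/tmp/pipeline-work",
		noopDocker{}, noopGit{}, noopStorage{}, noopNotifier{}, noopMonitor{})

	if err := engine.LoadPipeline("test-pipeline.yaml"); err != nil {
		log.Fatalf("load pipeline: %v", err)
	}

	exec, err := engine.ExecutePipeline(context.Background(), "test-app", pipeline.TriggerInfo{
		Branch:      "main",
		TriggeredBy: "manual",
	})
	if err != nil {
		log.Fatalf("execute pipeline: %v", err)
	}
	log.Printf("started execution %s", exec.ID)
}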

Task 2: Implement Monitoring System

Build comprehensive monitoring with Prometheus:

// pkg/monitoring/prometheus.go
package monitoring

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/common/model"
)

type PrometheusMonitor struct {
	client      v1.API
	registry    *prometheus.Registry
	metricsPath string

	// Custom metrics
	buildDuration   *prometheus.HistogramVec
	buildSuccess    *prometheus.CounterVec
	buildTotal      *prometheus.CounterVec
	deploymentTotal *prometheus.CounterVec
	testDuration    *prometheus.HistogramVec
	testPassRate    *prometheus.GaugeVec

	// System metrics
	cpuUsage    *prometheus.GaugeVec
	memoryUsage *prometheus.GaugeVec
	diskUsage   *prometheus.GaugeVec
	networkIO   *prometheus.CounterVec
}

type MonitoringConfig struct {
	PrometheusURL string `yaml:"prometheus_url"`
	ListenPort    int    `yaml:"listen_port"`
	MetricsPath   string `yaml:"metrics_path"`
}

func NewPrometheusMonitor(config MonitoringConfig) (*PrometheusMonitor, error) {
	// Create Prometheus API client for querying
	client, err := api.NewClient(api.Config{
		Address: config.PrometheusURL,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create Prometheus client: %w", err)
	}

	v1api := v1.NewAPI(client)

	// Create a dedicated registry for our metrics
	registry := prometheus.NewRegistry()

	metricsPath := config.MetricsPath
	if metricsPath == "" {
		metricsPath = "/metrics"
	}

	monitor := &PrometheusMonitor{
		client:      v1api,
		registry:    registry,
		metricsPath: metricsPath,

		buildDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "pipeline_build_duration_seconds",
				Help:    "Duration of pipeline builds",
				Buckets: prometheus.DefBuckets,
			},
			[]string{"pipeline", "stage", "status"},
		),

		buildSuccess: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "pipeline_build_success_total",
				Help: "Total number of successful pipeline builds",
			},
			[]string{"pipeline", "branch"},
		),

		buildTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "pipeline_build_total",
				Help: "Total number of pipeline builds",
			},
			[]string{"pipeline", "branch", "status"},
		),

		deploymentTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "deployment_total",
				Help: "Total number of deployments",
			},
			[]string{"service", "environment", "status"},
		),

		testDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "test_duration_seconds",
				Help:    "Duration of test execution",
				Buckets: prometheus.DefBuckets,
			},
			[]string{"test_type", "package"},
		),

		testPassRate: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "test_pass_rate",
				Help: "Test pass rate percentage",
			},
			[]string{"test_type", "package"},
		),

		cpuUsage: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "system_cpu_usage_percent",
				Help: "CPU usage percentage",
			},
			[]string{"instance", "service"},
		),

		memoryUsage: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "system_memory_usage_bytes",
				Help: "Memory usage in bytes",
			},
			[]string{"instance", "service"},
		),

		diskUsage: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "system_disk_usage_bytes",
				Help: "Disk usage in bytes",
			},
			[]string{"instance", "mountpoint"},
		),

		networkIO: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "system_network_io_bytes_total",
				Help: "Network I/O in bytes",
			},
			[]string{"instance", "interface", "direction"},
		),
	}

	// Register metrics
	registry.MustRegister(
		monitor.buildDuration,
		monitor.buildSuccess,
		monitor.buildTotal,
		monitor.deploymentTotal,
		monitor.testDuration,
		monitor.testPassRate,
		monitor.cpuUsage,
		monitor.memoryUsage,
		monitor.diskUsage,
		monitor.networkIO,
	)

	return monitor, nil
}

func (pm *PrometheusMonitor) RecordBuildStart(pipeline, branch string) {
	pm.buildTotal.WithLabelValues(pipeline, branch, "started").Inc()
}

func (pm *PrometheusMonitor) RecordBuildComplete(pipeline, branch, stage string, duration time.Duration, success bool) {
	status := "success"
	if !success {
		status = "failed"
	}

	pm.buildDuration.WithLabelValues(pipeline, stage, status).Observe(duration.Seconds())
	pm.buildTotal.WithLabelValues(pipeline, branch, status).Inc()

	if success {
		pm.buildSuccess.WithLabelValues(pipeline, branch).Inc()
	}
}

func (pm *PrometheusMonitor) RecordDeployment(service, environment, status string) {
	pm.deploymentTotal.WithLabelValues(service, environment, status).Inc()
}

func (pm *PrometheusMonitor) RecordTestExecution(testType, pkg string, duration time.Duration, passed, total int) {
	pm.testDuration.WithLabelValues(testType, pkg).Observe(duration.Seconds())

	if total > 0 {
		passRate := float64(passed) / float64(total) * 100
		pm.testPassRate.WithLabelValues(testType, pkg).Set(passRate)
	}
}

func (pm *PrometheusMonitor) UpdateSystemMetrics(instance, service string, cpu float64, memory uint64) {
	pm.cpuUsage.WithLabelValues(instance, service).Set(cpu)
	pm.memoryUsage.WithLabelValues(instance, service).Set(float64(memory))
}

// Disk usage is labeled by mountpoint rather than service, so it gets its own setter.
func (pm *PrometheusMonitor) UpdateDiskUsage(instance, mountpoint string, disk uint64) {
	pm.diskUsage.WithLabelValues(instance, mountpoint).Set(float64(disk))
}

func (pm *PrometheusMonitor) RecordNetworkIO(instance, iface, direction string, bytes uint64) {
	pm.networkIO.WithLabelValues(instance, iface, direction).Add(float64(bytes))
}

func (pm *PrometheusMonitor) QueryMetrics(ctx context.Context, query string) (model.Value, error) {
	result, warnings, err := pm.client.Query(ctx, query, time.Now())
	if err != nil {
		return nil, fmt.Errorf("error querying Prometheus: %w", err)
	}

	if len(warnings) > 0 {
		log.Printf("Prometheus query warnings: %v", warnings)
	}

	return result, nil
}

func (pm *PrometheusMonitor) StartMetricsServer(listenAddr string) error {
	mux := http.NewServeMux()
	mux.Handle(pm.metricsPath, promhttp.HandlerFor(pm.registry, promhttp.HandlerOpts{}))

	server := &http.Server{
		Addr:    listenAddr,
		Handler: mux,
	}

	log.Printf("Starting metrics server on %s", listenAddr)
	return server.ListenAndServe()
}

func (pm *PrometheusMonitor) GetSystemHealth(ctx context.Context) (map[string]interface{}, error) {
	health := make(map[string]interface{})

	// Standard node_exporter expressions for CPU, memory, and disk utilization
	// (assumes node_exporter targets are scraped by Prometheus)
	queries := map[string]string{
		"cpu_usage":    `avg by (instance) (1 - rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100`,
		"memory_usage": `avg by (instance) ((1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100)`,
		"disk_usage":   `avg by (instance) ((1 - node_filesystem_avail_bytes / node_filesystem_size_bytes) * 100)`,
	}

	for name, query := range queries {
		result, err := pm.QueryMetrics(ctx, query)
		if err != nil {
			health[name] = map[string]interface{}{
				"error": err.Error(),
			}
			continue
		}

		health[name] = result
	}

	return health, nil
}

// Alerting system
type AlertManager struct {
	rules    []AlertRule
	notifier Notifier
}

// Notifier delivers fired alerts (Slack, email, webhook, ...).
type Notifier interface {
	SendAlert(ctx context.Context, alert Alert) error
}

type AlertRule struct {
	Name        string            `yaml:"name"`
	Query       string            `yaml:"query"`
	Condition   string            `yaml:"condition"`
	Duration    time.Duration     `yaml:"duration"`
	Severity    string            `yaml:"severity"`
	Annotations map[string]string `yaml:"annotations"`
	Labels      map[string]string `yaml:"labels"`
}

type Alert struct {
	RuleName    string            `json:"rule_name"`
	State       string            `json:"state"`
	Severity    string            `json:"severity"`
	Annotations map[string]string `json:"annotations"`
	Labels      map[string]string `json:"labels"`
	Timestamp   time.Time         `json:"timestamp"`
}

func NewAlertManager(notifier Notifier) *AlertManager {
	return &AlertManager{
		rules:    make([]AlertRule, 0),
		notifier: notifier,
	}
}

func (am *AlertManager) AddRule(rule AlertRule) {
	am.rules = append(am.rules, rule)
}

func (am *AlertManager) EvaluateRules(ctx context.Context, monitor *PrometheusMonitor) []Alert {
	alerts := make([]Alert, 0)

	for _, rule := range am.rules {
		result, err := monitor.QueryMetrics(ctx, rule.Query)
		if err != nil {
			log.Printf("Failed to evaluate alert rule %s: %v", rule.Name, err)
			continue
		}

		// Evaluate condition
		if am.evaluateCondition(result, rule.Condition) {
			alert := Alert{
				RuleName:    rule.Name,
				State:       "firing",
				Severity:    rule.Severity,
				Annotations: rule.Annotations,
				Labels:      rule.Labels,
				Timestamp:   time.Now(),
			}

			alerts = append(alerts, alert)

			// Send notification
			am.notifier.SendAlert(ctx, alert)
		}
	}

	return alerts
}

func (am *AlertManager) evaluateCondition(result model.Value, condition string) bool {
	// Simplified condition evaluation; a real implementation would parse
	// and evaluate the condition (see the threshold sketch below)
	return true
}
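
The evaluateCondition stub above always fires. A workable starting point is sketched below; it assumes rule conditions of the simple form "> 90" evaluated against instant-vector results, and evaluateThreshold is a hypothetical helper name, not part of the task's required API. Anything richer (rates over time, boolean combinations) is better pushed into the PromQL query itself:

// pkg/monitoring/condition.go — hypothetical helper for threshold conditions.
package monitoring

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// evaluateThreshold parses a condition such as "> 90" and reports whether
// any sample of an instant-vector query result crosses the threshold.
func evaluateThreshold(result model.Value, condition string) (bool, error) {
	var op string
	var threshold float64
	if _, err := fmt.Sscanf(condition, "%s %f", &op, &threshold); err != nil {
		return false, fmt.Errorf("unparseable condition %q: %w", condition, err)
	}

	vector, ok := result.(model.Vector)
	if !ok {
		return false, fmt.Errorf("expected instant vector, got %v", result.Type())
	}

	for _, sample := range vector {
		v := float64(sample.Value)
		var crossed bool
		switch op {
		case ">":
			crossed = v > threshold
		case ">=":
			crossed = v >= threshold
		case "<":
			crossed = v < threshold
		case "<=":
			crossed = v <= threshold
		default:
			return false, fmt.Errorf("unsupported operator %q", op)
		}
		if crossed {
			return true, nil
		}
	}
	return false, nil
}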

Task 3: Build Logging Infrastructure

Create structured logging and log aggregation:

// pkg/logging/infrastructure.go
package logging

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/sirupsen/logrus"
	"gopkg.in/natefinch/lumberjack.v2"
)

type LogLevel string

const (
	DebugLevel LogLevel = "debug"
	InfoLevel  LogLevel = "info"
	WarnLevel  LogLevel = "warn"
	ErrorLevel LogLevel = "error"
	FatalLevel LogLevel = "fatal"
)

type LogEntry struct {
	Timestamp   time.Time              `json:"timestamp"`
	Level       LogLevel               `json:"level"`
	Message     string                 `json:"message"`
	Service     string                 `json:"service"`
	Component   string                 `json:"component"`
	TraceID     string                 `json:"trace_id,omitempty"`
	SpanID      string                 `json:"span_id,omitempty"`
	UserID      string                 `json:"user_id,omitempty"`
	RequestID   string                 `json:"request_id,omitempty"`
	Fields      map[string]interface{} `json:"fields,omitempty"`
	Error       string                 `json:"error,omitempty"`
	Stack       string                 `json:"stack,omitempty"`
	Duration    string                 `json:"duration,omitempty"`
	Source      string                 `json:"source"`
	Environment string                 `json:"environment"`
	Version     string                 `json:"version"`
}

type LoggingConfig struct {
	Level         LogLevel `yaml:"level"`
	Format        string   `yaml:"format"` // json, text
	Service       string   `yaml:"service"`
	Environment   string   `yaml:"environment"`
	Version       string   `yaml:"version"`
	Output        string   `yaml:"output"` // stdout, file, elasticsearch
	FilePath      string   `yaml:"file_path"`
	MaxSize       int      `yaml:"max_size"`
	MaxBackups    int      `yaml:"max_backups"`
	MaxAge        int      `yaml:"max_age"`
	Elasticsearch struct {
		URLs     []string `yaml:"urls"`
		Index    string   `yaml:"index"`
		Username string   `yaml:"username"`
		Password string   `yaml:"password"`
	} `yaml:"elasticsearch"`
}

type LoggingInfrastructure struct {
	config     LoggingConfig
	logger     *logrus.Logger
	elastic    *elasticsearch.Client
	fileLogger *lumberjack.Logger
}

func NewLoggingInfrastructure(config LoggingConfig) (*LoggingInfrastructure, error) {
	// Create logrus logger
	logger := logrus.New()

	// Set log level
	level, err := logrus.ParseLevel(string(config.Level))
	if err != nil {
		return nil, fmt.Errorf("invalid log level: %w", err)
	}
	logger.SetLevel(level)

	// Set formatter
	if config.Format == "json" {
		logger.SetFormatter(&logrus.JSONFormatter{
			TimestampFormat: time.RFC3339,
		})
	} else {
		logger.SetFormatter(&logrus.TextFormatter{
			FullTimestamp:   true,
			TimestampFormat: time.RFC3339,
		})
	}

	infra := &LoggingInfrastructure{
		config: config,
		logger: logger,
	}

	// Setup output
	if err := infra.setupOutput(); err != nil {
		return nil, fmt.Errorf("failed to setup logging output: %w", err)
	}

	return infra, nil
}

func (li *LoggingInfrastructure) setupOutput() error {
	switch li.config.Output {
	case "stdout":
		li.logger.SetOutput(os.Stdout)
	case "file":
		li.fileLogger = &lumberjack.Logger{
			Filename:   li.config.FilePath,
			MaxSize:    li.config.MaxSize,
			MaxBackups: li.config.MaxBackups,
			MaxAge:     li.config.MaxAge,
			Compress:   true,
		}
		li.logger.SetOutput(li.fileLogger)
	case "elasticsearch":
		if err := li.setupElasticsearch(); err != nil {
			return fmt.Errorf("failed to setup Elasticsearch: %w", err)
		}
		// Still log to stdout as a fallback
		li.logger.SetOutput(os.Stdout)
	default:
		return fmt.Errorf("unsupported output type: %s", li.config.Output)
	}

	return nil
}

func (li *LoggingInfrastructure) setupElasticsearch() error {
	cfg := elasticsearch.Config{
		Addresses: li.config.Elasticsearch.URLs,
	}

	if li.config.Elasticsearch.Username != "" && li.config.Elasticsearch.Password != "" {
		cfg.Username = li.config.Elasticsearch.Username
		cfg.Password = li.config.Elasticsearch.Password
	}

	client, err := elasticsearch.NewClient(cfg)
	if err != nil {
		return fmt.Errorf("failed to create Elasticsearch client: %w", err)
	}

	// Test connection
	res, err := client.Info()
	if err != nil {
		return fmt.Errorf("failed to connect to Elasticsearch: %w", err)
	}
	res.Body.Close()

	li.elastic = client
	return nil
}

func (li *LoggingInfrastructure) Log(entry LogEntry) error {
	// Set default fields
	if entry.Service == "" {
		entry.Service = li.config.Service
	}
	if entry.Environment == "" {
		entry.Environment = li.config.Environment
	}
	if entry.Version == "" {
		entry.Version = li.config.Version
	}
	if entry.Timestamp.IsZero() {
		entry.Timestamp = time.Now()
	}

	// Log to logrus
	fields := logrus.Fields{
		"service":     entry.Service,
		"component":   entry.Component,
		"environment": entry.Environment,
		"version":     entry.Version,
	}

	if entry.TraceID != "" {
		fields["trace_id"] = entry.TraceID
	}
	if entry.SpanID != "" {
		fields["span_id"] = entry.SpanID
	}
	if entry.UserID != "" {
		fields["user_id"] = entry.UserID
	}
	if entry.RequestID != "" {
		fields["request_id"] = entry.RequestID
	}
	if entry.Duration != "" {
		fields["duration"] = entry.Duration
	}
	if entry.Error != "" {
		fields["error"] = entry.Error
	}
	if entry.Stack != "" {
		fields["stack"] = entry.Stack
	}

	// Add custom fields
	for k, v := range entry.Fields {
		fields[k] = v
	}

	level, _ := logrus.ParseLevel(string(entry.Level))
	li.logger.WithFields(fields).Log(level, entry.Message)

	// Send to Elasticsearch if configured
	if li.elastic != nil {
		go li.sendToElasticsearch(entry)
	}

	return nil
}

func (li *LoggingInfrastructure) sendToElasticsearch(entry LogEntry) {
	if li.elastic == nil {
		return
	}

	// Create index name with a daily date pattern
	indexName := fmt.Sprintf("%s-%s", li.config.Elasticsearch.Index, time.Now().Format("2006.01.02"))

	// Convert to JSON
	data, err := json.Marshal(entry)
	if err != nil {
		log.Printf("Failed to marshal log entry: %v", err)
		return
	}

	// Index document
	req := esapi.IndexRequest{
		Index:   indexName,
		Body:    bytes.NewReader(data),
		Refresh: "false",
	}

	res, err := req.Do(context.Background(), li.elastic)
	if err != nil {
		log.Printf("Failed to index log entry: %v", err)
		return
	}
	defer res.Body.Close()

	if res.IsError() {
		log.Printf("Elasticsearch indexing error: %s", res.Status())
	}
}

func (li *LoggingInfrastructure) CreateLogger(component string) *Logger {
	return &Logger{
		infra:     li,
		component: component,
		fields:    make(map[string]interface{}),
	}
}

func (li *LoggingInfrastructure) QueryLogs(ctx context.Context, query string, limit int) ([]LogEntry, error) {
	if li.elastic == nil {
		return nil, fmt.Errorf("Elasticsearch not configured")
	}

	// Build Elasticsearch query; sort and size are siblings of "query"
	esQuery := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must": []map[string]interface{}{
					{
						"query_string": map[string]interface{}{
							"query": query,
						},
					},
				},
				"filter": []map[string]interface{}{
					{
						"term": map[string]interface{}{
							"service": li.config.Service,
						},
					},
				},
			},
		},
		"sort": []map[string]interface{}{
			{
				"timestamp": map[string]interface{}{
					"order": "desc",
				},
			},
		},
		"size": limit,
	}

	// Execute search
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(esQuery); err != nil {
		return nil, fmt.Errorf("failed to encode search query: %w", err)
	}

	indexPattern := fmt.Sprintf("%s-*", li.config.Elasticsearch.Index)
	res, err := li.elastic.Search(
		li.elastic.Search.WithContext(ctx),
		li.elastic.Search.WithIndex(indexPattern),
		li.elastic.Search.WithBody(&buf),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to search logs: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, fmt.Errorf("Elasticsearch search error: %s", res.Status())
	}

	// Parse results
	var searchResult map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil {
		return nil, fmt.Errorf("failed to decode search results: %w", err)
	}

	hits, ok := searchResult["hits"].(map[string]interface{})
	if !ok {
		return []LogEntry{}, nil
	}

	hitsList, ok := hits["hits"].([]interface{})
	if !ok {
		return []LogEntry{}, nil
	}

	logs := make([]LogEntry, 0, len(hitsList))
	for _, hit := range hitsList {
		hitMap, ok := hit.(map[string]interface{})
		if !ok {
			continue
		}

		source, ok := hitMap["_source"].(map[string]interface{})
		if !ok {
			continue
		}

		// Convert to LogEntry via a JSON round-trip
		data, err := json.Marshal(source)
		if err != nil {
			continue
		}

		var entry LogEntry
		if err := json.Unmarshal(data, &entry); err != nil {
			continue
		}

		logs = append(logs, entry)
	}

	return logs, nil
}

type Logger struct {
	infra     *LoggingInfrastructure
	component string
	fields    map[string]interface{}
}

func (l *Logger) WithField(key string, value interface{}) *Logger {
	newLogger := &Logger{
		infra:     l.infra,
		component: l.component,
		fields:    make(map[string]interface{}),
	}

	// Copy existing fields
	for k, v := range l.fields {
		newLogger.fields[k] = v
	}

	// Add new field
	newLogger.fields[key] = value
	return newLogger
}

func (l *Logger) WithFields(fields map[string]interface{}) *Logger {
	newLogger := &Logger{
		infra:     l.infra,
		component: l.component,
		fields:    make(map[string]interface{}),
	}

	// Copy existing fields
	for k, v := range l.fields {
		newLogger.fields[k] = v
	}

	// Add new fields
	for k, v := range fields {
		newLogger.fields[k] = v
	}

	return newLogger
}

func (l *Logger) WithError(err error) *Logger {
	return l.WithField("error", err.Error())
}

func (l *Logger) Debug(message string) {
	l.log(DebugLevel, message)
}

func (l *Logger) Info(message string) {
	l.log(InfoLevel, message)
}

func (l *Logger) Warn(message string) {
	l.log(WarnLevel, message)
}

func (l *Logger) Error(message string) {
	l.log(ErrorLevel, message)
}

func (l *Logger) Fatal(message string) {
	l.log(FatalLevel, message)
	os.Exit(1)
}

func (l *Logger) log(level LogLevel, message string) {
	entry := LogEntry{
		Level:     level,
		Message:   message,
		Component: l.component,
		Fields:    l.fields,
	}

	l.infra.Log(entry)
}
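
A short usage sketch follows; the module path example.com/devops-platform/pkg/logging and the component name are illustrative:

// cmd/logging-demo/main.go — a usage sketch; module path is illustrative.
package main

import (
	"example.com/devops-platform/pkg/logging"
)

func main() {
	infra, err := logging.NewLoggingInfrastructure(logging.LoggingConfig{
		Level:       logging.InfoLevel,
		Format:      "json",
		Service:     "devops-platform",
		Environment: "dev",
		Version:     "0.1.0",
		Output:      "stdout",
	})
	if err != nil {
		panic(err)
	}

	// Component-scoped logger with structured fields
	logger := infra.CreateLogger("pipeline-engine")
	logger.WithFields(map[string]interface{}{
		"pipeline_id":  "test-app",
		"execution_id": "exec-42",
	}).Info("pipeline completed")
}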

Solution Approach

A full solution would round out the toolchain with the remaining components: deployment automation with rollback, configuration management and secret handling, health checks and readiness probes, and quality gates.
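
As a taste of the health-check piece, here is a minimal liveness/readiness sketch; the pkg/health package, the /healthz and /readyz paths, and the CheckFunc hook are conventional choices, not prescribed by the exercise:

// pkg/health/health.go — hypothetical package; paths and CheckFunc are conventions.
package health

import (
	"encoding/json"
	"net/http"
)

// CheckFunc reports whether one dependency (database, queue, ...) is usable.
type CheckFunc func() error

type Handler struct {
	checks map[string]CheckFunc
}

func NewHandler() *Handler {
	return &Handler{checks: make(map[string]CheckFunc)}
}

// AddCheck registers a named readiness dependency.
func (h *Handler) AddCheck(name string, check CheckFunc) {
	h.checks[name] = check
}

// Register wires liveness and readiness endpoints onto a mux.
func (h *Handler) Register(mux *http.ServeMux) {
	// Liveness: the process is up and serving; no dependency checks.
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})

	// Readiness: every registered dependency check must pass.
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		status := http.StatusOK
		results := make(map[string]string, len(h.checks))
		for name, check := range h.checks {
			if err := check(); err != nil {
				status = http.StatusServiceUnavailable
				results[name] = err.Error()
			} else {
				results[name] = "ok"
			}
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		json.NewEncoder(w).Encode(results)
	})
}

Pointing a Kubernetes livenessProbe at /healthz and a readinessProbe at /readyz then completes the wiring.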

Testing Your DevOps Platform

1. Test CI/CD Pipeline

# Build the platform
go build -o devops-platform ./cmd/devops-platform

# Create a test pipeline configuration
cat > test-pipeline.yaml << EOF
id: test-app
name: Test Application CI/CD
repository: https://git.example.com/test-app.git  # placeholder URL
trigger:
  type: webhook
  branches: [main, develop]
environment:
  name: test
  image: golang:1.21-alpine
  variables:
    GO_VERSION: "1.21"
stages:
  - name: test
    script:
      - go mod download
      - go test -v ./...
    artifacts:
      - path: coverage.out
        type: file
  - name: build
    script:
      - go build -o app ./cmd/app
    depends_on: [test]
  - name: deploy
    script:
      - echo "Deploying to staging"
    depends_on: [build]
EOF

# Execute pipeline
./devops-platform pipeline execute --config test-pipeline.yaml

2. Test Monitoring

# Check the platform's metrics endpoint
curl http://localhost:9090/metrics

# Query Prometheus for pipeline metrics (assumes its API is exposed on 9091)
curl "http://localhost:9091/api/v1/query?query=pipeline_build_total"

# Fire a test alert against Alertmanager
curl -X POST http://localhost:9093/api/v1/alerts \
  -H "Content-Type: application/json" \
  -d '[{
    "labels": {
      "alertname": "HighFailureRate",
      "severity": "critical"
    }
  }]'

3. Test Logging

# Generate test logs
curl -X POST http://localhost:8080/api/logs \
  -H "Content-Type: application/json" \
  -d '{
    "level": "info",
    "message": "Test log message",
    "service": "test-service",
    "component": "test-component"
  }'

# Query logs
curl "http://localhost:8080/api/logs?query=service:test-service&limit=10"

Extension Challenges

  1. Add multi-cloud support - Deploy to AWS, GCP, Azure
  2. Add GitOps support - Implement ArgoCD/Flux integration
  3. Add security scanning - Integrate vulnerability scanning
  4. Add cost monitoring - Track cloud spending
  5. Add compliance checking - Automated compliance validation

Key Takeaways

  • CI/CD pipelines automate the build, test, and deployment process
  • Monitoring provides visibility into system health and performance
  • Logging enables debugging and auditing of system behavior
  • Infrastructure as code ensures reproducible environments
  • Automation reduces manual errors and improves consistency
  • Quality gates maintain high standards across deployments
  • Observability is crucial for operating complex systems

This exercise demonstrates how to build a comprehensive DevOps toolchain using Go, covering the complete lifecycle of modern software development and operations.