DevOps Toolchain with Go
Exercise Overview
Build a comprehensive DevOps toolchain for Go applications. You'll create CI/CD pipelines, monitoring systems, logging infrastructure, and automation tools using Go's excellent DevOps ecosystem.
Learning Objectives
- Design and implement CI/CD pipelines for Go applications
- Build monitoring and alerting systems with Prometheus
- Create structured logging and log aggregation
- Implement infrastructure as code with Go tools
- Build deployment automation and rollback mechanisms
- Add configuration management and secret handling
- Create health checks and readiness probes
- Implement automated testing and quality gates
The System - DevOps Platform
You'll build a DevOps platform that manages the complete lifecycle of Go applications from development to production.
Initial Structure
```go
// cmd/devops-platform/main.go
package main

import "fmt"

// TODO: Implement CI/CD pipeline engine
type PipelineEngine struct {
	// Add pipeline engine implementation
}

// TODO: Implement monitoring system
type MonitoringSystem struct {
	// Add monitoring system implementation
}

// TODO: Implement logging infrastructure
type LoggingInfrastructure struct {
	// Add logging infrastructure implementation
}

// TODO: Implement deployment automation
type DeploymentAutomation struct {
	// Add deployment automation implementation
}

func main() {
	// TODO: Build complete DevOps platform
	fmt.Println("devops-platform: not yet implemented")
}
```
Tasks
Task 1: Build CI/CD Pipeline Engine
Create a flexible CI/CD pipeline system:
```go
// pkg/pipeline/engine.go
package pipeline

import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sync"
	"time"

	"gopkg.in/yaml.v3"
)

type Pipeline struct {
	ID            string            `yaml:"id"`
	Name          string            `yaml:"name"`
	Repository    string            `yaml:"repository"` // clone URL of the source repository
	Trigger       Trigger           `yaml:"trigger"`
	Environment   Environment       `yaml:"environment"`
	Stages        []Stage           `yaml:"stages"`
	Notifications []Notification    `yaml:"notifications"`
	Variables     map[string]string `yaml:"variables"`
}

type Trigger struct {
	Type     string   `yaml:"type"` // webhook, schedule, manual
	Branches []string `yaml:"branches"`
	Schedule string   `yaml:"schedule"`
	Events   []string `yaml:"events"`
}

type Environment struct {
	Name       string            `yaml:"name"`
	Image      string            `yaml:"image"`
	Variables  map[string]string `yaml:"variables"`
	Services   []Service         `yaml:"services"`
	CachePaths []string          `yaml:"cache_paths"`
	Timeout    time.Duration     `yaml:"timeout"`
}

type Service struct {
	Name    string            `yaml:"name"`
	Image   string            `yaml:"image"`
	Ports   []string          `yaml:"ports"`
	Env     map[string]string `yaml:"env"`
	Command []string          `yaml:"command"`
}

type Stage struct {
	Name      string            `yaml:"name"`
	Image     string            `yaml:"image"`
	Script    []string          `yaml:"script"`
	Artifacts []Artifact        `yaml:"artifacts"`
	DependsOn []string          `yaml:"depends_on"`
	Condition string            `yaml:"condition"`
	Variables map[string]string `yaml:"variables"`
	Timeout   time.Duration     `yaml:"timeout"`
	Retry     int               `yaml:"retry"`
	Parallel  bool              `yaml:"parallel"`
}

type Artifact struct {
	Path   string `yaml:"path"`
	Type   string `yaml:"type"` // file, directory, report
	Expire string `yaml:"expire"`
}

type Notification struct {
	Type   string            `yaml:"type"` // slack, email, webhook
	Target string            `yaml:"target"`
	Events []string          `yaml:"events"`
	Config map[string]string `yaml:"config"`
}

type PipelineExecution struct {
	ID          string            `json:"id"`
	Pipeline    *Pipeline         `json:"pipeline"`
	Status      string            `json:"status"`
	StartedAt   time.Time         `json:"started_at"`
	CompletedAt *time.Time        `json:"completed_at"`
	Commit      string            `json:"commit"`
	Branch      string            `json:"branch"`
	TriggeredBy string            `json:"triggered_by"`
	Variables   map[string]string `json:"variables"`
	Stages      []StageExecution  `json:"stages"`
	Logs        []LogEntry        `json:"logs"`
	Artifacts   []ArtifactFile    `json:"artifacts"`
}

type StageExecution struct {
	Name        string         `json:"name"`
	Status      string         `json:"status"`
	StartedAt   time.Time      `json:"started_at"`
	CompletedAt *time.Time     `json:"completed_at"`
	Duration    string         `json:"duration"`
	Logs        string         `json:"logs"`
	Artifacts   []ArtifactFile `json:"artifacts"`
	Error       string         `json:"error,omitempty"`
}

type LogEntry struct {
	Timestamp time.Time `json:"timestamp"`
	Level     string    `json:"level"`
	Stage     string    `json:"stage"`
	Message   string    `json:"message"`
}

type ArtifactFile struct {
	Name string `json:"name"`
	Path string `json:"path"`
	Size int64  `json:"size"`
	Type string `json:"type"`
}

type PipelineEngine struct {
	pipelines     map[string]*Pipeline
	executions    map[string]*PipelineExecution
	workDir       string
	dockerClient  DockerClient
	gitClient     GitClient
	storageClient StorageClient
	notifier      Notifier
	monitor       Monitor
	mutex         sync.RWMutex
}

type DockerClient interface {
	BuildImage(ctx context.Context, dockerfilePath, tag string, buildArgs map[string]string) error
	RunContainer(ctx context.Context, image string, config ContainerConfig) (string, error)
	StopContainer(ctx context.Context, containerID string) error
	RemoveContainer(ctx context.Context, containerID string) error
	GetContainerLogs(ctx context.Context, containerID string) (string, error)
	CopyFromContainer(ctx context.Context, containerID, srcPath, destPath string) error
}

type GitClient interface {
	CloneRepository(ctx context.Context, repoURL, destPath, branch string) error
	GetLatestCommit(repoPath string) (string, error)
	CreateTag(repoPath, tag, message string) error
	PushTags(repoPath string) error
}

type StorageClient interface {
	UploadFile(ctx context.Context, bucket, key string, data []byte) error
	DownloadFile(ctx context.Context, bucket, key string) ([]byte, error)
	ListFiles(ctx context.Context, bucket, prefix string) ([]string, error)
	DeleteFile(ctx context.Context, bucket, key string) error
}

type Notifier interface {
	SendNotification(ctx context.Context, notification Notification, execution *PipelineExecution) error
}

type Monitor interface {
	RecordMetric(name string, value float64, tags map[string]string)
	RecordEvent(event string, data map[string]interface{})
	IncrementCounter(name string, tags map[string]string)
}

func NewPipelineEngine(workDir string, dockerClient DockerClient, gitClient GitClient, storageClient StorageClient, notifier Notifier, monitor Monitor) *PipelineEngine {
	return &PipelineEngine{
		pipelines:     make(map[string]*Pipeline),
		executions:    make(map[string]*PipelineExecution),
		workDir:       workDir,
		dockerClient:  dockerClient,
		gitClient:     gitClient,
		storageClient: storageClient,
		notifier:      notifier,
		monitor:       monitor,
	}
}

func (pe *PipelineEngine) LoadPipeline(configPath string) error {
	data, err := os.ReadFile(configPath)
	if err != nil {
		return fmt.Errorf("failed to read pipeline config: %w", err)
	}

	var pipeline Pipeline
	if err := yaml.Unmarshal(data, &pipeline); err != nil {
		return fmt.Errorf("failed to parse pipeline config: %w", err)
	}

	pe.mutex.Lock()
	pe.pipelines[pipeline.ID] = &pipeline
	pe.mutex.Unlock()

	log.Printf("Loaded pipeline: %s", pipeline.Name)
	return nil
}

func (pe *PipelineEngine) ExecutePipeline(ctx context.Context, pipelineID string, trigger TriggerInfo) (*PipelineExecution, error) {
	pe.mutex.RLock()
	pipeline, exists := pe.pipelines[pipelineID]
	pe.mutex.RUnlock()

	if !exists {
		return nil, fmt.Errorf("pipeline not found: %s", pipelineID)
	}

	execution := &PipelineExecution{
		ID:          generateExecutionID(),
		Pipeline:    pipeline,
		Status:      "pending",
		StartedAt:   time.Now(),
		Commit:      trigger.Commit,
		Branch:      trigger.Branch,
		TriggeredBy: trigger.TriggeredBy,
		Variables:   make(map[string]string),
		Stages:      make([]StageExecution, len(pipeline.Stages)),
		Logs:        make([]LogEntry, 0),
		Artifacts:   make([]ArtifactFile, 0),
	}

	// Merge pipeline and trigger variables (trigger wins on conflict)
	for k, v := range pipeline.Variables {
		execution.Variables[k] = v
	}
	for k, v := range trigger.Variables {
		execution.Variables[k] = v
	}

	pe.mutex.Lock()
	pe.executions[execution.ID] = execution
	pe.mutex.Unlock()

	// Start execution in a goroutine
	go pe.runPipeline(ctx, execution)

	// Send notification
	pe.notifier.SendNotification(ctx, Notification{
		Type:   "pipeline_started",
		Target: "default",
	}, execution)

	pe.monitor.RecordEvent("pipeline.started", map[string]interface{}{
		"pipeline_id":  pipelineID,
		"execution_id": execution.ID,
		"trigger":      trigger.TriggeredBy,
		"branch":       trigger.Branch,
	})

	return execution, nil
}

func (pe *PipelineEngine) runPipeline(ctx context.Context, execution *PipelineExecution) {
	defer func() {
		completedAt := time.Now()
		execution.CompletedAt = &completedAt

		if execution.Status == "running" {
			execution.Status = "completed"
		}

		// Send completion notification
		pe.notifier.SendNotification(ctx, Notification{
			Type:   "pipeline_completed",
			Target: "default",
		}, execution)

		pe.monitor.RecordEvent("pipeline.completed", map[string]interface{}{
			"execution_id": execution.ID,
			"status":       execution.Status,
			"duration":     completedAt.Sub(execution.StartedAt).String(),
		})
	}()

	execution.Status = "running"

	// Create workspace
	workspace := filepath.Join(pe.workDir, execution.ID)
	if err := os.MkdirAll(workspace, 0o755); err != nil {
		execution.Status = "failed"
		execution.addLog("error", "", fmt.Sprintf("Failed to create workspace: %v", err))
		return
	}
	defer os.RemoveAll(workspace)

	// Clone repository
	repoPath := filepath.Join(workspace, "repo")
	if err := pe.gitClient.CloneRepository(ctx, execution.Pipeline.Repository, repoPath, execution.Branch); err != nil {
		execution.Status = "failed"
		execution.addLog("error", "", fmt.Sprintf("Failed to clone repository: %v", err))
		return
	}

	// Execute stages
	for i, stage := range execution.Pipeline.Stages {
		stageExec := &execution.Stages[i]
		stageExec.Name = stage.Name
		stageExec.Status = "pending"
		stageExec.StartedAt = time.Now()

		// Check dependencies
		if !pe.checkStageDependencies(stage.DependsOn, execution.Stages) {
			stageExec.Status = "skipped"
			now := time.Now()
			stageExec.CompletedAt = &now
			continue
		}

		// Execute stage
		if err := pe.executeStage(ctx, stage, stageExec, execution, repoPath); err != nil {
			stageExec.Status = "failed"
			stageExec.Error = err.Error()
			execution.Status = "failed"
			execution.addLog("error", stage.Name, fmt.Sprintf("Stage failed: %v", err))
			return
		}

		stageExec.Status = "completed"
		completedAt := time.Now()
		stageExec.CompletedAt = &completedAt
		stageExec.Duration = completedAt.Sub(stageExec.StartedAt).String()

		execution.addLog("info", stage.Name, fmt.Sprintf("Stage completed in %s", stageExec.Duration))
	}

	if execution.Status == "running" {
		execution.Status = "completed"
	}

	execution.addLog("info", "", fmt.Sprintf("Pipeline %s", execution.Status))
}

func (pe *PipelineEngine) executeStage(ctx context.Context, stage Stage, stageExec *StageExecution, execution *PipelineExecution, repoPath string) error {
	stageExec.Status = "running"

	// Prepare environment variables: execution-level first, stage-level overrides
	envVars := make(map[string]string)
	for k, v := range execution.Variables {
		envVars[k] = v
	}
	for k, v := range stage.Variables {
		envVars[k] = v
	}

	// Add default environment variables
	envVars["CI"] = "true"
	envVars["PIPELINE_ID"] = execution.ID
	envVars["STAGE_NAME"] = stage.Name
	envVars["WORKSPACE"] = repoPath

	// Resolve the image: stage image first, then the pipeline environment image
	image := stage.Image
	if image == "" {
		image = execution.Pipeline.Environment.Image
	}

	if image == "" {
		// Fall back to building from the repository's Dockerfile
		dockerfilePath := filepath.Join(repoPath, "Dockerfile")
		imageTag := fmt.Sprintf("pipeline-%s-%s", execution.ID, stage.Name)

		if err := pe.dockerClient.BuildImage(ctx, dockerfilePath, imageTag, map[string]string{
			"PIPELINE_ID": execution.ID,
			"STAGE_NAME":  stage.Name,
		}); err != nil {
			return fmt.Errorf("failed to build Docker image: %w", err)
		}
		image = imageTag
	}

	// Prepare container configuration
	containerConfig := ContainerConfig{
		Image:      image,
		Env:        envVars,
		WorkingDir: repoPath,
		Mounts: []Mount{
			{Source: repoPath, Destination: "/workspace"},
		},
		Network: "pipeline-network",
	}

	// Run container
	containerID, err := pe.dockerClient.RunContainer(ctx, image, containerConfig)
	if err != nil {
		return fmt.Errorf("failed to run container: %w", err)
	}
	defer pe.dockerClient.RemoveContainer(ctx, containerID)

	// Execute scripts
	for _, script := range stage.Script {
		if err := pe.executeScript(ctx, containerID, script, stageExec); err != nil {
			return fmt.Errorf("script execution failed: %w", err)
		}
	}

	// Collect artifacts
	for _, artifact := range stage.Artifacts {
		if err := pe.collectArtifact(ctx, containerID, artifact, stageExec, execution); err != nil {
			execution.addLog("warning", stage.Name, fmt.Sprintf("Failed to collect artifact %s: %v", artifact.Path, err))
		}
	}

	return nil
}

func (pe *PipelineEngine) executeScript(ctx context.Context, containerID, script string, stageExec *StageExecution) error {
	// This would typically use docker exec to run the script inside the
	// container; for demonstration, we simulate script execution.
	stageExec.Logs += fmt.Sprintf("$ %s\n", script)

	// Simulate execution time
	time.Sleep(100 * time.Millisecond)

	// Simulate success
	stageExec.Logs += "Script executed successfully\n"

	return nil
}

func (pe *PipelineEngine) collectArtifact(ctx context.Context, containerID string, artifact Artifact, stageExec *StageExecution, execution *PipelineExecution) error {
	// Create artifact directory
	artifactDir := filepath.Join(pe.workDir, execution.ID, "artifacts", stageExec.Name)
	if err := os.MkdirAll(artifactDir, 0o755); err != nil {
		return err
	}

	// Copy from container
	localPath := filepath.Join(artifactDir, filepath.Base(artifact.Path))
	if err := pe.dockerClient.CopyFromContainer(ctx, containerID, artifact.Path, localPath); err != nil {
		return err
	}

	// Get file info
	info, err := os.Stat(localPath)
	if err != nil {
		return err
	}

	// Upload to storage
	data, err := os.ReadFile(localPath)
	if err != nil {
		return err
	}

	storageKey := fmt.Sprintf("pipelines/%s/%s/%s", execution.ID, stageExec.Name, filepath.Base(artifact.Path))
	if err := pe.storageClient.UploadFile(ctx, "artifacts", storageKey, data); err != nil {
		return err
	}

	// Record artifact
	artifactFile := ArtifactFile{
		Name: filepath.Base(artifact.Path),
		Path: storageKey,
		Size: info.Size(),
		Type: artifact.Type,
	}

	stageExec.Artifacts = append(stageExec.Artifacts, artifactFile)
	execution.Artifacts = append(execution.Artifacts, artifactFile)

	return nil
}

func (pe *PipelineEngine) checkStageDependencies(dependencies []string, stages []StageExecution) bool {
	for _, dep := range dependencies {
		found := false
		for _, stage := range stages {
			if stage.Name == dep {
				found = true
				if stage.Status != "completed" {
					return false
				}
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

func (pe *PipelineEngine) GetExecution(executionID string) (*PipelineExecution, error) {
	pe.mutex.RLock()
	defer pe.mutex.RUnlock()

	execution, exists := pe.executions[executionID]
	if !exists {
		return nil, fmt.Errorf("execution not found: %s", executionID)
	}

	return execution, nil
}

func (pe *PipelineEngine) ListExecutions(pipelineID string) ([]*PipelineExecution, error) {
	pe.mutex.RLock()
	defer pe.mutex.RUnlock()

	var executions []*PipelineExecution
	for _, execution := range pe.executions {
		if pipelineID == "" || execution.Pipeline.ID == pipelineID {
			executions = append(executions, execution)
		}
	}

	return executions, nil
}

func (execution *PipelineExecution) addLog(level, stage, message string) {
	entry := LogEntry{
		Timestamp: time.Now(),
		Level:     level,
		Stage:     stage,
		Message:   message,
	}
	execution.Logs = append(execution.Logs, entry)
}

type TriggerInfo struct {
	Commit      string            `json:"commit"`
	Branch      string            `json:"branch"`
	TriggeredBy string            `json:"triggered_by"`
	Variables   map[string]string `json:"variables"`
}

type ContainerConfig struct {
	Image      string            `json:"image"`
	Env        map[string]string `json:"env"`
	WorkingDir string            `json:"working_dir"`
	Mounts     []Mount           `json:"mounts"`
	Network    string            `json:"network"`
}

type Mount struct {
	Source      string `json:"source"`
	Destination string `json:"destination"`
}

func generateExecutionID() string {
	return fmt.Sprintf("exec-%d", time.Now().UnixNano())
}
```
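To see the engine in action, here is a minimal wiring sketch. The `new*Client` constructors and the module path are hypothetical placeholders for your own implementations of the interfaces above:

```go
package main

import (
	"context"
	"log"

	"example.com/devops/pkg/pipeline" // assumed module path
)

func main() {
	// Each new*Client constructor below is a hypothetical stand-in for your
	// own implementation of the corresponding pipeline interface.
	engine := pipeline.NewPipelineEngine(
		"/tmp/pipelines",
		newDockerClient(),
		newGitClient(),
		newStorageClient(),
		newNotifier(),
		newMonitor(),
	)

	if err := engine.LoadPipeline("test-pipeline.yaml"); err != nil {
		log.Fatalf("load pipeline: %v", err)
	}

	exec, err := engine.ExecutePipeline(context.Background(), "test-app", pipeline.TriggerInfo{
		Commit:      "abc1234",
		Branch:      "main",
		TriggeredBy: "manual",
	})
	if err != nil {
		log.Fatalf("execute pipeline: %v", err)
	}
	log.Printf("started execution %s", exec.ID)
}
```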
Task 2: Implement Monitoring System
Build comprehensive monitoring with Prometheus:
```go
// pkg/monitoring/prometheus.go
package monitoring

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/common/model"
)

type PrometheusMonitor struct {
	client      v1.API
	registry    *prometheus.Registry
	metricsPath string

	// Custom metrics
	buildDuration   *prometheus.HistogramVec
	buildSuccess    *prometheus.CounterVec
	buildTotal      *prometheus.CounterVec
	deploymentTotal *prometheus.CounterVec
	testDuration    *prometheus.HistogramVec
	testPassRate    *prometheus.GaugeVec

	// System metrics
	cpuUsage    *prometheus.GaugeVec
	memoryUsage *prometheus.GaugeVec
	diskUsage   *prometheus.GaugeVec
	networkIO   *prometheus.CounterVec
}

type MonitoringConfig struct {
	PrometheusURL string `yaml:"prometheus_url"`
	ListenPort    int    `yaml:"listen_port"`
	MetricsPath   string `yaml:"metrics_path"`
}

func NewPrometheusMonitor(config MonitoringConfig) (*PrometheusMonitor, error) {
	// Create Prometheus client
	client, err := api.NewClient(api.Config{
		Address: config.PrometheusURL,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create Prometheus client: %w", err)
	}

	v1api := v1.NewAPI(client)

	// Create registry
	registry := prometheus.NewRegistry()

	metricsPath := config.MetricsPath
	if metricsPath == "" {
		metricsPath = "/metrics"
	}

	monitor := &PrometheusMonitor{
		client:      v1api,
		registry:    registry,
		metricsPath: metricsPath,

		buildDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "pipeline_build_duration_seconds",
				Help:    "Duration of pipeline builds",
				Buckets: prometheus.DefBuckets,
			},
			[]string{"pipeline", "stage", "status"},
		),

		buildSuccess: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "pipeline_build_success_total",
				Help: "Total number of successful pipeline builds",
			},
			[]string{"pipeline", "branch"},
		),

		buildTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "pipeline_build_total",
				Help: "Total number of pipeline builds",
			},
			[]string{"pipeline", "branch", "status"},
		),

		deploymentTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "deployment_total",
				Help: "Total number of deployments",
			},
			[]string{"service", "environment", "status"},
		),

		testDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "test_duration_seconds",
				Help:    "Duration of test execution",
				Buckets: prometheus.DefBuckets,
			},
			[]string{"test_type", "package"},
		),

		testPassRate: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "test_pass_rate",
				Help: "Test pass rate percentage",
			},
			[]string{"test_type", "package"},
		),

		cpuUsage: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "system_cpu_usage_percent",
				Help: "CPU usage percentage",
			},
			[]string{"instance", "service"},
		),

		memoryUsage: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "system_memory_usage_bytes",
				Help: "Memory usage in bytes",
			},
			[]string{"instance", "service"},
		),

		diskUsage: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "system_disk_usage_bytes",
				Help: "Disk usage in bytes",
			},
			[]string{"instance", "mountpoint"},
		),

		networkIO: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "system_network_io_bytes_total",
				Help: "Network I/O in bytes",
			},
			[]string{"instance", "interface", "direction"},
		),
	}

	// Register metrics
	registry.MustRegister(
		monitor.buildDuration,
		monitor.buildSuccess,
		monitor.buildTotal,
		monitor.deploymentTotal,
		monitor.testDuration,
		monitor.testPassRate,
		monitor.cpuUsage,
		monitor.memoryUsage,
		monitor.diskUsage,
		monitor.networkIO,
	)

	return monitor, nil
}

func (pm *PrometheusMonitor) RecordBuildStart(pipeline, branch string) {
	pm.buildTotal.WithLabelValues(pipeline, branch, "started").Inc()
}

func (pm *PrometheusMonitor) RecordBuildComplete(pipeline, branch, stage string, duration time.Duration, success bool) {
	status := "success"
	if !success {
		status = "failed"
	}

	pm.buildDuration.WithLabelValues(pipeline, stage, status).Observe(duration.Seconds())
	pm.buildTotal.WithLabelValues(pipeline, branch, status).Inc()

	if success {
		pm.buildSuccess.WithLabelValues(pipeline, branch).Inc()
	}
}

func (pm *PrometheusMonitor) RecordDeployment(service, environment, status string) {
	pm.deploymentTotal.WithLabelValues(service, environment, status).Inc()
}

func (pm *PrometheusMonitor) RecordTestExecution(testType, pkg string, duration time.Duration, passed, total int) {
	pm.testDuration.WithLabelValues(testType, pkg).Observe(duration.Seconds())

	if total > 0 {
		passRate := float64(passed) / float64(total) * 100
		pm.testPassRate.WithLabelValues(testType, pkg).Set(passRate)
	}
}

func (pm *PrometheusMonitor) UpdateSystemMetrics(instance, service string, cpu float64, memory, disk uint64) {
	pm.cpuUsage.WithLabelValues(instance, service).Set(cpu)
	pm.memoryUsage.WithLabelValues(instance, service).Set(float64(memory))
	// diskUsage is labeled by mountpoint; the root filesystem is assumed here
	pm.diskUsage.WithLabelValues(instance, "/").Set(float64(disk))
}

func (pm *PrometheusMonitor) RecordNetworkIO(instance, iface, direction string, bytes uint64) {
	pm.networkIO.WithLabelValues(instance, iface, direction).Add(float64(bytes))
}

func (pm *PrometheusMonitor) QueryMetrics(ctx context.Context, query string) (model.Value, error) {
	result, warnings, err := pm.client.Query(ctx, query, time.Now())
	if err != nil {
		return nil, fmt.Errorf("error querying Prometheus: %w", err)
	}

	if len(warnings) > 0 {
		log.Printf("Prometheus query warnings: %v", warnings)
	}

	return result, nil
}

func (pm *PrometheusMonitor) StartMetricsServer(listenAddr string) error {
	mux := http.NewServeMux()
	mux.Handle(pm.metricsPath, promhttp.HandlerFor(pm.registry, promhttp.HandlerOpts{}))

	server := &http.Server{
		Addr:    listenAddr,
		Handler: mux,
	}

	log.Printf("Starting metrics server on %s", listenAddr)
	return server.ListenAndServe()
}

func (pm *PrometheusMonitor) GetSystemHealth(ctx context.Context) (map[string]interface{}, error) {
	health := make(map[string]interface{})

	// Query system metrics (standard node_exporter expressions)
	queries := map[string]string{
		"cpu_usage":    `avg by (instance) (100 - rate(node_cpu_seconds_total{mode="idle"}[5m]) * 100)`,
		"memory_usage": `avg by (instance) ((1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100)`,
		"disk_usage":   `avg by (instance) ((1 - node_filesystem_avail_bytes / node_filesystem_size_bytes) * 100)`,
	}

	for name, query := range queries {
		result, err := pm.QueryMetrics(ctx, query)
		if err != nil {
			health[name] = map[string]interface{}{
				"error": err.Error(),
			}
			continue
		}

		health[name] = result
	}

	return health, nil
}

// Alerting system
type AlertManager struct {
	rules    []AlertRule
	notifier Notifier
}

// Notifier delivers fired alerts to external channels (Slack, email, webhooks).
type Notifier interface {
	SendAlert(ctx context.Context, alert Alert) error
}

type AlertRule struct {
	Name        string            `yaml:"name"`
	Query       string            `yaml:"query"`
	Condition   string            `yaml:"condition"`
	Duration    time.Duration     `yaml:"duration"`
	Severity    string            `yaml:"severity"`
	Annotations map[string]string `yaml:"annotations"`
	Labels      map[string]string `yaml:"labels"`
}

type Alert struct {
	RuleName    string            `json:"rule_name"`
	State       string            `json:"state"`
	Severity    string            `json:"severity"`
	Annotations map[string]string `json:"annotations"`
	Labels      map[string]string `json:"labels"`
	Timestamp   time.Time         `json:"timestamp"`
}

func NewAlertManager(notifier Notifier) *AlertManager {
	return &AlertManager{
		rules:    make([]AlertRule, 0),
		notifier: notifier,
	}
}

func (am *AlertManager) AddRule(rule AlertRule) {
	am.rules = append(am.rules, rule)
}

func (am *AlertManager) EvaluateRules(ctx context.Context, monitor *PrometheusMonitor) []Alert {
	alerts := make([]Alert, 0)

	for _, rule := range am.rules {
		result, err := monitor.QueryMetrics(ctx, rule.Query)
		if err != nil {
			log.Printf("Failed to evaluate alert rule %s: %v", rule.Name, err)
			continue
		}

		// Evaluate condition
		if am.evaluateCondition(result, rule.Condition) {
			alert := Alert{
				RuleName:    rule.Name,
				State:       "firing",
				Severity:    rule.Severity,
				Annotations: rule.Annotations,
				Labels:      rule.Labels,
				Timestamp:   time.Now(),
			}

			alerts = append(alerts, alert)

			// Send notification
			if err := am.notifier.SendAlert(ctx, alert); err != nil {
				log.Printf("Failed to send alert %s: %v", rule.Name, err)
			}
		}
	}

	return alerts
}

func (am *AlertManager) evaluateCondition(result model.Value, condition string) bool {
	// Simplified condition evaluation; a real implementation would parse
	// the condition expression and compare it against the query result.
	return true
}
```
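A hedged wiring sketch for the monitor and alert manager follows. The port layout mirrors the testing section below (platform metrics on :9090, Prometheus API on :9091); `slackNotifier`, the module path, and the alert threshold are assumptions:

```go
package main

import (
	"context"
	"log"
	"time"

	"example.com/devops/pkg/monitoring" // assumed module path
)

// slackNotifier is a hypothetical Notifier implementation; plug in your own.
var slackNotifier monitoring.Notifier

func main() {
	monitor, err := monitoring.NewPrometheusMonitor(monitoring.MonitoringConfig{
		PrometheusURL: "http://localhost:9091", // Prometheus API, per the testing section
		MetricsPath:   "/metrics",
	})
	if err != nil {
		log.Fatalf("monitor: %v", err)
	}

	// Expose the platform's own metrics for Prometheus to scrape.
	go func() {
		if err := monitor.StartMetricsServer(":9090"); err != nil {
			log.Fatalf("metrics server: %v", err)
		}
	}()

	alerts := monitoring.NewAlertManager(slackNotifier)
	alerts.AddRule(monitoring.AlertRule{
		Name:      "HighBuildFailureRate",
		Query:     `sum(rate(pipeline_build_total{status="failed"}[15m])) / sum(rate(pipeline_build_total[15m]))`,
		Condition: "> 0.2", // assumed threshold: more than 20% failures
		Severity:  "critical",
	})

	// Evaluate alert rules once a minute.
	for range time.Tick(time.Minute) {
		alerts.EvaluateRules(context.Background(), monitor)
	}
}
```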
Task 3: Build Logging Infrastructure
Create structured logging and log aggregation:
```go
// pkg/logging/infrastructure.go
package logging

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/sirupsen/logrus"
	"gopkg.in/natefinch/lumberjack.v2"
)

type LogLevel string

const (
	DebugLevel LogLevel = "debug"
	InfoLevel  LogLevel = "info"
	WarnLevel  LogLevel = "warn"
	ErrorLevel LogLevel = "error"
	FatalLevel LogLevel = "fatal"
)

type LogEntry struct {
	Timestamp   time.Time              `json:"timestamp"`
	Level       LogLevel               `json:"level"`
	Message     string                 `json:"message"`
	Service     string                 `json:"service"`
	Component   string                 `json:"component"`
	TraceID     string                 `json:"trace_id,omitempty"`
	SpanID      string                 `json:"span_id,omitempty"`
	UserID      string                 `json:"user_id,omitempty"`
	RequestID   string                 `json:"request_id,omitempty"`
	Fields      map[string]interface{} `json:"fields,omitempty"`
	Error       string                 `json:"error,omitempty"`
	Stack       string                 `json:"stack,omitempty"`
	Duration    string                 `json:"duration,omitempty"`
	Source      string                 `json:"source"`
	Environment string                 `json:"environment"`
	Version     string                 `json:"version"`
}

type LoggingConfig struct {
	Level         LogLevel `yaml:"level"`
	Format        string   `yaml:"format"` // json, text
	Service       string   `yaml:"service"`
	Environment   string   `yaml:"environment"`
	Version       string   `yaml:"version"`
	Output        string   `yaml:"output"` // stdout, file, elasticsearch
	FilePath      string   `yaml:"file_path"`
	MaxSize       int      `yaml:"max_size"`
	MaxBackups    int      `yaml:"max_backups"`
	MaxAge        int      `yaml:"max_age"`
	Elasticsearch struct {
		URLs     []string `yaml:"urls"`
		Index    string   `yaml:"index"`
		Username string   `yaml:"username"`
		Password string   `yaml:"password"`
	} `yaml:"elasticsearch"`
}

type LoggingInfrastructure struct {
	config     LoggingConfig
	logger     *logrus.Logger
	elastic    *elasticsearch.Client
	fileLogger *lumberjack.Logger
}

func NewLoggingInfrastructure(config LoggingConfig) (*LoggingInfrastructure, error) {
	// Create logrus logger
	logger := logrus.New()

	// Set log level
	level, err := logrus.ParseLevel(string(config.Level))
	if err != nil {
		return nil, fmt.Errorf("invalid log level: %w", err)
	}
	logger.SetLevel(level)

	// Set formatter
	if config.Format == "json" {
		logger.SetFormatter(&logrus.JSONFormatter{
			TimestampFormat: time.RFC3339,
		})
	} else {
		logger.SetFormatter(&logrus.TextFormatter{
			FullTimestamp:   true,
			TimestampFormat: time.RFC3339,
		})
	}

	infra := &LoggingInfrastructure{
		config: config,
		logger: logger,
	}

	// Setup output
	if err := infra.setupOutput(); err != nil {
		return nil, fmt.Errorf("failed to setup logging output: %w", err)
	}

	return infra, nil
}

func (li *LoggingInfrastructure) setupOutput() error {
	switch li.config.Output {
	case "stdout":
		li.logger.SetOutput(os.Stdout)
	case "file":
		li.fileLogger = &lumberjack.Logger{
			Filename:   li.config.FilePath,
			MaxSize:    li.config.MaxSize,
			MaxBackups: li.config.MaxBackups,
			MaxAge:     li.config.MaxAge,
			Compress:   true,
		}
		li.logger.SetOutput(li.fileLogger)
	case "elasticsearch":
		if err := li.setupElasticsearch(); err != nil {
			return fmt.Errorf("failed to setup Elasticsearch: %w", err)
		}
		// Still log to stdout as a fallback
		li.logger.SetOutput(os.Stdout)
	default:
		return fmt.Errorf("unsupported output type: %s", li.config.Output)
	}

	return nil
}

func (li *LoggingInfrastructure) setupElasticsearch() error {
	cfg := elasticsearch.Config{
		Addresses: li.config.Elasticsearch.URLs,
	}

	if li.config.Elasticsearch.Username != "" && li.config.Elasticsearch.Password != "" {
		cfg.Username = li.config.Elasticsearch.Username
		cfg.Password = li.config.Elasticsearch.Password
	}

	client, err := elasticsearch.NewClient(cfg)
	if err != nil {
		return fmt.Errorf("failed to create Elasticsearch client: %w", err)
	}

	// Test connection
	res, err := client.Info()
	if err != nil {
		return fmt.Errorf("failed to connect to Elasticsearch: %w", err)
	}
	res.Body.Close()

	li.elastic = client
	return nil
}

func (li *LoggingInfrastructure) Log(entry LogEntry) error {
	// Set default fields
	if entry.Service == "" {
		entry.Service = li.config.Service
	}
	if entry.Environment == "" {
		entry.Environment = li.config.Environment
	}
	if entry.Version == "" {
		entry.Version = li.config.Version
	}
	if entry.Timestamp.IsZero() {
		entry.Timestamp = time.Now()
	}

	// Log to logrus
	fields := logrus.Fields{
		"service":     entry.Service,
		"component":   entry.Component,
		"environment": entry.Environment,
		"version":     entry.Version,
	}

	if entry.TraceID != "" {
		fields["trace_id"] = entry.TraceID
	}
	if entry.SpanID != "" {
		fields["span_id"] = entry.SpanID
	}
	if entry.UserID != "" {
		fields["user_id"] = entry.UserID
	}
	if entry.RequestID != "" {
		fields["request_id"] = entry.RequestID
	}
	if entry.Duration != "" {
		fields["duration"] = entry.Duration
	}
	if entry.Error != "" {
		fields["error"] = entry.Error
	}
	if entry.Stack != "" {
		fields["stack"] = entry.Stack
	}

	// Add custom fields
	for k, v := range entry.Fields {
		fields[k] = v
	}

	level, err := logrus.ParseLevel(string(entry.Level))
	if err != nil {
		level = logrus.InfoLevel // fall back to info on unknown levels
	}
	li.logger.WithFields(fields).Log(level, entry.Message)

	// Send to Elasticsearch if configured
	if li.elastic != nil {
		go li.sendToElasticsearch(entry)
	}

	return nil
}

func (li *LoggingInfrastructure) sendToElasticsearch(entry LogEntry) {
	if li.elastic == nil {
		return
	}

	// Create index name with a daily date pattern
	indexName := fmt.Sprintf("%s-%s", li.config.Elasticsearch.Index, time.Now().Format("2006.01.02"))

	// Convert to JSON
	data, err := json.Marshal(entry)
	if err != nil {
		log.Printf("Failed to marshal log entry: %v", err)
		return
	}

	// Index document
	req := esapi.IndexRequest{
		Index:   indexName,
		Body:    bytes.NewReader(data),
		Refresh: "false",
	}

	res, err := req.Do(context.Background(), li.elastic)
	if err != nil {
		log.Printf("Failed to index log entry: %v", err)
		return
	}
	defer res.Body.Close()

	if res.IsError() {
		log.Printf("Elasticsearch indexing error: %s", res.Status())
	}
}

func (li *LoggingInfrastructure) CreateLogger(component string) *Logger {
	return &Logger{
		infra:     li,
		component: component,
		fields:    make(map[string]interface{}),
	}
}

func (li *LoggingInfrastructure) QueryLogs(ctx context.Context, query string, limit int) ([]LogEntry, error) {
	if li.elastic == nil {
		return nil, fmt.Errorf("Elasticsearch not configured")
	}

	// Build Elasticsearch query; note that "sort" and "size" sit at the
	// top level, alongside "query"
	esQuery := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must": []map[string]interface{}{
					{
						"query_string": map[string]interface{}{
							"query": query,
						},
					},
				},
				"filter": []map[string]interface{}{
					{
						"term": map[string]interface{}{
							"service": li.config.Service,
						},
					},
				},
			},
		},
		"sort": []map[string]interface{}{
			{
				"timestamp": map[string]interface{}{
					"order": "desc",
				},
			},
		},
		"size": limit,
	}

	// Execute search
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(esQuery); err != nil {
		return nil, fmt.Errorf("failed to encode search query: %w", err)
	}

	indexPattern := fmt.Sprintf("%s-*", li.config.Elasticsearch.Index)
	res, err := li.elastic.Search(
		li.elastic.Search.WithContext(ctx),
		li.elastic.Search.WithIndex(indexPattern),
		li.elastic.Search.WithBody(&buf),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to search logs: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, fmt.Errorf("Elasticsearch search error: %s", res.Status())
	}

	// Parse results
	var searchResult map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&searchResult); err != nil {
		return nil, fmt.Errorf("failed to decode search results: %w", err)
	}

	hits, ok := searchResult["hits"].(map[string]interface{})
	if !ok {
		return []LogEntry{}, nil
	}

	hitsList, ok := hits["hits"].([]interface{})
	if !ok {
		return []LogEntry{}, nil
	}

	logs := make([]LogEntry, 0, len(hitsList))
	for _, hit := range hitsList {
		hitMap, ok := hit.(map[string]interface{})
		if !ok {
			continue
		}

		source, ok := hitMap["_source"].(map[string]interface{})
		if !ok {
			continue
		}

		// Convert to LogEntry
		data, err := json.Marshal(source)
		if err != nil {
			continue
		}

		var entry LogEntry
		if err := json.Unmarshal(data, &entry); err != nil {
			continue
		}

		logs = append(logs, entry)
	}

	return logs, nil
}

type Logger struct {
	infra     *LoggingInfrastructure
	component string
	fields    map[string]interface{}
}

func (l *Logger) WithField(key string, value interface{}) *Logger {
	newLogger := &Logger{
		infra:     l.infra,
		component: l.component,
		fields:    make(map[string]interface{}),
	}

	// Copy existing fields
	for k, v := range l.fields {
		newLogger.fields[k] = v
	}

	// Add the new field
	newLogger.fields[key] = value
	return newLogger
}

func (l *Logger) WithFields(fields map[string]interface{}) *Logger {
	newLogger := &Logger{
		infra:     l.infra,
		component: l.component,
		fields:    make(map[string]interface{}),
	}

	// Copy existing fields
	for k, v := range l.fields {
		newLogger.fields[k] = v
	}

	// Add the new fields
	for k, v := range fields {
		newLogger.fields[k] = v
	}

	return newLogger
}

func (l *Logger) WithError(err error) *Logger {
	return l.WithField("error", err.Error())
}

func (l *Logger) Debug(message string) {
	l.log(DebugLevel, message)
}

func (l *Logger) Info(message string) {
	l.log(InfoLevel, message)
}

func (l *Logger) Warn(message string) {
	l.log(WarnLevel, message)
}

func (l *Logger) Error(message string) {
	l.log(ErrorLevel, message)
}

func (l *Logger) Fatal(message string) {
	l.log(FatalLevel, message)
	os.Exit(1)
}

func (l *Logger) log(level LogLevel, message string) {
	entry := LogEntry{
		Level:     level,
		Message:   message,
		Component: l.component,
		Fields:    l.fields,
	}

	l.infra.Log(entry)
}
```
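A short usage sketch of the logging infrastructure; the module path is an assumption:

```go
package main

import (
	"log"

	"example.com/devops/pkg/logging" // assumed module path
)

func main() {
	infra, err := logging.NewLoggingInfrastructure(logging.LoggingConfig{
		Level:       logging.InfoLevel,
		Format:      "json",
		Service:     "devops-platform",
		Environment: "dev",
		Version:     "0.1.0",
		Output:      "stdout",
	})
	if err != nil {
		log.Fatalf("logging: %v", err)
	}

	// Component loggers carry structured context on every entry.
	logger := infra.CreateLogger("pipeline-engine")
	logger.
		WithField("pipeline_id", "test-app").
		WithField("execution_id", "exec-123").
		Info("pipeline execution started")
}
```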
Solution Approach
A complete solution would add implementations of the remaining toolchain components: deployment automation with rollback, configuration management and secret handling, health checks and readiness probes, and quality gates.
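Health checks and readiness probes in particular follow a well-known pattern and need no external dependencies. A minimal standard-library sketch is below; the `/healthz` and `/readyz` paths are conventional choices, not requirements:

```go
package main

import (
	"net/http"
	"sync/atomic"
)

// ready flips to true once startup work (config load, DB connect, ...) is done.
var ready atomic.Bool

func main() {
	// Liveness: the process is up and able to serve requests.
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	// Readiness: dependencies are initialized and traffic can be routed here.
	http.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		if !ready.Load() {
			http.Error(w, "not ready", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})

	go func() {
		// ... perform startup work, then flip the readiness flag:
		ready.Store(true)
	}()

	http.ListenAndServe(":8080", nil)
}
```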
Testing Your DevOps Platform
1. Test CI/CD Pipeline
```bash
# Build the platform
go build -o devops-platform ./cmd/devops-platform

# Create a test pipeline configuration
cat > test-pipeline.yaml << EOF
id: test-app
name: Test Application CI/CD
repository: https://github.com/example/test-app.git
trigger:
  type: webhook
  branches: [main, develop]
environment:
  name: test
  image: golang:1.21-alpine
  variables:
    GO_VERSION: "1.21"
stages:
  - name: test
    script:
      - go mod download
      - go test -v ./...
    artifacts:
      - path: coverage.out
        type: file
  - name: build
    script:
      - go build -o app ./cmd/app
    depends_on: [test]
  - name: deploy
    script:
      - echo "Deploying to staging"
    depends_on: [build]
EOF

# Execute the pipeline
./devops-platform pipeline execute --config test-pipeline.yaml
```
2. Test Monitoring
```bash
# Check the metrics endpoint
curl http://localhost:9090/metrics

# Query metrics
curl "http://localhost:9091/api/v1/query?query=pipeline_build_total"

# Test alerts
curl -X POST http://localhost:9093/api/v1/alerts \
  -H "Content-Type: application/json" \
  -d '[{
    "labels": {
      "alertname": "HighFailureRate",
      "severity": "critical"
    }
  }]'
```
3. Test Logging
```bash
# Generate test logs
curl -X POST http://localhost:8080/api/logs \
  -H "Content-Type: application/json" \
  -d '{
    "level": "info",
    "message": "Test log message",
    "service": "test-service",
    "component": "test-component"
  }'

# Query logs
curl "http://localhost:8080/api/logs?query=service:test-service&limit=10"
```
Extension Challenges
- Add multi-cloud support - Deploy to AWS, GCP, Azure
- Add GitOps support - Implement ArgoCD/Flux integration
- Add security scanning - Integrate vulnerability scanning
- Add cost monitoring - Track cloud spending
- Add compliance checking - Automated compliance validation
Key Takeaways
- CI/CD pipelines automate the build, test, and deployment process
- Monitoring provides visibility into system health and performance
- Logging enables debugging and auditing of system behavior
- Infrastructure as code ensures reproducible environments
- Automation reduces manual errors and improves consistency
- Quality gates maintain high standards across deployments (see the sketch after this list)
- Observability is crucial for operating complex systems
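As a concrete example of a quality gate, here is a small sketch that fails a pipeline stage when total test coverage drops below a threshold. The 80% figure and the `coverage.out` path are assumptions; the gate relies on the `total:` line of `go tool cover -func` output:

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"strconv"
	"strings"
)

func main() {
	const threshold = 80.0 // assumed minimum coverage, in percent

	out, err := exec.Command("go", "tool", "cover", "-func=coverage.out").Output()
	if err != nil {
		fmt.Fprintf(os.Stderr, "coverage report failed: %v\n", err)
		os.Exit(1)
	}

	for _, line := range strings.Split(string(out), "\n") {
		if !strings.HasPrefix(line, "total:") {
			continue
		}
		// The line looks like: "total:  (statements)  83.4%"
		fields := strings.Fields(line)
		pct, err := strconv.ParseFloat(strings.TrimSuffix(fields[len(fields)-1], "%"), 64)
		if err != nil {
			fmt.Fprintf(os.Stderr, "could not parse coverage: %v\n", err)
			os.Exit(1)
		}
		if pct < threshold {
			fmt.Fprintf(os.Stderr, "coverage %.1f%% is below the %.1f%% gate\n", pct, threshold)
			os.Exit(1)
		}
		fmt.Printf("coverage gate passed: %.1f%%\n", pct)
		return
	}

	fmt.Fprintln(os.Stderr, "no total coverage line found")
	os.Exit(1)
}
```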
This exercise demonstrates how to build a comprehensive DevOps toolchain using Go, covering the complete lifecycle of modern software development and operations.