Building Kubernetes Operators

Why This Matters - The Automation Revolution

Consider a senior DevOps engineer who never sleeps, never makes mistakes, and automatically handles all operational tasks for your applications 24/7. That's essentially what a Kubernetes operator is - automated operational expertise encoded as software that runs perpetually in your cluster.

Real-world scenario: A large e-commerce platform runs dozens of PostgreSQL database clusters. Each cluster needs:

  • Automatic failover when the primary node fails during peak shopping season
  • Daily backups with verification before major product launches
  • Zero-downtime upgrades during high-traffic periods
  • Monitoring replication lag and automatic corrective actions

Without operators, this requires a team of DBAs working around the clock. With operators, all this knowledge becomes reusable, automated code that handles scenarios humans might miss during critical moments.

Learning Objectives

By the end of this article, you will be able to:

  • Design and implement custom resources and controllers using controller-runtime
  • Use kubebuilder to scaffold production-ready operators with proper code generation
  • Implement reconciliation patterns that maintain desired state effectively
  • Apply RBAC best practices for secure operator deployment
  • Write comprehensive tests using envtest for operator validation
  • Deploy operators with proper observability and production patterns

Core Concepts - Understanding the Operator Pattern

The Control Loop Philosophy

At its heart, Kubernetes operates on a simple but powerful concept: declarative configuration and reconciliation loops. Think of it like a thermostat:

// The Kubernetes way: declare what you want
type DesiredState struct {
    Temperature int    `json:"temperature"`
    Mode        string `json:"mode"` // "heat", "cool", "auto"
}

// Kubernetes continuously works to make actual == desired
func (t *Thermostat) Reconcile(actual ActualState) error {
    if actual.Temperature < t.Desired.Temperature {
        t.turnOnHeat()
    } else if actual.Temperature > t.Desired.Temperature {
        t.turnOnCool()
    }
    return nil
}

The thermostat doesn't just "turn on heat once" - it continuously monitors and adjusts. Kubernetes operators apply this same principle to complex applications.

Operator Components: The Building Blocks

Custom Resources - The "what":

# Example: What we want our database to look like
apiVersion: database.mycompany.com/v1alpha1
kind: PostgreSQL
metadata:
  name: prod-db-cluster
spec:
  replicas: 3
  version: "15.4"
  backupSchedule: "0 2 * * *"  # Daily at 2 AM
  resources:
    requests:
      memory: "2Gi"
      cpu: "1000m"

Custom Resource Definitions - The "contract":

// Database operator CRD definition (spec fields with validation markers)
// +kubebuilder:validation:Required
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=10
Replicas int32 `json:"replicas"`

// +kubebuilder:validation:Required
Version string `json:"version"`

// +kubebuilder:validation:Pattern=^[0-9]+\s[0-9]+\s\*\s\*\s\*$
BackupSchedule string `json:"backupSchedule"`

Controller - The "how":

// The controller watches CRs and makes them reality
func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // 1. Fetch the PostgreSQL resource
    // 2. Check what currently exists in the cluster
    // 3. Compare desired vs actual state
    // 4. Take action to reconcile differences
    // 5. Update status with current state
    return ctrl.Result{}, nil
}

💡 Key Insight: Operators transform operational knowledge from tribal wisdom into executable code that anyone can use. This means your best DevOps practices are automatically applied, every time, without human intervention.

The Operator Maturity Journey

Not all operators are created equal. The Operator Framework defines a maturity model:

  1. Basic Install: Automated installation and configuration
  2. Seamless Upgrades: Automated upgrade process
  3. Full Lifecycle: Backup, restore, and failure recovery
  4. Deep Insights: Metrics, alerts, and log processing
  5. Auto Pilot: Horizontal/vertical scaling, auto-tuning, abnormality detection

Deep Dive - Kubernetes API Machinery

Understanding the Client-Go Library

Before building operators, you need to understand how Go applications interact with Kubernetes. The client-go library is the foundation:

 1// pkg/kubernetes/client.go - Understanding Kubernetes clients
 2
 3package kubernetes
 4
 5import (
 6    "context"
 7    "fmt"
 8
 9    corev1 "k8s.io/api/core/v1"
10    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11    "k8s.io/client-go/kubernetes"
12    "k8s.io/client-go/rest"
13    "k8s.io/client-go/tools/clientcmd"
14)
15
16// ClientManager provides access to Kubernetes API
17type ClientManager struct {
18    clientset *kubernetes.Clientset
19    config    *rest.Config
20}
21
22// NewClientManager creates a new Kubernetes client manager
23func NewClientManager(kubeconfig string) (*ClientManager, error) {
24    var config *rest.Config
25    var err error
26
27    if kubeconfig != "" {
28        // Out-of-cluster configuration (development)
29        config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
30    } else {
31        // In-cluster configuration (production)
32        config, err = rest.InClusterConfig()
33    }
34
35    if err != nil {
36        return nil, fmt.Errorf("failed to get config: %w", err)
37    }
38
39    // Create the clientset
40    clientset, err := kubernetes.NewForConfig(config)
41    if err != nil {
42        return nil, fmt.Errorf("failed to create clientset: %w", err)
43    }
44
45    return &ClientManager{
46        clientset: clientset,
47        config:    config,
48    }, nil
49}
50
51// ListPods demonstrates basic Kubernetes API operations
52func (cm *ClientManager) ListPods(ctx context.Context, namespace string) (*corev1.PodList, error) {
53    pods, err := cm.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
54    if err != nil {
55        return nil, fmt.Errorf("failed to list pods: %w", err)
56    }
57
58    return pods, nil
59}
60
61// WatchPods demonstrates watching Kubernetes resources
62func (cm *ClientManager) WatchPods(ctx context.Context, namespace string) error {
63    watcher, err := cm.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{})
64    if err != nil {
65        return fmt.Errorf("failed to create watcher: %w", err)
66    }
67    defer watcher.Stop()
68
69    fmt.Println("Watching for pod changes...")
70    for {
71        select {
72        case event, ok := <-watcher.ResultChan():
73            if !ok {
74                return fmt.Errorf("watch channel closed")
75            }
76
77            pod, ok := event.Object.(*corev1.Pod)
78            if !ok {
79                continue
80            }
81
82            fmt.Printf("Event: %s - Pod: %s/%s - Phase: %s\n",
83                event.Type, pod.Namespace, pod.Name, pod.Status.Phase)
84
85        case <-ctx.Done():
86            return ctx.Err()
87        }
88    }
89}

Key concepts from client-go:

  1. Clientset: Type-safe clients for all Kubernetes resources
  2. RESTClient: Low-level HTTP client for custom operations (see the sketch after this list)
  3. Informers: Efficient caching and watching mechanisms
  4. Workqueues: Rate-limited work distribution for controllers
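
To make the RESTClient item concrete: when no typed client covers an endpoint, the underlying REST client can issue raw requests. A minimal sketch using the ClientManager above; the /healthz path is just an illustrative target:

// RawHealthCheck hits a non-typed API path through the low-level REST client.
func (cm *ClientManager) RawHealthCheck(ctx context.Context) error {
    raw, err := cm.clientset.CoreV1().RESTClient().
        Get().
        AbsPath("/healthz").
        DoRaw(ctx)
    if err != nil {
        return fmt.Errorf("healthz check failed: %w", err)
    }

    fmt.Printf("API server health: %s\n", string(raw))
    return nil
}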

Informers and Caching

Operators need efficient ways to watch resources without overwhelming the API server:

 1// pkg/kubernetes/informer.go - Efficient resource watching
 2
 3package kubernetes
 4
 5import (
 6    "fmt"
 7    "time"
 8
 9    corev1 "k8s.io/api/core/v1"
10    "k8s.io/apimachinery/pkg/labels"
11    "k8s.io/client-go/informers"
12    "k8s.io/client-go/kubernetes"
13    "k8s.io/client-go/tools/cache"
14)
15
16// PodInformerManager demonstrates efficient pod watching
17type PodInformerManager struct {
18    clientset      *kubernetes.Clientset
19    informerFactory informers.SharedInformerFactory
20}
21
22// NewPodInformerManager creates a manager with informers
23func NewPodInformerManager(clientset *kubernetes.Clientset) *PodInformerManager {
24    // Create informer factory with 30 second resync period
25    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
26
27    return &PodInformerManager{
28        clientset:      clientset,
29        informerFactory: factory,
30    }
31}
32
33// SetupPodInformer configures pod watching with event handlers
34func (pim *PodInformerManager) SetupPodInformer() {
35    // Get pod informer from factory
36    podInformer := pim.informerFactory.Core().V1().Pods().Informer()
37
38    // Add event handlers
39    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
40        AddFunc: func(obj interface{}) {
41            pod := obj.(*corev1.Pod)
42            fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
43        },
44        UpdateFunc: func(oldObj, newObj interface{}) {
45            oldPod := oldObj.(*corev1.Pod)
46            newPod := newObj.(*corev1.Pod)
47
48            // Only log if phase changed
49            if oldPod.Status.Phase != newPod.Status.Phase {
50                fmt.Printf("Pod updated: %s/%s - Phase: %s -> %s\n",
51                    newPod.Namespace, newPod.Name,
52                    oldPod.Status.Phase, newPod.Status.Phase)
53            }
54        },
55        DeleteFunc: func(obj interface{}) {
56            pod := obj.(*corev1.Pod)
57            fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
58        },
59    })
60}
61
62// Start begins the informer and blocks until stopped
63func (pim *PodInformerManager) Start(stopCh <-chan struct{}) {
64    // Start all informers in the factory
65    pim.informerFactory.Start(stopCh)
66
67    // Wait for initial cache sync
68    pim.informerFactory.WaitForCacheSync(stopCh)
69
70    fmt.Println("Informers started and synced")
71}
72
73// ListPodsFromCache demonstrates using the informer's cache
74func (pim *PodInformerManager) ListPodsFromCache(namespace string) ([]*corev1.Pod, error) {
75    // Get lister from informer
76    lister := pim.informerFactory.Core().V1().Pods().Lister()
77
78    // List from cache instead of API server
79    pods, err := lister.Pods(namespace).List(labels.Everything())
80    if err != nil {
81        return nil, fmt.Errorf("failed to list from cache: %w", err)
82    }
83
84    return pods, nil
85}

Why informers are crucial for operators:

  • Efficient watching: Single watch connection shared across controllers
  • Local cache: Reduces API server load dramatically
  • Event batching: Handles bursts of updates efficiently
  • Resync mechanism: Ensures eventual consistency

Workqueues for Reliable Processing

Controllers need reliable, rate-limited work processing:

  1// pkg/kubernetes/workqueue.go - Reliable work processing
  2
  3package kubernetes
  4
  5import (
  6    "fmt"
  7    "time"
  8
  9    "k8s.io/apimachinery/pkg/util/runtime"
 10    "k8s.io/apimachinery/pkg/util/wait"
 11    "k8s.io/client-go/tools/cache"
 12    "k8s.io/client-go/util/workqueue"
 13)
 14
 15// Controller demonstrates workqueue pattern
 16type Controller struct {
 17    queue    workqueue.RateLimitingInterface
 18    informer cache.SharedIndexInformer
 19}
 20
 21// NewController creates a controller with workqueue
 22func NewController(informer cache.SharedIndexInformer) *Controller {
 23    // Create rate-limited workqueue
 24    queue := workqueue.NewRateLimitingQueue(
 25        workqueue.NewItemExponentialFailureRateLimiter(
 26            100*time.Millisecond, // Base delay
 27            10*time.Second,       // Max delay
 28        ),
 29    )
 30
 31    controller := &Controller{
 32        queue:    queue,
 33        informer: informer,
 34    }
 35
 36    // Add event handlers that enqueue keys
 37    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 38        AddFunc: func(obj interface{}) {
 39            key, err := cache.MetaNamespaceKeyFunc(obj)
 40            if err == nil {
 41                queue.Add(key)
 42            }
 43        },
 44        UpdateFunc: func(oldObj, newObj interface{}) {
 45            key, err := cache.MetaNamespaceKeyFunc(newObj)
 46            if err == nil {
 47                queue.Add(key)
 48            }
 49        },
 50        DeleteFunc: func(obj interface{}) {
 51            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 52            if err == nil {
 53                queue.Add(key)
 54            }
 55        },
 56    })
 57
 58    return controller
 59}
 60
 61// Run starts the controller workers
 62func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
 63    defer runtime.HandleCrash()
 64    defer c.queue.ShutDown()
 65
 66    fmt.Println("Starting controller")
 67
 68    // Start informer
 69    go c.informer.Run(stopCh)
 70
 71    // Wait for cache sync
 72    if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
 73        fmt.Println("Failed to sync informer cache")
 74        return
 75    }
 76
 77    fmt.Println("Controller synced and ready")
 78
 79    // Start workers
 80    for i := 0; i < workers; i++ {
 81        go wait.Until(c.runWorker, time.Second, stopCh)
 82    }
 83
 84    <-stopCh
 85    fmt.Println("Stopping controller")
 86}
 87
 88// runWorker processes items from the queue
 89func (c *Controller) runWorker() {
 90    for c.processNextItem() {
 91    }
 92}
 93
 94// processNextItem processes a single queued item
 95func (c *Controller) processNextItem() bool {
 96    // Get next item (blocks until available)
 97    key, shutdown := c.queue.Get()
 98    if shutdown {
 99        return false
100    }
101    defer c.queue.Done(key)
102
103    // Process the item
104    err := c.processItem(key.(string))
105
106    // Handle result
107    if err == nil {
108        // Success - forget any rate limiting
109        c.queue.Forget(key)
110    } else {
111        // Failure - requeue with rate limiting
112        if c.queue.NumRequeues(key) < 5 {
113            fmt.Printf("Error processing %s (will retry): %v\n", key, err)
114            c.queue.AddRateLimited(key)
115        } else {
116            fmt.Printf("Dropping %s after 5 retries: %v\n", key, err)
117            c.queue.Forget(key)
118            runtime.HandleError(err)
119        }
120    }
121
122    return true
123}
124
125// processItem handles a single item - implement your logic here
126func (c *Controller) processItem(key string) error {
127    // Get object from informer cache
128    obj, exists, err := c.informer.GetIndexer().GetByKey(key)
129    if err != nil {
130        return fmt.Errorf("fetching object with key %s failed: %w", key, err)
131    }
132
133    if !exists {
134        fmt.Printf("Object %s does not exist anymore\n", key)
135        return nil
136    }
137
138    // Process the object
139    fmt.Printf("Processing %s: %v\n", key, obj)
140
141    // Simulate some work
142    time.Sleep(100 * time.Millisecond)
143
144    return nil
145}

Workqueue benefits for operators:

  • Rate limiting: Prevents overwhelming the API server during failures
  • Deduplication: Multiple events for same object get collapsed
  • Retry logic: Automatic exponential backoff for failures
  • Ordered processing: FIFO guarantee within rate limits

Advanced Operator Patterns

Leader Election for High Availability

Production operators must handle multiple replicas with leader election:

  1// pkg/operator/leader.go - Leader election implementation
  2
  3package operator
  4
  5import (
  6    "context"
  7    "fmt"
  8    "os"
  9    "time"
 10
 11    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 12    "k8s.io/client-go/kubernetes"
 13    "k8s.io/client-go/tools/leaderelection"
 14    "k8s.io/client-go/tools/leaderelection/resourcelock"
 15)
 16
 17// LeaderElectionConfig configures leader election
 18type LeaderElectionConfig struct {
 19    Identity          string
 20    Namespace         string
 21    LockName          string
 22    LeaseDuration     time.Duration
 23    RenewDeadline     time.Duration
 24    RetryPeriod       time.Duration
 25}
 26
 27// RunWithLeaderElection runs the operator with leader election
 28func RunWithLeaderElection(
 29    ctx context.Context,
 30    clientset *kubernetes.Clientset,
 31    config LeaderElectionConfig,
 32    runFunc func(ctx context.Context),
 33) error {
 34    // Create resource lock for leader election
 35    lock := &resourcelock.LeaseLock{
 36        LeaseMeta: metav1.ObjectMeta{
 37            Name:      config.LockName,
 38            Namespace: config.Namespace,
 39        },
 40        Client: clientset.CoordinationV1(),
 41        LockConfig: resourcelock.ResourceLockConfig{
 42            Identity: config.Identity,
 43        },
 44    }
 45
 46    // Configure leader election
 47    leaderElectionConfig := leaderelection.LeaderElectionConfig{
 48        Lock:            lock,
 49        ReleaseOnCancel: true,
 50        LeaseDuration:   config.LeaseDuration,
 51        RenewDeadline:   config.RenewDeadline,
 52        RetryPeriod:     config.RetryPeriod,
 53        Callbacks: leaderelection.LeaderCallbacks{
 54            OnStartedLeading: func(ctx context.Context) {
 55                fmt.Printf("Started leading with identity: %s\n", config.Identity)
 56                runFunc(ctx)
 57            },
 58            OnStoppedLeading: func() {
 59                fmt.Printf("Stopped leading with identity: %s\n", config.Identity)
 60                os.Exit(0)
 61            },
 62            OnNewLeader: func(identity string) {
 63                if identity == config.Identity {
 64                    fmt.Println("I am the new leader")
 65                } else {
 66                    fmt.Printf("New leader elected: %s\n", identity)
 67                }
 68            },
 69        },
 70    }
 71
 72    // Start leader election
 73    leaderelection.RunOrDie(ctx, leaderElectionConfig)
 74
 75    return nil
 76}
 77
 78// Example usage of leader election
 79func ExampleLeaderElection() {
 80    ctx := context.Background()
 81
 82    // Get clientset (assumed to be created elsewhere)
 83    var clientset *kubernetes.Clientset
 84
 85    // Configure leader election
 86    config := LeaderElectionConfig{
 87        Identity:      os.Getenv("HOSTNAME"), // Pod name
 88        Namespace:     "default",
 89        LockName:      "my-operator-lock",
 90        LeaseDuration: 15 * time.Second,
 91        RenewDeadline: 10 * time.Second,
 92        RetryPeriod:   2 * time.Second,
 93    }
 94
 95    // Run operator with leader election
 96    RunWithLeaderElection(ctx, clientset, config, func(ctx context.Context) {
 97        fmt.Println("Running as leader...")
 98        // Start your operator controllers here
 99    })
100}

Finalizers for Resource Cleanup

Finalizers ensure proper cleanup when resources are deleted:

 1// pkg/operator/finalizer.go - Finalizer handling
 2
 3package operator
 4
 5import (
 6    "context"
 7    "fmt"
 8
 9    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10    "sigs.k8s.io/controller-runtime/pkg/client"
11    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
12)
13
14const (
15    // FinalizerName is our custom finalizer
16    FinalizerName = "database.mycompany.com/finalizer"
17)
18
19// ResourceWithFinalizer demonstrates finalizer usage
20type ResourceWithFinalizer struct {
21    client.Client
22}
23
24// HandleFinalizer manages finalizer logic for a resource
25func (r *ResourceWithFinalizer) HandleFinalizer(
26    ctx context.Context,
27    obj client.Object,
28    cleanup func(context.Context, client.Object) error,
29) error {
30    // Check if object is being deleted
31    if obj.GetDeletionTimestamp() != nil {
32        // Object is being deleted
33        if controllerutil.ContainsFinalizer(obj, FinalizerName) {
34            // Our finalizer is present, perform cleanup
35            fmt.Printf("Running cleanup for %s/%s\n", obj.GetNamespace(), obj.GetName())
36
37            if err := cleanup(ctx, obj); err != nil {
38                return fmt.Errorf("cleanup failed: %w", err)
39            }
40
41            // Remove our finalizer
42            controllerutil.RemoveFinalizer(obj, FinalizerName)
43            if err := r.Update(ctx, obj); err != nil {
44                return fmt.Errorf("failed to remove finalizer: %w", err)
45            }
46
47            fmt.Printf("Finalizer removed for %s/%s\n", obj.GetNamespace(), obj.GetName())
48        }
49
50        // Object will be deleted by Kubernetes
51        return nil
52    }
53
54    // Object is not being deleted, ensure finalizer exists
55    if !controllerutil.ContainsFinalizer(obj, FinalizerName) {
56        controllerutil.AddFinalizer(obj, FinalizerName)
57        if err := r.Update(ctx, obj); err != nil {
58            return fmt.Errorf("failed to add finalizer: %w", err)
59        }
60        fmt.Printf("Finalizer added to %s/%s\n", obj.GetNamespace(), obj.GetName())
61    }
62
63    return nil
64}
65
66// Example cleanup function for database operator
67func cleanupDatabase(ctx context.Context, obj client.Object) error {
68    fmt.Printf("Cleaning up database resources for %s/%s\n", obj.GetNamespace(), obj.GetName())
69
70    // Example cleanup tasks:
71    // 1. Take final backup
72    // 2. Remove external resources (S3 buckets, etc.)
73    // 3. Deregister from external services
74    // 4. Clean up associated secrets
75
76    // Simulate cleanup work
77    fmt.Println("- Taking final backup...")
78    fmt.Println("- Removing S3 backup bucket...")
79    fmt.Println("- Cleaning up secrets...")
80
81    return nil
82}
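
A sketch of how a reconciler might call this helper from its Reconcile loop; the wiring below (constructing the handler from the reconciler's client and returning early on deletion) is an assumption about how you compose it, not generated code:

// Inside Reconcile, after fetching the PostgreSQL object:
handler := &ResourceWithFinalizer{Client: r.Client}
if err := handler.HandleFinalizer(ctx, postgres, cleanupDatabase); err != nil {
    return ctrl.Result{}, err
}

// If the object is being deleted, cleanup has already run and the finalizer
// was removed - stop here and let Kubernetes finish the deletion.
if postgres.GetDeletionTimestamp() != nil {
    return ctrl.Result{}, nil
}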

Webhooks for Validation and Mutation

Admission webhooks provide advanced validation and defaulting:

  1// api/v1alpha1/postgresql_webhook.go - Admission webhooks
  2
  3package v1alpha1
  4
  5import (
  6    "fmt"
  7    "k8s.io/apimachinery/pkg/runtime"
  8    ctrl "sigs.k8s.io/controller-runtime"
  9    logf "sigs.k8s.io/controller-runtime/pkg/log"
 10    "sigs.k8s.io/controller-runtime/pkg/webhook"
 11)
 12
 13var postgresqllog = logf.Log.WithName("postgresql-resource")
 14
 15// SetupWebhookWithManager registers webhooks with the manager
 16func (r *PostgreSQL) SetupWebhookWithManager(mgr ctrl.Manager) error {
 17    return ctrl.NewWebhookManagedBy(mgr).
 18        For(r).
 19        Complete()
 20}
 21
 22//+kubebuilder:webhook:path=/mutate-database-mycompany-com-v1alpha1-postgresql,mutating=true,failurePolicy=fail,sideEffects=None,groups=database.mycompany.com,resources=postgresqls,verbs=create;update,versions=v1alpha1,name=mpostgresql.kb.io,admissionReviewVersions=v1
 23
 24var _ webhook.Defaulter = &PostgreSQL{}
 25
 26// Default implements webhook.Defaulter - sets default values
 27func (r *PostgreSQL) Default() {
 28    postgresqllog.Info("default", "name", r.Name)
 29
 30    // Set default values if not provided
 31    if r.Spec.Version == "" {
 32        r.Spec.Version = "15.4"
 33        postgresqllog.Info("defaulting version to 15.4")
 34    }
 35
 36    if r.Spec.Replicas == 0 {
 37        r.Spec.Replicas = 3
 38        postgresqllog.Info("defaulting replicas to 3")
 39    }
 40
 41    if r.Spec.StorageClassName == "" {
 42        r.Spec.StorageClassName = "standard"
 43        postgresqllog.Info("defaulting storage class to standard")
 44    }
 45
 46    if r.Spec.BackupSchedule == "" {
 47        r.Spec.BackupSchedule = "0 2 * * *" // Daily at 2 AM
 48        postgresqllog.Info("defaulting backup schedule to daily at 2 AM")
 49    }
 50}
 51
 52//+kubebuilder:webhook:path=/validate-database-mycompany-com-v1alpha1-postgresql,mutating=false,failurePolicy=fail,sideEffects=None,groups=database.mycompany.com,resources=postgresqls,verbs=create;update,versions=v1alpha1,name=vpostgresql.kb.io,admissionReviewVersions=v1
 53
 54var _ webhook.Validator = &PostgreSQL{}
 55
 56// ValidateCreate implements webhook.Validator - validates creation
 57func (r *PostgreSQL) ValidateCreate() error {
 58    postgresqllog.Info("validate create", "name", r.Name)
 59
 60    return r.validatePostgreSQL()
 61}
 62
 63// ValidateUpdate implements webhook.Validator - validates updates
 64func (r *PostgreSQL) ValidateUpdate(old runtime.Object) error {
 65    postgresqllog.Info("validate update", "name", r.Name)
 66
 67    oldPostgres, ok := old.(*PostgreSQL)
 68    if !ok {
 69        return fmt.Errorf("expected PostgreSQL object")
 70    }
 71
 72    // Prevent version downgrades (note: this is a plain string comparison; use a real semantic-version comparison in production)
 73    if r.Spec.Version < oldPostgres.Spec.Version {
 74        return fmt.Errorf("version downgrades are not allowed: %s -> %s",
 75            oldPostgres.Spec.Version, r.Spec.Version)
 76    }
 77
 78    // Prevent reducing replicas below 1
 79    if r.Spec.Replicas < 1 {
 80        return fmt.Errorf("replicas cannot be less than 1")
 81    }
 82
 83    return r.validatePostgreSQL()
 84}
 85
 86// ValidateDelete implements webhook.Validator - validates deletion
 87func (r *PostgreSQL) ValidateDelete() error {
 88    postgresqllog.Info("validate delete", "name", r.Name)
 89
 90    // Add protection for production databases
 91    if r.Labels["environment"] == "production" {
 92        if r.Annotations["allow-delete"] != "true" {
 93            return fmt.Errorf("production databases require 'allow-delete: true' annotation")
 94        }
 95    }
 96
 97    return nil
 98}
 99
100// validatePostgreSQL performs common validation
101func (r *PostgreSQL) validatePostgreSQL() error {
102    // Validate replica count
103    if r.Spec.Replicas < 1 || r.Spec.Replicas > 10 {
104        return fmt.Errorf("replicas must be between 1 and 10, got %d", r.Spec.Replicas)
105    }
106
107    // Validate version format
108    if r.Spec.Version == "" {
109        return fmt.Errorf("version is required")
110    }
111
112    // Validate backup schedule (cron format)
113    if !isValidCronSchedule(r.Spec.BackupSchedule) {
114        return fmt.Errorf("invalid backup schedule format: %s", r.Spec.BackupSchedule)
115    }
116
117    return nil
118}
119
120// isValidCronSchedule validates cron schedule format
121func isValidCronSchedule(schedule string) bool {
122    // Simple validation - in production use a proper cron parser
123    return schedule != ""
124}
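
The placeholder isValidCronSchedule above should be replaced with a real parser before production use; one option, assuming you are willing to add the robfig/cron dependency, is:

import "github.com/robfig/cron/v3"

// isValidCronSchedule validates a standard 5-field cron expression.
func isValidCronSchedule(schedule string) bool {
    _, err := cron.ParseStandard(schedule)
    return err == nil
}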

Practical Examples - Building Your First Operator

Step 1: Setting Up the Project

First, install kubebuilder and initialize your project:

# Install kubebuilder
OS=$(uname | tr '[:upper:]' '[:lower:]')
ARCH=$(uname -m | sed 's/x86_64/amd64/' | sed 's/aarch64/arm64/')
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/${OS}/${ARCH}"
chmod +x kubebuilder && sudo mv kubebuilder /usr/local/bin/

# Initialize a new project
kubebuilder init --domain mycompany.com --repo mycompany.com/database-operator

# Create an API
kubebuilder create api --group database --version v1alpha1 --kind PostgreSQL

This creates the complete project structure:

database-operator/
├── api/           # API definitions
├── controllers/   # Controller implementations
├── config/        # Kubernetes manifests
├── hack/          # Scripts and tools
├── main.go        # Operator entry point
└── Makefile       # Build and deploy targets

Step 2: Defining Your Custom Resource

Let's design a PostgreSQL operator that manages database clusters:

// api/v1alpha1/postgresql_types.go

package v1alpha1

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// PostgreSQLSpec defines the desired state of PostgreSQL
type PostgreSQLSpec struct {
    // +kubebuilder:validation:Required
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=10
    Replicas int32 `json:"replicas"`

    // +kubebuilder:validation:Required
    Version string `json:"version"`

    // +kubebuilder:validation:Pattern=^[0-9]+\s[0-9]+\s\*\s\*\s\*$
    BackupSchedule string `json:"backupSchedule"`

    // +kubebuilder:validation:Optional
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`

    // +kubebuilder:validation:Optional
    StorageClassName string `json:"storageClassName,omitempty"`
}

// PostgreSQLStatus defines the observed state of PostgreSQL
type PostgreSQLStatus struct {
    // Conditions represents the latest available observations of the database's state.
    Conditions []metav1.Condition `json:"conditions,omitempty"`

    // ReadyReplicas indicates the number of ready database replicas.
    ReadyReplicas int32 `json:"readyReplicas"`

    // PrimaryPod contains the name of the primary database pod.
    PrimaryPod string `json:"primaryPod,omitempty"`

    // LastBackupTime records when the last successful backup completed.
    LastBackupTime *metav1.Time `json:"lastBackupTime,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// PostgreSQL is the Schema for the postgresqls API
type PostgreSQL struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   PostgreSQLSpec   `json:"spec,omitempty"`
    Status PostgreSQLStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// PostgreSQLList contains a list of PostgreSQL
type PostgreSQLList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []PostgreSQL `json:"items"`
}

func init() {
    SchemeBuilder.Register(&PostgreSQL{}, &PostgreSQLList{})
}

Step 3: Implementing the Reconciliation Logic

The controller's reconciliation loop is where the magic happens:

// controllers/postgresql_controller.go

package controllers

import (
    "context"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/log"

    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
)

// PostgreSQLReconciler reconciles a PostgreSQL object
type PostgreSQLReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

// Reconcile implements the core reconciliation logic
func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // 1. Fetch the PostgreSQL instance
    postgres := &databasev1alpha1.PostgreSQL{}
    err := r.Get(ctx, req.NamespacedName, postgres)
    if err != nil {
        if errors.IsNotFound(err) {
            // Object was deleted, nothing to do
            return ctrl.Result{}, nil
        }
        logger.Error(err, "unable to fetch PostgreSQL")
        return ctrl.Result{}, err
    }

    // 2. Check if the StatefulSet already exists, if not create a new one
    found := &appsv1.StatefulSet{}
    err = r.Get(ctx, types.NamespacedName{Name: postgres.Name, Namespace: postgres.Namespace}, found)
    if err != nil && errors.IsNotFound(err) {
        // Define a new StatefulSet
        dep, err := r.statefulSetForPostgreSQL(postgres)
        if err != nil {
            return ctrl.Result{}, err
        }
        logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
        if err := r.Create(ctx, dep); err != nil {
            logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
            return ctrl.Result{}, err
        }
        // StatefulSet created successfully - return and requeue
        return ctrl.Result{Requeue: true}, nil
    } else if err != nil {
        logger.Error(err, "Failed to get StatefulSet")
        return ctrl.Result{}, err
    }

    // 3. Ensure the StatefulSet size is the same as the spec
    size := postgres.Spec.Replicas
    if *found.Spec.Replicas != size {
        found.Spec.Replicas = &size
        err = r.Update(ctx, found)
        if err != nil {
            logger.Error(err, "Failed to update StatefulSet", "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
            return ctrl.Result{}, err
        }
        // Spec updated - return and requeue
        return ctrl.Result{Requeue: true}, nil
    }

    // 4. List the pods that belong to this PostgreSQL cluster
    podList := &corev1.PodList{}
    listOpts := []client.ListOption{
        client.InNamespace(postgres.Namespace),
        client.MatchingLabels(labelsForPostgreSQL(postgres.Name)),
    }
    if err = r.List(ctx, podList, listOpts...); err != nil {
        logger.Error(err, "Failed to list pods", "PostgreSQL.Namespace", postgres.Namespace, "PostgreSQL.Name", postgres.Name)
        return ctrl.Result{}, err
    }

    podNames := getPodNames(podList.Items)

    // 5. Update the status if the observed replica count changed
    readyReplicas := int32(len(podNames))
    if postgres.Status.ReadyReplicas != readyReplicas {
        postgres.Status.ReadyReplicas = readyReplicas
        if err := r.Status().Update(ctx, postgres); err != nil {
            logger.Error(err, "Failed to update PostgreSQL status")
            return ctrl.Result{}, err
        }
    }

    return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
}

// statefulSetForPostgreSQL creates a StatefulSet for PostgreSQL
func (r *PostgreSQLReconciler) statefulSetForPostgreSQL(p *databasev1alpha1.PostgreSQL) (*appsv1.StatefulSet, error) {
    labels := labelsForPostgreSQL(p.Name)

    // Default resource requirements if not specified
    resources := p.Spec.Resources
    if resources.Requests == nil {
        resources = corev1.ResourceRequirements{
            Requests: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("500m"),
                corev1.ResourceMemory: resource.MustParse("1Gi"),
            },
        }
    }

    // Storage class for PVCs
    storageClass := p.Spec.StorageClassName
    if storageClass == "" {
        storageClass = "standard" // Default storage class
    }

    replicas := p.Spec.Replicas

    ss := &appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      p.Name,
            Namespace: p.Namespace,
        },
        Spec: appsv1.StatefulSetSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Image: fmt.Sprintf("postgres:%s", p.Spec.Version),
                        Name:  "postgresql",
                        Ports: []corev1.ContainerPort{{
                            ContainerPort: 5432,
                            Name:          "postgresql",
                        }},
                        Env: []corev1.EnvVar{
                            {
                                Name:  "POSTGRES_DB",
                                Value: "postgresdb",
                            },
                            {
                                Name:  "POSTGRES_USER",
                                Value: "postgresadmin",
                            },
                            {
                                // NOTE: hardcoded for brevity - source this from a Secret in real deployments
                                Name:  "POSTGRES_PASSWORD",
                                Value: "admin123",
                            },
                        },
                        Resources: resources,
                        VolumeMounts: []corev1.VolumeMount{
                            {
                                Name:      "postgres-storage",
                                MountPath: "/var/lib/postgresql/data",
                            },
                        },
                        ReadinessProbe: &corev1.Probe{
                            ProbeHandler: corev1.ProbeHandler{
                                Exec: &corev1.ExecAction{
                                    Command: []string{"pg_isready"},
                                },
                            },
                            InitialDelaySeconds: 5,
                            PeriodSeconds:       10,
                        },
                        LivenessProbe: &corev1.Probe{
                            ProbeHandler: corev1.ProbeHandler{
                                Exec: &corev1.ExecAction{
                                    Command: []string{"pg_isready"},
                                },
                            },
                            InitialDelaySeconds: 30,
                            PeriodSeconds:       30,
                        },
                    }},
                },
            },
            VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
                ObjectMeta: metav1.ObjectMeta{
                    Name: "postgres-storage",
                },
                Spec: corev1.PersistentVolumeClaimSpec{
                    AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
                    StorageClassName: &storageClass,
                    Resources: corev1.ResourceRequirements{
                        Requests: corev1.ResourceList{
                            corev1.ResourceStorage: resource.MustParse("10Gi"),
                        },
                    },
                },
            }},
            ServiceName: p.Name,
        },
    }

    // Set the owner reference so the StatefulSet is garbage collected with the CR
    if err := controllerutil.SetControllerReference(p, ss, r.Scheme); err != nil {
        return nil, err
    }

    return ss, nil
}

// labelsForPostgreSQL returns the labels for selecting the resources
func labelsForPostgreSQL(name string) map[string]string {
    return map[string]string{"app": "postgresql", "postgresql_cr": name}
}

// getPodNames returns the pod names of the array of pods passed in
func getPodNames(pods []corev1.Pod) []string {
    var podNames []string
    for _, pod := range pods {
        podNames = append(podNames, pod.Name)
    }
    return podNames
}

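Two pieces the scaffold expects alongside this controller are the RBAC markers (which make manifests turns into Role rules) and SetupWithManager, which registers the watches. A minimal sketch for the reconciler above; trim the verbs to the least privilege your operator actually needs:

//+kubebuilder:rbac:groups=database.mycompany.com,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=database.mycompany.com,resources=postgresqls/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch

// SetupWithManager registers the controller with the manager. Owns() makes
// changes to the StatefulSets we create trigger reconciliation of their owner.
func (r *PostgreSQLReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&databasev1alpha1.PostgreSQL{}).
        Owns(&appsv1.StatefulSet{}).
        Complete(r)
}
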
Common Patterns and Pitfalls

Pattern 1: Status Management

Always maintain accurate status information - it's critical for debugging:

// Update status conditions appropriately
func (r *PostgreSQLReconciler) updateStatus(ctx context.Context, postgres *databasev1alpha1.PostgreSQL, conditionType, status, reason, message string) error {
    // Find the existing condition, if any
    var condition *metav1.Condition
    for i := range postgres.Status.Conditions {
        if postgres.Status.Conditions[i].Type == conditionType {
            condition = &postgres.Status.Conditions[i]
            break
        }
    }

    if condition == nil {
        // New condition - append it, then point at the stored copy
        postgres.Status.Conditions = append(postgres.Status.Conditions, metav1.Condition{
            Type:   conditionType,
            Status: metav1.ConditionStatus(status),
        })
        condition = &postgres.Status.Conditions[len(postgres.Status.Conditions)-1]
    } else {
        // Update existing condition
        condition.Status = metav1.ConditionStatus(status)
    }

    condition.Reason = reason
    condition.Message = message
    condition.LastTransitionTime = metav1.Now()

    return r.Status().Update(ctx, postgres)
}
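
If you would rather not maintain this bookkeeping by hand, the condition helpers in k8s.io/apimachinery/pkg/api/meta do the same job; the condition type and reason below are illustrative:

// meta.SetStatusCondition adds or updates the condition in place and only
// bumps LastTransitionTime when the status actually changes.
meta.SetStatusCondition(&postgres.Status.Conditions, metav1.Condition{
    Type:    "Ready",
    Status:  metav1.ConditionTrue,
    Reason:  "AllReplicasReady",
    Message: "All database replicas are ready",
})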

Pattern 2: Event Recording

Record events for significant actions - they appear in kubectl describe:

// Record events for visibility
r.Recorder.Eventf(postgres, corev1.EventTypeNormal, "Created",
    "Created StatefulSet %s", statefulSet.Name)

r.Recorder.Eventf(postgres, corev1.EventTypeWarning, "FailedCreate",
    "Failed to create StatefulSet: %v", err)
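
The Recorder field is not part of the default scaffold, so it has to be added to the reconciler and wired up in main.go; a minimal sketch, assuming the PostgreSQLReconciler and the standard kubebuilder main.go from earlier:

// Add the recorder to the reconciler struct
type PostgreSQLReconciler struct {
    client.Client
    Scheme   *runtime.Scheme
    Recorder record.EventRecorder // k8s.io/client-go/tools/record
}

// In main.go, hand the reconciler a named recorder from the manager
if err = (&controllers.PostgreSQLReconciler{
    Client:   mgr.GetClient(),
    Scheme:   mgr.GetScheme(),
    Recorder: mgr.GetEventRecorderFor("postgresql-controller"),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "PostgreSQL")
    os.Exit(1)
}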

Pattern 3: Owner References

Ensure proper cleanup when the CR is deleted:

// Set owner reference so the StatefulSet is garbage collected with the CR
if err := controllerutil.SetControllerReference(postgres, statefulSet, r.Scheme); err != nil {
    return ctrl.Result{}, err
}

Common Pitfalls

1. Infinite Reconciliation Loops

// BAD: Always requeue without checking if changes are needed
return ctrl.Result{Requeue: true}, nil

// GOOD: Only requeue if there are pending changes
if needsReconciliation {
    return ctrl.Result{RequeueAfter: time.Minute}, nil
}
return ctrl.Result{}, nil

2. Missing Error Handling

// BAD: Ignoring errors from Get operations
r.Get(ctx, req.NamespacedName, postgres) // Ignored error

// GOOD: Always handle errors properly
err := r.Get(ctx, req.NamespacedName, postgres)
if err != nil {
    if errors.IsNotFound(err) {
        return ctrl.Result{}, nil  // Object deleted, nothing to do
    }
    return ctrl.Result{}, err  // Real error, requeue
}

3. Inefficient Reconciliation

// BAD: Making many API calls in reconciliation loop
pod1 := r.getPod(...)
pod2 := r.getPod(...)
pod3 := r.getPod(...)

// GOOD: Use List with selectors to fetch multiple resources
podList := &corev1.PodList{}
r.List(ctx, podList, client.InNamespace(ns), client.MatchingLabels(labels))

Integration and Mastery

Testing Your Operator

Unit Testing with Envtest:

// controllers/postgresql_controller_test.go

package controllers

import (
    "context"
    "testing"
    "time"

    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"

    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
)

var _ = Describe("PostgreSQL Controller", func() {
    Context("When creating PostgreSQL resource", func() {
        It("Should create StatefulSet with correct spec", func() {
            ctx := context.Background()

            // Create a PostgreSQL resource
            postgres := &databasev1alpha1.PostgreSQL{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      "test-postgres",
                    Namespace: "default",
                },
                Spec: databasev1alpha1.PostgreSQLSpec{
                    Replicas:       3,
                    Version:        "15.4",
                    BackupSchedule: "0 2 * * *",
                    Resources: corev1.ResourceRequirements{
                        Requests: corev1.ResourceList{
                            corev1.ResourceCPU:    resource.MustParse("1"),
                            corev1.ResourceMemory: resource.MustParse("2Gi"),
                        },
                    },
                },
            }

            Expect(k8sClient.Create(ctx, postgres)).To(Succeed())

            // Wait for StatefulSet to be created
            statefulSet := &appsv1.StatefulSet{}
            Eventually(func() error {
                return k8sClient.Get(ctx, types.NamespacedName{
                    Name:      postgres.Name,
                    Namespace: postgres.Namespace,
                }, statefulSet)
            }, time.Second*10, time.Millisecond*250).Should(Succeed())

            Expect(*statefulSet.Spec.Replicas).To(Equal(int32(3)))
            Expect(statefulSet.Spec.Template.Spec.Containers[0].Image).To(Equal("postgres:15.4"))
        })
    })
})

func TestPostgreSQL(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecs(t, "Controller Suite")
}

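The k8sClient used above comes from the envtest suite setup that kubebuilder scaffolds in suite_test.go. A trimmed sketch, assuming the CRD YAML lives under config/crd/bases (imports omitted); the manager is started in the background so the reconciler actually runs against the test API server:

var (
    testEnv   *envtest.Environment
    k8sClient client.Client
    ctx       context.Context
    cancel    context.CancelFunc
)

var _ = BeforeSuite(func() {
    ctx, cancel = context.WithCancel(context.Background())

    // Start a local control plane (etcd + kube-apiserver) with our CRDs installed
    testEnv = &envtest.Environment{
        CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
    }
    cfg, err := testEnv.Start()
    Expect(err).NotTo(HaveOccurred())

    // Register our API types and create a client against the test API server
    Expect(databasev1alpha1.AddToScheme(scheme.Scheme)).To(Succeed())
    k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
    Expect(err).NotTo(HaveOccurred())

    // Run the reconciler against the test cluster so the specs observe real behavior
    mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme.Scheme})
    Expect(err).NotTo(HaveOccurred())
    Expect((&PostgreSQLReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()}).SetupWithManager(mgr)).To(Succeed())
    go func() {
        defer GinkgoRecover()
        Expect(mgr.Start(ctx)).To(Succeed())
    }()
})

var _ = AfterSuite(func() {
    cancel()
    Expect(testEnv.Stop()).To(Succeed())
})
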
Production Deployment Patterns

1. Leader Election for High Availability:

// In main.go
func main() {
    // ...
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:             scheme,
        MetricsBindAddress: metricsAddr,
        Port:               9443,
        LeaderElection:     true, // Enable leader election
        LeaderElectionID:   "database-operator.mycompany.com",
    })
    // ...
}

2. Observability Integration:

// Add metrics to your controller
var (
    // Reconciliation metrics
    reconcileCounter = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "postgresql_reconcile_total",
            Help: "Total number of PostgreSQL reconciliations",
        },
        []string{"namespace", "name", "success"},
    )

    reconcileDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name: "postgresql_reconcile_duration_seconds",
            Help: "Duration of PostgreSQL reconciliations",
        },
        []string{"namespace", "name"},
    )
)

// In your Reconcile function
start := time.Now()
defer func() {
    duration := time.Since(start).Seconds()
    reconcileDuration.WithLabelValues(req.Namespace, req.Name).Observe(duration)
}()

3. Graceful Shutdown:

// Handle shutdown signals gracefully
ctx := ctrl.SetupSignalHandler()

if err := mgr.Start(ctx); err != nil {
    setupLog.Error(err, "problem running manager")
    os.Exit(1)
}

Production-Ready Operator Development

Comprehensive Monitoring and Metrics

Production operators need deep observability for debugging and performance tracking:

 1// pkg/metrics/metrics.go - Operator metrics implementation
 2
 3package metrics
 4
 5import (
 6    "github.com/prometheus/client_golang/prometheus"
 7    "sigs.k8s.io/controller-runtime/pkg/metrics"
 8)
 9
10var (
11    // ReconciliationTotal tracks total reconciliation attempts
12    ReconciliationTotal = prometheus.NewCounterVec(
13        prometheus.CounterOpts{
14            Name: "operator_reconciliation_total",
15            Help: "Total number of reconciliations per resource",
16        },
17        []string{"namespace", "name", "result"},
18    )
19
20    // ReconciliationDuration tracks reconciliation latency
21    ReconciliationDuration = prometheus.NewHistogramVec(
22        prometheus.HistogramOpts{
23            Name:    "operator_reconciliation_duration_seconds",
24            Help:    "Time spent in reconciliation loop",
25            Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0},
26        },
27        []string{"namespace", "name"},
28    )
29
30    // ResourcesManaged tracks currently managed resources
31    ResourcesManaged = prometheus.NewGaugeVec(
32        prometheus.GaugeOpts{
33            Name: "operator_resources_managed",
34            Help: "Number of resources currently managed",
35        },
36        []string{"namespace", "status"},
37    )
38
39    // APICallsTotal tracks Kubernetes API calls
40    APICallsTotal = prometheus.NewCounterVec(
41        prometheus.CounterOpts{
42            Name: "operator_api_calls_total",
43            Help: "Total number of Kubernetes API calls",
44        },
45        []string{"method", "resource", "result"},
46    )
47
48    // QueueDepth tracks workqueue depth
49    QueueDepth = prometheus.NewGaugeVec(
50        prometheus.GaugeOpts{
51            Name: "operator_queue_depth",
52            Help: "Current depth of the workqueue",
53        },
54        []string{"name"},
55    )
56)
57
58// init registers metrics with controller-runtime
59func init() {
60    metrics.Registry.MustRegister(
61        ReconciliationTotal,
62        ReconciliationDuration,
63        ResourcesManaged,
64        APICallsTotal,
65        QueueDepth,
66    )
67}
68
69// RecordReconciliation records a reconciliation attempt
70func RecordReconciliation(namespace, name, result string, duration float64) {
71    ReconciliationTotal.WithLabelValues(namespace, name, result).Inc()
72    ReconciliationDuration.WithLabelValues(namespace, name).Observe(duration)
73}
74
75// UpdateResourceCount updates the managed resource count
76func UpdateResourceCount(namespace, status string, count float64) {
77    ResourcesManaged.WithLabelValues(namespace, status).Set(count)
78}
79
80// RecordAPICall records a Kubernetes API call
81func RecordAPICall(method, resource, result string) {
82    APICallsTotal.WithLabelValues(method, resource, result).Inc()
83}
84
85// UpdateQueueDepth updates workqueue depth metric
86func UpdateQueueDepth(queueName string, depth int) {
87    QueueDepth.WithLabelValues(queueName).Set(float64(depth))
88}

Using metrics in your reconciler:

// controllers/postgresql_controller.go - With metrics

package controllers

import (
    "context"
    "time"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/log"

    "mycompany.com/database-operator/pkg/metrics"
)

func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
    startTime := time.Now()
    logger := log.FromContext(ctx)

    // Track reconciliation (reads the named return value err)
    defer func() {
        duration := time.Since(startTime).Seconds()
        outcome := "success"
        if err != nil {
            outcome = "error"
        }
        metrics.RecordReconciliation(req.Namespace, req.Name, outcome, duration)
    }()

    // Your reconciliation logic here
    logger.Info("Reconciling PostgreSQL", "namespace", req.Namespace, "name", req.Name)

    // Track API calls
    metrics.RecordAPICall("GET", "PostgreSQL", "success")

    // Update resource counts
    metrics.UpdateResourceCount(req.Namespace, "ready", 1)

    return ctrl.Result{}, nil
}

Advanced Error Handling and Recovery

Robust operators need sophisticated error handling:

 1// pkg/errors/errors.go - Operator error handling
 2
 3package errors
 4
 5import (
 6    "fmt"
 7    "time"
 8
 9    ctrl "sigs.k8s.io/controller-runtime"
10)
11
12// OperatorError represents an error with retry behavior
13type OperatorError struct {
14    Err        error
15    Retriable  bool
16    RetryAfter time.Duration
17    Reason     string
18}
19
20// Error implements the error interface
21func (e *OperatorError) Error() string {
22    return fmt.Sprintf("%s: %v (retriable: %v)", e.Reason, e.Err, e.Retriable)
23}
24
25// NewRetriableError creates an error that should be retried
26func NewRetriableError(err error, retryAfter time.Duration, reason string) *OperatorError {
27    return &OperatorError{
28        Err:        err,
29        Retriable:  true,
30        RetryAfter: retryAfter,
31        Reason:     reason,
32    }
33}
34
35// NewPermanentError creates an error that should not be retried
36func NewPermanentError(err error, reason string) *OperatorError {
37    return &OperatorError{
38        Err:       err,
39        Retriable: false,
40        Reason:    reason,
41    }
42}
43
44// HandleReconcileError converts operator errors to reconcile results
45func HandleReconcileError(err error) (ctrl.Result, error) {
46    if err == nil {
47        return ctrl.Result{}, nil
48    }
49
50    // Check if it's an OperatorError
51    if opErr, ok := err.(*OperatorError); ok {
52        if opErr.Retriable {
53            // Retriable error - requeue after delay
54            return ctrl.Result{
55                Requeue:      true,
56                RequeueAfter: opErr.RetryAfter,
57            }, nil
58        }
59        // Permanent error - don't requeue
60        return ctrl.Result{}, nil
61    }
62
63    // Unknown error - requeue with exponential backoff
64    return ctrl.Result{}, err
65}
66
67// Example error scenarios
68var (
69    ErrResourceNotReady = NewRetriableError(
70        fmt.Errorf("resource not ready"),
71        30*time.Second,
72        "WaitingForDependencies",
73    )
74
75    ErrInvalidConfiguration = NewPermanentError(
76        fmt.Errorf("invalid configuration"),
77        "InvalidSpec",
78    )
79
80    ErrAPIServerUnavailable = NewRetriableError(
81        fmt.Errorf("API server unavailable"),
82        10*time.Second,
83        "APIServerError",
84    )
85)

Using error handling in reconciliation:

// apierrors is k8s.io/apimachinery/pkg/api/errors; operrors is our pkg/errors
// package from above (aliased to avoid the name collision).
import (
    apierrors "k8s.io/apimachinery/pkg/api/errors"

    operrors "mycompany.com/database-operator/pkg/errors"
)

func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // Fetch the PostgreSQL instance
    postgres := &databasev1alpha1.PostgreSQL{}
    if err := r.Get(ctx, req.NamespacedName, postgres); err != nil {
        if apierrors.IsNotFound(err) {
            return ctrl.Result{}, nil
        }
        return operrors.HandleReconcileError(
            operrors.NewRetriableError(err, 5*time.Second, "FailedToFetch"),
        )
    }

    // Validate configuration
    if err := r.validateConfiguration(postgres); err != nil {
        logger.Error(err, "Invalid configuration")
        return operrors.HandleReconcileError(
            operrors.NewPermanentError(err, "InvalidConfiguration"),
        )
    }

    // Check dependencies
    if err := r.checkDependencies(ctx, postgres); err != nil {
        logger.Info("Dependencies not ready, will retry")
        return operrors.HandleReconcileError(
            operrors.NewRetriableError(err, 30*time.Second, "DependenciesNotReady"),
        )
    }

    // Continue with reconciliation...
    return ctrl.Result{}, nil
}

Multi-Tenant Operator Design

Building operators that safely handle multiple tenants:

  1// pkg/multitenancy/isolation.go - Multi-tenant isolation
  2
  3package multitenancy
  4
  5import (
  6    "context"
  7    "fmt"
  8    apiresource "k8s.io/apimachinery/pkg/api/resource"
  9    corev1 "k8s.io/api/core/v1"
 10    rbacv1 "k8s.io/api/rbac/v1"
 11    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 12    "sigs.k8s.io/controller-runtime/pkg/client"
 13)
 14
 15// TenantManager handles multi-tenant resource isolation
 16type TenantManager struct {
 17    client.Client
 18}
 19
 20// EnsureTenantIsolation ensures proper RBAC and namespace isolation
 21func (tm *TenantManager) EnsureTenantIsolation(ctx context.Context, tenantName string) error {
 22    // Create dedicated namespace for tenant
 23    namespace := &corev1.Namespace{
 24        ObjectMeta: metav1.ObjectMeta{
 25            Name: fmt.Sprintf("tenant-%s", tenantName),
 26            Labels: map[string]string{
 27                "tenant":     tenantName,
 28                "managed-by": "database-operator",
 29            },
 30        },
 31    }
 32
 33    if err := tm.Create(ctx, namespace); err != nil {
 34        return fmt.Errorf("failed to create namespace: %w", err)
 35    }
 36
 37    // Create ServiceAccount for tenant
 38    sa := &corev1.ServiceAccount{
 39        ObjectMeta: metav1.ObjectMeta{
 40            Name:      fmt.Sprintf("%s-operator", tenantName),
 41            Namespace: namespace.Name,
 42        },
 43    }
 44
 45    if err := tm.Create(ctx, sa); err != nil {
 46        return fmt.Errorf("failed to create service account: %w", err)
 47    }
 48
 49    // Create Role for tenant-specific permissions
 50    role := &rbacv1.Role{
 51        ObjectMeta: metav1.ObjectMeta{
 52            Name:      fmt.Sprintf("%s-operator-role", tenantName),
 53            Namespace: namespace.Name,
 54        },
 55        Rules: []rbacv1.PolicyRule{
 56            {
 57                APIGroups: []string{"database.mycompany.com"},
 58                Resources: []string{"postgresqls"},
 59                Verbs:     []string{"get", "list", "watch", "create", "update", "patch", "delete"},
 60            },
 61            {
 62                APIGroups: []string{""},
 63                Resources: []string{"pods", "services", "configmaps", "secrets"},
 64                Verbs:     []string{"get", "list", "watch", "create", "update", "patch", "delete"},
 65            },
 66        },
 67    }
 68
 69    if err := tm.Create(ctx, role); err != nil {
 70        return fmt.Errorf("failed to create role: %w", err)
 71    }
 72
 73    // Create RoleBinding
 74    roleBinding := &rbacv1.RoleBinding{
 75        ObjectMeta: metav1.ObjectMeta{
 76            Name:      fmt.Sprintf("%s-operator-binding", tenantName),
 77            Namespace: namespace.Name,
 78        },
 79        RoleRef: rbacv1.RoleRef{
 80            APIGroup: "rbac.authorization.k8s.io",
 81            Kind:     "Role",
 82            Name:     role.Name,
 83        },
 84        Subjects: []rbacv1.Subject{
 85            {
 86                Kind:      "ServiceAccount",
 87                Name:      sa.Name,
 88                Namespace: namespace.Name,
 89            },
 90        },
 91    }
 92
 93    if err := tm.Create(ctx, roleBinding); err != nil {
 94        return fmt.Errorf("failed to create role binding: %w", err)
 95    }
 96
 97    return nil
 98}
 99
100// EnforceResourceQuotas sets resource limits for tenants
101func (tm *TenantManager) EnforceResourceQuotas(ctx context.Context, tenantName string, limits map[string]string) error {
102    namespace := fmt.Sprintf("tenant-%s", tenantName)
103
104    quota := &corev1.ResourceQuota{
105        ObjectMeta: metav1.ObjectMeta{
106            Name:      fmt.Sprintf("%s-quota", tenantName),
107            Namespace: namespace,
108        },
109        Spec: corev1.ResourceQuotaSpec{
110            Hard: corev1.ResourceList{},
111        },
112    }
113
114    // Apply tenant-specific limits
 115    for resourceName, limit := range limits {
 116        quota.Spec.Hard[corev1.ResourceName(resourceName)] = apiresource.MustParse(limit)
117    }
118
119    if err := tm.Create(ctx, quota); err != nil {
120        return fmt.Errorf("failed to create resource quota: %w", err)
121    }
122
123    return nil
124}

Disaster Recovery and Backup Integration

Operators should handle backup and recovery automatically:

  1// pkg/backup/manager.go - Backup and recovery management
  2
  3package backup
  4
  5import (
  6    "context"
  7    "fmt"
  8    "time"
  9
 10    batchv1 "k8s.io/api/batch/v1"
 11    corev1 "k8s.io/api/core/v1"
 12    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 13    "sigs.k8s.io/controller-runtime/pkg/client"
 14
 15    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
 16)
 17
 18// BackupManager handles database backups
 19type BackupManager struct {
 20    client.Client
 21}
 22
 23// CreateBackupJob creates a Kubernetes Job for database backup
 24func (bm *BackupManager) CreateBackupJob(
 25    ctx context.Context,
 26    postgres *databasev1alpha1.PostgreSQL,
 27) error {
 28    backupName := fmt.Sprintf("%s-backup-%d", postgres.Name, time.Now().Unix())
 29
 30    job := &batchv1.Job{
 31        ObjectMeta: metav1.ObjectMeta{
 32            Name:      backupName,
 33            Namespace: postgres.Namespace,
 34            Labels: map[string]string{
 35                "app":        "postgresql-backup",
 36                "database":   postgres.Name,
 37                "managed-by": "database-operator",
 38            },
 39        },
 40        Spec: batchv1.JobSpec{
 41            Template: corev1.PodTemplateSpec{
 42                Spec: corev1.PodSpec{
 43                    RestartPolicy: corev1.RestartPolicyNever,
 44                    Containers: []corev1.Container{
 45                        {
 46                            Name:  "backup",
 47                            Image: "postgres:15.4",
 48                            Command: []string{
 49                                "/bin/bash",
 50                                "-c",
 51                                fmt.Sprintf(`
 52                                    pg_dump -h %s-primary -U postgres -d postgresdb > /backup/dump.sql
 53                                    gzip /backup/dump.sql
 54                                    aws s3 cp /backup/dump.sql.gz s3://%s/%s/backup-$(date +%%Y%%m%%d-%%H%%M%%S).sql.gz
 55                                `, postgres.Name, postgres.Spec.BackupSpec.S3Bucket, postgres.Name),
 56                            },
 57                            Env: []corev1.EnvVar{
 58                                {
 59                                    Name: "PGPASSWORD",
 60                                    ValueFrom: &corev1.EnvVarSource{
 61                                        SecretKeyRef: &corev1.SecretKeySelector{
 62                                            LocalObjectReference: corev1.LocalObjectReference{
 63                                                Name: fmt.Sprintf("%s-credentials", postgres.Name),
 64                                            },
 65                                            Key: "password",
 66                                        },
 67                                    },
 68                                },
 69                                {
 70                                    Name:  "AWS_REGION",
 71                                    Value: postgres.Spec.BackupSpec.S3Region,
 72                                },
 73                            },
 74                            VolumeMounts: []corev1.VolumeMount{
 75                                {
 76                                    Name:      "backup-volume",
 77                                    MountPath: "/backup",
 78                                },
 79                            },
 80                        },
 81                    },
 82                    Volumes: []corev1.Volume{
 83                        {
 84                            Name: "backup-volume",
 85                            VolumeSource: corev1.VolumeSource{
 86                                EmptyDir: &corev1.EmptyDirVolumeSource{},
 87                            },
 88                        },
 89                    },
 90                },
 91            },
 92            BackoffLimit: int32Ptr(3),
 93        },
 94    }
 95
 96    if err := bm.Create(ctx, job); err != nil {
 97        return fmt.Errorf("failed to create backup job: %w", err)
 98    }
 99
100    // Update status with backup time
101    postgres.Status.LastBackupTime = &metav1.Time{Time: time.Now()}
102    if err := bm.Status().Update(ctx, postgres); err != nil {
103        return fmt.Errorf("failed to update status: %w", err)
104    }
105
106    return nil
107}
108
109// CreateBackupCronJob creates scheduled backups
110func (bm *BackupManager) CreateBackupCronJob(
111    ctx context.Context,
112    postgres *databasev1alpha1.PostgreSQL,
113) error {
114    cronJob := &batchv1.CronJob{
115        ObjectMeta: metav1.ObjectMeta{
116            Name:      fmt.Sprintf("%s-backup-cronjob", postgres.Name),
117            Namespace: postgres.Namespace,
118        },
119        Spec: batchv1.CronJobSpec{
120            Schedule: postgres.Spec.BackupSchedule,
121            JobTemplate: batchv1.JobTemplateSpec{
122                Spec: batchv1.JobSpec{
123                    Template: corev1.PodTemplateSpec{
124                        Spec: corev1.PodSpec{
125                            RestartPolicy: corev1.RestartPolicyNever,
126                            Containers: []corev1.Container{
127                                {
128                                    Name:  "backup",
129                                    Image: "postgres:15.4",
130                                    Command: []string{"/bin/bash", "-c", "pg_dump ..."},
131                                },
132                            },
133                        },
134                    },
135                },
136            },
137            SuccessfulJobsHistoryLimit: int32Ptr(3),
138            FailedJobsHistoryLimit:     int32Ptr(1),
139        },
140    }
141
142    if err := bm.Create(ctx, cronJob); err != nil {
143        return fmt.Errorf("failed to create backup cronjob: %w", err)
144    }
145
146    return nil
147}
148
149// RestoreFromBackup restores database from S3 backup
150func (bm *BackupManager) RestoreFromBackup(
151    ctx context.Context,
152    postgres *databasev1alpha1.PostgreSQL,
153    backupTimestamp string,
154) error {
155    restoreName := fmt.Sprintf("%s-restore-%d", postgres.Name, time.Now().Unix())
156
157    job := &batchv1.Job{
158        ObjectMeta: metav1.ObjectMeta{
159            Name:      restoreName,
160            Namespace: postgres.Namespace,
161        },
162        Spec: batchv1.JobSpec{
163            Template: corev1.PodTemplateSpec{
164                Spec: corev1.PodSpec{
165                    RestartPolicy: corev1.RestartPolicyNever,
166                    Containers: []corev1.Container{
167                        {
168                            Name:  "restore",
169                            Image: "postgres:15.4",
170                            Command: []string{
171                                "/bin/bash",
172                                "-c",
173                                fmt.Sprintf(`
174                                    aws s3 cp s3://%s/%s/backup-%s.sql.gz /tmp/backup.sql.gz
175                                    gunzip /tmp/backup.sql.gz
176                                    psql -h %s-primary -U postgres -d postgresdb < /tmp/backup.sql
177                                `, postgres.Spec.BackupSpec.S3Bucket, postgres.Name, backupTimestamp, postgres.Name),
178                            },
179                        },
180                    },
181                },
182            },
183        },
184    }
185
186    if err := bm.Create(ctx, job); err != nil {
187        return fmt.Errorf("failed to create restore job: %w", err)
188    }
189
190    return nil
191}
192
193func int32Ptr(i int32) *int32 {
194    return &i
195}
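
A sketch of how the main PostgreSQL reconciler might wire this in; the Backups field is an assumption for illustration and would be a *backup.BackupManager that shares the manager's client.

 1// Hypothetical wiring inside the PostgreSQL controller's Reconcile.
 2// r.Backups is assumed to be a *backup.BackupManager created at manager startup.
 3// Note: CreateBackupCronJob issues a plain Create, so a production reconciler
 4// would first check whether the CronJob already exists (or treat an
 5// AlreadyExists error as success) to stay idempotent across reconciles.
 6if postgres.Spec.BackupSchedule != "" {
 7    if err := r.Backups.CreateBackupCronJob(ctx, postgres); err != nil {
 8        return ctrl.Result{}, fmt.Errorf("reconciling backups: %w", err)
 9    }
10}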

Practice Exercises

Exercise 1: Build a ConfigMap Operator

Objective: Create an operator that watches ConfigMaps and automatically reloads the applications that consume them when the configuration changes.

Requirements:

  1. Define a ConfigMapWatcher custom resource
  2. Implement reconciliation logic that watches ConfigMap changes
  3. Trigger rolling restart of pods when configuration changes
  4. Add status tracking for reload operations
  5. Include metrics for configuration reloads

Implementation:

 1// api/v1alpha1/configmapwatcher_types.go
 2package v1alpha1
 3
 4import (
 5    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 6)
 7
 8// ConfigMapWatcherSpec defines the desired state
 9type ConfigMapWatcherSpec struct {
10    // Source ConfigMap to watch
11    // +kubebuilder:validation:Required
12    SourceConfigMap string `json:"sourceConfigMap"`
13
14    // Target deployments to reload
15    // +kubebuilder:validation:Required
16    // +kubebuilder:validation:MinItems=1
17    TargetDeployments []string `json:"targetDeployments"`
18
19    // Reload strategy
20    // +kubebuilder:validation:Enum=rolling;recreate
21    // +kubebuilder:default=rolling
22    ReloadStrategy string `json:"reloadStrategy,omitempty"`
23
24    // Check interval in seconds
25    // +kubebuilder:validation:Minimum=10
26    // +kubebuilder:default=60
27    CheckInterval int32 `json:"checkInterval,omitempty"`
28}
29
30// ConfigMapWatcherStatus defines the observed state
31type ConfigMapWatcherStatus struct {
32    // Last observed ConfigMap version
33    LastConfigVersion string `json:"lastConfigVersion,omitempty"`
34
35    // Last reload timestamp
36    LastReloadTime *metav1.Time `json:"lastReloadTime,omitempty"`
37
38    // Number of successful reloads
39    ReloadCount int32 `json:"reloadCount"`
40
41    // Status conditions
42    Conditions []metav1.Condition `json:"conditions,omitempty"`
43}
44
45//+kubebuilder:object:root=true
46//+kubebuilder:subresource:status
47//+kubebuilder:printcolumn:name="Source",type="string",JSONPath=".spec.sourceConfigMap"
48//+kubebuilder:printcolumn:name="Reloads",type="integer",JSONPath=".status.reloadCount"
49//+kubebuilder:printcolumn:name="Last Reload",type="date",JSONPath=".status.lastReloadTime"
50// ConfigMapWatcher reloads target workloads when its source ConfigMap changes
51type ConfigMapWatcher struct {
52    metav1.TypeMeta   `json:",inline"`
53    metav1.ObjectMeta `json:"metadata,omitempty"`
54
55    Spec   ConfigMapWatcherSpec   `json:"spec,omitempty"`
56    Status ConfigMapWatcherStatus `json:"status,omitempty"`
57}
58
59//+kubebuilder:object:root=true
60// ConfigMapWatcherList contains a list of ConfigMapWatcher
61type ConfigMapWatcherList struct {
62    metav1.TypeMeta `json:",inline"`
63    metav1.ListMeta `json:"metadata,omitempty"`
64    Items           []ConfigMapWatcher `json:"items"`
65}

Reconciliation Controller:

  1// controllers/configmapwatcher_controller.go
  2package controllers
  3
  4import (
  5    "context"
  6    "crypto/sha256"
  7    "encoding/hex"
  8    "fmt"
  9    "time"
 10
 11    appsv1 "k8s.io/api/apps/v1"
 12    corev1 "k8s.io/api/core/v1"
 13    "k8s.io/apimachinery/pkg/api/errors"
 14    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 15    "k8s.io/apimachinery/pkg/types"
 16    ctrl "sigs.k8s.io/controller-runtime"
 17    "sigs.k8s.io/controller-runtime/pkg/client"
 18    "sigs.k8s.io/controller-runtime/pkg/log"
 19
 20    configv1alpha1 "mycompany.com/config-operator/api/v1alpha1"
 21)
 22
 23// ConfigMapWatcherReconciler reconciles ConfigMapWatcher objects
 24type ConfigMapWatcherReconciler struct {
 25    client.Client
 26}
 27
 28func (r *ConfigMapWatcherReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 29    logger := log.FromContext(ctx)
 30
 31    // Fetch the ConfigMapWatcher
 32    watcher := &configv1alpha1.ConfigMapWatcher{}
 33    if err := r.Get(ctx, req.NamespacedName, watcher); err != nil {
 34        if errors.IsNotFound(err) {
 35            return ctrl.Result{}, nil
 36        }
 37        return ctrl.Result{}, err
 38    }
 39
 40    // Fetch the source ConfigMap
 41    configMap := &corev1.ConfigMap{}
 42    err := r.Get(ctx, types.NamespacedName{
 43        Name:      watcher.Spec.SourceConfigMap,
 44        Namespace: watcher.Namespace,
 45    }, configMap)
 46    if err != nil {
 47        logger.Error(err, "Failed to get source ConfigMap")
 48        return ctrl.Result{RequeueAfter: time.Second * 30}, err
 49    }
 50
 51    // Calculate current config hash
 52    currentHash := r.calculateConfigHash(configMap)
 53
 54    // Check if configuration changed
 55    if watcher.Status.LastConfigVersion != currentHash {
 56        logger.Info("Configuration changed, triggering reload",
 57            "oldHash", watcher.Status.LastConfigVersion,
 58            "newHash", currentHash)
 59
 60        // Reload target deployments
 61        if err := r.reloadDeployments(ctx, watcher); err != nil {
 62            logger.Error(err, "Failed to reload deployments")
 63            return ctrl.Result{RequeueAfter: time.Second * 30}, err
 64        }
 65
 66        // Update status
 67        watcher.Status.LastConfigVersion = currentHash
 68        watcher.Status.LastReloadTime = &metav1.Time{Time: time.Now()}
 69        watcher.Status.ReloadCount++
 70
 71        if err := r.Status().Update(ctx, watcher); err != nil {
 72            return ctrl.Result{}, err
 73        }
 74
 75        logger.Info("Reload completed successfully")
 76    }
 77
 78    // Requeue after check interval
 79    interval := time.Duration(watcher.Spec.CheckInterval) * time.Second
 80    return ctrl.Result{RequeueAfter: interval}, nil
 81}
 82
 83// calculateConfigHash computes a deterministic hash of ConfigMap data.
 84// fmt prints map keys in sorted order, so the result is stable even though
 85// Go map iteration order is random; an order-dependent hash would trigger
 86// a spurious reload on every reconcile.
 87func (r *ConfigMapWatcherReconciler) calculateConfigHash(cm *corev1.ConfigMap) string {
 88    h := sha256.New()
 89    h.Write([]byte(fmt.Sprintf("%v", cm.Data)))
 90    if len(cm.BinaryData) > 0 {
 91        h.Write([]byte(fmt.Sprintf("%v", cm.BinaryData)))
 92    }
 93    return hex.EncodeToString(h.Sum(nil))
 94}
 95
 96// reloadDeployments triggers reload of target deployments
 97func (r *ConfigMapWatcherReconciler) reloadDeployments(ctx context.Context, watcher *configv1alpha1.ConfigMapWatcher) error {
 98    for _, depName := range watcher.Spec.TargetDeployments {
 99        deployment := &appsv1.Deployment{}
100        err := r.Get(ctx, types.NamespacedName{
101            Name:      depName,
102            Namespace: watcher.Namespace,
103        }, deployment)
104        if err != nil {
105            return fmt.Errorf("failed to get deployment %s: %w", depName, err)
106        }
107
108        // Trigger rolling restart by updating annotation
109        if deployment.Spec.Template.Annotations == nil {
110            deployment.Spec.Template.Annotations = make(map[string]string)
111        }
112        deployment.Spec.Template.Annotations["configmap-watcher/reloaded-at"] = time.Now().Format(time.RFC3339)
113
114        if err := r.Update(ctx, deployment); err != nil {
115            return fmt.Errorf("failed to update deployment %s: %w", depName, err)
116        }
117    }
118
119    return nil
120}
121
122func (r *ConfigMapWatcherReconciler) SetupWithManager(mgr ctrl.Manager) error {
123    return ctrl.NewControllerManagedBy(mgr).
124        For(&configv1alpha1.ConfigMapWatcher{}).
125        Owns(&appsv1.Deployment{}).
126        Complete(r)
127}

Testing: Write tests to verify config changes trigger reloads correctly.
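
One way to cover this without a real cluster is the controller-runtime fake client; the sketch below (file name and layout are assumptions) checks that the hash is stable across calls and that a reload stamps the pod-template annotation. End-to-end behavior is better covered with envtest.

 1// controllers/configmapwatcher_controller_test.go - minimal fake-client tests
 2package controllers
 3
 4import (
 5    "context"
 6    "testing"
 7
 8    appsv1 "k8s.io/api/apps/v1"
 9    corev1 "k8s.io/api/core/v1"
10    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11    "k8s.io/apimachinery/pkg/runtime"
12    "k8s.io/apimachinery/pkg/types"
13    "sigs.k8s.io/controller-runtime/pkg/client/fake"
14
15    configv1alpha1 "mycompany.com/config-operator/api/v1alpha1"
16)
17
18func TestConfigHashIsDeterministic(t *testing.T) {
19    r := &ConfigMapWatcherReconciler{}
20    cm := &corev1.ConfigMap{Data: map[string]string{"a": "1", "b": "2", "c": "3"}}
21    if r.calculateConfigHash(cm) != r.calculateConfigHash(cm) {
22        t.Fatal("hash of identical data must not change between calls")
23    }
24}
25
26func TestReloadDeploymentsStampsAnnotation(t *testing.T) {
27    scheme := runtime.NewScheme()
28    _ = appsv1.AddToScheme(scheme)
29
30    dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"}}
31    r := &ConfigMapWatcherReconciler{
32        Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(dep).Build(),
33    }
34
35    watcher := &configv1alpha1.ConfigMapWatcher{
36        ObjectMeta: metav1.ObjectMeta{Name: "watcher", Namespace: "default"},
37        Spec:       configv1alpha1.ConfigMapWatcherSpec{TargetDeployments: []string{"web"}},
38    }
39    if err := r.reloadDeployments(context.Background(), watcher); err != nil {
40        t.Fatalf("reloadDeployments failed: %v", err)
41    }
42
43    updated := &appsv1.Deployment{}
44    _ = r.Get(context.Background(), types.NamespacedName{Name: "web", Namespace: "default"}, updated)
45    if updated.Spec.Template.Annotations["configmap-watcher/reloaded-at"] == "" {
46        t.Fatal("expected reload annotation on the pod template")
47    }
48}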

Exercise 2: Implement Advanced Backup and Restore

Objective: Add comprehensive backup functionality to your PostgreSQL operator with S3 integration, point-in-time recovery, and backup verification.

Requirements:

  1. Automated scheduled backups with CronJob
  2. Point-in-time recovery capability
  3. Backup verification after creation
  4. Retention policy enforcement
  5. Multi-region backup support
  6. Backup/restore status tracking

Implementation:

 1// api/v1alpha1/postgresql_types.go - Enhanced backup specification
 2
 3package v1alpha1
 4
 5import (
 6    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 7)
 8
 9// BackupSpec defines backup configuration
10type BackupSpec struct {
11    // Enable automated backups
12    // +kubebuilder:default=true
13    Enabled bool `json:"enabled"`
14
15    // Backup schedule in cron format
16    // +kubebuilder:validation:Required
17    Schedule string `json:"schedule"`
18
19    // S3 bucket for backups
20    // +kubebuilder:validation:Required
21    S3Bucket string `json:"s3Bucket"`
22
23    // S3 region
24    // +kubebuilder:validation:Required
25    S3Region string `json:"s3Region"`
26
27    // Retention in days
28    // +kubebuilder:validation:Minimum=1
29    // +kubebuilder:validation:Maximum=365
30    // +kubebuilder:default=30
31    RetentionDays int32 `json:"retentionDays"`
32
33    // Enable point-in-time recovery
34    // +kubebuilder:default=false
35    PITREnabled bool `json:"pitrEnabled,omitempty"`
36
37    // Backup verification enabled
38    // +kubebuilder:default=true
39    VerifyBackup bool `json:"verifyBackup"`
40
41    // Compression level (1-9)
42    // +kubebuilder:validation:Minimum=1
43    // +kubebuilder:validation:Maximum=9
44    // +kubebuilder:default=6
45    CompressionLevel int32 `json:"compressionLevel,omitempty"`
46
47    // Multi-region backup copies
48    ReplicationRegions []string `json:"replicationRegions,omitempty"`
49}
50
51// BackupStatus tracks backup operations
52type BackupStatus struct {
53    // Last successful backup
54    LastBackupTime *metav1.Time `json:"lastBackupTime,omitempty"`
55
56    // Last backup size in bytes
57    LastBackupSize int64 `json:"lastBackupSize,omitempty"`
58
59    // Total backups created
60    TotalBackups int32 `json:"totalBackups"`
61
62    // Failed backup attempts
63    FailedBackups int32 `json:"failedBackups"`
64
65    // Latest backup location
66    LatestBackupPath string `json:"latestBackupPath,omitempty"`
67
68    // PITR status
69    PITRStatus PITRStatus `json:"pitrStatus,omitempty"`
70}
71
72// PITRStatus tracks point-in-time recovery capability
73type PITRStatus struct {
74    // Enabled indicates if PITR is active
75    Enabled bool `json:"enabled"`
76
77    // Earliest recovery point
78    EarliestRecoveryPoint *metav1.Time `json:"earliestRecoveryPoint,omitempty"`
79
80    // Latest recovery point
81    LatestRecoveryPoint *metav1.Time `json:"latestRecoveryPoint,omitempty"`
82
83    // WAL archive location
84    WALArchiveLocation string `json:"walArchiveLocation,omitempty"`
85}

Backup Controller Implementation:

  1// pkg/backup/controller.go - Advanced backup controller
  2
  3package backup
  4
  5import (
  6    "context"
  7    "fmt"
  8    "time"
  9
 10    batchv1 "k8s.io/api/batch/v1"
 11    corev1 "k8s.io/api/core/v1"
 12    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 13    "k8s.io/apimachinery/pkg/api/resource"
 14    "sigs.k8s.io/controller-runtime/pkg/client"
 15
 16    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
 17)
 18
 19// BackupController manages database backups
 20type BackupController struct {
 21    client.Client
 22}
 23
 24// ReconcileBackup ensures backup infrastructure is in place
 25func (bc *BackupController) ReconcileBackup(ctx context.Context, postgres *databasev1alpha1.PostgreSQL) error {
 26    if !postgres.Spec.BackupSpec.Enabled {
 27        return nil
 28    }
 29
 30    // Create backup CronJob
 31    if err := bc.ensureBackupCronJob(ctx, postgres); err != nil {
 32        return fmt.Errorf("failed to ensure backup cronjob: %w", err)
 33    }
 34
 35    // Set up PITR if enabled
 36    if postgres.Spec.BackupSpec.PITREnabled {
 37        if err := bc.setupPITR(ctx, postgres); err != nil {
 38            return fmt.Errorf("failed to setup PITR: %w", err)
 39        }
 40    }
 41
 42    // Create retention cleanup job
 43    if err := bc.ensureRetentionCleanup(ctx, postgres); err != nil {
 44        return fmt.Errorf("failed to ensure retention cleanup: %w", err)
 45    }
 46
 47    return nil
 48}
 49
 50// ensureBackupCronJob creates or updates the backup CronJob
 51func (bc *BackupController) ensureBackupCronJob(ctx context.Context, postgres *databasev1alpha1.PostgreSQL) error {
 52    cronJob := &batchv1.CronJob{
 53        ObjectMeta: metav1.ObjectMeta{
 54            Name:      fmt.Sprintf("%s-backup", postgres.Name),
 55            Namespace: postgres.Namespace,
 56            Labels: map[string]string{
 57                "app":      "postgresql-backup",
 58                "database": postgres.Name,
 59            },
 60        },
 61        Spec: batchv1.CronJobSpec{
 62            Schedule:                   postgres.Spec.BackupSpec.Schedule,
 63            SuccessfulJobsHistoryLimit: int32Ptr(5),
 64            FailedJobsHistoryLimit:     int32Ptr(3),
 65            ConcurrencyPolicy:          batchv1.ForbidConcurrent,
 66            JobTemplate: batchv1.JobTemplateSpec{
 67                Spec: batchv1.JobSpec{
 68                    Template: corev1.PodTemplateSpec{
 69                        Spec: bc.createBackupPodSpec(postgres),
 70                    },
 71                    BackoffLimit: int32Ptr(3),
 72                },
 73            },
 74        },
 75    }
 76
 77    return bc.Create(ctx, cronJob)
 78}
 79
 80// createBackupPodSpec generates the pod specification for backup jobs
 81func (bc *BackupController) createBackupPodSpec(postgres *databasev1alpha1.PostgreSQL) corev1.PodSpec {
 82    return corev1.PodSpec{
 83        RestartPolicy: corev1.RestartPolicyNever,
 84        Containers: []corev1.Container{
 85            {
 86                Name:  "backup",
 87                Image: fmt.Sprintf("postgres:%s", postgres.Spec.Version),
 88                Command: []string{"/bin/bash", "-c"},
 89                Args: []string{
 90                    fmt.Sprintf(`
 91                        set -e
 92                        BACKUP_NAME="backup-$(date +%%Y%%m%%d-%%H%%M%%S)"
 93                        BACKUP_PATH="/tmp/${BACKUP_NAME}.sql"
 94
 95                        echo "Starting backup: ${BACKUP_NAME}"
 96
 97                        # Create backup
 98                        pg_dump -h %s-primary -U postgres -d postgresdb > ${BACKUP_PATH}
 99
100                        # Compress backup
101                        gzip -%d ${BACKUP_PATH}
102
103                        # Calculate checksum
104                        CHECKSUM=$(sha256sum ${BACKUP_PATH}.gz | awk '{print $1}')
105                        echo "Checksum: ${CHECKSUM}"
106
107                        # Upload to S3
108                        aws s3 cp ${BACKUP_PATH}.gz s3://%s/%s/${BACKUP_NAME}.sql.gz \
109                            --metadata checksum=${CHECKSUM}
110
111                        # Verify upload
112                        if [ "%t" = "true" ]; then
113                            echo "Verifying backup..."
114                            aws s3 cp s3://%s/%s/${BACKUP_NAME}.sql.gz /tmp/verify.sql.gz
115                            VERIFY_CHECKSUM=$(sha256sum /tmp/verify.sql.gz | awk '{print $1}')
116
117                            if [ "${CHECKSUM}" != "${VERIFY_CHECKSUM}" ]; then
118                                echo "Backup verification failed!"
119                                exit 1
120                            fi
121                            echo "Backup verified successfully"
122                        fi
123
124                        # Replicate to other regions
125                        %s
126
127                        echo "Backup completed: ${BACKUP_NAME}"
128                        echo "Size: $(du -h ${BACKUP_PATH}.gz | awk '{print $1}')"
129                    `,
130                        postgres.Name,
131                        postgres.Spec.BackupSpec.CompressionLevel,
132                        postgres.Spec.BackupSpec.S3Bucket,
133                        postgres.Name,
134                        postgres.Spec.BackupSpec.VerifyBackup,
135                        postgres.Spec.BackupSpec.S3Bucket,
136                        postgres.Name,
137                        bc.generateReplicationScript(postgres),
138                    ),
139                },
140                Env: []corev1.EnvVar{
141                    {
142                        Name: "PGPASSWORD",
143                        ValueFrom: &corev1.EnvVarSource{
144                            SecretKeyRef: &corev1.SecretKeySelector{
145                                LocalObjectReference: corev1.LocalObjectReference{
146                                    Name: fmt.Sprintf("%s-credentials", postgres.Name),
147                                },
148                                Key: "password",
149                            },
150                        },
151                    },
152                    {
153                        Name:  "AWS_REGION",
154                        Value: postgres.Spec.BackupSpec.S3Region,
155                    },
156                    {
157                        Name: "AWS_ACCESS_KEY_ID",
158                        ValueFrom: &corev1.EnvVarSource{
159                            SecretKeyRef: &corev1.SecretKeySelector{
160                                LocalObjectReference: corev1.LocalObjectReference{
161                                    Name: "aws-credentials",
162                                },
163                                Key: "access-key-id",
164                            },
165                        },
166                    },
167                    {
168                        Name: "AWS_SECRET_ACCESS_KEY",
169                        ValueFrom: &corev1.EnvVarSource{
170                            SecretKeyRef: &corev1.SecretKeySelector{
171                                LocalObjectReference: corev1.LocalObjectReference{
172                                    Name: "aws-credentials",
173                                },
174                                Key: "secret-access-key",
175                            },
176                        },
177                    },
178                },
179                Resources: corev1.ResourceRequirements{
180                    Requests: corev1.ResourceList{
181                        corev1.ResourceCPU:    resource.MustParse("500m"),
182                        corev1.ResourceMemory: resource.MustParse("1Gi"),
183                    },
184                    Limits: corev1.ResourceList{
185                        corev1.ResourceCPU:    resource.MustParse("2"),
186                        corev1.ResourceMemory: resource.MustParse("4Gi"),
187                    },
188                },
189            },
190        },
191    }
192}
193
194// generateReplicationScript creates script for multi-region replication
195func (bc *BackupController) generateReplicationScript(postgres *databasev1alpha1.PostgreSQL) string {
196    if len(postgres.Spec.BackupSpec.ReplicationRegions) == 0 {
197        return "echo 'No replication configured'"
198    }
199
200    script := ""
201    for _, region := range postgres.Spec.BackupSpec.ReplicationRegions {
202        script += fmt.Sprintf(`
203            echo "Replicating to region: %s"
204            aws s3 cp s3://%s/%s/${BACKUP_NAME}.sql.gz \
205                s3://%s-%s/%s/${BACKUP_NAME}.sql.gz \
206                --source-region %s --region %s
207        `, region,
208            postgres.Spec.BackupSpec.S3Bucket, postgres.Name,
209            postgres.Spec.BackupSpec.S3Bucket, region, postgres.Name,
210            postgres.Spec.BackupSpec.S3Region, region)
211    }
212    return script
213}
214
215// setupPITR configures point-in-time recovery
216func (bc *BackupController) setupPITR(ctx context.Context, postgres *databasev1alpha1.PostgreSQL) error {
217    // Create WAL archiving configuration
218    // This would configure PostgreSQL to archive WAL files to S3
219    // Implementation details depend on your PostgreSQL setup
220    return nil
221}
222
223// ensureRetentionCleanup creates job to clean up old backups
224func (bc *BackupController) ensureRetentionCleanup(ctx context.Context, postgres *databasev1alpha1.PostgreSQL) error {
225    cronJob := &batchv1.CronJob{
226        ObjectMeta: metav1.ObjectMeta{
227            Name:      fmt.Sprintf("%s-backup-cleanup", postgres.Name),
228            Namespace: postgres.Namespace,
229        },
230        Spec: batchv1.CronJobSpec{
231            Schedule: "0 3 * * *", // Daily at 3 AM
232            JobTemplate: batchv1.JobTemplateSpec{
233                Spec: batchv1.JobSpec{
234                    Template: corev1.PodTemplateSpec{
235                        Spec: corev1.PodSpec{
236                            RestartPolicy: corev1.RestartPolicyNever,
237                            Containers: []corev1.Container{
238                                {
239                                    Name:  "cleanup",
240                                    Image: "amazon/aws-cli:latest",
241                                    Command: []string{"/bin/bash", "-c"},
242                                    Args: []string{
243                                        fmt.Sprintf(`
244                                            RETENTION_DAYS=%d
245                                            CUTOFF_DATE=$(date -d "${RETENTION_DAYS} days ago" +%%Y%%m%%d)
246
247                                            echo "Cleaning up backups older than ${CUTOFF_DATE}"
248
249                                            aws s3 ls s3://%s/%s/ | while read -r line; do
250                                                BACKUP_DATE=$(echo $line | grep -oP 'backup-\K[0-9]{8}')
251                                                if [ "${BACKUP_DATE}" -lt "${CUTOFF_DATE}" ]; then
252                                                    BACKUP_FILE=$(echo $line | awk '{print $4}')
253                                                    echo "Deleting old backup: ${BACKUP_FILE}"
254                                                    aws s3 rm s3://%s/%s/${BACKUP_FILE}
255                                                fi
256                                            done
257
258                                            echo "Cleanup completed"
259                                        `,
260                                            postgres.Spec.BackupSpec.RetentionDays,
261                                            postgres.Spec.BackupSpec.S3Bucket,
262                                            postgres.Name,
263                                            postgres.Spec.BackupSpec.S3Bucket,
264                                            postgres.Name,
265                                        ),
266                                    },
267                                },
268                            },
269                        },
270                    },
271                },
272            },
273        },
274    }
275
276    return bc.Create(ctx, cronJob)
277}
278
279func int32Ptr(i int32) *int32 {
280    return &i
281}

Testing: Verify backups are created, verified, replicated, and cleaned up according to retention policy.
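
Creation of the scheduled backup can be checked with the fake client before moving to envtest. The file name and the PostgreSQLSpec field names below follow the earlier listings and are assumptions.

 1// pkg/backup/controller_test.go - minimal fake-client test for the CronJob
 2package backup
 3
 4import (
 5    "context"
 6    "testing"
 7
 8    batchv1 "k8s.io/api/batch/v1"
 9    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10    "k8s.io/apimachinery/pkg/runtime"
11    "k8s.io/apimachinery/pkg/types"
12    "sigs.k8s.io/controller-runtime/pkg/client/fake"
13
14    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
15)
16
17func TestReconcileBackupCreatesCronJob(t *testing.T) {
18    scheme := runtime.NewScheme()
19    _ = batchv1.AddToScheme(scheme)
20
21    bc := &BackupController{Client: fake.NewClientBuilder().WithScheme(scheme).Build()}
22    pg := &databasev1alpha1.PostgreSQL{
23        ObjectMeta: metav1.ObjectMeta{Name: "prod-db", Namespace: "default"},
24        Spec: databasev1alpha1.PostgreSQLSpec{
25            Version: "15.4",
26            BackupSpec: databasev1alpha1.BackupSpec{
27                Enabled:  true,
28                Schedule: "0 2 * * *",
29                S3Bucket: "db-backups",
30                S3Region: "us-east-1",
31            },
32        },
33    }
34
35    if err := bc.ReconcileBackup(context.Background(), pg); err != nil {
36        t.Fatalf("ReconcileBackup failed: %v", err)
37    }
38
39    cronJob := &batchv1.CronJob{}
40    key := types.NamespacedName{Name: "prod-db-backup", Namespace: "default"}
41    if err := bc.Get(context.Background(), key, cronJob); err != nil {
42        t.Fatalf("expected backup CronJob to exist: %v", err)
43    }
44    if cronJob.Spec.Schedule != "0 2 * * *" {
45        t.Fatalf("unexpected schedule: %s", cronJob.Spec.Schedule)
46    }
47}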

Exercise 3: StatefulSet Scaling Controller with Intelligent Autoscaling

Objective: Implement intelligent scaling based on database load metrics with predictive scaling capabilities.

Requirements:

  1. Monitor database metrics (connections, CPU, memory, query latency)
  2. Implement predictive scaling based on historical patterns
  3. Support custom metrics from Prometheus
  4. Implement scale-up and scale-down cooldown periods
  5. Add safety checks to prevent thrashing

Implementation:

  1// api/v1alpha1/autoscaling_types.go
  2
  3package v1alpha1
  4
  5import (
  6    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7)
  8
  9// AutoscalingSpec defines autoscaling configuration
 10type AutoscalingSpec struct {
 11    // Enable autoscaling
 12    // +kubebuilder:default=false
 13    Enabled bool `json:"enabled"`
 14
 15    // Minimum replicas
 16    // +kubebuilder:validation:Minimum=1
 17    // +kubebuilder:default=1
 18    MinReplicas int32 `json:"minReplicas"`
 19
 20    // Maximum replicas
 21    // +kubebuilder:validation:Minimum=1
 22    // +kubebuilder:validation:Maximum=100
 23    // +kubebuilder:default=10
 24    MaxReplicas int32 `json:"maxReplicas"`
 25
 26    // Target CPU utilization percentage
 27    // +kubebuilder:validation:Minimum=1
 28    // +kubebuilder:validation:Maximum=100
 29    // +kubebuilder:default=70
 30    TargetCPUUtilization int32 `json:"targetCPUUtilization"`
 31
 32    // Target memory utilization percentage
 33    // +kubebuilder:validation:Minimum=1
 34    // +kubebuilder:validation:Maximum=100
 35    // +kubebuilder:default=80
 36    TargetMemoryUtilization int32 `json:"targetMemoryUtilization"`
 37
 38    // Target database connections percentage
 39    // +kubebuilder:validation:Minimum=1
 40    // +kubebuilder:validation:Maximum=100
 41    // +kubebuilder:default=75
 42    TargetConnectionUtilization int32 `json:"targetConnectionUtilization"`
 43
 44    // Custom Prometheus metrics for scaling
 45    CustomMetrics []CustomMetric `json:"customMetrics,omitempty"`
 46
 47    // Scale up cooldown in seconds
 48    // +kubebuilder:validation:Minimum=0
 49    // +kubebuilder:default=300
 50    ScaleUpCooldown int32 `json:"scaleUpCooldown"`
 51
 52    // Scale down cooldown in seconds
 53    // +kubebuilder:validation:Minimum=0
 54    // +kubebuilder:default=600
 55    ScaleDownCooldown int32 `json:"scaleDownCooldown"`
 56
 57    // Enable predictive scaling
 58    // +kubebuilder:default=false
 59    PredictiveScaling bool `json:"predictiveScaling,omitempty"`
 60
 61    // Behavior configuration
 62    Behavior *ScalingBehavior `json:"behavior,omitempty"`
 63}
 64
 65// CustomMetric defines a custom Prometheus metric for scaling
 66type CustomMetric struct {
 67    // Metric name
 68    Name string `json:"name"`
 69
 70    // Prometheus query
 71    Query string `json:"query"`
 72
 73    // Target value
 74    TargetValue string `json:"targetValue"`
 75
 76    // Metric type (average, value)
 77    Type string `json:"type"`
 78}
 79
 80// ScalingBehavior defines scaling behavior rules
 81type ScalingBehavior struct {
 82    // Scale up rules
 83    ScaleUp *ScalingRules `json:"scaleUp,omitempty"`
 84
 85    // Scale down rules
 86    ScaleDown *ScalingRules `json:"scaleDown,omitempty"`
 87}
 88
 89// ScalingRules defines scaling policy rules
 90type ScalingRules struct {
 91    // Stabilization window in seconds
 92    StabilizationWindowSeconds int32 `json:"stabilizationWindowSeconds,omitempty"`
 93
 94    // Maximum number of pods to add/remove
 95    MaxPods int32 `json:"maxPods,omitempty"`
 96
 97    // Maximum percentage to scale by
 98    MaxPercent int32 `json:"maxPercent,omitempty"`
 99}
100
101// AutoscalingStatus tracks autoscaling status
102type AutoscalingStatus struct {
103    // Current replicas
104    CurrentReplicas int32 `json:"currentReplicas"`
105
106    // Desired replicas
107    DesiredReplicas int32 `json:"desiredReplicas"`
108
109    // Last scale time
110    LastScaleTime *metav1.Time `json:"lastScaleTime,omitempty"`
111
112    // Current metrics
113    CurrentMetrics []MetricStatus `json:"currentMetrics,omitempty"`
114
115    // Scaling decision reason
116    LastScaleReason string `json:"lastScaleReason,omitempty"`
117}
118
119// MetricStatus represents current status of a metric
120type MetricStatus struct {
121    // Metric name
122    Name string `json:"name"`
123
124    // Current value
125    CurrentValue string `json:"currentValue"`
126
127    // Target value
128    TargetValue string `json:"targetValue"`
129}

Task: Implement the autoscaling reconciler that monitors metrics and scales the StatefulSet accordingly. Include predictive scaling based on time-series analysis of historical data.
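
A minimal sketch of the core decision logic, using the types above; the package and helper names are assumptions. It applies the usual HPA-style ratio, clamps the result to the configured bounds, and enforces the cooldowns to avoid thrashing.

 1// pkg/autoscaling/decision.go - scaling decision helpers (a sketch)
 2package autoscaling
 3
 4import (
 5    "math"
 6    "time"
 7
 8    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
 9)
10
11// desiredReplicas computes the target replica count for a single metric:
12// desired = ceil(current * observedUtilization / targetUtilization),
13// clamped to [MinReplicas, MaxReplicas].
14func desiredReplicas(current int32, observed, target float64, spec databasev1alpha1.AutoscalingSpec) int32 {
15    if target <= 0 {
16        return current
17    }
18    desired := int32(math.Ceil(float64(current) * observed / target))
19    if desired < spec.MinReplicas {
20        desired = spec.MinReplicas
21    }
22    if desired > spec.MaxReplicas {
23        desired = spec.MaxReplicas
24    }
25    return desired
26}
27
28// cooldownElapsed guards against thrashing: a scale-up is allowed only after
29// ScaleUpCooldown has passed since the last scaling event, a scale-down only
30// after ScaleDownCooldown.
31func cooldownElapsed(status databasev1alpha1.AutoscalingStatus, spec databasev1alpha1.AutoscalingSpec, scalingUp bool, now time.Time) bool {
32    if status.LastScaleTime == nil {
33        return true
34    }
35    cooldown := time.Duration(spec.ScaleDownCooldown) * time.Second
36    if scalingUp {
37        cooldown = time.Duration(spec.ScaleUpCooldown) * time.Second
38    }
39    return now.Sub(status.LastScaleTime.Time) >= cooldown
40}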

Exercise 4: Multi-Cluster Deployment Operator

Objective: Extend your operator to deploy and manage PostgreSQL clusters across multiple Kubernetes clusters with automatic failover.

Requirements:

  1. Deploy to multiple clusters simultaneously
  2. Configure cross-cluster replication
  3. Implement automatic failover between clusters
  4. Monitor cluster health across regions
  5. Synchronize configuration across clusters

Task: Design and implement the multi-cluster architecture, including cluster discovery, resource synchronization, and failover logic.
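
As a starting point for the synchronization piece, the sketch below fans a PostgreSQL resource out to several clusters, assuming one controller-runtime client per member cluster has already been constructed (for example from kubeconfig Secrets). The package and type names are assumptions.

 1// pkg/multicluster/clusterset.go - fan-out apply across member clusters (a sketch)
 2package multicluster
 3
 4import (
 5    "context"
 6    "fmt"
 7
 8    "sigs.k8s.io/controller-runtime/pkg/client"
 9
10    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
11)
12
13// ClusterSet holds one controller-runtime client per member cluster.
14type ClusterSet struct {
15    Clients map[string]client.Client // keyed by cluster name
16}
17
18// Apply creates the PostgreSQL resource in every member cluster, collecting
19// per-cluster failures instead of stopping at the first one.
20func (cs *ClusterSet) Apply(ctx context.Context, pg *databasev1alpha1.PostgreSQL) error {
21    var errs []error
22    for name, c := range cs.Clients {
23        obj := pg.DeepCopy()
24        obj.ResourceVersion = "" // resourceVersion is cluster-specific; clear it before Create
25        if err := c.Create(ctx, obj); err != nil {
26            errs = append(errs, fmt.Errorf("cluster %s: %w", name, err))
27        }
28    }
29    if len(errs) > 0 {
30        return fmt.Errorf("apply failed on %d cluster(s): %v", len(errs), errs)
31    }
32    return nil
33}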

Exercise 5: Certificate Management Operator

Objective: Create an operator that manages SSL/TLS certificates with automatic renewal using cert-manager integration or ACME protocol.

Requirements:

  1. Automatic certificate issuance
  2. Automatic renewal before expiration
  3. Certificate rotation without downtime
  4. Support for multiple certificate authorities
  5. Integration with secret management

Task: Implement the certificate lifecycle management including issuance, renewal, and rotation with zero downtime.
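
For the renewal piece, the core check is small enough to sketch here: read the certificate from a standard kubernetes.io/tls Secret and reissue when it enters the renewal window. The package and function names are assumptions.

 1// pkg/certs/renewal.go - renewal-window check (a sketch)
 2package certs
 3
 4import (
 5    "crypto/x509"
 6    "encoding/pem"
 7    "fmt"
 8    "time"
 9
10    corev1 "k8s.io/api/core/v1"
11)
12
13// needsRenewal reports whether the certificate stored under "tls.crt" expires
14// within the renewal window (for example 30 days), which is when the operator
15// should trigger reissuance and rotation.
16func needsRenewal(secret *corev1.Secret, window time.Duration, now time.Time) (bool, error) {
17    block, _ := pem.Decode(secret.Data[corev1.TLSCertKey])
18    if block == nil {
19        return false, fmt.Errorf("secret %s has no PEM data under %s", secret.Name, corev1.TLSCertKey)
20    }
21    cert, err := x509.ParseCertificate(block.Bytes)
22    if err != nil {
23        return false, fmt.Errorf("parsing certificate: %w", err)
24    }
25    return now.Add(window).After(cert.NotAfter), nil
26}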

Summary

Key Takeaways

  1. Operators encode operational knowledge into reusable, automated code
  2. Reconciliation loops continuously drive actual state toward desired state
  3. Controller-runtime provides the framework for building production-ready operators
  4. Testing is crucial - use envtest for comprehensive operator testing
  5. Production patterns include leader election, observability, and graceful shutdown

Next Steps

  • Explore the Operator SDK for additional tools and scaffolding options
  • Study existing operators for advanced patterns
  • Learn about Helm integration for packaging and distributing operators
  • Investigate webhooks for validating and mutating custom resources
  • Consider Operator Lifecycle Manager for operator distribution

You're now equipped to build production-ready Kubernetes operators that automate complex operational tasks at scale. The patterns and practices covered here will help you create operators that are maintainable, testable, and safe to run in production.