Why This Matters - The Automation Revolution
Consider a senior DevOps engineer who never sleeps, never makes mistakes, and automatically handles all operational tasks for your applications 24/7. That's essentially what a Kubernetes operator is - automated operational expertise encoded as software that runs perpetually in your cluster.
Real-world scenario: A large e-commerce platform runs dozens of PostgreSQL database clusters. Each cluster needs:
- Automatic failover when the primary node fails during peak shopping season
- Daily backups with verification before major product launches
- Zero-downtime upgrades during high-traffic periods
- Monitoring replication lag and automatic corrective actions
Without operators, this requires a team of DBAs working around the clock. With operators, all this knowledge becomes reusable, automated code that handles scenarios humans might miss during critical moments.
Learning Objectives
By the end of this article, you will be able to:
- Design and implement custom resources and controllers using controller-runtime
- Use kubebuilder to scaffold production-ready operators with proper code generation
- Implement reconciliation patterns that maintain desired state effectively
- Apply RBAC best practices for secure operator deployment
- Write comprehensive tests using envtest for operator validation
- Deploy operators with proper observability and production patterns
Core Concepts - Understanding the Operator Pattern
The Control Loop Philosophy
At its heart, Kubernetes operates on a simple but powerful concept: declarative configuration and reconciliation loops. Think of it like a thermostat:
1// The Kubernetes way: declare what you want
2type DesiredState struct {
3 Temperature int `json:"temperature"`
4 Mode string `json:"mode"` // "heat", "cool", "auto"
5}
6
7// Kubernetes continuously works to make actual == desired
8func (t *Thermostat) Reconcile(actual ActualState) error { // t is a Thermostat holding the DesiredState
9 if actual.Temperature < t.Desired.Temperature {
10 t.turnOnHeat()
11 } else if actual.Temperature > t.Desired.Temperature {
12 t.turnOnCool()
13 }
14 return nil
15}
The thermostat doesn't just "turn on heat once" - it continuously monitors and adjusts. Kubernetes operators apply this same principle to complex applications.
Operator Components: The Building Blocks
Custom Resources - The "what":
1# Example: What we want our database to look like
2apiVersion: database.mycompany.com/v1alpha1
3kind: PostgreSQL
4metadata:
5 name: prod-db-cluster
6spec:
7 replicas: 3
8 version: "15.4"
9 backupSchedule: "0 2 * * *" # Daily at 2 AM
10 resources:
11 requests:
12 memory: "2Gi"
13 cpu: "1000m"
Custom Resource Definitions - The "contract":
1// Database operator CRD definition
2// +kubebuilder:validation:Required
3// +kubebuilder:validation:Minimum=1
4// +kubebuilder:validation:Maximum=10
5Replicas int32 `json:"replicas"`
6
7// +kubebuilder:validation:Required
8Version string `json:"version"`
9
10// +kubebuilder:validation:Pattern=^[0-9]+\s[0-9]+\s\*\s\*\s\*$
11BackupSchedule string `json:"backupSchedule"`
Controller - The "how":
1// The controller watches CRs and makes them reality
2func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
3    // 1. Fetch the PostgreSQL resource
4    // 2. Check what currently exists in the cluster
5    // 3. Compare desired vs actual state
6    // 4. Take action to reconcile differences
7    // 5. Update status with current state
8    return ctrl.Result{}, nil
9}
💡 Key Insight: Operators transform operational knowledge from tribal wisdom into executable code that anyone can use. This means your best DevOps practices are automatically applied, every time, without human intervention.
The Operator Maturity Journey
Not all operators are created equal. The Operator Framework defines a maturity model:
- Basic Install: Automated installation and configuration
- Seamless Upgrades: Automated upgrade process
- Full Lifecycle: Backup, restore, and failure recovery
- Deep Insights: Metrics, alerts, and log processing
- Auto Pilot: Horizontal/vertical scaling, auto-tuning, abnormality detection
Deep Dive - Kubernetes API Machinery
Understanding the Client-Go Library
Before building operators, you need to understand how Go applications interact with Kubernetes. The client-go library is the foundation:
1// pkg/kubernetes/client.go - Understanding Kubernetes clients
2
3package kubernetes
4
5import (
6 "context"
7 "fmt"
8
9 corev1 "k8s.io/api/core/v1"
10 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11 "k8s.io/client-go/kubernetes"
12 "k8s.io/client-go/rest"
13 "k8s.io/client-go/tools/clientcmd"
14)
15
16// ClientManager provides access to Kubernetes API
17type ClientManager struct {
18 clientset *kubernetes.Clientset
19 config *rest.Config
20}
21
22// NewClientManager creates a new Kubernetes client manager
23func NewClientManager(kubeconfig string) (*ClientManager, error) {
24 var config *rest.Config
25 var err error
26
27 if kubeconfig != "" {
28 // Out-of-cluster configuration (development)
29 config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
30 } else {
31 // In-cluster configuration (production)
32 config, err = rest.InClusterConfig()
33 }
34
35 if err != nil {
36 return nil, fmt.Errorf("failed to get config: %w", err)
37 }
38
39 // Create the clientset
40 clientset, err := kubernetes.NewForConfig(config)
41 if err != nil {
42 return nil, fmt.Errorf("failed to create clientset: %w", err)
43 }
44
45 return &ClientManager{
46 clientset: clientset,
47 config: config,
48 }, nil
49}
50
51// ListPods demonstrates basic Kubernetes API operations
52func (cm *ClientManager) ListPods(ctx context.Context, namespace string) (*corev1.PodList, error) {
53 pods, err := cm.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
54 if err != nil {
55 return nil, fmt.Errorf("failed to list pods: %w", err)
56 }
57
58 return pods, nil
59}
60
61// WatchPods demonstrates watching Kubernetes resources
62func (cm *ClientManager) WatchPods(ctx context.Context, namespace string) error {
63 watcher, err := cm.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{})
64 if err != nil {
65 return fmt.Errorf("failed to create watcher: %w", err)
66 }
67 defer watcher.Stop()
68
69 fmt.Println("Watching for pod changes...")
70 for {
71 select {
72 case event, ok := <-watcher.ResultChan():
73 if !ok {
74 return fmt.Errorf("watch channel closed")
75 }
76
77 pod, ok := event.Object.(*corev1.Pod)
78 if !ok {
79 continue
80 }
81
82 fmt.Printf("Event: %s - Pod: %s/%s - Phase: %s\n",
83 event.Type, pod.Namespace, pod.Name, pod.Status.Phase)
84
85 case <-ctx.Done():
86 return ctx.Err()
87 }
88 }
89}
Key concepts from client-go:
- Clientset: Type-safe clients for all Kubernetes resources
- RESTClient: Low-level HTTP client for custom operations
- Informers: Efficient caching and watching mechanisms
- Workqueues: Rate-limited work distribution for controllers
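As a quick usage sketch, a small main can exercise both helpers from the ClientManager above. The local import path is an assumption based on the project layout used later in this article:

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	k8sutil "mycompany.com/database-operator/pkg/kubernetes" // assumed path for the package above
)

func main() {
	ctx := context.Background()

	// An empty KUBECONFIG falls through to the in-cluster config branch
	cm, err := k8sutil.NewClientManager(os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatalf("failed to create client manager: %v", err)
	}

	pods, err := cm.ListPods(ctx, "default")
	if err != nil {
		log.Fatalf("failed to list pods: %v", err)
	}
	fmt.Printf("found %d pods in the default namespace\n", len(pods.Items))

	// Blocks until the watch channel closes or the context is cancelled
	if err := cm.WatchPods(ctx, "default"); err != nil {
		log.Printf("watch ended: %v", err)
	}
}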
Informers and Caching
Operators need efficient ways to watch resources without overwhelming the API server:
1// pkg/kubernetes/informer.go - Efficient resource watching
2
3package kubernetes
4
5import (
6 "fmt"
7 "time"
8
9 corev1 "k8s.io/api/core/v1"
10 "k8s.io/apimachinery/pkg/labels"
11 "k8s.io/client-go/informers"
12 "k8s.io/client-go/kubernetes"
13 "k8s.io/client-go/tools/cache"
14)
15
16// PodInformerManager demonstrates efficient pod watching
17type PodInformerManager struct {
18 clientset *kubernetes.Clientset
19 informerFactory informers.SharedInformerFactory
20}
21
22// NewPodInformerManager creates a manager with informers
23func NewPodInformerManager(clientset *kubernetes.Clientset) *PodInformerManager {
24 // Create informer factory with 30 second resync period
25 factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
26
27 return &PodInformerManager{
28 clientset: clientset,
29 informerFactory: factory,
30 }
31}
32
33// SetupPodInformer configures pod watching with event handlers
34func (pim *PodInformerManager) SetupPodInformer() {
35 // Get pod informer from factory
36 podInformer := pim.informerFactory.Core().V1().Pods().Informer()
37
38 // Add event handlers
39 podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
40 AddFunc: func(obj interface{}) {
41 pod := obj.(*corev1.Pod)
42 fmt.Printf("Pod added: %s/%s\n", pod.Namespace, pod.Name)
43 },
44 UpdateFunc: func(oldObj, newObj interface{}) {
45 oldPod := oldObj.(*corev1.Pod)
46 newPod := newObj.(*corev1.Pod)
47
48 // Only log if phase changed
49 if oldPod.Status.Phase != newPod.Status.Phase {
50 fmt.Printf("Pod updated: %s/%s - Phase: %s -> %s\n",
51 newPod.Namespace, newPod.Name,
52 oldPod.Status.Phase, newPod.Status.Phase)
53 }
54 },
55 DeleteFunc: func(obj interface{}) {
56 pod := obj.(*corev1.Pod)
57 fmt.Printf("Pod deleted: %s/%s\n", pod.Namespace, pod.Name)
58 },
59 })
60}
61
62// Start begins the informer and blocks until stopped
63func (pim *PodInformerManager) Start(stopCh <-chan struct{}) {
64 // Start all informers in the factory
65 pim.informerFactory.Start(stopCh)
66
67 // Wait for initial cache sync
68 pim.informerFactory.WaitForCacheSync(stopCh)
69
70 fmt.Println("Informers started and synced")
71}
72
73// ListPodsFromCache demonstrates using the informer's cache
74func (pim *PodInformerManager) ListPodsFromCache(namespace string) ([]*corev1.Pod, error) {
75 // Get lister from informer
76 lister := pim.informerFactory.Core().V1().Pods().Lister()
77
78 // List from cache instead of API server
79 pods, err := lister.Pods(namespace).List(labels.Everything())
80 if err != nil {
81 return nil, fmt.Errorf("failed to list from cache: %w", err)
82 }
83
84 return pods, nil
85}
Why informers are crucial for operators:
- Efficient watching: Single watch connection shared across controllers
- Local cache: Reduces API server load dramatically
- Event batching: Handles bursts of updates efficiently
- Resync mechanism: Ensures eventual consistency
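To see the informer manager in action, here is a minimal sketch of a main that wires it to an in-cluster clientset and shuts down on SIGINT/SIGTERM; again, the local import path is an assumption:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	k8sutil "mycompany.com/database-operator/pkg/kubernetes" // assumed path
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("failed to load in-cluster config: %v", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("failed to create clientset: %v", err)
	}

	// Close stopCh on SIGINT/SIGTERM so the informers shut down cleanly
	stopCh := make(chan struct{})
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		close(stopCh)
	}()

	pim := k8sutil.NewPodInformerManager(clientset)
	pim.SetupPodInformer() // register handlers before starting the factory
	pim.Start(stopCh)      // starts informers and waits for the initial cache sync

	<-stopCh // block until shutdown
}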
Workqueues for Reliable Processing
Controllers need reliable, rate-limited work processing:
1// pkg/kubernetes/workqueue.go - Reliable work processing
2
3package kubernetes
4
5import (
6 "fmt"
7 "time"
8
9 "k8s.io/apimachinery/pkg/util/runtime"
10 "k8s.io/apimachinery/pkg/util/wait"
11 "k8s.io/client-go/tools/cache"
12 "k8s.io/client-go/util/workqueue"
13)
14
15// Controller demonstrates workqueue pattern
16type Controller struct {
17 queue workqueue.RateLimitingInterface
18 informer cache.SharedIndexInformer
19}
20
21// NewController creates a controller with workqueue
22func NewController(informer cache.SharedIndexInformer) *Controller {
23 // Create rate-limited workqueue
24 queue := workqueue.NewRateLimitingQueue(
25 workqueue.NewItemExponentialFailureRateLimiter(
26 100*time.Millisecond, // Base delay
27 10*time.Second, // Max delay
28 ),
29 )
30
31 controller := &Controller{
32 queue: queue,
33 informer: informer,
34 }
35
36 // Add event handlers that enqueue keys
37 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
38 AddFunc: func(obj interface{}) {
39 key, err := cache.MetaNamespaceKeyFunc(obj)
40 if err == nil {
41 queue.Add(key)
42 }
43 },
44 UpdateFunc: func(oldObj, newObj interface{}) {
45 key, err := cache.MetaNamespaceKeyFunc(newObj)
46 if err == nil {
47 queue.Add(key)
48 }
49 },
50 DeleteFunc: func(obj interface{}) {
51 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
52 if err == nil {
53 queue.Add(key)
54 }
55 },
56 })
57
58 return controller
59}
60
61// Run starts the controller workers
62func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
63 defer runtime.HandleCrash()
64 defer c.queue.ShutDown()
65
66 fmt.Println("Starting controller")
67
68 // Start informer
69 go c.informer.Run(stopCh)
70
71 // Wait for cache sync
72 if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
73 fmt.Println("Failed to sync informer cache")
74 return
75 }
76
77 fmt.Println("Controller synced and ready")
78
79 // Start workers
80 for i := 0; i < workers; i++ {
81 go wait.Until(c.runWorker, time.Second, stopCh)
82 }
83
84 <-stopCh
85 fmt.Println("Stopping controller")
86}
87
88// runWorker processes items from the queue
89func (c *Controller) runWorker() {
90 for c.processNextItem() {
91 }
92}
93
94// processNextItem processes a single queued item
95func (c *Controller) processNextItem() bool {
96 // Get next item (blocks until available)
97 key, shutdown := c.queue.Get()
98 if shutdown {
99 return false
100 }
101 defer c.queue.Done(key)
102
103 // Process the item
104 err := c.processItem(key.(string))
105
106 // Handle result
107 if err == nil {
108 // Success - forget any rate limiting
109 c.queue.Forget(key)
110 } else {
111 // Failure - requeue with rate limiting
112 if c.queue.NumRequeues(key) < 5 {
113 fmt.Printf("Error processing %s (will retry): %v\n", key, err)
114 c.queue.AddRateLimited(key)
115 } else {
116 fmt.Printf("Dropping %s after 5 retries: %v\n", key, err)
117 c.queue.Forget(key)
118 runtime.HandleError(err)
119 }
120 }
121
122 return true
123}
124
125// processItem handles a single item - implement your logic here
126func (c *Controller) processItem(key string) error {
127 // Get object from informer cache
128 obj, exists, err := c.informer.GetIndexer().GetByKey(key)
129 if err != nil {
130 return fmt.Errorf("fetching object with key %s failed: %w", key, err)
131 }
132
133 if !exists {
134 fmt.Printf("Object %s does not exist anymore\n", key)
135 return nil
136 }
137
138 // Process the object
139 fmt.Printf("Processing %s: %v\n", key, obj)
140
141 // Simulate some work
142 time.Sleep(100 * time.Millisecond)
143
144 return nil
145}
Workqueue benefits for operators:
- Rate limiting: Prevents overwhelming the API server during failures
- Deduplication: Multiple events for same object get collapsed
- Retry logic: Automatic exponential backoff for failures
- Ordered processing: FIFO guarantee within rate limits
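Wiring the Controller above to a concrete informer follows the standard client-go pattern. A minimal sketch (an assumed helper living in the same package as the Controller) that watches pods in one namespace and runs two workers:

package kubernetes

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// RunPodController builds a shared index informer over pods in the default
// namespace, hands it to the workqueue-backed Controller above, and runs two workers.
func RunPodController(clientset *kubernetes.Clientset, stopCh <-chan struct{}) {
	listWatcher := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), // typed REST client for the core/v1 API group
		"pods", "default", fields.Everything())

	informer := cache.NewSharedIndexInformer(
		listWatcher, &corev1.Pod{}, 30*time.Second, cache.Indexers{})

	controller := NewController(informer)
	controller.Run(2, stopCh)
}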
Advanced Operator Patterns
Leader Election for High Availability
Production operators must handle multiple replicas with leader election:
1// pkg/operator/leader.go - Leader election implementation
2
3package operator
4
5import (
6 "context"
7 "fmt"
8 "os"
9 "time"
10
11 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12 "k8s.io/client-go/kubernetes"
13 "k8s.io/client-go/tools/leaderelection"
14 "k8s.io/client-go/tools/leaderelection/resourcelock"
15)
16
17// LeaderElectionConfig configures leader election
18type LeaderElectionConfig struct {
19 Identity string
20 Namespace string
21 LockName string
22 LeaseDuration time.Duration
23 RenewDeadline time.Duration
24 RetryPeriod time.Duration
25}
26
27// RunWithLeaderElection runs the operator with leader election
28func RunWithLeaderElection(
29 ctx context.Context,
30 clientset *kubernetes.Clientset,
31 config LeaderElectionConfig,
32 runFunc func(ctx context.Context),
33) error {
34 // Create resource lock for leader election
35 lock := &resourcelock.LeaseLock{
36 LeaseMeta: metav1.ObjectMeta{
37 Name: config.LockName,
38 Namespace: config.Namespace,
39 },
40 Client: clientset.CoordinationV1(),
41 LockConfig: resourcelock.ResourceLockConfig{
42 Identity: config.Identity,
43 },
44 }
45
46 // Configure leader election
47 leaderElectionConfig := leaderelection.LeaderElectionConfig{
48 Lock: lock,
49 ReleaseOnCancel: true,
50 LeaseDuration: config.LeaseDuration,
51 RenewDeadline: config.RenewDeadline,
52 RetryPeriod: config.RetryPeriod,
53 Callbacks: leaderelection.LeaderCallbacks{
54 OnStartedLeading: func(ctx context.Context) {
55 fmt.Printf("Started leading with identity: %s\n", config.Identity)
56 runFunc(ctx)
57 },
58 OnStoppedLeading: func() {
59 fmt.Printf("Stopped leading with identity: %s\n", config.Identity)
60 os.Exit(0)
61 },
62 OnNewLeader: func(identity string) {
63 if identity == config.Identity {
64 fmt.Println("I am the new leader")
65 } else {
66 fmt.Printf("New leader elected: %s\n", identity)
67 }
68 },
69 },
70 }
71
72 // Start leader election
73 leaderelection.RunOrDie(ctx, leaderElectionConfig)
74
75 return nil
76}
77
78// Example usage of leader election
79func ExampleLeaderElection() {
80 ctx := context.Background()
81
82 // Get clientset (assumed to be created elsewhere)
83 var clientset *kubernetes.Clientset
84
85 // Configure leader election
86 config := LeaderElectionConfig{
87 Identity: os.Getenv("HOSTNAME"), // Pod name
88 Namespace: "default",
89 LockName: "my-operator-lock",
90 LeaseDuration: 15 * time.Second,
91 RenewDeadline: 10 * time.Second,
92 RetryPeriod: 2 * time.Second,
93 }
94
95 // Run operator with leader election
96 RunWithLeaderElection(ctx, clientset, config, func(ctx context.Context) {
97 fmt.Println("Running as leader...")
98 // Start your operator controllers here
99 })
100}
Finalizers for Resource Cleanup
Finalizers ensure proper cleanup when resources are deleted:
1// pkg/operator/finalizer.go - Finalizer handling
2
3package operator
4
5import (
6 "context"
7 "fmt"
8
9 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10 "sigs.k8s.io/controller-runtime/pkg/client"
11 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
12)
13
14const (
15 // FinalizerName is our custom finalizer
16 FinalizerName = "database.mycompany.com/finalizer"
17)
18
19// ResourceWithFinalizer demonstrates finalizer usage
20type ResourceWithFinalizer struct {
21 client.Client
22}
23
24// HandleFinalizer manages finalizer logic for a resource
25func (r *ResourceWithFinalizer) HandleFinalizer(
26 ctx context.Context,
27 obj client.Object,
28 cleanup func(context.Context, client.Object) error,
29) error {
30 // Check if object is being deleted
31 if obj.GetDeletionTimestamp() != nil {
32 // Object is being deleted
33 if controllerutil.ContainsFinalizer(obj, FinalizerName) {
34 // Our finalizer is present, perform cleanup
35 fmt.Printf("Running cleanup for %s/%s\n", obj.GetNamespace(), obj.GetName())
36
37 if err := cleanup(ctx, obj); err != nil {
38 return fmt.Errorf("cleanup failed: %w", err)
39 }
40
41 // Remove our finalizer
42 controllerutil.RemoveFinalizer(obj, FinalizerName)
43 if err := r.Update(ctx, obj); err != nil {
44 return fmt.Errorf("failed to remove finalizer: %w", err)
45 }
46
47 fmt.Printf("Finalizer removed for %s/%s\n", obj.GetNamespace(), obj.GetName())
48 }
49
50 // Object will be deleted by Kubernetes
51 return nil
52 }
53
54 // Object is not being deleted, ensure finalizer exists
55 if !controllerutil.ContainsFinalizer(obj, FinalizerName) {
56 controllerutil.AddFinalizer(obj, FinalizerName)
57 if err := r.Update(ctx, obj); err != nil {
58 return fmt.Errorf("failed to add finalizer: %w", err)
59 }
60 fmt.Printf("Finalizer added to %s/%s\n", obj.GetNamespace(), obj.GetName())
61 }
62
63 return nil
64}
65
66// Example cleanup function for database operator
67func cleanupDatabase(ctx context.Context, obj client.Object) error {
68 fmt.Printf("Cleaning up database resources for %s/%s\n", obj.GetNamespace(), obj.GetName())
69
70 // Example cleanup tasks:
71 // 1. Take final backup
72 // 2. Remove external resources (S3 buckets, etc.)
73 // 3. Deregister from external services
74 // 4. Clean up associated secrets
75
76 // Simulate cleanup work
77 fmt.Println("- Taking final backup...")
78 fmt.Println("- Removing S3 backup bucket...")
79 fmt.Println("- Cleaning up secrets...")
80
81 return nil
82}
Webhooks for Validation and Mutation
Admission webhooks provide advanced validation and defaulting:
1// api/v1alpha1/postgresql_webhook.go - Admission webhooks
2
3package v1alpha1
4
5import (
6 "fmt"
7 "k8s.io/apimachinery/pkg/runtime"
8 ctrl "sigs.k8s.io/controller-runtime"
9 logf "sigs.k8s.io/controller-runtime/pkg/log"
10 "sigs.k8s.io/controller-runtime/pkg/webhook"
11)
12
13var postgresqllog = logf.Log.WithName("postgresql-resource")
14
15// SetupWebhookWithManager registers webhooks with the manager
16func (r *PostgreSQL) SetupWebhookWithManager(mgr ctrl.Manager) error {
17 return ctrl.NewWebhookManagedBy(mgr).
18 For(r).
19 Complete()
20}
21
22//+kubebuilder:webhook:path=/mutate-database-mycompany-com-v1alpha1-postgresql,mutating=true,failurePolicy=fail,sideEffects=None,groups=database.mycompany.com,resources=postgresqls,verbs=create;update,versions=v1alpha1,name=mpostgresql.kb.io,admissionReviewVersions=v1
23
24var _ webhook.Defaulter = &PostgreSQL{}
25
26// Default implements webhook.Defaulter - sets default values
27func (r *PostgreSQL) Default() {
28 postgresqllog.Info("default", "name", r.Name)
29
30 // Set default values if not provided
31 if r.Spec.Version == "" {
32 r.Spec.Version = "15.4"
33 postgresqllog.Info("defaulting version to 15.4")
34 }
35
36 if r.Spec.Replicas == 0 {
37 r.Spec.Replicas = 3
38 postgresqllog.Info("defaulting replicas to 3")
39 }
40
41 if r.Spec.StorageClassName == "" {
42 r.Spec.StorageClassName = "standard"
43 postgresqllog.Info("defaulting storage class to standard")
44 }
45
46 if r.Spec.BackupSchedule == "" {
47 r.Spec.BackupSchedule = "0 2 * * *" // Daily at 2 AM
48 postgresqllog.Info("defaulting backup schedule to daily at 2 AM")
49 }
50}
51
52//+kubebuilder:webhook:path=/validate-database-mycompany-com-v1alpha1-postgresql,mutating=false,failurePolicy=fail,sideEffects=None,groups=database.mycompany.com,resources=postgresqls,verbs=create;update,versions=v1alpha1,name=vpostgresql.kb.io,admissionReviewVersions=v1
53
54var _ webhook.Validator = &PostgreSQL{}
55
56// ValidateCreate implements webhook.Validator - validates creation
57func (r *PostgreSQL) ValidateCreate() error {
58 postgresqllog.Info("validate create", "name", r.Name)
59
60 return r.validatePostgreSQL()
61}
62
63// ValidateUpdate implements webhook.Validator - validates updates
64func (r *PostgreSQL) ValidateUpdate(old runtime.Object) error {
65 postgresqllog.Info("validate update", "name", r.Name)
66
67 oldPostgres, ok := old.(*PostgreSQL)
68 if !ok {
69 return fmt.Errorf("expected PostgreSQL object")
70 }
71
72 // Prevent version downgrades
73	if r.Spec.Version < oldPostgres.Spec.Version { // NOTE: lexicographic string compare; use a semver-aware comparison for real version ordering
74 return fmt.Errorf("version downgrades are not allowed: %s -> %s",
75 oldPostgres.Spec.Version, r.Spec.Version)
76 }
77
78 // Prevent reducing replicas below 1
79 if r.Spec.Replicas < 1 {
80 return fmt.Errorf("replicas cannot be less than 1")
81 }
82
83 return r.validatePostgreSQL()
84}
85
86// ValidateDelete implements webhook.Validator - validates deletion
87func (r *PostgreSQL) ValidateDelete() error {
88 postgresqllog.Info("validate delete", "name", r.Name)
89
90 // Add protection for production databases
91 if r.Labels["environment"] == "production" {
92 if r.Annotations["allow-delete"] != "true" {
93 return fmt.Errorf("production databases require 'allow-delete: true' annotation")
94 }
95 }
96
97 return nil
98}
99
100// validatePostgreSQL performs common validation
101func (r *PostgreSQL) validatePostgreSQL() error {
102 // Validate replica count
103 if r.Spec.Replicas < 1 || r.Spec.Replicas > 10 {
104 return fmt.Errorf("replicas must be between 1 and 10, got %d", r.Spec.Replicas)
105 }
106
107 // Validate version format
108 if r.Spec.Version == "" {
109 return fmt.Errorf("version is required")
110 }
111
112 // Validate backup schedule (cron format)
113 if !isValidCronSchedule(r.Spec.BackupSchedule) {
114 return fmt.Errorf("invalid backup schedule format: %s", r.Spec.BackupSchedule)
115 }
116
117 return nil
118}
119
120// isValidCronSchedule validates cron schedule format
121func isValidCronSchedule(schedule string) bool {
122 // Simple validation - in production use a proper cron parser
123 return schedule != ""
124}
Practical Examples - Building Your First Operator
Step 1: Setting Up the Project
First, install kubebuilder and initialize your project:
1# Install kubebuilder
2OS=$(uname | tr '[:upper:]' '[:lower:]')
3ARCH=$(uname -m | sed 's/x86_64/amd64/;s/aarch64/arm64/')
4curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/${OS}/${ARCH}"
5chmod +x kubebuilder && sudo mv kubebuilder /usr/local/bin/
1# Initialize a new project
2kubebuilder init --domain mycompany.com --repo mycompany.com/database-operator
3
4# Create an API
5kubebuilder create api --group database --version v1alpha1 --kind PostgreSQL
This creates the complete project structure:
database-operator/
├── api/ # API definitions
├── controllers/ # Controller implementations
├── config/ # Kubernetes manifests
├── hack/ # Scripts and tools
├── main.go # Operator entry point
└── Makefile # Build and deploy targets
Step 2: Defining Your Custom Resource
Let's design a PostgreSQL operator that manages database clusters:
1// api/v1alpha1/postgresql_types.go
2
3package v1alpha1
4
5import (
6    corev1 "k8s.io/api/core/v1"
7    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8)
8
9// PostgreSQLSpec defines the desired state of PostgreSQL
10type PostgreSQLSpec struct {
11 // +kubebuilder:validation:Required
12 // +kubebuilder:validation:Minimum=1
13 // +kubebuilder:validation:Maximum=10
14 Replicas int32 `json:"replicas"`
15
16 // +kubebuilder:validation:Required
17 Version string `json:"version"`
18
19 // +kubebuilder:validation:Pattern=^[0-9]+\s[0-9]+\s\*\s\*\s\*$
20 BackupSchedule string `json:"backupSchedule"`
21
22 // +kubebuilder:validation:Optional
23 Resources corev1.ResourceRequirements `json:"resources,omitempty"`
24
25 // +kubebuilder:validation:Optional
26 StorageClassName string `json:"storageClassName,omitempty"`
27}
28
29// PostgreSQLStatus defines the observed state of PostgreSQL
30type PostgreSQLStatus struct {
31 // Represents the latest available observations of a database's state.
32 Conditions []metav1.Condition `json:"conditions,omitempty"`
33
34 // ReadyReplicas indicates the number of ready database replicas.
35 ReadyReplicas int32 `json:"readyReplicas"`
36
37 // PrimaryPod contains the name of the primary database pod.
38 PrimaryPod string `json:"primaryPod,omitempty"`
39
40 // LastBackupTime records when the last successful backup completed.
41 LastBackupTime *metav1.Time `json:"lastBackupTime,omitempty"`
42}
43
44//+kubebuilder:object:root=true
45//+kubebuilder:subresource:status
46//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas"
47//+kubebuilder:printcolumn:name="Ready",type="integer",JSONPath=".status.readyReplicas"
48//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
49
50// PostgreSQL is the Schema for the postgresqls API
51type PostgreSQL struct {
52 metav1.TypeMeta `json:",inline"`
53 metav1.ObjectMeta `json:"metadata,omitempty"`
54
55 Spec PostgreSQLSpec `json:"spec,omitempty"`
56 Status PostgreSQLStatus `json:"status,omitempty"`
57}
58
59//+kubebuilder:object:root=true
60
61// PostgreSQLList contains a list of PostgreSQL
62type PostgreSQLList struct {
63 metav1.TypeMeta `json:",inline"`
64 metav1.ListMeta `json:"metadata,omitempty"`
65 Items []PostgreSQL `json:"items"`
66}
67
68func init() {
69 SchemeBuilder.Register(&PostgreSQL{}, &PostgreSQLList{})
70}
Step 3: Implementing the Reconciliation Logic
The controller's reconciliation loop is where the magic happens:
1// controllers/postgresql_controller.go
2
3package controllers
4
5import (
6    "context"
7    "fmt"
8    "time"
9
10    appsv1 "k8s.io/api/apps/v1"
11    corev1 "k8s.io/api/core/v1"
12    "k8s.io/apimachinery/pkg/api/errors"
13    "k8s.io/apimachinery/pkg/api/resource"
14    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15    "k8s.io/apimachinery/pkg/runtime"
16    "k8s.io/apimachinery/pkg/types"
17    ctrl "sigs.k8s.io/controller-runtime"
18    "sigs.k8s.io/controller-runtime/pkg/client"
19    "sigs.k8s.io/controller-runtime/pkg/log"
20
21    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
22)
20
21// PostgreSQLReconciler reconciles a PostgreSQL object
22type PostgreSQLReconciler struct {
23 client.Client
24 Scheme *runtime.Scheme
25}
26
27// Reconcile implements the core reconciliation logic
28func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
29 logger := log.FromContext(ctx)
30
31 // 1. Fetch the PostgreSQL instance
32 postgres := &databasev1alpha1.PostgreSQL{}
33 err := r.Get(ctx, req.NamespacedName, postgres)
34 if err != nil {
35 if errors.IsNotFound(err) {
36 // Object was deleted, nothing to do
37 return ctrl.Result{}, nil
38 }
39 logger.Error(err, "unable to fetch PostgreSQL")
40 return ctrl.Result{}, err
41 }
42
43 // 2. Check if the StatefulSet already exists, if not create a new one
44 found := &appsv1.StatefulSet{}
45 err = r.Get(ctx, types.NamespacedName{Name: postgres.Name, Namespace: postgres.Namespace}, found)
46 if err != nil && errors.IsNotFound(err) {
47 // Define a new StatefulSet
48        dep := statefulSetForPostgreSQL(postgres)
49 logger.Info("Creating a new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
50 err = r.Create(ctx, dep)
51 if err != nil {
52 logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", dep.Namespace, "StatefulSet.Name", dep.Name)
53 return ctrl.Result{}, err
54 }
55 // StatefulSet created successfully - return and requeue
56 return ctrl.Result{Requeue: true}, nil
57 } else if err != nil {
58 logger.Error(err, "Failed to get StatefulSet")
59 return ctrl.Result{}, err
60 }
61
62 // 3. Ensure the StatefulSet size is the same as the spec
63 size := postgres.Spec.Replicas
64 if *found.Spec.Replicas != size {
65 found.Spec.Replicas = &size
66 err = r.Update(ctx, found)
67 if err != nil {
68 logger.Error(err, "Failed to update StatefulSet", "StatefulSet.Namespace", found.Namespace, "StatefulSet.Name", found.Name)
69 return ctrl.Result{}, err
70 }
71 // Spec updated - return and requeue
72 return ctrl.Result{Requeue: true}, nil
73 }
74
75 // 4. Update the PostgreSQL status with the pod names
76 podList := &corev1.PodList{}
77 listOpts := []client.ListOption{
78 client.InNamespace(postgres.Namespace),
79 client.MatchingLabels(labelsForPostgreSQL(postgres.Name)),
80 }
81 if err = r.List(ctx, podList, listOpts); err != nil {
82 logger.Error(err, "Failed to list pods", "PostgreSQL.Namespace", postgres.Namespace, "PostgreSQL.Name", postgres.Name)
83 return ctrl.Result{}, err
84 }
85
86 podNames := getPodNames(podList.Items)
87
88    // 5. Update status if needed (simplified: counts the pods that currently exist)
89    if postgres.Status.ReadyReplicas != int32(len(podNames)) {
90        postgres.Status.ReadyReplicas = int32(len(podNames))
91        err := r.Status().Update(ctx, postgres)
92        if err != nil {
93            logger.Error(err, "Failed to update PostgreSQL status")
94            return ctrl.Result{}, err
95        }
96    }
97
98 return ctrl.Result{RequeueAfter: time.Minute * 5}, nil
99}
100
101// statefulSetForPostgreSQL creates a StatefulSet for PostgreSQL
102func statefulSetForPostgreSQL(p *databasev1alpha1.PostgreSQL) *appsv1.StatefulSet {
103 labels := labelsForPostgreSQL(p.Name)
104
105 // Default resource requirements if not specified
106 resources := p.Spec.Resources
107 if resources.Requests == nil {
108 resources = corev1.ResourceRequirements{
109 Requests: corev1.ResourceList{
110 corev1.ResourceCPU: resource.MustParse("500m"),
111 corev1.ResourceMemory: resource.MustParse("1Gi"),
112 },
113 }
114 }
115
116 // Storage class for PVCs
117 storageClass := p.Spec.StorageClassName
118 if storageClass == "" {
119 storageClass = "standard" // Default storage class
120 }
121
122 replicas := p.Spec.Replicas
123
124 return &appsv1.StatefulSet{
125 ObjectMeta: metav1.ObjectMeta{
126 Name: p.Name,
127 Namespace: p.Namespace,
128 },
129 Spec: appsv1.StatefulSetSpec{
130 Replicas: &replicas,
131 Selector: &metav1.LabelSelector{
132 MatchLabels: labels,
133 },
134 Template: corev1.PodTemplateSpec{
135 ObjectMeta: metav1.ObjectMeta{
136 Labels: labels,
137 },
138 Spec: corev1.PodSpec{
139 Containers: []corev1.Container{{
140 Image: fmt.Sprintf("postgres:%s", p.Spec.Version),
141 Name: "postgresql",
142 Ports: []corev1.ContainerPort{{
143 ContainerPort: 5432,
144 Name: "postgresql",
145 }},
146 Env: []corev1.EnvVar{
147 {
148 Name: "POSTGRES_DB",
149 Value: "postgresdb",
150 },
151 {
152 Name: "POSTGRES_USER",
153 Value: "postgresadmin",
154 },
155 {
156 Name: "POSTGRES_PASSWORD",
157                            Value: "admin123", // NOTE: inject this from a Secret (valueFrom.secretKeyRef) in real deployments
158 },
159 },
160 Resources: resources,
161 VolumeMounts: []corev1.VolumeMount{
162 {
163 Name: "postgres-storage",
164 MountPath: "/var/lib/postgresql/data",
165 },
166 },
167 ReadinessProbe: &corev1.Probe{
168 ProbeHandler: corev1.ProbeHandler{
169 Exec: &corev1.ExecAction{
170 Command: []string{"pg_isready"},
171 },
172 },
173 InitialDelaySeconds: 5,
174 PeriodSeconds: 10,
175 },
176 LivenessProbe: &corev1.Probe{
177 ProbeHandler: corev1.ProbeHandler{
178 Exec: &corev1.ExecAction{
179 Command: []string{"pg_isready"},
180 },
181 },
182 InitialDelaySeconds: 30,
183 PeriodSeconds: 30,
184 },
185 }},
186 },
187 },
188 VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
189 ObjectMeta: metav1.ObjectMeta{
190 Name: "postgres-storage",
191 },
192 Spec: corev1.PersistentVolumeClaimSpec{
193 AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
194 StorageClassName: &storageClass,
195 Resources: corev1.ResourceRequirements{
196 Requests: corev1.ResourceList{
197 corev1.ResourceStorage: resource.MustParse("10Gi"),
198 },
199 },
200 },
201 }},
202 ServiceName: p.Name,
203 },
204 }
205}
206
207// labelsForPostgreSQL returns the labels for selecting the resources
208func labelsForPostgreSQL(name string) map[string]string {
209 return map[string]string{"app": "postgresql", "postgresql_cr": name}
210}
211
212// getPodNames returns the pod names of the array of pods passed in
213func getPodNames(pods []corev1.Pod) []string {
214 var podNames []string
215 for _, pod := range pods {
216 podNames = append(podNames, pod.Name)
217 }
218 return podNames
219}
Common Patterns and Pitfalls
Pattern 1: Status Management
Always maintain accurate status information - it's critical for debugging:
1// Update status conditions appropriately
2func (r *PostgreSQLReconciler) updateStatus(ctx context.Context, postgres *databasev1alpha1.PostgreSQL, conditionType, status, reason, message string) error {
3 // Find or create the condition
4 var condition *metav1.Condition
5 for i := range postgres.Status.Conditions {
6 if postgres.Status.Conditions[i].Type == conditionType {
7 condition = &postgres.Status.Conditions[i]
8 break
9 }
10 }
11
12    if condition == nil {
13        // New condition: append it, then point at the stored copy so the field updates below persist
14        postgres.Status.Conditions = append(postgres.Status.Conditions, metav1.Condition{
15            Type:   conditionType,
16            Status: metav1.ConditionStatus(status),
17        })
18        condition = &postgres.Status.Conditions[len(postgres.Status.Conditions)-1]
19    } else {
20        // Update existing condition
21        condition.Status = metav1.ConditionStatus(status)
22    }
23
24 condition.Reason = reason
25 condition.Message = message
26 condition.LastTransitionTime = metav1.Now()
27
28 return r.Status().Update(ctx, postgres)
29}
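apimachinery already ships a helper that does this bookkeeping, including only bumping LastTransitionTime when the status actually changes. A sketch of the same pattern using meta.SetStatusCondition (import k8s.io/apimachinery/pkg/api/meta):

// setCondition upserts a condition via the apimachinery helper and persists the status subresource.
func (r *PostgreSQLReconciler) setCondition(ctx context.Context, postgres *databasev1alpha1.PostgreSQL,
	condType, reason, message string, status metav1.ConditionStatus) error {

	meta.SetStatusCondition(&postgres.Status.Conditions, metav1.Condition{
		Type:    condType,
		Status:  status,
		Reason:  reason,
		Message: message,
	})
	return r.Status().Update(ctx, postgres)
}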
Pattern 2: Event Recording
Record events for significant actions - they appear in kubectl describe:
1// Record events for visibility
2r.Recorder.Eventf(postgres, corev1.EventTypeNormal, "Created",
3 "Created StatefulSet %s", statefulSet.Name)
4
5r.Recorder.Eventf(postgres, corev1.EventTypeWarning, "FailedCreate",
6 "Failed to create StatefulSet: %v", err)
Pattern 3: Owner References
Ensure proper cleanup when the CR is deleted:
1// Set owner reference for garbage collection
2controllerutil.SetControllerReference(postgres, statefulSet, r.Scheme)
Common Pitfalls
1. Infinite Reconciliation Loops
1// BAD: Always requeue without checking if changes are needed
2return ctrl.Result{Requeue: true}, nil
3
4// GOOD: Only requeue if there are pending changes
5if needsReconciliation {
6 return ctrl.Result{RequeueAfter: time.Minute}, nil
7}
8return ctrl.Result{}, nil
2. Missing Error Handling
1// BAD: Ignoring errors from Get operations
2r.Get(ctx, req.NamespacedName, postgres) // Ignored error
3
4// GOOD: Always handle errors properly
5err := r.Get(ctx, req.NamespacedName, postgres)
6if err != nil {
7 if errors.IsNotFound(err) {
8 return ctrl.Result{}, nil // Object deleted, nothing to do
9 }
10 return ctrl.Result{}, err // Real error, requeue
11}
3. Inefficient Reconciliation
1// BAD: Making many API calls in reconciliation loop
2pod1 := r.getPod(...)
3pod2 := r.getPod(...)
4pod3 := r.getPod(...)
5
6// GOOD: Use List with selectors to fetch multiple resources
7podList := &corev1.PodList{}
8r.List(ctx, podList, client.InNamespace(ns), client.MatchingLabels(labels))
Integration and Mastery
Testing Your Operator
Unit Testing with Envtest:
1// controllers/postgresql_controller_test.go
2
3package controllers
4
5import (
6    "context"
7    "testing"
8    "time"
9
10    . "github.com/onsi/ginkgo/v2"
11    . "github.com/onsi/gomega"
12    appsv1 "k8s.io/api/apps/v1"
13    corev1 "k8s.io/api/core/v1"
14    "k8s.io/apimachinery/pkg/api/resource"
15    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16    "k8s.io/apimachinery/pkg/types"
17
18    databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
19)
18
19var _ = Describe("PostgreSQL Controller", func() {
20 Context("When creating PostgreSQL resource", func() {
21 It("Should create StatefulSet with correct spec", func() {
22 ctx := context.Background()
23
24 // Create a PostgreSQL resource
25 postgres := &databasev1alpha1.PostgreSQL{
26 ObjectMeta: metav1.ObjectMeta{
27 Name: "test-postgres",
28 Namespace: "default",
29 },
30 Spec: databasev1alpha1.PostgreSQLSpec{
31 Replicas: 3,
32 Version: "15.4",
33 BackupSchedule: "0 2 * * *",
34 Resources: corev1.ResourceRequirements{
35 Requests: corev1.ResourceList{
36 corev1.ResourceCPU: resource.MustParse("1"),
37 corev1.ResourceMemory: resource.MustParse("2Gi"),
38 },
39 },
40 },
41 }
42
43 Expect(k8sClient.Create(ctx, postgres)).To(Succeed())
44
45 // Wait for StatefulSet to be created
46 statefulSet := &appsv1.StatefulSet{}
47 Eventually(func() error {
48 return k8sClient.Get(ctx, types.NamespacedName{
49                    Name: postgres.Name, Namespace: postgres.Namespace,
50 }, statefulSet)
51 }, time.Second*10, time.Millisecond*250).Should(Succeed())
52
53 Expect(*statefulSet.Spec.Replicas).To(Equal(int32(3)))
54 Expect(statefulSet.Spec.Template.Spec.Containers[0].Image).To(Equal("postgres:15.4"))
55 })
56 })
57})
58
59func TestPostgreSQL(t *testing.T) {
60 RegisterFailHandler(Fail)
61 RunSpecs(t, "Controller Suite")
62}
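envtest needs a test environment that starts a local API server and etcd; kubebuilder generates this bootstrap in suite_test.go. An abridged sketch (the CRD path and scheme wiring follow the usual scaffold and are assumptions; a full suite would also start the manager and reconciler so the Eventually assertion above can observe the StatefulSet):

// controllers/suite_test.go (abridged sketch)

package controllers

import (
	"path/filepath"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/envtest"

	databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
)

var (
	k8sClient client.Client
	testEnv   *envtest.Environment
)

var _ = BeforeSuite(func() {
	// Point envtest at the generated CRDs so the PostgreSQL kind is served
	testEnv = &envtest.Environment{
		CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
	}

	cfg, err := testEnv.Start() // starts a local kube-apiserver and etcd
	Expect(err).NotTo(HaveOccurred())

	// Register our API types with the client scheme
	Expect(databasev1alpha1.AddToScheme(scheme.Scheme)).To(Succeed())

	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
	Expect(err).NotTo(HaveOccurred())
})

var _ = AfterSuite(func() {
	Expect(testEnv.Stop()).To(Succeed())
})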
Production Deployment Patterns
1. Leader Election for High Availability:
1// In main.go
2func main() {
3 // ...
4 mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
5 Scheme: scheme,
6 MetricsBindAddress: metricsAddr,
7 Port: 9443,
8 LeaderElection: true, // Enable leader election
9 LeaderElectionID: "database-operator.mycompany.com",
10 })
11 // ...
12}
2. Observability Integration:
1// Add metrics to your controller
2var (
3 // Reconciliation metrics
4 reconcileCounter = prometheus.NewCounterVec(
5 prometheus.CounterOpts{
6 Name: "postgresql_reconcile_total",
7 Help: "Total number of PostgreSQL reconciliations",
8 },
9 []string{"namespace", "name", "success"},
10 )
11
12 reconcileDuration = prometheus.NewHistogramVec(
13 prometheus.HistogramOpts{
14 Name: "postgresql_reconcile_duration_seconds",
15 Help: "Duration of PostgreSQL reconciliations",
16 },
17 []string{"namespace", "name"},
18 )
19)
20
21// In your Reconcile function
22start := time.Now()
23defer func() {
24 duration := time.Since(start).Seconds()
25 reconcileDuration.WithLabelValues(req.Namespace, req.Name).Observe(duration)
26}()
3. Graceful Shutdown:
1// Handle shutdown signals gracefully
2ctx := ctrl.SetupSignalHandler()
3
4if err := mgr.Start(ctx); err != nil {
5 setupLog.Error(err, "problem running manager")
6 os.Exit(1)
7}
Production-Ready Operator Development
Comprehensive Monitoring and Metrics
Production operators need deep observability for debugging and performance tracking:
1// pkg/metrics/metrics.go - Operator metrics implementation
2
3package metrics
4
5import (
6 "github.com/prometheus/client_golang/prometheus"
7 "sigs.k8s.io/controller-runtime/pkg/metrics"
8)
9
10var (
11 // ReconciliationTotal tracks total reconciliation attempts
12 ReconciliationTotal = prometheus.NewCounterVec(
13 prometheus.CounterOpts{
14 Name: "operator_reconciliation_total",
15 Help: "Total number of reconciliations per resource",
16 },
17 []string{"namespace", "name", "result"},
18 )
19
20 // ReconciliationDuration tracks reconciliation latency
21 ReconciliationDuration = prometheus.NewHistogramVec(
22 prometheus.HistogramOpts{
23 Name: "operator_reconciliation_duration_seconds",
24 Help: "Time spent in reconciliation loop",
25 Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0},
26 },
27 []string{"namespace", "name"},
28 )
29
30 // ResourcesManaged tracks currently managed resources
31 ResourcesManaged = prometheus.NewGaugeVec(
32 prometheus.GaugeOpts{
33 Name: "operator_resources_managed",
34 Help: "Number of resources currently managed",
35 },
36 []string{"namespace", "status"},
37 )
38
39 // APICallsTotal tracks Kubernetes API calls
40 APICallsTotal = prometheus.NewCounterVec(
41 prometheus.CounterOpts{
42 Name: "operator_api_calls_total",
43 Help: "Total number of Kubernetes API calls",
44 },
45 []string{"method", "resource", "result"},
46 )
47
48 // QueueDepth tracks workqueue depth
49 QueueDepth = prometheus.NewGaugeVec(
50 prometheus.GaugeOpts{
51 Name: "operator_queue_depth",
52 Help: "Current depth of the workqueue",
53 },
54 []string{"name"},
55 )
56)
57
58// init registers metrics with controller-runtime
59func init() {
60 metrics.Registry.MustRegister(
61 ReconciliationTotal,
62 ReconciliationDuration,
63 ResourcesManaged,
64 APICallsTotal,
65 QueueDepth,
66 )
67}
68
69// RecordReconciliation records a reconciliation attempt
70func RecordReconciliation(namespace, name, result string, duration float64) {
71 ReconciliationTotal.WithLabelValues(namespace, name, result).Inc()
72 ReconciliationDuration.WithLabelValues(namespace, name).Observe(duration)
73}
74
75// UpdateResourceCount updates the managed resource count
76func UpdateResourceCount(namespace, status string, count float64) {
77 ResourcesManaged.WithLabelValues(namespace, status).Set(count)
78}
79
80// RecordAPICall records a Kubernetes API call
81func RecordAPICall(method, resource, result string) {
82 APICallsTotal.WithLabelValues(method, resource, result).Inc()
83}
84
85// UpdateQueueDepth updates workqueue depth metric
86func UpdateQueueDepth(queueName string, depth int) {
87 QueueDepth.WithLabelValues(queueName).Set(float64(depth))
88}
Using metrics in your reconciler:
1// controllers/postgresql_controller.go - With metrics
2
3package controllers
4
5import (
6 "context"
7 "time"
8
9 ctrl "sigs.k8s.io/controller-runtime"
10
11 "mycompany.com/database-operator/pkg/metrics"
12)
13
14func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) { // named returns let the deferred metrics closure observe err
15 startTime := time.Now()
16 logger := log.FromContext(ctx)
17
18 // Track reconciliation
19 defer func() {
20 duration := time.Since(startTime).Seconds()
21 result := "success"
22 if err != nil {
23 result = "error"
24 }
25 metrics.RecordReconciliation(req.Namespace, req.Name, result, duration)
26 }()
27
28 // Your reconciliation logic here
29 logger.Info("Reconciling PostgreSQL", "namespace", req.Namespace, "name", req.Name)
30
31 // Track API calls
32 metrics.RecordAPICall("GET", "PostgreSQL", "success")
33
34 // Update resource counts
35 metrics.UpdateResourceCount(req.Namespace, "ready", 1)
36
37 return ctrl.Result{}, nil
38}
Advanced Error Handling and Recovery
Robust operators need sophisticated error handling:
1// pkg/errors/errors.go - Operator error handling
2
3package errors
4
5import (
6 "fmt"
7 "time"
8
9 ctrl "sigs.k8s.io/controller-runtime"
10)
11
12// OperatorError represents an error with retry behavior
13type OperatorError struct {
14 Err error
15 Retriable bool
16 RetryAfter time.Duration
17 Reason string
18}
19
20// Error implements the error interface
21func (e *OperatorError) Error() string {
22 return fmt.Sprintf("%s: %v (retriable: %v)", e.Reason, e.Err, e.Retriable)
23}
24
25// NewRetriableError creates an error that should be retried
26func NewRetriableError(err error, retryAfter time.Duration, reason string) *OperatorError {
27 return &OperatorError{
28 Err: err,
29 Retriable: true,
30 RetryAfter: retryAfter,
31 Reason: reason,
32 }
33}
34
35// NewPermanentError creates an error that should not be retried
36func NewPermanentError(err error, reason string) *OperatorError {
37 return &OperatorError{
38 Err: err,
39 Retriable: false,
40 Reason: reason,
41 }
42}
43
44// HandleReconcileError converts operator errors to reconcile results
45func HandleReconcileError(err error) (ctrl.Result, error) {
46 if err == nil {
47 return ctrl.Result{}, nil
48 }
49
50 // Check if it's an OperatorError
51 if opErr, ok := err.(*OperatorError); ok {
52 if opErr.Retriable {
53 // Retriable error - requeue after delay
54 return ctrl.Result{
55 Requeue: true,
56 RequeueAfter: opErr.RetryAfter,
57 }, nil
58 }
59 // Permanent error - don't requeue
60 return ctrl.Result{}, nil
61 }
62
63 // Unknown error - requeue with exponential backoff
64 return ctrl.Result{}, err
65}
66
67// Example error scenarios
68var (
69 ErrResourceNotReady = NewRetriableError(
70 fmt.Errorf("resource not ready"),
71 30*time.Second,
72 "WaitingForDependencies",
73 )
74
75 ErrInvalidConfiguration = NewPermanentError(
76 fmt.Errorf("invalid configuration"),
77 "InvalidSpec",
78 )
79
80 ErrAPIServerUnavailable = NewRetriableError(
81 fmt.Errorf("API server unavailable"),
82 10*time.Second,
83 "APIServerError",
84 )
85)
Using error handling in reconciliation:
1func (r *PostgreSQLReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
2 logger := log.FromContext(ctx)
3
4 // Fetch the PostgreSQL instance
5 postgres := &databasev1alpha1.PostgreSQL{}
6    if err := r.Get(ctx, req.NamespacedName, postgres); err != nil {
7        if apierrors.IsNotFound(err) { // apierrors = k8s.io/apimachinery/pkg/api/errors; plain "errors" is our helper package above
8 return ctrl.Result{}, nil
9 }
10 return errors.HandleReconcileError(
11 errors.NewRetriableError(err, 5*time.Second, "FailedToFetch"),
12 )
13 }
14
15 // Validate configuration
16 if err := r.validateConfiguration(postgres); err != nil {
17 logger.Error(err, "Invalid configuration")
18 return errors.HandleReconcileError(
19 errors.NewPermanentError(err, "InvalidConfiguration"),
20 )
21 }
22
23 // Check dependencies
24 if err := r.checkDependencies(ctx, postgres); err != nil {
25 logger.Info("Dependencies not ready, will retry")
26 return errors.HandleReconcileError(
27 errors.NewRetriableError(err, 30*time.Second, "DependenciesNotReady"),
28 )
29 }
30
31 // Continue with reconciliation...
32 return ctrl.Result{}, nil
33}
Multi-Tenant Operator Design
Building operators that safely handle multiple tenants:
1// pkg/multitenancy/isolation.go - Multi-tenant isolation
2
3package multitenancy
4
5import (
6    "context"
7    "fmt"
8
9    corev1 "k8s.io/api/core/v1"
10    rbacv1 "k8s.io/api/rbac/v1"
11    "k8s.io/apimachinery/pkg/api/resource"
12    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13    "sigs.k8s.io/controller-runtime/pkg/client"
14)
14
15// TenantManager handles multi-tenant resource isolation
16type TenantManager struct {
17 client.Client
18}
19
20// EnsureTenantIsolation ensures proper RBAC and namespace isolation
21func (tm *TenantManager) EnsureTenantIsolation(ctx context.Context, tenantName string) error {
22 // Create dedicated namespace for tenant
23 namespace := &corev1.Namespace{
24 ObjectMeta: metav1.ObjectMeta{
25 Name: fmt.Sprintf("tenant-%s", tenantName),
26 Labels: map[string]string{
27 "tenant": tenantName,
28 "managed-by": "database-operator",
29 },
30 },
31 }
32
33 if err := tm.Create(ctx, namespace); err != nil {
34 return fmt.Errorf("failed to create namespace: %w", err)
35 }
36
37 // Create ServiceAccount for tenant
38 sa := &corev1.ServiceAccount{
39 ObjectMeta: metav1.ObjectMeta{
40 Name: fmt.Sprintf("%s-operator", tenantName),
41 Namespace: namespace.Name,
42 },
43 }
44
45 if err := tm.Create(ctx, sa); err != nil {
46 return fmt.Errorf("failed to create service account: %w", err)
47 }
48
49 // Create Role for tenant-specific permissions
50 role := &rbacv1.Role{
51 ObjectMeta: metav1.ObjectMeta{
52 Name: fmt.Sprintf("%s-operator-role", tenantName),
53 Namespace: namespace.Name,
54 },
55 Rules: []rbacv1.PolicyRule{
56 {
57 APIGroups: []string{"database.mycompany.com"},
58 Resources: []string{"postgresqls"},
59 Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete"},
60 },
61 {
62 APIGroups: []string{""},
63 Resources: []string{"pods", "services", "configmaps", "secrets"},
64 Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete"},
65 },
66 },
67 }
68
69 if err := tm.Create(ctx, role); err != nil {
70 return fmt.Errorf("failed to create role: %w", err)
71 }
72
73 // Create RoleBinding
74 roleBinding := &rbacv1.RoleBinding{
75 ObjectMeta: metav1.ObjectMeta{
76 Name: fmt.Sprintf("%s-operator-binding", tenantName),
77 Namespace: namespace.Name,
78 },
79 RoleRef: rbacv1.RoleRef{
80 APIGroup: "rbac.authorization.k8s.io",
81 Kind: "Role",
82 Name: role.Name,
83 },
84 Subjects: []rbacv1.Subject{
85 {
86 Kind: "ServiceAccount",
87 Name: sa.Name,
88 Namespace: namespace.Name,
89 },
90 },
91 }
92
93 if err := tm.Create(ctx, roleBinding); err != nil {
94 return fmt.Errorf("failed to create role binding: %w", err)
95 }
96
97 return nil
98}
99
100// EnforceResourceQuotas sets resource limits for tenants
101func (tm *TenantManager) EnforceResourceQuotas(ctx context.Context, tenantName string, limits map[string]string) error {
102 namespace := fmt.Sprintf("tenant-%s", tenantName)
103
104 quota := &corev1.ResourceQuota{
105 ObjectMeta: metav1.ObjectMeta{
106 Name: fmt.Sprintf("%s-quota", tenantName),
107 Namespace: namespace,
108 },
109 Spec: corev1.ResourceQuotaSpec{
110 Hard: corev1.ResourceList{},
111 },
112 }
113
114 // Apply tenant-specific limits
115    for name, limit := range limits {
116        quota.Spec.Hard[corev1.ResourceName(name)] = resource.MustParse(limit)
117 }
118
119 if err := tm.Create(ctx, quota); err != nil {
120 return fmt.Errorf("failed to create resource quota: %w", err)
121 }
122
123 return nil
124}
Disaster Recovery and Backup Integration
Operators should handle backup and recovery automatically:
1// pkg/backup/manager.go - Backup and recovery management
2
3package backup
4
5import (
6 "context"
7 "fmt"
8 "time"
9
10 batchv1 "k8s.io/api/batch/v1"
11 corev1 "k8s.io/api/core/v1"
12 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13 "sigs.k8s.io/controller-runtime/pkg/client"
14
15 databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
16)
17
18// BackupManager handles database backups
19type BackupManager struct {
20 client.Client
21}
22
23// CreateBackupJob creates a Kubernetes Job for database backup (assumes the PostgreSQL spec also carries a BackupSpec with S3 bucket/region fields, not shown in the type defined earlier)
24func (bm *BackupManager) CreateBackupJob(
25 ctx context.Context,
26 postgres *databasev1alpha1.PostgreSQL,
27) error {
28 backupName := fmt.Sprintf("%s-backup-%d", postgres.Name, time.Now().Unix())
29
30 job := &batchv1.Job{
31 ObjectMeta: metav1.ObjectMeta{
32 Name: backupName,
33 Namespace: postgres.Namespace,
34 Labels: map[string]string{
35 "app": "postgresql-backup",
36 "database": postgres.Name,
37 "managed-by": "database-operator",
38 },
39 },
40 Spec: batchv1.JobSpec{
41 Template: corev1.PodTemplateSpec{
42 Spec: corev1.PodSpec{
43 RestartPolicy: corev1.RestartPolicyNever,
44 Containers: []corev1.Container{
45 {
46 Name: "backup",
47 Image: "postgres:15.4",
48 Command: []string{
49 "/bin/bash",
50 "-c",
51 fmt.Sprintf(`
52 pg_dump -h %s-primary -U postgres -d postgresdb > /backup/dump.sql
53 gzip /backup/dump.sql
54 aws s3 cp /backup/dump.sql.gz s3://%s/%s/backup-$(date +%%Y%%m%%d-%%H%%M%%S).sql.gz
55 `, postgres.Name, postgres.Spec.BackupSpec.S3Bucket, postgres.Name),
56 },
57 Env: []corev1.EnvVar{
58 {
59 Name: "PGPASSWORD",
60 ValueFrom: &corev1.EnvVarSource{
61 SecretKeyRef: &corev1.SecretKeySelector{
62 LocalObjectReference: corev1.LocalObjectReference{
63 Name: fmt.Sprintf("%s-credentials", postgres.Name),
64 },
65 Key: "password",
66 },
67 },
68 },
69 {
70 Name: "AWS_REGION",
71 Value: postgres.Spec.BackupSpec.S3Region,
72 },
73 },
74 VolumeMounts: []corev1.VolumeMount{
75 {
76 Name: "backup-volume",
77 MountPath: "/backup",
78 },
79 },
80 },
81 },
82 Volumes: []corev1.Volume{
83 {
84 Name: "backup-volume",
85 VolumeSource: corev1.VolumeSource{
86 EmptyDir: &corev1.EmptyDirVolumeSource{},
87 },
88 },
89 },
90 },
91 },
92 BackoffLimit: int32Ptr(3),
93 },
94 }
95
96 if err := bm.Create(ctx, job); err != nil {
97 return fmt.Errorf("failed to create backup job: %w", err)
98 }
99
100 // Update status with backup time
101 postgres.Status.LastBackupTime = &metav1.Time{Time: time.Now()}
102 if err := bm.Status().Update(ctx, postgres); err != nil {
103 return fmt.Errorf("failed to update status: %w", err)
104 }
105
106 return nil
107}
108
109// CreateBackupCronJob creates scheduled backups
110func (bm *BackupManager) CreateBackupCronJob(
111 ctx context.Context,
112 postgres *databasev1alpha1.PostgreSQL,
113) error {
114 cronJob := &batchv1.CronJob{
115 ObjectMeta: metav1.ObjectMeta{
116 Name: fmt.Sprintf("%s-backup-cronjob", postgres.Name),
117 Namespace: postgres.Namespace,
118 },
119 Spec: batchv1.CronJobSpec{
120 Schedule: postgres.Spec.BackupSchedule,
121 JobTemplate: batchv1.JobTemplateSpec{
122 Spec: batchv1.JobSpec{
123 Template: corev1.PodTemplateSpec{
124 Spec: corev1.PodSpec{
125 RestartPolicy: corev1.RestartPolicyNever,
126 Containers: []corev1.Container{
127 {
128 Name: "backup",
129 Image: "postgres:15.4",
130 Command: []string{"/bin/bash", "-c", "pg_dump ..."},
131 },
132 },
133 },
134 },
135 },
136 },
137 SuccessfulJobsHistoryLimit: int32Ptr(3),
138 FailedJobsHistoryLimit: int32Ptr(1),
139 },
140 }
141
142 if err := bm.Create(ctx, cronJob); err != nil {
143 return fmt.Errorf("failed to create backup cronjob: %w", err)
144 }
145
146 return nil
147}
148
149// RestoreFromBackup restores database from S3 backup
150func (bm *BackupManager) RestoreFromBackup(
151 ctx context.Context,
152 postgres *databasev1alpha1.PostgreSQL,
153 backupTimestamp string,
154) error {
155 restoreName := fmt.Sprintf("%s-restore-%d", postgres.Name, time.Now().Unix())
156
157 job := &batchv1.Job{
158 ObjectMeta: metav1.ObjectMeta{
159 Name: restoreName,
160 Namespace: postgres.Namespace,
161 },
162 Spec: batchv1.JobSpec{
163 Template: corev1.PodTemplateSpec{
164 Spec: corev1.PodSpec{
165 RestartPolicy: corev1.RestartPolicyNever,
166 Containers: []corev1.Container{
167 {
168 Name: "restore",
169 Image: "postgres:15.4",
170 Command: []string{
171 "/bin/bash",
172 "-c",
173 fmt.Sprintf(`
174 aws s3 cp s3://%s/%s/backup-%s.sql.gz /tmp/backup.sql.gz
175 gunzip /tmp/backup.sql.gz
176 psql -h %s-primary -U postgres -d postgresdb < /tmp/backup.sql
177 `, postgres.Spec.BackupSpec.S3Bucket, postgres.Name, backupTimestamp, postgres.Name),
178 },
179 },
180 },
181 },
182 },
183 },
184 }
185
186 if err := bm.Create(ctx, job); err != nil {
187 return fmt.Errorf("failed to create restore job: %w", err)
188 }
189
190 return nil
191}
192
193func int32Ptr(i int32) *int32 {
194 return &i
195}
Practice Exercises
Exercise 1: Build a ConfigMap Operator
Objective: Create an operator that manages ConfigMaps with automatic reload of applications when configuration changes.
Requirements:
- Define a ConfigMapWatcher custom resource
- Implement reconciliation logic that watches ConfigMap changes
- Trigger rolling restart of pods when configuration changes
- Add status tracking for reload operations
- Include metrics for configuration reloads
Implementation:
1// api/v1alpha1/configmapwatcher_types.go
2package v1alpha1
3
4import (
5 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
6)
7
8// ConfigMapWatcherSpec defines the desired state
9type ConfigMapWatcherSpec struct {
10 // Source ConfigMap to watch
11 // +kubebuilder:validation:Required
12 SourceConfigMap string `json:"sourceConfigMap"`
13
14 // Target deployments to reload
15 // +kubebuilder:validation:Required
16 // +kubebuilder:validation:MinItems=1
17 TargetDeployments []string `json:"targetDeployments"`
18
19 // Reload strategy
20 // +kubebuilder:validation:Enum=rolling;recreate
21 // +kubebuilder:default=rolling
22 ReloadStrategy string `json:"reloadStrategy,omitempty"`
23
24 // Check interval in seconds
25 // +kubebuilder:validation:Minimum=10
26 // +kubebuilder:default=60
27 CheckInterval int32 `json:"checkInterval,omitempty"`
28}
29
30// ConfigMapWatcherStatus defines the observed state
31type ConfigMapWatcherStatus struct {
32 // Last observed ConfigMap version
33 LastConfigVersion string `json:"lastConfigVersion,omitempty"`
34
35 // Last reload timestamp
36 LastReloadTime *metav1.Time `json:"lastReloadTime,omitempty"`
37
38 // Number of successful reloads
39 ReloadCount int32 `json:"reloadCount"`
40
41 // Status conditions
42 Conditions []metav1.Condition `json:"conditions,omitempty"`
43}
44
45//+kubebuilder:object:root=true
46//+kubebuilder:subresource:status
47//+kubebuilder:printcolumn:name="Source",type="string",JSONPath=".spec.sourceConfigMap"
48//+kubebuilder:printcolumn:name="Reloads",type="integer",JSONPath=".status.reloadCount"
49//+kubebuilder:printcolumn:name="Last Reload",type="date",JSONPath=".status.lastReloadTime"
50
51type ConfigMapWatcher struct {
52 metav1.TypeMeta `json:",inline"`
53 metav1.ObjectMeta `json:"metadata,omitempty"`
54
55 Spec ConfigMapWatcherSpec `json:"spec,omitempty"`
56 Status ConfigMapWatcherStatus `json:"status,omitempty"`
57}
58
59//+kubebuilder:object:root=true
60
61type ConfigMapWatcherList struct {
62 metav1.TypeMeta `json:",inline"`
63 metav1.ListMeta `json:"metadata,omitempty"`
64 Items []ConfigMapWatcher `json:"items"`
65}
Reconciliation Controller:
1// controllers/configmapwatcher_controller.go
2package controllers
3
4import (
5 "context"
6 "crypto/sha256"
7 "encoding/hex"
8 "fmt"
9 "sort"
10 "time"
11 appsv1 "k8s.io/api/apps/v1"
12 corev1 "k8s.io/api/core/v1"
13 "k8s.io/apimachinery/pkg/api/errors"
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/apimachinery/pkg/runtime"
16 "k8s.io/apimachinery/pkg/types"
17 ctrl "sigs.k8s.io/controller-runtime"
18 "sigs.k8s.io/controller-runtime/pkg/client"
19 "sigs.k8s.io/controller-runtime/pkg/log"
20 configv1alpha1 "mycompany.com/config-operator/api/v1alpha1"
21)
22
23type ConfigMapWatcherReconciler struct {
24 client.Client
25 Scheme *runtime.Scheme
26}
27
28func (r *ConfigMapWatcherReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
29 logger := log.FromContext(ctx)
30
31 // Fetch the ConfigMapWatcher
32 watcher := &configv1alpha1.ConfigMapWatcher{}
33 if err := r.Get(ctx, req.NamespacedName, watcher); err != nil {
34 if errors.IsNotFound(err) {
35 return ctrl.Result{}, nil
36 }
37 return ctrl.Result{}, err
38 }
39
40 // Fetch the source ConfigMap
41 configMap := &corev1.ConfigMap{}
42 err := r.Get(ctx, types.NamespacedName{
43 Name: watcher.Spec.SourceConfigMap,
44 Namespace: watcher.Namespace,
45 }, configMap)
46 if err != nil {
47 logger.Error(err, "Failed to get source ConfigMap")
48 return ctrl.Result{RequeueAfter: time.Second * 30}, err
49 }
50
51 // Calculate current config hash
52 currentHash := r.calculateConfigHash(configMap)
53
54 // Check if configuration changed
55 if watcher.Status.LastConfigVersion != currentHash {
56 logger.Info("Configuration changed, triggering reload",
57 "oldHash", watcher.Status.LastConfigVersion,
58 "newHash", currentHash)
59
60 // Reload target deployments
61 if err := r.reloadDeployments(ctx, watcher); err != nil {
62 logger.Error(err, "Failed to reload deployments")
63 return ctrl.Result{RequeueAfter: time.Second * 30}, err
64 }
65
66 // Update status
67 watcher.Status.LastConfigVersion = currentHash
68 watcher.Status.LastReloadTime = &metav1.Time{Time: time.Now()}
69 watcher.Status.ReloadCount++
70
71 if err := r.Status().Update(ctx, watcher); err != nil {
72 return ctrl.Result{}, err
73 }
74
75 logger.Info("Reload completed successfully")
76 }
77
78 // Requeue after check interval
79 interval := time.Duration(watcher.Spec.CheckInterval) * time.Second
80 return ctrl.Result{RequeueAfter: interval}, nil
81}
82
83// calculateConfigHash computes a deterministic hash of the ConfigMap contents
84func (r *ConfigMapWatcherReconciler) calculateConfigHash(cm *corev1.ConfigMap) string {
85 keys := make([]string, 0, len(cm.Data)+len(cm.BinaryData))
86 for k := range cm.Data { keys = append(keys, k) }
87 for k := range cm.BinaryData { keys = append(keys, k) }
88 sort.Strings(keys) // map iteration order is random; sort so the hash is stable across reconciles
89 h := sha256.New()
90 for _, k := range keys {
91  h.Write(append([]byte(k+":"+cm.Data[k]), cm.BinaryData[k]...))
92 }
93 return hex.EncodeToString(h.Sum(nil))
94}
95
96// reloadDeployments triggers reload of target deployments
97func (r *ConfigMapWatcherReconciler) reloadDeployments(ctx context.Context, watcher *configv1alpha1.ConfigMapWatcher) error {
98 for _, depName := range watcher.Spec.TargetDeployments {
99 deployment := &appsv1.Deployment{}
100 err := r.Get(ctx, types.NamespacedName{
101 Name: depName,
102 Namespace: watcher.Namespace,
103 }, deployment)
104 if err != nil {
105 return fmt.Errorf("failed to get deployment %s: %w", depName, err)
106 }
107
108 // Trigger rolling restart by updating annotation
109 if deployment.Spec.Template.Annotations == nil {
110 deployment.Spec.Template.Annotations = make(map[string]string)
111 }
112 deployment.Spec.Template.Annotations["configmap-watcher/reloaded-at"] = time.Now().Format(time.RFC3339)
113
114 if err := r.Update(ctx, deployment); err != nil {
115 return fmt.Errorf("failed to update deployment %s: %w", depName, err)
116 }
117 }
118
119 return nil
120}
121
122func (r *ConfigMapWatcherReconciler) SetupWithManager(mgr ctrl.Manager) error {
123 return ctrl.NewControllerManagedBy(mgr).
124  For(&configv1alpha1.ConfigMapWatcher{}).
125  // Deployments are not owned by the watcher; changes are picked up via the periodic requeue (CheckInterval)
126  Complete(r)
127}
Testing: Write tests to verify config changes trigger reloads correctly.
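For example, here is a minimal test sketch using controller-runtime's fake client. It assumes the types and reconciler shown above, that the API package has the usual generated AddToScheme, and that WithStatusSubresource from recent controller-runtime releases is available; envtest remains the higher-fidelity option.

// controllers/configmapwatcher_controller_test.go (sketch)
package controllers

import (
	"context"
	"testing"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	configv1alpha1 "mycompany.com/config-operator/api/v1alpha1"
)

func TestConfigChangeTriggersReload(t *testing.T) {
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)
	_ = configv1alpha1.AddToScheme(scheme)

	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "app-config", Namespace: "default"},
		Data:       map[string]string{"feature": "enabled"},
	}
	dep := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "app", Namespace: "default"},
	}
	watcher := &configv1alpha1.ConfigMapWatcher{
		ObjectMeta: metav1.ObjectMeta{Name: "watcher", Namespace: "default"},
		Spec: configv1alpha1.ConfigMapWatcherSpec{
			SourceConfigMap:   "app-config",
			TargetDeployments: []string{"app"},
			CheckInterval:     60,
		},
	}

	c := fake.NewClientBuilder().
		WithScheme(scheme).
		WithObjects(cm, dep, watcher).
		WithStatusSubresource(watcher).
		Build()
	r := &ConfigMapWatcherReconciler{Client: c, Scheme: scheme}

	// First reconcile: the stored hash is empty, so a reload should be triggered.
	req := ctrl.Request{NamespacedName: types.NamespacedName{Name: "watcher", Namespace: "default"}}
	if _, err := r.Reconcile(context.Background(), req); err != nil {
		t.Fatalf("reconcile failed: %v", err)
	}

	// The target deployment should carry the reload annotation afterwards.
	updated := &appsv1.Deployment{}
	if err := c.Get(context.Background(), types.NamespacedName{Name: "app", Namespace: "default"}, updated); err != nil {
		t.Fatalf("get deployment: %v", err)
	}
	if updated.Spec.Template.Annotations["configmap-watcher/reloaded-at"] == "" {
		t.Error("expected reload annotation to be set after a config change")
	}
}

A useful follow-up assertion: reconcile a second time without changing the ConfigMap and verify the annotation and ReloadCount stay unchanged.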
Exercise 2: Implement Advanced Backup and Restore
Objective: Add comprehensive backup functionality to your PostgreSQL operator with S3 integration, point-in-time recovery, and backup verification.
Requirements:
- Automated scheduled backups with CronJob
- Point-in-time recovery capability
- Backup verification after creation
- Retention policy enforcement
- Multi-region backup support
- Backup/restore status tracking
Implementation:
1// api/v1alpha1/postgresql_types.go - Enhanced backup specification
2
3package v1alpha1
4
5import (
6 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7)
8
9// BackupSpec defines backup configuration
10type BackupSpec struct {
11 // Enable automated backups
12 // +kubebuilder:default=true
13 Enabled bool `json:"enabled"`
14
15 // Backup schedule in cron format
16 // +kubebuilder:validation:Required
17 Schedule string `json:"schedule"`
18
19 // S3 bucket for backups
20 // +kubebuilder:validation:Required
21 S3Bucket string `json:"s3Bucket"`
22
23 // S3 region
24 // +kubebuilder:validation:Required
25 S3Region string `json:"s3Region"`
26
27 // Retention in days
28 // +kubebuilder:validation:Minimum=1
29 // +kubebuilder:validation:Maximum=365
30 // +kubebuilder:default=30
31 RetentionDays int `json:"retentionDays"`
32
33 // Enable point-in-time recovery
34 // +kubebuilder:default=false
35 PITREnabled bool `json:"pitrEnabled,omitempty"`
36
37 // Backup verification enabled
38 // +kubebuilder:default=true
39 VerifyBackup bool `json:"verifyBackup"`
40
41 // Compression level (1-9)
42 // +kubebuilder:validation:Minimum=1
43 // +kubebuilder:validation:Maximum=9
44 // +kubebuilder:default=6
45 CompressionLevel int `json:"compressionLevel,omitempty"`
46
47 // Multi-region backup copies
48 ReplicationRegions []string `json:"replicationRegions,omitempty"`
49}
50
51// BackupStatus tracks backup operations
52type BackupStatus struct {
53 // Last successful backup
54 LastBackupTime *metav1.Time `json:"lastBackupTime,omitempty"`
55
56 // Last backup size in bytes
57 LastBackupSize int64 `json:"lastBackupSize,omitempty"`
58
59 // Total backups created
60 TotalBackups int32 `json:"totalBackups"`
61
62 // Failed backup attempts
63 FailedBackups int32 `json:"failedBackups"`
64
65 // Latest backup location
66 LatestBackupPath string `json:"latestBackupPath,omitempty"`
67
68 // PITR status
69 PITRStatus PITRStatus `json:"pitrStatus,omitempty"`
70}
71
72// PITRStatus tracks point-in-time recovery capability
73type PITRStatus struct {
74 // Enabled indicates if PITR is active
75 Enabled bool `json:"enabled"`
76
77 // Earliest recovery point
78 EarliestRecoveryPoint *metav1.Time `json:"earliestRecoveryPoint,omitempty"`
79
80 // Latest recovery point
81 LatestRecoveryPoint *metav1.Time `json:"latestRecoveryPoint,omitempty"`
82
83 // WAL archive location
84 WALArchiveLocation string `json:"walArchiveLocation,omitempty"`
85}
Backup Controller Implementation:
1// pkg/backup/controller.go - Advanced backup controller
2
3package backup
4
5import (
6 "context"
7 "fmt"
8 "time"
9
10 batchv1 "k8s.io/api/batch/v1"
11 corev1 "k8s.io/api/core/v1"
12 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13 "k8s.io/apimachinery/pkg/api/resource"
14 "sigs.k8s.io/controller-runtime/pkg/client"
15
16 databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
17)
18
19// BackupController manages database backups
20type BackupController struct {
21 client.Client
22}
23
24// ReconcileBackup ensures backup infrastructure is in place
25func (bc *BackupController) ReconcileBackup(ctx context.Context, postgres *databasev1alpha1.PostgreSQL) error {
26 if !postgres.Spec.BackupSpec.Enabled {
27 return nil
28 }
29
30 // Create backup CronJob
31 if err := bc.ensureBackupCronJob(ctx, postgres); err != nil {
32 return fmt.Errorf("failed to ensure backup cronjob: %w", err)
33 }
34
35 // Set up PITR if enabled
36 if postgres.Spec.BackupSpec.PITREnabled {
37 if err := bc.setupPITR(ctx, postgres); err != nil {
38 return fmt.Errorf("failed to setup PITR: %w", err)
39 }
40 }
41
42 // Create retention cleanup job
43 if err := bc.ensureRetentionCleanup(ctx, postgres); err != nil {
44 return fmt.Errorf("failed to ensure retention cleanup: %w", err)
45 }
46
47 return nil
48}
49
50// ensureBackupCronJob creates the backup CronJob; a production controller should also handle AlreadyExists on subsequent reconciles
51func (bc *BackupController) ensureBackupCronJob(ctx context.Context, postgres *databasev1alpha1.PostgreSQL) error {
52 cronJob := &batchv1.CronJob{
53 ObjectMeta: metav1.ObjectMeta{
54 Name: fmt.Sprintf("%s-backup", postgres.Name),
55 Namespace: postgres.Namespace,
56 Labels: map[string]string{
57 "app": "postgresql-backup",
58 "database": postgres.Name,
59 },
60 },
61 Spec: batchv1.CronJobSpec{
62            Schedule:                   postgres.Spec.BackupSpec.Schedule,
63 SuccessfulJobsHistoryLimit: int32Ptr(5),
64 FailedJobsHistoryLimit: int32Ptr(3),
65 ConcurrencyPolicy: batchv1.ForbidConcurrent,
66 JobTemplate: batchv1.JobTemplateSpec{
67 Spec: batchv1.JobSpec{
68 Template: corev1.PodTemplateSpec{
69 Spec: bc.createBackupPodSpec(postgres),
70 },
71 BackoffLimit: int32Ptr(3),
72 },
73 },
74 },
75 }
76
77 return bc.Create(ctx, cronJob)
78}
79
80// createBackupPodSpec generates the pod specification for backup jobs
81func (bc *BackupController) createBackupPodSpec(postgres *databasev1alpha1.PostgreSQL) corev1.PodSpec {
82 return corev1.PodSpec{
83 RestartPolicy: corev1.RestartPolicyNever,
84 Containers: []corev1.Container{
85 {
86 Name: "backup",
87                Image:   fmt.Sprintf("postgres:%s", postgres.Spec.Version), // in practice use an image that also bundles the AWS CLI
88 Command: []string{"/bin/bash", "-c"},
89 Args: []string{
90 fmt.Sprintf(`
91 set -e
92 BACKUP_NAME="backup-$(date +%%Y%%m%%d-%%H%%M%%S)"
93 BACKUP_PATH="/tmp/${BACKUP_NAME}.sql"
94
95 echo "Starting backup: ${BACKUP_NAME}"
96
97 # Create backup
98 pg_dump -h %s-primary -U postgres -d postgresdb > ${BACKUP_PATH}
99
100 # Compress backup
101                                    gzip -%d ${BACKUP_PATH}
102
103 # Calculate checksum
104 CHECKSUM=$(sha256sum ${BACKUP_PATH}.gz | awk '{print $1}')
105 echo "Checksum: ${CHECKSUM}"
106
107 # Upload to S3
108 aws s3 cp ${BACKUP_PATH}.gz s3://%s/%s/${BACKUP_NAME}.sql.gz \
109 --metadata checksum=${CHECKSUM}
110
111 # Verify upload
112 if [ "%t" = "true" ]; then
113 echo "Verifying backup..."
114 aws s3 cp s3://%s/%s/${BACKUP_NAME}.sql.gz /tmp/verify.sql.gz
115 VERIFY_CHECKSUM=$(sha256sum /tmp/verify.sql.gz | awk '{print $1}')
116
117 if [ "${CHECKSUM}" != "${VERIFY_CHECKSUM}" ]; then
118 echo "Backup verification failed!"
119 exit 1
120 fi
121 echo "Backup verified successfully"
122 fi
123
124 # Replicate to other regions
125 %s
126
127 echo "Backup completed: ${BACKUP_NAME}"
128 echo "Size: $(du -h ${BACKUP_PATH}.gz | awk '{print $1}')"
129 `,
130 postgres.Name,
131 postgres.Spec.BackupSpec.CompressionLevel,
132 postgres.Spec.BackupSpec.S3Bucket,
133 postgres.Name,
134 postgres.Spec.BackupSpec.VerifyBackup,
135 postgres.Spec.BackupSpec.S3Bucket,
136 postgres.Name,
137 bc.generateReplicationScript(postgres),
138 ),
139 },
140 Env: []corev1.EnvVar{
141 {
142 Name: "PGPASSWORD",
143 ValueFrom: &corev1.EnvVarSource{
144 SecretKeyRef: &corev1.SecretKeySelector{
145 LocalObjectReference: corev1.LocalObjectReference{
146 Name: fmt.Sprintf("%s-credentials", postgres.Name),
147 },
148 Key: "password",
149 },
150 },
151 },
152 {
153 Name: "AWS_REGION",
154 Value: postgres.Spec.BackupSpec.S3Region,
155 },
156 {
157 Name: "AWS_ACCESS_KEY_ID",
158 ValueFrom: &corev1.EnvVarSource{
159 SecretKeyRef: &corev1.SecretKeySelector{
160 LocalObjectReference: corev1.LocalObjectReference{
161 Name: "aws-credentials",
162 },
163 Key: "access-key-id",
164 },
165 },
166 },
167 {
168 Name: "AWS_SECRET_ACCESS_KEY",
169 ValueFrom: &corev1.EnvVarSource{
170 SecretKeyRef: &corev1.SecretKeySelector{
171 LocalObjectReference: corev1.LocalObjectReference{
172 Name: "aws-credentials",
173 },
174 Key: "secret-access-key",
175 },
176 },
177 },
178 },
179 Resources: corev1.ResourceRequirements{
180 Requests: corev1.ResourceList{
181 corev1.ResourceCPU: resource.MustParse("500m"),
182 corev1.ResourceMemory: resource.MustParse("1Gi"),
183 },
184 Limits: corev1.ResourceList{
185 corev1.ResourceCPU: resource.MustParse("2"),
186 corev1.ResourceMemory: resource.MustParse("4Gi"),
187 },
188 },
189 },
190 },
191 }
192}
193
194// generateReplicationScript creates script for multi-region replication
195func (bc *BackupController) generateReplicationScript(postgres *databasev1alpha1.PostgreSQL) string {
196 if len(postgres.Spec.BackupSpec.ReplicationRegions) == 0 {
197 return "echo 'No replication configured'"
198 }
199
200 script := ""
201 for _, region := range postgres.Spec.BackupSpec.ReplicationRegions {
202 script += fmt.Sprintf(`
203 echo "Replicating to region: %s"
204 aws s3 cp s3://%s/%s/${BACKUP_NAME}.sql.gz \
205 s3://%s-%s/%s/${BACKUP_NAME}.sql.gz \
206 --source-region %s --region %s
207 `, region,
208 postgres.Spec.BackupSpec.S3Bucket, postgres.Name,
209 postgres.Spec.BackupSpec.S3Bucket, region, postgres.Name,
210 postgres.Spec.BackupSpec.S3Region, region)
211 }
212 return script
213}
214
215// setupPITR configures point-in-time recovery
216func (bc *BackupController) setupPITR(ctx context.Context, postgres *databasev1alpha1.PostgreSQL) error {
217 // Create WAL archiving configuration
218 // This would configure PostgreSQL to archive WAL files to S3
219 // Implementation details depend on your PostgreSQL setup
220 return nil
221}
222
223// ensureRetentionCleanup creates job to clean up old backups
224func (bc *BackupController) ensureRetentionCleanup(ctx context.Context, postgres *databasev1alpha1.PostgreSQL) error {
225 cronJob := &batchv1.CronJob{
226 ObjectMeta: metav1.ObjectMeta{
227 Name: fmt.Sprintf("%s-backup-cleanup", postgres.Name),
228 Namespace: postgres.Namespace,
229 },
230 Spec: batchv1.CronJobSpec{
231 Schedule: "0 3 * * *", // Daily at 3 AM
232 JobTemplate: batchv1.JobTemplateSpec{
233 Spec: batchv1.JobSpec{
234 Template: corev1.PodTemplateSpec{
235 Spec: corev1.PodSpec{
236 RestartPolicy: corev1.RestartPolicyNever,
237 Containers: []corev1.Container{
238 {
239 Name: "cleanup",
240 Image: "amazon/aws-cli:latest",
241 Command: []string{"/bin/bash", "-c"},
242 Args: []string{
243 fmt.Sprintf(`
244 RETENTION_DAYS=%d
245 CUTOFF_DATE=$(date -d "${RETENTION_DAYS} days ago" +%%Y%%m%%d)
246
247 echo "Cleaning up backups older than ${CUTOFF_DATE}"
248
249 aws s3 ls s3://%s/%s/ | while read -r line; do
250 BACKUP_DATE=$(echo $line | grep -oP 'backup-\K[0-9]{8}')
251 if [ "${BACKUP_DATE}" -lt "${CUTOFF_DATE}" ]; then
252 BACKUP_FILE=$(echo $line | awk '{print $4}')
253 echo "Deleting old backup: ${BACKUP_FILE}"
254 aws s3 rm s3://%s/%s/${BACKUP_FILE}
255 fi
256 done
257
258 echo "Cleanup completed"
259 `,
260 postgres.Spec.BackupSpec.RetentionDays,
261 postgres.Spec.BackupSpec.S3Bucket,
262 postgres.Name,
263 postgres.Spec.BackupSpec.S3Bucket,
264 postgres.Name,
265 ),
266 },
267 },
268 },
269 },
270 },
271 },
272 },
273 },
274 }
275
276 return bc.Create(ctx, cronJob)
277}
278
279func int32Ptr(i int32) *int32 {
280 return &i
281}
Testing: Verify that backups are created, verified, replicated, and cleaned up according to the retention policy.
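As a starting point, a fake-client sketch along these lines can assert that reconciliation produces the expected CronJobs. Field and type names (PostgreSQLSpec, Spec.Version, Spec.BackupSpec) follow the controller code above, and a generated AddToScheme for the API package is assumed.

// pkg/backup/controller_test.go (sketch)
package backup

import (
	"context"
	"testing"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
)

func TestReconcileBackupCreatesCronJobs(t *testing.T) {
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)
	_ = databasev1alpha1.AddToScheme(scheme)

	postgres := &databasev1alpha1.PostgreSQL{
		ObjectMeta: metav1.ObjectMeta{Name: "prod-db", Namespace: "default"},
		Spec: databasev1alpha1.PostgreSQLSpec{
			Version: "15.4",
			BackupSpec: databasev1alpha1.BackupSpec{
				Enabled:          true,
				Schedule:         "0 2 * * *",
				S3Bucket:         "my-backups",
				S3Region:         "us-east-1",
				RetentionDays:    30,
				CompressionLevel: 6,
			},
		},
	}

	bc := &BackupController{Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(postgres).Build()}
	if err := bc.ReconcileBackup(context.Background(), postgres); err != nil {
		t.Fatalf("ReconcileBackup failed: %v", err)
	}

	// The backup CronJob should exist and use the schedule from the spec.
	backupJob := &batchv1.CronJob{}
	if err := bc.Get(context.Background(), types.NamespacedName{Name: "prod-db-backup", Namespace: "default"}, backupJob); err != nil {
		t.Fatalf("expected backup CronJob: %v", err)
	}
	if backupJob.Spec.Schedule != "0 2 * * *" {
		t.Errorf("unexpected schedule: %s", backupJob.Spec.Schedule)
	}

	// The retention cleanup CronJob should exist as well.
	cleanupJob := &batchv1.CronJob{}
	if err := bc.Get(context.Background(), types.NamespacedName{Name: "prod-db-backup-cleanup", Namespace: "default"}, cleanupJob); err != nil {
		t.Fatalf("expected cleanup CronJob: %v", err)
	}
}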
Exercise 3: Intelligent StatefulSet Autoscaling Controller
Objective: Implement intelligent scaling based on database load metrics with predictive scaling capabilities.
Requirements:
- Monitor database metrics (connections, CPU, memory, query latency)
- Implement predictive scaling based on historical patterns
- Support custom metrics from Prometheus
- Implement scale-up and scale-down cooldown periods
- Add safety checks to prevent thrashing
Implementation:
1// api/v1alpha1/autoscaling_types.go
2
3package v1alpha1
4
5import (
6 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7)
8
9// AutoscalingSpec defines autoscaling configuration
10type AutoscalingSpec struct {
11 // Enable autoscaling
12 // +kubebuilder:default=false
13 Enabled bool `json:"enabled"`
14
15 // Minimum replicas
16 // +kubebuilder:validation:Minimum=1
17 // +kubebuilder:default=1
18 MinReplicas int32 `json:"minReplicas"`
19
20 // Maximum replicas
21 // +kubebuilder:validation:Minimum=1
22 // +kubebuilder:validation:Maximum=100
23 // +kubebuilder:default=10
24 MaxReplicas int32 `json:"maxReplicas"`
25
26 // Target CPU utilization percentage
27 // +kubebuilder:validation:Minimum=1
28 // +kubebuilder:validation:Maximum=100
29 // +kubebuilder:default=70
30 TargetCPUUtilization int `json:"targetCPUUtilization"`
31
32 // Target memory utilization percentage
33 // +kubebuilder:validation:Minimum=1
34 // +kubebuilder:validation:Maximum=100
35 // +kubebuilder:default=80
36 TargetMemoryUtilization int `json:"targetMemoryUtilization"`
37
38 // Target database connections percentage
39 // +kubebuilder:validation:Minimum=1
40 // +kubebuilder:validation:Maximum=100
41 // +kubebuilder:default=75
42 TargetConnectionUtilization int `json:"targetConnectionUtilization"`
43
44 // Custom Prometheus metrics for scaling
45 CustomMetrics []CustomMetric `json:"customMetrics,omitempty"`
46
47 // Scale up cooldown in seconds
48 // +kubebuilder:validation:Minimum=0
49 // +kubebuilder:default=300
50 ScaleUpCooldown int32 `json:"scaleUpCooldown"`
51
52 // Scale down cooldown in seconds
53 // +kubebuilder:validation:Minimum=0
54 // +kubebuilder:default=600
55 ScaleDownCooldown int32 `json:"scaleDownCooldown"`
56
57 // Enable predictive scaling
58 // +kubebuilder:default=false
59 PredictiveScaling bool `json:"predictiveScaling,omitempty"`
60
61 // Behavior configuration
62 Behavior *ScalingBehavior `json:"behavior,omitempty"`
63}
64
65// CustomMetric defines a custom Prometheus metric for scaling
66type CustomMetric struct {
67 // Metric name
68 Name string `json:"name"`
69
70 // Prometheus query
71 Query string `json:"query"`
72
73 // Target value
74 TargetValue string `json:"targetValue"`
75
76 // Metric type (average, value)
77 Type string `json:"type"`
78}
79
80// ScalingBehavior defines scaling behavior rules
81type ScalingBehavior struct {
82 // Scale up rules
83 ScaleUp *ScalingRules `json:"scaleUp,omitempty"`
84
85 // Scale down rules
86 ScaleDown *ScalingRules `json:"scaleDown,omitempty"`
87}
88
89// ScalingRules defines scaling policy rules
90type ScalingRules struct {
91 // Stabilization window in seconds
92 StabilizationWindowSeconds int32 `json:"stabilizationWindowSeconds,omitempty"`
93
94 // Maximum number of pods to add/remove
95 MaxPods int32 `json:"maxPods,omitempty"`
96
97 // Maximum percentage to scale by
98 MaxPercent int32 `json:"maxPercent,omitempty"`
99}
100
101// AutoscalingStatus tracks autoscaling status
102type AutoscalingStatus struct {
103 // Current replicas
104 CurrentReplicas int32 `json:"currentReplicas"`
105
106 // Desired replicas
107 DesiredReplicas int32 `json:"desiredReplicas"`
108
109 // Last scale time
110 LastScaleTime *metav1.Time `json:"lastScaleTime,omitempty"`
111
112 // Current metrics
113 CurrentMetrics []MetricStatus `json:"currentMetrics,omitempty"`
114
115 // Scaling decision reason
116 LastScaleReason string `json:"lastScaleReason,omitempty"`
117}
118
119// MetricStatus represents current status of a metric
120type MetricStatus struct {
121 // Metric name
122 Name string `json:"name"`
123
124 // Current value
125 CurrentValue string `json:"currentValue"`
126
127 // Target value
128 TargetValue string `json:"targetValue"`
129}
Task: Implement the autoscaling reconciler that monitors metrics and scales the StatefulSet accordingly. Include predictive scaling based on time-series analysis of historical data.
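One possible starting point for the non-predictive part is an HPA-style proportional decision combined with cooldown enforcement. The sketch below is illustrative only: the function name is hypothetical, and it assumes CPU utilization has already been collected elsewhere (for example via a Prometheus query).

// A minimal scaling decision helper (sketch), assuming the Autoscaling types above.
package controllers

import (
	"math"
	"time"

	databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
)

// desiredReplicas applies the classic proportional formula used by the HPA:
// desired = ceil(current * observedUtilization / targetUtilization), clamped
// to the configured bounds and suppressed while a cooldown window is active.
// It returns the replica count to apply and a human-readable reason for the
// status field LastScaleReason.
func desiredReplicas(
	spec databasev1alpha1.AutoscalingSpec,
	status databasev1alpha1.AutoscalingStatus,
	observedCPUPercent float64,
	now time.Time,
) (int32, string) {
	current := status.CurrentReplicas
	desired := int32(math.Ceil(float64(current) * observedCPUPercent / float64(spec.TargetCPUUtilization)))

	// Clamp to the configured range.
	if desired < spec.MinReplicas {
		desired = spec.MinReplicas
	}
	if desired > spec.MaxReplicas {
		desired = spec.MaxReplicas
	}
	if desired == current {
		return current, "within target utilization"
	}

	// Enforce cooldowns to avoid thrashing.
	if status.LastScaleTime != nil {
		elapsed := now.Sub(status.LastScaleTime.Time)
		if desired > current && elapsed < time.Duration(spec.ScaleUpCooldown)*time.Second {
			return current, "scale-up suppressed by cooldown"
		}
		if desired < current && elapsed < time.Duration(spec.ScaleDownCooldown)*time.Second {
			return current, "scale-down suppressed by cooldown"
		}
	}
	return desired, "cpu utilization outside target"
}

Memory, connection, and custom metrics can feed the same formula; take the maximum of the per-metric desired counts before applying cooldowns, which is what the built-in HPA does.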
Exercise 4: Multi-Cluster Deployment Operator
Objective: Extend your operator to deploy and manage PostgreSQL clusters across multiple Kubernetes clusters with automatic failover.
Requirements:
- Deploy to multiple clusters simultaneously
- Configure cross-cluster replication
- Implement automatic failover between clusters
- Monitor cluster health across regions
- Synchronize configuration across clusters
Task: Design and implement the multi-cluster architecture, including cluster discovery, resource synchronization, and failover logic.
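One possible shape for the synchronization piece is sketched below. It assumes a hypothetical ClusterSet registry that already holds a controller-runtime client per member cluster; how those clients are built (kubeconfig Secrets, Cluster API, and so on) is left open.

// pkg/multicluster/sync.go (sketch)
package multicluster

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"

	databasev1alpha1 "mycompany.com/database-operator/api/v1alpha1"
)

// ClusterSet holds one client per managed cluster, keyed by cluster name.
type ClusterSet struct {
	Primary  string
	Clusters map[string]client.Client
}

// SyncPostgreSQL applies the same desired spec to every member cluster,
// creating the resource where it is missing and updating it elsewhere.
func (cs *ClusterSet) SyncPostgreSQL(ctx context.Context, desired *databasev1alpha1.PostgreSQL) error {
	key := client.ObjectKeyFromObject(desired)
	for name, c := range cs.Clusters {
		existing := &databasev1alpha1.PostgreSQL{}
		err := c.Get(ctx, key, existing)
		if apierrors.IsNotFound(err) {
			obj := desired.DeepCopy()
			obj.ResourceVersion = "" // cannot carry a ResourceVersion across clusters
			if err := c.Create(ctx, obj); err != nil {
				return fmt.Errorf("create on cluster %s: %w", name, err)
			}
			continue
		}
		if err != nil {
			return fmt.Errorf("get on cluster %s: %w", name, err)
		}
		existing.Spec = desired.Spec
		if err := c.Update(ctx, existing); err != nil {
			return fmt.Errorf("update on cluster %s: %w", name, err)
		}
	}
	return nil
}

Failover then becomes a question of which cluster's resource is marked primary and how replication is re-pointed; health checking each cluster before promoting it is the critical safety step.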
Exercise 5: Certificate Management Operator
Objective: Create an operator that manages SSL/TLS certificates with automatic renewal using cert-manager integration or ACME protocol.
Requirements:
- Automatic certificate issuance
- Automatic renewal before expiration
- Certificate rotation without downtime
- Support for multiple certificate authorities
- Integration with secret management
Task: Implement the certificate lifecycle management including issuance, renewal, and rotation with zero downtime.
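Independent of the CA integration you choose, a renewal check against the certificate stored in a Secret is a useful building block. The sketch below uses only the standard library; the needsRenewal name is hypothetical, while tls.crt is the standard key for kubernetes.io/tls Secrets.

// A renewal-window check (sketch) for the certificate operator.
package controllers

import (
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
)

// needsRenewal reports whether the TLS certificate in the Secret expires
// within the given renewal window (e.g. 30 days before NotAfter).
func needsRenewal(secret *corev1.Secret, renewBefore time.Duration, now time.Time) (bool, error) {
	raw, ok := secret.Data["tls.crt"]
	if !ok {
		return false, fmt.Errorf("secret %s/%s has no tls.crt key", secret.Namespace, secret.Name)
	}
	block, _ := pem.Decode(raw)
	if block == nil {
		return false, fmt.Errorf("tls.crt is not PEM encoded")
	}
	cert, err := x509.ParseCertificate(block.Bytes)
	if err != nil {
		return false, fmt.Errorf("parse certificate: %w", err)
	}
	// Renew once we are inside the window before expiry.
	return now.After(cert.NotAfter.Add(-renewBefore)), nil
}

The reconciler can call this on every pass and, when it returns true, request a new certificate and roll it into the Secret; pods that mount the Secret pick up the new material without a restart, which is the basis for zero-downtime rotation.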
Summary
Key Takeaways
- Operators encode operational knowledge into reusable, automated code
- Reconciliation loops continuously drive actual state toward desired state
- Controller-runtime provides the framework for building production-ready operators
- Testing is crucial - use envtest for comprehensive operator testing
- Production patterns include leader election, observability, and graceful shutdown
Next Steps
- Explore the Operator SDK for additional tools and scaffolding options
- Study existing operators for advanced patterns
- Learn about Helm integration for packaging and distributing operators
- Investigate webhooks for validating and mutating custom resources
- Consider Operator Lifecycle Manager for operator distribution
Further Reading
- Kubernetes Operator Documentation
- Kubebuilder Book
- Operator Framework Guide
- Controller-Runtime Documentation
- Kubernetes API Machinery Concepts
You're now equipped to build Kubernetes operators that automate complex operational tasks at scale. The patterns and practices covered here will help you create operators that are maintainable, testable, and production-ready.