Scheduler

Exercise: Scheduler

Difficulty - Advanced

Learning Objectives

  • Implement task scheduling with priorities
  • Handle cron-like scheduling
  • Support delayed and recurring tasks
  • Implement graceful shutdown
  • Track task execution history
  • Handle task dependencies

Problem Statement

Create a flexible task scheduler supporting one-time, recurring, and cron-based task execution with priority queuing.

Core Components

 1package scheduler
 2
 3import (
 4    "context"
 5    "time"
 6)
 7
 8type Priority int
 9
10const (
11    Low Priority = iota
12    Normal
13    High
14    Critical
15)
16
17type Task struct {
18    ID       string
19    Priority Priority
20    RunAt    time.Time
21    Interval time.Duration
22    Function func(context.Context) error
23}
24
25type Scheduler struct {
26    tasks    []*Task
27    running  bool
28}
29
30func New() *Scheduler
31func Schedule(task *Task) error
32func ScheduleCron(expr string, fn func(context.Context) error) error
33func Start(ctx context.Context)
34func Stop()
35func Remove(taskID string) error

Solution

Click to see the solution
  1package scheduler
  2
import (
	"container/heap"
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)
 11
 12type Priority int
 13
 14const (
 15    Low Priority = iota
 16    Normal
 17    High
 18    Critical
 19)
 20
 21type TaskStatus int
 22
 23const (
 24    Pending TaskStatus = iota
 25    Running
 26    Completed
 27    Failed
 28)
 29
 30type Task struct {
 31    ID         string
 32    Priority   Priority
 33    RunAt      time.Time
 34    Interval   time.Duration
 35    Cron       string
 36    Function   func(context.Context) error
 37    MaxRetries int
 38    Retries    int
 39    Status     TaskStatus
 40    LastRun    time.Time
 41    NextRun    time.Time
 42    mu         sync.RWMutex
 43}
 44
 45type taskQueue []*Task
 46
 47func Len() int { return len(tq) }
 48
 49func Less(i, j int) bool {
 50    if tq[i].Priority != tq[j].Priority {
 51        return tq[i].Priority > tq[j].Priority
 52    }
 53    return tq[i].RunAt.Before(tq[j].RunAt)
 54}
 55
 56func Swap(i, j int) {
 57    tq[i], tq[j] = tq[j], tq[i]
 58}
 59
 60func Push(x interface{}) {
 61    *tq = append(*tq, x.(*Task))
 62}
 63
 64func Pop() interface{} {
 65    old := *tq
 66    n := len(old)
 67    task := old[n-1]
 68    *tq = old[0 : n-1]
 69    return task
 70}
 71
 72type Scheduler struct {
 73    tasks      taskQueue
 74    taskMap    map[string]*Task
 75    mu         sync.RWMutex
 76    running    bool
 77    stopCh     chan struct{}
 78    newTaskCh  chan *Task
 79    history    []*TaskExecution
 80    historyMu  sync.RWMutex
 81}
 82
 83type TaskExecution struct {
 84    TaskID    string
 85    StartTime time.Time
 86    EndTime   time.Time
 87    Error     error
 88    Duration  time.Duration
 89}
 90
 91func New() *Scheduler {
 92    s := &Scheduler{
 93        tasks:     make(taskQueue, 0),
 94        taskMap:   make(map[string]*Task),
 95        stopCh:    make(chan struct{}),
 96        newTaskCh: make(chan *Task, 100),
 97        history:   make([]*TaskExecution, 0),
 98    }
 99    heap.Init(&s.tasks)
100    return s
101}
102
103func Schedule(task *Task) error {
104    if task.ID == "" {
105        task.ID = fmt.Sprintf("task-%d", time.Now().UnixNano())
106    }
107
108    if task.RunAt.IsZero() {
109        task.RunAt = time.Now()
110    }
111
112    task.NextRun = task.RunAt
113    task.Status = Pending
114
115    s.mu.Lock()
116    if _, exists := s.taskMap[task.ID]; exists {
117        s.mu.Unlock()
118        return errors.New("task already exists")
119    }
120    s.taskMap[task.ID] = task
121    s.mu.Unlock()
122
123    if s.running {
124        s.newTaskCh <- task
125    } else {
126        s.mu.Lock()
127        heap.Push(&s.tasks, task)
128        s.mu.Unlock()
129    }
130
131    return nil
132}
133
134func ScheduleCron(expr string, fn func(context.Context) error) error {
135    // Simple cron parsing
136    nextRun, err := parseCron(expr)
137    if err != nil {
138        return err
139    }
140
141    task := &Task{
142        ID:       fmt.Sprintf("cron-%d", time.Now().UnixNano()),
143        Priority: Normal,
144        RunAt:    nextRun,
145        Cron:     expr,
146        Function: fn,
147    }
148
149    return s.Schedule(task)
150}
151
152func Start(ctx context.Context) {
153    s.mu.Lock()
154    if s.running {
155        s.mu.Unlock()
156        return
157    }
158    s.running = true
159    s.mu.Unlock()
160
161    go s.run(ctx)
162}
163
164func run(ctx context.Context) {
165    ticker := time.NewTicker(100 * time.Millisecond)
166    defer ticker.Stop()
167
168    for {
169        select {
170        case <-ctx.Done():
171            s.Stop()
172            return
173        case <-s.stopCh:
174            return
175        case task := <-s.newTaskCh:
176            s.mu.Lock()
177            heap.Push(&s.tasks, task)
178            s.mu.Unlock()
179        case <-ticker.C:
180            s.processTasks(ctx)
181        }
182    }
183}
184
185func processTasks(ctx context.Context) {
186    now := time.Now()
187
188    s.mu.Lock()
189    for s.tasks.Len() > 0 {
190        task := s.tasks[0]
191        if task.NextRun.After(now) {
192            break
193        }
194
195        task = heap.Pop(&s.tasks).(*Task)
196        s.mu.Unlock()
197
198        go s.executeTask(ctx, task)
199
200        s.mu.Lock()
201    }
202    s.mu.Unlock()
203}
204
205func executeTask(ctx context.Context, task *Task) {
206    execution := &TaskExecution{
207        TaskID:    task.ID,
208        StartTime: time.Now(),
209    }
210
211    task.mu.Lock()
212    task.Status = Running
213    task.LastRun = time.Now()
214    task.mu.Unlock()
215
216    err := task.Function(ctx)
217    execution.EndTime = time.Now()
218    execution.Duration = execution.EndTime.Sub(execution.StartTime)
219    execution.Error = err
220
221    s.historyMu.Lock()
222    s.history = append(s.history, execution)
223    if len(s.history) > 1000 {
224        s.history = s.history[len(s.history)-1000:]
225    }
226    s.historyMu.Unlock()
227
228    task.mu.Lock()
229    if err != nil {
230        task.Status = Failed
231        task.Retries++
232
233        if task.Retries < task.MaxRetries {
234            // Retry with exponential backoff
235            delay := time.Duration(1<<uint(task.Retries)) * time.Second
236            task.NextRun = time.Now().Add(delay)
237            task.Status = Pending
238            s.rescheduleTask(task)
239        }
240    } else {
241        task.Status = Completed
242        task.Retries = 0
243
244        // Reschedule if recurring
245        if task.Interval > 0 {
246            task.NextRun = time.Now().Add(task.Interval)
247            task.Status = Pending
248            s.rescheduleTask(task)
249        } else if task.Cron != "" {
250            nextRun, err := parseCron(task.Cron)
251            if err == nil {
252                task.NextRun = nextRun
253                task.Status = Pending
254                s.rescheduleTask(task)
255            }
256        }
257    }
258    task.mu.Unlock()
259}
260
261func rescheduleTask(task *Task) {
262    s.newTaskCh <- task
263}
264
265func Stop() {
266    s.mu.Lock()
267    if !s.running {
268        s.mu.Unlock()
269        return
270    }
271    s.running = false
272    s.mu.Unlock()
273
274    close(s.stopCh)
275}
276
277func Remove(taskID string) error {
278    s.mu.Lock()
279    defer s.mu.Unlock()
280
281    task, exists := s.taskMap[taskID]
282    if !exists {
283        return errors.New("task not found")
284    }
285
286    delete(s.taskMap, taskID)
287
288    // Remove from heap
289    for i, t := range s.tasks {
290        if t.ID == taskID {
291            heap.Remove(&s.tasks, i)
292            break
293        }
294    }
295
296    task.mu.Lock()
297    task.Status = Completed
298    task.mu.Unlock()
299
300    return nil
301}
302
303func Stats() map[string]interface{} {
304    s.mu.RLock()
305    defer s.mu.RUnlock()
306
307    stats := map[string]interface{}{
308        "total_tasks": len(s.taskMap),
309        "pending":     0,
310        "running":     0,
311        "completed":   0,
312        "failed":      0,
313    }
314
315    for _, task := range s.taskMap {
316        task.mu.RLock()
317        switch task.Status {
318        case Pending:
319            stats["pending"] = stats["pending"].(int) + 1
320        case Running:
321            stats["running"] = stats["running"].(int) + 1
322        case Completed:
323            stats["completed"] = stats["completed"].(int) + 1
324        case Failed:
325            stats["failed"] = stats["failed"].(int) + 1
326        }
327        task.mu.RUnlock()
328    }
329
330    return stats
331}
332
333func History() []*TaskExecution {
334    s.historyMu.RLock()
335    defer s.historyMu.RUnlock()
336
337    history := make([]*TaskExecution, len(s.history))
338    copy(history, s.history)
339    return history
340}
341
// parseCron returns the next run time for the cron expression expr.
//
// This is a demo placeholder: it does not interpret expr and always answers
// "one minute from now". It only rejects an empty expression. In production,
// use a real cron library such as github.com/robfig/cron.
func parseCron(expr string) (time.Time, error) {
	if expr == "" {
		return time.Time{}, errors.New("empty cron expression")
	}
	return time.Now().Add(time.Minute), nil
}

Usage Example

 1package main
 2
 3import (
 4    "context"
 5    "fmt"
 6    "log"
 7    "time"
 8)
 9
10func main() {
11    scheduler := scheduler.New()
12    ctx := context.Background()
13
14    // Start scheduler
15    scheduler.Start(ctx)
16    defer scheduler.Stop()
17
18    // Schedule one-time task
19    scheduler.Schedule(&scheduler.Task{
20        ID:       "task-1",
21        Priority: scheduler.High,
22        RunAt:    time.Now().Add(5 * time.Second),
23        Function: func(ctx context.Context) error {
24            fmt.Println("One-time task executed")
25            return nil
26        },
27    })
28
29    // Schedule recurring task
30    scheduler.Schedule(&scheduler.Task{
31        ID:       "task-2",
32        Priority: scheduler.Normal,
33        RunAt:    time.Now(),
34        Interval: 10 * time.Second,
35        Function: func(ctx context.Context) error {
36            fmt.Println("Recurring task executed")
37            return nil
38        },
39    })
40
41    // Schedule cron task
42    scheduler.ScheduleCron("0 * * * *", func(ctx context.Context) error {
43        fmt.Println("Hourly task executed")
44        return nil
45    })
46
47    // Monitor stats
48    ticker := time.NewTicker(5 * time.Second)
49    for range ticker.C {
50        stats := scheduler.Stats()
51        fmt.Printf("Stats: %+v\n", stats)
52    }
53}

Key Takeaways

  • Priority queue ensures high-priority tasks run first among the tasks that are due
  • Heap structure provides efficient task ordering
  • Cron expressions enable flexible scheduling
  • Retry logic with exponential backoff handles transient failures
  • Task history enables monitoring and debugging
  • Graceful shutdown stops the scheduling loop without interrupting tasks that are already running (Stop does not wait for them to finish)
  • Context propagation enables cancellation
  • Thread-safe operations support concurrent scheduling