Exercise: Scheduler
Difficulty - Advanced
Learning Objectives
- Implement task scheduling with priorities
- Handle cron-like scheduling
- Support delayed and recurring tasks
- Implement graceful shutdown
- Track task execution history
- Handle task dependencies
Problem Statement
Create a flexible task scheduler supporting one-time, recurring, and cron-based task execution with priority queuing.
Core Components
1package scheduler
2
3import (
4 "context"
5 "time"
6)
7
// Priority ranks tasks relative to each other; larger values are more urgent.
type Priority int

// Priority levels, in increasing order of urgency.
const (
	Low Priority = iota
	Normal
	High
	Critical
)
16
17type Task struct {
18 ID string
19 Priority Priority
20 RunAt time.Time
21 Interval time.Duration
22 Function func(context.Context) error
23}
24
25type Scheduler struct {
26 tasks []*Task
27 running bool
28}
29
30func New() *Scheduler
31func Schedule(task *Task) error
32func ScheduleCron(expr string, fn func(context.Context) error) error
33func Start(ctx context.Context)
34func Stop()
35func Remove(taskID string) error
Solution
Click to see the solution
1package scheduler
2
3import (
4 "container/heap"
5 "context"
6 "errors"
7 "fmt"
8 "sync"
9 "time"
10)
11
// Priority ranks tasks relative to each other. The task queue dequeues larger
// values first.
type Priority int

// Priority levels, in increasing order of urgency.
const (
	Low Priority = iota
	Normal
	High
	Critical
)
20
// TaskStatus describes where a task is in its lifecycle.
type TaskStatus int

// Task lifecycle states. Pending is the zero value.
const (
	Pending   TaskStatus = iota // waiting for its next run
	Running                     // its function is currently executing
	Completed                   // last run succeeded (also set when a task is removed)
	Failed                      // last run returned an error
)
29
30type Task struct {
31 ID string
32 Priority Priority
33 RunAt time.Time
34 Interval time.Duration
35 Cron string
36 Function func(context.Context) error
37 MaxRetries int
38 Retries int
39 Status TaskStatus
40 LastRun time.Time
41 NextRun time.Time
42 mu sync.RWMutex
43}
44
45type taskQueue []*Task
46
47func Len() int { return len(tq) }
48
49func Less(i, j int) bool {
50 if tq[i].Priority != tq[j].Priority {
51 return tq[i].Priority > tq[j].Priority
52 }
53 return tq[i].RunAt.Before(tq[j].RunAt)
54}
55
56func Swap(i, j int) {
57 tq[i], tq[j] = tq[j], tq[i]
58}
59
60func Push(x interface{}) {
61 *tq = append(*tq, x.(*Task))
62}
63
64func Pop() interface{} {
65 old := *tq
66 n := len(old)
67 task := old[n-1]
68 *tq = old[0 : n-1]
69 return task
70}
71
72type Scheduler struct {
73 tasks taskQueue
74 taskMap map[string]*Task
75 mu sync.RWMutex
76 running bool
77 stopCh chan struct{}
78 newTaskCh chan *Task
79 history []*TaskExecution
80 historyMu sync.RWMutex
81}
82
// TaskExecution records the outcome of a single run of a task.
type TaskExecution struct {
	TaskID    string        // ID of the task that ran
	StartTime time.Time     // when the run began
	EndTime   time.Time     // when the task function returned
	Error     error         // error returned by the task function; nil on success
	Duration  time.Duration // EndTime minus StartTime
}
90
91func New() *Scheduler {
92 s := &Scheduler{
93 tasks: make(taskQueue, 0),
94 taskMap: make(map[string]*Task),
95 stopCh: make(chan struct{}),
96 newTaskCh: make(chan *Task, 100),
97 history: make([]*TaskExecution, 0),
98 }
99 heap.Init(&s.tasks)
100 return s
101}
102
103func Schedule(task *Task) error {
104 if task.ID == "" {
105 task.ID = fmt.Sprintf("task-%d", time.Now().UnixNano())
106 }
107
108 if task.RunAt.IsZero() {
109 task.RunAt = time.Now()
110 }
111
112 task.NextRun = task.RunAt
113 task.Status = Pending
114
115 s.mu.Lock()
116 if _, exists := s.taskMap[task.ID]; exists {
117 s.mu.Unlock()
118 return errors.New("task already exists")
119 }
120 s.taskMap[task.ID] = task
121 s.mu.Unlock()
122
123 if s.running {
124 s.newTaskCh <- task
125 } else {
126 s.mu.Lock()
127 heap.Push(&s.tasks, task)
128 s.mu.Unlock()
129 }
130
131 return nil
132}
133
134func ScheduleCron(expr string, fn func(context.Context) error) error {
135 // Simple cron parsing
136 nextRun, err := parseCron(expr)
137 if err != nil {
138 return err
139 }
140
141 task := &Task{
142 ID: fmt.Sprintf("cron-%d", time.Now().UnixNano()),
143 Priority: Normal,
144 RunAt: nextRun,
145 Cron: expr,
146 Function: fn,
147 }
148
149 return s.Schedule(task)
150}
151
152func Start(ctx context.Context) {
153 s.mu.Lock()
154 if s.running {
155 s.mu.Unlock()
156 return
157 }
158 s.running = true
159 s.mu.Unlock()
160
161 go s.run(ctx)
162}
163
164func run(ctx context.Context) {
165 ticker := time.NewTicker(100 * time.Millisecond)
166 defer ticker.Stop()
167
168 for {
169 select {
170 case <-ctx.Done():
171 s.Stop()
172 return
173 case <-s.stopCh:
174 return
175 case task := <-s.newTaskCh:
176 s.mu.Lock()
177 heap.Push(&s.tasks, task)
178 s.mu.Unlock()
179 case <-ticker.C:
180 s.processTasks(ctx)
181 }
182 }
183}
184
185func processTasks(ctx context.Context) {
186 now := time.Now()
187
188 s.mu.Lock()
189 for s.tasks.Len() > 0 {
190 task := s.tasks[0]
191 if task.NextRun.After(now) {
192 break
193 }
194
195 task = heap.Pop(&s.tasks).(*Task)
196 s.mu.Unlock()
197
198 go s.executeTask(ctx, task)
199
200 s.mu.Lock()
201 }
202 s.mu.Unlock()
203}
204
205func executeTask(ctx context.Context, task *Task) {
206 execution := &TaskExecution{
207 TaskID: task.ID,
208 StartTime: time.Now(),
209 }
210
211 task.mu.Lock()
212 task.Status = Running
213 task.LastRun = time.Now()
214 task.mu.Unlock()
215
216 err := task.Function(ctx)
217 execution.EndTime = time.Now()
218 execution.Duration = execution.EndTime.Sub(execution.StartTime)
219 execution.Error = err
220
221 s.historyMu.Lock()
222 s.history = append(s.history, execution)
223 if len(s.history) > 1000 {
224 s.history = s.history[len(s.history)-1000:]
225 }
226 s.historyMu.Unlock()
227
228 task.mu.Lock()
229 if err != nil {
230 task.Status = Failed
231 task.Retries++
232
233 if task.Retries < task.MaxRetries {
234 // Retry with exponential backoff
235 delay := time.Duration(1<<uint(task.Retries)) * time.Second
236 task.NextRun = time.Now().Add(delay)
237 task.Status = Pending
238 s.rescheduleTask(task)
239 }
240 } else {
241 task.Status = Completed
242 task.Retries = 0
243
244 // Reschedule if recurring
245 if task.Interval > 0 {
246 task.NextRun = time.Now().Add(task.Interval)
247 task.Status = Pending
248 s.rescheduleTask(task)
249 } else if task.Cron != "" {
250 nextRun, err := parseCron(task.Cron)
251 if err == nil {
252 task.NextRun = nextRun
253 task.Status = Pending
254 s.rescheduleTask(task)
255 }
256 }
257 }
258 task.mu.Unlock()
259}
260
261func rescheduleTask(task *Task) {
262 s.newTaskCh <- task
263}
264
265func Stop() {
266 s.mu.Lock()
267 if !s.running {
268 s.mu.Unlock()
269 return
270 }
271 s.running = false
272 s.mu.Unlock()
273
274 close(s.stopCh)
275}
276
277func Remove(taskID string) error {
278 s.mu.Lock()
279 defer s.mu.Unlock()
280
281 task, exists := s.taskMap[taskID]
282 if !exists {
283 return errors.New("task not found")
284 }
285
286 delete(s.taskMap, taskID)
287
288 // Remove from heap
289 for i, t := range s.tasks {
290 if t.ID == taskID {
291 heap.Remove(&s.tasks, i)
292 break
293 }
294 }
295
296 task.mu.Lock()
297 task.Status = Completed
298 task.mu.Unlock()
299
300 return nil
301}
302
303func Stats() map[string]interface{} {
304 s.mu.RLock()
305 defer s.mu.RUnlock()
306
307 stats := map[string]interface{}{
308 "total_tasks": len(s.taskMap),
309 "pending": 0,
310 "running": 0,
311 "completed": 0,
312 "failed": 0,
313 }
314
315 for _, task := range s.taskMap {
316 task.mu.RLock()
317 switch task.Status {
318 case Pending:
319 stats["pending"] = stats["pending"].(int) + 1
320 case Running:
321 stats["running"] = stats["running"].(int) + 1
322 case Completed:
323 stats["completed"] = stats["completed"].(int) + 1
324 case Failed:
325 stats["failed"] = stats["failed"].(int) + 1
326 }
327 task.mu.RUnlock()
328 }
329
330 return stats
331}
332
333func History() []*TaskExecution {
334 s.historyMu.RLock()
335 defer s.historyMu.RUnlock()
336
337 history := make([]*TaskExecution, len(s.history))
338 copy(history, s.history)
339 return history
340}
341
// parseCron returns the next run time for the cron expression expr.
//
// This is a demo stub: apart from rejecting an empty expression it ignores
// expr and always returns one minute from now. In production, use a real
// parser such as github.com/robfig/cron.
func parseCron(expr string) (time.Time, error) {
	if expr == "" {
		return time.Time{}, errors.New("empty cron expression")
	}
	return time.Now().Add(time.Minute), nil
}
Usage Example
1package main
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "time"
8)
9
10func main() {
11 scheduler := scheduler.New()
12 ctx := context.Background()
13
14 // Start scheduler
15 scheduler.Start(ctx)
16 defer scheduler.Stop()
17
18 // Schedule one-time task
19 scheduler.Schedule(&scheduler.Task{
20 ID: "task-1",
21 Priority: scheduler.High,
22 RunAt: time.Now().Add(5 * time.Second),
23 Function: func(ctx context.Context) error {
24 fmt.Println("One-time task executed")
25 return nil
26 },
27 })
28
29 // Schedule recurring task
30 scheduler.Schedule(&scheduler.Task{
31 ID: "task-2",
32 Priority: scheduler.Normal,
33 RunAt: time.Now(),
34 Interval: 10 * time.Second,
35 Function: func(ctx context.Context) error {
36 fmt.Println("Recurring task executed")
37 return nil
38 },
39 })
40
41 // Schedule cron task
42 scheduler.ScheduleCron("0 * * * *", func(ctx context.Context) error {
43 fmt.Println("Hourly task executed")
44 return nil
45 })
46
47 // Monitor stats
48 ticker := time.NewTicker(5 * time.Second)
49 for range ticker.C {
50 stats := scheduler.Stats()
51 fmt.Printf("Stats: %+v\n", stats)
52 }
53}
Key Takeaways
- Priority queue ensures high-priority tasks run first
- Heap structure provides efficient task ordering
- Cron expressions enable flexible scheduling
- Retry logic with exponential backoff handles transient failures
- Task history enables monitoring and debugging
- Graceful shutdown stops dispatching new tasks without interrupting tasks that are already running
- Context propagation enables cancellation
- Thread-safe operations support concurrent scheduling