From 55261a2b155c4e3d699fc6bd4255f05fb8258157 Mon Sep 17 00:00:00 2001 From: Chris Date: Fri, 23 Feb 2018 12:56:32 -0600 Subject: fix scheduled task race (#8355) --- model/scheduled_task.go | 97 ++++++++++++++++--------------------------------- 1 file changed, 32 insertions(+), 65 deletions(-) (limited to 'model/scheduled_task.go') diff --git a/model/scheduled_task.go b/model/scheduled_task.go index 453828bd2..f3529dedb 100644 --- a/model/scheduled_task.go +++ b/model/scheduled_task.go @@ -5,7 +5,6 @@ package model import ( "fmt" - "sync" "time" ) @@ -15,89 +14,57 @@ type ScheduledTask struct { Name string `json:"name"` Interval time.Duration `json:"interval"` Recurring bool `json:"recurring"` - function TaskFunc - timer *time.Timer -} - -var taskMutex = sync.Mutex{} -var tasks = make(map[string]*ScheduledTask) - -func addTask(task *ScheduledTask) { - taskMutex.Lock() - defer taskMutex.Unlock() - tasks[task.Name] = task -} - -func removeTaskByName(name string) { - taskMutex.Lock() - defer taskMutex.Unlock() - delete(tasks, name) -} - -func GetTaskByName(name string) *ScheduledTask { - taskMutex.Lock() - defer taskMutex.Unlock() - if task, ok := tasks[name]; ok { - return task - } - return nil -} - -func GetAllTasks() *map[string]*ScheduledTask { - taskMutex.Lock() - defer taskMutex.Unlock() - return &tasks + function func() + cancel chan struct{} + cancelled chan struct{} } func CreateTask(name string, function TaskFunc, timeToExecution time.Duration) *ScheduledTask { - task := &ScheduledTask{ - Name: name, - Interval: timeToExecution, - Recurring: false, - function: function, - } - - taskRunner := func() { - go task.function() - removeTaskByName(task.Name) - } - - task.timer = time.AfterFunc(timeToExecution, taskRunner) - - addTask(task) - - return task + return createTask(name, function, timeToExecution, false) } func CreateRecurringTask(name string, function TaskFunc, interval time.Duration) *ScheduledTask { + return createTask(name, function, interval, true) +} + +func createTask(name string, function TaskFunc, interval time.Duration, recurring bool) *ScheduledTask { task := &ScheduledTask{ Name: name, Interval: interval, - Recurring: true, + Recurring: recurring, function: function, + cancel: make(chan struct{}), + cancelled: make(chan struct{}), } - taskRecurer := func() { - go task.function() - task.timer.Reset(task.Interval) - } + go func() { + defer close(task.cancelled) - task.timer = time.AfterFunc(interval, taskRecurer) + ticker := time.NewTicker(interval) + defer func() { + ticker.Stop() + }() - addTask(task) + for { + select { + case <-ticker.C: + function() + case <-task.cancel: + return + } + + if !task.Recurring { + break + } + } + }() return task } func (task *ScheduledTask) Cancel() { - task.timer.Stop() - removeTaskByName(task.Name) -} - -// Executes the task immediatly. A recurring task will be run regularally after interval. -func (task *ScheduledTask) Execute() { - task.function() - task.timer.Reset(task.Interval) + close(task.cancel) + <-task.cancelled } func (task *ScheduledTask) String() string { -- cgit v1.2.3-1-g7c22