path: root/jobs/jobs_watcher.go
// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.

package jobs

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/mattermost/mattermost-server/mlog"
	"github.com/mattermost/mattermost-server/model"
)

// Default polling interval for the job watcher, in milliseconds.
// (Defining as `var` rather than `const` allows tests to lower the interval.)
var DEFAULT_WATCHER_POLLING_INTERVAL = 15000

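// Watcher periodically polls the job store for pending jobs and dispatches
// them to the registered workers.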
type Watcher struct {
	srv     *JobServer
	workers *Workers

	stop            chan bool
	stopped         chan bool
	pollingInterval int
}

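// MakeWatcher creates a Watcher that polls every pollingInterval milliseconds
// and notifies the given workers of any pending jobs it finds.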
func (srv *JobServer) MakeWatcher(workers *Workers, pollingInterval int) *Watcher {
	return &Watcher{
		stop:            make(chan bool, 1),
		stopped:         make(chan bool, 1),
		pollingInterval: pollingInterval,
		workers:         workers,
		srv:             srv,
	}
}

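// Start begins polling for pending jobs, after a random initial delay, and
// runs until Stop is called.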
func (watcher *Watcher) Start() {
	mlog.Debug("Watcher Started")

	// Delay for a random number of milliseconds before starting, to ensure that multiple
	// instances of the job server don't all poll at nearly the same time.
	rand.Seed(time.Now().UTC().UnixNano())
	<-time.After(time.Duration(rand.Intn(watcher.pollingInterval)) * time.Millisecond)

	defer func() {
		mlog.Debug("Watcher Finished")
		watcher.stopped <- true
	}()

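	// Poll on every tick of the interval until a stop signal arrives.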
	for {
		select {
		case <-watcher.stop:
			mlog.Debug("Watcher: Received stop signal")
			return
		case <-time.After(time.Duration(watcher.pollingInterval) * time.Millisecond):
			watcher.PollAndNotify()
		}
	}
}

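// Stop signals the polling loop to exit and waits for it to finish.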
func (watcher *Watcher) Stop() {
	mlog.Debug("Watcher Stopping")
	watcher.stop <- true
	<-watcher.stopped
}

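// PollAndNotify queries the store for all pending jobs and offers each one to
// the matching worker's job channel without blocking.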
func (watcher *Watcher) PollAndNotify() {
	if result := <-watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
		mlog.Error(fmt.Sprintf("Error occurred getting all pending jobs: %v", result.Err.Error()))
	} else {
		jobs := result.Data.([]*model.Job)

		for _, job := range jobs {
			// Hand each job to its worker with a non-blocking send: if the
			// worker's channel is already full, the job is skipped and will
			// be picked up again on a later poll.
			switch job.Type {
			case model.JOB_TYPE_DATA_RETENTION:
				if watcher.workers.DataRetention != nil {
					select {
					case watcher.workers.DataRetention.JobChannel() <- *job:
					default:
					}
				}
			case model.JOB_TYPE_MESSAGE_EXPORT:
				if watcher.workers.MessageExport != nil {
					select {
					case watcher.workers.MessageExport.JobChannel() <- *job:
					default:
					}
				}
			case model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING:
				if watcher.workers.ElasticsearchIndexing != nil {
					select {
					case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job:
					default:
					}
				}
			case model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION:
				if watcher.workers.ElasticsearchAggregation != nil {
					select {
					case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job:
					default:
					}
				}
			case model.JOB_TYPE_LDAP_SYNC:
				if watcher.workers.LdapSync != nil {
					select {
					case watcher.workers.LdapSync.JobChannel() <- *job:
					default:
					}
				}
			case model.JOB_TYPE_MIGRATIONS:
				if watcher.workers.Migrations != nil {
					select {
					case watcher.workers.Migrations.JobChannel() <- *job:
					default:
					}
				}
			}
		}
	}
}