// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule

import (
	"context"
	"sync"
)

// Job is a unit of work handed to a Scheduler. The scheduler invokes it with
// a context; the FIFO scheduler in this package passes its own context, which
// is canceled when the scheduler is stopped.
type Job func(context.Context)

24// Scheduler can schedule jobs.
25type Scheduler interface {
26 // Schedule asks the scheduler to schedule a job defined by the given func.
27 // Schedule to a stopped scheduler might panic.
28 Schedule(j Job)
29
30 // Pending returns number of pending jobs
31 Pending() int
32
33 // Scheduled returns the number of scheduled jobs (excluding pending jobs)
34 Scheduled() int
35
36 // Finished returns the number of finished jobs
37 Finished() int
38
39 // WaitFinish waits until at least n job are finished and all pending jobs are finished.
40 WaitFinish(n int)
41
42 // Stop stops the scheduler.
43 Stop()
44}
45
46type fifo struct {
47 mu sync.Mutex
48
49 resume chan struct{}
50 scheduled int
51 finished int
52 pendings []Job
53
54 ctx context.Context
55 cancel context.CancelFunc
56
57 finishCond *sync.Cond
58 donec chan struct{}
59}
60
61// NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
62// order sequentially
63func NewFIFOScheduler() Scheduler {
64 f := &fifo{
65 resume: make(chan struct{}, 1),
66 donec: make(chan struct{}, 1),
67 }
68 f.finishCond = sync.NewCond(&f.mu)
69 f.ctx, f.cancel = context.WithCancel(context.Background())
70 go f.run()
71 return f
72}
73
74// Schedule schedules a job that will be ran in FIFO order sequentially.
75func (f *fifo) Schedule(j Job) {
76 f.mu.Lock()
77 defer f.mu.Unlock()
78
79 if f.cancel == nil {
80 panic("schedule: schedule to stopped scheduler")
81 }
82
83 if len(f.pendings) == 0 {
84 select {
85 case f.resume <- struct{}{}:
86 default:
87 }
88 }
89 f.pendings = append(f.pendings, j)
90}
91
92func (f *fifo) Pending() int {
93 f.mu.Lock()
94 defer f.mu.Unlock()
95 return len(f.pendings)
96}
97
98func (f *fifo) Scheduled() int {
99 f.mu.Lock()
100 defer f.mu.Unlock()
101 return f.scheduled
102}
103
104func (f *fifo) Finished() int {
105 f.finishCond.L.Lock()
106 defer f.finishCond.L.Unlock()
107 return f.finished
108}
109
110func (f *fifo) WaitFinish(n int) {
111 f.finishCond.L.Lock()
112 for f.finished < n || len(f.pendings) != 0 {
113 f.finishCond.Wait()
114 }
115 f.finishCond.L.Unlock()
116}
117
118// Stop stops the scheduler and cancels all pending jobs.
119func (f *fifo) Stop() {
120 f.mu.Lock()
121 f.cancel()
122 f.cancel = nil
123 f.mu.Unlock()
124 <-f.donec
125}
126
127func (f *fifo) run() {
128 // TODO: recover from job panic?
129 defer func() {
130 close(f.donec)
131 close(f.resume)
132 }()
133
134 for {
135 var todo Job
136 f.mu.Lock()
137 if len(f.pendings) != 0 {
138 f.scheduled++
139 todo = f.pendings[0]
140 }
141 f.mu.Unlock()
142 if todo == nil {
143 select {
144 case <-f.resume:
145 case <-f.ctx.Done():
146 f.mu.Lock()
147 pendings := f.pendings
148 f.pendings = nil
149 f.mu.Unlock()
150 // clean up pending jobs
151 for _, todo := range pendings {
152 todo(f.ctx)
153 }
154 return
155 }
156 } else {
157 todo(f.ctx)
158 f.finishCond.L.Lock()
159 f.finished++
160 f.pendings = f.pendings[1:]
161 f.finishCond.Broadcast()
162 f.finishCond.L.Unlock()
163 }
164 }
165}