blob: 234d01989dfb2dac9ce49599ebd248e1610568ca [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package schedule
import (
"context"
"sync"
)
type Job func(context.Context)
// Scheduler can schedule jobs.
type Scheduler interface {
// Schedule asks the scheduler to schedule a job defined by the given func.
// Schedule to a stopped scheduler might panic.
Schedule(j Job)
// Pending returns number of pending jobs
Pending() int
// Scheduled returns the number of scheduled jobs (excluding pending jobs)
Scheduled() int
// Finished returns the number of finished jobs
Finished() int
// WaitFinish waits until at least n job are finished and all pending jobs are finished.
WaitFinish(n int)
// Stop stops the scheduler.
Stop()
}
type fifo struct {
mu sync.Mutex
resume chan struct{}
scheduled int
finished int
pendings []Job
ctx context.Context
cancel context.CancelFunc
finishCond *sync.Cond
donec chan struct{}
}
// NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
// order sequentially
func NewFIFOScheduler() Scheduler {
f := &fifo{
resume: make(chan struct{}, 1),
donec: make(chan struct{}, 1),
}
f.finishCond = sync.NewCond(&f.mu)
f.ctx, f.cancel = context.WithCancel(context.Background())
go f.run()
return f
}
// Schedule schedules a job that will be ran in FIFO order sequentially.
func (f *fifo) Schedule(j Job) {
f.mu.Lock()
defer f.mu.Unlock()
if f.cancel == nil {
panic("schedule: schedule to stopped scheduler")
}
if len(f.pendings) == 0 {
select {
case f.resume <- struct{}{}:
default:
}
}
f.pendings = append(f.pendings, j)
}
func (f *fifo) Pending() int {
f.mu.Lock()
defer f.mu.Unlock()
return len(f.pendings)
}
func (f *fifo) Scheduled() int {
f.mu.Lock()
defer f.mu.Unlock()
return f.scheduled
}
func (f *fifo) Finished() int {
f.finishCond.L.Lock()
defer f.finishCond.L.Unlock()
return f.finished
}
func (f *fifo) WaitFinish(n int) {
f.finishCond.L.Lock()
for f.finished < n || len(f.pendings) != 0 {
f.finishCond.Wait()
}
f.finishCond.L.Unlock()
}
// Stop stops the scheduler and cancels all pending jobs.
func (f *fifo) Stop() {
f.mu.Lock()
f.cancel()
f.cancel = nil
f.mu.Unlock()
<-f.donec
}
func (f *fifo) run() {
// TODO: recover from job panic?
defer func() {
close(f.donec)
close(f.resume)
}()
for {
var todo Job
f.mu.Lock()
if len(f.pendings) != 0 {
f.scheduled++
todo = f.pendings[0]
}
f.mu.Unlock()
if todo == nil {
select {
case <-f.resume:
case <-f.ctx.Done():
f.mu.Lock()
pendings := f.pendings
f.pendings = nil
f.mu.Unlock()
// clean up pending jobs
for _, todo := range pendings {
todo(f.ctx)
}
return
}
} else {
todo(f.ctx)
f.finishCond.L.Lock()
f.finished++
f.pendings = f.pendings[1:]
f.finishCond.Broadcast()
f.finishCond.L.Unlock()
}
}
}