blob: e41084b0eff94364e0c40333dede3f1594b46207 [file] [log] [blame]
Girish Gowdra64503432020-01-07 10:59:10 +05301package backoff
2
3import (
4 "sync"
5 "time"
6)
7
8// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff.
9//
10// Ticks will continue to arrive when the previous operation is still running,
11// so operations that take a while to fail could run in quick succession.
12type Ticker struct {
13 C <-chan time.Time
14 c chan time.Time
15 b BackOffContext
16 stop chan struct{}
17 stopOnce sync.Once
18}
19
20// NewTicker returns a new Ticker containing a channel that will send
21// the time at times specified by the BackOff argument. Ticker is
22// guaranteed to tick at least once. The channel is closed when Stop
23// method is called or BackOff stops. It is not safe to manipulate the
24// provided backoff policy (notably calling NextBackOff or Reset)
25// while the ticker is running.
26func NewTicker(b BackOff) *Ticker {
27 c := make(chan time.Time)
28 t := &Ticker{
29 C: c,
30 c: c,
31 b: ensureContext(b),
32 stop: make(chan struct{}),
33 }
34 t.b.Reset()
35 go t.run()
36 return t
37}
38
39// Stop turns off a ticker. After Stop, no more ticks will be sent.
40func (t *Ticker) Stop() {
41 t.stopOnce.Do(func() { close(t.stop) })
42}
43
44func (t *Ticker) run() {
45 c := t.c
46 defer close(c)
47
48 // Ticker is guaranteed to tick at least once.
49 afterC := t.send(time.Now())
50
51 for {
52 if afterC == nil {
53 return
54 }
55
56 select {
57 case tick := <-afterC:
58 afterC = t.send(tick)
59 case <-t.stop:
60 t.c = nil // Prevent future ticks from being sent to the channel.
61 return
62 case <-t.b.Context().Done():
63 return
64 }
65 }
66}
67
68func (t *Ticker) send(tick time.Time) <-chan time.Time {
69 select {
70 case t.c <- tick:
71 case <-t.stop:
72 return nil
73 }
74
75 next := t.b.NextBackOff()
76 if next == Stop {
77 t.Stop()
78 return nil
79 }
80
81 return time.After(next)
82}