khenaidoo | 5e4fca3 | 2021-05-12 16:02:23 -0400 | [diff] [blame] | 1 | package backoff |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "sync" |
| 6 | "time" |
| 7 | ) |
| 8 | |
| 9 | // Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff. |
| 10 | // |
| 11 | // Ticks will continue to arrive when the previous operation is still running, |
| 12 | // so operations that take a while to fail could run in quick succession. |
| 13 | type Ticker struct { |
| 14 | C <-chan time.Time |
| 15 | c chan time.Time |
| 16 | b BackOff |
| 17 | ctx context.Context |
| 18 | timer Timer |
| 19 | stop chan struct{} |
| 20 | stopOnce sync.Once |
| 21 | } |
| 22 | |
| 23 | // NewTicker returns a new Ticker containing a channel that will send |
| 24 | // the time at times specified by the BackOff argument. Ticker is |
| 25 | // guaranteed to tick at least once. The channel is closed when Stop |
| 26 | // method is called or BackOff stops. It is not safe to manipulate the |
| 27 | // provided backoff policy (notably calling NextBackOff or Reset) |
| 28 | // while the ticker is running. |
| 29 | func NewTicker(b BackOff) *Ticker { |
| 30 | return NewTickerWithTimer(b, &defaultTimer{}) |
| 31 | } |
| 32 | |
| 33 | // NewTickerWithTimer returns a new Ticker with a custom timer. |
| 34 | // A default timer that uses system timer is used when nil is passed. |
| 35 | func NewTickerWithTimer(b BackOff, timer Timer) *Ticker { |
| 36 | c := make(chan time.Time) |
| 37 | t := &Ticker{ |
| 38 | C: c, |
| 39 | c: c, |
| 40 | b: b, |
| 41 | ctx: getContext(b), |
| 42 | timer: timer, |
| 43 | stop: make(chan struct{}), |
| 44 | } |
| 45 | t.b.Reset() |
| 46 | go t.run() |
| 47 | return t |
| 48 | } |
| 49 | |
| 50 | // Stop turns off a ticker. After Stop, no more ticks will be sent. |
| 51 | func (t *Ticker) Stop() { |
| 52 | t.stopOnce.Do(func() { close(t.stop) }) |
| 53 | } |
| 54 | |
| 55 | func (t *Ticker) run() { |
| 56 | c := t.c |
| 57 | defer close(c) |
| 58 | |
| 59 | // Ticker is guaranteed to tick at least once. |
| 60 | afterC := t.send(time.Now()) |
| 61 | |
| 62 | for { |
| 63 | if afterC == nil { |
| 64 | return |
| 65 | } |
| 66 | |
| 67 | select { |
| 68 | case tick := <-afterC: |
| 69 | afterC = t.send(tick) |
| 70 | case <-t.stop: |
| 71 | t.c = nil // Prevent future ticks from being sent to the channel. |
| 72 | return |
| 73 | case <-t.ctx.Done(): |
| 74 | return |
| 75 | } |
| 76 | } |
| 77 | } |
| 78 | |
| 79 | func (t *Ticker) send(tick time.Time) <-chan time.Time { |
| 80 | select { |
| 81 | case t.c <- tick: |
| 82 | case <-t.stop: |
| 83 | return nil |
| 84 | } |
| 85 | |
| 86 | next := t.b.NextBackOff() |
| 87 | if next == Stop { |
| 88 | t.Stop() |
| 89 | return nil |
| 90 | } |
| 91 | |
| 92 | t.timer.Start(next) |
| 93 | return t.timer.C() |
| 94 | } |