David Bainbridge | f5879ca | 2019-12-13 21:17:54 +0000 | [diff] [blame] | 1 | package backoff |
| 2 | |
| 3 | import ( |
| 4 | "sync" |
| 5 | "time" |
| 6 | ) |
| 7 | |
| 8 | // Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff. |
| 9 | // |
| 10 | // Ticks will continue to arrive when the previous operation is still running, |
| 11 | // so operations that take a while to fail could run in quick succession. |
| 12 | type Ticker struct { |
| 13 | C <-chan time.Time |
| 14 | c chan time.Time |
| 15 | b BackOffContext |
| 16 | stop chan struct{} |
| 17 | stopOnce sync.Once |
| 18 | } |
| 19 | |
| 20 | // NewTicker returns a new Ticker containing a channel that will send |
| 21 | // the time at times specified by the BackOff argument. Ticker is |
| 22 | // guaranteed to tick at least once. The channel is closed when Stop |
| 23 | // method is called or BackOff stops. It is not safe to manipulate the |
| 24 | // provided backoff policy (notably calling NextBackOff or Reset) |
| 25 | // while the ticker is running. |
| 26 | func NewTicker(b BackOff) *Ticker { |
| 27 | c := make(chan time.Time) |
| 28 | t := &Ticker{ |
| 29 | C: c, |
| 30 | c: c, |
| 31 | b: ensureContext(b), |
| 32 | stop: make(chan struct{}), |
| 33 | } |
| 34 | t.b.Reset() |
| 35 | go t.run() |
| 36 | return t |
| 37 | } |
| 38 | |
| 39 | // Stop turns off a ticker. After Stop, no more ticks will be sent. |
| 40 | func (t *Ticker) Stop() { |
| 41 | t.stopOnce.Do(func() { close(t.stop) }) |
| 42 | } |
| 43 | |
| 44 | func (t *Ticker) run() { |
| 45 | c := t.c |
| 46 | defer close(c) |
| 47 | |
| 48 | // Ticker is guaranteed to tick at least once. |
| 49 | afterC := t.send(time.Now()) |
| 50 | |
| 51 | for { |
| 52 | if afterC == nil { |
| 53 | return |
| 54 | } |
| 55 | |
| 56 | select { |
| 57 | case tick := <-afterC: |
| 58 | afterC = t.send(tick) |
| 59 | case <-t.stop: |
| 60 | t.c = nil // Prevent future ticks from being sent to the channel. |
| 61 | return |
| 62 | case <-t.b.Context().Done(): |
| 63 | return |
| 64 | } |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | func (t *Ticker) send(tick time.Time) <-chan time.Time { |
| 69 | select { |
| 70 | case t.c <- tick: |
| 71 | case <-t.stop: |
| 72 | return nil |
| 73 | } |
| 74 | |
| 75 | next := t.b.NextBackOff() |
| 76 | if next == Stop { |
| 77 | t.Stop() |
| 78 | return nil |
| 79 | } |
| 80 | |
| 81 | return time.After(next) |
| 82 | } |