VOL-2362 - add exponential backoff around enabling indications

Change-Id: I212312f297a0ca5ae33a4250bac1008e9be2f2c2
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 80d706a..7e09fc8 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -31,6 +31,7 @@
 
 	"google.golang.org/grpc/codes"
 
+	backoff "github.com/cenkalti/backoff/v3"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
@@ -278,10 +279,27 @@
 		dh.lockDevice.Unlock()
 	}
 
+	// Create an exponential backoff around re-enabling indications. The
+	// maximum elapsed time for the back off is set to 0 so that we will
+	// continue to retry. The max interval defaults to 1m, but is set
+	// here for code clarity
+	indicationBackoff := backoff.NewExponentialBackOff()
+	indicationBackoff.MaxElapsedTime = 0
+	indicationBackoff.MaxInterval = 1 * time.Minute
 	for {
 		indication, err := indications.Recv()
 		if err == io.EOF {
 			log.Infow("EOF for  indications", log.Fields{"err": err})
+			// Use an exponential back off to prevent getting into a tight loop
+			duration := indicationBackoff.NextBackOff()
+			if duration == backoff.Stop {
+				// If we reach a maximum then warn and reset the backoff
+				// timer and keep attempting.
+				log.Warnw("Maximum indication backoff reached, resetting backoff timer",
+					log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
+				indicationBackoff.Reset()
+			}
+			time.Sleep(indicationBackoff.NextBackOff())
 			indications, err = dh.Client.EnableIndication(context.Background(), new(oop.Empty))
 			if err != nil {
 				log.Errorw("Failed to read indications", log.Fields{"err": err})
@@ -299,6 +317,8 @@
 			dh.transitionMap.Handle(DeviceInit)
 			break
 		}
+		// Reset backoff if we have a successful receive
+		indicationBackoff.Reset()
 		dh.lockDevice.RLock()
 		adminState := dh.adminState
 		dh.lockDevice.RUnlock()
diff --git a/go.mod b/go.mod
index 92454eb..9d21cb0 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@
 go 1.12
 
 require (
+	github.com/cenkalti/backoff/v3 v3.1.1
 	github.com/gogo/protobuf v1.3.1
 	github.com/golang/protobuf v1.3.2
 	github.com/opencord/voltha-lib-go/v2 v2.2.20
diff --git a/go.sum b/go.sum
index 9595ab8..79e15c5 100644
--- a/go.sum
+++ b/go.sum
@@ -23,6 +23,8 @@
 github.com/boljen/go-bitmap v0.0.0-20151001105940-23cd2fb0ce7d/go.mod h1:f1iKL6ZhUWvbk7PdWVmOaak10o86cqMUYEmn1CZNGEI=
 github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A=
 github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM=
+github.com/cenkalti/backoff/v3 v3.1.1 h1:UBHElAnr3ODEbpqPzX8g5sBcASjoLFtt3L/xwJ01L6E=
+github.com/cenkalti/backoff/v3 v3.1.1/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73 h1:q1g9lSyo/nOIC3W5E3FK3Unrz8b9LdLXCyuC+ZcpPC0=
 github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73/go.mod h1:507vXsotcZop7NZfBWdhPmVeOse4ko2R7AagJYrpoEg=
diff --git a/vendor/github.com/cenkalti/backoff/v3/.gitignore b/vendor/github.com/cenkalti/backoff/v3/.gitignore
new file mode 100644
index 0000000..0026861
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/.gitignore
@@ -0,0 +1,22 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
diff --git a/vendor/github.com/cenkalti/backoff/v3/.travis.yml b/vendor/github.com/cenkalti/backoff/v3/.travis.yml
new file mode 100644
index 0000000..47a6a46
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/.travis.yml
@@ -0,0 +1,10 @@
+language: go
+go:
+  - 1.7
+  - 1.x
+  - tip
+before_install:
+  - go get github.com/mattn/goveralls
+  - go get golang.org/x/tools/cmd/cover
+script:
+  - $HOME/gopath/bin/goveralls -service=travis-ci
diff --git a/vendor/github.com/cenkalti/backoff/v3/LICENSE b/vendor/github.com/cenkalti/backoff/v3/LICENSE
new file mode 100644
index 0000000..89b8179
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Cenk Altı
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/github.com/cenkalti/backoff/v3/README.md b/vendor/github.com/cenkalti/backoff/v3/README.md
new file mode 100644
index 0000000..3673df4
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/README.md
@@ -0,0 +1,33 @@
+# Exponential Backoff [![GoDoc][godoc image]][godoc] [![Build Status][travis image]][travis] [![Coverage Status][coveralls image]][coveralls]
+
+This is a Go port of the exponential backoff algorithm from [Google's HTTP Client Library for Java][google-http-java-client].
+
+[Exponential backoff][exponential backoff wiki]
+is an algorithm that uses feedback to multiplicatively decrease the rate of some process,
+in order to gradually find an acceptable rate.
+The retries exponentially increase and stop increasing when a certain threshold is met.
+
+## Usage
+
+Import path is `github.com/cenkalti/backoff/v3`. Please note the version part at the end.
+
+godoc.org does not support modules yet,
+so you can use https://godoc.org/gopkg.in/cenkalti/backoff.v3 to view the documentation.
+
+## Contributing
+
+* I would like to keep this library as small as possible.
+* Please don't send a PR without opening an issue and discussing it first.
+* If proposed change is not a common use case, I will probably not accept it.
+
+[godoc]: https://godoc.org/github.com/cenkalti/backoff
+[godoc image]: https://godoc.org/github.com/cenkalti/backoff?status.png
+[travis]: https://travis-ci.org/cenkalti/backoff
+[travis image]: https://travis-ci.org/cenkalti/backoff.png?branch=master
+[coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master
+[coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master
+
+[google-http-java-client]: https://github.com/google/google-http-java-client/blob/da1aa993e90285ec18579f1553339b00e19b3ab5/google-http-client/src/main/java/com/google/api/client/util/ExponentialBackOff.java
+[exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff
+
+[advanced example]: https://godoc.org/github.com/cenkalti/backoff#example_
diff --git a/vendor/github.com/cenkalti/backoff/v3/backoff.go b/vendor/github.com/cenkalti/backoff/v3/backoff.go
new file mode 100644
index 0000000..3676ee4
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/backoff.go
@@ -0,0 +1,66 @@
+// Package backoff implements backoff algorithms for retrying operations.
+//
+// Use Retry function for retrying operations that may fail.
+// If Retry does not meet your needs,
+// copy/paste the function into your project and modify as you wish.
+//
+// There is also Ticker type similar to time.Ticker.
+// You can use it if you need to work with channels.
+//
+// See Examples section below for usage examples.
+package backoff
+
+import "time"
+
+// BackOff is a backoff policy for retrying an operation.
+type BackOff interface {
+	// NextBackOff returns the duration to wait before retrying the operation,
+	// or backoff. Stop to indicate that no more retries should be made.
+	//
+	// Example usage:
+	//
+	// 	duration := backoff.NextBackOff();
+	// 	if (duration == backoff.Stop) {
+	// 		// Do not retry operation.
+	// 	} else {
+	// 		// Sleep for duration and retry operation.
+	// 	}
+	//
+	NextBackOff() time.Duration
+
+	// Reset to initial state.
+	Reset()
+}
+
+// Stop indicates that no more retries should be made for use in NextBackOff().
+const Stop time.Duration = -1
+
+// ZeroBackOff is a fixed backoff policy whose backoff time is always zero,
+// meaning that the operation is retried immediately without waiting, indefinitely.
+type ZeroBackOff struct{}
+
+func (b *ZeroBackOff) Reset() {}
+
+func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 }
+
+// StopBackOff is a fixed backoff policy that always returns backoff.Stop for
+// NextBackOff(), meaning that the operation should never be retried.
+type StopBackOff struct{}
+
+func (b *StopBackOff) Reset() {}
+
+func (b *StopBackOff) NextBackOff() time.Duration { return Stop }
+
+// ConstantBackOff is a backoff policy that always returns the same backoff delay.
+// This is in contrast to an exponential backoff policy,
+// which returns a delay that grows longer as you call NextBackOff() over and over again.
+type ConstantBackOff struct {
+	Interval time.Duration
+}
+
+func (b *ConstantBackOff) Reset()                     {}
+func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval }
+
+func NewConstantBackOff(d time.Duration) *ConstantBackOff {
+	return &ConstantBackOff{Interval: d}
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/context.go b/vendor/github.com/cenkalti/backoff/v3/context.go
new file mode 100644
index 0000000..7f78421
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/context.go
@@ -0,0 +1,63 @@
+package backoff
+
+import (
+	"context"
+	"time"
+)
+
+// BackOffContext is a backoff policy that stops retrying after the context
+// is canceled.
+type BackOffContext interface { // nolint: golint
+	BackOff
+	Context() context.Context
+}
+
+type backOffContext struct {
+	BackOff
+	ctx context.Context
+}
+
+// WithContext returns a BackOffContext with context ctx
+//
+// ctx must not be nil
+func WithContext(b BackOff, ctx context.Context) BackOffContext { // nolint: golint
+	if ctx == nil {
+		panic("nil context")
+	}
+
+	if b, ok := b.(*backOffContext); ok {
+		return &backOffContext{
+			BackOff: b.BackOff,
+			ctx:     ctx,
+		}
+	}
+
+	return &backOffContext{
+		BackOff: b,
+		ctx:     ctx,
+	}
+}
+
+func ensureContext(b BackOff) BackOffContext {
+	if cb, ok := b.(BackOffContext); ok {
+		return cb
+	}
+	return WithContext(b, context.Background())
+}
+
+func (b *backOffContext) Context() context.Context {
+	return b.ctx
+}
+
+func (b *backOffContext) NextBackOff() time.Duration {
+	select {
+	case <-b.ctx.Done():
+		return Stop
+	default:
+	}
+	next := b.BackOff.NextBackOff()
+	if deadline, ok := b.ctx.Deadline(); ok && deadline.Sub(time.Now()) < next { // nolint: gosimple
+		return Stop
+	}
+	return next
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/exponential.go b/vendor/github.com/cenkalti/backoff/v3/exponential.go
new file mode 100644
index 0000000..cb11cc1
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/exponential.go
@@ -0,0 +1,154 @@
+package backoff
+
+import (
+	"math/rand"
+	"time"
+)
+
+/*
+ExponentialBackOff is a backoff implementation that increases the backoff
+period for each retry attempt using a randomization function that grows exponentially.
+
+NextBackOff() is calculated using the following formula:
+
+ randomized interval =
+     RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])
+
+In other words NextBackOff() will range between the randomization factor
+percentage below and above the retry interval.
+
+For example, given the following parameters:
+
+ RetryInterval = 2
+ RandomizationFactor = 0.5
+ Multiplier = 2
+
+the actual backoff period used in the next retry attempt will range between 1 and 3 seconds,
+multiplied by the exponential, that is, between 2 and 6 seconds.
+
+Note: MaxInterval caps the RetryInterval and not the randomized interval.
+
+If the time elapsed since an ExponentialBackOff instance is created goes past the
+MaxElapsedTime, then the method NextBackOff() starts returning backoff.Stop.
+
+The elapsed time can be reset by calling Reset().
+
+Example: Given the following default arguments, for 10 tries the sequence will be,
+and assuming we go over the MaxElapsedTime on the 10th try:
+
+ Request #  RetryInterval (seconds)  Randomized Interval (seconds)
+
+  1          0.5                     [0.25,   0.75]
+  2          0.75                    [0.375,  1.125]
+  3          1.125                   [0.562,  1.687]
+  4          1.687                   [0.8435, 2.53]
+  5          2.53                    [1.265,  3.795]
+  6          3.795                   [1.897,  5.692]
+  7          5.692                   [2.846,  8.538]
+  8          8.538                   [4.269, 12.807]
+  9         12.807                   [6.403, 19.210]
+ 10         19.210                   backoff.Stop
+
+Note: Implementation is not thread-safe.
+*/
+type ExponentialBackOff struct {
+	InitialInterval     time.Duration
+	RandomizationFactor float64
+	Multiplier          float64
+	MaxInterval         time.Duration
+	// After MaxElapsedTime the ExponentialBackOff stops.
+	// It never stops if MaxElapsedTime == 0.
+	MaxElapsedTime time.Duration
+	Clock          Clock
+
+	currentInterval time.Duration
+	startTime       time.Time
+}
+
+// Clock is an interface that returns current time for BackOff.
+type Clock interface {
+	Now() time.Time
+}
+
+// Default values for ExponentialBackOff.
+const (
+	DefaultInitialInterval     = 500 * time.Millisecond
+	DefaultRandomizationFactor = 0.5
+	DefaultMultiplier          = 1.5
+	DefaultMaxInterval         = 60 * time.Second
+	DefaultMaxElapsedTime      = 15 * time.Minute
+)
+
+// NewExponentialBackOff creates an instance of ExponentialBackOff using default values.
+func NewExponentialBackOff() *ExponentialBackOff {
+	b := &ExponentialBackOff{
+		InitialInterval:     DefaultInitialInterval,
+		RandomizationFactor: DefaultRandomizationFactor,
+		Multiplier:          DefaultMultiplier,
+		MaxInterval:         DefaultMaxInterval,
+		MaxElapsedTime:      DefaultMaxElapsedTime,
+		Clock:               SystemClock,
+	}
+	b.Reset()
+	return b
+}
+
+type systemClock struct{}
+
+func (t systemClock) Now() time.Time {
+	return time.Now()
+}
+
+// SystemClock implements Clock interface that uses time.Now().
+var SystemClock = systemClock{}
+
+// Reset the interval back to the initial retry interval and restarts the timer.
+// Reset must be called before using b.
+func (b *ExponentialBackOff) Reset() {
+	b.currentInterval = b.InitialInterval
+	b.startTime = b.Clock.Now()
+}
+
+// NextBackOff calculates the next backoff interval using the formula:
+// 	Randomized interval = RetryInterval * (1 ± RandomizationFactor)
+func (b *ExponentialBackOff) NextBackOff() time.Duration {
+	// Make sure we have not gone over the maximum elapsed time.
+	if b.MaxElapsedTime != 0 && b.GetElapsedTime() > b.MaxElapsedTime {
+		return Stop
+	}
+	defer b.incrementCurrentInterval()
+	return getRandomValueFromInterval(b.RandomizationFactor, rand.Float64(), b.currentInterval)
+}
+
+// GetElapsedTime returns the elapsed time since an ExponentialBackOff instance
+// is created and is reset when Reset() is called.
+//
+// The elapsed time is computed using time.Now().UnixNano(). It is
+// safe to call even while the backoff policy is used by a running
+// ticker.
+func (b *ExponentialBackOff) GetElapsedTime() time.Duration {
+	return b.Clock.Now().Sub(b.startTime)
+}
+
+// Increments the current interval by multiplying it with the multiplier.
+func (b *ExponentialBackOff) incrementCurrentInterval() {
+	// Check for overflow, if overflow is detected set the current interval to the max interval.
+	if float64(b.currentInterval) >= float64(b.MaxInterval)/b.Multiplier {
+		b.currentInterval = b.MaxInterval
+	} else {
+		b.currentInterval = time.Duration(float64(b.currentInterval) * b.Multiplier)
+	}
+}
+
+// Returns a random value from the following interval:
+// 	[randomizationFactor * currentInterval, randomizationFactor * currentInterval].
+func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
+	var delta = randomizationFactor * float64(currentInterval)
+	var minInterval = float64(currentInterval) - delta
+	var maxInterval = float64(currentInterval) + delta
+
+	// Get a random value from the range [minInterval, maxInterval].
+	// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
+	// we want a 33% chance for selecting either 1, 2 or 3.
+	return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/go.mod b/vendor/github.com/cenkalti/backoff/v3/go.mod
new file mode 100644
index 0000000..479e62a
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/go.mod
@@ -0,0 +1,3 @@
+module github.com/cenkalti/backoff/v3
+
+go 1.12
diff --git a/vendor/github.com/cenkalti/backoff/v3/retry.go b/vendor/github.com/cenkalti/backoff/v3/retry.go
new file mode 100644
index 0000000..e4c6c9c
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/retry.go
@@ -0,0 +1,86 @@
+package backoff
+
+import "time"
+
+// An Operation is executing by Retry() or RetryNotify().
+// The operation will be retried using a backoff policy if it returns an error.
+type Operation func() error
+
+// Notify is a notify-on-error function. It receives an operation error and
+// backoff delay if the operation failed (with an error).
+//
+// NOTE that if the backoff policy stated to stop retrying,
+// the notify function isn't called.
+type Notify func(error, time.Duration)
+
+// Retry the operation o until it does not return error or BackOff stops.
+// o is guaranteed to be run at least once.
+//
+// If o returns a *PermanentError, the operation is not retried, and the
+// wrapped error is returned.
+//
+// Retry sleeps the goroutine for the duration returned by BackOff after a
+// failed operation returns.
+func Retry(o Operation, b BackOff) error { return RetryNotify(o, b, nil) }
+
+// RetryNotify calls notify function with the error and wait duration
+// for each failed attempt before sleep.
+func RetryNotify(operation Operation, b BackOff, notify Notify) error {
+	var err error
+	var next time.Duration
+	var t *time.Timer
+
+	cb := ensureContext(b)
+
+	b.Reset()
+	for {
+		if err = operation(); err == nil {
+			return nil
+		}
+
+		if permanent, ok := err.(*PermanentError); ok {
+			return permanent.Err
+		}
+
+		if next = cb.NextBackOff(); next == Stop {
+			return err
+		}
+
+		if notify != nil {
+			notify(err, next)
+		}
+
+		if t == nil {
+			t = time.NewTimer(next)
+			defer t.Stop()
+		} else {
+			t.Reset(next)
+		}
+
+		select {
+		case <-cb.Context().Done():
+			return err
+		case <-t.C:
+		}
+	}
+}
+
+// PermanentError signals that the operation should not be retried.
+type PermanentError struct {
+	Err error
+}
+
+func (e *PermanentError) Error() string {
+	return e.Err.Error()
+}
+
+func (e *PermanentError) Unwrap() error {
+	return e.Err
+}
+
+// Permanent wraps the given err in a *PermanentError.
+func Permanent(err error) *PermanentError {
+	return &PermanentError{
+		Err: err,
+	}
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/ticker.go b/vendor/github.com/cenkalti/backoff/v3/ticker.go
new file mode 100644
index 0000000..e41084b
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/ticker.go
@@ -0,0 +1,82 @@
+package backoff
+
+import (
+	"sync"
+	"time"
+)
+
+// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff.
+//
+// Ticks will continue to arrive when the previous operation is still running,
+// so operations that take a while to fail could run in quick succession.
+type Ticker struct {
+	C        <-chan time.Time
+	c        chan time.Time
+	b        BackOffContext
+	stop     chan struct{}
+	stopOnce sync.Once
+}
+
+// NewTicker returns a new Ticker containing a channel that will send
+// the time at times specified by the BackOff argument. Ticker is
+// guaranteed to tick at least once.  The channel is closed when Stop
+// method is called or BackOff stops. It is not safe to manipulate the
+// provided backoff policy (notably calling NextBackOff or Reset)
+// while the ticker is running.
+func NewTicker(b BackOff) *Ticker {
+	c := make(chan time.Time)
+	t := &Ticker{
+		C:    c,
+		c:    c,
+		b:    ensureContext(b),
+		stop: make(chan struct{}),
+	}
+	t.b.Reset()
+	go t.run()
+	return t
+}
+
+// Stop turns off a ticker. After Stop, no more ticks will be sent.
+func (t *Ticker) Stop() {
+	t.stopOnce.Do(func() { close(t.stop) })
+}
+
+func (t *Ticker) run() {
+	c := t.c
+	defer close(c)
+
+	// Ticker is guaranteed to tick at least once.
+	afterC := t.send(time.Now())
+
+	for {
+		if afterC == nil {
+			return
+		}
+
+		select {
+		case tick := <-afterC:
+			afterC = t.send(tick)
+		case <-t.stop:
+			t.c = nil // Prevent future ticks from being sent to the channel.
+			return
+		case <-t.b.Context().Done():
+			return
+		}
+	}
+}
+
+func (t *Ticker) send(tick time.Time) <-chan time.Time {
+	select {
+	case t.c <- tick:
+	case <-t.stop:
+		return nil
+	}
+
+	next := t.b.NextBackOff()
+	if next == Stop {
+		t.Stop()
+		return nil
+	}
+
+	return time.After(next)
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/tries.go b/vendor/github.com/cenkalti/backoff/v3/tries.go
new file mode 100644
index 0000000..cfeefd9
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/tries.go
@@ -0,0 +1,35 @@
+package backoff
+
+import "time"
+
+/*
+WithMaxRetries creates a wrapper around another BackOff, which will
+return Stop if NextBackOff() has been called too many times since
+the last time Reset() was called
+
+Note: Implementation is not thread-safe.
+*/
+func WithMaxRetries(b BackOff, max uint64) BackOff {
+	return &backOffTries{delegate: b, maxTries: max}
+}
+
+type backOffTries struct {
+	delegate BackOff
+	maxTries uint64
+	numTries uint64
+}
+
+func (b *backOffTries) NextBackOff() time.Duration {
+	if b.maxTries > 0 {
+		if b.maxTries <= b.numTries {
+			return Stop
+		}
+		b.numTries++
+	}
+	return b.delegate.NextBackOff()
+}
+
+func (b *backOffTries) Reset() {
+	b.numTries = 0
+	b.delegate.Reset()
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index a43764b..ea3c340 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -8,6 +8,8 @@
 github.com/boljen/go-bitmap
 # github.com/bsm/sarama-cluster v2.1.15+incompatible
 github.com/bsm/sarama-cluster
+# github.com/cenkalti/backoff/v3 v3.1.1
+github.com/cenkalti/backoff/v3
 # github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73
 github.com/cevaris/ordered_map
 # github.com/coreos/go-systemd v0.0.0-20190620071333-e64a0ec8b42a