[VOL-3981] Voltha Core restarts if it can't subscribe to Kafka

This commit fixes the following issues:
1) Adds a retry loop with exponential backoff so the Core resubscribes
to Kafka on error instead of restarting.  The failure occurs randomly,
typically when the Kafka broker is up and running but not truly ready
to create a new topic.
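
A minimal sketch of the new retry loop in registerAdapterRequestHandlers()
(subscribe() is a stand-in for kmp.SubscribeWithRequestHandlerInterface();
the backoff calls are from github.com/cenkalti/backoff/v3 as added below):

    retry := backoff.NewExponentialBackOff()
    retry.InitialInterval = cf.BackoffRetryInitialInterval // 500ms default
    retry.MaxInterval = cf.BackoffRetryMaxInterval         // 1m default
    retry.MaxElapsedTime = cf.BackoffRetryMaxElapsedTime   // 0 = never stop

    timer := time.NewTimer(0) // first attempt runs immediately
    for {
        select {
        case <-timer.C:
            if err := subscribe(); err == nil {
                return // subscribed; mark the probe service running
            }
            wait := retry.NextBackOff()
            if wait == backoff.Stop { // unreachable while MaxElapsedTime == 0
                retry.Reset()
                wait = retry.NextBackOff()
            }
            timer = time.NewTimer(wait)
        case <-ctx.Done():
            return
        }
    }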

2) Fixes an issue where the event proxy start incorrectly set the
cluster messaging bus probe to out-of-service and never set it to
running.  This caused the Core to wait forever for the probe to become
ready.
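
The corrected flow in startEventProxy() only touches the cluster
message bus probe when that probe service is actually registered
(i.e. when the cluster and adapter Kafka addresses differ), roughly:

    if err := kafkaClient.Start(ctx); err != nil {
        if updateProbeService {
            probe.UpdateStatusFromContext(ctx, clusterMessageBus,
                probe.ServiceStatusNotReady)
        }
        // wait connectionRetryInterval, then retry
    }
    if updateProbeService {
        probe.UpdateStatusFromContext(ctx, clusterMessageBus,
            probe.ServiceStatusRunning)
    }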

Change-Id: Idf22481f85e4b576440301f2859da7ddf2d8c688
diff --git a/go.mod b/go.mod
index 18e8af4..56b4125 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@
 go 1.16
 
 require (
+	github.com/cenkalti/backoff/v3 v3.2.2
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/mock v1.5.0
 	github.com/golang/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index fdb5b9b..aaec4cd 100644
--- a/go.sum
+++ b/go.sum
@@ -20,6 +20,8 @@
 github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM=
 github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA=
 github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg=
+github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
+github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 2b5017e..6510964 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,104 +24,113 @@
 
 // RW Core service default constants
 const (
-	EtcdStoreName                    = "etcd"
-	defaultGrpcAddress               = ":50057"
-	defaultKafkaAdapterAddress       = "127.0.0.1:9092"
-	defaultKafkaClusterAddress       = "127.0.0.1:9094"
-	defaultKVStoreType               = EtcdStoreName
-	defaultKVStoreTimeout            = 5 * time.Second
-	defaultKVStoreAddress            = "127.0.0.1:2379" // Etcd = 2379
-	defaultKVTxnKeyDelTime           = 60
-	defaultLogLevel                  = "WARN"
-	defaultBanner                    = false
-	defaultDisplayVersionOnly        = false
-	defaultCoreTopic                 = "rwcore"
-	defaultEventTopic                = "voltha.events"
-	defaultRWCoreEndpoint            = "rwcore"
-	defaultRWCoreKey                 = "pki/voltha.key"
-	defaultRWCoreCert                = "pki/voltha.crt"
-	defaultRWCoreCA                  = "pki/voltha-CA.pem"
-	defaultLongRunningRequestTimeout = 2000 * time.Millisecond
-	defaultDefaultRequestTimeout     = 1000 * time.Millisecond
-	defaultCoreTimeout               = 1000 * time.Millisecond
-	defaultCoreBindingKey            = "voltha_backend_name"
-	defaultMaxConnectionRetries      = -1 // retries forever
-	defaultConnectionRetryInterval   = 2 * time.Second
-	defaultLiveProbeInterval         = 60 * time.Second
-	defaultNotLiveProbeInterval      = 5 * time.Second // Probe more frequently when not alive
-	defaultProbeAddress              = ":8080"
-	defaultTraceEnabled              = false
-	defaultTraceAgentAddress         = "127.0.0.1:6831"
-	defaultLogCorrelationEnabled     = true
-	defaultVolthaStackID             = "voltha"
+	EtcdStoreName                      = "etcd"
+	defaultGrpcAddress                 = ":50057"
+	defaultKafkaAdapterAddress         = "127.0.0.1:9092"
+	defaultKafkaClusterAddress         = "127.0.0.1:9094"
+	defaultKVStoreType                 = EtcdStoreName
+	defaultKVStoreTimeout              = 5 * time.Second
+	defaultKVStoreAddress              = "127.0.0.1:2379" // Etcd = 2379
+	defaultKVTxnKeyDelTime             = 60
+	defaultLogLevel                    = "WARN"
+	defaultBanner                      = false
+	defaultDisplayVersionOnly          = false
+	defaultCoreTopic                   = "rwcore"
+	defaultEventTopic                  = "voltha.events"
+	defaultRWCoreEndpoint              = "rwcore"
+	defaultRWCoreKey                   = "pki/voltha.key"
+	defaultRWCoreCert                  = "pki/voltha.crt"
+	defaultRWCoreCA                    = "pki/voltha-CA.pem"
+	defaultLongRunningRequestTimeout   = 2000 * time.Millisecond
+	defaultDefaultRequestTimeout       = 1000 * time.Millisecond
+	defaultCoreTimeout                 = 1000 * time.Millisecond
+	defaultCoreBindingKey              = "voltha_backend_name"
+	defaultMaxConnectionRetries        = -1 // retries forever
+	defaultConnectionRetryInterval     = 2 * time.Second
+	defaultLiveProbeInterval           = 60 * time.Second
+	defaultNotLiveProbeInterval        = 5 * time.Second // Probe more frequently when not alive
+	defaultProbeAddress                = ":8080"
+	defaultTraceEnabled                = false
+	defaultTraceAgentAddress           = "127.0.0.1:6831"
+	defaultLogCorrelationEnabled       = true
+	defaultVolthaStackID               = "voltha"
+	defaultBackoffRetryInitialInterval = 500 * time.Millisecond
+	defaultBackoffRetryMaxElapsedTime  = 0
+	defaultBackoffRetryMaxInterval     = 1 * time.Minute
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
 type RWCoreFlags struct {
 	// Command line parameters
-	RWCoreEndpoint            string
-	GrpcAddress               string
-	KafkaAdapterAddress       string
-	KafkaClusterAddress       string
-	KVStoreType               string
-	KVStoreTimeout            time.Duration
-	KVStoreAddress            string
-	KVTxnKeyDelTime           int
-	CoreTopic                 string
-	EventTopic                string
-	LogLevel                  string
-	Banner                    bool
-	DisplayVersionOnly        bool
-	RWCoreKey                 string
-	RWCoreCert                string
-	RWCoreCA                  string
-	LongRunningRequestTimeout time.Duration
-	DefaultRequestTimeout     time.Duration
-	DefaultCoreTimeout        time.Duration
-	CoreBindingKey            string
-	MaxConnectionRetries      int
-	ConnectionRetryInterval   time.Duration
-	LiveProbeInterval         time.Duration
-	NotLiveProbeInterval      time.Duration
-	ProbeAddress              string
-	TraceEnabled              bool
-	TraceAgentAddress         string
-	LogCorrelationEnabled     bool
-	VolthaStackID             string
+	RWCoreEndpoint              string
+	GrpcAddress                 string
+	KafkaAdapterAddress         string
+	KafkaClusterAddress         string
+	KVStoreType                 string
+	KVStoreTimeout              time.Duration
+	KVStoreAddress              string
+	KVTxnKeyDelTime             int
+	CoreTopic                   string
+	EventTopic                  string
+	LogLevel                    string
+	Banner                      bool
+	DisplayVersionOnly          bool
+	RWCoreKey                   string
+	RWCoreCert                  string
+	RWCoreCA                    string
+	LongRunningRequestTimeout   time.Duration
+	DefaultRequestTimeout       time.Duration
+	DefaultCoreTimeout          time.Duration
+	CoreBindingKey              string
+	MaxConnectionRetries        int
+	ConnectionRetryInterval     time.Duration
+	LiveProbeInterval           time.Duration
+	NotLiveProbeInterval        time.Duration
+	ProbeAddress                string
+	TraceEnabled                bool
+	TraceAgentAddress           string
+	LogCorrelationEnabled       bool
+	VolthaStackID               string
+	BackoffRetryInitialInterval time.Duration
+	BackoffRetryMaxElapsedTime  time.Duration
+	BackoffRetryMaxInterval     time.Duration
 }
 
 // NewRWCoreFlags returns a new RWCore config
 func NewRWCoreFlags() *RWCoreFlags {
 	var rwCoreFlag = RWCoreFlags{ // Default values
-		RWCoreEndpoint:            defaultRWCoreEndpoint,
-		GrpcAddress:               defaultGrpcAddress,
-		KafkaAdapterAddress:       defaultKafkaAdapterAddress,
-		KafkaClusterAddress:       defaultKafkaClusterAddress,
-		KVStoreType:               defaultKVStoreType,
-		KVStoreTimeout:            defaultKVStoreTimeout,
-		KVStoreAddress:            defaultKVStoreAddress,
-		KVTxnKeyDelTime:           defaultKVTxnKeyDelTime,
-		CoreTopic:                 defaultCoreTopic,
-		EventTopic:                defaultEventTopic,
-		LogLevel:                  defaultLogLevel,
-		Banner:                    defaultBanner,
-		DisplayVersionOnly:        defaultDisplayVersionOnly,
-		RWCoreKey:                 defaultRWCoreKey,
-		RWCoreCert:                defaultRWCoreCert,
-		RWCoreCA:                  defaultRWCoreCA,
-		DefaultRequestTimeout:     defaultDefaultRequestTimeout,
-		LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
-		DefaultCoreTimeout:        defaultCoreTimeout,
-		CoreBindingKey:            defaultCoreBindingKey,
-		MaxConnectionRetries:      defaultMaxConnectionRetries,
-		ConnectionRetryInterval:   defaultConnectionRetryInterval,
-		LiveProbeInterval:         defaultLiveProbeInterval,
-		NotLiveProbeInterval:      defaultNotLiveProbeInterval,
-		ProbeAddress:              defaultProbeAddress,
-		TraceEnabled:              defaultTraceEnabled,
-		TraceAgentAddress:         defaultTraceAgentAddress,
-		LogCorrelationEnabled:     defaultLogCorrelationEnabled,
-		VolthaStackID:             defaultVolthaStackID,
+		RWCoreEndpoint:              defaultRWCoreEndpoint,
+		GrpcAddress:                 defaultGrpcAddress,
+		KafkaAdapterAddress:         defaultKafkaAdapterAddress,
+		KafkaClusterAddress:         defaultKafkaClusterAddress,
+		KVStoreType:                 defaultKVStoreType,
+		KVStoreTimeout:              defaultKVStoreTimeout,
+		KVStoreAddress:              defaultKVStoreAddress,
+		KVTxnKeyDelTime:             defaultKVTxnKeyDelTime,
+		CoreTopic:                   defaultCoreTopic,
+		EventTopic:                  defaultEventTopic,
+		LogLevel:                    defaultLogLevel,
+		Banner:                      defaultBanner,
+		DisplayVersionOnly:          defaultDisplayVersionOnly,
+		RWCoreKey:                   defaultRWCoreKey,
+		RWCoreCert:                  defaultRWCoreCert,
+		RWCoreCA:                    defaultRWCoreCA,
+		DefaultRequestTimeout:       defaultDefaultRequestTimeout,
+		LongRunningRequestTimeout:   defaultLongRunningRequestTimeout,
+		DefaultCoreTimeout:          defaultCoreTimeout,
+		CoreBindingKey:              defaultCoreBindingKey,
+		MaxConnectionRetries:        defaultMaxConnectionRetries,
+		ConnectionRetryInterval:     defaultConnectionRetryInterval,
+		LiveProbeInterval:           defaultLiveProbeInterval,
+		NotLiveProbeInterval:        defaultNotLiveProbeInterval,
+		ProbeAddress:                defaultProbeAddress,
+		TraceEnabled:                defaultTraceEnabled,
+		TraceAgentAddress:           defaultTraceAgentAddress,
+		LogCorrelationEnabled:       defaultLogCorrelationEnabled,
+		VolthaStackID:               defaultVolthaStackID,
+		BackoffRetryInitialInterval: defaultBackoffRetryInitialInterval,
+		BackoffRetryMaxElapsedTime:  defaultBackoffRetryMaxElapsedTime,
+		BackoffRetryMaxInterval:     defaultBackoffRetryMaxInterval,
 	}
 	return &rwCoreFlag
 }
@@ -209,5 +218,14 @@
 	help = fmt.Sprintf("ID for the current voltha stack")
 	flag.StringVar(&cf.VolthaStackID, "stack_id", defaultVolthaStackID, help)
 
+	help = fmt.Sprintf("The initial interval an exponential backoff waits before retrying")
+	flag.DurationVar(&(cf.BackoffRetryInitialInterval), "backoff_retry_initial_interval", defaultBackoffRetryInitialInterval, help)
+
+	help = fmt.Sprintf("The maximum elapsed time after which an exponential backoff stops retrying (0 means retry forever)")
+	flag.DurationVar(&(cf.BackoffRetryMaxElapsedTime), "backoff_retry_max_elapsed_time", defaultBackoffRetryMaxElapsedTime, help)
+
+	help = fmt.Sprintf("The maximum interval between exponential backoff retries")
+	flag.DurationVar(&(cf.BackoffRetryMaxInterval), "backoff_retry_max_interval", defaultBackoffRetryMaxInterval, help)
+
 	flag.Parse()
 }
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 459c2fa..4b864de 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -56,6 +56,7 @@
 			"kv-store",
 			"adapter-manager",
 			"grpc-service",
+			"adapter-request-handler",
 		)
 
 		if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
@@ -134,7 +135,8 @@
 	)
 
 	// create event proxy
-	eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval)
+	updateProbeClusterService := cf.KafkaAdapterAddress != cf.KafkaClusterAddress
+	eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval, updateProbeClusterService)
 	if err != nil {
 		logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
 		return
@@ -168,7 +170,7 @@
 	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy, cf.VolthaStackID)
 
 	// register kafka RPC handler
-	registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)
+	registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf, "adapter-request-handler")
 
 	// start gRPC handler
 	grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx))
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index d45a84e..ae33090 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,7 +18,10 @@
 
 import (
 	"context"
+	"github.com/cenkalti/backoff/v3"
+	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events"
+
 	"time"
 
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -69,11 +72,13 @@
 	return kmp, nil
 }
 
-func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration, updateProbeService bool) (*events.EventProxy, error) {
 	ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
 	for {
 		if err := kafkaClient.Start(ctx); err != nil {
-			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+			if updateProbeService {
+				probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+			}
 			logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
 			select {
 			case <-time.After(connectionRetryInterval):
@@ -82,6 +87,9 @@
 				return nil, ctx.Err()
 			}
 		}
+		if updateProbeService {
+			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
+		}
 		logger.Info(ctx, "started-connection-on-kafka-cluster-address")
 		break
 	}
@@ -170,13 +178,45 @@
 	}
 }
 
-func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager,
+	aMgr *adapter.Manager, cf *config.RWCoreFlags, serviceName string) {
+	logger.Infow(ctx, "registering-request-handler", log.Fields{"topic": cf.CoreTopic})
+
+	// Set the exponential backoff params
+	kafkaRetryBackoff := backoff.NewExponentialBackOff()
+	kafkaRetryBackoff.InitialInterval = cf.BackoffRetryInitialInterval
+	kafkaRetryBackoff.MaxElapsedTime = cf.BackoffRetryMaxElapsedTime
+	kafkaRetryBackoff.MaxInterval = cf.BackoffRetryMaxInterval
+
+	// Do not wait before the first subscribe attempt
+	backoffTimer := time.NewTimer(0)
+
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
 	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
-
-	// Register the broadcast topic to handle any core-bound broadcast requests
-	if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
-		logger.Fatalw(ctx, "Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
+	for {
+		select {
+		case <-backoffTimer.C:
+			// Register the broadcast topic to handle any core-bound broadcast requests
+			err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: cf.CoreTopic}, requestProxy)
+			if err == nil {
+				logger.Infow(ctx, "request-handler-registered", log.Fields{"topic": cf.CoreTopic})
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+				return
+			}
+			logger.Errorw(ctx, "failed-registering-broadcast-handler-retrying", log.Fields{"topic": cf.CoreTopic, "error": err})
+			duration := kafkaRetryBackoff.NextBackOff()
+			// This should not occur with the default config: MaxElapsedTime is 0 by default, so NextBackOff() never returns Stop
+			if duration == backoff.Stop {
+				// If we reach a maximum then warn and reset the backoff timer and keep attempting.
+				logger.Warnw(ctx, "maximum-kafka-retry-backoff-reached-resetting",
+					log.Fields{"max-kafka-retry-backoff": kafkaRetryBackoff.MaxElapsedTime})
+				kafkaRetryBackoff.Reset()
+				duration = kafkaRetryBackoff.NextBackOff()
+			}
+			backoffTimer = time.NewTimer(duration)
+		case <-ctx.Done():
+			logger.Infow(ctx, "context-closed", log.Fields{"topic": cf.CoreTopic})
+			return
+		}
 	}
-
-	logger.Info(ctx, "request-handler-registered")
 }
diff --git a/vendor/github.com/cenkalti/backoff/v3/.gitignore b/vendor/github.com/cenkalti/backoff/v3/.gitignore
new file mode 100644
index 0000000..0026861
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/.gitignore
@@ -0,0 +1,22 @@
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
diff --git a/vendor/github.com/cenkalti/backoff/v3/.travis.yml b/vendor/github.com/cenkalti/backoff/v3/.travis.yml
new file mode 100644
index 0000000..47a6a46
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/.travis.yml
@@ -0,0 +1,10 @@
+language: go
+go:
+  - 1.7
+  - 1.x
+  - tip
+before_install:
+  - go get github.com/mattn/goveralls
+  - go get golang.org/x/tools/cmd/cover
+script:
+  - $HOME/gopath/bin/goveralls -service=travis-ci
diff --git a/vendor/github.com/cenkalti/backoff/v3/LICENSE b/vendor/github.com/cenkalti/backoff/v3/LICENSE
new file mode 100644
index 0000000..89b8179
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Cenk Altı
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/github.com/cenkalti/backoff/v3/README.md b/vendor/github.com/cenkalti/backoff/v3/README.md
new file mode 100644
index 0000000..3673df4
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/README.md
@@ -0,0 +1,33 @@
+# Exponential Backoff [![GoDoc][godoc image]][godoc] [![Build Status][travis image]][travis] [![Coverage Status][coveralls image]][coveralls]
+
+This is a Go port of the exponential backoff algorithm from [Google's HTTP Client Library for Java][google-http-java-client].
+
+[Exponential backoff][exponential backoff wiki]
+is an algorithm that uses feedback to multiplicatively decrease the rate of some process,
+in order to gradually find an acceptable rate.
+The retries exponentially increase and stop increasing when a certain threshold is met.
+
+## Usage
+
+Import path is `github.com/cenkalti/backoff/v3`. Please note the version part at the end.
+
+godoc.org does not support modules yet,
+so you can use https://godoc.org/gopkg.in/cenkalti/backoff.v3 to view the documentation.
+
+## Contributing
+
+* I would like to keep this library as small as possible.
+* Please don't send a PR without opening an issue and discussing it first.
+* If proposed change is not a common use case, I will probably not accept it.
+
+[godoc]: https://godoc.org/github.com/cenkalti/backoff
+[godoc image]: https://godoc.org/github.com/cenkalti/backoff?status.png
+[travis]: https://travis-ci.org/cenkalti/backoff
+[travis image]: https://travis-ci.org/cenkalti/backoff.png?branch=master
+[coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master
+[coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master
+
+[google-http-java-client]: https://github.com/google/google-http-java-client/blob/da1aa993e90285ec18579f1553339b00e19b3ab5/google-http-client/src/main/java/com/google/api/client/util/ExponentialBackOff.java
+[exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff
+
+[advanced example]: https://godoc.org/github.com/cenkalti/backoff#example_
diff --git a/vendor/github.com/cenkalti/backoff/v3/backoff.go b/vendor/github.com/cenkalti/backoff/v3/backoff.go
new file mode 100644
index 0000000..3676ee4
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/backoff.go
@@ -0,0 +1,66 @@
+// Package backoff implements backoff algorithms for retrying operations.
+//
+// Use Retry function for retrying operations that may fail.
+// If Retry does not meet your needs,
+// copy/paste the function into your project and modify as you wish.
+//
+// There is also Ticker type similar to time.Ticker.
+// You can use it if you need to work with channels.
+//
+// See Examples section below for usage examples.
+package backoff
+
+import "time"
+
+// BackOff is a backoff policy for retrying an operation.
+type BackOff interface {
+	// NextBackOff returns the duration to wait before retrying the operation,
+	// or backoff. Stop to indicate that no more retries should be made.
+	//
+	// Example usage:
+	//
+	// 	duration := backoff.NextBackOff();
+	// 	if (duration == backoff.Stop) {
+	// 		// Do not retry operation.
+	// 	} else {
+	// 		// Sleep for duration and retry operation.
+	// 	}
+	//
+	NextBackOff() time.Duration
+
+	// Reset to initial state.
+	Reset()
+}
+
+// Stop indicates that no more retries should be made for use in NextBackOff().
+const Stop time.Duration = -1
+
+// ZeroBackOff is a fixed backoff policy whose backoff time is always zero,
+// meaning that the operation is retried immediately without waiting, indefinitely.
+type ZeroBackOff struct{}
+
+func (b *ZeroBackOff) Reset() {}
+
+func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 }
+
+// StopBackOff is a fixed backoff policy that always returns backoff.Stop for
+// NextBackOff(), meaning that the operation should never be retried.
+type StopBackOff struct{}
+
+func (b *StopBackOff) Reset() {}
+
+func (b *StopBackOff) NextBackOff() time.Duration { return Stop }
+
+// ConstantBackOff is a backoff policy that always returns the same backoff delay.
+// This is in contrast to an exponential backoff policy,
+// which returns a delay that grows longer as you call NextBackOff() over and over again.
+type ConstantBackOff struct {
+	Interval time.Duration
+}
+
+func (b *ConstantBackOff) Reset()                     {}
+func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval }
+
+func NewConstantBackOff(d time.Duration) *ConstantBackOff {
+	return &ConstantBackOff{Interval: d}
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/context.go b/vendor/github.com/cenkalti/backoff/v3/context.go
new file mode 100644
index 0000000..fcff86c
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/context.go
@@ -0,0 +1,66 @@
+package backoff
+
+import (
+	"context"
+	"time"
+)
+
+// BackOffContext is a backoff policy that stops retrying after the context
+// is canceled.
+type BackOffContext interface { // nolint: golint
+	BackOff
+	Context() context.Context
+}
+
+type backOffContext struct {
+	BackOff
+	ctx context.Context
+}
+
+// WithContext returns a BackOffContext with context ctx
+//
+// ctx must not be nil
+func WithContext(b BackOff, ctx context.Context) BackOffContext { // nolint: golint
+	if ctx == nil {
+		panic("nil context")
+	}
+
+	if b, ok := b.(*backOffContext); ok {
+		return &backOffContext{
+			BackOff: b.BackOff,
+			ctx:     ctx,
+		}
+	}
+
+	return &backOffContext{
+		BackOff: b,
+		ctx:     ctx,
+	}
+}
+
+func getContext(b BackOff) context.Context {
+	if cb, ok := b.(BackOffContext); ok {
+		return cb.Context()
+	}
+	if tb, ok := b.(*backOffTries); ok {
+		return getContext(tb.delegate)
+	}
+	return context.Background()
+}
+
+func (b *backOffContext) Context() context.Context {
+	return b.ctx
+}
+
+func (b *backOffContext) NextBackOff() time.Duration {
+	select {
+	case <-b.ctx.Done():
+		return Stop
+	default:
+	}
+	next := b.BackOff.NextBackOff()
+	if deadline, ok := b.ctx.Deadline(); ok && deadline.Sub(time.Now()) < next { // nolint: gosimple
+		return Stop
+	}
+	return next
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/exponential.go b/vendor/github.com/cenkalti/backoff/v3/exponential.go
new file mode 100644
index 0000000..cb11cc1
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/exponential.go
@@ -0,0 +1,154 @@
+package backoff
+
+import (
+	"math/rand"
+	"time"
+)
+
+/*
+ExponentialBackOff is a backoff implementation that increases the backoff
+period for each retry attempt using a randomization function that grows exponentially.
+
+NextBackOff() is calculated using the following formula:
+
+ randomized interval =
+     RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])
+
+In other words NextBackOff() will range between the randomization factor
+percentage below and above the retry interval.
+
+For example, given the following parameters:
+
+ RetryInterval = 2
+ RandomizationFactor = 0.5
+ Multiplier = 2
+
+the actual backoff period used in the next retry attempt will range between 1 and 3 seconds,
+multiplied by the exponential, that is, between 2 and 6 seconds.
+
+Note: MaxInterval caps the RetryInterval and not the randomized interval.
+
+If the time elapsed since an ExponentialBackOff instance is created goes past the
+MaxElapsedTime, then the method NextBackOff() starts returning backoff.Stop.
+
+The elapsed time can be reset by calling Reset().
+
+Example: Given the following default arguments, for 10 tries the sequence will be,
+and assuming we go over the MaxElapsedTime on the 10th try:
+
+ Request #  RetryInterval (seconds)  Randomized Interval (seconds)
+
+  1          0.5                     [0.25,   0.75]
+  2          0.75                    [0.375,  1.125]
+  3          1.125                   [0.562,  1.687]
+  4          1.687                   [0.8435, 2.53]
+  5          2.53                    [1.265,  3.795]
+  6          3.795                   [1.897,  5.692]
+  7          5.692                   [2.846,  8.538]
+  8          8.538                   [4.269, 12.807]
+  9         12.807                   [6.403, 19.210]
+ 10         19.210                   backoff.Stop
+
+Note: Implementation is not thread-safe.
+*/
+type ExponentialBackOff struct {
+	InitialInterval     time.Duration
+	RandomizationFactor float64
+	Multiplier          float64
+	MaxInterval         time.Duration
+	// After MaxElapsedTime the ExponentialBackOff stops.
+	// It never stops if MaxElapsedTime == 0.
+	MaxElapsedTime time.Duration
+	Clock          Clock
+
+	currentInterval time.Duration
+	startTime       time.Time
+}
+
+// Clock is an interface that returns current time for BackOff.
+type Clock interface {
+	Now() time.Time
+}
+
+// Default values for ExponentialBackOff.
+const (
+	DefaultInitialInterval     = 500 * time.Millisecond
+	DefaultRandomizationFactor = 0.5
+	DefaultMultiplier          = 1.5
+	DefaultMaxInterval         = 60 * time.Second
+	DefaultMaxElapsedTime      = 15 * time.Minute
+)
+
+// NewExponentialBackOff creates an instance of ExponentialBackOff using default values.
+func NewExponentialBackOff() *ExponentialBackOff {
+	b := &ExponentialBackOff{
+		InitialInterval:     DefaultInitialInterval,
+		RandomizationFactor: DefaultRandomizationFactor,
+		Multiplier:          DefaultMultiplier,
+		MaxInterval:         DefaultMaxInterval,
+		MaxElapsedTime:      DefaultMaxElapsedTime,
+		Clock:               SystemClock,
+	}
+	b.Reset()
+	return b
+}
+
+type systemClock struct{}
+
+func (t systemClock) Now() time.Time {
+	return time.Now()
+}
+
+// SystemClock implements Clock interface that uses time.Now().
+var SystemClock = systemClock{}
+
+// Reset the interval back to the initial retry interval and restarts the timer.
+// Reset must be called before using b.
+func (b *ExponentialBackOff) Reset() {
+	b.currentInterval = b.InitialInterval
+	b.startTime = b.Clock.Now()
+}
+
+// NextBackOff calculates the next backoff interval using the formula:
+// 	Randomized interval = RetryInterval * (1 ± RandomizationFactor)
+func (b *ExponentialBackOff) NextBackOff() time.Duration {
+	// Make sure we have not gone over the maximum elapsed time.
+	if b.MaxElapsedTime != 0 && b.GetElapsedTime() > b.MaxElapsedTime {
+		return Stop
+	}
+	defer b.incrementCurrentInterval()
+	return getRandomValueFromInterval(b.RandomizationFactor, rand.Float64(), b.currentInterval)
+}
+
+// GetElapsedTime returns the elapsed time since an ExponentialBackOff instance
+// is created and is reset when Reset() is called.
+//
+// The elapsed time is computed using time.Now().UnixNano(). It is
+// safe to call even while the backoff policy is used by a running
+// ticker.
+func (b *ExponentialBackOff) GetElapsedTime() time.Duration {
+	return b.Clock.Now().Sub(b.startTime)
+}
+
+// Increments the current interval by multiplying it with the multiplier.
+func (b *ExponentialBackOff) incrementCurrentInterval() {
+	// Check for overflow, if overflow is detected set the current interval to the max interval.
+	if float64(b.currentInterval) >= float64(b.MaxInterval)/b.Multiplier {
+		b.currentInterval = b.MaxInterval
+	} else {
+		b.currentInterval = time.Duration(float64(b.currentInterval) * b.Multiplier)
+	}
+}
+
+// Returns a random value from the following interval:
+// 	[randomizationFactor * currentInterval, randomizationFactor * currentInterval].
+func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
+	var delta = randomizationFactor * float64(currentInterval)
+	var minInterval = float64(currentInterval) - delta
+	var maxInterval = float64(currentInterval) + delta
+
+	// Get a random value from the range [minInterval, maxInterval].
+	// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
+	// we want a 33% chance for selecting either 1, 2 or 3.
+	return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/go.mod b/vendor/github.com/cenkalti/backoff/v3/go.mod
new file mode 100644
index 0000000..479e62a
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/go.mod
@@ -0,0 +1,3 @@
+module github.com/cenkalti/backoff/v3
+
+go 1.12
diff --git a/vendor/github.com/cenkalti/backoff/v3/retry.go b/vendor/github.com/cenkalti/backoff/v3/retry.go
new file mode 100644
index 0000000..6c776cc
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/retry.go
@@ -0,0 +1,96 @@
+package backoff
+
+import "time"
+
+// An Operation is executing by Retry() or RetryNotify().
+// The operation will be retried using a backoff policy if it returns an error.
+type Operation func() error
+
+// Notify is a notify-on-error function. It receives an operation error and
+// backoff delay if the operation failed (with an error).
+//
+// NOTE that if the backoff policy stated to stop retrying,
+// the notify function isn't called.
+type Notify func(error, time.Duration)
+
+// Retry the operation o until it does not return error or BackOff stops.
+// o is guaranteed to be run at least once.
+//
+// If o returns a *PermanentError, the operation is not retried, and the
+// wrapped error is returned.
+//
+// Retry sleeps the goroutine for the duration returned by BackOff after a
+// failed operation returns.
+func Retry(o Operation, b BackOff) error {
+	return RetryNotify(o, b, nil)
+}
+
+// RetryNotify calls notify function with the error and wait duration
+// for each failed attempt before sleep.
+func RetryNotify(operation Operation, b BackOff, notify Notify) error {
+	return RetryNotifyWithTimer(operation, b, notify, nil)
+}
+
+// RetryNotifyWithTimer calls notify function with the error and wait duration using the given Timer
+// for each failed attempt before sleep.
+// A default timer that uses system timer is used when nil is passed.
+func RetryNotifyWithTimer(operation Operation, b BackOff, notify Notify, t Timer) error {
+	var err error
+	var next time.Duration
+	if t == nil {
+		t = &defaultTimer{}
+	}
+
+	defer func() {
+		t.Stop()
+	}()
+
+	ctx := getContext(b)
+
+	b.Reset()
+	for {
+		if err = operation(); err == nil {
+			return nil
+		}
+
+		if permanent, ok := err.(*PermanentError); ok {
+			return permanent.Err
+		}
+
+		if next = b.NextBackOff(); next == Stop {
+			return err
+		}
+
+		if notify != nil {
+			notify(err, next)
+		}
+
+		t.Start(next)
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-t.C():
+		}
+	}
+}
+
+// PermanentError signals that the operation should not be retried.
+type PermanentError struct {
+	Err error
+}
+
+func (e *PermanentError) Error() string {
+	return e.Err.Error()
+}
+
+func (e *PermanentError) Unwrap() error {
+	return e.Err
+}
+
+// Permanent wraps the given err in a *PermanentError.
+func Permanent(err error) *PermanentError {
+	return &PermanentError{
+		Err: err,
+	}
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/ticker.go b/vendor/github.com/cenkalti/backoff/v3/ticker.go
new file mode 100644
index 0000000..ed699e0
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/ticker.go
@@ -0,0 +1,94 @@
+package backoff
+
+import (
+	"context"
+	"sync"
+	"time"
+)
+
+// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff.
+//
+// Ticks will continue to arrive when the previous operation is still running,
+// so operations that take a while to fail could run in quick succession.
+type Ticker struct {
+	C        <-chan time.Time
+	c        chan time.Time
+	b        BackOff
+	ctx      context.Context
+	timer    Timer
+	stop     chan struct{}
+	stopOnce sync.Once
+}
+
+// NewTicker returns a new Ticker containing a channel that will send
+// the time at times specified by the BackOff argument. Ticker is
+// guaranteed to tick at least once.  The channel is closed when Stop
+// method is called or BackOff stops. It is not safe to manipulate the
+// provided backoff policy (notably calling NextBackOff or Reset)
+// while the ticker is running.
+func NewTicker(b BackOff) *Ticker {
+	return NewTickerWithTimer(b, &defaultTimer{})
+}
+
+// NewTickerWithTimer returns a new Ticker with a custom timer.
+// A default timer that uses system timer is used when nil is passed.
+func NewTickerWithTimer(b BackOff, timer Timer) *Ticker {
+	c := make(chan time.Time)
+	t := &Ticker{
+		C:     c,
+		c:     c,
+		b:     b,
+		ctx:   getContext(b),
+		timer: timer,
+		stop:  make(chan struct{}),
+	}
+	t.b.Reset()
+	go t.run()
+	return t
+}
+
+// Stop turns off a ticker. After Stop, no more ticks will be sent.
+func (t *Ticker) Stop() {
+	t.stopOnce.Do(func() { close(t.stop) })
+}
+
+func (t *Ticker) run() {
+	c := t.c
+	defer close(c)
+
+	// Ticker is guaranteed to tick at least once.
+	afterC := t.send(time.Now())
+
+	for {
+		if afterC == nil {
+			return
+		}
+
+		select {
+		case tick := <-afterC:
+			afterC = t.send(tick)
+		case <-t.stop:
+			t.c = nil // Prevent future ticks from being sent to the channel.
+			return
+		case <-t.ctx.Done():
+			return
+		}
+	}
+}
+
+func (t *Ticker) send(tick time.Time) <-chan time.Time {
+	select {
+	case t.c <- tick:
+	case <-t.stop:
+		return nil
+	}
+
+	next := t.b.NextBackOff()
+	if next == Stop {
+		t.Stop()
+		return nil
+	}
+
+	t.timer.Start(next)
+	return t.timer.C()
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/timer.go b/vendor/github.com/cenkalti/backoff/v3/timer.go
new file mode 100644
index 0000000..8120d02
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/timer.go
@@ -0,0 +1,35 @@
+package backoff
+
+import "time"
+
+type Timer interface {
+	Start(duration time.Duration)
+	Stop()
+	C() <-chan time.Time
+}
+
+// defaultTimer implements Timer interface using time.Timer
+type defaultTimer struct {
+	timer *time.Timer
+}
+
+// C returns the timers channel which receives the current time when the timer fires.
+func (t *defaultTimer) C() <-chan time.Time {
+	return t.timer.C
+}
+
+// Start starts the timer to fire after the given duration
+func (t *defaultTimer) Start(duration time.Duration) {
+	if t.timer == nil {
+		t.timer = time.NewTimer(duration)
+	} else {
+		t.timer.Reset(duration)
+	}
+}
+
+// Stop is called when the timer is not used anymore and resources may be freed.
+func (t *defaultTimer) Stop() {
+	if t.timer != nil {
+		t.timer.Stop()
+	}
+}
diff --git a/vendor/github.com/cenkalti/backoff/v3/tries.go b/vendor/github.com/cenkalti/backoff/v3/tries.go
new file mode 100644
index 0000000..cfeefd9
--- /dev/null
+++ b/vendor/github.com/cenkalti/backoff/v3/tries.go
@@ -0,0 +1,35 @@
+package backoff
+
+import "time"
+
+/*
+WithMaxRetries creates a wrapper around another BackOff, which will
+return Stop if NextBackOff() has been called too many times since
+the last time Reset() was called
+
+Note: Implementation is not thread-safe.
+*/
+func WithMaxRetries(b BackOff, max uint64) BackOff {
+	return &backOffTries{delegate: b, maxTries: max}
+}
+
+type backOffTries struct {
+	delegate BackOff
+	maxTries uint64
+	numTries uint64
+}
+
+func (b *backOffTries) NextBackOff() time.Duration {
+	if b.maxTries > 0 {
+		if b.maxTries <= b.numTries {
+			return Stop
+		}
+		b.numTries++
+	}
+	return b.delegate.NextBackOff()
+}
+
+func (b *backOffTries) Reset() {
+	b.numTries = 0
+	b.delegate.Reset()
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 22ed9b3..eeebf5a 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -8,6 +8,9 @@
 github.com/bsm/sarama-cluster
 # github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
 github.com/buraksezer/consistent
+# github.com/cenkalti/backoff/v3 v3.2.2
+## explicit
+github.com/cenkalti/backoff/v3
 # github.com/cespare/xxhash v1.1.0
 github.com/cespare/xxhash
 # github.com/cevaris/ordered_map v0.0.0-20190319150403-3adeae072e73