[VOL-3981] Voltha Core restarts if it can't subscribe to Kafka

This commit fixes the following issues:
1) It adds a retry loop that resubscribes to Kafka on error.  The
subscription failure occurs randomly, especially when the Kafka broker
is up and running but not truly ready to create a new topic.

2) Fixes an issue where the event proxy start was incorrectly setting
the cluster messaging bus probe to out of service and never setting it
to running.  This was causing the Core to wait forever for the
probe to be ready.

Change-Id: Idf22481f85e4b576440301f2859da7ddf2d8c688
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 2b5017e..6510964 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,104 +24,113 @@
 
 // RW Core service default constants
 const (
-	EtcdStoreName                    = "etcd"
-	defaultGrpcAddress               = ":50057"
-	defaultKafkaAdapterAddress       = "127.0.0.1:9092"
-	defaultKafkaClusterAddress       = "127.0.0.1:9094"
-	defaultKVStoreType               = EtcdStoreName
-	defaultKVStoreTimeout            = 5 * time.Second
-	defaultKVStoreAddress            = "127.0.0.1:2379" // Etcd = 2379
-	defaultKVTxnKeyDelTime           = 60
-	defaultLogLevel                  = "WARN"
-	defaultBanner                    = false
-	defaultDisplayVersionOnly        = false
-	defaultCoreTopic                 = "rwcore"
-	defaultEventTopic                = "voltha.events"
-	defaultRWCoreEndpoint            = "rwcore"
-	defaultRWCoreKey                 = "pki/voltha.key"
-	defaultRWCoreCert                = "pki/voltha.crt"
-	defaultRWCoreCA                  = "pki/voltha-CA.pem"
-	defaultLongRunningRequestTimeout = 2000 * time.Millisecond
-	defaultDefaultRequestTimeout     = 1000 * time.Millisecond
-	defaultCoreTimeout               = 1000 * time.Millisecond
-	defaultCoreBindingKey            = "voltha_backend_name"
-	defaultMaxConnectionRetries      = -1 // retries forever
-	defaultConnectionRetryInterval   = 2 * time.Second
-	defaultLiveProbeInterval         = 60 * time.Second
-	defaultNotLiveProbeInterval      = 5 * time.Second // Probe more frequently when not alive
-	defaultProbeAddress              = ":8080"
-	defaultTraceEnabled              = false
-	defaultTraceAgentAddress         = "127.0.0.1:6831"
-	defaultLogCorrelationEnabled     = true
-	defaultVolthaStackID             = "voltha"
+	EtcdStoreName                      = "etcd"
+	defaultGrpcAddress                 = ":50057"
+	defaultKafkaAdapterAddress         = "127.0.0.1:9092"
+	defaultKafkaClusterAddress         = "127.0.0.1:9094"
+	defaultKVStoreType                 = EtcdStoreName
+	defaultKVStoreTimeout              = 5 * time.Second
+	defaultKVStoreAddress              = "127.0.0.1:2379" // Etcd = 2379
+	defaultKVTxnKeyDelTime             = 60
+	defaultLogLevel                    = "WARN"
+	defaultBanner                      = false
+	defaultDisplayVersionOnly          = false
+	defaultCoreTopic                   = "rwcore"
+	defaultEventTopic                  = "voltha.events"
+	defaultRWCoreEndpoint              = "rwcore"
+	defaultRWCoreKey                   = "pki/voltha.key"
+	defaultRWCoreCert                  = "pki/voltha.crt"
+	defaultRWCoreCA                    = "pki/voltha-CA.pem"
+	defaultLongRunningRequestTimeout   = 2000 * time.Millisecond
+	defaultDefaultRequestTimeout       = 1000 * time.Millisecond
+	defaultCoreTimeout                 = 1000 * time.Millisecond
+	defaultCoreBindingKey              = "voltha_backend_name"
+	defaultMaxConnectionRetries        = -1 // retries forever
+	defaultConnectionRetryInterval     = 2 * time.Second
+	defaultLiveProbeInterval           = 60 * time.Second
+	defaultNotLiveProbeInterval        = 5 * time.Second // Probe more frequently when not alive
+	defaultProbeAddress                = ":8080"
+	defaultTraceEnabled                = false
+	defaultTraceAgentAddress           = "127.0.0.1:6831"
+	defaultLogCorrelationEnabled       = true
+	defaultVolthaStackID               = "voltha"
+	defaultBackoffRetryInitialInterval = 500 * time.Millisecond
+	defaultBackoffRetryMaxElapsedTime  = 0
+	defaultBackoffRetryMaxInterval     = 1 * time.Minute
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
 type RWCoreFlags struct {
 	// Command line parameters
-	RWCoreEndpoint            string
-	GrpcAddress               string
-	KafkaAdapterAddress       string
-	KafkaClusterAddress       string
-	KVStoreType               string
-	KVStoreTimeout            time.Duration
-	KVStoreAddress            string
-	KVTxnKeyDelTime           int
-	CoreTopic                 string
-	EventTopic                string
-	LogLevel                  string
-	Banner                    bool
-	DisplayVersionOnly        bool
-	RWCoreKey                 string
-	RWCoreCert                string
-	RWCoreCA                  string
-	LongRunningRequestTimeout time.Duration
-	DefaultRequestTimeout     time.Duration
-	DefaultCoreTimeout        time.Duration
-	CoreBindingKey            string
-	MaxConnectionRetries      int
-	ConnectionRetryInterval   time.Duration
-	LiveProbeInterval         time.Duration
-	NotLiveProbeInterval      time.Duration
-	ProbeAddress              string
-	TraceEnabled              bool
-	TraceAgentAddress         string
-	LogCorrelationEnabled     bool
-	VolthaStackID             string
+	RWCoreEndpoint              string
+	GrpcAddress                 string
+	KafkaAdapterAddress         string
+	KafkaClusterAddress         string
+	KVStoreType                 string
+	KVStoreTimeout              time.Duration
+	KVStoreAddress              string
+	KVTxnKeyDelTime             int
+	CoreTopic                   string
+	EventTopic                  string
+	LogLevel                    string
+	Banner                      bool
+	DisplayVersionOnly          bool
+	RWCoreKey                   string
+	RWCoreCert                  string
+	RWCoreCA                    string
+	LongRunningRequestTimeout   time.Duration
+	DefaultRequestTimeout       time.Duration
+	DefaultCoreTimeout          time.Duration
+	CoreBindingKey              string
+	MaxConnectionRetries        int
+	ConnectionRetryInterval     time.Duration
+	LiveProbeInterval           time.Duration
+	NotLiveProbeInterval        time.Duration
+	ProbeAddress                string
+	TraceEnabled                bool
+	TraceAgentAddress           string
+	LogCorrelationEnabled       bool
+	VolthaStackID               string
+	BackoffRetryInitialInterval time.Duration
+	BackoffRetryMaxElapsedTime  time.Duration
+	BackoffRetryMaxInterval     time.Duration
 }
 
 // NewRWCoreFlags returns a new RWCore config
 func NewRWCoreFlags() *RWCoreFlags {
 	var rwCoreFlag = RWCoreFlags{ // Default values
-		RWCoreEndpoint:            defaultRWCoreEndpoint,
-		GrpcAddress:               defaultGrpcAddress,
-		KafkaAdapterAddress:       defaultKafkaAdapterAddress,
-		KafkaClusterAddress:       defaultKafkaClusterAddress,
-		KVStoreType:               defaultKVStoreType,
-		KVStoreTimeout:            defaultKVStoreTimeout,
-		KVStoreAddress:            defaultKVStoreAddress,
-		KVTxnKeyDelTime:           defaultKVTxnKeyDelTime,
-		CoreTopic:                 defaultCoreTopic,
-		EventTopic:                defaultEventTopic,
-		LogLevel:                  defaultLogLevel,
-		Banner:                    defaultBanner,
-		DisplayVersionOnly:        defaultDisplayVersionOnly,
-		RWCoreKey:                 defaultRWCoreKey,
-		RWCoreCert:                defaultRWCoreCert,
-		RWCoreCA:                  defaultRWCoreCA,
-		DefaultRequestTimeout:     defaultDefaultRequestTimeout,
-		LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
-		DefaultCoreTimeout:        defaultCoreTimeout,
-		CoreBindingKey:            defaultCoreBindingKey,
-		MaxConnectionRetries:      defaultMaxConnectionRetries,
-		ConnectionRetryInterval:   defaultConnectionRetryInterval,
-		LiveProbeInterval:         defaultLiveProbeInterval,
-		NotLiveProbeInterval:      defaultNotLiveProbeInterval,
-		ProbeAddress:              defaultProbeAddress,
-		TraceEnabled:              defaultTraceEnabled,
-		TraceAgentAddress:         defaultTraceAgentAddress,
-		LogCorrelationEnabled:     defaultLogCorrelationEnabled,
-		VolthaStackID:             defaultVolthaStackID,
+		RWCoreEndpoint:              defaultRWCoreEndpoint,
+		GrpcAddress:                 defaultGrpcAddress,
+		KafkaAdapterAddress:         defaultKafkaAdapterAddress,
+		KafkaClusterAddress:         defaultKafkaClusterAddress,
+		KVStoreType:                 defaultKVStoreType,
+		KVStoreTimeout:              defaultKVStoreTimeout,
+		KVStoreAddress:              defaultKVStoreAddress,
+		KVTxnKeyDelTime:             defaultKVTxnKeyDelTime,
+		CoreTopic:                   defaultCoreTopic,
+		EventTopic:                  defaultEventTopic,
+		LogLevel:                    defaultLogLevel,
+		Banner:                      defaultBanner,
+		DisplayVersionOnly:          defaultDisplayVersionOnly,
+		RWCoreKey:                   defaultRWCoreKey,
+		RWCoreCert:                  defaultRWCoreCert,
+		RWCoreCA:                    defaultRWCoreCA,
+		DefaultRequestTimeout:       defaultDefaultRequestTimeout,
+		LongRunningRequestTimeout:   defaultLongRunningRequestTimeout,
+		DefaultCoreTimeout:          defaultCoreTimeout,
+		CoreBindingKey:              defaultCoreBindingKey,
+		MaxConnectionRetries:        defaultMaxConnectionRetries,
+		ConnectionRetryInterval:     defaultConnectionRetryInterval,
+		LiveProbeInterval:           defaultLiveProbeInterval,
+		NotLiveProbeInterval:        defaultNotLiveProbeInterval,
+		ProbeAddress:                defaultProbeAddress,
+		TraceEnabled:                defaultTraceEnabled,
+		TraceAgentAddress:           defaultTraceAgentAddress,
+		LogCorrelationEnabled:       defaultLogCorrelationEnabled,
+		VolthaStackID:               defaultVolthaStackID,
+		BackoffRetryInitialInterval: defaultBackoffRetryInitialInterval,
+		BackoffRetryMaxElapsedTime:  defaultBackoffRetryMaxElapsedTime,
+		BackoffRetryMaxInterval:     defaultBackoffRetryMaxInterval,
 	}
 	return &rwCoreFlag
 }
@@ -209,5 +218,14 @@
 	help = fmt.Sprintf("ID for the current voltha stack")
 	flag.StringVar(&cf.VolthaStackID, "stack_id", defaultVolthaStackID, help)
 
+	help = fmt.Sprintf("The initial number of milliseconds an exponential backoff will wait before a retry")
+	flag.DurationVar(&(cf.BackoffRetryInitialInterval), "backoff_retry_initial_interval", defaultBackoffRetryInitialInterval, help)
+
+	help = fmt.Sprintf("The maximum number of milliseconds an exponential backoff can elasped")
+	flag.DurationVar(&(cf.BackoffRetryMaxElapsedTime), "backoff_retry_max_elapsed_time", defaultBackoffRetryMaxElapsedTime, help)
+
+	help = fmt.Sprintf("The maximum number of milliseconds of an exponential backoff interval")
+	flag.DurationVar(&(cf.BackoffRetryMaxInterval), "backoff_retry_max_interval", defaultBackoffRetryMaxInterval, help)
+
 	flag.Parse()
 }