[VOL-3981] Voltha Core restarts if it can't subscribe to Kafka
This commit fixes the following issues:
1) It creates a loop to try to resubscribe to kafka on error. This
is an issue that occurs randomly, especially when the kafka broker
is up and running but not truly ready to create a new topic.
2) Fixes an issue where the event proxy start was incorrect setting
the cluster messaging bus probe to out of service and never sets it
to running. This was causing the Core to wait forever for the
probe to be ready.
Change-Id: Idf22481f85e4b576440301f2859da7ddf2d8c688
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 2b5017e..6510964 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,104 +24,113 @@
// RW Core service default constants
const (
- EtcdStoreName = "etcd"
- defaultGrpcAddress = ":50057"
- defaultKafkaAdapterAddress = "127.0.0.1:9092"
- defaultKafkaClusterAddress = "127.0.0.1:9094"
- defaultKVStoreType = EtcdStoreName
- defaultKVStoreTimeout = 5 * time.Second
- defaultKVStoreAddress = "127.0.0.1:2379" // Etcd = 2379
- defaultKVTxnKeyDelTime = 60
- defaultLogLevel = "WARN"
- defaultBanner = false
- defaultDisplayVersionOnly = false
- defaultCoreTopic = "rwcore"
- defaultEventTopic = "voltha.events"
- defaultRWCoreEndpoint = "rwcore"
- defaultRWCoreKey = "pki/voltha.key"
- defaultRWCoreCert = "pki/voltha.crt"
- defaultRWCoreCA = "pki/voltha-CA.pem"
- defaultLongRunningRequestTimeout = 2000 * time.Millisecond
- defaultDefaultRequestTimeout = 1000 * time.Millisecond
- defaultCoreTimeout = 1000 * time.Millisecond
- defaultCoreBindingKey = "voltha_backend_name"
- defaultMaxConnectionRetries = -1 // retries forever
- defaultConnectionRetryInterval = 2 * time.Second
- defaultLiveProbeInterval = 60 * time.Second
- defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
- defaultProbeAddress = ":8080"
- defaultTraceEnabled = false
- defaultTraceAgentAddress = "127.0.0.1:6831"
- defaultLogCorrelationEnabled = true
- defaultVolthaStackID = "voltha"
+ EtcdStoreName = "etcd"
+ defaultGrpcAddress = ":50057"
+ defaultKafkaAdapterAddress = "127.0.0.1:9092"
+ defaultKafkaClusterAddress = "127.0.0.1:9094"
+ defaultKVStoreType = EtcdStoreName
+ defaultKVStoreTimeout = 5 * time.Second
+ defaultKVStoreAddress = "127.0.0.1:2379" // Etcd = 2379
+ defaultKVTxnKeyDelTime = 60
+ defaultLogLevel = "WARN"
+ defaultBanner = false
+ defaultDisplayVersionOnly = false
+ defaultCoreTopic = "rwcore"
+ defaultEventTopic = "voltha.events"
+ defaultRWCoreEndpoint = "rwcore"
+ defaultRWCoreKey = "pki/voltha.key"
+ defaultRWCoreCert = "pki/voltha.crt"
+ defaultRWCoreCA = "pki/voltha-CA.pem"
+ defaultLongRunningRequestTimeout = 2000 * time.Millisecond
+ defaultDefaultRequestTimeout = 1000 * time.Millisecond
+ defaultCoreTimeout = 1000 * time.Millisecond
+ defaultCoreBindingKey = "voltha_backend_name"
+ defaultMaxConnectionRetries = -1 // retries forever
+ defaultConnectionRetryInterval = 2 * time.Second
+ defaultLiveProbeInterval = 60 * time.Second
+ defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
+ defaultProbeAddress = ":8080"
+ defaultTraceEnabled = false
+ defaultTraceAgentAddress = "127.0.0.1:6831"
+ defaultLogCorrelationEnabled = true
+ defaultVolthaStackID = "voltha"
+ defaultBackoffRetryInitialInterval = 500 * time.Millisecond
+ defaultBackoffRetryMaxElapsedTime = 0
+ defaultBackoffRetryMaxInterval = 1 * time.Minute
)
// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
- RWCoreEndpoint string
- GrpcAddress string
- KafkaAdapterAddress string
- KafkaClusterAddress string
- KVStoreType string
- KVStoreTimeout time.Duration
- KVStoreAddress string
- KVTxnKeyDelTime int
- CoreTopic string
- EventTopic string
- LogLevel string
- Banner bool
- DisplayVersionOnly bool
- RWCoreKey string
- RWCoreCert string
- RWCoreCA string
- LongRunningRequestTimeout time.Duration
- DefaultRequestTimeout time.Duration
- DefaultCoreTimeout time.Duration
- CoreBindingKey string
- MaxConnectionRetries int
- ConnectionRetryInterval time.Duration
- LiveProbeInterval time.Duration
- NotLiveProbeInterval time.Duration
- ProbeAddress string
- TraceEnabled bool
- TraceAgentAddress string
- LogCorrelationEnabled bool
- VolthaStackID string
+ RWCoreEndpoint string
+ GrpcAddress string
+ KafkaAdapterAddress string
+ KafkaClusterAddress string
+ KVStoreType string
+ KVStoreTimeout time.Duration
+ KVStoreAddress string
+ KVTxnKeyDelTime int
+ CoreTopic string
+ EventTopic string
+ LogLevel string
+ Banner bool
+ DisplayVersionOnly bool
+ RWCoreKey string
+ RWCoreCert string
+ RWCoreCA string
+ LongRunningRequestTimeout time.Duration
+ DefaultRequestTimeout time.Duration
+ DefaultCoreTimeout time.Duration
+ CoreBindingKey string
+ MaxConnectionRetries int
+ ConnectionRetryInterval time.Duration
+ LiveProbeInterval time.Duration
+ NotLiveProbeInterval time.Duration
+ ProbeAddress string
+ TraceEnabled bool
+ TraceAgentAddress string
+ LogCorrelationEnabled bool
+ VolthaStackID string
+ BackoffRetryInitialInterval time.Duration
+ BackoffRetryMaxElapsedTime time.Duration
+ BackoffRetryMaxInterval time.Duration
}
// NewRWCoreFlags returns a new RWCore config
func NewRWCoreFlags() *RWCoreFlags {
var rwCoreFlag = RWCoreFlags{ // Default values
- RWCoreEndpoint: defaultRWCoreEndpoint,
- GrpcAddress: defaultGrpcAddress,
- KafkaAdapterAddress: defaultKafkaAdapterAddress,
- KafkaClusterAddress: defaultKafkaClusterAddress,
- KVStoreType: defaultKVStoreType,
- KVStoreTimeout: defaultKVStoreTimeout,
- KVStoreAddress: defaultKVStoreAddress,
- KVTxnKeyDelTime: defaultKVTxnKeyDelTime,
- CoreTopic: defaultCoreTopic,
- EventTopic: defaultEventTopic,
- LogLevel: defaultLogLevel,
- Banner: defaultBanner,
- DisplayVersionOnly: defaultDisplayVersionOnly,
- RWCoreKey: defaultRWCoreKey,
- RWCoreCert: defaultRWCoreCert,
- RWCoreCA: defaultRWCoreCA,
- DefaultRequestTimeout: defaultDefaultRequestTimeout,
- LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
- DefaultCoreTimeout: defaultCoreTimeout,
- CoreBindingKey: defaultCoreBindingKey,
- MaxConnectionRetries: defaultMaxConnectionRetries,
- ConnectionRetryInterval: defaultConnectionRetryInterval,
- LiveProbeInterval: defaultLiveProbeInterval,
- NotLiveProbeInterval: defaultNotLiveProbeInterval,
- ProbeAddress: defaultProbeAddress,
- TraceEnabled: defaultTraceEnabled,
- TraceAgentAddress: defaultTraceAgentAddress,
- LogCorrelationEnabled: defaultLogCorrelationEnabled,
- VolthaStackID: defaultVolthaStackID,
+ RWCoreEndpoint: defaultRWCoreEndpoint,
+ GrpcAddress: defaultGrpcAddress,
+ KafkaAdapterAddress: defaultKafkaAdapterAddress,
+ KafkaClusterAddress: defaultKafkaClusterAddress,
+ KVStoreType: defaultKVStoreType,
+ KVStoreTimeout: defaultKVStoreTimeout,
+ KVStoreAddress: defaultKVStoreAddress,
+ KVTxnKeyDelTime: defaultKVTxnKeyDelTime,
+ CoreTopic: defaultCoreTopic,
+ EventTopic: defaultEventTopic,
+ LogLevel: defaultLogLevel,
+ Banner: defaultBanner,
+ DisplayVersionOnly: defaultDisplayVersionOnly,
+ RWCoreKey: defaultRWCoreKey,
+ RWCoreCert: defaultRWCoreCert,
+ RWCoreCA: defaultRWCoreCA,
+ DefaultRequestTimeout: defaultDefaultRequestTimeout,
+ LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
+ DefaultCoreTimeout: defaultCoreTimeout,
+ CoreBindingKey: defaultCoreBindingKey,
+ MaxConnectionRetries: defaultMaxConnectionRetries,
+ ConnectionRetryInterval: defaultConnectionRetryInterval,
+ LiveProbeInterval: defaultLiveProbeInterval,
+ NotLiveProbeInterval: defaultNotLiveProbeInterval,
+ ProbeAddress: defaultProbeAddress,
+ TraceEnabled: defaultTraceEnabled,
+ TraceAgentAddress: defaultTraceAgentAddress,
+ LogCorrelationEnabled: defaultLogCorrelationEnabled,
+ VolthaStackID: defaultVolthaStackID,
+ BackoffRetryInitialInterval: defaultBackoffRetryInitialInterval,
+ BackoffRetryMaxElapsedTime: defaultBackoffRetryMaxElapsedTime,
+ BackoffRetryMaxInterval: defaultBackoffRetryMaxInterval,
}
return &rwCoreFlag
}
@@ -209,5 +218,14 @@
help = fmt.Sprintf("ID for the current voltha stack")
flag.StringVar(&cf.VolthaStackID, "stack_id", defaultVolthaStackID, help)
+ help = fmt.Sprintf("The initial number of milliseconds an exponential backoff will wait before a retry")
+ flag.DurationVar(&(cf.BackoffRetryInitialInterval), "backoff_retry_initial_interval", defaultBackoffRetryInitialInterval, help)
+
+ help = fmt.Sprintf("The maximum number of milliseconds an exponential backoff can elasped")
+ flag.DurationVar(&(cf.BackoffRetryMaxElapsedTime), "backoff_retry_max_elapsed_time", defaultBackoffRetryMaxElapsedTime, help)
+
+ help = fmt.Sprintf("The maximum number of milliseconds of an exponential backoff interval")
+ flag.DurationVar(&(cf.BackoffRetryMaxInterval), "backoff_retry_max_interval", defaultBackoffRetryMaxInterval, help)
+
flag.Parse()
}