[VOL-4292] OpenOLT Adapter changes for gRPC migration
Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index a08e73d..576ae72 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -27,16 +27,14 @@
const (
EtcdStoreName = "etcd"
defaultInstanceid = "openOlt001"
- defaultKafkaadapteraddress = "127.0.0.1:9092"
- defaultKafkaclusteraddress = "127.0.0.1:9094"
+ defaultKafkaclusteraddress = "127.0.0.1:9092"
defaultKvstoretype = EtcdStoreName
defaultKvstoretimeout = 5 * time.Second
+ defaultRPCTimeout = 10 * time.Second
defaultKvstoreaddress = "127.0.0.1:2379" // Port: Consul = 8500; Etcd = 2379
defaultLoglevel = "WARN"
defaultBanner = false
defaultDisplayVersionOnly = false
- defaultTopic = "openolt"
- defaultCoretopic = "rwcore"
defaultEventtopic = "voltha.events"
defaultOnunumber = 1
defaultProbeAddress = ":8080"
@@ -46,6 +44,8 @@
defaultHeartbeatCheckInterval = 15 * time.Second
// defaultHeartbeatFailReportInterval is the time adapter will wait before updating the state to the core.
defaultHeartbeatFailReportInterval = 0 * time.Second
+ defaultGrpcAddress = ":50060"
+ defaultCoreEndpoint = ":55555"
//defaultGrpcTimeoutInterval is the time in seconds a grpc call will wait before returning error.
defaultGrpcTimeoutInterval = 2 * time.Second
defaultCurrentReplica = 1
@@ -56,6 +56,9 @@
defaultOmccEncryption = false
defaultEnableONUStats = false
defaultEnableGEMStats = false
+ defaultMinBackoffRetryDelay = 500 * time.Millisecond
+ defaultMaxBackoffRetryDelay = 10 * time.Second
+ defaultAdapterEndpoint = "adapter-open-olt"
)
// AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -63,13 +66,11 @@
// Command line parameters
AdapterName string
InstanceID string // NOTE what am I used for? why not cli but only ENV? TODO expose in the chart
- KafkaAdapterAddress string
KafkaClusterAddress string
KVStoreType string
KVStoreTimeout time.Duration
KVStoreAddress string
- Topic string
- CoreTopic string
+ RPCTimeout time.Duration
EventTopic string
LogLevel string
OnuNumber int
@@ -81,6 +82,8 @@
HeartbeatCheckInterval time.Duration
HeartbeatFailReportInterval time.Duration
GrpcTimeoutInterval time.Duration
+ GrpcAddress string
+ CoreEndpoint string
CurrentReplica int
TotalReplicas int
TraceEnabled bool
@@ -89,19 +92,19 @@
OmccEncryption bool
EnableONUStats bool
EnableGEMStats bool
+ MinBackoffRetryDelay time.Duration
+ MaxBackoffRetryDelay time.Duration
+ AdapterEndpoint string
}
// NewAdapterFlags returns a new RWCore config
func NewAdapterFlags() *AdapterFlags {
var adapterFlags = AdapterFlags{ // Default values
InstanceID: defaultInstanceid,
- KafkaAdapterAddress: defaultKafkaadapteraddress,
KafkaClusterAddress: defaultKafkaclusteraddress,
KVStoreType: defaultKvstoretype,
KVStoreTimeout: defaultKvstoretimeout,
KVStoreAddress: defaultKvstoreaddress,
- Topic: defaultTopic,
- CoreTopic: defaultCoretopic,
EventTopic: defaultEventtopic,
LogLevel: defaultLoglevel,
OnuNumber: defaultOnunumber,
@@ -112,6 +115,8 @@
NotLiveProbeInterval: defaultNotLiveProbeInterval,
HeartbeatCheckInterval: defaultHeartbeatCheckInterval,
HeartbeatFailReportInterval: defaultHeartbeatFailReportInterval,
+ GrpcAddress: defaultGrpcAddress,
+ CoreEndpoint: defaultCoreEndpoint,
GrpcTimeoutInterval: defaultGrpcTimeoutInterval,
TraceEnabled: defaultTraceEnabled,
TraceAgentAddress: defaultTraceAgentAddress,
@@ -119,6 +124,9 @@
OmccEncryption: defaultOmccEncryption,
EnableONUStats: defaultEnableONUStats,
EnableGEMStats: defaultEnableGEMStats,
+ RPCTimeout: defaultRPCTimeout,
+ MinBackoffRetryDelay: defaultMinBackoffRetryDelay,
+ MaxBackoffRetryDelay: defaultMaxBackoffRetryDelay,
}
return &adapterFlags
}
@@ -126,57 +134,150 @@
// ParseCommandArguments parses the arguments when running read-write adaptercore service
func (so *AdapterFlags) ParseCommandArguments() {
- flag.StringVar(&(so.KafkaAdapterAddress), "kafka_adapter_address", defaultKafkaadapteraddress, "Kafka - Adapter messaging address")
+ flag.StringVar(&(so.KafkaClusterAddress),
+ "kafka_cluster_address",
+ defaultKafkaclusteraddress,
+ "Kafka - Cluster messaging address")
- flag.StringVar(&(so.KafkaClusterAddress), "kafka_cluster_address", defaultKafkaclusteraddress, "Kafka - Cluster messaging address")
+ flag.StringVar(&(so.EventTopic),
+ "event_topic",
+ defaultEventtopic,
+ "Event topic")
- flag.StringVar(&(so.Topic), "adapter_topic", defaultTopic, "Open OLT topic")
+ flag.StringVar(&(so.KVStoreType),
+ "kv_store_type",
+ defaultKvstoretype,
+ "KV store type")
- flag.StringVar(&(so.CoreTopic), "core_topic", defaultCoretopic, "Core topic")
+ flag.DurationVar(&(so.KVStoreTimeout),
+ "kv_store_request_timeout",
+ defaultKvstoretimeout,
+ "The default timeout when making a kv store request")
- flag.StringVar(&(so.EventTopic), "event_topic", defaultEventtopic, "Event topic")
+ flag.StringVar(&(so.KVStoreAddress),
+ "kv_store_address",
+ defaultKvstoreaddress,
+ "KV store address")
- flag.StringVar(&(so.KVStoreType), "kv_store_type", defaultKvstoretype, "KV store type")
+ flag.StringVar(&(so.LogLevel),
+ "log_level",
+ defaultLoglevel,
+ "Log level")
- flag.DurationVar(&(so.KVStoreTimeout), "kv_store_request_timeout", defaultKvstoretimeout, "The default timeout when making a kv store request")
+ flag.IntVar(&(so.OnuNumber),
+ "onu_number",
+ defaultOnunumber,
+ "Number of ONUs")
- flag.StringVar(&(so.KVStoreAddress), "kv_store_address", defaultKvstoreaddress, "KV store address")
+ flag.BoolVar(&(so.Banner),
+ "banner",
+ defaultBanner,
+ "Show startup banner log lines")
- flag.StringVar(&(so.LogLevel), "log_level", defaultLoglevel, "Log level")
+ flag.BoolVar(&(so.DisplayVersionOnly),
+ "version",
+ defaultDisplayVersionOnly,
+ "Show version information and exit")
- flag.IntVar(&(so.OnuNumber), "onu_number", defaultOnunumber, "Number of ONUs")
+ flag.StringVar(&(so.ProbeAddress),
+ "probe_address",
+ defaultProbeAddress,
+ "The address on which to listen to answer liveness and readiness probe queries over HTTP.")
- flag.BoolVar(&(so.Banner), "banner", defaultBanner, "Show startup banner log lines")
+ flag.DurationVar(&(so.LiveProbeInterval),
+ "live_probe_interval",
+ defaultLiveProbeInterval,
+ "Number of seconds for the default liveliness check")
- flag.BoolVar(&(so.DisplayVersionOnly), "version", defaultDisplayVersionOnly, "Show version information and exit")
+ flag.DurationVar(&(so.NotLiveProbeInterval),
+ "not_live_probe_interval",
+ defaultNotLiveProbeInterval,
+ "Number of seconds for liveliness check if probe is not running")
- flag.StringVar(&(so.ProbeAddress), "probe_address", defaultProbeAddress, "The address on which to listen to answer liveness and readiness probe queries over HTTP.")
+ flag.DurationVar(&(so.HeartbeatCheckInterval),
+ "heartbeat_check_interval",
+ defaultHeartbeatCheckInterval,
+ "Number of seconds for heartbeat check interval")
- flag.DurationVar(&(so.LiveProbeInterval), "live_probe_interval", defaultLiveProbeInterval, "Number of seconds for the default liveliness check")
+ flag.DurationVar(&(so.HeartbeatFailReportInterval),
+ "heartbeat_fail_interval",
+ defaultHeartbeatFailReportInterval,
+ "Number of seconds adapter has to wait before reporting core on the heartbeat check failure")
- flag.DurationVar(&(so.NotLiveProbeInterval), "not_live_probe_interval", defaultNotLiveProbeInterval, "Number of seconds for liveliness check if probe is not running")
+ flag.DurationVar(&(so.GrpcTimeoutInterval),
+ "grpc_timeout_interval",
+ defaultGrpcTimeoutInterval,
+ "Number of seconds for GRPC timeout")
- flag.DurationVar(&(so.HeartbeatCheckInterval), "heartbeat_check_interval", defaultHeartbeatCheckInterval, "Number of seconds for heartbeat check interval")
+ flag.IntVar(&(so.CurrentReplica),
+ "current_replica",
+ defaultCurrentReplica,
+ "Replica number of this particular instance")
- flag.DurationVar(&(so.HeartbeatFailReportInterval), "heartbeat_fail_interval", defaultHeartbeatFailReportInterval, "Number of seconds adapter has to wait before reporting core on the heartbeat check failure")
+ flag.IntVar(&(so.TotalReplicas),
+ "total_replica",
+ defaultTotalReplicas,
+ "Total number of instances for this adapter")
- flag.DurationVar(&(so.GrpcTimeoutInterval), "grpc_timeout_interval", defaultGrpcTimeoutInterval, "Number of seconds for GRPC timeout")
+ flag.BoolVar(&(so.TraceEnabled),
+ "trace_enabled",
+ defaultTraceEnabled,
+ "Whether to send logs to tracing agent?")
- flag.IntVar(&(so.CurrentReplica), "current_replica", defaultCurrentReplica, "Replica number of this particular instance")
+ flag.StringVar(&(so.TraceAgentAddress),
+ "trace_agent_address",
+ defaultTraceAgentAddress,
+ "The address of tracing agent to which span info should be sent")
- flag.IntVar(&(so.TotalReplicas), "total_replica", defaultTotalReplicas, "Total number of instances for this adapter")
+ flag.BoolVar(&(so.LogCorrelationEnabled),
+ "log_correlation_enabled",
+ defaultLogCorrelationEnabled,
+ "Whether to enrich log statements with fields denoting operation being executed for achieving correlation?")
- flag.BoolVar(&(so.TraceEnabled), "trace_enabled", defaultTraceEnabled, "Whether to send logs to tracing agent?")
+ flag.BoolVar(&(so.OmccEncryption),
+ "omcc_encryption",
+ defaultOmccEncryption,
+ "OMCI Channel encryption status")
- flag.StringVar(&(so.TraceAgentAddress), "trace_agent_address", defaultTraceAgentAddress, "The address of tracing agent to which span info should be sent")
+ flag.BoolVar(&(so.EnableONUStats),
+ "enable_onu_stats",
+ defaultEnableONUStats,
+ "Enable ONU Statistics")
- flag.BoolVar(&(so.LogCorrelationEnabled), "log_correlation_enabled", defaultLogCorrelationEnabled, "Whether to enrich log statements with fields denoting operation being executed for achieving correlation?")
+ flag.BoolVar(&(so.EnableGEMStats),
+ "enable_gem_stats",
+ defaultEnableGEMStats,
+ "Enable GEM Statistics")
- flag.BoolVar(&(so.OmccEncryption), "omcc_encryption", defaultOmccEncryption, "OMCI Channel encryption status")
+ flag.StringVar(&(so.GrpcAddress),
+ "grpc_address",
+ defaultGrpcAddress,
+ "Adapter GRPC Server address")
- flag.BoolVar(&(so.EnableONUStats), "enable_onu_stats", defaultEnableONUStats, "Enable ONU Statistics")
+ flag.StringVar(&(so.CoreEndpoint),
+ "core_endpoint",
+ defaultCoreEndpoint,
+ "Core endpoint")
- flag.BoolVar(&(so.EnableGEMStats), "enable_gem_stats", defaultEnableGEMStats, "Enable GEM Statistics")
+ flag.StringVar(&(so.AdapterEndpoint),
+ "adapter_endpoint",
+ defaultAdapterEndpoint,
+ "Adapter Endpoint")
+
+ flag.DurationVar(&(so.RPCTimeout),
+ "rpc_timeout",
+ defaultRPCTimeout,
+ "The default timeout when making an RPC request")
+
+ flag.DurationVar(&(so.MinBackoffRetryDelay),
+ "min_retry_delay",
+ defaultMinBackoffRetryDelay,
+ "The minimum number of milliseconds to delay before a connection retry attempt")
+
+ flag.DurationVar(&(so.MaxBackoffRetryDelay),
+ "max_retry_delay",
+ defaultMaxBackoffRetryDelay,
+ "The maximum number of milliseconds to delay before a connection retry attempt")
flag.Parse()
containerName := getContainerInfo()