[VOL-4292] OpenOLT Adapter changes for gRPC migration

Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index a08e73d..576ae72 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -27,16 +27,14 @@
 const (
 	EtcdStoreName               = "etcd"
 	defaultInstanceid           = "openOlt001"
-	defaultKafkaadapteraddress  = "127.0.0.1:9092"
-	defaultKafkaclusteraddress  = "127.0.0.1:9094"
+	defaultKafkaclusteraddress  = "127.0.0.1:9092"
 	defaultKvstoretype          = EtcdStoreName
 	defaultKvstoretimeout       = 5 * time.Second
+	defaultRPCTimeout           = 10 * time.Second
 	defaultKvstoreaddress       = "127.0.0.1:2379" // Port: Consul = 8500; Etcd = 2379
 	defaultLoglevel             = "WARN"
 	defaultBanner               = false
 	defaultDisplayVersionOnly   = false
-	defaultTopic                = "openolt"
-	defaultCoretopic            = "rwcore"
 	defaultEventtopic           = "voltha.events"
 	defaultOnunumber            = 1
 	defaultProbeAddress         = ":8080"
@@ -46,6 +44,8 @@
 	defaultHeartbeatCheckInterval = 15 * time.Second
 	// defaultHeartbeatFailReportInterval is the time adapter will wait before updating the state to the core.
 	defaultHeartbeatFailReportInterval = 0 * time.Second
+	defaultGrpcAddress                 = ":50060"
+	defaultCoreEndpoint                = ":55555"
 	//defaultGrpcTimeoutInterval is the time in seconds a grpc call will wait before returning error.
 	defaultGrpcTimeoutInterval   = 2 * time.Second
 	defaultCurrentReplica        = 1
@@ -56,6 +56,9 @@
 	defaultOmccEncryption        = false
 	defaultEnableONUStats        = false
 	defaultEnableGEMStats        = false
+	defaultMinBackoffRetryDelay  = 500 * time.Millisecond
+	defaultMaxBackoffRetryDelay  = 10 * time.Second
+	defaultAdapterEndpoint       = "adapter-open-olt"
 )
 
 // AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -63,13 +66,11 @@
 	// Command line parameters
 	AdapterName                 string
 	InstanceID                  string // NOTE what am I used for? why not cli but only ENV? TODO expose in the chart
-	KafkaAdapterAddress         string
 	KafkaClusterAddress         string
 	KVStoreType                 string
 	KVStoreTimeout              time.Duration
 	KVStoreAddress              string
-	Topic                       string
-	CoreTopic                   string
+	RPCTimeout                  time.Duration
 	EventTopic                  string
 	LogLevel                    string
 	OnuNumber                   int
@@ -81,6 +82,8 @@
 	HeartbeatCheckInterval      time.Duration
 	HeartbeatFailReportInterval time.Duration
 	GrpcTimeoutInterval         time.Duration
+	GrpcAddress                 string
+	CoreEndpoint                string
 	CurrentReplica              int
 	TotalReplicas               int
 	TraceEnabled                bool
@@ -89,19 +92,19 @@
 	OmccEncryption              bool
 	EnableONUStats              bool
 	EnableGEMStats              bool
+	MinBackoffRetryDelay        time.Duration
+	MaxBackoffRetryDelay        time.Duration
+	AdapterEndpoint             string
 }
 
 // NewAdapterFlags returns a new RWCore config
 func NewAdapterFlags() *AdapterFlags {
 	var adapterFlags = AdapterFlags{ // Default values
 		InstanceID:                  defaultInstanceid,
-		KafkaAdapterAddress:         defaultKafkaadapteraddress,
 		KafkaClusterAddress:         defaultKafkaclusteraddress,
 		KVStoreType:                 defaultKvstoretype,
 		KVStoreTimeout:              defaultKvstoretimeout,
 		KVStoreAddress:              defaultKvstoreaddress,
-		Topic:                       defaultTopic,
-		CoreTopic:                   defaultCoretopic,
 		EventTopic:                  defaultEventtopic,
 		LogLevel:                    defaultLoglevel,
 		OnuNumber:                   defaultOnunumber,
@@ -112,6 +115,8 @@
 		NotLiveProbeInterval:        defaultNotLiveProbeInterval,
 		HeartbeatCheckInterval:      defaultHeartbeatCheckInterval,
 		HeartbeatFailReportInterval: defaultHeartbeatFailReportInterval,
+		GrpcAddress:                 defaultGrpcAddress,
+		CoreEndpoint:                defaultCoreEndpoint,
 		GrpcTimeoutInterval:         defaultGrpcTimeoutInterval,
 		TraceEnabled:                defaultTraceEnabled,
 		TraceAgentAddress:           defaultTraceAgentAddress,
@@ -119,6 +124,9 @@
 		OmccEncryption:              defaultOmccEncryption,
 		EnableONUStats:              defaultEnableONUStats,
 		EnableGEMStats:              defaultEnableGEMStats,
+		RPCTimeout:                  defaultRPCTimeout,
+		MinBackoffRetryDelay:        defaultMinBackoffRetryDelay,
+		MaxBackoffRetryDelay:        defaultMaxBackoffRetryDelay,
 	}
 	return &adapterFlags
 }
@@ -126,57 +134,150 @@
 // ParseCommandArguments parses the arguments when running read-write adaptercore service
 func (so *AdapterFlags) ParseCommandArguments() {
 
-	flag.StringVar(&(so.KafkaAdapterAddress), "kafka_adapter_address", defaultKafkaadapteraddress, "Kafka - Adapter messaging address")
+	flag.StringVar(&(so.KafkaClusterAddress),
+		"kafka_cluster_address",
+		defaultKafkaclusteraddress,
+		"Kafka - Cluster messaging address")
 
-	flag.StringVar(&(so.KafkaClusterAddress), "kafka_cluster_address", defaultKafkaclusteraddress, "Kafka - Cluster messaging address")
+	flag.StringVar(&(so.EventTopic),
+		"event_topic",
+		defaultEventtopic,
+		"Event topic")
 
-	flag.StringVar(&(so.Topic), "adapter_topic", defaultTopic, "Open OLT topic")
+	flag.StringVar(&(so.KVStoreType),
+		"kv_store_type",
+		defaultKvstoretype,
+		"KV store type")
 
-	flag.StringVar(&(so.CoreTopic), "core_topic", defaultCoretopic, "Core topic")
+	flag.DurationVar(&(so.KVStoreTimeout),
+		"kv_store_request_timeout",
+		defaultKvstoretimeout,
+		"The default timeout when making a kv store request")
 
-	flag.StringVar(&(so.EventTopic), "event_topic", defaultEventtopic, "Event topic")
+	flag.StringVar(&(so.KVStoreAddress),
+		"kv_store_address",
+		defaultKvstoreaddress,
+		"KV store address")
 
-	flag.StringVar(&(so.KVStoreType), "kv_store_type", defaultKvstoretype, "KV store type")
+	flag.StringVar(&(so.LogLevel),
+		"log_level",
+		defaultLoglevel,
+		"Log level")
 
-	flag.DurationVar(&(so.KVStoreTimeout), "kv_store_request_timeout", defaultKvstoretimeout, "The default timeout when making a kv store request")
+	flag.IntVar(&(so.OnuNumber),
+		"onu_number",
+		defaultOnunumber,
+		"Number of ONUs")
 
-	flag.StringVar(&(so.KVStoreAddress), "kv_store_address", defaultKvstoreaddress, "KV store address")
+	flag.BoolVar(&(so.Banner),
+		"banner",
+		defaultBanner,
+		"Show startup banner log lines")
 
-	flag.StringVar(&(so.LogLevel), "log_level", defaultLoglevel, "Log level")
+	flag.BoolVar(&(so.DisplayVersionOnly),
+		"version",
+		defaultDisplayVersionOnly,
+		"Show version information and exit")
 
-	flag.IntVar(&(so.OnuNumber), "onu_number", defaultOnunumber, "Number of ONUs")
+	flag.StringVar(&(so.ProbeAddress),
+		"probe_address",
+		defaultProbeAddress,
+		"The address on which to listen to answer liveness and readiness probe queries over HTTP.")
 
-	flag.BoolVar(&(so.Banner), "banner", defaultBanner, "Show startup banner log lines")
+	flag.DurationVar(&(so.LiveProbeInterval),
+		"live_probe_interval",
+		defaultLiveProbeInterval,
+		"Number of seconds for the default liveliness check")
 
-	flag.BoolVar(&(so.DisplayVersionOnly), "version", defaultDisplayVersionOnly, "Show version information and exit")
+	flag.DurationVar(&(so.NotLiveProbeInterval),
+		"not_live_probe_interval",
+		defaultNotLiveProbeInterval,
+		"Number of seconds for liveliness check if probe is not running")
 
-	flag.StringVar(&(so.ProbeAddress), "probe_address", defaultProbeAddress, "The address on which to listen to answer liveness and readiness probe queries over HTTP.")
+	flag.DurationVar(&(so.HeartbeatCheckInterval),
+		"heartbeat_check_interval",
+		defaultHeartbeatCheckInterval,
+		"Number of seconds for heartbeat check interval")
 
-	flag.DurationVar(&(so.LiveProbeInterval), "live_probe_interval", defaultLiveProbeInterval, "Number of seconds for the default liveliness check")
+	flag.DurationVar(&(so.HeartbeatFailReportInterval),
+		"heartbeat_fail_interval",
+		defaultHeartbeatFailReportInterval,
+		"Number of seconds adapter has to wait before reporting core on the heartbeat check failure")
 
-	flag.DurationVar(&(so.NotLiveProbeInterval), "not_live_probe_interval", defaultNotLiveProbeInterval, "Number of seconds for liveliness check if probe is not running")
+	flag.DurationVar(&(so.GrpcTimeoutInterval),
+		"grpc_timeout_interval",
+		defaultGrpcTimeoutInterval,
+		"Number of seconds for GRPC timeout")
 
-	flag.DurationVar(&(so.HeartbeatCheckInterval), "heartbeat_check_interval", defaultHeartbeatCheckInterval, "Number of seconds for heartbeat check interval")
+	flag.IntVar(&(so.CurrentReplica),
+		"current_replica",
+		defaultCurrentReplica,
+		"Replica number of this particular instance")
 
-	flag.DurationVar(&(so.HeartbeatFailReportInterval), "heartbeat_fail_interval", defaultHeartbeatFailReportInterval, "Number of seconds adapter has to wait before reporting core on the heartbeat check failure")
+	flag.IntVar(&(so.TotalReplicas),
+		"total_replica",
+		defaultTotalReplicas,
+		"Total number of instances for this adapter")
 
-	flag.DurationVar(&(so.GrpcTimeoutInterval), "grpc_timeout_interval", defaultGrpcTimeoutInterval, "Number of seconds for GRPC timeout")
+	flag.BoolVar(&(so.TraceEnabled),
+		"trace_enabled",
+		defaultTraceEnabled,
+		"Whether to send logs to tracing agent?")
 
-	flag.IntVar(&(so.CurrentReplica), "current_replica", defaultCurrentReplica, "Replica number of this particular instance")
+	flag.StringVar(&(so.TraceAgentAddress),
+		"trace_agent_address",
+		defaultTraceAgentAddress,
+		"The address of tracing agent to which span info should be sent")
 
-	flag.IntVar(&(so.TotalReplicas), "total_replica", defaultTotalReplicas, "Total number of instances for this adapter")
+	flag.BoolVar(&(so.LogCorrelationEnabled),
+		"log_correlation_enabled",
+		defaultLogCorrelationEnabled,
+		"Whether to enrich log statements with fields denoting operation being executed for achieving correlation?")
 
-	flag.BoolVar(&(so.TraceEnabled), "trace_enabled", defaultTraceEnabled, "Whether to send logs to tracing agent?")
+	flag.BoolVar(&(so.OmccEncryption),
+		"omcc_encryption",
+		defaultOmccEncryption,
+		"OMCI Channel encryption status")
 
-	flag.StringVar(&(so.TraceAgentAddress), "trace_agent_address", defaultTraceAgentAddress, "The address of tracing agent to which span info should be sent")
+	flag.BoolVar(&(so.EnableONUStats),
+		"enable_onu_stats",
+		defaultEnableONUStats,
+		"Enable ONU Statistics")
 
-	flag.BoolVar(&(so.LogCorrelationEnabled), "log_correlation_enabled", defaultLogCorrelationEnabled, "Whether to enrich log statements with fields denoting operation being executed for achieving correlation?")
+	flag.BoolVar(&(so.EnableGEMStats),
+		"enable_gem_stats",
+		defaultEnableGEMStats,
+		"Enable GEM Statistics")
 
-	flag.BoolVar(&(so.OmccEncryption), "omcc_encryption", defaultOmccEncryption, "OMCI Channel encryption status")
+	flag.StringVar(&(so.GrpcAddress),
+		"grpc_address",
+		defaultGrpcAddress,
+		"Adapter GRPC Server address")
 
-	flag.BoolVar(&(so.EnableONUStats), "enable_onu_stats", defaultEnableONUStats, "Enable ONU Statistics")
+	flag.StringVar(&(so.CoreEndpoint),
+		"core_endpoint",
+		defaultCoreEndpoint,
+		"Core endpoint")
 
-	flag.BoolVar(&(so.EnableGEMStats), "enable_gem_stats", defaultEnableGEMStats, "Enable GEM Statistics")
+	flag.StringVar(&(so.AdapterEndpoint),
+		"adapter_endpoint",
+		defaultAdapterEndpoint,
+		"Adapter Endpoint")
+
+	flag.DurationVar(&(so.RPCTimeout),
+		"rpc_timeout",
+		defaultRPCTimeout,
+		"The default timeout when making an RPC request")
+
+	flag.DurationVar(&(so.MinBackoffRetryDelay),
+		"min_retry_delay",
+		defaultMinBackoffRetryDelay,
+		"The minimum number of milliseconds to delay before a connection retry attempt")
+
+	flag.DurationVar(&(so.MaxBackoffRetryDelay),
+		"max_retry_delay",
+		defaultMaxBackoffRetryDelay,
+		"The maximum number of milliseconds to delay before a connection retry attempt")
 
 	flag.Parse()
 	containerName := getContainerInfo()