[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index b61a909..aff1de3 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -19,73 +19,24 @@
import (
"flag"
- "fmt"
"os"
"time"
)
// Open ONU default constants
const (
- etcdStoreName = "etcd"
- defaultInstanceid = "openonu"
- defaultKafkaadapteraddress = "127.0.0.1:9092"
- defaultKafkaclusteraddress = "127.0.0.1:9092"
- defaultKvstoretype = etcdStoreName
- defaultKvstoretimeout = 5 * time.Second
- defaultKvstoreaddress = "127.0.0.1:2379"
- defaultLoglevel = "WARN"
- defaultBanner = false
- defaultDisplayVersionOnly = false
- defaultAccIncrEvto = false
- defaultTopic = "openonu"
- defaultCoreTopic = "rwcore"
- defaultEventTopic = "voltha.events"
- defaultOnunumber = 1
- defaultProbeHost = ""
- defaultProbePort = 8080
- defaultLiveProbeInterval = 60 * time.Second
- defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
- //defaultHearbeatFailReportInterval is the time in seconds the adapter will keep checking the hardware for heartbeat.
- defaultHearbeatCheckInterval = 30 * time.Second
- // defaultHearbeatFailReportInterval is the time adapter will wait before updating the state to the core.
- defaultHearbeatFailReportInterval = 180 * time.Second
- //defaultKafkaReconnectRetries -1: reconnect endlessly.
- defaultKafkaReconnectRetries = -1
- defaultCurrentReplica = 1
- defaultTotalReplicas = 1
- defaultMaxTimeoutInterAdapterComm = 30 * time.Second
- defaultMaxTimeoutReconciling = 10 * time.Second
- defaultOnuVendorIds = "OPEN,ALCL,BRCM,TWSH,ALPH,ISKT,SFAA,BBSM,SCOM,ARPX,DACM,ERSN,HWTC,CIGG,ADTN,ARCA,AVMG"
-
- // For Tracing
- defaultTraceEnabled = false
- defaultTraceAgentAddress = "127.0.0.1:6831"
- defaultLogCorrelationEnabled = true
-
- defaultMetricsEnabled = false
- defaultMibAuditInterval = 0
- defaultAlarmAuditInterval = 300 * time.Second
-
- defaultOmciTimeout = 3 * time.Second
- defaultDlToAdapterTimeout = 10 * time.Second
- defaultDlToOnuTimeoutPer4MB = 60 * time.Minute //assumed for 4 MB of the image
- //Mask to indicate which possibly active ONU UNI state is really reported to the core
- // compare python code - at the moment restrict active state to the first ONU UNI port
- // check is limited to max 16 uni ports - cmp above UNI limit!!!
- defaultUniPortMask = 0x0001
+ EtcdStoreName = "etcd"
+ OnuVendorIds = "OPEN,ALCL,BRCM,TWSH,ALPH,ISKT,SFAA,BBSM,SCOM,ARPX,DACM,ERSN,HWTC,CIGG,ADTN,ARCA,AVMG"
)
// AdapterFlags represents the set of configurations used by the read-write adaptercore service
type AdapterFlags struct {
// Command line parameters
InstanceID string
- KafkaAdapterAddress string
KafkaClusterAddress string // NOTE this is unused across the adapter
KVStoreType string
KVStoreTimeout time.Duration
KVStoreAddress string
- Topic string
- CoreTopic string
EventTopic string
LogLevel string
OnuNumber int
@@ -114,169 +65,220 @@
DownloadToAdapterTimeout time.Duration
DownloadToOnuTimeout4MB time.Duration
UniPortMask int
-}
-
-// NewAdapterFlags returns a new RWCore config
-func NewAdapterFlags() *AdapterFlags {
- var adapterFlags = AdapterFlags{ // Default values
- InstanceID: defaultInstanceid,
- KafkaAdapterAddress: defaultKafkaadapteraddress,
- KafkaClusterAddress: defaultKafkaclusteraddress,
- KVStoreType: defaultKvstoretype,
- KVStoreTimeout: defaultKvstoretimeout,
- KVStoreAddress: defaultKvstoreaddress,
- Topic: defaultTopic,
- CoreTopic: defaultCoreTopic,
- EventTopic: defaultEventTopic,
- LogLevel: defaultLoglevel,
- OnuNumber: defaultOnunumber,
- Banner: defaultBanner,
- DisplayVersionOnly: defaultDisplayVersionOnly,
- AccIncrEvto: defaultAccIncrEvto,
- ProbeHost: defaultProbeHost,
- ProbePort: defaultProbePort,
- LiveProbeInterval: defaultLiveProbeInterval,
- NotLiveProbeInterval: defaultNotLiveProbeInterval,
- HeartbeatCheckInterval: defaultHearbeatCheckInterval,
- HeartbeatFailReportInterval: defaultHearbeatFailReportInterval,
- KafkaReconnectRetries: defaultKafkaReconnectRetries,
- CurrentReplica: defaultCurrentReplica,
- TotalReplicas: defaultTotalReplicas,
- MaxTimeoutInterAdapterComm: defaultMaxTimeoutInterAdapterComm,
- MaxTimeoutReconciling: defaultMaxTimeoutReconciling,
- TraceEnabled: defaultTraceEnabled,
- TraceAgentAddress: defaultTraceAgentAddress,
- LogCorrelationEnabled: defaultLogCorrelationEnabled,
- OnuVendorIds: defaultOnuVendorIds,
- MetricsEnabled: defaultMetricsEnabled,
- MibAuditInterval: defaultMibAuditInterval,
- AlarmAuditInterval: defaultAlarmAuditInterval,
- OmciTimeout: defaultOmciTimeout,
- DownloadToAdapterTimeout: defaultDlToAdapterTimeout,
- DownloadToOnuTimeout4MB: defaultDlToOnuTimeoutPer4MB,
- UniPortMask: defaultUniPortMask,
- }
- return &adapterFlags
+ MinBackoffRetryDelay time.Duration
+ MaxBackoffRetryDelay time.Duration
+ AdapterEndpoint string
+ GrpcAddress string
+ CoreEndpoint string
+ RPCTimeout time.Duration
}
// ParseCommandArguments parses the arguments when running read-write adaptercore service
-func (so *AdapterFlags) ParseCommandArguments() {
+func (so *AdapterFlags) ParseCommandArguments(args []string) {
- help := "Kafka - Adapter messaging address"
- flag.StringVar(&(so.KafkaAdapterAddress), "kafka_adapter_address", defaultKafkaadapteraddress, help)
+ fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
- help = "Kafka - Cluster messaging address"
- flag.StringVar(&(so.KafkaClusterAddress), "kafka_cluster_address", defaultKafkaclusteraddress, help)
+ fs.StringVar(&(so.KafkaClusterAddress),
+ "kafka_cluster_address",
+ "127.0.0.1:9092",
+ "Kafka - Cluster messaging address")
- help = "Open ONU topic"
- baseAdapterTopic := flag.String("adapter_topic", defaultTopic, help)
+ fs.StringVar(&(so.EventTopic),
+ "event_topic",
+ "voltha.events",
+ "Event topic")
- help = "Core topic"
- flag.StringVar(&(so.CoreTopic), "core_topic", defaultCoreTopic, help)
+ fs.StringVar(&(so.KVStoreType),
+ "kv_store_type",
+ EtcdStoreName,
+ "KV store type")
- help = "Event topic"
- flag.StringVar(&(so.EventTopic), "event_topic", defaultEventTopic, help)
+ fs.DurationVar(&(so.KVStoreTimeout),
+ "kv_store_request_timeout",
+ 5*time.Second,
+ "The default timeout when making a kv store request")
- help = "KV store type"
- flag.StringVar(&(so.KVStoreType), "kv_store_type", defaultKvstoretype, help)
+ fs.StringVar(&(so.KVStoreAddress),
+ "kv_store_address",
+ "127.0.0.1:2379",
+ "KV store address")
- help = "The default timeout when making a kv store request"
- flag.DurationVar(&(so.KVStoreTimeout), "kv_store_request_timeout", defaultKvstoretimeout, help)
+ fs.StringVar(&(so.LogLevel),
+ "log_level",
+ "WARN",
+ "Log level")
- help = "KV store address"
- flag.StringVar(&(so.KVStoreAddress), "kv_store_address", defaultKvstoreaddress, help)
+ fs.IntVar(&(so.OnuNumber),
+ "onu_number",
+ 1,
+ "Number of ONUs")
- help = "Log level"
- flag.StringVar(&(so.LogLevel), "log_level", defaultLoglevel, help)
+ fs.BoolVar(&(so.Banner),
+ "banner",
+ false,
+ "Show startup banner log lines")
- help = "Number of ONUs"
- flag.IntVar(&(so.OnuNumber), "onu_number", defaultOnunumber, help)
+ fs.BoolVar(&(so.DisplayVersionOnly),
+ "version",
+ false,
+ "Show version information and exit")
- help = "Show startup banner log lines"
- flag.BoolVar(&(so.Banner), "banner", defaultBanner, help)
+ fs.BoolVar(&(so.AccIncrEvto),
+ "accept_incr_evto",
+ false,
+ "Acceptance of incremental EVTOCD configuration")
- help = "Show version information and exit"
- flag.BoolVar(&(so.DisplayVersionOnly), "version", defaultDisplayVersionOnly, help)
+ fs.StringVar(&(so.ProbeHost),
+ "probe_host",
+ "",
+ "The address on which to listen to answer liveness and readiness probe queries over HTTP")
- help = "Acceptance of incremental EVTOCD configuration"
- flag.BoolVar(&(so.AccIncrEvto), "accept_incr_evto", defaultAccIncrEvto, help)
+ fs.IntVar(&(so.ProbePort),
+ "probe_port",
+ 8080,
+ "The port on which to listen to answer liveness and readiness probe queries over HTTP")
- help = "The address on which to listen to answer liveness and readiness probe queries over HTTP"
- flag.StringVar(&(so.ProbeHost), "probe_host", defaultProbeHost, help)
+ fs.DurationVar(&(so.LiveProbeInterval),
+ "live_probe_interval",
+ 60*time.Second,
+ "Number of seconds for the default liveliness check")
- help = "The port on which to listen to answer liveness and readiness probe queries over HTTP"
- flag.IntVar(&(so.ProbePort), "probe_port", defaultProbePort, help)
+ fs.DurationVar(&(so.NotLiveProbeInterval),
+ "not_live_probe_interval",
+ 60*time.Second,
+ "Number of seconds for liveliness check if probe is not running")
- help = "Number of seconds for the default liveliness check"
- flag.DurationVar(&(so.LiveProbeInterval), "live_probe_interval", defaultLiveProbeInterval, help)
+ fs.DurationVar(&(so.HeartbeatCheckInterval),
+ "hearbeat_check_interval",
+ 30*time.Second,
+ "Number of seconds for heartbeat check interval")
- help = "Number of seconds for liveliness check if probe is not running"
- flag.DurationVar(&(so.NotLiveProbeInterval), "not_live_probe_interval", defaultNotLiveProbeInterval, help)
+ fs.DurationVar(&(so.HeartbeatFailReportInterval),
+ "hearbeat_fail_interval",
+ 30*time.Second,
+ "Number of seconds adapter has to wait before reporting core on the hearbeat check failure")
- help = "Number of seconds for heartbeat check interval"
- flag.DurationVar(&(so.HeartbeatCheckInterval), "hearbeat_check_interval", defaultHearbeatCheckInterval, help)
+ fs.IntVar(&(so.KafkaReconnectRetries),
+ "kafka_reconnect_retries",
+ -1,
+ "Number of retries to connect to Kafka")
- help = "Number of seconds adapter has to wait before reporting core on the hearbeat check failure"
- flag.DurationVar(&(so.HeartbeatFailReportInterval), "hearbeat_fail_interval", defaultHearbeatFailReportInterval, help)
+ fs.IntVar(&(so.CurrentReplica),
+ "current_replica",
+ 1,
+ "Replica number of this particular instance")
- help = "Number of retries to connect to Kafka"
- flag.IntVar(&(so.KafkaReconnectRetries), "kafka_reconnect_retries", defaultKafkaReconnectRetries, help)
+ fs.IntVar(&(so.TotalReplicas),
+ "total_replica",
+ 1,
+ "Total number of instances for this adapter")
- help = "Replica number of this particular instance"
- flag.IntVar(&(so.CurrentReplica), "current_replica", defaultCurrentReplica, help)
+ fs.DurationVar(&(so.MaxTimeoutInterAdapterComm),
+ "max_timeout_interadapter_comm",
+ 30*time.Second,
+ "Maximum Number of seconds for the default interadapter communication timeout")
- help = "Total number of instances for this adapter"
- flag.IntVar(&(so.TotalReplicas), "total_replica", defaultTotalReplicas, help)
+ fs.DurationVar(&(so.MaxTimeoutReconciling),
+ "max_timeout_reconciling",
+ 10*time.Second,
+ "Maximum Number of seconds for the default ONU reconciling timeout")
- help = "Maximum Number of seconds for the default interadapter communication timeout"
- flag.DurationVar(&(so.MaxTimeoutInterAdapterComm), "max_timeout_interadapter_comm",
- defaultMaxTimeoutInterAdapterComm, help)
+ fs.BoolVar(&(so.TraceEnabled),
+ "trace_enabled",
+ false,
+ "Whether to send logs to tracing agent")
- help = "Maximum Number of seconds for the default ONU reconciling timeout"
- flag.DurationVar(&(so.MaxTimeoutReconciling), "max_timeout_reconciling",
- defaultMaxTimeoutReconciling, help)
+ fs.StringVar(&(so.TraceAgentAddress),
+ "trace_agent_address",
+ "127.0.0.1:6831",
+ "The address of tracing agent to which span info should be sent")
- help = "Whether to send logs to tracing agent"
- flag.BoolVar(&(so.TraceEnabled), "trace_enabled", defaultTraceEnabled, help)
+ fs.BoolVar(&(so.LogCorrelationEnabled),
+ "log_correlation_enabled",
+ true,
+ "Whether to enrich log statements with fields denoting operation being executed for achieving correlation")
- help = "The address of tracing agent to which span info should be sent"
- flag.StringVar(&(so.TraceAgentAddress), "trace_agent_address", defaultTraceAgentAddress, help)
+ fs.StringVar(&(so.OnuVendorIds),
+ "allowed_onu_vendors",
+ OnuVendorIds,
+ "List of Allowed ONU Vendor Ids")
- help = "Whether to enrich log statements with fields denoting operation being executed for achieving correlation"
- flag.BoolVar(&(so.LogCorrelationEnabled), "log_correlation_enabled", defaultLogCorrelationEnabled, help)
+ fs.BoolVar(&(so.MetricsEnabled),
+ "metrics_enabled",
+ false,
+ "Whether to enable metrics collection")
- help = "List of Allowed ONU Vendor Ids"
- flag.StringVar(&(so.OnuVendorIds), "allowed_onu_vendors", defaultOnuVendorIds, help)
+ fs.DurationVar(&(so.MibAuditInterval),
+ "mib_audit_interval",
+ 300*time.Second,
+ "Mib Audit Interval in seconds - the value zero will disable Mib Audit")
- help = "Whether to enable metrics collection"
- flag.BoolVar(&(so.MetricsEnabled), "metrics_enabled", defaultMetricsEnabled, help)
+ fs.DurationVar(&(so.OmciTimeout),
+ "omci_timeout",
+ 3*time.Second,
+ "OMCI timeout duration - this timeout value is used on the OMCI channel for waiting on response from ONU")
- help = "Mib Audit Interval in seconds - the value zero will disable Mib Audit"
- flag.DurationVar(&(so.MibAuditInterval), "mib_audit_interval", defaultMibAuditInterval, help)
+ fs.DurationVar(&(so.AlarmAuditInterval),
+ "alarm_audit_interval",
+ 300*time.Second,
+ "Alarm Audit Interval in seconds - the value zero will disable alarm audit")
- help = "OMCI timeout duration - this timeout value is used on the OMCI channel for waiting on response from ONU"
- flag.DurationVar(&(so.OmciTimeout), "omci_timeout", defaultOmciTimeout, help)
+ fs.DurationVar(&(so.DownloadToAdapterTimeout),
+ "download_to_adapter_timeout",
+ 10*time.Second,
+ "File download to adapter timeout in seconds")
- help = "Alarm Audit Interval in seconds - the value zero will disable alarm audit"
- flag.DurationVar(&(so.AlarmAuditInterval), "alarm_audit_interval", defaultAlarmAuditInterval, help)
+ fs.DurationVar(&(so.DownloadToOnuTimeout4MB),
+ "download_to_onu_timeout_4MB",
+ 60*time.Minute,
+ "File download to ONU timeout in minutes for a block of 4MB")
- help = "File download to adapter timeout in seconds"
- flag.DurationVar(&(so.DownloadToAdapterTimeout), "download_to_adapter_timeout", defaultDlToAdapterTimeout, help)
+ //Mask to indicate which possibly active ONU UNI state is really reported to the core
+ // compare python code - at the moment restrict active state to the first ONU UNI port
+ // check is limited to max 16 uni ports - cmp above UNI limit!!!
+ fs.IntVar(&(so.UniPortMask),
+ "uni_port_mask",
+ 0x0001,
+ "The bitmask to identify UNI ports that need to be enabled")
- help = "File download to ONU timeout in minutes for a block of 4MB"
- flag.DurationVar(&(so.DownloadToOnuTimeout4MB), "download_to_onu_timeout_4MB", defaultDlToOnuTimeoutPer4MB, help)
+ fs.StringVar(&(so.GrpcAddress),
+ "grpc_address",
+ ":50060",
+ "Adapter GRPC Server address")
- help = "The bitmask to identify UNI ports that need to be enabled"
- flag.IntVar(&(so.UniPortMask), "uni_port_mask", defaultUniPortMask, help)
+ fs.StringVar(&(so.CoreEndpoint),
+ "core_endpoint",
+ ":55555",
+ "Core endpoint")
- flag.Parse()
+ fs.StringVar(&(so.AdapterEndpoint),
+ "adapter_endpoint",
+ "",
+ "Adapter Endpoint")
+
+ fs.DurationVar(&(so.RPCTimeout),
+ "rpc_timeout",
+ 10*time.Second,
+ "The default timeout when making an RPC request")
+
+ fs.DurationVar(&(so.MinBackoffRetryDelay),
+ "min_retry_delay",
+ 500*time.Millisecond,
+ "The minimum number of milliseconds to delay before a connection retry attempt")
+
+ fs.DurationVar(&(so.MaxBackoffRetryDelay),
+ "max_retry_delay",
+ 10*time.Second,
+ "The maximum number of milliseconds to delay before a connection retry attempt")
+
+ _ = fs.Parse(args)
containerName := getContainerInfo()
if len(containerName) > 0 {
so.InstanceID = containerName
+ } else {
+ so.InstanceID = "openonu"
}
- so.Topic = fmt.Sprintf("%s_%d", *baseAdapterTopic, int32(so.CurrentReplica))
-
}
func getContainerInfo() string {