[VOL-2356] core log_level command line argument should take log level names, not an int value
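
The flag now takes a level name instead of an integer. Accepted values are
DEBUG, INFO, WARN, ERROR and FATAL (matched case-insensitively); an
unrecognized value makes the core fail at startup. Illustrative invocation
(binary name assumed to be rw_core):

    rw_core -log_level DEBUG
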
Change-Id: Ifc9ca5aa6d2b40e068e0ecad845a788edad0a3a1
diff --git a/VERSION b/VERSION
index 32d1656..f90b1af 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.3.1-dev
+2.3.2
diff --git a/go.mod b/go.mod
index 10d32db..ef050dc 100644
--- a/go.mod
+++ b/go.mod
@@ -6,8 +6,8 @@
github.com/gogo/protobuf v1.3.0
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
- github.com/opencord/voltha-lib-go/v3 v3.0.7
- github.com/opencord/voltha-protos/v3 v3.2.2
+ github.com/opencord/voltha-lib-go/v3 v3.0.12
+ github.com/opencord/voltha-protos/v3 v3.2.3
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/stretchr/testify v1.4.0
google.golang.org/grpc v1.24.0
diff --git a/go.sum b/go.sum
index 7c9567a..64bb746 100644
--- a/go.sum
+++ b/go.sum
@@ -190,12 +190,10 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.0.7 h1:o0Oujm9rkNWrTCkmTiJyHRxBGzCYhjfElN2IOzif8v4=
-github.com/opencord/voltha-lib-go/v3 v3.0.7/go.mod h1:l/AgBlYqXEiHLHS6NR654Q7m5BfX5YVnrpykf7kOmGw=
-github.com/opencord/voltha-protos/v3 v3.2.1 h1:5CAxtWzHqDMNItBRklDkXN5YwE9b6vuCXr5UKTAuJBg=
-github.com/opencord/voltha-protos/v3 v3.2.1/go.mod h1:RIGHt7b80BHpHh3ceodknh0DxUjUHCWSbYbZqRx7Og0=
-github.com/opencord/voltha-protos/v3 v3.2.2 h1:Fdg2T6xtUjVZPBWV636/mMH6lYggalQ2e7uFPoOhZDc=
-github.com/opencord/voltha-protos/v3 v3.2.2/go.mod h1:RIGHt7b80BHpHh3ceodknh0DxUjUHCWSbYbZqRx7Og0=
+github.com/opencord/voltha-lib-go/v3 v3.0.12 h1:YGpyjl0UxdVUpKuo/VO5eCi+1+5EbMK9ZUqjDvWUQWQ=
+github.com/opencord/voltha-lib-go/v3 v3.0.12/go.mod h1:69Y+rVd25Nq2SUeoY7Q1BXtwrcUPllG0erhq+aK8Qec=
+github.com/opencord/voltha-protos/v3 v3.2.3 h1:Wv73mw1Ye0bCfyhOk5svgrlE2tLizHq6tQluoDq9Vg8=
+github.com/opencord/voltha-protos/v3 v3.2.3/go.mod h1:RIGHt7b80BHpHh3ceodknh0DxUjUHCWSbYbZqRx7Og0=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 0ffe807..0c2c3ce 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -41,7 +41,7 @@
defaultKVStorePort = 2379 // Consul = 8500; Etcd = 2379
defaultKVTxnKeyDelTime = 60
defaultKVStoreDataPrefix = "service/voltha"
- defaultLogLevel = 0
+ defaultLogLevel = "DEBUG"
defaultBanner = false
defaultDisplayVersionOnly = false
defaultCoreTopic = "rwcore"
@@ -81,7 +81,7 @@
KVTxnKeyDelTime int
KVStoreDataPrefix string
CoreTopic string
- LogLevel int
+ LogLevel string
Banner bool
DisplayVersionOnly bool
RWCoreKey string
@@ -201,7 +201,7 @@
flag.StringVar(&(cf.KVStoreDataPrefix), "kv_store_data_prefix", defaultKVStoreDataPrefix, help)
help = fmt.Sprintf("Log level")
- flag.IntVar(&(cf.LogLevel), "log_level", defaultLogLevel, help)
+ flag.StringVar(&(cf.LogLevel), "log_level", defaultLogLevel, help)
help = fmt.Sprintf("Timeout for long running request")
flag.Int64Var(&(cf.LongRunningRequestTimeout), "timeout_long_request", defaultLongRunningRequestTimeout, help)
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index c0678a3..27ebb58 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -191,12 +191,12 @@
log.Debugw("UpdateLogLevel-request", log.Fields{"package": logging.PackageName, "intval": int(logging.Level)})
if logging.PackageName == "" {
- log.SetAllLogLevel(int(logging.Level))
- log.SetDefaultLogLevel(int(logging.Level))
+ log.SetAllLogLevel(log.LogLevel(logging.Level))
+ log.SetDefaultLogLevel(log.LogLevel(logging.Level))
} else if logging.PackageName == "default" {
- log.SetDefaultLogLevel(int(logging.Level))
+ log.SetDefaultLogLevel(log.LogLevel(logging.Level))
} else {
- log.SetPackageLogLevel(logging.PackageName, int(logging.Level))
+ log.SetPackageLogLevel(logging.PackageName, log.LogLevel(logging.Level))
}
return &empty.Empty{}, nil
diff --git a/rw_core/main.go b/rw_core/main.go
index 71f64f3..e8a7ea6 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -223,9 +223,16 @@
} else {
log.Fatal("HOSTNAME not set")
}
+
realMain()
+
+ logLevel, err := log.StringToLogLevel(cf.LogLevel)
+ if err != nil {
+ panic(err)
+ }
+
//Setup default logger - applies for packages that do not have specific logger set
- if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": instanceID}); err != nil {
+ if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": instanceID}); err != nil {
log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
}
@@ -235,7 +242,7 @@
}
// Update all loggers to log level specified as input parameter
- log.SetAllLogLevel(cf.LogLevel)
+ log.SetAllLogLevel(logLevel)
//log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
index c5e1c14..1882135 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -124,7 +124,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
logger.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -148,7 +148,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -174,7 +174,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -195,7 +195,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -226,7 +226,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -277,7 +277,7 @@
Value: oId,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -315,7 +315,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(rpc, parentDeviceId, success, result)
}
@@ -335,7 +335,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(rpc, parentDeviceId, success, result)
}
@@ -354,7 +354,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -414,7 +414,7 @@
}
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -456,7 +456,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -508,7 +508,7 @@
Key: "packet",
Value: pkt,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -532,7 +532,7 @@
Key: "device_reason",
Value: reason,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -550,7 +550,7 @@
Key: "device_pm_config",
Value: pmConfigs,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
logger.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
return unPackResponse(rpc, pmConfigs.Id, success, result)
}
@@ -567,7 +567,7 @@
{Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(rpc, parentDeviceId, success, result)
}
@@ -604,7 +604,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
index 9bb49ac..96829c5 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
@@ -103,7 +103,7 @@
logger.Debug("update-liveness-channel-reason-change")
b.liveness <- alive
b.lastLivenessTime = time.Now()
- } else if time.Now().Sub(b.lastLivenessTime) > b.LivenessChannelInterval {
+ } else if time.Since(b.lastLivenessTime) > b.LivenessChannelInterval {
logger.Debug("update-liveness-channel-reason-interval")
b.liveness <- alive
b.lastLivenessTime = time.Now()
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index a0f39cd..2d126f7 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -30,7 +30,6 @@
// EtcdClient represents the Etcd KV store client
type EtcdClient struct {
ectdAPI *v3Client.Client
- leaderRev v3Client.Client
keyReservations map[string]*v3Client.LeaseID
watchedChannels sync.Map
writeLock sync.Mutex
@@ -39,9 +38,6 @@
lockToMutexLock sync.Mutex
}
-// Connection Timeout in Seconds
-var connTimeout int = 2
-
// NewEtcdClient returns a new client for the Etcd KV store
func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
duration := GetDuration(timeout)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/flow_utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/flow_utils.go
index b9981e6..4de929f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/flow_utils.go
@@ -689,7 +689,10 @@
}
var flowString = fmt.Sprintf("%d%d%d%d%s%s", flow.TableId, flow.Priority, flow.Flags, flow.Cookie, flow.Match.String(), instructionString.String())
h := md5.New()
- h.Write([]byte(flowString))
+ if _, err := h.Write([]byte(flowString)); err != nil {
+ logger.Errorw("hash-flow-status", log.Fields{"error": err})
+ return 0
+ }
hash := big.NewInt(0)
hash.SetBytes(h.Sum(nil))
return hash.Uint64()
@@ -745,7 +748,7 @@
meter.Stats.DurationSec = 0
meter.Stats.DurationNsec = 0
// band stats init
- for _, _ = range meterMod.Bands {
+ for range meterMod.Bands {
band := &ofp.OfpMeterBandStats{}
band.PacketBandCount = 0
band.ByteBandCount = 0
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index d21fdd5..aa77ffb 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -272,7 +272,14 @@
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
//key := GetDeviceIdFromTopic(*toTopic)
logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
- go kp.kafkaClient.Send(protoRequest, toTopic, key)
+ go func() {
+ if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+ logger.Errorw("send-failed", log.Fields{
+ "topic": toTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
if waitForResponse {
// Create a child context based on the parent context, if any
@@ -287,7 +294,13 @@
// Wait for response as well as timeout or cancellation
// Remove the subscription for a response on return
- defer kp.unSubscribeForResponse(protoRequest.Header.Id)
+ defer func() {
+ if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
+ logger.Errorw("response-unsubscribe-failed", log.Fields{
+ "id": protoRequest.Header.Id,
+ "error": err})
+ }
+ }()
select {
case msg, ok := <-ch:
if !ok {
@@ -378,23 +391,6 @@
return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
}
-// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
-// responses from that topic.
-func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
- if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
- kp.topicToResponseChannelMap[topic] = arg
- }
-}
-
-func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
- kp.lockTopicResponseChannelMap.RLock()
- defer kp.lockTopicResponseChannelMap.RUnlock()
- _, exist := kp.topicToResponseChannelMap[topic]
- return exist
-}
-
func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
@@ -407,15 +403,16 @@
delete(kp.topicToResponseChannelMap, topic)
return err
} else {
- return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
+ return fmt.Errorf("%s-Topic-not-found", topic)
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
var err error
- for topic, _ := range kp.topicToResponseChannelMap {
+ for topic := range kp.topicToResponseChannelMap {
// Unsubscribe to this topic first - this will close the subscribed channel
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
@@ -438,19 +435,22 @@
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
// Close the kafka client client first by unsubscribing to this topic
- kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ return err
+ }
delete(kp.topicToRequestHandlerChannelMap, topic)
return nil
} else {
- return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
+ return fmt.Errorf("%s-Topic-not-found", topic)
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
var err error
- for topic, _ := range kp.topicToRequestHandlerChannelMap {
+ for topic := range kp.topicToRequestHandlerChannelMap {
// Close the kafka client client first by unsubscribing to this topic
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
@@ -489,6 +489,7 @@
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
@@ -575,11 +576,12 @@
// Go over all returned values
var marshalledReturnedVal *any.Any
var err error
- for _, returnVal := range returnedValues {
- if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
+
+ // for now we support only 1 returned value - (excluding the error)
+ if len(returnedValues) > 0 {
+ if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
}
- break // for now we support only 1 returned value - (excluding the error)
}
responseBody := &ic.InterContainerResponseBody{
@@ -730,7 +732,14 @@
key := msg.Header.KeyTopic
logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
- go kp.kafkaClient.Send(icm, replyTopic, key)
+ go func() {
+ if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
+ logger.Errorw("send-reply-failed", log.Fields{
+ "topic": replyTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
index c0c16f9..deb72fd 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
@@ -32,8 +32,6 @@
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
)
-type returnErrorFunction func() error
-
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
//consumer or a group consumer
@@ -48,7 +46,6 @@
// SaramaClient represents the messaging proxy
type SaramaClient struct {
cAdmin sarama.ClusterAdmin
- client sarama.Client
KafkaHost string
KafkaPort int
producer sarama.AsyncProducer
@@ -478,7 +475,7 @@
logger.Info("update-liveness-channel-because-change")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
- } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
+ } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
logger.Info("update-liveness-channel-because-interval")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
@@ -558,7 +555,7 @@
// ascertain the value interface type is a proto.Message
if protoMsg, ok = msg.(proto.Message); !ok {
logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
- return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
+ return fmt.Errorf("not-a-proto-msg-%s", msg)
}
var marshalled []byte
@@ -740,14 +737,6 @@
}
}
-func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
- if _, exist := sc.topicToConsumerChannelMap[id]; exist {
- delete(sc.topicToConsumerChannelMap, id)
- }
-}
-
func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
sc.lockTopicToConsumerChannelMap.RLock()
defer sc.lockTopicToConsumerChannelMap.RUnlock()
@@ -838,24 +827,6 @@
return nil
}
-func (sc *SaramaClient) clearConsumerChannelMap() error {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
- var err error
- for topic, consumerCh := range sc.topicToConsumerChannelMap {
- for _, ch := range consumerCh.channels {
- // Channel will be closed in the removeChannel method
- removeChannel(consumerCh.channels, ch)
- }
- if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
- err = errTemp
- }
- //err = consumerCh.consumers.Close()
- delete(sc.topicToConsumerChannelMap, topic)
- }
- return err
-}
-
//createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher() error {
// This Creates the publisher
@@ -1082,7 +1053,13 @@
sc.addTopicToConsumerChannelMap(topic.Name, cc)
//Start a consumers to listen on that specific topic
- go sc.startConsumers(topic)
+ go func() {
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
return consumerListeningChannel, nil
}
@@ -1109,7 +1086,13 @@
sc.addTopicToConsumerChannelMap(topic.Name, cc)
//Start a consumers to listen on that specific topic
- go sc.startConsumers(topic)
+ go func() {
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
return consumerListeningChannel, nil
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
index 43567e3..69e22a4 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
@@ -50,9 +50,11 @@
"strings"
)
+type LogLevel int8
+
const (
// DebugLevel logs a message at debug level
- DebugLevel = iota
+ DebugLevel = LogLevel(iota)
// InfoLevel logs a message at info level
InfoLevel
// WarnLevel logs a message at warning level
@@ -106,10 +108,10 @@
Warningf(string, ...interface{})
// V reports whether verbosity level l is at least the requested verbose level.
- V(l int) bool
+ V(l LogLevel) bool
//Returns the log level of this specific logger
- GetLogLevel() int
+ GetLogLevel() LogLevel
}
// Fields is used as key-value pairs for structured logging
@@ -127,7 +129,7 @@
packageName string
}
-func intToAtomicLevel(l int) zp.AtomicLevel {
+func logLevelToAtomicLevel(l LogLevel) zp.AtomicLevel {
switch l {
case DebugLevel:
return zp.NewAtomicLevelAt(zc.DebugLevel)
@@ -143,7 +145,7 @@
return zp.NewAtomicLevelAt(zc.ErrorLevel)
}
-func intToLevel(l int) zc.Level {
+func logLevelToLevel(l LogLevel) zc.Level {
switch l {
case DebugLevel:
return zc.DebugLevel
@@ -159,7 +161,7 @@
return zc.ErrorLevel
}
-func levelToInt(l zc.Level) int {
+func levelToLogLevel(l zc.Level) LogLevel {
switch l {
case zc.DebugLevel:
return DebugLevel
@@ -175,25 +177,25 @@
return ErrorLevel
}
-func StringToInt(l string) int {
- switch l {
+func StringToLogLevel(l string) (LogLevel, error) {
+ switch strings.ToUpper(l) {
case "DEBUG":
- return DebugLevel
+ return DebugLevel, nil
case "INFO":
- return InfoLevel
+ return InfoLevel, nil
case "WARN":
- return WarnLevel
+ return WarnLevel, nil
case "ERROR":
- return ErrorLevel
+ return ErrorLevel, nil
case "FATAL":
- return FatalLevel
+ return FatalLevel, nil
}
- return ErrorLevel
+ return 0, errors.New("Given LogLevel is invalid : " + l)
}
-func getDefaultConfig(outputType string, level int, defaultFields Fields) zp.Config {
+func getDefaultConfig(outputType string, level LogLevel, defaultFields Fields) zp.Config {
return zp.Config{
- Level: intToAtomicLevel(level),
+ Level: logLevelToAtomicLevel(level),
Encoding: outputType,
Development: true,
OutputPaths: []string{"stdout"},
@@ -215,7 +217,7 @@
// SetLogger needs to be invoked before the logger API can be invoked. This function
// initialize the default logger (zap's sugaredlogger)
-func SetDefaultLogger(outputType string, level int, defaultFields Fields) (Logger, error) {
+func SetDefaultLogger(outputType string, level LogLevel, defaultFields Fields) (Logger, error) {
// Build a custom config using zap
cfg = getDefaultConfig(outputType, level, defaultFields)
@@ -241,7 +243,7 @@
// be available to it, notably log tracing with filename.functionname.linenumber annotation.
//
// pkgNames parameter should be used for testing only as this function detects the caller's package.
-func AddPackage(outputType string, level int, defaultFields Fields, pkgNames ...string) (Logger, error) {
+func AddPackage(outputType string, level LogLevel, defaultFields Fields, pkgNames ...string) (Logger, error) {
if cfgs == nil {
cfgs = make(map[string]zp.Config)
}
@@ -318,12 +320,12 @@
func UpdateLogger(defaultFields Fields) (Logger, error) {
pkgName, _, _, _ := getCallerInfo()
if _, exist := loggers[pkgName]; !exist {
- return nil, errors.New(fmt.Sprintf("package-%s-not-registered", pkgName))
+ return nil, fmt.Errorf("package-%s-not-registered", pkgName)
}
// Build a new logger
if _, exist := cfgs[pkgName]; !exist {
- return nil, errors.New(fmt.Sprintf("config-%s-not-registered", pkgName))
+ return nil, fmt.Errorf("config-%s-not-registered", pkgName)
}
cfg := cfgs[pkgName]
@@ -347,7 +349,7 @@
return loggers[pkgName], nil
}
-func setLevel(cfg zp.Config, level int) {
+func setLevel(cfg zp.Config, level LogLevel) {
switch level {
case DebugLevel:
cfg.Level.SetLevel(zc.DebugLevel)
@@ -366,7 +368,7 @@
//SetPackageLogLevel dynamically sets the log level of a given package to level. This is typically invoked at an
// application level during debugging
-func SetPackageLogLevel(packageName string, level int) {
+func SetPackageLogLevel(packageName string, level LogLevel) {
// Get proper config
if cfg, ok := cfgs[packageName]; ok {
setLevel(cfg, level)
@@ -374,7 +376,7 @@
}
//SetAllLogLevel sets the log level of all registered packages to level
-func SetAllLogLevel(level int) {
+func SetAllLogLevel(level LogLevel) {
// Get proper config
for _, cfg := range cfgs {
setLevel(cfg, level)
@@ -382,7 +384,7 @@
}
//GetPackageLogLevel returns the current log level of a package.
-func GetPackageLogLevel(packageName ...string) (int, error) {
+func GetPackageLogLevel(packageName ...string) (LogLevel, error) {
var name string
if len(packageName) == 1 {
name = packageName[0]
@@ -390,21 +392,21 @@
name, _, _, _ = getCallerInfo()
}
if cfg, ok := cfgs[name]; ok {
- return levelToInt(cfg.Level.Level()), nil
+ return levelToLogLevel(cfg.Level.Level()), nil
}
- return 0, errors.New(fmt.Sprintf("unknown-package-%s", name))
+ return 0, fmt.Errorf("unknown-package-%s", name)
}
//GetDefaultLogLevel gets the log level used for packages that don't have specific loggers
-func GetDefaultLogLevel() int {
- return levelToInt(cfg.Level.Level())
+func GetDefaultLogLevel() LogLevel {
+ return levelToLogLevel(cfg.Level.Level())
}
//SetLogLevel sets the log level for the logger corresponding to the caller's package
-func SetLogLevel(level int) error {
+func SetLogLevel(level LogLevel) error {
pkgName, _, _, _ := getCallerInfo()
if _, exist := cfgs[pkgName]; !exist {
- return errors.New(fmt.Sprintf("unregistered-package-%s", pkgName))
+ return fmt.Errorf("unregistered-package-%s", pkgName)
}
cfg := cfgs[pkgName]
setLevel(cfg, level)
@@ -412,7 +414,7 @@
}
//SetDefaultLogLevel sets the log level used for packages that don't have specific loggers
-func SetDefaultLogLevel(level int) {
+func SetDefaultLogLevel(level LogLevel) {
setLevel(cfg, level)
}
@@ -641,13 +643,13 @@
}
// V reports whether verbosity level l is at least the requested verbose level.
-func (l logger) V(level int) bool {
- return l.parent.Core().Enabled(intToLevel(level))
+func (l logger) V(level LogLevel) bool {
+ return l.parent.Core().Enabled(logLevelToLevel(level))
}
// GetLogLevel returns the current level of the logger
-func (l logger) GetLogLevel() int {
- return levelToInt(cfgs[l.packageName].Level.Level())
+func (l logger) GetLogLevel() LogLevel {
+ return levelToLogLevel(cfgs[l.packageName].Level.Level())
}
// With returns a logger initialized with the key-value pairs
@@ -776,11 +778,11 @@
}
// V reports whether verbosity level l is at least the requested verbose level.
-func V(level int) bool {
+func V(level LogLevel) bool {
return getPackageLevelLogger().V(level)
}
//GetLogLevel returns the log level of the invoking package
-func GetLogLevel() int {
+func GetLogLevel() LogLevel {
return getPackageLevelLogger().GetLogLevel()
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/probe.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/probe.go
index 9f00953..932c287 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/probe.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/probe.go
@@ -231,15 +231,26 @@
p.mutex.RLock()
defer p.mutex.RUnlock()
w.Header().Set("Content-Type", "application/json")
- w.Write([]byte("{"))
+ if _, err := w.Write([]byte("{")); err != nil {
+ log.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
comma := ""
for c, s := range p.status {
- w.Write([]byte(fmt.Sprintf("%s\"%s\": \"%s\"", comma, c, s.String())))
+ if _, err := w.Write([]byte(fmt.Sprintf("%s\"%s\": \"%s\"", comma, c, s.String()))); err != nil {
+ log.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
comma = ", "
}
- w.Write([]byte("}"))
+ if _, err := w.Write([]byte("}")); err != nil {
+ log.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
w.WriteHeader(http.StatusOK)
-
}
// ListenAndServe implements 3 HTTP endpoints on the given port for healthz, readz, and detailz. Returns only on error
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 354f814..9c7063f 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -96,7 +96,7 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v3 v3.0.7
+# github.com/opencord/voltha-lib-go/v3 v3.0.12
github.com/opencord/voltha-lib-go/v3/pkg/log
github.com/opencord/voltha-lib-go/v3/pkg/db
github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore