VOL-2624 - fix SCA issues
Change-Id: I3a5e0aafc5b6bd6c6e865675a3481db289a7d772
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 7cb933d..86f186d 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -124,7 +124,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
logger.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -148,7 +148,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -174,7 +174,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -195,7 +195,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -226,7 +226,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -277,7 +277,7 @@
Value: oId,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -315,7 +315,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(rpc, parentDeviceId, success, result)
}
@@ -335,7 +335,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(rpc, parentDeviceId, success, result)
}
@@ -354,7 +354,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -414,7 +414,7 @@
}
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -450,7 +450,7 @@
Value: id,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
@@ -496,7 +496,7 @@
Key: "packet",
Value: pkt,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -520,7 +520,7 @@
Key: "device_reason",
Value: reason,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -538,7 +538,7 @@
Key: "device_pm_config",
Value: pmConfigs,
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
logger.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
return unPackResponse(rpc, pmConfigs.Id, success, result)
}
@@ -555,7 +555,7 @@
{Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
}
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
logger.Debugw("ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(rpc, parentDeviceId, success, result)
}
@@ -592,7 +592,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
logger.Debugw("PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
diff --git a/pkg/db/backend.go b/pkg/db/backend.go
index 9bb49ac..96829c5 100644
--- a/pkg/db/backend.go
+++ b/pkg/db/backend.go
@@ -103,7 +103,7 @@
logger.Debug("update-liveness-channel-reason-change")
b.liveness <- alive
b.lastLivenessTime = time.Now()
- } else if time.Now().Sub(b.lastLivenessTime) > b.LivenessChannelInterval {
+ } else if time.Since(b.lastLivenessTime) > b.LivenessChannelInterval {
logger.Debug("update-liveness-channel-reason-interval")
b.liveness <- alive
b.lastLivenessTime = time.Now()
diff --git a/pkg/db/backend_test.go b/pkg/db/backend_test.go
index 0da95cd..ac1d0df 100644
--- a/pkg/db/backend_test.go
+++ b/pkg/db/backend_test.go
@@ -242,6 +242,7 @@
// Assert that kvstore has this value stored
kvpair, err := backend.Get(ctx, "key1")
+ assert.Nil(t, err)
assert.NotNil(t, kvpair)
assert.Equal(t, defaultPathPrefix+"/key1", kvpair.Key)
assert.Equal(t, []uint8("value1"), kvpair.Value)
@@ -250,6 +251,7 @@
err = backend.Put(ctx, "key1", []uint8("value11"))
assert.Nil(t, err)
kvpair, err = backend.Get(ctx, "key1")
+ assert.Nil(t, err)
assert.NotNil(t, kvpair)
assert.Equal(t, defaultPathPrefix+"/key1", kvpair.Key)
assert.Equal(t, []uint8("value11"), kvpair.Value)
@@ -273,6 +275,7 @@
defer cancel()
backend := provisionBackendWithEmbeddedEtcdServer(t)
err := backend.Put(ctx, "key2", []uint8("value2"))
+ assert.Nil(t, err)
// Assert alive state has become true
assert.True(t, backend.alive)
@@ -286,8 +289,8 @@
// Assert that Get works fine for absent key3
kvpair, err = backend.Get(ctx, "key3")
- assert.Nil(t, kvpair)
assert.Nil(t, err) // no error as lookup is successful
+ assert.Nil(t, kvpair)
}
// Get operation should fail against Dummy Non-existent Etcd Server
@@ -309,18 +312,21 @@
defer cancel()
backend := provisionBackendWithEmbeddedEtcdServer(t)
err := backend.Put(ctx, "key3", []uint8("value3"))
+ assert.Nil(t, err)
// Assert alive state has become true
assert.True(t, backend.alive)
// Assert that kvstore has this key stored
kvpair, err := backend.Get(ctx, "key3")
+ assert.Nil(t, err)
assert.NotNil(t, kvpair)
// Delete and Assert that key has been removed
err = backend.Delete(ctx, "key3")
assert.Nil(t, err)
kvpair, err = backend.Get(ctx, "key3")
+ assert.Nil(t, err)
assert.Nil(t, kvpair)
// Assert that Delete silently ignores absent key3
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index a0f39cd..2d126f7 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -30,7 +30,6 @@
// EtcdClient represents the Etcd KV store client
type EtcdClient struct {
ectdAPI *v3Client.Client
- leaderRev v3Client.Client
keyReservations map[string]*v3Client.LeaseID
watchedChannels sync.Map
writeLock sync.Mutex
@@ -39,9 +38,6 @@
lockToMutexLock sync.Mutex
}
-// Connection Timeout in Seconds
-var connTimeout int = 2
-
// NewEtcdClient returns a new client for the Etcd KV store
func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
duration := GetDuration(timeout)
diff --git a/pkg/flows/flow_utils.go b/pkg/flows/flow_utils.go
index b9981e6..4de929f 100644
--- a/pkg/flows/flow_utils.go
+++ b/pkg/flows/flow_utils.go
@@ -689,7 +689,10 @@
}
var flowString = fmt.Sprintf("%d%d%d%d%s%s", flow.TableId, flow.Priority, flow.Flags, flow.Cookie, flow.Match.String(), instructionString.String())
h := md5.New()
- h.Write([]byte(flowString))
+ if _, err := h.Write([]byte(flowString)); err != nil {
+ logger.Errorw("hash-flow-status", log.Fields{"error": err})
+ return 0
+ }
hash := big.NewInt(0)
hash.SetBytes(h.Sum(nil))
return hash.Uint64()
@@ -745,7 +748,7 @@
meter.Stats.DurationSec = 0
meter.Stats.DurationNsec = 0
// band stats init
- for _, _ = range meterMod.Bands {
+ for range meterMod.Bands {
band := &ofp.OfpMeterBandStats{}
band.PacketBandCount = 0
band.ByteBandCount = 0
diff --git a/pkg/flows/flow_utils_test.go b/pkg/flows/flow_utils_test.go
index fc2ff27..5219cf3 100644
--- a/pkg/flows/flow_utils_test.go
+++ b/pkg/flows/flow_utils_test.go
@@ -45,8 +45,7 @@
allFlows = fg.ListFlows()
assert.Equal(t, 0, len(allFlows))
- var fa *FlowArgs
- fa = &FlowArgs{
+ fa := &FlowArgs{
KV: OfpFlowModArgs{"priority": 500},
MatchFields: []*ofp.OfpOxmOfbField{
InPort(1),
@@ -322,8 +321,7 @@
assert.Equal(t, 0, len(val.ListFlows()))
assert.Equal(t, 0, len(val.ListGroups()))
- var fa *FlowArgs
- fa = &FlowArgs{
+ fa := &FlowArgs{
KV: OfpFlowModArgs{"priority": 500},
MatchFields: []*ofp.OfpOxmOfbField{
InPort(2),
@@ -705,5 +703,5 @@
mcastIp := uint32(4001431809) //238.129.1.1
expectedMacInBytes := []byte{1, 0, 94, 1, 1, 1} //01:00:5e:01:01:01
macInBytes := ConvertToMulticastMacBytes(mcastIp)
- assert.True(t, bytes.Compare(macInBytes, expectedMacInBytes) == 0)
+ assert.True(t, bytes.Equal(macInBytes, expectedMacInBytes))
}
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index d21fdd5..aa77ffb 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -272,7 +272,14 @@
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
//key := GetDeviceIdFromTopic(*toTopic)
logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
- go kp.kafkaClient.Send(protoRequest, toTopic, key)
+ go func() {
+ if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+ logger.Errorw("send-failed", log.Fields{
+ "topic": toTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
if waitForResponse {
// Create a child context based on the parent context, if any
@@ -287,7 +294,13 @@
// Wait for response as well as timeout or cancellation
// Remove the subscription for a response on return
- defer kp.unSubscribeForResponse(protoRequest.Header.Id)
+ defer func() {
+ if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
+ logger.Errorw("response-unsubscribe-failed", log.Fields{
+ "id": protoRequest.Header.Id,
+ "error": err})
+ }
+ }()
select {
case msg, ok := <-ch:
if !ok {
@@ -378,23 +391,6 @@
return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
}
-// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
-// responses from that topic.
-func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
- if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
- kp.topicToResponseChannelMap[topic] = arg
- }
-}
-
-func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
- kp.lockTopicResponseChannelMap.RLock()
- defer kp.lockTopicResponseChannelMap.RUnlock()
- _, exist := kp.topicToResponseChannelMap[topic]
- return exist
-}
-
func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
@@ -407,15 +403,16 @@
delete(kp.topicToResponseChannelMap, topic)
return err
} else {
- return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
+ return fmt.Errorf("%s-Topic-not-found", topic)
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
var err error
- for topic, _ := range kp.topicToResponseChannelMap {
+ for topic := range kp.topicToResponseChannelMap {
// Unsubscribe to this topic first - this will close the subscribed channel
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
@@ -438,19 +435,22 @@
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
// Close the kafka client client first by unsubscribing to this topic
- kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ return err
+ }
delete(kp.topicToRequestHandlerChannelMap, topic)
return nil
} else {
- return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
+ return fmt.Errorf("%s-Topic-not-found", topic)
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
var err error
- for topic, _ := range kp.topicToRequestHandlerChannelMap {
+ for topic := range kp.topicToRequestHandlerChannelMap {
// Close the kafka client client first by unsubscribing to this topic
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
@@ -489,6 +489,7 @@
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
@@ -575,11 +576,12 @@
// Go over all returned values
var marshalledReturnedVal *any.Any
var err error
- for _, returnVal := range returnedValues {
- if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
+
+ // for now we support only 1 returned value - (excluding the error)
+ if len(returnedValues) > 0 {
+ if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
}
- break // for now we support only 1 returned value - (excluding the error)
}
responseBody := &ic.InterContainerResponseBody{
@@ -730,7 +732,14 @@
key := msg.Header.KeyTopic
logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
- go kp.kafkaClient.Send(icm, replyTopic, key)
+ go func() {
+ if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
+ logger.Errorw("send-reply-failed", log.Fields{
+ "topic": replyTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index 56c90ca..f32c16c 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -52,9 +52,6 @@
type myInterface struct {
}
-func (m *myInterface) doSomething() {
-}
-
func TestKafkaProxyOptionTargetInterface(t *testing.T) {
var m *myInterface
actualResult := newInterContainerProxy(RequestHandlerInterface(m))
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index c0c16f9..deb72fd 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -32,8 +32,6 @@
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
)
-type returnErrorFunction func() error
-
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
//consumer or a group consumer
@@ -48,7 +46,6 @@
// SaramaClient represents the messaging proxy
type SaramaClient struct {
cAdmin sarama.ClusterAdmin
- client sarama.Client
KafkaHost string
KafkaPort int
producer sarama.AsyncProducer
@@ -478,7 +475,7 @@
logger.Info("update-liveness-channel-because-change")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
- } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
+ } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
logger.Info("update-liveness-channel-because-interval")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
@@ -558,7 +555,7 @@
// ascertain the value interface type is a proto.Message
if protoMsg, ok = msg.(proto.Message); !ok {
logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
- return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
+ return fmt.Errorf("not-a-proto-msg-%s", msg)
}
var marshalled []byte
@@ -740,14 +737,6 @@
}
}
-func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
- if _, exist := sc.topicToConsumerChannelMap[id]; exist {
- delete(sc.topicToConsumerChannelMap, id)
- }
-}
-
func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
sc.lockTopicToConsumerChannelMap.RLock()
defer sc.lockTopicToConsumerChannelMap.RUnlock()
@@ -838,24 +827,6 @@
return nil
}
-func (sc *SaramaClient) clearConsumerChannelMap() error {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
- var err error
- for topic, consumerCh := range sc.topicToConsumerChannelMap {
- for _, ch := range consumerCh.channels {
- // Channel will be closed in the removeChannel method
- removeChannel(consumerCh.channels, ch)
- }
- if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
- err = errTemp
- }
- //err = consumerCh.consumers.Close()
- delete(sc.topicToConsumerChannelMap, topic)
- }
- return err
-}
-
//createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher() error {
// This Creates the publisher
@@ -1082,7 +1053,13 @@
sc.addTopicToConsumerChannelMap(topic.Name, cc)
//Start a consumers to listen on that specific topic
- go sc.startConsumers(topic)
+ go func() {
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
return consumerListeningChannel, nil
}
@@ -1109,7 +1086,13 @@
sc.addTopicToConsumerChannelMap(topic.Name, cc)
//Start a consumers to listen on that specific topic
- go sc.startConsumers(topic)
+ go func() {
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
return consumerListeningChannel, nil
}
diff --git a/pkg/log/log.go b/pkg/log/log.go
index 6b07693..07e5830 100644
--- a/pkg/log/log.go
+++ b/pkg/log/log.go
@@ -320,12 +320,12 @@
func UpdateLogger(defaultFields Fields) (Logger, error) {
pkgName, _, _, _ := getCallerInfo()
if _, exist := loggers[pkgName]; !exist {
- return nil, errors.New(fmt.Sprintf("package-%s-not-registered", pkgName))
+ return nil, fmt.Errorf("package-%s-not-registered", pkgName)
}
// Build a new logger
if _, exist := cfgs[pkgName]; !exist {
- return nil, errors.New(fmt.Sprintf("config-%s-not-registered", pkgName))
+ return nil, fmt.Errorf("config-%s-not-registered", pkgName)
}
cfg := cfgs[pkgName]
@@ -394,7 +394,7 @@
if cfg, ok := cfgs[name]; ok {
return levelToLogLevel(cfg.Level.Level()), nil
}
- return 0, errors.New(fmt.Sprintf("unknown-package-%s", name))
+ return 0, fmt.Errorf("unknown-package-%s", name)
}
//GetDefaultLogLevel gets the log level used for packages that don't have specific loggers
@@ -406,7 +406,7 @@
func SetLogLevel(level LogLevel) error {
pkgName, _, _, _ := getCallerInfo()
if _, exist := cfgs[pkgName]; !exist {
- return errors.New(fmt.Sprintf("unregistered-package-%s", pkgName))
+ return fmt.Errorf("unregistered-package-%s", pkgName)
}
cfg := cfgs[pkgName]
setLevel(cfg, level)
diff --git a/pkg/ponresourcemanager/ponresourcemanager.go b/pkg/ponresourcemanager/ponresourcemanager.go
index ad2150a..6ed18e9 100755
--- a/pkg/ponresourcemanager/ponresourcemanager.go
+++ b/pkg/ponresourcemanager/ponresourcemanager.go
@@ -411,7 +411,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if status := PONRMgr.ClearResourceIDPool(ctx, Intf, ONU_ID); status != true {
+ if status := PONRMgr.ClearResourceIDPool(ctx, Intf, ONU_ID); !status {
log.Error("Failed to clear ONU ID resource pool")
return errors.New("Failed to clear ONU ID resource pool")
}
@@ -425,7 +425,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if status := PONRMgr.ClearResourceIDPool(ctx, Intf, ALLOC_ID); status != true {
+ if status := PONRMgr.ClearResourceIDPool(ctx, Intf, ALLOC_ID); !status {
log.Error("Failed to clear ALLOC ID resource pool ")
return errors.New("Failed to clear ALLOC ID resource pool")
}
@@ -438,7 +438,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if status := PONRMgr.ClearResourceIDPool(ctx, Intf, GEMPORT_ID); status != true {
+ if status := PONRMgr.ClearResourceIDPool(ctx, Intf, GEMPORT_ID); !status {
log.Error("Failed to clear GEMPORT ID resource pool")
return errors.New("Failed to clear GEMPORT ID resource pool")
}
@@ -452,7 +452,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if status := PONRMgr.ClearResourceIDPool(ctx, Intf, FLOW_ID); status != true {
+ if status := PONRMgr.ClearResourceIDPool(ctx, Intf, FLOW_ID); !status {
log.Error("Failed to clear FLOW ID resource pool")
return errors.New("Failed to clear FLOW ID resource pool")
}
@@ -483,7 +483,7 @@
Path := PONRMgr.GetPath(Intf, ResourceType)
if Path == "" {
log.Errorf("Failed to get path for resource type %s", ResourceType)
- return errors.New(fmt.Sprintf("Failed to get path for resource type %s", ResourceType))
+ return fmt.Errorf("Failed to get path for resource type %s", ResourceType)
}
//In case of adapter reboot and reconciliation resource in kv store
@@ -563,6 +563,9 @@
}
Value, err = ToByte(Resource.Value)
+ if err != nil {
+ return nil, err
+ }
// decode resource fetched from backend store to dictionary
err = json.Unmarshal(Value, &Result)
@@ -632,7 +635,7 @@
*/
if NumIDs < 1 {
log.Error("Invalid number of resources requested")
- return nil, errors.New(fmt.Sprintf("Invalid number of resources requested %d", NumIDs))
+ return nil, fmt.Errorf("Invalid number of resources requested %d", NumIDs)
}
// delegate to the master instance if sharing enabled across instances
@@ -645,7 +648,7 @@
Path := PONRMgr.GetPath(IntfID, ResourceType)
if Path == "" {
log.Errorf("Failed to get path for resource type %s", ResourceType)
- return nil, errors.New(fmt.Sprintf("Failed to get path for resource type %s", ResourceType))
+ return nil, fmt.Errorf("Failed to get path for resource type %s", ResourceType)
}
log.Debugf("Get resource for type %s on path %s", ResourceType, Path)
var Result []uint32
@@ -682,7 +685,7 @@
//Update resource in kv store
if PONRMgr.UpdateResource(ctx, Path, Resource) != nil {
log.Errorf("Failed to update resource %s", Path)
- return nil, errors.New(fmt.Sprintf("Failed to update resource %s", Path))
+ return nil, fmt.Errorf("Failed to update resource %s", Path)
}
return Result, nil
}
@@ -706,7 +709,7 @@
:param release_content: required number of ids
:return boolean: True if all IDs in given release_content release else False
*/
- if checkValidResourceType(ResourceType) == false {
+ if !checkValidResourceType(ResourceType) {
log.Error("Invalid resource type")
return false
}
@@ -1018,7 +1021,7 @@
: return true and the index if present false otherwise.
*/
- for idx, _ := range FlowIDList {
+ for idx := range FlowIDList {
if FlowID == FlowIDList[idx] {
return true, uint32(idx)
}
@@ -1042,13 +1045,13 @@
FlowIDs := PONRMgr.GetCurrentFlowIDsForOnu(ctx, IntfONUID)
if Add {
- if RetVal, IDx = checkForFlowIDInList(FlowIDs, FlowID); RetVal == true {
- return err
+ if RetVal, _ = checkForFlowIDInList(FlowIDs, FlowID); RetVal {
+ return nil
}
FlowIDs = append(FlowIDs, FlowID)
} else {
- if RetVal, IDx = checkForFlowIDInList(FlowIDs, FlowID); RetVal == false {
- return err
+ if RetVal, IDx = checkForFlowIDInList(FlowIDs, FlowID); !RetVal {
+ return nil
}
// delete the index and shift
FlowIDs = append(FlowIDs[:IDx], FlowIDs[IDx+1:]...)
@@ -1110,8 +1113,7 @@
Len := Data.Len()
var Idx int
for Idx = 0; Idx < Len; Idx++ {
- Val := Data.Get(Idx)
- if Val == false {
+ if !Data.Get(Idx) {
break
}
}
@@ -1138,8 +1140,7 @@
log.Error("Failed to get resource pool")
return false
}
- var Idx uint32
- Idx = Id - uint32(Resource[START_IDX].(float64))
+ Idx := Id - uint32(Resource[START_IDX].(float64))
Data.Set(int(Idx), false)
Resource[POOL] = Data.Data(false)
diff --git a/pkg/probe/probe.go b/pkg/probe/probe.go
index 9f00953..932c287 100644
--- a/pkg/probe/probe.go
+++ b/pkg/probe/probe.go
@@ -231,15 +231,26 @@
p.mutex.RLock()
defer p.mutex.RUnlock()
w.Header().Set("Content-Type", "application/json")
- w.Write([]byte("{"))
+ if _, err := w.Write([]byte("{")); err != nil {
+ log.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
comma := ""
for c, s := range p.status {
- w.Write([]byte(fmt.Sprintf("%s\"%s\": \"%s\"", comma, c, s.String())))
+ if _, err := w.Write([]byte(fmt.Sprintf("%s\"%s\": \"%s\"", comma, c, s.String()))); err != nil {
+ log.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
comma = ", "
}
- w.Write([]byte("}"))
+ if _, err := w.Write([]byte("}")); err != nil {
+ log.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
w.WriteHeader(http.StatusOK)
-
}
// ListenAndServe implements 3 HTTP endpoints on the given port for healthz, readz, and detailz. Returns only on error
diff --git a/pkg/probe/probe_test.go b/pkg/probe/probe_test.go
index 9edc561..93f9a03 100644
--- a/pkg/probe/probe_test.go
+++ b/pkg/probe/probe_test.go
@@ -29,7 +29,9 @@
)
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ if _, err := log.AddPackage(log.JSON, log.WarnLevel, nil); err != nil {
+ log.Fatalw("adding-log-package", log.Fields{"error": err})
+ }
}
func TestServiceStatusString(t *testing.T) {
@@ -366,6 +368,7 @@
func TestUpdateStatusFromNilContext(t *testing.T) {
p := &Probe{}
p.RegisterService("one")
+ // nolint: staticcheck
UpdateStatusFromContext(nil, "one", ServiceStatusRunning)
assert.Equal(t, 1, len(p.status), "wrong number of services")
diff --git a/pkg/techprofile/config.go b/pkg/techprofile/config.go
index 2df7147..4af2bd5 100644
--- a/pkg/techprofile/config.go
+++ b/pkg/techprofile/config.go
@@ -26,13 +26,9 @@
defaultVersion = 1.0
defaultLogLevel = 0
defaultGemportsCount = 1
- defaultNumTconts = 1
defaultPbits = "0b11111111"
- defaultKVStoreType = "etcd"
defaultKVStoreTimeout = 5 //in seconds
- defaultKVStoreHost = "127.0.0.1"
- defaultKVStorePort = 2379 // Consul = 8500; Etcd = 2379
// Tech profile path prefix in kv store
defaultKVPathPrefix = "service/voltha/technology_profiles"
diff --git a/pkg/techprofile/tech_profile.go b/pkg/techprofile/tech_profile.go
index b268a4c..ba8855f 100644
--- a/pkg/techprofile/tech_profile.go
+++ b/pkg/techprofile/tech_profile.go
@@ -121,7 +121,6 @@
const (
defaultOnuInstance = "multi-instance"
defaultUniInstance = "single-instance"
- defaultNumGemPorts = 1
defaultGemPayloadSize = "auto"
)
@@ -437,8 +436,10 @@
return nil
}
} else { // "single-instance"
- tpInst, err := t.getSingleInstanceTp(ctx, tpInstPath)
- if tpInst == nil {
+ if tpInst, err := t.getSingleInstanceTp(ctx, tpInstPath); err != nil {
+ log.Errorw("Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
+ return nil
+ } else if tpInst == nil {
// No "single-instance" tp found on one any uni port for the given TP ID
// Allocate a new TcontID or AllocID
if tcontIDs, err = t.resourceMgr.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {