VOL-2624 - fix SCA (static code analysis) issues in pkg/kafka
Change-Id: I3a5e0aafc5b6bd6c6e865675a3481db289a7d772
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index d21fdd5..aa77ffb 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -272,7 +272,14 @@
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
//key := GetDeviceIdFromTopic(*toTopic)
logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
- go kp.kafkaClient.Send(protoRequest, toTopic, key)
+ go func() {
+ if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+ logger.Errorw("send-failed", log.Fields{
+ "topic": toTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
if waitForResponse {
// Create a child context based on the parent context, if any
@@ -287,7 +294,13 @@
// Wait for response as well as timeout or cancellation
// Remove the subscription for a response on return
- defer kp.unSubscribeForResponse(protoRequest.Header.Id)
+ defer func() {
+ if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
+ logger.Errorw("response-unsubscribe-failed", log.Fields{
+ "id": protoRequest.Header.Id,
+ "error": err})
+ }
+ }()
select {
case msg, ok := <-ch:
if !ok {
@@ -378,23 +391,6 @@
return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
}
-// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
-// responses from that topic.
-func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
- if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
- kp.topicToResponseChannelMap[topic] = arg
- }
-}
-
-func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
- kp.lockTopicResponseChannelMap.RLock()
- defer kp.lockTopicResponseChannelMap.RUnlock()
- _, exist := kp.topicToResponseChannelMap[topic]
- return exist
-}
-
func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
@@ -407,15 +403,16 @@
delete(kp.topicToResponseChannelMap, topic)
return err
} else {
- return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
+ return fmt.Errorf("%s-Topic-not-found", topic)
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
var err error
- for topic, _ := range kp.topicToResponseChannelMap {
+ for topic := range kp.topicToResponseChannelMap {
// Unsubscribe to this topic first - this will close the subscribed channel
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
@@ -438,19 +435,22 @@
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
// Close the kafka client client first by unsubscribing to this topic
- kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ return err
+ }
delete(kp.topicToRequestHandlerChannelMap, topic)
return nil
} else {
- return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
+ return fmt.Errorf("%s-Topic-not-found", topic)
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
var err error
- for topic, _ := range kp.topicToRequestHandlerChannelMap {
+ for topic := range kp.topicToRequestHandlerChannelMap {
// Close the kafka client client first by unsubscribing to this topic
if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
@@ -489,6 +489,7 @@
}
}
+// nolint: unused
func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
@@ -575,11 +576,12 @@
// Go over all returned values
var marshalledReturnedVal *any.Any
var err error
- for _, returnVal := range returnedValues {
- if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
+
+ // for now we support only 1 returned value - (excluding the error)
+ if len(returnedValues) > 0 {
+ if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
}
- break // for now we support only 1 returned value - (excluding the error)
}
responseBody := &ic.InterContainerResponseBody{
@@ -730,7 +732,14 @@
key := msg.Header.KeyTopic
logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
- go kp.kafkaClient.Send(icm, replyTopic, key)
+ go func() {
+ if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
+ logger.Errorw("send-reply-failed", log.Fields{
+ "topic": replyTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index 56c90ca..f32c16c 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -52,9 +52,6 @@
type myInterface struct {
}
-func (m *myInterface) doSomething() {
-}
-
func TestKafkaProxyOptionTargetInterface(t *testing.T) {
var m *myInterface
actualResult := newInterContainerProxy(RequestHandlerInterface(m))
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index c0c16f9..deb72fd 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -32,8 +32,6 @@
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
)
-type returnErrorFunction func() error
-
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
//consumer or a group consumer
@@ -48,7 +46,6 @@
// SaramaClient represents the messaging proxy
type SaramaClient struct {
cAdmin sarama.ClusterAdmin
- client sarama.Client
KafkaHost string
KafkaPort int
producer sarama.AsyncProducer
@@ -478,7 +475,7 @@
logger.Info("update-liveness-channel-because-change")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
- } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
+ } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
logger.Info("update-liveness-channel-because-interval")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
@@ -558,7 +555,7 @@
// ascertain the value interface type is a proto.Message
if protoMsg, ok = msg.(proto.Message); !ok {
logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
- return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
+ return fmt.Errorf("not-a-proto-msg-%s", msg)
}
var marshalled []byte
@@ -740,14 +737,6 @@
}
}
-func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
- if _, exist := sc.topicToConsumerChannelMap[id]; exist {
- delete(sc.topicToConsumerChannelMap, id)
- }
-}
-
func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
sc.lockTopicToConsumerChannelMap.RLock()
defer sc.lockTopicToConsumerChannelMap.RUnlock()
@@ -838,24 +827,6 @@
return nil
}
-func (sc *SaramaClient) clearConsumerChannelMap() error {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
- var err error
- for topic, consumerCh := range sc.topicToConsumerChannelMap {
- for _, ch := range consumerCh.channels {
- // Channel will be closed in the removeChannel method
- removeChannel(consumerCh.channels, ch)
- }
- if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
- err = errTemp
- }
- //err = consumerCh.consumers.Close()
- delete(sc.topicToConsumerChannelMap, topic)
- }
- return err
-}
-
//createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher() error {
// This Creates the publisher
@@ -1082,7 +1053,13 @@
sc.addTopicToConsumerChannelMap(topic.Name, cc)
//Start a consumers to listen on that specific topic
- go sc.startConsumers(topic)
+ go func() {
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
return consumerListeningChannel, nil
}
@@ -1109,7 +1086,13 @@
sc.addTopicToConsumerChannelMap(topic.Name, cc)
//Start a consumers to listen on that specific topic
- go sc.startConsumers(topic)
+ go func() {
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
return consumerListeningChannel, nil
}