[VOL-2356]core log_level command line argument should take log level names not int value

- StringToLogLevel method returns two arguments.

Change-Id: I83d20d645fa63363e71265b163273414f14688a7
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
index c0c16f9..deb72fd 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
@@ -32,8 +32,6 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 )
 
-type returnErrorFunction func() error
-
 // consumerChannels represents one or more consumers listening on a kafka topic.  Once a message is received on that
 // topic, the consumer(s) broadcasts the message to all the listening channels.   The consumer can be a partition
 //consumer or a group consumer
@@ -48,7 +46,6 @@
 // SaramaClient represents the messaging proxy
 type SaramaClient struct {
 	cAdmin                        sarama.ClusterAdmin
-	client                        sarama.Client
 	KafkaHost                     string
 	KafkaPort                     int
 	producer                      sarama.AsyncProducer
@@ -478,7 +475,7 @@
 			logger.Info("update-liveness-channel-because-change")
 			sc.liveness <- alive
 			sc.lastLivenessTime = time.Now()
-		} else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
+		} else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
 			logger.Info("update-liveness-channel-because-interval")
 			sc.liveness <- alive
 			sc.lastLivenessTime = time.Now()
@@ -558,7 +555,7 @@
 	// ascertain the value interface type is a proto.Message
 	if protoMsg, ok = msg.(proto.Message); !ok {
 		logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
-		return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
+		return fmt.Errorf("not-a-proto-msg-%s", msg)
 	}
 
 	var marshalled []byte
@@ -740,14 +737,6 @@
 	}
 }
 
-func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
-	sc.lockTopicToConsumerChannelMap.Lock()
-	defer sc.lockTopicToConsumerChannelMap.Unlock()
-	if _, exist := sc.topicToConsumerChannelMap[id]; exist {
-		delete(sc.topicToConsumerChannelMap, id)
-	}
-}
-
 func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
 	sc.lockTopicToConsumerChannelMap.RLock()
 	defer sc.lockTopicToConsumerChannelMap.RUnlock()
@@ -838,24 +827,6 @@
 	return nil
 }
 
-func (sc *SaramaClient) clearConsumerChannelMap() error {
-	sc.lockTopicToConsumerChannelMap.Lock()
-	defer sc.lockTopicToConsumerChannelMap.Unlock()
-	var err error
-	for topic, consumerCh := range sc.topicToConsumerChannelMap {
-		for _, ch := range consumerCh.channels {
-			// Channel will be closed in the removeChannel method
-			removeChannel(consumerCh.channels, ch)
-		}
-		if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
-			err = errTemp
-		}
-		//err = consumerCh.consumers.Close()
-		delete(sc.topicToConsumerChannelMap, topic)
-	}
-	return err
-}
-
 //createPublisher creates the publisher which is used to send a message onto kafka
 func (sc *SaramaClient) createPublisher() error {
 	// This Creates the publisher
@@ -1082,7 +1053,13 @@
 	sc.addTopicToConsumerChannelMap(topic.Name, cc)
 
 	//Start a consumers to listen on that specific topic
-	go sc.startConsumers(topic)
+	go func() {
+		if err := sc.startConsumers(topic); err != nil {
+			logger.Errorw("start-consumers-failed", log.Fields{
+				"topic": topic,
+				"error": err})
+		}
+	}()
 
 	return consumerListeningChannel, nil
 }
@@ -1109,7 +1086,13 @@
 	sc.addTopicToConsumerChannelMap(topic.Name, cc)
 
 	//Start a consumers to listen on that specific topic
-	go sc.startConsumers(topic)
+	go func() {
+		if err := sc.startConsumers(topic); err != nil {
+			logger.Errorw("start-consumers-failed", log.Fields{
+				"topic": topic,
+				"error": err})
+		}
+	}()
 
 	return consumerListeningChannel, nil
 }