[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/utils.go
new file mode 100644
index 0000000..608361b
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/utils.go
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"errors"
+	"strings"
+	"time"
+
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+)
+
+const (
+	TopicSeparator = "_"
+	DeviceIdLength = 24
+)
+
+// A Topic definition - may be augmented with additional attributes eventually
+type Topic struct {
+	// The name of the topic. It must start with a letter,
+	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
+	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
+	// signs (`%`).
+	Name string
+}
+
+type KVArg struct {
+	Key   string
+	Value interface{}
+}
+
+type RpcMType int
+
+const (
+	RpcFormattingError RpcMType = iota
+	RpcSent
+	RpcReply
+	RpcTimeout
+	RpcTransportError
+	RpcSystemClosing
+)
+
+type RpcResponse struct {
+	MType RpcMType
+	Err   error
+	Reply *any.Any
+}
+
+func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
+	return &RpcResponse{
+		MType: messageType,
+		Err:   err,
+		Reply: body,
+	}
+}
+
+// TODO:  Remove and provide better may to get the device id
+// GetDeviceIdFromTopic extract the deviceId from the topic name.  The topic name is formatted either as:
+//			<any string> or <any string>_<deviceId>.  The device Id is 24 characters long.
+func GetDeviceIdFromTopic(topic Topic) string {
+	pos := strings.LastIndex(topic.Name, TopicSeparator)
+	if pos == -1 {
+		return ""
+	}
+	adjustedPos := pos + len(TopicSeparator)
+	if adjustedPos >= len(topic.Name) {
+		return ""
+	}
+	deviceId := topic.Name[adjustedPos:len(topic.Name)]
+	if len(deviceId) != DeviceIdLength {
+		return ""
+	}
+	return deviceId
+}
+
+// WaitUntilKafkaConnectionIsUp waits until the kafka client can establish a connection to the kafka broker or until the
+// context times out.
+func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
+	if kClient == nil {
+		return errors.New("kafka-client-is-nil")
+	}
+	for {
+		if err := kClient.Start(ctx); err != nil {
+			probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+			logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
+			select {
+			case <-time.After(connectionRetryInterval):
+				continue
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+		probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+		logger.Info(ctx, "kafka-connection-up")
+		break
+	}
+	return nil
+}
+
+/**
+MonitorKafkaReadiness checks the liveliness and readiness of the kafka service
+and update the status in the probe.
+*/
+func MonitorKafkaReadiness(ctx context.Context,
+	kClient Client,
+	liveProbeInterval, notLiveProbeInterval time.Duration,
+	serviceName string) {
+
+	if kClient == nil {
+		logger.Fatal(ctx, "kafka-client-is-nil")
+	}
+
+	logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})
+
+	livelinessChannel := kClient.EnableLivenessChannel(ctx, true)
+	healthinessChannel := kClient.EnableHealthinessChannel(ctx, true)
+	timeout := liveProbeInterval
+	failed := false
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+
+		select {
+		case healthiness := <-healthinessChannel:
+			if !healthiness {
+				// This will eventually cause K8s to restart the container, and will do
+				// so in a way that allows cleanup to continue, rather than an immediate
+				// panic and exit here.
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
+				logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
+				failed = true
+			}
+			// Check if the timer has expired or not
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case liveliness := <-livelinessChannel:
+			if failed {
+				// Failures of the message bus are permanent and can't ever be recovered from,
+				// so make sure we never inadvertently reset a failed state back to unready.
+			} else if !liveliness {
+				// kafka not reachable or down, updating the status to not ready state
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+				logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
+				timeout = notLiveProbeInterval
+			} else {
+				// kafka is reachable , updating the status to running state
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+				timeout = liveProbeInterval
+			}
+			// Check if the timer has expired or not
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case <-timeoutTimer.C:
+			logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
+			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+			// the liveness probe may wait (and block) writing to our channel.
+			go func() {
+				err := kClient.SendLiveness(ctx)
+				if err != nil {
+					// Catch possible error case if sending liveness after Sarama has been stopped.
+					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
+				}
+			}()
+		case <-ctx.Done():
+			return // just exit
+		}
+	}
+}