VOL-2631 Update to voltha-lib-go 3.0.14

Change-Id: I21ef41d49ad6bd42ad301962583d2aabdea7ccad
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
index 1882135..86f186d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -284,7 +284,7 @@
 		volthaDevice := &voltha.Device{}
 		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
 			logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+			return nil, status.Error(codes.InvalidArgument, err.Error())
 		}
 		return volthaDevice, nil
 	} else {
@@ -294,8 +294,8 @@
 			logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		}
 		logger.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
-		// TODO: Need to get the real error code
-		return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
 	}
 
 }
@@ -361,7 +361,7 @@
 		volthaDevice := &voltha.Device{}
 		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
 			logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+			return nil, status.Error(codes.InvalidArgument, err.Error())
 		}
 		return volthaDevice, nil
 	} else {
@@ -372,7 +372,7 @@
 		}
 		logger.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
 		// TODO:  Need to get the real error code
-		return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
 	}
 }
 
@@ -421,7 +421,7 @@
 		volthaDevice := &voltha.Device{}
 		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
 			logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+			return nil, status.Error(codes.InvalidArgument, err.Error())
 		}
 		return volthaDevice, nil
 	} else {
@@ -432,13 +432,7 @@
 		}
 		logger.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
 
-		code := codes.Internal
-
-		if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
-			code = codes.DeadlineExceeded
-		}
-
-		return nil, status.Errorf(code, "%s", unpackResult.Reason)
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
 	}
 }
 
@@ -463,7 +457,7 @@
 		volthaDevices := &voltha.Devices{}
 		if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
 			logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+			return nil, status.Error(codes.InvalidArgument, err.Error())
 		}
 		return volthaDevices, nil
 	} else {
@@ -474,13 +468,7 @@
 		}
 		logger.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
 
-		code := codes.Internal
-
-		if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
-			code = codes.DeadlineExceeded
-		}
-
-		return nil, status.Errorf(code, "%s", unpackResult.Reason)
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
 	}
 }
 
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
index 034de8e..da9c9eb 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
@@ -23,6 +23,7 @@
 	"strings"
 	"time"
 
+	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -60,7 +61,11 @@
 	return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
 }
 
-func (ep *EventProxy) getEventHeader(eventName string, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, eventType adapterif.EventType, raisedTs int64) *voltha.EventHeader {
+func (ep *EventProxy) getEventHeader(eventName string,
+	category adapterif.EventCategory,
+	subCategory adapterif.EventSubCategory,
+	eventType adapterif.EventType,
+	raisedTs int64) (*voltha.EventHeader, error) {
 	var header voltha.EventHeader
 	if strings.Contains(eventName, "_") {
 		eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
@@ -73,9 +78,21 @@
 	header.SubCategory = subCategory
 	header.Type = eventType
 	header.TypeVersion = adapterif.EventTypeVersion
-	header.RaisedTs = float32(raisedTs)
-	header.ReportedTs = float32(time.Now().UnixNano())
-	return &header
+
+	// raisedTs is in nanoseconds
+	timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
+	if err != nil {
+		return nil, err
+	}
+	header.RaisedTs = timestamp
+
+	timestamp, err = ptypes.TimestampProto(time.Now())
+	if err != nil {
+		return nil, err
+	}
+	header.ReportedTs = timestamp
+
+	return &header, nil
 }
 
 /* Send out device events*/
@@ -86,8 +103,11 @@
 	}
 	var event voltha.Event
 	var de voltha.Event_DeviceEvent
+	var err error
 	de.DeviceEvent = deviceEvent
-	event.Header = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs)
+	if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
+		return err
+	}
 	event.EventType = &de
 	if err := ep.sendEvent(&event); err != nil {
 		logger.Errorw("Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
@@ -110,8 +130,11 @@
 	}
 	var event voltha.Event
 	var de voltha.Event_KpiEvent2
+	var err error
 	de.KpiEvent2 = kpiEvent
-	event.Header = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs)
+	if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
+		return err
+	}
 	event.EventType = &de
 	if err := ep.sendEvent(&event); err != nil {
 		logger.Errorw("Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
index 78b8eb5..5b00887 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
@@ -658,3 +658,46 @@
 	}
 	return deviceId.Val, port, nil
 }
+
+func (rhp *RequestHandlerProxy) Child_device_lost(args []*ic.Argument) error {
+	if len(args) < 4 {
+		logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args")
+	}
+
+	pDeviceId := &ic.StrType{}
+	pPortNo := &ic.IntType{}
+	onuID := &ic.IntType{}
+	fromTopic := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "pDeviceId":
+			if err := ptypes.UnmarshalAny(arg.Value, pDeviceId); err != nil {
+				logger.Warnw("cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
+				return err
+			}
+		case "pPortNo":
+			if err := ptypes.UnmarshalAny(arg.Value, pPortNo); err != nil {
+				logger.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
+				return err
+			}
+		case "onuID":
+			if err := ptypes.UnmarshalAny(arg.Value, onuID); err != nil {
+				logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return err
+			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return err
+			}
+		}
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(pDeviceId.Val, fromTopic.Val)
+	//Invoke the Child_device_lost API on the adapter
+	if err := rhp.adapter.Child_device_lost(pDeviceId.Val, uint32(pPortNo.Val), uint32(onuID.Val)); err != nil {
+		return status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
index d3c562a..b782ebe 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
@@ -17,6 +17,9 @@
 
 import (
 	"fmt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+	"google.golang.org/grpc/codes"
 	"math/rand"
 	"time"
 )
@@ -71,3 +74,17 @@
 	}
 	return string(b)
 }
+
+func ICProxyErrorCodeToGrpcErrorCode(icErr ic.ErrorCodeCodes) codes.Code {
+	switch icErr {
+	case ic.ErrorCode_INVALID_PARAMETERS:
+		return codes.InvalidArgument
+	case ic.ErrorCode_UNSUPPORTED_REQUEST:
+		return codes.Unavailable
+	case ic.ErrorCode_DEADLINE_EXCEEDED:
+		return codes.DeadlineExceeded
+	default:
+		log.Warnw("cannnot-map-ic-error-code-to-grpc-error-code", log.Fields{"err": icErr})
+		return codes.Internal
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
index c0e44be..fc57247 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
@@ -51,4 +51,5 @@
 	Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
 	Enable_port(deviceId string, port *voltha.Port) error
 	Disable_port(deviceId string, port *voltha.Port) error
+	Child_device_lost(parentDeviceId string, parentPortNo uint32, onuID uint32) error
 }
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
index 96829c5..04fe35d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
@@ -247,14 +247,14 @@
 }
 
 // CreateWatch starts watching events for the specified key
-func (b *Backend) CreateWatch(ctx context.Context, key string) chan *kvstore.Event {
+func (b *Backend) CreateWatch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
 	b.Lock()
 	defer b.Unlock()
 
 	formattedPath := b.makePath(key)
 	logger.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
 
-	return b.Client.Watch(ctx, formattedPath)
+	return b.Client.Watch(ctx, formattedPath, withPrefix)
 }
 
 // DeleteWatch stops watching events for the specified key
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
index d30e049..b9cb1ee 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
@@ -81,7 +81,7 @@
 	ReleaseReservation(ctx context.Context, key string) error
 	ReleaseAllReservations(ctx context.Context) error
 	RenewReservation(ctx context.Context, key string) error
-	Watch(ctx context.Context, key string) chan *Event
+	Watch(ctx context.Context, key string, withPrefix bool) chan *Event
 	AcquireLock(ctx context.Context, lockName string, timeout int) error
 	ReleaseLock(lockName string) error
 	IsConnectionUp(ctx context.Context) bool // timeout in second
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
index fdf39be..bdf2d10 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
@@ -360,7 +360,7 @@
 
 // Watch provides the watch capability on a given key.  It returns a channel onto which the callee needs to
 // listen to receive Events.
-func (c *ConsulClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
 
 	// Create a new channel
 	ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index 2d126f7..1014ada 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -298,10 +298,15 @@
 
 // Watch provides the watch capability on a given key.  It returns a channel onto which the callee needs to
 // listen to receive Events.
-func (c *EtcdClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *EtcdClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
 	w := v3Client.NewWatcher(c.ectdAPI)
 	ctx, cancel := context.WithCancel(ctx)
-	channel := w.Watch(ctx, key)
+	var channel v3Client.WatchChan
+	if withPrefix {
+		channel = w.Watch(ctx, key, v3Client.WithPrefix())
+	} else {
+		channel = w.Watch(ctx, key)
+	}
 
 	// Create a new channel
 	ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index aa77ffb..91b2143 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -189,9 +189,15 @@
 	kp.doneOnce.Do(func() { close(kp.doneCh) })
 	// TODO : Perform cleanup
 	kp.kafkaClient.Stop()
-	//kp.deleteAllTopicRequestHandlerChannelMap()
-	//kp.deleteAllTopicResponseChannelMap()
-	//kp.deleteAllTransactionIdToChannelMap()
+	err := kp.deleteAllTopicRequestHandlerChannelMap()
+	if err != nil {
+		log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+	}
+	err = kp.deleteAllTopicResponseChannelMap()
+	if err != nil {
+		log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+	}
+	kp.deleteAllTransactionIdToChannelMap()
 }
 
 func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -409,17 +415,25 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+	logger.Debug("delete-all-topic-response-channel")
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
-	var err error
+	var unsubscribeFailTopics []string
 	for topic := range kp.topicToResponseChannelMap {
 		// Unsubscribe to this topic first - this will close the subscribed channel
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
 			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToResponseChannelMap, topic)
 		}
-		delete(kp.topicToResponseChannelMap, topic)
 	}
-	return err
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
 }
 
 func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
@@ -447,17 +461,25 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+	logger.Debug("delete-all-topic-request-channel")
 	kp.lockTopicRequestHandlerChannelMap.Lock()
 	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
-	var err error
+	var unsubscribeFailTopics []string
 	for topic := range kp.topicToRequestHandlerChannelMap {
 		// Close the kafka client client first by unsubscribing to this topic
-		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+		if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
 			logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToRequestHandlerChannelMap, topic)
 		}
-		delete(kp.topicToRequestHandlerChannelMap, topic)
 	}
-	return err
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
 }
 
 func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
@@ -491,6 +513,7 @@
 
 // nolint: unused
 func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+	logger.Debug("delete-all-transaction-id-channel-map")
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	for key, value := range kp.transactionIdToChannelMap {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
index 69e22a4..3ebdd3a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
@@ -193,6 +193,22 @@
 	return 0, errors.New("Given LogLevel is invalid : " + l)
 }
 
+func LogLevelToString(l LogLevel) (string, error) {
+	switch l {
+	case DebugLevel:
+		return "DEBUG", nil
+	case InfoLevel:
+		return "INFO", nil
+	case WarnLevel:
+		return "WARN", nil
+	case ErrorLevel:
+		return "ERROR", nil
+	case FatalLevel:
+		return "FATAL", nil
+	}
+	return "", errors.New("Given LogLevel is invalid " + string(l))
+}
+
 func getDefaultConfig(outputType string, level LogLevel, defaultFields Fields) zp.Config {
 	return zp.Config{
 		Level:            logLevelToAtomicLevel(level),