VOL-2631 Update to voltha-lib-go 3.0.14
Change-Id: I21ef41d49ad6bd42ad301962583d2aabdea7ccad
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
index 1882135..86f186d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -284,7 +284,7 @@
volthaDevice := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ return nil, status.Error(codes.InvalidArgument, err.Error())
}
return volthaDevice, nil
} else {
@@ -294,8 +294,8 @@
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
logger.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
}
}
@@ -361,7 +361,7 @@
volthaDevice := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ return nil, status.Error(codes.InvalidArgument, err.Error())
}
return volthaDevice, nil
} else {
@@ -372,7 +372,7 @@
}
logger.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
// TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
}
}
@@ -421,7 +421,7 @@
volthaDevice := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ return nil, status.Error(codes.InvalidArgument, err.Error())
}
return volthaDevice, nil
} else {
@@ -432,13 +432,7 @@
}
logger.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- code := codes.Internal
-
- if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
- code = codes.DeadlineExceeded
- }
-
- return nil, status.Errorf(code, "%s", unpackResult.Reason)
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
}
}
@@ -463,7 +457,7 @@
volthaDevices := &voltha.Devices{}
if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ return nil, status.Error(codes.InvalidArgument, err.Error())
}
return volthaDevices, nil
} else {
@@ -474,13 +468,7 @@
}
logger.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- code := codes.Internal
-
- if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
- code = codes.DeadlineExceeded
- }
-
- return nil, status.Errorf(code, "%s", unpackResult.Reason)
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
}
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
index 034de8e..da9c9eb 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
@@ -23,6 +23,7 @@
"strings"
"time"
+ "github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -60,7 +61,11 @@
return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
}
-func (ep *EventProxy) getEventHeader(eventName string, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, eventType adapterif.EventType, raisedTs int64) *voltha.EventHeader {
+func (ep *EventProxy) getEventHeader(eventName string,
+ category adapterif.EventCategory,
+ subCategory adapterif.EventSubCategory,
+ eventType adapterif.EventType,
+ raisedTs int64) (*voltha.EventHeader, error) {
var header voltha.EventHeader
if strings.Contains(eventName, "_") {
eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
@@ -73,9 +78,21 @@
header.SubCategory = subCategory
header.Type = eventType
header.TypeVersion = adapterif.EventTypeVersion
- header.RaisedTs = float32(raisedTs)
- header.ReportedTs = float32(time.Now().UnixNano())
- return &header
+
+ // raisedTs is in nanoseconds
+ timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
+ if err != nil {
+ return nil, err
+ }
+ header.RaisedTs = timestamp
+
+ timestamp, err = ptypes.TimestampProto(time.Now())
+ if err != nil {
+ return nil, err
+ }
+ header.ReportedTs = timestamp
+
+ return &header, nil
}
/* Send out device events*/
@@ -86,8 +103,11 @@
}
var event voltha.Event
var de voltha.Event_DeviceEvent
+ var err error
de.DeviceEvent = deviceEvent
- event.Header = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs)
+ if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
+ return err
+ }
event.EventType = &de
if err := ep.sendEvent(&event); err != nil {
logger.Errorw("Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
@@ -110,8 +130,11 @@
}
var event voltha.Event
var de voltha.Event_KpiEvent2
+ var err error
de.KpiEvent2 = kpiEvent
- event.Header = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs)
+ if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
+ return err
+ }
event.EventType = &de
if err := ep.sendEvent(&event); err != nil {
logger.Errorw("Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
index 78b8eb5..5b00887 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
@@ -658,3 +658,54 @@
}
return deviceId.Val, port, nil
}
+
+// Child_device_lost unmarshals the parent device id ("pDeviceId"), parent port
+// number ("pPortNo"), ONU id ("onuID") and kafka.FromTopic values from args,
+// updates the core reference for the parent device and then invokes
+// Child_device_lost on the adapter.
+//
+// It returns an error when fewer than four arguments are supplied or when a
+// recognised argument cannot be unmarshalled; an adapter failure is returned
+// as a gRPC NotFound status carrying the adapter's error text.
+func (rhp *RequestHandlerProxy) Child_device_lost(args []*ic.Argument) error {
+	if len(args) < 4 {
+		logger.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args")
+	}
+
+	pDeviceId := &ic.StrType{}
+	pPortNo := &ic.IntType{}
+	onuID := &ic.IntType{}
+	fromTopic := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "pDeviceId":
+			if err := ptypes.UnmarshalAny(arg.Value, pDeviceId); err != nil {
+				logger.Warnw("cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
+				return err
+			}
+		case "pPortNo":
+			if err := ptypes.UnmarshalAny(arg.Value, pPortNo); err != nil {
+				logger.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
+				return err
+			}
+		case "onuID":
+			if err := ptypes.UnmarshalAny(arg.Value, onuID); err != nil {
+				logger.Warnw("cannot-unmarshal-onu-id", log.Fields{"error": err})
+				return err
+			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return err
+			}
+		}
+	}
+	// Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(pDeviceId.Val, fromTopic.Val)
+	// Invoke the Child_device_lost API on the adapter
+	if err := rhp.adapter.Child_device_lost(pDeviceId.Val, uint32(pPortNo.Val), uint32(onuID.Val)); err != nil {
+		return status.Error(codes.NotFound, err.Error())
+	}
+	return nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
index d3c562a..b782ebe 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
@@ -17,6 +17,9 @@
import (
"fmt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "google.golang.org/grpc/codes"
"math/rand"
"time"
)
@@ -71,3 +74,23 @@
}
return string(b)
}
+
+// ICProxyErrorCodeToGrpcErrorCode translates an inter-container proxy error
+// code into a gRPC status code. INVALID_PARAMETERS, UNSUPPORTED_REQUEST and
+// DEADLINE_EXCEEDED map to InvalidArgument, Unavailable and DeadlineExceeded
+// respectively; any other code is logged and reported as codes.Internal.
+func ICProxyErrorCodeToGrpcErrorCode(icErr ic.ErrorCodeCodes) codes.Code {
+	switch icErr {
+	case ic.ErrorCode_INVALID_PARAMETERS:
+		return codes.InvalidArgument
+	case ic.ErrorCode_UNSUPPORTED_REQUEST:
+		// NOTE(review): gRPC describes Unavailable as a transient, retryable
+		// condition; confirm it is the intended signal for an unsupported request.
+		return codes.Unavailable
+	case ic.ErrorCode_DEADLINE_EXCEEDED:
+		return codes.DeadlineExceeded
+	default:
+		logger.Warnw("cannot-map-ic-error-code-to-grpc-error-code", log.Fields{"err": icErr})
+		return codes.Internal
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
index c0e44be..fc57247 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
@@ -51,4 +51,5 @@
Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
Enable_port(deviceId string, port *voltha.Port) error
Disable_port(deviceId string, port *voltha.Port) error
+ Child_device_lost(parentDeviceId string, parentPortNo uint32, onuID uint32) error
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
index 96829c5..04fe35d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
@@ -247,14 +247,14 @@
}
// CreateWatch starts watching events for the specified key
-func (b *Backend) CreateWatch(ctx context.Context, key string) chan *kvstore.Event {
+func (b *Backend) CreateWatch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
logger.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
- return b.Client.Watch(ctx, formattedPath)
+ return b.Client.Watch(ctx, formattedPath, withPrefix)
}
// DeleteWatch stops watching events for the specified key
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
index d30e049..b9cb1ee 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
@@ -81,7 +81,7 @@
ReleaseReservation(ctx context.Context, key string) error
ReleaseAllReservations(ctx context.Context) error
RenewReservation(ctx context.Context, key string) error
- Watch(ctx context.Context, key string) chan *Event
+ Watch(ctx context.Context, key string, withPrefix bool) chan *Event
AcquireLock(ctx context.Context, lockName string, timeout int) error
ReleaseLock(lockName string) error
IsConnectionUp(ctx context.Context) bool // timeout in second
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
index fdf39be..bdf2d10 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
@@ -360,7 +360,7 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *ConsulClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index 2d126f7..1014ada 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -298,10 +298,15 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *EtcdClient) Watch(ctx context.Context, key string) chan *Event {
+func (c *EtcdClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
w := v3Client.NewWatcher(c.ectdAPI)
ctx, cancel := context.WithCancel(ctx)
- channel := w.Watch(ctx, key)
+ var channel v3Client.WatchChan
+ if withPrefix {
+ channel = w.Watch(ctx, key, v3Client.WithPrefix())
+ } else {
+ channel = w.Watch(ctx, key)
+ }
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index aa77ffb..91b2143 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -189,9 +189,15 @@
kp.doneOnce.Do(func() { close(kp.doneCh) })
// TODO : Perform cleanup
kp.kafkaClient.Stop()
- //kp.deleteAllTopicRequestHandlerChannelMap()
- //kp.deleteAllTopicResponseChannelMap()
- //kp.deleteAllTransactionIdToChannelMap()
+ err := kp.deleteAllTopicRequestHandlerChannelMap()
+ if err != nil {
+ log.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+ }
+ err = kp.deleteAllTopicResponseChannelMap()
+ if err != nil {
+ log.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+ }
+ kp.deleteAllTransactionIdToChannelMap()
}
func (kp *interContainerProxy) GetDefaultTopic() *Topic {
@@ -409,17 +415,25 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+ logger.Debug("delete-all-topic-response-channel")
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
- var err error
+ var unsubscribeFailTopics []string
for topic := range kp.topicToResponseChannelMap {
// Unsubscribe to this topic first - this will close the subscribed channel
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToResponseChannelMap, topic)
}
- delete(kp.topicToResponseChannelMap, topic)
}
- return err
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
}
func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
@@ -447,17 +461,25 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+ logger.Debug("delete-all-topic-request-channel")
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- var err error
+ var unsubscribeFailTopics []string
for topic := range kp.topicToRequestHandlerChannelMap {
// Close the kafka client client first by unsubscribing to this topic
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToRequestHandlerChannelMap, topic)
}
- delete(kp.topicToRequestHandlerChannelMap, topic)
}
- return err
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
}
func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
@@ -491,6 +513,7 @@
// nolint: unused
func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+ logger.Debug("delete-all-transaction-id-channel-map")
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
for key, value := range kp.transactionIdToChannelMap {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
index 69e22a4..3ebdd3a 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
@@ -193,6 +193,25 @@
return 0, errors.New("Given LogLevel is invalid : " + l)
}
+// LogLevelToString returns the upper-case name ("DEBUG", "INFO", "WARN",
+// "ERROR" or "FATAL") of the given log level. An error carrying the numeric
+// value of l is returned when l is not one of those levels.
+func LogLevelToString(l LogLevel) (string, error) {
+	switch l {
+	case DebugLevel:
+		return "DEBUG", nil
+	case InfoLevel:
+		return "INFO", nil
+	case WarnLevel:
+		return "WARN", nil
+	case ErrorLevel:
+		return "ERROR", nil
+	case FatalLevel:
+		return "FATAL", nil
+	}
+	// Format the level as a decimal number; string(l) on an integer would
+	// produce the rune with that code point instead of its digits.
+	return "", fmt.Errorf("Given LogLevel is invalid %d", l)
+}
+
func getDefaultConfig(outputType string, level LogLevel, defaultFields Fields) zp.Config {
return zp.Config{
Level: logLevelToAtomicLevel(level),