VOL-3501 Code changes to support rpc event
Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/go.mod b/go.mod
index 909f179..08c3f9d 100644
--- a/go.mod
+++ b/go.mod
@@ -6,9 +6,11 @@
github.com/gogo/protobuf v1.3.0
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
- github.com/opencord/voltha-lib-go/v4 v4.0.7
- github.com/opencord/voltha-protos/v4 v4.0.8
+ github.com/opencord/voltha-lib-go/v4 v4.0.8
+ github.com/opencord/voltha-protos/v4 v4.0.12
+ github.com/opentracing/opentracing-go v1.1.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/stretchr/testify v1.4.0
+ github.com/uber/jaeger-client-go v2.23.1+incompatible
google.golang.org/grpc v1.24.0
)
diff --git a/go.sum b/go.sum
index 66c6d3a..5bf604f 100644
--- a/go.sum
+++ b/go.sum
@@ -195,10 +195,10 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v4 v4.0.7 h1:YDEvyNkdoDkmdwXmQY3ve2IsyJo39A28qNDl0OjXsjM=
-github.com/opencord/voltha-lib-go/v4 v4.0.7/go.mod h1:8NFUZz/mp4OvRmilBRhkLOUrw4G01ruSAVdzQu2ivPc=
-github.com/opencord/voltha-protos/v4 v4.0.8 h1:/P9IYuWPTp/aabS5n1fakQgHCFNzKjW1JHw3TOLAfKE=
-github.com/opencord/voltha-protos/v4 v4.0.8/go.mod h1:W/OIFIyvFh/C0vchRUuarIsMylEhzCRM9pNxLvkPtKc=
+github.com/opencord/voltha-lib-go/v4 v4.0.8 h1:QgcbvmTG917U89FdjchCckvLprETR3A8qnd4BInvRnA=
+github.com/opencord/voltha-lib-go/v4 v4.0.8/go.mod h1:n4QSYuNICFuQCQuTHf4N8Gh4ymOqPNbijqCD+sgSgJ4=
+github.com/opencord/voltha-protos/v4 v4.0.12 h1:x8drb8inaUByjVLFbXSiQwRTU//dfde0MKIHyKb1JMw=
+github.com/opencord/voltha-protos/v4 v4.0.12/go.mod h1:W/OIFIyvFh/C0vchRUuarIsMylEhzCRM9pNxLvkPtKc=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 03abc67..6a29e16 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -37,6 +37,7 @@
defaultBanner = false
defaultDisplayVersionOnly = false
defaultCoreTopic = "rwcore"
+ defaultEventTopic = "voltha.events"
defaultRWCoreEndpoint = "rwcore"
defaultRWCoreKey = "pki/voltha.key"
defaultRWCoreCert = "pki/voltha.crt"
@@ -67,6 +68,7 @@
KVStoreAddress string
KVTxnKeyDelTime int
CoreTopic string
+ EventTopic string
LogLevel string
Banner bool
DisplayVersionOnly bool
@@ -99,6 +101,7 @@
KVStoreAddress: defaultKVStoreAddress,
KVTxnKeyDelTime: defaultKVTxnKeyDelTime,
CoreTopic: defaultCoreTopic,
+ EventTopic: defaultEventTopic,
LogLevel: defaultLogLevel,
Banner: defaultBanner,
DisplayVersionOnly: defaultDisplayVersionOnly,
@@ -139,6 +142,9 @@
help = fmt.Sprintf("RW Core topic")
flag.StringVar(&(cf.CoreTopic), "rw_core_topic", defaultCoreTopic, help)
+ help = fmt.Sprintf("RW Core Event topic")
+ flag.StringVar(&(cf.EventTopic), "event_topic", defaultEventTopic, help)
+
flag.Bool("in_competing_mode", false, "deprecated")
help = fmt.Sprintf("KV store type")
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index 75865cc..503d294 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -24,6 +24,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device"
+ "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
@@ -67,12 +68,12 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val})
+ logger.Debugw(ctx, "register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val})
return rhp.adapterMgr.RegisterAdapter(ctx, adapter, deviceTypes)
}
@@ -91,17 +92,17 @@
switch arg.Key {
case "device_id":
if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-id", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "getDevice", log.Fields{"device-id": pID.Id, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "get-device", log.Fields{"device-id": pID.Id, "transaction-id": transactionID.Val})
// Get the device via the device manager
device, err := rhp.deviceMgr.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), pID)
@@ -125,19 +126,19 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-id", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "DeviceUpdate", log.Fields{"device-id": device.Id, "transactionID": transactionID.Val})
-
- if err := rhp.deviceMgr.UpdateDeviceUsingAdapterData(log.WithSpanFromContext(context.TODO(), ctx), device); err != nil {
+ logger.Debugw(ctx, "device-update", log.Fields{"device-id": device.Id, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "DeviceUpdate")
+ if err := rhp.deviceMgr.UpdateDeviceUsingAdapterData(rpcCtx, device); err != nil {
logger.Debugw(ctx, "unable-to-update-device-using-adapter-data", log.Fields{"error": err})
return nil, err
}
@@ -161,34 +162,34 @@
switch arg.Key {
case "device_id":
if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-id", log.Fields{"error": err})
return nil, err
}
case "serial_number":
if err := ptypes.UnmarshalAny(arg.Value, serialNumber); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-id", log.Fields{"error": err})
return nil, err
}
case "onu_id":
if err := ptypes.UnmarshalAny(arg.Value, onuID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-id", log.Fields{"error": err})
return nil, err
}
case "parent_port_no":
if err := ptypes.UnmarshalAny(arg.Value, parentPortNo); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-id", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "GetChildDevice", log.Fields{"parent-device-id": pID.Id, "args": args, "transactionID": transactionID.Val})
-
- return rhp.deviceMgr.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
+ logger.Debugw(ctx, "get-child-device", log.Fields{"parent-device-id": pID.Id, "args": args, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "GetChildDevice")
+ return rhp.deviceMgr.GetChildDevice(rpcCtx, pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
}
// GetChildDeviceWithProxyAddress returns details of child device with proxy address
@@ -210,14 +211,14 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress, "transactionID": transactionID.Val})
-
- return rhp.deviceMgr.GetChildDeviceWithProxyAddress(log.WithSpanFromContext(context.TODO(), ctx), proxyAddress)
+ logger.Debugw(ctx, "get-child-device-with-proxy-address", log.Fields{"proxy-address": proxyAddress, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "GetChildDeviceWithProxyAddress")
+ return rhp.deviceMgr.GetChildDeviceWithProxyAddress(rpcCtx, proxyAddress)
}
// GetPorts returns the ports information of the device based on the port type.
@@ -244,14 +245,15 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "GetPorts", log.Fields{"device-id": deviceID.Id, "portype": pt.Val, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "get-ports", log.Fields{"device-id": deviceID.Id, "port-type": pt.Val, "transaction-id": transactionID.Val})
- return rhp.deviceMgr.GetPorts(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, voltha.Port_PortType(pt.Val))
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "GetPorts")
+ return rhp.deviceMgr.GetPorts(rpcCtx, deviceID.Id, voltha.Port_PortType(pt.Val))
}
// GetChildDevices gets all the child device IDs from the device passed as parameter
@@ -268,19 +270,20 @@
switch arg.Key {
case "device_id":
if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-id", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "GetChildDevices", log.Fields{"device-id": pID.Id, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "get-child-devices", log.Fields{"device-id": pID.Id, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "GetChildDevices")
- return rhp.deviceMgr.GetAllChildDevices(log.WithSpanFromContext(context.TODO(), ctx), pID.Id)
+ return rhp.deviceMgr.GetAllChildDevices(rpcCtx, pID.Id)
}
// ChildDeviceDetected is invoked when a child device is detected. The following parameters are expected:
@@ -339,18 +342,19 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"parent-device-id": pID.Id, "parentPortNo": portNo.Val,
- "deviceType": dt.Val, "channelID": chnlID.Val, "serialNumber": serialNumber.Val,
- "vendorID": vendorID.Val, "onuID": onuID.Val, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "child-device-detected", log.Fields{"parent-device-id": pID.Id, "parent-port-no": portNo.Val,
+ "device-type": dt.Val, "channel-id": chnlID.Val, "serial-number": serialNumber.Val,
+ "vendor-id": vendorID.Val, "onu-id": onuID.Val, "transaction-id": transactionID.Val})
- device, err := rhp.deviceMgr.ChildDeviceDetected(log.WithSpanFromContext(context.TODO(), ctx), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "ChildDeviceDetected")
+ device, err := rhp.deviceMgr.ChildDeviceDetected(rpcCtx, pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
if err != nil {
- logger.Debugw(ctx, "child-detection-failed", log.Fields{"parent-device-id": pID.Id, "onuID": onuID.Val, "error": err})
+ logger.Debugw(ctx, "child-detection-failed", log.Fields{"parent-device-id": pID.Id, "onu-id": onuID.Val, "error": err})
}
return device, err
}
@@ -385,15 +389,17 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "DeviceStateUpdate", log.Fields{"device-id": deviceID.Id, "oper-status": operStatus,
- "conn-status": connStatus, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "device-state-update", log.Fields{"device-id": deviceID.Id, "oper-status": operStatus,
+ "conn-status": connStatus, "transaction-id": transactionID.Val})
- if err := rhp.deviceMgr.UpdateDeviceStatus(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "DeviceStateUpdate")
+
+ if err := rhp.deviceMgr.UpdateDeviceStatus(rpcCtx, deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
logger.Debugw(ctx, "unable-to-update-device-status", log.Fields{"error": err})
return nil, err
@@ -431,16 +437,16 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "ChildrenStateUpdate", log.Fields{"device-id": deviceID.Id, "oper-status": operStatus,
- "conn-status": connStatus, "transactionID": transactionID.Val})
-
+ logger.Debugw(ctx, "children-state-update", log.Fields{"device-id": deviceID.Id, "oper-status": operStatus,
+ "conn-status": connStatus, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "ChildrenStateUpdate")
// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
- if err := rhp.deviceMgr.UpdateChildrenStatus(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+ if err := rhp.deviceMgr.UpdateChildrenStatus(rpcCtx, deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
logger.Debugw(ctx, "unable-to-update-children-status", log.Fields{"error": err})
return nil, err
@@ -478,14 +484,14 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"device-id": deviceID.Id, "operStatus": operStatus, "transactionID": transactionID.Val})
-
- if err := rhp.deviceMgr.UpdatePortsState(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, uint32(portTypeFilter.Val), voltha.OperStatus_Types(operStatus.Val)); err != nil {
+ logger.Debugw(ctx, "ports-state-update", log.Fields{"device-id": deviceID.Id, "oper-status": operStatus, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "PortsStateUpdate")
+ if err := rhp.deviceMgr.UpdatePortsState(rpcCtx, deviceID.Id, uint32(portTypeFilter.Val), voltha.OperStatus_Types(operStatus.Val)); err != nil {
logger.Debugw(ctx, "unable-to-update-ports-state", log.Fields{"error": err})
return nil, err
}
@@ -528,15 +534,17 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "PortStateUpdate", log.Fields{"device-id": deviceID.Id, "operStatus": operStatus,
- "portType": portType, "portNo": portNo, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "port-state-update", log.Fields{"device-id": deviceID.Id, "oper-status": operStatus,
+ "port-type": portType, "port-no": portNo, "transaction-id": transactionID.Val})
- if err := rhp.deviceMgr.UpdatePortState(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "PortStateUpdate")
+
+ if err := rhp.deviceMgr.UpdatePortState(rpcCtx, deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
voltha.OperStatus_Types(operStatus.Val)); err != nil {
// If the error doesn't change behavior and is essentially ignored, it is not an error, it is a
// warning.
@@ -565,14 +573,14 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"device-id": deviceID.Id, "transactionID": transactionID.Val})
-
- if err := rhp.deviceMgr.DeleteAllPorts(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id); err != nil {
+ logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": deviceID.Id, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "DeleteAllPorts")
+ if err := rhp.deviceMgr.DeleteAllPorts(rpcCtx, deviceID.Id); err != nil {
logger.Debugw(ctx, "unable-to-delete-ports", log.Fields{"error": err})
return nil, err
}
@@ -603,14 +611,15 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "GetDevicePort", log.Fields{"device-id": deviceID.Id, "portNo": portNo.Val, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "get-device-port", log.Fields{"device-id": deviceID.Id, "port-no": portNo.Val, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "GetDevicePort")
- return rhp.deviceMgr.GetDevicePort(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, uint32(portNo.Val))
+ return rhp.deviceMgr.GetDevicePort(rpcCtx, deviceID.Id, uint32(portNo.Val))
}
// ListDevicePorts returns all ports belonging to the device
@@ -631,14 +640,14 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID.Id, "transactionID": transactionID.Val})
-
- return rhp.deviceMgr.ListDevicePorts(log.WithSpanFromContext(context.TODO(), ctx), deviceID)
+ logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": deviceID.Id, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "ListDevicePorts")
+ return rhp.deviceMgr.ListDevicePorts(rpcCtx, deviceID)
}
// ChildDevicesLost indicates that a parent device is in a state (Disabled) where it cannot manage the child devices.
@@ -660,14 +669,16 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "ChildDevicesLost", log.Fields{"device-id": parentDeviceID.Id, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "child-devices-lost", log.Fields{"device-id": parentDeviceID.Id, "transaction-id": transactionID.Val})
- if err := rhp.deviceMgr.ChildDevicesLost(log.WithSpanFromContext(context.TODO(), ctx), parentDeviceID.Id); err != nil {
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "ChildDevicesLost")
+
+ if err := rhp.deviceMgr.ChildDevicesLost(rpcCtx, parentDeviceID.Id); err != nil {
logger.Debugw(ctx, "unable-to-disable-child-devices", log.Fields{"error": err})
return nil, err
}
@@ -693,14 +704,15 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "ChildDevicesDetected", log.Fields{"parent-device-id": parentDeviceID.Id, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "child-devices-detected", log.Fields{"parent-device-id": parentDeviceID.Id, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "ChildDevicesDetected")
- if err := rhp.deviceMgr.ChildDevicesDetected(log.WithSpanFromContext(context.TODO(), ctx), parentDeviceID.Id); err != nil {
+ if err := rhp.deviceMgr.ChildDevicesDetected(rpcCtx, parentDeviceID.Id); err != nil {
logger.Debugw(ctx, "child-devices-detection-failed", log.Fields{"parent-device-id": parentDeviceID.Id, "error": err})
return nil, err
}
@@ -731,14 +743,15 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "PortCreated", log.Fields{"device-id": deviceID.Id, "port": port, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "port-created", log.Fields{"device-id": deviceID.Id, "port": port, "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "PortCreated")
- if err := rhp.deviceMgr.AddPort(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, port); err != nil {
+ if err := rhp.deviceMgr.AddPort(rpcCtx, deviceID.Id, port); err != nil {
logger.Debugw(ctx, "unable-to-add-port", log.Fields{"error": err})
return nil, err
}
@@ -763,15 +776,16 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "DevicePMConfigUpdate", log.Fields{"device-id": pmConfigs.Id, "configs": pmConfigs,
- "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "device-pm-config-update", log.Fields{"device-id": pmConfigs.Id, "configs": pmConfigs,
+ "transaction-id": transactionID.Val})
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "DevicePMConfigUpdate")
- if err := rhp.deviceMgr.InitPmConfigs(log.WithSpanFromContext(context.TODO(), ctx), pmConfigs.Id, pmConfigs); err != nil {
+ if err := rhp.deviceMgr.InitPmConfigs(rpcCtx, pmConfigs.Id, pmConfigs); err != nil {
logger.Debugw(ctx, "unable-to-initialize-pm-configs", log.Fields{"error": err})
return nil, err
}
@@ -808,15 +822,17 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "PacketIn", log.Fields{"device-id": deviceID.Id, "port": portNo.Val, "packet": packet,
- "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "packet-in", log.Fields{"device-id": deviceID.Id, "port": portNo.Val, "packet": packet,
+ "transaction-id": transactionID.Val})
- if err := rhp.deviceMgr.PacketIn(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "PacketIn")
+
+ if err := rhp.deviceMgr.PacketIn(rpcCtx, deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
logger.Debugw(ctx, "unable-to-receive-packet-from-adapter", log.Fields{"error": err})
return nil, err
@@ -848,15 +864,17 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "UpdateImageDownload", log.Fields{"device-id": deviceID.Id, "image-download": img,
- "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "update-image-download", log.Fields{"device-id": deviceID.Id, "image-download": img,
+ "transaction-id": transactionID.Val})
- if err := rhp.deviceMgr.UpdateImageDownload(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, img); err != nil {
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "UpdateImageDownload")
+
+ if err := rhp.deviceMgr.UpdateImageDownload(rpcCtx, deviceID.Id, img); err != nil {
logger.Debugw(ctx, "unable-to-update-image-download", log.Fields{"error": err})
return nil, err
}
@@ -881,14 +899,16 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "ReconcileChildDevices", log.Fields{"parent-device-id": parentDeviceID.Id, "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "reconcile-child-devices", log.Fields{"parent-device-id": parentDeviceID.Id, "transaction-id": transactionID.Val})
- if err := rhp.deviceMgr.ReconcileChildDevices(log.WithSpanFromContext(context.TODO(), ctx), parentDeviceID.Id); err != nil {
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "ReconcileChildDevices")
+
+ if err := rhp.deviceMgr.ReconcileChildDevices(rpcCtx, parentDeviceID.Id); err != nil {
logger.Debugw(ctx, "unable-to-reconcile-child-devices", log.Fields{"error": err})
return nil, err
}
@@ -898,7 +918,7 @@
// DeviceReasonUpdate updates device reason
func (rhp *AdapterRequestHandlerProxy) DeviceReasonUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
- logger.Warn(ctx, "DeviceReasonUpdate: invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "device-reason-update-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("DeviceReasonUpdate: invalid-number-of-args")
return nil, err
}
@@ -919,15 +939,17 @@
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-id", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw(ctx, "DeviceReasonUpdate", log.Fields{"device-id": deviceID.Id, "reason": reason.Val,
- "transactionID": transactionID.Val})
+ logger.Debugw(ctx, "device-reason-update", log.Fields{"device-id": deviceID.Id, "reason": reason.Val,
+ "transaction-id": transactionID.Val})
- if err := rhp.deviceMgr.UpdateDeviceReason(log.WithSpanFromContext(context.TODO(), ctx), deviceID.Id, reason.Val); err != nil {
+ rpcCtx := utils.WithRPCMetadataContext(log.WithSpanFromContext(context.TODO(), ctx), "DeviceReasonUpdate")
+
+ if err := rhp.deviceMgr.UpdateDeviceReason(rpcCtx, deviceID.Id, reason.Val); err != nil {
logger.Debugw(ctx, "unable-to-update-device-reason", log.Fields{"error": err})
return nil, err
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 7436083..8766c7c 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -38,6 +38,7 @@
cm "github.com/opencord/voltha-go/rw_core/mocks"
tst "github.com/opencord/voltha-go/rw_core/test"
"github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"github.com/opencord/voltha-lib-go/v4/pkg/flows"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -59,6 +60,7 @@
adapterMgr *adapter.Manager
kmp kafka.InterContainerProxy
kClient kafka.Client
+ kEventClient kafka.Client
kvClientPort int
numONUPerOLT int
startingUNIPortNo int
@@ -81,6 +83,7 @@
}
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
+ test.kEventClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-nbi-test"
@@ -94,6 +97,7 @@
defer cancel()
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
+ cfg.EventTopic = "voltha.events"
cfg.DefaultRequestTimeout = nb.defaultTimeout
cfg.DefaultCoreTimeout = nb.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(nb.kvClientPort)
@@ -118,7 +122,8 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
nb.adapterMgr = adapter.NewAdapterManager(ctx, proxy, nb.coreInstanceID, nb.kClient)
- nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
+ eventProxy := events.NewEventProxy(events.MsgClient(nb.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+ nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
nb.adapterMgr.Start(ctx)
if err := nb.kmp.Start(ctx); err != nil {
@@ -140,6 +145,9 @@
if nb.etcdServer != nil {
tst.StopEmbeddedEtcdServer(ctx, nb.etcdServer)
}
+ if nb.kEventClient != nil {
+ nb.kEventClient.Stop(ctx)
+ }
}
func (nb *NBTest) verifyLogicalDevices(t *testing.T, oltDevice *voltha.Device, nbi *NBIHandler) {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 1a22267..4cd1e85 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -26,6 +26,7 @@
"github.com/opencord/voltha-go/rw_core/core/api"
"github.com/opencord/voltha-go/rw_core/core/device"
conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
@@ -70,7 +71,7 @@
defer close(core.stopped)
defer core.shutdown()
- logger.Info(ctx, "Starting RW Core components")
+ logger.Info(ctx, "starting-rw-core-components")
// setup kv client
logger.Debugw(ctx, "create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
@@ -90,7 +91,7 @@
// wait until connection to KV Store is up
if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
- logger.Fatal(ctx, "Unable-to-connect-to-KV-store")
+ logger.Fatal(ctx, "unable-to-connect-to-kv-store")
}
go monitorKVStoreLiveness(ctx, backend, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
@@ -109,7 +110,25 @@
kafka.ProducerRetryBackoff(time.Millisecond*30),
kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
)
- // defer kafkaClient.Stop()
+
+ // create kafka client for events
+ kafkaClientEvent := kafka.NewSaramaClient(
+ kafka.Address(cf.KafkaClusterAddress),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.ProducerRetryBackoff(time.Millisecond*30),
+ kafka.AutoCreateTopic(true),
+ kafka.MetadatMaxRetries(15),
+ )
+ // create event proxy
+ eventProxy := events.NewEventProxy(events.MsgClient(kafkaClientEvent), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
+ if err := kafkaClientEvent.Start(ctx); err != nil {
+ logger.Warn(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address")
+ return
+ }
+
+ defer kafkaClientEvent.Stop(ctx)
// create kv path
dbPath := model.NewDBPath(backend)
@@ -122,7 +141,7 @@
// core.kmp must be created before deviceMgr and adapterMgr
kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
if err != nil {
- logger.Warn(ctx, "Failed to setup kafka connection")
+ logger.Warn(ctx, "failed-to-setup-kafka-connection")
return
}
defer kmp.Stop(ctx)
@@ -130,7 +149,7 @@
// create the core of the system, the device managers
endpointMgr := kafka.NewEndpointManager(backend)
- deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout)
+ deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy)
// register kafka RPC handler
registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index c122574..84f765f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -133,7 +133,7 @@
agent.portLoader.Load(ctx)
agent.transientStateLoader.Load(ctx)
- logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
+ logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
@@ -172,7 +172,7 @@
}
defer agent.requestQueue.RequestComplete()
- logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parentId": agent.parentID})
+ logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
// Remove the device transient loader
if err := agent.deleteTransientState(ctx); err != nil {
return err
@@ -236,7 +236,7 @@
// TODO: Post failure message onto kafka
}
-func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
defer cancel()
select {
@@ -283,19 +283,31 @@
}
-func (agent *Agent) waitForAdapterDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
defer cancel()
+ var rpce *voltha.RPCEvent
+ defer func() {
+ if rpce != nil {
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ }
+ }()
select {
case rpcResponse, ok := <-ch:
if !ok {
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+ //add failure
} else if rpcResponse.Err != nil {
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
+ //add failure
} else {
onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
}
case <-ctx.Done():
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
onFailure(ctx, rpc, ctx.Err(), reqArgs)
}
}
@@ -326,7 +338,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
oldDevice := agent.getDeviceReadOnlyWithoutLock()
if oldDevice.AdminState == voltha.AdminState_ENABLED {
@@ -360,6 +372,8 @@
// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
var ch chan *kafka.RpcResponse
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
} else {
@@ -376,16 +390,28 @@
func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
defer cancel()
+ var rpce *voltha.RPCEvent
+ defer func() {
+ if rpce != nil {
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ }
+ }()
select {
case rpcResponse, ok := <-ch:
if !ok {
+ //add failure
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
response.Error(status.Errorf(codes.Aborted, "channel-closed"))
} else if rpcResponse.Err != nil {
+ //add failure
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
response.Error(rpcResponse.Err)
} else {
response.Done()
}
case <-ctx.Done():
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
response.Error(ctx.Err())
}
}
@@ -451,7 +477,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
cloned := agent.cloneDeviceWithoutLock()
@@ -476,6 +502,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
if err != nil {
cancel()
@@ -491,13 +519,15 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
if agent.isDeletionInProgress() {
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
cancel()
@@ -508,7 +538,7 @@
}
func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
- logger.Debugw(ctx, "deleteDeviceForce", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -521,27 +551,29 @@
agent.deviceID)
}
device := agent.cloneDeviceWithoutLock()
- if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, voltha.DeviceTransientState_FORCE_DELETING,
- previousDeviceTransientState); err != nil {
+ if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+ voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
return err
}
previousAdminState := device.AdminState
if previousAdminState != ic.AdminState_PREPROVISIONED {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
if err != nil {
cancel()
return err
}
// Since it is a case of force delete, nothing needs to be done on adapter responses.
- go agent.waitForAdapterResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
+ go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
agent.onFailure)
}
return nil
}
func (agent *Agent) deleteDevice(ctx context.Context) error {
- logger.Debugw(ctx, "deleteDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -561,14 +593,16 @@
// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
}
- if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, currentDeviceTransientState,
- previousDeviceTransientState); err != nil {
+ if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+ currentDeviceTransientState, previousDeviceTransientState); err != nil {
return err
}
// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
// adapter
if previousAdminState != ic.AdminState_PREPROVISIONED {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
if err != nil {
cancel()
@@ -578,7 +612,7 @@
}
return err
}
- go agent.waitForAdapterDeleteResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+ go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
agent.onDeleteFailure)
}
return nil
@@ -588,7 +622,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
+ logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
cloned := agent.cloneDeviceWithoutLock()
cloned.ParentId = parentID
@@ -597,7 +631,7 @@
// getSwitchCapability retrieves the switch capability of a parent device
func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
- logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -651,6 +685,8 @@
}
// Send packet to adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
if err != nil {
cancel()
@@ -664,7 +700,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+ logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})
cloned := agent.cloneDeviceWithoutLock()
cloned.Root = device.Root
@@ -686,14 +722,14 @@
cloned := agent.cloneDeviceWithoutLock()
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
- logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
cloned.ConnectStatus = connStatus
}
if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
- logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
cloned.OperStatus = operStatus
}
- logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"device-id": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
+ logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
// Store the device
return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
@@ -742,11 +778,13 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "simulateAlarm", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
if err != nil {
cancel()
@@ -778,10 +816,16 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
- logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+ logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+ // Sending RPC EVENT here
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().UnixNano())
+
}
return nil
}
@@ -805,7 +849,7 @@
//Reverting TransientState update
err := agent.updateTransientState(ctx, prevTransientState)
logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
- "previousTransientState": prevTransientState, "currentTransientState": transientState})
+ "previous-transient-state": prevTransientState, "current-transient-state": transientState})
agent.requestQueue.RequestComplete()
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
@@ -818,10 +862,14 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
-
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
device, prevDevice, transientState, prevTransientState); err != nil {
- logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+ logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+ // Sending RPC EVENT here
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().UnixNano())
}
return nil
}
@@ -829,7 +877,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "updateDeviceReason", log.Fields{"device-id": agent.deviceID, "reason": reason})
+ logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
cloned := agent.cloneDeviceWithoutLock()
cloned.Reason = reason
@@ -837,7 +885,7 @@
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
+ logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
// Remove the associated peer ports on the parent device
for portID := range agent.portLoader.ListIDs() {
@@ -861,6 +909,8 @@
//send request to adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
if err != nil {
cancel()
@@ -907,12 +957,12 @@
if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
- logger.Debugw(ctx, "Omci_test_Request-Success-device-agent", log.Fields{"testResp": testResp})
+ logger.Debugw(ctx, "omci_test_request-success-device-agent", log.Fields{"test-resp": testResp})
return testResp, nil
}
func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
- logger.Debugw(ctx, "getExtValue", log.Fields{"device-id": agent.deviceID, "onuid": valueparam.Id, "valuetype": valueparam.Value})
+ logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -938,12 +988,12 @@
if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
- logger.Debugw(ctx, "getExtValue-Success-device-agent", log.Fields{"Resp": Resp})
+ logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"Resp": Resp})
return Resp, nil
}
func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
- logger.Debugw(ctx, "setExtValue", log.Fields{"device-id": value.Id})
+ logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -965,12 +1015,12 @@
}
// Unmarshal and return the response
- logger.Debug(ctx, "setExtValue-Success-device-agent")
+ logger.Debug(ctx, "set-ext-value-success-device-agent")
return &empty.Empty{}, nil
}
func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
- logger.Debugw(ctx, "getSingleValue", log.Fields{"device-id": request.TargetId})
+ logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
@@ -1004,7 +1054,7 @@
}
func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
- logger.Debugw(ctx, "setSingleValue", log.Fields{"device-id": request.TargetId})
+ logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 8b8e1a7..f2fd10a 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -78,7 +78,7 @@
flowsToAdd = append(flowsToAdd, flow)
} else {
//No need to change the flow. It is already exist.
- logger.Debugw(ctx, "No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+ logger.Debugw(ctx, "no-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
}
}
flowHandle.Unlock()
@@ -92,6 +92,8 @@
// Send update to adapters
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
@@ -151,6 +153,8 @@
// Send update to adapters
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
@@ -182,7 +186,7 @@
}
func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw(ctx, "updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+ logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
if (len(updatedFlows)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
@@ -218,6 +222,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index e450972..9552b78 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -81,7 +81,7 @@
groupsToAdd = append(groupsToAdd, group)
} else {
//No need to change the group. It is already exist.
- logger.Debugw(ctx, "No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+ logger.Debugw(ctx, "no-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
}
}
@@ -95,6 +95,8 @@
// Send update to adapters
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
updatedAllGroups := agent.listDeviceGroups()
@@ -153,6 +155,8 @@
// Send update to adapters
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
updatedAllGroups := agent.listDeviceGroups()
@@ -183,7 +187,7 @@
}
func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw(ctx, "updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+ logger.Debugw(ctx, "update-groups-to-adapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
if (len(updatedGroups)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
@@ -216,6 +220,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index acebaca..eed1879 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -22,6 +22,7 @@
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -32,7 +33,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- logger.Debugw(ctx, "downloadImage", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "download-image", log.Fields{"device-id": agent.deviceID})
if agent.device.Root {
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
@@ -62,6 +63,8 @@
// Send the request to the adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DownloadImage(subCtx, cloned, clonedImg)
if err != nil {
cancel()
@@ -87,7 +90,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- logger.Debugw(ctx, "cancelImageDownload", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": agent.deviceID})
// Update image download state
cloned := agent.cloneDeviceWithoutLock()
@@ -108,6 +111,8 @@
return nil, err
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.CancelImageDownload(subCtx, cloned, img)
if err != nil {
cancel()
@@ -123,7 +128,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- logger.Debugw(ctx, "activateImage", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "activate-image", log.Fields{"device-id": agent.deviceID})
// Update image download state
cloned := agent.cloneDeviceWithoutLock()
@@ -158,6 +163,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, cloned, img)
if err != nil {
cancel()
@@ -174,7 +181,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- logger.Debugw(ctx, "revertImage", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "revert-image", log.Fields{"device-id": agent.deviceID})
// Update image download state
cloned := agent.cloneDeviceWithoutLock()
@@ -195,6 +202,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, cloned, img)
if err != nil {
cancel()
@@ -206,7 +215,7 @@
}
func (agent *Agent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw(ctx, "getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "get-image-download-status", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
@@ -261,7 +270,7 @@
}
func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw(ctx, "getImageDownload", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "get-image-download", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -276,7 +285,7 @@
}
func (agent *Agent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
- logger.Debugw(ctx, "listImageDownloads", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "list-image-downloads", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -288,7 +297,7 @@
// onImageFailure brings back the device to Enabled state and sets the image to image download_failed.
func (agent *Agent) onImageFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Errorw(ctx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+ logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
return
}
if res, ok := response.(error); ok {
@@ -338,7 +347,7 @@
// onImageSuccess brings back the device to Enabled state and sets the image to image download_failed.
func (agent *Agent) onImageSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Errorw(ctx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+ logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
return
}
logger.Errorw(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index 0640788..d726115 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -20,6 +20,7 @@
"context"
"github.com/gogo/protobuf/proto"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -30,7 +31,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
+ logger.Debugw(ctx, "update-pm-configs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.cloneDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -40,6 +41,8 @@
}
// Send the request to the adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
if err != nil {
cancel()
@@ -53,7 +56,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
+ logger.Debugw(ctx, "init-pm-configs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.cloneDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -61,7 +64,7 @@
}
func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
- logger.Debugw(ctx, "listPmConfigs", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "list-pm-configs", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index a3489fc..6e53d16 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -22,6 +22,7 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/core/device/port"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -43,7 +44,7 @@
// getPorts retrieves the ports information of the device based on the port type.
func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
- logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
+ logger.Debugw(ctx, "get-ports", log.Fields{"device-id": agent.deviceID, "port-type": portType})
ports := &voltha.Ports{}
for _, port := range agent.listDevicePorts() {
if port.Type == portType {
@@ -63,7 +64,7 @@
}
func (agent *Agent) updatePortsOperState(ctx context.Context, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "updatePortsOperState", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "update-ports-oper-state", log.Fields{"device-id": agent.deviceID})
for portID := range agent.portLoader.ListIDs() {
if portHandle, have := agent.portLoader.Lock(portID); have {
@@ -79,12 +80,14 @@
// Notify the logical device manager to change the port state
// Do this for NNI and UNIs only. PON ports are not known by logical device
if newPort.Type == voltha.Port_ETHERNET_NNI || newPort.Type == voltha.Port_ETHERNET_UNI {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
go func(portID uint32, ctx context.Context) {
if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
// TODO: VOL-2707
logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
}
- }(portID, log.WithSpanFromContext(context.Background(), ctx))
+ }(portID, subCtx)
}
}
portHandle.Unlock()
@@ -116,7 +119,7 @@
}
func (agent *Agent) deleteAllPorts(ctx context.Context) error {
- logger.Debugw(ctx, "deleteAllPorts", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -159,7 +162,7 @@
oldPort := portHandle.GetReadOnly()
if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
- logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
+ logger.Debugw(ctx, "port-already-exists", log.Fields{"port": port})
return nil
}
@@ -218,7 +221,7 @@
}
func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
- logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+ logger.Debugw(ctx, "disable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
portHandle, have := agent.portLoader.Lock(portID)
if !have {
@@ -244,6 +247,8 @@
return err
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
if err != nil {
cancel()
@@ -254,7 +259,7 @@
}
func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
- logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+ logger.Debugw(ctx, "enable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
portHandle, have := agent.portLoader.Lock(portID)
if !have {
@@ -280,6 +285,8 @@
return err
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
if err != nil {
cancel()
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 3d06a68..6947447 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -35,6 +35,7 @@
tst "github.com/opencord/voltha-go/rw_core/test"
com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -52,6 +53,7 @@
adapterMgr *adapter.Manager
kmp kafka.InterContainerProxy
kClient kafka.Client
+ kEventClient kafka.Client
kvClientPort int
oltAdapter *cm.OLTAdapter
onuAdapter *cm.ONUAdapter
@@ -75,6 +77,7 @@
}
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
+ test.kEventClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
@@ -116,6 +119,7 @@
func (dat *DATest) startCore(ctx context.Context) {
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
+ cfg.EventTopic = "voltha.events"
cfg.DefaultRequestTimeout = dat.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
grpcPort, err := freeport.GetFreePort()
@@ -138,8 +142,8 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
-
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
dat.adapterMgr.Start(context.Background())
if err = dat.kmp.Start(ctx); err != nil {
logger.Fatal(ctx, "Cannot start InterContainerProxy")
@@ -161,6 +165,9 @@
if dat.etcdServer != nil {
tst.StopEmbeddedEtcdServer(ctx, dat.etcdServer)
}
+ if dat.kEventClient != nil {
+ dat.kEventClient.Stop(ctx)
+ }
}
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 21d535f..4489196 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -20,12 +20,18 @@
"context"
"encoding/binary"
"encoding/hex"
- "sync"
-
+ "fmt"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/common"
"github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opentracing/opentracing-go"
+ jtracing "github.com/uber/jaeger-client-go"
+ "sync"
+ "time"
)
type Manager struct {
@@ -33,21 +39,34 @@
packetInQueueDone chan bool
changeEventQueue chan openflow_13.ChangeEvent
changeEventQueueDone chan bool
+ RPCEventManager *RPCEventManager
}
-func NewManager() *Manager {
+type RPCEventManager struct {
+ eventProxy eventif.EventProxy
+ coreInstanceID string
+}
+
+func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *Manager {
return &Manager{
packetInQueue: make(chan openflow_13.PacketIn, 100),
packetInQueueDone: make(chan bool, 1),
changeEventQueue: make(chan openflow_13.ChangeEvent, 100),
changeEventQueueDone: make(chan bool, 1),
+ RPCEventManager: NewRPCEventManager(proxyForRPCEvents, instanceID),
}
}
+func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *RPCEventManager {
+ return &RPCEventManager{
+ eventProxy: proxyForRPCEvents,
+ coreInstanceID: instanceID,
+ }
+}
func (q *Manager) SendPacketIn(ctx context.Context, deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
// TODO: Augment the OF PacketIn to include the transactionId
packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet}
- logger.Debugw(ctx, "SendPacketIn", log.Fields{"packetIn": packetIn})
+ logger.Debugw(ctx, "send-packet-in", log.Fields{"packet-in": packetIn})
q.packetInQueue <- packetIn
}
@@ -66,9 +85,9 @@
defer streamingTracker.Unlock()
if _, ok := streamingTracker.calls[method]; ok {
// bail out the other packet in thread
- logger.Debugf(ctx, "%s streaming call already running. Exiting it", method)
+ logger.Debugf(ctx, "%s-streaming-call-already-running-exiting-it", method)
done <- true
- logger.Debugf(ctx, "Last %s exited. Continuing ...", method)
+ logger.Debugf(ctx, "last-%s-exited-continuing", method)
} else {
streamingTracker.calls[method] = &callTracker{failedPacket: nil}
}
@@ -79,10 +98,10 @@
if tracker.failedPacket != nil {
switch tracker.failedPacket.(type) {
case openflow_13.PacketIn:
- logger.Debug(ctx, "Enqueueing last failed packetIn")
+ logger.Debug(ctx, "enqueueing-last-failed-packet-in")
q.packetInQueue <- tracker.failedPacket.(openflow_13.PacketIn)
case openflow_13.ChangeEvent:
- logger.Debug(ctx, "Enqueueing last failed changeEvent")
+ logger.Debug(ctx, "enqueueing-last-failed-change-event")
q.changeEventQueue <- tracker.failedPacket.(openflow_13.ChangeEvent)
}
}
@@ -92,8 +111,9 @@
// ReceivePacketsIn receives packets from adapter
func (q *Manager) ReceivePacketsIn(_ *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
ctx := context.Background()
+ ctx = utils.WithRPCMetadataContext(ctx, "ReceivePacketsIn")
var streamingTracker = q.getStreamingTracker(ctx, "ReceivePacketsIn", q.packetInQueueDone)
- logger.Debugw(ctx, "ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
+ logger.Debugw(ctx, "receive-packets-in-request", log.Fields{"packets-in": packetsIn})
err := q.flushFailedPackets(ctx, streamingTracker)
if err != nil {
@@ -109,6 +129,9 @@
})
if err := packetsIn.Send(&packet); err != nil {
logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
+ go q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+ nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().UnixNano())
// save the last failed packet in
streamingTracker.failedPacket = packet
} else {
@@ -118,7 +141,7 @@
}
}
case <-q.packetInQueueDone:
- logger.Debug(ctx, "Another ReceivePacketsIn running. Bailing out ...")
+ logger.Debug(ctx, "another-receive-packets-in-running-bailing-out")
break loop
}
}
@@ -128,7 +151,7 @@
}
func (q *Manager) SendChangeEvent(ctx context.Context, deviceID string, reason openflow_13.OfpPortReason, desc *openflow_13.OfpPort) {
- logger.Debugw(ctx, "SendChangeEvent", log.Fields{"device-id": deviceID, "reason": reason, "desc": desc})
+ logger.Debugw(ctx, "send-change-event", log.Fields{"device-id": deviceID, "reason": reason, "desc": desc})
q.changeEventQueue <- openflow_13.ChangeEvent{
Id: deviceID,
Event: &openflow_13.ChangeEvent_PortStatus{
@@ -141,8 +164,8 @@
}
func (q *Manager) SendFlowChangeEvent(ctx context.Context, deviceID string, res []error, xid uint32, flowCookie uint64) {
- logger.Debugw(ctx, "SendChangeEvent", log.Fields{"device-id": deviceID,
- "flowId": xid, "flowCookie": flowCookie, "errors": res})
+ logger.Debugw(ctx, "send-change-event", log.Fields{"device-id": deviceID,
+ "flow-id": xid, "flow-cookie": flowCookie, "errors": res})
errorType := openflow_13.OfpErrorType_OFPET_FLOW_MOD_FAILED
//Manually creating the data payload for the flow error message
bs := make([]byte, 2)
@@ -179,8 +202,9 @@
// ReceiveChangeEvents receives change in events
func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
ctx := context.Background()
+ ctx = utils.WithRPCMetadataContext(ctx, "ReceiveChangeEvents")
var streamingTracker = q.getStreamingTracker(ctx, "ReceiveChangeEvents", q.changeEventQueueDone)
- logger.Debugw(ctx, "ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
+ logger.Debugw(ctx, "receive-change-events-request", log.Fields{"change-events": changeEvents})
err := q.flushFailedPackets(ctx, streamingTracker)
if err != nil {
@@ -195,7 +219,10 @@
logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
if err := changeEvents.Send(&event); err != nil {
logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
- // save last failed changeevent
+ go q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+ nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
+ time.Now().UnixNano())
+ // save last failed change event
streamingTracker.failedPacket = event
} else {
if streamingTracker.failedPacket != nil {
@@ -204,7 +231,7 @@
}
}
case <-q.changeEventQueueDone:
- logger.Debug(ctx, "Another ReceiveChangeEvents already running. Bailing out ...")
+ logger.Debug(ctx, "another-receive-change-events-already-running-bailing-out")
break loop
}
}
@@ -215,3 +242,44 @@
func (q *Manager) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent {
return q.changeEventQueue
}
+
+func (q *RPCEventManager) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
+ logger.Debugw(ctx, "new-rpc-event", log.Fields{"resource-id": resourceID})
+ var opID string
+ var rpc string
+
+ if span := opentracing.SpanFromContext(ctx); span != nil {
+ if jSpan, ok := span.(*jtracing.Span); ok {
+ opID = fmt.Sprintf("%016x", jSpan.SpanContext().TraceID().Low) // Using Sprintf to avoid removal of leading 0s
+ }
+ }
+ rpc = utils.GetRPCMetadataFromContext(ctx)
+ rpcev := &voltha.RPCEvent{
+ Rpc: rpc,
+ OperationId: opID,
+ ResourceId: resourceID,
+ Service: q.coreInstanceID,
+ Status: &common.OperationResp{
+ Code: common.OperationResp_OPERATION_FAILURE,
+ },
+ Description: desc,
+ Context: context,
+ }
+ return rpcev
+}
+
+func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
+ if rpcEvent.Rpc != "" {
+ _ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+ }
+}
+
+func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+ id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ rpcEvent := q.NewRPCEvent(ctx, resourceID, desc, context)
+ //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
+ if rpcEvent.Rpc != "" {
+ _ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+ }
+}
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 6b0b6f7..7ab00da 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -92,7 +92,7 @@
return nil
}
- logger.Infow(ctx, "starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+ logger.Infow(ctx, "starting-logical-device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
var startSucceeded bool
defer func() {
@@ -128,13 +128,14 @@
logger.Errorw(ctx, "failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
- logger.Debugw(ctx, "logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
+ logger.Debugw(ctx, "logical-device-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
agent.logicalDevice = ld
// Setup the logicalports - internal processing, no need to propagate the client context
go func() {
- err := agent.setupLogicalPorts(log.WithSpanFromContext(context.Background(), ctx))
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := agent.setupLogicalPorts(subCtx)
if err != nil {
logger.Errorw(ctx, "unable-to-setup-logical-ports", log.Fields{"error": err})
}
@@ -169,7 +170,8 @@
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
if loadFromDB {
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.buildRoutes(subCtx); err != nil {
logger.Warn(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -183,7 +185,7 @@
func (agent *LogicalAgent) stop(ctx context.Context) error {
var returnErr error
agent.stopOnce.Do(func() {
- logger.Info(ctx, "stopping-logical_device-agent")
+ logger.Info(ctx, "stopping-logical-device-agent")
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
// This should never happen - an error is returned only if the agent is stopped and an agent is only stopped once.
@@ -202,14 +204,14 @@
if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
returnErr = err
} else {
- logger.Debugw(ctx, "logicaldevice-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Debugw(ctx, "logical-device-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
}
// TODO: remove all entries from all loaders
// TODO: don't allow any more modifications to flows/groups/meters/ports or to any logical device field
agent.stopped = true
- logger.Info(ctx, "logical_device-agent-stopped")
+ logger.Info(ctx, "logical-device-agent-stopped")
})
return returnErr
}
@@ -224,7 +226,7 @@
}
func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
- logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
+ logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-rules": deviceRules, "flow-metadata": flowMetadata})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
@@ -232,6 +234,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
start := time.Now()
if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -260,6 +264,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
start := time.Now()
if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -286,6 +292,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
if err := agent.deviceMgr.updateFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
logger.Errorw(ctx, "flow-update-failed", log.Fields{"device-id": deviceId, "error": err})
@@ -313,6 +321,8 @@
logger.Debugw(ctx, "uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
go func(uniPort uint32, metadata *voltha.FlowMetadata) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
if err := agent.deviceMgr.deleteParentFlows(subCtx, agent.rootDeviceID, uniPort, metadata); err != nil {
logger.Error(ctx, "flow-delete-failed-from-parent-device", log.Fields{
@@ -331,15 +341,15 @@
func (agent *LogicalAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
if logger.V(log.InfoLevel) {
logger.Infow(ctx, "packet-out", log.Fields{
- "packet": hex.EncodeToString(packet.Data),
- "inPort": packet.GetInPort(),
+ "packet": hex.EncodeToString(packet.Data),
+ "in-port": packet.GetInPort(),
})
}
outPort := fu.GetPacketOutPort(packet)
//frame := packet.GetData()
//TODO: Use a channel between the logical agent and the device agent
if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
- logger.Error(ctx, "packetout-failed", log.Fields{"logical-device-id": agent.rootDeviceID})
+ logger.Error(ctx, "packet-out-failed", log.Fields{"logical-device-id": agent.rootDeviceID})
}
}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 6b8abef..2b88abc 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -21,6 +21,7 @@
"errors"
"fmt"
"strconv"
+ "time"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/route"
@@ -48,7 +49,7 @@
//updateFlowTable updates the flow table of that logical device
func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.FlowTableUpdate) error {
- logger.Debug(ctx, "UpdateFlowTable")
+ logger.Debug(ctx, "update-flow-table")
if flow == nil {
return nil
}
@@ -72,19 +73,19 @@
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalAgent) flowAdd(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
mod := flowUpdate.FlowMod
- logger.Debugw(ctx, "flowAdd", log.Fields{"flow": mod})
+ logger.Debugw(ctx, "flow-add", log.Fields{"flow": mod})
if mod == nil {
return nil
}
flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
if err != nil {
- logger.Errorw(ctx, "flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
+ logger.Errorw(ctx, "flow-add-failed", log.Fields{"flow-mod": mod, "err": err})
return err
}
var updated bool
var changed bool
if changed, updated, err = agent.decomposeAndAdd(ctx, flow, flowUpdate); err != nil {
- logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
+ logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flow-mod": mod, "err": err})
return err
}
if changed && !updated {
@@ -115,7 +116,7 @@
// TODO: this currently does nothing
if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
// TODO: should this error be notified other than being logged?
- logger.Warnw(ctx, "overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+ logger.Warnw(ctx, "overlapped-flows", log.Fields{"logical-device-id": agent.logicalDeviceID})
} else {
// Add flow
changed = true
@@ -141,7 +142,7 @@
flowMeterConfig, err := agent.GetMeterConfig(ctx, updatedFlows)
if err != nil {
- logger.Error(ctx, "Meter-referred-in-flow-not-present")
+ logger.Error(ctx, "meter-referred-in-flow-not-present")
return changed, updated, err
}
@@ -178,8 +179,10 @@
"flow": flow,
"groups": groups,
})
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
// Revert added flows
- if err := agent.revertAddedFlows(log.WithSpanFromContext(context.Background(), ctx), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
+ if err := agent.revertAddedFlows(subCtx, mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
logger.Errorw(ctx, "failure-to-delete-flow-after-failed-addition", log.Fields{
"error": err,
"logical-device-id": agent.logicalDeviceID,
@@ -189,6 +192,13 @@
}
// send event
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["flow-id"] = string(flow.Id)
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-add-flow", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
}
}()
}
@@ -198,7 +208,7 @@
// revertAddedFlows reverts flows after the flowAdd request has failed. All flows corresponding to that flowAdd request
// will be reverted, both from the logical devices and the devices.
func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+ logger.Debugw(ctx, "revert-flow-add", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
flowHandle, have := agent.flowLoader.Lock(addedFlow.Id)
if !have {
@@ -243,7 +253,7 @@
//flowDelete deletes a flow from the flow table of that logical device
func (agent *LogicalAgent) flowDelete(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
- logger.Debug(ctx, "flowDelete")
+ logger.Debug(ctx, "flow-delete")
mod := flowUpdate.FlowMod
if mod == nil {
return nil
@@ -274,7 +284,7 @@
//Delete the matched flows
if len(toDelete) > 0 {
- logger.Debugw(ctx, "flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
+ logger.Debugw(ctx, "flow-delete", log.Fields{"logical-device-id": agent.logicalDeviceID, "to-delete": len(toDelete)})
for _, flow := range toDelete {
if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
@@ -299,7 +309,7 @@
metersConfig, err := agent.GetMeterConfig(ctx, toDelete)
if err != nil { // This should never happen
- logger.Error(ctx, "Meter-referred-in-flows-not-present")
+ logger.Error(ctx, "meter-referred-in-flows-not-present")
return err
}
@@ -336,7 +346,13 @@
go func() {
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
// TODO: Revert the flow deletion
// send event, and allow any queued events to be sent as well
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
@@ -350,7 +366,7 @@
//flowDeleteStrict deletes a flow from the flow table of that logical device
func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
mod := flowUpdate.FlowMod
- logger.Debugw(ctx, "flowDeleteStrict", log.Fields{"mod": mod})
+ logger.Debugw(ctx, "flow-delete-strict", log.Fields{"mod": mod})
if mod == nil {
return nil
}
@@ -359,10 +375,10 @@
if err != nil {
return err
}
- logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
+ logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flow-id": flow.Id})
flowHandle, have := agent.flowLoader.Lock(flow.Id)
if !have {
- logger.Debugw(ctx, "Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
+ logger.Debugw(ctx, "skipping-flow-delete-strict-request-no-flow-found", log.Fields{"flow-mod": mod})
return nil
}
defer flowHandle.Unlock()
@@ -421,6 +437,14 @@
// TODO: Revert flow changes
// send event, and allow any queued events to be sent as well
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["flow-id"] = string(flow.Id)
+ context["device-rules"] = deviceRules.String()
+ // Create context and send extra information as part of it.
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
}
}()
@@ -448,7 +472,7 @@
}
func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
- logger.Infow(ctx, "Delete-flows-matching-meter", log.Fields{"meter": meterID})
+ logger.Infow(ctx, "delete-flows-matching-meter", log.Fields{"meter": meterID})
for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
@@ -466,7 +490,7 @@
}
func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
- logger.Infow(ctx, "Delete-flows-matching-group", log.Fields{"groupID": groupID})
+ logger.Infow(ctx, "delete-flows-matching-group", log.Fields{"group-id": groupID})
flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index 7a9f91c..b7bf1b6 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -19,6 +19,7 @@
import (
"context"
"fmt"
+ "time"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
@@ -44,7 +45,7 @@
//updateGroupTable updates the group table of that logical device
func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debug(ctx, "updateGroupTable")
+ logger.Debug(ctx, "update-group-table")
if groupMod == nil {
return nil
}
@@ -64,7 +65,7 @@
if groupMod == nil {
return nil
}
- logger.Debugw(ctx, "groupAdd", log.Fields{"GroupId": groupMod.GroupId})
+ logger.Debugw(ctx, "group-add", log.Fields{"group-id": groupMod.GroupId})
groupEntry := fu.GroupEntryFromGroupMod(groupMod)
@@ -83,7 +84,7 @@
deviceRules := fu.NewDeviceRules()
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
- logger.Debugw(ctx, "rules", log.Fields{"rules for group-add": deviceRules.String()})
+ logger.Debugw(ctx, "rules", log.Fields{"rules-for-group-add": deviceRules.String()})
// Update the devices
respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
@@ -92,6 +93,13 @@
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["group-id"] = string(groupMod.GroupId)
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
//TODO: Revert flow changes
}
}()
@@ -99,7 +107,7 @@
}
func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debugw(ctx, "groupDelete", log.Fields{"groupMod": groupMod})
+ logger.Debugw(ctx, "group-delete", log.Fields{"group-mod": groupMod})
if groupMod == nil {
return nil
}
@@ -135,7 +143,7 @@
}
if len(affectedGroups) == 0 {
- logger.Debugw(ctx, "no-group-to-delete", log.Fields{"groupId": groupMod.GroupId})
+ logger.Debugw(ctx, "no-group-to-delete", log.Fields{"group-id": groupMod.GroupId})
return nil
}
@@ -167,6 +175,13 @@
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["group-id"] = string(groupMod.GroupId)
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
//TODO: Revert flow changes
}
}()
@@ -174,7 +189,7 @@
}
func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debug(ctx, "groupModify")
+ logger.Debug(ctx, "group-modify")
if groupMod == nil {
return nil
}
@@ -199,7 +214,7 @@
//update KV
if err := groupHandle.Update(ctx, groupEntry); err != nil {
- logger.Errorw(ctx, "Cannot-update-logical-group", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Errorw(ctx, "cannot-update-logical-group", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
@@ -210,6 +225,13 @@
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["group-id"] = string(groupMod.GroupId)
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
//TODO: Revert flow changes
}
}()
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index ab1b82b..05a5108 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -32,7 +32,7 @@
// listLogicalDevicePorts returns logical device ports
func (agent *LogicalAgent) listLogicalDevicePorts(ctx context.Context) map[uint32]*voltha.LogicalPort {
- logger.Debug(ctx, "listLogicalDevicePorts")
+ logger.Debug(ctx, "list-logical-device-ports")
portIDs := agent.portLoader.ListIDs()
ret := make(map[uint32]*voltha.LogicalPort, len(portIDs))
for portID := range portIDs {
@@ -45,7 +45,7 @@
}
func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
- logger.Debugw(ctx, "updateLogicalPort", log.Fields{"device-id": device.Id, "port": port})
+ logger.Debugw(ctx, "update-logical-port", log.Fields{"device-id": device.Id, "port": port})
switch port.Type {
case voltha.Port_ETHERNET_NNI:
if err := agent.addNNILogicalPort(ctx, device.Id, devicePorts, port); err != nil {
@@ -58,7 +58,9 @@
case voltha.Port_PON_OLT:
// Rebuilt the routes on Parent PON port addition
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
+ if err := agent.buildRoutes(subCtx); err != nil {
// Not an error - temporary state
logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
@@ -67,7 +69,8 @@
case voltha.Port_PON_ONU:
// Add the routes corresponding to that child device
go func() {
- if err := agent.updateAllRoutes(log.WithSpanFromContext(context.Background(), ctx), device.Id, devicePorts); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.updateAllRoutes(subCtx, device.Id, devicePorts); err != nil {
// Not an error - temporary state
logger.Infow(ctx, "failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
@@ -82,10 +85,10 @@
// added to it. While the logical device was being created we could have received requests to add
// NNI and UNI ports which were discarded. Now is the time to add them if needed
func (agent *LogicalAgent) setupLogicalPorts(ctx context.Context) error {
- logger.Infow(ctx, "setupLogicalPorts", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Infow(ctx, "setup-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
// First add any NNI ports which could have been missing
if err := agent.setupNNILogicalPorts(ctx, agent.rootDeviceID); err != nil {
- logger.Errorw(ctx, "error-setting-up-NNI-ports", log.Fields{"error": err, "device-id": agent.rootDeviceID})
+ logger.Errorw(ctx, "error-setting-up-nni-ports", log.Fields{"error": err, "device-id": agent.rootDeviceID})
return err
}
@@ -99,21 +102,22 @@
for _, child := range children.Items {
response := coreutils.NewResponse()
responses = append(responses, response)
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
go func(ctx context.Context, child *voltha.Device) {
defer response.Done()
childPorts, err := agent.deviceMgr.listDevicePorts(ctx, child.Id)
if err != nil {
- logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"device-id": child.Id})
+ logger.Error(ctx, "setting-up-uni-ports-failed", log.Fields{"device-id": child.Id})
response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
return
}
if err = agent.setupUNILogicalPorts(ctx, child, childPorts); err != nil {
- logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"device-id": child.Id})
+ logger.Error(ctx, "setting-up-uni-ports-failed", log.Fields{"device-id": child.Id})
response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
- }(log.WithSpanFromContext(context.Background(), ctx), child)
+ }(subCtx, child)
}
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
@@ -124,7 +128,7 @@
// setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
func (agent *LogicalAgent) setupNNILogicalPorts(ctx context.Context, deviceID string) error {
- logger.Infow(ctx, "setupNNILogicalPorts-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Infow(ctx, "setup-nni-logical-ports-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
devicePorts, err := agent.deviceMgr.listDevicePorts(ctx, deviceID)
@@ -137,7 +141,7 @@
for _, port := range devicePorts {
if port.Type == voltha.Port_ETHERNET_NNI {
if err = agent.addNNILogicalPort(ctx, deviceID, devicePorts, port); err != nil {
- logger.Errorw(ctx, "error-adding-NNI-port", log.Fields{"error": err})
+ logger.Errorw(ctx, "error-adding-nni-port", log.Fields{"error": err})
}
}
}
@@ -146,7 +150,7 @@
// updatePortState updates the port state of the device
func (agent *LogicalAgent) updatePortState(ctx context.Context, portNo uint32, operStatus voltha.OperStatus_Types) error {
- logger.Infow(ctx, "updatePortState-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
+ logger.Infow(ctx, "update-port-state-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "port-no": portNo, "state": operStatus})
portHandle, have := agent.portLoader.Lock(portNo)
if !have {
@@ -179,14 +183,14 @@
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
- logger.Infow(ctx, "setupUNILogicalPorts", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Infow(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
//Get UNI port number
for _, port := range childDevicePorts {
if port.Type == voltha.Port_ETHERNET_UNI {
if err = agent.addUNILogicalPort(ctx, childDevice.Id, childDevice.AdminState, childDevice.OperStatus, childDevicePorts, port); err != nil {
- logger.Errorw(ctx, "error-adding-UNI-port", log.Fields{"error": err})
+ logger.Errorw(ctx, "error-adding-uni-port", log.Fields{"error": err})
}
}
}
@@ -195,7 +199,7 @@
// deleteAllLogicalPorts deletes all logical ports associated with this logical device
func (agent *LogicalAgent) deleteAllLogicalPorts(ctx context.Context) error {
- logger.Infow(ctx, "updatePortsState-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Infow(ctx, "update-ports-state-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
// for each port
for portID := range agent.portLoader.ListIDs() {
@@ -215,7 +219,8 @@
// Reset the logical device routes
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.buildRoutes(subCtx); err != nil {
logger.Warnw(ctx, "device-routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -245,7 +250,8 @@
// Reset the logical device routes
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.buildRoutes(subCtx); err != nil {
logger.Warnw(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -301,7 +307,7 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, deviceID string, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
- logger.Debugw(ctx, "addNNILogicalPort", log.Fields{"logical-device-id": agent.logicalDeviceID, "nni-port": port})
+ logger.Debugw(ctx, "add-nni-logical-port", log.Fields{"logical-device-id": agent.logicalDeviceID, "nni-port": port})
label := fmt.Sprintf("nni-%d", port.PortNo)
ofpPort := *port.OfpPort
@@ -334,7 +340,8 @@
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(log.WithSpanFromContext(context.Background(), ctx), deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.updateRoutes(subCtx, deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": nniPort.OfpPort.PortNo, "error": err})
@@ -351,7 +358,7 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, deviceID string, deviceAdminState voltha.AdminState_Types, deviceOperStatus voltha.OperStatus_Types, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
- logger.Debugw(ctx, "addUNILogicalPort", log.Fields{"port": port})
+ logger.Debugw(ctx, "add-uni-logical-port", log.Fields{"port": port})
if deviceAdminState != voltha.AdminState_ENABLED || deviceOperStatus != voltha.OperStatus_ACTIVE {
logger.Infow(ctx, "device-not-ready", log.Fields{"device-id": deviceID, "admin": deviceAdminState, "oper": deviceOperStatus})
return nil
@@ -384,16 +391,16 @@
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
- ctx = log.WithSpanFromContext(context.Background(), ctx)
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
// First setup the routes
- if err := agent.updateRoutes(ctx, deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+ if err := agent.updateRoutes(subCtx, deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": uniPort.OfpPort.PortNo, "error": err})
}
// send event, and allow any queued events to be sent as well
- queuePosition.send(ctx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
+ queuePosition.send(subCtx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
}()
return nil
}
@@ -414,7 +421,8 @@
// send is a convenience to avoid calling both assignQueuePosition and qp.send
func (e *orderedEvents) send(ctx context.Context, agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
qp := e.assignQueuePosition()
- go qp.send(log.WithSpanFromContext(context.Background(), ctx), agent, deviceID, reason, desc)
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ go qp.send(subCtx, agent, deviceID, reason, desc)
}
// sendCompletion will make sure that given channel is notified when queue is empty
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index dcd6691..71ecf26 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -30,6 +30,7 @@
tst "github.com/opencord/voltha-go/rw_core/test"
com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -46,6 +47,7 @@
kmp kafka.InterContainerProxy
logicalDeviceMgr *LogicalManager
kClient kafka.Client
+ kEventClient kafka.Client
kvClientPort int
oltAdapterName string
onuAdapterName string
@@ -68,6 +70,7 @@
}
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
+ test.kEventClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
@@ -135,6 +138,7 @@
func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
+ cfg.EventTopic = "voltha.events"
cfg.DefaultRequestTimeout = lda.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
grpcPort, err := freeport.GetFreePort()
@@ -157,8 +161,8 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
-
- lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+ eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+ lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
if err = lda.kmp.Start(ctx); err != nil {
logger.Fatal(ctx, "Cannot start InterContainerProxy")
}
@@ -175,6 +179,9 @@
if lda.etcdServer != nil {
tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
}
+ if lda.kEventClient != nil {
+ lda.kEventClient.Stop(ctx)
+ }
}
func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 2fca7c5..9ea12fa 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -67,7 +67,7 @@
// This can happen when an agent for the logical device has been created but the logical device
// itself is not ready for action as it is waiting for switch and port capabilities from the
// relevant adapter. In such a case prevent any request aimed at that logical device.
- logger.Debugf(ctx, "Logical device %s is not ready to serve requests", logicalDeviceID)
+ logger.Debugf(ctx, "logical-device-%s-is-not-ready-to-serve-requests", logicalDeviceID)
return nil
}
return lda
@@ -88,7 +88,8 @@
// GetLogicalDevice provides a cloned most up to date logical device. If device is not in memory
// it will be fetched from the dB
func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
- logger.Debugw(ctx, "getlogicalDevice", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevice")
+ logger.Debugw(ctx, "get-logical-device", log.Fields{"logical-device-id": id})
if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
return agent.GetLogicalDeviceReadOnly(ctx)
}
@@ -97,7 +98,8 @@
//ListLogicalDevices returns the list of all logical devices
func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context, _ *empty.Empty) (*voltha.LogicalDevices, error) {
- logger.Debug(ctx, "ListAllLogicalDevices")
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevices")
+ logger.Debug(ctx, "list-all-logical-devices")
var logicalDevices []*voltha.LogicalDevice
if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
@@ -139,7 +141,8 @@
go func() {
//TODO: either wait for the agent to be started before returning, or
// implement locks in the agent to ensure request are not processed before start() is complete
- err := agent.start(log.WithSpanFromContext(context.Background(), ctx), false)
+ ldCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := agent.start(ldCtx, false)
if err != nil {
logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
ldMgr.deleteLogicalDeviceAgent(id)
@@ -205,7 +208,7 @@
}
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
} else {
- logger.Debugw(ctx, "logicalDevice not in model", log.Fields{"lDeviceId": lDeviceID})
+ logger.Debugw(ctx, "logical-device-not-in-model", log.Fields{"logical-device-id": lDeviceID})
}
// announce completion of task to any number of waiting channels
ldMgr.logicalDevicesLoadingLock.Lock()
@@ -281,7 +284,8 @@
// ListLogicalDeviceFlows returns the flows of logical device
func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
- logger.Debugw(ctx, "ListLogicalDeviceFlows", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlows")
+ logger.Debugw(ctx, "list-logical-device-flows", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -298,7 +302,8 @@
// ListLogicalDeviceFlowGroups returns logical device flow groups
func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
- logger.Debugw(ctx, "ListLogicalDeviceFlowGroups", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlowGroups")
+ logger.Debugw(ctx, "list-logical-device-flow-groups", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -315,7 +320,8 @@
// ListLogicalDevicePorts returns logical device ports
func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
- logger.Debugw(ctx, "ListLogicalDevicePorts", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevicePorts")
+ logger.Debugw(ctx, "list-logical-device-ports", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -332,6 +338,7 @@
// GetLogicalDevicePort returns logical device port details
func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevicePort")
// Get the logical device where this port is attached
agent := ldMgr.getLogicalDeviceAgent(ctx, lPortID.Id)
if agent == nil {
@@ -382,7 +389,7 @@
}
func (ldMgr *LogicalManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
- logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId, "current-data": childDevice})
+ logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"child-device-id": childDevice.Id, "parent-device-id": childDevice.ParentId, "current-data": childDevice})
// Sanity check
if childDevice.Root {
return errors.New("Device-root")
@@ -392,7 +399,7 @@
parentID := childDevice.ParentId
logDeviceID := ldMgr.deviceMgr.GetParentDeviceID(ctx, parentID)
- logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceID, "parentId": parentID})
+ logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": logDeviceID, "parentId": parentID})
if parentID == "" || logDeviceID == "" {
return errors.New("device-in-invalid-state")
@@ -407,7 +414,7 @@
}
func (ldMgr *LogicalManager) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "deleteAllLogicalPorts", log.Fields{"device-id": device.Id})
+ logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": device.Id})
var ldID *string
var err error
@@ -425,7 +432,7 @@
}
func (ldMgr *LogicalManager) updatePortState(ctx context.Context, deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "updatePortState", log.Fields{"device-id": deviceID, "state": state, "portNo": portNo})
+ logger.Debugw(ctx, "update-Port-state", log.Fields{"device-id": deviceID, "state": state, "port-no": portNo})
var ldID *string
var err error
@@ -444,7 +451,8 @@
// UpdateLogicalDeviceFlowTable updates logical device flow table
func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateLogicalDeviceFlowTable", log.Fields{"logical-device-id": flow.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowTable")
+ logger.Debugw(ctx, "update-logical-device-flow-table", log.Fields{"logical-device-id": flow.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -454,7 +462,8 @@
// UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response
func (ldMgr *LogicalManager) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateLogicalDeviceMeterTable", log.Fields{"logical-device-id": meter.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceMeterTable")
+ logger.Debugw(ctx, "update-logical-device-meter-table", log.Fields{"logical-device-id": meter.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, meter.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", meter.Id)
@@ -464,7 +473,8 @@
// ListLogicalDeviceMeters returns logical device meters
func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
- logger.Debugw(ctx, "ListLogicalDeviceMeters", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceMeters")
+ logger.Debugw(ctx, "list-logical-device-meters", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -480,7 +490,8 @@
// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateGroupTable", log.Fields{"logical-device-id": flow.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowGroupTable")
+ logger.Debugw(ctx, "update-group-table", log.Fields{"logical-device-id": flow.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -490,7 +501,8 @@
// EnableLogicalDevicePort enables logical device port
func (ldMgr *LogicalManager) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
- logger.Debugw(ctx, "EnableLogicalDevicePort", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "EnableLogicalDevicePort")
+ logger.Debugw(ctx, "enable-logical-device-port", log.Fields{"logical-device-id": id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -504,7 +516,8 @@
// DisableLogicalDevicePort disables logical device port
func (ldMgr *LogicalManager) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
- logger.Debugw(ctx, "DisableLogicalDevicePort", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "DisableLogicalDevicePort")
+ logger.Debugw(ctx, "disable-logical-device-port", log.Fields{"logical-device-id": id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -517,7 +530,7 @@
}
func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
- logger.Debugw(ctx, "packetIn", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
+ logger.Debugw(ctx, "packet-in", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
agent.packetIn(ctx, port, transactionID, packet)
} else {
@@ -529,35 +542,45 @@
// StreamPacketsOut sends packets to adapter
func (ldMgr *LogicalManager) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
ctx := context.Background()
- logger.Debugw(ctx, "StreamPacketsOut-request", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "stream-packets-out-request", log.Fields{"packets": packets})
+
loop:
for {
select {
case <-packets.Context().Done():
- logger.Infow(ctx, "StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
+ logger.Infow(ctx, "stream-packets-out-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
break loop
default:
}
packet, err := packets.Recv()
+ pktCtx := utils.WithRPCMetadataContext(packets.Context(), "StreamPacketsOut")
+
if err == io.EOF {
- logger.Debugw(ctx, "Received-EOF", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "received-eof", log.Fields{"packets": packets})
break loop
}
-
if err != nil {
- logger.Errorw(ctx, "Failed to receive packet out", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-receive-packet-out", log.Fields{"error": err})
+ go ldMgr.SendRPCEvent(pktCtx, packet.Id, err.Error(), nil,
+ "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
continue
}
- if agent := ldMgr.getLogicalDeviceAgent(packets.Context(), packet.Id); agent != nil {
- agent.packetOut(packets.Context(), packet.PacketOut)
+ if agent := ldMgr.getLogicalDeviceAgent(pktCtx, packet.Id); agent != nil {
+ agent.packetOut(pktCtx, packet.PacketOut)
} else {
- logger.Errorf(ctx, "No logical device agent present", log.Fields{"logical-device-id": packet.Id})
+ logger.Errorf(ctx, "no-logical-device-agent-present", log.Fields{"logical-device-id": packet.Id})
}
}
- logger.Debugw(ctx, "StreamPacketsOut-request-done", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "stream-packets-out-request-done", log.Fields{"packets": packets})
return nil
}
+
+func (ldMgr *LogicalManager) SendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+ id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ ldMgr.Manager.RPCEventManager.GetAndSendRPCEvent(ctx, resourceID, desc, context, id,
+ category, subCategory, raisedTs)
+}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index f92f2e5..cbc6bb9 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -29,6 +29,7 @@
"github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-go/rw_core/core/device/state"
"github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/common"
@@ -42,10 +43,11 @@
// Manager represent device manager attributes
type Manager struct {
- deviceAgents sync.Map
- rootDevices map[string]bool
- lockRootDeviceMap sync.RWMutex
- adapterProxy *remote.AdapterProxy
+ deviceAgents sync.Map
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
+ adapterProxy *remote.AdapterProxy
+ *event.RPCEventManager
adapterMgr *adapter.Manager
logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
@@ -59,7 +61,7 @@
}
//NewManagers creates the Manager and the Logical Manager.
-func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
@@ -69,12 +71,13 @@
dProxy: dbPath.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout,
+ RPCEventManager: event.NewRPCEventManager(eventProxy, coreInstanceID),
deviceLoadingInProgress: make(map[string][]chan int),
}
deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
logicalDeviceMgr := &LogicalManager{
- Manager: event.NewManager(),
+ Manager: event.NewManager(eventProxy, coreInstanceID),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
dbPath: dbPath,
@@ -141,21 +144,22 @@
// CreateDevice creates a new parent device in the data model
func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
if device.MacAddress == "" && device.GetHostAndPort() == "" {
- logger.Errorf(ctx, "No Device Info Present")
+ logger.Errorf(ctx, "no-device-info-present")
return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
}
+ ctx = utils.WithRPCMetadataContext(ctx, "CreateDevice")
logger.Debugw(ctx, "create-device", log.Fields{"device": *device})
deviceExist, err := dMgr.isParentDeviceExist(ctx, device)
if err != nil {
- logger.Errorf(ctx, "Failed to fetch parent device info")
+ logger.Errorf(ctx, "failed-to-fetch-parent-device-info")
return nil, err
}
if deviceExist {
- logger.Errorf(ctx, "Device is Pre-provisioned already with same IP-Port or MAC Address")
+ logger.Errorf(ctx, "device-is-pre-provisioned-already-with-same-ip-port-or-mac-address")
return nil, errors.New("device is already pre-provisioned")
}
- logger.Debugw(ctx, "CreateDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
+ logger.Debugw(ctx, "create-device", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
// Ensure this device is set as root
device.Root = true
@@ -163,7 +167,7 @@
agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
- logger.Errorw(ctx, "Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
+ logger.Errorw(ctx, "fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
return nil, err
}
dMgr.addDeviceAgentToMap(agent)
@@ -172,9 +176,9 @@
// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "EnableDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "EnableDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "enable-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -184,9 +188,9 @@
// DisableDevice disables a device along with any child device it may have
func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "DisableDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "DisableDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "disable-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -196,9 +200,9 @@
//RebootDevice invoked the reboot API to the corresponding adapter
func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "RebootDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "RebootDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -208,8 +212,9 @@
// DeleteDevice removes a device from the data model
func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "DeleteDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
- logger.Debugw(ctx, "DeleteDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "delete-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -219,8 +224,9 @@
// ForceDeleteDevice removes a device from the data model forcefully without successfully waiting for the adapters.
func (dMgr *Manager) ForceDeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ForceDeleteDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
- logger.Debugw(ctx, "ForceDeleteDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "force-delete-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -230,7 +236,7 @@
// GetDevicePort returns the port details for a specific device port entry
func (dMgr *Manager) GetDevicePort(ctx context.Context, deviceID string, portID uint32) (*voltha.Port, error) {
- logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "get-device-port", log.Fields{"device-id": deviceID})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", deviceID)
@@ -240,9 +246,9 @@
// ListDevicePorts returns the ports details for a specific device entry
func (dMgr *Manager) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDevicePorts")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -259,9 +265,9 @@
// ListDeviceFlows returns the flow details for a specific device entry
func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceFlows")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "ListDeviceFlows", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "list-device-flows", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -278,9 +284,9 @@
// ListDeviceFlowGroups returns the flow group details for a specific device entry
func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceFlowGroups")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "list-device-flow-groups", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -298,7 +304,7 @@
// This function is called only in the Core that does not own this device. In the Core that owns this device then a
// deletion deletion also includes removal of any reference of this device.
func (dMgr *Manager) stopManagingDevice(ctx context.Context, id string) {
- logger.Infow(ctx, "stopManagingDevice", log.Fields{"device-id": id})
+ logger.Infow(ctx, "stop-managing-device", log.Fields{"device-id": id})
if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
if device, err := dMgr.getDeviceReadOnly(ctx, id); err == nil && device.Root {
// stop managing the logical device
@@ -315,7 +321,7 @@
// RunPostDeviceDelete removes any reference of this device
func (dMgr *Manager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
- logger.Infow(ctx, "RunPostDeviceDelete", log.Fields{"device-id": cDevice.Id})
+ logger.Infow(ctx, "run-post-device-delete", log.Fields{"device-id": cDevice.Id})
dMgr.stopManagingDevice(ctx, cDevice.Id)
return nil
}
@@ -323,13 +329,14 @@
// GetDevice exists primarily to implement the gRPC interface.
// Internal functions should call getDeviceReadOnly instead.
func (dMgr *Manager) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
return dMgr.getDeviceReadOnly(ctx, id.Id)
}
// getDeviceReadOnly will returns a device, either from memory or from the dB, if present
func (dMgr *Manager) getDeviceReadOnly(ctx context.Context, id string) (*voltha.Device, error) {
- logger.Debugw(ctx, "getDeviceReadOnly", log.Fields{"device-id": id})
+ logger.Debugw(ctx, "get-device-read-only", log.Fields{"device-id": id})
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
return agent.getDeviceReadOnly(ctx)
}
@@ -337,7 +344,7 @@
}
func (dMgr *Manager) listDevicePorts(ctx context.Context, id string) (map[uint32]*voltha.Port, error) {
- logger.Debugw(ctx, "listDevicePorts", log.Fields{"device-id": id})
+ logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": id})
agent := dMgr.getDeviceAgent(ctx, id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id)
@@ -347,8 +354,8 @@
// GetChildDevice will return a device, either from memory or from the dB, if present
func (dMgr *Manager) GetChildDevice(ctx context.Context, parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
- logger.Debugw(ctx, "GetChildDevice", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber,
- "parentPortNo": parentPortNo, "onuId": onuID})
+ logger.Debugw(ctx, "get-child-device", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber,
+ "parent-port-no": parentPortNo, "onu-id": onuID})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
if err != nil {
@@ -356,7 +363,7 @@
}
childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
if len(childDeviceIds) == 0 {
- logger.Debugw(ctx, "no-child-devices", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber, "onuId": onuID})
+ logger.Debugw(ctx, "no-child-devices", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber, "onu-id": onuID})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
@@ -368,14 +375,14 @@
foundOnuID := false
if searchDevice.ProxyAddress.OnuId == uint32(onuID) {
if searchDevice.ParentPortNo == uint32(parentPortNo) {
- logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parent-device-id": parentDeviceID, "onuId": onuID})
+ logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parent-device-id": parentDeviceID, "onu-id": onuID})
foundOnuID = true
}
}
foundSerialNumber := false
if searchDevice.SerialNumber == serialNumber {
- logger.Debugw(ctx, "found-child-by-serialnumber", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber})
+ logger.Debugw(ctx, "found-child-by-serial-number", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber})
foundSerialNumber = true
}
@@ -395,18 +402,18 @@
}
if foundChildDevice != nil {
- logger.Debugw(ctx, "child-device-found", log.Fields{"parent-device-id": parentDeviceID, "foundChildDevice": foundChildDevice})
+ logger.Debugw(ctx, "child-device-found", log.Fields{"parent-device-id": parentDeviceID, "found-child-device": foundChildDevice})
return foundChildDevice, nil
}
logger.Debugw(ctx, "child-device-not-found", log.Fields{"parent-device-id": parentDeviceID,
- "serialNumber": serialNumber, "onuId": onuID, "parentPortNo": parentPortNo})
+ "serial-number": serialNumber, "onu-id": onuID, "parent-port-no": parentPortNo})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
// GetChildDeviceWithProxyAddress will return a device based on proxy address
func (dMgr *Manager) GetChildDeviceWithProxyAddress(ctx context.Context, proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
- logger.Debugw(ctx, "GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress})
+ logger.Debugw(ctx, "get-child-device-with-proxy-address", log.Fields{"proxy-address": proxyAddress})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, proxyAddress.DeviceId)
if err != nil {
@@ -429,11 +436,11 @@
}
if foundChildDevice != nil {
- logger.Debugw(ctx, "child-device-found", log.Fields{"proxyAddress": proxyAddress})
+ logger.Debugw(ctx, "child-device-found", log.Fields{"proxy-address": proxyAddress})
return foundChildDevice, nil
}
- logger.Warnw(ctx, "child-device-not-found", log.Fields{"proxyAddress": proxyAddress})
+ logger.Warnw(ctx, "child-device-not-found", log.Fields{"proxy-address": proxyAddress})
return nil, status.Errorf(codes.NotFound, "%s", proxyAddress)
}
@@ -445,7 +452,8 @@
// ListDevices retrieves the latest devices from the data model
func (dMgr *Manager) ListDevices(ctx context.Context, _ *empty.Empty) (*voltha.Devices, error) {
- logger.Debug(ctx, "ListDevices")
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDevices")
+ logger.Debug(ctx, "list-devices")
result := &voltha.Devices{}
var devices []*voltha.Device
@@ -467,7 +475,7 @@
}
result.Items = append(result.Items, device)
}
- logger.Debugw(ctx, "ListDevices-end", log.Fields{"len": len(result.Items)})
+ logger.Debugw(ctx, "list-devices-end", log.Fields{"len": len(result.Items)})
return result, nil
}
@@ -476,7 +484,7 @@
hostPort := newDevice.GetHostAndPort()
var devices []*voltha.Device
if err := dMgr.dProxy.List(ctx, &devices); err != nil {
- logger.Errorw(ctx, "Failed to list devices from cluster data proxy", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-list-devices-from-cluster-data-proxy", log.Fields{"error": err})
return false, err
}
for _, device := range devices {
@@ -524,12 +532,12 @@
logger.Debugw(ctx, "loading-device", log.Fields{"device-id": deviceID})
agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
- logger.Warnw(ctx, "Failure loading device", log.Fields{"device-id": deviceID, "error": err})
+ logger.Warnw(ctx, "failure-loading-device", log.Fields{"device-id": deviceID, "error": err})
} else {
dMgr.addDeviceAgentToMap(agent)
}
} else {
- logger.Debugw(ctx, "Device not in model", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "device-is-not-in-model", log.Fields{"device-id": deviceID})
}
// announce completion of task to any number of waiting channels
dMgr.devicesLoadingLock.Lock()
@@ -577,7 +585,7 @@
return err
}
}
- logger.Debugw(ctx, "loaded-children", log.Fields{"device-id": device.Id, "numChildren": len(childDeviceIds)})
+ logger.Debugw(ctx, "loaded-children", log.Fields{"device-id": device.Id, "num-children": len(childDeviceIds)})
}
return nil
}
@@ -626,7 +634,8 @@
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
func (dMgr *Manager) ListDeviceIds(ctx context.Context, _ *empty.Empty) (*voltha.IDs, error) {
- logger.Debug(ctx, "ListDeviceIDs")
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceIds")
+ logger.Debug(ctx, "list-device-ids")
// Report only device IDs that are in the device agent map
return dMgr.listDeviceIdsFromMap(), nil
}
@@ -634,7 +643,8 @@
// ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
// trigger loading the devices along with their children and parent in memory
func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
- logger.Debugw(ctx, "ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
+ ctx = utils.WithRPCMetadataContext(ctx, "ReconcileDevices")
+ logger.Debugw(ctx, "reconcile-devices", log.Fields{"num-devices": len(ids.Items)})
if ids != nil && len(ids.Items) != 0 {
toReconcile := len(ids.Items)
reconciled := 0
@@ -668,12 +678,12 @@
// adapterRestarted is invoked whenever an adapter is restarted
func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
- logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
- "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
+ logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
+ "current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
// Let's reconcile the device managed by this Core only
if len(dMgr.rootDevices) == 0 {
- logger.Debugw(ctx, "nothing-to-reconcile", log.Fields{"adapterId": adapter.Id})
+ logger.Debugw(ctx, "nothing-to-reconcile", log.Fields{"adapter-id": adapter.Id})
return nil
}
@@ -683,7 +693,7 @@
if dAgent == nil {
continue
}
- logger.Debugw(ctx, "checking-adapter-type", log.Fields{"agentType": dAgent.deviceType, "adapterType": adapter.Type})
+ logger.Debugw(ctx, "checking-adapter-type", log.Fields{"agentType": dAgent.deviceType, "adapter-type": adapter.Type})
if dAgent.deviceType == adapter.Type {
rootDevice, _ := dAgent.getDeviceReadOnly(ctx)
if rootDevice == nil {
@@ -691,7 +701,7 @@
}
isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, rootDeviceID, adapter.Type, adapter.CurrentReplica)
if err != nil {
- logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
+ logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
continue
}
if isDeviceOwnedByService {
@@ -709,7 +719,7 @@
if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, childDevice.Id, adapter.Type, adapter.CurrentReplica)
if err != nil {
- logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
+ logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
}
if isDeviceOwnedByService {
if dMgr.isOkToReconcile(ctx, childDevice) {
@@ -735,7 +745,7 @@
return status.Errorf(codes.Aborted, "errors-%s", res)
}
} else {
- logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapterId": adapter.Id})
+ logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapter-id": adapter.Id})
}
return nil
}
@@ -782,7 +792,7 @@
}
func (dMgr *Manager) UpdateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "UpdateDeviceUsingAdapterData", log.Fields{"device-id": device.Id, "device": device})
+ logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id, "device": device})
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return agent.updateDeviceUsingAdapterData(ctx, device)
}
@@ -809,7 +819,9 @@
if err != nil {
return err
}
- if err = dMgr.logicalDeviceMgr.updateLogicalPort(log.WithSpanFromContext(context.Background(), ctx), device, ports, port); err != nil {
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+
+ if err = dMgr.logicalDeviceMgr.updateLogicalPort(subCtx, device, ports, port); err != nil {
return err
}
return nil
@@ -823,7 +835,8 @@
}
// Setup peer ports in its own routine
go func() {
- if err := dMgr.addPeerPort(log.WithSpanFromContext(context.Background(), ctx), deviceID, port); err != nil {
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := dMgr.addPeerPort(subCtx, deviceID, port); err != nil {
logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
}
}()
@@ -833,7 +846,7 @@
}
func (dMgr *Manager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "addFlowsAndGroups", log.Fields{"device-id": deviceID, "groups:": groups, "flowMetadata": flowMetadata})
+ logger.Debugw(ctx, "add-flows-and-groups", log.Fields{"device-id": deviceID, "groups:": groups, "flow-metadata": flowMetadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.addFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
@@ -842,7 +855,7 @@
// deleteParentFlows removes flows from the parent device based on specific attributes
func (dMgr *Manager) deleteParentFlows(ctx context.Context, deviceID string, uniPort uint32, metadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "deleteParentFlows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
+ logger.Debugw(ctx, "delete-parent-flows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if !agent.isRootDevice {
return status.Errorf(codes.FailedPrecondition, "not-a-parent-device-%s", deviceID)
@@ -853,7 +866,7 @@
}
func (dMgr *Manager) deleteFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "deleteFlowsAndGroups", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "delete-flows-and-groups", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.deleteFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
@@ -861,7 +874,7 @@
}
func (dMgr *Manager) updateFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "updateFlowsAndGroups", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "update-flows-and-groups", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
@@ -871,8 +884,8 @@
// UpdateDevicePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
// following a user action
func (dMgr *Manager) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateDevicePmConfigs")
log.EnrichSpan(ctx, log.Fields{"device-id": configs.Id})
-
if configs.Id == "" {
return nil, status.Error(codes.FailedPrecondition, "invalid-device-Id")
}
@@ -896,8 +909,8 @@
// ListDevicePmConfigs returns pm configs of device
func (dMgr *Manager) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDevicePmConfigs")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -906,7 +919,7 @@
}
func (dMgr *Manager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
- logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getSwitchCapability(ctx)
}
@@ -914,7 +927,7 @@
}
func (dMgr *Manager) GetPorts(ctx context.Context, deviceID string, portType voltha.Port_PortType) (*voltha.Ports, error) {
- logger.Debugw(ctx, "GetPorts", log.Fields{"device-id": deviceID, "portType": portType})
+ logger.Debugw(ctx, "get-ports", log.Fields{"device-id": deviceID, "port-type": portType})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
@@ -923,7 +936,7 @@
}
func (dMgr *Manager) UpdateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
- logger.Debugw(ctx, "UpdateDeviceStatus", log.Fields{"device-id": deviceID, "operStatus": operStatus, "connStatus": connStatus})
+ logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": deviceID, "oper-status": operStatus, "conn-status": connStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateDeviceStatus(ctx, operStatus, connStatus)
}
@@ -931,7 +944,7 @@
}
func (dMgr *Manager) UpdateChildrenStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
- logger.Debugw(ctx, "UpdateChildrenStatus", log.Fields{"parent-device-id": deviceID, "operStatus": operStatus, "connStatus": connStatus})
+ logger.Debugw(ctx, "update-children-status", log.Fields{"parent-device-id": deviceID, "oper-status": operStatus, "conn-status": connStatus})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, deviceID)
if err != nil {
return status.Errorf(codes.Aborted, "%s", err.Error())
@@ -947,17 +960,18 @@
}
func (dMgr *Manager) UpdatePortState(ctx context.Context, deviceID string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "UpdatePortState", log.Fields{"device-id": deviceID, "portType": portType, "portNo": portNo, "operStatus": operStatus})
+ logger.Debugw(ctx, "update-port-state", log.Fields{"device-id": deviceID, "port-type": portType, "port-no": portNo, "oper-status": operStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updatePortState(ctx, portType, portNo, operStatus); err != nil {
- logger.Errorw(ctx, "updating-port-state-failed", log.Fields{"device-id": deviceID, "portNo": portNo, "error": err})
+ logger.Errorw(ctx, "updating-port-state-failed", log.Fields{"device-id": deviceID, "port-no": portNo, "error": err})
return err
}
// Notify the logical device manager to change the port state
// Do this for NNI and UNIs only. PON ports are not known by logical device
if portType == voltha.Port_ETHERNET_NNI || portType == voltha.Port_ETHERNET_UNI {
go func() {
- err := dMgr.logicalDeviceMgr.updatePortState(log.WithSpanFromContext(context.Background(), ctx), deviceID, portNo, operStatus)
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := dMgr.logicalDeviceMgr.updatePortState(subCtx, deviceID, portNo, operStatus)
if err != nil {
// While we want to handle (catch) and log when
// an update to a port was not able to be
@@ -975,7 +989,7 @@
}
func (dMgr *Manager) DeleteAllPorts(ctx context.Context, deviceID string) error {
- logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.deleteAllPorts(ctx); err != nil {
return err
@@ -985,7 +999,8 @@
// typically is part of a device deletion phase.
if device, err := dMgr.getDeviceReadOnly(ctx, deviceID); err == nil {
go func() {
- if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(log.WithSpanFromContext(context.Background(), ctx), device); err != nil {
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(subCtx, device); err != nil {
logger.Errorw(ctx, "unable-to-delete-logical-ports", log.Fields{"error": err})
}
}()
@@ -1000,7 +1015,7 @@
//UpdatePortsState updates all ports on the device
func (dMgr *Manager) UpdatePortsState(ctx context.Context, deviceID string, portTypeFilter uint32, state voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "UpdatePortsState", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "update-ports-state", log.Fields{"device-id": deviceID})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return status.Errorf(codes.NotFound, "%s", deviceID)
@@ -1009,7 +1024,7 @@
return status.Error(codes.Unimplemented, "state-change-not-implemented")
}
if err := agent.updatePortsOperState(ctx, portTypeFilter, state); err != nil {
- logger.Warnw(ctx, "updatePortsOperState-failed", log.Fields{"device-id": deviceID, "error": err})
+ logger.Warnw(ctx, "update-ports-state-failed", log.Fields{"device-id": deviceID, "error": err})
return err
}
return nil
@@ -1017,7 +1032,7 @@
func (dMgr *Manager) ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int64, deviceType string,
channelID int64, vendorID string, serialNumber string, onuID int64) (*voltha.Device, error) {
- logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"parent-device-id": parentDeviceID, "parentPortNo": parentPortNo, "deviceType": deviceType, "channelId": channelID, "vendorId": vendorID, "serialNumber": serialNumber, "onuId": onuID})
+ logger.Debugw(ctx, "child-device-detected", log.Fields{"parent-device-id": parentDeviceID, "parent-port-no": parentPortNo, "device-type": deviceType, "channel-id": channelID, "vendor-id": vendorID, "serial-number": serialNumber, "onu-id": onuID})
if deviceType == "" && vendorID != "" {
logger.Debug(ctx, "device-type-is-nil-fetching-device-type")
@@ -1037,7 +1052,7 @@
}
//if no match found for the vendorid,report adapter with the custom error message
if deviceType == "" {
- logger.Errorw(ctx, "failed-to-fetch-adapter-name ", log.Fields{"vendorId": vendorID})
+ logger.Errorw(ctx, "failed-to-fetch-adapter-name ", log.Fields{"vendor-id": vendorID})
return nil, status.Errorf(codes.NotFound, "%s", vendorID)
}
@@ -1060,7 +1075,7 @@
}
if device, err := dMgr.GetChildDevice(ctx, parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
- logger.Warnw(ctx, "child-device-exists", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber})
+ logger.Warnw(ctx, "child-device-exists", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber})
return device, status.Errorf(codes.AlreadyExists, "%s", serialNumber)
}
@@ -1078,7 +1093,8 @@
// Activate the child device
if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
go func() {
- err := agent.enableDevice(log.WithSpanFromContext(context.Background(), ctx))
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := agent.enableDevice(subCtx)
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
@@ -1089,7 +1105,7 @@
}
func (dMgr *Manager) packetOut(ctx context.Context, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) error {
- logger.Debugw(ctx, "packetOut", log.Fields{"device-id": deviceID, "outPort": outPort})
+ logger.Debugw(ctx, "packet-out", log.Fields{"device-id": deviceID, "out-port": outPort})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.packetOut(ctx, outPort, packet)
}
@@ -1098,7 +1114,7 @@
// PacketIn receives packet from adapter
func (dMgr *Manager) PacketIn(ctx context.Context, deviceID string, port uint32, transactionID string, packet []byte) error {
- logger.Debugw(ctx, "PacketIn", log.Fields{"device-id": deviceID, "port": port})
+ logger.Debugw(ctx, "packet-in", log.Fields{"device-id": deviceID, "port": port})
// Get the logical device Id based on the deviceId
var device *voltha.Device
var err error
@@ -1118,24 +1134,25 @@
}
func (dMgr *Manager) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
- logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parentId": parentID})
+ logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return agent.setParentID(ctx, device, parentID)
}
return status.Errorf(codes.NotFound, "%s", device.Id)
}
-// CreateLogicalDevice creates logical device in core
+//
+//CreateLogicalDevice creates logical device in core
func (dMgr *Manager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
- logger.Info(ctx, "CreateLogicalDevice")
+ logger.Info(ctx, "create-logical-device")
// Verify whether the logical device has already been created
if cDevice.ParentId != "" {
- logger.Debugw(ctx, "Parent device already exist.", log.Fields{"device-id": cDevice.Id, "logical-device-id": cDevice.Id})
+ logger.Debugw(ctx, "parent-device-already-exist", log.Fields{"device-id": cDevice.Id, "logical-device-id": cDevice.Id})
return nil
}
var err error
if _, err = dMgr.logicalDeviceMgr.createLogicalDevice(ctx, cDevice); err != nil {
- logger.Warnw(ctx, "createlogical-device-error", log.Fields{"device": cDevice})
+ logger.Warnw(ctx, "create-logical-device-error", log.Fields{"device": cDevice})
return err
}
return nil
@@ -1143,10 +1160,10 @@
// DeleteLogicalDevice deletes logical device from core
func (dMgr *Manager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
- logger.Info(ctx, "DeleteLogicalDevice")
+ logger.Info(ctx, "delete-logical-device")
var err error
if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
- logger.Warnw(ctx, "deleteLogical-device-error", log.Fields{"device-id": cDevice.Id})
+ logger.Warnw(ctx, "delete-logical-device-error", log.Fields{"device-id": cDevice.Id})
return err
}
// Remove the logical device Id from the parent device
@@ -1160,7 +1177,7 @@
logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, cDevice.Id); err != nil {
// Just log the error. The logical device or port may already have been deleted before this callback is invoked.
- logger.Warnw(ctx, "deleteLogical-ports-error", log.Fields{"device-id": cDevice.Id, "error": err})
+ logger.Warnw(ctx, "delete-logical-ports-error", log.Fields{"device-id": cDevice.Id, "error": err})
}
return nil
}
@@ -1178,7 +1195,7 @@
//ChildDevicesLost is invoked by an adapter to indicate that a parent device is in a state (Disabled) where it
//cannot manage the child devices. This will trigger the Core to disable all the child devices.
func (dMgr *Manager) ChildDevicesLost(ctx context.Context, parentDeviceID string) error {
- logger.Debug(ctx, "ChildDevicesLost")
+ logger.Debug(ctx, "child-devices-lost")
parentDevice, err := dMgr.getDeviceReadOnly(ctx, parentDeviceID)
if err != nil {
logger.Warnw(ctx, "failed-getting-device", log.Fields{"parent-device-id": parentDeviceID, "error": err})
@@ -1190,7 +1207,7 @@
//ChildDevicesDetected is invoked by an adapter when child devices are found, typically after after a
// disable/enable sequence. This will trigger the Core to Enable all the child devices of that parent.
func (dMgr *Manager) ChildDevicesDetected(ctx context.Context, parentDeviceID string) error {
- logger.Debug(ctx, "ChildDevicesDetected")
+ logger.Debug(ctx, "child-devices-detected")
parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
if err != nil {
logger.Warnw(ctx, "failed-getting-device", log.Fields{"device-id": parentDeviceID, "error": err})
@@ -1203,16 +1220,17 @@
allChildEnableRequestSent := true
for childDeviceID := range childDeviceIds {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
// Run the children re-registration in its own routine
go func(ctx context.Context) {
err = agent.enableDevice(ctx)
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
- }(log.WithSpanFromContext(context.Background(), ctx))
+ }(subCtx)
} else {
err = status.Errorf(codes.Unavailable, "no agent for child device %s", childDeviceID)
- logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parent-device-id": parentDeviceID, "childId": childDeviceID})
+ logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parent-device-id": parentDeviceID, "child-id": childDeviceID})
allChildEnableRequestSent = false
}
}
@@ -1229,7 +1247,7 @@
//DisableAllChildDevices is invoked as a callback when the parent device is disabled
func (dMgr *Manager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
- logger.Debug(ctx, "DisableAllChildDevices")
+ logger.Debug(ctx, "disable-all-child-devices")
ports, _ := dMgr.listDevicePorts(ctx, parentCurrDevice.Id)
for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, ports) {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
@@ -1244,7 +1262,7 @@
//DeleteAllChildDevices is invoked as a callback when the parent device is deleted
func (dMgr *Manager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
- logger.Debug(ctx, "DeleteAllChildDevices")
+ logger.Debug(ctx, "delete-all-child-devices")
force := false
// Get the parent device Transient state, if its FORCE_DELETED(go for force delete for child devices)
// So in cases when this handler is getting called other than DELETE operation, no force option would be used.
@@ -1301,7 +1319,7 @@
//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *Manager) getAllChildDeviceIds(ctx context.Context, parentDevicePorts map[uint32]*voltha.Port) map[string]struct{} {
- logger.Debug(ctx, "getAllChildDeviceIds")
+ logger.Debug(ctx, "get-all-child-device-ids")
childDeviceIds := make(map[string]struct{}, len(parentDevicePorts))
for _, port := range parentDevicePorts {
for _, peer := range port.Peers {
@@ -1314,7 +1332,7 @@
//GetAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *Manager) GetAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
- logger.Debugw(ctx, "GetAllChildDevices", log.Fields{"parent-device-id": parentDeviceID})
+ logger.Debugw(ctx, "get-all-child-devices", log.Fields{"parent-device-id": parentDeviceID})
if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
childDevices := make([]*voltha.Device, 0)
for deviceID := range dMgr.getAllChildDeviceIds(ctx, parentDevicePorts) {
@@ -1329,13 +1347,13 @@
// SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
func (dMgr *Manager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
- logger.Info(ctx, "SetupUNILogicalPorts")
+ logger.Info(ctx, "setup-uni-logical-ports")
cDevicePorts, err := dMgr.listDevicePorts(ctx, cDevice.Id)
if err != nil {
return err
}
if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice, cDevicePorts); err != nil {
- logger.Warnw(ctx, "setupUNILogicalPorts-error", log.Fields{"device": cDevice, "err": err})
+ logger.Warnw(ctx, "setup-uni-logical-ports-error", log.Fields{"device": cDevice, "err": err})
return err
}
return nil
@@ -1346,9 +1364,9 @@
// DownloadImage execute an image download request
func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "DownloadImage")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "DownloadImage", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "download-image", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1362,9 +1380,9 @@
// CancelImageDownload cancels image download request
func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "CancelImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "CancelImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1378,9 +1396,9 @@
// ActivateImageUpdate activates image update request
func (dMgr *Manager) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ActivateImageUpdate")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "ActivateImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "activate-image-update", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1394,9 +1412,9 @@
// RevertImageUpdate reverts image update
func (dMgr *Manager) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "RevertImageUpdate")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "RevertImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "rever-image-update", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1413,9 +1431,9 @@
// GetImageDownloadStatus returns status of image download
func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetImageDownloadStatus")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "GetImageDownloadStatus", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "get-image-download-status", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1428,12 +1446,12 @@
}
func (dMgr *Manager) UpdateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "UpdateImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "update-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updateImageDownload(ctx, img); err != nil {
- logger.Debugw(ctx, "UpdateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
+ logger.Debugw(ctx, "update-image-download-failed", log.Fields{"err": err, "image-name": img.Name})
return err
}
} else {
@@ -1444,9 +1462,9 @@
// GetImageDownload returns image download
func (dMgr *Manager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "GetImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "get-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1460,9 +1478,9 @@
// ListImageDownloads returns image downloads
func (dMgr *Manager) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListImageDownloads")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "ListImageDownloads", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "list-image-downloads", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -1476,9 +1494,9 @@
// GetImages returns all images for a specific device entry
func (dMgr *Manager) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetImages")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "GetImages", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "get-images", log.Fields{"device-id": id.Id})
device, err := dMgr.getDeviceReadOnly(ctx, id.Id)
if err != nil {
return nil, err
@@ -1487,7 +1505,7 @@
}
func (dMgr *Manager) NotifyInvalidTransition(ctx context.Context, device *voltha.Device) error {
- logger.Errorw(ctx, "NotifyInvalidTransition", log.Fields{
+ logger.Errorw(ctx, "notify-invalid-transition", log.Fields{
"device": device.Id,
"curr-admin-state": device.AdminState,
"curr-oper-state": device.OperStatus,
@@ -1507,16 +1525,17 @@
// GetParentDeviceID returns parent device id, either from memory or from the dB, if present
func (dMgr *Manager) GetParentDeviceID(ctx context.Context, deviceID string) string {
if device, _ := dMgr.getDeviceReadOnly(ctx, deviceID); device != nil {
- logger.Infow(ctx, "GetParentDeviceId", log.Fields{"device-id": device.Id, "parentId": device.ParentId})
+ logger.Infow(ctx, "get-parent-device-id", log.Fields{"device-id": device.Id, "parent-id": device.ParentId})
return device.ParentId
}
return ""
}
func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) (*common.OperationResp, error) {
- logger.Debugw(ctx, "SimulateAlarm", log.Fields{"id": simulateReq.Id, "Indicator": simulateReq.Indicator, "IntfId": simulateReq.IntfId,
- "PortTypeName": simulateReq.PortTypeName, "OnuDeviceId": simulateReq.OnuDeviceId, "InverseBitErrorRate": simulateReq.InverseBitErrorRate,
- "Drift": simulateReq.Drift, "NewEqd": simulateReq.NewEqd, "OnuSerialNumber": simulateReq.OnuSerialNumber, "Operation": simulateReq.Operation})
+ ctx = utils.WithRPCMetadataContext(ctx, "SimulateAlarm")
+ logger.Debugw(ctx, "simulate-alarm", log.Fields{"id": simulateReq.Id, "indicator": simulateReq.Indicator, "intf-id": simulateReq.IntfId,
+ "port-type-name": simulateReq.PortTypeName, "onu-device-id": simulateReq.OnuDeviceId, "inverse-bit-error-rate": simulateReq.InverseBitErrorRate,
+ "drift": simulateReq.Drift, "new-eqd": simulateReq.NewEqd, "onu-serial-number": simulateReq.OnuSerialNumber, "operation": simulateReq.Operation})
agent := dMgr.getDeviceAgent(ctx, simulateReq.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", simulateReq.Id)
@@ -1528,7 +1547,7 @@
}
func (dMgr *Manager) UpdateDeviceReason(ctx context.Context, deviceID string, reason string) error {
- logger.Debugw(ctx, "UpdateDeviceReason", log.Fields{"device-id": deviceID, "reason": reason})
+ logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": deviceID, "reason": reason})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateDeviceReason(ctx, reason)
}
@@ -1536,9 +1555,9 @@
}
func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "EnablePort")
log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
-
- logger.Debugw(ctx, "EnablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+ logger.Debugw(ctx, "enable-port", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
@@ -1547,9 +1566,9 @@
}
func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "DisablePort")
log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
-
- logger.Debugw(ctx, "DisablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+ logger.Debugw(ctx, "disable-port", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
@@ -1559,7 +1578,7 @@
// ChildDeviceLost calls parent adapter to delete child device and all its references
func (dMgr *Manager) ChildDeviceLost(ctx context.Context, curr *voltha.Device) error {
- logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId})
+ logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId})
if parentAgent := dMgr.getDeviceAgent(ctx, curr.ParentId); parentAgent != nil {
if err := parentAgent.ChildDeviceLost(ctx, curr); err != nil {
// Just log the message and let the remaining pipeline proceed.
@@ -1571,9 +1590,9 @@
}
func (dMgr *Manager) StartOmciTestAction(ctx context.Context, request *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "StartOmciTestAction")
log.EnrichSpan(ctx, log.Fields{"device-id": request.Id})
-
- logger.Debugw(ctx, "StartOmciTestAction", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
+ logger.Debugw(ctx, "start-omci-test-action", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
agent := dMgr.getDeviceAgent(ctx, request.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", request.Id)
@@ -1582,9 +1601,9 @@
}
func (dMgr *Manager) GetExtValue(ctx context.Context, value *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetExtValue")
log.EnrichSpan(ctx, log.Fields{"device-id": value.Id})
-
- logger.Debugw(ctx, "getExtValue", log.Fields{"onu-id": value.Id})
+ logger.Debugw(ctx, "get-ext-value", log.Fields{"onu-id": value.Id})
cDevice, err := dMgr.getDeviceReadOnly(ctx, value.Id)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
@@ -1598,7 +1617,7 @@
if err != nil {
return nil, err
}
- logger.Debugw(ctx, "getExtValue-result", log.Fields{"result": resp})
+ logger.Debugw(ctx, "get-ext-value-result", log.Fields{"result": resp})
return resp, nil
}
return nil, status.Errorf(codes.NotFound, "%s", value.Id)
@@ -1607,7 +1626,8 @@
// SetExtValue set some given configs or value
func (dMgr *Manager) SetExtValue(ctx context.Context, value *voltha.ValueSet) (*empty.Empty, error) {
- logger.Debugw(ctx, "setExtValue", log.Fields{"onu-id": value.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "SetExtValue")
+ logger.Debugw(ctx, "set-ext-value", log.Fields{"onu-id": value.Id})
device, err := dMgr.getDeviceReadOnly(ctx, value.Id)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
@@ -1617,9 +1637,15 @@
if err != nil {
return nil, err
}
- logger.Debugw(ctx, "setExtValue-result", log.Fields{"result": resp})
+ logger.Debugw(ctx, "set-ext-value-result", log.Fields{"result": resp})
return resp, nil
}
return nil, status.Errorf(codes.NotFound, "%s", value.Id)
}
+
+func (dMgr *Manager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent,
+ category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
+ dMgr.RPCEventManager.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+}
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index f06fd6e..71d5acd 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -18,11 +18,22 @@
import (
"context"
- "os"
- "time"
-
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "os"
+ "time"
+)
+
+type contextKey string
+
+func (c contextKey) String() string {
+ return string(c)
+}
+
+var (
+ // RPCContextKey for keeping rpc name as metadata
+ rpcContextKey = contextKey("rpc")
)
// ResponseCallback is the function signature for callbacks to execute after a response is received.
@@ -126,3 +137,35 @@
}
return nil
}
+
+func WithRPCMetadataContext(ctx context.Context, rpcName string) context.Context {
+ ctx = context.WithValue(ctx, rpcContextKey, rpcName)
+ return ctx
+}
+
+func GetRPCMetadataFromContext(ctx context.Context) string {
+ if ctx != nil {
+ if val, ok := ctx.Value(rpcContextKey).(string); ok {
+ return val
+ }
+ }
+ return ""
+}
+
+func WithRPCMetadataFromContext(targetCtx, sourceCtx context.Context) context.Context {
+ if sourceCtx != nil {
+ if val, ok := sourceCtx.Value(rpcContextKey).(string); ok {
+ targetCtx = context.WithValue(targetCtx, rpcContextKey, val)
+ }
+ }
+ return targetCtx
+}
+
+func WithSpanAndRPCMetadataFromContext(sourceCtx context.Context) context.Context {
+ targetCtx := context.Background()
+ if sourceCtx != nil {
+ targetCtx = log.WithSpanFromContext(targetCtx, sourceCtx)
+ targetCtx = WithRPCMetadataFromContext(targetCtx, sourceCtx)
+ }
+ return targetCtx
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/common.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/common.go
new file mode 100644
index 0000000..489a493
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package events
+
+import (
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
new file mode 100644
index 0000000..7418ea1
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eventif
+
+import (
+ "context"
+ "github.com/opencord/voltha-protos/v4/go/voltha"
+)
+
+// EventProxy interface for eventproxy
+type EventProxy interface {
+ SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory,
+ subCategory EventSubCategory, raisedTs int64) error
+ SendKpiEvent(ctx context.Context, id string, deviceEvent *voltha.KpiEvent2, category EventCategory,
+ subCategory EventSubCategory, raisedTs int64) error
+ SendRPCEvent(ctx context.Context, id string, deviceEvent *voltha.RPCEvent, category EventCategory,
+ subCategory *EventSubCategory, raisedTs int64) error
+}
+
+const (
+ EventTypeVersion = "0.1"
+)
+
+type (
+ EventType = voltha.EventType_Types
+ EventCategory = voltha.EventCategory_Types
+ EventSubCategory = voltha.EventSubCategory_Types
+)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
new file mode 100644
index 0000000..c4014ee
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package events
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
+ "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/voltha"
+)
+
+type EventProxy struct {
+ kafkaClient kafka.Client
+ eventTopic kafka.Topic
+}
+
+func NewEventProxy(opts ...EventProxyOption) *EventProxy {
+ var proxy EventProxy
+ for _, option := range opts {
+ option(&proxy)
+ }
+ return &proxy
+}
+
+type EventProxyOption func(*EventProxy)
+
+func MsgClient(client kafka.Client) EventProxyOption {
+ return func(args *EventProxy) {
+ args.kafkaClient = client
+ }
+}
+
+func MsgTopic(topic kafka.Topic) EventProxyOption {
+ return func(args *EventProxy) {
+ args.eventTopic = topic
+ }
+}
+
+func (ep *EventProxy) formatId(eventName string) string {
+ return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
+}
+
+func (ep *EventProxy) getEventHeader(eventName string,
+ category eventif.EventCategory,
+ subCategory *eventif.EventSubCategory,
+ eventType eventif.EventType,
+ raisedTs int64) (*voltha.EventHeader, error) {
+ var header voltha.EventHeader
+ if strings.Contains(eventName, "_") {
+ eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
+ } else {
+ eventName = "UNKNOWN_EVENT"
+ }
+ /* Populating event header */
+ header.Id = ep.formatId(eventName)
+ header.Category = category
+ if subCategory != nil {
+ header.SubCategory = *subCategory
+ } else {
+ header.SubCategory = voltha.EventSubCategory_NONE
+ }
+ header.Type = eventType
+ header.TypeVersion = eventif.EventTypeVersion
+
+ // raisedTs is in nanoseconds
+ timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
+ if err != nil {
+ return nil, err
+ }
+ header.RaisedTs = timestamp
+
+ timestamp, err = ptypes.TimestampProto(time.Now())
+ if err != nil {
+ return nil, err
+ }
+ header.ReportedTs = timestamp
+
+ return &header, nil
+}
+
+/* Send out rpc events*/
+func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
+ if rpcEvent == nil {
+ logger.Error(ctx, "Received empty rpc event")
+ return errors.New("rpc event nil")
+ }
+ var event voltha.Event
+ var err error
+ if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
+ return err
+ }
+ event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
+ if err := ep.sendEvent(ctx, &event); err != nil {
+ logger.Errorw(ctx, "Failed to send rpc event to KAFKA bus", log.Fields{"rpc-event": rpcEvent})
+ return err
+ }
+ logger.Debugw(ctx, "Successfully sent RPC event to KAFKA bus", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+ "ReportedTs": event.Header.ReportedTs, "ResourceId": rpcEvent.ResourceId, "Context": rpcEvent.Context,
+ "RPCEventName": id})
+
+ return nil
+
+}
+
+/* Send out device events*/
+func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
+ if deviceEvent == nil {
+ logger.Error(ctx, "Recieved empty device event")
+ return errors.New("Device event nil")
+ }
+ var event voltha.Event
+ var de voltha.Event_DeviceEvent
+ var err error
+ de.DeviceEvent = deviceEvent
+ if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
+ return err
+ }
+ event.EventType = &de
+ if err := ep.sendEvent(ctx, &event); err != nil {
+ logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
+ return err
+ }
+ logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+ "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
+ "DeviceEventName": deviceEvent.DeviceEventName})
+
+ return nil
+
+}
+
+// SendKpiEvent is to send kpi events to voltha.event topic
+func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
+ if kpiEvent == nil {
+ logger.Error(ctx, "Recieved empty kpi event")
+ return errors.New("KPI event nil")
+ }
+ var event voltha.Event
+ var de voltha.Event_KpiEvent2
+ var err error
+ de.KpiEvent2 = kpiEvent
+ if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
+ return err
+ }
+ event.EventType = &de
+ if err := ep.sendEvent(ctx, &event); err != nil {
+ logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
+ return err
+ }
+ logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+ "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
+
+ return nil
+
+}
+
+func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
+ logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
+ if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
+ return err
+ }
+ logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
+
+ return nil
+}
diff --git a/vendor/github.com/opencord/voltha-protos/v4/go/voltha/events.pb.go b/vendor/github.com/opencord/voltha-protos/v4/go/voltha/events.pb.go
index f51a7b7..2dc1826 100644
--- a/vendor/github.com/opencord/voltha-protos/v4/go/voltha/events.pb.go
+++ b/vendor/github.com/opencord/voltha-protos/v4/go/voltha/events.pb.go
@@ -116,11 +116,12 @@
type EventSubCategory_Types int32
const (
- EventSubCategory_PON EventSubCategory_Types = 0
- EventSubCategory_OLT EventSubCategory_Types = 1
- EventSubCategory_ONT EventSubCategory_Types = 2
- EventSubCategory_ONU EventSubCategory_Types = 3
- EventSubCategory_NNI EventSubCategory_Types = 4
+ EventSubCategory_PON EventSubCategory_Types = 0
+ EventSubCategory_OLT EventSubCategory_Types = 1
+ EventSubCategory_ONT EventSubCategory_Types = 2
+ EventSubCategory_ONU EventSubCategory_Types = 3
+ EventSubCategory_NNI EventSubCategory_Types = 4
+ EventSubCategory_NONE EventSubCategory_Types = 5
)
var EventSubCategory_Types_name = map[int32]string{
@@ -129,14 +130,16 @@
2: "ONT",
3: "ONU",
4: "NNI",
+ 5: "NONE",
}
var EventSubCategory_Types_value = map[string]int32{
- "PON": 0,
- "OLT": 1,
- "ONT": 2,
- "ONU": 3,
- "NNI": 4,
+ "PON": 0,
+ "OLT": 1,
+ "ONT": 2,
+ "ONU": 3,
+ "NNI": 4,
+ "NONE": 5,
}
func (x EventSubCategory_Types) String() string {
@@ -1149,85 +1152,86 @@
func init() { proto.RegisterFile("voltha_protos/events.proto", fileDescriptor_e63e6c07044fd2c4) }
var fileDescriptor_e63e6c07044fd2c4 = []byte{
- // 1274 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xdf, 0x6e, 0xdb, 0xb6,
- 0x17, 0xb6, 0xe4, 0xff, 0x47, 0x4e, 0xa2, 0xb0, 0xbf, 0xdf, 0xe6, 0xba, 0x5b, 0x9b, 0x7a, 0xd8,
- 0x10, 0xb4, 0xa8, 0x8c, 0x69, 0x05, 0x1a, 0xa4, 0x18, 0xb6, 0xd6, 0xf5, 0x1a, 0xa1, 0x8b, 0xed,
- 0x29, 0x4e, 0x80, 0xee, 0xc6, 0x60, 0x24, 0xc6, 0x11, 0x62, 0x5b, 0x02, 0x49, 0x1b, 0xcd, 0x03,
- 0xec, 0x7a, 0x0f, 0xb2, 0xe7, 0xd8, 0xdd, 0xde, 0x60, 0x18, 0xf6, 0x12, 0x7b, 0x80, 0x81, 0x7f,
- 0x64, 0x4b, 0x6e, 0x8a, 0x5e, 0x04, 0xbb, 0x12, 0x79, 0x78, 0x3e, 0x9e, 0xef, 0x7c, 0xe2, 0x39,
- 0x24, 0xb4, 0x96, 0xf1, 0x94, 0x5f, 0xe2, 0x71, 0x42, 0x63, 0x1e, 0xb3, 0x0e, 0x59, 0x92, 0x39,
- 0x67, 0x8e, 0x9c, 0xa1, 0x8a, 0x5a, 0x6b, 0x35, 0xf3, 0x3e, 0x33, 0xc2, 0xb1, 0xf2, 0x68, 0x7d,
- 0x36, 0x89, 0xe3, 0xc9, 0x94, 0x74, 0x70, 0x12, 0x75, 0xf0, 0x7c, 0x1e, 0x73, 0xcc, 0xa3, 0x78,
- 0xae, 0xf1, 0xad, 0x07, 0x7a, 0x55, 0xce, 0xce, 0x17, 0x17, 0x1d, 0x1e, 0xcd, 0x08, 0xe3, 0x78,
- 0x96, 0x68, 0x87, 0x8d, 0xe0, 0x41, 0x3c, 0x9b, 0xc5, 0x73, 0xb5, 0xd6, 0x7e, 0x0e, 0x3b, 0xdd,
- 0x78, 0x7e, 0x11, 0x4d, 0x7a, 0x82, 0xd2, 0xe8, 0x3a, 0x21, 0xed, 0x7d, 0x28, 0x8b, 0x2f, 0x43,
- 0x55, 0x28, 0xe2, 0x30, 0xb4, 0x0b, 0x08, 0xa0, 0x42, 0xc9, 0x2c, 0x5e, 0x12, 0xdb, 0x10, 0xe3,
- 0x45, 0x12, 0x62, 0x4e, 0x6c, 0xb3, 0x7d, 0x09, 0x56, 0x06, 0x8c, 0xbe, 0x86, 0x12, 0xbf, 0x4e,
- 0x48, 0xd3, 0xd8, 0x33, 0xf6, 0xb7, 0xdd, 0xcf, 0x1d, 0x15, 0xd6, 0xd9, 0xd8, 0xdf, 0x91, 0x9b,
- 0xfb, 0xd2, 0x15, 0x21, 0x28, 0x5d, 0x62, 0x76, 0xd9, 0x34, 0xf7, 0x8c, 0xfd, 0xba, 0x2f, 0xc7,
- 0xc2, 0x16, 0x62, 0x8e, 0x9b, 0x45, 0x65, 0x13, 0xe3, 0xf6, 0x23, 0x68, 0xbc, 0x49, 0xa2, 0x35,
- 0xc7, 0x56, 0xca, 0xb1, 0x0e, 0x65, 0x36, 0x8d, 0x02, 0x62, 0x17, 0x50, 0x05, 0x4c, 0xce, 0x6c,
- 0xa3, 0xfd, 0x9b, 0x09, 0xdb, 0xc7, 0x84, 0xd3, 0x28, 0x38, 0x26, 0x1c, 0xbf, 0xc2, 0x1c, 0xa3,
- 0xff, 0x41, 0x99, 0x47, 0x7c, 0xaa, 0xa8, 0xd5, 0x7d, 0x35, 0x41, 0xdb, 0x02, 0x20, 0x43, 0x1b,
- 0xbe, 0xc9, 0x19, 0x7a, 0x04, 0xbb, 0xd3, 0x78, 0x12, 0x05, 0x78, 0x3a, 0x0e, 0xc9, 0x32, 0x0a,
- 0xc8, 0x38, 0x0a, 0x35, 0x8b, 0x1d, 0xbd, 0xf0, 0x4a, 0xda, 0xbd, 0x10, 0xdd, 0x83, 0x3a, 0x23,
- 0x34, 0xc2, 0xd3, 0xf1, 0x3c, 0x6e, 0x96, 0xa4, 0x4f, 0x4d, 0x19, 0xfa, 0xb1, 0x58, 0x5c, 0x6f,
- 0x50, 0x56, 0x8b, 0x61, 0x8a, 0xfc, 0x16, 0xaa, 0x41, 0x3c, 0xe7, 0xe4, 0x1d, 0x6f, 0x56, 0xf6,
- 0x8a, 0xfb, 0x96, 0xfb, 0x45, 0x2a, 0x54, 0x9e, 0xb4, 0xd0, 0x4d, 0x78, 0xf5, 0xe6, 0x9c, 0x5e,
- 0xfb, 0x29, 0x46, 0xa8, 0xb3, 0x58, 0x44, 0x61, 0xb3, 0xaa, 0xd4, 0x11, 0xe3, 0xd6, 0x21, 0x34,
- 0xb2, 0xce, 0xc8, 0x86, 0xe2, 0x15, 0xb9, 0xd6, 0xc9, 0x8a, 0xa1, 0x10, 0x60, 0x89, 0xa7, 0x0b,
- 0xa2, 0x85, 0x56, 0x93, 0x43, 0xf3, 0xc0, 0x68, 0xff, 0x6a, 0x80, 0xad, 0x02, 0x9f, 0x09, 0xdb,
- 0x10, 0x47, 0x94, 0xa1, 0xef, 0xa0, 0x3a, 0x93, 0x36, 0xd6, 0x34, 0x24, 0xc7, 0x2f, 0xf3, 0x1c,
- 0xd7, 0xae, 0xda, 0xc0, 0x34, 0x4b, 0x8d, 0x12, 0x8c, 0xb2, 0x0b, 0x1f, 0x63, 0x64, 0x66, 0x19,
- 0xfd, 0x6e, 0xc0, 0xae, 0x02, 0x7b, 0xf3, 0x8b, 0x98, 0xce, 0xe4, 0x61, 0x47, 0x2e, 0xd4, 0x44,
- 0x45, 0xc8, 0x93, 0x21, 0xb6, 0xb1, 0xdc, 0x4f, 0x6e, 0xd6, 0xcd, 0x5f, 0xf9, 0xa1, 0xef, 0xd7,
- 0x69, 0x98, 0x32, 0x8d, 0xaf, 0xf2, 0x90, 0xcc, 0xfe, 0xff, 0x41, 0x1e, 0x7f, 0x19, 0x50, 0x4b,
- 0x0f, 0x2d, 0x72, 0x72, 0xb5, 0xd1, 0x4a, 0x79, 0x64, 0x0f, 0x75, 0xae, 0x30, 0xd6, 0x67, 0xd3,
- 0x94, 0x67, 0xf3, 0x10, 0x6a, 0x09, 0x25, 0x17, 0xd1, 0x3b, 0xc2, 0x9a, 0x45, 0x99, 0xcb, 0xfd,
- 0xcd, 0x3d, 0x9c, 0xa1, 0x76, 0x50, 0x39, 0xac, 0xfc, 0x5b, 0xa7, 0xb0, 0x95, 0x5b, 0xba, 0x21,
- 0x0b, 0x27, 0x9b, 0x85, 0xe5, 0x36, 0x3f, 0xf4, 0xbb, 0xb3, 0xf9, 0xfd, 0x62, 0x40, 0x3d, 0x8d,
- 0xed, 0xde, 0x22, 0x41, 0x55, 0x7c, 0x07, 0x00, 0xb2, 0x90, 0xc7, 0xba, 0xf6, 0x45, 0x8a, 0x77,
- 0x3f, 0xf8, 0xbb, 0xfc, 0xba, 0x74, 0x16, 0xff, 0xbb, 0xfd, 0x8f, 0x01, 0x96, 0xaa, 0x4b, 0x25,
- 0xf5, 0x03, 0xb0, 0x28, 0x61, 0xf1, 0x82, 0xaa, 0xfa, 0x53, 0x59, 0x42, 0x6a, 0xf2, 0x42, 0x51,
- 0xe7, 0xba, 0x3c, 0x65, 0x1f, 0x1e, 0xcf, 0xf1, 0x2c, 0x2d, 0x8c, 0x9d, 0x70, 0xbd, 0x51, 0x1f,
- 0xcf, 0x08, 0xda, 0x03, 0x2b, 0x24, 0x2c, 0xa0, 0x51, 0x22, 0xc2, 0xea, 0x6e, 0x90, 0x35, 0xa1,
- 0xc3, 0x75, 0x3d, 0x97, 0x24, 0xeb, 0xbd, 0x94, 0x75, 0x86, 0xd4, 0xcd, 0xc5, 0x7c, 0xab, 0xc2,
- 0xfd, 0xd3, 0x84, 0x9a, 0x3f, 0xec, 0xaa, 0x9c, 0x6d, 0x28, 0xd2, 0x24, 0x48, 0x81, 0x34, 0x09,
- 0xd0, 0x43, 0x68, 0xc4, 0x09, 0xa1, 0x52, 0x2d, 0x21, 0x83, 0xc2, 0x5b, 0x2b, 0x9b, 0x17, 0xa2,
- 0x26, 0x54, 0x19, 0xa1, 0x82, 0xa3, 0xce, 0x2b, 0x9d, 0xa2, 0xbb, 0x50, 0x63, 0x1c, 0x07, 0x57,
- 0x02, 0x58, 0xd2, 0x4b, 0x62, 0xee, 0x85, 0x9b, 0xea, 0x96, 0xdf, 0x53, 0x77, 0x43, 0xb1, 0xca,
- 0xfb, 0x8a, 0x3d, 0x5b, 0x2b, 0x56, 0x95, 0x8a, 0xad, 0xae, 0x8a, 0x34, 0x9f, 0x0f, 0xf4, 0xbe,
- 0x27, 0x50, 0x61, 0x1c, 0xf3, 0x05, 0x6b, 0xd6, 0xe4, 0x31, 0xfd, 0xbf, 0xa3, 0xef, 0xb2, 0x41,
- 0x9a, 0x95, 0x4f, 0x58, 0xe2, 0x6b, 0xa7, 0x5b, 0xa9, 0xbb, 0x84, 0x2d, 0xc9, 0xa4, 0x8b, 0x39,
- 0x99, 0xc4, 0xf4, 0xba, 0x4d, 0xd2, 0x1b, 0x67, 0x17, 0xb6, 0xba, 0x83, 0xe3, 0xe3, 0xd3, 0xbe,
- 0xd7, 0x7d, 0x31, 0xf2, 0x06, 0x7d, 0xbb, 0x80, 0x76, 0xc0, 0xea, 0xf5, 0xcf, 0x3c, 0x7f, 0xd0,
- 0x3f, 0xee, 0xf5, 0x47, 0xb6, 0x81, 0xb6, 0xa0, 0xde, 0xfb, 0xe9, 0xd4, 0x1b, 0xca, 0xa9, 0x89,
- 0x2c, 0xa8, 0x9e, 0xf4, 0xfc, 0x33, 0xaf, 0xdb, 0xb3, 0x8b, 0x68, 0x1b, 0x60, 0xe8, 0x0f, 0xba,
- 0xbd, 0x93, 0x13, 0xaf, 0xff, 0xda, 0x2e, 0xa1, 0x06, 0xd4, 0x4e, 0x7a, 0xdd, 0x53, 0xdf, 0x1b,
- 0xbd, 0xb5, 0xcb, 0xed, 0x23, 0xb0, 0x65, 0xdc, 0x93, 0xc5, 0xf9, 0x2a, 0xf4, 0xd3, 0xcc, 0x85,
- 0x3c, 0x94, 0x01, 0xab, 0x50, 0x1c, 0xfc, 0x28, 0x02, 0x89, 0x81, 0x0c, 0x21, 0x07, 0xa7, 0x76,
- 0x51, 0x0c, 0xfa, 0x7d, 0xcf, 0x2e, 0xb5, 0x2f, 0xa0, 0xbe, 0xbe, 0x2f, 0xdf, 0xa6, 0x5b, 0xd8,
- 0xd0, 0xe8, 0x0e, 0xfa, 0x3f, 0x78, 0xaf, 0xc7, 0xbd, 0x33, 0x41, 0xae, 0x20, 0xb8, 0xbe, 0x19,
- 0x7a, 0x7a, 0x6a, 0x08, 0x7a, 0xab, 0xa9, 0x6b, 0x9b, 0x02, 0xf0, 0xaa, 0x27, 0xa8, 0x6b, 0x8f,
- 0xa2, 0x00, 0xf8, 0xc3, 0xae, 0x9e, 0x96, 0xda, 0x7f, 0x9b, 0x60, 0xc9, 0x40, 0x47, 0x04, 0x87,
- 0x84, 0x8a, 0xc2, 0x5e, 0x55, 0x9d, 0x19, 0x85, 0xe8, 0x19, 0xd4, 0x02, 0x9d, 0x89, 0x94, 0x79,
- 0xdb, 0xbd, 0x97, 0xfe, 0xee, 0x9c, 0xc2, 0xba, 0x3b, 0xac, 0x9c, 0xd1, 0x0b, 0x68, 0xb0, 0xc5,
- 0xf9, 0x78, 0x05, 0x2e, 0x4a, 0xf0, 0xfd, 0x1c, 0x38, 0x23, 0x93, 0xc6, 0x5b, 0x6c, 0x6d, 0x42,
- 0x8f, 0x75, 0x53, 0x2a, 0x49, 0xe8, 0xa7, 0x39, 0xe8, 0x7b, 0x1d, 0xe9, 0x21, 0x34, 0xc4, 0x77,
- 0xbc, 0x24, 0x94, 0x89, 0x93, 0xab, 0x8e, 0xb6, 0x25, 0x6c, 0x67, 0xca, 0x84, 0x9e, 0x41, 0x9d,
- 0xe2, 0x88, 0x91, 0x70, 0xcc, 0x99, 0x3c, 0xd9, 0x96, 0xdb, 0x72, 0xd4, 0xf3, 0xcb, 0x49, 0x9f,
- 0x5f, 0xce, 0x28, 0x7d, 0x7e, 0xf9, 0x35, 0xe5, 0x3c, 0x62, 0xe8, 0xb9, 0xa8, 0x9a, 0x24, 0xa6,
- 0x5c, 0x41, 0xab, 0x1f, 0x85, 0x42, 0xea, 0x3e, 0x62, 0xed, 0x3f, 0x4c, 0x28, 0xab, 0x32, 0x7f,
- 0x0c, 0x95, 0x4b, 0xa9, 0xb2, 0xbe, 0x02, 0xef, 0xe4, 0x32, 0x52, 0x3f, 0xc0, 0xd7, 0x2e, 0xe8,
- 0x00, 0x1a, 0x81, 0x7c, 0x7a, 0xa9, 0x36, 0xa7, 0x5b, 0xfb, 0x9d, 0x1b, 0x9e, 0x65, 0x47, 0x05,
- 0xdf, 0x0a, 0x32, 0x0f, 0xb9, 0x0e, 0xd4, 0xaf, 0x92, 0x48, 0xc3, 0x8a, 0x12, 0x66, 0x6f, 0x36,
- 0xf4, 0xa3, 0x82, 0x5f, 0xbb, 0x4a, 0x6f, 0x37, 0x17, 0x60, 0x05, 0x70, 0xa5, 0xda, 0x96, 0xbb,
- 0xbb, 0x89, 0x70, 0x8f, 0x0a, 0x7e, 0xfd, 0x6a, 0x75, 0x61, 0x1c, 0x40, 0x23, 0xdb, 0x85, 0xa5,
- 0xdc, 0x19, 0x7a, 0x99, 0xe6, 0x29, 0xe8, 0x65, 0xfa, 0xb2, 0xa0, 0x47, 0x93, 0x40, 0xc3, 0x2a,
- 0x79, 0x7a, 0x69, 0x07, 0x11, 0xf4, 0x68, 0x12, 0xc8, 0xf1, 0xcb, 0x06, 0x80, 0xea, 0xf4, 0xe2,
- 0x5f, 0xbe, 0xec, 0xc1, 0x9d, 0x98, 0x4e, 0x9c, 0x38, 0x21, 0xf3, 0x20, 0xa6, 0xa1, 0x46, 0xfe,
- 0xec, 0x4c, 0x22, 0x7e, 0xb9, 0x38, 0x17, 0x2d, 0xa5, 0x93, 0xae, 0x75, 0xd4, 0xda, 0x13, 0xfd,
- 0x72, 0x5e, 0x3e, 0xed, 0x4c, 0x62, 0x6d, 0x3b, 0xaf, 0x48, 0xe3, 0x37, 0xff, 0x06, 0x00, 0x00,
- 0xff, 0xff, 0x7b, 0x2c, 0xa9, 0x18, 0xdb, 0x0b, 0x00, 0x00,
+ // 1282 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xdb, 0x6e, 0xdb, 0x46,
+ 0x13, 0x16, 0xa9, 0xf3, 0x50, 0xb6, 0xe9, 0xcd, 0xff, 0xb7, 0x8a, 0xd2, 0x26, 0x8e, 0x8a, 0x16,
+ 0x46, 0x82, 0x50, 0x28, 0x5b, 0x20, 0x86, 0x83, 0x1e, 0x12, 0x85, 0x8d, 0x89, 0xd4, 0x94, 0x4a,
+ 0xcb, 0x06, 0xd2, 0x1b, 0x61, 0x4d, 0xae, 0x65, 0xc2, 0x92, 0x48, 0x70, 0x57, 0x42, 0xfc, 0x00,
+ 0xbd, 0xee, 0x83, 0xf4, 0x39, 0x7a, 0xd7, 0x37, 0x28, 0x8a, 0xbe, 0x44, 0x1f, 0xa0, 0xd8, 0x03,
+ 0x25, 0x52, 0x71, 0x90, 0x0b, 0xa3, 0x57, 0xdc, 0x9d, 0x9d, 0x6f, 0xe7, 0x9b, 0x8f, 0x3b, 0xb3,
+ 0x0b, 0x9d, 0x65, 0x3c, 0x65, 0x97, 0x78, 0x9c, 0xa4, 0x31, 0x8b, 0x69, 0x8f, 0x2c, 0xc9, 0x9c,
+ 0x51, 0x4b, 0xcc, 0x50, 0x4d, 0xae, 0x75, 0xda, 0x45, 0x9f, 0x19, 0x61, 0x58, 0x7a, 0x74, 0x3e,
+ 0x99, 0xc4, 0xf1, 0x64, 0x4a, 0x7a, 0x38, 0x89, 0x7a, 0x78, 0x3e, 0x8f, 0x19, 0x66, 0x51, 0x3c,
+ 0x57, 0xf8, 0xce, 0x03, 0xb5, 0x2a, 0x66, 0xe7, 0x8b, 0x8b, 0x1e, 0x8b, 0x66, 0x84, 0x32, 0x3c,
+ 0x4b, 0x94, 0xc3, 0x46, 0xf0, 0x20, 0x9e, 0xcd, 0xe2, 0xb9, 0x5c, 0xeb, 0x3e, 0x83, 0x9d, 0x7e,
+ 0x3c, 0xbf, 0x88, 0x26, 0x0e, 0xa7, 0x34, 0xba, 0x4e, 0x48, 0x77, 0x1f, 0xaa, 0xfc, 0x4b, 0x51,
+ 0x1d, 0xca, 0x38, 0x0c, 0xcd, 0x12, 0x02, 0xa8, 0xa5, 0x64, 0x16, 0x2f, 0x89, 0xa9, 0xf1, 0xf1,
+ 0x22, 0x09, 0x31, 0x23, 0xa6, 0xde, 0xbd, 0x04, 0x23, 0x07, 0x46, 0x5f, 0x42, 0x85, 0x5d, 0x27,
+ 0xa4, 0xad, 0xed, 0x69, 0xfb, 0xdb, 0xf6, 0xa7, 0x96, 0x0c, 0x6b, 0x6d, 0xec, 0x6f, 0x89, 0xcd,
+ 0x7d, 0xe1, 0x8a, 0x10, 0x54, 0x2e, 0x31, 0xbd, 0x6c, 0xeb, 0x7b, 0xda, 0x7e, 0xd3, 0x17, 0x63,
+ 0x6e, 0x0b, 0x31, 0xc3, 0xed, 0xb2, 0xb4, 0xf1, 0x71, 0xf7, 0x11, 0xb4, 0x5e, 0x27, 0xd1, 0x9a,
+ 0x63, 0x27, 0xe3, 0xd8, 0x84, 0x2a, 0x9d, 0x46, 0x01, 0x31, 0x4b, 0xa8, 0x06, 0x3a, 0xa3, 0xa6,
+ 0xd6, 0xfd, 0x4d, 0x87, 0xed, 0x63, 0xc2, 0xd2, 0x28, 0x38, 0x26, 0x0c, 0xbf, 0xc4, 0x0c, 0xa3,
+ 0xff, 0x41, 0x95, 0x45, 0x6c, 0x2a, 0xa9, 0x35, 0x7d, 0x39, 0x41, 0xdb, 0x1c, 0x20, 0x42, 0x6b,
+ 0xbe, 0xce, 0x28, 0x7a, 0x04, 0xbb, 0xd3, 0x78, 0x12, 0x05, 0x78, 0x3a, 0x0e, 0xc9, 0x32, 0x0a,
+ 0xc8, 0x38, 0x0a, 0x15, 0x8b, 0x1d, 0xb5, 0xf0, 0x52, 0xd8, 0xdd, 0x10, 0xdd, 0x83, 0x26, 0x25,
+ 0x69, 0x84, 0xa7, 0xe3, 0x79, 0xdc, 0xae, 0x08, 0x9f, 0x86, 0x34, 0x78, 0x31, 0x5f, 0x5c, 0x6f,
+ 0x50, 0x95, 0x8b, 0x61, 0x86, 0xfc, 0x06, 0xea, 0x41, 0x3c, 0x67, 0xe4, 0x2d, 0x6b, 0xd7, 0xf6,
+ 0xca, 0xfb, 0x86, 0xfd, 0x59, 0x26, 0x54, 0x91, 0x34, 0xd7, 0x8d, 0x7b, 0x39, 0x73, 0x96, 0x5e,
+ 0xfb, 0x19, 0x86, 0xab, 0xb3, 0x58, 0x44, 0x61, 0xbb, 0x2e, 0xd5, 0xe1, 0xe3, 0xce, 0x21, 0xb4,
+ 0xf2, 0xce, 0xc8, 0x84, 0xf2, 0x15, 0xb9, 0x56, 0xc9, 0xf2, 0x21, 0x17, 0x60, 0x89, 0xa7, 0x0b,
+ 0xa2, 0x84, 0x96, 0x93, 0x43, 0xfd, 0x40, 0xeb, 0xfe, 0xaa, 0x81, 0x29, 0x03, 0x9f, 0x71, 0xdb,
+ 0x10, 0x47, 0x29, 0x45, 0xdf, 0x41, 0x7d, 0x26, 0x6c, 0xb4, 0xad, 0x09, 0x8e, 0x9f, 0x17, 0x39,
+ 0xae, 0x5d, 0x95, 0x81, 0x2a, 0x96, 0x0a, 0xc5, 0x19, 0xe5, 0x17, 0x3e, 0xc4, 0x48, 0xcf, 0x33,
+ 0xfa, 0x5d, 0x83, 0x5d, 0x09, 0x76, 0xe7, 0x17, 0x71, 0x3a, 0x13, 0x87, 0x1d, 0xd9, 0xd0, 0xe0,
+ 0x15, 0x21, 0x4e, 0x06, 0xdf, 0xc6, 0xb0, 0x3f, 0xba, 0x59, 0x37, 0x7f, 0xe5, 0x87, 0xbe, 0x5f,
+ 0xa7, 0xa1, 0x8b, 0x34, 0xbe, 0x28, 0x42, 0x72, 0xfb, 0xff, 0x07, 0x79, 0xfc, 0xa5, 0x41, 0x23,
+ 0x3b, 0xb4, 0xc8, 0x2a, 0xd4, 0x46, 0x27, 0xe3, 0x91, 0x3f, 0xd4, 0x85, 0xc2, 0x58, 0x9f, 0x4d,
+ 0x5d, 0x9c, 0xcd, 0x43, 0x68, 0x24, 0x29, 0xb9, 0x88, 0xde, 0x12, 0xda, 0x2e, 0x8b, 0x5c, 0xee,
+ 0x6f, 0xee, 0x61, 0x0d, 0x95, 0x83, 0xcc, 0x61, 0xe5, 0xdf, 0x39, 0x85, 0xad, 0xc2, 0xd2, 0x0d,
+ 0x59, 0x58, 0xf9, 0x2c, 0x0c, 0xbb, 0xfd, 0xbe, 0xdf, 0x9d, 0xcf, 0xef, 0x17, 0x0d, 0x9a, 0x59,
+ 0x6c, 0xfb, 0x16, 0x09, 0xca, 0xe2, 0x3b, 0x00, 0x10, 0x85, 0x3c, 0x56, 0xb5, 0xcf, 0x53, 0xbc,
+ 0xfb, 0xde, 0xdf, 0xe5, 0x37, 0x85, 0x33, 0xff, 0xdf, 0xdd, 0x7f, 0x34, 0x30, 0x64, 0x5d, 0x4a,
+ 0xa9, 0x1f, 0x80, 0x91, 0x12, 0x1a, 0x2f, 0x52, 0x59, 0x7f, 0x32, 0x4b, 0xc8, 0x4c, 0x6e, 0xc8,
+ 0xeb, 0x5c, 0x95, 0xa7, 0xe8, 0xc3, 0xe3, 0x39, 0x9e, 0x65, 0x85, 0xb1, 0x13, 0xae, 0x37, 0xf2,
+ 0xf0, 0x8c, 0xa0, 0x3d, 0x30, 0x42, 0x42, 0x83, 0x34, 0x4a, 0x78, 0x58, 0xd5, 0x0d, 0xf2, 0x26,
+ 0x74, 0xb8, 0xae, 0xe7, 0x8a, 0x60, 0xbd, 0x97, 0xb1, 0xce, 0x91, 0xba, 0xb9, 0x98, 0x6f, 0x55,
+ 0xb8, 0x7f, 0xea, 0xd0, 0xf0, 0x87, 0x7d, 0x99, 0xb3, 0x09, 0xe5, 0x34, 0x09, 0x32, 0x60, 0x9a,
+ 0x04, 0xe8, 0x21, 0xb4, 0xe2, 0x84, 0xa4, 0x42, 0x2d, 0x2e, 0x83, 0xc4, 0x1b, 0x2b, 0x9b, 0x1b,
+ 0xa2, 0x36, 0xd4, 0x29, 0x49, 0x39, 0x47, 0x95, 0x57, 0x36, 0x45, 0x77, 0xa1, 0x41, 0x19, 0x0e,
+ 0xae, 0x38, 0xb0, 0xa2, 0x96, 0xf8, 0xdc, 0x0d, 0x37, 0xd5, 0xad, 0xbe, 0xa3, 0xee, 0x86, 0x62,
+ 0xb5, 0x77, 0x15, 0x7b, 0xba, 0x56, 0xac, 0x2e, 0x14, 0x5b, 0x5d, 0x15, 0x59, 0x3e, 0xef, 0xe9,
+ 0x7d, 0x4f, 0xa0, 0x46, 0x19, 0x66, 0x0b, 0xda, 0x6e, 0x88, 0x63, 0xfa, 0x7f, 0x4b, 0xdd, 0x65,
+ 0x83, 0x2c, 0x2b, 0x9f, 0xd0, 0xc4, 0x57, 0x4e, 0xb7, 0x52, 0x77, 0x09, 0x5b, 0x82, 0x49, 0x1f,
+ 0x33, 0x32, 0x89, 0xd3, 0xeb, 0x2e, 0xc9, 0x6e, 0x9c, 0x5d, 0xd8, 0xea, 0x0f, 0x8e, 0x8f, 0x4f,
+ 0x3d, 0xb7, 0xff, 0x7c, 0xe4, 0x0e, 0x3c, 0xb3, 0x84, 0x76, 0xc0, 0x70, 0xbc, 0x33, 0xd7, 0x1f,
+ 0x78, 0xc7, 0x8e, 0x37, 0x32, 0x35, 0xb4, 0x05, 0x4d, 0xe7, 0xa7, 0x53, 0x77, 0x28, 0xa6, 0x3a,
+ 0x32, 0xa0, 0x7e, 0xe2, 0xf8, 0x67, 0x6e, 0xdf, 0x31, 0xcb, 0x68, 0x1b, 0x60, 0xe8, 0x0f, 0xfa,
+ 0xce, 0xc9, 0x89, 0xeb, 0xbd, 0x32, 0x2b, 0xa8, 0x05, 0x8d, 0x13, 0xa7, 0x7f, 0xea, 0xbb, 0xa3,
+ 0x37, 0x66, 0xb5, 0xeb, 0x83, 0x29, 0xe2, 0x9e, 0x2c, 0xce, 0x57, 0xa1, 0xbf, 0xcd, 0x5d, 0xc8,
+ 0x43, 0x11, 0xb0, 0x0e, 0xe5, 0xc1, 0x8f, 0x3c, 0x10, 0x1f, 0x88, 0x10, 0x62, 0x70, 0x6a, 0x96,
+ 0xf9, 0xc0, 0xf3, 0x5c, 0xb3, 0x82, 0x1a, 0x50, 0xf1, 0x06, 0x9e, 0x63, 0x56, 0xbb, 0x17, 0xd0,
+ 0x5c, 0xdf, 0x9c, 0x6f, 0xb2, 0xcd, 0x4c, 0x68, 0xf5, 0x07, 0xde, 0x0f, 0xee, 0xab, 0xb1, 0x73,
+ 0xc6, 0x69, 0x96, 0x38, 0xeb, 0xd7, 0x43, 0x57, 0x4d, 0x35, 0x4e, 0x74, 0x35, 0xb5, 0x4d, 0x9d,
+ 0x03, 0x5e, 0x3a, 0x3c, 0x09, 0xe5, 0x51, 0xe6, 0x00, 0x7f, 0xd8, 0x57, 0xd3, 0x4a, 0xf7, 0x6f,
+ 0x1d, 0x0c, 0x11, 0xe8, 0x88, 0xe0, 0x90, 0xa4, 0xbc, 0xc4, 0x57, 0xf5, 0xa7, 0x47, 0x21, 0x7a,
+ 0x0a, 0x8d, 0x40, 0xe5, 0x24, 0x04, 0xdf, 0xb6, 0xef, 0x65, 0x3f, 0xbe, 0xa0, 0xb5, 0xea, 0x13,
+ 0x2b, 0x67, 0xf4, 0x1c, 0x5a, 0x74, 0x71, 0x3e, 0x5e, 0x81, 0xcb, 0x02, 0x7c, 0xbf, 0x00, 0xce,
+ 0x09, 0xa6, 0xf0, 0x06, 0x5d, 0x9b, 0xd0, 0x63, 0xd5, 0x9e, 0x2a, 0x02, 0xfa, 0x71, 0x01, 0xfa,
+ 0x4e, 0x6f, 0x7a, 0x08, 0x2d, 0xfe, 0x1d, 0x2f, 0x49, 0x4a, 0xf9, 0x19, 0x96, 0x87, 0xdc, 0xe0,
+ 0xb6, 0x33, 0x69, 0x42, 0x4f, 0xa1, 0x99, 0xe2, 0x88, 0x92, 0x70, 0xcc, 0xa8, 0x38, 0xe3, 0x86,
+ 0xdd, 0xb1, 0xe4, 0x43, 0xcc, 0xca, 0x1e, 0x62, 0xd6, 0x28, 0x7b, 0x88, 0xf9, 0x0d, 0xe9, 0x3c,
+ 0xa2, 0xe8, 0x19, 0xaf, 0x9f, 0x24, 0x4e, 0x99, 0x84, 0xd6, 0x3f, 0x08, 0x85, 0xcc, 0x7d, 0x44,
+ 0xbb, 0x7f, 0xe8, 0x50, 0x95, 0x05, 0xff, 0x18, 0x6a, 0x97, 0x42, 0x65, 0x75, 0x19, 0xde, 0x29,
+ 0x64, 0x24, 0x7f, 0x80, 0xaf, 0x5c, 0xd0, 0x01, 0xb4, 0x02, 0xf1, 0x08, 0x93, 0x0d, 0x4f, 0x35,
+ 0xf9, 0x3b, 0x37, 0x3c, 0xd0, 0x8e, 0x4a, 0xbe, 0x11, 0xe4, 0x9e, 0x74, 0x3d, 0x68, 0x5e, 0x25,
+ 0x91, 0x82, 0x95, 0x05, 0xcc, 0xdc, 0x6c, 0xed, 0x47, 0x25, 0xbf, 0x71, 0x95, 0xdd, 0x73, 0x36,
+ 0xc0, 0x0a, 0x60, 0x0b, 0xb5, 0x0d, 0x7b, 0x77, 0x13, 0x61, 0x1f, 0x95, 0xfc, 0xe6, 0xd5, 0xea,
+ 0xea, 0x38, 0x80, 0x56, 0xbe, 0x1f, 0x0b, 0xb9, 0x73, 0xf4, 0x72, 0x6d, 0x94, 0xd3, 0xcb, 0x75,
+ 0x68, 0x4e, 0x2f, 0x4d, 0x02, 0x05, 0xab, 0x15, 0xe9, 0x65, 0xbd, 0x84, 0xd3, 0x4b, 0x93, 0x40,
+ 0x8c, 0x5f, 0xb4, 0x00, 0x64, 0xcf, 0xe7, 0xff, 0xf2, 0x85, 0x03, 0x77, 0xe2, 0x74, 0x62, 0xc5,
+ 0x09, 0x99, 0x07, 0x71, 0x1a, 0x2a, 0xe4, 0xcf, 0xd6, 0x24, 0x62, 0x97, 0x8b, 0x73, 0xde, 0x5c,
+ 0x7a, 0xd9, 0x5a, 0x4f, 0xae, 0x3d, 0x51, 0x6f, 0xe8, 0xe5, 0xd7, 0xbd, 0x49, 0xac, 0x6c, 0xe7,
+ 0x35, 0x61, 0xfc, 0xea, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4e, 0xd5, 0xe2, 0x3b, 0xe5, 0x0b,
+ 0x00, 0x00,
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index e072be6..8ced92b 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -102,13 +102,15 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v4 v4.0.7
+# github.com/opencord/voltha-lib-go/v4 v4.0.8
github.com/opencord/voltha-lib-go/v4/pkg/adapters
github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v4/pkg/adapters/common
github.com/opencord/voltha-lib-go/v4/pkg/config
github.com/opencord/voltha-lib-go/v4/pkg/db
github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore
+github.com/opencord/voltha-lib-go/v4/pkg/events
+github.com/opencord/voltha-lib-go/v4/pkg/events/eventif
github.com/opencord/voltha-lib-go/v4/pkg/flows
github.com/opencord/voltha-lib-go/v4/pkg/grpc
github.com/opencord/voltha-lib-go/v4/pkg/kafka
@@ -117,7 +119,7 @@
github.com/opencord/voltha-lib-go/v4/pkg/mocks/kafka
github.com/opencord/voltha-lib-go/v4/pkg/probe
github.com/opencord/voltha-lib-go/v4/pkg/version
-# github.com/opencord/voltha-protos/v4 v4.0.8
+# github.com/opencord/voltha-protos/v4 v4.0.12
github.com/opencord/voltha-protos/v4/go/common
github.com/opencord/voltha-protos/v4/go/ext/config
github.com/opencord/voltha-protos/v4/go/extension