VOL-3501 Code changes to support RPC events
Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index c122574..84f765f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -133,7 +133,7 @@
agent.portLoader.Load(ctx)
agent.transientStateLoader.Load(ctx)
- logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
+ logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
@@ -172,7 +172,7 @@
}
defer agent.requestQueue.RequestComplete()
- logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parentId": agent.parentID})
+ logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
// Remove the device transient loader
if err := agent.deleteTransientState(ctx); err != nil {
return err
@@ -236,7 +236,7 @@
// TODO: Post failure message onto kafka
}
-func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
defer cancel()
select {
@@ -283,19 +283,31 @@
}
-func (agent *Agent) waitForAdapterDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
defer cancel()
+ var rpce *voltha.RPCEvent
+ defer func() {
+ if rpce != nil {
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ }
+ }()
select {
case rpcResponse, ok := <-ch:
if !ok {
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+ // rpce is non-nil here, so the deferred func above sends an RPC error event on return
} else if rpcResponse.Err != nil {
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
+ // rpce is non-nil here, so the deferred func above sends an RPC error event on return
} else {
onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
}
case <-ctx.Done():
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
onFailure(ctx, rpc, ctx.Err(), reqArgs)
}
}
@@ -326,7 +338,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
oldDevice := agent.getDeviceReadOnlyWithoutLock()
if oldDevice.AdminState == voltha.AdminState_ENABLED {
@@ -360,6 +372,8 @@
// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
var ch chan *kafka.RpcResponse
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
} else {
@@ -376,16 +390,28 @@
func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
defer cancel()
+ var rpce *voltha.RPCEvent
+ defer func() {
+ if rpce != nil {
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ }
+ }()
select {
case rpcResponse, ok := <-ch:
if !ok {
+ // channel closed without a reply: set rpce so the deferred func above sends an RPC error event
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
response.Error(status.Errorf(codes.Aborted, "channel-closed"))
} else if rpcResponse.Err != nil {
+ // adapter returned an error: set rpce so the deferred func above sends an RPC error event
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
response.Error(rpcResponse.Err)
} else {
response.Done()
}
case <-ctx.Done():
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
response.Error(ctx.Err())
}
}
@@ -451,7 +477,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
cloned := agent.cloneDeviceWithoutLock()
@@ -476,6 +502,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
if err != nil {
cancel()
@@ -491,13 +519,15 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
if agent.isDeletionInProgress() {
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
cancel()
@@ -508,7 +538,7 @@
}
func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
- logger.Debugw(ctx, "deleteDeviceForce", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -521,27 +551,29 @@
agent.deviceID)
}
device := agent.cloneDeviceWithoutLock()
- if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, voltha.DeviceTransientState_FORCE_DELETING,
- previousDeviceTransientState); err != nil {
+ if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+ voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
return err
}
previousAdminState := device.AdminState
if previousAdminState != ic.AdminState_PREPROVISIONED {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
if err != nil {
cancel()
return err
}
// Since it is a case of force delete, nothing needs to be done on adapter responses.
- go agent.waitForAdapterResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
+ go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
agent.onFailure)
}
return nil
}
func (agent *Agent) deleteDevice(ctx context.Context) error {
- logger.Debugw(ctx, "deleteDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -561,14 +593,16 @@
// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
}
- if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, currentDeviceTransientState,
- previousDeviceTransientState); err != nil {
+ if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+ currentDeviceTransientState, previousDeviceTransientState); err != nil {
return err
}
// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
// adapter
if previousAdminState != ic.AdminState_PREPROVISIONED {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
if err != nil {
cancel()
@@ -578,7 +612,7 @@
}
return err
}
- go agent.waitForAdapterDeleteResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+ go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
agent.onDeleteFailure)
}
return nil
@@ -588,7 +622,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
+ logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
cloned := agent.cloneDeviceWithoutLock()
cloned.ParentId = parentID
@@ -597,7 +631,7 @@
// getSwitchCapability retrieves the switch capability of a parent device
func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
- logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -651,6 +685,8 @@
}
// Send packet to adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
if err != nil {
cancel()
@@ -664,7 +700,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+ logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})
cloned := agent.cloneDeviceWithoutLock()
cloned.Root = device.Root
@@ -686,14 +722,14 @@
cloned := agent.cloneDeviceWithoutLock()
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
- logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
cloned.ConnectStatus = connStatus
}
if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
- logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "update-device-status-oper", log.Fields{"ok": ok, "val": s})
cloned.OperStatus = operStatus
}
- logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"device-id": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
+ logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
// Store the device
return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
@@ -742,11 +778,13 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "simulateAlarm", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
if err != nil {
cancel()
@@ -778,10 +816,16 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
- logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+ logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+ // Sending RPC EVENT here
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().UnixNano())
+
}
return nil
}
@@ -805,7 +849,7 @@
//Reverting TransientState update
err := agent.updateTransientState(ctx, prevTransientState)
logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
- "previousTransientState": prevTransientState, "currentTransientState": transientState})
+ "previous-transient-state": prevTransientState, "current-transient-state": transientState})
agent.requestQueue.RequestComplete()
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
@@ -818,10 +862,14 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
-
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
device, prevDevice, transientState, prevTransientState); err != nil {
- logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+ logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+ // Sending RPC EVENT here
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().UnixNano())
}
return nil
}
@@ -829,7 +877,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "updateDeviceReason", log.Fields{"device-id": agent.deviceID, "reason": reason})
+ logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
cloned := agent.cloneDeviceWithoutLock()
cloned.Reason = reason
@@ -837,7 +885,7 @@
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
+ logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
// Remove the associated peer ports on the parent device
for portID := range agent.portLoader.ListIDs() {
@@ -861,6 +909,8 @@
//send request to adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+ // Send with subCtx (timeout and metadata attached above), as the other adapter requests do
+ ch, err := agent.adapterProxy.ChildDeviceLost(subCtx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
if err != nil {
cancel()
@@ -907,12 +957,12 @@
if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
- logger.Debugw(ctx, "Omci_test_Request-Success-device-agent", log.Fields{"testResp": testResp})
+ logger.Debugw(ctx, "omci_test_request-success-device-agent", log.Fields{"test-resp": testResp})
return testResp, nil
}
func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
- logger.Debugw(ctx, "getExtValue", log.Fields{"device-id": agent.deviceID, "onuid": valueparam.Id, "valuetype": valueparam.Value})
+ logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -938,12 +988,12 @@
if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
- logger.Debugw(ctx, "getExtValue-Success-device-agent", log.Fields{"Resp": Resp})
+ logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"Resp": Resp})
return Resp, nil
}
func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
- logger.Debugw(ctx, "setExtValue", log.Fields{"device-id": value.Id})
+ logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -965,12 +1015,12 @@
}
// Unmarshal and return the response
- logger.Debug(ctx, "setExtValue-Success-device-agent")
+ logger.Debug(ctx, "set-ext-value-success-device-agent")
return &empty.Empty{}, nil
}
func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
- logger.Debugw(ctx, "getSingleValue", log.Fields{"device-id": request.TargetId})
+ logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
@@ -1004,7 +1054,7 @@
}
func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
- logger.Debugw(ctx, "setSingleValue", log.Fields{"device-id": request.TargetId})
+ logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 8b8e1a7..f2fd10a 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -78,7 +78,7 @@
flowsToAdd = append(flowsToAdd, flow)
} else {
//No need to change the flow. It is already exist.
- logger.Debugw(ctx, "No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+ logger.Debugw(ctx, "no-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
}
}
flowHandle.Unlock()
@@ -92,6 +92,8 @@
// Send update to adapters
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
@@ -151,6 +153,8 @@
// Send update to adapters
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
@@ -182,7 +186,7 @@
}
func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw(ctx, "updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+ logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
if (len(updatedFlows)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
@@ -218,6 +222,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index e450972..9552b78 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -81,7 +81,7 @@
groupsToAdd = append(groupsToAdd, group)
} else {
//No need to change the group. It is already exist.
- logger.Debugw(ctx, "No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+ logger.Debugw(ctx, "no-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
}
}
@@ -95,6 +95,8 @@
// Send update to adapters
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
updatedAllGroups := agent.listDeviceGroups()
@@ -153,6 +155,8 @@
// Send update to adapters
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
updatedAllGroups := agent.listDeviceGroups()
@@ -183,7 +187,7 @@
}
func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw(ctx, "updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+ logger.Debugw(ctx, "update-groups-to-adapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
if (len(updatedGroups)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
@@ -216,6 +220,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index acebaca..eed1879 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -22,6 +22,7 @@
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -32,7 +33,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- logger.Debugw(ctx, "downloadImage", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "download-image", log.Fields{"device-id": agent.deviceID})
if agent.device.Root {
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
@@ -62,6 +63,8 @@
// Send the request to the adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DownloadImage(subCtx, cloned, clonedImg)
if err != nil {
cancel()
@@ -87,7 +90,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- logger.Debugw(ctx, "cancelImageDownload", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": agent.deviceID})
// Update image download state
cloned := agent.cloneDeviceWithoutLock()
@@ -108,6 +111,8 @@
return nil, err
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.CancelImageDownload(subCtx, cloned, img)
if err != nil {
cancel()
@@ -123,7 +128,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- logger.Debugw(ctx, "activateImage", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "activate-image", log.Fields{"device-id": agent.deviceID})
// Update image download state
cloned := agent.cloneDeviceWithoutLock()
@@ -158,6 +163,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, cloned, img)
if err != nil {
cancel()
@@ -174,7 +181,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- logger.Debugw(ctx, "revertImage", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "revert-image", log.Fields{"device-id": agent.deviceID})
// Update image download state
cloned := agent.cloneDeviceWithoutLock()
@@ -195,6 +202,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, cloned, img)
if err != nil {
cancel()
@@ -206,7 +215,7 @@
}
func (agent *Agent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw(ctx, "getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "get-image-download-status", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
@@ -261,7 +270,7 @@
}
func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw(ctx, "getImageDownload", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "get-image-download", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -276,7 +285,7 @@
}
func (agent *Agent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
- logger.Debugw(ctx, "listImageDownloads", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "list-image-downloads", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -288,7 +297,7 @@
// onImageFailure brings back the device to Enabled state and sets the image to image download_failed.
func (agent *Agent) onImageFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Errorw(ctx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+ logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
return
}
if res, ok := response.(error); ok {
@@ -338,7 +347,7 @@
// onImageSuccess brings back the device to Enabled state and sets the image to image download_failed.
func (agent *Agent) onImageSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Errorw(ctx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+ logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
return
}
- logger.Errorw(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
+ logger.Debugw(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index 0640788..d726115 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -20,6 +20,7 @@
"context"
"github.com/gogo/protobuf/proto"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -30,7 +31,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
+ logger.Debugw(ctx, "update-pm-configs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.cloneDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -40,6 +41,8 @@
}
// Send the request to the adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
if err != nil {
cancel()
@@ -53,7 +56,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
+ logger.Debugw(ctx, "init-pm-configs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.cloneDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
@@ -61,7 +64,7 @@
}
func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
- logger.Debugw(ctx, "listPmConfigs", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "list-pm-configs", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index a3489fc..6e53d16 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -22,6 +22,7 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/core/device/port"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -43,7 +44,7 @@
// getPorts retrieves the ports information of the device based on the port type.
func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
- logger.Debugw(ctx, "getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
+ logger.Debugw(ctx, "get-ports", log.Fields{"device-id": agent.deviceID, "port-type": portType})
ports := &voltha.Ports{}
for _, port := range agent.listDevicePorts() {
if port.Type == portType {
@@ -63,7 +64,7 @@
}
func (agent *Agent) updatePortsOperState(ctx context.Context, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "updatePortsOperState", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "update-ports-oper-state", log.Fields{"device-id": agent.deviceID})
for portID := range agent.portLoader.ListIDs() {
if portHandle, have := agent.portLoader.Lock(portID); have {
@@ -79,12 +80,14 @@
// Notify the logical device manager to change the port state
// Do this for NNI and UNIs only. PON ports are not known by logical device
if newPort.Type == voltha.Port_ETHERNET_NNI || newPort.Type == voltha.Port_ETHERNET_UNI {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
go func(portID uint32, ctx context.Context) {
if err := agent.deviceMgr.logicalDeviceMgr.updatePortState(ctx, agent.deviceID, portID, operStatus); err != nil {
// TODO: VOL-2707
logger.Warnw(ctx, "unable-to-update-logical-port-state", log.Fields{"error": err})
}
- }(portID, log.WithSpanFromContext(context.Background(), ctx))
+ }(portID, subCtx)
}
}
portHandle.Unlock()
@@ -116,7 +119,7 @@
}
func (agent *Agent) deleteAllPorts(ctx context.Context) error {
- logger.Debugw(ctx, "deleteAllPorts", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -159,7 +162,7 @@
oldPort := portHandle.GetReadOnly()
if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
- logger.Debugw(ctx, "port already exists", log.Fields{"port": port})
+ logger.Debugw(ctx, "port-already-exists", log.Fields{"port": port})
return nil
}
@@ -218,7 +221,7 @@
}
func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
- logger.Debugw(ctx, "disablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+ logger.Debugw(ctx, "disable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
portHandle, have := agent.portLoader.Lock(portID)
if !have {
@@ -244,6 +247,8 @@
return err
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
if err != nil {
cancel()
@@ -254,7 +259,7 @@
}
func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
- logger.Debugw(ctx, "enablePort", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+ logger.Debugw(ctx, "enable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
portHandle, have := agent.portLoader.Lock(portID)
if !have {
@@ -280,6 +285,8 @@
return err
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
if err != nil {
cancel()
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 3d06a68..6947447 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -35,6 +35,7 @@
tst "github.com/opencord/voltha-go/rw_core/test"
com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -52,6 +53,7 @@
adapterMgr *adapter.Manager
kmp kafka.InterContainerProxy
kClient kafka.Client
+ kEventClient kafka.Client
kvClientPort int
oltAdapter *cm.OLTAdapter
onuAdapter *cm.ONUAdapter
@@ -75,6 +77,7 @@
}
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
+ test.kEventClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
@@ -116,6 +119,7 @@
func (dat *DATest) startCore(ctx context.Context) {
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
+ cfg.EventTopic = "voltha.events"
cfg.DefaultRequestTimeout = dat.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
grpcPort, err := freeport.GetFreePort()
@@ -138,8 +142,8 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
-
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
dat.adapterMgr.Start(context.Background())
if err = dat.kmp.Start(ctx); err != nil {
logger.Fatal(ctx, "Cannot start InterContainerProxy")
@@ -161,6 +165,9 @@
if dat.etcdServer != nil {
tst.StopEmbeddedEtcdServer(ctx, dat.etcdServer)
}
+ if dat.kEventClient != nil {
+ dat.kEventClient.Stop(ctx)
+ }
}
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 21d535f..4489196 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -20,12 +20,18 @@
"context"
"encoding/binary"
"encoding/hex"
- "sync"
-
+ "fmt"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/common"
"github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opentracing/opentracing-go"
+ jtracing "github.com/uber/jaeger-client-go"
+ "sync"
+ "time"
)
type Manager struct {
@@ -33,21 +39,34 @@
packetInQueueDone chan bool
changeEventQueue chan openflow_13.ChangeEvent
changeEventQueueDone chan bool
+ RPCEventManager *RPCEventManager
}
-func NewManager() *Manager {
+type RPCEventManager struct {
+ eventProxy eventif.EventProxy
+ coreInstanceID string
+}
+
+func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *Manager {
return &Manager{
packetInQueue: make(chan openflow_13.PacketIn, 100),
packetInQueueDone: make(chan bool, 1),
changeEventQueue: make(chan openflow_13.ChangeEvent, 100),
changeEventQueueDone: make(chan bool, 1),
+ RPCEventManager: NewRPCEventManager(proxyForRPCEvents, instanceID),
}
}
+func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *RPCEventManager {
+ return &RPCEventManager{
+ eventProxy: proxyForRPCEvents,
+ coreInstanceID: instanceID,
+ }
+}
func (q *Manager) SendPacketIn(ctx context.Context, deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
// TODO: Augment the OF PacketIn to include the transactionId
packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet}
- logger.Debugw(ctx, "SendPacketIn", log.Fields{"packetIn": packetIn})
+ logger.Debugw(ctx, "send-packet-in", log.Fields{"packet-in": packetIn})
q.packetInQueue <- packetIn
}
@@ -66,9 +85,9 @@
defer streamingTracker.Unlock()
if _, ok := streamingTracker.calls[method]; ok {
// bail out the other packet in thread
- logger.Debugf(ctx, "%s streaming call already running. Exiting it", method)
+ logger.Debugf(ctx, "%s-streaming-call-already-running-exiting-it", method)
done <- true
- logger.Debugf(ctx, "Last %s exited. Continuing ...", method)
+ logger.Debugf(ctx, "last-%s-exited-continuing", method)
} else {
streamingTracker.calls[method] = &callTracker{failedPacket: nil}
}
@@ -79,10 +98,10 @@
if tracker.failedPacket != nil {
switch tracker.failedPacket.(type) {
case openflow_13.PacketIn:
- logger.Debug(ctx, "Enqueueing last failed packetIn")
+ logger.Debug(ctx, "enqueueing-last-failed-packet-in")
q.packetInQueue <- tracker.failedPacket.(openflow_13.PacketIn)
case openflow_13.ChangeEvent:
- logger.Debug(ctx, "Enqueueing last failed changeEvent")
+ logger.Debug(ctx, "enqueueing-last-failed-change-event")
q.changeEventQueue <- tracker.failedPacket.(openflow_13.ChangeEvent)
}
}
@@ -92,8 +111,9 @@
// ReceivePacketsIn receives packets from adapter
func (q *Manager) ReceivePacketsIn(_ *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
ctx := context.Background()
+ ctx = utils.WithRPCMetadataContext(ctx, "ReceivePacketsIn")
var streamingTracker = q.getStreamingTracker(ctx, "ReceivePacketsIn", q.packetInQueueDone)
- logger.Debugw(ctx, "ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
+ logger.Debugw(ctx, "receive-packets-in-request", log.Fields{"packets-in": packetsIn})
err := q.flushFailedPackets(ctx, streamingTracker)
if err != nil {
@@ -109,6 +129,9 @@
})
if err := packetsIn.Send(&packet); err != nil {
logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
+ go q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+ nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().UnixNano())
// save the last failed packet in
streamingTracker.failedPacket = packet
} else {
@@ -118,7 +141,7 @@
}
}
case <-q.packetInQueueDone:
- logger.Debug(ctx, "Another ReceivePacketsIn running. Bailing out ...")
+ logger.Debug(ctx, "another-receive-packets-in-running-bailing-out")
break loop
}
}
@@ -128,7 +151,7 @@
}
func (q *Manager) SendChangeEvent(ctx context.Context, deviceID string, reason openflow_13.OfpPortReason, desc *openflow_13.OfpPort) {
- logger.Debugw(ctx, "SendChangeEvent", log.Fields{"device-id": deviceID, "reason": reason, "desc": desc})
+ logger.Debugw(ctx, "send-change-event", log.Fields{"device-id": deviceID, "reason": reason, "desc": desc})
q.changeEventQueue <- openflow_13.ChangeEvent{
Id: deviceID,
Event: &openflow_13.ChangeEvent_PortStatus{
@@ -141,8 +164,8 @@
}
func (q *Manager) SendFlowChangeEvent(ctx context.Context, deviceID string, res []error, xid uint32, flowCookie uint64) {
- logger.Debugw(ctx, "SendChangeEvent", log.Fields{"device-id": deviceID,
- "flowId": xid, "flowCookie": flowCookie, "errors": res})
+ logger.Debugw(ctx, "send-change-event", log.Fields{"device-id": deviceID,
+ "flow-id": xid, "flow-cookie": flowCookie, "errors": res})
errorType := openflow_13.OfpErrorType_OFPET_FLOW_MOD_FAILED
//Manually creating the data payload for the flow error message
bs := make([]byte, 2)
@@ -179,8 +202,9 @@
// ReceiveChangeEvents receives change in events
func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
ctx := context.Background()
+ ctx = utils.WithRPCMetadataContext(ctx, "ReceiveChangeEvents")
var streamingTracker = q.getStreamingTracker(ctx, "ReceiveChangeEvents", q.changeEventQueueDone)
- logger.Debugw(ctx, "ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
+ logger.Debugw(ctx, "receive-change-events-request", log.Fields{"change-events": changeEvents})
err := q.flushFailedPackets(ctx, streamingTracker)
if err != nil {
@@ -195,7 +219,10 @@
logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
if err := changeEvents.Send(&event); err != nil {
logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
- // save last failed changeevent
+ go q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+ nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
+ time.Now().UnixNano())
+ // save last failed change event
streamingTracker.failedPacket = event
} else {
if streamingTracker.failedPacket != nil {
@@ -204,7 +231,7 @@
}
}
case <-q.changeEventQueueDone:
- logger.Debug(ctx, "Another ReceiveChangeEvents already running. Bailing out ...")
+ logger.Debug(ctx, "another-receive-change-events-already-running-bailing-out")
break loop
}
}
@@ -215,3 +242,46 @@
func (q *Manager) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent {
return q.changeEventQueue
}
+
+// NewRPCEvent builds the failure event reported for resourceID. Rpc is whatever utils.GetRPCMetadataFromContext
+// returns for ctx, and OperationId is the low 64 bits of the Jaeger trace ID (16 hex digits) when ctx carries a
+// Jaeger span, empty otherwise. Status is always OPERATION_FAILURE; desc and context are stored as given.
+func (q *RPCEventManager) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
+ logger.Debugw(ctx, "new-rpc-event", log.Fields{"resource-id": resourceID})
+ var opID string
+ if span := opentracing.SpanFromContext(ctx); span != nil {
+ if jSpan, ok := span.(*jtracing.Span); ok {
+ opID = fmt.Sprintf("%016x", jSpan.SpanContext().TraceID().Low) // Using Sprintf to avoid removal of leading 0s
+ }
+ }
+ return &voltha.RPCEvent{
+ Rpc: utils.GetRPCMetadataFromContext(ctx),
+ OperationId: opID,
+ ResourceId: resourceID,
+ Service: q.coreInstanceID,
+ Status: &common.OperationResp{
+ Code: common.OperationResp_OPERATION_FAILURE,
+ },
+ Description: desc,
+ Context: context,
+ }
+}
+
+// SendRPCEvent publishes rpcEvent through the event proxy. A nil event or one without an RPC name is dropped.
+// The method has no error return, so a publish failure is logged here instead of being silently discarded.
+func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
+ if rpcEvent == nil || rpcEvent.Rpc == "" {
+ return
+ }
+ if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+ logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"id": id, "resource-id": rpcEvent.ResourceId, "rpc": rpcEvent.Rpc, "error": err})
+ }
+}
+
+// GetAndSendRPCEvent builds an event with NewRPCEvent and hands it to SendRPCEvent, which applies the
+// empty-RPC filter and performs the publish.
+func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+ id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ q.SendRPCEvent(ctx, id, q.NewRPCEvent(ctx, resourceID, desc, context), category, subCategory, raisedTs)
+}
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 6b0b6f7..7ab00da 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -92,7 +92,7 @@
return nil
}
- logger.Infow(ctx, "starting-logical_device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
+ logger.Infow(ctx, "starting-logical-device-agent", log.Fields{"logical-device-id": agent.logicalDeviceID, "load-from-db": loadFromDB})
var startSucceeded bool
defer func() {
@@ -128,13 +128,14 @@
logger.Errorw(ctx, "failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
- logger.Debugw(ctx, "logicaldevice-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
+ logger.Debugw(ctx, "logical-device-created", log.Fields{"logical-device-id": agent.logicalDeviceID, "root-id": ld.RootDeviceId})
agent.logicalDevice = ld
// Setup the logicalports - internal processing, no need to propagate the client context
go func() {
- err := agent.setupLogicalPorts(log.WithSpanFromContext(context.Background(), ctx))
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := agent.setupLogicalPorts(subCtx)
if err != nil {
logger.Errorw(ctx, "unable-to-setup-logical-ports", log.Fields{"error": err})
}
@@ -169,7 +170,8 @@
// Setup the device routes. Building routes may fail if the pre-conditions are not satisfied (e.g. no PON ports present)
if loadFromDB {
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.buildRoutes(subCtx); err != nil {
logger.Warn(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -183,7 +185,7 @@
func (agent *LogicalAgent) stop(ctx context.Context) error {
var returnErr error
agent.stopOnce.Do(func() {
- logger.Info(ctx, "stopping-logical_device-agent")
+ logger.Info(ctx, "stopping-logical-device-agent")
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
// This should never happen - an error is returned only if the agent is stopped and an agent is only stopped once.
@@ -202,14 +204,14 @@
if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
returnErr = err
} else {
- logger.Debugw(ctx, "logicaldevice-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Debugw(ctx, "logical-device-removed", log.Fields{"logical-device-id": agent.logicalDeviceID})
}
// TODO: remove all entries from all loaders
// TODO: don't allow any more modifications to flows/groups/meters/ports or to any logical device field
agent.stopped = true
- logger.Info(ctx, "logical_device-agent-stopped")
+ logger.Info(ctx, "logical-device-agent-stopped")
})
return returnErr
}
@@ -224,7 +226,7 @@
}
func (agent *LogicalAgent) addFlowsAndGroupsToDevices(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) []coreutils.Response {
- logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
+ logger.Debugw(ctx, "send-add-flows-to-device-manager", log.Fields{"logical-device-id": agent.logicalDeviceID, "device-rules": deviceRules, "flow-metadata": flowMetadata})
responses := make([]coreutils.Response, 0)
for deviceID, value := range deviceRules.GetRules() {
@@ -232,6 +234,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
start := time.Now()
if err := agent.deviceMgr.addFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -260,6 +264,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
start := time.Now()
if err := agent.deviceMgr.deleteFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
@@ -286,6 +292,8 @@
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
if err := agent.deviceMgr.updateFlowsAndGroups(subCtx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
logger.Errorw(ctx, "flow-update-failed", log.Fields{"device-id": deviceId, "error": err})
@@ -313,6 +321,8 @@
logger.Debugw(ctx, "uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
go func(uniPort uint32, metadata *voltha.FlowMetadata) {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
defer cancel()
if err := agent.deviceMgr.deleteParentFlows(subCtx, agent.rootDeviceID, uniPort, metadata); err != nil {
logger.Error(ctx, "flow-delete-failed-from-parent-device", log.Fields{
@@ -331,15 +341,15 @@
func (agent *LogicalAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
if logger.V(log.InfoLevel) {
logger.Infow(ctx, "packet-out", log.Fields{
- "packet": hex.EncodeToString(packet.Data),
- "inPort": packet.GetInPort(),
+ "packet": hex.EncodeToString(packet.Data),
+ "in-port": packet.GetInPort(),
})
}
outPort := fu.GetPacketOutPort(packet)
//frame := packet.GetData()
//TODO: Use a channel between the logical agent and the device agent
if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
- logger.Error(ctx, "packetout-failed", log.Fields{"logical-device-id": agent.rootDeviceID})
+ logger.Errorw(ctx, "packet-out-failed", log.Fields{"logical-device-id": agent.rootDeviceID, "error": err}) // Errorw keeps the fields structured and records the cause
}
}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 6b8abef..2b88abc 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -21,6 +21,7 @@
"errors"
"fmt"
"strconv"
+ "time"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/route"
@@ -48,7 +49,7 @@
//updateFlowTable updates the flow table of that logical device
func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.FlowTableUpdate) error {
- logger.Debug(ctx, "UpdateFlowTable")
+ logger.Debug(ctx, "update-flow-table")
if flow == nil {
return nil
}
@@ -72,19 +73,19 @@
//flowAdd adds a flow to the flow table of that logical device
func (agent *LogicalAgent) flowAdd(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
mod := flowUpdate.FlowMod
- logger.Debugw(ctx, "flowAdd", log.Fields{"flow": mod})
+ logger.Debugw(ctx, "flow-add", log.Fields{"flow": mod})
if mod == nil {
return nil
}
flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
if err != nil {
- logger.Errorw(ctx, "flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
+ logger.Errorw(ctx, "flow-add-failed", log.Fields{"flow-mod": mod, "err": err})
return err
}
var updated bool
var changed bool
if changed, updated, err = agent.decomposeAndAdd(ctx, flow, flowUpdate); err != nil {
- logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
+ logger.Errorw(ctx, "flow-decompose-and-add-failed ", log.Fields{"flow-mod": mod, "err": err})
return err
}
if changed && !updated {
@@ -115,7 +116,7 @@
// TODO: this currently does nothing
if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
// TODO: should this error be notified other than being logged?
- logger.Warnw(ctx, "overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+ logger.Warnw(ctx, "overlapped-flows", log.Fields{"logical-device-id": agent.logicalDeviceID})
} else {
// Add flow
changed = true
@@ -141,7 +142,7 @@
flowMeterConfig, err := agent.GetMeterConfig(ctx, updatedFlows)
if err != nil {
- logger.Error(ctx, "Meter-referred-in-flow-not-present")
+ logger.Error(ctx, "meter-referred-in-flow-not-present")
return changed, updated, err
}
@@ -178,8 +179,10 @@
"flow": flow,
"groups": groups,
})
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
// Revert added flows
- if err := agent.revertAddedFlows(log.WithSpanFromContext(context.Background(), ctx), mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
+ if err := agent.revertAddedFlows(subCtx, mod, flow, flowToReplace, deviceRules, toMetadata(flowMeterConfig)); err != nil {
logger.Errorw(ctx, "failure-to-delete-flow-after-failed-addition", log.Fields{
"error": err,
"logical-device-id": agent.logicalDeviceID,
@@ -189,6 +192,13 @@
}
// send event
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["flow-id"] = strconv.FormatUint(flow.Id, 10) // string(flow.Id) would yield the rune for that code point, not the decimal id
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-add-flow", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
}
}()
}
@@ -198,7 +208,7 @@
// revertAddedFlows reverts flows after the flowAdd request has failed. All flows corresponding to that flowAdd request
// will be reverted, both from the logical devices and the devices.
func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+ logger.Debugw(ctx, "revert-flow-add", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
flowHandle, have := agent.flowLoader.Lock(addedFlow.Id)
if !have {
@@ -243,7 +253,7 @@
//flowDelete deletes a flow from the flow table of that logical device
func (agent *LogicalAgent) flowDelete(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
- logger.Debug(ctx, "flowDelete")
+ logger.Debug(ctx, "flow-delete")
mod := flowUpdate.FlowMod
if mod == nil {
return nil
@@ -274,7 +284,7 @@
//Delete the matched flows
if len(toDelete) > 0 {
- logger.Debugw(ctx, "flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
+ logger.Debugw(ctx, "flow-delete", log.Fields{"logical-device-id": agent.logicalDeviceID, "to-delete": len(toDelete)})
for _, flow := range toDelete {
if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
@@ -299,7 +309,7 @@
metersConfig, err := agent.GetMeterConfig(ctx, toDelete)
if err != nil { // This should never happen
- logger.Error(ctx, "Meter-referred-in-flows-not-present")
+ logger.Error(ctx, "meter-referred-in-flows-not-present")
return err
}
@@ -336,7 +346,13 @@
go func() {
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ logger.Errorw(ctx, "failure-updating-device-flows", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
// TODO: Revert the flow deletion
// send event, and allow any queued events to be sent as well
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
@@ -350,7 +366,7 @@
//flowDeleteStrict deletes a flow from the flow table of that logical device
func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, flowUpdate *ofp.FlowTableUpdate) error {
mod := flowUpdate.FlowMod
- logger.Debugw(ctx, "flowDeleteStrict", log.Fields{"mod": mod})
+ logger.Debugw(ctx, "flow-delete-strict", log.Fields{"mod": mod})
if mod == nil {
return nil
}
@@ -359,10 +375,10 @@
if err != nil {
return err
}
- logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
+ logger.Debugw(ctx, "flow-id-in-flow-delete-strict", log.Fields{"flow-id": flow.Id})
flowHandle, have := agent.flowLoader.Lock(flow.Id)
if !have {
- logger.Debugw(ctx, "Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
+ logger.Debugw(ctx, "skipping-flow-delete-strict-request-no-flow-found", log.Fields{"flow-mod": mod})
return nil
}
defer flowHandle.Unlock()
@@ -421,6 +437,14 @@
// TODO: Revert flow changes
// send event, and allow any queued events to be sent as well
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["flow-id"] = strconv.FormatUint(flow.Id, 10) // string(flow.Id) would yield the rune for that code point, not the decimal id
+ context["device-rules"] = deviceRules.String()
+ // Create context and send extra information as part of it.
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
}
}()
@@ -448,7 +472,7 @@
}
func (agent *LogicalAgent) deleteFlowsHavingMeter(ctx context.Context, meterID uint32) error {
- logger.Infow(ctx, "Delete-flows-matching-meter", log.Fields{"meter": meterID})
+ logger.Infow(ctx, "delete-flows-matching-meter", log.Fields{"meter": meterID})
for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
@@ -466,7 +490,7 @@
}
func (agent *LogicalAgent) deleteFlowsHavingGroup(ctx context.Context, groupID uint32) (map[uint64]*ofp.OfpFlowStats, error) {
- logger.Infow(ctx, "Delete-flows-matching-group", log.Fields{"groupID": groupID})
+ logger.Infow(ctx, "delete-flows-matching-group", log.Fields{"group-id": groupID})
flowsRemoved := make(map[uint64]*ofp.OfpFlowStats)
for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index 7a9f91c..b7bf1b6 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -19,6 +19,7 @@
import (
"context"
"fmt"
+ "time"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
@@ -44,7 +45,7 @@
//updateGroupTable updates the group table of that logical device
func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debug(ctx, "updateGroupTable")
+ logger.Debug(ctx, "update-group-table")
if groupMod == nil {
return nil
}
@@ -64,7 +65,7 @@
if groupMod == nil {
return nil
}
- logger.Debugw(ctx, "groupAdd", log.Fields{"GroupId": groupMod.GroupId})
+ logger.Debugw(ctx, "group-add", log.Fields{"group-id": groupMod.GroupId})
groupEntry := fu.GroupEntryFromGroupMod(groupMod)
@@ -83,7 +84,7 @@
deviceRules := fu.NewDeviceRules()
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
- logger.Debugw(ctx, "rules", log.Fields{"rules for group-add": deviceRules.String()})
+ logger.Debugw(ctx, "rules", log.Fields{"rules-for-group-add": deviceRules.String()})
// Update the devices
respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
@@ -92,6 +93,13 @@
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["group-id"] = fmt.Sprint(groupMod.GroupId) // string(groupMod.GroupId) would yield the rune for that code point, not the decimal id
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
//TODO: Revert flow changes
}
}()
@@ -99,7 +107,7 @@
}
func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debugw(ctx, "groupDelete", log.Fields{"groupMod": groupMod})
+ logger.Debugw(ctx, "group-delete", log.Fields{"group-mod": groupMod})
if groupMod == nil {
return nil
}
@@ -135,7 +143,7 @@
}
if len(affectedGroups) == 0 {
- logger.Debugw(ctx, "no-group-to-delete", log.Fields{"groupId": groupMod.GroupId})
+ logger.Debugw(ctx, "no-group-to-delete", log.Fields{"group-id": groupMod.GroupId})
return nil
}
@@ -167,6 +175,13 @@
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["group-id"] = fmt.Sprint(groupMod.GroupId) // string(groupMod.GroupId) would yield the rune for that code point, not the decimal id
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
//TODO: Revert flow changes
}
}()
@@ -174,7 +189,7 @@
}
func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debug(ctx, "groupModify")
+ logger.Debug(ctx, "group-modify")
if groupMod == nil {
return nil
}
@@ -199,7 +214,7 @@
//update KV
if err := groupHandle.Update(ctx, groupEntry); err != nil {
- logger.Errorw(ctx, "Cannot-update-logical-group", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Errorw(ctx, "cannot-update-logical-group", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
@@ -210,6 +225,13 @@
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
+ context := make(map[string]string)
+ context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
+ context["group-id"] = fmt.Sprint(groupMod.GroupId) // string(groupMod.GroupId) would yield the rune for that code point, not the decimal id
+ context["device-rules"] = deviceRules.String()
+ go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
//TODO: Revert flow changes
}
}()
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index ab1b82b..05a5108 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -32,7 +32,7 @@
// listLogicalDevicePorts returns logical device ports
func (agent *LogicalAgent) listLogicalDevicePorts(ctx context.Context) map[uint32]*voltha.LogicalPort {
- logger.Debug(ctx, "listLogicalDevicePorts")
+ logger.Debug(ctx, "list-logical-device-ports")
portIDs := agent.portLoader.ListIDs()
ret := make(map[uint32]*voltha.LogicalPort, len(portIDs))
for portID := range portIDs {
@@ -45,7 +45,7 @@
}
func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
- logger.Debugw(ctx, "updateLogicalPort", log.Fields{"device-id": device.Id, "port": port})
+ logger.Debugw(ctx, "update-logical-port", log.Fields{"device-id": device.Id, "port": port})
switch port.Type {
case voltha.Port_ETHERNET_NNI:
if err := agent.addNNILogicalPort(ctx, device.Id, devicePorts, port); err != nil {
@@ -58,7 +58,9 @@
case voltha.Port_PON_OLT:
// Rebuilt the routes on Parent PON port addition
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+
+ if err := agent.buildRoutes(subCtx); err != nil {
// Not an error - temporary state
logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
@@ -67,7 +69,8 @@
case voltha.Port_PON_ONU:
// Add the routes corresponding to that child device
go func() {
- if err := agent.updateAllRoutes(log.WithSpanFromContext(context.Background(), ctx), device.Id, devicePorts); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.updateAllRoutes(subCtx, device.Id, devicePorts); err != nil {
// Not an error - temporary state
logger.Infow(ctx, "failed-to-update-routes-after-adding-child-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
@@ -82,10 +85,10 @@
// added to it. While the logical device was being created we could have received requests to add
// NNI and UNI ports which were discarded. Now is the time to add them if needed
func (agent *LogicalAgent) setupLogicalPorts(ctx context.Context) error {
- logger.Infow(ctx, "setupLogicalPorts", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Infow(ctx, "setup-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
// First add any NNI ports which could have been missing
if err := agent.setupNNILogicalPorts(ctx, agent.rootDeviceID); err != nil {
- logger.Errorw(ctx, "error-setting-up-NNI-ports", log.Fields{"error": err, "device-id": agent.rootDeviceID})
+ logger.Errorw(ctx, "error-setting-up-nni-ports", log.Fields{"error": err, "device-id": agent.rootDeviceID})
return err
}
@@ -99,21 +102,22 @@
for _, child := range children.Items {
response := coreutils.NewResponse()
responses = append(responses, response)
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
go func(ctx context.Context, child *voltha.Device) {
defer response.Done()
childPorts, err := agent.deviceMgr.listDevicePorts(ctx, child.Id)
if err != nil {
- logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"device-id": child.Id})
+ logger.Errorw(ctx, "setting-up-uni-ports-failed", log.Fields{"device-id": child.Id, "error": err}) // Errorw takes structured fields; keep the cause, the response below only carries a generic status
response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
return
}
if err = agent.setupUNILogicalPorts(ctx, child, childPorts); err != nil {
- logger.Error(ctx, "setting-up-UNI-ports-failed", log.Fields{"device-id": child.Id})
+ logger.Errorw(ctx, "setting-up-uni-ports-failed", log.Fields{"device-id": child.Id, "error": err}) // Errorw takes structured fields; keep the cause, the response below only carries a generic status
response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
- }(log.WithSpanFromContext(context.Background(), ctx), child)
+ }(subCtx, child)
}
// Wait for completion
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
@@ -124,7 +128,7 @@
// setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
func (agent *LogicalAgent) setupNNILogicalPorts(ctx context.Context, deviceID string) error {
- logger.Infow(ctx, "setupNNILogicalPorts-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Infow(ctx, "setup-nni-logical-ports-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
devicePorts, err := agent.deviceMgr.listDevicePorts(ctx, deviceID)
@@ -137,7 +141,7 @@
for _, port := range devicePorts {
if port.Type == voltha.Port_ETHERNET_NNI {
if err = agent.addNNILogicalPort(ctx, deviceID, devicePorts, port); err != nil {
- logger.Errorw(ctx, "error-adding-NNI-port", log.Fields{"error": err})
+ logger.Errorw(ctx, "error-adding-nni-port", log.Fields{"error": err})
}
}
}
@@ -146,7 +150,7 @@
// updatePortState updates the port state of the device
func (agent *LogicalAgent) updatePortState(ctx context.Context, portNo uint32, operStatus voltha.OperStatus_Types) error {
- logger.Infow(ctx, "updatePortState-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
+ logger.Infow(ctx, "update-port-state-start", log.Fields{"logical-device-id": agent.logicalDeviceID, "port-no": portNo, "state": operStatus})
portHandle, have := agent.portLoader.Lock(portNo)
if !have {
@@ -179,14 +183,14 @@
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
- logger.Infow(ctx, "setupUNILogicalPorts", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Infow(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
//Get UNI port number
for _, port := range childDevicePorts {
if port.Type == voltha.Port_ETHERNET_UNI {
if err = agent.addUNILogicalPort(ctx, childDevice.Id, childDevice.AdminState, childDevice.OperStatus, childDevicePorts, port); err != nil {
- logger.Errorw(ctx, "error-adding-UNI-port", log.Fields{"error": err})
+ logger.Errorw(ctx, "error-adding-uni-port", log.Fields{"error": err})
}
}
}
@@ -195,7 +199,7 @@
// deleteAllLogicalPorts deletes all logical ports associated with this logical device
func (agent *LogicalAgent) deleteAllLogicalPorts(ctx context.Context) error {
- logger.Infow(ctx, "updatePortsState-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ logger.Infow(ctx, "update-ports-state-start", log.Fields{"logical-device-id": agent.logicalDeviceID})
// for each port
for portID := range agent.portLoader.ListIDs() {
@@ -215,7 +219,8 @@
// Reset the logical device routes
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.buildRoutes(subCtx); err != nil {
logger.Warnw(ctx, "device-routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -245,7 +250,8 @@
// Reset the logical device routes
go func() {
- if err := agent.buildRoutes(log.WithSpanFromContext(context.Background(), ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.buildRoutes(subCtx); err != nil {
logger.Warnw(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
}
}()
@@ -301,7 +307,7 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, deviceID string, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
- logger.Debugw(ctx, "addNNILogicalPort", log.Fields{"logical-device-id": agent.logicalDeviceID, "nni-port": port})
+ logger.Debugw(ctx, "add-nni-logical-port", log.Fields{"logical-device-id": agent.logicalDeviceID, "nni-port": port})
label := fmt.Sprintf("nni-%d", port.PortNo)
ofpPort := *port.OfpPort
@@ -334,7 +340,8 @@
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
// First setup the routes
- if err := agent.updateRoutes(log.WithSpanFromContext(context.Background(), ctx), deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.updateRoutes(subCtx, deviceID, devicePorts, nniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": nniPort.OfpPort.PortNo, "error": err})
@@ -351,7 +358,7 @@
// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
// scenario. This also applies to the case where the port was already added.
func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, deviceID string, deviceAdminState voltha.AdminState_Types, deviceOperStatus voltha.OperStatus_Types, devicePorts map[uint32]*voltha.Port, port *voltha.Port) error {
- logger.Debugw(ctx, "addUNILogicalPort", log.Fields{"port": port})
+ logger.Debugw(ctx, "add-uni-logical-port", log.Fields{"port": port})
if deviceAdminState != voltha.AdminState_ENABLED || deviceOperStatus != voltha.OperStatus_ACTIVE {
logger.Infow(ctx, "device-not-ready", log.Fields{"device-id": deviceID, "admin": deviceAdminState, "oper": deviceOperStatus})
return nil
@@ -384,16 +391,16 @@
// Setup the routes for this device and then send the port update event to the OF Controller
go func() {
- ctx = log.WithSpanFromContext(context.Background(), ctx)
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
// First setup the routes
- if err := agent.updateRoutes(ctx, deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
+ if err := agent.updateRoutes(subCtx, deviceID, devicePorts, uniPort, agent.listLogicalDevicePorts(ctx)); err != nil {
// This is not an error as we may not have enough logical ports to set up routes or some PON ports have not been
// created yet.
logger.Infow(ctx, "routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": uniPort.OfpPort.PortNo, "error": err})
}
// send event, and allow any queued events to be sent as well
- queuePosition.send(ctx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
+ queuePosition.send(subCtx, agent, agent.logicalDeviceID, ofp.OfpPortReason_OFPPR_ADD, uniPort.OfpPort)
}()
return nil
}
@@ -414,7 +421,8 @@
// send is a convenience to avoid calling both assignQueuePosition and qp.send
func (e *orderedEvents) send(ctx context.Context, agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
qp := e.assignQueuePosition()
- go qp.send(log.WithSpanFromContext(context.Background(), ctx), agent, deviceID, reason, desc)
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ go qp.send(subCtx, agent, deviceID, reason, desc)
}
// sendCompletion will make sure that given channel is notified when queue is empty
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index dcd6691..71ecf26 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -30,6 +30,7 @@
tst "github.com/opencord/voltha-go/rw_core/test"
com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
@@ -46,6 +47,7 @@
kmp kafka.InterContainerProxy
logicalDeviceMgr *LogicalManager
kClient kafka.Client
+ kEventClient kafka.Client
kvClientPort int
oltAdapterName string
onuAdapterName string
@@ -68,6 +70,7 @@
}
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
+ test.kEventClient = mock_kafka.NewKafkaClient()
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
@@ -135,6 +138,7 @@
func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
+ cfg.EventTopic = "voltha.events"
cfg.DefaultRequestTimeout = lda.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
grpcPort, err := freeport.GetFreePort()
@@ -157,8 +161,8 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
-
- lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+ eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
+ lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
if err = lda.kmp.Start(ctx); err != nil {
logger.Fatal(ctx, "Cannot start InterContainerProxy")
}
@@ -175,6 +179,9 @@
if lda.etcdServer != nil {
tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
}
+ if lda.kEventClient != nil {
+ lda.kEventClient.Stop(ctx)
+ }
}
func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 2fca7c5..9ea12fa 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -67,7 +67,7 @@
// This can happen when an agent for the logical device has been created but the logical device
// itself is not ready for action as it is waiting for switch and port capabilities from the
// relevant adapter. In such a case prevent any request aimed at that logical device.
- logger.Debugf(ctx, "Logical device %s is not ready to serve requests", logicalDeviceID)
+ logger.Debugf(ctx, "logical-device-%s-is-not-ready-to-serve-requests", logicalDeviceID)
return nil
}
return lda
@@ -88,7 +88,8 @@
// GetLogicalDevice provides a cloned most up to date logical device. If device is not in memory
// it will be fetched from the dB
func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
- logger.Debugw(ctx, "getlogicalDevice", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevice")
+ logger.Debugw(ctx, "get-logical-device", log.Fields{"logical-device-id": id})
if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
return agent.GetLogicalDeviceReadOnly(ctx)
}
@@ -97,7 +98,8 @@
//ListLogicalDevices returns the list of all logical devices
func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context, _ *empty.Empty) (*voltha.LogicalDevices, error) {
- logger.Debug(ctx, "ListAllLogicalDevices")
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevices")
+ logger.Debug(ctx, "list-all-logical-devices")
var logicalDevices []*voltha.LogicalDevice
if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
@@ -139,7 +141,8 @@
go func() {
//TODO: either wait for the agent to be started before returning, or
// implement locks in the agent to ensure request are not processed before start() is complete
- err := agent.start(log.WithSpanFromContext(context.Background(), ctx), false)
+ ldCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := agent.start(ldCtx, false)
if err != nil {
logger.Errorw(ctx, "unable-to-create-the-logical-device", log.Fields{"error": err})
ldMgr.deleteLogicalDeviceAgent(id)
@@ -205,7 +208,7 @@
}
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
} else {
- logger.Debugw(ctx, "logicalDevice not in model", log.Fields{"lDeviceId": lDeviceID})
+ logger.Debugw(ctx, "logical-device-not-in-model", log.Fields{"logical-device-id": lDeviceID})
}
// announce completion of task to any number of waiting channels
ldMgr.logicalDevicesLoadingLock.Lock()
@@ -281,7 +284,8 @@
// ListLogicalDeviceFlows returns the flows of logical device
func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
- logger.Debugw(ctx, "ListLogicalDeviceFlows", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlows")
+ logger.Debugw(ctx, "list-logical-device-flows", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -298,7 +302,8 @@
// ListLogicalDeviceFlowGroups returns logical device flow groups
func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
- logger.Debugw(ctx, "ListLogicalDeviceFlowGroups", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceFlowGroups")
+ logger.Debugw(ctx, "list-logical-device-flow-groups", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -315,7 +320,8 @@
// ListLogicalDevicePorts returns logical device ports
func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
- logger.Debugw(ctx, "ListLogicalDevicePorts", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDevicePorts")
+ logger.Debugw(ctx, "list-logical-device-ports", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -332,6 +338,7 @@
// GetLogicalDevicePort returns logical device port details
func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetLogicalDevicePort")
// Get the logical device where this port is attached
agent := ldMgr.getLogicalDeviceAgent(ctx, lPortID.Id)
if agent == nil {
@@ -382,7 +389,7 @@
}
func (ldMgr *LogicalManager) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
- logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"childDeviceId": childDevice.Id, "parentDeviceId": childDevice.ParentId, "current-data": childDevice})
+ logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"child-device-id": childDevice.Id, "parent-device-id": childDevice.ParentId, "current-data": childDevice})
// Sanity check
if childDevice.Root {
return errors.New("Device-root")
@@ -392,7 +399,7 @@
parentID := childDevice.ParentId
logDeviceID := ldMgr.deviceMgr.GetParentDeviceID(ctx, parentID)
- logger.Debugw(ctx, "setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceID, "parentId": parentID})
+ logger.Debugw(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": logDeviceID, "parent-id": parentID})
if parentID == "" || logDeviceID == "" {
return errors.New("device-in-invalid-state")
@@ -407,7 +414,7 @@
}
func (ldMgr *LogicalManager) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "deleteAllLogicalPorts", log.Fields{"device-id": device.Id})
+ logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": device.Id})
var ldID *string
var err error
@@ -425,7 +432,7 @@
}
func (ldMgr *LogicalManager) updatePortState(ctx context.Context, deviceID string, portNo uint32, state voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "updatePortState", log.Fields{"device-id": deviceID, "state": state, "portNo": portNo})
+ logger.Debugw(ctx, "update-port-state", log.Fields{"device-id": deviceID, "state": state, "port-no": portNo})
var ldID *string
var err error
@@ -444,7 +451,8 @@
// UpdateLogicalDeviceFlowTable updates logical device flow table
func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateLogicalDeviceFlowTable", log.Fields{"logical-device-id": flow.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowTable")
+ logger.Debugw(ctx, "update-logical-device-flow-table", log.Fields{"logical-device-id": flow.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -454,7 +462,8 @@
// UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response
func (ldMgr *LogicalManager) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateLogicalDeviceMeterTable", log.Fields{"logical-device-id": meter.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceMeterTable")
+ logger.Debugw(ctx, "update-logical-device-meter-table", log.Fields{"logical-device-id": meter.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, meter.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", meter.Id)
@@ -464,7 +473,8 @@
// ListLogicalDeviceMeters returns logical device meters
func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
- logger.Debugw(ctx, "ListLogicalDeviceMeters", log.Fields{"logical-device-id": id.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "ListLogicalDeviceMeters")
+ logger.Debugw(ctx, "list-logical-device-meters", log.Fields{"logical-device-id": id.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -480,7 +490,8 @@
// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
- logger.Debugw(ctx, "UpdateGroupTable", log.Fields{"logical-device-id": flow.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateLogicalDeviceFlowGroupTable")
+ logger.Debugw(ctx, "update-group-table", log.Fields{"logical-device-id": flow.Id})
agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
@@ -490,7 +501,8 @@
// EnableLogicalDevicePort enables logical device port
func (ldMgr *LogicalManager) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
- logger.Debugw(ctx, "EnableLogicalDevicePort", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "EnableLogicalDevicePort")
+ logger.Debugw(ctx, "enable-logical-device-port", log.Fields{"logical-device-id": id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -504,7 +516,8 @@
// DisableLogicalDevicePort disables logical device port
func (ldMgr *LogicalManager) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
- logger.Debugw(ctx, "DisableLogicalDevicePort", log.Fields{"logical-device-id": id})
+ ctx = utils.WithRPCMetadataContext(ctx, "DisableLogicalDevicePort")
+ logger.Debugw(ctx, "disable-logical-device-port", log.Fields{"logical-device-id": id})
agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -517,7 +530,7 @@
}
func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
- logger.Debugw(ctx, "packetIn", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
+ logger.Debugw(ctx, "packet-in", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
agent.packetIn(ctx, port, transactionID, packet)
} else {
@@ -529,35 +542,45 @@
// StreamPacketsOut sends packets to adapter
func (ldMgr *LogicalManager) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
ctx := context.Background()
- logger.Debugw(ctx, "StreamPacketsOut-request", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "stream-packets-out-request", log.Fields{"packets": packets})
+
loop:
for {
select {
case <-packets.Context().Done():
- logger.Infow(ctx, "StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
+ logger.Infow(ctx, "stream-packets-out-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
break loop
default:
}
packet, err := packets.Recv()
+ pktCtx := utils.WithRPCMetadataContext(packets.Context(), "StreamPacketsOut")
+
if err == io.EOF {
- logger.Debugw(ctx, "Received-EOF", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "received-eof", log.Fields{"packets": packets})
break loop
}
-
if err != nil {
- logger.Errorw(ctx, "Failed to receive packet out", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-receive-packet-out", log.Fields{"error": err})
+ go ldMgr.SendRPCEvent(pktCtx, packet.Id, err.Error(), nil,
+ "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
continue
}
- if agent := ldMgr.getLogicalDeviceAgent(packets.Context(), packet.Id); agent != nil {
- agent.packetOut(packets.Context(), packet.PacketOut)
+ if agent := ldMgr.getLogicalDeviceAgent(pktCtx, packet.Id); agent != nil {
+ agent.packetOut(pktCtx, packet.PacketOut)
} else {
- logger.Errorf(ctx, "No logical device agent present", log.Fields{"logical-device-id": packet.Id})
+ logger.Errorf(ctx, "no-logical-device-agent-present", log.Fields{"logical-device-id": packet.Id})
}
}
- logger.Debugw(ctx, "StreamPacketsOut-request-done", log.Fields{"packets": packets})
+ logger.Debugw(ctx, "stream-packets-out-request-done", log.Fields{"packets": packets})
return nil
}
+
+func (ldMgr *LogicalManager) SendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+ id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ ldMgr.Manager.RPCEventManager.GetAndSendRPCEvent(ctx, resourceID, desc, context, id,
+ category, subCategory, raisedTs)
+}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index f92f2e5..cbc6bb9 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -29,6 +29,7 @@
"github.com/opencord/voltha-go/rw_core/core/device/remote"
"github.com/opencord/voltha-go/rw_core/core/device/state"
"github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/common"
@@ -42,10 +43,11 @@
// Manager represent device manager attributes
type Manager struct {
- deviceAgents sync.Map
- rootDevices map[string]bool
- lockRootDeviceMap sync.RWMutex
- adapterProxy *remote.AdapterProxy
+ deviceAgents sync.Map
+ rootDevices map[string]bool
+ lockRootDeviceMap sync.RWMutex
+ adapterProxy *remote.AdapterProxy
+ *event.RPCEventManager
adapterMgr *adapter.Manager
logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
@@ -59,7 +61,7 @@
}
//NewManagers creates the Manager and the Logical Manager.
-func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
@@ -69,12 +71,13 @@
dProxy: dbPath.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout,
+ RPCEventManager: event.NewRPCEventManager(eventProxy, coreInstanceID),
deviceLoadingInProgress: make(map[string][]chan int),
}
deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
logicalDeviceMgr := &LogicalManager{
- Manager: event.NewManager(),
+ Manager: event.NewManager(eventProxy, coreInstanceID),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
dbPath: dbPath,
@@ -141,21 +144,22 @@
// CreateDevice creates a new parent device in the data model
func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
if device.MacAddress == "" && device.GetHostAndPort() == "" {
- logger.Errorf(ctx, "No Device Info Present")
+ logger.Errorf(ctx, "no-device-info-present")
return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
}
+ ctx = utils.WithRPCMetadataContext(ctx, "CreateDevice")
logger.Debugw(ctx, "create-device", log.Fields{"device": *device})
deviceExist, err := dMgr.isParentDeviceExist(ctx, device)
if err != nil {
- logger.Errorf(ctx, "Failed to fetch parent device info")
+ logger.Errorf(ctx, "failed-to-fetch-parent-device-info")
return nil, err
}
if deviceExist {
- logger.Errorf(ctx, "Device is Pre-provisioned already with same IP-Port or MAC Address")
+ logger.Errorf(ctx, "device-is-pre-provisioned-already-with-same-ip-port-or-mac-address")
return nil, errors.New("device is already pre-provisioned")
}
- logger.Debugw(ctx, "CreateDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
+ logger.Debugw(ctx, "create-device", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
// Ensure this device is set as root
device.Root = true
@@ -163,7 +167,7 @@
agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
- logger.Errorw(ctx, "Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
+ logger.Errorw(ctx, "fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
return nil, err
}
dMgr.addDeviceAgentToMap(agent)
@@ -172,9 +176,9 @@
// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "EnableDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "EnableDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "enable-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -184,9 +188,9 @@
// DisableDevice disables a device along with any child device it may have
func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "DisableDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "DisableDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "disable-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -196,9 +200,9 @@
//RebootDevice invokes the reboot API on the corresponding adapter
func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "RebootDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "RebootDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -208,8 +212,9 @@
// DeleteDevice removes a device from the data model
func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "DeleteDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
- logger.Debugw(ctx, "DeleteDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "delete-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -219,8 +224,9 @@
// ForceDeleteDevice removes a device from the data model forcefully without successfully waiting for the adapters.
func (dMgr *Manager) ForceDeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ForceDeleteDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
- logger.Debugw(ctx, "ForceDeleteDevice", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "force-delete-device", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -230,7 +236,7 @@
// GetDevicePort returns the port details for a specific device port entry
func (dMgr *Manager) GetDevicePort(ctx context.Context, deviceID string, portID uint32) (*voltha.Port, error) {
- logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "get-device-port", log.Fields{"device-id": deviceID})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", deviceID)
@@ -240,9 +246,9 @@
// ListDevicePorts returns the ports details for a specific device entry
func (dMgr *Manager) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDevicePorts")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -259,9 +265,9 @@
// ListDeviceFlows returns the flow details for a specific device entry
func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceFlows")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "ListDeviceFlows", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "list-device-flows", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -278,9 +284,9 @@
// ListDeviceFlowGroups returns the flow group details for a specific device entry
func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceFlowGroups")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "list-device-flow-groups", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -298,7 +304,7 @@
// This function is called only in the Core that does not own this device. In the Core that owns this device then a
// deletion also includes removal of any reference of this device.
func (dMgr *Manager) stopManagingDevice(ctx context.Context, id string) {
- logger.Infow(ctx, "stopManagingDevice", log.Fields{"device-id": id})
+ logger.Infow(ctx, "stop-managing-device", log.Fields{"device-id": id})
if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
if device, err := dMgr.getDeviceReadOnly(ctx, id); err == nil && device.Root {
// stop managing the logical device
@@ -315,7 +321,7 @@
// RunPostDeviceDelete removes any reference of this device
func (dMgr *Manager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
- logger.Infow(ctx, "RunPostDeviceDelete", log.Fields{"device-id": cDevice.Id})
+ logger.Infow(ctx, "run-post-device-delete", log.Fields{"device-id": cDevice.Id})
dMgr.stopManagingDevice(ctx, cDevice.Id)
return nil
}
@@ -323,13 +329,14 @@
// GetDevice exists primarily to implement the gRPC interface.
// Internal functions should call getDeviceReadOnly instead.
func (dMgr *Manager) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetDevice")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
return dMgr.getDeviceReadOnly(ctx, id.Id)
}
// getDeviceReadOnly will return a device, either from memory or from the dB, if present
func (dMgr *Manager) getDeviceReadOnly(ctx context.Context, id string) (*voltha.Device, error) {
- logger.Debugw(ctx, "getDeviceReadOnly", log.Fields{"device-id": id})
+ logger.Debugw(ctx, "get-device-read-only", log.Fields{"device-id": id})
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
return agent.getDeviceReadOnly(ctx)
}
@@ -337,7 +344,7 @@
}
func (dMgr *Manager) listDevicePorts(ctx context.Context, id string) (map[uint32]*voltha.Port, error) {
- logger.Debugw(ctx, "listDevicePorts", log.Fields{"device-id": id})
+ logger.Debugw(ctx, "list-device-ports", log.Fields{"device-id": id})
agent := dMgr.getDeviceAgent(ctx, id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id)
@@ -347,8 +354,8 @@
// GetChildDevice will return a device, either from memory or from the dB, if present
func (dMgr *Manager) GetChildDevice(ctx context.Context, parentDeviceID string, serialNumber string, onuID int64, parentPortNo int64) (*voltha.Device, error) {
- logger.Debugw(ctx, "GetChildDevice", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber,
- "parentPortNo": parentPortNo, "onuId": onuID})
+ logger.Debugw(ctx, "get-child-device", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber,
+ "parent-port-no": parentPortNo, "onu-id": onuID})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
if err != nil {
@@ -356,7 +363,7 @@
}
childDeviceIds := dMgr.getAllChildDeviceIds(ctx, parentDevicePorts)
if len(childDeviceIds) == 0 {
- logger.Debugw(ctx, "no-child-devices", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber, "onuId": onuID})
+ logger.Debugw(ctx, "no-child-devices", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber, "onu-id": onuID})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
@@ -368,14 +375,14 @@
foundOnuID := false
if searchDevice.ProxyAddress.OnuId == uint32(onuID) {
if searchDevice.ParentPortNo == uint32(parentPortNo) {
- logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parent-device-id": parentDeviceID, "onuId": onuID})
+ logger.Debugw(ctx, "found-child-by-onuid", log.Fields{"parent-device-id": parentDeviceID, "onu-id": onuID})
foundOnuID = true
}
}
foundSerialNumber := false
if searchDevice.SerialNumber == serialNumber {
- logger.Debugw(ctx, "found-child-by-serialnumber", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber})
+ logger.Debugw(ctx, "found-child-by-serial-number", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber})
foundSerialNumber = true
}
@@ -395,18 +402,18 @@
}
if foundChildDevice != nil {
- logger.Debugw(ctx, "child-device-found", log.Fields{"parent-device-id": parentDeviceID, "foundChildDevice": foundChildDevice})
+ logger.Debugw(ctx, "child-device-found", log.Fields{"parent-device-id": parentDeviceID, "found-child-device": foundChildDevice})
return foundChildDevice, nil
}
logger.Debugw(ctx, "child-device-not-found", log.Fields{"parent-device-id": parentDeviceID,
- "serialNumber": serialNumber, "onuId": onuID, "parentPortNo": parentPortNo})
+ "serial-number": serialNumber, "onu-id": onuID, "parent-port-no": parentPortNo})
return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
}
// GetChildDeviceWithProxyAddress will return a device based on proxy address
func (dMgr *Manager) GetChildDeviceWithProxyAddress(ctx context.Context, proxyAddress *voltha.Device_ProxyAddress) (*voltha.Device, error) {
- logger.Debugw(ctx, "GetChildDeviceWithProxyAddress", log.Fields{"proxyAddress": proxyAddress})
+ logger.Debugw(ctx, "get-child-device-with-proxy-address", log.Fields{"proxy-address": proxyAddress})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, proxyAddress.DeviceId)
if err != nil {
@@ -429,11 +436,11 @@
}
if foundChildDevice != nil {
- logger.Debugw(ctx, "child-device-found", log.Fields{"proxyAddress": proxyAddress})
+ logger.Debugw(ctx, "child-device-found", log.Fields{"proxy-address": proxyAddress})
return foundChildDevice, nil
}
- logger.Warnw(ctx, "child-device-not-found", log.Fields{"proxyAddress": proxyAddress})
+ logger.Warnw(ctx, "child-device-not-found", log.Fields{"proxy-address": proxyAddress})
return nil, status.Errorf(codes.NotFound, "%s", proxyAddress)
}
@@ -445,7 +452,8 @@
// ListDevices retrieves the latest devices from the data model
func (dMgr *Manager) ListDevices(ctx context.Context, _ *empty.Empty) (*voltha.Devices, error) {
- logger.Debug(ctx, "ListDevices")
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDevices")
+ logger.Debug(ctx, "list-devices")
result := &voltha.Devices{}
var devices []*voltha.Device
@@ -467,7 +475,7 @@
}
result.Items = append(result.Items, device)
}
- logger.Debugw(ctx, "ListDevices-end", log.Fields{"len": len(result.Items)})
+ logger.Debugw(ctx, "list-devices-end", log.Fields{"len": len(result.Items)})
return result, nil
}
@@ -476,7 +484,7 @@
hostPort := newDevice.GetHostAndPort()
var devices []*voltha.Device
if err := dMgr.dProxy.List(ctx, &devices); err != nil {
- logger.Errorw(ctx, "Failed to list devices from cluster data proxy", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-list-devices-from-cluster-data-proxy", log.Fields{"error": err})
return false, err
}
for _, device := range devices {
@@ -524,12 +532,12 @@
logger.Debugw(ctx, "loading-device", log.Fields{"device-id": deviceID})
agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
- logger.Warnw(ctx, "Failure loading device", log.Fields{"device-id": deviceID, "error": err})
+ logger.Warnw(ctx, "failure-loading-device", log.Fields{"device-id": deviceID, "error": err})
} else {
dMgr.addDeviceAgentToMap(agent)
}
} else {
- logger.Debugw(ctx, "Device not in model", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "device-is-not-in-model", log.Fields{"device-id": deviceID})
}
// announce completion of task to any number of waiting channels
dMgr.devicesLoadingLock.Lock()
@@ -577,7 +585,7 @@
return err
}
}
- logger.Debugw(ctx, "loaded-children", log.Fields{"device-id": device.Id, "numChildren": len(childDeviceIds)})
+ logger.Debugw(ctx, "loaded-children", log.Fields{"device-id": device.Id, "num-children": len(childDeviceIds)})
}
return nil
}
@@ -626,7 +634,8 @@
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
func (dMgr *Manager) ListDeviceIds(ctx context.Context, _ *empty.Empty) (*voltha.IDs, error) {
- logger.Debug(ctx, "ListDeviceIDs")
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDeviceIds")
+ logger.Debug(ctx, "list-device-ids")
// Report only device IDs that are in the device agent map
return dMgr.listDeviceIdsFromMap(), nil
}
@@ -634,7 +643,8 @@
// ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
// trigger loading the devices along with their children and parent in memory
func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
- logger.Debugw(ctx, "ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
+ ctx = utils.WithRPCMetadataContext(ctx, "ReconcileDevices")
+ logger.Debugw(ctx, "reconcile-devices", log.Fields{"num-devices": len(ids.Items)})
if ids != nil && len(ids.Items) != 0 {
toReconcile := len(ids.Items)
reconciled := 0
@@ -668,12 +678,12 @@
// adapterRestarted is invoked whenever an adapter is restarted
func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
- logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
- "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
+ logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
+ "current-replica": adapter.CurrentReplica, "total-replicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
// Let's reconcile the device managed by this Core only
if len(dMgr.rootDevices) == 0 {
- logger.Debugw(ctx, "nothing-to-reconcile", log.Fields{"adapterId": adapter.Id})
+ logger.Debugw(ctx, "nothing-to-reconcile", log.Fields{"adapter-id": adapter.Id})
return nil
}
@@ -683,7 +693,7 @@
if dAgent == nil {
continue
}
- logger.Debugw(ctx, "checking-adapter-type", log.Fields{"agentType": dAgent.deviceType, "adapterType": adapter.Type})
+ logger.Debugw(ctx, "checking-adapter-type", log.Fields{"agentType": dAgent.deviceType, "adapter-type": adapter.Type})
if dAgent.deviceType == adapter.Type {
rootDevice, _ := dAgent.getDeviceReadOnly(ctx)
if rootDevice == nil {
@@ -691,7 +701,7 @@
}
isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, rootDeviceID, adapter.Type, adapter.CurrentReplica)
if err != nil {
- logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
+ logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
continue
}
if isDeviceOwnedByService {
@@ -709,7 +719,7 @@
if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, childDevice.Id, adapter.Type, adapter.CurrentReplica)
if err != nil {
- logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
+ logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
}
if isDeviceOwnedByService {
if dMgr.isOkToReconcile(ctx, childDevice) {
@@ -735,7 +745,7 @@
return status.Errorf(codes.Aborted, "errors-%s", res)
}
} else {
- logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapterId": adapter.Id})
+ logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapter-id": adapter.Id})
}
return nil
}
@@ -782,7 +792,7 @@
}
func (dMgr *Manager) UpdateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "UpdateDeviceUsingAdapterData", log.Fields{"device-id": device.Id, "device": device})
+ logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id, "device": device})
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return agent.updateDeviceUsingAdapterData(ctx, device)
}
@@ -809,7 +819,9 @@
if err != nil {
return err
}
- if err = dMgr.logicalDeviceMgr.updateLogicalPort(log.WithSpanFromContext(context.Background(), ctx), device, ports, port); err != nil {
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+
+ if err = dMgr.logicalDeviceMgr.updateLogicalPort(subCtx, device, ports, port); err != nil {
return err
}
return nil
@@ -823,7 +835,8 @@
}
// Setup peer ports in its own routine
go func() {
- if err := dMgr.addPeerPort(log.WithSpanFromContext(context.Background(), ctx), deviceID, port); err != nil {
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := dMgr.addPeerPort(subCtx, deviceID, port); err != nil {
logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
}
}()
@@ -833,7 +846,7 @@
}
func (dMgr *Manager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "addFlowsAndGroups", log.Fields{"device-id": deviceID, "groups:": groups, "flowMetadata": flowMetadata})
+ logger.Debugw(ctx, "add-flows-and-groups", log.Fields{"device-id": deviceID, "groups:": groups, "flow-metadata": flowMetadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.addFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
@@ -842,7 +855,7 @@
// deleteParentFlows removes flows from the parent device based on specific attributes
func (dMgr *Manager) deleteParentFlows(ctx context.Context, deviceID string, uniPort uint32, metadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "deleteParentFlows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
+ logger.Debugw(ctx, "delete-parent-flows", log.Fields{"device-id": deviceID, "uni-port": uniPort, "metadata": metadata})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if !agent.isRootDevice {
return status.Errorf(codes.FailedPrecondition, "not-a-parent-device-%s", deviceID)
@@ -853,7 +866,7 @@
}
func (dMgr *Manager) deleteFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "deleteFlowsAndGroups", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "delete-flows-and-groups", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.deleteFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
@@ -861,7 +874,7 @@
}
func (dMgr *Manager) updateFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "updateFlowsAndGroups", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "update-flows-and-groups", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateFlowsAndGroups(ctx, flows, groups, flowMetadata)
}
@@ -871,8 +884,8 @@
// UpdateDevicePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
// following a user action
func (dMgr *Manager) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateDevicePmConfigs")
log.EnrichSpan(ctx, log.Fields{"device-id": configs.Id})
-
if configs.Id == "" {
return nil, status.Error(codes.FailedPrecondition, "invalid-device-Id")
}
@@ -896,8 +909,8 @@
// ListDevicePmConfigs returns pm configs of device
func (dMgr *Manager) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListDevicePmConfigs")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -906,7 +919,7 @@
}
func (dMgr *Manager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
- logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.getSwitchCapability(ctx)
}
@@ -914,7 +927,7 @@
}
func (dMgr *Manager) GetPorts(ctx context.Context, deviceID string, portType voltha.Port_PortType) (*voltha.Ports, error) {
- logger.Debugw(ctx, "GetPorts", log.Fields{"device-id": deviceID, "portType": portType})
+ logger.Debugw(ctx, "get-ports", log.Fields{"device-id": deviceID, "port-type": portType})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", deviceID)
@@ -923,7 +936,7 @@
}
func (dMgr *Manager) UpdateDeviceStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
- logger.Debugw(ctx, "UpdateDeviceStatus", log.Fields{"device-id": deviceID, "operStatus": operStatus, "connStatus": connStatus})
+ logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": deviceID, "oper-status": operStatus, "conn-status": connStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateDeviceStatus(ctx, operStatus, connStatus)
}
@@ -931,7 +944,7 @@
}
func (dMgr *Manager) UpdateChildrenStatus(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
- logger.Debugw(ctx, "UpdateChildrenStatus", log.Fields{"parent-device-id": deviceID, "operStatus": operStatus, "connStatus": connStatus})
+ logger.Debugw(ctx, "update-children-status", log.Fields{"parent-device-id": deviceID, "oper-status": operStatus, "conn-status": connStatus})
parentDevicePorts, err := dMgr.listDevicePorts(ctx, deviceID)
if err != nil {
return status.Errorf(codes.Aborted, "%s", err.Error())
@@ -947,17 +960,18 @@
}
func (dMgr *Manager) UpdatePortState(ctx context.Context, deviceID string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "UpdatePortState", log.Fields{"device-id": deviceID, "portType": portType, "portNo": portNo, "operStatus": operStatus})
+ logger.Debugw(ctx, "update-port-state", log.Fields{"device-id": deviceID, "port-type": portType, "port-no": portNo, "oper-status": operStatus})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updatePortState(ctx, portType, portNo, operStatus); err != nil {
- logger.Errorw(ctx, "updating-port-state-failed", log.Fields{"device-id": deviceID, "portNo": portNo, "error": err})
+ logger.Errorw(ctx, "updating-port-state-failed", log.Fields{"device-id": deviceID, "port-no": portNo, "error": err})
return err
}
// Notify the logical device manager to change the port state
// Do this for NNI and UNIs only. PON ports are not known by logical device
if portType == voltha.Port_ETHERNET_NNI || portType == voltha.Port_ETHERNET_UNI {
go func() {
- err := dMgr.logicalDeviceMgr.updatePortState(log.WithSpanFromContext(context.Background(), ctx), deviceID, portNo, operStatus)
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := dMgr.logicalDeviceMgr.updatePortState(subCtx, deviceID, portNo, operStatus)
if err != nil {
// While we want to handle (catch) and log when
// an update to a port was not able to be
@@ -975,7 +989,7 @@
}
func (dMgr *Manager) DeleteAllPorts(ctx context.Context, deviceID string) error {
- logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "delete-all-ports", log.Fields{"device-id": deviceID})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.deleteAllPorts(ctx); err != nil {
return err
@@ -985,7 +999,8 @@
// typically is part of a device deletion phase.
if device, err := dMgr.getDeviceReadOnly(ctx, deviceID); err == nil {
go func() {
- if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(log.WithSpanFromContext(context.Background(), ctx), device); err != nil {
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(subCtx, device); err != nil {
logger.Errorw(ctx, "unable-to-delete-logical-ports", log.Fields{"error": err})
}
}()
@@ -1000,7 +1015,7 @@
//UpdatePortsState updates all ports on the device
func (dMgr *Manager) UpdatePortsState(ctx context.Context, deviceID string, portTypeFilter uint32, state voltha.OperStatus_Types) error {
- logger.Debugw(ctx, "UpdatePortsState", log.Fields{"device-id": deviceID})
+ logger.Debugw(ctx, "update-ports-state", log.Fields{"device-id": deviceID})
agent := dMgr.getDeviceAgent(ctx, deviceID)
if agent == nil {
return status.Errorf(codes.NotFound, "%s", deviceID)
@@ -1009,7 +1024,7 @@
return status.Error(codes.Unimplemented, "state-change-not-implemented")
}
if err := agent.updatePortsOperState(ctx, portTypeFilter, state); err != nil {
- logger.Warnw(ctx, "updatePortsOperState-failed", log.Fields{"device-id": deviceID, "error": err})
+ logger.Warnw(ctx, "update-ports-state-failed", log.Fields{"device-id": deviceID, "error": err})
return err
}
return nil
@@ -1017,7 +1032,7 @@
func (dMgr *Manager) ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int64, deviceType string,
channelID int64, vendorID string, serialNumber string, onuID int64) (*voltha.Device, error) {
- logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"parent-device-id": parentDeviceID, "parentPortNo": parentPortNo, "deviceType": deviceType, "channelId": channelID, "vendorId": vendorID, "serialNumber": serialNumber, "onuId": onuID})
+ logger.Debugw(ctx, "child-device-detected", log.Fields{"parent-device-id": parentDeviceID, "parent-port-no": parentPortNo, "device-type": deviceType, "channel-id": channelID, "vendor-id": vendorID, "serial-number": serialNumber, "onu-id": onuID})
if deviceType == "" && vendorID != "" {
logger.Debug(ctx, "device-type-is-nil-fetching-device-type")
@@ -1037,7 +1052,7 @@
}
//if no match found for the vendorid,report adapter with the custom error message
if deviceType == "" {
- logger.Errorw(ctx, "failed-to-fetch-adapter-name ", log.Fields{"vendorId": vendorID})
+ logger.Errorw(ctx, "failed-to-fetch-adapter-name ", log.Fields{"vendor-id": vendorID})
return nil, status.Errorf(codes.NotFound, "%s", vendorID)
}
@@ -1060,7 +1075,7 @@
}
if device, err := dMgr.GetChildDevice(ctx, parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
- logger.Warnw(ctx, "child-device-exists", log.Fields{"parent-device-id": parentDeviceID, "serialNumber": serialNumber})
+ logger.Warnw(ctx, "child-device-exists", log.Fields{"parent-device-id": parentDeviceID, "serial-number": serialNumber})
return device, status.Errorf(codes.AlreadyExists, "%s", serialNumber)
}
@@ -1078,7 +1093,8 @@
// Activate the child device
if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
go func() {
- err := agent.enableDevice(log.WithSpanFromContext(context.Background(), ctx))
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
+ err := agent.enableDevice(subCtx)
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
@@ -1089,7 +1105,7 @@
}
func (dMgr *Manager) packetOut(ctx context.Context, deviceID string, outPort uint32, packet *ofp.OfpPacketOut) error {
- logger.Debugw(ctx, "packetOut", log.Fields{"device-id": deviceID, "outPort": outPort})
+ logger.Debugw(ctx, "packet-out", log.Fields{"device-id": deviceID, "out-port": outPort})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.packetOut(ctx, outPort, packet)
}
@@ -1098,7 +1114,7 @@
// PacketIn receives packet from adapter
func (dMgr *Manager) PacketIn(ctx context.Context, deviceID string, port uint32, transactionID string, packet []byte) error {
- logger.Debugw(ctx, "PacketIn", log.Fields{"device-id": deviceID, "port": port})
+ logger.Debugw(ctx, "packet-in", log.Fields{"device-id": deviceID, "port": port})
// Get the logical device Id based on the deviceId
var device *voltha.Device
var err error
@@ -1118,24 +1134,25 @@
}
func (dMgr *Manager) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
- logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parentId": parentID})
+ logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
return agent.setParentID(ctx, device, parentID)
}
return status.Errorf(codes.NotFound, "%s", device.Id)
}
-// CreateLogicalDevice creates logical device in core
+//
+//CreateLogicalDevice creates logical device in core
func (dMgr *Manager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
- logger.Info(ctx, "CreateLogicalDevice")
+ logger.Info(ctx, "create-logical-device")
// Verify whether the logical device has already been created
if cDevice.ParentId != "" {
- logger.Debugw(ctx, "Parent device already exist.", log.Fields{"device-id": cDevice.Id, "logical-device-id": cDevice.Id})
+ logger.Debugw(ctx, "parent-device-already-exist", log.Fields{"device-id": cDevice.Id, "logical-device-id": cDevice.Id})
return nil
}
var err error
if _, err = dMgr.logicalDeviceMgr.createLogicalDevice(ctx, cDevice); err != nil {
- logger.Warnw(ctx, "createlogical-device-error", log.Fields{"device": cDevice})
+ logger.Warnw(ctx, "create-logical-device-error", log.Fields{"device": cDevice})
return err
}
return nil
@@ -1143,10 +1160,10 @@
// DeleteLogicalDevice deletes logical device from core
func (dMgr *Manager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
- logger.Info(ctx, "DeleteLogicalDevice")
+ logger.Info(ctx, "delete-logical-device")
var err error
if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
- logger.Warnw(ctx, "deleteLogical-device-error", log.Fields{"device-id": cDevice.Id})
+ logger.Warnw(ctx, "delete-logical-device-error", log.Fields{"device-id": cDevice.Id})
return err
}
// Remove the logical device Id from the parent device
@@ -1160,7 +1177,7 @@
logger.Debugw(ctx, "delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, cDevice.Id); err != nil {
// Just log the error. The logical device or port may already have been deleted before this callback is invoked.
- logger.Warnw(ctx, "deleteLogical-ports-error", log.Fields{"device-id": cDevice.Id, "error": err})
+ logger.Warnw(ctx, "delete-logical-ports-error", log.Fields{"device-id": cDevice.Id, "error": err})
}
return nil
}
@@ -1178,7 +1195,7 @@
//ChildDevicesLost is invoked by an adapter to indicate that a parent device is in a state (Disabled) where it
//cannot manage the child devices. This will trigger the Core to disable all the child devices.
func (dMgr *Manager) ChildDevicesLost(ctx context.Context, parentDeviceID string) error {
- logger.Debug(ctx, "ChildDevicesLost")
+ logger.Debug(ctx, "child-devices-lost")
parentDevice, err := dMgr.getDeviceReadOnly(ctx, parentDeviceID)
if err != nil {
logger.Warnw(ctx, "failed-getting-device", log.Fields{"parent-device-id": parentDeviceID, "error": err})
@@ -1190,7 +1207,7 @@
//ChildDevicesDetected is invoked by an adapter when child devices are found, typically after a
// disable/enable sequence. This will trigger the Core to Enable all the child devices of that parent.
func (dMgr *Manager) ChildDevicesDetected(ctx context.Context, parentDeviceID string) error {
- logger.Debug(ctx, "ChildDevicesDetected")
+ logger.Debug(ctx, "child-devices-detected")
parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID)
if err != nil {
logger.Warnw(ctx, "failed-getting-device", log.Fields{"device-id": parentDeviceID, "error": err})
@@ -1203,16 +1220,17 @@
allChildEnableRequestSent := true
for childDeviceID := range childDeviceIds {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
+ subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
// Run the children re-registration in its own routine
go func(ctx context.Context) {
err = agent.enableDevice(ctx)
if err != nil {
logger.Errorw(ctx, "unable-to-enable-device", log.Fields{"error": err})
}
- }(log.WithSpanFromContext(context.Background(), ctx))
+ }(subCtx)
} else {
err = status.Errorf(codes.Unavailable, "no agent for child device %s", childDeviceID)
- logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parent-device-id": parentDeviceID, "childId": childDeviceID})
+ logger.Errorw(ctx, "no-child-device-agent", log.Fields{"parent-device-id": parentDeviceID, "child-id": childDeviceID})
allChildEnableRequestSent = false
}
}
@@ -1229,7 +1247,7 @@
//DisableAllChildDevices is invoked as a callback when the parent device is disabled
func (dMgr *Manager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
- logger.Debug(ctx, "DisableAllChildDevices")
+ logger.Debug(ctx, "disable-all-child-devices")
ports, _ := dMgr.listDevicePorts(ctx, parentCurrDevice.Id)
for childDeviceID := range dMgr.getAllChildDeviceIds(ctx, ports) {
if agent := dMgr.getDeviceAgent(ctx, childDeviceID); agent != nil {
@@ -1244,7 +1262,7 @@
//DeleteAllChildDevices is invoked as a callback when the parent device is deleted
func (dMgr *Manager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device) error {
- logger.Debug(ctx, "DeleteAllChildDevices")
+ logger.Debug(ctx, "delete-all-child-devices")
force := false
// Get the parent device Transient state, if its FORCE_DELETED(go for force delete for child devices)
// So in cases when this handler is getting called other than DELETE operation, no force option would be used.
@@ -1301,7 +1319,7 @@
//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *Manager) getAllChildDeviceIds(ctx context.Context, parentDevicePorts map[uint32]*voltha.Port) map[string]struct{} {
- logger.Debug(ctx, "getAllChildDeviceIds")
+ logger.Debug(ctx, "get-all-child-device-ids")
childDeviceIds := make(map[string]struct{}, len(parentDevicePorts))
for _, port := range parentDevicePorts {
for _, peer := range port.Peers {
@@ -1314,7 +1332,7 @@
//GetAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *Manager) GetAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
- logger.Debugw(ctx, "GetAllChildDevices", log.Fields{"parent-device-id": parentDeviceID})
+ logger.Debugw(ctx, "get-all-child-devices", log.Fields{"parent-device-id": parentDeviceID})
if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
childDevices := make([]*voltha.Device, 0)
for deviceID := range dMgr.getAllChildDeviceIds(ctx, parentDevicePorts) {
@@ -1329,13 +1347,13 @@
// SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
func (dMgr *Manager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
- logger.Info(ctx, "SetupUNILogicalPorts")
+ logger.Info(ctx, "setup-uni-logical-ports")
cDevicePorts, err := dMgr.listDevicePorts(ctx, cDevice.Id)
if err != nil {
return err
}
if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice, cDevicePorts); err != nil {
- logger.Warnw(ctx, "setupUNILogicalPorts-error", log.Fields{"device": cDevice, "err": err})
+ logger.Warnw(ctx, "setup-uni-logical-ports-error", log.Fields{"device": cDevice, "err": err})
return err
}
return nil
@@ -1346,9 +1364,9 @@
// DownloadImage execute an image download request
func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "DownloadImage")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "DownloadImage", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "download-image", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1362,9 +1380,9 @@
// CancelImageDownload cancels image download request
func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "CancelImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "CancelImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1378,9 +1396,9 @@
// ActivateImageUpdate activates image update request
func (dMgr *Manager) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ActivateImageUpdate")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "ActivateImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "activate-image-update", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1394,9 +1412,9 @@
// RevertImageUpdate reverts image update
func (dMgr *Manager) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "RevertImageUpdate")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "RevertImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "revert-image-update", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1413,9 +1431,9 @@
// GetImageDownloadStatus returns status of image download
func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetImageDownloadStatus")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "GetImageDownloadStatus", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "get-image-download-status", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1428,12 +1446,12 @@
}
func (dMgr *Manager) UpdateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
+ ctx = utils.WithRPCMetadataContext(ctx, "UpdateImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "UpdateImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "update-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updateImageDownload(ctx, img); err != nil {
- logger.Debugw(ctx, "UpdateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
+ logger.Debugw(ctx, "update-image-download-failed", log.Fields{"err": err, "image-name": img.Name})
return err
}
} else {
@@ -1444,9 +1462,9 @@
// GetImageDownload returns image download
func (dMgr *Manager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetImageDownload")
log.EnrichSpan(ctx, log.Fields{"device-id": img.Id})
-
- logger.Debugw(ctx, "GetImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ logger.Debugw(ctx, "get-image-download", log.Fields{"device-id": img.Id, "image-name": img.Name})
agent := dMgr.getDeviceAgent(ctx, img.Id)
if agent == nil {
return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
@@ -1460,9 +1478,9 @@
// ListImageDownloads returns image downloads
func (dMgr *Manager) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "ListImageDownloads")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "ListImageDownloads", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "list-image-downloads", log.Fields{"device-id": id.Id})
agent := dMgr.getDeviceAgent(ctx, id.Id)
if agent == nil {
return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, status.Errorf(codes.NotFound, "%s", id.Id)
@@ -1476,9 +1494,9 @@
// GetImages returns all images for a specific device entry
func (dMgr *Manager) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetImages")
log.EnrichSpan(ctx, log.Fields{"device-id": id.Id})
-
- logger.Debugw(ctx, "GetImages", log.Fields{"device-id": id.Id})
+ logger.Debugw(ctx, "get-images", log.Fields{"device-id": id.Id})
device, err := dMgr.getDeviceReadOnly(ctx, id.Id)
if err != nil {
return nil, err
@@ -1487,7 +1505,7 @@
}
func (dMgr *Manager) NotifyInvalidTransition(ctx context.Context, device *voltha.Device) error {
- logger.Errorw(ctx, "NotifyInvalidTransition", log.Fields{
+ logger.Errorw(ctx, "notify-invalid-transition", log.Fields{
"device": device.Id,
"curr-admin-state": device.AdminState,
"curr-oper-state": device.OperStatus,
@@ -1507,16 +1525,17 @@
// GetParentDeviceID returns parent device id, either from memory or from the dB, if present
func (dMgr *Manager) GetParentDeviceID(ctx context.Context, deviceID string) string {
if device, _ := dMgr.getDeviceReadOnly(ctx, deviceID); device != nil {
- logger.Infow(ctx, "GetParentDeviceId", log.Fields{"device-id": device.Id, "parentId": device.ParentId})
+ logger.Infow(ctx, "get-parent-device-id", log.Fields{"device-id": device.Id, "parent-id": device.ParentId})
return device.ParentId
}
return ""
}
func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) (*common.OperationResp, error) {
- logger.Debugw(ctx, "SimulateAlarm", log.Fields{"id": simulateReq.Id, "Indicator": simulateReq.Indicator, "IntfId": simulateReq.IntfId,
- "PortTypeName": simulateReq.PortTypeName, "OnuDeviceId": simulateReq.OnuDeviceId, "InverseBitErrorRate": simulateReq.InverseBitErrorRate,
- "Drift": simulateReq.Drift, "NewEqd": simulateReq.NewEqd, "OnuSerialNumber": simulateReq.OnuSerialNumber, "Operation": simulateReq.Operation})
+ ctx = utils.WithRPCMetadataContext(ctx, "SimulateAlarm")
+ logger.Debugw(ctx, "simulate-alarm", log.Fields{"id": simulateReq.Id, "indicator": simulateReq.Indicator, "intf-id": simulateReq.IntfId,
+ "port-type-name": simulateReq.PortTypeName, "onu-device-id": simulateReq.OnuDeviceId, "inverse-bit-error-rate": simulateReq.InverseBitErrorRate,
+ "drift": simulateReq.Drift, "new-eqd": simulateReq.NewEqd, "onu-serial-number": simulateReq.OnuSerialNumber, "operation": simulateReq.Operation})
agent := dMgr.getDeviceAgent(ctx, simulateReq.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", simulateReq.Id)
@@ -1528,7 +1547,7 @@
}
func (dMgr *Manager) UpdateDeviceReason(ctx context.Context, deviceID string, reason string) error {
- logger.Debugw(ctx, "UpdateDeviceReason", log.Fields{"device-id": deviceID, "reason": reason})
+ logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": deviceID, "reason": reason})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
return agent.updateDeviceReason(ctx, reason)
}
@@ -1536,9 +1555,9 @@
}
func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "EnablePort")
log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
-
- logger.Debugw(ctx, "EnablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+ logger.Debugw(ctx, "enable-port", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
@@ -1547,9 +1566,9 @@
}
func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "DisablePort")
log.EnrichSpan(ctx, log.Fields{"device-id": port.DeviceId})
-
- logger.Debugw(ctx, "DisablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
+ logger.Debugw(ctx, "disable-port", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
@@ -1559,7 +1578,7 @@
// ChildDeviceLost calls parent adapter to delete child device and all its references
func (dMgr *Manager) ChildDeviceLost(ctx context.Context, curr *voltha.Device) error {
- logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId})
+ logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": curr.Id, "parent-device-id": curr.ParentId})
if parentAgent := dMgr.getDeviceAgent(ctx, curr.ParentId); parentAgent != nil {
if err := parentAgent.ChildDeviceLost(ctx, curr); err != nil {
// Just log the message and let the remaining pipeline proceed.
@@ -1571,9 +1590,9 @@
}
func (dMgr *Manager) StartOmciTestAction(ctx context.Context, request *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "StartOmciTestAction")
log.EnrichSpan(ctx, log.Fields{"device-id": request.Id})
-
- logger.Debugw(ctx, "StartOmciTestAction", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
+ logger.Debugw(ctx, "start-omci-test-action", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
agent := dMgr.getDeviceAgent(ctx, request.Id)
if agent == nil {
return nil, status.Errorf(codes.NotFound, "%s", request.Id)
@@ -1582,9 +1601,9 @@
}
func (dMgr *Manager) GetExtValue(ctx context.Context, value *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
+ ctx = utils.WithRPCMetadataContext(ctx, "GetExtValue")
log.EnrichSpan(ctx, log.Fields{"device-id": value.Id})
-
- logger.Debugw(ctx, "getExtValue", log.Fields{"onu-id": value.Id})
+ logger.Debugw(ctx, "get-ext-value", log.Fields{"onu-id": value.Id})
cDevice, err := dMgr.getDeviceReadOnly(ctx, value.Id)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
@@ -1598,7 +1617,7 @@
if err != nil {
return nil, err
}
- logger.Debugw(ctx, "getExtValue-result", log.Fields{"result": resp})
+ logger.Debugw(ctx, "get-ext-value-result", log.Fields{"result": resp})
return resp, nil
}
return nil, status.Errorf(codes.NotFound, "%s", value.Id)
@@ -1607,7 +1626,8 @@
// SetExtValue set some given configs or value
func (dMgr *Manager) SetExtValue(ctx context.Context, value *voltha.ValueSet) (*empty.Empty, error) {
- logger.Debugw(ctx, "setExtValue", log.Fields{"onu-id": value.Id})
+ ctx = utils.WithRPCMetadataContext(ctx, "SetExtValue")
+ logger.Debugw(ctx, "set-ext-value", log.Fields{"onu-id": value.Id})
device, err := dMgr.getDeviceReadOnly(ctx, value.Id)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
@@ -1617,9 +1637,15 @@
if err != nil {
return nil, err
}
- logger.Debugw(ctx, "setExtValue-result", log.Fields{"result": resp})
+ logger.Debugw(ctx, "set-ext-value-result", log.Fields{"result": resp})
return resp, nil
}
return nil, status.Errorf(codes.NotFound, "%s", value.Id)
}
+
+func (dMgr *Manager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent,
+ category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ // TODO: Instead of directly sending to the kafka bus, queue the message and send it asynchronously. For now every argument is handed unchanged to RPCEventManager.SendRPCEvent and nothing is returned to the caller.
+ dMgr.RPCEventManager.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+}