VOL-3501 Code changes to support RPC events
Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index c122574..84f765f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -133,7 +133,7 @@
agent.portLoader.Load(ctx)
agent.transientStateLoader.Load(ctx)
- logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
+ logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
@@ -172,7 +172,7 @@
}
defer agent.requestQueue.RequestComplete()
- logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parentId": agent.parentID})
+ logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
// Remove the device transient loader
if err := agent.deleteTransientState(ctx); err != nil {
return err
@@ -236,7 +236,7 @@
// TODO: Post failure message onto kafka
}
-func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
defer cancel()
select {
@@ -283,19 +283,31 @@
}
-func (agent *Agent) waitForAdapterDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
defer cancel()
+ var rpce *voltha.RPCEvent
+ defer func() {
+ if rpce != nil {
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ }
+ }()
select {
case rpcResponse, ok := <-ch:
if !ok {
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+ // rpce set above is sent as an RPC error event by the deferred function
} else if rpcResponse.Err != nil {
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
+ // rpce set above is sent as an RPC error event by the deferred function
} else {
onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
}
case <-ctx.Done():
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
onFailure(ctx, rpc, ctx.Err(), reqArgs)
}
}
@@ -326,7 +338,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
oldDevice := agent.getDeviceReadOnlyWithoutLock()
if oldDevice.AdminState == voltha.AdminState_ENABLED {
@@ -360,6 +372,8 @@
// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
var ch chan *kafka.RpcResponse
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
} else {
@@ -376,16 +390,28 @@
func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
defer cancel()
+ var rpce *voltha.RPCEvent
+ defer func() {
+ if rpce != nil {
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ }
+ }()
select {
case rpcResponse, ok := <-ch:
if !ok {
+ // Record the failure; the deferred function sends rpce as an RPC error event
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
response.Error(status.Errorf(codes.Aborted, "channel-closed"))
} else if rpcResponse.Err != nil {
+ // Record the failure; the deferred function sends rpce as an RPC error event
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
response.Error(rpcResponse.Err)
} else {
response.Done()
}
case <-ctx.Done():
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
response.Error(ctx.Err())
}
}
@@ -451,7 +477,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
cloned := agent.cloneDeviceWithoutLock()
@@ -476,6 +502,8 @@
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
if err != nil {
cancel()
@@ -491,13 +519,15 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
if agent.isDeletionInProgress() {
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
cancel()
@@ -508,7 +538,7 @@
}
func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
- logger.Debugw(ctx, "deleteDeviceForce", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -521,27 +551,29 @@
agent.deviceID)
}
device := agent.cloneDeviceWithoutLock()
- if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, voltha.DeviceTransientState_FORCE_DELETING,
- previousDeviceTransientState); err != nil {
+ if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+ voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
return err
}
previousAdminState := device.AdminState
if previousAdminState != ic.AdminState_PREPROVISIONED {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
if err != nil {
cancel()
return err
}
// Since it is a case of force delete, nothing needs to be done on adapter responses.
- go agent.waitForAdapterResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
+ go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
agent.onFailure)
}
return nil
}
func (agent *Agent) deleteDevice(ctx context.Context) error {
- logger.Debugw(ctx, "deleteDevice", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -561,14 +593,16 @@
// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
}
- if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, currentDeviceTransientState,
- previousDeviceTransientState); err != nil {
+ if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+ currentDeviceTransientState, previousDeviceTransientState); err != nil {
return err
}
// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
// adapter
if previousAdminState != ic.AdminState_PREPROVISIONED {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
if err != nil {
cancel()
@@ -578,7 +612,7 @@
}
return err
}
- go agent.waitForAdapterDeleteResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+ go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
agent.onDeleteFailure)
}
return nil
@@ -588,7 +622,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
+ logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
cloned := agent.cloneDeviceWithoutLock()
cloned.ParentId = parentID
@@ -597,7 +631,7 @@
// getSwitchCapability retrieves the switch capability of a parent device
func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
- logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID})
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
@@ -651,6 +685,8 @@
}
// Send packet to adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
if err != nil {
cancel()
@@ -664,7 +700,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+ logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})
cloned := agent.cloneDeviceWithoutLock()
cloned.Root = device.Root
@@ -686,14 +722,14 @@
cloned := agent.cloneDeviceWithoutLock()
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
- logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
cloned.ConnectStatus = connStatus
}
if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
- logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+ logger.Debugw(ctx, "update-device-status-oper", log.Fields{"ok": ok, "val": s})
cloned.OperStatus = operStatus
}
- logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"device-id": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
+ logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
// Store the device
return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
@@ -742,11 +778,13 @@
return err
}
defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "simulateAlarm", log.Fields{"device-id": agent.deviceID})
+ logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
if err != nil {
cancel()
@@ -778,10 +816,16 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
- logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+ logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+ // Raise an RPC error event describing the failed state transition
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().UnixNano())
+
}
return nil
}
@@ -805,7 +849,7 @@
//Reverting TransientState update
err := agent.updateTransientState(ctx, prevTransientState)
logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
- "previousTransientState": prevTransientState, "currentTransientState": transientState})
+ "previous-transient-state": prevTransientState, "current-transient-state": transientState})
agent.requestQueue.RequestComplete()
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
@@ -818,10 +862,14 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
-
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
device, prevDevice, transientState, prevTransientState); err != nil {
- logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+ logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+ // Raise an RPC error event describing the failed state transition
+ rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().UnixNano())
}
return nil
}
@@ -829,7 +877,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- logger.Debugw(ctx, "updateDeviceReason", log.Fields{"device-id": agent.deviceID, "reason": reason})
+ logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
cloned := agent.cloneDeviceWithoutLock()
cloned.Reason = reason
@@ -837,7 +885,7 @@
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
- logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
+ logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
// Remove the associated peer ports on the parent device
for portID := range agent.portLoader.ListIDs() {
@@ -861,6 +909,8 @@
//send request to adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
if err != nil {
cancel()
@@ -907,12 +957,12 @@
if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
- logger.Debugw(ctx, "Omci_test_Request-Success-device-agent", log.Fields{"testResp": testResp})
+ logger.Debugw(ctx, "omci-test-request-success-device-agent", log.Fields{"test-resp": testResp})
return testResp, nil
}
func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
- logger.Debugw(ctx, "getExtValue", log.Fields{"device-id": agent.deviceID, "onuid": valueparam.Id, "valuetype": valueparam.Value})
+ logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -938,12 +988,12 @@
if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
- logger.Debugw(ctx, "getExtValue-Success-device-agent", log.Fields{"Resp": Resp})
+ logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"resp": Resp})
return Resp, nil
}
func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
- logger.Debugw(ctx, "setExtValue", log.Fields{"device-id": value.Id})
+ logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -965,12 +1015,12 @@
}
// Unmarshal and return the response
- logger.Debug(ctx, "setExtValue-Success-device-agent")
+ logger.Debug(ctx, "set-ext-value-success-device-agent")
return &empty.Empty{}, nil
}
func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
- logger.Debugw(ctx, "getSingleValue", log.Fields{"device-id": request.TargetId})
+ logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
@@ -1004,7 +1054,7 @@
}
func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
- logger.Debugw(ctx, "setSingleValue", log.Fields{"device-id": request.TargetId})
+ logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err