VOL-3501 Code changes to support rpc event

Change-Id: I2536c0c03faa5fb026349c906ebef46323398e9a
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index c122574..84f765f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -133,7 +133,7 @@
 		agent.portLoader.Load(ctx)
 		agent.transientStateLoader.Load(ctx)
 
-		logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
+		logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
 	} else {
 		// Create a new device
 		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
@@ -172,7 +172,7 @@
 	}
 	defer agent.requestQueue.RequestComplete()
 
-	logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parentId": agent.parentID})
+	logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
 	// Remove the device transient loader
 	if err := agent.deleteTransientState(ctx); err != nil {
 		return err
@@ -236,7 +236,7 @@
 	// TODO: Post failure message onto kafka
 }
 
-func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
 	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
 	defer cancel()
 	select {
@@ -283,19 +283,31 @@
 
 }
 
-func (agent *Agent) waitForAdapterDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
 	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
 	defer cancel()
+	var rpce *voltha.RPCEvent
+	defer func() {
+		if rpce != nil {
+			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+		}
+	}()
 	select {
 	case rpcResponse, ok := <-ch:
 		if !ok {
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
 			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+			// rpce set above is sent as an RPC error event by the deferred handler
 		} else if rpcResponse.Err != nil {
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
 			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
+			// rpce set above is sent as an RPC error event by the deferred handler
 		} else {
 			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
 		}
 	case <-ctx.Done():
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
 		onFailure(ctx, rpc, ctx.Err(), reqArgs)
 	}
 }
@@ -326,7 +338,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
 
 	oldDevice := agent.getDeviceReadOnlyWithoutLock()
 	if oldDevice.AdminState == voltha.AdminState_ENABLED {
@@ -360,6 +372,8 @@
 	// Adopt the device if it was in pre-provision state.  In all other cases, try to re-enable it.
 	var ch chan *kafka.RpcResponse
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
 		ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
 	} else {
@@ -376,16 +390,28 @@
 
 func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
 	defer cancel()
+	var rpce *voltha.RPCEvent
+	defer func() {
+		if rpce != nil {
+			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+		}
+	}()
 	select {
 	case rpcResponse, ok := <-ch:
 		if !ok {
+			// Record the failure; the deferred handler sends it as an RPC error event
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
 			response.Error(status.Errorf(codes.Aborted, "channel-closed"))
 		} else if rpcResponse.Err != nil {
+			// Record the failure; the deferred handler sends it as an RPC error event
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
 			response.Error(rpcResponse.Err)
 		} else {
 			response.Done()
 		}
 	case <-ctx.Done():
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
 		response.Error(ctx.Err())
 	}
 }
@@ -451,7 +477,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
 
 	cloned := agent.cloneDeviceWithoutLock()
 
@@ -476,6 +502,8 @@
 	}
 
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
 	if err != nil {
 		cancel()
@@ -491,13 +519,15 @@
 		return err
 	}
 	defer agent.requestQueue.RequestComplete()
-	logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceReadOnlyWithoutLock()
 	if agent.isDeletionInProgress() {
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
 	}
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
 	if err != nil {
 		cancel()
@@ -508,7 +538,7 @@
 }
 
 func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
-	logger.Debugw(ctx, "deleteDeviceForce", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
@@ -521,27 +551,29 @@
 			agent.deviceID)
 	}
 	device := agent.cloneDeviceWithoutLock()
-	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, voltha.DeviceTransientState_FORCE_DELETING,
-		previousDeviceTransientState); err != nil {
+	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+		voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
 		return err
 	}
 	previousAdminState := device.AdminState
 	if previousAdminState != ic.AdminState_PREPROVISIONED {
 		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+		subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
 		if err != nil {
 			cancel()
 			return err
 		}
 		// Since it is a case of force delete, nothing needs to be done on adapter responses.
-		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
+		go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
 			agent.onFailure)
 	}
 	return nil
 }
 
 func (agent *Agent) deleteDevice(ctx context.Context) error {
-	logger.Debugw(ctx, "deleteDevice", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
@@ -561,14 +593,16 @@
 		// Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
 		currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
 	}
-	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, currentDeviceTransientState,
-		previousDeviceTransientState); err != nil {
+	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
+		currentDeviceTransientState, previousDeviceTransientState); err != nil {
 		return err
 	}
 	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
 	// adapter
 	if previousAdminState != ic.AdminState_PREPROVISIONED {
 		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+		subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
 		if err != nil {
 			cancel()
@@ -578,7 +612,7 @@
 			}
 			return err
 		}
-		go agent.waitForAdapterDeleteResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
 			agent.onDeleteFailure)
 	}
 	return nil
@@ -588,7 +622,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
+	logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
 
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.ParentId = parentID
@@ -597,7 +631,7 @@
 
 // getSwitchCapability retrieves the switch capability of a parent device
 func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
-	logger.Debugw(ctx, "getSwitchCapability", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID})
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
@@ -651,6 +685,8 @@
 	}
 	//	Send packet to adapter
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
 	if err != nil {
 		cancel()
@@ -664,7 +700,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+	logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})
 
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Root = device.Root
@@ -686,14 +722,14 @@
 	cloned := agent.cloneDeviceWithoutLock()
 	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
 	if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
-		logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+		logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
 		cloned.ConnectStatus = connStatus
 	}
 	if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
-		logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+		logger.Debugw(ctx, "update-device-status-oper", log.Fields{"ok": ok, "val": s})
 		cloned.OperStatus = operStatus
 	}
-	logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"device-id": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
+	logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
 	// Store the device
 	return agent.updateDeviceAndReleaseLock(ctx, cloned)
 }
@@ -742,11 +778,13 @@
 		return err
 	}
 	defer agent.requestQueue.RequestComplete()
-	logger.Debugw(ctx, "simulateAlarm", log.Fields{"device-id": agent.deviceID})
+	logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceReadOnlyWithoutLock()
 
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
 	if err != nil {
 		cancel()
@@ -778,10 +816,16 @@
 
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 
-	if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
 		device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
-		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+		// Sending RPC EVENT here
+		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+		go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+			nil, time.Now().UnixNano())
+
 	}
 	return nil
 }
@@ -805,7 +849,7 @@
 		//Reverting TransientState update
 		err := agent.updateTransientState(ctx, prevTransientState)
 		logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
-			"previousTransientState": prevTransientState, "currentTransientState": transientState})
+			"previous-transient-state": prevTransientState, "current-transient-state": transientState})
 		agent.requestQueue.RequestComplete()
 		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
 	}
@@ -818,10 +862,14 @@
 
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
-
-	if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
 		device, prevDevice, transientState, prevTransientState); err != nil {
-		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
+		// Sending RPC EVENT here
+		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+		go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+			nil, time.Now().UnixNano())
 	}
 	return nil
 }
@@ -829,7 +877,7 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	logger.Debugw(ctx, "updateDeviceReason", log.Fields{"device-id": agent.deviceID, "reason": reason})
+	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
 
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Reason = reason
@@ -837,7 +885,7 @@
 }
 
 func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
-	logger.Debugw(ctx, "childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
+	logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
 
 	// Remove the associated peer ports on the parent device
 	for portID := range agent.portLoader.ListIDs() {
@@ -861,6 +909,8 @@
 
 	//send request to adapter
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
 	ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, agent.deviceID, device.ParentPortNo, device.ProxyAddress.OnuId)
 	if err != nil {
 		cancel()
@@ -907,12 +957,12 @@
 	if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
 		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
 	}
-	logger.Debugw(ctx, "Omci_test_Request-Success-device-agent", log.Fields{"testResp": testResp})
+	logger.Debugw(ctx, "omci-test-request-success-device-agent", log.Fields{"test-resp": testResp})
 	return testResp, nil
 }
 
 func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
-	logger.Debugw(ctx, "getExtValue", log.Fields{"device-id": agent.deviceID, "onuid": valueparam.Id, "valuetype": valueparam.Value})
+	logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
@@ -938,12 +988,12 @@
 	if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
 		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
 	}
-	logger.Debugw(ctx, "getExtValue-Success-device-agent", log.Fields{"Resp": Resp})
+	logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"resp": Resp})
 	return Resp, nil
 }
 
 func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
-	logger.Debugw(ctx, "setExtValue", log.Fields{"device-id": value.Id})
+	logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
 	}
@@ -965,12 +1015,12 @@
 	}
 
 	// Unmarshal and return the response
-	logger.Debug(ctx, "setExtValue-Success-device-agent")
+	logger.Debug(ctx, "set-ext-value-success-device-agent")
 	return &empty.Empty{}, nil
 }
 
 func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
-	logger.Debugw(ctx, "getSingleValue", log.Fields{"device-id": request.TargetId})
+	logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err
@@ -1004,7 +1054,7 @@
 }
 
 func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
-	logger.Debugw(ctx, "setSingleValue", log.Fields{"device-id": request.TargetId})
+	logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
 
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return nil, err