VOL-3507 Implement the device update queries in rw-core

Change-Id: I2e9de4935c12981ddb7f10924d629bcd0ec09ef5
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index bc5fc1a..9358d8a 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -21,13 +21,15 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/empty"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"reflect"
 	"sync"
 	"time"
 
-	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	"github.com/opencord/voltha-go/rw_core/core/device/flow"
@@ -38,12 +40,11 @@
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/common"
 	"github.com/opencord/voltha-protos/v4/go/extension"
 	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 )
 
 // Agent represents device agent attributes
@@ -136,12 +137,20 @@
 		logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
 	} else {
 		// Create a new device
+		var desc string
+		prevState := common.AdminState_UNKNOWN
+		currState := common.AdminState_UNKNOWN
+		operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+		defer agent.logDeviceUpdate(ctx, "createDevice", &prevState, &currState, operStatus, &desc)
+
 		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
 		// is a new device, so populate them here before passing the device to ldProxy.Set.
 		// agent.deviceId will also have been set during newAgent().
 		device = (proto.Clone(deviceToCreate)).(*voltha.Device)
 		device.Id = agent.deviceID
 		device.AdminState = voltha.AdminState_PREPROVISIONED
+		currState = device.AdminState
 		if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
 			// Set the default vlan ID to the one specified by the parent adapter.  It can be
 			// overwritten by the child adapter during a device update request
@@ -150,8 +159,10 @@
 
 		// Add the initial device to the local model
 		if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+			desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
 			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
 		}
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		agent.device = device
 	}
 	startSucceeded = true
@@ -312,6 +323,42 @@
 	}
 }
 
+func (agent *Agent) waitForAdapterResponseAndLogDeviceUpdate(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, prevState *common.AdminState_Types, reqArgs ...interface{}) {
+	defer cancel()
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer agent.logDeviceUpdate(ctx, rpc, prevState, &agent.device.AdminState, operStatus, &desc)
+	var rpce *voltha.RPCEvent
+	defer func() {
+		if rpce != nil {
+			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+		}
+	}()
+
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
+			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+			//add failure
+		} else if rpcResponse.Err != nil {
+			desc = rpcResponse.Err.Error()
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
+			onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
+			//add failure
+		} else {
+			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
+		}
+	case <-ctx.Done():
+		desc = ctx.Err().Error()
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
+		onFailure(ctx, rpc, ctx.Err(), reqArgs)
+	}
+}
+
 // getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.
 func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -335,20 +382,34 @@
 
 // enableDevice activates a preprovisioned or a disable device
 func (agent *Agent) enableDevice(ctx context.Context) error {
+	//To preserve and use oldDevice state as prev state in new device
+	prevDeviceState := agent.device.AdminState
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "enableDevice", nil, nil, operStatus, &desc)
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})
 
 	oldDevice := agent.getDeviceReadOnlyWithoutLock()
+
 	if oldDevice.AdminState == voltha.AdminState_ENABLED {
 		logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
 		agent.requestQueue.RequestComplete()
-		return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
+		desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
+		return status.Error(codes.FailedPrecondition, desc)
 	}
 	if agent.isDeletionInProgress() {
 		agent.requestQueue.RequestComplete()
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+
+		operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+		desc = fmt.Sprintf("deviceId:%s, Device deletion is in progress.", agent.deviceID)
+		return status.Error(codes.FailedPrecondition, desc)
+
 	}
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
 	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
@@ -356,6 +417,7 @@
 	adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
 	if err != nil {
 		agent.requestQueue.RequestComplete()
+		desc = err.Error()
 		return err
 	}
 
@@ -365,7 +427,9 @@
 	// Update the Admin State and set the operational state to activating before sending the request to the Adapters
 	newDevice.AdminState = voltha.AdminState_ENABLED
 	newDevice.OperStatus = voltha.OperStatus_ACTIVATING
+
 	if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
+		desc = err.Error()
 		return err
 	}
 
@@ -373,6 +437,7 @@
 	var ch chan *kafka.RpcResponse
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
 	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+	subCtx = coreutils.WithFromTopicMetadataFromContext(subCtx, ctx)
 
 	if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
 		ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
@@ -381,15 +446,23 @@
 	}
 	if err != nil {
 		cancel()
+		desc = err.Error()
 		return err
 	}
+
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
 	// Wait for response
-	go agent.waitForAdapterResponse(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure)
+	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
 	return nil
 }
 
-func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, ch chan *kafka.RpcResponse, response coreutils.Response) {
+func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse, response coreutils.Response) {
 	defer cancel()
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer agent.logDeviceUpdate(ctx, rpc, nil, nil, operStatus, &desc)
+
 	var rpce *voltha.RPCEvent
 	defer func() {
 		if rpce != nil {
@@ -401,17 +474,21 @@
 	case rpcResponse, ok := <-ch:
 		if !ok {
 			//add failure
+			desc = "Response Channel Closed"
 			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
 			response.Error(status.Errorf(codes.Aborted, "channel-closed"))
 		} else if rpcResponse.Err != nil {
 			//add failure
-			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
+			desc = rpcResponse.Err.Error()
+			rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
 			response.Error(rpcResponse.Err)
 		} else {
+			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 			response.Done()
 		}
 	case <-ctx.Done():
-		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
+		desc = ctx.Err().Error()
+		rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
 		response.Error(ctx.Err())
 	}
 }
@@ -474,7 +551,15 @@
 
 //disableDevice disable a device
 func (agent *Agent) disableDevice(ctx context.Context) error {
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	prevDeviceState := agent.device.AdminState
+
+	defer agent.logDeviceUpdate(ctx, "disableDevice", nil, nil, operStatus, &desc)
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
 		return err
 	}
 	logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
@@ -482,12 +567,14 @@
 	cloned := agent.cloneDeviceWithoutLock()
 
 	if cloned.AdminState == voltha.AdminState_DISABLED {
+		desc = "device-already-disabled"
 		logger.Debugw(ctx, "device-already-disabled", log.Fields{"device-id": agent.deviceID})
 		agent.requestQueue.RequestComplete()
 		return nil
 	}
 	if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
 		agent.requestQueue.RequestComplete()
+		desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 	}
 	if agent.isDeletionInProgress() {
@@ -497,6 +584,7 @@
 	// Update the Admin State and operational state before sending the request out
 	cloned.AdminState = voltha.AdminState_DISABLED
 	cloned.OperStatus = voltha.OperStatus_UNKNOWN
+
 	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
 		return err
 	}
@@ -507,15 +595,27 @@
 	ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
 	if err != nil {
 		cancel()
+		desc = err.Error()
 		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+	// Wait for response
+	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
 
 	return nil
 }
 
 func (agent *Agent) rebootDevice(ctx context.Context) error {
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	prevDeviceState := agent.device.AdminState
+
+	defer agent.logDeviceUpdate(ctx, "rebootDevice", nil, nil, operStatus, &desc)
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
 		return err
 	}
 	defer agent.requestQueue.RequestComplete()
@@ -531,15 +631,25 @@
 	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
 	if err != nil {
 		cancel()
+		desc = err.Error()
 		return err
 	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure)
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+
+	// Wait for response
+	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
 	return nil
 }
 
 func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
 	logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
+
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
 		return err
 	}
 	// Get the device Transient state, return err if it is DELETING
@@ -547,8 +657,10 @@
 
 	if agent.isStateDeleting(previousDeviceTransientState) {
 		agent.requestQueue.RequestComplete()
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress",
+		desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress",
 			agent.deviceID)
+		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
+		return status.Error(codes.FailedPrecondition, desc)
 	}
 	device := agent.cloneDeviceWithoutLock()
 	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
@@ -563,8 +675,13 @@
 		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
 			return err
 		}
+		// As force delete will not be dependent over the response of adapter, marking this operation as success
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
 		// Since it is a case of force delete, nothing needs to be done on adapter responses.
 		go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
 			agent.onFailure)
@@ -574,7 +691,15 @@
 
 func (agent *Agent) deleteDevice(ctx context.Context) error {
 	logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
+
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	prevState := agent.device.AdminState
+
+	defer agent.logDeviceUpdate(ctx, "deleteDevice", nil, nil, operStatus, &desc)
+
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
 		return err
 	}
 	// Get the device Transient state, return err if it is DELETING
@@ -582,7 +707,8 @@
 
 	if agent.isStateDeleting(previousDeviceTransientState) {
 		agent.requestQueue.RequestComplete()
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress", agent.deviceID)
+		desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress", agent.deviceID)
+		return status.Error(codes.FailedPrecondition, desc)
 	}
 	device := agent.cloneDeviceWithoutLock()
 	previousAdminState := device.AdminState
@@ -595,6 +721,7 @@
 	}
 	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
 		currentDeviceTransientState, previousDeviceTransientState); err != nil {
+		desc = err.Error()
 		return err
 	}
 	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
@@ -610,10 +737,13 @@
 			if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
 				logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
 			}
+			desc = err.Error()
 			return err
 		}
-		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
-			agent.onDeleteFailure)
+
+		operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+		go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+			agent.onDeleteFailure, &prevState)
 	}
 	return nil
 }
@@ -877,11 +1007,24 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
+
 	logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "updateDeviceReason", nil, nil, operStatus, &desc)
+
 	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Reason = reason
-	return agent.updateDeviceAndReleaseLock(ctx, cloned)
+	retErr := agent.updateDeviceAndReleaseLock(ctx, cloned)
+	if retErr != nil {
+		desc = retErr.Error()
+	} else {
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+		desc = reason
+	}
+	return retErr
 }
 
 func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {