VOL-3507 Implement the device update queries in rw-core

Change-Id: I2e9de4935c12981ddb7f10924d629bcd0ec09ef5
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 9552b78..5db0ea5 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -18,11 +18,13 @@
 
 import (
 	"context"
+	"fmt"
 	"strconv"
 
 	"github.com/gogo/protobuf/proto"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/common"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -45,6 +47,9 @@
 func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(newGroups)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
 		return coreutils.DoneResponse(), nil
@@ -52,10 +57,14 @@
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+		agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -64,6 +73,8 @@
 	for _, group := range newGroups {
 		groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
 		if err != nil {
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
 
@@ -75,6 +86,8 @@
 				//Group needs to be updated.
 				if err := groupHandle.Update(ctx, group); err != nil {
 					groupHandle.Unlock()
+					desc = fmt.Sprintf("failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+					agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
 				}
 				groupsToDelete = append(groupsToDelete, groupToChange)
@@ -103,9 +116,11 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -119,10 +134,14 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
 	}
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
@@ -133,12 +152,20 @@
 		logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
 		return coreutils.DoneResponse(), nil
 	}
+
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "deleteGroupsFromAdapter", nil, nil, operStatus, &desc)
+
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -147,6 +174,7 @@
 			// Update the store and cache
 			if err := groupHandle.Delete(ctx); err != nil {
 				groupHandle.Unlock()
+				desc = err.Error()
 				return coreutils.DoneResponse(), err
 			}
 			groupHandle.Unlock()
@@ -163,9 +191,10 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -179,16 +208,21 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
 	}
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
 	return response, nil
 }
 
 func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "update-groups-to-adapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(updatedGroups)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
 		return coreutils.DoneResponse(), nil
@@ -196,13 +230,19 @@
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		desc = fmt.Sprintf("invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 
@@ -212,6 +252,8 @@
 			// Update the store and cache
 			if err := groupHandle.Update(ctx, group); err != nil {
 				groupHandle.Unlock()
+				desc = err.Error()
+				agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 				return coreutils.DoneResponse(), err
 			}
 			groupsToUpdate = append(groupsToUpdate, group)
@@ -229,9 +271,11 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, nil)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response)
 	} else {
 		logger.Debugw(ctx, "updating-groups",
 			log.Fields{
@@ -258,10 +302,14 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response)
 	}
 
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }