VOL-3507 Implement the device update queries in rw-core

Change-Id: I2e9de4935c12981ddb7f10924d629bcd0ec09ef5
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index f2fd10a..4589b91 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -18,11 +18,13 @@
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/gogo/protobuf/proto"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
 	fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/common"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -45,16 +47,23 @@
 func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(newFlows)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
 		return coreutils.DoneResponse(), nil
 	}
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+		agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
@@ -62,6 +71,8 @@
 	for _, flow := range newFlows {
 		flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
 		if err != nil {
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
 		if created {
@@ -72,6 +83,8 @@
 				//Flow needs to be updated.
 				if err := flowHandle.Update(ctx, flow); err != nil {
 					flowHandle.Unlock()
+					desc = fmt.Sprintf("failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+					agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
 				}
 				flowsToDelete = append(flowsToDelete, flowToReplace)
@@ -101,9 +114,11 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: flowsToAdd},
@@ -117,27 +132,38 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
 	}
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
 func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(flowsToDel)) == 0 {
 		logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 		return coreutils.DoneResponse(), nil
 	}
 
+	defer agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc)
+
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
+		desc = err.Error()
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 	for _, flow := range flowsToDel {
@@ -145,6 +171,7 @@
 			// Update the store and cache
 			if err := flowHandle.Delete(ctx); err != nil {
 				flowHandle.Unlock()
+				desc = err.Error()
 				return coreutils.DoneResponse(), err
 			}
 			flowHandle.Unlock()
@@ -162,9 +189,10 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -178,16 +206,21 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
 	}
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
 	return response, nil
 }
 
 func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
 	if (len(updatedFlows)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 		return coreutils.DoneResponse(), nil
@@ -198,10 +231,15 @@
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		desc = fmt.Sprint("invalid device states")
+		agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
 	}
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
+		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+		agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
+
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
 	flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
@@ -212,6 +250,8 @@
 			// Update the store and cache
 			if err := flowHandle.Update(ctx, flow); err != nil {
 				flowHandle.Unlock()
+				desc = err.Error()
+				agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 				return coreutils.DoneResponse(), err
 			}
 
@@ -231,9 +271,11 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
 	} else {
 		logger.Debugw(ctx, "updating-flows-and-groups",
 			log.Fields{
@@ -260,11 +302,15 @@
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
 			cancel()
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
 	}
 
+	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+	agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
@@ -301,16 +347,30 @@
 func (agent *Agent) deleteAllFlows(ctx context.Context) error {
 	logger.Debugw(ctx, "deleteAllFlows", log.Fields{"device-id": agent.deviceID})
 
+	var error string
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	defer agent.logDeviceUpdate(ctx, "deleteAllFlows", nil, nil, operStatus, &desc)
+
 	for flowID := range agent.flowLoader.ListIDs() {
 		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
 			// Update the store and cache
 			if err := flowHandle.Delete(ctx); err != nil {
 				flowHandle.Unlock()
+				error += fmt.Sprintf("%v ", flowID)
 				logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
 				continue
 			}
 			flowHandle.Unlock()
 		}
 	}
+
+	if error != "" {
+		desc = fmt.Sprintf("Unable to delete flows : %s", error)
+	} else {
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
+
 	return nil
 }