[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index ba58d2f..43a8929 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -18,15 +18,13 @@
 
 import (
 	"context"
-	"fmt"
-	"strconv"
 
 	"github.com/gogo/protobuf/proto"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/common"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -47,32 +45,30 @@
 func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
 
+	var err error
 	var desc string
 	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
 
 	if (len(newGroups)) == 0 {
-		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		desc = "no new groups"
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		return coreutils.DoneResponse(), nil
 	}
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
-		desc = err.Error()
-		agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
+		return coreutils.DoneResponse(), err
 	}
 
 	if !agent.proceedWithRequest(device) {
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
-		agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return coreutils.DoneResponse(), err
 	}
 
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
-		agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(), err
 	}
 
 	groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
@@ -80,8 +76,6 @@
 	for _, group := range newGroups {
 		groupHandle, created, err := agent.groupCache.LockOrCreate(ctx, group)
 		if err != nil {
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
 
@@ -91,11 +85,9 @@
 			groupToChange := groupHandle.GetReadOnly()
 			if !proto.Equal(groupToChange, group) {
 				//Group needs to be updated.
-				if err := groupHandle.Update(ctx, group); err != nil {
+				if err = groupHandle.Update(ctx, group); err != nil {
 					groupHandle.Unlock()
-					desc = fmt.Sprintf("failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
-					agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
-					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+					return coreutils.DoneResponse(), err
 				}
 				groupsToDelete = append(groupsToDelete, groupToChange)
 				groupsToAdd = append(groupsToAdd, group)
@@ -109,25 +101,22 @@
 	}
 	// Sanity check
 	if (len(groupsToAdd)) == 0 {
-		logger.Debugw(ctx, "no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		desc = "no group to update"
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		return coreutils.DoneResponse(), nil
 	}
 
 	// Send update to adapters
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
 	response := coreutils.NewResponse()
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		updatedAllGroups := agent.listDeviceGroups()
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
-			return coreutils.DoneResponse(), err
+		ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(updatedAllGroups))
+		for _, group := range updatedAllGroups {
+			groupSlice[ctr] = group
+			ctr++
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
+		go agent.sendBulkFlows(subCtx, device, nil, &voltha.FlowGroups{Items: groupSlice}, flowMetadata, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -138,57 +127,46 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
+		go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
 	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-	agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
 func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
 
+	var desc string
+	var err error
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
 	if (len(groupsToDel)) == 0 {
-		logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
+		desc = "nothing to delete"
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		return coreutils.DoneResponse(), nil
 	}
 
-	var desc string
-	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-
-	defer agent.logDeviceUpdate(ctx, "deleteGroupsFromAdapter", nil, nil, operStatus, &desc)
-
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
-		desc = err.Error()
-		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
+		return coreutils.DoneResponse(), err
 	}
 
 	if !agent.proceedWithRequest(device) {
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
-		agent.logDeviceUpdate(ctx, "deleteGroupsFromAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return coreutils.DoneResponse(), err
 	}
 
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(), err
 	}
 
 	for _, group := range groupsToDel {
 		if groupHandle, have := agent.groupCache.Lock(group.Desc.GroupId); have {
 			// Update the store and cache
-			if err := groupHandle.Delete(ctx); err != nil {
+			if err = groupHandle.Delete(ctx); err != nil {
 				groupHandle.Unlock()
-				desc = err.Error()
 				return coreutils.DoneResponse(), err
 			}
 			groupHandle.Unlock()
@@ -196,19 +174,16 @@
 	}
 
 	// Send update to adapters
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
 	response := coreutils.NewResponse()
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		updatedAllGroups := agent.listDeviceGroups()
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			return coreutils.DoneResponse(), err
+		ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(updatedAllGroups))
+		for _, group := range updatedAllGroups {
+			groupSlice[ctr] = group
+			ctr++
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
+		go agent.sendBulkFlows(subCtx, device, nil, &voltha.FlowGroups{Items: groupSlice}, flowMetadata, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -219,13 +194,7 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
+		go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
 	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
 	return response, nil
@@ -235,47 +204,43 @@
 	logger.Debugw(ctx, "update-groups-to-adapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
 
 	var desc string
+	var err error
 	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
 
 	if (len(updatedGroups)) == 0 {
-		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+		desc = "no groups to update"
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		return coreutils.DoneResponse(), nil
 	}
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
-		desc = err.Error()
-		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
+		return coreutils.DoneResponse(), err
 	}
 
 	if !agent.proceedWithRequest(device) {
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or Reconciling is in progress/failed", device.Id)
-		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return coreutils.DoneResponse(), err
 	}
 
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
-		desc = fmt.Sprintf("invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
-		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+		err = status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+		return coreutils.DoneResponse(), err
 	}
 
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
-		agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		err = status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(), err
 	}
 
 	groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
 	for _, group := range updatedGroups {
 		if groupHandle, have := agent.groupCache.Lock(group.Desc.GroupId); have {
 			// Update the store and cache
-			if err := groupHandle.Update(ctx, group); err != nil {
+			if err = groupHandle.Update(ctx, group); err != nil {
 				groupHandle.Unlock()
-				desc = err.Error()
-				agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 				return coreutils.DoneResponse(), err
 			}
 			groupsToUpdate = append(groupsToUpdate, group)
@@ -283,21 +248,17 @@
 		}
 	}
 
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
 	response := coreutils.NewResponse()
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		updatedAllGroups := agent.listDeviceGroups()
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, nil)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
-			return coreutils.DoneResponse(), err
+		ctr, groupSlice := 0, make([]*ofp.OfpGroupEntry, len(updatedAllGroups))
+		for _, group := range updatedAllGroups {
+			groupSlice[ctr] = group
+			ctr++
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response)
+		go agent.sendBulkFlows(subCtx, device, nil, &voltha.FlowGroups{Items: groupSlice}, flowMetadata, response)
 	} else {
 		logger.Debugw(ctx, "updating-groups",
 			log.Fields{
@@ -307,8 +268,8 @@
 
 		// Sanity check
 		if (len(groupsToUpdate)) == 0 {
-			logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
-			cancel()
+			desc = "nothing to update"
+			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 			return coreutils.DoneResponse(), nil
 		}
 
@@ -321,17 +282,9 @@
 			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 			ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
 		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response)
+		go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-	agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }