[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index f13003b..6ad4488 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -20,13 +20,15 @@
 	"context"
 	"fmt"
 
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
 	"github.com/gogo/protobuf/proto"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
-	fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/common"
-	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -47,8 +49,10 @@
 func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
 
+	var err error
 	var desc string
 	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
 
 	if (len(newFlows)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
@@ -56,22 +60,17 @@
 	}
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
-		desc = err.Error()
-		agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
 	}
 
 	if !agent.proceedWithRequest(device) {
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", agent.deviceID)
-		agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return coreutils.DoneResponse(), err
 	}
 
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
-		agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(), err
 	}
 
 	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
@@ -80,7 +79,6 @@
 		flowHandle, created, err := agent.flowCache.LockOrCreate(ctx, flow)
 		if err != nil {
 			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 			return coreutils.DoneResponse(), err
 		}
 		if created {
@@ -91,9 +89,7 @@
 				//Flow needs to be updated.
 				if err := flowHandle.Update(ctx, flow); err != nil {
 					flowHandle.Unlock()
-					desc = fmt.Sprintf("failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
-					agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
-					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+					return coreutils.DoneResponse(), err
 				}
 				flowsToDelete = append(flowsToDelete, flowToReplace)
 				flowsToAdd = append(flowsToAdd, flow)
@@ -108,25 +104,21 @@
 	// Sanity check
 	if (len(flowsToAdd)) == 0 {
 		logger.Debugw(ctx, "no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		return coreutils.DoneResponse(), nil
 	}
 
 	// Send update to adapters
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
 	response := coreutils.NewResponse()
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 	if !dType.AcceptsAddRemoveFlowUpdates {
-
 		updatedAllFlows := agent.listDeviceFlows()
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
-			return coreutils.DoneResponse(), err
+		ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
+		for _, flow := range updatedAllFlows {
+			flowSlice[ctr] = flow
+			ctr++
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
+		go agent.sendBulkFlows(subCtx, device, &voltha.Flows{Items: flowSlice}, nil, flowMetadata, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: flowsToAdd},
@@ -137,55 +129,128 @@
 			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
+		go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
 	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-	agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
+func (agent *Agent) sendBulkFlows(
+	ctx context.Context,
+	device *voltha.Device,
+	flows *voltha.Flows,
+	groups *voltha.FlowGroups,
+	flowMetadata *voltha.FlowMetadata,
+	response coreutils.Response,
+) {
+	var err error
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+	// Get a grpc client
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": device.AdapterEndpoint,
+			})
+		response.Error(err)
+		return
+	}
+	subCtx, cancel := context.WithTimeout(ctx, agent.rpcTimeout)
+	defer cancel()
+
+	if _, err = client.UpdateFlowsBulk(subCtx, &ic.BulkFlows{
+		Device:       device,
+		Flows:        flows,
+		Groups:       groups,
+		FlowMetadata: flowMetadata,
+	}); err != nil {
+		response.Error(err)
+	} else {
+		response.Done()
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
+}
+
+func (agent *Agent) sendIncrementalFlows(
+	ctx context.Context,
+	device *voltha.Device,
+	flowChanges *ofp.FlowChanges,
+	groupChanges *ofp.FlowGroupChanges,
+	flowMetadata *voltha.FlowMetadata,
+	response coreutils.Response,
+) {
+	var err error
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+	// Get a grpc client
+	client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+	if err != nil {
+		logger.Errorw(ctx, "grpc-client-nil",
+			log.Fields{
+				"error":            err,
+				"device-id":        agent.deviceID,
+				"device-type":      agent.deviceType,
+				"adapter-endpoint": device.AdapterEndpoint,
+			})
+		response.Error(err)
+		return
+	}
+	subCtx, cancel := context.WithTimeout(ctx, agent.rpcTimeout)
+	defer cancel()
+	if _, err = client.UpdateFlowsIncrementally(subCtx, &ic.IncrementalFlows{
+		Device:       device,
+		Flows:        flowChanges,
+		Groups:       groupChanges,
+		FlowMetadata: flowMetadata,
+	}); err != nil {
+		response.Error(err)
+	} else {
+		response.Done()
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+	}
+}
+
 func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 
 	var desc string
+	var err error
 	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
 
 	if (len(flowsToDel)) == 0 {
 		logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		return coreutils.DoneResponse(), nil
 	}
 
-	defer agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc)
-
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
-		desc = err.Error()
-		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
+		return coreutils.DoneResponse(), err
 	}
 
 	if !agent.proceedWithRequest(device) {
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
-		agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return coreutils.DoneResponse(), err
 	}
 
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(), err
 	}
 
 	for _, flow := range flowsToDel {
 		if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
 			// Update the store and cache
-			if err := flowHandle.Delete(ctx); err != nil {
+			if err = flowHandle.Delete(ctx); err != nil {
 				flowHandle.Unlock()
 				desc = err.Error()
 				return coreutils.DoneResponse(), err
@@ -195,20 +260,16 @@
 	}
 
 	// Send update to adapters
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
 	response := coreutils.NewResponse()
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 	if !dType.AcceptsAddRemoveFlowUpdates {
-
 		updatedAllFlows := agent.listDeviceFlows()
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			return coreutils.DoneResponse(), err
+		ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
+		for _, flow := range updatedAllFlows {
+			flowSlice[ctr] = flow
+			ctr++
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
+		go agent.sendBulkFlows(subCtx, device, &voltha.Flows{Items: flowSlice}, nil, flowMetadata, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -219,13 +280,7 @@
 			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
+		go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
 	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
 	return response, nil
@@ -234,37 +289,35 @@
 func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 
+	var err error
 	var desc string
 	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
 
 	if (len(updatedFlows)) == 0 {
 		logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		return coreutils.DoneResponse(), nil
 	}
 
 	device, err := agent.getDeviceReadOnly(ctx)
 	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
+		return coreutils.DoneResponse(), err
 	}
 
 	if !agent.proceedWithRequest(device) {
-		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
-		agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+		err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+		return coreutils.DoneResponse(), err
 	}
 
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
-		desc = "invalid device states"
-		agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+		err = status.Errorf(codes.FailedPrecondition, "invalid device states")
+		return coreutils.DoneResponse(), err
 	}
 
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
-		desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
-		agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
-
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+		return coreutils.DoneResponse(), err
 	}
 
 	flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
@@ -273,10 +326,8 @@
 		if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
 			flowToDelete := flowHandle.GetReadOnly()
 			// Update the store and cache
-			if err := flowHandle.Update(ctx, flow); err != nil {
+			if err = flowHandle.Update(ctx, flow); err != nil {
 				flowHandle.Unlock()
-				desc = err.Error()
-				agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 				return coreutils.DoneResponse(), err
 			}
 
@@ -286,21 +337,17 @@
 		}
 	}
 
-	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
 	response := coreutils.NewResponse()
+	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		updatedAllFlows := agent.listDeviceFlows()
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
-			return coreutils.DoneResponse(), err
+		ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
+		for _, flow := range updatedAllFlows {
+			flowSlice[ctr] = flow
+			ctr++
 		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
+		go agent.sendBulkFlows(subCtx, device, &voltha.Flows{Items: flowSlice}, nil, flowMetadata, response)
 	} else {
 		logger.Debugw(ctx, "updating-flows-and-groups",
 			log.Fields{
@@ -311,7 +358,7 @@
 		// Sanity check
 		if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
 			logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
-			cancel()
+			operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 			return coreutils.DoneResponse(), nil
 		}
 
@@ -324,18 +371,9 @@
 			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			desc = err.Error()
-			agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
+		go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
-
 	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-	agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
 	return response, nil
 }
 
@@ -362,7 +400,7 @@
 	if err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -372,30 +410,29 @@
 func (agent *Agent) deleteAllFlows(ctx context.Context) error {
 	logger.Debugw(ctx, "deleteAllFlows", log.Fields{"device-id": agent.deviceID})
 
-	var error string
+	var err error
+	var errFlows string
 	var desc string
 	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-
-	defer agent.logDeviceUpdate(ctx, "deleteAllFlows", nil, nil, operStatus, &desc)
+	defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
 
 	for flowID := range agent.flowCache.ListIDs() {
 		if flowHandle, have := agent.flowCache.Lock(flowID); have {
 			// Update the store and cache
-			if err := flowHandle.Delete(ctx); err != nil {
+			if err = flowHandle.Delete(ctx); err != nil {
 				flowHandle.Unlock()
-				error += fmt.Sprintf("%v ", flowID)
-				logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
+				errFlows += fmt.Sprintf("%v ", flowID)
+				logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID, "error": err})
 				continue
 			}
 			flowHandle.Unlock()
 		}
 	}
 
-	if error != "" {
-		desc = fmt.Sprintf("Unable to delete flows : %s", error)
+	if errFlows != "" {
+		err = fmt.Errorf("unable to delete flows : %s", errFlows)
 	} else {
 		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 	}
-
-	return nil
+	return err
 }