[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index f13003b..6ad4488 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -20,13 +20,15 @@
"context"
"fmt"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
"github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
- fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/common"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -47,8 +49,10 @@
func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+ var err error
var desc string
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
if (len(newFlows)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
@@ -56,22 +60,17 @@
}
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
if !agent.proceedWithRequest(device) {
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", agent.deviceID)
- agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ return coreutils.DoneResponse(), err
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
- desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
- agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ return coreutils.DoneResponse(), err
}
flowsToAdd := make([]*ofp.OfpFlowStats, 0)
@@ -80,7 +79,6 @@
flowHandle, created, err := agent.flowCache.LockOrCreate(ctx, flow)
if err != nil {
desc = err.Error()
- agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
if created {
@@ -91,9 +89,7 @@
//Flow needs to be updated.
if err := flowHandle.Update(ctx, flow); err != nil {
flowHandle.Unlock()
- desc = fmt.Sprintf("failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
- agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+ return coreutils.DoneResponse(), err
}
flowsToDelete = append(flowsToDelete, flowToReplace)
flowsToAdd = append(flowsToAdd, flow)
@@ -108,25 +104,21 @@
// Sanity check
if (len(flowsToAdd)) == 0 {
logger.Debugw(ctx, "no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return coreutils.DoneResponse(), nil
}
// Send update to adapters
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
response := coreutils.NewResponse()
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
if !dType.AcceptsAddRemoveFlowUpdates {
-
updatedAllFlows := agent.listDeviceFlows()
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
- if err != nil {
- cancel()
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), err
+ ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
+ for _, flow := range updatedAllFlows {
+ flowSlice[ctr] = flow
+ ctr++
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
+ go agent.sendBulkFlows(subCtx, device, &voltha.Flows{Items: flowSlice}, nil, flowMetadata, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: flowsToAdd},
@@ -137,55 +129,128 @@
ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response)
+ go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
}
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
- agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}
+// sendBulkFlows sends the given flows and groups for device to its adapter with a
+// single UpdateFlowsBulk gRPC call and reports the outcome on response.
+//
+// The function returns nothing: response is completed with Error(err) when the
+// adapter client cannot be obtained or the RPC fails, and with Done() otherwise.
+// A deferred agent.logDeviceUpdate call receives the final operation status
+// (OPERATION_FAILURE unless the RPC succeeded), the last error and desc.
+//
+// ctx is the parent of the RPC context; the RPC itself is bounded by agent.rpcTimeout.
+func (agent *Agent) sendBulkFlows(
+ ctx context.Context,
+ device *voltha.Device,
+ flows *voltha.Flows,
+ groups *voltha.FlowGroups,
+ flowMetadata *voltha.FlowMetadata,
+ response coreutils.Response,
+) {
+ var err error
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ // Deferred as a closure so err and operStatus are read when the function returns.
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+ // Get a grpc client. err is assigned here, not redeclared: client is the only new name.
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ // Report the endpoint that was actually used for the failed lookup.
+ "adapter-endpoint": agent.adapterEndpoint,
+ })
+ response.Error(err)
+ return
+ }
+ subCtx, cancel := context.WithTimeout(ctx, agent.rpcTimeout)
+ defer cancel()
+
+ if _, err = client.UpdateFlowsBulk(subCtx, &ic.BulkFlows{
+ Device: device,
+ Flows: flows,
+ Groups: groups,
+ FlowMetadata: flowMetadata,
+ }); err != nil {
+ response.Error(err)
+ return
+ }
+ response.Done()
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+}
+
+// sendIncrementalFlows sends the given flow and group changes for device to its
+// adapter with a single UpdateFlowsIncrementally gRPC call and reports the outcome
+// on response.
+//
+// The function returns nothing: response is completed with Error(err) when the
+// adapter client cannot be obtained or the RPC fails, and with Done() otherwise.
+// A deferred agent.logDeviceUpdate call receives the final operation status
+// (OPERATION_FAILURE unless the RPC succeeded), the last error and desc.
+//
+// ctx is the parent of the RPC context; the RPC itself is bounded by agent.rpcTimeout.
+func (agent *Agent) sendIncrementalFlows(
+ ctx context.Context,
+ device *voltha.Device,
+ flowChanges *ofp.FlowChanges,
+ groupChanges *ofp.FlowGroupChanges,
+ flowMetadata *voltha.FlowMetadata,
+ response coreutils.Response,
+) {
+ var err error
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ // Deferred as a closure so err and operStatus are read when the function returns.
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+ // Get a grpc client. err is assigned here, not redeclared: client is the only new name.
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ // Report the endpoint that was actually used for the failed lookup.
+ "adapter-endpoint": agent.adapterEndpoint,
+ })
+ response.Error(err)
+ return
+ }
+ subCtx, cancel := context.WithTimeout(ctx, agent.rpcTimeout)
+ defer cancel()
+ if _, err = client.UpdateFlowsIncrementally(subCtx, &ic.IncrementalFlows{
+ Device: device,
+ Flows: flowChanges,
+ Groups: groupChanges,
+ FlowMetadata: flowMetadata,
+ }); err != nil {
+ response.Error(err)
+ return
+ }
+ response.Done()
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+}
+
func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
var desc string
+ var err error
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
if (len(flowsToDel)) == 0 {
logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return coreutils.DoneResponse(), nil
}
- defer agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc)
-
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
- desc = err.Error()
- return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
+ return coreutils.DoneResponse(), err
}
if !agent.proceedWithRequest(device) {
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
- agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ return coreutils.DoneResponse(), err
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
- desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ return coreutils.DoneResponse(), err
}
for _, flow := range flowsToDel {
if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
// Update the store and cache
- if err := flowHandle.Delete(ctx); err != nil {
+ if err = flowHandle.Delete(ctx); err != nil {
flowHandle.Unlock()
desc = err.Error()
return coreutils.DoneResponse(), err
@@ -195,20 +260,16 @@
}
// Send update to adapters
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
response := coreutils.NewResponse()
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
if !dType.AcceptsAddRemoveFlowUpdates {
-
updatedAllFlows := agent.listDeviceFlows()
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
- if err != nil {
- cancel()
- desc = err.Error()
- return coreutils.DoneResponse(), err
+ ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
+ for _, flow := range updatedAllFlows {
+ flowSlice[ctr] = flow
+ ctr++
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
+ go agent.sendBulkFlows(subCtx, device, &voltha.Flows{Items: flowSlice}, nil, flowMetadata, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -219,13 +280,7 @@
ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- desc = err.Error()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowToAdapter", rpcResponse, response)
+ go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
}
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
return response, nil
@@ -234,37 +289,35 @@
func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+ var err error
var desc string
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
if (len(updatedFlows)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return coreutils.DoneResponse(), nil
}
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
+ return coreutils.DoneResponse(), err
}
if !agent.proceedWithRequest(device) {
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
- agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ return coreutils.DoneResponse(), err
}
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
- desc = "invalid device states"
- agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+ err = status.Errorf(codes.FailedPrecondition, "invalid device states")
+ return coreutils.DoneResponse(), err
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
- desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
- agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
-
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ return coreutils.DoneResponse(), err
}
flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
@@ -273,10 +326,8 @@
if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
flowToDelete := flowHandle.GetReadOnly()
// Update the store and cache
- if err := flowHandle.Update(ctx, flow); err != nil {
+ if err = flowHandle.Update(ctx, flow); err != nil {
flowHandle.Unlock()
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
@@ -286,21 +337,17 @@
}
}
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
response := coreutils.NewResponse()
+ subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
updatedAllFlows := agent.listDeviceFlows()
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
- if err != nil {
- cancel()
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), err
+ ctr, flowSlice := 0, make([]*ofp.OfpFlowStats, len(updatedAllFlows))
+ for _, flow := range updatedAllFlows {
+ flowSlice[ctr] = flow
+ ctr++
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
+ go agent.sendBulkFlows(subCtx, device, &voltha.Flows{Items: flowSlice}, nil, flowMetadata, response)
} else {
logger.Debugw(ctx, "updating-flows-and-groups",
log.Fields{
@@ -311,7 +358,7 @@
// Sanity check
if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
- cancel()
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return coreutils.DoneResponse(), nil
}
@@ -324,18 +371,9 @@
ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- desc = err.Error()
- agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowToAdapter", rpcResponse, response)
+ go agent.sendIncrementalFlows(subCtx, device, flowChanges, groupChanges, flowMetadata, response)
}
-
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
- agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}
@@ -362,7 +400,7 @@
if err != nil {
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -372,30 +410,29 @@
func (agent *Agent) deleteAllFlows(ctx context.Context) error {
logger.Debugw(ctx, "deleteAllFlows", log.Fields{"device-id": agent.deviceID})
- var error string
+ // Deletes every flow whose ID is listed by the flow cache. A failed delete does not
+ // stop the loop; the failing IDs are collected and reported in the returned error.
+ var err error
+ // errFlows accumulates the space-separated IDs of flows that could not be deleted.
+ var errFlows string
var desc string
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-
- defer agent.logDeviceUpdate(ctx, "deleteAllFlows", nil, nil, operStatus, &desc)
+ // Deferred as a closure so err and operStatus are read when the function returns.
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
for flowID := range agent.flowCache.ListIDs() {
if flowHandle, have := agent.flowCache.Lock(flowID); have {
// Update the store and cache
- if err := flowHandle.Delete(ctx); err != nil {
+ // Plain assignment: the function-level err is reused, not shadowed.
+ if err = flowHandle.Delete(ctx); err != nil {
flowHandle.Unlock()
- error += fmt.Sprintf("%v ", flowID)
- logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
+ errFlows += fmt.Sprintf("%v ", flowID)
+ logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID, "error": err})
continue
}
flowHandle.Unlock()
}
}
- if error != "" {
- desc = fmt.Sprintf("Unable to delete flows : %s", error)
+ if errFlows != "" {
+ // err may be nil here if the last delete succeeded; replace it with one summary error.
+ err = fmt.Errorf("unable to delete flows : %s", errFlows)
} else {
operStatus.Code = common.OperationResp_OPERATION_SUCCESS
}
-
- return nil
+ // nil when no delete failed.
+ return err
}