VOL-3507 Implement the device update queries in rw-core
Change-Id: I2e9de4935c12981ddb7f10924d629bcd0ec09ef5
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index f2fd10a..4589b91 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -18,11 +18,13 @@
import (
"context"
+ "fmt"
"github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/common"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -45,16 +47,23 @@
func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+ var desc string // failure detail handed to logDeviceUpdate; only assigned on the error paths
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} // pessimistic default; changed to IN_PROGRESS just before the successful return
+
if (len(newFlows)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
return coreutils.DoneResponse(), nil
}
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type) // same text as the status error returned below
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
flowsToAdd := make([]*ofp.OfpFlowStats, 0)
@@ -62,6 +71,8 @@
for _, flow := range newFlows {
flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
if created {
@@ -72,6 +83,8 @@
//Flow needs to be updated.
if err := flowHandle.Update(ctx, flow); err != nil {
flowHandle.Unlock()
+ desc = fmt.Sprintf("failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID) // same text as the status error returned below
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
}
flowsToDelete = append(flowsToDelete, flowToReplace)
@@ -101,9 +114,11 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response) // the operation name now travels with the pending adapter response
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: flowsToAdd},
@@ -117,27 +132,38 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "addFlowsToAdapter", rpcResponse, response) // the operation name now travels with the pending adapter response
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS // the adapter call was issued; its reply is awaited in the goroutine started above
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}
func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+ var desc string // failure detail handed to logDeviceUpdate; only assigned on the error paths
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} // pessimistic default; changed to IN_PROGRESS just before the successful return
+
if (len(flowsToDel)) == 0 {
logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
return coreutils.DoneResponse(), nil
}
+ defer agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc) // runs on every return below; operStatus and &desc are pointers, so later assignments are visible to it
+
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type) // same text as the status error returned below
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
for _, flow := range flowsToDel {
@@ -145,6 +171,7 @@
// Update the store and cache
if err := flowHandle.Delete(ctx); err != nil {
flowHandle.Unlock()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
flowHandle.Unlock()
@@ -162,9 +189,10 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowsFromAdapter", rpcResponse, response) // same operation name as the deferred logDeviceUpdate above
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -178,16 +206,21 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteFlowsFromAdapter", rpcResponse, response) // same operation name as the deferred logDeviceUpdate above
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS // the adapter call was issued; its reply is awaited in the goroutine started above
return response, nil
}
func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "update-flows-to-adapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+ var desc string // failure detail handed to logDeviceUpdate; only assigned on the error paths
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} // pessimistic default. NOTE(review): the codes.Aborted return visible below has no logDeviceUpdate call, unlike addFlowsToAdapter - confirm whether that is intended
+
if (len(updatedFlows)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
return coreutils.DoneResponse(), nil
@@ -198,10 +231,15 @@
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+ desc = "invalid device states" // same text as the status error returned below
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type) // same text as the status error returned below
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
+
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
@@ -212,6 +250,8 @@
// Update the store and cache
if err := flowHandle.Update(ctx, flow); err != nil {
flowHandle.Unlock()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
@@ -231,9 +271,11 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, updatedAllFlows, nil, nil)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowsToAdapter", rpcResponse, response) // same operation name as the logDeviceUpdate calls in this function
} else {
logger.Debugw(ctx, "updating-flows-and-groups",
log.Fields{
@@ -260,11 +302,15 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateFlowsToAdapter", rpcResponse, response) // same operation name as the logDeviceUpdate calls in this function
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS // the adapter call was issued; its reply is awaited in the goroutine started above
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}
@@ -301,16 +347,30 @@
func (agent *Agent) deleteAllFlows(ctx context.Context) error {
logger.Debugw(ctx, "deleteAllFlows", log.Fields{"device-id": agent.deviceID})
+ var failedIDs string // space-separated IDs of the flows whose deletion failed
+ var desc string // failure description handed to logDeviceUpdate; stays empty when every deletion succeeds
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE} // pessimistic default; changed to SUCCESS at the end if nothing failed
+ // operStatus and &desc are pointers, so the deferred call can observe the values assigned at the end of the function.
+ defer agent.logDeviceUpdate(ctx, "deleteAllFlows", nil, nil, operStatus, &desc)
+
for flowID := range agent.flowLoader.ListIDs() {
if flowHandle, have := agent.flowLoader.Lock(flowID); have {
// Update the store and cache
if err := flowHandle.Delete(ctx); err != nil {
flowHandle.Unlock()
+ failedIDs += fmt.Sprintf("%v ", flowID)
logger.Errorw(ctx, "unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
continue
}
flowHandle.Unlock()
}
}
+ // Report SUCCESS only when every flow was deleted; otherwise keep FAILURE and list the failed flow IDs. The function returns nil in both cases.
+ if failedIDs != "" {
+ desc = fmt.Sprintf("Unable to delete flows : %s", failedIDs)
+ } else {
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ }
+
return nil
}