VOL-3507 Implement the device update queries in rw-core
Change-Id: I2e9de4935c12981ddb7f10924d629bcd0ec09ef5
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 9552b78..5db0ea5 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -18,11 +18,13 @@
import (
"context"
+ "fmt"
"strconv"
"github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-protos/v4/go/common"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
@@ -45,6 +47,9 @@
func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
if (len(newGroups)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
return coreutils.DoneResponse(), nil
@@ -52,10 +57,14 @@
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -64,6 +73,8 @@
for _, group := range newGroups {
groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
@@ -75,6 +86,8 @@
//Group needs to be updated.
if err := groupHandle.Update(ctx, group); err != nil {
groupHandle.Unlock()
+ desc = fmt.Sprintf("failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
}
groupsToDelete = append(groupsToDelete, groupToChange)
@@ -103,9 +116,11 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -119,10 +134,14 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "addGroupsToAdapter", rpcResponse, response)
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}
@@ -133,12 +152,20 @@
logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
return coreutils.DoneResponse(), nil
}
+
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+ defer agent.logDeviceUpdate(ctx, "deleteGroupsFromAdapter", nil, nil, operStatus, &desc)
+
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -147,6 +174,7 @@
// Update the store and cache
if err := groupHandle.Delete(ctx); err != nil {
groupHandle.Unlock()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
groupHandle.Unlock()
@@ -163,9 +191,10 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -179,16 +208,21 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "deleteGroupsFromAdapter", rpcResponse, response)
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
return response, nil
}
func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "update-groups-to-adapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
if (len(updatedGroups)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
return coreutils.DoneResponse(), nil
@@ -196,13 +230,19 @@
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+ desc = fmt.Sprintf("invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
+ desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -212,6 +252,8 @@
// Update the store and cache
if err := groupHandle.Update(ctx, group); err != nil {
groupHandle.Unlock()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
groupsToUpdate = append(groupsToUpdate, group)
@@ -229,9 +271,11 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, nil, updatedAllGroups, nil)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response)
} else {
logger.Debugw(ctx, "updating-groups",
log.Fields{
@@ -258,10 +302,14 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
+ desc = err.Error()
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), err
}
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, "updateGroupsToAdapter", rpcResponse, response)
}
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return response, nil
}