[VOL-3215] Reorganize functions in agent.go
Functions moved without any change in contents
Change-Id: I42ba327e648bacf25e5d328743835b36be89f4b4
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 00a054e..dbaeb2f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -22,7 +22,6 @@
"errors"
"fmt"
"reflect"
- "strconv"
"sync"
"time"
@@ -31,7 +30,6 @@
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
- fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/gogo/protobuf/proto"
@@ -330,220 +328,6 @@
}
}
-//replaceFlowInList removes the old flow from list and adds the new one.
-func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
- if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
- flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
- }
- flowList = append(flowList, newFlow)
- return flowList
-}
-
-//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice. This function will
-//panic if the index is out of range.
-func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
- flows[index] = flows[len(flows)-1]
- flows[len(flows)-1] = nil
- return flows[:len(flows)-1]
-}
-
-//replaceGroupInList removes the old group from list and adds the new one.
-func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
- if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
- groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
- }
- groupList = append(groupList, newGroup)
- return groupList
-}
-
-//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice. This function will
-//panic if the index is out of range.
-func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
- groups[index] = groups[len(groups)-1]
- groups[len(groups)-1] = nil
- return groups[:len(groups)-1]
-}
-
-func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
-
- if (len(newFlows)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
- return coreutils.DoneResponse(), nil
- }
- device := agent.getDeviceWithoutLock()
- dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
- if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
- }
- updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
- if !dType.AcceptsAddRemoveFlowUpdates {
- flowIDs := agent.flowLoader.ListIDs()
- for flowID := range flowIDs {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
- updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
- flowHandle.Unlock()
- }
- }
- }
- flowsToAdd := make([]*ofp.OfpFlowStats, 0)
- flowsToDelete := make([]*ofp.OfpFlowStats, 0)
- for _, flow := range newFlows {
- flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
- if err != nil {
- return coreutils.DoneResponse(), err
- }
-
- if created {
- flowsToAdd = append(flowsToAdd, flow)
- updatedAllFlows = append(updatedAllFlows, flow)
- } else {
- flowToReplace := flowHandle.GetReadOnly()
- if !proto.Equal(flowToReplace, flow) {
- //Flow needs to be updated.
- if err := flowHandle.Update(ctx, flow); err != nil {
- flowHandle.Unlock()
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
- }
- flowsToDelete = append(flowsToDelete, flowToReplace)
- flowsToAdd = append(flowsToAdd, flow)
- updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
- } else {
- //No need to change the flow. It is already exist.
- logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
- }
- }
-
- flowHandle.Unlock()
- }
-
- // Sanity check
- if (len(flowsToAdd)) == 0 {
- logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
- return coreutils.DoneResponse(), nil
- }
-
- // Send update to adapters
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
- if !dType.AcceptsAddRemoveFlowUpdates {
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- } else {
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: flowsToAdd},
- ToRemove: &voltha.Flows{Items: flowsToDelete},
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- }
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- }
- return response, nil
-}
-
-func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
-
- if (len(newGroups)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
- return coreutils.DoneResponse(), nil
- }
-
- device := agent.getDeviceWithoutLock()
- dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
- if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
- }
- updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
- if !dType.AcceptsAddRemoveFlowUpdates {
- groupIDs := agent.groupLoader.ListIDs()
- for groupID := range groupIDs {
- if grpHandle, have := agent.groupLoader.Lock(groupID); have {
- updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
- grpHandle.Unlock()
- }
- }
- }
-
- groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
- groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
- for _, group := range newGroups {
-
- groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
- if err != nil {
- return coreutils.DoneResponse(), err
- }
-
- if created {
- groupsToAdd = append(groupsToAdd, group)
- updatedAllGroups = append(updatedAllGroups, group)
- } else {
- groupToChange := groupHandle.GetReadOnly()
- if !proto.Equal(groupToChange, group) {
- //Group needs to be updated.
- if err := groupHandle.Update(ctx, group); err != nil {
- groupHandle.Unlock()
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
- }
- groupsToDelete = append(groupsToDelete, groupToChange)
- groupsToAdd = append(groupsToAdd, group)
- updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
- } else {
- //No need to change the group. It is already exist.
- logger.Debugw("No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
- }
- }
-
- groupHandle.Unlock()
- }
- // Sanity check
- if (len(groupsToAdd)) == 0 {
- logger.Debugw("no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
- return coreutils.DoneResponse(), nil
- }
-
- // Send update to adapters
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
- if !dType.AcceptsAddRemoveFlowUpdates {
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- } else {
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
- ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
- ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
- ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- }
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- }
- return response, nil
-}
-
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
//adapters
func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
@@ -564,141 +348,6 @@
return nil
}
-func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
-
- if (len(flowsToDel)) == 0 {
- logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
- return coreutils.DoneResponse(), nil
- }
-
- device := agent.getDeviceWithoutLock()
- dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
- if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
- }
- updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
- if !dType.AcceptsAddRemoveFlowUpdates {
- flowIDs := agent.flowLoader.ListIDs()
- for flowID := range flowIDs {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
- updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
- flowHandle.Unlock()
- }
- }
- }
- for _, flow := range flowsToDel {
- if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
- // Update the store and cache
- flowToDelete := flowHandle.GetReadOnly()
- if err := flowHandle.Delete(ctx); err != nil {
- flowHandle.Unlock()
- return coreutils.DoneResponse(), err
- }
- if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
- updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
- }
- flowHandle.Unlock()
- }
- }
-
- // Send update to adapters
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
- if !dType.AcceptsAddRemoveFlowUpdates {
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- } else {
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
- ToRemove: &voltha.Flows{Items: flowsToDel},
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- }
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- }
- return response, nil
-}
-
-func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
-
- if (len(groupsToDel)) == 0 {
- logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID})
- return coreutils.DoneResponse(), nil
- }
- device := agent.getDeviceWithoutLock()
- dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
- if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
- }
- updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
- if !dType.AcceptsAddRemoveFlowUpdates {
- groupIDs := agent.groupLoader.ListIDs()
- for groupID := range groupIDs {
- if grpHandle, have := agent.groupLoader.Lock(groupID); have {
- updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
- grpHandle.Unlock()
- }
- }
- }
-
- for _, group := range groupsToDel {
- if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
- // Update the store and cache
- if err := groupHandle.Delete(ctx); err != nil {
- groupHandle.Unlock()
- return coreutils.DoneResponse(), err
- }
- if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
- updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
- }
- groupHandle.Unlock()
- }
- }
-
- // Send update to adapters
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
- if !dType.AcceptsAddRemoveFlowUpdates {
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- } else {
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
- ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToRemove: &voltha.FlowGroups{Items: groupsToDel},
- ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- }
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- }
- return response, nil
-}
-
//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
//adapters
func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
@@ -717,209 +366,6 @@
return nil
}
-//filterOutFlows removes flows from a device using the uni-port as filter
-func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
- var flowsToDelete []*ofp.OfpFlowStats
- // If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
- for flowID := range agent.flowLoader.ListIDs() {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
- flow := flowHandle.GetReadOnly()
- if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
- flowsToDelete = append(flowsToDelete, flow)
- }
- flowHandle.Unlock()
- }
- }
-
- logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
- if len(flowsToDelete) == 0 {
- return nil
- }
-
- response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
- if err != nil {
- return err
- }
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
- }
- return nil
-}
-
-func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
-
- if (len(updatedFlows)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
- return coreutils.DoneResponse(), nil
- }
-
- device := agent.getDeviceWithoutLock()
- if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
- }
- dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
- if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
- }
- updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
- if !dType.AcceptsAddRemoveFlowUpdates {
- flowIDs := agent.flowLoader.ListIDs()
- for flowID := range flowIDs {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
- updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
- flowHandle.Unlock()
- }
- }
- }
- flowsToAdd := make([]*ofp.OfpFlowStats, 0)
- flowsToDelete := make([]*ofp.OfpFlowStats, 0)
-
- for _, flow := range updatedFlows {
- if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
- flowToDelete := flowHandle.GetReadOnly()
- // Update the store and cache
- if err := flowHandle.Update(ctx, flow); err != nil {
- flowHandle.Unlock()
- return coreutils.DoneResponse(), err
- }
-
- flowsToDelete = append(flowsToDelete, flowToDelete)
- flowsToAdd = append(flowsToAdd, flow)
- updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
- flowHandle.Unlock()
- }
- }
-
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
- // Process bulk flow update differently than incremental update
- if !dType.AcceptsAddRemoveFlowUpdates {
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- } else {
- logger.Debugw("updating-flows-and-groups",
- log.Fields{
- "device-id": agent.deviceID,
- "flows-to-add": flowsToAdd,
- "flows-to-delete": flowsToDelete,
- })
- // Sanity check
- if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
- cancel()
- return coreutils.DoneResponse(), nil
- }
-
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: flowsToAdd},
- ToRemove: &voltha.Flows{Items: flowsToDelete},
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- }
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- }
-
- return response, nil
-}
-
-func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
-
- if (len(updatedGroups)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
- return coreutils.DoneResponse(), nil
- }
-
- device := agent.getDeviceWithoutLock()
- if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
- }
- dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
- if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
- }
- updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
- if !dType.AcceptsAddRemoveFlowUpdates {
- groupIDs := agent.groupLoader.ListIDs()
- for groupID := range groupIDs {
- if grpHandle, have := agent.groupLoader.Lock(groupID); have {
- updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
- grpHandle.Unlock()
- }
- }
- }
- groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
-
- for _, group := range updatedGroups {
- if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
- // Update the store and cache
- if err := groupHandle.Update(ctx, group); err != nil {
- groupHandle.Unlock()
- return coreutils.DoneResponse(), err
- }
- groupsToUpdate = append(groupsToUpdate, group)
- updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
- groupHandle.Unlock()
- }
- }
-
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
- // Process bulk flow update differently than incremental update
- if !dType.AcceptsAddRemoveFlowUpdates {
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- } else {
- logger.Debugw("updating-groups",
- log.Fields{
- "device-id": agent.deviceID,
- "groups-to-update": groupsToUpdate,
- })
-
- // Sanity check
- if (len(groupsToUpdate)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
- cancel()
- return coreutils.DoneResponse(), nil
- }
-
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
- ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
- }
- rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
- if err != nil {
- cancel()
- return coreutils.DoneResponse(), err
- }
- go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
- }
-
- return response, nil
-}
-
//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
//also sends the updates to the adapters
func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
@@ -938,24 +384,6 @@
return nil
}
-//deleteAllFlows deletes all flows in the device table
-func (agent *Agent) deleteAllFlows(ctx context.Context) error {
- logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
-
- for flowID := range agent.flowLoader.ListIDs() {
- if flowHandle, have := agent.flowLoader.Lock(flowID); have {
- // Update the store and cache
- if err := flowHandle.Delete(ctx); err != nil {
- flowHandle.Unlock()
- logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
- continue
- }
- flowHandle.Unlock()
- }
- }
- return nil
-}
-
//disableDevice disable a device
func (agent *Agent) disableDevice(ctx context.Context) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -1058,323 +486,6 @@
return nil
}
-func (agent *Agent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
-
- cloned := agent.getDeviceWithoutLock()
- cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
- return err
- }
- // Send the request to the adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
- if err != nil {
- cancel()
- return err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "updatePmConfigs", ch, agent.onSuccess, agent.onFailure)
- return nil
-}
-
-func (agent *Agent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
-
- cloned := agent.getDeviceWithoutLock()
- cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
-
- return agent.getDeviceWithoutLock().PmConfigs, nil
-}
-
-func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
- defer agent.requestQueue.RequestComplete()
-
- logger.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
-
- device := agent.getDeviceWithoutLock()
-
- if device.AdminState != voltha.AdminState_ENABLED {
- return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
- }
- // Save the image
- clonedImg := proto.Clone(img).(*voltha.ImageDownload)
- clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
- cloned := proto.Clone(device).(*voltha.Device)
- if cloned.ImageDownloads == nil {
- cloned.ImageDownloads = []*voltha.ImageDownload{clonedImg}
- } else {
- if device.AdminState != voltha.AdminState_ENABLED {
- logger.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
- }
- // Save the image
- clonedImg := proto.Clone(img).(*voltha.ImageDownload)
- clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
- if device.ImageDownloads == nil {
- device.ImageDownloads = []*voltha.ImageDownload{clonedImg}
- } else {
- device.ImageDownloads = append(device.ImageDownloads, clonedImg)
- }
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, device.ConnectStatus, device.OperStatus); err != nil {
- return nil, err
- }
-
- // Send the request to the adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg)
- if err != nil {
- cancel()
- return nil, err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
- }
- return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-}
-
-// isImageRegistered is a helper method to figure out if an image is already registered
-func isImageRegistered(img *voltha.ImageDownload, device *voltha.Device) bool {
- for _, image := range device.ImageDownloads {
- if image.Id == img.Id && image.Name == img.Name {
- return true
- }
- }
- return false
-}
-
-func (agent *Agent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
- defer agent.requestQueue.RequestComplete()
-
- logger.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
-
- device := agent.getDeviceWithoutLock()
-
- // Verify whether the Image is in the list of image being downloaded
- if !isImageRegistered(img, device) {
- return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
- }
-
- // Update image download state
- for _, image := range device.ImageDownloads {
- if image.Id == img.Id && image.Name == img.Name {
- image.DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
- }
- }
-
- if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
- // Set the device to Enabled
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, device, voltha.AdminState_ENABLED, device.ConnectStatus, device.OperStatus); err != nil {
- return nil, err
- }
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.CancelImageDownload(subCtx, device, img)
- if err != nil {
- cancel()
- return nil, err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onSuccess, agent.onFailure)
- }
- return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-}
-
-func (agent *Agent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
-
- // Verify whether the Image is in the list of image being downloaded
- if !isImageRegistered(img, cloned) {
- return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
- }
-
- if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
- return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
- }
- // Update image download state
- for _, image := range cloned.ImageDownloads {
- if image.Id == img.Id && image.Name == img.Name {
- image.ImageState = voltha.ImageDownload_IMAGE_ACTIVATING
- }
- }
- // Set the device to downloading_image
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, cloned.ConnectStatus, cloned.OperStatus); err != nil {
- return nil, err
- }
-
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
- if err != nil {
- cancel()
- return nil, err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onSuccess, agent.onFailure)
-
- // The status of the AdminState will be changed following the update_download_status response from the adapter
- // The image name will also be removed from the device list
- return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-}
-
-func (agent *Agent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
-
- cloned := agent.getDeviceWithoutLock()
-
- // Verify whether the Image is in the list of image being downloaded
- if !isImageRegistered(img, cloned) {
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
- }
-
- if cloned.AdminState != voltha.AdminState_ENABLED {
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
- }
- // Update image download state
- for _, image := range cloned.ImageDownloads {
- if image.Id == img.Id && image.Name == img.Name {
- image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
- }
- }
-
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
- return nil, err
- }
-
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
- if err != nil {
- cancel()
- return nil, err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
-
- return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-}
-
-func (agent *Agent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
- device := agent.getDeviceWithoutLock()
- ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
- agent.requestQueue.RequestComplete()
- if err != nil {
- return nil, err
- }
- // Wait for the adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
- }
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
- // Successful response
- imgDownload := &voltha.ImageDownload{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return imgDownload, nil
-}
-
-func (agent *Agent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
-
- cloned := agent.getDeviceWithoutLock()
-
- // Update the image as well as remove it if the download was cancelled
- clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
- for _, image := range cloned.ImageDownloads {
- if image.Id == img.Id && image.Name == img.Name {
- if image.DownloadState != voltha.ImageDownload_DOWNLOAD_CANCELLED {
- clonedImages = append(clonedImages, img)
- }
- }
- }
- cloned.ImageDownloads = clonedImages
- // Set the Admin state to enabled if required
- if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
- img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
- (img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
- return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, cloned.OperStatus)
- }
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
-
- cloned := agent.getDeviceWithoutLock()
- for _, image := range cloned.ImageDownloads {
- if image.Id == img.Id && image.Name == img.Name {
- return image, nil
- }
- }
- return nil, status.Errorf(codes.NotFound, "image-not-found:%s", img.Name)
-}
-
-func (agent *Agent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return nil, err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
-
- return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
-}
-
-// getPorts retrieves the ports information of the device based on the port type.
-func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
- logger.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
- ports := &voltha.Ports{}
- if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
- for _, port := range device.Ports {
- if port.Type == portType {
- ports.Items = append(ports.Items, port)
- }
- }
- }
- return ports
-}
-
// getSwitchCapability retrieves the switch capability of a parent device
func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
logger.Debugw("getSwitchCapability", log.Fields{"device-id": agent.deviceID})
@@ -1404,33 +515,6 @@
return switchCap, nil
}
-// getPortCapability retrieves the port capability of a device
-func (agent *Agent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
- logger.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
- device, err := agent.getDevice(ctx)
- if err != nil {
- return nil, err
- }
- ch, err := agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo)
- if err != nil {
- return nil, err
- }
- // Wait for adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed")
- }
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
- // Successful response
- portCap := &ic.PortCapability{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, portCap); err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return portCap, nil
-}
-
func (agent *Agent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
// packet data is encoded in the args param as the first parameter
var packet []byte
@@ -1519,145 +603,6 @@
return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
}
-func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
- logger.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
- for _, port := range cloned.Ports {
- port.OperStatus = operStatus
- }
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Work only on latest data
- // TODO: Get list of ports from device directly instead of the entire device
- cloned := agent.getDeviceWithoutLock()
-
- // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
- if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
- return status.Errorf(codes.InvalidArgument, "%s", portType)
- }
- for _, port := range cloned.Ports {
- if port.Type == portType && port.PortNo == portNo {
- port.OperStatus = operStatus
- }
- }
- logger.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) deleteAllPorts(ctx context.Context) error {
- logger.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- cloned := agent.getDeviceWithoutLock()
-
- if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
- err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
- logger.Warnw("invalid-state-removing-ports", log.Fields{"state": cloned.AdminState, "error": err})
- return err
- }
- if len(cloned.Ports) == 0 {
- logger.Debugw("no-ports-present", log.Fields{"deviceId": agent.deviceID})
- return nil
- }
-
- cloned.Ports = []*voltha.Port{}
- logger.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
-
- cloned := agent.getDeviceWithoutLock()
- updatePort := false
- if cloned.Ports == nil {
- // First port
- logger.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceID})
- cloned.Ports = make([]*voltha.Port, 0)
- } else {
- for _, p := range cloned.Ports {
- if p.Type == port.Type && p.PortNo == port.PortNo {
- if p.Label == "" && p.Type == voltha.Port_PON_OLT {
- //Creation of OLT PON port is being processed after a default PON port was created. Just update it.
- logger.Infow("update-pon-port-created-by-default", log.Fields{"default-port": p, "port-to-add": port})
- p.Label = port.Label
- p.OperStatus = port.OperStatus
- updatePort = true
- break
- }
- logger.Debugw("port already exists", log.Fields{"port": port})
- return nil
- }
- }
- }
- if !updatePort {
- cp := proto.Clone(port).(*voltha.Port)
- // Set the admin state of the port to ENABLE
- cp.AdminState = voltha.AdminState_ENABLED
- cloned.Ports = append(cloned.Ports, cp)
- }
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
-
- cloned := agent.getDeviceWithoutLock()
-
- // Get the peer port on the device based on the peerPort no
- found := false
- for _, port := range cloned.Ports {
- if port.PortNo == peerPort.PortNo { // found peerPort
- cp := proto.Clone(peerPort).(*voltha.Port_PeerPort)
- port.Peers = append(port.Peers, cp)
- logger.Debugw("found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
- found = true
- break
- }
- }
- if !found && agent.isRootdevice {
- // An ONU PON port has been created before the corresponding creation of the OLT PON port. Create the OLT PON port
- // with default values which will be updated once the OLT PON port creation is processed.
- ponPort := &voltha.Port{
- PortNo: peerPort.PortNo,
- Type: voltha.Port_PON_OLT,
- AdminState: voltha.AdminState_ENABLED,
- DeviceId: agent.deviceID,
- Peers: []*voltha.Port_PeerPort{proto.Clone(peerPort).(*voltha.Port_PeerPort)},
- }
- cloned.Ports = append(cloned.Ports, ponPort)
- logger.Infow("adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
- }
-
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
// TODO: A generic device update by attribute
func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -1768,88 +713,6 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *Agent) disablePort(ctx context.Context, Port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
- var cp *voltha.Port
- // Get the most up to date the device info
- device := agent.getDeviceWithoutLock()
- for _, port := range device.Ports {
- if port.PortNo == Port.PortNo {
- port.AdminState = voltha.AdminState_DISABLED
- cp = proto.Clone(port).(*voltha.Port)
- break
-
- }
- }
- if cp == nil {
- return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
- }
-
- if cp.Type != voltha.Port_PON_OLT {
- return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", cp.Type)
- }
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
- logger.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
- return err
- }
-
- //send request to adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.DisablePort(ctx, device, cp)
- if err != nil {
- cancel()
- return err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
- return nil
-}
-
-func (agent *Agent) enablePort(ctx context.Context, Port *voltha.Port) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
-
- var cp *voltha.Port
- // Get the most up to date the device info
- device := agent.getDeviceWithoutLock()
- for _, port := range device.Ports {
- if port.PortNo == Port.PortNo {
- port.AdminState = voltha.AdminState_ENABLED
- cp = proto.Clone(port).(*voltha.Port)
- break
- }
- }
-
- if cp == nil {
- return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
- }
-
- if cp.Type != voltha.Port_PON_OLT {
- return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", cp.Type)
- }
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
- logger.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
- return err
- }
- //send request to adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.EnablePort(ctx, device, cp)
- if err != nil {
- cancel()
- return err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
- return nil
-}
-
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 58f9aa9..2af224c 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -17,7 +17,16 @@
package device
import (
+ "context"
+
+ "github.com/gogo/protobuf/proto"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
// listDeviceFlows returns device flows
@@ -32,3 +41,313 @@
}
return flows
}
+
+func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+
+ if (len(newFlows)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+ return coreutils.DoneResponse(), nil
+ }
+ device := agent.getDeviceWithoutLock()
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ flowIDs := agent.flowLoader.ListIDs()
+ for flowID := range flowIDs {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+ flowHandle.Unlock()
+ }
+ }
+ }
+ flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+ flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+ for _, flow := range newFlows {
+ flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+ if err != nil {
+ return coreutils.DoneResponse(), err
+ }
+
+ if created {
+ flowsToAdd = append(flowsToAdd, flow)
+ updatedAllFlows = append(updatedAllFlows, flow)
+ } else {
+ flowToReplace := flowHandle.GetReadOnly()
+ if !proto.Equal(flowToReplace, flow) {
+ //Flow needs to be updated.
+ if err := flowHandle.Update(ctx, flow); err != nil {
+ flowHandle.Unlock()
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+ }
+ flowsToDelete = append(flowsToDelete, flowToReplace)
+ flowsToAdd = append(flowsToAdd, flow)
+ updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
+ } else {
+ //No need to change the flow. It is already exist.
+ logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+ }
+ }
+
+ flowHandle.Unlock()
+ }
+
+ // Sanity check
+ if (len(flowsToAdd)) == 0 {
+ logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+ return coreutils.DoneResponse(), nil
+ }
+
+ // Send update to adapters
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: flowsToAdd},
+ ToRemove: &voltha.Flows{Items: flowsToDelete},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+ return response, nil
+}
+
+func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+
+ if (len(flowsToDel)) == 0 {
+ logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+ return coreutils.DoneResponse(), nil
+ }
+
+ device := agent.getDeviceWithoutLock()
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ flowIDs := agent.flowLoader.ListIDs()
+ for flowID := range flowIDs {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+ flowHandle.Unlock()
+ }
+ }
+ }
+ for _, flow := range flowsToDel {
+ if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+ // Update the store and cache
+ flowToDelete := flowHandle.GetReadOnly()
+ if err := flowHandle.Delete(ctx); err != nil {
+ flowHandle.Unlock()
+ return coreutils.DoneResponse(), err
+ }
+ if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
+ updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
+ }
+ flowHandle.Unlock()
+ }
+ }
+
+ // Send update to adapters
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ ToRemove: &voltha.Flows{Items: flowsToDel},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+ return response, nil
+}
+
+func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+
+ if (len(updatedFlows)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+ return coreutils.DoneResponse(), nil
+ }
+
+ device := agent.getDeviceWithoutLock()
+ if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+ }
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ flowIDs := agent.flowLoader.ListIDs()
+ for flowID := range flowIDs {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+ flowHandle.Unlock()
+ }
+ }
+ }
+ flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+ flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+
+ for _, flow := range updatedFlows {
+ if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+ flowToDelete := flowHandle.GetReadOnly()
+ // Update the store and cache
+ if err := flowHandle.Update(ctx, flow); err != nil {
+ flowHandle.Unlock()
+ return coreutils.DoneResponse(), err
+ }
+
+ flowsToDelete = append(flowsToDelete, flowToDelete)
+ flowsToAdd = append(flowsToAdd, flow)
+ updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
+ flowHandle.Unlock()
+ }
+ }
+
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ // Process bulk flow update differently than incremental update
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ logger.Debugw("updating-flows-and-groups",
+ log.Fields{
+ "device-id": agent.deviceID,
+ "flows-to-add": flowsToAdd,
+ "flows-to-delete": flowsToDelete,
+ })
+ // Sanity check
+ if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+ cancel()
+ return coreutils.DoneResponse(), nil
+ }
+
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: flowsToAdd},
+ ToRemove: &voltha.Flows{Items: flowsToDelete},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+
+ return response, nil
+}
+
+//replaceFlowInList removes the old flow from list and adds the new one.
+func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+ if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
+ flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
+ }
+ flowList = append(flowList, newFlow)
+ return flowList
+}
+
+//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice. This function will
+//panic if the index is out of range.
+func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
+ flows[index] = flows[len(flows)-1]
+ flows[len(flows)-1] = nil
+ return flows[:len(flows)-1]
+}
+
+//filterOutFlows removes flows from a device using the uni-port as filter
+func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
+ var flowsToDelete []*ofp.OfpFlowStats
+ // If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
+ for flowID := range agent.flowLoader.ListIDs() {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ flow := flowHandle.GetReadOnly()
+ if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
+ flowsToDelete = append(flowsToDelete, flow)
+ }
+ flowHandle.Unlock()
+ }
+ }
+
+ logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
+ if len(flowsToDelete) == 0 {
+ return nil
+ }
+
+ response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
+ if err != nil {
+ return err
+ }
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
+ }
+ return nil
+}
+
+//deleteAllFlows deletes all flows in the device table
+func (agent *Agent) deleteAllFlows(ctx context.Context) error {
+ logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
+
+ for flowID := range agent.flowLoader.ListIDs() {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ // Update the store and cache
+ if err := flowHandle.Delete(ctx); err != nil {
+ flowHandle.Unlock()
+ logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
+ continue
+ }
+ flowHandle.Unlock()
+ }
+ }
+ return nil
+}
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index cb9557c..41bd9ac 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -17,7 +17,17 @@
package device
import (
+ "context"
+ "strconv"
+
+ "github.com/gogo/protobuf/proto"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
// listDeviceGroups returns logical device flow groups
@@ -32,3 +42,263 @@
}
return groups
}
+
+func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
+
+ if (len(newGroups)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+ return coreutils.DoneResponse(), nil
+ }
+
+ device := agent.getDeviceWithoutLock()
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ groupIDs := agent.groupLoader.ListIDs()
+ for groupID := range groupIDs {
+ if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+ updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+ grpHandle.Unlock()
+ }
+ }
+ }
+
+ groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
+ groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
+ for _, group := range newGroups {
+
+ groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
+ if err != nil {
+ return coreutils.DoneResponse(), err
+ }
+
+ if created {
+ groupsToAdd = append(groupsToAdd, group)
+ updatedAllGroups = append(updatedAllGroups, group)
+ } else {
+ groupToChange := groupHandle.GetReadOnly()
+ if !proto.Equal(groupToChange, group) {
+ //Group needs to be updated.
+ if err := groupHandle.Update(ctx, group); err != nil {
+ groupHandle.Unlock()
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+ }
+ groupsToDelete = append(groupsToDelete, groupToChange)
+ groupsToAdd = append(groupsToAdd, group)
+ updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
+ } else {
+ //No need to change the group. It is already exist.
+ logger.Debugw("No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+ }
+ }
+
+ groupHandle.Unlock()
+ }
+ // Sanity check
+ if (len(groupsToAdd)) == 0 {
+ logger.Debugw("no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+ return coreutils.DoneResponse(), nil
+ }
+
+ // Send update to adapters
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
+ ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+ return response, nil
+}
+
+func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
+
+ if (len(groupsToDel)) == 0 {
+ logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID})
+ return coreutils.DoneResponse(), nil
+ }
+ device := agent.getDeviceWithoutLock()
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ groupIDs := agent.groupLoader.ListIDs()
+ for groupID := range groupIDs {
+ if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+ updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+ grpHandle.Unlock()
+ }
+ }
+ }
+
+ for _, group := range groupsToDel {
+ if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+ // Update the store and cache
+ if err := groupHandle.Delete(ctx); err != nil {
+ groupHandle.Unlock()
+ return coreutils.DoneResponse(), err
+ }
+ if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
+ updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
+ }
+ groupHandle.Unlock()
+ }
+ }
+
+ // Send update to adapters
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: groupsToDel},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+ return response, nil
+}
+
+func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+
+ if (len(updatedGroups)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+ return coreutils.DoneResponse(), nil
+ }
+
+ device := agent.getDeviceWithoutLock()
+ if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+ }
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ groupIDs := agent.groupLoader.ListIDs()
+ for groupID := range groupIDs {
+ if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+ updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+ grpHandle.Unlock()
+ }
+ }
+ }
+ groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
+
+ for _, group := range updatedGroups {
+ if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+ // Update the store and cache
+ if err := groupHandle.Update(ctx, group); err != nil {
+ groupHandle.Unlock()
+ return coreutils.DoneResponse(), err
+ }
+ groupsToUpdate = append(groupsToUpdate, group)
+ updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
+ groupHandle.Unlock()
+ }
+ }
+
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ // Process bulk flow update differently than incremental update
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ logger.Debugw("updating-groups",
+ log.Fields{
+ "device-id": agent.deviceID,
+ "groups-to-update": groupsToUpdate,
+ })
+
+ // Sanity check
+ if (len(groupsToUpdate)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
+ cancel()
+ return coreutils.DoneResponse(), nil
+ }
+
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+
+ return response, nil
+}
+
+//replaceGroupInList removes the old group from list and adds the new one.
+func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+ if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
+ groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
+ }
+ groupList = append(groupList, newGroup)
+ return groupList
+}
+
+//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice. This function will
+//panic if the index is out of range.
+func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
+ groups[index] = groups[len(groups)-1]
+ groups[len(groups)-1] = nil
+ return groups[:len(groups)-1]
+}
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
new file mode 100644
index 0000000..208d817
--- /dev/null
+++ b/rw_core/core/device/agent_image.go
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+
+ "github.com/gogo/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ logger.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
+
+ device := agent.getDeviceWithoutLock()
+
+ if device.AdminState != voltha.AdminState_ENABLED {
+ return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
+ }
+ // Save the image
+ clonedImg := proto.Clone(img).(*voltha.ImageDownload)
+ clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
+ cloned := proto.Clone(device).(*voltha.Device)
+ if cloned.ImageDownloads == nil {
+ cloned.ImageDownloads = []*voltha.ImageDownload{clonedImg}
+ } else {
+ if device.AdminState != voltha.AdminState_ENABLED {
+ logger.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
+ }
+ // Save the image
+ clonedImg := proto.Clone(img).(*voltha.ImageDownload)
+ clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
+ if device.ImageDownloads == nil {
+ device.ImageDownloads = []*voltha.ImageDownload{clonedImg}
+ } else {
+ device.ImageDownloads = append(device.ImageDownloads, clonedImg)
+ }
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, device.ConnectStatus, device.OperStatus); err != nil {
+ return nil, err
+ }
+
+ // Send the request to the adapter
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
+ }
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+// isImageRegistered is a helper method to figure out if an image is already registered
+func isImageRegistered(img *voltha.ImageDownload, device *voltha.Device) bool {
+ for _, image := range device.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ return true
+ }
+ }
+ return false
+}
+
+func (agent *Agent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ logger.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
+
+ device := agent.getDeviceWithoutLock()
+
+ // Verify whether the Image is in the list of image being downloaded
+ if !isImageRegistered(img, device) {
+ return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
+ }
+
+ // Update image download state
+ for _, image := range device.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ image.DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
+ }
+ }
+
+ if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+ // Set the device to Enabled
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, device, voltha.AdminState_ENABLED, device.ConnectStatus, device.OperStatus); err != nil {
+ return nil, err
+ }
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.CancelImageDownload(subCtx, device, img)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onSuccess, agent.onFailure)
+ }
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+func (agent *Agent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
+ cloned := agent.getDeviceWithoutLock()
+
+ // Verify whether the Image is in the list of image being downloaded
+ if !isImageRegistered(img, cloned) {
+ return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
+ }
+
+ if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+ return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+ }
+ // Update image download state
+ for _, image := range cloned.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ image.ImageState = voltha.ImageDownload_IMAGE_ACTIVATING
+ }
+ }
+ // Set the device to downloading_image
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ return nil, err
+ }
+
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
+ // The status of the AdminState will be changed following the update_download_status response from the adapter
+ // The image name will also be removed from the device list
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+func (agent *Agent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
+
+ cloned := agent.getDeviceWithoutLock()
+
+ // Verify whether the Image is in the list of image being downloaded
+ if !isImageRegistered(img, cloned) {
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+ }
+
+ if cloned.AdminState != voltha.AdminState_ENABLED {
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
+ }
+ // Update image download state
+ for _, image := range cloned.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
+ }
+ }
+
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ return nil, err
+ }
+
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+func (agent *Agent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
+
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ device := agent.getDeviceWithoutLock()
+ ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
+ agent.requestQueue.RequestComplete()
+ if err != nil {
+ return nil, err
+ }
+ // Wait for the adapter response
+ rpcResponse, ok := <-ch
+ if !ok {
+ return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+ }
+ if rpcResponse.Err != nil {
+ return nil, rpcResponse.Err
+ }
+ // Successful response
+ imgDownload := &voltha.ImageDownload{}
+ if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return imgDownload, nil
+}
+
+func (agent *Agent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
+
+ cloned := agent.getDeviceWithoutLock()
+
+ // Update the image as well as remove it if the download was cancelled
+ clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
+ for _, image := range cloned.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ if image.DownloadState != voltha.ImageDownload_DOWNLOAD_CANCELLED {
+ clonedImages = append(clonedImages, img)
+ }
+ }
+ }
+ cloned.ImageDownloads = clonedImages
+ // Set the Admin state to enabled if required
+ if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
+ img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
+ (img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
+ return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, cloned.OperStatus)
+ }
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
+
+ cloned := agent.getDeviceWithoutLock()
+ for _, image := range cloned.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ return image, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "image-not-found:%s", img.Name)
+}
+
+func (agent *Agent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
+
+ return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
+}
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
new file mode 100644
index 0000000..8432b9b
--- /dev/null
+++ b/rw_core/core/device/agent_pm_config.go
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+
+ "github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+func (agent *Agent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
+
+ cloned := agent.getDeviceWithoutLock()
+ cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+ // Store the device
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ return err
+ }
+ // Send the request to the adapter
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
+ if err != nil {
+ cancel()
+ return err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "updatePmConfigs", ch, agent.onSuccess, agent.onFailure)
+ return nil
+}
+
+func (agent *Agent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
+
+ cloned := agent.getDeviceWithoutLock()
+ cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return nil, err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
+
+ return agent.getDeviceWithoutLock().PmConfigs, nil
+}
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
new file mode 100644
index 0000000..2d7d210
--- /dev/null
+++ b/rw_core/core/device/agent_port.go
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/gogo/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// getPorts retrieves the ports information of the device based on the port type.
+func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+ logger.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
+ ports := &voltha.Ports{}
+ if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
+ for _, port := range device.Ports {
+ if port.Type == portType {
+ ports.Items = append(ports.Items, port)
+ }
+ }
+ }
+ return ports
+}
+
+// getPortCapability retrieves the port capability of a device
+func (agent *Agent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
+ logger.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
+ device, err := agent.getDevice(ctx)
+ if err != nil {
+ return nil, err
+ }
+ ch, err := agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo)
+ if err != nil {
+ return nil, err
+ }
+ // Wait for adapter response
+ rpcResponse, ok := <-ch
+ if !ok {
+ return nil, status.Errorf(codes.Aborted, "channel-closed")
+ }
+ if rpcResponse.Err != nil {
+ return nil, rpcResponse.Err
+ }
+ // Successful response
+ portCap := &ic.PortCapability{}
+ if err := ptypes.UnmarshalAny(rpcResponse.Reply, portCap); err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return portCap, nil
+}
+func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
+ logger.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ cloned := agent.getDeviceWithoutLock()
+ for _, port := range cloned.Ports {
+ port.OperStatus = operStatus
+ }
+ // Store the device
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ // Work only on latest data
+ // TODO: Get list of ports from device directly instead of the entire device
+ cloned := agent.getDeviceWithoutLock()
+
+ // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+ if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
+ return status.Errorf(codes.InvalidArgument, "%s", portType)
+ }
+ for _, port := range cloned.Ports {
+ if port.Type == portType && port.PortNo == portNo {
+ port.OperStatus = operStatus
+ }
+ }
+ logger.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+ // Store the device
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) deleteAllPorts(ctx context.Context) error {
+ logger.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ cloned := agent.getDeviceWithoutLock()
+
+ if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
+ err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
+ logger.Warnw("invalid-state-removing-ports", log.Fields{"state": cloned.AdminState, "error": err})
+ return err
+ }
+ if len(cloned.Ports) == 0 {
+ logger.Debugw("no-ports-present", log.Fields{"deviceId": agent.deviceID})
+ return nil
+ }
+
+ cloned.Ports = []*voltha.Port{}
+ logger.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+ // Store the device
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
+
+ cloned := agent.getDeviceWithoutLock()
+ updatePort := false
+ if cloned.Ports == nil {
+ // First port
+ logger.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceID})
+ cloned.Ports = make([]*voltha.Port, 0)
+ } else {
+ for _, p := range cloned.Ports {
+ if p.Type == port.Type && p.PortNo == port.PortNo {
+ if p.Label == "" && p.Type == voltha.Port_PON_OLT {
+ //Creation of OLT PON port is being processed after a default PON port was created. Just update it.
+ logger.Infow("update-pon-port-created-by-default", log.Fields{"default-port": p, "port-to-add": port})
+ p.Label = port.Label
+ p.OperStatus = port.OperStatus
+ updatePort = true
+ break
+ }
+ logger.Debugw("port already exists", log.Fields{"port": port})
+ return nil
+ }
+ }
+ }
+ if !updatePort {
+ cp := proto.Clone(port).(*voltha.Port)
+ // Set the admin state of the port to ENABLE
+ cp.AdminState = voltha.AdminState_ENABLED
+ cloned.Ports = append(cloned.Ports, cp)
+ }
+ // Store the device
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
+
+ cloned := agent.getDeviceWithoutLock()
+
+ // Get the peer port on the device based on the peerPort no
+ found := false
+ for _, port := range cloned.Ports {
+ if port.PortNo == peerPort.PortNo { // found peerPort
+ cp := proto.Clone(peerPort).(*voltha.Port_PeerPort)
+ port.Peers = append(port.Peers, cp)
+ logger.Debugw("found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
+ found = true
+ break
+ }
+ }
+ if !found && agent.isRootdevice {
+ // An ONU PON port has been created before the corresponding creation of the OLT PON port. Create the OLT PON port
+ // with default values which will be updated once the OLT PON port creation is processed.
+ ponPort := &voltha.Port{
+ PortNo: peerPort.PortNo,
+ Type: voltha.Port_PON_OLT,
+ AdminState: voltha.AdminState_ENABLED,
+ DeviceId: agent.deviceID,
+ Peers: []*voltha.Port_PeerPort{proto.Clone(peerPort).(*voltha.Port_PeerPort)},
+ }
+ cloned.Ports = append(cloned.Ports, ponPort)
+ logger.Infow("adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
+ }
+
+ // Store the device
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) disablePort(ctx context.Context, Port *voltha.Port) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+ var cp *voltha.Port
+ // Get the most up to date the device info
+ device := agent.getDeviceWithoutLock()
+ for _, port := range device.Ports {
+ if port.PortNo == Port.PortNo {
+ port.AdminState = voltha.AdminState_DISABLED
+ cp = proto.Clone(port).(*voltha.Port)
+ break
+
+ }
+ }
+ if cp == nil {
+ return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+ }
+
+ if cp.Type != voltha.Port_PON_OLT {
+ return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", cp.Type)
+ }
+ // Store the device
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+ logger.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ return err
+ }
+
+ //send request to adapter
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.DisablePort(ctx, device, cp)
+ if err != nil {
+ cancel()
+ return err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
+ return nil
+}
+
+func (agent *Agent) enablePort(ctx context.Context, Port *voltha.Port) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ logger.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+
+ var cp *voltha.Port
+ // Get the most up to date the device info
+ device := agent.getDeviceWithoutLock()
+ for _, port := range device.Ports {
+ if port.PortNo == Port.PortNo {
+ port.AdminState = voltha.AdminState_ENABLED
+ cp = proto.Clone(port).(*voltha.Port)
+ break
+ }
+ }
+
+ if cp == nil {
+ return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+ }
+
+ if cp.Type != voltha.Port_PON_OLT {
+ return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", cp.Type)
+ }
+ // Store the device
+ if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+ logger.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+ return err
+ }
+ //send request to adapter
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.EnablePort(ctx, device, cp)
+ if err != nil {
+ cancel()
+ return err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
+ return nil
+}