blob: e11aee8f079229eeb8db211010961b3d56807a04 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package device
import (
"context"
"strconv"
"github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// listDeviceGroups returns logical device flow groups
func (agent *Agent) listDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
groupIDs := agent.groupLoader.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
if groupHandle, have := agent.groupLoader.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
}
}
return groups
}
func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
if (len(newGroups)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
return coreutils.DoneResponse(), nil
}
device := agent.getDeviceWithoutLock()
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
groupIDs := agent.groupLoader.ListIDs()
for groupID := range groupIDs {
if grpHandle, have := agent.groupLoader.Lock(groupID); have {
updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
grpHandle.Unlock()
}
}
}
groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
for _, group := range newGroups {
groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
if err != nil {
return coreutils.DoneResponse(), err
}
if created {
groupsToAdd = append(groupsToAdd, group)
updatedAllGroups = append(updatedAllGroups, group)
} else {
groupToChange := groupHandle.GetReadOnly()
if !proto.Equal(groupToChange, group) {
//Group needs to be updated.
if err := groupHandle.Update(ctx, group); err != nil {
groupHandle.Unlock()
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
}
groupsToDelete = append(groupsToDelete, groupToChange)
groupsToAdd = append(groupsToAdd, group)
updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
} else {
//No need to change the group. It is already exist.
logger.Debugw(ctx, "No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
}
}
groupHandle.Unlock()
}
// Sanity check
if (len(groupsToAdd)) == 0 {
logger.Debugw(ctx, "no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
return coreutils.DoneResponse(), nil
}
// Send update to adapters
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
}
groupChanges := &ofp.FlowGroupChanges{
ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
return response, nil
}
func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
if (len(groupsToDel)) == 0 {
logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
return coreutils.DoneResponse(), nil
}
device := agent.getDeviceWithoutLock()
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
groupIDs := agent.groupLoader.ListIDs()
for groupID := range groupIDs {
if grpHandle, have := agent.groupLoader.Lock(groupID); have {
updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
grpHandle.Unlock()
}
}
}
for _, group := range groupsToDel {
if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
// Update the store and cache
if err := groupHandle.Delete(ctx); err != nil {
groupHandle.Unlock()
return coreutils.DoneResponse(), err
}
if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
}
groupHandle.Unlock()
}
}
// Send update to adapters
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
}
groupChanges := &ofp.FlowGroupChanges{
ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
ToRemove: &voltha.FlowGroups{Items: groupsToDel},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
return response, nil
}
func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
logger.Debugw(ctx, "updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
if (len(updatedGroups)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
return coreutils.DoneResponse(), nil
}
device := agent.getDeviceWithoutLock()
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
if !dType.AcceptsAddRemoveFlowUpdates {
groupIDs := agent.groupLoader.ListIDs()
for groupID := range groupIDs {
if grpHandle, have := agent.groupLoader.Lock(groupID); have {
updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
grpHandle.Unlock()
}
}
}
groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
for _, group := range updatedGroups {
if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
// Update the store and cache
if err := groupHandle.Update(ctx, group); err != nil {
groupHandle.Unlock()
return coreutils.DoneResponse(), err
}
groupsToUpdate = append(groupsToUpdate, group)
updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
groupHandle.Unlock()
}
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
logger.Debugw(ctx, "updating-groups",
log.Fields{
"device-id": agent.deviceID,
"groups-to-update": groupsToUpdate,
})
// Sanity check
if (len(groupsToUpdate)) == 0 {
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
cancel()
return coreutils.DoneResponse(), nil
}
flowChanges := &ofp.FlowChanges{
ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
}
groupChanges := &ofp.FlowGroupChanges{
ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
}
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
return response, nil
}
//replaceGroupInList removes the old group from list and adds the new one.
func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
}
groupList = append(groupList, newGroup)
return groupList
}
//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice. This function will
//panic if the index is out of range.
func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
groups[index] = groups[len(groups)-1]
groups[len(groups)-1] = nil
return groups[:len(groups)-1]
}