blob: 780eec3183d460a1563f803e8cb43cf817fac576 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package device
import (
"context"
"fmt"
"time"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// listLogicalDeviceGroups returns logical device flow groups
func (agent *LogicalAgent) listLogicalDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
groupIDs := agent.groupCache.ListIDs()
groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
for groupID := range groupIDs {
if groupHandle, have := agent.groupCache.Lock(groupID); have {
groups[groupID] = groupHandle.GetReadOnly()
groupHandle.Unlock()
}
}
return groups
}
//updateGroupTable updates the group table of that logical device
func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
logger.Debug(ctx, "update-group-table")
if groupMod == nil {
return nil
}
switch groupMod.GetCommand() {
case ofp.OfpGroupModCommand_OFPGC_ADD:
return agent.groupAdd(ctx, groupMod)
case ofp.OfpGroupModCommand_OFPGC_DELETE:
return agent.groupDelete(ctx, groupMod)
case ofp.OfpGroupModCommand_OFPGC_MODIFY:
return agent.groupModify(ctx, groupMod)
}
return status.Errorf(codes.Internal, "unhandled-command: logical-device-id:%s, command:%s", agent.logicalDeviceID, groupMod.GetCommand())
}
func (agent *LogicalAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
if groupMod == nil {
return nil
}
logger.Debugw(ctx, "group-add", log.Fields{"group-id": groupMod.GroupId})
groupEntry := fu.GroupEntryFromGroupMod(groupMod)
groupHandle, created, err := agent.groupCache.LockOrCreate(ctx, groupEntry)
if err != nil {
return err
}
groupHandle.Unlock()
if !created {
return fmt.Errorf("group %d already exists", groupMod.GroupId)
}
fg := fu.NewFlowsAndGroups()
fg.AddGroup(groupEntry)
deviceRules := fu.NewDeviceRules()
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
logger.Debugw(ctx, "rules", log.Fields{"rules-for-group-add": deviceRules.String()})
// Update the devices
respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
// Wait for completion
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
context := make(map[string]string)
context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
context["logical-device-id"] = agent.logicalDeviceID
context["group-id"] = fmt.Sprintf("%v", groupMod.GroupId)
if deviceRules != nil {
context["device-rules"] = deviceRules.String()
}
agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
//TODO: Revert flow changes
}
}()
return nil
}
func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
logger.Debugw(ctx, "group-delete", log.Fields{"group-mod": groupMod})
if groupMod == nil {
return nil
}
affectedFlows := make(map[uint64]*ofp.OfpFlowStats)
affectedGroups := make(map[uint32]*ofp.OfpGroupEntry)
toDelete := map[uint32]struct{}{groupMod.GroupId: {}}
if groupMod.GroupId == uint32(ofp.OfpGroup_OFPG_ALL) {
toDelete = agent.groupCache.ListIDs()
}
for groupID := range toDelete {
if groupHandle, have := agent.groupCache.Lock(groupID); have {
affectedGroups[groupID] = groupHandle.GetReadOnly()
if err := groupHandle.Delete(ctx); err != nil {
return err
}
groupHandle.Unlock()
//TODO: this is another case where ordering guarantees are not being made,
// group deletion does not guarantee deletion of corresponding flows.
// an error while deleting flows can cause inconsistent state.
flows, err := agent.deleteFlowsHavingGroup(ctx, groupID)
if err != nil {
logger.Errorw(ctx, "cannot-update-flow-for-group-delete", log.Fields{"logical-device-id": agent.logicalDeviceID, "groupID": groupID})
return err
}
for flowID, flow := range flows {
affectedFlows[flowID] = flow
}
}
}
if len(affectedGroups) == 0 {
logger.Debugw(ctx, "no-group-to-delete", log.Fields{"group-id": groupMod.GroupId})
return nil
}
var deviceRules *fu.DeviceRules
var err error
if len(affectedFlows) != 0 {
deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, affectedFlows, affectedGroups)
if err != nil {
return err
}
} else {
//no flow is affected, just remove the groups
deviceRules = fu.NewDeviceRules()
deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
}
//add groups to deviceRules
for _, groupEntry := range affectedGroups {
fg := fu.NewFlowsAndGroups()
fg.AddGroup(groupEntry)
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
}
logger.Debugw(ctx, "rules", log.Fields{"rules": deviceRules.String()})
// delete groups and related flows, if any
respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &voltha.FlowMetadata{}, &ofp.OfpFlowMod{})
// Wait for completion
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
context := make(map[string]string)
context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
context["group-id"] = fmt.Sprintf("%v", groupMod.GroupId)
context["logical-device-id"] = agent.logicalDeviceID
if deviceRules != nil {
context["device-rules"] = deviceRules.String()
}
agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
//TODO: Revert flow changes
}
}()
return nil
}
func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
logger.Debug(ctx, "group-modify")
if groupMod == nil {
return nil
}
groupID := groupMod.GroupId
groupHandle, have := agent.groupCache.Lock(groupID)
if !have {
return fmt.Errorf("group-absent:%d", groupID)
}
defer groupHandle.Unlock()
//replace existing group entry with new group definition
groupEntry := fu.GroupEntryFromGroupMod(groupMod)
deviceRules := fu.NewDeviceRules()
deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
fg := fu.NewFlowsAndGroups()
fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
logger.Debugw(ctx, "rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
//update KV
if err := groupHandle.Update(ctx, groupEntry); err != nil {
logger.Errorw(ctx, "cannot-update-logical-group", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
}
// Update the devices
respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
// Wait for completion
go func() {
if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw(ctx, "failure-updating-device-flows-groups", log.Fields{"logical-device-id": agent.logicalDeviceID, "errors": res})
context := make(map[string]string)
context["rpc"] = coreutils.GetRPCMetadataFromContext(ctx)
context["group-id"] = fmt.Sprintf("%v", groupMod.GroupId)
context["logical-device-id"] = agent.logicalDeviceID
if deviceRules != nil {
context["device-rules"] = deviceRules.String()
}
agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
//TODO: Revert flow changes
}
}()
return nil
}