blob: 73caa071115e5b5ecd8bbf7282a4e2f54465239f [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package device
import (
"context"
"fmt"
"strconv"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// updateGroupTable applies an OpenFlow group-mod request to the group table of
// this logical device by dispatching on the request's command:
// OFPGC_ADD -> groupAdd, OFPGC_DELETE -> groupDelete, OFPGC_MODIFY -> groupModify.
//
// A nil groupMod is ignored and nil is returned. Any other command yields a
// gRPC status error with code Internal.
func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
	logger.Debug("updateGroupTable")
	if groupMod == nil {
		return nil
	}

	switch cmd := groupMod.GetCommand(); cmd {
	case ofp.OfpGroupModCommand_OFPGC_ADD:
		return agent.groupAdd(ctx, groupMod)
	case ofp.OfpGroupModCommand_OFPGC_DELETE:
		return agent.groupDelete(ctx, groupMod)
	case ofp.OfpGroupModCommand_OFPGC_MODIFY:
		return agent.groupModify(ctx, groupMod)
	default:
		// Command is not one of the three handled above.
		return status.Errorf(codes.Internal, "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, cmd)
	}
}
// groupAdd registers a new group on this logical device.
//
// The group is first placed in the in-memory agent.groups map (guarded by
// agent.groupLock), then written to the KV store under
// "groups/<logicalDeviceID>" keyed by the decimal group ID. If the store write
// fails the map entry is removed again and the store error is returned.
// Finally the group is sent to the root device; the outcome of that update is
// only logged by a background goroutine and does not affect the return value.
//
// A nil groupMod is ignored. An error is returned when a group with the same
// ID is already present in the map.
func (agent *LogicalAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
	if groupMod == nil {
		return nil
	}
	id := groupMod.GroupId
	logger.Debugw("groupAdd", log.Fields{"GroupId": id})

	agent.groupLock.Lock()
	if _, exists := agent.groups[id]; exists {
		agent.groupLock.Unlock()
		return fmt.Errorf("Group %d already exists", id)
	}
	entry := fu.GroupEntryFromGroupMod(groupMod)
	chunk := &GroupChunk{group: entry}
	// Publish the chunk in the map and take its own lock before the map lock
	// is released, so the chunk is already locked once it becomes reachable.
	agent.groups[id] = chunk
	chunk.lock.Lock()
	defer chunk.lock.Unlock()
	agent.groupLock.Unlock()

	// Persist the group in the KV store; undo the map insertion on failure.
	storePath := fmt.Sprintf("groups/%s", agent.logicalDeviceID)
	storeKey := strconv.Itoa(int(id))
	if err := agent.clusterDataProxy.AddWithID(ctx, storePath, storeKey, entry); err != nil {
		logger.Errorw("failed-adding-group", log.Fields{"deviceID": agent.logicalDeviceID, "groupID": storeKey, "err": err})
		agent.groupLock.Lock()
		delete(agent.groups, id)
		agent.groupLock.Unlock()
		return err
	}

	// Build device rules that carry only this group, addressed to the root device.
	rules := fu.NewDeviceRules()
	rules.CreateEntryIfNotExist(agent.rootDeviceID)
	groupOnly := fu.NewFlowsAndGroups()
	groupOnly.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
	rules.AddFlowsAndGroup(agent.rootDeviceID, groupOnly)
	logger.Debugw("rules", log.Fields{"rules for group-add": rules.String()})

	// Push the rules to the devices and collect the responses in the background.
	respChnls := agent.addFlowsAndGroupsToDevices(rules, &voltha.FlowMetadata{})
	go func() {
		res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...)
		if res == nil {
			return
		}
		logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
		//TODO: Revert flow changes
	}()
	return nil
}
// groupDelete removes a single group, or every group when the group ID in
// groupMod equals OFPG_ALL, along with the flows that deleteFlowsOfGroup
// reports for each removed group. The affected flows and groups are then
// decomposed into device rules and sent to the devices.
//
// A nil groupMod and an unknown (non-wildcard) group ID are treated as no-ops
// and return nil. Errors from the KV store, from deleteFlowsOfGroup and from
// rule decomposition are returned. Failures of the device update itself are
// only logged by a background goroutine.
func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
	logger.Debug("groupDelete")
	if groupMod == nil {
		return nil
	}
	affectedFlows := make([]*ofp.OfpFlowStats, 0)
	affectedGroups := make([]*ofp.OfpGroupEntry, 0)
	groupID := groupMod.GroupId

	if groupID == uint32(ofp.OfpGroup_OFPG_ALL) {
		// Wildcard delete: hold the map lock once for the whole sweep. The
		// closure scopes the deferred unlock to the sweep only.
		if err := func() error {
			agent.groupLock.Lock()
			defer agent.groupLock.Unlock()
			for key, groupChunk := range agent.groups {
				//Remove from store and cache. Do this in a one time lock allocation.
				path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, key)
				if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
					return fmt.Errorf("couldnt-deleted-group-from-store-%s: %v", path, err)
				}
				// Delete the entry being iterated (key), not the OFPG_ALL
				// wildcard held in groupID, which is never a map key.
				// Deleting during range is permitted by the Go spec.
				delete(agent.groups, key)
				flows, err := agent.deleteFlowsOfGroup(ctx, key)
				if err != nil {
					logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": key})
					return err
				}
				affectedFlows = append(affectedFlows, flows...)
				affectedGroups = append(affectedGroups, groupChunk.group)
			}
			return nil
		}(); err != nil {
			return err
		}
	} else {
		agent.groupLock.RLock()
		groupChunk, ok := agent.groups[groupID]
		agent.groupLock.RUnlock()
		if !ok {
			logger.Warnw("group-not-found", log.Fields{"groupID": groupID})
			return nil
		}
		// The per-group lock is held until this function returns, i.e. also
		// across the rule decomposition and the device update dispatch below.
		groupChunk.lock.Lock()
		defer groupChunk.lock.Unlock()
		flows, err := agent.deleteFlowsOfGroup(ctx, groupID)
		if err != nil {
			logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
			return err
		}
		//remove from store
		if err := agent.removeLogicalDeviceFlowGroup(ctx, groupID); err != nil {
			return err
		}
		affectedFlows = append(affectedFlows, flows...)
		affectedGroups = append(affectedGroups, groupChunk.group)
	}

	// Every path that reaches this point has succeeded, so the device update
	// always runs (including a wildcard delete over an empty group table).
	deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: affectedFlows}, ofp.FlowGroups{Items: affectedGroups})
	if err != nil {
		return err
	}
	logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})

	// Update the devices
	respChnls := agent.updateFlowsAndGroupsOfDevice(deviceRules, nil)

	// Wait for completion
	go func() {
		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
			logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
			//TODO: Revert flow changes
		}
	}()
	return nil
}
// groupModify replaces the definition of an existing group with the one
// described by groupMod.
//
// The group must already be present in agent.groups; otherwise a
// "group-absent" error is returned. While holding the group's own lock, the
// new entry is written via updateLogicalDeviceFlowGroup and then sent to the
// root device. The outcome of the device update is only logged by a
// background goroutine and does not affect the return value.
//
// A nil groupMod is ignored and nil is returned.
func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
	logger.Debug("groupModify")
	if groupMod == nil {
		return nil
	}
	id := groupMod.GroupId

	agent.groupLock.RLock()
	chunk, found := agent.groups[id]
	agent.groupLock.RUnlock()
	if !found {
		return fmt.Errorf("group-absent:%d", id)
	}

	// Serialize all modifications of this particular group until we return.
	chunk.lock.Lock()
	defer chunk.lock.Unlock()

	// New definition that replaces the stored group entry.
	newEntry := fu.GroupEntryFromGroupMod(groupMod)

	// Device rules carrying only this group, addressed to the root device.
	rules := fu.NewDeviceRules()
	rules.CreateEntryIfNotExist(agent.rootDeviceID)
	groupOnly := fu.NewFlowsAndGroups()
	groupOnly.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
	rules.AddFlowsAndGroup(agent.rootDeviceID, groupOnly)
	logger.Debugw("rules", log.Fields{"rules-for-group-modify": rules.String()})

	// Persist the new entry before touching the devices.
	if err := agent.updateLogicalDeviceFlowGroup(ctx, newEntry, chunk); err != nil {
		logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
		return err
	}

	// Push the rules to the devices and collect the responses in the background.
	respChnls := agent.updateFlowsAndGroupsOfDevice(rules, &voltha.FlowMetadata{})
	go func() {
		res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...)
		if res == nil {
			return
		}
		logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
		//TODO: Revert flow changes
	}()
	return nil
}