blob: ff51041163ccde77ea1920b582b4aae89c0372de [file] [log] [blame]
/*
* Copyright 2020-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package core provides utilities for OLT devices, flows, groups and statistics.
package core
import (
"context"
"sync"
"github.com/opencord/voltha-lib-go/v7/pkg/flows"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
openoltpb2 "github.com/opencord/voltha-protos/v5/go/openolt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// QueueInfoBrief holds the GEM port ID and the service priority of the
// multicast queue associated with a multicast group.
type QueueInfoBrief struct {
	// gemPortID is copied into GroupMember.GemPortId when group members are built.
	gemPortID uint32
	// servicePriority is copied into GroupMember.Priority when group members are built.
	servicePriority uint32
}
// OpenOltGroupMgr manages multicast groups of an OLT device: it pushes group
// operations to the device and keeps the group state in the KV store.
type OpenOltGroupMgr struct {
	// deviceHandler provides the device client used to issue group RPCs.
	deviceHandler *DeviceHandler
	// resourceMgr is used to persist and fetch flow groups and the stored
	// multicast queue map.
	resourceMgr *rsrcMgr.OpenOltResourceMgr
	// interfaceToMcastQueueMap maps a PON interface to its multicast queue.
	// Required to assign a GEM port to a bucket during group population.
	interfaceToMcastQueueMap map[uint32]*QueueInfoBrief
	// interfaceToMcastQueueMapLock is taken by the accessors of interfaceToMcastQueueMap.
	interfaceToMcastQueueMapLock sync.RWMutex
}
//////////////////////////////////////////////
// EXPORTED FUNCTIONS //
//////////////////////////////////////////////
// NewGroupManager returns an OpenOltGroupMgr bound to the given device handler
// and resource manager, with an empty PON-interface-to-multicast-queue map.
// The embedded RWMutex is left at its zero value, which is ready for use.
func NewGroupManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltGroupMgr {
	logger.Infow(ctx, "initializing-group-manager", log.Fields{"device-id": dh.device.Id})
	mgr := &OpenOltGroupMgr{
		deviceHandler:            dh,
		resourceMgr:              rMgr,
		interfaceToMcastQueueMap: make(map[uint32]*QueueInfoBrief),
	}
	logger.Info(ctx, "initialization-of-group-manager-success")
	return mgr
}
// AddGroup creates (or resets) the group on the device and records it in the
// KV store.
//
// The group is sent with the SET_MEMBERS command and no members; members are
// attached afterwards through ModifyGroup. Returns an invalid-value error when
// group or group.Desc is nil, an adapter error when the device operation
// fails, and a persistence error when the KV store write fails.
func (g *OpenOltGroupMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
	logger.Infow(ctx, "add-group", log.Fields{"group": group})
	// group.Desc is dereferenced below, so a missing descriptor must be
	// rejected up front instead of panicking.
	if group == nil || group.Desc == nil {
		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
	}
	groupToOlt := openoltpb2.Group{
		GroupId: group.Desc.GroupId,
		Command: openoltpb2.Group_SET_MEMBERS,
		Action:  g.buildGroupAction(),
	}
	logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
	if _, err := g.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt); err != nil {
		return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
	}
	// Group members are not created yet, so store the group as-is.
	// NOTE(review): the meaning of the trailing `true` flag is defined by the
	// resource manager — confirm against AddFlowGroupToKVStore.
	if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
		return olterrors.NewErrPersistence("add", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err)
	}
	logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
	return nil
}
// DeleteGroup removes the group from the device and then from the KV store.
//
// Returns an invalid-value error when group or group.Desc is nil, an adapter
// error when the device rejects the deletion, and a persistence error when the
// KV store removal fails.
func (g *OpenOltGroupMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
	logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
	if group == nil {
		logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
	}
	// group.Desc is dereferenced below, so a missing descriptor must be
	// rejected up front instead of panicking.
	if group.Desc == nil {
		logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-desc-is-nil")
		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
	}
	groupToOlt := openoltpb2.Group{
		GroupId: group.Desc.GroupId,
	}
	logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
	if _, err := g.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt); err != nil {
		logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
		return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
	}
	// Remove the group from the store only after the device accepted the deletion.
	if err := g.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
		return olterrors.NewErrPersistence("delete", "flow-group", uint64(group.Desc.GroupId), log.Fields{"group": group}, err)
	}
	logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
	return nil
}
// ModifyGroup reconciles the membership of a group on the device with the
// buckets of the supplied group entry.
//
// The members derived from the group recorded in the KV store are compared
// with the members derived from group.Desc.Buckets. Missing members are sent
// with ADD_MEMBERS and stale ones with REMOVE_MEMBERS; both operations are
// attempted even if the first one fails. The new group is persisted only when
// both succeed.
//
// Returns an invalid-value error when group or group.Desc is nil, a not-found
// error when the KV store lookup fails, the add error (otherwise the remove
// error) when a device operation fails, and a persistence error when the final
// store write fails.
func (g *OpenOltGroupMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
	logger.Infow(ctx, "modify-group", log.Fields{"group": group})
	if group == nil || group.Desc == nil {
		return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
	}
	groupID := group.Desc.GroupId
	desired := g.buildGroup(ctx, groupID, group.Desc.Buckets)

	// Fetch the members the group is currently known to have.
	stored, found, err := g.getFlowGroupFromKVStore(ctx, groupID, false)
	if err != nil {
		return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": groupID}, err)
	}

	// onDevice represents the group as it exists on the device.
	var onDevice *openoltpb2.Group
	if !found {
		onDevice = g.buildGroup(ctx, groupID, nil)
	} else {
		onDevice = g.buildGroup(ctx, groupID, stored.Desc.GetBuckets())
		logger.Debugw(ctx, "modify-group--group exists",
			log.Fields{
				"group on the device": stored,
				"new":                 group})
	}
	logger.Debugw(ctx, "modify-group--comparing-current-and-new",
		log.Fields{
			"group on the device": onDevice,
			"new":                 desired})

	toAdd := g.findDiff(onDevice, desired)
	toRemove := g.findDiff(desired, onDevice)
	logger.Infow(ctx, "modify-group--differences found", log.Fields{
		"membersToBeAdded":   toAdd,
		"membersToBeRemoved": toRemove,
		"groupId":            groupID})

	// A single request object is reused for both operations; only the command
	// and the member list change between them.
	request := openoltpb2.Group{
		GroupId: groupID,
	}
	var addErr, removeErr error
	if len(toAdd) > 0 {
		request.Command = openoltpb2.Group_ADD_MEMBERS
		request.Members = toAdd
		addErr = g.callGroupAddRemove(ctx, &request)
	}
	if len(toRemove) > 0 {
		request.Command = openoltpb2.Group_REMOVE_MEMBERS
		request.Members = toRemove
		removeErr = g.callGroupAddRemove(ctx, &request)
	}

	if addErr != nil || removeErr != nil {
		logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
			log.Fields{"group": group})
		if addErr != nil {
			return addErr
		}
		return removeErr
	}

	// Both operations succeeded (or were not needed): save the modified group.
	if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
		return olterrors.NewErrPersistence("add", "flow-group", uint64(groupID), log.Fields{"group": group}, err)
	}
	logger.Infow(ctx, "modify-group-was-success--storing-group",
		log.Fields{
			"group":         group,
			"existingGroup": onDevice})
	return nil
}
// LoadInterfaceToMulticastQueueMap reads the multicast queues per interface
// from the KV store and puts them into interfaceToMcastQueueMap.
//
// Each stored entry is expected to carry the GEM port ID at index 0 and the
// service priority at index 1; entries with fewer than two values are skipped
// with a warning instead of causing an index-out-of-range panic. The map is
// updated under the write lock, like every other accessor of the map. A
// failure to read the store is logged and leaves the map unchanged.
func (g *OpenOltGroupMgr) LoadInterfaceToMulticastQueueMap(ctx context.Context) {
	storedMulticastQueueMap, err := g.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
	if err != nil {
		logger.Errorw(ctx, "failed-to-get-pon-interface-to-multicast-queue-map", log.Fields{"err": err})
		return
	}

	g.interfaceToMcastQueueMapLock.Lock()
	defer g.interfaceToMcastQueueMapLock.Unlock()
	for intf, queueInfo := range storedMulticastQueueMap {
		if len(queueInfo) < 2 {
			logger.Warnw(ctx, "skipping-malformed-multicast-queue-entry",
				log.Fields{"intf-id": intf, "queue-info": queueInfo})
			continue
		}
		g.interfaceToMcastQueueMap[intf] = &QueueInfoBrief{
			gemPortID:       queueInfo[0],
			servicePriority: queueInfo[1],
		}
	}
}
// GetInterfaceToMcastQueueMap looks up the multicast queue mapped to the given
// PON interface. The second result reports whether an entry exists. The lookup
// is performed under the read lock.
func (g *OpenOltGroupMgr) GetInterfaceToMcastQueueMap(intfID uint32) (*QueueInfoBrief, bool) {
	g.interfaceToMcastQueueMapLock.RLock()
	queue, ok := g.interfaceToMcastQueueMap[intfID]
	g.interfaceToMcastQueueMapLock.RUnlock()
	return queue, ok
}
// UpdateInterfaceToMcastQueueMap stores val as the multicast queue information
// for the PON interface intfID, replacing any previous entry. The write is
// performed under the write lock.
func (g *OpenOltGroupMgr) UpdateInterfaceToMcastQueueMap(intfID uint32, val *QueueInfoBrief) {
	g.interfaceToMcastQueueMapLock.Lock()
	defer g.interfaceToMcastQueueMapLock.Unlock()
	g.interfaceToMcastQueueMap[intfID] = val
}
////////////////////////////////////////////////
// INTERNAL or UNEXPORTED FUNCTIONS //
////////////////////////////////////////////////
// getFlowGroupFromKVStore fetches a flow group from the KV store through the
// resource manager and converts it into an OpenFlow group entry.
//
// Results:
//   - (nil, false, err) when the lookup fails;
//   - (nil, false, nil) when the group does not exist in the KV store;
//   - (group, true, nil) when the group was fetched successfully.
func (g *OpenOltGroupMgr) getFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
	found, info, err := g.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
	switch {
	case err != nil:
		return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
	case !found:
		return nil, false, nil
	default:
		return newGroup(info.GroupID, info.OutPorts), true, nil
	}
}
// newGroup builds an OpenFlow group entry of type ALL with the given group ID
// and one bucket per output port; each bucket carries a single output action
// for its port. With no ports the bucket list is left nil.
func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
	desc := &ofp.OfpGroupDesc{
		Type:    ofp.OfpGroupType_OFPGT_ALL,
		GroupId: groupID,
	}
	for _, port := range outPorts {
		desc.Buckets = append(desc.Buckets, &ofp.OfpBucket{
			Actions: []*ofp.OfpAction{flows.Output(port)},
		})
	}
	return &ofp.OfpGroupEntry{Desc: desc}
}
// buildGroupAction returns the action attached to a group: its command has
// RemoveOuterTag set, i.e. the outer VLAN tag is popped.
func (g *OpenOltGroupMgr) buildGroupAction() *openoltpb2.Action {
	return &openoltpb2.Action{
		Cmd: &openoltpb2.ActionCmd{RemoveOuterTag: true},
	}
}
// callGroupAddRemove performs the add/remove members operation described by
// group. A failure whose gRPC status code is AlreadyExists is treated as
// success; any other failure is returned as a group-operation error.
func (g *OpenOltGroupMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
	err := g.performGroupOperation(ctx, group)
	if err == nil {
		return nil
	}
	st, _ := status.FromError(err)
	if st.Code() == codes.AlreadyExists {
		// The device already has the requested state; nothing to report.
		return nil
	}
	return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
}
// findDiff returns the members of group2 that are not present in group1, in
// the order they appear in group2. Presence is decided by contains. The result
// is nil when there is no such member.
func (g *OpenOltGroupMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
	var onlyInSecond []*openoltpb2.GroupMember
	for _, candidate := range group2.Members {
		if g.contains(group1.Members, candidate) {
			continue
		}
		onlyInSecond = append(onlyInSecond, candidate)
	}
	return onlyInSecond
}
// contains reports whether members holds an entry with the same InterfaceId as
// member. Only the interface ID is compared; the other member fields are ignored.
func (g *OpenOltGroupMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
	for i := range members {
		if members[i].InterfaceId == member.InterfaceId {
			return true
		}
	}
	return false
}
// performGroupOperation sends group to the device through the
// PerformGroupOperation RPC of the openolt client. The RPC is issued on a
// fresh background context that carries over the tracing span of ctx. Any RPC
// failure is wrapped in an adapter error.
func (g *OpenOltGroupMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
	logger.Debugw(ctx, "sending-group-to-device",
		log.Fields{
			"groupToOlt": group,
			"command":    group.Command})
	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
	if _, err := g.deviceHandler.Client.PerformGroupOperation(rpcCtx, group); err != nil {
		return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
	}
	return nil
}
// buildGroup builds an openoltpb2.Group with the given group ID and one member
// per bucket. Buckets for which no member can be built are skipped, and so are
// members whose interface is already present in the group.
func (g *OpenOltGroupMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
	built := &openoltpb2.Group{GroupId: groupID}
	for _, ofBucket := range buckets {
		member := g.buildMember(ctx, ofBucket)
		if member == nil || g.contains(built.Members, member) {
			continue
		}
		built.Members = append(built.Members, member)
	}
	return built
}
// buildMember builds an openoltpb2.GroupMember from an OpenFlow bucket.
//
// The output port of the bucket (the last output action wins when there are
// several) is converted into a PON interface ID, and the multicast queue
// registered for that interface supplies the GEM port ID and priority of the
// member. Returns nil when the bucket is nil or has no usable output action,
// or when no multicast queue is known for the interface.
func (g *OpenOltGroupMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
	var outPort uint32
	outPortFound := false
	if ofBucket != nil {
		for _, ofAction := range ofBucket.Actions {
			if ofAction == nil || ofAction.Type != ofp.OfpActionType_OFPAT_OUTPUT {
				continue
			}
			// An OUTPUT-typed action without an output payload carries no
			// port; ignore it rather than dereferencing a nil pointer.
			if output := ofAction.GetOutput(); output != nil {
				outPort = output.Port
				outPortFound = true
			}
		}
	}
	if !outPortFound {
		logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
		return nil
	}
	interfaceID := IntfIDFromUniPortNum(outPort)
	logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
		log.Fields{
			"portNumber:":  outPort,
			"interfaceId:": interfaceID})
	g.interfaceToMcastQueueMapLock.RLock()
	defer g.interfaceToMcastQueueMapLock.RUnlock()
	if groupInfo, ok := g.interfaceToMcastQueueMap[interfaceID]; ok {
		return &openoltpb2.GroupMember{
			InterfaceId:   interfaceID,
			InterfaceType: openoltpb2.GroupMember_PON,
			GemPortId:     groupInfo.gemPortID,
			Priority:      groupInfo.servicePriority,
		}
	}
	// Structured fields need the "w" logger variant; the "f" variant would
	// treat the fields as a stray format argument.
	logger.Warnw(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
	return nil
}