VOL-3419: OpenOLT adapter at scale constantly takes more that 10 seconds to react to flows
The patch addresses the following
- Create OpenOltFlowMgr per PON port (instead of one instance for the whole OLT device earlier)
- Create a separate OpenOltGroupMgr - currently one instance for the whole OLT device
- Remove redundant global lock around getting ONU-ID in DeviceHandler module as there exists a
separate per-pon-port lock in ResourceManager module which suffices the required synchronization
- Remove redundant locks in OpenOltFlowMgr module to serialize FlowDelete before FlowAdd
- Rename divideAndAddFlow to processAddFlow. "divideAndAddFlow" was used in 1.x voltha days and
had a different meaning and the name seems to have been blindly ported to 2.x adapter
and does not make sense anymore
Change-Id: I99827963cf242f1db0c27943c97bd05b749ae129
diff --git a/internal/pkg/core/openolt_groupmgr.go b/internal/pkg/core/openolt_groupmgr.go
new file mode 100644
index 0000000..d14d24b
--- /dev/null
+++ b/internal/pkg/core/openolt_groupmgr.go
@@ -0,0 +1,350 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//Package core provides the utility for olt devices, flows, groups and statistics
+package core
+
+import (
+ "context"
+ "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
+ rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+//QueueInfoBrief has information about gemPortID and service priority associated with Mcast group
+type QueueInfoBrief struct {
+ gemPortID uint32
+ servicePriority uint32
+}
+
+//OpenOltGroupMgr creates the Structure of OpenOltGroupMgr obj
+type OpenOltGroupMgr struct {
+ deviceHandler *DeviceHandler
+ resourceMgr *rsrcMgr.OpenOltResourceMgr
+ interfaceToMcastQueueMap map[uint32]*QueueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
+ interfaceToMcastQueueMapLock sync.RWMutex
+}
+
+//////////////////////////////////////////////
+// EXPORTED FUNCTIONS //
+//////////////////////////////////////////////
+
+//NewGroupManager creates OpenOltGroupMgr object and initializes the parameters
+func NewGroupManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltGroupMgr {
+ logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
+ var grpMgr OpenOltGroupMgr
+ grpMgr.deviceHandler = dh
+ grpMgr.resourceMgr = rMgr
+ grpMgr.interfaceToMcastQueueMap = make(map[uint32]*QueueInfoBrief)
+ grpMgr.interfaceToMcastQueueMapLock = sync.RWMutex{}
+ logger.Info(ctx, "initialization-of-group-manager-success")
+ return &grpMgr
+}
+
+// AddGroup add or update the group
+func (g *OpenOltGroupMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+ logger.Infow(ctx, "add-group", log.Fields{"group": group})
+ if group == nil {
+ return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+ }
+ groupToOlt := openoltpb2.Group{
+ GroupId: group.Desc.GroupId,
+ Command: openoltpb2.Group_SET_MEMBERS,
+ Action: g.buildGroupAction(),
+ }
+ logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
+ _, err := g.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
+ if err != nil {
+ return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
+ }
+ // group members not created yet. So let's store the group
+ if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
+ return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+ }
+ logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
+ return nil
+}
+
+// DeleteGroup deletes a group from the device
+func (g *OpenOltGroupMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+ logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
+ if group == nil {
+ logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
+ return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+ }
+ groupToOlt := openoltpb2.Group{
+ GroupId: group.Desc.GroupId,
+ }
+ logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
+ _, err := g.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
+ if err != nil {
+ logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
+ return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
+ }
+ //remove group from the store
+ if err := g.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
+ return olterrors.NewErrPersistence("delete", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+ }
+ logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
+ return nil
+}
+
+// ModifyGroup updates the group
+func (g *OpenOltGroupMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
+ logger.Infow(ctx, "modify-group", log.Fields{"group": group})
+ if group == nil || group.Desc == nil {
+ return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
+ }
+ newGroup := g.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
+ //get existing members of the group
+ val, groupExists, err := g.getFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
+ if err != nil {
+ return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
+ }
+ var current *openoltpb2.Group // represents the group on the device
+ if groupExists {
+ // group already exists
+ current = g.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
+ logger.Debugw(ctx, "modify-group--group exists",
+ log.Fields{
+ "group on the device": val,
+ "new": group})
+ } else {
+ current = g.buildGroup(ctx, group.Desc.GroupId, nil)
+ }
+ logger.Debugw(ctx, "modify-group--comparing-current-and-new",
+ log.Fields{
+ "group on the device": current,
+ "new": newGroup})
+ // get members to be added
+ membersToBeAdded := g.findDiff(current, newGroup)
+ // get members to be removed
+ membersToBeRemoved := g.findDiff(newGroup, current)
+ logger.Infow(ctx, "modify-group--differences found", log.Fields{
+ "membersToBeAdded": membersToBeAdded,
+ "membersToBeRemoved": membersToBeRemoved,
+ "groupId": group.Desc.GroupId})
+ groupToOlt := openoltpb2.Group{
+ GroupId: group.Desc.GroupId,
+ }
+ var errAdd, errRemoved error
+ if len(membersToBeAdded) > 0 {
+ groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
+ groupToOlt.Members = membersToBeAdded
+ //execute addMembers
+ errAdd = g.callGroupAddRemove(ctx, &groupToOlt)
+ }
+ if len(membersToBeRemoved) > 0 {
+ groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
+ groupToOlt.Members = membersToBeRemoved
+ //execute removeMembers
+ errRemoved = g.callGroupAddRemove(ctx, &groupToOlt)
+ }
+ //save the modified group
+ if errAdd == nil && errRemoved == nil {
+ if err := g.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
+ return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
+ }
+ logger.Infow(ctx, "modify-group-was-success--storing-group",
+ log.Fields{
+ "group": group,
+ "existingGroup": current})
+ } else {
+ logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
+ log.Fields{"group": group})
+ if errAdd != nil {
+ return errAdd
+ }
+ return errRemoved
+ }
+ return nil
+}
+
+//LoadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
+//and put them into interfaceToMcastQueueMap.
+func (g *OpenOltGroupMgr) LoadInterfaceToMulticastQueueMap(ctx context.Context) {
+ storedMulticastQueueMap, err := g.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
+ if err != nil {
+ logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
+ return
+ }
+ for intf, queueInfo := range storedMulticastQueueMap {
+ q := QueueInfoBrief{
+ gemPortID: queueInfo[0],
+ servicePriority: queueInfo[1],
+ }
+ g.interfaceToMcastQueueMap[intf] = &q
+ }
+}
+
+//GetInterfaceToMcastQueueMap gets the mcast queue mapped to to the PON interface
+func (g *OpenOltGroupMgr) GetInterfaceToMcastQueueMap(intfID uint32) (*QueueInfoBrief, bool) {
+ g.interfaceToMcastQueueMapLock.RLock()
+ defer g.interfaceToMcastQueueMapLock.RUnlock()
+ val, present := g.interfaceToMcastQueueMap[intfID]
+ return val, present
+}
+
+//UpdateInterfaceToMcastQueueMap updates the mcast queue information mapped to a given PON interface
+func (g *OpenOltGroupMgr) UpdateInterfaceToMcastQueueMap(intfID uint32, val *QueueInfoBrief) {
+ g.interfaceToMcastQueueMapLock.Lock()
+ defer g.interfaceToMcastQueueMapLock.Unlock()
+ g.interfaceToMcastQueueMap[intfID] = val
+}
+
+////////////////////////////////////////////////
+// INTERNAL or UNEXPORTED FUNCTIONS //
+////////////////////////////////////////////////
+//getFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
+//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
+//Returns (nil, false, nil) if the group does not exists in the KV store.
+func (g *OpenOltGroupMgr) getFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
+ exists, groupInfo, err := g.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
+ if err != nil {
+ return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
+ }
+ if exists {
+ return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
+ }
+ return nil, exists, nil
+}
+func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
+ groupDesc := ofp.OfpGroupDesc{
+ Type: ofp.OfpGroupType_OFPGT_ALL,
+ GroupId: groupID,
+ }
+ groupEntry := ofp.OfpGroupEntry{
+ Desc: &groupDesc,
+ }
+ for i := 0; i < len(outPorts); i++ {
+ var acts []*ofp.OfpAction
+ acts = append(acts, flows.Output(outPorts[i]))
+ bucket := ofp.OfpBucket{
+ Actions: acts,
+ }
+ groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
+ }
+ return &groupEntry
+}
+
+//buildGroupAction creates and returns a group action
+func (g *OpenOltGroupMgr) buildGroupAction() *openoltpb2.Action {
+ var actionCmd openoltpb2.ActionCmd
+ var action openoltpb2.Action
+ action.Cmd = &actionCmd
+ //pop outer vlan
+ action.Cmd.RemoveOuterTag = true
+ return &action
+}
+
+//callGroupAddRemove performs add/remove buckets operation for the indicated group
+func (g *OpenOltGroupMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
+ if err := g.performGroupOperation(ctx, group); err != nil {
+ st, _ := status.FromError(err)
+ //ignore already exists error code
+ if st.Code() != codes.AlreadyExists {
+ return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
+ }
+ }
+ return nil
+}
+
+//findDiff compares group members and finds members which only exists in groups2
+func (g *OpenOltGroupMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
+ var members []*openoltpb2.GroupMember
+ for _, bucket := range group2.Members {
+ if !g.contains(group1.Members, bucket) {
+ // bucket does not exist and must be added
+ members = append(members, bucket)
+ }
+ }
+ return members
+}
+
+//contains returns true if the members list contains the given member; false otherwise
+func (g *OpenOltGroupMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
+ for _, groupMember := range members {
+ if groupMember.InterfaceId == member.InterfaceId {
+ return true
+ }
+ }
+ return false
+}
+
+//performGroupOperation call performGroupOperation operation of openolt proto
+func (g *OpenOltGroupMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
+ logger.Debugw(ctx, "sending-group-to-device",
+ log.Fields{
+ "groupToOlt": group,
+ "command": group.Command})
+ _, err := g.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
+ if err != nil {
+ return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
+ }
+ return nil
+}
+
+//buildGroup build openoltpb2.Group from given group id and bucket list
+func (g *OpenOltGroupMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
+ group := openoltpb2.Group{
+ GroupId: groupID}
+ // create members of the group
+ for _, ofBucket := range buckets {
+ member := g.buildMember(ctx, ofBucket)
+ if member != nil && !g.contains(group.Members, member) {
+ group.Members = append(group.Members, member)
+ }
+ }
+ return &group
+}
+
+//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
+func (g *OpenOltGroupMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
+ var outPort uint32
+ outPortFound := false
+ for _, ofAction := range ofBucket.Actions {
+ if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
+ outPort = ofAction.GetOutput().Port
+ outPortFound = true
+ }
+ }
+ if !outPortFound {
+ logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
+ return nil
+ }
+ interfaceID := IntfIDFromUniPortNum(outPort)
+ logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
+ log.Fields{
+ "portNumber:": outPort,
+ "interfaceId:": interfaceID})
+ g.interfaceToMcastQueueMapLock.RLock()
+ defer g.interfaceToMcastQueueMapLock.RUnlock()
+ if groupInfo, ok := g.interfaceToMcastQueueMap[interfaceID]; ok {
+ member := openoltpb2.GroupMember{
+ InterfaceId: interfaceID,
+ InterfaceType: openoltpb2.GroupMember_PON,
+ GemPortId: groupInfo.gemPortID,
+ Priority: groupInfo.servicePriority,
+ }
+ //add member to the group
+ return &member
+ }
+ logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
+ return nil
+}