VOL-1596 Add Support for handling multicast groups in OpenOLT Adapter.
VOL-1595 Add Support for handling multicast flows in OpenOLT Adapter.
Depends on the voltha-protos change from the patch below:
https://gerrit.opencord.org/#/c/16690/
Change-Id: I1cc9900bd6400bb31aed11beda674138838a21d2
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index 21f21c9..0e3da4d 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -24,12 +24,12 @@
"strconv"
"strings"
- "github.com/opencord/voltha-lib-go/v2/pkg/db"
- "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/openolt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/openolt"
)
const (
@@ -48,6 +48,20 @@
OnuPacketINPath = "onu_packetin/{%d,%d,%d}"
//FlowIDsForGem flowids_per_gem/<intfid>
FlowIDsForGem = "flowids_per_gem/{%d}"
+ //McastQueuesForIntf multicast queues for pon interfaces
+ McastQueuesForIntf = "mcast_qs_for_int"
+ //FlowGroup flow_groups/<flow_group_id>
+ // A group is stored under this path on the KV store after it has been installed to the device.
+ // It should also be deleted after it has been removed from the device accordingly.
+ FlowGroup = "flow_groups/{%d}"
+ //FlowGroupCached flow_groups_cached/<flow_group_id>
+ // When a group add request is received, we create the group without setting any members to it since we cannot
+ // set any members to a group until it is associated with a multicast flow. It is a BAL limitation.
+ // When the related multicast flow has been created we perform set members operation for the group.
+ // That is why we need to keep the members of a group until the multicast flow creation request comes.
+ // We preserve the groups under the "flow_groups_cached" directory in the KV store temporarily. After setting the
+ // members, we remove the group from the cached group store.
+ FlowGroupCached = "flow_groups_cached/{%d}"
)
// FlowInfo holds the flow information
@@ -74,6 +88,12 @@
LogicalPort uint32
}
+// GroupInfo holds the id and output ports of a flow group; it is the value JSON-encoded under the FlowGroup and FlowGroupCached keys.
+type GroupInfo struct {
+ GroupID uint32 // id of the group
+ OutPorts []uint32 // ports collected from the group's output actions
+}
+
// OpenOltResourceMgr holds resource related information as provided below for each field
type OpenOltResourceMgr struct {
DeviceID string // OLT device id
@@ -1269,9 +1289,9 @@
log.Error("failed to get data from kv store")
return nil, err
}
- if value != nil {
+ if value != nil && value.Value != nil {
if val, err = kvstore.ToByte(value.Value); err != nil {
- log.Error("Failed to convert to byte array", log.Fields{"error": err})
+ log.Error("Failed to convert to byte array ", log.Fields{"error": err})
return nil, err
}
if err = json.Unmarshal(val, &flowsForGem); err != nil {
@@ -1298,3 +1318,136 @@
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(IntfOnuIDUniID)
}
+
+//GetMcastQueuePerInterfaceMap reads the McastQueuesForIntf key from the KV store and JSON-decodes it into a map keyed by interface id; the map is nil (with a nil error) when the key holds no value.
+func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap() (map[uint32][]uint32, error) {
+ path := fmt.Sprintf(McastQueuesForIntf) // McastQueuesForIntf has no format verbs, so path is the constant itself
+ var mcastQueueToIntfMap map[uint32][]uint32
+ var val []byte
+
+ kvPair, err := RsrcMgr.KVStore.Get(path)
+ if err != nil {
+ log.Error("failed to get data from kv store")
+ return nil, err
+ }
+ if kvPair != nil && kvPair.Value != nil { // no stored value is not an error: the nil map is returned below
+ if val, err = kvstore.ToByte(kvPair.Value); err != nil {
+ log.Error("Failed to convert to byte array ", log.Fields{"error": err})
+ return nil, err
+ }
+ if err = json.Unmarshal(val, &mcastQueueToIntfMap); err != nil {
+ log.Error("Failed to unmarshall ", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ return mcastQueueToIntfMap, nil
+}
+
+//AddMcastQueueForIntf records {gem, servicePriority} for the given interface in the multicast queue map and writes the whole map back to the KV store as JSON; it returns the first error hit while reading, marshalling or writing.
+func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(intf uint32, gem uint32, servicePriority uint32) error {
+ var val []byte
+ path := fmt.Sprintf(McastQueuesForIntf)
+
+ mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap()
+ if err != nil {
+ log.Errorw("Failed to get multicast queue info for interface", log.Fields{"error": err, "intf": intf})
+ return err
+ }
+ if mcastQueues == nil { // nothing stored yet: start from an empty map
+ mcastQueues = make(map[uint32][]uint32)
+ }
+ mcastQueues[intf] = []uint32{gem, servicePriority} // replaces any entry already present for intf
+ if val, err = json.Marshal(mcastQueues); err != nil {
+ log.Errorw("Failed to marshal data", log.Fields{"error": err})
+ return err
+ }
+ if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+ log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
+ return err
+ }
+ log.Debugw("added multicast queue info to KV store successfully", log.Fields{"path": path, "mcastQueueInfo": mcastQueues[intf], "interfaceId": intf})
+ return nil
+}
+
+//AddFlowGroupToKVStore stores the group's id and the ports of its output actions as a JSON GroupInfo, under the FlowGroupCached path when cached is true and under the FlowGroup path otherwise.
+func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(groupEntry *ofp.OfpGroupEntry, cached bool) error {
+ var Value []byte
+ var err error
+ var path string
+ if cached {
+ path = fmt.Sprintf(FlowGroupCached, groupEntry.Desc.GroupId)
+ } else {
+ path = fmt.Sprintf(FlowGroup, groupEntry.Desc.GroupId)
+ }
+ //build group info object from the output actions of every bucket
+ var outPorts []uint32
+ for _, ofBucket := range groupEntry.Desc.Buckets {
+ for _, ofAction := range ofBucket.Actions {
+ if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT { // actions of any other type are ignored
+ outPorts = append(outPorts, ofAction.GetOutput().Port)
+ }
+ }
+ }
+ groupInfo := GroupInfo{
+ GroupID: groupEntry.Desc.GroupId,
+ OutPorts: outPorts,
+ }
+
+ Value, err = json.Marshal(groupInfo)
+
+ if err != nil {
+ log.Error("failed to Marshal flow group object")
+ return err
+ }
+
+ if err = RsrcMgr.KVStore.Put(path, Value); err != nil {
+ log.Errorf("Failed to update resource %s", path)
+ return err
+ }
+ return nil
+}
+
+//RemoveFlowGroupFromKVStore deletes the group's entry from the KV store (FlowGroupCached path when cached is true, FlowGroup path otherwise); it logs the error and returns false if the delete fails, and returns true otherwise.
+func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(groupID uint32, cached bool) bool {
+ var path string
+ if cached {
+ path = fmt.Sprintf(FlowGroupCached, groupID)
+ } else {
+ path = fmt.Sprintf(FlowGroup, groupID)
+ }
+ if err := RsrcMgr.KVStore.Delete(path); err != nil {
+ log.Errorf("Failed to remove resource %s due to %s", path, err)
+ return false
+ }
+ return true
+}
+
+//GetFlowGroupFromKVStore fetches a flow group from the KV store, using the FlowGroupCached path when cached is true and
+//the FlowGroup path otherwise. Returns (true, groupInfo, nil) when the group is found and decoded, (false, {}, nil) when
+//the key holds no value, and false with a non-nil error when the read, byte conversion or JSON decoding fails.
+func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (bool, GroupInfo, error) {
+ var groupInfo GroupInfo
+ var path string
+ if cached {
+ path = fmt.Sprintf(FlowGroupCached, groupID)
+ } else {
+ path = fmt.Sprintf(FlowGroup, groupID)
+ }
+ kvPair, err := RsrcMgr.KVStore.Get(path)
+ if err != nil {
+ return false, groupInfo, err
+ }
+ if kvPair != nil && kvPair.Value != nil {
+ Val, err := kvstore.ToByte(kvPair.Value)
+ if err != nil {
+ log.Errorw("Failed to convert flow group into byte array", log.Fields{"error": err})
+ return false, groupInfo, err
+ }
+ if err = json.Unmarshal(Val, &groupInfo); err != nil {
+ log.Errorw("Failed to unmarshal", log.Fields{"error": err})
+ return false, groupInfo, err
+ }
+ return true, groupInfo, nil
+ }
+ return false, groupInfo, nil // no value stored under path: the group is absent
+}