VOL-1596 Add Support for handling multicast groups in OpenOLT Adapter.
VOL-1595 Add Support for handling multicast flows in OpenOLT Adapter.
Depends on the voltha-protos changes from the patch below:
https://gerrit.opencord.org/#/c/16690/
Change-Id: I1cc9900bd6400bb31aed11beda674138838a21d2
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 3ec4d0e..3def7bc 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -34,15 +34,15 @@
backoff "github.com/cenkalti/backoff/v3"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-lib-go/v2/pkg/pmmetrics"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/pmmetrics"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
- "github.com/opencord/voltha-protos/v2/go/common"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- of "github.com/opencord/voltha-protos/v2/go/openflow_13"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-protos/v3/go/common"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ of "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)
@@ -228,7 +228,7 @@
}
func (dh *DeviceHandler) addPort(intfID uint32, portType voltha.Port_PortType, state string) {
- var operStatus common.OperStatus_OperStatus
+ var operStatus common.OperStatus_Types
if state == "up" {
operStatus = voltha.OperStatus_ACTIVE
} else {
@@ -798,7 +798,7 @@
func (dh *DeviceHandler) sendProxiedMessage(onuDevice *voltha.Device, omciMsg *ic.InterAdapterOmciMessage) {
var intfID uint32
var onuID uint32
- var connectStatus common.ConnectStatus_ConnectStatus
+ var connectStatus common.ConnectStatus_Types
if onuDevice != nil {
intfID = onuDevice.ProxyAddress.GetChannelId()
onuID = onuDevice.ProxyAddress.GetOnuId()
@@ -1120,6 +1120,18 @@
// dh.flowMgr.RemoveFlow(flow)
}
}
+
+ if groups != nil {
+ for _, group := range groups.ToAdd.Items {
+ dh.flowMgr.AddGroup(group)
+ }
+ for _, group := range groups.ToUpdate.Items {
+ dh.flowMgr.ModifyGroup(group)
+ }
+ if len(groups.ToRemove.Items) != 0 {
+ log.Debug("Group delete operation is not supported for now")
+ }
+ }
log.Debug("UpdateFlowsIncrementally done successfully")
return nil
}
diff --git a/adaptercore/device_handler_test.go b/adaptercore/device_handler_test.go
index 26b86ba..82cd862 100644
--- a/adaptercore/device_handler_test.go
+++ b/adaptercore/device_handler_test.go
@@ -25,21 +25,21 @@
"testing"
"time"
- "github.com/opencord/voltha-lib-go/v2/pkg/pmmetrics"
+ "github.com/opencord/voltha-lib-go/v3/pkg/pmmetrics"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-lib-go/v2/pkg/db"
- fu "github.com/opencord/voltha-lib-go/v2/pkg/flows"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
"github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
"github.com/opencord/voltha-openolt-adapter/mocks"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- of "github.com/opencord/voltha-protos/v2/go/openflow_13"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ of "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
func init() {
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index 8c7cf8d..efc0029 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -20,10 +20,10 @@
import (
"errors"
- "github.com/opencord/voltha-lib-go/v2/pkg/flows"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
/*=====================================================================
diff --git a/adaptercore/olt_platform_test.go b/adaptercore/olt_platform_test.go
index 7a799b8..141e8bd 100644
--- a/adaptercore/olt_platform_test.go
+++ b/adaptercore/olt_platform_test.go
@@ -22,9 +22,9 @@
"reflect"
"testing"
- fu "github.com/opencord/voltha-lib-go/v2/pkg/flows"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
func TestMkUniPortNum(t *testing.T) {
diff --git a/adaptercore/olt_state_transitions.go b/adaptercore/olt_state_transitions.go
index feb07d7..697ee5f 100644
--- a/adaptercore/olt_state_transitions.go
+++ b/adaptercore/olt_state_transitions.go
@@ -21,7 +21,7 @@
"reflect"
"runtime"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
)
// DeviceState OLT Device state
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index 375b137..7a53524 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -24,13 +24,13 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-openolt-adapter/config"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
//OpenOLT structure holds the OLT information
diff --git a/adaptercore/openolt_eventmgr.go b/adaptercore/openolt_eventmgr.go
index 19661ea..75086dc 100644
--- a/adaptercore/openolt_eventmgr.go
+++ b/adaptercore/openolt_eventmgr.go
@@ -21,10 +21,10 @@
"fmt"
"strconv"
- "github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
const (
diff --git a/adaptercore/openolt_eventmgr_test.go b/adaptercore/openolt_eventmgr_test.go
index 7430be4..7dbda43 100644
--- a/adaptercore/openolt_eventmgr_test.go
+++ b/adaptercore/openolt_eventmgr_test.go
@@ -23,7 +23,7 @@
"time"
"github.com/opencord/voltha-openolt-adapter/mocks"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
)
func mockEventMgr() *OpenOltEventMgr {
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 3376809..b8af453 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -28,16 +28,16 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v2/pkg/flows"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
- "github.com/opencord/voltha-protos/v2/go/common"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- openoltpb2 "github.com/opencord/voltha-protos/v2/go/openolt"
- tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-protos/v3/go/common"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
//deepcopy "github.com/getlantern/deepcopy"
"github.com/EagleChen/mapmutex"
@@ -57,6 +57,9 @@
//DhcpFlow flow category
DhcpFlow = "DHCP_FLOW"
+ //MulticastFlow flow category
+ MulticastFlow = "MULTICAST_FLOW"
+
//IgmpFlow flow category
IgmpFlow = "IGMP_FLOW"
@@ -90,6 +93,8 @@
Upstream = "upstream"
//Downstream constant
Downstream = "downstream"
+ //Multicast constant
+ Multicast = "multicast"
//PacketTagType constant
PacketTagType = "pkt_tag_type"
//Untagged constant
@@ -103,6 +108,8 @@
//EthType constant
EthType = "eth_type"
+ //EthDst constant
+ EthDst = "eth_dst"
//TPID constant
TPID = "tpid"
//IPProto constant
@@ -128,6 +135,8 @@
TunnelID = "tunnel_id"
//Output constant
Output = "output"
+ //GroupID constant
+ GroupID = "group_id"
// Actions
//PopVlan constant
@@ -152,6 +161,13 @@
PortNo = "portNo"
//AllocID constant
AllocID = "allocId"
+
+ //NoneOnuID constant
+ NoneOnuID = -1
+ //NoneUniID constant
+ NoneUniID = -1
+ //NoneGemPortID constant
+ NoneGemPortID = -1
)
type gemPortKey struct {
@@ -183,6 +199,11 @@
flowMetadata *voltha.FlowMetadata
}
+type queueInfoBrief struct { // GEM port and priority of the multicast queue kept per PON interface
+ gemPortID uint32 // GemportId of the multicast traffic queue
+ servicePriority uint32 // Priority of the multicast traffic queue
+}
+
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
type OpenOltFlowMgr struct {
techprofile map[uint32]tp.TechProfileIf
@@ -195,7 +216,8 @@
lockCache sync.RWMutex
pendingFlowDelete sync.Map
// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
- perUserFlowHandleLock *mapmutex.Mutex
+ perUserFlowHandleLock *mapmutex.Mutex
+ interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -228,6 +250,9 @@
flowMgr.lockCache = sync.RWMutex{}
flowMgr.pendingFlowDelete = sync.Map{}
flowMgr.perUserFlowHandleLock = mapmutex.NewMapMutex()
+ flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
+ //load interface to multicast queue map from kv store
+ flowMgr.loadInterfaceToMulticastQueueMap()
log.Info("Initialization of flow manager success!!")
return &flowMgr
}
@@ -239,6 +264,9 @@
} else if direction == Downstream {
log.Debug("downstream flow, not shifting id")
return uint64(flowID), nil
+ } else if direction == Multicast {
+ log.Debug("multicast flow, shifting id")
+ return 0x2<<15 | uint64(flowID), nil
} else {
log.Debug("Unrecognized direction")
return 0, fmt.Errorf("unrecognized direction %s", direction)
@@ -427,6 +455,25 @@
return err
}
+ if sq.direction == tp_pb.Direction_DOWNSTREAM {
+ multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(sq.tpInst)
+ if len(multicastTrafficQueues) > 0 {
+ if _, present := f.interfaceToMcastQueueMap[sq.intfID]; !present {
+ //assumed that there is only one queue per PON for the multicast service
+ //the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
+ //just put it in interfaceToMcastQueueMap to use for building group members
+ multicastQueuePerPonPort := multicastTrafficQueues[0]
+ f.interfaceToMcastQueueMap[sq.intfID] = &queueInfoBrief{
+ gemPortID: multicastQueuePerPonPort.GemportId,
+ servicePriority: multicastQueuePerPonPort.Priority,
+ }
+ //also store the queue info in kv store
+ f.resourceMgr.AddMcastQueueForIntf(sq.intfID,
+ multicastQueuePerPonPort.GemportId,
+ multicastQueuePerPonPort.Priority)
+ }
+ }
+ }
return nil
}
@@ -998,6 +1045,7 @@
classifier.DstPort, _ = classifierInfo[UDPDst].(uint32)
classifier.DstIp, _ = classifierInfo[Ipv4Dst].(uint32)
classifier.SrcIp, _ = classifierInfo[Ipv4Src].(uint32)
+ classifier.DstMac, _ = classifierInfo[EthDst].([]uint8)
if pktTagType, ok := classifierInfo[PacketTagType].(string); ok {
classifier.PktTagType = pktTagType
@@ -1505,6 +1553,12 @@
func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowDirection string) {
log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
+
+ if flowDirection == Multicast {
+ f.clearMulticastFlowFromResourceManager(flow)
+ return
+ }
+
var updatedFlows []rsrcMgr.FlowInfo
var flowID uint32
var onuID, uniID int32
@@ -1570,6 +1624,61 @@
}
}
+//clearMulticastFlowFromResourceManager removes a multicast flow from the device and the KV store and
+// releases the flow id reserved for this multicast flow. Any failure is logged and aborts the cleanup.
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(flow *ofp.OfpFlowStats) {
+ classifierInfo := make(map[string]interface{})
+ formulateClassifierInfoFromFlow(classifierInfo, flow)
+ inPort, err := f.getInPortOfMulticastFlow(classifierInfo)
+ if err != nil {
+ log.Warnw("No inPort found. Cannot release resources of the multicast flow.", log.Fields{"flowId:": flow.Id})
+ return
+ }
+
+ networkInterfaceID := IntfIDFromNniPortNum(inPort)
+ var onuID = int32(NoneOnuID)
+ var uniID = int32(NoneUniID)
+ var flowID uint32
+ var updatedFlows []rsrcMgr.FlowInfo
+
+ flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(networkInterfaceID, onuID, uniID)
+ for _, flowID = range flowIds {
+ flowInfo := f.resourceMgr.GetFlowIDInfo(networkInterfaceID, onuID, uniID, flowID)
+ if flowInfo == nil {
+ log.Debugw("No multicast FlowInfo found in the KV store",
+ log.Fields{"Intf": networkInterfaceID, "onuID": onuID, "uniID": uniID, "flowID": flowID})
+ continue
+ }
+ updatedFlows = nil
+ for _, info := range *flowInfo {
+ updatedFlows = append(updatedFlows, info)
+ }
+ // walk backwards: entries are deleted from updatedFlows in the loop, a forward range would skip or re-visit shifted entries
+ for i := len(updatedFlows) - 1; i >= 0; i-- {
+ storedFlow := updatedFlows[i]
+ if flow.Id == storedFlow.LogicalFlowID {
+ removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
+ log.Debugw("Multicast flow to be deleted", log.Fields{"flow": storedFlow})
+ //remove from device
+ if ok := f.removeFlowFromDevice(&removeFlowMessage); !ok {
+ log.Errorw("Failed to remove multicast flow from device", log.Fields{"flowId": flow.Id})
+ return
+ }
+ log.Debugw("Multicast flow removed from device successfully", log.Fields{"flowId": flow.Id})
+ //Remove the Flow from FlowInfo
+ updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+ if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
+ log.Errorw("Failed to delete multicast flow from the KV store", log.Fields{"flow": storedFlow, "err": err})
+ return
+ }
+ //release flow id
+ log.Debugw("Releasing multicast flow id", log.Fields{"flowId": flowID, "interfaceID": networkInterfaceID})
+ f.resourceMgr.FreeFlowID(uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
+ }
+ }
+ }
+}
+
//RemoveFlow removes the flow from the device
func (f *OpenOltFlowMgr) RemoveFlow(flow *ofp.OfpFlowStats) {
log.Debugw("Removing Flow", log.Fields{"flow": flow})
@@ -1587,12 +1696,14 @@
}
}
}
- if IsUpstream(actionInfo[Output].(uint32)) {
+
+ if flows.HasGroup(flow) {
+ direction = Multicast
+ } else if IsUpstream(actionInfo[Output].(uint32)) {
direction = Upstream
} else {
direction = Downstream
}
-
f.clearFlowFromResourceManager(flow, direction) //TODO: Take care of the limitations
return
@@ -1650,6 +1761,12 @@
return
}
+ if flows.HasGroup(flow) {
+ // handle multicast flow
+ f.handleFlowWithGroup(actionInfo, classifierInfo, flow)
+ return
+ }
+
/* Controller bound trap flows */
err = formulateControllerBoundTrapFlowInfo(actionInfo, classifierInfo, flow)
if err != nil {
@@ -1723,6 +1840,283 @@
}
}
+// handleFlowWithGroup adds multicast flow to the device and records it in the KV store; failures are only logged.
+func (f *OpenOltFlowMgr) handleFlowWithGroup(actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
+ classifierInfo[PacketTagType] = DoubleTag
+ log.Debugw("add-multicast-flow", log.Fields{"classifierInfo": classifierInfo, "actionInfo": actionInfo})
+ // the group id is mandatory: check it before any flow id is reserved instead of panicking on the assertion later
+ groupID, hasGroupID := actionInfo[GroupID].(uint32)
+ if !hasGroupID {
+ log.Errorw("No group id found in the actions. Ignoring multicast flow.", log.Fields{"flowId": flow.Id})
+ return
+ }
+ inPort, err := f.getInPortOfMulticastFlow(classifierInfo)
+ if err != nil {
+ log.Warnw("No inPort found. Ignoring multicast flow.", log.Fields{"flowId:": flow.Id})
+ return
+ }
+ //replace ipDst with ethDst
+ if ipv4Dst, ok := classifierInfo[Ipv4Dst]; ok &&
+ flows.IsMulticastIp(ipv4Dst.(uint32)) {
+ // replace ipv4_dst classifier with eth_dst
+ multicastMac := flows.ConvertToMulticastMacBytes(ipv4Dst.(uint32))
+ delete(classifierInfo, Ipv4Dst)
+ delete(classifierInfo, EthType)
+ classifierInfo[EthDst] = multicastMac
+ log.Debugw("multicast-ip-to-mac-conversion-success", log.Fields{"ip:": ipv4Dst.(uint32), "mac:": multicastMac})
+ }
+
+ var onuID = NoneOnuID
+ var uniID = NoneUniID
+ var gemPortID = NoneGemPortID
+
+ networkInterfaceID := IntfIDFromNniPortNum(inPort)
+
+ var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ log.Debugw("multicast-flow-exists--not-re-adding", log.Fields{"classifierInfo": classifierInfo})
+ return
+ }
+ flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
+ if err != nil {
+ log.Errorw("Flow id unavailable for multicast flow", log.Fields{"error": err})
+ return
+ }
+ var classifierProto *openoltpb2.Classifier
+ if classifierProto = makeOpenOltClassifierField(classifierInfo); classifierProto == nil {
+ log.Error("Error in making classifier protobuf for multicast flow")
+ return
+ }
+ multicastFlow := openoltpb2.Flow{
+ FlowId: flowID,
+ FlowType: Multicast,
+ NetworkIntfId: int32(networkInterfaceID),
+ GroupId: groupID,
+ Classifier: classifierProto,
+ Priority: int32(flow.Priority),
+ Cookie: flow.Cookie}
+
+ if ok := f.addFlowToDevice(flow, &multicastFlow); ok {
+ log.Debug("multicast flow added to device successfully")
+ //get cached group; only act on it when one was really stored
+ group, cached, err := f.GetFlowGroupFromKVStore(groupID, true)
+ if err == nil && cached {
+ //calling groupAdd to set group members after multicast flow creation
+ if f.ModifyGroup(group) {
+ //cached group can be removed now
+ f.resourceMgr.RemoveFlowGroupFromKVStore(groupID, true)
+ }
+ }
+
+ flowsToKVStore := f.getUpdatedFlowInfo(&multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
+ if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID), int32(onuID), int32(uniID), flowID, flowsToKVStore); err != nil {
+ log.Errorw("Error uploading multicast flow into KV store", log.Fields{"flow": multicastFlow, "error": err})
+ }
+ }
+}
+
+//getInPortOfMulticastFlow returns the inPort classifier when present; otherwise the first NNI port read from the KV store
+func (f *OpenOltFlowMgr) getInPortOfMulticastFlow(classifierInfo map[string]interface{}) (uint32, error) {
+ if inPort, found := classifierInfo[InPort]; found {
+ return inPort.(uint32), nil
+ }
+ // no in_port criterion: fall back to the first NNI port of the device
+ nniPorts, e := f.resourceMgr.GetNNIFromKVStore()
+ if e != nil || len(nniPorts) == 0 {
+ return 0, errors.New("cannot find NNI port of device")
+ }
+ return nniPorts[0], nil
+}
+
+// AddGroup creates the group on the device without members and caches it in the KV store; nil groups are skipped.
+func (f *OpenOltFlowMgr) AddGroup(group *ofp.OfpGroupEntry) {
+ log.Infow("add-group", log.Fields{"group": group})
+ if group == nil || group.Desc == nil { // Desc is dereferenced below, same guard as ModifyGroup
+ log.Warn("skipping nil group")
+ return
+ }
+
+ groupToOlt := openoltpb2.Group{
+ GroupId: group.Desc.GroupId,
+ Command: openoltpb2.Group_SET_MEMBERS,
+ Action: f.buildGroupAction(),
+ }
+
+ log.Debugw("Sending group to device", log.Fields{"groupToOlt": groupToOlt})
+ _, err := f.deviceHandler.Client.PerformGroupOperation(context.Background(), &groupToOlt)
+ if err != nil {
+ log.Errorw("add-group operation failed", log.Fields{"err": err, "groupToOlt": groupToOlt})
+ return
+ }
+ // group members not created yet. So let's store the group
+ if err := f.resourceMgr.AddFlowGroupToKVStore(group, true); err != nil {
+ log.Errorw("Group cannot be stored in KV store", log.Fields{"groupId": group.Desc.GroupId, "err": err})
+ } else {
+ log.Debugw("add-group operation performed on the device successfully ", log.Fields{"groupToOlt": groupToOlt})
+ }
+}
+
+//buildGroupAction creates and returns a group action
+func (f *OpenOltFlowMgr) buildGroupAction() *openoltpb2.Action {
+ var actionCmd openoltpb2.ActionCmd
+ var action openoltpb2.Action
+ action.Cmd = &actionCmd
+ //pop outer vlan
+ action.Cmd.RemoveOuterTag = true
+ return &action
+}
+
+// ModifyGroup sends the new member set to the device when it differs from the stored one; returns false on failure.
+func (f *OpenOltFlowMgr) ModifyGroup(group *ofp.OfpGroupEntry) bool {
+ log.Infow("modify-group", log.Fields{"group": group})
+ if group == nil || group.Desc == nil {
+ log.Warn("cannot modify group; group is nil")
+ return false
+ }
+
+ updated := f.buildGroup(group.Desc.GroupId, group.Desc.Buckets)
+ //get existing members of the group
+ val, groupExists, err := f.GetFlowGroupFromKVStore(group.Desc.GroupId, false)
+
+ if err != nil {
+ log.Errorw("Failed to retrieve the group from the store. Cannot modify group.",
+ log.Fields{"groupId": group.Desc.GroupId, "err": err})
+ return false
+ }
+
+ var current *openoltpb2.Group
+ if groupExists {
+ // group already exists
+ current = f.buildGroup(group.Desc.GroupId, val.Desc.GetBuckets())
+ log.Debugw("modify-group: group exists.", log.Fields{"current": val, "new": group})
+ } else {
+ current = f.buildGroup(group.Desc.GroupId, nil)
+ }
+
+ log.Debugw("modify-group: comparing current and new.", log.Fields{"current": current, "new": updated})
+ // check if the buckets are identical
+ bucketsIdentical := f.bucketsIdentical(current, updated)
+
+ isSuccess := true
+ if !bucketsIdentical {
+ groupToOlt := openoltpb2.Group{
+ GroupId: group.Desc.GroupId,
+ Command: openoltpb2.Group_SET_MEMBERS,
+ Members: updated.Members,
+ Action: f.buildGroupAction(),
+ }
+
+ if err := f.callGroupAdd(&groupToOlt); err != nil {
+ log.Warnw("One of the group add/remove operations has failed. Cannot save group modifications",
+ log.Fields{"group": group})
+ isSuccess = false
+ }
+ }
+
+ if isSuccess {
+ log.Debugw("modify-group was success. Storing the group", log.Fields{"group": group, "existingGroup": current})
+ if err := f.resourceMgr.AddFlowGroupToKVStore(group, false); err != nil {
+ log.Errorw("Failed to save the group into kv store", log.Fields{"groupId": group.Desc.GroupId, "err": err})
+ }
+ }
+ return isSuccess
+}
+
+//bucketsIdentical reports whether both groups share the id, the member count and the member interfaces
+func (f *OpenOltFlowMgr) bucketsIdentical(current *openoltpb2.Group, new *openoltpb2.Group) bool {
+ if current.GroupId != new.GroupId || len(new.Members) != len(current.Members) {
+ return false
+ }
+ // same size: identical when new brings no interface that current lacks
+ if added := f.findDiff(current, new); len(added) > 0 {
+ return false
+ }
+ log.Infow("modify-group: current and new buckets are the same. Won't send SET_MEMBERS again.",
+ log.Fields{"groupId:": current.GroupId})
+ return true
+}
+
+//findDiff returns the members of group2 whose interface is not present in group1
+func (f *OpenOltFlowMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
+ var onlyInGroup2 []*openoltpb2.GroupMember
+ for _, candidate := range group2.Members {
+ if f.contains(group1.Members, candidate) {
+ continue // already part of group1
+ }
+ onlyInGroup2 = append(onlyInGroup2, candidate)
+ }
+ return onlyInGroup2
+}
+
+//contains reports whether members holds an entry with the same InterfaceId as member
+func (f *OpenOltFlowMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
+ for idx := range members {
+ if members[idx].InterfaceId == member.InterfaceId {
+ return true
+ }
+ }
+ return false
+}
+
+//callGroupAdd sends the group to the device through PerformGroupOperation and returns its error, if any
+func (f *OpenOltFlowMgr) callGroupAdd(group *openoltpb2.Group) error {
+ log.Debugw("Sending group to device", log.Fields{"groupToOlt": group, "command": group.Command})
+ if _, err := f.deviceHandler.Client.PerformGroupOperation(context.Background(), group); err != nil {
+ log.Errorw("group operation failed", log.Fields{"err": err, "groupToOlt": group})
+ return err
+ }
+ return nil
+}
+
+//buildGroup assembles an openoltpb2.Group for the given id with one member per distinct interface of the buckets
+func (f *OpenOltFlowMgr) buildGroup(groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
+ group := &openoltpb2.Group{GroupId: groupID}
+ for _, ofBucket := range buckets {
+ member := f.buildMember(ofBucket)
+ if member == nil {
+ continue // buildMember found no out port or no multicast queue for the bucket
+ }
+ if f.contains(group.Members, member) {
+ continue // an entry for this interface is already in the group
+ }
+ group.Members = append(group.Members, member)
+ }
+ return group
+}
+
+//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket; returns nil when the bucket cannot be mapped
+func (f *OpenOltFlowMgr) buildMember(ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
+ var outPort uint32
+ outPortFound := false
+ for _, ofAction := range ofBucket.Actions { // when several output actions exist the last one wins
+ if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
+ outPort = ofAction.GetOutput().Port
+ outPortFound = true
+ }
+ }
+
+ if !outPortFound {
+ log.Debugw("bucket skipped since no out port found in it",
+ log.Fields{"ofBucket": ofBucket})
+ return nil
+ }
+ interfaceID := IntfIDFromUniPortNum(outPort)
+ log.Debugw("got associated interface id of the port", log.Fields{"portNumber:": outPort, "interfaceId:": interfaceID})
+ if groupInfo, ok := f.interfaceToMcastQueueMap[interfaceID]; ok {
+ member := openoltpb2.GroupMember{
+ InterfaceId: interfaceID,
+ InterfaceType: openoltpb2.GroupMember_PON,
+ GemPortId: groupInfo.gemPortID,
+ Priority: groupInfo.servicePriority,
+ }
+ //add member to the group
+ return &member
+ }
+ log.Warnw("bucket skipped since interface-2-gem mapping cannot be found",
+ log.Fields{"ofBucket": ofBucket})
+ return nil
+}
+
//sendTPDownloadMsgToChild send payload
func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
@@ -2228,6 +2622,9 @@
if field.Type == flows.ETH_TYPE {
classifierInfo[EthType] = field.GetEthType()
log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType].(uint32)})
+ } else if field.Type == flows.ETH_DST {
+ classifierInfo[EthDst] = field.GetEthDst()
+ log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_DST]": classifierInfo[EthDst].([]uint8)})
} else if field.Type == flows.IP_PROTO {
classifierInfo[IPProto] = field.GetIpProto()
log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
@@ -2297,20 +2694,11 @@
return errors.New("invalid openflow class")
}
/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
- if ofbField := field.GetOfbField(); ofbField != nil {
- if fieldtype := ofbField.GetType(); fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
- if vlan := ofbField.GetVlanVid(); vlan != 0 {
- actionInfo[VlanVid] = vlan & 0xfff
- log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
- } else {
- log.Error("No Invalid vlan id in set vlan-vid action")
- }
- } else {
- log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
- }
- }
+ formulateSetFieldActionInfoFromFlow(field, actionInfo)
}
}
+ } else if action.Type == flows.GROUP {
+ formulateGroupActionInfoFromFlow(action, actionInfo)
} else {
log.Errorw("Un supported action type", log.Fields{"type": action.Type})
return errors.New("un supported action type")
@@ -2319,6 +2707,30 @@
return nil
}
+func formulateSetFieldActionInfoFromFlow(field *ofp.OfpOxmField, actionInfo map[string]interface{}) { // stores the set-field VLAN id in actionInfo
+ ofbField := field.GetOfbField()
+ switch {
+ case ofbField == nil:
+ // no OFB field in this set-field action; nothing to record
+ case ofbField.GetType() != ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+ log.Errorw("unsupported-action-set-field-type", log.Fields{"type": ofbField.GetType()})
+ case ofbField.GetVlanVid() == 0:
+ log.Error("No Invalid vlan id in set vlan-vid action")
+ default:
+ actionInfo[VlanVid] = ofbField.GetVlanVid() & 0xfff
+ log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
+ }
+}
+
+func formulateGroupActionInfoFromFlow(action *ofp.OfpAction, actionInfo map[string]interface{}) { // stores the group id in actionInfo
+ if groupAction := action.GetGroup(); groupAction != nil {
+ actionInfo[GroupID] = groupAction.GroupId
+ log.Debugw("action-group-id", log.Fields{"actionInfo[GroupID]": actionInfo[GroupID].(uint32)})
+ return
+ }
+ log.Warn("No group entry found in the group action")
+}
+
func formulateControllerBoundTrapFlowInfo(actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
if isControllerFlow := IsControllerBoundFlow(actionInfo[Output].(uint32)); isControllerFlow {
log.Debug("Controller bound trap flows, getting inport from tunnelid")
@@ -2456,3 +2868,54 @@
}
return
}
+
+//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
+//and put them into interfaceToMcastQueueMap. Entries lacking a gem port or a priority are skipped.
+func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap() {
+ storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap()
+ if err != nil {
+ log.Errorw("Failed to get pon interface to multicast queue map", log.Fields{"err": err})
+ return
+ }
+ for intf, queueInfo := range storedMulticastQueueMap {
+ if len(queueInfo) < 2 { // index 0 is the gem port, index 1 the priority; a short entry would panic below
+ log.Warnw("Skipping malformed multicast queue entry", log.Fields{"intf": intf, "queueInfo": queueInfo})
+ continue
+ }
+ f.interfaceToMcastQueueMap[intf] = &queueInfoBrief{gemPortID: queueInfo[0], servicePriority: queueInfo[1]}
+ }
+}
+
+//GetFlowGroupFromKVStore looks the flow group up in the KV store. It returns (nil, false, error) when the lookup
+//itself fails, (nil, false, nil) when no such group is stored, and (group, true, nil) when the group
+//was found and rebuilt from the stored group id and out ports.
+func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
+ found, stored, err := f.resourceMgr.GetFlowGroupFromKVStore(groupID, cached)
+ if err != nil {
+ log.Errorw("Failed to get the flow group from KV store", log.Fields{"groupId": groupID, "err": err})
+ return nil, false, errors.New("failed to retrieve the flow group")
+ }
+ if !found {
+ return nil, false, nil
+ }
+ return newGroup(stored.GroupID, stored.OutPorts), true, nil
+}
+
+func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
+ groupDesc := ofp.OfpGroupDesc{
+ Type: ofp.OfpGroupType_OFPGT_ALL,
+ GroupId: groupID,
+ }
+ groupEntry := ofp.OfpGroupEntry{
+ Desc: &groupDesc,
+ }
+ var acts []*ofp.OfpAction
+ for i := 0; i < len(outPorts); i++ {
+ acts = append(acts, flows.Output(outPorts[i]))
+ }
+ bucket := ofp.OfpBucket{
+ Actions: acts,
+ }
+ groupDesc.Buckets = []*ofp.OfpBucket{&bucket}
+ return &groupEntry
+}
diff --git a/adaptercore/openolt_flowmgr_test.go b/adaptercore/openolt_flowmgr_test.go
index 3b19081..8342a79 100644
--- a/adaptercore/openolt_flowmgr_test.go
+++ b/adaptercore/openolt_flowmgr_test.go
@@ -21,19 +21,19 @@
"fmt"
"testing"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
- "github.com/opencord/voltha-lib-go/v2/pkg/db"
- fu "github.com/opencord/voltha-lib-go/v2/pkg/flows"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
"github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
"github.com/opencord/voltha-openolt-adapter/mocks"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/openolt"
- openoltpb2 "github.com/opencord/voltha-protos/v2/go/openolt"
- tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/openolt"
+ openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
)
var flowMgr *OpenOltFlowMgr
@@ -88,6 +88,13 @@
tps[key] = mocks.MockTechProfile{TpID: key}
}
flwMgr.techprofile = tps
+
+ interface2mcastQeueuMap := make(map[uint32]*queueInfoBrief)
+ interface2mcastQeueuMap[0] = &queueInfoBrief{
+ gemPortID: 4000,
+ servicePriority: 3,
+ }
+ flwMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
return flwMgr
}
@@ -244,6 +251,23 @@
}
dhcpofpstats := fu.MkFlowStat(dhcpFa)
//dhcpofpstats.Cookie = dhcpofpstats.Id
+
+ //multicast flow
+ multicastFa := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(65536),
+ fu.VlanVid(660), //vlan
+ fu.Metadata_ofp(uint64(66)), //inner vlan
+ fu.EthType(0x800), //ipv4
+ fu.Ipv4Dst(3809869825), //227.22.0.1
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Group(1),
+ },
+ }
+ multicastOfpStats := fu.MkFlowStat(multicastFa)
+ multicastOfpStats.Id = 1
+
type args struct {
flow *ofp.OfpFlowStats
}
@@ -255,6 +279,7 @@
{"RemoveFlow", args{flow: ofpstats}},
{"RemoveFlow", args{flow: lldpofpstats}},
{"RemoveFlow", args{flow: dhcpofpstats}},
+ {"RemoveFlow", args{flow: multicastOfpStats}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -443,6 +468,20 @@
},
KV: kw6,
}
+ //multicast flow
+ fa11 := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(65536),
+ fu.VlanVid(660), //vlan
+ fu.Metadata_ofp(uint64(66)), //inner vlan
+ fu.EthType(0x800), //ipv4
+ fu.Ipv4Dst(3809869825), //227.22.0.1
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Group(1),
+ },
+ KV: kw6,
+ }
ofpstats := fu.MkFlowStat(fa)
ofpstats2 := fu.MkFlowStat(fa2)
ofpstats3 := fu.MkFlowStat(fa3)
@@ -454,6 +493,7 @@
ofpstats9 := fu.MkFlowStat(fa9)
ofpstats10 := fu.MkFlowStat(fa10)
igmpstats := fu.MkFlowStat(igmpFa)
+ ofpstats11 := fu.MkFlowStat(fa11)
fmt.Println(ofpstats6, ofpstats9, ofpstats10)
@@ -482,6 +522,7 @@
{"AddFlow", args{flow: igmpstats, flowMetadata: flowMetadata}},
//{"AddFlow", args{flow: ofpstats10, flowMetadata: flowMetadata}},
//ofpstats10
+ {"AddFlow", args{flow: ofpstats11, flowMetadata: flowMetadata}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -855,3 +896,30 @@
})
}
}
+
+//TestOpenOltFlowMgr_TestMulticastFlow runs the multicast sequence against the package level flowMgr:
+//add a group, add a multicast flow and finally modify the group so that it has one more out port.
+//It makes no assertions on the results; it only exercises the calls.
+func TestOpenOltFlowMgr_TestMulticastFlow(t *testing.T) {
+ //create group 2 with a single out port
+ flowMgr.AddGroup(newGroup(2, []uint32{1}))
+
+ //create multicast flow
+ mcastArgs := &fu.FlowArgs{
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(65536),
+ fu.VlanVid(660), //vlan
+ fu.Metadata_ofp(uint64(66)), //inner vlan
+ fu.EthType(0x800), //ipv4
+ fu.Ipv4Dst(3809869825), //227.22.0.1
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Group(1),
+ },
+ }
+ flowMgr.AddFlow(fu.MkFlowStat(mcastArgs), &voltha.FlowMetadata{})
+
+ //modify group 2 so that its bucket also outputs to port 2
+ flowMgr.ModifyGroup(newGroup(2, []uint32{1, 2}))
+}
diff --git a/adaptercore/openolt_test.go b/adaptercore/openolt_test.go
index 47e9ce3..aefd862 100644
--- a/adaptercore/openolt_test.go
+++ b/adaptercore/openolt_test.go
@@ -29,14 +29,14 @@
"sync"
"testing"
- com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
- fu "github.com/opencord/voltha-lib-go/v2/pkg/flows"
- "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+ com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-openolt-adapter/config"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- "github.com/opencord/voltha-protos/v2/go/openflow_13"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
// mocks the OpenOLT struct.
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index 21f21c9..0e3da4d 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -24,12 +24,12 @@
"strconv"
"strings"
- "github.com/opencord/voltha-lib-go/v2/pkg/db"
- "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/openolt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/openolt"
)
const (
@@ -48,6 +48,20 @@
OnuPacketINPath = "onu_packetin/{%d,%d,%d}"
//FlowIDsForGem flowids_per_gem/<intfid>
FlowIDsForGem = "flowids_per_gem/{%d}"
+ //McastQueuesForIntf multicast queues for pon interfaces
+ McastQueuesForIntf = "mcast_qs_for_int"
+ //FlowGroup flow_groups/<flow_group_id>
+ // A group is stored under this path on the KV store after it has been installed to the device.
+ // It should also be deleted after it has been removed from the device accordingly.
+ FlowGroup = "flow_groups/{%d}"
+ //FlowGroupCached flow_groups_cached/<flow_group_id>
+ // When a group add request is received, we create the group without setting any members to it since we cannot
+ // set any members to a group until it is associated with a multicast flow. It is a BAL limitation.
+ // When the related multicast flow has been created, we perform the set members operation for the group.
+ // That is why we need to keep the members of a group until the multicast flow creation request comes.
+ // We preserve the groups under the "flow_groups_cached" directory in the KV store temporarily. Once the
+ // members have been set, we remove the group from the cached group store.
+ FlowGroupCached = "flow_groups_cached/{%d}"
)
// FlowInfo holds the flow information
@@ -74,6 +88,12 @@
LogicalPort uint32
}
+// GroupInfo holds the group information that is stored in the KV store for a flow group
+type GroupInfo struct {
+ // GroupID is the ID of the flow group
+ GroupID uint32
+ // OutPorts holds the ports of the output actions collected from the buckets of the group
+ OutPorts []uint32
+}
+
// OpenOltResourceMgr holds resource related information as provided below for each field
type OpenOltResourceMgr struct {
DeviceID string // OLT device id
@@ -1269,9 +1289,9 @@
log.Error("failed to get data from kv store")
return nil, err
}
- if value != nil {
+ if value != nil && value.Value != nil {
if val, err = kvstore.ToByte(value.Value); err != nil {
- log.Error("Failed to convert to byte array", log.Fields{"error": err})
+ log.Error("Failed to convert to byte array ", log.Fields{"error": err})
return nil, err
}
if err = json.Unmarshal(val, &flowsForGem); err != nil {
@@ -1298,3 +1318,136 @@
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(IntfOnuIDUniID)
}
+
+//GetMcastQueuePerInterfaceMap gets multicast queue info per pon interface from the KV store.
+//The returned map is keyed by interface ID. It is nil when nothing is stored under the
+//McastQueuesForIntf path. An error is returned when the KV store read, the byte conversion
+//or the JSON decoding fails.
+func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap() (map[uint32][]uint32, error) {
+ //the path contains no format verbs, so it is used as it is
+ path := McastQueuesForIntf
+ var mcastQueueToIntfMap map[uint32][]uint32
+
+ kvPair, err := RsrcMgr.KVStore.Get(path)
+ if err != nil {
+ log.Errorw("failed to get data from kv store", log.Fields{"error": err, "path": path})
+ return nil, err
+ }
+ if kvPair == nil || kvPair.Value == nil {
+ //nothing has been stored yet
+ return nil, nil
+ }
+ val, err := kvstore.ToByte(kvPair.Value)
+ if err != nil {
+ //Errorw is required here; Error would not treat log.Fields as structured fields
+ log.Errorw("Failed to convert to byte array", log.Fields{"error": err})
+ return nil, err
+ }
+ if err = json.Unmarshal(val, &mcastQueueToIntfMap); err != nil {
+ log.Errorw("Failed to unmarshall", log.Fields{"error": err})
+ return nil, err
+ }
+ return mcastQueueToIntfMap, nil
+}
+
+//AddMcastQueueForIntf adds multicast queue for pon interface
+func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(intf uint32, gem uint32, servicePriority uint32) error {
+ var val []byte
+ path := fmt.Sprintf(McastQueuesForIntf)
+
+ mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap()
+ if err != nil {
+ log.Errorw("Failed to get multicast queue info for interface", log.Fields{"error": err, "intf": intf})
+ return err
+ }
+ if mcastQueues == nil {
+ mcastQueues = make(map[uint32][]uint32)
+ }
+ mcastQueues[intf] = []uint32{gem, servicePriority}
+ if val, err = json.Marshal(mcastQueues); err != nil {
+ log.Errorw("Failed to marshal data", log.Fields{"error": err})
+ return err
+ }
+ if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+ log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
+ return err
+ }
+ log.Debugw("added multicast queue info to KV store successfully", log.Fields{"path": path, "mcastQueueInfo": mcastQueues[intf], "interfaceId": intf})
+ return nil
+}
+
+//AddFlowGroupToKVStore adds the ID and the out ports of the given flow group into the KV store.
+//The group is written under the FlowGroupCached path when cached is true and under the FlowGroup path otherwise.
+//Returns an error if the group entry or its description is nil, or if marshalling or the KV store write fails.
+func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(groupEntry *ofp.OfpGroupEntry, cached bool) error {
+ //guard against a missing description instead of panicking on the dereferences below
+ if groupEntry == nil || groupEntry.Desc == nil {
+ log.Errorw("flow group entry or its description is nil", log.Fields{"cached": cached})
+ return fmt.Errorf("cannot store flow group (cached=%t): nil group entry or description", cached)
+ }
+ var path string
+ if cached {
+ path = fmt.Sprintf(FlowGroupCached, groupEntry.Desc.GroupId)
+ } else {
+ path = fmt.Sprintf(FlowGroup, groupEntry.Desc.GroupId)
+ }
+ //build group info object; the generated getters are used so that nil buckets or actions are tolerated
+ var outPorts []uint32
+ for _, ofBucket := range groupEntry.Desc.Buckets {
+ for _, ofAction := range ofBucket.GetActions() {
+ if ofAction.GetType() != ofp.OfpActionType_OFPAT_OUTPUT {
+ continue
+ }
+ //skip output actions that carry no output payload
+ if output := ofAction.GetOutput(); output != nil {
+ outPorts = append(outPorts, output.Port)
+ }
+ }
+ }
+ groupInfo := GroupInfo{
+ GroupID: groupEntry.Desc.GroupId,
+ OutPorts: outPorts,
+ }
+
+ value, err := json.Marshal(groupInfo)
+ if err != nil {
+ log.Errorw("failed to Marshal flow group object", log.Fields{"error": err, "path": path})
+ return err
+ }
+
+ if err = RsrcMgr.KVStore.Put(path, value); err != nil {
+ log.Errorw("Failed to update resource", log.Fields{"error": err, "path": path})
+ return err
+ }
+ return nil
+}
+
+//RemoveFlowGroupFromKVStore removes flow group from KV store
+func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(groupID uint32, cached bool) bool {
+ var path string
+ if cached {
+ path = fmt.Sprintf(FlowGroupCached, groupID)
+ } else {
+ path = fmt.Sprintf(FlowGroup, groupID)
+ }
+ if err := RsrcMgr.KVStore.Delete(path); err != nil {
+ log.Errorf("Failed to remove resource %s due to %s", path, err)
+ return false
+ }
+ return true
+}
+
+//GetFlowGroupFromKVStore fetches the flow group with the given ID from the KV store. The FlowGroupCached
+//path is read when cached is true, the FlowGroup path otherwise.
+//Returns (true, groupInfo, nil) if the group is fetched and decoded successfully.
+//Returns (false, {}, nil) if the group does not exist in the KV store.
+//Returns (false, groupInfo, error) if any problem occurs while fetching or decoding the data.
+func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (bool, GroupInfo, error) {
+ var groupInfo GroupInfo
+ path := fmt.Sprintf(FlowGroup, groupID)
+ if cached {
+ path = fmt.Sprintf(FlowGroupCached, groupID)
+ }
+ kvPair, err := RsrcMgr.KVStore.Get(path)
+ if err != nil {
+ return false, groupInfo, err
+ }
+ if kvPair == nil || kvPair.Value == nil {
+ //no value is stored under the path
+ return false, groupInfo, nil
+ }
+ raw, err := kvstore.ToByte(kvPair.Value)
+ if err != nil {
+ log.Errorw("Failed to convert flow group into byte array", log.Fields{"error": err})
+ return false, groupInfo, err
+ }
+ if err = json.Unmarshal(raw, &groupInfo); err != nil {
+ log.Errorw("Failed to unmarshal", log.Fields{"error": err})
+ return false, groupInfo, err
+ }
+ return true, groupInfo, nil
+}
diff --git a/adaptercore/resourcemanager/resourcemanager_test.go b/adaptercore/resourcemanager/resourcemanager_test.go
index 1eac99f..2cc6953 100644
--- a/adaptercore/resourcemanager/resourcemanager_test.go
+++ b/adaptercore/resourcemanager/resourcemanager_test.go
@@ -26,12 +26,13 @@
import (
"encoding/json"
"errors"
- "github.com/opencord/voltha-lib-go/v2/pkg/db"
- "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
- ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/openolt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/openolt"
"reflect"
"strconv"
"strings"
@@ -161,6 +162,19 @@
str, _ := json.Marshal(1)
return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
}
+ if strings.Contains(key, McastQueuesForIntf) {
+ log.Debug("Error Error Error Key:", McastQueuesForIntf)
+ mcastQueues := make(map[uint32][]uint32)
+ mcastQueues[10] = []uint32{4000, 0}
+ str, _ := json.Marshal(mcastQueues)
+ return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+ }
+ if strings.Contains(key, "flow_groups") && !strings.Contains(key, "1000") {
+ groupInfo := GroupInfo{GroupID: 2, OutPorts: []uint32{2}}
+ str, _ := json.Marshal(groupInfo)
+ return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+ }
+
maps := make(map[string]*kvstore.KVPair)
maps[key] = &kvstore.KVPair{Key: key}
return maps[key], nil
@@ -971,3 +985,141 @@
})
}
}
+
+//TestOpenOltResourceMgr_AddMcastQueueForIntf checks that AddMcastQueueForIntf returns no error
+//for a few interface, gem port and service priority combinations.
+func TestOpenOltResourceMgr_AddMcastQueueForIntf(t *testing.T) {
+ type args struct {
+ intf uint32
+ gem uint32
+ servicePriority uint32
+ }
+ testCases := []struct {
+ name string
+ args args
+ fields *fields
+ }{
+ {"AddMcastQueueForIntf-1", args{0, 4000, 0}, getResMgr()},
+ {"AddMcastQueueForIntf-2", args{1, 4000, 1}, getResMgr()},
+ {"AddMcastQueueForIntf-3", args{2, 4000, 2}, getResMgr()},
+ }
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ resMgr := testResMgrObject(tc.fields)
+ if err := resMgr.AddMcastQueueForIntf(tc.args.intf, tc.args.gem, tc.args.servicePriority); err != nil {
+ t.Errorf("%s got err= %s wants nil", tc.name, err)
+ }
+ })
+ }
+}
+
+//newGroup builds a group entry of type OFPGT_ALL with the given group ID. The entry holds a single
+//bucket that contains one output action per element of outPorts.
+func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
+ var outputActions []*ofp.OfpAction
+ for _, port := range outPorts {
+ outputActions = append(outputActions, fu.Output(port))
+ }
+ return &ofp.OfpGroupEntry{
+ Desc: &ofp.OfpGroupDesc{
+ Type: ofp.OfpGroupType_OFPGT_ALL,
+ GroupId: groupID,
+ Buckets: []*ofp.OfpBucket{{Actions: outputActions}},
+ },
+ }
+}
+
+//TestOpenOltResourceMgr_AddFlowGroupToKVStore checks that AddFlowGroupToKVStore returns no error
+//for a cached and for a non-cached group.
+func TestOpenOltResourceMgr_AddFlowGroupToKVStore(t *testing.T) {
+ type args struct {
+ group *ofp.OfpGroupEntry
+ cached bool
+ }
+ //define test set: group 1 is stored as cached, group 2 as installed
+ testCases := []struct {
+ name string
+ args args
+ fields *fields
+ }{
+ {"AddFlowGroupToKVStore-1", args{newGroup(1, []uint32{1}), true}, getResMgr()},
+ {"AddFlowGroupToKVStore-2", args{newGroup(2, []uint32{2}), false}, getResMgr()},
+ }
+ //execute tests
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ resMgr := testResMgrObject(tc.fields)
+ if err := resMgr.AddFlowGroupToKVStore(tc.args.group, tc.args.cached); err != nil {
+ t.Errorf("%s got err= %s wants nil", tc.name, err)
+ }
+ })
+ }
+}
+
+//TestOpenOltResourceMgr_RemoveFlowGroupFromKVStore checks that RemoveFlowGroupFromKVStore reports success
+//for a cached and for a non-cached group.
+func TestOpenOltResourceMgr_RemoveFlowGroupFromKVStore(t *testing.T) {
+ type args struct {
+ groupID uint32
+ cached bool
+ }
+ //define test set
+ testCases := []struct {
+ name string
+ args args
+ fields *fields
+ }{
+ {"RemoveFlowGroupFromKVStore-1", args{1, true}, getResMgr()},
+ {"RemoveFlowGroupFromKVStore-2", args{2, false}, getResMgr()},
+ }
+ //execute tests
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ resMgr := testResMgrObject(tc.fields)
+ if removed := resMgr.RemoveFlowGroupFromKVStore(tc.args.groupID, tc.args.cached); !removed {
+ t.Errorf("%s got false but wants true", tc.name)
+ }
+ })
+ }
+}
+
+//TestOpenOltResourceMgr_GetFlowGroupFromKVStore checks GetFlowGroupFromKVStore for a cached group,
+//a non-cached group and a group ID that is expected to be reported as missing.
+func TestOpenOltResourceMgr_GetFlowGroupFromKVStore(t *testing.T) {
+ //missingGroupID is the group ID used by the third test case; the lookup for it is expected to
+ //report that the group does not exist. The check below used to compare against 3, which is not
+ //part of the test set, so it never ran.
+ const missingGroupID = 1000
+ type args struct {
+ groupID uint32
+ cached bool
+ }
+ //define test set
+ tests := []struct {
+ name string
+ args args
+ fields *fields
+ }{
+ {"GetFlowGroupFromKVStore-1", args{1, true}, getResMgr()},
+ {"GetFlowGroupFromKVStore-2", args{2, false}, getResMgr()},
+ {"GetFlowGroupFromKVStore-3", args{missingGroupID, false}, getResMgr()},
+ }
+ //execute tests
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ RsrcMgr := testResMgrObject(tt.fields)
+ exists, groupInfo, err := RsrcMgr.GetFlowGroupFromKVStore(tt.args.groupID, tt.args.cached)
+ switch {
+ case err != nil:
+ t.Errorf("%s got error but wants nil error", tt.name)
+ case exists && groupInfo.GroupID == 0:
+ t.Errorf("%s got true and nil group info but expected not nil group info", tt.name)
+ case tt.args.groupID == missingGroupID && exists:
+ t.Errorf("%s got true but wants false", tt.name)
+ }
+ })
+ }
+}
diff --git a/adaptercore/statsmanager.go b/adaptercore/statsmanager.go
index c42e86b..0905c1e 100755
--- a/adaptercore/statsmanager.go
+++ b/adaptercore/statsmanager.go
@@ -23,9 +23,9 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-protos/v2/go/openolt"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/openolt"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
var mutex = &sync.Mutex{}
@@ -355,7 +355,7 @@
var metricInfo voltha.MetricInformation
var ke voltha.KpiEvent2
- var volthaEventSubCatgry voltha.EventSubCategory_EventSubCategory
+ var volthaEventSubCatgry voltha.EventSubCategory_Types
if portType == "NNIStats" {
volthaEventSubCatgry = voltha.EventSubCategory_NNI
diff --git a/adaptercore/statsmanager_test.go b/adaptercore/statsmanager_test.go
index 6f16493..c6fddc5 100644
--- a/adaptercore/statsmanager_test.go
+++ b/adaptercore/statsmanager_test.go
@@ -21,9 +21,9 @@
"reflect"
"testing"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-protos/v2/go/openolt"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/openolt"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
func init() {