[VOL-5255] Use all uplink interfaces not only interface 0 at OLT
Raising new MR after addressing comments from the original MR.
Link to original MR:
https://gerrit.opencord.org/c/voltha-openolt-adapter/+/34993
Change-Id: Ic2b1589db87755ca62f656e18bc5e314c3e85cd8
Signed-off-by: Sridhar Ravindra <sridhar.ravindra@radisys.com>
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 7389cb7..5b4a474 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -164,6 +164,7 @@
flowMetadata *ofp.FlowMetadata
direction tp_pb.Direction
intfID uint32
+ nniIntfID uint32
onuID uint32
uniID uint32
tpID uint32
@@ -283,7 +284,7 @@
return nil
}
-func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, nni_port uint32, onuID uint32, uniID uint32, portNo uint32,
classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
UsMeterID uint32, DsMeterID uint32, flowMetadata *ofp.FlowMetadata) error {
var allocID uint32
@@ -332,7 +333,7 @@
"usmeter-id": UsMeterID,
"dsmeter-id": DsMeterID,
"tp-id": TpID})
- allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+ allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, nni_port, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
if allocID == 0 || gemPorts == nil || TpInst == nil {
logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
return olterrors.NewErrNotFound(
@@ -358,6 +359,7 @@
logger.Debugw(ctx, "CreateSchedulerQueues",
log.Fields{"dir": sq.direction,
"intf-id": sq.intfID,
+ "nniIntfID": sq.nniIntfID,
"onu-id": sq.onuID,
"uni-id": sq.uniID,
"tp-id": sq.tpID,
@@ -512,7 +514,7 @@
log.Fields{"direction": sq.direction,
"traffic-queues": trafficQueues,
"device-id": f.deviceHandler.device.Id})
- queues := &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
+ queues := &tp_pb.TrafficQueues{IntfId: sq.intfID, NetworkIntfId: sq.nniIntfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: trafficQueues,
TechProfileId: TrafficSched[0].TechProfileId}
@@ -543,6 +545,7 @@
"TrafficScheds": TrafficSched,
"device-id": f.deviceHandler.device.Id,
"intfID": sq.intfID,
+ "nniIntfID": sq.nniIntfID,
"onuID": sq.onuID,
"uniID": sq.uniID})
if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
@@ -564,7 +567,8 @@
"traffic-queues": trafficQueues,
"device-id": f.deviceHandler.device.Id})
queues := &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
- UniId: sq.uniID, PortNo: sq.uniPort,
+ NetworkIntfId: sq.nniIntfID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: trafficQueues,
TechProfileId: TrafficSched[0].TechProfileId}
if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx, queues); err != nil {
@@ -613,6 +617,7 @@
log.Fields{
"direction": sq.direction,
"intf-id": sq.intfID,
+ "nniIntfID": sq.nniIntfID,
"onu-id": sq.onuID,
"uni-id": sq.uniID,
"uni-port": sq.uniPort,
@@ -629,7 +634,8 @@
if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
- UniId: sq.uniID, PortNo: sq.uniPort,
+ NetworkIntfId: sq.nniIntfID,
+ UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: TrafficQueues,
TechProfileId: sq.tpID}); err != nil {
return olterrors.NewErrAdapter("unable-to-remove-traffic-queues-from-device",
@@ -799,7 +805,7 @@
}
// This function allocates tconts and GEM ports for an ONU
-func (f *OpenOltFlowMgr) createTcontGemports(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *ofp.FlowMetadata) (uint32, []uint32, interface{}) {
+func (f *OpenOltFlowMgr) createTcontGemports(ctx context.Context, intfID uint32, nniIntfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *ofp.FlowMetadata) (uint32, []uint32, interface{}) {
var allocIDs []uint32
var allgemPortIDs []uint32
var gemPortIDs []uint32
@@ -847,7 +853,7 @@
switch tpInst := techProfileInstance.(type) {
case *tp_pb.TechProfileInstance:
if UsMeterID != 0 {
- sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
+ sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, nniIntfID: nniIntfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
logger.Errorw(ctx, "CreateSchedulerQueues-failed-upstream",
@@ -863,7 +869,7 @@
}
}
if DsMeterID != 0 {
- sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
+ sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, nniIntfID: nniIntfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: DsMeterID, flowMetadata: flowMetadata}
if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
logger.Errorw(ctx, "CreateSchedulerQueues-failed-downstream",
@@ -1907,7 +1913,7 @@
// clearResources clears pon resources in kv store and the device
// nolint: gocyclo
func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
- flowID uint64, portNum uint32, tpID uint32, sendDeleteGemRequest bool) error {
+ flowID uint64, portNum uint32, tpID uint32, sendDeleteGemRequest bool, nniIntfID uint32) error {
logger.Debugw(ctx, "clearing-resources", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID, "tpID": tpID})
uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
@@ -1996,7 +2002,7 @@
}
}
// Remove queues at OLT in upstream and downstream direction
- schedQ := schedQueue{tpInst: techprofileInst, direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum}
+ schedQ := schedQueue{tpInst: techprofileInst, direction: tp_pb.Direction_UPSTREAM, intfID: intfID, nniIntfID: nniIntfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum}
if err := f.RemoveQueues(ctx, schedQ); err != nil {
logger.Warn(ctx, err)
}
@@ -2095,7 +2101,7 @@
}
// nolint: gocyclo
-func (f *OpenOltFlowMgr) clearFlowFromDeviceAndResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) error {
+func (f *OpenOltFlowMgr) clearFlowFromDeviceAndResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string, nni_port uint32) error {
logger.Infow(ctx, "clear-flow-from-resource-manager",
log.Fields{
"flowDirection": flowDirection,
@@ -2159,7 +2165,7 @@
return err
}
- if err = f.clearResources(ctx, Intf, onuID, uniID, flow.Id, portNum, tpID, true); err != nil {
+ if err = f.clearResources(ctx, Intf, onuID, uniID, flow.Id, portNum, tpID, true, nni_port); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
"device-id": f.deviceHandler.device.Id,
@@ -2197,15 +2203,25 @@
if flows.HasGroup(flow) {
direction = Multicast
- return f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
} else if plt.IsUpstream(actionInfo[Output].(uint32)) {
direction = Upstream
} else {
direction = Downstream
}
+ var nni_port uint32
+ if direction == Upstream {
+ if !plt.IsControllerBoundFlow(actionInfo[Output].(uint32)) {
+ nni_port = actionInfo[Output].(uint32) & 0x1f // convert e.g. 16777220 to port 4
+ }
+ } else {
+ classifierInfo := make(map[string]interface{})
+ formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+ nni_port = classifierInfo[InPort].(uint32) & 0x1f // convert e.g. 16777220 to port 4
+ }
+
// Serialize flow removes on a per subscriber basis
- err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction)
+ err := f.clearFlowFromDeviceAndResourceManager(ctx, flow, direction, nni_port)
return err
}
@@ -2422,7 +2438,17 @@
return err
}
}
- return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, tpID, UsMeterID, DsMeterID, flowMetadata)
+
+ var nni_port uint32
+ if plt.IsUpstream(actionInfo[Output].(uint32)) {
+ if !plt.IsControllerBoundFlow(actionInfo[Output].(uint32)) {
+ nni_port = actionInfo[Output].(uint32) & 0x1f // convert e.g. 16777220 to port 4
+ }
+ } else {
+ nni_port = classifierInfo[InPort].(uint32) & 0x1f // convert e.g. 16777220 to port 4
+ }
+
+ return f.processAddFlow(ctx, intfID, nni_port, onuID, uniID, portNo, classifierInfo, actionInfo, flow, tpID, UsMeterID, DsMeterID, flowMetadata)
}
// handleFlowWithGroup adds multicast flow to the device.
@@ -2859,6 +2885,15 @@
flowContext := &flowContext{classifierInfo, actionInfo, flow, pbitToGem, gemToAes, intfID, onuID, uniID, portNo, allocID, gemPortID, tpID}
+ var nni_port uint32
+ if plt.IsUpstream(actionInfo[Output].(uint32)) {
+ if !plt.IsControllerBoundFlow(actionInfo[Output].(uint32)) {
+ nni_port = actionInfo[Output].(uint32) & 0x1f // convert e.g. 16777220 to port 4 (starting with 0)
+ }
+ } else {
+ nni_port = classifierInfo[InPort].(uint32) & 0x1f // convert e.g. 16777220 to port 4 (starting with 0)
+ }
+
if ipProto, ok := classifierInfo[IPProto]; ok {
if ipProto.(uint32) == IPProtoDhcp {
logger.Infow(ctx, "adding-dhcp-flow", log.Fields{
@@ -2872,7 +2907,7 @@
if err := f.addDHCPTrapFlow(ctx, flowContext); err != nil {
logger.Warn(ctx, err)
logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
- _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+ _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nni_port)
return err
}
} else if ipProto.(uint32) == IgmpProto {
@@ -2885,7 +2920,7 @@
if err := f.addIGMPTrapFlow(ctx, flowContext); err != nil {
logger.Warn(ctx, err)
logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
- _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+ _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nni_port)
return err
}
} else {
@@ -2909,7 +2944,7 @@
if err := f.addEthTypeBasedFlow(ctx, flowContext, vlanID, ethType.(uint32)); err != nil {
logger.Warn(ctx, err)
logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
- _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+ _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nni_port)
return err
}
} else if ethType.(uint32) == PPPoEDEthType {
@@ -2924,7 +2959,7 @@
if err := f.addUpstreamTrapFlow(ctx, flowContext); err != nil {
logger.Warn(ctx, err)
logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
- _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+ _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nni_port)
return err
}
}
@@ -2938,7 +2973,7 @@
if err := f.addUpstreamDataPathFlow(ctx, flowContext); err != nil {
logger.Warn(ctx, err)
logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
- _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+ _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nni_port)
return err
}
} else if direction == tp_pb.Direction_DOWNSTREAM {
@@ -2951,7 +2986,7 @@
if err := f.addDownstreamDataPathFlow(ctx, flowContext); err != nil {
logger.Warn(ctx, err)
logger.Errorw(ctx, "reverting-scheduler-and-queue-for-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID, "flow-id": flow.Id, "tp-id": tpID})
- _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false)
+ _ = f.clearResources(ctx, intfID, int32(onuID), int32(uniID), flow.Id, portNo, tpID, false, nni_port)
return err
}
} else {
@@ -3234,6 +3269,24 @@
}
logger.Infow(ctx, "input-nni-intfId-is", log.Fields{"intf-id": intfID})
return intfID, nil
+ } else if portType == voltha.Port_ETHERNET_UNI {
+ if _, ok := action[Output]; ok {
+ intfID, err := plt.IntfIDFromNniPortNum(ctx, action[Output].(uint32))
+ if err != nil {
+ logger.Debugw(ctx, "invalid-action-port-number",
+ log.Fields{
+ "port-number": action[Output].(uint32),
+ "err": err})
+ return uint32(0), nil
+ }
+ logger.Infow(ctx, "output-nni-intfId-is", log.Fields{"intf-id": intfID})
+ return intfID, nil
+ } else {
+ logger.Debugw(ctx, "action-port-number-empty",
+ log.Fields{
+ "action": action})
+ return uint32(0), nil
+ }
}
return uint32(0), nil
}