[VOL-3837] Checking that OnuId, AllocId and GemPorts are unique per PON
Validate that received flow carry valid GemPortId and AllocIds
Change-Id: I1b8928c7a9e580c9711f61320595a449df7c30f5
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index fd7eca0..ab7c35f 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -22,14 +22,13 @@
"fmt"
pb "github.com/opencord/bbsim/api/bbsim"
"github.com/opencord/bbsim/internal/bbsim/alarmsim"
- bbsim "github.com/opencord/bbsim/internal/bbsim/types"
- me "github.com/opencord/omci-lib-go/generated"
- "strconv"
-
"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
"github.com/opencord/bbsim/internal/bbsim/responders/eapol"
+ bbsim "github.com/opencord/bbsim/internal/bbsim/types"
+ me "github.com/opencord/omci-lib-go/generated"
"net"
+ "strconv"
"time"
"github.com/google/gopacket/layers"
@@ -102,11 +101,9 @@
// PortNo comes with flows and it's used when sending packetIndications,
// There is one PortNo per UNI Port, for now we're only storing the first one
// FIXME add support for multiple UNIs (each UNI has a different PortNo)
- PortNo uint32
- // deprecated (gemPort is on a Service basis)
- GemPortAdded bool
- Flows []FlowKey
- FlowIds []uint64 // keep track of the flows we currently have in the ONU
+ PortNo uint32
+ Flows []FlowKey
+ FlowIds []uint64 // keep track of the flows we currently have in the ONU
OperState *fsm.FSM
SerialNumber *openolt.SerialNumber
@@ -153,7 +150,7 @@
ActiveImageEntityId: 0, // when we start the SoftwareImage with ID 0 is active and committed
CommittedImageEntityId: 0,
}
- o.SerialNumber = o.NewSN(olt.ID, pon.ID, id)
+ o.SerialNumber = NewSN(olt.ID, pon.ID, id)
// NOTE this state machine is used to track the operational
// state as requested by VOLTHA
o.OperState = getOperStateFSM(func(e *fsm.Event) {
@@ -217,6 +214,18 @@
o.Channel <- msg
},
"enter_enabled": func(event *fsm.Event) {
+
+ if used, sn := o.PonPort.isOnuIdAllocated(o.ID); used {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "SerialNumber": o.Sn(),
+ }).Errorf("received-omci-with-sn-%s", common.OnuSnToString(sn))
+ return
+ } else {
+ o.PonPort.storeOnuId(o.ID, o.SerialNumber)
+ }
+
msg := bbsim.Message{
Type: bbsim.OnuIndication,
Data: bbsim.OnuIndicationMessage{
@@ -235,9 +244,11 @@
"enter_disabled": func(event *fsm.Event) {
// clean the ONU state
- o.GemPortAdded = false
o.PortNo = 0
o.Flows = []FlowKey{}
+ o.PonPort.removeOnuId(o.ID)
+ o.PonPort.removeAllocId(o.SerialNumber)
+ o.PonPort.removeGemPortBySn(o.SerialNumber)
// set the OperState to disabled
if err := o.OperState.Event("disable"); err != nil {
@@ -438,7 +449,7 @@
}).Debug("Stopped handling ONU Indication Channel")
}
-func (o Onu) NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
+func NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
sn := new(openolt.SerialNumber)
@@ -477,10 +488,8 @@
}
func (o *Onu) sendOnuIndication(msg bbsim.OnuIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
- // NOTE voltha returns an ID, but if we use that ID then it complains:
- // expected_onu_id: 1, received_onu_id: 1024, event: ONU-id-mismatch, can happen if both voltha and the olt rebooted
- // so we're using the internal ID that is 1
- // o.ID = msg.OnuID
+ // NOTE the ONU ID is set by VOLTHA in the ActivateOnu call (via openolt.proto)
+ // and stored in the Onu struct via onu.SetID
indData := &openolt.Indication_OnuInd{OnuInd: &openolt.OnuIndication{
IntfId: o.PonPortID,
@@ -495,11 +504,12 @@
return
}
onuLogger.WithFields(log.Fields{
- "IntfId": o.PonPortID,
- "OnuId": o.ID,
- "OperState": msg.OperState.String(),
- "AdminState": msg.OperState.String(),
- "OnuSn": o.Sn(),
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "VolthaOnuId": msg.OnuID,
+ "OperState": msg.OperState.String(),
+ "AdminState": msg.OperState.String(),
+ "OnuSn": o.Sn(),
}).Debug("Sent Indication_OnuInd")
}
@@ -707,10 +717,7 @@
case omci.GetRequestType:
responsePkt, _ = omcilib.CreateGetResponse(omciPkt, omciMsg, o.SerialNumber, o.MibDataSync, o.ActiveImageEntityId, o.CommittedImageEntityId)
case omci.SetRequestType:
- if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg); errResp == nil {
- o.MibDataSync++
- }
-
+ success := true
msgObj, _ := omcilib.ParseSetRequest(omciPkt)
switch msgObj.EntityClass {
case me.PhysicalPathTerminationPointEthernetUniClassID:
@@ -734,12 +741,101 @@
}
o.Channel <- msg
}
+ case me.TContClassID:
+ allocId := msgObj.Attributes["AllocId"].(uint16)
+
+ // if the AllocId is 255 (0xFF) or 65535 (0xFFFF) it means we are removing it,
+ // otherwise we are adding it
+ if allocId == 255 || allocId == 65535 {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "TContId": msgObj.EntityInstance,
+ "AllocId": allocId,
+ "SerialNumber": o.Sn(),
+ }).Trace("freeing-alloc-id-via-omci")
+ o.PonPort.removeAllocId(o.SerialNumber)
+ } else {
+ if used, sn := o.PonPort.isAllocIdAllocated(allocId); used {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "AllocId": allocId,
+ "SerialNumber": o.Sn(),
+ }).Errorf("allocid-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+ success = false
+ } else {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "TContId": msgObj.EntityInstance,
+ "AllocId": allocId,
+ "SerialNumber": o.Sn(),
+ }).Trace("storing-alloc-id-via-omci")
+ o.PonPort.storeAllocId(allocId, o.SerialNumber)
+ }
+ }
+
+ }
+
+ if success {
+ if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+ o.MibDataSync++
+ }
+ } else {
+ responsePkt, _ = omcilib.CreateSetResponse(omciPkt, omciMsg, me.AttributeFailure)
}
case omci.CreateRequestType:
- if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg); errResp == nil {
- o.MibDataSync++
+ // check for GemPortNetworkCtp and make sure there are no duplicates on the same PON
+ var used bool
+ var sn *openolt.SerialNumber
+ msgObj, err := omcilib.ParseCreateRequest(omciPkt)
+ if err == nil {
+ if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+ if used, sn = o.PonPort.isGemPortAllocated(msgObj.EntityInstance); used {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "GemPortId": msgObj.EntityInstance,
+ "SerialNumber": o.Sn(),
+ }).Errorf("gemport-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+ } else {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "GemPortId": msgObj.EntityInstance,
+ "SerialNumber": o.Sn(),
+ }).Trace("storing-gem-port-id-via-omci")
+ o.PonPort.storeGemPort(msgObj.EntityInstance, o.SerialNumber)
+ }
+ }
+ }
+
+ // if the gemPort is valid then increment the MDS and return a successful response
+ // otherwise fail the request
+ // for now the CreateRequeste for the gemPort is the only one that can fail, if we start supporting multiple
+ // validation this check will need to be rewritten
+ if !used {
+ if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+ o.MibDataSync++
+ }
+ } else {
+ responsePkt, _ = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.ProcessingError)
}
case omci.DeleteRequestType:
+ msgObj, err := omcilib.ParseDeleteRequest(omciPkt)
+ if err == nil {
+ if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "GemPortId": msgObj.EntityInstance,
+ "SerialNumber": o.Sn(),
+ }).Trace("freeing-gem-port-id-via-omci")
+ o.PonPort.removeGemPort(msgObj.EntityInstance)
+ }
+ }
+
if responsePkt, errResp = omcilib.CreateDeleteResponse(omciPkt, omciMsg); errResp == nil {
o.MibDataSync++
}
@@ -1037,6 +1133,7 @@
func (o *Onu) handleFlowAdd(msg bbsim.OnuFlowUpdateMessage) {
onuLogger.WithFields(log.Fields{
+ "AllocId": msg.Flow.AllocId,
"Cookie": msg.Flow.Cookie,
"DstPort": msg.Flow.Classifier.DstPort,
"FlowId": msg.Flow.FlowId,
@@ -1123,7 +1220,6 @@
"OnuId": o.ID,
"SerialNumber": o.Sn(),
}).Info("Resetting GemPort")
- o.GemPortAdded = false
// check if ONU delete is performed and
// terminate the ONU's ProcessOnuMessages Go routine
@@ -1245,25 +1341,7 @@
if o.seqNumber > 290 {
// NOTE we are done with the MIB Upload (290 is the number of messages the omci-sim library will respond to)
- galEnet, _ := omcilib.CreateGalEnetRequest(o.getNextTid(false))
- sendOmciMsg(galEnet, o.PonPortID, o.ID, o.SerialNumber, "CreateGalEnetRequest", client)
- } else {
- mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
- sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
- }
- case omci.CreateResponseType:
- // NOTE Creating a GemPort,
- // BBsim actually doesn't care about the values, so we can do we want with the parameters
- // In the same way we can create a GemPort even without setting up UNIs/TConts/...
- // but we need the GemPort to trigger the state change
-
- if !o.GemPortAdded {
- // NOTE this sends a CreateRequestType and BBSim replies with a CreateResponseType
- // thus we send this request only once
- gemReq, _ := omcilib.CreateGemPortRequest(o.getNextTid(false))
- sendOmciMsg(gemReq, o.PonPortID, o.ID, o.SerialNumber, "CreateGemPortRequest", client)
- o.GemPortAdded = true
- } else {
+ // start sending the flows, we don't care about the OMCI setup in BBR, just that a lot of messages can go through
if err := o.InternalState.Event(BbrOnuTxSendEapolFlow); err != nil {
onuLogger.WithFields(log.Fields{
"OnuId": o.ID,
@@ -1271,6 +1349,9 @@
"OnuSn": o.Sn(),
}).Errorf("Error while transitioning ONU State %v", err)
}
+ } else {
+ mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
+ sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
}
}
}
@@ -1290,14 +1371,16 @@
UniId: int32(0), // NOTE do not hardcode this, we need to support multiple UNIs
FlowId: uint64(o.ID),
FlowType: "downstream",
- AllocId: int32(0),
NetworkIntfId: int32(0),
- GemportId: int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
Classifier: &classifierProto,
Action: &actionProto,
Priority: int32(100),
Cookie: uint64(o.ID),
- PortNo: uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+ PortNo: o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+ // AllocId and GemPorts need to be unique per PON
+ // for now use the ONU-ID, will need to change once we support multiple UNIs
+ AllocId: int32(o.ID),
+ GemportId: int32(o.ID),
}
if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1307,6 +1390,7 @@
"FlowId": downstreamFlow.FlowId,
"PortNo": downstreamFlow.PortNo,
"SerialNumber": common.OnuSnToString(o.SerialNumber),
+ "Err": err,
}).Fatalf("Failed to add EAPOL Flow")
}
log.WithFields(log.Fields{
@@ -1338,14 +1422,16 @@
UniId: int32(0), // FIXME do not hardcode this
FlowId: uint64(o.ID),
FlowType: "downstream",
- AllocId: int32(0),
NetworkIntfId: int32(0),
- GemportId: int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
Classifier: &classifierProto,
Action: &actionProto,
Priority: int32(100),
Cookie: uint64(o.ID),
- PortNo: uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+ PortNo: o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+ // AllocId and GemPorts need to be unique per PON
+ // for now use the ONU-ID, will need to change once we support multiple UNIs
+ AllocId: int32(o.ID),
+ GemportId: int32(o.ID),
}
if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1355,6 +1441,7 @@
"FlowId": downstreamFlow.FlowId,
"PortNo": downstreamFlow.PortNo,
"SerialNumber": common.OnuSnToString(o.SerialNumber),
+ "Err": err,
}).Fatalf("Failed to send DHCP Flow")
}
log.WithFields(log.Fields{