[VOL-3837] Checking that OnuId, AllocId and GemPorts are unique per PON
Validate that received flows carry valid GemPortIds and AllocIds

Change-Id: I1b8928c7a9e580c9711f61320595a449df7c30f5
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index fd7eca0..ab7c35f 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -22,14 +22,13 @@
 	"fmt"
 	pb "github.com/opencord/bbsim/api/bbsim"
 	"github.com/opencord/bbsim/internal/bbsim/alarmsim"
-	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
-	me "github.com/opencord/omci-lib-go/generated"
-	"strconv"
-
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
 	"github.com/opencord/bbsim/internal/bbsim/responders/eapol"
+	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
+	me "github.com/opencord/omci-lib-go/generated"
 	"net"
+	"strconv"
 	"time"
 
 	"github.com/google/gopacket/layers"
@@ -102,11 +101,9 @@
 	// PortNo comes with flows and it's used when sending packetIndications,
 	// There is one PortNo per UNI Port, for now we're only storing the first one
 	// FIXME add support for multiple UNIs (each UNI has a different PortNo)
-	PortNo uint32
-	// deprecated (gemPort is on a Service basis)
-	GemPortAdded bool
-	Flows        []FlowKey
-	FlowIds      []uint64 // keep track of the flows we currently have in the ONU
+	PortNo  uint32
+	Flows   []FlowKey
+	FlowIds []uint64 // keep track of the flows we currently have in the ONU
 
 	OperState    *fsm.FSM
 	SerialNumber *openolt.SerialNumber
@@ -153,7 +150,7 @@
 		ActiveImageEntityId:           0, // when we start the SoftwareImage with ID 0 is active and committed
 		CommittedImageEntityId:        0,
 	}
-	o.SerialNumber = o.NewSN(olt.ID, pon.ID, id)
+	o.SerialNumber = NewSN(olt.ID, pon.ID, id)
 	// NOTE this state machine is used to track the operational
 	// state as requested by VOLTHA
 	o.OperState = getOperStateFSM(func(e *fsm.Event) {
@@ -217,6 +214,18 @@
 				o.Channel <- msg
 			},
 			"enter_enabled": func(event *fsm.Event) {
+
+				if used, sn := o.PonPort.isOnuIdAllocated(o.ID); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"SerialNumber": o.Sn(),
+					}).Errorf("received-omci-with-sn-%s", common.OnuSnToString(sn))
+					return
+				} else {
+					o.PonPort.storeOnuId(o.ID, o.SerialNumber)
+				}
+
 				msg := bbsim.Message{
 					Type: bbsim.OnuIndication,
 					Data: bbsim.OnuIndicationMessage{
@@ -235,9 +244,11 @@
 			"enter_disabled": func(event *fsm.Event) {
 
 				// clean the ONU state
-				o.GemPortAdded = false
 				o.PortNo = 0
 				o.Flows = []FlowKey{}
+				o.PonPort.removeOnuId(o.ID)
+				o.PonPort.removeAllocId(o.SerialNumber)
+				o.PonPort.removeGemPortBySn(o.SerialNumber)
 
 				// set the OperState to disabled
 				if err := o.OperState.Event("disable"); err != nil {
@@ -438,7 +449,7 @@
 	}).Debug("Stopped handling ONU Indication Channel")
 }
 
-func (o Onu) NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
+func NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
 
 	sn := new(openolt.SerialNumber)
 
@@ -477,10 +488,8 @@
 }
 
 func (o *Onu) sendOnuIndication(msg bbsim.OnuIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
-	// NOTE voltha returns an ID, but if we use that ID then it complains:
-	// expected_onu_id: 1, received_onu_id: 1024, event: ONU-id-mismatch, can happen if both voltha and the olt rebooted
-	// so we're using the internal ID that is 1
-	// o.ID = msg.OnuID
+	// NOTE the ONU ID is set by VOLTHA in the ActivateOnu call (via openolt.proto)
+	// and stored in the Onu struct via onu.SetID
 
 	indData := &openolt.Indication_OnuInd{OnuInd: &openolt.OnuIndication{
 		IntfId:       o.PonPortID,
@@ -495,11 +504,12 @@
 		return
 	}
 	onuLogger.WithFields(log.Fields{
-		"IntfId":     o.PonPortID,
-		"OnuId":      o.ID,
-		"OperState":  msg.OperState.String(),
-		"AdminState": msg.OperState.String(),
-		"OnuSn":      o.Sn(),
+		"IntfId":      o.PonPortID,
+		"OnuId":       o.ID,
+		"VolthaOnuId": msg.OnuID,
+		"OperState":   msg.OperState.String(),
+		"AdminState":  msg.OperState.String(),
+		"OnuSn":       o.Sn(),
 	}).Debug("Sent Indication_OnuInd")
 
 }
@@ -707,10 +717,7 @@
 	case omci.GetRequestType:
 		responsePkt, _ = omcilib.CreateGetResponse(omciPkt, omciMsg, o.SerialNumber, o.MibDataSync, o.ActiveImageEntityId, o.CommittedImageEntityId)
 	case omci.SetRequestType:
-		if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg); errResp == nil {
-			o.MibDataSync++
-		}
-
+		success := true
 		msgObj, _ := omcilib.ParseSetRequest(omciPkt)
 		switch msgObj.EntityClass {
 		case me.PhysicalPathTerminationPointEthernetUniClassID:
@@ -734,12 +741,101 @@
 				}
 				o.Channel <- msg
 			}
+		case me.TContClassID:
+			allocId := msgObj.Attributes["AllocId"].(uint16)
+
+			// if the AllocId is 255 (0xFF) or 65535 (0xFFFF) it means we are removing it,
+			// otherwise we are adding it
+			if allocId == 255 || allocId == 65535 {
+				onuLogger.WithFields(log.Fields{
+					"IntfId":       o.PonPortID,
+					"OnuId":        o.ID,
+					"TContId":      msgObj.EntityInstance,
+					"AllocId":      allocId,
+					"SerialNumber": o.Sn(),
+				}).Trace("freeing-alloc-id-via-omci")
+				o.PonPort.removeAllocId(o.SerialNumber)
+			} else {
+				if used, sn := o.PonPort.isAllocIdAllocated(allocId); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"AllocId":      allocId,
+						"SerialNumber": o.Sn(),
+					}).Errorf("allocid-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+					success = false
+				} else {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"TContId":      msgObj.EntityInstance,
+						"AllocId":      allocId,
+						"SerialNumber": o.Sn(),
+					}).Trace("storing-alloc-id-via-omci")
+					o.PonPort.storeAllocId(allocId, o.SerialNumber)
+				}
+			}
+
+		}
+
+		if success {
+			if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+				o.MibDataSync++
+			}
+		} else {
+			responsePkt, _ = omcilib.CreateSetResponse(omciPkt, omciMsg, me.AttributeFailure)
 		}
 	case omci.CreateRequestType:
-		if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg); errResp == nil {
-			o.MibDataSync++
+		// check for GemPortNetworkCtp and make sure there are no duplicates on the same PON
+		var used bool
+		var sn *openolt.SerialNumber
+		msgObj, err := omcilib.ParseCreateRequest(omciPkt)
+		if err == nil {
+			if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+				if used, sn = o.PonPort.isGemPortAllocated(msgObj.EntityInstance); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"GemPortId":    msgObj.EntityInstance,
+						"SerialNumber": o.Sn(),
+					}).Errorf("gemport-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+				} else {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"GemPortId":    msgObj.EntityInstance,
+						"SerialNumber": o.Sn(),
+					}).Trace("storing-gem-port-id-via-omci")
+					o.PonPort.storeGemPort(msgObj.EntityInstance, o.SerialNumber)
+				}
+			}
+		}
+
+		// if the gemPort is valid then increment the MDS and return a successful response
+		// otherwise fail the request
+		// for now the CreateRequest for the gemPort is the only one that can fail; if we start supporting multiple
+		// validations this check will need to be rewritten
+		if !used {
+			if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+				o.MibDataSync++
+			}
+		} else {
+			responsePkt, _ = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.ProcessingError)
 		}
 	case omci.DeleteRequestType:
+		msgObj, err := omcilib.ParseDeleteRequest(omciPkt)
+		if err == nil {
+			if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+				onuLogger.WithFields(log.Fields{
+					"IntfId":       o.PonPortID,
+					"OnuId":        o.ID,
+					"GemPortId":    msgObj.EntityInstance,
+					"SerialNumber": o.Sn(),
+				}).Trace("freeing-gem-port-id-via-omci")
+				o.PonPort.removeGemPort(msgObj.EntityInstance)
+			}
+		}
+
 		if responsePkt, errResp = omcilib.CreateDeleteResponse(omciPkt, omciMsg); errResp == nil {
 			o.MibDataSync++
 		}
@@ -1037,6 +1133,7 @@
 
 func (o *Onu) handleFlowAdd(msg bbsim.OnuFlowUpdateMessage) {
 	onuLogger.WithFields(log.Fields{
+		"AllocId":           msg.Flow.AllocId,
 		"Cookie":            msg.Flow.Cookie,
 		"DstPort":           msg.Flow.Classifier.DstPort,
 		"FlowId":            msg.Flow.FlowId,
@@ -1123,7 +1220,6 @@
 			"OnuId":        o.ID,
 			"SerialNumber": o.Sn(),
 		}).Info("Resetting GemPort")
-		o.GemPortAdded = false
 
 		// check if ONU delete is performed and
 		// terminate the ONU's ProcessOnuMessages Go routine
@@ -1245,25 +1341,7 @@
 
 		if o.seqNumber > 290 {
 			// NOTE we are done with the MIB Upload (290 is the number of messages the omci-sim library will respond to)
-			galEnet, _ := omcilib.CreateGalEnetRequest(o.getNextTid(false))
-			sendOmciMsg(galEnet, o.PonPortID, o.ID, o.SerialNumber, "CreateGalEnetRequest", client)
-		} else {
-			mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
-			sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
-		}
-	case omci.CreateResponseType:
-		// NOTE Creating a GemPort,
-		// BBsim actually doesn't care about the values, so we can do we want with the parameters
-		// In the same way we can create a GemPort even without setting up UNIs/TConts/...
-		// but we need the GemPort to trigger the state change
-
-		if !o.GemPortAdded {
-			// NOTE this sends a CreateRequestType and BBSim replies with a CreateResponseType
-			// thus we send this request only once
-			gemReq, _ := omcilib.CreateGemPortRequest(o.getNextTid(false))
-			sendOmciMsg(gemReq, o.PonPortID, o.ID, o.SerialNumber, "CreateGemPortRequest", client)
-			o.GemPortAdded = true
-		} else {
+			// start sending the flows, we don't care about the OMCI setup in BBR, just that a lot of messages can go through
 			if err := o.InternalState.Event(BbrOnuTxSendEapolFlow); err != nil {
 				onuLogger.WithFields(log.Fields{
 					"OnuId":  o.ID,
@@ -1271,6 +1349,9 @@
 					"OnuSn":  o.Sn(),
 				}).Errorf("Error while transitioning ONU State %v", err)
 			}
+		} else {
+			mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
+			sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
 		}
 	}
 }
@@ -1290,14 +1371,16 @@
 		UniId:         int32(0), // NOTE do not hardcode this, we need to support multiple UNIs
 		FlowId:        uint64(o.ID),
 		FlowType:      "downstream",
-		AllocId:       int32(0),
 		NetworkIntfId: int32(0),
-		GemportId:     int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
 		Classifier:    &classifierProto,
 		Action:        &actionProto,
 		Priority:      int32(100),
 		Cookie:        uint64(o.ID),
-		PortNo:        uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+		PortNo:        o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+		// AllocId and GemPorts need to be unique per PON
+		// for now use the ONU-ID, will need to change once we support multiple UNIs
+		AllocId:   int32(o.ID),
+		GemportId: int32(o.ID),
 	}
 
 	if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1307,6 +1390,7 @@
 			"FlowId":       downstreamFlow.FlowId,
 			"PortNo":       downstreamFlow.PortNo,
 			"SerialNumber": common.OnuSnToString(o.SerialNumber),
+			"Err":          err,
 		}).Fatalf("Failed to add EAPOL Flow")
 	}
 	log.WithFields(log.Fields{
@@ -1338,14 +1422,16 @@
 		UniId:         int32(0), // FIXME do not hardcode this
 		FlowId:        uint64(o.ID),
 		FlowType:      "downstream",
-		AllocId:       int32(0),
 		NetworkIntfId: int32(0),
-		GemportId:     int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
 		Classifier:    &classifierProto,
 		Action:        &actionProto,
 		Priority:      int32(100),
 		Cookie:        uint64(o.ID),
-		PortNo:        uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+		PortNo:        o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+		// AllocId and GemPorts need to be unique per PON
+		// for now use the ONU-ID, will need to change once we support multiple UNIs
+		AllocId:   int32(o.ID),
+		GemportId: int32(o.ID),
 	}
 
 	if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1355,6 +1441,7 @@
 			"FlowId":       downstreamFlow.FlowId,
 			"PortNo":       downstreamFlow.PortNo,
 			"SerialNumber": common.OnuSnToString(o.SerialNumber),
+			"Err":          err,
 		}).Fatalf("Failed to send DHCP Flow")
 	}
 	log.WithFields(log.Fields{