[VOL-3837] Checking that OnuId, AllocId and GemPorts are unique per PON
Validate that received flow carry valid GemPortId and AllocIds

Change-Id: I1b8928c7a9e580c9711f61320595a449df7c30f5
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index f9ef936..87b9da4 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -90,6 +90,13 @@
 
 	OpenoltStream openolt.Openolt_EnableIndicationServer
 	enablePerf    bool
+
+	// Allocated Resources
+	// this data are to verify that the openolt adapter does not duplicate resources
+	AllocIDsLock   sync.RWMutex
+	AllocIDs       map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[AllocIds]map[FlowId]bool
+	GemPortIDsLock sync.RWMutex
+	GemPortIDs     map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[GemPortIDs]map[FlowId]bool
 }
 
 var olt OltDevice
@@ -123,6 +130,8 @@
 		PortStatsInterval:   options.Olt.PortStatsInterval,
 		dhcpServer:          dhcp.NewDHCPServer(),
 		PreviouslyConnected: false,
+		AllocIDs:            make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
+		GemPortIDs:          make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
 	}
 
 	if val, ok := ControlledActivationModes[options.BBSim.ControlledActivation]; ok {
@@ -149,6 +158,10 @@
 				oltLogger.Debugf("Changing OLT InternalState from %s to %s", e.Src, e.Dst)
 			},
 			"enter_initialized": func(e *fsm.Event) { olt.InitOlt() },
+			"enter_deleted": func(e *fsm.Event) {
+				// remove all the resource allocations
+				olt.clearAllResources()
+			},
 		},
 	)
 
@@ -169,6 +182,11 @@
 
 	// create PON ports
 	for i := 0; i < olt.NumPon; i++ {
+
+		// initialize the resource maps for every PON Ports
+		olt.AllocIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+		olt.GemPortIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+
 		p := CreatePonPort(&olt, uint32(i))
 
 		// create ONU devices
@@ -253,6 +271,12 @@
 		// in-band management
 		o.Nnis[0].OperState.SetState("down")
 	}
+
+	for ponId := range o.Pons {
+		// initialize the resource maps for every PON Ports
+		olt.AllocIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+		olt.GemPortIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+	}
 }
 
 func (o *OltDevice) RestartOLT() error {
@@ -822,16 +846,14 @@
 	publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
 
 	pon, _ := o.GetPonById(onu.IntfId)
+
+	// Initialize the resource maps for this ONU
+	olt.AllocIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
+	olt.GemPortIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
+
 	_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
 	_onu.SetID(onu.OnuId)
 
-	if err := _onu.OperState.Event("enable"); err != nil {
-		oltLogger.WithFields(log.Fields{
-			"IntfId": _onu.PonPortID,
-			"OnuSn":  _onu.Sn(),
-			"OnuId":  _onu.ID,
-		}).Infof("Failed to transition ONU.OperState to enabled state: %s", err.Error())
-	}
 	if err := _onu.InternalState.Event(OnuTxEnable); err != nil {
 		oltLogger.WithFields(log.Fields{
 			"IntfId": _onu.PonPortID,
@@ -845,7 +867,7 @@
 	return new(openolt.Empty), nil
 }
 
-func (o *OltDevice) DeactivateOnu(context.Context, *openolt.Onu) (*openolt.Empty, error) {
+func (o *OltDevice) DeactivateOnu(_ context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
 	oltLogger.Error("DeactivateOnu not implemented")
 	return new(openolt.Empty), nil
 }
@@ -1056,6 +1078,21 @@
 			}
 		}
 
+		// validate that the flow reference correct IDs (Alloc, Gem)
+		if err := o.validateFlow(flow); err != nil {
+			oltLogger.WithFields(log.Fields{
+				"OnuId":        flow.OnuId,
+				"IntfId":       flow.AccessIntfId,
+				"Flow":         flow,
+				"SerialNumber": onu.Sn(),
+				"err":          err,
+			}).Error("invalid-flow-for-onu")
+			return nil, err
+		}
+
+		o.storeGemPortId(flow)
+		o.storeAllocId(flow)
+
 		msg := types.Message{
 			Type: types.FlowAdd,
 			Data: types.OnuFlowUpdateMessage{
@@ -1074,10 +1111,22 @@
 func (o *OltDevice) FlowRemove(_ context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
 
 	oltLogger.WithFields(log.Fields{
-		"FlowId":   flow.FlowId,
-		"FlowType": flow.FlowType,
+		"AllocId":       flow.AllocId,
+		"Cookie":        flow.Cookie,
+		"FlowId":        flow.FlowId,
+		"FlowType":      flow.FlowType,
+		"GemportId":     flow.GemportId,
+		"IntfId":        flow.AccessIntfId,
+		"OnuId":         flow.OnuId,
+		"PortNo":        flow.PortNo,
+		"UniID":         flow.UniId,
+		"ReplicateFlow": flow.ReplicateFlow,
+		"PbitToGemport": flow.PbitToGemport,
 	}).Debug("OLT receives FlowRemove")
 
+	olt.freeGemPortId(flow)
+	olt.freeAllocId(flow)
+
 	if !o.enablePerf { // remove only if flow were stored
 		flowKey := FlowKey{
 			ID:        flow.FlowId,
@@ -1177,11 +1226,6 @@
 
 func (o *OltDevice) GetDeviceInfo(context.Context, *openolt.Empty) (*openolt.DeviceInfo, error) {
 
-	oltLogger.WithFields(log.Fields{
-		"oltId":    o.ID,
-		"PonPorts": o.NumPon,
-	}).Info("OLT receives GetDeviceInfo call from VOLTHA")
-
 	intfIDs := []uint32{}
 	for i := 0; i < o.NumPon; i++ {
 		intfIDs = append(intfIDs, uint32(i))
@@ -1538,3 +1582,179 @@
 func (o *OltDevice) GetOnuStatistics(ctx context.Context, in *openolt.Onu) (*openolt.OnuStatistics, error) {
 	return &openolt.OnuStatistics{}, nil
 }
+
+func (o *OltDevice) storeAllocId(flow *openolt.Flow) {
+	o.AllocIDsLock.Lock()
+	defer o.AllocIDsLock.Unlock()
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":  flow.AccessIntfId,
+		"OnuId":   flow.OnuId,
+		"PortNo":  flow.PortNo,
+		"AllocId": flow.AllocId,
+	}).Trace("storing-alloc-id-via-flow")
+
+	if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo]; !ok {
+		o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo] = make(map[int32]map[uint64]bool)
+	}
+	if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId]; !ok {
+		o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId] = make(map[uint64]bool)
+	}
+	o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId][flow.FlowId] = true
+}
+
+func (o *OltDevice) freeAllocId(flow *openolt.Flow) {
+	// if this is the last flow referencing the AllocId then remove it
+	o.AllocIDsLock.Lock()
+	defer o.AllocIDsLock.Unlock()
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":    flow.AccessIntfId,
+		"OnuId":     flow.OnuId,
+		"PortNo":    flow.PortNo,
+		"GemportId": flow.GemportId,
+	}).Trace("freeing-alloc-id-via-flow")
+
+	// NOTE look at the freeGemPortId implementation for comments and context
+	for ponId, ponValues := range o.AllocIDs {
+		for onuId, onuValues := range ponValues {
+			for uniId, uniValues := range onuValues {
+				for allocId, flows := range uniValues {
+					for flowId := range flows {
+						// if the flow matches, remove it from the map.
+						if flow.FlowId == flowId {
+							delete(o.AllocIDs[ponId][onuId][uniId][allocId], flow.FlowId)
+						}
+						// if that was the last flow for a particular allocId, remove the entire allocId
+						if len(o.AllocIDs[ponId][onuId][uniId][allocId]) == 0 {
+							delete(o.AllocIDs[ponId][onuId][uniId], allocId)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+func (o *OltDevice) storeGemPortId(flow *openolt.Flow) {
+	o.GemPortIDsLock.Lock()
+	defer o.GemPortIDsLock.Unlock()
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":    flow.AccessIntfId,
+		"OnuId":     flow.OnuId,
+		"PortNo":    flow.PortNo,
+		"GemportId": flow.GemportId,
+	}).Trace("storing-gem-port-id-via-flow")
+
+	if _, ok := o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo]; !ok {
+		o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo] = make(map[int32]map[uint64]bool)
+	}
+	if _, ok := o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId]; !ok {
+		o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId] = make(map[uint64]bool)
+	}
+	o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId][flow.FlowId] = true
+}
+
+func (o *OltDevice) freeGemPortId(flow *openolt.Flow) {
+	// if this is the last flow referencing the GemPort then remove it
+	o.GemPortIDsLock.Lock()
+	defer o.GemPortIDsLock.Unlock()
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":    flow.AccessIntfId,
+		"OnuId":     flow.OnuId,
+		"PortNo":    flow.PortNo,
+		"GemportId": flow.GemportId,
+	}).Trace("freeing-gem-port-id-via-flow")
+
+	// NOTE that this loop is not very performant, it would be better if the flow carries
+	// the same information that it carries during a FlowAdd. If so we can directly remove
+	// items from the map
+
+	//delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId], flow.FlowId)
+	//if len(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId]) == 0 {
+	//	delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo], flow.GemportId)
+	//}
+
+	// NOTE this loop assumes that flow IDs are unique per device
+	for ponId, ponValues := range o.GemPortIDs {
+		for onuId, onuValues := range ponValues {
+			for uniId, uniValues := range onuValues {
+				for gemId, flows := range uniValues {
+					for flowId := range flows {
+						// if the flow matches, remove it from the map.
+						if flow.FlowId == flowId {
+							delete(o.GemPortIDs[ponId][onuId][uniId][gemId], flow.FlowId)
+						}
+						// if that was the last flow for a particular gem, remove the entire gem
+						if len(o.GemPortIDs[ponId][onuId][uniId][gemId]) == 0 {
+							delete(o.GemPortIDs[ponId][onuId][uniId], gemId)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+// validateFlow checks that:
+// - the AllocId is not used in any flow referencing other ONUs/UNIs on the same PON
+// - the GemPortId is not used in any flow referencing other ONUs/UNIs on the same PON
+func (o *OltDevice) validateFlow(flow *openolt.Flow) error {
+
+	// validate gemPort
+	o.GemPortIDsLock.RLock()
+	allocatedGems := o.GemPortIDs[uint32(flow.AccessIntfId)]
+	o.GemPortIDsLock.RUnlock()
+	for onuId, onu := range allocatedGems {
+		if onuId == uint32(flow.OnuId) {
+			continue
+		}
+		for uniId, uni := range onu {
+			for gem := range uni {
+				if gem == flow.GemportId {
+					return fmt.Errorf("gem-%d-already-in-use-on-uni-%d-onu-%d", gem, uniId, onuId)
+				}
+			}
+		}
+	}
+
+	o.AllocIDsLock.RLock()
+	allocatedAllocIds := o.AllocIDs[uint32(flow.AccessIntfId)]
+	o.AllocIDsLock.RUnlock()
+	for onuId, onu := range allocatedAllocIds {
+		if onuId == uint32(flow.OnuId) {
+			continue
+		}
+		for uniId, uni := range onu {
+			for allocId := range uni {
+				if allocId == flow.AllocId {
+					return fmt.Errorf("allocId-%d-already-in-use-on-uni-%d-onu-%d", allocId, uniId, onuId)
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+// clearAllResources is invoked up OLT Reboot to remove all the allocated
+// GemPorts, AllocId and ONU-IDs across the PONs
+func (o *OltDevice) clearAllResources() {
+
+	// remove the resources received via flows
+	o.GemPortIDsLock.Lock()
+	o.GemPortIDs = make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool)
+	o.GemPortIDsLock.Unlock()
+	o.AllocIDsLock.Lock()
+	o.AllocIDs = make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool)
+	o.AllocIDsLock.Unlock()
+
+	// remove the resources received via OMCI
+	for _, pon := range o.Pons {
+		pon.removeAllAllocIds()
+		pon.removeAllGemPorts()
+		pon.removeAllOnuIds()
+	}
+}
diff --git a/internal/bbsim/devices/olt_test.go b/internal/bbsim/devices/olt_test.go
index 154942b..acbe7a7 100644
--- a/internal/bbsim/devices/olt_test.go
+++ b/internal/bbsim/devices/olt_test.go
@@ -28,15 +28,27 @@
 
 func createMockOlt(numPon int, numOnu int, services []ServiceIf) *OltDevice {
 	olt := &OltDevice{
-		ID: 0,
+		ID:         0,
+		AllocIDs:   make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
+		GemPortIDs: make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
 	}
 
 	for i := 0; i < numPon; i++ {
+
+		// initialize the resource maps for every PON Ports
+		olt.AllocIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+		olt.GemPortIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+
 		pon := PonPort{
 			ID: uint32(i),
 		}
 
 		for j := 0; j < numOnu; j++ {
+
+			// initialize the resource maps for every ONU and the first UNI
+			olt.AllocIDs[uint32(i)][uint32(j)] = make(map[uint32]map[int32]map[uint64]bool)
+			olt.GemPortIDs[uint32(i)][uint32(j)] = make(map[uint32]map[int32]map[uint64]bool)
+
 			onuId := uint32(i + j)
 			onu := Onu{
 				ID:        onuId,
@@ -51,7 +63,7 @@
 				onu.Services = append(onu.Services, service)
 			}
 
-			onu.SerialNumber = onu.NewSN(olt.ID, pon.ID, onu.ID)
+			onu.SerialNumber = NewSN(olt.ID, pon.ID, onu.ID)
 			pon.Onus = append(pon.Onus, &onu)
 		}
 		olt.Pons = append(olt.Pons, &pon)
@@ -227,3 +239,209 @@
 	assert.Equal(t, err, nil)
 	assert.Equal(t, found.Sn(), onu1.Sn())
 }
+
+func Test_Olt_storeGemPortId(t *testing.T) {
+
+	const (
+		pon  = 1
+		onu  = 1
+		uni  = 16
+		gem1 = 1024
+		gem2 = 1025
+	)
+
+	numPon := 2
+	numOnu := 2
+
+	olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+	// add a first flow on the ONU
+	flow1 := &openolt.Flow{
+		AccessIntfId: pon,
+		OnuId:        onu,
+		PortNo:       uni,
+		FlowId:       1,
+		GemportId:    gem1,
+	}
+
+	olt.storeGemPortId(flow1)
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1)       // we have 1 gem port
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 1) // and one flow referencing it
+
+	// add a second flow on the ONU (same gem)
+	flow2 := &openolt.Flow{
+		AccessIntfId: pon,
+		OnuId:        onu,
+		PortNo:       uni,
+		FlowId:       2,
+		GemportId:    gem1,
+	}
+
+	olt.storeGemPortId(flow2)
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1)       // we have 1 gem port
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 2) // and two flows referencing it
+
+	// add a third flow on the ONU (different gem)
+	flow3 := &openolt.Flow{
+		AccessIntfId: pon,
+		OnuId:        onu,
+		PortNo:       uni,
+		FlowId:       2,
+		GemportId:    1025,
+	}
+
+	olt.storeGemPortId(flow3)
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 2)       // we have 2 gem ports
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 2) // two flows referencing the first one
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem2]), 1) // and one flow referencing the second one
+}
+
+func Test_Olt_freeGemPortId(t *testing.T) {
+	const (
+		pon   = 1
+		onu   = 1
+		uni   = 16
+		gem1  = 1024
+		gem2  = 1025
+		flow1 = 1
+		flow2 = 2
+		flow3 = 3
+	)
+
+	numPon := 2
+	numOnu := 2
+
+	olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+	olt.GemPortIDs[pon][onu][uni] = make(map[int32]map[uint64]bool)
+	olt.GemPortIDs[pon][onu][uni][gem1] = make(map[uint64]bool)
+	olt.GemPortIDs[pon][onu][uni][gem1][flow1] = true
+	olt.GemPortIDs[pon][onu][uni][gem1][flow2] = true
+	olt.GemPortIDs[pon][onu][uni][gem2] = make(map[uint64]bool)
+	olt.GemPortIDs[pon][onu][uni][gem2][flow3] = true
+
+	// remove one flow on the first gem, check that the gem is still allocated as there is still a flow referencing it
+	// NOTE that the flow remove only carries the flow ID, no other information
+	flowGem1 := &openolt.Flow{
+		FlowId: flow1,
+	}
+
+	olt.freeGemPortId(flowGem1)
+	// we still have two unis in the map
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 2)
+
+	// we should now have a single gem referenced on this UNI
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 1, "gemport-not-removed")
+
+	// the gem should still reference flow 2
+	assert.Equal(t, olt.GemPortIDs[pon][onu][uni][gem1][flow2], true)
+	// but should not reference flow1
+	_, flow1Exists := olt.GemPortIDs[pon][onu][uni][gem1][flow1]
+	assert.Equal(t, flow1Exists, false)
+
+	// this is the only flow remaining on this gem, the gem should be removed
+	flowGem2 := &openolt.Flow{
+		FlowId: flow2,
+	}
+	olt.freeGemPortId(flowGem2)
+
+	// we should now have a single gem referenced on this UNI
+	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1, "gemport-not-removed")
+
+	// and it should be gem2
+	_, gem1exists := olt.GemPortIDs[pon][onu][uni][gem1]
+	assert.Equal(t, gem1exists, false)
+	_, gem2exists := olt.GemPortIDs[pon][onu][uni][gem2]
+	assert.Equal(t, gem2exists, true)
+}
+
+func Test_Olt_validateFlow(t *testing.T) {
+
+	const (
+		pon0            = 0
+		pon1            = 1
+		onu0            = 0
+		onu1            = 1
+		uniPort         = 0
+		usedGemIdPon0   = 1024
+		usedGemIdPon1   = 1025
+		usedAllocIdPon0 = 1
+		usedAllocIdPon1 = 2
+		flowId          = 1
+	)
+
+	numPon := 2
+	numOnu := 2
+
+	olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+	olt.GemPortIDs[pon0][onu0][uniPort] = make(map[int32]map[uint64]bool)
+	olt.GemPortIDs[pon1][onu0][uniPort] = make(map[int32]map[uint64]bool)
+
+	olt.GemPortIDs[pon0][onu0][uniPort][usedGemIdPon0] = make(map[uint64]bool)
+	olt.GemPortIDs[pon0][onu0][uniPort][usedGemIdPon0][flowId] = true
+	olt.GemPortIDs[pon1][onu0][uniPort][usedGemIdPon1] = make(map[uint64]bool)
+	olt.GemPortIDs[pon1][onu0][uniPort][usedGemIdPon1][flowId] = true
+
+	olt.AllocIDs[pon0][onu0][uniPort] = make(map[int32]map[uint64]bool)
+	olt.AllocIDs[pon1][onu0][uniPort] = make(map[int32]map[uint64]bool)
+	olt.AllocIDs[pon0][onu0][uniPort][usedAllocIdPon0] = make(map[uint64]bool)
+	olt.AllocIDs[pon0][onu0][uniPort][usedAllocIdPon0][flowId] = true
+	olt.AllocIDs[pon1][onu0][uniPort][usedAllocIdPon1] = make(map[uint64]bool)
+	olt.AllocIDs[pon1][onu0][uniPort][usedAllocIdPon1][flowId] = true
+
+	// a GemPortID can be referenced across multiple flows on the same ONU
+	validGemFlow := &openolt.Flow{
+		AccessIntfId: pon0,
+		OnuId:        onu0,
+		GemportId:    usedGemIdPon0,
+	}
+
+	err := olt.validateFlow(validGemFlow)
+	assert.NilError(t, err)
+
+	// a GemPortID can NOT be referenced across different ONUs on the same PON
+	invalidGemFlow := &openolt.Flow{
+		AccessIntfId: pon0,
+		OnuId:        onu1,
+		GemportId:    usedGemIdPon0,
+	}
+	err = olt.validateFlow(invalidGemFlow)
+	assert.Error(t, err, "gem-1024-already-in-use-on-uni-0-onu-0")
+
+	// if a flow reference the same GEM on a different PON it's a valid flow
+	invalidGemDifferentPonFlow := &openolt.Flow{
+		AccessIntfId: pon1,
+		OnuId:        onu1,
+		GemportId:    usedGemIdPon0,
+	}
+	err = olt.validateFlow(invalidGemDifferentPonFlow)
+	assert.NilError(t, err)
+
+	// an allocId can be referenced across multiple flows on the same ONU
+	validAllocFlow := &openolt.Flow{
+		AccessIntfId: pon0,
+		OnuId:        onu0,
+		AllocId:      usedAllocIdPon0,
+	}
+	err = olt.validateFlow(validAllocFlow)
+	assert.NilError(t, err)
+
+	// an allocId can NOT be referenced across different ONUs on the same PON
+	invalidAllocFlow := &openolt.Flow{
+		AccessIntfId: pon0,
+		OnuId:        onu1,
+		AllocId:      usedAllocIdPon0,
+	}
+	err = olt.validateFlow(invalidAllocFlow)
+	assert.Error(t, err, "allocId-1-already-in-use-on-uni-0-onu-0")
+
+	// if a flow reference the same AllocId on a different PON it's a valid flow
+	invalidAllocDifferentPonFlow := &openolt.Flow{
+		AccessIntfId: pon1,
+		OnuId:        onu1,
+		AllocId:      usedAllocIdPon0,
+	}
+	err = olt.validateFlow(invalidAllocDifferentPonFlow)
+	assert.NilError(t, err)
+}
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index fd7eca0..ab7c35f 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -22,14 +22,13 @@
 	"fmt"
 	pb "github.com/opencord/bbsim/api/bbsim"
 	"github.com/opencord/bbsim/internal/bbsim/alarmsim"
-	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
-	me "github.com/opencord/omci-lib-go/generated"
-	"strconv"
-
 	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
 	"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
 	"github.com/opencord/bbsim/internal/bbsim/responders/eapol"
+	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
+	me "github.com/opencord/omci-lib-go/generated"
 	"net"
+	"strconv"
 	"time"
 
 	"github.com/google/gopacket/layers"
@@ -102,11 +101,9 @@
 	// PortNo comes with flows and it's used when sending packetIndications,
 	// There is one PortNo per UNI Port, for now we're only storing the first one
 	// FIXME add support for multiple UNIs (each UNI has a different PortNo)
-	PortNo uint32
-	// deprecated (gemPort is on a Service basis)
-	GemPortAdded bool
-	Flows        []FlowKey
-	FlowIds      []uint64 // keep track of the flows we currently have in the ONU
+	PortNo  uint32
+	Flows   []FlowKey
+	FlowIds []uint64 // keep track of the flows we currently have in the ONU
 
 	OperState    *fsm.FSM
 	SerialNumber *openolt.SerialNumber
@@ -153,7 +150,7 @@
 		ActiveImageEntityId:           0, // when we start the SoftwareImage with ID 0 is active and committed
 		CommittedImageEntityId:        0,
 	}
-	o.SerialNumber = o.NewSN(olt.ID, pon.ID, id)
+	o.SerialNumber = NewSN(olt.ID, pon.ID, id)
 	// NOTE this state machine is used to track the operational
 	// state as requested by VOLTHA
 	o.OperState = getOperStateFSM(func(e *fsm.Event) {
@@ -217,6 +214,18 @@
 				o.Channel <- msg
 			},
 			"enter_enabled": func(event *fsm.Event) {
+
+				if used, sn := o.PonPort.isOnuIdAllocated(o.ID); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"SerialNumber": o.Sn(),
+					}).Errorf("received-omci-with-sn-%s", common.OnuSnToString(sn))
+					return
+				} else {
+					o.PonPort.storeOnuId(o.ID, o.SerialNumber)
+				}
+
 				msg := bbsim.Message{
 					Type: bbsim.OnuIndication,
 					Data: bbsim.OnuIndicationMessage{
@@ -235,9 +244,11 @@
 			"enter_disabled": func(event *fsm.Event) {
 
 				// clean the ONU state
-				o.GemPortAdded = false
 				o.PortNo = 0
 				o.Flows = []FlowKey{}
+				o.PonPort.removeOnuId(o.ID)
+				o.PonPort.removeAllocId(o.SerialNumber)
+				o.PonPort.removeGemPortBySn(o.SerialNumber)
 
 				// set the OperState to disabled
 				if err := o.OperState.Event("disable"); err != nil {
@@ -438,7 +449,7 @@
 	}).Debug("Stopped handling ONU Indication Channel")
 }
 
-func (o Onu) NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
+func NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
 
 	sn := new(openolt.SerialNumber)
 
@@ -477,10 +488,8 @@
 }
 
 func (o *Onu) sendOnuIndication(msg bbsim.OnuIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
-	// NOTE voltha returns an ID, but if we use that ID then it complains:
-	// expected_onu_id: 1, received_onu_id: 1024, event: ONU-id-mismatch, can happen if both voltha and the olt rebooted
-	// so we're using the internal ID that is 1
-	// o.ID = msg.OnuID
+	// NOTE the ONU ID is set by VOLTHA in the ActivateOnu call (via openolt.proto)
+	// and stored in the Onu struct via onu.SetID
 
 	indData := &openolt.Indication_OnuInd{OnuInd: &openolt.OnuIndication{
 		IntfId:       o.PonPortID,
@@ -495,11 +504,12 @@
 		return
 	}
 	onuLogger.WithFields(log.Fields{
-		"IntfId":     o.PonPortID,
-		"OnuId":      o.ID,
-		"OperState":  msg.OperState.String(),
-		"AdminState": msg.OperState.String(),
-		"OnuSn":      o.Sn(),
+		"IntfId":      o.PonPortID,
+		"OnuId":       o.ID,
+		"VolthaOnuId": msg.OnuID,
+		"OperState":   msg.OperState.String(),
+		"AdminState":  msg.OperState.String(),
+		"OnuSn":       o.Sn(),
 	}).Debug("Sent Indication_OnuInd")
 
 }
@@ -707,10 +717,7 @@
 	case omci.GetRequestType:
 		responsePkt, _ = omcilib.CreateGetResponse(omciPkt, omciMsg, o.SerialNumber, o.MibDataSync, o.ActiveImageEntityId, o.CommittedImageEntityId)
 	case omci.SetRequestType:
-		if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg); errResp == nil {
-			o.MibDataSync++
-		}
-
+		success := true
 		msgObj, _ := omcilib.ParseSetRequest(omciPkt)
 		switch msgObj.EntityClass {
 		case me.PhysicalPathTerminationPointEthernetUniClassID:
@@ -734,12 +741,101 @@
 				}
 				o.Channel <- msg
 			}
+		case me.TContClassID:
+			allocId := msgObj.Attributes["AllocId"].(uint16)
+
+			// if the AllocId is 255 (0xFF) or 65535 (0xFFFF) it means we are removing it,
+			// otherwise we are adding it
+			if allocId == 255 || allocId == 65535 {
+				onuLogger.WithFields(log.Fields{
+					"IntfId":       o.PonPortID,
+					"OnuId":        o.ID,
+					"TContId":      msgObj.EntityInstance,
+					"AllocId":      allocId,
+					"SerialNumber": o.Sn(),
+				}).Trace("freeing-alloc-id-via-omci")
+				o.PonPort.removeAllocId(o.SerialNumber)
+			} else {
+				if used, sn := o.PonPort.isAllocIdAllocated(allocId); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"AllocId":      allocId,
+						"SerialNumber": o.Sn(),
+					}).Errorf("allocid-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+					success = false
+				} else {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"TContId":      msgObj.EntityInstance,
+						"AllocId":      allocId,
+						"SerialNumber": o.Sn(),
+					}).Trace("storing-alloc-id-via-omci")
+					o.PonPort.storeAllocId(allocId, o.SerialNumber)
+				}
+			}
+
+		}
+
+		if success {
+			if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+				o.MibDataSync++
+			}
+		} else {
+			responsePkt, _ = omcilib.CreateSetResponse(omciPkt, omciMsg, me.AttributeFailure)
 		}
 	case omci.CreateRequestType:
-		if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg); errResp == nil {
-			o.MibDataSync++
+		// check for GemPortNetworkCtp and make sure there are no duplicates on the same PON
+		var used bool
+		var sn *openolt.SerialNumber
+		msgObj, err := omcilib.ParseCreateRequest(omciPkt)
+		if err == nil {
+			if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+				if used, sn = o.PonPort.isGemPortAllocated(msgObj.EntityInstance); used {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"GemPortId":    msgObj.EntityInstance,
+						"SerialNumber": o.Sn(),
+					}).Errorf("gemport-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+				} else {
+					onuLogger.WithFields(log.Fields{
+						"IntfId":       o.PonPortID,
+						"OnuId":        o.ID,
+						"GemPortId":    msgObj.EntityInstance,
+						"SerialNumber": o.Sn(),
+					}).Trace("storing-gem-port-id-via-omci")
+					o.PonPort.storeGemPort(msgObj.EntityInstance, o.SerialNumber)
+				}
+			}
+		}
+
+		// if the gemPort is valid then increment the MDS and return a successful response
+		// otherwise fail the request
+		// for now the CreateRequeste for the gemPort is the only one that can fail, if we start supporting multiple
+		// validation this check will need to be rewritten
+		if !used {
+			if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+				o.MibDataSync++
+			}
+		} else {
+			responsePkt, _ = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.ProcessingError)
 		}
 	case omci.DeleteRequestType:
+		msgObj, err := omcilib.ParseDeleteRequest(omciPkt)
+		if err == nil {
+			if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+				onuLogger.WithFields(log.Fields{
+					"IntfId":       o.PonPortID,
+					"OnuId":        o.ID,
+					"GemPortId":    msgObj.EntityInstance,
+					"SerialNumber": o.Sn(),
+				}).Trace("freeing-gem-port-id-via-omci")
+				o.PonPort.removeGemPort(msgObj.EntityInstance)
+			}
+		}
+
 		if responsePkt, errResp = omcilib.CreateDeleteResponse(omciPkt, omciMsg); errResp == nil {
 			o.MibDataSync++
 		}
@@ -1037,6 +1133,7 @@
 
 func (o *Onu) handleFlowAdd(msg bbsim.OnuFlowUpdateMessage) {
 	onuLogger.WithFields(log.Fields{
+		"AllocId":           msg.Flow.AllocId,
 		"Cookie":            msg.Flow.Cookie,
 		"DstPort":           msg.Flow.Classifier.DstPort,
 		"FlowId":            msg.Flow.FlowId,
@@ -1123,7 +1220,6 @@
 			"OnuId":        o.ID,
 			"SerialNumber": o.Sn(),
 		}).Info("Resetting GemPort")
-		o.GemPortAdded = false
 
 		// check if ONU delete is performed and
 		// terminate the ONU's ProcessOnuMessages Go routine
@@ -1245,25 +1341,7 @@
 
 		if o.seqNumber > 290 {
 			// NOTE we are done with the MIB Upload (290 is the number of messages the omci-sim library will respond to)
-			galEnet, _ := omcilib.CreateGalEnetRequest(o.getNextTid(false))
-			sendOmciMsg(galEnet, o.PonPortID, o.ID, o.SerialNumber, "CreateGalEnetRequest", client)
-		} else {
-			mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
-			sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
-		}
-	case omci.CreateResponseType:
-		// NOTE Creating a GemPort,
-		// BBsim actually doesn't care about the values, so we can do we want with the parameters
-		// In the same way we can create a GemPort even without setting up UNIs/TConts/...
-		// but we need the GemPort to trigger the state change
-
-		if !o.GemPortAdded {
-			// NOTE this sends a CreateRequestType and BBSim replies with a CreateResponseType
-			// thus we send this request only once
-			gemReq, _ := omcilib.CreateGemPortRequest(o.getNextTid(false))
-			sendOmciMsg(gemReq, o.PonPortID, o.ID, o.SerialNumber, "CreateGemPortRequest", client)
-			o.GemPortAdded = true
-		} else {
+			// start sending the flows, we don't care about the OMCI setup in BBR, just that a lot of messages can go through
 			if err := o.InternalState.Event(BbrOnuTxSendEapolFlow); err != nil {
 				onuLogger.WithFields(log.Fields{
 					"OnuId":  o.ID,
@@ -1271,6 +1349,9 @@
 					"OnuSn":  o.Sn(),
 				}).Errorf("Error while transitioning ONU State %v", err)
 			}
+		} else {
+			mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
+			sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
 		}
 	}
 }
@@ -1290,14 +1371,16 @@
 		UniId:         int32(0), // NOTE do not hardcode this, we need to support multiple UNIs
 		FlowId:        uint64(o.ID),
 		FlowType:      "downstream",
-		AllocId:       int32(0),
 		NetworkIntfId: int32(0),
-		GemportId:     int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
 		Classifier:    &classifierProto,
 		Action:        &actionProto,
 		Priority:      int32(100),
 		Cookie:        uint64(o.ID),
-		PortNo:        uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+		PortNo:        o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+		// AllocId and GemPorts need to be unique per PON
+		// for now use the ONU-ID, will need to change once we support multiple UNIs
+		AllocId:   int32(o.ID),
+		GemportId: int32(o.ID),
 	}
 
 	if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1307,6 +1390,7 @@
 			"FlowId":       downstreamFlow.FlowId,
 			"PortNo":       downstreamFlow.PortNo,
 			"SerialNumber": common.OnuSnToString(o.SerialNumber),
+			"Err":          err,
 		}).Fatalf("Failed to add EAPOL Flow")
 	}
 	log.WithFields(log.Fields{
@@ -1338,14 +1422,16 @@
 		UniId:         int32(0), // FIXME do not hardcode this
 		FlowId:        uint64(o.ID),
 		FlowType:      "downstream",
-		AllocId:       int32(0),
 		NetworkIntfId: int32(0),
-		GemportId:     int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
 		Classifier:    &classifierProto,
 		Action:        &actionProto,
 		Priority:      int32(100),
 		Cookie:        uint64(o.ID),
-		PortNo:        uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+		PortNo:        o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+		// AllocId and GemPorts need to be unique per PON
+		// for now use the ONU-ID, will need to change once we support multiple UNIs
+		AllocId:   int32(o.ID),
+		GemportId: int32(o.ID),
 	}
 
 	if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1355,6 +1441,7 @@
 			"FlowId":       downstreamFlow.FlowId,
 			"PortNo":       downstreamFlow.PortNo,
 			"SerialNumber": common.OnuSnToString(o.SerialNumber),
+			"Err":          err,
 		}).Fatalf("Failed to send DHCP Flow")
 	}
 	log.WithFields(log.Fields{
diff --git a/internal/bbsim/devices/onu_flow_test.go b/internal/bbsim/devices/onu_flow_test.go
index e0ad9fe..b116843 100644
--- a/internal/bbsim/devices/onu_flow_test.go
+++ b/internal/bbsim/devices/onu_flow_test.go
@@ -100,8 +100,6 @@
 		fsm.Callbacks{},
 	)
 
-	onu.GemPortAdded = true
-
 	onu.FlowIds = []uint64{64}
 
 	flow := openolt.Flow{
@@ -115,7 +113,6 @@
 	}
 	onu.handleFlowRemove(msg)
 	assert.Equal(t, len(onu.FlowIds), 0)
-	assert.Equal(t, onu.GemPortAdded, false)
 }
 
 func TestOnu_HhandleEAPOLStart(t *testing.T) {
@@ -429,8 +426,6 @@
 	// one we received with the EAPOL flow
 	onu := createMockOnu(1, 1)
 
-	onu.GemPortAdded = false
-
 	onu.InternalState = fsm.NewFSM(
 		"enabled",
 		fsm.Events{
diff --git a/internal/bbsim/devices/onu_omci_test.go b/internal/bbsim/devices/onu_omci_test.go
index 4b9cc26..4d706b9 100644
--- a/internal/bbsim/devices/onu_omci_test.go
+++ b/internal/bbsim/devices/onu_omci_test.go
@@ -17,6 +17,7 @@
 package devices
 
 import (
+	"github.com/google/gopacket"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
 	omcilib "github.com/opencord/bbsim/internal/common/omci"
 	"github.com/opencord/omci-lib-go"
@@ -27,7 +28,6 @@
 )
 
 var mockAttr = me.AttributeValueMap{
-	"ManagedEntityId":                     12,
 	"PortId":                              0,
 	"TContPointer":                        0,
 	"Direction":                           0,
@@ -46,6 +46,7 @@
 		},
 		Attributes: mockAttr,
 	}
+
 	omciPkt, err := omcilib.Serialize(omci.CreateRequestType, omciReq, 66)
 	if err != nil {
 		t.Fatal(err.Error())
@@ -119,6 +120,34 @@
 	}
 }
 
+func omciBytesToMsg(t *testing.T, data []byte) (*omci.OMCI, *gopacket.Packet) {
+	packet := gopacket.NewPacket(data, omci.LayerTypeOMCI, gopacket.NoCopy)
+	if packet == nil {
+		t.Fatal("could not decode rxMsg as OMCI")
+	}
+	omciLayer := packet.Layer(omci.LayerTypeOMCI)
+	if omciLayer == nil {
+		t.Fatal("could not decode omci layer")
+	}
+	omciMsg, ok := omciLayer.(*omci.OMCI)
+	if !ok {
+		t.Fatal("could not assign omci layer")
+	}
+	return omciMsg, &packet
+}
+
+func omciToCreateResponse(t *testing.T, omciPkt *gopacket.Packet) *omci.CreateResponse {
+	msgLayer := (*omciPkt).Layer(omci.LayerTypeCreateResponse)
+	if msgLayer == nil {
+		t.Fatal("omci Msg layer could not be detected for CreateResponse - handling of MibSyncChan stopped")
+	}
+	msgObj, msgOk := msgLayer.(*omci.CreateResponse)
+	if !msgOk {
+		t.Fatal("omci Msg layer could not be assigned for CreateResponse - handling of MibSyncChan stopped")
+	}
+	return msgObj
+}
+
 func Test_MibDataSyncIncrease(t *testing.T) {
 	onu := createMockOnu(1, 1)
 
@@ -174,3 +203,39 @@
 	onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciDeleteRequest(t)), stream)
 	assert.Equal(t, onu.MibDataSync, uint8(0))
 }
+
+func Test_GemPortValidation(t *testing.T) {
+
+	// setup
+	onu := createMockOnu(1, 1)
+
+	stream := &mockStream{
+		Calls: make(map[int]*openolt.Indication),
+	}
+
+	// create a gem port via OMCI (gemPortId 12)
+	onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciCreateRequest(t)), stream)
+
+	// the first time we created the gemPort
+	// the MDS should be incremented
+	assert.Equal(t, stream.CallCount, 1)
+	assert.Equal(t, onu.MibDataSync, uint8(1))
+
+	// and the OMCI response status should be me.Success
+	indication := stream.Calls[1].GetOmciInd()
+	_, omciPkt := omciBytesToMsg(t, indication.Pkt)
+	responseLayer := omciToCreateResponse(t, omciPkt)
+	assert.Equal(t, responseLayer.Result, me.Success)
+
+	// send a request to create the same gem port via OMCI (gemPortId 12)
+	onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciCreateRequest(t)), stream)
+
+	// this time the MDS should not be incremented
+	assert.Equal(t, stream.CallCount, 2)
+	assert.Equal(t, onu.MibDataSync, uint8(1))
+
+	// and the OMCI response status should be me.ProcessingError
+	_, omciPkt = omciBytesToMsg(t, stream.Calls[2].GetOmciInd().Pkt)
+	responseLayer = omciToCreateResponse(t, omciPkt)
+	assert.Equal(t, responseLayer.Result, me.ProcessingError)
+}
diff --git a/internal/bbsim/devices/onu_state_machine_test.go b/internal/bbsim/devices/onu_state_machine_test.go
index 6f62540..f0fd232 100644
--- a/internal/bbsim/devices/onu_state_machine_test.go
+++ b/internal/bbsim/devices/onu_state_machine_test.go
@@ -37,7 +37,6 @@
 	assert.Equal(t, onu.InternalState.Current(), OnuStateEnabled)
 
 	onu.PortNo = 16
-	onu.GemPortAdded = true
 	onu.Flows = []FlowKey{
 		{ID: 1, Direction: "upstream"},
 		{ID: 2, Direction: "downstream"},
@@ -46,7 +45,6 @@
 	_ = onu.InternalState.Event(OnuTxDisable)
 	assert.Equal(t, onu.InternalState.Current(), OnuStateDisabled)
 
-	assert.Equal(t, onu.GemPortAdded, false)
 	assert.Equal(t, onu.PortNo, uint32(0))
 	assert.Equal(t, len(onu.Flows), 0)
 }
@@ -95,7 +93,6 @@
 	assert.Equal(t, onu.InternalState.Current(), OnuStateEnabled)
 
 	// succeed
-	onu.GemPortAdded = true
 	_ = onu.InternalState.Event("start_auth")
 	assert.Equal(t, onu.InternalState.Current(), "auth_started")
 }
@@ -104,8 +101,6 @@
 	t.Skip("Needs to be moved in the Service struct")
 	onu := createTestOnu()
 
-	onu.GemPortAdded = true
-
 	onu.InternalState.SetState("auth_started")
 
 	assert.Equal(t, onu.InternalState.Current(), "auth_started")
@@ -168,8 +163,6 @@
 	onu.InternalState.SetState("eap_response_success_received")
 	assert.Equal(t, onu.InternalState.Current(), "eap_response_success_received")
 
-	onu.GemPortAdded = false
-
 	err := onu.InternalState.Event("start_dhcp")
 	if err == nil {
 		t.Fail()
@@ -181,7 +174,6 @@
 func Test_Onu_StateMachine_dhcp_start(t *testing.T) {
 	t.Skip("Needs to be moved in the Service struct")
 	onu := createTestOnu()
-	onu.GemPortAdded = true
 
 	onu.InternalState.SetState("eap_response_success_received")
 	assert.Equal(t, onu.InternalState.Current(), "eap_response_success_received")
@@ -195,8 +187,6 @@
 	t.Skip("Needs to be moved in the Service struct")
 	onu := createTestOnu()
 
-	onu.GemPortAdded = true
-
 	onu.InternalState.SetState("dhcp_started")
 
 	assert.Equal(t, onu.InternalState.Current(), "dhcp_started")
diff --git a/internal/bbsim/devices/onu_test_helpers.go b/internal/bbsim/devices/onu_test_helpers.go
index f0d5dab..2f18199 100644
--- a/internal/bbsim/devices/onu_test_helpers.go
+++ b/internal/bbsim/devices/onu_test_helpers.go
@@ -137,15 +137,16 @@
 // this method creates a fake ONU used in the tests
 func createMockOnu(id uint32, ponPortId uint32) *Onu {
 	o := Onu{
-		ID:           id,
-		PonPortID:    ponPortId,
-		PortNo:       0,
-		GemPortAdded: true,
+		ID:        id,
+		PonPortID: ponPortId,
+		PortNo:    0,
 		PonPort: &PonPort{
-			Olt: &OltDevice{},
+			AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+			AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
+			Olt:               &OltDevice{},
 		},
 	}
-	o.SerialNumber = o.NewSN(0, ponPortId, o.ID)
+	o.SerialNumber = NewSN(0, ponPortId, o.ID)
 	o.Channel = make(chan types.Message, 10)
 	return &o
 }
@@ -155,11 +156,10 @@
 	olt := OltDevice{
 		ID: 0,
 	}
-	pon := PonPort{
-		ID:  1,
-		Olt: &olt,
-	}
-	onu := CreateONU(&olt, &pon, 1, time.Duration(1*time.Millisecond), true)
+
+	pon := CreatePonPort(&olt, 1)
+
+	onu := CreateONU(&olt, pon, 1, time.Duration(1*time.Millisecond), true)
 	// NOTE we need this in order to create the OnuChannel
 	_ = onu.InternalState.Event(OnuTxInitialize)
 	onu.DiscoveryRetryDelay = 100 * time.Millisecond
diff --git a/internal/bbsim/devices/pon.go b/internal/bbsim/devices/pon.go
index 1e0483d..3d74eb1 100644
--- a/internal/bbsim/devices/pon.go
+++ b/internal/bbsim/devices/pon.go
@@ -19,6 +19,7 @@
 import (
 	"bytes"
 	"fmt"
+	"sync"
 
 	"github.com/looplab/fsm"
 	"github.com/opencord/voltha-protos/v4/go/openolt"
@@ -37,17 +38,30 @@
 	// PON Attributes
 	OperState *fsm.FSM
 	Type      string
+
+	// Allocated resources
+	// Some resources (eg: OnuId, AllocId and GemPorts) have to be unique per PON port
+	// we are keeping a list so that we can throw an error in cases we receive duplicates
+	AllocatedGemPorts     map[uint16]*openolt.SerialNumber
+	allocatedGemPortsLock sync.RWMutex
+	AllocatedOnuIds       map[uint32]*openolt.SerialNumber
+	allocatedOnuIdsLock   sync.RWMutex
+	AllocatedAllocIds     map[uint16]*openolt.SerialNumber
+	allocatedAllocIdsLock sync.RWMutex
 }
 
 // CreatePonPort creates pon port object
 func CreatePonPort(olt *OltDevice, id uint32) *PonPort {
 
 	ponPort := PonPort{
-		NumOnu: olt.NumOnuPerPon,
-		ID:     id,
-		Type:   "pon",
-		Olt:    olt,
-		Onus:   []*Onu{},
+		NumOnu:            olt.NumOnuPerPon,
+		ID:                id,
+		Type:              "pon",
+		Olt:               olt,
+		Onus:              []*Onu{},
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+		AllocatedOnuIds:   make(map[uint32]*openolt.SerialNumber),
+		AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
 	}
 
 	ponPort.InternalState = fsm.NewFSM(
@@ -170,7 +184,7 @@
 	return &ponPort
 }
 
-func (p PonPort) GetOnuBySn(sn *openolt.SerialNumber) (*Onu, error) {
+func (p *PonPort) GetOnuBySn(sn *openolt.SerialNumber) (*Onu, error) {
 	for _, onu := range p.Onus {
 		if bytes.Equal(onu.SerialNumber.VendorSpecific, sn.VendorSpecific) {
 			return onu, nil
@@ -179,7 +193,7 @@
 	return nil, fmt.Errorf("Cannot find Onu with serial number %d in PonPort %d", sn, p.ID)
 }
 
-func (p PonPort) GetOnuById(id uint32) (*Onu, error) {
+func (p *PonPort) GetOnuById(id uint32) (*Onu, error) {
 	for _, onu := range p.Onus {
 		if onu.ID == id {
 			return onu, nil
@@ -189,7 +203,7 @@
 }
 
 // GetNumOfActiveOnus returns number of active ONUs for PON port
-func (p PonPort) GetNumOfActiveOnus() uint32 {
+func (p *PonPort) GetNumOfActiveOnus() uint32 {
 	var count uint32 = 0
 	for _, onu := range p.Onus {
 		if onu.InternalState.Current() == OnuStateInitialized || onu.InternalState.Current() == OnuStateCreated || onu.InternalState.Current() == OnuStateDisabled {
@@ -199,3 +213,111 @@
 	}
 	return count
 }
+
+// storeOnuId adds the Id to the ONU Ids already allocated to this PON port
+func (p *PonPort) storeOnuId(onuId uint32, onuSn *openolt.SerialNumber) {
+	p.allocatedOnuIdsLock.Lock()
+	defer p.allocatedOnuIdsLock.Unlock()
+	p.AllocatedOnuIds[onuId] = onuSn
+}
+
+// removeOnuId removes the OnuId from the allocated resources
+func (p *PonPort) removeOnuId(onuId uint32) {
+	p.allocatedOnuIdsLock.Lock()
+	defer p.allocatedOnuIdsLock.Unlock()
+	delete(p.AllocatedOnuIds, onuId)
+}
+
+func (p *PonPort) removeAllOnuIds() {
+	p.allocatedOnuIdsLock.Lock()
+	defer p.allocatedOnuIdsLock.Unlock()
+	p.AllocatedOnuIds = make(map[uint32]*openolt.SerialNumber)
+}
+
+// isOnuIdAllocated returns whether this OnuId is already in use on this PON
+func (p *PonPort) isOnuIdAllocated(onuId uint32) (bool, *openolt.SerialNumber) {
+	p.allocatedOnuIdsLock.RLock()
+	defer p.allocatedOnuIdsLock.RUnlock()
+
+	if _, ok := p.AllocatedOnuIds[onuId]; ok {
+		return true, p.AllocatedOnuIds[onuId]
+	}
+	return false, nil
+}
+
+// storeGemPort adds the gemPortId to the gemports already allocated to this PON port
+func (p *PonPort) storeGemPort(gemPortId uint16, onuSn *openolt.SerialNumber) {
+	p.allocatedGemPortsLock.Lock()
+	defer p.allocatedGemPortsLock.Unlock()
+	p.AllocatedGemPorts[gemPortId] = onuSn
+}
+
+// removeGemPort removes the gemPortId from the allocated resources
+func (p *PonPort) removeGemPort(gemPortId uint16) {
+	p.allocatedGemPortsLock.Lock()
+	defer p.allocatedGemPortsLock.Unlock()
+	delete(p.AllocatedGemPorts, gemPortId)
+}
+
+func (p *PonPort) removeGemPortBySn(onuSn *openolt.SerialNumber) {
+	p.allocatedGemPortsLock.Lock()
+	defer p.allocatedGemPortsLock.Unlock()
+	for gemPort, sn := range p.AllocatedGemPorts {
+		if sn == onuSn {
+			delete(p.AllocatedGemPorts, gemPort)
+		}
+	}
+}
+
+func (p *PonPort) removeAllGemPorts() {
+	p.allocatedGemPortsLock.Lock()
+	defer p.allocatedGemPortsLock.Unlock()
+	p.AllocatedGemPorts = make(map[uint16]*openolt.SerialNumber)
+}
+
+// isGemPortAllocated returns whether this gemPort is already in use on this PON
+func (p *PonPort) isGemPortAllocated(gemPortId uint16) (bool, *openolt.SerialNumber) {
+	p.allocatedGemPortsLock.RLock()
+	defer p.allocatedGemPortsLock.RUnlock()
+
+	if _, ok := p.AllocatedGemPorts[gemPortId]; ok {
+		return true, p.AllocatedGemPorts[gemPortId]
+	}
+	return false, nil
+}
+
+// storeAllocId adds the Id to the ONU Ids already allocated to this PON port
+func (p *PonPort) storeAllocId(allocId uint16, onuSn *openolt.SerialNumber) {
+	p.allocatedAllocIdsLock.Lock()
+	defer p.allocatedAllocIdsLock.Unlock()
+	p.AllocatedAllocIds[allocId] = onuSn
+}
+
+// removeAllocId removes the AllocId from the allocated resources
+// this is done via SN as the AllocId is not remove but set to a default value
+func (p *PonPort) removeAllocId(onuSn *openolt.SerialNumber) {
+	p.allocatedAllocIdsLock.Lock()
+	defer p.allocatedAllocIdsLock.Unlock()
+	for allocId, sn := range p.AllocatedAllocIds {
+		if sn == onuSn {
+			delete(p.AllocatedAllocIds, allocId)
+		}
+	}
+}
+
+func (p *PonPort) removeAllAllocIds() {
+	p.allocatedAllocIdsLock.Lock()
+	defer p.allocatedAllocIdsLock.Unlock()
+	p.AllocatedAllocIds = make(map[uint16]*openolt.SerialNumber)
+}
+
+// isAllocIdAllocated returns whether this AllocId is already in use on this PON
+func (p *PonPort) isAllocIdAllocated(allocId uint16) (bool, *openolt.SerialNumber) {
+	p.allocatedAllocIdsLock.RLock()
+	defer p.allocatedAllocIdsLock.RUnlock()
+
+	if _, ok := p.AllocatedAllocIds[allocId]; ok {
+		return true, p.AllocatedAllocIds[allocId]
+	}
+	return false, nil
+}
diff --git a/internal/bbsim/devices/pon_test.go b/internal/bbsim/devices/pon_test.go
new file mode 100644
index 0000000..45e1861
--- /dev/null
+++ b/internal/bbsim/devices/pon_test.go
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package devices
+
+import (
+	"github.com/opencord/voltha-protos/v4/go/openolt"
+	"github.com/stretchr/testify/assert"
+	"sync"
+	"testing"
+)
+
+var sn1 = NewSN(0, 0, 1)
+var sn2 = NewSN(0, 0, 2)
+var sn3 = NewSN(0, 0, 3)
+
+// NOTE that we are using a benchmark test to actually test concurrency
+func Benchmark_storeGemPort(b *testing.B) {
+	pon := PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(3)
+
+	// concurrently add multiple ports
+	go func(wg *sync.WaitGroup) { pon.storeGemPort(1, sn1); wg.Done() }(&wg)
+	go func(wg *sync.WaitGroup) { pon.storeGemPort(2, sn2); wg.Done() }(&wg)
+	go func(wg *sync.WaitGroup) { pon.storeGemPort(3, sn3); wg.Done() }(&wg)
+
+	wg.Wait()
+
+	assert.Equal(b, len(pon.AllocatedGemPorts), 3)
+}
+
+func Benchmark_removeGemPort(b *testing.B) {
+	pon := PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.storeGemPort(1, sn1)
+	pon.storeGemPort(2, sn2)
+	pon.storeGemPort(3, sn3)
+
+	assert.Equal(b, len(pon.AllocatedGemPorts), 3)
+
+	wg := sync.WaitGroup{}
+	wg.Add(3)
+
+	// concurrently add multiple ports
+	go func(wg *sync.WaitGroup) { pon.removeGemPort(1); wg.Done() }(&wg)
+	go func(wg *sync.WaitGroup) { pon.removeGemPort(2); wg.Done() }(&wg)
+	go func(wg *sync.WaitGroup) { pon.removeGemPort(3); wg.Done() }(&wg)
+
+	wg.Wait()
+
+	assert.Equal(b, len(pon.AllocatedGemPorts), 0)
+}
+
+func Test_removeGemPort(t *testing.T) {
+	pon := &PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.storeGemPort(1, sn1)
+	pon.storeGemPort(2, sn2)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+	// remove a non exiting gemPort
+	pon.removeGemPort(3)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+	// remove an existing gemPort
+	pon.removeGemPort(1)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+
+}
+
+func Test_removeGemPortBySn(t *testing.T) {
+	pon := &PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.storeGemPort(1, sn1)
+	pon.storeGemPort(2, sn2)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+	// remove a non exiting gemPort
+	pon.removeGemPortBySn(sn1)
+	assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+	assert.Nil(t, pon.AllocatedGemPorts[1])
+	assert.Equal(t, pon.AllocatedGemPorts[2], sn2)
+}
+
+func Test_isGemPortAllocated(t *testing.T) {
+	pon := &PonPort{
+		AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.storeGemPort(1, sn1)
+
+	assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+
+	free, sn := pon.isGemPortAllocated(1)
+
+	assert.Equal(t, free, true)
+	assert.Equal(t, sn, sn1)
+
+	used, sn_ := pon.isGemPortAllocated(2)
+
+	assert.Equal(t, used, false)
+	assert.Nil(t, sn_)
+}
+
+// the allocId is never removed, is always set to either 255 or 65535
+func Test_removeAllocId(t *testing.T) {
+
+	const (
+		allocId1 = 1024
+		allocId2 = 1025
+	)
+
+	pon := &PonPort{
+		AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
+	}
+
+	pon.AllocatedAllocIds[allocId1] = sn1
+	pon.AllocatedAllocIds[allocId2] = sn2
+
+	assert.Equal(t, len(pon.AllocatedAllocIds), 2)
+
+	pon.removeAllocId(sn1)
+
+	assert.Equal(t, len(pon.AllocatedAllocIds), 1)
+	assert.Nil(t, pon.AllocatedAllocIds[allocId1])
+	assert.Equal(t, pon.AllocatedAllocIds[allocId2], sn2)
+
+}