[VOL-3837] Checking that OnuId, AllocId and GemPorts are unique per PON
Validate that received flows carry valid GemPortIds and AllocIds

Change-Id: I1b8928c7a9e580c9711f61320595a449df7c30f5
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index f9ef936..87b9da4 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -90,6 +90,13 @@
 
 	OpenoltStream openolt.Openolt_EnableIndicationServer
 	enablePerf    bool
+
+	// Allocated Resources
+	// this data is used to verify that the openolt adapter does not duplicate resources
+	AllocIDsLock   sync.RWMutex
+	AllocIDs       map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[AllocIds]map[FlowId]bool
+	GemPortIDsLock sync.RWMutex
+	GemPortIDs     map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[GemPortIDs]map[FlowId]bool
 }
 
 var olt OltDevice
@@ -123,6 +130,8 @@
 		PortStatsInterval:   options.Olt.PortStatsInterval,
 		dhcpServer:          dhcp.NewDHCPServer(),
 		PreviouslyConnected: false,
+		AllocIDs:            make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
+		GemPortIDs:          make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
 	}
 
 	if val, ok := ControlledActivationModes[options.BBSim.ControlledActivation]; ok {
@@ -149,6 +158,10 @@
 				oltLogger.Debugf("Changing OLT InternalState from %s to %s", e.Src, e.Dst)
 			},
 			"enter_initialized": func(e *fsm.Event) { olt.InitOlt() },
+			"enter_deleted": func(e *fsm.Event) {
+				// remove all the resource allocations
+				olt.clearAllResources()
+			},
 		},
 	)
 
@@ -169,6 +182,11 @@
 
 	// create PON ports
 	for i := 0; i < olt.NumPon; i++ {
+
+		// initialize the resource maps for every PON Ports
+		olt.AllocIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+		olt.GemPortIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+
 		p := CreatePonPort(&olt, uint32(i))
 
 		// create ONU devices
@@ -253,6 +271,12 @@
 		// in-band management
 		o.Nnis[0].OperState.SetState("down")
 	}
+
+	for ponId := range o.Pons {
+		// initialize the resource maps for every PON Ports
+		olt.AllocIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+		olt.GemPortIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+	}
 }
 
 func (o *OltDevice) RestartOLT() error {
@@ -822,16 +846,14 @@
 	publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
 
 	pon, _ := o.GetPonById(onu.IntfId)
+
+	// Initialize the resource maps for this ONU
+	olt.AllocIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
+	olt.GemPortIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
+
 	_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
 	_onu.SetID(onu.OnuId)
 
-	if err := _onu.OperState.Event("enable"); err != nil {
-		oltLogger.WithFields(log.Fields{
-			"IntfId": _onu.PonPortID,
-			"OnuSn":  _onu.Sn(),
-			"OnuId":  _onu.ID,
-		}).Infof("Failed to transition ONU.OperState to enabled state: %s", err.Error())
-	}
 	if err := _onu.InternalState.Event(OnuTxEnable); err != nil {
 		oltLogger.WithFields(log.Fields{
 			"IntfId": _onu.PonPortID,
@@ -845,7 +867,7 @@
 	return new(openolt.Empty), nil
 }
 
-func (o *OltDevice) DeactivateOnu(context.Context, *openolt.Onu) (*openolt.Empty, error) {
+func (o *OltDevice) DeactivateOnu(_ context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
 	oltLogger.Error("DeactivateOnu not implemented")
 	return new(openolt.Empty), nil
 }
@@ -1056,6 +1078,21 @@
 			}
 		}
 
+		// validate that the flow reference correct IDs (Alloc, Gem)
+		if err := o.validateFlow(flow); err != nil {
+			oltLogger.WithFields(log.Fields{
+				"OnuId":        flow.OnuId,
+				"IntfId":       flow.AccessIntfId,
+				"Flow":         flow,
+				"SerialNumber": onu.Sn(),
+				"err":          err,
+			}).Error("invalid-flow-for-onu")
+			return nil, err
+		}
+
+		o.storeGemPortId(flow)
+		o.storeAllocId(flow)
+
 		msg := types.Message{
 			Type: types.FlowAdd,
 			Data: types.OnuFlowUpdateMessage{
@@ -1074,10 +1111,22 @@
 func (o *OltDevice) FlowRemove(_ context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
 
 	oltLogger.WithFields(log.Fields{
-		"FlowId":   flow.FlowId,
-		"FlowType": flow.FlowType,
+		"AllocId":       flow.AllocId,
+		"Cookie":        flow.Cookie,
+		"FlowId":        flow.FlowId,
+		"FlowType":      flow.FlowType,
+		"GemportId":     flow.GemportId,
+		"IntfId":        flow.AccessIntfId,
+		"OnuId":         flow.OnuId,
+		"PortNo":        flow.PortNo,
+		"UniID":         flow.UniId,
+		"ReplicateFlow": flow.ReplicateFlow,
+		"PbitToGemport": flow.PbitToGemport,
 	}).Debug("OLT receives FlowRemove")
 
+	olt.freeGemPortId(flow)
+	olt.freeAllocId(flow)
+
 	if !o.enablePerf { // remove only if flow were stored
 		flowKey := FlowKey{
 			ID:        flow.FlowId,
@@ -1177,11 +1226,6 @@
 
 func (o *OltDevice) GetDeviceInfo(context.Context, *openolt.Empty) (*openolt.DeviceInfo, error) {
 
-	oltLogger.WithFields(log.Fields{
-		"oltId":    o.ID,
-		"PonPorts": o.NumPon,
-	}).Info("OLT receives GetDeviceInfo call from VOLTHA")
-
 	intfIDs := []uint32{}
 	for i := 0; i < o.NumPon; i++ {
 		intfIDs = append(intfIDs, uint32(i))
@@ -1538,3 +1582,179 @@
 func (o *OltDevice) GetOnuStatistics(ctx context.Context, in *openolt.Onu) (*openolt.OnuStatistics, error) {
 	return &openolt.OnuStatistics{}, nil
 }
+
+func (o *OltDevice) storeAllocId(flow *openolt.Flow) {
+	o.AllocIDsLock.Lock()
+	defer o.AllocIDsLock.Unlock()
+	oltLogger.WithFields(log.Fields{
+		"IntfId":  flow.AccessIntfId,
+		"OnuId":   flow.OnuId,
+		"PortNo":  flow.PortNo,
+		"AllocId": flow.AllocId,
+	}).Trace("storing-alloc-id-via-flow")
+	// ports is the PortNo-keyed map for this flow's PON and ONU; inner maps are created on first use
+	ports := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)]
+	if _, found := ports[flow.PortNo]; !found {
+		ports[flow.PortNo] = make(map[int32]map[uint64]bool)
+	}
+	if _, found := ports[flow.PortNo][flow.AllocId]; !found {
+		ports[flow.PortNo][flow.AllocId] = make(map[uint64]bool)
+	}
+	ports[flow.PortNo][flow.AllocId][flow.FlowId] = true
+}
+
+func (o *OltDevice) freeAllocId(flow *openolt.Flow) {
+	// drop flow.FlowId from the tracked AllocIDs; an AllocId left without flows is removed too
+	o.AllocIDsLock.Lock()
+	defer o.AllocIDsLock.Unlock()
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":  flow.AccessIntfId,
+		"OnuId":   flow.OnuId,
+		"PortNo":  flow.PortNo,
+		"AllocId": flow.AllocId,
+	}).Trace("freeing-alloc-id-via-flow")
+
+	// NOTE the match is on FlowId alone across all PONs/ONUs/UNIs (same approach as freeGemPortId)
+	for ponId, ponValues := range o.AllocIDs {
+		for onuId, onuValues := range ponValues {
+			for uniId, uniValues := range onuValues {
+				for allocId, flows := range uniValues {
+					for flowId := range flows {
+						// if the flow matches, remove it from the map.
+						if flow.FlowId == flowId {
+							delete(o.AllocIDs[ponId][onuId][uniId][allocId], flow.FlowId)
+						}
+						// if that was the last flow for a particular allocId, remove the entire allocId
+						if len(o.AllocIDs[ponId][onuId][uniId][allocId]) == 0 {
+							delete(o.AllocIDs[ponId][onuId][uniId], allocId)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+func (o *OltDevice) storeGemPortId(flow *openolt.Flow) {
+	o.GemPortIDsLock.Lock()
+	defer o.GemPortIDsLock.Unlock()
+	oltLogger.WithFields(log.Fields{
+		"IntfId":    flow.AccessIntfId,
+		"OnuId":     flow.OnuId,
+		"PortNo":    flow.PortNo,
+		"GemportId": flow.GemportId,
+	}).Trace("storing-gem-port-id-via-flow")
+	// ports is the PortNo-keyed map for this flow's PON and ONU; inner maps are created on first use
+	ports := o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)]
+	if _, found := ports[flow.PortNo]; !found {
+		ports[flow.PortNo] = make(map[int32]map[uint64]bool)
+	}
+	if _, found := ports[flow.PortNo][flow.GemportId]; !found {
+		ports[flow.PortNo][flow.GemportId] = make(map[uint64]bool)
+	}
+	ports[flow.PortNo][flow.GemportId][flow.FlowId] = true
+}
+
+func (o *OltDevice) freeGemPortId(flow *openolt.Flow) {
+	// drop flow.FlowId from the tracked GemPorts; a GemPort left without flows is removed too
+	o.GemPortIDsLock.Lock()
+	defer o.GemPortIDsLock.Unlock()
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":    flow.AccessIntfId,
+		"OnuId":     flow.OnuId,
+		"PortNo":    flow.PortNo,
+		"GemportId": flow.GemportId,
+	}).Trace("freeing-gem-port-id-via-flow")
+
+	// NOTE scanning every level is not very performant. If the flow carried the same
+	// information it carries during a FlowAdd, the entry could be removed directly,
+	// along the lines of the snippet below:
+
+	//delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId], flow.FlowId)
+	//if len(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId]) == 0 {
+	//	delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo], flow.GemportId)
+	//}
+
+	// NOTE the match is on FlowId alone, which assumes that flow IDs are unique per device
+	for _, onus := range o.GemPortIDs {
+		for _, unis := range onus {
+			for _, gems := range unis {
+				for gemID, flowIDs := range gems {
+					for id := range flowIDs {
+						// a matching flow is dropped from this GemPort's flow set
+						if id == flow.FlowId {
+							delete(flowIDs, id)
+						}
+						// once the flow set is empty the GemPort entry itself goes away
+						if len(flowIDs) == 0 {
+							delete(gems, gemID)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+// validateFlow returns an error if, on the flow's PON, a UNI of another ONU already uses:
+// - the flow's GemPortId
+// - the flow's AllocId
+func (o *OltDevice) validateFlow(flow *openolt.Flow) error {
+	// keep both read locks for the whole scan: the nested maps are mutated in place by
+	// store*/free*, so iterating them after RUnlock would race with those writers
+	o.GemPortIDsLock.RLock()
+	defer o.GemPortIDsLock.RUnlock()
+	o.AllocIDsLock.RLock()
+	defer o.AllocIDsLock.RUnlock()
+
+	// validate gemPort (entries of the flow's own ONU are skipped)
+	for onuId, onu := range o.GemPortIDs[uint32(flow.AccessIntfId)] {
+		if onuId == uint32(flow.OnuId) {
+			continue
+		}
+		for uniId, uni := range onu {
+			for gem := range uni {
+				if gem == flow.GemportId {
+					return fmt.Errorf("gem-%d-already-in-use-on-uni-%d-onu-%d", gem, uniId, onuId)
+				}
+			}
+		}
+	}
+
+	// validate allocId (entries of the flow's own ONU are skipped)
+	for onuId, onu := range o.AllocIDs[uint32(flow.AccessIntfId)] {
+		if onuId == uint32(flow.OnuId) {
+			continue
+		}
+		for uniId, uni := range onu {
+			for allocId := range uni {
+				if allocId == flow.AllocId {
+					return fmt.Errorf("allocId-%d-already-in-use-on-uni-%d-onu-%d", allocId, uniId, onuId)
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// clearAllResources is invoked upon OLT reboot to remove all the allocated
+// GemPorts, AllocIds and ONU-IDs across the PONs
+func (o *OltDevice) clearAllResources() {
+	// resources received via flows: swap in fresh, empty top-level maps
+	o.GemPortIDsLock.Lock()
+	o.GemPortIDs = map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool{}
+	o.GemPortIDsLock.Unlock()
+
+	o.AllocIDsLock.Lock()
+	o.AllocIDs = map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool{}
+	o.AllocIDsLock.Unlock()
+
+	// resources received via OMCI are removed through each PON port
+	for _, p := range o.Pons {
+		p.removeAllAllocIds()
+		p.removeAllGemPorts()
+		p.removeAllOnuIds()
+	}
+}