[VOL-3837] Checking that OnuId, AllocId and GemPorts are unique per PON
Validate that received flow carry valid GemPortId and AllocIds
Change-Id: I1b8928c7a9e580c9711f61320595a449df7c30f5
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index f9ef936..87b9da4 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -90,6 +90,13 @@
OpenoltStream openolt.Openolt_EnableIndicationServer
enablePerf bool
+
+ // Allocated Resources
+ // this data are to verify that the openolt adapter does not duplicate resources
+ AllocIDsLock sync.RWMutex
+ AllocIDs map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[AllocIds]map[FlowId]bool
+ GemPortIDsLock sync.RWMutex
+ GemPortIDs map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[GemPortIDs]map[FlowId]bool
}
var olt OltDevice
@@ -123,6 +130,8 @@
PortStatsInterval: options.Olt.PortStatsInterval,
dhcpServer: dhcp.NewDHCPServer(),
PreviouslyConnected: false,
+ AllocIDs: make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
+ GemPortIDs: make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
}
if val, ok := ControlledActivationModes[options.BBSim.ControlledActivation]; ok {
@@ -149,6 +158,10 @@
oltLogger.Debugf("Changing OLT InternalState from %s to %s", e.Src, e.Dst)
},
"enter_initialized": func(e *fsm.Event) { olt.InitOlt() },
+ "enter_deleted": func(e *fsm.Event) {
+ // remove all the resource allocations
+ olt.clearAllResources()
+ },
},
)
@@ -169,6 +182,11 @@
// create PON ports
for i := 0; i < olt.NumPon; i++ {
+
+ // initialize the resource maps for every PON Ports
+ olt.AllocIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+ olt.GemPortIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+
p := CreatePonPort(&olt, uint32(i))
// create ONU devices
@@ -253,6 +271,12 @@
// in-band management
o.Nnis[0].OperState.SetState("down")
}
+
+ for ponId := range o.Pons {
+ // initialize the resource maps for every PON Ports
+ olt.AllocIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+ olt.GemPortIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+ }
}
func (o *OltDevice) RestartOLT() error {
@@ -822,16 +846,14 @@
publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), onuSnToString(onu.SerialNumber))
pon, _ := o.GetPonById(onu.IntfId)
+
+ // Initialize the resource maps for this ONU
+ olt.AllocIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
+ olt.GemPortIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool)
+
_onu, _ := pon.GetOnuBySn(onu.SerialNumber)
_onu.SetID(onu.OnuId)
- if err := _onu.OperState.Event("enable"); err != nil {
- oltLogger.WithFields(log.Fields{
- "IntfId": _onu.PonPortID,
- "OnuSn": _onu.Sn(),
- "OnuId": _onu.ID,
- }).Infof("Failed to transition ONU.OperState to enabled state: %s", err.Error())
- }
if err := _onu.InternalState.Event(OnuTxEnable); err != nil {
oltLogger.WithFields(log.Fields{
"IntfId": _onu.PonPortID,
@@ -845,7 +867,7 @@
return new(openolt.Empty), nil
}
-func (o *OltDevice) DeactivateOnu(context.Context, *openolt.Onu) (*openolt.Empty, error) {
+func (o *OltDevice) DeactivateOnu(_ context.Context, onu *openolt.Onu) (*openolt.Empty, error) {
oltLogger.Error("DeactivateOnu not implemented")
return new(openolt.Empty), nil
}
@@ -1056,6 +1078,21 @@
}
}
+ // validate that the flow reference correct IDs (Alloc, Gem)
+ if err := o.validateFlow(flow); err != nil {
+ oltLogger.WithFields(log.Fields{
+ "OnuId": flow.OnuId,
+ "IntfId": flow.AccessIntfId,
+ "Flow": flow,
+ "SerialNumber": onu.Sn(),
+ "err": err,
+ }).Error("invalid-flow-for-onu")
+ return nil, err
+ }
+
+ o.storeGemPortId(flow)
+ o.storeAllocId(flow)
+
msg := types.Message{
Type: types.FlowAdd,
Data: types.OnuFlowUpdateMessage{
@@ -1074,10 +1111,22 @@
func (o *OltDevice) FlowRemove(_ context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
oltLogger.WithFields(log.Fields{
- "FlowId": flow.FlowId,
- "FlowType": flow.FlowType,
+ "AllocId": flow.AllocId,
+ "Cookie": flow.Cookie,
+ "FlowId": flow.FlowId,
+ "FlowType": flow.FlowType,
+ "GemportId": flow.GemportId,
+ "IntfId": flow.AccessIntfId,
+ "OnuId": flow.OnuId,
+ "PortNo": flow.PortNo,
+ "UniID": flow.UniId,
+ "ReplicateFlow": flow.ReplicateFlow,
+ "PbitToGemport": flow.PbitToGemport,
}).Debug("OLT receives FlowRemove")
+ olt.freeGemPortId(flow)
+ olt.freeAllocId(flow)
+
if !o.enablePerf { // remove only if flow were stored
flowKey := FlowKey{
ID: flow.FlowId,
@@ -1177,11 +1226,6 @@
func (o *OltDevice) GetDeviceInfo(context.Context, *openolt.Empty) (*openolt.DeviceInfo, error) {
- oltLogger.WithFields(log.Fields{
- "oltId": o.ID,
- "PonPorts": o.NumPon,
- }).Info("OLT receives GetDeviceInfo call from VOLTHA")
-
intfIDs := []uint32{}
for i := 0; i < o.NumPon; i++ {
intfIDs = append(intfIDs, uint32(i))
@@ -1538,3 +1582,179 @@
func (o *OltDevice) GetOnuStatistics(ctx context.Context, in *openolt.Onu) (*openolt.OnuStatistics, error) {
return &openolt.OnuStatistics{}, nil
}
+
+func (o *OltDevice) storeAllocId(flow *openolt.Flow) {
+ o.AllocIDsLock.Lock()
+ defer o.AllocIDsLock.Unlock()
+
+ oltLogger.WithFields(log.Fields{
+ "IntfId": flow.AccessIntfId,
+ "OnuId": flow.OnuId,
+ "PortNo": flow.PortNo,
+ "AllocId": flow.AllocId,
+ }).Trace("storing-alloc-id-via-flow")
+
+ if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo]; !ok {
+ o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo] = make(map[int32]map[uint64]bool)
+ }
+ if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId]; !ok {
+ o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId] = make(map[uint64]bool)
+ }
+ o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId][flow.FlowId] = true
+}
+
+func (o *OltDevice) freeAllocId(flow *openolt.Flow) {
+ // if this is the last flow referencing the AllocId then remove it
+ o.AllocIDsLock.Lock()
+ defer o.AllocIDsLock.Unlock()
+
+ oltLogger.WithFields(log.Fields{
+ "IntfId": flow.AccessIntfId,
+ "OnuId": flow.OnuId,
+ "PortNo": flow.PortNo,
+ "GemportId": flow.GemportId,
+ }).Trace("freeing-alloc-id-via-flow")
+
+ // NOTE look at the freeGemPortId implementation for comments and context
+ for ponId, ponValues := range o.AllocIDs {
+ for onuId, onuValues := range ponValues {
+ for uniId, uniValues := range onuValues {
+ for allocId, flows := range uniValues {
+ for flowId := range flows {
+ // if the flow matches, remove it from the map.
+ if flow.FlowId == flowId {
+ delete(o.AllocIDs[ponId][onuId][uniId][allocId], flow.FlowId)
+ }
+ // if that was the last flow for a particular allocId, remove the entire allocId
+ if len(o.AllocIDs[ponId][onuId][uniId][allocId]) == 0 {
+ delete(o.AllocIDs[ponId][onuId][uniId], allocId)
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+func (o *OltDevice) storeGemPortId(flow *openolt.Flow) {
+ o.GemPortIDsLock.Lock()
+ defer o.GemPortIDsLock.Unlock()
+
+ oltLogger.WithFields(log.Fields{
+ "IntfId": flow.AccessIntfId,
+ "OnuId": flow.OnuId,
+ "PortNo": flow.PortNo,
+ "GemportId": flow.GemportId,
+ }).Trace("storing-gem-port-id-via-flow")
+
+ if _, ok := o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo]; !ok {
+ o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo] = make(map[int32]map[uint64]bool)
+ }
+ if _, ok := o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId]; !ok {
+ o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId] = make(map[uint64]bool)
+ }
+ o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId][flow.FlowId] = true
+}
+
+func (o *OltDevice) freeGemPortId(flow *openolt.Flow) {
+ // if this is the last flow referencing the GemPort then remove it
+ o.GemPortIDsLock.Lock()
+ defer o.GemPortIDsLock.Unlock()
+
+ oltLogger.WithFields(log.Fields{
+ "IntfId": flow.AccessIntfId,
+ "OnuId": flow.OnuId,
+ "PortNo": flow.PortNo,
+ "GemportId": flow.GemportId,
+ }).Trace("freeing-gem-port-id-via-flow")
+
+ // NOTE that this loop is not very performant, it would be better if the flow carries
+ // the same information that it carries during a FlowAdd. If so we can directly remove
+ // items from the map
+
+ //delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId], flow.FlowId)
+ //if len(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId]) == 0 {
+ // delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo], flow.GemportId)
+ //}
+
+ // NOTE this loop assumes that flow IDs are unique per device
+ for ponId, ponValues := range o.GemPortIDs {
+ for onuId, onuValues := range ponValues {
+ for uniId, uniValues := range onuValues {
+ for gemId, flows := range uniValues {
+ for flowId := range flows {
+ // if the flow matches, remove it from the map.
+ if flow.FlowId == flowId {
+ delete(o.GemPortIDs[ponId][onuId][uniId][gemId], flow.FlowId)
+ }
+ // if that was the last flow for a particular gem, remove the entire gem
+ if len(o.GemPortIDs[ponId][onuId][uniId][gemId]) == 0 {
+ delete(o.GemPortIDs[ponId][onuId][uniId], gemId)
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+// validateFlow checks that:
+// - the AllocId is not used in any flow referencing other ONUs/UNIs on the same PON
+// - the GemPortId is not used in any flow referencing other ONUs/UNIs on the same PON
+func (o *OltDevice) validateFlow(flow *openolt.Flow) error {
+
+ // validate gemPort
+ o.GemPortIDsLock.RLock()
+ allocatedGems := o.GemPortIDs[uint32(flow.AccessIntfId)]
+ o.GemPortIDsLock.RUnlock()
+ for onuId, onu := range allocatedGems {
+ if onuId == uint32(flow.OnuId) {
+ continue
+ }
+ for uniId, uni := range onu {
+ for gem := range uni {
+ if gem == flow.GemportId {
+ return fmt.Errorf("gem-%d-already-in-use-on-uni-%d-onu-%d", gem, uniId, onuId)
+ }
+ }
+ }
+ }
+
+ o.AllocIDsLock.RLock()
+ allocatedAllocIds := o.AllocIDs[uint32(flow.AccessIntfId)]
+ o.AllocIDsLock.RUnlock()
+ for onuId, onu := range allocatedAllocIds {
+ if onuId == uint32(flow.OnuId) {
+ continue
+ }
+ for uniId, uni := range onu {
+ for allocId := range uni {
+ if allocId == flow.AllocId {
+ return fmt.Errorf("allocId-%d-already-in-use-on-uni-%d-onu-%d", allocId, uniId, onuId)
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+// clearAllResources is invoked up OLT Reboot to remove all the allocated
+// GemPorts, AllocId and ONU-IDs across the PONs
+func (o *OltDevice) clearAllResources() {
+
+ // remove the resources received via flows
+ o.GemPortIDsLock.Lock()
+ o.GemPortIDs = make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool)
+ o.GemPortIDsLock.Unlock()
+ o.AllocIDsLock.Lock()
+ o.AllocIDs = make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool)
+ o.AllocIDsLock.Unlock()
+
+ // remove the resources received via OMCI
+ for _, pon := range o.Pons {
+ pon.removeAllAllocIds()
+ pon.removeAllGemPorts()
+ pon.removeAllOnuIds()
+ }
+}
diff --git a/internal/bbsim/devices/olt_test.go b/internal/bbsim/devices/olt_test.go
index 154942b..acbe7a7 100644
--- a/internal/bbsim/devices/olt_test.go
+++ b/internal/bbsim/devices/olt_test.go
@@ -28,15 +28,27 @@
func createMockOlt(numPon int, numOnu int, services []ServiceIf) *OltDevice {
olt := &OltDevice{
- ID: 0,
+ ID: 0,
+ AllocIDs: make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
+ GemPortIDs: make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool),
}
for i := 0; i < numPon; i++ {
+
+ // initialize the resource maps for every PON Ports
+ olt.AllocIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+ olt.GemPortIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool)
+
pon := PonPort{
ID: uint32(i),
}
for j := 0; j < numOnu; j++ {
+
+ // initialize the resource maps for every ONU and the first UNI
+ olt.AllocIDs[uint32(i)][uint32(j)] = make(map[uint32]map[int32]map[uint64]bool)
+ olt.GemPortIDs[uint32(i)][uint32(j)] = make(map[uint32]map[int32]map[uint64]bool)
+
onuId := uint32(i + j)
onu := Onu{
ID: onuId,
@@ -51,7 +63,7 @@
onu.Services = append(onu.Services, service)
}
- onu.SerialNumber = onu.NewSN(olt.ID, pon.ID, onu.ID)
+ onu.SerialNumber = NewSN(olt.ID, pon.ID, onu.ID)
pon.Onus = append(pon.Onus, &onu)
}
olt.Pons = append(olt.Pons, &pon)
@@ -227,3 +239,209 @@
assert.Equal(t, err, nil)
assert.Equal(t, found.Sn(), onu1.Sn())
}
+
+func Test_Olt_storeGemPortId(t *testing.T) {
+
+ const (
+ pon = 1
+ onu = 1
+ uni = 16
+ gem1 = 1024
+ gem2 = 1025
+ )
+
+ numPon := 2
+ numOnu := 2
+
+ olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+ // add a first flow on the ONU
+ flow1 := &openolt.Flow{
+ AccessIntfId: pon,
+ OnuId: onu,
+ PortNo: uni,
+ FlowId: 1,
+ GemportId: gem1,
+ }
+
+ olt.storeGemPortId(flow1)
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1) // we have 1 gem port
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 1) // and one flow referencing it
+
+ // add a second flow on the ONU (same gem)
+ flow2 := &openolt.Flow{
+ AccessIntfId: pon,
+ OnuId: onu,
+ PortNo: uni,
+ FlowId: 2,
+ GemportId: gem1,
+ }
+
+ olt.storeGemPortId(flow2)
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1) // we have 1 gem port
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 2) // and two flows referencing it
+
+ // add a third flow on the ONU (different gem)
+ flow3 := &openolt.Flow{
+ AccessIntfId: pon,
+ OnuId: onu,
+ PortNo: uni,
+ FlowId: 2,
+ GemportId: 1025,
+ }
+
+ olt.storeGemPortId(flow3)
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 2) // we have 2 gem ports
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 2) // two flows referencing the first one
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem2]), 1) // and one flow referencing the second one
+}
+
+func Test_Olt_freeGemPortId(t *testing.T) {
+ const (
+ pon = 1
+ onu = 1
+ uni = 16
+ gem1 = 1024
+ gem2 = 1025
+ flow1 = 1
+ flow2 = 2
+ flow3 = 3
+ )
+
+ numPon := 2
+ numOnu := 2
+
+ olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+ olt.GemPortIDs[pon][onu][uni] = make(map[int32]map[uint64]bool)
+ olt.GemPortIDs[pon][onu][uni][gem1] = make(map[uint64]bool)
+ olt.GemPortIDs[pon][onu][uni][gem1][flow1] = true
+ olt.GemPortIDs[pon][onu][uni][gem1][flow2] = true
+ olt.GemPortIDs[pon][onu][uni][gem2] = make(map[uint64]bool)
+ olt.GemPortIDs[pon][onu][uni][gem2][flow3] = true
+
+ // remove one flow on the first gem, check that the gem is still allocated as there is still a flow referencing it
+ // NOTE that the flow remove only carries the flow ID, no other information
+ flowGem1 := &openolt.Flow{
+ FlowId: flow1,
+ }
+
+ olt.freeGemPortId(flowGem1)
+ // we still have two unis in the map
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 2)
+
+ // we should now have a single gem referenced on this UNI
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni][gem1]), 1, "gemport-not-removed")
+
+ // the gem should still reference flow 2
+ assert.Equal(t, olt.GemPortIDs[pon][onu][uni][gem1][flow2], true)
+ // but should not reference flow1
+ _, flow1Exists := olt.GemPortIDs[pon][onu][uni][gem1][flow1]
+ assert.Equal(t, flow1Exists, false)
+
+ // this is the only flow remaining on this gem, the gem should be removed
+ flowGem2 := &openolt.Flow{
+ FlowId: flow2,
+ }
+ olt.freeGemPortId(flowGem2)
+
+ // we should now have a single gem referenced on this UNI
+ assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 1, "gemport-not-removed")
+
+ // and it should be gem2
+ _, gem1exists := olt.GemPortIDs[pon][onu][uni][gem1]
+ assert.Equal(t, gem1exists, false)
+ _, gem2exists := olt.GemPortIDs[pon][onu][uni][gem2]
+ assert.Equal(t, gem2exists, true)
+}
+
+func Test_Olt_validateFlow(t *testing.T) {
+
+ const (
+ pon0 = 0
+ pon1 = 1
+ onu0 = 0
+ onu1 = 1
+ uniPort = 0
+ usedGemIdPon0 = 1024
+ usedGemIdPon1 = 1025
+ usedAllocIdPon0 = 1
+ usedAllocIdPon1 = 2
+ flowId = 1
+ )
+
+ numPon := 2
+ numOnu := 2
+
+ olt := createMockOlt(numPon, numOnu, []ServiceIf{})
+
+ olt.GemPortIDs[pon0][onu0][uniPort] = make(map[int32]map[uint64]bool)
+ olt.GemPortIDs[pon1][onu0][uniPort] = make(map[int32]map[uint64]bool)
+
+ olt.GemPortIDs[pon0][onu0][uniPort][usedGemIdPon0] = make(map[uint64]bool)
+ olt.GemPortIDs[pon0][onu0][uniPort][usedGemIdPon0][flowId] = true
+ olt.GemPortIDs[pon1][onu0][uniPort][usedGemIdPon1] = make(map[uint64]bool)
+ olt.GemPortIDs[pon1][onu0][uniPort][usedGemIdPon1][flowId] = true
+
+ olt.AllocIDs[pon0][onu0][uniPort] = make(map[int32]map[uint64]bool)
+ olt.AllocIDs[pon1][onu0][uniPort] = make(map[int32]map[uint64]bool)
+ olt.AllocIDs[pon0][onu0][uniPort][usedAllocIdPon0] = make(map[uint64]bool)
+ olt.AllocIDs[pon0][onu0][uniPort][usedAllocIdPon0][flowId] = true
+ olt.AllocIDs[pon1][onu0][uniPort][usedAllocIdPon1] = make(map[uint64]bool)
+ olt.AllocIDs[pon1][onu0][uniPort][usedAllocIdPon1][flowId] = true
+
+ // a GemPortID can be referenced across multiple flows on the same ONU
+ validGemFlow := &openolt.Flow{
+ AccessIntfId: pon0,
+ OnuId: onu0,
+ GemportId: usedGemIdPon0,
+ }
+
+ err := olt.validateFlow(validGemFlow)
+ assert.NilError(t, err)
+
+ // a GemPortID can NOT be referenced across different ONUs on the same PON
+ invalidGemFlow := &openolt.Flow{
+ AccessIntfId: pon0,
+ OnuId: onu1,
+ GemportId: usedGemIdPon0,
+ }
+ err = olt.validateFlow(invalidGemFlow)
+ assert.Error(t, err, "gem-1024-already-in-use-on-uni-0-onu-0")
+
+ // if a flow reference the same GEM on a different PON it's a valid flow
+ invalidGemDifferentPonFlow := &openolt.Flow{
+ AccessIntfId: pon1,
+ OnuId: onu1,
+ GemportId: usedGemIdPon0,
+ }
+ err = olt.validateFlow(invalidGemDifferentPonFlow)
+ assert.NilError(t, err)
+
+ // an allocId can be referenced across multiple flows on the same ONU
+ validAllocFlow := &openolt.Flow{
+ AccessIntfId: pon0,
+ OnuId: onu0,
+ AllocId: usedAllocIdPon0,
+ }
+ err = olt.validateFlow(validAllocFlow)
+ assert.NilError(t, err)
+
+ // an allocId can NOT be referenced across different ONUs on the same PON
+ invalidAllocFlow := &openolt.Flow{
+ AccessIntfId: pon0,
+ OnuId: onu1,
+ AllocId: usedAllocIdPon0,
+ }
+ err = olt.validateFlow(invalidAllocFlow)
+ assert.Error(t, err, "allocId-1-already-in-use-on-uni-0-onu-0")
+
+ // if a flow reference the same AllocId on a different PON it's a valid flow
+ invalidAllocDifferentPonFlow := &openolt.Flow{
+ AccessIntfId: pon1,
+ OnuId: onu1,
+ AllocId: usedAllocIdPon0,
+ }
+ err = olt.validateFlow(invalidAllocDifferentPonFlow)
+ assert.NilError(t, err)
+}
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index fd7eca0..ab7c35f 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -22,14 +22,13 @@
"fmt"
pb "github.com/opencord/bbsim/api/bbsim"
"github.com/opencord/bbsim/internal/bbsim/alarmsim"
- bbsim "github.com/opencord/bbsim/internal/bbsim/types"
- me "github.com/opencord/omci-lib-go/generated"
- "strconv"
-
"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
"github.com/opencord/bbsim/internal/bbsim/responders/eapol"
+ bbsim "github.com/opencord/bbsim/internal/bbsim/types"
+ me "github.com/opencord/omci-lib-go/generated"
"net"
+ "strconv"
"time"
"github.com/google/gopacket/layers"
@@ -102,11 +101,9 @@
// PortNo comes with flows and it's used when sending packetIndications,
// There is one PortNo per UNI Port, for now we're only storing the first one
// FIXME add support for multiple UNIs (each UNI has a different PortNo)
- PortNo uint32
- // deprecated (gemPort is on a Service basis)
- GemPortAdded bool
- Flows []FlowKey
- FlowIds []uint64 // keep track of the flows we currently have in the ONU
+ PortNo uint32
+ Flows []FlowKey
+ FlowIds []uint64 // keep track of the flows we currently have in the ONU
OperState *fsm.FSM
SerialNumber *openolt.SerialNumber
@@ -153,7 +150,7 @@
ActiveImageEntityId: 0, // when we start the SoftwareImage with ID 0 is active and committed
CommittedImageEntityId: 0,
}
- o.SerialNumber = o.NewSN(olt.ID, pon.ID, id)
+ o.SerialNumber = NewSN(olt.ID, pon.ID, id)
// NOTE this state machine is used to track the operational
// state as requested by VOLTHA
o.OperState = getOperStateFSM(func(e *fsm.Event) {
@@ -217,6 +214,18 @@
o.Channel <- msg
},
"enter_enabled": func(event *fsm.Event) {
+
+ if used, sn := o.PonPort.isOnuIdAllocated(o.ID); used {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "SerialNumber": o.Sn(),
+ }).Errorf("received-omci-with-sn-%s", common.OnuSnToString(sn))
+ return
+ } else {
+ o.PonPort.storeOnuId(o.ID, o.SerialNumber)
+ }
+
msg := bbsim.Message{
Type: bbsim.OnuIndication,
Data: bbsim.OnuIndicationMessage{
@@ -235,9 +244,11 @@
"enter_disabled": func(event *fsm.Event) {
// clean the ONU state
- o.GemPortAdded = false
o.PortNo = 0
o.Flows = []FlowKey{}
+ o.PonPort.removeOnuId(o.ID)
+ o.PonPort.removeAllocId(o.SerialNumber)
+ o.PonPort.removeGemPortBySn(o.SerialNumber)
// set the OperState to disabled
if err := o.OperState.Event("disable"); err != nil {
@@ -438,7 +449,7 @@
}).Debug("Stopped handling ONU Indication Channel")
}
-func (o Onu) NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
+func NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
sn := new(openolt.SerialNumber)
@@ -477,10 +488,8 @@
}
func (o *Onu) sendOnuIndication(msg bbsim.OnuIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
- // NOTE voltha returns an ID, but if we use that ID then it complains:
- // expected_onu_id: 1, received_onu_id: 1024, event: ONU-id-mismatch, can happen if both voltha and the olt rebooted
- // so we're using the internal ID that is 1
- // o.ID = msg.OnuID
+ // NOTE the ONU ID is set by VOLTHA in the ActivateOnu call (via openolt.proto)
+ // and stored in the Onu struct via onu.SetID
indData := &openolt.Indication_OnuInd{OnuInd: &openolt.OnuIndication{
IntfId: o.PonPortID,
@@ -495,11 +504,12 @@
return
}
onuLogger.WithFields(log.Fields{
- "IntfId": o.PonPortID,
- "OnuId": o.ID,
- "OperState": msg.OperState.String(),
- "AdminState": msg.OperState.String(),
- "OnuSn": o.Sn(),
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "VolthaOnuId": msg.OnuID,
+ "OperState": msg.OperState.String(),
+ "AdminState": msg.OperState.String(),
+ "OnuSn": o.Sn(),
}).Debug("Sent Indication_OnuInd")
}
@@ -707,10 +717,7 @@
case omci.GetRequestType:
responsePkt, _ = omcilib.CreateGetResponse(omciPkt, omciMsg, o.SerialNumber, o.MibDataSync, o.ActiveImageEntityId, o.CommittedImageEntityId)
case omci.SetRequestType:
- if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg); errResp == nil {
- o.MibDataSync++
- }
-
+ success := true
msgObj, _ := omcilib.ParseSetRequest(omciPkt)
switch msgObj.EntityClass {
case me.PhysicalPathTerminationPointEthernetUniClassID:
@@ -734,12 +741,101 @@
}
o.Channel <- msg
}
+ case me.TContClassID:
+ allocId := msgObj.Attributes["AllocId"].(uint16)
+
+ // if the AllocId is 255 (0xFF) or 65535 (0xFFFF) it means we are removing it,
+ // otherwise we are adding it
+ if allocId == 255 || allocId == 65535 {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "TContId": msgObj.EntityInstance,
+ "AllocId": allocId,
+ "SerialNumber": o.Sn(),
+ }).Trace("freeing-alloc-id-via-omci")
+ o.PonPort.removeAllocId(o.SerialNumber)
+ } else {
+ if used, sn := o.PonPort.isAllocIdAllocated(allocId); used {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "AllocId": allocId,
+ "SerialNumber": o.Sn(),
+ }).Errorf("allocid-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+ success = false
+ } else {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "TContId": msgObj.EntityInstance,
+ "AllocId": allocId,
+ "SerialNumber": o.Sn(),
+ }).Trace("storing-alloc-id-via-omci")
+ o.PonPort.storeAllocId(allocId, o.SerialNumber)
+ }
+ }
+
+ }
+
+ if success {
+ if responsePkt, errResp = omcilib.CreateSetResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+ o.MibDataSync++
+ }
+ } else {
+ responsePkt, _ = omcilib.CreateSetResponse(omciPkt, omciMsg, me.AttributeFailure)
}
case omci.CreateRequestType:
- if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg); errResp == nil {
- o.MibDataSync++
+ // check for GemPortNetworkCtp and make sure there are no duplicates on the same PON
+ var used bool
+ var sn *openolt.SerialNumber
+ msgObj, err := omcilib.ParseCreateRequest(omciPkt)
+ if err == nil {
+ if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+ if used, sn = o.PonPort.isGemPortAllocated(msgObj.EntityInstance); used {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "GemPortId": msgObj.EntityInstance,
+ "SerialNumber": o.Sn(),
+ }).Errorf("gemport-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
+ } else {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "GemPortId": msgObj.EntityInstance,
+ "SerialNumber": o.Sn(),
+ }).Trace("storing-gem-port-id-via-omci")
+ o.PonPort.storeGemPort(msgObj.EntityInstance, o.SerialNumber)
+ }
+ }
+ }
+
+ // if the gemPort is valid then increment the MDS and return a successful response
+ // otherwise fail the request
+ // for now the CreateRequeste for the gemPort is the only one that can fail, if we start supporting multiple
+ // validation this check will need to be rewritten
+ if !used {
+ if responsePkt, errResp = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.Success); errResp == nil {
+ o.MibDataSync++
+ }
+ } else {
+ responsePkt, _ = omcilib.CreateCreateResponse(omciPkt, omciMsg, me.ProcessingError)
}
case omci.DeleteRequestType:
+ msgObj, err := omcilib.ParseDeleteRequest(omciPkt)
+ if err == nil {
+ if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
+ onuLogger.WithFields(log.Fields{
+ "IntfId": o.PonPortID,
+ "OnuId": o.ID,
+ "GemPortId": msgObj.EntityInstance,
+ "SerialNumber": o.Sn(),
+ }).Trace("freeing-gem-port-id-via-omci")
+ o.PonPort.removeGemPort(msgObj.EntityInstance)
+ }
+ }
+
if responsePkt, errResp = omcilib.CreateDeleteResponse(omciPkt, omciMsg); errResp == nil {
o.MibDataSync++
}
@@ -1037,6 +1133,7 @@
func (o *Onu) handleFlowAdd(msg bbsim.OnuFlowUpdateMessage) {
onuLogger.WithFields(log.Fields{
+ "AllocId": msg.Flow.AllocId,
"Cookie": msg.Flow.Cookie,
"DstPort": msg.Flow.Classifier.DstPort,
"FlowId": msg.Flow.FlowId,
@@ -1123,7 +1220,6 @@
"OnuId": o.ID,
"SerialNumber": o.Sn(),
}).Info("Resetting GemPort")
- o.GemPortAdded = false
// check if ONU delete is performed and
// terminate the ONU's ProcessOnuMessages Go routine
@@ -1245,25 +1341,7 @@
if o.seqNumber > 290 {
// NOTE we are done with the MIB Upload (290 is the number of messages the omci-sim library will respond to)
- galEnet, _ := omcilib.CreateGalEnetRequest(o.getNextTid(false))
- sendOmciMsg(galEnet, o.PonPortID, o.ID, o.SerialNumber, "CreateGalEnetRequest", client)
- } else {
- mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
- sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
- }
- case omci.CreateResponseType:
- // NOTE Creating a GemPort,
- // BBsim actually doesn't care about the values, so we can do we want with the parameters
- // In the same way we can create a GemPort even without setting up UNIs/TConts/...
- // but we need the GemPort to trigger the state change
-
- if !o.GemPortAdded {
- // NOTE this sends a CreateRequestType and BBSim replies with a CreateResponseType
- // thus we send this request only once
- gemReq, _ := omcilib.CreateGemPortRequest(o.getNextTid(false))
- sendOmciMsg(gemReq, o.PonPortID, o.ID, o.SerialNumber, "CreateGemPortRequest", client)
- o.GemPortAdded = true
- } else {
+ // start sending the flows, we don't care about the OMCI setup in BBR, just that a lot of messages can go through
if err := o.InternalState.Event(BbrOnuTxSendEapolFlow); err != nil {
onuLogger.WithFields(log.Fields{
"OnuId": o.ID,
@@ -1271,6 +1349,9 @@
"OnuSn": o.Sn(),
}).Errorf("Error while transitioning ONU State %v", err)
}
+ } else {
+ mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
+ sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
}
}
}
@@ -1290,14 +1371,16 @@
UniId: int32(0), // NOTE do not hardcode this, we need to support multiple UNIs
FlowId: uint64(o.ID),
FlowType: "downstream",
- AllocId: int32(0),
NetworkIntfId: int32(0),
- GemportId: int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
Classifier: &classifierProto,
Action: &actionProto,
Priority: int32(100),
Cookie: uint64(o.ID),
- PortNo: uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+ PortNo: o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+ // AllocId and GemPorts need to be unique per PON
+ // for now use the ONU-ID, will need to change once we support multiple UNIs
+ AllocId: int32(o.ID),
+ GemportId: int32(o.ID),
}
if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1307,6 +1390,7 @@
"FlowId": downstreamFlow.FlowId,
"PortNo": downstreamFlow.PortNo,
"SerialNumber": common.OnuSnToString(o.SerialNumber),
+ "Err": err,
}).Fatalf("Failed to add EAPOL Flow")
}
log.WithFields(log.Fields{
@@ -1338,14 +1422,16 @@
UniId: int32(0), // FIXME do not hardcode this
FlowId: uint64(o.ID),
FlowType: "downstream",
- AllocId: int32(0),
NetworkIntfId: int32(0),
- GemportId: int32(1), // FIXME use the same value as CreateGemPortRequest PortID, do not hardcode
Classifier: &classifierProto,
Action: &actionProto,
Priority: int32(100),
Cookie: uint64(o.ID),
- PortNo: uint32(o.ID), // NOTE we are using this to map an incoming packetIndication to an ONU
+ PortNo: o.ID, // NOTE we are using this to map an incoming packetIndication to an ONU
+ // AllocId and GemPorts need to be unique per PON
+ // for now use the ONU-ID, will need to change once we support multiple UNIs
+ AllocId: int32(o.ID),
+ GemportId: int32(o.ID),
}
if _, err := client.FlowAdd(context.Background(), &downstreamFlow); err != nil {
@@ -1355,6 +1441,7 @@
"FlowId": downstreamFlow.FlowId,
"PortNo": downstreamFlow.PortNo,
"SerialNumber": common.OnuSnToString(o.SerialNumber),
+ "Err": err,
}).Fatalf("Failed to send DHCP Flow")
}
log.WithFields(log.Fields{
diff --git a/internal/bbsim/devices/onu_flow_test.go b/internal/bbsim/devices/onu_flow_test.go
index e0ad9fe..b116843 100644
--- a/internal/bbsim/devices/onu_flow_test.go
+++ b/internal/bbsim/devices/onu_flow_test.go
@@ -100,8 +100,6 @@
fsm.Callbacks{},
)
- onu.GemPortAdded = true
-
onu.FlowIds = []uint64{64}
flow := openolt.Flow{
@@ -115,7 +113,6 @@
}
onu.handleFlowRemove(msg)
assert.Equal(t, len(onu.FlowIds), 0)
- assert.Equal(t, onu.GemPortAdded, false)
}
func TestOnu_HhandleEAPOLStart(t *testing.T) {
@@ -429,8 +426,6 @@
// one we received with the EAPOL flow
onu := createMockOnu(1, 1)
- onu.GemPortAdded = false
-
onu.InternalState = fsm.NewFSM(
"enabled",
fsm.Events{
diff --git a/internal/bbsim/devices/onu_omci_test.go b/internal/bbsim/devices/onu_omci_test.go
index 4b9cc26..4d706b9 100644
--- a/internal/bbsim/devices/onu_omci_test.go
+++ b/internal/bbsim/devices/onu_omci_test.go
@@ -17,6 +17,7 @@
package devices
import (
+ "github.com/google/gopacket"
bbsim "github.com/opencord/bbsim/internal/bbsim/types"
omcilib "github.com/opencord/bbsim/internal/common/omci"
"github.com/opencord/omci-lib-go"
@@ -27,7 +28,6 @@
)
var mockAttr = me.AttributeValueMap{
- "ManagedEntityId": 12,
"PortId": 0,
"TContPointer": 0,
"Direction": 0,
@@ -46,6 +46,7 @@
},
Attributes: mockAttr,
}
+
omciPkt, err := omcilib.Serialize(omci.CreateRequestType, omciReq, 66)
if err != nil {
t.Fatal(err.Error())
@@ -119,6 +120,34 @@
}
}
+func omciBytesToMsg(t *testing.T, data []byte) (*omci.OMCI, *gopacket.Packet) {
+ packet := gopacket.NewPacket(data, omci.LayerTypeOMCI, gopacket.NoCopy)
+ if packet == nil {
+ t.Fatal("could not decode rxMsg as OMCI")
+ }
+ omciLayer := packet.Layer(omci.LayerTypeOMCI)
+ if omciLayer == nil {
+ t.Fatal("could not decode omci layer")
+ }
+ omciMsg, ok := omciLayer.(*omci.OMCI)
+ if !ok {
+ t.Fatal("could not assign omci layer")
+ }
+ return omciMsg, &packet
+}
+
+func omciToCreateResponse(t *testing.T, omciPkt *gopacket.Packet) *omci.CreateResponse {
+ msgLayer := (*omciPkt).Layer(omci.LayerTypeCreateResponse)
+ if msgLayer == nil {
+ t.Fatal("omci Msg layer could not be detected for CreateResponse - handling of MibSyncChan stopped")
+ }
+ msgObj, msgOk := msgLayer.(*omci.CreateResponse)
+ if !msgOk {
+ t.Fatal("omci Msg layer could not be assigned for CreateResponse - handling of MibSyncChan stopped")
+ }
+ return msgObj
+}
+
func Test_MibDataSyncIncrease(t *testing.T) {
onu := createMockOnu(1, 1)
@@ -174,3 +203,39 @@
onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciDeleteRequest(t)), stream)
assert.Equal(t, onu.MibDataSync, uint8(0))
}
+
+func Test_GemPortValidation(t *testing.T) {
+
+ // setup
+ onu := createMockOnu(1, 1)
+
+ stream := &mockStream{
+ Calls: make(map[int]*openolt.Indication),
+ }
+
+ // create a gem port via OMCI (gemPortId 12)
+ onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciCreateRequest(t)), stream)
+
+ // the first time we created the gemPort
+ // the MDS should be incremented
+ assert.Equal(t, stream.CallCount, 1)
+ assert.Equal(t, onu.MibDataSync, uint8(1))
+
+ // and the OMCI response status should be me.Success
+ indication := stream.Calls[1].GetOmciInd()
+ _, omciPkt := omciBytesToMsg(t, indication.Pkt)
+ responseLayer := omciToCreateResponse(t, omciPkt)
+ assert.Equal(t, responseLayer.Result, me.Success)
+
+ // send a request to create the same gem port via OMCI (gemPortId 12)
+ onu.handleOmciRequest(makeOmciMessage(t, onu, makeOmciCreateRequest(t)), stream)
+
+ // this time the MDS should not be incremented
+ assert.Equal(t, stream.CallCount, 2)
+ assert.Equal(t, onu.MibDataSync, uint8(1))
+
+ // and the OMCI response status should be me.ProcessingError
+ _, omciPkt = omciBytesToMsg(t, stream.Calls[2].GetOmciInd().Pkt)
+ responseLayer = omciToCreateResponse(t, omciPkt)
+ assert.Equal(t, responseLayer.Result, me.ProcessingError)
+}
diff --git a/internal/bbsim/devices/onu_state_machine_test.go b/internal/bbsim/devices/onu_state_machine_test.go
index 6f62540..f0fd232 100644
--- a/internal/bbsim/devices/onu_state_machine_test.go
+++ b/internal/bbsim/devices/onu_state_machine_test.go
@@ -37,7 +37,6 @@
assert.Equal(t, onu.InternalState.Current(), OnuStateEnabled)
onu.PortNo = 16
- onu.GemPortAdded = true
onu.Flows = []FlowKey{
{ID: 1, Direction: "upstream"},
{ID: 2, Direction: "downstream"},
@@ -46,7 +45,6 @@
_ = onu.InternalState.Event(OnuTxDisable)
assert.Equal(t, onu.InternalState.Current(), OnuStateDisabled)
- assert.Equal(t, onu.GemPortAdded, false)
assert.Equal(t, onu.PortNo, uint32(0))
assert.Equal(t, len(onu.Flows), 0)
}
@@ -95,7 +93,6 @@
assert.Equal(t, onu.InternalState.Current(), OnuStateEnabled)
// succeed
- onu.GemPortAdded = true
_ = onu.InternalState.Event("start_auth")
assert.Equal(t, onu.InternalState.Current(), "auth_started")
}
@@ -104,8 +101,6 @@
t.Skip("Needs to be moved in the Service struct")
onu := createTestOnu()
- onu.GemPortAdded = true
-
onu.InternalState.SetState("auth_started")
assert.Equal(t, onu.InternalState.Current(), "auth_started")
@@ -168,8 +163,6 @@
onu.InternalState.SetState("eap_response_success_received")
assert.Equal(t, onu.InternalState.Current(), "eap_response_success_received")
- onu.GemPortAdded = false
-
err := onu.InternalState.Event("start_dhcp")
if err == nil {
t.Fail()
@@ -181,7 +174,6 @@
func Test_Onu_StateMachine_dhcp_start(t *testing.T) {
t.Skip("Needs to be moved in the Service struct")
onu := createTestOnu()
- onu.GemPortAdded = true
onu.InternalState.SetState("eap_response_success_received")
assert.Equal(t, onu.InternalState.Current(), "eap_response_success_received")
@@ -195,8 +187,6 @@
t.Skip("Needs to be moved in the Service struct")
onu := createTestOnu()
- onu.GemPortAdded = true
-
onu.InternalState.SetState("dhcp_started")
assert.Equal(t, onu.InternalState.Current(), "dhcp_started")
diff --git a/internal/bbsim/devices/onu_test_helpers.go b/internal/bbsim/devices/onu_test_helpers.go
index f0d5dab..2f18199 100644
--- a/internal/bbsim/devices/onu_test_helpers.go
+++ b/internal/bbsim/devices/onu_test_helpers.go
@@ -137,15 +137,16 @@
// this method creates a fake ONU used in the tests
func createMockOnu(id uint32, ponPortId uint32) *Onu {
o := Onu{
- ID: id,
- PonPortID: ponPortId,
- PortNo: 0,
- GemPortAdded: true,
+ ID: id,
+ PonPortID: ponPortId,
+ PortNo: 0,
PonPort: &PonPort{
- Olt: &OltDevice{},
+ AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+ AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
+ Olt: &OltDevice{},
},
}
- o.SerialNumber = o.NewSN(0, ponPortId, o.ID)
+ o.SerialNumber = NewSN(0, ponPortId, o.ID)
o.Channel = make(chan types.Message, 10)
return &o
}
@@ -155,11 +156,10 @@
olt := OltDevice{
ID: 0,
}
- pon := PonPort{
- ID: 1,
- Olt: &olt,
- }
- onu := CreateONU(&olt, &pon, 1, time.Duration(1*time.Millisecond), true)
+
+ pon := CreatePonPort(&olt, 1)
+
+ onu := CreateONU(&olt, pon, 1, time.Duration(1*time.Millisecond), true)
// NOTE we need this in order to create the OnuChannel
_ = onu.InternalState.Event(OnuTxInitialize)
onu.DiscoveryRetryDelay = 100 * time.Millisecond
diff --git a/internal/bbsim/devices/pon.go b/internal/bbsim/devices/pon.go
index 1e0483d..3d74eb1 100644
--- a/internal/bbsim/devices/pon.go
+++ b/internal/bbsim/devices/pon.go
@@ -19,6 +19,7 @@
import (
"bytes"
"fmt"
+ "sync"
"github.com/looplab/fsm"
"github.com/opencord/voltha-protos/v4/go/openolt"
@@ -37,17 +38,30 @@
// PON Attributes
OperState *fsm.FSM
Type string
+
+ // Allocated resources
+ // Some resources (eg: OnuId, AllocId and GemPorts) have to be unique per PON port
+ // we are keeping a list so that we can throw an error in cases we receive duplicates
+ AllocatedGemPorts map[uint16]*openolt.SerialNumber
+ allocatedGemPortsLock sync.RWMutex
+ AllocatedOnuIds map[uint32]*openolt.SerialNumber
+ allocatedOnuIdsLock sync.RWMutex
+ AllocatedAllocIds map[uint16]*openolt.SerialNumber
+ allocatedAllocIdsLock sync.RWMutex
}
// CreatePonPort creates pon port object
func CreatePonPort(olt *OltDevice, id uint32) *PonPort {
ponPort := PonPort{
- NumOnu: olt.NumOnuPerPon,
- ID: id,
- Type: "pon",
- Olt: olt,
- Onus: []*Onu{},
+ NumOnu: olt.NumOnuPerPon,
+ ID: id,
+ Type: "pon",
+ Olt: olt,
+ Onus: []*Onu{},
+ AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+ AllocatedOnuIds: make(map[uint32]*openolt.SerialNumber),
+ AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
}
ponPort.InternalState = fsm.NewFSM(
@@ -170,7 +184,7 @@
return &ponPort
}
-func (p PonPort) GetOnuBySn(sn *openolt.SerialNumber) (*Onu, error) {
+func (p *PonPort) GetOnuBySn(sn *openolt.SerialNumber) (*Onu, error) {
for _, onu := range p.Onus {
if bytes.Equal(onu.SerialNumber.VendorSpecific, sn.VendorSpecific) {
return onu, nil
@@ -179,7 +193,7 @@
return nil, fmt.Errorf("Cannot find Onu with serial number %d in PonPort %d", sn, p.ID)
}
-func (p PonPort) GetOnuById(id uint32) (*Onu, error) {
+func (p *PonPort) GetOnuById(id uint32) (*Onu, error) {
for _, onu := range p.Onus {
if onu.ID == id {
return onu, nil
@@ -189,7 +203,7 @@
}
// GetNumOfActiveOnus returns number of active ONUs for PON port
-func (p PonPort) GetNumOfActiveOnus() uint32 {
+func (p *PonPort) GetNumOfActiveOnus() uint32 {
var count uint32 = 0
for _, onu := range p.Onus {
if onu.InternalState.Current() == OnuStateInitialized || onu.InternalState.Current() == OnuStateCreated || onu.InternalState.Current() == OnuStateDisabled {
@@ -199,3 +213,111 @@
}
return count
}
+
+// storeOnuId adds the Id to the ONU Ids already allocated to this PON port
+func (p *PonPort) storeOnuId(onuId uint32, onuSn *openolt.SerialNumber) {
+ p.allocatedOnuIdsLock.Lock()
+ defer p.allocatedOnuIdsLock.Unlock()
+ p.AllocatedOnuIds[onuId] = onuSn
+}
+
+// removeOnuId removes the OnuId from the allocated resources
+func (p *PonPort) removeOnuId(onuId uint32) {
+ p.allocatedOnuIdsLock.Lock()
+ defer p.allocatedOnuIdsLock.Unlock()
+ delete(p.AllocatedOnuIds, onuId)
+}
+
+func (p *PonPort) removeAllOnuIds() {
+ p.allocatedOnuIdsLock.Lock()
+ defer p.allocatedOnuIdsLock.Unlock()
+ p.AllocatedOnuIds = make(map[uint32]*openolt.SerialNumber)
+}
+
+// isOnuIdAllocated returns whether this OnuId is already in use on this PON
+func (p *PonPort) isOnuIdAllocated(onuId uint32) (bool, *openolt.SerialNumber) {
+ p.allocatedOnuIdsLock.RLock()
+ defer p.allocatedOnuIdsLock.RUnlock()
+
+ if _, ok := p.AllocatedOnuIds[onuId]; ok {
+ return true, p.AllocatedOnuIds[onuId]
+ }
+ return false, nil
+}
+
+// storeGemPort adds the gemPortId to the gemports already allocated to this PON port
+func (p *PonPort) storeGemPort(gemPortId uint16, onuSn *openolt.SerialNumber) {
+ p.allocatedGemPortsLock.Lock()
+ defer p.allocatedGemPortsLock.Unlock()
+ p.AllocatedGemPorts[gemPortId] = onuSn
+}
+
+// removeGemPort removes the gemPortId from the allocated resources
+func (p *PonPort) removeGemPort(gemPortId uint16) {
+ p.allocatedGemPortsLock.Lock()
+ defer p.allocatedGemPortsLock.Unlock()
+ delete(p.AllocatedGemPorts, gemPortId)
+}
+
+func (p *PonPort) removeGemPortBySn(onuSn *openolt.SerialNumber) {
+ p.allocatedGemPortsLock.Lock()
+ defer p.allocatedGemPortsLock.Unlock()
+ for gemPort, sn := range p.AllocatedGemPorts {
+ if sn == onuSn {
+ delete(p.AllocatedGemPorts, gemPort)
+ }
+ }
+}
+
+func (p *PonPort) removeAllGemPorts() {
+ p.allocatedGemPortsLock.Lock()
+ defer p.allocatedGemPortsLock.Unlock()
+ p.AllocatedGemPorts = make(map[uint16]*openolt.SerialNumber)
+}
+
+// isGemPortAllocated returns whether this gemPort is already in use on this PON
+func (p *PonPort) isGemPortAllocated(gemPortId uint16) (bool, *openolt.SerialNumber) {
+ p.allocatedGemPortsLock.RLock()
+ defer p.allocatedGemPortsLock.RUnlock()
+
+ if _, ok := p.AllocatedGemPorts[gemPortId]; ok {
+ return true, p.AllocatedGemPorts[gemPortId]
+ }
+ return false, nil
+}
+
+// storeAllocId adds the Id to the ONU Ids already allocated to this PON port
+func (p *PonPort) storeAllocId(allocId uint16, onuSn *openolt.SerialNumber) {
+ p.allocatedAllocIdsLock.Lock()
+ defer p.allocatedAllocIdsLock.Unlock()
+ p.AllocatedAllocIds[allocId] = onuSn
+}
+
+// removeAllocId removes the AllocId from the allocated resources
+// this is done via SN as the AllocId is not remove but set to a default value
+func (p *PonPort) removeAllocId(onuSn *openolt.SerialNumber) {
+ p.allocatedAllocIdsLock.Lock()
+ defer p.allocatedAllocIdsLock.Unlock()
+ for allocId, sn := range p.AllocatedAllocIds {
+ if sn == onuSn {
+ delete(p.AllocatedAllocIds, allocId)
+ }
+ }
+}
+
+func (p *PonPort) removeAllAllocIds() {
+ p.allocatedAllocIdsLock.Lock()
+ defer p.allocatedAllocIdsLock.Unlock()
+ p.AllocatedAllocIds = make(map[uint16]*openolt.SerialNumber)
+}
+
+// isAllocIdAllocated returns whether this AllocId is already in use on this PON
+func (p *PonPort) isAllocIdAllocated(allocId uint16) (bool, *openolt.SerialNumber) {
+ p.allocatedAllocIdsLock.RLock()
+ defer p.allocatedAllocIdsLock.RUnlock()
+
+ if _, ok := p.AllocatedAllocIds[allocId]; ok {
+ return true, p.AllocatedAllocIds[allocId]
+ }
+ return false, nil
+}
diff --git a/internal/bbsim/devices/pon_test.go b/internal/bbsim/devices/pon_test.go
new file mode 100644
index 0000000..45e1861
--- /dev/null
+++ b/internal/bbsim/devices/pon_test.go
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package devices
+
+import (
+ "github.com/opencord/voltha-protos/v4/go/openolt"
+ "github.com/stretchr/testify/assert"
+ "sync"
+ "testing"
+)
+
+var sn1 = NewSN(0, 0, 1)
+var sn2 = NewSN(0, 0, 2)
+var sn3 = NewSN(0, 0, 3)
+
+// NOTE that we are using a benchmark test to actually test concurrency
+func Benchmark_storeGemPort(b *testing.B) {
+ pon := PonPort{
+ AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+ }
+
+ wg := sync.WaitGroup{}
+ wg.Add(3)
+
+ // concurrently add multiple ports
+ go func(wg *sync.WaitGroup) { pon.storeGemPort(1, sn1); wg.Done() }(&wg)
+ go func(wg *sync.WaitGroup) { pon.storeGemPort(2, sn2); wg.Done() }(&wg)
+ go func(wg *sync.WaitGroup) { pon.storeGemPort(3, sn3); wg.Done() }(&wg)
+
+ wg.Wait()
+
+ assert.Equal(b, len(pon.AllocatedGemPorts), 3)
+}
+
+func Benchmark_removeGemPort(b *testing.B) {
+ pon := PonPort{
+ AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+ }
+
+ pon.storeGemPort(1, sn1)
+ pon.storeGemPort(2, sn2)
+ pon.storeGemPort(3, sn3)
+
+ assert.Equal(b, len(pon.AllocatedGemPorts), 3)
+
+ wg := sync.WaitGroup{}
+ wg.Add(3)
+
+ // concurrently add multiple ports
+ go func(wg *sync.WaitGroup) { pon.removeGemPort(1); wg.Done() }(&wg)
+ go func(wg *sync.WaitGroup) { pon.removeGemPort(2); wg.Done() }(&wg)
+ go func(wg *sync.WaitGroup) { pon.removeGemPort(3); wg.Done() }(&wg)
+
+ wg.Wait()
+
+ assert.Equal(b, len(pon.AllocatedGemPorts), 0)
+}
+
+func Test_removeGemPort(t *testing.T) {
+ pon := &PonPort{
+ AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+ }
+
+ pon.storeGemPort(1, sn1)
+ pon.storeGemPort(2, sn2)
+ assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+ // remove a non exiting gemPort
+ pon.removeGemPort(3)
+ assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+ // remove an existing gemPort
+ pon.removeGemPort(1)
+ assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+
+}
+
+func Test_removeGemPortBySn(t *testing.T) {
+ pon := &PonPort{
+ AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+ }
+
+ pon.storeGemPort(1, sn1)
+ pon.storeGemPort(2, sn2)
+ assert.Equal(t, len(pon.AllocatedGemPorts), 2)
+
+ // remove a non exiting gemPort
+ pon.removeGemPortBySn(sn1)
+ assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+ assert.Nil(t, pon.AllocatedGemPorts[1])
+ assert.Equal(t, pon.AllocatedGemPorts[2], sn2)
+}
+
+func Test_isGemPortAllocated(t *testing.T) {
+ pon := &PonPort{
+ AllocatedGemPorts: make(map[uint16]*openolt.SerialNumber),
+ }
+
+ pon.storeGemPort(1, sn1)
+
+ assert.Equal(t, len(pon.AllocatedGemPorts), 1)
+
+ free, sn := pon.isGemPortAllocated(1)
+
+ assert.Equal(t, free, true)
+ assert.Equal(t, sn, sn1)
+
+ used, sn_ := pon.isGemPortAllocated(2)
+
+ assert.Equal(t, used, false)
+ assert.Nil(t, sn_)
+}
+
+// the allocId is never removed, is always set to either 255 or 65535
+func Test_removeAllocId(t *testing.T) {
+
+ const (
+ allocId1 = 1024
+ allocId2 = 1025
+ )
+
+ pon := &PonPort{
+ AllocatedAllocIds: make(map[uint16]*openolt.SerialNumber),
+ }
+
+ pon.AllocatedAllocIds[allocId1] = sn1
+ pon.AllocatedAllocIds[allocId2] = sn2
+
+ assert.Equal(t, len(pon.AllocatedAllocIds), 2)
+
+ pon.removeAllocId(sn1)
+
+ assert.Equal(t, len(pon.AllocatedAllocIds), 1)
+ assert.Nil(t, pon.AllocatedAllocIds[allocId1])
+ assert.Equal(t, pon.AllocatedAllocIds[allocId2], sn2)
+
+}