VOL-3718 Packets coming from Open OLT Adapter to the VOLTHA Core always have zero in their UNI port field. OLT Adapter should set the true UNI port info to the control packets before passing them to the VOLTHA Core in the scope of packet-in workflow.
As a solution, pon-and-gem-port-to-uni-port relations are learned during subscriber provisioning operation and kept in the kv store and a cache (a map named gemToUniMap) in a lazy manner. In the packet-in workflow, UNI port info is resolved by looking up the gemToUniMap and put into the packet before passing it to the upper layer.
Change-Id: I6b60f7c1964452b6789f6a9905ab04000fe2d33c
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 44c6788..d9ffa7a 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -148,6 +148,11 @@
pbit1 = '1'
)
+type gemPortKey struct {
+ intfID uint32
+ gemPort uint32
+}
+
type schedQueue struct {
direction tp_pb.Direction
intfID uint32
@@ -210,6 +215,11 @@
// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
incomingFlows []chan flowControlBlock
+
+ //this map keeps uni port info by gem and pon port. This relation shall be used for packet-out operations
+ gemToUniMap map[gemPortKey][]uint32
+ //We need to have a global lock on the gemToUniLock map
+ gemToUniLock sync.RWMutex
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -253,12 +263,38 @@
//Load flowID list per gem map per interface from the kvstore.
flowMgr.loadFlowIDlistForGem(ctx, idx)
//load interface to multicast queue map from kv store
+
+ flowMgr.gemToUniMap = make(map[gemPortKey][]uint32)
+ flowMgr.gemToUniLock = sync.RWMutex{}
+
flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
flowMgr.reconcileSubscriberDataPathFlowIDMap(ctx)
logger.Info(ctx, "initialization-of-flow-manager-success")
return &flowMgr
}
+// toGemToUniMap adds uni info consisting of onu and uni ID to the map and associates it with a gem port
+func (f *OpenOltFlowMgr) toGemToUniMap(ctx context.Context, gemPK gemPortKey, onuID uint32, uniID uint32) {
+ f.gemToUniLock.Lock()
+ f.gemToUniMap[gemPK] = []uint32{onuID, uniID}
+ f.gemToUniLock.Unlock()
+}
+
+// fromGemToUniMap returns onu and uni ID associated with the given key
+func (f *OpenOltFlowMgr) fromGemToUniMap(key gemPortKey) ([]uint32, bool) {
+ f.gemToUniLock.RLock()
+ defer f.gemToUniLock.RUnlock()
+ val, ok := f.gemToUniMap[key]
+ return val, ok
+}
+
+// removeFromGemToUniMap removes an entry associated with the given key from gemToUniMap
+func (f *OpenOltFlowMgr) removeFromGemToUniMap(key gemPortKey) {
+ f.gemToUniLock.Lock()
+ defer f.gemToUniLock.Unlock()
+ delete(f.gemToUniMap, key)
+}
+
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
// Flow is not replicated in this case, we need to register the flow for a single gem-port
@@ -841,6 +877,9 @@
}
if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, gemPortIDs, intfID, onuID, uniID); err != nil {
logger.Error(ctx, "error-while-uploading-gemtopon-map-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
+ } else {
+ //add to gem to uni cache
+ f.addGemPortUniAssociationsToCache(ctx, intfID, onuID, uniID, gemPortIDs)
}
logger.Infow(ctx, "stored-tconts-and-gem-into-kv-store-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
for _, gemPort := range gemPortIDs {
@@ -848,6 +887,19 @@
}
}
+//addGemPortUniAssociationsToCache
+func (f *OpenOltFlowMgr) addGemPortUniAssociationsToCache(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortIDs []uint32) {
+ for _, gemPortID := range gemPortIDs {
+ key := gemPortKey{
+ intfID: intfID,
+ gemPort: gemPortID,
+ }
+ f.toGemToUniMap(ctx, key, onuID, uniID)
+ }
+ logger.Debugw(ctx, "gem-to-uni-info-added-to-cache", log.Fields{"device-id": f.deviceHandler.device.Id, "intfID": intfID,
+ "gemPortIDs": gemPortIDs, "onuID": onuID, "uniID": uniID})
+}
+
func (f *OpenOltFlowMgr) populateTechProfilePerPonPort(ctx context.Context) error {
var tpCount int
for _, techRange := range f.resourceMgr.DevInfo.Ranges {
@@ -1849,7 +1901,13 @@
// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
// But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
+ // also clear gem to uni cache
+ f.removeFromGemToUniMap(gemPortKey{
+ intfID: Intf,
+ gemPort: uint32(gemPortID),
+ })
f.deleteGemPortFromLocalCache(ctx, Intf, uint32(onuID), uint32(gemPortID))
+
f.onuIdsLock.Lock() // TODO: What is this lock?
//everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
@@ -2430,55 +2488,24 @@
"onu-gem": f.onuGemInfo})
}
-//getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
-func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, error) {
-
- f.onuGemInfoLock.RLock()
- defer f.onuGemInfoLock.RUnlock()
-
- logger.Debugw(ctx, "getting-onu-id-from-gem-port-and-pon-port",
- log.Fields{
- "device-id": f.deviceHandler.device.Id,
- "onu-geminfo": f.onuGemInfo,
- "intf-id": intfID,
- "gemport-id": gemPortID})
- // get onuid from the onugem info cache
- onugem := f.onuGemInfo
-
- for _, onu := range onugem {
- for _, gem := range onu.GemPorts {
- if gem == gemPortID {
- return onu.OnuID, nil
- }
- }
- }
- logger.Errorw(ctx, "onu-id-from-gem-port-not-found", log.Fields{
- "gem-port-id": gemPortID,
- "interface-id": intfID,
- "all-gems-on-port": onugem,
- })
- return uint32(0), olterrors.NewErrNotFound("onu-id", log.Fields{
- "interface-id": intfID,
- "gem-port-id": gemPortID},
- nil)
-}
-
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
var logicalPortNum uint32
- var onuID uint32
+ var onuID, uniID uint32
var err error
if packetIn.IntfType == "pon" {
// packet indication does not have serial number , so sending as nil
- if onuID, err = f.getOnuIDfromGemPortMap(ctx, packetIn.IntfId, packetIn.GemportId); err != nil {
+ // get onu and uni ids associated with the given pon and gem ports
+ if onuID, uniID, err = f.GetUniPortByPonPortGemPort(ctx, packetIn.IntfId, packetIn.GemportId); err != nil {
// Called method is returning error with all data populated; just return the same
return logicalPortNum, err
}
+ logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d")
+
if packetIn.PortNo != 0 {
logicalPortNum = packetIn.PortNo
} else {
- uniID := uint32(0) // FIXME - multi-uni support
logicalPortNum = MkUniPortNum(ctx, packetIn.IntfId, onuID, uniID)
}
// Store the gem port through which the packet_in came. Use the same gem port for packet_out
@@ -2498,6 +2525,40 @@
return logicalPortNum, nil
}
+//GetUniPortByPonPortGemPort return onu and uni IDs associated with given pon and gem ports
+func (f *OpenOltFlowMgr) GetUniPortByPonPortGemPort(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, uint32, error) {
+ key := gemPortKey{
+ intfID: intfID,
+ gemPort: gemPortID,
+ }
+ uniPortInfo, ok := f.fromGemToUniMap(key) //try to get from the cache first
+ if ok {
+ if len(uniPortInfo) > 1 {
+ //return onu ID and uni port from the cache
+ logger.Debugw(ctx, "found-uni-port-by-pon-and-gem-ports",
+ log.Fields{
+ "intfID": intfID,
+ "gemPortID": gemPortID,
+ "onuID, uniID": uniPortInfo})
+ return uniPortInfo[0], uniPortInfo[1], nil
+ }
+ }
+ //If uni port is not found in cache try to get it from kv store. if it is found in kv store, update the cache and return.
+ onuID, uniID, err := f.resourceMgr.GetUniPortByPonPortGemPortFromKVStore(ctx, intfID, gemPortID)
+ if err == nil {
+ f.toGemToUniMap(ctx, key, onuID, uniID)
+ logger.Infow(ctx, "found-uni-port-by-pon-and-gem-port-from-kv-store-and-updating-cache-with-uni-port",
+ log.Fields{
+ "gemPortKey": key,
+ "onuID": onuID,
+ "uniID": uniID})
+ return onuID, uniID, nil
+ }
+ return uint32(0), uint32(0), olterrors.NewErrNotFound("uni-id",
+ log.Fields{"interfaceID": intfID, "gemPortID": gemPortID},
+ errors.New("no uni port found"))
+}
+
//GetPacketOutGemPortID returns gemPortId
func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32, packet []byte) (uint32, error) {
var gemPortID uint32
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index a7c671d..b3f806d 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -722,11 +722,11 @@
wantErr bool
}{
// TODO: Add test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 255, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
// Negative Test cases.
{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 257, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 612ee8c..5f88472 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -730,6 +730,30 @@
}
}
+//GetUniPortByPonPortGemPortFromKVStore retrieves onu and uni ID associated with the pon and gem ports.
+func (RsrcMgr *OpenOltResourceMgr) GetUniPortByPonPortGemPortFromKVStore(ctx context.Context, PonPort uint32, GemPort uint32) (uint32, uint32, error) {
+ IntfGEMPortPath := fmt.Sprintf("%d,%d", PonPort, GemPort)
+ logger.Debugf(ctx, "Getting ONU and UNI IDs from the path %s", IntfGEMPortPath)
+ var Data []uint32
+ Value, err := RsrcMgr.KVStore.Get(ctx, IntfGEMPortPath)
+ if err == nil {
+ if Value != nil {
+ Val, _ := ponrmgr.ToByte(Value.Value)
+ if err = json.Unmarshal(Val, &Data); err != nil {
+ logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err})
+ return 0, 0, errors.New("failed to unmarshal the data retrieved")
+ }
+ }
+ } else {
+ logger.Errorf(ctx, "Failed to get data from kvstore for %s", IntfGEMPortPath, err)
+ return 0, 0, errors.New("could not get data")
+ }
+ if len(Data) < 2 {
+ return 0, 0, errors.New("invalid data format")
+ }
+ return Data[0], Data[1], nil
+}
+
// UpdateGEMportsPonportToOnuMapOnKVStore updates onu and uni id associated with the gem port to the kv store
// This stored information is used when packet_indication is received and we need to derive the ONU Id for which
// the packet arrived based on the pon_intf and gemport available in the packet_indication
@@ -738,7 +762,7 @@
/* Update onu and uni id associated with the gem port to the kv store. */
var IntfGEMPortPath string
- Data := fmt.Sprintf("%d %d", onuID, uniID)
+ Data := []uint32{onuID, uniID}
for _, GEM := range gemPorts {
IntfGEMPortPath = fmt.Sprintf("%d,%d", PonPort, GEM)
Val, err := json.Marshal(Data)
diff --git a/pkg/mocks/mockKVClient.go b/pkg/mocks/mockKVClient.go
index 662e236..290f191 100644
--- a/pkg/mocks/mockKVClient.go
+++ b/pkg/mocks/mockKVClient.go
@@ -183,6 +183,20 @@
str, _ := json.Marshal(data)
return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
}
+ //Interface, GEM port path
+ if strings.Contains(key, "0,255") {
+ //return onuID, uniID associated with the given interface and GEM port
+ data := []uint32{1, 0}
+ str, _ := json.Marshal(data)
+ return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+ }
+ //Interface, GEM port path
+ if strings.Contains(key, "0,257") {
+ //return onuID, uniID associated with the given interface and GEM port
+ data := []uint32{1, 0}
+ str, _ := json.Marshal(data)
+ return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+ }
maps := make(map[string]*kvstore.KVPair)
maps[key] = &kvstore.KVPair{Key: key}