VOL-3347 GEM ports kept for packet-outs should be in pon-onu-uni-vlan-pbit basis instead of per pon-onu-uni basis.
Change-Id: I7e9ca29295d28d97908a99ba8c34c4c9b52046c4
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 9a00608..aa6e59f 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1585,7 +1585,7 @@
logger.Debugw(ctx, "failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.DelGemPortPktIn(ctx, onu.IntfID, onu.OnuID, uint32(port)); err != nil {
+ if err = dh.resourceMgr.DelGemPortPktInOfAllServices(ctx, onu.IntfID, onu.OnuID, uint32(port)); err != nil {
logger.Debugw(ctx, "failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
}
}
@@ -1796,7 +1796,7 @@
onuID := OnuIDFromPortNum(uint32(egressPortNo))
uniID := UniIDFromPortNum(uint32(egressPortNo))
- gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo))
+ gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
if err != nil {
// In this case the openolt agent will receive the gemPortID as 0.
// The agent tries to retrieve the gemPortID in this case.
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 520ee7a..f2fec23 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -3045,7 +3045,7 @@
logicalPortNum = MkUniPortNum(ctx, packetIn.IntfId, onuID, uniID)
}
// Store the gem port through which the packet_in came. Use the same gem port for packet_out
- f.UpdateGemPortForPktIn(ctx, packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId)
+ f.UpdateGemPortForPktIn(ctx, packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId, packetIn.Pkt)
} else if packetIn.IntfType == "nni" {
logicalPortNum = IntfIDToPortNo(packetIn.IntfId, voltha.Port_ETHERNET_NNI)
}
@@ -3059,13 +3059,17 @@
}
//GetPacketOutGemPortID returns gemPortId
-func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) (uint32, error) {
+func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32, packet []byte) (uint32, error) {
var gemPortID uint32
- var err error
+
+ ctag, priority, err := getCTagFromPacket(ctx, packet)
+ if err != nil {
+ return 0, err
+ }
f.onuGemInfoLock.RLock()
defer f.onuGemInfoLock.RUnlock()
- pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum}
+ pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum, VlanID: ctag, Priority: priority}
var ok bool
gemPortID, ok = f.packetInGemPort[pktInkey]
if ok {
@@ -3077,7 +3081,7 @@
return gemPortID, nil
}
//If gem is not found in cache try to get it from kv store, if found in kv store, update the cache and return.
- gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, intfID, onuID, portNum)
+ gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, pktInkey)
if err == nil {
if gemPortID != 0 {
f.packetInGemPort[pktInkey] = gemPortID
@@ -3818,12 +3822,18 @@
}
// UpdateGemPortForPktIn updates gemport for packet-in in to the cache and to the kv store as well.
-func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
+func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32, pkt []byte) {
+ cTag, priority, err := getCTagFromPacket(ctx, pkt)
+ if err != nil {
+ logger.Errorw(ctx, "unable-to-update-gem-port-for-packet-in",
+ log.Fields{"intfID": intfID, "onuID": onuID, "logicalPort": logicalPort, "gemPort": gemPort, "err": err})
+ return
+ }
+ pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort, VlanID: cTag, Priority: priority}
f.onuGemInfoLock.Lock()
defer f.onuGemInfoLock.Unlock()
- pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
lookupGemPort, ok := f.packetInGemPort[pktInkey]
if ok {
if lookupGemPort == gemPort {
@@ -3845,6 +3855,33 @@
}
+//getCTagFromPacket retrieves and returns c-tag and priority value from a packet.
+func getCTagFromPacket(ctx context.Context, packet []byte) (uint16, uint8, error) {
+ if packet == nil || len(packet) < 18 {
+ log.Error("unable-get-c-tag-from-the-packet--invalid-packet-length ")
+ return 0, 0, errors.New("invalid packet length")
+ }
+ outerEthType := (uint16(packet[12]) << 8) | uint16(packet[13])
+ innerEthType := (uint16(packet[16]) << 8) | uint16(packet[17])
+
+ var index int8
+ if outerEthType == 0x8100 {
+ if innerEthType == 0x8100 {
+ // q-in-q 802.1ad or 802.1q double tagged packet.
+ // get the inner vlanId
+ index = 18
+ } else {
+ index = 14
+ }
+ priority := (packet[index] >> 5) & 0x7
+ //13 bits composes vlanId value
+ vlan := ((uint16(packet[index]) << 8) & 0x0fff) | uint16(packet[index+1])
+ return vlan, priority, nil
+ }
+ logger.Debugf(ctx, "No vlanId found in the packet. Returning zero as c-tag")
+ return 0, 0, nil
+}
+
// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 15e0244..c13bd5e 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -19,6 +19,7 @@
import (
"context"
+ "encoding/hex"
"fmt"
"reflect"
"strconv"
@@ -773,10 +774,33 @@
func TestOpenOltFlowMgr_GetPacketOutGemPortID(t *testing.T) {
// flwMgr := newMockFlowmgr()
+ //untagged packet in hex string
+ untaggedStr := "01005e000002000000000001080046c00020000040000102fa140a000001e00000029404000017000705e10000fa"
+ untagged, err := hex.DecodeString(untaggedStr)
+ if err != nil {
+ t.Error("Unable to parse hex string", err)
+ panic(err)
+ }
+ //single-tagged packet in hex string. vlanID.pbit: 540.0
+ singleTaggedStr := "01005e0000010025ba48172481000225080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
+ singleTagged, err := hex.DecodeString(singleTaggedStr)
+ if err != nil {
+ t.Error("Unable to parse hex string", err)
+ panic(err)
+ }
+ //double-tagged packet in hex string. vlanID.pbit: 210.0-48.7
+ doubleTaggedStr := "01005e000016deadbeefba11810002108100e030080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
+ doubleTagged, err := hex.DecodeString(doubleTaggedStr)
+ if err != nil {
+ t.Error("Unable to parse hex string", err)
+ panic(err)
+ }
+
type args struct {
intfID uint32
onuID uint32
portNum uint32
+ packet []byte
}
tests := []struct {
name string
@@ -785,24 +809,34 @@
wantErr bool
}{
// TODO: Add test cases.
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 1}, 1, false},
- {"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 2}, 2, false},
- {"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2}, 0, true},
+ {"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: untagged}, 3, false},
+ {"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 4, packet: singleTagged}, 4, false},
+ {"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: doubleTagged}, 2, false},
+ {"GetPacketOutGemPortID", args{intfID: 1, onuID: 10, portNum: 10, packet: untagged}, 2, true},
+ {"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
}
+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum)
- if (err != nil) != tt.wantErr {
- t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() error = %v, wantErr %v", err, tt.wantErr)
- return
+ got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
+ if tt.wantErr {
+ if err == nil {
+ //error expected but got value
+ t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() = %v, wantErr %v", got, tt.wantErr)
+ }
+ } else {
+ if err != nil {
+ //error is not expected but got error
+ t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if got != tt.want {
+ t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() = %v, want %v", got, tt.want)
+ }
}
- if got != tt.want {
- t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() = %v, want %v", got, tt.want)
- }
-
})
}
}
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 42c3d21..58414f0 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -46,9 +46,11 @@
MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
//NnniIntfID - nniintfids
NnniIntfID = "nniintfids"
+ //OnuPacketINPathPrefix to be used as prefix for keys to be used to store packet-in gem ports
+ OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d"
// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, pcketout
- //format: onu_packetin/<intfid>,<onuid>,<logicalport>
- OnuPacketINPath = "onu_packetin/{%d,%d,%d}"
+ //format: onu_packetin/<intfid>,<onuid>,<logicalport>,<vlanId>,<priority>
+ OnuPacketINPath = OnuPacketINPathPrefix + ",%d,%d}"
//FlowIDsForGem flowids_per_gem/<intfid>
FlowIDsForGem = "flowids_per_gem/{%d}"
//McastQueuesForIntf multicast queues for pon interfaces
@@ -89,6 +91,8 @@
IntfID uint32
OnuID uint32
LogicalPort uint32
+ VlanID uint16
+ Priority uint8
}
// GroupInfo holds group information
@@ -1161,10 +1165,10 @@
return
}
-//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno
+//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno, vlan id, priority bit
func (RsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
- path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort)
+ path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
Value, err := json.Marshal(gemPort)
if err != nil {
logger.Error(ctx, "Failed to marshal data")
@@ -1179,13 +1183,14 @@
return
}
-// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno
-func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) (uint32, error) {
+// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno, vlan id, priority bit
+func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, packetInInfoKey PacketInInfoKey) (uint32, error) {
var Val []byte
var gemPort uint32
- path := fmt.Sprintf(OnuPacketINPath, intfID, onuID, logicalPort)
+ path := fmt.Sprintf(OnuPacketINPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
+ packetInInfoKey.VlanID, packetInInfoKey.Priority)
value, err := RsrcMgr.KVStore.Get(ctx, path)
if err != nil {
@@ -1209,13 +1214,25 @@
return gemPort, nil
}
-// DelGemPortPktIn deletes the gemport from the pkt in path
-func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
+//DelGemPortPktInOfAllServices deletes the gemports from pkt in path for all services
+func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktInOfAllServices(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
- path := fmt.Sprintf(OnuPacketINPath, intfID, onuID, logicalPort)
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorf(ctx, "Falied to remove resource %s", path)
- return err
+ //retrieve stored gem port from the store first.
+ Path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
+ logger.Debugf(ctx, "getting flows from the path:%s", Path)
+ Value, err := RsrcMgr.KVStore.List(ctx, Path)
+ if err != nil {
+ logger.Errorf(ctx, "failed to get flows from kvstore for path %s", Path)
+ return errors.New("failed to get flows from kvstore for path " + Path)
+ }
+ logger.Debugf(ctx, "%d flows retrieved from the path:%s", len(Value), Path)
+
+ //remove them one by one
+ for key := range Value {
+ if err := RsrcMgr.KVStore.Delete(ctx, key); err != nil {
+ logger.Errorf(ctx, "Falied to remove resource %s", key)
+ return err
+ }
}
return nil
}
diff --git a/pkg/mocks/mockKVClient.go b/pkg/mocks/mockKVClient.go
index fe6310b..78cbef6 100644
--- a/pkg/mocks/mockKVClient.go
+++ b/pkg/mocks/mockKVClient.go
@@ -52,6 +52,8 @@
FlowGroup = "flow_groups"
//FlowGroupCached flow_groups_cached/<flow_group_id>
FlowGroupCached = "flow_groups_cached"
+ //OnuPacketIn to extract gem port from packet-in
+ OnuPacketIn = "onu_packetin"
)
// MockKVClient mocks the AdapterProxy interface.
@@ -178,6 +180,10 @@
return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
}
+ if strings.Contains(key, OnuPacketIn) {
+ return getPacketInGemPort(key)
+ }
+
maps := make(map[string]*kvstore.KVPair)
maps[key] = &kvstore.KVPair{Key: key}
return maps[key], nil
@@ -185,6 +191,44 @@
return nil, errors.New("key didn't find")
}
+//getPacketInGemPort returns the GEM port associated with the given key
+func getPacketInGemPort(key string) (*kvstore.KVPair, error) {
+ //parse interface, onu, uni, vlan, priority values
+ arr := getParamsFromPacketInKey(key)
+
+ if len(arr) < 5 {
+ return nil, errors.New("key didn't find")
+ }
+ if arr[0] == "1" && arr[1] == "1" && arr[2] == "3" && arr[3] == "0" && arr[4] == "0" {
+ str, _ := json.Marshal(3)
+ return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+ }
+ if arr[0] == "2" && arr[1] == "2" && arr[2] == "4" && arr[3] == "549" && arr[4] == "0" {
+ str, _ := json.Marshal(4)
+ return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+ }
+ if arr[0] == "1" && arr[1] == "2" && arr[2] == "2" && arr[3] == "48" && arr[4] == "7" {
+ str, _ := json.Marshal(2)
+ return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+ }
+ return nil, errors.New("key didn't find")
+}
+
+//getParamsFromPacketInKey parse packetIn key that is in the format of "onu_packetin/{1,1,1,1,2}"
+func getParamsFromPacketInKey(key string) []string {
+ //return intfID, onuID, uniID, vlanID, priority
+ firstIndex := strings.Index(key, "{")
+ lastIndex := strings.Index(key, "}")
+ if firstIndex == -1 && lastIndex == -1 {
+ return []string{}
+ }
+ arr := strings.Split(key[firstIndex+1:lastIndex], ",")
+ if len(arr) < 5 {
+ return []string{}
+ }
+ return arr
+}
+
// Put mock function implementation for KVClient
func (kvclient *MockKVClient) Put(ctx context.Context, key string, value interface{}) error {
if key != "" {