VOL-3347 GEM ports kept for packet-outs should be in  pon-onu-uni-vlan-pbit  basis instead of per pon-onu-uni basis.

Change-Id: I7e9ca29295d28d97908a99ba8c34c4c9b52046c4
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 9a00608..aa6e59f 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1585,7 +1585,7 @@
 			logger.Debugw(ctx, "failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
 		}
 		logger.Debugw(ctx, "removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
-		if err = dh.resourceMgr.DelGemPortPktIn(ctx, onu.IntfID, onu.OnuID, uint32(port)); err != nil {
+		if err = dh.resourceMgr.DelGemPortPktInOfAllServices(ctx, onu.IntfID, onu.OnuID, uint32(port)); err != nil {
 			logger.Debugw(ctx, "failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
 		}
 	}
@@ -1796,7 +1796,7 @@
 		onuID := OnuIDFromPortNum(uint32(egressPortNo))
 		uniID := UniIDFromPortNum(uint32(egressPortNo))
 
-		gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo))
+		gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
 		if err != nil {
 			// In this case the openolt agent will receive the gemPortID as 0.
 			// The agent tries to retrieve the gemPortID in this case.
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 520ee7a..f2fec23 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -3045,7 +3045,7 @@
 			logicalPortNum = MkUniPortNum(ctx, packetIn.IntfId, onuID, uniID)
 		}
 		// Store the gem port through which the packet_in came. Use the same gem port for packet_out
-		f.UpdateGemPortForPktIn(ctx, packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId)
+		f.UpdateGemPortForPktIn(ctx, packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId, packetIn.Pkt)
 	} else if packetIn.IntfType == "nni" {
 		logicalPortNum = IntfIDToPortNo(packetIn.IntfId, voltha.Port_ETHERNET_NNI)
 	}
@@ -3059,13 +3059,17 @@
 }
 
 //GetPacketOutGemPortID returns gemPortId
-func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) (uint32, error) {
+func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32, packet []byte) (uint32, error) {
 	var gemPortID uint32
-	var err error
+
+	ctag, priority, err := getCTagFromPacket(ctx, packet)
+	if err != nil {
+		return 0, err
+	}
 
 	f.onuGemInfoLock.RLock()
 	defer f.onuGemInfoLock.RUnlock()
-	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum}
+	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum, VlanID: ctag, Priority: priority}
 	var ok bool
 	gemPortID, ok = f.packetInGemPort[pktInkey]
 	if ok {
@@ -3077,7 +3081,7 @@
 		return gemPortID, nil
 	}
 	//If gem is not found in cache try to get it from kv store, if found in kv store, update the cache and return.
-	gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, intfID, onuID, portNum)
+	gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, pktInkey)
 	if err == nil {
 		if gemPortID != 0 {
 			f.packetInGemPort[pktInkey] = gemPortID
@@ -3818,12 +3822,18 @@
 }
 
 // UpdateGemPortForPktIn updates gemport for packet-in in to the cache and to the kv store as well.
-func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
+func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32, pkt []byte) {
+	cTag, priority, err := getCTagFromPacket(ctx, pkt)
+	if err != nil {
+		logger.Errorw(ctx, "unable-to-update-gem-port-for-packet-in",
+			log.Fields{"intfID": intfID, "onuID": onuID, "logicalPort": logicalPort, "gemPort": gemPort, "err": err})
+		return
+	}
+	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort, VlanID: cTag, Priority: priority}
 
 	f.onuGemInfoLock.Lock()
 	defer f.onuGemInfoLock.Unlock()
 
-	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
 	lookupGemPort, ok := f.packetInGemPort[pktInkey]
 	if ok {
 		if lookupGemPort == gemPort {
@@ -3845,6 +3855,33 @@
 
 }
 
+//getCTagFromPacket retrieves and returns c-tag and priority value from a packet.
+func getCTagFromPacket(ctx context.Context, packet []byte) (uint16, uint8, error) {
+	if packet == nil || len(packet) < 18 {
+		log.Error("unable-get-c-tag-from-the-packet--invalid-packet-length ")
+		return 0, 0, errors.New("invalid packet length")
+	}
+	outerEthType := (uint16(packet[12]) << 8) | uint16(packet[13])
+	innerEthType := (uint16(packet[16]) << 8) | uint16(packet[17])
+
+	var index int8
+	if outerEthType == 0x8100 {
+		if innerEthType == 0x8100 {
+			// q-in-q 802.1ad or 802.1q double tagged packet.
+			// get the inner vlanId
+			index = 18
+		} else {
+			index = 14
+		}
+		priority := (packet[index] >> 5) & 0x7
+		//13 bits composes vlanId value
+		vlan := ((uint16(packet[index]) << 8) & 0x0fff) | uint16(packet[index+1])
+		return vlan, priority, nil
+	}
+	logger.Debugf(ctx, "No vlanId found in the packet. Returning zero as c-tag")
+	return 0, 0, nil
+}
+
 // AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
 func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
 
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 15e0244..c13bd5e 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -19,6 +19,7 @@
 
 import (
 	"context"
+	"encoding/hex"
 	"fmt"
 	"reflect"
 	"strconv"
@@ -773,10 +774,33 @@
 func TestOpenOltFlowMgr_GetPacketOutGemPortID(t *testing.T) {
 	// flwMgr := newMockFlowmgr()
 
+	//untagged packet in hex string
+	untaggedStr := "01005e000002000000000001080046c00020000040000102fa140a000001e00000029404000017000705e10000fa"
+	untagged, err := hex.DecodeString(untaggedStr)
+	if err != nil {
+		t.Error("Unable to parse hex string", err)
+		panic(err)
+	}
+	//single-tagged packet in hex string. vlanID.pbit: 540.0
+	singleTaggedStr := "01005e0000010025ba48172481000225080046c0002000004000010257deab140023e0000001940400001164ee9b0000000000000000000000000000"
+	singleTagged, err := hex.DecodeString(singleTaggedStr)
+	if err != nil {
+		t.Error("Unable to parse hex string", err)
+		panic(err)
+	}
+	//double-tagged packet in hex string. vlanID.pbit: 210.0-48.7
+	doubleTaggedStr := "01005e000016deadbeefba11810002108100e030080046000028000000000102c5b87f000001e0000016940400002200f8030000000104000000e10000fa"
+	doubleTagged, err := hex.DecodeString(doubleTaggedStr)
+	if err != nil {
+		t.Error("Unable to parse hex string", err)
+		panic(err)
+	}
+
 	type args struct {
 		intfID  uint32
 		onuID   uint32
 		portNum uint32
+		packet  []byte
 	}
 	tests := []struct {
 		name    string
@@ -785,24 +809,34 @@
 		wantErr bool
 	}{
 		// TODO: Add test cases.
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 1}, 1, false},
-		{"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 2}, 2, false},
-		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2}, 0, true},
+		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: untagged}, 3, false},
+		{"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 4, packet: singleTagged}, 4, false},
+		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2, packet: doubleTagged}, 2, false},
+		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 10, portNum: 10, packet: untagged}, 2, true},
+		{"GetPacketOutGemPortID", args{intfID: 1, onuID: 1, portNum: 3, packet: []byte{}}, 3, true},
 	}
+
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 
-			got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() error = %v, wantErr %v", err, tt.wantErr)
-				return
+			got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum, tt.args.packet)
+			if tt.wantErr {
+				if err == nil {
+					//error expected but got value
+					t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() = %v, wantErr %v", got, tt.wantErr)
+				}
+			} else {
+				if err != nil {
+					//error is not expected but got error
+					t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() error = %v, wantErr %v", err, tt.wantErr)
+					return
+				}
+				if got != tt.want {
+					t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() = %v, want %v", got, tt.want)
+				}
 			}
-			if got != tt.want {
-				t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() = %v, want %v", got, tt.want)
-			}
-
 		})
 	}
 }
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 42c3d21..58414f0 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -46,9 +46,11 @@
 	MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
 	//NnniIntfID - nniintfids
 	NnniIntfID = "nniintfids"
+	//OnuPacketINPathPrefix to be used as prefix for keys to be used to store packet-in gem ports
+	OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d"
 	// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, pcketout
-	//format: onu_packetin/<intfid>,<onuid>,<logicalport>
-	OnuPacketINPath = "onu_packetin/{%d,%d,%d}"
+	//format: onu_packetin/<intfid>,<onuid>,<logicalport>,<vlanId>,<priority>
+	OnuPacketINPath = OnuPacketINPathPrefix + ",%d,%d}"
 	//FlowIDsForGem flowids_per_gem/<intfid>
 	FlowIDsForGem = "flowids_per_gem/{%d}"
 	//McastQueuesForIntf multicast queues for pon interfaces
@@ -89,6 +91,8 @@
 	IntfID      uint32
 	OnuID       uint32
 	LogicalPort uint32
+	VlanID      uint16
+	Priority    uint8
 }
 
 // GroupInfo holds group information
@@ -1161,10 +1165,10 @@
 	return
 }
 
-//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno
+//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno, vlan id, priority bit
 func (RsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
 
-	path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort)
+	path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
 	Value, err := json.Marshal(gemPort)
 	if err != nil {
 		logger.Error(ctx, "Failed to marshal data")
@@ -1179,13 +1183,14 @@
 	return
 }
 
-// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno
-func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) (uint32, error) {
+// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno, vlan id, priority bit
+func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, packetInInfoKey PacketInInfoKey) (uint32, error) {
 
 	var Val []byte
 	var gemPort uint32
 
-	path := fmt.Sprintf(OnuPacketINPath, intfID, onuID, logicalPort)
+	path := fmt.Sprintf(OnuPacketINPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
+		packetInInfoKey.VlanID, packetInInfoKey.Priority)
 
 	value, err := RsrcMgr.KVStore.Get(ctx, path)
 	if err != nil {
@@ -1209,13 +1214,25 @@
 	return gemPort, nil
 }
 
-// DelGemPortPktIn deletes the gemport from the pkt in path
-func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
+//DelGemPortPktInOfAllServices deletes the gemports from  pkt in path for all services
+func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktInOfAllServices(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
 
-	path := fmt.Sprintf(OnuPacketINPath, intfID, onuID, logicalPort)
-	if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
-		logger.Errorf(ctx, "Falied to remove resource %s", path)
-		return err
+	//retrieve stored gem port from the store first.
+	Path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
+	logger.Debugf(ctx, "getting flows from the path:%s", Path)
+	Value, err := RsrcMgr.KVStore.List(ctx, Path)
+	if err != nil {
+		logger.Errorf(ctx, "failed to get flows from kvstore for path %s", Path)
+		return errors.New("failed to get flows from kvstore for path " + Path)
+	}
+	logger.Debugf(ctx, "%d flows retrieved from the path:%s", len(Value), Path)
+
+	//remove them one by one
+	for key := range Value {
+		if err := RsrcMgr.KVStore.Delete(ctx, key); err != nil {
+			logger.Errorf(ctx, "Falied to remove resource %s", key)
+			return err
+		}
 	}
 	return nil
 }
diff --git a/pkg/mocks/mockKVClient.go b/pkg/mocks/mockKVClient.go
index fe6310b..78cbef6 100644
--- a/pkg/mocks/mockKVClient.go
+++ b/pkg/mocks/mockKVClient.go
@@ -52,6 +52,8 @@
 	FlowGroup = "flow_groups"
 	//FlowGroupCached flow_groups_cached/<flow_group_id>
 	FlowGroupCached = "flow_groups_cached"
+	//OnuPacketIn to extract gem port from packet-in
+	OnuPacketIn = "onu_packetin"
 )
 
 // MockKVClient mocks the AdapterProxy interface.
@@ -178,6 +180,10 @@
 			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
 		}
 
+		if strings.Contains(key, OnuPacketIn) {
+			return getPacketInGemPort(key)
+		}
+
 		maps := make(map[string]*kvstore.KVPair)
 		maps[key] = &kvstore.KVPair{Key: key}
 		return maps[key], nil
@@ -185,6 +191,44 @@
 	return nil, errors.New("key didn't find")
 }
 
+//getPacketInGemPort returns the GEM port associated with the given key
+func getPacketInGemPort(key string) (*kvstore.KVPair, error) {
+	//parse interface, onu, uni, vlan, priority values
+	arr := getParamsFromPacketInKey(key)
+
+	if len(arr) < 5 {
+		return nil, errors.New("key didn't find")
+	}
+	if arr[0] == "1" && arr[1] == "1" && arr[2] == "3" && arr[3] == "0" && arr[4] == "0" {
+		str, _ := json.Marshal(3)
+		return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+	}
+	if arr[0] == "2" && arr[1] == "2" && arr[2] == "4" && arr[3] == "549" && arr[4] == "0" {
+		str, _ := json.Marshal(4)
+		return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+	}
+	if arr[0] == "1" && arr[1] == "2" && arr[2] == "2" && arr[3] == "48" && arr[4] == "7" {
+		str, _ := json.Marshal(2)
+		return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+	}
+	return nil, errors.New("key didn't find")
+}
+
+//getParamsFromPacketInKey parse packetIn key that is in the format of "onu_packetin/{1,1,1,1,2}"
+func getParamsFromPacketInKey(key string) []string {
+	//return intfID, onuID, uniID, vlanID, priority
+	firstIndex := strings.Index(key, "{")
+	lastIndex := strings.Index(key, "}")
+	if firstIndex == -1 && lastIndex == -1 {
+		return []string{}
+	}
+	arr := strings.Split(key[firstIndex+1:lastIndex], ",")
+	if len(arr) < 5 {
+		return []string{}
+	}
+	return arr
+}
+
 // Put mock function implementation for KVClient
 func (kvclient *MockKVClient) Put(ctx context.Context, key string, value interface{}) error {
 	if key != "" {