[VOL-3736]: Bug - Restarting multicast iperf server (at RG) will not restore the data stream at the RG.

- Use the right flow id when clearing mcast flow
- Fix UT

Change-Id: Ib392f0ec8fb62b7124774108fb507438ad00f14d
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index d9ffa7a..abaa97b 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -2051,7 +2051,7 @@
 	}
 	if !flowInfo.Flow.ReplicateFlow {
 		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum); err != nil {
-			logger.Error(ctx, "failed-to-clear-resources-for-flow", log.Fields{
+			logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 				"flow-id":        flow.Id,
 				"stored-flow":    flowInfo.Flow,
 				"device-id":      f.deviceHandler.device.Id,
@@ -2069,7 +2069,7 @@
 		logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
 		for _, gem := range gems {
 			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum); err != nil {
-				logger.Error(ctx, "failed-to-clear-resources-for-flow", log.Fields{
+				logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 					"flow-id":        flow.Id,
 					"stored-flow":    flowInfo.Flow,
 					"device-id":      f.deviceHandler.device.Id,
@@ -3297,7 +3297,6 @@
 
 	var onuID = int32(NoneOnuID)
 	var uniID = int32(NoneUniID)
-	var flowID uint64
 	if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id); flowInfo == nil {
 		return olterrors.NewErrPersistence("remove", "flow", flow.Id,
 			log.Fields{
@@ -3322,7 +3321,7 @@
 		return err
 	}
 	// Remove flow from KV store
-	return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
+	return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id)
 }
 
 // reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index b3f806d..35c5d23 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -292,7 +292,7 @@
 	//multicast flow
 	multicastFa := &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(65536),
+			fu.InPort(1048576),
 			fu.VlanVid(660),             //vlan
 			fu.Metadata_ofp(uint64(66)), //inner vlan
 			fu.EthType(0x800),           //ipv4
@@ -346,7 +346,7 @@
 		Actions: []*ofp.OfpAction{
 			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
 			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
-			fu.Output(65536),
+			fu.Output(1048576),
 			fu.PushVlan(0x8100),
 		},
 		KV: kw,
@@ -355,7 +355,7 @@
 	// Downstream flow
 	fa3 := &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(65536),
+			fu.InPort(1048576),
 			fu.Metadata_ofp(1),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257),
 		},
@@ -476,7 +476,7 @@
 			fu.Metadata_ofp(1),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 			fu.VlanPcp(1000),
-			fu.UdpDst(65535),
+			fu.UdpDst(1048576),
 			fu.UdpSrc(536870912),
 			fu.Ipv4Dst(65535),
 			fu.Ipv4Src(536870912),
@@ -511,7 +511,7 @@
 	//multicast flow
 	fa11 := &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(65536),
+			fu.InPort(1048576),
 			fu.VlanVid(660),             //vlan
 			fu.Metadata_ofp(uint64(66)), //inner vlan
 			fu.EthType(0x800),           //ipv4
@@ -862,7 +862,7 @@
 		Actions: []*ofp.OfpAction{
 			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
 			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
-			fu.Output(65536),
+			fu.Output(1048576),
 			fu.PushVlan(0x8100),
 		},
 		KV: kw,
@@ -880,7 +880,7 @@
 		Actions: []*ofp.OfpAction{
 			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
 			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
-			fu.Output(65536),
+			fu.Output(1048576),
 			fu.PushVlan(0x8100),
 		},
 		KV: kw,
@@ -897,7 +897,7 @@
 		Actions: []*ofp.OfpAction{
 			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
 			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
-			fu.Output(65536),
+			fu.Output(1048576),
 			fu.PushVlan(0x8100),
 		},
 		KV: kw,
@@ -905,7 +905,7 @@
 
 	fa4 := &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
-			fu.InPort(65535),
+			fu.InPort(1048576),
 			fu.Metadata_ofp(1),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 			fu.VlanPcp(1),
@@ -1175,10 +1175,15 @@
 	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/core", log.DebugLevel)
 	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager", log.DebugLevel)
 	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/pkg/mocks", log.DebugLevel)
-	kw := make(map[string]uint64)
-	kw["table_id"] = 1
-	kw["meter_id"] = 1
-	kw["write_metadata"] = 0x4000000000 // Tech-Profile-ID 64
+	kwTable1Meter1 := make(map[string]uint64)
+	kwTable1Meter1["table_id"] = 1
+	kwTable1Meter1["meter_id"] = 1
+	kwTable1Meter1["write_metadata"] = 0x4000000000 // Tech-Profile-ID 64
+
+	kwTable0Meter1 := make(map[string]uint64)
+	kwTable0Meter1["table_id"] = 0
+	kwTable0Meter1["meter_id"] = 1
+	kwTable0Meter1["write_metadata"] = 0x4000000000 // Tech-Profile-ID 64
 
 	flowMetadata1 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
 		{
@@ -1204,20 +1209,6 @@
 		},
 	}}
 
-	flowMetadata2 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
-		{
-			Flags:   5,
-			MeterId: 2,
-			Bands: []*voltha.OfpMeterBandHeader{
-				{
-					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
-					Rate:      16000,
-					BurstSize: 30,
-				},
-			},
-		},
-	}}
-
 	// Downstream LLDP Trap from NNI0 flow
 	fa0 := &fu.FlowArgs{
 		MatchFields: []*ofp.OfpOxmOfbField{
@@ -1227,7 +1218,7 @@
 		Actions: []*ofp.OfpAction{
 			fu.Output(4294967293),
 		},
-		KV: kw,
+		KV: make(map[string]uint64),
 	}
 
 	// Upstream flow DHCP flow - ONU1 UNI0 PON0
@@ -1246,7 +1237,7 @@
 			fu.Output(2147483645),
 			fu.PushVlan(0x8100),
 		},
-		KV: kw,
+		KV: kwTable1Meter1,
 	}
 
 	// Upstream EAPOL - ONU1 UNI0 PON0
@@ -1265,7 +1256,7 @@
 			fu.Output(2147483645),
 			fu.PushVlan(0x8100),
 		},
-		KV: kw,
+		KV: kwTable1Meter1,
 	}
 
 	// Upstream HSIA - ONU1 UNI0 PON0
@@ -1275,6 +1266,7 @@
 			fu.Metadata_ofp(1),
 			//fu.EthType(0x8100),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.TunnelId(16),
 		},
 		Actions: []*ofp.OfpAction{
 			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
@@ -1282,7 +1274,7 @@
 			fu.Output(1048576),
 			fu.PushVlan(0x8100),
 		},
-		KV: kw,
+		KV: kwTable1Meter1,
 	}
 
 	// Downstream HSIA - ONU1 UNI0 PON0
@@ -1292,6 +1284,7 @@
 			fu.Metadata_ofp(1),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
 			fu.VlanPcp(1),
+			fu.TunnelId(16),
 		},
 		Actions: []*ofp.OfpAction{
 			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
@@ -1299,7 +1292,7 @@
 			fu.Output(536870912),
 			fu.PopVlan(),
 		},
-		KV: kw,
+		KV: kwTable0Meter1,
 	}
 
 	// Upstream flow DHCP flow - ONU1 UNI0 PON15
@@ -1310,7 +1303,7 @@
 			fu.IpProto(17), // dhcp
 			fu.VlanPcp(0),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
-			fu.TunnelId(61456),
+			fu.TunnelId(16),
 		},
 		Actions: []*ofp.OfpAction{
 			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
@@ -1318,7 +1311,7 @@
 			fu.Output(2147483645),
 			fu.PushVlan(0x8100),
 		},
-		KV: kw,
+		KV: kwTable1Meter1,
 	}
 	// Upstream EAPOL - ONU1 UNI0 PON15
 	fa6 := &fu.FlowArgs{
@@ -1328,7 +1321,7 @@
 			fu.EthType(0x888E),
 			fu.VlanPcp(1),
 			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259),
-			fu.TunnelId(61456),
+			fu.TunnelId(16),
 		},
 		Actions: []*ofp.OfpAction{
 			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
@@ -1336,7 +1329,7 @@
 			fu.Output(2147483645),
 			fu.PushVlan(0x8100),
 		},
-		KV: kw,
+		KV: kwTable1Meter1,
 	}
 	flow0, _ := fu.MkFlowStat(fa0)
 	flow1, _ := fu.MkFlowStat(fa1)
@@ -1425,7 +1418,7 @@
 				ctx:          ctx,
 				flow:         flow1,
 				addFlow:      true,
-				flowMetadata: &flowMetadata2,
+				flowMetadata: &flowMetadata1,
 			},
 			wantErr: true,
 		},
diff --git a/pkg/mocks/mockTechprofile.go b/pkg/mocks/mockTechprofile.go
index 1bdbbe4..a016b2c 100644
--- a/pkg/mocks/mockTechprofile.go
+++ b/pkg/mocks/mockTechprofile.go
@@ -43,9 +43,54 @@
 
 // GetTPInstanceFromKVStore to mock techprofile GetTPInstanceFromKVStore method
 func (m MockTechProfile) GetTPInstanceFromKVStore(ctx context.Context, techProfiletblID uint32, path string) (interface{}, error) {
-	logger.Debug(ctx, "Warning Warning Warning: GetTPInstanceFromKVStore")
-	return nil, nil
-
+	logger.Debug(ctx, "GetTPInstanceFromKVStore")
+	if techProfiletblID == 64 {
+		return &tp.TechProfile{
+			Name:                 "mock-tech-profile",
+			SubscriberIdentifier: "257",
+			ProfileType:          "mock",
+			Version:              0,
+			NumGemPorts:          1,
+			UsScheduler: tp.IScheduler{
+				AllocID:      1,
+				Direction:    "upstream",
+				AdditionalBw: "None",
+				Priority:     0,
+				Weight:       0,
+				QSchedPolicy: "",
+			},
+			DsScheduler: tp.IScheduler{
+				AllocID:      1,
+				Direction:    "downstream",
+				AdditionalBw: "None",
+				Priority:     0,
+				Weight:       0,
+				QSchedPolicy: "",
+			},
+			UpstreamGemPortAttributeList: []tp.IGemPortAttribute{{
+				GemportID: 1,
+				PbitMap:   "0b11111111",
+			},
+			},
+			DownstreamGemPortAttributeList: []tp.IGemPortAttribute{{
+				GemportID: 1,
+				PbitMap:   "0b11111111",
+			},
+			},
+		}, nil
+	} else if techProfiletblID == 65 {
+		return &tp.EponProfile{
+			Name:                         "mock-epon-profile",
+			SubscriberIdentifier:         "257",
+			ProfileType:                  "mock",
+			Version:                      0,
+			NumGemPorts:                  2,
+			UpstreamQueueAttributeList:   nil,
+			DownstreamQueueAttributeList: nil,
+		}, nil
+	} else {
+		return nil, nil
+	}
 }
 
 // CreateTechProfInstance to mock techprofile CreateTechProfInstance method