VOL-3521 : scale: intermittent issue - voltha complains that different meter is in use for subscriber
- Process incoming flows on a per ONU basis using channels per ONU

Change-Id: I0f375d90d786a0135bb51ce18036e5297dc7297b
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 9464830..d8caa38 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -927,37 +927,37 @@
 	actionInfo3 := make(map[string]interface{})
 	classifierInfo4 := make(map[string]interface{})
 	actionInfo4 := make(map[string]interface{})
-	flowState, _ := fu.MkFlowStat(fa)
-	flowState2, _ := fu.MkFlowStat(fa2)
-	flowState3, _ := fu.MkFlowStat(fa3)
-	flowState4, _ := fu.MkFlowStat(fa4)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo, flowState)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo2, flowState2)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo3, flowState3)
-	formulateClassifierInfoFromFlow(ctx, classifierInfo4, flowState4)
+	flow, _ := fu.MkFlowStat(fa)
+	flow2, _ := fu.MkFlowStat(fa2)
+	flow3, _ := fu.MkFlowStat(fa3)
+	flow4, _ := fu.MkFlowStat(fa4)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo2, flow2)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo3, flow3)
+	formulateClassifierInfoFromFlow(ctx, classifierInfo4, flow4)
 
-	err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flowState)
+	err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flow)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo2, classifierInfo2, flowState2)
+	err = formulateActionInfoFromFlow(ctx, actionInfo2, classifierInfo2, flow2)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo3, classifierInfo3, flowState3)
+	err = formulateActionInfoFromFlow(ctx, actionInfo3, classifierInfo3, flow3)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
 		return
 	}
 
-	err = formulateActionInfoFromFlow(ctx, actionInfo4, classifierInfo4, flowState4)
+	err = formulateActionInfoFromFlow(ctx, actionInfo4, classifierInfo4, flow4)
 	if err != nil {
 		// Error logging is already done in the called function
 		// So just return in case of error
@@ -1035,7 +1035,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo,
 				actionInfo:     actionInfo,
-				flow:           flowState,
+				flow:           flow,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1054,7 +1054,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo2,
 				actionInfo:     actionInfo2,
-				flow:           flowState2,
+				flow:           flow2,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1073,7 +1073,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo3,
 				actionInfo:     actionInfo3,
-				flow:           flowState3,
+				flow:           flow3,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1092,7 +1092,7 @@
 				args:           nil,
 				classifierInfo: classifierInfo4,
 				actionInfo:     actionInfo4,
-				flow:           flowState4,
+				flow:           flow4,
 				gemPort:        1,
 				intfID:         1,
 				onuID:          1,
@@ -1168,3 +1168,301 @@
 		return
 	}
 }
+
+func TestOpenOltFlowMgr_TestRouteFlowToOnuChannel(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/core", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-openolt-adapter/pkg/mocks", log.DebugLevel)
+	kw := make(map[string]uint64)
+	kw["table_id"] = 1
+	kw["meter_id"] = 1
+	kw["write_metadata"] = 0x4000000000 // Tech-Profile-ID 64
+
+	flowMetadata1 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+		{
+			Flags:   5,
+			MeterId: 1,
+			Bands: []*voltha.OfpMeterBandHeader{
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      16000,
+					BurstSize: 30,
+				},
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      32000,
+					BurstSize: 30,
+				},
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      64000,
+					BurstSize: 30,
+				},
+			},
+		},
+	}}
+
+	flowMetadata2 := voltha.FlowMetadata{Meters: []*voltha.OfpMeterConfig{
+		{
+			Flags:   5,
+			MeterId: 2,
+			Bands: []*voltha.OfpMeterBandHeader{
+				{
+					Type:      voltha.OfpMeterBandType_OFPMBT_DROP,
+					Rate:      16000,
+					BurstSize: 30,
+				},
+			},
+		},
+	}}
+
+	// Downstream LLDP Trap from NNI0 flow
+	fa0 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1048576),
+			fu.EthType(35020),
+		},
+		Actions: []*ofp.OfpAction{
+			fu.Output(4294967293),
+		},
+		KV: kw,
+	}
+
+	// Upstream flow DHCP flow - ONU1 UNI0 PON0
+	fa1 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			fu.IpProto(17), // dhcp
+			fu.VlanPcp(0),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.TunnelId(16),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Upstream EAPOL - ONU1 UNI0 PON0
+	fa2 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			fu.EthType(0x888E),
+			fu.VlanPcp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257),
+			fu.TunnelId(16),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Upstream HSIA - ONU1 UNI0 PON0
+	fa3 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870912),
+			fu.Metadata_ofp(1),
+			//fu.EthType(0x8100),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(1048576),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+
+	// Downstream HSIA - ONU1 UNI0 PON0
+	fa4 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(1048576),
+			fu.Metadata_ofp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.VlanPcp(1),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fu.Output(536870912),
+			fu.PopVlan(),
+		},
+		KV: kw,
+	}
+
+	// Upstream flow DHCP flow - ONU1 UNI0 PON15
+	fa5 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870927),
+			fu.Metadata_ofp(1),
+			fu.IpProto(17), // dhcp
+			fu.VlanPcp(0),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+			fu.TunnelId(61456),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+	// Upstream EAPOL - ONU1 UNI0 PON15
+	fa6 := &fu.FlowArgs{
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fu.InPort(536870927),
+			fu.Metadata_ofp(1),
+			fu.EthType(0x888E),
+			fu.VlanPcp(1),
+			fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 259),
+			fu.TunnelId(61456),
+		},
+		Actions: []*ofp.OfpAction{
+			//fu.SetField(fu.Metadata_ofp(uint64(ofp.OfpInstructionType_OFPIT_WRITE_METADATA | 2))),
+			fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 257)),
+			fu.Output(2147483645),
+			fu.PushVlan(0x8100),
+		},
+		KV: kw,
+	}
+	flow0, _ := fu.MkFlowStat(fa0)
+	flow1, _ := fu.MkFlowStat(fa1)
+	flow2, _ := fu.MkFlowStat(fa2)
+	flow3, _ := fu.MkFlowStat(fa3)
+	flow4, _ := fu.MkFlowStat(fa4)
+
+	flow5, _ := fu.MkFlowStat(fa5)
+	flow6, _ := fu.MkFlowStat(fa6)
+
+	type args struct {
+		ctx          context.Context
+		flow         *ofp.OfpFlowStats
+		addFlow      bool
+		flowMetadata *voltha.FlowMetadata
+	}
+	tests := []struct {
+		name        string
+		args        args
+		wantErr     bool
+		returnedErr error
+	}{
+		{
+			name: "RouteFlowToOnuChannel-0",
+			args: args{
+				ctx:          ctx,
+				flow:         flow0,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-1",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-2",
+			args: args{
+				ctx:          ctx,
+				flow:         flow2,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-3",
+			args: args{
+				ctx:          ctx,
+				flow:         flow3,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-4",
+			args: args{
+				ctx:          ctx,
+				flow:         flow4,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-5",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      false,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-6",
+			args: args{
+				ctx:          ctx,
+				flow:         flow1,
+				addFlow:      true,
+				flowMetadata: &flowMetadata2,
+			},
+			wantErr: true,
+		},
+		{
+			name: "RouteFlowToOnuChannel-7",
+			args: args{
+				ctx:          ctx,
+				flow:         flow5,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+		{
+			name: "RouteFlowToOnuChannel-8",
+			args: args{
+				ctx:          ctx,
+				flow:         flow6,
+				addFlow:      true,
+				flowMetadata: &flowMetadata1,
+			},
+			wantErr: false,
+		},
+	}
+
+	var wg sync.WaitGroup
+	defer wg.Wait() // wait for all go routines to complete
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			wg.Add(1) // one per go routine
+			go func() {
+				defer wg.Done()
+				tt.returnedErr = flowMgr[0].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
+				if (tt.wantErr == false && tt.returnedErr != nil) || (tt.wantErr == true && tt.returnedErr == nil) {
+					t.Errorf("OpenOltFlowMgr.RouteFlowToOnuChannel() error = %v, wantErr %v", tt.returnedErr, tt.wantErr)
+				}
+			}()
+		})
+	}
+}