[VOL-2737]: Replicate flow on all pbits instead of just gem ports
            to align with latest openolt adapter implementation.

Change-Id: I13cccf59fba83b41d6ac41aa3e0c5b457edb8abe
diff --git a/VERSION b/VERSION
index 7dea76e..6d7de6e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.0.1
+1.0.2
diff --git a/core/att_workflow.go b/core/att_workflow.go
index a96bbe1..947a38d 100644
--- a/core/att_workflow.go
+++ b/core/att_workflow.go
@@ -18,6 +18,7 @@
 
 import (
 	"errors"
+	"strings"
 
 	"github.com/opencord/openolt-scale-tester/config"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -210,15 +211,21 @@
 		gemPortIDs = append(gemPortIDs, gem.GemportID)
 	}
 
-	for _, gemID := range gemPortIDs {
-		if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
-			ponresourcemanager.FLOW_ID, 1); err != nil {
-			return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-		} else {
-			if err := AddFlow(subs, EapolFlow, Upstream, flowID[0], allocID, gemID); err != nil {
-				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, flowID)
-				return err
+	for idx, gemID := range gemPortIDs {
+		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+			if pbitSet == '1' {
+				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+					ponresourcemanager.FLOW_ID, 1); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				} else {
+					if err := AddFlow(subs, EapolFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+							ponresourcemanager.FLOW_ID, flowID)
+						return err
+					}
+				}
 			}
 		}
 	}
@@ -235,15 +242,21 @@
 		gemPortIDs = append(gemPortIDs, gem.GemportID)
 	}
 
-	for _, gemID := range gemPortIDs {
-		if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
-			ponresourcemanager.FLOW_ID, 1); err != nil {
-			return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-		} else {
-			if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID); err != nil {
-				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, flowID)
-				return err
+	for idx, gemID := range gemPortIDs {
+		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+			if pbitSet == '1' {
+				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+					ponresourcemanager.FLOW_ID, 1); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				} else {
+					if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+							ponresourcemanager.FLOW_ID, flowID)
+						return err
+					}
+				}
 			}
 		}
 	}
@@ -260,15 +273,21 @@
 		gemPortIDs = append(gemPortIDs, gem.GemportID)
 	}
 
-	for _, gemID := range gemPortIDs {
-		if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
-			ponresourcemanager.FLOW_ID, 1); err != nil {
-			return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-		} else {
-			if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID[0], allocID, gemID); err != nil {
-				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, flowID)
-				return err
+	for idx, gemID := range gemPortIDs {
+		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+			if pbitSet == '1' {
+				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+					ponresourcemanager.FLOW_ID, 1); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				} else {
+					if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+							ponresourcemanager.FLOW_ID, flowID)
+						return err
+					}
+				}
 			}
 		}
 	}
@@ -290,29 +309,35 @@
 		gemPortIDs = append(gemPortIDs, gem.GemportID)
 	}
 
-	for _, gemID := range gemPortIDs {
-		if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
-			ponresourcemanager.FLOW_ID, 1); err != nil {
-			return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-		} else {
-			var errUs, errDs error
-			if errUs = AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID); errUs != nil {
-				log.Errorw("failed to install US HSIA flow",
-					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-			}
-			if errDs = AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID); errDs != nil {
-				log.Errorw("failed to install US HSIA flow",
-					log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
-			}
-			if errUs != nil && errDs != nil {
-				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, flowID)
-			}
-			if errUs != nil || errDs != nil {
-				if errUs != nil {
-					return errUs
+	for idx, gemID := range gemPortIDs {
+		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+			if pbitSet == '1' {
+				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+					ponresourcemanager.FLOW_ID, 1); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				} else {
+					var errUs, errDs error
+					if errUs = AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+						log.Errorw("failed to install US HSIA flow",
+							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+					}
+					if errDs = AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+						log.Errorw("failed to install US HSIA flow",
+							log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+					}
+					if errUs != nil && errDs != nil {
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+							ponresourcemanager.FLOW_ID, flowID)
+					}
+					if errUs != nil || errDs != nil {
+						if errUs != nil {
+							return errUs
+						}
+						return errDs
+					}
 				}
-				return errDs
 			}
 		}
 	}
diff --git a/core/dt_workflow.go b/core/dt_workflow.go
index 7aa5746..b2ad300 100644
--- a/core/dt_workflow.go
+++ b/core/dt_workflow.go
@@ -18,6 +18,7 @@
 
 import (
 	"errors"
+	"strings"
 
 	"github.com/opencord/openolt-scale-tester/config"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -150,18 +151,24 @@
 		gemPortIDs = append(gemPortIDs, gem.GemportID)
 	}
 
-	for _, gemID := range gemPortIDs {
-		if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
-			ponresourcemanager.FLOW_ID, 1); err != nil {
-			return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
-		} else {
-			if err := AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID); err != nil {
-				subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
-					ponresourcemanager.FLOW_ID, flowID)
-				return err
-			}
-			if err := AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID); err != nil {
-				return err
+	for idx, gemID := range gemPortIDs {
+		pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+		for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+			if pbitSet == '1' {
+				pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+				if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+					ponresourcemanager.FLOW_ID, 1); err != nil {
+					return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+				} else {
+					if err := AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
+						subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+							ponresourcemanager.FLOW_ID, flowID)
+						return err
+					}
+					if err := AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); err != nil {
+						return err
+					}
+				}
 			}
 		}
 	}
diff --git a/core/workflow_utils.go b/core/workflow_utils.go
index 656b8ec..4376760 100644
--- a/core/workflow_utils.go
+++ b/core/workflow_utils.go
@@ -167,12 +167,14 @@
 }
 
 func AddFlow(subs *Subscriber, flowType string, direction string, flowID uint32,
-	allocID uint32, gemID uint32) error {
+	allocID uint32, gemID uint32, pcp uint32) error {
 	log.Infow("add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
 		"direction": direction, "flowID": flowID})
 	var err error
 
 	flowClassifier, actionInfo := FormatClassfierAction(flowType, direction, subs)
+	// Update the o_pbit for which this flow has to be classified
+	flowClassifier.OPbits = pcp
 	flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
 		UniId: int32(subs.UniID), FlowId: flowID,
 		FlowType: direction, AllocId: int32(allocID), GemportId: int32(gemID),