VOL-1381 and VOL-1091 Add feature to collect PMmetrics of OLT, this will collect Tx and Rx and posts to kafka. This commit is also related to changes of commit 72b25abd4e804596413558ef10f098c2262dd65d voltha-go
Updated review comments

Updated with code optimization
Updated with dep files
Updated fix for sanity test
Updated with gomod changes

Change-Id: I899e855812a6eda812c64664a9154321cdb16876
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 7f2b9cf..dbf301f 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -35,6 +35,7 @@
 	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/pmmetrics"
 	rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
 	"github.com/opencord/voltha-protos/go/common"
 	ic "github.com/opencord/voltha-protos/go/inter_container"
@@ -76,6 +77,9 @@
 	discOnus      map[string]bool
 	onus          map[string]*OnuDevice
 	nniIntfID     int
+	portStats     *OpenOltStatisticsMgr
+	metrics       *pmmetrics.PmMetrics
+	stopCollector chan bool
 }
 
 //OnuDevice represents ONU related info
@@ -89,6 +93,17 @@
 	uniPorts      map[uint32]struct{}
 }
 
+var pmNames = []string{
+	"rx_bytes",
+	"rx_packets",
+	"rx_mcast_packets",
+	"rx_bcast_packets",
+	"tx_bytes",
+	"tx_packets",
+	"tx_mcast_packets",
+	"tx_bcast_packets",
+}
+
 //NewOnuDevice creates a new Onu Device
 func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string) *OnuDevice {
 	var device OnuDevice
@@ -122,7 +137,8 @@
 	// when the first IntfOperInd with status as "up" is received for
 	// any one of the available NNI port on the OLT device.
 	dh.nniIntfID = -1
-
+	dh.stopCollector = make(chan bool, 2)
+	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	//TODO initialize the support classes.
 	return &dh
 }
@@ -352,15 +368,14 @@
 		go dh.handlePacketIndication(pktInd)
 	case *oop.Indication_PortStats:
 		portStats := indication.GetPortStats()
-		log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
+		go dh.portStats.PortStatisticsIndication(portStats, dh.resourceMgr.DevInfo.GetPonPorts())
 	case *oop.Indication_FlowStats:
 		flowStats := indication.GetFlowStats()
 		log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
 	case *oop.Indication_AlarmInd:
 		alarmInd := indication.GetAlarmInd()
 		log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
-		dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
-
+		go dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
 	}
 }
 
@@ -515,6 +530,8 @@
 	/* TODO: Instantiate Alarm , stats , BW managers */
 	/* Instantiating Event Manager to handle Alarms and KPIs */
 	dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
+	// Stats config for new device
+	dh.portStats = NewOpenOltStatsMgr(dh)
 
 	// Start reading indications
 	go dh.readIndications()
@@ -566,11 +583,51 @@
 	return deviceInfo, nil
 }
 
+func startCollector(dh *DeviceHandler) {
+	// Initial delay for OLT initialization
+	time.Sleep(1 * time.Minute)
+	log.Debugf("Starting-Collector")
+	context := make(map[string]string)
+	for {
+		select {
+		case <-dh.stopCollector:
+			log.Debugw("Stopping-Collector-for-OLT", log.Fields{"deviceID:": dh.deviceID})
+			return
+		default:
+			freq := dh.metrics.ToPmConfigs().DefaultFreq
+			time.Sleep(time.Duration(freq) * time.Second)
+			context["oltid"] = dh.deviceID
+			context["devicetype"] = dh.deviceType
+			// NNI Stats
+			cmnni := dh.portStats.collectNNIMetrics(uint32(0))
+			log.Debugf("Collect-NNI-Metrics %v", cmnni)
+			go dh.portStats.publishMetrics("NNIStats", cmnni, uint32(0), context, dh.deviceID)
+			log.Debugf("Publish-NNI-Metrics")
+			// PON Stats
+			NumPonPORTS := dh.resourceMgr.DevInfo.GetPonPorts()
+			for i := uint32(0); i < NumPonPORTS; i++ {
+				cmpon := dh.portStats.collectPONMetrics(i)
+				log.Debugf("Collect-PON-Metrics %v", cmpon)
+
+				go dh.portStats.publishMetrics("PONStats", cmpon, i, context, dh.deviceID)
+				log.Debugf("Publish-PON-Metrics")
+			}
+		}
+	}
+}
+
 //AdoptDevice adopts the OLT device
 func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
 	dh.transitionMap = NewTransitionMap(dh)
 	log.Infow("Adopt_device", log.Fields{"deviceID": device.Id, "Address": device.GetHostAndPort()})
 	dh.transitionMap.Handle(DeviceInit)
+
+	// Now, set the initial PM configuration for that device
+	if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+		log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+	}
+
+	go startCollector(dh)
 }
 
 //GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -893,7 +950,7 @@
 			"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
 		if err != nil {
 			log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
-				"From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id, "Error": err})
+				"From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
 			return
 		}
 	} else {
@@ -1199,6 +1256,8 @@
 		delete(dh.onus, onu)
 	}
 	log.Debug("Removed-device-from-Resource-manager-KV-store")
+	// Stop the Stats collector
+	dh.stopCollector <- true
 	//Reset the state
 	if _, err := dh.Client.Reboot(context.Background(), new(oop.Empty)); err != nil {
 		log.Errorw("Failed-to-reboot-olt ", log.Fields{"err": err})
diff --git a/adaptercore/device_handler_test.go b/adaptercore/device_handler_test.go
index 6e94215..924430d 100644
--- a/adaptercore/device_handler_test.go
+++ b/adaptercore/device_handler_test.go
@@ -22,6 +22,9 @@
 	"net"
 	"reflect"
 	"testing"
+	"time"
+
+	"github.com/opencord/voltha-lib-go/v2/pkg/pmmetrics"
 
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
@@ -43,6 +46,7 @@
 func newMockCoreProxy() *mocks.MockCoreProxy {
 	mcp := mocks.MockCoreProxy{}
 	mcp.Devices = make(map[string]*voltha.Device)
+	var pm []*voltha.PmConfig
 	mcp.Devices["olt"] = &voltha.Device{
 
 		Id:           "olt",
@@ -61,6 +65,13 @@
 			ChannelGroupId: 1,
 		},
 		ConnectStatus: 1,
+		PmConfigs: &voltha.PmConfigs{
+			DefaultFreq:  10,
+			Id:           "olt",
+			FreqOverride: false,
+			Grouped:      false,
+			Metrics:      pm,
+		},
 	}
 	mcp.Devices["onu1"] = &voltha.Device{
 
@@ -80,6 +91,13 @@
 			ChannelGroupId: 1,
 		},
 		ConnectStatus: 1,
+		PmConfigs: &voltha.PmConfigs{
+			DefaultFreq:  10,
+			Id:           "olt",
+			FreqOverride: false,
+			Grouped:      false,
+			Metrics:      pm,
+		},
 	}
 	mcp.Devices["onu2"] = &voltha.Device{
 		Id:         "2",
@@ -99,6 +117,13 @@
 			ChannelGroupId: 1,
 		},
 		ConnectStatus: 1,
+		PmConfigs: &voltha.PmConfigs{
+			DefaultFreq:  10,
+			Id:           "olt",
+			FreqOverride: false,
+			Grouped:      false,
+			Metrics:      pm,
+		},
 	}
 	return &mcp
 }
@@ -131,6 +156,8 @@
 	dh.Client = &mocks.MockOpenoltClient{}
 	dh.eventMgr = &OpenOltEventMgr{eventProxy: &mocks.MockEventProxy{}}
 	dh.transitionMap = &TransitionMap{}
+	dh.portStats = &OpenOltStatisticsMgr{}
+	dh.metrics = &pmmetrics.PmMetrics{}
 	return dh
 }
 
@@ -701,7 +728,7 @@
 	}{
 		// TODO: Add test cases.
 		{"AdoptDevice-1", dh1, args{device: dh1.device}},
-		{"AdoptDevice-1", dh2, args{device: dh2.device}},
+		{"AdoptDevice-2", dh2, args{device: dh2.device}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -985,3 +1012,33 @@
 		})
 	}
 }
+
+func Test_startCollector(t *testing.T) {
+	type args struct {
+		dh *DeviceHandler
+	}
+	dh := newMockDeviceHandler()
+	dh.portStats.NorthBoundPort = make(map[uint32]*NniPort)
+	dh.portStats.NorthBoundPort[0] = &NniPort{Name: "OLT-1"}
+	dh.portStats.SouthBoundPort = make(map[uint32]*PonPort)
+	dh.portStats.Device = dh
+	for i := 0; i < 16; i++ {
+		dh.portStats.SouthBoundPort[uint32(i)] = &PonPort{DeviceID: "OLT-1"}
+	}
+	tests := []struct {
+		name string
+		args args
+	}{
+		// TODO: Add test cases.
+		{"StartCollector-1", args{dh}},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			go func() {
+				time.Sleep(2 * time.Minute)
+				tt.args.dh.stopCollector <- true
+			}()
+			startCollector(tt.args.dh)
+		})
+	}
+}
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index 5f48bc6..ed6d489 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -127,6 +127,17 @@
 	return 0
 }
 
+//PortNoToIntfID returns portnumber derived from interfaceID
+func PortNoToIntfID(portno uint32, intfType voltha.Port_PortType) uint32 {
+	if (intfType) == voltha.Port_ETHERNET_NNI {
+		return (1 << 16) ^ portno
+	}
+	if (intfType) == voltha.Port_PON_OLT {
+		return (2 << 28) ^ portno
+	}
+	return 0
+}
+
 //IntfIDFromNniPortNum returns Intf ID derived from portNum
 func IntfIDFromNniPortNum(portNum uint32) uint32 {
 	return portNum & 0xFFFF
diff --git a/adaptercore/statsmanager.go b/adaptercore/statsmanager.go
index 778accc..6a67f15 100755
--- a/adaptercore/statsmanager.go
+++ b/adaptercore/statsmanager.go
@@ -20,12 +20,16 @@
 import (
 	"errors"
 	"fmt"
+	"sync"
+	"time"
 
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
 	"github.com/opencord/voltha-protos/go/openolt"
 	"github.com/opencord/voltha-protos/go/voltha"
 )
 
+var mutex = &sync.Mutex{}
+
 // PonPort representation
 type PonPort struct {
 	/*
@@ -74,7 +78,7 @@
 	PON.IntfID = IntfID
 	PON.PortNum = PortNum
 	PON.PortID = 0
-	PON.Label = fmt.Sprintf("%s,%d", "pon-", PONID)
+	PON.Label = fmt.Sprintf("%s%d", "pon-", PONID)
 
 	PON.ONUs = make(map[uint32]interface{})
 	PON.ONUsByID = make(map[uint32]interface{})
@@ -145,7 +149,7 @@
 	var NNI NniPort
 
 	NNI.PortNum = PortNum
-	NNI.Name = fmt.Sprintf("%s,%d", "nni-", PortNum)
+	NNI.Name = fmt.Sprintf("%s%d", "nni-", PortNum)
 	NNI.IntfID = IntfID
 
 	NNI.RxBytes = 0
@@ -166,8 +170,8 @@
 // OpenOltStatisticsMgr structure
 type OpenOltStatisticsMgr struct {
 	Device         *DeviceHandler
-	NorthBoundPort map[uint32]NniPort
-	SouthBoundPort map[uint32]PonPort
+	NorthBoundPort map[uint32]*NniPort
+	SouthBoundPort map[uint32]*PonPort
 	// TODO  PMMetrics Metrics
 }
 
@@ -181,16 +185,16 @@
 	// Northbound and Southbound ports
 	// added to initialize the pm_metrics
 	var Ports interface{}
-	Ports, _ = InitPorts("nni", Dev.deviceID)
-	StatMgr.NorthBoundPort, _ = Ports.(map[uint32]NniPort)
-	Ports, _ = InitPorts("pon", Dev.deviceID)
-	StatMgr.SouthBoundPort, _ = Ports.(map[uint32]PonPort)
-
+	Ports, _ = InitPorts("nni", Dev.deviceID, 1)
+	StatMgr.NorthBoundPort, _ = Ports.(map[uint32]*NniPort)
+	NumPonPorts := Dev.resourceMgr.DevInfo.GetPonPorts()
+	Ports, _ = InitPorts("pon", Dev.deviceID, NumPonPorts)
+	StatMgr.SouthBoundPort, _ = Ports.(map[uint32]*PonPort)
 	return &StatMgr
 }
 
 // InitPorts collects the port objects:  nni and pon that are updated with the current data from the OLT
-func InitPorts(Intftype string, DeviceID string) (interface{}, error) {
+func InitPorts(Intftype string, DeviceID string, numOfPorts uint32) (interface{}, error) {
 	/*
 	     This method collects the port objects:  nni and pon that are updated with the
 	     current data from the OLT
@@ -204,17 +208,17 @@
 	*/
 	var i uint32
 	if Intftype == "nni" {
-		NniPorts := make(map[uint32]NniPort)
-		for i = 0; i <= 1; i++ {
+		NniPorts := make(map[uint32]*NniPort)
+		for i = 0; i < numOfPorts; i++ {
 			Port := BuildPortObject(i, "nni", DeviceID).(*NniPort)
-			NniPorts[Port.IntfID] = *Port
+			NniPorts[Port.IntfID] = Port
 		}
 		return NniPorts, nil
 	} else if Intftype == "pon" {
-		PONPorts := make(map[uint32]PonPort)
-		for i = 0; i <= 16; i++ {
+		PONPorts := make(map[uint32]*PonPort)
+		for i = 0; i < numOfPorts; i++ {
 			PONPort := BuildPortObject(i, "pon", DeviceID).(*PonPort)
-			PONPorts[PONPort.IntfID] = *PONPort
+			PONPorts[PortNoToIntfID(PONPort.IntfID, voltha.Port_PON_OLT)] = PONPort
 		}
 		return PONPorts, nil
 	} else {
@@ -237,13 +241,16 @@
 	//This builds a port object which is added to the
 	//appropriate northbound or southbound values
 	if IntfType == "nni" {
-		IntfID := IntfIDToPortNo(PortNum, voltha.Port_ETHERNET_UNI)
-		return NewNniPort(PortNum, IntfID)
+		IntfID := IntfIDToPortNo(PortNum, voltha.Port_ETHERNET_NNI)
+		nniID := PortNoToIntfID(IntfID, voltha.Port_ETHERNET_NNI)
+		log.Debugf("NniID %v", nniID)
+		return NewNniPort(PortNum, nniID)
 	} else if IntfType == "pon" {
 		// PON ports require a different configuration
 		//  intf_id and pon_id are currently equal.
-		IntfID := IntfIDToPortNo(PortNum, voltha.Port_ETHERNET_NNI)
-		PONID := IntfID
+		IntfID := IntfIDToPortNo(PortNum, voltha.Port_PON_OLT)
+		PONID := PortNoToIntfID(IntfID, voltha.Port_PON_OLT)
+		log.Debugf("PonID %v", PONID)
 		return NewPONPort(PONID, DeviceID, IntfID, PortNum)
 	} else {
 		log.Errorf("Invalid type of interface %s", IntfType)
@@ -251,10 +258,114 @@
 	}
 }
 
+// collectNNIMetrics will collect the nni port metrics
+func (StatMgr *OpenOltStatisticsMgr) collectNNIMetrics(nniID uint32) map[string]float32 {
+
+	nnival := make(map[string]float32)
+	mutex.Lock()
+	cm := StatMgr.Device.portStats.NorthBoundPort[nniID]
+	mutex.Unlock()
+	metricName := StatMgr.Device.metrics.GetSubscriberMetrics()
+
+	if metricName != nil && len(metricName) > 0 {
+		for mName := range metricName {
+			switch mName {
+			case "rx_bytes":
+				nnival["RxBytes"] = float32(cm.RxBytes)
+			case "rx_packets":
+				nnival["RxPackets"] = float32(cm.RxPackets)
+			case "rx_mcast_packets":
+				nnival["RxMcastPackets"] = float32(cm.RxMcastPackets)
+			case "rx_bcast_packets":
+				nnival["RxBcastPackets"] = float32(cm.RxBcastPackets)
+			case "tx_bytes":
+				nnival["TxBytes"] = float32(cm.TxBytes)
+			case "tx_packets":
+				nnival["TxPackets"] = float32(cm.TxPackets)
+			case "tx_mcast_packets":
+				nnival["TxMcastPackets"] = float32(cm.TxMcastPackets)
+			case "tx_bcast_packets":
+				nnival["TxBcastPackets"] = float32(cm.TxBcastPackets)
+			}
+		}
+	}
+	return nnival
+}
+
+// collectPONMetrics will collect the pon port metrics
+func (StatMgr *OpenOltStatisticsMgr) collectPONMetrics(pID uint32) map[string]float32 {
+
+	ponval := make(map[string]float32)
+	mutex.Lock()
+	cm := StatMgr.Device.portStats.SouthBoundPort[pID]
+	mutex.Unlock()
+	metricName := StatMgr.Device.metrics.GetSubscriberMetrics()
+
+	if metricName != nil && len(metricName) > 0 {
+		for mName := range metricName {
+			switch mName {
+			case "rx_bytes":
+				ponval["RxBytes"] = float32(cm.RxBytes)
+			case "rx_packets":
+				ponval["RxPackets"] = float32(cm.RxPackets)
+			case "rx_mcast_packets":
+				ponval["RxMcastPackets"] = float32(cm.RxMcastPackets)
+			case "rx_bcast_packets":
+				ponval["RxBcastPackets"] = float32(cm.RxBcastPackets)
+			case "tx_bytes":
+				ponval["TxBytes"] = float32(cm.TxBytes)
+			case "tx_packets":
+				ponval["TxPackets"] = float32(cm.TxPackets)
+			case "tx_mcast_packets":
+				ponval["TxMcastPackets"] = float32(cm.TxMcastPackets)
+			case "tx_bcast_packets":
+				ponval["TxBcastPackets"] = float32(cm.TxBcastPackets)
+			}
+		}
+	}
+	return ponval
+}
+
+// publishMatrics will publish the pon port metrics
+func (StatMgr OpenOltStatisticsMgr) publishMetrics(portType string, val map[string]float32, portnum uint32, context map[string]string, devID string) {
+	log.Debugf("Post-%v %v", portType, val)
+
+	var metricInfo voltha.MetricInformation
+	var ke voltha.KpiEvent2
+	var volthaEventSubCatgry voltha.EventSubCategory_EventSubCategory
+
+	if portType == "NNIStats" {
+		volthaEventSubCatgry = voltha.EventSubCategory_NNI
+	} else {
+		volthaEventSubCatgry = voltha.EventSubCategory_PON
+	}
+
+	raisedTs := time.Now().UnixNano()
+	mmd := voltha.MetricMetaData{
+		Title:    portType,
+		Ts:       float64(raisedTs),
+		Context:  context,
+		DeviceId: devID,
+	}
+
+	metricInfo.Metadata = &mmd
+	metricInfo.Metrics = val
+
+	ke.SliceData = []*voltha.MetricInformation{&metricInfo}
+	ke.Type = voltha.KpiEventType_slice
+	ke.Ts = float64(time.Now().UnixNano())
+
+	if err := StatMgr.Device.EventProxy.SendKpiEvent("STATS_EVENT", &ke, voltha.EventCategory_EQUIPMENT, volthaEventSubCatgry, raisedTs); err != nil {
+		log.Errorw("Failed to send Pon stats", log.Fields{"err": err})
+	}
+
+}
+
 // PortStatisticsIndication handles the port statistics indication
-func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(PortStats *openolt.PortStatistics) {
+func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(PortStats *openolt.PortStatistics, NumPonPorts uint32) {
 	log.Debugf("port-stats-collected %v", PortStats)
-	StatMgr.PortsStatisticsKpis(PortStats)
+	StatMgr.PortsStatisticsKpis(PortStats, NumPonPorts)
+	log.Infow("Received port stats indication", log.Fields{"PortStats": PortStats})
 	// TODO send stats to core topic to the voltha kafka or a different kafka ?
 }
 
@@ -265,7 +376,7 @@
 }
 
 // PortsStatisticsKpis map the port stats values into a dictionary, creates the kpiEvent and then publish to Kafka
-func (StatMgr *OpenOltStatisticsMgr) PortsStatisticsKpis(PortStats *openolt.PortStatistics) {
+func (StatMgr *OpenOltStatisticsMgr) PortsStatisticsKpis(PortStats *openolt.PortStatistics, NumPonPorts uint32) {
 
 	/*map the port stats values into a dictionary
 	  Create a kpoEvent and publish to Kafka
@@ -276,7 +387,7 @@
 	//var err error
 	IntfID := PortStats.IntfId
 
-	if (IntfIDToPortNo(0, voltha.Port_ETHERNET_NNI) < IntfID) &&
+	if (IntfIDToPortNo(1, voltha.Port_ETHERNET_NNI) < IntfID) &&
 		(IntfID < IntfIDToPortNo(4, voltha.Port_ETHERNET_NNI)) {
 		/*
 		   for this release we are only interested in the first NNI for
@@ -284,24 +395,45 @@
 		   we are not using the other 3
 		*/
 		return
-	}
-	PMData := make(map[string]uint64)
-	PMData["rx_bytes"] = PortStats.RxBytes
-	PMData["rx_packets"] = PortStats.RxPackets
-	PMData["rx_ucast_packets"] = PortStats.RxUcastPackets
-	PMData["rx_mcast_packets"] = PortStats.RxMcastPackets
-	PMData["rx_bcast_packets"] = PortStats.RxBcastPackets
-	PMData["rx_error_packets"] = PortStats.RxErrorPackets
-	PMData["tx_bytes"] = PortStats.TxBytes
-	PMData["tx_packets"] = PortStats.TxPackets
-	PMData["tx_ucast_packets"] = PortStats.TxUcastPackets
-	PMData["tx_mcast_packets"] = PortStats.TxMcastPackets
-	PMData["tx_bcast_packets"] = PortStats.TxBcastPackets
-	PMData["tx_error_packets"] = PortStats.TxErrorPackets
-	PMData["rx_crc_errors"] = PortStats.RxCrcErrors
-	PMData["bip_errors"] = PortStats.BipErrors
+	} else if IntfIDToPortNo(0, voltha.Port_ETHERNET_NNI) == IntfID {
 
-	PMData["intf_id"] = uint64(PortStats.IntfId)
+		var portNNIStat NniPort
+		portNNIStat.IntfID = IntfID
+		portNNIStat.PortNum = uint32(0)
+		portNNIStat.RxBytes = PortStats.RxBytes
+		portNNIStat.RxPackets = PortStats.RxPackets
+		portNNIStat.RxMcastPackets = PortStats.RxMcastPackets
+		portNNIStat.RxBcastPackets = PortStats.RxBcastPackets
+		portNNIStat.TxBytes = PortStats.TxBytes
+		portNNIStat.TxPackets = PortStats.TxPackets
+		portNNIStat.TxMcastPackets = PortStats.TxMcastPackets
+		portNNIStat.TxBcastPackets = PortStats.TxBcastPackets
+		mutex.Lock()
+		StatMgr.NorthBoundPort[0] = &portNNIStat
+		mutex.Unlock()
+		log.Debugf("Received-NNI-Stats: %v", StatMgr.NorthBoundPort)
+	}
+	for i := uint32(0); i < NumPonPorts; i++ {
+
+		if IntfIDToPortNo(i, voltha.Port_PON_OLT) == IntfID {
+			var portPonStat PonPort
+			portPonStat.IntfID = IntfID
+			portPonStat.PortNum = i
+			portPonStat.PONID = i
+			portPonStat.RxBytes = PortStats.RxBytes
+			portPonStat.RxPackets = PortStats.RxPackets
+			portPonStat.RxMcastPackets = PortStats.RxMcastPackets
+			portPonStat.RxBcastPackets = PortStats.RxBcastPackets
+			portPonStat.TxBytes = PortStats.TxBytes
+			portPonStat.TxPackets = PortStats.TxPackets
+			portPonStat.TxMcastPackets = PortStats.TxMcastPackets
+			portPonStat.TxBcastPackets = PortStats.TxBcastPackets
+			mutex.Lock()
+			StatMgr.SouthBoundPort[i] = &portPonStat
+			mutex.Unlock()
+			log.Debugf("Received-PON-Stats-for-Port %v : %v", i, StatMgr.SouthBoundPort[i])
+		}
+	}
 
 	/*
 	   Based upon the intf_id map to an nni port or a pon port
diff --git a/adaptercore/statsmanager_test.go b/adaptercore/statsmanager_test.go
index 88f3703..91f6c87 100644
--- a/adaptercore/statsmanager_test.go
+++ b/adaptercore/statsmanager_test.go
@@ -18,6 +18,7 @@
 package adaptercore
 
 import (
+	"reflect"
 	"testing"
 
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -45,7 +46,7 @@
 		},
 		ConnectStatus: 1,
 	}
-	dh := &DeviceHandler{}
+	dh := newMockDeviceHandler()
 	dh.device = device
 	StatMgr := NewOpenOltStatsMgr(dh)
 
@@ -62,7 +63,215 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 
-			StatMgr.PortStatisticsIndication(tt.args.PortStats)
+			StatMgr.PortStatisticsIndication(tt.args.PortStats, 16)
+		})
+	}
+}
+
+func TestOpenOltStatisticsMgr_publishMetrics(t *testing.T) {
+	type fields struct {
+		Device         *DeviceHandler
+		NorthBoundPort map[uint32]*NniPort
+		SouthBoundPort map[uint32]*PonPort
+	}
+	type args struct {
+		portType string
+		val      map[string]float32
+		portnum  uint32
+		context  map[string]string
+	}
+	ctx := map[string]string{}
+	ctx["deviceID"] = "Test"
+	ponmap := map[uint32]*PonPort{}
+	ponmap[0] = &PonPort{
+		PONID:          0,
+		DeviceID:       "onu1",
+		IntfID:         0,
+		PortNum:        0,
+		PortID:         0,
+		Label:          "",
+		ONUs:           nil,
+		ONUsByID:       nil,
+		RxBytes:        0,
+		RxPackets:      0,
+		RxMcastPackets: 0,
+		RxBcastPackets: 0,
+		RxErrorPackets: 0,
+		TxBytes:        0,
+		TxPackets:      0,
+		TxUcastPackets: 0,
+		TxMcastPackets: 0,
+		TxBcastPackets: 0,
+		TxErrorPackets: 0,
+	}
+	nnimap := map[uint32]*NniPort{}
+	nnimap[0] = &NniPort{
+		PortNum:        0,
+		Name:           "olt1",
+		LogicalPort:    0,
+		IntfID:         0,
+		RxBytes:        0,
+		RxPackets:      0,
+		RxMcastPackets: uint64(1111),
+		RxBcastPackets: 0,
+		RxErrorPackets: 0,
+		TxBytes:        0,
+		TxPackets:      0,
+		TxUcastPackets: 0,
+		TxMcastPackets: 0,
+		TxBcastPackets: 0,
+		TxErrorPackets: 0,
+	}
+	pval := make(map[string]float32)
+	pval["rx_bytes"] = float32(111)
+	nval := make(map[string]float32)
+	nval["rx_bytes"] = float32(111)
+	dhandlerNNI := newMockDeviceHandler()
+	dhandlerNNI.portStats = &OpenOltStatisticsMgr{Device: nil, SouthBoundPort: nil, NorthBoundPort: nnimap}
+	dhandlerPON := newMockDeviceHandler()
+	dhandlerPON.portStats = &OpenOltStatisticsMgr{Device: nil, SouthBoundPort: ponmap, NorthBoundPort: nil}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+	}{
+		{
+			name: "PublishNNIMetrics-1",
+			fields: fields{
+				Device:         dhandlerNNI,
+				NorthBoundPort: nnimap,
+				SouthBoundPort: nil,
+			},
+			args: args{
+				portType: "NNIStats",
+				val:      nval,
+				portnum:  0,
+				context:  ctx,
+			},
+		},
+		{
+			name: "PublishPONMetrics-1",
+			fields: fields{
+				Device:         dhandlerPON,
+				NorthBoundPort: nil,
+				SouthBoundPort: ponmap,
+			},
+			args: args{
+				portType: "PONStats",
+				val:      pval,
+				portnum:  0,
+				context:  ctx,
+			},
+		},
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			StatMgr := &OpenOltStatisticsMgr{
+				Device:         tt.fields.Device,
+				NorthBoundPort: tt.fields.NorthBoundPort,
+				SouthBoundPort: tt.fields.SouthBoundPort,
+			}
+			StatMgr.publishMetrics(tt.args.portType, tt.args.val, tt.args.portnum, tt.args.context, "onu1")
+
+		})
+	}
+}
+
+func TestOpenOltStatisticsMgr_collectNNIMetrics(t *testing.T) {
+	type fields struct {
+		Device         *DeviceHandler
+		NorthBoundPort map[uint32]*NniPort
+		SouthBoundPort map[uint32]*PonPort
+	}
+	type args struct {
+		nniID uint32
+	}
+	dhandler := newMockDeviceHandler()
+	pmconfig := make(map[string]*voltha.PmConfig)
+	pmconfig["rx_bytes"] = &voltha.PmConfig{Name: "olt"}
+
+	var res map[string]float32
+	nnimap := map[uint32]*NniPort{}
+	nnimap[0] = &NniPort{Name: "olt"}
+	nnimap[1] = &NniPort{Name: "olt"}
+	dh := &DeviceHandler{portStats: &OpenOltStatisticsMgr{Device: dhandler, SouthBoundPort: nil, NorthBoundPort: nnimap}}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+		want   map[string]float32
+	}{
+		{"CollectNNIMetrics-1", fields{
+			Device:         dh,
+			NorthBoundPort: nnimap,
+			SouthBoundPort: nil,
+		}, args{0}, res},
+		{"CollectNNIMetrics-2", fields{
+			Device:         dh,
+			NorthBoundPort: nnimap,
+			SouthBoundPort: nil,
+		}, args{1}, res},
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			StatMgr := &OpenOltStatisticsMgr{
+				Device:         tt.fields.Device,
+				NorthBoundPort: tt.fields.NorthBoundPort,
+				SouthBoundPort: tt.fields.SouthBoundPort,
+			}
+			got := StatMgr.collectNNIMetrics(tt.args.nniID)
+			if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+				t.Errorf("collectNNIMetrics() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestOpenOltStatisticsMgr_collectPONMetrics(t *testing.T) {
+	type fields struct {
+		Device         *DeviceHandler
+		NorthBoundPort map[uint32]*NniPort
+		SouthBoundPort map[uint32]*PonPort
+	}
+	type args struct {
+		pID uint32
+	}
+	dhandler := newMockDeviceHandler()
+	pmconfig := make(map[string]*voltha.PmConfig)
+	pmconfig["rx_bytes"] = &voltha.PmConfig{Name: "olt"}
+
+	var res map[string]float32
+	ponmap := map[uint32]*PonPort{}
+	ponmap[0] = &PonPort{DeviceID: "olt"}
+	ponmap[1] = &PonPort{DeviceID: "olt"}
+	dh := &DeviceHandler{portStats: &OpenOltStatisticsMgr{Device: dhandler, SouthBoundPort: ponmap, NorthBoundPort: nil}}
+
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+		want   map[string]float32
+	}{
+		{"CollectPONMetrics-1", fields{
+			Device:         dh,
+			NorthBoundPort: nil,
+			SouthBoundPort: ponmap,
+		}, args{0}, res},
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			StatMgr := &OpenOltStatisticsMgr{
+				Device:         tt.fields.Device,
+				NorthBoundPort: tt.fields.NorthBoundPort,
+				SouthBoundPort: tt.fields.SouthBoundPort,
+			}
+			got := StatMgr.collectPONMetrics(tt.args.pID)
+			if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+				t.Errorf("collectPONMetrics() = %v, want %v", got, tt.want)
+			}
 		})
 	}
 }