VOL-1381 and VOL-1091: Add a feature to collect PM metrics from the OLT. It collects Tx and Rx statistics and posts them to Kafka. This commit is related to the changes in voltha-go commit 72b25abd4e804596413558ef10f098c2262dd65d.
Addressed review comments

Updated with code optimization
Updated with dep files
Updated fix for sanity test
Updated with gomod changes

Change-Id: I899e855812a6eda812c64664a9154321cdb16876
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 7f2b9cf..dbf301f 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -35,6 +35,7 @@
 	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/pmmetrics"
 	rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
 	"github.com/opencord/voltha-protos/go/common"
 	ic "github.com/opencord/voltha-protos/go/inter_container"
@@ -76,6 +77,9 @@
 	discOnus      map[string]bool
 	onus          map[string]*OnuDevice
 	nniIntfID     int
+	portStats     *OpenOltStatisticsMgr
+	metrics       *pmmetrics.PmMetrics
+	stopCollector chan bool
 }
 
 //OnuDevice represents ONU related info
@@ -89,6 +93,17 @@
 	uniPorts      map[uint32]struct{}
 }
 
+var pmNames = []string{ // PM counter names handed to pmmetrics.Metrics when the device handler's metrics are created
+	"rx_bytes",
+	"rx_packets",
+	"rx_mcast_packets",
+	"rx_bcast_packets",
+	"tx_bytes",
+	"tx_packets",
+	"tx_mcast_packets",
+	"tx_bcast_packets",
+}
+
 //NewOnuDevice creates a new Onu Device
 func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string) *OnuDevice {
 	var device OnuDevice
@@ -122,7 +137,8 @@
 	// when the first IntfOperInd with status as "up" is received for
 	// any one of the available NNI port on the OLT device.
 	dh.nniIntfID = -1
-
+	dh.stopCollector = make(chan bool, 2)
+	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	//TODO initialize the support classes.
 	return &dh
 }
@@ -352,15 +368,14 @@
 		go dh.handlePacketIndication(pktInd)
 	case *oop.Indication_PortStats:
 		portStats := indication.GetPortStats()
-		log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
+		go dh.portStats.PortStatisticsIndication(portStats, dh.resourceMgr.DevInfo.GetPonPorts())
 	case *oop.Indication_FlowStats:
 		flowStats := indication.GetFlowStats()
 		log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
 	case *oop.Indication_AlarmInd:
 		alarmInd := indication.GetAlarmInd()
 		log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
-		dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
-
+		go dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
 	}
 }
 
@@ -515,6 +530,8 @@
 	/* TODO: Instantiate Alarm , stats , BW managers */
 	/* Instantiating Event Manager to handle Alarms and KPIs */
 	dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
+	// Stats config for new device
+	dh.portStats = NewOpenOltStatsMgr(dh)
 
 	// Start reading indications
 	go dh.readIndications()
@@ -566,11 +583,73 @@
 	return deviceInfo, nil
 }
 
+// startCollector periodically collects the NNI and PON port statistics of the
+// OLT and hands each sample to publishMetrics in its own goroutine. It runs
+// until a value is received on dh.stopCollector and is intended to be started
+// as a goroutine.
+func startCollector(dh *DeviceHandler) {
+	// Initial delay for OLT initialization. Waiting in a select instead of
+	// time.Sleep lets a stop request end the collector during the delay.
+	select {
+	case <-dh.stopCollector:
+		log.Debugw("Stopping-Collector-for-OLT", log.Fields{"deviceID:": dh.deviceID})
+		return
+	case <-time.After(1 * time.Minute):
+	}
+	log.Debugf("Starting-Collector")
+	for {
+		// Read the collection interval (in seconds) from the PM config on
+		// every cycle, then wait for it while staying responsive to a stop.
+		freq := dh.metrics.ToPmConfigs().DefaultFreq
+		select {
+		case <-dh.stopCollector:
+			log.Debugw("Stopping-Collector-for-OLT", log.Fields{"deviceID:": dh.deviceID})
+			return
+		case <-time.After(time.Duration(freq) * time.Second):
+		}
+		// NOTE(review): portStats is assigned elsewhere in the handler; skip the
+		// cycle while it is still nil instead of calling methods on a nil manager.
+		if dh.portStats == nil {
+			log.Debugw("Stats-manager-not-ready", log.Fields{"deviceID": dh.deviceID})
+			continue
+		}
+		// Build a fresh map per cycle: the publish goroutines started below hold
+		// on to it, so this loop must not write to it again afterwards.
+		pmContext := map[string]string{
+			"oltid":      dh.deviceID,
+			"devicetype": dh.deviceType,
+		}
+		// NNI Stats
+		cmnni := dh.portStats.collectNNIMetrics(uint32(0))
+		log.Debugf("Collect-NNI-Metrics %v", cmnni)
+		go dh.portStats.publishMetrics("NNIStats", cmnni, uint32(0), pmContext, dh.deviceID)
+		log.Debugf("Publish-NNI-Metrics")
+		// PON Stats
+		numPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
+		for i := uint32(0); i < numPonPorts; i++ {
+			cmpon := dh.portStats.collectPONMetrics(i)
+			log.Debugf("Collect-PON-Metrics %v", cmpon)
+			go dh.portStats.publishMetrics("PONStats", cmpon, i, pmContext, dh.deviceID)
+			log.Debugf("Publish-PON-Metrics")
+		}
+	}
+}
+
 //AdoptDevice adopts the OLT device
 func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
 	dh.transitionMap = NewTransitionMap(dh)
 	log.Infow("Adopt_device", log.Fields{"deviceID": device.Id, "Address": device.GetHostAndPort()})
 	dh.transitionMap.Handle(DeviceInit)
+
+	// Now, set the initial PM configuration for that device. The call site has
+	// no caller-supplied context, so context.TODO() is passed rather than nil.
+	if err := dh.coreProxy.DevicePMConfigUpdate(context.TODO(), dh.metrics.ToPmConfigs()); err != nil {
+		log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+	}
+
+	// Collect and publish port statistics in the background until a stop is
+	// signalled on dh.stopCollector.
+	go startCollector(dh)
 }
 
 //GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -893,7 +950,7 @@
 			"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
 		if err != nil {
 			log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
-				"From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id, "Error": err})
+				"From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
 			return
 		}
 	} else {
@@ -1199,6 +1256,8 @@
 		delete(dh.onus, onu)
 	}
 	log.Debug("Removed-device-from-Resource-manager-KV-store")
+	// Stop the Stats collector
+	dh.stopCollector <- true
 	//Reset the state
 	if _, err := dh.Client.Reboot(context.Background(), new(oop.Empty)); err != nil {
 		log.Errorw("Failed-to-reboot-olt ", log.Fields{"err": err})