[VOL-3318] : Framework for collecting Metrics from the ONU
- Currently implements collection of OpticalPower Metrics and UNI status metrics
- Supports changing Metric collection frequency
- Use voltha-lib-go version 4.0.6

Change-Id: I9bd1ec8d8af9d739db96ae0303b6702dd3ce8520
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index 0e0c1af..758bf9e 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -31,6 +31,7 @@
 	"github.com/looplab/fsm"
 	me "github.com/opencord/omci-lib-go/generated"
 	"github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
+	"github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v4/pkg/db"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
 	flow "github.com/opencord/voltha-lib-go/v4/pkg/flows"
@@ -124,6 +125,19 @@
 	drTechProfileConfigDeleteSuccess:   "tech-profile-config-delete-success",
 }
 
+// OmciOpticalMetricsNames maps each supported optical PM name to its PM type; both entries are gauges.
+var OmciOpticalMetricsNames = map[string]voltha.PmConfig_PmType{
+	"transmit_power": voltha.PmConfig_GAUGE,
+	"receive_power":  voltha.PmConfig_GAUGE,
+}
+
+// OmciUniMetricsNames maps each supported UNI status metric name to its PM type; all entries are gauges.
+var OmciUniMetricsNames = map[string]voltha.PmConfig_PmType{
+	"ethernet_type":   voltha.PmConfig_GAUGE,
+	"oper_status":     voltha.PmConfig_GAUGE,
+	"uni_admin_state": voltha.PmConfig_GAUGE,
+}
+
 //deviceHandler will interact with the ONU ? device.
 type deviceHandler struct {
 	deviceID         string
@@ -140,12 +154,15 @@
 	AdapterProxy adapterif.AdapterProxy
 	EventProxy   eventif.EventProxy
 
+	pmMetrics *common.PmMetrics
+
 	pOpenOnuAc      *OpenONUAC
 	pDeviceStateFsm *fsm.FSM
 	//pPonPort        *voltha.Port
 	deviceEntrySet  chan bool //channel for DeviceEntry set event
 	pOnuOmciDevice  *OnuDeviceEntry
 	pOnuTP          *onuUniTechProf
+	pOnuMetricsMgr  *onuMetricsManager
 	exitChannel     chan int
 	lockDevice      sync.RWMutex
 	pOnuIndication  *oop.OnuIndication
@@ -160,7 +177,6 @@
 	//discOnus sync.Map
 	//onus     sync.Map
 	//portStats          *OpenOltStatisticsMgr
-	//metrics            *pmmetrics.PmMetrics
 	stopCollector              chan bool
 	stopHeartbeatCheck         chan bool
 	uniEntityMap               map[uint32]*onuUniPort
@@ -194,7 +210,32 @@
 	dh.UniVlanConfigFsmMap = make(map[uint8]*UniVlanConfigFsm)
 	dh.reconciling = false
 	dh.ReadyForSpecificOmciConfig = false
+	metricNames := make([]string, 0, len(OmciOpticalMetricsNames)+len(OmciUniMetricsNames)) // zero length, preset capacity: the appends below must not leave leading "" entries
 
+	for k := range OmciOpticalMetricsNames {
+		metricNames = append(metricNames, k)
+	}
+
+	for k := range OmciUniMetricsNames {
+		metricNames = append(metricNames, k)
+	}
+
+	// The frequency is in seconds.
+	dh.pmMetrics = common.NewPmMetrics(cloned.Id, common.Frequency(150), common.FrequencyOverride(false), common.Grouped(false), common.Metrics(metricNames))
+	for pmName, pmType := range OmciOpticalMetricsNames { // NOTE(review): appends to the value returned by ToPmConfigs(); if that is a fresh copy per call the entry is dropped — confirm
+		dh.pmMetrics.ToPmConfigs().Metrics = append(dh.pmMetrics.ToPmConfigs().Metrics, &voltha.PmConfig{
+			Name:    pmName,
+			Type:    pmType,
+			Enabled: true,
+		})
+	}
+	for pmName, pmType := range OmciUniMetricsNames { // NOTE(review): same concern as the optical loop above — verify the GAUGE type actually reaches dh.pmMetrics
+		dh.pmMetrics.ToPmConfigs().Metrics = append(dh.pmMetrics.ToPmConfigs().Metrics, &voltha.PmConfig{
+			Name:    pmName,
+			Type:    pmType,
+			Enabled: true,
+		})
+	}
 	// Device related state machine
 	dh.pDeviceStateFsm = fsm.NewFSM(
 		devStNull,
@@ -248,6 +289,10 @@
 			logger.Errorw(ctx, "Device FSM: Can't go to state DeviceInit", log.Fields{"err": err})
 		}
 		logger.Debugw(ctx, "Device FSM: ", log.Fields{"state": string(dh.pDeviceStateFsm.Current())})
+		// Now, set the initial PM configuration for that device
+		if err := dh.coreProxy.DevicePMConfigUpdate(ctx, dh.pmMetrics.ToPmConfigs()); err != nil {
+			logger.Errorw(ctx, "error updating pm config to core", log.Fields{"device-id": dh.deviceID, "err": err})
+		}
 	} else {
 		logger.Debugw(ctx, "AdoptOrReconcileDevice: Agent/device init already done", log.Fields{"device-id": device.Id})
 	}
@@ -1015,6 +1060,7 @@
 		go dh.reconcileDeviceOnuInd(ctx)
 		// reconcilement will be continued after mib download is done
 	}
+
 	/*
 			############################################################################
 			# Setup Alarm handler
@@ -1058,6 +1104,7 @@
 		else:
 			self.logger.info('onu-already-activated')
 	*/
+
 	logger.Debug(ctx, "postInit-done")
 }
 
@@ -1187,11 +1234,12 @@
 
 //setOnuDeviceEntry sets the ONU device entry within the handler
 func (dh *deviceHandler) setOnuDeviceEntry(
-	apDeviceEntry *OnuDeviceEntry, apOnuTp *onuUniTechProf) {
+	apDeviceEntry *OnuDeviceEntry, apOnuTp *onuUniTechProf, apOnuMetricsMgr *onuMetricsManager) {
 	dh.lockDevice.Lock()
 	defer dh.lockDevice.Unlock()
 	dh.pOnuOmciDevice = apDeviceEntry
 	dh.pOnuTP = apOnuTp
+	dh.pOnuMetricsMgr = apOnuMetricsMgr
 }
 
 //addOnuDeviceEntry creates a new ONU device or returns the existing
@@ -1206,8 +1254,9 @@
 		/* and no alarm_db yet (oo.alarm_db)  */
 		deviceEntry = newOnuDeviceEntry(ctx, dh)
 		onuTechProfProc := newOnuUniTechProf(ctx, dh)
+		onuMetricsMgr := newonuMetricsManager(ctx, dh)
 		//error treatment possible //TODO!!!
-		dh.setOnuDeviceEntry(deviceEntry, onuTechProfProc)
+		dh.setOnuDeviceEntry(deviceEntry, onuTechProfProc, onuMetricsMgr)
 		// fire deviceEntry ready event to spread to possibly waiting processing
 		dh.deviceEntrySet <- true
 		logger.Debugw(ctx, "onuDeviceEntry-added", log.Fields{"device-id": dh.deviceID})
@@ -1398,6 +1447,10 @@
 		logger.Errorw(ctx, "MibSyncFsm invalid - cannot be executed!!", log.Fields{"device-id": dh.deviceID})
 		return fmt.Errorf("can't execute MibSync: %s", dh.deviceID)
 	}
+
+	// Start PM collector routine
+	go dh.startCollector(ctx)
+
 	return nil
 }
 
@@ -1516,7 +1569,9 @@
 			}
 		}
 	}
-	//TODO!!! care about PM/Alarm processing once started
+	// Stop collector routine for PM Counters
+	dh.stopCollector <- true // NOTE(review): this send blocks unless startCollector is receiving or the channel is buffered — confirm how stopCollector is created
+
 	return nil
 }
 
@@ -2492,3 +2547,44 @@
 	}
 	return 0, errors.New("error-fetching-uni-port")
 }
+
+// updatePmConfig applies a PM configuration update; for now only a changed DefaultFreq is acted upon.
+func (dh *deviceHandler) updatePmConfig(ctx context.Context, pmConfigs *voltha.PmConfigs) {
+	logger.Infow(ctx, "update-pm-config", log.Fields{"device-id": dh.device.Id, "pm-configs": pmConfigs})
+	// TODO: Currently support only updating the PM Sampling Frequency
+	requestedFreq := pmConfigs.DefaultFreq
+	if requestedFreq == dh.pmMetrics.ToPmConfigs().DefaultFreq {
+		logger.Debugw(ctx, "new-frequency-same-as-old--not-updating", log.Fields{"frequency": requestedFreq})
+		return
+	}
+	dh.pmMetrics.UpdateFrequency(requestedFreq)
+	logger.Debugw(ctx, "frequency-updated--new-frequency", log.Fields{"device-id": dh.deviceID, "frequency": dh.pmMetrics.ToPmConfigs().DefaultFreq})
+}
+
+// startCollector launches the OMCI response processing routine and then, once per collection interval, spawns the optical and UNI status metric collections; it returns when a value is received on stopCollector.
+func (dh *deviceHandler) startCollector(ctx context.Context) {
+	logger.Debugf(ctx, "startingCollector")
+	// Start routine to process OMCI GET Responses
+	go dh.pOnuMetricsMgr.processOmciMessages(ctx)
+	for {
+		// The interval is recomputed on every pass from whatever DefaultFreq (seconds) pmMetrics reports at that moment.
+		interval := time.Duration(dh.pmMetrics.ToPmConfigs().DefaultFreq) * time.Second
+		select {
+		case <-dh.stopCollector:
+			logger.Debugw(ctx, "stopping-collector-for-onu", log.Fields{"device-id": dh.device.Id})
+			dh.pOnuMetricsMgr.stopProcessingOmciResponses <- true // Stop the OMCI GET response processing routine
+			return
+		case <-time.After(interval):
+			go func() {
+				logger.Debug(ctx, "startCollector before collecting optical metrics")
+				opticalMetrics := dh.pOnuMetricsMgr.collectOpticalMetrics(ctx)
+				dh.pOnuMetricsMgr.publishMetrics(ctx, opticalMetrics)
+			}()
+			go func() {
+				logger.Debug(ctx, "startCollector before collecting uni metrics")
+				uniStatusMetrics := dh.pOnuMetricsMgr.collectUniStatusMetrics(ctx)
+				dh.pOnuMetricsMgr.publishMetrics(ctx, uniStatusMetrics)
+			}()
+		}
+	}
+}