VOL-716: Publish PM to Kafka and support of enable/disable

Change-Id: I8556d2891290276f779d164a082b311aa1f6cc29
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 569bb97..40f2678 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -72,6 +72,7 @@
         self._runner = TaskRunner(device_id)  # OMCI_CC Task runner
         self._deferred = None
         self._first_in_sync = False
+        self._first_capabilities = False
 
         # OMCI related databases are on a per-agent basis. State machines and tasks
         # are per ONU Vendor
@@ -125,6 +126,8 @@
             self._alarm_sync_sm,
         ]
         self._on_sync_state_machines = [        # Run after first in_sync event
+        ]
+        self._on_capabilities_state_machines = [  # Run after first capabilities events
             self._pm_intervals_sm
         ]
         self._custom_me_map = custom_me_map
@@ -183,6 +186,15 @@
         """
         return self._pm_intervals_sm
 
+    def set_pm_config(self, pm_config):
+        """
+        Set PM interval configuration
+
+        :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
+        :return:
+        """
+        self._pm_intervals_sm.set_pm_config(pm_config)
+
     @property
     def alarm_synchronizer(self):
         """
@@ -281,6 +293,7 @@
         self._started = True
         self._omci_cc.enabled = True
         self._first_in_sync = True
+        self._first_capabilities = True
         self._runner.start()
         self._configuration = OnuConfiguration(self._omci_agent, self._device_id)
 
@@ -353,6 +366,25 @@
             self._deferred = reactor.callLater(0, start_state_machines,
                                                self._on_sync_state_machines)
 
+    def first_in_capabilities_event(self):
+        """
+        This event is called on the first capabilities event after
+        OpenOMCI has been started. It is responsible for starting any
+        other state machine. These are often state machines that have tasks
+        that are dependent upon knowing if various MEs are supported
+        """
+        if self._first_capabilities:
+            self._first_capabilities = False
+
+            # Start up any other remaining OpenOMCI state machines
+            def start_state_machines(machines):
+                for sm in machines:
+                    self._state_machines.append(sm)
+                    reactor.callLater(0, sm.start)
+
+            self._deferred = reactor.callLater(0, start_state_machines,
+                                               self._on_capabilities_state_machines)
+
     def _publish_device_status_event(self):
         """
         Publish the ONU Device start/start status.
@@ -366,6 +398,9 @@
         """
         Publish the ONU Device start/start status.
         """
+        if self.first_in_capabilities_event:
+            self.first_in_capabilities_event()
+
         topic = OnuDeviceEntry.event_bus_topic(self.device_id,
                                                OnuDeviceEvents.OmciCapabilitiesEvent)
         msg = {
diff --git a/voltha/extensions/omci/state_machines/performance_intervals.py b/voltha/extensions/omci/state_machines/performance_intervals.py
index 7f81ea3..417a706 100644
--- a/voltha/extensions/omci/state_machines/performance_intervals.py
+++ b/voltha/extensions/omci/state_machines/performance_intervals.py
@@ -113,6 +113,7 @@
         self._agent = agent
         self._device_id = device_id
         self._device = None
+        self._pm_config = None
         self._timeout_delay = timeout_delay
         self._tick_delay = tick_delay
         self._interval_skew = interval_skew
@@ -161,6 +162,7 @@
                                transitions=transitions,
                                initial=initial_state,
                                queued=True,
+                               ignore_invalid_triggers=True,
                                name='{}-{}'.format(self.__class__.__name__,
                                                    device_id))
 
@@ -217,6 +219,15 @@
                                       'next': str(self._next_interval)
                                   })
 
+    def set_pm_config(self, pm_config):
+        """
+        Set PM interval configuration
+
+        :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
+        :return:
+        """
+        self._pm_config = pm_config
+
     def _me_is_supported(self, class_id):
         """
         Check to see if ONU supports this ME
@@ -395,8 +406,8 @@
         self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
 
-        def success(results):
-            self.log.debug('sync-time-success: {}'.format(results))
+        def success(_results):
+            self.log.debug('sync-time-success')
             self._current_task = None
             self._deferred = reactor.callLater(0, self.success)
             # Calculate next interval time
@@ -443,7 +454,7 @@
         mes, self._add_pm_me = self._add_pm_me, dict()
 
         def success(results):
-            self.log.debug('create-me-success: {}'.format(results))
+            self.log.debug('create-me-success', results=results)
 
             # Check if already here. The create request could have received
             # an already-exists status code which we consider successful
@@ -476,7 +487,7 @@
         mes, self._del_pm_me = self._del_pm_me, set()
 
         def success(results):
-            self.log.debug('delete-me-success: {}'.format(results))
+            self.log.debug('delete-me-success', results=results)
             self._current_task = None
             for me in mes:
                 self._pm_me_collect_retries.pop(me)
@@ -518,7 +529,9 @@
             # Collect the data ?
             if self._pm_me_collect_retries[key] > 0:
                 def success(results):
-                    self.log.info('collect-success', results=results)
+                    self.log.info('collect-success', results=results,
+                                  class_id=results.get('class_id'),
+                                  entity_id=results.get('entity_id'))
                     self._current_task = None
                     self._pm_me_collect_retries[key] = 0
                     self._deferred = reactor.callLater(0, self.success)
@@ -605,8 +618,11 @@
         :param results: (dict) PM results
         """
         self.log.debug('collect-publish', results=results)
-        pass  # TODO: Publish it here
-        pass  # TODO: Save off last time interval fetched to persistent storage
+
+        if self._pm_config is not None:
+            self._pm_config.publish_metrics(results)
+
+        pass  # TODO: Save off last time interval fetched to persistent storage?
 
     def on_mib_reset_response(self, _topic, msg):
         """
diff --git a/voltha/extensions/omci/tasks/interval_data_task.py b/voltha/extensions/omci/tasks/interval_data_task.py
index cb02187..04cc8eb 100644
--- a/voltha/extensions/omci/tasks/interval_data_task.py
+++ b/voltha/extensions/omci/tasks/interval_data_task.py
@@ -112,19 +112,19 @@
         attr_names = self._counter_attributes.keys()
 
         final_results = {
-            'class-id': self._class_id,
-            'entity-id': self._entity_id,
-            'me-name': self._entity.__name__,   # Mostly for debugging...
-            'interval-utc-time': None,
+            'class_id': self._class_id,
+            'entity_id': self._entity_id,
+            'me_name': self._entity.__name__,   # Mostly for debugging...
+            'interval_utc_time': None,
             # Counters added here as they are retrieved
         }
         last_end_time = None
 
         while len(attr_names) > 0:
             # Get as many attributes that will fit. Always include the 1 octet
-            # Interval End Time Attribute
+            # Interval End Time Attribute and 2 octets for the Entity ID
 
-            remaining_payload = self._max_payload - 1
+            remaining_payload = self._max_payload - 3
             attributes = list()
             for name in attr_names:
                 if self._counter_attributes[name] > remaining_payload:
@@ -145,6 +145,8 @@
                     attributes_mask=self._entity.mask_for(*attributes)
                 )
             )
+            self.log.debug('interval-get-request', class_id=self._class_id,
+                           entity_id=self._entity_id)
             try:
                 results = yield device.omci_cc.send(frame)
 
@@ -157,28 +159,27 @@
                                end_time=end_time)
 
                 if status != ReasonCodes.Success:
-                    raise IntervalDataTaskFailure('Unexpected Response Status: {}'.
-                                                  format(status))
-
+                    raise IntervalDataTaskFailure('Unexpected Response Status: {}, Class ID: {}'.
+                                                  format(status, self._class_id))
                 if last_end_time is None:
                     last_end_time = end_time
 
                 elif end_time != last_end_time:
                     msg = 'Interval End Time Changed during retrieval from {} to {}'\
                         .format(last_end_time, end_time)
-                    self.log.info('interval-roll-over', msg=msg)
+                    self.log.info('interval-roll-over', msg=msg, class_id=self._class_id)
                     raise IntervalDataTaskFailure(msg)
 
-                final_results['interval-utc-time'] = datetime.utcnow()
+                final_results['interval_utc_time'] = datetime.utcnow()
                 for attribute in attributes:
                     final_results[attribute] = omci_msg['data'].get(attribute)
 
             except TimeoutError as e:
-                self.log.warn('interval-get-timeout', e=e)
+                self.log.warn('interval-get-timeout', e=e, class_id=self._class_id)
                 self.deferred.errback(failure.Failure(e))
 
             except Exception as e:
-                self.log.exception('interval-get-failure', e=e)
+                self.log.exception('interval-get-failure', e=e, class_id=self._class_id)
                 self.deferred.errback(failure.Failure(e))
 
         # Successful if here
diff --git a/voltha/extensions/omci/tasks/omci_create_pm_task.py b/voltha/extensions/omci/tasks/omci_create_pm_task.py
index d2f41ec..e818d87 100644
--- a/voltha/extensions/omci/tasks/omci_create_pm_task.py
+++ b/voltha/extensions/omci/tasks/omci_create_pm_task.py
@@ -77,7 +77,6 @@
     @inlineCallbacks
     def perform_create(self):
         """ Perform the create requests """
-        self.log.info('perform-create')
 
         try:
             for pm, me in self._me_dict.items():
@@ -86,6 +85,7 @@
                 me_class_id = me[0]
                 me_entity_id = me[1]
                 upstream = me[2]
+                self.log.debug('create-pm-me', class_id=pm_class_id, entity_id=pm_entity_id)
 
                 if me_class_id == 0:
                     # Typical/common PM interval format
@@ -111,7 +111,6 @@
                         bitmap,        # Control fields bitmap
                         0,             # TCI
                         0              # Reserved
-
                     ]}
                     frame = OmciFrame(
                         transaction_id=None,  # OMCI-CC will set
diff --git a/voltha/extensions/pki/README.md b/voltha/extensions/pki/README.md
index 903c457..b6bce83 100644
--- a/voltha/extensions/pki/README.md
+++ b/voltha/extensions/pki/README.md
@@ -97,7 +97,7 @@
 | :-:      | :----- | :---- |
 | type     | string | "slice" or "ts". A "slice" is a set of path/metric data for the same time-stamp. A "ts" is a time-series: array of data for same metric |
 | ts       | float  | UTC time-stamp of data in slice mode (seconds since the epoch of January 1, 1970) |
-| prefixes | map    | One or more prefix_name - value pairs as described below |
+| prefixes | list   | One or more prefixes.  A prefix is a key-value pair described below |
 
 **NOTE**: The timestamp is currently retrieved as a whole value. It is also possible to easily get
 the floating timestamp which contains the fractional seconds since epoch. **Is this of use**?
@@ -208,20 +208,15 @@
 This initial code is only a preliminary sample. The following tasks need to be
 added to the VOLTHA JIRA or performed in the SEBA group:
     
-- Get a list from SEBA/VOLTHA on required metrics. 
-
-- Provide example JSON output and verify that it meets SEBA's requirements
+- Get a list from SEBA/VOLTHA on required metrics.
 
 - Get feedback from other OLT/ONU developers on any needed changes
 
-- Test PM group enable/disable
-
 - Allow PM groups to have different collection times
 
-- Solicit VOLTHA/SEBA if support for disabling of individual items in a PM group would be useful
-
 - Support calling a 'get-data' method before collect the metrics.  Currently metrics are collected
   in a device adapter independent way and the PM just updates what the attributes happen to have.
 
-- TODO: Probably a few more.  Look through code for more 'TODO' Notes
+- For statistics groups that have more than one instance, do we need to be able to
+  enable/disable specific instances? Major refactor of code if so (database work, ...)
 
diff --git a/voltha/extensions/pki/adapter_pm_metrics.py b/voltha/extensions/pki/adapter_pm_metrics.py
index e5b2c96..d9432a0 100644
--- a/voltha/extensions/pki/adapter_pm_metrics.py
+++ b/voltha/extensions/pki/adapter_pm_metrics.py
@@ -45,6 +45,7 @@
         self.freq_override = grouped and freq_override
         self.lc = None
         self.prefix = 'voltha.{}.{}'.format(self.name, self.device_id)
+        self.pm_group_metrics = dict()      # name -> PmGroupConfig
 
     def update(self, pm_config):
         # TODO: Move any common steps into base class
@@ -74,7 +75,7 @@
         if self.lc is not None and self.default_freq > 0:
             self.lc.stop()
 
-    def collect_metrics(self, group, names, config):
+    def collect_group_metrics(self, group, names, config):
         """
         Collect the metrics for a specific PM group.
 
@@ -95,14 +96,35 @@
         for (metric, t) in names:
             if config[metric].enabled and hasattr(group, metric):
                 metrics[metric] = getattr(group, metric)
+
         return metrics
 
-    def collect_group_metrics(self, metrics=None):
+    def collect_metrics(self, metrics=None):
+        """
+        Collect metrics for this adapter.
+
+        This method is called for each adapter at a fixed frequency. The adapter type
+        (OLT, ONU, ..) should provide a derived class where this method iterates
+        through all metrics and collects them up in a dictionary with the group/metric
+        name as the key, and the metric values as the contents.
+
+        For a group, the values are a map where   metric_name -> metric_value
+        For and individual metric, the values are the metric value
+
+        TODO: Currently all group metrics are collected on a single timer tick. This needs to be fixed.
+
+        :param metrics: (dict) Existing map to add collected metrics to.  This is
+                               provided to allow derived classes to call into further
+                               encapsulated classes
+
+        :return: (dict) metrics - see description above
+        """
         raise NotImplementedError('Your derived class should override this method')
 
     def collect_and_publish_metrics(self):
+        """ Request collection of all enabled metrics and publish them """
         try:
-            metrics = self.collect_group_metrics()
+            metrics = self.collect_metrics()
             self.publish_metrics(metrics)
 
         except Exception as e:
@@ -112,24 +134,27 @@
         """
         Publish the metrics during a collection
 
-        :param metrics: (dict) Metrics to publish
+        :param metrics: (dict) Metrics to publish. If empty, no metrics will be published
         """
-        import arrow
-        from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
+        self.log.debug('publish-metrics', metrics=metrics)
 
-        try:
-            ts = arrow.utcnow().timestamp
-            kpi_event = KpiEvent(
-                type=KpiEventType.slice,
-                ts=ts,
-                prefixes={
-                    self.prefix + '.{}'.format(k): MetricValuePairs(metrics=metrics[k])
-                    for k in metrics.keys()}
-            )
-            self.adapter_agent.submit_kpis(kpi_event)
+        if len(metrics):
+            import arrow
+            from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
 
-        except Exception as e:
-            self.log.exception('failed-to-submit-kpis', e=e)
+            try:
+                ts = arrow.utcnow().timestamp
+                kpi_event = KpiEvent(
+                    type=KpiEventType.slice,
+                    ts=ts,
+                    prefixes={
+                        self.prefix + '.{}'.format(k): MetricValuePairs(metrics=metrics[k])
+                        for k in metrics.keys()}
+                )
+                self.adapter_agent.submit_kpis(kpi_event)
+
+            except Exception as e:
+                self.log.exception('failed-to-submit-kpis', e=e)
 
     # TODO: Need to support on-demand counter update if provided by the PM 'group'.
     #       Currently we expect PM data to be periodically polled by a separate
diff --git a/voltha/extensions/pki/olt/olt_pm_metrics.py b/voltha/extensions/pki/olt/olt_pm_metrics.py
index 04e403f..932aa7d 100644
--- a/voltha/extensions/pki/olt/olt_pm_metrics.py
+++ b/voltha/extensions/pki/olt/olt_pm_metrics.py
@@ -123,29 +123,29 @@
         self._pon_ports = kwargs.pop('pon-ports', None)
 
     def update(self, pm_config):
-        # TODO: Test both 'group' and 'non-group' functionality
-        # TODO: Test frequency override capability for a particular group
-        if self.default_freq != pm_config.default_freq:
-            # Update the callback to the new frequency.
-            self.default_freq = pm_config.default_freq
-            self.lc.stop()
-            self.lc.start(interval=self.default_freq / 10)
+        try:
+            # TODO: Test frequency override capability for a particular group
+            if self.default_freq != pm_config.default_freq:
+                # Update the callback to the new frequency.
+                self.default_freq = pm_config.default_freq
+                self.lc.stop()
+                self.lc.start(interval=self.default_freq / 10)
 
-        if pm_config.grouped:
-            for m in pm_config.groups:
-                # TODO: Need to support individual group enable/disable
-                pass
-                # self.pm_group_metrics[m.group_name].config.enabled = m.enabled
-                # if m.enabled is True:
-                #     self.enable_pm_collection(m.group_name, remote)
-                # else:
-                #     self.disable_pm_collection(m.group_name, remote)
-        else:
-            for m in pm_config.metrics:
-                self.nni_metrics_config[m.name].enabled = m.enabled
-                self.pon_metrics_config[m.name].enabled = m.enabled
-                self.onu_metrics_config[m.name].enabled = m.enabled
-                self.gem_metrics_config[m.name].enabled = m.enabled
+            if pm_config.grouped:
+                for group in pm_config.groups:
+                    group_config = self.pm_group_metrics.get(group.group_name)
+                    if group_config is not None:
+                        group_config.enabled = group.enabled
+            else:
+                for m in pm_config.metrics:
+                    self.nni_metrics_config[m.name].enabled = m.enabled
+                    self.pon_metrics_config[m.name].enabled = m.enabled
+                    self.onu_metrics_config[m.name].enabled = m.enabled
+                    self.gem_metrics_config[m.name].enabled = m.enabled
+
+        except Exception as e:
+            self.log.exception('update-failure', e=e)
+            raise
 
     def make_proto(self, pm_config=None):
         if pm_config is None:
@@ -161,6 +161,8 @@
                 pm_ether_stats = PmGroupConfig(group_name='Ethernet',
                                                group_freq=self.default_freq,
                                                enabled=True)
+                self.pm_group_metrics[pm_ether_stats.group_name] = pm_ether_stats
+
             else:
                 pm_ether_stats = None
 
@@ -169,22 +171,26 @@
                                              group_freq=self.default_freq,
                                              enabled=True)
 
-                pm_ont_stats = PmGroupConfig(group_name='ONT',
+                pm_onu_stats = PmGroupConfig(group_name='ONU',
                                              group_freq=self.default_freq,
                                              enabled=True)
 
                 pm_gem_stats = PmGroupConfig(group_name='GEM',
                                              group_freq=self.default_freq,
                                              enabled=True)
+
+                self.pm_group_metrics[pm_pon_stats.group_name] = pm_pon_stats
+                self.pm_group_metrics[pm_onu_stats.group_name] = pm_onu_stats
+                self.pm_group_metrics[pm_gem_stats.group_name] = pm_gem_stats
             else:
                 pm_pon_stats = None
-                pm_ont_stats = None
+                pm_onu_stats = None
                 pm_gem_stats = None
 
         else:
             pm_ether_stats = pm_config if have_nni else None
             pm_pon_stats = pm_config if have_pon else None
-            pm_ont_stats = pm_config if have_pon else None
+            pm_onu_stats = pm_config if have_pon else None
             pm_gem_stats = pm_config if have_pon else None
 
         if have_nni:
@@ -214,7 +220,7 @@
                     if pm.name in metrics:
                         continue
                     metrics.add(pm.name)
-                pm_ont_stats.metrics.extend([PmConfig(name=pm.name,
+                pm_onu_stats.metrics.extend([PmConfig(name=pm.name,
                                                       type=pm.type,
                                                       enabled=pm.enabled)])
 
@@ -228,57 +234,44 @@
                                                       type=pm.type,
                                                       enabled=pm.enabled)])
         if self.grouped:
-            pm_groups = [stats for stats in (pm_ether_stats,
-                                             pm_pon_stats,
-                                             pm_ont_stats,
-                                             pm_gem_stats) if stats is not None]
-            pm_config.groups.extend(pm_groups)
+            pm_config.groups.extend([stats for stats in
+                                     self.pm_group_metrics.itervalues()])
 
         return pm_config
 
-    def collect_group_metrics(self, metrics=None):
+    def collect_metrics(self, metrics=None):
         # TODO: Currently PM collection is done for all metrics/groups on a single timer
         if metrics is None:
             metrics = dict()
 
-        for port in self._nni_ports:
-            metrics['nni.{}'.format(port.port_no)] = self.collect_nni_metrics(port)
-
+        if self.pm_group_metrics['Ethernet'].enabled:
+            for port in self._nni_ports:
+                name = 'nni.{}'.format(port.port_no)
+                metrics[name] = self.collect_group_metrics(port,
+                                                           self.nni_pm_names,
+                                                           self.nni_metrics_config)
         for port in self._pon_ports:
-            metrics['pon.{}'.format(port.pon_id)] = self.collect_pon_metrics(port)
-
+            if self.pm_group_metrics['PON'].enabled:
+                name = 'pon.{}'.format(port.pon_id)
+                metrics[name] = self.collect_group_metrics(port,
+                                                           self.pon_pm_names,
+                                                           self.pon_metrics_config)
             for onu_id in port.onu_ids:
                 onu = port.onu(onu_id)
                 if onu is not None:
-                    metrics['pon.{}.onu.{}'.format(port.pon_id, onu.onu_id)] = \
-                        self.collect_onu_metrics(onu)
-                    for gem in onu.gem_ports:
-                        if gem.multicast:
-                            continue
-
-                        metrics['pon.{}.onu.{}.gem.{}'.format(port.pon_id,
-                                                              onu.onu_id,
-                                                              gem.gem_id)] = \
-                            self.collect_gem_metrics(gem)
-            # TODO: Do any multicast GEM PORT metrics here...
+                    if self.pm_group_metrics['ONU'].enabled:
+                        name = 'pon.{}.onu.{}'.format(port.pon_id, onu.onu_id)
+                        metrics[name] = self.collect_group_metrics(onu,
+                                                                   self.onu_pm_names,
+                                                                   self.onu_metrics_config)
+                    if self.pm_group_metrics['GEM'].enabled:
+                        for gem in onu.gem_ports:
+                            if not gem.multicast:
+                                name = 'pon.{}.onu.{}.gem.{}'.format(port.pon_id,
+                                                                     onu.onu_id,
+                                                                     gem.gem_id)
+                                metrics[name] = self.collect_group_metrics(onu,
+                                                                           self.gem_pm_names,
+                                                                           self.gem_metrics_config)
+                            # TODO: Do any multicast GEM PORT metrics here...
         return metrics
-
-    def collect_nni_metrics(self, nni_port):
-        stats = {metric: getattr(nni_port, metric) for (metric, t) in self.nni_pm_names}
-        return {metric: value for metric, value in stats.iteritems()
-                if self.nni_metrics_config[metric].enabled}
-
-    def collect_pon_metrics(self, pon_port):
-        stats = {metric: getattr(pon_port, metric) for (metric, t) in self.pon_pm_names}
-        return {metric: value for metric, value in stats.iteritems()
-                if self.pon_metrics_config[metric].enabled}
-
-    def collect_onu_metrics(self, onu):
-        stats = {metric: getattr(onu, metric) for (metric, t) in self.onu_pm_names}
-        return {metric: value for metric, value in stats.iteritems()
-                if self.onu_metrics_config[metric].enabled}
-
-    def collect_gem_metrics(self, gem):
-        stats = {metric: getattr(gem, metric) for (metric, t) in self.gem_pm_names}
-        return {metric: value for metric, value in stats.iteritems()
-                if self.gem_metrics_config[metric].enabled}
diff --git a/voltha/extensions/pki/onu/IntervalMetrics.md b/voltha/extensions/pki/onu/IntervalMetrics.md
index 1f5bc53..cdf4e43 100644
--- a/voltha/extensions/pki/onu/IntervalMetrics.md
+++ b/voltha/extensions/pki/onu/IntervalMetrics.md
@@ -262,3 +262,7 @@
 | key_report_message_count        | This attribute counts key_report PLOAM messages transmitted. |
 | acknowledge_message_count       | This attribute counts acknowledge PLOAM messages transmitted. It includes all forms of acknowledgement, including those transmitted in response to a PLOAM grant when the ONU has nothing to send. |
 | sleep_request_message_count     | This attribute counts sleep_request PLOAM messages transmitted. |
+
+# Remaining Work Items
+
+- The enable/disable of a PM group (CLI/NBI) should control whether or not a PM interval is collected
\ No newline at end of file
diff --git a/voltha/extensions/pki/onu/onu_omci_pm.py b/voltha/extensions/pki/onu/onu_omci_pm.py
index 90a68a0..a1a3687 100644
--- a/voltha/extensions/pki/onu/onu_omci_pm.py
+++ b/voltha/extensions/pki/onu/onu_omci_pm.py
@@ -18,6 +18,12 @@
 
 
 class OnuOmciPmMetrics(AdapterPmMetrics):
+    """ ONU OMCI related metrics """
+
+    # Metric default settings
+    DEFAULT_OMCI_CC_ENABLED = False
+    DEFAULT_OMCI_CC_FREQUENCY = 1200        # 1/10ths of a second
+
     def __init__(self, adapter_agent, device_id,
                  grouped=False, freq_override=False, **kwargs):
         """
@@ -41,9 +47,7 @@
                                                **kwargs)
         self._omci_cc = kwargs.pop('omci-cc', None)
 
-        # PM Config Types are COUNTER, GUAGE, and STATE  # Note: GAUGE is misspelled in device.proto
         self.omci_pm_names = {
-            ('enabled', PmConfig.STATE),
             ('tx_frames', PmConfig.COUNTER),
             ('tx_errors', PmConfig.COUNTER),
             ('rx_frames', PmConfig.COUNTER),
@@ -65,7 +69,6 @@
         self.openomci_interval_pm = OnuPmIntervalMetrics(adapter_agent, device_id)
 
     def update(self, pm_config):
-        # TODO: Test both 'group' and 'non-group' functionality
         # TODO: Test frequency override capability for a particular group
         if self.default_freq != pm_config.default_freq:
             # Update the callback to the new frequency.
@@ -74,14 +77,10 @@
             self.lc.start(interval=self.default_freq / 10)
 
         if pm_config.grouped:
-            for m in pm_config.groups:
-                # TODO: Need to support individual group enable/disable
-                pass
-                # self.pm_group_metrics[m.group_name].config.enabled = m.enabled
-                # if m.enabled is True:,
-                #     self.enable_pm_collection(m.group_name, remote)
-                # else:
-                #     self.disable_pm_collection(m.group_name, remote)
+            for group in pm_config.groups:
+                group_config = self.pm_group_metrics.get(group.group_name)
+                if group_config is not None:
+                    group_config.enabled = group.enabled
         else:
             for m in pm_config.metrics:
                 self.omci_metrics_config[m.name].enabled = m.enabled
@@ -93,9 +92,10 @@
 
         if self._omci_cc is not None:
             if self.grouped:
-                pm_omci_stats = PmGroupConfig(group_name='OMCI',
-                                              group_freq=self.default_freq,
-                                              enabled=True)
+                pm_omci_stats = PmGroupConfig(group_name='OMCI-CC',
+                                              group_freq=OnuOmciPmMetrics.DEFAULT_OMCI_CC_FREQUENCY,
+                                              enabled=OnuOmciPmMetrics.DEFAULT_OMCI_CC_ENABLED)
+                self.pm_group_metrics[pm_omci_stats.group_name] = pm_omci_stats
             else:
                 pm_omci_stats = pm_config
 
@@ -115,7 +115,26 @@
 
         return self.openomci_interval_pm.make_proto(pm_config)
 
-    def collect_device_metrics(self, metrics=None):
+    def collect_metrics(self, metrics=None):
+        """
+        Collect metrics for this adapter.
+
+        This method is callfor each adapter at a fixed frequency. The adapter type
+        (OLT, ONU, ..) should provide a derived class where this method iterates
+        through all metrics and collects them up in a dictionary with the group/metric
+        name as the key, and the metric values as the contents.
+
+        For a group, the values are a map where   metric_name -> metric_value
+        For and individual metric, the values are the metric value
+
+        TODO: Currently all group metrics are collected on a single timer tick. This needs to be fixed.
+
+        :param metrics: (dict) Existing map to add collected metrics to.  This is
+                               provided to allow derived classes to call into further
+                               encapsulated classes
+
+        :return: (dict) metrics - see description above
+        """
         # TODO: Currently PM collection is done for all metrics/groups on a single timer
         if metrics is None:
             metrics = dict()
@@ -123,7 +142,8 @@
         # Note: Interval PM is collection done autonomously, not through this method
 
         if self._omci_cc is not None:
-            metrics['omci-cc'] = self.collect_metrics(self._omci_cc,
-                                                      self.omci_pm_names,
-                                                      self.omci_metrics_config)
+            if self.pm_group_metrics['OMCI-CC'].enabled:
+                metrics['OMCI-CC'] = self.collect_group_metrics(self._omci_cc,
+                                                                self.omci_pm_names,
+                                                                self.omci_metrics_config)
         return metrics
diff --git a/voltha/extensions/pki/onu/onu_pm_interval_metrics.py b/voltha/extensions/pki/onu/onu_pm_interval_metrics.py
index c34c6e3..1f52e56 100644
--- a/voltha/extensions/pki/onu/onu_pm_interval_metrics.py
+++ b/voltha/extensions/pki/onu/onu_pm_interval_metrics.py
@@ -33,12 +33,24 @@
     result of receipt of OMCI get responses on various PM History MEs.  They are
     also always managed as a group with a fixed frequency of 15 minutes.
     """
+    ME_ID_INFO = {
+        EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id: 'Ethernet Bridge Port History',
+        EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id: 'Ethernet Bridge Port History',
+        EthernetFrameExtendedPerformanceMonitoring.class_id: 'Ethernet Bridge Port History',
+        EthernetFrameExtendedPerformanceMonitoring64Bit.class_id: 'Ethernet Bridge Port History',
+        EthernetPMMonitoringHistoryData.class_id: 'Ethernet UNI History',
+        FecPerformanceMonitoringHistoryData.class_id: 'FEC History',
+        GemPortNetworkCtpMonitoringHistoryData.class_id: 'GEM Port History',
+        XgPonTcPerformanceMonitoringHistoryData.class_id: 'xgPON TC History',
+        XgPonDownstreamPerformanceMonitoringHistoryData.class_id: 'xgPON Downstream History',
+        XgPonUpstreamPerformanceMonitoringHistoryData.class_id: 'xgPON Upstream History'
+    }
+
     def __init__(self, adapter_agent, device_id, **kwargs):
         super(OnuPmIntervalMetrics, self).__init__(adapter_agent, device_id,
                                                    grouped=True, freq_override=False,
                                                    **kwargs)
         ethernet_bridge_history = {
-            ('enabled', PmConfig.STATE),
             ('class_id', PmConfig.GUAGE),
             ('entity_id', PmConfig.GUAGE),
             ("interval_end_time", PmConfig.GUAGE),
@@ -61,7 +73,6 @@
                                                 for (m, t) in ethernet_bridge_history}
 
         ethernet_uni_history = {   # Ethernet History Data (Class ID 24)
-            ('enabled', PmConfig.STATE),
             ('class_id', PmConfig.GUAGE),
             ('entity_id', PmConfig.GUAGE),
             ("interval_end_time", PmConfig.GUAGE),
@@ -84,7 +95,6 @@
                                              for (m, t) in ethernet_uni_history}
 
         fec_history = {   # FEC History Data (Class ID 312)
-            ('enabled', PmConfig.STATE),
             ('class_id', PmConfig.GUAGE),
             ('entity_id', PmConfig.GUAGE),
             ("interval_end_time", PmConfig.GUAGE),
@@ -98,7 +108,6 @@
                                     for (m, t) in fec_history}
 
         gem_port_history = {  # GEM Port Network CTP History Data (Class ID 341)
-            ('enabled', PmConfig.STATE),
             ('class_id', PmConfig.GUAGE),
             ('entity_id', PmConfig.GUAGE),
             ("interval_end_time", PmConfig.GUAGE),
@@ -112,7 +121,6 @@
                                          for (m, t) in gem_port_history}
 
         xgpon_tc_history = {  # XgPon TC History Data (Class ID 344)
-            ('enabled', PmConfig.STATE),
             ('class_id', PmConfig.GUAGE),
             ('entity_id', PmConfig.GUAGE),
             ("interval_end_time", PmConfig.GUAGE),
@@ -129,7 +137,6 @@
                                          for (m, t) in xgpon_tc_history}
 
         xgpon_downstream_history = {  # XgPon Downstream History Data (Class ID 345)
-            ('enabled', PmConfig.STATE),
             ('class_id', PmConfig.GUAGE),
             ('entity_id', PmConfig.GUAGE),
             ("interval_end_time", PmConfig.GUAGE),
@@ -152,7 +159,6 @@
                                                  for (m, t) in xgpon_downstream_history}
 
         xgpon_upstream_history = {  # XgPon Upstream History Data (Class ID 346)
-            ('enabled', PmConfig.STATE),
             ('class_id', PmConfig.GUAGE),
             ('entity_id', PmConfig.GUAGE),
             ("interval_end_time", PmConfig.GUAGE),
@@ -187,14 +193,18 @@
 
         :param pm_config:
         """
-        for m in pm_config.groups:
-            # TODO: Need to support individual group enable/disable
-            pass
-            # self.pm_group_metrics[m.group_name].config.enabled = m.enabled
-            # if m.enabled is True:,
-            #     self.enable_pm_collection(m.group_name, remote)
-            # else:
-            #     self.disable_pm_collection(m.group_name, remote)
+        self.log.debug('update')
+
+        try:
+            for group in pm_config.groups:
+                group_config = self.pm_group_metrics.get(group.group_name)
+                if group_config is not None and group_config.enabled != group.enabled:
+                    group_config.enabled = group.enabled
+                    # TODO: For OMCI PM Metrics, tie this into add/remove of the PM Interval ME itself
+
+        except Exception as e:
+            self.log.exception('update-failure', e=e)
+            raise
 
     def make_proto(self, pm_config=None):
         """
@@ -210,9 +220,10 @@
         """
         assert pm_config is not None
 
-        pm_ethernet_bridge_history = PmGroupConfig(group_name='Ethernet Bridge Port History',
+        pm_ethernet_bridge_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id],
                                                    group_freq=0,
                                                    enabled=True)
+        self.pm_group_metrics[pm_ethernet_bridge_history.group_name] = pm_ethernet_bridge_history
 
         for m in sorted(self._ethernet_bridge_history_config):
             pm = self._ethernet_bridge_history_config[m]
@@ -220,9 +231,10 @@
                                                                 type=pm.type,
                                                                 enabled=pm.enabled)])
 
-        pm_ethernet_uni_history = PmGroupConfig(group_name='Ethernet UNI History',
+        pm_ethernet_uni_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[EthernetPMMonitoringHistoryData.class_id],
                                                 group_freq=0,
                                                 enabled=True)
+        self.pm_group_metrics[pm_ethernet_uni_history.group_name] = pm_ethernet_uni_history
 
         for m in sorted(self._ethernet_uni_history_config):
             pm = self._ethernet_uni_history_config[m]
@@ -230,9 +242,10 @@
                                                              type=pm.type,
                                                              enabled=pm.enabled)])
 
-        pm_fec_history = PmGroupConfig(group_name='Upstream Ethernet History',
+        pm_fec_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[FecPerformanceMonitoringHistoryData.class_id],
                                        group_freq=0,
                                        enabled=True)
+        self.pm_group_metrics[pm_fec_history.group_name] = pm_fec_history
 
         for m in sorted(self._fec_history_config):
             pm = self._fec_history_config[m]
@@ -240,9 +253,10 @@
                                                     type=pm.type,
                                                     enabled=pm.enabled)])
 
-        pm_gem_port_history = PmGroupConfig(group_name='GEM Port History',
+        pm_gem_port_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[GemPortNetworkCtpMonitoringHistoryData.class_id],
                                             group_freq=0,
                                             enabled=True)
+        self.pm_group_metrics[pm_gem_port_history.group_name] = pm_gem_port_history
 
         for m in sorted(self._gem_port_history_config):
             pm = self._gem_port_history_config[m]
@@ -250,9 +264,10 @@
                                                          type=pm.type,
                                                          enabled=pm.enabled)])
 
-        pm_xgpon_tc_history = PmGroupConfig(group_name='xgPON TC History',
+        pm_xgpon_tc_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[XgPonTcPerformanceMonitoringHistoryData.class_id],
                                             group_freq=0,
                                             enabled=True)
+        self.pm_group_metrics[pm_xgpon_tc_history.group_name] = pm_xgpon_tc_history
 
         for m in sorted(self._xgpon_tc_history_config):
             pm = self._xgpon_tc_history_config[m]
@@ -260,9 +275,10 @@
                                                          type=pm.type,
                                                          enabled=pm.enabled)])
 
-        pm_xgpon_downstream_history = PmGroupConfig(group_name='xgPON Downstream History',
+        pm_xgpon_downstream_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[XgPonDownstreamPerformanceMonitoringHistoryData.class_id],
                                                     group_freq=0,
                                                     enabled=True)
+        self.pm_group_metrics[pm_xgpon_downstream_history.group_name] = pm_xgpon_downstream_history
 
         for m in sorted(self._xgpon_downstream_history_config):
             pm = self._xgpon_downstream_history_config[m]
@@ -270,9 +286,10 @@
                                                                  type=pm.type,
                                                                  enabled=pm.enabled)])
 
-        pm_xgpon_upstream_history = PmGroupConfig(group_name='xgPON Upstream History',
+        pm_xgpon_upstream_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[XgPonUpstreamPerformanceMonitoringHistoryData.class_id],
                                                   group_freq=0,
                                                   enabled=True)
+        self.pm_group_metrics[pm_xgpon_upstream_history.group_name] = pm_xgpon_upstream_history
 
         for m in sorted(self._xgpon_upstream_history_config):
             pm = self._xgpon_upstream_history_config[m]
@@ -280,14 +297,8 @@
                                                                type=pm.type,
                                                                enabled=pm.enabled)])
 
-        pm_config.groups.extend([pm_ethernet_bridge_history,
-                                 pm_ethernet_uni_history,
-                                 pm_fec_history,
-                                 pm_gem_port_history,
-                                 pm_xgpon_tc_history,
-                                 pm_xgpon_downstream_history,
-                                 pm_xgpon_upstream_history
-                                 ])
+        pm_config.groups.extend([stats for stats in self.pm_group_metrics.itervalues()])
+
         return pm_config
 
     def publish_metrics(self, interval_data):
@@ -305,14 +316,18 @@
 
         :return: (dict) Key/Value of metric data
         """
+        self.log.debug('publish-metrics', metrics=interval_data)
+
         try:
             import arrow
             from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
             # Locate config
 
-            config = self._configs.get(interval_data['class_id'])
+            class_id = interval_data['class_id']
+            config = self._configs.get(class_id)
+            group = self.pm_group_metrics.get(OnuPmIntervalMetrics.ME_ID_INFO.get(class_id, ''))
 
-            if config is not None and config['enabled'].enabled:
+            if config is not None and group is not None and group.enabled:
                 # Extract only the metrics we need to publish
                 config_keys = config.keys()
                 metrics = {
diff --git a/voltha/extensions/pki/onu/onu_pm_metrics.py b/voltha/extensions/pki/onu/onu_pm_metrics.py
index f1f08f1..9cede63 100644
--- a/voltha/extensions/pki/onu/onu_pm_metrics.py
+++ b/voltha/extensions/pki/onu/onu_pm_metrics.py
@@ -21,10 +21,15 @@
     """
     Shared ONU Device Adapter PM Metrics Manager
 
-    This class specifically addresses ONU genernal PM (health, ...) area
+    This class specifically addresses ONU general PM (health, ...) area
     specific PM (OMCI, PON, UNI) is supported in encapsulated classes accessible
     from this object
     """
+
+    # Metric default settings
+    DEFAULT_HEARTBEAT_ENABLED = False
+    DEFAULT_HEARTBEAT_FREQUENCY = 1200  # 1/10ths of a second
+
     def __init__(self, adapter_agent, device_id, grouped=False, freq_override=False, **kwargs):
         """
         Initializer for shared ONU Device Adapter PM metrics
@@ -52,7 +57,6 @@
         #       there is not yet a common 'heartbeat' object
         #
         self.health_pm_names = {
-            ('enabled', PmConfig.STATE),
             ('alarm_active', PmConfig.STATE),
             ('heartbeat_count', PmConfig.COUNTER),
             ('heartbeat_miss', PmConfig.COUNTER),
@@ -70,26 +74,26 @@
                                         freq_override=freq_override, **kwargs)
 
     def update(self, pm_config):
-        # TODO: Test both 'group' and 'non-group' functionality
-        # TODO: Test frequency override capability for a particular group
-        if self.default_freq != pm_config.default_freq:
-            # Update the callback to the new frequency.
-            self.default_freq = pm_config.default_freq
-            self.lc.stop()
-            self.lc.start(interval=self.default_freq / 10)
+        try:
+            # TODO: Test frequency override capability for a particular group
+            if self.default_freq != pm_config.default_freq:
+                # Update the callback to the new frequency.
+                self.default_freq = pm_config.default_freq
+                self.lc.stop()
+                self.lc.start(interval=self.default_freq / 10)
 
-        if pm_config.grouped:
-            for m in pm_config.groups:
-                # TODO: Need to support individual group enable/disable
-                pass
-                # self.pm_group_metrics[m.group_name].config.enabled = m.enabled
-                # if m.enabled is True:,
-                #     self.enable_pm_collection(m.group_name, remote)
-                # else:
-                #     self.disable_pm_collection(m.group_name, remote)
-        else:
-            for m in pm_config.metrics:
-                self.health_metrics_config[m.name].enabled = m.enabled
+            if pm_config.grouped:
+                for group in pm_config.groups:
+                    group_config = self.pm_group_metrics.get(group.group_name)
+                    if group_config is not None:
+                        group_config.enabled = group.enabled
+            else:
+                for m in pm_config.metrics:
+                    self.health_metrics_config[m.name].enabled = m.enabled
+
+        except Exception as e:
+            self.log.exception('update-failure', e=e)
+            raise
 
         self.omci_pm.update(pm_config)
 
@@ -104,8 +108,9 @@
         if self._heartbeat is not None:
             if self.grouped:
                 pm_health_stats = PmGroupConfig(group_name='Heartbeat',
-                                                group_freq=self.default_freq,
-                                                enabled=True)
+                                                group_freq=OnuPmMetrics.DEFAULT_HEARTBEAT_FREQUENCY,
+                                                enabled=OnuPmMetrics.DEFAULT_HEARTBEAT_ENABLED)
+                self.pm_group_metrics[pm_health_stats.group_name] = pm_health_stats
             else:
                 pm_health_stats = pm_config
 
@@ -128,7 +133,12 @@
         pm_config = self.omci_pm.make_proto(pm_config)
         return pm_config
 
-    def collect_group_metrics(self, metrics=None):
+    def collect_metrics(self, metrics=None):
+        """
+        Collect metrics
+        :param metrics:
+        :return:
+        """
         # TODO: Currently PM collection is done for all metrics/groups on a single timer
         if metrics is None:
             metrics = dict()
@@ -138,7 +148,7 @@
         #     metrics['heartbeat'] = self.collect_metrics(self._heartbeat,
         #                                                 self.health_pm_names,
         #                                                 self.health_metrics_config)
-        self.omci_pm.collect_device_metrics(metrics=metrics)
+        self.omci_pm.collect_metrics(metrics=metrics)
         # TODO Add PON Port PM
         # TODO Add UNI Port PM
         return metrics