VOL-1099: Event Bus topic for advertising OpenOMCI
state machine change events. Had to touch up the
review due to max commit message line set to
only 50 characters.
Fix indention issue in proto file

Change-Id: I09f472bd8b4bbf276fd31dd1b823ef81d8960394
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 88d75d4..af60d15 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -79,19 +79,25 @@
             # MIB Synchronization state machine
             self._mib_db_in_sync = False
             mib_synchronizer_info = support_classes.get('mib-synchronizer')
+            advertise = mib_synchronizer_info['advertise-events']
             self._mib_sync_sm = mib_synchronizer_info['state-machine'](self._omci_agent,
                                                                        device_id,
                                                                        mib_synchronizer_info['tasks'],
-                                                                       mib_db)
+                                                                       mib_db,
+                                                                       advertise_events=advertise)
             # ONU OMCI Capabilities state machine
             capabilities_info = support_classes.get('omci-capabilities')
+            advertise = capabilities_info['advertise-events']
             self._capabilities_sm = capabilities_info['state-machine'](self._omci_agent,
                                                                        device_id,
-                                                                       capabilities_info['tasks'])
+                                                                       capabilities_info['tasks'],
+                                                                       advertise_events=advertise)
             # ONU Performance Monitoring Intervals state machine
             interval_info = support_classes.get('performance-intervals')
+            advertise = interval_info['advertise-events']
             self._pm_intervals_sm = interval_info['state-machine'](self._omci_agent, device_id,
-                                                                   interval_info['tasks'])
+                                                                   interval_info['tasks'],
+                                                                   advertise_events=advertise)
         except Exception as e:
             self.log.exception('state-machine-create-failed', e=e)
             raise
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index 0e21b29..57cf496 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -34,6 +34,7 @@
         'state-machine': MibSynchronizer,  # Implements the MIB synchronization state machine
         # 'database': MibDbVolatileDict,     # Implements volatile ME MIB database
         'database': MibDbExternal,         # Implements persistent ME MIB database
+        'advertise-events': True,          # Advertise events on OpenOMCI event bus
         'tasks': {
             'mib-upload': MibUploadTask,
             'get-mds': GetMdsTask,
@@ -44,12 +45,14 @@
     },
     'omci-capabilities': {
         'state-machine': OnuOmciCapabilities,   # Implements OMCI capabilities state machine
+        'advertise-events': False,              # Advertise events on OpenOMCI event bus
         'tasks': {
             'get-capabilities': OnuCapabilitiesTask  # Get supported ME and Commands
         }
     },
     'performance-intervals': {
         'state-machine': PerformanceIntervals,  # Implements PM Intervals State machine
+        'advertise-events': False,              # Advertise events on OpenOMCI event bus
         'tasks': {
             'sync-time': SyncTimeTask,
             'collect-data': IntervalDataTask,
@@ -85,7 +88,8 @@
         self.log = structlog.get_logger()
         self._core = core
         self._started = False
-        self._devices = dict()        # device-id -> DeviceEntry
+        self._devices = dict()       # device-id -> DeviceEntry
+        self._event_bus = None
 
         # OMCI related databases are on a per-agent basis. State machines and tasks
         # are per ONU Vendore
@@ -145,6 +149,7 @@
 
         self.log.debug('stop')
         self._started = False
+        self._event_bus = None
 
         # ONUs OMCI shutdown
         for device in self._devices.itervalues():
@@ -153,6 +158,27 @@
         # DB shutdown
         self._mib_db.stop()
 
+    def mk_event_bus(self):
+        """ Get the event bus for OpenOMCI"""
+        if self._event_bus is None:
+            from voltha.extensions.omci.openomci_event_bus import OpenOmciEventBus
+            self._event_bus = OpenOmciEventBus()
+
+        return self._event_bus
+
+    def advertise(self, event_type, data):
+        """
+        Advertise an OpenOMCU event on the kafka bus
+        :param event_type: (int) Event Type (enumberation from OpenOMCI protobuf definitions)
+        :param data: (Message, dict, ...) Associated data (will be convert to a string)
+        """
+        if self._started:
+            try:
+                self.mk_event_bus().advertise(event_type, data)
+
+            except Exception as e:
+                self.log.exception('advertise-failure', e=e)
+
     def add_device(self, device_id, adapter_agent, custom_me_map=None,
                    support_classes=OpenOmciAgentDefaults):
         """
diff --git a/voltha/extensions/omci/openomci_event_bus.py b/voltha/extensions/omci/openomci_event_bus.py
new file mode 100644
index 0000000..c432032
--- /dev/null
+++ b/voltha/extensions/omci/openomci_event_bus.py
@@ -0,0 +1,50 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+from common.event_bus import EventBusClient
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEvent
+from common.utils.json_format import MessageToDict
+
+
+class OpenOmciEventBus(object):
+    """ Event bus for publishing OpenOMCI related events. """
+    __slots__ = (
+        '_event_bus_client',  # The event bus client used to publish events.
+        '_topic'              # the topic to publish to
+    )
+
+    def __init__(self):
+        self._event_bus_client = EventBusClient()
+        self._topic = 'openomci-events'
+
+    def message_to_dict(m):
+        return MessageToDict(m, True, True, False)
+
+    def advertise(self, event_type, data):
+        if isinstance(data, Message):
+            msg = dumps(MessageToDict(data, True, True))
+        elif isinstance(data, dict):
+            msg = dumps(data)
+        else:
+            msg = str(data)
+
+        event = OpenOmciEvent(
+            type=event_type,
+            data=msg
+        )
+        self._event_bus_client.publish(self._topic, event)
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index 854daf2..58ec377 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -26,6 +26,7 @@
     SUPPORTED_MESSAGE_ENTITY_KEY, SUPPORTED_MESSAGE_TYPES_KEY
 from voltha.extensions.omci.omci_entities import OntData
 from common.event_bus import EventBusClient
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
 
 RxEvent = OmciCCRxEvents
 DevEvent = OnuDeviceEvents
@@ -73,7 +74,9 @@
     DEFAULT_AUDIT_DELAY = 15       # Periodic tick to audit the MIB Data Sync
     DEFAULT_RESYNC_DELAY = 300     # Periodically force a resync
 
-    def __init__(self, agent, device_id, mib_sync_tasks, db, states=DEFAULT_STATES,
+    def __init__(self, agent, device_id, mib_sync_tasks, db,
+                 advertise_events=False,
+                 states=DEFAULT_STATES,
                  transitions=DEFAULT_TRANSITIONS,
                  initial_state='disabled',
                  timeout_delay=DEFAULT_TIMEOUT_RETRY,
@@ -85,6 +88,7 @@
         :param agent: (OpenOmciAgent) Agent
         :param device_id: (str) ONU Device ID
         :param db: (MibDbVolatileDict) MIB Database
+        :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
         :param mib_sync_tasks: (dict) Tasks to run
         :param states: (list) List of valid states
         :param transitions: (dict) Dictionary of triggers and state changes
@@ -111,9 +115,10 @@
         self._get_mds_task = mib_sync_tasks['get-mds']
         self._audit_task = mib_sync_tasks['mib-audit']
         self._resync_task = mib_sync_tasks['mib-resync']
+        self._advertise_events = advertise_events
 
         self._deferred = None
-        self._current_task = None   # TODO: Support multiple running tasks after v.1.3.0 release
+        self._current_task = None  # TODO: Support multiple running tasks after v.2.0 release
         self._task_deferred = None
         self._mib_data_sync = 0
         self._last_mib_db_sync_value = None
@@ -219,11 +224,31 @@
         """
         return self.last_mib_db_sync is None
 
+    @property
+    def advertise_events(self):
+        return self._advertise_events
+
+    @advertise_events.setter
+    def advertise_events(self, value):
+        if not isinstance(value, bool):
+            raise TypeError('Advertise event is a boolean')
+        self._advertise_events = value
+
+    def advertise(self, event, info):
+        """Advertise an event on the OpenOMCI event bus"""
+        if self._advertise_events:
+            self._agent.advertise(event,
+                                  {
+                                      'state-machine': self.machine.name,
+                                      'info': info,
+                                      'time': str(datetime.utcnow())
+                                  })
+
     def on_enter_disabled(self):
         """
         State machine is being stopped
         """
-        self.log.debug('state-transition')
+        self.advertise(OpenOmciEventType.state_change, self.state)
 
         self._cancel_deferred()
         if self._device is not None:
@@ -271,7 +296,7 @@
         Determine ONU status and start MIB Synchronization tasks
         """
         self._device = self._agent.get_device(self._device_id)
-        self.log.debug('state-transition', new_onu=self.is_new_onu)
+        self.advertise(OpenOmciEventType.state_change, self.state)
 
         # Make sure root of external MIB Database exists
         self._seed_database()
@@ -313,8 +338,10 @@
         """
         Begin full MIB data sync, starting with a MIB RESET
         """
+        self.advertise(OpenOmciEventType.state_change, self.state)
+
         def success(results):
-            self.log.debug('mib-upload-success: {}'.format(results))
+            self.log.debug('mib-upload-success', results=results)
             self._current_task = None
             self._deferred = reactor.callLater(0, self.success)
 
@@ -334,10 +361,12 @@
         Create a simple task to fetch the MIB Data Sync value and
         determine if the ONU value matches what is in the MIB database
         """
+        self.advertise(OpenOmciEventType.state_change, self.state)
+
         self._mib_data_sync = self._database.get_mib_data_sync(self._device_id) or 0
 
         def success(onu_mds_value):
-            self.log.debug('examine-mds-success: {}'.format(onu_mds_value))
+            self.log.debug('examine-mds-success', mds_value=onu_mds_value)
             self._current_task = None
 
             # Examine MDS value
@@ -361,7 +390,7 @@
         """
         Schedule a tick to occur to in the future to request an audit
         """
-        self.log.debug('state-transition', audit_delay=self._audit_delay)
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self.last_mib_db_sync = datetime.utcnow()
         self._device.mib_db_in_sync = True
 
@@ -383,7 +412,7 @@
 
         Schedule a tick to occur to in the future to request an audit
         """
-        self.log.debug('state-transition', audit_delay=self._audit_delay)
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._device.mib_db_in_sync = False
 
         if all(diff is None for diff in [self._on_olt_only_diffs,
@@ -441,13 +470,13 @@
         next_resync = self.last_mib_db_sync + timedelta(seconds=self._resync_delay)\
             if self.last_mib_db_sync is not None else datetime.utcnow()
 
-        self.log.debug('state-transition', next_resync=next_resync)
+        self.advertise(OpenOmciEventType.state_change, self.state)
 
         if datetime.utcnow() >= next_resync:
             self._deferred = reactor.callLater(0, self.force_resync)
         else:
             def success(onu_mds_value):
-                self.log.debug('get-mds-success: {}'.format(onu_mds_value))
+                self.log.debug('get-mds-success', mds_value=onu_mds_value)
                 self._current_task = None
 
                 # Examine MDS value
@@ -472,8 +501,10 @@
 
         First calculate any differences
         """
+        self.advertise(OpenOmciEventType.state_change, self.state)
+
         def success(results):
-            self.log.debug('resync-success: {}'.format(results))
+            self.log.debug('resync-success', results=results)
 
             on_olt_only = results.get('on-olt-only')
             on_onu_only = results.get('on-onu-only')
diff --git a/voltha/extensions/omci/state_machines/omci_onu_capabilities.py b/voltha/extensions/omci/state_machines/omci_onu_capabilities.py
index 45145c7..c13739e 100644
--- a/voltha/extensions/omci/state_machines/omci_onu_capabilities.py
+++ b/voltha/extensions/omci/state_machines/omci_onu_capabilities.py
@@ -17,6 +17,7 @@
 from transitions import Machine
 from twisted.internet import reactor
 from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry, OnuDeviceEvents, IN_SYNC_KEY
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
 
 
 class OnuOmciCapabilities(object):
@@ -40,6 +41,7 @@
     DEFAULT_RETRY = 10      # Seconds to delay after task failure/timeout/poll
 
     def __init__(self, agent, device_id, tasks,
+                 advertise_events=False,
                  states=DEFAULT_STATES,
                  transitions=DEFAULT_TRANSITIONS,
                  initial_state='disabled',
@@ -50,6 +52,7 @@
         :param agent: (OpenOmciAgent) Agent
         :param device_id: (str) ONU Device ID
         :param tasks: (dict) Tasks to run
+        :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
         :param states: (list) List of valid states
         :param transitions: (dict) Dictionary of triggers and state changes
         :param initial_state: (str) Initial state machine state
@@ -63,6 +66,7 @@
         self._timeout_delay = timeout_delay
 
         self._get_capabilities_task = tasks['get-capabilities']
+        self._advertise_events = advertise_events
 
         self._deferred = None
         self._current_task = None
@@ -136,11 +140,33 @@
         """
         return self._supported_msg_types if len(self._supported_msg_types) else None
 
+    @property
+    def advertise_events(self):
+        return self._advertise_events
+
+    @advertise_events.setter
+    def advertise_events(self, value):
+        if not isinstance(value, bool):
+            raise TypeError('Advertise event is a boolean')
+        self._advertise_events = value
+
+    def advertise(self, event, info):
+        """Advertise an event on the OpenOMCI event bus"""
+        from datetime import datetime
+
+        if self._advertise_events:
+            self._agent.advertise(event,
+                                  {
+                                      'state-machine': self.machine.name,
+                                      'info': info,
+                                      'time': str(datetime.utcnow())
+                                  })
+
     def on_enter_disabled(self):
         """
         State machine is being stopped
         """
-        self.log.debug('state-transition')
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
         self._cancel_tasks()
 
@@ -158,7 +184,7 @@
         State machine has just started or the MIB database has transitioned
         to an out-of-synchronization state
         """
-        self.log.debug('state-transition')
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
         self._device = self._agent.get_device(self._device_id)
 
@@ -190,11 +216,11 @@
         """
         State machine has just transitioned to an in-synchronization state
         """
-        self.log.debug('state-transition')
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
 
         def success(results):
-            self.log.debug('capabilities-success: {}'.format(results))
+            self.log.debug('capabilities-success', results=results)
             self._supported_entities = self._current_task.supported_managed_entities
             self._supported_msg_types = self._current_task.supported_message_types
             self._current_task = None
@@ -217,7 +243,7 @@
         Notify any subscribers for a capabilities event and wait until
         stopped or ONU MIB database goes out of sync
         """
-        self.log.debug('state-transition')
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
         self._device.publish_omci_capabilities_event()
 
diff --git a/voltha/extensions/omci/state_machines/performance_intervals.py b/voltha/extensions/omci/state_machines/performance_intervals.py
index b3c656c..7f81ea3 100644
--- a/voltha/extensions/omci/state_machines/performance_intervals.py
+++ b/voltha/extensions/omci/state_machines/performance_intervals.py
@@ -19,6 +19,7 @@
 from random import uniform, shuffle
 from twisted.internet import reactor
 from common.utils.indexpool import IndexPool
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
 from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
 from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
     RX_RESPONSE_KEY
@@ -83,6 +84,7 @@
     DEFAULT_COLLECT_ATTEMPTS = 3     # Maximum number of collection fetch attempts
 
     def __init__(self, agent, device_id, tasks,
+                 advertise_events=False,
                  states=DEFAULT_STATES,
                  transitions=DEFAULT_TRANSITIONS,
                  initial_state='disabled',
@@ -96,6 +98,7 @@
         :param agent: (OpenOmciAgent) Agent
         :param device_id: (str) ONU Device ID
         :param tasks: (dict) Tasks to run
+        :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
         :param states: (list) List of valid states
         :param transitions: (dict) Dictionary of triggers and state changes
         :param initial_state: (str) Initial state machine state
@@ -119,6 +122,7 @@
         self._get_interval_task = tasks['collect-data']
         self._create_pm_task = tasks['create-pm']
         self._delete_pm_task = tasks['delete-pm']
+        self._advertise_events = advertise_events
 
         self._omci_cc_subscriptions = {               # RxEvent.enum -> Subscription Object
             RxEvent.MIB_Reset: None,
@@ -192,10 +196,30 @@
     def device_id(self):
         return self._device_id
 
+    @property
+    def advertise_events(self):
+        return self._advertise_events
+
+    @advertise_events.setter
+    def advertise_events(self, value):
+        if not isinstance(value, bool):
+            raise TypeError('Advertise event is a boolean')
+        self._advertise_events = value
+
+    def advertise(self, event, info):
+        """Advertise an event on the OpenOMCI event bus"""
+        if self._advertise_events:
+            self._agent.advertise(event,
+                                  {
+                                      'state-machine': self.machine.name,
+                                      'info': info,
+                                      'time': str(datetime.utcnow()),
+                                      'next': str(self._next_interval)
+                                  })
+
     def _me_is_supported(self, class_id):
         """
         Check to see if ONU supports this ME
-
         :param class_id: (int) ME Class ID
         :return: (bool) If ME is supported
         """
@@ -212,7 +236,7 @@
         case already in the Idle state.
 
         :param pm_class_id: (int) ME Class ID (1..0xFFFE)
-        :param pm_entity_id: (int) Instancec ID (1..0xFFFE)
+        :param pm_entity_id: (int) Instance ID (1..0xFFFE)
         :param cid: (int) Class ID of entity monitored, may be None
         :param eid: (int) Instance ID of entity monitored, may be None
         :param upstream: (bool): Flag indicating if PM is for upstream traffic
@@ -274,6 +298,7 @@
         """
         State machine is being stopped
         """
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
         self._cancel_tasks()
         self._next_interval = None
@@ -301,6 +326,8 @@
 
     def on_enter_starting(self):
         """ Add the PON/ANI and UNI PM intervals"""
+        self.advertise(OpenOmciEventType.state_change, self.state)
+
         self._device = self._agent.get_device(self._device_id)
         self._cancel_deferred()
 
@@ -365,6 +392,7 @@
         """
         State machine has just transitioned to the synchronize_time state
         """
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
 
         def success(results):
@@ -391,6 +419,7 @@
         In this state, any added PM MEs that need to be created will be.
         TODO: some non-interval PM stats (if there are any) are collected here
         """
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
 
         if len(self._del_pm_me) and self._delete_me_deferred is None:
@@ -408,6 +437,7 @@
         """
         State machine has just transitioned to the create_pm_me state
         """
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
         self._cancel_tasks()
         mes, self._add_pm_me = self._add_pm_me, dict()
@@ -439,6 +469,7 @@
         """
         State machine has just transitioned to the delete_pm_me state
         """
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
         self._cancel_tasks()
 
@@ -474,6 +505,7 @@
             reactor.callLater(0, self.success)
             return
 
+        self.advertise(OpenOmciEventType.state_change, self.state)
         self._cancel_deferred()
         self._cancel_tasks()
         keys = self._pm_me_collect_retries.keys()
diff --git a/voltha/protos/omci_mib_db.proto b/voltha/protos/omci_mib_db.proto
index 89d2e65..914e480 100644
--- a/voltha/protos/omci_mib_db.proto
+++ b/voltha/protos/omci_mib_db.proto
@@ -66,3 +66,14 @@
     repeated MessageType message_types = 8;
 }
 
+message OpenOmciEventType {
+    enum OpenOmciEventType {
+        state_change = 0; // A state machine has transitioned to a new state
+    }
+}
+
+message OpenOmciEvent {
+    OpenOmciEventType.OpenOmciEventType type = 1;
+
+    string data = 2; // associated data, in json format
+}
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index 845472c..e7b0340 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -86,4 +86,7 @@
             'kpis':
                 kafka_topic: 'voltha.kpis'
                 filters:     [null]
+            'openomci-events':
+                kafka_topic: 'voltha.events'
+                filters:     [null]