VOL-1099: Event Bus topic for advertising OpenOMCI
state machine change events. Had to touch up the
review due to max commit message line set to
only 50 characters.
Fix indention issue in proto file
Change-Id: I09f472bd8b4bbf276fd31dd1b823ef81d8960394
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 88d75d4..af60d15 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -79,19 +79,25 @@
# MIB Synchronization state machine
self._mib_db_in_sync = False
mib_synchronizer_info = support_classes.get('mib-synchronizer')
+ advertise = mib_synchronizer_info['advertise-events']
self._mib_sync_sm = mib_synchronizer_info['state-machine'](self._omci_agent,
device_id,
mib_synchronizer_info['tasks'],
- mib_db)
+ mib_db,
+ advertise_events=advertise)
# ONU OMCI Capabilities state machine
capabilities_info = support_classes.get('omci-capabilities')
+ advertise = capabilities_info['advertise-events']
self._capabilities_sm = capabilities_info['state-machine'](self._omci_agent,
device_id,
- capabilities_info['tasks'])
+ capabilities_info['tasks'],
+ advertise_events=advertise)
# ONU Performance Monitoring Intervals state machine
interval_info = support_classes.get('performance-intervals')
+ advertise = interval_info['advertise-events']
self._pm_intervals_sm = interval_info['state-machine'](self._omci_agent, device_id,
- interval_info['tasks'])
+ interval_info['tasks'],
+ advertise_events=advertise)
except Exception as e:
self.log.exception('state-machine-create-failed', e=e)
raise
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index 0e21b29..57cf496 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -34,6 +34,7 @@
'state-machine': MibSynchronizer, # Implements the MIB synchronization state machine
# 'database': MibDbVolatileDict, # Implements volatile ME MIB database
'database': MibDbExternal, # Implements persistent ME MIB database
+ 'advertise-events': True, # Advertise events on OpenOMCI event bus
'tasks': {
'mib-upload': MibUploadTask,
'get-mds': GetMdsTask,
@@ -44,12 +45,14 @@
},
'omci-capabilities': {
'state-machine': OnuOmciCapabilities, # Implements OMCI capabilities state machine
+ 'advertise-events': False, # Advertise events on OpenOMCI event bus
'tasks': {
'get-capabilities': OnuCapabilitiesTask # Get supported ME and Commands
}
},
'performance-intervals': {
'state-machine': PerformanceIntervals, # Implements PM Intervals State machine
+ 'advertise-events': False, # Advertise events on OpenOMCI event bus
'tasks': {
'sync-time': SyncTimeTask,
'collect-data': IntervalDataTask,
@@ -85,7 +88,8 @@
self.log = structlog.get_logger()
self._core = core
self._started = False
- self._devices = dict() # device-id -> DeviceEntry
+ self._devices = dict() # device-id -> DeviceEntry
+ self._event_bus = None
# OMCI related databases are on a per-agent basis. State machines and tasks
# are per ONU Vendore
@@ -145,6 +149,7 @@
self.log.debug('stop')
self._started = False
+ self._event_bus = None
# ONUs OMCI shutdown
for device in self._devices.itervalues():
@@ -153,6 +158,27 @@
# DB shutdown
self._mib_db.stop()
+ def mk_event_bus(self):
+ """ Get the event bus for OpenOMCI"""
+ if self._event_bus is None:
+ from voltha.extensions.omci.openomci_event_bus import OpenOmciEventBus
+ self._event_bus = OpenOmciEventBus()
+
+ return self._event_bus
+
+ def advertise(self, event_type, data):
+ """
+ Advertise an OpenOMCU event on the kafka bus
+ :param event_type: (int) Event Type (enumberation from OpenOMCI protobuf definitions)
+ :param data: (Message, dict, ...) Associated data (will be convert to a string)
+ """
+ if self._started:
+ try:
+ self.mk_event_bus().advertise(event_type, data)
+
+ except Exception as e:
+ self.log.exception('advertise-failure', e=e)
+
def add_device(self, device_id, adapter_agent, custom_me_map=None,
support_classes=OpenOmciAgentDefaults):
"""
diff --git a/voltha/extensions/omci/openomci_event_bus.py b/voltha/extensions/omci/openomci_event_bus.py
new file mode 100644
index 0000000..c432032
--- /dev/null
+++ b/voltha/extensions/omci/openomci_event_bus.py
@@ -0,0 +1,50 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+from common.event_bus import EventBusClient
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEvent
+from common.utils.json_format import MessageToDict
+
+
+class OpenOmciEventBus(object):
+ """ Event bus for publishing OpenOMCI related events. """
+ __slots__ = (
+ '_event_bus_client', # The event bus client used to publish events.
+ '_topic' # the topic to publish to
+ )
+
+ def __init__(self):
+ self._event_bus_client = EventBusClient()
+ self._topic = 'openomci-events'
+
+ def message_to_dict(m):
+ return MessageToDict(m, True, True, False)
+
+ def advertise(self, event_type, data):
+ if isinstance(data, Message):
+ msg = dumps(MessageToDict(data, True, True))
+ elif isinstance(data, dict):
+ msg = dumps(data)
+ else:
+ msg = str(data)
+
+ event = OpenOmciEvent(
+ type=event_type,
+ data=msg
+ )
+ self._event_bus_client.publish(self._topic, event)
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index 854daf2..58ec377 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -26,6 +26,7 @@
SUPPORTED_MESSAGE_ENTITY_KEY, SUPPORTED_MESSAGE_TYPES_KEY
from voltha.extensions.omci.omci_entities import OntData
from common.event_bus import EventBusClient
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
RxEvent = OmciCCRxEvents
DevEvent = OnuDeviceEvents
@@ -73,7 +74,9 @@
DEFAULT_AUDIT_DELAY = 15 # Periodic tick to audit the MIB Data Sync
DEFAULT_RESYNC_DELAY = 300 # Periodically force a resync
- def __init__(self, agent, device_id, mib_sync_tasks, db, states=DEFAULT_STATES,
+ def __init__(self, agent, device_id, mib_sync_tasks, db,
+ advertise_events=False,
+ states=DEFAULT_STATES,
transitions=DEFAULT_TRANSITIONS,
initial_state='disabled',
timeout_delay=DEFAULT_TIMEOUT_RETRY,
@@ -85,6 +88,7 @@
:param agent: (OpenOmciAgent) Agent
:param device_id: (str) ONU Device ID
:param db: (MibDbVolatileDict) MIB Database
+ :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
:param mib_sync_tasks: (dict) Tasks to run
:param states: (list) List of valid states
:param transitions: (dict) Dictionary of triggers and state changes
@@ -111,9 +115,10 @@
self._get_mds_task = mib_sync_tasks['get-mds']
self._audit_task = mib_sync_tasks['mib-audit']
self._resync_task = mib_sync_tasks['mib-resync']
+ self._advertise_events = advertise_events
self._deferred = None
- self._current_task = None # TODO: Support multiple running tasks after v.1.3.0 release
+ self._current_task = None # TODO: Support multiple running tasks after v.2.0 release
self._task_deferred = None
self._mib_data_sync = 0
self._last_mib_db_sync_value = None
@@ -219,11 +224,31 @@
"""
return self.last_mib_db_sync is None
+ @property
+ def advertise_events(self):
+ return self._advertise_events
+
+ @advertise_events.setter
+ def advertise_events(self, value):
+ if not isinstance(value, bool):
+ raise TypeError('Advertise event is a boolean')
+ self._advertise_events = value
+
+ def advertise(self, event, info):
+ """Advertise an event on the OpenOMCI event bus"""
+ if self._advertise_events:
+ self._agent.advertise(event,
+ {
+ 'state-machine': self.machine.name,
+ 'info': info,
+ 'time': str(datetime.utcnow())
+ })
+
def on_enter_disabled(self):
"""
State machine is being stopped
"""
- self.log.debug('state-transition')
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
if self._device is not None:
@@ -271,7 +296,7 @@
Determine ONU status and start MIB Synchronization tasks
"""
self._device = self._agent.get_device(self._device_id)
- self.log.debug('state-transition', new_onu=self.is_new_onu)
+ self.advertise(OpenOmciEventType.state_change, self.state)
# Make sure root of external MIB Database exists
self._seed_database()
@@ -313,8 +338,10 @@
"""
Begin full MIB data sync, starting with a MIB RESET
"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
+
def success(results):
- self.log.debug('mib-upload-success: {}'.format(results))
+ self.log.debug('mib-upload-success', results=results)
self._current_task = None
self._deferred = reactor.callLater(0, self.success)
@@ -334,10 +361,12 @@
Create a simple task to fetch the MIB Data Sync value and
determine if the ONU value matches what is in the MIB database
"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
+
self._mib_data_sync = self._database.get_mib_data_sync(self._device_id) or 0
def success(onu_mds_value):
- self.log.debug('examine-mds-success: {}'.format(onu_mds_value))
+ self.log.debug('examine-mds-success', mds_value=onu_mds_value)
self._current_task = None
# Examine MDS value
@@ -361,7 +390,7 @@
"""
Schedule a tick to occur to in the future to request an audit
"""
- self.log.debug('state-transition', audit_delay=self._audit_delay)
+ self.advertise(OpenOmciEventType.state_change, self.state)
self.last_mib_db_sync = datetime.utcnow()
self._device.mib_db_in_sync = True
@@ -383,7 +412,7 @@
Schedule a tick to occur to in the future to request an audit
"""
- self.log.debug('state-transition', audit_delay=self._audit_delay)
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._device.mib_db_in_sync = False
if all(diff is None for diff in [self._on_olt_only_diffs,
@@ -441,13 +470,13 @@
next_resync = self.last_mib_db_sync + timedelta(seconds=self._resync_delay)\
if self.last_mib_db_sync is not None else datetime.utcnow()
- self.log.debug('state-transition', next_resync=next_resync)
+ self.advertise(OpenOmciEventType.state_change, self.state)
if datetime.utcnow() >= next_resync:
self._deferred = reactor.callLater(0, self.force_resync)
else:
def success(onu_mds_value):
- self.log.debug('get-mds-success: {}'.format(onu_mds_value))
+ self.log.debug('get-mds-success', mds_value=onu_mds_value)
self._current_task = None
# Examine MDS value
@@ -472,8 +501,10 @@
First calculate any differences
"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
+
def success(results):
- self.log.debug('resync-success: {}'.format(results))
+ self.log.debug('resync-success', results=results)
on_olt_only = results.get('on-olt-only')
on_onu_only = results.get('on-onu-only')
diff --git a/voltha/extensions/omci/state_machines/omci_onu_capabilities.py b/voltha/extensions/omci/state_machines/omci_onu_capabilities.py
index 45145c7..c13739e 100644
--- a/voltha/extensions/omci/state_machines/omci_onu_capabilities.py
+++ b/voltha/extensions/omci/state_machines/omci_onu_capabilities.py
@@ -17,6 +17,7 @@
from transitions import Machine
from twisted.internet import reactor
from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry, OnuDeviceEvents, IN_SYNC_KEY
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
class OnuOmciCapabilities(object):
@@ -40,6 +41,7 @@
DEFAULT_RETRY = 10 # Seconds to delay after task failure/timeout/poll
def __init__(self, agent, device_id, tasks,
+ advertise_events=False,
states=DEFAULT_STATES,
transitions=DEFAULT_TRANSITIONS,
initial_state='disabled',
@@ -50,6 +52,7 @@
:param agent: (OpenOmciAgent) Agent
:param device_id: (str) ONU Device ID
:param tasks: (dict) Tasks to run
+ :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
:param states: (list) List of valid states
:param transitions: (dict) Dictionary of triggers and state changes
:param initial_state: (str) Initial state machine state
@@ -63,6 +66,7 @@
self._timeout_delay = timeout_delay
self._get_capabilities_task = tasks['get-capabilities']
+ self._advertise_events = advertise_events
self._deferred = None
self._current_task = None
@@ -136,11 +140,33 @@
"""
return self._supported_msg_types if len(self._supported_msg_types) else None
+ @property
+ def advertise_events(self):
+ return self._advertise_events
+
+ @advertise_events.setter
+ def advertise_events(self, value):
+ if not isinstance(value, bool):
+ raise TypeError('Advertise event is a boolean')
+ self._advertise_events = value
+
+ def advertise(self, event, info):
+ """Advertise an event on the OpenOMCI event bus"""
+ from datetime import datetime
+
+ if self._advertise_events:
+ self._agent.advertise(event,
+ {
+ 'state-machine': self.machine.name,
+ 'info': info,
+ 'time': str(datetime.utcnow())
+ })
+
def on_enter_disabled(self):
"""
State machine is being stopped
"""
- self.log.debug('state-transition')
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._cancel_tasks()
@@ -158,7 +184,7 @@
State machine has just started or the MIB database has transitioned
to an out-of-synchronization state
"""
- self.log.debug('state-transition')
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._device = self._agent.get_device(self._device_id)
@@ -190,11 +216,11 @@
"""
State machine has just transitioned to an in-synchronization state
"""
- self.log.debug('state-transition')
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
def success(results):
- self.log.debug('capabilities-success: {}'.format(results))
+ self.log.debug('capabilities-success', results=results)
self._supported_entities = self._current_task.supported_managed_entities
self._supported_msg_types = self._current_task.supported_message_types
self._current_task = None
@@ -217,7 +243,7 @@
Notify any subscribers for a capabilities event and wait until
stopped or ONU MIB database goes out of sync
"""
- self.log.debug('state-transition')
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._device.publish_omci_capabilities_event()
diff --git a/voltha/extensions/omci/state_machines/performance_intervals.py b/voltha/extensions/omci/state_machines/performance_intervals.py
index b3c656c..7f81ea3 100644
--- a/voltha/extensions/omci/state_machines/performance_intervals.py
+++ b/voltha/extensions/omci/state_machines/performance_intervals.py
@@ -19,6 +19,7 @@
from random import uniform, shuffle
from twisted.internet import reactor
from common.utils.indexpool import IndexPool
+from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
RX_RESPONSE_KEY
@@ -83,6 +84,7 @@
DEFAULT_COLLECT_ATTEMPTS = 3 # Maximum number of collection fetch attempts
def __init__(self, agent, device_id, tasks,
+ advertise_events=False,
states=DEFAULT_STATES,
transitions=DEFAULT_TRANSITIONS,
initial_state='disabled',
@@ -96,6 +98,7 @@
:param agent: (OpenOmciAgent) Agent
:param device_id: (str) ONU Device ID
:param tasks: (dict) Tasks to run
+ :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
:param states: (list) List of valid states
:param transitions: (dict) Dictionary of triggers and state changes
:param initial_state: (str) Initial state machine state
@@ -119,6 +122,7 @@
self._get_interval_task = tasks['collect-data']
self._create_pm_task = tasks['create-pm']
self._delete_pm_task = tasks['delete-pm']
+ self._advertise_events = advertise_events
self._omci_cc_subscriptions = { # RxEvent.enum -> Subscription Object
RxEvent.MIB_Reset: None,
@@ -192,10 +196,30 @@
def device_id(self):
return self._device_id
+ @property
+ def advertise_events(self):
+ return self._advertise_events
+
+ @advertise_events.setter
+ def advertise_events(self, value):
+ if not isinstance(value, bool):
+ raise TypeError('Advertise event is a boolean')
+ self._advertise_events = value
+
+ def advertise(self, event, info):
+ """Advertise an event on the OpenOMCI event bus"""
+ if self._advertise_events:
+ self._agent.advertise(event,
+ {
+ 'state-machine': self.machine.name,
+ 'info': info,
+ 'time': str(datetime.utcnow()),
+ 'next': str(self._next_interval)
+ })
+
def _me_is_supported(self, class_id):
"""
Check to see if ONU supports this ME
-
:param class_id: (int) ME Class ID
:return: (bool) If ME is supported
"""
@@ -212,7 +236,7 @@
case already in the Idle state.
:param pm_class_id: (int) ME Class ID (1..0xFFFE)
- :param pm_entity_id: (int) Instancec ID (1..0xFFFE)
+ :param pm_entity_id: (int) Instance ID (1..0xFFFE)
:param cid: (int) Class ID of entity monitored, may be None
:param eid: (int) Instance ID of entity monitored, may be None
:param upstream: (bool): Flag indicating if PM is for upstream traffic
@@ -274,6 +298,7 @@
"""
State machine is being stopped
"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._cancel_tasks()
self._next_interval = None
@@ -301,6 +326,8 @@
def on_enter_starting(self):
""" Add the PON/ANI and UNI PM intervals"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
+
self._device = self._agent.get_device(self._device_id)
self._cancel_deferred()
@@ -365,6 +392,7 @@
"""
State machine has just transitioned to the synchronize_time state
"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
def success(results):
@@ -391,6 +419,7 @@
In this state, any added PM MEs that need to be created will be.
TODO: some non-interval PM stats (if there are any) are collected here
"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
if len(self._del_pm_me) and self._delete_me_deferred is None:
@@ -408,6 +437,7 @@
"""
State machine has just transitioned to the create_pm_me state
"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._cancel_tasks()
mes, self._add_pm_me = self._add_pm_me, dict()
@@ -439,6 +469,7 @@
"""
State machine has just transitioned to the delete_pm_me state
"""
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._cancel_tasks()
@@ -474,6 +505,7 @@
reactor.callLater(0, self.success)
return
+ self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
self._cancel_tasks()
keys = self._pm_me_collect_retries.keys()
diff --git a/voltha/protos/omci_mib_db.proto b/voltha/protos/omci_mib_db.proto
index 89d2e65..914e480 100644
--- a/voltha/protos/omci_mib_db.proto
+++ b/voltha/protos/omci_mib_db.proto
@@ -66,3 +66,14 @@
repeated MessageType message_types = 8;
}
+message OpenOmciEventType {
+ enum OpenOmciEventType {
+ state_change = 0; // A state machine has transitioned to a new state
+ }
+}
+
+message OpenOmciEvent {
+ OpenOmciEventType.OpenOmciEventType type = 1;
+
+ string data = 2; // associated data, in json format
+}
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index 845472c..e7b0340 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -86,4 +86,7 @@
'kpis':
kafka_topic: 'voltha.kpis'
filters: [null]
+ 'openomci-events':
+ kafka_topic: 'voltha.events'
+ filters: [null]