VOL-703: Improvements to OpenOMCIAgent add_device call, and more unit tests
ready for review
Change-Id: Iec40e507c63bc1823576a7255af23add32cb651a
diff --git a/tests/utests/voltha/extensions/omci/test_onu_device_entry.py b/tests/utests/voltha/extensions/omci/test_onu_device_entry.py
new file mode 100644
index 0000000..1c3d3ce
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/test_onu_device_entry.py
@@ -0,0 +1,256 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from unittest import TestCase, main
+from nose.tools import assert_raises
+from nose.twistedtools import deferred
+from copy import deepcopy
+from mock.mock_adapter_agent import MockAdapterAgent, MockCore
+from mock.mock_onu_handler import MockOnuHandler
+from mock.mock_olt_handler import MockOltHandler
+from mock.mock_onu import MockOnu
+from voltha.extensions.omci.openomci_agent import OpenOMCIAgent, OpenOmciAgentDefaults
+from voltha.extensions.omci.omci_defs import *
+from common.utils.asleep import asleep
+from voltha.extensions.omci.database.mib_db_api import DEVICE_ID_KEY, CLASS_ID_KEY, CREATED_KEY, \
+ MODIFIED_KEY, MDS_KEY, LAST_SYNC_KEY, VERSION_KEY, DatabaseStateError
+from voltha.extensions.omci.database.mib_db_dict import MibDbVolatileDict
+
+
+DEFAULT_OLT_DEVICE_ID = 'default_olt_mock'
+DEFAULT_ONU_DEVICE_ID = 'default_onu_mock'
+DEFAULT_PON_ID = 0
+DEFAULT_ONU_ID = 0
+DEFAULT_ONU_SN = 'TEST00000001'
+
+OP = EntityOperations
+RC = ReasonCodes
+
+
+def chunk(indexable, chunk_size):
+ for i in range(0, len(indexable), chunk_size):
+ yield indexable[i:i + chunk_size]
+
+
+def hex2raw(hex_string):
+ return ''.join(chr(int(byte, 16)) for byte in chunk(hex_string, 2))
+
+
+class TestOnuDeviceEntry(TestCase):
+ """
+ Test the ONU Device Entry methods
+ """
+ def setUp(self):
+ self.adapter_agent = MockAdapterAgent()
+
+ custom = deepcopy(OpenOmciAgentDefaults)
+ custom['mib-synchronizer']['database'] = MibDbVolatileDict
+
+ self.agent = OpenOMCIAgent(MockCore, support_classes=custom)
+ self.agent.start()
+
+ def tearDown(self):
+ if self.agent is not None:
+ self.agent.stop()
+
+ if self.adapter_agent is not None:
+ self.adapter_agent.tearDown()
+
+ def setup_mock_olt(self, device_id=DEFAULT_OLT_DEVICE_ID):
+ handler = MockOltHandler(self.adapter_agent, device_id)
+ self.adapter_agent.add_device(handler.device)
+ return handler
+
+ def setup_mock_onu(self, parent_id=DEFAULT_OLT_DEVICE_ID,
+ device_id=DEFAULT_ONU_DEVICE_ID,
+ pon_id=DEFAULT_PON_ID,
+ onu_id=DEFAULT_ONU_ID,
+ serial_no=DEFAULT_ONU_SN):
+ handler = MockOnuHandler(self.adapter_agent, parent_id, device_id, pon_id, onu_id)
+ handler.serial_number = serial_no
+ onu = MockOnu(serial_no, self.adapter_agent, handler.device_id) \
+ if serial_no is not None else None
+ handler.onu_mock = onu
+ return handler
+
+ def setup_one_of_each(self):
+ # Most tests will use at lease one or more OLT and ONU
+ self.olt_handler = self.setup_mock_olt()
+ self.onu_handler = self.setup_mock_onu(parent_id=self.olt_handler.device_id)
+ self.onu_device = self.onu_handler.onu_mock
+
+ self.adapter_agent.add_child_device(self.olt_handler.device,
+ self.onu_handler.device)
+
+ def test_add_remove_device(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.agent.device_ids()), 0)
+
+ onu_device = self.agent.add_device(DEFAULT_ONU_DEVICE_ID,
+ self.adapter_agent)
+ self.assertIsNotNone(onu_device)
+ self.assertEqual(len(self.agent.device_ids()), 1)
+ self.assertEqual(self.agent.get_device(DEFAULT_ONU_DEVICE_ID), onu_device)
+
+ # No MIB if not started
+ assert_raises(KeyError, onu_device.query_mib)
+
+ self.agent.remove_device(DEFAULT_ONU_DEVICE_ID)
+ self.assertEqual(len(self.agent.device_ids()), 1)
+
+ def test_delete_device(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.agent.device_ids()), 0)
+
+ onu_device = self.agent.add_device(DEFAULT_ONU_DEVICE_ID,
+ self.adapter_agent)
+ self.assertIsNotNone(onu_device)
+ self.assertEqual(len(self.agent.device_ids()), 1)
+ self.assertEqual(self.agent.get_device(DEFAULT_ONU_DEVICE_ID), onu_device)
+ # Can delete if it was not started
+ onu_device.delete()
+ self.assertEqual(len(self.agent.device_ids()), 0)
+
+ ##########################################
+ # Delete of ONU device okay if it is started
+ onu_device = self.agent.add_device(DEFAULT_ONU_DEVICE_ID,
+ self.adapter_agent)
+ self.assertIsNotNone(onu_device)
+ self.assertEqual(len(self.agent.device_ids()), 1)
+ self.assertEqual(self.agent.get_device(DEFAULT_ONU_DEVICE_ID), onu_device)
+
+ # Start it and then delete it
+ onu_device.start()
+ onu_device.delete()
+ self.assertEqual(len(self.agent.device_ids()), 0)
+
+ @deferred(timeout=5)
+ def test_mib_query_fails_if_dev_not_started(self):
+ self.setup_one_of_each()
+
+ onu_device = self.agent.add_device(DEFAULT_ONU_DEVICE_ID,
+ self.adapter_agent)
+ self.assertIsNotNone(onu_device)
+ self.assertEqual(len(self.agent.device_ids()), 1)
+ self.assertEqual(self.agent.get_device(DEFAULT_ONU_DEVICE_ID), onu_device)
+
+ def not_called(_reason):
+ assert False, 'Should never be called'
+
+ def check_status(_results):
+ # Device not yet started. Query should fail with KeyError since
+ # ONU is not in database yet
+ assert_raises(KeyError, onu_device.query_mib)
+
+ # Yield context so that MIB Database callLater runs. This is a waiting
+ # Async task from when the OpenOMCIAgent was started.
+ d = asleep(0.2)
+ d.addCallbacks(check_status, not_called)
+
+ return d
+
+ @deferred(timeout=5)
+ def test_mib_query_ok_if_dev_started(self):
+ self.setup_one_of_each()
+
+ onu_device = self.agent.add_device(DEFAULT_ONU_DEVICE_ID,
+ self.adapter_agent)
+ self.assertIsNotNone(onu_device)
+ self.assertEqual(len(self.agent.device_ids()), 1)
+ self.assertEqual(self.agent.get_device(DEFAULT_ONU_DEVICE_ID), onu_device)
+
+ def not_called(_reason):
+ onu_device.stop()
+ assert False, 'Should never be called'
+
+ def check_status(_results):
+ # Device started. Query will succeed but nothing should be populated
+ # but the most basic items
+
+ results = onu_device.query_mib()
+ self.assertTrue(isinstance(results, dict))
+ self.assertEqual(results.get(DEVICE_ID_KEY), DEFAULT_ONU_DEVICE_ID)
+
+ self.assertIsNotNone(results.get(VERSION_KEY))
+ self.assertIsNotNone(results.get(CREATED_KEY))
+ self.assertIsNone(results.get(MODIFIED_KEY)) # Created! but not yet modified
+
+ self.assertEqual(results.get(MDS_KEY), 0)
+ self.assertIsNone(results.get(LAST_SYNC_KEY))
+
+ self.assertIsNone(results.get(CLASS_ID_KEY))
+
+ # Stopping still allows a query. Note you just delete a device
+ # to clean up any associated databases
+ onu_device.stop()
+ results = onu_device.query_mib()
+ self.assertTrue(isinstance(results, dict))
+
+ # Yield context so that MIB Database callLater runs. This is a waiting
+ # Async task from when the OpenOMCIAgent was started. But also start the
+ # device so that it's queued async state machines can run as well
+ onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(check_status, not_called)
+
+ return d
+
+ @deferred(timeout=5)
+ def test_delete_scrubs_mib(self):
+ self.setup_one_of_each()
+
+ onu_device = self.agent.add_device(DEFAULT_ONU_DEVICE_ID,
+ self.adapter_agent)
+ self.assertIsNotNone(onu_device)
+ self.assertEqual(len(self.agent.device_ids()), 1)
+ self.assertEqual(self.agent.get_device(DEFAULT_ONU_DEVICE_ID), onu_device)
+
+ def not_called(_reason):
+ onu_device.stop()
+ assert False, 'Should never be called'
+
+ def check_status(_results):
+ # Device started. Query will succeed but nothing should be populated
+ # but the most basic items
+
+ results = onu_device.query_mib()
+ self.assertTrue(isinstance(results, dict))
+ self.assertEqual(results.get(DEVICE_ID_KEY), DEFAULT_ONU_DEVICE_ID)
+
+ # Delete should wipe out any MIB data. Note that a delete of a started
+ # or stopped ONU device is allowed. In this case we are deleting a
+ # started ONU Device
+
+ onu_device.delete()
+ assert_raises(Exception, onu_device.query_mib)
+ # TODO: When capabilities are supported, make sure capabilities get cleared as well
+
+ # Yield context so that MIB Database callLater runs. This is a waiting
+ # Async task from when the OpenOMCIAgent was started. But also start the
+ # device so that it's queued async state machines can run as well
+ onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(check_status, not_called)
+
+ return d
+
+ # TODO: Test pub/sub interface if possible
+ # TODO: Test custom/vendor-specific ME support
+ # TODO: Test override of various state machines or OMCI tasks if possible
+
+
+if __name__ == '__main__':
+ main()
+
diff --git a/tests/utests/voltha/extensions/omci/test_openomci_agent.py b/tests/utests/voltha/extensions/omci/test_openomci_agent.py
new file mode 100644
index 0000000..d1c69d2
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/test_openomci_agent.py
@@ -0,0 +1,90 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from unittest import TestCase, main
+from nose.tools import assert_raises
+from copy import deepcopy
+from mock.mock_adapter_agent import MockAdapterAgent, MockCore
+from voltha.extensions.omci.openomci_agent import OpenOMCIAgent, OpenOmciAgentDefaults
+from voltha.extensions.omci.database.mib_db_ext import MibDbExternal
+from voltha.extensions.omci.database.mib_db_dict import MibDbVolatileDict
+from voltha.extensions.omci.state_machines.mib_sync import MibSynchronizer
+from voltha.extensions.omci.tasks.mib_upload import MibUploadTask
+from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
+from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
+
+
+class TestOpenOmciAgent(TestCase):
+ """
+ Test the Open OMCI Agent
+ """
+ def setUp(self):
+ self.adapter_agent = MockAdapterAgent()
+
+ def tearDown(self):
+ if self.adapter_agent is not None:
+ self.adapter_agent.tearDown()
+
+ def test_omci_agent_defaults(self):
+ # Make sure someone does not check in bad default values
+
+ mib_sync = OpenOmciAgentDefaults.get('mib-synchronizer')
+
+ self.assertIsNotNone(mib_sync)
+ self.assertTrue(isinstance(mib_sync['state-machine'], type(MibSynchronizer)))
+ self.assertTrue(isinstance(mib_sync['database'], type(MibDbExternal)))
+
+ mib_sync_tasks = mib_sync.get('tasks')
+
+ self.assertIsNotNone(mib_sync_tasks)
+ self.assertTrue(isinstance(mib_sync_tasks['mib-upload'], type(MibUploadTask)))
+ self.assertTrue(isinstance(mib_sync_tasks['get-mds'], type(GetMdsTask)))
+ self.assertTrue(isinstance(mib_sync_tasks['mib-audit'], type(GetMdsTask)))
+ self.assertTrue(isinstance(mib_sync_tasks['mib-resync'], type(MibResyncTask)))
+ # self.assertTrue(isinstance(mib_sync_tasks['mib-reconcile'], type('TODO: not yet coded')))
+
+ # caps = OpenOmciAgentDefaults.get('onu-capabilities')
+ #
+ # self.assertIsNotNone(caps)
+
+ def test_omci_agent_default_init(self):
+ agent = OpenOMCIAgent(MockCore)
+
+ self.assertTrue(isinstance(agent.core, type(MockCore)))
+ self.assertTrue(isinstance(agent.database_class, type(MibDbExternal)))
+ self.assertEqual(len(agent.device_ids()), 0)
+ assert_raises(KeyError, agent.get_device, 'deadbeef')
+
+ def test_omci_agent_custom_mib_database(self):
+ custom = deepcopy(OpenOmciAgentDefaults)
+ custom['mib-synchronizer']['database'] = MibDbVolatileDict
+ agent = OpenOMCIAgent(MockCore, support_classes=custom)
+
+ self.assertTrue(isinstance(agent.core, type(MockCore)))
+ self.assertTrue(isinstance(agent.database_class, type(MibDbVolatileDict)))
+
+ def test_omci_agent_start_stop(self):
+ agent = OpenOMCIAgent(MockCore)
+
+ agent.start()
+ agent.start() # Should be a NOP, no side effects
+
+ agent.stop()
+ agent.stop() # Should be a NOP, no side effects
+
+
+if __name__ == '__main__':
+ main()
+
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index c02f86e..0deca63 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -1,5 +1,5 @@
#
-# Copyright 2017 the original author or authors.
+# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@
An ONU Device entry in the MIB
"""
def __init__(self, omci_agent, device_id, adapter_agent, custom_me_map,
- mib_synchronizer_info, mib_db):
+ mib_db, support_classes):
"""
Class initializer
@@ -52,8 +52,8 @@
:param device_id: (str) ONU Device ID
:param adapter_agent: (AdapterAgent) Adapter agent for ONU
:param custom_me_map: (dict) Additional/updated ME to add to class map
- :param mib_synchronizer_info: (dict) MIB Synchronization State Machine & Task information
:param mib_db: (MibDbApi) MIB Database reference
+ :param support_classes: (dict) State machines and tasks for this ONU
"""
self.log = structlog.get_logger(device_id=device_id)
@@ -62,9 +62,12 @@
self._device_id = device_id # ONU Device ID
self._runner = TaskRunner(device_id) # OMCI_CC Task runner
self._deferred = None
+ self._first_in_sync = False
+ self._support_classes = support_classes
try:
self._mib_db_in_sync = False
+ mib_synchronizer_info = support_classes.get('mib-synchronizer')
self.mib_sync = mib_synchronizer_info['state-machine'](self._omci_agent,
device_id,
mib_synchronizer_info['tasks'],
@@ -73,7 +76,10 @@
self.log.exception('mib-sync-create-failed', e=e)
raise
- self._state_machines = [self.mib_sync]
+ self._state_machines = []
+ self._on_start_state_machines = [self.mib_sync] # Run when 'start()' called
+ self._on_sync_state_machines = [] # Run after first in_sync event
+
self._custom_me_map = custom_me_map
self._me_map = omci_entities.entity_id_to_class_map.copy()
@@ -148,8 +154,11 @@
# Save value
self._mib_db_in_sync = value
- # Notify any event listeners
+ # Start up other state machines if needed
+ if self._first_in_sync:
+ self.first_in_sync_event()
+ # Notify any event listeners
topic = OnuDeviceEntry.event_bus_topic(self.device_id,
OnuDeviceEvents.MibDatabaseSyncEvent)
msg = {
@@ -167,26 +176,29 @@
self._started = True
self._omci_cc.enabled = True
+ self._first_in_sync = True
self._runner.start()
- # Start MIB Sync and other state machines. Start 'later' so that any
+ # Start MIB Sync and other state machines that can run before the first
+ # MIB Synchronization event occurs. Start 'later' so that any
# ONU Device, OMCI DB, OMCI Agent, and others are fully started before
# performing the start.
+ self._state_machines = []
+
def start_state_machines(machines):
for sm in machines:
+ self._state_machines.append(sm)
sm.start()
self._deferred = reactor.callLater(0, start_state_machines,
- self._state_machines)
+ self._on_start_state_machines)
# Notify any event listeners
self._publish_device_status_event()
def stop(self):
"""
Stop the ONU Device Entry state machines
-
- When the ONU Device Entry is stopped,
"""
if not self._started:
return
@@ -199,12 +211,34 @@
for sm in self._state_machines:
sm.stop()
+ self._state_machines = []
+
# Stop task runner
self._runner.stop()
# Notify any event listeners
self._publish_device_status_event()
+ def first_in_sync_event(self):
+ """
+ This event is called on the first MIB synchronization event after
+ OpenOMCI has been started. It is responsible for starting any
+ other state machine and to initiate an ONU Capabilities report
+ """
+ if self._first_in_sync:
+ self._first_in_sync = False
+
+ # TODO: Start up the ONU Capabilities task
+
+ # Start up any other remaining OpenOMCI state machines
+ def start_state_machines(machines):
+ for sm in machines:
+ self._state_machines.append(sm)
+ sm.start()
+
+ self._deferred = reactor.callLater(0, start_state_machines,
+ self._on_sync_state_machines)
+
def _publish_device_status_event(self):
"""
Publish the ONU Device start/start status.
@@ -220,6 +254,7 @@
OMCI state information from the OpenOMCI Framework
"""
self.stop()
+ self.mib_synchronizer.delete()
# OpenOMCI cleanup
if self._omci_agent is not None:
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index 22d56b4..1732fe2 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -36,7 +36,8 @@
'mib-reconcile': None # TODO: post-v1.3.0 (Reconcile out-of-sync MIB DB)
}
},
- # TODO: Alarm-synchronizer is a stretch goal for Voltha 1.3.0
+ # 'onu-capabilities': OnuCapabilitiesTask,
+ #
# 'alarm-syncronizer': {
# 'state-machine': AlarmSynchronizer, # Implements the MIB synchronization state machine
# 'database': AlarmDb, # For any State storage needs
@@ -133,7 +134,8 @@
# DB shutdown
self._mib_db.stop()
- def add_device(self, device_id, adapter_agent, custom_me_map=None):
+ def add_device(self, device_id, adapter_agent, custom_me_map=None,
+ support_classes=OpenOmciAgentDefaults):
"""
Add a new ONU to be managed.
@@ -147,6 +149,7 @@
:param device_id: (str) Device ID of ONU to add
:param adapter_agent: (AdapterAgent) Adapter agent for ONU
:param custom_me_map: (dict) Additional/updated ME to add to class map
+ :param support_classes: (dict) State machines and tasks for this ONU
:return: (OnuDeviceEntry) The ONU device
"""
@@ -156,7 +159,7 @@
if device is None:
device = OnuDeviceEntry(self, device_id, adapter_agent, custom_me_map,
- self._mib_synchronizer_info, self._mib_db)
+ self._mib_db, support_classes)
self._devices[device_id] = device
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index f121033..8b60846 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -162,6 +162,16 @@
def __str__(self):
return 'MIBSynchronizer: Device ID: {}, State:{}'.format(self._device_id, self.state)
+ def delete(self):
+ """
+ Cleanup any state information
+ """
+ self.stop()
+ db, self._database = self._database, None
+
+ if db is not None:
+ db.remove(self._device_id)
+
@property
def device_id(self):
return self._device_id
@@ -302,7 +312,7 @@
self._current_task = None
# Examine MDS value
- if self._mib_data_sync == onu_mds_value:
+ if self.mib_data_sync == onu_mds_value:
self._deferred = reactor.callLater(0, self.success)
else:
self._deferred = reactor.callLater(0, self.mismatch)
@@ -362,7 +372,6 @@
try:
# Need to update the ONU accordingly
if self._attr_diffs is not None:
- assert self._attr_diffs is not None, 'Should match'
step = 'attribute-update'
pass # TODO: Perform the 'set' commands needed
@@ -375,11 +384,11 @@
#
# For instance, no one may set the gal_loopback_configuration
# in the GEM Interworking Termination point since its default
- # values is '0' disable, but when we audit, the ONU will report zer
+ # values is '0' disable, but when we audit, the ONU will report zero.
#
# A good way to perhaps fix this is to update our database with the
# default. Or perhaps set all defaults in the database in the first
- # place when we do the initial create/set
+ # place when we do the initial create/set.
#
pass # TODO: Perform 'delete' commands as needed, see 'default' note above
@@ -413,7 +422,7 @@
self._current_task = None
# Examine MDS value
- if self._mib_data_sync == onu_mds_value:
+ if self.mib_data_sync == onu_mds_value:
self._deferred = reactor.callLater(0, self.success)
else:
self._device.mib_db_in_sync = False
@@ -551,7 +560,7 @@
def on_mib_upload_response(self, _topic, msg):
"""
- Process a Set response
+ Process a MIB Upload response
:param _topic: (str) OMCI-RX topic
:param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
@@ -562,6 +571,7 @@
# Check if expected in current mib_sync state
if self.state == 'resynchronizing':
# The resync task handles this
+ # TODO: Remove this subscription if we never do anything with the response
return
if self.state != 'uploading':
@@ -569,7 +579,7 @@
def on_mib_upload_next_response(self, _topic, msg):
"""
- Process a Set response
+ Process a MIB Upload Next response
:param _topic: (str) OMCI-RX topic
:param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)