VOL-610: OpenOMCI MIB Sync and Task Runner
VOL-610: Bug fix and cleanup based on internal review and test
Change-Id: Icd27826d34b2cc188b278eeb0a78264a58fafeb3
diff --git a/requirements.txt b/requirements.txt
index 4cd7dc6..6c57a0c 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -41,6 +41,7 @@
six==1.11.0
structlog==17.2.0
termcolor==1.1.0
+transitions==0.6.4
treq==17.8.0
Twisted==17.9.0
txaioetcd==0.3.0
diff --git a/tests/utests/voltha/extensions/omci/mock/__init__.py b/tests/utests/voltha/extensions/omci/mock/__init__.py
index e69de29..2792694 100644
--- a/tests/utests/voltha/extensions/omci/mock/__init__.py
+++ b/tests/utests/voltha/extensions/omci/mock/__init__.py
@@ -0,0 +1,24 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from nose.twistedtools import threaded_reactor, stop_reactor
+
+
+def setup_module():
+ threaded_reactor()
+
+
+def teardown_module():
+ stop_reactor()
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_task.py b/tests/utests/voltha/extensions/omci/mock/mock_task.py
new file mode 100644
index 0000000..0d85f64
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/mock/mock_task.py
@@ -0,0 +1,97 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from voltha.extensions.omci.tasks.task import Task
+from common.utils.asleep import asleep
+from twisted.internet.defer import inlineCallbacks, failure
+from twisted.internet import reactor
+
+
+class SimpleTask(Task):
+ def __init__(self, omci_agent, device_id,
+ exclusive=True,
+ success=True,
+ delay=0,
+ value=None,
+ priority=Task.DEFAULT_PRIORITY):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param exclusive: (bool) True if the task should run by itself
+ :param success: (bool) True if the task should complete successfully
+ :param delay: (int/float) Time it takes the task to complete
+ :param priority (int) Priority of the task
+ :param value: (various) The value (string, int, ...) to return if successful
+ or an Exception to send to the errBack if 'success'
+ is False
+ """
+ super(SimpleTask, self).__init__('Simple Mock Task',
+ omci_agent,
+ device_id,
+ exclusive=exclusive,
+ priority=priority)
+ self._delay = delay
+ self._success = success
+ self._value = value
+ self._local_deferred = None
+ self._running = False
+
+
+ def cancel_deferred(self):
+ super(SimpleTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Synchronization tasks
+ """
+ super(SimpleTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_task)
+
+ def stop(self):
+ """
+ Shutdown MIB Synchronization tasks
+ """
+ self.cancel_deferred()
+ super(SimpleTask, self).stop()
+
+ @inlineCallbacks
+ def perform_task(self):
+ """
+ Get the 'mib_data_sync' attribute of the ONU
+ """
+ try:
+ running = True
+
+ if self._delay > 0:
+ yield asleep(self._delay)
+
+ if self._success:
+ self.deferred.callback(self._value)
+
+ self.deferred.errback(failure.Failure(self._value))
+ running = False
+
+ except Exception as e:
+ running = False
+ self.deferred.errback(failure.Failure(e))
diff --git a/tests/utests/voltha/extensions/omci/test_mib_sync.py b/tests/utests/voltha/extensions/omci/test_mib_sync.py
new file mode 100644
index 0000000..10a1172
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/test_mib_sync.py
@@ -0,0 +1,37 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from unittest import TestCase, main
+from mock.mock_adapter_agent import MockAdapterAgent
+
+
+
+class TestMibSync(TestCase):
+ """
+ Test the MIB Synchronizer State Machine
+ """
+ def setUp(self):
+ self.adapter_agent = MockAdapterAgent()
+
+ def tearDown(self):
+ if self.adapter_agent is not None:
+ self.adapter_agent.tearDown()
+
+ # TODO: Add tests
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/utests/voltha/extensions/omci/test_mib_upload.py b/tests/utests/voltha/extensions/omci/test_mib_upload.py
new file mode 100644
index 0000000..c372819
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/test_mib_upload.py
@@ -0,0 +1,36 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from unittest import TestCase, main
+from mock.mock_adapter_agent import MockAdapterAgent
+
+
+class TestMibUpload(TestCase):
+ """
+ Test the MIB Upload Task
+ """
+ def setUp(self):
+ self.adapter_agent = MockAdapterAgent()
+
+ def tearDown(self):
+ if self.adapter_agent is not None:
+ self.adapter_agent.tearDown()
+
+ # TODO: Add tests
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/utests/voltha/extensions/omci/test_omci_cc.py b/tests/utests/voltha/extensions/omci/test_omci_cc.py
index c0049b2..2152f04 100644
--- a/tests/utests/voltha/extensions/omci/test_omci_cc.py
+++ b/tests/utests/voltha/extensions/omci/test_omci_cc.py
@@ -14,7 +14,6 @@
# limitations under the License.
#
from unittest import TestCase, main
-from nose.twistedtools import threaded_reactor, stop_reactor
from mock.mock_adapter_agent import MockAdapterAgent
from mock.mock_onu_handler import MockOnuHandler
from mock.mock_olt_handler import MockOltHandler
@@ -32,15 +31,7 @@
RC = ReasonCodes
-def setup_module():
- threaded_reactor()
-
-
-def teardown_module():
- stop_reactor()
-
-
-class TestOmciCcExample(TestCase):
+class TestOmciCc(TestCase):
"""
Test the Open OMCI Communication channels
diff --git a/tests/utests/voltha/extensions/omci/test_task_runner.py b/tests/utests/voltha/extensions/omci/test_task_runner.py
new file mode 100644
index 0000000..e15541b
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/test_task_runner.py
@@ -0,0 +1,278 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from unittest import TestCase, main
+from nose.tools import raises
+from twisted.internet import defer
+from twisted.internet.defer import inlineCallbacks, returnValue, CancelledError
+from mock.mock_task import SimpleTask
+from nose.twistedtools import deferred
+from voltha.extensions.omci.tasks.task_runner import TaskRunner
+
+DEVICE_ID = 'omci-unit-tests'
+
+
+class TestTaskRunner(TestCase):
+ """
+ Test the Task Runner Object
+ """
+
+ def setUp(self):
+ # defer.setDebugging(True)
+ self.runner = TaskRunner(DEVICE_ID)
+
+ def tearDown(self):
+ r, self.runner = self.runner, None
+ r.stop()
+
+ def test_default_init(self):
+ self.assertFalse(self.runner.active)
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 0)
+ self.assertEqual(self.runner.failed_tasks, 0)
+
+ def test_start_stop(self):
+ self.assertFalse(self.runner.active)
+
+ self.runner.start()
+ self.assertTrue(self.runner.active)
+
+ self.runner.stop()
+ self.assertFalse(self.runner.active)
+
+ def test_simple_task_init(self):
+ t = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=0,
+ success=True, value=0, delay=0)
+
+ self.assertEqual(t.priority, 0)
+ self.assertGreater(t.task_id, 0)
+ self.assertTrue(t.exclusive)
+ self.assertFalse(t.deferred.called)
+
+ @raises(AssertionError)
+ def test_simple_negative_priority(self):
+ SimpleTask(None, DEVICE_ID, priority=-1)
+
+ @raises(AssertionError)
+ def test_simple_big_priority(self):
+ SimpleTask(None, DEVICE_ID, priority=256)
+
+ def unexpected_error(self, _failure):
+ self.assertEqual('Should not be here, expected success', _failure)
+
+ def unexpected_success(self, _results):
+ self.assertEqual('Should not be here, expected a failure', _results)
+
+ @deferred(timeout=5)
+ def test_simple_success(self):
+ expected_result = 123
+
+ t = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=0,
+ success=True, value=expected_result, delay=0)
+
+ d = self.runner.queue_task(t)
+ self.assertEqual(self.runner.pending_tasks, 1)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.runner.start()
+
+ def check_results(results):
+ self.assertEqual(results, expected_result)
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 1)
+ self.assertEqual(self.runner.failed_tasks, 0)
+ self.assertTrue(self.runner.active)
+ return results
+
+ d.addCallbacks(check_results, self.unexpected_error)
+ return d
+
+ @raises(Exception)
+ @deferred(timeout=5)
+ def test_simple_failure(self):
+ self.expected_failure = Exception('Testing a task failure')
+
+ t = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=0,
+ success=False, value=self.expected_failure,
+ delay=0)
+
+ d = self.runner.queue_task(t)
+ self.assertEqual(self.runner.pending_tasks, 1)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.runner.start()
+
+ def expected_failure(failure):
+ self.assertEqual(failure, self.expected_failure)
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 0)
+ self.assertEqual(self.runner.failed_tasks, 1)
+ self.assertTrue(self.runner.active)
+ return failure
+
+ d.addCallbacks(self.unexpected_success, expected_failure)
+ return d
+
+ @deferred(timeout=5)
+ def test_priority(self):
+ self.last_value_set = 0
+
+ t1 = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=1,
+ success=True, value=1, delay=0)
+
+ t2 = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=2, # Should finish first
+ success=True, value=2, delay=0)
+
+ d1 = self.runner.queue_task(t1)
+ d2 = self.runner.queue_task(t2)
+
+ def set_last_value(results):
+ self.last_value_set = results
+
+ d1.addCallbacks(set_last_value, self.unexpected_error)
+ d2.addCallbacks(set_last_value, self.unexpected_error)
+
+ self.assertEqual(self.runner.pending_tasks, 2)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ d = defer.gatherResults([d1, d2], consumeErrors=True)
+
+ def check_results(_):
+ self.assertEqual(self.last_value_set, 1)
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 2)
+
+ d.addCallbacks(check_results, self.unexpected_error)
+
+ self.runner.start()
+ return d
+
+ @inlineCallbacks
+ def check_that_t1_t2_running_and_last_is_not(self, results):
+ from common.utils.asleep import asleep
+ yield asleep(0.1)
+
+ self.assertEqual(self.runner.pending_tasks, 1)
+ self.assertEqual(self.runner.running_tasks, 2)
+ self.assertEqual(self.runner.successful_tasks_completed, 1)
+
+ returnValue(results)
+
+ @deferred(timeout=10)
+ def test_concurrent(self):
+ blocker = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=10,
+ success=True, value=1, delay=0.5)
+
+ t1 = SimpleTask(None, DEVICE_ID,
+ exclusive=False, priority=9,
+ success=True, value=1, delay=2)
+
+ t2 = SimpleTask(None, DEVICE_ID,
+ exclusive=False, priority=9,
+ success=True, value=1, delay=2)
+
+ last = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=8,
+ success=True, value=1, delay=0)
+
+ d0 = self.runner.queue_task(blocker)
+ d0.addCallbacks(self.check_that_t1_t2_running_and_last_is_not,
+ self.unexpected_error)
+
+ d1 = self.runner.queue_task(t1)
+ d2 = self.runner.queue_task(t2)
+ d3 = self.runner.queue_task(last)
+
+ self.assertEqual(self.runner.pending_tasks, 4)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ d = defer.gatherResults([d0, d1, d2, d3], consumeErrors=True)
+
+ def check_final_results(_):
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 4)
+ self.assertEqual(self.runner.failed_tasks, 0)
+
+ d.addCallbacks(check_final_results, self.unexpected_error)
+
+ self.runner.start()
+ return d
+
+ @raises(CancelledError)
+ @deferred(timeout=2)
+ def test_cancel_queued(self):
+ t = SimpleTask(None, DEVICE_ID,
+ exclusive=True, priority=9,
+ success=True, value=1, delay=0)
+
+ d = self.runner.queue_task(t)
+ self.assertEqual(self.runner.pending_tasks, 1)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ self.runner.cancel_task(t.task_id)
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ return d
+
+ @deferred(timeout=200)
+ def test_cancel_running(self):
+ t1 = SimpleTask(None, DEVICE_ID,
+ exclusive=False, priority=9,
+ success=True, value=1, delay=0.5)
+ t2 = SimpleTask(None, DEVICE_ID,
+ exclusive=False, priority=9,
+ success=True, value=1, delay=200)
+
+ d1 = self.runner.queue_task(t1)
+ d2 = self.runner.queue_task(t2)
+
+ self.assertEqual(self.runner.pending_tasks, 2)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ def kill_task_t2(_, task_id):
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 1)
+
+ self.runner.cancel_task(task_id)
+ self.assertEqual(self.runner.running_tasks, 0)
+
+ d1.addCallbacks(kill_task_t2, self.unexpected_error,
+ callbackArgs=[t2.task_id])
+
+ def expected_error(failure):
+ self.assertTrue(isinstance(failure.value, CancelledError))
+ self.assertEqual(self.runner.pending_tasks, 0)
+ self.assertEqual(self.runner.running_tasks, 0)
+ self.assertEqual(self.runner.successful_tasks_completed, 1)
+ self.assertEqual(self.runner.failed_tasks, 1)
+
+ d2.addCallbacks(self.unexpected_success, expected_error)
+
+ self.runner.start()
+ return defer.gatherResults([d1, d2], consumeErrors=True)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/voltha/extensions/omci/database/__init__.py b/voltha/extensions/omci/database/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/extensions/omci/database/__init__.py
diff --git a/voltha/extensions/omci/database/mib_db_api.py b/voltha/extensions/omci/database/mib_db_api.py
new file mode 100644
index 0000000..c6cc325
--- /dev/null
+++ b/voltha/extensions/omci/database/mib_db_api.py
@@ -0,0 +1,221 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+OpenOMCI MIB Database API
+"""
+
+import structlog
+from datetime import datetime
+
+CREATED_KEY = 'created'
+MODIFIED_KEY = 'modified'
+MDS_KEY = 'mib_data_sync'
+LAST_SYNC_KEY = 'last_mib_sync'
+VERSION_KEY = 'version'
+
+
+class DatabaseStateError(Exception):
+ def __init__(self, *args):
+ Exception.__init__(self, *args)
+
+
+class MibDbApi(object):
+ """
+ MIB Database API Base Class
+
+ Derive the ME MIB Database implementation from this API. For an example
+ implementation, look at the mib_db_dict.py implementation
+ """
+ def __init__(self, omci_agent):
+ """
+ Class initializer
+ :param omci_agent: (OpenOMCIAgent) OpenOMCI Agent
+ """
+ self.log = structlog.get_logger()
+ self._omci_agent = omci_agent
+ self._started = False
+
+ now = datetime.utcnow()
+ self._created = now
+ self._modified = now
+
+ def start(self):
+ """
+ Start up/restore the database. For in-memory, will be a nop. For external
+ DB, may need to create the DB and fetch create/modified values
+ """
+ if not self._started:
+ self._started = True
+ # For a derived class that is a persistent DB, Restore DB (connect,
+ # get created/modified times, ....) or something along those lines.
+ # Minimal restore could just be getting ONU device IDs' so they are cached
+ # locally. Maximum restore would be a full in-memory version of database
+ # for fast 'GET' request support.
+ # Remember to restore the '_created' and '_modified' times (above) as well
+ # from the database
+
+ def stop(self):
+ """
+ Start up the database. For in-memory, will be a nop. For external
+ DB, may need to create the DB and fetch create/modified values
+ """
+ if self._started:
+ self._started = False
+
+ @property
+ def active(self):
+ """
+ Is the database active
+ :return: (bool) True if active
+ """
+ return self._started
+
+ @property
+ def created(self):
+ """
+ Date (UTC) that the database was created
+ :return: (datetime) creation date
+ """
+ return self._created
+
+ @property
+ def modified(self):
+ """
+ Date (UTC) that the database last added or removed a device
+ or updated a device's ME information
+ :return: (datetime) last modification date
+ """
+ return self._modified
+
+ def add(self, device_id, overwrite=False):
+ """
+ Add a new ONU to database
+
+ :param device_id: (str) Device ID of ONU to add
+ :param overwrite: (bool) Overwrite existing entry if found.
+
+ :raises KeyError: If device does not exist and 'overwrite' is False
+ """
+ raise NotImplementedError('Implement this in your derive class')
+
+ def remove(self, device_id):
+ """
+ Remove an ONU from the database
+
+ :param device_id: (str) Device ID of ONU to remove from database
+ """
+ raise NotImplementedError('Implement this in your derive class')
+
+ def set(self, device_id, class_id, entity_id, attributes):
+ """
+ Set/Create a database value. This should only be called by the MIB synchronizer
+ and its related tasks
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME Entity ID
+ :param attributes: (dict) Attribute dictionary
+
+ :returns: (bool) True if the value was saved to the database. False if the
+ value was identical to the current instance
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ raise NotImplementedError('Implement this in your derive class')
+
+ def delete(self, device_id, class_id, entity_id):
+ """
+ Delete an entity from the database if it exists
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME Entity ID
+
+ :returns: (bool) True if the instance was found and deleted. False
+ if it did not exist.
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ raise NotImplementedError('Implement this in your derive class')
+
+ def query(self, device_id, class_id=None, instance_id=None, attributes=None):
+ """
+ Get database information.
+
+ This method can be used to request information from the database to the detailed
+ level requested
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+ :param attributes: (list or str) Managed Entity instance's attributes
+
+ :return: (dict) The value(s) requested. If class/inst/attribute is
+ not found, an empty dictionary is returned
+ :raises KeyError: If the requested device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ raise NotImplementedError('Implement this in your derive class')
+
+ def on_mib_reset(self, device_id):
+ """
+ Reset/clear the database for a specific Device
+
+ :param device_id: (str) ONU Device ID
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ # Your derived class should clear out all MIB data and update the
+ # modified stats appropriately
+ raise NotImplementedError('Implement this in your derive class')
+
+ def save_mib_data_sync(self, device_id, value):
+ """
+ Save the MIB Data Sync to the database in an easy location to access
+
+ :param device_id: (str) ONU Device ID
+ :param value: (int) Value to save
+ """
+ raise NotImplementedError('Implement this in your derive class')
+
+ def get_mib_data_sync(self, device_id):
+ """
+ Get the MIB Data Sync value last saved to the database for a device
+
+ :param device_id: (str) ONU Device ID
+ :return: (int) The Value or None if not found
+ """
+ raise NotImplementedError('Implement this in your derive class')
+
+ def save_last_sync(self, device_id, value):
+ """
+ Save the Last Sync time to the database in an easy location to access
+
+ :param device_id: (str) ONU Device ID
+ :param value: (DateTime) Value to save
+ """
+ raise NotImplementedError('Implement this in your derive class')
+
+ def get_last_sync(self, device_id):
+ """
+ Get the Last SYnc Time saved to the database for a device
+
+ :param device_id: (str) ONU Device ID
+ :return: (int) The Value or None if not found
+ """
+ raise NotImplementedError('Implement this in your derive class')
diff --git a/voltha/extensions/omci/database/mib_db_dict.py b/voltha/extensions/omci/database/mib_db_dict.py
new file mode 100644
index 0000000..b74294e
--- /dev/null
+++ b/voltha/extensions/omci/database/mib_db_dict.py
@@ -0,0 +1,352 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import copy
+from mib_db_api import *
+
+
+class MibDbVolatileDict(MibDbApi):
+ """
+ A very simple in-memory database for ME storage. Data is not persistent
+ across reboots.
+
+ In Phase 2, this DB will be instantiated on a per-ONU basis but act as if
+ it is shared for all ONUs. This class will be updated with and external
+ key-value store (or other appropriate database) in Voltha 1.3 Sprint 3
+
+ This class can be used for unit tests
+ """
+ CURRENT_VERSION = 1
+
+ def __init__(self, omci_agent):
+ """
+ Class initializer
+ :param omci_agent: (OpenOMCIAgent) OpenOMCI Agent
+ """
+ super(MibDbVolatileDict, self).__init__(omci_agent)
+ self._data = dict() # device_id -> ME ID -> Inst ID -> Attr Name -> Values
+
+ def start(self):
+ """
+ Start up/restore the database. For in-memory, will be a nop. For external
+ DB, may need to create the DB and fetch create/modified values
+ """
+ super(MibDbVolatileDict, self).start()
+ # TODO: Delete this method if nothing else is done except calling the base class
+
+ def stop(self):
+ """
+ Start up the database. For in-memory, will be a nop. For external
+ DB, may need to create the DB and fetch create/modified values
+ """
+ super(MibDbVolatileDict, self).stop()
+ # TODO: Delete this method if nothing else is done except calling the base class
+
+ def add(self, device_id, overwrite=False):
+ """
+ Add a new ONU to database
+
+ :param device_id: (str) Device ID of ONU to add
+ :param overwrite: (bool) Overwrite existing entry if found.
+
+ :raises KeyError: If device does not exist and 'overwrite' is False
+ """
+ self.log.debug('add-device', device_id=device_id, overwrite=overwrite)
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be an string')
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not overwrite and device_id in self._data:
+ raise KeyError('Device {} already exists in the database'
+ .format(device_id))
+
+ now = datetime.utcnow()
+ self._data[device_id] = {
+ CREATED_KEY: now,
+ MODIFIED_KEY: now,
+ MDS_KEY: 0,
+ LAST_SYNC_KEY: None,
+ VERSION_KEY: MibDbVolatileDict.CURRENT_VERSION
+ }
+
+ def remove(self, device_id):
+ """
+ Remove an ONU from the database
+
+ :param device_id: (str) Device ID of ONU to remove from database
+ """
+ self.log.debug('remove-device', device_id=device_id)
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be an string')
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if device_id in self._data:
+ del self._data[device_id]
+
+ def on_mib_reset(self, device_id):
+ """
+ Reset/clear the database for a specific Device
+
+ :param device_id: (str) ONU Device ID
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ now = datetime.utcnow()
+ device_db = self._data.get(device_id)
+
+ if device_db is None:
+ self._data[device_id] = {
+ CREATED_KEY: now,
+ LAST_SYNC_KEY: None
+ }
+ device_db = self._data[device_id]
+ else:
+ created = device_db[CREATED_KEY]
+ last_sync = device_db[LAST_SYNC_KEY]
+
+ self._data[device_id] = {
+ CREATED_KEY: created,
+ LAST_SYNC_KEY: last_sync
+ }
+
+ device_db[MODIFIED_KEY] = now
+ device_db[MDS_KEY] = 0
+ device_db[VERSION_KEY] = MibDbVolatileDict.CURRENT_VERSION
+ self._modified = now
+
+ def save_mib_data_sync(self, device_id, value):
+ """
+ Save the MIB Data Sync to the database in an easy location to access
+
+ :param device_id: (str) ONU Device ID
+ :param value: (int) Value to save
+ """
+ if device_id in self._data:
+ assert 0 <= value <= 255,\
+ 'Invalid MIB Data Sync Value: {}'.format(value)
+
+ self._data[device_id][MODIFIED_KEY] = datetime.utcnow()
+ self._data[device_id][MDS_KEY] = value
+
+ def get_mib_data_sync(self, device_id):
+ """
+ Get the MIB Data Sync value last saved to the database for a device
+
+ :param device_id: (str) ONU Device ID
+ :return: (int) The Value or None if not found
+ """
+ if device_id not in self._data:
+ return None
+
+ return self._data[device_id].get(MDS_KEY)
+
+ def save_last_sync(self, device_id, value):
+ """
+ Save the Last Sync time to the database in an easy location to access
+
+ :param device_id: (str) ONU Device ID
+ :param value: (DateTime) Value to save
+ """
+ if device_id in self._data:
+ self._data[device_id][MODIFIED_KEY] = datetime.utcnow()
+ self._data[device_id][LAST_SYNC_KEY] = value
+
+ def get_last_sync(self, device_id):
+ """
+ Get the Last SYnc Time saved to the database for a device
+
+ :param device_id: (str) ONU Device ID
+ :return: (int) The Value or None if not found
+ """
+ if device_id not in self._data:
+ return None
+
+ return self._data[device_id].get(LAST_SYNC_KEY)
+
+ def set(self, device_id, class_id, entity_id, attributes):
+ """
+ Set a database value. This should only be called by the MIB synchronizer
+ and its related tasks
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME Entity ID
+ :param attributes: (dict) Attribute dictionary
+
+ :returns: (bool) True if the value was saved to the database. False if the
+ value was identical to the current instance
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ now = datetime.utcnow()
+ try:
+ device_db = self._data[device_id]
+ class_db = device_db.get(class_id)
+
+ if class_db is None:
+ device_db[class_id] = {
+ CREATED_KEY: now,
+ MODIFIED_KEY: now
+ }
+ class_db = device_db[class_id]
+ self._modified = now
+
+ instance_db = class_db.get(entity_id)
+ if instance_db is None:
+ class_db[entity_id] = {
+ CREATED_KEY: now,
+ MODIFIED_KEY: now
+ }
+ instance_db = class_db[entity_id]
+ class_db[MODIFIED_KEY] = now
+ device_db[MODIFIED_KEY] = now
+ self._modified = now
+
+ changed = False
+
+ for attribute, value in attributes.items():
+ assert isinstance(attribute, basestring)
+ assert value is not None, "Attribute '{}' value cannot be 'None'".\
+ format(attribute)
+
+ db_value = instance_db.get(attribute)
+ assert db_value is None or isinstance(value, type(db_value)), \
+ "New value for attribute '{}' type is changing from '{}' to '{}'".\
+ format(attribute, type(db_value), type(value))
+
+ if db_value is None or db_value != value:
+ instance_db[attribute] = value
+ changed = True
+
+ instance_db[MODIFIED_KEY] = now
+ class_db[MODIFIED_KEY] = now
+ device_db[MODIFIED_KEY] = now
+ self._modified = now
+
+ return changed
+
+ except Exception as e:
+ self.log.error('set-failure', e=e)
+ raise
+
+ def delete(self, device_id, class_id, entity_id):
+ """
+ Delete an entity from the database if it exists. If all instances
+ of a class are deleted, the class is deleted as well.
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME Entity ID
+
+ :returns: (bool) True if the instance was found and deleted. False
+ if it did not exist.
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ try:
+ device_db = self._data[device_id]
+ class_db = device_db.get(class_id)
+
+ if class_db is None:
+ return False
+
+ instance_db = class_db.get(entity_id)
+ if instance_db is None:
+ return False
+
+ now = datetime.utcnow()
+ del class_db[entity_id]
+
+ if len(class_db) == len([CREATED_KEY, MODIFIED_KEY]):
+ del device_db[class_id]
+ else:
+ class_db[MODIFIED_KEY] = now
+ device_db[MODIFIED_KEY] = now
+ self._modified = now
+
+ return True
+
+ except Exception as e:
+ self.log.error('delete-failure', e=e)
+ raise
+
+ def query(self, device_id, class_id=None, instance_id=None, attributes=None):
+ """
+ Get database information.
+
+ This method can be used to request information from the database to the detailed
+ level requested
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+ :param attributes: (list or str) Managed Entity instance's attributes
+
+ :return: (dict) The value(s) requested. If class/inst/attribute is
+ not found, an empty dictionary is returned
+ :raises KeyError: If the requested device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('query', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID is a string')
+
+ device_db = self._data[device_id]
+ if class_id is None:
+ return device_db # TODO: copy.deepcopy(device_db)
+
+ if not isinstance(class_id, int):
+ raise TypeError('Class ID is an integer')
+
+ class_db = device_db.get(class_id, dict())
+ if instance_id is None or len(class_db) == 0:
+ return class_db # TODO: copy.deepcopy(class_db)
+
+ if not isinstance(instance_id, int):
+ raise TypeError('Instance ID is an integer')
+
+ instance_db = class_db.get(instance_id, dict())
+ if attributes is None or len(instance_db) == 0:
+ return instance_db # TODO: copy.deepcopy(instance_db)
+
+ if not isinstance(attributes, (basestring, list)):
+ raise TypeError('Attributes should be a string or list of strings')
+
+ if not isinstance(attributes, list):
+ attributes = [attributes]
+
+ return {attr: val for attr, val in instance_db.iteritems()
+ if attr in attributes}
diff --git a/voltha/extensions/omci/me_frame.py b/voltha/extensions/omci/me_frame.py
index 1808d04..e491bae 100644
--- a/voltha/extensions/omci/me_frame.py
+++ b/voltha/extensions/omci/me_frame.py
@@ -169,8 +169,8 @@
return OmciFrame(
transaction_id=None,
- message_type=OmciGet.message_id,
- omci_message=OmciGet(
+ message_type=OmciDelete.message_id,
+ omci_message=OmciDelete(
entity_class=getattr(self.entity_class, 'class_id'),
entity_id=getattr(self, 'entity_id')
))
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index c855cfe..8f1bb4d 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -24,6 +24,8 @@
from common.frameio.frameio import hexify
from voltha.extensions.omci.omci import *
from voltha.extensions.omci.omci_me import OntGFrame, OntDataFrame
+from common.event_bus import EventBusClient
+from enum import IntEnum
_MAX_INCOMING_ALARM_MESSAGES = 256
@@ -34,13 +36,41 @@
MAX_OMCI_REQUEST_AGE = 60 # Seconds
MAX_OMCI_TX_ID = 0xFFFF # 2 Octets max
+CONNECTED_KEY = 'connected'
+TX_REQUEST_KEY = 'tx-request'
+RX_RESPONSE_KEY = 'rx-response'
+
+
+class OmciCCRxEvents(IntEnum):
+ AVC_Notification = 0,
+ MIB_Upload = 1,
+ MIB_Upload_Next = 2,
+ Create = 3,
+ Delete = 4,
+ Set = 5,
+ Alarm_Notification = 6,
+ Test_Result = 7,
+ MIB_Reset = 8,
+ Connectivity = 9
+
+
# abbreviations
OP = EntityOperations
+RxEvent = OmciCCRxEvents
class OMCI_CC(object):
""" Handle OMCI Communication Channel specifics for Adtran ONUs"""
+ _frame_to_event_type = {
+ OmciMibResetResponse.message_id: RxEvent.MIB_Reset,
+ OmciMibUploadResponse.message_id: RxEvent.MIB_Upload,
+ OmciMibUploadNextResponse.message_id: RxEvent.MIB_Upload_Next,
+ OmciCreateResponse.message_id: RxEvent.Create,
+ OmciDeleteResponse.message_id: RxEvent.Delete,
+ OmciSetResponse.message_id: RxEvent.Set,
+ }
+
def __init__(self, adapter_agent, device_id, me_map=None,
alarm_queue_limit=_MAX_INCOMING_ALARM_MESSAGES,
avc_queue_limit=_MAX_INCOMING_ALARM_MESSAGES,
@@ -72,6 +102,8 @@
self._reply_max = 0 # Longest successful tx -> rx
self._reply_sum = 0.0 # Total seconds for successful tx->rx (float for average)
+ self.event_bus = EventBusClient()
+
# If a list of custom ME Entities classes were provided, insert them into
# main class_id to entity map.
# TODO: If this class becomes hidden from the ONU DA, move this to the OMCI State Machine runner
@@ -79,6 +111,19 @@
def __str__(self):
return "OMCISupport: {}".format(self._device_id)
+ @staticmethod
+ def event_bus_topic(device_id, event):
+ """
+ Get the topic name for a given event Frame Type
+ :param device_id: (str) ONU Device ID
+ :param event: (OmciCCRxEvents) Type of event
+ :return: (str) Topic string
+ """
+ assert event in OmciCCRxEvents, \
+ 'Event {} is not an OMCI-CC Rx Event'.format(event.Name)
+
+ return 'omci-rx:{}:{}'.format(device_id, event.name)
+
@property
def enabled(self):
return self._enabled
@@ -199,9 +244,6 @@
Start the OMCI Communications Channel
"""
assert self._enabled, 'Start should only be called if enabled'
- #
- # TODO: Perform any other common startup tasks here
- #
self.flush()
device = self._adapter_agent.get_device(self._device_id)
@@ -212,9 +254,6 @@
Stop the OMCI Communications Channel
"""
assert not self._enabled, 'Stop should only be called if disabled'
- #
- # TODO: Perform common shutdown tasks here
- #
self.flush()
self._proxy_address = None
@@ -225,7 +264,6 @@
def _receive_onu_message(self, rx_frame):
""" Autonomously generated ONU frame Rx handler"""
- # TODO: Best way to handle autonomously generated ONU frames may be pub/sub method
from twisted.internet.defer import QueueOverflow
self.log.debug('rx-onu-frame', frame_type=type(rx_frame),
frame=hexify(str(rx_frame)))
@@ -235,7 +273,12 @@
self._rx_onu_frames += 1
+ msg = {TX_REQUEST_KEY: None,
+ RX_RESPONSE_KEY: rx_frame}
+
if msg_type == EntityOperations.AlarmNotification.value:
+ topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Alarm_Notification)
+ reactor.callLater(0, self.event_bus.publish, topic, msg)
try:
self._alarm_queue.put((rx_frame, arrow.utcnow().float_timestamp))
@@ -244,15 +287,26 @@
self.log.warn('onu-rx-alarm-overflow', cnt=self._rx_alarm_overflow)
elif msg_type == EntityOperations.AttributeValueChange.value:
+ topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.AVC_Notification)
+ reactor.callLater(0, self.event_bus.publish, topic, msg)
try:
self._alarm_queue.put((rx_frame, arrow.utcnow().float_timestamp))
except QueueOverflow:
self._rx_avc_overflow += 1
self.log.warn('onu-rx-avc-overflow', cnt=self._rx_avc_overflow)
+
+ elif msg_type == EntityOperations.TestResult.value:
+ topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Test_Result)
+ reactor.callLater(0, self.event_bus.publish, topic, msg)
+ try:
+ self._test_results_queue.put((rx_frame, arrow.utcnow().float_timestamp))
+
+ except QueueOverflow:
+ self.log.warn('onu-rx-test-results-overflow')
+
else:
# TODO: Need to add test results message support
-
self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
self._rx_onu_discards += 1
@@ -282,6 +336,11 @@
if rx_tid == 0:
return self._receive_onu_message(rx_frame)
+ # Previously unreachable if this is the very first Rx or we
+ # have been running consecutive errors
+ if self._rx_frames == 0 or self._consecutive_errors != 0:
+ reactor.callLater(0, self._publish_connectivity_event, True)
+
self._rx_frames += 1
self._consecutive_errors = 0
@@ -299,7 +358,7 @@
omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
try:
- (ts, d, _, _) = self._requests.pop(rx_tid)
+ (ts, d, tx_frame, _) = self._requests.pop(rx_tid)
ts_diff = now - arrow.Arrow.utcfromtimestamp(ts)
secs = ts_diff.total_seconds()
@@ -311,8 +370,6 @@
if secs > self._reply_max:
self._reply_max = secs
- # TODO: Could also validate response type based on request action
-
except KeyError as e:
# Possible late Rx on a message that timed-out
self._rx_unknown_tid += 1
@@ -325,11 +382,44 @@
return d.errback(failure.Failure(e))
return
- d.callback(rx_frame)
+ # Notify sender of completed request
+ reactor.callLater(0, d.callback, rx_frame)
+
+ # Publish Rx event to listeners in a different task
+ reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
except Exception as e:
self.log.exception('rx-msg', e=e)
+ def _publish_rx_frame(self, tx_frame, rx_frame):
+ """
+ Notify listeners of successful response frame
+ :param tx_frame: (OmciFrame) Original request frame
+ :param rx_frame: (OmciFrame) Response frame
+ """
+ if self._enabled and isinstance(rx_frame, OmciFrame):
+ frame_type = rx_frame.fields['omci_message'].message_id
+ event_type = OMCI_CC._frame_to_event_type.get(frame_type)
+
+ if event_type is not None:
+ topic = OMCI_CC.event_bus_topic(self._device_id, event_type)
+ msg = {TX_REQUEST_KEY: tx_frame,
+ RX_RESPONSE_KEY: rx_frame}
+
+ self.event_bus.publish(topic=topic, msg=msg)
+
+ def _publish_connectivity_event(self, connected):
+ """
+ Notify listeners of Rx/Tx connectivity over OMCI
+ :param connected: (bool) True if connectivity transitioned from unreachable
+ to reachable
+ """
+ if self._enabled:
+ topic = OMCI_CC.event_bus_topic(self._device_id,
+ RxEvent.Connectivity)
+ msg = {CONNECTED_KEY: connected}
+ self.event_bus.publish(topic=topic, msg=msg)
+
def flush(self, max_age=0):
limit = arrow.utcnow().float_timestamp - max_age
old = [tid for tid, (ts, _, _, _) in self._requests.iteritems()
@@ -379,6 +469,10 @@
value.trap(CancelledError)
self._rx_timeouts += 1
self._consecutive_errors += 1
+
+ if self._consecutive_errors == 1:
+ reactor.callLater(0, self._publish_connectivity_event, False)
+
self.log.info('timeout', tx_id=tx_tid, timeout=timeout)
value = failure.Failure(TimeoutError(timeout, "Deferred"))
@@ -391,34 +485,8 @@
:param rx_frame: (OmciFrame) OMCI response frame with matching TID
:return: (OmciFrame) OMCI response frame with matching TID
"""
- #
- # TODO: Here we could update the MIB database if we did a set/create/delete
- # or perhaps a verify if a GET. Also could increment mib counter
- #
- # TODO: A better way to perform this in VOLTHA v1.3 would be to provide
- # a pub/sub capability for external users/tasks to monitor responses
- # that could optionally take a filter. This would allow a MIB-Sync
- # task to easily watch all AVC notifications as well as Set/Create/Delete
- # operations and keep them serialized. It may also be a better/easier
- # way to handle things if we containerize OpenOMCI.
- #
- try:
- if isinstance(rx_frame.omci_message, OmciGetResponse):
- pass # TODO: Implement MIB check or remove
-
- elif isinstance(rx_frame.omci_message, OmciSetResponse):
- pass # TODO: Implement MIB update
-
- elif isinstance(rx_frame.omci_message, OmciCreateResponse):
- pass # TODO: Implement MIB update
-
- elif isinstance(rx_frame.omci_message, OmciDeleteResponse):
- pass # TODO: Implement MIB update
-
- except Exception as e:
- self.log.exception('omci-message', e=e)
- raise
-
+ # At this point, no additional processing is required
+ # Continue with Rx Success callbacks.
return rx_frame
def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT):
@@ -478,6 +546,10 @@
except Exception as e:
self._tx_errors += 1
self._consecutive_errors += 1
+
+ if self._consecutive_errors == 1:
+ reactor.callLater(0, self._publish_connectivity_event, False)
+
self.log.exception('send-omci', e=e)
return fail(result=failure.Failure(e))
diff --git a/voltha/extensions/omci/omci_me.py b/voltha/extensions/omci/omci_me.py
index 427c1e7..8fff922 100644
--- a/voltha/extensions/omci/omci_me.py
+++ b/voltha/extensions/omci/omci_me.py
@@ -564,7 +564,7 @@
:param alloc_id: (int) This attribute links the T-CONT with the alloc-ID
assigned by the OLT in the assign_alloc-ID PLOAM
- message (0..0xFFF)
+ message (0..0xFFF) or 0xFFFF to mark as free
:param policy: (int) This attribute indicates the T-CONT's traffic scheduling
policy. Valid values:
@@ -576,8 +576,8 @@
self.check_type(alloc_id, (int, type(None)))
self.check_type(policy, (int, type(None)))
- if alloc_id is not None and not 0 <= alloc_id <= 0xFFF:
- raise ValueError('alloc_id should be 0..0xFFF')
+ if alloc_id is not None and not (0 <= alloc_id <= 0xFFF or alloc_id == 0xFFFF):
+ raise ValueError('alloc_id should be 0..0xFFF or 0xFFFF to mark it as free')
if policy is not None and not 0 <= policy <= 2:
raise ValueError('policy should be 0..2')
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
new file mode 100644
index 0000000..134d3ec
--- /dev/null
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -0,0 +1,247 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import structlog
+from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
+import voltha.extensions.omci.omci_entities as omci_entities
+from voltha.extensions.omci.omci_cc import OMCI_CC
+from common.event_bus import EventBusClient
+from voltha.extensions.omci.tasks.task_runner import TaskRunner
+
+from twisted.internet import reactor
+from enum import IntEnum
+
+OP = EntityOperations
+RC = ReasonCodes
+
+IN_SYNC_KEY = 'in-sync'
+LAST_IN_SYNC_KEY = 'last-in-sync-time'
+
+
+class OnuDeviceEvents(IntEnum):
+ # Events of interest to Device Adapters and OpenOMCI State Machines
+ DeviceStatusEvent = 0 # OnuDeviceEntry running status changed
+ MibDatabaseSyncEvent = 1 # MIB database sync changed
+ # TODO: Add other events here as needed
+
+
+class OnuDeviceEntry(object):
+ """
+ An ONU Device entry in the MIB
+ """
+ def __init__(self, omci_agent, device_id, adapter_agent, custom_me_map,
+ mib_synchronizer_info, mib_db):
+ """
+ Class initializer
+
+ :param device_id: (str) ONU Device ID
+ :param custom_me_map: (dict) Additional/updated ME to add to class map
+ """
+ self.log = structlog.get_logger(device_id=device_id)
+
+ self._started = False
+ self._omci_agent = omci_agent # OMCI AdapterAgent
+ self._device_id = device_id # ONU Device ID
+ self._runner = TaskRunner(device_id) # OMCI_CC Task runner
+ self._deferred = None
+
+ try:
+ self._mib_db_in_sync = False
+ self.mib_sync = mib_synchronizer_info['state-machine'](self._omci_agent,
+ device_id,
+ mib_synchronizer_info['tasks'],
+ mib_db)
+ except Exception as e:
+ self.log.exception('mib-sync-create-failed', e=e)
+ raise
+
+ self._state_machines = [self.mib_sync]
+ self._custom_me_map = custom_me_map
+ self._me_map = omci_entities.entity_id_to_class_map.copy()
+
+ if custom_me_map is not None:
+ self._me_map.update(custom_me_map)
+
+ self.event_bus = EventBusClient()
+
+ # Create OMCI communications channel
+ self._omci_cc = OMCI_CC(adapter_agent, self.device_id, self._me_map)
+
+ @staticmethod
+ def event_bus_topic(device_id, event):
+ """
+ Get the topic name for a given event for this ONU Device
+ :param device_id: (str) ONU Device ID
+ :param event: (OnuDeviceEvents) Type of event
+ :return: (str) Topic string
+ """
+ assert event in OnuDeviceEvents, \
+ 'Event {} is not an ONU Device Event'.format(event.Name)
+ return 'omci-device:{}:{}'.format(device_id, event.name)
+
+ @property
+ def device_id(self):
+ return self._device_id
+
+ @property
+ def omci_cc(self):
+ return self._omci_cc
+
+ @property
+ def task_runner(self):
+ return self._runner
+
+ @property
+ def mib_synchronizer(self):
+ return self.mib_sync
+
+ @property
+ def active(self):
+ """
+ Is the ONU device currently active/running
+ """
+ return self._started
+
+ @property
+ def custom_me_map(self):
+ """ Custom Managed Entity Map for this device"""
+ return self._custom_me_map
+
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ @property
+ def mib_db_in_sync(self):
+ return self._mib_db_in_sync
+
+ @mib_db_in_sync.setter
+ def mib_db_in_sync(self, value):
+ if self._mib_db_in_sync != value:
+ # Save value
+ self._mib_db_in_sync = value
+
+ # Notify any event listeners
+
+ topic = OnuDeviceEntry.event_bus_topic(self.device_id,
+ OnuDeviceEvents.MibDatabaseSyncEvent)
+ msg = {
+ IN_SYNC_KEY: self._mib_db_in_sync,
+ LAST_IN_SYNC_KEY: self.mib_synchronizer.last_mib_db_sync
+ }
+ self.event_bus.publish(topic=topic, msg=msg)
+
+ def start(self):
+ if self._started:
+ return
+
+ self._started = True
+ self._omci_cc.enabled = True
+ self._runner.start()
+
+ # Start MIB Sync and other state machines. Start 'later' so that any
+ # ONU Device, OMCI DB, OMCI Agent, and others are fully started before
+ # performing the start.
+
+ def start_state_machines(machines):
+ for sm in machines:
+ sm.start()
+
+ self._deferred = reactor.callLater(0, start_state_machines,
+ self._state_machines)
+ # Notify any event listeners
+ self._publish_device_status_event()
+
+ def stop(self):
+ if not self._started:
+ return
+
+ self._started = False
+ self._cancel_deferred()
+ self._omci_cc.enabled = False
+
+ # Halt MIB Sync and other state machines
+ for sm in self._state_machines:
+ sm.stop()
+
+ # Stop task runner
+ self._runner.stop()
+
+ # Notify any event listeners
+ self._publish_device_status_event()
+
+ def _publish_device_status_event(self):
+ """
+ Publish the ONU Device start/start status.
+ """
+ topic = OnuDeviceEntry.event_bus_topic(self.device_id,
+ OnuDeviceEvents.DeviceStatusEvent)
+ msg = {'active': self._started}
+ self.event_bus.publish(topic=topic, msg=msg)
+
+ def delete(self):
+ self.stop()
+
+ def query_mib(self, class_id=None, instance_id=None, attributes=None):
+ """
+ Get MIB database information.
+
+ This method can be used to request information from the database to the detailed
+ level requested
+
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+ :param attributes: (list or str) Managed Entity instance's attributes
+
+ :return: (dict) The value(s) requested. If class/inst/attribute is
+ not found, an empty dictionary is returned
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('query', class_id=class_id, instance_id=instance_id,
+ attributes=attributes)
+
+ return self.mib_synchronizer.query_mib(class_id=class_id, instance_id=instance_id,
+ attributes=attributes)
+
+ def query_mib_single_attribute(self, class_id, instance_id, attribute):
+ """
+ Get MIB database information for a single specific attribute
+
+ This method can be used to request information from the database to the detailed
+ level requested
+
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+ :param attribute: (str) Managed Entity instance's attribute
+
+ :return: (varies) The value requested. If class/inst/attribute is
+ not found, None is returned
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('query-single', class_id=class_id,
+ instance_id=instance_id, attributes=attribute)
+ assert isinstance(attribute, basestring), \
+ 'Only a single attribute value can be retrieved'
+
+ entry = self.mib_synchronizer.query_mib(class_id=class_id,
+ instance_id=instance_id,
+ attributes=attribute)
+
+ return entry[attribute] if attribute in entry else None
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
new file mode 100644
index 0000000..801ea03
--- /dev/null
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -0,0 +1,189 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from voltha.extensions.omci.database.mib_db_dict import MibDbVolatileDict
+from voltha.extensions.omci.state_machines.mib_sync import MibSynchronizer
+from voltha.extensions.omci.tasks.mib_upload import MibUploadTask
+from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
+from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
+
+from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry
+
+OpenOmciAgentDefaults = {
+ 'mib-synchronizer': {
+ 'state-machine': MibSynchronizer, # Implements the MIB synchronization state machine
+ 'database': MibDbVolatileDict, # Implements ME MIB database
+ 'tasks': {
+ 'mib-upload': MibUploadTask,
+ 'get-mds': GetMdsTask,
+ 'mib-audit': GetMdsTask,
+ 'mib-resync': MibResyncTask,
+ 'mib-reconcile': None # TODO: post-v1.3.0 (Reconcile out-of-sync MIB DB)
+ }
+ },
+ # TODO: Alarm-synchronizer is a stretch goal for Voltha 1.3.0
+ # 'alarm-syncronizer': {
+ # 'state-machine': AlarmSynchronizer, # Implements the MIB synchronization state machine
+ # 'database': AlarmDb, # For any State storage needs
+ # 'tasks': {
+ # 'task-1': needToWrite,
+ # 'task-2': needToWrite,
+ # }
+ # }
+}
+
+
+class OpenOMCIAgent(object):
+ """
+ OpenOMCI for VOLTHA
+
+ This will become the primary interface into OpenOMCI for ONU Device Adapters
+ in VOLTHA v1.3 sprint 3 time frame.
+ """
+ def __init__(self, core, support_classes=OpenOmciAgentDefaults):
+ """
+ Class initializer
+
+ :param core: (VolthaCore) VOLTHA Core
+ :param support_classes: (Dict) Classes to support OMCI
+ """
+ self.log = structlog.get_logger()
+ self._core = core
+ self._started = False
+ self._devices = dict() # device-id -> DeviceEntry
+
+ # MIB Synchronization
+ self._mib_db = None
+ self._mib_synchronizer_info = support_classes['mib-synchronizer']
+ self._mib_database_cls = self._mib_synchronizer_info['database']
+
+ # Alarm Synchronization # TODO: Stretch goal for VOLTHA v1.3.0
+ # self._alarm_db = None
+ # self._alarm_synchronizer_info = support_classes['alarm-synchronizer']
+ # self._alarm_database_cls = self._alarm_synchronizer_info['database']
+
+ def start(self):
+ """
+ Start OpenOMCI
+ """
+ if self._started:
+ return
+
+ self.log.debug('start')
+ self._started = True
+
+ try:
+ # Create all databases as needed. This should be done before
+ # State machines are started for the first time
+
+ if self._mib_db is None:
+ self._mib_db = self._mib_database_cls(self)
+
+ # TODO Alarm DB
+
+ # Start/restore databases
+
+ self._mib_db.start()
+
+ for device in self._devices.itervalues():
+ device.start()
+
+ except Exception as e:
+ self.log.exception('startup', e=e)
+
+ def stop(self):
+ """
+ Shutdown OpenOMCI
+ """
+ if not self._started:
+ return
+
+ self.log.debug('stop')
+ self._started = False
+
+ # ONUs OMCI shutdown
+
+ for device in self._devices.itervalues():
+ device.stop()
+
+ # DB shutdown
+ self._mib_db.stop()
+
+ def add_device(self, device_id, adapter_agent, custom_me_map=None):
+ """
+ Add a new ONU to be managed.
+
+ To provide vendor-specific or custom Managed Entities, create your own Entity
+ ID to class mapping dictionary.
+
+ Since ONU devices can be added at any time (even during Device Handler
+ startup), the ONU device handler is responsible for calling start()/stop()
+ for this object.
+
+ :param device_id: (str) Device ID of ONU to add
+ :param adapter_agent: (AdapterAgent) Adapter agent for ONU
+ :param custom_me_map: (dict) Additional/updated ME to add to class map
+
+ :return: (OnuDeviceEntry) The ONU device
+ """
+ self.log.debug('add-device', device_id=device_id)
+
+ device = self._devices.get(device_id)
+
+ if device is None:
+ device = OnuDeviceEntry(self, device_id, adapter_agent, custom_me_map,
+ self._mib_synchronizer_info, self._mib_db)
+
+ self._devices[device_id] = device
+
+ return device
+
+ def remove_device(self, device_id, cleanup=False):
+ """
+ Remove a managed ONU
+
+ :param device_id: (str) Device ID of ONU to remove
+ :param cleanup: (bool) If true, scrub any state related information
+ """
+ self.log.debug('remove-device', device_id=device_id, cleanup=cleanup)
+
+ device = self._devices.get(device_id)
+
+ if device is not None:
+ device.stop()
+
+ if cleanup:
+ del self._devices[device_id]
+
+ def device_ids(self):
+ """
+ Get an immutable set of device IDs managed by this OpenOMCI instance
+
+ :return: (frozenset) Set of device IDs (str)
+ """
+ return frozenset(self._devices.keys())
+
+ def get_device(self, device_id):
+ """
+ Get ONU device entry. For external (non-OpenOMCI users) the ONU Device
+ returned should be used for read-only activity.
+
+ :param device_id: (str) ONU Device ID
+
+ :return: (OnuDeviceEntry) ONU Device entry
+ :raises KeyError: If device does not exist
+ """
+ return self._devices[device_id]
diff --git a/voltha/extensions/omci/state_machines/__init__.py b/voltha/extensions/omci/state_machines/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/extensions/omci/state_machines/__init__.py
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
new file mode 100644
index 0000000..edcd7f4
--- /dev/null
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -0,0 +1,662 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from datetime import datetime, timedelta
+from transitions import Machine
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
+from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
+ RX_RESPONSE_KEY
+from voltha.extensions.omci.omci_entities import OntData
+from common.event_bus import EventBusClient
+
+
+RxEvent = OmciCCRxEvents
+OP = EntityOperations
+RC = ReasonCodes
+
+
+class MibSynchronizer(object):
+ """
+ OpenOMCI MIB Synchronizer state machine
+ """
+ DEFAULT_STATES = ['disabled', 'starting', 'uploading', 'examining_mds',
+ 'in_sync', 'out_of_sync', 'auditing', 'resynchronizing']
+
+ DEFAULT_TRANSITIONS = [
+ {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
+
+ {'trigger': 'upload_mib', 'source': 'starting', 'dest': 'uploading'},
+ {'trigger': 'examine_mds', 'source': 'starting', 'dest': 'examining_mds'},
+
+ {'trigger': 'success', 'source': 'uploading', 'dest': 'in_sync'},
+ {'trigger': 'timeout', 'source': 'uploading', 'dest': 'starting'},
+
+ {'trigger': 'success', 'source': 'examining_mds', 'dest': 'in_sync'},
+ {'trigger': 'timeout', 'source': 'examining_mds', 'dest': 'starting'},
+ {'trigger': 'mismatch', 'source': 'examining_mds', 'dest': 'uploading'},
+
+ {'trigger': 'audit_mib', 'source': 'in_sync', 'dest': 'auditing'},
+ {'trigger': 'audit_mib', 'source': 'out_of_sync', 'dest': 'auditing'},
+
+ {'trigger': 'success', 'source': 'auditing', 'dest': 'in_sync'},
+ {'trigger': 'timeout', 'source': 'auditing', 'dest': 'starting'},
+ {'trigger': 'mismatch', 'source': 'auditing', 'dest': 'resynchronizing'},
+ {'trigger': 'force_resync', 'source': 'auditing', 'dest': 'resynchronizing'},
+
+ {'trigger': 'success', 'source': 'resynchronizing', 'dest': 'in_sync'},
+ {'trigger': 'timeout', 'source': 'resynchronizing', 'dest': 'out_of_sync'},
+
+ # Do wildcard 'stop' trigger last so it covers all previous states
+ {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
+ ]
+ DEFAULT_TIMEOUT_RETRY = 5 # Seconds to delay after task failure/timeout
+ DEFAULT_AUDIT_DELAY = 15 # Periodic tick to audit the MIB Data Sync
+ DEFAULT_RESYNC_DELAY = 300 # Periodically force a resync
+
+ def __init__(self, agent, device_id, mib_sync_tasks, db, states=DEFAULT_STATES,
+ transitions=DEFAULT_TRANSITIONS,
+ initial_state='disabled',
+ timeout_delay=DEFAULT_TIMEOUT_RETRY,
+ audit_delay=DEFAULT_AUDIT_DELAY,
+ resync_delay=DEFAULT_RESYNC_DELAY):
+ """
+ Class initialization
+
+ :param agent: (OpenOmciAgent) Agent
+ :param device_id: (str) ONU Device ID
+ :param db: (MibDbVolatileDict) MIB Database
+ :param mib_sync_tasks: (dict) Tasks to run
+ :param states: (list) List of valid states
+ :param transitions: (dict) Dictionary of triggers and state changes
+ :param initial_state: (str) Initial state machine state
+ :param timeout_delay: (int/float) Number of seconds after a timeout to attempt
+ a retry (goes back to starting state)
+ :param audit_delay: (int) Seconds between MIB audits while in sync. Set to
+ zero to disable audit. An operator can request
+ an audit manually by calling 'self.audit_mib'
+ :param resync_delay: (int) Seconds in sync before performing a forced MIB
+ resynchronization
+ """
+ self.log = structlog.get_logger(device_id=device_id)
+
+ self._agent = agent
+ self._device_id = device_id
+ self._device = None
+ self._database = db
+ self._timeout_delay = timeout_delay
+ self._audit_delay = audit_delay
+ self._resync_delay = resync_delay
+
+ self._upload_task = mib_sync_tasks['mib-upload']
+ self._get_mds_task = mib_sync_tasks['get-mds']
+ self._audit_task = mib_sync_tasks['mib-audit']
+ self._resync_task = mib_sync_tasks['mib-resync']
+
+ self._deferred = None
+ self._current_task = None # TODO: Support multiple running tasks after v.1.3.0 release
+ self._task_deferred = None
+ self._mib_data_sync = db.get_mib_data_sync(device_id) or 0
+ self._last_mib_db_sync_value = db.get_last_sync(device_id)
+
+ self._event_bus = EventBusClient()
+ self._subscriptions = { # RxEvent.enum -> Subscription Object
+ RxEvent.MIB_Reset: None,
+ RxEvent.AVC_Notification: None,
+ RxEvent.MIB_Upload: None,
+ RxEvent.MIB_Upload_Next: None,
+ RxEvent.Create: None,
+ RxEvent.Delete: None,
+ RxEvent.Set: None
+ }
+ self._sub_mapping = {
+ RxEvent.MIB_Reset: self.on_mib_reset_response,
+ RxEvent.AVC_Notification: self.on_avc_notification,
+ RxEvent.MIB_Upload: self.on_mib_upload_response,
+ RxEvent.MIB_Upload_Next: self.on_mib_upload_next_response,
+ RxEvent.Create: self.on_create_response,
+ RxEvent.Delete: self.on_delete_response,
+ RxEvent.Set: self.on_set_response
+ }
+ # Statistics and attributes
+ # TODO: add any others if it will support problem diagnosis
+
+ # Set up state machine to manage states
+ self.machine = Machine(model=self, states=states,
+ transitions=transitions,
+ initial=initial_state,
+ queued=True,
+ name='{}'.format(self.__class__.__name__))
+
+ def _cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._task_deferred = self._task_deferred, None
+
+ for d in [d1, d1]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def __str__(self):
+ return 'MIBSynchronizer: Device ID: {}, State:{}'.format(self._device_id, self.state)
+
+ @property
+ def device_id(self):
+ return self._device_id
+
+ @property
+ def mib_data_sync(self):
+ return self._mib_data_sync
+
+ def increment_mib_data_sync(self):
+ self._mib_data_sync += 1
+ if self._mib_data_sync > 255:
+ self._mib_data_sync = 0
+
+ if self._database is not None:
+ self._database.save_mib_data_sync(self._device_id,
+ self._mib_data_sync)
+
+ @property
+ def last_mib_db_sync(self):
+ return self._last_mib_db_sync_value
+
+ @last_mib_db_sync.setter
+ def last_mib_db_sync(self, value):
+ self._last_mib_db_sync_value = value
+ if self._database is not None:
+ self._database.save_last_sync(self.device_id, value)
+
+ @property
+ def is_new_onu(self):
+ """
+ Is this a new ONU (has never completed MIB synchronization)
+ :return: (bool) True if this ONU should be considered new
+ """
+ return self.last_mib_db_sync is None
+
+ def on_enter_disabled(self):
+ """
+ State machine is being stopped
+ """
+ self.log.debug('state-transition')
+
+ self._cancel_deferred()
+ if self._device is not None:
+ self._device.mib_db_in_sync = False
+
+ task, self._current_task = self._current_task, None
+ if task is not None:
+ task.stop()
+
+ # Drop Response and Autonomous notification subscriptions
+ for event, sub in self._subscriptions.iteritems():
+ if sub is not None:
+ self._subscriptions[event] = None
+ self._device.omci_cc.event_bus.unsubscribe(sub)
+
+ # TODO: Stop and remove any currently running or scheduled tasks
+ # TODO: Anything else?
+
+ def on_enter_starting(self):
+ """
+ Determine ONU status and start MIB Synchronization tasks
+ """
+ self._device = self._agent.get_device(self._device_id)
+ self.log.debug('state-transition', new_onu=self.is_new_onu)
+
+ # Set up Response and Autonomous notification subscriptions
+ try:
+ for event, sub in self._sub_mapping.iteritems():
+ if self._subscriptions[event] is None:
+ self._subscriptions[event] = \
+ self._device.omci_cc.event_bus.subscribe(
+ topic=OMCI_CC.event_bus_topic(self._device_id, event),
+ callback=sub)
+
+ except Exception as e:
+ self.log.exception('subscription-setup', e=e)
+
+ # Determine if this ONU has ever synchronized
+ if self.is_new_onu:
+ # Start full MIB upload
+ self._deferred = reactor.callLater(0, self.upload_mib)
+
+ else:
+ # Examine the MIB Data Sync
+ self._deferred = reactor.callLater(0, self.examine_mds)
+
+ def on_enter_uploading(self):
+ """
+ Begin full MIB data sync, starting with a MIB RESET
+ """
+ def success(results):
+ self.log.info('mib-upload-success: {}'.format(results))
+ self._current_task = None
+ self._deferred = reactor.callLater(0, self.success)
+
+ def failure(reason):
+ self.log.info('mib-upload-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ self._device.mib_db_in_sync = False
+ self._current_task = self._upload_task(self._agent, self._device_id)
+
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_examining_mds(self):
+ """
+ Create a simple task to fetch the MIB Data Sync value and
+ determine if the ONU value matches what is in the MIB database
+ """
+ self._mib_data_sync = self._database.get_mib_data_sync(self._device_id) or 0
+
+ def success(onu_mds_value):
+ self.log.info('examine-mds-success: {}'.format(onu_mds_value))
+ self._current_task = None
+
+ # Examine MDS value
+ if self._mib_data_sync == onu_mds_value:
+ self._deferred = reactor.callLater(0, self.success)
+ else:
+ self._deferred = reactor.callLater(0, self.mismatch)
+
+ def failure(reason):
+ self.log.info('examine-mds-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ self._device.mib_db_in_sync = False
+ self._current_task = self._get_mds_task(self._agent, self._device_id)
+
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_in_sync(self):
+ """
+ Schedule a tick to occur to in the future to request an audit
+ """
+ self.log.debug('state-transition', audit_delay=self._audit_delay)
+ self.last_mib_db_sync = datetime.utcnow()
+ self._device.mib_db_in_sync = True
+
+ if self._audit_delay > 0:
+ self._deferred = reactor.callLater(self._audit_delay, self.audit_mib)
+
+ def on_enter_out_of_sync(self):
+ """
+ Schedule a tick to occur to in the future to request an audit
+ """
+ self.log.debug('state-transition', audit_delay=self._audit_delay)
+ self._device.mib_db_in_sync = False
+
+ if self._audit_delay > 0:
+ self._deferred = reactor.callLater(self._audit_delay, self.audit_mib)
+
+ def on_enter_auditing(self):
+ """
+ Perform a MIB Audit. If our last MIB resync was too long in the
+ past, perform a resynchronization anyway
+ """
+ next_resync = self.last_mib_db_sync + timedelta(seconds=self._resync_delay)\
+ if self.last_mib_db_sync is not None else datetime.utcnow()
+
+ self.log.debug('state-transition', next_resync=next_resync)
+
+ if datetime.utcnow() >= next_resync:
+ self._deferred = reactor.callLater(0, self.force_resync)
+ else:
+ def success(onu_mds_value):
+ self.log.debug('get-mds-success: {}'.format(onu_mds_value))
+ self._current_task = None
+ # Examine MDS value
+ if self._mib_data_sync == onu_mds_value:
+ self._deferred = reactor.callLater(0, self.success)
+ else:
+ self._device.mib_db_in_sync = False
+ self._deferred = reactor.callLater(0, self.mismatch)
+
+ def failure(reason):
+ self.log.info('get-mds-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ self._current_task = self._audit_task(self._agent, self._device_id)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_resynchronizing(self):
+ """
+ Perform a resynchronization of the MIB database
+ """
+ def success(results):
+ self.log.info('resync-success: {}'.format(results))
+ self._current_task = None
+ self._deferred = reactor.callLater(0, self.success)
+
+ def failure(reason):
+ self.log.info('resync-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ self._current_task = self._resync_task(self._agent, self._device_id)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_mib_reset_response(self, _topic, msg):
+ """
+ Called upon receipt of a MIB Reset Response for this ONU
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.info('on-mib-reset-response', state=self.state)
+ try:
+ response = msg[RX_RESPONSE_KEY]
+
+ # Check if expected in current mib_sync state
+ if self.state != 'uploading' or self._subscriptions[RxEvent.MIB_Reset] is None:
+ self.log.error('rx-in-invalid-state', state=self.state)
+
+ else:
+ now = datetime.utcnow()
+
+ if not isinstance(response, OmciFrame):
+ raise TypeError('Response should be an OmciFrame')
+
+ omci_msg = response.fields['omci_message'].fields
+ status = omci_msg['success_code']
+
+ assert status == RC.Success, 'Unexpected MIB reset response status: {}'. \
+ format(status)
+
+ self._device.mib_db_in_sync = False
+ self._mib_data_sync = 0
+ self._device._modified = now
+ self._database.on_mib_reset(self._device_id)
+
+ except KeyError:
+ pass # NOP
+
+ def on_avc_notification(self, _topic, msg):
+ """
+ Process an Attribute Value Change Notification
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.info('on-avc-notification', state=self.state)
+
+ if self._subscriptions[RxEvent.AVC_Notification]:
+ try:
+ notification = msg[RX_RESPONSE_KEY]
+
+ if self.state == 'disabled':
+ self.log.error('rx-in-invalid-state', state=self.state)
+
+ elif self.state != 'uploading':
+ # Inspect the notification
+ omci_msg = notification.fields['omci_message'].fields
+ class_id = omci_msg['entity_class']
+ instance_id = omci_msg['entity_id']
+ data = omci_msg['data']
+ attributes = [data.keys()]
+
+ # Look up ME Instance in Database. Not-found can occur if a MIB
+ # reset has occurred
+ info = self._database.query(self.device_id, class_id, instance_id, attributes)
+ # TODO: Add old/new info to log message
+ self.log.debug('avc-change', class_id=class_id, instance_id=instance_id)
+
+ # Save the changed data to the MIB.
+ changed = self._database.set(self.device_id, class_id, instance_id, data)
+
+ if changed:
+ # Autonomous creation and deletion of managed entities do not
+ # result in an incrwment of the MIB data sync value. However,
+ # AVC's in response to a change by the Operater do incur an
+ # increment of the MIB Data Sync
+ pass
+
+ except KeyError:
+ pass # NOP
+
+ def on_mib_upload_response(self, _topic, msg):
+ """
+ Process a Set response
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.debug('on-mib-upload-next-response', state=self.state)
+
+ if self._subscriptions[RxEvent.MIB_Upload]:
+ # Check if expected in current mib_sync state
+ if self.state == 'resynchronizing':
+ # The resync task handles this
+ return
+
+ if self.state != 'uploading':
+ self.log.error('rx-in-invalid-state', state=self.state)
+
+ def on_mib_upload_next_response(self, _topic, msg):
+ """
+ Process a Set response
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.debug('on-mib-upload-next-response', state=self.state)
+
+ if self._subscriptions[RxEvent.MIB_Upload_Next]:
+ try:
+ if self.state == 'resynchronizing':
+ # The resync task handles this
+ return
+
+ # Check if expected in current mib_sync state
+ if self.state != 'uploading':
+ self.log.error('rx-in-invalid-state', state=self.state)
+
+ else:
+ response = msg[RX_RESPONSE_KEY]
+
+ # Extract entity instance information
+ omci_msg = response.fields['omci_message'].fields
+
+ class_id = omci_msg['object_entity_class']
+ entity_id = omci_msg['object_entity_id']
+
+ # Filter out the 'mib_data_sync' from the database. We save that at
+ # the device level and do not want it showing up during a re-sync
+ # during data compares
+
+ if class_id == OntData.class_id:
+ return
+
+ attributes = {k: v for k, v in omci_msg['object_data'].items()}
+
+ # Save to the database
+ self._database.set(self._device_id, class_id, entity_id, attributes)
+
+ except KeyError:
+ pass # NOP
+ except Exception as e:
+ self.log.exception('upload-next', e=e)
+
+ def on_create_response(self, _topic, msg):
+ """
+ Process a Set response
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.info('on-create-response', state=self.state)
+
+ if self._subscriptions[RxEvent.Create]:
+ if self.state in ['disabled', 'uploading']:
+ self.log.error('rx-in-invalid-state', state=self.state)
+ return
+ try:
+ request = msg[TX_REQUEST_KEY]
+ response = msg[RX_RESPONSE_KEY]
+
+ if response.fields['omci_message'].fields['success_code'] != RC.Success:
+ # TODO: Support offline ONTs in post VOLTHA v1.3.0
+ omci_msg = response.fields['omci_message']
+ self.log.warn('set-response-failure',
+ class_id=omci_msg.fields['entity_class'],
+ instance_id=omci_msg.fields['entity_id'],
+ status=omci_msg.fields['status_code'],
+ status_text=self._status_to_text(omci_msg.fields['status_code']),
+ parameter_error_attributes_mask=omci_msg.fields['parameter_error_attributes_mask'])
+ else:
+ omci_msg = request.fields['omci_message'].fields
+ class_id = omci_msg['entity_class']
+ entity_id = omci_msg['entity_id']
+ attributes = {k: v for k, v in omci_msg['data'].items()}
+
+ # Save to the database
+ created = self._database.set(self._device_id, class_id, entity_id, attributes)
+
+ if created:
+ self.increment_mib_data_sync()
+
+ except KeyError as e:
+ pass # NOP
+ except Exception as e:
+ self.log.exception('create', e=e)
+
+ def on_delete_response(self, _topic, msg):
+ """
+ Process a Delete response
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.info('on-delete-response', state=self.state)
+
+ if self._subscriptions[RxEvent.Delete]:
+ if self.state in ['disabled', 'uploading']:
+ self.log.error('rx-in-invalid-state', state=self.state)
+ return
+ try:
+ request = msg[TX_REQUEST_KEY]
+ response = msg[RX_RESPONSE_KEY]
+
+ if response.fields['omci_message'].fields['success_code'] != RC.Success:
+ # TODO: Support offline ONTs in post VOLTHA v1.3.0
+ omci_msg = response.fields['omci_message']
+ self.log.warn('set-response-failure',
+ class_id=omci_msg.fields['entity_class'],
+ instance_id=omci_msg.fields['entity_id'],
+ status=omci_msg.fields['status_code'],
+ status_text=self._status_to_text(omci_msg.fields['status_code']))
+ else:
+ omci_msg = request.fields['omci_message'].fields
+ class_id = omci_msg['entity_class']
+ entity_id = omci_msg['entity_id']
+
+ # Remove from the database
+ deleted = self._database.delete(self._device_id, class_id, entity_id)
+
+ if deleted:
+ self.increment_mib_data_sync()
+
+ except KeyError as e:
+ pass # NOP
+ except Exception as e:
+ self.log.exception('delete', e=e)
+
+ def on_set_response(self, _topic, msg):
+ """
+ Process a Set response
+
+ :param _topic: (str) OMCI-RX topic
+ :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
+ """
+ self.log.info('on-set-response', state=self.state)
+
+ if self._subscriptions[RxEvent.Set]:
+ if self.state in ['disabled', 'uploading']:
+ self.log.error('rx-in-invalid-state', state=self.state)
+ try:
+ request = msg[TX_REQUEST_KEY]
+ response = msg[RX_RESPONSE_KEY]
+
+ if response.fields['omci_message'].fields['success_code'] != RC.Success:
+ # TODO: Support offline ONTs in post VOLTHA v1.3.0
+ omci_msg = response.fields['omci_message']
+ self.log.warn('set-response-failure',
+ class_id=omci_msg.fields['entity_class'],
+ instance_id=omci_msg.fields['entity_id'],
+ status=omci_msg.fields['status_code'],
+ status_text=self._status_to_text(omci_msg.fields['status_code']),
+ unsupported_attribute_mask=omci_msg.fields['unsupported_attributes_mask'],
+ failed_attribute_mask=omci_msg.fields['failed_attributes_mask'])
+ else:
+ omci_msg = request.fields['omci_message'].fields
+ class_id = omci_msg['entity_class']
+ entity_id = omci_msg['entity_id']
+ attributes = {k: v for k, v in omci_msg['data'].items()}
+
+ # Save to the database
+ self._database.set(self._device_id, class_id, entity_id, attributes)
+ self.increment_mib_data_sync()
+
+ except KeyError as e:
+ pass # NOP
+ except Exception as e:
+ self.log.exception('set', e=e)
+
+ def _status_to_text(self, success_code):
+ return {
+ RC.Success: "Success",
+ RC.ProcessingError: "Processing Error",
+ RC.NotSupported: "Not Supported",
+ RC.ParameterError: "Paremeter Error",
+ RC.UnknownEntity: "Unknown Entity",
+ RC.UnknownInstance: "Unknown Instance",
+ RC.DeviceBusy: "Device Busy",
+ RC.InstanceExists: "Instance Exists"
+ }.get(success_code, 'Unknown status code: {}'.format(success_code))
+
+ def query_mib(self, class_id=None, instance_id=None, attributes=None):
+ """
+ Get MIB database information.
+
+ This method can be used to request information from the database to the detailed
+ level requested
+
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+ :param attributes: (list or str) Managed Entity instance's attributes
+
+ :return: (dict) The value(s) requested. If class/inst/attribute is
+ not found, an empty dictionary is returned
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('query', class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+
+ return self._database.query(self._device_id, class_id=class_id,
+ instance_id=instance_id,
+ attributes=attributes)
diff --git a/voltha/extensions/omci/tasks/__init__.py b/voltha/extensions/omci/tasks/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/extensions/omci/tasks/__init__.py
diff --git a/voltha/extensions/omci/tasks/get_mds_task.py b/voltha/extensions/omci/tasks/get_mds_task.py
new file mode 100644
index 0000000..0fa0ae7
--- /dev/null
+++ b/voltha/extensions/omci/tasks/get_mds_task.py
@@ -0,0 +1,103 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_me import OntDataFrame
+from voltha.extensions.omci.omci_defs import ReasonCodes as RC
+
+
+class GetMdsTask(Task):
+ """
+ OpenOMCI Get MIB Data Sync value task
+
+ On successful completion, this task will call the 'callback' method of the
+ deferred returned by the start method and return the value of the MIB
+ Data Sync attribute of the ONT Data ME
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "Get MDS Task"
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(GetMdsTask, self).__init__(GetMdsTask.name,
+ omci_agent,
+ device_id,
+ priority=GetMdsTask.task_priority)
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(GetMdsTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Synchronization tasks
+ """
+ super(GetMdsTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_get_mds)
+
+ def stop(self):
+ """
+ Shutdown MIB Synchronization tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(GetMdsTask, self).stop()
+
+ @inlineCallbacks
+ def perform_get_mds(self):
+ """
+ Get the 'mib_data_sync' attribute of the ONU
+ """
+ self.log.info('perform-get-mds')
+
+ try:
+ device = self.omci_agent.get_device(self.device_id)
+
+ #########################################
+ # Request (MDS supplied value does not matter for a 'get' request)
+
+ results = yield device.omci_cc.send(OntDataFrame(mib_data_sync=123).get())
+
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ self.log.debug('ont-data-mds', status=status)
+
+ assert status == RC.Success, 'Unexpected Response Status: {}'.format(status)
+
+ # Successful if here
+ self.deferred.callback(omci_msg['data']['mib_data_sync'])
+
+ except TimeoutError as e:
+ self.log.warn('get-mds-timeout', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('get-mds', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/voltha/extensions/omci/tasks/mib_resync_task.py b/voltha/extensions/omci/tasks/mib_resync_task.py
new file mode 100644
index 0000000..923c4ef
--- /dev/null
+++ b/voltha/extensions/omci/tasks/mib_resync_task.py
@@ -0,0 +1,289 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from datetime import datetime
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, returnValue
+from twisted.internet import reactor
+from common.utils.asleep import asleep
+from voltha.extensions.omci.database.mib_db_dict import *
+from voltha.extensions.omci.omci_defs import ReasonCodes
+from voltha.extensions.omci.omci_entities import OntData
+
+
+class MibCopyException(Exception):
+ pass
+
+
+class MibDownloadException(Exception):
+ pass
+
+
+class MibResyncTask(Task):
+ """
+ OpenOMCI MIB resynchronization Task
+
+ This task should get a copy of the MIB and compare compare it to a
+ copy of the database. When the MIB Upload command is sent to the ONU,
+ it should make a copy and source the data requested from this database.
+ The ONU can still source AVC's and the the OLT can still send config
+ commands to the actual.
+ """
+ task_priority = 240
+ name = "MIB Resynchronization Task"
+
+ max_db_copy_retries = 3
+ db_copy_retry_delay = 7
+
+ max_mib_upload_next_retries = 3
+ mib_upload_next_delay = 10 # Max * delay < 60 seconds
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(MibResyncTask, self).__init__(MibResyncTask.name,
+ omci_agent,
+ device_id,
+ priority=MibResyncTask.task_priority)
+ self._local_deferred = None
+ self._device = omci_agent.get_device(device_id)
+ self._db_active = MibDbVolatileDict(omci_agent)
+ self._db_active.add(device_id)
+
+ def cancel_deferred(self):
+ super(MibResyncTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Synchronization tasks
+ """
+ super(MibResyncTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_mib_resync)
+ self._db_active.start()
+
+ def stop(self):
+ """
+ Shutdown MIB Synchronization tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ self._device = None
+ self._db_active.stop()
+ super(MibResyncTask, self).stop()
+
+ @inlineCallbacks
+ def perform_mib_resync(self):
+ """
+ Perform the MIB Resynchronization sequence
+
+ The sequence to be performed are:
+ - get a copy of the current MIB database (db_copy)
+
+ - perform MIB upload commands to get ONU's database and save this
+ to a local DB (db_active). Note that the ONU can still receive
+ create/delete/set/get operations from the operator and source
+ AVC notifications as well during this period.
+
+ - Compare the information in the db_copy to the db_active
+
+ During the mib upload process, the maximum time between mib upload next
+ requests is 1 minute.
+ """
+ self.log.info('perform-mib-resync')
+
+ # Try at least 3 times to snapshot the current MIB and get the
+ # MIB upload request out so ONU snapshots its database
+
+ db_copy = None
+ number_of_commands = None
+ commands_retrieved = 0
+
+ try:
+ results = yield self.snapshot_mib()
+ db_copy = results[0]
+ number_of_commands = results[1]
+
+ # Start the MIB upload sequence
+ commands_retrieved = yield self.upload_mib(number_of_commands)
+
+ except Exception as e:
+ self.deferred.errback(failure.Failure(e))
+ returnValue(None)
+
+ if db_copy is None:
+ e = MibCopyException('Failed to get local database copy')
+ self.deferred.errback(failure.Failure(e))
+ returnValue('FAILED')
+
+ if commands_retrieved < number_of_commands:
+ e = MibDownloadException('Only retrieved {} of {} instances'.
+ format(commands_retrieved, number_of_commands))
+ self.deferred.errback(failure.Failure(e))
+ returnValue('FAILED')
+
+ # Compare the database
+
+ mib_differences = self.compare_mibs(db_copy,
+ self._db_active.query(self.device_id))
+
+ if mib_differences is None:
+ self.deferred.callback('success')
+ self.deferred.callback('TODO: This task has not been coded.')
+
+ # TODO: Handle mismatches
+ pass
+
+ @inlineCallbacks
+ def snapshot_mib(self):
+ """
+ Snapshot the MIB on the ONU and create a copy of our local MIB database
+
+ :return: (pair) (db_copy, number_of_commands)
+ """
+ db_copy = None
+ number_of_commands = None
+
+ try:
+ max_tries = MibResyncTask.max_db_copy_retries - 1
+
+ for retries in xrange(0, max_tries + 1):
+ # Send MIB Upload so ONU snapshots its MIB
+ try:
+ mib_upload_time = datetime.utcnow()
+ number_of_commands = yield self.send_mib_upload()
+
+ if number_of_commands is None:
+ if retries >= max_tries:
+ break
+
+ except TimeoutError as e:
+ self.log.warn('timeout', e=e)
+ if retries >= max_tries:
+ raise
+
+ yield asleep(MibResyncTask.db_copy_retry_delay)
+ continue
+
+ # Get a snapshot of the local MIB database
+ db_copy = self._device.query_mib()
+
+ if db_copy is None or db_copy[MODIFIED_KEY] > mib_upload_time:
+ if retries >= max_tries:
+ break
+
+ yield asleep(MibResyncTask.db_copy_retry_delay)
+ continue
+ break
+
+ except Exception as e:
+ self.log.exception('mib-resync', e=e)
+ raise
+
+ # Handle initial failures
+
+ if db_copy is None or number_of_commands is None:
+ raise MibCopyException('Failed to snapshot MIB copy after {} retries'.
+ format(MibResyncTask.max_db_copy_retries))
+
+ returnValue((db_copy, number_of_commands))
+
+ @inlineCallbacks
+ def send_mib_upload(self):
+ """
+ Perform MIB upload command and get the number of entries to retrieve
+
+ :return: (int) Number of commands to execute or None on error
+ """
+ ########################################
+ # Begin MIB Upload
+ try:
+ results = yield self._device.omci_cc.send_mib_upload()
+ number_of_commands = results.fields['omci_message'].fields['number_of_commands']
+
+ if number_of_commands is None or number_of_commands <= 0:
+ raise ValueError('Number of commands was {}'.format(number_of_commands))
+
+ returnValue(number_of_commands)
+
+ except TimeoutError as e:
+ self.log.warn('mib-resync-get-timeout', e=e)
+ raise
+
+ @inlineCallbacks
+ def upload_mib(self, number_of_commands):
+ ########################################
+ # Begin MIB Upload
+
+ seq_no = None
+
+ for seq_no in xrange(number_of_commands):
+ max_tries = MibResyncTask.max_mib_upload_next_retries - 1
+
+ for retries in xrange(0, max_tries + 1):
+ try:
+ response = yield self._device.omci_cc.send_mib_upload_next(seq_no)
+
+ omci_msg = response.fields['omci_message'].fields
+ class_id = omci_msg['object_entity_class']
+ entity_id = omci_msg['object_entity_id']
+
+ # Filter out the 'mib_data_sync' from the database. We save that at
+ # the device level and do not want it showing up during a re-sync
+ # during data compares
+
+ if class_id == OntData.class_id:
+ pass # TODO: Save to a local variable
+
+ attributes = {k: v for k, v in omci_msg['object_data'].items()}
+
+ # Save to the database
+ self._db_active.set(self.device_id, class_id, entity_id, attributes)
+
+ except TimeoutError as e:
+ self.log.warn('mib-resync-timeout', e=e, seq_no=seq_no,
+ number_of_commands=number_of_commands)
+ if retries >= max_tries:
+ raise
+
+ yield asleep(MibResyncTask.mib_upload_next_delay)
+ continue
+
+ returnValue(seq_no)
+
+ def compare_mibs(self, db_copy, db_active):
+ """
+ Compare the our db_copy with the ONU's active copy
+ :param db_copy: (dict) OpenOMCI's copy of the database
+ :param db_active: (dict) ONU's database snapshot
+ :return: (dict) Difference dictionary
+ """
+ return None # TODO: Do this
+ # TODO: Note that certain MEs are excluded from the MIB upload. In particular,
+ # instances of some gneeral purpose MEs, such as the Managed Entity ME and
+ # and the Attribute ME are not included in the MIB upload. Also all table
+ # attributes are not included in the MIB upload (but we do not yet support
+ # tables in this OpenOMCI implementation (VOLTHA v1.3.0)
diff --git a/voltha/extensions/omci/tasks/mib_upload.py b/voltha/extensions/omci/tasks/mib_upload.py
new file mode 100644
index 0000000..8f442cd
--- /dev/null
+++ b/voltha/extensions/omci/tasks/mib_upload.py
@@ -0,0 +1,118 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from twisted.internet import reactor
+
+
+class MibUploadTask(Task):
+ """
+ OpenOMCI MIB upload task
+
+ On successful completion, this task will call the 'callback' method of the
+ deferred returned by the start method. Only a textual message is provided as
+ the successful results and it lists the number of ME entities successfully
+ retrieved.
+
+ Note that the MIB Synchronization State Machine will get event subscription
+ information for the MIB Reset and MIB Upload Next requests and it is the
+ MIB Synchronization State Machine that actually populates the MIB Database.
+ """
+ task_priority = 250
+ name = "MIB Upload Task"
+
+ def __init__(self, omci_agent, device_id):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ """
+ super(MibUploadTask, self).__init__(MibUploadTask.name,
+ omci_agent,
+ device_id,
+ priority=MibUploadTask.task_priority)
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(MibUploadTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Synchronization tasks
+ """
+ super(MibUploadTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_mib_upload)
+
+ def stop(self):
+ """
+ Shutdown MIB Synchronization tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(MibUploadTask, self).stop()
+
+ @inlineCallbacks
+ def perform_mib_upload(self):
+ """
+ Perform the MIB Upload sequence
+ """
+ self.log.info('perform-mib-upload')
+
+ seq_no = 0
+ number_of_commands = 0
+
+ try:
+ device = self.omci_agent.get_device(self.device_id)
+
+ #########################################
+ # MIB Reset
+ yield device.omci_cc.send_mib_reset()
+
+ ########################################
+ # Begin MIB Upload
+ results = yield device.omci_cc.send_mib_upload()
+ number_of_commands = results.fields['omci_message'].fields['number_of_commands']
+
+ for seq_no in xrange(number_of_commands):
+ if not device.active or not device.omci_cc.enabled:
+ self.deferred.errback(failure.Failure(
+ GeneratorExit('OMCI and/or ONU is not active')))
+ return
+ yield device.omci_cc.send_mib_upload_next(seq_no)
+
+ # Successful if here
+ self.log.info('mib-synchronized')
+ self.deferred.callback('success, loaded {} ME Instances'.
+ format(number_of_commands))
+
+ except TimeoutError as e:
+ self.log.warn('mib-upload-timeout', e=e, seq_no=seq_no,
+ number_of_commands=number_of_commands)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('mib-upload', e=e)
+ self.deferred.errback(failure.Failure(e))
+
diff --git a/voltha/extensions/omci/tasks/task.py b/voltha/extensions/omci/tasks/task.py
new file mode 100644
index 0000000..513d563
--- /dev/null
+++ b/voltha/extensions/omci/tasks/task.py
@@ -0,0 +1,104 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from twisted.internet import defer
+
+
+class Task(object):
+ """
+ OpenOMCI Base Task implementation
+
+ An OMCI task can be one or more OMCI requests, comparisons, or whatever
+ is needed to do a specific unit of work that needs to be ran to completion
+ successfully.
+
+ On successful completion, the task should called the 'callback' method of
+ the deferred and pass back whatever is meaningful to the user/state-machine
+ that launched it.
+
+ On failure, the 'errback' routine should be called with an appropriate
+ Failure object.
+ """
+ DEFAULT_PRIORITY = 128
+ _next_task_id = 0
+
+ def __init__(self, name, omci_agent, device_id, priority=DEFAULT_PRIORITY,
+ exclusive=True):
+ """
+ Class initialization
+
+ :param name: (str) Task Name
+ :param device_id: (str) ONU Device ID
+ :param priority: (int) Task priority (0..255) 255 Highest
+ :param exclusive: (bool) If True, this task needs exclusive access to the
+ OMCI Communications channel when it runs
+ """
+ assert 0 <= priority <= 255, 'Priority should be 0..255'
+
+ Task._next_task_id += 1
+ self._task_id = Task._next_task_id
+ self.log = structlog.get_logger(device_id=device_id, name=name,
+ task_id=self._task_id)
+ self.name = name
+ self.device_id = device_id
+ self.omci_agent = omci_agent
+ self._exclusive = exclusive
+ self._deferred = defer.Deferred() # Fires upon completion
+ self._priority = priority
+
+ def __str__(self):
+ return 'Task: {}, ID:{}'.format(self.name, self.task_id)
+
+ @property
+ def priority(self):
+ return self._priority
+
+ @property
+ def task_id(self):
+ return self._task_id
+
+ @property
+ def exclusive(self):
+ return self._exclusive
+
+ @property
+ def deferred(self):
+ return self._deferred
+
+ def cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start task operations
+ """
+ self.log.debug('starting')
+ assert self._deferred is not None and not self._deferred.called, \
+ 'Cannot re-use the same task'
+
+ def stop(self):
+ """
+ Stop task synchronization
+ """
+ self.log.debug('stopping')
+ self.cancel_deferred()
+ self.omci_agent = None # Should only start/stop once
+
diff --git a/voltha/extensions/omci/tasks/task_runner.py b/voltha/extensions/omci/tasks/task_runner.py
new file mode 100644
index 0000000..7162a8c
--- /dev/null
+++ b/voltha/extensions/omci/tasks/task_runner.py
@@ -0,0 +1,248 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from twisted.internet import reactor
+
+
+class TaskRunner(object):
+ """
+ Control the number of running tasks utilizing the OMCI Communications
+ channel (OMCI_CC
+ """
+ def __init__(self, device_id):
+ self.log = structlog.get_logger(device_id=device_id)
+ self._pending_queue = dict() # task-priority -> [tasks]
+ self._running_queue = dict() # task-id -> task
+ self._active = False
+
+ self._successful_tasks = 0
+ self._failed_tasks = 0
+
+ def __str__(self):
+ return 'TaskRunner: Pending: {}, Running:{}'.format(self.pending_tasks,
+ self.running_tasks)
+
+ @property
+ def active(self):
+ return self._active
+
+ @property
+ def pending_tasks(self):
+ """
+ Get the number of tasks pending to run
+ """
+ count = 0
+ for tasks in self._pending_queue.itervalues():
+ count += len(tasks)
+ return count
+
+ @property
+ def running_tasks(self):
+ """
+ Get the number of tasks currently running
+ """
+ return len(self._running_queue)
+
+ @property
+ def successful_tasks_completed(self):
+ return self._successful_tasks
+
+ @property
+ def failed_tasks(self):
+ return self._failed_tasks
+
+ # TODO: add properties for various stats as needed
+
+ def start(self):
+ """
+ Start the Task runner
+ """
+ self.log.debug('starting', active=self._active)
+
+ if not self._active:
+ assert len(self._running_queue) == 0, 'Running task queue not empty'
+ self._active = True
+ self._run_next_task()
+
+ def stop(self):
+ """
+ Stop the Task runner, first stopping any tasks and flushing the queue
+ """
+ self.log.debug('stopping', active=self._active)
+
+ if self._active:
+ self._active = False
+
+ pq, self._pending_queue = self._pending_queue, dict()
+ rq, self._running_queue = self._running_queue, dict()
+
+ # Stop running tasks
+ for task in rq.itervalues():
+ try:
+ task.stop()
+ except:
+ pass
+
+ # Kill pending tasks
+ for d in pq.iterkeys():
+ try:
+ d.cancel()
+ except:
+ pass
+
+ def _run_next_task(self):
+ """
+ Search for next task to run, if one can
+ :return:
+ """
+ self.log.debug('run-next', active=self._active, pending=len(self._pending_queue))
+
+ if self._active and len(self._pending_queue) > 0:
+ # Cannot run a new task if a running one needs the OMCI_CC exclusively
+
+ if any(task.exclusive for task in self._running_queue.itervalues()):
+ self.log.debug('exclusive-running')
+ return # An exclusive task is already running
+
+ try:
+ priorities = [k for k in self._pending_queue.iterkeys()]
+ priorities.sort(reverse=True)
+ highest_priority = priorities[0] if len(priorities) else None
+
+ if highest_priority is not None:
+ queue = self._pending_queue[highest_priority]
+ next_task = queue[0] if len(queue) else None
+
+ if next_task is not None:
+ if next_task.exclusive and len(self._running_queue) > 0:
+ self.log.debug('next-is-exclusive', task=str(next_task))
+ return # Next task to run needs exclusive access
+
+ queue.pop(0)
+ if len(queue) == 0:
+ del self._pending_queue[highest_priority]
+
+ self.log.debug('starting-task', task=str(next_task))
+ self._running_queue[next_task.task_id] = next_task
+ reactor.callLater(0, next_task.start)
+
+ # Run again if others are waiting
+ if len(self._pending_queue):
+ self._run_next_task()
+
+ except Exception as e:
+ self.log.exception('run-next', e=e)
+
+ def _on_task_success(self, results, task):
+ """
+ A task completed successfully callback
+ :param results: deferred results
+ :param task: (Task) The task that succeeded
+ :return: deferred results
+ """
+ self.log.debug('task-success', task_id=task.task_id)
+ try:
+ assert task is not None and task.task_id in self._running_queue,\
+ 'Task not found in running queue'
+
+ self._successful_tasks += 1
+ del self._running_queue[task.task_id]
+
+ except Exception as e:
+ self.log.exception('task-error', e=e)
+
+ finally:
+ reactor.callLater(0, self._run_next_task)
+
+ return results
+
+ def _on_task_failure(self, failure, task):
+ """
+ A task completed with failure callback
+ :param failure: (Failure) Failure results
+ :param task: (Task) The task that failed
+ :return: (Failure) Failure results
+ """
+ try:
+ assert task is not None and task.task_id in self._running_queue,\
+ 'Task not found in running queue'
+
+ self._failed_tasks += 1
+ del self._running_queue[task.task_id]
+
+ reactor.callLater(0, self._run_next_task)
+
+ except Exception as e:
+ # Check the pending queue
+
+ for priority, tasks in self._pending_queue.iteritems():
+ found = next((t for t in tasks if t.task_id == task.task_id), None)
+
+ if found is not None:
+ self._pending_queue[task.priority].remove(task)
+ if len(self._pending_queue[task.priority]) == 0:
+ del self._pending_queue[task.priority]
+ return failure
+
+ self.log.exception('task-error', e=e)
+ raise
+
+ finally:
+ reactor.callLater(0, self._run_next_task)
+
+ return failure
+
+ def queue_task(self, task):
+ """
+ Place a task on the queue to run
+
+ :param task: (Task) task to run
+ :return: (deferred) Deferred that will fire on task completion
+ """
+ self.log.debug('queue-task', active=self._active, task=str(task))
+
+ if task.priority not in self._pending_queue:
+ self._pending_queue[task.priority] = []
+
+ task.deferred.addCallbacks(self._on_task_success, self._on_task_failure,
+ callbackArgs=[task], errbackArgs=[task])
+
+ self._pending_queue[task.priority].append(task)
+ self._run_next_task()
+
+ return task.deferred
+
+ def cancel_task(self, task_id):
+ """
+ Cancel a pending or running task. The cancel method will be called
+ for the task's deferred
+
+ :param task_id: (int) Task identifier
+ """
+ task = self._running_queue.get(task_id, None)
+
+ if task is not None:
+ task.stop()
+ reactor.callLater(0, self._run_next_task)
+
+ else:
+ for priority, tasks in self._pending_queue.iteritems():
+ task = next((t for t in tasks if t.task_id == task_id), None)
+
+ if task is not None:
+ task.deferred.cancel()
+ return
+