VOL-868: Improved MIB Database for created ME instances
Change-Id: I37ff1eeb97c819166d78bcd4fdd518716cfb9b9c
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_task.py b/tests/utests/voltha/extensions/omci/mock/mock_task.py
index 0d85f64..499a945 100644
--- a/tests/utests/voltha/extensions/omci/mock/mock_task.py
+++ b/tests/utests/voltha/extensions/omci/mock/mock_task.py
@@ -48,8 +48,6 @@
self._success = success
self._value = value
self._local_deferred = None
- self._running = False
-
def cancel_deferred(self):
super(SimpleTask, self).cancel_deferred()
@@ -81,8 +79,6 @@
Get the 'mib_data_sync' attribute of the ONU
"""
try:
- running = True
-
if self._delay > 0:
yield asleep(self._delay)
@@ -90,8 +86,6 @@
self.deferred.callback(self._value)
self.deferred.errback(failure.Failure(self._value))
- running = False
except Exception as e:
- running = False
self.deferred.errback(failure.Failure(e))
diff --git a/tests/utests/voltha/extensions/omci/test_mib_resync_task.py b/tests/utests/voltha/extensions/omci/test_mib_resync_task.py
new file mode 100644
index 0000000..d598c48
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/test_mib_resync_task.py
@@ -0,0 +1,372 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from unittest import main, TestCase
+from voltha.extensions.omci.omci_entities import *
+from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
+from voltha.extensions.omci.database.mib_db_dict import MibDbVolatileDict as OnuDB
+from voltha.extensions.omci.database.mib_db_ext import MibDbExternal as OltDB
+from mock.mock_adapter_agent import MockAdapterAgent, MockDevice
+
+_DEVICE_ID = 'br-549'
+
+
+class TestOmciMibResyncTask(TestCase):
+ def setUp(self):
+ self.adapter_agent = MockAdapterAgent()
+ self.adapter_agent.add_device(MockDevice(_DEVICE_ID)) # For Entity class lookups
+
+ self.onu_db = OnuDB(self.adapter_agent)
+ self.olt_db = OltDB(self.adapter_agent)
+
+ self.onu_db.start()
+ self.olt_db.start()
+
+ self.olt_db.add(_DEVICE_ID)
+ self.onu_db.add(_DEVICE_ID)
+
+ self.task = MibResyncTask(self.adapter_agent, _DEVICE_ID)
+
+ def tearDown(self):
+ self.onu_db.stop()
+ self.olt_db.stop()
+
+ def test_not_same_type_dbs(self):
+ #
+ # OLT DB is a copy of the 'external' DB, ONU is a volatile DB
+ #
+ self.assertNotEqual(type(self.olt_db), type(self.onu_db))
+
+ def test_db_same_format_str_field_serialization(self):
+ class_id = OltG.class_id
+ inst_id = 0
+ attributes = {
+ 'olt_vendor_id': 'ABCD', # StrFixedLenField(4)
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_db_same_format_mac_address_ip_field_serialization(self):
+ class_id = IpHostConfigData.class_id
+ inst_id = 0
+ attributes = {
+ 'mac_address': '00:01:02:03:04:05', # MACField
+ 'ip_address': '1.2.3.4', # IPField
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_db_same_format_byte_and_short_field_serialization(self):
+ class_id = UniG.class_id
+ inst_id = 0
+ attributes = {
+ 'administrative_state': int(1), # ByteField
+ 'non_omci_management_identifier': int(12345) # IPField
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_db_same_format_int_field_serialization(self):
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes = {
+ 'related_port': int(1234567) # IntField
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_db_same_format_long_field_serialization(self):
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes = {
+ 'packet_drop_queue_thresholds': int(0x1234) # LongField
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_db_same_format_bit_field_serialization(self):
+ class_id = OntG.class_id
+ inst_id = 0
+ attributes = {
+ 'extended_tc_layer_options': long(0x1234), # BitField(16)
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_db_same_format_list_field_serialization(self):
+ class_id = VlanTaggingFilterData.class_id
+ inst_id = 0
+ vlan_filter_list = [0] * 12
+ vlan_filter_list[0] = 0x1234
+
+ attributes = {
+ 'vlan_filter_list': vlan_filter_list, # FieldListField
+ 'forward_operation': 0,
+ 'number_of_entries': 1
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_db_same_format_complex_json_serialization(self):
+ class_id = ExtendedVlanTaggingOperationConfigurationData.class_id
+ inst_id = 0x202
+ table_data = VlanTaggingOperation(
+ filter_outer_priority=15,
+ filter_inner_priority=8,
+ filter_inner_vid=1024,
+ filter_inner_tpid_de=5,
+ filter_ether_type=0,
+ treatment_tags_to_remove=1,
+ pad3=2,
+ treatment_outer_priority=15,
+ treatment_inner_priority=8,
+ treatment_inner_vid=1024,
+ treatment_inner_tpid_de=4
+ )
+ attributes = dict(
+ received_frame_vlan_tagging_operation_table=table_data
+ )
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_on_olt_only(self):
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes = {
+ 'related_port': int(1234567) # IntField
+ }
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 1)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+ self.assertEqual(olt_only, [(class_id, inst_id)])
+
+ # Now a little more complex (extra instance on the OLT
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id + 1, attributes)
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 1)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+ self.assertEqual(olt_only, [(class_id, inst_id + 1)])
+
+ def test_on_onu_only(self):
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes = {
+ 'related_port': int(1234567) # IntField
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 1)
+ self.assertEqual(len(attr_diffs), 0)
+ self.assertEqual(onu_only, [(class_id, inst_id)]) # Test contents of what was returned
+
+ # Now a little more complex (extra instance on the ONU
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id + 1, attributes)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 1)
+ self.assertEqual(len(attr_diffs), 0)
+ self.assertEqual(onu_only, [(class_id, inst_id + 1)]) # Test contents of what was returned
+
+ def test_on_attr_different_value(self):
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes_olt = {
+ 'weight': int(12) # ByteField
+ }
+ attributes_onu = {
+ 'weight': int(34) # ByteField
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes_onu)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes_olt)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 1)
+ self.assertEqual(attr_diffs, [(class_id, inst_id, 'weight')])
+
+ def test_ignore_read_only_attribute_differences(self):
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes_olt = {
+ 'related_port': int(1234), # IntField (R/O)
+ 'maximum_queue_size': int(222) # Only on OLT but read-only
+ }
+ attributes_onu = {
+ 'related_port': int(5678) # IntField (R/O)
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes_onu)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes_olt)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 0)
+
+ def test_on_attr_more_on_olt(self):
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes_olt = {
+ 'related_port': int(1234), # IntField
+ 'back_pressure_time': int(1234) # IntField
+ }
+ attributes_onu = {
+ 'related_port': int(1234) # IntField
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes_onu)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes_olt)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 1)
+ self.assertEqual(attr_diffs, [(class_id, inst_id, 'back_pressure_time')])
+
+ def test_on_attr_more_on_onu(self):
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes_olt = {
+ 'related_port': int(1234) # IntField
+ }
+ attributes_onu = {
+ 'related_port': int(1234), # IntField
+ 'back_pressure_time': int(5678) # IntField
+ }
+ self.onu_db.set(_DEVICE_ID, class_id, inst_id, attributes_onu)
+ self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes_olt)
+
+ db_copy = self.olt_db.query(_DEVICE_ID)
+ db_active = self.onu_db.query(_DEVICE_ID)
+
+ olt_only, onu_only, attr_diffs = self.task.compare_mibs(db_copy, db_active)
+
+ self.assertEqual(len(olt_only), 0)
+ self.assertEqual(len(onu_only), 0)
+ self.assertEqual(len(attr_diffs), 1)
+ self.assertEqual(attr_diffs, [(class_id, inst_id, 'back_pressure_time')])
+
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/utests/voltha/extensions/omci/test_omci.py b/tests/utests/voltha/extensions/omci/test_omci.py
index a702c88..de851c3 100644
--- a/tests/utests/voltha/extensions/omci/test_omci.py
+++ b/tests/utests/voltha/extensions/omci/test_omci.py
@@ -1141,5 +1141,22 @@
)
self.assertGeneratedFrameEquals(frame, ref)
+ def test_omci_entity_ids(self):
+ from voltha.extensions.omci.omci_entities import entity_classes
+
+ # For Entity Classes that have a Managed Entity ID with Set-By-Create
+ # access, verify that the attribute name matches 'managed_entity_id'
+ #
+ # This is critical for the MIB Synchronizer state machine as it needs
+ # to backfill Set-By-Create attributes when it sees a Create response
+ # but it needs to ignore the 'managed_entity_id' attribute (by name).
+
+ for entity in entity_classes:
+ mei_attr = entity.attributes[0]
+ self.assertIsNotNone(mei_attr)
+ self.assertTrue(AA.SBC not in mei_attr.access or
+ mei_attr.field.name == 'managed_entity_id')
+
+
if __name__ == '__main__':
main()
diff --git a/tests/utests/voltha/extensions/omci/test_omci_cc.py b/tests/utests/voltha/extensions/omci/test_omci_cc.py
index 2a7fc5f..210d435 100644
--- a/tests/utests/voltha/extensions/omci/test_omci_cc.py
+++ b/tests/utests/voltha/extensions/omci/test_omci_cc.py
@@ -110,8 +110,10 @@
expected)
return results
- def _default_errback(self, _failure):
- assert False
+ def _default_errback(self, failure):
+ from twisted.internet.defer import TimeoutError
+ assert isinstance(failure.type, type(TimeoutError))
+ return None
def _snapshot_stats(self):
omci_cc = self.onu_handler.omci_cc
diff --git a/voltha/extensions/omci/omci_entities.py b/voltha/extensions/omci/omci_entities.py
index 0e4f52d..9b627e2 100644
--- a/voltha/extensions/omci/omci_entities.py
+++ b/voltha/extensions/omci/omci_entities.py
@@ -574,8 +574,9 @@
ECA(StrFixedLenField("received_frame_vlan_tagging_operation_table",
VlanTaggingOperation, 16), {AA.R, AA.W}),
ECA(ShortField("associated_me_pointer", None), {AA.R, AA.W, AA.SBC}),
- ECA(StrFixedLenField("dscp_to_p_bit_mapping", None, length=24),
- {AA.R, AA.W}), # TODO: Would a custom 3-bit group bitfield work better?
+ ECA(FieldListField("dscp_to_p_bit_mapping", None,
+ BitField('', 0, size=3), count_from=lambda _: 64),
+ {AA.R, AA.W}),
]
mandatory_operations = {OP.Create, OP.Delete, OP.Set, OP.Get, OP.GetNext}
optional_operations = {OP.SetTable}
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 0deca63..955c484 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -63,23 +63,33 @@
self._runner = TaskRunner(device_id) # OMCI_CC Task runner
self._deferred = None
self._first_in_sync = False
+
+ # OMCI related databases are on a per-agent basis. State machines and tasks
+ # are per ONU Vendor
+ #
self._support_classes = support_classes
try:
+ # MIB Synchronization state machine
self._mib_db_in_sync = False
mib_synchronizer_info = support_classes.get('mib-synchronizer')
- self.mib_sync = mib_synchronizer_info['state-machine'](self._omci_agent,
- device_id,
- mib_synchronizer_info['tasks'],
- mib_db)
+ self._mib_sync_sm = mib_synchronizer_info['state-machine'](self._omci_agent,
+ device_id,
+ mib_synchronizer_info['tasks'],
+ mib_db)
except Exception as e:
self.log.exception('mib-sync-create-failed', e=e)
raise
- self._state_machines = []
- self._on_start_state_machines = [self.mib_sync] # Run when 'start()' called
- self._on_sync_state_machines = [] # Run after first in_sync event
+ # Put state machines in the order you wish to start them
+ self._state_machines = []
+ self._on_start_state_machines = [ # Run when 'start()' called
+ self._mib_sync_sm,
+ ]
+ self._on_sync_state_machines = [ # Run after first in_sync event
+
+ ]
self._custom_me_map = custom_me_map
self._me_map = omci_entities.entity_id_to_class_map.copy()
@@ -117,7 +127,11 @@
@property
def mib_synchronizer(self):
- return self.mib_sync
+ """
+ Reference to the OpenOMCI MIB Synchronization state machine for this ONU
+ """
+ return self._mib_sync_sm
+
@property
def active(self):
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index 1732fe2..e8cfaf8 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -20,7 +20,6 @@
from voltha.extensions.omci.tasks.mib_upload import MibUploadTask
from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
-
from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry
OpenOmciAgentDefaults = {
@@ -68,14 +67,15 @@
self._started = False
self._devices = dict() # device-id -> DeviceEntry
- # MIB Synchronization
+ # OMCI related databases are on a per-agent basis. State machines and tasks
+ # are per ONU Vendore
+ #
+ # MIB Synchronization Database
self._mib_db = None
- self._mib_synchronizer_info = support_classes['mib-synchronizer']
- self._mib_database_cls = self._mib_synchronizer_info['database']
+ self._mib_database_cls = support_classes['mib-synchronizer']['database']
- # Alarm Synchronization # TODO: Stretch goal for VOLTHA v1.3.0
+ # Alarm Synchronization Database # TODO: Stretch goal for VOLTHA v1.3.0
# self._alarm_db = None
- # self._alarm_synchronizer_info = support_classes['alarm-synchronizer']
# self._alarm_database_cls = self._alarm_synchronizer_info['database']
@property
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index 8b60846..3e1bad1 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -18,16 +18,17 @@
from transitions import Machine
from twisted.internet import reactor
from voltha.extensions.omci.omci_frame import OmciFrame
-from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
+from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes, \
+ AttributeAccess
from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
RX_RESPONSE_KEY
from voltha.extensions.omci.omci_entities import OntData
from common.event_bus import EventBusClient
-
RxEvent = OmciCCRxEvents
OP = EntityOperations
RC = ReasonCodes
+AA = AttributeAccess
class MibSynchronizer(object):
@@ -661,11 +662,52 @@
if created:
self.increment_mib_data_sync()
+ # If the ME contains set-by-create or writeable values that were
+ # not specified in the create command, the ONU will have
+ # initialized those fields
+
+ if class_id in self._device.me_map:
+ sbc_w_set = {attr.field.name for attr in self._device.me_map[class_id].attributes
+ if (AA.SBC in attr.access or AA.W in attr.access)
+ and attr.field.name != 'managed_entity_id'}
+
+ missing = sbc_w_set - {k for k in attributes.iterkeys()}
+
+ if len(missing):
+ # Request the missing attributes
+ self.update_sbc_w_items(class_id, entity_id, missing)
+
except KeyError as e:
pass # NOP
+
except Exception as e:
self.log.exception('create', e=e)
+ def update_sbc_w_items(self, class_id, entity_id, missing_attributes):
+ """
+ Perform a get-request for Set-By-Create (SBC) or writable (w) attributes
+ that were not specified in the original Create request.
+
+ :param class_id: (int) Class ID
+ :param entity_id: (int) Instance ID
+ :param missing_attributes: (set) Missing SBC or Writable attribute
+ """
+ if len(missing_attributes) and class_id in self._device.me_map:
+ from voltha.extensions.omci.tasks.omci_get_request import OmciGetRequest
+
+ def success(results):
+ self._database.set(self._device_id, class_id, entity_id, results.attributes)
+
+ def failure(reason):
+ self.log.warn('update-sbc-w-failed', reason=reason, class_id=class_id,
+ entity_id=entity_id, attributes=missing_attributes)
+
+ d = self._device.task_runner.queue_task(OmciGetRequest(self._agent, self._device_id,
+ self._device.me_map[class_id],
+ entity_id, missing_attributes,
+ allow_failure=True))
+ d.addCallbacks(success, failure)
+
def on_delete_response(self, _topic, msg):
"""
Process a Delete response
@@ -740,8 +782,10 @@
attributes = {k: v for k, v in omci_msg['data'].items()}
# Save to the database
- self._database.set(self._device_id, class_id, entity_id, attributes)
- self.increment_mib_data_sync()
+ modified = self._database.set(self._device_id, class_id, entity_id, attributes)
+
+ if modified:
+ self.increment_mib_data_sync()
except KeyError as e:
pass # NOP
@@ -773,10 +817,14 @@
:return: (dict) The value(s) requested. If class/inst/attribute is
not found, an empty dictionary is returned
- :raises DatabaseStateError: If the database is not enabled
+ :raises DatabaseStateError: If the database is not enabled or does not exist
"""
+ from voltha.extensions.omci.database.mib_db_api import DatabaseStateError
+
self.log.debug('query', class_id=class_id,
instance_id=instance_id, attributes=attributes)
+ if self._database is None:
+ raise DatabaseStateError('Database does not yet exist')
return self._database.query(self._device_id, class_id=class_id,
instance_id=instance_id,
diff --git a/voltha/extensions/omci/tasks/get_mds_task.py b/voltha/extensions/omci/tasks/get_mds_task.py
index 4722eb5..e2f11fd 100644
--- a/voltha/extensions/omci/tasks/get_mds_task.py
+++ b/voltha/extensions/omci/tasks/get_mds_task.py
@@ -14,8 +14,8 @@
# limitations under the License.
#
from task import Task
-from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
from voltha.extensions.omci.omci_me import OntDataFrame
from voltha.extensions.omci.omci_defs import ReasonCodes as RC
diff --git a/voltha/extensions/omci/tasks/mib_resync_task.py b/voltha/extensions/omci/tasks/mib_resync_task.py
index 06b45ec..6a7c899 100644
--- a/voltha/extensions/omci/tasks/mib_resync_task.py
+++ b/voltha/extensions/omci/tasks/mib_resync_task.py
@@ -31,6 +31,10 @@
pass
+class MibResyncException(Exception):
+ pass
+
+
class MibResyncTask(Task):
"""
OpenOMCI MIB resynchronization Task
@@ -98,6 +102,10 @@
self._db_active = None
super(MibResyncTask, self).stop()
+ def stop_if_not_running(self):
+ if not self.running:
+ raise MibResyncException('Resync Task was cancelled')
+
@inlineCallbacks
def perform_mib_resync(self):
"""
@@ -169,6 +177,7 @@
# Send MIB Upload so ONU snapshots its MIB
try:
number_of_commands = yield self.send_mib_upload()
+ self.stop_if_not_running()
if number_of_commands is None:
if retries >= max_tries:
@@ -181,6 +190,7 @@
raise
yield asleep(MibResyncTask.db_copy_retry_delay)
+ self.stop_if_not_running()
continue
# Get a snapshot of the local MIB database
@@ -209,6 +219,7 @@
# Begin MIB Upload
try:
results = yield self._device.omci_cc.send_mib_upload()
+ self.stop_if_not_running()
number_of_commands = results.fields['omci_message'].fields['number_of_commands']
if number_of_commands is None or number_of_commands <= 0:
@@ -232,6 +243,7 @@
for retries in xrange(0, max_tries):
try:
response = yield self._device.omci_cc.send_mib_upload_next(seq_no)
+ self.stop_if_not_running()
omci_msg = response.fields['omci_message'].fields
class_id = omci_msg['object_entity_class']
diff --git a/voltha/extensions/omci/tasks/mib_upload.py b/voltha/extensions/omci/tasks/mib_upload.py
index adea4a6..eb0fa6b 100644
--- a/voltha/extensions/omci/tasks/mib_upload.py
+++ b/voltha/extensions/omci/tasks/mib_upload.py
@@ -16,6 +16,13 @@
from task import Task
from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, AlreadyCalledError
from twisted.internet import reactor
+from voltha.extensions.omci.omci_defs import ReasonCodes
+
+
+class MibUploadFailure(Exception):
+ """
+ This error is raised by default when the upload fails
+ """
class MibUploadTask(Task):
@@ -73,6 +80,10 @@
self.cancel_deferred()
super(MibUploadTask, self).stop()
+ def stop_if_not_running(self):
+ if not self.running:
+ raise MibUploadFailure('Upload Task was cancelled')
+
@inlineCallbacks
def perform_mib_upload(self):
"""
@@ -88,43 +99,49 @@
#########################################
# MIB Reset
- yield device.omci_cc.send_mib_reset()
+ results = yield device.omci_cc.send_mib_reset()
+ self.stop_if_not_running()
+
+ status = results.fields['omci_message'].fields['success_code']
+ if status != ReasonCodes.Success.value:
+ raise MibUploadFailure('MIB Reset request failed with status code: {}'.
+ format(status))
########################################
# Begin MIB Upload
results = yield device.omci_cc.send_mib_upload()
+ self.stop_if_not_running()
number_of_commands = results.fields['omci_message'].fields['number_of_commands']
- failed = False
for seq_no in xrange(number_of_commands):
if not device.active or not device.omci_cc.enabled:
- self.deferred.errback(failure.Failure(
- GeneratorExit('OMCI and/or ONU is not active')))
- return
+ raise MibUploadFailure('OMCI and/or ONU is not active')
for retry in range(0, 3):
try:
- self.log.debug('mib-upload-next-request', seq_no=seq_no, retry=retry, number_of_commands=number_of_commands)
+ self.log.debug('mib-upload-next-request', seq_no=seq_no,
+ retry=retry,
+ number_of_commands=number_of_commands)
yield device.omci_cc.send_mib_upload_next(seq_no)
- self.log.debug('mib-upload-next-success', seq_no=seq_no, number_of_commands=number_of_commands)
- failed = False
+ self.stop_if_not_running()
+ self.log.debug('mib-upload-next-success', seq_no=seq_no,
+ number_of_commands=number_of_commands)
break
except TimeoutError as e:
from common.utils.asleep import asleep
self.log.warn('mib-upload-timeout', e=e, seq_no=seq_no,
number_of_commands=number_of_commands)
- failed = True
- if retry < 2:
- yield asleep(0.3)
+ if retry >= 2:
+ raise MibUploadFailure('Upload timeout failure on req {} of {}'.
+ format(seq_no + 1, number_of_commands))
+ yield asleep(0.3)
+ self.stop_if_not_running()
- if not failed:
- # Successful if here
- self.log.info('mib-synchronized')
- self.deferred.callback('success, loaded {} ME Instances'.
- format(number_of_commands))
- else:
- self.deferred.errback(failure.Failure(e))
+ # Successful if here
+ self.log.info('mib-synchronized')
+ self.deferred.callback('success, loaded {} ME Instances'.
+ format(number_of_commands))
except TimeoutError as e:
self.log.warn('mib-upload-timeout-on-reset', e=e, seq_no=seq_no,
diff --git a/voltha/extensions/omci/tasks/omci_get_request.py b/voltha/extensions/omci/tasks/omci_get_request.py
new file mode 100644
index 0000000..7d12829
--- /dev/null
+++ b/voltha/extensions/omci/tasks/omci_get_request.py
@@ -0,0 +1,289 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure, returnValue
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_me import MEFrame
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_messages import OmciGet
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class GetException(Exception):
+ pass
+
+
+class OmciGetRequest(Task):
+ """
+ OpenOMCI Get an OMCI ME Instance Attributes
+
+ Upon completion, the Task deferred callback is invoked with a reference of
+ this Task object.
+
+ The Task has an initializer option (allow_failure) that will retry all
+ requested attributes if the original request fails with a status code of
+ 9 (Attributes failed or unknown). This result means that an attribute
+ is not supported by the ONU or that a mandatory/optional attribute could
+ not be executed by the ONU, even if it is supported, for example,
+ because of a range or type violation.
+ """
+ task_priority = 128
+ name = "ONU OMCI Get Task"
+
+ def __init__(self, omci_agent, device_id, entity_class, entity_id, attributes,
+ exclusive=False, allow_failure=False):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param entity_class: (EntityClass) ME Class to retrieve
+ :param entity_id: (int) ME Class instance ID to retrieve
+ :param attributes: (list or set) Name of attributes to retrieve
+ :param exclusive: (bool) True if this GET request Task exclusively own the
+ OMCI-CC while running. Default: False
+ :param allow_failure: (bool) If true, attempt to get all valid attributes
+ if the original request receives an error
+ code of 9 (Attributes failed or unknown).
+ """
+ super(OmciGetRequest, self).__init__(OmciGetRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciGetRequest.task_priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._entity_class = entity_class
+ self._entity_id = entity_id
+ self._attributes = attributes
+ self._allow_failure = allow_failure
+ self._failed_or_unknown_attributes = set()
+ self._results = None
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(OmciGetRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def stop_if_not_running(self):
+ if not self.running:
+ raise MibResyncException('Get Request Task was cancelled')
+
+ @property
+ def attributes(self):
+ """
+ Return a dictionary of attributes for the request if the Get was
+ successfully completed. None otherwise
+ """
+ if self._results is None:
+ return None
+
+ omci_msg = self._results.fields['omci_message'].fields
+ return omci_msg['data'] if 'data' in omci_msg else None
+
+ @property
+ def success_code(self):
+ """
+ Return the OMCI success/reason code for the Get Response.
+ """
+ if self._results is None:
+ return None
+
+ return self._results.fields['omci_message'].fields['success_code']
+
+ @property
+ def raw_results(self):
+ """
+ Return the raw Get Response OMCIFrame
+ """
+ return self._results
+
+ def start(self):
+ """
+ Start MIB Capabilities task
+ """
+ super(OmciGetRequest, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_get_omci)
+
+ @property
+ def failed_or_unknown_attributes(self):
+ """
+ Returns a set attributes that failed or unknown in the original get
+ request that resulted in an initial status code of 9 (Attributes
+ failed or unknown).
+
+ :return: (set of str) attributes
+ """
+ return self._failed_or_unknown_attributes
+
+ @inlineCallbacks
+ def perform_get_omci(self):
+ """
+ Perform the initial get request
+ """
+ self.log.info('perform-get')
+
+ try:
+ frame = MEFrame(self._entity_class, self._entity_id, self._attributes).get()
+ results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.info('perform-get-status', status=status)
+
+ # Success?
+ if status == RC.Success.value:
+ self._results = results
+ results_omci = results.fields['omci_message'].fields
+
+ # Were all attributes fetched?
+ missing_attr = frame.fields['omci_message'].fields['attributes_mask'] ^ \
+ results_omci['attributes_mask']
+
+ if missing_attr > 0:
+ self._local_deferred = reactor.callLater(0,
+ self.perform_get_missing_attributes,
+ missing_attr)
+ returnValue(self._local_deferred)
+
+ elif status == RC.AttributeFailure.value:
+ # What failed? Note if only one attribute was attempted, then
+ # that is an overall failure
+
+ if not self._allow_failure or len(self._attributes) <= 1:
+ raise GetException('Get failed with status code: {}'.
+ format(RC.AttributeFailure.value))
+
+ self._local_deferred = reactor.callLater(0,
+ self.perform_get_failed_attributes,
+ results,
+ self._attributes)
+ returnValue(self._local_deferred)
+
+ else:
+ raise GetException('Get failed with status code: {}'.format(status))
+
+ self.log.info('get-completed')
+ self.deferred.callback(self)
+
+ except Exception as e:
+ self.log.exception('perform-get', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def perform_get_missing_attributes(self, missing_attr):
+ """
+ This method is called when the original Get requests completes with success
+ but not all attributes were returned. This can happen if one or more of the
+ attributes would have exceeded the space available in the OMCI frame.
+
+ This routine iterates through the missing attributes and attempts to retrieve
+ the ones that were missing.
+
+ :param missing_attr: (set) Missing attributes
+ """
+ self.log.info('perform-get-missing', attrs=missing_attr)
+
+ results_omci = self._results.fields['omci_message'].fields
+
+ for index in xrange(16):
+ attr_mask = 1 << index
+
+ if attr_mask & missing_attr:
+ # Get this attribute
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask
+ )
+ )
+ try:
+ get_results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ get_omci = get_results.fields['omci_message'].fields
+ if get_omci['success_code'] != RC.Success.value:
+ continue
+
+ assert attr_mask == get_omci['attributes_mask'], 'wrong attribute'
+ results_omci['attributes_mask'] |= attr_mask
+
+ if results_omci.get('data') is None:
+ results_omci['data'] = dict()
+
+ results_omci['data'].update(get_omci['data'])
+
+ except Exception as e:
+ self.log.exception('missing-failure', e=e)
+
+ self.deferred.callback(self)
+
+ @inlineCallbacks
+ def perform_get_failed_attributes(self, tmp_results, attributes):
+ """
+
+ :param tmp_results:
+ :param attributes:
+ :return:
+ """
+ self.log.info('perform-get-failed', attrs=attributes)
+
+ for attr in attributes:
+ try:
+ frame = MEFrame(self._entity_class, self._entity_id, {attr}).get()
+
+ results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ status = results.fields['omci_message'].fields['success_code']
+
+ if status == RC.AttributeFailure.value:
+ self.log.info('unknown-or-invalid-attribute', attr=attr, status=status)
+ self._failed_or_unknown_attributes.add(attr)
+
+ elif status != RC.Success.value:
+ self.log.warn('invalid-get', class_id=self._entity_class,
+ attribute=attr, status=status)
+ self._failed_or_unknown_attributes.add(attr)
+
+ else:
+ # Add to partial results and correct the status
+ tmp_results.fields['omci_message'].fields['success_code'] = status
+ tmp_results.fields['omci_message'].fields['attributes_mask'] |= \
+ results.fields['omci_message'].fields['attributes_mask']
+
+ if tmp_results.fields['omci_message'].fields.get('data') is None:
+ tmp_results.fields['omci_message'].fields['data'] = dict()
+
+ tmp_results.fields['omci_message'].fields['data'][attr] = \
+ results.fields['omci_message'].fields['data'][attr]
+
+ except Exception as e:
+ self.log.exception('attr-failure', e=e)
+
+ self._results = tmp_results
+ self.deferred.callback(self)
diff --git a/voltha/extensions/omci/tasks/task.py b/voltha/extensions/omci/tasks/task.py
index 513d563..3bae4fc 100644
--- a/voltha/extensions/omci/tasks/task.py
+++ b/voltha/extensions/omci/tasks/task.py
@@ -55,12 +55,15 @@
self.name = name
self.device_id = device_id
self.omci_agent = omci_agent
+ self._running = False
self._exclusive = exclusive
+ # TODO: Should we watch for a cancel on the task's deferred as well?
self._deferred = defer.Deferred() # Fires upon completion
self._priority = priority
def __str__(self):
- return 'Task: {}, ID:{}'.format(self.name, self.task_id)
+ return 'Task: {}, ID:{}, Priority: {}, Exclusive: {}'.format(
+ self.name, self.task_id, self.priority, self.exclusive)
@property
def priority(self):
@@ -78,6 +81,15 @@
def deferred(self):
return self._deferred
+ @property
+ def running(self):
+ # Is the Task running?
+ #
+ # Can be useful for tasks that use inline callbacks to detect
+ # if the task has been canceled.
+ #
+ return self._running
+
def cancel_deferred(self):
d, self._deferred = self._deferred, None
try:
@@ -93,12 +105,13 @@
self.log.debug('starting')
assert self._deferred is not None and not self._deferred.called, \
'Cannot re-use the same task'
+ self._running = True
def stop(self):
"""
Stop task synchronization
"""
self.log.debug('stopping')
+ self._running = False
self.cancel_deferred()
self.omci_agent = None # Should only start/stop once
-
diff --git a/voltha/extensions/omci/tasks/task_runner.py b/voltha/extensions/omci/tasks/task_runner.py
index 7162a8c..e06ae1c 100644
--- a/voltha/extensions/omci/tasks/task_runner.py
+++ b/voltha/extensions/omci/tasks/task_runner.py
@@ -108,7 +108,9 @@
Search for next task to run, if one can
:return:
"""
- self.log.debug('run-next', active=self._active, pending=len(self._pending_queue))
+ self.log.debug('run-next', active=self._active,
+ num_running=len(self._running_queue),
+ num_pending=len(self._pending_queue))
if self._active and len(self._pending_queue) > 0:
# Cannot run a new task if a running one needs the OMCI_CC exclusively
@@ -135,7 +137,10 @@
if len(queue) == 0:
del self._pending_queue[highest_priority]
- self.log.debug('starting-task', task=str(next_task))
+ self.log.debug('starting-task', task=str(next_task),
+ running=len(self._running_queue),
+ pending=len(self._pending_queue))
+
self._running_queue[next_task.task_id] = next_task
reactor.callLater(0, next_task.start)
@@ -153,7 +158,9 @@
:param task: (Task) The task that succeeded
:return: deferred results
"""
- self.log.debug('task-success', task_id=task.task_id)
+ self.log.debug('task-success', task_id=str(task),
+ running=len(self._running_queue),
+ pending=len(self._pending_queue))
try:
assert task is not None and task.task_id in self._running_queue,\
'Task not found in running queue'
@@ -162,7 +169,7 @@
del self._running_queue[task.task_id]
except Exception as e:
- self.log.exception('task-error', e=e)
+ self.log.exception('task-error', task=str(task), e=e)
finally:
reactor.callLater(0, self._run_next_task)
@@ -176,6 +183,9 @@
:param task: (Task) The task that failed
:return: (Failure) Failure results
"""
+ self.log.debug('task-failure', task_id=str(task),
+ running=len(self._running_queue),
+ pending=len(self._pending_queue))
try:
assert task is not None and task.task_id in self._running_queue,\
'Task not found in running queue'
@@ -183,8 +193,6 @@
self._failed_tasks += 1
del self._running_queue[task.task_id]
- reactor.callLater(0, self._run_next_task)
-
except Exception as e:
# Check the pending queue
@@ -197,7 +205,7 @@
del self._pending_queue[task.priority]
return failure
- self.log.exception('task-error', e=e)
+ self.log.exception('task-error', task=str(task), e=e)
raise
finally:
@@ -212,7 +220,9 @@
:param task: (Task) task to run
:return: (deferred) Deferred that will fire on task completion
"""
- self.log.debug('queue-task', active=self._active, task=str(task))
+ self.log.debug('queue-task', active=self._active, task=str(task),
+ running=len(self._running_queue),
+ pending=len(self._pending_queue))
if task.priority not in self._pending_queue:
self._pending_queue[task.priority] = []
@@ -235,7 +245,11 @@
task = self._running_queue.get(task_id, None)
if task is not None:
- task.stop()
+ try:
+ task.stop()
+ except Exception as e:
+ self.log.exception('stop-error', task=str(task), e=e)
+
reactor.callLater(0, self._run_next_task)
else:
@@ -243,6 +257,9 @@
task = next((t for t in tasks if t.task_id == task_id), None)
if task is not None:
- task.deferred.cancel()
+ try:
+ task.deferred.cancel()
+ except Exception as e:
+ self.log.exception('cancel-error', task=str(task), e=e)
return