VOL-609: OpenOMCI External MIB Database and unit-tests
added OpenOMCI protobuf reference to config persist startup
Change-Id: Ie285b1a030b7ea8bfb6cdb6713100dba3c2f3ccc
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py b/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py
index cfc1ab4..69c7197 100644
--- a/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py
+++ b/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py
@@ -14,6 +14,9 @@
# limitations under the License.
#
+from voltha.core.config.config_root import ConfigRoot
+from voltha.protos.voltha_pb2 import VolthaInstance
+
class MockProxyAddress(object):
def __init__(self, device_id, pon_id, onu_id):
@@ -28,10 +31,20 @@
class MockDevice(object):
def __init__(self, device_id, proxy_address=None, serial_number=None):
+ from voltha.extensions.omci.omci_entities import entity_id_to_class_map
self.id = device_id
self.parent_id = None
self.proxy_address = proxy_address
self.serial_number = serial_number
+ self.me_map = entity_id_to_class_map
+
+
+class MockCore(object):
+ def __init__(self):
+ self.root = ConfigRoot(VolthaInstance())
+
+ def get_proxy(self, path):
+ return self.root.get_proxy(path)
class MockAdapterAgent(object):
@@ -46,7 +59,7 @@
"""
def __init__(self):
self._devices = dict() # device-id -> mock device
- pass
+ self.core = MockCore()
def tearDown(self):
"""Test case cleanup"""
diff --git a/tests/utests/voltha/extensions/omci/test_mib_db_ext.py b/tests/utests/voltha/extensions/omci/test_mib_db_ext.py
new file mode 100644
index 0000000..b28e027
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/test_mib_db_ext.py
@@ -0,0 +1,461 @@
+
+from unittest import main, TestCase
+
+from voltha.protos.omci_mib_db_pb2 import MibInstanceData, MibClassData, \
+ MibDeviceData, MibAttributeData
+from datetime import datetime
+from voltha.extensions.omci.database.mib_db_ext import *
+from voltha.extensions.omci.database.mib_db_api import MODIFIED_KEY, CREATED_KEY,\
+ DEVICE_ID_KEY, MDS_KEY, LAST_SYNC_KEY
+from mock.mock_adapter_agent import MockAdapterAgent, MockDevice
+from nose.tools import raises, assert_raises
+import time
+
+_DEVICE_ID = 'br-549'
+
+
+class TestOmciMibDb(TestCase):
+
+ def setUp(self):
+ self.adapter_agent = MockAdapterAgent()
+ self.adapter_agent.add_device(MockDevice(_DEVICE_ID)) # For Entity class lookups
+ self.db = MibDbExternal(self.adapter_agent)
+
+ def tearDown(self):
+ self.db.stop()
+
+ def test_start_stop(self):
+ # Simple start stop
+ self.assertFalse(self.db.active)
+ self.db.start()
+ self.assertTrue(self.db.active)
+ self.db.stop()
+ self.assertFalse(self.db.active)
+
+ # Start after start still okay
+ self.db.start()
+ self.db.start()
+ self.assertTrue(self.db.active)
+
+ self.db.stop()
+ self.db.stop()
+ self.assertFalse(self.db.active)
+
+ @raises(DatabaseStateError)
+ def test_bad_state_add(self):
+ self.db.add(_DEVICE_ID)
+
+ @raises(DatabaseStateError)
+ def test_bad_state_remove(self):
+ self.db.remove(_DEVICE_ID)
+
+ @raises(DatabaseStateError)
+ def test_bad_state_query_1(self):
+ self.db.query(_DEVICE_ID, 0)
+
+ @raises(DatabaseStateError)
+ def test_bad_state_query_2(self):
+ self.db.query(_DEVICE_ID, 0, 0)
+
+ @raises(DatabaseStateError)
+ def test_bad_state_query_3(self):
+ self.db.query(_DEVICE_ID, 0, 0, 'test')
+
+ @raises(DatabaseStateError)
+ def test_bad_state_set(self):
+ self.db.set(_DEVICE_ID, 0, 0, {'test': 123})
+
+ @raises(DatabaseStateError)
+ def test_bad_state_delete(self):
+ self.db.delete(_DEVICE_ID, 0, 0)
+
+ @raises(KeyError)
+ def test_no_device_query(self):
+ self.db.start()
+ self.db.query(_DEVICE_ID)
+
+ def test_no_device_last_sync(self):
+ self.db.start()
+ # Returns None, not a KeyError
+ value = self.db.get_last_sync(_DEVICE_ID)
+ self.assertIsNone(value)
+
+ def test_no_device_mds(self):
+ self.db.start()
+ # Returns None, not a KeyError
+ value = self.db.get_mib_data_sync(_DEVICE_ID)
+ self.assertIsNone(value)
+
+ @raises(KeyError)
+ def test_no_device_save_last_sync(self):
+ self.db.start()
+ self.db.save_last_sync(_DEVICE_ID, datetime.utcnow())
+
+ @raises(KeyError)
+ def test_no_device_save_mds(self):
+ self.db.start()
+ self.db.save_mib_data_sync(_DEVICE_ID, 123)
+
+ def test_param_types(self):
+ self.db.start()
+ assert_raises(TypeError, self.db.add, 123)
+ assert_raises(TypeError, self.db.remove, 123)
+ assert_raises(TypeError, self.db.query, 123)
+
+ assert_raises(TypeError, self.db.get_mib_data_sync, 123)
+ assert_raises(TypeError, self.db.save_mib_data_sync, 123, 0)
+ assert_raises(TypeError, self.db.save_mib_data_sync, _DEVICE_ID, 'zero')
+
+ assert_raises(TypeError, self.db.get_last_sync, 123)
+ assert_raises(TypeError, self.db.save_last_sync, 123, datetime.utcnow())
+ assert_raises(TypeError, self.db.save_last_sync, _DEVICE_ID, 'bad-date')
+
+ assert_raises(TypeError, self.db.set, 123, 0, 0, {'test': 0})
+ assert_raises(TypeError, self.db.set, None, 0, 0, {'test': 0})
+ assert_raises(ValueError, self.db.set, _DEVICE_ID, None, 0, {'test': 0})
+ assert_raises(ValueError, self.db.set, _DEVICE_ID, 0, None, {'test': 0})
+ assert_raises(TypeError, self.db.set, _DEVICE_ID, 0, 0, None)
+ assert_raises(TypeError, self.db.set, _DEVICE_ID, 0, 0, 'not-a-dict')
+
+ assert_raises(ValueError, self.db.set, _DEVICE_ID, -1, 0, {'test': 0})
+ assert_raises(ValueError, self.db.set, _DEVICE_ID, 0x10000, 0, {'test': 0})
+ assert_raises(ValueError, self.db.set, _DEVICE_ID, 0, -1, {'test': 0})
+ assert_raises(ValueError, self.db.set, _DEVICE_ID, 0, 0x10000, {'test': 0})
+
+ assert_raises(TypeError, self.db.delete, 123, 0, 0)
+ assert_raises(ValueError, self.db.delete, _DEVICE_ID, -1, 0)
+ assert_raises(ValueError, self.db.delete, _DEVICE_ID, 0x10000, 0)
+ assert_raises(ValueError, self.db.delete, _DEVICE_ID, 0, -1)
+ assert_raises(ValueError, self.db.delete, _DEVICE_ID, 0, 0x10000)
+
+ def test_add_remove_device(self):
+ self.db.start()
+
+ # Remove of non-existent device is not an error
+ assert_raises(KeyError, self.db.query, _DEVICE_ID)
+ self.db.remove(_DEVICE_ID)
+
+ start_time = datetime.utcnow()
+ self.db.add(_DEVICE_ID)
+ dev_data = self.db.query(_DEVICE_ID)
+
+ self.assertEqual(dev_data[DEVICE_ID_KEY], _DEVICE_ID)
+ self.assertEquals(dev_data[MDS_KEY], 0)
+ self.assertIsNone(dev_data[LAST_SYNC_KEY])
+ self.assertEqual(dev_data[VERSION_KEY], MibDbExternal.CURRENT_VERSION)
+
+ self.assertGreaterEqual(self.db.created, start_time)
+
+ # Remove it
+ self.db.remove(_DEVICE_ID)
+ assert_raises(KeyError, self.db.query, _DEVICE_ID)
+
+ # Remove of non-existant dev okay
+ self.db.remove(_DEVICE_ID +'abcd')
+
+ # Overwrite tests
+ self.db.add(_DEVICE_ID)
+ assert_raises(KeyError, self.db.add, _DEVICE_ID)
+ self.db.add(_DEVICE_ID, overwrite=True) # This is okay
+
+ def test_mib_data_sync(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+ self.assertEquals(self.db.get_mib_data_sync(_DEVICE_ID), 0)
+
+ self.db.save_mib_data_sync(_DEVICE_ID, 100)
+ self.assertEqual(self.db.get_mib_data_sync(_DEVICE_ID), 100)
+
+ assert_raises(ValueError, self.db.save_mib_data_sync, _DEVICE_ID, -1)
+ assert_raises(ValueError, self.db.save_mib_data_sync, _DEVICE_ID, 256)
+
+ def test_last_sync(self):
+ self.db.start()
+ self.assertIsNone(self.db.get_last_sync(_DEVICE_ID))
+
+ self.db.add(_DEVICE_ID)
+ self.assertIsNone(self.db.get_last_sync(_DEVICE_ID))
+
+ now = datetime.utcnow()
+
+ self.db.save_last_sync(_DEVICE_ID, now)
+ self.assertEqual(self.db.get_last_sync(_DEVICE_ID), now)
+
+ assert_raises(TypeError, self.db.save_last_sync, _DEVICE_ID, 'hello')
+
+ def test_set_and_query(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID) # Base device DB created here
+ time.sleep(0.1)
+
+ class_id = OntG.class_id
+ inst_id = 0
+ attributes = {'vendor_id': 'ABCD'}
+
+ start_time = datetime.utcnow()
+ set_occurred = self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.assertTrue(set_occurred)
+ end_time = datetime.utcnow()
+
+ dev_data = self.db.query(_DEVICE_ID)
+ self.assertEqual(dev_data[DEVICE_ID_KEY], _DEVICE_ID)
+
+ dev_classes = [v for k, v in dev_data.items() if isinstance(k, int)]
+
+ self.assertEqual(len(dev_classes), 1)
+ class_data = dev_classes[0]
+
+ self.assertEqual(class_data[CLASS_ID_KEY], class_id)
+
+ class_insts = [v for k, v in class_data.items() if isinstance(k, int)]
+
+ self.assertEqual(len(class_insts), 1)
+ inst_data = class_insts[0]
+
+ self.assertEqual(inst_data[INSTANCE_ID_KEY], inst_id)
+ self.assertGreaterEqual(inst_data[MODIFIED_KEY], start_time)
+ self.assertLessEqual(inst_data[MODIFIED_KEY], end_time)
+ self.assertLessEqual(inst_data[CREATED_KEY], inst_data[MODIFIED_KEY])
+
+ inst_attributes = inst_data[ATTRIBUTES_KEY]
+ self.assertEqual(len(inst_attributes), 1)
+
+ self.assertTrue('vendor_id' in inst_attributes)
+ self.assertEqual(inst_attributes['vendor_id'], attributes['vendor_id'])
+
+ ########################################
+ # Query with device and class. Should be same as from full device query
+ cls_2_data = self.db.query(_DEVICE_ID, class_id)
+
+ self.assertEqual(class_data[CLASS_ID_KEY], cls_2_data[CLASS_ID_KEY])
+
+ cl2_insts = {k:v for k, v in cls_2_data.items() if isinstance(k, int)}
+ self.assertEqual(len(cl2_insts), len(class_insts))
+
+ # Bad class id query
+ cls_no_data = self.db.query(_DEVICE_ID, class_id + 1)
+ self.assertTrue(isinstance(cls_no_data, dict))
+ self.assertEqual(len(cls_no_data), 0)
+
+ ########################################
+ # Query with device, class, instance
+ inst_2_data = self.db.query(_DEVICE_ID, class_id, inst_id)
+
+ self.assertEqual(inst_data[INSTANCE_ID_KEY], inst_2_data[INSTANCE_ID_KEY])
+ self.assertEqual(inst_data[MODIFIED_KEY], inst_2_data[MODIFIED_KEY])
+ self.assertEqual(inst_data[CREATED_KEY], inst_2_data[CREATED_KEY])
+
+ inst2_attr = inst_2_data[ATTRIBUTES_KEY]
+ self.assertEqual(len(inst2_attr), len(inst_attributes))
+
+ # Bad instance id query
+ inst_no_data = self.db.query(_DEVICE_ID, class_id, inst_id + 100)
+ self.assertTrue(isinstance(inst_no_data, dict))
+ self.assertEqual(len(inst_no_data), 0)
+
+ ########################################
+ # Attribute queries
+ attr_2_data = self.db.query(_DEVICE_ID, class_id, inst_id, 'vendor_id')
+ self.assertEqual(attr_2_data['vendor_id'], attributes['vendor_id'])
+
+ attr_3_data = self.db.query(_DEVICE_ID, class_id, inst_id, ['vendor_id'])
+ self.assertEqual(attr_3_data['vendor_id'], attributes['vendor_id'])
+
+ attr_4_data = self.db.query(_DEVICE_ID, class_id, inst_id, {'vendor_id'})
+ self.assertEqual(attr_4_data['vendor_id'], attributes['vendor_id'])
+
+ attr_no_data = self.db.query(_DEVICE_ID, class_id, inst_id, 'no_such_thing')
+ self.assertTrue(isinstance(attr_no_data, dict))
+ self.assertEqual(len(attr_no_data), 0)
+
+ # Set to same value does not change modified data. The modified is
+ # at the instance level
+
+ class_id = OntG.class_id
+ inst_id = 0
+ attributes = {'vendor_id': 'ABCD'}
+ set_occurred = self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.assertFalse(set_occurred)
+
+ inst_3_data = self.db.query(_DEVICE_ID, class_id, inst_id)
+ self.assertEqual(inst_data[MODIFIED_KEY], inst_3_data[MODIFIED_KEY])
+ self.assertEqual(inst_data[CREATED_KEY], inst_3_data[CREATED_KEY])
+
+ # But set to new value does
+ time.sleep(0.1)
+ attributes = {'vendor_id': 'WXYZ'}
+ set_occurred = self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ self.assertTrue(set_occurred)
+
+ inst_4_data = self.db.query(_DEVICE_ID, class_id, inst_id)
+ self.assertLess(inst_3_data[MODIFIED_KEY], inst_4_data[MODIFIED_KEY])
+ self.assertEqual(inst_3_data[CREATED_KEY], inst_4_data[CREATED_KEY])
+
+ def test_delete_instances(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+ create_time = datetime.utcnow()
+
+ class_id = GalEthernetProfile.class_id
+ inst_id_1 = 0x100
+ inst_id_2 = 0x200
+ attributes = {'max_gem_payload_size': 1500}
+
+ self.db.set(_DEVICE_ID, class_id, inst_id_1, attributes)
+ self.db.set(_DEVICE_ID, class_id, inst_id_2, attributes)
+ time.sleep(0.1)
+
+ dev_data = self.db.query(_DEVICE_ID)
+ cls_data = self.db.query(_DEVICE_ID, class_id)
+ inst_data = {k: v for k, v in cls_data.items() if isinstance(k, int)}
+ self.assertEqual(len(inst_data), 2)
+
+ self.assertLessEqual(dev_data[CREATED_KEY], create_time)
+ self.assertLessEqual(self.db.created, create_time)
+
+ # Delete one instance
+ time.sleep(0.1)
+ result = self.db.delete(_DEVICE_ID, class_id, inst_id_1)
+ self.assertTrue(result) # True returned if a del actually happened
+
+ dev_data = self.db.query(_DEVICE_ID)
+ cls_data = self.db.query(_DEVICE_ID, class_id)
+ inst_data = {k: v for k, v in cls_data.items() if isinstance(k, int)}
+ self.assertEqual(len(inst_data), 1)
+
+ self.assertLessEqual(dev_data[CREATED_KEY], create_time)
+ self.assertLessEqual(self.db.created, create_time)
+
+ # Delete remaining instance
+ time.sleep(0.1)
+ result = self.db.delete(_DEVICE_ID, class_id, inst_id_2)
+ self.assertTrue(result) # True returned if a del actually happened
+
+ dev_data = self.db.query(_DEVICE_ID)
+ cls_data = {k: v for k, v in dev_data.items() if isinstance(k, int)}
+ self.assertEqual(len(cls_data), 0)
+ self.assertLessEqual(dev_data[CREATED_KEY], create_time)
+
+ # Delete returns false if not instance
+ self.assertFalse(self.db.delete(_DEVICE_ID, class_id, inst_id_1))
+ self.assertFalse(self.db.delete(_DEVICE_ID, class_id, inst_id_2))
+
+ def test_on_mib_reset_listener(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+ time.sleep(0.1)
+
+ class_id = OntG.class_id
+ inst_id = 0
+ attributes = {'vendor_id': 'ABCD'}
+
+ set_time = datetime.utcnow()
+ self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+
+ time.sleep(0.1)
+ self.db.on_mib_reset(_DEVICE_ID)
+
+ dev_data = self.db.query(_DEVICE_ID)
+ self.assertEqual(dev_data[DEVICE_ID_KEY], _DEVICE_ID)
+ self.assertLessEqual(dev_data[CREATED_KEY], set_time)
+ self.assertLessEqual(self.db.created, set_time)
+
+ self.assertFalse(any(isinstance(cls, int) for cls in dev_data.iterkeys()))
+
+ def test_str_field_serialization(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+
+ class_id = OltG.class_id
+ inst_id = 0
+ attributes = {
+ 'olt_vendor_id': 'ABCD', # StrFixedLenField(4)
+ }
+ self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ data = self.db.query(_DEVICE_ID, class_id, inst_id, attributes.keys())
+ self.assertTrue(all(isinstance(data[k], basestring) for k in attributes.keys()))
+ self.assertTrue(all(data[k] == attributes[k] for k in attributes.keys()))
+
+ def test_object_as_a_str_field_serialization(self):
+ # Some entity classes such as ExtendedVlanTaggingOperationConfigurationData
+ # (class-id = 171) have a Scapy Packet derived object (VlanTaggingOperation)
+ # that in the parent object is a StrFixedLenField.
+ # These classes should encode into a JSON string. Test this serialization
+ # as well as deserialization
+ pass # TODO: More tests here
+
+ def test_mac_address_ip_field_serialization(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+
+ class_id = IpHostConfigData.class_id
+ inst_id = 0
+ attributes = {
+ 'mac_address': '00:01:02:03:04:05', # MACField
+ 'ip_address': '1.2.3.4', # IPField
+ }
+ self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ data = self.db.query(_DEVICE_ID, class_id, inst_id, attributes.keys())
+ self.assertTrue(all(isinstance(data[k], basestring) for k in attributes.keys()))
+ self.assertTrue(all(data[k] == attributes[k] for k in attributes.keys()))
+
+ def test_byte_and_short_field_serialization(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+
+ class_id = UniG.class_id
+ inst_id = 0
+ attributes = {
+ 'administrative_state': int(1), # ByteField
+ 'non_omci_management_identifier': int(12345) # IPField
+ }
+ self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ data = self.db.query(_DEVICE_ID, class_id, inst_id, attributes.keys())
+ self.assertTrue(all(isinstance(data[k], type(attributes[k])) for k in attributes.keys()))
+ self.assertTrue(all(data[k] == attributes[k] for k in attributes.keys()))
+
+ def test_int_field_serialization(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes = {
+ 'related_port': int(1234567) # IntField
+ }
+ self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ data = self.db.query(_DEVICE_ID, class_id, inst_id, attributes.keys())
+ self.assertTrue(all(isinstance(data[k], type(attributes[k])) for k in attributes.keys()))
+ self.assertTrue(all(data[k] == attributes[k] for k in attributes.keys()))
+
+ def test_long_field_serialization(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+
+ class_id = PriorityQueueG.class_id
+ inst_id = 0
+ attributes = {
+ 'packet_drop_queue_thresholds': int(0x1234) # LongField
+ }
+ self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ data = self.db.query(_DEVICE_ID, class_id, inst_id, attributes.keys())
+ self.assertTrue(all(isinstance(data[k], type(attributes[k])) for k in attributes.keys()))
+ self.assertTrue(all(data[k] == attributes[k] for k in attributes.keys()))
+
+ def test_bit_field_serialization(self):
+ self.db.start()
+ self.db.add(_DEVICE_ID)
+
+ class_id = OntG.class_id
+ inst_id = 0
+ attributes = {
+ 'extended_tc_layer_options': long(0x1234), # BitField(16)
+ }
+ self.db.set(_DEVICE_ID, class_id, inst_id, attributes)
+ data = self.db.query(_DEVICE_ID, class_id, inst_id, attributes.keys())
+ self.assertTrue(all(isinstance(data[k], type(attributes[k])) for k in attributes.keys()))
+ self.assertTrue(all(data[k] == attributes[k] for k in attributes.keys()))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/voltha/core/config/config_rev_persisted.py b/voltha/core/config/config_rev_persisted.py
index b82bb99..78d45f9 100644
--- a/voltha/core/config/config_rev_persisted.py
+++ b/voltha/core/config/config_rev_persisted.py
@@ -137,5 +137,6 @@
bbf_fiber_traffic_descriptor_profile_body_pb2, \
bbf_fiber_tcont_body_pb2, bbf_fiber_gemport_body_pb2, \
bbf_fiber_multicast_gemport_body_pb2, \
- bbf_fiber_multicast_distribution_set_body_pb2
+ bbf_fiber_multicast_distribution_set_body_pb2, \
+ omci_mib_db_pb2
return getattr(locals()[module_name], cls_name)
diff --git a/voltha/extensions/omci/database/mib_db_api.py b/voltha/extensions/omci/database/mib_db_api.py
index c6cc325..38f5061 100644
--- a/voltha/extensions/omci/database/mib_db_api.py
+++ b/voltha/extensions/omci/database/mib_db_api.py
@@ -26,6 +26,10 @@
MDS_KEY = 'mib_data_sync'
LAST_SYNC_KEY = 'last_mib_sync'
VERSION_KEY = 'version'
+DEVICE_ID_KEY = 'device_id'
+CLASS_ID_KEY = 'class_id'
+INSTANCE_ID_KEY = 'instance_id'
+ATTRIBUTES_KEY = 'attributes'
class DatabaseStateError(Exception):
@@ -108,7 +112,7 @@
:param device_id: (str) Device ID of ONU to add
:param overwrite: (bool) Overwrite existing entry if found.
- :raises KeyError: If device does not exist and 'overwrite' is False
+ :raises KeyError: If device already exists and 'overwrite' is False
"""
raise NotImplementedError('Implement this in your derive class')
@@ -164,7 +168,7 @@
:param device_id: (str) ONU Device ID
:param class_id: (int) Managed Entity class ID
:param instance_id: (int) Managed Entity instance
- :param attributes: (list or str) Managed Entity instance's attributes
+ :param attributes: (list/set or str) Managed Entity instance's attributes
:return: (dict) The value(s) requested. If class/inst/attribute is
not found, an empty dictionary is returned
diff --git a/voltha/extensions/omci/database/mib_db_dict.py b/voltha/extensions/omci/database/mib_db_dict.py
index b74294e..d0cf98d 100644
--- a/voltha/extensions/omci/database/mib_db_dict.py
+++ b/voltha/extensions/omci/database/mib_db_dict.py
@@ -61,7 +61,7 @@
:param device_id: (str) Device ID of ONU to add
:param overwrite: (bool) Overwrite existing entry if found.
- :raises KeyError: If device does not exist and 'overwrite' is False
+ :raises KeyError: If device already exist and 'overwrite' is False
"""
self.log.debug('add-device', device_id=device_id, overwrite=overwrite)
@@ -77,6 +77,7 @@
now = datetime.utcnow()
self._data[device_id] = {
+ DEVICE_ID_KEY: device_id,
CREATED_KEY: now,
MODIFIED_KEY: now,
MDS_KEY: 0,
@@ -129,6 +130,7 @@
LAST_SYNC_KEY: last_sync
}
+ device_db[DEVICE_ID_KEY] = device_id
device_db[MODIFIED_KEY] = now
device_db[MDS_KEY] = 0
device_db[VERSION_KEY] = MibDbVolatileDict.CURRENT_VERSION
@@ -183,14 +185,14 @@
return self._data[device_id].get(LAST_SYNC_KEY)
- def set(self, device_id, class_id, entity_id, attributes):
+ def set(self, device_id, class_id, instance_id, attributes):
"""
Set a database value. This should only be called by the MIB synchronizer
and its related tasks
:param device_id: (str) ONU Device ID
:param class_id: (int) ME Class ID
- :param entity_id: (int) ME Entity ID
+ :param instance_id: (int) ME Entity ID
:param attributes: (dict) Attribute dictionary
:returns: (bool) True if the value was saved to the database. False if the
@@ -209,19 +211,22 @@
if class_db is None:
device_db[class_id] = {
+ CLASS_ID_KEY: class_id,
CREATED_KEY: now,
MODIFIED_KEY: now
}
class_db = device_db[class_id]
self._modified = now
- instance_db = class_db.get(entity_id)
+ instance_db = class_db.get(instance_id)
if instance_db is None:
- class_db[entity_id] = {
+ class_db[instance_id] = {
+ INSTANCE_ID_KEY: instance_id,
CREATED_KEY: now,
- MODIFIED_KEY: now
+ MODIFIED_KEY: now,
+ ATTRIBUTES_KEY: dict()
}
- instance_db = class_db[entity_id]
+ instance_db = class_db[instance_id]
class_db[MODIFIED_KEY] = now
device_db[MODIFIED_KEY] = now
self._modified = now
@@ -239,13 +244,14 @@
format(attribute, type(db_value), type(value))
if db_value is None or db_value != value:
- instance_db[attribute] = value
+ instance_db[ATTRIBUTES_KEY][attribute] = value
changed = True
- instance_db[MODIFIED_KEY] = now
- class_db[MODIFIED_KEY] = now
- device_db[MODIFIED_KEY] = now
- self._modified = now
+ if changed:
+ instance_db[MODIFIED_KEY] = now
+ class_db[MODIFIED_KEY] = now
+ device_db[MODIFIED_KEY] = now
+ self._modified = now
return changed
@@ -253,14 +259,14 @@
self.log.error('set-failure', e=e)
raise
- def delete(self, device_id, class_id, entity_id):
+ def delete(self, device_id, class_id, instance_id):
"""
Delete an entity from the database if it exists. If all instances
of a class are deleted, the class is deleted as well.
:param device_id: (str) ONU Device ID
:param class_id: (int) ME Class ID
- :param entity_id: (int) ME Entity ID
+ :param instance_id: (int) ME Entity ID
:returns: (bool) True if the instance was found and deleted. False
if it did not exist.
@@ -278,17 +284,18 @@
if class_db is None:
return False
- instance_db = class_db.get(entity_id)
+ instance_db = class_db.get(instance_id)
if instance_db is None:
return False
now = datetime.utcnow()
- del class_db[entity_id]
+ del class_db[instance_id]
if len(class_db) == len([CREATED_KEY, MODIFIED_KEY]):
del device_db[class_id]
else:
class_db[MODIFIED_KEY] = now
+
device_db[MODIFIED_KEY] = now
self._modified = now
@@ -308,7 +315,7 @@
:param device_id: (str) ONU Device ID
:param class_id: (int) Managed Entity class ID
:param instance_id: (int) Managed Entity instance
- :param attributes: (list or str) Managed Entity instance's attributes
+ :param attributes: (list/set or str) Managed Entity instance's attributes
:return: (dict) The value(s) requested. If class/inst/attribute is
not found, an empty dictionary is returned
@@ -342,11 +349,11 @@
if attributes is None or len(instance_db) == 0:
return instance_db # TODO: copy.deepcopy(instance_db)
- if not isinstance(attributes, (basestring, list)):
- raise TypeError('Attributes should be a string or list of strings')
+ if not isinstance(attributes, (basestring, list, set)):
+ raise TypeError('Attributes should be a string or list/set of strings')
- if not isinstance(attributes, list):
+ if not isinstance(attributes, (list, set)):
attributes = [attributes]
- return {attr: val for attr, val in instance_db.iteritems()
+ return {attr: val for attr, val in instance_db[ATTRIBUTES_KEY].iteritems()
if attr in attributes}
diff --git a/voltha/extensions/omci/database/mib_db_ext.py b/voltha/extensions/omci/database/mib_db_ext.py
new file mode 100644
index 0000000..f93f14b
--- /dev/null
+++ b/voltha/extensions/omci/database/mib_db_ext.py
@@ -0,0 +1,818 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from datetime import datetime
+from mib_db_api import *
+from voltha.protos.omci_mib_db_pb2 import MibInstanceData, MibClassData, \
+ MibDeviceData, MibAttributeData
+from voltha.extensions.omci.omci_entities import *
+from scapy.fields import StrField
+
+
+class MibDbExternal(MibDbApi):
+ """
+ A persistent external OpenOMCI MIB Database
+ """
+ CURRENT_VERSION = 1 # VOLTHA v1.3.0 release
+
+ _TIME_FORMAT = '%Y%m%d-%H%M%S.%f'
+
+ # Paths from root proxy
+ MIB_PATH = '/omci_mibs'
+ DEVICE_PATH = MIB_PATH + '/{}' # .format(device_id)
+
+ # Classes, Instances, and Attributes as lists from root proxy
+ CLASSES_PATH = DEVICE_PATH + '/classes' # .format(device_id)
+ INSTANCES_PATH = DEVICE_PATH +'/classes/{}/instances' # .format(device_id, class_id)
+ ATTRIBUTES_PATH = DEVICE_PATH + '/classes/{}/instances/{}/attributes' # .format(device_id, class_id, instance_id)
+
+ # Single Class, Instance, and Attribute as objects from device proxy
+ CLASS_PATH = '/classes/{}' # .format(class_id)
+ INSTANCE_PATH = '/classes/{}/instances/{}' # .format(class_id, instance_id)
+ ATTRIBUTE_PATH = '/classes/{}/instances/{}/attributes/{}' # .format(class_id, instance_id
+ # attribute_name)
+
+ def __init__(self, omci_agent):
+ """
+ Class initializer
+ :param omci_agent: (OpenOMCIAgent) OpenOMCI Agent
+ """
+ super(MibDbExternal, self).__init__(omci_agent)
+ self._core = omci_agent.core
+
+ def start(self):
+ """
+ Start up/restore the database
+ """
+ self.log.debug('start')
+
+ if not self._started:
+ super(MibDbExternal, self).start()
+ root_proxy = self._core.get_proxy('/')
+
+ try:
+ base = root_proxy.get(MibDbExternal.MIB_PATH)
+ self.log.info('db-exists', num_devices=len(base))
+
+ except Exception as e:
+ self.log.exception('start-failure', e=e)
+ raise
+
+ def stop(self):
+ """
+ Start up the database
+ """
+ self.log.debug('stop')
+
+ if self._started:
+ super(MibDbExternal, self).stop()
+ # TODO: Delete this method if nothing else is done except calling the base class
+
+ def _time_to_string(self, time):
+ return time.strftime(MibDbExternal._TIME_FORMAT) if time is not None else ''
+
+ def _string_to_time(self, time):
+ return datetime.strptime(time, MibDbExternal._TIME_FORMAT) if len(time) else None
+
+ def _attribute_to_string(self, device_id, class_id, attr_name, value):
+ """
+ Convert an ME's attribute value to string representation
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Class ID
+ :param attr_name: (str) Attribute Name (see EntityClasses)
+ :param value: (various) Attribute Value
+
+ :return: (str) String representation of the value
+ :raises KeyError: Device, Class ID, or Attribute does not exist
+ """
+ try:
+ me_map = self._omci_agent.get_device(device_id).me_map
+ entity = me_map[class_id]
+ attr_index = entity.attribute_name_to_index_map[attr_name]
+ eca = entity.attributes[attr_index]
+ field = eca.field
+
+ if isinstance(field, (StrField, MACField, IPField)):
+ # For StrField, value is an str already (or possibly JSON encoded object)
+ # For MACField, value is a string in ':' delimited form
+ # For IPField, value is a string in '.' delimited form
+ if hasattr(value, 'to_json'):
+ str_value = value.to_json()
+ else:
+ str_value = str(value)
+
+ elif isinstance(field, (ByteField, ShortField, IntField, LongField)):
+ # For ByteField, ShortField, IntField, and LongField value is an int
+ str_value = str(value)
+
+ elif isinstance(field, BitField):
+ # For BitField, value is a long
+ #
+ str_value = str(value)
+
+ else:
+ self.log.warning('default-conversion', type=type(field),
+ class_id=class_id, attribute=attr_name, value=str(value))
+ str_value = str(value)
+
+ return str_value
+
+ except Exception as e:
+ self.log.exception('attr-to-string', device_id=device_id,
+ class_id=class_id, attr=attr_name,
+ value=value, e=e)
+ raise
+
+ def _string_to_attribute(self, device_id, class_id, attr_name, str_value):
+ """
+ Convert an ME's attribute value-string to its Scapy decode equivalent
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Class ID
+ :param attr_name: (str) Attribute Name (see EntityClasses)
+ :param str_value: (str) Attribute Value in string form
+
+ :return: (various) String representation of the value
+ :raises KeyError: Device, Class ID, or Attribute does not exist
+ """
+ try:
+ me_map = self._omci_agent.get_device(device_id).me_map
+ entity = me_map[class_id]
+ attr_index = entity.attribute_name_to_index_map[attr_name]
+ eca = entity.attributes[attr_index]
+ field = eca.field
+
+ if isinstance(field, StrFixedLenField):
+ value = str_value
+
+ elif isinstance(field, MACField):
+ value = str_value
+
+ elif isinstance(field, IPField):
+ value = str_value
+
+ elif isinstance(field, (ByteField, ShortField, IntField, LongField)):
+ value = int(str_value)
+
+ elif isinstance(field, BitField):
+ value = long(str_value)
+
+ else:
+ self.log.warning('default-conversion', type=type(field),
+ class_id=class_id, attribute=attr_name, value=str_value)
+ value = None
+
+ return value
+
+ except Exception as e:
+ self.log.exception('attr-to-string', device_id=device_id,
+ class_id=class_id, attr=attr_name,
+ value=str_value, e=e)
+ raise
+
+ def add(self, device_id, overwrite=False):
+ """
+ Add a new ONU to database
+
+ :param device_id: (str) Device ID of ONU to add
+ :param overwrite: (bool) Overwrite existing entry if found.
+
+ :raises KeyError: If device already exists and 'overwrite' is False
+ """
+ self.log.debug('add-device', device_id=device_id, overwrite=overwrite)
+
+ now = datetime.utcnow()
+ found = False
+ root_proxy = self._core.get_proxy('/')
+ data = MibDeviceData(device_id=device_id,
+ created=self._time_to_string(now),
+ last_sync_time='',
+ mib_data_sync=0,
+ version=MibDbExternal.CURRENT_VERSION)
+ try:
+ dev_proxy = self._device_proxy(device_id)
+ found = True
+
+ if not overwrite:
+ # Device already exists
+ raise KeyError('Device with ID {} already exists in MIB database'.
+ format(device_id))
+
+ # Overwrite with new data
+ data = dev_proxy.get('/', depth=0)
+ self._root_proxy.update(MibDbExternal.DEVICE_PATH.format(device_id), data)
+ self._modified = now
+
+ except KeyError:
+ if found:
+ raise
+ # Did not exist, add it now
+ root_proxy.add(MibDbExternal.MIB_PATH, data)
+ self._created = now
+ self._modified = now
+
+ def remove(self, device_id):
+ """
+ Remove an ONU from the database
+
+ :param device_id: (str) Device ID of ONU to remove from database
+ """
+ self.log.debug('remove-device', device_id=device_id)
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be an string')
+
+ try:
+ # self._root_proxy.get(MibDbExternal.DEVICE_PATH.format(device_id))
+ self._root_proxy.remove(MibDbExternal.DEVICE_PATH.format(device_id))
+ self._modified = datetime.utcnow()
+
+ except KeyError:
+ # Did not exists, which is not a failure
+ pass
+
+ except Exception as e:
+ self.log.exception('remove-exception', device_id=device_id, e=e)
+ raise
+
+ @property
+ def _root_proxy(self):
+ return self._core.get_proxy('/')
+
+ def _device_proxy(self, device_id):
+ """
+ Return a config proxy to the OMCI MIB_DB leaf for a given device
+
+ :param device_id: (str) ONU Device ID
+ :return: (ConfigProxy) Configuration proxy rooted at OMCI MIB DB
+ :raises KeyError: If the device does not exist in the database
+ """
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be an string')
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ return self._core.get_proxy(MibDbExternal.DEVICE_PATH.format(device_id))
+
+ def _class_proxy(self, device_id, class_id, create=False):
+ """
+ Get a config proxy to a specific managed entity class
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Class ID
+ :param create: (bool) If true, create default instance (and class)
+ :return: (ConfigProxy) Class configuration proxy
+
+ :raises DatabaseStateError: If database is not started
+ :raises KeyError: If Instance does not exist and 'create' is False
+ """
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not 0 <= class_id <= 0xFFFF:
+ raise ValueError('class-id is 0..0xFFFF')
+
+ fmt = MibDbExternal.DEVICE_PATH + MibDbExternal.CLASS_PATH
+ path = fmt.format(device_id, class_id)
+
+ try:
+ return self._core.get_proxy(path)
+
+ except KeyError:
+ if not create:
+ self.log.error('class-proxy-does-not-exist', device_id=device_id,
+ class_id=class_id)
+ raise
+
+ # Create class
+ data = MibClassData(class_id=class_id)
+ root_path = MibDbExternal.CLASSES_PATH.format(device_id)
+ self._root_proxy.add(root_path, data)
+
+ return self._core.get_proxy(path)
+
+ def _instance_proxy(self, device_id, class_id, instance_id, create=False):
+ """
+ Get a config proxy to a specific managed entity instance
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Class ID
+ :param instance_id: (int) Instance ID
+ :param create: (bool) If true, create default instance (and class)
+ :return: (ConfigProxy) Instance configuration proxy
+
+ :raises DatabaseStateError: If database is not started
+ :raises KeyError: If Instance does not exist and 'create' is False
+ """
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID is a string')
+
+ if not 0 <= class_id <= 0xFFFF:
+ raise ValueError('class-id is 0..0xFFFF')
+
+ if not 0 <= instance_id <= 0xFFFF:
+ raise ValueError('instance-id is 0..0xFFFF')
+
+ fmt = MibDbExternal.DEVICE_PATH + MibDbExternal.INSTANCE_PATH
+ path = fmt.format(device_id, class_id, instance_id)
+
+ try:
+ return self._core.get_proxy(path)
+
+ except KeyError:
+ if not create:
+ self.log.error('instance-proxy-does-not-exist', device_id=device_id,
+ class_id=class_id, instance_id=instance_id)
+ raise
+
+ # Create instance, first make sure class exists
+ self._class_proxy(device_id, class_id, create=True)
+
+ now = self._time_to_string(datetime.utcnow())
+ data = MibInstanceData(instance_id=instance_id, created=now, modified=now)
+ root_path = MibDbExternal.INSTANCES_PATH.format(device_id, class_id)
+ self._root_proxy.add(root_path, data)
+
+ return self._core.get_proxy(path)
+
+ def on_mib_reset(self, device_id):
+ """
+ Reset/clear the database for a specific Device
+
+ :param device_id: (str) ONU Device ID
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('on-mib-reset', device_id=device_id)
+
+ try:
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=2)
+
+ # Wipe out any existing class IDs
+ class_ids = [c.class_id for c in data.classes]
+
+ if len(class_ids):
+ for class_id in class_ids:
+ device_proxy.remove(MibDbExternal.CLASS_PATH.format(class_id))
+
+ # Reset MIB Data Sync to zero
+ now = datetime.utcnow()
+ data = MibDeviceData(device_id=device_id,
+ created=data.created,
+ last_sync_time=data.last_sync_time,
+ mib_data_sync=0,
+ version=MibDbExternal.CURRENT_VERSION)
+ # Update
+ self._root_proxy.update(MibDbExternal.DEVICE_PATH.format(device_id),
+ data)
+ self._modified = now
+ self.log.debug('mib-reset-complete', device_id=device_id)
+
+ except Exception as e:
+ self.log.exception('mib-reset-exception', device_id=device_id, e=e)
+ raise
+
+ def save_mib_data_sync(self, device_id, value):
+ """
+ Save the MIB Data Sync to the database in an easy location to access
+
+ :param device_id: (str) ONU Device ID
+ :param value: (int) Value to save
+ """
+ self.log.debug('save-mds', device_id=device_id, value=value)
+
+ try:
+ if not isinstance(value, int):
+ raise TypeError('MIB Data Sync is an integer')
+
+ if not 0 <= value <= 255:
+ raise ValueError('Invalid MIB-data-sync value {}. Must be 0..255'.
+ format(value))
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=0)
+
+ now = datetime.utcnow()
+ data.mib_data_sync = value
+
+ # Update
+ self._root_proxy.update(MibDbExternal.DEVICE_PATH.format(device_id),
+ data)
+ self._modified = now
+ self.log.debug('save-mds-complete', device_id=device_id)
+
+ except Exception as e:
+ self.log.exception('save-mds-exception', device_id=device_id, e=e)
+ raise
+
+ def get_mib_data_sync(self, device_id):
+ """
+ Get the MIB Data Sync value last saved to the database for a device
+
+ :param device_id: (str) ONU Device ID
+ :return: (int) The Value or None if not found
+ """
+ self.log.debug('get-mds', device_id=device_id)
+
+ try:
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=0)
+ return int(data.mib_data_sync)
+
+ except KeyError:
+ return None # OMCI MIB_DB entry has not yet been created
+
+ except Exception as e:
+ self.log.exception('get-mds-exception', device_id=device_id, e=e)
+ raise
+
+ def save_last_sync(self, device_id, value):
+ """
+ Save the Last Sync time to the database in an easy location to access
+
+ :param device_id: (str) ONU Device ID
+ :param value: (DateTime) Value to save
+ """
+ self.log.debug('save-last-sync', device_id=device_id, time=str(value))
+
+ try:
+ if not isinstance(value, datetime):
+ raise TypeError('Expected a datetime object, got {}'.
+ format(type(datetime)))
+
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=0)
+
+ now = datetime.utcnow()
+ data.last_sync_time = self._time_to_string(value)
+
+ # Update
+ self._root_proxy.update(MibDbExternal.DEVICE_PATH.format(device_id),
+ data)
+ self._modified = now
+ self.log.debug('save-mds-complete', device_id=device_id)
+
+ except Exception as e:
+ self.log.exception('save-last-sync-exception', device_id=device_id, e=e)
+ raise
+
+ def get_last_sync(self, device_id):
+ """
+ Get the Last Sync Time saved to the database for a device
+
+ :param device_id: (str) ONU Device ID
+ :return: (int) The Value or None if not found
+ """
+ self.log.debug('get-last-sync', device_id=device_id)
+
+ try:
+ device_proxy = self._device_proxy(device_id)
+ data = device_proxy.get(depth=0)
+ return self._string_to_time(data.last_sync_time)
+
+ except KeyError:
+ return None # OMCI MIB_DB entry has not yet been created
+
+ except Exception as e:
+ self.log.exception('get-last-sync-exception', e=e)
+ raise
+
+ def _add_new_class(self, device_id, class_id, instance_id, attributes):
+ """
+ Create an entry for a new class in the external database
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param instance_id: (int) ME Entity ID
+ :param attributes: (dict) Attribute dictionary
+
+ :returns: (bool) True if the value was saved to the database. False if the
+ value was identical to the current instance
+ """
+ self.log.debug('add', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+
+ now = self._time_to_string(datetime.utcnow())
+ attrs = [MibAttributeData(name=k,
+ value=self._attribute_to_string(device_id,
+ class_id,
+ k,
+ v)) for k, v in attributes.items()]
+ class_data = MibClassData(class_id=class_id,
+ instances=[MibInstanceData(instance_id=instance_id,
+ created=now,
+ modified=now,
+ attributes=attrs)])
+
+ self._root_proxy.add(MibDbExternal.CLASSES_PATH.format(device_id), class_data)
+ self.log.debug('set-complete', device_id=device_id, class_id=class_id,
+ entity_id=instance_id, attributes=attributes)
+ return True
+
+ def _add_new_instance(self, device_id, class_id, instance_id, attributes):
+ """
+ Create an entry for a instance of an existing class in the external database
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param instance_id: (int) ME Entity ID
+ :param attributes: (dict) Attribute dictionary
+
+ :returns: (bool) True if the value was saved to the database. False if the
+ value was identical to the current instance
+ """
+ self.log.debug('add', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+
+ now = self._time_to_string(datetime.utcnow())
+ attrs = [MibAttributeData(name=k,
+ value=self._attribute_to_string(device_id,
+ class_id,
+ k,
+ v)) for k, v in attributes.items()]
+ instance_data = MibInstanceData(instance_id=instance_id,
+ created=now,
+ modified=now,
+ attributes=attrs)
+
+ self._root_proxy.add(MibDbExternal.INSTANCES_PATH.format(device_id, class_id),
+ instance_data)
+
+ self.log.debug('set-complete', device_id=device_id, class_id=class_id,
+ entity_id=instance_id, attributes=attributes)
+ return True
+
+ def set(self, device_id, class_id, instance_id, attributes):
+ """
+ Set a database value. This should only be called by the MIB synchronizer
+ and its related tasks
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param instance_id: (int) ME Entity ID
+ :param attributes: (dict) Attribute dictionary
+
+ :returns: (bool) True if the value was saved to the database. False if the
+ value was identical to the current instance
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('set', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+ try:
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be a string')
+
+ if not 0 <= class_id <= 0xFFFF:
+ raise ValueError("Invalid Class ID: {}, should be 0..65535".format(class_id))
+
+ if not 0 <= instance_id <= 0xFFFF:
+ raise ValueError("Invalid Instance ID: {}, should be 0..65535".format(instance_id))
+
+ if not isinstance(attributes, dict):
+ raise TypeError("Attributes should be a dictionary")
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ # Determine the best strategy to add the information
+ dev_proxy = self._device_proxy(device_id)
+
+ try:
+ class_data = dev_proxy.get(MibDbExternal.CLASS_PATH.format(class_id), deep=True)
+
+ inst_data = next((inst for inst in class_data.instances
+ if inst.instance_id == instance_id), None)
+
+ if inst_data is None:
+ return self._add_new_instance(device_id, class_id, instance_id, attributes)
+
+ # Possibly adding to or updating an existing instance
+ # Get instance proxy, creating it if needed
+
+ exist_attr_indexes = dict()
+ attr_len = len(inst_data.attributes)
+
+ for index in xrange(0, attr_len):
+ exist_attr_indexes[inst_data.attributes[index].name] = index
+
+ modified = False
+ new_attributes = []
+
+ for k, v in attributes.items():
+ str_value = self._attribute_to_string(device_id, class_id, k, v)
+ new_attributes.append(MibAttributeData(name=k, value=str_value))
+
+ if k not in exist_attr_indexes or \
+ inst_data.attributes[exist_attr_indexes[k]].value != str_value:
+ modified = True
+
+ if modified:
+ now = datetime.utcnow()
+ new_data = MibInstanceData(instance_id=instance_id,
+ created=inst_data.created,
+ modified=self._time_to_string(now),
+ attributes=new_attributes)
+ dev_proxy.remove(MibDbExternal.INSTANCE_PATH.format(class_id, instance_id))
+ self._root_proxy.add(MibDbExternal.INSTANCES_PATH.format(device_id,
+ class_id), new_data)
+
+ self.log.debug('set-complete', device_id=device_id, class_id=class_id,
+ entity_id=instance_id, attributes=attributes, modified=modified)
+
+ return modified
+
+ except KeyError:
+ # Here if the class-id does not yet exist in the database
+ return self._add_new_class(device_id, class_id, instance_id,
+ attributes)
+ except Exception as e:
+ self.log.exception('set-exception', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes, e=e)
+ raise
+
+ def delete(self, device_id, class_id, entity_id):
+ """
+ Delete an entity from the database if it exists. If all instances
+ of a class are deleted, the class is deleted as well.
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME Entity ID
+
+ :returns: (bool) True if the instance was found and deleted. False
+ if it did not exist.
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('delete', device_id=device_id, class_id=class_id,
+ entity_id=entity_id)
+
+ if not self._started:
+ raise DatabaseStateError('The Database is not currently active')
+
+ if not isinstance(device_id, basestring):
+ raise TypeError('Device ID should be an string')
+
+ if not 0 <= class_id <= 0xFFFF:
+ raise ValueError('class-id is 0..0xFFFF')
+
+ if not 0 <= entity_id <= 0xFFFF:
+ raise ValueError('instance-id is 0..0xFFFF')
+
+ try:
+ # Remove instance
+ self._instance_proxy(device_id, class_id, entity_id).remove('/')
+ now = datetime.utcnow()
+
+ # If resulting class has no instance, remove it as well
+ class_proxy = self._class_proxy(device_id, class_id)
+ class_data = class_proxy.get('/', depth=1)
+
+ if len(class_data.instances) == 0:
+ class_proxy.remove('/')
+
+ self._modified = now
+ return True
+
+ except KeyError:
+ return False # Not found
+
+ except Exception as e:
+ self.log.exception('get-last-sync-exception', device_id=device_id, e=e)
+ raise
+
+ def query(self, device_id, class_id=None, instance_id=None, attributes=None):
+ """
+ Get database information.
+
+ This method can be used to request information from the database to the detailed
+ level requested
+
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) Managed Entity class ID
+ :param instance_id: (int) Managed Entity instance
+ :param attributes: (list/set or str) Managed Entity instance's attributes
+
+ :return: (dict) The value(s) requested. If class/inst/attribute is
+ not found, an empty dictionary is returned
+ :raises KeyError: If the requested device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self.log.debug('query', device_id=device_id, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
+ try:
+ if class_id is None:
+ # Get full device info
+ dev_data = self._device_proxy(device_id).get('/', depth=-1)
+ data = self._device_to_dict(dev_data)
+
+ elif instance_id is None:
+ # Get all instances of the class
+ try:
+ cls_data = self._class_proxy(device_id, class_id).get('/', depth=-1)
+ data = self._class_to_dict(device_id, cls_data)
+
+ except KeyError:
+ data = dict()
+
+ else:
+ # Get all attributes of a specific ME
+ try:
+ inst_data = self._instance_proxy(device_id, class_id, instance_id).\
+ get('/', depth=-1)
+
+ if attributes is None:
+ # All Attributes
+ data = self._instance_to_dict(device_id, class_id, inst_data)
+
+ else:
+ # Specific attribute(s)
+
+ if isinstance(attributes, basestring):
+ attributes = {attributes}
+
+ data = {
+ attr.name: self._string_to_attribute(device_id,
+ class_id,
+ attr.name,
+ attr.value)
+ for attr in inst_data.attributes if attr.name in attributes}
+
+ except KeyError:
+ data = dict()
+
+ return data
+
+ except KeyError:
+ self.log.warn('query-no-device', device_id=device_id)
+ raise
+
+ except Exception as e:
+ self.log.exception('get-last-sync-exception', device_id=device_id, e=e)
+ raise
+
+ def _instance_to_dict(self, device_id, class_id, instance):
+ if not isinstance(instance, MibInstanceData):
+ raise TypeError('{} is not of type MibInstanceData'.format(type(instance)))
+
+ data = {
+ INSTANCE_ID_KEY: instance.instance_id,
+ CREATED_KEY: self._string_to_time(instance.created),
+ MODIFIED_KEY: self._string_to_time(instance.modified),
+ ATTRIBUTES_KEY: dict()
+ }
+ for attribute in instance.attributes:
+ data[ATTRIBUTES_KEY][attribute.name] = self._string_to_attribute(device_id,
+ class_id,
+ attribute.name,
+ attribute.value)
+ return data
+
+ def _class_to_dict(self, device_id, val):
+ if not isinstance(val, MibClassData):
+ raise TypeError('{} is not of type MibClassData'.format(type(val)))
+
+ data = {
+ CLASS_ID_KEY: val.class_id,
+ }
+ for instance in val.instances:
+ data[instance.instance_id] = self._instance_to_dict(device_id,
+ val.class_id,
+ instance)
+ return data
+
+ def _device_to_dict(self, val):
+ if not isinstance(val, MibDeviceData):
+ raise TypeError('{} is not of type MibDeviceData'.format(type(val)))
+
+ data = {
+ DEVICE_ID_KEY: val.device_id,
+ CREATED_KEY: self._string_to_time(val.created),
+ LAST_SYNC_KEY: self._string_to_time(val.last_sync_time),
+ MDS_KEY: val.mib_data_sync,
+ VERSION_KEY: val.version
+ }
+ for class_data in val.classes:
+ data[class_data.class_id] = self._class_to_dict(val.device_id,
+ class_data)
+ return data
diff --git a/voltha/extensions/omci/omci_entities.py b/voltha/extensions/omci/omci_entities.py
index d30f7b2..c39335d 100644
--- a/voltha/extensions/omci/omci_entities.py
+++ b/voltha/extensions/omci/omci_entities.py
@@ -17,6 +17,7 @@
import sys
from binascii import hexlify
+import json
from scapy.fields import ByteField, ShortField, MACField, BitField, IPField
from scapy.fields import IntField, StrFixedLenField, LongField
from scapy.packet import Packet
@@ -532,6 +533,9 @@
BitField("treatment_inner_tpid_de", 0, 3),
]
+ def to_json(self):
+ return json.dumps(self.fields)
+
class ExtendedVlanTaggingOperationConfigurationData(EntityClass):
class_id = 171
@@ -814,6 +818,9 @@
ShortField("reserved0", 0)
]
+ def to_json(self):
+ return json.dumps(self.fields)
+
class AccessControlRow1(Packet):
name = "AccessControlRow1"
@@ -831,6 +838,9 @@
ShortField("reserved1", 0)
]
+ def to_json(self):
+ return json.dumps(self.fields)
+
class AccessControlRow2(Packet):
name = "AccessControlRow2"
@@ -844,6 +854,8 @@
StrFixedLenField("reserved2", None, 10)
]
+ def to_json(self):
+ return json.dumps(self.fields)
class DownstreamIgmpMulticastTci(Packet):
name = "DownstreamIgmpMulticastTci"
@@ -852,6 +864,9 @@
ShortField("tci", None)
]
+ def to_json(self):
+ return json.dumps(self.fields)
+
class MulticastOperationsProfile(EntityClass):
class_id = 309
@@ -903,6 +918,9 @@
StrFixedLenField("reserved1", None, 8)
]
+ def to_json(self):
+ return json.dumps(self.fields)
+
class AllowedPreviewGroupsRow0(Packet):
name = "AllowedPreviewGroupsRow0"
@@ -918,6 +936,9 @@
ShortField("vlan_id_uni", None)
]
+ def to_json(self):
+ return json.dumps(self.fields)
+
class AllowedPreviewGroupsRow1(Packet):
name = "AllowedPreviewGroupsRow1"
@@ -933,6 +954,9 @@
ShortField("time_left", None)
]
+ def to_json(self):
+ return json.dumps(self.fields)
+
class MulticastSubscriberConfigInfo(EntityClass):
class_id = 310
diff --git a/voltha/extensions/omci/omci_me.py b/voltha/extensions/omci/omci_me.py
index 8fff922..fdd404a 100644
--- a/voltha/extensions/omci/omci_me.py
+++ b/voltha/extensions/omci/omci_me.py
@@ -653,11 +653,11 @@
"""
:param mib_data_sync: (int) This attribute is used to check the alignment
of the MIB of the ONU with the corresponding MIB
- in the OLT. (0..255)
+ in the OLT. (0..0xFFFF)
"""
self.check_type(mib_data_sync, (int, type(None)))
- if mib_data_sync is not None and not 0 <= mib_data_sync <= 255:
- raise ValueError('mib_data_sync should be 0..255')
+ if mib_data_sync is not None and not 0 <= mib_data_sync <= 0xFFFF:
+ raise ValueError('mib_data_sync should be 0..0xFFFF') # TODO: Verify max value
data = {'mib_data_sync': mib_data_sync} if mib_data_sync is not None else None
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 134d3ec..0169dd1 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -117,9 +117,14 @@
@property
def custom_me_map(self):
- """ Custom Managed Entity Map for this device"""
+ """ Vendor-specific Managed Entity Map for this vendor's device"""
return self._custom_me_map
+ @property
+ def me_map(self):
+ """ Combined ME and Vendor-specific Managed Entity Map for this device"""
+ return self._me_map
+
def _cancel_deferred(self):
d, self._deferred = self._deferred, None
try:
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index 801ea03..9973444 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -15,6 +15,7 @@
#
import structlog
from voltha.extensions.omci.database.mib_db_dict import MibDbVolatileDict
+from voltha.extensions.omci.database.mib_db_ext import MibDbExternal
from voltha.extensions.omci.state_machines.mib_sync import MibSynchronizer
from voltha.extensions.omci.tasks.mib_upload import MibUploadTask
from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
@@ -25,7 +26,8 @@
OpenOmciAgentDefaults = {
'mib-synchronizer': {
'state-machine': MibSynchronizer, # Implements the MIB synchronization state machine
- 'database': MibDbVolatileDict, # Implements ME MIB database
+ # 'database': MibDbVolatileDict, # Implements volatile ME MIB database
+ 'database': MibDbExternal, # Implements persistent ME MIB database
'tasks': {
'mib-upload': MibUploadTask,
'get-mds': GetMdsTask,
@@ -75,6 +77,11 @@
# self._alarm_synchronizer_info = support_classes['alarm-synchronizer']
# self._alarm_database_cls = self._alarm_synchronizer_info['database']
+ @property
+ def core(self):
+ """ Return a reference to the VOLTHA Core component"""
+ return self._core
+
def start(self):
"""
Start OpenOMCI
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index edcd7f4..b0beb48 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -110,8 +110,9 @@
self._deferred = None
self._current_task = None # TODO: Support multiple running tasks after v.1.3.0 release
self._task_deferred = None
- self._mib_data_sync = db.get_mib_data_sync(device_id) or 0
- self._last_mib_db_sync_value = db.get_last_sync(device_id)
+ self._mib_data_sync = 0
+ self._last_mib_db_sync_value = None
+ self._device_in_db = False
self._event_bus = EventBusClient()
self._subscriptions = { # RxEvent.enum -> Subscription Object
@@ -214,6 +215,25 @@
# TODO: Stop and remove any currently running or scheduled tasks
# TODO: Anything else?
+ def _seed_database(self):
+ if not self._device_in_db:
+ try:
+ try:
+ self._database.start()
+ self._database.add(self._device_id)
+ self.log.debug('seed-db-does-not-exist', device_id=self._device_id)
+
+ except KeyError:
+ # Device already is in database
+ self.log.debug('seed-db-exist', device_id=self._device_id)
+ self._mib_data_sync = self._database.get_mib_data_sync(self._device_id)
+ self._last_mib_db_sync_value = self._database.get_last_sync(self._device_id)
+
+ self._device_in_db = True
+
+ except Exception as e:
+ self.log.exception('seed-database-failure', e=e)
+
def on_enter_starting(self):
"""
Determine ONU status and start MIB Synchronization tasks
@@ -221,6 +241,9 @@
self._device = self._agent.get_device(self._device_id)
self.log.debug('state-transition', new_onu=self.is_new_onu)
+ # Make sure root of external MIB Database exists
+ self._seed_database()
+
# Set up Response and Autonomous notification subscriptions
try:
for event, sub in self._sub_mapping.iteritems():
diff --git a/voltha/extensions/omci/tasks/mib_upload.py b/voltha/extensions/omci/tasks/mib_upload.py
index 8f442cd..adea4a6 100644
--- a/voltha/extensions/omci/tasks/mib_upload.py
+++ b/voltha/extensions/omci/tasks/mib_upload.py
@@ -14,7 +14,7 @@
# limitations under the License.
#
from task import Task
-from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure, AlreadyCalledError
from twisted.internet import reactor
@@ -94,24 +94,50 @@
# Begin MIB Upload
results = yield device.omci_cc.send_mib_upload()
number_of_commands = results.fields['omci_message'].fields['number_of_commands']
+ failed = False
for seq_no in xrange(number_of_commands):
if not device.active or not device.omci_cc.enabled:
self.deferred.errback(failure.Failure(
GeneratorExit('OMCI and/or ONU is not active')))
return
- yield device.omci_cc.send_mib_upload_next(seq_no)
- # Successful if here
- self.log.info('mib-synchronized')
- self.deferred.callback('success, loaded {} ME Instances'.
- format(number_of_commands))
+ for retry in range(0, 3):
+ try:
+ self.log.debug('mib-upload-next-request', seq_no=seq_no, retry=retry, number_of_commands=number_of_commands)
+ yield device.omci_cc.send_mib_upload_next(seq_no)
+ self.log.debug('mib-upload-next-success', seq_no=seq_no, number_of_commands=number_of_commands)
+ failed = False
+ break
+
+ except TimeoutError as e:
+ from common.utils.asleep import asleep
+ self.log.warn('mib-upload-timeout', e=e, seq_no=seq_no,
+ number_of_commands=number_of_commands)
+ failed = True
+ if retry < 2:
+ yield asleep(0.3)
+
+ if not failed:
+ # Successful if here
+ self.log.info('mib-synchronized')
+ self.deferred.callback('success, loaded {} ME Instances'.
+ format(number_of_commands))
+ else:
+ self.deferred.errback(failure.Failure(e))
except TimeoutError as e:
- self.log.warn('mib-upload-timeout', e=e, seq_no=seq_no,
+ self.log.warn('mib-upload-timeout-on-reset', e=e, seq_no=seq_no,
number_of_commands=number_of_commands)
self.deferred.errback(failure.Failure(e))
+ except AlreadyCalledError:
+ # Can occur if task canceled due to MIB Sync state change
+ self.log.debug('already-called-exception', seq_no=seq_no,
+ number_of_commands=number_of_commands)
+ assert self.deferred.called, \
+ 'Unexpected AlreadyCalledError exception: seq: {} of {}'.format(seq_no,
+ number_of_commands)
except Exception as e:
self.log.exception('mib-upload', e=e)
self.deferred.errback(failure.Failure(e))
diff --git a/voltha/protos/omci_mib_db.proto b/voltha/protos/omci_mib_db.proto
new file mode 100644
index 0000000..f07ec20
--- /dev/null
+++ b/voltha/protos/omci_mib_db.proto
@@ -0,0 +1,55 @@
+//
+// Copyright 2018 - present the original author or authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+syntax = "proto3";
+
+option go_package = "github.com/opencord/voltha/protos/go/voltha";
+
+package omci;
+
+import "meta.proto";
+
+
+message MibAttributeData {
+ string name = 1 [(voltha.access) = READ_ONLY];
+ string value = 2;
+}
+
+message MibInstanceData {
+ uint32 instance_id = 1 [(voltha.access) = READ_ONLY];
+ string created = 2;
+ string modified = 3;
+
+ repeated MibAttributeData attributes = 4
+ [(voltha.child_node) = {key: "name"}];
+}
+
+message MibClassData {
+ uint32 class_id = 1 [(voltha.access) = READ_ONLY];
+
+ repeated MibInstanceData instances= 2
+ [(voltha.child_node) = {key: "instance_id"}];
+}
+
+message MibDeviceData {
+ string device_id = 1 [(voltha.access) = READ_ONLY];
+ string created = 2;
+ string last_sync_time = 3;
+ uint32 mib_data_sync = 4;
+ uint32 version = 5;
+
+ repeated MibClassData classes = 6
+ [(voltha.child_node) = {key: "class_id"}];
+}
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index 1dc7897..849c95c 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -30,6 +30,7 @@
import "bbf_fiber_multicast_gemport_body.proto";
import "bbf_fiber_tcont_body.proto";
import "bbf_fiber_traffic_descriptor_profile_body.proto";
+import "omci_mib_db.proto";
option java_package = "org.opencord.voltha";
@@ -135,6 +136,10 @@
repeated
bbf_fiber.MulticastDistributionSetData multicast_distibution_sets = 27
[(child_node) = {key: "name"}];
+
+ repeated
+ omci.MibDeviceData omci_mibs = 28
+ [(child_node) = {key: "device_id"}];
}
message VolthaInstances {
@@ -194,6 +199,10 @@
repeated
bbf_fiber.MulticastDistributionSetData multicast_distibution_sets = 27
[(child_node) = {key: "name"}];
+
+ repeated
+ omci.MibDeviceData omci_mib_database = 28
+ [(child_node) = {key: "device_id"}];
}
// Device Self Test Response