VOL-722: MIB Resynchronization Task implemented
Tested with Adtran and T&W ONUs
Change-Id: I8bf795ec15b93ddc4693ba84f277d3cc765b2cd0
diff --git a/tests/utests/voltha/extensions/omci/test_mib_resync_task.py b/tests/utests/voltha/extensions/omci/test_mib_resync_task.py
index d598c48..073b2d6 100644
--- a/tests/utests/voltha/extensions/omci/test_mib_resync_task.py
+++ b/tests/utests/voltha/extensions/omci/test_mib_resync_task.py
@@ -214,10 +214,10 @@
self.assertEqual(len(attr_diffs), 0)
def test_on_olt_only(self):
- class_id = PriorityQueueG.class_id
+ class_id = GemInterworkingTp.class_id
inst_id = 0
attributes = {
- 'related_port': int(1234567) # IntField
+ 'gal_loopback_configuration': int(1)
}
self.olt_db.set(_DEVICE_ID, class_id, inst_id, attributes)
diff --git a/tests/utests/voltha/extensions/omci/test_openomci_agent.py b/tests/utests/voltha/extensions/omci/test_openomci_agent.py
index d1c69d2..498a59c 100644
--- a/tests/utests/voltha/extensions/omci/test_openomci_agent.py
+++ b/tests/utests/voltha/extensions/omci/test_openomci_agent.py
@@ -24,6 +24,7 @@
from voltha.extensions.omci.tasks.mib_upload import MibUploadTask
from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
+from voltha.extensions.omci.tasks.mib_reconcile_task import MibReconcileTask
class TestOpenOmciAgent(TestCase):
@@ -53,7 +54,7 @@
self.assertTrue(isinstance(mib_sync_tasks['get-mds'], type(GetMdsTask)))
self.assertTrue(isinstance(mib_sync_tasks['mib-audit'], type(GetMdsTask)))
self.assertTrue(isinstance(mib_sync_tasks['mib-resync'], type(MibResyncTask)))
- # self.assertTrue(isinstance(mib_sync_tasks['mib-reconcile'], type('TODO: not yet coded')))
+ self.assertTrue(isinstance(mib_sync_tasks['mib-reconcile'], type(MibReconcileTask)))
# caps = OpenOmciAgentDefaults.get('onu-capabilities')
#
diff --git a/voltha/extensions/omci/database/mib_db_dict.py b/voltha/extensions/omci/database/mib_db_dict.py
index daadc68..19294b2 100644
--- a/voltha/extensions/omci/database/mib_db_dict.py
+++ b/voltha/extensions/omci/database/mib_db_dict.py
@@ -234,13 +234,14 @@
try:
device_db = self._data[device_id]
class_db = device_db.get(class_id)
+ created = False
if class_db is None:
- device_db[class_id] = {
- CLASS_ID_KEY: class_id
- }
+ device_db[class_id] = {CLASS_ID_KEY: class_id}
+
class_db = device_db[class_id]
self._modified = now
+ created = True
instance_db = class_db.get(instance_id)
if instance_db is None:
@@ -252,6 +253,7 @@
}
instance_db = class_db[instance_id]
self._modified = now
+ created = True
changed = False
@@ -290,7 +292,7 @@
if ATTRIBUTES_KEY in instance_db else None
assert db_value is None or isinstance(value, type(db_value)), \
- "New value for attribute '{}' type is changing from '{}' to '{}'".\
+ "New value type for attribute '{}' type is changing from '{}' to '{}'".\
format(attribute, type(db_value), type(value))
if db_value is None or db_value != value:
@@ -301,10 +303,11 @@
instance_db[MODIFIED_KEY] = now
self._modified = now
- return changed
+ return changed or created
except Exception as e:
- self.log.error('set-failure', e=e)
+ self.log.error('set-failure', e, class_id=class_id,
+ instance_id=instance_id, attributes=attributes)
raise
def delete(self, device_id, class_id, instance_id):
diff --git a/voltha/extensions/omci/database/mib_db_ext.py b/voltha/extensions/omci/database/mib_db_ext.py
index 96fc0e3..e12331f 100644
--- a/voltha/extensions/omci/database/mib_db_ext.py
+++ b/voltha/extensions/omci/database/mib_db_ext.py
@@ -754,7 +754,7 @@
except KeyError:
# Here if the class-id does not yet exist in the database
- self.log.info("adding-key-not-found", class_id=class_id)
+ self.log.debug("adding-key-not-found", class_id=class_id)
return self._add_new_class(device_id, class_id, instance_id,
attributes)
finally:
@@ -827,7 +827,7 @@
diff = datetime.utcnow() - start_time
# NOTE: Change to 'debug' when checked in, manually change to 'info'
# for development testing.
- self.log.info('db-delete-time', milliseconds=diff.microseconds/1000)
+ self.log.debug('db-delete-time', milliseconds=diff.microseconds/1000)
self._statistics['delete'].increment(diff.microseconds/1000)
def query(self, device_id, class_id=None, instance_id=None, attributes=None):
@@ -910,8 +910,8 @@
diff = end_time.utcnow() - start_time
# NOTE: Change to 'debug' when checked in, manually change to 'info'
# for development testing.
- self.log.info('db-get-time', milliseconds=diff.microseconds/1000, class_id=class_id,
- instance_id=instance_id)
+ self.log.debug('db-get-time', milliseconds=diff.microseconds/1000, class_id=class_id,
+ instance_id=instance_id)
self._statistics['get'].increment(diff.microseconds/1000)
def _instance_to_dict(self, device_id, class_id, instance):
diff --git a/voltha/extensions/omci/omci_me.py b/voltha/extensions/omci/omci_me.py
index cf55a44..4f84fbb 100644
--- a/voltha/extensions/omci/omci_me.py
+++ b/voltha/extensions/omci/omci_me.py
@@ -681,7 +681,11 @@
raise TypeError('ignore_arc should be a boolean')
if mib_data_sync is not None:
- data = {'mib_data_sync': mib_data_sync}
+ # Note: Currently the Scapy decode/encode is 16-bits since we need
+ # the data field that large in order to support MIB and Alarm Upload Next
+ # commands. Push our 8-bit MDS value into the upper 8-bits so that
+ # it is encoded properly into the ONT_Data 'set' frame
+ data = {'mib_data_sync': mib_data_sync << 8}
elif sequence_number is not None:
data = {'mib_data_sync': sequence_number}
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 43f55b6..086e1d7 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -123,8 +123,8 @@
# downloader_info['tasks'],
# advertise_events=advertise)
self._image_agent = ImageAgent(self._omci_agent, device_id, downloader_info['state-machine'],
- downloader_info['tasks'],
- advertise_events=advertise)
+ downloader_info['tasks'],
+ advertise_events=advertise)
except Exception as e:
self.log.exception('state-machine-create-failed', e=e)
raise
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index d337475..8c90b31 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -20,6 +20,7 @@
from voltha.extensions.omci.tasks.mib_upload import MibUploadTask
from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
+from voltha.extensions.omci.tasks.mib_reconcile_task import MibReconcileTask
from voltha.extensions.omci.tasks.sync_time_task import SyncTimeTask
from voltha.extensions.omci.state_machines.Alarm_sync import AlarmSynchronizer
from voltha.extensions.omci.tasks.alarm_sync_data import AlarmSyncDataTask
@@ -47,7 +48,7 @@
'get-mds': GetMdsTask,
'mib-audit': GetMdsTask,
'mib-resync': MibResyncTask,
- 'mib-reconcile': None # TODO: post-v1.3.0 (Reconcile out-of-sync MIB DB)
+ 'mib-reconcile': MibReconcileTask
}
},
'omci-capabilities': {
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index ebb5124..b63e36b 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -18,6 +18,7 @@
from transitions import Machine
from twisted.internet import reactor
from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.database.mib_db_api import MDS_KEY
from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes, \
AttributeAccess
from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
@@ -49,29 +50,30 @@
{'trigger': 'examine_mds', 'source': 'starting', 'dest': 'examining_mds'},
{'trigger': 'success', 'source': 'uploading', 'dest': 'in_sync'},
- {'trigger': 'timeout', 'source': 'uploading', 'dest': 'starting'},
{'trigger': 'success', 'source': 'examining_mds', 'dest': 'in_sync'},
- {'trigger': 'timeout', 'source': 'examining_mds', 'dest': 'starting'},
- {'trigger': 'mismatch', 'source': 'examining_mds', 'dest': 'uploading'},
+ {'trigger': 'mismatch', 'source': 'examining_mds', 'dest': 'resynchronizing'},
{'trigger': 'audit_mib', 'source': 'in_sync', 'dest': 'auditing'},
+
+ {'trigger': 'success', 'source': 'out_of_sync', 'dest': 'in_sync'},
{'trigger': 'audit_mib', 'source': 'out_of_sync', 'dest': 'auditing'},
{'trigger': 'success', 'source': 'auditing', 'dest': 'in_sync'},
- {'trigger': 'timeout', 'source': 'auditing', 'dest': 'starting'},
{'trigger': 'mismatch', 'source': 'auditing', 'dest': 'resynchronizing'},
{'trigger': 'force_resync', 'source': 'auditing', 'dest': 'resynchronizing'},
{'trigger': 'success', 'source': 'resynchronizing', 'dest': 'in_sync'},
{'trigger': 'diffs_found', 'source': 'resynchronizing', 'dest': 'out_of_sync'},
- {'trigger': 'timeout', 'source': 'resynchronizing', 'dest': 'out_of_sync'},
+
+ # Do wildcard 'timeout' trigger that sends us back to start
+ {'trigger': 'timeout', 'source': '*', 'dest': 'starting'},
# Do wildcard 'stop' trigger last so it covers all previous states
{'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
]
DEFAULT_TIMEOUT_RETRY = 5 # Seconds to delay after task failure/timeout
- DEFAULT_AUDIT_DELAY = 15 # Periodic tick to audit the MIB Data Sync
+ DEFAULT_AUDIT_DELAY = 60 # Periodic tick to audit the MIB Data Sync
DEFAULT_RESYNC_DELAY = 300 # Periodically force a resync
def __init__(self, agent, device_id, mib_sync_tasks, db,
@@ -115,6 +117,7 @@
self._get_mds_task = mib_sync_tasks['get-mds']
self._audit_task = mib_sync_tasks['mib-audit']
self._resync_task = mib_sync_tasks['mib-resync']
+ self._reconcile_task = mib_sync_tasks['mib-reconcile']
self._advertise_events = advertise_events
self._deferred = None
@@ -123,10 +126,13 @@
self._mib_data_sync = 0
self._last_mib_db_sync_value = None
self._device_in_db = False
+ self._next_resync = None
self._on_olt_only_diffs = None
self._on_onu_only_diffs = None
self._attr_diffs = None
+ self._audited_olt_db = None
+ self._audited_onu_db = None
self._event_bus = EventBusClient()
self._omci_cc_subscriptions = { # RxEvent.enum -> Subscription Object
@@ -293,7 +299,7 @@
def on_enter_starting(self):
"""
- Determine ONU status and start MIB Synchronization tasks
+ Determine ONU status and start/re-start MIB Synchronization tasks
"""
self._device = self._agent.get_device(self._device_id)
self.advertise(OpenOmciEventType.state_change, self.state)
@@ -325,6 +331,13 @@
except Exception as e:
self.log.exception('dev-subscription-setup', e=e)
+ # Clear any previous audit results
+ self._on_olt_only_diffs = None
+ self._on_onu_only_diffs = None
+ self._attr_diffs = None
+ self._audited_olt_db = None
+ self._audited_onu_db = None
+
# Determine if this ONU has ever synchronized
if self.is_new_onu:
# Start full MIB upload
@@ -336,13 +349,14 @@
def on_enter_uploading(self):
"""
- Begin full MIB data sync, starting with a MIB RESET
+ Begin full MIB data upload, starting with a MIB RESET
"""
self.advertise(OpenOmciEventType.state_change, self.state)
def success(results):
self.log.debug('mib-upload-success', results=results)
self._current_task = None
+ self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
self._deferred = reactor.callLater(0, self.success)
def failure(reason):
@@ -371,6 +385,7 @@
# Examine MDS value
if self.mib_data_sync == onu_mds_value:
+ self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
self._deferred = reactor.callLater(0, self.success)
else:
self._deferred = reactor.callLater(0, self.mismatch)
@@ -388,7 +403,7 @@
def on_enter_in_sync(self):
"""
- Schedule a tick to occur to in the future to request an audit
+ The OLT/OpenOMCI MIB Database is in sync with the ONU MIB Database.
"""
self.advertise(OpenOmciEventType.state_change, self.state)
self.last_mib_db_sync = datetime.utcnow()
@@ -404,75 +419,59 @@
o the MIB_Data_Sync values are not equal, or
o the MIBs were compared and differences were found.
- If all of the *_diff properties are allNone, then we are here after initial
- startup and MDS did not match, or the MIB Audit/Resync state failed.
-
- In the second case, one or more of our *_diff properties will be non-None.
- If that is true, we need to update the ONU accordingly.
-
- Schedule a tick to occur to in the future to request an audit
+ Schedule a task to reconcile the differences
"""
self.advertise(OpenOmciEventType.state_change, self.state)
- self._device.mib_db_in_sync = False
- if all(diff is None for diff in [self._on_olt_only_diffs,
- self._on_onu_only_diffs,
- self._attr_diffs]):
- # Retry the Audit process
- self._deferred = reactor.callLater(1, self.audit_mib)
+ # We are only out-of-sync if there were differences. If here due to MDS
+ # value differences, still run the reconcile so we up date the ONU's MDS
+ # value to match ours.
- else:
- step = 'Nothing'
- class_id = 0
- instance_id = 0
- attribute = ''
+ self._device.mib_db_in_sync = self._attr_diffs is None and \
+ self._on_onu_only_diffs is None and \
+ self._on_olt_only_diffs is None
- try:
- # Need to update the ONU accordingly
- if self._attr_diffs is not None:
- step = 'attribute-update'
- pass # TODO: Perform the 'set' commands needed
+ def success(onu_mds_value):
+ self.log.debug('examine-mds-success', mds_value=onu_mds_value)
+ self._current_task = None
+ self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
+ self._deferred = reactor.callLater(0, self.success)
- if self._on_onu_only_diffs is not None:
- step = 'onu-cleanup'
- #
- # TODO: May want to watch for ONU only attributes
- # It is possible that if they are the 'default' value or
- # are not used if another attribute is set a specific way.
- #
- # For instance, no one may set the gal_loopback_configuration
- # in the GEM Interworking Termination point since its default
- # values is '0' disable, but when we audit, the ONU will report zero.
- #
- # A good way to perhaps fix this is to update our database with the
- # default. Or perhaps set all defaults in the database in the first
- # place when we do the initial create/set.
- #
- pass # TODO: Perform 'delete' commands as needed, see 'default' note above
+ def failure(reason):
+ self.log.info('examine-mds-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
- if self._on_olt_only_diffs is not None:
- step = 'olt-push'
- pass # TODO: Perform 'create' commands as needed
+ diff_collection = {
+ 'onu-only': self._on_onu_only_diffs,
+ 'olt-only': self._on_olt_only_diffs,
+ 'attributes': self._attr_diffs,
+ 'olt-db': self._audited_olt_db,
+ 'onu-db': self._audited_onu_db
+ }
+ # Clear out results since reconciliation task will be handling them
+ self._on_olt_only_diffs = None
+ self._on_onu_only_diffs = None
+ self._attr_diffs = None
+ self._audited_olt_db = None
+ self._audited_onu_db = None
- self._deferred = reactor.callLater(1, self.audit_mib)
-
- except Exception as e:
- self.log.exception('onu-update', e=e, step=step, class_id=class_id,
- instance_id=instance_id, attribute=attribute)
- # Retry the Audit process
- self._deferred = reactor.callLater(1, self.audit_mib)
+ self._current_task = self._reconcile_task(self._agent, self._device_id, diff_collection)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
def on_enter_auditing(self):
"""
Perform a MIB Audit. If our last MIB resync was too long in the
past, perform a resynchronization anyway
"""
- next_resync = self.last_mib_db_sync + timedelta(seconds=self._resync_delay)\
- if self.last_mib_db_sync is not None else datetime.utcnow()
-
self.advertise(OpenOmciEventType.state_change, self.state)
- if datetime.utcnow() >= next_resync:
+ if self._next_resync is None:
+ self.log.error('next-forced-resync-error', msg='Next Resync should always be valid at this point')
+ self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
+
+ if datetime.utcnow() >= self._next_resync:
self._deferred = reactor.callLater(0, self.force_resync)
else:
def success(onu_mds_value):
@@ -509,25 +508,22 @@
on_olt_only = results.get('on-olt-only')
on_onu_only = results.get('on-onu-only')
attr_diffs = results.get('attr-diffs')
+ olt_db = results.get('olt-db')
+ onu_db = results.get('onu-db')
self._current_task = None
self._on_olt_only_diffs = on_olt_only if on_olt_only and len(on_olt_only) else None
self._on_onu_only_diffs = on_onu_only if on_onu_only and len(on_onu_only) else None
self._attr_diffs = attr_diffs if attr_diffs and len(attr_diffs) else None
+ self._audited_olt_db = olt_db
+ self._audited_onu_db = onu_db
- if all(diff is None for diff in [self._on_olt_only_diffs,
- self._on_onu_only_diffs,
- self._attr_diffs]):
- # TODO: If here, do we need to make sure OpenOMCI mib_data_sync matches
- # the ONU. Remember we compared against an ONU snapshot, it may
- # be different now. Best thing to do is perhaps set it to our
- # MDS value if different. Also remember that setting the MDS on
- # the ONU to 'n' is a set command and it will be 'n+1' after the
- # set.
- #
- # TODO: Also look into attributes covered by AVC and treat appropriately
- # since may have missed the AVC
+ mds_equal = self.mib_data_sync == self._audited_onu_db[MDS_KEY]
+ if mds_equal and all(diff is None for diff in [self._on_olt_only_diffs,
+ self._on_onu_only_diffs,
+ self._attr_diffs]):
+ self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
self._deferred = reactor.callLater(0, self.success)
else:
self._deferred = reactor.callLater(0, self.diffs_found)
@@ -535,9 +531,6 @@
def failure(reason):
self.log.info('resync-failure', reason=reason)
self._current_task = None
- self._on_olt_only_diffs = None
- self._on_onu_only_diffs = None
- self._attr_diffs = None
self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
self._current_task = self._resync_task(self._agent, self._device_id)
@@ -595,29 +588,29 @@
if self.state == 'disabled':
self.log.error('rx-in-invalid-state', state=self.state)
- elif self.state != 'uploading':
- # Inspect the notification
- omci_msg = notification.fields['omci_message'].fields
- class_id = omci_msg['entity_class']
- instance_id = omci_msg['entity_id']
- data = omci_msg['data']
- attributes = [data.keys()]
+ # Inspect the notification
+ omci_msg = notification.fields['omci_message'].fields
+ class_id = omci_msg['entity_class']
+ instance_id = omci_msg['entity_id']
+ data = omci_msg['data']
+ attributes = [data.keys()]
- # Look up ME Instance in Database. Not-found can occur if a MIB
- # reset has occurred
- info = self._database.query(self.device_id, class_id, instance_id, attributes)
- # TODO: Add old/new info to log message
- self.log.debug('avc-change', class_id=class_id, instance_id=instance_id)
+ # Look up ME Instance in Database. Not-found can occur if a MIB
+ # reset has occurred
+ info = self._database.query(self.device_id, class_id, instance_id, attributes)
+ # TODO: Add old/new info to log message
+ self.log.debug('avc-change', class_id=class_id, instance_id=instance_id)
- # Save the changed data to the MIB.
- changed = self._database.set(self.device_id, class_id, instance_id, data)
+ # Save the changed data to the MIB.
+ self._database.set(self.device_id, class_id, instance_id, data)
- if changed:
- # Autonomous creation and deletion of managed entities do not
- # result in an increment of the MIB data sync value. However,
- # AVC's in response to a change by the Operator do incur an
- # increment of the MIB Data Sync
- pass
+ # Autonomous creation and deletion of managed entities do not
+ # result in an increment of the MIB data sync value. However,
+ # AVC's in response to a change by the Operator do incur an
+ # increment of the MIB Data Sync. If here during uploading,
+ # we issued a MIB-Reset which may generate AVC. (TODO: Focus testing during hardening)
+ if self.state == 'uploading':
+ self.increment_mib_data_sync()
except KeyError:
pass # NOP
@@ -844,11 +837,11 @@
entity_id = omci_msg['entity_id']
attributes = {k: v for k, v in omci_msg['data'].items()}
- # Save to the database
- modified = self._database.set(self._device_id, class_id, entity_id, attributes)
-
- if modified:
- self.increment_mib_data_sync()
+ # Save to the database (Do not save 'sets' of the mib-data-sync however)
+ if class_id != OntData.class_id:
+ modified = self._database.set(self._device_id, class_id, entity_id, attributes)
+ if modified:
+ self.increment_mib_data_sync()
except KeyError as e:
pass # NOP
@@ -919,3 +912,18 @@
if isinstance(attributes, dict) and len(attributes) and\
self.query_mib(class_id, entity_id) is not None:
self._database.set(self._device_id, class_id, entity_id, attributes)
+
+ def mib_delete(self, class_id, entity_id):
+ """
+ Delete an existing ME Class instance
+
+ This method is primarily used by other state machines to delete an ME
+ from the MIB database
+
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME Class entity ID
+
+ :raises KeyError: If device does not exist
+ :raises DatabaseStateError: If the database is not enabled
+ """
+ self._database.delete(self._device_id, class_id, entity_id)
diff --git a/voltha/extensions/omci/tasks/get_mds_task.py b/voltha/extensions/omci/tasks/get_mds_task.py
index 3807f7d..325fa1a 100644
--- a/voltha/extensions/omci/tasks/get_mds_task.py
+++ b/voltha/extensions/omci/tasks/get_mds_task.py
@@ -88,15 +88,20 @@
omci_msg = results.fields['omci_message'].fields
status = omci_msg['success_code']
- self.log.debug('ont-data-mds', status=status,
- mib_data_sync=omci_msg['data']['mib_data_sync']
- if 'data' in omci_msg and 'mib_data_sync' in omci_msg['data']
- else None)
+
+ # Note: Currently the data reported by the Scapy decode is 16-bits since we need
+ # the data field that large in order to support MIB and Alarm Upload Next
+ # commands. Select only the first 8-bits since that is the size of the MIB
+ # Data Sync attribute
+ mds = (omci_msg['data']['mib_data_sync'] >> 8) & 0xFF \
+ if 'data' in omci_msg and 'mib_data_sync' in omci_msg['data'] else -1
+
+ self.log.debug('ont-data-mds', status=status, mib_data_sync=mds)
assert status == RC.Success, 'Unexpected Response Status: {}'.format(status)
# Successful if here
- self.deferred.callback(omci_msg['data']['mib_data_sync'])
+ self.deferred.callback(mds)
except TimeoutError as e:
self.log.warn('get-mds-timeout', e=e)
diff --git a/voltha/extensions/omci/tasks/mib_reconcile_task.py b/voltha/extensions/omci/tasks/mib_reconcile_task.py
new file mode 100644
index 0000000..721b225
--- /dev/null
+++ b/voltha/extensions/omci/tasks/mib_reconcile_task.py
@@ -0,0 +1,678 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from common.utils.asleep import asleep
+from voltha.extensions.omci.tasks.task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure, returnValue, TimeoutError
+from voltha.extensions.omci.omci_defs import *
+from voltha.extensions.omci.omci_me import OntDataFrame
+from voltha.extensions.omci.omci_frame import OmciFrame, OmciDelete, OmciCreate, OmciSet
+from voltha.extensions.omci.database.mib_db_api import ATTRIBUTES_KEY
+
+OP = EntityOperations
+RC = ReasonCodes
+AA = AttributeAccess
+
+
+class MibReconcileException(Exception):
+ pass
+
+
+class MibPartialSuccessException(Exception):
+ pass
+
+
+class MibReconcileTask(Task):
+ """
+ OpenOMCI MIB Reconcile Task
+
+ This task attempts to resynchronize the MIB. Note that it runs in exclusive
+ OMCI-CC mode so that it can query the current database/ONU to verify the
+ differences still exist before correcting them.
+ """
+ task_priority = 240
+ name = "MIB Reconcile Task"
+ max_sequential_db_updates = 5 # Be kind, rewind
+ db_update_pause = 0.05 # 50mS
+
+ def __init__(self, omci_agent, device_id, diffs):
+ """
+ Class initialization
+
+ :param omci_agent: (OpenOMCIAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param diffs: (dict) Dictionary of what was found to be invalid
+ """
+ super(MibReconcileTask, self).__init__(MibReconcileTask.name,
+ omci_agent,
+ device_id,
+ priority=MibReconcileTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._diffs = diffs
+ self._device = None
+ self._sync_sm = None
+ self._db_updates = 0 # For tracking sequential blocking consul/etcd updates
+
+ def cancel_deferred(self):
+ super(MibReconcileTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Reconcile task
+ """
+ super(MibReconcileTask, self).start()
+
+ self._device = self.omci_agent.get_device(self.device_id)
+
+ if self._device is None:
+ e = MibReconcileException('Device {} no longer exists'.format(self.device_id))
+ self.deferred.errback(failure.Failure(e))
+ return
+
+ self._sync_sm = self._device.mib_synchronizer
+
+ if self._device is None:
+ e = MibReconcileException('Device {} MIB State machine no longer exists'.format(self.device_id))
+ self.deferred.errback(failure.Failure(e))
+ return
+
+ self._local_deferred = reactor.callLater(0, self.perform_mib_reconcile)
+
+ def stop(self):
+ """
+ Shutdown MIB Reconcile task
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ self._device = None
+ super(MibReconcileTask, self).stop()
+
+ @inlineCallbacks
+ def perform_mib_reconcile(self):
+ """
+ Perform the MIB Reconciliation sequence.
+
+ The sequence to reconcile will be to clean up ONU only MEs, followed by
+ OLT/OpenOMCI-only MEs, and then finally correct common MEs with differing
+ attributes.
+ """
+ self.log.debug('perform-mib-reconcile')
+
+ try:
+ successes = 0
+ failures = 0
+
+ if self._diffs['onu-only'] is not None and len(self._diffs['onu-only']):
+ results = yield self.fix_onu_only(self._diffs['onu-only'],
+ self._diffs['onu-db'])
+ self.log.debug('onu-only-results', good=results[0], bad=results[1])
+ successes += results[0]
+ failures += results[1]
+
+ if self._diffs['olt-only'] is not None and len(self._diffs['olt-only']):
+ results = yield self.fix_olt_only(self._diffs['olt-only'],
+ self._diffs['onu-db'],
+ self._diffs['olt-db'])
+ self.log.debug('olt-only-results', good=results[0], bad=results[1])
+ successes += results[0]
+ failures += results[1]
+
+ if self._diffs['attributes'] is not None and len(self._diffs['attributes']):
+ results = yield self.fix_attributes_only(self._diffs['attributes'],
+ self._diffs['onu-db'],
+ self._diffs['olt-db'])
+ self.log.debug('attributes-results', good=results[0], bad=results[1])
+ successes += results[0]
+ failures += results[1]
+
+ # Success? Update MIB-data-sync
+ if failures == 0:
+ results = yield self.update_mib_data_sync()
+ successes += results[0]
+ failures += results[1]
+
+ # Send back final status
+ if failures > 0:
+ msg = '{} Successful updates, {} failures'.format(successes, failure)
+ error = MibPartialSuccessException(msg) if successes \
+ else MibReconcileException(msg)
+ self.deferred.errback(failure.Failure(error))
+ else:
+ self.deferred.callback('{} Successful updates'.format(successes))
+
+ except Exception as e:
+ if not self.deferred.called:
+ self.log.exception('reconcile', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def fix_onu_only(self, onu, onu_db):
+ """
+ Fix ME's that were only found on the ONU. For ONU only MEs there are
+ the following things that will be checked.
+
+ o ME's that do not have an OpenOMCI class decoder. These are stored
+ as binary blobs in the MIB database. Since we do not ever set them
+ (since no encoder as well), just store them in the OLT/OpenOMCI MIB
+ Database.
+
+ o For ME's that are created by the ONU (no create/delete access), the
+ MEs 'may' be due to a firmware upgrade and reboot or in response to
+ an OLT creating another ME entity and then creating this ME. Place
+ these 'new' into the database.
+
+ o For ME's that are created by the OLT/OpenOMCI, delete them from the
+ ONU
+
+ :param onu: (list(int,int)) List of tuples where (class_id, inst_id)
+ :param onu_db: (dict) ONU Database snapshot at time of audit
+
+ :return: (int, int) successes, failures
+ """
+ successes = 0
+ failures = 0
+ me_map = self._device.me_map
+
+ ####################################################################
+ # First the undecodables and onu-created (treated the same)
+ undecodable = self._undecodable(onu, me_map)
+ onu_created = self._onu_created(onu, me_map)
+
+ if len(undecodable) or len(onu_created):
+ results = yield self.fix_onu_only_save_to_db(undecodable, onu_created, onu_db)
+ successes += results[0]
+ failures += results[1]
+
+ ####################################################################
+ # Last the OLT created values, resend these to the ONU
+
+ olt_created = self._olt_created(onu, me_map)
+ if len(olt_created):
+ results = yield self.fix_onu_only_remove_from_onu(olt_created)
+ successes += results[0]
+ failures += results[1]
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_onu_only_save_to_db(self, undecodable, onu_created, onu_db):
+ """ In ONU database and needs to be saved to OLT/OpenOMCI database """
+ successes = 0
+ failures = 0
+
+ for cid, eid in undecodable + onu_created:
+ if self.deferred.called: # Check if task canceled
+ break
+ try:
+ # If in current MIB, had an audit issue or other MIB operation
+ # put it into the database, declare it a failure so we audit again
+ olt_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)
+
+ if olt_entry is not None and len(olt_entry):
+ self.log.debug('onu-only-in-current', cid=cid, eid=eid)
+ failures += 1 # Mark as failure so we audit again
+
+ elif cid not in onu_db:
+ self.log.warn('onu-only-not-in-audit', cid=cid, eid=eid)
+ failures += 1
+
+ else:
+ entry = onu_db[cid][eid]
+ self.strobe_watchdog()
+ self._sync_sm.mib_set(cid, eid, entry[ATTRIBUTES_KEY])
+ successes += 1
+
+ # If we do nothing but DB updates for ALOT of MEs, we are
+ # blocking other async twisted tasks, be kind and pause
+ self._db_updates += 1
+
+ if self._db_updates >= MibReconcileTask.max_sequential_db_updates:
+ self._db_updates = 0
+ self._local_deferred = yield asleep(MibReconcileTask.db_update_pause)
+
+ except Exception as e:
+ self.log.warn('onu-only-error', e=e)
+ failures += 1
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_onu_only_remove_from_onu(self, olt_created,):
+ """ On ONU, but no longer on OLT/OpenOMCI, delete it """
+ successes = 0
+ failures = 0
+
+ for cid, eid in olt_created:
+ if self.deferred.called: # Check if task canceled
+ break
+ try:
+ # If in current MIB, had an audit issue, declare it an error
+ # and next audit should clear it up
+ current_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)
+
+ if current_entry is not None and len(current_entry):
+ self.log.debug('onu-only-in-current', cid=cid, eid=eid)
+ failures += 1
+
+ else:
+ # Delete it from the ONU. Assume success
+ frame = OmciFrame(transaction_id=None,
+ message_type=OmciDelete.message_id,
+ omci_message=OmciDelete(entity_class=cid, entity_id=eid))
+
+ self._local_deferred = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(self._local_deferred, 'onu-attribute-update')
+ successes += 1
+ self._db_updates = 0
+
+ except Exception as e:
+ self.log.warn('olt-only-error', e=e)
+ failures += 1
+ self.strobe_watchdog()
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_olt_only(self, olt, onu_db, olt_db):
+ """
+ Fix ME's that were only found on the OLT. For OLT only MEs there are
+ the following things that will be checked.
+
+ o ME's that do not have an OpenOMCI class decoder. These are stored
+ as binary blobs in the MIB database. Since the OLT will never
+ create these (all are learned from ONU), it is assumed the ONU
+ has removed them for some purpose. So delete them from the OLT
+ database.
+
+ o For ME's that are created by the ONU (no create/delete access), the
+ MEs 'may' not be on the ONU because of a reboot or an OLT created
+ ME was deleted and the ONU gratuitously removes it. So delete them
+ from the OLT database.
+
+ o For ME's that are created by the OLT/OpenOMCI, delete them from the
+ ONU
+
+ :param olt: (list(int,int)) List of tuples where (class_id, inst_id)
+ :param onu_db: (dict) ONU Database snapshot at time of audit
+ :param olt_db: (dict) OLT Database snapshot at time of audit
+
+ :return: (int, int) successes, failures
+ """
+ successes = 0
+ failures = 0
+ me_map = self._device.me_map
+
+ ####################################################################
+ # First the undecodables and onu-created (treated the same) remove
+ # from OpenOMCI database
+ undecodable = self._undecodable(olt, me_map)
+ onu_created = self._onu_created(olt, me_map)
+
+ if len(undecodable) or len(onu_created):
+ good, bad = self.fix_olt_only_remove_from_db(undecodable, onu_created)
+ successes += good
+ failures += bad
+
+ ####################################################################
+ # Last the OLT created
+
+ olt_created = self._olt_created(olt, me_map)
+ if len(olt_created):
+ results = yield self.fix_olt_only_create_on_onu(olt_created, me_map)
+ successes += results[0]
+ failures += results[1]
+
+ returnValue((successes, failures))
+
+ def fix_olt_only_remove_from_db(self, undecodable, onu_created):
+ """ On OLT, but not on ONU and are ONU created, delete from OLT/OpenOMCI DB """
+ successes = 0
+ failures = 0
+
+ for cid, eid in undecodable + onu_created:
+ if self.deferred.called: # Check if task canceled
+ break
+ try:
+ # Delete it. If already deleted (KeyError), then that is okay
+ self._sync_sm.mib_delete(cid, eid)
+ self.strobe_watchdog()
+
+ except KeyError:
+ successes += 1 # Not found in DB anymore, assume success
+
+ except Exception as e:
+ self.log.warn('olt-only-db-error', cid=cid, eid=eid, e=e)
+ failures += 1
+
+ return successes, failures
+
+ @inlineCallbacks
+ def fix_olt_only_create_on_onu(self, olt_created, me_map):
+ """ Found on OLT and created by OLT, so create on ONU"""
+ successes = 0
+ failures = 0
+
+ for cid, eid in olt_created:
+ if self.deferred.called: # Check if task canceled
+ break
+
+ try:
+ # Get current entry, use it if found
+ olt_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)
+ me_entry = me_map[cid]
+
+ if olt_entry is None or len(olt_entry) == 0:
+ successes += 1 # Deleted before task got to run
+ else:
+ # Create it in the ONU. Only set-by-create attributes allowed
+ sbc_data = {k: v for k, v in olt_entry[ATTRIBUTES_KEY].items()
+ if AA.SetByCreate in
+ next((attr.access for attr in me_entry.attributes
+ if attr.field.name == k), set())}
+
+ frame = OmciFrame(transaction_id=None,
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(entity_class=cid,
+ entity_id=eid,
+ data=sbc_data))
+
+ self._local_deferred = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(self._local_deferred, 'olt-create-sbc')
+ successes += 1
+ self._db_updates = 0
+
+ # Try any writeable attributes now (but not set-by-create)
+ writeable_data = {k: v for k, v in olt_entry[ATTRIBUTES_KEY].items()
+ if AA.Writable in
+ next((attr.access for attr in me_entry.attributes
+ if attr.field.name == k), set())
+ and AA.SetByCreate not in
+ next((attr.access for attr in me_entry.attributes
+ if attr.field.name == k), set())}
+
+ if len(writeable_data):
+ attributes_mask = me_entry.mask_for(*writeable_data.keys())
+ frame = OmciFrame(transaction_id=None,
+ message_type=OmciSet.message_id,
+ omci_message=OmciSet(entity_class=cid,
+ entity_id=eid,
+ attributes_mask=attributes_mask,
+ data=writeable_data))
+
+ self._local_deferred = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(self._local_deferred, 'olt-set-writeable')
+ successes += 1
+
+ except Exception as e:
+ self.log.exception('olt-only-fix', e=e, cid=cid, eid=eid)
+ failures += 1
+ self.strobe_watchdog()
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_attributes_only(self, attrs, onu_db, olt_db):
+ """
+ Fix ME's that were found on both the ONU and OLT, but had differing
+ attribute values. There are several cases to handle here
+
+ o For ME's created on the ONU that have write attributes that
+ only exist in the ONU's database, copy these to the OLT/OpenOMCI
+ database
+
+ o For all other writeable attributes, the OLT value takes precedence
+
+ :param attrs: (list(int,int,str)) List of tuples where (class_id, inst_id, attribute)
+ points to the specific ME instance where attributes
+ are different
+ :param onu_db: (dict) ONU Database snapshot at time of audit
+ :param olt_db: (dict) OLT Database snapshot at time of audit
+
+ :return: (int, int) successes, failures
+ """
+ successes = 0
+ failures = 0
+ me_map = self._device.me_map
+
+ # Collect up attributes on a per CID/EID basis. This will result in
+ # the minimal number of operations to either the database of over
+ # the OMCI-CC to the ONU
+
+ attr_map = dict()
+ for cid, eid, attribute in attrs:
+ if (cid, eid) not in attr_map:
+ attr_map[(cid, eid)] = {attribute}
+ else:
+ attr_map[(cid, eid)].add(attribute)
+
+ for entity_pair, attributes in attr_map.items():
+ cid = entity_pair[0]
+ eid = entity_pair[1]
+
+ # Skip MEs we cannot encode/decode
+ if cid not in me_map:
+ self.log.warn('no-me-map-decoder', class_id=cid)
+ failures += 1
+ continue
+
+ if self.deferred.called: # Check if task canceled
+ break
+
+ # Build up MIB set commands and ONU Set (via OMCI) commands
+ # based of the attributes
+ me_entry = me_map[cid]
+ mib_data_to_save = dict()
+ onu_data_to_set = dict()
+ olt_attributes = olt_db[cid][eid][ATTRIBUTES_KEY]
+ onu_attributes = onu_db[cid][eid][ATTRIBUTES_KEY]
+
+ for attribute in attributes:
+ map_access = next((attr.access for attr in me_entry.attributes
+ if attr.field.name == attribute), set())
+ writeable = AA.Writable in map_access or AA.SetByCreate in map_access
+
+ # If only in ONU database snapshot, save it to OLT
+ if attribute in onu_attributes and attribute not in olt_attributes:
+ # On onu only
+ mib_data_to_save[attribute] = onu_attributes[attribute]
+
+ elif writeable:
+ # On olt only or in both. Either way OLT wins
+ onu_data_to_set[attribute] = olt_attributes[attribute]
+
+ # Now do the bulk operations For both, check to see if the target
+ # is still the same as when the audit was performed. If it is, do
+ # the commit. If not, mark as a failure so an expedited audit will
+ # occur and check again.
+
+ if len(mib_data_to_save):
+ results = yield self.fix_attributes_only_in_mib(cid, eid, mib_data_to_save)
+ successes += results[0]
+ failures += results[1]
+
+ if len(onu_data_to_set):
+ results = yield self.fix_attributes_only_on_olt(cid, eid, onu_data_to_set, olt_db, me_entry)
+ successes += results[0]
+ failures += results[1]
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_attributes_only_in_mib(self, cid, eid, mib_data):
+ successes = 0
+ failures = 0
+ try:
+ # Get current and verify same as during audit it is missing from our DB
+ attributes = mib_data.keys()
+ current_entry = self._device.query_mib(cid, eid, attributes)
+
+ if current_entry is not None and len(current_entry):
+ clashes = {k: v for k, v in current_entry[ATTRIBUTES_KEY].items()
+ if k in attributes and v != mib_data[k]}
+
+ if len(clashes):
+ raise ValueError('Existing DB entry for {}/{} attributes clash with audit data. Clash: {}'.
+ format(cid, eid, clashes))
+
+ self._sync_sm.mib_set(cid, eid, mib_data)
+ successes += len(mib_data)
+ self.strobe_watchdog()
+
+ # If we do nothing but DB updates for ALOT of MEs, we are
+ # blocking other async twisted tasks, be kind and yield
+ self._db_updates += 1
+ if self._db_updates >= MibReconcileTask.max_sequential_db_updates:
+ self._db_updates = 0
+ self._local_deferred = yield asleep(MibReconcileTask.db_update_pause)
+
+ except ValueError as e:
+ self.log.debug('attribute-changed', e)
+ failures += len(mib_data)
+
+ except Exception as e:
+ self.log.exception('attribute-only-fix-mib', e=e, cid=cid, eid=eid)
+ failures += len(mib_data)
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def fix_attributes_only_on_olt(self, cid, eid, onu_data, olt_db, me_entry):
+ successes = 0
+ failures = 0
+
+ try:
+ # On olt only or in both. Either way OLT wins, first verify that
+ # the OLT version is still the same data that we want to
+ # update on the ONU. Verify the data for the OLT is the same as
+ # at time of audit
+ olt_db_entries = {k: v for k, v in olt_db[cid][eid][ATTRIBUTES_KEY].items()
+ if k in onu_data.keys()}
+ current_entries = self._sync_sm.query_mib(class_id=cid, instance_id=eid,
+ attributes=onu_data.keys())
+
+ still_the_same = all(current_entries.get(k) == v for k, v in olt_db_entries.items())
+ if not still_the_same:
+ returnValue((0, len(onu_data))) # Wait for it to stabilize
+
+ # OLT data still matches, do the set operations now
+ # while len(onu_data):
+ attributes_mask = me_entry.mask_for(*onu_data.keys())
+ frame = OmciFrame(transaction_id=None,
+ message_type=OmciSet.message_id,
+ omci_message=OmciSet(entity_class=cid,
+ entity_id=eid,
+ attributes_mask=attributes_mask,
+ data=onu_data))
+
+ results = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(results, 'onu-attribute-update')
+ successes += len(onu_data)
+ self._db_updates = 0
+
+ except Exception as e:
+ self.log.exception('attribute-only-fix-onu', e=e, cid=cid, eid=eid)
+ failures += len(onu_data)
+ self.strobe_watchdog()
+
+ returnValue((successes, failures))
+
+ @inlineCallbacks
+ def update_mib_data_sync(self):
+ """
+ As the final step of MIB resynchronization, the OLT sets the MIB data sync
+ attribute of the ONU data ME to some suitable value of its own choice. It
+ then sets its own record of the same attribute to the same value,
+ incremented by 1, as explained in clause
+
+ :return: (int, int) success, failure counts
+ """
+ # Get MDS to set, do not user zero
+
+ new_mds_value = self._sync_sm.mib_data_sync
+ if new_mds_value == 0:
+ self._sync_sm.increment_mib_data_sync()
+ new_mds_value = self._sync_sm.mib_data_sync
+
+ # Update it. The set response will be sent on the OMCI-CC pub/sub bus
+ # and the MIB Synchronizer will update this MDS value in the database
+ # if successful.
+ try:
+ frame = OntDataFrame(mib_data_sync=new_mds_value).set()
+
+ results = yield self._device.omci_cc.send(frame)
+ self.check_status_and_state(results, 'ont-data-mbs-update')
+ returnValue((1, 0))
+
+ except TimeoutError as e:
+ self.log.debug('ont-data-send-timeout', e=e)
+ returnValue((0, 1))
+
+ except Exception as e:
+ self.log.exception('ont-data-send', e=e, mds=new_mds_value)
+ returnValue((0, 1))
+
+ def check_status_and_state(self, results, operation=''):
+ """
+ Check the results of an OMCI response. An exception is thrown
+ if the task was cancelled or an error was detected.
+
+ :param results: (OmciFrame) OMCI Response frame
+ :param operation: (str) what operation was being performed
+ :return: True if successful, False if the entity existed (already created)
+ """
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ error_mask = omci_msg.get('parameter_error_attributes_mask', 'n/a')
+ failed_mask = omci_msg.get('failed_attributes_mask', 'n/a')
+ unsupported_mask = omci_msg.get('unsupported_attributes_mask', 'n/a')
+ self.strobe_watchdog()
+
+ self.log.debug(operation, status=status, error_mask=error_mask,
+ failed_mask=failed_mask, unsupported_mask=unsupported_mask)
+
+ if status == RC.Success:
+ return True
+
+ elif status == RC.InstanceExists:
+ return False
+
+ msg = '{} failed with a status of {}, error_mask: {}, failed_mask: {}, unsupported_mask: {}'.\
+ format(operation, status, error_mask, failed_mask, unsupported_mask)
+
+ raise MibReconcileException(msg)
+
+ def _undecodable(self, cid_eid_list, me_map):
+ return [(cid, eid) for cid, eid in cid_eid_list if cid not in me_map]
+
+ def _onu_created(self, cid_eid_list, me_map):
+ return [(cid, eid) for cid, eid in cid_eid_list if cid in me_map and
+ OP.Create not in me_map[cid].mandatory_operations and
+ OP.Create not in me_map[cid].optional_operations]
+
+ def _olt_created(self, cid_eid_list, me_map):
+ return [(cid, eid) for cid, eid in cid_eid_list if cid in me_map and
+ OP.Create in me_map[cid].mandatory_operations or
+ OP.Create in me_map[cid].optional_operations]
diff --git a/voltha/extensions/omci/tasks/mib_resync_task.py b/voltha/extensions/omci/tasks/mib_resync_task.py
index a6b6892..52cf0b0 100644
--- a/voltha/extensions/omci/tasks/mib_resync_task.py
+++ b/voltha/extensions/omci/tasks/mib_resync_task.py
@@ -19,9 +19,10 @@
from common.utils.asleep import asleep
from voltha.extensions.omci.database.mib_db_dict import *
from voltha.extensions.omci.omci_entities import OntData
-from voltha.extensions.omci.omci_defs import AttributeAccess
-AA = AttributeAccess
+from voltha.extensions.omci.omci_defs import AttributeAccess, EntityOperations
+AA = AttributeAccess
+OP = EntityOperations
class MibCopyException(Exception):
pass
@@ -59,7 +60,7 @@
"""
Class initialization
- :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param omci_agent: (OpenOMCIAgent) OMCI Adapter agent
:param device_id: (str) ONU Device ID
"""
super(MibResyncTask, self).__init__(MibResyncTask.name,
@@ -144,14 +145,17 @@
self.deferred.errback(failure.Failure(e))
else:
# Compare the databases
+ active_copy = self._db_active.query(self.device_id)
on_olt_only, on_onu_only, attr_diffs = \
- self.compare_mibs(db_copy, self._db_active.query(self.device_id))
+ self.compare_mibs(db_copy, active_copy)
self.deferred.callback(
{
'on-olt-only': on_olt_only if len(on_olt_only) else None,
'on-onu-only': on_onu_only if len(on_onu_only) else None,
- 'attr-diffs': attr_diffs if len(attr_diffs) else None
+ 'attr-diffs': attr_diffs if len(attr_diffs) else None,
+ 'olt-db': db_copy,
+ 'onu-db': active_copy
})
except Exception as e:
@@ -253,10 +257,15 @@
# Filter out the 'mib_data_sync' from the database. We save that at
# the device level and do not want it showing up during a re-sync
# during data comparison
-
+ from binascii import hexlify
if class_id == OntData.class_id:
break
+ # The T&W ONU reports an ME with class ID 0 but only on audit. Perhaps others do as well.
+ if class_id == 0 or class_id > 0xFFFF:
+ self.log.warn('invalid-class-id', class_id=class_id)
+ break
+
attributes = {k: v for k, v in omci_msg['object_data'].items()}
# Save to the database
@@ -285,21 +294,34 @@
:param db_copy: (dict) OpenOMCI's copy of the database
:param db_active: (dict) ONU's database snapshot
- :return: (dict), (dict), dict() Differences
+ :return: (dict), (dict), (list) Differences
"""
self.strobe_watchdog()
+ me_map = self.omci_agent.get_device(self.device_id).me_map
# Class & Entities only in local copy (OpenOMCI)
- on_olt_only = self.get_lsh_only_dict(db_copy, db_active)
+ on_olt_temp = self.get_lhs_only_dict(db_copy, db_active)
+
+ # Remove any entries that are not reported during an upload (but could
+ # be in our database copy. Retain undecodable class IDs.
+ on_olt_only = [(cid, eid) for cid, eid in on_olt_temp
+ if cid not in me_map or not me_map[cid].hidden]
+
+ # Further reduce the on_olt_only MEs reported in an audit to not
+ # include missed MEs that are ONU created. Not all ONUs report MEs
+ # that are ONU created unless we are doing the initial MIB upload.
+ # Adtran does report them, T&W may not as well as a few others
+ on_olt_only = [(cid, eid) for cid, eid in on_olt_only if cid in me_map and
+ (OP.Create in me_map[cid].mandatory_operations or
+ OP.Create in me_map[cid].optional_operations)]
# Class & Entities only on remote (ONU)
- on_onu_only = self.get_lsh_only_dict(db_active, db_copy)
+ on_onu_only = self.get_lhs_only_dict(db_active, db_copy)
# Class & Entities on both local & remote, but one or more attributes
# are different on the ONU. This is the value that the local (OpenOMCI)
# thinks should be on the remote (ONU)
- me_map = self.omci_agent.get_device(self.device_id).me_map
attr_diffs = self.get_attribute_diffs(db_copy, db_active, me_map)
# TODO: Note that certain MEs are excluded from the MIB upload. In particular,
@@ -310,7 +332,7 @@
return on_olt_only, on_onu_only, attr_diffs
- def get_lsh_only_dict(self, lhs, rhs):
+ def get_lhs_only_dict(self, lhs, rhs):
"""
Compare two MIB database dictionaries and return the ME Class ID and
instances that are unique to the lhs dictionary. Both parameters
@@ -376,7 +398,7 @@
# Weed out read-only attributes. Attributes on onu may be read-only. These
# will only show up it the OpenOMCI (OLT-side) database if it changed and
# an AVC Notification was sourced by the ONU
- # TODO: These could be calculated once at ONU startup (device add)
+ # TODO: These class IDs could be calculated once at ONU startup (at device add)
if cls_id in me_map:
ro_attrs = {attr.field.name for attr in me_map[cls_id].attributes
if attr.access == ro_set}
diff --git a/voltha/extensions/omci/tasks/omci_get_request.py b/voltha/extensions/omci/tasks/omci_get_request.py
index 0db4e5d..0a5b1ec 100644
--- a/voltha/extensions/omci/tasks/omci_get_request.py
+++ b/voltha/extensions/omci/tasks/omci_get_request.py
@@ -15,7 +15,7 @@
#
from task import Task
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, failure, returnValue
+from twisted.internet.defer import failure, inlineCallbacks, TimeoutError, returnValue
from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
from voltha.extensions.omci.omci_me import MEFrame
from voltha.extensions.omci.omci_frame import OmciFrame
@@ -195,6 +195,9 @@
self.log.info('get-completed')
self.deferred.callback(self)
+ except TimeoutError as e:
+ self.deferred.errback(failure.Failure(e))
+
except Exception as e:
self.log.exception('perform-get', e=e, class_id=self._entity_class,
entity_id=self._entity_id, attributes=self._attributes)
@@ -246,6 +249,9 @@
results_omci['data'].update(get_omci['data'])
+ except TimeoutError:
+ self.log.debug('missing-timeout')
+
except Exception as e:
self.log.exception('missing-failure', e=e)
@@ -291,6 +297,9 @@
tmp_results.fields['omci_message'].fields['data'][attr] = \
results.fields['omci_message'].fields['data'][attr]
+ except TimeoutError as e:
+ self.log.debug('attr-timeout')
+
except Exception as e:
self.log.exception('attr-failure', e=e)