Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2018 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | from pyvoltha.common.utils.asleep import asleep |
| 18 | from pyvoltha.adapters.extensions.omci.tasks.task import Task |
| 19 | from twisted.internet import reactor |
| 20 | from twisted.internet.defer import inlineCallbacks, failure, returnValue, TimeoutError |
| 21 | from pyvoltha.adapters.extensions.omci.omci_defs import * |
| 22 | from pyvoltha.adapters.extensions.omci.omci_me import OntDataFrame |
| 23 | from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame, OmciDelete, OmciCreate, OmciSet |
Matt Jeanneret | d55eb15 | 2019-10-08 13:59:23 -0400 | [diff] [blame] | 24 | from pyvoltha.adapters.extensions.omci.omci_fields import OmciTableField |
Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 25 | from pyvoltha.adapters.extensions.omci.database.mib_db_api import ATTRIBUTES_KEY |
| 26 | |
# Short module-level aliases for the OMCI definition enumerations
# (presumably supplied by the wildcard import of omci_defs -- confirm).
OP = EntityOperations
RC = ReasonCodes
AA = AttributeAccess
| 30 | |
| 31 | |
class MibReconcileException(Exception):
    """Raised/reported when a MIB reconcile step or the whole task fails."""
| 34 | |
| 35 | |
class MibPartialSuccessException(Exception):
    """Reported when some reconcile updates succeeded but others failed."""
| 38 | |
| 39 | |
class MibReconcileTask(Task):
    """
    OpenOMCI MIB Reconcile Task

    This task attempts to resynchronize the MIB. Note that it runs in exclusive
    OMCI-CC mode so that it can query the current database/ONU to verify the
    differences still exist before correcting them.

    NOTE(review): __init__ below registers the task with exclusive=False,
    which does not match the 'exclusive OMCI-CC mode' statement above.
    Confirm which one is intended.
    """
    task_priority = 240
    name = "MIB Reconcile Task"
    max_sequential_db_updates = 5   # Be kind, rewind
    db_update_pause = 0.05          # 50mS

    def __init__(self, omci_agent, device_id, diffs):
        """
        Class initialization

        :param omci_agent: (OpenOMCIAgent) OMCI Adapter agent
        :param device_id: (str) ONU Device ID
        :param diffs: (dict) Dictionary of what was found to be invalid. The
                      keys read by this task are 'onu-only', 'olt-only',
                      'attributes', 'onu-db' and 'olt-db'.
        """
        super(MibReconcileTask, self).__init__(MibReconcileTask.name,
                                               omci_agent,
                                               device_id,
                                               priority=MibReconcileTask.task_priority,
                                               exclusive=False)
        self._local_deferred = None
        self._diffs = diffs
        self._device = None
        self._sync_sm = None
        self._db_updates = 0  # For tracking sequential blocking consul/etcd updates

    def cancel_deferred(self):
        """
        Run the base class cancellation and then cancel whatever is currently
        stored in ``_local_deferred`` (best effort).
        """
        super(MibReconcileTask, self).cancel_deferred()

        d, self._local_deferred = self._local_deferred, None
        try:
            if d is not None and not d.called:
                d.cancel()
        except Exception:
            # Best effort only. ``_local_deferred`` is also assigned the
            # *result* of several yields in this class, and such a value may
            # not provide 'called'/'cancel', so errors are ignored on purpose.
            pass

    def start(self):
        """
        Start MIB Reconcile task

        Looks up the ONU device entry and its MIB synchronizer. If either one
        is missing the task deferred is failed with a MibReconcileException,
        otherwise the reconcile sequence is scheduled on the reactor.
        """
        super(MibReconcileTask, self).start()

        self._device = self.omci_agent.get_device(self.device_id)

        if self._device is None:
            e = MibReconcileException('Device {} no longer exists'.format(self.device_id))
            self.deferred.errback(failure.Failure(e))
            return

        self._sync_sm = self._device.mib_synchronizer

        # Check the synchronizer that was just fetched (the device itself was
        # already validated above)
        if self._sync_sm is None:
            e = MibReconcileException('Device {} MIB State machine no longer exists'.format(self.device_id))
            self.deferred.errback(failure.Failure(e))
            return

        self._local_deferred = reactor.callLater(0, self.perform_mib_reconcile)

    def stop(self):
        """
        Shutdown MIB Reconcile task
        """
        self.log.debug('stopping')

        self.cancel_deferred()
        self._device = None
        super(MibReconcileTask, self).stop()

    @inlineCallbacks
    def perform_mib_reconcile(self):
        """
        Perform the MIB Reconciliation sequence.

        The sequence to reconcile will be to clean up ONU only MEs, followed by
        OLT/OpenOMCI-only MEs, and then finally correct common MEs with differing
        attributes.

        When no step reported a failure, the MIB data sync value is updated as
        the last step. The task deferred is fired with a summary string on
        full success, or failed with MibPartialSuccessException (some
        successes) / MibReconcileException (no successes) otherwise.
        """
        self.log.debug('perform-mib-reconcile')

        try:
            successes = 0
            failures = 0

            if self._diffs['onu-only'] is not None and len(self._diffs['onu-only']):
                results = yield self.fix_onu_only(self._diffs['onu-only'],
                                                  self._diffs['onu-db'])
                self.log.debug('onu-only-results', good=results[0], bad=results[1])
                successes += results[0]
                failures += results[1]

            if self._diffs['olt-only'] is not None and len(self._diffs['olt-only']):
                results = yield self.fix_olt_only(self._diffs['olt-only'],
                                                  self._diffs['onu-db'],
                                                  self._diffs['olt-db'])
                self.log.debug('olt-only-results', good=results[0], bad=results[1])
                successes += results[0]
                failures += results[1]

            if self._diffs['attributes'] is not None and len(self._diffs['attributes']):
                results = yield self.fix_attributes_only(self._diffs['attributes'],
                                                         self._diffs['onu-db'],
                                                         self._diffs['olt-db'])
                self.log.debug('attributes-results', good=results[0], bad=results[1])
                successes += results[0]
                failures += results[1]

            # Success? Update MIB-data-sync
            if failures == 0:
                results = yield self.update_mib_data_sync()
                successes += results[0]
                failures += results[1]

            # Send back final status
            if failures > 0:
                # Report the failure *count* (not the twisted 'failure' module)
                msg = '{} Successful updates, {} failures'.format(successes, failures)
                error = MibPartialSuccessException(msg) if successes \
                    else MibReconcileException(msg)
                self.deferred.errback(failure.Failure(error))
            else:
                self.deferred.callback('{} Successful updates'.format(successes))

        except Exception as e:
            # Only report if the task deferred has not already been fired
            if not self.deferred.called:
                self.log.exception('reconcile', e=e)
                self.deferred.errback(failure.Failure(e))

    @inlineCallbacks
    def fix_onu_only(self, onu, onu_db):
        """
        Fix ME's that were only found on the ONU. For ONU only MEs there are
        the following things that will be checked.

            o ME's that do not have an OpenOMCI class decoder. These are stored
              as binary blobs in the MIB database. Since we do not ever set them
              (since no encoder as well), just store them in the OLT/OpenOMCI MIB
              Database.

            o For ME's that are created by the ONU (no create/delete access), the
              MEs 'may' be due to a firmware upgrade and reboot or in response to
              an OLT creating another ME entity and then creating this ME.  Place
              these 'new' into the database.

            o For ME's that are created by the OLT/OpenOMCI, delete them from the
              ONU

        :param onu: (list(int,int)) List of tuples where (class_id, inst_id)
        :param onu_db: (dict) ONU Database snapshot at time of audit

        :return: (int, int) successes, failures
        """
        successes = 0
        failures = 0
        me_map = self._device.me_map

        ####################################################################
        # First the undecodables and onu-created (treated the same)
        undecodable = self._undecodable(onu, me_map)
        onu_created = self._onu_created(onu, me_map)

        if len(undecodable) or len(onu_created):
            results = yield self.fix_onu_only_save_to_db(undecodable, onu_created, onu_db)
            successes += results[0]
            failures += results[1]

        ####################################################################
        # Last the OLT created values, remove these from the ONU

        olt_created = self._olt_created(onu, me_map)
        if len(olt_created):
            results = yield self.fix_onu_only_remove_from_onu(olt_created)
            successes += results[0]
            failures += results[1]

        returnValue((successes, failures))

    @inlineCallbacks
    def fix_onu_only_save_to_db(self, undecodable, onu_created, onu_db):
        """
        In ONU database and needs to be saved to OLT/OpenOMCI database.

        Note that some, perhaps all, of these instances could be ONU create
        in response to the OLT creating some other ME instance. So treat
        the Database operation as a create.

        :param undecodable: (list(int,int)) (class_id, inst_id) with no decoder
        :param onu_created: (list(int,int)) (class_id, inst_id) created by the ONU
        :param onu_db: (dict) ONU Database snapshot at time of audit

        :return: (int, int) successes, failures
        """
        successes = 0
        failures = 0

        for cid, eid in undecodable + onu_created:
            if self.deferred.called:        # Check if task canceled
                break
            try:
                # If in current MIB, had an audit issue or other MIB operation
                # put it into the database, declare it a failure so we audit again
                try:
                    olt_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)

                except KeyError:        # Common for ONU created MEs during audit
                    olt_entry = None

                if olt_entry is not None and len(olt_entry):
                    self.log.debug('onu-only-in-current', cid=cid, eid=eid)
                    failures += 1     # Mark as failure so we audit again

                elif cid not in onu_db:
                    self.log.warn('onu-only-not-in-audit', cid=cid, eid=eid)
                    failures += 1

                else:
                    entry = onu_db[cid][eid]
                    self.strobe_watchdog()
                    self._sync_sm.mib_set(cid, eid, entry[ATTRIBUTES_KEY])
                    successes += 1

                    # If we do nothing but DB updates for ALOT of MEs, we are
                    # blocking other async twisted tasks, be kind and pause
                    self._db_updates += 1

                    if self._db_updates >= MibReconcileTask.max_sequential_db_updates:
                        self._db_updates = 0
                        # NOTE(review): this stores the result of the yield,
                        # not the sleeping deferred itself
                        self._local_deferred = yield asleep(MibReconcileTask.db_update_pause)

            except Exception as e:
                self.log.warn('onu-only-error', e=e)
                failures += 1

        returnValue((successes, failures))

    @inlineCallbacks
    def fix_onu_only_remove_from_onu(self, olt_created,):
        """
        On ONU, but no longer on OLT/OpenOMCI, delete it

        :param olt_created: (list(int,int)) (class_id, inst_id) of OLT-creatable MEs

        :return: (int, int) successes, failures
        """
        successes = 0
        failures = 0

        for cid, eid in olt_created:
            if self.deferred.called:        # Check if task canceled
                break
            try:
                # If in current MIB, had an audit issue, declare it an error
                # and next audit should clear it up
                try:
                    current_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)

                except KeyError:
                    # Expected if no other entities with same class present in MIB
                    current_entry = None

                if current_entry is not None and len(current_entry):
                    self.log.debug('onu-only-in-current', cid=cid, eid=eid)
                    failures += 1

                else:
                    # Delete it from the ONU. Assume success
                    frame = OmciFrame(transaction_id=None,
                                      message_type=OmciDelete.message_id,
                                      omci_message=OmciDelete(entity_class=cid, entity_id=eid))

                    self._local_deferred = yield self._device.omci_cc.send(frame)
                    self.check_status_and_state(self._local_deferred, 'onu-attribute-update')
                    successes += 1
                    self._db_updates = 0

            except Exception as e:
                self.log.warn('olt-only-error', e=e)
                failures += 1
                self.strobe_watchdog()

        returnValue((successes, failures))

    @inlineCallbacks
    def fix_olt_only(self, olt, onu_db, olt_db):
        """
        Fix ME's that were only found on the OLT. For OLT only MEs there are
        the following things that will be checked.

            o ME's that do not have an OpenOMCI class decoder. These are stored
              as binary blobs in the MIB database. Since the OLT will never
              create these (all are learned from ONU), it is assumed the ONU
              has removed them for some purpose. So delete them from the OLT
              database.

            o For ME's that are created by the ONU (no create/delete access), the
              MEs 'may' not be on the ONU because of a reboot or an OLT created
              ME was deleted and the ONU gratuitously removes it.  So delete them
              from the OLT database.

            o For ME's that are created by the OLT/OpenOMCI, create them on the
              ONU

        :param olt: (list(int,int)) List of tuples where (class_id, inst_id)
        :param onu_db: (dict) ONU Database snapshot at time of audit (not used here)
        :param olt_db: (dict) OLT Database snapshot at time of audit (not used here)

        :return: (int, int) successes, failures
        """
        successes = 0
        failures = 0
        me_map = self._device.me_map

        ####################################################################
        # First the undecodables and onu-created (treated the same) remove
        # from OpenOMCI database
        undecodable = self._undecodable(olt, me_map)
        onu_created = self._onu_created(olt, me_map)

        if len(undecodable) or len(onu_created):
            # Synchronous helper (database only), so no yield here
            good, bad = self.fix_olt_only_remove_from_db(undecodable, onu_created)
            successes += good
            failures += bad

        ####################################################################
        # Last the OLT created

        olt_created = self._olt_created(olt, me_map)
        if len(olt_created):
            results = yield self.fix_olt_only_create_on_onu(olt_created, me_map)
            successes += results[0]
            failures += results[1]

        returnValue((successes, failures))

    def fix_olt_only_remove_from_db(self, undecodable, onu_created):
        """
        On OLT, but not on ONU and are ONU created, delete from OLT/OpenOMCI DB

        :param undecodable: (list(int,int)) (class_id, inst_id) with no decoder
        :param onu_created: (list(int,int)) (class_id, inst_id) created by the ONU

        :return: (int, int) successes, failures
        """
        successes = 0
        failures = 0

        for cid, eid in undecodable + onu_created:
            if self.deferred.called:        # Check if task canceled
                break
            try:
                # Delete it. If already deleted (KeyError), then that is okay
                self._sync_sm.mib_delete(cid, eid)
                self.strobe_watchdog()
                successes += 1      # Deleted, count it

            except KeyError:
                successes += 1      # Not found in DB anymore, assume success

            except Exception as e:
                self.log.warn('olt-only-db-error', cid=cid, eid=eid, e=e)
                failures += 1

        return successes, failures

    @inlineCallbacks
    def fix_olt_only_create_on_onu(self, olt_created, me_map):
        """
        Found on OLT and created by OLT, so create on ONU

        For each instance the set-by-create attributes are sent in an OMCI
        Create, followed by one OMCI Set for the remaining writeable
        non-table attributes and one OMCI Set per row of each writeable
        table attribute.

        :param olt_created: (list(int,int)) (class_id, inst_id) of OLT-creatable MEs
        :param me_map: (dict) class_id -> ME class definition

        :return: (int, int) successes, failures
        """
        successes = 0
        failures = 0

        for cid, eid in olt_created:
            if self.deferred.called:        # Check if task canceled
                break

            try:
                # Get current entry, use it if found. A KeyError is handled
                # like the other fix_* helpers: nothing stored for it anymore
                try:
                    olt_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)

                except KeyError:
                    olt_entry = None

                me_entry = me_map[cid]

                if olt_entry is None or len(olt_entry) == 0:
                    successes += 1      # Deleted before task got to run
                else:
                    # Create it in the ONU. Only set-by-create attributes allowed
                    sbc_data = {k: v for k, v in olt_entry[ATTRIBUTES_KEY].items()
                                if AA.SetByCreate in
                                next((attr.access for attr in me_entry.attributes
                                      if attr.field.name == k), set())}

                    frame = OmciFrame(transaction_id=None,
                                      message_type=OmciCreate.message_id,
                                      omci_message=OmciCreate(entity_class=cid,
                                                              entity_id=eid,
                                                              data=sbc_data))

                    self._local_deferred = yield self._device.omci_cc.send(frame)
                    self.check_status_and_state(self._local_deferred, 'olt-create-sbc')
                    successes += 1
                    self._db_updates = 0

                    # Try any writeable attributes now (but not set-by-create)
                    writeable_data = dict()
                    table_data = dict()
                    for key, value in olt_entry[ATTRIBUTES_KEY].items():
                        for attr in me_entry.attributes:
                            if AA.SetByCreate in attr.access:
                                continue
                            if AA.Writable in attr.access:
                                if attr.field.name == key:
                                    # Table attributes are sent row by row below
                                    if isinstance(attr.field, OmciTableField):
                                        table_data[key] = value
                                    else:
                                        writeable_data[key] = value

                    if len(writeable_data):
                        attributes_mask = me_entry.mask_for(*writeable_data.keys())
                        frame = OmciFrame(transaction_id=None,
                                          message_type=OmciSet.message_id,
                                          omci_message=OmciSet(entity_class=cid,
                                                               entity_id=eid,
                                                               attributes_mask=attributes_mask,
                                                               data=writeable_data))
                        self._local_deferred = yield self._device.omci_cc.send(frame)
                        self.check_status_and_state(self._local_deferred, 'olt-set-writeable')
                        successes += 1

                    for key, value in table_data.items():
                        for row in value:
                            setvalue = {key: row}
                            attributes_mask = me_entry.mask_for(*setvalue.keys())
                            frame = OmciFrame(transaction_id=None,
                                              message_type=OmciSet.message_id,
                                              omci_message=OmciSet(entity_class=cid,
                                                                   entity_id=eid,
                                                                   attributes_mask=attributes_mask,
                                                                   data=setvalue))
                            self._local_deferred = yield self._device.omci_cc.send(frame)
                            self.check_status_and_state(self._local_deferred, 'olt-set-table')
                            successes += 1

            except Exception as e:
                self.log.exception('olt-only-fix', e=e, cid=cid, eid=eid)
                failures += 1
                self.strobe_watchdog()

        returnValue((successes, failures))

    @inlineCallbacks
    def fix_attributes_only(self, attrs, onu_db, olt_db):
        """
        Fix ME's that were found on both the ONU and OLT, but had differing
        attribute values.  There are several cases to handle here

          o For ME's created on the ONU that have write attributes that
            only exist in the ONU's database, copy these to the OLT/OpenOMCI
            database

          o For all other writeable attributes, the OLT value takes precedence

        :param attrs: (list(int,int,str)) List of tuples where (class_id, inst_id, attribute)
                                          points to the specific ME instance where attributes
                                          are different
        :param onu_db: (dict) ONU Database snapshot at time of audit
        :param olt_db: (dict) OLT Database snapshot at time of audit

        :return: (int, int) successes, failures
        """
        successes = 0
        failures = 0
        me_map = self._device.me_map

        # Collect up attributes on a per CID/EID basis.  This will result in
        # the minimal number of operations to either the database or over
        # the OMCI-CC to the ONU

        attr_map = dict()
        for cid, eid, attribute in attrs:
            if (cid, eid) not in attr_map:
                attr_map[(cid, eid)] = {attribute}
            else:
                attr_map[(cid, eid)].add(attribute)

        for entity_pair, attributes in attr_map.items():
            cid = entity_pair[0]
            eid = entity_pair[1]

            # Skip MEs we cannot encode/decode
            if cid not in me_map:
                self.log.warn('no-me-map-decoder', class_id=cid)
                failures += 1
                continue

            if self.deferred.called:        # Check if task canceled
                break

            # Build up MIB set commands and ONU Set (via OMCI) commands
            # based of the attributes
            me_entry = me_map[cid]
            mib_data_to_save = dict()
            onu_data_to_set = dict()
            olt_attributes = olt_db[cid][eid][ATTRIBUTES_KEY]
            onu_attributes = onu_db[cid][eid][ATTRIBUTES_KEY]

            for attribute in attributes:
                map_access = next((attr.access for attr in me_entry.attributes
                                   if attr.field.name == attribute), set())
                writeable = AA.Writable in map_access or AA.SetByCreate in map_access

                # If only in ONU database snapshot, save it to OLT
                if attribute in onu_attributes and attribute not in olt_attributes:
                    # On onu only
                    mib_data_to_save[attribute] = onu_attributes[attribute]

                elif writeable:
                    # On olt only or in both. Either way OLT wins
                    onu_data_to_set[attribute] = olt_attributes[attribute]

            # Now do the bulk operations For both, check to see if the target
            # is still the same as when the audit was performed. If it is, do
            # the commit. If not, mark as a failure so an expedited audit will
            # occur and check again.

            if len(mib_data_to_save):
                results = yield self.fix_attributes_only_in_mib(cid, eid, mib_data_to_save)
                successes += results[0]
                failures += results[1]

            if len(onu_data_to_set):
                results = yield self.fix_attributes_only_on_olt(cid, eid, onu_data_to_set, olt_db, me_entry)
                successes += results[0]
                failures += results[1]

        returnValue((successes, failures))

    @inlineCallbacks
    def fix_attributes_only_in_mib(self, cid, eid, mib_data):
        """
        Save attributes that were only present in the ONU snapshot into the
        OLT/OpenOMCI MIB database, unless the database now holds a different
        value for any of them.

        :param cid: (int) ME class ID
        :param eid: (int) ME instance (entity) ID
        :param mib_data: (dict) attribute name -> value to store

        :return: (int, int) successes, failures (counted per attribute)
        """
        successes = 0
        failures = 0
        try:
            # Get current and verify same as during audit it is missing from our DB
            attributes = mib_data.keys()
            current_entry = self._device.query_mib(cid, eid, attributes)

            if current_entry is not None and len(current_entry):
                clashes = {k: v for k, v in current_entry.items()
                           if k in attributes and v != mib_data[k]}

                if len(clashes):
                    raise ValueError('Existing DB entry for {}/{} attributes clash with audit data. Clash: {}'.
                                     format(cid, eid, clashes))

            self._sync_sm.mib_set(cid, eid, mib_data)
            successes += len(mib_data)
            self.strobe_watchdog()

            # If we do nothing but DB updates for ALOT of MEs, we are
            # blocking other async twisted tasks, be kind and yield
            self._db_updates += 1
            if self._db_updates >= MibReconcileTask.max_sequential_db_updates:
                self._db_updates = 0
                self._local_deferred = yield asleep(MibReconcileTask.db_update_pause)

        except ValueError as e:
            # Pass the exception as a keyword like every other log call here
            self.log.debug('attribute-changed', e=e)
            failures += len(mib_data)

        except Exception as e:
            self.log.exception('attribute-only-fix-mib', e=e, cid=cid, eid=eid)
            failures += len(mib_data)

        returnValue((successes, failures))

    @inlineCallbacks
    def fix_attributes_only_on_olt(self, cid, eid, onu_data, olt_db, me_entry):
        """
        Push the OLT's attribute values to the ONU with a single OMCI Set,
        provided the current OLT/OpenOMCI values still match the audit snapshot.

        :param cid: (int) ME class ID
        :param eid: (int) ME instance (entity) ID
        :param onu_data: (dict) attribute name -> value to set on the ONU
        :param olt_db: (dict) OLT Database snapshot at time of audit
        :param me_entry: ME class definition for cid (used to build the mask)

        :return: (int, int) successes, failures (counted per attribute)
        """
        successes = 0
        failures = 0

        try:
            # On olt only or in both. Either way OLT wins, first verify that
            # the OLT version is still the same data that we want to
            # update on the ONU. Verify the data for the OLT is the same as
            # at time of audit
            olt_db_entries = {k: v for k, v in olt_db[cid][eid][ATTRIBUTES_KEY].items()
                              if k in onu_data.keys()}
            current_entries = self._sync_sm.query_mib(class_id=cid, instance_id=eid,
                                                      attributes=onu_data.keys())

            still_the_same = all(current_entries.get(k) == v for k, v in olt_db_entries.items())
            if not still_the_same:
                returnValue((0, len(onu_data)))    # Wait for it to stabilize

            # OLT data still matches, do the set operations now
            # while len(onu_data):
            attributes_mask = me_entry.mask_for(*onu_data.keys())
            frame = OmciFrame(transaction_id=None,
                              message_type=OmciSet.message_id,
                              omci_message=OmciSet(entity_class=cid,
                                                   entity_id=eid,
                                                   attributes_mask=attributes_mask,
                                                   data=onu_data))

            results = yield self._device.omci_cc.send(frame)
            self.check_status_and_state(results, 'onu-attribute-update')
            successes += len(onu_data)
            self._db_updates = 0

        except Exception as e:
            self.log.exception('attribute-only-fix-onu', e=e, cid=cid, eid=eid)
            failures += len(onu_data)
            self.strobe_watchdog()

        returnValue((successes, failures))

    @inlineCallbacks
    def _get_current_mds(self):
        """
        Debug helper: request the ONT Data ME from the ONU and extract the
        MIB data sync value from the response.

        :return: (int) value derived from 'mib_data_sync', or -1 if the
                 request was not successful or the field is missing
        """
        self.strobe_watchdog()
        results = yield self._device.omci_cc.send(OntDataFrame().get())

        omci_msg = results.fields['omci_message'].fields
        status = omci_msg['success_code']
        mds = (omci_msg['data']['mib_data_sync'] >> 8) & 0xFF \
            if status == 0 and 'data' in omci_msg and 'mib_data_sync' in omci_msg['data'] else -1
        returnValue(mds)

    @inlineCallbacks
    def update_mib_data_sync(self):
        """
        As the final step of MIB resynchronization, the OLT sets the MIB data sync
        attribute of the ONU data ME to some suitable value of its own choice. It
        then sets its own record of the same attribute to the same value,
        incremented by 1.

        In this implementation the synchronizer's local value is incremented
        first and the resulting value is what gets sent to the ONU.

        :return: (int, int) success, failure counts
        """
        # Get MDS to set
        self._sync_sm.increment_mib_data_sync()
        new_mds_value = self._sync_sm.mib_data_sync

        # Update it.  The set response will be sent on the OMCI-CC pub/sub bus
        # and the MIB Synchronizer will update this MDS value in the database
        # if successful.
        try:
            # previous_mds = yield self._get_current_mds()

            frame = OntDataFrame(mib_data_sync=new_mds_value).set()

            results = yield self._device.omci_cc.send(frame)
            self.check_status_and_state(results, 'ont-data-mbs-update')

            #########################################
            # Debug.  Verify new MDS value was received. Should be 1 greater
            #         than what was sent
            # new_mds = yield self._get_current_mds()
            # self.log.info('mds-update', previous=previous_mds, new=new_mds_value, now=new_mds)
            # Done
            returnValue((1, 0))

        except TimeoutError as e:
            self.log.debug('ont-data-send-timeout', e=e)
            returnValue((0, 1))

        except Exception as e:
            self.log.exception('ont-data-send', e=e, mds=new_mds_value)
            returnValue((0, 1))

    def check_status_and_state(self, results, operation=''):
        """
        Check the results of an OMCI response.  An exception is thrown
        if an error status was returned.

        :param results: (OmciFrame) OMCI Response frame
        :param operation: (str) what operation was being performed
        :return: True if successful, False if the entity existed (already created)
        :raises MibReconcileException: for any other status code
        """
        omci_msg = results.fields['omci_message'].fields
        status = omci_msg['success_code']
        error_mask = omci_msg.get('parameter_error_attributes_mask', 'n/a')
        failed_mask = omci_msg.get('failed_attributes_mask', 'n/a')
        unsupported_mask = omci_msg.get('unsupported_attributes_mask', 'n/a')
        self.strobe_watchdog()

        self.log.debug(operation, status=status, error_mask=error_mask,
                       failed_mask=failed_mask, unsupported_mask=unsupported_mask)

        if status == RC.Success:
            return True

        elif status == RC.InstanceExists:
            return False

        msg = '{} failed with a status of {}, error_mask: {}, failed_mask: {}, unsupported_mask: {}'.\
            format(operation, status, error_mask, failed_mask, unsupported_mask)

        raise MibReconcileException(msg)

    def _undecodable(self, cid_eid_list, me_map):
        """ Return the (class_id, inst_id) pairs whose class is not in me_map """
        return [(cid, eid) for cid, eid in cid_eid_list if cid not in me_map]

    def _onu_created(self, cid_eid_list, me_map):
        """
        Return the (class_id, inst_id) pairs whose class is in me_map and does
        not list Create as a mandatory or optional operation
        """
        return [(cid, eid) for cid, eid in cid_eid_list if cid in me_map and
                (OP.Create not in me_map[cid].mandatory_operations and
                 OP.Create not in me_map[cid].optional_operations)]

    def _olt_created(self, cid_eid_list, me_map):
        """
        Return the (class_id, inst_id) pairs whose class is in me_map and
        lists Create as a mandatory or optional operation
        """
        return [(cid, eid) for cid, eid in cid_eid_list if cid in me_map and
                (OP.Create in me_map[cid].mandatory_operations or
                 OP.Create in me_map[cid].optional_operations)]