Chip Boling | 32aab30 | 2019-01-23 10:50:18 -0600 | [diff] [blame^] | 1 | # |
| 2 | # Copyright 2018 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | from common.utils.asleep import asleep |
| 18 | from voltha.extensions.omci.tasks.task import Task |
| 19 | from twisted.internet import reactor |
| 20 | from twisted.internet.defer import inlineCallbacks, failure, returnValue, TimeoutError |
| 21 | from voltha.extensions.omci.omci_defs import * |
| 22 | from voltha.extensions.omci.omci_me import OntDataFrame |
| 23 | from voltha.extensions.omci.omci_frame import OmciFrame, OmciDelete, OmciCreate, OmciSet |
| 24 | from voltha.extensions.omci.database.mib_db_api import ATTRIBUTES_KEY |
| 25 | |
# Short module-level aliases. The aliased names are not imported explicitly
# in this file, so they are presumably provided by the star import of
# voltha.extensions.omci.omci_defs -- verify if that import is ever narrowed.
OP = EntityOperations   # Tested against an ME's mandatory/optional operations
RC = ReasonCodes        # Compared with the 'success_code' of an OMCI response
AA = AttributeAccess    # Tested against an ME attribute's access set
| 29 | |
| 30 | |
class MibReconcileException(Exception):
    """Signals that a MIB reconcile step, or the task as a whole, failed."""
| 33 | |
| 34 | |
class MibPartialSuccessException(Exception):
    """Signals that some reconcile updates succeeded while others failed."""
| 37 | |
| 38 | |
class MibReconcileTask(Task):
    """
    OpenOMCI MIB Reconcile Task

    This task attempts to resynchronize the MIB from a set of differences
    supplied at construction. Before correcting a difference, the fix-up
    steps re-query the current database so that they only act when the
    difference still exists.

    NOTE(review): this docstring used to state that the task runs in
    exclusive OMCI-CC mode, but __init__ passes exclusive=False to the base
    Task -- confirm which of the two is intended.
    """
    task_priority = 240                 # Priority handed to the base Task
    name = "MIB Reconcile Task"         # Name handed to the base Task
    max_sequential_db_updates = 5       # Be kind, rewind: DB updates allowed before pausing
    db_update_pause = 0.05              # 50mS pause once that limit is reached
| 51 | |
def __init__(self, omci_agent, device_id, diffs):
    """
    Set up the reconcile task and its bookkeeping state.

    :param omci_agent: (OpenOMCIAgent) OMCI Adapter agent
    :param device_id: (str) ONU Device ID
    :param diffs: (dict) Dictionary of what was found to be invalid
    """
    task_cls = MibReconcileTask
    super(task_cls, self).__init__(task_cls.name, omci_agent, device_id,
                                   priority=task_cls.task_priority,
                                   exclusive=False)

    # What has to be fixed
    self._diffs = diffs

    # Resolved when the task is started
    self._device = None
    self._sync_sm = None

    # Work currently outstanding (cancelled by cancel_deferred)
    self._local_deferred = None

    # For tracking sequential blocking consul/etcd updates
    self._db_updates = 0
| 70 | |
def cancel_deferred(self):
    """
    Cancel the base task's deferred and any locally outstanding work.

    The local handle is cleared before cancellation is attempted so it is
    never cancelled twice. Cancellation is best effort: the stored object
    may already have fired, or may not be cancellable at all, so ordinary
    errors are ignored.
    """
    super(MibReconcileTask, self).cancel_deferred()

    d, self._local_deferred = self._local_deferred, None
    if d is None:
        return

    try:
        if not d.called:
            d.cancel()

    except Exception:
        # Deliberately ignored (best effort). Only ordinary errors are
        # swallowed; SystemExit / KeyboardInterrupt still propagate.
        pass
| 80 | |
def start(self):
    """
    Start MIB Reconcile task

    Looks up the ONU device entry and its MIB synchronizer. If either one
    is missing, the task deferred is failed with a MibReconcileException
    and nothing is scheduled. Otherwise the reconcile sequence is scheduled
    through the reactor with a zero delay.
    """
    super(MibReconcileTask, self).start()

    self._device = self.omci_agent.get_device(self.device_id)

    if self._device is None:
        e = MibReconcileException('Device {} no longer exists'.format(self.device_id))
        self.deferred.errback(failure.Failure(e))
        return

    self._sync_sm = self._device.mib_synchronizer

    # Validate the synchronizer itself; the device was already checked above
    if self._sync_sm is None:
        e = MibReconcileException('Device {} MIB State machine no longer exists'.format(self.device_id))
        self.deferred.errback(failure.Failure(e))
        return

    self._local_deferred = reactor.callLater(0, self.perform_mib_reconcile)
| 102 | |
def stop(self):
    """
    Shutdown MIB Reconcile task

    Cancels any outstanding work, drops the device reference and then lets
    the base Task complete its own shutdown.
    """
    self.log.debug('stopping')

    # Cancel first so no pending callback runs against a cleared device
    self.cancel_deferred()
    self._device = None
    super(MibReconcileTask, self).stop()
| 112 | |
@inlineCallbacks
def perform_mib_reconcile(self):
    """
    Perform the MIB Reconciliation sequence.

    The sequence to reconcile will be to clean up ONU only MEs, followed by
    OLT/OpenOMCI-only MEs, and then finally correct common MEs with differing
    attributes. If every step succeeds, the MIB data sync value is pushed to
    the ONU as the last step.

    The outcome is reported on the task deferred:
      o callback with a summary string when there were no failures
      o errback with MibPartialSuccessException when some updates worked
      o errback with MibReconcileException when none did
    Any unexpected exception is logged and passed to errback, unless the
    deferred has already fired (for example, the task was cancelled).
    """
    self.log.debug('perform-mib-reconcile')

    try:
        successes = 0
        failures = 0
        diffs = self._diffs

        if diffs['onu-only']:
            results = yield self.fix_onu_only(diffs['onu-only'],
                                              diffs['onu-db'])
            self.log.debug('onu-only-results', good=results[0], bad=results[1])
            successes += results[0]
            failures += results[1]

        if diffs['olt-only']:
            results = yield self.fix_olt_only(diffs['olt-only'],
                                              diffs['onu-db'],
                                              diffs['olt-db'])
            self.log.debug('olt-only-results', good=results[0], bad=results[1])
            successes += results[0]
            failures += results[1]

        if diffs['attributes']:
            results = yield self.fix_attributes_only(diffs['attributes'],
                                                     diffs['onu-db'],
                                                     diffs['olt-db'])
            self.log.debug('attributes-results', good=results[0], bad=results[1])
            successes += results[0]
            failures += results[1]

        # Success? Update MIB-data-sync
        if failures == 0:
            results = yield self.update_mib_data_sync()
            successes += results[0]
            failures += results[1]

        # Send back final status
        if failures > 0:
            # Report the failure *count*; 'failure' is the imported Twisted
            # module and must only be used to wrap the exception below.
            msg = '{} Successful updates, {} failures'.format(successes, failures)
            error = MibPartialSuccessException(msg) if successes \
                else MibReconcileException(msg)
            self.deferred.errback(failure.Failure(error))
        else:
            self.deferred.callback('{} Successful updates'.format(successes))

    except Exception as e:
        # If the deferred already fired there is nobody left to notify
        if not self.deferred.called:
            self.log.exception('reconcile', e=e)
            self.deferred.errback(failure.Failure(e))
| 170 | |
@inlineCallbacks
def fix_onu_only(self, onu, onu_db):
    """
    Reconcile ME instances that the audit found on the ONU only.

    The instances are split by who owns them:

      o No OpenOMCI class decoder (kept as binary blobs) or created by the
        ONU itself (no create access): these are saved into the
        OLT/OpenOMCI MIB database.

      o Created by the OLT/OpenOMCI: these are deleted from the ONU.

    :param onu: (list(int,int)) List of tuples where (class_id, inst_id)
    :param onu_db: (dict) ONU Database snapshot at time of audit

    :return: (int, int) successes, failures
    """
    me_map = self._device.me_map
    good = 0
    bad = 0

    # Step 1: undecodable and ONU-created instances are handled alike and
    # get copied into our database
    no_decoder = self._undecodable(onu, me_map)
    by_onu = self._onu_created(onu, me_map)

    if no_decoder or by_onu:
        saved = yield self.fix_onu_only_save_to_db(no_decoder, by_onu, onu_db)
        good += saved[0]
        bad += saved[1]

    # Step 2: OLT-created instances we no longer know about are removed
    # from the ONU
    by_olt = self._olt_created(onu, me_map)
    if by_olt:
        removed = yield self.fix_onu_only_remove_from_onu(by_olt)
        good += removed[0]
        bad += removed[1]

    returnValue((good, bad))
| 219 | |
@inlineCallbacks
def fix_onu_only_save_to_db(self, undecodable, onu_created, onu_db):
    """
    Copy instances seen only on the ONU into the OLT/OpenOMCI database.

    Some, perhaps all, of these instances may have been created by the ONU
    in response to the OLT creating some other ME instance, so the database
    operation is treated as a create.

    :param undecodable: (list(int,int)) (class_id, inst_id) without decoder
    :param onu_created: (list(int,int)) (class_id, inst_id) created by ONU
    :param onu_db: (dict) ONU Database snapshot at time of audit

    :return: (int, int) successes, failures
    """
    good = 0
    bad = 0

    for cid, eid in undecodable + onu_created:
        if self.deferred.called:        # Task was cancelled
            break

        try:
            try:
                olt_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)
            except KeyError:            # Common for ONU created MEs during audit
                olt_entry = None

            # Already in the current MIB: something changed since the audit,
            # so flag a failure to force another audit
            if olt_entry is not None and len(olt_entry):
                self.log.debug('onu-only-in-current', cid=cid, eid=eid)
                bad += 1
                continue

            if cid not in onu_db:
                self.log.warn('onu-only-not-in-audit', cid=cid, eid=eid)
                bad += 1
                continue

            entry = onu_db[cid][eid]
            self.strobe_watchdog()
            self._sync_sm.mib_set(cid, eid, entry[ATTRIBUTES_KEY])
            good += 1

            # A long run of back-to-back DB updates starves other twisted
            # tasks, so pause briefly every few updates
            self._db_updates += 1

            if self._db_updates >= MibReconcileTask.max_sequential_db_updates:
                self._db_updates = 0
                # Note: what gets stored is the result of the yield, not
                # the Deferred returned by asleep()
                self._local_deferred = yield asleep(MibReconcileTask.db_update_pause)

        except Exception as e:
            self.log.warn('onu-only-error', e=e)
            bad += 1

    returnValue((good, bad))
| 271 | |
@inlineCallbacks
def fix_onu_only_remove_from_onu(self, olt_created):
    """
    On ONU, but no longer on OLT/OpenOMCI, delete it

    Each instance is first looked up in the current MIB. If it has shown up
    there since the audit it is counted as a failure (so that another audit
    takes place); otherwise an OMCI Delete is sent to the ONU.

    :param olt_created: (list(int,int)) (class_id, inst_id) of OLT-created
                        instances that were found only on the ONU

    :return: (int, int) successes, failures
    """
    successes = 0
    failures = 0

    for cid, eid in olt_created:
        if self.deferred.called:        # Check if task canceled
            break
        try:
            # If in current MIB, had an audit issue, declare it an error
            # and next audit should clear it up
            try:
                current_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)

            except KeyError:
                # Expected if no other entities with same class present in MIB
                current_entry = None

            if current_entry is not None and len(current_entry):
                self.log.debug('onu-only-in-current', cid=cid, eid=eid)
                failures += 1

            else:
                # Delete it from the ONU
                frame = OmciFrame(transaction_id=None,
                                  message_type=OmciDelete.message_id,
                                  omci_message=OmciDelete(entity_class=cid, entity_id=eid))

                self._local_deferred = yield self._device.omci_cc.send(frame)
                # Label the operation as a delete so that the debug log and
                # any MibReconcileException text name the right step
                self.check_status_and_state(self._local_deferred, 'onu-only-delete')
                successes += 1
                self._db_updates = 0

        except Exception as e:
            self.log.warn('onu-only-delete-error', e=e, cid=cid, eid=eid)
            failures += 1
            self.strobe_watchdog()

    returnValue((successes, failures))
| 312 | |
@inlineCallbacks
def fix_olt_only(self, olt, onu_db, olt_db):
    """
    Reconcile ME instances that the audit found on the OLT only.

    The instances are split by who owns them:

      o No OpenOMCI class decoder (kept as binary blobs). The OLT never
        creates these (all are learned from the ONU), so the ONU is assumed
        to have removed them: delete them from the OLT database.

      o Created by the ONU (no create/delete access). They may be missing
        because of a reboot, or because the ONU removed them after an OLT
        created ME was deleted: delete them from the OLT database.

      o Created by the OLT/OpenOMCI: create them on the ONU.

    :param olt: (list(int,int)) List of tuples where (class_id, inst_id)
    :param onu_db: (dict) ONU Database snapshot at time of audit (not used here)
    :param olt_db: (dict) OLT Database snapshot at time of audit (not used here)

    :return: (int, int) successes, failures
    """
    me_map = self._device.me_map
    good = 0
    bad = 0

    # Step 1: undecodable and ONU-created instances are handled alike and
    # get removed from the OpenOMCI database (synchronous call)
    no_decoder = self._undecodable(olt, me_map)
    by_onu = self._onu_created(olt, me_map)

    if no_decoder or by_onu:
        removed = self.fix_olt_only_remove_from_db(no_decoder, by_onu)
        good += removed[0]
        bad += removed[1]

    # Step 2: OLT-created instances are (re)created on the ONU
    by_olt = self._olt_created(olt, me_map)
    if by_olt:
        created = yield self.fix_olt_only_create_on_onu(by_olt, me_map)
        good += created[0]
        bad += created[1]

    returnValue((good, bad))
| 364 | |
def fix_olt_only_remove_from_db(self, undecodable, onu_created):
    """
    On OLT, but not on ONU and are ONU created, delete from OLT/OpenOMCI DB

    An instance counts as a success both when it is deleted here and when
    it turns out to be gone already (KeyError). Any other error is logged
    and counted as a failure. Processing stops early if the task deferred
    has already fired.

    :param undecodable: (list(int,int)) (class_id, inst_id) without decoder
    :param onu_created: (list(int,int)) (class_id, inst_id) created by ONU

    :return: (int, int) successes, failures
    """
    successes = 0
    failures = 0

    for cid, eid in undecodable + onu_created:
        if self.deferred.called:        # Check if task canceled
            break
        try:
            # Delete it. If already deleted (KeyError), then that is okay
            self._sync_sm.mib_delete(cid, eid)
            self.strobe_watchdog()
            successes += 1      # Deleted by us

        except KeyError:
            successes += 1      # Not found in DB anymore, assume success

        except Exception as e:
            self.log.warn('olt-only-db-error', cid=cid, eid=eid, e=e)
            failures += 1

    return successes, failures
| 386 | |
@inlineCallbacks
def fix_olt_only_create_on_onu(self, olt_created, me_map):
    """
    Found on OLT and created by OLT, so create on ONU

    For each instance the current MIB entry is fetched. If it is gone
    (deleted before this task got to run) that counts as a success and
    nothing is sent. Otherwise an OMCI Create carrying only the
    set-by-create attributes is sent, followed by an OMCI Set for any
    attributes that are writable but not set-by-create. Each OMCI
    operation that completes adds one to the success count; any error
    adds one failure for the instance.

    :param olt_created: (list(int,int)) (class_id, inst_id) to create
    :param me_map: (dict) class_id -> ME class definition

    :return: (int, int) successes, failures
    """
    successes = 0
    failures = 0

    for cid, eid in olt_created:
        if self.deferred.called:        # Check if task canceled
            break

        try:
            # Get current entry, use it if found. A KeyError means it is no
            # longer in the MIB, which the sibling fix-up methods treat as
            # 'not present' rather than as an error.
            try:
                olt_entry = self._sync_sm.query_mib(class_id=cid, instance_id=eid)

            except KeyError:
                olt_entry = None

            me_entry = me_map[cid]

            if olt_entry is None or len(olt_entry) == 0:
                successes += 1      # Deleted before task got to run
                continue

            # Attribute name -> access set, built once per instance. The
            # first definition of a name wins.
            access = dict()
            for attr in me_entry.attributes:
                access.setdefault(attr.field.name, attr.access)

            olt_attributes = olt_entry[ATTRIBUTES_KEY]

            # Create it in the ONU. Only set-by-create attributes allowed
            sbc_data = {k: v for k, v in olt_attributes.items()
                        if AA.SetByCreate in access.get(k, ())}

            frame = OmciFrame(transaction_id=None,
                              message_type=OmciCreate.message_id,
                              omci_message=OmciCreate(entity_class=cid,
                                                      entity_id=eid,
                                                      data=sbc_data))

            self._local_deferred = yield self._device.omci_cc.send(frame)
            self.check_status_and_state(self._local_deferred, 'olt-create-sbc')
            successes += 1
            self._db_updates = 0

            # Try any writeable attributes now (but not set-by-create)
            writeable_data = {k: v for k, v in olt_attributes.items()
                              if AA.Writable in access.get(k, ())
                              and AA.SetByCreate not in access.get(k, ())}

            if len(writeable_data):
                attributes_mask = me_entry.mask_for(*writeable_data.keys())
                frame = OmciFrame(transaction_id=None,
                                  message_type=OmciSet.message_id,
                                  omci_message=OmciSet(entity_class=cid,
                                                       entity_id=eid,
                                                       attributes_mask=attributes_mask,
                                                       data=writeable_data))

                self._local_deferred = yield self._device.omci_cc.send(frame)
                self.check_status_and_state(self._local_deferred, 'olt-set-writeable')
                successes += 1

        except Exception as e:
            self.log.exception('olt-only-fix', e=e, cid=cid, eid=eid)
            failures += 1
            self.strobe_watchdog()

    returnValue((successes, failures))
| 450 | |
@inlineCallbacks
def fix_attributes_only(self, attrs, onu_db, olt_db):
    """
    Reconcile ME instances present on both sides whose attribute values
    differ. Two cases are handled:

      o An attribute present only in the ONU snapshot is copied into the
        OLT/OpenOMCI database.

      o Any other attribute that is writable (or set-by-create) is pushed
        to the ONU using the OLT value, since the OLT takes precedence.

    :param attrs: (list(int,int,str)) List of tuples where (class_id, inst_id, attribute)
                  points to the specific ME instance where attributes
                  are different
    :param onu_db: (dict) ONU Database snapshot at time of audit
    :param olt_db: (dict) OLT Database snapshot at time of audit

    :return: (int, int) successes, failures
    """
    good = 0
    bad = 0
    me_map = self._device.me_map

    # Group the attribute names per (class id, entity id) so that each
    # instance needs at most one database write and one OMCI Set
    grouped = dict()
    for cid, eid, attribute in attrs:
        grouped.setdefault((cid, eid), set()).add(attribute)

    for (cid, eid), names in grouped.items():
        # MEs without an encoder/decoder cannot be handled
        if cid not in me_map:
            self.log.warn('no-me-map-decoder', class_id=cid)
            bad += 1
            continue

        if self.deferred.called:        # Task was cancelled
            break

        me_entry = me_map[cid]
        to_database = dict()
        to_onu = dict()
        olt_attributes = olt_db[cid][eid][ATTRIBUTES_KEY]
        onu_attributes = onu_db[cid][eid][ATTRIBUTES_KEY]

        for name in names:
            # Access set of the first ME attribute with this name, if any
            access = set()
            for attr in me_entry.attributes:
                if attr.field.name == name:
                    access = attr.access
                    break

            can_write = AA.Writable in access or AA.SetByCreate in access

            if name in onu_attributes and name not in olt_attributes:
                # Known to the ONU only: remember it on our side
                to_database[name] = onu_attributes[name]

            elif can_write:
                # Known to the OLT (or both): the OLT value wins
                to_onu[name] = olt_attributes[name]

        # Both bulk operations re-check that their target is unchanged
        # since the audit; if it changed they report failures so that an
        # expedited audit looks at it again
        if to_database:
            saved = yield self.fix_attributes_only_in_mib(cid, eid, to_database)
            good += saved[0]
            bad += saved[1]

        if to_onu:
            pushed = yield self.fix_attributes_only_on_olt(cid, eid, to_onu, olt_db, me_entry)
            good += pushed[0]
            bad += pushed[1]

    returnValue((good, bad))
| 537 | |
@inlineCallbacks
def fix_attributes_only_in_mib(self, cid, eid, mib_data):
    """
    Save attributes that were found only on the ONU into the OLT/OpenOMCI
    MIB database.

    The current entry is queried first. If any of the attributes now has a
    value that differs from the audit data, nothing is written and every
    attribute is counted as a failure so that a later audit re-checks it.

    :param cid: (int) ME class ID
    :param eid: (int) ME entity (instance) ID
    :param mib_data: (dict) attribute name -> value taken from the ONU snapshot

    :return: (int, int) successes, failures (counted per attribute)
    """
    successes = 0
    failures = 0
    try:
        # Get current and verify same as during audit it is missing from our DB
        attributes = mib_data.keys()
        current_entry = self._device.query_mib(cid, eid, attributes)

        if current_entry is not None and len(current_entry):
            clashes = {k: v for k, v in current_entry.items()
                       if k in mib_data and v != mib_data[k]}

            if len(clashes):
                raise ValueError('Existing DB entry for {}/{} attributes clash with audit data. Clash: {}'.
                                 format(cid, eid, clashes))

        self._sync_sm.mib_set(cid, eid, mib_data)
        successes += len(mib_data)
        self.strobe_watchdog()

        # If we do nothing but DB updates for ALOT of MEs, we are
        # blocking other async twisted tasks, be kind and yield
        self._db_updates += 1
        if self._db_updates >= MibReconcileTask.max_sequential_db_updates:
            self._db_updates = 0
            self._local_deferred = yield asleep(MibReconcileTask.db_update_pause)

    except ValueError as e:
        # Pass the exception as a keyword, like every other log call in
        # this task, so that the clash details end up in the log event
        self.log.debug('attribute-changed', e=e, cid=cid, eid=eid)
        failures += len(mib_data)

    except Exception as e:
        self.log.exception('attribute-only-fix-mib', e=e, cid=cid, eid=eid)
        failures += len(mib_data)

    returnValue((successes, failures))
| 575 | |
@inlineCallbacks
def fix_attributes_only_on_olt(self, cid, eid, onu_data, olt_db, me_entry):
    """
    Push OLT attribute values to the ONU with a single OMCI Set.

    The OLT value wins, but only if the current MIB still holds the same
    values as the audit snapshot. If it does not, nothing is sent and every
    attribute is reported as a failure so the data can stabilize first.

    :param cid: (int) ME class ID
    :param eid: (int) ME entity (instance) ID
    :param onu_data: (dict) attribute name -> value to set on the ONU
    :param olt_db: (dict) OLT Database snapshot at time of audit
    :param me_entry: ME class definition used to build the attribute mask

    :return: (int, int) successes, failures (counted per attribute)
    """
    good = 0
    bad = 0

    try:
        # Audit-time OLT values, restricted to the attributes being set
        audited = dict((name, value)
                       for name, value in olt_db[cid][eid][ATTRIBUTES_KEY].items()
                       if name in onu_data)

        current = self._sync_sm.query_mib(class_id=cid, instance_id=eid,
                                          attributes=onu_data.keys())

        for name, value in audited.items():
            if not (current.get(name) == value):
                returnValue((0, len(onu_data)))     # Wait for it to stabilize

        # OLT data still matches, send the set operation now
        mask = me_entry.mask_for(*onu_data.keys())
        set_message = OmciSet(entity_class=cid,
                              entity_id=eid,
                              attributes_mask=mask,
                              data=onu_data)
        frame = OmciFrame(transaction_id=None,
                          message_type=OmciSet.message_id,
                          omci_message=set_message)

        response = yield self._device.omci_cc.send(frame)
        self.check_status_and_state(response, 'onu-attribute-update')
        good += len(onu_data)
        self._db_updates = 0

    except Exception as e:
        self.log.exception('attribute-only-fix-onu', e=e, cid=cid, eid=eid)
        bad += len(onu_data)
        self.strobe_watchdog()

    returnValue((good, bad))
| 616 | |
@inlineCallbacks
def update_mib_data_sync(self):
    """
    Final step of MIB resynchronization: send the MIB data sync (MDS) value
    held by the MIB synchronizer to the ONU data ME.

    A value of zero is never sent; the synchronizer's value is incremented
    first in that case. The set response is delivered on the OMCI-CC
    pub/sub bus, where the MIB Synchronizer updates the MDS value in the
    database if the set succeeded.

    :return: (int, int) success, failure counts
    """
    sync_sm = self._sync_sm

    # Pick the MDS value to send, avoiding zero
    mds = sync_sm.mib_data_sync
    if mds == 0:
        sync_sm.increment_mib_data_sync()
        mds = sync_sm.mib_data_sync

    outcome = (0, 1)        # Assume failure until the set is confirmed
    try:
        frame = OntDataFrame(mib_data_sync=mds).set()

        response = yield self._device.omci_cc.send(frame)
        self.check_status_and_state(response, 'ont-data-mbs-update')
        outcome = (1, 0)

    except TimeoutError as e:
        self.log.debug('ont-data-send-timeout', e=e)

    except Exception as e:
        self.log.exception('ont-data-send', e=e, mds=mds)

    returnValue(outcome)
| 651 | |
def check_status_and_state(self, results, operation=''):
    """
    Inspect an OMCI response and raise if it reports an error.

    The watchdog is strobed and the status, along with any attribute masks
    carried by the response, is written to the debug log.

    :param results: (OmciFrame) OMCI Response frame
    :param operation: (str) what operation was being performed
    :return: True if successful, False if the entity existed (already created)
    :raises MibReconcileException: for any other status code
    """
    response = results.fields['omci_message'].fields
    status = response['success_code']

    # Masks are optional in a response, 'n/a' marks the missing ones
    error_mask = response.get('parameter_error_attributes_mask', 'n/a')
    failed_mask = response.get('failed_attributes_mask', 'n/a')
    unsupported_mask = response.get('unsupported_attributes_mask', 'n/a')
    self.strobe_watchdog()

    self.log.debug(operation, status=status, error_mask=error_mask,
                   failed_mask=failed_mask, unsupported_mask=unsupported_mask)

    if status == RC.Success:
        return True

    if status == RC.InstanceExists:
        return False

    template = '{} failed with a status of {}, error_mask: {}, failed_mask: {}, unsupported_mask: {}'
    raise MibReconcileException(template.format(operation, status, error_mask,
                                                failed_mask, unsupported_mask))
| 681 | |
| 682 | def _undecodable(self, cid_eid_list, me_map): |
| 683 | return [(cid, eid) for cid, eid in cid_eid_list if cid not in me_map] |
| 684 | |
| 685 | def _onu_created(self, cid_eid_list, me_map): |
| 686 | return [(cid, eid) for cid, eid in cid_eid_list if cid in me_map and |
| 687 | (OP.Create not in me_map[cid].mandatory_operations and |
| 688 | OP.Create not in me_map[cid].optional_operations)] |
| 689 | |
| 690 | def _olt_created(self, cid_eid_list, me_map): |
| 691 | return [(cid, eid) for cid, eid in cid_eid_list if cid in me_map and |
| 692 | (OP.Create in me_map[cid].mandatory_operations or |
| 693 | OP.Create in me_map[cid].optional_operations)] |