#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16import structlog
17from datetime import datetime, timedelta
18from transitions import Machine
19from twisted.internet import reactor
20from voltha.extensions.omci.omci_frame import OmciFrame
21from voltha.extensions.omci.database.mib_db_api import MDS_KEY
22from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes, \
23 AttributeAccess
24from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
25 RX_RESPONSE_KEY
26from voltha.extensions.omci.onu_device_entry import OnuDeviceEvents, OnuDeviceEntry, \
27 SUPPORTED_MESSAGE_ENTITY_KEY, SUPPORTED_MESSAGE_TYPES_KEY
28from voltha.extensions.omci.omci_entities import OntData
29from common.event_bus import EventBusClient
30from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
31
# Short module-level aliases for the imported OMCI enumerations
RxEvent = OmciCCRxEvents    # keys of the OMCI_CC receive-event subscriptions
DevEvent = OnuDeviceEvents  # keys of the ONU device-entry subscriptions
OP = EntityOperations
RC = ReasonCodes            # compared against 'success_code' in OMCI responses
AA = AttributeAccess        # attribute access flags (AA.SBC, AA.W are used below)
37
38
class MibSynchronizer(object):
    """
    OpenOMCI MIB Synchronizer state machine

    The class is the model of a ``transitions`` Machine (created in
    ``__init__``). The tables below are the defaults for the states and the
    trigger -> state changes of that machine; the trigger names ('start',
    'upload_mib', 'success', 'timeout', 'stop', ...) become methods on the
    instance.
    """
    # Names of all states the machine can be in
    DEFAULT_STATES = ['disabled', 'starting', 'uploading', 'examining_mds',
                      'in_sync', 'out_of_sync', 'auditing', 'resynchronizing']

    # One entry per allowed (trigger, source state) -> destination state
    DEFAULT_TRANSITIONS = [
        {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},

        {'trigger': 'upload_mib', 'source': 'starting', 'dest': 'uploading'},
        {'trigger': 'examine_mds', 'source': 'starting', 'dest': 'examining_mds'},

        {'trigger': 'success', 'source': 'uploading', 'dest': 'in_sync'},

        {'trigger': 'success', 'source': 'examining_mds', 'dest': 'in_sync'},
        {'trigger': 'mismatch', 'source': 'examining_mds', 'dest': 'resynchronizing'},

        {'trigger': 'audit_mib', 'source': 'in_sync', 'dest': 'auditing'},

        {'trigger': 'success', 'source': 'out_of_sync', 'dest': 'in_sync'},
        {'trigger': 'audit_mib', 'source': 'out_of_sync', 'dest': 'auditing'},

        {'trigger': 'success', 'source': 'auditing', 'dest': 'in_sync'},
        {'trigger': 'mismatch', 'source': 'auditing', 'dest': 'resynchronizing'},
        {'trigger': 'force_resync', 'source': 'auditing', 'dest': 'resynchronizing'},

        {'trigger': 'success', 'source': 'resynchronizing', 'dest': 'in_sync'},
        {'trigger': 'diffs_found', 'source': 'resynchronizing', 'dest': 'out_of_sync'},

        # Do wildcard 'timeout' trigger that sends us back to start
        {'trigger': 'timeout', 'source': '*', 'dest': 'starting'},

        # Do wildcard 'stop' trigger last so it covers all previous states
        {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
    ]
    DEFAULT_TIMEOUT_RETRY = 5    # Seconds to delay after task failure/timeout
    DEFAULT_AUDIT_DELAY = 60     # Periodic tick to audit the MIB Data Sync
    DEFAULT_RESYNC_DELAY = 300   # Periodically force a resync
78
79 def __init__(self, agent, device_id, mib_sync_tasks, db,
80 advertise_events=False,
81 states=DEFAULT_STATES,
82 transitions=DEFAULT_TRANSITIONS,
83 initial_state='disabled',
84 timeout_delay=DEFAULT_TIMEOUT_RETRY,
85 audit_delay=DEFAULT_AUDIT_DELAY,
86 resync_delay=DEFAULT_RESYNC_DELAY):
87 """
88 Class initialization
89
90 :param agent: (OpenOmciAgent) Agent
91 :param device_id: (str) ONU Device ID
92 :param db: (MibDbVolatileDict) MIB Database
93 :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
94 :param mib_sync_tasks: (dict) Tasks to run
95 :param states: (list) List of valid states
96 :param transitions: (dict) Dictionary of triggers and state changes
97 :param initial_state: (str) Initial state machine state
98 :param timeout_delay: (int/float) Number of seconds after a timeout to attempt
99 a retry (goes back to starting state)
100 :param audit_delay: (int) Seconds between MIB audits while in sync. Set to
101 zero to disable audit. An operator can request
102 an audit manually by calling 'self.audit_mib'
103 :param resync_delay: (int) Seconds in sync before performing a forced MIB
104 resynchronization
105 """
106 self.log = structlog.get_logger(device_id=device_id)
107
108 self._agent = agent
109 self._device_id = device_id
110 self._device = None
111 self._database = db
112 self._timeout_delay = timeout_delay
113 self._audit_delay = audit_delay
114 self._resync_delay = resync_delay
115
116 self._upload_task = mib_sync_tasks['mib-upload']
117 self._get_mds_task = mib_sync_tasks['get-mds']
118 self._audit_task = mib_sync_tasks['mib-audit']
119 self._resync_task = mib_sync_tasks['mib-resync']
120 self._reconcile_task = mib_sync_tasks['mib-reconcile']
121 self._advertise_events = advertise_events
122
123 self._deferred = None
124 self._current_task = None # TODO: Support multiple running tasks after v.2.0 release
125 self._task_deferred = None
126 self._mib_data_sync = 0
127 self._last_mib_db_sync_value = None
128 self._device_in_db = False
129 self._next_resync = None
130
131 self._on_olt_only_diffs = None
132 self._on_onu_only_diffs = None
133 self._attr_diffs = None
134 self._audited_olt_db = None
135 self._audited_onu_db = None
136
137 self._event_bus = EventBusClient()
138 self._omci_cc_subscriptions = { # RxEvent.enum -> Subscription Object
139 RxEvent.MIB_Reset: None,
140 RxEvent.AVC_Notification: None,
141 RxEvent.MIB_Upload: None,
142 RxEvent.MIB_Upload_Next: None,
143 RxEvent.Create: None,
144 RxEvent.Delete: None,
145 RxEvent.Set: None,
146 }
147 self._omci_cc_sub_mapping = {
148 RxEvent.MIB_Reset: self.on_mib_reset_response,
149 RxEvent.AVC_Notification: self.on_avc_notification,
150 RxEvent.MIB_Upload: self.on_mib_upload_response,
151 RxEvent.MIB_Upload_Next: self.on_mib_upload_next_response,
152 RxEvent.Create: self.on_create_response,
153 RxEvent.Delete: self.on_delete_response,
154 RxEvent.Set: self.on_set_response,
155 }
156 self._onu_dev_subscriptions = { # DevEvent.enum -> Subscription Object
157 DevEvent.OmciCapabilitiesEvent: None
158 }
159 self._onu_dev_sub_mapping = {
160 DevEvent.OmciCapabilitiesEvent: self.on_capabilities_event
161 }
162
163 # Statistics and attributes
164 # TODO: add any others if it will support problem diagnosis
165
166 # Set up state machine to manage states
167 self.machine = Machine(model=self, states=states,
168 transitions=transitions,
169 initial=initial_state,
170 queued=True,
171 name='{}-{}'.format(self.__class__.__name__,
172 device_id))
173 try:
174 import logging
175 logging.getLogger('transitions').setLevel(logging.WARNING)
176 except Exception as e:
177 self.log.exception('log-level-failed', e=e)
178
179 def _cancel_deferred(self):
180 d1, self._deferred = self._deferred, None
181 d2, self._task_deferred = self._task_deferred, None
182
183 for d in [d1, d1]:
184 try:
185 if d is not None and not d.called:
186 d.cancel()
187 except:
188 pass
189
190 def __str__(self):
191 return 'MIBSynchronizer: Device ID: {}, State:{}'.format(self._device_id, self.state)
192
193 def delete(self):
194 """
195 Cleanup any state information
196 """
197 self.stop()
198 db, self._database = self._database, None
199
200 if db is not None:
201 db.remove(self._device_id)
202
203 @property
204 def device_id(self):
205 return self._device_id
206
207 @property
208 def mib_data_sync(self):
209 return self._mib_data_sync
210
211 def increment_mib_data_sync(self):
212 self._mib_data_sync += 1
213 if self._mib_data_sync > 255:
214 self._mib_data_sync = 0
215
216 if self._database is not None:
217 self._database.save_mib_data_sync(self._device_id,
218 self._mib_data_sync)
219
220 @property
221 def last_mib_db_sync(self):
222 return self._last_mib_db_sync_value
223
224 @last_mib_db_sync.setter
225 def last_mib_db_sync(self, value):
226 self._last_mib_db_sync_value = value
227 if self._database is not None:
228 self._database.save_last_sync(self.device_id, value)
229
230 @property
231 def is_new_onu(self):
232 """
233 Is this a new ONU (has never completed MIB synchronization)
234 :return: (bool) True if this ONU should be considered new
235 """
236 return self.last_mib_db_sync is None
237
238 @property
239 def advertise_events(self):
240 return self._advertise_events
241
242 @advertise_events.setter
243 def advertise_events(self, value):
244 if not isinstance(value, bool):
245 raise TypeError('Advertise event is a boolean')
246 self._advertise_events = value
247
248 def advertise(self, event, info):
249 """Advertise an event on the OpenOMCI event bus"""
250 if self._advertise_events:
251 self._agent.advertise(event,
252 {
253 'state-machine': self.machine.name,
254 'info': info,
255 'time': str(datetime.utcnow())
256 })
257
258 def on_enter_disabled(self):
259 """
260 State machine is being stopped
261 """
262 self.advertise(OpenOmciEventType.state_change, self.state)
263
264 self._cancel_deferred()
265 if self._device is not None:
266 self._device.mib_db_in_sync = False
267
268 task, self._current_task = self._current_task, None
269 if task is not None:
270 task.stop()
271
272 # Drop Response and Autonomous notification subscriptions
273 for event, sub in self._omci_cc_subscriptions.iteritems():
274 if sub is not None:
275 self._omci_cc_subscriptions[event] = None
276 self._device.omci_cc.event_bus.unsubscribe(sub)
277
278 for event, sub in self._onu_dev_subscriptions.iteritems():
279 if sub is not None:
280 self._onu_dev_subscriptions[event] = None
281 self._device.event_bus.unsubscribe(sub)
282
283 # TODO: Stop and remove any currently running or scheduled tasks
284 # TODO: Anything else?
285
286 def _seed_database(self):
287 if not self._device_in_db:
288 try:
289 try:
290 self._database.start()
291 self._database.add(self._device_id)
292 self.log.debug('seed-db-does-not-exist', device_id=self._device_id)
293
294 except KeyError:
295 # Device already is in database
296 self.log.debug('seed-db-exist', device_id=self._device_id)
297 self._mib_data_sync = self._database.get_mib_data_sync(self._device_id)
298 self._last_mib_db_sync_value = self._database.get_last_sync(self._device_id)
299
300 self._device_in_db = True
301
302 except Exception as e:
303 self.log.exception('seed-database-failure', e=e)
304
305 def on_enter_starting(self):
306 """
307 Determine ONU status and start/re-start MIB Synchronization tasks
308 """
309 self._device = self._agent.get_device(self._device_id)
310 self.advertise(OpenOmciEventType.state_change, self.state)
311
312 # Make sure root of external MIB Database exists
313 self._seed_database()
314
315 # Set up Response and Autonomous notification subscriptions
316 try:
317 for event, sub in self._omci_cc_sub_mapping.iteritems():
318 if self._omci_cc_subscriptions[event] is None:
319 self._omci_cc_subscriptions[event] = \
320 self._device.omci_cc.event_bus.subscribe(
321 topic=OMCI_CC.event_bus_topic(self._device_id, event),
322 callback=sub)
323
324 except Exception as e:
325 self.log.exception('omci-cc-subscription-setup', e=e)
326
327 # Set up ONU device subscriptions
328 try:
329 for event, sub in self._onu_dev_sub_mapping.iteritems():
330 if self._onu_dev_subscriptions[event] is None:
331 self._onu_dev_subscriptions[event] = \
332 self._device.event_bus.subscribe(
333 topic=OnuDeviceEntry.event_bus_topic(self._device_id, event),
334 callback=sub)
335
336 except Exception as e:
337 self.log.exception('dev-subscription-setup', e=e)
338
339 # Clear any previous audit results
340 self._on_olt_only_diffs = None
341 self._on_onu_only_diffs = None
342 self._attr_diffs = None
343 self._audited_olt_db = None
344 self._audited_onu_db = None
345
346 # Determine if this ONU has ever synchronized
347 if self.is_new_onu:
348 # Start full MIB upload
349 self._deferred = reactor.callLater(0, self.upload_mib)
350
351 else:
352 # Examine the MIB Data Sync
353 self._deferred = reactor.callLater(0, self.examine_mds)
354
355 def on_enter_uploading(self):
356 """
357 Begin full MIB data upload, starting with a MIB RESET
358 """
359 self.advertise(OpenOmciEventType.state_change, self.state)
360
361 def success(results):
362 self.log.debug('mib-upload-success', results=results)
363 self._current_task = None
364 self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
365 self._deferred = reactor.callLater(0, self.success)
366
367 def failure(reason):
368 self.log.info('mib-upload-failure', reason=reason)
369 self._current_task = None
370 self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
371
372 self._device.mib_db_in_sync = False
373 self._current_task = self._upload_task(self._agent, self._device_id)
374
375 self._task_deferred = self._device.task_runner.queue_task(self._current_task)
376 self._task_deferred.addCallbacks(success, failure)
377
378 def on_enter_examining_mds(self):
379 """
380 Create a simple task to fetch the MIB Data Sync value and
381 determine if the ONU value matches what is in the MIB database
382 """
383 self.advertise(OpenOmciEventType.state_change, self.state)
384
385 self._mib_data_sync = self._database.get_mib_data_sync(self._device_id) or 0
386
387 def success(onu_mds_value):
388 self.log.debug('examine-mds-success', onu_mds_value=onu_mds_value, olt_mds_value=self.mib_data_sync)
389 self._current_task = None
390
391 # Examine MDS value
392 if self.mib_data_sync == onu_mds_value:
393 self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
394 self._deferred = reactor.callLater(0, self.success)
395 else:
396 self._deferred = reactor.callLater(0, self.mismatch)
397
398 def failure(reason):
399 self.log.info('examine-mds-failure', reason=reason)
400 self._current_task = None
401 self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
402
403 self._device.mib_db_in_sync = False
404 self._current_task = self._get_mds_task(self._agent, self._device_id)
405
406 self._task_deferred = self._device.task_runner.queue_task(self._current_task)
407 self._task_deferred.addCallbacks(success, failure)
408
409 def on_enter_in_sync(self):
410 """
411 The OLT/OpenOMCI MIB Database is in sync with the ONU MIB Database.
412 """
413 self.advertise(OpenOmciEventType.state_change, self.state)
414 self.last_mib_db_sync = datetime.utcnow()
415 self._device.mib_db_in_sync = True
416
417 if self._audit_delay > 0:
418 self._deferred = reactor.callLater(self._audit_delay, self.audit_mib)
419
420 def on_enter_out_of_sync(self):
421 """
422 The MIB in OpenOMCI and the ONU are out of sync. This can happen if:
423
424 o the MIB_Data_Sync values are not equal, or
425 o the MIBs were compared and differences were found.
426
427 Schedule a task to reconcile the differences
428 """
429 self.advertise(OpenOmciEventType.state_change, self.state)
430
431 # We are only out-of-sync if there were differences. If here due to MDS
432 # value differences, still run the reconcile so we up date the ONU's MDS
433 # value to match ours.
434
435 self._device.mib_db_in_sync = self._attr_diffs is None and \
436 self._on_onu_only_diffs is None and \
437 self._on_olt_only_diffs is None
438
439 def success(onu_mds_value):
440 self.log.debug('reconcile-success', mds_value=onu_mds_value)
441 self._current_task = None
442 self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
443 self._deferred = reactor.callLater(0, self.success)
444
445 def failure(reason):
446 self.log.info('reconcile-failure', reason=reason)
447 self._current_task = None
448 self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
449
450 diff_collection = {
451 'onu-only': self._on_onu_only_diffs,
452 'olt-only': self._on_olt_only_diffs,
453 'attributes': self._attr_diffs,
454 'olt-db': self._audited_olt_db,
455 'onu-db': self._audited_onu_db
456 }
457 # Clear out results since reconciliation task will be handling them
458 self._on_olt_only_diffs = None
459 self._on_onu_only_diffs = None
460 self._attr_diffs = None
461 self._audited_olt_db = None
462 self._audited_onu_db = None
463
464 self._current_task = self._reconcile_task(self._agent, self._device_id, diff_collection)
465 self._task_deferred = self._device.task_runner.queue_task(self._current_task)
466 self._task_deferred.addCallbacks(success, failure)
467
468 def on_enter_auditing(self):
469 """
470 Perform a MIB Audit. If our last MIB resync was too long in the
471 past, perform a resynchronization anyway
472 """
473 self.advertise(OpenOmciEventType.state_change, self.state)
474
475 if self._next_resync is None:
476 self.log.error('next-forced-resync-error', msg='Next Resync should always be valid at this point')
477 self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
478
479 if datetime.utcnow() >= self._next_resync:
480 self._deferred = reactor.callLater(0, self.force_resync)
481 else:
482 def success(onu_mds_value):
483 self.log.debug('audit-success', onu_mds_value=onu_mds_value, olt_mds_value=self.mib_data_sync)
484 self._current_task = None
485
486 # Examine MDS value
487 if self.mib_data_sync == onu_mds_value:
488 self._deferred = reactor.callLater(0, self.success)
489 else:
490 self._device.mib_db_in_sync = False
491 self._deferred = reactor.callLater(0, self.mismatch)
492
493 def failure(reason):
494 self.log.info('audit-failure', reason=reason)
495 self._current_task = None
496 self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
497
498 self._current_task = self._audit_task(self._agent, self._device_id)
499 self._task_deferred = self._device.task_runner.queue_task(self._current_task)
500 self._task_deferred.addCallbacks(success, failure)
501
502 def on_enter_resynchronizing(self):
503 """
504 Perform a resynchronization of the MIB database
505
506 First calculate any differences
507 """
508 self.advertise(OpenOmciEventType.state_change, self.state)
509
510 def success(results):
511 self.log.debug('resync-success', results=results)
512
513 on_olt_only = results.get('on-olt-only')
514 on_onu_only = results.get('on-onu-only')
515 attr_diffs = results.get('attr-diffs')
516 olt_db = results.get('olt-db')
517 onu_db = results.get('onu-db')
518
519 self._current_task = None
520 self._on_olt_only_diffs = on_olt_only if on_olt_only and len(on_olt_only) else None
521 self._on_onu_only_diffs = on_onu_only if on_onu_only and len(on_onu_only) else None
522 self._attr_diffs = attr_diffs if attr_diffs and len(attr_diffs) else None
523 self._audited_olt_db = olt_db
524 self._audited_onu_db = onu_db
525
526 mds_equal = self.mib_data_sync == self._audited_onu_db[MDS_KEY]
527
528 if mds_equal and all(diff is None for diff in [self._on_olt_only_diffs,
529 self._on_onu_only_diffs,
530 self._attr_diffs]):
531 self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
532 self._deferred = reactor.callLater(0, self.success)
533 else:
534 self._deferred = reactor.callLater(0, self.diffs_found)
535
536 def failure(reason):
537 self.log.info('resync-failure', reason=reason)
538 self._current_task = None
539 self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
540
541 self._current_task = self._resync_task(self._agent, self._device_id)
542 self._task_deferred = self._device.task_runner.queue_task(self._current_task)
543 self._task_deferred.addCallbacks(success, failure)
544
545 def on_mib_reset_response(self, _topic, msg):
546 """
547 Called upon receipt of a MIB Reset Response for this ONU
548
549 :param _topic: (str) OMCI-RX topic
550 :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
551 """
552 self.log.debug('on-mib-reset-response', state=self.state)
553 try:
554 response = msg[RX_RESPONSE_KEY]
555
556 # Check if expected in current mib_sync state
557 if self.state != 'uploading' or self._omci_cc_subscriptions[RxEvent.MIB_Reset] is None:
558 self.log.error('rx-in-invalid-state', state=self.state)
559
560 else:
561 now = datetime.utcnow()
562
563 if not isinstance(response, OmciFrame):
564 raise TypeError('Response should be an OmciFrame')
565
566 omci_msg = response.fields['omci_message'].fields
567 status = omci_msg['success_code']
568
569 assert status == RC.Success, 'Unexpected MIB reset response status: {}'. \
570 format(status)
571
572 self._device.mib_db_in_sync = False
573 self._mib_data_sync = 0
574 self._device._modified = now
575 self._database.on_mib_reset(self._device_id)
576
577 except KeyError:
578 pass # NOP
579
580 def on_avc_notification(self, _topic, msg):
581 """
582 Process an Attribute Value Change Notification
583
584 :param _topic: (str) OMCI-RX topic
585 :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
586 """
587 self.log.debug('on-avc-notification', state=self.state)
588
589 if self._omci_cc_subscriptions[RxEvent.AVC_Notification]:
590 try:
591 notification = msg[RX_RESPONSE_KEY]
592
593 if self.state == 'disabled':
594 self.log.error('rx-in-invalid-state', state=self.state)
595
596 # Inspect the notification
597 omci_msg = notification.fields['omci_message'].fields
598 class_id = omci_msg['entity_class']
599 instance_id = omci_msg['entity_id']
600 data = omci_msg['data']
601 attributes = [data.keys()]
602
603 # Look up ME Instance in Database. Not-found can occur if a MIB
604 # reset has occurred
605 info = self._database.query(self.device_id, class_id, instance_id, attributes)
606 # TODO: Add old/new info to log message
607 self.log.debug('avc-change', class_id=class_id, instance_id=instance_id)
608
609 # Save the changed data to the MIB.
610 self._database.set(self.device_id, class_id, instance_id, data)
611
612 # Autonomous creation and deletion of managed entities do not
613 # result in an increment of the MIB data sync value. However,
614 # AVC's in response to a change by the Operator do incur an
615 # increment of the MIB Data Sync. If here during uploading,
616 # we issued a MIB-Reset which may generate AVC. (TODO: Focus testing during hardening)
617 if self.state == 'uploading':
618 self.increment_mib_data_sync()
619
620 except KeyError:
621 pass # NOP
622
623 def on_mib_upload_response(self, _topic, msg):
624 """
625 Process a MIB Upload response
626
627 :param _topic: (str) OMCI-RX topic
628 :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
629 """
630 self.log.debug('on-mib-upload-next-response', state=self.state)
631
632 if self._omci_cc_subscriptions[RxEvent.MIB_Upload]:
633 # Check if expected in current mib_sync state
634 if self.state == 'resynchronizing':
635 # The resync task handles this
636 # TODO: Remove this subscription if we never do anything with the response
637 return
638
639 if self.state != 'uploading':
640 self.log.error('rx-in-invalid-state', state=self.state)
641
642 def on_mib_upload_next_response(self, _topic, msg):
643 """
644 Process a MIB Upload Next response
645
646 :param _topic: (str) OMCI-RX topic
647 :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
648 """
649 self.log.debug('on-mib-upload-next-response', state=self.state)
650
651 if self._omci_cc_subscriptions[RxEvent.MIB_Upload_Next]:
652 try:
653 if self.state == 'resynchronizing':
654 # The resync task handles this
655 return
656
657 # Check if expected in current mib_sync state
658 if self.state != 'uploading':
659 self.log.error('rx-in-invalid-state', state=self.state)
660
661 else:
662 response = msg[RX_RESPONSE_KEY]
663
664 # Extract entity instance information
665 omci_msg = response.fields['omci_message'].fields
666
667 class_id = omci_msg['object_entity_class']
668 entity_id = omci_msg['object_entity_id']
669
670 # Filter out the 'mib_data_sync' from the database. We save that at
671 # the device level and do not want it showing up during a re-sync
672 # during data compares
673
674 if class_id == OntData.class_id:
675 return
676
677 attributes = {k: v for k, v in omci_msg['object_data'].items()}
678
679 # Save to the database
680 self._database.set(self._device_id, class_id, entity_id, attributes)
681
682 except KeyError:
683 pass # NOP
684 except Exception as e:
685 self.log.exception('upload-next', e=e)
686
687 def on_create_response(self, _topic, msg):
688 """
689 Process a Set response
690
691 :param _topic: (str) OMCI-RX topic
692 :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
693 """
694 self.log.debug('on-create-response', state=self.state)
695
696 if self._omci_cc_subscriptions[RxEvent.Create]:
697 if self.state in ['disabled', 'uploading']:
698 self.log.error('rx-in-invalid-state', state=self.state)
699 return
700 try:
701 request = msg[TX_REQUEST_KEY]
702 response = msg[RX_RESPONSE_KEY]
703 status = response.fields['omci_message'].fields['success_code']
704
705 if status != RC.Success and status != RC.InstanceExists:
706 # TODO: Support offline ONTs in post VOLTHA v1.3.0
707 omci_msg = response.fields['omci_message']
708 self.log.warn('set-response-failure',
709 class_id=omci_msg.fields['entity_class'],
710 instance_id=omci_msg.fields['entity_id'],
711 status=omci_msg.fields['success_code'],
712 status_text=self._status_to_text(omci_msg.fields['success_code']),
713 parameter_error_attributes_mask=omci_msg.fields['parameter_error_attributes_mask'])
714 else:
715 omci_msg = request.fields['omci_message'].fields
716 class_id = omci_msg['entity_class']
717 entity_id = omci_msg['entity_id']
718 attributes = {k: v for k, v in omci_msg['data'].items()}
719
720 # Save to the database
721 created = self._database.set(self._device_id, class_id, entity_id, attributes)
722
723 if created:
724 self.increment_mib_data_sync()
725
726 # If the ME contains set-by-create or writeable values that were
727 # not specified in the create command, the ONU will have
728 # initialized those fields
729
730 if class_id in self._device.me_map:
731 sbc_w_set = {attr.field.name for attr in self._device.me_map[class_id].attributes
732 if (AA.SBC in attr.access or AA.W in attr.access)
733 and attr.field.name != 'managed_entity_id'}
734
735 missing = sbc_w_set - {k for k in attributes.iterkeys()}
736
737 if len(missing):
738 # Request the missing attributes
739 self.update_sbc_w_items(class_id, entity_id, missing)
740
741 except KeyError as e:
742 pass # NOP
743
744 except Exception as e:
745 self.log.exception('create', e=e)
746
747 def update_sbc_w_items(self, class_id, entity_id, missing_attributes):
748 """
749 Perform a get-request for Set-By-Create (SBC) or writable (w) attributes
750 that were not specified in the original Create request.
751
752 :param class_id: (int) Class ID
753 :param entity_id: (int) Instance ID
754 :param missing_attributes: (set) Missing SBC or Writable attribute
755 """
756 if len(missing_attributes) and class_id in self._device.me_map:
757 from voltha.extensions.omci.tasks.omci_get_request import OmciGetRequest
758
759 self.log.info('update-sbc-items', class_id=class_id, entity_id=entity_id,
760 attributes=missing_attributes)
761
762 def success(results):
763 self._database.set(self._device_id, class_id, entity_id, results.attributes)
764
765 def failure(reason):
766 self.log.warn('update-sbc-w-failed', reason=reason, class_id=class_id,
767 entity_id=entity_id, attributes=missing_attributes)
768
769 d = self._device.task_runner.queue_task(OmciGetRequest(self._agent, self._device_id,
770 self._device.me_map[class_id],
771 entity_id, missing_attributes,
772 allow_failure=True))
773 d.addCallbacks(success, failure)
774
775 def on_delete_response(self, _topic, msg):
776 """
777 Process a Delete response
778
779 :param _topic: (str) OMCI-RX topic
780 :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
781 """
782 self.log.debug('on-delete-response', state=self.state)
783
784 if self._omci_cc_subscriptions[RxEvent.Delete]:
785 if self.state in ['disabled', 'uploading']:
786 self.log.error('rx-in-invalid-state', state=self.state)
787 return
788 try:
789 request = msg[TX_REQUEST_KEY]
790 response = msg[RX_RESPONSE_KEY]
791
792 if response.fields['omci_message'].fields['success_code'] != RC.Success:
793 # TODO: Support offline ONTs in post VOLTHA v1.3.0
794 omci_msg = response.fields['omci_message']
795 self.log.warn('set-response-failure',
796 class_id=omci_msg.fields['entity_class'],
797 instance_id=omci_msg.fields['entity_id'],
798 status=omci_msg.fields['success_code'],
799 status_text=self._status_to_text(omci_msg.fields['success_code']))
800 else:
801 omci_msg = request.fields['omci_message'].fields
802 class_id = omci_msg['entity_class']
803 entity_id = omci_msg['entity_id']
804
805 # Remove from the database
806 deleted = self._database.delete(self._device_id, class_id, entity_id)
807
808 if deleted:
809 self.increment_mib_data_sync()
810
811 except KeyError as e:
812 pass # NOP
813 except Exception as e:
814 self.log.exception('delete', e=e)
815
816 def on_set_response(self, _topic, msg):
817 """
818 Process a Set response
819
820 :param _topic: (str) OMCI-RX topic
821 :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
822 """
823 self.log.debug('on-set-response', state=self.state)
824
825 if self._omci_cc_subscriptions[RxEvent.Set]:
826 if self.state in ['disabled', 'uploading']:
827 self.log.error('rx-in-invalid-state', state=self.state)
828 try:
829 request = msg[TX_REQUEST_KEY]
830 response = msg[RX_RESPONSE_KEY]
831
832 if response.fields['omci_message'].fields['success_code'] != RC.Success:
833 # TODO: Support offline ONTs in post VOLTHA v1.3.0
834 omci_msg = response.fields['omci_message']
835 self.log.warn('set-response-failure',
836 class_id=omci_msg.fields['entity_class'],
837 instance_id=omci_msg.fields['entity_id'],
838 status=omci_msg.fields['success_code'],
839 status_text=self._status_to_text(omci_msg.fields['success_code']),
840 unsupported_attribute_mask=omci_msg.fields['unsupported_attributes_mask'],
841 failed_attribute_mask=omci_msg.fields['failed_attributes_mask'])
842 else:
843 omci_msg = request.fields['omci_message'].fields
844 class_id = omci_msg['entity_class']
845 entity_id = omci_msg['entity_id']
846 attributes = {k: v for k, v in omci_msg['data'].items()}
847
848 # Save to the database (Do not save 'sets' of the mib-data-sync however)
849 if class_id != OntData.class_id:
850 modified = self._database.set(self._device_id, class_id, entity_id, attributes)
851 if modified:
852 self.increment_mib_data_sync()
853
854 except KeyError as _e:
855 pass # NOP
856 except Exception as e:
857 self.log.exception('set', e=e)
858
859 # TODO: Future -> Monitor Software download start, section, activate, and commit responses
860 # and increment MIB Data Sync per Table 11.2.2-1 of ITUT-T G.988 (11/2017)
861 # on page 515. Eventually also monitor set-table responses once the
862 # extended message set is supported.
863 def on_capabilities_event(self, _topic, msg):
864 """
865 Process a OMCI capabilties event
866 :param _topic: (str) OnuDeviceEntry Capabilities event
867 :param msg: (dict) Message Entities & Message Types supported
868 """
869 self._database.update_supported_managed_entities(self.device_id,
870 msg[SUPPORTED_MESSAGE_ENTITY_KEY])
871 self._database.update_supported_message_types(self.device_id,
872 msg[SUPPORTED_MESSAGE_TYPES_KEY])
873
874 def _status_to_text(self, success_code):
875 return {
876 RC.Success: "Success",
877 RC.ProcessingError: "Processing Error",
878 RC.NotSupported: "Not Supported",
879 RC.ParameterError: "Paremeter Error",
880 RC.UnknownEntity: "Unknown Entity",
881 RC.UnknownInstance: "Unknown Instance",
882 RC.DeviceBusy: "Device Busy",
883 RC.InstanceExists: "Instance Exists"
884 }.get(success_code, 'Unknown status code: {}'.format(success_code))
885
886 def query_mib(self, class_id=None, instance_id=None, attributes=None):
887 """
888 Get MIB database information.
889
890 This method can be used to request information from the database to the detailed
891 level requested
892
893 :param class_id: (int) Managed Entity class ID
894 :param instance_id: (int) Managed Entity instance
895 :param attributes: (list or str) Managed Entity instance's attributes
896
897 :return: (dict) The value(s) requested. If class/inst/attribute is
898 not found, an empty dictionary is returned
899 :raises DatabaseStateError: If the database is not enabled or does not exist
900 """
901 from voltha.extensions.omci.database.mib_db_api import DatabaseStateError
902
903 self.log.debug('query', class_id=class_id,
904 instance_id=instance_id, attributes=attributes)
905 if self._database is None:
906 raise DatabaseStateError('Database does not yet exist')
907
908 return self._database.query(self._device_id, class_id=class_id,
909 instance_id=instance_id,
910 attributes=attributes)
911
912 def mib_set(self, class_id, entity_id, attributes):
913 """
914 Set attributes of an existing ME Class instance
915
916 This method is primarily used by other state machines to save ME specific
917 information to the persistent database. Access by objects external to the
918 OpenOMCI library is discouraged.
919
920 :param class_id: (int) ME Class ID
921 :param entity_id: (int) ME Class entity ID
922 :param attributes: (dict) attribute -> value pairs to set
923 """
924 # It must exist first (but attributes can be new)
925 if isinstance(attributes, dict) and len(attributes) and\
926 self.query_mib(class_id, entity_id) is not None:
927 self._database.set(self._device_id, class_id, entity_id, attributes)
928
929 def mib_delete(self, class_id, entity_id):
930 """
931 Delete an existing ME Class instance
932
933 This method is primarily used by other state machines to delete an ME
934 from the MIB database
935
936 :param class_id: (int) ME Class ID
937 :param entity_id: (int) ME Class entity ID
938
939 :raises KeyError: If device does not exist
940 :raises DatabaseStateError: If the database is not enabled
941 """
942 self._database.delete(self._device_id, class_id, entity_id)