blob: 2a8b53540e0b3a75adc03464958d2776fafca770 [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17from datetime import datetime, timedelta
18from transitions import Machine
19from twisted.internet import reactor
20from voltha.extensions.omci.omci_frame import OmciFrame
21from voltha.extensions.omci.database.mib_db_api import MDS_KEY
22from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes, \
23 AttributeAccess
24from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
25 RX_RESPONSE_KEY
26from voltha.extensions.omci.onu_device_entry import OnuDeviceEvents, OnuDeviceEntry, \
27 SUPPORTED_MESSAGE_ENTITY_KEY, SUPPORTED_MESSAGE_TYPES_KEY
28from voltha.extensions.omci.omci_entities import OntData
29from common.event_bus import EventBusClient
30from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
31
# Short module-level aliases for the imported OMCI enumerations/classes so the
# handlers below can refer to them tersely (e.g. RxEvent.MIB_Reset, RC.Success).
RxEvent = OmciCCRxEvents
DevEvent = OnuDeviceEvents
OP = EntityOperations
RC = ReasonCodes
AA = AttributeAccess
37
38
class MibSynchronizer(object):
    """
    OpenOMCI MIB Synchronizer state machine

    Drives the tasks supplied in ``mib_sync_tasks`` (upload, get-mds, audit,
    resync, reconcile) through the states listed in DEFAULT_STATES, and
    mirrors OMCI Create/Delete/Set responses and AVC notifications into the
    MIB database while doing so.
    """
    DEFAULT_STATES = ['disabled', 'starting', 'uploading', 'examining_mds',
                      'in_sync', 'out_of_sync', 'auditing', 'resynchronizing']

    DEFAULT_TRANSITIONS = [
        {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},

        {'trigger': 'upload_mib', 'source': 'starting', 'dest': 'uploading'},
        {'trigger': 'examine_mds', 'source': 'starting', 'dest': 'examining_mds'},

        {'trigger': 'success', 'source': 'uploading', 'dest': 'in_sync'},

        {'trigger': 'success', 'source': 'examining_mds', 'dest': 'in_sync'},
        {'trigger': 'mismatch', 'source': 'examining_mds', 'dest': 'resynchronizing'},

        {'trigger': 'audit_mib', 'source': 'in_sync', 'dest': 'auditing'},

        {'trigger': 'success', 'source': 'out_of_sync', 'dest': 'in_sync'},
        {'trigger': 'audit_mib', 'source': 'out_of_sync', 'dest': 'auditing'},

        {'trigger': 'success', 'source': 'auditing', 'dest': 'in_sync'},
        {'trigger': 'mismatch', 'source': 'auditing', 'dest': 'resynchronizing'},
        {'trigger': 'force_resync', 'source': 'auditing', 'dest': 'resynchronizing'},

        {'trigger': 'success', 'source': 'resynchronizing', 'dest': 'in_sync'},
        {'trigger': 'diffs_found', 'source': 'resynchronizing', 'dest': 'out_of_sync'},

        # Do wildcard 'timeout' trigger that sends us back to start
        {'trigger': 'timeout', 'source': '*', 'dest': 'starting'},

        # Do wildcard 'stop' trigger last so it covers all previous states
        {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
    ]
    DEFAULT_TIMEOUT_RETRY = 5      # Seconds to delay after task failure/timeout
    DEFAULT_AUDIT_DELAY = 60       # Periodic tick to audit the MIB Data Sync
    DEFAULT_RESYNC_DELAY = 300     # Periodically force a resync

    def __init__(self, agent, device_id, mib_sync_tasks, db,
                 advertise_events=False,
                 states=DEFAULT_STATES,
                 transitions=DEFAULT_TRANSITIONS,
                 initial_state='disabled',
                 timeout_delay=DEFAULT_TIMEOUT_RETRY,
                 audit_delay=DEFAULT_AUDIT_DELAY,
                 resync_delay=DEFAULT_RESYNC_DELAY):
        """
        Class initialization

        :param agent: (OpenOmciAgent) Agent
        :param device_id: (str) ONU Device ID
        :param mib_sync_tasks: (dict) Tasks to run. Must provide the keys
                               'mib-upload', 'get-mds', 'mib-audit',
                               'mib-resync' and 'mib-reconcile'
        :param db: (MibDbVolatileDict) MIB Database
        :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
        :param states: (list) List of valid states
        :param transitions: (dict) Dictionary of triggers and state changes
        :param initial_state: (str) Initial state machine state
        :param timeout_delay: (int/float) Number of seconds after a timeout to attempt
                                          a retry (goes back to starting state)
        :param audit_delay: (int) Seconds between MIB audits while in sync. Set to
                                  zero to disable audit. An operator can request
                                  an audit manually by calling 'self.audit_mib'
        :param resync_delay: (int) Seconds in sync before performing a forced MIB
                                   resynchronization

        :raises KeyError: If a required task is missing from 'mib_sync_tasks'
        """
        self.log = structlog.get_logger(device_id=device_id)

        self._agent = agent
        self._device_id = device_id
        self._device = None
        self._database = db
        self._timeout_delay = timeout_delay
        self._audit_delay = audit_delay
        self._resync_delay = resync_delay

        self._upload_task = mib_sync_tasks['mib-upload']
        self._get_mds_task = mib_sync_tasks['get-mds']
        self._audit_task = mib_sync_tasks['mib-audit']
        self._resync_task = mib_sync_tasks['mib-resync']
        self._reconcile_task = mib_sync_tasks['mib-reconcile']
        self._advertise_events = advertise_events

        self._deferred = None          # Pending reactor.callLater() timer
        self._current_task = None      # TODO: Support multiple running tasks after v.2.0 release
        self._task_deferred = None     # Deferred returned when the current task was queued
        self._mib_data_sync = 0
        self._last_mib_db_sync_value = None
        self._device_in_db = False
        self._next_resync = None       # UTC time at/after which an audit forces a resync

        # Results of the most recent resync, handed to the reconcile task
        self._on_olt_only_diffs = None
        self._on_onu_only_diffs = None
        self._attr_diffs = None
        self._audited_olt_db = None
        self._audited_onu_db = None

        self._event_bus = EventBusClient()
        self._omci_cc_subscriptions = {               # RxEvent.enum -> Subscription Object
            RxEvent.MIB_Reset: None,
            RxEvent.AVC_Notification: None,
            RxEvent.MIB_Upload: None,
            RxEvent.MIB_Upload_Next: None,
            RxEvent.Create: None,
            RxEvent.Delete: None,
            RxEvent.Set: None,
        }
        self._omci_cc_sub_mapping = {                 # RxEvent.enum -> callback
            RxEvent.MIB_Reset: self.on_mib_reset_response,
            RxEvent.AVC_Notification: self.on_avc_notification,
            RxEvent.MIB_Upload: self.on_mib_upload_response,
            RxEvent.MIB_Upload_Next: self.on_mib_upload_next_response,
            RxEvent.Create: self.on_create_response,
            RxEvent.Delete: self.on_delete_response,
            RxEvent.Set: self.on_set_response,
        }
        self._onu_dev_subscriptions = {               # DevEvent.enum -> Subscription Object
            DevEvent.OmciCapabilitiesEvent: None
        }
        self._onu_dev_sub_mapping = {                 # DevEvent.enum -> callback
            DevEvent.OmciCapabilitiesEvent: self.on_capabilities_event
        }

        # Statistics and attributes
        # TODO: add any others if it will support problem diagnosis

        # Set up state machine to manage states
        self.machine = Machine(model=self, states=states,
                               transitions=transitions,
                               initial=initial_state,
                               queued=True,
                               name='{}-{}'.format(self.__class__.__name__,
                                                   device_id))
        try:
            # Quiet the 'transitions' library logger
            import logging
            logging.getLogger('transitions').setLevel(logging.WARNING)
        except Exception as e:
            self.log.exception('log-level-failed', e=e)

    def _cancel_deferred(self):
        """
        Cancel the pending timer and the outstanding task deferred, if any.

        Both references are cleared before cancelling so that callbacks fired
        by the cancellation never see the stale objects.
        """
        pending = (self._deferred, self._task_deferred)
        self._deferred = None
        self._task_deferred = None

        for d in pending:
            try:
                if d is not None and not d.called:
                    d.cancel()
            except Exception:
                # Best effort: a deferred that can no longer be cancelled
                # needs no further handling here.
                pass

    def __str__(self):
        return 'MIBSynchronizer: Device ID: {}, State:{}'.format(self._device_id, self.state)

    def delete(self):
        """
        Cleanup any state information

        Stops the state machine and removes this device from the MIB database.
        The database reference is dropped, so later database access through
        this object is no longer possible.
        """
        self.stop()
        db, self._database = self._database, None

        if db is not None:
            db.remove(self._device_id)

    @property
    def device_id(self):
        """(str) ONU Device ID this synchronizer manages"""
        return self._device_id

    @property
    def mib_data_sync(self):
        """(int) Our (OLT-side) copy of the MIB Data Sync counter"""
        return self._mib_data_sync

    def increment_mib_data_sync(self):
        """
        Increment the local MIB Data Sync counter (8-bit, wraps after 255)
        and save the new value to the database when one is attached.
        """
        # NOTE(review): the counter wraps to 0 here. Confirm against ITU-T
        # G.988 whether 0 should be skipped on wrap-around.
        self._mib_data_sync += 1
        if self._mib_data_sync > 255:
            self._mib_data_sync = 0

        if self._database is not None:
            self._database.save_mib_data_sync(self._device_id,
                                              self._mib_data_sync)

    @property
    def last_mib_db_sync(self):
        """(datetime) Last time 'in_sync' was entered, None if never"""
        return self._last_mib_db_sync_value

    @last_mib_db_sync.setter
    def last_mib_db_sync(self, value):
        self._last_mib_db_sync_value = value
        if self._database is not None:
            self._database.save_last_sync(self.device_id, value)

    @property
    def is_new_onu(self):
        """
        Is this a new ONU (has never completed MIB synchronization)
        :return: (bool) True if this ONU should be considered new
        """
        return self.last_mib_db_sync is None

    @property
    def advertise_events(self):
        """(bool) True if events are advertised on the OpenOMCI event bus"""
        return self._advertise_events

    @advertise_events.setter
    def advertise_events(self, value):
        if not isinstance(value, bool):
            raise TypeError('Advertise event is a boolean')
        self._advertise_events = value

    def advertise(self, event, info):
        """
        Advertise an event on the OpenOMCI event bus

        Does nothing unless 'advertise_events' is enabled.

        :param event: Event type passed through to the agent
        :param info: Event specific information (the state name for state changes)
        """
        if self._advertise_events:
            self._agent.advertise(event,
                                  {
                                      'state-machine': self.machine.name,
                                      'info': info,
                                      'time': str(datetime.utcnow())
                                  })

    def on_enter_disabled(self):
        """
        State machine is being stopped
        """
        self.advertise(OpenOmciEventType.state_change, self.state)

        # Detach the running task first. Cancelling its deferred below fires
        # the task's failure handler, which clears self._current_task and
        # would otherwise hide the task from the stop() call.
        task, self._current_task = self._current_task, None

        self._cancel_deferred()
        if self._device is not None:
            self._device.mib_db_in_sync = False

        if task is not None:
            task.stop()

        # The failure handlers in this class re-arm a retry timer that
        # triggers 'timeout' (wildcard -> 'starting'). Any timer armed while
        # the task was being cancelled/stopped must not restart a machine
        # that was just disabled, so sweep once more.
        self._cancel_deferred()

        # Drop Response and Autonomous notification subscriptions
        for event, sub in self._omci_cc_subscriptions.items():
            if sub is not None:
                self._omci_cc_subscriptions[event] = None
                self._device.omci_cc.event_bus.unsubscribe(sub)

        for event, sub in self._onu_dev_subscriptions.items():
            if sub is not None:
                self._onu_dev_subscriptions[event] = None
                self._device.event_bus.unsubscribe(sub)

        # TODO: Stop and remove any currently running or scheduled tasks
        # TODO: Anything else?

    def _seed_database(self):
        """
        Make sure this device has an entry in the MIB database.

        If adding the device raises KeyError it is treated as already present
        and the saved MIB Data Sync and last-sync values are loaded instead.
        Any other failure is logged and the seeding is retried on the next call.
        """
        if not self._device_in_db:
            try:
                try:
                    self._database.start()
                    self._database.add(self._device_id)
                    self.log.debug('seed-db-does-not-exist', device_id=self._device_id)

                except KeyError:
                    # Device already is in database
                    self.log.debug('seed-db-exist', device_id=self._device_id)
                    self._mib_data_sync = self._database.get_mib_data_sync(self._device_id)
                    self._last_mib_db_sync_value = self._database.get_last_sync(self._device_id)

                self._device_in_db = True

            except Exception as e:
                self.log.exception('seed-database-failure', e=e)

    def on_enter_starting(self):
        """
        Determine ONU status and start/re-start MIB Synchronization tasks
        """
        self._device = self._agent.get_device(self._device_id)
        self.advertise(OpenOmciEventType.state_change, self.state)

        # Make sure root of external MIB Database exists
        self._seed_database()

        # Set up Response and Autonomous notification subscriptions
        try:
            for event, sub in self._omci_cc_sub_mapping.items():
                if self._omci_cc_subscriptions[event] is None:
                    self._omci_cc_subscriptions[event] = \
                        self._device.omci_cc.event_bus.subscribe(
                            topic=OMCI_CC.event_bus_topic(self._device_id, event),
                            callback=sub)

        except Exception as e:
            self.log.exception('omci-cc-subscription-setup', e=e)

        # Set up ONU device subscriptions
        try:
            for event, sub in self._onu_dev_sub_mapping.items():
                if self._onu_dev_subscriptions[event] is None:
                    self._onu_dev_subscriptions[event] = \
                        self._device.event_bus.subscribe(
                            topic=OnuDeviceEntry.event_bus_topic(self._device_id, event),
                            callback=sub)

        except Exception as e:
            self.log.exception('dev-subscription-setup', e=e)

        # Clear any previous audit results
        self._on_olt_only_diffs = None
        self._on_onu_only_diffs = None
        self._attr_diffs = None
        self._audited_olt_db = None
        self._audited_onu_db = None

        # Determine if this ONU has ever synchronized
        if self.is_new_onu:
            # Start full MIB upload
            self._deferred = reactor.callLater(0, self.upload_mib)

        else:
            # Examine the MIB Data Sync
            self._deferred = reactor.callLater(0, self.examine_mds)

    def on_enter_uploading(self):
        """
        Begin full MIB data upload, starting with a MIB RESET
        """
        self.advertise(OpenOmciEventType.state_change, self.state)

        def success(results):
            self.log.debug('mib-upload-success', results=results)
            self._current_task = None
            self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
            self._deferred = reactor.callLater(0, self.success)

        def failure(reason):
            self.log.info('mib-upload-failure', reason=reason)
            self._current_task = None
            self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

        self._device.mib_db_in_sync = False
        self._current_task = self._upload_task(self._agent, self._device_id)

        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_examining_mds(self):
        """
        Create a simple task to fetch the MIB Data Sync value and
        determine if the ONU value matches what is in the MIB database
        """
        self.advertise(OpenOmciEventType.state_change, self.state)

        self._mib_data_sync = self._database.get_mib_data_sync(self._device_id) or 0

        def success(onu_mds_value):
            self.log.debug('examine-mds-success', onu_mds_value=onu_mds_value,
                           olt_mds_value=self.mib_data_sync)
            self._current_task = None

            # Examine MDS value
            if self.mib_data_sync == onu_mds_value:
                self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
                self._deferred = reactor.callLater(0, self.success)
            else:
                self._deferred = reactor.callLater(0, self.mismatch)

        def failure(reason):
            self.log.info('examine-mds-failure', reason=reason)
            self._current_task = None
            self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

        self._device.mib_db_in_sync = False
        self._current_task = self._get_mds_task(self._agent, self._device_id)

        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_in_sync(self):
        """
        The OLT/OpenOMCI MIB Database is in sync with the ONU MIB Database.
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self.last_mib_db_sync = datetime.utcnow()
        self._device.mib_db_in_sync = True

        # An audit delay of zero (or less) disables the periodic audit
        if self._audit_delay > 0:
            self._deferred = reactor.callLater(self._audit_delay, self.audit_mib)

    def on_enter_out_of_sync(self):
        """
        The MIB in OpenOMCI and the ONU are out of sync. This can happen if:

           o the MIB_Data_Sync values are not equal, or
           o the MIBs were compared and differences were found.

        Schedule a task to reconcile the differences
        """
        self.advertise(OpenOmciEventType.state_change, self.state)

        # We are only out-of-sync if there were differences. If here due to MDS
        # value differences, still run the reconcile so we update the ONU's MDS
        # value to match ours.

        self._device.mib_db_in_sync = self._attr_diffs is None and \
            self._on_onu_only_diffs is None and \
            self._on_olt_only_diffs is None

        def success(onu_mds_value):
            self.log.debug('reconcile-success', mds_value=onu_mds_value)
            self._current_task = None
            self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
            self._deferred = reactor.callLater(0, self.success)

        def failure(reason):
            self.log.info('reconcile-failure', reason=reason)
            self._current_task = None
            self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

        diff_collection = {
            'onu-only': self._on_onu_only_diffs,
            'olt-only': self._on_olt_only_diffs,
            'attributes': self._attr_diffs,
            'olt-db': self._audited_olt_db,
            'onu-db': self._audited_onu_db
        }
        # Clear out results since reconciliation task will be handling them
        self._on_olt_only_diffs = None
        self._on_onu_only_diffs = None
        self._attr_diffs = None
        self._audited_olt_db = None
        self._audited_onu_db = None

        self._current_task = self._reconcile_task(self._agent, self._device_id, diff_collection)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_auditing(self):
        """
        Perform a MIB Audit.  If our last MIB resync was too long in the
        past, perform a resynchronization anyway
        """
        self.advertise(OpenOmciEventType.state_change, self.state)

        if self._next_resync is None:
            self.log.error('next-forced-resync-error',
                           msg='Next Resync should always be valid at this point')
            self._deferred = reactor.callLater(self._timeout_delay, self.timeout)
            # Nothing to compare against; the scheduled timeout restarts us.
            return

        if datetime.utcnow() >= self._next_resync:
            self._deferred = reactor.callLater(0, self.force_resync)
            return

        def success(onu_mds_value):
            self.log.debug('audit-success', onu_mds_value=onu_mds_value,
                           olt_mds_value=self.mib_data_sync)
            self._current_task = None

            # Examine MDS value
            if self.mib_data_sync == onu_mds_value:
                self._deferred = reactor.callLater(0, self.success)
            else:
                self._device.mib_db_in_sync = False
                self._deferred = reactor.callLater(0, self.mismatch)

        def failure(reason):
            self.log.info('audit-failure', reason=reason)
            self._current_task = None
            self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

        self._current_task = self._audit_task(self._agent, self._device_id)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_resynchronizing(self):
        """
        Perform a resynchronization of the MIB database

        First calculate any differences
        """
        self.advertise(OpenOmciEventType.state_change, self.state)

        def success(results):
            self.log.debug('resync-success', results=results)

            self._current_task = None

            # Normalize empty difference collections to None
            self._on_olt_only_diffs = results.get('on-olt-only') or None
            self._on_onu_only_diffs = results.get('on-onu-only') or None
            self._attr_diffs = results.get('attr-diffs') or None
            self._audited_olt_db = results.get('olt-db')
            self._audited_onu_db = results.get('onu-db')

            mds_equal = self.mib_data_sync == self._audited_onu_db[MDS_KEY]

            if mds_equal and all(diff is None for diff in [self._on_olt_only_diffs,
                                                           self._on_onu_only_diffs,
                                                           self._attr_diffs]):
                self._next_resync = datetime.utcnow() + timedelta(seconds=self._resync_delay)
                self._deferred = reactor.callLater(0, self.success)
            else:
                self._deferred = reactor.callLater(0, self.diffs_found)

        def failure(reason):
            self.log.info('resync-failure', reason=reason)
            self._current_task = None
            self._deferred = reactor.callLater(self._timeout_delay, self.timeout)

        self._current_task = self._resync_task(self._agent, self._device_id)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_mib_reset_response(self, _topic, msg):
        """
        Called upon receipt of a MIB Reset Response for this ONU

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)

        :raises TypeError: If the response is not an OmciFrame
        :raises AssertionError: If the ONU did not report success
        """
        self.log.debug('on-mib-reset-response', state=self.state)
        try:
            response = msg[RX_RESPONSE_KEY]

            # Check if expected in current mib_sync state
            if self.state != 'uploading' or self._omci_cc_subscriptions[RxEvent.MIB_Reset] is None:
                self.log.error('rx-in-invalid-state', state=self.state)

            else:
                now = datetime.utcnow()

                if not isinstance(response, OmciFrame):
                    raise TypeError('Response should be an OmciFrame')

                omci_msg = response.fields['omci_message'].fields
                status = omci_msg['success_code']

                # Raised explicitly (rather than via 'assert') so the check
                # is still enforced when Python runs with -O.
                if status != RC.Success:
                    raise AssertionError('Unexpected MIB reset response status: {}'.
                                         format(status))

                self._device.mib_db_in_sync = False
                self._mib_data_sync = 0
                self._device._modified = now
                self._database.on_mib_reset(self._device_id)

        except KeyError:
            pass            # NOP

    def on_avc_notification(self, _topic, msg):
        """
        Process an Attribute Value Change Notification

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-avc-notification', state=self.state)

        if self._omci_cc_subscriptions[RxEvent.AVC_Notification]:
            try:
                notification = msg[RX_RESPONSE_KEY]

                if self.state == 'disabled':
                    self.log.error('rx-in-invalid-state', state=self.state)

                # Inspect the notification
                omci_msg = notification.fields['omci_message'].fields
                class_id = omci_msg['entity_class']
                instance_id = omci_msg['entity_id']
                data = omci_msg['data']
                attributes = list(data.keys())

                # Look up ME Instance in Database. Not-found can occur if a MIB
                # reset has occurred. The result is not used, but a KeyError
                # raised here skips the database update below.
                self._database.query(self.device_id, class_id, instance_id, attributes)
                # TODO: Add old/new info to log message
                self.log.debug('avc-change', class_id=class_id, instance_id=instance_id)

                # Save the changed data to the MIB.
                self._database.set(self.device_id, class_id, instance_id, data)

                # Autonomous creation and deletion of managed entities do not
                # result in an increment of the MIB data sync value. However,
                # AVC's in response to a change by the Operator do incur an
                # increment of the MIB Data Sync.  If here during uploading,
                # we issued a MIB-Reset which may generate AVC.  (TODO: Focus testing during hardening)
                if self.state == 'uploading':
                    self.increment_mib_data_sync()

            except KeyError:
                pass            # NOP

    def on_mib_upload_response(self, _topic, msg):
        """
        Process a MIB Upload response

        Only checks that the response arrived in an expected state; the
        message content is not used here.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-mib-upload-response', state=self.state)

        if self._omci_cc_subscriptions[RxEvent.MIB_Upload]:
            # Check if expected in current mib_sync state
            if self.state == 'resynchronizing':
                # The resync task handles this
                # TODO: Remove this subscription if we never do anything with the response
                return

            if self.state != 'uploading':
                self.log.error('rx-in-invalid-state', state=self.state)

    def on_mib_upload_next_response(self, _topic, msg):
        """
        Process a MIB Upload Next response

        While uploading, saves the reported ME instance to the database.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-mib-upload-next-response', state=self.state)

        if self._omci_cc_subscriptions[RxEvent.MIB_Upload_Next]:
            try:
                if self.state == 'resynchronizing':
                    # The resync task handles this
                    return

                # Check if expected in current mib_sync state
                if self.state != 'uploading':
                    self.log.error('rx-in-invalid-state', state=self.state)

                else:
                    response = msg[RX_RESPONSE_KEY]

                    # Extract entity instance information
                    omci_msg = response.fields['omci_message'].fields

                    class_id = omci_msg['object_entity_class']
                    entity_id = omci_msg['object_entity_id']

                    # Filter out the 'mib_data_sync' from the database. We save that at
                    # the device level and do not want it showing up during a re-sync
                    # during data compares

                    if class_id == OntData.class_id:
                        return

                    attributes = dict(omci_msg['object_data'].items())

                    # Save to the database
                    self._database.set(self._device_id, class_id, entity_id, attributes)

            except KeyError:
                pass            # NOP
            except Exception as e:
                self.log.exception('upload-next', e=e)

    def on_create_response(self, _topic, msg):
        """
        Process a Create response

        On success (or 'instance exists'), saves the created ME to the
        database and requests any set-by-create/writeable attributes that the
        create request did not specify.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-create-response', state=self.state)

        if self._omci_cc_subscriptions[RxEvent.Create]:
            if self.state in ['disabled', 'uploading']:
                self.log.error('rx-in-invalid-state', state=self.state)
                return
            try:
                request = msg[TX_REQUEST_KEY]
                response = msg[RX_RESPONSE_KEY]
                status = response.fields['omci_message'].fields['success_code']

                if status != RC.Success and status != RC.InstanceExists:
                    # TODO: Support offline ONTs in post VOLTHA v1.3.0
                    omci_msg = response.fields['omci_message']
                    self.log.warn('create-response-failure',
                                  class_id=omci_msg.fields['entity_class'],
                                  instance_id=omci_msg.fields['entity_id'],
                                  status=omci_msg.fields['success_code'],
                                  status_text=self._status_to_text(omci_msg.fields['success_code']),
                                  parameter_error_attributes_mask=omci_msg.fields['parameter_error_attributes_mask'])
                else:
                    omci_msg = request.fields['omci_message'].fields
                    class_id = omci_msg['entity_class']
                    entity_id = omci_msg['entity_id']
                    attributes = dict(omci_msg['data'].items())

                    # Save to the database
                    created = self._database.set(self._device_id, class_id, entity_id, attributes)

                    if created:
                        self.increment_mib_data_sync()

                    # If the ME contains set-by-create or writeable values that were
                    # not specified in the create command, the ONU will have
                    # initialized those fields

                    if class_id in self._device.me_map:
                        sbc_w_set = {attr.field.name for attr in self._device.me_map[class_id].attributes
                                     if (AA.SBC in attr.access or AA.W in attr.access)
                                     and attr.field.name != 'managed_entity_id'}

                        missing = sbc_w_set - set(attributes)

                        if missing:
                            # Request the missing attributes
                            self.update_sbc_w_items(class_id, entity_id, missing)

            except KeyError:
                pass            # NOP

            except Exception as e:
                self.log.exception('create', e=e)

    def update_sbc_w_items(self, class_id, entity_id, missing_attributes):
        """
        Perform a get-request for Set-By-Create (SBC) or writable (w) attributes
        that were not specified in the original Create request.

        :param class_id: (int) Class ID
        :param entity_id: (int) Instance ID
        :param missing_attributes: (set) Missing SBC or Writable attribute
        """
        if missing_attributes and class_id in self._device.me_map:
            from voltha.extensions.omci.tasks.omci_get_request import OmciGetRequest

            def success(results):
                self._database.set(self._device_id, class_id, entity_id, results.attributes)

            def failure(reason):
                self.log.warn('update-sbc-w-failed', reason=reason, class_id=class_id,
                              entity_id=entity_id, attributes=missing_attributes)

            d = self._device.task_runner.queue_task(OmciGetRequest(self._agent, self._device_id,
                                                                   self._device.me_map[class_id],
                                                                   entity_id, missing_attributes,
                                                                   allow_failure=True))
            d.addCallbacks(success, failure)

    def on_delete_response(self, _topic, msg):
        """
        Process a Delete response

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-delete-response', state=self.state)

        if self._omci_cc_subscriptions[RxEvent.Delete]:
            if self.state in ['disabled', 'uploading']:
                self.log.error('rx-in-invalid-state', state=self.state)
                return
            try:
                request = msg[TX_REQUEST_KEY]
                response = msg[RX_RESPONSE_KEY]

                if response.fields['omci_message'].fields['success_code'] != RC.Success:
                    # TODO: Support offline ONTs in post VOLTHA v1.3.0
                    omci_msg = response.fields['omci_message']
                    self.log.warn('delete-response-failure',
                                  class_id=omci_msg.fields['entity_class'],
                                  instance_id=omci_msg.fields['entity_id'],
                                  status=omci_msg.fields['success_code'],
                                  status_text=self._status_to_text(omci_msg.fields['success_code']))
                else:
                    omci_msg = request.fields['omci_message'].fields
                    class_id = omci_msg['entity_class']
                    entity_id = omci_msg['entity_id']

                    # Remove from the database
                    deleted = self._database.delete(self._device_id, class_id, entity_id)

                    if deleted:
                        self.increment_mib_data_sync()

            except KeyError:
                pass            # NOP
            except Exception as e:
                self.log.exception('delete', e=e)

    def on_set_response(self, _topic, msg):
        """
        Process a Set response

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-set-response', state=self.state)

        if self._omci_cc_subscriptions[RxEvent.Set]:
            if self.state in ['disabled', 'uploading']:
                self.log.error('rx-in-invalid-state', state=self.state)
                # Same handling as the Create/Delete responses: ignore it
                return
            try:
                request = msg[TX_REQUEST_KEY]
                response = msg[RX_RESPONSE_KEY]

                if response.fields['omci_message'].fields['success_code'] != RC.Success:
                    # TODO: Support offline ONTs in post VOLTHA v1.3.0
                    omci_msg = response.fields['omci_message']
                    self.log.warn('set-response-failure',
                                  class_id=omci_msg.fields['entity_class'],
                                  instance_id=omci_msg.fields['entity_id'],
                                  status=omci_msg.fields['success_code'],
                                  status_text=self._status_to_text(omci_msg.fields['success_code']),
                                  unsupported_attribute_mask=omci_msg.fields['unsupported_attributes_mask'],
                                  failed_attribute_mask=omci_msg.fields['failed_attributes_mask'])
                else:
                    omci_msg = request.fields['omci_message'].fields
                    class_id = omci_msg['entity_class']
                    entity_id = omci_msg['entity_id']
                    attributes = dict(omci_msg['data'].items())

                    # Save to the database (Do not save 'sets' of the mib-data-sync however)
                    if class_id != OntData.class_id:
                        modified = self._database.set(self._device_id, class_id, entity_id, attributes)
                        if modified:
                            self.increment_mib_data_sync()

            except KeyError:
                pass            # NOP
            except Exception as e:
                self.log.exception('set', e=e)

    def on_capabilities_event(self, _topic, msg):
        """
        Process an OMCI capabilities event

        :param _topic: (str) OnuDeviceEntry Capabilities event
        :param msg: (dict) Message Entities & Message Types supported
        """
        self._database.update_supported_managed_entities(self.device_id,
                                                         msg[SUPPORTED_MESSAGE_ENTITY_KEY])
        self._database.update_supported_message_types(self.device_id,
                                                      msg[SUPPORTED_MESSAGE_TYPES_KEY])

    def _status_to_text(self, success_code):
        """
        Convert an OMCI result/reason code into readable text for logging

        :param success_code: OMCI response 'success_code' value
        :return: (str) Description, or 'Unknown status code: <code>'
        """
        return {
                RC.Success: "Success",
                RC.ProcessingError: "Processing Error",
                RC.NotSupported: "Not Supported",
                RC.ParameterError: "Parameter Error",
                RC.UnknownEntity: "Unknown Entity",
                RC.UnknownInstance: "Unknown Instance",
                RC.DeviceBusy: "Device Busy",
                RC.InstanceExists: "Instance Exists"
            }.get(success_code, 'Unknown status code: {}'.format(success_code))

    def query_mib(self, class_id=None, instance_id=None, attributes=None):
        """
        Get MIB database information.

        This method can be used to request information from the database to the detailed
        level requested

        :param class_id:  (int) Managed Entity class ID
        :param instance_id: (int) Managed Entity instance
        :param attributes: (list or str) Managed Entity instance's attributes

        :return: (dict) The value(s) requested. If class/inst/attribute is
                        not found, an empty dictionary is returned
        :raises DatabaseStateError: If the database is not enabled or does not exist
        """
        from voltha.extensions.omci.database.mib_db_api import DatabaseStateError

        self.log.debug('query', class_id=class_id,
                       instance_id=instance_id, attributes=attributes)
        if self._database is None:
            raise DatabaseStateError('Database does not yet exist')

        return self._database.query(self._device_id, class_id=class_id,
                                    instance_id=instance_id,
                                    attributes=attributes)

    def mib_set(self, class_id, entity_id, attributes):
        """
        Set attributes of an existing ME Class instance

        This method is primarily used by other state machines to save ME specific
        information to the persistent database. Access by objects external to the
        OpenOMCI library is discouraged.

        :param class_id: (int) ME Class ID
        :param entity_id: (int) ME Class entity ID
        :param attributes: (dict) attribute -> value pairs to set
        """
        # It must exist first (but attributes can be new)
        # NOTE(review): query_mib() documents an empty dict (not None) for a
        # missing instance, so this 'is not None' test may never reject
        # anything. Confirm the intended behavior before tightening it.
        if isinstance(attributes, dict) and len(attributes) and\
                self.query_mib(class_id, entity_id) is not None:
            self._database.set(self._device_id, class_id, entity_id, attributes)

    def mib_delete(self, class_id, entity_id):
        """
        Delete an existing ME Class instance

        This method is primarily used by other state machines to delete an ME
        from the MIB database

        :param class_id: (int) ME Class ID
        :param entity_id: (int) ME Class entity ID

        :raises KeyError: If device does not exist
        :raises DatabaseStateError: If the database is not enabled
        """
        self._database.delete(self._device_id, class_id, entity_id)