blob: 78cfa743c2af9c6e9b93e2bffdc416bd99bb3870 [file] [log] [blame]
Chip Boling32aab302019-01-23 10:50:18 -06001#
2# Copyright 2018 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import structlog
17import arrow
18from transitions import Machine
19from datetime import datetime, timedelta
20from random import uniform, shuffle
21from twisted.internet import reactor
22from common.utils.indexpool import IndexPool
23from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
24from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
25from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
26 RX_RESPONSE_KEY
27from voltha.extensions.omci.database.mib_db_api import ATTRIBUTES_KEY
28from voltha.extensions.omci.tasks.omci_get_request import OmciGetRequest
29from voltha.extensions.omci.omci_entities import MacBridgePortConfigurationData
30from voltha.extensions.omci.omci_entities import EthernetPMMonitoringHistoryData, \
31 FecPerformanceMonitoringHistoryData, \
32 XgPonTcPerformanceMonitoringHistoryData, \
33 XgPonDownstreamPerformanceMonitoringHistoryData, \
34 XgPonUpstreamPerformanceMonitoringHistoryData, \
35 EthernetFrameUpstreamPerformanceMonitoringHistoryData, \
36 EthernetFrameDownstreamPerformanceMonitoringHistoryData, \
37 EthernetFrameExtendedPerformanceMonitoring, \
38 EthernetFrameExtendedPerformanceMonitoring64Bit, AniG
39
40
# Short module-local aliases for the OMCI enumerations imported above
RxEvent = OmciCCRxEvents    # OMCI_CC receive-event identifiers
OP = EntityOperations       # Managed Entity operations
RC = ReasonCodes            # OMCI response status/reason codes
44
45
class PerformanceIntervals(object):
    """
    OpenOMCI ONU Performance Monitoring Intervals State machine

    This state machine focuses on L2 Internet Data Service and Classical
    PM (for the v2.0 release).

    The instance is used as the model of a 'transitions' Machine, so the
    triggers named in DEFAULT_TRANSITIONS (start, stop, tick, success,
    failure, add_me, delete_me, reboot) become methods of this object and
    the on_enter_<state>() methods below run when a state is entered.
    """
    # NOTE: every 'dest' used by DEFAULT_TRANSITIONS has to be listed here.
    #       'delete_pm_me' is required for the 'delete_me' trigger and for
    #       on_enter_delete_pm_me() to be picked up by the state machine.
    DEFAULT_STATES = ['disabled', 'starting', 'synchronize_time', 'idle', 'create_pm_me',
                      'delete_pm_me', 'collect_data', 'threshold_exceeded']

    DEFAULT_TRANSITIONS = [
        {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
        {'trigger': 'tick', 'source': 'starting', 'dest': 'synchronize_time'},

        {'trigger': 'success', 'source': 'synchronize_time', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'synchronize_time', 'dest': 'synchronize_time'},

        {'trigger': 'tick', 'source': 'idle', 'dest': 'collect_data'},
        {'trigger': 'add_me', 'source': 'idle', 'dest': 'create_pm_me'},
        {'trigger': 'delete_me', 'source': 'idle', 'dest': 'delete_pm_me'},

        # TODO: Can these be combined into one?
        {'trigger': 'success', 'source': 'create_pm_me', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'create_pm_me', 'dest': 'idle'},

        # TODO: Can these be combined into one?
        {'trigger': 'success', 'source': 'delete_pm_me', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'delete_pm_me', 'dest': 'idle'},

        # TODO: Can these be combined into one?
        {'trigger': 'success', 'source': 'collect_data', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'collect_data', 'dest': 'idle'},

        # TODO: Add rebooted event transitions to disabled or synchronize_time
        # TODO: Need to capture Threshold Crossing Alarms appropriately

        # Do wildcard 'stop' trigger last so it covers all previous states
        {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
        # NOTE(review): 'rebooted' is not one of DEFAULT_STATES, so firing
        #               'reboot' cannot complete until that state (and what
        #               it should do) is defined -- see the TODO above.
        {'trigger': 'reboot', 'source': '*', 'dest': 'rebooted'},
    ]
    DEFAULT_RETRY = 10               # Seconds to delay after task failure/timeout/poll
    DEFAULT_TICK_DELAY = 15          # Seconds between checks for collection tick
    DEFAULT_INTERVAL_SKEW = 10 * 60  # Seconds to skew past interval boundary
    DEFAULT_COLLECT_ATTEMPTS = 3     # Maximum number of collection fetch attempts
    DEFAULT_CREATE_ATTEMPTS = 15     # Maximum number of attempts to create a PM Managed Entities

    def __init__(self, agent, device_id, tasks,
                 advertise_events=False,
                 states=DEFAULT_STATES,
                 transitions=DEFAULT_TRANSITIONS,
                 initial_state='disabled',
                 timeout_delay=DEFAULT_RETRY,
                 tick_delay=DEFAULT_TICK_DELAY,
                 interval_skew=DEFAULT_INTERVAL_SKEW,
                 collect_attempts=DEFAULT_COLLECT_ATTEMPTS,
                 create_attempts=DEFAULT_CREATE_ATTEMPTS):
        """
        Class initialization

        :param agent: (OpenOmciAgent) Agent
        :param device_id: (str) ONU Device ID
        :param tasks: (dict) Tasks to run. Must provide the keys 'sync-time',
                      'collect-data', 'create-pm' and 'delete-pm'
        :param advertise_events: (bool) Advertise events on OpenOMCI Event Bus
        :param states: (list) List of valid states
        :param transitions: (list) List of dictionaries describing the triggers
                            and state changes
        :param initial_state: (str) Initial state machine state
        :param timeout_delay: (int/float) Number of seconds after a timeout to pause
        :param tick_delay: (int/float) Collection poll check delay while idle
        :param interval_skew: (int/float) Seconds to randomly skew the next interval
                              collection to spread out requests for PM intervals
        :param collect_attempts: (int) Max requests for a single PM interval before fail
        :param create_attempts: (int) Max attempts to create PM Managed entities before
                                stopping state machine
        :raises KeyError: if one of the required task keys is missing
        """
        self.log = structlog.get_logger(device_id=device_id)

        self._agent = agent
        self._device_id = device_id
        self._device = None         # Looked up from the agent when 'starting' is entered
        self._pm_config = None
        self._timeout_delay = timeout_delay
        self._tick_delay = tick_delay
        self._interval_skew = interval_skew
        self._collect_attempts = collect_attempts
        self._create_attempts = create_attempts

        self._sync_time_task = tasks['sync-time']
        self._get_interval_task = tasks['collect-data']
        self._create_pm_task = tasks['create-pm']
        self._delete_pm_task = tasks['delete-pm']
        self._advertise_events = advertise_events

        self._omci_cc_subscriptions = {               # RxEvent.enum -> Subscription Object
            RxEvent.MIB_Reset: None,
            RxEvent.Create: None,
            RxEvent.Delete: None
        }
        self._omci_cc_sub_mapping = {                 # RxEvent.enum -> callback
            RxEvent.MIB_Reset: self.on_mib_reset_response,
            RxEvent.Create: self.on_create_response,
            RxEvent.Delete: self.on_delete_response,
        }
        self._me_watch_list = {
            MacBridgePortConfigurationData.class_id: {
                'create-delete': self.add_remove_enet_frame_pm,
                'instances': dict()  # BP entity_id -> (PM class_id, PM entity_id)
            }
        }
        self._deferred = None
        self._task_deferred = None
        self._current_task = None
        self._add_me_deferred = None
        self._delete_me_deferred = None
        self._next_interval = None
        self._enet_entity_id = IndexPool(1024, 1)
        self._add_pm_me_retry = 0

        # (Class ID, Instance ID) -> Collect attempts remaining
        self._pm_me_collect_retries = dict()
        self._pm_me_extended_info = dict()
        self._add_pm_me = dict()    # (pm cid, pm eid) -> (me cid, me eid, upstream)
        self._del_pm_me = set()

        # Pollable PM items
        # Note that some items the KPI extracts are not listed below. These are the
        # administrative states, operational states, and sensed ethernet type. The values
        # in the MIB database should be accurate for these items.

        self._ani_g_items = ["optical_signal_level", "transmit_optical_level"]
        self._next_poll_time = datetime.utcnow()
        self._poll_interval = 60                      # TODO: Fixed at once a minute

        # Statistics and attributes
        # TODO: add any others if it will support problem diagnosis

        # Set up state machine to manage states
        self.machine = Machine(model=self, states=states,
                               transitions=transitions,
                               initial=initial_state,
                               queued=True,
                               ignore_invalid_triggers=True,
                               name='{}-{}'.format(self.__class__.__name__,
                                                   device_id))
        try:
            import logging
            logging.getLogger('transitions').setLevel(logging.WARNING)
        except Exception as e:
            self.log.exception('log-level-failed', e=e)

    def _cancel_deferred(self):
        """
        Cancel (best effort) every outstanding delayed call / deferred that
        this state machine tracks and forget about them.
        """
        pending = (self._deferred, self._task_deferred,
                   self._add_me_deferred, self._delete_me_deferred)

        self._deferred = None
        self._task_deferred = None
        self._add_me_deferred = None
        self._delete_me_deferred = None

        for d in pending:
            if d is None:
                continue
            try:
                if not d.called:
                    d.cancel()
            except Exception as e:
                # Cancellation is best effort only; never let it abort a
                # state change
                self.log.debug('cancel-deferred-failed', e=e)

    def _cancel_tasks(self):
        """Stop the task currently in progress (if any)"""
        task, self._current_task = self._current_task, None
        if task is not None:
            task.stop()

    def __str__(self):
        return 'PerformanceIntervals: Device ID: {}, State:{}'.format(self._device_id,
                                                                      self.state)

    def delete(self):
        """
        Cleanup any state information
        """
        self.stop()

    @property
    def device_id(self):
        """(str) ONU Device ID this state machine was created for"""
        return self._device_id

    @property
    def advertise_events(self):
        """(bool) True if events are advertised on the OpenOMCI event bus"""
        return self._advertise_events

    @advertise_events.setter
    def advertise_events(self, value):
        if not isinstance(value, bool):
            raise TypeError('Advertise event is a boolean')
        self._advertise_events = value

    def advertise(self, event, info):
        """
        Advertise an event on the OpenOMCI event bus

        Nothing is sent unless advertise_events is enabled.

        :param event: Event type to advertise
        :param info: Event specific information
        """
        if self._advertise_events:
            self._agent.advertise(event,
                                  {
                                      'state-machine': self.machine.name,
                                      'info': info,
                                      'time': str(datetime.utcnow()),
                                      'next': str(self._next_interval)
                                  })

    def set_pm_config(self, pm_config):
        """
        Set PM interval configuration

        :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
        :return:
        """
        self._pm_config = pm_config

    def _me_is_supported(self, class_id):
        """
        Check to see if ONU supports this ME

        :param class_id: (int) ME Class ID
        :return: (bool) If ME is supported. False is returned if the device
                 has not been looked up yet (state machine never started) or
                 the supported ME list is not available
        """
        if self._device is None:
            return False

        supported = self._device.omci_capabilities.supported_managed_entities
        return class_id in supported if supported is not None else False

    def add_pm_me(self, pm_class_id, pm_entity_id, cid=0, eid=0, upstream=False):
        """
        Add a new Performance Monitoring ME.

        The ME ID will be added to an internal list and will be added the next
        time the idle state is reached. An 'add_me' trigger will be raised in
        case already in the Idle state.

        :param pm_class_id: (int) ME Class ID (1..0xFFFE)
        :param pm_entity_id: (int) Instance ID (1..0xFFFE)
        :param cid: (int) Class ID of entity monitored, may be None
        :param eid: (int) Instance ID of entity monitored, may be None
        :param upstream: (bool): Flag indicating if PM is for upstream traffic
        :raises TypeError: if the class ID is not an integer
        :raises ValueError: if the class ID is out of range
        """
        if not isinstance(pm_class_id, int):
            raise TypeError('PM ME Class ID is an integer')
        if not 0 < pm_class_id < 0xFFFF:
            raise ValueError('PM ME Class ID must be 1..65534')

        # Check to see if ONU supports this ME
        if not self._me_is_supported(pm_class_id):
            self.log.warn('unsupported-PM-me', class_id=pm_class_id)
            return

        key = (pm_class_id, pm_entity_id)

        # Only queue it if it is neither being collected nor already queued
        if key not in self._pm_me_collect_retries and key not in self._add_pm_me:
            self._add_pm_me[key] = (cid, eid, upstream)

            if self._add_me_deferred is None:
                self._add_me_deferred = reactor.callLater(0, self.add_me)

        # An 'add' supersedes any pending 'delete' of the same ME
        self._del_pm_me.discard(key)

    def delete_pm_me(self, class_id, entity_id):
        """
        Remove a Performance Monitoring ME.

        The ME ID will be added to an internal list and will be removed the next
        time the idle state is reached. A 'delete_me' trigger will be raised in
        case already in the Idle state.

        :param class_id: (int) ME Class ID (1..0xFFFE)
        :param entity_id: (int) Instance ID (1..0xFFFE)
        :raises TypeError: if the class ID is not an integer
        :raises ValueError: if the class ID is out of range
        """
        if not isinstance(class_id, int):
            raise TypeError('PM ME Class ID is an integer')
        if not 0 < class_id < 0xFFFF:
            raise ValueError('PM ME Class ID must be 1..65534')

        # Check to see if ONU supports this ME
        if not self._me_is_supported(class_id):
            self.log.warn('unsupported-PM-me', class_id=class_id)
            return

        key = (class_id, entity_id)

        # Only MEs that are currently being collected need to be deleted
        if key in self._pm_me_collect_retries and key not in self._del_pm_me:
            self._del_pm_me.add(key)

            if self._delete_me_deferred is None:
                self._delete_me_deferred = reactor.callLater(0, self.delete_me)

        # A 'delete' supersedes any pending 'add' of the same ME
        self._add_pm_me.pop(key, None)

    def on_enter_disabled(self):
        """
        State machine is being stopped
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()
        self._cancel_tasks()
        self._next_interval = None

        device = self._device
        if device is None:
            # stop()/delete() on a machine that was never started. There are
            # no subscriptions or PM MEs to clean up.
            return

        # Drop OMCI ME Response subscriptions
        for event, sub in list(self._omci_cc_subscriptions.items()):
            if sub is not None:
                self._omci_cc_subscriptions[event] = None
                device.omci_cc.event_bus.unsubscribe(sub)

        # Manually remove ani ANI/PON and UNI PM interval MEs
        config = device.configuration
        anis = config.ani_g_entities
        unis = config.uni_g_entities

        if anis is not None:
            for entity_id in anis:
                self.delete_pm_me(FecPerformanceMonitoringHistoryData.class_id, entity_id)
                self.delete_pm_me(XgPonTcPerformanceMonitoringHistoryData.class_id, entity_id)
                self.delete_pm_me(XgPonDownstreamPerformanceMonitoringHistoryData.class_id, entity_id)
                self.delete_pm_me(XgPonUpstreamPerformanceMonitoringHistoryData.class_id, entity_id)

        if unis is not None:
            for entity_id in unis:
                self.delete_pm_me(EthernetPMMonitoringHistoryData.class_id, entity_id)

    def on_enter_starting(self):
        """ Add the PON/ANI and UNI PM intervals"""
        self.advertise(OpenOmciEventType.state_change, self.state)

        self._device = self._agent.get_device(self._device_id)
        self._cancel_deferred()

        # Set up OMCI ME Response subscriptions
        try:
            for event, callback in self._omci_cc_sub_mapping.items():
                if self._omci_cc_subscriptions[event] is None:
                    self._omci_cc_subscriptions[event] = \
                        self._device.omci_cc.event_bus.subscribe(
                            topic=OMCI_CC.event_bus_topic(self._device_id, event),
                            callback=callback)

        except Exception as e:
            self.log.exception('omci-cc-subscription-setup', e=e)

        # Keep bound for the exception handler below, even if the failure
        # happens before the watch list loop is reached
        class_id = None
        try:
            # Manually start some ANI/PON and UNI PM interval MEs
            config = self._device.configuration
            anis = config.ani_g_entities
            unis = config.uni_g_entities

            if anis is not None:
                for entity_id in anis:
                    self.add_pm_me(FecPerformanceMonitoringHistoryData.class_id,
                                   entity_id)
                    self.add_pm_me(XgPonTcPerformanceMonitoringHistoryData.class_id,
                                   entity_id)
                    self.add_pm_me(XgPonDownstreamPerformanceMonitoringHistoryData.class_id,
                                   entity_id)
                    self.add_pm_me(XgPonUpstreamPerformanceMonitoringHistoryData.class_id,
                                   entity_id)

            if unis is not None:
                for entity_id in unis:
                    self.add_pm_me(EthernetPMMonitoringHistoryData.class_id, entity_id)

            # Look for existing instances of dynamically created ME's that have PM
            # associated with them and add them now
            for class_id in self._me_watch_list:
                watch = self._me_watch_list[class_id]
                method = watch['create-delete']

                # Only integer keys of the MIB query result are ME instances
                existing = {k: v for k, v in
                            self._device.query_mib(class_id=class_id).items()
                            if isinstance(k, int)}

                for entity_id, data in existing.items():
                    cid, eid = method(None, class_id, entity_id,
                                      add=True, attributes=data[ATTRIBUTES_KEY])
                    if cid > 0:
                        # BP entity_id -> (PM class_id, PM entity_id)
                        watch['instances'][entity_id] = (cid, eid)

        except Exception as e:
            self.log.exception('pm-me-setup', class_id=class_id, e=e)

        # Got to synchronize_time state
        self._deferred = reactor.callLater(0, self.tick)

    def on_enter_synchronize_time(self):
        """
        State machine has just transitioned to the synchronize_time state
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()

        def success(_results):
            self.log.debug('sync-time-success')
            self._current_task = None
            self._deferred = reactor.callLater(0, self.success)
            # Calculate next interval time
            self._next_interval = self.get_next_interval

        def failure(reason):
            self.log.info('sync-time-failure', reason=reason)
            self._current_task = None
            # Re-enter this state (and retry) after the timeout delay
            self._deferred = reactor.callLater(self._timeout_delay, self.failure)

        # Schedule a task to set the ONU time
        self._current_task = self._sync_time_task(self._agent, self._device_id)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_idle(self):
        """
        State machine has just transitioned to the idle state

        In this state, any added PM MEs that need to be created will be.
        TODO: some non-interval PM stats (if there are any) are collected here
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()

        if len(self._del_pm_me) and self._delete_me_deferred is None:
            self._delete_me_deferred = reactor.callLater(0, self.delete_me)

        elif len(self._add_pm_me) and self._add_me_deferred is None:
            self._add_me_deferred = reactor.callLater(0, self.add_me)

        elif datetime.utcnow() >= self._next_poll_time:
            def success(results):
                self._device.timestamp = arrow.utcnow().float_timestamp
                self._device.mib_synchronizer.mib_set(results.me_class.class_id,
                                                      results.entity_id,
                                                      results.attributes)
                self._next_poll_time = datetime.utcnow() + timedelta(seconds=self._poll_interval)

            def failure(reason):
                self.log.info('poll-failure', reason=reason)
                self._device.timestamp = None
                return None

            # Scan all ANI-G ports
            ani_g_entities = self._device.configuration.ani_g_entities
            ani_g_entity_ids = list(ani_g_entities.keys()) if ani_g_entities is not None else []

            if ani_g_entity_ids:
                for entity_id in ani_g_entity_ids:
                    task = OmciGetRequest(self._agent, self.device_id,
                                          AniG, entity_id,
                                          self._ani_g_items, allow_failure=True)
                    self._task_deferred = self._device.task_runner.queue_task(task)
                    self._task_deferred.addCallbacks(success, failure)
            else:
                self.log.warn('poll-pm-no-anis')
                self._next_poll_time = datetime.utcnow() + timedelta(seconds=self._poll_interval)

        # TODO: Compute a better mechanism than just polling here, perhaps based on
        #       the next time to fetch data for 'any' interval
        self._deferred = reactor.callLater(self._tick_delay, self.tick)

    def on_enter_create_pm_me(self):
        """
        State machine has just transitioned to the create_pm_me state
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()
        self._cancel_tasks()

        # Take ownership of everything queued so far
        mes, self._add_pm_me = self._add_pm_me, dict()

        def success(results):
            self.log.debug('create-me-success', results=results)

            # Check if already here. The create request could have received
            # an already-exists status code which we consider successful
            for pm, me in mes.items():
                self._pm_me_collect_retries[pm] = self.pm_collected(pm)
                self._pm_me_extended_info[pm] = me

            self._current_task = None
            # The attempt limit applies to consecutive failures only
            self._add_pm_me_retry = 0
            self._deferred = reactor.callLater(0, self.success)

        def failure(reason):
            self.log.info('create-me-failure', reason=reason, retries=self._add_pm_me_retry)
            self._current_task = None
            if self._add_pm_me_retry <= self._create_attempts:
                # Put them back so they are retried from the idle state
                for pm, me in mes.items():
                    self._add_pm_me[pm] = me
                self._add_pm_me_retry += 1
                self._deferred = reactor.callLater(self._timeout_delay, self.failure)
            else:
                # we cant seem to create any collection me, no point in doing anything
                self.log.warn('unable-to-create-pm-me-disabling-collection', reason=reason,
                              device_id=self._device_id)
                self._deferred = reactor.callLater(self._timeout_delay, self.stop)

        self._current_task = self._create_pm_task(self._agent, self._device_id, mes)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_delete_pm_me(self):
        """
        State machine has just transitioned to the delete_pm_me state
        """
        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()
        self._cancel_tasks()

        # Take ownership of everything queued so far
        mes, self._del_pm_me = self._del_pm_me, set()

        def success(results):
            self.log.debug('delete-me-success', results=results)
            self._current_task = None
            for me in mes:
                # Tolerate entries that are already gone so that the
                # 'success' trigger below is always scheduled
                self._pm_me_collect_retries.pop(me, None)
                self._pm_me_extended_info.pop(me, None)

            self._deferred = reactor.callLater(0, self.success)

        def failure(reason):
            self.log.info('delete-me-failure', reason=reason)
            self._current_task = None
            # Put them back so they are retried from the idle state
            self._del_pm_me.update(mes)

            self._deferred = reactor.callLater(self._timeout_delay, self.failure)

        self._current_task = self._delete_pm_task(self._agent, self._device_id, mes)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)

    def on_enter_collect_data(self):
        """
        State machine has just transitioned to the collect_data state

        At most one interval collection task is started per entry into this
        state. Once no ME has collection attempts remaining, the next
        interval time is computed and all attempt counters are re-armed.
        """
        if self._next_interval is not None and self._next_interval > datetime.utcnow():
            self.log.debug('wait-next-interval')
            # Not ready for next interval, transition back to idle and we should get
            # called again after a short delay
            reactor.callLater(0, self.success)
            return

        self.advertise(OpenOmciEventType.state_change, self.state)
        self._cancel_deferred()
        self._cancel_tasks()

        # Visit the MEs in random order
        keys = list(self._pm_me_collect_retries.keys())
        shuffle(keys)

        for key in keys:
            class_id = key[0]
            entity_id = key[1]

            self.log.debug("in-enter-collect-data", data_key=key,
                           retries=self._pm_me_collect_retries[key])

            # Collect the data ?
            if self._pm_me_collect_retries[key] <= 0:
                continue

            # NOTE: The closures below refer to 'key'. This is safe since the
            #       method returns right after the task has been queued.
            def success(results):
                self.log.debug('collect-success', results=results,
                               class_id=results.get('class_id'),
                               entity_id=results.get('entity_id'))
                self._current_task = None
                self._pm_me_collect_retries[key] = 0
                self._deferred = reactor.callLater(0, self.success)
                return results      # Passed on to publish_data()

            def failure(reason):
                self.log.info('collect-failure', reason=reason)
                self._current_task = None
                self._pm_me_collect_retries[key] -= 1
                self._deferred = reactor.callLater(self._timeout_delay, self.failure)
                return reason   # Halt callback processing

            # start the task
            if key in self._pm_me_extended_info:
                extended_info = self._pm_me_extended_info[key]
                self.log.debug('collect-extended-info-found', data_key=key,
                               extended_info=extended_info)
                parent_class_id = extended_info[0]
                parent_entity_id = extended_info[1]
                upstream = extended_info[2]
            else:
                self.log.debug('collect-extended-info-not-found', data_key=key)
                parent_class_id = None
                parent_entity_id = None
                upstream = None

            self._current_task = self._get_interval_task(self._agent, self._device_id,
                                                         class_id, entity_id,
                                                         parent_class_id=parent_class_id,
                                                         parent_entity_id=parent_entity_id,
                                                         upstream=upstream)
            self._task_deferred = self._device.task_runner.queue_task(self._current_task)
            self._task_deferred.addCallbacks(success, failure)
            self._task_deferred.addCallback(self.publish_data)
            return

        # Here if all intervals have been collected (we are up to date)
        self._next_interval = self.get_next_interval
        self.log.debug('collect-calculate-next', next=self._next_interval)

        self._pm_me_collect_retries = dict.fromkeys(self._pm_me_collect_retries,
                                                    self._collect_attempts)
        reactor.callLater(0, self.success)

    def on_enter_threshold_exceeded(self):
        """
        State machine has just transitioned to the threshold_exceeded state
        """
        pass  # TODO: Not sure if we want this state. Need to get alarm synchronizer working first

    @property
    def get_next_interval(self):
        """
        Determine the time for the next interval collection for all of this
        ONUs PM Intervals. Earliest fetch time is at least 1 minute into the
        next interval.

        :return: (datetime) UTC time to get the next interval
        """
        now = datetime.utcnow()

        # Get delta seconds to at least 1 minute into next interval
        next_delta_secs = (16 - (now.minute % 15)) * 60
        next_interval = now + timedelta(seconds=next_delta_secs)

        # NOTE: For debugging, the very first collection (no interval has
        #       been computed yet) is performed right after initial code
        #       startup/mib-sync
        if self._next_interval is None:
            return now     # Do it now  (just for debugging purposes)

        # Skew the next time up to the maximum specified
        # TODO: May want to skew in a shorter range and select the minute
        #       based off some device property value to make collection a
        #       little more predictable on a per-ONU basis.
        return next_interval + timedelta(seconds=uniform(0, self._interval_skew))

    def pm_collected(self, key):
        """
        Query database and determine if PM data needs to be collected for this ME

        :param key: (tuple) PM ME (class ID, entity ID). Currently unused
        :return: (int) Number of collection attempts to allow for this ME
        """
        return self._collect_attempts  # TODO: Implement persistent storage

    def publish_data(self, results):
        """
        Publish the PM interval results on the appropriate bus.  The results are
        a dictionary with the following format.

            'class-id':          (int) ME Class ID,
            'entity-id':         (int) ME Entity ID,
            'me-name':           (str) ME Class name,   # Mostly for debugging...
            'interval-end-time': None,
            'interval-utc-time': (DateTime) UTC time when retrieved from ONU,

            Counters added here as they are retrieved with the format of
            'counter-attribute-name': value (int)

        Nothing is published until a PM configuration has been provided with
        set_pm_config().

        :param results: (dict) PM results
        """
        self.log.debug('collect-publish', results=results)

        if self._pm_config is not None:
            self._pm_config.publish_metrics(results)

        # TODO: Save off last time interval fetched to persistent storage?

    def on_mib_reset_response(self, _topic, msg):
        """
        Called upon receipt of a MIB Reset Response for this ONU

        On a successful reset, the PM MEs of all watched ME instances are
        removed and the watch list is cleared.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-mib-reset-response', state=self.state)
        try:
            response = msg[RX_RESPONSE_KEY]
            omci_msg = response.fields['omci_message'].fields
            status = omci_msg['success_code']

            if status == RC.Success:
                for class_id in self._me_watch_list:
                    watch = self._me_watch_list[class_id]
                    method = watch['create-delete']

                    # BP entity_id -> (PM class_id, PM entity_id)
                    for pm_class_id, pm_entity_id in watch['instances'].values():
                        method(None, pm_class_id, pm_entity_id, add=False)

                    watch['instances'] = dict()

        except KeyError:
            pass            # NOP

    def on_create_response(self, _topic, msg):
        """
        Called upon receipt of a Create Response for this ONU.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-create-response', state=self.state)

        def valid_request(stat, c_id, e_id):
            # Subscribed, created (or already there), watched and not yet known
            return self._omci_cc_subscriptions[RxEvent.Create] is not None \
                and stat in (RC.Success, RC.InstanceExists) \
                and c_id in self._me_watch_list \
                and e_id not in self._me_watch_list[c_id]['instances']

        response = msg[RX_RESPONSE_KEY]
        omci = response.fields['omci_message'].fields
        class_id = omci['entity_class']
        entity_id = omci['entity_id']
        status = omci['success_code']

        if valid_request(status, class_id, entity_id):
            request = msg[TX_REQUEST_KEY]
            method = self._me_watch_list[class_id]['create-delete']
            cid, eid = method(request, class_id, entity_id, add=True)

            if cid > 0:
                # BP entity_id -> (PM class_id, PM entity_id)
                self._me_watch_list[class_id]['instances'][entity_id] = (cid, eid)

    def on_delete_response(self, _topic, msg):
        """
        Called upon receipt of a Delete Response for this ONU

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-delete-response', state=self.state)

        def valid_request(stat, cid, eid):
            # Subscribed, deleted (or already gone), watched and known
            return self._omci_cc_subscriptions[RxEvent.Delete] is not None \
                and stat in (RC.Success, RC.UnknownInstance) \
                and cid in self._me_watch_list \
                and eid in self._me_watch_list[cid]['instances']

        response = msg[RX_RESPONSE_KEY]
        omci = response.fields['omci_message'].fields
        class_id = omci['entity_class']
        entity_id = omci['entity_id']
        status = omci['success_code']

        if valid_request(status, class_id, entity_id):
            request = msg[TX_REQUEST_KEY]
            method = self._me_watch_list[class_id]['create-delete']

            method(request, class_id, entity_id, add=False)
            # BP entity_id -> (PM class_id, PM entity_id)
            del self._me_watch_list[class_id]['instances'][entity_id]

    def get_pm_entity_id_for_add(self, pm_cid, eid):
        """
        Select the Entity ID to use for a specific PM Class ID.  For extended
        PM ME's, an entity id (>0) is allocated

        :param pm_cid: (int) PM ME Class ID to create/get entry ID for
        :param eid: (int) Reference class's entity ID. Used as PM entity ID for non-
                    extended PM history PMs
        :return: (int) Entity ID to use
        """
        if pm_cid in (EthernetFrameExtendedPerformanceMonitoring.class_id,
                      EthernetFrameExtendedPerformanceMonitoring64Bit.class_id):
            return self._enet_entity_id.get_next()
        return eid

    def release_pm_entity_id(self, pm_cid, eid):
        """
        Return the entity ID of an extended PM ME to the pool it was
        allocated from. Nothing is done for any other PM class.

        :param pm_cid: (int) PM ME Class ID
        :param eid: (int) PM ME entity ID to release
        """
        if pm_cid in (EthernetFrameExtendedPerformanceMonitoring.class_id,
                      EthernetFrameExtendedPerformanceMonitoring64Bit.class_id):
            try:
                self._enet_entity_id.release(eid)
            except Exception:
                pass        # Best effort, the ID may not be allocated

    def add_remove_enet_frame_pm(self, request, class_id, entity_id,
                                 add=True,
                                 attributes=None):
        """
        Add/remove PM for the dynamic MAC Port configuration data.

        This can be called in a variety of ways:

           o If from an Response event from OMCI_CC, the request will contain
             the original create/delete request. The class_id and entity_id will
             be the MAC Data Configuration Data class and instance ID.
             add = True if create, False if delete

           o If starting up (and the associated ME is already created), the MAC
             Data Configuration Data class and instance ID, and attributes are
             provided. request = None and add = True

           o If cleaning up (stopping), the PM ME class_id, entity_id are provided.
             request = None and add = False

        :param request: OMCI create/delete request frame, or None
        :param class_id: (int) ME Class ID (see above)
        :param entity_id: (int) ME Instance ID (see above)
        :param add: (bool) True to add the PM ME, False to remove it
        :param attributes: (dict) ME attributes, only used if starting up
        :return: (int, int) PM ME class_id and entity_id for add/remove was performed.
                            class and entity IDs are non-zero on success and
                            (0, 0) if no PM ME was added/removed
        """
        pm_class_ids = None
        pm_entity_id = 0
        cid = 0
        eid = 0
        upstream = False

        def tp_type_to_pm(tp):
            # Returns (candidate PM class IDs in order of preference, upstream flag)
            # or (None, False) if the TP type is not monitored
            # TODO: Support 64-bit extended Monitoring MEs.
            # This will result in the need to maintain entity IDs of PMs differently
            upstream_types = [  # EthernetFrameExtendedPerformanceMonitoring64Bit.class_id,
                              EthernetFrameExtendedPerformanceMonitoring.class_id,
                              EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id], True
            downstream_types = [  # EthernetFrameExtendedPerformanceMonitoring64Bit.class_id,
                                EthernetFrameExtendedPerformanceMonitoring.class_id,
                                EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id], False
            return {
                1: downstream_types,
                3: upstream_types,
                5: downstream_types,
                6: downstream_types,
            }.get(tp, (None, False))

        if request is not None:
            assert class_id == MacBridgePortConfigurationData.class_id

            omci_fields = request.fields['omci_message'].fields
            cid = omci_fields['entity_class']
            eid = omci_fields['entity_id']

            if add:
                # Is this associated with the ANI or the UNI side of the bridge?
                # For VOLTHA v2.0, only high-speed internet data service is
                attributes = omci_fields['data']
                pm_class_ids, upstream = tp_type_to_pm(attributes['tp_type'])
            else:
                # A delete request carries no attribute data. Remove the PM ME
                # that was recorded when the bridge port was created
                instances = self._me_watch_list[cid]['instances']
                pm_cid, pm_entity_id = instances.get(eid, (None, None))
                if pm_cid is not None:
                    pm_class_ids = [pm_cid]

        elif add:
            assert class_id == MacBridgePortConfigurationData.class_id
            assert isinstance(attributes, dict)

            # Is this associated with the ANI or the UNI side of the bridge?
            pm_class_ids, upstream = tp_type_to_pm(attributes.get('tp_type'))
            cid = class_id
            eid = entity_id

        else:
            assert class_id in (EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id,
                                EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id,
                                EthernetFrameExtendedPerformanceMonitoring.class_id,
                                EthernetFrameExtendedPerformanceMonitoring64Bit.class_id)
            # The PM ME itself was provided
            pm_class_ids = [class_id]
            pm_entity_id = entity_id

        if not pm_class_ids:
            return 0, 0     # Unable to select a supported ME for this ONU

        # Use the first candidate that the ONU supports
        for pm_class_id in pm_class_ids:
            if not self._me_is_supported(pm_class_id):
                continue

            if add:
                pm_entity_id = self.get_pm_entity_id_for_add(pm_class_id, eid)
                self.add_pm_me(pm_class_id, pm_entity_id, cid=cid, eid=eid,
                               upstream=upstream)
            else:
                self.delete_pm_me(pm_class_id, pm_entity_id)
                self.release_pm_entity_id(pm_class_id, pm_entity_id)

            return pm_class_id, pm_entity_id

        return 0, 0