blob: 7d4d304cb51a2884553b2dc364321a69f56a9d2e [file] [log] [blame]
Matt Jeanneretf1e9c5d2019-02-08 07:41:29 -05001#
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""
17OMCI Message support
18"""
19
20import sys
21import arrow
22from twisted.internet import reactor, defer
23from twisted.internet.defer import TimeoutError, CancelledError, failure, fail, succeed, inlineCallbacks
24from common.frameio.frameio import hexify
25from voltha.extensions.omci.omci import *
26from voltha.extensions.omci.omci_me import OntGFrame, OntDataFrame, SoftwareImageFrame
27from voltha.extensions.omci.me_frame import MEFrame
28from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
29from common.event_bus import EventBusClient
30from enum import IntEnum
31from binascii import hexlify
32
33
34def hexify(buffer):
35 """Return a hexadecimal string encoding of input buffer"""
36 return ''.join('%02x' % ord(c) for c in buffer)
37
38
39DEFAULT_OMCI_TIMEOUT = 3 # Seconds
40MAX_OMCI_REQUEST_AGE = 60 # Seconds
41DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE = 31 # Bytes
42MAX_TABLE_ROW_COUNT = 512 # Keep get-next logic reasonable
43
44CONNECTED_KEY = 'connected'
45TX_REQUEST_KEY = 'tx-request'
46RX_RESPONSE_KEY = 'rx-response'
47UNKNOWN_CLASS_ATTRIBUTE_KEY = 'voltha-unknown-blob'
48
49
50class OmciCCRxEvents(IntEnum):
51 AVC_Notification = 0,
52 MIB_Upload = 1,
53 MIB_Upload_Next = 2,
54 Create = 3,
55 Delete = 4,
56 Set = 5,
57 Alarm_Notification = 6,
58 Test_Result = 7,
59 MIB_Reset = 8,
60 Connectivity = 9,
61 Get_ALARM_Get = 10,
62 Get_ALARM_Get_Next = 11
63
64
65# abbreviations
66OP = EntityOperations
67RxEvent = OmciCCRxEvents
68
69
70class OMCI_CC(object):
71 """ Handle OMCI Communication Channel specifics for Adtran ONUs"""
72
73 MIN_OMCI_TX_ID_LOW_PRIORITY = 0x0001 # 2 Octets max
74 MAX_OMCI_TX_ID_LOW_PRIORITY = 0x7FFF # 2 Octets max
75 MIN_OMCI_TX_ID_HIGH_PRIORITY = 0x8000 # 2 Octets max
76 MAX_OMCI_TX_ID_HIGH_PRIORITY = 0xFFFF # 2 Octets max
77 LOW_PRIORITY = 0
78 HIGH_PRIORITY = 1
79
80 # Offset into some tuples for pending lists and tx in progress
81 PENDING_DEFERRED = 0
82 PENDING_FRAME = 1
83 PENDING_TIMEOUT = 2
84 PENDING_RETRY = 3
85
86 REQUEST_TIMESTAMP = 0
87 REQUEST_DEFERRED = 1
88 REQUEST_FRAME = 2
89 REQUEST_TIMEOUT = 3
90 REQUEST_RETRY = 4
91 REQUEST_DELAYED_CALL = 5
92
93 _frame_to_event_type = {
94 OmciMibResetResponse.message_id: RxEvent.MIB_Reset,
95 OmciMibUploadResponse.message_id: RxEvent.MIB_Upload,
96 OmciMibUploadNextResponse.message_id: RxEvent.MIB_Upload_Next,
97 OmciCreateResponse.message_id: RxEvent.Create,
98 OmciDeleteResponse.message_id: RxEvent.Delete,
99 OmciSetResponse.message_id: RxEvent.Set,
100 OmciGetAllAlarmsResponse.message_id: RxEvent.Get_ALARM_Get,
101 OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
102 }
103
104 def __init__(self, adapter_agent, device_id, me_map=None,
105 clock=None):
106 self.log = structlog.get_logger(device_id=device_id)
107 self._adapter_agent = adapter_agent
108 self._device_id = device_id
109 self._proxy_address = None
110 self._enabled = False
111 self._extended_messaging = False
112 self._me_map = me_map
113 if clock is None:
114 self.reactor = reactor
115 else:
116 self.reactor = clock
117
118 # Support 2 levels of priority since only baseline message set supported
119 self._tx_tid = [OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY, OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY]
120 self._tx_request = [None, None] # Tx in progress (timestamp, defer, frame, timeout, retry, delayedCall)
121 self._tx_request_deferred = [None, None] # Tx in progress but held till child Rx/TX can finish. ie omci tables.
122 self._pending = [list(), list()] # pending queue (deferred, tx_frame, timeout, retry)
123 self._rx_response = [None, None]
124
125 # Statistics
126 self._tx_frames = 0
127 self._rx_frames = 0
128 self._rx_unknown_tid = 0 # Rx OMCI with no Tx TID match
129 self._rx_onu_frames = 0 # Autonomously generated ONU frames
130 self._rx_onu_discards = 0 # Autonomously generated ONU unknown message types
131 self._rx_timeouts = 0
132 self._rx_late = 0 # Frame response received after timeout on Tx
133 self._rx_unknown_me = 0 # Number of managed entities Rx without a decode definition
134 self._tx_errors = 0 # Exceptions during tx request
135 self._consecutive_errors = 0 # Rx & Tx errors in a row, a good RX resets this to 0
136 self._reply_min = sys.maxint # Fastest successful tx -> rx
137 self._reply_max = 0 # Longest successful tx -> rx
138 self._reply_sum = 0.0 # Total seconds for successful tx->rx (float for average)
139 self._max_hp_tx_queue = 0 # Maximum size of high priority tx pending queue
140 self._max_lp_tx_queue = 0 # Maximum size of low priority tx pending queue
141
142 self.event_bus = EventBusClient()
143
144 # If a list of custom ME Entities classes were provided, insert them into
145 # main class_id to entity map.
146 # TODO: If this class becomes hidden from the ONU DA, move this to the OMCI State Machine runner
147
148 def __str__(self):
149 return "OMCISupport: {}".format(self._device_id)
150
151 def _get_priority_index(self, high_priority):
152 """ Centralized logic to help make extended message support easier in the future"""
153 return OMCI_CC.HIGH_PRIORITY if high_priority and not self._extended_messaging \
154 else OMCI_CC.LOW_PRIORITY
155
156 def _tid_is_high_priority(self, tid):
157 """ Centralized logic to help make extended message support easier in the future"""
158
159 return not self._extended_messaging and \
160 OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY <= tid <= OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
161
162 @staticmethod
163 def event_bus_topic(device_id, event):
164 """
165 Get the topic name for a given event Frame Type
166 :param device_id: (str) ONU Device ID
167 :param event: (OmciCCRxEvents) Type of event
168 :return: (str) Topic string
169 """
170 assert event in OmciCCRxEvents, \
171 'Event {} is not an OMCI-CC Rx Event'.format(event.name)
172
173 return 'omci-rx:{}:{}'.format(device_id, event.name)
174
175 @property
176 def enabled(self):
177 return self._enabled
178
179 @enabled.setter
180 def enabled(self, value):
181 """
182 Enable/disable the OMCI Communications Channel
183
184 :param value: (boolean) True to enable, False to disable
185 """
186 assert isinstance(value, bool), 'enabled is a boolean'
187
188 if self._enabled != value:
189 self._enabled = value
190 if self._enabled:
191 self._start()
192 else:
193 self._stop()
194
195 @property
196 def tx_frames(self):
197 return self._tx_frames
198
199 @property
200 def rx_frames(self):
201 return self._rx_frames
202
203 @property
204 def rx_unknown_tid(self):
205 return self._rx_unknown_tid # Tx TID not found
206
207 @property
208 def rx_unknown_me(self):
209 return self._rx_unknown_me
210
211 @property
212 def rx_onu_frames(self):
213 return self._rx_onu_frames
214
215 @property
216 def rx_onu_discards(self):
217 return self._rx_onu_discards # Attribute Value change autonomous overflows
218
219 @property
220 def rx_timeouts(self):
221 return self._rx_timeouts
222
223 @property
224 def rx_late(self):
225 return self._rx_late
226
227 @property
228 def tx_errors(self):
229 return self._tx_errors
230
231 @property
232 def consecutive_errors(self):
233 return self._consecutive_errors
234
235 @property
236 def reply_min(self):
237 return int(round(self._reply_min * 1000.0)) # Milliseconds
238
239 @property
240 def reply_max(self):
241 return int(round(self._reply_max * 1000.0)) # Milliseconds
242
243 @property
244 def reply_average(self):
245 avg = self._reply_sum / self._rx_frames if self._rx_frames > 0 else 0.0
246 return int(round(avg * 1000.0)) # Milliseconds
247
248 @property
249 def hp_tx_queue_len(self):
250 return len(self._pending[OMCI_CC.HIGH_PRIORITY])
251
252 @property
253 def lp_tx_queue_len(self):
254 return len(self._pending[OMCI_CC.LOW_PRIORITY])
255
256 @property
257 def max_hp_tx_queue(self):
258 return self._max_hp_tx_queue
259
260 @property
261 def max_lp_tx_queue(self):
262 return self._max_lp_tx_queue
263
264 def _start(self):
265 """
266 Start the OMCI Communications Channel
267 """
268 assert self._enabled, 'Start should only be called if enabled'
269 self.flush()
270
271 device = self._adapter_agent.get_device(self._device_id)
272 self._proxy_address = device.proxy_address
273
274 def _stop(self):
275 """
276 Stop the OMCI Communications Channel
277 """
278 assert not self._enabled, 'Stop should only be called if disabled'
279 self.flush()
280 self._proxy_address = None
281
282 def _receive_onu_message(self, rx_frame):
283 """ Autonomously generated ONU frame Rx handler"""
284 self.log.debug('rx-onu-frame', frame_type=type(rx_frame),
285 frame=hexify(str(rx_frame)))
286
287 msg_type = rx_frame.fields['message_type']
288 self._rx_onu_frames += 1
289
290 msg = {TX_REQUEST_KEY: None,
291 RX_RESPONSE_KEY: rx_frame}
292
293 if msg_type == EntityOperations.AlarmNotification.value:
294 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Alarm_Notification)
295 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
296
297 elif msg_type == EntityOperations.AttributeValueChange.value:
298 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.AVC_Notification)
299 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
300
301 elif msg_type == EntityOperations.TestResult.value:
302 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Test_Result)
303 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
304
305 else:
306 self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
307 self._rx_onu_discards += 1
308
309 def _update_rx_tx_stats(self, now, ts):
310 ts_diff = now - arrow.Arrow.utcfromtimestamp(ts)
311 secs = ts_diff.total_seconds()
312 self._reply_sum += secs
313 if secs < self._reply_min:
314 self._reply_min = secs
315 if secs > self._reply_max:
316 self._reply_max = secs
317 return secs
318
319 def receive_message(self, msg):
320 """
321 Receive and OMCI message from the proxy channel to the OLT.
322
323 Call this from your ONU Adapter on a new OMCI Rx on the proxy channel
324 :param msg: (str) OMCI binary message (used as input to Scapy packet decoder)
325 """
326 if not self.enabled:
327 return
328
329 try:
330 now = arrow.utcnow()
331 d = None
332
333 # NOTE: Since we may need to do an independent ME map on a per-ONU basis
334 # save the current value of the entity_id_to_class_map, then
335 # replace it with our custom one before decode, and then finally
336 # restore it later. Tried other ways but really made the code messy.
337 saved_me_map = omci_entities.entity_id_to_class_map
338 omci_entities.entity_id_to_class_map = self._me_map
339
340 try:
341 rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
342 rx_tid = rx_frame.fields['transaction_id']
343
344 if rx_tid == 0:
345 return self._receive_onu_message(rx_frame)
346
347 # Previously unreachable if this is the very first Rx or we
348 # have been running consecutive errors
349 if self._rx_frames == 0 or self._consecutive_errors != 0:
350 self.reactor.callLater(0, self._publish_connectivity_event, True)
351
352 self._rx_frames += 1
353 self._consecutive_errors = 0
354
355 except KeyError as e:
356 # Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
357 self.log.debug('frame-decode-key-error', msg=hexlify(msg), e=e)
358 rx_frame = self._decode_unknown_me(msg)
359 self._rx_unknown_me += 1
360 rx_tid = rx_frame.fields.get('transaction_id')
361
362 except Exception as e:
363 self.log.exception('frame-decode', msg=hexlify(msg), e=e)
364 return
365
366 finally:
367 omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
368
369 try:
370 high_priority = self._tid_is_high_priority(rx_tid)
371 index = self._get_priority_index(high_priority)
372
373 # (timestamp, defer, frame, timeout, retry, delayedCall)
374 last_tx_tuple = self._tx_request[index]
375
376 if last_tx_tuple is None or \
377 last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
378 # Possible late Rx on a message that timed-out
379 self._rx_unknown_tid += 1
380 #self.log.warn('tx-message-missing', rx_id=rx_tid, msg=hexlify(msg))
381 #return
382
383 ts, d, tx_frame, timeout, retry, dc = last_tx_tuple
384 if dc is not None and not dc.cancelled and not dc.called:
385 dc.cancel()
386 # self.log.debug("cancel-timeout-called")
387
388 secs = self._update_rx_tx_stats(now, ts)
389
390 # Late arrival already serviced by a timeout?
391 if d.called:
392 self._rx_late += 1
393 return
394
395 except Exception as e:
396 self.log.exception('frame-match', msg=hexlify(msg), e=e)
397 if d is not None:
398 return d.errback(failure.Failure(e))
399 return
400
401 # Extended processing needed. Note 'data' field will be None on some error
402 # status returns
403 omci_msg = rx_frame.fields['omci_message']
404
405 if isinstance(omci_msg, OmciGetResponse) and \
406 omci_msg.fields.get('data') is not None and \
407 'table_attribute_mask' in omci_msg.fields['data']:
408 # Yes, run in a separate generator
409 reactor.callLater(0, self._process_get_rx_frame, timeout, secs,
410 rx_frame, d, tx_frame, high_priority)
411 else:
412 # Publish Rx event to listeners in a different task
413 reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
414
415 # begin success callback chain (will cancel timeout and queue next Tx message)
416 from copy import copy
417 original_callbacks = copy(d.callbacks)
418 self._rx_response[index] = rx_frame
419 d.callback(rx_frame)
420
421 except Exception as e:
422 self.log.exception('rx-msg', e=e)
423
424 @inlineCallbacks
425 def _process_get_rx_frame(self, timeout, secs, rx_frame, d, tx_frame, high_priority):
426 """
427 Special handling for Get Requests that may require additional 'get_next' operations
428 if a table attribute was requested.
429 """
430 omci_msg = rx_frame.fields['omci_message']
431 rx_tid = rx_frame.fields.get('transaction_id')
432 high_priority = self._tid_is_high_priority(rx_tid)
433 frame_index = self._get_priority_index(high_priority)
434
435 if isinstance(omci_msg, OmciGetResponse) and 'table_attribute_mask' in omci_msg.fields['data']:
436
437 # save tx request for later so that below send/recv can finish
438 self._tx_request_deferred[frame_index] = self._tx_request[frame_index]
439 self._tx_request[frame_index] = None
440
441 try:
442 entity_class = omci_msg.fields['entity_class']
443 entity_id = omci_msg.fields['entity_id']
444 table_attributes = omci_msg.fields['data']['table_attribute_mask']
445
446 # Table attribute mask is encoded opposite of managed entity mask.
447 if entity_class in self._me_map:
448 ec = self._me_map[entity_class]
449 for index in xrange(1, len(ec.attributes) + 1):
450 attr_mask = 1 << index
451
452 if attr_mask & table_attributes:
453 self.log.debug('omcc-get-table-ec', ec=ec, index=index, attr_mask=attr_mask,
454 table_attributes=table_attributes)
455 eca = ec.attributes[index]
456 self.log.debug('omcc-get-table-attribute', table_name=eca.field.name)
457
458 seq_no = 0
459 data_buffer = ''
460 count = omci_msg.fields['data'][eca.field.name + '_size']
461
462 if count > MAX_TABLE_ROW_COUNT:
463 self.log.error('omcc-get-table-huge', count=count, name=eca.field.name)
464 raise ValueError('Huge Table Size: {}'.format(count))
465
466 # Original timeout must be chopped up into each individual get-next request
467 # in order for total transaction to complete within the timeframe of the
468 # original get() timeout.
469 number_transactions = 1 + (count + OmciTableField.PDU_SIZE - 1) / OmciTableField.PDU_SIZE
470 timeout /= (1 + number_transactions)
471
472 # Start the loop
473 vals = []
474 for offset in xrange(0, count, OmciTableField.PDU_SIZE):
475 frame = MEFrame(ec, entity_id, {eca.field.name: seq_no}).get_next()
476 seq_no += 1
477
478 max_retries = 3
479 results = yield self.send(frame, min(timeout / max_retries, secs * 3), max_retries)
480
481 omci_getnext_msg = results.fields['omci_message']
482 status = omci_getnext_msg.fields['success_code']
483
484 if status != ReasonCodes.Success.value:
485 raise Exception('get-next-failure table=' + eca.field.name +
486 ' entity_id=' + str(entity_id) +
487 ' sqn=' + str(seq_no) + ' omci-status ' + str(status))
488
489 # Extract the data
490 num_octets = count - offset
491 if num_octets > OmciTableField.PDU_SIZE:
492 num_octets = OmciTableField.PDU_SIZE
493
494 data = omci_getnext_msg.fields['data'][eca.field.name]
495 data_buffer += data[:num_octets]
496
497 while data_buffer:
498 data_buffer, val = eca.field.getfield(None, data_buffer)
499 vals.append(val)
500
501 omci_msg.fields['data'][eca.field.name] = vals
502 del omci_msg.fields['data'][eca.field.name + '_size']
503 self.log.debug('omcc-got-table-attribute-rows', table_name=eca.field.name,
504 row_count=len(vals))
505 del omci_msg.fields['data']['table_attribute_mask']
506
507 except Exception as e:
508 self.log.exception('get-next-error', e=e)
509 self._tx_request_deferred[frame_index] = None
510 d.errback(failure.Failure(e), high_priority)
511 return
512
513 except IndexError as e:
514 self.log.exception('get-next-index-error', e=e)
515 self._tx_request_deferred[frame_index] = None
516 d.errback(failure.Failure(e), high_priority)
517 return
518
519 # Put it back so the outer Rx/Tx can finish
520 self._tx_request[frame_index] = self._tx_request_deferred[frame_index]
521 self._tx_request_deferred[frame_index] = None
522
523 # Publish Rx event to listeners in a different task
524 if not isinstance(omci_msg, OmciGetNextResponse):
525 reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
526
527 from copy import copy
528 original_callbacks = copy(d.callbacks)
529 self._rx_response[frame_index] = rx_frame
530 d.callback(rx_frame)
531 self.log.debug("finished-processing-get-rx-frame")
532
533 def _decode_unknown_me(self, msg):
534 """
535 Decode an ME for an unsupported class ID. This should only occur for a subset
536 of message types (Get, Set, MIB Upload Next, ...) and they should only be
537 responses as well.
538
539 There are some times below that are commented out. For VOLTHA 2.0, it is
540 expected that any get, set, create, delete for unique (often vendor) MEs
541 will be coded by the ONU utilizing it and supplied to OpenOMCI as a
542 vendor-specific ME during device initialization.
543
544 :param msg: (str) Binary data
545 :return: (OmciFrame) resulting frame
546 """
547 from struct import unpack
548
549 (tid, msg_type, framing) = unpack('!HBB', msg[0:4])
550
551 assert framing == 0xa, 'Only basic OMCI framing supported at this time'
552 msg = msg[4:]
553
554 # TODO: Commented out items below are future work (not expected for VOLTHA v2.0)
555 (msg_class, kwargs) = {
556 # OmciCreateResponse.message_id: (OmciCreateResponse, None),
557 # OmciDeleteResponse.message_id: (OmciDeleteResponse, None),
558 # OmciSetResponse.message_id: (OmciSetResponse, None),
559 # OmciGetResponse.message_id: (OmciGetResponse, None),
560 # OmciGetAllAlarmsNextResponse.message_id: (OmciGetAllAlarmsNextResponse, None),
561 OmciMibUploadNextResponse.message_id: (OmciMibUploadNextResponse,
562 {
563 'entity_class': unpack('!H', msg[0:2])[0],
564 'entity_id': unpack('!H', msg[2:4])[0],
565 'object_entity_class': unpack('!H', msg[4:6])[0],
566 'object_entity_id': unpack('!H', msg[6:8])[0],
567 'object_attributes_mask': unpack('!H', msg[8:10])[0],
568 'object_data': {
569 UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[10:-4])
570 },
571 }),
572 # OmciAlarmNotification.message_id: (OmciAlarmNotification, None),
573 OmciAttributeValueChange.message_id: (OmciAttributeValueChange,
574 {
575 'entity_class': unpack('!H', msg[0:2])[0],
576 'entity_id': unpack('!H', msg[2:4])[0],
577 'data': {
578 UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[4:-8])
579 },
580 }),
581 # OmciTestResult.message_id: (OmciTestResult, None),
582 }.get(msg_type, None)
583
584 if msg_class is None:
585 raise TypeError('Unsupport Message Type for Unknown Decode: {}',
586 msg_type)
587
588 return OmciFrame(transaction_id=tid, message_type=msg_type,
589 omci_message=msg_class(**kwargs))
590
591 def _publish_rx_frame(self, tx_frame, rx_frame):
592 """
593 Notify listeners of successful response frame
594 :param tx_frame: (OmciFrame) Original request frame
595 :param rx_frame: (OmciFrame) Response frame
596 """
597 if self._enabled and isinstance(rx_frame, OmciFrame):
598 frame_type = rx_frame.fields['omci_message'].message_id
599 event_type = OMCI_CC._frame_to_event_type.get(frame_type)
600
601 if event_type is not None:
602 topic = OMCI_CC.event_bus_topic(self._device_id, event_type)
603 msg = {TX_REQUEST_KEY: tx_frame,
604 RX_RESPONSE_KEY: rx_frame}
605
606 self.event_bus.publish(topic=topic, msg=msg)
607
608 def _publish_connectivity_event(self, connected):
609 """
610 Notify listeners of Rx/Tx connectivity over OMCI
611 :param connected: (bool) True if connectivity transitioned from unreachable
612 to reachable
613 """
614 if self._enabled:
615 topic = OMCI_CC.event_bus_topic(self._device_id,
616 RxEvent.Connectivity)
617 msg = {CONNECTED_KEY: connected}
618 self.event_bus.publish(topic=topic, msg=msg)
619
620 def flush(self):
621 """Flush/cancel in active or pending Tx requests"""
622 requests = []
623
624 for priority in {OMCI_CC.HIGH_PRIORITY, OMCI_CC.LOW_PRIORITY}:
625 next_frame, self._tx_request[priority] = self._tx_request[priority], None
626 if next_frame is not None:
627 requests.append((next_frame[OMCI_CC.REQUEST_DEFERRED], next_frame[OMCI_CC.REQUEST_DELAYED_CALL]))
628
629 requests += [(next_frame[OMCI_CC.PENDING_DEFERRED], None)
630 for next_frame in self._pending[priority]]
631 self._pending[priority] = list()
632
633 # Cancel them...
634 def cleanup_unhandled_error(_):
635 pass # So the cancel below does not flag an unhandled error
636
637 for d, dc in requests:
638 if d is not None and not d.called:
639 d.addErrback(cleanup_unhandled_error)
640 d.cancel()
641
642 if dc is not None and not dc.called and not dc.cancelled:
643 dc.cancel()
644
645 def _get_tx_tid(self, high_priority=False):
646 """
647 Get the next Transaction ID for a tx. Note TID=0 is reserved
648 for autonomously generated messages from an ONU
649
650 :return: (int) TID
651 """
652 if self._extended_messaging or not high_priority:
653 index = OMCI_CC.LOW_PRIORITY
654 min_tid = OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY
655 max_tid = OMCI_CC.MAX_OMCI_TX_ID_LOW_PRIORITY
656 else:
657 index = OMCI_CC.HIGH_PRIORITY
658 min_tid = OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY
659 max_tid = OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
660
661 tx_tid, self._tx_tid[index] = self._tx_tid[index], self._tx_tid[index] + 1
662
663 if self._tx_tid[index] > max_tid:
664 self._tx_tid[index] = min_tid
665
666 return tx_tid
667
668 def _request_failure(self, value, tx_tid, high_priority):
669 """
670 Handle a transmit failure. Rx Timeouts are handled on the 'dc' deferred and
671 will call a different method that may retry if requested. This routine
672 will be called after the final (if any) timeout or other error
673
674 :param value: (Failure) Twisted failure
675 :param tx_tid: (int) Associated Tx TID
676 """
677 index = self._get_priority_index(high_priority)
678
679 if self._tx_request[index] is not None:
680 tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
681 tx_frame_tid = tx_frame.fields['transaction_id']
682
683 if tx_frame_tid == tx_tid:
684 timeout = self._tx_request[index][OMCI_CC.REQUEST_TIMEOUT]
685 dc = self._tx_request[index][OMCI_CC.REQUEST_DELAYED_CALL]
686 self._tx_request[index] = None
687
688 if dc is not None and not dc.called and not dc.cancelled:
689 dc.cancel()
690
691 if isinstance(value, failure.Failure):
692 value.trap(CancelledError)
693 self._rx_timeouts += 1
694 self._consecutive_errors += 1
695 if self._consecutive_errors == 1:
696 reactor.callLater(0, self._publish_connectivity_event, False)
697
698 self.log.debug('timeout', tx_id=tx_tid, timeout=timeout)
699 value = failure.Failure(TimeoutError(timeout, "Deferred"))
700 else:
701 # Search pending queue. This may be a cancel coming in from the original
702 # task that requested the Tx. If found, remove
703 # from pending queue
704 for index, request in enumerate(self._pending[index]):
705 req = request.get(OMCI_CC.PENDING_DEFERRED)
706 if req is not None and req.fields['transaction_id'] == tx_tid:
707 self._pending[index].pop(index)
708 break
709
710 self._send_next_request(high_priority)
711 return value
712
713 def _request_success(self, rx_frame, high_priority):
714 """
715 Handle transmit success (a matching Rx was received)
716
717 :param rx_frame: (OmciFrame) OMCI response frame with matching TID
718 :return: (OmciFrame) OMCI response frame with matching TID
719 """
720 index = self._get_priority_index(high_priority)
721
722 if rx_frame is None:
723 rx_frame = self._rx_response[index]
724
725 rx_tid = rx_frame.fields.get('transaction_id')
726
727 if rx_tid is not None:
728 if self._tx_request[index] is not None:
729 tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
730 tx_tid = tx_frame.fields['transaction_id']
731
732 if rx_tid == tx_tid:
733 # Remove this request. Next callback in chain initiates next Tx
734 self._tx_request[index] = None
735 else:
736 self._rx_late += 1
737 else:
738 self._rx_late += 1
739
740 self._send_next_request(high_priority)
741
742 # Return rx_frame (to next item in callback list)
743 return rx_frame
744
745 def _request_timeout(self, tx_tid, high_priority):
746 """
747 Tx Request timed out. Resend immediately if there retries is non-zero. A
748 separate deferred (dc) is used on each actual Tx which is not the deferred
749 (d) that is returned to the caller of the 'send()' method.
750
751 If the timeout if the transmitted frame was zero, this is just cleanup of
752 that transmit request and not necessarily a transmit timeout
753
754 :param tx_tid: (int) TID of frame
755 :param high_priority: (bool) True if high-priority queue
756 """
757 self.log.debug("_request_timeout", tx_tid=tx_tid)
758 index = self._get_priority_index(high_priority)
759
760 if self._tx_request[index] is not None:
761 # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
762 ts, d, frame, timeout, retry, _dc = self._tx_request[index]
763
764 if frame.fields.get('transaction_id', 0) == tx_tid:
765 self._tx_request[index] = None
766
767 if timeout > 0:
768 self._rx_timeouts += 1
769
770 if retry > 0:
771 # Push on front of TX pending queue so that it transmits next with the
772 # original TID
773 self._queue_frame(d, frame, timeout, retry - 1, high_priority, front=True)
774
775 elif not d.called:
776 d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
777 else:
778 self.log.warn('timeout-but-not-the-tx-frame') # Statement mainly for debugging
779
780 self._send_next_request(high_priority)
781
782 def _queue_frame(self, d, frame, timeout, retry, high_priority, front=False):
783 index = self._get_priority_index(high_priority)
784 tx_tuple = (d, frame, timeout, retry) # Pending -> (deferred, tx_frame, timeout, retry)
785
786 if front:
787 self._pending[index].insert(0, tuple)
788 else:
789 self._pending[index].append(tx_tuple)
790
791 # Monitor queue stats
792 qlen = len(self._pending[index])
793
794 if high_priority:
795 if self._max_hp_tx_queue < qlen:
796 self._max_hp_tx_queue = qlen
797
798 elif self._max_lp_tx_queue < qlen:
799 self._max_lp_tx_queue = qlen
800
801 self.log.debug("queue-size", index=index, pending_qlen=qlen)
802
803 def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT, retry=0, high_priority=False):
804 """
805 Queue the OMCI Frame for a transmit to the ONU via the proxy_channel
806
807 :param frame: (OMCIFrame) Message to send
808 :param timeout: (int) Rx Timeout. 0=No response needed
809 :param retry: (int) Additional retry attempts on channel failure, default=0
810 :param high_priority: (bool) High Priority requests
811 :return: (deferred) A deferred that fires when the response frame is received
812 or if an error/timeout occurs
813 """
814 if not self.enabled or self._proxy_address is None:
815 # TODO custom exceptions throughout this code would be helpful
816 self._tx_errors += 1
817 return fail(result=failure.Failure(Exception('OMCI is not enabled')))
818
819 timeout = float(timeout)
820 if timeout > float(MAX_OMCI_REQUEST_AGE):
821 self._tx_errors += 1
822 msg = 'Maximum timeout is {} seconds'.format(MAX_OMCI_REQUEST_AGE)
823 return fail(result=failure.Failure(Exception(msg)))
824
825 if not isinstance(frame, OmciFrame):
826 self._tx_errors += 1
827 msg = "Invalid frame class '{}'".format(type(frame))
828 return fail(result=failure.Failure(Exception(msg)))
829 try:
830 index = self._get_priority_index(high_priority)
831 tx_tid = frame.fields['transaction_id']
832
833 if tx_tid is None:
834 tx_tid = self._get_tx_tid(high_priority=high_priority)
835 frame.fields['transaction_id'] = tx_tid
836
837 assert tx_tid not in self._pending[index], 'TX TID {} is already exists'.format(tx_tid)
838 assert tx_tid > 0, 'Invalid Tx TID: {}'.format(tx_tid)
839
840 # Queue it and request next Tx if tx channel is free
841 d = defer.Deferred()
842
843 self._queue_frame(d, frame, timeout, retry, high_priority, front=False)
844 self._send_next_request(high_priority)
845
846 if timeout == 0:
847 self.log.debug("send-timeout-zero", tx_tid=tx_tid)
848 self.reactor.callLater(0, d.callback, 'queued')
849
850 return d
851
852 except Exception as e:
853 self._tx_errors += 1
854 self._consecutive_errors += 1
855
856 if self._consecutive_errors == 1:
857 self.reactor.callLater(0, self._publish_connectivity_event, False)
858
859 self.log.exception('send-omci', e=e)
860 return fail(result=failure.Failure(e))
861
862 def _ok_to_send(self, tx_request, high_priority):
863 """
864 G.988 specifies not to issue a MIB upload or a Software download request
865 when a similar action is in progress on the other channel. To keep the
866 logic here simple, a new upload/download will not be allowed if either a
867 upload/download is going on
868
869 :param tx_request (OmciFrame) Frame to send
870 :param high_priority: (bool) for queue selection
871 :return: True if okay to dequeue and send frame
872 """
873 other = self._get_priority_index(not high_priority)
874
875 if self._tx_request[other] is None:
876 return True
877
878 this_msg_type = tx_request.fields['message_type'] & 0x1f
879 not_allowed = {OP.MibUpload.value,
880 OP.MibUploadNext.value,
881 OP.StartSoftwareDownload.value,
882 OP.DownloadSection.value,
883 OP.EndSoftwareDownload.value}
884
885 if this_msg_type not in not_allowed:
886 return True
887
888 other_msg_type = self._tx_request[other][OMCI_CC.REQUEST_FRAME].fields['message_type'] & 0x1f
889 return other_msg_type not in not_allowed
890
891 def _send_next_request(self, high_priority):
892 """
893 Pull next tx request and send it
894
895 :param high_priority: (bool) True if this was a high priority request
896 :return: results, so callback chain continues if needed
897 """
898 index = self._get_priority_index(high_priority)
899
900 if self._tx_request[index] is None: # TODO or self._tx_request[index][OMCI_CC.REQUEST_DEFERRED].called:
901 d = None
902 try:
903 if len(self._pending[index]) and \
904 not self._ok_to_send(self._pending[index][0][OMCI_CC.PENDING_FRAME],
905 high_priority):
906 reactor.callLater(0.05, self._send_next_request, high_priority)
907 return
908
909 next_frame = self._pending[index].pop(0)
910
911 d = next_frame[OMCI_CC.PENDING_DEFERRED]
912 frame = next_frame[OMCI_CC.PENDING_FRAME]
913 timeout = next_frame[OMCI_CC.PENDING_TIMEOUT]
914 retry = next_frame[OMCI_CC.PENDING_RETRY]
915
916 tx_tid = frame.fields['transaction_id']
917
918 # NOTE: Since we may need to do an independent ME map on a per-ONU basis
919 # save the current value of the entity_id_to_class_map, then
920 # replace it with our custom one before decode, and then finally
921 # restore it later. Tried other ways but really made the code messy.
922 saved_me_map = omci_entities.entity_id_to_class_map
923 omci_entities.entity_id_to_class_map = self._me_map
924
925 ts = arrow.utcnow().float_timestamp
926 try:
927 self._rx_response[index] = None
928 self._adapter_agent.send_proxied_message(self._proxy_address,
929 hexify(str(frame)))
930 finally:
931 omci_entities.entity_id_to_class_map = saved_me_map
932
933 self._tx_frames += 1
934
935 # Note: the 'd' deferred in the queued request we just got will
936 # already have its success callback queued (callLater -> 0) with a
937 # result of "queued". Here we need time it out internally so
938 # we can call cleanup appropriately. G.988 mentions that most ONUs
939 # will process an request in < 1 second.
940 dc_timeout = timeout if timeout > 0 else 1.0
941
942 # Timeout on internal deferred to support internal retries if requested
943 dc = self.reactor.callLater(dc_timeout, self._request_timeout, tx_tid, high_priority)
944
945 # (timestamp, defer, frame, timeout, retry, delayedCall)
946 self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
947
948 if timeout > 0:
949 d.addCallbacks(self._request_success, self._request_failure,
950 callbackArgs=(high_priority,),
951 errbackArgs=(tx_tid, high_priority))
952
953 except IndexError:
954 pass # Nothing pending in this queue
955
956 except Exception as e:
957 self.log.exception('send-proxy-exception', e=e)
958 self._tx_request[index] = None
959 self.reactor.callLater(0, self._send_next_request, high_priority)
960
961 if d is not None:
962 d.errback(failure.Failure(e))
963 else:
964 self.log.debug("tx-request-occupied", index=index)
965
966 ###################################################################################
967 # MIB Action shortcuts
968
969 def send_mib_reset(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
970 """
971 Perform a MIB Reset
972 """
973 self.log.debug('send-mib-reset')
974
975 frame = OntDataFrame().mib_reset()
976 return self.send(frame, timeout=timeout, high_priority=high_priority)
977
978 def send_mib_upload(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
979 self.log.debug('send-mib-upload')
980
981 frame = OntDataFrame().mib_upload()
982 return self.send(frame, timeout=timeout, high_priority=high_priority)
983
984 def send_mib_upload_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
985 self.log.debug('send-mib-upload-next')
986
987 frame = OntDataFrame(sequence_number=seq_no).mib_upload_next()
988 return self.send(frame, timeout=timeout, high_priority=high_priority)
989
990 def send_reboot(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
991 """
992 Send an ONU Device reboot request (ONU-G ME).
993
994 NOTICE: This method is being deprecated and replaced with a tasks to preform this function
995 """
996 self.log.debug('send-mib-reboot')
997
998 frame = OntGFrame().reboot()
999 return self.send(frame, timeout=timeout, high_priority=high_priority)
1000
1001 def send_get_all_alarm(self, alarm_retrieval_mode=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
1002 self.log.debug('send_get_alarm')
1003
1004 frame = OntDataFrame().get_all_alarm(alarm_retrieval_mode)
1005 return self.send(frame, timeout=timeout, high_priority=high_priority)
1006
1007 def send_get_all_alarm_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
1008 self.log.debug('send_get_alarm_next')
1009
1010 frame = OntDataFrame().get_all_alarm_next(seq_no)
1011 return self.send(frame, timeout=timeout, high_priority=high_priority)
1012
1013 def send_start_software_download(self, image_inst_id, image_size, window_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
1014 frame = SoftwareImageFrame(image_inst_id).start_software_download(image_size, window_size-1)
1015 return self.send(frame, timeout, 3, high_priority=high_priority)
1016
1017 def send_download_section(self, image_inst_id, section_num, data, size=DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE, timeout=0, high_priority=False):
1018 """
1019 # timeout=0 indicates no repons needed
1020 """
1021 # self.log.debug("send_download_section", instance_id=image_inst_id, section=section_num, timeout=timeout)
1022 if timeout > 0:
1023 frame = SoftwareImageFrame(image_inst_id).download_section(True, section_num, data)
1024 else:
1025 frame = SoftwareImageFrame(image_inst_id).download_section(False, section_num, data)
1026 return self.send(frame, timeout, high_priority=high_priority)
1027
1028 # if timeout > 0:
1029 # self.reactor.callLater(0, self.sim_receive_download_section_resp,
1030 # frame.fields["transaction_id"],
1031 # frame.fields["omci_message"].fields["section_number"])
1032 # return d
1033
1034 def send_end_software_download(self, image_inst_id, crc32, image_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
1035 frame = SoftwareImageFrame(image_inst_id).end_software_download(crc32, image_size)
1036 return self.send(frame, timeout, high_priority=high_priority)
1037 # self.reactor.callLater(0, self.sim_receive_end_software_download_resp, frame.fields["transaction_id"])
1038 # return d
1039
1040 def send_active_image(self, image_inst_id, flag=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
1041 frame = SoftwareImageFrame(image_inst_id).activate_image(flag)
1042 return self.send(frame, timeout, high_priority=high_priority)
1043
1044 def send_commit_image(self, image_inst_id, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
1045 frame = SoftwareImageFrame(image_inst_id).commit_image()
1046 return self.send(frame, timeout, high_priority=high_priority)
1047