blob: e7f306b1e41af9d1be63d3470d9ffc824d83ed04 [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""
17OMCI Message support
18"""
19
20import sys
21import arrow
22from twisted.internet import reactor, defer
23from twisted.internet.defer import TimeoutError, CancelledError, failure, fail, succeed, inlineCallbacks
24from pyvoltha.adapters.common.frameio.frameio import hexify
25from pyvoltha.adapters.extensions.omci.omci import *
26from pyvoltha.adapters.extensions.omci.omci_me import OntGFrame, OntDataFrame, SoftwareImageFrame
27from pyvoltha.adapters.extensions.omci.me_frame import MEFrame
28from pyvoltha.adapters.extensions.omci.omci_defs import EntityOperations, ReasonCodes
29from pyvoltha.adapters.extensions.omci.omci_entities import entity_id_to_class_map
30from pyvoltha.common.event_bus import EventBusClient
Matt Jeanneret35c42d22019-03-07 17:23:27 -050031from voltha_protos.inter_container_pb2 import InterAdapterMessageType, InterAdapterOmciMessage
Chip Boling67b674a2019-02-08 11:42:18 -060032from enum import IntEnum
33from binascii import hexlify
34
35
def hexify(buffer):
    """
    Return a hexadecimal string encoding of input buffer

    Iterating a Python 2 ``str`` or a text string yields 1-character strings,
    while iterating Python 3 ``bytes``/``bytearray`` yields integers. Both
    forms are accepted.

    :param buffer: (str, bytes or bytearray) Data to encode
    :return: (str) Two lowercase hexadecimal digits per input element
    """
    return ''.join('%02x' % (c if isinstance(c, int) else ord(c)) for c in buffer)
39
40
# Default Rx timeout applied by OMCI_CC.send() and the send_* shortcuts
DEFAULT_OMCI_TIMEOUT = 10               # 3    # Seconds
# Largest timeout OMCI_CC.send() accepts; longer requests are rejected
MAX_OMCI_REQUEST_AGE = 60               # Seconds
DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE = 31 # Bytes

# Keys of the message dictionaries published on the event bus
CONNECTED_KEY = 'connected'             # Connectivity events: (bool) reachable
TX_REQUEST_KEY = 'tx-request'           # Original request frame (None for autonomous ONU frames)
RX_RESPONSE_KEY = 'rx-response'         # Received frame
# Attribute name holding the hex-encoded payload of an ME with no decode definition
UNKNOWN_CLASS_ATTRIBUTE_KEY = 'voltha-unknown-blob'
49
50
class OmciCCRxEvents(IntEnum):
    """
    Receive-side events published on the event bus by the OMCI
    Communications Channel. The member name is used to build the
    event-bus topic string.
    """
    # NOTE: no trailing commas here. A trailing comma turns the value into a
    #       1-tuple, which only ends up as an int because of the IntEnum mix-in.
    AVC_Notification = 0
    MIB_Upload = 1
    MIB_Upload_Next = 2
    Create = 3
    Delete = 4
    Set = 5
    Alarm_Notification = 6
    Test_Result = 7
    MIB_Reset = 8
    Connectivity = 9
    Get_ALARM_Get = 10
    Get_ALARM_Get_Next = 11
64
65
# abbreviations (short module-level aliases used throughout this file)
OP = EntityOperations           # OMCI message/operation types
RxEvent = OmciCCRxEvents        # Event-bus receive events defined above
69
70
class OMCI_CC(object):
    """
    Handle OMCI Communication Channel specifics for Adtran ONUs

    Requests are queued per priority level and only one request per level is
    outstanding at a time. Responses are matched to the outstanding request
    by Transaction ID (TID) and published on the event bus.
    """

    # TID ranges. TID 0 is reserved for frames autonomously generated by the
    # ONU; the upper half of the range identifies high priority requests.
    MIN_OMCI_TX_ID_LOW_PRIORITY = 0x0001  # 2 Octets max
    MAX_OMCI_TX_ID_LOW_PRIORITY = 0x7FFF  # 2 Octets max
    MIN_OMCI_TX_ID_HIGH_PRIORITY = 0x8000  # 2 Octets max
    MAX_OMCI_TX_ID_HIGH_PRIORITY = 0xFFFF  # 2 Octets max

    # Indexes into the per-priority lists (_tx_tid, _tx_request, _pending, _rx_response)
    LOW_PRIORITY = 0
    HIGH_PRIORITY = 1

    # Offset into some tuples for pending lists and tx in progress

    # Pending queue entry -> (deferred, tx_frame, timeout, retry)
    PENDING_DEFERRED = 0
    PENDING_FRAME = 1
    PENDING_TIMEOUT = 2
    PENDING_RETRY = 3

    # Tx in progress -> (timestamp, deferred, tx_frame, timeout, retry, delayedCall)
    REQUEST_TIMESTAMP = 0
    REQUEST_DEFERRED = 1
    REQUEST_FRAME = 2
    REQUEST_TIMEOUT = 3
    REQUEST_RETRY = 4
    REQUEST_DELAYED_CALL = 5

    # Response message ID -> event published by _publish_rx_frame(). Responses
    # whose message ID is not listed here are not published.
    _frame_to_event_type = {
        OmciMibResetResponse.message_id: RxEvent.MIB_Reset,
        OmciMibUploadResponse.message_id: RxEvent.MIB_Upload,
        OmciMibUploadNextResponse.message_id: RxEvent.MIB_Upload_Next,
        OmciCreateResponse.message_id: RxEvent.Create,
        OmciDeleteResponse.message_id: RxEvent.Delete,
        OmciSetResponse.message_id: RxEvent.Set,
        OmciGetAllAlarmsResponse.message_id: RxEvent.Get_ALARM_Get,
        OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
    }
104
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500105 def __init__(self, core_proxy, adapter_proxy, device_id, me_map=None,
Chip Boling67b674a2019-02-08 11:42:18 -0600106 clock=None):
107 self.log = structlog.get_logger(device_id=device_id)
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500108 self._core_proxy = core_proxy
109 self._adapter_proxy = adapter_proxy
Chip Boling67b674a2019-02-08 11:42:18 -0600110 self._device_id = device_id
111 self._proxy_address = None
112 self._enabled = False
113 self._extended_messaging = False
114 self._me_map = me_map
115 if clock is None:
116 self.reactor = reactor
117 else:
118 self.reactor = clock
119
120 # Support 2 levels of priority since only baseline message set supported
121 self._tx_tid = [OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY, OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY]
122 self._tx_request = [None, None] # Tx in progress (timestamp, defer, frame, timeout, retry, delayedCall)
123 self._pending = [list(), list()] # pending queue (deferred, tx_frame, timeout, retry)
124 self._rx_response = [None, None]
125
126 # Statistics
127 self._tx_frames = 0
128 self._rx_frames = 0
129 self._rx_unknown_tid = 0 # Rx OMCI with no Tx TID match
130 self._rx_onu_frames = 0 # Autonomously generated ONU frames
131 self._rx_onu_discards = 0 # Autonomously generated ONU unknown message types
132 self._rx_timeouts = 0
133 self._rx_late = 0 # Frame response received after timeout on Tx
134 self._rx_unknown_me = 0 # Number of managed entities Rx without a decode definition
135 self._tx_errors = 0 # Exceptions during tx request
136 self._consecutive_errors = 0 # Rx & Tx errors in a row, a good RX resets this to 0
137 self._reply_min = sys.maxint # Fastest successful tx -> rx
138 self._reply_max = 0 # Longest successful tx -> rx
139 self._reply_sum = 0.0 # Total seconds for successful tx->rx (float for average)
140 self._max_hp_tx_queue = 0 # Maximum size of high priority tx pending queue
141 self._max_lp_tx_queue = 0 # Maximum size of low priority tx pending queue
142
143 self.event_bus = EventBusClient()
144
145 # If a list of custom ME Entities classes were provided, insert them into
146 # main class_id to entity map.
147 # TODO: If this class becomes hidden from the ONU DA, move this to the OMCI State Machine runner
148
149 def __str__(self):
150 return "OMCISupport: {}".format(self._device_id)
151
152 def _get_priority_index(self, high_priority):
153 """ Centralized logic to help make extended message support easier in the future"""
154 return OMCI_CC.HIGH_PRIORITY if high_priority and not self._extended_messaging \
155 else OMCI_CC.LOW_PRIORITY
156
157 def _tid_is_high_priority(self, tid):
158 """ Centralized logic to help make extended message support easier in the future"""
159
160 return not self._extended_messaging and \
161 OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY <= tid <= OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
162
163 @staticmethod
164 def event_bus_topic(device_id, event):
165 """
166 Get the topic name for a given event Frame Type
167 :param device_id: (str) ONU Device ID
168 :param event: (OmciCCRxEvents) Type of event
169 :return: (str) Topic string
170 """
171 assert event in OmciCCRxEvents, \
172 'Event {} is not an OMCI-CC Rx Event'.format(event.name)
173
174 return 'omci-rx:{}:{}'.format(device_id, event.name)
175
176 @property
177 def enabled(self):
178 return self._enabled
179
180 @enabled.setter
181 def enabled(self, value):
182 """
183 Enable/disable the OMCI Communications Channel
184
185 :param value: (boolean) True to enable, False to disable
186 """
187 assert isinstance(value, bool), 'enabled is a boolean'
188
189 if self._enabled != value:
190 self._enabled = value
191 if self._enabled:
192 self._start()
193 else:
194 self._stop()
195
196 @property
197 def tx_frames(self):
198 return self._tx_frames
199
200 @property
201 def rx_frames(self):
202 return self._rx_frames
203
204 @property
205 def rx_unknown_tid(self):
206 return self._rx_unknown_tid # Tx TID not found
207
208 @property
209 def rx_unknown_me(self):
210 return self._rx_unknown_me
211
212 @property
213 def rx_onu_frames(self):
214 return self._rx_onu_frames
215
216 @property
217 def rx_onu_discards(self):
218 return self._rx_onu_discards # Attribute Value change autonomous overflows
219
220 @property
221 def rx_timeouts(self):
222 return self._rx_timeouts
223
224 @property
225 def rx_late(self):
226 return self._rx_late
227
228 @property
229 def tx_errors(self):
230 return self._tx_errors
231
232 @property
233 def consecutive_errors(self):
234 return self._consecutive_errors
235
236 @property
237 def reply_min(self):
238 return int(round(self._reply_min * 1000.0)) # Milliseconds
239
240 @property
241 def reply_max(self):
242 return int(round(self._reply_max * 1000.0)) # Milliseconds
243
244 @property
245 def reply_average(self):
246 avg = self._reply_sum / self._rx_frames if self._rx_frames > 0 else 0.0
247 return int(round(avg * 1000.0)) # Milliseconds
248
249 @property
250 def hp_tx_queue_len(self):
251 return len(self._pending[OMCI_CC.HIGH_PRIORITY])
252
253 @property
254 def lp_tx_queue_len(self):
255 return len(self._pending[OMCI_CC.LOW_PRIORITY])
256
257 @property
258 def max_hp_tx_queue(self):
259 return self._max_hp_tx_queue
260
261 @property
262 def max_lp_tx_queue(self):
263 return self._max_lp_tx_queue
264
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500265 @inlineCallbacks
Chip Boling67b674a2019-02-08 11:42:18 -0600266 def _start(self):
267 """
268 Start the OMCI Communications Channel
269 """
270 assert self._enabled, 'Start should only be called if enabled'
271 self.flush()
272
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500273 self._device = yield self._core_proxy.get_device(self._device_id)
274 self._proxy_address = self._device.proxy_address
Chip Boling67b674a2019-02-08 11:42:18 -0600275
276 def _stop(self):
277 """
278 Stop the OMCI Communications Channel
279 """
280 assert not self._enabled, 'Stop should only be called if disabled'
281 self.flush()
282 self._proxy_address = None
283
284 def _receive_onu_message(self, rx_frame):
285 """ Autonomously generated ONU frame Rx handler"""
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400286 self.log.debug('rx-onu-frame', frame_type=type(rx_frame))
Chip Boling67b674a2019-02-08 11:42:18 -0600287
288 msg_type = rx_frame.fields['message_type']
289 self._rx_onu_frames += 1
290
291 msg = {TX_REQUEST_KEY: None,
292 RX_RESPONSE_KEY: rx_frame}
293
294 if msg_type == EntityOperations.AlarmNotification.value:
295 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Alarm_Notification)
296 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
297
298 elif msg_type == EntityOperations.AttributeValueChange.value:
299 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.AVC_Notification)
300 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
301
302 elif msg_type == EntityOperations.TestResult.value:
303 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Test_Result)
304 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
305
306 else:
307 self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
308 self._rx_onu_discards += 1
309
310 def _update_rx_tx_stats(self, now, ts):
311 ts_diff = now - arrow.Arrow.utcfromtimestamp(ts)
312 secs = ts_diff.total_seconds()
313 self._reply_sum += secs
314 if secs < self._reply_min:
315 self._reply_min = secs
316 if secs > self._reply_max:
317 self._reply_max = secs
318 return secs
319
320 def receive_message(self, msg):
321 """
322 Receive and OMCI message from the proxy channel to the OLT.
323
324 Call this from your ONU Adapter on a new OMCI Rx on the proxy channel
325 :param msg: (str) OMCI binary message (used as input to Scapy packet decoder)
326 """
327 if not self.enabled:
328 return
329
330 try:
331 now = arrow.utcnow()
332 d = None
333
334 # NOTE: Since we may need to do an independent ME map on a per-ONU basis
335 # save the current value of the entity_id_to_class_map, then
336 # replace it with our custom one before decode, and then finally
337 # restore it later. Tried other ways but really made the code messy.
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400338 saved_me_map = omci_entities.entity_id_to_class_map
339 omci_entities.entity_id_to_class_map = self._me_map
Chip Boling67b674a2019-02-08 11:42:18 -0600340
341 try:
342 rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
Chip Boling67b674a2019-02-08 11:42:18 -0600343
344 except KeyError as e:
345 # Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
346 self.log.debug('frame-decode-key-error', msg=hexlify(msg), e=e)
347 rx_frame = self._decode_unknown_me(msg)
348 self._rx_unknown_me += 1
Chip Boling67b674a2019-02-08 11:42:18 -0600349
350 except Exception as e:
351 self.log.exception('frame-decode', msg=hexlify(msg), e=e)
352 return
353
354 finally:
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400355 omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
356
357 rx_tid = rx_frame.fields['transaction_id']
358 if rx_tid == 0:
359 return self._receive_onu_message(rx_frame)
360
361 # Previously unreachable if this is the very first round-trip Rx or we
362 # have been running consecutive errors
363 if self._rx_frames == 0 or self._consecutive_errors != 0:
364 self.reactor.callLater(0, self._publish_connectivity_event, True)
365
366 self._rx_frames += 1
367 self._consecutive_errors = 0
Chip Boling67b674a2019-02-08 11:42:18 -0600368
369 try:
370 high_priority = self._tid_is_high_priority(rx_tid)
371 index = self._get_priority_index(high_priority)
372
373 # (timestamp, defer, frame, timeout, retry, delayedCall)
374 last_tx_tuple = self._tx_request[index]
375
376 if last_tx_tuple is None or \
377 last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
378 # Possible late Rx on a message that timed-out
379 self._rx_unknown_tid += 1
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400380 self._rx_late += 1
Chip Boling67b674a2019-02-08 11:42:18 -0600381 return
382
383 ts, d, tx_frame, timeout, retry, dc = last_tx_tuple
384 if dc is not None and not dc.cancelled and not dc.called:
385 dc.cancel()
Chip Boling67b674a2019-02-08 11:42:18 -0600386
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400387 _secs = self._update_rx_tx_stats(now, ts)
Chip Boling67b674a2019-02-08 11:42:18 -0600388
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400389 # Late arrival already serviced by a timeout?
Chip Boling67b674a2019-02-08 11:42:18 -0600390 if d.called:
391 self._rx_late += 1
392 return
393
394 except Exception as e:
395 self.log.exception('frame-match', msg=hexlify(msg), e=e)
396 if d is not None:
397 return d.errback(failure.Failure(e))
398 return
399
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400400 # Publish Rx event to listeners in a different task
401 reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
Chip Boling67b674a2019-02-08 11:42:18 -0600402
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400403 # begin success callback chain (will cancel timeout and queue next Tx message)
404 self._rx_response[index] = rx_frame
405 d.callback(rx_frame)
Chip Boling67b674a2019-02-08 11:42:18 -0600406
407 except Exception as e:
408 self.log.exception('rx-msg', e=e)
409
Chip Boling67b674a2019-02-08 11:42:18 -0600410 def _decode_unknown_me(self, msg):
411 """
412 Decode an ME for an unsupported class ID. This should only occur for a subset
413 of message types (Get, Set, MIB Upload Next, ...) and they should only be
414 responses as well.
415
416 There are some times below that are commented out. For VOLTHA 2.0, it is
417 expected that any get, set, create, delete for unique (often vendor) MEs
418 will be coded by the ONU utilizing it and supplied to OpenOMCI as a
419 vendor-specific ME during device initialization.
420
421 :param msg: (str) Binary data
422 :return: (OmciFrame) resulting frame
423 """
424 from struct import unpack
425
426 (tid, msg_type, framing) = unpack('!HBB', msg[0:4])
427
428 assert framing == 0xa, 'Only basic OMCI framing supported at this time'
429 msg = msg[4:]
430
431 # TODO: Commented out items below are future work (not expected for VOLTHA v2.0)
432 (msg_class, kwargs) = {
433 # OmciCreateResponse.message_id: (OmciCreateResponse, None),
434 # OmciDeleteResponse.message_id: (OmciDeleteResponse, None),
435 # OmciSetResponse.message_id: (OmciSetResponse, None),
436 # OmciGetResponse.message_id: (OmciGetResponse, None),
437 # OmciGetAllAlarmsNextResponse.message_id: (OmciGetAllAlarmsNextResponse, None),
438 OmciMibUploadNextResponse.message_id: (OmciMibUploadNextResponse,
439 {
440 'entity_class': unpack('!H', msg[0:2])[0],
441 'entity_id': unpack('!H', msg[2:4])[0],
442 'object_entity_class': unpack('!H', msg[4:6])[0],
443 'object_entity_id': unpack('!H', msg[6:8])[0],
444 'object_attributes_mask': unpack('!H', msg[8:10])[0],
445 'object_data': {
446 UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[10:-4])
447 },
448 }),
449 # OmciAlarmNotification.message_id: (OmciAlarmNotification, None),
450 OmciAttributeValueChange.message_id: (OmciAttributeValueChange,
451 {
452 'entity_class': unpack('!H', msg[0:2])[0],
453 'entity_id': unpack('!H', msg[2:4])[0],
454 'data': {
455 UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[4:-8])
456 },
457 }),
458 # OmciTestResult.message_id: (OmciTestResult, None),
459 }.get(msg_type, None)
460
461 if msg_class is None:
462 raise TypeError('Unsupport Message Type for Unknown Decode: {}',
463 msg_type)
464
465 return OmciFrame(transaction_id=tid, message_type=msg_type,
466 omci_message=msg_class(**kwargs))
467
468 def _publish_rx_frame(self, tx_frame, rx_frame):
469 """
470 Notify listeners of successful response frame
471 :param tx_frame: (OmciFrame) Original request frame
472 :param rx_frame: (OmciFrame) Response frame
473 """
474 if self._enabled and isinstance(rx_frame, OmciFrame):
475 frame_type = rx_frame.fields['omci_message'].message_id
476 event_type = OMCI_CC._frame_to_event_type.get(frame_type)
477
478 if event_type is not None:
479 topic = OMCI_CC.event_bus_topic(self._device_id, event_type)
480 msg = {TX_REQUEST_KEY: tx_frame,
481 RX_RESPONSE_KEY: rx_frame}
482
483 self.event_bus.publish(topic=topic, msg=msg)
484
485 def _publish_connectivity_event(self, connected):
486 """
487 Notify listeners of Rx/Tx connectivity over OMCI
488 :param connected: (bool) True if connectivity transitioned from unreachable
489 to reachable
490 """
491 if self._enabled:
492 topic = OMCI_CC.event_bus_topic(self._device_id,
493 RxEvent.Connectivity)
494 msg = {CONNECTED_KEY: connected}
495 self.event_bus.publish(topic=topic, msg=msg)
496
497 def flush(self):
498 """Flush/cancel in active or pending Tx requests"""
499 requests = []
500
501 for priority in {OMCI_CC.HIGH_PRIORITY, OMCI_CC.LOW_PRIORITY}:
502 next_frame, self._tx_request[priority] = self._tx_request[priority], None
503 if next_frame is not None:
504 requests.append((next_frame[OMCI_CC.REQUEST_DEFERRED], next_frame[OMCI_CC.REQUEST_DELAYED_CALL]))
505
506 requests += [(next_frame[OMCI_CC.PENDING_DEFERRED], None)
507 for next_frame in self._pending[priority]]
508 self._pending[priority] = list()
509
510 # Cancel them...
511 def cleanup_unhandled_error(_):
512 pass # So the cancel below does not flag an unhandled error
513
514 for d, dc in requests:
515 if d is not None and not d.called:
516 d.addErrback(cleanup_unhandled_error)
517 d.cancel()
518
519 if dc is not None and not dc.called and not dc.cancelled:
520 dc.cancel()
521
522 def _get_tx_tid(self, high_priority=False):
523 """
524 Get the next Transaction ID for a tx. Note TID=0 is reserved
525 for autonomously generated messages from an ONU
526
527 :return: (int) TID
528 """
529 if self._extended_messaging or not high_priority:
530 index = OMCI_CC.LOW_PRIORITY
531 min_tid = OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY
532 max_tid = OMCI_CC.MAX_OMCI_TX_ID_LOW_PRIORITY
533 else:
534 index = OMCI_CC.HIGH_PRIORITY
535 min_tid = OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY
536 max_tid = OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
537
538 tx_tid, self._tx_tid[index] = self._tx_tid[index], self._tx_tid[index] + 1
539
540 if self._tx_tid[index] > max_tid:
541 self._tx_tid[index] = min_tid
542
543 return tx_tid
544
545 def _request_failure(self, value, tx_tid, high_priority):
546 """
547 Handle a transmit failure. Rx Timeouts are handled on the 'dc' deferred and
548 will call a different method that may retry if requested. This routine
549 will be called after the final (if any) timeout or other error
550
551 :param value: (Failure) Twisted failure
552 :param tx_tid: (int) Associated Tx TID
553 """
554 index = self._get_priority_index(high_priority)
555
556 if self._tx_request[index] is not None:
557 tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
558 tx_frame_tid = tx_frame.fields['transaction_id']
559
560 if tx_frame_tid == tx_tid:
561 timeout = self._tx_request[index][OMCI_CC.REQUEST_TIMEOUT]
562 dc = self._tx_request[index][OMCI_CC.REQUEST_DELAYED_CALL]
563 self._tx_request[index] = None
564
565 if dc is not None and not dc.called and not dc.cancelled:
566 dc.cancel()
567
568 if isinstance(value, failure.Failure):
569 value.trap(CancelledError)
570 self._rx_timeouts += 1
571 self._consecutive_errors += 1
572 if self._consecutive_errors == 1:
573 reactor.callLater(0, self._publish_connectivity_event, False)
574
575 self.log.debug('timeout', tx_id=tx_tid, timeout=timeout)
576 value = failure.Failure(TimeoutError(timeout, "Deferred"))
577 else:
578 # Search pending queue. This may be a cancel coming in from the original
579 # task that requested the Tx. If found, remove
580 # from pending queue
581 for index, request in enumerate(self._pending[index]):
582 req = request.get(OMCI_CC.PENDING_DEFERRED)
583 if req is not None and req.fields['transaction_id'] == tx_tid:
584 self._pending[index].pop(index)
585 break
586
587 self._send_next_request(high_priority)
588 return value
589
590 def _request_success(self, rx_frame, high_priority):
591 """
592 Handle transmit success (a matching Rx was received)
593
594 :param rx_frame: (OmciFrame) OMCI response frame with matching TID
595 :return: (OmciFrame) OMCI response frame with matching TID
596 """
597 index = self._get_priority_index(high_priority)
598
599 if rx_frame is None:
600 rx_frame = self._rx_response[index]
601
602 rx_tid = rx_frame.fields.get('transaction_id')
603
604 if rx_tid is not None:
605 if self._tx_request[index] is not None:
606 tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
607 tx_tid = tx_frame.fields['transaction_id']
608
609 if rx_tid == tx_tid:
610 # Remove this request. Next callback in chain initiates next Tx
611 self._tx_request[index] = None
612 else:
613 self._rx_late += 1
614 else:
615 self._rx_late += 1
616
617 self._send_next_request(high_priority)
618
619 # Return rx_frame (to next item in callback list)
620 return rx_frame
621
622 def _request_timeout(self, tx_tid, high_priority):
623 """
624 Tx Request timed out. Resend immediately if there retries is non-zero. A
625 separate deferred (dc) is used on each actual Tx which is not the deferred
626 (d) that is returned to the caller of the 'send()' method.
627
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400628 If the timeout if the transmitted frame was zero, this is just cleanup of
629 that transmit request and not necessarily a transmit timeout
630
Chip Boling67b674a2019-02-08 11:42:18 -0600631 :param tx_tid: (int) TID of frame
632 :param high_priority: (bool) True if high-priority queue
633 """
634 self.log.debug("_request_timeout", tx_tid=tx_tid)
635 index = self._get_priority_index(high_priority)
636
637 if self._tx_request[index] is not None:
638 # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
639 ts, d, frame, timeout, retry, _dc = self._tx_request[index]
640
641 if frame.fields.get('transaction_id', 0) == tx_tid:
642 self._tx_request[index] = None
643
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400644 if timeout > 0:
645 self._rx_timeouts += 1
646
647 if retry > 0:
648 # Push on front of TX pending queue so that it transmits next with the
649 # original TID
650 self._queue_frame(d, frame, timeout, retry - 1, high_priority, front=True)
651
652 elif not d.called:
653 d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
654 else:
655 self.log.warn('timeout-but-not-the-tx-frame') # Statement mainly for debugging
Chip Boling67b674a2019-02-08 11:42:18 -0600656
657 self._send_next_request(high_priority)
658
659 def _queue_frame(self, d, frame, timeout, retry, high_priority, front=False):
660 index = self._get_priority_index(high_priority)
661 tx_tuple = (d, frame, timeout, retry) # Pending -> (deferred, tx_frame, timeout, retry)
662
663 if front:
664 self._pending[index].insert(0, tuple)
665 else:
666 self._pending[index].append(tx_tuple)
667
668 # Monitor queue stats
669 qlen = len(self._pending[index])
670
671 if high_priority:
672 if self._max_hp_tx_queue < qlen:
673 self._max_hp_tx_queue = qlen
674
675 elif self._max_lp_tx_queue < qlen:
676 self._max_lp_tx_queue = qlen
677
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400678 self.log.debug("queue-size", index=index, pending_qlen=qlen)
679
Chip Boling67b674a2019-02-08 11:42:18 -0600680 def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT, retry=0, high_priority=False):
681 """
682 Queue the OMCI Frame for a transmit to the ONU via the proxy_channel
683
684 :param frame: (OMCIFrame) Message to send
685 :param timeout: (int) Rx Timeout. 0=No response needed
686 :param retry: (int) Additional retry attempts on channel failure, default=0
687 :param high_priority: (bool) High Priority requests
688 :return: (deferred) A deferred that fires when the response frame is received
689 or if an error/timeout occurs
690 """
691 if not self.enabled or self._proxy_address is None:
692 # TODO custom exceptions throughout this code would be helpful
693 self._tx_errors += 1
694 return fail(result=failure.Failure(Exception('OMCI is not enabled')))
695
696 timeout = float(timeout)
697 if timeout > float(MAX_OMCI_REQUEST_AGE):
698 self._tx_errors += 1
699 msg = 'Maximum timeout is {} seconds'.format(MAX_OMCI_REQUEST_AGE)
700 return fail(result=failure.Failure(Exception(msg)))
701
702 if not isinstance(frame, OmciFrame):
703 self._tx_errors += 1
704 msg = "Invalid frame class '{}'".format(type(frame))
705 return fail(result=failure.Failure(Exception(msg)))
706 try:
707 index = self._get_priority_index(high_priority)
708 tx_tid = frame.fields['transaction_id']
709
710 if tx_tid is None:
711 tx_tid = self._get_tx_tid(high_priority=high_priority)
712 frame.fields['transaction_id'] = tx_tid
713
714 assert tx_tid not in self._pending[index], 'TX TID {} is already exists'.format(tx_tid)
715 assert tx_tid > 0, 'Invalid Tx TID: {}'.format(tx_tid)
716
717 # Queue it and request next Tx if tx channel is free
718 d = defer.Deferred()
719
720 self._queue_frame(d, frame, timeout, retry, high_priority, front=False)
721 self._send_next_request(high_priority)
722
723 if timeout == 0:
724 self.log.debug("send-timeout-zero", tx_tid=tx_tid)
725 self.reactor.callLater(0, d.callback, 'queued')
726
727 return d
728
729 except Exception as e:
730 self._tx_errors += 1
731 self._consecutive_errors += 1
732
733 if self._consecutive_errors == 1:
734 self.reactor.callLater(0, self._publish_connectivity_event, False)
735
736 self.log.exception('send-omci', e=e)
737 return fail(result=failure.Failure(e))
738
739 def _ok_to_send(self, tx_request, high_priority):
740 """
741 G.988 specifies not to issue a MIB upload or a Software download request
742 when a similar action is in progress on the other channel. To keep the
743 logic here simple, a new upload/download will not be allowed if either a
744 upload/download is going on
745
746 :param tx_request (OmciFrame) Frame to send
747 :param high_priority: (bool) for queue selection
748 :return: True if okay to dequeue and send frame
749 """
750 other = self._get_priority_index(not high_priority)
751
752 if self._tx_request[other] is None:
753 return True
754
755 this_msg_type = tx_request.fields['message_type'] & 0x1f
756 not_allowed = {OP.MibUpload.value,
757 OP.MibUploadNext.value,
758 OP.StartSoftwareDownload.value,
759 OP.DownloadSection.value,
760 OP.EndSoftwareDownload.value}
761
762 if this_msg_type not in not_allowed:
763 return True
764
765 other_msg_type = self._tx_request[other][OMCI_CC.REQUEST_FRAME].fields['message_type'] & 0x1f
766 return other_msg_type not in not_allowed
767
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500768 @inlineCallbacks
Chip Boling67b674a2019-02-08 11:42:18 -0600769 def _send_next_request(self, high_priority):
770 """
771 Pull next tx request and send it
772
773 :param high_priority: (bool) True if this was a high priority request
774 :return: results, so callback chain continues if needed
775 """
776 index = self._get_priority_index(high_priority)
777
778 if self._tx_request[index] is None: # TODO or self._tx_request[index][OMCI_CC.REQUEST_DEFERRED].called:
779 d = None
780 try:
781 if len(self._pending[index]) and \
782 not self._ok_to_send(self._pending[index][0][OMCI_CC.PENDING_FRAME],
783 high_priority):
784 reactor.callLater(0.05, self._send_next_request, high_priority)
785 return
786
787 next_frame = self._pending[index].pop(0)
788
789 d = next_frame[OMCI_CC.PENDING_DEFERRED]
790 frame = next_frame[OMCI_CC.PENDING_FRAME]
791 timeout = next_frame[OMCI_CC.PENDING_TIMEOUT]
792 retry = next_frame[OMCI_CC.PENDING_RETRY]
793
794 tx_tid = frame.fields['transaction_id']
795
796 # NOTE: Since we may need to do an independent ME map on a per-ONU basis
797 # save the current value of the entity_id_to_class_map, then
798 # replace it with our custom one before decode, and then finally
799 # restore it later. Tried other ways but really made the code messy.
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400800 saved_me_map = omci_entities.entity_id_to_class_map
801 omci_entities.entity_id_to_class_map = self._me_map
Chip Boling67b674a2019-02-08 11:42:18 -0600802
803 ts = arrow.utcnow().float_timestamp
804 try:
805 self._rx_response[index] = None
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500806
Mahir Gunyeld40c88d2019-05-16 14:09:42 -0700807 omci_msg = InterAdapterOmciMessage(
808 message=hexify(str(frame)),
809 proxy_address=self._proxy_address,
810 connect_status=self._device.connect_status)
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500811
Mahir Gunyeld40c88d2019-05-16 14:09:42 -0700812 self.log.debug('inter-adapter-send-omci', omci_msg=omci_msg,
813 connect_status=self._device.connect_status,
814 channel_id=self._proxy_address.channel_id)
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500815
816 yield self._adapter_proxy.send_inter_adapter_message(
817 msg=omci_msg,
818 type=InterAdapterMessageType.OMCI_REQUEST,
819 from_adapter=self._device.type,
820 to_adapter=self._proxy_address.device_type,
821 to_device_id=self._device_id,
822 proxy_device_id=self._proxy_address.device_id
823 )
824
Chip Boling67b674a2019-02-08 11:42:18 -0600825 finally:
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400826 omci_entities.entity_id_to_class_map = saved_me_map
Chip Boling67b674a2019-02-08 11:42:18 -0600827
828 self._tx_frames += 1
829
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400830 # Note: the 'd' deferred in the queued request we just got will
831 # already have its success callback queued (callLater -> 0) with a
832 # result of "queued". Here we need time it out internally so
833 # we can call cleanup appropriately. G.988 mentions that most ONUs
834 # will process an request in < 1 second.
835 dc_timeout = timeout if timeout > 0 else 1.0
Chip Boling67b674a2019-02-08 11:42:18 -0600836
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400837 # Timeout on internal deferred to support internal retries if requested
838 dc = self.reactor.callLater(dc_timeout, self._request_timeout, tx_tid, high_priority)
839
840 # (timestamp, defer, frame, timeout, retry, delayedCall)
841 self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
842
843 if timeout > 0:
Chip Boling67b674a2019-02-08 11:42:18 -0600844 d.addCallbacks(self._request_success, self._request_failure,
845 callbackArgs=(high_priority,),
846 errbackArgs=(tx_tid, high_priority))
847
848 except IndexError:
849 pass # Nothing pending in this queue
850
851 except Exception as e:
852 self.log.exception('send-proxy-exception', e=e)
853 self._tx_request[index] = None
854 self.reactor.callLater(0, self._send_next_request, high_priority)
855
856 if d is not None:
857 d.errback(failure.Failure(e))
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400858 else:
859 self.log.debug("tx-request-occupied", index=index)
Chip Boling67b674a2019-02-08 11:42:18 -0600860
861 ###################################################################################
862 # MIB Action shortcuts
863
864 def send_mib_reset(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
865 """
866 Perform a MIB Reset
867 """
868 self.log.debug('send-mib-reset')
869
870 frame = OntDataFrame().mib_reset()
871 return self.send(frame, timeout=timeout, high_priority=high_priority)
872
873 def send_mib_upload(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
874 self.log.debug('send-mib-upload')
875
876 frame = OntDataFrame().mib_upload()
877 return self.send(frame, timeout=timeout, high_priority=high_priority)
878
879 def send_mib_upload_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
880 self.log.debug('send-mib-upload-next')
881
882 frame = OntDataFrame(sequence_number=seq_no).mib_upload_next()
883 return self.send(frame, timeout=timeout, high_priority=high_priority)
884
885 def send_reboot(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
886 """
887 Send an ONU Device reboot request (ONU-G ME).
888
889 NOTICE: This method is being deprecated and replaced with a tasks to preform this function
890 """
891 self.log.debug('send-mib-reboot')
892
893 frame = OntGFrame().reboot()
894 return self.send(frame, timeout=timeout, high_priority=high_priority)
895
896 def send_get_all_alarm(self, alarm_retrieval_mode=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
897 self.log.debug('send_get_alarm')
898
899 frame = OntDataFrame().get_all_alarm(alarm_retrieval_mode)
900 return self.send(frame, timeout=timeout, high_priority=high_priority)
901
902 def send_get_all_alarm_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
903 self.log.debug('send_get_alarm_next')
904
905 frame = OntDataFrame().get_all_alarm_next(seq_no)
906 return self.send(frame, timeout=timeout, high_priority=high_priority)
907
908 def send_start_software_download(self, image_inst_id, image_size, window_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
909 frame = SoftwareImageFrame(image_inst_id).start_software_download(image_size, window_size-1)
910 return self.send(frame, timeout, 3, high_priority=high_priority)
911
912 def send_download_section(self, image_inst_id, section_num, data, size=DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE, timeout=0, high_priority=False):
913 """
914 # timeout=0 indicates no repons needed
915 """
916 # self.log.debug("send_download_section", instance_id=image_inst_id, section=section_num, timeout=timeout)
917 if timeout > 0:
918 frame = SoftwareImageFrame(image_inst_id).download_section(True, section_num, data)
919 else:
920 frame = SoftwareImageFrame(image_inst_id).download_section(False, section_num, data)
921 return self.send(frame, timeout, high_priority=high_priority)
922
923 # if timeout > 0:
924 # self.reactor.callLater(0, self.sim_receive_download_section_resp,
925 # frame.fields["transaction_id"],
926 # frame.fields["omci_message"].fields["section_number"])
927 # return d
928
929 def send_end_software_download(self, image_inst_id, crc32, image_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
930 frame = SoftwareImageFrame(image_inst_id).end_software_download(crc32, image_size)
931 return self.send(frame, timeout, high_priority=high_priority)
932 # self.reactor.callLater(0, self.sim_receive_end_software_download_resp, frame.fields["transaction_id"])
933 # return d
934
935 def send_active_image(self, image_inst_id, flag=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
936 frame = SoftwareImageFrame(image_inst_id).activate_image(flag)
937 return self.send(frame, timeout, high_priority=high_priority)
938
939 def send_commit_image(self, image_inst_id, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
940 frame = SoftwareImageFrame(image_inst_id).commit_image()
941 return self.send(frame, timeout, high_priority=high_priority)
942