blob: 66c63b0c7932177aecf9f627ab8b9c8c0d2e6b5f [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""
17OMCI Message support
18"""
19
Zack Williams84a71e92019-11-15 09:00:19 -070020from __future__ import absolute_import, division
Chip Boling67b674a2019-02-08 11:42:18 -060021import sys
22import arrow
23from twisted.internet import reactor, defer
24from twisted.internet.defer import TimeoutError, CancelledError, failure, fail, succeed, inlineCallbacks
Chip Boling67b674a2019-02-08 11:42:18 -060025from pyvoltha.adapters.extensions.omci.omci import *
26from pyvoltha.adapters.extensions.omci.omci_me import OntGFrame, OntDataFrame, SoftwareImageFrame
27from pyvoltha.adapters.extensions.omci.me_frame import MEFrame
28from pyvoltha.adapters.extensions.omci.omci_defs import EntityOperations, ReasonCodes
29from pyvoltha.adapters.extensions.omci.omci_entities import entity_id_to_class_map
30from pyvoltha.common.event_bus import EventBusClient
Matt Jeanneret35c42d22019-03-07 17:23:27 -050031from voltha_protos.inter_container_pb2 import InterAdapterMessageType, InterAdapterOmciMessage
Chip Boling67b674a2019-02-08 11:42:18 -060032from enum import IntEnum
33from binascii import hexlify
Zack Williams84a71e92019-11-15 09:00:19 -070034import codecs
Chip Boling67b674a2019-02-08 11:42:18 -060035
36DEFAULT_OMCI_TIMEOUT = 10 # 3 # Seconds
37MAX_OMCI_REQUEST_AGE = 60 # Seconds
38DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE = 31 # Bytes
Chip Boling67b674a2019-02-08 11:42:18 -060039
40CONNECTED_KEY = 'connected'
41TX_REQUEST_KEY = 'tx-request'
42RX_RESPONSE_KEY = 'rx-response'
43UNKNOWN_CLASS_ATTRIBUTE_KEY = 'voltha-unknown-blob'
44
45
46class OmciCCRxEvents(IntEnum):
47 AVC_Notification = 0,
48 MIB_Upload = 1,
49 MIB_Upload_Next = 2,
50 Create = 3,
51 Delete = 4,
52 Set = 5,
53 Alarm_Notification = 6,
54 Test_Result = 7,
55 MIB_Reset = 8,
56 Connectivity = 9,
57 Get_ALARM_Get = 10,
Matt Jeanneret702f05f2019-09-17 19:47:34 -040058 Get_ALARM_Get_Next = 11,
59 Start_Software_Download = 12,
60 Download_Section = 13,
61 End_Software_Download = 14,
62 Activate_Software = 15,
63 Commit_Software = 15,
Chip Boling67b674a2019-02-08 11:42:18 -060064
65
66# abbreviations
67OP = EntityOperations
68RxEvent = OmciCCRxEvents
69
70
71class OMCI_CC(object):
72 """ Handle OMCI Communication Channel specifics for Adtran ONUs"""
73
Matt Jeanneret702f05f2019-09-17 19:47:34 -040074 MIN_OMCI_TX_ID_LOW_PRIORITY = 0x0001 # 2 Octets max
75 MAX_OMCI_TX_ID_LOW_PRIORITY = 0x7FFF # 2 Octets max
Chip Boling67b674a2019-02-08 11:42:18 -060076 MIN_OMCI_TX_ID_HIGH_PRIORITY = 0x8000 # 2 Octets max
77 MAX_OMCI_TX_ID_HIGH_PRIORITY = 0xFFFF # 2 Octets max
78 LOW_PRIORITY = 0
79 HIGH_PRIORITY = 1
80
81 # Offset into some tuples for pending lists and tx in progress
82 PENDING_DEFERRED = 0
83 PENDING_FRAME = 1
84 PENDING_TIMEOUT = 2
85 PENDING_RETRY = 3
86
87 REQUEST_TIMESTAMP = 0
88 REQUEST_DEFERRED = 1
89 REQUEST_FRAME = 2
90 REQUEST_TIMEOUT = 3
91 REQUEST_RETRY = 4
92 REQUEST_DELAYED_CALL = 5
93
94 _frame_to_event_type = {
95 OmciMibResetResponse.message_id: RxEvent.MIB_Reset,
96 OmciMibUploadResponse.message_id: RxEvent.MIB_Upload,
97 OmciMibUploadNextResponse.message_id: RxEvent.MIB_Upload_Next,
98 OmciCreateResponse.message_id: RxEvent.Create,
99 OmciDeleteResponse.message_id: RxEvent.Delete,
100 OmciSetResponse.message_id: RxEvent.Set,
101 OmciGetAllAlarmsResponse.message_id: RxEvent.Get_ALARM_Get,
102 OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
103 }
104
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500105 def __init__(self, core_proxy, adapter_proxy, device_id, me_map=None,
Chip Boling67b674a2019-02-08 11:42:18 -0600106 clock=None):
107 self.log = structlog.get_logger(device_id=device_id)
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500108 self._core_proxy = core_proxy
109 self._adapter_proxy = adapter_proxy
Chip Boling67b674a2019-02-08 11:42:18 -0600110 self._device_id = device_id
111 self._proxy_address = None
112 self._enabled = False
113 self._extended_messaging = False
114 self._me_map = me_map
115 if clock is None:
116 self.reactor = reactor
117 else:
118 self.reactor = clock
119
120 # Support 2 levels of priority since only baseline message set supported
121 self._tx_tid = [OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY, OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY]
122 self._tx_request = [None, None] # Tx in progress (timestamp, defer, frame, timeout, retry, delayedCall)
123 self._pending = [list(), list()] # pending queue (deferred, tx_frame, timeout, retry)
124 self._rx_response = [None, None]
125
126 # Statistics
127 self._tx_frames = 0
128 self._rx_frames = 0
129 self._rx_unknown_tid = 0 # Rx OMCI with no Tx TID match
130 self._rx_onu_frames = 0 # Autonomously generated ONU frames
131 self._rx_onu_discards = 0 # Autonomously generated ONU unknown message types
132 self._rx_timeouts = 0
133 self._rx_late = 0 # Frame response received after timeout on Tx
134 self._rx_unknown_me = 0 # Number of managed entities Rx without a decode definition
135 self._tx_errors = 0 # Exceptions during tx request
136 self._consecutive_errors = 0 # Rx & Tx errors in a row, a good RX resets this to 0
Zack Williams84a71e92019-11-15 09:00:19 -0700137 self._reply_min = sys.maxsize # Fastest successful tx -> rx
Chip Boling67b674a2019-02-08 11:42:18 -0600138 self._reply_max = 0 # Longest successful tx -> rx
139 self._reply_sum = 0.0 # Total seconds for successful tx->rx (float for average)
140 self._max_hp_tx_queue = 0 # Maximum size of high priority tx pending queue
141 self._max_lp_tx_queue = 0 # Maximum size of low priority tx pending queue
142
143 self.event_bus = EventBusClient()
144
145 # If a list of custom ME Entities classes were provided, insert them into
146 # main class_id to entity map.
147 # TODO: If this class becomes hidden from the ONU DA, move this to the OMCI State Machine runner
148
149 def __str__(self):
150 return "OMCISupport: {}".format(self._device_id)
151
152 def _get_priority_index(self, high_priority):
153 """ Centralized logic to help make extended message support easier in the future"""
154 return OMCI_CC.HIGH_PRIORITY if high_priority and not self._extended_messaging \
155 else OMCI_CC.LOW_PRIORITY
156
157 def _tid_is_high_priority(self, tid):
158 """ Centralized logic to help make extended message support easier in the future"""
159
160 return not self._extended_messaging and \
161 OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY <= tid <= OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
162
163 @staticmethod
164 def event_bus_topic(device_id, event):
165 """
166 Get the topic name for a given event Frame Type
167 :param device_id: (str) ONU Device ID
168 :param event: (OmciCCRxEvents) Type of event
169 :return: (str) Topic string
170 """
171 assert event in OmciCCRxEvents, \
172 'Event {} is not an OMCI-CC Rx Event'.format(event.name)
173
174 return 'omci-rx:{}:{}'.format(device_id, event.name)
175
176 @property
177 def enabled(self):
178 return self._enabled
179
180 @enabled.setter
181 def enabled(self, value):
182 """
183 Enable/disable the OMCI Communications Channel
184
185 :param value: (boolean) True to enable, False to disable
186 """
187 assert isinstance(value, bool), 'enabled is a boolean'
188
189 if self._enabled != value:
190 self._enabled = value
191 if self._enabled:
192 self._start()
193 else:
194 self._stop()
195
196 @property
197 def tx_frames(self):
198 return self._tx_frames
199
200 @property
201 def rx_frames(self):
202 return self._rx_frames
203
204 @property
205 def rx_unknown_tid(self):
206 return self._rx_unknown_tid # Tx TID not found
207
208 @property
209 def rx_unknown_me(self):
210 return self._rx_unknown_me
211
212 @property
213 def rx_onu_frames(self):
214 return self._rx_onu_frames
215
216 @property
217 def rx_onu_discards(self):
218 return self._rx_onu_discards # Attribute Value change autonomous overflows
219
220 @property
221 def rx_timeouts(self):
222 return self._rx_timeouts
223
224 @property
225 def rx_late(self):
226 return self._rx_late
227
228 @property
229 def tx_errors(self):
230 return self._tx_errors
231
232 @property
233 def consecutive_errors(self):
234 return self._consecutive_errors
235
236 @property
237 def reply_min(self):
Zack Williams84a71e92019-11-15 09:00:19 -0700238 return int(self._reply_min * 1000.0) # Milliseconds
Chip Boling67b674a2019-02-08 11:42:18 -0600239
240 @property
241 def reply_max(self):
Zack Williams84a71e92019-11-15 09:00:19 -0700242 return int(self._reply_max * 1000.0) # Milliseconds
Chip Boling67b674a2019-02-08 11:42:18 -0600243
244 @property
245 def reply_average(self):
246 avg = self._reply_sum / self._rx_frames if self._rx_frames > 0 else 0.0
Zack Williams84a71e92019-11-15 09:00:19 -0700247 return int(avg * 1000.0) # Milliseconds
Chip Boling67b674a2019-02-08 11:42:18 -0600248
249 @property
250 def hp_tx_queue_len(self):
251 return len(self._pending[OMCI_CC.HIGH_PRIORITY])
252
253 @property
254 def lp_tx_queue_len(self):
255 return len(self._pending[OMCI_CC.LOW_PRIORITY])
256
257 @property
258 def max_hp_tx_queue(self):
259 return self._max_hp_tx_queue
260
261 @property
262 def max_lp_tx_queue(self):
263 return self._max_lp_tx_queue
264
265 def _start(self):
266 """
267 Start the OMCI Communications Channel
268 """
269 assert self._enabled, 'Start should only be called if enabled'
270 self.flush()
271
Chip Boling67b674a2019-02-08 11:42:18 -0600272 def _stop(self):
273 """
274 Stop the OMCI Communications Channel
275 """
276 assert not self._enabled, 'Stop should only be called if disabled'
277 self.flush()
278 self._proxy_address = None
279
280 def _receive_onu_message(self, rx_frame):
281 """ Autonomously generated ONU frame Rx handler"""
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400282 self.log.debug('rx-onu-frame', frame_type=type(rx_frame))
Chip Boling67b674a2019-02-08 11:42:18 -0600283
284 msg_type = rx_frame.fields['message_type']
285 self._rx_onu_frames += 1
286
287 msg = {TX_REQUEST_KEY: None,
288 RX_RESPONSE_KEY: rx_frame}
289
290 if msg_type == EntityOperations.AlarmNotification.value:
291 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Alarm_Notification)
292 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
293
294 elif msg_type == EntityOperations.AttributeValueChange.value:
295 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.AVC_Notification)
296 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
297
298 elif msg_type == EntityOperations.TestResult.value:
299 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Test_Result)
300 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
301
302 else:
303 self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
304 self._rx_onu_discards += 1
305
306 def _update_rx_tx_stats(self, now, ts):
307 ts_diff = now - arrow.Arrow.utcfromtimestamp(ts)
308 secs = ts_diff.total_seconds()
309 self._reply_sum += secs
310 if secs < self._reply_min:
311 self._reply_min = secs
312 if secs > self._reply_max:
313 self._reply_max = secs
314 return secs
315
316 def receive_message(self, msg):
317 """
318 Receive and OMCI message from the proxy channel to the OLT.
319
320 Call this from your ONU Adapter on a new OMCI Rx on the proxy channel
321 :param msg: (str) OMCI binary message (used as input to Scapy packet decoder)
322 """
323 if not self.enabled:
324 return
325
326 try:
327 now = arrow.utcnow()
328 d = None
329
330 # NOTE: Since we may need to do an independent ME map on a per-ONU basis
331 # save the current value of the entity_id_to_class_map, then
332 # replace it with our custom one before decode, and then finally
333 # restore it later. Tried other ways but really made the code messy.
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400334 saved_me_map = omci_entities.entity_id_to_class_map
335 omci_entities.entity_id_to_class_map = self._me_map
Chip Boling67b674a2019-02-08 11:42:18 -0600336
337 try:
338 rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
lcuibfdfbb52019-04-29 22:47:52 -0700339 self.log.debug('recv-omci-msg', omci_msg=hexlify(msg))
Chip Boling67b674a2019-02-08 11:42:18 -0600340 except KeyError as e:
341 # Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
Matteo Scandolo63efb062019-11-26 12:14:48 -0700342 self.log.debug('frame-decode-key-error', omci_msg=hexlify(msg), e=e)
Chip Boling67b674a2019-02-08 11:42:18 -0600343 rx_frame = self._decode_unknown_me(msg)
344 self._rx_unknown_me += 1
Chip Boling67b674a2019-02-08 11:42:18 -0600345
346 except Exception as e:
Matteo Scandolo63efb062019-11-26 12:14:48 -0700347 self.log.exception('frame-decode', omci_msg=hexlify(msg), e=e)
Chip Boling67b674a2019-02-08 11:42:18 -0600348 return
349
350 finally:
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400351 omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
352
353 rx_tid = rx_frame.fields['transaction_id']
onkarkundargi66a1d072019-09-23 14:58:50 +0530354 msg_type = rx_frame.fields['message_type']
Matt Jeanneret702f05f2019-09-17 19:47:34 -0400355 self.log.debug('Received message for rx_tid', rx_tid = rx_tid, msg_type = msg_type)
onkarkundargi66a1d072019-09-23 14:58:50 +0530356 # Filter the Test Result frame and route through receive onu
357 # message method.
358 if rx_tid == 0 or msg_type == EntityOperations.TestResult.value:
Matt Jeanneret8293a632019-07-31 15:56:13 -0400359 self.log.debug('Receive ONU message', rx_tid=0)
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400360 return self._receive_onu_message(rx_frame)
361
362 # Previously unreachable if this is the very first round-trip Rx or we
363 # have been running consecutive errors
364 if self._rx_frames == 0 or self._consecutive_errors != 0:
Matt Jeanneret8293a632019-07-31 15:56:13 -0400365 self.log.debug('Consecutive errors for rx', err = self._consecutive_errors)
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400366 self.reactor.callLater(0, self._publish_connectivity_event, True)
367
368 self._rx_frames += 1
369 self._consecutive_errors = 0
Chip Boling67b674a2019-02-08 11:42:18 -0600370
371 try:
372 high_priority = self._tid_is_high_priority(rx_tid)
373 index = self._get_priority_index(high_priority)
374
375 # (timestamp, defer, frame, timeout, retry, delayedCall)
376 last_tx_tuple = self._tx_request[index]
377
378 if last_tx_tuple is None or \
379 last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
380 # Possible late Rx on a message that timed-out
Matt Jeanneret8293a632019-07-31 15:56:13 -0400381 if last_tx_tuple:
382 self.log.debug('Unknown message', rx_tid=rx_tid,
383 tx_id=last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id'))
Chip Boling67b674a2019-02-08 11:42:18 -0600384 self._rx_unknown_tid += 1
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400385 self._rx_late += 1
Chip Boling67b674a2019-02-08 11:42:18 -0600386 return
387
388 ts, d, tx_frame, timeout, retry, dc = last_tx_tuple
389 if dc is not None and not dc.cancelled and not dc.called:
390 dc.cancel()
Chip Boling67b674a2019-02-08 11:42:18 -0600391
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400392 _secs = self._update_rx_tx_stats(now, ts)
Chip Boling67b674a2019-02-08 11:42:18 -0600393
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400394 # Late arrival already serviced by a timeout?
Chip Boling67b674a2019-02-08 11:42:18 -0600395 if d.called:
396 self._rx_late += 1
Matt Jeanneret8293a632019-07-31 15:56:13 -0400397 self.log.debug('Serviced by timeout. Late arrival', rx_late = self._rx_late)
Chip Boling67b674a2019-02-08 11:42:18 -0600398 return
399
400 except Exception as e:
401 self.log.exception('frame-match', msg=hexlify(msg), e=e)
402 if d is not None:
403 return d.errback(failure.Failure(e))
404 return
405
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400406 # Publish Rx event to listeners in a different task
Matt Jeanneret8293a632019-07-31 15:56:13 -0400407 self.log.debug('Publish rx event', rx_tid = rx_tid,
408 tx_tid = tx_frame.fields['transaction_id'])
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400409 reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
Chip Boling67b674a2019-02-08 11:42:18 -0600410
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400411 # begin success callback chain (will cancel timeout and queue next Tx message)
412 self._rx_response[index] = rx_frame
413 d.callback(rx_frame)
Chip Boling67b674a2019-02-08 11:42:18 -0600414
415 except Exception as e:
416 self.log.exception('rx-msg', e=e)
417
Chip Boling67b674a2019-02-08 11:42:18 -0600418 def _decode_unknown_me(self, msg):
419 """
420 Decode an ME for an unsupported class ID. This should only occur for a subset
421 of message types (Get, Set, MIB Upload Next, ...) and they should only be
422 responses as well.
423
424 There are some times below that are commented out. For VOLTHA 2.0, it is
425 expected that any get, set, create, delete for unique (often vendor) MEs
426 will be coded by the ONU utilizing it and supplied to OpenOMCI as a
427 vendor-specific ME during device initialization.
428
429 :param msg: (str) Binary data
430 :return: (OmciFrame) resulting frame
431 """
432 from struct import unpack
433
434 (tid, msg_type, framing) = unpack('!HBB', msg[0:4])
435
436 assert framing == 0xa, 'Only basic OMCI framing supported at this time'
437 msg = msg[4:]
438
439 # TODO: Commented out items below are future work (not expected for VOLTHA v2.0)
440 (msg_class, kwargs) = {
441 # OmciCreateResponse.message_id: (OmciCreateResponse, None),
442 # OmciDeleteResponse.message_id: (OmciDeleteResponse, None),
443 # OmciSetResponse.message_id: (OmciSetResponse, None),
444 # OmciGetResponse.message_id: (OmciGetResponse, None),
445 # OmciGetAllAlarmsNextResponse.message_id: (OmciGetAllAlarmsNextResponse, None),
446 OmciMibUploadNextResponse.message_id: (OmciMibUploadNextResponse,
447 {
448 'entity_class': unpack('!H', msg[0:2])[0],
449 'entity_id': unpack('!H', msg[2:4])[0],
450 'object_entity_class': unpack('!H', msg[4:6])[0],
451 'object_entity_id': unpack('!H', msg[6:8])[0],
452 'object_attributes_mask': unpack('!H', msg[8:10])[0],
453 'object_data': {
454 UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[10:-4])
455 },
456 }),
457 # OmciAlarmNotification.message_id: (OmciAlarmNotification, None),
458 OmciAttributeValueChange.message_id: (OmciAttributeValueChange,
459 {
460 'entity_class': unpack('!H', msg[0:2])[0],
461 'entity_id': unpack('!H', msg[2:4])[0],
462 'data': {
463 UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[4:-8])
464 },
465 }),
466 # OmciTestResult.message_id: (OmciTestResult, None),
467 }.get(msg_type, None)
468
469 if msg_class is None:
470 raise TypeError('Unsupport Message Type for Unknown Decode: {}',
471 msg_type)
472
473 return OmciFrame(transaction_id=tid, message_type=msg_type,
474 omci_message=msg_class(**kwargs))
475
476 def _publish_rx_frame(self, tx_frame, rx_frame):
477 """
478 Notify listeners of successful response frame
479 :param tx_frame: (OmciFrame) Original request frame
480 :param rx_frame: (OmciFrame) Response frame
481 """
482 if self._enabled and isinstance(rx_frame, OmciFrame):
483 frame_type = rx_frame.fields['omci_message'].message_id
484 event_type = OMCI_CC._frame_to_event_type.get(frame_type)
485
486 if event_type is not None:
487 topic = OMCI_CC.event_bus_topic(self._device_id, event_type)
488 msg = {TX_REQUEST_KEY: tx_frame,
489 RX_RESPONSE_KEY: rx_frame}
490
491 self.event_bus.publish(topic=topic, msg=msg)
492
493 def _publish_connectivity_event(self, connected):
494 """
495 Notify listeners of Rx/Tx connectivity over OMCI
496 :param connected: (bool) True if connectivity transitioned from unreachable
497 to reachable
498 """
499 if self._enabled:
500 topic = OMCI_CC.event_bus_topic(self._device_id,
501 RxEvent.Connectivity)
502 msg = {CONNECTED_KEY: connected}
503 self.event_bus.publish(topic=topic, msg=msg)
504
505 def flush(self):
506 """Flush/cancel in active or pending Tx requests"""
507 requests = []
508
509 for priority in {OMCI_CC.HIGH_PRIORITY, OMCI_CC.LOW_PRIORITY}:
510 next_frame, self._tx_request[priority] = self._tx_request[priority], None
511 if next_frame is not None:
512 requests.append((next_frame[OMCI_CC.REQUEST_DEFERRED], next_frame[OMCI_CC.REQUEST_DELAYED_CALL]))
513
514 requests += [(next_frame[OMCI_CC.PENDING_DEFERRED], None)
515 for next_frame in self._pending[priority]]
516 self._pending[priority] = list()
517
518 # Cancel them...
519 def cleanup_unhandled_error(_):
520 pass # So the cancel below does not flag an unhandled error
521
522 for d, dc in requests:
523 if d is not None and not d.called:
524 d.addErrback(cleanup_unhandled_error)
525 d.cancel()
526
527 if dc is not None and not dc.called and not dc.cancelled:
528 dc.cancel()
529
530 def _get_tx_tid(self, high_priority=False):
531 """
532 Get the next Transaction ID for a tx. Note TID=0 is reserved
533 for autonomously generated messages from an ONU
534
535 :return: (int) TID
536 """
537 if self._extended_messaging or not high_priority:
538 index = OMCI_CC.LOW_PRIORITY
539 min_tid = OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY
540 max_tid = OMCI_CC.MAX_OMCI_TX_ID_LOW_PRIORITY
541 else:
542 index = OMCI_CC.HIGH_PRIORITY
543 min_tid = OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY
544 max_tid = OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
545
546 tx_tid, self._tx_tid[index] = self._tx_tid[index], self._tx_tid[index] + 1
547
548 if self._tx_tid[index] > max_tid:
549 self._tx_tid[index] = min_tid
550
551 return tx_tid
552
553 def _request_failure(self, value, tx_tid, high_priority):
554 """
555 Handle a transmit failure. Rx Timeouts are handled on the 'dc' deferred and
556 will call a different method that may retry if requested. This routine
557 will be called after the final (if any) timeout or other error
558
559 :param value: (Failure) Twisted failure
560 :param tx_tid: (int) Associated Tx TID
561 """
562 index = self._get_priority_index(high_priority)
563
564 if self._tx_request[index] is not None:
565 tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
566 tx_frame_tid = tx_frame.fields['transaction_id']
567
568 if tx_frame_tid == tx_tid:
569 timeout = self._tx_request[index][OMCI_CC.REQUEST_TIMEOUT]
570 dc = self._tx_request[index][OMCI_CC.REQUEST_DELAYED_CALL]
571 self._tx_request[index] = None
572
573 if dc is not None and not dc.called and not dc.cancelled:
574 dc.cancel()
575
576 if isinstance(value, failure.Failure):
577 value.trap(CancelledError)
578 self._rx_timeouts += 1
579 self._consecutive_errors += 1
580 if self._consecutive_errors == 1:
581 reactor.callLater(0, self._publish_connectivity_event, False)
582
583 self.log.debug('timeout', tx_id=tx_tid, timeout=timeout)
584 value = failure.Failure(TimeoutError(timeout, "Deferred"))
585 else:
586 # Search pending queue. This may be a cancel coming in from the original
587 # task that requested the Tx. If found, remove
588 # from pending queue
589 for index, request in enumerate(self._pending[index]):
590 req = request.get(OMCI_CC.PENDING_DEFERRED)
591 if req is not None and req.fields['transaction_id'] == tx_tid:
592 self._pending[index].pop(index)
593 break
594
595 self._send_next_request(high_priority)
596 return value
597
598 def _request_success(self, rx_frame, high_priority):
599 """
600 Handle transmit success (a matching Rx was received)
601
602 :param rx_frame: (OmciFrame) OMCI response frame with matching TID
603 :return: (OmciFrame) OMCI response frame with matching TID
604 """
605 index = self._get_priority_index(high_priority)
606
607 if rx_frame is None:
608 rx_frame = self._rx_response[index]
609
610 rx_tid = rx_frame.fields.get('transaction_id')
611
612 if rx_tid is not None:
613 if self._tx_request[index] is not None:
614 tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
615 tx_tid = tx_frame.fields['transaction_id']
616
617 if rx_tid == tx_tid:
618 # Remove this request. Next callback in chain initiates next Tx
619 self._tx_request[index] = None
620 else:
621 self._rx_late += 1
622 else:
623 self._rx_late += 1
624
625 self._send_next_request(high_priority)
626
Matt Jeanneret8293a632019-07-31 15:56:13 -0400627 self.log.debug('inter-adapter-recv-omci', tid=rx_tid)
628
Chip Boling67b674a2019-02-08 11:42:18 -0600629 # Return rx_frame (to next item in callback list)
630 return rx_frame
631
632 def _request_timeout(self, tx_tid, high_priority):
633 """
634 Tx Request timed out. Resend immediately if there retries is non-zero. A
635 separate deferred (dc) is used on each actual Tx which is not the deferred
636 (d) that is returned to the caller of the 'send()' method.
637
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400638 If the timeout if the transmitted frame was zero, this is just cleanup of
639 that transmit request and not necessarily a transmit timeout
640
Chip Boling67b674a2019-02-08 11:42:18 -0600641 :param tx_tid: (int) TID of frame
642 :param high_priority: (bool) True if high-priority queue
643 """
644 self.log.debug("_request_timeout", tx_tid=tx_tid)
645 index = self._get_priority_index(high_priority)
646
647 if self._tx_request[index] is not None:
648 # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
649 ts, d, frame, timeout, retry, _dc = self._tx_request[index]
650
651 if frame.fields.get('transaction_id', 0) == tx_tid:
652 self._tx_request[index] = None
653
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400654 if timeout > 0:
655 self._rx_timeouts += 1
656
657 if retry > 0:
658 # Push on front of TX pending queue so that it transmits next with the
659 # original TID
660 self._queue_frame(d, frame, timeout, retry - 1, high_priority, front=True)
661
662 elif not d.called:
663 d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
664 else:
665 self.log.warn('timeout-but-not-the-tx-frame') # Statement mainly for debugging
Chip Boling67b674a2019-02-08 11:42:18 -0600666
667 self._send_next_request(high_priority)
668
669 def _queue_frame(self, d, frame, timeout, retry, high_priority, front=False):
670 index = self._get_priority_index(high_priority)
671 tx_tuple = (d, frame, timeout, retry) # Pending -> (deferred, tx_frame, timeout, retry)
672
673 if front:
674 self._pending[index].insert(0, tuple)
675 else:
676 self._pending[index].append(tx_tuple)
677
678 # Monitor queue stats
679 qlen = len(self._pending[index])
680
681 if high_priority:
682 if self._max_hp_tx_queue < qlen:
683 self._max_hp_tx_queue = qlen
684
685 elif self._max_lp_tx_queue < qlen:
686 self._max_lp_tx_queue = qlen
687
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400688 self.log.debug("queue-size", index=index, pending_qlen=qlen)
689
Chip Boling67b674a2019-02-08 11:42:18 -0600690 def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT, retry=0, high_priority=False):
691 """
692 Queue the OMCI Frame for a transmit to the ONU via the proxy_channel
693
694 :param frame: (OMCIFrame) Message to send
695 :param timeout: (int) Rx Timeout. 0=No response needed
696 :param retry: (int) Additional retry attempts on channel failure, default=0
697 :param high_priority: (bool) High Priority requests
698 :return: (deferred) A deferred that fires when the response frame is received
699 or if an error/timeout occurs
700 """
701 if not self.enabled or self._proxy_address is None:
702 # TODO custom exceptions throughout this code would be helpful
703 self._tx_errors += 1
Matt Jeanneret8293a632019-07-31 15:56:13 -0400704 self.log.error("cannot-send-omci-msg", tx_errors=self._tx_errors, omci_cc_enabled=self._enabled, proxy_address=self._proxy_address)
Chip Boling67b674a2019-02-08 11:42:18 -0600705 return fail(result=failure.Failure(Exception('OMCI is not enabled')))
706
707 timeout = float(timeout)
708 if timeout > float(MAX_OMCI_REQUEST_AGE):
709 self._tx_errors += 1
710 msg = 'Maximum timeout is {} seconds'.format(MAX_OMCI_REQUEST_AGE)
711 return fail(result=failure.Failure(Exception(msg)))
712
713 if not isinstance(frame, OmciFrame):
714 self._tx_errors += 1
715 msg = "Invalid frame class '{}'".format(type(frame))
716 return fail(result=failure.Failure(Exception(msg)))
717 try:
718 index = self._get_priority_index(high_priority)
719 tx_tid = frame.fields['transaction_id']
720
721 if tx_tid is None:
722 tx_tid = self._get_tx_tid(high_priority=high_priority)
723 frame.fields['transaction_id'] = tx_tid
724
725 assert tx_tid not in self._pending[index], 'TX TID {} is already exists'.format(tx_tid)
726 assert tx_tid > 0, 'Invalid Tx TID: {}'.format(tx_tid)
727
728 # Queue it and request next Tx if tx channel is free
729 d = defer.Deferred()
730
731 self._queue_frame(d, frame, timeout, retry, high_priority, front=False)
732 self._send_next_request(high_priority)
733
734 if timeout == 0:
735 self.log.debug("send-timeout-zero", tx_tid=tx_tid)
736 self.reactor.callLater(0, d.callback, 'queued')
737
738 return d
739
740 except Exception as e:
741 self._tx_errors += 1
742 self._consecutive_errors += 1
743
744 if self._consecutive_errors == 1:
745 self.reactor.callLater(0, self._publish_connectivity_event, False)
746
747 self.log.exception('send-omci', e=e)
748 return fail(result=failure.Failure(e))
749
750 def _ok_to_send(self, tx_request, high_priority):
751 """
752 G.988 specifies not to issue a MIB upload or a Software download request
753 when a similar action is in progress on the other channel. To keep the
754 logic here simple, a new upload/download will not be allowed if either a
755 upload/download is going on
756
757 :param tx_request (OmciFrame) Frame to send
758 :param high_priority: (bool) for queue selection
759 :return: True if okay to dequeue and send frame
760 """
761 other = self._get_priority_index(not high_priority)
762
763 if self._tx_request[other] is None:
764 return True
765
766 this_msg_type = tx_request.fields['message_type'] & 0x1f
767 not_allowed = {OP.MibUpload.value,
768 OP.MibUploadNext.value,
769 OP.StartSoftwareDownload.value,
770 OP.DownloadSection.value,
771 OP.EndSoftwareDownload.value}
772
773 if this_msg_type not in not_allowed:
774 return True
775
776 other_msg_type = self._tx_request[other][OMCI_CC.REQUEST_FRAME].fields['message_type'] & 0x1f
777 return other_msg_type not in not_allowed
778
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500779 @inlineCallbacks
Chip Boling67b674a2019-02-08 11:42:18 -0600780 def _send_next_request(self, high_priority):
781 """
782 Pull next tx request and send it
783
784 :param high_priority: (bool) True if this was a high priority request
785 :return: results, so callback chain continues if needed
786 """
787 index = self._get_priority_index(high_priority)
788
789 if self._tx_request[index] is None: # TODO or self._tx_request[index][OMCI_CC.REQUEST_DEFERRED].called:
790 d = None
791 try:
792 if len(self._pending[index]) and \
793 not self._ok_to_send(self._pending[index][0][OMCI_CC.PENDING_FRAME],
794 high_priority):
795 reactor.callLater(0.05, self._send_next_request, high_priority)
796 return
797
798 next_frame = self._pending[index].pop(0)
799
800 d = next_frame[OMCI_CC.PENDING_DEFERRED]
801 frame = next_frame[OMCI_CC.PENDING_FRAME]
802 timeout = next_frame[OMCI_CC.PENDING_TIMEOUT]
803 retry = next_frame[OMCI_CC.PENDING_RETRY]
804
805 tx_tid = frame.fields['transaction_id']
806
807 # NOTE: Since we may need to do an independent ME map on a per-ONU basis
808 # save the current value of the entity_id_to_class_map, then
809 # replace it with our custom one before decode, and then finally
810 # restore it later. Tried other ways but really made the code messy.
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400811 saved_me_map = omci_entities.entity_id_to_class_map
812 omci_entities.entity_id_to_class_map = self._me_map
Chip Boling67b674a2019-02-08 11:42:18 -0600813
814 ts = arrow.utcnow().float_timestamp
815 try:
816 self._rx_response[index] = None
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500817
Matt Jeanneret8293a632019-07-31 15:56:13 -0400818 # NOTE: We preload the tx audit fields and enqueue ourselves for receive.
819 # This is because, the response could be faster than the yield send wakeup latency
820 self._tx_frames += 1
821
822 # Note: the 'd' deferred in the queued request we just got will
823 # already have its success callback queued (callLater -> 0) with a
824 # result of "queued". Here we need time it out internally so
825 # we can call cleanup appropriately. G.988 mentions that most ONUs
826 # will process an request in < 1 second.
827 dc_timeout = timeout if timeout > 0 else 1.0
828
829 # Timeout on internal deferred to support internal retries if requested
830 dc = self.reactor.callLater(dc_timeout, self._request_timeout, tx_tid, high_priority)
831
832 # (timestamp, defer, frame, timeout, retry, delayedCall)
833 self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
834
835 if timeout > 0:
836 d.addCallbacks(self._request_success, self._request_failure,
837 callbackArgs=(high_priority,),
838 errbackArgs=(tx_tid, high_priority))
839
Mahir Gunyeld40c88d2019-05-16 14:09:42 -0700840 omci_msg = InterAdapterOmciMessage(
lcuibfdfbb52019-04-29 22:47:52 -0700841 message=bytes(frame),
Mahir Gunyeld40c88d2019-05-16 14:09:42 -0700842 proxy_address=self._proxy_address,
843 connect_status=self._device.connect_status)
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500844
lcuibfdfbb52019-04-29 22:47:52 -0700845 self.log.debug('sent-omci-msg', tid=tx_tid, omci_msg=hexlify(bytes(frame)))
Matt Jeanneret35c42d22019-03-07 17:23:27 -0500846
847 yield self._adapter_proxy.send_inter_adapter_message(
848 msg=omci_msg,
849 type=InterAdapterMessageType.OMCI_REQUEST,
850 from_adapter=self._device.type,
851 to_adapter=self._proxy_address.device_type,
852 to_device_id=self._device_id,
853 proxy_device_id=self._proxy_address.device_id
854 )
Matt Jeanneret8293a632019-07-31 15:56:13 -0400855 self.log.debug('done-inter-adapter-send-message', tx_tid=tx_tid)
Chip Boling67b674a2019-02-08 11:42:18 -0600856 finally:
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400857 omci_entities.entity_id_to_class_map = saved_me_map
Chip Boling67b674a2019-02-08 11:42:18 -0600858
Chip Boling67b674a2019-02-08 11:42:18 -0600859 except IndexError:
860 pass # Nothing pending in this queue
861
862 except Exception as e:
863 self.log.exception('send-proxy-exception', e=e)
864 self._tx_request[index] = None
865 self.reactor.callLater(0, self._send_next_request, high_priority)
866
867 if d is not None:
868 d.errback(failure.Failure(e))
Matt Jeanneret72fe6ae2019-04-13 20:58:47 -0400869 else:
870 self.log.debug("tx-request-occupied", index=index)
Chip Boling67b674a2019-02-08 11:42:18 -0600871
872 ###################################################################################
873 # MIB Action shortcuts
874
875 def send_mib_reset(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
Chip Boling67b674a2019-02-08 11:42:18 -0600876 frame = OntDataFrame().mib_reset()
877 return self.send(frame, timeout=timeout, high_priority=high_priority)
878
879 def send_mib_upload(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
Chip Boling67b674a2019-02-08 11:42:18 -0600880 frame = OntDataFrame().mib_upload()
881 return self.send(frame, timeout=timeout, high_priority=high_priority)
882
883 def send_mib_upload_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
Chip Boling67b674a2019-02-08 11:42:18 -0600884 frame = OntDataFrame(sequence_number=seq_no).mib_upload_next()
885 return self.send(frame, timeout=timeout, high_priority=high_priority)
886
887 def send_reboot(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
Chip Boling67b674a2019-02-08 11:42:18 -0600888 frame = OntGFrame().reboot()
889 return self.send(frame, timeout=timeout, high_priority=high_priority)
890
891 def send_get_all_alarm(self, alarm_retrieval_mode=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
Chip Boling67b674a2019-02-08 11:42:18 -0600892 frame = OntDataFrame().get_all_alarm(alarm_retrieval_mode)
893 return self.send(frame, timeout=timeout, high_priority=high_priority)
894
895 def send_get_all_alarm_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
Chip Boling67b674a2019-02-08 11:42:18 -0600896 frame = OntDataFrame().get_all_alarm_next(seq_no)
897 return self.send(frame, timeout=timeout, high_priority=high_priority)
898
899 def send_start_software_download(self, image_inst_id, image_size, window_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
900 frame = SoftwareImageFrame(image_inst_id).start_software_download(image_size, window_size-1)
901 return self.send(frame, timeout, 3, high_priority=high_priority)
Zack Williams84a71e92019-11-15 09:00:19 -0700902
Chip Boling67b674a2019-02-08 11:42:18 -0600903 def send_download_section(self, image_inst_id, section_num, data, size=DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE, timeout=0, high_priority=False):
Matt Jeanneret40f28392019-12-04 18:21:46 -0500904 # timeout=0 indicates no response needed
Chip Boling67b674a2019-02-08 11:42:18 -0600905 if timeout > 0:
906 frame = SoftwareImageFrame(image_inst_id).download_section(True, section_num, data)
907 else:
908 frame = SoftwareImageFrame(image_inst_id).download_section(False, section_num, data)
909 return self.send(frame, timeout, high_priority=high_priority)
Zack Williams84a71e92019-11-15 09:00:19 -0700910
Chip Boling67b674a2019-02-08 11:42:18 -0600911 def send_end_software_download(self, image_inst_id, crc32, image_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
912 frame = SoftwareImageFrame(image_inst_id).end_software_download(crc32, image_size)
913 return self.send(frame, timeout, high_priority=high_priority)
Chip Boling67b674a2019-02-08 11:42:18 -0600914
915 def send_active_image(self, image_inst_id, flag=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
916 frame = SoftwareImageFrame(image_inst_id).activate_image(flag)
917 return self.send(frame, timeout, high_priority=high_priority)
918
919 def send_commit_image(self, image_inst_id, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
920 frame = SoftwareImageFrame(image_inst_id).commit_image()
921 return self.send(frame, timeout, high_priority=high_priority)
922