blob: e1c601905a9b9d0fb8e77030dab5862575b597c0 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001#
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""
17OMCI Message support
18"""
19
20import sys
21import arrow
22from twisted.internet import reactor, defer
23from twisted.internet.defer import TimeoutError, CancelledError, failure, fail, succeed, inlineCallbacks
24from common.frameio.frameio import hexify
25from voltha.extensions.omci.omci import *
26from voltha.extensions.omci.omci_me import OntGFrame, OntDataFrame, SoftwareImageFrame
27from voltha.extensions.omci.me_frame import MEFrame
28from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
29from common.event_bus import EventBusClient
30from enum import IntEnum
31from binascii import hexlify
32
33
34def hexify(buffer):
35 """Return a hexadecimal string encoding of input buffer"""
36 return ''.join('%02x' % ord(c) for c in buffer)
37
38
39DEFAULT_OMCI_TIMEOUT = 10 # 3 # Seconds
40MAX_OMCI_REQUEST_AGE = 60 # Seconds
41DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE = 31 # Bytes
42MAX_TABLE_ROW_COUNT = 512 # Keep get-next logic reasonable
43
44CONNECTED_KEY = 'connected'
45TX_REQUEST_KEY = 'tx-request'
46RX_RESPONSE_KEY = 'rx-response'
47UNKNOWN_CLASS_ATTRIBUTE_KEY = 'voltha-unknown-blob'
48
49
50class OmciCCRxEvents(IntEnum):
51 AVC_Notification = 0,
52 MIB_Upload = 1,
53 MIB_Upload_Next = 2,
54 Create = 3,
55 Delete = 4,
56 Set = 5,
57 Alarm_Notification = 6,
58 Test_Result = 7,
59 MIB_Reset = 8,
60 Connectivity = 9,
61 Get_ALARM_Get = 10,
62 Get_ALARM_Get_Next = 11
63
64
65# abbreviations
66OP = EntityOperations
67RxEvent = OmciCCRxEvents
68
69
70class OMCI_CC(object):
71 """ Handle OMCI Communication Channel specifics for Adtran ONUs"""
72
73 MIN_OMCI_TX_ID_LOW_PRIORITY = 0x0001 # 2 Octets max
74 MAX_OMCI_TX_ID_LOW_PRIORITY = 0x7FFF # 2 Octets max
75 MIN_OMCI_TX_ID_HIGH_PRIORITY = 0x8000 # 2 Octets max
76 MAX_OMCI_TX_ID_HIGH_PRIORITY = 0xFFFF # 2 Octets max
77 LOW_PRIORITY = 0
78 HIGH_PRIORITY = 1
79
80 # Offset into some tuples for pending lists and tx in progress
81 PENDING_DEFERRED = 0
82 PENDING_FRAME = 1
83 PENDING_TIMEOUT = 2
84 PENDING_RETRY = 3
85
86 REQUEST_TIMESTAMP = 0
87 REQUEST_DEFERRED = 1
88 REQUEST_FRAME = 2
89 REQUEST_TIMEOUT = 3
90 REQUEST_RETRY = 4
91 REQUEST_DELAYED_CALL = 5
92
93 _frame_to_event_type = {
94 OmciMibResetResponse.message_id: RxEvent.MIB_Reset,
95 OmciMibUploadResponse.message_id: RxEvent.MIB_Upload,
96 OmciMibUploadNextResponse.message_id: RxEvent.MIB_Upload_Next,
97 OmciCreateResponse.message_id: RxEvent.Create,
98 OmciDeleteResponse.message_id: RxEvent.Delete,
99 OmciSetResponse.message_id: RxEvent.Set,
100 OmciGetAllAlarmsResponse.message_id: RxEvent.Get_ALARM_Get,
101 OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
102 }
103
104 def __init__(self, adapter_agent, device_id, me_map=None,
105 clock=None):
106 self.log = structlog.get_logger(device_id=device_id)
107 self._adapter_agent = adapter_agent
108 self._device_id = device_id
109 self._proxy_address = None
110 self._enabled = False
111 self._extended_messaging = False
112 self._me_map = me_map
113 if clock is None:
114 self.reactor = reactor
115 else:
116 self.reactor = clock
117
118 # Support 2 levels of priority since only baseline message set supported
119 self._tx_tid = [OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY, OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY]
120 self._tx_request = [None, None] # Tx in progress (timestamp, defer, frame, timeout, retry, delayedCall)
121 self._pending = [list(), list()] # pending queue (deferred, tx_frame, timeout, retry)
122 self._rx_response = [None, None]
123
124 # Statistics
125 self._tx_frames = 0
126 self._rx_frames = 0
127 self._rx_unknown_tid = 0 # Rx OMCI with no Tx TID match
128 self._rx_onu_frames = 0 # Autonomously generated ONU frames
129 self._rx_onu_discards = 0 # Autonomously generated ONU unknown message types
130 self._rx_timeouts = 0
131 self._rx_late = 0 # Frame response received after timeout on Tx
132 self._rx_unknown_me = 0 # Number of managed entities Rx without a decode definition
133 self._tx_errors = 0 # Exceptions during tx request
134 self._consecutive_errors = 0 # Rx & Tx errors in a row, a good RX resets this to 0
135 self._reply_min = sys.maxint # Fastest successful tx -> rx
136 self._reply_max = 0 # Longest successful tx -> rx
137 self._reply_sum = 0.0 # Total seconds for successful tx->rx (float for average)
138 self._max_hp_tx_queue = 0 # Maximum size of high priority tx pending queue
139 self._max_lp_tx_queue = 0 # Maximum size of low priority tx pending queue
140
141 self.event_bus = EventBusClient()
142
143 # If a list of custom ME Entities classes were provided, insert them into
144 # main class_id to entity map.
145 # TODO: If this class becomes hidden from the ONU DA, move this to the OMCI State Machine runner
146
147 def __str__(self):
148 return "OMCISupport: {}".format(self._device_id)
149
150 def _get_priority_index(self, high_priority):
151 """ Centralized logic to help make extended message support easier in the future"""
152 return OMCI_CC.HIGH_PRIORITY if high_priority and not self._extended_messaging \
153 else OMCI_CC.LOW_PRIORITY
154
155 def _tid_is_high_priority(self, tid):
156 """ Centralized logic to help make extended message support easier in the future"""
157
158 return not self._extended_messaging and \
159 OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY <= tid <= OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
160
161 @staticmethod
162 def event_bus_topic(device_id, event):
163 """
164 Get the topic name for a given event Frame Type
165 :param device_id: (str) ONU Device ID
166 :param event: (OmciCCRxEvents) Type of event
167 :return: (str) Topic string
168 """
169 assert event in OmciCCRxEvents, \
170 'Event {} is not an OMCI-CC Rx Event'.format(event.name)
171
172 return 'omci-rx:{}:{}'.format(device_id, event.name)
173
174 @property
175 def enabled(self):
176 return self._enabled
177
178 @enabled.setter
179 def enabled(self, value):
180 """
181 Enable/disable the OMCI Communications Channel
182
183 :param value: (boolean) True to enable, False to disable
184 """
185 assert isinstance(value, bool), 'enabled is a boolean'
186
187 if self._enabled != value:
188 self._enabled = value
189 if self._enabled:
190 self._start()
191 else:
192 self._stop()
193
194 @property
195 def tx_frames(self):
196 return self._tx_frames
197
198 @property
199 def rx_frames(self):
200 return self._rx_frames
201
202 @property
203 def rx_unknown_tid(self):
204 return self._rx_unknown_tid # Tx TID not found
205
206 @property
207 def rx_unknown_me(self):
208 return self._rx_unknown_me
209
210 @property
211 def rx_onu_frames(self):
212 return self._rx_onu_frames
213
214 @property
215 def rx_onu_discards(self):
216 return self._rx_onu_discards # Attribute Value change autonomous overflows
217
218 @property
219 def rx_timeouts(self):
220 return self._rx_timeouts
221
222 @property
223 def rx_late(self):
224 return self._rx_late
225
226 @property
227 def tx_errors(self):
228 return self._tx_errors
229
230 @property
231 def consecutive_errors(self):
232 return self._consecutive_errors
233
234 @property
235 def reply_min(self):
236 return int(round(self._reply_min * 1000.0)) # Milliseconds
237
238 @property
239 def reply_max(self):
240 return int(round(self._reply_max * 1000.0)) # Milliseconds
241
242 @property
243 def reply_average(self):
244 avg = self._reply_sum / self._rx_frames if self._rx_frames > 0 else 0.0
245 return int(round(avg * 1000.0)) # Milliseconds
246
247 @property
248 def hp_tx_queue_len(self):
249 return len(self._pending[OMCI_CC.HIGH_PRIORITY])
250
251 @property
252 def lp_tx_queue_len(self):
253 return len(self._pending[OMCI_CC.LOW_PRIORITY])
254
255 @property
256 def max_hp_tx_queue(self):
257 return self._max_hp_tx_queue
258
259 @property
260 def max_lp_tx_queue(self):
261 return self._max_lp_tx_queue
262
263 def _start(self):
264 """
265 Start the OMCI Communications Channel
266 """
267 assert self._enabled, 'Start should only be called if enabled'
268 self.flush()
269
270 device = self._adapter_agent.get_device(self._device_id)
271 self._proxy_address = device.proxy_address
272
273 def _stop(self):
274 """
275 Stop the OMCI Communications Channel
276 """
277 assert not self._enabled, 'Stop should only be called if disabled'
278 self.flush()
279 self._proxy_address = None
280
281 def _receive_onu_message(self, rx_frame):
282 """ Autonomously generated ONU frame Rx handler"""
283 self.log.debug('rx-onu-frame', frame_type=type(rx_frame),
284 frame=hexify(str(rx_frame)))
285
286 msg_type = rx_frame.fields['message_type']
287 self._rx_onu_frames += 1
288
289 msg = {TX_REQUEST_KEY: None,
290 RX_RESPONSE_KEY: rx_frame}
291
292 if msg_type == EntityOperations.AlarmNotification.value:
293 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Alarm_Notification)
294 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
295
296 elif msg_type == EntityOperations.AttributeValueChange.value:
297 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.AVC_Notification)
298 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
299
300 elif msg_type == EntityOperations.TestResult.value:
301 topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Test_Result)
302 self.reactor.callLater(0, self.event_bus.publish, topic, msg)
303
304 else:
305 self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
306 self._rx_onu_discards += 1
307
308 def _update_rx_tx_stats(self, now, ts):
309 ts_diff = now - arrow.Arrow.utcfromtimestamp(ts)
310 secs = ts_diff.total_seconds()
311 self._reply_sum += secs
312 if secs < self._reply_min:
313 self._reply_min = secs
314 if secs > self._reply_max:
315 self._reply_max = secs
316 return secs
317
318 def receive_message(self, msg):
319 """
320 Receive and OMCI message from the proxy channel to the OLT.
321
322 Call this from your ONU Adapter on a new OMCI Rx on the proxy channel
323 :param msg: (str) OMCI binary message (used as input to Scapy packet decoder)
324 """
325 if not self.enabled:
326 return
327
328 try:
329 now = arrow.utcnow()
330 d = None
331
332 # NOTE: Since we may need to do an independent ME map on a per-ONU basis
333 # save the current value of the entity_id_to_class_map, then
334 # replace it with our custom one before decode, and then finally
335 # restore it later. Tried other ways but really made the code messy.
336 saved_me_map = omci_entities.entity_id_to_class_map
337 omci_entities.entity_id_to_class_map = self._me_map
338
339 try:
340 rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
341 rx_tid = rx_frame.fields['transaction_id']
342
343 if rx_tid == 0:
344 return self._receive_onu_message(rx_frame)
345
346 # Previously unreachable if this is the very first Rx or we
347 # have been running consecutive errors
348 if self._rx_frames == 0 or self._consecutive_errors != 0:
349 self.reactor.callLater(0, self._publish_connectivity_event, True)
350
351 self._rx_frames += 1
352 self._consecutive_errors = 0
353
354 except KeyError as e:
355 # Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
356 self.log.debug('frame-decode-key-error', msg=hexlify(msg), e=e)
357 rx_frame = self._decode_unknown_me(msg)
358 self._rx_unknown_me += 1
359 rx_tid = rx_frame.fields.get('transaction_id')
360
361 except Exception as e:
362 self.log.exception('frame-decode', msg=hexlify(msg), e=e)
363 return
364
365 finally:
366 omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
367
368 try:
369 high_priority = self._tid_is_high_priority(rx_tid)
370 index = self._get_priority_index(high_priority)
371
372 # (timestamp, defer, frame, timeout, retry, delayedCall)
373 last_tx_tuple = self._tx_request[index]
374
375 if last_tx_tuple is None or \
376 last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
377 # Possible late Rx on a message that timed-out
378 self._rx_unknown_tid += 1
379 self.log.warn('tx-message-missing', rx_id=rx_tid, msg=hexlify(msg))
380 return
381
382 ts, d, tx_frame, timeout, retry, dc = last_tx_tuple
383 if dc is not None and not dc.cancelled and not dc.called:
384 dc.cancel()
385 self.log.debug("cancel-timeout-called")
386
387 secs = self._update_rx_tx_stats(now, ts)
388
389 # Late arrival?
390 if d.called:
391 self._rx_late += 1
392 return
393
394 except Exception as e:
395 self.log.exception('frame-match', msg=hexlify(msg), e=e)
396 if d is not None:
397 return d.errback(failure.Failure(e))
398 return
399
400 # Extended processing needed. Note 'data' field will be None on some error
401 # status returns
402 omci_msg = rx_frame.fields['omci_message']
403
404 if isinstance(omci_msg, OmciGetResponse) and \
405 omci_msg.fields.get('data') is not None and \
406 'table_attribute_mask' in omci_msg.fields['data']:
407 # Yes, run in a separate generator
408 reactor.callLater(0, self._process_get_rx_frame, timeout, secs,
409 rx_frame, d, tx_frame, high_priority)
410 else:
411 # Publish Rx event to listeners in a different task
412 reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
413
414 # begin success callback chain (will cancel timeout and queue next Tx message)
415 from copy import copy
416 original_callbacks = copy(d.callbacks)
417 self._rx_response[index] = rx_frame
418 d.callback(rx_frame)
419
420 except Exception as e:
421 self.log.exception('rx-msg', e=e)
422
423 @inlineCallbacks
424 def _process_get_rx_frame(self, timeout, secs, rx_frame, d, tx_frame, high_priority):
425 """
426 Special handling for Get Requests that may require additional 'get_next' operations
427 if a table attribute was requested.
428 """
429 omci_msg = rx_frame.fields['omci_message']
430 if isinstance(omci_msg, OmciGetResponse) and 'table_attribute_mask' in omci_msg.fields['data']:
431 try:
432 entity_class = omci_msg.fields['entity_class']
433 entity_id = omci_msg.fields['entity_id']
434 table_attributes = omci_msg.fields['data']['table_attribute_mask']
435
436 # Table attribute mask is encoded opposite of managed entity mask.
437 if entity_class in self._me_map:
438 ec = self._me_map[entity_class]
439 for index in xrange(16):
440 attr_mask = 1 << index
441
442 if attr_mask & table_attributes:
443 eca = ec.attributes[15-index]
444 self.log.debug('omcc-get-table-attribute', table_name=eca.field.name)
445
446 seq_no = 0
447 data_buffer = ''
448 count = omci_msg.fields['data'][eca.field.name + '_size']
449
450 if count > MAX_TABLE_ROW_COUNT:
451 self.log.error('omcc-get-table-huge', count=count, name=eca.field.name)
452 raise ValueError('Huge Table Size: {}'.format(count))
453
454 # Original timeout must be chopped up into each individual get-next request
455 # in order for total transaction to complete within the timeframe of the
456 # original get() timeout.
457 number_transactions = 1 + (count + OmciTableField.PDU_SIZE - 1) / OmciTableField.PDU_SIZE
458 timeout /= (1 + number_transactions)
459
460 # Start the loop
461 vals = []
462 for offset in xrange(0, count, OmciTableField.PDU_SIZE):
463 frame = MEFrame(ec, entity_id, {eca.field.name: seq_no}).get_next()
464 seq_no += 1
465
466 max_retries = 3
467 results = yield self.send(frame, min(timeout / max_retries, secs * 3), max_retries)
468
469 omci_getnext_msg = results.fields['omci_message']
470 status = omci_getnext_msg.fields['success_code']
471
472 if status != ReasonCodes.Success.value:
473 raise Exception('get-next-failure table=' + eca.field.name +
474 ' entity_id=' + str(entity_id) +
475 ' sqn=' + str(seq_no) + ' omci-status ' + str(status))
476
477 # Extract the data
478 num_octets = count - offset
479 if num_octets > OmciTableField.PDU_SIZE:
480 num_octets = OmciTableField.PDU_SIZE
481
482 data = omci_getnext_msg.fields['data'][eca.field.name]
483 data_buffer += data[:num_octets]
484
485 while data_buffer:
486 data_buffer, val = eca.field.getfield(None, data_buffer)
487 vals.append(val)
488
489 omci_msg.fields['data'][eca.field.name] = vals
490 del omci_msg.fields['data'][eca.field.name + '_size']
491 self.log.debug('omcc-got-table-attribute-rows', table_name=eca.field.name,
492 row_count=len(vals))
493 del omci_msg.fields['data']['table_attribute_mask']
494
495 except Exception as e:
496 self.log.exception('get-next-error', e=e)
497 d.errback(failure.Failure(e), high_priority)
498 return
499
500 # Notify sender of completed request
501 reactor.callLater(0, d.callback, rx_frame, high_priority)
502
503 # Publish Rx event to listeners in a different task except for internally-consumed get-next-response
504 if not isinstance(omci_msg, OmciGetNextResponse):
505 reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
506
507 def _decode_unknown_me(self, msg):
508 """
509 Decode an ME for an unsupported class ID. This should only occur for a subset
510 of message types (Get, Set, MIB Upload Next, ...) and they should only be
511 responses as well.
512
513 There are some times below that are commented out. For VOLTHA 2.0, it is
514 expected that any get, set, create, delete for unique (often vendor) MEs
515 will be coded by the ONU utilizing it and supplied to OpenOMCI as a
516 vendor-specific ME during device initialization.
517
518 :param msg: (str) Binary data
519 :return: (OmciFrame) resulting frame
520 """
521 from struct import unpack
522
523 (tid, msg_type, framing) = unpack('!HBB', msg[0:4])
524
525 assert framing == 0xa, 'Only basic OMCI framing supported at this time'
526 msg = msg[4:]
527
528 # TODO: Commented out items below are future work (not expected for VOLTHA v2.0)
529 (msg_class, kwargs) = {
530 # OmciCreateResponse.message_id: (OmciCreateResponse, None),
531 # OmciDeleteResponse.message_id: (OmciDeleteResponse, None),
532 # OmciSetResponse.message_id: (OmciSetResponse, None),
533 # OmciGetResponse.message_id: (OmciGetResponse, None),
534 # OmciGetAllAlarmsNextResponse.message_id: (OmciGetAllAlarmsNextResponse, None),
535 OmciMibUploadNextResponse.message_id: (OmciMibUploadNextResponse,
536 {
537 'entity_class': unpack('!H', msg[0:2])[0],
538 'entity_id': unpack('!H', msg[2:4])[0],
539 'object_entity_class': unpack('!H', msg[4:6])[0],
540 'object_entity_id': unpack('!H', msg[6:8])[0],
541 'object_attributes_mask': unpack('!H', msg[8:10])[0],
542 'object_data': {
543 UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[10:-4])
544 },
545 }),
546 # OmciAlarmNotification.message_id: (OmciAlarmNotification, None),
547 OmciAttributeValueChange.message_id: (OmciAttributeValueChange,
548 {
549 'entity_class': unpack('!H', msg[0:2])[0],
550 'entity_id': unpack('!H', msg[2:4])[0],
551 'data': {
552 UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[4:-8])
553 },
554 }),
555 # OmciTestResult.message_id: (OmciTestResult, None),
556 }.get(msg_type, None)
557
558 if msg_class is None:
559 raise TypeError('Unsupport Message Type for Unknown Decode: {}',
560 msg_type)
561
562 return OmciFrame(transaction_id=tid, message_type=msg_type,
563 omci_message=msg_class(**kwargs))
564
565 def _publish_rx_frame(self, tx_frame, rx_frame):
566 """
567 Notify listeners of successful response frame
568 :param tx_frame: (OmciFrame) Original request frame
569 :param rx_frame: (OmciFrame) Response frame
570 """
571 if self._enabled and isinstance(rx_frame, OmciFrame):
572 frame_type = rx_frame.fields['omci_message'].message_id
573 event_type = OMCI_CC._frame_to_event_type.get(frame_type)
574
575 if event_type is not None:
576 topic = OMCI_CC.event_bus_topic(self._device_id, event_type)
577 msg = {TX_REQUEST_KEY: tx_frame,
578 RX_RESPONSE_KEY: rx_frame}
579
580 self.event_bus.publish(topic=topic, msg=msg)
581
582 def _publish_connectivity_event(self, connected):
583 """
584 Notify listeners of Rx/Tx connectivity over OMCI
585 :param connected: (bool) True if connectivity transitioned from unreachable
586 to reachable
587 """
588 if self._enabled:
589 topic = OMCI_CC.event_bus_topic(self._device_id,
590 RxEvent.Connectivity)
591 msg = {CONNECTED_KEY: connected}
592 self.event_bus.publish(topic=topic, msg=msg)
593
594 def flush(self):
595 """Flush/cancel in active or pending Tx requests"""
596 requests = []
597
598 for priority in {OMCI_CC.HIGH_PRIORITY, OMCI_CC.LOW_PRIORITY}:
599 next_frame, self._tx_request[priority] = self._tx_request[priority], None
600 if next_frame is not None:
601 requests.append((next_frame[OMCI_CC.REQUEST_DEFERRED], next_frame[OMCI_CC.REQUEST_DELAYED_CALL]))
602
603 requests += [(next_frame[OMCI_CC.PENDING_DEFERRED], None)
604 for next_frame in self._pending[priority]]
605 self._pending[priority] = list()
606
607 # Cancel them...
608 def cleanup_unhandled_error(_):
609 pass # So the cancel below does not flag an unhandled error
610
611 for d, dc in requests:
612 if d is not None and not d.called:
613 d.addErrback(cleanup_unhandled_error)
614 d.cancel()
615
616 if dc is not None and not dc.called and not dc.cancelled:
617 dc.cancel()
618
619 def _get_tx_tid(self, high_priority=False):
620 """
621 Get the next Transaction ID for a tx. Note TID=0 is reserved
622 for autonomously generated messages from an ONU
623
624 :return: (int) TID
625 """
626 if self._extended_messaging or not high_priority:
627 index = OMCI_CC.LOW_PRIORITY
628 min_tid = OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY
629 max_tid = OMCI_CC.MAX_OMCI_TX_ID_LOW_PRIORITY
630 else:
631 index = OMCI_CC.HIGH_PRIORITY
632 min_tid = OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY
633 max_tid = OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
634
635 tx_tid, self._tx_tid[index] = self._tx_tid[index], self._tx_tid[index] + 1
636
637 if self._tx_tid[index] > max_tid:
638 self._tx_tid[index] = min_tid
639
640 return tx_tid
641
642 def _request_failure(self, value, tx_tid, high_priority):
643 """
644 Handle a transmit failure. Rx Timeouts are handled on the 'dc' deferred and
645 will call a different method that may retry if requested. This routine
646 will be called after the final (if any) timeout or other error
647
648 :param value: (Failure) Twisted failure
649 :param tx_tid: (int) Associated Tx TID
650 """
651 index = self._get_priority_index(high_priority)
652
653 if self._tx_request[index] is not None:
654 tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
655 tx_frame_tid = tx_frame.fields['transaction_id']
656
657 if tx_frame_tid == tx_tid:
658 timeout = self._tx_request[index][OMCI_CC.REQUEST_TIMEOUT]
659 dc = self._tx_request[index][OMCI_CC.REQUEST_DELAYED_CALL]
660 self._tx_request[index] = None
661
662 if dc is not None and not dc.called and not dc.cancelled:
663 dc.cancel()
664
665 if isinstance(value, failure.Failure):
666 value.trap(CancelledError)
667 self._rx_timeouts += 1
668 self._consecutive_errors += 1
669 if self._consecutive_errors == 1:
670 reactor.callLater(0, self._publish_connectivity_event, False)
671
672 self.log.debug('timeout', tx_id=tx_tid, timeout=timeout)
673 value = failure.Failure(TimeoutError(timeout, "Deferred"))
674 else:
675 # Search pending queue. This may be a cancel coming in from the original
676 # task that requested the Tx. If found, remove
677 # from pending queue
678 for index, request in enumerate(self._pending[index]):
679 req = request.get(OMCI_CC.PENDING_DEFERRED)
680 if req is not None and req.fields['transaction_id'] == tx_tid:
681 self._pending[index].pop(index)
682 break
683
684 self._send_next_request(high_priority)
685 return value
686
687 def _request_success(self, rx_frame, high_priority):
688 """
689 Handle transmit success (a matching Rx was received)
690
691 :param rx_frame: (OmciFrame) OMCI response frame with matching TID
692 :return: (OmciFrame) OMCI response frame with matching TID
693 """
694 index = self._get_priority_index(high_priority)
695
696 if rx_frame is None:
697 rx_frame = self._rx_response[index]
698
699 rx_tid = rx_frame.fields.get('transaction_id')
700
701 if rx_tid is not None:
702 if self._tx_request[index] is not None:
703 tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
704 tx_tid = tx_frame.fields['transaction_id']
705
706 if rx_tid == tx_tid:
707 # Remove this request. Next callback in chain initiates next Tx
708 self._tx_request[index] = None
709 else:
710 self._rx_late += 1
711 else:
712 self._rx_late += 1
713
714 self._send_next_request(high_priority)
715
716 # Return rx_frame (to next item in callback list)
717 return rx_frame
718
719 def _request_timeout(self, tx_tid, high_priority):
720 """
721 Tx Request timed out. Resend immediately if there retries is non-zero. A
722 separate deferred (dc) is used on each actual Tx which is not the deferred
723 (d) that is returned to the caller of the 'send()' method.
724
725 :param tx_tid: (int) TID of frame
726 :param high_priority: (bool) True if high-priority queue
727 """
728 self.log.debug("_request_timeout", tx_tid=tx_tid)
729 index = self._get_priority_index(high_priority)
730
731 if self._tx_request[index] is not None:
732 # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
733 ts, d, frame, timeout, retry, _dc = self._tx_request[index]
734
735 if frame.fields.get('transaction_id', 0) == tx_tid:
736 self._tx_request[index] = None
737
738 if retry > 0:
739 # Push on front of TX pending queue so that it transmits next with the
740 # original TID
741 self._queue_frame(d, frame, timeout, retry - 1, high_priority, front=True)
742 else:
743 d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
744
745 self._send_next_request(high_priority)
746
747 def _queue_frame(self, d, frame, timeout, retry, high_priority, front=False):
748 index = self._get_priority_index(high_priority)
749 tx_tuple = (d, frame, timeout, retry) # Pending -> (deferred, tx_frame, timeout, retry)
750
751 if front:
752 self._pending[index].insert(0, tuple)
753 else:
754 self._pending[index].append(tx_tuple)
755
756 # Monitor queue stats
757 qlen = len(self._pending[index])
758
759 if high_priority:
760 if self._max_hp_tx_queue < qlen:
761 self._max_hp_tx_queue = qlen
762
763 elif self._max_lp_tx_queue < qlen:
764 self._max_lp_tx_queue = qlen
765
766 def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT, retry=0, high_priority=False):
767 """
768 Queue the OMCI Frame for a transmit to the ONU via the proxy_channel
769
770 :param frame: (OMCIFrame) Message to send
771 :param timeout: (int) Rx Timeout. 0=No response needed
772 :param retry: (int) Additional retry attempts on channel failure, default=0
773 :param high_priority: (bool) High Priority requests
774 :return: (deferred) A deferred that fires when the response frame is received
775 or if an error/timeout occurs
776 """
777 if not self.enabled or self._proxy_address is None:
778 # TODO custom exceptions throughout this code would be helpful
779 self._tx_errors += 1
780 return fail(result=failure.Failure(Exception('OMCI is not enabled')))
781
782 timeout = float(timeout)
783 if timeout > float(MAX_OMCI_REQUEST_AGE):
784 self._tx_errors += 1
785 msg = 'Maximum timeout is {} seconds'.format(MAX_OMCI_REQUEST_AGE)
786 return fail(result=failure.Failure(Exception(msg)))
787
788 if not isinstance(frame, OmciFrame):
789 self._tx_errors += 1
790 msg = "Invalid frame class '{}'".format(type(frame))
791 return fail(result=failure.Failure(Exception(msg)))
792 try:
793 index = self._get_priority_index(high_priority)
794 tx_tid = frame.fields['transaction_id']
795
796 if tx_tid is None:
797 tx_tid = self._get_tx_tid(high_priority=high_priority)
798 frame.fields['transaction_id'] = tx_tid
799
800 assert tx_tid not in self._pending[index], 'TX TID {} is already exists'.format(tx_tid)
801 assert tx_tid > 0, 'Invalid Tx TID: {}'.format(tx_tid)
802
803 # Queue it and request next Tx if tx channel is free
804 d = defer.Deferred()
805
806 self._queue_frame(d, frame, timeout, retry, high_priority, front=False)
807 self._send_next_request(high_priority)
808
809 if timeout == 0:
810 self.log.debug("send-timeout-zero", tx_tid=tx_tid)
811 self.reactor.callLater(0, d.callback, 'queued')
812
813 return d
814
815 except Exception as e:
816 self._tx_errors += 1
817 self._consecutive_errors += 1
818
819 if self._consecutive_errors == 1:
820 self.reactor.callLater(0, self._publish_connectivity_event, False)
821
822 self.log.exception('send-omci', e=e)
823 return fail(result=failure.Failure(e))
824
825 def _ok_to_send(self, tx_request, high_priority):
826 """
827 G.988 specifies not to issue a MIB upload or a Software download request
828 when a similar action is in progress on the other channel. To keep the
829 logic here simple, a new upload/download will not be allowed if either a
830 upload/download is going on
831
832 :param tx_request (OmciFrame) Frame to send
833 :param high_priority: (bool) for queue selection
834 :return: True if okay to dequeue and send frame
835 """
836 other = self._get_priority_index(not high_priority)
837
838 if self._tx_request[other] is None:
839 return True
840
841 this_msg_type = tx_request.fields['message_type'] & 0x1f
842 not_allowed = {OP.MibUpload.value,
843 OP.MibUploadNext.value,
844 OP.StartSoftwareDownload.value,
845 OP.DownloadSection.value,
846 OP.EndSoftwareDownload.value}
847
848 if this_msg_type not in not_allowed:
849 return True
850
851 other_msg_type = self._tx_request[other][OMCI_CC.REQUEST_FRAME].fields['message_type'] & 0x1f
852 return other_msg_type not in not_allowed
853
854 def _send_next_request(self, high_priority):
855 """
856 Pull next tx request and send it
857
858 :param high_priority: (bool) True if this was a high priority request
859 :return: results, so callback chain continues if needed
860 """
861 index = self._get_priority_index(high_priority)
862
863 if self._tx_request[index] is None: # TODO or self._tx_request[index][OMCI_CC.REQUEST_DEFERRED].called:
864 d = None
865 try:
866 if len(self._pending[index]) and \
867 not self._ok_to_send(self._pending[index][0][OMCI_CC.PENDING_FRAME],
868 high_priority):
869 reactor.callLater(0.05, self._send_next_request, high_priority)
870 return
871
872 next_frame = self._pending[index].pop(0)
873
874 d = next_frame[OMCI_CC.PENDING_DEFERRED]
875 frame = next_frame[OMCI_CC.PENDING_FRAME]
876 timeout = next_frame[OMCI_CC.PENDING_TIMEOUT]
877 retry = next_frame[OMCI_CC.PENDING_RETRY]
878
879 tx_tid = frame.fields['transaction_id']
880
881 # NOTE: Since we may need to do an independent ME map on a per-ONU basis
882 # save the current value of the entity_id_to_class_map, then
883 # replace it with our custom one before decode, and then finally
884 # restore it later. Tried other ways but really made the code messy.
885 saved_me_map = omci_entities.entity_id_to_class_map
886 omci_entities.entity_id_to_class_map = self._me_map
887
888 ts = arrow.utcnow().float_timestamp
889 try:
890 self._rx_response[index] = None
891 self._adapter_agent.send_proxied_message(self._proxy_address,
892 hexify(str(frame)))
893 finally:
894 omci_entities.entity_id_to_class_map = saved_me_map
895
896 self._tx_frames += 1
897
898 if timeout > 0:
899 # Timeout on internal deferred to support internal retries if requested
900 dc = self.reactor.callLater(timeout, self._request_timeout, tx_tid, high_priority)
901
902 # (timestamp, defer, frame, timeout, retry, delayedCall)
903 self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
904 d.addCallbacks(self._request_success, self._request_failure,
905 callbackArgs=(high_priority,),
906 errbackArgs=(tx_tid, high_priority))
907
908 except IndexError:
909 pass # Nothing pending in this queue
910
911 except Exception as e:
912 self.log.exception('send-proxy-exception', e=e)
913 self._tx_request[index] = None
914 self.reactor.callLater(0, self._send_next_request, high_priority)
915
916 if d is not None:
917 d.errback(failure.Failure(e))
918
919 ###################################################################################
920 # MIB Action shortcuts
921
922 def send_mib_reset(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
923 """
924 Perform a MIB Reset
925 """
926 self.log.debug('send-mib-reset')
927
928 frame = OntDataFrame().mib_reset()
929 return self.send(frame, timeout=timeout, high_priority=high_priority)
930
931 def send_mib_upload(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
932 self.log.debug('send-mib-upload')
933
934 frame = OntDataFrame().mib_upload()
935 return self.send(frame, timeout=timeout, high_priority=high_priority)
936
937 def send_mib_upload_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
938 self.log.debug('send-mib-upload-next')
939
940 frame = OntDataFrame(sequence_number=seq_no).mib_upload_next()
941 return self.send(frame, timeout=timeout, high_priority=high_priority)
942
943 def send_reboot(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
944 """
945 Send an ONU Device reboot request (ONU-G ME).
946
947 NOTICE: This method is being deprecated and replaced with a tasks to preform this function
948 """
949 self.log.debug('send-mib-reboot')
950
951 frame = OntGFrame().reboot()
952 return self.send(frame, timeout=timeout, high_priority=high_priority)
953
954 def send_get_all_alarm(self, alarm_retrieval_mode=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
955 self.log.debug('send_get_alarm')
956
957 frame = OntDataFrame().get_all_alarm(alarm_retrieval_mode)
958 return self.send(frame, timeout=timeout, high_priority=high_priority)
959
960 def send_get_all_alarm_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
961 self.log.debug('send_get_alarm_next')
962
963 frame = OntDataFrame().get_all_alarm_next(seq_no)
964 return self.send(frame, timeout=timeout, high_priority=high_priority)
965
966 def send_start_software_download(self, image_inst_id, image_size, window_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
967 frame = SoftwareImageFrame(image_inst_id).start_software_download(image_size, window_size-1)
968 return self.send(frame, timeout, 3, high_priority=high_priority)
969
970 def send_download_section(self, image_inst_id, section_num, data, size=DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE, timeout=0, high_priority=False):
971 """
972 # timeout=0 indicates no repons needed
973 """
974 # self.log.debug("send_download_section", instance_id=image_inst_id, section=section_num, timeout=timeout)
975 if timeout > 0:
976 frame = SoftwareImageFrame(image_inst_id).download_section(True, section_num, data)
977 else:
978 frame = SoftwareImageFrame(image_inst_id).download_section(False, section_num, data)
979 return self.send(frame, timeout, high_priority=high_priority)
980
981 # if timeout > 0:
982 # self.reactor.callLater(0, self.sim_receive_download_section_resp,
983 # frame.fields["transaction_id"],
984 # frame.fields["omci_message"].fields["section_number"])
985 # return d
986
987 def send_end_software_download(self, image_inst_id, crc32, image_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
988 frame = SoftwareImageFrame(image_inst_id).end_software_download(crc32, image_size)
989 return self.send(frame, timeout, high_priority=high_priority)
990 # self.reactor.callLater(0, self.sim_receive_end_software_download_resp, frame.fields["transaction_id"])
991 # return d
992
993 def send_active_image(self, image_inst_id, flag=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
994 frame = SoftwareImageFrame(image_inst_id).activate_image(flag)
995 return self.send(frame, timeout, high_priority=high_priority)
996
997 def send_commit_image(self, image_inst_id, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
998 frame = SoftwareImageFrame(image_inst_id).commit_image()
999 return self.send(frame, timeout, high_priority=high_priority)
1000