VOL-1395: Common shared libraries needed for Python based device adapters.
This is an initial check-in of code from the master branch. Additional work
is expected on a few items to work with the new go-core and will be covered
by separate JIRAs and commits.
Change-Id: I0856ec6b79b8d3e49082c609eb9c7eedd75b1708
diff --git a/python/adapters/extensions/omci/omci_cc.py b/python/adapters/extensions/omci/omci_cc.py
new file mode 100644
index 0000000..e1c6019
--- /dev/null
+++ b/python/adapters/extensions/omci/omci_cc.py
@@ -0,0 +1,1000 @@
+#
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+OMCI Message support
+"""
+
+import sys
+import arrow
+from twisted.internet import reactor, defer
+from twisted.internet.defer import TimeoutError, CancelledError, failure, fail, succeed, inlineCallbacks
+from common.frameio.frameio import hexify
+from voltha.extensions.omci.omci import *
+from voltha.extensions.omci.omci_me import OntGFrame, OntDataFrame, SoftwareImageFrame
+from voltha.extensions.omci.me_frame import MEFrame
+from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
+from common.event_bus import EventBusClient
+from enum import IntEnum
+from binascii import hexlify
+
+
+def hexify(buffer):
+ """Return a hexadecimal string encoding of input buffer"""
+ return ''.join('%02x' % ord(c) for c in buffer)
+
+
+DEFAULT_OMCI_TIMEOUT = 10 # 3 # Seconds
+MAX_OMCI_REQUEST_AGE = 60 # Seconds
+DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE = 31 # Bytes
+MAX_TABLE_ROW_COUNT = 512 # Keep get-next logic reasonable
+
+CONNECTED_KEY = 'connected'
+TX_REQUEST_KEY = 'tx-request'
+RX_RESPONSE_KEY = 'rx-response'
+UNKNOWN_CLASS_ATTRIBUTE_KEY = 'voltha-unknown-blob'
+
+
+class OmciCCRxEvents(IntEnum):
+ AVC_Notification = 0,
+ MIB_Upload = 1,
+ MIB_Upload_Next = 2,
+ Create = 3,
+ Delete = 4,
+ Set = 5,
+ Alarm_Notification = 6,
+ Test_Result = 7,
+ MIB_Reset = 8,
+ Connectivity = 9,
+ Get_ALARM_Get = 10,
+ Get_ALARM_Get_Next = 11
+
+
+# abbreviations
+OP = EntityOperations
+RxEvent = OmciCCRxEvents
+
+
+class OMCI_CC(object):
+ """ Handle OMCI Communication Channel specifics for Adtran ONUs"""
+
+ MIN_OMCI_TX_ID_LOW_PRIORITY = 0x0001 # 2 Octets max
+ MAX_OMCI_TX_ID_LOW_PRIORITY = 0x7FFF # 2 Octets max
+ MIN_OMCI_TX_ID_HIGH_PRIORITY = 0x8000 # 2 Octets max
+ MAX_OMCI_TX_ID_HIGH_PRIORITY = 0xFFFF # 2 Octets max
+ LOW_PRIORITY = 0
+ HIGH_PRIORITY = 1
+
+ # Offset into some tuples for pending lists and tx in progress
+ PENDING_DEFERRED = 0
+ PENDING_FRAME = 1
+ PENDING_TIMEOUT = 2
+ PENDING_RETRY = 3
+
+ REQUEST_TIMESTAMP = 0
+ REQUEST_DEFERRED = 1
+ REQUEST_FRAME = 2
+ REQUEST_TIMEOUT = 3
+ REQUEST_RETRY = 4
+ REQUEST_DELAYED_CALL = 5
+
+ _frame_to_event_type = {
+ OmciMibResetResponse.message_id: RxEvent.MIB_Reset,
+ OmciMibUploadResponse.message_id: RxEvent.MIB_Upload,
+ OmciMibUploadNextResponse.message_id: RxEvent.MIB_Upload_Next,
+ OmciCreateResponse.message_id: RxEvent.Create,
+ OmciDeleteResponse.message_id: RxEvent.Delete,
+ OmciSetResponse.message_id: RxEvent.Set,
+ OmciGetAllAlarmsResponse.message_id: RxEvent.Get_ALARM_Get,
+ OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
+ }
+
+ def __init__(self, adapter_agent, device_id, me_map=None,
+ clock=None):
+ self.log = structlog.get_logger(device_id=device_id)
+ self._adapter_agent = adapter_agent
+ self._device_id = device_id
+ self._proxy_address = None
+ self._enabled = False
+ self._extended_messaging = False
+ self._me_map = me_map
+ if clock is None:
+ self.reactor = reactor
+ else:
+ self.reactor = clock
+
+ # Support 2 levels of priority since only baseline message set supported
+ self._tx_tid = [OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY, OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY]
+ self._tx_request = [None, None] # Tx in progress (timestamp, defer, frame, timeout, retry, delayedCall)
+ self._pending = [list(), list()] # pending queue (deferred, tx_frame, timeout, retry)
+ self._rx_response = [None, None]
+
+ # Statistics
+ self._tx_frames = 0
+ self._rx_frames = 0
+ self._rx_unknown_tid = 0 # Rx OMCI with no Tx TID match
+ self._rx_onu_frames = 0 # Autonomously generated ONU frames
+ self._rx_onu_discards = 0 # Autonomously generated ONU unknown message types
+ self._rx_timeouts = 0
+ self._rx_late = 0 # Frame response received after timeout on Tx
+ self._rx_unknown_me = 0 # Number of managed entities Rx without a decode definition
+ self._tx_errors = 0 # Exceptions during tx request
+ self._consecutive_errors = 0 # Rx & Tx errors in a row, a good RX resets this to 0
+ self._reply_min = sys.maxint # Fastest successful tx -> rx
+ self._reply_max = 0 # Longest successful tx -> rx
+ self._reply_sum = 0.0 # Total seconds for successful tx->rx (float for average)
+ self._max_hp_tx_queue = 0 # Maximum size of high priority tx pending queue
+ self._max_lp_tx_queue = 0 # Maximum size of low priority tx pending queue
+
+ self.event_bus = EventBusClient()
+
+ # If a list of custom ME Entities classes were provided, insert them into
+ # main class_id to entity map.
+ # TODO: If this class becomes hidden from the ONU DA, move this to the OMCI State Machine runner
+
+ def __str__(self):
+ return "OMCISupport: {}".format(self._device_id)
+
+ def _get_priority_index(self, high_priority):
+ """ Centralized logic to help make extended message support easier in the future"""
+ return OMCI_CC.HIGH_PRIORITY if high_priority and not self._extended_messaging \
+ else OMCI_CC.LOW_PRIORITY
+
+ def _tid_is_high_priority(self, tid):
+ """ Centralized logic to help make extended message support easier in the future"""
+
+ return not self._extended_messaging and \
+ OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY <= tid <= OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
+
+ @staticmethod
+ def event_bus_topic(device_id, event):
+ """
+ Get the topic name for a given event Frame Type
+ :param device_id: (str) ONU Device ID
+ :param event: (OmciCCRxEvents) Type of event
+ :return: (str) Topic string
+ """
+ assert event in OmciCCRxEvents, \
+ 'Event {} is not an OMCI-CC Rx Event'.format(event.name)
+
+ return 'omci-rx:{}:{}'.format(device_id, event.name)
+
+ @property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ """
+ Enable/disable the OMCI Communications Channel
+
+ :param value: (boolean) True to enable, False to disable
+ """
+ assert isinstance(value, bool), 'enabled is a boolean'
+
+ if self._enabled != value:
+ self._enabled = value
+ if self._enabled:
+ self._start()
+ else:
+ self._stop()
+
+ @property
+ def tx_frames(self):
+ return self._tx_frames
+
+ @property
+ def rx_frames(self):
+ return self._rx_frames
+
+ @property
+ def rx_unknown_tid(self):
+ return self._rx_unknown_tid # Tx TID not found
+
+ @property
+ def rx_unknown_me(self):
+ return self._rx_unknown_me
+
+ @property
+ def rx_onu_frames(self):
+ return self._rx_onu_frames
+
+ @property
+ def rx_onu_discards(self):
+ return self._rx_onu_discards # Attribute Value change autonomous overflows
+
+ @property
+ def rx_timeouts(self):
+ return self._rx_timeouts
+
+ @property
+ def rx_late(self):
+ return self._rx_late
+
+ @property
+ def tx_errors(self):
+ return self._tx_errors
+
+ @property
+ def consecutive_errors(self):
+ return self._consecutive_errors
+
+ @property
+ def reply_min(self):
+ return int(round(self._reply_min * 1000.0)) # Milliseconds
+
+ @property
+ def reply_max(self):
+ return int(round(self._reply_max * 1000.0)) # Milliseconds
+
+ @property
+ def reply_average(self):
+ avg = self._reply_sum / self._rx_frames if self._rx_frames > 0 else 0.0
+ return int(round(avg * 1000.0)) # Milliseconds
+
+ @property
+ def hp_tx_queue_len(self):
+ return len(self._pending[OMCI_CC.HIGH_PRIORITY])
+
+ @property
+ def lp_tx_queue_len(self):
+ return len(self._pending[OMCI_CC.LOW_PRIORITY])
+
+ @property
+ def max_hp_tx_queue(self):
+ return self._max_hp_tx_queue
+
+ @property
+ def max_lp_tx_queue(self):
+ return self._max_lp_tx_queue
+
+ def _start(self):
+ """
+ Start the OMCI Communications Channel
+ """
+ assert self._enabled, 'Start should only be called if enabled'
+ self.flush()
+
+ device = self._adapter_agent.get_device(self._device_id)
+ self._proxy_address = device.proxy_address
+
+ def _stop(self):
+ """
+ Stop the OMCI Communications Channel
+ """
+ assert not self._enabled, 'Stop should only be called if disabled'
+ self.flush()
+ self._proxy_address = None
+
+ def _receive_onu_message(self, rx_frame):
+ """ Autonomously generated ONU frame Rx handler"""
+ self.log.debug('rx-onu-frame', frame_type=type(rx_frame),
+ frame=hexify(str(rx_frame)))
+
+ msg_type = rx_frame.fields['message_type']
+ self._rx_onu_frames += 1
+
+ msg = {TX_REQUEST_KEY: None,
+ RX_RESPONSE_KEY: rx_frame}
+
+ if msg_type == EntityOperations.AlarmNotification.value:
+ topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Alarm_Notification)
+ self.reactor.callLater(0, self.event_bus.publish, topic, msg)
+
+ elif msg_type == EntityOperations.AttributeValueChange.value:
+ topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.AVC_Notification)
+ self.reactor.callLater(0, self.event_bus.publish, topic, msg)
+
+ elif msg_type == EntityOperations.TestResult.value:
+ topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Test_Result)
+ self.reactor.callLater(0, self.event_bus.publish, topic, msg)
+
+ else:
+ self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
+ self._rx_onu_discards += 1
+
+ def _update_rx_tx_stats(self, now, ts):
+ ts_diff = now - arrow.Arrow.utcfromtimestamp(ts)
+ secs = ts_diff.total_seconds()
+ self._reply_sum += secs
+ if secs < self._reply_min:
+ self._reply_min = secs
+ if secs > self._reply_max:
+ self._reply_max = secs
+ return secs
+
+ def receive_message(self, msg):
+ """
+ Receive and OMCI message from the proxy channel to the OLT.
+
+ Call this from your ONU Adapter on a new OMCI Rx on the proxy channel
+ :param msg: (str) OMCI binary message (used as input to Scapy packet decoder)
+ """
+ if not self.enabled:
+ return
+
+ try:
+ now = arrow.utcnow()
+ d = None
+
+ # NOTE: Since we may need to do an independent ME map on a per-ONU basis
+ # save the current value of the entity_id_to_class_map, then
+ # replace it with our custom one before decode, and then finally
+ # restore it later. Tried other ways but really made the code messy.
+ saved_me_map = omci_entities.entity_id_to_class_map
+ omci_entities.entity_id_to_class_map = self._me_map
+
+ try:
+ rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
+ rx_tid = rx_frame.fields['transaction_id']
+
+ if rx_tid == 0:
+ return self._receive_onu_message(rx_frame)
+
+ # Previously unreachable if this is the very first Rx or we
+ # have been running consecutive errors
+ if self._rx_frames == 0 or self._consecutive_errors != 0:
+ self.reactor.callLater(0, self._publish_connectivity_event, True)
+
+ self._rx_frames += 1
+ self._consecutive_errors = 0
+
+ except KeyError as e:
+ # Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
+ self.log.debug('frame-decode-key-error', msg=hexlify(msg), e=e)
+ rx_frame = self._decode_unknown_me(msg)
+ self._rx_unknown_me += 1
+ rx_tid = rx_frame.fields.get('transaction_id')
+
+ except Exception as e:
+ self.log.exception('frame-decode', msg=hexlify(msg), e=e)
+ return
+
+ finally:
+ omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
+
+ try:
+ high_priority = self._tid_is_high_priority(rx_tid)
+ index = self._get_priority_index(high_priority)
+
+ # (timestamp, defer, frame, timeout, retry, delayedCall)
+ last_tx_tuple = self._tx_request[index]
+
+ if last_tx_tuple is None or \
+ last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
+ # Possible late Rx on a message that timed-out
+ self._rx_unknown_tid += 1
+ self.log.warn('tx-message-missing', rx_id=rx_tid, msg=hexlify(msg))
+ return
+
+ ts, d, tx_frame, timeout, retry, dc = last_tx_tuple
+ if dc is not None and not dc.cancelled and not dc.called:
+ dc.cancel()
+ self.log.debug("cancel-timeout-called")
+
+ secs = self._update_rx_tx_stats(now, ts)
+
+ # Late arrival?
+ if d.called:
+ self._rx_late += 1
+ return
+
+ except Exception as e:
+ self.log.exception('frame-match', msg=hexlify(msg), e=e)
+ if d is not None:
+ return d.errback(failure.Failure(e))
+ return
+
+ # Extended processing needed. Note 'data' field will be None on some error
+ # status returns
+ omci_msg = rx_frame.fields['omci_message']
+
+ if isinstance(omci_msg, OmciGetResponse) and \
+ omci_msg.fields.get('data') is not None and \
+ 'table_attribute_mask' in omci_msg.fields['data']:
+ # Yes, run in a separate generator
+ reactor.callLater(0, self._process_get_rx_frame, timeout, secs,
+ rx_frame, d, tx_frame, high_priority)
+ else:
+ # Publish Rx event to listeners in a different task
+ reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
+
+ # begin success callback chain (will cancel timeout and queue next Tx message)
+ from copy import copy
+ original_callbacks = copy(d.callbacks)
+ self._rx_response[index] = rx_frame
+ d.callback(rx_frame)
+
+ except Exception as e:
+ self.log.exception('rx-msg', e=e)
+
+ @inlineCallbacks
+ def _process_get_rx_frame(self, timeout, secs, rx_frame, d, tx_frame, high_priority):
+ """
+ Special handling for Get Requests that may require additional 'get_next' operations
+ if a table attribute was requested.
+ """
+ omci_msg = rx_frame.fields['omci_message']
+ if isinstance(omci_msg, OmciGetResponse) and 'table_attribute_mask' in omci_msg.fields['data']:
+ try:
+ entity_class = omci_msg.fields['entity_class']
+ entity_id = omci_msg.fields['entity_id']
+ table_attributes = omci_msg.fields['data']['table_attribute_mask']
+
+ # Table attribute mask is encoded opposite of managed entity mask.
+ if entity_class in self._me_map:
+ ec = self._me_map[entity_class]
+ for index in xrange(16):
+ attr_mask = 1 << index
+
+ if attr_mask & table_attributes:
+ eca = ec.attributes[15-index]
+ self.log.debug('omcc-get-table-attribute', table_name=eca.field.name)
+
+ seq_no = 0
+ data_buffer = ''
+ count = omci_msg.fields['data'][eca.field.name + '_size']
+
+ if count > MAX_TABLE_ROW_COUNT:
+ self.log.error('omcc-get-table-huge', count=count, name=eca.field.name)
+ raise ValueError('Huge Table Size: {}'.format(count))
+
+ # Original timeout must be chopped up into each individual get-next request
+ # in order for total transaction to complete within the timeframe of the
+ # original get() timeout.
+ number_transactions = 1 + (count + OmciTableField.PDU_SIZE - 1) / OmciTableField.PDU_SIZE
+ timeout /= (1 + number_transactions)
+
+ # Start the loop
+ vals = []
+ for offset in xrange(0, count, OmciTableField.PDU_SIZE):
+ frame = MEFrame(ec, entity_id, {eca.field.name: seq_no}).get_next()
+ seq_no += 1
+
+ max_retries = 3
+ results = yield self.send(frame, min(timeout / max_retries, secs * 3), max_retries)
+
+ omci_getnext_msg = results.fields['omci_message']
+ status = omci_getnext_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise Exception('get-next-failure table=' + eca.field.name +
+ ' entity_id=' + str(entity_id) +
+ ' sqn=' + str(seq_no) + ' omci-status ' + str(status))
+
+ # Extract the data
+ num_octets = count - offset
+ if num_octets > OmciTableField.PDU_SIZE:
+ num_octets = OmciTableField.PDU_SIZE
+
+ data = omci_getnext_msg.fields['data'][eca.field.name]
+ data_buffer += data[:num_octets]
+
+ while data_buffer:
+ data_buffer, val = eca.field.getfield(None, data_buffer)
+ vals.append(val)
+
+ omci_msg.fields['data'][eca.field.name] = vals
+ del omci_msg.fields['data'][eca.field.name + '_size']
+ self.log.debug('omcc-got-table-attribute-rows', table_name=eca.field.name,
+ row_count=len(vals))
+ del omci_msg.fields['data']['table_attribute_mask']
+
+ except Exception as e:
+ self.log.exception('get-next-error', e=e)
+ d.errback(failure.Failure(e), high_priority)
+ return
+
+ # Notify sender of completed request
+ reactor.callLater(0, d.callback, rx_frame, high_priority)
+
+ # Publish Rx event to listeners in a different task except for internally-consumed get-next-response
+ if not isinstance(omci_msg, OmciGetNextResponse):
+ reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
+
+ def _decode_unknown_me(self, msg):
+ """
+ Decode an ME for an unsupported class ID. This should only occur for a subset
+ of message types (Get, Set, MIB Upload Next, ...) and they should only be
+ responses as well.
+
+ There are some times below that are commented out. For VOLTHA 2.0, it is
+ expected that any get, set, create, delete for unique (often vendor) MEs
+ will be coded by the ONU utilizing it and supplied to OpenOMCI as a
+ vendor-specific ME during device initialization.
+
+ :param msg: (str) Binary data
+ :return: (OmciFrame) resulting frame
+ """
+ from struct import unpack
+
+ (tid, msg_type, framing) = unpack('!HBB', msg[0:4])
+
+ assert framing == 0xa, 'Only basic OMCI framing supported at this time'
+ msg = msg[4:]
+
+ # TODO: Commented out items below are future work (not expected for VOLTHA v2.0)
+ (msg_class, kwargs) = {
+ # OmciCreateResponse.message_id: (OmciCreateResponse, None),
+ # OmciDeleteResponse.message_id: (OmciDeleteResponse, None),
+ # OmciSetResponse.message_id: (OmciSetResponse, None),
+ # OmciGetResponse.message_id: (OmciGetResponse, None),
+ # OmciGetAllAlarmsNextResponse.message_id: (OmciGetAllAlarmsNextResponse, None),
+ OmciMibUploadNextResponse.message_id: (OmciMibUploadNextResponse,
+ {
+ 'entity_class': unpack('!H', msg[0:2])[0],
+ 'entity_id': unpack('!H', msg[2:4])[0],
+ 'object_entity_class': unpack('!H', msg[4:6])[0],
+ 'object_entity_id': unpack('!H', msg[6:8])[0],
+ 'object_attributes_mask': unpack('!H', msg[8:10])[0],
+ 'object_data': {
+ UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[10:-4])
+ },
+ }),
+ # OmciAlarmNotification.message_id: (OmciAlarmNotification, None),
+ OmciAttributeValueChange.message_id: (OmciAttributeValueChange,
+ {
+ 'entity_class': unpack('!H', msg[0:2])[0],
+ 'entity_id': unpack('!H', msg[2:4])[0],
+ 'data': {
+ UNKNOWN_CLASS_ATTRIBUTE_KEY: hexlify(msg[4:-8])
+ },
+ }),
+ # OmciTestResult.message_id: (OmciTestResult, None),
+ }.get(msg_type, None)
+
+ if msg_class is None:
+ raise TypeError('Unsupport Message Type for Unknown Decode: {}',
+ msg_type)
+
+ return OmciFrame(transaction_id=tid, message_type=msg_type,
+ omci_message=msg_class(**kwargs))
+
+ def _publish_rx_frame(self, tx_frame, rx_frame):
+ """
+ Notify listeners of successful response frame
+ :param tx_frame: (OmciFrame) Original request frame
+ :param rx_frame: (OmciFrame) Response frame
+ """
+ if self._enabled and isinstance(rx_frame, OmciFrame):
+ frame_type = rx_frame.fields['omci_message'].message_id
+ event_type = OMCI_CC._frame_to_event_type.get(frame_type)
+
+ if event_type is not None:
+ topic = OMCI_CC.event_bus_topic(self._device_id, event_type)
+ msg = {TX_REQUEST_KEY: tx_frame,
+ RX_RESPONSE_KEY: rx_frame}
+
+ self.event_bus.publish(topic=topic, msg=msg)
+
+ def _publish_connectivity_event(self, connected):
+ """
+ Notify listeners of Rx/Tx connectivity over OMCI
+ :param connected: (bool) True if connectivity transitioned from unreachable
+ to reachable
+ """
+ if self._enabled:
+ topic = OMCI_CC.event_bus_topic(self._device_id,
+ RxEvent.Connectivity)
+ msg = {CONNECTED_KEY: connected}
+ self.event_bus.publish(topic=topic, msg=msg)
+
+ def flush(self):
+ """Flush/cancel in active or pending Tx requests"""
+ requests = []
+
+ for priority in {OMCI_CC.HIGH_PRIORITY, OMCI_CC.LOW_PRIORITY}:
+ next_frame, self._tx_request[priority] = self._tx_request[priority], None
+ if next_frame is not None:
+ requests.append((next_frame[OMCI_CC.REQUEST_DEFERRED], next_frame[OMCI_CC.REQUEST_DELAYED_CALL]))
+
+ requests += [(next_frame[OMCI_CC.PENDING_DEFERRED], None)
+ for next_frame in self._pending[priority]]
+ self._pending[priority] = list()
+
+ # Cancel them...
+ def cleanup_unhandled_error(_):
+ pass # So the cancel below does not flag an unhandled error
+
+ for d, dc in requests:
+ if d is not None and not d.called:
+ d.addErrback(cleanup_unhandled_error)
+ d.cancel()
+
+ if dc is not None and not dc.called and not dc.cancelled:
+ dc.cancel()
+
+ def _get_tx_tid(self, high_priority=False):
+ """
+ Get the next Transaction ID for a tx. Note TID=0 is reserved
+ for autonomously generated messages from an ONU
+
+ :return: (int) TID
+ """
+ if self._extended_messaging or not high_priority:
+ index = OMCI_CC.LOW_PRIORITY
+ min_tid = OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY
+ max_tid = OMCI_CC.MAX_OMCI_TX_ID_LOW_PRIORITY
+ else:
+ index = OMCI_CC.HIGH_PRIORITY
+ min_tid = OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY
+ max_tid = OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
+
+ tx_tid, self._tx_tid[index] = self._tx_tid[index], self._tx_tid[index] + 1
+
+ if self._tx_tid[index] > max_tid:
+ self._tx_tid[index] = min_tid
+
+ return tx_tid
+
+ def _request_failure(self, value, tx_tid, high_priority):
+ """
+ Handle a transmit failure. Rx Timeouts are handled on the 'dc' deferred and
+ will call a different method that may retry if requested. This routine
+ will be called after the final (if any) timeout or other error
+
+ :param value: (Failure) Twisted failure
+ :param tx_tid: (int) Associated Tx TID
+ """
+ index = self._get_priority_index(high_priority)
+
+ if self._tx_request[index] is not None:
+ tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
+ tx_frame_tid = tx_frame.fields['transaction_id']
+
+ if tx_frame_tid == tx_tid:
+ timeout = self._tx_request[index][OMCI_CC.REQUEST_TIMEOUT]
+ dc = self._tx_request[index][OMCI_CC.REQUEST_DELAYED_CALL]
+ self._tx_request[index] = None
+
+ if dc is not None and not dc.called and not dc.cancelled:
+ dc.cancel()
+
+ if isinstance(value, failure.Failure):
+ value.trap(CancelledError)
+ self._rx_timeouts += 1
+ self._consecutive_errors += 1
+ if self._consecutive_errors == 1:
+ reactor.callLater(0, self._publish_connectivity_event, False)
+
+ self.log.debug('timeout', tx_id=tx_tid, timeout=timeout)
+ value = failure.Failure(TimeoutError(timeout, "Deferred"))
+ else:
+ # Search pending queue. This may be a cancel coming in from the original
+ # task that requested the Tx. If found, remove
+ # from pending queue
+ for index, request in enumerate(self._pending[index]):
+ req = request.get(OMCI_CC.PENDING_DEFERRED)
+ if req is not None and req.fields['transaction_id'] == tx_tid:
+ self._pending[index].pop(index)
+ break
+
+ self._send_next_request(high_priority)
+ return value
+
+ def _request_success(self, rx_frame, high_priority):
+ """
+ Handle transmit success (a matching Rx was received)
+
+ :param rx_frame: (OmciFrame) OMCI response frame with matching TID
+ :return: (OmciFrame) OMCI response frame with matching TID
+ """
+ index = self._get_priority_index(high_priority)
+
+ if rx_frame is None:
+ rx_frame = self._rx_response[index]
+
+ rx_tid = rx_frame.fields.get('transaction_id')
+
+ if rx_tid is not None:
+ if self._tx_request[index] is not None:
+ tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
+ tx_tid = tx_frame.fields['transaction_id']
+
+ if rx_tid == tx_tid:
+ # Remove this request. Next callback in chain initiates next Tx
+ self._tx_request[index] = None
+ else:
+ self._rx_late += 1
+ else:
+ self._rx_late += 1
+
+ self._send_next_request(high_priority)
+
+ # Return rx_frame (to next item in callback list)
+ return rx_frame
+
+ def _request_timeout(self, tx_tid, high_priority):
+ """
+ Tx Request timed out. Resend immediately if there retries is non-zero. A
+ separate deferred (dc) is used on each actual Tx which is not the deferred
+ (d) that is returned to the caller of the 'send()' method.
+
+ :param tx_tid: (int) TID of frame
+ :param high_priority: (bool) True if high-priority queue
+ """
+ self.log.debug("_request_timeout", tx_tid=tx_tid)
+ index = self._get_priority_index(high_priority)
+
+ if self._tx_request[index] is not None:
+ # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
+ ts, d, frame, timeout, retry, _dc = self._tx_request[index]
+
+ if frame.fields.get('transaction_id', 0) == tx_tid:
+ self._tx_request[index] = None
+
+ if retry > 0:
+ # Push on front of TX pending queue so that it transmits next with the
+ # original TID
+ self._queue_frame(d, frame, timeout, retry - 1, high_priority, front=True)
+ else:
+ d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
+
+ self._send_next_request(high_priority)
+
+ def _queue_frame(self, d, frame, timeout, retry, high_priority, front=False):
+ index = self._get_priority_index(high_priority)
+ tx_tuple = (d, frame, timeout, retry) # Pending -> (deferred, tx_frame, timeout, retry)
+
+ if front:
+ self._pending[index].insert(0, tuple)
+ else:
+ self._pending[index].append(tx_tuple)
+
+ # Monitor queue stats
+ qlen = len(self._pending[index])
+
+ if high_priority:
+ if self._max_hp_tx_queue < qlen:
+ self._max_hp_tx_queue = qlen
+
+ elif self._max_lp_tx_queue < qlen:
+ self._max_lp_tx_queue = qlen
+
+ def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT, retry=0, high_priority=False):
+ """
+ Queue the OMCI Frame for a transmit to the ONU via the proxy_channel
+
+ :param frame: (OMCIFrame) Message to send
+ :param timeout: (int) Rx Timeout. 0=No response needed
+ :param retry: (int) Additional retry attempts on channel failure, default=0
+ :param high_priority: (bool) High Priority requests
+ :return: (deferred) A deferred that fires when the response frame is received
+ or if an error/timeout occurs
+ """
+ if not self.enabled or self._proxy_address is None:
+ # TODO custom exceptions throughout this code would be helpful
+ self._tx_errors += 1
+ return fail(result=failure.Failure(Exception('OMCI is not enabled')))
+
+ timeout = float(timeout)
+ if timeout > float(MAX_OMCI_REQUEST_AGE):
+ self._tx_errors += 1
+ msg = 'Maximum timeout is {} seconds'.format(MAX_OMCI_REQUEST_AGE)
+ return fail(result=failure.Failure(Exception(msg)))
+
+ if not isinstance(frame, OmciFrame):
+ self._tx_errors += 1
+ msg = "Invalid frame class '{}'".format(type(frame))
+ return fail(result=failure.Failure(Exception(msg)))
+ try:
+ index = self._get_priority_index(high_priority)
+ tx_tid = frame.fields['transaction_id']
+
+ if tx_tid is None:
+ tx_tid = self._get_tx_tid(high_priority=high_priority)
+ frame.fields['transaction_id'] = tx_tid
+
+ assert tx_tid not in self._pending[index], 'TX TID {} is already exists'.format(tx_tid)
+ assert tx_tid > 0, 'Invalid Tx TID: {}'.format(tx_tid)
+
+ # Queue it and request next Tx if tx channel is free
+ d = defer.Deferred()
+
+ self._queue_frame(d, frame, timeout, retry, high_priority, front=False)
+ self._send_next_request(high_priority)
+
+ if timeout == 0:
+ self.log.debug("send-timeout-zero", tx_tid=tx_tid)
+ self.reactor.callLater(0, d.callback, 'queued')
+
+ return d
+
+ except Exception as e:
+ self._tx_errors += 1
+ self._consecutive_errors += 1
+
+ if self._consecutive_errors == 1:
+ self.reactor.callLater(0, self._publish_connectivity_event, False)
+
+ self.log.exception('send-omci', e=e)
+ return fail(result=failure.Failure(e))
+
+ def _ok_to_send(self, tx_request, high_priority):
+ """
+ G.988 specifies not to issue a MIB upload or a Software download request
+ when a similar action is in progress on the other channel. To keep the
+ logic here simple, a new upload/download will not be allowed if either a
+ upload/download is going on
+
+ :param tx_request (OmciFrame) Frame to send
+ :param high_priority: (bool) for queue selection
+ :return: True if okay to dequeue and send frame
+ """
+ other = self._get_priority_index(not high_priority)
+
+ if self._tx_request[other] is None:
+ return True
+
+ this_msg_type = tx_request.fields['message_type'] & 0x1f
+ not_allowed = {OP.MibUpload.value,
+ OP.MibUploadNext.value,
+ OP.StartSoftwareDownload.value,
+ OP.DownloadSection.value,
+ OP.EndSoftwareDownload.value}
+
+ if this_msg_type not in not_allowed:
+ return True
+
+ other_msg_type = self._tx_request[other][OMCI_CC.REQUEST_FRAME].fields['message_type'] & 0x1f
+ return other_msg_type not in not_allowed
+
+ def _send_next_request(self, high_priority):
+ """
+ Pull next tx request and send it
+
+ :param high_priority: (bool) True if this was a high priority request
+ :return: results, so callback chain continues if needed
+ """
+ index = self._get_priority_index(high_priority)
+
+ if self._tx_request[index] is None: # TODO or self._tx_request[index][OMCI_CC.REQUEST_DEFERRED].called:
+ d = None
+ try:
+ if len(self._pending[index]) and \
+ not self._ok_to_send(self._pending[index][0][OMCI_CC.PENDING_FRAME],
+ high_priority):
+ reactor.callLater(0.05, self._send_next_request, high_priority)
+ return
+
+ next_frame = self._pending[index].pop(0)
+
+ d = next_frame[OMCI_CC.PENDING_DEFERRED]
+ frame = next_frame[OMCI_CC.PENDING_FRAME]
+ timeout = next_frame[OMCI_CC.PENDING_TIMEOUT]
+ retry = next_frame[OMCI_CC.PENDING_RETRY]
+
+ tx_tid = frame.fields['transaction_id']
+
+ # NOTE: Since we may need to do an independent ME map on a per-ONU basis
+ # save the current value of the entity_id_to_class_map, then
+ # replace it with our custom one before decode, and then finally
+ # restore it later. Tried other ways but really made the code messy.
+ saved_me_map = omci_entities.entity_id_to_class_map
+ omci_entities.entity_id_to_class_map = self._me_map
+
+ ts = arrow.utcnow().float_timestamp
+ try:
+ self._rx_response[index] = None
+ self._adapter_agent.send_proxied_message(self._proxy_address,
+ hexify(str(frame)))
+ finally:
+ omci_entities.entity_id_to_class_map = saved_me_map
+
+ self._tx_frames += 1
+
+ if timeout > 0:
+ # Timeout on internal deferred to support internal retries if requested
+ dc = self.reactor.callLater(timeout, self._request_timeout, tx_tid, high_priority)
+
+ # (timestamp, defer, frame, timeout, retry, delayedCall)
+ self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
+ d.addCallbacks(self._request_success, self._request_failure,
+ callbackArgs=(high_priority,),
+ errbackArgs=(tx_tid, high_priority))
+
+ except IndexError:
+ pass # Nothing pending in this queue
+
+ except Exception as e:
+ self.log.exception('send-proxy-exception', e=e)
+ self._tx_request[index] = None
+ self.reactor.callLater(0, self._send_next_request, high_priority)
+
+ if d is not None:
+ d.errback(failure.Failure(e))
+
+ ###################################################################################
+ # MIB Action shortcuts
+
+ def send_mib_reset(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ """
+ Perform a MIB Reset
+ """
+ self.log.debug('send-mib-reset')
+
+ frame = OntDataFrame().mib_reset()
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
+
+ def send_mib_upload(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ self.log.debug('send-mib-upload')
+
+ frame = OntDataFrame().mib_upload()
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
+
+ def send_mib_upload_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ self.log.debug('send-mib-upload-next')
+
+ frame = OntDataFrame(sequence_number=seq_no).mib_upload_next()
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
+
+ def send_reboot(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ """
+ Send an ONU Device reboot request (ONU-G ME).
+
+ NOTICE: This method is being deprecated and replaced with a tasks to preform this function
+ """
+ self.log.debug('send-mib-reboot')
+
+ frame = OntGFrame().reboot()
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
+
+ def send_get_all_alarm(self, alarm_retrieval_mode=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ self.log.debug('send_get_alarm')
+
+ frame = OntDataFrame().get_all_alarm(alarm_retrieval_mode)
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
+
+ def send_get_all_alarm_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ self.log.debug('send_get_alarm_next')
+
+ frame = OntDataFrame().get_all_alarm_next(seq_no)
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
+
+ def send_start_software_download(self, image_inst_id, image_size, window_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ frame = SoftwareImageFrame(image_inst_id).start_software_download(image_size, window_size-1)
+ return self.send(frame, timeout, 3, high_priority=high_priority)
+
+ def send_download_section(self, image_inst_id, section_num, data, size=DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE, timeout=0, high_priority=False):
+ """
+ # timeout=0 indicates no repons needed
+ """
+ # self.log.debug("send_download_section", instance_id=image_inst_id, section=section_num, timeout=timeout)
+ if timeout > 0:
+ frame = SoftwareImageFrame(image_inst_id).download_section(True, section_num, data)
+ else:
+ frame = SoftwareImageFrame(image_inst_id).download_section(False, section_num, data)
+ return self.send(frame, timeout, high_priority=high_priority)
+
+ # if timeout > 0:
+ # self.reactor.callLater(0, self.sim_receive_download_section_resp,
+ # frame.fields["transaction_id"],
+ # frame.fields["omci_message"].fields["section_number"])
+ # return d
+
+ def send_end_software_download(self, image_inst_id, crc32, image_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ frame = SoftwareImageFrame(image_inst_id).end_software_download(crc32, image_size)
+ return self.send(frame, timeout, high_priority=high_priority)
+ # self.reactor.callLater(0, self.sim_receive_end_software_download_resp, frame.fields["transaction_id"])
+ # return d
+
+ def send_active_image(self, image_inst_id, flag=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ frame = SoftwareImageFrame(image_inst_id).activate_image(flag)
+ return self.send(frame, timeout, high_priority=high_priority)
+
+ def send_commit_image(self, image_inst_id, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
+ frame = SoftwareImageFrame(image_inst_id).commit_image()
+ return self.send(frame, timeout, high_priority=high_priority)
+