VOL-1495: Update OpenOMCI to later version.
Capable of proper table handling, needed to push
extended vlan tagging operation
Should match commit: https://gerrit.opencord.org/#/c/12815/
in voltha 1.x tree
Change-Id: I05135afd56407c9496b2dfcde67938fd58a99020
diff --git a/pyvoltha/adapters/extensions/omci/me_frame.py b/pyvoltha/adapters/extensions/omci/me_frame.py
index ae3c191..4f451ee 100644
--- a/pyvoltha/adapters/extensions/omci/me_frame.py
+++ b/pyvoltha/adapters/extensions/omci/me_frame.py
@@ -42,6 +42,9 @@
return '{}: Entity_ID: {}, Data: {}'.\
format(self.entity_class_name, self._entity_id, self.data)
+ def __repr__(self):
+ return str(self)
+
@property
def entity_class(self):
"""
diff --git a/pyvoltha/adapters/extensions/omci/omci_cc.py b/pyvoltha/adapters/extensions/omci/omci_cc.py
index ac0731d..5f76198 100644
--- a/pyvoltha/adapters/extensions/omci/omci_cc.py
+++ b/pyvoltha/adapters/extensions/omci/omci_cc.py
@@ -41,7 +41,6 @@
DEFAULT_OMCI_TIMEOUT = 10 # 3 # Seconds
MAX_OMCI_REQUEST_AGE = 60 # Seconds
DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE = 31 # Bytes
-MAX_TABLE_ROW_COUNT = 512 # Keep get-next logic reasonable
CONNECTED_KEY = 'connected'
TX_REQUEST_KEY = 'tx-request'
@@ -109,7 +108,6 @@
self._core_proxy = core_proxy
self._adapter_proxy = adapter_proxy
self._device_id = device_id
- self._device = None
self._proxy_address = None
self._enabled = False
self._extended_messaging = False
@@ -285,8 +283,7 @@
def _receive_onu_message(self, rx_frame):
""" Autonomously generated ONU frame Rx handler"""
- self.log.debug('rx-onu-frame', frame_type=type(rx_frame),
- frame=hexify(str(rx_frame)))
+ self.log.debug('rx-onu-frame', frame_type=type(rx_frame))
msg_type = rx_frame.fields['message_type']
self._rx_onu_frames += 1
@@ -338,43 +335,36 @@
# save the current value of the entity_id_to_class_map, then
# replace it with our custom one before decode, and then finally
# restore it later. Tried other ways but really made the code messy.
-
- # entity_classes = [c for c in entity_classes_name_map.itervalues()]
- # entity_id_to_class_map = dict((c.class_id, c) for c in entity_classes)
-
- global entity_id_to_class_map
- saved_me_map = entity_id_to_class_map
- entity_id_to_class_map = self._me_map
+ saved_me_map = omci_entities.entity_id_to_class_map
+ omci_entities.entity_id_to_class_map = self._me_map
try:
rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
- rx_tid = rx_frame.fields['transaction_id']
-
- if rx_tid == 0:
- return self._receive_onu_message(rx_frame)
-
- # Previously unreachable if this is the very first Rx or we
- # have been running consecutive errors
- if self._rx_frames == 0 or self._consecutive_errors != 0:
- self.reactor.callLater(0, self._publish_connectivity_event, True)
-
- self._rx_frames += 1
- self._consecutive_errors = 0
except KeyError as e:
# Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
self.log.debug('frame-decode-key-error', msg=hexlify(msg), e=e)
rx_frame = self._decode_unknown_me(msg)
self._rx_unknown_me += 1
- rx_tid = rx_frame.fields.get('transaction_id')
except Exception as e:
self.log.exception('frame-decode', msg=hexlify(msg), e=e)
return
finally:
- global entity_id_to_class_map
- entity_id_to_class_map = saved_me_map # Always restore it.
+ omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
+
+ rx_tid = rx_frame.fields['transaction_id']
+ if rx_tid == 0:
+ return self._receive_onu_message(rx_frame)
+
+ # Previously unreachable if this is the very first round-trip Rx or we
+ # have been running consecutive errors
+ if self._rx_frames == 0 or self._consecutive_errors != 0:
+ self.reactor.callLater(0, self._publish_connectivity_event, True)
+
+ self._rx_frames += 1
+ self._consecutive_errors = 0
try:
high_priority = self._tid_is_high_priority(rx_tid)
@@ -387,17 +377,16 @@
last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
# Possible late Rx on a message that timed-out
self._rx_unknown_tid += 1
- self.log.warn('tx-message-missing', rx_id=rx_tid, msg=hexlify(msg))
+ self._rx_late += 1
return
ts, d, tx_frame, timeout, retry, dc = last_tx_tuple
if dc is not None and not dc.cancelled and not dc.called:
dc.cancel()
- self.log.debug("cancel-timeout-called")
- secs = self._update_rx_tx_stats(now, ts)
+ _secs = self._update_rx_tx_stats(now, ts)
- # Late arrival?
+ # Late arrival already serviced by a timeout?
if d.called:
self._rx_late += 1
return
@@ -408,113 +397,16 @@
return d.errback(failure.Failure(e))
return
- # Extended processing needed. Note 'data' field will be None on some error
- # status returns
- omci_msg = rx_frame.fields['omci_message']
+ # Publish Rx event to listeners in a different task
+ reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
- if isinstance(omci_msg, OmciGetResponse) and \
- omci_msg.fields.get('data') is not None and \
- 'table_attribute_mask' in omci_msg.fields['data']:
- # Yes, run in a separate generator
- reactor.callLater(0, self._process_get_rx_frame, timeout, secs,
- rx_frame, d, tx_frame, high_priority)
- else:
- # Publish Rx event to listeners in a different task
- reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
-
- # begin success callback chain (will cancel timeout and queue next Tx message)
- from copy import copy
- original_callbacks = copy(d.callbacks)
- self._rx_response[index] = rx_frame
- d.callback(rx_frame)
+ # begin success callback chain (will cancel timeout and queue next Tx message)
+ self._rx_response[index] = rx_frame
+ d.callback(rx_frame)
except Exception as e:
self.log.exception('rx-msg', e=e)
- @inlineCallbacks
- def _process_get_rx_frame(self, timeout, secs, rx_frame, d, tx_frame, high_priority):
- """
- Special handling for Get Requests that may require additional 'get_next' operations
- if a table attribute was requested.
- """
- omci_msg = rx_frame.fields['omci_message']
- if isinstance(omci_msg, OmciGetResponse) and 'table_attribute_mask' in omci_msg.fields['data']:
- try:
- entity_class = omci_msg.fields['entity_class']
- entity_id = omci_msg.fields['entity_id']
- table_attributes = omci_msg.fields['data']['table_attribute_mask']
-
- # Table attribute mask is encoded opposite of managed entity mask.
- if entity_class in self._me_map:
- ec = self._me_map[entity_class]
- for index in xrange(16):
- attr_mask = 1 << index
-
- if attr_mask & table_attributes:
- eca = ec.attributes[15-index]
- self.log.debug('omcc-get-table-attribute', table_name=eca.field.name)
-
- seq_no = 0
- data_buffer = ''
- count = omci_msg.fields['data'][eca.field.name + '_size']
-
- if count > MAX_TABLE_ROW_COUNT:
- self.log.error('omcc-get-table-huge', count=count, name=eca.field.name)
- raise ValueError('Huge Table Size: {}'.format(count))
-
- # Original timeout must be chopped up into each individual get-next request
- # in order for total transaction to complete within the timeframe of the
- # original get() timeout.
- number_transactions = 1 + (count + OmciTableField.PDU_SIZE - 1) / OmciTableField.PDU_SIZE
- timeout /= (1 + number_transactions)
-
- # Start the loop
- vals = []
- for offset in xrange(0, count, OmciTableField.PDU_SIZE):
- frame = MEFrame(ec, entity_id, {eca.field.name: seq_no}).get_next()
- seq_no += 1
-
- max_retries = 3
- results = yield self.send(frame, min(timeout / max_retries, secs * 3), max_retries)
-
- omci_getnext_msg = results.fields['omci_message']
- status = omci_getnext_msg.fields['success_code']
-
- if status != ReasonCodes.Success.value:
- raise Exception('get-next-failure table=' + eca.field.name +
- ' entity_id=' + str(entity_id) +
- ' sqn=' + str(seq_no) + ' omci-status ' + str(status))
-
- # Extract the data
- num_octets = count - offset
- if num_octets > OmciTableField.PDU_SIZE:
- num_octets = OmciTableField.PDU_SIZE
-
- data = omci_getnext_msg.fields['data'][eca.field.name]
- data_buffer += data[:num_octets]
-
- while data_buffer:
- data_buffer, val = eca.field.getfield(None, data_buffer)
- vals.append(val)
-
- omci_msg.fields['data'][eca.field.name] = vals
- del omci_msg.fields['data'][eca.field.name + '_size']
- self.log.debug('omcc-got-table-attribute-rows', table_name=eca.field.name,
- row_count=len(vals))
- del omci_msg.fields['data']['table_attribute_mask']
-
- except Exception as e:
- self.log.exception('get-next-error', e=e)
- d.errback(failure.Failure(e), high_priority)
- return
-
- # Notify sender of completed request
- reactor.callLater(0, d.callback, rx_frame, high_priority)
-
- # Publish Rx event to listeners in a different task except for internally-consumed get-next-response
- if not isinstance(omci_msg, OmciGetNextResponse):
- reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
-
def _decode_unknown_me(self, msg):
"""
Decode an ME for an unsupported class ID. This should only occur for a subset
@@ -733,6 +625,9 @@
separate deferred (dc) is used on each actual Tx which is not the deferred
(d) that is returned to the caller of the 'send()' method.
+ If the timeout if the transmitted frame was zero, this is just cleanup of
+ that transmit request and not necessarily a transmit timeout
+
:param tx_tid: (int) TID of frame
:param high_priority: (bool) True if high-priority queue
"""
@@ -746,12 +641,18 @@
if frame.fields.get('transaction_id', 0) == tx_tid:
self._tx_request[index] = None
- if retry > 0:
- # Push on front of TX pending queue so that it transmits next with the
- # original TID
- self._queue_frame(d, frame, timeout, retry - 1, high_priority, front=True)
- else:
- d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
+ if timeout > 0:
+ self._rx_timeouts += 1
+
+ if retry > 0:
+ # Push on front of TX pending queue so that it transmits next with the
+ # original TID
+ self._queue_frame(d, frame, timeout, retry - 1, high_priority, front=True)
+
+ elif not d.called:
+ d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
+ else:
+ self.log.warn('timeout-but-not-the-tx-frame') # Statement mainly for debugging
self._send_next_request(high_priority)
@@ -774,6 +675,8 @@
elif self._max_lp_tx_queue < qlen:
self._max_lp_tx_queue = qlen
+ self.log.debug("queue-size", index=index, pending_qlen=qlen)
+
def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT, retry=0, high_priority=False):
"""
Queue the OMCI Frame for a transmit to the ONU via the proxy_channel
@@ -894,9 +797,8 @@
# save the current value of the entity_id_to_class_map, then
# replace it with our custom one before decode, and then finally
# restore it later. Tried other ways but really made the code messy.
- global entity_id_to_class_map
- saved_me_map = entity_id_to_class_map
- entity_id_to_class_map = self._me_map
+ saved_me_map = omci_entities.entity_id_to_class_map
+ omci_entities.entity_id_to_class_map = self._me_map
ts = arrow.utcnow().float_timestamp
try:
@@ -916,17 +818,24 @@
)
finally:
- global entity_id_to_class_map
- entity_id_to_class_map = saved_me_map
+ omci_entities.entity_id_to_class_map = saved_me_map
self._tx_frames += 1
- if timeout > 0:
- # Timeout on internal deferred to support internal retries if requested
- dc = self.reactor.callLater(timeout, self._request_timeout, tx_tid, high_priority)
+ # Note: the 'd' deferred in the queued request we just got will
+ # already have its success callback queued (callLater -> 0) with a
+ # result of "queued". Here we need time it out internally so
+ # we can call cleanup appropriately. G.988 mentions that most ONUs
+ # will process an request in < 1 second.
+ dc_timeout = timeout if timeout > 0 else 1.0
- # (timestamp, defer, frame, timeout, retry, delayedCall)
- self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
+ # Timeout on internal deferred to support internal retries if requested
+ dc = self.reactor.callLater(dc_timeout, self._request_timeout, tx_tid, high_priority)
+
+ # (timestamp, defer, frame, timeout, retry, delayedCall)
+ self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
+
+ if timeout > 0:
d.addCallbacks(self._request_success, self._request_failure,
callbackArgs=(high_priority,),
errbackArgs=(tx_tid, high_priority))
@@ -941,6 +850,8 @@
if d is not None:
d.errback(failure.Failure(e))
+ else:
+ self.log.debug("tx-request-occupied", index=index)
###################################################################################
# MIB Action shortcuts
diff --git a/pyvoltha/adapters/extensions/omci/omci_defs.py b/pyvoltha/adapters/extensions/omci/omci_defs.py
index 5e55e37..64fefc5 100644
--- a/pyvoltha/adapters/extensions/omci/omci_defs.py
+++ b/pyvoltha/adapters/extensions/omci/omci_defs.py
@@ -15,7 +15,6 @@
#
from enum import Enum, IntEnum
-
class OmciUninitializedFieldError(Exception):
pass
@@ -23,7 +22,6 @@
class OmciInvalidTypeError(Exception):
pass
-
def bitpos_from_mask(mask, lsb_pos=0, increment=1):
"""
Turn a decimal value (bitmask) into a list of indices where each
diff --git a/pyvoltha/adapters/extensions/omci/omci_entities.py b/pyvoltha/adapters/extensions/omci/omci_entities.py
index c2206de..48b00d7 100644
--- a/pyvoltha/adapters/extensions/omci/omci_entities.py
+++ b/pyvoltha/adapters/extensions/omci/omci_entities.py
@@ -256,7 +256,7 @@
optional=True),
ECA(ByteField("invoke_protection_switch", None), {AA.R, AA.W},
optional=True, range_check=lambda x: 0 <= x <= 3),
- ECA(ByteField("alarm_reporting_control", 0), {AA.R, AA.W},
+ ECA(ByteField("arc", 0), {AA.R, AA.W},
range_check=lambda x: 0 <= x <= 1, optional=True, avc=True),
ECA(ByteField("arc_interval", 0), {AA.R, AA.W}, optional=True),
]
@@ -294,7 +294,7 @@
optional=True), # not really mandatory, see spec ITU-T G.988, 9.1.6
ECA(ByteField("total_traffic_scheduler_number", None), {AA.R},
optional=True), # not really mandatory, see spec ITU-T G.988, 9.1.6
- ECA(IntField("power_shed_override", None), {AA.R, AA.W},
+ ECA(IntField("power_sched_override", None), {AA.R, AA.W},
optional=True)
]
mandatory_operations = {OP.Get, OP.Set, OP.Reboot}
@@ -338,23 +338,23 @@
range_check=lambda x: 0 <= x <= 254),
ECA(ByteField("sensed_type", 0), {AA.R}, optional=True, avc=True),
# TODO: For sensed_type AVC, see note in AT&T OMCI Specification, V3.0, page 123
- ECA(ByteField("auto_detection_configuration", 0), {AA.R, AA.W},
+ ECA(ByteField("autodetection_config", 0), {AA.R, AA.W},
range_check=lambda x: x in [0, 1, 2, 3, 4, 5,
0x10, 0x11, 0x12, 0x13, 0x14,
0x20, 0x30], optional=True), # See ITU-T G.988
- ECA(ByteField("ethernet_loopback_configuration", 0), {AA.R, AA.W},
+ ECA(ByteField("ethernet_loopback_config", 0), {AA.R, AA.W},
range_check=lambda x: x in [0, 3]),
ECA(ByteField("administrative_state", 1), {AA.R, AA.W},
range_check=lambda x: 0 <= x <= 1),
ECA(ByteField("operational_state", 1), {AA.R, AA.W},
range_check=lambda x: 0 <= x <= 1, optional=True, avc=True),
- ECA(ByteField("configuration_ind", 0), {AA.R},
+ ECA(ByteField("config_ind", 0), {AA.R},
range_check=lambda x: x in [0, 1, 2, 3, 4, 0x11, 0x12, 0x13]),
ECA(ShortField("max_frame_size", 1518), {AA.R, AA.W}, optional=True),
- ECA(ByteField("dte_or_dce_ind", 0), {AA.R, AA.W},
+ ECA(ByteField("dte_dce_ind", 0), {AA.R, AA.W},
range_check=lambda x: 0 <= x <= 2),
ECA(ShortField("pause_time", 0), {AA.R, AA.W}, optional=True),
- ECA(ByteField("bridged_or_ip_ind", 2), {AA.R, AA.W},
+ ECA(ByteField("bridged_ip_ind", 2), {AA.R, AA.W},
optional=True, range_check=lambda x: 0 <= x <= 2),
ECA(ByteField("arc", 0), {AA.R, AA.W}, optional=True,
range_check=lambda x: 0 <= x <= 1, avc=True),
@@ -511,7 +511,7 @@
ECA(StrFixedLenField("olt_vendor_id", None, 4), {AA.R, AA.W}),
ECA(StrFixedLenField("equipment_id", None, 20), {AA.R, AA.W}),
ECA(StrFixedLenField("version", None, 14), {AA.R, AA.W}),
- ECA(StrFixedLenField("time_of_day_information", None, 14), {AA.R, AA.W})
+ ECA(StrFixedLenField("time_of_day", None, 14), {AA.R, AA.W})
]
mandatory_operations = {OP.Get, OP.Set}
@@ -521,7 +521,7 @@
attributes = [
ECA(ShortField("managed_entity_id", None), {AA.R},
range_check=lambda x: x == 0),
- ECA(ShortField("restore_power_timer_reset_interval", 0),
+ ECA(ShortField("restore_power_time_reset_interval", 0),
{AA.R, AA.W}),
ECA(ShortField("data_class_shedding_interval", 0), {AA.R, AA.W}),
ECA(ShortField("voice_class_shedding_interval", 0), {AA.R, AA.W}),
@@ -684,9 +684,9 @@
ECA(StrFixedLenField("vendor_id", None, 4), {AA.R}),
ECA(StrFixedLenField("version", None, 14), {AA.R}),
ECA(OmciSerialNumberField("serial_number"), {AA.R}),
- ECA(ByteField("traffic_management_option", None), {AA.R},
+ ECA(ByteField("traffic_management_options", None), {AA.R},
range_check=lambda x: 0 <= x <= 2),
- ECA(ByteField("deprecated", 0), {AA.R},
+ ECA(ByteField("vp_vc_cross_connection_option", 0), {AA.R},
optional=True, deprecated=True),
ECA(ByteField("battery_backup", None), {AA.R, AA.W},
range_check=lambda x: 0 <= x <= 1),
@@ -742,7 +742,7 @@
range_check=lambda x: 0 <= x <= 1),
ECA(ShortField("total_priority_queue_number", None), {AA.R}),
ECA(ByteField("total_traffic_scheduler_number", None), {AA.R}),
- ECA(ByteField("deprecated", None), {AA.R}, deprecated=True),
+ ECA(ByteField("mode", None), {AA.R}, deprecated=True),
ECA(ShortField("total_gem_port_id_number", None), {AA.R}),
ECA(IntField("sys_uptime", None), {AA.R}),
ECA(BitField("connectivity_capability", None, size=16), {AA.R}),
@@ -762,7 +762,7 @@
attributes = [
ECA(ShortField("managed_entity_id", None), {AA.R}),
ECA(ShortField("alloc_id", None), {AA.R, AA.W}),
- ECA(ByteField("deprecated", 1), {AA.R}, deprecated=True),
+ ECA(ByteField("mode_indicator", 1), {AA.R}, deprecated=True),
ECA(ByteField("policy", None), {AA.R, AA.W},
range_check=lambda x: 0 <= x <= 2),
]
@@ -778,7 +778,8 @@
ECA(ShortField("gem_block_length", None), {AA.R, AA.W}),
ECA(ByteField("piggyback_dba_reporting", None), {AA.R},
range_check=lambda x: 0 <= x <= 4),
- ECA(ByteField("deprecated", None), {AA.R}, deprecated=True),
+ ECA(ByteField("whole_ont_dba_reporting", None), {AA.R},
+ deprecated=True),
ECA(ByteField("sf_threshold", 5), {AA.R, AA.W}),
ECA(ByteField("sd_threshold", 9), {AA.R, AA.W}),
ECA(ByteField("arc", 0), {AA.R, AA.W},
@@ -787,7 +788,7 @@
ECA(ShortField("optical_signal_level", None), {AA.R}),
ECA(ByteField("lower_optical_threshold", 0xFF), {AA.R, AA.W}),
ECA(ByteField("upper_optical_threshold", 0xFF), {AA.R, AA.W}),
- ECA(ShortField("onu_response_time", None), {AA.R}),
+ ECA(ShortField("ont_response_time", None), {AA.R}),
ECA(ShortField("transmit_optical_level", None), {AA.R}),
ECA(ByteField("lower_transmit_power_threshold", 0x81), {AA.R, AA.W}),
ECA(ByteField("upper_transmit_power_threshold", 0x81), {AA.R, AA.W}),
@@ -809,7 +810,8 @@
class_id = 264
attributes = [
ECA(ShortField("managed_entity_id", None), {AA.R}),
- ECA(ShortField("deprecated", None), {AA.R, AA.W}, deprecated=True),
+ ECA(ShortField("configuration_option_status", None), {AA.R, AA.W},
+ deprecated=True),
ECA(ByteField("administrative_state", None), {AA.R, AA.W}),
ECA(ByteField("management_capability", None), {AA.R},
range_check=lambda x: 0 <= x <= 2),
@@ -1160,6 +1162,64 @@
}
+class OmciMeTypeTable(Packet):
+ """
+ OMCI ME Supported Types Table
+ """
+ name = "OmciMeTypeTable"
+ fields_desc = [
+ ShortField("me_type", None)
+ ]
+
+ def to_json(self):
+ return json.dumps(self.fields, separators=(',', ':'))
+
+ @staticmethod
+ def json_from_value(value):
+ data = int(value)
+ temp = OmciMeTypeTable(me_type=data)
+ return json.dumps(temp.fields, separators=(',', ':'))
+
+ def index(self):
+ return '{:04}'.format(self.fields.get('me_type', 0))
+
+ def is_delete(self):
+ return self.fields.get('me_type', 0) == 0
+
+ def delete(self):
+ self.fields['me_type'] = 0
+ return self
+
+
+class OmciMsgTypeTable(Packet):
+ """
+ OMCI Supported Message Types Table
+ """
+ name = "OmciMsgTypeTable"
+ fields_desc = [
+ ByteField("msg_type", None)
+ ]
+
+ def to_json(self):
+ return json.dumps(self.fields, separators=(',', ':'))
+
+ @staticmethod
+ def json_from_value(value):
+ data = int(value)
+ temp = OmciMeTypeTable(me_type=data)
+ return json.dumps(temp.fields, separators=(',', ':'))
+
+ def index(self):
+ return '{:02}'.format(self.fields.get('msg_type', 0))
+
+ def is_delete(self):
+ return self.fields.get('me_type', 0) == 0
+
+ def delete(self):
+ self.fields['me_type'] = 0
+ return self
+
+
class Omci(EntityClass):
class_id = 287
hidden = True
@@ -1167,21 +1227,15 @@
ECA(ShortField("managed_entity_id", None), {AA.R},
range_check=lambda x: x == 0),
- # TODO: Can this be expressed better in SCAPY, probably not?
- # On the initial, Get request for either the me_type or message_type
- # attributes, you will receive a 4 octet value (big endian) that is
- # the number of octets to 'get-next' to fully load the desired
- # attribute. For a basic OMCI formatted message, that will be 29
- # octets per get-request.
- #
- # For the me_type_table, these are 16-bit values (ME Class IDs)
- #
- # For the message_type_table, these are 8-bit values (Actions)
+ ECA(OmciTableField(
+ PacketLenField("me_type_table", None,
+ OmciMeTypeTable, length_from=lambda pkt: 2)),
+ {AA.R}),
- ECA(FieldListField("me_type_table", None, ByteField('', 0),
- count_from=lambda _: 29), {AA.R}),
- ECA(FieldListField("message_type_table", None, ByteField('', 0),
- count_from=lambda _: 29), {AA.R}),
+ ECA(OmciTableField(
+ PacketLenField("message_type_table", None,
+ OmciMsgTypeTable, length_from=lambda pkt: 1)),
+ {AA.R}),
]
mandatory_operations = {OP.Get, OP.GetNext}
diff --git a/pyvoltha/adapters/extensions/omci/omci_fields.py b/pyvoltha/adapters/extensions/omci/omci_fields.py
index 6330d04..09cf465 100644
--- a/pyvoltha/adapters/extensions/omci/omci_fields.py
+++ b/pyvoltha/adapters/extensions/omci/omci_fields.py
@@ -19,7 +19,6 @@
StrFixedLenField, PacketField
from scapy.packet import Raw
-
class FixedLenField(PadField):
"""
This Pad field limits parsing of its content to its size
@@ -166,14 +165,17 @@
def __getattr__(self, attr):
return getattr(self._find_fld(), attr)
+
class OmciSerialNumberField(StrCompoundField):
def __init__(self, name, default=None):
assert default is None or (isinstance(default, str) and len(default) == 12), 'invalid default serial number'
vendor_default = default[0:4] if default is not None else None
vendor_serial_default = default[4:12] if default is not None else None
super(OmciSerialNumberField, self).__init__(name,
- [StrFixedLenField('vendor_id', vendor_default, 4),
- XStrFixedLenField('vendor_serial_number', vendor_serial_default, 4)])
+ [StrFixedLenField('vendor_id', vendor_default, 4),
+ XStrFixedLenField('vendor_serial_number',
+ vendor_serial_default, 4)])
+
class OmciTableField(MultipleTypeField):
def __init__(self, tblfld):
@@ -182,14 +184,14 @@
assert hasattr(tblfld.cls, 'is_delete'), 'No delete() method defined for OmciTableField row object'
super(OmciTableField, self).__init__(
[
- (IntField('table_length', 0), (self.cond_pkt, self.cond_pkt_val)),
- (PadField(StrField('me_type_table', None), OmciTableField.PDU_SIZE),
- (self.cond_pkt2, self.cond_pkt_val2))
+ (IntField('table_length', 0), (self.cond_pkt, self.cond_pkt_val)),
+ (PadField(StrField('me_type_table', None), OmciTableField.PDU_SIZE),
+ (self.cond_pkt2, self.cond_pkt_val2))
], tblfld)
- PDU_SIZE = 29 # Baseline message set raw get-next PDU size
- OmciGetResponseMessageId = 0x29 # Ugh circular dependency
- OmciGetNextResponseMessageId = 0x3a # Ugh circular dependency
+ PDU_SIZE = 29 # Baseline message set raw get-next PDU size
+ OmciGetResponseMessageId = 0x29 # Ugh circular dependency
+ OmciGetNextResponseMessageId = 0x3a # Ugh circular dependency
def cond_pkt(self, pkt):
return pkt is not None and pkt.message_id == self.OmciGetResponseMessageId
@@ -204,8 +206,10 @@
return pkt is not None and pkt.message_id == self.OmciGetNextResponseMessageId
def to_json(self, new_values, old_values_json):
- if not isinstance(new_values, list): new_values = [new_values] # If setting a scalar, augment the old table
- else: old_values_json = None # If setting a vector of new values, erase all old_values
+ if not isinstance(new_values, list):
+ new_values = [new_values] # If setting a scalar, augment the old table
+ else:
+ old_values_json = None # If setting a vector of new values, erase all old_values
key_value_pairs = dict()
@@ -213,6 +217,7 @@
for old in old_table:
index = old.index()
key_value_pairs[index] = old
+
for new in new_values:
index = new.index()
if new.is_delete():
@@ -230,14 +235,19 @@
return str_values
def load_json(self, json_str):
- if json_str is None: json_str = '[]'
+ if json_str is None:
+ json_str = '[]'
+
json_values = json.loads(json_str)
key_value_pairs = dict()
+
for json_value in json_values:
v = self.default.cls(**json_value)
index = v.index()
key_value_pairs[index] = v
+
table = []
for k, v in sorted(key_value_pairs.iteritems()):
table.append(v)
+
return table
\ No newline at end of file
diff --git a/pyvoltha/adapters/extensions/omci/omci_messages.py b/pyvoltha/adapters/extensions/omci/omci_messages.py
index a6bc566..018edf6 100644
--- a/pyvoltha/adapters/extensions/omci/omci_messages.py
+++ b/pyvoltha/adapters/extensions/omci/omci_messages.py
@@ -103,7 +103,7 @@
raise
if isinstance(pkt, OmciGetResponse) and isinstance(fld, OmciTableField):
data[fld.name + '_size'] = value
- table_attribute_mask = table_attribute_mask | (1 << (15 - index))
+ table_attribute_mask = table_attribute_mask | (1 << (16 - index))
else:
data[fld.name] = value
if table_attribute_mask:
@@ -199,7 +199,14 @@
ByteField("success_code", 0),
ShortField("attributes_mask", None),
ConditionalField(
- OmciMaskedData("data"), lambda pkt: pkt.success_code == 0)
+ ShortField("unsupported_attributes_mask", 0),
+ lambda pkt: pkt.success_code == 9),
+ ConditionalField(
+ ShortField("failed_attributes_mask", 0),
+ lambda pkt: pkt.success_code == 9),
+ ConditionalField(
+ OmciMaskedData("data"),
+ lambda pkt: pkt.success_code == 0 or pkt.success_code == 9)
]
diff --git a/pyvoltha/adapters/extensions/omci/onu_configuration.py b/pyvoltha/adapters/extensions/omci/onu_configuration.py
index 710bbca..092f669 100644
--- a/pyvoltha/adapters/extensions/omci/onu_configuration.py
+++ b/pyvoltha/adapters/extensions/omci/onu_configuration.py
@@ -170,7 +170,7 @@
if ontg is None or ATTRIBUTES_KEY not in ontg:
return None
- return ontg[ATTRIBUTES_KEY].get('traffic_management_option')
+ return ontg[ATTRIBUTES_KEY].get('traffic_management_options')
@property
def onu_survival_time(self):
@@ -186,7 +186,7 @@
if ontg is None or ATTRIBUTES_KEY not in ontg:
return None
- return ontg[ATTRIBUTES_KEY].get('onu_survival_time', 0)
+ return ontg[ATTRIBUTES_KEY].get('ont_survival_time', 0)
@property
def equipment_id(self):
@@ -471,15 +471,15 @@
'entity-id': inst,
'expected-type': inst_data[ATTRIBUTES_KEY].get('expected_type', 0),
'sensed-type': inst_data[ATTRIBUTES_KEY].get('sensed_type', 0),
- 'autodetection-config': inst_data[ATTRIBUTES_KEY].get('auto_detection_configuration', 0),
- 'ethernet-loopback-config': inst_data[ATTRIBUTES_KEY].get('ethernet_loopback_configuration', 0),
+ 'autodetection-config': inst_data[ATTRIBUTES_KEY].get('autodetection_config', 0),
+ 'ethernet-loopback-config': inst_data[ATTRIBUTES_KEY].get('ethernet_loopback_config', 0),
'administrative-state': inst_data[ATTRIBUTES_KEY].get('administrative_state', 0),
'operational-state': inst_data[ATTRIBUTES_KEY].get('operational_state', 0),
- 'config-ind': inst_data[ATTRIBUTES_KEY].get('configuration_ind', 0),
+ 'config-ind': inst_data[ATTRIBUTES_KEY].get('config_ind', 0),
'max-frame-size': inst_data[ATTRIBUTES_KEY].get('max_frame_size', 0),
- 'dte-dce-ind': inst_data[ATTRIBUTES_KEY].get('dte_or_dce_ind', 0),
+ 'dte-dce-ind': inst_data[ATTRIBUTES_KEY].get('dte_dce_ind', 0),
'pause-time': inst_data[ATTRIBUTES_KEY].get('pause_time', 0),
- 'bridged-ip-ind': inst_data[ATTRIBUTES_KEY].get('bridged_or_ip_ind', 0),
+ 'bridged-ip-ind': inst_data[ATTRIBUTES_KEY].get('bridged_ip_ind', 0),
'arc': inst_data[ATTRIBUTES_KEY].get('arc', 0),
'arc-interval': inst_data[ATTRIBUTES_KEY].get('arc_interval', 0),
'pppoe-filter': inst_data[ATTRIBUTES_KEY].get('ppoe_filter', 0),
diff --git a/pyvoltha/adapters/extensions/omci/state_machines/mib_sync.py b/pyvoltha/adapters/extensions/omci/state_machines/mib_sync.py
index 0445324..3f18aa4 100644
--- a/pyvoltha/adapters/extensions/omci/state_machines/mib_sync.py
+++ b/pyvoltha/adapters/extensions/omci/state_machines/mib_sync.py
@@ -25,7 +25,7 @@
RX_RESPONSE_KEY
from pyvoltha.adapters.extensions.omci.onu_device_entry import OnuDeviceEvents, OnuDeviceEntry, \
SUPPORTED_MESSAGE_ENTITY_KEY, SUPPORTED_MESSAGE_TYPES_KEY
-from pyvoltha.adapters.extensions.omci.omci_entities import OntData
+from pyvoltha.adapters.extensions.omci.omci_entities import OntData, Omci
from pyvoltha.common.event_bus import EventBusClient
from voltha_protos.omci_mib_db_pb2 import OpenOmciEventType
@@ -667,11 +667,11 @@
class_id = omci_msg['object_entity_class']
entity_id = omci_msg['object_entity_id']
- # Filter out the 'mib_data_sync' from the database. We save that at
- # the device level and do not want it showing up during a re-sync
- # during data compares
+ # Filter out the 'mib_data_sync' and 'omci' from the database. We save
+ # that at the device level and do not want it showing up during a
+ # re-sync during data compares
- if class_id == OntData.class_id:
+ if class_id in {OntData.class_id, Omci.class_id}:
return
attributes = {k: v for k, v in omci_msg['object_data'].items()}
@@ -855,11 +855,6 @@
pass # NOP
except Exception as e:
self.log.exception('set', e=e)
-
- # TODO: Future -> Monitor Software download start, section, activate, and commit responses
- # and increment MIB Data Sync per Table 11.2.2-1 of ITUT-T G.988 (11/2017)
- # on page 515. Eventually also monitor set-table responses once the
- # extended message set is supported.
def on_capabilities_event(self, _topic, msg):
"""
Process a OMCI capabilties event
diff --git a/pyvoltha/adapters/extensions/omci/state_machines/performance_intervals.py b/pyvoltha/adapters/extensions/omci/state_machines/performance_intervals.py
index cfdbafa..70be6d8 100644
--- a/pyvoltha/adapters/extensions/omci/state_machines/performance_intervals.py
+++ b/pyvoltha/adapters/extensions/omci/state_machines/performance_intervals.py
@@ -64,15 +64,12 @@
{'trigger': 'add_me', 'source': 'idle', 'dest': 'create_pm_me'},
{'trigger': 'delete_me', 'source': 'idle', 'dest': 'delete_pm_me'},
- # TODO: Can these be combined into one?
{'trigger': 'success', 'source': 'create_pm_me', 'dest': 'idle'},
{'trigger': 'failure', 'source': 'create_pm_me', 'dest': 'idle'},
- # TODO: Can these be combined into one?
{'trigger': 'success', 'source': 'delete_pm_me', 'dest': 'idle'},
{'trigger': 'failure', 'source': 'delete_pm_me', 'dest': 'idle'},
- # TODO: Can these be combined into one?
{'trigger': 'success', 'source': 'collect_data', 'dest': 'idle'},
{'trigger': 'failure', 'source': 'collect_data', 'dest': 'idle'},
diff --git a/pyvoltha/adapters/extensions/omci/tasks/omci_get_request.py b/pyvoltha/adapters/extensions/omci/tasks/omci_get_request.py
index e163f23..70c01e9 100644
--- a/pyvoltha/adapters/extensions/omci/tasks/omci_get_request.py
+++ b/pyvoltha/adapters/extensions/omci/tasks/omci_get_request.py
@@ -18,7 +18,7 @@
from twisted.internet.defer import failure, inlineCallbacks, TimeoutError, returnValue
from pyvoltha.adapters.extensions.omci.omci_defs import ReasonCodes, EntityOperations
from pyvoltha.adapters.extensions.omci.omci_me import MEFrame
-from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame
+from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame, OmciGetNext
from pyvoltha.adapters.extensions.omci.omci_cc import DEFAULT_OMCI_TIMEOUT
from pyvoltha.adapters.extensions.omci.omci_messages import OmciGet
from pyvoltha.adapters.extensions.omci.omci_fields import OmciTableField
@@ -47,9 +47,10 @@
"""
task_priority = 128
name = "ONU OMCI Get Task"
+ MAX_TABLE_SIZE = 16 * 1024 # Keep get-next logic reasonable
def __init__(self, omci_agent, device_id, entity_class, entity_id, attributes,
- exclusive=False, allow_failure=False):
+ exclusive=True, allow_failure=False):
"""
Class initialization
@@ -59,7 +60,7 @@
:param entity_id: (int) ME Class instance ID to retrieve
:param attributes: (list or set) Name of attributes to retrieve
:param exclusive: (bool) True if this GET request Task exclusively own the
- OMCI-CC while running. Default: False
+ OMCI-CC while running. Default: True
:param allow_failure: (bool) If true, attempt to get all valid attributes
if the original request receives an error
code of 9 (Attributes failed or unknown).
@@ -145,6 +146,11 @@
"""
return self._failed_or_unknown_attributes
+ def is_table_attr(self, attr):
+ index = self._entity_class.attribute_name_to_index_map[attr]
+ attr_def = self._entity_class.attributes[index]
+ return isinstance(attr_def.field, OmciTableField)
+
@inlineCallbacks
def perform_get_omci(self):
"""
@@ -154,61 +160,62 @@
entity_id=self._entity_id, attributes=self._attributes)
try:
# If one or more attributes is a table attribute, get it separately
- def is_table_attr(attr):
- index = self._entity_class.attribute_name_to_index_map[attr]
- attr_def = self._entity_class.attributes[index]
- return isinstance(attr_def.field, OmciTableField)
- first_attributes = {attr for attr in self._attributes if not is_table_attr(attr)}
- table_attributes = {attr for attr in self._attributes if is_table_attr(attr)}
+ first_attributes = {attr for attr in self._attributes if not self.is_table_attr(attr)}
+ table_attributes = {attr for attr in self._attributes if self.is_table_attr(attr)}
- frame = MEFrame(self._entity_class, self._entity_id, first_attributes).get()
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
+ if len(first_attributes):
+ frame = MEFrame(self._entity_class, self._entity_id, first_attributes).get()
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
- status = results.fields['omci_message'].fields['success_code']
- self.log.debug('perform-get-status', status=status)
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.debug('perform-get-status', status=status)
- # Success?
- if status == RC.Success.value:
- self._results = results
- results_omci = results.fields['omci_message'].fields
+ if status == RC.AttributeFailure.value:
+ # What failed? Note if only one attribute was attempted, then
+ # that is an overall failure
- # Were all attributes fetched?
- missing_attr = frame.fields['omci_message'].fields['attributes_mask'] ^ \
- results_omci['attributes_mask']
+ if not self._allow_failure or len(self._attributes) <= 1:
+ raise GetException('Get failed with status code: {}'.
+ format(RC.AttributeFailure.value))
- if missing_attr > 0 or len(table_attributes) > 0:
- self.log.info('perform-get-missing', num_missing=missing_attr,
- table_attr=table_attributes)
self.strobe_watchdog()
- self._local_deferred = reactor.callLater(0,
- self.perform_get_missing_attributes,
- missing_attr,
- table_attributes)
- returnValue(self._local_deferred)
+ # TODO: update failed & unknown attributes set
- elif status == RC.AttributeFailure.value:
- # What failed? Note if only one attribute was attempted, then
- # that is an overall failure
+ # Success?
+ if status in {RC.Success.value, RC.AttributeFailure.value}:
+ self._results = results
+ results_omci = results.fields['omci_message'].fields
- if not self._allow_failure or len(self._attributes) <= 1:
- raise GetException('Get failed with status code: {}'.
- format(RC.AttributeFailure.value))
+ # Were all attributes fetched?
+ missing_attr = frame.fields['omci_message'].fields['attributes_mask'] ^ \
+ results_omci['attributes_mask']
+ if missing_attr > 0 or len(table_attributes) > 0:
+ self.log.info('perform-get-missing', num_missing=missing_attr,
+ table_attr=table_attributes)
+ self.strobe_watchdog()
+ self._local_deferred = reactor.callLater(0,
+ self.perform_get_missing_attributes,
+ missing_attr, table_attributes)
+ returnValue(self._local_deferred)
+
+ else:
+ raise GetException('Get failed with status code: {}'.format(status))
+
+ self.log.debug('get-completed')
+ self.deferred.callback(self)
+
+ elif len(table_attributes) > 0:
+ # Here if only table attributes were requested
+ self.log.info('perform-get-table', table_attr=table_attributes)
self.strobe_watchdog()
self._local_deferred = reactor.callLater(0,
- self.perform_get_failed_attributes,
- results,
- self._attributes)
+ self.process_get_table,
+ table_attributes)
returnValue(self._local_deferred)
- else:
- raise GetException('Get failed with status code: {}'.format(status))
-
- self.log.debug('get-completed')
- self.deferred.callback(self)
-
except TimeoutError as e:
self.deferred.errback(failure.Failure(e))
@@ -222,11 +229,14 @@
"""
This method is called when the original Get requests completes with success
but not all attributes were returned. This can happen if one or more of the
- attributes would have exceeded the space available in the OMCI frame.
+ attributes would have exceeded the space available in the OMCI frame or if
+ one of the attributes is a table.
This routine iterates through the missing attributes and attempts to retrieve
the ones that were missing.
+ Once missing attributes are recovered, the table attributes are requested
+
:param missing_attr: (int) Missing attributes bitmask
:param table_attributes: (set) Attributes that need table get/get-next support
"""
@@ -235,28 +245,35 @@
# Retrieve missing attributes first (if any)
results_omci = self._results.fields['omci_message'].fields
- for index in xrange(16):
- attr_mask = 1 << index
+ try:
+ # Get remaining attributes one at a time
+ for index in range(15, 1, -1):
+ attr_mask = 1 << index
- if attr_mask & missing_attr:
- # Get this attribute
- frame = OmciFrame(
- transaction_id=None, # OMCI-CC will set
- message_type=OmciGet.message_id,
- omci_message=OmciGet(
- entity_class=self._entity_class.class_id,
- entity_id=self._entity_id,
- attributes_mask=attr_mask
+ if attr_mask & missing_attr:
+ # Get this attribute
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask
+ )
)
- )
- try:
self.strobe_watchdog()
get_results = yield self._device.omci_cc.send(frame)
get_omci = get_results.fields['omci_message'].fields
- if get_omci['success_code'] != RC.Success.value:
+ status = get_omci['success_code']
+
+ if status == RC.AttributeFailure.value:
+ # TODO: update failed & unknown attributes set
continue
+ elif status != RC.Success.value:
+ raise GetException('Get failed with status code: {}'.format(status))
+
assert attr_mask == get_omci['attributes_mask'], 'wrong attribute'
results_omci['attributes_mask'] |= attr_mask
@@ -265,92 +282,127 @@
results_omci['data'].update(get_omci['data'])
- except TimeoutError:
- self.log.debug('missing-timeout')
+ except TimeoutError as e:
+ self.log.debug('missing-timeout')
+ self.deferred.errback(failure.Failure(e))
- except Exception as e:
- self.log.exception('missing-failure', e=e)
+ except Exception as e:
+ self.log.exception('missing-failure', class_id=self._entity_class.class_id,
+ entity_id=self._entity_id, e=e)
+ self.deferred.errback(failure.Failure(e))
- # Now any table attributes. OMCI_CC handles background get/get-next sequencing
- for tbl_attr in table_attributes:
- attr_mask = self._entity_class.mask_for(tbl_attr)
- frame = OmciFrame(
- transaction_id=None, # OMCI-CC will set
- message_type=OmciGet.message_id,
- omci_message=OmciGet(
- entity_class=self._entity_class.class_id,
- entity_id=self._entity_id,
- attributes_mask=attr_mask
- )
- )
- try:
- timeout = 2 * DEFAULT_OMCI_TIMEOUT # Multiple frames expected
- self.strobe_watchdog()
- get_results = yield self._device.omci_cc.send(frame,
- timeout=timeout)
- self.strobe_watchdog()
- get_omci = get_results.fields['omci_message'].fields
- if get_omci['success_code'] != RC.Success.value:
- continue
-
- if results_omci.get('data') is None:
- results_omci['data'] = dict()
-
- results_omci['data'].update(get_omci['data'])
-
- except TimeoutError:
- self.log.debug('tbl-attr-timeout')
-
- except Exception as e:
- self.log.exception('tbl-attr-timeout', e=e)
+ # Now any table attributes
+ if len(table_attributes):
+ self.strobe_watchdog()
+ self._local_deferred = reactor.callLater(0,
+ self.process_get_table,
+ table_attributes)
+ returnValue(self._local_deferred)
self.deferred.callback(self)
@inlineCallbacks
- def perform_get_failed_attributes(self, tmp_results, attributes):
+ def process_get_table(self, table_attributes):
"""
-
- :param tmp_results:
- :param attributes:
- :return:
+ Special handling for Get Requests that may require additional 'get_next' operations
+ if a table attribute was requested.
"""
- self.log.debug('perform-get-failed', attrs=attributes)
+ # Retrieve attributes retrieved so far so we can add to them
+ try:
+ results_omci = self._results.fields['omci_message'].fields if self._results is not None else {}
- for attr in attributes:
- try:
- frame = MEFrame(self._entity_class, self._entity_id, {attr}).get()
+ for tbl_attr in table_attributes:
+ attr_mask = self._entity_class.mask_for(tbl_attr)
+ attr_index = self._entity_class.attribute_indices_from_mask(attr_mask)[0]
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask
+ )
+ )
+ # First get will retrieve the size
+ get_results = yield self._device.omci_cc.send(frame)
self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
- status = results.fields['omci_message'].fields['success_code']
+ if self._results is None:
+ self._results = get_results
+ results_omci = self._results.fields['omci_message'].fields
- if status == RC.AttributeFailure.value:
- self.log.debug('unknown-or-invalid-attribute', attr=attr, status=status)
- self._failed_or_unknown_attributes.add(attr)
+ omci_fields = get_results.fields['omci_message'].fields
+ if omci_fields['success_code'] == RC.AttributeFailure.value:
+ # Copy over any failed or unsupported attribute masks for final result
+ results_fields = results_omci.fields['omci_message'].fields
+ results_fields['unsupported_attributes_mask'] |= omci_fields['unsupported_attributes_mask']
+ results_fields['failed_attributes_mask'] |= omci_fields['failed_attributes_mask']
- elif status != RC.Success.value:
- self.log.warn('invalid-get', class_id=self._entity_class,
- attribute=attr, status=status)
- self._failed_or_unknown_attributes.add(attr)
+ if omci_fields['success_code'] != RC.Success.value:
+ raise GetException('Get table attribute failed with status code: {}'.
+ format(omci_fields['success_code']))
- else:
- # Add to partial results and correct the status
- tmp_results.fields['omci_message'].fields['success_code'] = status
- tmp_results.fields['omci_message'].fields['attributes_mask'] |= \
- results.fields['omci_message'].fields['attributes_mask']
+ eca = self._entity_class.attributes[attr_index]
- if tmp_results.fields['omci_message'].fields.get('data') is None:
- tmp_results.fields['omci_message'].fields['data'] = dict()
+ self.log.debug('omcc-get-table-attribute', table_name=eca.field.name)
+ attr_size = omci_fields['data'][eca.field.name + '_size']
- tmp_results.fields['omci_message'].fields['data'][attr] = \
- results.fields['omci_message'].fields['data'][attr]
+ if attr_size > self.MAX_TABLE_SIZE:
+ self.log.error('omcc-get-table-huge', count=attr_size, name=eca.field.name)
+ raise ValueError('Huge Table Size: {}'.format(attr_size))
- except TimeoutError as e:
- self.log.debug('attr-timeout')
+ # Start the loop
+ seq_no = 0
+ data_buffer = ''
- except Exception as e:
- self.log.exception('attr-failure', e=e)
+ for offset in xrange(0, attr_size, OmciTableField.PDU_SIZE):
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGetNext.message_id,
+ omci_message=OmciGetNext(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask,
+ command_sequence_number=seq_no
+ )
+ )
+ get_results = yield self._device.omci_cc.send(frame)
- self._results = tmp_results
- self.deferred.callback(self)
+ omci_fields = get_results.fields['omci_message'].fields
+ status = omci_fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise Exception('get-next-failure table=' + eca.field.name +
+ ' entity_id=' + str(self._entity_id) +
+ ' sqn=' + str(seq_no) + ' omci-status ' + str(status))
+
+ # Extract the data
+ num_octets = attr_size - offset
+ if num_octets > OmciTableField.PDU_SIZE:
+ num_octets = OmciTableField.PDU_SIZE
+
+ data = omci_fields['data'][eca.field.name]
+ data_buffer += data[:num_octets]
+ seq_no += 1
+
+ vals = []
+ while data_buffer:
+ data_buffer, val = eca.field.getfield(None, data_buffer)
+ vals.append(val)
+
+ # Save off the retrieved data
+ results_omci['attributes_mask'] |= attr_mask
+ results_omci['data'][eca.field.name] = vals
+
+ self.deferred.callback(self)
+
+ except TimeoutError as e:
+ self.log.debug('tbl-attr-timeout')
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('tbl-attr-timeout', class_id=self._entity_class.class_id,
+ entity_id=self._entity_id, e=e)
+ self.deferred.errback(failure.Failure(e))
+
diff --git a/pyvoltha/adapters/extensions/omci/tasks/onu_capabilities_task.py b/pyvoltha/adapters/extensions/omci/tasks/onu_capabilities_task.py
index e856ddc..9b7c5ab 100644
--- a/pyvoltha/adapters/extensions/omci/tasks/onu_capabilities_task.py
+++ b/pyvoltha/adapters/extensions/omci/tasks/onu_capabilities_task.py
@@ -18,8 +18,9 @@
from twisted.internet.defer import inlineCallbacks, failure, returnValue
from twisted.internet import reactor
from pyvoltha.adapters.extensions.omci.omci_defs import ReasonCodes
-from pyvoltha.adapters.extensions.omci.omci_me import OmciFrame
+from pyvoltha.adapters.extensions.omci.omci_me import OmciFrame, Omci
from pyvoltha.adapters.extensions.omci.omci import EntityOperations
+from pyvoltha.adapters.extensions.omci.tasks.omci_get_request import OmciGetRequest
class GetNextException(Exception):
@@ -66,7 +67,9 @@
super(OnuCapabilitiesTask, self).__init__(OnuCapabilitiesTask.name,
omci_agent,
device_id,
- priority=OnuCapabilitiesTask.task_priority)
+ exclusive=False,
+ priority=OnuCapabilitiesTask.task_priority,
+ watchdog_timeout=2*Task.DEFAULT_WATCHDOG_SECS)
self._local_deferred = None
self._device = omci_agent.get_device(device_id)
self._pdu_size = omci_pdu_size
@@ -139,9 +142,9 @@
try:
self.strobe_watchdog()
self._supported_entities = yield self.get_supported_entities()
-
self.strobe_watchdog()
self._supported_msg_types = yield self.get_supported_message_types()
+ self.strobe_watchdog()
self.log.debug('get-success',
supported_entities=self.supported_managed_entities,
@@ -166,57 +169,24 @@
@inlineCallbacks
def get_supported_entities(self):
"""
- Get the supported ME Types for this ONU.
+ Get the supported Message Types (actions) for this ONU.
"""
try:
- # Get the number of requests needed
- frame = OmciFrame(me_type_table=True).get()
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
+ # Use the GetRequest Task to perform table retrieval
+ get_request = OmciGetRequest(self.omci_agent, self.device_id, Omci, 0,
+ ["me_type_table"], exclusive=False)
- omci_msg = results.fields['omci_message']
- status = omci_msg.fields['success_code']
+ results = yield self._device.task_runner.queue_task(get_request)
- if status != ReasonCodes.Success.value:
- raise GetCapabilitiesFailure('Get count of supported entities failed with status code: {}'.
- format(status))
- data = omci_msg.fields['data']['me_type_table']
- count = self.get_count_from_data_buffer(bytearray(data))
+ if results.success_code != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure('Get supported managed entities table failed with status code: {}'.
+ format(results.success_code))
- seq_no = 0
- data_buffer = bytearray(0)
- self.log.debug('me-type-count', octets=count, data=hexlify(data))
-
- # Start the loop
- for offset in xrange(0, count, self._pdu_size):
- frame = OmciFrame(me_type_table=seq_no).get_next()
- seq_no += 1
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
-
- omci_msg = results.fields['omci_message']
- status = omci_msg.fields['success_code']
-
- if status != ReasonCodes.Success.value:
- raise GetCapabilitiesFailure(
- 'Get supported entities request at offset {} of {} failed with status code: {}'.
- format(offset + 1, count, status))
-
- # Extract the data
- num_octets = count - offset
- if num_octets > self._pdu_size:
- num_octets = self._pdu_size
-
- data = omci_msg.fields['data']['me_type_table']
- data_buffer += bytearray(data[:num_octets])
-
- me_types = {(data_buffer[x] << 8) + data_buffer[x + 1]
- for x in xrange(0, len(data_buffer), 2)}
- returnValue(me_types)
+ returnValue({attr.fields['me_type'] for attr in results.attributes['me_type_table']})
except Exception as e:
self.log.exception('get-entities', e=e)
- self.deferred.errback(failure.Failure(e))
+ raise
@inlineCallbacks
def get_supported_message_types(self):
@@ -224,59 +194,18 @@
Get the supported Message Types (actions) for this ONU.
"""
try:
- # Get the number of requests needed
- frame = OmciFrame(message_type_table=True).get()
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
+ # Use the GetRequest Task to perform table retrieval
+ get_request = OmciGetRequest(self.omci_agent, self.device_id, Omci, 0,
+ ["message_type_table"], exclusive=False)
- omci_msg = results.fields['omci_message']
- status = omci_msg.fields['success_code']
+ results = yield self._device.task_runner.queue_task(get_request)
- if status != ReasonCodes.Success.value:
- raise GetCapabilitiesFailure('Get count of supported msg types failed with status code: {}'.
- format(status))
+ if results.success_code != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure('Get supported msg types table failed with status code: {}'.
+ format(results.success_code))
- data = omci_msg.fields['data']['message_type_table']
- count = self.get_count_from_data_buffer(bytearray(data))
-
- seq_no = 0
- data_buffer = list()
- self.log.debug('me-type-count', octets=count, data=hexlify(data))
-
- # Start the loop
- for offset in xrange(0, count, self._pdu_size):
- frame = OmciFrame(message_type_table=seq_no).get_next()
- seq_no += 1
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
-
- omci_msg = results.fields['omci_message']
- status = omci_msg.fields['success_code']
-
- if status != ReasonCodes.Success.value:
- raise GetCapabilitiesFailure(
- 'Get supported msg types request at offset {} of {} failed with status code: {}'.
- format(offset + 1, count, status))
-
- # Extract the data
- num_octets = count - offset
- if num_octets > self._pdu_size:
- num_octets = self._pdu_size
-
- data = omci_msg.fields['data']['message_type_table']
- data_buffer += data[:num_octets]
-
- def buffer_to_message_type(value):
- """
- Convert an integer value to the appropriate EntityOperations enumeration
- :param value: (int) Message type value (4..29)
- :return: (EntityOperations) Enumeration, None on failure
- """
- next((v for k, v in EntityOperations.__members__.items() if v.value == value), None)
-
- msg_types = {buffer_to_message_type(v) for v in data_buffer if v is not None}
- returnValue({msg_type for msg_type in msg_types if msg_type is not None})
+ returnValue({attr.fields['msg_type'] for attr in results.attributes['message_type_table']})
except Exception as e:
self.log.exception('get-msg-types', e=e)
- self.deferred.errback(failure.Failure(e))
+ raise