VOL-1336: GetRequest task handles table attribute retrival instead of OMCI_CC
Change-Id: Ie948b0603b49a2faed56039f9861a42cc409ec1f
diff --git a/tests/utests/voltha/extensions/omci/test_omci_cc.py b/tests/utests/voltha/extensions/omci/test_omci_cc.py
index cf8975c..ea9d926 100644
--- a/tests/utests/voltha/extensions/omci/test_omci_cc.py
+++ b/tests/utests/voltha/extensions/omci/test_omci_cc.py
@@ -625,7 +625,7 @@
# Note: After successful frame decode, a lookup of the corresponding request by
# TID is performed. None should be found, so we should see the Rx Unknown TID
# increment.
- self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'])
+ self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'] + 1)
self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'] + 1)
self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'] + 1)
self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'])
@@ -668,27 +668,28 @@
self.assertIsNotNone(decoded_blob)
self.assertEqual(decoded_blob, blob)
- def test_flush(self):
- # Test flush of autonomous ONU queues
+ def test_rx_unknown_me_avc(self):
+ # ME without a known decoder but is and attribute value change
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
- # TODO: add more
- self.assertTrue(True) # TODO: Implement
+ msg = '0000110aff78000080000e000000' \
+ '00000000000000000000000000000000000000000000000000000' \
+ '00000028'
- def test_avc_rx(self):
- # Test flush of autonomous ONU queues
- self.setup_one_of_each()
+ omci_cc.receive_message(hex2raw(msg))
- omci_cc = self.onu_handler.omci_cc
- omci_cc.enabled = True
- snapshot = self._snapshot_stats()
-
- # TODO: add more
- self.assertTrue(True) # TODO: Implement
+ # Blob decode should work and then it should be passed off to the
+ # ONU Autonomous frame processor
+ self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'])
+ self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'] + 1)
+ self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
+ self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'] + 1)
+ self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards'])
+ self.assertEqual(omci_cc.consecutive_errors, 0)
def test_rx_discard_if_disabled(self):
# ME without a known decoder
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index 7d4d304..29711ef 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -39,7 +39,6 @@
DEFAULT_OMCI_TIMEOUT = 3 # Seconds
MAX_OMCI_REQUEST_AGE = 60 # Seconds
DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE = 31 # Bytes
-MAX_TABLE_ROW_COUNT = 512 # Keep get-next logic reasonable
CONNECTED_KEY = 'connected'
TX_REQUEST_KEY = 'tx-request'
@@ -118,7 +117,6 @@
# Support 2 levels of priority since only baseline message set supported
self._tx_tid = [OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY, OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY]
self._tx_request = [None, None] # Tx in progress (timestamp, defer, frame, timeout, retry, delayedCall)
- self._tx_request_deferred = [None, None] # Tx in progress but held till child Rx/TX can finish. ie omci tables.
self._pending = [list(), list()] # pending queue (deferred, tx_frame, timeout, retry)
self._rx_response = [None, None]
@@ -281,8 +279,7 @@
def _receive_onu_message(self, rx_frame):
""" Autonomously generated ONU frame Rx handler"""
- self.log.debug('rx-onu-frame', frame_type=type(rx_frame),
- frame=hexify(str(rx_frame)))
+ self.log.debug('rx-onu-frame', frame_type=type(rx_frame))
msg_type = rx_frame.fields['message_type']
self._rx_onu_frames += 1
@@ -339,25 +336,12 @@
try:
rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
- rx_tid = rx_frame.fields['transaction_id']
-
- if rx_tid == 0:
- return self._receive_onu_message(rx_frame)
-
- # Previously unreachable if this is the very first Rx or we
- # have been running consecutive errors
- if self._rx_frames == 0 or self._consecutive_errors != 0:
- self.reactor.callLater(0, self._publish_connectivity_event, True)
-
- self._rx_frames += 1
- self._consecutive_errors = 0
except KeyError as e:
# Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
self.log.debug('frame-decode-key-error', msg=hexlify(msg), e=e)
rx_frame = self._decode_unknown_me(msg)
self._rx_unknown_me += 1
- rx_tid = rx_frame.fields.get('transaction_id')
except Exception as e:
self.log.exception('frame-decode', msg=hexlify(msg), e=e)
@@ -366,6 +350,18 @@
finally:
omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
+ rx_tid = rx_frame.fields['transaction_id']
+ if rx_tid == 0:
+ return self._receive_onu_message(rx_frame)
+
+ # Previously unreachable if this is the very first round-trip Rx or we
+ # have been running consecutive errors
+ if self._rx_frames == 0 or self._consecutive_errors != 0:
+ self.reactor.callLater(0, self._publish_connectivity_event, True)
+
+ self._rx_frames += 1
+ self._consecutive_errors = 0
+
try:
high_priority = self._tid_is_high_priority(rx_tid)
index = self._get_priority_index(high_priority)
@@ -377,15 +373,14 @@
last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
# Possible late Rx on a message that timed-out
self._rx_unknown_tid += 1
- #self.log.warn('tx-message-missing', rx_id=rx_tid, msg=hexlify(msg))
- #return
+ self._rx_late += 1
+ return
ts, d, tx_frame, timeout, retry, dc = last_tx_tuple
if dc is not None and not dc.cancelled and not dc.called:
dc.cancel()
- # self.log.debug("cancel-timeout-called")
- secs = self._update_rx_tx_stats(now, ts)
+ _secs = self._update_rx_tx_stats(now, ts)
# Late arrival already serviced by a timeout?
if d.called:
@@ -398,138 +393,16 @@
return d.errback(failure.Failure(e))
return
- # Extended processing needed. Note 'data' field will be None on some error
- # status returns
- omci_msg = rx_frame.fields['omci_message']
+ # Publish Rx event to listeners in a different task
+ reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
- if isinstance(omci_msg, OmciGetResponse) and \
- omci_msg.fields.get('data') is not None and \
- 'table_attribute_mask' in omci_msg.fields['data']:
- # Yes, run in a separate generator
- reactor.callLater(0, self._process_get_rx_frame, timeout, secs,
- rx_frame, d, tx_frame, high_priority)
- else:
- # Publish Rx event to listeners in a different task
- reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
-
- # begin success callback chain (will cancel timeout and queue next Tx message)
- from copy import copy
- original_callbacks = copy(d.callbacks)
- self._rx_response[index] = rx_frame
- d.callback(rx_frame)
+ # begin success callback chain (will cancel timeout and queue next Tx message)
+ self._rx_response[index] = rx_frame
+ d.callback(rx_frame)
except Exception as e:
self.log.exception('rx-msg', e=e)
- @inlineCallbacks
- def _process_get_rx_frame(self, timeout, secs, rx_frame, d, tx_frame, high_priority):
- """
- Special handling for Get Requests that may require additional 'get_next' operations
- if a table attribute was requested.
- """
- omci_msg = rx_frame.fields['omci_message']
- rx_tid = rx_frame.fields.get('transaction_id')
- high_priority = self._tid_is_high_priority(rx_tid)
- frame_index = self._get_priority_index(high_priority)
-
- if isinstance(omci_msg, OmciGetResponse) and 'table_attribute_mask' in omci_msg.fields['data']:
-
- # save tx request for later so that below send/recv can finish
- self._tx_request_deferred[frame_index] = self._tx_request[frame_index]
- self._tx_request[frame_index] = None
-
- try:
- entity_class = omci_msg.fields['entity_class']
- entity_id = omci_msg.fields['entity_id']
- table_attributes = omci_msg.fields['data']['table_attribute_mask']
-
- # Table attribute mask is encoded opposite of managed entity mask.
- if entity_class in self._me_map:
- ec = self._me_map[entity_class]
- for index in xrange(1, len(ec.attributes) + 1):
- attr_mask = 1 << index
-
- if attr_mask & table_attributes:
- self.log.debug('omcc-get-table-ec', ec=ec, index=index, attr_mask=attr_mask,
- table_attributes=table_attributes)
- eca = ec.attributes[index]
- self.log.debug('omcc-get-table-attribute', table_name=eca.field.name)
-
- seq_no = 0
- data_buffer = ''
- count = omci_msg.fields['data'][eca.field.name + '_size']
-
- if count > MAX_TABLE_ROW_COUNT:
- self.log.error('omcc-get-table-huge', count=count, name=eca.field.name)
- raise ValueError('Huge Table Size: {}'.format(count))
-
- # Original timeout must be chopped up into each individual get-next request
- # in order for total transaction to complete within the timeframe of the
- # original get() timeout.
- number_transactions = 1 + (count + OmciTableField.PDU_SIZE - 1) / OmciTableField.PDU_SIZE
- timeout /= (1 + number_transactions)
-
- # Start the loop
- vals = []
- for offset in xrange(0, count, OmciTableField.PDU_SIZE):
- frame = MEFrame(ec, entity_id, {eca.field.name: seq_no}).get_next()
- seq_no += 1
-
- max_retries = 3
- results = yield self.send(frame, min(timeout / max_retries, secs * 3), max_retries)
-
- omci_getnext_msg = results.fields['omci_message']
- status = omci_getnext_msg.fields['success_code']
-
- if status != ReasonCodes.Success.value:
- raise Exception('get-next-failure table=' + eca.field.name +
- ' entity_id=' + str(entity_id) +
- ' sqn=' + str(seq_no) + ' omci-status ' + str(status))
-
- # Extract the data
- num_octets = count - offset
- if num_octets > OmciTableField.PDU_SIZE:
- num_octets = OmciTableField.PDU_SIZE
-
- data = omci_getnext_msg.fields['data'][eca.field.name]
- data_buffer += data[:num_octets]
-
- while data_buffer:
- data_buffer, val = eca.field.getfield(None, data_buffer)
- vals.append(val)
-
- omci_msg.fields['data'][eca.field.name] = vals
- del omci_msg.fields['data'][eca.field.name + '_size']
- self.log.debug('omcc-got-table-attribute-rows', table_name=eca.field.name,
- row_count=len(vals))
- del omci_msg.fields['data']['table_attribute_mask']
-
- except Exception as e:
- self.log.exception('get-next-error', e=e)
- self._tx_request_deferred[frame_index] = None
- d.errback(failure.Failure(e), high_priority)
- return
-
- except IndexError as e:
- self.log.exception('get-next-index-error', e=e)
- self._tx_request_deferred[frame_index] = None
- d.errback(failure.Failure(e), high_priority)
- return
-
- # Put it back so the outer Rx/Tx can finish
- self._tx_request[frame_index] = self._tx_request_deferred[frame_index]
- self._tx_request_deferred[frame_index] = None
-
- # Publish Rx event to listeners in a different task
- if not isinstance(omci_msg, OmciGetNextResponse):
- reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
-
- from copy import copy
- original_callbacks = copy(d.callbacks)
- self._rx_response[frame_index] = rx_frame
- d.callback(rx_frame)
- self.log.debug("finished-processing-get-rx-frame")
-
def _decode_unknown_me(self, msg):
"""
Decode an ME for an unsupported class ID. This should only occur for a subset
diff --git a/voltha/extensions/omci/omci_entities.py b/voltha/extensions/omci/omci_entities.py
index 3968224..d8dac81 100644
--- a/voltha/extensions/omci/omci_entities.py
+++ b/voltha/extensions/omci/omci_entities.py
@@ -28,6 +28,7 @@
from voltha.extensions.omci.omci_fields import OmciSerialNumberField, OmciTableField
from voltha.extensions.omci.omci_defs import bitpos_from_mask
+
class EntityClassAttribute(object):
def __init__(self, fld, access=set(), optional=False, range_check=None,
@@ -650,6 +651,7 @@
self.fields['treatment_inner_tpid_de'] = 0x7
return self
+
class ExtendedVlanTaggingOperationConfigurationData(EntityClass):
class_id = 171
attributes = [
@@ -1160,6 +1162,64 @@
}
+class OmciMeTypeTable(Packet):
+ """
+ OMCI ME Supported Types Table
+ """
+ name = "OmciMeTypeTable"
+ fields_desc = [
+ ShortField("me_type", None)
+ ]
+
+ def to_json(self):
+ return json.dumps(self.fields, separators=(',', ':'))
+
+ @staticmethod
+ def json_from_value(value):
+ data = int(value)
+ temp = OmciMeTypeTable(me_type=data)
+ return json.dumps(temp.fields, separators=(',', ':'))
+
+ def index(self):
+ return '{:04}'.format(self.fields.get('me_type', 0))
+
+ def is_delete(self):
+ return self.fields.get('me_type', 0) == 0
+
+ def delete(self):
+ self.fields['me_type'] = 0
+ return self
+
+
+class OmciMsgTypeTable(Packet):
+ """
+ OMCI Supported Message Types Table
+ """
+ name = "OmciMsgTypeTable"
+ fields_desc = [
+ ByteField("msg_type", None)
+ ]
+
+ def to_json(self):
+ return json.dumps(self.fields, separators=(',', ':'))
+
+ @staticmethod
+ def json_from_value(value):
+ data = int(value)
+ temp = OmciMeTypeTable(me_type=data)
+ return json.dumps(temp.fields, separators=(',', ':'))
+
+ def index(self):
+ return '{:02}'.format(self.fields.get('msg_type', 0))
+
+ def is_delete(self):
+ return self.fields.get('me_type', 0) == 0
+
+ def delete(self):
+ self.fields['me_type'] = 0
+ return self
+
+
class Omci(EntityClass):
class_id = 287
hidden = True
@@ -1167,21 +1227,15 @@
ECA(ShortField("managed_entity_id", None), {AA.R},
range_check=lambda x: x == 0),
- # TODO: Can this be expressed better in SCAPY, probably not?
- # On the initial, Get request for either the me_type or message_type
- # attributes, you will receive a 4 octet value (big endian) that is
- # the number of octets to 'get-next' to fully load the desired
- # attribute. For a basic OMCI formatted message, that will be 29
- # octets per get-request.
- #
- # For the me_type_table, these are 16-bit values (ME Class IDs)
- #
- # For the message_type_table, these are 8-bit values (Actions)
+ ECA(OmciTableField(
+ PacketLenField("me_type_table", None,
+ OmciMeTypeTable, length_from=lambda pkt: 2)),
+ {AA.R}),
- ECA(FieldListField("me_type_table", None, ByteField('', 0),
- count_from=lambda _: 29), {AA.R}),
- ECA(FieldListField("message_type_table", None, ByteField('', 0),
- count_from=lambda _: 29), {AA.R}),
+ ECA(OmciTableField(
+ PacketLenField("message_type_table", None,
+ OmciMsgTypeTable, length_from=lambda pkt: 1)),
+ {AA.R}),
]
mandatory_operations = {OP.Get, OP.GetNext}
diff --git a/voltha/extensions/omci/omci_fields.py b/voltha/extensions/omci/omci_fields.py
index b6ccf5e..09cf465 100644
--- a/voltha/extensions/omci/omci_fields.py
+++ b/voltha/extensions/omci/omci_fields.py
@@ -34,6 +34,7 @@
val.remove_payload()
return remain + s[self._align:], val
+
class StrCompoundField(Field):
__slots__ = ['flds']
@@ -61,6 +62,7 @@
data += value
return s, data
+
class XStrFixedLenField(StrFixedLenField):
"""
XStrFixedLenField which value is printed as hexadecimal.
@@ -72,6 +74,7 @@
def m2i(self, pkt, x):
return None if x is None else binascii.b2a_hex(x)
+
class MultipleTypeField(object):
"""MultipleTypeField are used for fields that can be implemented by
various Field subclasses, depending on conditions on the packet.
@@ -162,14 +165,17 @@
def __getattr__(self, attr):
return getattr(self._find_fld(), attr)
+
class OmciSerialNumberField(StrCompoundField):
def __init__(self, name, default=None):
assert default is None or (isinstance(default, str) and len(default) == 12), 'invalid default serial number'
vendor_default = default[0:4] if default is not None else None
vendor_serial_default = default[4:12] if default is not None else None
super(OmciSerialNumberField, self).__init__(name,
- [StrFixedLenField('vendor_id', vendor_default, 4),
- XStrFixedLenField('vendor_serial_number', vendor_serial_default, 4)])
+ [StrFixedLenField('vendor_id', vendor_default, 4),
+ XStrFixedLenField('vendor_serial_number',
+ vendor_serial_default, 4)])
+
class OmciTableField(MultipleTypeField):
def __init__(self, tblfld):
@@ -178,14 +184,14 @@
assert hasattr(tblfld.cls, 'is_delete'), 'No delete() method defined for OmciTableField row object'
super(OmciTableField, self).__init__(
[
- (IntField('table_length', 0), (self.cond_pkt, self.cond_pkt_val)),
- (PadField(StrField('me_type_table', None), OmciTableField.PDU_SIZE),
- (self.cond_pkt2, self.cond_pkt_val2))
+ (IntField('table_length', 0), (self.cond_pkt, self.cond_pkt_val)),
+ (PadField(StrField('me_type_table', None), OmciTableField.PDU_SIZE),
+ (self.cond_pkt2, self.cond_pkt_val2))
], tblfld)
- PDU_SIZE = 29 # Baseline message set raw get-next PDU size
- OmciGetResponseMessageId = 0x29 # Ugh circular dependency
- OmciGetNextResponseMessageId = 0x3a # Ugh circular dependency
+ PDU_SIZE = 29 # Baseline message set raw get-next PDU size
+ OmciGetResponseMessageId = 0x29 # Ugh circular dependency
+ OmciGetNextResponseMessageId = 0x3a # Ugh circular dependency
def cond_pkt(self, pkt):
return pkt is not None and pkt.message_id == self.OmciGetResponseMessageId
@@ -200,8 +206,10 @@
return pkt is not None and pkt.message_id == self.OmciGetNextResponseMessageId
def to_json(self, new_values, old_values_json):
- if not isinstance(new_values, list): new_values = [new_values] # If setting a scalar, augment the old table
- else: old_values_json = None # If setting a vector of new values, erase all old_values
+ if not isinstance(new_values, list):
+ new_values = [new_values] # If setting a scalar, augment the old table
+ else:
+ old_values_json = None # If setting a vector of new values, erase all old_values
key_value_pairs = dict()
@@ -209,6 +217,7 @@
for old in old_table:
index = old.index()
key_value_pairs[index] = old
+
for new in new_values:
index = new.index()
if new.is_delete():
@@ -226,14 +235,19 @@
return str_values
def load_json(self, json_str):
- if json_str is None: json_str = '[]'
+ if json_str is None:
+ json_str = '[]'
+
json_values = json.loads(json_str)
key_value_pairs = dict()
+
for json_value in json_values:
v = self.default.cls(**json_value)
index = v.index()
key_value_pairs[index] = v
+
table = []
for k, v in sorted(key_value_pairs.iteritems()):
table.append(v)
+
return table
\ No newline at end of file
diff --git a/voltha/extensions/omci/omci_messages.py b/voltha/extensions/omci/omci_messages.py
index f6559a3..5da57bf 100644
--- a/voltha/extensions/omci/omci_messages.py
+++ b/voltha/extensions/omci/omci_messages.py
@@ -103,7 +103,7 @@
raise
if isinstance(pkt, OmciGetResponse) and isinstance(fld, OmciTableField):
data[fld.name + '_size'] = value
- table_attribute_mask = table_attribute_mask | (1 << index)
+ table_attribute_mask = table_attribute_mask | (1 << (16 - index))
else:
data[fld.name] = value
if table_attribute_mask:
@@ -199,7 +199,14 @@
ByteField("success_code", 0),
ShortField("attributes_mask", None),
ConditionalField(
- OmciMaskedData("data"), lambda pkt: pkt.success_code == 0)
+ ShortField("unsupported_attributes_mask", 0),
+ lambda pkt: pkt.success_code == 9),
+ ConditionalField(
+ ShortField("failed_attributes_mask", 0),
+ lambda pkt: pkt.success_code == 9),
+ ConditionalField(
+ OmciMaskedData("data"),
+ lambda pkt: pkt.success_code == 0 or pkt.success_code == 9)
]
diff --git a/voltha/extensions/omci/state_machines/mib_sync.py b/voltha/extensions/omci/state_machines/mib_sync.py
index 2a8b535..1a61335 100644
--- a/voltha/extensions/omci/state_machines/mib_sync.py
+++ b/voltha/extensions/omci/state_machines/mib_sync.py
@@ -25,7 +25,7 @@
RX_RESPONSE_KEY
from voltha.extensions.omci.onu_device_entry import OnuDeviceEvents, OnuDeviceEntry, \
SUPPORTED_MESSAGE_ENTITY_KEY, SUPPORTED_MESSAGE_TYPES_KEY
-from voltha.extensions.omci.omci_entities import OntData
+from voltha.extensions.omci.omci_entities import OntData, Omci
from common.event_bus import EventBusClient
from voltha.protos.omci_mib_db_pb2 import OpenOmciEventType
@@ -667,11 +667,11 @@
class_id = omci_msg['object_entity_class']
entity_id = omci_msg['object_entity_id']
- # Filter out the 'mib_data_sync' from the database. We save that at
- # the device level and do not want it showing up during a re-sync
- # during data compares
+ # Filter out the 'mib_data_sync' and 'omci' from the database. We save
+ # that at the device level and do not want it showing up during a
+ # re-sync during data compares
- if class_id == OntData.class_id:
+ if class_id in {OntData.class_id, Omci.class_id}:
return
attributes = {k: v for k, v in omci_msg['object_data'].items()}
diff --git a/voltha/extensions/omci/tasks/omci_get_request.py b/voltha/extensions/omci/tasks/omci_get_request.py
index 690df1c..3bcc052 100644
--- a/voltha/extensions/omci/tasks/omci_get_request.py
+++ b/voltha/extensions/omci/tasks/omci_get_request.py
@@ -19,8 +19,7 @@
from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
from voltha.extensions.omci.omci_me import MEFrame
from voltha.extensions.omci.omci_frame import OmciFrame
-from voltha.extensions.omci.omci_cc import DEFAULT_OMCI_TIMEOUT
-from voltha.extensions.omci.omci_messages import OmciGet
+from voltha.extensions.omci.omci_messages import OmciGet, OmciGetNext
from voltha.extensions.omci.omci_fields import OmciTableField
RC = ReasonCodes
@@ -47,6 +46,7 @@
"""
task_priority = 128
name = "ONU OMCI Get Task"
+ MAX_TABLE_SIZE = 16 * 1024 # Keep get-next logic reasonable
def __init__(self, omci_agent, device_id, entity_class, entity_id, attributes,
exclusive=True, allow_failure=False):
@@ -145,6 +145,11 @@
"""
return self._failed_or_unknown_attributes
+ def is_table_attr(self, attr):
+ index = self._entity_class.attribute_name_to_index_map[attr]
+ attr_def = self._entity_class.attributes[index]
+ return isinstance(attr_def.field, OmciTableField)
+
@inlineCallbacks
def perform_get_omci(self):
"""
@@ -154,61 +159,62 @@
entity_id=self._entity_id, attributes=self._attributes)
try:
# If one or more attributes is a table attribute, get it separately
- def is_table_attr(attr):
- index = self._entity_class.attribute_name_to_index_map[attr]
- attr_def = self._entity_class.attributes[index]
- return isinstance(attr_def.field, OmciTableField)
- first_attributes = {attr for attr in self._attributes if not is_table_attr(attr)}
- table_attributes = {attr for attr in self._attributes if is_table_attr(attr)}
+ first_attributes = {attr for attr in self._attributes if not self.is_table_attr(attr)}
+ table_attributes = {attr for attr in self._attributes if self.is_table_attr(attr)}
- frame = MEFrame(self._entity_class, self._entity_id, first_attributes).get()
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
+ if len(first_attributes):
+ frame = MEFrame(self._entity_class, self._entity_id, first_attributes).get()
+ self.strobe_watchdog()
+ results = yield self._device.omci_cc.send(frame)
- status = results.fields['omci_message'].fields['success_code']
- self.log.debug('perform-get-status', status=status)
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.debug('perform-get-status', status=status)
- # Success?
- if status == RC.Success.value:
- self._results = results
- results_omci = results.fields['omci_message'].fields
+ if status == RC.AttributeFailure.value:
+ # What failed? Note if only one attribute was attempted, then
+ # that is an overall failure
- # Were all attributes fetched?
- missing_attr = frame.fields['omci_message'].fields['attributes_mask'] ^ \
- results_omci['attributes_mask']
+ if not self._allow_failure or len(self._attributes) <= 1:
+ raise GetException('Get failed with status code: {}'.
+ format(RC.AttributeFailure.value))
- if missing_attr > 0 or len(table_attributes) > 0:
- self.log.info('perform-get-missing', num_missing=missing_attr,
- table_attr=table_attributes)
self.strobe_watchdog()
- self._local_deferred = reactor.callLater(0,
- self.perform_get_missing_attributes,
- missing_attr,
- table_attributes)
- returnValue(self._local_deferred)
+ # TODO: update failed & unknown attributes set
- elif status == RC.AttributeFailure.value:
- # What failed? Note if only one attribute was attempted, then
- # that is an overall failure
+ # Success?
+ if status in {RC.Success.value, RC.AttributeFailure.value}:
+ self._results = results
+ results_omci = results.fields['omci_message'].fields
- if not self._allow_failure or len(self._attributes) <= 1:
- raise GetException('Get failed with status code: {}'.
- format(RC.AttributeFailure.value))
+ # Were all attributes fetched?
+ missing_attr = frame.fields['omci_message'].fields['attributes_mask'] ^ \
+ results_omci['attributes_mask']
+ if missing_attr > 0 or len(table_attributes) > 0:
+ self.log.info('perform-get-missing', num_missing=missing_attr,
+ table_attr=table_attributes)
+ self.strobe_watchdog()
+ self._local_deferred = reactor.callLater(0,
+ self.perform_get_missing_attributes,
+ missing_attr, table_attributes)
+ returnValue(self._local_deferred)
+
+ else:
+ raise GetException('Get failed with status code: {}'.format(status))
+
+ self.log.debug('get-completed')
+ self.deferred.callback(self)
+
+ elif len(table_attributes) > 0:
+ # Here if only table attributes were requested
+ self.log.info('perform-get-table', table_attr=table_attributes)
self.strobe_watchdog()
self._local_deferred = reactor.callLater(0,
- self.perform_get_failed_attributes,
- results,
- self._attributes)
+ self.process_get_table,
+ table_attributes)
returnValue(self._local_deferred)
- else:
- raise GetException('Get failed with status code: {}'.format(status))
-
- self.log.debug('get-completed')
- self.deferred.callback(self)
-
except TimeoutError as e:
self.deferred.errback(failure.Failure(e))
@@ -222,11 +228,14 @@
"""
This method is called when the original Get requests completes with success
but not all attributes were returned. This can happen if one or more of the
- attributes would have exceeded the space available in the OMCI frame.
+ attributes would have exceeded the space available in the OMCI frame or if
+ one of the attributes is a table.
This routine iterates through the missing attributes and attempts to retrieve
the ones that were missing.
+ Once missing attributes are recovered, the table attributes are requested
+
:param missing_attr: (int) Missing attributes bitmask
:param table_attributes: (set) Attributes that need table get/get-next support
"""
@@ -235,28 +244,35 @@
# Retrieve missing attributes first (if any)
results_omci = self._results.fields['omci_message'].fields
- for index in xrange(16):
- attr_mask = 1 << index
+ try:
+ # Get remaining attributes one at a time
+ for index in range(15, 1, -1):
+ attr_mask = 1 << index
- if attr_mask & missing_attr:
- # Get this attribute
- frame = OmciFrame(
- transaction_id=None, # OMCI-CC will set
- message_type=OmciGet.message_id,
- omci_message=OmciGet(
- entity_class=self._entity_class.class_id,
- entity_id=self._entity_id,
- attributes_mask=attr_mask
+ if attr_mask & missing_attr:
+ # Get this attribute
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask
+ )
)
- )
- try:
self.strobe_watchdog()
get_results = yield self._device.omci_cc.send(frame)
get_omci = get_results.fields['omci_message'].fields
- if get_omci['success_code'] != RC.Success.value:
+ status = get_omci['success_code']
+
+ if status == RC.AttributeFailure.value:
+ # TODO: update failed & unknown attributes set
continue
+ elif status != RC.Success.value:
+ raise GetException('Get failed with status code: {}'.format(status))
+
assert attr_mask == get_omci['attributes_mask'], 'wrong attribute'
results_omci['attributes_mask'] |= attr_mask
@@ -265,92 +281,127 @@
results_omci['data'].update(get_omci['data'])
- except TimeoutError:
- self.log.debug('missing-timeout')
+ except TimeoutError as e:
+ self.log.debug('missing-timeout')
+ self.deferred.errback(failure.Failure(e))
- except Exception as e:
- self.log.exception('missing-failure', e=e)
+ except Exception as e:
+ self.log.exception('missing-failure', class_id=self._entity_class.class_id,
+ entity_id=self._entity_id, e=e)
+ self.deferred.errback(failure.Failure(e))
- # Now any table attributes. OMCI_CC handles background get/get-next sequencing
- for tbl_attr in table_attributes:
- attr_mask = self._entity_class.mask_for(tbl_attr)
- frame = OmciFrame(
- transaction_id=None, # OMCI-CC will set
- message_type=OmciGet.message_id,
- omci_message=OmciGet(
- entity_class=self._entity_class.class_id,
- entity_id=self._entity_id,
- attributes_mask=attr_mask
- )
- )
- try:
- timeout = 2 * DEFAULT_OMCI_TIMEOUT # Multiple frames expected
- self.strobe_watchdog()
- get_results = yield self._device.omci_cc.send(frame,
- timeout=timeout)
- self.strobe_watchdog()
- get_omci = get_results.fields['omci_message'].fields
- if get_omci['success_code'] != RC.Success.value:
- continue
-
- if results_omci.get('data') is None:
- results_omci['data'] = dict()
-
- results_omci['data'].update(get_omci['data'])
-
- except TimeoutError:
- self.log.debug('tbl-attr-timeout')
-
- except Exception as e:
- self.log.exception('tbl-attr-timeout', e=e)
+ # Now any table attributes
+ if len(table_attributes):
+ self.strobe_watchdog()
+ self._local_deferred = reactor.callLater(0,
+ self.process_get_table,
+ table_attributes)
+ returnValue(self._local_deferred)
self.deferred.callback(self)
@inlineCallbacks
- def perform_get_failed_attributes(self, tmp_results, attributes):
+ def process_get_table(self, table_attributes):
"""
-
- :param tmp_results:
- :param attributes:
- :return:
+ Special handling for Get Requests that may require additional 'get_next' operations
+ if a table attribute was requested.
"""
- self.log.debug('perform-get-failed', attrs=attributes)
+ # Retrieve attributes retrieved so far so we can add to them
+ try:
+ results_omci = self._results.fields['omci_message'].fields if self._results is not None else {}
- for attr in attributes:
- try:
- frame = MEFrame(self._entity_class, self._entity_id, {attr}).get()
+ for tbl_attr in table_attributes:
+ attr_mask = self._entity_class.mask_for(tbl_attr)
+ attr_index = self._entity_class.attribute_indices_from_mask(attr_mask)[0]
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask
+ )
+ )
+ # First get will retrieve the size
+ get_results = yield self._device.omci_cc.send(frame)
self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
- status = results.fields['omci_message'].fields['success_code']
+ if self._results is None:
+ self._results = get_results
+ results_omci = self._results.fields['omci_message'].fields
- if status == RC.AttributeFailure.value:
- self.log.debug('unknown-or-invalid-attribute', attr=attr, status=status)
- self._failed_or_unknown_attributes.add(attr)
+ omci_fields = get_results.fields['omci_message'].fields
+ if omci_fields['success_code'] == RC.AttributeFailure.value:
+ # Copy over any failed or unsupported attribute masks for final result
+ results_fields = results_omci.fields['omci_message'].fields
+ results_fields['unsupported_attributes_mask'] |= omci_fields['unsupported_attributes_mask']
+ results_fields['failed_attributes_mask'] |= omci_fields['failed_attributes_mask']
- elif status != RC.Success.value:
- self.log.warn('invalid-get', class_id=self._entity_class,
- attribute=attr, status=status)
- self._failed_or_unknown_attributes.add(attr)
+ if omci_fields['success_code'] != RC.Success.value:
+ raise GetException('Get table attribute failed with status code: {}'.
+ format(omci_fields['success_code']))
- else:
- # Add to partial results and correct the status
- tmp_results.fields['omci_message'].fields['success_code'] = status
- tmp_results.fields['omci_message'].fields['attributes_mask'] |= \
- results.fields['omci_message'].fields['attributes_mask']
+ eca = self._entity_class.attributes[attr_index]
- if tmp_results.fields['omci_message'].fields.get('data') is None:
- tmp_results.fields['omci_message'].fields['data'] = dict()
+ self.log.debug('omcc-get-table-attribute', table_name=eca.field.name)
+ attr_size = omci_fields['data'][eca.field.name + '_size']
- tmp_results.fields['omci_message'].fields['data'][attr] = \
- results.fields['omci_message'].fields['data'][attr]
+ if attr_size > self.MAX_TABLE_SIZE:
+ self.log.error('omcc-get-table-huge', count=attr_size, name=eca.field.name)
+ raise ValueError('Huge Table Size: {}'.format(attr_size))
- except TimeoutError as e:
- self.log.debug('attr-timeout')
+ # Start the loop
+ seq_no = 0
+ data_buffer = ''
- except Exception as e:
- self.log.exception('attr-failure', e=e)
+ for offset in xrange(0, attr_size, OmciTableField.PDU_SIZE):
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGetNext.message_id,
+ omci_message=OmciGetNext(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask,
+ command_sequence_number=seq_no
+ )
+ )
+ get_results = yield self._device.omci_cc.send(frame)
- self._results = tmp_results
- self.deferred.callback(self)
+ omci_fields = get_results.fields['omci_message'].fields
+ status = omci_fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise Exception('get-next-failure table=' + eca.field.name +
+ ' entity_id=' + str(self._entity_id) +
+ ' sqn=' + str(seq_no) + ' omci-status ' + str(status))
+
+ # Extract the data
+ num_octets = attr_size - offset
+ if num_octets > OmciTableField.PDU_SIZE:
+ num_octets = OmciTableField.PDU_SIZE
+
+ data = omci_fields['data'][eca.field.name]
+ data_buffer += data[:num_octets]
+ seq_no += 1
+
+ vals = []
+ while data_buffer:
+ data_buffer, val = eca.field.getfield(None, data_buffer)
+ vals.append(val)
+
+ # Save off the retrieved data
+ results_omci['attributes_mask'] |= attr_mask
+ results_omci['data'][eca.field.name] = vals
+
+ self.deferred.callback(self)
+
+ except TimeoutError as e:
+ self.log.debug('tbl-attr-timeout')
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('tbl-attr-timeout', class_id=self._entity_class.class_id,
+ entity_id=self._entity_id, e=e)
+ self.deferred.errback(failure.Failure(e))
+
diff --git a/voltha/extensions/omci/tasks/onu_capabilities_task.py b/voltha/extensions/omci/tasks/onu_capabilities_task.py
index 048382c..ee23b15 100644
--- a/voltha/extensions/omci/tasks/onu_capabilities_task.py
+++ b/voltha/extensions/omci/tasks/onu_capabilities_task.py
@@ -20,6 +20,8 @@
from voltha.extensions.omci.omci_defs import ReasonCodes
from voltha.extensions.omci.omci_me import OmciFrame
from voltha.extensions.omci.omci import EntityOperations
+from voltha.extensions.omci.tasks.omci_get_request import OmciGetRequest
+from voltha.extensions.omci.omci_entities import Omci
class GetNextException(Exception):
@@ -66,7 +68,9 @@
super(OnuCapabilitiesTask, self).__init__(OnuCapabilitiesTask.name,
omci_agent,
device_id,
- priority=OnuCapabilitiesTask.task_priority)
+ exclusive=False,
+ priority=OnuCapabilitiesTask.task_priority,
+ watchdog_timeout=2*Task.DEFAULT_WATCHDOG_SECS)
self._local_deferred = None
self._device = omci_agent.get_device(device_id)
self._pdu_size = omci_pdu_size
@@ -139,9 +143,9 @@
try:
self.strobe_watchdog()
self._supported_entities = yield self.get_supported_entities()
-
self.strobe_watchdog()
self._supported_msg_types = yield self.get_supported_message_types()
+ self.strobe_watchdog()
self.log.debug('get-success',
supported_entities=self.supported_managed_entities,
@@ -166,57 +170,24 @@
@inlineCallbacks
def get_supported_entities(self):
"""
- Get the supported ME Types for this ONU.
+ Get the supported Message Types (actions) for this ONU.
"""
try:
- # Get the number of requests needed
- frame = OmciFrame(me_type_table=True).get()
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
+ # Use the GetRequest Task to perform table retrieval
+ get_request = OmciGetRequest(self.omci_agent, self.device_id, Omci, 0,
+ ["me_type_table"], exclusive=False)
- omci_msg = results.fields['omci_message']
- status = omci_msg.fields['success_code']
+ results = yield self._device.task_runner.queue_task(get_request)
- if status != ReasonCodes.Success.value:
- raise GetCapabilitiesFailure('Get count of supported entities failed with status code: {}'.
- format(status))
- data = omci_msg.fields['data']['me_type_table']
- count = self.get_count_from_data_buffer(bytearray(data))
+ if results.success_code != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure('Get supported managed entities table failed with status code: {}'.
+ format(results.success_code))
- seq_no = 0
- data_buffer = bytearray(0)
- self.log.debug('me-type-count', octets=count, data=hexlify(data))
-
- # Start the loop
- for offset in xrange(0, count, self._pdu_size):
- frame = OmciFrame(me_type_table=seq_no).get_next()
- seq_no += 1
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
-
- omci_msg = results.fields['omci_message']
- status = omci_msg.fields['success_code']
-
- if status != ReasonCodes.Success.value:
- raise GetCapabilitiesFailure(
- 'Get supported entities request at offset {} of {} failed with status code: {}'.
- format(offset + 1, count, status))
-
- # Extract the data
- num_octets = count - offset
- if num_octets > self._pdu_size:
- num_octets = self._pdu_size
-
- data = omci_msg.fields['data']['me_type_table']
- data_buffer += bytearray(data[:num_octets])
-
- me_types = {(data_buffer[x] << 8) + data_buffer[x + 1]
- for x in xrange(0, len(data_buffer), 2)}
- returnValue(me_types)
+ returnValue({attr.fields['me_type'] for attr in results.attributes['me_type_table']})
except Exception as e:
self.log.exception('get-entities', e=e)
- self.deferred.errback(failure.Failure(e))
+ raise
@inlineCallbacks
def get_supported_message_types(self):
@@ -224,59 +195,18 @@
Get the supported Message Types (actions) for this ONU.
"""
try:
- # Get the number of requests needed
- frame = OmciFrame(message_type_table=True).get()
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
+ # Use the GetRequest Task to perform table retrieval
+ get_request = OmciGetRequest(self.omci_agent, self.device_id, Omci, 0,
+ ["message_type_table"], exclusive=False)
- omci_msg = results.fields['omci_message']
- status = omci_msg.fields['success_code']
+ results = yield self._device.task_runner.queue_task(get_request)
- if status != ReasonCodes.Success.value:
- raise GetCapabilitiesFailure('Get count of supported msg types failed with status code: {}'.
- format(status))
+ if results.success_code != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure('Get supported msg types table failed with status code: {}'.
+ format(results.success_code))
- data = omci_msg.fields['data']['message_type_table']
- count = self.get_count_from_data_buffer(bytearray(data))
-
- seq_no = 0
- data_buffer = list()
- self.log.debug('me-type-count', octets=count, data=hexlify(data))
-
- # Start the loop
- for offset in xrange(0, count, self._pdu_size):
- frame = OmciFrame(message_type_table=seq_no).get_next()
- seq_no += 1
- self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
-
- omci_msg = results.fields['omci_message']
- status = omci_msg.fields['success_code']
-
- if status != ReasonCodes.Success.value:
- raise GetCapabilitiesFailure(
- 'Get supported msg types request at offset {} of {} failed with status code: {}'.
- format(offset + 1, count, status))
-
- # Extract the data
- num_octets = count - offset
- if num_octets > self._pdu_size:
- num_octets = self._pdu_size
-
- data = omci_msg.fields['data']['message_type_table']
- data_buffer += data[:num_octets]
-
- def buffer_to_message_type(value):
- """
- Convert an integer value to the appropriate EntityOperations enumeration
- :param value: (int) Message type value (4..29)
- :return: (EntityOperations) Enumeration, None on failure
- """
- next((v for k, v in EntityOperations.__members__.items() if v.value == value), None)
-
- msg_types = {buffer_to_message_type(v) for v in data_buffer if v is not None}
- returnValue({msg_type for msg_type in msg_types if msg_type is not None})
+ returnValue({attr.fields['msg_type'] for attr in results.attributes['message_type_table']})
except Exception as e:
self.log.exception('get-msg-types', e=e)
- self.deferred.errback(failure.Failure(e))
+ raise