VOL-1317: Added stop-and-wait support for OMCI Comm Channel
Change-Id: I39b8e27755f5a5355aae74487eb4bfe7fa1388fc
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py b/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py
index ad034ea..235c795 100644
--- a/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py
+++ b/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py
@@ -64,6 +64,7 @@
self._devices = dict() # device-id -> mock device
self.core = MockCore()
self.deferred = d
+ self.timeout_the_message = False
@property
def send_omci_defer(self):
@@ -145,7 +146,7 @@
self.log.debug("--> send_proxied_message", message=msg)
# if proxy_address is None:
- if self.deferred is not None:
+ if self.deferred is not None and not self.timeout_the_message:
self.deferred.callback(msg)
# return None
diff --git a/tests/utests/voltha/extensions/omci/test_omci_cc.py b/tests/utests/voltha/extensions/omci/test_omci_cc.py
index f7ea647..cf8975c 100644
--- a/tests/utests/voltha/extensions/omci/test_omci_cc.py
+++ b/tests/utests/voltha/extensions/omci/test_omci_cc.py
@@ -15,7 +15,7 @@
#
import binascii
from common.frameio.frameio import hexify
-from nose.twistedtools import deferred
+from twisted.python.failure import Failure
from unittest import TestCase, main, skip
from mock.mock_adapter_agent import MockAdapterAgent
from mock.mock_onu_handler import MockOnuHandler
@@ -25,7 +25,8 @@
from voltha.extensions.omci.omci_frame import *
from voltha.extensions.omci.omci_entities import *
from voltha.extensions.omci.omci_me import ExtendedVlanTaggingOperationConfigurationDataFrame
-from voltha.extensions.omci.omci_cc import UNKNOWN_CLASS_ATTRIBUTE_KEY
+from voltha.extensions.omci.omci_cc import OMCI_CC, UNKNOWN_CLASS_ATTRIBUTE_KEY,\
+ MAX_OMCI_REQUEST_AGE
DEFAULT_OLT_DEVICE_ID = 'default_olt_mock'
DEFAULT_ONU_DEVICE_ID = 'default_onu_mock'
@@ -36,6 +37,9 @@
OP = EntityOperations
RC = ReasonCodes
+successful = False
+error_reason = None
+
def chunk(indexable, chunk_size):
for i in range(0, len(indexable), chunk_size):
@@ -53,7 +57,7 @@
Note also added some testing of MockOnu behaviour since its behaviour during more
complicated unit/integration tests may be performed in the future.
"""
- def setUp(self):
+ def setUp(self, let_msg_timeout=False):
self.adapter_agent = MockAdapterAgent()
def tearDown(self):
@@ -77,11 +81,12 @@
handler.onu_mock = onu
return handler
- def setup_one_of_each(self):
+ def setup_one_of_each(self, timeout_messages=False):
# Most tests will use at lease one or more OLT and ONU
self.olt_handler = self.setup_mock_olt()
self.onu_handler = self.setup_mock_onu(parent_id=self.olt_handler.device_id)
self.onu_device = self.onu_handler.onu_mock
+ self.adapter_agent.timeout_the_message = timeout_messages
self.adapter_agent.add_child_device(self.olt_handler.device,
self.onu_handler.device)
@@ -123,7 +128,6 @@
def _default_errback(self, failure):
from twisted.internet.defer import TimeoutError
assert isinstance(failure.type, type(TimeoutError))
- return None
def _snapshot_stats(self):
omci_cc = self.onu_handler.omci_cc
@@ -135,11 +139,16 @@
'rx_onu_discards': omci_cc.rx_onu_discards,
'rx_timeouts': omci_cc.rx_timeouts,
'rx_unknown_me': omci_cc.rx_unknown_me,
+ 'rx_late': omci_cc.rx_late,
'tx_errors': omci_cc.tx_errors,
'consecutive_errors': omci_cc.consecutive_errors,
'reply_min': omci_cc.reply_min,
'reply_max': omci_cc.reply_max,
- 'reply_average': omci_cc.reply_average
+ 'reply_average': omci_cc.reply_average,
+ 'hp_tx_queue_len': omci_cc.hp_tx_queue_len,
+ 'lp_tx_queue_len': omci_cc.lp_tx_queue_len,
+ 'max_hp_tx_queue': omci_cc.max_hp_tx_queue,
+ 'max_lp_tx_queue': omci_cc._max_lp_tx_queue,
}
def test_default_init(self):
@@ -153,7 +162,12 @@
self.assertIsNone(omci_cc._proxy_address)
# No outstanding requests
- self.assertEqual(len(omci_cc._requests), 0)
+ self.assertEqual(len(omci_cc._pending[OMCI_CC.LOW_PRIORITY]), 0)
+ self.assertEqual(len(omci_cc._pending[OMCI_CC.HIGH_PRIORITY]), 0)
+
+ # No active requests
+ self.assertIsNone(omci_cc._tx_request[OMCI_CC.LOW_PRIORITY])
+ self.assertIsNone(omci_cc._tx_request[OMCI_CC.HIGH_PRIORITY])
# Flags/properties
self.assertFalse(omci_cc.enabled)
@@ -166,11 +180,16 @@
self.assertEqual(omci_cc.rx_onu_discards, 0)
self.assertEqual(omci_cc.rx_unknown_me, 0)
self.assertEqual(omci_cc.rx_timeouts, 0)
+ self.assertEqual(omci_cc.rx_late, 0)
self.assertEqual(omci_cc.tx_errors, 0)
self.assertEqual(omci_cc.consecutive_errors, 0)
self.assertNotEquals(omci_cc.reply_min, 0.0)
self.assertEqual(omci_cc.reply_max, 0.0)
self.assertEqual(omci_cc.reply_average, 0.0)
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0.0)
+ self.assertEqual(omci_cc.max_hp_tx_queue, 0.0)
+ self.assertEqual(omci_cc._max_hp_tx_queue, 0.0)
+ self.assertEqual(omci_cc._max_lp_tx_queue, 0.0)
def test_enable_disable(self):
self.setup_one_of_each()
@@ -186,12 +205,14 @@
omci_cc.enabled = True
self.assertTrue(omci_cc.enabled)
self.assertIsNotNone(omci_cc._proxy_address)
- self.assertEqual(len(omci_cc._requests), 0)
+ self.assertEqual(len(omci_cc._pending[OMCI_CC.LOW_PRIORITY]), 0)
+ self.assertEqual(len(omci_cc._pending[OMCI_CC.HIGH_PRIORITY]), 0)
omci_cc.enabled = True # Should be a NOP
self.assertTrue(omci_cc.enabled)
self.assertIsNotNone(omci_cc._proxy_address)
- self.assertEqual(len(omci_cc._requests), 0)
+ self.assertEqual(len(omci_cc._pending[OMCI_CC.LOW_PRIORITY]), 0)
+ self.assertEqual(len(omci_cc._pending[OMCI_CC.HIGH_PRIORITY]), 0)
omci_cc.enabled = False
self.assertFalse(omci_cc.enabled)
@@ -216,7 +237,6 @@
self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'])
self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'])
- self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
def test_message_send_get(self):
# Various tests of sending an OMCI message and it either
@@ -367,6 +387,7 @@
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ d.addCallback(self._check_stats, snapshot, 'rx_late', snapshot['rx_late'])
d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
return d
@@ -395,6 +416,7 @@
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ d.addCallback(self._check_stats, snapshot, 'rx_late', snapshot['rx_late'])
d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
return d
@@ -427,6 +449,48 @@
# d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
# return d
+ def test_message_send_no_timeout(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ self.onu_device.mib_data_sync = 10
+ snapshot = self._snapshot_stats()
+
+ d = omci_cc.send_mib_reset(timeout=0)
+ d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'])
+ d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ return d
+
+ def test_message_send_bad_timeout(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ self.onu_device.mib_data_sync = 10
+ snapshot = self._snapshot_stats()
+
+ d = omci_cc.send_mib_reset(timeout=MAX_OMCI_REQUEST_AGE + 1)
+ d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'])
+ d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'])
+ d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'] + 1)
+ return d
+
+ def test_message_send_not_a_frame(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ self.onu_device.mib_data_sync = 10
+ snapshot = self._snapshot_stats()
+
+ d = omci_cc.send('hello world', timeout=1)
+ d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'])
+ d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'])
+ d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'] + 1)
+ return d
+
def test_message_send_reboot(self):
self.setup_one_of_each()
@@ -447,6 +511,7 @@
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ d.addCallback(self._check_stats, snapshot, 'rx_late', snapshot['rx_late'])
d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
return d
@@ -506,6 +571,7 @@
self.assertEqual(omci_cc.rx_frames, 0)
self.assertEqual(omci_cc.rx_unknown_tid, 0)
self.assertEqual(omci_cc.rx_timeouts, 0)
+ self.assertEqual(omci_cc.rx_late, 0)
self.assertEqual(omci_cc.tx_errors, 0)
# # Class ID not found
@@ -624,7 +690,6 @@
# TODO: add more
self.assertTrue(True) # TODO: Implement
-
def test_rx_discard_if_disabled(self):
# ME without a known decoder
self.setup_one_of_each()
@@ -644,7 +709,6 @@
self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'])
self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'])
- self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
def test_omci_alarm_decode(self):
"""
@@ -655,15 +719,79 @@
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
# Frame from the JIRA issue
msg = '0000100a000b0102800000000000000000000000' \
'0000000000000000000000000000000000000015' \
'000000282d3ae0a6'
- results = omci_cc.receive_message(hex2raw(msg))
+ _results = omci_cc.receive_message(hex2raw(msg))
- self.assertTrue(True, 'Truth is the truth')
+ self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'])
+ self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'])
+ self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
+ self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'] + 1)
+ self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards'])
+
+ def test_omci_avc_decode(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ # Frame from the JIRA issue
+ msg = '0000110a0007000080004d4c2d33363236000000' \
+ '0000000020202020202020202020202020202020' \
+ '00000028'
+
+ _results = omci_cc.receive_message(hex2raw(msg))
+
+ self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'])
+ self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'])
+ self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
+ self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'] + 1)
+ self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards'])
+
+ def test_omci_unknown_onu_decode(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ # Frame from the JIRA issue
+ msg = '0000190a0007000080004d4c2d33363236000000' \
+ '0000000020202020202020202020202020202020' \
+ '00000028'
+
+ _results = omci_cc.receive_message(hex2raw(msg))
+
+ self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'])
+ self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'])
+ self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
+ self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'] + 1)
+ self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards'] + 1)
+
+ def test_omci_bad_frame_decode(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ # Frame from the JIRA issue
+ msg = '0020190a0007000080004d4c2d33363236000000' \
+ '0000000000000028'
+
+ _results = omci_cc.receive_message(hex2raw(msg))
+ # NOTE: Currently do not increment any Rx Discard counters, just throw it away
+ self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'] + 1)
+ self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'])
+ self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'] + 1)
+ self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'])
+ self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards'])
def test_rx_decode_onu_g(self):
self.setup_one_of_each()
@@ -709,7 +837,7 @@
self.assertEqual(expected, val)
return results
- @skip('for unknow omci failure')
+ @skip('for unknown omci failure')
#@deferred()
def test_rx_table_get_extvlantagging(self):
self.setup_one_of_each()
@@ -832,12 +960,274 @@
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'] + 2)
+ d.addCallback(self._check_stats, snapshot, 'rx_late', snapshot['rx_late'])
d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
d.addCallback(self._check_stats, snapshot, 'consecutive_errors', 0)
d.addCallback(self._check_vlan_tag_op, 'received_frame_vlan_tagging_operation_table', tbl)
return d
+ ##################################################################
+ # Start of tests specific to new stop_and_wait changes
+ #
+ def test_message_send_low_priority(self):
+ # self.setup_one_of_each(timeout_messages=True)
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ # MIB Upload
+ d = omci_cc.send_mib_upload(timeout=1.0, high_priority=False)
+ d.addCallback(self._check_stats, snapshot, 'lp_tx_queue_len', snapshot['lp_tx_queue_len'])
+ d.addCallback(self._check_stats, snapshot, 'hp_tx_queue_len', snapshot['hp_tx_queue_len'])
+ d.addCallback(self._check_stats, snapshot, 'max_lp_tx_queue', snapshot['max_lp_tx_queue'] + 1)
+ d.addCallback(self._check_stats, snapshot, 'max_hp_tx_queue', snapshot['max_hp_tx_queue'])
+
+ # Flush to get ready for next test (one frame queued)
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ self.adapter_agent.timeout_the_message = True
+ omci_cc.send_mib_upload(timeout=1.0, high_priority=False)
+ omci_cc.send_mib_upload(timeout=1.0, high_priority=False)
+
+ self.assertEqual(omci_cc.lp_tx_queue_len, 1)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.max_lp_tx_queue, 1)
+ self.assertEqual(omci_cc.max_hp_tx_queue, 0)
+
+ # Flush to get ready for next test (two queued and new max)
+ omci_cc.flush()
+ omci_cc.send_mib_upload(timeout=1.0, high_priority=False)
+ omci_cc.send_mib_upload(timeout=1.0, high_priority=False)
+ omci_cc.send_mib_upload(timeout=1.0, high_priority=False)
+
+ self.assertEqual(omci_cc.lp_tx_queue_len, 2)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.max_lp_tx_queue, 2)
+ self.assertEqual(omci_cc.max_hp_tx_queue, 0)
+
+ def test_message_send_high_priority(self):
+ # self.setup_one_of_each(timeout_messages=True)
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ # MIB Upload
+ d = omci_cc.send_mib_upload(high_priority=True)
+ d.addCallback(self._check_stats, snapshot, 'lp_tx_queue_len', snapshot['lp_tx_queue_len'])
+ d.addCallback(self._check_stats, snapshot, 'hp_tx_queue_len', snapshot['hp_tx_queue_len'])
+ d.addCallback(self._check_stats, snapshot, 'max_lp_tx_queue', snapshot['max_lp_tx_queue'])
+ d.addCallback(self._check_stats, snapshot, 'max_hp_tx_queue', snapshot['max_hp_tx_queue'] + 1)
+
+ # Flush to get ready for next test (one frame queued)
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ self.adapter_agent.timeout_the_message = True
+ omci_cc.send_mib_upload(high_priority=True)
+ omci_cc.send_mib_upload(high_priority=True)
+
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 1)
+ self.assertEqual(omci_cc.max_lp_tx_queue, 0)
+ self.assertEqual(omci_cc.max_hp_tx_queue, 1)
+
+ # Flush to get ready for next test (two queued and new max)
+ omci_cc.flush()
+ omci_cc.send_mib_upload(high_priority=True)
+ omci_cc.send_mib_upload(high_priority=True)
+ omci_cc.send_mib_upload(high_priority=True)
+
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 2)
+ self.assertEqual(omci_cc.max_lp_tx_queue, 0)
+ self.assertEqual(omci_cc.max_hp_tx_queue, 2)
+
+ def test_message_send_and_cancel(self):
+ global error_reason
+ global successful
+ # Do not send messages to adapter_agent
+ self.setup_one_of_each(timeout_messages=True)
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+
+ def success(_results):
+ global successful
+ successful = True
+
+ def failure(reason):
+ global error_reason
+ error_reason = reason
+
+ def notCalled(reason):
+ assert isinstance(reason, Failure), 'Should not be called with success'
+
+ # Cancel one that is actively being sent
+ d = omci_cc.send_mib_upload(high_priority=False)
+ d.addCallbacks(success, failure)
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ d.cancel()
+ self.assertIsInstance(error_reason, Failure)
+ self.assertFalse(successful)
+ self.assertTrue(d.called)
+
+ self.assertEqual(omci_cc.max_lp_tx_queue, 1)
+ self.assertEqual(omci_cc.max_hp_tx_queue, 0)
+
+ # Flush to get ready for next test (one running, one queued, cancel the
+ # running one, so queued runs)
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ error_reason = None
+ d1 = omci_cc.send_mib_upload(high_priority=False)
+ d2 = omci_cc.send_mib_upload(high_priority=False)
+ d1.addCallbacks(success, failure)
+ d2.addCallbacks(notCalled, notCalled)
+ self.assertEqual(omci_cc.lp_tx_queue_len, 1)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ d1.cancel()
+ self.assertIsInstance(error_reason, Failure)
+ self.assertFalse(successful)
+ self.assertTrue(d1.called)
+ self.assertFalse(d2.called)
+
+ self.assertEqual(omci_cc.max_lp_tx_queue, 1)
+ self.assertEqual(omci_cc.max_hp_tx_queue, 0)
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ # Flush to get ready for next test (one running, one queued, cancel the queued one)
+
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ error_reason = None
+ d3 = omci_cc.send_mib_upload(timeout=55, high_priority=False)
+ d4 = omci_cc.send_mib_upload(timeout=55, high_priority=False)
+ d5 = omci_cc.send_mib_upload(timeout=55, high_priority=False)
+ d3.addCallbacks(notCalled, notCalled)
+ d4.addCallbacks(success, failure)
+ d5.addCallbacks(notCalled, notCalled)
+ self.assertEqual(omci_cc.lp_tx_queue_len, 2)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ d4.cancel()
+ self.assertIsInstance(error_reason, Failure)
+ self.assertFalse(successful)
+ self.assertFalse(d3.called)
+ self.assertTrue(d4.called)
+ self.assertFalse(d5.called)
+
+ def test_message_send_low_and_high_priority(self):
+ self.setup_one_of_each(timeout_messages=True)
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+
+ omci_cc.send_mib_reset(high_priority=False)
+ omci_cc.send_mib_reset(high_priority=True)
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ omci_cc.send_mib_reset(high_priority=False)
+ omci_cc.send_mib_reset(high_priority=True)
+ omci_cc.send_mib_reset(high_priority=False)
+ omci_cc.send_mib_reset(high_priority=True)
+ self.assertEqual(omci_cc.lp_tx_queue_len, 1)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 1)
+
+ def test_no_sw_download_and_mib_upload_at_same_time(self):
+ # Section B.2.3 of ITU G.988-2017 specifies that a MIB
+ # upload or software download at a given priority level
+ # is not allowed while a similar action in the other
+ # priority level is in progress. Relates to possible memory
+ # consumption/needs on the ONU.
+ #
+ # OMCI_CC only checks if the commands are currently in
+ # progress. ONU should reject messages if the upload/download
+ # is in progress (but not an active request is in progress).
+
+ self.setup_one_of_each(timeout_messages=True)
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+
+ mib_upload_msgs = [omci_cc.send_mib_upload,
+ # omci_cc.send_mib_upload_next
+ ]
+ sw_download_msgs = [omci_cc.send_start_software_download,
+ # omci_cc.send_download_section,
+ # omci_cc.send_end_software_download
+ ]
+
+ for upload in mib_upload_msgs:
+ for download in sw_download_msgs:
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ upload(high_priority=False)
+ download(1, 1, 1, high_priority=True) # Should stall send-next 50mS
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 1)
+
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ upload(high_priority=True)
+ download(1, 1, 1, high_priority=False) # Should stall send-next 50mS
+ self.assertEqual(omci_cc.lp_tx_queue_len, 1)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ download(1, 1, 1, high_priority=False)
+ upload(high_priority=True) # Should stall send-next 50mS
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 1)
+
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ download(1, 1, 1, high_priority=True)
+ upload(high_priority=False) # Should stall send-next 50mS)
+ self.assertEqual(omci_cc.lp_tx_queue_len, 1)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ omci_cc.flush()
+ self.assertEqual(omci_cc.lp_tx_queue_len, 0)
+ self.assertEqual(omci_cc.hp_tx_queue_len, 0)
+
+ # Some more ideas for tests that we could add
+ # Send explicit tid that is not valid
+ # - Look at top of 'Send' method and test all the error conditions could may hit
+
+ # Send multiple and have the OLT proxy throw an exception. Should call errback and
+ # schedule remainder in queue to still tx.
+
+ # Send a frame and then inject a response and test the RX logic out, including late
+ # rx and retries by the OMCI_CC transmitter.
+
+
if __name__ == '__main__':
main()
-
diff --git a/voltha/extensions/kpi/onu/onu_omci_pm.py b/voltha/extensions/kpi/onu/onu_omci_pm.py
index fc2316e..c186bbb 100644
--- a/voltha/extensions/kpi/onu/onu_omci_pm.py
+++ b/voltha/extensions/kpi/onu/onu_omci_pm.py
@@ -75,10 +75,15 @@
('rx_onu_frames', PmConfig.COUNTER), # Rx ONU autonomous messages
('rx_unknown_me', PmConfig.COUNTER), # Managed Entities without a decode definition
('rx_timeouts', PmConfig.COUNTER),
+ ('rx_late', PmConfig.COUNTER),
('consecutive_errors', PmConfig.COUNTER),
('reply_min', PmConfig.GAUGE), # Milliseconds
('reply_max', PmConfig.GAUGE), # Milliseconds
('reply_average', PmConfig.GAUGE), # Milliseconds
+ ('hp_tx_queue_len', PmConfig.GAUGE),
+ ('lp_tx_queue_len', PmConfig.GAUGE),
+ ('max_hp_tx_queue', PmConfig.GAUGE),
+ ('max_lp_tx_queue', PmConfig.GAUGE),
}
self.omci_cc_metrics_config = {m: PmConfig(name=m, type=t, enabled=True)
for (m, t) in self.omci_cc_pm_names}
diff --git a/voltha/extensions/omci/database/mib_db_dict.py b/voltha/extensions/omci/database/mib_db_dict.py
index fddbf60..6a7de8f 100644
--- a/voltha/extensions/omci/database/mib_db_dict.py
+++ b/voltha/extensions/omci/database/mib_db_dict.py
@@ -314,7 +314,7 @@
return changed or created
except Exception as e:
- self.log.error('set-failure', e, class_id=class_id,
+ self.log.error('set-failure', e=e, class_id=class_id,
instance_id=instance_id, attributes=attributes)
raise
diff --git a/voltha/extensions/omci/database/mib_db_ext.py b/voltha/extensions/omci/database/mib_db_ext.py
index cf7ad1d..6b49e2b 100644
--- a/voltha/extensions/omci/database/mib_db_ext.py
+++ b/voltha/extensions/omci/database/mib_db_ext.py
@@ -77,8 +77,8 @@
DEVICE_PATH = MIB_PATH + '/{}' # .format(device_id)
# Classes, Instances, and Attributes as lists from root proxy
- CLASSES_PATH = DEVICE_PATH + '/classes' # .format(device_id)
- INSTANCES_PATH = DEVICE_PATH +'/classes/{}/instances' # .format(device_id, class_id)
+ CLASSES_PATH = DEVICE_PATH + '/classes' # .format(device_id)
+ INSTANCES_PATH = DEVICE_PATH + '/classes/{}/instances' # .format(device_id, class_id)
ATTRIBUTES_PATH = DEVICE_PATH + '/classes/{}/instances/{}/attributes' # .format(device_id, class_id, instance_id)
# Single Class, Instance, and Attribute as objects from device proxy
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index 7abbfbc..7d4d304 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -20,31 +20,33 @@
import sys
import arrow
from twisted.internet import reactor, defer
-from twisted.internet.defer import DeferredQueue, TimeoutError, CancelledError, failure, fail, inlineCallbacks
+from twisted.internet.defer import TimeoutError, CancelledError, failure, fail, succeed, inlineCallbacks
from common.frameio.frameio import hexify
from voltha.extensions.omci.omci import *
from voltha.extensions.omci.omci_me import OntGFrame, OntDataFrame, SoftwareImageFrame
from voltha.extensions.omci.me_frame import MEFrame
-from voltha.extensions.omci.omci_defs import ReasonCodes
+from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
from common.event_bus import EventBusClient
from enum import IntEnum
from binascii import hexlify
+
def hexify(buffer):
"""Return a hexadecimal string encoding of input buffer"""
return ''.join('%02x' % ord(c) for c in buffer)
-DEFAULT_OMCI_TIMEOUT = 3 # Seconds
-MAX_OMCI_REQUEST_AGE = 60 # Seconds
-MAX_OMCI_TX_ID = 0xFFFF # 2 Octets max
+
+DEFAULT_OMCI_TIMEOUT = 3 # Seconds
+MAX_OMCI_REQUEST_AGE = 60 # Seconds
DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE = 31 # Bytes
-#DEFAULT_OMCI_DOWNLOAD_WINDOW_SIZE = 32 # sections
+MAX_TABLE_ROW_COUNT = 512 # Keep get-next logic reasonable
CONNECTED_KEY = 'connected'
TX_REQUEST_KEY = 'tx-request'
RX_RESPONSE_KEY = 'rx-response'
UNKNOWN_CLASS_ATTRIBUTE_KEY = 'voltha-unknown-blob'
+
class OmciCCRxEvents(IntEnum):
AVC_Notification = 0,
MIB_Upload = 1,
@@ -64,9 +66,30 @@
OP = EntityOperations
RxEvent = OmciCCRxEvents
+
class OMCI_CC(object):
""" Handle OMCI Communication Channel specifics for Adtran ONUs"""
+ MIN_OMCI_TX_ID_LOW_PRIORITY = 0x0001 # 2 Octets max
+ MAX_OMCI_TX_ID_LOW_PRIORITY = 0x7FFF # 2 Octets max
+ MIN_OMCI_TX_ID_HIGH_PRIORITY = 0x8000 # 2 Octets max
+ MAX_OMCI_TX_ID_HIGH_PRIORITY = 0xFFFF # 2 Octets max
+ LOW_PRIORITY = 0
+ HIGH_PRIORITY = 1
+
+ # Offset into some tuples for pending lists and tx in progress
+ PENDING_DEFERRED = 0
+ PENDING_FRAME = 1
+ PENDING_TIMEOUT = 2
+ PENDING_RETRY = 3
+
+ REQUEST_TIMESTAMP = 0
+ REQUEST_DEFERRED = 1
+ REQUEST_FRAME = 2
+ REQUEST_TIMEOUT = 3
+ REQUEST_RETRY = 4
+ REQUEST_DELAYED_CALL = 5
+
_frame_to_event_type = {
OmciMibResetResponse.message_id: RxEvent.MIB_Reset,
OmciMibUploadResponse.message_id: RxEvent.MIB_Upload,
@@ -84,15 +107,21 @@
self._adapter_agent = adapter_agent
self._device_id = device_id
self._proxy_address = None
- self._tx_tid = 1
self._enabled = False
- self._requests = dict() # Tx ID -> (timestamp, deferred, tx_frame, timeout, retry, delayedCall)
+ self._extended_messaging = False
self._me_map = me_map
if clock is None:
self.reactor = reactor
else:
self.reactor = clock
-
+
+ # Support 2 levels of priority since only baseline message set supported
+ self._tx_tid = [OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY, OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY]
+ self._tx_request = [None, None] # Tx in progress (timestamp, defer, frame, timeout, retry, delayedCall)
+ self._tx_request_deferred = [None, None] # Tx in progress but held till child Rx/TX can finish. ie omci tables.
+ self._pending = [list(), list()] # pending queue (deferred, tx_frame, timeout, retry)
+ self._rx_response = [None, None]
+
# Statistics
self._tx_frames = 0
self._rx_frames = 0
@@ -100,12 +129,15 @@
self._rx_onu_frames = 0 # Autonomously generated ONU frames
self._rx_onu_discards = 0 # Autonomously generated ONU unknown message types
self._rx_timeouts = 0
+ self._rx_late = 0 # Frame response received after timeout on Tx
self._rx_unknown_me = 0 # Number of managed entities Rx without a decode definition
self._tx_errors = 0 # Exceptions during tx request
self._consecutive_errors = 0 # Rx & Tx errors in a row, a good RX resets this to 0
self._reply_min = sys.maxint # Fastest successful tx -> rx
self._reply_max = 0 # Longest successful tx -> rx
self._reply_sum = 0.0 # Total seconds for successful tx->rx (float for average)
+ self._max_hp_tx_queue = 0 # Maximum size of high priority tx pending queue
+ self._max_lp_tx_queue = 0 # Maximum size of low priority tx pending queue
self.event_bus = EventBusClient()
@@ -116,6 +148,17 @@
def __str__(self):
return "OMCISupport: {}".format(self._device_id)
+ def _get_priority_index(self, high_priority):
+ """ Centralized logic to help make extended message support easier in the future"""
+ return OMCI_CC.HIGH_PRIORITY if high_priority and not self._extended_messaging \
+ else OMCI_CC.LOW_PRIORITY
+
+ def _tid_is_high_priority(self, tid):
+ """ Centralized logic to help make extended message support easier in the future"""
+
+ return not self._extended_messaging and \
+ OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY <= tid <= OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
+
@staticmethod
def event_bus_topic(device_id, event):
"""
@@ -178,6 +221,10 @@
return self._rx_timeouts
@property
+ def rx_late(self):
+ return self._rx_late
+
+ @property
def tx_errors(self):
return self._tx_errors
@@ -198,6 +245,22 @@
avg = self._reply_sum / self._rx_frames if self._rx_frames > 0 else 0.0
return int(round(avg * 1000.0)) # Milliseconds
+ @property
+ def hp_tx_queue_len(self):
+ return len(self._pending[OMCI_CC.HIGH_PRIORITY])
+
+ @property
+ def lp_tx_queue_len(self):
+ return len(self._pending[OMCI_CC.LOW_PRIORITY])
+
+ @property
+ def max_hp_tx_queue(self):
+ return self._max_hp_tx_queue
+
+ @property
+ def max_lp_tx_queue(self):
+ return self._max_lp_tx_queue
+
def _start(self):
"""
Start the OMCI Communications Channel
@@ -243,6 +306,16 @@
self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
self._rx_onu_discards += 1
+ def _update_rx_tx_stats(self, now, ts):
+ ts_diff = now - arrow.Arrow.utcfromtimestamp(ts)
+ secs = ts_diff.total_seconds()
+ self._reply_sum += secs
+ if secs < self._reply_min:
+ self._reply_min = secs
+ if secs > self._reply_max:
+ self._reply_max = secs
+ return secs
+
def receive_message(self, msg):
"""
Receive and OMCI message from the proxy channel to the OLT.
@@ -250,98 +323,135 @@
Call this from your ONU Adapter on a new OMCI Rx on the proxy channel
:param msg: (str) OMCI binary message (used as input to Scapy packet decoder)
"""
- if self.enabled:
+ if not self.enabled:
+ return
+
+ try:
+ now = arrow.utcnow()
+ d = None
+
+ # NOTE: Since we may need to do an independent ME map on a per-ONU basis
+ # save the current value of the entity_id_to_class_map, then
+ # replace it with our custom one before decode, and then finally
+ # restore it later. Tried other ways but really made the code messy.
+ saved_me_map = omci_entities.entity_id_to_class_map
+ omci_entities.entity_id_to_class_map = self._me_map
+
try:
- now = arrow.utcnow()
- d = None
+ rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
+ rx_tid = rx_frame.fields['transaction_id']
- # NOTE: Since we may need to do an independent ME map on a per-ONU basis
- # save the current value of the entity_id_to_class_map, then
- # replace it with our custom one before decode, and then finally
- # restore it later. Tried other ways but really made the code messy.
+ if rx_tid == 0:
+ return self._receive_onu_message(rx_frame)
- saved_me_map = omci_entities.entity_id_to_class_map
- omci_entities.entity_id_to_class_map = self._me_map
+ # Previously unreachable if this is the very first Rx or we
+ # have been running consecutive errors
+ if self._rx_frames == 0 or self._consecutive_errors != 0:
+ self.reactor.callLater(0, self._publish_connectivity_event, True)
- try:
- rx_frame = msg if isinstance(msg, OmciFrame) else OmciFrame(msg)
- rx_tid = rx_frame.fields['transaction_id']
+ self._rx_frames += 1
+ self._consecutive_errors = 0
- if rx_tid == 0:
- return self._receive_onu_message(rx_frame)
-
- # Previously unreachable if this is the very first Rx or we
- # have been running consecutive errors
- if self._rx_frames == 0 or self._consecutive_errors != 0:
- self.reactor.callLater(0, self._publish_connectivity_event, True)
-
- self._rx_frames += 1
- self._consecutive_errors = 0
-
- except KeyError as e:
- # Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
- self.log.debug('frame-decode-key-error', msg=hexlify(msg), e=e)
- rx_frame = self._decode_unknown_me(msg)
- self._rx_unknown_me += 1
- rx_tid = rx_frame.fields.get('transaction_id')
-
- except Exception as e:
- self.log.exception('frame-decode', msg=hexlify(msg), e=e)
- return
-
- finally:
- omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
-
- try:
- # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
- (ts, d, tx_frame, timeout, retry, dc) = self._requests.pop(rx_tid)
- if dc is not None and not dc.cancelled and not dc.called:
- self.log.debug("cancel timeout call")
- dc.cancel()
-
- ts_diff = now - arrow.Arrow.utcfromtimestamp(ts)
- secs = ts_diff.total_seconds()
- self._reply_sum += secs
-
- if secs < self._reply_min:
- self._reply_min = secs
-
- if secs > self._reply_max:
- self._reply_max = secs
-
- except KeyError as e:
- # Possible late Rx on a message that timed-out
- self._rx_unknown_tid += 1
- self.log.warn('tx-message-missing', rx_id=rx_tid, msg=hexlify(msg))
- return
-
- except Exception as e:
- self.log.exception('frame-match', msg=hexlify(msg), e=e)
- if d is not None:
- return d.errback(failure.Failure(e))
- return
-
- reactor.callLater(0, self._process_rx_frame, timeout, secs, rx_frame, d, tx_frame)
+ except KeyError as e:
+ # Unknown, Unsupported, or vendor-specific ME. Key is the unknown classID
+ self.log.debug('frame-decode-key-error', msg=hexlify(msg), e=e)
+ rx_frame = self._decode_unknown_me(msg)
+ self._rx_unknown_me += 1
+ rx_tid = rx_frame.fields.get('transaction_id')
except Exception as e:
- self.log.exception('rx-msg', e=e)
+ self.log.exception('frame-decode', msg=hexlify(msg), e=e)
+ return
+
+ finally:
+ omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
+
+ try:
+ high_priority = self._tid_is_high_priority(rx_tid)
+ index = self._get_priority_index(high_priority)
+
+ # (timestamp, defer, frame, timeout, retry, delayedCall)
+ last_tx_tuple = self._tx_request[index]
+
+ if last_tx_tuple is None or \
+ last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
+ # Possible late Rx on a message that timed-out
+ self._rx_unknown_tid += 1
+ #self.log.warn('tx-message-missing', rx_id=rx_tid, msg=hexlify(msg))
+ #return
+
+ ts, d, tx_frame, timeout, retry, dc = last_tx_tuple
+ if dc is not None and not dc.cancelled and not dc.called:
+ dc.cancel()
+ # self.log.debug("cancel-timeout-called")
+
+ secs = self._update_rx_tx_stats(now, ts)
+
+ # Late arrival already serviced by a timeout?
+ if d.called:
+ self._rx_late += 1
+ return
+
+ except Exception as e:
+ self.log.exception('frame-match', msg=hexlify(msg), e=e)
+ if d is not None:
+ return d.errback(failure.Failure(e))
+ return
+
+ # Extended processing needed. Note 'data' field will be None on some error
+ # status returns
+ omci_msg = rx_frame.fields['omci_message']
+
+ if isinstance(omci_msg, OmciGetResponse) and \
+ omci_msg.fields.get('data') is not None and \
+ 'table_attribute_mask' in omci_msg.fields['data']:
+ # Yes, run in a separate generator
+ reactor.callLater(0, self._process_get_rx_frame, timeout, secs,
+ rx_frame, d, tx_frame, high_priority)
+ else:
+ # Publish Rx event to listeners in a different task
+ reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
+
+ # begin success callback chain (will cancel timeout and queue next Tx message)
+ from copy import copy
+ original_callbacks = copy(d.callbacks)
+ self._rx_response[index] = rx_frame
+ d.callback(rx_frame)
+
+ except Exception as e:
+ self.log.exception('rx-msg', e=e)
@inlineCallbacks
- def _process_rx_frame(self, timeout, secs, rx_frame, d, tx_frame):
+ def _process_get_rx_frame(self, timeout, secs, rx_frame, d, tx_frame, high_priority):
+ """
+ Special handling for Get Requests that may require additional 'get_next' operations
+ if a table attribute was requested.
+ """
omci_msg = rx_frame.fields['omci_message']
+ rx_tid = rx_frame.fields.get('transaction_id')
+ high_priority = self._tid_is_high_priority(rx_tid)
+ frame_index = self._get_priority_index(high_priority)
+
if isinstance(omci_msg, OmciGetResponse) and 'table_attribute_mask' in omci_msg.fields['data']:
+
+ # save tx request for later so that below send/recv can finish
+ self._tx_request_deferred[frame_index] = self._tx_request[frame_index]
+ self._tx_request[frame_index] = None
+
try:
entity_class = omci_msg.fields['entity_class']
entity_id = omci_msg.fields['entity_id']
table_attributes = omci_msg.fields['data']['table_attribute_mask']
- device = self._adapter_agent.get_device(self._device_id)
+ # Table attribute mask is encoded opposite of managed entity mask.
if entity_class in self._me_map:
ec = self._me_map[entity_class]
- for index in xrange(16):
+ for index in xrange(1, len(ec.attributes) + 1):
attr_mask = 1 << index
if attr_mask & table_attributes:
+ self.log.debug('omcc-get-table-ec', ec=ec, index=index, attr_mask=attr_mask,
+ table_attributes=table_attributes)
eca = ec.attributes[index]
self.log.debug('omcc-get-table-attribute', table_name=eca.field.name)
@@ -349,10 +459,14 @@
data_buffer = ''
count = omci_msg.fields['data'][eca.field.name + '_size']
+ if count > MAX_TABLE_ROW_COUNT:
+ self.log.error('omcc-get-table-huge', count=count, name=eca.field.name)
+ raise ValueError('Huge Table Size: {}'.format(count))
+
# Original timeout must be chopped up into each individual get-next request
# in order for total transaction to complete within the timeframe of the
# original get() timeout.
- number_transactions = 1 + (count + OmciTableField.PDU_SIZE - 1) / OmciTableField.PDU_SIZE
+ number_transactions = 1 + (count + OmciTableField.PDU_SIZE - 1) / OmciTableField.PDU_SIZE
timeout /= (1 + number_transactions)
# Start the loop
@@ -371,6 +485,7 @@
raise Exception('get-next-failure table=' + eca.field.name +
' entity_id=' + str(entity_id) +
' sqn=' + str(seq_no) + ' omci-status ' + str(status))
+
# Extract the data
num_octets = count - offset
if num_octets > OmciTableField.PDU_SIZE:
@@ -383,24 +498,38 @@
data_buffer, val = eca.field.getfield(None, data_buffer)
vals.append(val)
- omci_msg.fields['data'][eca.field.name] = vals;
+ omci_msg.fields['data'][eca.field.name] = vals
del omci_msg.fields['data'][eca.field.name + '_size']
self.log.debug('omcc-got-table-attribute-rows', table_name=eca.field.name,
- row_count=len(vals))
+ row_count=len(vals))
del omci_msg.fields['data']['table_attribute_mask']
except Exception as e:
self.log.exception('get-next-error', e=e)
- d.errback(failure.Failure(e))
+ self._tx_request_deferred[frame_index] = None
+ d.errback(failure.Failure(e), high_priority)
return
- # Notify sender of completed request
- reactor.callLater(0, d.callback, rx_frame)
+ except IndexError as e:
+ self.log.exception('get-next-index-error', e=e)
+ self._tx_request_deferred[frame_index] = None
+ d.errback(failure.Failure(e), high_priority)
+ return
- # Publish Rx event to listeners in a different task except for internally-consumed get-next-response
+ # Put it back so the outer Rx/Tx can finish
+ self._tx_request[frame_index] = self._tx_request_deferred[frame_index]
+ self._tx_request_deferred[frame_index] = None
+
+ # Publish Rx event to listeners in a different task
if not isinstance(omci_msg, OmciGetNextResponse):
reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
+ from copy import copy
+ original_callbacks = copy(d.callbacks)
+ self._rx_response[frame_index] = rx_frame
+ d.callback(rx_frame)
+ self.log.debug("finished-processing-get-rx-frame")
+
def _decode_unknown_me(self, msg):
"""
Decode an ME for an unsupported class ID. This should only occur for a subset
@@ -488,147 +617,237 @@
msg = {CONNECTED_KEY: connected}
self.event_bus.publish(topic=topic, msg=msg)
- def flush(self, max_age=0):
- limit = arrow.utcnow().float_timestamp - max_age
- old = [tid for tid, (ts, _, _, _, _, _) in self._requests.iteritems()
- if ts <= limit]
+ def flush(self):
+ """Flush/cancel in active or pending Tx requests"""
+ requests = []
- for tid in old:
- (_, d, _, _, _, dc) = self._requests.pop(tid)
+ for priority in {OMCI_CC.HIGH_PRIORITY, OMCI_CC.LOW_PRIORITY}:
+ next_frame, self._tx_request[priority] = self._tx_request[priority], None
+ if next_frame is not None:
+ requests.append((next_frame[OMCI_CC.REQUEST_DEFERRED], next_frame[OMCI_CC.REQUEST_DELAYED_CALL]))
+
+ requests += [(next_frame[OMCI_CC.PENDING_DEFERRED], None)
+ for next_frame in self._pending[priority]]
+ self._pending[priority] = list()
+
+ # Cancel them...
+ def cleanup_unhandled_error(_):
+ pass # So the cancel below does not flag an unhandled error
+
+ for d, dc in requests:
if d is not None and not d.called:
+ d.addErrback(cleanup_unhandled_error)
d.cancel()
if dc is not None and not dc.called and not dc.cancelled:
dc.cancel()
- def _get_tx_tid(self):
+ def _get_tx_tid(self, high_priority=False):
"""
Get the next Transaction ID for a tx. Note TID=0 is reserved
for autonomously generated messages from an ONU
:return: (int) TID
"""
- tx_tid, self._tx_tid = self._tx_tid, self._tx_tid + 1
- if self._tx_tid > MAX_OMCI_TX_ID:
- self._tx_tid = 1
+ if self._extended_messaging or not high_priority:
+ index = OMCI_CC.LOW_PRIORITY
+ min_tid = OMCI_CC.MIN_OMCI_TX_ID_LOW_PRIORITY
+ max_tid = OMCI_CC.MAX_OMCI_TX_ID_LOW_PRIORITY
+ else:
+ index = OMCI_CC.HIGH_PRIORITY
+ min_tid = OMCI_CC.MIN_OMCI_TX_ID_HIGH_PRIORITY
+ max_tid = OMCI_CC.MAX_OMCI_TX_ID_HIGH_PRIORITY
+
+ tx_tid, self._tx_tid[index] = self._tx_tid[index], self._tx_tid[index] + 1
+
+ if self._tx_tid[index] > max_tid:
+ self._tx_tid[index] = min_tid
return tx_tid
- def _request_failure(self, value, tx_tid):
+ def _request_failure(self, value, tx_tid, high_priority):
"""
- Handle a transmit failure and/or Rx timeout
+ Handle a transmit failure. Rx Timeouts are handled on the 'dc' deferred and
+ will call a different method that may retry if requested. This routine
+ will be called after the final (if any) timeout or other error
:param value: (Failure) Twisted failure
:param tx_tid: (int) Associated Tx TID
"""
- if tx_tid in self._requests:
- (_, _, _, timeout, retry, dc) = self._requests.pop(tx_tid)
- if dc is not None and not dc.called and not dc.cancelled:
- dc.cancel()
+ index = self._get_priority_index(high_priority)
- if isinstance(value, failure.Failure):
- value.trap(CancelledError)
- self._rx_timeouts += 1
- self._consecutive_errors += 1
+ if self._tx_request[index] is not None:
+ tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
+ tx_frame_tid = tx_frame.fields['transaction_id']
- if self._consecutive_errors == 1:
- reactor.callLater(0, self._publish_connectivity_event, False)
+ if tx_frame_tid == tx_tid:
+ timeout = self._tx_request[index][OMCI_CC.REQUEST_TIMEOUT]
+ dc = self._tx_request[index][OMCI_CC.REQUEST_DELAYED_CALL]
+ self._tx_request[index] = None
- self.log.info('timeout', tx_id=tx_tid, timeout=timeout)
- value = failure.Failure(TimeoutError(timeout, "Deferred"))
+ if dc is not None and not dc.called and not dc.cancelled:
+ dc.cancel()
+ if isinstance(value, failure.Failure):
+ value.trap(CancelledError)
+ self._rx_timeouts += 1
+ self._consecutive_errors += 1
+ if self._consecutive_errors == 1:
+ reactor.callLater(0, self._publish_connectivity_event, False)
+
+ self.log.debug('timeout', tx_id=tx_tid, timeout=timeout)
+ value = failure.Failure(TimeoutError(timeout, "Deferred"))
+ else:
+ # Search pending queue. This may be a cancel coming in from the original
+ # task that requested the Tx. If found, remove
+ # from pending queue
+ for index, request in enumerate(self._pending[index]):
+ req = request.get(OMCI_CC.PENDING_DEFERRED)
+ if req is not None and req.fields['transaction_id'] == tx_tid:
+ self._pending[index].pop(index)
+ break
+
+ self._send_next_request(high_priority)
return value
- def _request_success(self, rx_frame):
+ def _request_success(self, rx_frame, high_priority):
"""
Handle transmit success (a matching Rx was received)
:param rx_frame: (OmciFrame) OMCI response frame with matching TID
:return: (OmciFrame) OMCI response frame with matching TID
"""
- # At this point, no additional processing is required
- # Continue with Rx Success callbacks.
+ index = self._get_priority_index(high_priority)
+
+ if rx_frame is None:
+ rx_frame = self._rx_response[index]
+
+ rx_tid = rx_frame.fields.get('transaction_id')
+
+ if rx_tid is not None:
+ if self._tx_request[index] is not None:
+ tx_frame = self._tx_request[index][OMCI_CC.REQUEST_FRAME]
+ tx_tid = tx_frame.fields['transaction_id']
+
+ if rx_tid == tx_tid:
+ # Remove this request. Next callback in chain initiates next Tx
+ self._tx_request[index] = None
+ else:
+ self._rx_late += 1
+ else:
+ self._rx_late += 1
+
+ self._send_next_request(high_priority)
+
+ # Return rx_frame (to next item in callback list)
return rx_frame
- def _request_timeout(self, tx_tid):
- self.log.debug("_request_timeout", tx_tid=tx_tid)
- if tx_tid in self._requests:
- req = self._requests[tx_tid] # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
- frame = req[2]
- timeout = req[3]
- retry = req[4]
- if retry > 0:
- retry -= 1
- self.send(frame, timeout, retry)
- else:
- d = req[1]
- d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
-
- def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT, retry=0):
+ def _request_timeout(self, tx_tid, high_priority):
"""
- Send the OMCI Frame to the ONU via the proxy_channel
+ Tx Request timed out. Resend immediately if there retries is non-zero. A
+ separate deferred (dc) is used on each actual Tx which is not the deferred
+ (d) that is returned to the caller of the 'send()' method.
+
+ If the timeout if the transmitted frame was zero, this is just cleanup of
+ that transmit request and not necessarily a transmit timeout
+
+ :param tx_tid: (int) TID of frame
+ :param high_priority: (bool) True if high-priority queue
+ """
+ self.log.debug("_request_timeout", tx_tid=tx_tid)
+ index = self._get_priority_index(high_priority)
+
+ if self._tx_request[index] is not None:
+ # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
+ ts, d, frame, timeout, retry, _dc = self._tx_request[index]
+
+ if frame.fields.get('transaction_id', 0) == tx_tid:
+ self._tx_request[index] = None
+
+ if timeout > 0:
+ self._rx_timeouts += 1
+
+ if retry > 0:
+ # Push on front of TX pending queue so that it transmits next with the
+ # original TID
+ self._queue_frame(d, frame, timeout, retry - 1, high_priority, front=True)
+
+ elif not d.called:
+ d.errback(failure.Failure(TimeoutError(timeout, "Send OMCI TID -{}".format(tx_tid))))
+ else:
+ self.log.warn('timeout-but-not-the-tx-frame') # Statement mainly for debugging
+
+ self._send_next_request(high_priority)
+
+ def _queue_frame(self, d, frame, timeout, retry, high_priority, front=False):
+ index = self._get_priority_index(high_priority)
+ tx_tuple = (d, frame, timeout, retry) # Pending -> (deferred, tx_frame, timeout, retry)
+
+ if front:
+ self._pending[index].insert(0, tuple)
+ else:
+ self._pending[index].append(tx_tuple)
+
+ # Monitor queue stats
+ qlen = len(self._pending[index])
+
+ if high_priority:
+ if self._max_hp_tx_queue < qlen:
+ self._max_hp_tx_queue = qlen
+
+ elif self._max_lp_tx_queue < qlen:
+ self._max_lp_tx_queue = qlen
+
+ self.log.debug("queue-size", index=index, pending_qlen=qlen)
+
+ def send(self, frame, timeout=DEFAULT_OMCI_TIMEOUT, retry=0, high_priority=False):
+ """
+ Queue the OMCI Frame for a transmit to the ONU via the proxy_channel
:param frame: (OMCIFrame) Message to send
:param timeout: (int) Rx Timeout. 0=No response needed
+ :param retry: (int) Additional retry attempts on channel failure, default=0
+ :param high_priority: (bool) High Priority requests
:return: (deferred) A deferred that fires when the response frame is received
or if an error/timeout occurs
"""
- self.flush(max_age=MAX_OMCI_REQUEST_AGE)
-
- timeout = float(timeout)
- assert timeout <= float(MAX_OMCI_REQUEST_AGE), \
- 'Maximum timeout is {} seconds'.format(MAX_OMCI_REQUEST_AGE)
- assert isinstance(frame, OmciFrame), \
- "Invalid frame class '{}'".format(type(frame))
-
- if not self.enabled:
+ if not self.enabled or self._proxy_address is None:
# TODO custom exceptions throughout this code would be helpful
+ self._tx_errors += 1
return fail(result=failure.Failure(Exception('OMCI is not enabled')))
+ timeout = float(timeout)
+ if timeout > float(MAX_OMCI_REQUEST_AGE):
+ self._tx_errors += 1
+ msg = 'Maximum timeout is {} seconds'.format(MAX_OMCI_REQUEST_AGE)
+ return fail(result=failure.Failure(Exception(msg)))
+
+ if not isinstance(frame, OmciFrame):
+ self._tx_errors += 1
+ msg = "Invalid frame class '{}'".format(type(frame))
+ return fail(result=failure.Failure(Exception(msg)))
try:
+ index = self._get_priority_index(high_priority)
tx_tid = frame.fields['transaction_id']
+
if tx_tid is None:
- tx_tid = self._get_tx_tid()
+ tx_tid = self._get_tx_tid(high_priority=high_priority)
frame.fields['transaction_id'] = tx_tid
- assert tx_tid not in self._requests, 'TX TID {} is already exists'.format(tx_tid)
- assert tx_tid >= 0, 'Invalid Tx TID: {}'.format(tx_tid)
+ assert tx_tid not in self._pending[index], 'TX TID {} is already exists'.format(tx_tid)
+ assert tx_tid > 0, 'Invalid Tx TID: {}'.format(tx_tid)
- ts = arrow.utcnow().float_timestamp
+ # Queue it and request next Tx if tx channel is free
+ d = defer.Deferred()
- if tx_tid in self._requests:
- req = self._requests[tx_tid] # (0: timestamp, 1: defer, 2: frame, 3: timeout, 4: retry, 5: delayedCall)
- d = req[1]
- timeout = req[3]
- dc = req[5]
- if dc is not None and not dc.cancelled: # delayedCall returned from last send
- dc.cancel()
- else:
- req = None
- d = defer.Deferred()
+ self._queue_frame(d, frame, timeout, retry, high_priority, front=False)
+ self._send_next_request(high_priority)
- # NOTE: Since we may need to do an independent ME map on a per-ONU basis
- # save the current value of the entity_id_to_class_map, then
- # replace it with our custom one before decode, and then finally
- # restore it later. Tried other ways but really made the code messy.
-
- saved_me_map = omci_entities.entity_id_to_class_map
- omci_entities.entity_id_to_class_map = self._me_map
- try:
- self._adapter_agent.send_proxied_message(self._proxy_address,
- hexify(str(frame)))
- finally:
- omci_entities.entity_id_to_class_map = saved_me_map
-
- self._tx_frames += 1
-
- if timeout > 0:
- dc = self.reactor.callLater(timeout, self._request_timeout, tx_tid)
- self._requests[tx_tid] = (ts, d, frame, timeout, retry, dc)
- d.addCallbacks(self._request_success, self._request_failure, errbackArgs=(tx_tid,))
- else:
+ if timeout == 0:
self.log.debug("send-timeout-zero", tx_tid=tx_tid)
- self.reactor.callLater(0, d.callback, tx_tid) # no response needed to trigger the defer; just fire it.
+ self.reactor.callLater(0, d.callback, 'queued')
+
+ return d
except Exception as e:
self._tx_errors += 1
@@ -640,33 +859,135 @@
self.log.exception('send-omci', e=e)
return fail(result=failure.Failure(e))
- return d
+ def _ok_to_send(self, tx_request, high_priority):
+ """
+ G.988 specifies not to issue a MIB upload or a Software download request
+ when a similar action is in progress on the other channel. To keep the
+ logic here simple, a new upload/download will not be allowed if either a
+ upload/download is going on
+
+ :param tx_request (OmciFrame) Frame to send
+ :param high_priority: (bool) for queue selection
+ :return: True if okay to dequeue and send frame
+ """
+ other = self._get_priority_index(not high_priority)
+
+ if self._tx_request[other] is None:
+ return True
+
+ this_msg_type = tx_request.fields['message_type'] & 0x1f
+ not_allowed = {OP.MibUpload.value,
+ OP.MibUploadNext.value,
+ OP.StartSoftwareDownload.value,
+ OP.DownloadSection.value,
+ OP.EndSoftwareDownload.value}
+
+ if this_msg_type not in not_allowed:
+ return True
+
+ other_msg_type = self._tx_request[other][OMCI_CC.REQUEST_FRAME].fields['message_type'] & 0x1f
+ return other_msg_type not in not_allowed
+
+ def _send_next_request(self, high_priority):
+ """
+ Pull next tx request and send it
+
+ :param high_priority: (bool) True if this was a high priority request
+ :return: results, so callback chain continues if needed
+ """
+ index = self._get_priority_index(high_priority)
+
+ if self._tx_request[index] is None: # TODO or self._tx_request[index][OMCI_CC.REQUEST_DEFERRED].called:
+ d = None
+ try:
+ if len(self._pending[index]) and \
+ not self._ok_to_send(self._pending[index][0][OMCI_CC.PENDING_FRAME],
+ high_priority):
+ reactor.callLater(0.05, self._send_next_request, high_priority)
+ return
+
+ next_frame = self._pending[index].pop(0)
+
+ d = next_frame[OMCI_CC.PENDING_DEFERRED]
+ frame = next_frame[OMCI_CC.PENDING_FRAME]
+ timeout = next_frame[OMCI_CC.PENDING_TIMEOUT]
+ retry = next_frame[OMCI_CC.PENDING_RETRY]
+
+ tx_tid = frame.fields['transaction_id']
+
+ # NOTE: Since we may need to do an independent ME map on a per-ONU basis
+ # save the current value of the entity_id_to_class_map, then
+ # replace it with our custom one before decode, and then finally
+ # restore it later. Tried other ways but really made the code messy.
+ saved_me_map = omci_entities.entity_id_to_class_map
+ omci_entities.entity_id_to_class_map = self._me_map
+
+ ts = arrow.utcnow().float_timestamp
+ try:
+ self._rx_response[index] = None
+ self._adapter_agent.send_proxied_message(self._proxy_address,
+ hexify(str(frame)))
+ finally:
+ omci_entities.entity_id_to_class_map = saved_me_map
+
+ self._tx_frames += 1
+
+ # Note: the 'd' deferred in the queued request we just got will
+ # already have its success callback queued (callLater -> 0) with a
+ # result of "queued". Here we need time it out internally so
+ # we can call cleanup appropriately. G.988 mentions that most ONUs
+ # will process an request in < 1 second.
+ dc_timeout = timeout if timeout > 0 else 1.0
+
+ # Timeout on internal deferred to support internal retries if requested
+ dc = self.reactor.callLater(dc_timeout, self._request_timeout, tx_tid, high_priority)
+
+ # (timestamp, defer, frame, timeout, retry, delayedCall)
+ self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
+
+ if timeout > 0:
+ d.addCallbacks(self._request_success, self._request_failure,
+ callbackArgs=(high_priority,),
+ errbackArgs=(tx_tid, high_priority))
+
+ except IndexError:
+ pass # Nothing pending in this queue
+
+ except Exception as e:
+ self.log.exception('send-proxy-exception', e=e)
+ self._tx_request[index] = None
+ self.reactor.callLater(0, self._send_next_request, high_priority)
+
+ if d is not None:
+ d.errback(failure.Failure(e))
+ else:
+ self.log.debug("tx-request-occupied", index=index)
###################################################################################
# MIB Action shortcuts
- def send_mib_reset(self, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_mib_reset(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
"""
Perform a MIB Reset
"""
self.log.debug('send-mib-reset')
frame = OntDataFrame().mib_reset()
- return self.send(frame, timeout)
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
- def send_mib_upload(self, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_mib_upload(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
self.log.debug('send-mib-upload')
frame = OntDataFrame().mib_upload()
- return self.send(frame, timeout)
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
- def send_mib_upload_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_mib_upload_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
self.log.debug('send-mib-upload-next')
frame = OntDataFrame(sequence_number=seq_no).mib_upload_next()
- return self.send(frame, timeout)
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
- def send_reboot(self, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_reboot(self, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
"""
Send an ONU Device reboot request (ONU-G ME).
@@ -675,25 +996,25 @@
self.log.debug('send-mib-reboot')
frame = OntGFrame().reboot()
- return self.send(frame, timeout)
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
- def send_get_all_alarm(self, alarm_retrieval_mode=0, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_get_all_alarm(self, alarm_retrieval_mode=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
self.log.debug('send_get_alarm')
frame = OntDataFrame().get_all_alarm(alarm_retrieval_mode)
- return self.send(frame, timeout)
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
- def send_get_all_alarm_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_get_all_alarm_next(self, seq_no, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
self.log.debug('send_get_alarm_next')
frame = OntDataFrame().get_all_alarm_next(seq_no)
- return self.send(frame, timeout)
+ return self.send(frame, timeout=timeout, high_priority=high_priority)
- def send_start_software_download(self, image_inst_id, image_size, window_size, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_start_software_download(self, image_inst_id, image_size, window_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
frame = SoftwareImageFrame(image_inst_id).start_software_download(image_size, window_size-1)
- return self.send(frame, timeout, 3)
+ return self.send(frame, timeout, 3, high_priority=high_priority)
- def send_download_section(self, image_inst_id, section_num, data, size=DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE, timeout=0):
+ def send_download_section(self, image_inst_id, section_num, data, size=DEFAULT_OMCI_DOWNLOAD_SECTION_SIZE, timeout=0, high_priority=False):
"""
# timeout=0 indicates no repons needed
"""
@@ -702,7 +1023,7 @@
frame = SoftwareImageFrame(image_inst_id).download_section(True, section_num, data)
else:
frame = SoftwareImageFrame(image_inst_id).download_section(False, section_num, data)
- return self.send(frame, timeout)
+ return self.send(frame, timeout, high_priority=high_priority)
# if timeout > 0:
# self.reactor.callLater(0, self.sim_receive_download_section_resp,
@@ -710,17 +1031,17 @@
# frame.fields["omci_message"].fields["section_number"])
# return d
- def send_end_software_download(self, image_inst_id, crc32, image_size, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_end_software_download(self, image_inst_id, crc32, image_size, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
frame = SoftwareImageFrame(image_inst_id).end_software_download(crc32, image_size)
- return self.send(frame, timeout)
+ return self.send(frame, timeout, high_priority=high_priority)
# self.reactor.callLater(0, self.sim_receive_end_software_download_resp, frame.fields["transaction_id"])
# return d
- def send_active_image(self, image_inst_id, flag=0, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_active_image(self, image_inst_id, flag=0, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
frame = SoftwareImageFrame(image_inst_id).activate_image(flag)
- return self.send(frame, timeout)
+ return self.send(frame, timeout, high_priority=high_priority)
- def send_commit_image(self, image_inst_id, timeout=DEFAULT_OMCI_TIMEOUT):
+ def send_commit_image(self, image_inst_id, timeout=DEFAULT_OMCI_TIMEOUT, high_priority=False):
frame = SoftwareImageFrame(image_inst_id).commit_image()
- return self.send(frame, timeout)
+ return self.send(frame, timeout, high_priority=high_priority)
diff --git a/voltha/extensions/omci/omci_messages.py b/voltha/extensions/omci/omci_messages.py
index ca4e4f4..f6559a3 100644
--- a/voltha/extensions/omci/omci_messages.py
+++ b/voltha/extensions/omci/omci_messages.py
@@ -99,7 +99,7 @@
continue
try:
s, value = fld.getfield(pkt, s)
- except Exception, e:
+ except Exception, _e:
raise
if isinstance(pkt, OmciGetResponse) and isinstance(fld, OmciTableField):
data[fld.name + '_size'] = value
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index c955018..b47fbab 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -138,6 +138,7 @@
def database_class(self):
return self._mib_database_cls
+ # TODO: Need to deprecate this. ImageAgent is using it and should not
@property
def database(self):
return self._mib_db
diff --git a/voltha/extensions/omci/tasks/omci_get_request.py b/voltha/extensions/omci/tasks/omci_get_request.py
index 8b353c9..690df1c 100644
--- a/voltha/extensions/omci/tasks/omci_get_request.py
+++ b/voltha/extensions/omci/tasks/omci_get_request.py
@@ -19,7 +19,9 @@
from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
from voltha.extensions.omci.omci_me import MEFrame
from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_cc import DEFAULT_OMCI_TIMEOUT
from voltha.extensions.omci.omci_messages import OmciGet
+from voltha.extensions.omci.omci_fields import OmciTableField
RC = ReasonCodes
OP = EntityOperations
@@ -148,10 +150,19 @@
"""
Perform the initial get request
"""
- self.log.debug('perform-get')
-
+ self.log.info('perform-get', entity_class=self._entity_class,
+ entity_id=self._entity_id, attributes=self._attributes)
try:
- frame = MEFrame(self._entity_class, self._entity_id, self._attributes).get()
+ # If one or more attributes is a table attribute, get it separately
+ def is_table_attr(attr):
+ index = self._entity_class.attribute_name_to_index_map[attr]
+ attr_def = self._entity_class.attributes[index]
+ return isinstance(attr_def.field, OmciTableField)
+
+ first_attributes = {attr for attr in self._attributes if not is_table_attr(attr)}
+ table_attributes = {attr for attr in self._attributes if is_table_attr(attr)}
+
+ frame = MEFrame(self._entity_class, self._entity_id, first_attributes).get()
self.strobe_watchdog()
results = yield self._device.omci_cc.send(frame)
@@ -167,11 +178,14 @@
missing_attr = frame.fields['omci_message'].fields['attributes_mask'] ^ \
results_omci['attributes_mask']
- if missing_attr > 0:
+ if missing_attr > 0 or len(table_attributes) > 0:
+ self.log.info('perform-get-missing', num_missing=missing_attr,
+ table_attr=table_attributes)
self.strobe_watchdog()
self._local_deferred = reactor.callLater(0,
self.perform_get_missing_attributes,
- missing_attr)
+ missing_attr,
+ table_attributes)
returnValue(self._local_deferred)
elif status == RC.AttributeFailure.value:
@@ -204,7 +218,7 @@
self.deferred.errback(failure.Failure(e))
@inlineCallbacks
- def perform_get_missing_attributes(self, missing_attr):
+ def perform_get_missing_attributes(self, missing_attr, table_attributes):
"""
This method is called when the original Get requests completes with success
but not all attributes were returned. This can happen if one or more of the
@@ -213,10 +227,12 @@
This routine iterates through the missing attributes and attempts to retrieve
the ones that were missing.
- :param missing_attr: (set) Missing attributes
+ :param missing_attr: (int) Missing attributes bitmask
+ :param table_attributes: (set) Attributes that need table get/get-next support
"""
- self.log.debug('perform-get-missing', attrs=missing_attr)
+ self.log.debug('perform-get-missing', attrs=missing_attr, tbl=table_attributes)
+ # Retrieve missing attributes first (if any)
results_omci = self._results.fields['omci_message'].fields
for index in xrange(16):
@@ -255,6 +271,39 @@
except Exception as e:
self.log.exception('missing-failure', e=e)
+ # Now any table attributes. OMCI_CC handles background get/get-next sequencing
+ for tbl_attr in table_attributes:
+ attr_mask = self._entity_class.mask_for(tbl_attr)
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._entity_class.class_id,
+ entity_id=self._entity_id,
+ attributes_mask=attr_mask
+ )
+ )
+ try:
+ timeout = 2 * DEFAULT_OMCI_TIMEOUT # Multiple frames expected
+ self.strobe_watchdog()
+ get_results = yield self._device.omci_cc.send(frame,
+ timeout=timeout)
+ self.strobe_watchdog()
+ get_omci = get_results.fields['omci_message'].fields
+ if get_omci['success_code'] != RC.Success.value:
+ continue
+
+ if results_omci.get('data') is None:
+ results_omci['data'] = dict()
+
+ results_omci['data'].update(get_omci['data'])
+
+ except TimeoutError:
+ self.log.debug('tbl-attr-timeout')
+
+ except Exception as e:
+ self.log.exception('tbl-attr-timeout', e=e)
+
self.deferred.callback(self)
@inlineCallbacks