| # |
| # Copyright 2017 the original author or authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| import binascii |
| from pyvoltha.adapters.common.frameio.frameio import hexify |
| from twisted.python.failure import Failure |
| from unittest import TestCase, main, skip |
| from mock.mock_adapter_agent import MockAdapterAgent |
| from mock.mock_onu_handler import MockOnuHandler |
| from mock.mock_olt_handler import MockOltHandler |
| from mock.mock_onu import MockOnu |
| from pyvoltha.adapters.extensions.omci.omci_defs import * |
| from pyvoltha.adapters.extensions.omci.omci_frame import * |
| from pyvoltha.adapters.extensions.omci.omci_entities import * |
| from pyvoltha.adapters.extensions.omci.omci_me import ExtendedVlanTaggingOperationConfigurationDataFrame |
| from pyvoltha.adapters.extensions.omci.omci_cc import OMCI_CC, UNKNOWN_CLASS_ATTRIBUTE_KEY,\ |
| MAX_OMCI_REQUEST_AGE |
| |
| DEFAULT_OLT_DEVICE_ID = 'default_olt_mock' |
| DEFAULT_ONU_DEVICE_ID = 'default_onu_mock' |
| DEFAULT_PON_ID = 0 |
| DEFAULT_ONU_ID = 0 |
| DEFAULT_ONU_SN = 'TEST00000001' |
| |
| OP = EntityOperations |
| RC = ReasonCodes |
| |
| successful = False |
| error_reason = None |
| |
| |
| def chunk(indexable, chunk_size): |
| for i in range(0, len(indexable), chunk_size): |
| yield indexable[i:i + chunk_size] |
| |
| |
| def hex2raw(hex_string): |
| return ''.join(chr(int(byte, 16)) for byte in chunk(hex_string, 2)) |
| |
| |
| class TestOmciCc(TestCase): |
| """ |
| Test the Open OMCI Communication channels |
| |
| Note also added some testing of MockOnu behaviour since its behaviour during more |
| complicated unit/integration tests may be performed in the future. |
| """ |
| def setUp(self, let_msg_timeout=False): |
| self.adapter_agent = MockAdapterAgent() |
| |
| def tearDown(self): |
| if self.adapter_agent is not None: |
| self.adapter_agent.tearDown() |
| |
| def setup_mock_olt(self, device_id=DEFAULT_OLT_DEVICE_ID): |
| handler = MockOltHandler(self.adapter_agent, device_id) |
| self.adapter_agent.add_device(handler.device) |
| return handler |
| |
| def setup_mock_onu(self, parent_id=DEFAULT_OLT_DEVICE_ID, |
| device_id=DEFAULT_ONU_DEVICE_ID, |
| pon_id=DEFAULT_PON_ID, |
| onu_id=DEFAULT_ONU_ID, |
| serial_no=DEFAULT_ONU_SN): |
| handler = MockOnuHandler(self.adapter_agent, parent_id, device_id, pon_id, onu_id) |
| handler.serial_number = serial_no |
| onu = MockOnu(serial_no, self.adapter_agent, handler.device_id) \ |
| if serial_no is not None else None |
| handler.onu_mock = onu |
| return handler |
| |
| def setup_one_of_each(self, timeout_messages=False): |
| # Most tests will use at lease one or more OLT and ONU |
| self.olt_handler = self.setup_mock_olt() |
| self.onu_handler = self.setup_mock_onu(parent_id=self.olt_handler.device_id) |
| self.onu_device = self.onu_handler.onu_mock |
| self.adapter_agent.timeout_the_message = timeout_messages |
| |
| self.adapter_agent.add_child_device(self.olt_handler.device, |
| self.onu_handler.device) |
| |
| def _is_omci_frame(self, results, omci_msg_type): |
| assert isinstance(results, OmciFrame), 'Not OMCI Frame' |
| assert 'omci_message' in results.fields, 'Not OMCI Frame' |
| if omci_msg_type is not None: |
| assert isinstance(results.fields['omci_message'], omci_msg_type) |
| return results |
| |
| def _check_status(self, results, value): |
| if value is not None: assert results is not None, 'unexpected emtpy message' |
| status = results.fields['omci_message'].fields['success_code'] |
| assert status == value,\ |
| 'Unexpected Status Code. Got {}, Expected: {}'.format(status, value) |
| return results |
| |
| def _check_mib_sync(self, results, value): |
| assert self.onu_device.mib_data_sync == value, \ |
| 'Unexpected MIB DATA Sync value. Got {}, Expected: {}'.format( |
| self.onu_device.mib_data_sync, value) |
| return results |
| |
| def _check_stats(self, results, _, stat, expected): |
| snapshot = self._snapshot_stats() |
| assert snapshot[stat] == expected, \ |
| 'Invalid statistic "{}". Got {}, Expected: {}'.format(stat, |
| snapshot[stat], |
| expected) |
| return results |
| |
| def _check_value_equal(self, results, name, value, expected): |
| assert value == expected, \ |
| 'Value "{}" not equal. Got {}, Expected: {}'.format(name, value, |
| expected) |
| return results |
| |
| def _default_errback(self, failure): |
| from twisted.internet.defer import TimeoutError |
| assert isinstance(failure.type, type(TimeoutError)) |
| |
| def _snapshot_stats(self): |
| omci_cc = self.onu_handler.omci_cc |
| return { |
| 'tx_frames': omci_cc.tx_frames, |
| 'rx_frames': omci_cc.rx_frames, |
| 'rx_unknown_tid': omci_cc.rx_unknown_tid, |
| 'rx_onu_frames': omci_cc.rx_onu_frames, |
| 'rx_onu_discards': omci_cc.rx_onu_discards, |
| 'rx_timeouts': omci_cc.rx_timeouts, |
| 'rx_unknown_me': omci_cc.rx_unknown_me, |
| 'rx_late': omci_cc.rx_late, |
| 'tx_errors': omci_cc.tx_errors, |
| 'consecutive_errors': omci_cc.consecutive_errors, |
| 'reply_min': omci_cc.reply_min, |
| 'reply_max': omci_cc.reply_max, |
| 'reply_average': omci_cc.reply_average, |
| 'hp_tx_queue_len': omci_cc.hp_tx_queue_len, |
| 'lp_tx_queue_len': omci_cc.lp_tx_queue_len, |
| 'max_hp_tx_queue': omci_cc.max_hp_tx_queue, |
| 'max_lp_tx_queue': omci_cc._max_lp_tx_queue, |
| } |
| |
| def test_default_init(self): |
| self.setup_one_of_each() |
| # Test default construction of OMCI_CC as well as |
| # various other parameter settings |
| omci_cc = self.onu_handler.omci_cc |
| |
| # No device directly associated |
| self.assertIsNotNone(omci_cc._adapter_agent) |
| self.assertIsNone(omci_cc._proxy_address) |
| |
| # No outstanding requests |
| self.assertEqual(len(omci_cc._pending[OMCI_CC.LOW_PRIORITY]), 0) |
| self.assertEqual(len(omci_cc._pending[OMCI_CC.HIGH_PRIORITY]), 0) |
| |
| # No active requests |
| self.assertIsNone(omci_cc._tx_request[OMCI_CC.LOW_PRIORITY]) |
| self.assertIsNone(omci_cc._tx_request[OMCI_CC.HIGH_PRIORITY]) |
| |
| # Flags/properties |
| self.assertFalse(omci_cc.enabled) |
| |
| # Statistics |
| self.assertEqual(omci_cc.tx_frames, 0) |
| self.assertEqual(omci_cc.rx_frames, 0) |
| self.assertEqual(omci_cc.rx_unknown_tid, 0) |
| self.assertEqual(omci_cc.rx_onu_frames, 0) |
| self.assertEqual(omci_cc.rx_onu_discards, 0) |
| self.assertEqual(omci_cc.rx_unknown_me, 0) |
| self.assertEqual(omci_cc.rx_timeouts, 0) |
| self.assertEqual(omci_cc.rx_late, 0) |
| self.assertEqual(omci_cc.tx_errors, 0) |
| self.assertEqual(omci_cc.consecutive_errors, 0) |
| self.assertNotEquals(omci_cc.reply_min, 0.0) |
| self.assertEqual(omci_cc.reply_max, 0.0) |
| self.assertEqual(omci_cc.reply_average, 0.0) |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0.0) |
| self.assertEqual(omci_cc.max_hp_tx_queue, 0.0) |
| self.assertEqual(omci_cc._max_hp_tx_queue, 0.0) |
| self.assertEqual(omci_cc._max_lp_tx_queue, 0.0) |
| |
| def test_enable_disable(self): |
| self.setup_one_of_each() |
| |
| # Test enable property |
| omci_cc = self.onu_handler.omci_cc |
| |
| # Initially disabled |
| self.assertFalse(omci_cc.enabled) |
| omci_cc.enabled = False |
| self.assertFalse(omci_cc.enabled) |
| |
| omci_cc.enabled = True |
| self.assertTrue(omci_cc.enabled) |
| self.assertIsNotNone(omci_cc._proxy_address) |
| self.assertEqual(len(omci_cc._pending[OMCI_CC.LOW_PRIORITY]), 0) |
| self.assertEqual(len(omci_cc._pending[OMCI_CC.HIGH_PRIORITY]), 0) |
| |
| omci_cc.enabled = True # Should be a NOP |
| self.assertTrue(omci_cc.enabled) |
| self.assertIsNotNone(omci_cc._proxy_address) |
| self.assertEqual(len(omci_cc._pending[OMCI_CC.LOW_PRIORITY]), 0) |
| self.assertEqual(len(omci_cc._pending[OMCI_CC.HIGH_PRIORITY]), 0) |
| |
| omci_cc.enabled = False |
| self.assertFalse(omci_cc.enabled) |
| self.assertIsNone(omci_cc._proxy_address) |
| |
| def test_rx_discard_if_disabled(self): |
| # ME without a known decoder |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = False |
| snapshot = self._snapshot_stats() |
| |
| msg = '00fc2e0a00020000ff780000e00000010000000c' \ |
| '0000000000000000000000000000000000000000' \ |
| '00000028105a86ef' |
| |
| omci_cc.receive_message(hex2raw(msg)) |
| |
| # Note: No counter increments |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames']) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me']) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid']) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames']) |
| |
| def test_message_send_get(self): |
| # Various tests of sending an OMCI message and it either |
| # getting a response or send catching some errors of |
| # importance |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| mib_data_sync = self.onu_device.mib_data_sync |
| |
| # GET |
| # d = omci_cc.send() # TODO: Implement |
| # |
| # d.addCallbacks(self._is_omci_frame, self._default_errback) |
| # d.addCallback(self._check_status, RC.Success.value) |
| # d.addCallback(self._check_mib_sync, mib_data_sync) |
| # |
| # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts']) |
| # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| |
| # return d |
| |
| def test_message_send_set(self): |
| # Various tests of sending an OMCI message and it either |
| # getting a response or send catching some errors of |
| # importance |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| mib_data_sync = self.onu_device.mib_data_sync |
| |
| # SET |
| # d = omci_cc.send() # TODO: Implement |
| # |
| # d.addCallbacks(self._is_omci_frame, self._default_errback) |
| # d.addCallback(self._check_status, RC.Success.value) |
| # d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1) |
| # |
| # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts']) |
| # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| |
| # return d |
| # |
| # # Also test mib_data_sync rollover. 255 -> 1 (zero reserved) |
| # |
| # self.onu_device.mib_data_sync = 255 |
| # # SET |
| # self.assertTrue(True) # TODO: Implement (copy previous one here) |
| # self.assertEqual(1, self.onu_device.mib_data_sync) |
| |
| def test_message_send_create(self): |
| # Various tests of sending an OMCI message and it either |
| # getting a response or send catching some errors of |
| # importance |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| mib_data_sync = self.onu_device.mib_data_sync |
| |
| # Create |
| # d = omci_cc.send() # TODO: Implement |
| # |
| # d.addCallbacks(self._is_omci_frame, self._default_errback) |
| # d.addCallback(self._check_status, RC.Success.value) |
| # d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1) |
| # |
| # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts']) |
| # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| |
| # return d |
| |
| def test_message_send_delete(self): |
| # Various tests of sending an OMCI message and it either |
| # getting a response or send catching some errors of |
| # importance |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| mib_data_sync = self.onu_device.mib_data_sync |
| |
| # Delete |
| # d = omci_cc.send() # TODO: Implement |
| # |
| # d.addCallbacks(self._is_omci_frame, self._default_errback) |
| # d.addCallback(self._check_status, RC.Success.value) |
| # d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1) |
| # |
| # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts']) |
| # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| |
| # return d |
| |
| def test_message_send_mib_reset(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| self.onu_device.mib_data_sync = 10 |
| snapshot = self._snapshot_stats() |
| |
| # Successful MIB Reset |
| d = omci_cc.send_mib_reset(timeout=1.0) |
| |
| d.addCallbacks(self._is_omci_frame, self._default_errback) |
| d.addCallback(self._check_status, RC.Success) |
| d.addCallback(self._check_mib_sync, 0) |
| |
| d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1) |
| d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts']) |
| d.addCallback(self._check_stats, snapshot, 'rx_late', snapshot['rx_late']) |
| d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| return d |
| |
| def test_message_send_mib_upload(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| mib_data_sync = self.onu_device.mib_data_sync |
| |
| # MIB Upload |
| d = omci_cc.send_mib_upload(timeout=1.0) |
| |
| d.addCallbacks(self._is_omci_frame, self._default_errback) |
| d.addCallback(self._check_status, RC.Success) |
| d.addCallback(self._check_mib_sync, mib_data_sync) |
| |
| # TODO: MIB Upload Results specific tests here |
| |
| d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1) |
| d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts']) |
| d.addCallback(self._check_stats, snapshot, 'rx_late', snapshot['rx_late']) |
| d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| return d |
| |
| def test_message_send_mib_upload_next(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| mib_data_sync = self.onu_device.mib_data_sync |
| |
| # # MIB Upload Next |
| # d = omci_cc.send_mib_upload_next(0, timeout=1.0) |
| # |
| # d.addCallbacks(self._is_omci_frame, self._default_errback) |
| # d.addCallback(self._check_status, RC.Success) |
| # d.addCallback(self._check_mib_sync, mib_data_sync) |
| # |
| # # TODO: MIB Upload Next Results specific tests here |
| # |
| # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts']) |
| # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| # return d |
| |
| def test_message_send_no_timeout(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| self.onu_device.mib_data_sync = 10 |
| snapshot = self._snapshot_stats() |
| |
| d = omci_cc.send_mib_reset(timeout=0) |
| d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames']) |
| d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| return d |
| |
| def test_message_send_bad_timeout(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| self.onu_device.mib_data_sync = 10 |
| snapshot = self._snapshot_stats() |
| |
| d = omci_cc.send_mib_reset(timeout=MAX_OMCI_REQUEST_AGE + 1) |
| d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames']) |
| d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames']) |
| d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'] + 1) |
| return d |
| |
| def test_message_send_not_a_frame(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| self.onu_device.mib_data_sync = 10 |
| snapshot = self._snapshot_stats() |
| |
| d = omci_cc.send('hello world', timeout=1) |
| d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames']) |
| d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames']) |
| d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'] + 1) |
| return d |
| |
| def test_message_send_reboot(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # ONU Reboot |
| d = omci_cc.send_reboot(timeout=1.0) |
| |
| d.addCallbacks(self._is_omci_frame, self._default_errback) |
| d.addCallback(self._check_status, RC.Success) |
| |
| d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1) |
| d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1) |
| d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts']) |
| d.addCallback(self._check_stats, snapshot, 'rx_late', snapshot['rx_late']) |
| d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| return d |
| |
| def test_message_send_with_omci_disabled(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| self.assertFalse(omci_cc.enabled) |
| |
| # Successful MIB Reset |
| d = omci_cc.send_mib_reset(timeout=1.0) |
| |
| def success_is_bad(_results): |
| assert False, 'This test should throw a failure/error' |
| |
| def fail_fast(_failure): |
| pass |
| return None |
| |
| d.addCallbacks(success_is_bad, fail_fast) |
| return d |
| |
| def test_message_send_get_with_latency(self): |
| # Various tests of sending an OMCI message and it either |
| # getting a response or send catching some errors of |
| # importance |
| self.setup_one_of_each() |
| self.olt_handler.latency = 0.500 # 1/2 second |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| |
| # Successful MIB Reset |
| d = omci_cc.send_mib_reset(timeout=1.0) |
| |
| d.addCallbacks(self._is_omci_frame, self._default_errback) |
| d.addCallback(self._check_status, RC.Success) |
| |
| def check_latency_values(_): |
| self.assertGreaterEqual(omci_cc.reply_min, self.olt_handler.latency) |
| self.assertGreaterEqual(omci_cc.reply_max, self.olt_handler.latency) |
| self.assertGreaterEqual(omci_cc.reply_average, self.olt_handler.latency) |
| |
| d.addCallback(check_latency_values) |
| return d |
| |
| def test_message_failures(self): |
| # Various tests of sending an OMCI message and it fails |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| self.assertEqual(omci_cc.tx_frames, 0) |
| self.assertEqual(omci_cc.rx_frames, 0) |
| self.assertEqual(omci_cc.rx_unknown_tid, 0) |
| self.assertEqual(omci_cc.rx_timeouts, 0) |
| self.assertEqual(omci_cc.rx_late, 0) |
| self.assertEqual(omci_cc.tx_errors, 0) |
| |
| # # Class ID not found |
| # d = omci_cc.send_mib_reset(timeout=1.0) |
| # self.assertTrue(True) # TODO: Implement |
| # todo: Test non-zero consecutive errors |
| # |
| # # Instance ID not found |
| # d = omci_cc.send_mib_reset(timeout=1.0) |
| # self.assertTrue(True) # TODO: Implement |
| # todo: Test non-zero consecutive errors |
| # |
| # # PON is disabled |
| # d = omci_cc.send_mib_reset(timeout=1.0) |
| # self.assertTrue(True) # TODO: Implement |
| # todo: Test non-zero consecutive errors |
| # |
| # # ONU is disabled |
| # d = omci_cc.send_mib_reset(timeout=1.0) |
| # self.assertTrue(True) # TODO: Implement |
| # todo: Test non-zero consecutive errors |
| # |
| # # ONU is not activated |
| # d = omci_cc.send_mib_reset(timeout=1.0) |
| # self.assertTrue(True) # TODO: Implement |
| # todo: Test non-zero consecutive errors |
| |
| # TODO: make OLT send back an unknown TID ( |
| |
| # todo: Test non-zero consecutive errors |
| # todo: Send a good frame |
| # todo: Test zero consecutive errors |
| # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors) |
| |
| def test_rx_unknown_me(self): |
| # ME without a known decoder |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # This is the ID ------+ |
| # v |
| msg = '00fc2e0a00020000ff780000e00000010000000c' \ |
| '0000000000000000000000000000000000000000' \ |
| '00000028' |
| |
| omci_cc.receive_message(hex2raw(msg)) |
| |
| # Note: After successful frame decode, a lookup of the corresponding request by |
| # TID is performed. None should be found, so we should see the Rx Unknown TID |
| # increment. |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames']) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'] + 1) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'] + 1) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames']) |
| self.assertEqual(omci_cc.consecutive_errors, 0) |
| |
| def test_rx_decode_unknown_me(self): |
| # ME without a known decoder |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # This is a MIB Upload Next Response. Where we would probably first see an |
| # unknown Class ID |
| # |
| # This is the ID ------+ |
| # v |
| msg = '00fc2e0a00020000ff780001e000' |
| blob = '00010000000c0000000000000000000000000000000000000000' |
| msg += blob + '00000028' |
| |
| # Dig into the internal method so we can get the returned frame |
| frame = omci_cc._decode_unknown_me(hex2raw(msg)) |
| |
| self.assertEqual(frame.fields['transaction_id'], 0x00fc) |
| self.assertEqual(frame.fields['message_type'], 0x2e) |
| |
| omci_fields = frame.fields['omci_message'].fields |
| |
| self.assertEqual(omci_fields['entity_class'], 0x0002) |
| self.assertEqual(omci_fields['entity_id'], 0x00) |
| self.assertEqual(omci_fields['object_entity_class'], 0x0ff78) |
| self.assertEqual(omci_fields['object_entity_id'], 0x01) |
| self.assertEqual(omci_fields['object_attributes_mask'], 0xe000) |
| |
| data_fields = omci_fields['object_data'] |
| |
| decoded_blob = data_fields.get(UNKNOWN_CLASS_ATTRIBUTE_KEY) |
| self.assertIsNotNone(decoded_blob) |
| self.assertEqual(decoded_blob, blob) |
| |
| def test_flush(self): |
| # Test flush of autonomous ONU queues |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # TODO: add more |
| self.assertTrue(True) # TODO: Implement |
| |
| def test_avc_rx(self): |
| # Test flush of autonomous ONU queues |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # TODO: add more |
| self.assertTrue(True) # TODO: Implement |
| |
| def test_rx_discard_if_disabled(self): |
| # ME without a known decoder |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = False |
| snapshot = self._snapshot_stats() |
| |
| msg = '00fc2e0a00020000ff780000e00000010000000c' \ |
| '0000000000000000000000000000000000000000' \ |
| '00000028105a86ef' |
| |
| omci_cc.receive_message(hex2raw(msg)) |
| |
| # Note: No counter increments |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames']) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me']) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid']) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames']) |
| |
| def test_omci_alarm_decode(self): |
| """ |
| This test covers an issue discovered in Sept 2018 (JIRA-1213). It was |
| an exception during frame decode. |
| """ |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # Frame from the JIRA issue |
| msg = '0000100a000b0102800000000000000000000000' \ |
| '0000000000000000000000000000000000000015' \ |
| '000000282d3ae0a6' |
| |
| _results = omci_cc.receive_message(hex2raw(msg)) |
| |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames']) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me']) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid']) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'] + 1) |
| self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards']) |
| |
| def test_omci_avc_decode(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # Frame from the JIRA issue |
| msg = '0000110a0007000080004d4c2d33363236000000' \ |
| '0000000020202020202020202020202020202020' \ |
| '00000028' |
| |
| _results = omci_cc.receive_message(hex2raw(msg)) |
| |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames']) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me']) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid']) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'] + 1) |
| self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards']) |
| |
| def test_omci_unknown_onu_decode(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # Frame from the JIRA issue |
| msg = '0000190a0007000080004d4c2d33363236000000' \ |
| '0000000020202020202020202020202020202020' \ |
| '00000028' |
| |
| _results = omci_cc.receive_message(hex2raw(msg)) |
| |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames']) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me']) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid']) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'] + 1) |
| self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards'] + 1) |
| |
| def test_omci_bad_frame_decode(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # Frame from the JIRA issue |
| msg = '0020190a0007000080004d4c2d33363236000000' \ |
| '0000000000000028' |
| |
| _results = omci_cc.receive_message(hex2raw(msg)) |
| # NOTE: Currently do not increment any Rx Discard counters, just throw it away |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'] + 1) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me']) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'] + 1) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames']) |
| self.assertEqual(omci_cc.rx_onu_discards, snapshot['rx_onu_discards']) |
| |
| def test_rx_decode_onu_g(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| msg = '001e2e0a0002000001000000e000424657530000' \ |
| '0000000000000000000000324246575300107496' \ |
| '00000028e7fb4a91' |
| |
| omci_cc.receive_message(hex2raw(msg)) |
| |
| # Note: No counter increments |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'] + 1) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me']) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'] + 1) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames']) |
| |
| def test_rx_decode_extvlantagging(self): |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| msg = '030a290a00ab0201000d00000000001031323334' \ |
| '3536373839303132333435363738393031323334' \ |
| '000000281166d283' |
| |
| omci_cc.receive_message(hex2raw(msg)) |
| |
| self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'] + 1) |
| self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me']) |
| self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'] + 1) |
| self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames']) |
| |
| def _check_vlan_tag_op(self, results, attr, expected): |
| omci_msg = results.fields['omci_message'] |
| data = omci_msg.fields['data'] |
| val = data[attr] |
| self.assertEqual(expected, val) |
| return results |
| |
| @skip('for unknown omci failure') |
| #@deferred() |
| def test_rx_table_get_extvlantagging(self): |
| self.setup_one_of_each() |
| |
| onu = self.onu_handler.onu_mock |
| entity_id = 1 |
| vlan_tag_op1 = VlanTaggingOperation( |
| filter_outer_priority=15, |
| filter_outer_vid=4096, |
| filter_outer_tpid_de=2, |
| filter_inner_priority=15, |
| filter_inner_vid=4096, |
| filter_inner_tpid_de=0, |
| filter_ether_type=0, |
| treatment_tags_to_remove=0, |
| treatment_outer_priority=15, |
| treatment_outer_vid=1234, |
| treatment_outer_tpid_de=0, |
| treatment_inner_priority=0, |
| treatment_inner_vid=4091, |
| treatment_inner_tpid_de=4, |
| ) |
| vlan_tag_op2 = VlanTaggingOperation( |
| filter_outer_priority=14, |
| filter_outer_vid=1234, |
| filter_outer_tpid_de=5, |
| filter_inner_priority=1, |
| filter_inner_vid=2345, |
| filter_inner_tpid_de=1, |
| filter_ether_type=0, |
| treatment_tags_to_remove=1, |
| treatment_outer_priority=15, |
| treatment_outer_vid=2222, |
| treatment_outer_tpid_de=1, |
| treatment_inner_priority=1, |
| treatment_inner_vid=3333, |
| treatment_inner_tpid_de=5, |
| ) |
| vlan_tag_op3 = VlanTaggingOperation( |
| filter_outer_priority=13, |
| filter_outer_vid=55, |
| filter_outer_tpid_de=1, |
| filter_inner_priority=7, |
| filter_inner_vid=4567, |
| filter_inner_tpid_de=1, |
| filter_ether_type=0, |
| treatment_tags_to_remove=1, |
| treatment_outer_priority=2, |
| treatment_outer_vid=1111, |
| treatment_outer_tpid_de=1, |
| treatment_inner_priority=1, |
| treatment_inner_vid=3131, |
| treatment_inner_tpid_de=5, |
| ) |
| tbl = [vlan_tag_op1, vlan_tag_op2, vlan_tag_op3] |
| tblstr = str(vlan_tag_op1) + str(vlan_tag_op2) + str(vlan_tag_op3) |
| |
| onu._omci_response[OP.Get.value][ExtendedVlanTaggingOperationConfigurationData.class_id] = { |
| entity_id: OmciFrame(transaction_id=0, |
| message_type=OmciGetResponse.message_id, |
| omci_message=OmciGetResponse( |
| entity_class=ExtendedVlanTaggingOperationConfigurationData.class_id, |
| entity_id=1, |
| success_code=RC.Success.value, |
| attributes_mask=ExtendedVlanTaggingOperationConfigurationData.mask_for( |
| 'received_frame_vlan_tagging_operation_table'), |
| data={'received_frame_vlan_tagging_operation_table': 16 * len(tbl)} |
| )) |
| } |
| |
| rsp1 = binascii.a2b_hex(hexify(tblstr[0:OmciTableField.PDU_SIZE])) |
| rsp2 = binascii.a2b_hex(hexify(tblstr[OmciTableField.PDU_SIZE:])) |
| onu._omci_response[OP.GetNext.value][ExtendedVlanTaggingOperationConfigurationData.class_id] = { |
| entity_id: {0: {'failures':2, |
| 'frame':OmciFrame(transaction_id=0, |
| message_type=OmciGetNextResponse.message_id, |
| omci_message=OmciGetNextResponse( |
| entity_class=ExtendedVlanTaggingOperationConfigurationData.class_id, |
| entity_id=1, |
| success_code=RC.Success.value, |
| attributes_mask=ExtendedVlanTaggingOperationConfigurationData.mask_for( |
| 'received_frame_vlan_tagging_operation_table'), |
| data={'received_frame_vlan_tagging_operation_table': rsp1 |
| } |
| ))}, |
| 1: OmciFrame(transaction_id=0, |
| message_type=OmciGetNextResponse.message_id, |
| omci_message=OmciGetNextResponse( |
| entity_class=ExtendedVlanTaggingOperationConfigurationData.class_id, |
| entity_id=1, |
| success_code=RC.Success.value, |
| attributes_mask=ExtendedVlanTaggingOperationConfigurationData.mask_for( |
| 'received_frame_vlan_tagging_operation_table'), |
| data={'received_frame_vlan_tagging_operation_table': rsp2 |
| } |
| )) |
| } |
| } |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| |
| msg = ExtendedVlanTaggingOperationConfigurationDataFrame( |
| entity_id, |
| attributes={'received_frame_vlan_tagging_operation_table':True} |
| ) |
| |
| snapshot = self._snapshot_stats() |
| |
| frame = msg.get() |
| d = omci_cc.send(frame, timeout=5.0) |
| |
| d.addCallbacks(self._is_omci_frame, self._default_errback, [OmciGetResponse]) |
| d.addCallback(self._check_status, RC.Success) |
| |
| d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 5) |
| d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 3) |
| d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid']) |
| d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames']) |
| d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards']) |
| d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me']) |
| d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'] + 2) |
| d.addCallback(self._check_stats, snapshot, 'rx_late', snapshot['rx_late']) |
| d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors']) |
| d.addCallback(self._check_stats, snapshot, 'consecutive_errors', 0) |
| d.addCallback(self._check_vlan_tag_op, 'received_frame_vlan_tagging_operation_table', tbl) |
| |
| return d |
| |
| ################################################################## |
| # Start of tests specific to new stop_and_wait changes |
| # |
| def test_message_send_low_priority(self): |
| # self.setup_one_of_each(timeout_messages=True) |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # MIB Upload |
| d = omci_cc.send_mib_upload(timeout=1.0, high_priority=False) |
| d.addCallback(self._check_stats, snapshot, 'lp_tx_queue_len', snapshot['lp_tx_queue_len']) |
| d.addCallback(self._check_stats, snapshot, 'hp_tx_queue_len', snapshot['hp_tx_queue_len']) |
| d.addCallback(self._check_stats, snapshot, 'max_lp_tx_queue', snapshot['max_lp_tx_queue'] + 1) |
| d.addCallback(self._check_stats, snapshot, 'max_hp_tx_queue', snapshot['max_hp_tx_queue']) |
| |
| # Flush to get ready for next test (one frame queued) |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| self.adapter_agent.timeout_the_message = True |
| omci_cc.send_mib_upload(timeout=1.0, high_priority=False) |
| omci_cc.send_mib_upload(timeout=1.0, high_priority=False) |
| |
| self.assertEqual(omci_cc.lp_tx_queue_len, 1) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.max_lp_tx_queue, 1) |
| self.assertEqual(omci_cc.max_hp_tx_queue, 0) |
| |
| # Flush to get ready for next test (two queued and new max) |
| omci_cc.flush() |
| omci_cc.send_mib_upload(timeout=1.0, high_priority=False) |
| omci_cc.send_mib_upload(timeout=1.0, high_priority=False) |
| omci_cc.send_mib_upload(timeout=1.0, high_priority=False) |
| |
| self.assertEqual(omci_cc.lp_tx_queue_len, 2) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.max_lp_tx_queue, 2) |
| self.assertEqual(omci_cc.max_hp_tx_queue, 0) |
| |
| def test_message_send_high_priority(self): |
| # self.setup_one_of_each(timeout_messages=True) |
| self.setup_one_of_each() |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| snapshot = self._snapshot_stats() |
| |
| # MIB Upload |
| d = omci_cc.send_mib_upload(high_priority=True) |
| d.addCallback(self._check_stats, snapshot, 'lp_tx_queue_len', snapshot['lp_tx_queue_len']) |
| d.addCallback(self._check_stats, snapshot, 'hp_tx_queue_len', snapshot['hp_tx_queue_len']) |
| d.addCallback(self._check_stats, snapshot, 'max_lp_tx_queue', snapshot['max_lp_tx_queue']) |
| d.addCallback(self._check_stats, snapshot, 'max_hp_tx_queue', snapshot['max_hp_tx_queue'] + 1) |
| |
| # Flush to get ready for next test (one frame queued) |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| self.adapter_agent.timeout_the_message = True |
| omci_cc.send_mib_upload(high_priority=True) |
| omci_cc.send_mib_upload(high_priority=True) |
| |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 1) |
| self.assertEqual(omci_cc.max_lp_tx_queue, 0) |
| self.assertEqual(omci_cc.max_hp_tx_queue, 1) |
| |
| # Flush to get ready for next test (two queued and new max) |
| omci_cc.flush() |
| omci_cc.send_mib_upload(high_priority=True) |
| omci_cc.send_mib_upload(high_priority=True) |
| omci_cc.send_mib_upload(high_priority=True) |
| |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 2) |
| self.assertEqual(omci_cc.max_lp_tx_queue, 0) |
| self.assertEqual(omci_cc.max_hp_tx_queue, 2) |
| |
| def test_message_send_and_cancel(self): |
| global error_reason |
| global successful |
| # Do not send messages to adapter_agent |
| self.setup_one_of_each(timeout_messages=True) |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| |
| def success(_results): |
| global successful |
| successful = True |
| |
| def failure(reason): |
| global error_reason |
| error_reason = reason |
| |
| def notCalled(reason): |
| assert isinstance(reason, Failure), 'Should not be called with success' |
| |
| # Cancel one that is actively being sent |
| d = omci_cc.send_mib_upload(high_priority=False) |
| d.addCallbacks(success, failure) |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| d.cancel() |
| self.assertIsInstance(error_reason, Failure) |
| self.assertFalse(successful) |
| self.assertTrue(d.called) |
| |
| self.assertEqual(omci_cc.max_lp_tx_queue, 1) |
| self.assertEqual(omci_cc.max_hp_tx_queue, 0) |
| |
| # Flush to get ready for next test (one running, one queued, cancel the |
| # running one, so queued runs) |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| error_reason = None |
| d1 = omci_cc.send_mib_upload(high_priority=False) |
| d2 = omci_cc.send_mib_upload(high_priority=False) |
| d1.addCallbacks(success, failure) |
| d2.addCallbacks(notCalled, notCalled) |
| self.assertEqual(omci_cc.lp_tx_queue_len, 1) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| d1.cancel() |
| self.assertIsInstance(error_reason, Failure) |
| self.assertFalse(successful) |
| self.assertTrue(d1.called) |
| self.assertFalse(d2.called) |
| |
| self.assertEqual(omci_cc.max_lp_tx_queue, 1) |
| self.assertEqual(omci_cc.max_hp_tx_queue, 0) |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| # Flush to get ready for next test (one running, one queued, cancel the queued one) |
| |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| error_reason = None |
| d3 = omci_cc.send_mib_upload(timeout=55, high_priority=False) |
| d4 = omci_cc.send_mib_upload(timeout=55, high_priority=False) |
| d5 = omci_cc.send_mib_upload(timeout=55, high_priority=False) |
| d3.addCallbacks(notCalled, notCalled) |
| d4.addCallbacks(success, failure) |
| d5.addCallbacks(notCalled, notCalled) |
| self.assertEqual(omci_cc.lp_tx_queue_len, 2) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| d4.cancel() |
| self.assertIsInstance(error_reason, Failure) |
| self.assertFalse(successful) |
| self.assertFalse(d3.called) |
| self.assertTrue(d4.called) |
| self.assertFalse(d5.called) |
| |
| def test_message_send_low_and_high_priority(self): |
| self.setup_one_of_each(timeout_messages=True) |
| |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| |
| omci_cc.send_mib_reset(high_priority=False) |
| omci_cc.send_mib_reset(high_priority=True) |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| omci_cc.send_mib_reset(high_priority=False) |
| omci_cc.send_mib_reset(high_priority=True) |
| omci_cc.send_mib_reset(high_priority=False) |
| omci_cc.send_mib_reset(high_priority=True) |
| self.assertEqual(omci_cc.lp_tx_queue_len, 1) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 1) |
| |
| def test_no_sw_download_and_mib_upload_at_same_time(self): |
| # Section B.2.3 of ITU G.988-2017 specifies that a MIB |
| # upload or software download at a given priority level |
| # is not allowed while a similar action in the other |
| # priority level is in progress. Relates to possible memory |
| # consumption/needs on the ONU. |
| # |
| # OMCI_CC only checks if the commands are currently in |
| # progress. ONU should reject messages if the upload/download |
| # is in progress (but not an active request is in progress). |
| |
| self.setup_one_of_each(timeout_messages=True) |
| omci_cc = self.onu_handler.omci_cc |
| omci_cc.enabled = True |
| |
| mib_upload_msgs = [omci_cc.send_mib_upload, |
| # omci_cc.send_mib_upload_next |
| ] |
| sw_download_msgs = [omci_cc.send_start_software_download, |
| # omci_cc.send_download_section, |
| # omci_cc.send_end_software_download |
| ] |
| |
| for upload in mib_upload_msgs: |
| for download in sw_download_msgs: |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| upload(high_priority=False) |
| download(1, 1, 1, high_priority=True) # Should stall send-next 50mS |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 1) |
| |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| upload(high_priority=True) |
| download(1, 1, 1, high_priority=False) # Should stall send-next 50mS |
| self.assertEqual(omci_cc.lp_tx_queue_len, 1) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| download(1, 1, 1, high_priority=False) |
| upload(high_priority=True) # Should stall send-next 50mS |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 1) |
| |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| download(1, 1, 1, high_priority=True) |
| upload(high_priority=False) # Should stall send-next 50mS) |
| self.assertEqual(omci_cc.lp_tx_queue_len, 1) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| omci_cc.flush() |
| self.assertEqual(omci_cc.lp_tx_queue_len, 0) |
| self.assertEqual(omci_cc.hp_tx_queue_len, 0) |
| |
| # Some more ideas for tests that we could add |
| # Send explicit tid that is not valid |
| # - Look at top of 'Send' method and test all the error conditions could may hit |
| |
| # Send multiple and have the OLT proxy throw an exception. Should call errback and |
| # schedule remainder in queue to still tx. |
| |
| # Send a frame and then inject a response and test the RX logic out, including late |
| # rx and retries by the OMCI_CC transmitter. |
| |
| |
| if __name__ == '__main__': |
| main() |