VOL-635: Initial unit tests for OpenOMCI
Change-Id: Iecc0f08ba89dd46736d8e5d2aff177566f60e41b
diff --git a/tests/utests/voltha/extensions/omci/mock/__init__.py b/tests/utests/voltha/extensions/omci/mock/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/mock/__init__.py
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py b/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py
new file mode 100644
index 0000000..cfc1ab4
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/mock/mock_adapter_agent.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+class MockProxyAddress(object):
+ def __init__(self, device_id, pon_id, onu_id):
+ self.device_id = device_id # Device ID of proxy (OLT)
+ self.onu_id = onu_id
+ self.onu_session_id = onu_id
+
+ self.channel_group_id = pon_id # close enough for mock
+ self.channel_id = pon_id
+ self.channel_termination = pon_id
+
+
+class MockDevice(object):
+ def __init__(self, device_id, proxy_address=None, serial_number=None):
+ self.id = device_id
+ self.parent_id = None
+ self.proxy_address = proxy_address
+ self.serial_number = serial_number
+
+
+class MockAdapterAgent(object):
+ """
+ Minimal class to handle adapter-agent needs in OpenOMCI. It can be
+ used by a mock OLT or ONU.
+
+ So that we do not have to duplicate the IAdapter functionality, just
+ the handler, the OLT and ONU handlers are derived from a mock Device
+ base class so that we can access the _devices map and get either a
+ Device to play with (like the real thing) or the associated handler
+ """
+ def __init__(self):
+ self._devices = dict() # device-id -> mock device
+ pass
+
+ def tearDown(self):
+ """Test case cleanup"""
+ for device in self._devices.itervalues():
+ device.tearDown()
+ self._devices.clear()
+
+ def add_device(self, device):
+ self._devices[device.id] = device
+
+ def add_child_device(self, parent_device, child_device):
+ # Set parent
+ child_device.parent_id = parent_device.id
+
+ # Add ONU serial number if PON and ONU enabled
+
+ if (child_device.enabled and
+ child_device.serial_number is not None and
+ child_device.proxy_address.channel_id in parent_device.enabled_pons):
+ parent_device.activated_onus.add(child_device.serial_number)
+
+ self.add_device(child_device)
+
+ def get_device(self, device_id):
+ return self._devices[device_id]
+
+ def get_child_device(self, parent_device_id, **kwargs):
+ onu_id = kwargs.pop('onu_id', None)
+ pon_id = kwargs.pop('pon_id', None)
+ if onu_id is None and pon_id is None:
+ return None
+
+ # Get all child devices with the same parent ID
+ children_ids = set(d.id for d in self._devices.itervalues()
+ if d.parent_id == parent_device_id)
+
+ # Loop through all the child devices with this parent ID
+ for child_id in children_ids:
+ device = self.get_device(child_id)
+
+ # Does this child device match the passed in ONU ID?
+ found_onu_id = False
+ if onu_id is not None:
+ if device.proxy_address.onu_id == onu_id:
+ found_onu_id = True
+
+ # Does this child device match the passed in SERIAL NUMBER?
+ found_pon_id = False
+ if pon_id is not None:
+ if device.proxy_address.channel_id == pon_id:
+ found_pon_id = True
+ # Match ONU ID and PON ID
+ if onu_id is not None and pon_id is not None:
+ found = found_onu_id & found_pon_id
+ # Otherwise ONU ID or PON ID
+ else:
+ found = found_onu_id | found_pon_id
+
+ # Return the matched child device
+ if found:
+ return device
+
+ return None
+
+ def send_proxied_message(self, proxy_address, msg):
+ # Look up ONU handler and forward the message
+
+ olt_handler = self.get_device(proxy_address.device_id)
+
+ if olt_handler is not None:
+ olt_handler.send_proxied_message(proxy_address, msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ # Look up ONU handler and forward the message
+
+ onu_handler = self.get_child_device(proxy_address.device_id,
+ onu_id=proxy_address.onu_id,
+ pon_id=proxy_address.channel_id)
+ if onu_handler is not None:
+ onu_handler.receive_proxied_message(proxy_address, msg)
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_olt_handler.py b/tests/utests/voltha/extensions/omci/mock/mock_olt_handler.py
new file mode 100644
index 0000000..f5075b1
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/mock/mock_olt_handler.py
@@ -0,0 +1,107 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from mock_adapter_agent import MockDevice
+from nose.twistedtools import reactor
+
+
+class MockOltHandler(MockDevice):
+ """
+ VERY Minimal class to handle OLT needs in OpenOMCI testing
+
+ So that we do not have to duplicate the IAdapter functionality, just
+ the handler, the OLT and ONU handlers are derived from a mock Device
+ base class so that we can access the _devices map and get either a
+ Device to play with (like the real thing) or the associated handler
+ """
+ def __init__(self, adapter_agent, device_id):
+ super(MockOltHandler, self).__init__(device_id)
+
+ self.device_id = device_id
+ self.device = self
+ self._adapter_agent = adapter_agent
+ self._num_tx = 0
+
+ ####################################################################
+ # NOTE: The following can be manipulated in your test case to modify the behaviour
+ # of this mock.
+ #
+ # Note that activated ONUs are added during adapter add_child_device
+ # if the ONU handler associated is 'enabled'
+
+ self.enabled = True # OLT is enabled/active
+ self.activated_onus = set() # Activated ONU serial numbers
+ self.enabled_pons = range(0, 16) # Enabled PONs
+ self.max_tx = sys.maxint # Fail after this many tx requests
+ self.latency = 0.0 # OMCI response latency (keep small)
+
+ # TODO: Implement minimal functionality
+
+ # TODO: Implement minimal functionality
+
+ def tearDown(self):
+ """Test case cleanup"""
+ pass
+
+ # Begin minimal set of needed IAdapter interfaces
+
+ def send_proxied_message(self, proxy_address, msg):
+ """Check various enabled flags and status and send if okay"""
+
+ if not self.enabled:
+ return None
+
+ pon_id = proxy_address.channel_id
+
+ if pon_id not in self.enabled_pons:
+ return None
+
+ # Look up ONU device ID.
+ onu_id = proxy_address.onu_id
+ onu_handler = self._adapter_agent.get_child_device(proxy_address.device_id,
+ pon_id=pon_id,
+ onu_id=onu_id)
+
+ if onu_handler is None or not onu_handler.enabled:
+ return None
+
+ onu_mock = onu_handler.onu_mock
+ if onu_mock is None or onu_mock.serial_number not in self.activated_onus:
+ return None
+
+ # And Tx success (silent discard for OMCI timeout testing)
+ if self._num_tx >= self.max_tx:
+ return None
+ self._num_tx += 1
+
+ response = onu_mock.rx_omci_frame(msg)
+
+ # Make async and add any requested latency. Bound it to less
+ # than 5 seconds since this is a unit test that need to be
+ # somewhat responsive
+
+ assert 0.0 <= self.latency <= 5, 'Best practice is latency <= 5 seconds'
+ reactor.callLater(self.latency, self._deliver_proxy_message, proxy_address, response)
+
+ def _deliver_proxy_message(self, proxy_address, response):
+ from common.frameio.frameio import hexify
+ self._adapter_agent.receive_proxied_message(proxy_address,
+ hexify(str(response)))
+
+ def receive_proxied_message(self, _, __):
+ assert False, 'This is never called on the OLT side of proxy messaging'
+
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_onu.py b/tests/utests/voltha/extensions/omci/mock/mock_onu.py
new file mode 100644
index 0000000..b662d49
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/mock/mock_onu.py
@@ -0,0 +1,258 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_defs import *
+from voltha.extensions.omci.omci_entities import *
+from voltha.extensions.omci.omci_messages import *
+
+# abbreviations
+OP = EntityOperations
+RC = ReasonCodes
+
+
+class MockOnu(object):
+ """
+ Minimal class that acts line an ONU. The Mock OLT handler will call into this
+ object with OMCI frames that it will respond to appropriately
+ """
+ def __init__(self, serial_number, adapter_agent, handler_id):
+ self.serial_number = serial_number
+ self._adapter_agent = adapter_agent # TODO: Remove any unused attributes
+ self._handler_id = handler_id
+ self.mib_data_sync = 0 # Assume at reboot!
+
+ # NOTE: when creating response frames, use the basic method of constructing
+ # these frames as the encoding created is unit-tested elsewhere
+ self._omci_response = {
+ OP.Get.value: {
+ CircuitPack.class_id: {
+ 257: OmciFrame(transaction_id=0, # Will get replaced
+ message_type=OmciGetResponse.message_id,
+ omci_message=OmciGetResponse(
+ entity_class=CircuitPack.class_id,
+ entity_id=0,
+ success_code=RC.Success.value,
+ attributes_mask=CircuitPack.mask_for('number_of_ports'),
+ data=OmciMaskedData('value',
+ entity_class=CircuitPack.class_id,
+ attributes_mask=CircuitPack.mask_for('number_of_ports'))
+ ))
+ },
+ # Additional OMCI GET request responses here if needed
+ },
+ OP.Create.value: {
+ # TODO: Create some OMCI CREATE request responses here.
+
+ # def send_create_gal_ethernet_profile(self,
+ # entity_id,
+ # max_gem_payload_size):
+ # frame = OmciFrame(
+ # transaction_id=self.get_tx_id(),
+ # message_type=OmciCreate.message_id,
+ # omci_message=OmciCreate(
+ # entity_class=GalEthernetProfile.class_id,
+ # entity_id=entity_id,
+ # data=dict(
+ # max_gem_payload_size=max_gem_payload_size
+ # )
+ # )
+ # )
+ # self.send_omci_message(frame)
+ },
+ OP.Set.value: {
+ # TODO: Create some OMCI SET request responses here.
+
+ # def send_set_admin_state(self,
+ # entity_id,
+ # admin_state):
+ # data = dict(
+ # administrative_state=admin_state
+ # )
+ # frame = OmciFrame(
+ # transaction_id=self.get_tx_id(),
+ # message_type=OmciSet.message_id,
+ # omci_message=OmciSet(
+ # entity_class=OntG.class_id,
+ # entity_id=entity_id,
+ # attributes_mask=OntG.mask_for(*data.keys()),
+ # data=data
+ # )
+ # )
+ # self.send_omci_message(frame)
+
+ },
+ OP.Delete.value: {
+ # TODO: Create some OMCI DELETE responses here.
+ },
+ OP.MibReset.value: {
+ OntData.class_id: {
+ 0: OmciFrame(transaction_id=0, # Will get replaced
+ message_type=OmciMibResetResponse.message_id,
+ omci_message=OmciMibResetResponse(
+ entity_class=OntData.class_id,
+ entity_id=0,
+ success_code=RC.Success.value
+ ))
+ }
+ },
+ OP.MibUpload.value: {
+ OntData.class_id: {
+ 0: OmciFrame(transaction_id=0, # Will get replaced
+ message_type=OmciMibUploadResponse.message_id,
+ omci_message=OmciMibUploadResponse(
+ entity_class=OntData.class_id,
+ entity_id=0,
+ number_of_commands=3 # Should match list size for MibUploadNext below
+ ))
+ }
+ },
+ # OP.MibUploadNext.value: {
+ # OntData.class_id: {
+ # 0: [
+ # OmciFrame(transaction_id=0,
+ # message_type=OmciMibUploadNextResponse.message_id,
+ # omci_message=OmciMibUploadNextResponse(
+ # entity_class=OntData.class_id,
+ # entity_id=0,
+ # object_entity_id=0, # TODO: Pick one
+ # object_attributes_mask=0, # TODO: Pick one
+ # object_data=None # TODO: Pick one
+ # )),
+ # OmciFrame(transaction_id=0,
+ # message_type=OmciMibUploadNextResponse.message_id,
+ # omci_message=OmciMibUploadNextResponse(
+ # entity_class=OntData.class_id,
+ # entity_id=0,
+ # object_entity_id=0, # TODO: Pick one
+ # object_attributes_mask=0, # TODO: Pick one
+ # object_data=None # TODO: Pick one
+ # )),
+ # OmciFrame(transaction_id=0,
+ # message_type=OmciMibUploadNextResponse.message_id,
+ # omci_message=OmciMibUploadNextResponse(
+ # entity_class=OntData.class_id,
+ # entity_id=0,
+ # object_entity_id=0, # TODO: Pick one
+ # object_attributes_mask=0, # TODO: Pick one
+ # object_data=None # TODO: Pick one
+ # )),
+ # ]
+ # }
+ # },
+ OP.Reboot.value: {
+ OntData.class_id: {
+ 0: OmciFrame(transaction_id=0, # Will get replaced
+ message_type=OmciRebootResponse.message_id,
+ omci_message=OmciRebootResponse(
+ entity_class=OntG.class_id,
+ entity_id=0,
+ success_code=RC.Success.value
+ ))
+ }
+ },
+ }
+ # TODO: Support Autonomous ONU messages as well
+
+ def tearDown(self):
+ """Test case cleanup"""
+ pass
+
+ def _request_to_response_type(self, message_type):
+ return {
+ OP.Create.value: OmciCreateResponse,
+ OP.Delete.value: OmciDeleteResponse,
+ OP.Set.value: OmciSetResponse,
+ OP.Get.value: OmciGetResponse,
+ OP.MibUpload.value: OmciMibUploadResponse,
+ OP.MibUploadNext.value: OmciMibUploadNextResponse,
+ OP.MibReset.value: OmciMibResetResponse,
+ OP.Reboot.value: OmciRebootResponse,
+ }.get(message_type & 0x1F, None)
+
+ def rx_omci_frame(self, msg):
+ try:
+ frame = OmciFrame(msg.decode('hex'))
+ response = None
+ response_type = self._request_to_response_type(frame.fields['message_type'])
+ transaction_id = frame.fields['transaction_id']
+
+ omci_message = frame.fields.get('omci_message')
+
+ class_id = omci_message.fields.get('entity_class') \
+ if omci_message is not None else None
+ instance_id = omci_message.fields.get('entity_id') \
+ if omci_message is not None else None
+
+ # Look up hardcode responses based on class and instance ID. If found
+ # return the response, otherwise send back an error
+
+ if response_type is None:
+ status = RC.ProcessingError.value
+ elif class_id is None:
+ status = RC.UnknownEntity.value
+ elif instance_id is None:
+ status = RC.UnknownInstance.value
+ else:
+ status = RC.Success.value
+ try:
+ response_id = response_type.message_id & 0x1f
+ response = self._omci_response[response_id][class_id][instance_id]
+
+ if response_id == OP.MibUploadNext.value:
+ # Special case. Need to get requested entry
+ assert isinstance(response, list)
+ pass
+ pass
+ pass
+ pass
+
+ response.fields['transaction_id'] = transaction_id
+ if 'success_code' in response.fields['omci_message'].fields:
+ response.fields['omci_message'].fields['success_code'] = status
+
+ if status == RC.Success.value:
+ if response_type.message_id in [OmciCreateResponse.message_id,
+ OmciDeleteResponse.message_id,
+ OmciSetResponse.message_id]:
+ self.mib_data_sync += 1
+ if self.mib_data_sync > 255:
+ self.mib_data_sync = 1
+ elif response_type.message_id == OmciMibResetResponse.message_id:
+ self.mib_data_sync = 0
+
+ except KeyError as e:
+ bad_key = e.args[0]
+ if bad_key == class_id:
+ status = RC.UnknownEntity.value
+ elif bad_key == instance_id:
+ status = RC.UnknownInstance.value
+ else:
+ status = RC.ProcessingError.value
+
+ if status != RC.Success.value and \
+ response_type not in [OmciMibUploadResponse,
+ OmciMibUploadNextResponse]:
+ response = OmciFrame(transaction_id=transaction_id,
+ message_type=response_type.message_id,
+ omci_message=response_type(
+ entity_class=class_id,
+ entity_id=instance_id,
+ success_code=status
+ ))
+ return response
+
+ except Exception as e:
+ pass
diff --git a/tests/utests/voltha/extensions/omci/mock/mock_onu_handler.py b/tests/utests/voltha/extensions/omci/mock/mock_onu_handler.py
new file mode 100644
index 0000000..243bb6e
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/mock/mock_onu_handler.py
@@ -0,0 +1,78 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from mock_adapter_agent import MockProxyAddress, MockDevice
+from voltha.extensions.omci.omci_cc import *
+
+
+class MockOnuHandler(MockDevice):
+ """
+ Minimal class to handle ONU needs in OpenOMCI testing
+
+ So that we do not have to duplicate the IAdapter functionality, just
+ the handler, the OLT and ONU handlers are derived from a mock Device
+ base class so that we can access the _devices map and get either a
+ Device to play with (like the real thing) or the associated handler
+ """
+ def __init__(self, adapter_agent, parent_id, device_id, pon_id, onu_id):
+
+ self.proxy_address = MockProxyAddress(parent_id, pon_id, onu_id)
+ super(MockOnuHandler, self).__init__(device_id, self.proxy_address)
+
+ self.device_id = device_id
+ self.device = self
+ self._adapter_agent = adapter_agent
+
+ self.onu_mock = None
+ self.omci_cc = OMCI_CC(adapter_agent, device_id)
+
+ # Items that you can change to perform various test failures
+
+ self._enabled = True
+
+ def tearDown(self):
+ """Test case cleanup"""
+ if self.onu_mock is not None:
+ self.onu_mock.tearDown()
+ self.onu_mock = None
+
+ @property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ if self._enabled != value:
+ self._enabled = value
+ olt = self._adapter_agent.get_device(self.proxy_address.device_id)
+ if olt is not None and self.proxy_address.channel_id in olt.enabled_pons:
+ if self._enabled:
+ olt.activated_onus.add(self.serial_number)
+ else:
+ olt.activated_onus.discard(self.serial_number)
+
+ # Begin minimal set of needed IAdapter interfaces
+
+ # TODO: Implement minimal functionality
+
+ def send_proxied_message(self, proxy_address, msg):
+ assert False, 'OpenOMCI will implement this for the MOCK ONU'
+
+ def receive_proxied_message(self, _, msg):
+ # Rx of OMCI message from MOCK OLT
+
+ if self.omci_cc is not None and self.enabled:
+ self.omci_cc.receive_message(msg.decode('hex'))
diff --git a/tests/utests/voltha/extensions/omci/test_me_frame.py b/tests/utests/voltha/extensions/omci/test_me_frame.py
index 07aa49d..bee69f4 100644
--- a/tests/utests/voltha/extensions/omci/test_me_frame.py
+++ b/tests/utests/voltha/extensions/omci/test_me_frame.py
@@ -14,30 +14,260 @@
# limitations under the License.
#
from unittest import TestCase, main
-
from voltha.extensions.omci.me_frame import *
-
-# NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
-#
-# NOTE: This is a placeholder for OpenOMCI unit tests of the MEFrame class
-# Initial (worthwhile) tests will be provided in VOL-607
-#
-# NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
+from voltha.extensions.omci.omci_me import *
+from voltha.extensions.omci.omci import *
-class TestMeFrameExample(TestCase):
+def hexify(buffer):
+ """Return a hexadecimal string encoding of input buffer"""
+ return ''.join('%02x' % ord(c) for c in buffer)
- def test_example_1(self):
- self.assertTrue(True)
- self.assertFalse(False)
- self.assertEqual('123', '123')
+class TestSelectMeFrameGeneration(TestCase):
- def test_example_3(self):
+ def assertGeneratedFrameEquals(self, frame, ref):
+ assert isinstance(frame, Packet)
+ serialized_hexified_frame = hexify(str(frame)).upper()
+ ref = ref.upper()
+ if serialized_hexified_frame != ref:
+ self.fail('Mismatch:\nReference:\n{}\nGenerated (bad):\n{}'.format(
+ ref, serialized_hexified_frame
+ ))
- self.assertTrue(True)
- self.assertFalse(False)
- self.assertEqual('123', '123')
+ def test_mib_reset_message_serialization(self):
+ ref = '00004F0A000200000000000000000000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+ frame = OntDataFrame().mib_reset()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_gal_ethernet_profile(self):
+ ref = '0000440A011000010030000000000000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+ frame = GalEthernetProfileFrame(1, max_gem_payload_size=48).create()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_set_tcont_1(self):
+ ref = '0000480A010680008000040000000000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+
+ frame = TcontFrame(0x8000, alloc_id=0x400).set()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_set_tcont_2(self):
+ ref = '0000480A010680018000040100000000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+
+ frame = TcontFrame(0x8001, alloc_id=0x401).set()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_8021p_mapper_service_profile(self):
+ ref = '0000440A00828000ffffffffffffffff' \
+ 'ffffffffffffffffffff000000000000' \
+ '000000000000000000000028'
+ frame = Ieee8021pMapperServiceProfileFrame(0x8000).create()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_mac_bridge_service_profile(self):
+ ref = '0000440A002D02010001008000140002' \
+ '000f0001000000000000000000000000' \
+ '000000000000000000000028'
+ data = dict(
+ spanning_tree_ind=False,
+ learning_ind=True,
+ priority=0x8000,
+ max_age=20 * 256,
+ hello_time=2 * 256,
+ forward_delay=15 * 256,
+ unknown_mac_address_discard=True
+ )
+ frame = MacBridgeServiceProfileFrame(0x201, attributes=data).create()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_gem_port_network_ctp(self):
+ ref = '0000440A010C01000400800003010000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+
+ data = dict(
+ port_id=0x400,
+ tcont_pointer=0x8000,
+ direction=3,
+ traffic_management_pointer_upstream=0x100
+ )
+ frame = GemPortNetworkCtpFrame(0x100, attributes=data).create()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ # Also test direction as a string parameter
+ frame = GemPortNetworkCtpFrame(0x100, port_id=0x400,
+ tcont_id=0x8000,
+ direction='bi-directional',
+ upstream_tm=0x100).create()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_gem_inteworking_tp(self):
+ ref = '0000440A010A80010100058000000000' \
+ '01000000000000000000000000000000' \
+ '000000000000000000000028'
+ frame = GemInterworkingTpFrame(0x8001,
+ gem_port_network_ctp_pointer=0x100,
+ interworking_option=5,
+ service_profile_pointer=0x8000,
+ interworking_tp_pointer=0x0,
+ gal_profile_pointer=0x1).create()
+
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_set_8021p_mapper_service_profile(self):
+ ref = '0000480A008280007F80800100000000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+ ptrs = [0x8001, 0, 0, 0, 0, 0, 0, 0]
+ frame = Ieee8021pMapperServiceProfileFrame(0x8000,
+ interwork_tp_pointers=ptrs).set()
+
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ ptrs = [0x8001, 0]
+ frame = Ieee8021pMapperServiceProfileFrame(0x8000,
+ interwork_tp_pointers=ptrs).set()
+
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_mac_bridge_port_configuration_data(self):
+ ref = '0000440A002F21010201020380000000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+
+ frame = MacBridgePortConfigurationDataFrame(0x2101,
+ bridge_id_pointer=0x201,
+ port_num=2,
+ tp_type=3,
+ tp_pointer=0x8000).create()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_vlan_tagging_filter_data(self):
+ ref = '0000440A005421010400000000000000' \
+ '00000000000000000000000000000000' \
+ '100100000000000000000028'
+ frame = VlanTaggingFilterDataFrame(0x2101,
+ vlan_tcis=[0x400],
+ forward_operation=0x10).create()
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_extended_vlan_tagging_operation_configuration_data(self):
+ ref = '0000440A00AB02020A04010000000000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+ data = dict(
+ association_type=10,
+ associated_me_pointer=0x401
+ )
+ frame = \
+ ExtendedVlanTaggingOperationConfigurationDataFrame(0x202,
+ attributes=data)\
+ .create()
+
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_set_extended_vlan_tagging_operation_configuration_data(self):
+ ref = '0000480A00AB02023800810081000000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+ data = dict(
+ input_tpid=0x8100,
+ output_tpid=0x8100,
+ downstream_mode=0, # inverse of upstream
+ )
+ frame = \
+ ExtendedVlanTaggingOperationConfigurationDataFrame(0x202,
+ attributes=data)\
+ .set()
+
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_set_extended_vlan_tagging_1(self):
+ ref = '0000480A00AB02020400f00000008200' \
+ '5000402f000000082004000000000000' \
+ '000000000000000000000028'
+ data = dict(
+ received_frame_vlan_tagging_operation_table=\
+ VlanTaggingOperation(
+ filter_outer_priority=15,
+ filter_inner_priority=8,
+ filter_inner_vid=1024,
+ filter_inner_tpid_de=5,
+ filter_ether_type=0,
+ treatment_tags_to_remove=1,
+ pad3=2,
+ treatment_outer_priority=15,
+ treatment_inner_priority=8,
+ treatment_inner_vid=1024,
+ treatment_inner_tpid_de=4
+ )
+ )
+ frame = \
+ ExtendedVlanTaggingOperationConfigurationDataFrame(0x202,
+ attributes=data)\
+ .set()
+
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_set_extended_vlan_tagging_2(self):
+ ref = '0000480A00AB02020400F00000008200' \
+ 'd000402f00000008200c000000000000' \
+ '000000000000000000000028'
+ data = dict(
+ received_frame_vlan_tagging_operation_table=
+ VlanTaggingOperation(
+ filter_outer_priority=15,
+ filter_inner_priority=8,
+ filter_inner_vid=1025,
+ filter_inner_tpid_de=5,
+ filter_ether_type=0,
+ treatment_tags_to_remove=1,
+ pad3=2,
+ treatment_outer_priority=15,
+ treatment_inner_priority=8,
+ treatment_inner_vid=1025,
+ treatment_inner_tpid_de=4
+ )
+ )
+
+ frame = \
+ ExtendedVlanTaggingOperationConfigurationDataFrame(0x202,
+ attributes=data)\
+ .set()
+
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_create_mac_bridge_port_configuration_data2(self):
+ ref = '0000440A002F02010201010b04010000' \
+ '00000000000000000000000000000000' \
+ '000000000000000000000028'
+ data = dict(
+ bridge_id_pointer=0x201,
+ encapsulation_methods=0,
+ port_num=1,
+ port_priority=0,
+ port_path_cost=0,
+ port_spanning_tree_in=0,
+ lan_fcs_ind=0,
+ tp_type=11,
+ tp_pointer=0x401,
+ mac_learning_depth=0
+ )
+ frame = MacBridgePortConfigurationDataFrame(0x201,
+ attributes=data).create()
+
+ self.assertGeneratedFrameEquals(frame, ref)
+
+ def test_constraint_errors(self):
+ self.assertTrue(True) # TODO Also test some attribute constraint failures
if __name__ == '__main__':
diff --git a/tests/utests/voltha/extensions/omci/test_omci_cc.py b/tests/utests/voltha/extensions/omci/test_omci_cc.py
index e68c61c..c0049b2 100644
--- a/tests/utests/voltha/extensions/omci/test_omci_cc.py
+++ b/tests/utests/voltha/extensions/omci/test_omci_cc.py
@@ -14,32 +14,529 @@
# limitations under the License.
#
from unittest import TestCase, main
+from nose.twistedtools import threaded_reactor, stop_reactor
+from mock.mock_adapter_agent import MockAdapterAgent
+from mock.mock_onu_handler import MockOnuHandler
+from mock.mock_olt_handler import MockOltHandler
+from mock.mock_onu import MockOnu
+from voltha.extensions.omci.omci_defs import *
+from voltha.extensions.omci.omci_frame import *
-from voltha.extensions.omci.omci_cc import *
+DEFAULT_OLT_DEVICE_ID = 'default_olt_mock'
+DEFAULT_ONU_DEVICE_ID = 'default_onu_mock'
+DEFAULT_PON_ID = 0
+DEFAULT_ONU_ID = 0
+DEFAULT_ONU_SN = 'TEST00000001'
+
+OP = EntityOperations
+RC = ReasonCodes
-# NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
-#
-# NOTE: This is a placeholder for OpenOMCI unit tests of the OMCI_CC class
-# Initial (worthwhile) tests will be provided in VOL-607. The VOL-607
-# check-in will also likely include the start of a mock ONU device.
-#
-# NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
+def setup_module():
+ threaded_reactor()
+
+
+def teardown_module():
+ stop_reactor()
class TestOmciCcExample(TestCase):
+ """
+ Test the Open OMCI Communication channels
- def test_example_1(self):
+ Note also added some testing of MockOnu behaviour since its behaviour during more
+ complicated unit/integration tests may be performed in the future.
+ """
+ def setUp(self):
+ self.adapter_agent = MockAdapterAgent()
- self.assertTrue(True)
- self.assertFalse(False)
- self.assertEqual('123', '123')
+ def tearDown(self):
+ if self.adapter_agent is not None:
+ self.adapter_agent.tearDown()
- def test_example_3(self):
+ def setup_mock_olt(self, device_id=DEFAULT_OLT_DEVICE_ID):
+ handler = MockOltHandler(self.adapter_agent, device_id)
+ self.adapter_agent.add_device(handler.device)
+ return handler
- self.assertTrue(True)
- self.assertFalse(False)
- self.assertEqual('123', '123')
+ def setup_mock_onu(self, parent_id=DEFAULT_OLT_DEVICE_ID,
+ device_id=DEFAULT_ONU_DEVICE_ID,
+ pon_id=DEFAULT_PON_ID,
+ onu_id=DEFAULT_ONU_ID,
+ serial_no=DEFAULT_ONU_SN):
+ handler = MockOnuHandler(self.adapter_agent, parent_id, device_id, pon_id, onu_id)
+ handler.serial_number = serial_no
+ onu = MockOnu(serial_no, self.adapter_agent, handler.device_id) \
+ if serial_no is not None else None
+ handler.onu_mock = onu
+ return handler
+
+ def setup_one_of_each(self):
+ # Most tests will use at lease one or more OLT and ONU
+ self.olt_handler = self.setup_mock_olt()
+ self.onu_handler = self.setup_mock_onu(parent_id=self.olt_handler.device_id)
+ self.onu_device = self.onu_handler.onu_mock
+
+ self.adapter_agent.add_child_device(self.olt_handler.device,
+ self.onu_handler.device)
+
+ def _is_omci_frame(self, results):
+ assert isinstance(results, OmciFrame), 'Not OMCI Frame'
+ return results
+
+ def _check_status(self, results, value):
+ status = results.fields['omci_message'].fields['success_code']
+ assert status == value,\
+ 'Unexpected Status Code. Got {}, Expected: {}'.format(status, value)
+ return results
+
+ def _check_mib_sync(self, results, value):
+ assert self.onu_device.mib_data_sync == value, \
+ 'Unexpected MIB DATA Sync value. Got {}, Expected: {}'.format(
+ self.onu_device.mib_data_sync, value)
+ return results
+
+ def _check_stats(self, results, snapshot, stat, expected):
+ assert snapshot[stat] == expected, \
+ 'Invalid statistic "{}". Got {}, Expected: {}'.format(stat,
+ snapshot[stat],
+ expected)
+ return results
+
+ def _check_value_equal(self, results, name, value, expected):
+ assert value == expected, \
+ 'Value "{}" not equal. Got {}, Expected: {}'.format(name, value,
+ expected)
+ return results
+
+ def _default_errback(self, _failure):
+ assert False
+
+ def _snapshot_stats(self):
+ omci_cc = self.onu_handler.omci_cc
+ return {
+ 'tx_frames': omci_cc.tx_frames,
+ 'rx_frames': omci_cc.rx_frames,
+ 'rx_unknown_tid': omci_cc.rx_unknown_tid,
+ 'rx_onu_frames': omci_cc.rx_onu_frames,
+ 'rx_alarm_overflow': omci_cc.rx_alarm_overflow,
+ 'rx_avc_overflow': omci_cc.rx_avc_overflow,
+ 'rx_onu_discards': omci_cc.rx_onu_discards,
+ 'rx_timeouts': omci_cc.rx_timeouts,
+ 'tx_errors': omci_cc.tx_errors,
+ 'consecutive_errors': omci_cc.consecutive_errors,
+ 'reply_min': omci_cc.reply_min,
+ 'reply_max': omci_cc.reply_max,
+ 'reply_average': omci_cc.reply_average
+ }
+
+ def test_default_init(self):
+ self.setup_one_of_each()
+ # Test default construction of OMCI_CC as well as
+ # various other parameter settings
+ omci_cc = self.onu_handler.omci_cc
+
+ # No device directly associated
+ self.assertIsNotNone(omci_cc._adapter_agent)
+ self.assertIsNone(omci_cc._proxy_address)
+
+ # No outstanding requests
+ self.assertEqual(len(omci_cc._requests), 0)
+
+ # Flags/properties
+ self.assertFalse(omci_cc.enabled)
+
+ # Statistics
+ self.assertEqual(omci_cc.tx_frames, 0)
+ self.assertEqual(omci_cc.rx_frames, 0)
+ self.assertEqual(omci_cc.rx_unknown_tid, 0)
+ self.assertEqual(omci_cc.rx_onu_frames, 0)
+ self.assertEqual(omci_cc.rx_alarm_overflow, 0)
+ self.assertEqual(omci_cc.rx_avc_overflow, 0)
+ self.assertEqual(omci_cc.rx_onu_discards, 0)
+ self.assertEqual(omci_cc.rx_timeouts, 0)
+ self.assertEqual(omci_cc.tx_errors, 0)
+ self.assertEqual(omci_cc.consecutive_errors, 0)
+ self.assertNotEquals(omci_cc.reply_min, 0.0)
+ self.assertEqual(omci_cc.reply_max, 0.0)
+ self.assertEqual(omci_cc.reply_average, 0.0)
+
+ def test_enable_disable(self):
+ self.setup_one_of_each()
+
+ # Test enable property
+ omci_cc = self.onu_handler.omci_cc
+
+ # Initially disabled
+ self.assertFalse(omci_cc.enabled)
+ omci_cc.enabled = False
+ self.assertFalse(omci_cc.enabled)
+
+ omci_cc.enabled = True
+ self.assertTrue(omci_cc.enabled)
+ self.assertIsNotNone(omci_cc._proxy_address)
+ self.assertEqual(len(omci_cc._requests), 0)
+
+ omci_cc.enabled = True # Should be a NOP
+ self.assertTrue(omci_cc.enabled)
+ self.assertIsNotNone(omci_cc._proxy_address)
+ self.assertEqual(len(omci_cc._requests), 0)
+
+ omci_cc.enabled = False
+ self.assertFalse(omci_cc.enabled)
+ self.assertIsNone(omci_cc._proxy_address)
+
+ def test_message_send_get(self):
+ # Various tests of sending an OMCI message and it either
+ # getting a response or send catching some errors of
+ # importance
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+ mib_data_sync = self.onu_device.mib_data_sync
+
+ # GET
+ # d = omci_cc.send() # TODO: Implement
+ #
+ # d.addCallbacks(self._is_omci_frame, self._default_errback)
+ # d.addCallback(self._check_status, RC.Success.value)
+ # d.addCallback(self._check_mib_sync, mib_data_sync)
+ #
+ # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+
+ # return d
+
+ def test_message_send_set(self):
+ # Various tests of sending an OMCI message and it either
+ # getting a response or send catching some errors of
+ # importance
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+ mib_data_sync = self.onu_device.mib_data_sync
+
+ # SET
+ # d = omci_cc.send() # TODO: Implement
+ #
+ # d.addCallbacks(self._is_omci_frame, self._default_errback)
+ # d.addCallback(self._check_status, RC.Success.value)
+ # d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1)
+ #
+ # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+
+ # return d
+ #
+ # # Also test mib_data_sync rollover. 255 -> 1 (zero reserved)
+ #
+ # self.onu_device.mib_data_sync = 255
+ # # SET
+ # self.assertTrue(True) # TODO: Implement (copy previous one here)
+ # self.assertEqual(1, self.onu_device.mib_data_sync)
+
+ def test_message_send_create(self):
+ # Various tests of sending an OMCI message and it either
+ # getting a response or send catching some errors of
+ # importance
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+ mib_data_sync = self.onu_device.mib_data_sync
+
+ # Create
+ # d = omci_cc.send() # TODO: Implement
+ #
+ # d.addCallbacks(self._is_omci_frame, self._default_errback)
+ # d.addCallback(self._check_status, RC.Success.value)
+ # d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1)
+ #
+ # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+
+ # return d
+
+ def test_message_send_delete(self):
+ # Various tests of sending an OMCI message and it either
+ # getting a response or send catching some errors of
+ # importance
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+ mib_data_sync = self.onu_device.mib_data_sync
+
+ # Delete
+ # d = omci_cc.send() # TODO: Implement
+ #
+ # d.addCallbacks(self._is_omci_frame, self._default_errback)
+ # d.addCallback(self._check_status, RC.Success.value)
+ # d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1)
+ #
+ # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+
+ # return d
+
+ def test_message_send_mib_reset(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ self.onu_device.mib_data_sync = 10
+ snapshot = self._snapshot_stats()
+
+ # Successful MIB Reset
+ d = omci_cc.send_mib_reset(timeout=1.0)
+
+ d.addCallbacks(self._is_omci_frame, self._default_errback)
+ d.addCallback(self._check_status, RC.Success)
+ d.addCallback(self._check_mib_sync, 0)
+
+ d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
+ d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
+ d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
+ d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
+ d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
+ d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
+ d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+ return d
+
+ def test_message_send_mib_upload(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+ mib_data_sync = self.onu_device.mib_data_sync
+
+ # MIB Upload
+ d = omci_cc.send_mib_upload(timeout=1.0)
+
+ d.addCallbacks(self._is_omci_frame, self._default_errback)
+ d.addCallback(self._check_status, RC.Success)
+ d.addCallback(self._check_mib_sync, mib_data_sync)
+
+ # TODO: MIB Upload Results specific tests here
+
+ d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
+ d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
+ d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
+ d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
+ d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
+ d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
+ d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+ return d
+
+ def test_message_send_mib_upload_next(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+ mib_data_sync = self.onu_device.mib_data_sync
+
+ # # MIB Upload Next
+ # d = omci_cc.send_mib_upload_next(0, timeout=1.0)
+ #
+ # d.addCallbacks(self._is_omci_frame, self._default_errback)
+ # d.addCallback(self._check_status, RC.Success)
+ # d.addCallback(self._check_mib_sync, mib_data_sync)
+ #
+ # # TODO: MIB Upload Next Results specific tests here
+ #
+ # d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
+ # d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
+ # d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ # d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+ # return d
+
+ def test_message_send_reboot(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ # ONU Reboot
+ d = omci_cc.send_reboot(timeout=1.0)
+
+ d.addCallbacks(self._is_omci_frame, self._default_errback)
+ d.addCallback(self._check_status, RC.Success)
+
+ d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
+ d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
+ d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
+ d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
+ d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
+ d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
+ d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
+ d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
+ d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
+ d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+ return d
+
+ def test_message_send_with_omci_disabled(self):
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ self.assertFalse(omci_cc.enabled)
+
+ # Successful MIB Reset
+ d = omci_cc.send_mib_reset(timeout=1.0)
+
+ def success_is_bad(_results):
+ assert False, 'This test should throw a failure/error'
+
+ def fail_fast(_failure):
+ pass
+ return None
+
+ d.addCallbacks(success_is_bad, fail_fast)
+ return d
+
+ def test_message_send_get_with_latency(self):
+ # Various tests of sending an OMCI message and it either
+ # getting a response or send catching some errors of
+ # importance
+ self.setup_one_of_each()
+ self.olt_handler.latency = 0.500 # 1/2 second
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+
+ # Successful MIB Reset
+ d = omci_cc.send_mib_reset(timeout=1.0)
+
+ d.addCallbacks(self._is_omci_frame, self._default_errback)
+ d.addCallback(self._check_status, RC.Success)
+
+ def check_latency_values(_):
+ self.assertGreaterEqual(omci_cc.reply_min, self.olt_handler.latency)
+ self.assertGreaterEqual(omci_cc.reply_max, self.olt_handler.latency)
+ self.assertGreaterEqual(omci_cc.reply_average, self.olt_handler.latency)
+
+ d.addCallback(check_latency_values)
+ return d
+
+ def test_message_failures(self):
+ # Various tests of sending an OMCI message and it fails
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ self.assertEqual(omci_cc.tx_frames, 0)
+ self.assertEqual(omci_cc.rx_frames, 0)
+ self.assertEqual(omci_cc.rx_unknown_tid, 0)
+ self.assertEqual(omci_cc.rx_timeouts, 0)
+ self.assertEqual(omci_cc.tx_errors, 0)
+
+ # # Class ID not found
+ # d = omci_cc.send_mib_reset(timeout=1.0)
+ # self.assertTrue(True) # TODO: Implement
+ # todo: Test non-zero consecutive errors
+ #
+ # # Instance ID not found
+ # d = omci_cc.send_mib_reset(timeout=1.0)
+ # self.assertTrue(True) # TODO: Implement
+ # todo: Test non-zero consecutive errors
+ #
+ # # PON is disabled
+ # d = omci_cc.send_mib_reset(timeout=1.0)
+ # self.assertTrue(True) # TODO: Implement
+ # todo: Test non-zero consecutive errors
+ #
+ # # ONU is disabled
+ # d = omci_cc.send_mib_reset(timeout=1.0)
+ # self.assertTrue(True) # TODO: Implement
+ # todo: Test non-zero consecutive errors
+ #
+ # # ONU is not activated
+ # d = omci_cc.send_mib_reset(timeout=1.0)
+ # self.assertTrue(True) # TODO: Implement
+ # todo: Test non-zero consecutive errors
+
+ # TODO: make OLT send back an unknown TID (
+
+ # todo: Test non-zero consecutive errors
+ # todo: Send a good frame
+ # todo: Test zero consecutive errors
+ # d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
+
+ def test_flush(self):
+ # Test flush of autonomous ONU queues
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ # TODO: add more
+ self.assertTrue(True) # TODO: Implement
+
+ def test_avc_rx(self):
+ # Test flush of autonomous ONU queues
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+ snapshot = self._snapshot_stats()
+
+ # TODO: add more
+ self.assertTrue(True) # TODO: Implement
if __name__ == '__main__':
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index 8a5c55c..c855cfe 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -21,12 +21,10 @@
import arrow
from twisted.internet import reactor, defer
from twisted.internet.defer import DeferredQueue, TimeoutError, CancelledError, failure, fail
-from voltha.protos import third_party
from common.frameio.frameio import hexify
from voltha.extensions.omci.omci import *
from voltha.extensions.omci.omci_me import OntGFrame, OntDataFrame
-_ = third_party
_MAX_INCOMING_ALARM_MESSAGES = 256
_MAX_INCOMING_AVC_MESSAGES = 256