blob: 210d4354e17482ce3313215d6211a6cd7f61e725 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from unittest import TestCase, main
from mock.mock_adapter_agent import MockAdapterAgent
from mock.mock_onu_handler import MockOnuHandler
from mock.mock_olt_handler import MockOltHandler
from mock.mock_onu import MockOnu
from voltha.extensions.omci.omci_defs import *
from voltha.extensions.omci.omci_frame import *
from voltha.extensions.omci.omci_cc import UNKNOWN_CLASS_ATTRIBUTE_KEY
DEFAULT_OLT_DEVICE_ID = 'default_olt_mock'
DEFAULT_ONU_DEVICE_ID = 'default_onu_mock'
DEFAULT_PON_ID = 0
DEFAULT_ONU_ID = 0
DEFAULT_ONU_SN = 'TEST00000001'
OP = EntityOperations
RC = ReasonCodes
def chunk(indexable, chunk_size):
for i in range(0, len(indexable), chunk_size):
yield indexable[i:i + chunk_size]
def hex2raw(hex_string):
return ''.join(chr(int(byte, 16)) for byte in chunk(hex_string, 2))
class TestOmciCc(TestCase):
"""
Test the Open OMCI Communication channels
Note also added some testing of MockOnu behaviour since its behaviour during more
complicated unit/integration tests may be performed in the future.
"""
def setUp(self):
self.adapter_agent = MockAdapterAgent()
def tearDown(self):
if self.adapter_agent is not None:
self.adapter_agent.tearDown()
def setup_mock_olt(self, device_id=DEFAULT_OLT_DEVICE_ID):
handler = MockOltHandler(self.adapter_agent, device_id)
self.adapter_agent.add_device(handler.device)
return handler
def setup_mock_onu(self, parent_id=DEFAULT_OLT_DEVICE_ID,
device_id=DEFAULT_ONU_DEVICE_ID,
pon_id=DEFAULT_PON_ID,
onu_id=DEFAULT_ONU_ID,
serial_no=DEFAULT_ONU_SN):
handler = MockOnuHandler(self.adapter_agent, parent_id, device_id, pon_id, onu_id)
handler.serial_number = serial_no
onu = MockOnu(serial_no, self.adapter_agent, handler.device_id) \
if serial_no is not None else None
handler.onu_mock = onu
return handler
def setup_one_of_each(self):
# Most tests will use at lease one or more OLT and ONU
self.olt_handler = self.setup_mock_olt()
self.onu_handler = self.setup_mock_onu(parent_id=self.olt_handler.device_id)
self.onu_device = self.onu_handler.onu_mock
self.adapter_agent.add_child_device(self.olt_handler.device,
self.onu_handler.device)
def _is_omci_frame(self, results):
assert isinstance(results, OmciFrame), 'Not OMCI Frame'
return results
def _check_status(self, results, value):
status = results.fields['omci_message'].fields['success_code']
assert status == value,\
'Unexpected Status Code. Got {}, Expected: {}'.format(status, value)
return results
def _check_mib_sync(self, results, value):
assert self.onu_device.mib_data_sync == value, \
'Unexpected MIB DATA Sync value. Got {}, Expected: {}'.format(
self.onu_device.mib_data_sync, value)
return results
def _check_stats(self, results, snapshot, stat, expected):
assert snapshot[stat] == expected, \
'Invalid statistic "{}". Got {}, Expected: {}'.format(stat,
snapshot[stat],
expected)
return results
def _check_value_equal(self, results, name, value, expected):
assert value == expected, \
'Value "{}" not equal. Got {}, Expected: {}'.format(name, value,
expected)
return results
def _default_errback(self, failure):
from twisted.internet.defer import TimeoutError
assert isinstance(failure.type, type(TimeoutError))
return None
def _snapshot_stats(self):
omci_cc = self.onu_handler.omci_cc
return {
'tx_frames': omci_cc.tx_frames,
'rx_frames': omci_cc.rx_frames,
'rx_unknown_tid': omci_cc.rx_unknown_tid,
'rx_onu_frames': omci_cc.rx_onu_frames,
'rx_alarm_overflow': omci_cc.rx_alarm_overflow,
'rx_avc_overflow': omci_cc.rx_avc_overflow,
'rx_onu_discards': omci_cc.rx_onu_discards,
'rx_timeouts': omci_cc.rx_timeouts,
'rx_unknown_me': omci_cc.rx_unknown_me,
'tx_errors': omci_cc.tx_errors,
'consecutive_errors': omci_cc.consecutive_errors,
'reply_min': omci_cc.reply_min,
'reply_max': omci_cc.reply_max,
'reply_average': omci_cc.reply_average
}
def test_default_init(self):
self.setup_one_of_each()
# Test default construction of OMCI_CC as well as
# various other parameter settings
omci_cc = self.onu_handler.omci_cc
# No device directly associated
self.assertIsNotNone(omci_cc._adapter_agent)
self.assertIsNone(omci_cc._proxy_address)
# No outstanding requests
self.assertEqual(len(omci_cc._requests), 0)
# Flags/properties
self.assertFalse(omci_cc.enabled)
# Statistics
self.assertEqual(omci_cc.tx_frames, 0)
self.assertEqual(omci_cc.rx_frames, 0)
self.assertEqual(omci_cc.rx_unknown_tid, 0)
self.assertEqual(omci_cc.rx_onu_frames, 0)
self.assertEqual(omci_cc.rx_alarm_overflow, 0)
self.assertEqual(omci_cc.rx_avc_overflow, 0)
self.assertEqual(omci_cc.rx_onu_discards, 0)
self.assertEqual(omci_cc.rx_unknown_me, 0)
self.assertEqual(omci_cc.rx_timeouts, 0)
self.assertEqual(omci_cc.tx_errors, 0)
self.assertEqual(omci_cc.consecutive_errors, 0)
self.assertNotEquals(omci_cc.reply_min, 0.0)
self.assertEqual(omci_cc.reply_max, 0.0)
self.assertEqual(omci_cc.reply_average, 0.0)
def test_enable_disable(self):
self.setup_one_of_each()
# Test enable property
omci_cc = self.onu_handler.omci_cc
# Initially disabled
self.assertFalse(omci_cc.enabled)
omci_cc.enabled = False
self.assertFalse(omci_cc.enabled)
omci_cc.enabled = True
self.assertTrue(omci_cc.enabled)
self.assertIsNotNone(omci_cc._proxy_address)
self.assertEqual(len(omci_cc._requests), 0)
omci_cc.enabled = True # Should be a NOP
self.assertTrue(omci_cc.enabled)
self.assertIsNotNone(omci_cc._proxy_address)
self.assertEqual(len(omci_cc._requests), 0)
omci_cc.enabled = False
self.assertFalse(omci_cc.enabled)
self.assertIsNone(omci_cc._proxy_address)
def test_rx_discard_if_disabled(self):
# ME without a known decoder
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = False
snapshot = self._snapshot_stats()
msg = '00fc2e0a00020000ff780000e00000010000000c' \
'0000000000000000000000000000000000000000' \
'00000028105a86ef'
omci_cc.receive_message(hex2raw(msg))
# Note: No counter increments
self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'])
self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'])
self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'])
self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
def test_message_send_get(self):
# Various tests of sending an OMCI message and it either
# getting a response or send catching some errors of
# importance
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
mib_data_sync = self.onu_device.mib_data_sync
# GET
# d = omci_cc.send() # TODO: Implement
#
# d.addCallbacks(self._is_omci_frame, self._default_errback)
# d.addCallback(self._check_status, RC.Success.value)
# d.addCallback(self._check_mib_sync, mib_data_sync)
#
# d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
# d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
# d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
# d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
# return d
def test_message_send_set(self):
# Various tests of sending an OMCI message and it either
# getting a response or send catching some errors of
# importance
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
mib_data_sync = self.onu_device.mib_data_sync
# SET
# d = omci_cc.send() # TODO: Implement
#
# d.addCallbacks(self._is_omci_frame, self._default_errback)
# d.addCallback(self._check_status, RC.Success.value)
# d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1)
#
# d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
# d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
# d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
# d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
# return d
#
# # Also test mib_data_sync rollover. 255 -> 1 (zero reserved)
#
# self.onu_device.mib_data_sync = 255
# # SET
# self.assertTrue(True) # TODO: Implement (copy previous one here)
# self.assertEqual(1, self.onu_device.mib_data_sync)
def test_message_send_create(self):
# Various tests of sending an OMCI message and it either
# getting a response or send catching some errors of
# importance
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
mib_data_sync = self.onu_device.mib_data_sync
# Create
# d = omci_cc.send() # TODO: Implement
#
# d.addCallbacks(self._is_omci_frame, self._default_errback)
# d.addCallback(self._check_status, RC.Success.value)
# d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1)
#
# d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
# d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
# d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
# d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
# return d
def test_message_send_delete(self):
# Various tests of sending an OMCI message and it either
# getting a response or send catching some errors of
# importance
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
mib_data_sync = self.onu_device.mib_data_sync
# Delete
# d = omci_cc.send() # TODO: Implement
#
# d.addCallbacks(self._is_omci_frame, self._default_errback)
# d.addCallback(self._check_status, RC.Success.value)
# d.addCallback(self._check_mib_sync, mib_data_sync + 1 if mib_data_sync < 255 else 1)
#
# d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
# d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
# d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
# d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
# return d
def test_message_send_mib_reset(self):
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
self.onu_device.mib_data_sync = 10
snapshot = self._snapshot_stats()
# Successful MIB Reset
d = omci_cc.send_mib_reset(timeout=1.0)
d.addCallbacks(self._is_omci_frame, self._default_errback)
d.addCallback(self._check_status, RC.Success)
d.addCallback(self._check_mib_sync, 0)
d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
return d
def test_message_send_mib_upload(self):
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
mib_data_sync = self.onu_device.mib_data_sync
# MIB Upload
d = omci_cc.send_mib_upload(timeout=1.0)
d.addCallbacks(self._is_omci_frame, self._default_errback)
d.addCallback(self._check_status, RC.Success)
d.addCallback(self._check_mib_sync, mib_data_sync)
# TODO: MIB Upload Results specific tests here
d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
return d
def test_message_send_mib_upload_next(self):
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
mib_data_sync = self.onu_device.mib_data_sync
# # MIB Upload Next
# d = omci_cc.send_mib_upload_next(0, timeout=1.0)
#
# d.addCallbacks(self._is_omci_frame, self._default_errback)
# d.addCallback(self._check_status, RC.Success)
# d.addCallback(self._check_mib_sync, mib_data_sync)
#
# # TODO: MIB Upload Next Results specific tests here
#
# d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
# d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
# d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
# d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
# return d
def test_message_send_reboot(self):
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
# ONU Reboot
d = omci_cc.send_reboot(timeout=1.0)
d.addCallbacks(self._is_omci_frame, self._default_errback)
d.addCallback(self._check_status, RC.Success)
d.addCallback(self._check_stats, snapshot, 'tx_frames', snapshot['tx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
d.addCallback(self._check_stats, snapshot, 'tx_errors', snapshot['tx_errors'])
d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
return d
def test_message_send_with_omci_disabled(self):
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
self.assertFalse(omci_cc.enabled)
# Successful MIB Reset
d = omci_cc.send_mib_reset(timeout=1.0)
def success_is_bad(_results):
assert False, 'This test should throw a failure/error'
def fail_fast(_failure):
pass
return None
d.addCallbacks(success_is_bad, fail_fast)
return d
def test_message_send_get_with_latency(self):
# Various tests of sending an OMCI message and it either
# getting a response or send catching some errors of
# importance
self.setup_one_of_each()
self.olt_handler.latency = 0.500 # 1/2 second
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
# Successful MIB Reset
d = omci_cc.send_mib_reset(timeout=1.0)
d.addCallbacks(self._is_omci_frame, self._default_errback)
d.addCallback(self._check_status, RC.Success)
def check_latency_values(_):
self.assertGreaterEqual(omci_cc.reply_min, self.olt_handler.latency)
self.assertGreaterEqual(omci_cc.reply_max, self.olt_handler.latency)
self.assertGreaterEqual(omci_cc.reply_average, self.olt_handler.latency)
d.addCallback(check_latency_values)
return d
def test_message_failures(self):
# Various tests of sending an OMCI message and it fails
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
self.assertEqual(omci_cc.tx_frames, 0)
self.assertEqual(omci_cc.rx_frames, 0)
self.assertEqual(omci_cc.rx_unknown_tid, 0)
self.assertEqual(omci_cc.rx_timeouts, 0)
self.assertEqual(omci_cc.tx_errors, 0)
# # Class ID not found
# d = omci_cc.send_mib_reset(timeout=1.0)
# self.assertTrue(True) # TODO: Implement
# todo: Test non-zero consecutive errors
#
# # Instance ID not found
# d = omci_cc.send_mib_reset(timeout=1.0)
# self.assertTrue(True) # TODO: Implement
# todo: Test non-zero consecutive errors
#
# # PON is disabled
# d = omci_cc.send_mib_reset(timeout=1.0)
# self.assertTrue(True) # TODO: Implement
# todo: Test non-zero consecutive errors
#
# # ONU is disabled
# d = omci_cc.send_mib_reset(timeout=1.0)
# self.assertTrue(True) # TODO: Implement
# todo: Test non-zero consecutive errors
#
# # ONU is not activated
# d = omci_cc.send_mib_reset(timeout=1.0)
# self.assertTrue(True) # TODO: Implement
# todo: Test non-zero consecutive errors
# TODO: make OLT send back an unknown TID (
# todo: Test non-zero consecutive errors
# todo: Send a good frame
# todo: Test zero consecutive errors
# d.addCallback(self._check_value_equal, 'consecutive_errors', 0, omci_cc.consecutive_errors)
def test_rx_unknown_me(self):
# ME without a known decoder
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
# This is the ID ------+
# v
msg = '00fc2e0a00020000ff780000e00000010000000c' \
'0000000000000000000000000000000000000000' \
'00000028'
omci_cc.receive_message(hex2raw(msg))
# Note: After successful frame decode, a lookup of the corresponding request by
# TID is performed. None should be found, so we should see the Rx Unknown TID
# increment.
self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'])
self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'] + 1)
self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'] + 1)
self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'])
self.assertEqual(omci_cc.consecutive_errors, 0)
def test_rx_decode_unknown_me(self):
# ME without a known decoder
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
# This is a MIB Upload Next Response. Where we would probably first see an
# unknown Class ID
#
# This is the ID ------+
# v
msg = '00fc2e0a00020000ff780001e000'
blob = '00010000000c0000000000000000000000000000000000000000'
msg += blob + '00000028'
# Dig into the internal method so we can get the returned frame
frame = omci_cc._decode_unknown_me(hex2raw(msg))
self.assertEqual(frame.fields['transaction_id'], 0x00fc)
self.assertEqual(frame.fields['message_type'], 0x2e)
omci_fields = frame.fields['omci_message'].fields
self.assertEqual(omci_fields['entity_class'], 0x0002)
self.assertEqual(omci_fields['entity_id'], 0x00)
self.assertEqual(omci_fields['object_entity_class'], 0x0ff78)
self.assertEqual(omci_fields['object_entity_id'], 0x01)
self.assertEqual(omci_fields['object_attributes_mask'], 0xe000)
data_fields = omci_fields['object_data']
decoded_blob = data_fields.get(UNKNOWN_CLASS_ATTRIBUTE_KEY)
self.assertIsNotNone(decoded_blob)
self.assertEqual(decoded_blob, blob)
def test_flush(self):
# Test flush of autonomous ONU queues
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
# TODO: add more
self.assertTrue(True) # TODO: Implement
def test_avc_rx(self):
# Test flush of autonomous ONU queues
self.setup_one_of_each()
omci_cc = self.onu_handler.omci_cc
omci_cc.enabled = True
snapshot = self._snapshot_stats()
# TODO: add more
self.assertTrue(True) # TODO: Implement
if __name__ == '__main__':
main()