VOL-702: ONU OMCI Capabilities state machine and tasks
Change-Id: Ie3f653742fc67372f26bc69f732d553285de3d93
diff --git a/tests/utests/voltha/extensions/omci/test_omci_configuration.py b/tests/utests/voltha/extensions/omci/test_omci_configuration.py
new file mode 100644
index 0000000..e7986fe
--- /dev/null
+++ b/tests/utests/voltha/extensions/omci/test_omci_configuration.py
@@ -0,0 +1,484 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from hashlib import md5
+from unittest import TestCase, main
+from nose.tools import raises
+from nose.twistedtools import deferred
+from copy import deepcopy
+from mock.mock_adapter_agent import MockAdapterAgent, MockCore
+from mock.mock_onu_handler import MockOnuHandler
+from mock.mock_olt_handler import MockOltHandler
+from mock.mock_onu import MockOnu
+from voltha.extensions.omci.openomci_agent import OpenOMCIAgent, OpenOmciAgentDefaults
+from voltha.extensions.omci.onu_configuration import OMCCVersion
+from voltha.extensions.omci.omci_defs import *
+from voltha.extensions.omci.omci_entities import OntG, Ont2G, Cardholder, \
+ CircuitPack, SoftwareImage, AniG, UniG
+from common.utils.asleep import asleep
+from voltha.extensions.omci.database.mib_db_dict import MibDbVolatileDict
+from datetime import datetime
+
+DEFAULT_OLT_DEVICE_ID = 'default_olt_mock'
+DEFAULT_ONU_DEVICE_ID = 'default_onu_mock'
+DEFAULT_PON_ID = 0
+DEFAULT_ONU_ID = 0
+DEFAULT_ONU_SN = 'TEST00000001'
+
+OP = EntityOperations
+RC = ReasonCodes
+
+
+class TestOmciConfiguration(TestCase):
+ """
+ Test the OMCI read-only Configuration library methods
+ """
+ def setUp(self):
+ self.adapter_agent = MockAdapterAgent()
+
+ custom = deepcopy(OpenOmciAgentDefaults)
+ custom['mib-synchronizer']['database'] = MibDbVolatileDict
+
+ self.omci_agent = OpenOMCIAgent(MockCore, support_classes=custom)
+ self.omci_agent.start()
+
+ def tearDown(self):
+ if self.omci_agent is not None:
+ self.omci_agent.stop()
+
+ if self.adapter_agent is not None:
+ self.adapter_agent.tearDown()
+
+ def setup_mock_olt(self, device_id=DEFAULT_OLT_DEVICE_ID):
+ handler = MockOltHandler(self.adapter_agent, device_id)
+ self.adapter_agent.add_device(handler.device)
+ return handler
+
+ def setup_mock_onu(self, parent_id=DEFAULT_OLT_DEVICE_ID,
+ device_id=DEFAULT_ONU_DEVICE_ID,
+ pon_id=DEFAULT_PON_ID,
+ onu_id=DEFAULT_ONU_ID,
+ serial_no=DEFAULT_ONU_SN):
+ handler = MockOnuHandler(self.adapter_agent, parent_id, device_id, pon_id, onu_id)
+ handler.serial_number = serial_no
+ onu = MockOnu(serial_no, self.adapter_agent, handler.device_id) \
+ if serial_no is not None else None
+ handler.onu_mock = onu
+ return handler
+
+ def setup_one_of_each(self):
+ # Most tests will use at lease one or more OLT and ONU
+ self.olt_handler = self.setup_mock_olt()
+ self.onu_handler = self.setup_mock_onu(parent_id=self.olt_handler.device_id)
+ self.onu_device = self.onu_handler.onu_mock
+
+ self.adapter_agent.add_child_device(self.olt_handler.device,
+ self.onu_handler.device)
+ # Add device to OpenOMCI
+ self.onu_device = self.omci_agent.add_device(DEFAULT_ONU_DEVICE_ID,
+ self.adapter_agent)
+
+ # Allow timeout trigger support while in disabled state for mib sync
+ # to make tests run cleanly while profiling.
+ self.onu_device.mib_synchronizer.machine.add_transition('timeout', 'disabled', 'disabled')
+
+ def not_called(self, _reason):
+ assert False, 'Should never be called'
+
+ def _stuff_database(self, entries):
+ """
+ Stuff the MIB database with some entries that we will use during tests
+ """
+ database = self.onu_device.mib_synchronizer._database
+
+ # Stuff a value into last in sync. This makes it look like
+ # the ONU has been in in-sync at least once.
+ self.onu_device.mib_synchronizer.last_mib_db_sync = datetime.utcnow()
+
+ # Entry is a tuple of (class_id, instance_id, {attributes})
+ for entry in entries:
+ database.set(DEFAULT_ONU_DEVICE_ID, entry[0], entry[1], entry[2])
+
+ def test_OMCCVersion(self):
+ for key, value in OMCCVersion.__members__.items():
+ self.assertEqual(OMCCVersion.to_enum(OMCCVersion[key].value), value)
+
+ self.assertEqual(OMCCVersion.to_enum(-1), OMCCVersion.Unknown)
+
+ @deferred(timeout=50000)
+ def test_defaults(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ @raises(AssertionError)
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+ # Should raise assertion if never been synchronized
+ config.version
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs. This is a waiting
+ # Async task from when the OpenOMCIAgent was started. But also start the
+ # device so that it's queued async state machines can run as well
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+ @deferred(timeout=5)
+ def test_in_sync_but_empty(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ def stuff_db(_results):
+ self._stuff_database([])
+
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+
+ # On no Class ID for requested property, None should be
+ # returned
+ self.assertIsNone(config.version)
+ self.assertIsNone(config.traffic_management_option)
+ self.assertIsNone(config.onu_survival_time)
+ self.assertIsNone(config.equipment_id)
+ self.assertIsNone(config.omcc_version)
+ self.assertIsNone(config.vendor_product_code)
+ self.assertIsNone(config.total_priority_queues)
+ self.assertIsNone(config.total_traffic_schedulers)
+ self.assertIsNone(config.total_gem_ports)
+ self.assertIsNone(config.uptime)
+ self.assertIsNone(config.connectivity_capability)
+ self.assertIsNone(config.qos_configuration_flexibility)
+ self.assertIsNone(config.priority_queue_scale_factor)
+ self.assertIsNone(config.cardholder_entities)
+ self.assertIsNone(config.circuitpack_entities)
+ self.assertIsNone(config.software_images)
+ self.assertIsNone(config.ani_g_entities)
+ self.assertIsNone(config.uni_g_entities)
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs.
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(stuff_db, self.not_called)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+ @deferred(timeout=5)
+ def test_in_sync_with_ont_g_values(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ version = 'abcDEF'
+ tm_opt = 2
+ onu_survival = 123
+
+ def stuff_db(_results):
+ self._stuff_database([
+ (OntG.class_id, 0, {'version': version,
+ 'traffic_management_options': tm_opt,
+ 'ont_survival_time': onu_survival
+ })])
+
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+
+ # On no Class ID for requested property, None should be
+ # returned
+ self.assertEqual(config.version, version)
+ self.assertEqual(config.traffic_management_option, tm_opt)
+ self.assertEqual(config.onu_survival_time, onu_survival)
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs.
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(stuff_db, self.not_called)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+ @deferred(timeout=5)
+ def test_in_sync_with_ont_2g_values(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ equip_id = 'br-549'
+ omcc_ver = OMCCVersion.G_988_2012
+ vend_code = 0x1234
+ queues = 64
+ scheds = 8
+ gem_ports = 24
+ uptime = 12345
+ conn_capp = 0x00aa
+ qos_flex = 0x001b
+ queue_scale = 1
+
+ def stuff_db(_results):
+ self._stuff_database([
+ (Ont2G.class_id, 0, {'equipment_id': equip_id,
+ 'omcc_version': omcc_ver.value,
+ 'vendor_product_code': vend_code,
+ 'total_priority_queue_number': queues,
+ 'total_traffic_scheduler_number': scheds,
+ 'total_gem_port_id_number': gem_ports,
+ 'sys_uptime': uptime,
+ 'connectivity_capability': conn_capp,
+ 'qos_configuration_flexibility': qos_flex,
+ 'priority_queue_scale_factor': queue_scale
+ })])
+
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+
+ self.assertEqual(config.equipment_id, equip_id)
+ self.assertEqual(config.omcc_version, omcc_ver)
+ self.assertEqual(config.vendor_product_code, vend_code)
+ self.assertEqual(config.total_priority_queues, queues)
+ self.assertEqual(config.total_traffic_schedulers, scheds)
+ self.assertEqual(config.total_gem_ports, gem_ports)
+ self.assertEqual(config.uptime, uptime)
+ self.assertEqual(config.connectivity_capability, conn_capp)
+ self.assertEqual(config.qos_configuration_flexibility, qos_flex)
+ self.assertEqual(config.priority_queue_scale_factor, queue_scale)
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs.
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(stuff_db, self.not_called)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+ @deferred(timeout=5)
+ def test_in_sync_with_cardholder_values(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ ch_entity = 0x102
+ unit_type = 255
+ clie_code = 'abc123'
+ prot_ptr = 0
+
+ def stuff_db(_results):
+ self._stuff_database([
+ (Cardholder.class_id, ch_entity, {'actual_plugin_unit_type': unit_type,
+ 'actual_equipment_id': clie_code,
+ 'protection_profile_pointer': prot_ptr,
+ })])
+
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+
+ cardholder = config.cardholder_entities
+ self.assertTrue(isinstance(cardholder, dict))
+ self.assertEqual(len(cardholder), 1)
+ self.assertEqual(cardholder[ch_entity]['entity-id'], ch_entity)
+ self.assertEqual(cardholder[ch_entity]['is-single-piece'], ch_entity >= 256)
+ self.assertEqual(cardholder[ch_entity]['slot-number'], ch_entity & 0xFF)
+ self.assertEqual(cardholder[ch_entity]['actual-plug-in-type'], unit_type)
+ self.assertEqual(cardholder[ch_entity]['actual-equipment-id'], clie_code)
+ self.assertEqual(cardholder[ch_entity]['protection-profile-ptr'], prot_ptr)
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs.
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(stuff_db, self.not_called)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+ @deferred(timeout=5)
+ def test_in_sync_with_circuitpack_values(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ cp_entity = 0x100
+ num_ports = 1
+ serial_num = 'ABCD01234'
+ cp_version = '1234ABCD'
+ vendor_id = 'AB-9876'
+ tconts = 2
+ pqueues = 64
+ sched_count = 8
+
+ def stuff_db(_results):
+ self._stuff_database([
+ (CircuitPack.class_id, cp_entity, {'number_of_ports': num_ports,
+ 'serial_number': serial_num,
+ 'version': cp_version,
+ 'vendor_id': vendor_id,
+ 'total_tcont_buffer_number': tconts,
+ 'total_priority_queue_number': pqueues,
+ 'total_traffic_scheduler_number': sched_count,
+ })])
+
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+
+ circuitpack = config.circuitpack_entities
+ self.assertTrue(isinstance(circuitpack, dict))
+ self.assertEqual(len(circuitpack), 1)
+ self.assertEqual(circuitpack[cp_entity]['entity-id'], cp_entity)
+ self.assertEqual(circuitpack[cp_entity]['number-of-ports'], num_ports)
+ self.assertEqual(circuitpack[cp_entity]['serial-number'], serial_num)
+ self.assertEqual(circuitpack[cp_entity]['version'], cp_version)
+ self.assertEqual(circuitpack[cp_entity]['vendor-id'], vendor_id)
+ self.assertEqual(circuitpack[cp_entity]['total-tcont-count'], tconts)
+ self.assertEqual(circuitpack[cp_entity]['total-priority-queue-count'], pqueues)
+ self.assertEqual(circuitpack[cp_entity]['total-traffic-sched-count'], sched_count)
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs.
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(stuff_db, self.not_called)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+ @deferred(timeout=5)
+ def test_in_sync_with_software_values(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ sw_entity = 0x200
+ sw_version = 'Beta-0.0.2'
+ sw_hash = md5("just_a_test").hexdigest()
+ prod_code = 'MySoftware'
+ sw_active = True
+ sw_committed = True
+ sw_valid = True
+
+ def stuff_db(_results):
+ self._stuff_database([
+ (SoftwareImage.class_id, sw_entity, {'version': sw_version,
+ 'is_committed': sw_committed,
+ 'is_active': sw_active,
+ 'is_valid': sw_valid,
+ 'product_code': prod_code,
+ 'image_hash': sw_hash,
+ })])
+
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+
+ images = config.software_images
+ self.assertTrue(isinstance(images, list))
+ self.assertEqual(len(images), 1)
+ self.assertEqual(images[0].name, 'running-revision' if sw_active else 'candidate-revision')
+ self.assertEqual(images[0].version, sw_version)
+ self.assertEqual(images[0].is_active, 1 if sw_active else 0)
+ self.assertEqual(images[0].is_committed, 1 if sw_committed else 0)
+ self.assertEqual(images[0].is_valid, 1 if sw_valid else 0)
+ self.assertEqual(images[0].hash, sw_hash)
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs.
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(stuff_db, self.not_called)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+ @deferred(timeout=5)
+ def test_in_sync_with_ani_g_values(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ entity_id = 0x0106
+ tconts = 4
+ dba_report = 4
+
+ def stuff_db(_results):
+ self._stuff_database([
+ (AniG.class_id, entity_id, {'total_tcont_number': tconts,
+ 'piggyback_dba_reporting': dba_report
+ })
+ ])
+
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+
+ anig = config.ani_g_entities
+ self.assertTrue(isinstance(anig, dict))
+ self.assertEqual(len(anig), 1)
+
+ self.assertEqual(anig[entity_id]['entity-id'], entity_id)
+ self.assertEqual(anig[entity_id]['slot-number'], (entity_id >> 8) & 0xff)
+ self.assertEqual(anig[entity_id]['port-number'], entity_id & 0xff)
+ self.assertEqual(anig[entity_id]['total-tcont-count'], tconts)
+ self.assertEqual(anig[entity_id]['piggyback-dba-reporting'], dba_report)
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs.
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(stuff_db, self.not_called)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+ @deferred(timeout=5)
+ def test_in_sync_with_uni_g_values(self):
+ self.setup_one_of_each()
+ self.assertEqual(len(self.omci_agent.device_ids()), 1)
+
+ entity_id = 0x4321
+ mgmt_cap = 0
+
+ def stuff_db(_results):
+ self._stuff_database([
+ (UniG.class_id, entity_id, {'management_capability': mgmt_cap})
+ ])
+
+ def do_my_tests(_results):
+ config = self.onu_device.configuration
+
+ unig = config.uni_g_entities
+ self.assertTrue(isinstance(unig, dict))
+ self.assertEqual(len(unig), 1)
+
+ self.assertEqual(unig[entity_id]['entity-id'], entity_id)
+ self.assertEqual(unig[entity_id]['management-capability'], mgmt_cap)
+
+ # No capabilities available until started
+ self.assertIsNone(self.onu_device.configuration)
+
+ # Yield context so that MIB Database callLater runs.
+ self.onu_device.start()
+ d = asleep(0.2)
+ d.addCallbacks(stuff_db, self.not_called)
+ d.addCallbacks(do_my_tests, self.not_called)
+ return d
+
+
+if __name__ == '__main__':
+ main()
+
diff --git a/voltha/extensions/omci/me_frame.py b/voltha/extensions/omci/me_frame.py
index e491bae..743ee1a 100644
--- a/voltha/extensions/omci/me_frame.py
+++ b/voltha/extensions/omci/me_frame.py
@@ -289,3 +289,28 @@
entity_id=getattr(self, 'entity_id'),
command_sequence_number=data['mib_data_sync']
))
+
+ def get_next(self):
+ """
+ Create a Get Next request frame for this ME
+ :return: (OmciFrame) OMCI Frame
+ """
+ assert hasattr(self, 'data'), 'data required for Get Next actions'
+ data = getattr(self, 'data')
+ MEFrame.check_type(data, dict)
+ assert len(data) == 1, 'Only one attribute should be specified'
+
+ mask_set = data.keys() if isinstance(data, dict) else data
+
+ self._check_operation(OP.GetNext)
+ self._check_attributes(mask_set, AA.Readable)
+
+ return OmciFrame(
+ transaction_id=None,
+ message_type=OmciGetNext.message_id,
+ omci_message=OmciGetNext(
+ entity_class=getattr(self.entity_class, 'class_id'),
+ entity_id=getattr(self, 'entity_id'),
+ attributes_mask=self.entity_class.mask_for(*mask_set),
+ command_sequence_number=data.values()[0]
+ ))
diff --git a/voltha/extensions/omci/omci_entities.py b/voltha/extensions/omci/omci_entities.py
index 9b627e2..665e21c 100644
--- a/voltha/extensions/omci/omci_entities.py
+++ b/voltha/extensions/omci/omci_entities.py
@@ -1023,6 +1023,31 @@
notifications = {OP.AttributeValueChange, OP.AlarmNotification}
+class Omci(EntityClass):
+ class_id = 287
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R},
+ range_check=lambda x: x == 0),
+
+ # TODO: Can this be expressed better in SCAPY, probably not?
+ # On the initial, Get request for either the me_type or message_type
+ # attributes, you will receive a 4 octet value (big endian) that is
+ # the number of octets to 'get-next' to fully load the desired
+ # attribute. For a basic OMCI formatted message, that will be 29
+ # octets per get-request.
+ #
+ # For the me_type_table, these are 16-bit values (ME Class IDs)
+ #
+ # For the message_type_table, these are 8-bit values (Actions)
+
+ ECA(FieldListField("me_type_table", None, ByteField('', 0),
+ count_from=lambda _: 29), {AA.R}),
+ ECA(FieldListField("message_type_table", None, ByteField('', 0),
+ count_from=lambda _: 29), {AA.R}),
+ ]
+ mandatory_operations = {OP.Get, OP.GetNext}
+
+
class EnhSecurityControl(EntityClass):
class_id = 332
attributes = [
diff --git a/voltha/extensions/omci/omci_frame.py b/voltha/extensions/omci/omci_frame.py
index 139a862..1f06e5f 100644
--- a/voltha/extensions/omci/omci_frame.py
+++ b/voltha/extensions/omci/omci_frame.py
@@ -25,7 +25,7 @@
OmciMibUploadNext, OmciMibUploadResponse, OmciMibUpload, \
OmciGetAllAlarmsNextResponse, OmciAttributeValueChange, \
OmciTestResult, OmciAlarmNotification, \
- OmciReboot, OmciRebootResponse
+ OmciReboot, OmciRebootResponse, OmciGetNext, OmciGetNextResponse
from voltha.extensions.omci.omci_messages import OmciCreateResponse
@@ -114,6 +114,13 @@
PacketField("omci_message", None, OmciRebootResponse), align=36),
lambda pkt: pkt.message_type == OmciRebootResponse.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciGetNext), align=36),
+ lambda pkt: pkt.message_type == OmciGetNext.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciGetNextResponse), align=36),
+ lambda pkt: pkt.message_type == OmciGetNextResponse.message_id),
+
# TODO add entries for remaining OMCI message types
IntField("omci_trailer", 0x00000028)
diff --git a/voltha/extensions/omci/omci_me.py b/voltha/extensions/omci/omci_me.py
index 1e7c183..6af02fb 100644
--- a/voltha/extensions/omci/omci_me.py
+++ b/voltha/extensions/omci/omci_me.py
@@ -693,3 +693,41 @@
data = {'mib_data_sync'} # Make Get's happy
super(OntDataFrame, self).__init__(OntData, 0, data)
+
+
+class OmciFrame(MEFrame):
+ """
+ This managed entity describes the ONU's general level of support for OMCI managed
+ entities and messages. This ME is not included in a MIB upload.
+ """
+ def __init__(self, me_type_table=None, message_type_table=None):
+ """
+ For 'get' request, set the type of table count you wish by
+ setting either me_me_type_table or message_type_table to
+ a boolean 'True' value
+
+ For 'get-next' requests, set the sequence number for the
+ table you wish to retrieve by setting either me_me_type_table or message_type_table to
+ a integer value.
+ """
+ if not isinstance(me_type_table, (bool, int, type(None))):
+ raise TypeError('Parameters must be a boolean or integer')
+
+ if not isinstance(message_type_table, (bool, int, type(None))):
+ raise TypeError('Parameters must be a boolean or integer')
+
+ if me_type_table is not None:
+ if isinstance(me_type_table, bool):
+ data = {'me_type_table'}
+ else:
+ data = {'me_type_table': me_type_table}
+
+ elif message_type_table is not None:
+ if isinstance('message_type_table', bool):
+ data = {'message_type_table'}
+ else:
+ data = {'message_type_table': message_type_table}
+ else:
+ raise NotImplemented('Unknown request')
+
+ super(OmciFrame, self).__init__(Omci, 0, data)
diff --git a/voltha/extensions/omci/omci_messages.py b/voltha/extensions/omci/omci_messages.py
index 2dee1e3..57d2be9 100644
--- a/voltha/extensions/omci/omci_messages.py
+++ b/voltha/extensions/omci/omci_messages.py
@@ -350,3 +350,28 @@
ShortField("entity_id", 0),
ByteField("success_code", 0)
]
+
+
+class OmciGetNext(OmciMessage):
+ name = "OmciGetNext"
+ message_id = 0x5A
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ShortField("attributes_mask", None),
+ ShortField("command_sequence_number", None)
+ ]
+
+
+class OmciGetNextResponse(OmciMessage):
+ name = "OmciGetNextResponse"
+ message_id = 0x3A
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ByteField("success_code", 0),
+ ShortField("attributes_mask", None),
+ ConditionalField(OmciMaskedData("data"),
+ lambda pkt: pkt.success_code == 0)
+ ]
+
diff --git a/voltha/extensions/omci/onu_configuration.py b/voltha/extensions/omci/onu_configuration.py
new file mode 100644
index 0000000..fbd5018
--- /dev/null
+++ b/voltha/extensions/omci/onu_configuration.py
@@ -0,0 +1,444 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+
+from voltha.protos.device_pb2 import Image
+from omci_entities import *
+from database.mib_db_api import *
+from enum import IntEnum
+
+
+class OMCCVersion(IntEnum):
+ Unknown = 0 # Unknown or unsupported version
+ G_984_4 = 0x80 # (06/04)
+ G_984_4_2005_Amd_1 = 0x81 # Amd.1 (06/05)
+ G_984_4_2006_Amd_2 = 0x82 # Amd.2 (03/06)
+ G_984_4_2006_Amd_3 = 0x83 # Amd.3 (12/06)
+ G_984_4_2008 = 0x84 # (02/08)
+ G_984_4_2009_Amd_1 = 0x85 # Amd.1 (06/09)
+ G_984_4_2009_Amd_2_Base = 0x86 # Amd.2 (2009) Baseline message set only, w/o the extended message set option
+ G_984_4_2009_Amd_2 = 0x96 # Amd.2 (2009) Extended message set option + baseline message set.
+ G_988_2010_Base = 0xA0 # (2010) Baseline message set only, w/o the extended message set option
+ G_988_2011_Amd_1_Base = 0xA1 # Amd.1 (2011) Baseline message set only
+ G_988_2012_Amd_2_Base = 0xA2 # Amd.2 (2012) Baseline message set only
+ G_988_2012_Base = 0xA3 # (2012) Baseline message set only
+ G_988_2010 = 0xB0 # (2010) Baseline and extended message set
+ G_988_2011_Amd_1 = 0xB1 # Amd.1 (2011) Baseline and extended message set
+ G_988_2012_Amd_2 = 0xB2 # Amd.2 (2012) Baseline and extended message set
+ G_988_2012 = 0xB3 # (2012)Baseline and extended message set
+
+ @staticmethod
+ def values():
+ return {OMCCVersion[member].value for member in OMCCVersion.__members__.keys()}
+
+ @staticmethod
+ def to_enum(value):
+ return next((v for k, v in OMCCVersion.__members__.items()
+ if v.value == value), OMCCVersion.Unknown)
+
+
+class OnuConfiguration(object):
+ """
+ Utility class to query OMCI MIB Database for various ONU/OMCI Configuration
+ and capabilties. These capabilities revolve around read-only MEs discovered
+ during the MIB Upload process.
+
+ There is also a 'omci_onu_capabilities' State Machine and an
+ 'onu_capabilities_task.py' OMCI Task that will query the ONU, via the
+ OMCI (ME#287) Managed entity to get the full list of supported OMCI ME's
+ and available actions/message-types supported.
+
+ NOTE: Currently this class is optimized/tested for ONUs that support the
+ OpenOMCI implementation.
+ """
+ def __init__(self, omci_agent, device_id):
+ """
+ Initialize this instance of the OnuConfiguration class
+
+ :param omci_agent: (OpenOMCIAgent) agent reference
+ :param device_id: (str) ONU Device ID
+
+ :raises KeyError: If ONU Device is not registered with OpenOMCI
+ """
+ self.log = structlog.get_logger(device_id=device_id)
+ self._device_id = device_id
+ self._onu_device = omci_agent.get_device(device_id)
+
+ # The capabilities
+ self._attributes = None
+ self.reset()
+
+ def _get_capability(self, attr, class_id, instance_id=None):
+ """
+ Get the OMCI capabilities for this device
+
+ :param attr: (str) OnuConfiguration attribute field
+ :param class_id: (int) ME Class ID
+ :param instance_id: (int) Instance ID. If not provided, all instances of the
+ specified class ID are returned if present in the DB.
+
+ :return: (dict) Class and/or Instances. None is returned if the CLASS is not present
+ """
+ try:
+ assert self._onu_device.mib_synchronizer.last_mib_db_sync is not None, \
+ 'MIB Database for ONU {} has never been synchronized'.format(self._device_id)
+
+ # Get the requested information
+ if self._attributes[attr] is None:
+ value = self._onu_device.query_mib(class_id, instance_id=instance_id)
+
+ if isinstance(value, dict) and len(value) > 0:
+ self._attributes[attr] = value
+
+ return self._attributes[attr]
+
+ except Exception as e:
+ self.log.exception('onu-capabilities', e=e, class_id=class_id,
+ instance_id=instance_id)
+ raise
+
+ def reset(self):
+ """
+ Reset the cached database entries to None. This method should be
+ called after any communications loss to the ONU (reboot, PON down, ...)
+ in case a new software load with different capabilities is available.
+ """
+ self._attributes = {
+ '_ont_g': None,
+ '_ont_2g': None,
+ '_ani_g': None,
+ '_uni_g': None,
+ '_cardholder': None,
+ '_circuit_pack': None,
+ '_software': None,
+ }
+
+ @property
+ def version(self):
+ """
+ This attribute identifies the version of the ONU as defined by the vendor
+ """
+ ontg = self._get_capability('_ont_g', OntG.class_id, 0)
+ if ontg is None or ATTRIBUTES_KEY not in ontg:
+ return None
+
+ return ontg[ATTRIBUTES_KEY].get('version')
+
+ @property
+ def traffic_management_option(self):
+ """
+ This attribute identifies the upstream traffic management function
+ implemented in the ONU. There are three options:
+
+ 0 Priority controlled and flexibly scheduled upstream traffic. The traffic
+ scheduler and priority queue mechanism are used for upstream traffic.
+
+ 1 Rate controlled upstream traffic. The maximum upstream traffic of each
+ individual connection is guaranteed by shaping.
+
+ 2 Priority and rate controlled. The traffic scheduler and priority queue
+ mechanism are used for upstream traffic. The maximum upstream traffic
+ of each individual connection is guaranteed by shaping.
+ """
+ ontg = self._get_capability('_ont_g', OntG.class_id, 0)
+ if ontg is None or ATTRIBUTES_KEY not in ontg:
+ return None
+
+ return ontg[ATTRIBUTES_KEY].get('traffic_management_options')
+
+ @property
+ def onu_survival_time(self):
+ """
+ This attribute indicates the minimum guaranteed time in milliseconds
+ between the loss of external power and the silence of the ONU. This does not
+ include survival time attributable to a backup battery. The value zero implies that
+ the actual time is not known.
+
+ Optional
+ """
+ ontg = self._get_capability('_ont_g', OntG.class_id, 0)
+ if ontg is None or ATTRIBUTES_KEY not in ontg:
+ return None
+
+ return ontg[ATTRIBUTES_KEY].get('ont_survival_time', 0)
+
+ @property
+ def equipment_id(self):
+ """
+ This attribute may be used to identify the specific type of ONU. In some
+ environments, this attribute may include the equipment CLEI code.
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('equipment_id')
+
+ @property
+ def omcc_version(self):
+ """
+ This attribute identifies the version of the OMCC protocol being used by the
+ ONU. This allows the OLT to manage a network with ONUs that support different
+ OMCC versions. Release levels of [ITU-T G.984.4] are supported with code
+ points of the form 0x8y and 0x9y, where y is a hexadecimal digit in the range
+ 0..F. Support for continuing revisions of this Recommendation is defined in
+ the 0xAy range.
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return OMCCVersion.to_enum(ont2g[ATTRIBUTES_KEY].get('omcc_version', 0))
+
+ @property
+ def vendor_product_code(self):
+ """
+ This attribute contains a vendor-specific product code for the ONU
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('vendor_product_code')
+
+ @property
+ def total_priority_queues(self):
+ """
+ This attribute reports the total number of upstream priority queues
+ that are not associated with a circuit pack, but with the ONU in its entirety
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('total_priority_queue_number')
+
+ @property
+ def total_traffic_schedulers(self):
+ """
+ This attribute reports the total number of traffic schedulers that
+ are not associated with a circuit pack, but with the ONU in its entirety.
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('total_traffic_scheduler_number')
+
+ @property
+ def total_gem_ports(self):
+ """
+ This attribute reports the total number of GEM port-IDs supported
+ by the ONU.
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('total_gem_port_id_number')
+
+ @property
+ def uptime(self):
+ """
+ This attribute counts 10 ms intervals since the ONU was last initialized.
+ It rolls over to 0 when full
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('sys_uptime')
+
+ @property
+ def connectivity_capability(self):
+ """
+ This attribute indicates the Ethernet connectivity models that the ONU
+ can support. The value 0 indicates that the capability is not supported; 1 signifies
+ support.
+
+ Bit Model [Figure reference ITU-T 988]
+ 1 (LSB) N:1 bridging, Figure 8.2.2-3
+ 2 1:M mapping, Figure 8.2.2-4
+ 3 1:P filtering, Figure 8.2.2-5
+ 4 N:M bridge-mapping, Figure 8.2.2-6
+ 5 1:MP map-filtering, Figure 8.2.2-7
+ 6 N:P bridge-filtering, Figure 8.2.2-8
+ 7 to refer to N:MP bridge-map-filtering, Figure 8.2.2-9
+ 8...16 Reserved
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('connectivity_capability')
+
+ @property
+ def qos_configuration_flexibility(self):
+ """
+ This attribute reports whether various managed entities in the
+ ONU are fixed by the ONU's architecture or whether they are configurable. For
+ backward compatibility, and if the ONU does not support this attribute, all such
+ attributes are understood to be hard-wired.
+
+ Bit Interpretation when bit value = 1
+ 1 (LSB) Priority queue ME: Port field of related port attribute is
+ read-write and can point to any T-CONT or UNI port in the
+ same slot
+ 2 Priority queue ME: The traffic scheduler pointer is permitted
+ to refer to any other traffic scheduler in the same slot
+ 3 Traffic scheduler ME: T-CONT pointer is read-write
+ 4 Traffic scheduler ME: Policy attribute is read-write
+ 5 T-CONT ME: Policy attribute is read-write
+ 6 Priority queue ME: Priority field of related port attribute is
+ read-write
+ 7..16 Reserved
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('qos_configuration_flexibility')
+
+ @property
+ def priority_queue_scale_factor(self):
+ """
+ This specifies the scale factor of several attributes of the priority
+ queue managed entity of section 5.2.8
+ """
+ ont2g = self._get_capability('_ont_2g', Ont2G.class_id, 0)
+ if ont2g is None or ATTRIBUTES_KEY not in ont2g:
+ return None
+
+ return ont2g[ATTRIBUTES_KEY].get('priority_queue_scale_factor', 1)
+
+ @property
+ def cardholder_entities(self):
+ """
+ Return a dictionary containing some overall information on the CardHolder
+ instances for this ONU.
+ """
+ ch = self._get_capability('_cardholder', Cardholder.class_id)
+ results = dict()
+
+ if ch is not None:
+ for inst, inst_data in ch.items():
+ if isinstance(inst, int):
+ results[inst] = {
+ 'entity-id': inst,
+ 'is-single-piece': inst >= 256,
+ 'slot-number': inst & 0xff,
+ 'actual-plug-in-type': inst_data[ATTRIBUTES_KEY].get('actual_plugin_unit_type', 0),
+ 'actual-equipment-id': inst_data[ATTRIBUTES_KEY].get('actual_equipment_id', 0),
+ 'protection-profile-ptr': inst_data[ATTRIBUTES_KEY].get('protection_profile_pointer', 0),
+ }
+ return results if len(results) else None
+
+ @property
+ def circuitpack_entities(self):
+ """
+ This specifies the scale factor of several attributes of the priority
+ queue managed entity of section 5.2.8
+ """
+ cp = self._get_capability('_circuit_pack', CircuitPack.class_id)
+ results = dict()
+
+ if cp is not None:
+ for inst, inst_data in cp.items():
+ if isinstance(inst, int):
+ results[inst] = {
+ 'entity-id': inst,
+ 'number-of-ports': inst_data[ATTRIBUTES_KEY].get('number_of_ports', 0),
+ 'serial-number': inst_data[ATTRIBUTES_KEY].get('serial_number', 0),
+ 'version': inst_data[ATTRIBUTES_KEY].get('version', 0),
+ 'vendor-id': inst_data[ATTRIBUTES_KEY].get('vendor_id', 0),
+ 'total-tcont-count': inst_data[ATTRIBUTES_KEY].get('total_tcont_buffer_number', 0),
+ 'total-priority-queue-count': inst_data[ATTRIBUTES_KEY].get('total_priority_queue_number', 0),
+ 'total-traffic-sched-count': inst_data[ATTRIBUTES_KEY].get('total_traffic_scheduler_number', 0),
+ }
+
+ return results if len(results) else None
+
+ @property
+ def software_images(self):
+ """
+ Get a list of software image information for the ONU. The information is provided
+ so that it may be directly added to the protobuf Device information software list.
+ """
+ sw = self._get_capability('_software', SoftwareImage.class_id)
+ images = list()
+
+ if sw is not None:
+ for inst, inst_data in sw.items():
+ if isinstance(inst, int):
+ is_active = inst_data[ATTRIBUTES_KEY].get('is_active', False)
+
+ images.append(Image(name='running-revision' if is_active else 'candidate-revision',
+ version=str(inst_data[ATTRIBUTES_KEY].get('version',
+ 'Not Available').rstrip('\0')),
+ is_active=is_active,
+ is_committed=inst_data[ATTRIBUTES_KEY].get('is_committed',
+ False),
+ is_valid=inst_data[ATTRIBUTES_KEY].get('is_valid',
+ False),
+ install_datetime='Not Available',
+ hash=str(inst_data[ATTRIBUTES_KEY].get('image_hash',
+ 'Not Available').rstrip('\0'))))
+ return images if len(images) else None
+
+ @property
+ def ani_g_entities(self):
+ """
+ This managed entity organizes data associated with each access network
+ interface supported by a G-PON ONU. The ONU automatically creates one
+ instance of this managed entity for each PON physical port.
+ """
+ ag = self._get_capability('_ani_g', AniG.class_id)
+ results = dict()
+
+ if ag is not None:
+ for inst, inst_data in ag.items():
+ if isinstance(inst, int):
+ results[inst] = {
+ 'entity-id': inst,
+ 'slot-number': (inst >> 8) & 0xff,
+ 'port-number': inst & 0xff,
+ 'total-tcont-count': inst_data[ATTRIBUTES_KEY].get('total_tcont_number', 0),
+ 'piggyback-dba-reporting': inst_data[ATTRIBUTES_KEY].get('piggyback_dba_reporting', 0),
+ }
+ return results if len(results) else None
+
+ @property
+ def uni_g_entities(self):
+ """
+ This managed entity organizes data associated with user network interfaces
+ (UNIs) supported by GEM. One instance of the UNI-G managed entity exists
+ for each UNI supported by the ONU.
+
+ The ONU automatically creates or deletes instances of this managed entity
+ upon the creation or deletion of a real or virtual circuit pack managed
+ entity, one per port.
+ """
+ ug = self._get_capability('_uni_g', UniG.class_id)
+ results = dict()
+
+ if ug is not None:
+ for inst, inst_data in ug.items():
+ if isinstance(inst, int):
+ results[inst] = {
+ 'entity-id': inst,
+ 'management-capability': inst_data[ATTRIBUTES_KEY].get('management_capability', 0)
+ }
+ return results if len(results) else None
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 955c484..9e3915d 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -20,6 +20,7 @@
from voltha.extensions.omci.omci_cc import OMCI_CC
from common.event_bus import EventBusClient
from voltha.extensions.omci.tasks.task_runner import TaskRunner
+from voltha.extensions.omci.onu_configuration import OnuConfiguration
from twisted.internet import reactor
from enum import IntEnum
@@ -36,6 +37,8 @@
# Events of interest to Device Adapters and OpenOMCI State Machines
DeviceStatusEvent = 0 # OnuDeviceEntry running status changed
MibDatabaseSyncEvent = 1 # MIB database sync changed
+ OmciCapabilitiesEvent = 2 # OMCI ME and message type capabilities
+
# TODO: Add other events here as needed
@@ -68,6 +71,7 @@
# are per ONU Vendor
#
self._support_classes = support_classes
+ self._configuration = None
try:
# MIB Synchronization state machine
@@ -77,6 +81,11 @@
device_id,
mib_synchronizer_info['tasks'],
mib_db)
+ # ONU OMCI Capabilities state machine
+ capabilities_info = support_classes.get('omci-capabilities')
+ self._capabilities_sm = capabilities_info['state-machine'](self._omci_agent,
+ device_id,
+ capabilities_info['tasks'])
except Exception as e:
self.log.exception('mib-sync-create-failed', e=e)
raise
@@ -86,6 +95,7 @@
self._state_machines = []
self._on_start_state_machines = [ # Run when 'start()' called
self._mib_sync_sm,
+ self._capabilities_sm,
]
self._on_sync_state_machines = [ # Run after first in_sync event
@@ -132,6 +142,12 @@
"""
return self._mib_sync_sm
+ @property
+ def omci_capabilities(self):
+ """
+ Reference to the OpenOMCI OMCI Capabilities state machine for this ONU
+ """
+ return self._capabilities_sm
@property
def active(self):
@@ -181,6 +197,16 @@
}
self.event_bus.publish(topic=topic, msg=msg)
+ @property
+ def configuration(self):
+ """
+ Get the OMCI Configuration object for this ONU. This is a class that provides some
+ common database access functions for ONU capabilities and read-only configuration values.
+
+ :return: (OnuConfiguration)
+ """
+ return self._configuration
+
def start(self):
"""
Start the ONU Device Entry state machines
@@ -192,6 +218,7 @@
self._omci_cc.enabled = True
self._first_in_sync = True
self._runner.start()
+ self._configuration = OnuConfiguration(self._omci_agent, self._device_id)
# Start MIB Sync and other state machines that can run before the first
# MIB Synchronization event occurs. Start 'later' so that any
@@ -242,7 +269,8 @@
if self._first_in_sync:
self._first_in_sync = False
- # TODO: Start up the ONU Capabilities task
+ # Start up the ONU Capabilities task
+ self._configuration.reset()
# Start up any other remaining OpenOMCI state machines
def start_state_machines(machines):
@@ -262,6 +290,14 @@
msg = {ACTIVE_KEY: self._started}
self.event_bus.publish(topic=topic, msg=msg)
+ def publish_omci_capabilities_event(self):
+ """
+ Publish the ONU Device start/start status.
+ """
+ topic = OnuDeviceEntry.event_bus_topic(self.device_id,
+ OnuDeviceEvents.OmciCapabilitiesEvent)
+ self.event_bus.publish(topic=topic, msg=None)
+
def delete(self):
"""
Stop the ONU Device's state machine and remove the ONU, and any related
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index e8cfaf8..c78d8e5 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -21,6 +21,8 @@
from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry
+from voltha.extensions.omci.state_machines.omci_onu_capabilities import OnuOmciCapabilities
+from voltha.extensions.omci.tasks.onu_capabilities_task import OnuCapabilitiesTask
OpenOmciAgentDefaults = {
'mib-synchronizer': {
@@ -35,8 +37,12 @@
'mib-reconcile': None # TODO: post-v1.3.0 (Reconcile out-of-sync MIB DB)
}
},
- # 'onu-capabilities': OnuCapabilitiesTask,
- #
+ 'omci-capabilities': {
+ 'state-machine': OnuOmciCapabilities, # Implements OMCI capabilities state mach9ine
+ 'tasks': {
+ 'get-capabilities': OnuCapabilitiesTask # Get supported ME and Commands
+ }
+ }
# 'alarm-syncronizer': {
# 'state-machine': AlarmSynchronizer, # Implements the MIB synchronization state machine
# 'database': AlarmDb, # For any State storage needs
diff --git a/voltha/extensions/omci/state_machines/omci_onu_capabilities.py b/voltha/extensions/omci/state_machines/omci_onu_capabilities.py
new file mode 100644
index 0000000..f288929
--- /dev/null
+++ b/voltha/extensions/omci/state_machines/omci_onu_capabilities.py
@@ -0,0 +1,235 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from transitions import Machine
+from twisted.internet import reactor
+from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry, OnuDeviceEvents, IN_SYNC_KEY
+
+
+class OnuOmciCapabilities(object):
+ """
+ OpenOMCI ONU OMCI Capabilities State machine
+ """
+ DEFAULT_STATES = ['disabled', 'out_of_sync', 'in_sync', 'idle']
+
+ DEFAULT_TRANSITIONS = [
+ {'trigger': 'start', 'source': 'disabled', 'dest': 'out_of_sync'},
+ {'trigger': 'synchronized', 'source': 'out_of_sync', 'dest': 'in_sync'},
+
+ {'trigger': 'success', 'source': 'in_sync', 'dest': 'idle'},
+ {'trigger': 'failure', 'source': 'in_sync', 'dest': 'out_of_sync'},
+
+ {'trigger': 'not_synchronized', 'source': 'idle', 'dest': 'out_of_sync'},
+
+ # Do wildcard 'stop' trigger last so it covers all previous states
+ {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
+ ]
+ DEFAULT_RETRY = 10 # Seconds to delay after task failure/timeout/poll
+
+ def __init__(self, agent, device_id, tasks,
+ states=DEFAULT_STATES,
+ transitions=DEFAULT_TRANSITIONS,
+ initial_state='disabled',
+ timeout_delay=DEFAULT_RETRY):
+ """
+ Class initialization
+
+ :param agent: (OpenOmciAgent) Agent
+ :param device_id: (str) ONU Device ID
+ :param tasks: (dict) Tasks to run
+ :param states: (list) List of valid states
+ :param transitions: (dict) Dictionary of triggers and state changes
+ :param initial_state: (str) Initial state machine state
+ :param timeout_delay: (int/float) Number of seconds after a timeout or poll
+ """
+ self.log = structlog.get_logger(device_id=device_id)
+
+ self._agent = agent
+ self._device_id = device_id
+ self._device = None
+ self._timeout_delay = timeout_delay
+
+ self._get_capabilities_task = tasks['get-capabilities']
+
+ self._deferred = None
+ self._current_task = None
+ self._task_deferred = None
+ self._supported_entities = frozenset()
+ self._supported_msg_types = frozenset()
+
+ self._subscriptions = { # RxEvent.enum -> Subscription Object
+ OnuDeviceEvents.MibDatabaseSyncEvent: None
+ }
+ self._sub_mapping = {
+ OnuDeviceEvents.MibDatabaseSyncEvent: self.on_mib_sync_event
+ }
+ # Statistics and attributes
+ # TODO: add any others if it will support problem diagnosis
+
+ # Set up state machine to manage states
+ self.machine = Machine(model=self, states=states,
+ transitions=transitions,
+ initial=initial_state,
+ queued=True,
+ name='{}'.format(self.__class__.__name__))
+
+ def _cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._task_deferred = self._task_deferred, None
+
+ for d in [d1, d1]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def _cancel_tasks(self):
+ task, self._current_task = self._current_task, None
+ if task is not None:
+ task.stop()
+
+ def __str__(self):
+ return 'OnuOmciCapabilities: Device ID: {}, State:{}'.format(self._device_id, self.state)
+
+ def delete(self):
+ """
+ Cleanup any state information
+ """
+ self.stop()
+
+ @property
+ def device_id(self):
+ return self._device_id
+
+ @property
+ def supported_managed_entities(self):
+ """
+ Return a set of the Managed Entity class IDs supported on this ONU
+ None is returned if no MEs have been discovered
+
+ :return: (set of ints)
+ """
+ return self._supported_entities if len(self._supported_entities) else None
+
+ @property
+ def supported_message_types(self):
+ """
+ Return a set of the Message Types supported on this ONU
+ None is returned if no message types have been discovered
+
+ :return: (set of EntityOperations)
+ """
+ return self._supported_msg_types if len(self._supported_msg_types) else None
+
+ def on_enter_disabled(self):
+ """
+ State machine is being stopped
+ """
+ self.log.debug('state-transition')
+ self._cancel_deferred()
+ self._cancel_tasks()
+
+ self._supported_entities = frozenset()
+ self._supported_msg_types = frozenset()
+
+ # Drop Response and Autonomous notification subscriptions
+ for event, sub in self._subscriptions.iteritems():
+ if sub is not None:
+ self._subscriptions[event] = None
+ self._device.event_bus.unsubscribe(sub)
+
+ def on_enter_out_of_sync(self):
+ """
+ State machine has just started or the MIB database has transitioned
+ to an out-of-synchronization state
+ """
+ self.log.debug('state-transition')
+ self._cancel_deferred()
+ self._device = self._agent.get_device(self._device_id)
+
+ # Subscribe to events of interest
+ try:
+ for event, sub in self._sub_mapping.iteritems():
+ if self._subscriptions[event] is None:
+ self._subscriptions[event] = \
+ self._device.event_bus.subscribe(
+ topic=OnuDeviceEntry.event_bus_topic(self._device_id,
+ event),
+ callback=sub)
+
+ except Exception as e:
+ self.log.exception('subscription-setup', e=e)
+
+ # Periodically check/poll for in-sync in case subscription was missed or
+ # already in sync
+ self._deferred = reactor.callLater(0, self.check_in_sync)
+
+ def check_in_sync(self):
+ if self._device.mib_db_in_sync:
+ self.synchronized()
+ else:
+ self._deferred = reactor.callLater(self._timeout_delay,
+ self.check_in_sync)
+
+ def on_enter_in_sync(self):
+ """
+ State machine has just transitioned to an in-synchronization state
+ """
+ self.log.debug('state-transition')
+ self._cancel_deferred()
+
+ def success(results):
+ self.log.debug('capabilities-success: {}'.format(results))
+ self._supported_entities = self._current_task.supported_managed_entities
+ self._supported_msg_types = self._current_task.supported_message_types
+ self._current_task = None
+ self._deferred = reactor.callLater(0, self.success)
+
+ def failure(reason):
+ self.log.info('capabilities-failure', reason=reason)
+ self._supported_entities = frozenset()
+ self._supported_msg_types = frozenset()
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.failure)
+
+ # Schedule a task to read the ONU's OMCI capabilities
+ self._current_task = self._get_capabilities_task(self._agent, self._device_id)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_idle(self):
+ """
+ Notify any subscribers for a capabilities event and wait until
+ stopped or ONU MIB database goes out of sync
+ """
+ self.log.debug('state-transition')
+ self._cancel_deferred()
+ self._device.publish_omci_capabilities_event()
+
+ def on_mib_sync_event(self, _topic, msg):
+ """
+ Handle In-Sync/Out-of-Sync for the MIB database
+ :param _topic: (str) Subscription topic
+ :param msg: (dict) In-Sync event data
+ """
+ if self._subscriptions.get(OnuDeviceEvents.MibDatabaseSyncEvent) is None:
+ return
+
+ if msg[IN_SYNC_KEY]:
+ self.synchronized()
+ else:
+ self.not_synchronized()
diff --git a/voltha/extensions/omci/tasks/onu_capabilities_task.py b/voltha/extensions/omci/tasks/onu_capabilities_task.py
new file mode 100644
index 0000000..4a8c1d3
--- /dev/null
+++ b/voltha/extensions/omci/tasks/onu_capabilities_task.py
@@ -0,0 +1,286 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from binascii import hexlify
+from twisted.internet.defer import inlineCallbacks, failure, returnValue
+from twisted.internet import reactor
+from voltha.extensions.omci.omci_defs import ReasonCodes
+from voltha.extensions.omci.omci_me import OmciFrame
+from voltha.extensions.omci.omci import EntityOperations
+
+
+class GetNextException(Exception):
+ pass
+
+
+class GetCapabilitiesFailure(Exception):
+ pass
+
+
+class OnuCapabilitiesTask(Task):
+ """
+ OpenOMCI MIB Capabilities Task
+
+ This task requests information on supported MEs via the OMCI (ME#287)
+ Managed entity.
+
+ This task should be ran after MIB Synchronization and before any MIB
+ Downloads to the ONU.
+
+ Upon completion, the Task deferred callback is invoked with dictionary
+ containing the supported managed entities and message types.
+
+ results = {
+ 'supported-managed-entities': {set of supported managed entities},
+ 'supported-message-types': {set of supported message types}
+ }
+ """
+ task_priority = 240
+ name = "ONU Capabilities Task"
+
+ max_mib_get_next_retries = 3
+ mib_get_next_delay = 5
+ DEFAULT_OCTETS_PER_MESSAGE = 29
+
+ def __init__(self, omci_agent, device_id, omci_pdu_size=DEFAULT_OCTETS_PER_MESSAGE):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param omci_pdu_size: (int) OMCI Data payload size (not counting any trailers)
+ """
+ super(OnuCapabilitiesTask, self).__init__(OnuCapabilitiesTask.name,
+ omci_agent,
+ device_id,
+ priority=OnuCapabilitiesTask.task_priority)
+ self._local_deferred = None
+ self._device = omci_agent.get_device(device_id)
+ self._pdu_size = omci_pdu_size
+ self._supported_entities = set()
+ self._supported_msg_types = set()
+
+ def cancel_deferred(self):
+ super(OnuCapabilitiesTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ @property
+ def supported_managed_entities(self):
+ """
+ Return a set of the Managed Entity class IDs supported on this ONU
+
+ None is returned if no MEs have been discovered
+
+ :return: (set of ints)
+ """
+ return frozenset(self._supported_entities) if len(self._supported_entities) else None
+
+ @property
+ def supported_message_types(self):
+ """
+ Return a set of the Message Types supported on this ONU
+
+ None is returned if no message types have been discovered
+
+ :return: (set of EntityOperations)
+ """
+ return frozenset(self._supported_msg_types) if len(self._supported_msg_types) else None
+
+ def start(self):
+ """
+ Start MIB Capabilities task
+ """
+ super(OnuCapabilitiesTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_get_capabilities)
+
+ def stop(self):
+ """
+ Shutdown MIB Capabilities task
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ self._device = None
+ super(OnuCapabilitiesTask, self).stop()
+
+ def stop_if_not_running(self):
+ if not self.running:
+ raise GetCapabilitiesFailure('Get Capabilities Task was cancelled')
+
+ @inlineCallbacks
+ def perform_get_capabilities(self):
+ """
+ Perform the MIB Capabilities sequence.
+
+ The sequence is to perform a Get request with the attribute mask equal
+ to 'me_type_table'. The response to this request will carry the size
+ of (number of get-next sequences).
+
+ Then a loop is entered and get-next commands are sent for each sequence
+ requested.
+ """
+ self.log.info('perform-get')
+
+ try:
+ self._supported_entities = yield self.get_supported_entities()
+ self.stop_if_not_running()
+
+ self._supported_msg_types = yield self.get_supported_message_types()
+ self.stop_if_not_running()
+
+ self.log.debug('get-success',
+ supported_entities=self.supported_managed_entities,
+ supported_msg_types=self.supported_message_types)
+ results = {
+ 'supported-managed-entities': self.supported_managed_entities,
+ 'supported-message-types': self.supported_message_types
+ }
+ self.deferred.callback(results)
+
+ except Exception as e:
+ self.log.exception('perform-get', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ def get_count_from_data_buffer(self, data):
+ """
+ Extract the 4 octet buffer length from the OMCI PDU contents
+ """
+ self.log.debug('get-count-buffer', data=data)
+ return int(hexlify(data[:4]), 16)
+
+ @inlineCallbacks
+ def get_supported_entities(self):
+ """
+ Get the supported ME Types for this ONU.
+ """
+ try:
+ # Get the number of requests needed
+ frame = OmciFrame(me_type_table=True).get()
+ results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ omci_msg = results.fields['omci_message']
+ status = omci_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure('Get count of supported entities failed with status code: {}'.
+ format(status))
+ data = omci_msg.fields['data']['me_type_table']
+ count = self.get_count_from_data_buffer(bytearray(data))
+
+ seq_no = 0
+ data_buffer = bytearray(0)
+ self.log.debug('me-type-count', octets=count, data=data)
+
+ # Start the loop
+ for offset in xrange(0, count, self._pdu_size):
+ frame = OmciFrame(me_type_table=seq_no).get_next()
+ seq_no += 1
+ results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ omci_msg = results.fields['omci_message']
+ status = omci_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure(
+ 'Get supported entities request at offset {} of {} failed with status code: {}'.
+ format(offset + 1, count, status))
+
+ # Extract the data
+ num_octets = count - offset
+ if num_octets > self._pdu_size:
+ num_octets = self._pdu_size
+
+ data = omci_msg.fields['data']['me_type_table']
+ data_buffer += bytearray(data[:num_octets])
+
+ me_types = {(data_buffer[x] << 8) + data_buffer[x + 1]
+ for x in xrange(0, len(data_buffer), 2)}
+ returnValue(me_types)
+
+ except Exception as e:
+ self.log.exception('get-entities', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ @inlineCallbacks
+ def get_supported_message_types(self):
+ """
+ Get the supported Message Types (actions) for this ONU.
+ """
+ try:
+ # Get the number of requests needed
+ frame = OmciFrame(message_type_table=True).get()
+ results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ omci_msg = results.fields['omci_message']
+ status = omci_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure('Get count of supported msg types failed with status code: {}'.
+ format(status))
+
+ data = omci_msg.fields['data']['message_type_table']
+ count = self.get_count_from_data_buffer(bytearray(data))
+
+ seq_no = 0
+ data_buffer = list()
+ self.log.debug('me-type-count', octets=count, data=data)
+
+ # Start the loop
+ for offset in xrange(0, count, self._pdu_size):
+ frame = OmciFrame(message_type_table=seq_no).get_next()
+ seq_no += 1
+ results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ omci_msg = results.fields['omci_message']
+ status = omci_msg.fields['success_code']
+
+ if status != ReasonCodes.Success.value:
+ raise GetCapabilitiesFailure(
+ 'Get supported msg types request at offset {} of {} failed with status code: {}'.
+ format(offset + 1, count, status))
+
+ # Extract the data
+ num_octets = count - offset
+ if num_octets > self._pdu_size:
+ num_octets = self._pdu_size
+
+ data = omci_msg.fields['data']['message_type_table']
+ data_buffer += data[:num_octets]
+
+ def buffer_to_message_type(value):
+ """
+ Convert an integer value to the appropriate EntityOperations enumeration
+ :param value: (int) Message type value (4..29)
+ :return: (EntityOperations) Enumeration, None on failure
+ """
+ next((v for k, v in EntityOperations.__members__.items() if v.value == value), None)
+
+ msg_types = {buffer_to_message_type(v) for v in data_buffer if v is not None}
+ returnValue({msg_type for msg_type in msg_types if msg_type is not None})
+
+ except Exception as e:
+ self.log.exception('get-msg-types', e=e)
+ self.deferred.errback(failure.Failure(e))