VOL-1448: Initial checkin of pyvoltha repository
This is very early work and unit tests are not currently running.
Future versions of this code will remove the protobuf directory
and address any v2.0 API changes such as the key-value store API
used by various libraries in pyvoltha
- Added .gitreview config file
- Moved VERSION file to expected location and specified a dev version
so no git tags or PyPI publishing occurs until we are ready.
- Removed generated .desc protobuf files
Change-Id: Icaedc6a4d2cff87cd7d538d3610586d0f5a5db18
diff --git a/test/unit/extensions/omci/mock/__init__.py b/test/unit/extensions/omci/mock/__init__.py
new file mode 100644
index 0000000..2792694
--- /dev/null
+++ b/test/unit/extensions/omci/mock/__init__.py
@@ -0,0 +1,24 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from nose.twistedtools import threaded_reactor, stop_reactor
+
+
+def setup_module():
+ threaded_reactor()
+
+
+def teardown_module():
+ stop_reactor()
diff --git a/test/unit/extensions/omci/mock/mock_adapter_agent.py b/test/unit/extensions/omci/mock/mock_adapter_agent.py
new file mode 100644
index 0000000..866eb67
--- /dev/null
+++ b/test/unit/extensions/omci/mock/mock_adapter_agent.py
@@ -0,0 +1,165 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# import binascii
+import structlog
+# from twisted.internet.defer import Deferred
+# from voltha.core.config.config_root import ConfigRoot
+# from pyvoltha.protos.voltha_pb2 import VolthaInstance
+# from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame
+
+class MockProxyAddress(object):
+ def __init__(self, device_id, pon_id, onu_id):
+ self.device_id = device_id # Device ID of proxy (OLT)
+ self.onu_id = onu_id
+ self.onu_session_id = onu_id
+
+ self.channel_group_id = pon_id # close enough for mock
+ self.channel_id = pon_id
+ self.channel_termination = pon_id
+
+
+class MockDevice(object):
+ def __init__(self, device_id, proxy_address=None, serial_number=None):
+ from pyvoltha.adapters.extensions.omci.omci_entities import entity_id_to_class_map
+ self.id = device_id
+ self.parent_id = None
+ self.proxy_address = proxy_address
+ self.serial_number = serial_number
+ self.me_map = entity_id_to_class_map
+
+
+class MockCore(object):
+ def __init__(self):
+ self.root = None # ConfigRoot(VolthaInstance())
+
+ def get_proxy(self, path):
+ return self.root.get_proxy(path)
+
+
+class MockAdapterAgent(object):
+ """
+ Minimal class to handle adapter-agent needs in OpenOMCI. It can be
+ used by a mock OLT or ONU.
+
+ So that we do not have to duplicate the IAdapter functionality, just
+ the handler, the OLT and ONU handlers are derived from a mock Device
+ base class so that we can access the _devices map and get either a
+ Device to play with (like the real thing) or the associated handler
+ """
+ def __init__(self, d=None):
+ self.log = structlog.get_logger()
+ self._devices = dict() # device-id -> mock device
+ self.core = MockCore()
+ self.deferred = d
+ self.timeout_the_message = False
+
+ @property
+ def send_omci_defer(self):
+ return self.deferred
+
+ @send_omci_defer.setter
+ def send_omci_defer(self, value):
+ self.deferred = value
+
+ @property
+ def name(self):
+ return "cig_mock_ont"
+
+ def tearDown(self):
+ """Test case cleanup"""
+ for device in self._devices.itervalues():
+ device.tearDown()
+ self._devices.clear()
+
+ def add_device(self, device):
+ self._devices[device.id] = device
+
+ def add_child_device(self, parent_device, child_device):
+ # Set parent
+ child_device.parent_id = parent_device.id
+
+ # Add ONU serial number if PON and ONU enabled
+
+ if (child_device.enabled and
+ child_device.serial_number is not None and
+ child_device.proxy_address.channel_id in parent_device.enabled_pons):
+ parent_device.activated_onus.add(child_device.serial_number)
+
+ self.add_device(child_device)
+
+ def get_device(self, device_id):
+ return self._devices[device_id]
+
+ def get_child_device(self, parent_device_id, **kwargs):
+ onu_id = kwargs.pop('onu_id', None)
+ pon_id = kwargs.pop('pon_id', None)
+ if onu_id is None and pon_id is None:
+ return None
+
+ # Get all child devices with the same parent ID
+ children_ids = set(d.id for d in self._devices.itervalues()
+ if d.parent_id == parent_device_id)
+
+ # Loop through all the child devices with this parent ID
+ for child_id in children_ids:
+ device = self.get_device(child_id)
+
+ # Does this child device match the passed in ONU ID?
+ found_onu_id = False
+ if onu_id is not None:
+ if device.proxy_address.onu_id == onu_id:
+ found_onu_id = True
+
+ # Does this child device match the passed in SERIAL NUMBER?
+ found_pon_id = False
+ if pon_id is not None:
+ if device.proxy_address.channel_id == pon_id:
+ found_pon_id = True
+ # Match ONU ID and PON ID
+ if onu_id is not None and pon_id is not None:
+ found = found_onu_id & found_pon_id
+ # Otherwise ONU ID or PON ID
+ else:
+ found = found_onu_id | found_pon_id
+
+ # Return the matched child device
+ if found:
+ return device
+
+ return None
+
+ def send_proxied_message(self, proxy_address, msg):
+ # Look up ONU handler and forward the message
+ self.log.debug("--> send_proxied_message", message=msg)
+
+ # if proxy_address is None:
+ if self.deferred is not None and not self.timeout_the_message:
+ self.deferred.callback(msg)
+ # return None
+
+ # olt_handler = self.get_device(proxy_address.device_id)
+
+ # if olt_handler is not None:
+ # olt_handler.send_proxied_message(proxy_address, msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ # Look up ONU handler and forward the message
+
+ onu_handler = self.get_child_device(proxy_address.device_id,
+ onu_id=proxy_address.onu_id,
+ pon_id=proxy_address.channel_id)
+ if onu_handler is not None:
+ onu_handler.receive_proxied_message(proxy_address, msg)
diff --git a/test/unit/extensions/omci/mock/mock_olt_handler.py b/test/unit/extensions/omci/mock/mock_olt_handler.py
new file mode 100644
index 0000000..142dbd8
--- /dev/null
+++ b/test/unit/extensions/omci/mock/mock_olt_handler.py
@@ -0,0 +1,108 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from mock_adapter_agent import MockDevice
+from nose.twistedtools import reactor
+
+
+class MockOltHandler(MockDevice):
+ """
+ VERY Minimal class to handle OLT needs in OpenOMCI testing
+
+ So that we do not have to duplicate the IAdapter functionality, just
+ the handler, the OLT and ONU handlers are derived from a mock Device
+ base class so that we can access the _devices map and get either a
+ Device to play with (like the real thing) or the associated handler
+ """
+ def __init__(self, adapter_agent, device_id):
+ super(MockOltHandler, self).__init__(device_id)
+
+ self.device_id = device_id
+ self.device = self
+ self._adapter_agent = adapter_agent
+ self._num_tx = 0
+
+ ####################################################################
+ # NOTE: The following can be manipulated in your test case to modify the behaviour
+ # of this mock.
+ #
+ # Note that activated ONUs are added during adapter add_child_device
+ # if the ONU handler associated is 'enabled'
+
+ self.enabled = True # OLT is enabled/active
+ self.activated_onus = set() # Activated ONU serial numbers
+ self.enabled_pons = range(0, 16) # Enabled PONs
+ self.max_tx = sys.maxint # Fail after this many tx requests
+ self.latency = 0.0 # OMCI response latency (keep small)
+
+ # TODO: Implement minimal functionality
+
+ # TODO: Implement minimal functionality
+
+ def tearDown(self):
+ """Test case cleanup"""
+ pass
+
+ # Begin minimal set of needed IAdapter interfaces
+
+ def send_proxied_message(self, proxy_address, msg):
+ """Check various enabled flags and status and send if okay"""
+
+ if not self.enabled:
+ return None
+
+ pon_id = proxy_address.channel_id
+
+ if pon_id not in self.enabled_pons:
+ return None
+
+ # Look up ONU device ID.
+ onu_id = proxy_address.onu_id
+ onu_handler = self._adapter_agent.get_child_device(proxy_address.device_id,
+ pon_id=pon_id,
+ onu_id=onu_id)
+
+ if onu_handler is None or not onu_handler.enabled:
+ return None
+
+ onu_mock = onu_handler.onu_mock
+ if onu_mock is None or onu_mock.serial_number not in self.activated_onus:
+ return None
+
+ # And Tx success (silent discard for OMCI timeout testing)
+ if self._num_tx >= self.max_tx:
+ return None
+ self._num_tx += 1
+
+ response = onu_mock.rx_omci_frame(msg)
+
+ # Make async and add any requested latency. Bound it to less
+ # than 5 seconds since this is a unit test that need to be
+ # somewhat responsive
+
+ assert 0.0 <= self.latency <= 5, 'Best practice is latency <= 5 seconds'
+ if response is not None:
+ reactor.callLater(self.latency, self._deliver_proxy_message, proxy_address, response)
+
+ def _deliver_proxy_message(self, proxy_address, response):
+ from common.frameio.frameio import hexify
+ self._adapter_agent.receive_proxied_message(proxy_address,
+ hexify(str(response)))
+
+ def receive_proxied_message(self, _, __):
+ assert False, 'This is never called on the OLT side of proxy messaging'
+
diff --git a/test/unit/extensions/omci/mock/mock_onu.py b/test/unit/extensions/omci/mock/mock_onu.py
new file mode 100644
index 0000000..e63c5cd
--- /dev/null
+++ b/test/unit/extensions/omci/mock/mock_onu.py
@@ -0,0 +1,283 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame
+from pyvoltha.adapters.extensions.omci.omci_defs import *
+from pyvoltha.adapters.extensions.omci.omci_entities import *
+from pyvoltha.adapters.extensions.omci.omci_messages import *
+
+# abbreviations
+OP = EntityOperations
+RC = ReasonCodes
+
+
+class MockOnu(object):
+ """
+ Minimal class that acts line an ONU. The Mock OLT handler will call into this
+ object with OMCI frames that it will respond to appropriately
+ """
+ def __init__(self, serial_number, adapter_agent, handler_id):
+ self.serial_number = serial_number
+ self._adapter_agent = adapter_agent # TODO: Remove any unused attributes
+ self._handler_id = handler_id
+ self.mib_data_sync = 0 # Assume at reboot!
+
+ # NOTE: when creating response frames, use the basic method of constructing
+ # these frames as the encoding created is unit-tested elsewhere
+ self._omci_response = {
+ OP.Get.value: {
+ CircuitPack.class_id: {
+ 257: OmciFrame(transaction_id=0, # Will get replaced
+ message_type=OmciGetResponse.message_id,
+ omci_message=OmciGetResponse(
+ entity_class=CircuitPack.class_id,
+ entity_id=0,
+ success_code=RC.Success.value,
+ attributes_mask=CircuitPack.mask_for('number_of_ports'),
+ data=OmciMaskedData('value',
+ entity_class=CircuitPack.class_id,
+ attributes_mask=CircuitPack.mask_for('number_of_ports'))
+ ))
+ },
+ # Additional OMCI GET request responses here if needed
+ },
+ OP.GetNext.value: {},
+ OP.Create.value: {
+ # TODO: Create some OMCI CREATE request responses here.
+
+ # def send_create_gal_ethernet_profile(self,
+ # entity_id,
+ # max_gem_payload_size):
+ # frame = OmciFrame(
+ # transaction_id=self.get_tx_id(),
+ # message_type=OmciCreate.message_id,
+ # omci_message=OmciCreate(
+ # entity_class=GalEthernetProfile.class_id,
+ # entity_id=entity_id,
+ # data=dict(
+ # max_gem_payload_size=max_gem_payload_size
+ # )
+ # )
+ # )
+ # self.send_omci_message(frame)
+ },
+ OP.Set.value: {
+ # TODO: Create some OMCI SET request responses here.
+
+ # def send_set_admin_state(self,
+ # entity_id,
+ # admin_state):
+ # data = dict(
+ # administrative_state=admin_state
+ # )
+ # frame = OmciFrame(
+ # transaction_id=self.get_tx_id(),
+ # message_type=OmciSet.message_id,
+ # omci_message=OmciSet(
+ # entity_class=OntG.class_id,
+ # entity_id=entity_id,
+ # attributes_mask=OntG.mask_for(*data.keys()),
+ # data=data
+ # )
+ # )
+ # self.send_omci_message(frame)
+
+ },
+ OP.Delete.value: {
+ # TODO: Create some OMCI DELETE responses here.
+ },
+ OP.MibReset.value: {
+ OntData.class_id: {
+ 0: OmciFrame(transaction_id=0, # Will get replaced
+ message_type=OmciMibResetResponse.message_id,
+ omci_message=OmciMibResetResponse(
+ entity_class=OntData.class_id,
+ entity_id=0,
+ success_code=RC.Success.value
+ ))
+ }
+ },
+ OP.MibUpload.value: {
+ OntData.class_id: {
+ 0: OmciFrame(transaction_id=0, # Will get replaced
+ message_type=OmciMibUploadResponse.message_id,
+ omci_message=OmciMibUploadResponse(
+ entity_class=OntData.class_id,
+ entity_id=0,
+ number_of_commands=3 # Should match list size for MibUploadNext below
+ ))
+ }
+ },
+ # OP.MibUploadNext.value: {
+ # OntData.class_id: {
+ # 0: [
+ # OmciFrame(transaction_id=0,
+ # message_type=OmciMibUploadNextResponse.message_id,
+ # omci_message=OmciMibUploadNextResponse(
+ # entity_class=OntData.class_id,
+ # entity_id=0,
+ # object_entity_id=0, # TODO: Pick one
+ # object_attributes_mask=0, # TODO: Pick one
+ # object_data=None # TODO: Pick one
+ # )),
+ # OmciFrame(transaction_id=0,
+ # message_type=OmciMibUploadNextResponse.message_id,
+ # omci_message=OmciMibUploadNextResponse(
+ # entity_class=OntData.class_id,
+ # entity_id=0,
+ # object_entity_id=0, # TODO: Pick one
+ # object_attributes_mask=0, # TODO: Pick one
+ # object_data=None # TODO: Pick one
+ # )),
+ # OmciFrame(transaction_id=0,
+ # message_type=OmciMibUploadNextResponse.message_id,
+ # omci_message=OmciMibUploadNextResponse(
+ # entity_class=OntData.class_id,
+ # entity_id=0,
+ # object_entity_id=0, # TODO: Pick one
+ # object_attributes_mask=0, # TODO: Pick one
+ # object_data=None # TODO: Pick one
+ # )),
+ # ]
+ # }
+ # },
+ OP.Reboot.value: {
+ OntData.class_id: {
+ 0: OmciFrame(transaction_id=0, # Will get replaced
+ message_type=OmciRebootResponse.message_id,
+ omci_message=OmciRebootResponse(
+ entity_class=OntG.class_id,
+ entity_id=0,
+ success_code=RC.Success.value
+ ))
+ }
+ },
+ }
+ # TODO: Support Autonomous ONU messages as well
+
+ def tearDown(self):
+ """Test case cleanup"""
+ pass
+
+ def _request_to_response_type(self, message_type):
+ return {
+ OP.Create.value: OmciCreateResponse,
+ OP.Delete.value: OmciDeleteResponse,
+ OP.Set.value: OmciSetResponse,
+ OP.Get.value: OmciGetResponse,
+ OP.GetNext.value: OmciGetNextResponse,
+ OP.MibUpload.value: OmciMibUploadResponse,
+ OP.MibUploadNext.value: OmciMibUploadNextResponse,
+ OP.MibReset.value: OmciMibResetResponse,
+ OP.Reboot.value: OmciRebootResponse,
+ }.get(message_type & 0x1F, None)
+
+ def rx_omci_frame(self, msg):
+ try:
+ frame = OmciFrame(msg.decode('hex'))
+ response = None
+ response_type = self._request_to_response_type(frame.fields['message_type'])
+ transaction_id = frame.fields['transaction_id']
+
+ omci_message = frame.fields.get('omci_message')
+
+ class_id = omci_message.fields.get('entity_class') \
+ if omci_message is not None else None
+ instance_id = omci_message.fields.get('entity_id') \
+ if omci_message is not None else None
+
+ # Look up hardcode responses based on class and instance ID. If found
+ # return the response, otherwise send back an error
+
+ if response_type is None:
+ status = RC.ProcessingError.value
+ elif class_id is None:
+ status = RC.UnknownEntity.value
+ elif instance_id is None:
+ status = RC.UnknownInstance.value
+ else:
+ status = RC.Success.value
+ try:
+ response_id = response_type.message_id & 0x1f
+ response = self._omci_response[response_id][class_id][instance_id]
+
+ if response_id == OP.MibUploadNext.value:
+ # Special case. Need to get requested entry
+ assert isinstance(response, list)
+ pass
+ pass
+ pass
+ pass
+
+ if isinstance(omci_message, OmciGetNext):
+ response = response[omci_message.fields['command_sequence_number']]
+
+ if isinstance(response, dict):
+ if response['failures'] > 0:
+ response['failures'] -= 1
+ return None
+ else: response = response['frame']
+
+ response.fields['transaction_id'] = transaction_id
+ if 'success_code' in response.fields['omci_message'].fields:
+ response.fields['omci_message'].fields['success_code'] = status
+
+ if status == RC.Success.value:
+ if response_type.message_id in [OmciCreateResponse.message_id,
+ OmciDeleteResponse.message_id,
+ OmciSetResponse.message_id]:
+ self.mib_data_sync += 1
+ if self.mib_data_sync > 255:
+ self.mib_data_sync = 1
+ elif response_type.message_id == OmciMibResetResponse.message_id:
+ self.mib_data_sync = 0
+
+ except KeyError as e:
+ bad_key = e.args[0]
+ if bad_key == class_id:
+ status = RC.UnknownEntity.value
+ elif bad_key == instance_id:
+ status = RC.UnknownInstance.value
+ else:
+ status = RC.ProcessingError.value
+
+ if status != RC.Success.value and \
+ response_type not in [OmciMibUploadResponse,
+ OmciMibUploadNextResponse]:
+ response = OmciFrame(transaction_id=transaction_id,
+ message_type=response_type.message_id,
+ omci_message=response_type(
+ entity_class=class_id,
+ entity_id=instance_id,
+ success_code=status
+ ))
+ return response
+
+ except Exception as e:
+ pass
+
+ @property
+ def proxy_address(self, device_id='1'):
+ if self._proxy_address is None:
+ self._proxy_address = Device.ProxyAddress(
+ device_id=device_id,
+ channel_group_id=1,
+ channel_id=1,
+ channel_termination="XGSPON",
+ onu_id=20,
+ onu_session_id=1)
+
+ return self._proxy_address
+
diff --git a/test/unit/extensions/omci/mock/mock_onu_handler.py b/test/unit/extensions/omci/mock/mock_onu_handler.py
new file mode 100644
index 0000000..9ebe1f6
--- /dev/null
+++ b/test/unit/extensions/omci/mock/mock_onu_handler.py
@@ -0,0 +1,79 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from mock_adapter_agent import MockProxyAddress, MockDevice
+from pyvoltha.adapters.extensions.omci.omci_cc import *
+from pyvoltha.adapters.extensions.omci.omci_entities import entity_id_to_class_map
+
+
+class MockOnuHandler(MockDevice):
+ """
+ Minimal class to handle ONU needs in OpenOMCI testing
+
+ So that we do not have to duplicate the IAdapter functionality, just
+ the handler, the OLT and ONU handlers are derived from a mock Device
+ base class so that we can access the _devices map and get either a
+ Device to play with (like the real thing) or the associated handler
+ """
+ def __init__(self, adapter_agent, parent_id, device_id, pon_id, onu_id):
+
+ self.proxy_address = MockProxyAddress(parent_id, pon_id, onu_id)
+ super(MockOnuHandler, self).__init__(device_id, self.proxy_address)
+
+ self.device_id = device_id
+ self.device = self
+ self._adapter_agent = adapter_agent
+
+ self.onu_mock = None
+ self.omci_cc = OMCI_CC(adapter_agent, device_id, me_map=entity_id_to_class_map)
+
+ # Items that you can change to perform various test failures
+
+ self._enabled = True
+
+ def tearDown(self):
+ """Test case cleanup"""
+ if self.onu_mock is not None:
+ self.onu_mock.tearDown()
+ self.onu_mock = None
+
+ @property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ if self._enabled != value:
+ self._enabled = value
+ olt = self._adapter_agent.get_device(self.proxy_address.device_id)
+ if olt is not None and self.proxy_address.channel_id in olt.enabled_pons:
+ if self._enabled:
+ olt.activated_onus.add(self.serial_number)
+ else:
+ olt.activated_onus.discard(self.serial_number)
+
+ # Begin minimal set of needed IAdapter interfaces
+
+ # TODO: Implement minimal functionality
+
+ def send_proxied_message(self, proxy_address, msg):
+ assert False, 'OpenOMCI will implement this for the MOCK ONU'
+
+ def receive_proxied_message(self, _, msg):
+ # Rx of OMCI message from MOCK OLT
+
+ if self.omci_cc is not None and self.enabled:
+ self.omci_cc.receive_message(msg.decode('hex'))
diff --git a/test/unit/extensions/omci/mock/mock_task.py b/test/unit/extensions/omci/mock/mock_task.py
new file mode 100644
index 0000000..aad0c60
--- /dev/null
+++ b/test/unit/extensions/omci/mock/mock_task.py
@@ -0,0 +1,94 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from pyvoltha.adapters.extensions.omci.tasks.task import Task
+from pyvoltha.common.utils.asleep import asleep
+from twisted.internet.defer import inlineCallbacks, failure
+from twisted.internet import reactor
+
+
+class SimpleTask(Task):
+ def __init__(self, omci_agent, device_id,
+ exclusive=True,
+ success=True,
+ delay=0,
+ value=None,
+ priority=Task.DEFAULT_PRIORITY,
+ watchdog_timeout=Task.DEFAULT_WATCHDOG_SECS):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param exclusive: (bool) True if the task should run by itself
+ :param success: (bool) True if the task should complete successfully
+ :param delay: (int/float) Time it takes the task to complete
+ :param priority (int) Priority of the task
+ :param watchdog_timeout (int or float) Watchdog timeout after task start
+ :param value: (various) The value (string, int, ...) to return if successful
+ or an Exception to send to the errBack if 'success'
+ is False
+ """
+ super(SimpleTask, self).__init__('Simple Mock Task',
+ omci_agent,
+ device_id,
+ exclusive=exclusive,
+ priority=priority,
+ watchdog_timeout=watchdog_timeout)
+ self._delay = delay
+ self._success = success
+ self._value = value
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(SimpleTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start MIB Synchronization tasks
+ """
+ super(SimpleTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_task)
+
+ def stop(self):
+ """
+ Shutdown MIB Synchronization tasks
+ """
+ self.cancel_deferred()
+ super(SimpleTask, self).stop()
+
+ @inlineCallbacks
+ def perform_task(self):
+ """
+ Get the 'mib_data_sync' attribute of the ONU
+ """
+ try:
+ if self._delay > 0:
+ yield asleep(self._delay)
+
+ if self._success:
+ self.deferred.callback(self._value)
+
+ self.deferred.errback(failure.Failure(self._value))
+
+ except Exception as e:
+ self.deferred.errback(failure.Failure(e))