Workflow-driver for VOLTHA2.x and EPON adapter
Change-Id: I6ecb60014f005f7691d2c7ccd1c1ef56de79a5d4
diff --git a/xos/synchronizer/__init__.py b/xos/synchronizer/__init__.py
new file mode 100644
index 0000000..8612cfd
--- /dev/null
+++ b/xos/synchronizer/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/config.yaml b/xos/synchronizer/config.yaml
new file mode 100644
index 0000000..34ff082
--- /dev/null
+++ b/xos/synchronizer/config.yaml
@@ -0,0 +1,44 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+name: ntt-workflow-driver
+core_version: ">=4.0.0"
+required_models:
+ - NttWorkflowDriverService
+ - NttWorkflowDriverServiceInstance
+ - NttWorkflowDriverOltInformation
+ - RCORDSubscriber
+ - ONUDevice
+ - TechnologyProfile
+model_policies_dir: "/opt/xos/synchronizers/ntt-workflow-driver/model_policies"
+models_dir: "/opt/xos/synchronizers/ntt-workflow-driver/models"
+event_steps_dir: "/opt/xos/synchronizers/ntt-workflow-driver/event_steps"
+logging:
+ version: 1
+ handlers:
+ console:
+ class: logging.StreamHandler
+ file:
+ class: logging.handlers.RotatingFileHandler
+ filename: /var/log/xos.log
+ maxBytes: 10485760  # 10 MiB (10 * 1024 * 1024)
+ backupCount: 5  # number of rotated log files kept next to xos.log
+ loggers:
+ 'multistructlog':  # this logger writes to both handlers listed below
+ handlers:
+ - console
+ - file
+ level: DEBUG
diff --git a/xos/synchronizer/event_steps/__init__.py b/xos/synchronizer/event_steps/__init__.py
new file mode 100644
index 0000000..8612cfd
--- /dev/null
+++ b/xos/synchronizer/event_steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/event_steps/auth_event.py b/xos/synchronizer/event_steps/auth_event.py
new file mode 100644
index 0000000..f51855c
--- /dev/null
+++ b/xos/synchronizer/event_steps/auth_event.py
@@ -0,0 +1,34 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from xossynchronizer.event_steps.eventstep import EventStep
+from helpers import NttHelpers
+
+
+class SubscriberAuthEventStep(EventStep):
+    """Kafka step: process_event stores an event's authenticationState on its service instance."""
+    topics = ["authentication.events"]
+    technology = "kafka"
+
+    def __init__(self, *args, **kwargs):
+        super(SubscriberAuthEventStep, self).__init__(*args, **kwargs)
+
+    def process_event(self, event):
+        payload = json.loads(event.value)
+        self.log.info("authentication.events: Got event for subscriber", event_value=payload)
+        instance = NttHelpers.find_or_create_ntt_si(self.model_accessor, self.log, payload)
+        self.log.debug("authentication.events: Updating service instance", si=instance)
+        instance.authentication_state = payload["authenticationState"]
+        instance.save_changed_fields(always_update_timestamp=True)
\ No newline at end of file
diff --git a/xos/synchronizer/event_steps/onu_event.py b/xos/synchronizer/event_steps/onu_event.py
new file mode 100644
index 0000000..14a95ac
--- /dev/null
+++ b/xos/synchronizer/event_steps/onu_event.py
@@ -0,0 +1,81 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+from xossynchronizer.event_steps.eventstep import EventStep
+from helpers import NttHelpers
+import requests
+from requests.auth import HTTPBasicAuth
+
+class ONUEventStep(EventStep):
+    """Kafka event step that applies "onu.events" status changes to the XOS models."""
+    topics = ["onu.events"]
+    technology = "kafka"
+
+    max_onu_retry = 50
+
+    def __init__(self, *args, **kwargs):
+        super(ONUEventStep, self).__init__(*args, **kwargs)
+
+    def process_event(self, event):
+        """Update the OLT information and service instance described by one ONU event.
+
+        "activated" stores uni_port_id and of_dpid and marks the ONU ENABLED;
+        "disabled" marks it DISABLED and asks onos-voltha to remove the subscriber;
+        any other status is only logged as a warning.
+        :param event: object whose value attribute holds the JSON-encoded event
+        :raises Exception: onos-voltha did not answer the removal with HTTP 204
+        """
+        value = json.loads(event.value)
+        self.log.info("onu.events: received event", value=value)
+        # This is needed to be compatible with both Voltha 1.7 and Voltha 2.x.
+        # It assumes a single subscriber per ONU, connected to the first port.
+        if "-" in value["serialNumber"] and not value["serialNumber"].endswith("-1"):
+            self.log.info("Skip event, only consider [serialNumber]-1 events")
+            return
+
+        ntt_oi = NttHelpers.find_or_create_ntt_oi(self.model_accessor, self.log, value)
+        ntt_oi.no_sync = False
+        ntt_oi.of_dpid = value["deviceId"]
+        ntt_oi.save_changed_fields(always_update_timestamp=True)
+        ntt_si = NttHelpers.find_or_create_ntt_si(self.model_accessor, self.log, value)
+        if value["status"] == "activated":
+            self.log.info("onu.events: activated onu", value=value)
+            ntt_si.no_sync = False
+            ntt_si.uni_port_id = long(value["portNumber"])
+            ntt_si.of_dpid = value["deviceId"]
+            ntt_si.oper_onu_status = "ENABLED"
+            ntt_si.save_changed_fields(always_update_timestamp=True)
+        elif value["status"] == "disabled":
+            self.log.info("onu.events: disabled onu, resetting the subscriber", value=value)
+            ntt_si.oper_onu_status = "DISABLED"
+            ntt_si.save_changed_fields(always_update_timestamp=True)
+
+            # The step's own logger must be used: no module-level "log" exists here.
+            self.log.debug("Removing subscriber with info",
+                           uni_port_id=ntt_si.uni_port_id, dp_id=ntt_si.of_dpid)
+
+            handle = "%s/%s" % (ntt_si.of_dpid, ntt_si.uni_port_id)
+            # TODO store URL and PORT in the vOLT Service model
+            full_url = "http://129.60.110.180:8181/onos/olt/oltapp/%s" % (handle)
+            self.log.info("Sending request to onos-voltha", url=full_url)
+
+            response = requests.delete(full_url, auth=HTTPBasicAuth("karaf", "karaf"))
+            if response.status_code != 204:
+                raise Exception("Failed to remove subscriber from onos-voltha: %s" % response.text)
+            self.log.info("Removed Subscriber from onos voltha", response=response.text)
+        else:
+            self.log.warn("onu.events: Unknown status value: %s" % value["status"], value=value)
\ No newline at end of file
diff --git a/xos/synchronizer/event_steps/test_auth_event.py b/xos/synchronizer/event_steps/test_auth_event.py
new file mode 100644
index 0000000..9b1b954
--- /dev/null
+++ b/xos/synchronizer/event_steps/test_auth_event.py
@@ -0,0 +1,95 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch, Mock
+import json
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+class TestSubscriberAuthEvent(unittest.TestCase):
+ # Exercises SubscriberAuthEventStep.process_event against the mock model accessor.
+ def setUp(self):
+ # Initialise config, logging and the mock model accessor, then build the step and a stub instance.
+ self.sys_path_save = sys.path  # NOTE(review): same list object as sys.path, not a copy
+
+ # Setting up the config module
+ from xosconfig import Config
+ config = os.path.join(test_path, "../test_config.yaml")
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+ from multistructlog import create_logger
+ log = create_logger(Config().get('logging'))
+ # END Setting up the config module
+
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("ntt-workflow-driver", "ntt-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("rcord", "rcord.xproto")])
+
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from auth_event import SubscriberAuthEventStep
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ self.model_accessor = model_accessor
+ self.log = log
+
+ self.event_step = SubscriberAuthEventStep(model_accessor=self.model_accessor, log=self.log)
+
+ self.event = Mock()
+
+ self.ntt_si = NttWorkflowDriverServiceInstance()
+ self.ntt_si.serial_number = "BRCM1234"
+ self.ntt_si.save = Mock()
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+
+ def test_authenticate_subscriber(self):
+ # An APPROVED authentication event must update and save the matching service instance.
+ self.event.value = json.dumps({
+ 'authenticationState': "APPROVED",
+ 'deviceId': "of:0000000ce2314000",
+ 'portNumber': "101",
+ 'serialNumber': "BRCM1234",
+ })
+
+ with patch.object(NttWorkflowDriverServiceInstance.objects, "get_items") as ntt_si_mock:
+
+ ntt_si_mock.return_value = [self.ntt_si]
+
+ self.event_step.process_event(self.event)
+
+ self.ntt_si.save.assert_called()
+ self.ntt_si.save.assert_called_with(
+ always_update_timestamp=True, update_fields=[
+ 'authentication_state', 'serial_number', 'updated'])
+ self.assertEqual(self.ntt_si.authentication_state, 'APPROVED')
+
+
+if __name__ == '__main__':  # true only when this file is executed directly
+ sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) # for import of helpers.py
+ unittest.main()
\ No newline at end of file
diff --git a/xos/synchronizer/event_steps/test_onu_events.py b/xos/synchronizer/event_steps/test_onu_events.py
new file mode 100644
index 0000000..22e031e
--- /dev/null
+++ b/xos/synchronizer/event_steps/test_onu_events.py
@@ -0,0 +1,226 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch, Mock
+import json
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+class TestSyncOLTDevice(unittest.TestCase):
+ # Exercises ONUEventStep.process_event against the mock model accessor.
+ def setUp(self):
+ # Initialise config and the mock model accessor, then build the step, a default event and fixtures.
+ self.sys_path_save = sys.path  # NOTE(review): same list object as sys.path, not a copy
+
+ # Setting up the config module
+ from xosconfig import Config
+ config = os.path.join(test_path, "../test_config.yaml")
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+ # END Setting up the config module
+
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("ntt-workflow-driver", "ntt-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("rcord", "rcord.xproto")])
+
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from onu_event import ONUEventStep
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ self.model_accessor = model_accessor
+ self.log = Mock()
+
+ self.event_step = ONUEventStep(model_accessor=self.model_accessor, log=self.log)
+
+ self.event = Mock()
+ self.event_dict = {
+ 'status': 'activated',
+ 'serialNumber': 'BRCM1234',
+ 'deviceId': 'of:109299321',
+ 'portNumber': '16'
+ }
+ self.event.value = json.dumps(self.event_dict)
+
+ self.pppoe = NttWorkflowDriverService(name="ntt-workflow-driver")
+
+ self.pon_port = PONPort()
+ self.pon_port.port_no = 1234
+
+ self.onu = ONUDevice()
+ self.onu.pon_port = self.pon_port
+ self.onu.serial_number = "BRCM1234"
+ self.onu.mac_address = "0a0a0a"
+
+ self.technologyProfile = TechnologyProfile()
+ self.technologyProfile.profile_id = 64
+ self.technologyProfile.profile_value = '{"profile_type": "EPON","epon_attribute": {"package_type": "A"}}'
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+
+ def test_create_instance(self):
+ # With no stored service instance or OLT information, the event must save one new service instance.
+ with patch.object(NttWorkflowDriverServiceInstance.objects, "get_items") as ntt_si_mock, \
+ patch.object(NttWorkflowDriverOltInformation.objects, "get_items") as ntt_oi_mock, \
+ patch.object(NttWorkflowDriverService.objects, "get_items") as service_mock, \
+ patch.object(NttWorkflowDriverServiceInstance, "save", autospec=True) as mock_save, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock, \
+ patch.object(TechnologyProfile.objects, "get_items") as technologyProfile_mock:
+
+ ntt_si_mock.return_value = []
+ ntt_oi_mock.return_value = []
+ service_mock.return_value = [self.pppoe]
+ onu_mock.return_value = [self.onu]
+ technologyProfile_mock.return_value = [self.technologyProfile]
+
+ self.event_step.process_event(self.event)
+
+ ntt_si = mock_save.call_args[0][0]
+
+ self.assertEqual(mock_save.call_count, 1)
+
+ self.assertEqual(ntt_si.serial_number, self.event_dict['serialNumber'])
+ self.assertEqual(ntt_si.of_dpid, self.event_dict['deviceId'])
+ self.assertEqual(ntt_si.uni_port_id, long(self.event_dict['portNumber']))
+ # Receiving an ONU event doesn't change the admin_onu_state until the model policy runs
+ self.assertEqual(ntt_si.admin_onu_state, "AWAITING")
+ self.assertEqual(ntt_si.oper_onu_status, "ENABLED")
+
+ def test_reuse_instance(self):
+ # An existing service instance must pick up the event's deviceId and portNumber.
+ si = NttWorkflowDriverServiceInstance(
+ serial_number=self.event_dict["serialNumber"],
+ of_dpid="foo",
+ uni_port_id="foo"
+ )
+
+ oi = NttWorkflowDriverOltInformation(
+ of_dpid="of:109299321"
+ )
+
+ with patch.object(NttWorkflowDriverServiceInstance.objects, "get_items") as ntt_si_mock, \
+ patch.object(NttWorkflowDriverOltInformation.objects, "get_items") as ntt_oi_mock, \
+ patch.object(NttWorkflowDriverServiceInstance, "save", autospec=True) as mock_save:
+
+ ntt_si_mock.return_value = [si]
+ ntt_oi_mock.return_value = [oi]
+
+ self.event_step.process_event(self.event)
+
+ ntt_si = mock_save.call_args[0][0]
+
+ self.assertEqual(mock_save.call_count, 1)
+
+ self.assertEqual(ntt_si.serial_number, self.event_dict['serialNumber'])
+ self.assertEqual(ntt_si.of_dpid, self.event_dict['deviceId'])
+ self.assertEqual(ntt_si.uni_port_id, long(self.event_dict['portNumber']))
+ # Receiving an ONU event doesn't change the admin_onu_state until the model policy runs
+ self.assertEqual(ntt_si.admin_onu_state, "AWAITING")
+ self.assertEqual(ntt_si.oper_onu_status, "ENABLED")
+
+ def test_disable_onu(self):  # NOTE(review): requests.delete is not patched here - confirm no network access
+ self.event_dict = {
+ 'status': 'disabled',
+ 'serialNumber': 'BRCM1234',
+ 'deviceId': 'of:109299321',
+ 'portNumber': '16',
+ }
+
+ si = NttWorkflowDriverServiceInstance(
+ serial_number=self.event_dict["serialNumber"],
+ of_dpid="foo",
+ uni_port_id="foo",
+ admin_onu_state="ENABLED",
+ oper_onu_status="ENABLED",
+ )
+
+ oi = NttWorkflowDriverOltInformation(
+ of_dpid="of:109299321"
+ )
+
+ self.event.value = json.dumps(self.event_dict)
+
+ with patch.object(NttWorkflowDriverServiceInstance.objects, "get_items") as ntt_si_mock, \
+ patch.object(NttWorkflowDriverOltInformation.objects, "get_items") as ntt_oi_mock, \
+ patch.object(NttWorkflowDriverServiceInstance, "save_changed_fields", autospec=True) as mock_save:
+ ntt_si_mock.return_value = [si]
+ ntt_oi_mock.return_value = [oi]
+
+ self.event_step.process_event(self.event)
+
+ ntt_si = mock_save.call_args[0][0]
+
+ self.assertEqual(mock_save.call_count, 1)
+
+ # Receiving an ONU event doesn't change the admin_onu_state until the model policy runs
+ self.assertEqual(ntt_si.admin_onu_state, 'ENABLED')
+ self.assertEqual(ntt_si.oper_onu_status, 'DISABLED')
+
+ def test_enable_onu(self):  # an "activated" event must change oper_onu_status only
+ self.event_dict = {
+ 'status': 'activated',
+ 'serialNumber': 'BRCM1234',
+ 'deviceId': 'of:109299321',
+ 'portNumber': '16',
+ }
+
+ si = NttWorkflowDriverServiceInstance(
+ serial_number=self.event_dict["serialNumber"],
+ of_dpid="foo",
+ uni_port_id="foo",
+ admin_onu_state="DISABLED",
+ oper_onu_status="DISABLED",
+ )
+
+ oi = NttWorkflowDriverOltInformation(
+ of_dpid="of:109299321"
+ )
+
+ self.event.value = json.dumps(self.event_dict)
+
+ with patch.object(NttWorkflowDriverServiceInstance.objects, "get_items") as ntt_si_mock, \
+ patch.object(NttWorkflowDriverOltInformation.objects, "get_items") as ntt_oi_mock, \
+ patch.object(NttWorkflowDriverServiceInstance, "save_changed_fields", autospec=True) as mock_save:
+ ntt_si_mock.return_value = [si]
+ ntt_oi_mock.return_value = [oi]
+
+ self.event_step.process_event(self.event)
+
+ ntt_si = mock_save.call_args[0][0]
+
+ self.assertEqual(mock_save.call_count, 1)
+
+ # Receiving an ONU event doesn't change the admin_onu_state until the model policy runs
+ self.assertEqual(ntt_si.admin_onu_state, 'DISABLED')
+ self.assertEqual(ntt_si.oper_onu_status, 'ENABLED')
+
+
+
+if __name__ == '__main__':  # true only when this file is executed directly
+ sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) # for import of helpers.py
+ unittest.main()
diff --git a/xos/synchronizer/helpers.py b/xos/synchronizer/helpers.py
new file mode 100644
index 0000000..7d66e8f
--- /dev/null
+++ b/xos/synchronizer/helpers.py
@@ -0,0 +1,171 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from xossynchronizer.steps.syncstep import DeferredException
+import time
+import requests
+from requests.auth import HTTPBasicAuth
+
+class NttHelpers():
+    """Lookup and validation helpers shared by the event steps of this synchronizer."""
+
+    @staticmethod
+    def _get_olt_package(model_accessor):
+        """Return TechnologyProfile 64's EPON package type, or its profile type if not EPON."""
+        tech_value = json.loads(model_accessor.TechnologyProfile.objects.get(profile_id=64).profile_value)
+        if tech_value["profile_type"] == "EPON":
+            return tech_value["epon_attribute"]["package_type"]
+        return tech_value["profile_type"]
+
+    @staticmethod
+    def _wait_for_onu(model_accessor, event):
+        """Retry every second, without limit, until the event's ONUDevice can be fetched."""
+        while True:
+            try:
+                return model_accessor.ONUDevice.objects.get(serial_number=event["serialNumber"].split("-")[0])
+            except IndexError:
+                time.sleep(1)
+
+    @staticmethod
+    def validate_onu(model_accessor, log, ntt_si):
+        """
+        Validate an ONU against the whitelist and, when valid, add its subscriber in onos-voltha.
+        It's expected that the deferred exception is managed in the caller method,
+        for example a model_policy or a sync_step.
+
+        :param ntt_si: NttWorkflowDriverServiceInstance
+        :return: [boolean, string]
+        :raises DeferredException: the ONU device cannot be fetched from XOS yet
+        :raises Exception: onos-voltha did not answer the request with HTTP 200
+        """
+        tech = NttHelpers._get_olt_package(model_accessor)
+        oss_service = ntt_si.owner.leaf_model
+
+        # See if there is a matching entry in the whitelist.
+        matching_entries = model_accessor.NttWorkflowDriverWhiteListEntry.objects.filter(owner_id=oss_service.id)
+        matching_entries = [e for e in matching_entries if e.mac_address.lower() == ntt_si.mac_address.lower()]
+
+        if not matching_entries:
+            log.warn("ONU not found in whitelist")
+            return [False, "ONU not found in whitelist"]
+
+        whitelisted = matching_entries[0]
+        try:
+            onu = model_accessor.ONUDevice.objects.get(serial_number=ntt_si.serial_number.split("-")[0])
+            pon_port = onu.pon_port
+        except IndexError:
+            raise DeferredException("ONU device %s is not know to XOS yet" % ntt_si.serial_number)
+
+        if onu.admin_state == "ADMIN_DISABLED":
+            return [False, "ONU has been manually disabled"]
+
+        if pon_port.port_no < whitelisted.pon_port_from or pon_port.port_no > whitelisted.pon_port_to:
+            log.warn("PON port is not approved.")
+            return [False, "PON port is not approved."]
+
+        if tech == "B":
+            if ntt_si.authentication_state == "DENIED":
+                # Message fixed: this branch runs when authentication WAS denied.
+                return [False, "IEEE802.1X authentication has been denied."]
+            elif ntt_si.authentication_state != "APPROVED":
+                return [True, "IEEE802.1X authentication has not been done yet."]
+
+        log.debug("Adding subscriber with info",
+                  uni_port_id=ntt_si.uni_port_id,
+                  dp_id=ntt_si.of_dpid)
+        # NOTE(review): fixed 180 s blocking wait before the onos-voltha call; rationale not visible here.
+        time.sleep(180)
+
+        onos_voltha_basic_auth = HTTPBasicAuth("karaf", "karaf")
+        handle = "%s/%s" % (ntt_si.of_dpid, ntt_si.uni_port_id)
+        # TODO store URL and PORT in the vOLT Service model
+        full_url = "http://129.60.110.180:8181/onos/olt/oltapp/%s" % (handle)
+        log.info("Sending request to onos-voltha", url=full_url)
+
+        request = requests.post(full_url, auth=onos_voltha_basic_auth)
+        if request.status_code != 200:
+            raise Exception("Failed to add subscriber in onos-voltha: %s" % request.text)
+        log.info("Added Subscriber in onos voltha", response=request.text)
+
+        return [True, "ONU has been validated"]
+
+    @staticmethod
+    def find_or_create_ntt_si(model_accessor, log, event):
+        """
+        Return the NttWorkflowDriverServiceInstance matching the event's serial number.
+
+        An existing instance gets its mac_address refreshed when the ONU device can be
+        fetched. A missing one is constructed (not saved) after waiting for that device.
+        :param event: mapping with "serialNumber", "deviceId" and "portNumber" keys
+        :return: the found or newly constructed service instance
+        """
+        try:
+            ntt_si = model_accessor.NttWorkflowDriverServiceInstance.objects.get(
+                serial_number=event["serialNumber"]
+            )
+            try:
+                onu = model_accessor.ONUDevice.objects.get(serial_number=event["serialNumber"].split("-")[0])
+                ntt_si.mac_address = onu.mac_address
+            except IndexError:
+                log.debug("NttHelpers: ONU has been deleted", si=ntt_si)
+            log.debug("NttHelpers: Found existing NttWorkflowDriverServiceInstance", si=ntt_si)
+        except IndexError:
+            # create an NttWorkflowDriverServiceInstance, the validation will be
+            # triggered in the corresponding sync step
+            onu = NttHelpers._wait_for_onu(model_accessor, event)
+            ntt_si = model_accessor.NttWorkflowDriverServiceInstance(
+                serial_number=event["serialNumber"],
+                of_dpid=event["deviceId"],
+                uni_port_id=long(event["portNumber"]),
+                mac_address=onu.mac_address,
+                # we assume there is only one NttWorkflowDriverService
+                owner=model_accessor.NttWorkflowDriverService.objects.first()
+            )
+            log.debug("NttHelpers: Created new NttWorkflowDriverServiceInstance", si=ntt_si)
+        return ntt_si
+
+    @staticmethod
+    def find_or_create_ntt_oi(model_accessor, log, event):
+        """
+        Return the NttWorkflowDriverOltInformation matching the event's device id.
+
+        An existing record gets port_no and olt_package refreshed when the ONU device can
+        be fetched. A missing one is constructed (not saved) after waiting for that device.
+        :param event: mapping with "serialNumber" and "deviceId" keys
+        :return: the found or newly constructed OLT information object
+        """
+        try:
+            ntt_oi = model_accessor.NttWorkflowDriverOltInformation.objects.get(
+                of_dpid=event["deviceId"]
+            )
+            try:
+                onu = model_accessor.ONUDevice.objects.get(serial_number=event["serialNumber"].split("-")[0])
+                ntt_oi.port_no = onu.pon_port.port_no
+                ntt_oi.olt_package = NttHelpers._get_olt_package(model_accessor)
+            except IndexError:
+                log.debug("NttHelpers: ONU has been deleted", oi=ntt_oi)
+            log.debug("NttHelpers: Found existing NttWorkflowDriverOltInformation", oi=ntt_oi)
+        except IndexError:
+            onu = NttHelpers._wait_for_onu(model_accessor, event)
+            tech = NttHelpers._get_olt_package(model_accessor)
+
+            ntt_oi = model_accessor.NttWorkflowDriverOltInformation(
+                of_dpid=event["deviceId"],
+                olt_package=tech,
+                port_no=onu.pon_port.port_no,
+                owner=model_accessor.NttWorkflowDriverService.objects.first()
+            )
+            log.debug("NttHelpers: Created new NttWorkflowDriverOltInformation", oi=ntt_oi)
+        return ntt_oi
\ No newline at end of file
diff --git a/xos/synchronizer/migrations/0001_initial.py b/xos/synchronizer/migrations/0001_initial.py
new file mode 100644
index 0000000..19cfbe8
--- /dev/null
+++ b/xos/synchronizer/migrations/0001_initial.py
@@ -0,0 +1,106 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.27 on 2020-01-14 19:25
+from __future__ import unicode_literals
+
+import core.models.xosbase_header
+from django.db import migrations, models
+import django.db.models.deletion
+import django.utils.timezone
+
+
+class Migration(migrations.Migration):
+ # Initial migration: creates the service, service instance and whitelist entry models.
+ initial = True
+
+ dependencies = [
+ ('core', '0012_backupoperation_decl_uuid'),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name='NttWorkflowDriverService',
+ fields=[
+ ('service_decl_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.Service_decl')),
+ ],
+ options={
+ 'verbose_name': 'NttWorkflowDriver Service',
+ },
+ bases=('core.service',),
+ ),
+ migrations.CreateModel(
+ name='NttWorkflowDriverServiceInstance',
+ fields=[
+ ('serviceinstance_decl_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.ServiceInstance_decl')),
+ ('serial_number', models.CharField(help_text=b'Serial number of ONU', max_length=256, unique=True)),
+ ('authentication_state', models.CharField(choices=[(b'AWAITING', b'Awaiting'), (b'STARTED', b'Started'), (b'REQUESTED', b'Requested'), (b'APPROVED', b'Approved'), (b'DENIED', b'Denied')], default=b'AWAITING', help_text=b'Subscriber authentication state', max_length=50)),
+ ('of_dpid', models.CharField(help_text=b'OLT MAC address', max_length=256)),
+ ('uni_port_id', models.IntegerField(help_text=b'ONU UNI port ID')),
+ ('admin_onu_state', models.CharField(choices=[(b'AWAITING', b'Awaiting'), (b'ENABLED', b'Enabled'), (b'DISABLED', b'Disabled')], default=b'AWAITING', help_text=b'ONU administrative state', max_length=256)),
+ ('status_message', models.CharField(blank=True, default=b'', help_text=b'Status text of current state machine state', max_length=256, null=True)),
+ ('pppoe_state', models.CharField(choices=[(b'AWAITING', b'Awaiting'), (b'INITIATED', b'Initiated'), (b'CONNECTED', b'Connected'), (b'DISCONNECTED', b'Disconnected')], default=b'AWAITING', help_text=b'State of the subscriber PPPoE session', max_length=256)),
+ ('pppoe_session_id', models.CharField(blank=True, help_text=b'Subscriber PPPoE session ID', max_length=20, null=True)),
+ ('ipcp_state', models.CharField(choices=[(b'AWAITING', b'Awaiting'), (b'CONF_ACK', b'Ack'), (b'CONF_REQUEST', b'Requested')], default=b'AWAITING', help_text=b'State of the IPCP protocol for IP address assignment', max_length=256)),
+ ('ip_address', models.CharField(blank=True, help_text=b'Subscriber IP address, learned from IPCP', max_length=20, null=True)),
+ ('mac_address', models.CharField(blank=True, help_text=b'Subscriber MAC address', max_length=20, null=True)),
+ ('oper_onu_status', models.CharField(choices=[(b'AWAITING', b'Awaiting'), (b'ENABLED', b'Enabled'), (b'DISABLED', b'Disabled')], default=b'AWAITING', help_text=b'ONU operational state', max_length=256)),
+ ],
+ options={
+ 'verbose_name': 'NttWorkflowDriver Service Instance',
+ },
+ bases=('core.serviceinstance',),
+ ),
+ migrations.CreateModel(
+ name='NttWorkflowDriverWhiteListEntry',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('created', models.DateTimeField(auto_now_add=True, help_text=b'Time this model was created')),
+ ('updated', models.DateTimeField(default=django.utils.timezone.now, help_text=b'Time this model was changed by a non-synchronizer')),
+ ('enacted', models.DateTimeField(blank=True, default=None, help_text=b'When synced, set to the timestamp of the data that was synced', null=True)),
+ ('policed', models.DateTimeField(blank=True, default=None, help_text=b'When policed, set to the timestamp of the data that was policed', null=True)),
+ ('backend_register', models.CharField(blank=True, default=b'{}', max_length=1024, null=True)),
+ ('backend_need_delete', models.BooleanField(default=False)),
+ ('backend_need_reap', models.BooleanField(default=False)),
+ ('backend_status', models.CharField(default=b'Provisioning in progress', max_length=1024)),
+ ('backend_code', models.IntegerField(default=0)),
+ ('deleted', models.BooleanField(default=False)),
+ ('write_protect', models.BooleanField(default=False)),
+ ('lazy_blocked', models.BooleanField(default=False)),
+ ('no_sync', models.BooleanField(default=False)),
+ ('no_policy', models.BooleanField(default=False)),
+ ('policy_status', models.CharField(blank=True, default=b'Policy in process', max_length=1024, null=True)),
+ ('policy_code', models.IntegerField(blank=True, default=0, null=True)),
+ ('leaf_model_name', models.CharField(help_text=b'The most specialized model in this chain of inheritance, often defined by a service developer', max_length=1024)),
+ ('backend_need_delete_policy', models.BooleanField(default=False, help_text=b'True if delete model_policy must be run before object can be reaped')),
+ ('xos_managed', models.BooleanField(default=True, help_text=b'True if xos is responsible for creating/deleting this object')),
+ ('backend_handle', models.CharField(blank=True, help_text=b'Handle used by the backend to track this object', max_length=1024, null=True)),
+ ('changed_by_step', models.DateTimeField(blank=True, default=None, help_text=b'Time this model was changed by a sync step', null=True)),
+ ('changed_by_policy', models.DateTimeField(blank=True, default=None, help_text=b'Time this model was changed by a model policy', null=True)),
+ ('mac_address', models.CharField(blank=True, help_text=b'ONU MAC Address for package A authentication', max_length=256, null=True)),
+ ('pon_port_from', models.IntegerField(help_text=b'Start ID of approved PON Port')),
+ ('pon_port_to', models.IntegerField(help_text=b'End ID of approved PON Port')),
+ ('owner', models.ForeignKey(help_text=b'NttWorkflowDriverService that owns this white list entry', on_delete=django.db.models.deletion.CASCADE, related_name='whitelist_entries', to='ntt-workflow-driver.NttWorkflowDriverService')),
+ ],
+ options={
+ 'verbose_name': 'ONU Whitelist',
+ },
+ bases=(models.Model, core.models.xosbase_header.PlModelMixIn),
+ ),
+ migrations.AlterUniqueTogether(
+ name='nttworkflowdriverwhitelistentry',
+ unique_together=set([('owner', 'mac_address')]),
+ ),
+ ]
diff --git a/xos/synchronizer/migrations/0002_auto.py b/xos/synchronizer/migrations/0002_auto.py
new file mode 100644
index 0000000..028b5cb
--- /dev/null
+++ b/xos/synchronizer/migrations/0002_auto.py
@@ -0,0 +1,64 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.11 on 2020-11-21 07:39
+from __future__ import unicode_literals
+
+import core.models.xosbase_header
+from django.db import migrations, models
+import django.db.models.deletion
+import django.utils.timezone
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('ntt-workflow-driver', '0001_initial'),
+ ]
+
+ operations = [
+
+ migrations.RemoveField(
+ model_name='nttworkflowdriverserviceinstance',
+ name='pppoe_state',
+ ),
+ migrations.RemoveField(
+ model_name='nttworkflowdriverserviceinstance',
+ name='pppoe_session_id',
+ ),
+ migrations.RemoveField(
+ model_name='nttworkflowdriverserviceinstance',
+ name='ipcp_state',
+ ),
+ migrations.RemoveField(
+ model_name='nttworkflowdriverserviceinstance',
+ name='ip_address',
+ ),
+
+ migrations.CreateModel(
+ name='NttWorkflowDriverOltInformation',
+ fields=[
+ ('serviceinstance_decl_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='core.ServiceInstance_decl')),
+ ('of_dpid', models.CharField(help_text=b'OLT MAC address', max_length=256)),
+ ('olt_location', models.CharField(blank=True, help_text=b'OLT location', max_length=254, null=True)),
+ ('olt_package', models.CharField(blank=True, help_text=b'Package', max_length=254, null=True)),
+ ('port_no', models.IntegerField(blank=True, help_text=b'Port number', null=True)),
+ ],
+ options={
+ 'verbose_name': 'NttWorkflowDriver Olt Information',
+ },
+ bases=('core.serviceinstance',),
+ ),
+ ]
\ No newline at end of file
diff --git a/xos/synchronizer/migrations/__init__.py b/xos/synchronizer/migrations/__init__.py
new file mode 100644
index 0000000..dd67de8
--- /dev/null
+++ b/xos/synchronizer/migrations/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/xos/synchronizer/model_policies/__init__.py b/xos/synchronizer/model_policies/__init__.py
new file mode 100644
index 0000000..8612cfd
--- /dev/null
+++ b/xos/synchronizer/model_policies/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/model_policies/model_policy_ntt_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/model_policy_ntt_workflow_driver_serviceinstance.py
new file mode 100644
index 0000000..1c06af7
--- /dev/null
+++ b/xos/synchronizer/model_policies/model_policy_ntt_workflow_driver_serviceinstance.py
@@ -0,0 +1,76 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+
+from xossynchronizer.model_policies.policy import Policy
+
+sync_path = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+sys.path.append(sync_path)  # must run before the helpers import below can see the parent directory
+from helpers import NttHelpers  # noqa: E402
+
+
+class DeferredException(Exception):
+    """Exception type declared by this module; nothing in this file raises it."""
+
+
+class NttWorkflowDriverServiceInstancePolicy(Policy):
+    """Model policy that applies the ONU validation result to an SI and its ONUDevice."""
+    model_name = "NttWorkflowDriverServiceInstance"
+
+    def handle_create(self, si):
+        self.logger.debug("MODEL_POLICY: handle_create for NttWorkflowDriverServiceInstance %s " % si.id)
+        self.handle_update(si)
+
+    def handle_update(self, si):
+        self.logger.debug("MODEL_POLICY: handle_update for NttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        self.process_onu_state(si)
+        si.save_changed_fields()
+
+    def process_onu_state(self, si):
+        """Validate the ONU via NttHelpers.validate_onu, then enable or disable it to match."""
+        [valid, message] = NttHelpers.validate_onu(self.model_accessor, self.logger, si)
+        si.status_message = message
+        si.admin_onu_state = "ENABLED" if valid else "DISABLED"
+        self.update_onu(si.serial_number, si.admin_onu_state)
+        if not valid:
+            si.authentication_state = "DENIED"
+        elif si.authentication_state == '':
+            si.authentication_state = "APPROVED"
+
+    def update_onu(self, serial_number, admin_state):
+        """Set admin_state on the matching ONUDevice; raise IndexError if there is none."""
+        # Only the part of the SI serial number before the first '-' is compared.
+        wanted = serial_number.lower().split('-')[0]
+        onus = self.model_accessor.ONUDevice.objects.all()
+        onu = next((o for o in onus if o.serial_number.lower() == wanted), None)
+        if onu is None:
+            raise IndexError("MODEL_POLICY: no ONUDevice with serial number [%s]" % wanted)
+        if onu.admin_state == "ADMIN_DISABLED":
+            self.logger.debug("MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
+                              (onu.serial_number, admin_state))
+        elif onu.admin_state == admin_state:
+            self.logger.debug("MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
+                              (onu.serial_number, admin_state))
+        else:
+            self.logger.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (onu.serial_number, admin_state))
+            onu.admin_state = admin_state
+            onu.save_changed_fields(always_update_timestamp=True)
+
+    def handle_delete(self, si):
+        pass
diff --git a/xos/synchronizer/model_policies/model_policy_ntt_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/model_policy_ntt_workflow_driver_whitelistentry.py
new file mode 100644
index 0000000..a3506c2
--- /dev/null
+++ b/xos/synchronizer/model_policies/model_policy_ntt_workflow_driver_whitelistentry.py
@@ -0,0 +1,80 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import sys
+from xossynchronizer.model_policies.policy import Policy
+
+sync_path = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+sys.path.append(sync_path)  # must run before the helpers import below can see the parent directory
+from helpers import NttHelpers  # noqa: E402
+
+
+class NttWorkflowDriverWhiteListEntryPolicy(Policy):
+    """Re-validate the service instances that share a whitelist entry's MAC address."""
+    model_name = "NttWorkflowDriverWhiteListEntry"
+
+    def handle_create(self, whitelist):
+        """Treat creation of a whitelist entry exactly like an update."""
+        self.handle_update(whitelist)
+
+    def _matching_service_instances(self, whitelist):
+        """Return the SIs whose MAC address equals the entry's (case-insensitive).
+
+        mac_address is optional on both models; a missing value matches nothing.
+        """
+        wanted = (whitelist.mac_address or "").lower()
+        sis = self.model_accessor.NttWorkflowDriverServiceInstance.objects.all()
+        return [si for si in sis if wanted and (si.mac_address or "").lower() == wanted]
+
+    def validate_onu_state(self, si):
+        """Set si.admin_onu_state from NttHelpers.validate_onu and save the SI.
+
+        The SI model policy will take care of updating other state.
+        """
+        [valid, message] = NttHelpers.validate_onu(self.model_accessor, self.logger, si)
+        si.admin_onu_state = "ENABLED" if valid else "DISABLED"
+
+        self.logger.debug(
+            "MODEL_POLICY: activating NttWorkflowDriverServiceInstance because of change in the whitelist",
+            si=si,
+            onu_state=si.admin_onu_state,
+            authentication_state=si.authentication_state)
+
+        si.status_message = message
+        si.save_changed_fields(always_update_timestamp=True)
+
+    def handle_update(self, whitelist):
+        """Re-validate every SI with the entry's MAC, then set backend_need_delete_policy."""
+        self.logger.debug("MODEL_POLICY: handle_update for NttWorkflowDriverWhiteListEntry", whitelist=whitelist)
+
+        for si in self._matching_service_instances(whitelist):
+            self.validate_onu_state(si)
+
+        whitelist.backend_need_delete_policy = True
+        whitelist.save_changed_fields()
+
+    def handle_delete(self, whitelist):
+        """Re-validate every SI with the entry's MAC, then set backend_need_reap."""
+        self.logger.debug("MODEL_POLICY: handle_delete for NttWorkflowDriverWhiteListEntry")
+
+        # BUG: Sometimes the delete policy is not called, because the reaper deletes (original note ends here)
+        assert(whitelist.owner)
+
+        for si in self._matching_service_instances(whitelist):
+            self.validate_onu_state(si)
+
+        whitelist.backend_need_reap = True
+        whitelist.save_changed_fields()
diff --git a/xos/synchronizer/model_policies/test_model_policy_ntt_workflow_driver_serviceinstance.py b/xos/synchronizer/model_policies/test_model_policy_ntt_workflow_driver_serviceinstance.py
new file mode 100644
index 0000000..86e20cd
--- /dev/null
+++ b/xos/synchronizer/model_policies/test_model_policy_ntt_workflow_driver_serviceinstance.py
@@ -0,0 +1,135 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+class TestModelPolicyNttWorkflowDriverServiceInstance(unittest.TestCase):
+ def setUp(self):
+
+ self.sys_path_save = sys.path
+
+ config = os.path.join(test_path, "../test_config.yaml")
+ from xosconfig import Config
+ Config.clear()
+ Config.init(config, 'synchronizer-config-schema.yaml')
+
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("ntt-workflow-driver", "ntt-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("rcord", "rcord.xproto")])
+
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from model_policy_ntt_workflow_driver_serviceinstance import NttWorkflowDriverServiceInstancePolicy, NttHelpers
+ self.NttHelpers = NttHelpers
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ # Some of the functions we call have side-effects. For example, creating a VSGServiceInstance may lead to
+ # creation of tags. Ideally, this wouldn't happen, but it does. So make sure we reset the world.
+ model_accessor.reset_all_object_stores()
+
+ self.policy = NttWorkflowDriverServiceInstancePolicy(model_accessor=model_accessor)
+ self.si = NttWorkflowDriverServiceInstance()
+ self.si.owner = NttWorkflowDriverService()
+ self.si.serial_number = "BRCM1234"
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+
+ def test_update_onu(self):
+
+ onu = ONUDevice(
+ serial_number="BRCM1234",
+ admin_state="ENABLED"
+ )
+ with patch.object(ONUDevice.objects, "get_items") as get_onu, \
+ patch.object(onu, "save") as onu_save:
+ get_onu.return_value = [onu]
+
+ self.policy.update_onu("brcm1234", "ENABLED")
+ onu_save.assert_not_called()
+
+ self.policy.update_onu("brcm1234", "DISABLED")
+ self.assertEqual(onu.admin_state, "DISABLED")
+ onu_save.assert_called_with(
+ always_update_timestamp=True, update_fields=[
+ 'admin_state', 'serial_number', 'updated'])
+
+ def test_enable_onu(self):
+ with patch.object(self.NttHelpers, "validate_onu") as validate_onu, \
+ patch.object(self.policy, "update_onu") as update_onu:
+ validate_onu.return_value = [True, "valid onu"]
+
+ self.policy.process_onu_state(self.si)
+
+ update_onu.assert_called_once()
+ update_onu.assert_called_with("BRCM1234", "ENABLED")
+
+ self.assertIn("valid onu", self.si.status_message)
+
+ def test_disable_onu(self):
+ with patch.object(self.NttHelpers, "validate_onu") as validate_onu, \
+ patch.object(self.policy, "update_onu") as update_onu:
+ validate_onu.return_value = [False, "invalid onu"]
+
+ self.policy.process_onu_state(self.si)
+
+ update_onu.assert_called_once()
+ update_onu.assert_called_with("BRCM1234", "DISABLED")
+
+ self.assertIn("invalid onu", self.si.status_message)
+
+ def test_handle_update_validate_onu(self):
+ """
+ Testing that handle_update calls validate_onu with the correct parameters
+ when necessary
+ """
+ with patch.object(self.policy, "process_onu_state") as process_onu_state, \
+ patch.object(self.policy, "update_onu") as update_onu:
+ update_onu.return_value = None
+
+ self.si.admin_onu_state = "AWAITING"
+ self.si.oper_onu_status = "AWAITING"
+ self.policy.handle_update(self.si)
+ process_onu_state.assert_called_with(self.si)
+
+ self.si.admin_onu_state = "ENABLED"
+ self.si.oper_onu_status = "ENABLED"
+ self.policy.handle_update(self.si)
+ process_onu_state.assert_called_with(self.si)
+
+ self.si.admin_onu_state = "DISABLED"
+ self.si.oper_onu_status = "DISABLED"
+ self.policy.handle_update(self.si)
+ process_onu_state.assert_called_with(self.si)
+
+if __name__ == '__main__':
+ sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+ unittest.main()
diff --git a/xos/synchronizer/model_policies/test_model_policy_ntt_workflow_driver_whitelistentry.py b/xos/synchronizer/model_policies/test_model_policy_ntt_workflow_driver_whitelistentry.py
new file mode 100644
index 0000000..32f309a
--- /dev/null
+++ b/xos/synchronizer/model_policies/test_model_policy_ntt_workflow_driver_whitelistentry.py
@@ -0,0 +1,129 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+class TestModelPolicyNttWorkflowDriverWhiteListEntry(unittest.TestCase):
+ def setUp(self):
+ self.sys_path_save = sys.path
+
+ config = os.path.join(test_path, "../test_config.yaml")
+ from xosconfig import Config
+ Config.clear()
+ Config.init(config, 'synchronizer-config-schema.yaml')
+
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("ntt-workflow-driver", "ntt-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("rcord", "rcord.xproto")])
+
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from model_policy_ntt_workflow_driver_whitelistentry import NttWorkflowDriverWhiteListEntryPolicy, NttHelpers
+ self.NttHelpers = NttHelpers
+
+ from mock_modelaccessor import MockObjectList
+ self.MockObjectList = MockObjectList
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ # Some of the functions we call have side-effects. For example, creating a VSGServiceInstance may lead to
+ # creation of tags. Ideally, this wouldn't happen, but it does. So make sure we reset the world.
+ model_accessor.reset_all_object_stores()
+
+ self.policy = NttWorkflowDriverWhiteListEntryPolicy(model_accessor=model_accessor)
+
+ self.service = NttWorkflowDriverService()
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ self.service = None
+
+ def test_enable_onu(self):
+ si = NttWorkflowDriverServiceInstance(mac_address="0a0a0a", owner_id=self.service.id, valid="invalid")
+ with patch.object(self.NttHelpers, "validate_onu") as validate_onu, \
+ patch.object(si, "save") as save_si:
+ validate_onu.return_value = [True, "valid onu"]
+
+ self.policy.validate_onu_state(si)
+
+ save_si.assert_called_once()
+ save_si.assert_called_with(
+ always_update_timestamp=True, update_fields=[
+ 'admin_onu_state', 'mac_address', 'status_message', 'updated'])
+
+ def test_disable_onu(self):
+ si = NttWorkflowDriverServiceInstance(mac_address="0a0a0a", owner_id=self.service.id, valid="invalid")
+ with patch.object(self.NttHelpers, "validate_onu") as validate_onu, \
+ patch.object(si, "save") as save_si:
+ validate_onu.return_value = [False, "invalid onu"]
+
+ self.policy.validate_onu_state(si)
+
+ save_si.assert_called_once()
+ save_si.assert_called_with(
+ always_update_timestamp=True, update_fields=[
+ 'admin_onu_state', 'mac_address', 'status_message', 'updated'])
+
+ def test_whitelist_update(self):
+ si = NttWorkflowDriverServiceInstance(mac_address="0a0a0a", owner_id=self.service.id)
+ wle = NttWorkflowDriverWhiteListEntry(mac_address="0a0a0a", owner_id=self.service.id, owner=self.service)
+ with patch.object(NttWorkflowDriverServiceInstance.objects, "get_items") as oss_si_items, \
+ patch.object(self.policy, "validate_onu_state") as validate_onu_state, \
+ patch.object(wle, "save") as wle_save:
+ oss_si_items.return_value = [si]
+
+ self.policy.handle_update(wle)
+
+ validate_onu_state.assert_called_with(si)
+ self.assertTrue(wle.backend_need_delete_policy)
+ wle_save.assert_called_with(
+ always_update_timestamp=False, update_fields=[
+ 'backend_need_delete_policy', 'mac_address', 'owner'])
+
+ def test_whitelist_delete(self):
+ si = NttWorkflowDriverServiceInstance(mac_address="0a0a0a", owner_id=self.service.id)
+ wle = NttWorkflowDriverWhiteListEntry(mac_address="0a0a0a", owner_id=self.service.id, owner=self.service)
+ with patch.object(NttWorkflowDriverServiceInstance.objects, "get_items") as oss_si_items, \
+ patch.object(self.policy, "validate_onu_state") as validate_onu_state, \
+ patch.object(wle, "save") as wle_save:
+ oss_si_items.return_value = [si]
+
+ self.policy.handle_delete(wle)
+
+ validate_onu_state.assert_called_with(si)
+ self.assertTrue(wle.backend_need_reap)
+ wle_save.assert_called_with(
+ always_update_timestamp=False, update_fields=[
+ 'backend_need_reap', 'mac_address', 'owner'])
+
+
+if __name__ == '__main__':
+ sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+ unittest.main()
diff --git a/xos/synchronizer/models/ntt-workflow-driver.xproto b/xos/synchronizer/models/ntt-workflow-driver.xproto
new file mode 100644
index 0000000..cf00c14
--- /dev/null
+++ b/xos/synchronizer/models/ntt-workflow-driver.xproto
@@ -0,0 +1,93 @@
+option name = "ntt-workflow-driver";
+option app_label = "ntt-workflow-driver";
+
+message NttWorkflowDriverService (Service){
+ option verbose_name = "NttWorkflowDriver Service";
+ option kind = "control";
+ option description = "Service that manages the EPON subscriber workflow";
+}
+
+message NttWorkflowDriverServiceInstance (ServiceInstance){
+ option owner_class_name = "NttWorkflowDriverService";
+ option verbose_name = "NttWorkflowDriver Service Instance";
+ option description = "Workflow settings for a particular ONU Device";
+ option policy_implemented = "True";
+
+ required string serial_number = 2 [
+ help_text = "Serial number of ONU",
+ max_length = 256,
+ tosca_key=True,
+ unique = True];
+ required string authentication_state = 3 [
+ help_text = "Subscriber authentication state",
+ choices = "(('AWAITING', 'Awaiting'), ('STARTED', 'Started'), ('REQUESTED', 'Requested'), ('APPROVED', 'Approved'), ('DENIED', 'Denied'), )",
+ default = "AWAITING",
+ feedback_state = True,
+ max_length = 50];
+ required string of_dpid = 4 [
+ help_text = "OLT MAC address",
+ max_length = 256];
+ required int32 uni_port_id = 5 [
+ help_text = "ONU UNI port ID"];
+ required string admin_onu_state = 6 [
+ help_text = "ONU administrative state",
+ choices = "(('AWAITING', 'Awaiting'), ('ENABLED', 'Enabled'), ('DISABLED', 'Disabled'))",
+ default = "AWAITING",
+ feedback_state = True,
+ max_length = 256];
+ optional string status_message = 7 [
+ help_text = "Status text of current state machine state",
+ default = "",
+ max_length = 256];
+
+ optional string mac_address = 10 [
+ help_text = "Subscriber MAC address",
+ feedback_state = True,
+ max_length = 20];
+ required string oper_onu_status = 11 [
+ help_text = "ONU operational state",
+ choices = "(('AWAITING', 'Awaiting'), ('ENABLED', 'Enabled'), ('DISABLED', 'Disabled'))",
+ default = "AWAITING",
+ feedback_state = True,
+ max_length = 256];
+}
+
+message NttWorkflowDriverWhiteListEntry (XOSBase) {
+ option verbose_name = "ONU Whitelist";
+ option plural = "nttworkflowdriverwhitelistentries";
+ option description = "White list entry";
+ option policy_implemented = "True";
+
+ required manytoone owner->NttWorkflowDriverService:whitelist_entries = 1:1001 [
+ help_text = "NttWorkflowDriverService that owns this white list entry",
+ db_index = True,
+ tosca_key = True];
+ optional string mac_address = 2 [
+ help_text = "ONU MAC Address for package A authentication",
+ max_length = 256,
+ tosca_key = True];
+ required int32 pon_port_from = 3 [
+ help_text = "Start ID of approved PON Port"];
+ required int32 pon_port_to = 4 [
+ help_text = "End ID of approved PON Port"];
+}
+
+message NttWorkflowDriverOltInformation (ServiceInstance){
+ option owner_class_name = "NttWorkflowDriverService";
+ option verbose_name = "NttWorkflowDriver Olt Information";
+ option description = "OLT information";
+ option policy_implemented = "True";
+
+ required string of_dpid = 2 [
+ help_text = "OLT MAC address",
+ max_length = 256];
+ optional string olt_location = 3 [
+ help_text = "OLT location",
+ max_length = 256];
+ optional string olt_package = 4 [
+ help_text = "Package",
+ max_length = 256];
+ optional uint32 port_no = 5 [
+ help_text = "Port number",
+ max_length = 256];
+}
diff --git a/xos/synchronizer/ntt-workflow-driver-synchronizer.py b/xos/synchronizer/ntt-workflow-driver-synchronizer.py
new file mode 100644
index 0000000..a1d9173
--- /dev/null
+++ b/xos/synchronizer/ntt-workflow-driver-synchronizer.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Loads config.yaml (plus mounted_config.yaml when present) and runs the synchronizer
+
+import os
+from xossynchronizer import Synchronizer
+from xosconfig import Config
+
+
+here = os.path.dirname(os.path.realpath(__file__))
+base_config_file = os.path.abspath(here + '/config.yaml')
+mounted_config_file = os.path.abspath(here + '/mounted_config.yaml')
+
+init_args = [base_config_file, 'synchronizer-config-schema.yaml']  # mounted config is appended only if the file exists
+if os.path.isfile(mounted_config_file):
+    init_args.append(mounted_config_file)
+Config.init(*init_args)
+Synchronizer().run()
diff --git a/xos/synchronizer/test_config.yaml b/xos/synchronizer/test_config.yaml
new file mode 100644
index 0000000..7e7c3c7
--- /dev/null
+++ b/xos/synchronizer/test_config.yaml
@@ -0,0 +1,31 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+name: test-NttWorkflowDriverService-config
+accessor:
+ username: xosadmin@opencord.org
+ password: "sample"
+ kind: "testframework"
+logging:
+ version: 1
+ handlers:
+ console:
+ class: logging.StreamHandler
+ loggers:
+ 'multistructlog':
+ handlers:
+ - console
+# level: DEBUG
diff --git a/xos/synchronizer/test_helpers.py b/xos/synchronizer/test_helpers.py
new file mode 100644
index 0000000..780f71d
--- /dev/null
+++ b/xos/synchronizer/test_helpers.py
@@ -0,0 +1,166 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch, call, Mock, PropertyMock
+import json
+
+import os, sys
+
+test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+class TestNttHelpers(unittest.TestCase):
+
+ def setUp(self):
+
+ self.sys_path_save = sys.path
+
+ # Setting up the config module
+ from xosconfig import Config
+ config = os.path.join(test_path, "test_config.yaml")
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+ # END Setting up the config module
+
+ from multistructlog import create_logger
+ self.log = create_logger(Config().get('logging'))
+
+ from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+ mock_modelaccessor_config(test_path, [("ntt-workflow-driver", "ntt-workflow-driver.xproto"),
+ ("olt-service", "volt.xproto"),
+ ("rcord", "rcord.xproto")])
+
+ import xossynchronizer.modelaccessor
+ import mock_modelaccessor
+ reload(mock_modelaccessor) # in case nose2 loaded it in a previous test
+ reload(xossynchronizer.modelaccessor) # in case nose2 loaded it in a previous test
+
+ from xossynchronizer.modelaccessor import model_accessor
+ from helpers import NttHelpers
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ self.helpers = NttHelpers
+ self.model_accessor = model_accessor
+
+ self._volt = VOLTService()
+ self._volt.id = 1
+
+ self.volt = Service()
+ self.volt.id = 1
+ self.volt.name = "vOLT"
+ self.volt.leaf_model = self._volt
+
+ self.pon_port = PONPort()
+ self.pon_port.port_no = 1234
+
+ self.onu = ONUDevice()
+ self.onu.pon_port = self.pon_port
+ self.onu.serial_number = "BRCM1234"
+
+ self.technologyProfile = TechnologyProfile()
+ self.technologyProfile.profile_id = 64
+ self.technologyProfile.profile_value = '{"profile_type": "EPON","epon_attribute": {"package_type": "A"}}'
+
+ self.ntt_si = NttWorkflowDriverServiceInstance(
+ serial_number="BRCM1234",
+ owner=self.volt,
+ owner_id=self.volt.id,
+ mac_address="0a0a0a",
+ of_dpid="of:1234"
+ )
+
+ self.whitelist_entry = NttWorkflowDriverWhiteListEntry(
+ mac_address="0a0a0a",
+ owner=self.volt,
+ owner_id=self.volt.id,
+ pon_port_from=1234,
+ pon_port_to=1235,
+ )
+
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+
+ def test_not_in_whitelist(self):
+
+ with patch.object(NttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(TechnologyProfile.objects, "get_items") as technologyProfile_mock:
+ whitelist_mock.return_value = []
+ technologyProfile_mock.return_value = [self.technologyProfile]
+
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.ntt_si)
+
+ self.assertFalse(res)
+ self.assertEqual(message, "ONU not found in whitelist")
+
+ def test_wrong_location_port(self):
+ self.pon_port.port_no = 666
+ with patch.object(NttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock, \
+ patch.object(TechnologyProfile.objects, "get_items") as technologyProfile_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = [self.onu]
+ technologyProfile_mock.return_value = [self.technologyProfile]
+
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.ntt_si)
+
+ self.assertFalse(res)
+ self.assertEqual(message, "PON port is not approved.")
+
+ def test_deferred_validation(self):
+ with patch.object(NttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock, \
+ patch.object(TechnologyProfile.objects, "get_items") as technologyProfile_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = []
+ technologyProfile_mock.return_value = [self.technologyProfile]
+
+ with self.assertRaises(Exception) as e:
+ self.helpers.validate_onu(self.model_accessor, self.log, self.ntt_si)
+
+ self.assertEqual(e.exception.message, "ONU device %s is not know to XOS yet" % self.ntt_si.serial_number)
+
+ def test_validating_onu(self):
+ with patch.object(NttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock, \
+ patch.object(TechnologyProfile.objects, "get_items") as technologyProfile_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = [self.onu]
+ technologyProfile_mock.return_value = [self.technologyProfile]
+
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.ntt_si)
+
+ self.assertTrue(res)
+ self.assertEqual(message, "ONU has been validated")
+
+ def test_validating_onu_uppercase(self):
+ self.whitelist_entry.mac_address = "0A0A0A"
+ with patch.object(NttWorkflowDriverWhiteListEntry.objects, "get_items") as whitelist_mock, \
+ patch.object(ONUDevice.objects, "get_items") as onu_mock, \
+ patch.object(TechnologyProfile.objects, "get_items") as technologyProfile_mock:
+ whitelist_mock.return_value = [self.whitelist_entry]
+ onu_mock.return_value = [self.onu]
+ technologyProfile_mock.return_value = [self.technologyProfile]
+
+ [res, message] = self.helpers.validate_onu(self.model_accessor, self.log, self.ntt_si)
+
+ self.assertTrue(res)
+ self.assertEqual(message, "ONU has been validated")
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file