Initial DT workflow
Sample PPPoE-based workflow.
Track the subscriber's PPPoE protocol state; authentication and IP address assignment are done via the PPPoE protocols (PAP, CHAP, IPCP...).
Works with the BNG app running on ONOS, which generates events in the bng.pppoe Kafka topic.
Change-Id: Iae57395dcc90d027932c790c1c36d7b3e3f3e19b
diff --git a/xos/synchronizer/event_steps/__init__.py b/xos/synchronizer/event_steps/__init__.py
new file mode 100755
index 0000000..8612cfd
--- /dev/null
+++ b/xos/synchronizer/event_steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/event_steps/onu_event.py b/xos/synchronizer/event_steps/onu_event.py
new file mode 100755
index 0000000..9f342df
--- /dev/null
+++ b/xos/synchronizer/event_steps/onu_event.py
@@ -0,0 +1,50 @@
+
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+from xossynchronizer.event_steps.eventstep import EventStep
+from helpers import DtHelpers
+
+
+class ONUEventStep(EventStep):
+    """Kafka event step that records ONU status events on the DT workflow service instance."""
+    topics = ["onu.events"]
+    technology = "kafka"
+
+    max_onu_retry = 50  # NOTE(review): not read anywhere in the visible code
+
+    def __init__(self, *args, **kwargs):
+        super(ONUEventStep, self).__init__(*args, **kwargs)
+
+    def process_event(self, event):
+        """Parse the JSON event body and update the matching service instance's ONU status."""
+        payload = json.loads(event.value)
+        self.log.info("onu.events: received event", value=payload)
+        instance = DtHelpers.find_or_create_dt_si(self.model_accessor, self.log, payload)
+        status = payload["status"]
+        if status not in ("activated", "disabled"):
+            self.log.warn("onu.events: Unknown status value: %s" % status, value=payload)
+            return
+        if status == "activated":
+            self.log.info("onu.events: activated onu", value=payload)
+            instance.no_sync = False
+            instance.uni_port_id = long(payload["portNumber"])
+            instance.of_dpid = payload["deviceId"]
+            instance.oper_onu_status = "ENABLED"
+        else:
+            self.log.info("onu.events: disabled onu, resetting the subscriber", value=payload)
+            instance.oper_onu_status = "DISABLED"
+        instance.save_changed_fields(always_update_timestamp=True)
diff --git a/xos/synchronizer/event_steps/pppoe_event.py b/xos/synchronizer/event_steps/pppoe_event.py
new file mode 100755
index 0000000..33c24fe
--- /dev/null
+++ b/xos/synchronizer/event_steps/pppoe_event.py
@@ -0,0 +1,60 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from xossynchronizer.event_steps.eventstep import EventStep
+from helpers import DtHelpers
+
+
+class SubscriberPppoeEventStep(EventStep):
+    """Kafka event step that copies BNG PPPoE event data onto the DT workflow service instance."""
+    topics = ["bng.pppoe"]
+    technology = "kafka"
+
+    # Each table maps an incoming "eventType" to the value stored on the service instance.
+    to_pppoe = {
+        "SESSION_INIT": "INITIATED",
+        "SESSION_CONFIRMATION": "CONNECTED",
+        "SESSION_TERMINATION": "DISCONNECTED"
+    }
+    to_ipcp = {
+        "IPCP_CONF_ACK": "CONF_ACK",
+        "IPCP_CONF_REQ": "CONF_REQUEST"
+    }
+    to_auth = {
+        "AUTH_REQ": "STARTED",
+        "AUTH_SUCCESS": "APPROVED",
+        "AUTH_FAILED": "DENIED"
+    }
+
+    def process_event(self, event):
+        """Parse the JSON event, map its eventType to a state field and save the addressing data."""
+        value = json.loads(event.value)
+        self.log.info("bng.pppoe: Got event for subscriber", event_value=value)
+        si = DtHelpers.find_or_create_dt_si(self.model_accessor, self.log, value)
+        self.log.debug("bng.pppoe: Updating service instance", si=si)
+        event_type = value["eventType"]
+        # The tables have disjoint keys, so at most one state field is touched per event.
+        if event_type in self.to_pppoe:
+            si.pppoe_state = self.to_pppoe[event_type]
+        elif event_type in self.to_ipcp:
+            si.ipcp_state = self.to_ipcp[event_type]
+        elif event_type in self.to_auth:
+            si.authentication_state = self.to_auth[event_type]
+        else:
+            self.log.warn("bng.pppoe: Unknown eventType: %s" % event_type, event_value=value)
+        si.ip_address = value["ipAddress"]
+        si.mac_address = value["macAddress"]
+        si.pppoe_session_id = value["sessionId"]
+        si.save_changed_fields(always_update_timestamp=True)
diff --git a/xos/synchronizer/event_steps/test_onu_events.py b/xos/synchronizer/event_steps/test_onu_events.py
new file mode 100755
index 0000000..af44867
--- /dev/null
+++ b/xos/synchronizer/event_steps/test_onu_events.py
@@ -0,0 +1,190 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch, Mock
+import json
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))  # absolute directory of this test module
+
+
+class TestSyncOLTDevice(unittest.TestCase):
+    """Exercise ONUEventStep.process_event using the generated mock model accessor."""
+
+    def setUp(self):
+        """Build the mock model accessor, the event step under test and a default event."""
+        # NOTE(review): this keeps a reference to sys.path, not a copy of it.
+        self.sys_path_save = sys.path
+
+        # Setting up the config module
+        from xosconfig import Config
+        config_file = os.path.join(test_path, "../test_config.yaml")
+        Config.clear()
+        Config.init(config_file, "synchronizer-config-schema.yaml")
+        # END Setting up the config module
+
+        from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+        mock_modelaccessor_config(test_path, [("dt-workflow-driver", "dt-workflow-driver.xproto"),
+                                              ("olt-service", "volt.xproto"),
+                                              ("rcord", "rcord.xproto")])
+
+        import xossynchronizer.modelaccessor
+        import mock_modelaccessor
+        reload(mock_modelaccessor)  # in case nose2 loaded it in a previous test
+        reload(xossynchronizer.modelaccessor)  # in case nose2 loaded it in a previous test
+
+        from xossynchronizer.modelaccessor import model_accessor
+        from onu_event import ONUEventStep
+
+        # Publish every model class as a module global so the tests can name them directly.
+        for class_name, model_class in model_accessor.all_model_classes.items():
+            globals()[class_name] = model_class
+
+        self.model_accessor = model_accessor
+        self.log = Mock()
+        self.event_step = ONUEventStep(model_accessor=self.model_accessor, log=self.log)
+
+        self.event = Mock()
+        self.event_dict = {
+            'status': 'activated',
+            'serialNumber': 'BRCM1234',
+            'deviceId': 'of:109299321',
+            'portNumber': '16'
+        }
+        self.event.value = json.dumps(self.event_dict)
+
+        self.pppoe = DtWorkflowDriverService(name="dt-workflow-driver")
+
+    def tearDown(self):
+        """Assign the list saved in setUp back to sys.path."""
+        sys.path = self.sys_path_save
+
+    def test_create_instance(self):
+        """An 'activated' event with no existing instance saves one populated from the event."""
+        with patch.object(DtWorkflowDriverServiceInstance.objects, "get_items") as si_items, \
+                patch.object(DtWorkflowDriverService.objects, "get_items") as service_items, \
+                patch.object(DtWorkflowDriverServiceInstance, "save", autospec=True) as save_spy:
+            si_items.return_value = []
+            service_items.return_value = [self.pppoe]
+
+            self.event_step.process_event(self.event)
+
+            saved_si = save_spy.call_args[0][0]
+
+            self.assertEqual(save_spy.call_count, 1)
+
+            self.assertEqual(saved_si.serial_number, self.event_dict['serialNumber'])
+            self.assertEqual(saved_si.of_dpid, self.event_dict['deviceId'])
+            self.assertEqual(saved_si.uni_port_id, long(self.event_dict['portNumber']))
+            # Receiving an ONU event doesn't change the admin_onu_state until the model policy runs
+            self.assertEqual(saved_si.admin_onu_state, "AWAITING")
+            self.assertEqual(saved_si.oper_onu_status, "ENABLED")
+
+    def test_reuse_instance(self):
+        """With an instance returned by the lookup, an 'activated' event saves its device and port."""
+        existing_si = DtWorkflowDriverServiceInstance(
+            serial_number=self.event_dict["serialNumber"],
+            of_dpid="foo",
+            uni_port_id="foo"
+        )
+
+        with patch.object(DtWorkflowDriverServiceInstance.objects, "get_items") as si_items, \
+                patch.object(DtWorkflowDriverServiceInstance, "save", autospec=True) as save_spy:
+            si_items.return_value = [existing_si]
+
+            self.event_step.process_event(self.event)
+
+            saved_si = save_spy.call_args[0][0]
+
+            self.assertEqual(save_spy.call_count, 1)
+
+            self.assertEqual(saved_si.serial_number, self.event_dict['serialNumber'])
+            self.assertEqual(saved_si.of_dpid, self.event_dict['deviceId'])
+            self.assertEqual(saved_si.uni_port_id, long(self.event_dict['portNumber']))
+            # Receiving an ONU event doesn't change the admin_onu_state until the model policy runs
+            self.assertEqual(saved_si.admin_onu_state, "AWAITING")
+            self.assertEqual(saved_si.oper_onu_status, "ENABLED")
+
+    def test_disable_onu(self):
+        """A 'disabled' event sets oper_onu_status to DISABLED and leaves admin_onu_state alone."""
+        self.event_dict = {
+            'status': 'disabled',
+            'serialNumber': 'BRCM1234',
+            'deviceId': 'of:109299321',
+            'portNumber': '16',
+        }
+
+        existing_si = DtWorkflowDriverServiceInstance(
+            serial_number=self.event_dict["serialNumber"],
+            of_dpid="foo",
+            uni_port_id="foo",
+            admin_onu_state="ENABLED",
+            oper_onu_status="ENABLED",
+        )
+        self.event.value = json.dumps(self.event_dict)
+
+        with patch.object(DtWorkflowDriverServiceInstance.objects, "get_items") as si_items, \
+                patch.object(DtWorkflowDriverServiceInstance, "save_changed_fields", autospec=True) as save_spy:
+            si_items.return_value = [existing_si]
+
+            self.event_step.process_event(self.event)
+
+            saved_si = save_spy.call_args[0][0]
+
+            self.assertEqual(save_spy.call_count, 1)
+
+            # Receiving an ONU event doesn't change the admin_onu_state until the model policy runs
+            self.assertEqual(saved_si.admin_onu_state, 'ENABLED')
+            self.assertEqual(saved_si.oper_onu_status, 'DISABLED')
+
+    def test_enable_onu(self):
+        """An 'activated' event sets oper_onu_status to ENABLED and leaves admin_onu_state alone."""
+        self.event_dict = {
+            'status': 'activated',
+            'serialNumber': 'BRCM1234',
+            'deviceId': 'of:109299321',
+            'portNumber': '16',
+        }
+
+        existing_si = DtWorkflowDriverServiceInstance(
+            serial_number=self.event_dict["serialNumber"],
+            of_dpid="foo",
+            uni_port_id="foo",
+            admin_onu_state="DISABLED",
+            oper_onu_status="DISABLED",
+        )
+        self.event.value = json.dumps(self.event_dict)
+
+        with patch.object(DtWorkflowDriverServiceInstance.objects, "get_items") as si_items, \
+                patch.object(DtWorkflowDriverServiceInstance, "save_changed_fields", autospec=True) as save_spy:
+            si_items.return_value = [existing_si]
+
+            self.event_step.process_event(self.event)
+
+            saved_si = save_spy.call_args[0][0]
+
+            self.assertEqual(save_spy.call_count, 1)
+
+            # Receiving an ONU event doesn't change the admin_onu_state until the model policy runs
+            self.assertEqual(saved_si.admin_onu_state, 'DISABLED')
+            self.assertEqual(saved_si.oper_onu_status, 'ENABLED')
+
+
+
+if __name__ == '__main__':
+    sys.path.append(os.path.join(test_path, ".."))  # for import of helpers.py
+    unittest.main()
diff --git a/xos/synchronizer/event_steps/test_pppoe_event.py b/xos/synchronizer/event_steps/test_pppoe_event.py
new file mode 100755
index 0000000..4932dab
--- /dev/null
+++ b/xos/synchronizer/event_steps/test_pppoe_event.py
@@ -0,0 +1,110 @@
+# Copyright 2020-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch, Mock
+import json
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))  # absolute directory of this test module
+
+
+class TestSubscriberAuthEvent(unittest.TestCase):
+    """Exercise SubscriberPppoeEventStep.process_event using the generated mock model accessor."""
+
+    def setUp(self):
+        """Build the mock model accessor, the event step under test and a service instance."""
+        # NOTE(review): this keeps a reference to sys.path, not a copy of it.
+        self.sys_path_save = sys.path
+
+        # Setting up the config module
+        from xosconfig import Config
+        config_file = os.path.join(test_path, "../test_config.yaml")
+        Config.clear()
+        Config.init(config_file, "synchronizer-config-schema.yaml")
+        from multistructlog import create_logger
+        logger = create_logger(Config().get('logging'))
+        # END Setting up the config module
+
+        from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
+        mock_modelaccessor_config(test_path, [("dt-workflow-driver", "dt-workflow-driver.xproto"),
+                                              ("olt-service", "volt.xproto"),
+                                              ("rcord", "rcord.xproto")])
+
+        import xossynchronizer.modelaccessor
+        import mock_modelaccessor
+        reload(mock_modelaccessor)  # in case nose2 loaded it in a previous test
+        reload(xossynchronizer.modelaccessor)  # in case nose2 loaded it in a previous test
+
+        from xossynchronizer.modelaccessor import model_accessor
+        from pppoe_event import SubscriberPppoeEventStep
+
+        # Publish every model class as a module global so the tests can name them directly.
+        for class_name, model_class in model_accessor.all_model_classes.items():
+            globals()[class_name] = model_class
+
+        self.model_accessor = model_accessor
+        self.log = logger
+
+        self.event_step = SubscriberPppoeEventStep(model_accessor=self.model_accessor, log=self.log)
+
+        self.event = Mock()
+
+        # NOTE(review): self.volt is not used by the test in this class.
+        self.volt = Mock()
+        self.volt.name = "vOLT"
+        self.volt.leaf_model = Mock()
+
+        self.mac_address = "00:AA:00:00:00:01"
+        self.ip_address = "192.168.3.5"
+        self.pppoe_session_id = "12"
+
+        self.si = DtWorkflowDriverServiceInstance()
+        self.si.serial_number = "BRCM1234"
+        self.si.save = Mock()
+
+    def tearDown(self):
+        """Assign the list saved in setUp back to sys.path."""
+        sys.path = self.sys_path_save
+
+    def test_ipcp_subscriber(self):
+        """An IPCP_CONF_ACK event saves the instance with the IPCP state, MAC, IP and session id."""
+        self.event.value = json.dumps({
+            "deviceId": "of:0000000000000001",
+            "portNumber": "1",
+            "macAddress": self.mac_address,
+            "ipAddress": self.ip_address,
+            "sessionId": self.pppoe_session_id,
+            "eventType": "IPCP_CONF_ACK",
+            'serialNumber': "BRCM1234",
+        })
+
+        with patch.object(DtWorkflowDriverServiceInstance.objects, "get_items") as si_items:
+
+            si_items.return_value = [self.si]
+
+            self.event_step.process_event(self.event)
+
+            self.si.save.assert_called()
+            self.assertEqual(self.si.ipcp_state, "CONF_ACK")
+            self.assertEqual(self.si.mac_address, self.mac_address)
+            self.assertEqual(self.si.ip_address, self.ip_address)
+            self.assertEqual(self.si.pppoe_session_id, self.pppoe_session_id)
+
+
+if __name__ == '__main__':
+    sys.path.append(os.path.join(test_path, ".."))  # for import of helpers.py
+    unittest.main()