Rework directory structures
Implement kickstarter daemon
Implement a command line interface to manage workflow registration
Refine workflow essence extractor code
Change-Id: I61fd1f497a55af501c579e70a9f6c51f32f5e15c
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/test/test_essence_extractor.py b/test/test_essence_extractor.py
new file mode 100644
index 0000000..8024611
--- /dev/null
+++ b/test/test_essence_extractor.py
@@ -0,0 +1,98 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import json
+import os
+import collections
+
+from cord_workflow_airflow_extensions.essence_extractor import EssenceExtractor
+
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+examples_dir = os.path.join(test_path, "workflow_examples")
+extension_expected_result = ".expected.json"
+
+try:
+ basestring
+except NameError:
+ basestring = str
+
+
+# convert unicode string object to plain string object
+def convert(data):
+ if isinstance(data, basestring):
+ return str(data)
+ elif isinstance(data, collections.Mapping):
+ v = {}
+ for item in data:
+ v[convert(item)] = convert(data[item])
+ return v
+ elif isinstance(data, collections.Iterable):
+ v = []
+ for item in data:
+ v.append(convert(item))
+ return v
+ else:
+ return data
+
+
+class TestEssenceExtractor(unittest.TestCase):
+ """
+ Try extract essence from all examples under workflow-examples dir.
+ Then compares results with expected solution.
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def isDagFile(self, filepath):
+ _, file_extension = os.path.splitext(filepath)
+ if file_extension == ".py":
+ return True
+ return False
+
+ def testExtract(self):
+ dags = [f for f in os.listdir(examples_dir) if self.isDagFile(f)]
+ for dag in dags:
+ dag_path = os.path.join(examples_dir, dag)
+
+ essence_extractor = EssenceExtractor()
+ essence_extractor.parse_codefile(dag_path)
+ workflow_info = essence_extractor.extract()
+
+ # find its solution file
+ expected_result_file = dag_path + extension_expected_result
+ self.assertTrue(os.path.exists(expected_result_file))
+
+ # compare content
+ with open(dag_path + extension_expected_result) as json_file:
+ # this builds a dict with unicode strings
+ expected_workflow_info_uni = json.load(json_file)
+ expected_workflow_info = convert(expected_workflow_info_uni)
+ if workflow_info != expected_workflow_info:
+ print("Expected")
+ print(expected_workflow_info)
+
+ print("We got")
+ print(workflow_info)
+ self.fail("produced result is different")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/test_kickstarter.py b/test/test_kickstarter.py
new file mode 100644
index 0000000..c121a57
--- /dev/null
+++ b/test/test_kickstarter.py
@@ -0,0 +1,41 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+
+from cord_workflow_airflow_extensions.kickstarter import check_web_live
+
+
+class TestKickstarter(unittest.TestCase):
+ """
+ Check if some private functions work.
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def testCheckWebLive(self):
+ live = check_web_live('http://google.com', 1, 3, 3)
+ self.assertTrue(live, 'failed to connect to http://google.com')
+
+ live = check_web_live('http://google.com:1234', 2, 2, 3)
+ self.assertFalse(live, 'should fail but succeeded')
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/test_workflow_ctl.py b/test/test_workflow_ctl.py
new file mode 100644
index 0000000..38090d7
--- /dev/null
+++ b/test/test_workflow_ctl.py
@@ -0,0 +1,43 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+
+from cord_workflow_airflow_extensions.workflow_ctl import register_workflow
+
+
+class TestWorkflowCtl(unittest.TestCase):
+ """
+ Check if some private functions work.
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def testRegisterWorkflow(self):
+ failed = False
+ try:
+ register_workflow(None)
+ except BaseException:
+ failed = True
+
+ self.assertTrue(failed, 'invalid args should fail')
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/workflow_examples/att_dag.py b/test/workflow_examples/att_dag.py
new file mode 100644
index 0000000..c3bd3ea
--- /dev/null
+++ b/test/workflow_examples/att_dag.py
@@ -0,0 +1,198 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler_task',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler_task',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler_task',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler_task',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler_task',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler_task',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler >> \
+ auth_event_handler >> auth_model_event_handler >> \
+ dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/test/workflow_examples/att_dag.py.expected.json b/test/workflow_examples/att_dag.py.expected.json
new file mode 100644
index 0000000..109b2a9
--- /dev/null
+++ b/test/workflow_examples/att_dag.py.expected.json
@@ -0,0 +1,126 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler_task": {
+ "children": [
+ "auth_model_event_handler_task"
+ ],
+ "parents": [
+ "onu_model_event_handler_task"
+ ]
+ },
+ "auth_model_event_handler_task": {
+ "children": [
+ "dhcp_event_handler_task"
+ ],
+ "parents": [
+ "auth_event_handler_task"
+ ]
+ },
+ "dhcp_event_handler_task": {
+ "children": [
+ "dhcp_model_event_handler_task"
+ ],
+ "parents": [
+ "auth_model_event_handler_task"
+ ]
+ },
+ "dhcp_model_event_handler_task": {
+ "parents": [
+ "dhcp_event_handler_task"
+ ]
+ },
+ "onu_event_handler_task": {
+ "children": [
+ "onu_model_event_handler_task"
+ ]
+ },
+ "onu_model_event_handler_task": {
+ "children": [
+ "auth_event_handler_task"
+ ],
+ "parents": [
+ "onu_event_handler_task"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler_task": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler_task",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler_task": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler_task"
+ },
+ "dhcp_event_handler_task": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler_task",
+ "topic": "dhcp.events"
+ },
+ "dhcp_model_event_handler_task": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "dhcp_model_event_handler_task"
+ },
+ "onu_event_handler_task": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler_task",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler_task": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler_task"
+ }
+ }
+ }
+}
diff --git a/test/workflow_examples/left_right_mix_dag.py b/test/workflow_examples/left_right_mix_dag.py
new file mode 100644
index 0000000..e734d0d
--- /dev/null
+++ b/test/workflow_examples/left_right_mix_dag.py
@@ -0,0 +1,200 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler
+auth_event_handler << onu_model_event_handler
+auth_event_handler >> auth_model_event_handler
+dhcp_event_handler << auth_model_event_handler
+dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/test/workflow_examples/left_right_mix_dag.py.expected.json b/test/workflow_examples/left_right_mix_dag.py.expected.json
new file mode 100644
index 0000000..05607ba
--- /dev/null
+++ b/test/workflow_examples/left_right_mix_dag.py.expected.json
@@ -0,0 +1,126 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "auth_model_event_handler"
+ ],
+ "parents": [
+ "onu_model_event_handler"
+ ]
+ },
+ "auth_model_event_handler": {
+ "children": [
+ "dhcp_event_handler"
+ ],
+ "parents": [
+ "auth_event_handler"
+ ]
+ },
+ "dhcp_event_handler": {
+ "children": [
+ "dhcp_model_event_handler"
+ ],
+ "parents": [
+ "auth_model_event_handler"
+ ]
+ },
+ "dhcp_model_event_handler": {
+ "parents": [
+ "dhcp_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler"
+ },
+ "dhcp_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler",
+ "topic": "dhcp.events"
+ },
+ "dhcp_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "dhcp_model_event_handler"
+ },
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ }
+}
diff --git a/test/workflow_examples/left_right_mix_dag2.py b/test/workflow_examples/left_right_mix_dag2.py
new file mode 100644
index 0000000..6fa1df1
--- /dev/null
+++ b/test/workflow_examples/left_right_mix_dag2.py
@@ -0,0 +1,196 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler << auth_event_handler
diff --git a/test/workflow_examples/left_right_mix_dag2.py.expected.json b/test/workflow_examples/left_right_mix_dag2.py.expected.json
new file mode 100644
index 0000000..7d9430b
--- /dev/null
+++ b/test/workflow_examples/left_right_mix_dag2.py.expected.json
@@ -0,0 +1,100 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "parents": [
+ "onu_event_handler",
+ "auth_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler"
+ },
+ "dhcp_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler",
+ "topic": "dhcp.events"
+ },
+ "dhcp_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "dhcp_model_event_handler"
+ },
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ }
+}
diff --git a/test/workflow_examples/multi_children_parents_dag.py b/test/workflow_examples/multi_children_parents_dag.py
new file mode 100644
index 0000000..8e59e22
--- /dev/null
+++ b/test/workflow_examples/multi_children_parents_dag.py
@@ -0,0 +1,197 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> [onu_model_event_handler, auth_model_event_handler, dhcp_model_event_handler] >> \
+ auth_event_handler >> dhcp_event_handler
\ No newline at end of file
diff --git a/test/workflow_examples/multi_children_parents_dag.py.expected.json b/test/workflow_examples/multi_children_parents_dag.py.expected.json
new file mode 100644
index 0000000..e0dc284
--- /dev/null
+++ b/test/workflow_examples/multi_children_parents_dag.py.expected.json
@@ -0,0 +1,130 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "dhcp_event_handler"
+ ],
+ "parents": [
+ "onu_model_event_handler",
+ "auth_model_event_handler",
+ "dhcp_model_event_handler"
+ ]
+ },
+ "auth_model_event_handler": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ },
+ "dhcp_event_handler": {
+ "parents": [
+ "auth_event_handler"
+ ]
+ },
+ "dhcp_model_event_handler": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler",
+ "auth_model_event_handler",
+ "dhcp_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler"
+ },
+ "dhcp_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler",
+ "topic": "dhcp.events"
+ },
+ "dhcp_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "dhcp_model_event_handler"
+ },
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ }
+}
diff --git a/test/workflow_examples/two_dags.py b/test/workflow_examples/two_dags.py
new file mode 100644
index 0000000..1906881
--- /dev/null
+++ b/test/workflow_examples/two_dags.py
@@ -0,0 +1,184 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att1 = DAG(
+ dag_id='att_workflow_onu1',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att2 = DAG(
+ dag_id='att_workflow_onu2',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att1.doc_md = __doc__
+dag_att2.doc_md = __doc__
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att1,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att1,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att2,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att2,
+)
+
+onu_event_handler >> onu_model_event_handler
+auth_event_handler >> auth_model_event_handler
diff --git a/test/workflow_examples/two_dags.py.expected.json b/test/workflow_examples/two_dags.py.expected.json
new file mode 100644
index 0000000..dac7c12
--- /dev/null
+++ b/test/workflow_examples/two_dags.py.expected.json
@@ -0,0 +1,90 @@
+{
+ "att_workflow_onu1": {
+ "dag": {
+ "dag_id": "att_workflow_onu1",
+ "local_variable": "dag_att1"
+ },
+ "dependencies": {
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "parents": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att1",
+ "dag_id": "att_workflow_onu1",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att1",
+ "dag_id": "att_workflow_onu1",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ },
+ "att_workflow_onu2": {
+ "dag": {
+ "dag_id": "att_workflow_onu2",
+ "local_variable": "dag_att2"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "auth_model_event_handler"
+ ]
+ },
+ "auth_model_event_handler": {
+ "parents": [
+ "auth_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att2",
+ "dag_id": "att_workflow_onu2",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att2",
+ "dag_id": "att_workflow_onu2",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler"
+ }
+ }
+ }
+}