Implement workflow essence extractor
- Extract useful information from airflow workflow code
- Produce "essence" as a json output
- Output will be passed to workflow controller for workflow management
No source code change.
Change-Id: I01de9939fdf699522e81c369676c33c73a38b4bc
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7e0d89e
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+.noseids
+.vscode
+build
+dist
+.coverage
+coverage.xml
+cover
+.tox
+.DS_Store
+nose2-results.xml
+venv-service
+*.pyc
\ No newline at end of file
diff --git a/.gitreview b/.gitreview
new file mode 100644
index 0000000..1c0dbdc
--- /dev/null
+++ b/.gitreview
@@ -0,0 +1,5 @@
+[gerrit]
+host=gerrit.opencord.org
+port=29418
+project=cord-workflow-airflow.git
+defaultremote=origin
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..5e0520b
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,58 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# set default shell
+SHELL = bash -e -o pipefail
+
+# Variables
+VERSION ?= $(shell cat ./VERSION)
+
+## Testing related
+CORDWORKFLOWAIRFLOW_LIBRARIES := $(wildcard lib/*)
+
+# Targets
+all: test
+
+# Create a virtualenv and install all the libraries
+venv-workflowengine:
+ virtualenv $@;\
+ source ./$@/bin/activate ; set -u ;\
+ pip install -r requirements.txt nose2 ;\
+ pip install -e lib/cord-workflow-essence-extractor
+
+# tests
+test: lib-test unit-test
+
+lib-test:
+ for lib in $(CORDWORKFLOWAIRFLOW_LIBRARIES); do pushd $$lib; tox; popd; done
+
+unit-test:
+ tox
+
+clean:
+ find . -name '*.pyc' | xargs rm -f
+ find . -name '__pycache__' | xargs rm -rf
+ rm -rf \
+ .coverage \
+ coverage.xml \
+ nose2-results.xml \
+ venv-workflowengine \
+ lib/*/.tox \
+ lib/*/build \
+ lib/*/dist \
+ lib/*/*.egg-info \
+ lib/*/.coverage \
+ lib/*/coverage.xml \
+ lib/*/*results.xml \
+ lib/*/*/VERSION
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ea09804
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# cord_workflow_airflow
+
+A workflow engine for CORD implemented on top of Airflow.
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..b9fd26f
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+0.1.0-dev1
\ No newline at end of file
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 0000000..cd1437d
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,3 @@
+# Workflow examples
+
+This directory contains workflow examples.
diff --git a/examples/att-workflow/README.md b/examples/att-workflow/README.md
new file mode 100644
index 0000000..e211d2a
--- /dev/null
+++ b/examples/att-workflow/README.md
@@ -0,0 +1,5 @@
+# AT&T workflow example
+
+This is not a working version. This code is only to show how a new workflow implementation will look like.
+
+Original workfing version of code implemented in Synchronizer is at [att-workflow-driver](https://github.com/opencord/att-workflow-driver).
\ No newline at end of file
diff --git a/examples/att-workflow/__init__.py b/examples/att-workflow/__init__.py
new file mode 100644
index 0000000..1eb71b1
--- /dev/null
+++ b/examples/att-workflow/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/examples/att-workflow/att_dag.py b/examples/att-workflow/att_dag.py
new file mode 100644
index 0000000..7df5d03
--- /dev/null
+++ b/examples/att-workflow/att_dag.py
@@ -0,0 +1,198 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler >> \
+ auth_event_handler >> auth_model_event_handler >> \
+ dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/examples/att-workflow/att_helpers.py b/examples/att-workflow/att_helpers.py
new file mode 100644
index 0000000..2abd2ab
--- /dev/null
+++ b/examples/att-workflow/att_helpers.py
@@ -0,0 +1,79 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.steps.syncstep import DeferredException
+
+def validate_onu(model_accessor, logging, att_si):
+ """
+ This method validate an ONU against the whitelist and set the appropriate state.
+ It's expected that the deferred exception is managed in the caller method,
+ for example a model_policy or a sync_step.
+
+ :param att_si: AttWorkflowDriverServiceInstance
+ :return: [boolean, string]
+ """
+
+ oss_service = att_si.owner.leaf_model
+
+ # See if there is a matching entry in the whitelist.
+ matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
+ owner_id=oss_service.id,
+ )
+ matching_entries = [e for e in matching_entries if e.serial_number.lower() == att_si.serial_number.lower()]
+
+ if len(matching_entries) == 0:
+ logging.warn("ONU not found in whitelist", object=str(att_si), serial_number=att_si.serial_number, **att_si.tologdict())
+ return [False, "ONU not found in whitelist"]
+
+ whitelisted = matching_entries[0]
+ try:
+ onu = model_accessor.ONUDevice.objects.get(serial_number=att_si.serial_number)
+ pon_port = onu.pon_port
+ except IndexError:
+ raise DeferredException("ONU device %s is not know to XOS yet" % att_si.serial_number)
+
+ if onu.admin_state == "ADMIN_DISABLED":
+ return [False, "ONU has been manually disabled"]
+
+ if pon_port.port_no != whitelisted.pon_port_id or att_si.of_dpid != whitelisted.device_id:
+ logging.warn("ONU disable as location don't match",
+ object=str(att_si),
+ serial_number=att_si.serial_number,
+ pon_port=pon_port.port_no,
+ whitelisted_pon_port=whitelisted.pon_port_id,
+ device_id=att_si.of_dpid,
+ whitelisted_device_id=whitelisted.device_id,
+ **att_si.tologdict())
+ return [False, "ONU activated in wrong location"]
+
+ return [True, "ONU has been validated"]
+
+def find_or_create_att_si(model_accessor, logging, event):
+ try:
+ att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
+ serial_number=event["serialNumber"]
+ )
+ logging.debug("AttHelpers: Found existing AttWorkflowDriverServiceInstance", si=att_si)
+ except IndexError:
+ # create an AttWorkflowDriverServiceInstance, the validation will be
+ # triggered in the corresponding sync step
+ att_si = model_accessor.AttWorkflowDriverServiceInstance(
+ serial_number=event["serialNumber"],
+ of_dpid=event["deviceId"],
+ uni_port_id=long(event["portNumber"]),
+ # we assume there is only one AttWorkflowDriverService
+ owner=model_accessor.AttWorkflowDriverService.objects.first()
+ )
+ logging.debug("AttHelpers: Created new AttWorkflowDriverServiceInstance", si=att_si)
+ return att_si
\ No newline at end of file
diff --git a/examples/att-workflow/att_service_instance_funcs.py b/examples/att-workflow/att_service_instance_funcs.py
new file mode 100644
index 0000000..df179f9
--- /dev/null
+++ b/examples/att-workflow/att_service_instance_funcs.py
@@ -0,0 +1,190 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from att_helpers import *
+
+# Check the whitelist to see if the ONU is valid. If it is, make sure that it's enabled.
+def process_onu_state(model_accessor, si):
+ [valid, message] = validate_onu(model_accessor, logging, si)
+ si.status_message = message
+ if valid:
+ si.admin_onu_state = "ENABLED"
+ update_onu(model_accessor, si.serial_number, "ENABLED")
+ else:
+ si.admin_onu_state = "DISABLED"
+ update_onu(model_accessor, si.serial_number, "DISABLED")
+
+
+# If the ONU has been disabled then we force re-authentication when it
+# is re-enabled.
+# Setting si.authentication_state = AWAITING:
+# -> subscriber status = "awaiting_auth"
+# -> service chain deleted
+# -> need authentication to restore connectivity after ONU enabled
+def process_auth_state(si):
+ auth_msgs = {
+ "AWAITING": " - Awaiting Authentication",
+ "REQUESTED": " - Authentication requested",
+ "STARTED": " - Authentication started",
+ "APPROVED": " - Authentication succeeded",
+ "DENIED": " - Authentication denied"
+ }
+ if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
+ si.authentication_state = "AWAITING"
+ else:
+ si.status_message += auth_msgs[si.authentication_state]
+
+
+# The DhcpL2Relay ONOS app generates events that update the fields below.
+# It only sends events when it processes DHCP packets. It keeps no internal state.
+# We reset dhcp_state when:
+# si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
+# -> subscriber status = "awaiting_auth"
+# -> service chain not present
+# -> subscriber's OLT flow rules, xconnect not present
+# -> DHCP packets won't go through
+# Note, however, that the DHCP state at the endpoints is not changed.
+# A previously issued DHCP lease may still be valid.
+def process_dhcp_state(si):
+ if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+ si.ip_address = ""
+ si.mac_address = ""
+ si.dhcp_state = "AWAITING"
+
+
+# Make sure the object is in a legitimate state
+# It should be after the above processing steps
+# However this might still fail if an event has fired in the meantime
+# Valid states:
+# ONU | Auth | DHCP
+# ===============================
+# AWAITING | AWAITING | AWAITING
+# ENABLED | * | AWAITING
+# ENABLED | APPROVED | *
+# DISABLED | AWAITING | AWAITING
+def validate_states(si):
+ if (si.admin_onu_state == "AWAITING" or si.admin_onu_state ==
+ "DISABLED") and si.authentication_state == "AWAITING" and si.dhcp_state == "AWAITING":
+ return
+ if si.admin_onu_state == "ENABLED" and (si.authentication_state == "APPROVED" or si.dhcp_state == "AWAITING"):
+ return
+ logging.warning(
+ "MODEL_POLICY (validate_states): invalid state combination",
+ onu_state=si.admin_onu_state,
+ auth_state=si.authentication_state,
+ dhcp_state=si.dhcp_state)
+
+
+def update_onu(model_accessor, serial_number, admin_state):
+ onu = [onu for onu in model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
+ == serial_number.lower()][0]
+ if onu.admin_state == "ADMIN_DISABLED":
+ logging.debug(
+ "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
+ (serial_number, admin_state))
+ return
+ if onu.admin_state == admin_state:
+ logging.debug(
+ "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
+ (serial_number, admin_state))
+ else:
+ logging.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (serial_number, admin_state))
+ onu.admin_state = admin_state
+ onu.save_changed_fields(always_update_timestamp=True)
+
+
+def get_subscriber(model_accessor, serial_number):
+ try:
+ return [s for s in model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
+ == serial_number.lower()][0]
+ except IndexError:
+ # If the subscriber doesn't exist we don't do anything
+ logging.debug(
+ "MODEL_POLICY: subscriber does not exists for this SI, doing nothing",
+ onu_device=serial_number)
+ return None
+
+
+def update_subscriber_ip(model_accessor, subscriber, ip):
+ # TODO check if the subscriber has an IP and update it,
+ # or create a new one
+ try:
+ ip = model_accessor.RCORDIpAddress.objects.filter(
+ subscriber_id=subscriber.id,
+ ip=ip
+ )[0]
+ logging.debug("MODEL_POLICY: found existing RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
+ ip.save_changed_fields()
+ except IndexError:
+ logging.debug(
+ "MODEL_POLICY: Creating new RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device,
+ subscriber_status=subscriber.status,
+ ip=ip)
+ ip = model_accessor.RCORDIpAddress(
+ subscriber_id=subscriber.id,
+ ip=ip,
+ description="DHCP Assigned IP Address"
+ )
+ ip.save()
+
+
+def delete_subscriber_ip(model_accessor, subscriber, ip):
+ try:
+ ip = model_accessor.RCORDIpAddress.objects.filter(
+ subscriber_id=subscriber.id,
+ ip=ip
+ )[0]
+ logging.debug(
+ "MODEL_POLICY: delete RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device,
+ subscriber_status=subscriber.status,
+ ip=ip)
+ ip.delete()
+ except BaseException:
+ logging.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
+
+
+def update_subscriber(model_accessor, subscriber, si):
+ cur_status = subscriber.status
+ # Don't change state if someone has disabled the subscriber
+ if subscriber.status != "disabled":
+ if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+ subscriber.status = "awaiting-auth"
+ elif si.authentication_state == "APPROVED":
+ subscriber.status = "enabled"
+ elif si.authentication_state == "DENIED":
+ subscriber.status = "auth-failed"
+
+ # NOTE we save the subscriber only if:
+ # - the status has changed
+ # - we get a DHCPACK event
+ if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
+ logging.debug(
+ "MODEL_POLICY: updating subscriber",
+ onu_device=subscriber.onu_device,
+ authentication_state=si.authentication_state,
+ subscriber_status=subscriber.status)
+ if subscriber.status == "awaiting-auth":
+ delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
+ subscriber.mac_address = ""
+ elif si.ip_address and si.mac_address:
+ update_subscriber_ip(model_accessor, subscriber, si.ip_address)
+ subscriber.mac_address = si.mac_address
+ subscriber.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.debug("MODEL_POLICY: subscriber status has not changed", onu_device=subscriber.onu_device,
+ authentication_state=si.authentication_state, subscriber_status=subscriber.status)
diff --git a/lib/cord-workflow-essence-extractor/.gitignore b/lib/cord-workflow-essence-extractor/.gitignore
new file mode 100644
index 0000000..f3a7ffa
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/.gitignore
@@ -0,0 +1,11 @@
+.noseids
+build
+cordworkflowessenceextractor.egg-info
+dist
+.coverage
+coverage.xml
+cover
+.DS_Store
+
+# setup.py copies this, don't commit it
+VERSION
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/MANIFEST.in b/lib/cord-workflow-essence-extractor/MANIFEST.in
new file mode 100644
index 0000000..b3a0b9c
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/MANIFEST.in
@@ -0,0 +1,3 @@
+include README.rst
+include requirements.txt
+include cordworkflowessenceextractor/VERSION
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/README.rst b/lib/cord-workflow-essence-extractor/README.rst
new file mode 100644
index 0000000..680d130
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/README.rst
@@ -0,0 +1,11 @@
+CORD Workflow Essence Extractor
+===============================
+
+Extract workflow essence from Airflow workflow code. The essence information
+will be later passed to Workflow Controller for control.
+
+The essence extractor extracts following information from Airflow workflow code written in python:
+- DAG information (DAG ID)
+- Operators (class name, event and other parameters)
+- Task dependencies (parents and children)
+
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py
new file mode 100644
index 0000000..8190914
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py
@@ -0,0 +1,96 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import json
+import cordworkflowessenceextractor.workflow_essence_extractor as extractor
+
+import os
+import collections
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+examples_dir = os.path.join(test_path, "workflow-examples")
+extension_expected_result = ".expected.json"
+
+try:
+ basestring
+except NameError:
+ basestring = str
+
+
+def convert(data):
+ if isinstance(data, basestring):
+ return str(data)
+ elif isinstance(data, collections.Mapping):
+ v = {}
+ for item in data:
+ v[convert(item)] = convert(data[item])
+ return v
+ elif isinstance(data, collections.Iterable):
+ v = []
+ for item in data:
+ v.append(convert(item))
+ return v
+ else:
+ return data
+
+
+class TestParse(unittest.TestCase):
+
+ """
+ Try parse all examples under workflow-examples dir.
+ Then compares results with expected solution.
+ """
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def isDagFile(self, filepath):
+ _, file_extension = os.path.splitext(filepath)
+ if file_extension == ".py":
+ return True
+ return False
+
+ def test_parse(self):
+ dags = [f for f in os.listdir(examples_dir) if self.isDagFile(f)]
+
+ for dag in dags:
+ dag_path = os.path.join(examples_dir, dag)
+ tree = extractor.parse_codefile(dag_path)
+ workflow_info = extractor.extract_all(tree)
+
+ # check if its expected solution fil
+ expected_result_file = dag_path + extension_expected_result
+ self.assertTrue(os.path.exists(expected_result_file))
+
+ # compare content
+ with open(dag_path + extension_expected_result) as json_file:
+ # this builds a dict with unicode strings
+ expected_workflow_info_uni = json.load(json_file)
+ expected_workflow_info = convert(expected_workflow_info_uni)
+ if workflow_info != expected_workflow_info:
+ print("Expected")
+ print(expected_workflow_info)
+
+ print("We got")
+ print(workflow_info)
+ self.fail("produced result is different")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py
new file mode 100644
index 0000000..c3bd3ea
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py
@@ -0,0 +1,198 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler_task',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler_task',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler_task',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler_task',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler_task',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler_task',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler >> \
+ auth_event_handler >> auth_model_event_handler >> \
+ dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py.expected.json
new file mode 100644
index 0000000..109b2a9
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py.expected.json
@@ -0,0 +1,126 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler_task": {
+ "children": [
+ "auth_model_event_handler_task"
+ ],
+ "parents": [
+ "onu_model_event_handler_task"
+ ]
+ },
+ "auth_model_event_handler_task": {
+ "children": [
+ "dhcp_event_handler_task"
+ ],
+ "parents": [
+ "auth_event_handler_task"
+ ]
+ },
+ "dhcp_event_handler_task": {
+ "children": [
+ "dhcp_model_event_handler_task"
+ ],
+ "parents": [
+ "auth_model_event_handler_task"
+ ]
+ },
+ "dhcp_model_event_handler_task": {
+ "parents": [
+ "dhcp_event_handler_task"
+ ]
+ },
+ "onu_event_handler_task": {
+ "children": [
+ "onu_model_event_handler_task"
+ ]
+ },
+ "onu_model_event_handler_task": {
+ "children": [
+ "auth_event_handler_task"
+ ],
+ "parents": [
+ "onu_event_handler_task"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler_task": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler_task",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler_task": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler_task"
+ },
+ "dhcp_event_handler_task": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler_task",
+ "topic": "dhcp.events"
+ },
+ "dhcp_model_event_handler_task": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "dhcp_model_event_handler_task"
+ },
+ "onu_event_handler_task": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler_task",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler_task": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler_task"
+ }
+ }
+ }
+}
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py
new file mode 100644
index 0000000..e734d0d
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py
@@ -0,0 +1,200 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler
+auth_event_handler << onu_model_event_handler
+auth_event_handler >> auth_model_event_handler
+dhcp_event_handler << auth_model_event_handler
+dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py.expected.json
new file mode 100644
index 0000000..05607ba
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py.expected.json
@@ -0,0 +1,126 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "auth_model_event_handler"
+ ],
+ "parents": [
+ "onu_model_event_handler"
+ ]
+ },
+ "auth_model_event_handler": {
+ "children": [
+ "dhcp_event_handler"
+ ],
+ "parents": [
+ "auth_event_handler"
+ ]
+ },
+ "dhcp_event_handler": {
+ "children": [
+ "dhcp_model_event_handler"
+ ],
+ "parents": [
+ "auth_model_event_handler"
+ ]
+ },
+ "dhcp_model_event_handler": {
+ "parents": [
+ "dhcp_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler"
+ },
+ "dhcp_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler",
+ "topic": "dhcp.events"
+ },
+ "dhcp_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "dhcp_model_event_handler"
+ },
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ }
+}
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py
new file mode 100644
index 0000000..6fa1df1
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py
@@ -0,0 +1,196 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler << auth_event_handler
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py.expected.json
new file mode 100644
index 0000000..7d9430b
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py.expected.json
@@ -0,0 +1,100 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "parents": [
+ "onu_event_handler",
+ "auth_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler"
+ },
+ "dhcp_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler",
+ "topic": "dhcp.events"
+ },
+ "dhcp_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "dhcp_model_event_handler"
+ },
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ }
+}
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py
new file mode 100644
index 0000000..8e59e22
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py
@@ -0,0 +1,197 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> [onu_model_event_handler, auth_model_event_handler, dhcp_model_event_handler] >> \
+ auth_event_handler >> dhcp_event_handler
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py.expected.json
new file mode 100644
index 0000000..e0dc284
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py.expected.json
@@ -0,0 +1,130 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "dhcp_event_handler"
+ ],
+ "parents": [
+ "onu_model_event_handler",
+ "auth_model_event_handler",
+ "dhcp_model_event_handler"
+ ]
+ },
+ "auth_model_event_handler": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ },
+ "dhcp_event_handler": {
+ "parents": [
+ "auth_event_handler"
+ ]
+ },
+ "dhcp_model_event_handler": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler",
+ "auth_model_event_handler",
+ "dhcp_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler"
+ },
+ "dhcp_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler",
+ "topic": "dhcp.events"
+ },
+ "dhcp_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "dhcp_model_event_handler"
+ },
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ }
+}
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py
new file mode 100644
index 0000000..1906881
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py
@@ -0,0 +1,184 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att1 = DAG(
+ dag_id='att_workflow_onu1',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att2 = DAG(
+ dag_id='att_workflow_onu2',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att1.doc_md = __doc__
+dag_att2.doc_md = __doc__
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att1,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att1,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att2,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att2,
+)
+
+onu_event_handler >> onu_model_event_handler
+auth_event_handler >> auth_model_event_handler
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py.expected.json
new file mode 100644
index 0000000..dac7c12
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py.expected.json
@@ -0,0 +1,90 @@
+{
+ "att_workflow_onu1": {
+ "dag": {
+ "dag_id": "att_workflow_onu1",
+ "local_variable": "dag_att1"
+ },
+ "dependencies": {
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "parents": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att1",
+ "dag_id": "att_workflow_onu1",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att1",
+ "dag_id": "att_workflow_onu1",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ },
+ "att_workflow_onu2": {
+ "dag": {
+ "dag_id": "att_workflow_onu2",
+ "local_variable": "dag_att2"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "auth_model_event_handler"
+ ]
+ },
+ "auth_model_event_handler": {
+ "parents": [
+ "auth_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_att2",
+ "dag_id": "att_workflow_onu2",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "Auth_event",
+ "task_id": "auth_event_handler",
+ "topic": "authentication.events"
+ },
+ "auth_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_att2",
+ "dag_id": "att_workflow_onu2",
+ "key_field": "serialNumber",
+ "local_variable": "auth_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "auth_model_event_handler"
+ }
+ }
+ }
+}
diff --git a/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py
new file mode 100644
index 0000000..a0a748a
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py
@@ -0,0 +1,18 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from .workflow_essence_extractor import parse_codefile, extract_all
+
+__all__ = ["parse_codefile", "extract_all"]
diff --git a/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py
new file mode 100644
index 0000000..bb62493
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py
@@ -0,0 +1,552 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Essence Extractor
+
+This module extracts essence of airflow workflows
+Following information will be extracted from workflow code
+- DAG info
+- Operator info
+ - XOS-related operators
+ - Airflow operators
+- Dependency info
+"""
+
+import ast
+import sys
+import json
+import os.path
+
+
+def classname(cls):
+ return cls.__class__.__name__
+
+
+def jsonify_ast(node, level=0):
+ fields = {}
+ for k in node._fields:
+ fields[k] = '...'
+ v = getattr(node, k)
+ if isinstance(v, ast.AST):
+ if v._fields:
+ fields[k] = jsonify_ast(v)
+ else:
+ fields[k] = classname(v)
+
+ elif isinstance(v, list):
+ fields[k] = []
+ for e in v:
+ fields[k].append(jsonify_ast(e))
+
+ elif isinstance(v, str):
+ fields[k] = v
+
+ elif isinstance(v, int) or isinstance(v, float):
+ fields[k] = v
+
+ elif v is None:
+ fields[k] = None
+
+ else:
+ fields[k] = 'unrecognized'
+
+ ret = {
+ classname(node): fields
+ }
+ return ret
+
+
+def parse(code):
+ lines = code.split("\n")
+ if len(lines) == 1:
+ if code.endswith(".py") and os.path.exists(code):
+ return parse_codefile(code)
+ return parse_code(code)
+
+
+def parse_code(code):
+ tree = ast.parse(code)
+ return jsonify_ast(tree)
+
+
+def parse_codefile(code_filepath):
+ code = None
+ with open(code_filepath, "r") as f:
+ code = f.read()
+ tree = ast.parse(code, code_filepath)
+ return jsonify_ast(tree)
+
+
+def pretty_print_json(j):
+ dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
+ print(dumps)
+
+
+def recursively_find_elements(tree, elem):
+ """
+ traverse AST and find elements
+ """
+ for e in tree:
+ obj = None
+ if isinstance(tree, list):
+ obj = e
+ elif isinstance(tree, dict):
+ obj = tree[e]
+
+ if e == elem:
+ yield obj
+
+ if obj and (isinstance(obj, list) or isinstance(obj, dict)):
+ for y in recursively_find_elements(obj, elem):
+ yield y
+
+
+def extract_func_calls(tree, func_name):
+ """
+ extract function calls with assignment
+ """
+ assigns = recursively_find_elements(tree, "Assign")
+ if assigns:
+ for assign in assigns:
+ found = False
+
+ calls = recursively_find_elements(assign, "Call")
+ if calls:
+ for call in calls:
+ funcs = recursively_find_elements(call, "func")
+ if funcs:
+ for func in funcs:
+ if "Name" in func:
+ name = func["Name"]
+ if "ctx" in name and "id" in name:
+ # found function
+ if name["id"] == func_name:
+ found = True
+
+ if found:
+ yield assign
+
+
+def extract_func_calls_airflow_operators(tree):
+ """
+ extract only airflow operators which end with "*Operator" or "*Sensor"
+ """
+ assigns = recursively_find_elements(tree, "Assign")
+ if assigns:
+ for assign in assigns:
+ found = False
+
+ calls = recursively_find_elements(assign, "Call")
+ if calls:
+ for call in calls:
+ funcs = recursively_find_elements(call, "func")
+ if funcs:
+ for func in funcs:
+ if "Name" in func:
+ name = func["Name"]
+ if "ctx" in name and "id" in name:
+ # found function
+ if name["id"].endswith(("Operator", "Sensor")):
+ found = True
+
+ if found:
+ yield assign
+
+
+def extract_bin_op(tree, op_name):
+ """
+ extract binary operation such as >>, <<
+ """
+ ops = recursively_find_elements(tree, "BinOp")
+ if ops:
+ for op in ops:
+ if op["op"] == op_name:
+ yield op
+
+
+def take_string_or_tree(tree):
+ if "Str" in tree:
+ return tree["Str"]["s"]
+ return tree
+
+
+def take_num_or_tree(tree):
+ if "Num" in tree:
+ return tree["Num"]["n"]
+ return tree
+
+
+def take_id_or_tree(tree):
+ if "Name" in tree:
+ return tree["Name"]["id"]
+ return tree
+
+
+def take_name_constant_or_tree(tree):
+ if "NameConstant" in tree:
+ return tree["NameConstant"]["value"]
+ return tree
+
+
+def take_value_or_tree(tree):
+ if "Str" in tree:
+ return tree["Str"]["s"]
+ elif "Num" in tree:
+ return tree["Num"]["n"]
+ elif "Name" in tree:
+ val = tree["Name"]["id"]
+ if val in ["True", "False"]:
+ return bool(val)
+ elif val == "None":
+ return None
+ return val
+ elif "NameConstant" in tree:
+ val = tree["NameConstant"]["value"]
+ if val in ["True", "False"]:
+ return bool(val)
+ elif val == "None":
+ return None
+ return val
+ elif "List" in tree:
+ vals = []
+ if "elts" in tree["List"]:
+ elts = tree["List"]["elts"]
+ for elt in elts:
+ val = take_value_or_tree(elt)
+ vals.append(val)
+ return vals
+ return tree
+
+
+def make_dag(tree):
+ loc_val = None
+ dag_id = None
+
+ if "targets" in tree:
+ targets = tree["targets"]
+ loc_val = take_id_or_tree(targets[0])
+
+ if "value" in tree:
+ value = tree["value"]
+ if "Call" in value:
+ call = value["Call"]
+ if "keywords" in call:
+ keywords = call["keywords"]
+ for keyword in keywords:
+ if "keyword" in keyword:
+ k = keyword["keyword"]
+ if k["arg"] == "dag_id":
+ dag_id = take_string_or_tree(k["value"])
+
+ return {
+ 'local_variable': loc_val,
+ 'dag_id': dag_id
+ }
+
+
+def make_airflow_operator(tree):
+ airflow_operator = {}
+
+ if "targets" in tree:
+ targets = tree["targets"]
+ loc_val = take_id_or_tree(targets[0])
+ airflow_operator["local_variable"] = loc_val
+
+ if "value" in tree:
+ value = tree["value"]
+ if "Call" in value:
+ call = value["Call"]
+ if "func" in call:
+ class_name = take_id_or_tree(call["func"])
+ airflow_operator["class"] = class_name
+
+ if "keywords" in call:
+ keywords = call["keywords"]
+ for keyword in keywords:
+ if "keyword" in keyword:
+ k = keyword["keyword"]
+ arg = k["arg"]
+ airflow_operator[arg] = take_value_or_tree(k["value"])
+
+ return airflow_operator
+
+
+def make_dependencies_bin_op(tree, dependencies):
+ children = []
+ parents = []
+ child = None
+ parent = None
+
+ if tree["op"] == "RShift":
+ child = take_id_or_tree(tree["right"])
+ parent = take_id_or_tree(tree["left"])
+ elif tree["op"] == "LShift":
+ child = take_id_or_tree(tree["left"])
+ parent = take_id_or_tree(tree["right"])
+
+ if child:
+ if isinstance(child, dict):
+ if "List" in child:
+ for c in child["List"]["elts"]:
+ children.append(take_id_or_tree(c))
+ elif "BinOp" in child:
+ deps = make_dependencies_bin_op(child["BinOp"], dependencies)
+ for dep in deps:
+ children.append(dep)
+ else:
+ children.append(take_id_or_tree(child))
+ else:
+ children.append(child)
+
+ if parent:
+ if isinstance(parent, dict):
+ if "List" in parent:
+ for p in parent["List"]["elts"]:
+ parents.append(take_id_or_tree(p))
+ elif "BinOp" in parent:
+ deps = make_dependencies_bin_op(parent["BinOp"], dependencies)
+ for dep in deps:
+ parents.append(dep)
+ else:
+ parents.append(take_id_or_tree(parent))
+ else:
+ parents.append(parent)
+
+ if len(parents) > 0 and len(children) > 0:
+ # make all-vs-all combinations
+ for p in parents:
+ for c in children:
+ dep = {
+ 'parent': p,
+ 'child': c
+ }
+ dependencies.append(dep)
+
+ if tree["op"] == "RShift":
+ return children
+ elif tree["op"] == "LShift":
+ return parents
+ return children
+
+
+def extract_dep_operations(tree):
+ # extract dependency definition using ">>"
+ ops = extract_bin_op(tree, "RShift")
+ if ops:
+ for op in ops:
+ deps = []
+ make_dependencies_bin_op(op, deps)
+ for dep in deps:
+ yield dep
+
+ # extract dependency definition using "<<"
+ ops = extract_bin_op(tree, "LShift")
+ if ops:
+ for op in ops:
+ deps = []
+ make_dependencies_bin_op(op, deps)
+ for dep in deps:
+ yield dep
+
+
+def extract_dags(tree):
+ dags = {}
+ calls = extract_func_calls(tree, "DAG")
+ if calls:
+ for call in calls:
+ dag = make_dag(call)
+ dagid = dag["dag_id"]
+ dags[dagid] = dag
+ return dags
+
+
+def extract_XOS_event_sensors(tree):
+ operators = {}
+ calls = extract_func_calls(tree, "XOSEventSensor")
+ if calls:
+ for call in calls:
+ operator = make_airflow_operator(call)
+ operatorid = operator["task_id"]
+ operators[operatorid] = operator
+ return operators
+
+
+def extract_XOS_model_sensors(tree):
+ operators = {}
+ calls = extract_func_calls(tree, "XOSModelSensor")
+ if calls:
+ for call in calls:
+ operator = make_airflow_operator(call)
+ operatorid = operator["task_id"]
+ operators[operatorid] = operator
+ return operators
+
+
+def extract_airflow_operators(tree):
+ operators = {}
+ calls = extract_func_calls_airflow_operators(tree)
+ if calls:
+ for call in calls:
+ operator = make_airflow_operator(call)
+ operatorid = operator["task_id"]
+ operators[operatorid] = operator
+ return operators
+
+
+def extract_all_operators(tree):
+ operators = {}
+ event_sensors = extract_XOS_event_sensors(tree)
+ if event_sensors:
+ for event_sensor in event_sensors:
+ operators[event_sensor] = event_sensors[event_sensor]
+
+ model_sensors = extract_XOS_model_sensors(tree)
+ if model_sensors:
+ for model_sensor in model_sensors:
+ operators[model_sensor] = model_sensors[model_sensor]
+
+ airflow_operators = extract_airflow_operators(tree)
+ if airflow_operators:
+ for airflow_operator in airflow_operators:
+ operators[airflow_operator] = airflow_operators[airflow_operator]
+
+ return operators
+
+
+def extract_dependencies(tree):
+ """
+ Build N-N dependencies from fragmented parent-child relations
+ A node can have multiple parents and multiple children
+ """
+ dependencies = {}
+ ops = extract_dep_operations(tree)
+ if ops:
+ for op in ops:
+ p = op["parent"]
+ c = op["child"]
+
+ if p in dependencies:
+ # append to an existing list
+ node_p = dependencies[p]
+ if "children" in node_p:
+ # prevent duplicates
+ if c not in node_p["children"]:
+ node_p["children"].append(c)
+ else:
+ node_p["children"] = [c]
+ else:
+ # create a new
+ node_p = {
+ 'children': [c]
+ }
+ dependencies[p] = node_p
+
+ if c in dependencies:
+ # append to an existing list
+ node_c = dependencies[c]
+ if "parents" in node_c:
+ # prevent duplicates
+ if p not in node_c["parents"]:
+ node_c["parents"].append(p)
+ else:
+ node_c["parents"] = [p]
+ else:
+ # create a new
+ node_c = {
+ 'parents': [p]
+ }
+ dependencies[c] = node_c
+
+ return dependencies
+
+
+def extract_all(tree):
+ """
+ Build highlevel information of workflows dag, operators and dependencies refers to each other
+ """
+ dags = extract_dags(tree)
+ operators = extract_all_operators(tree)
+ dependencies = extract_dependencies(tree)
+
+ dag_dict = {}
+ for dag_id in dags:
+ dag = dags[dag_id]
+ dag_var = dag["local_variable"]
+
+ # filter operators that do not belong to the dag
+ my_operators = {}
+ my_operators_var = {}
+ for task_id in operators:
+ operator = operators[task_id]
+ if operator["dag"] == dag_var:
+ # set dag_id
+ operator["dag_id"] = dag_id
+ my_operators[task_id] = operator
+
+ # this is to help fast search while working with dependencies
+ operator_local_var = operator["local_variable"]
+ my_operators_var[operator_local_var] = operator
+
+ # filter dependencies that do not belong to the dag
+ my_dependencies = {}
+ for task_var in dependencies:
+ if task_var in my_operators_var:
+ dependency = dependencies[task_var]
+ task_id = my_operators_var[task_var]["task_id"]
+
+ # convert dependency task_var to task_id
+ dep = {}
+ if "children" in dependency:
+ dep["children"] = []
+ for child in dependency["children"]:
+ if child in my_operators_var:
+ child_task_id = my_operators_var[child]["task_id"]
+ dep["children"].append(child_task_id)
+
+ if "parents" in dependency:
+ dep["parents"] = []
+ for parent in dependency["parents"]:
+ if parent in my_operators_var:
+ parent_task_id = my_operators_var[parent]["task_id"]
+ dep["parents"].append(parent_task_id)
+
+ my_dependencies[task_id] = dep
+
+ d = {
+ 'dag': dag,
+ 'tasks': my_operators,
+ 'dependencies': my_dependencies
+ }
+ dag_dict[dag_id] = d
+
+ return dag_dict
+
+
+# for command-line execution
+def main(argv):
+ if len(argv) < 1:
+ sys.exit("Error: Need a filepath")
+
+ code_filepath = argv[0]
+
+ tree = parse_codefile(code_filepath)
+ all = extract_all(tree)
+ pretty_print_json(all)
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/lib/cord-workflow-essence-extractor/requirements.txt b/lib/cord-workflow-essence-extractor/requirements.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/requirements.txt
diff --git a/lib/cord-workflow-essence-extractor/setup.py b/lib/cord-workflow-essence-extractor/setup.py
new file mode 100644
index 0000000..1e0311d
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/setup.py
@@ -0,0 +1,57 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+import os
+from shutil import copyfile
+
+from setuptools import setup
+
+
+def readme():
+ with open("README.rst") as f:
+ return f.read()
+
+
+def version():
+ # Copy VERSION file of parent to module directory if not found
+ if not os.path.exists("cordworkflowessenceextractor/VERSION"):
+ copyfile("../../VERSION", "cordworkflowessenceextractor/VERSION")
+ with open("cordworkflowessenceextractor/VERSION") as f:
+ return f.read().strip()
+
+
+def parse_requirements(filename):
+ # parse a requirements.txt file, allowing for blank lines and comments
+ requirements = []
+ for line in open(filename):
+ if line and not line.startswith("#"):
+ requirements.append(line)
+ return requirements
+
+
+setup(
+ name="cordworkflowessenceextractor",
+ version=version(),
+ description="Extract workflow essence from airflow workflow code",
+ long_description=readme(),
+ author="Illyoung Choi",
+ author_email="iychoi@opennetworking.org",
+ classifiers=["License :: OSI Approved :: Apache Software License"],
+ license="Apache v2",
+ packages=["cordworkflowessenceextractor"],
+ install_requires=parse_requirements("requirements.txt"),
+ include_package_data=True,
+)
diff --git a/lib/cord-workflow-essence-extractor/tox.ini b/lib/cord-workflow-essence-extractor/tox.ini
new file mode 100644
index 0000000..eb3f3f6
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/tox.ini
@@ -0,0 +1,50 @@
+; Copyright 2019-present Open Networking Foundation
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+[tox]
+envlist = py27,py35,py36,py37
+skip_missing_interpreters = true
+skipsdist = True
+
+[testenv]
+deps =
+ -r requirements.txt
+ nose2
+ flake8
+
+commands =
+ nose2 -c tox.ini --verbose --junit-xml
+ flake8
+
+[flake8]
+max-line-length = 119
+exclude =
+ .tox
+ build
+ cord-workflow-essence-extractor-tests/workflow-examples
+
+[unittest]
+plugins = nose2.plugins.junitxml
+
+[junit-xml]
+path = nose2-results.xml
+
+[coverage]
+always-on = True
+coverage =
+ cordworkflowessenceextractor
+ cord-workflow-essence-extractor-tests
+coverage-report =
+ term
+ xml
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/requirements.txt
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..2cf47b7
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,44 @@
+; Copyright 2019-present Open Networking Foundation
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+[tox]
+envlist = py27,py35,py36,py37
+skip_missing_interpreters = True
+skipsdist = True
+
+[testenv]
+deps =
+ -r requirements.txt
+; -e lib/cord-workflow-essence-extractor
+ nose2
+ flake8
+
+commands =
+ nose2 -c tox.ini --verbose --junit-xml
+# flake8
+
+[flake8]
+max-line-length = 119
+
+[unittest]
+plugins = nose2.plugins.junitxml
+
+[junit-xml]
+path = nose2-results.xml
+
+[coverage]
+always-on = False
+coverage-report =
+ term
+ xml
\ No newline at end of file