Change flow of tasks in some workflow examples
Change-Id: I9a05147c0249db62f4a05b144a510fb1612ffa83
diff --git a/VERSION b/VERSION
index 5d4294b..2411653 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.5.1
\ No newline at end of file
+0.5.2
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 553b087..0e3a821 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -103,7 +103,7 @@
# CORD Workflow
RUN pip install multistructlog~=2.1.0 \
- && pip install cord-workflow-controller-client~=0.3.0 \
+ && pip install cord-workflow-controller-client~=0.5.0 \
&& pip install pyfiglet~=0.7 \
&& pip install xossynchronizer~=3.2.6 \
&& pip install xosapi~=3.2.6
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 89ddc3d..6a6dc38 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -27,13 +27,13 @@
- "5432:5432"
controller:
- image: opencord/cord-workflow-controller:0.5.2
+ image: opencord/cord-workflow-controller:0.5.5
ports:
- "3030:3030"
airflow:
# image: opencord/cord-workflow-airflow
- image: cord-workflow-airflow:0.5.1
+ image: cord-workflow-airflow:0.5.2
restart: always
depends_on:
- postgres
diff --git a/workflow_examples/att-workflow/att_workflow.py b/workflow_examples/att-workflow/att_workflow.py
index 0ad0ae7..11bfee8 100644
--- a/workflow_examples/att-workflow/att_workflow.py
+++ b/workflow_examples/att-workflow/att_workflow.py
@@ -21,8 +21,6 @@
from airflow import DAG
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.utils.trigger_rule import TriggerRule
from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
from airflow.operators.cord_workflow_plugin import CORDModelOperator
@@ -430,12 +428,6 @@
dag=dag_att
)
-join = DummyOperator(
- task_id='join',
- trigger_rule=TriggerRule.ALL_DONE,
- dag=dag_att
-)
-
att_model_event_sensor = CORDModelSensor(
task_id='att_model_event_sensor',
model_name='AttWorkflowDriverServiceInstance',
@@ -453,9 +445,7 @@
)
# handle standard flow
-onu_event_sensor >> onu_event_handler >> join
-auth_event_sensor >> auth_event_handler >> join
-dhcp_event_sensor >> dhcp_event_handler >> join
+onu_event_sensor >> onu_event_handler >> auth_event_sensor >> auth_event_handler >> dhcp_event_sensor >> dhcp_event_handler
# handle admin flow
att_model_event_sensor >> att_model_event_handler
diff --git a/workflow_examples/att-workflow/att_workflow_essence.json b/workflow_examples/att-workflow/att_workflow_essence.json
index 50a9441..22ccca3 100644
--- a/workflow_examples/att-workflow/att_workflow_essence.json
+++ b/workflow_examples/att-workflow/att_workflow_essence.json
@@ -47,7 +47,7 @@
"dependencies": {
"auth_event_handler": {
"children": [
- "join"
+ "dhcp_event_sensor"
],
"parents": [
"auth_event_sensor"
@@ -56,12 +56,12 @@
"auth_event_sensor": {
"children": [
"auth_event_handler"
+ ],
+ "parents": [
+ "onu_event_handler"
]
},
"dhcp_event_handler": {
- "children": [
- "join"
- ],
"parents": [
"dhcp_event_sensor"
]
@@ -69,18 +69,14 @@
"dhcp_event_sensor": {
"children": [
"dhcp_event_handler"
- ]
- },
- "join": {
+ ],
"parents": [
- "onu_event_handler",
- "auth_event_handler",
- "dhcp_event_handler"
+ "auth_event_handler"
]
},
"onu_event_handler": {
"children": [
- "join"
+ "auth_event_sensor"
],
"parents": [
"onu_event_sensor"
@@ -133,25 +129,6 @@
"task_id": "dhcp_event_sensor",
"topic": "dhcp.events"
},
- "join": {
- "class": "DummyOperator",
- "dag": "dag_att",
- "dag_id": "att_workflow",
- "local_variable": "join",
- "task_id": "join",
- "trigger_rule": {
- "Attribute": {
- "attr": "ALL_DONE",
- "ctx": "Load",
- "value": {
- "Name": {
- "ctx": "Load",
- "id": "TriggerRule"
- }
- }
- }
- }
- },
"onu_event_handler": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "onu_event_sensor",
diff --git a/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py
index 40dfe6e..6b42ed9 100644
--- a/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py
+++ b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py
@@ -13,32 +13,40 @@
# limitations under the License.
"""
-Example sequential workflow
+Example parallel workflow
"""
-
-
+import json
import logging
from datetime import datetime
from airflow import DAG
+from airflow import AirflowException
+from airflow.operators.python_operator import PythonOperator
from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
from airflow.operators.cord_workflow_plugin import CORDModelOperator
log = logging.getLogger(__name__)
-
args = {
# hard coded date
'start_date': datetime(2019, 1, 1),
'owner': 'iychoi'
}
-dag_sequential_cord = DAG(
- dag_id='sequential_cord_workflow',
+dag_parallel_cord = DAG(
+ dag_id='parallel_cord_workflow',
default_args=args,
# this dag will be triggered by external systems
schedule_interval=None
)
-dag_sequential_cord.doc_md = __doc__
+dag_parallel_cord.doc_md = __doc__
+
+dag_parallel_cord_admin = DAG(
+ dag_id='parallel_cord_workflow_admin',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None
+)
+dag_parallel_cord_admin.doc_md = __doc__
def on_onu_event(model_accessor, message, **kwargs):
@@ -63,14 +71,14 @@
key_field='serialNumber',
controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_sequential_cord
+ dag=dag_parallel_cord
)
onu_event_handler = CORDModelOperator(
task_id='onu_event_handler',
python_callable=on_onu_event,
cord_event_sensor_task_id='onu_event_sensor',
- dag=dag_sequential_cord
+ dag=dag_parallel_cord
)
auth_event_sensor = CORDEventSensor(
@@ -79,14 +87,14 @@
key_field='serialNumber',
controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_sequential_cord
+ dag=dag_parallel_cord
)
auth_event_handler = CORDModelOperator(
task_id='auth_event_handler',
python_callable=on_auth_event,
cord_event_sensor_task_id='auth_event_sensor',
- dag=dag_sequential_cord
+ dag=dag_parallel_cord
)
dhcp_event_sensor = CORDEventSensor(
@@ -95,64 +103,34 @@
key_field='serialNumber',
controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_sequential_cord
+ dag=dag_parallel_cord
)
dhcp_event_handler = CORDModelOperator(
task_id='dhcp_event_handler',
python_callable=on_dhcp_event,
cord_event_sensor_task_id='dhcp_event_sensor',
- dag=dag_sequential_cord
+ dag=dag_parallel_cord
)
-cord_model_event_sensor1 = CORDModelSensor(
- task_id='cord_model_event_sensor1',
- model_name='cordWorkflowDriverServiceInstance',
+att_model_event_sensor = CORDModelSensor(
+ task_id='att_model_event_sensor',
+ model_name='AttWorkflowDriverServiceInstance',
key_field='serialNumber',
controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_sequential_cord
+ dag=dag_parallel_cord_admin
)
-cord_model_event_handler1 = CORDModelOperator(
- task_id='cord_model_event_handler1',
+att_model_event_handler = CORDModelOperator(
+ task_id='att_model_event_handler',
python_callable=on_model_event,
- cord_event_sensor_task_id='cord_model_event_sensor1',
- dag=dag_sequential_cord
+ cord_event_sensor_task_id='att_model_event_sensor',
+ dag=dag_parallel_cord_admin
)
-cord_model_event_sensor2 = CORDModelSensor(
- task_id='cord_model_event_sensor2',
- model_name='cordWorkflowDriverServiceInstance',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_sequential_cord
-)
+# handle standard flow
+onu_event_sensor >> onu_event_handler >> auth_event_sensor >> auth_event_handler >> dhcp_event_sensor >> dhcp_event_handler
+# handle admin flow
+att_model_event_sensor >> att_model_event_handler
-cord_model_event_handler2 = CORDModelOperator(
- task_id='cord_model_event_handler2',
- python_callable=on_model_event,
- cord_event_sensor_task_id='cord_model_event_sensor2',
- dag=dag_sequential_cord
-)
-
-cord_model_event_sensor3 = CORDModelSensor(
- task_id='cord_model_event_sensor3',
- model_name='cordWorkflowDriverServiceInstance',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_sequential_cord
-)
-
-cord_model_event_handler3 = CORDModelOperator(
- task_id='cord_model_event_handler3',
- python_callable=on_model_event,
- cord_event_sensor_task_id='cord_model_event_sensor3',
- dag=dag_sequential_cord
-)
-
-onu_event_sensor >> onu_event_handler >> cord_model_event_sensor1 >> cord_model_event_handler1 >> \
- auth_event_sensor >> auth_event_handler >> cord_model_event_sensor2 >> cord_model_event_handler2 >> \
- dhcp_event_sensor >> dhcp_event_handler >> cord_model_event_sensor3 >> cord_model_event_handler3
diff --git a/workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
index f8b6ba6..e908ae3 100644
--- a/workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
+++ b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
@@ -1,13 +1,13 @@
{
- "simple_cord_workflow": {
+ "parallel_cord_workflow": {
"dag": {
- "dag_id": "simple_cord_workflow",
- "local_variable": "dag_sequential_cord"
+ "dag_id": "parallel_cord_workflow",
+ "local_variable": "dag_parallel_cord"
},
"dependencies": {
"auth_event_handler": {
"children": [
- "cord_model_event_sensor2"
+ "dhcp_event_sensor"
],
"parents": [
"auth_event_sensor"
@@ -18,58 +18,10 @@
"auth_event_handler"
],
"parents": [
- "cord_model_event_handler1"
- ]
- },
- "cord_model_event_handler1": {
- "children": [
- "auth_event_sensor"
- ],
- "parents": [
- "cord_model_event_sensor1"
- ]
- },
- "cord_model_event_handler2": {
- "children": [
- "dhcp_event_sensor"
- ],
- "parents": [
- "cord_model_event_sensor2"
- ]
- },
- "cord_model_event_handler3": {
- "parents": [
- "cord_model_event_sensor3"
- ]
- },
- "cord_model_event_sensor1": {
- "children": [
- "cord_model_event_handler1"
- ],
- "parents": [
"onu_event_handler"
]
},
- "cord_model_event_sensor2": {
- "children": [
- "cord_model_event_handler2"
- ],
- "parents": [
- "auth_event_handler"
- ]
- },
- "cord_model_event_sensor3": {
- "children": [
- "cord_model_event_handler3"
- ],
- "parents": [
- "dhcp_event_handler"
- ]
- },
"dhcp_event_handler": {
- "children": [
- "cord_model_event_sensor3"
- ],
"parents": [
"dhcp_event_sensor"
]
@@ -79,12 +31,12 @@
"dhcp_event_handler"
],
"parents": [
- "cord_model_event_handler2"
+ "auth_event_handler"
]
},
"onu_event_handler": {
"children": [
- "cord_model_event_sensor1"
+ "auth_event_sensor"
],
"parents": [
"onu_event_sensor"
@@ -100,8 +52,8 @@
"auth_event_handler": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "auth_event_sensor",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
"local_variable": "auth_event_handler",
"python_callable": "on_auth_event",
"task_id": "auth_event_handler"
@@ -109,79 +61,19 @@
"auth_event_sensor": {
"class": "CORDEventSensor",
"controller_conn_id": "local_cord_controller",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
"key_field": "serialNumber",
"local_variable": "auth_event_sensor",
"poke_interval": 5,
"task_id": "auth_event_sensor",
"topic": "authentication.events"
},
- "cord_model_event_handler1": {
- "class": "CORDModelOperator",
- "cord_event_sensor_task_id": "cord_model_event_sensor1",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
- "local_variable": "cord_model_event_handler1",
- "python_callable": "on_model_event",
- "task_id": "cord_model_event_handler1"
- },
- "cord_model_event_handler2": {
- "class": "CORDModelOperator",
- "cord_event_sensor_task_id": "cord_model_event_sensor2",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
- "local_variable": "cord_model_event_handler2",
- "python_callable": "DriverService_event",
- "task_id": "cord_model_event_handler2"
- },
- "cord_model_event_handler3": {
- "class": "CORDModelOperator",
- "cord_event_sensor_task_id": "cord_model_event_sensor3",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
- "local_variable": "cord_model_event_handler3",
- "python_callable": "DriverService_event",
- "task_id": "cord_model_event_handler3"
- },
- "cord_model_event_sensor1": {
- "class": "CORDModelSensor",
- "controller_conn_id": "local_cord_controller",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
- "key_field": "serialNumber",
- "local_variable": "cord_model_event_sensor1",
- "model_name": "cordWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "task_id": "cord_model_event_sensor1"
- },
- "cord_model_event_sensor2": {
- "class": "CORDModelSensor",
- "controller_conn_id": "local_cord_controller",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
- "key_field": "serialNumber",
- "local_variable": "cord_model_event_sensor2",
- "model_name": "cordWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "task_id": "cord_model_event_sensor2"
- },
- "cord_model_event_sensor3": {
- "class": "CORDModelSensor",
- "controller_conn_id": "local_cord_controller",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
- "key_field": "serialNumber",
- "local_variable": "cord_model_event_sensor3",
- "model_name": "cordWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "task_id": "cord_model_event_sensor3"
- },
"dhcp_event_handler": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "dhcp_event_sensor",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
"local_variable": "dhcp_event_handler",
"python_callable": "on_dhcp_event",
"task_id": "dhcp_event_handler"
@@ -189,8 +81,8 @@
"dhcp_event_sensor": {
"class": "CORDEventSensor",
"controller_conn_id": "local_cord_controller",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
"key_field": "serialNumber",
"local_variable": "dhcp_event_sensor",
"poke_interval": 5,
@@ -200,8 +92,8 @@
"onu_event_handler": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "onu_event_sensor",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
"local_variable": "onu_event_handler",
"python_callable": "on_onu_event",
"task_id": "onu_event_handler"
@@ -209,8 +101,8 @@
"onu_event_sensor": {
"class": "CORDEventSensor",
"controller_conn_id": "local_cord_controller",
- "dag": "dag_sequential_cord",
- "dag_id": "simple_cord_workflow",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
"key_field": "serialNumber",
"local_variable": "onu_event_sensor",
"poke_interval": 5,
@@ -218,5 +110,45 @@
"topic": "onu.events"
}
}
+ },
+ "parallel_cord_workflow_admin": {
+ "dag": {
+ "dag_id": "parallel_cord_workflow_admin",
+ "local_variable": "dag_parallel_cord_admin"
+ },
+ "dependencies": {
+ "att_model_event_handler": {
+ "parents": [
+ "att_model_event_sensor"
+ ]
+ },
+ "att_model_event_sensor": {
+ "children": [
+ "att_model_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "att_model_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor",
+ "dag": "dag_parallel_cord_admin",
+ "dag_id": "parallel_cord_workflow_admin",
+ "local_variable": "att_model_event_handler",
+ "python_callable": "on_model_event",
+ "task_id": "att_model_event_handler"
+ },
+ "att_model_event_sensor": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_parallel_cord_admin",
+ "dag_id": "parallel_cord_workflow_admin",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor"
+ }
+ }
}
}
\ No newline at end of file