Add test scripts
Add model-accessor for model-operator
Add more sample workflows
Rework AT&T workflow
Change-Id: I33b5713e221c70bdde5768f1061c06dbbb1dccd6
diff --git a/VERSION b/VERSION
index 79a2734..5d4294b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.5.0
\ No newline at end of file
+0.5.1
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
index c35b212..553b087 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -111,17 +111,22 @@
# drop plugin code to plugin dir of airflow
COPY ./src/cord_workflow_airflow_extensions/cord_workflow_plugin.py ${AIRFLOW_HOME}/plugins/cord_workflow_plugin.py
+# make xosapi directory
+RUN mkdir -p /var/run/xosapi \
+ && chmod a+rwx /var/run/xosapi
+
# drop sample workflow code to dags dir of airflow
-COPY ./workflow_examples/simple-cord-workflow/simple_cord_workflow.py ${AIRFLOW_HOME}/dags/simple_cord_workflow.py
-COPY ./workflow_examples/simple-cord-workflow/simple_cord_workflow_essence.json ${HOME}/simple_cord_workflow_essence.json
+COPY ./workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py ${AIRFLOW_HOME}/dags/sequential_cord_workflow.py
+COPY ./workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json ${HOME}/sequential_cord_workflow_essence.json
+COPY ./workflow_examples/parallel-cord-workflow/parallel_cord_workflow.py ${AIRFLOW_HOME}/dags/parallel_cord_workflow.py
+COPY ./workflow_examples/parallel-cord-workflow/parallel_cord_workflow_essence.json ${HOME}/parallel_cord_workflow_essence.json
+COPY ./workflow_examples/att-workflow/att_workflow.py ${AIRFLOW_HOME}/dags/att_workflow.py
+COPY ./workflow_examples/att-workflow/att_workflow_essence.json ${HOME}/att_workflow_essence.json
COPY ./workflow_examples/simple-airflow-workflow/simple_airflow_workflow.py ${AIRFLOW_HOME}/dags/simple_airflow_workflow.py
COPY ./workflow_examples/simple-airflow-workflow/simple_airflow_workflow_essence.json ${HOME}/simple_airflow_workflow_essence.json
# copy scripts
-COPY ./workflow_examples/connection_setup.sh ${HOME}/connection_setup.sh
-COPY ./workflow_examples/register_essence.sh ${HOME}/register_essence.sh
-COPY ./workflow_examples/emit_events_model.sh ${HOME}/emit_events_model.sh
-COPY ./workflow_examples/emit_events_onu.sh ${HOME}/emit_events_onu.sh
+COPY ./workflow_examples/scripts ${HOME}/scripts
# copy kickstarter code & workflow ctl code
COPY ./src/tools/kickstarter.py ${HOME}/kickstarter.py
@@ -131,10 +136,7 @@
COPY ./src/tools/config.json /etc/cord_workflow_airflow_extensions/config.json
RUN chown -R ${AIRFLOW_USER}:${AIRFLOW_USER} ${HOME} \
- && chmod 755 ${HOME}/connection_setup.sh \
- && chmod 755 ${HOME}/register_essence.sh \
- && chmod 755 ${HOME}/emit_events_model.sh \
- && chmod 755 ${HOME}/emit_events_onu.sh \
+ && chmod 755 -R ${HOME}/scripts \
&& chmod 755 ${HOME}/kickstarter.py \
&& chmod 755 ${HOME}/workflow_ctl.py
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 3f14ec5..89ddc3d 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -27,13 +27,13 @@
- "5432:5432"
controller:
- image: opencord/cord-workflow-controller:0.5.0
+ image: opencord/cord-workflow-controller:0.5.2
ports:
- "3030:3030"
airflow:
# image: opencord/cord-workflow-airflow
- image: cord-workflow-airflow:0.5.0
+ image: cord-workflow-airflow:0.5.1
restart: always
depends_on:
- postgres
diff --git a/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py b/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
index 2532477..3a6c83c 100644
--- a/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
+++ b/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import time
from airflow.plugins_manager import AirflowPlugin
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
@@ -162,16 +163,42 @@
*args,
**kwargs)
self.cord_event_sensor_task_id = cord_event_sensor_task_id
+ self.model_accessor = None
+
+ def create_model_accessor(self):
+ self.log.info("Creating model accessor...")
+ from xossynchronizer.modelaccessor import model_accessor
+ self.model_accessor = model_accessor
+
+ def wait_for_ready(self):
+ self.log.info("Waiting for model accessor to get ready...")
+ models_active = False
+ # wait = False
+ while not models_active:
+ try:
+ # variable is unused
+ _i = self.model_accessor.Site.objects.first() # noqa: F841
+ models_active = True
+ except Exception as e:
+ self.log.info("Exception", e=e)
+ self.log.info("Waiting for data model to come up before starting...")
+ time.sleep(10)
+ # wait = True
+
+ # if wait:
+ # # Safety factor, seeing that we stumbled waiting for the data model to come up.
+ # time.sleep(60)
def execute_callable(self):
- # TODO
- model_accessor = None
+ # temporarily comment out this two lines
+ # self.create_model_accessor()
+ # self.wait_for_ready()
message = None
if self.cord_event_sensor_task_id:
message = self.op_kwargs['ti'].xcom_pull(task_ids=self.cord_event_sensor_task_id)
- new_op_kwargs = dict(self.op_kwargs, model_accessor=model_accessor, message=message)
+ new_op_kwargs = dict(self.op_kwargs, model_accessor=self.model_accessor, message=message)
return self.python_callable(*self.op_args, **new_op_kwargs)
diff --git a/workflow_examples/att-workflow/README.md b/workflow_examples/att-workflow/README.md
index e211d2a..27d2b7d 100644
--- a/workflow_examples/att-workflow/README.md
+++ b/workflow_examples/att-workflow/README.md
@@ -1,5 +1,3 @@
# AT&T workflow example
-This is not a working version. This code is only to show how a new workflow implementation will look like.
-
Original workfing version of code implemented in Synchronizer is at [att-workflow-driver](https://github.com/opencord/att-workflow-driver).
\ No newline at end of file
diff --git a/workflow_examples/att-workflow/__init__.py b/workflow_examples/att-workflow/__init__.py
deleted file mode 100644
index 1eb71b1..0000000
--- a/workflow_examples/att-workflow/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
diff --git a/workflow_examples/att-workflow/att_dag.py b/workflow_examples/att-workflow/att_dag.py
deleted file mode 100644
index a464176..0000000
--- a/workflow_examples/att-workflow/att_dag.py
+++ /dev/null
@@ -1,237 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Example AT&T workflow using Airflow
-"""
-import json
-import logging
-import airflow
-from datetime import datetime
-from airflow import DAG
-from airflow import AirflowException
-from airflow.operators import PythonOperator
-from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
-from cord_workflow_airflow_extensions.operators import CORDModelOperator
-
-from att_helpers import *
-from att_service_instance_funcs import *
-
-
-args = {
- 'start_date': datetime.utcnow(),
- 'owner': 'ATT',
-}
-
-dag_att = DAG(
- dag_id='att_workflow_onu',
- default_args=args,
- # this dag will be triggered by external systems
- schedule_interval=None,
-)
-
-dag_att.doc_md = __doc__
-
-
-def ONU_event(model_accessor, message, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info('onu.events: received event', message=message)
-
- si = find_or_create_att_si(model_accessor, logging, message)
- if message['status'] == 'activated':
- logging.info('onu.events: activated onu', message=message)
- si.no_sync = False
- si.uni_port_id = long(message['portNumber'])
- si.of_dpid = message['deviceId']
- si.oper_onu_status = 'ENABLED'
- si.save_changed_fields(always_update_timestamp=True)
- elif message['status'] == 'disabled':
- logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
- si.oper_onu_status = 'DISABLED'
- si.save_changed_fields(always_update_timestamp=True)
- else:
- logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
- raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
-
-
-def AUTH_event(model_accessor, message, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info('authentication.events: Got event for subscriber', message=message)
-
- si = find_or_create_att_si(model_accessor, logging, message)
- logging.debug('authentication.events: Updating service instance', si=si)
- si.authentication_state = message['authenticationState']
- si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, message, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info('dhcp.events: Got event for subscriber', message=message)
-
- si = find_or_create_att_si(model_accessor, logging, message)
- logging.debug('dhcp.events: Updating service instance', si=si)
- si.dhcp_state = message['messageType']
- si.ip_address = message['ipAddress']
- si.mac_address = message['macAddress']
- si.save_changed_fields(always_update_timestamp=True)
-
-
-def DriverService_event(model_accessor, message, si, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- event_type = message['event_type']
-
- go = False
- if event_type == 'create':
- logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
- go = True
- elif event_type == 'update':
- logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
- (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
- go = True
- elif event_type == 'delete':
- pass
- else:
- pass
-
- if not go:
- return
-
- # handle only create & update events
-
- # Changing ONU state can change auth state
- # Changing auth state can change DHCP state
- # So need to process in this order
- process_onu_state(model_accessor, si)
- process_auth_state(si)
- process_dhcp_state(si)
-
- validate_states(si)
-
- # handling the subscriber status
- # It's a combination of all the other states
- subscriber = get_subscriber(model_accessor, si.serial_number)
- if subscriber:
- update_subscriber(model_accessor, subscriber, si)
-
- si.save_changed_fields()
-
-
-onu_event_sensor = CORDEventSensor(
- task_id='onu_event_sensor',
- topic='onu.events',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_att
-)
-
-onu_event_handler = CORDModelOperator(
- task_id='onu_event_handler',
- python_callable=ONU_event,
- cord_event_sensor_task_id='onu_event_sensor',
- dag=dag_att
-)
-
-auth_event_sensor = CORDEventSensor(
- task_id='auth_event_sensor',
- topic='authentication.events',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_att
-)
-
-auth_event_handler = CORDModelOperator(
- task_id='auth_event_handler',
- python_callable=AUTH_event,
- cord_event_sensor_task_id='auth_event_sensor',
- dag=dag_att
-)
-
-dhcp_event_sensor = CORDEventSensor(
- task_id='dhcp_event_sensor',
- topic='dhcp.events',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_att
-)
-
-dhcp_event_handler = CORDModelOperator(
- task_id='dhcp_event_handler',
- python_callable=DHCP_event,
- cord_event_sensor_task_id='dhcp_event_sensor',
- dag=dag_att
-)
-
-att_model_event_sensor1 = CORDModelSensor(
- task_id='att_model_event_sensor1',
- model_name='AttWorkflowDriverServiceInstance',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_att
-)
-
-att_model_event_handler1 = CORDModelOperator(
- task_id='att_model_event_handler1',
- python_callable=DriverService_event,
- cord_event_sensor_task_id='att_model_event_sensor1',
- dag=dag_att
-)
-
-att_model_event_sensor2 = CORDModelSensor(
- task_id='att_model_event_sensor2',
- model_name='AttWorkflowDriverServiceInstance',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_att
-)
-
-att_model_event_handler2 = CORDModelOperator(
- task_id='att_model_event_handler2',
- python_callable=DriverService_event,
- cord_event_sensor_task_id='att_model_event_sensor2',
- dag=dag_att
-)
-
-att_model_event_sensor3 = CORDModelSensor(
- task_id='att_model_event_sensor3',
- model_name='AttWorkflowDriverServiceInstance',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_att
-)
-
-att_model_event_handler3 = CORDModelOperator(
- task_id='att_model_event_handler3',
- python_callable=DriverService_event,
- cord_event_sensor_task_id='att_model_event_sensor3',
- dag=dag_att
-)
-
-
-onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1 >> \
- auth_event_sensor >> auth_event_handler >> att_model_event_sensor2 >> att_model_event_handler2 >> \
- dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3
diff --git a/workflow_examples/att-workflow/att_helpers.py b/workflow_examples/att-workflow/att_helpers.py
deleted file mode 100644
index 2abd2ab..0000000
--- a/workflow_examples/att-workflow/att_helpers.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from xossynchronizer.steps.syncstep import DeferredException
-
-def validate_onu(model_accessor, logging, att_si):
- """
- This method validate an ONU against the whitelist and set the appropriate state.
- It's expected that the deferred exception is managed in the caller method,
- for example a model_policy or a sync_step.
-
- :param att_si: AttWorkflowDriverServiceInstance
- :return: [boolean, string]
- """
-
- oss_service = att_si.owner.leaf_model
-
- # See if there is a matching entry in the whitelist.
- matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
- owner_id=oss_service.id,
- )
- matching_entries = [e for e in matching_entries if e.serial_number.lower() == att_si.serial_number.lower()]
-
- if len(matching_entries) == 0:
- logging.warn("ONU not found in whitelist", object=str(att_si), serial_number=att_si.serial_number, **att_si.tologdict())
- return [False, "ONU not found in whitelist"]
-
- whitelisted = matching_entries[0]
- try:
- onu = model_accessor.ONUDevice.objects.get(serial_number=att_si.serial_number)
- pon_port = onu.pon_port
- except IndexError:
- raise DeferredException("ONU device %s is not know to XOS yet" % att_si.serial_number)
-
- if onu.admin_state == "ADMIN_DISABLED":
- return [False, "ONU has been manually disabled"]
-
- if pon_port.port_no != whitelisted.pon_port_id or att_si.of_dpid != whitelisted.device_id:
- logging.warn("ONU disable as location don't match",
- object=str(att_si),
- serial_number=att_si.serial_number,
- pon_port=pon_port.port_no,
- whitelisted_pon_port=whitelisted.pon_port_id,
- device_id=att_si.of_dpid,
- whitelisted_device_id=whitelisted.device_id,
- **att_si.tologdict())
- return [False, "ONU activated in wrong location"]
-
- return [True, "ONU has been validated"]
-
-def find_or_create_att_si(model_accessor, logging, event):
- try:
- att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
- serial_number=event["serialNumber"]
- )
- logging.debug("AttHelpers: Found existing AttWorkflowDriverServiceInstance", si=att_si)
- except IndexError:
- # create an AttWorkflowDriverServiceInstance, the validation will be
- # triggered in the corresponding sync step
- att_si = model_accessor.AttWorkflowDriverServiceInstance(
- serial_number=event["serialNumber"],
- of_dpid=event["deviceId"],
- uni_port_id=long(event["portNumber"]),
- # we assume there is only one AttWorkflowDriverService
- owner=model_accessor.AttWorkflowDriverService.objects.first()
- )
- logging.debug("AttHelpers: Created new AttWorkflowDriverServiceInstance", si=att_si)
- return att_si
\ No newline at end of file
diff --git a/workflow_examples/att-workflow/att_service_instance_funcs.py b/workflow_examples/att-workflow/att_service_instance_funcs.py
deleted file mode 100644
index df179f9..0000000
--- a/workflow_examples/att-workflow/att_service_instance_funcs.py
+++ /dev/null
@@ -1,190 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from att_helpers import *
-
-# Check the whitelist to see if the ONU is valid. If it is, make sure that it's enabled.
-def process_onu_state(model_accessor, si):
- [valid, message] = validate_onu(model_accessor, logging, si)
- si.status_message = message
- if valid:
- si.admin_onu_state = "ENABLED"
- update_onu(model_accessor, si.serial_number, "ENABLED")
- else:
- si.admin_onu_state = "DISABLED"
- update_onu(model_accessor, si.serial_number, "DISABLED")
-
-
-# If the ONU has been disabled then we force re-authentication when it
-# is re-enabled.
-# Setting si.authentication_state = AWAITING:
-# -> subscriber status = "awaiting_auth"
-# -> service chain deleted
-# -> need authentication to restore connectivity after ONU enabled
-def process_auth_state(si):
- auth_msgs = {
- "AWAITING": " - Awaiting Authentication",
- "REQUESTED": " - Authentication requested",
- "STARTED": " - Authentication started",
- "APPROVED": " - Authentication succeeded",
- "DENIED": " - Authentication denied"
- }
- if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
- si.authentication_state = "AWAITING"
- else:
- si.status_message += auth_msgs[si.authentication_state]
-
-
-# The DhcpL2Relay ONOS app generates events that update the fields below.
-# It only sends events when it processes DHCP packets. It keeps no internal state.
-# We reset dhcp_state when:
-# si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
-# -> subscriber status = "awaiting_auth"
-# -> service chain not present
-# -> subscriber's OLT flow rules, xconnect not present
-# -> DHCP packets won't go through
-# Note, however, that the DHCP state at the endpoints is not changed.
-# A previously issued DHCP lease may still be valid.
-def process_dhcp_state(si):
- if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
- si.ip_address = ""
- si.mac_address = ""
- si.dhcp_state = "AWAITING"
-
-
-# Make sure the object is in a legitimate state
-# It should be after the above processing steps
-# However this might still fail if an event has fired in the meantime
-# Valid states:
-# ONU | Auth | DHCP
-# ===============================
-# AWAITING | AWAITING | AWAITING
-# ENABLED | * | AWAITING
-# ENABLED | APPROVED | *
-# DISABLED | AWAITING | AWAITING
-def validate_states(si):
- if (si.admin_onu_state == "AWAITING" or si.admin_onu_state ==
- "DISABLED") and si.authentication_state == "AWAITING" and si.dhcp_state == "AWAITING":
- return
- if si.admin_onu_state == "ENABLED" and (si.authentication_state == "APPROVED" or si.dhcp_state == "AWAITING"):
- return
- logging.warning(
- "MODEL_POLICY (validate_states): invalid state combination",
- onu_state=si.admin_onu_state,
- auth_state=si.authentication_state,
- dhcp_state=si.dhcp_state)
-
-
-def update_onu(model_accessor, serial_number, admin_state):
- onu = [onu for onu in model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
- == serial_number.lower()][0]
- if onu.admin_state == "ADMIN_DISABLED":
- logging.debug(
- "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
- (serial_number, admin_state))
- return
- if onu.admin_state == admin_state:
- logging.debug(
- "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
- (serial_number, admin_state))
- else:
- logging.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (serial_number, admin_state))
- onu.admin_state = admin_state
- onu.save_changed_fields(always_update_timestamp=True)
-
-
-def get_subscriber(model_accessor, serial_number):
- try:
- return [s for s in model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
- == serial_number.lower()][0]
- except IndexError:
- # If the subscriber doesn't exist we don't do anything
- logging.debug(
- "MODEL_POLICY: subscriber does not exists for this SI, doing nothing",
- onu_device=serial_number)
- return None
-
-
-def update_subscriber_ip(model_accessor, subscriber, ip):
- # TODO check if the subscriber has an IP and update it,
- # or create a new one
- try:
- ip = model_accessor.RCORDIpAddress.objects.filter(
- subscriber_id=subscriber.id,
- ip=ip
- )[0]
- logging.debug("MODEL_POLICY: found existing RCORDIpAddress for subscriber",
- onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
- ip.save_changed_fields()
- except IndexError:
- logging.debug(
- "MODEL_POLICY: Creating new RCORDIpAddress for subscriber",
- onu_device=subscriber.onu_device,
- subscriber_status=subscriber.status,
- ip=ip)
- ip = model_accessor.RCORDIpAddress(
- subscriber_id=subscriber.id,
- ip=ip,
- description="DHCP Assigned IP Address"
- )
- ip.save()
-
-
-def delete_subscriber_ip(model_accessor, subscriber, ip):
- try:
- ip = model_accessor.RCORDIpAddress.objects.filter(
- subscriber_id=subscriber.id,
- ip=ip
- )[0]
- logging.debug(
- "MODEL_POLICY: delete RCORDIpAddress for subscriber",
- onu_device=subscriber.onu_device,
- subscriber_status=subscriber.status,
- ip=ip)
- ip.delete()
- except BaseException:
- logging.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
-
-
-def update_subscriber(model_accessor, subscriber, si):
- cur_status = subscriber.status
- # Don't change state if someone has disabled the subscriber
- if subscriber.status != "disabled":
- if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
- subscriber.status = "awaiting-auth"
- elif si.authentication_state == "APPROVED":
- subscriber.status = "enabled"
- elif si.authentication_state == "DENIED":
- subscriber.status = "auth-failed"
-
- # NOTE we save the subscriber only if:
- # - the status has changed
- # - we get a DHCPACK event
- if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
- logging.debug(
- "MODEL_POLICY: updating subscriber",
- onu_device=subscriber.onu_device,
- authentication_state=si.authentication_state,
- subscriber_status=subscriber.status)
- if subscriber.status == "awaiting-auth":
- delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
- subscriber.mac_address = ""
- elif si.ip_address and si.mac_address:
- update_subscriber_ip(model_accessor, subscriber, si.ip_address)
- subscriber.mac_address = si.mac_address
- subscriber.save_changed_fields(always_update_timestamp=True)
- else:
- logging.debug("MODEL_POLICY: subscriber status has not changed", onu_device=subscriber.onu_device,
- authentication_state=si.authentication_state, subscriber_status=subscriber.status)
diff --git a/workflow_examples/att-workflow/att_workflow.py b/workflow_examples/att-workflow/att_workflow.py
new file mode 100644
index 0000000..0ad0ae7
--- /dev/null
+++ b/workflow_examples/att-workflow/att_workflow.py
@@ -0,0 +1,462 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+from airflow import DAG
+from airflow import AirflowException
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
+from airflow.operators.cord_workflow_plugin import CORDModelOperator
+
+log = logging.getLogger(__name__)
+args = {
+ # hard coded date
+ 'start_date': datetime(2019, 1, 1),
+ 'owner': 'ATT'
+}
+
+dag_att = DAG(
+ dag_id='att_workflow',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None
+)
+dag_att.doc_md = __doc__
+
+dag_att_admin = DAG(
+ dag_id='att_admin_workflow',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None
+)
+dag_att_admin.doc_md = __doc__
+
+def find_or_create_att_si(model_accessor, event):
+ try:
+ att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
+ serial_number=event["serialNumber"]
+ )
+ log.debug("Found existing AttWorkflowDriverServiceInstance - si = %s" % att_si)
+ except IndexError:
+ # create an AttWorkflowDriverServiceInstance, the validation will be
+ # triggered in the corresponding sync step
+ att_si = model_accessor.AttWorkflowDriverServiceInstance(
+ serial_number=event["serialNumber"],
+ of_dpid=event["deviceId"],
+ uni_port_id=long(event["portNumber"]),
+ # we assume there is only one AttWorkflowDriverService
+ owner=model_accessor.AttWorkflowDriverService.objects.first()
+ )
+ log.debug("Created new AttWorkflowDriverServiceInstance - si = %s" % att_si)
+ return att_si
+
+
+def validate_onu(model_accessor, si):
+ """
+ This method validate an ONU against the whitelist and set the appropriate state.
+ It's expected that the deferred exception is managed in the caller method,
+ for example a model_policy or a sync_step.
+
+ :param si: AttWorkflowDriverServiceInstance
+ :return: [boolean, string]
+ """
+
+ oss_service = si.owner.leaf_model
+
+ # See if there is a matching entry in the whitelist.
+ matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
+ owner_id=oss_service.id,
+ )
+ matching_entries = [e for e in matching_entries if e.serial_number.lower() == si.serial_number.lower()]
+
+ if len(matching_entries) == 0:
+ log.warn("ONU not found in whitelist - serial_number = %s" % si.serial_number)
+ return [False, "ONU not found in whitelist"]
+
+ whitelisted = matching_entries[0]
+ try:
+ onu = model_accessor.ONUDevice.objects.get(serial_number=si.serial_number)
+ pon_port = onu.pon_port
+ except IndexError:
+ raise DeferredException("ONU device %s is not know to XOS yet" % si.serial_number)
+
+ if onu.admin_state == "ADMIN_DISABLED":
+ return [False, "ONU has been manually disabled"]
+
+ if pon_port.port_no != whitelisted.pon_port_id or si.of_dpid != whitelisted.device_id:
+ log.warn("ONU disable as location don't match - serial_number = %s, device_id= %s" % (si.serial_number, si.of_dpid))
+ return [False, "ONU activated in wrong location"]
+
+ return [True, "ONU has been validated"]
+
+
+def update_onu(model_accessor, serial_number, admin_state):
+ onu = [onu for onu in model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
+ == serial_number.lower()][0]
+ if onu.admin_state == "ADMIN_DISABLED":
+ log.debug(
+ "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
+ (serial_number, admin_state))
+ return
+ if onu.admin_state == admin_state:
+ log.debug(
+ "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
+ (serial_number, admin_state))
+ else:
+ log.debug(
+ "MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" %
+ (serial_number, admin_state))
+ onu.admin_state = admin_state
+ onu.save_changed_fields(always_update_timestamp=True)
+
+
+def process_onu_state(model_accessor, si):
+ """
+ Check the whitelist to see if the ONU is valid. If it is, make sure that it's enabled.
+ """
+
+ [valid, message] = validate_onu(model_accessor, si)
+ si.status_message = message
+ if valid:
+ si.admin_onu_state = "ENABLED"
+ update_onu(model_accessor, si.serial_number, "ENABLED")
+ else:
+ si.admin_onu_state = "DISABLED"
+ update_onu(model_accessor, si.serial_number, "DISABLED")
+
+
+def process_auth_state(si):
+ """
+ If the ONU has been disabled then we force re-authentication when it
+ is re-enabled.
+ Setting si.authentication_state = AWAITING:
+ -> subscriber status = "awaiting_auth"
+ -> service chain deleted
+ -> need authentication to restore connectivity after ONU enabled
+ """
+
+ auth_msgs = {
+ "AWAITING": " - Awaiting Authentication",
+ "REQUESTED": " - Authentication requested",
+ "STARTED": " - Authentication started",
+ "APPROVED": " - Authentication succeeded",
+ "DENIED": " - Authentication denied"
+ }
+ if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
+ si.authentication_state = "AWAITING"
+ else:
+ si.status_message += auth_msgs[si.authentication_state]
+
+
+def process_dhcp_state(si):
+ """
+ The DhcpL2Relay ONOS app generates events that update the fields below.
+ It only sends events when it processes DHCP packets. It keeps no internal state.
+ We reset dhcp_state when:
+ si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
+ -> subscriber status = "awaiting_auth"
+ -> service chain not present
+ -> subscriber's OLT flow rules, xconnect not present
+ -> DHCP packets won't go through
+ Note, however, that the DHCP state at the endpoints is not changed.
+ A previously issued DHCP lease may still be valid.
+ """
+
+ if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+ si.ip_address = ""
+ si.mac_address = ""
+ si.dhcp_state = "AWAITING"
+
+
+def validate_states(si):
+ """
+ Make sure the object is in a legitimate state
+ It should be after the above processing steps
+ However this might still fail if an event has fired in the meantime
+ Valid states:
+ ONU | Auth | DHCP
+ ===============================
+ AWAITING | AWAITING | AWAITING
+ ENABLED | * | AWAITING
+ ENABLED | APPROVED | *
+ DISABLED | AWAITING | AWAITING
+ """
+
+ if (si.admin_onu_state == "AWAITING" or si.admin_onu_state == "DISABLED") and \
+ si.authentication_state == "AWAITING" and si.dhcp_state == "AWAITING":
+ return
+
+ if si.admin_onu_state == "ENABLED" and \
+ (si.authentication_state == "APPROVED" or si.dhcp_state == "AWAITING"):
+ return
+
+ log.warning(
+ "validate_states: invalid state combination - onu_state = %s, \
+ auth_state = %s, dhcp_state = %s" %
+ (si.admin_onu_state, si.authentication_state, si.dhcp_state)
+ )
+
+
+def get_subscriber(model_accessor, serial_number):
+ try:
+ return [s for s in model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
+ == serial_number.lower()][0]
+ except IndexError:
+ # If the subscriber doesn't exist we don't do anything
+ log.debug(
+ "subscriber does not exists for this SI, doing nothing - onu_device = %s" %
+ serial_number
+ )
+ return None
+
+
+def update_subscriber_ip(model_accessor, subscriber, ip):
+ # TODO check if the subscriber has an IP and update it,
+ # or create a new one
+ try:
+ ip = model_accessor.RCORDIpAddress.objects.filter(
+ subscriber_id=subscriber.id,
+ ip=ip
+ )[0]
+ log.debug(
+ "found existing RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device,
+ subscriber_status=subscriber.status,
+ ip=ip
+ )
+ ip.save_changed_fields()
+ except IndexError:
+ log.debug(
+ "Creating new RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device,
+ subscriber_status=subscriber.status,
+ ip=ip)
+ ip = model_accessor.RCORDIpAddress(
+ subscriber_id=subscriber.id,
+ ip=ip,
+ description="DHCP Assigned IP Address"
+ )
+ ip.save()
+
+
+def delete_subscriber_ip(model_accessor, subscriber, ip):
+ try:
+ ip = model_accessor.RCORDIpAddress.objects.filter(
+ subscriber_id=subscriber.id,
+ ip=ip
+ )[0]
+ log.debug(
+ "MODEL_POLICY: delete RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device,
+ subscriber_status=subscriber.status,
+ ip=ip)
+ ip.delete()
+ except BaseException:
+ log.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
+
+
+def update_subscriber(model_accessor, subscriber, si):
+ cur_status = subscriber.status
+ # Don't change state if someone has disabled the subscriber
+ if subscriber.status != "disabled":
+ if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+ subscriber.status = "awaiting-auth"
+ elif si.authentication_state == "APPROVED":
+ subscriber.status = "enabled"
+ elif si.authentication_state == "DENIED":
+ subscriber.status = "auth-failed"
+
+ # NOTE we save the subscriber only if:
+ # - the status has changed
+ # - we get a DHCPACK event
+ if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
+ log.debug(
+ "updating subscriber",
+ onu_device=subscriber.onu_device,
+ authentication_state=si.authentication_state,
+ subscriber_status=subscriber.status
+ )
+
+ if subscriber.status == "awaiting-auth":
+ delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
+ subscriber.mac_address = ""
+ elif si.ip_address and si.mac_address:
+ update_subscriber_ip(model_accessor, subscriber, si.ip_address)
+ subscriber.mac_address = si.mac_address
+ subscriber.save_changed_fields(always_update_timestamp=True)
+ else:
+ log.debug(
+ "subscriber status has not changed",
+ onu_device=subscriber.onu_device,
+ authentication_state=si.authentication_state,
+ subscriber_status=subscriber.status
+ )
+
+
+def update_model(model_accessor, si):
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def on_onu_event(model_accessor, message, **kwargs):
+ log.info('onu.events: received an event - message = %s' % message)
+
+ si = find_or_create_att_si(model_accessor, message)
+ if message['status'] == 'activated':
+ log.info('onu.events: activated onu')
+ si.no_sync = False
+ si.uni_port_id = long(message['portNumber'])
+ si.of_dpid = message['deviceId']
+ si.oper_onu_status = 'ENABLED'
+ elif message['status'] == 'disabled':
+ log.info('onu.events: disabled onu, resetting the subscriber')
+ si.oper_onu_status = 'DISABLED'
+ else:
+ log.error('onu.events: Unknown status value: %s' % message['status'])
+ raise AirflowException('onu.events: Unknown status value: %s' % message['status'])
+
+ update_model(model_accessor, si)
+
+def on_auth_event(model_accessor, message, **kwargs):
+ log.info('authentication.events: received an event - message = %s' % message)
+
+ si = find_or_create_att_si(model_accessor, message)
+ log.debug('authentication.events: Updating service instance')
+ si.authentication_state = message['authenticationState']
+ update_model(model_accessor, si)
+
+
+def on_dhcp_event(model_accessor, message, **kwargs):
+ log.info('dhcp.events: received an event - message = %s' % message)
+
+ si = find_or_create_att_si(model_accessor, message)
+ log.debug('dhcp.events: Updating service instance')
+ si.dhcp_state = message['messageType']
+ si.ip_address = message['ipAddress']
+ si.mac_address = message['macAddress']
+ update_model(model_accessor, si)
+
+
+def DriverService_event(model_accessor, message, **kwargs):
+ log.info('model event: received an event - %s' % message)
+
+ # handle only create & update events
+ event_type = message['event_type']
+ if event_type is None or event_type.lower() not in ['create', 'update']:
+ log.error('can not handle an event type - %s' % event_type)
+ return
+
+ si = find_or_create_att_si(model_accessor, message)
+ update_model(model_accessor, si)
+
+
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
+ topic='onu.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
+ python_callable=on_onu_event,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_att
+)
+
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+auth_event_handler = CORDModelOperator(
+ task_id='auth_event_handler',
+ python_callable=on_auth_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_att
+)
+
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+dhcp_event_handler = CORDModelOperator(
+ task_id='dhcp_event_handler',
+ python_callable=on_dhcp_event,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_att
+)
+
+join = DummyOperator(
+ task_id='join',
+ trigger_rule=TriggerRule.ALL_DONE,
+ dag=dag_att
+)
+
+att_model_event_sensor = CORDModelSensor(
+ task_id='att_model_event_sensor',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att_admin
+)
+
+att_model_event_handler = CORDModelOperator(
+ task_id='att_model_event_handler',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor',
+ dag=dag_att_admin
+)
+
+# handle standard flow
+onu_event_sensor >> onu_event_handler >> join
+auth_event_sensor >> auth_event_handler >> join
+dhcp_event_sensor >> dhcp_event_handler >> join
+
+# handle admin flow
+att_model_event_sensor >> att_model_event_handler
+
diff --git a/workflow_examples/att-workflow/att_workflow_essence.json b/workflow_examples/att-workflow/att_workflow_essence.json
new file mode 100644
index 0000000..50a9441
--- /dev/null
+++ b/workflow_examples/att-workflow/att_workflow_essence.json
@@ -0,0 +1,177 @@
+{
+ "att_admin_workflow": {
+ "dag": {
+ "dag_id": "att_admin_workflow",
+ "local_variable": "dag_att_admin"
+ },
+ "dependencies": {
+ "att_model_event_handler": {
+ "parents": [
+ "att_model_event_sensor"
+ ]
+ },
+ "att_model_event_sensor": {
+ "children": [
+ "att_model_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "att_model_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor",
+ "dag": "dag_att_admin",
+ "dag_id": "att_admin_workflow",
+ "local_variable": "att_model_event_handler",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler"
+ },
+ "att_model_event_sensor": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att_admin",
+ "dag_id": "att_admin_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor"
+ }
+ }
+ },
+ "att_workflow": {
+ "dag": {
+ "dag_id": "att_workflow",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "join"
+ ],
+ "parents": [
+ "auth_event_sensor"
+ ]
+ },
+ "auth_event_sensor": {
+ "children": [
+ "auth_event_handler"
+ ]
+ },
+ "dhcp_event_handler": {
+ "children": [
+ "join"
+ ],
+ "parents": [
+ "dhcp_event_sensor"
+ ]
+ },
+ "dhcp_event_sensor": {
+ "children": [
+ "dhcp_event_handler"
+ ]
+ },
+ "join": {
+ "parents": [
+ "onu_event_handler",
+ "auth_event_handler",
+ "dhcp_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "join"
+ ],
+ "parents": [
+ "onu_event_sensor"
+ ]
+ },
+ "onu_event_sensor": {
+ "children": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "auth_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow",
+ "local_variable": "auth_event_handler",
+ "python_callable": "on_auth_event",
+ "task_id": "auth_event_handler"
+ },
+ "auth_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_sensor",
+ "poke_interval": 5,
+ "task_id": "auth_event_sensor",
+ "topic": "authentication.events"
+ },
+ "dhcp_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "dhcp_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow",
+ "local_variable": "dhcp_event_handler",
+ "python_callable": "on_dhcp_event",
+ "task_id": "dhcp_event_handler"
+ },
+ "dhcp_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_sensor",
+ "poke_interval": 5,
+ "task_id": "dhcp_event_sensor",
+ "topic": "dhcp.events"
+ },
+ "join": {
+ "class": "DummyOperator",
+ "dag": "dag_att",
+ "dag_id": "att_workflow",
+ "local_variable": "join",
+ "task_id": "join",
+ "trigger_rule": {
+ "Attribute": {
+ "attr": "ALL_DONE",
+ "ctx": "Load",
+ "value": {
+ "Name": {
+ "ctx": "Load",
+ "id": "TriggerRule"
+ }
+ }
+ }
+ }
+ },
+ "onu_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "onu_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow",
+ "local_variable": "onu_event_handler",
+ "python_callable": "on_onu_event",
+ "task_id": "onu_event_handler"
+ },
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_sensor",
+ "poke_interval": 5,
+ "task_id": "onu_event_sensor",
+ "topic": "onu.events"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/workflow_examples/emit_events_model.sh b/workflow_examples/emit_events_model.sh
deleted file mode 100644
index 6adbcbb..0000000
--- a/workflow_examples/emit_events_model.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#! /bin/bash
-
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# emit events
-python ./workflow_ctl.py emit "datamodel.cordWorkflowDriverServiceInstance" "{'event_type': 'update', 'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/parallel-cord-workflow/parallel_cord_workflow.py b/workflow_examples/parallel-cord-workflow/parallel_cord_workflow.py
new file mode 100644
index 0000000..42e0e2d
--- /dev/null
+++ b/workflow_examples/parallel-cord-workflow/parallel_cord_workflow.py
@@ -0,0 +1,147 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example parallel workflow
+"""
+import json
+import logging
+from datetime import datetime
+from airflow import DAG
+from airflow import AirflowException
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
+from airflow.operators.cord_workflow_plugin import CORDModelOperator
+
+
+log = logging.getLogger(__name__)
+args = {
+ # hard coded date
+ 'start_date': datetime(2019, 1, 1),
+ 'owner': 'iychoi'
+}
+
+dag_parallel_cord = DAG(
+ dag_id='parallel_cord_workflow',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None
+)
+dag_parallel_cord.doc_md = __doc__
+
+dag_parallel_cord_admin = DAG(
+ dag_id='parallel_cord_workflow_admin',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None
+)
+dag_parallel_cord_admin.doc_md = __doc__
+
+
+def on_onu_event(model_accessor, message, **kwargs):
+ log.info('onu.events: received an event - %s' % message)
+
+
+def on_auth_event(model_accessor, message, **kwargs):
+ log.info('authentication.events: received an event - %s' % message)
+
+
+def on_dhcp_event(model_accessor, message, **kwargs):
+ log.info('dhcp.events: received an event - %s' % message)
+
+
+def on_model_event(model_accessor, message, **kwargs):
+ log.info('model event: received an event - %s' % message)
+
+
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
+ topic='onu.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_parallel_cord
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
+ python_callable=on_onu_event,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_parallel_cord
+)
+
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_parallel_cord
+)
+
+auth_event_handler = CORDModelOperator(
+ task_id='auth_event_handler',
+ python_callable=on_auth_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_parallel_cord
+)
+
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_parallel_cord
+)
+
+dhcp_event_handler = CORDModelOperator(
+ task_id='dhcp_event_handler',
+ python_callable=on_dhcp_event,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_parallel_cord
+)
+
+join = DummyOperator(
+ task_id='join',
+ trigger_rule=TriggerRule.ALL_DONE,
+ dag=dag_parallel_cord
+)
+
+att_model_event_sensor = CORDModelSensor(
+ task_id='att_model_event_sensor',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_parallel_cord_admin
+)
+
+att_model_event_handler = CORDModelOperator(
+ task_id='att_model_event_handler',
+ python_callable=on_model_event,
+ cord_event_sensor_task_id='att_model_event_sensor',
+ dag=dag_parallel_cord_admin
+)
+
+# handle standard flow
+onu_event_sensor >> onu_event_handler >> join
+auth_event_sensor >> auth_event_handler >> join
+dhcp_event_sensor >> dhcp_event_handler >> join
+
+# handle admin flow
+att_model_event_sensor >> att_model_event_handler
+
diff --git a/workflow_examples/parallel-cord-workflow/parallel_cord_workflow_essence.json b/workflow_examples/parallel-cord-workflow/parallel_cord_workflow_essence.json
new file mode 100644
index 0000000..713426c
--- /dev/null
+++ b/workflow_examples/parallel-cord-workflow/parallel_cord_workflow_essence.json
@@ -0,0 +1,177 @@
+{
+ "parallel_cord_workflow": {
+ "dag": {
+ "dag_id": "parallel_cord_workflow",
+ "local_variable": "dag_parallel_cord"
+ },
+ "dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "join"
+ ],
+ "parents": [
+ "auth_event_sensor"
+ ]
+ },
+ "auth_event_sensor": {
+ "children": [
+ "auth_event_handler"
+ ]
+ },
+ "dhcp_event_handler": {
+ "children": [
+ "join"
+ ],
+ "parents": [
+ "dhcp_event_sensor"
+ ]
+ },
+ "dhcp_event_sensor": {
+ "children": [
+ "dhcp_event_handler"
+ ]
+ },
+ "join": {
+ "parents": [
+ "onu_event_handler",
+ "auth_event_handler",
+ "dhcp_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "join"
+ ],
+ "parents": [
+ "onu_event_sensor"
+ ]
+ },
+ "onu_event_sensor": {
+ "children": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "auth_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "auth_event_sensor",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
+ "local_variable": "auth_event_handler",
+ "python_callable": "on_auth_event",
+ "task_id": "auth_event_handler"
+ },
+ "auth_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_sensor",
+ "poke_interval": 5,
+ "task_id": "auth_event_sensor",
+ "topic": "authentication.events"
+ },
+ "dhcp_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "dhcp_event_sensor",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
+ "local_variable": "dhcp_event_handler",
+ "python_callable": "on_dhcp_event",
+ "task_id": "dhcp_event_handler"
+ },
+ "dhcp_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_sensor",
+ "poke_interval": 5,
+ "task_id": "dhcp_event_sensor",
+ "topic": "dhcp.events"
+ },
+ "join": {
+ "class": "DummyOperator",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
+ "local_variable": "join",
+ "task_id": "join",
+ "trigger_rule": {
+ "Attribute": {
+ "attr": "ALL_DONE",
+ "ctx": "Load",
+ "value": {
+ "Name": {
+ "ctx": "Load",
+ "id": "TriggerRule"
+ }
+ }
+ }
+ }
+ },
+ "onu_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "onu_event_sensor",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
+ "local_variable": "onu_event_handler",
+ "python_callable": "on_onu_event",
+ "task_id": "onu_event_handler"
+ },
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_parallel_cord",
+ "dag_id": "parallel_cord_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_sensor",
+ "poke_interval": 5,
+ "task_id": "onu_event_sensor",
+ "topic": "onu.events"
+ }
+ }
+ },
+ "parallel_cord_workflow_admin": {
+ "dag": {
+ "dag_id": "parallel_cord_workflow_admin",
+ "local_variable": "dag_parallel_cord_admin"
+ },
+ "dependencies": {
+ "att_model_event_handler": {
+ "parents": [
+ "att_model_event_sensor"
+ ]
+ },
+ "att_model_event_sensor": {
+ "children": [
+ "att_model_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "att_model_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor",
+ "dag": "dag_parallel_cord_admin",
+ "dag_id": "parallel_cord_workflow_admin",
+ "local_variable": "att_model_event_handler",
+ "python_callable": "on_model_event",
+ "task_id": "att_model_event_handler"
+ },
+ "att_model_event_sensor": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_parallel_cord_admin",
+ "dag_id": "parallel_cord_workflow_admin",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/workflow_examples/connection_setup.sh b/workflow_examples/scripts/connection_setup.sh
similarity index 100%
rename from workflow_examples/connection_setup.sh
rename to workflow_examples/scripts/connection_setup.sh
diff --git a/workflow_examples/emit_events_onu.sh b/workflow_examples/scripts/emit_events_auth.sh
similarity index 83%
copy from workflow_examples/emit_events_onu.sh
copy to workflow_examples/scripts/emit_events_auth.sh
index 03888e7..3edd9de 100644
--- a/workflow_examples/emit_events_onu.sh
+++ b/workflow_examples/scripts/emit_events_auth.sh
@@ -15,4 +15,4 @@
# limitations under the License.
# emit events
-python ./workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
+python ../workflow_ctl.py emit "authentication.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/emit_events_onu.sh b/workflow_examples/scripts/emit_events_dhcp.sh
similarity index 84%
copy from workflow_examples/emit_events_onu.sh
copy to workflow_examples/scripts/emit_events_dhcp.sh
index 03888e7..f112232 100644
--- a/workflow_examples/emit_events_onu.sh
+++ b/workflow_examples/scripts/emit_events_dhcp.sh
@@ -15,4 +15,4 @@
# limitations under the License.
# emit events
-python ./workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
+python ../workflow_ctl.py emit "dhcp.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/emit_events_onu.sh b/workflow_examples/scripts/emit_events_model.sh
similarity index 78%
copy from workflow_examples/emit_events_onu.sh
copy to workflow_examples/scripts/emit_events_model.sh
index 03888e7..8c481c2 100644
--- a/workflow_examples/emit_events_onu.sh
+++ b/workflow_examples/scripts/emit_events_model.sh
@@ -15,4 +15,4 @@
# limitations under the License.
# emit events
-python ./workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
+python ../workflow_ctl.py emit "datamodel.cordWorkflowDriverServiceInstance" "{'event_type': 'update', 'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/emit_events_onu.sh b/workflow_examples/scripts/emit_events_onu.sh
similarity index 84%
rename from workflow_examples/emit_events_onu.sh
rename to workflow_examples/scripts/emit_events_onu.sh
index 03888e7..a9038d7 100644
--- a/workflow_examples/emit_events_onu.sh
+++ b/workflow_examples/scripts/emit_events_onu.sh
@@ -15,4 +15,4 @@
# limitations under the License.
# emit events
-python ./workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
+python ../workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/register_essence.sh b/workflow_examples/scripts/register_essence.sh
similarity index 69%
rename from workflow_examples/register_essence.sh
rename to workflow_examples/scripts/register_essence.sh
index 1d1f4c0..9bd8395 100644
--- a/workflow_examples/register_essence.sh
+++ b/workflow_examples/scripts/register_essence.sh
@@ -15,5 +15,7 @@
# limitations under the License.
# register essence
-python ./workflow_ctl.py reg ./simple_cord_workflow_essence.json
-python ./workflow_ctl.py reg ./simple_airflow_workflow_essence.json
+python ../workflow_ctl.py reg ../sequential_cord_workflow_essence.json
+python ../workflow_ctl.py reg ../parallel_cord_workflow_essence.json
+python ../workflow_ctl.py reg ../simple_airflow_workflow_essence.json
+python ../workflow_ctl.py reg ../att_workflow_essence.json
diff --git a/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py
new file mode 100644
index 0000000..40dfe6e
--- /dev/null
+++ b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py
@@ -0,0 +1,158 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example sequential workflow
+"""
+
+
+import logging
+from datetime import datetime
+from airflow import DAG
+from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
+from airflow.operators.cord_workflow_plugin import CORDModelOperator
+
+
+log = logging.getLogger(__name__)
+
+args = {
+ # hard coded date
+ 'start_date': datetime(2019, 1, 1),
+ 'owner': 'iychoi'
+}
+
+dag_sequential_cord = DAG(
+ dag_id='sequential_cord_workflow',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None
+)
+dag_sequential_cord.doc_md = __doc__
+
+
+def on_onu_event(model_accessor, message, **kwargs):
+ log.info('onu.events: received an event - %s' % message)
+
+
+def on_auth_event(model_accessor, message, **kwargs):
+ log.info('authentication.events: received an event - %s' % message)
+
+
+def on_dhcp_event(model_accessor, message, **kwargs):
+ log.info('dhcp.events: received an event - %s' % message)
+
+
+def on_model_event(model_accessor, message, **kwargs):
+ log.info('model event: received an event - %s' % message)
+
+
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
+ topic='onu.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_sequential_cord
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
+ python_callable=on_onu_event,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_sequential_cord
+)
+
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_sequential_cord
+)
+
+auth_event_handler = CORDModelOperator(
+ task_id='auth_event_handler',
+ python_callable=on_auth_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_sequential_cord
+)
+
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_sequential_cord
+)
+
+dhcp_event_handler = CORDModelOperator(
+ task_id='dhcp_event_handler',
+ python_callable=on_dhcp_event,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_sequential_cord
+)
+
+cord_model_event_sensor1 = CORDModelSensor(
+ task_id='cord_model_event_sensor1',
+ model_name='cordWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_sequential_cord
+)
+
+cord_model_event_handler1 = CORDModelOperator(
+ task_id='cord_model_event_handler1',
+ python_callable=on_model_event,
+ cord_event_sensor_task_id='cord_model_event_sensor1',
+ dag=dag_sequential_cord
+)
+
+cord_model_event_sensor2 = CORDModelSensor(
+ task_id='cord_model_event_sensor2',
+ model_name='cordWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_sequential_cord
+)
+
+cord_model_event_handler2 = CORDModelOperator(
+ task_id='cord_model_event_handler2',
+ python_callable=on_model_event,
+ cord_event_sensor_task_id='cord_model_event_sensor2',
+ dag=dag_sequential_cord
+)
+
+cord_model_event_sensor3 = CORDModelSensor(
+ task_id='cord_model_event_sensor3',
+ model_name='cordWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_sequential_cord
+)
+
+cord_model_event_handler3 = CORDModelOperator(
+ task_id='cord_model_event_handler3',
+ python_callable=on_model_event,
+ cord_event_sensor_task_id='cord_model_event_sensor3',
+ dag=dag_sequential_cord
+)
+
+onu_event_sensor >> onu_event_handler >> cord_model_event_sensor1 >> cord_model_event_handler1 >> \
+ auth_event_sensor >> auth_event_handler >> cord_model_event_sensor2 >> cord_model_event_handler2 >> \
+ dhcp_event_sensor >> dhcp_event_handler >> cord_model_event_sensor3 >> cord_model_event_handler3
diff --git a/workflow_examples/simple-cord-workflow/simple_cord_workflow_essence.json b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
similarity index 90%
rename from workflow_examples/simple-cord-workflow/simple_cord_workflow_essence.json
rename to workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
index 9d4499b..f8b6ba6 100644
--- a/workflow_examples/simple-cord-workflow/simple_cord_workflow_essence.json
+++ b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
@@ -2,9 +2,25 @@
"simple_cord_workflow": {
"dag": {
"dag_id": "simple_cord_workflow",
- "local_variable": "dag_cord"
+ "local_variable": "dag_sequential_cord"
},
"dependencies": {
+ "auth_event_handler": {
+ "children": [
+ "cord_model_event_sensor2"
+ ],
+ "parents": [
+ "auth_event_sensor"
+ ]
+ },
+ "auth_event_sensor": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "cord_model_event_handler1"
+ ]
+ },
"cord_model_event_handler1": {
"children": [
"auth_event_sensor"
@@ -50,22 +66,6 @@
"dhcp_event_handler"
]
},
- "auth_event_handler": {
- "children": [
- "cord_model_event_sensor2"
- ],
- "parents": [
- "auth_event_sensor"
- ]
- },
- "auth_event_sensor": {
- "children": [
- "auth_event_handler"
- ],
- "parents": [
- "cord_model_event_handler1"
- ]
- },
"dhcp_event_handler": {
"children": [
"cord_model_event_sensor3"
@@ -97,19 +97,39 @@
}
},
"tasks": {
+ "auth_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "auth_event_sensor",
+ "dag": "dag_sequential_cord",
+ "dag_id": "simple_cord_workflow",
+ "local_variable": "auth_event_handler",
+ "python_callable": "on_auth_event",
+ "task_id": "auth_event_handler"
+ },
+ "auth_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_sequential_cord",
+ "dag_id": "simple_cord_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_sensor",
+ "poke_interval": 5,
+ "task_id": "auth_event_sensor",
+ "topic": "authentication.events"
+ },
"cord_model_event_handler1": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "cord_model_event_sensor1",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"local_variable": "cord_model_event_handler1",
- "python_callable": "DriverService_event",
+ "python_callable": "on_model_event",
"task_id": "cord_model_event_handler1"
},
"cord_model_event_handler2": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "cord_model_event_sensor2",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"local_variable": "cord_model_event_handler2",
"python_callable": "DriverService_event",
@@ -118,7 +138,7 @@
"cord_model_event_handler3": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "cord_model_event_sensor3",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"local_variable": "cord_model_event_handler3",
"python_callable": "DriverService_event",
@@ -127,7 +147,7 @@
"cord_model_event_sensor1": {
"class": "CORDModelSensor",
"controller_conn_id": "local_cord_controller",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"key_field": "serialNumber",
"local_variable": "cord_model_event_sensor1",
@@ -138,7 +158,7 @@
"cord_model_event_sensor2": {
"class": "CORDModelSensor",
"controller_conn_id": "local_cord_controller",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"key_field": "serialNumber",
"local_variable": "cord_model_event_sensor2",
@@ -149,7 +169,7 @@
"cord_model_event_sensor3": {
"class": "CORDModelSensor",
"controller_conn_id": "local_cord_controller",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"key_field": "serialNumber",
"local_variable": "cord_model_event_sensor3",
@@ -157,39 +177,19 @@
"poke_interval": 5,
"task_id": "cord_model_event_sensor3"
},
- "auth_event_handler": {
- "class": "CORDModelOperator",
- "cord_event_sensor_task_id": "auth_event_sensor",
- "dag": "dag_cord",
- "dag_id": "simple_cord_workflow",
- "local_variable": "auth_event_handler",
- "python_callable": "AUTH_event",
- "task_id": "auth_event_handler"
- },
- "auth_event_sensor": {
- "class": "CORDEventSensor",
- "controller_conn_id": "local_cord_controller",
- "dag": "dag_cord",
- "dag_id": "simple_cord_workflow",
- "key_field": "serialNumber",
- "local_variable": "auth_event_sensor",
- "poke_interval": 5,
- "task_id": "auth_event_sensor",
- "topic": "authentication.events"
- },
"dhcp_event_handler": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "dhcp_event_sensor",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"local_variable": "dhcp_event_handler",
- "python_callable": "DHCP_event",
+ "python_callable": "on_dhcp_event",
"task_id": "dhcp_event_handler"
},
"dhcp_event_sensor": {
"class": "CORDEventSensor",
"controller_conn_id": "local_cord_controller",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"key_field": "serialNumber",
"local_variable": "dhcp_event_sensor",
@@ -200,16 +200,16 @@
"onu_event_handler": {
"class": "CORDModelOperator",
"cord_event_sensor_task_id": "onu_event_sensor",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"local_variable": "onu_event_handler",
- "python_callable": "ONU_event",
+ "python_callable": "on_onu_event",
"task_id": "onu_event_handler"
},
"onu_event_sensor": {
"class": "CORDEventSensor",
"controller_conn_id": "local_cord_controller",
- "dag": "dag_cord",
+ "dag": "dag_sequential_cord",
"dag_id": "simple_cord_workflow",
"key_field": "serialNumber",
"local_variable": "onu_event_sensor",
diff --git a/workflow_examples/simple-cord-workflow/simple_cord_workflow.py b/workflow_examples/simple-cord-workflow/simple_cord_workflow.py
deleted file mode 100644
index 0eee50d..0000000
--- a/workflow_examples/simple-cord-workflow/simple_cord_workflow.py
+++ /dev/null
@@ -1,235 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Example CORD workflow using Airflow
-"""
-
-
-import logging
-from datetime import datetime
-from airflow import DAG
-from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
-from airflow.operators.cord_workflow_plugin import CORDModelOperator
-
-
-log = logging.getLogger(__name__)
-
-args = {
- # hard coded date
- 'start_date': datetime(2019, 1, 1),
- 'owner': 'iychoi'
-}
-
-dag_cord = DAG(
- dag_id='simple_cord_workflow',
- default_args=args,
- # this dag will be triggered by external systems
- schedule_interval=None
-)
-
-dag_cord.doc_md = __doc__
-
-
-def ONU_event(model_accessor, message, **kwargs):
- log.info('onu.events: received an event - %s' % message)
-
- """
- si = find_or_create_cord_si(model_accessor, logging, message)
- if message['status'] == 'activated':
- logging.info('onu.events: activated onu', message=message)
- si.no_sync = False
- si.uni_port_id = long(message['portNumber'])
- si.of_dpid = message['deviceId']
- si.oper_onu_status = 'ENABLED'
- si.save_changed_fields(always_update_timestamp=True)
- elif message['status'] == 'disabled':
- logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
- si.oper_onu_status = 'DISABLED'
- si.save_changed_fields(always_update_timestamp=True)
- else:
- logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
- raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
- """
-
-
-def AUTH_event(model_accessor, message, **kwargs):
- log.info('authentication.events: received an event - %s' % message)
-
- """
- si = find_or_create_cord_si(model_accessor, logging, message)
- logging.debug('authentication.events: Updating service instance', si=si)
- si.authentication_state = message['authenticationState']
- si.save_changed_fields(always_update_timestamp=True)
- """
-
-
-def DHCP_event(model_accessor, message, **kwargs):
- log.info('dhcp.events: received an event - %s' % message)
-
- """
- si = find_or_create_cord_si(model_accessor, logging, message)
- logging.debug('dhcp.events: Updating service instance', si=si)
- si.dhcp_state = message['messageType']
- si.ip_address = message['ipAddress']
- si.mac_address = message['macAddress']
- si.save_changed_fields(always_update_timestamp=True)
- """
-
-
-def DriverService_event(model_accessor, message, **kwargs):
- log.info('model event: received an event - %s' % message)
-
- """
- event_type = message['event_type']
-
- go = False
- si = find_or_create_cord_si(model_accessor, logging, message)
-
- if event_type == 'create':
- logging.debug('MODEL_POLICY: handle_create for cordWorkflowDriverServiceInstance %s ' % si.id)
- go = True
- elif event_type == 'update':
- logging.debug('MODEL_POLICY: handle_update for cordWorkflowDriverServiceInstance %s ' %
- (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
- go = True
- elif event_type == 'delete':
- pass
- else:
- pass
-
- if not go:
- return
-
- # handle only create & update events
-
- # Changing ONU state can change auth state
- # Changing auth state can change DHCP state
- # So need to process in this order
- process_onu_state(model_accessor, si)
- process_auth_state(si)
- process_dhcp_state(si)
-
- validate_states(si)
-
- # handling the subscriber status
- # It's a combination of all the other states
- subscriber = get_subscriber(model_accessor, si.serial_number)
- if subscriber:
- update_subscriber(model_accessor, subscriber, si)
-
- si.save_changed_fields()
- """
-
-
-onu_event_sensor = CORDEventSensor(
- task_id='onu_event_sensor',
- topic='onu.events',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_cord
-)
-
-onu_event_handler = CORDModelOperator(
- task_id='onu_event_handler',
- python_callable=ONU_event,
- cord_event_sensor_task_id='onu_event_sensor',
- dag=dag_cord
-)
-
-auth_event_sensor = CORDEventSensor(
- task_id='auth_event_sensor',
- topic='authentication.events',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_cord
-)
-
-auth_event_handler = CORDModelOperator(
- task_id='auth_event_handler',
- python_callable=AUTH_event,
- cord_event_sensor_task_id='auth_event_sensor',
- dag=dag_cord
-)
-
-dhcp_event_sensor = CORDEventSensor(
- task_id='dhcp_event_sensor',
- topic='dhcp.events',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_cord
-)
-
-dhcp_event_handler = CORDModelOperator(
- task_id='dhcp_event_handler',
- python_callable=DHCP_event,
- cord_event_sensor_task_id='dhcp_event_sensor',
- dag=dag_cord
-)
-
-cord_model_event_sensor1 = CORDModelSensor(
- task_id='cord_model_event_sensor1',
- model_name='cordWorkflowDriverServiceInstance',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_cord
-)
-
-cord_model_event_handler1 = CORDModelOperator(
- task_id='cord_model_event_handler1',
- python_callable=DriverService_event,
- cord_event_sensor_task_id='cord_model_event_sensor1',
- dag=dag_cord
-)
-
-cord_model_event_sensor2 = CORDModelSensor(
- task_id='cord_model_event_sensor2',
- model_name='cordWorkflowDriverServiceInstance',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_cord
-)
-
-cord_model_event_handler2 = CORDModelOperator(
- task_id='cord_model_event_handler2',
- python_callable=DriverService_event,
- cord_event_sensor_task_id='cord_model_event_sensor2',
- dag=dag_cord
-)
-
-cord_model_event_sensor3 = CORDModelSensor(
- task_id='cord_model_event_sensor3',
- model_name='cordWorkflowDriverServiceInstance',
- key_field='serialNumber',
- controller_conn_id='local_cord_controller',
- poke_interval=5,
- dag=dag_cord
-)
-
-cord_model_event_handler3 = CORDModelOperator(
- task_id='cord_model_event_handler3',
- python_callable=DriverService_event,
- cord_event_sensor_task_id='cord_model_event_sensor3',
- dag=dag_cord
-)
-
-
-onu_event_sensor >> onu_event_handler >> cord_model_event_sensor1 >> cord_model_event_handler1 >> \
- auth_event_sensor >> auth_event_handler >> cord_model_event_sensor2 >> cord_model_event_handler2 >> \
- dhcp_event_sensor >> dhcp_event_handler >> cord_model_event_sensor3 >> cord_model_event_handler3