Implement workflow essence extractor
- Extract useful information from airflow workflow code
- Produce "essence" as a json output
- Output will be passed to workflow controller for workflow management
No source code change.
Change-Id: I01de9939fdf699522e81c369676c33c73a38b4bc
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 0000000..cd1437d
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,3 @@
+# Workflow examples
+
+This directory contains workflow examples.
diff --git a/examples/att-workflow/README.md b/examples/att-workflow/README.md
new file mode 100644
index 0000000..e211d2a
--- /dev/null
+++ b/examples/att-workflow/README.md
@@ -0,0 +1,5 @@
+# AT&T workflow example
+
+This is not a working version. This code is only to show how a new workflow implementation will look like.
+
+Original workfing version of code implemented in Synchronizer is at [att-workflow-driver](https://github.com/opencord/att-workflow-driver).
\ No newline at end of file
diff --git a/examples/att-workflow/__init__.py b/examples/att-workflow/__init__.py
new file mode 100644
index 0000000..1eb71b1
--- /dev/null
+++ b/examples/att-workflow/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/examples/att-workflow/att_dag.py b/examples/att-workflow/att_dag.py
new file mode 100644
index 0000000..7df5d03
--- /dev/null
+++ b/examples/att-workflow/att_dag.py
@@ -0,0 +1,198 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("onu.events: received event", event=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ if event["status"] == "activated":
+ logging.info("onu.events: activated onu", value=event)
+ si.no_sync = False
+ si.uni_port_id = long(event["portNumber"])
+ si.of_dpid = event["deviceId"]
+ si.oper_onu_status = "ENABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ elif event["status"] == "disabled":
+ logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+ si.oper_onu_status = "DISABLED"
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+ raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ go = False
+ if event_type == 'create':
+ logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("authentication.events: Updating service instance", si=si)
+ si.authentication_state = event["authenticationState"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+ si = find_or_create_att_si(model_accessor, logging, event)
+ logging.debug("dhcp.events: Updating service instance", si=si)
+ si.dhcp_state = event["messageType"]
+ si.ip_address = event["ipAddress"]
+ si.mac_address = event["macAddress"]
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+ task_id='onu_event_handler',
+ topic='onu.events',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=ONU_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+ task_id='onu_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+ task_id='auth_event_handler',
+ topic="authentication.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=Auth_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+ task_id='auth_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+ task_id='dhcp_event_handler',
+ topic="dhcp.events",
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DHCP_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+ task_id='dhcp_model_event_handler',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ provide_context=True,
+ python_callable=DriverService_event,
+ poke_interval=5,
+ dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler >> \
+ auth_event_handler >> auth_model_event_handler >> \
+ dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/examples/att-workflow/att_helpers.py b/examples/att-workflow/att_helpers.py
new file mode 100644
index 0000000..2abd2ab
--- /dev/null
+++ b/examples/att-workflow/att_helpers.py
@@ -0,0 +1,79 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.steps.syncstep import DeferredException
+
+def validate_onu(model_accessor, logging, att_si):
+ """
+ This method validate an ONU against the whitelist and set the appropriate state.
+ It's expected that the deferred exception is managed in the caller method,
+ for example a model_policy or a sync_step.
+
+ :param att_si: AttWorkflowDriverServiceInstance
+ :return: [boolean, string]
+ """
+
+ oss_service = att_si.owner.leaf_model
+
+ # See if there is a matching entry in the whitelist.
+ matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
+ owner_id=oss_service.id,
+ )
+ matching_entries = [e for e in matching_entries if e.serial_number.lower() == att_si.serial_number.lower()]
+
+ if len(matching_entries) == 0:
+ logging.warn("ONU not found in whitelist", object=str(att_si), serial_number=att_si.serial_number, **att_si.tologdict())
+ return [False, "ONU not found in whitelist"]
+
+ whitelisted = matching_entries[0]
+ try:
+ onu = model_accessor.ONUDevice.objects.get(serial_number=att_si.serial_number)
+ pon_port = onu.pon_port
+ except IndexError:
+ raise DeferredException("ONU device %s is not know to XOS yet" % att_si.serial_number)
+
+ if onu.admin_state == "ADMIN_DISABLED":
+ return [False, "ONU has been manually disabled"]
+
+ if pon_port.port_no != whitelisted.pon_port_id or att_si.of_dpid != whitelisted.device_id:
+ logging.warn("ONU disable as location don't match",
+ object=str(att_si),
+ serial_number=att_si.serial_number,
+ pon_port=pon_port.port_no,
+ whitelisted_pon_port=whitelisted.pon_port_id,
+ device_id=att_si.of_dpid,
+ whitelisted_device_id=whitelisted.device_id,
+ **att_si.tologdict())
+ return [False, "ONU activated in wrong location"]
+
+ return [True, "ONU has been validated"]
+
+def find_or_create_att_si(model_accessor, logging, event):
+ try:
+ att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
+ serial_number=event["serialNumber"]
+ )
+ logging.debug("AttHelpers: Found existing AttWorkflowDriverServiceInstance", si=att_si)
+ except IndexError:
+ # create an AttWorkflowDriverServiceInstance, the validation will be
+ # triggered in the corresponding sync step
+ att_si = model_accessor.AttWorkflowDriverServiceInstance(
+ serial_number=event["serialNumber"],
+ of_dpid=event["deviceId"],
+ uni_port_id=long(event["portNumber"]),
+ # we assume there is only one AttWorkflowDriverService
+ owner=model_accessor.AttWorkflowDriverService.objects.first()
+ )
+ logging.debug("AttHelpers: Created new AttWorkflowDriverServiceInstance", si=att_si)
+ return att_si
\ No newline at end of file
diff --git a/examples/att-workflow/att_service_instance_funcs.py b/examples/att-workflow/att_service_instance_funcs.py
new file mode 100644
index 0000000..df179f9
--- /dev/null
+++ b/examples/att-workflow/att_service_instance_funcs.py
@@ -0,0 +1,190 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from att_helpers import *
+
+# Check the whitelist to see if the ONU is valid. If it is, make sure that it's enabled.
+def process_onu_state(model_accessor, si):
+ [valid, message] = validate_onu(model_accessor, logging, si)
+ si.status_message = message
+ if valid:
+ si.admin_onu_state = "ENABLED"
+ update_onu(model_accessor, si.serial_number, "ENABLED")
+ else:
+ si.admin_onu_state = "DISABLED"
+ update_onu(model_accessor, si.serial_number, "DISABLED")
+
+
+# If the ONU has been disabled then we force re-authentication when it
+# is re-enabled.
+# Setting si.authentication_state = AWAITING:
+# -> subscriber status = "awaiting_auth"
+# -> service chain deleted
+# -> need authentication to restore connectivity after ONU enabled
+def process_auth_state(si):
+ auth_msgs = {
+ "AWAITING": " - Awaiting Authentication",
+ "REQUESTED": " - Authentication requested",
+ "STARTED": " - Authentication started",
+ "APPROVED": " - Authentication succeeded",
+ "DENIED": " - Authentication denied"
+ }
+ if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
+ si.authentication_state = "AWAITING"
+ else:
+ si.status_message += auth_msgs[si.authentication_state]
+
+
+# The DhcpL2Relay ONOS app generates events that update the fields below.
+# It only sends events when it processes DHCP packets. It keeps no internal state.
+# We reset dhcp_state when:
+# si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
+# -> subscriber status = "awaiting_auth"
+# -> service chain not present
+# -> subscriber's OLT flow rules, xconnect not present
+# -> DHCP packets won't go through
+# Note, however, that the DHCP state at the endpoints is not changed.
+# A previously issued DHCP lease may still be valid.
+def process_dhcp_state(si):
+ if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+ si.ip_address = ""
+ si.mac_address = ""
+ si.dhcp_state = "AWAITING"
+
+
+# Make sure the object is in a legitimate state
+# It should be after the above processing steps
+# However this might still fail if an event has fired in the meantime
+# Valid states:
+# ONU | Auth | DHCP
+# ===============================
+# AWAITING | AWAITING | AWAITING
+# ENABLED | * | AWAITING
+# ENABLED | APPROVED | *
+# DISABLED | AWAITING | AWAITING
+def validate_states(si):
+ if (si.admin_onu_state == "AWAITING" or si.admin_onu_state ==
+ "DISABLED") and si.authentication_state == "AWAITING" and si.dhcp_state == "AWAITING":
+ return
+ if si.admin_onu_state == "ENABLED" and (si.authentication_state == "APPROVED" or si.dhcp_state == "AWAITING"):
+ return
+ logging.warning(
+ "MODEL_POLICY (validate_states): invalid state combination",
+ onu_state=si.admin_onu_state,
+ auth_state=si.authentication_state,
+ dhcp_state=si.dhcp_state)
+
+
+def update_onu(model_accessor, serial_number, admin_state):
+ onu = [onu for onu in model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
+ == serial_number.lower()][0]
+ if onu.admin_state == "ADMIN_DISABLED":
+ logging.debug(
+ "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
+ (serial_number, admin_state))
+ return
+ if onu.admin_state == admin_state:
+ logging.debug(
+ "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
+ (serial_number, admin_state))
+ else:
+ logging.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (serial_number, admin_state))
+ onu.admin_state = admin_state
+ onu.save_changed_fields(always_update_timestamp=True)
+
+
+def get_subscriber(model_accessor, serial_number):
+ try:
+ return [s for s in model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
+ == serial_number.lower()][0]
+ except IndexError:
+ # If the subscriber doesn't exist we don't do anything
+ logging.debug(
+ "MODEL_POLICY: subscriber does not exists for this SI, doing nothing",
+ onu_device=serial_number)
+ return None
+
+
+def update_subscriber_ip(model_accessor, subscriber, ip):
+ # TODO check if the subscriber has an IP and update it,
+ # or create a new one
+ try:
+ ip = model_accessor.RCORDIpAddress.objects.filter(
+ subscriber_id=subscriber.id,
+ ip=ip
+ )[0]
+ logging.debug("MODEL_POLICY: found existing RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
+ ip.save_changed_fields()
+ except IndexError:
+ logging.debug(
+ "MODEL_POLICY: Creating new RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device,
+ subscriber_status=subscriber.status,
+ ip=ip)
+ ip = model_accessor.RCORDIpAddress(
+ subscriber_id=subscriber.id,
+ ip=ip,
+ description="DHCP Assigned IP Address"
+ )
+ ip.save()
+
+
+def delete_subscriber_ip(model_accessor, subscriber, ip):
+ try:
+ ip = model_accessor.RCORDIpAddress.objects.filter(
+ subscriber_id=subscriber.id,
+ ip=ip
+ )[0]
+ logging.debug(
+ "MODEL_POLICY: delete RCORDIpAddress for subscriber",
+ onu_device=subscriber.onu_device,
+ subscriber_status=subscriber.status,
+ ip=ip)
+ ip.delete()
+ except BaseException:
+ logging.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
+
+
+def update_subscriber(model_accessor, subscriber, si):
+ cur_status = subscriber.status
+ # Don't change state if someone has disabled the subscriber
+ if subscriber.status != "disabled":
+ if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+ subscriber.status = "awaiting-auth"
+ elif si.authentication_state == "APPROVED":
+ subscriber.status = "enabled"
+ elif si.authentication_state == "DENIED":
+ subscriber.status = "auth-failed"
+
+ # NOTE we save the subscriber only if:
+ # - the status has changed
+ # - we get a DHCPACK event
+ if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
+ logging.debug(
+ "MODEL_POLICY: updating subscriber",
+ onu_device=subscriber.onu_device,
+ authentication_state=si.authentication_state,
+ subscriber_status=subscriber.status)
+ if subscriber.status == "awaiting-auth":
+ delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
+ subscriber.mac_address = ""
+ elif si.ip_address and si.mac_address:
+ update_subscriber_ip(model_accessor, subscriber, si.ip_address)
+ subscriber.mac_address = si.mac_address
+ subscriber.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.debug("MODEL_POLICY: subscriber status has not changed", onu_device=subscriber.onu_device,
+ authentication_state=si.authentication_state, subscriber_status=subscriber.status)