Implement workflow essence extractor
- Extract useful information from airflow workflow code
- Produce "essence" as a json output
- Output will be passed to workflow controller for workflow management

No source code change.

Change-Id: I01de9939fdf699522e81c369676c33c73a38b4bc
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 0000000..cd1437d
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,3 @@
+# Workflow examples
+
+This directory contains workflow examples.
diff --git a/examples/att-workflow/README.md b/examples/att-workflow/README.md
new file mode 100644
index 0000000..e211d2a
--- /dev/null
+++ b/examples/att-workflow/README.md
@@ -0,0 +1,5 @@
+# AT&T workflow example
+
+This is not a working version. This code is only to show how a new workflow implementation will look like.
+
+Original workfing version of code implemented in Synchronizer is at [att-workflow-driver](https://github.com/opencord/att-workflow-driver).
\ No newline at end of file
diff --git a/examples/att-workflow/__init__.py b/examples/att-workflow/__init__.py
new file mode 100644
index 0000000..1eb71b1
--- /dev/null
+++ b/examples/att-workflow/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/examples/att-workflow/att_dag.py b/examples/att-workflow/att_dag.py
new file mode 100644
index 0000000..7df5d03
--- /dev/null
+++ b/examples/att-workflow/att_dag.py
@@ -0,0 +1,198 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("onu.events: received event", event=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass
+    else:
+        pass
+
+    if not go:
+        return
+
+    # handle only create & update events
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+    task_id='dhcp_event_handler',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+    task_id='dhcp_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler >> \
+    auth_event_handler >> auth_model_event_handler >> \
+    dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/examples/att-workflow/att_helpers.py b/examples/att-workflow/att_helpers.py
new file mode 100644
index 0000000..2abd2ab
--- /dev/null
+++ b/examples/att-workflow/att_helpers.py
@@ -0,0 +1,79 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.steps.syncstep import DeferredException
+
+def validate_onu(model_accessor, logging, att_si):
+    """
+    This method validate an ONU against the whitelist and set the appropriate state.
+    It's expected that the deferred exception is managed in the caller method,
+    for example a model_policy or a sync_step.
+
+    :param att_si: AttWorkflowDriverServiceInstance
+    :return: [boolean, string]
+    """
+
+    oss_service = att_si.owner.leaf_model
+
+    # See if there is a matching entry in the whitelist.
+    matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
+        owner_id=oss_service.id,
+    )
+    matching_entries = [e for e in matching_entries if e.serial_number.lower() == att_si.serial_number.lower()]
+
+    if len(matching_entries) == 0:
+        logging.warn("ONU not found in whitelist", object=str(att_si), serial_number=att_si.serial_number, **att_si.tologdict())
+        return [False, "ONU not found in whitelist"]
+
+    whitelisted = matching_entries[0]
+    try:
+        onu = model_accessor.ONUDevice.objects.get(serial_number=att_si.serial_number)
+        pon_port = onu.pon_port
+    except IndexError:
+        raise DeferredException("ONU device %s is not know to XOS yet" % att_si.serial_number)
+
+    if onu.admin_state == "ADMIN_DISABLED":
+        return [False, "ONU has been manually disabled"]
+
+    if pon_port.port_no != whitelisted.pon_port_id or att_si.of_dpid != whitelisted.device_id:
+        logging.warn("ONU disable as location don't match",
+                    object=str(att_si),
+                    serial_number=att_si.serial_number,
+                    pon_port=pon_port.port_no,
+                    whitelisted_pon_port=whitelisted.pon_port_id,
+                    device_id=att_si.of_dpid,
+                    whitelisted_device_id=whitelisted.device_id,
+                    **att_si.tologdict())
+        return [False, "ONU activated in wrong location"]
+
+    return [True, "ONU has been validated"]
+
+def find_or_create_att_si(model_accessor, logging, event):
+    try:
+        att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
+            serial_number=event["serialNumber"]
+        )
+        logging.debug("AttHelpers: Found existing AttWorkflowDriverServiceInstance", si=att_si)
+    except IndexError:
+        # create an AttWorkflowDriverServiceInstance, the validation will be
+        # triggered in the corresponding sync step
+        att_si = model_accessor.AttWorkflowDriverServiceInstance(
+            serial_number=event["serialNumber"],
+            of_dpid=event["deviceId"],
+            uni_port_id=long(event["portNumber"]),
+            # we assume there is only one AttWorkflowDriverService
+            owner=model_accessor.AttWorkflowDriverService.objects.first()
+        )
+        logging.debug("AttHelpers: Created new AttWorkflowDriverServiceInstance", si=att_si)
+    return att_si
\ No newline at end of file
diff --git a/examples/att-workflow/att_service_instance_funcs.py b/examples/att-workflow/att_service_instance_funcs.py
new file mode 100644
index 0000000..df179f9
--- /dev/null
+++ b/examples/att-workflow/att_service_instance_funcs.py
@@ -0,0 +1,190 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from att_helpers import *
+
+# Check the whitelist to see if the ONU is valid.  If it is, make sure that it's enabled.
+def process_onu_state(model_accessor, si):
+    [valid, message] = validate_onu(model_accessor, logging, si)
+    si.status_message = message
+    if valid:
+        si.admin_onu_state = "ENABLED"
+        update_onu(model_accessor, si.serial_number, "ENABLED")
+    else:
+        si.admin_onu_state = "DISABLED"
+        update_onu(model_accessor, si.serial_number, "DISABLED")
+
+
+# If the ONU has been disabled then we force re-authentication when it
+# is re-enabled.
+# Setting si.authentication_state = AWAITING:
+#   -> subscriber status = "awaiting_auth"
+#   -> service chain deleted
+#   -> need authentication to restore connectivity after ONU enabled
+def process_auth_state(si):
+    auth_msgs = {
+        "AWAITING": " - Awaiting Authentication",
+        "REQUESTED": " - Authentication requested",
+        "STARTED": " - Authentication started",
+        "APPROVED": " - Authentication succeeded",
+        "DENIED": " - Authentication denied"
+    }
+    if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
+        si.authentication_state = "AWAITING"
+    else:
+        si.status_message += auth_msgs[si.authentication_state]
+
+
+# The DhcpL2Relay ONOS app generates events that update the fields below.
+# It only sends events when it processes DHCP packets.  It keeps no internal state.
+# We reset dhcp_state when:
+# si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
+#   -> subscriber status = "awaiting_auth"
+#   -> service chain not present
+#   -> subscriber's OLT flow rules, xconnect not present
+#   -> DHCP packets won't go through
+# Note, however, that the DHCP state at the endpoints is not changed.
+# A previously issued DHCP lease may still be valid.
+def process_dhcp_state(si):
+    if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+        si.ip_address = ""
+        si.mac_address = ""
+        si.dhcp_state = "AWAITING"
+
+
+# Make sure the object is in a legitimate state
+# It should be after the above processing steps
+# However this might still fail if an event has fired in the meantime
+# Valid states:
+# ONU       | Auth     | DHCP
+# ===============================
+# AWAITING  | AWAITING | AWAITING
+# ENABLED   | *        | AWAITING
+# ENABLED   | APPROVED | *
+# DISABLED  | AWAITING | AWAITING
+def validate_states(si):
+    if (si.admin_onu_state == "AWAITING" or si.admin_onu_state ==
+            "DISABLED") and si.authentication_state == "AWAITING" and si.dhcp_state == "AWAITING":
+        return
+    if si.admin_onu_state == "ENABLED" and (si.authentication_state == "APPROVED" or si.dhcp_state == "AWAITING"):
+        return
+    logging.warning(
+        "MODEL_POLICY (validate_states): invalid state combination",
+        onu_state=si.admin_onu_state,
+        auth_state=si.authentication_state,
+        dhcp_state=si.dhcp_state)
+
+
+def update_onu(model_accessor, serial_number, admin_state):
+    onu = [onu for onu in model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
+            == serial_number.lower()][0]
+    if onu.admin_state == "ADMIN_DISABLED":
+        logging.debug(
+            "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
+            (serial_number, admin_state))
+        return
+    if onu.admin_state == admin_state:
+        logging.debug(
+            "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
+            (serial_number, admin_state))
+    else:
+        logging.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (serial_number, admin_state))
+        onu.admin_state = admin_state
+        onu.save_changed_fields(always_update_timestamp=True)
+
+
+def get_subscriber(model_accessor, serial_number):
+    try:
+        return [s for s in model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
+                == serial_number.lower()][0]
+    except IndexError:
+        # If the subscriber doesn't exist we don't do anything
+        logging.debug(
+            "MODEL_POLICY: subscriber does not exists for this SI, doing nothing",
+            onu_device=serial_number)
+        return None
+
+
+def update_subscriber_ip(model_accessor, subscriber, ip):
+    # TODO check if the subscriber has an IP and update it,
+    # or create a new one
+    try:
+        ip = model_accessor.RCORDIpAddress.objects.filter(
+            subscriber_id=subscriber.id,
+            ip=ip
+        )[0]
+        logging.debug("MODEL_POLICY: found existing RCORDIpAddress for subscriber",
+                            onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
+        ip.save_changed_fields()
+    except IndexError:
+        logging.debug(
+            "MODEL_POLICY: Creating new RCORDIpAddress for subscriber",
+            onu_device=subscriber.onu_device,
+            subscriber_status=subscriber.status,
+            ip=ip)
+        ip = model_accessor.RCORDIpAddress(
+            subscriber_id=subscriber.id,
+            ip=ip,
+            description="DHCP Assigned IP Address"
+        )
+        ip.save()
+
+
+def delete_subscriber_ip(model_accessor, subscriber, ip):
+    try:
+        ip = model_accessor.RCORDIpAddress.objects.filter(
+            subscriber_id=subscriber.id,
+            ip=ip
+        )[0]
+        logging.debug(
+            "MODEL_POLICY: delete RCORDIpAddress for subscriber",
+            onu_device=subscriber.onu_device,
+            subscriber_status=subscriber.status,
+            ip=ip)
+        ip.delete()
+    except BaseException:
+        logging.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
+
+
+def update_subscriber(model_accessor, subscriber, si):
+    cur_status = subscriber.status
+    # Don't change state if someone has disabled the subscriber
+    if subscriber.status != "disabled":
+        if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+            subscriber.status = "awaiting-auth"
+        elif si.authentication_state == "APPROVED":
+            subscriber.status = "enabled"
+        elif si.authentication_state == "DENIED":
+            subscriber.status = "auth-failed"
+
+        # NOTE we save the subscriber only if:
+        # - the status has changed
+        # - we get a DHCPACK event
+        if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
+            logging.debug(
+                "MODEL_POLICY: updating subscriber",
+                onu_device=subscriber.onu_device,
+                authentication_state=si.authentication_state,
+                subscriber_status=subscriber.status)
+            if subscriber.status == "awaiting-auth":
+                delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
+                subscriber.mac_address = ""
+            elif si.ip_address and si.mac_address:
+                update_subscriber_ip(model_accessor, subscriber, si.ip_address)
+                subscriber.mac_address = si.mac_address
+            subscriber.save_changed_fields(always_update_timestamp=True)
+        else:
+            logging.debug("MODEL_POLICY: subscriber status has not changed", onu_device=subscriber.onu_device,
+                              authentication_state=si.authentication_state, subscriber_status=subscriber.status)