blob: cba72e324029f1c47697f41fdef158547a9f3fe0 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2019-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Airflow Sensors
"""
from .hook import CORDWorkflowControllerHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class CORDEventSensor(BaseSensorOperator):
# STEEL BLUE
# http://bootflat.github.io/color-picker.html
ui_color = '#4b77be'
@apply_defaults
def __init__(
self,
topic,
key_field,
controller_conn_id='cord_controller_default',
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.topic = topic
self.key_field = key_field
self.controller_conn_id = controller_conn_id
self.message = None
self.hook = None
def __create_hook(self, context):
"""
Return connection hook.
"""
return CORDWorkflowControllerHook(self.dag_id, context['dag_run'].run_id, self.controller_conn_id)
def execute(self, context):
"""
Overridden to allow messages to be passed to next tasks via XCOM
"""
if self.hook is None:
self.hook = self.__create_hook(context)
self.hook.update_status(self.task_id, 'begin')
super().execute(context)
self.hook.update_status(self.task_id, 'end')
self.hook.close_conn()
self.hook = None
return self.message
def poke(self, context):
# we need to use notification to immediately react at event
# https://github.com/apache/airflow/blob/master/airflow/sensors/base_sensor_operator.py#L122
self.log.info('Poking : trying to fetch a message with a topic %s', self.topic)
event = self.hook.fetch_event(self.task_id, self.topic)
if event:
self.message = event
return True
return False
class CORDModelSensor(CORDEventSensor):
# SISKIN SPROUT YELLOW
# http://bootflat.github.io/color-picker.html
ui_color = '#7a942e'
@apply_defaults
def __init__(
self,
model_name,
key_field,
controller_conn_id='cord_controller_default',
*args,
**kwargs):
topic = 'datamodel.%s' % model_name
super().__init__(topic=topic, *args, **kwargs)