blob: 6df3b34e34436550044790eb208ca169169450c8 [file] [log] [blame]
Illyoung Choid1e4f5d2019-07-22 16:49:20 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
Illyoung Choi18e656a2019-07-30 11:27:36 -070017import time
Illyoung Choid1e4f5d2019-07-22 16:49:20 -070018from airflow.plugins_manager import AirflowPlugin
19from airflow.hooks.base_hook import BaseHook
20from airflow.operators.python_operator import PythonOperator
21from airflow.sensors.base_sensor_operator import BaseSensorOperator
22from airflow.utils.decorators import apply_defaults
23from cord_workflow_controller_client.workflow_run import WorkflowRun
24
25
26"""
27Airflow Hook
28"""
29
30
class CORDWorkflowControllerException(Exception):
    """
    Raised when an operation against the CORD Workflow Controller fails.

    This is a plain Exception subclass; the hook in this module wraps the
    underlying client errors in it so callers can catch a single type.
    """
35
36
class CORDWorkflowControllerHook(BaseHook):
    """
    Hook for accessing CORD Workflow Controller.

    The hook lazily creates a WorkflowRun client for one
    (workflow_id, workflow_run_id) pair and caches it until close_conn()
    is called. It can be used as a context manager, in which case the
    client is closed on exit.
    """

    def __init__(
            self,
            workflow_id,
            workflow_run_id,
            controller_conn_id='cord_controller_default'):
        """
        :param workflow_id: id of the workflow, passed to WorkflowRun
        :param workflow_run_id: id of the workflow run, passed to WorkflowRun
        :param controller_conn_id: Airflow connection id used to look up
            the controller address
        """
        super().__init__(source=None)
        self.workflow_id = workflow_id
        self.workflow_run_id = workflow_run_id
        self.controller_conn_id = controller_conn_id

        # Created on first get_conn() call, reset to None by close_conn().
        self.workflow_run_client = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.workflow_run_client is not None:
            self.close_conn()

    def get_conn(self):
        """
        Connect a Workflow Run client.

        Returns the cached client when one exists; otherwise builds the
        controller URL from the Airflow connection and connects a new one.

        :raises CORDWorkflowControllerException: when the client cannot be
            created or connected
        """
        if self.workflow_run_client is None:
            # find connection info from database or environment
            # ENV: AIRFLOW_CONN_CORD_CONTROLLER_DEFAULT
            connection_params = self.get_connection(self.controller_conn_id)
            # 'connection_params' has following fields
            # schema
            # host
            # port
            # login - we don't use this yet
            # password - we don't use this yet
            try:
                # Keep the client in a local until connect() succeeds, so a
                # failed attempt never leaves an unconnected client cached.
                client = WorkflowRun(self.workflow_id, self.workflow_run_id)

                # Fall back to defaults for any field the connection omits.
                schema = connection_params.schema or 'http'
                host = connection_params.host or 'localhost'

                port = connection_params.port
                if (not port) or (port <= 0):
                    port = 3030

                url = '%s://%s:%s' % (schema, host, port)
                client.connect(url)
            except Exception as ex:
                raise CORDWorkflowControllerException(ex) from ex

            self.workflow_run_client = client

        return self.workflow_run_client

    def close_conn(self):
        """
        Close the Workflow Run client.

        The cached client is dropped even when disconnecting fails, so the
        next get_conn() starts from a fresh client.

        :raises CORDWorkflowControllerException: when disconnect fails
        """
        if self.workflow_run_client:
            try:
                self.workflow_run_client.disconnect()
            except Exception as ex:
                raise CORDWorkflowControllerException(ex) from ex
            finally:
                self.workflow_run_client = None

        self.workflow_run_client = None

    def count_events(self):
        """
        Count queued events for the workflow run.

        :return: whatever the client's count_events() returns
        :raises CORDWorkflowControllerException: when the client call fails
        """
        client = self.get_conn()
        try:
            return client.count_events()
        except Exception as ex:
            raise CORDWorkflowControllerException(ex) from ex

    def fetch_event(self, task_id, topic):
        """
        Fetch an event for the workflow run.

        :param task_id: id of the task asking for the event
        :param topic: topic of the event to fetch
        :return: whatever the client's fetch_event() returns
        :raises CORDWorkflowControllerException: when the client call fails
        """
        client = self.get_conn()
        try:
            return client.fetch_event(task_id, topic)
        except Exception as ex:
            raise CORDWorkflowControllerException(ex) from ex
127
128
129"""
130Airflow Operators
131"""
132
133
class CORDModelOperator(PythonOperator):
    """
    Calls a python function with model accessor.

    The callable receives two extra keyword arguments on top of the usual
    op_kwargs: 'model_accessor' (None until create_model_accessor() has
    been called) and 'message' (the XCom value pulled from the task named
    by cord_event_sensor_task_id, or None when no such task is set).
    """

    # SCARLET
    # http://bootflat.github.io/color-picker.html
    ui_color = '#cf3a24'

    @apply_defaults
    def __init__(
        self,
        python_callable,
        cord_event_sensor_task_id=None,
        op_args=None,
        op_kwargs=None,
        provide_context=True,
        templates_dict=None,
        templates_exts=None,
        *args,
        **kwargs
    ):
        """
        :param python_callable: function to call in execute_callable()
        :param cord_event_sensor_task_id: task id whose XCom value is passed
            to the callable as 'message'
        :param provide_context: accepted for signature compatibility; the
            parent is always initialized with provide_context=True because
            execute_callable() reads 'ti' from op_kwargs
        """
        super().__init__(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            provide_context=True,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            *args,
            **kwargs)
        self.cord_event_sensor_task_id = cord_event_sensor_task_id
        self.model_accessor = None

    def create_model_accessor(self):
        """
        Import the xossynchronizer model accessor and keep it on the operator.
        """
        self.log.info("Creating model accessor...")
        # Imported lazily so the plugin loads without xossynchronizer installed.
        from xossynchronizer.modelaccessor import model_accessor
        self.model_accessor = model_accessor

    def wait_for_ready(self):
        """
        Block until a query against the data model succeeds.

        Retries every 10 seconds for as long as the probe query raises.
        """
        self.log.info("Waiting for model accessor to get ready...")
        models_active = False
        while not models_active:
            try:
                # Probe query; only success or failure matters, not the result.
                self.model_accessor.Site.objects.first()
                models_active = True
            except Exception as e:
                # Logger methods reject arbitrary keyword arguments, so the
                # exception is passed as a lazy format argument.
                self.log.info("Exception: %s", e)
                self.log.info("Waiting for data model to come up before starting...")
                time.sleep(10)

    def execute_callable(self):
        """
        Call python_callable with op_args and the extended op_kwargs.

        :return: whatever python_callable returns
        """
        # TODO: NEED TO UNCOMMENT BELOW TWO LINES AFTER XOS_API CONFIGURATION
        # comment out this two lines
        # self.create_model_accessor()
        # self.wait_for_ready()

        message = None
        if self.cord_event_sensor_task_id:
            message = self.op_kwargs['ti'].xcom_pull(task_ids=self.cord_event_sensor_task_id)

        new_op_kwargs = dict(self.op_kwargs, model_accessor=self.model_accessor, message=message)
        return self.python_callable(*self.op_args, **new_op_kwargs)
204
205
206"""
207Airflow Sensors
208"""
209
210
class CORDEventSensor(BaseSensorOperator):
    """
    Sensor that waits for an event with a given topic from the CORD
    Workflow Controller and returns it from execute().
    """

    # STEEL BLUE
    # http://bootflat.github.io/color-picker.html
    ui_color = '#4b77be'

    @apply_defaults
    def __init__(
            self,
            topic,
            key_field,
            controller_conn_id='cord_controller_default',
            *args,
            **kwargs):
        """
        :param topic: topic of the events to fetch
        :param key_field: stored on the sensor; not read by this class
        :param controller_conn_id: Airflow connection id handed to the hook
        """
        super().__init__(*args, **kwargs)

        self.log.debug('Initializing CORD EventSensor for topic %s', topic)

        self.topic = topic
        self.key_field = key_field
        self.controller_conn_id = controller_conn_id
        # Last event fetched by poke(); returned from execute().
        self.message = None
        # Created in execute() and released when execute() ends.
        self.hook = None

    def __create_hook(self, context):
        """
        Return connection hook.
        """
        self.log.debug('Creating a hook for run_id %s', context['dag_run'].run_id)
        return CORDWorkflowControllerHook(self.dag_id, context['dag_run'].run_id, self.controller_conn_id)

    def __release_hook(self, suppress_errors=False):
        """
        Close the hook and forget it.

        :param suppress_errors: when True, a failure to close is logged
            instead of raised, so it cannot replace an exception that is
            already propagating.
        """
        hook, self.hook = self.hook, None
        if hook is None:
            return
        try:
            hook.close_conn()
        except CORDWorkflowControllerException as ex:
            if not suppress_errors:
                raise
            self.log.warning('Failed to close the controller connection: %s', ex)

    def execute(self, context):
        """
        Overridden to allow messages to be passed to next tasks via XCOM

        :return: the event stored by poke()
        """
        self.log.debug('Executing a task %s for run_id %s', self.task_id, context['dag_run'].run_id)

        if self.hook is None:
            self.hook = self.__create_hook(context)

        try:
            super().execute(context)
        except BaseException:
            # Release the connection on any exit (timeout, reschedule, kill)
            # but keep the original exception as the one that propagates.
            self.__release_hook(suppress_errors=True)
            raise

        self.__release_hook()
        return self.message

    def poke(self, context):
        """
        Try to fetch one event for this task and topic.

        :return: True when an event was fetched, False otherwise
        """
        # we need to use notification to immediately react at event
        # https://github.com/apache/airflow/blob/master/airflow/sensors/base_sensor_operator.py#L122
        self.log.info('Poking : trying to fetch a message with a topic %s', self.topic)

        event = self.hook.fetch_event(self.task_id, self.topic)
        if event:
            self.message = event
            return True
        return False
266
267
class CORDModelSensor(CORDEventSensor):
    """
    Event sensor whose topic is derived from a model name.

    The topic passed to CORDEventSensor is 'datamodel.<model_name>'.
    """

    # SISKIN SPROUT YELLOW
    # http://bootflat.github.io/color-picker.html
    ui_color = '#7a942e'

    @apply_defaults
    def __init__(
            self,
            model_name,
            key_field,
            controller_conn_id='cord_controller_default',
            *args,
            **kwargs):
        """
        :param model_name: name appended to 'datamodel.' to form the topic
        :param key_field: forwarded unchanged to CORDEventSensor
        :param controller_conn_id: forwarded unchanged to CORDEventSensor
        """
        topic = 'datamodel.%s' % model_name
        super().__init__(topic=topic, key_field=key_field, controller_conn_id=controller_conn_id, *args, **kwargs)
Illyoung Choid1e4f5d2019-07-22 16:49:20 -0700283
284
285"""
286Airflow Plugin Definition
287"""
288
289
290# Defining the plugin class
class CORD_Workflow_Airflow_Plugin(AirflowPlugin):
    """
    Airflow plugin that exposes the CORD operator, sensors and hook
    defined in this module under the name 'cord_workflow_plugin'.
    """

    name = "cord_workflow_plugin"
    operators = [CORDModelOperator]
    sensors = [CORDEventSensor, CORDModelSensor]
    hooks = [CORDWorkflowControllerHook]
    # The remaining extension points are not used by this plugin.
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
    appbuilder_views = []
    appbuilder_menu_items = []