blob: 7c5a9ded54d401c10bd9d60492df7bcbace548a9 [file] [log] [blame]
Illyoung Choi39262742019-07-23 13:28:00 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Kickstarter
19
20This module kickstarts Airflow workflows for requests from Workflow Controller
21"""
22
23import json
24import os
25import os.path
26import argparse
27import pyfiglet
28import traceback
29import socket
30import time
31import subprocess
32
33from multistructlog import create_logger
34from cord_workflow_controller_client.manager import Manager
35
36# We can't use experimental APIs for managing workflows/workflow runs of Airflow
37# - REST API does not provide sufficient features at this version
38# - API_Client does not work if a caller is not in main thread
39
40# from importlib import import_module
41# from airflow import configuration as AirflowConf
42# from airflow import api
43# from airflow.models import DagRun
44
45try:
46 from urllib.parse import urlparse
47except ImportError:
48 from urlparse import urlparse
49
# Module-level logger; re-created in main() once the config file is read.
log = create_logger()
# Workflow Controller client connection; assigned in main(), used by the
# on_* event handlers below.
manager = None
# airflow_client = None

# Directory that contains the 'airflow' CLI executable (env-overridable).
airflow_bin = os.getenv('AIRFLOW_BIN', '/usr/local/bin')

# Default program arguments; entries are overwritten by the config file
# and by command-line options in main().
progargs = {
    'controller_url': 'http://localhost:3030',
    'airflow_bin': airflow_bin,
    'logging': None
}

DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
# Seconds before a single TCP connection probe gives up.
SOCKET_CONNECTION_TEST_TIMEOUT = 5
# Seconds to sleep between liveness-check retries.
DEFAULT_CONNECTION_TEST_DELAY = 5
# Large retry count: effectively wait forever for the controller.
DEFAULT_CONNECTION_TEST_RETRY = 999999
66
67
def print_graffiti():
    """Print the CORD Workflow Kickstarter startup banner in ASCII art."""
    banner = pyfiglet.figlet_format("CORD\nWorkflow\nKickstarter", font="graffiti")
    print(banner)
71
72
def get_arg_parser():
    """Build and return the command-line parser for the kickstarter daemon."""
    ap = argparse.ArgumentParser(
        description='CORD Workflow Kickstarter Daemon.',
        prog='kickstarter',
    )
    ap.add_argument('--config', help='locate a configuration file')
    ap.add_argument('--controller', help='CORD Workflow Controller URL')
    return ap
78
79
def read_config(path):
    """Load a JSON configuration file; return an empty dict when absent."""
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return json.load(f)
86
87
def pretty_format_json(j):
    """Render *j* as human-readable JSON (sorted keys, 4-space indent)."""
    return json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
91
92
def is_port_open(url, timeout):
    """Check whether a TCP connection can be made to the host/port of *url*.

    When the URL carries no explicit port, 80 (http) or 443 (https) is
    inferred from the scheme; for any other scheme without a port there is
    nothing to probe and False is returned.

    Args:
        url: URL whose hostname/port should be probed.
        timeout: socket timeout in seconds for the connection attempt.

    Returns:
        True if a TCP connection succeeded, False otherwise.
    """
    o = urlparse(url)
    hostname = o.hostname
    port = o.port

    if (not port) or port <= 0:
        if o.scheme.lower() == 'http':
            port = 80
        elif o.scheme.lower() == 'https':
            port = 443
        else:
            # unknown scheme and no explicit port: cannot determine a port
            # (the old code fell through to int(None) and relied on a
            # blanket 'except BaseException' to return False)
            return False

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        sock.connect((hostname, int(port)))
        sock.shutdown(socket.SHUT_RDWR)
        return True
    except OSError:
        # refused / timed out / unreachable / name-resolution failure;
        # narrowed from BaseException so Ctrl-C is no longer swallowed
        return False
    finally:
        sock.close()
117
118
def check_web_live(url,
                   retry=DEFAULT_CONNECTION_TEST_RETRY,
                   delay=DEFAULT_CONNECTION_TEST_DELAY,
                   timeout=SOCKET_CONNECTION_TEST_TIMEOUT):
    """Poll *url*'s TCP port until it answers or *retry* attempts are spent.

    Sleeps *delay* seconds between failed attempts; each attempt uses
    *timeout* seconds. Returns True on the first successful connection,
    False when all retries are exhausted.
    """
    for _ in range(retry):
        if is_port_open(url, timeout):
            return True
        time.sleep(delay)
    return False
131
132
def get_airflow_cli():
    """Return the full path of the 'airflow' CLI executable."""
    bin_dir = progargs['airflow_bin']
    return os.path.join(bin_dir, 'airflow')
135
136
def check_airflow_live():
    """Return True when the airflow CLI responds to 'list_dags'."""
    try:
        subprocess.check_call([get_airflow_cli(), 'list_dags'])
    except subprocess.CalledProcessError as err:
        # non-zero exit code: airflow is installed but not healthy
        log.error(err)
        return False
    return True
144
145
def on_kickstart(workflow_id, workflow_run_id):
    """Handle a 'kickstart' event: trigger the Airflow DAG for a workflow.

    Runs 'airflow trigger_dag -r <run_id> <dag_id>' and, when the CLI output
    confirms a DagRun was created, reports the new run to the controller.

    Args:
        workflow_id: Airflow DAG id of the workflow to start.
        workflow_run_id: run id to assign to the new DAG run.
    """
    # if manager and airflow_client:
    if manager:
        try:
            log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
            # message = airflow_client.trigger_dag(
            #     dag_id=workflow_id,
            #     run_id=workflow_run_id
            # )
            # log.info('> Airflow Response: %s' % message)

            # subprocess.run waits for and reaps the child process; the
            # previous Popen-based code never called wait(), leaking a
            # zombie and an open pipe per kickstart.
            proc = subprocess.run(
                [get_airflow_cli(), 'trigger_dag', '-r', workflow_run_id, workflow_id],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                encoding='utf8'
            )

            # the airflow CLI prints 'Created <DagRun ...>' on success
            created = any('Created <DagRun ' in line for line in proc.stdout.splitlines())

            if created:
                # let controller know that the new workflow run is created
                log.info('> Notifying a new workflow run (%s) for a workflow (%s)' % (workflow_run_id, workflow_id))
                manager.report_new_workflow_run(workflow_id, workflow_run_id)
        except subprocess.CalledProcessError as e:
            # when shell exited with non-zero code
            log.error('> Error : %s' % e)
        except Exception as e:
            log.error('> Error : %s' % e)
            log.debug(traceback.format_exc())
179
180
def on_check_status(workflow_id, workflow_run_id):
    """Handle a 'check_status' event: look up and report one run's status.

    Scrapes the table printed by 'airflow list_dag_runs <dag_id>' for the
    run's state (e.g. 'success', 'running', 'failed') and reports it to the
    controller; 'unknown' is reported when the run cannot be found.

    Args:
        workflow_id: Airflow DAG id of the workflow.
        workflow_run_id: run id whose status is requested.
    """
    # if manager and airflow_client:
    if manager:
        try:
            status = 'unknown'
            log.info('> Checking status of workflow run (%s)' % (workflow_run_id))

            # The Airflow API client (DagRun.find) cannot be used off the
            # main thread, so the status is scraped from the CLI instead.
            # subprocess.run waits for and reaps the child process; the
            # previous Popen-based code never called wait() on it.
            proc = subprocess.run(
                [get_airflow_cli(), 'list_dag_runs', workflow_id],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                encoding='utf8'
            )

            # Expected CLI output layout:
            #   ... preamble ...
            #   DAG RUNS
            #   --------
            #   id | run_id | state | execution_date | state_date |
            title = False
            body = False
            for line in proc.stdout.splitlines():
                if 'DAG RUNS' in line:
                    title = True
                elif title and ('--------' in line):
                    body = True
                elif body:
                    if workflow_run_id in line:
                        # found the line, e.g.
                        # 1 | ssss | running | 2019-07-25T21:35:06+00:00 |
                        # 2019-07-25T21:35:06.242130+00:00 |
                        fields = line.split('|')
                        status = fields[2].strip()
                        break

            log.info('> status : %s' % status)

            # let controller know the status of the workflow run
            log.info(
                '> Updating status of a workflow run (%s) - status : %s' %
                (workflow_run_id, status)
            )
            manager.report_workflow_run_status(workflow_id, workflow_run_id, status)
        except subprocess.CalledProcessError as e:
            # when shell exited with non-zero code
            log.error('> Error : %s' % e)
        except Exception as e:
            log.error('> Error : %s' % e)
            log.debug(traceback.format_exc())
238
239
def on_check_status_bulk(requests):
    """Handle a 'check_status_bulk' event: report statuses of many runs.

    Groups the requested runs by workflow (DAG) id so each DAG's runs are
    resolved with a single 'airflow list_dag_runs' call, then reports all
    found statuses to the controller in one bulk message.

    Args:
        requests: list of dicts, each carrying 'workflow_id' and
            'workflow_run_id' keys.
    """
    # if manager and airflow_client:
    if requests:
        # Map workflow_id -> list of workflow_run_ids.
        # BUG FIX: the original code named this accumulator 'req' and then
        # reused 'req' as the loop variable, so the grouping dict was
        # clobbered on every iteration and the request dicts themselves
        # were mutated; a distinct name fixes the grouping.
        runs_by_workflow = {}
        for req in requests:
            workflow_id = req['workflow_id']
            workflow_run_id = req['workflow_run_id']

            if workflow_id not in runs_by_workflow:
                runs_by_workflow[workflow_id] = []

            runs_by_workflow[workflow_id].append(workflow_run_id)

        if manager:
            try:
                log.info('> Checking status of workflow runs')

                statuses = []
                for workflow_id, workflow_run_ids in runs_by_workflow.items():
                    # subprocess.run waits for and reaps the child process
                    # (the previous Popen-based code never waited on it)
                    proc = subprocess.run(
                        [get_airflow_cli(), 'list_dag_runs', workflow_id],
                        stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                        encoding='utf8'
                    )

                    # Expected CLI output layout:
                    #   DAG RUNS
                    #   --------
                    #   id | run_id | state | execution_date | state_date |
                    title = False
                    body = False
                    for line in proc.stdout.splitlines():
                        if 'DAG RUNS' in line:
                            title = True
                        elif title and ('--------' in line):
                            body = True
                        elif body:
                            for workflow_run_id in workflow_run_ids:
                                if workflow_run_id in line:
                                    # found the line, e.g.
                                    # 1 | ssss | running | 2019-07-25T21:35:06+00:00 |
                                    # 2019-07-25T21:35:06.242130+00:00 |
                                    fields = line.split('|')
                                    status = fields[2].strip()

                                    log.info('> status of a workflow run (%s) : %s' % (workflow_run_id, status))
                                    statuses.append({
                                        'workflow_id': workflow_id,
                                        'workflow_run_id': workflow_run_id,
                                        'status': status
                                    })

                # let controller know statuses of workflow runs
                log.info('> Updating status of workflow runs')
                manager.report_workflow_run_status_bulk(statuses)
            except subprocess.CalledProcessError as e:
                # when shell exited with non-zero code
                log.error('> Error : %s' % e)
            except Exception as e:
                log.error('> Error : %s' % e)
                log.debug(traceback.format_exc())
300
301
# for command-line execution
def main(args):
    """Entry point: configure, verify dependencies, and serve kickstart events.

    Reads the JSON config file (--config or the default path), applies CLI
    overrides, checks that both the Workflow Controller and Airflow are
    reachable, then connects to the controller and blocks waiting for
    kickstart/check_status events.

    Args:
        args: parsed argparse.Namespace with 'config' and 'controller'.

    Raises:
        IOError: when the controller or Airflow appears to be down.
    """
    print_graffiti()

    # check if config path is set
    config_file_path = DEFAULT_CONFIG_FILE_PATH
    if args.config:
        config_file_path = args.config

    if os.path.exists(config_file_path):
        # read config
        config = read_config(config_file_path)
        if config:
            global progargs
            for k in progargs:
                # overwrite defaults with configured values
                if k in config:
                    progargs[k] = config[k]

    # re-create the module logger with the configured logging settings
    global log
    log = create_logger(progargs["logging"])

    # CLI option takes precedence over the config file
    if args.controller:
        progargs['controller_url'] = args.controller

    print('=CONFIG=')
    config_json_string = pretty_format_json(progargs)
    print(config_json_string)
    print('\n')

    # checking controller and airflow web interface
    log.info('Checking if Workflow Controller (%s) is live...' % progargs['controller_url'])
    controller_live = check_web_live(progargs['controller_url'])
    if not controller_live:
        log.error('Controller (%s) appears to be down' % progargs['controller_url'])
        raise IOError('Controller (%s) appears to be down' % progargs['controller_url'])

    airflow_live = check_airflow_live()
    if not airflow_live:
        log.error('Airflow appears to be down')
        raise IOError('Airflow appears to be down')

    # connect to workflow controller
    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
    global manager
    manager = Manager(logger=log)
    manager.connect(progargs['controller_url'])
    # register handlers for events pushed by the controller
    manager.set_handlers({
        'kickstart': on_kickstart,
        'check_status': on_check_status,
        'check_status_bulk': on_check_status_bulk
    })

    # connect to airflow
    # global airflow_client
    # log.info('Connecting to Airflow...')

    # api.load_auth()
    # api_module = import_module(AirflowConf.get('cli', 'api_client'))
    # airflow_client = api_module.Client(
    #     api_base_url=AirflowConf.get('cli', 'endpoint_url'),
    #     auth=api.api_auth.client_auth
    # )

    log.info('Waiting for kickstart events from Workflow Controller...')
    try:
        manager.wait()
    finally:
        # always close the controller connection, even on interrupt
        log.info('Terminating the program...')
        manager.disconnect()
372
373
if __name__ == "__main__":
    # parse command-line arguments and run the daemon until interrupted
    parser = get_arg_parser()
    args = parser.parse_args()
    main(args)