blob: 6f9924ed3e4a77c7578f8c6440dee5cd8af87ad9 [file] [log] [blame]
Illyoung Choife121d02019-07-16 10:47:41 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Kickstarter
19
20This module kickstarts Airflow workflows for requests from Workflow Controller
21"""
22
23import json
24import os.path
25import argparse
26import pyfiglet
27import traceback
28import socket
29import time
30
31from multistructlog import create_logger
32from cord_workflow_controller_client.manager import Manager
Illyoung Choi2e971512019-07-18 14:15:19 -070033from importlib import import_module
Illyoung Choife121d02019-07-16 10:47:41 -070034from urlparse import urlparse
Illyoung Choi2e971512019-07-18 14:15:19 -070035from airflow import configuration as AirflowConf
36from airflow import api
37from airflow.models import DagRun
Illyoung Choife121d02019-07-16 10:47:41 -070038
39
40log = create_logger()
41manager = None
42airflow_client = None
43
44progargs = {
45 'controller_url': 'http://localhost:3030',
Illyoung Choife121d02019-07-16 10:47:41 -070046 'logging': None
47}
48
49DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
50SOCKET_CONNECTION_TEST_TIMEOUT = 5
51DEFAULT_CONNECTION_TEST_DELAY = 5
52DEFAULT_CONNECTION_TEST_RETRY = 999999
53
54
55def print_graffiti():
56 result = pyfiglet.figlet_format("CORD\nWorkflow\nKickstarter", font="graffiti")
57 print(result)
58
59
60def get_arg_parser():
61 parser = argparse.ArgumentParser(description='CORD Workflow Kickstarter Daemon.', prog='kickstarter')
62 parser.add_argument('--config', help='locate a configuration file')
63 parser.add_argument('--controller', help='CORD Workflow Controller URL')
Illyoung Choife121d02019-07-16 10:47:41 -070064 return parser
65
66
67def read_config(path):
68 if os.path.exists(path):
69 with open(path) as json_config_file:
70 data = json.load(json_config_file)
71 return data
72 return {}
73
74
75def pretty_format_json(j):
76 dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
77 return dumps
78
79
80def is_port_open(url, timeout):
81 o = urlparse(url)
82 hostname = o.hostname
83 port = o.port
84
85 if (not port) or port <= 0:
86 if o.scheme.lower() == 'http':
87 port = 80
88 elif o.scheme.lower() == 'https':
89 port = 443
90
91 succeed = False
92 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
93 sock.settimeout(timeout)
94 try:
95 sock.connect((hostname, int(port)))
96 sock.shutdown(socket.SHUT_RDWR)
97 succeed = True
98 except BaseException:
99 pass
100 finally:
101 sock.close()
102
103 return succeed
104
105
106def check_web_live(url,
107 retry=DEFAULT_CONNECTION_TEST_RETRY,
108 delay=DEFAULT_CONNECTION_TEST_DELAY,
109 timeout=SOCKET_CONNECTION_TEST_TIMEOUT):
110 ipup = False
111 for _ in range(retry):
112 if is_port_open(url, timeout):
113 ipup = True
114 break
115 else:
116 time.sleep(delay)
117 return ipup
118
119
120def on_kickstart(workflow_id, workflow_run_id):
121 if manager and airflow_client:
122 try:
123 log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
124
125 airflow_client.trigger_dag(dag_id=workflow_id, run_id=workflow_run_id)
Illyoung Choi2e971512019-07-18 14:15:19 -0700126 message = airflow_client.trigger_dag(
127 dag_id=workflow_id,
128 run_id=workflow_run_id
129 )
130 log.info('> Airflow Response: %s' % message)
Illyoung Choife121d02019-07-16 10:47:41 -0700131
132 # let controller know that the new workflow run is created
133 log.info('> Notifying a workflow (%s), a workflow run (%s)' % (workflow_id, workflow_run_id))
134 manager.notify_new_workflow_run(workflow_id, workflow_run_id)
Illyoung Choi2e971512019-07-18 14:15:19 -0700135 except Exception as e:
136 log.error('> Error : %s' % e)
137 log.debug(traceback.format_exc())
Illyoung Choife121d02019-07-16 10:47:41 -0700138
Illyoung Choi2e971512019-07-18 14:15:19 -0700139
140def on_check_state(workflow_id, workflow_run_id):
141 if manager and airflow_client:
142 try:
143 log.info('> Checking state of a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
144
145 run = DagRun.find(dag_id=workflow_id, run_id=workflow_run_id)
146 state = 'unknown'
147 if run:
148 # run is an array
149 # this should be one of ['success', 'running', 'failed']
150 state = run[0].state
151 else:
152 log.error(
153 'Cannot retrieve state of a workflow run (%s, %s)' %
154 (workflow_id, workflow_run_id)
155 )
156 state = 'unknown'
157
158 log.info('> state : %s' % state)
159
160 # let controller know the state of the workflow run
161 log.info(
162 '> Notifying update of state of a workflow (%s), a workflow run (%s) - state : %s' %
163 (workflow_id, workflow_run_id, state)
164 )
165 manager.report_workflow_run_state(workflow_id, workflow_run_id, state)
Illyoung Choife121d02019-07-16 10:47:41 -0700166 except Exception as e:
167 log.error('> Error : %s' % e)
168 log.debug(traceback.format_exc())
169
170
171# for command-line execution
172def main(args):
173 print_graffiti()
174
175 # check if config path is set
176 config_file_path = DEFAULT_CONFIG_FILE_PATH
177 if args.config:
178 config_file_path = args.config
179
180 if os.path.exists(config_file_path):
181 # read config
182 config = read_config(config_file_path)
183 if config:
184 global progargs
185 for k in progargs:
186 # overwrite
187 progargs[k] = config[k]
188
189 global log
190 log = create_logger(progargs["logging"])
191
Illyoung Choife121d02019-07-16 10:47:41 -0700192 if args.controller:
193 progargs['controller_url'] = args.controller
194
Illyoung Choife121d02019-07-16 10:47:41 -0700195 print('=CONFIG=')
196 config_json_string = pretty_format_json(progargs)
197 print(config_json_string)
198 print('\n')
199
200 # checking controller and airflow web interface
201 log.info('Checking if Workflow Controller (%s) is live...' % progargs['controller_url'])
202 controller_live = check_web_live(progargs['controller_url'])
203 if not controller_live:
204 log.error('Controller (%s) appears to be down' % progargs['controller_url'])
Illyoung Choi2e971512019-07-18 14:15:19 -0700205 raise IOError('Controller (%s) appears to be down' % progargs['controller_url'])
Illyoung Choife121d02019-07-16 10:47:41 -0700206
207 # connect to workflow controller
208 log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
209 global manager
210 manager = Manager(logger=log)
211 manager.connect(progargs['controller_url'])
212 manager.set_handlers({'kickstart': on_kickstart})
213
214 # connect to airflow
215 global airflow_client
Illyoung Choi2e971512019-07-18 14:15:19 -0700216 log.info('Connecting to Airflow...')
Illyoung Choife121d02019-07-16 10:47:41 -0700217
Illyoung Choi2e971512019-07-18 14:15:19 -0700218 api.load_auth()
219 api_module = import_module(AirflowConf.get('cli', 'api_client'))
220 airflow_client = api_module.Client(
221 api_base_url=AirflowConf.get('cli', 'endpoint_url'),
222 auth=api.api_auth.client_auth
223 )
Illyoung Choife121d02019-07-16 10:47:41 -0700224
225 log.info('Waiting for kickstart events from Workflow Controller...')
226 try:
227 manager.wait()
228 finally:
229 log.info('Terminating the program...')
230 manager.disconnect()
231
232
233if __name__ == "__main__":
234 parser = get_arg_parser()
235 args = parser.parse_args()
236 main(args)