blob: 2a0be45a363b1c9b200855ab80df74dbaa484544 [file] [log] [blame]
Illyoung Choife121d02019-07-16 10:47:41 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Kickstarter
19
20This module kickstarts Airflow workflows for requests from Workflow Controller
21"""
22
23import json
24import os.path
25import argparse
26import pyfiglet
27import traceback
28import socket
29import time
30
31from multistructlog import create_logger
32from cord_workflow_controller_client.manager import Manager
33from airflow.api.client.json_client import Client as AirflowClient
34from requests.auth import HTTPBasicAuth
35from urlparse import urlparse
36
37
38log = create_logger()
39manager = None
40airflow_client = None
41
42progargs = {
43 'controller_url': 'http://localhost:3030',
44 'airflow_url': 'http://localhost:8080',
45 'airflow_username': '',
46 'airflow_password': '',
47 'logging': None
48}
49
50DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
51SOCKET_CONNECTION_TEST_TIMEOUT = 5
52DEFAULT_CONNECTION_TEST_DELAY = 5
53DEFAULT_CONNECTION_TEST_RETRY = 999999
54
55
56def print_graffiti():
57 result = pyfiglet.figlet_format("CORD\nWorkflow\nKickstarter", font="graffiti")
58 print(result)
59
60
61def get_arg_parser():
62 parser = argparse.ArgumentParser(description='CORD Workflow Kickstarter Daemon.', prog='kickstarter')
63 parser.add_argument('--config', help='locate a configuration file')
64 parser.add_argument('--controller', help='CORD Workflow Controller URL')
65 parser.add_argument('--airflow', help='Airflow REST URL')
66 parser.add_argument('--airflow_user', help='User Name to access Airflow Web Interface')
67 parser.add_argument('--airflow_passwd', help='Password to access Airlfow Web Interface')
68 return parser
69
70
71def read_config(path):
72 if os.path.exists(path):
73 with open(path) as json_config_file:
74 data = json.load(json_config_file)
75 return data
76 return {}
77
78
79def pretty_format_json(j):
80 dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
81 return dumps
82
83
84def is_port_open(url, timeout):
85 o = urlparse(url)
86 hostname = o.hostname
87 port = o.port
88
89 if (not port) or port <= 0:
90 if o.scheme.lower() == 'http':
91 port = 80
92 elif o.scheme.lower() == 'https':
93 port = 443
94
95 succeed = False
96 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
97 sock.settimeout(timeout)
98 try:
99 sock.connect((hostname, int(port)))
100 sock.shutdown(socket.SHUT_RDWR)
101 succeed = True
102 except BaseException:
103 pass
104 finally:
105 sock.close()
106
107 return succeed
108
109
110def check_web_live(url,
111 retry=DEFAULT_CONNECTION_TEST_RETRY,
112 delay=DEFAULT_CONNECTION_TEST_DELAY,
113 timeout=SOCKET_CONNECTION_TEST_TIMEOUT):
114 ipup = False
115 for _ in range(retry):
116 if is_port_open(url, timeout):
117 ipup = True
118 break
119 else:
120 time.sleep(delay)
121 return ipup
122
123
124def on_kickstart(workflow_id, workflow_run_id):
125 if manager and airflow_client:
126 try:
127 log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
128
129 airflow_client.trigger_dag(dag_id=workflow_id, run_id=workflow_run_id)
130
131 # let controller know that the new workflow run is created
132 log.info('> Notifying a workflow (%s), a workflow run (%s)' % (workflow_id, workflow_run_id))
133 manager.notify_new_workflow_run(workflow_id, workflow_run_id)
134
135 log.info('> OK')
136 except Exception as e:
137 log.error('> Error : %s' % e)
138 log.debug(traceback.format_exc())
139
140
141# for command-line execution
142def main(args):
143 print_graffiti()
144
145 # check if config path is set
146 config_file_path = DEFAULT_CONFIG_FILE_PATH
147 if args.config:
148 config_file_path = args.config
149
150 if os.path.exists(config_file_path):
151 # read config
152 config = read_config(config_file_path)
153 if config:
154 global progargs
155 for k in progargs:
156 # overwrite
157 progargs[k] = config[k]
158
159 global log
160 log = create_logger(progargs["logging"])
161
162 if args.airflow:
163 progargs['airflow_url'] = args.airflow
164
165 if args.controller:
166 progargs['controller_url'] = args.controller
167
168 if args.airflow_user:
169 progargs['airflow_user'] = args.airflow_user
170
171 if args.airflow_passwd:
172 progargs['airflow_passwd'] = args.airflow_passwd
173
174 print('=CONFIG=')
175 config_json_string = pretty_format_json(progargs)
176 print(config_json_string)
177 print('\n')
178
179 # checking controller and airflow web interface
180 log.info('Checking if Workflow Controller (%s) is live...' % progargs['controller_url'])
181 controller_live = check_web_live(progargs['controller_url'])
182 if not controller_live:
183 log.error('Controller (%s) appears to be down' % progargs['controller_url'])
184 raise 'Controller (%s) appears to be down' % progargs['controller_url']
185
186 log.info('Checking if Airflow (%s) is live...' % progargs['airflow_url'])
187 airflow_live = check_web_live(progargs['airflow_url'])
188 if not airflow_live:
189 log.error('Airflow (%s) appears to be down' % progargs['airflow_url'])
190 raise 'Airflow (%s) appears to be down' % progargs['airflow_url']
191
192 # connect to workflow controller
193 log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
194 global manager
195 manager = Manager(logger=log)
196 manager.connect(progargs['controller_url'])
197 manager.set_handlers({'kickstart': on_kickstart})
198
199 # connect to airflow
200 global airflow_client
201 log.info('Connecting to Airflow (%s)...' % progargs['airflow_url'])
202 http_auth = None
203 if progargs['airflow_user'] and progargs['airflow_passwd']:
204 log.info('Using a username %s' % progargs['airflow_user'])
205 http_auth = HTTPBasicAuth(progargs['airflow_user'], progargs['airflow_passwd'])
206
207 airflow_client = AirflowClient(progargs['airflow_url'], auth=http_auth)
208
209 log.info('Waiting for kickstart events from Workflow Controller...')
210 try:
211 manager.wait()
212 finally:
213 log.info('Terminating the program...')
214 manager.disconnect()
215
216
217if __name__ == "__main__":
218 parser = get_arg_parser()
219 args = parser.parse_args()
220 main(args)