blob: e32215b57fe43a8cd3844ef3419a16e0fbcd6702 [file] [log] [blame]
Illyoung Choife121d02019-07-16 10:47:41 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Control CLI
19
20This module kickstarts Airflow workflows for requests from Workflow Controller
21"""
22
23import json
24import os.path
25import argparse
26
27from multistructlog import create_logger
28from cord_workflow_controller_client.manager import Manager
29
30
# Configuration file consulted by main() when --config is not supplied.
DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'

# Effective program settings. main() overwrites these defaults with values
# from the configuration file and then with command-line options.
progargs = dict(
    controller_url='http://localhost:3030',
    airflow_url='http://localhost:8080',
    airflow_username='',
    airflow_password='',
    logging=None,
)

# Module-wide logger; main() replaces it once the logging settings are known.
log = create_logger()
41
42
def get_arg_parser():
    """Build the command-line parser for the ``workflow_ctl`` tool.

    Returns:
        argparse.ArgumentParser: a parser that accepts the optional
        ``--config``, ``--controller``, ``--airflow``, ``--airflow_user`` and
        ``--airflow_passwd`` options, one required positional ``cmd`` and
        zero or more trailing ``cmd_args``.
    """
    parser = argparse.ArgumentParser(description='CORD Workflow Control CLI.', prog='workflow_ctl')
    parser.add_argument('--config', help='locate a configuration file')
    parser.add_argument('--controller', help='CORD Workflow Controller URL')
    parser.add_argument('--airflow', help='Airflow REST URL')
    parser.add_argument('--airflow_user', help='User Name to access Airflow Web Interface')
    # Help text previously misspelled "Airflow" as "Airlfow".
    parser.add_argument('--airflow_passwd', help='Password to access Airflow Web Interface')
    parser.add_argument('cmd', help='Command')
    # nargs='*' collects every remaining positional into a (possibly empty) list.
    parser.add_argument('cmd_args', help='Arguments for the command', nargs='*')
    return parser
53
54
def read_config(path):
    """Load a JSON configuration file.

    Args:
        path: location of the configuration file.

    Returns:
        The decoded JSON content, or an empty dict when nothing exists at
        ``path``.
    """
    if not os.path.exists(path):
        return {}

    with open(path) as config_fp:
        return json.load(config_fp)
61
62
def read_json_file(filename):
    """Decode the JSON document stored in a file.

    Args:
        filename: path of the file to read; a falsy value (``None`` or an
            empty string) means "no file".

    Returns:
        The decoded JSON content, or ``None`` when ``filename`` is falsy.
    """
    if not filename:
        return None

    with open(filename, 'r') as json_fp:
        content = json.load(json_fp)
    return content
68
69
def register_workflow(args):
    """Register workflow essence files with the Workflow Controller.

    Connects to the controller at ``progargs['controller_url']``, registers
    every essence file that exists, and always disconnects afterwards if the
    connection was established.

    Args:
        args: list of paths to essence (JSON) files.

    Returns:
        list: the value returned by ``manager.register_workflow_essence`` for
        each essence file that was found, in input order. Files that do not
        exist are logged and skipped, so they contribute no entry.

    Raises:
        ValueError: if ``args`` is empty.
    """
    # Python 3 cannot raise a bare string (it surfaces as an unrelated
    # TypeError), so raise a real exception that carries the message.
    if not args:
        raise ValueError('no essence file is given')

    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
    manager = Manager(logger=log)
    connected = False
    results = []

    try:
        manager.connect(progargs['controller_url'])
        # Only mark as connected after connect() succeeds, so the cleanup
        # below never disconnects a manager that failed to connect.
        connected = True

        for essence_file in args:
            if not os.path.exists(essence_file):
                log.error('cannot find the essence file (%s)' % essence_file)
                continue

            essence = read_json_file(essence_file)
            log.info('Registering an essence file (%s)...' % essence_file)
            result = manager.register_workflow_essence(essence)
            if result:
                # Was `log.inof`, a typo that broke the success path.
                log.info('registered an essence file (%s)' % essence_file)
            else:
                log.error('cannot register an essence file (%s)' % essence_file)

            results.append(result)
    finally:
        if connected:
            manager.disconnect()

    return results
103
104
105# for command-line execution
def main(args):
    """Run the CLI with an already-parsed argument namespace.

    Settings are resolved in increasing priority: the built-in defaults in
    ``progargs``, then the JSON configuration file, then command-line options.
    The module-level logger is rebuilt from the resolved ``logging`` setting
    before the command is dispatched.

    Args:
        args: namespace produced by ``get_arg_parser().parse_args()``; reads
            ``config``, ``controller``, ``airflow``, ``airflow_user``,
            ``airflow_passwd``, ``cmd`` and ``cmd_args``.

    Raises:
        ValueError: if ``args.cmd`` is not a recognised command.
    """
    # `log` is rebound below; `progargs` is only mutated in place, so it
    # needs no global declaration.
    global log

    # An explicit --config wins over the default location.
    config_file_path = args.config if args.config else DEFAULT_CONFIG_FILE_PATH

    if os.path.exists(config_file_path):
        config = read_config(config_file_path)
        if config:
            for k in progargs:
                # Overwrite only the settings the file actually provides, so
                # a partial configuration file does not raise KeyError.
                if k in config:
                    progargs[k] = config[k]

    log = create_logger(progargs["logging"])

    if args.airflow:
        progargs['airflow_url'] = args.airflow

    if args.controller:
        progargs['controller_url'] = args.controller

    # Store credentials under the same keys that progargs declares and the
    # configuration file populates (previously written to the unrelated
    # 'airflow_user' / 'airflow_passwd' keys).
    if args.airflow_user:
        progargs['airflow_username'] = args.airflow_user

    if args.airflow_passwd:
        progargs['airflow_password'] = args.airflow_passwd

    if args.cmd:
        if args.cmd.strip().lower() in ['reg', 'register', 'register_workflow']:
            results = register_workflow(args.cmd_args)
            print(results)
        else:
            log.error('unknown command %s' % args.cmd)
            # Python 3 cannot raise a bare string; raise a real exception.
            raise ValueError('unknown command %s' % args.cmd)
143
144
if __name__ == "__main__":
    # Parse the process command line and hand the namespace to main().
    main(get_arg_parser().parse_args())
148 main(args)