blob: 751356af8f7d85d55c8af05f0da5fb1c768a6578 [file] [log] [blame]
Illyoung Choife121d02019-07-16 10:47:41 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Control CLI
19
20This module kickstarts Airflow workflows for requests from Workflow Controller
21"""
22
23import json
24import os.path
25import argparse
Illyoung Choi39262742019-07-23 13:28:00 -070026import re
Illyoung Choife121d02019-07-16 10:47:41 -070027
28from multistructlog import create_logger
29from cord_workflow_controller_client.manager import Manager
Illyoung Choi39262742019-07-23 13:28:00 -070030from cord_workflow_controller_client.probe import Probe
Illyoung Choife121d02019-07-16 10:47:41 -070031
32
33log = create_logger()
34progargs = {
35 'controller_url': 'http://localhost:3030',
Illyoung Choife121d02019-07-16 10:47:41 -070036 'logging': None
37}
38
39DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
40
41
Illyoung Choi2e971512019-07-18 14:15:19 -070042class InputError(Exception):
43 """Exception raised for errors in the input.
44
45 Attributes:
46 message -- explanation of the error
47 """
48
49 def __init__(self, message):
50 self.message = message
51
52
Illyoung Choife121d02019-07-16 10:47:41 -070053def get_arg_parser():
54 parser = argparse.ArgumentParser(description='CORD Workflow Control CLI.', prog='workflow_ctl')
55 parser.add_argument('--config', help='locate a configuration file')
56 parser.add_argument('--controller', help='CORD Workflow Controller URL')
Illyoung Choife121d02019-07-16 10:47:41 -070057 parser.add_argument('cmd', help='Command')
58 parser.add_argument('cmd_args', help='Arguments for the command', nargs='*')
59 return parser
60
61
62def read_config(path):
63 if os.path.exists(path):
64 with open(path) as json_config_file:
65 data = json.load(json_config_file)
66 return data
67 return {}
68
69
70def read_json_file(filename):
71 if filename:
72 with open(filename, 'r') as f:
73 return json.load(f)
74 return None
75
76
Illyoung Choi39262742019-07-23 13:28:00 -070077def read_json_string(str):
78 if str:
79 try:
80 return json.loads(str)
81 except json.decoder.JSONDecodeError:
82 return load_dirty_json(str)
83 return None
84
85
86def load_dirty_json(dirty_json):
87 regex_replace = [
88 (r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'),
89 (r" False([, \}\]])", r' false\1'),
90 (r" True([, \}\]])", r' true\1')
91 ]
92 for r, s in regex_replace:
93 dirty_json = re.sub(r, s, dirty_json)
94 clean_json = json.loads(dirty_json)
95 return clean_json
96
97
Illyoung Choife121d02019-07-16 10:47:41 -070098def register_workflow(args):
99 # expect args should be a list of essence files
100 if not args:
Illyoung Choi2e971512019-07-18 14:15:19 -0700101 raise InputError('no essence file is given')
Illyoung Choife121d02019-07-16 10:47:41 -0700102
103 log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
104 manager = Manager(logger=log)
105 connected = False
106 results = []
107
108 try:
109 manager.connect(progargs['controller_url'])
110 connected = True
111
112 for essence_file in args:
113 if not os.path.exists(essence_file):
114 log.error('cannot find the essence file (%s)' % essence_file)
115 continue
116
117 essence = read_json_file(essence_file)
118 log.info('Registering an essence file (%s)...' % essence_file)
119 result = manager.register_workflow_essence(essence)
120 if result:
Illyoung Choi39262742019-07-23 13:28:00 -0700121 log.info('registered an essence file (%s)' % essence_file)
Illyoung Choife121d02019-07-16 10:47:41 -0700122 else:
123 log.error('cannot register an essence file (%s)' % essence_file)
124
125 results.append(result)
126 finally:
127 if connected:
128 manager.disconnect()
129
130 return results
131
132
Illyoung Choi39262742019-07-23 13:28:00 -0700133def emit_event(args):
134 # expect args should be a json event
135 if not args or len(args) != 2:
136 raise InputError('parameter should be <topic> <message>')
137
138 log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
139 probe = Probe(logger=log)
140 connected = False
141
142 try:
143 probe.connect(progargs['controller_url'])
144 connected = True
145
146 topic = args[0]
147 message = read_json_string(args[1])
148
149 log.info('Emitting an event (%s - %s)...' % (topic, message))
150 probe.emit_event(topic, message)
151 log.info('Emitted an event (%s - %s)...' % (topic, message))
152 return True
153 finally:
154 if connected:
155 probe.disconnect()
156
157 return False
158
159
Illyoung Choife121d02019-07-16 10:47:41 -0700160# for command-line execution
161def main(args):
162 # check if config path is set
163 config_file_path = DEFAULT_CONFIG_FILE_PATH
164 if args.config:
165 config_file_path = args.config
166
167 if os.path.exists(config_file_path):
168 # read config
169 config = read_config(config_file_path)
170 if config:
171 global progargs
172 for k in progargs:
173 # overwrite
Illyoung Choi39262742019-07-23 13:28:00 -0700174 if k in config:
175 progargs[k] = config[k]
Illyoung Choife121d02019-07-16 10:47:41 -0700176
177 global log
178 log = create_logger(progargs["logging"])
179
Illyoung Choife121d02019-07-16 10:47:41 -0700180 if args.controller:
181 progargs['controller_url'] = args.controller
182
Illyoung Choife121d02019-07-16 10:47:41 -0700183 if args.cmd:
184 if args.cmd.strip().lower() in ['reg', 'register', 'register_workflow']:
185 results = register_workflow(args.cmd_args)
186 print(results)
Illyoung Choi39262742019-07-23 13:28:00 -0700187 elif args.cmd.strip().lower() in ['emit', 'send', 'event', 'message']:
188 results = emit_event(args.cmd_args)
189 print(results)
Illyoung Choife121d02019-07-16 10:47:41 -0700190 else:
191 log.error('unknown command %s' % args.cmd)
Illyoung Choi2e971512019-07-18 14:15:19 -0700192 raise InputError('unknown command %s' % args.cmd)
Illyoung Choife121d02019-07-16 10:47:41 -0700193
194
195if __name__ == "__main__":
196 parser = get_arg_parser()
197 args = parser.parse_args()
198 main(args)