blob: ac6f3a882dcc4070e770840eae9d7901c5f191cb [file] [log] [blame]
Illyoung Choia9d2c2c2019-07-12 13:29:42 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Run
19
20This module implements Workflow Run interface
21"""
22
23import json
24import socketio
25
26from .countdown_latch import CountDownLatch
27from .utils import get_noop_logger, gen_id, gen_seq_id
28from .errors import ClientRPCError, ClientInputError, ClientResponseError
29
30WAIT_TIMEOUT = 10 # 10 seconds
31
32GREETING = 'cord.workflow.ctlsvc.greeting'
33WORKFLOW_RUN_UPDATE_STATUS = 'cord.workflow.ctlsvc.workflow.run.status'
34WORKFLOW_RUN_COUNT_EVENTS = 'cord.workflow.ctlsvc.workflow.run.count'
35WORKFLOW_RUN_FETCH_EVENT = 'cord.workflow.ctlsvc.workflow.run.fetch'
36WORKFLOW_RUN_NOTIFY_EVENT = 'cord.workflow.ctlsvc.workflow.run.notify'
37
38
39class WorkflowRun(object):
40 def __init__(self, workflow_id, workflow_run_id, logger=None, name=None):
41 self.sio = socketio.Client()
42 self.workflow_id = workflow_id
43 self.workflow_run_id = workflow_run_id
44
45 if logger:
46 self.logger = logger
47 else:
48 self.logger = get_noop_logger()
49
50 if name:
51 self.name = name
52 else:
53 self.name = 'workflow_run_%s' % gen_id()
54
55 self.req_id = gen_seq_id()
56
57 # set sio handlers
58 self.logger.debug('Setting event handlers to Socket.IO')
59 self.sio.on('connect', self.__on_sio_connect)
60 self.sio.on('disconnect', self.__on_sio_disconnect)
61 self.sio.on(GREETING, self.__on_greeting_message)
62 self.sio.on(WORKFLOW_RUN_UPDATE_STATUS, self.__on_update_status_message)
63 self.sio.on(WORKFLOW_RUN_COUNT_EVENTS, self.__on_count_events_message)
64 self.sio.on(WORKFLOW_RUN_FETCH_EVENT, self.__on_fetch_event_message)
65 self.sio.on(WORKFLOW_RUN_NOTIFY_EVENT, self.__on_notify_event_message)
66
67 self.handlers = {
68 'connect': self.__noop_connect_handler,
69 'disconnect': self.__noop_disconnect_handler,
70 'notify': self.__noop_notify_handler
71 }
72
73 # key is req_id
74 self.pending_requests = {}
75
76 def set_logger(self, logger):
77 self.logger = logger
78
79 def get_logger(self):
80 return self.logger
81
82 def __on_sio_connect(self):
83 self.logger.debug('connected to the server')
84 handler = self.handlers['connect']
85 if callable(handler):
86 handler()
87
88 def __noop_connect_handler(self):
89 self.logger.debug('no-op connect handler')
90
91 def __on_sio_disconnect(self):
92 self.logger.debug('disconnected from the server')
93 handler = self.handlers['disconnect']
94 if callable(handler):
95 handler()
96
97 def __noop_disconnect_handler(self):
98 self.logger.debug('no-op disconnect handler')
99
100 def __noop_notify_handler(self, workflow_id, workflow_run_id, topic):
101 self.logger.debug('no-op notify handler')
102
103 def __get_next_req_id(self):
104 req_id = self.req_id
105 self.req_id += 1
106 return req_id
107
108 def __on_greeting_message(self, data):
Illyoung Choi67e54e72019-07-25 10:44:59 -0700109 self.logger.debug('received a greeting message from the server')
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700110
111 def __on_notify_event_message(self, data):
112 """
113 Handler for a notify event
114 REQ = {
115 'topic': <topic>
116 }
117 """
118 self.logger.info('received a notify event message from the server')
119 topic = data['topic']
120
121 self.logger.info('a notify event message - topic (%s)' % topic)
122 if topic:
123 handler = self.handlers['notify']
124 if callable(handler):
125 self.logger.info('calling a notify event handler - %s' % handler)
126 handler(self.workflow_id, self.workflow_run_id, topic)
127
128 def __on_update_status_message(self, data):
129 self.__on_response(WORKFLOW_RUN_UPDATE_STATUS, data)
130
131 def __on_count_events_message(self, data):
132 self.__on_response(WORKFLOW_RUN_COUNT_EVENTS, data)
133
134 def __on_fetch_event_message(self, data):
135 self.__on_response(WORKFLOW_RUN_FETCH_EVENT, data)
136
137 def __check_pending_request(self, req_id):
138 """
139 Check a pending request
140 """
141 if req_id in self.pending_requests:
142 return True
143 return False
144
145 def __put_pending_request(self, api, params):
146 """
147 Put a pending request to a queue
148 """
149 req_id = self.__get_next_req_id()
150 latch = CountDownLatch()
151 params['req_id'] = req_id # inject req_id
152 self.sio.emit(api, params)
153 self.pending_requests[req_id] = {
154 'req_id': req_id,
155 'latch': latch,
156 'api': api,
157 'params': params,
158 'result': None
159 }
160 return req_id
161
162 def __wait_response(self, req_id):
163 """
164 Wait for completion of a request
165 """
166 if req_id in self.pending_requests:
167 req = self.pending_requests[req_id]
168 # python v 3.2 or below does not return a result
169 # that tells whether it is timedout or not
170 return req['latch'].wait(WAIT_TIMEOUT)
171 else:
172 self.logger.error(
173 'cannot find a pending request (%s) from a queue' % req_id
174 )
175 raise ClientRPCError(
176 req_id,
177 'cannot find a pending request (%s) from a queue' % req_id
178 )
179
180 def __complete_request(self, req_id, result):
181 """
182 Compelete a pending request
183 """
184 if req_id in self.pending_requests:
185 req = self.pending_requests[req_id]
186 req['latch'].count_down()
187 req['result'] = result
188 return
189
190 self.logger.error(
191 'cannot find a pending request (%s) from a queue' % req_id
192 )
193 raise ClientRPCError(
194 req_id,
195 'cannot find a pending request (%s) from a queue' % req_id
196 )
197
198 def __pop_pending_request(self, req_name):
199 """
200 Pop a pending request from a queue
201 """
202 return self.pending_requests.pop(req_name, None)
203
204 def connect(self, url):
205 """
206 Connect to the given url
207 """
208 query_string = 'id=%s&type=workflow_run&name=%s&workflow_id=%s&workflow_run_id=%s' % \
209 (self.name, self.name, self.workflow_id, self.workflow_run_id)
210 connect_url = '%s?%s' % (url, query_string)
211
212 if not (connect_url.startswith('http://') or connect_url.startswith('https://')):
213 connect_url = 'http://%s' % connect_url
214
215 self.logger.debug('Connecting to a Socket.IO server (%s)' % connect_url)
216 self.sio.connect(url=connect_url, transports=['websocket'])
217
218 def disconnect(self):
219 """
220 Disconnect from the server
221 """
222 self.sio.disconnect()
223
224 def wait(self):
225 self.sio.wait()
226
227 def sleep(self, sec):
228 self.sio.sleep(sec)
229
230 def get_handlers(self):
231 return self.handlers
232
233 def set_handlers(self, new_handlers):
234 for k in self.handlers:
235 if k in new_handlers:
236 self.handlers[k] = new_handlers[k]
237
238 def __request(self, api, params={}):
239 if api and params:
240 req_id = self.__put_pending_request(api, params)
241 self.logger.debug('waiting for a response for req_id (%s)' % req_id)
242 self.__wait_response(req_id) # wait for completion
243 req = self.__pop_pending_request(req_id)
244 if req:
245 if req['latch'].get_count() > 0:
246 # timed out
247 self.logger.error('request (%s) timed out' % req_id)
248 raise ClientRPCError(
249 req_id,
250 'request (%s) timed out' % req_id
251 )
252 else:
253 return req['result']
254 else:
255 self.logger.error('cannot find a pending request (%s) from a queue' % req_id)
256 raise ClientRPCError(
257 req_id,
258 'cannot find a pending request (%s) from a queue' % req_id
259 )
260 else:
261 self.logger.error(
262 'invalid arguments api (%s), params (%s)' %
263 (api, json.dumps(params))
264 )
265 raise ClientInputError(
266 'invalid arguments api (%s), params (%s)' %
267 (api, json.dumps(params))
268 )
269
270 def __on_response(self, api, result):
271 if result and 'req_id' in result:
272 self.logger.debug('completing a request (%s)' % result['req_id'])
273 self.__complete_request(result['req_id'], result)
274 else:
275 self.logger.error(
276 'invalid arguments api (%s), result (%s)' %
277 (api, json.dumps(result))
278 )
279 raise ClientInputError(
280 'invalid arguments api (%s), result (%s)' %
281 (api, json.dumps(result))
282 )
283
284 def update_status(self, task_id, status):
285 """
286 Update status of a workflow run.
287 """
288 if task_id and status:
289 result = self.__request(WORKFLOW_RUN_UPDATE_STATUS, {
290 'workflow_id': self.workflow_id,
291 'workflow_run_id': self.workflow_run_id,
292 'task_id': task_id,
293 'status': status
294 })
295 if result['error']:
296 self.logger.error(
297 'request (%s) failed with an error - %s' %
298 (result['req_id'], result['message'])
299 )
300 raise ClientResponseError(
301 'request (%s) failed with an error - %s' %
302 (result['req_id'], result['message'])
303 )
304 else:
305 return result['result']
306 else:
307 self.logger.error(
308 'invalid arguments task_id (%s) and status (%s)' %
309 (task_id, status)
310 )
311 raise ClientInputError(
312 'invalid arguments task_id (%s) and status (%s)' %
313 (task_id, status)
314 )
315
316 def count_events(self):
317 """
318 Count events.
319 """
320 result = self.__request(WORKFLOW_RUN_COUNT_EVENTS, {
321 'workflow_id': self.workflow_id,
322 'workflow_run_id': self.workflow_run_id
323 })
324 if result['error']:
325 self.logger.error(
326 'request (%s) failed with an error - %s' %
327 (result['req_id'], result['message'])
328 )
329 raise ClientResponseError(
330 'request (%s) failed with an error - %s' %
331 (result['req_id'], result['message'])
332 )
333 else:
334 return result['result']
335
336 def fetch_event(self, task_id, topic):
337 """
338 Fetch an event.
339 """
340 if task_id and topic:
341 result = self.__request(WORKFLOW_RUN_FETCH_EVENT, {
342 'workflow_id': self.workflow_id,
343 'workflow_run_id': self.workflow_run_id,
344 'task_id': task_id,
345 'topic': topic
346 })
347 if result['error']:
348 self.logger.error(
349 'request (%s) failed with an error - %s' %
350 (result['req_id'], result['message'])
351 )
352 raise ClientResponseError(
353 'request (%s) failed with an error - %s' %
354 (result['req_id'], result['message'])
355 )
356 else:
357 return result['result']
358 else:
359 self.logger.error(
360 'invalid arguments task_id (%s), topic (%s)' %
361 (task_id, topic)
362 )
363 raise ClientInputError(
364 'invalid arguments task_id (%s), topic (%s)' %
365 (task_id, topic)
366 )