blob: 157ecffd80e582f4bee12176277c1cfc42cca451 [file] [log] [blame]
Illyoung Choia9d2c2c2019-07-12 13:29:42 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Run
19
20This module implements Workflow Run interface
21"""
22
23import json
24import socketio
25
26from .countdown_latch import CountDownLatch
27from .utils import get_noop_logger, gen_id, gen_seq_id
28from .errors import ClientRPCError, ClientInputError, ClientResponseError
29
30WAIT_TIMEOUT = 10 # 10 seconds
31
32GREETING = 'cord.workflow.ctlsvc.greeting'
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070033WORKFLOW_RUN_COUNT_EVENTS = 'cord.workflow.ctlsvc.workflow.run.count'
34WORKFLOW_RUN_FETCH_EVENT = 'cord.workflow.ctlsvc.workflow.run.fetch'
35WORKFLOW_RUN_NOTIFY_EVENT = 'cord.workflow.ctlsvc.workflow.run.notify'
36
37
38class WorkflowRun(object):
39 def __init__(self, workflow_id, workflow_run_id, logger=None, name=None):
40 self.sio = socketio.Client()
41 self.workflow_id = workflow_id
42 self.workflow_run_id = workflow_run_id
43
44 if logger:
45 self.logger = logger
46 else:
47 self.logger = get_noop_logger()
48
49 if name:
50 self.name = name
51 else:
52 self.name = 'workflow_run_%s' % gen_id()
53
54 self.req_id = gen_seq_id()
55
56 # set sio handlers
57 self.logger.debug('Setting event handlers to Socket.IO')
58 self.sio.on('connect', self.__on_sio_connect)
59 self.sio.on('disconnect', self.__on_sio_disconnect)
60 self.sio.on(GREETING, self.__on_greeting_message)
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070061 self.sio.on(WORKFLOW_RUN_COUNT_EVENTS, self.__on_count_events_message)
62 self.sio.on(WORKFLOW_RUN_FETCH_EVENT, self.__on_fetch_event_message)
63 self.sio.on(WORKFLOW_RUN_NOTIFY_EVENT, self.__on_notify_event_message)
64
65 self.handlers = {
66 'connect': self.__noop_connect_handler,
67 'disconnect': self.__noop_disconnect_handler,
68 'notify': self.__noop_notify_handler
69 }
70
71 # key is req_id
72 self.pending_requests = {}
73
74 def set_logger(self, logger):
75 self.logger = logger
76
77 def get_logger(self):
78 return self.logger
79
80 def __on_sio_connect(self):
81 self.logger.debug('connected to the server')
82 handler = self.handlers['connect']
83 if callable(handler):
84 handler()
85
86 def __noop_connect_handler(self):
87 self.logger.debug('no-op connect handler')
88
89 def __on_sio_disconnect(self):
90 self.logger.debug('disconnected from the server')
91 handler = self.handlers['disconnect']
92 if callable(handler):
93 handler()
94
95 def __noop_disconnect_handler(self):
96 self.logger.debug('no-op disconnect handler')
97
98 def __noop_notify_handler(self, workflow_id, workflow_run_id, topic):
99 self.logger.debug('no-op notify handler')
100
101 def __get_next_req_id(self):
102 req_id = self.req_id
103 self.req_id += 1
104 return req_id
105
106 def __on_greeting_message(self, data):
Illyoung Choi67e54e72019-07-25 10:44:59 -0700107 self.logger.debug('received a greeting message from the server')
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700108
109 def __on_notify_event_message(self, data):
110 """
111 Handler for a notify event
112 REQ = {
113 'topic': <topic>
114 }
115 """
116 self.logger.info('received a notify event message from the server')
117 topic = data['topic']
118
119 self.logger.info('a notify event message - topic (%s)' % topic)
120 if topic:
121 handler = self.handlers['notify']
122 if callable(handler):
123 self.logger.info('calling a notify event handler - %s' % handler)
124 handler(self.workflow_id, self.workflow_run_id, topic)
125
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700126 def __on_count_events_message(self, data):
127 self.__on_response(WORKFLOW_RUN_COUNT_EVENTS, data)
128
129 def __on_fetch_event_message(self, data):
130 self.__on_response(WORKFLOW_RUN_FETCH_EVENT, data)
131
132 def __check_pending_request(self, req_id):
133 """
134 Check a pending request
135 """
136 if req_id in self.pending_requests:
137 return True
138 return False
139
140 def __put_pending_request(self, api, params):
141 """
142 Put a pending request to a queue
143 """
144 req_id = self.__get_next_req_id()
145 latch = CountDownLatch()
146 params['req_id'] = req_id # inject req_id
147 self.sio.emit(api, params)
148 self.pending_requests[req_id] = {
149 'req_id': req_id,
150 'latch': latch,
151 'api': api,
152 'params': params,
153 'result': None
154 }
155 return req_id
156
157 def __wait_response(self, req_id):
158 """
159 Wait for completion of a request
160 """
161 if req_id in self.pending_requests:
162 req = self.pending_requests[req_id]
163 # python v 3.2 or below does not return a result
164 # that tells whether it is timedout or not
165 return req['latch'].wait(WAIT_TIMEOUT)
166 else:
167 self.logger.error(
168 'cannot find a pending request (%s) from a queue' % req_id
169 )
170 raise ClientRPCError(
171 req_id,
172 'cannot find a pending request (%s) from a queue' % req_id
173 )
174
175 def __complete_request(self, req_id, result):
176 """
177 Compelete a pending request
178 """
179 if req_id in self.pending_requests:
180 req = self.pending_requests[req_id]
181 req['latch'].count_down()
182 req['result'] = result
183 return
184
185 self.logger.error(
186 'cannot find a pending request (%s) from a queue' % req_id
187 )
188 raise ClientRPCError(
189 req_id,
190 'cannot find a pending request (%s) from a queue' % req_id
191 )
192
193 def __pop_pending_request(self, req_name):
194 """
195 Pop a pending request from a queue
196 """
197 return self.pending_requests.pop(req_name, None)
198
199 def connect(self, url):
200 """
201 Connect to the given url
202 """
203 query_string = 'id=%s&type=workflow_run&name=%s&workflow_id=%s&workflow_run_id=%s' % \
204 (self.name, self.name, self.workflow_id, self.workflow_run_id)
205 connect_url = '%s?%s' % (url, query_string)
206
207 if not (connect_url.startswith('http://') or connect_url.startswith('https://')):
208 connect_url = 'http://%s' % connect_url
209
210 self.logger.debug('Connecting to a Socket.IO server (%s)' % connect_url)
211 self.sio.connect(url=connect_url, transports=['websocket'])
212
213 def disconnect(self):
214 """
215 Disconnect from the server
216 """
217 self.sio.disconnect()
218
219 def wait(self):
220 self.sio.wait()
221
222 def sleep(self, sec):
223 self.sio.sleep(sec)
224
225 def get_handlers(self):
226 return self.handlers
227
228 def set_handlers(self, new_handlers):
229 for k in self.handlers:
230 if k in new_handlers:
231 self.handlers[k] = new_handlers[k]
232
233 def __request(self, api, params={}):
234 if api and params:
235 req_id = self.__put_pending_request(api, params)
236 self.logger.debug('waiting for a response for req_id (%s)' % req_id)
237 self.__wait_response(req_id) # wait for completion
238 req = self.__pop_pending_request(req_id)
239 if req:
240 if req['latch'].get_count() > 0:
241 # timed out
242 self.logger.error('request (%s) timed out' % req_id)
243 raise ClientRPCError(
244 req_id,
245 'request (%s) timed out' % req_id
246 )
247 else:
248 return req['result']
249 else:
250 self.logger.error('cannot find a pending request (%s) from a queue' % req_id)
251 raise ClientRPCError(
252 req_id,
253 'cannot find a pending request (%s) from a queue' % req_id
254 )
255 else:
256 self.logger.error(
257 'invalid arguments api (%s), params (%s)' %
258 (api, json.dumps(params))
259 )
260 raise ClientInputError(
261 'invalid arguments api (%s), params (%s)' %
262 (api, json.dumps(params))
263 )
264
265 def __on_response(self, api, result):
266 if result and 'req_id' in result:
267 self.logger.debug('completing a request (%s)' % result['req_id'])
268 self.__complete_request(result['req_id'], result)
269 else:
270 self.logger.error(
271 'invalid arguments api (%s), result (%s)' %
272 (api, json.dumps(result))
273 )
274 raise ClientInputError(
275 'invalid arguments api (%s), result (%s)' %
276 (api, json.dumps(result))
277 )
278
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700279 def count_events(self):
280 """
281 Count events.
282 """
283 result = self.__request(WORKFLOW_RUN_COUNT_EVENTS, {
284 'workflow_id': self.workflow_id,
285 'workflow_run_id': self.workflow_run_id
286 })
287 if result['error']:
288 self.logger.error(
289 'request (%s) failed with an error - %s' %
290 (result['req_id'], result['message'])
291 )
292 raise ClientResponseError(
293 'request (%s) failed with an error - %s' %
294 (result['req_id'], result['message'])
295 )
296 else:
297 return result['result']
298
299 def fetch_event(self, task_id, topic):
300 """
301 Fetch an event.
302 """
303 if task_id and topic:
304 result = self.__request(WORKFLOW_RUN_FETCH_EVENT, {
305 'workflow_id': self.workflow_id,
306 'workflow_run_id': self.workflow_run_id,
307 'task_id': task_id,
308 'topic': topic
309 })
310 if result['error']:
311 self.logger.error(
312 'request (%s) failed with an error - %s' %
313 (result['req_id'], result['message'])
314 )
315 raise ClientResponseError(
316 'request (%s) failed with an error - %s' %
317 (result['req_id'], result['message'])
318 )
319 else:
320 return result['result']
321 else:
322 self.logger.error(
323 'invalid arguments task_id (%s), topic (%s)' %
324 (task_id, topic)
325 )
326 raise ClientInputError(
327 'invalid arguments task_id (%s), topic (%s)' %
328 (task_id, topic)
329 )