blob: bcbf3721c4ffb747e9b57e56f6cfaa8b48fee852 [file] [log] [blame]
Illyoung Choia9d2c2c2019-07-12 13:29:42 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Probe
19
20This module implements Workflow Probe interface
21"""
22
23import json
24import socketio
25
Illyoung Choi67e54e72019-07-25 10:44:59 -070026from .countdown_latch import CountDownLatch
27from .utils import get_noop_logger, gen_id, gen_seq_id
28from .errors import ClientRPCError, ClientInputError, ClientResponseError
29
30WAIT_TIMEOUT = 10 # 10 seconds
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070031
32GREETING = 'cord.workflow.ctlsvc.greeting'
Illyoung Choi67e54e72019-07-25 10:44:59 -070033EVENT_EMIT = 'cord.workflow.ctlsvc.event.emit'
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070034
35
36class Probe(object):
37 def __init__(self, logger=None, name=None):
38 self.sio = socketio.Client()
39
40 if logger:
41 self.logger = logger
42 else:
43 self.logger = get_noop_logger()
44
45 if name:
46 self.name = name
47 else:
48 self.name = 'probe_%s' % gen_id()
49
Illyoung Choi67e54e72019-07-25 10:44:59 -070050 self.req_id = gen_seq_id()
51
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070052 # set sio handlers
53 self.logger.debug('Setting event handlers to Socket.IO')
54 self.sio.on('connect', self.__on_sio_connect)
55 self.sio.on('disconnect', self.__on_sio_disconnect)
56 self.sio.on(GREETING, self.__on_greeting_message)
Illyoung Choi67e54e72019-07-25 10:44:59 -070057 self.sio.on(EVENT_EMIT, self.__on_event_emit_message)
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070058 self.handlers = {
59 'connect': self.__noop_connect_handler,
60 'disconnect': self.__noop_disconnect_handler
61 }
62
Illyoung Choi67e54e72019-07-25 10:44:59 -070063 # key is req_id
64 self.pending_requests = {}
65
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070066 def set_logger(self, logger):
67 self.logger = logger
68
69 def get_logger(self):
70 return self.logger
71
72 def __on_sio_connect(self):
73 self.logger.debug('connected to the server')
74 handler = self.handlers['connect']
75 if callable(handler):
76 handler()
77
78 def __noop_connect_handler(self):
79 self.logger.debug('no-op connect handler')
80
81 def __on_sio_disconnect(self):
82 self.logger.debug('disconnected from the server')
83 handler = self.handlers['disconnect']
84 if callable(handler):
85 handler()
86
87 def __noop_disconnect_handler(self):
88 self.logger.debug('no-op disconnect handler')
89
Illyoung Choi67e54e72019-07-25 10:44:59 -070090 def __get_next_req_id(self):
91 req_id = self.req_id
92 self.req_id += 1
93 return req_id
94
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070095 def __on_greeting_message(self, data):
96 self.logger.debug('received a greeting message from the server')
97
Illyoung Choi67e54e72019-07-25 10:44:59 -070098 def __on_event_emit_message(self, data):
99 self.__on_response(EVENT_EMIT, data)
100
101 def __check_pending_request(self, req_id):
102 """
103 Check a pending request
104 """
105 if req_id in self.pending_requests:
106 return True
107 return False
108
109 def __put_pending_request(self, api, params):
110 """
111 Put a pending request to a queue
112 """
113 req_id = self.__get_next_req_id()
114 latch = CountDownLatch()
115 params['req_id'] = req_id # inject req_id
116 self.sio.emit(api, params)
117 self.pending_requests[req_id] = {
118 'req_id': req_id,
119 'latch': latch,
120 'api': api,
121 'params': params,
122 'result': None
123 }
124 return req_id
125
126 def __wait_response(self, req_id):
127 """
128 Wait for completion of a request
129 """
130 if req_id in self.pending_requests:
131 req = self.pending_requests[req_id]
132 # python v 3.2 or below does not return a result
133 # that tells whether it is timedout or not
134 return req['latch'].wait(WAIT_TIMEOUT)
135 else:
136 self.logger.error(
137 'cannot find a pending request (%s) from a queue' % req_id
138 )
139 raise ClientRPCError(
140 req_id,
141 'cannot find a pending request (%s) from a queue' % req_id
142 )
143
144 def __complete_request(self, req_id, result):
145 """
146 Compelete a pending request
147 """
148 if req_id in self.pending_requests:
149 req = self.pending_requests[req_id]
150 req['latch'].count_down()
151 req['result'] = result
152 return
153
154 self.logger.error(
155 'cannot find a pending request (%s) from a queue' % req_id
156 )
157 raise ClientRPCError(
158 req_id,
159 'cannot find a pending request (%s) from a queue' % req_id
160 )
161
162 def __pop_pending_request(self, req_name):
163 """
164 Pop a pending request from a queue
165 """
166 return self.pending_requests.pop(req_name, None)
167
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700168 def connect(self, url):
169 """
170 Connect to the given url
171 """
172 query_string = 'id=%s&type=probe&name=%s' % (self.name, self.name)
173 connect_url = '%s?%s' % (url, query_string)
174
175 if not (connect_url.startswith('http://') or connect_url.startswith('https://')):
176 connect_url = 'http://%s' % connect_url
177
178 self.logger.debug('Connecting to a Socket.IO server (%s)' % connect_url)
179 self.sio.connect(url=connect_url, transports=['websocket'])
180
181 def disconnect(self):
182 """
183 Disconnect from the server
184 """
185 self.sio.disconnect()
186
Illyoung Choi67e54e72019-07-25 10:44:59 -0700187 def wait(self):
188 self.sio.wait()
189
190 def sleep(self, sec):
191 self.sio.sleep(sec)
192
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700193 def get_handlers(self):
194 return self.handlers
195
196 def set_handlers(self, new_handlers):
197 for k in self.handlers:
198 if k in new_handlers:
199 self.handlers[k] = new_handlers[k]
200
Illyoung Choi67e54e72019-07-25 10:44:59 -0700201 def __request(self, api, params={}):
202 if api and params:
203 req_id = self.__put_pending_request(api, params)
204 self.logger.debug('waiting for a response for req_id (%s)' % req_id)
205 self.__wait_response(req_id) # wait for completion
206 req = self.__pop_pending_request(req_id)
207 if req:
208 if req['latch'].get_count() > 0:
209 # timed out
210 self.logger.error('request (%s) timed out' % req_id)
211 raise ClientRPCError(
212 req_id,
213 'request (%s) timed out' % req_id
214 )
215 else:
216 return req['result']
217 else:
218 self.logger.error('cannot find a pending request (%s) from a queue' % req_id)
219 raise ClientRPCError(
220 req_id,
221 'cannot find a pending request (%s) from a queue' % req_id
222 )
223 else:
224 self.logger.error(
225 'invalid arguments api (%s), params (%s)' %
226 (api, json.dumps(params))
227 )
228 raise ClientInputError(
229 'invalid arguments api (%s), params (%s)' %
230 (api, json.dumps(params))
231 )
232
233 def __on_response(self, api, result):
234 if result and 'req_id' in result:
235 self.logger.debug('completing a request (%s)' % result['req_id'])
236 self.__complete_request(result['req_id'], result)
237 else:
238 self.logger.error(
239 'invalid arguments api (%s), result (%s)' %
240 (api, json.dumps(result))
241 )
242 raise ClientInputError(
243 'invalid arguments api (%s), result (%s)' %
244 (api, json.dumps(result))
245 )
246
247 def emit_event(self, topic, message):
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700248 """
249 Emit event to Workflow Controller
250 """
Illyoung Choi67e54e72019-07-25 10:44:59 -0700251 if topic and message:
252 result = self.__request(EVENT_EMIT, {
253 'topic': topic,
254 'message': message
255 })
256 if result['error']:
257 self.logger.error(
258 'request (%s) failed with an error - %s' %
259 (result['req_id'], result['message'])
260 )
261 raise ClientResponseError(
262 'request (%s) failed with an error - %s' %
263 (result['req_id'], result['message'])
264 )
265 else:
266 return result['result']
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700267 else:
268 self.logger.error(
Illyoung Choi67e54e72019-07-25 10:44:59 -0700269 'invalid arguments topic(%s), message(%s)' %
270 (topic, json.dumps(message))
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700271 )
272 raise ClientInputError(
Illyoung Choi67e54e72019-07-25 10:44:59 -0700273 'invalid arguments topic(%s), message(%s)' %
274 (topic, json.dumps(message))
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700275 )