# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
from consul import ConsulException
from consul.twisted import Consul
from structlog import get_logger
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred

from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair, RETRY_BACKOFF
from common.utils.asleep import asleep
from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
22
# Module-level structlog logger used by ConsulClient and ConsulWatch.
log = get_logger()
24
class ConsulClient(KVClient):
    """KVClient implementation backed by Consul via ``consul.twisted.Consul``.

    Every operation except ``watch`` and ``close_watch`` is an
    ``inlineCallbacks`` generator and therefore returns a Twisted Deferred.

    ``key_watches``, ``key_reservations``, ``retry_time``, ``_backoff`` and
    ``_clear_backoff`` are used below but are not defined in this class;
    presumably they are inherited from ``KVClient`` -- confirm there.
    """

    def __init__(self, kv_host, kv_port):
        """Create the Consul connection object for ``kv_host:kv_port``."""
        KVClient.__init__(self, kv_host, kv_port)
        # ID of the most recently created Consul session (set by _reserve,
        # cleared when a reservation is released).
        self.session_id = None
        self.client = Consul(kv_host, kv_port)
        self.watcher = None

    @inlineCallbacks
    def get(self, key, timeout=DEFAULT_TIMEOUT):
        """Read ``key``; fires with ``(KVPair or None, err)``."""
        result = yield self._op_with_retry('GET', key, None, timeout)
        returnValue(result)

    @inlineCallbacks
    def list(self, key, timeout=DEFAULT_TIMEOUT):
        """Read every key under prefix ``key``; fires with ``(pairs, err)``."""
        result = yield self._op_with_retry('LIST', key, None, timeout)
        returnValue(result)

    @inlineCallbacks
    def put(self, key, value, timeout=DEFAULT_TIMEOUT):
        """Write ``value`` at ``key``; fires with ``err`` (None on success)."""
        _, err = yield self._op_with_retry('PUT', key, value, timeout)
        returnValue(err)

    @inlineCallbacks
    def delete(self, key, timeout=DEFAULT_TIMEOUT):
        """Delete ``key``; fires with ``err`` (None on success)."""
        _, err = yield self._op_with_retry('DELETE', key, None, timeout)
        returnValue(err)

    @inlineCallbacks
    def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
        """Try to acquire ``key`` for ``value`` under a session with ``ttl``.

        Fires with ``(owner, err)`` where ``owner`` is the value found at
        ``key`` after the attempt and ``err`` is None only when this client
        now holds the key.
        """
        result = yield self._op_with_retry('RESERVE', key, value, timeout,
                                           ttl=ttl)
        returnValue(result)

    @inlineCallbacks
    def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
        """Renew the session holding ``key``; fires with ``err``."""
        _, err = yield self._op_with_retry('RENEW', key, None, timeout)
        returnValue(err)

    @inlineCallbacks
    def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
        """Destroy the session holding ``key``; fires with ``err``."""
        _, err = yield self._op_with_retry('RELEASE', key, None, timeout)
        returnValue(err)

    @inlineCallbacks
    def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
        """Destroy the sessions of all reserved keys; fires with ``err``."""
        _, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
        returnValue(err)

    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
        """Start watching ``key``; ``key_change_callback`` receives Events.

        The watch runs in the background: the Deferred of the underlying
        generator is intentionally not returned.
        """
        self._retriggering_watch(key, key_change_callback, timeout)

    @inlineCallbacks
    def _retriggering_watch(self, key, key_change_callback, timeout):
        """Register a ConsulWatch for ``key`` and run it until stopped."""
        self.key_watches[key] = ConsulWatch(self.client, key,
                                            key_change_callback, timeout)
        yield self.key_watches[key].start()

    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
        """Stop the watch on ``key`` if one exists (``timeout`` is unused)."""
        if key in self.key_watches:
            self.key_watches[key].stop()

    @inlineCallbacks
    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
        """Run one KV operation, retrying while Consul refuses connections.

        ``operation`` is one of 'GET', 'LIST', 'PUT', 'DELETE', 'RESERVE',
        'RENEW', 'RELEASE' or 'RELEASE-ALL'; any other value performs no
        request and yields ``(None, None)``.

        A ConsulException mentioning 'ConnectionRefusedError' triggers a
        backoff and another attempt; any other exception ends the loop and
        is returned as ``err``.  When ``timeout`` is positive and the
        accumulated ``self.retry_time`` exceeds it, ``err`` becomes
        'operation-timed-out'.

        Fires with ``(result, err)``.
        """
        log.debug('kv-op', operation=operation, key=key, timeout=timeout,
                  args=args, kw=kw)
        err = None
        result = None
        while True:
            try:
                if operation == 'GET':
                    result = yield self._get(key, **kw)
                elif operation == 'LIST':
                    result, err = yield self._list(key)
                elif operation == 'PUT':
                    # Put returns a boolean response
                    result = yield self.client.kv.put(key, value)
                    if not result:
                        err = 'put-failed'
                elif operation == 'DELETE':
                    # Delete returns a boolean response
                    result = yield self.client.kv.delete(key)
                    if not result:
                        err = 'delete-failed'
                elif operation == 'RESERVE':
                    result, err = yield self._reserve(key, value, **kw)
                elif operation == 'RENEW':
                    result, err = yield self._renew_reservation(key)
                elif operation == 'RELEASE':
                    result, err = yield self._release_reservation(key)
                elif operation == 'RELEASE-ALL':
                    err = yield self._release_all_reservations()
                self._clear_backoff()
                break
            except ConsulException as ex:
                # str(ex) rather than ex.message: the latter exists only on
                # Python 2 exceptions.
                if 'ConnectionRefusedError' in str(ex):
                    log.exception('comms-exception', ex=ex)
                    yield self._backoff('consul-not-up')
                else:
                    log.error('consul-specific-exception', ex=ex)
                    err = ex
            except Exception as ex:
                log.error('consul-exception', ex=ex)
                err = ex

            if timeout > 0 and self.retry_time > timeout:
                err = 'operation-timed-out'
            if err is not None:
                self._clear_backoff()
                break

        returnValue((result, err))

    @inlineCallbacks
    def _get(self, key, **kw):
        """Fetch ``key``; fires with a KVPair, or None when no record exists."""
        kvp = None
        index, rec = yield self.client.kv.get(key, **kw)
        if rec is not None:
            kvp = KVPair(rec['Key'], rec['Value'], index)
        returnValue(kvp)

    @inlineCallbacks
    def _list(self, key):
        """Fetch all records under ``key``; fires with ``(pairs, None)``.

        ``pairs`` is an empty list when Consul reports no matching records.
        """
        err = None
        index, recs = yield self.client.kv.get(key, recurse=True)
        # Consul hands back None (not an empty list) when nothing matches.
        pairs = [KVPair(rec['Key'], rec['Value'], rec['ModifyIndex'])
                 for rec in (recs or [])]
        returnValue((pairs, err))

    @inlineCallbacks
    def _reserve(self, key, value, **kw):
        """Create a session and try to acquire ``key`` with it.

        Accepts an optional ``ttl`` keyword, forwarded to session creation
        (None when omitted).  Fires with ``(owner, err)``: ``owner`` is the
        value stored at ``key`` after the attempt (None if the key has no
        record), ``err`` is None when this session holds the key with
        ``value`` and 'reservation-failed' otherwise.

        NOTE(review): the session created here is not destroyed when the
        reservation fails -- confirm that relying on the TTL is intended.
        """
        ttl = kw.get('ttl')
        reserved = False
        err = 'reservation-failed'
        owner = None

        # Create a session
        self.session_id = yield self.client.session.create(
            behavior='delete', ttl=ttl)  # lock_delay=1)
        log.debug('create-session', id=self.session_id)
        # Try to acquire the key
        result = yield self.client.kv.put(key, value, acquire=self.session_id)
        log.debug('key-acquire', key=key, value=value, sess=self.session_id,
                  result=result)

        # Check if reservation succeeded
        index, record = yield self.client.kv.get(key)
        if record is not None and 'Value' in record:
            owner = record['Value']
            # A key that is not locked by any session has no 'Session' field.
            holder = record.get('Session')
            log.debug('get-key', session=holder, owner=owner)
            if holder == self.session_id and owner == value:
                reserved = True
                log.debug('key-reserved', key=key, value=value, ttl=ttl)
                # Add key to reservation list
                self.key_reservations[key] = self.session_id
            else:
                log.debug('reservation-held-by-another', owner=owner)

        if reserved:
            err = None
        returnValue((owner, err))

    @inlineCallbacks
    def _renew_reservation(self, key):
        """Renew the session recorded for ``key``; fires ``(result, err)``.

        ``err`` is 'key-not-reserved' when ``key`` has no recorded session
        and 'session-renewal-failed' when the renew call returns None.
        """
        result = None
        err = None
        if key not in self.key_reservations:
            err = 'key-not-reserved'
        else:
            session_id = self.key_reservations[key]
            # A successfully renewed session returns an object with fields:
            # Node, CreateIndex, Name, ModifyIndex, ID, Behavior, TTL,
            # LockDelay, and Checks
            result = yield self.client.session.renew(session_id=session_id)
            log.debug('session-renew', result=result)
            if result is None:
                err = 'session-renewal-failed'
        returnValue((result, err))

    @inlineCallbacks
    def _release_reservation(self, key):
        """Destroy the session recorded for ``key``; fires ``(success, err)``.

        ``err`` is 'key-not-reserved' (with ``success`` False) when ``key``
        has no recorded session, and 'session-destroy-failed' when the
        destroy call reports failure.  The key is dropped from
        ``key_reservations`` in both destroy outcomes.
        """
        err = None
        # Bound up front so the not-reserved path has a value to return.
        success = False
        if key not in self.key_reservations:
            err = 'key-not-reserved'
        else:
            session_id = self.key_reservations[key]
            # A successfully destroyed session returns a boolean result
            success = yield self.client.session.destroy(session_id)
            log.debug('session-destroy', result=success)
            if not success:
                err = 'session-destroy-failed'
            self.session_id = None
            self.key_reservations.pop(key, None)
        returnValue((success, err))

    @inlineCallbacks
    def _release_all_reservations(self):
        """Destroy the session of every reserved key; fires with ``err``.

        ``err`` is 'session-destroy-failed' if any destroy call reported
        failure, otherwise None.  All keys seen are removed from
        ``key_reservations`` regardless of the individual outcome.
        """
        err = None
        # Iterate a snapshot: control returns to the reactor at every yield,
        # so the live dict may change while sessions are being destroyed.
        reservations = list(self.key_reservations.items())
        for key, session_id in reservations:
            # A successfully destroyed session returns a boolean result
            success = yield self.client.session.destroy(session_id)
            if not success:
                err = 'session-destroy-failed'
            log.debug('session-destroy', id=session_id, result=success)
            self.session_id = None
        for key, _ in reservations:
            self.key_reservations.pop(key, None)
        returnValue(err)
236
237
class ConsulWatch(object):
    """Watch on a single Consul key that reports changes through a callback.

    ``start`` performs an initial read to learn the current index and then
    loops, issuing index-based gets and turning each result into an
    ``Event`` passed to ``callback``.  ``stop`` ends the loop after the
    iteration in progress.
    """

    def __init__(self, consul, key, callback, timeout):
        """Store the Consul client, the watched key and the event callback.

        ``timeout`` bounds the accumulated retry delay of a single get in
        ``_get_with_retry``; a value <= 0 disables that bound.
        """
        self.client = consul
        self.key = key
        # Index of the latest read (as a string); sent back as ``index=`` on
        # the next get.
        self.index = None
        self.callback = callback
        self.timeout = timeout
        # Timeout handed to DeferredWithTimeout for each watch iteration
        # (presumably seconds -- confirm against DeferredWithTimeout).
        self.period = 60
        self.running = True
        # Backoff bookkeeping, see _backoff / _clear_backoff.
        self.retries = 0
        self.retry_time = 0

    @inlineCallbacks
    def start(self):
        """Run the watch loop until ``stop`` is called.

        If the initial read fails the watch is abandoned: an error is
        logged, ``running`` is cleared and the Deferred fires with None.
        """
        self.running = True
        initial = yield self._get_with_retry(self.key, None,
                                             timeout=self.timeout)
        if initial is None:
            # _get_with_retry yields None on failure; without an index there
            # is nothing to watch from.
            log.error('watch-start-failed', key=self.key)
            self.running = False
            return
        index, rec = initial
        self.index = str(index)

        @inlineCallbacks
        def _get(key, deferred):
            # Issue one index-based get and hand its result to ``deferred``
            # unless that Deferred has already been called.
            try:
                fetched = yield self._get_with_retry(key, None,
                                                     timeout=self.timeout,
                                                     index=self.index)
                if fetched is None:
                    log.error('watch-get-failed', key=key)
                else:
                    index, rec = fetched
                    self.index = str(index)
                    if not deferred.called:
                        log.debug('got-result-cancelling-deferred')
                        deferred.callback((self.index, rec))
            except Exception as e:
                log.exception('got-exception', e=e)

        while self.running:
            try:
                rcvd = DeferredWithTimeout(timeout=self.period)
                # Not yielded: the get runs in the background while this
                # loop waits on ``rcvd``.
                # NOTE(review): a get still outstanding when ``rcvd`` times
                # out is not cancelled here -- confirm that is acceptable.
                _get(self.key, rcvd)
                try:
                    # Update index for next watch iteration
                    index, rec = yield rcvd
                    log.debug('event-received', index=index, rec=rec)
                    # Notify client of key change event
                    if rec is None:
                        # Key has been deleted
                        self._send_event(Event(Event.DELETE, self.key, None))
                    else:
                        self._send_event(Event(Event.PUT, rec['Key'],
                                               rec['Value']))
                except TimeOutError as e:
                    log.debug('no-events-over-watch-period', key=self.key)
                except Exception as e:
                    log.exception('exception', e=e)
            except Exception as e:
                log.exception('exception', e=e)

        log.debug('close-watch', key=self.key)

    def stop(self):
        """Ask the watch loop to exit and drop the callback."""
        self.running = False
        self.callback = None

    @inlineCallbacks
    def _get_with_retry(self, key, value, timeout, *args, **kw):
        """Get ``key`` from Consul, retrying while connections are refused.

        ``kw`` is forwarded to ``client.kv.get`` (e.g. ``index``); ``value``
        and ``args`` are only logged or ignored.  Each refused connection
        emits a CONNECTION_DOWN event and waits for the next backoff
        interval.  The loop gives up on any other exception, or when
        ``timeout`` is positive and the accumulated retry delay exceeds it.

        Fires with the ``(index, record)`` result of the get, or None when
        the get did not succeed.
        """
        log.debug('watch-period', key=key, period=self.period,
                  timeout=timeout, args=args, kw=kw)
        err = None
        result = None
        while True:
            try:
                result = yield self.client.kv.get(key, **kw)
                self._clear_backoff()
                break
            except ConsulException as ex:
                # str(ex) rather than ex.message: the latter exists only on
                # Python 2 exceptions.
                if 'ConnectionRefusedError' in str(ex):
                    # Retryable: leave ``err`` unset so the loop goes round
                    # again after the backoff instead of exiting.
                    self._send_event(
                        Event(Event.CONNECTION_DOWN, self.key, None))
                    log.exception('comms-exception', ex=ex)
                    yield self._backoff('consul-not-up')
                else:
                    err = ex
                    log.error('consul-specific-exception', ex=ex)
            except Exception as ex:
                err = ex
                log.error('consul-exception', ex=ex)

            if timeout > 0 and self.retry_time > timeout:
                err = 'operation-timed-out'
            if err is not None:
                self._clear_backoff()
                break

        returnValue(result)

    def _send_event(self, event):
        """Pass ``event`` to the callback unless the watch was stopped."""
        if self.callback is not None:
            self.callback(event)

    def _backoff(self, msg):
        """Log ``msg`` and sleep for the next interval in RETRY_BACKOFF.

        The interval is selected by the retry count, staying at the last
        entry once the table is exhausted.  Returns the result of
        ``asleep`` for the caller to yield on.
        """
        wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
        self.retry_time += wait_time
        self.retries += 1
        log.error(msg, next_retry_in_secs=wait_time,
                  total_delay_in_secs=self.retry_time,
                  retries=self.retries)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the retry counters, logging if any retries had occurred."""
        if self.retries:
            log.debug('reconnected-to-kv', after_retries=self.retries)
            self.retries = 0
            self.retry_time = 0