blob: 789e797824ba2127eae6d44094f81a4b816bfff5 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001# Copyright 2018-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair, RETRY_BACKOFF
16from python.common.utils.asleep import asleep
17from python.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
18from consul import ConsulException
19from consul.twisted import Consul
20from structlog import get_logger
21from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
22
23log = get_logger()
24
25class ConsulClient(KVClient):
26
27 def __init__(self, kv_host, kv_port):
28 KVClient.__init__(self, kv_host, kv_port)
29 self.session_id = None
30 self.client = Consul(kv_host, kv_port)
31
32 def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
33 self._retriggering_watch(key, key_change_callback, timeout)
34
35 @inlineCallbacks
36 def _retriggering_watch(self, key, key_change_callback, timeout):
37 self.key_watches[key] = ConsulWatch(self.client, key, key_change_callback, timeout)
38 yield self.key_watches[key].start()
39
40 def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
41 if key in self.key_watches:
42 self.key_watches[key].stop()
43
44 @inlineCallbacks
45 def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
46 log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
47 err = None
48 result = None
49 while True:
50 try:
51 if operation == 'GET':
52 result = yield self._get(key, **kw)
53 elif operation == 'LIST':
54 result, err = yield self._list(key)
55 elif operation == 'PUT':
56 # Put returns a boolean response
57 result = yield self.client.kv.put(key, value)
58 if not result:
59 err = 'put-failed'
60 elif operation == 'DELETE':
61 # Delete returns a boolean response
62 result = yield self.client.kv.delete(key)
63 if not result:
64 err = 'delete-failed'
65 elif operation == 'RESERVE':
66 result, err = yield self._reserve(key, value, **kw)
67 elif operation == 'RENEW':
68 result, err = yield self._renew_reservation(key)
69 elif operation == 'RELEASE':
70 result, err = yield self._release_reservation(key)
71 elif operation == 'RELEASE-ALL':
72 err = yield self._release_all_reservations()
73 self._clear_backoff()
74 break
75 except ConsulException as ex:
76 if 'ConnectionRefusedError' in ex.message:
77 log.exception('comms-exception', ex=ex)
78 yield self._backoff('consul-not-up')
79 else:
80 log.error('consul-specific-exception', ex=ex)
81 err = ex
82 except Exception as ex:
83 log.error('consul-exception', ex=ex)
84 err = ex
85
86 if timeout > 0 and self.retry_time > timeout:
87 err = 'operation-timed-out'
88 if err is not None:
89 self._clear_backoff()
90 break
91
92 returnValue((result,err))
93
94 @inlineCallbacks
95 def _get(self, key, **kw):
96 kvp = None
97 index, rec = yield self.client.kv.get(key, **kw)
98 if rec is not None:
99 kvp = KVPair(rec['Key'], rec['Value'], index)
100 returnValue(kvp)
101
102 @inlineCallbacks
103 def _list(self, key):
104 err = None
105 list = []
106 index, recs = yield self.client.kv.get(key, recurse=True)
107 for rec in recs:
108 list.append(KVPair(rec['Key'], rec['Value'], rec['ModifyIndex']))
109 returnValue((list, err))
110
111 @inlineCallbacks
112 def _reserve(self, key, value, **kw):
113 for name, val in kw.items():
114 if name == 'ttl':
115 ttl = val
116 break
117 reserved = False
118 err = 'reservation-failed'
119 owner = None
120
121 # Create a session
122 self.session_id = yield self.client.session.create(behavior='delete',
123 ttl=ttl) # lock_delay=1)
124 log.debug('create-session', id=self.session_id)
125 # Try to acquire the key
126 result = yield self.client.kv.put(key, value, acquire=self.session_id)
127 log.debug('key-acquire', key=key, value=value, sess=self.session_id, result=result)
128
129 # Check if reservation succeeded
130 index, record = yield self.client.kv.get(key)
131 if record is not None and 'Value' in record:
132 owner = record['Value']
133 log.debug('get-key', session=record['Session'], owner=owner)
134 if record['Session'] == self.session_id and owner == value:
135 reserved = True
136 log.debug('key-reserved', key=key, value=value, ttl=ttl)
137 # Add key to reservation list
138 self.key_reservations[key] = self.session_id
139 else:
140 log.debug('reservation-held-by-another', owner=owner)
141
142 if reserved:
143 err = None
144 returnValue((owner, err))
145
146 @inlineCallbacks
147 def _renew_reservation(self, key):
148 result = None
149 err = None
150 if key not in self.key_reservations:
151 err = 'key-not-reserved'
152 else:
153 session_id = self.key_reservations[key]
154 # A successfully renewed session returns an object with fields:
155 # Node, CreateIndex, Name, ModifyIndex, ID, Behavior, TTL,
156 # LockDelay, and Checks
157 result = yield self.client.session.renew(session_id=session_id)
158 log.debug('session-renew', result=result)
159 if result is None:
160 err = 'session-renewal-failed'
161 returnValue((result, err))
162
163 @inlineCallbacks
164 def _release_reservation(self, key):
165 err = None
166 if key not in self.key_reservations:
167 err = 'key-not-reserved'
168 else:
169 session_id = self.key_reservations[key]
170 # A successfully destroyed session returns a boolean result
171 success = yield self.client.session.destroy(session_id)
172 log.debug('session-destroy', result=success)
173 if not success:
174 err = 'session-destroy-failed'
175 self.session_id = None
176 self.key_reservations.pop(key)
177 returnValue((success, err))
178
179 @inlineCallbacks
180 def _release_all_reservations(self):
181 err = None
182 keys_to_delete = []
183 for key in self.key_reservations:
184 session_id = self.key_reservations[key]
185 # A successfully destroyed session returns a boolean result
186 success = yield self.client.session.destroy(session_id)
187 if not success:
188 err = 'session-destroy-failed'
189 log.debug('session-destroy', id=session_id, result=success)
190 self.session_id = None
191 keys_to_delete.append(key)
192 for key in keys_to_delete:
193 self.key_reservations.pop(key)
194 returnValue(err)
195
196
197class ConsulWatch():
198
199 def __init__(self, consul, key, callback, timeout):
200 self.client = consul
201 self.key = key
202 self.index = None
203 self.callback = callback
204 self.timeout = timeout
205 self.period = 60
206 self.running = True
207 self.retries = 0
208 self.retry_time = 0
209
210 @inlineCallbacks
211 def start(self):
212 self.running = True
213 index, rec = yield self._get_with_retry(self.key, None,
214 timeout=self.timeout)
215 self.index = str(index)
216
217 @inlineCallbacks
218 def _get(key, deferred):
219 try:
220 index, rec = yield self._get_with_retry(key, None,
221 timeout=self.timeout,
222 index=self.index)
223 self.index = str(index)
224 if not deferred.called:
225 log.debug('got-result-cancelling-deferred')
226 deferred.callback((self.index, rec))
227 except Exception as e:
228 log.exception('got-exception', e=e)
229
230 while self.running:
231 try:
232 rcvd = DeferredWithTimeout(timeout=self.period)
233 _get(self.key, rcvd)
234 try:
235 # Update index for next watch iteration
236 index, rec = yield rcvd
237 log.debug('event-received', index=index, rec=rec)
238 # Notify client of key change event
239 if rec is None:
240 # Key has been deleted
241 self._send_event(Event(Event.DELETE, self.key, None))
242 else:
243 self._send_event(Event(Event.PUT, rec['Key'], rec['Value']))
244 except TimeOutError as e:
245 log.debug('no-events-over-watch-period', key=self.key)
246 except Exception as e:
247 log.exception('exception', e=e)
248 except Exception as e:
249 log.exception('exception', e=e)
250
251 log.debug('close-watch', key=self.key)
252
253 def stop(self):
254 self.running = False
255 self.callback = None
256
257 @inlineCallbacks
258 def _get_with_retry(self, key, value, timeout, *args, **kw):
259 log.debug('watch-period', key=key, period=self.period, timeout=timeout, args=args, kw=kw)
260 err = None
261 result = None
262 while True:
263 try:
264 result = yield self.client.kv.get(key, **kw)
265 self._clear_backoff()
266 break
267 except ConsulException as ex:
268 err = ex
269 if 'ConnectionRefusedError' in ex.message:
270 self._send_event(Event(Event.CONNECTION_DOWN, self.key, None))
271 log.exception('comms-exception', ex=ex)
272 yield self._backoff('consul-not-up')
273 else:
274 log.error('consul-specific-exception', ex=ex)
275 except Exception as ex:
276 err = ex
277 log.error('consul-exception', ex=ex)
278
279 if timeout > 0 and self.retry_time > timeout:
280 err = 'operation-timed-out'
281 if err is not None:
282 self._clear_backoff()
283 break
284
285 returnValue(result)
286
287 def _send_event(self, event):
288 if self.callback is not None:
289 self.callback(event)
290
291 def _backoff(self, msg):
292 wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
293 self.retry_time += wait_time
294 self.retries += 1
295 log.error(msg, next_retry_in_secs=wait_time,
296 total_delay_in_secs = self.retry_time,
297 retries=self.retries)
298 return asleep(wait_time)
299
300 def _clear_backoff(self):
301 if self.retries:
302 log.debug('reconnected-to-kv', after_retries=self.retries)
303 self.retries = 0
304 self.retry_time = 0