blob: 95507443a939a42c37adb8d3509ced8ca638c6fd [file] [log] [blame]
Richard Jankowski8b277c22017-12-19 09:49:27 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17""" Etcd-based coordinator services """
18
Richard Jankowski8b277c22017-12-19 09:49:27 -050019from structlog import get_logger
20from twisted.internet import reactor
21from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
22from txaioetcd import Client, KeySet, Transaction, CompVersion, OpGet, OpSet, Failed
23from zope.interface import implementer
24
25from leader import Leader
26from common.utils.asleep import asleep
27from common.utils.message_queue import MessageQueue
28from voltha.registry import IComponent
29from worker import Worker
Zack Williams18357ed2018-11-14 10:41:08 -070030from simplejson import dumps
Richard Jankowski8b277c22017-12-19 09:49:27 -050031from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
32
# Module-level structlog logger shared by everything in this file.
log = get_logger()
34
35
class StaleMembershipEntryException(Exception):
    """Exception type reserved for stale membership-record conditions.

    NOTE(review): nothing in this file raises or catches it; presumably it
    is kept for API parity with other coordinator implementations -- confirm
    before removing.
    """
38
39
@implementer(IComponent)
class CoordinatorEtcd(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interaction with etcd, and via etcd, all coordination activities with
    its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in etcd
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    # Successive wait times (seconds) used by _backoff(); the last entry is
    # reused once the number of retries exceeds the table length.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 config,
                 etcd='localhost:2379',
                 container_name_regex=r'^.*\.([0-9]+)\..*$'):
        """
        Read the configuration, build the key prefixes and create the etcd
        client. No etcd traffic happens here; see start().

        :param internal_host_address: stored on the instance as-is
        :param external_host_address: published as 'host_address' in the
            membership record
        :param instance_id: id of this instance; used as the suffix of the
            membership key and as the value written to the leader key
        :param rest_port: stored on the instance as-is
        :param config: mapping that must contain the 'coordinator', 'worker'
            and 'leader' sections
        :param etcd: etcd endpoint given as 'host:port'
        :param container_name_regex: stored on the instance as-is
        """
        log.info('initializing-coordinator')
        self.config = config['coordinator']
        self.worker_config = config['worker']
        self.leader_config = config['leader']

        # This value used to be looked up in the top-level config only,
        # unlike every other tunable below. The top-level lookup still wins
        # (backward compatible), but a value placed in the 'coordinator'
        # section is no longer silently ignored.
        self.membership_watch_relatch_delay = config.get(
            'membership_watch_relatch_delay',
            self.config.get('membership_watch_relatch_delay', 0.1))
        self.tracking_loop_delay = self.config.get(
            'tracking_loop_delay', 1)
        self.session_renewal_timeout = self.config.get(
            'session_renewal_timeout', 5)
        self.session_renewal_loop_delay = self.config.get(
            'session_renewal_loop_delay', 3)
        self.membership_maintenance_loop_delay = self.config.get(
            'membership_maintenance_loop_delay', 5)
        self.session_time_to_live = self.config.get(
            'session_time_to_live', 10)

        def _sub_key(selector, default):
            # NOTE(review): the value stored under `selector` is itself used
            # as the config key for a second lookup; this mirrors the
            # original behaviour exactly. A missing `selector` raises
            # KeyError.
            return self.config.get(self.config[selector], default)

        self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
        self.leader_prefix = '/'.join(
            (self.prefix, _sub_key('leader_key', 'leader')))
        # The trailing '' yields a trailing '/' on the directory-like
        # prefixes.
        self.membership_prefix = '/'.join(
            (self.prefix, _sub_key('membership_key', 'members'), ''))
        self.assignment_prefix = '/'.join(
            (self.prefix, _sub_key('assignment_key', 'assignments'), ''))
        self.workload_prefix = '/'.join(
            (self.prefix, _sub_key('workload_key', 'work'), ''))
        self.core_store_prefix = '/'.join(
            (self.prefix, _sub_key('core_store_key', 'data/core')))
        self.core_store_assignment_key = \
            self.core_store_prefix + '/assignment'
        self.core_storage_suffix = 'core_store'

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.membership_prefix + self.instance_id

        self.lease = None
        # session_id refers to either a Consul session ID or an Etcd lease
        # object
        self.session_id = None
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader = None
        self.membership_callback = None
        # True from the moment the leader key was written by this instance
        # until _assert_leadership() has completed (or another leader is
        # seen). Survives a failed read-back in the tracking loop.
        self._leadership_claim_pending = False

        self.worker = Worker(self.instance_id, self)

        # Create etcd client
        endpoint_parts = etcd.split(':')
        kv_host = endpoint_parts[0].strip()
        kv_port = endpoint_parts[1].strip()
        self.etcd_url = u'http://' + kv_host + u':' + kv_port
        self.etcd = Client(reactor, self.etcd_url)

        self.container_name_regex = container_name_regex

        self.wait_for_leader_deferreds = []

        self.peers_mapping_queue = MessageQueue()

    def start(self):
        """Schedule the asynchronous initialization and return self."""
        log.debug('starting')
        reactor.callLater(0, self._async_init)
        log.info('started')
        return self

    @inlineCallbacks
    def stop(self):
        """Flag shutdown, revoke the lease, then stop worker and leader."""
        log.debug('stopping')
        self.shutting_down = True
        yield self._delete_session()  # this will delete the leader lock too
        yield self.worker.stop()
        if self.leader is not None:
            yield self.leader.stop()
            self.leader = None
        log.info('stopped')

    def wait_for_a_leader(self):
        """
        Async wait till a leader is detected/elected. The deferred will be
        called with the leader's instance_id
        :return: Deferred.
        """
        d = Deferred()
        if self.leader_id is not None:
            d.callback(self.leader_id)
        else:
            # Fired later from _set_leader_id()
            self.wait_for_leader_deferreds.append(d)
        return d

    # Wait for a core data id to be assigned to this voltha instance
    @inlineCallbacks
    def get_core_store_id_and_prefix(self):
        """Return (core_store_id, core_store_prefix); the id is obtained
        from the worker."""
        core_store_id = yield self.worker.get_core_store_id()
        returnValue((core_store_id, self.core_store_prefix))

    def recv_peers_map(self):
        """Return whatever the peers-mapping queue's get() returns."""
        return self.peers_mapping_queue.get()

    def publish_peers_map_change(self, msg):
        """Put `msg` on the peers-mapping queue."""
        self.peers_mapping_queue.put(msg)

    # Proxy methods for etcd with retry support

    def kv_get(self, *args, **kw):
        """GET with retries; an 'index' keyword is accepted but dropped."""
        # Intercept 'index' argument
        kw.pop('index', None)
        return self._retry('GET', *args, **kw)

    def kv_put(self, *args, **kw):
        """PUT with retries; see _retry() for the supported keywords."""
        return self._retry('PUT', *args, **kw)

    def kv_delete(self, *args, **kw):
        """DELETE with retries; pass recurse=True to delete by prefix."""
        return self._retry('DELETE', *args, **kw)

    # Methods exposing key membership information

    @inlineCallbacks
    def get_members(self):
        """Return list of all members"""
        _, members = yield self.kv_get(self.membership_prefix, recurse=True)
        prefix_len = len(self.membership_prefix)
        returnValue([member['Key'][prefix_len:] for member in members])

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        """Create the session and membership record, then start leader
        tracking and the worker, in that order."""
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()

    def _backoff(self, msg):
        """Log `msg` as an error and return an async sleep whose duration
        grows with the number of consecutive retries."""
        slot = min(self.retries, len(self.RETRY_BACKOFF) - 1)
        wait_time = self.RETRY_BACKOFF[slot]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the retry counter, logging if retries had occurred."""
        if self.retries:
            log.info('reconnected-to-etcd', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """Create the etcd lease (with retries) and start tracking it."""

        @inlineCallbacks
        def _do_create_session():
            etcd = yield self.get_kv_client()
            # Create etcd lease
            self.lease = yield etcd.lease(self.session_time_to_live)
            self.session_id = self.lease
            log.info('created-etcd-lease', lease=self.session_id)
            self._start_session_tracking()

        yield self._retry(_do_create_session)

    @inlineCallbacks
    def _delete_session(self):
        """Revoke the current lease, if any; failures are logged only."""
        if self.lease is None:
            # Nothing was ever created (e.g. stop() before _async_init ran)
            log.debug('no-session-to-delete')
            return
        try:
            yield self.lease.revoke()
        except Exception as e:
            log.exception('failed-to-delete-session %s' % e,
                          session_id=self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        """Create the membership record, then schedule its maintenance."""
        yield self._do_create_membership_record_with_retries()
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """Periodically verify the membership record and re-create it when
        it is missing; ends once shutdown has been flagged."""
        try:
            # Checking the flag here keeps the loop from re-creating the
            # record after stop() has revoked the lease.
            while not self.shutting_down:
                valid_membership = yield self._assert_membership_record_valid()
                if valid_membership:
                    log.debug('valid-membership', session=self.session_id)
                else:
                    log.info('recreating-membership-before',
                             session=self.session_id)
                    yield self._do_create_membership_record_with_retries()
                    log.info('recreating-membership-after',
                             session=self.session_id)
                # Async sleep before checking the membership record again
                yield asleep(self.membership_maintenance_loop_delay)

        except Exception as e:
            # NOTE(review): event name looks copy-pasted from the leader
            # tracking loop; kept unchanged in case log consumers match it.
            log.exception('unexpected-error-leader-trackin', e=e)
        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.membership_watch_relatch_delay,
                                  self._maintain_membership_record)

    def _create_membership_record_data(self):
        """Build the dict that is JSON-encoded into the membership record."""
        return {
            'status': 'alive',
            'host_address': self.external_host_address,
        }

    @inlineCallbacks
    def _assert_membership_record_valid(self):
        """Return True if the membership record can be read back and holds
        a 'Session' entry; False on timeout, absence or any error."""
        try:
            log.debug('membership-record-before')
            is_timeout, (_, record) = yield \
                self.coordinator_get_with_timeout(
                    key=self.membership_record_key,
                    index=0,
                    timeout=5)
            if is_timeout:
                log.debug('timeout creating membership record in etcd, key: %s' %
                          self.membership_record_key)
                returnValue(False)

            log.debug('membership-record-after', record=record)
            if record is None or 'Session' not in record:
                log.info('membership-record-change-detected',
                         old_session=self.session_id,
                         record=record)
                returnValue(False)
            returnValue(True)
        except Exception as e:
            log.exception('membership-validation-exception', e=e)
            returnValue(False)

    @inlineCallbacks
    def _do_create_membership_record_with_retries(self):
        """Write the membership record bound to the current lease, backing
        off and retrying until the write returns a truthy result."""
        while 1:
            log.info('recreating-membership', session=self.session_id)
            result = yield self._retry(
                'PUT',
                self.membership_record_key,
                dumps(self._create_membership_record_data()),
                acquire=self.session_id)
            if result:
                log.info('new-membership-record-created',
                         session=self.session_id)
                break
            log.warn('cannot-create-membership-record')
            yield self._backoff('stale-membership-record')

    def _start_session_tracking(self):
        """Schedule the session (lease) tracking loop."""
        reactor.callLater(0, self._session_tracking_loop)

    @inlineCallbacks
    def _session_tracking_loop(self):
        """Keep the lease alive: refresh it periodically and, when a refresh
        does not complete within session_renewal_timeout, replace the
        client and lease. Ends once shutdown has been flagged."""

        @inlineCallbacks
        def _redo_session():
            log.info('_redo_session-before')
            yield self._delete_session()

            # Create a new etcd connection/session with a new lease
            try:
                self.etcd = Client(reactor, self.etcd_url)
                self.lease = yield self.etcd.lease(self.session_time_to_live)
                self.session_id = self.lease
                log.info('new-etcd-session', session=self.session_id)

            except Exception as e:
                log.exception('could-not-create-an-etcd-lease', e=e)

        @inlineCallbacks
        def _renew_session(m_callback):
            try:
                time_left = yield self.lease.remaining()
                log.debug('_renew_session', time_left=time_left)
                result = yield self.lease.refresh()
                log.debug('just-renewed-session', result=result)
                if not m_callback.called:
                    # Triggering callback will cancel the timeout timer
                    log.debug('trigger-callback-to-cancel-timeout-timer')
                    m_callback.callback(result)
                else:
                    # Timeout event has already been called. Just ignore
                    # this event
                    log.info('renew-called-after-timeout, etcd ref changed?')
            except Exception as e:
                # Let the invoking method receive a timeout
                log.exception('could-not-renew-session', e=e)

        try:
            # Without this check the loop would outlive stop() and, after
            # the revoked lease fails to refresh, create a brand new one.
            while not self.shutting_down:
                log.debug('session-tracking-start')
                rcvd = DeferredWithTimeout(
                    timeout=self.session_renewal_timeout)
                _renew_session(rcvd)
                try:
                    _ = yield rcvd
                except TimeOutError as e:
                    log.info('session-renew-timeout', e=e)
                    if self.shutting_down:
                        # stop() ran while we were waiting; do not
                        # resurrect the session.
                        break
                    # Redo the session
                    yield _redo_session()
                except Exception as e:
                    log.exception('session-renew-exception', e=e)
                else:
                    log.debug('successfully-renewed-session')

                # Async sleep before the next session tracking
                yield asleep(self.session_renewal_loop_delay)

        except Exception as e:
            log.exception('renew-exception', e=e)
        finally:
            # Except in shutdown, the loop must continue (after a delay)
            if not self.shutting_down:
                reactor.callLater(self.session_renewal_loop_delay,
                                  self._session_tracking_loop)

    def _start_leader_tracking(self):
        """Schedule the leadership tracking loop."""
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """One round of leader election/tracking; re-schedules itself after
        tracking_loop_delay unless shutting down."""
        try:
            # Try to seize leadership via test-and-set operation.
            # Success means the leader key was previously absent
            # and was just re-created by this instance.

            leader_prefix = bytes(self.leader_prefix)
            txn = Transaction(
                compare=[
                    CompVersion(leader_prefix, '==', 0)
                ],
                success=[
                    OpSet(leader_prefix, bytes(self.instance_id),
                          lease=self.lease),
                    OpGet(leader_prefix)
                ],
                failure=[]
            )
            try:
                yield self.etcd.submit(txn)
            except Failed:
                # Leader key already present
                pass
            else:
                # Remember the claim on the instance: if the read-back
                # below raises, the next round must still complete the
                # hand-over instead of reporting 'already-leader' forever.
                self._leadership_claim_pending = True
                log.info('leader-key-absent')

            # Confirm that the assertion succeeded by reading back
            # the value of the leader key.
            leader = None
            result = yield self.etcd.get(leader_prefix)
            if result.kvs:
                leader = result.kvs[0].value
                log.debug('get-leader-key', leader=leader,
                          instance=self.instance_id)

            if leader is None:
                log.error('get-leader-failed')
            elif leader == self.instance_id:
                if self._leadership_claim_pending:
                    log.info('leadership-seized')
                    yield self._assert_leadership()
                    self._leadership_claim_pending = False
                else:
                    log.debug('already-leader')
            else:
                self._leadership_claim_pending = False
                log.debug('leader-is-another', leader=leader)
                yield self._assert_nonleadership(leader)

        except Exception as e:
            log.exception('unexpected-error-leader-tracking', e=e)

        finally:
            # Except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.tracking_loop_delay,
                                  self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self._set_leader_id(self.instance_id)
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self._set_leader_id(leader_id)

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _set_leader_id(self, leader_id):
        """Record the leader id and fire all pending wait_for_a_leader()
        deferreds with it."""
        self.leader_id = leader_id
        # Swap the list out first so callbacks that call
        # wait_for_a_leader() again do not touch the list being iterated.
        pending, self.wait_for_leader_deferreds = \
            self.wait_for_leader_deferreds, []
        for d in pending:
            d.callback(leader_id)

    def _just_gained_leadership(self):
        """Instantiate and start the Leader; returns its start() result."""
        log.info('became-leader')
        self.leader = Leader(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        """Stop the Leader after leadership moved elsewhere."""
        log.info('lost-leadership')
        return self._halt_leader()

    def _halt_leader(self):
        """Stop and drop the Leader if present; returns its stop() result,
        or None when there is no leader object."""
        if self.leader:
            d = self.leader.stop()
            self.leader = None
            return d

    def get_kv_client(self):
        """Return the current etcd client."""
        return self.etcd

    def _kv_to_record(self, kv):
        """Convert an etcd key/value result into the Consul-style record
        dict ('Key', 'Value', 'ModifyIndex', 'Session') used by callers."""
        return {
            'Key': kv.key,
            'Value': kv.value,
            'ModifyIndex': kv.mod_revision,
            # The current lease id is reported, not the lease the key is
            # actually bound to.
            'Session': self.lease.lease_id if self.lease else '',
        }

    @inlineCallbacks
    def _retry(self, operation, *args, **kw):
        """
        Run an etcd operation, retrying with back-off until it succeeds.

        :param operation: 'GET', 'PUT', 'DELETE', or a callable that is
            invoked as operation(*args, **kw)
        :param args: args[0] is the key; for 'PUT' args[1] is the value
        :param kw: passed to the etcd call after translating the
            Consul-style keywords:
            - acquire=<lease>  -> lease=<lease>
            - keys=<any>       -> prefix query returning keys only
            - recurse=<any>    -> prefix query / prefix delete
        :return: for 'GET' a tuple (index, record) or, for prefix queries,
            (index, list of records or keys); otherwise whatever the
            underlying call returns.
        """
        prefix = False
        keys_only = False
        keyset = None

        # Translate keywords with explicit pops; the dict must not be
        # mutated while it is being iterated.
        if 'acquire' in kw:
            kw['lease'] = kw.pop('acquire')
        if 'keys' in kw:
            kw.pop('keys')
            kw['keys_only'] = True
            keys_only = True
            prefix = True
        if 'recurse' in kw:
            kw.pop('recurse')
            prefix = True
        if prefix:
            keyset = KeySet(bytes(args[0]), prefix=True)
        log.debug('start-op', operation=operation, args=args, kw=kw)

        while 1:
            try:
                etcd = yield self.get_kv_client()
                if operation == 'GET':
                    # If multiple keys requested, return a list
                    # else return a single record
                    if not prefix:
                        index = 0
                        record = dict()
                        res = yield etcd.get(bytes(args[0]), **kw)
                        if res.kvs and len(res.kvs) == 1:
                            kv = res.kvs[0]
                            index = kv.mod_revision
                            record = self._kv_to_record(kv)
                        result = (index, record)
                    else:
                        # Get values for all keys that match the prefix
                        # If keys_only requested, get only the keys
                        index = 0
                        entries = []
                        res = yield etcd.get(keyset, **kw)
                        # NOTE(review): hard-coded diagnostic left in place
                        # from the original.
                        if args[0] == 'service/voltha/assignments/':
                            log.info('assignments', result=res)
                        for kv in res.kvs or []:
                            # The returned index is the max over all keys
                            if kv.mod_revision > index:
                                index = kv.mod_revision
                            if keys_only:
                                entries.append(kv.key)
                            else:
                                entries.append(self._kv_to_record(kv))
                        result = (index, entries)
                elif operation == 'PUT':
                    result = yield etcd.set(bytes(args[0]), args[1], **kw)
                elif operation == 'DELETE':
                    # A key set only exists for prefix deletes; a plain
                    # delete must target the single key (previously this
                    # raised UnboundLocalError and retried forever).
                    target = keyset if prefix else bytes(args[0])
                    result = yield etcd.delete(target)
                else:
                    # Default case - consider operation as a function call
                    result = yield operation(*args, **kw)
                self._clear_backoff()
                break
            except Exception as e:
                if not self.shutting_down:
                    log.exception(e)
                yield self._backoff('etcd-unknown-error: %s' % e)

        log.debug('end-op', operation=operation, args=args, kw=kw)
        returnValue(result)

    @inlineCallbacks
    def coordinator_get_with_timeout(self, key, timeout, **kw):
        """
        Query etcd with a timeout
        :param key: Key to query
        :param timeout: timeout value
        :param kw: additional key-value params; an optional 'index' is the
            revision the result must exceed. Without 'index' any result is
            accepted.
        :return: (is_timeout, (index, result)).

        The Consul version of this method performed a 'wait-type' get operation
        that returned a result when the key's value had a ModifyIndex greater
        than the 'index' argument. Not sure etcd supports this functionality.
        """

        # Intercept 'index' argument. It used to be left unbound when the
        # caller omitted it, which made every such call end in a timeout.
        mod_revision = None
        if 'index' in kw:
            mod_revision = kw.pop('index')
            log.debug('coordinator-get-with-timeout-etcd',
                      index=mod_revision)

        @inlineCallbacks
        def _get(key, m_callback):
            try:
                (index, result) = yield self._retry('GET', key, **kw)
                is_newer = mod_revision is None or index > mod_revision
                if is_newer and not m_callback.called:
                    log.debug('got-result-cancelling-timer')
                    m_callback.callback((index, result))
            except Exception as e:
                log.exception('got-exception', e=e)

        try:
            rcvd = DeferredWithTimeout(timeout=timeout)
            _get(key, rcvd)
            result = yield rcvd
            log.debug('result-received', result=result)
            returnValue((False, result))
        except TimeOutError:
            log.debug('timeout-or-no-data-change', etcd_key=key)
        except Exception as e:
            log.exception('exception', e=e)

        returnValue((True, (None, None)))
621 returnValue((True, (None, None)))