#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Etcd-based coordinator services """

from simplejson import dumps
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from twisted.internet.error import DNSLookupError
from txaioetcd import Client, KeySet, Transaction, CompVersion, OpGet, OpSet, Failed
from zope.interface import implementer

from leader import Leader
from common.utils.asleep import asleep
from common.utils.message_queue import MessageQueue
from voltha.registry import IComponent
from worker import Worker
from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError

# Module-level structured logger shared by everything in this file
log = get_logger()


class StaleMembershipEntryException(Exception):
    """
    Signals a stale membership entry.

    NOTE(review): nothing in this module raises it; presumably it is kept
    for callers or for parity with the Consul coordinator - confirm.
    """


@implementer(IComponent)
class CoordinatorEtcd(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interaction with etcd, and via etcd, all coordination activities with
    its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in etcd
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
    # Port used when the etcd endpoint is given as a bare host name
    DEFAULT_ETCD_PORT = '2379'

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 config,
                 etcd='localhost:2379',
                 container_name_regex=r'^.*\.([0-9]+)\..*$'):
        """
        Build the coordinator and its etcd client; no I/O happens until
        start() is called.

        :param internal_host_address: stored on the instance as-is
        :param external_host_address: published as 'host_address' in this
            instance's membership record
        :param instance_id: identity of this instance; used as the suffix
            of the membership key and as the value of the leader key
        :param rest_port: stored on the instance as-is
        :param config: dict that must contain the 'coordinator', 'worker'
            and 'leader' sections
        :param etcd: etcd endpoint as 'host[:port]'; the port defaults to
            DEFAULT_ETCD_PORT when omitted
        :param container_name_regex: stored on the instance as-is
        """
        log.info('initializing-coordinator')
        self.config = config['coordinator']
        self.worker_config = config['worker']
        self.leader_config = config['leader']
        # Historically this value was read from the top level of the
        # configuration only; that lookup still wins, but the coordinator
        # section (where all sibling settings live) is now honoured too.
        self.membership_watch_relatch_delay = config.get(
            'membership_watch_relatch_delay',
            self.config.get('membership_watch_relatch_delay', 0.1))
        self.tracking_loop_delay = self.config.get(
            'tracking_loop_delay', 1)
        self.session_renewal_timeout = self.config.get(
            'session_renewal_timeout', 5)
        self.session_renewal_loop_delay = self.config.get(
            'session_renewal_loop_delay', 3)
        self.membership_maintenance_loop_delay = self.config.get(
            'membership_maintenance_loop_delay', 5)
        self.session_time_to_live = self.config.get(
            'session_time_to_live', 10)
        self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
        # NOTE(review): the lookups below use the *value* of e.g.
        # config['leader_key'] as the name of another config entry, so the
        # configured value only takes effect if such an entry exists, and a
        # missing '*_key' entry raises KeyError. Presumably
        # self.config.get('leader_key', 'leader') was intended - confirm
        # before changing, since the resulting paths are shared cluster-wide.
        self.leader_prefix = '/'.join((self.prefix, self.config.get(
            self.config['leader_key'], 'leader')))
        self.membership_prefix = '/'.join((self.prefix, self.config.get(
            self.config['membership_key'], 'members'), ''))
        self.assignment_prefix = '/'.join((self.prefix, self.config.get(
            self.config['assignment_key'], 'assignments'), ''))
        self.workload_prefix = '/'.join((self.prefix, self.config.get(
            self.config['workload_key'], 'work'), ''))
        self.core_store_prefix = '/'.join((self.prefix, self.config.get(
            self.config['core_store_key'], 'data/core')))
        self.core_store_assignment_key = self.core_store_prefix + \
            '/assignment'
        self.core_storage_suffix = 'core_store'

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.membership_prefix + self.instance_id

        self.lease = None
        # session_id refers to either a Consul session ID or an Etcd lease
        # object
        self.session_id = None
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader = None
        self.membership_callback = None

        self.worker = Worker(self.instance_id, self)

        # Create etcd client
        endpoint = etcd.split(':')
        kv_host = endpoint[0].strip()
        if len(endpoint) > 1:
            kv_port = endpoint[1].strip()
        else:
            kv_port = self.DEFAULT_ETCD_PORT
        self.etcd_url = u'http://' + kv_host + u':' + kv_port
        self.etcd = Client(reactor, self.etcd_url)

        self.container_name_regex = container_name_regex

        self.wait_for_leader_deferreds = []

        self.peers_mapping_queue = MessageQueue()

    def start(self):
        """Schedule the asynchronous initialization and return self."""
        log.debug('starting')
        reactor.callLater(0, self._async_init)
        log.info('started')
        return self

    @inlineCallbacks
    def stop(self):
        """
        Flag shutdown, revoke the etcd lease, then stop the worker and,
        if one is running, the leader.
        """
        log.debug('stopping')
        self.shutting_down = True
        yield self._delete_session()  # this will delete the leader lock too
        yield self.worker.stop()
        if self.leader is not None:
            yield self.leader.stop()
            self.leader = None
        log.info('stopped')

    def wait_for_a_leader(self):
        """
        Async wait till a leader is detected/elected. The deferred will be
        called with the leader's instance_id
        :return: Deferred.
        """
        d = Deferred()
        if self.leader_id is not None:
            # A leader is already known: fire immediately
            d.callback(self.leader_id)
        else:
            # Fired later from _set_leader_id()
            self.wait_for_leader_deferreds.append(d)
        return d

    # Wait for a core data id to be assigned to this voltha instance
    @inlineCallbacks
    def get_core_store_id_and_prefix(self):
        """
        :return: Deferred firing with (core_store_id, core_store_prefix),
            where the id is obtained from the worker.
        """
        core_store_id = yield self.worker.get_core_store_id()
        returnValue((core_store_id, self.core_store_prefix))

    def recv_peers_map(self):
        """Return the result of get() on the peers-mapping queue."""
        return self.peers_mapping_queue.get()

    def publish_peers_map_change(self, msg):
        """Put msg on the peers-mapping queue."""
        self.peers_mapping_queue.put(msg)

    # Proxy methods for etcd with retry support

    def kv_get(self, *args, **kw):
        """
        GET with retries (see _retry). A Consul-style 'index' keyword is
        accepted for compatibility and discarded.
        """
        # Intercept 'index' argument
        kw.pop('index', None)
        return self._retry('GET', *args, **kw)

    def kv_put(self, *args, **kw):
        """PUT with retries (see _retry)."""
        return self._retry('PUT', *args, **kw)

    def kv_delete(self, *args, **kw):
        """DELETE with retries (see _retry)."""
        return self._retry('DELETE', *args, **kw)

    # Methods exposing key membership information

    @inlineCallbacks
    def get_members(self):
        """Return list of all members"""
        _, members = yield self.kv_get(self.membership_prefix, recurse=True)
        # Strip the common prefix so only the member identifiers remain
        prefix_len = len(self.membership_prefix)
        returnValue([member['Key'][prefix_len:] for member in members])

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        """
        Create the session and the membership record, then start leader
        tracking and the worker, in that order.
        """
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()

    def _backoff(self, msg):
        """
        Log msg as a warning and return an async sleep whose duration is
        taken from RETRY_BACKOFF (the last entry is reused once exhausted).
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.warn(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the retry counter, logging if any retries had occurred."""
        if self.retries:
            log.info('reconnected-to-etcd', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """
        Obtain an etcd lease (retrying until it succeeds) and start the
        session tracking loop.
        """

        @inlineCallbacks
        def _create_session():
            etcd = yield self.get_kv_client()
            # Create etcd lease
            self.lease = yield etcd.lease(self.session_time_to_live)
            self.session_id = self.lease
            log.info('created-etcd-lease', lease=self.session_id)
            self._start_session_tracking()

        yield self._retry(_create_session)

    @inlineCallbacks
    def _delete_session(self):
        """
        Revoke the current lease; failures are logged, never raised.
        Does nothing when no lease has been created yet.
        """
        if self.lease is None:
            # e.g. stop() called before the session was ever established
            return
        try:
            yield self.lease.revoke()
        except Exception as e:
            log.exception('failed-to-delete-session %s' % e,
                          session_id=self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        """Create the membership record, then start maintaining it."""
        yield self._do_create_membership_record_with_retries()
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """
        Periodically validate the membership record and re-create it when
        the validation fails. Runs until shutdown; after an unexpected
        error the loop is re-armed via the reactor.
        """
        try:
            while not self.shutting_down:
                valid_membership = yield self._assert_membership_record_valid()
                if not valid_membership:
                    log.info('recreating-membership-before',
                             session=self.session_id)
                    yield self._do_create_membership_record_with_retries()
                    log.info('recreating-membership-after',
                             session=self.session_id)
                else:
                    log.debug('valid-membership', session=self.session_id)
                # Async sleep before checking the membership record again
                yield asleep(self.membership_maintenance_loop_delay)

        except Exception as e:
            # NOTE(review): the event name looks copy-pasted from the
            # leader tracking loop; kept as-is for log compatibility.
            log.exception('unexpected-error-leader-trackin', e=e)
        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.membership_watch_relatch_delay,
                                  self._maintain_membership_record)

    def _create_membership_record_data(self):
        """Return the dict that is stored as this instance's membership."""
        return {
            'status': 'alive',
            'host_address': self.external_host_address,
        }

    @inlineCallbacks
    def _assert_membership_record_valid(self):
        """
        :return: Deferred firing with True when the membership record could
            be read within 5 seconds and carries a 'Session' entry; False
            on timeout, on a missing/incomplete record or on any error.
        """
        try:
            log.debug('membership-record-before')
            is_timeout, (_, record) = yield \
                self.coordinator_get_with_timeout(
                    key=self.membership_record_key,
                    index=0,
                    timeout=5)
            if is_timeout:
                log.debug('timeout creating membership record in etcd, key: %s' %
                          self.membership_record_key)
                returnValue(False)

            log.debug('membership-record-after', record=record)
            if record is None or \
                    'Session' not in record:
                log.info('membership-record-change-detected',
                         old_session=self.session_id,
                         record=record)
                returnValue(False)
            else:
                returnValue(True)
        except Exception as e:
            log.exception('membership-validation-exception', e=e)
            returnValue(False)

    @inlineCallbacks
    def _do_create_membership_record_with_retries(self):
        """
        PUT the membership record bound to the current lease, backing off
        and trying again for as long as the PUT reports a falsy result.
        """
        while 1:
            log.info('recreating-membership', session=self.session_id)
            result = yield self._retry(
                'PUT',
                self.membership_record_key,
                dumps(self._create_membership_record_data()),
                acquire=self.session_id)
            if result:
                log.info('new-membership-record-created',
                         session=self.session_id)
                break
            else:
                log.warn('cannot-create-membership-record')
                yield self._backoff('stale-membership-record')

    def _start_session_tracking(self):
        """Schedule the session tracking loop on the reactor."""
        reactor.callLater(0, self._session_tracking_loop)

    @inlineCallbacks
    def _session_tracking_loop(self):
        """
        Keep the etcd lease alive: refresh it every
        session_renewal_loop_delay seconds and, when a refresh does not
        complete within session_renewal_timeout, replace the client and
        the lease. Runs until shutdown.
        """

        @inlineCallbacks
        def _redo_session():
            log.info('_redo_session-before')
            yield self._delete_session()

            # Create a new etcd connection/session with a new lease
            try:
                self.etcd = Client(reactor, self.etcd_url)
                self.lease = yield self.etcd.lease(self.session_time_to_live)
                self.session_id = self.lease
                log.info('new-etcd-session', session=self.session_id)

            except Exception as e:
                log.exception('could-not-create-an-etcd-lease', e=e)

        @inlineCallbacks
        def _renew_session(m_callback):
            try:
                time_left = yield self.lease.remaining()
                log.debug('_renew_session', time_left=time_left)
                result = yield self.lease.refresh()
                log.debug('just-renewed-session', result=result)
                if not m_callback.called:
                    # Triggering callback will cancel the timeout timer
                    log.debug('trigger-callback-to-cancel-timeout-timer')
                    m_callback.callback(result)
                else:
                    # Timeout event has already been called. Just ignore
                    # this event
                    log.info('renew-called-after-timeout, etcd ref changed?')
            except Exception as e:
                # Let the invoking method receive a timeout
                log.exception('could-not-renew-session', e=e)

        try:
            while not self.shutting_down:
                log.debug('session-tracking-start')
                rcvd = DeferredWithTimeout(
                    timeout=self.session_renewal_timeout)
                # Deliberately not yielded: completion is observed via rcvd
                _renew_session(rcvd)
                try:
                    yield rcvd
                except TimeOutError as e:
                    log.info('session-renew-timeout', e=e)
                    # Redo the session
                    yield _redo_session()
                except Exception as e:
                    log.exception('session-renew-exception', e=e)
                else:
                    log.debug('successfully-renewed-session')

                # Async sleep before the next session tracking
                yield asleep(self.session_renewal_loop_delay)

        except Exception as e:
            log.exception('renew-exception', e=e)
        finally:
            # Except in shutdown, the loop must continue; re-arming during
            # shutdown would re-create the lease that stop() just revoked.
            if not self.shutting_down:
                reactor.callLater(self.session_renewal_loop_delay,
                                  self._session_tracking_loop)

    def _start_leader_tracking(self):
        """Schedule the leadership tracking loop on the reactor."""
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """
        One round of leader election: try to create the leader key, read
        it back, and update the local leader/non-leader role accordingly.
        Re-armed every tracking_loop_delay seconds until shutdown.
        """
        try:
            # Try to seize leadership via test-and-set operation.
            # Success means the leader key was previously absent
            # and was just re-created by this instance.

            leader_prefix = bytes(self.leader_prefix)
            txn = Transaction(
                compare=[
                    CompVersion(leader_prefix, '==', 0)
                ],
                success=[
                    OpSet(leader_prefix, bytes(self.instance_id),
                          lease=self.lease),
                    OpGet(leader_prefix)
                ],
                failure=[]
            )
            newly_asserted = False
            try:
                yield self.etcd.submit(txn)
            except Failed:
                # Leader key already present
                pass
            else:
                newly_asserted = True
                log.info('leader-key-absent')

            # Confirm that the assertion succeeded by reading back
            # the value of the leader key.
            leader = None
            result = yield self.etcd.get(leader_prefix)
            if result.kvs:
                kv = result.kvs[0]
                leader = kv.value
                log.debug('get-leader-key', leader=leader,
                          instance=self.instance_id)

            if leader is None:
                log.error('get-leader-failed')
            elif leader == self.instance_id:
                if newly_asserted:
                    log.info('leadership-seized')
                    yield self._assert_leadership()
                else:
                    log.debug('already-leader')
            else:
                log.debug('leader-is-another', leader=leader)
                yield self._assert_nonleadership(leader)

        except Exception as e:
            log.warn('unexpected-error-leader-tracking', e=e)

        finally:
            # Except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.tracking_loop_delay,
                                  self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self._set_leader_id(self.instance_id)
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self._set_leader_id(leader_id)

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _set_leader_id(self, leader_id):
        """Record the leader and fire every pending wait_for_a_leader()."""
        self.leader_id = leader_id
        # Swap the list out first so callbacks that register new waiters
        # do not interfere with this iteration
        deferreds, self.wait_for_leader_deferreds = \
            self.wait_for_leader_deferreds, []
        for d in deferreds:
            d.callback(leader_id)

    def _just_gained_leadership(self):
        """Instantiate and start the Leader; returns its start() result."""
        log.info('became-leader')
        self.leader = Leader(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        """Stop the local Leader after leadership moved elsewhere."""
        log.info('lost-leadership')
        return self._halt_leader()

    def _halt_leader(self):
        """
        Stop and drop the Leader instance, if any.
        :return: the result of leader.stop(), or None without a leader.
        """
        if self.leader:
            d = self.leader.stop()
            self.leader = None
            return d

    def get_kv_client(self):
        """Return the current etcd client."""
        return self.etcd

    def _record_from_kv(self, kv):
        """Convert an etcd key/value into the Consul-style record dict."""
        return {
            'Key': kv.key,
            'Value': kv.value,
            'ModifyIndex': kv.mod_revision,
            'Session': self.lease.lease_id if self.lease else '',
        }

    @inlineCallbacks
    def _retry(self, operation, *args, **kw):
        """
        Run an etcd operation, retrying with back-off until it succeeds.

        :param operation: 'GET', 'PUT', 'DELETE', or a callable that is
            invoked as operation(*args, **kw)
        :param args: args[0] is the key (or key prefix); for 'PUT' args[1]
            is the value
        :param kw: passed on to the etcd client after translating the
            Consul-style keywords: 'acquire' becomes 'lease'; 'keys'
            selects a prefix query returning keys only; 'recurse' selects
            a prefix query. 'keys'/'recurse' take effect by being present,
            whatever their value.
        :return: Deferred firing with (index, record) for a single-key GET
            (record is an empty dict when the key is absent), with
            (index, records) or (index, keys) for a prefix GET, or with
            the client/callable result otherwise.
        """
        prefix = False
        keys_only = False
        if 'acquire' in kw:
            kw['lease'] = kw.pop('acquire')
        if 'keys' in kw:
            kw.pop('keys')
            keys_only = True
            prefix = True
            kw['keys_only'] = True
        if 'recurse' in kw:
            kw.pop('recurse')
            prefix = True
        keyset = KeySet(bytes(args[0]), prefix=True) if prefix else None
        log.debug('start-op', operation=operation, args=args, kw=kw)

        while 1:
            try:
                etcd = yield self.get_kv_client()
                if operation == 'GET':
                    key = bytes(args[0])
                    # If multiple keys requested, return a list
                    # else return a single record
                    if not prefix:
                        index = 0
                        record = dict()
                        res = yield etcd.get(key, **kw)
                        if res.kvs and len(res.kvs) == 1:
                            kv = res.kvs[0]
                            index = kv.mod_revision
                            record = self._record_from_kv(kv)
                        result = (index, record)
                    else:
                        # Get values for all keys that match the prefix
                        # If keys_only requested, get only the keys
                        index = 0
                        records = []
                        keys = []
                        res = yield etcd.get(keyset, **kw)
                        if args[0] == 'service/voltha/assignments/':
                            log.info('assignments', result=res)
                        for kv in (res.kvs or []):
                            # Which index should be returned? The max over
                            # all keys?
                            if kv.mod_revision > index:
                                index = kv.mod_revision
                            if keys_only:
                                keys.append(kv.key)
                            else:
                                records.append(self._record_from_kv(kv))
                        result = (index, keys) if keys_only \
                            else (index, records)
                elif operation == 'PUT':
                    key = bytes(args[0])
                    result = yield etcd.set(key, args[1], **kw)
                elif operation == 'DELETE':
                    key = bytes(args[0])
                    # Delete the whole prefix only when one was requested;
                    # otherwise delete just the single key
                    result = yield etcd.delete(keyset if prefix else key)
                else:
                    # Default case - consider operation as a function call
                    result = yield operation(*args, **kw)
                self._clear_backoff()
                break
            except DNSLookupError as e:
                log.warn('dns-lookup-failed', operation=operation, args=args,
                         reason=e)
                yield self._backoff('dns-lookup-failed')
            except Exception as e:
                if not self.shutting_down:
                    log.exception(e)
                yield self._backoff('etcd-unknown-error: %s' % e)

        log.debug('end-op', operation=operation, args=args, kw=kw)
        returnValue(result)

    @inlineCallbacks
    def coordinator_get_with_timeout(self, key, timeout, **kw):
        """
        Query etcd with a timeout
        :param key: Key to query
        :param timeout: timeout value
        :param kw: additional key-value params; an optional 'index' makes
            the result count only if its modification index is greater
            than that value. Without 'index' the first result is returned.
        :return: (is_timeout, (index, result)).

        The Consul version of this method performed a 'wait-type' get operation
        that returned a result when the key's value had a ModifyIndex greater
        than the 'index' argument. Not sure etcd supports this functionality.
        """

        # Intercept 'index' argument
        mod_revision = kw.pop('index', None)
        if mod_revision is not None:
            log.debug('coordinator-get-with-timeout-etcd',
                      index=mod_revision)

        @inlineCallbacks
        def _get(key, m_callback):
            try:
                (index, result) = yield self._retry('GET', key, **kw)
                is_newer = mod_revision is None or index > mod_revision
                if is_newer and not m_callback.called:
                    log.debug('got-result-cancelling-timer')
                    m_callback.callback((index, result))
            except Exception as e:
                log.exception('got-exception', e=e)

        try:
            rcvd = DeferredWithTimeout(timeout=timeout)
            # Deliberately not yielded: completion is observed via rcvd
            _get(key, rcvd)
            try:
                result = yield rcvd
                log.debug('result-received', result=result)
                returnValue((False, result))
            except TimeOutError as e:
                log.debug('timeout-or-no-data-change', etcd_key=key)
            except Exception as e:
                log.exception('exception', e=e)
        except Exception as e:
            log.exception('exception', e=e)

        returnValue((True, (None, None)))