#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

17""" Etcd-based coordinator services """
18
19from consul import ConsulException
20from consul.twisted import Consul
21from requests import ConnectionError
22from structlog import get_logger
23from twisted.internet import reactor
24from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
25from txaioetcd import Client, KeySet, Transaction, CompVersion, OpGet, OpSet, Failed
26from zope.interface import implementer
27
28from leader import Leader
29from common.utils.asleep import asleep
30from common.utils.message_queue import MessageQueue
31from voltha.registry import IComponent
32from worker import Worker
33from simplejson import dumps, loads
34from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
35
36log = get_logger()
37
38
class StaleMembershipEntryException(Exception):
    pass


@implementer(IComponent)
class CoordinatorEtcd(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    communication with etcd and, via etcd, of all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in etcd
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordinator; it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 config,
                 consul='localhost:8500',
                 etcd='localhost:2379'):

        log.info('initializing-coordinator')
        self.config = config['coordinator']
        self.worker_config = config['worker']
        self.leader_config = config['leader']
        self.membership_watch_relatch_delay = config.get(
            'membership_watch_relatch_delay', 0.1)
        self.tracking_loop_delay = self.config.get(
            'tracking_loop_delay', 1)
        self.session_renewal_timeout = self.config.get(
            'session_renewal_timeout', 5)
        self.session_renewal_loop_delay = self.config.get(
            'session_renewal_loop_delay', 3)
        self.membership_maintenance_loop_delay = self.config.get(
            'membership_maintenance_loop_delay', 5)
        self.session_time_to_live = self.config.get(
            'session_time_to_live', 10)
        self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
        self.leader_prefix = '/'.join((self.prefix, self.config.get(
            self.config['leader_key'], 'leader')))
        self.membership_prefix = '/'.join((self.prefix, self.config.get(
            self.config['membership_key'], 'members'), ''))
        self.assignment_prefix = '/'.join((self.prefix, self.config.get(
            self.config['assignment_key'], 'assignments'), ''))
        self.workload_prefix = '/'.join((self.prefix, self.config.get(
            self.config['workload_key'], 'work'), ''))
        self.core_store_prefix = '/'.join((self.prefix, self.config.get(
            self.config['core_store_key'], 'data/core')))
        self.core_store_assignment_key = self.core_store_prefix + \
                                         '/assignment'
        self.core_storage_suffix = 'core_store'
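
        # With the default key names above, the layout under etcd is roughly
        # (illustrative; actual names come from the loaded config):
        #   service/voltha/leader                  - leader election key
        #   service/voltha/members/<instance_id>   - ephemeral membership records
        #   service/voltha/assignments/...         - work assignments
        #   service/voltha/data/core/...           - core data store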

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.membership_prefix + self.instance_id

        self.lease = None
        # session_id refers to either a Consul session ID or an Etcd lease object
        self.session_id = None
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader = None
        self.membership_callback = None

        self.worker = Worker(self.instance_id, self)

        self.host = consul.split(':')[0].strip()
        self.port = int(consul.split(':')[1].strip())

        # TODO need to handle reconnect events properly
        self.consul = Consul(host=self.host, port=self.port)

        # Create etcd client
        kv_host = etcd.split(':')[0].strip()
        kv_port = etcd.split(':')[1].strip()
        self.etcd_url = u'http://' + kv_host + u':' + kv_port
        self.etcd = Client(reactor, self.etcd_url)

        self.wait_for_leader_deferreds = []

        self.peers_mapping_queue = MessageQueue()

    def start(self):
        log.debug('starting')
        reactor.callLater(0, self._async_init)
        log.info('started')
        return self

    @inlineCallbacks
    def stop(self):
        log.debug('stopping')
        self.shutting_down = True
        yield self._delete_session()  # this will delete the leader lock too
        yield self.worker.stop()
        if self.leader is not None:
            yield self.leader.stop()
            self.leader = None
        log.info('stopped')

    def wait_for_a_leader(self):
        """
        Async wait till a leader is detected/elected. The deferred will be
        called with the leader's instance_id
        :return: Deferred.
        """
        d = Deferred()
        if self.leader_id is not None:
            d.callback(self.leader_id)
            return d
        else:
            self.wait_for_leader_deferreds.append(d)
            return d
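
    # Typical use (sketch only; the callback body is illustrative):
    #
    #   d = coordinator.wait_for_a_leader()
    #   d.addCallback(lambda leader_id: log.info('leader-is', leader=leader_id))
    #
    # If a leader is already known the deferred fires immediately; otherwise
    # it fires from _set_leader_id() once an election completes.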

    # Wait for a core data id to be assigned to this voltha instance
    @inlineCallbacks
    def get_core_store_id_and_prefix(self):
        core_store_id = yield self.worker.get_core_store_id()
        returnValue((core_store_id, self.core_store_prefix))

    def recv_peers_map(self):
        return self.peers_mapping_queue.get()

    def publish_peers_map_change(self, msg):
        self.peers_mapping_queue.put(msg)

    # Proxy methods for etcd with retry support

    def kv_get(self, *args, **kw):
        # Drop the Consul-style 'index' argument if present; plain etcd GETs
        # are issued here rather than blocking index queries.
        kw.pop('index', None)
        return self._retry('GET', *args, **kw)

    def kv_put(self, *args, **kw):
        return self._retry('PUT', *args, **kw)

    def kv_delete(self, *args, **kw):
        return self._retry('DELETE', *args, **kw)
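
    # These proxies keep the Consul-era calling convention used by the rest
    # of the code base. Sketch of typical calls (key names illustrative):
    #
    #   (index, record) = yield self.kv_get('service/voltha/leader')
    #   (index, records) = yield self.kv_get(self.membership_prefix, recurse=True)
    #   yield self.kv_put(key, dumps(data), acquire=self.session_id)
    #
    # _retry() translates 'recurse' into an etcd prefix (KeySet) query and
    # 'acquire' into the etcd lease attached to the write.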

    # Methods exposing key membership information

    @inlineCallbacks
    def get_members(self):
        """Return list of all members"""
        _, members = yield self.kv_get(self.membership_prefix, recurse=True)
        returnValue([member['Key'][len(self.membership_prefix):]
                     for member in members])

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()

    def _backoff(self, msg):
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        if self.retries:
            log.info('reconnected-to-etcd', after_retries=self.retries)
            self.retries = 0
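
    # With RETRY_BACKOFF as defined above, successive failures wait
    # 0.05, 0.1, 0.2, 0.5, 1, 2 and then 5 seconds between attempts;
    # further retries stay at the 5-second cap until an operation succeeds
    # and _clear_backoff() resets the counter.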

    @inlineCallbacks
    def _create_session(self):

        @inlineCallbacks
        def _create_session():
            etcd = yield self.get_kv_client()
            # Create etcd lease
            self.lease = yield etcd.lease(self.session_time_to_live)
            self.session_id = self.lease
            log.info('created-etcd-lease', lease=self.session_id)
            self._start_session_tracking()

        yield self._retry(_create_session)

    @inlineCallbacks
    def _delete_session(self):
        try:
            yield self.lease.revoke()
        except Exception as e:
            log.exception('failed-to-delete-session',
                          session_id=self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        yield self._do_create_membership_record_with_retries()
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _maintain_membership_record(self):
        try:
            while 1:
                valid_membership = yield self._assert_membership_record_valid()
                if not valid_membership:
                    log.info('recreating-membership-before',
                             session=self.session_id)
                    yield self._do_create_membership_record_with_retries()
                    log.info('recreating-membership-after',
                             session=self.session_id)
                else:
                    log.debug('valid-membership', session=self.session_id)
                # Async sleep before checking the membership record again
                yield asleep(self.membership_maintenance_loop_delay)

        except Exception as e:
            log.exception('unexpected-error-membership-tracking', e=e)
        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.membership_watch_relatch_delay,
                                  self._maintain_membership_record)

    def _create_membership_record_data(self):
        member_record = dict()
        member_record['status'] = 'alive'
        member_record['host_address'] = self.external_host_address
        return member_record

    @inlineCallbacks
    def _assert_membership_record_valid(self):
        try:
            log.info('membership-record-before')
            is_timeout, (_, record) = yield \
                self.consul_get_with_timeout(
                    key=self.membership_record_key,
                    index=0,
                    timeout=5)
            if is_timeout:
                returnValue(False)

            log.info('membership-record-after', record=record)
            if record is None or \
                    'Session' not in record:
                log.info('membership-record-change-detected',
                         old_session=self.session_id,
                         record=record)
                returnValue(False)
            else:
                returnValue(True)
        except Exception as e:
            log.exception('membership-validation-exception', e=e)
            returnValue(False)

    @inlineCallbacks
    def _do_create_membership_record_with_retries(self):
        while 1:
            log.info('recreating-membership', session=self.session_id)
            result = yield self._retry(
                'PUT',
                self.membership_record_key,
                dumps(self._create_membership_record_data()),
                acquire=self.session_id)
            if result:
                log.info('new-membership-record-created',
                         session=self.session_id)
                break
            else:
                log.warn('cannot-create-membership-record')
                yield self._backoff('stale-membership-record')

    def _start_session_tracking(self):
        reactor.callLater(0, self._session_tracking_loop)
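
    # With the defaults set in __init__ (session_time_to_live=10,
    # session_renewal_loop_delay=3, session_renewal_timeout=5) the lease is
    # refreshed roughly three times per TTL window; a renewal that does not
    # complete within 5 seconds is treated as a lost session and a new
    # client/lease is created by _redo_session() below.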

    @inlineCallbacks
    def _session_tracking_loop(self):

        @inlineCallbacks
        def _redo_session():
            log.info('_redo_session-before')
            yield self._delete_session()

            # Create a new etcd connection/session with a new lease
            try:
                self.etcd = Client(reactor, self.etcd_url)
                self.lease = yield self.etcd.lease(self.session_time_to_live)
                self.session_id = self.lease
                log.info('new-etcd-session', session=self.session_id)

            except Exception as e:
                log.exception('could-not-create-an-etcd-lease', e=e)

        @inlineCallbacks
        def _renew_session(m_callback):
            try:
                time_left = yield self.lease.remaining()
                log.info('_renew_session', time_left=time_left)
                result = yield self.lease.refresh()
                log.info('just-renewed-session', result=result)
                if not m_callback.called:
                    # Triggering callback will cancel the timeout timer
                    log.info('trigger-callback-to-cancel-timeout-timer')
                    m_callback.callback(result)
                else:
                    # Timeout event has already been called. Just ignore
                    # this event
                    log.info('renew-called-after-timeout, etcd ref changed?')
            except Exception as e:
                # Let the invoking method receive a timeout
                log.exception('could-not-renew-session', e=e)

        try:
            while 1:
                log.debug('session-tracking-start')
                rcvd = DeferredWithTimeout(
                    timeout=self.session_renewal_timeout)
                _renew_session(rcvd)
                try:
                    _ = yield rcvd
                except TimeOutError as e:
                    log.info('session-renew-timeout', e=e)
                    # Redo the session
                    yield _redo_session()
                except Exception as e:
                    log.exception('session-renew-exception', e=e)
                else:
                    log.debug('successfully-renewed-session')

                # Async sleep before the next session tracking
                yield asleep(self.session_renewal_loop_delay)

        except Exception as e:
            log.exception('renew-exception', e=e)
        finally:
            reactor.callLater(self.session_renewal_loop_delay,
                              self._session_tracking_loop)

    def _start_leader_tracking(self):
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        log.info('leadership-attempt-before')

        # Try to acquire leadership lease via test-and-set operation.
        # Success means the leader key was previously absent and was
        # just re-created by this instance.
        leader_prefix = bytes(self.leader_prefix)
        txn = Transaction(
            compare=[
                CompVersion(leader_prefix, '==', 0)
            ],
            success=[
                OpSet(leader_prefix, bytes(self.instance_id), lease=self.lease),
                OpGet(leader_prefix)
            ],
            failure=[]
        )
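        # In etcd a key that does not exist has version 0, so the compare
        # above succeeds only for the instance that finds the leader key
        # absent; that instance atomically writes its instance_id under the
        # key, bound to its lease, so the key disappears if the lease ever
        # expires. Every other instance lands in the Failed branch below
        # and remains a follower.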
        newly_asserted = False
        try:
            result = yield self.etcd.submit(txn)
        except Failed as failed:
            log.info('Leader key PRESENT')
            for response in failed.responses:
                log.info('Leader key already present', response=response)
        else:
            newly_asserted = True
            log.info('Leader key ABSENT')
            for response in result.responses:
                log.info('Leader key was absent', response=response)

        log.info('leadership-attempt-after')

        # Confirm that the assertion succeeded by reading back the value
        # of the leader key.
        leader = None
        result = yield self.etcd.get(leader_prefix)
        if result.kvs:
            kv = result.kvs[0]
            leader = kv.value
            log.info('Leader readback', leader=leader, instance=self.instance_id)

        if leader is None:
            log.info('Failed to read leader key')
        elif leader == self.instance_id:
            if newly_asserted:
                log.info("I JUST BECAME LEADER!")
                yield self._assert_leadership()
            else:
                log.info("I'm an aging LEADER")
        else:
            log.info('The LEADER is another', leader=leader)
            yield self._assert_nonleadership(leader)

        # May have to add code here to handle case where, for some reason, the
        # lease had been blown away and the txn failed for that reason

        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.tracking_loop_delay,
                              self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self._set_leader_id(self.instance_id)
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self._set_leader_id(leader_id)

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _set_leader_id(self, leader_id):
        self.leader_id = leader_id
        deferreds, self.wait_for_leader_deferreds = \
            self.wait_for_leader_deferreds, []
        for d in deferreds:
            d.callback(leader_id)

    def _just_gained_leadership(self):
        log.info('became-leader')
        self.leader = Leader(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        log.info('lost-leadership')
        return self._halt_leader()

    def _halt_leader(self):
        if self.leader:
            d = self.leader.stop()
            self.leader = None
            return d

    def get_kv_client(self):
        return self.etcd

    @inlineCallbacks
    def _retry(self, operation, *args, **kw):
        # Translate Consul-style keyword arguments into their etcd equivalents
        prefix = False
        keyset = None
        if 'acquire' in kw:
            kw['lease'] = kw.pop('acquire')
        if 'keys' in kw:
            kw.pop('keys')
            kw['keys_only'] = True
        if 'recurse' in kw:
            kw.pop('recurse')
            prefix = True
            keyset = KeySet(bytes(args[0]), prefix=True)
        log.info('start-op', operation=operation, args=args, kw=kw)

        while 1:
            try:
                etcd = yield self.get_kv_client()
                if operation == 'GET':
                    key = bytes(args[0])
                    # If multiple keys requested, return a list
                    # else return a single record
                    if not prefix:
                        index = 0
                        record = dict()
                        res = yield etcd.get(key, **kw)
                        if res.kvs:
                            if len(res.kvs) == 1:
                                kv = res.kvs[0]
                                index = kv.mod_revision
                                record['Key'] = kv.key
                                record['Value'] = kv.value
                                record['ModifyIndex'] = index
                                record['Session'] = self.lease.lease_id if self.lease else ''
                        result = (index, record)
                    else:
                        # Get values for all keys that match the prefix
                        index = 0
                        records = []
                        res = yield etcd.get(keyset, **kw)
                        if args[0] == 'service/voltha/assignments/':
                            log.info('assignments', result=res)
                        if res.kvs and len(res.kvs) > 0:
                            for kv in res.kvs:
                                # Which index should be returned? The max over all keys?
                                if kv.mod_revision > index:
                                    index = kv.mod_revision
                                rec = dict()
                                rec['Key'] = kv.key
                                rec['Value'] = kv.value
                                rec['ModifyIndex'] = kv.mod_revision
                                rec['Session'] = self.lease.lease_id if self.lease else ''
                                records.append(rec)
                        result = (index, records)
                elif operation == 'PUT':
                    key = bytes(args[0])
                    result = yield etcd.set(key, args[1], **kw)
                elif operation == 'DELETE':
                    key = bytes(args[0])
                    result = yield etcd.delete(key, **kw)
                else:
                    # Default case - consider operation as a function call
                    result = yield operation(*args, **kw)
                self._clear_backoff()
                break
            except Exception as e:
                if not self.shutting_down:
                    log.exception(e)
                yield self._backoff('unknown-error')

        log.info('end-op', operation=operation, args=args, kw=kw, result=result)
        returnValue(result)

    @inlineCallbacks
    def consul_get_with_timeout(self, key, timeout, **kw):
        """
        Query etcd with a timeout
        :param key: Key to query
        :param timeout: timeout value
        :param kw: additional key-value params
        :return: (is_timeout, (index, result)).

        The Consul version of this method performed a 'wait-type' get
        operation that only returned once the key's value had a ModifyIndex
        greater than the 'index' argument. etcd has no direct equivalent of
        such a blocking read; an etcd watch would be needed to replicate it.
        """

        # Intercept the Consul-style 'index' argument; default to 0 so the
        # nested _get() below always has a revision to compare against.
        mod_revision = kw.pop('index', 0)
        log.info('consul_get_with_timeout', index=mod_revision)

        @inlineCallbacks
        def _get(key, m_callback):
            try:
                (index, result) = yield self._retry('GET', key, **kw)
                if index > mod_revision and not m_callback.called:
                    log.debug('got-result-cancelling-timer')
                    m_callback.callback((index, result))
            except Exception as e:
                log.exception('got-exception', e=e)

        try:
            rcvd = DeferredWithTimeout(timeout=timeout)
            _get(key, rcvd)
            try:
                result = yield rcvd
                log.debug('result-received', result=result)
                returnValue((False, result))
            except TimeOutError as e:
                log.debug('timeout-or-no-data-change', key=key)
            except Exception as e:
                log.exception('exception', e=e)
        except Exception as e:
            log.exception('exception', e=e)

        returnValue((True, (None, None)))