blob: c736104689cde4cf8fb37e534eb86d351e2905f9 [file] [log] [blame]
Richard Jankowski8b277c22017-12-19 09:49:27 -05001#
2# Copyright 2017 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17""" Etcd-based coordinator services """
18
19from consul import ConsulException
20from consul.twisted import Consul
21from requests import ConnectionError
22from structlog import get_logger
23from twisted.internet import reactor
24from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
25from txaioetcd import Client, KeySet, Transaction, CompVersion, OpGet, OpSet, Failed
26from zope.interface import implementer
27
28from leader import Leader
29from common.utils.asleep import asleep
30from common.utils.message_queue import MessageQueue
31from voltha.registry import IComponent
32from worker import Worker
33from simplejson import dumps, loads
34from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
35
36log = get_logger()
37
38
class StaleMembershipEntryException(Exception):
    """Exception type signalling a stale cluster-membership entry."""
    pass
41
42
43@implementer(IComponent)
44class CoordinatorEtcd(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interaction with the KV store (etcd), and via it, all coordination
    activities with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in the KV store
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """
56
    # Seconds between attempts to (re)connect to the KV store.
    CONNECT_RETRY_INTERVAL_SEC = 1
    # Backoff schedule (seconds) used by _backoff(); the wait is capped at
    # the last entry once the retry count exceeds the list length.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
59
60 # Public methods:
61
62 def __init__(self,
63 internal_host_address,
64 external_host_address,
65 instance_id,
66 rest_port,
67 config,
68 consul='localhost:8500',
Richard Jankowski4ea26632018-05-14 17:45:38 -040069 etcd='localhost:2379',
70 container_name_regex='^.*\.([0-9]+)\..*$'):
Richard Jankowski8b277c22017-12-19 09:49:27 -050071
72 log.info('initializing-coordinator')
73 self.config = config['coordinator']
74 self.worker_config = config['worker']
75 self.leader_config = config['leader']
76 self.membership_watch_relatch_delay = config.get(
77 'membership_watch_relatch_delay', 0.1)
78 self.tracking_loop_delay = self.config.get(
79 'tracking_loop_delay', 1)
80 self.session_renewal_timeout = self.config.get(
81 'session_renewal_timeout', 5)
82 self.session_renewal_loop_delay = self.config.get(
83 'session_renewal_loop_delay', 3)
84 self.membership_maintenance_loop_delay = self.config.get(
85 'membership_maintenance_loop_delay', 5)
86 self.session_time_to_live = self.config.get(
87 'session_time_to_live', 10)
88 self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
89 self.leader_prefix = '/'.join((self.prefix, self.config.get(
90 self.config['leader_key'], 'leader')))
91 self.membership_prefix = '/'.join((self.prefix, self.config.get(
92 self.config['membership_key'], 'members'), ''))
93 self.assignment_prefix = '/'.join((self.prefix, self.config.get(
94 self.config['assignment_key'], 'assignments'), ''))
95 self.workload_prefix = '/'.join((self.prefix, self.config.get(
96 self.config['workload_key'], 'work'), ''))
97 self.core_store_prefix = '/'.join((self.prefix, self.config.get(
98 self.config['core_store_key'], 'data/core')))
99 self.core_store_assignment_key = self.core_store_prefix + \
100 '/assignment'
101 self.core_storage_suffix = 'core_store'
102
103 self.retries = 0
104 self.instance_id = instance_id
105 self.internal_host_address = internal_host_address
106 self.external_host_address = external_host_address
107 self.rest_port = rest_port
108 self.membership_record_key = self.membership_prefix + self.instance_id
109
110 self.lease = None
111 # session_id refers to either a Consul session ID or an Etcd lease object
112 self.session_id = None
113 self.i_am_leader = False
114 self.leader_id = None # will be the instance id of the current leader
115 self.shutting_down = False
116 self.leader = None
117 self.membership_callback = None
118
119 self.worker = Worker(self.instance_id, self)
120
121 self.host = consul.split(':')[0].strip()
122 self.port = int(consul.split(':')[1].strip())
123
124 # TODO need to handle reconnect events properly
125 self.consul = Consul(host=self.host, port=self.port)
126
127 # Create etcd client
128 kv_host = etcd.split(':')[0].strip()
129 kv_port = etcd.split(':')[1].strip()
130 self.etcd_url = u'http://' + kv_host + u':' + kv_port
131 self.etcd = Client(reactor, self.etcd_url)
132
Richard Jankowski4ea26632018-05-14 17:45:38 -0400133 self.container_name_regex = container_name_regex
134
Richard Jankowski8b277c22017-12-19 09:49:27 -0500135 self.wait_for_leader_deferreds = []
136
137 self.peers_mapping_queue = MessageQueue()
138
    def start(self):
        """
        Start the coordinator: schedule the async bootstrap on the reactor
        and return immediately (non-blocking).
        :return: self, for chaining
        """
        log.debug('starting')
        reactor.callLater(0, self._async_init)
        log.info('started')
        return self
144
145 @inlineCallbacks
146 def stop(self):
147 log.debug('stopping')
148 self.shutting_down = True
149 yield self._delete_session() # this will delete the leader lock too
150 yield self.worker.stop()
151 if self.leader is not None:
152 yield self.leader.stop()
153 self.leader = None
154 log.info('stopped')
155
156 def wait_for_a_leader(self):
157 """
158 Async wait till a leader is detected/elected. The deferred will be
159 called with the leader's instance_id
160 :return: Deferred.
161 """
162 d = Deferred()
163 if self.leader_id is not None:
164 d.callback(self.leader_id)
165 return d
166 else:
167 self.wait_for_leader_deferreds.append(d)
168 return d
169
170 # Wait for a core data id to be assigned to this voltha instance
171 @inlineCallbacks
172 def get_core_store_id_and_prefix(self):
173 core_store_id = yield self.worker.get_core_store_id()
174 returnValue((core_store_id, self.core_store_prefix))
175
    def recv_peers_map(self):
        """Dequeue the next peers-mapping message (see MessageQueue.get)."""
        return self.peers_mapping_queue.get()
178
    def publish_peers_map_change(self, msg):
        """Enqueue a peers-mapping change for consumers of recv_peers_map()."""
        self.peers_mapping_queue.put(msg)
181
182 # Proxy methods for etcd with retry support
183
184 def kv_get(self, *args, **kw):
185 # Intercept 'index' argument
186 for name, value in kw.items():
187 if name == 'index':
188 kw.pop('index')
189 break
190 return self._retry('GET', *args, **kw)
191
    def kv_put(self, *args, **kw):
        """PUT a key/value through the retry wrapper."""
        return self._retry('PUT', *args, **kw)
194
    def kv_delete(self, *args, **kw):
        """DELETE through the retry wrapper; callers pass recurse=True for
        prefix deletes."""
        return self._retry('DELETE', *args, **kw)
197
198 # Methods exposing key membership information
199
200 @inlineCallbacks
201 def get_members(self):
202 """Return list of all members"""
203 _, members = yield self.kv_get(self.membership_prefix, recurse=True)
204 returnValue([member['Key'][len(self.membership_prefix):]
205 for member in members])
206
207 # Private (internal) methods:
208
    @inlineCallbacks
    def _async_init(self):
        """Bootstrap sequence (order matters):
        session -> membership record -> leader tracking -> worker."""
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()
215
216 def _backoff(self, msg):
217 wait_time = self.RETRY_BACKOFF[min(self.retries,
218 len(self.RETRY_BACKOFF) - 1)]
219 self.retries += 1
220 log.error(msg, retry_in=wait_time)
221 return asleep(wait_time)
222
223 def _clear_backoff(self):
224 if self.retries:
225 log.info('reconnected-to-consul', after_retries=self.retries)
226 self.retries = 0
227
228 @inlineCallbacks
229 def _create_session(self):
230
231 @inlineCallbacks
232 def _create_session():
233 etcd = yield self.get_kv_client()
234 # Create etcd lease
235 self.lease = yield etcd.lease(self.session_time_to_live)
236 self.session_id = self.lease
237 log.info('created-etcd-lease', lease=self.session_id)
238 self._start_session_tracking()
239
240 yield self._retry(_create_session)
241
242 @inlineCallbacks
243 def _delete_session(self):
244 try:
245 yield self.lease.revoke()
246 except Exception as e:
247 log.exception('failed-to-delete-session',
248 session_id=self.session_id)
249
    @inlineCallbacks
    def _create_membership_record(self):
        """Create our membership record, then start the maintenance loop."""
        yield self._do_create_membership_record_with_retries()
        reactor.callLater(0, self._maintain_membership_record)
254
255 @inlineCallbacks
256 def _maintain_membership_record(self):
257 try:
258 while 1:
259 valid_membership = yield self._assert_membership_record_valid()
260 if not valid_membership:
261 log.info('recreating-membership-before',
262 session=self.session_id)
263 yield self._do_create_membership_record_with_retries()
264 log.info('recreating-membership-after',
265 session=self.session_id)
266 else:
267 log.debug('valid-membership', session=self.session_id)
268 # Async sleep before checking the membership record again
269 yield asleep(self.membership_maintenance_loop_delay)
270
271 except Exception, e:
272 log.exception('unexpected-error-leader-trackin', e=e)
273 finally:
274 # except in shutdown, the loop must continue (after a short delay)
275 if not self.shutting_down:
276 reactor.callLater(self.membership_watch_relatch_delay,
277 self._maintain_membership_record)
278
279 def _create_membership_record_data(self):
280 member_record = dict()
281 member_record['status'] = 'alive'
282 member_record['host_address'] = self.external_host_address
283 return member_record
284
285 @inlineCallbacks
286 def _assert_membership_record_valid(self):
287 try:
288 log.info('membership-record-before')
289 is_timeout, (_, record) = yield \
290 self.consul_get_with_timeout(
291 key=self.membership_record_key,
292 index=0,
293 timeout=5)
294 if is_timeout:
295 returnValue(False)
296
297 log.info('membership-record-after', record=record)
298 if record is None or \
299 'Session' not in record:
300 log.info('membership-record-change-detected',
301 old_session=self.session_id,
302 record=record)
303 returnValue(False)
304 else:
305 returnValue(True)
306 except Exception as e:
307 log.exception('membership-validation-exception', e=e)
308 returnValue(False)
309
310 @inlineCallbacks
311 def _do_create_membership_record_with_retries(self):
312 while 1:
313 log.info('recreating-membership', session=self.session_id)
314 result = yield self._retry(
315 'PUT',
316 self.membership_record_key,
317 dumps(self._create_membership_record_data()),
318 acquire=self.session_id)
319 if result:
320 log.info('new-membership-record-created',
321 session=self.session_id)
322 break
323 else:
324 log.warn('cannot-create-membership-record')
325 yield self._backoff('stale-membership-record')
326
    def _start_session_tracking(self):
        """Kick off the lease-renewal loop on the reactor."""
        reactor.callLater(0, self._session_tracking_loop)
329
330 @inlineCallbacks
331 def _session_tracking_loop(self):
332
333 @inlineCallbacks
334 def _redo_session():
335 log.info('_redo_session-before')
336 yield self._delete_session()
337
338 # Create a new etcd connection/session with a new lease
339 try:
340 self.etcd = Client(reactor, self.etcd_url)
341 self.lease = yield self.etcd.lease(self.session_time_to_live)
342 self.session_id = self.lease
343 log.info('new-etcd-session', session=self.session_id)
344
345 except Exception as e:
346 log.exception('could-not-create-an-etcd-lease', e=e)
347
348 @inlineCallbacks
349 def _renew_session(m_callback):
350 try:
351 time_left = yield self.lease.remaining()
352 log.info('_renew_session', time_left=time_left)
353 result = yield self.lease.refresh()
354 log.info('just-renewed-session', result=result)
355 if not m_callback.called:
356 # Triggering callback will cancel the timeout timer
357 log.info('trigger-callback-to-cancel-timeout-timer')
358 m_callback.callback(result)
359 else:
360 # Timeout event has already been called. Just ignore
361 # this event
362 log.info('renew-called-after-timeout, etcd ref changed?')
363 except Exception, e:
364 # Let the invoking method receive a timeout
365 log.exception('could-not-renew-session', e=e)
366
367 try:
368 while 1:
369 log.debug('session-tracking-start')
370 rcvd = DeferredWithTimeout(
371 timeout=self.session_renewal_timeout)
372 _renew_session(rcvd)
373 try:
374 _ = yield rcvd
375 except TimeOutError as e:
376 log.info('session-renew-timeout', e=e)
377 # Redo the session
378 yield _redo_session()
379 except Exception as e:
380 log.exception('session-renew-exception', e=e)
381 else:
382 log.debug('successfully-renewed-session')
383
384 # Async sleep before the next session tracking
385 yield asleep(self.session_renewal_loop_delay)
386
387 except Exception as e:
388 log.exception('renew-exception', e=e)
389 finally:
390 reactor.callLater(self.session_renewal_loop_delay,
391 self._session_tracking_loop)
392
    def _start_leader_tracking(self):
        """Kick off the leadership-tracking loop on the reactor."""
        reactor.callLater(0, self._leadership_tracking_loop)
395
396 @inlineCallbacks
397 def _leadership_tracking_loop(self):
Richard Jankowski8b277c22017-12-19 09:49:27 -0500398 try:
Richard Jankowski57ebfdc2018-07-12 09:59:10 -0400399 # Try to seize leadership via test-and-set operation.
400 # Success means the leader key was previously absent
401 # and was just re-created by this instance.
Richard Jankowski8b277c22017-12-19 09:49:27 -0500402
Richard Jankowski57ebfdc2018-07-12 09:59:10 -0400403 leader_prefix = bytes(self.leader_prefix)
404 txn = Transaction(
405 compare=[
406 CompVersion(leader_prefix, '==', 0)
407 ],
408 success=[
409 OpSet(leader_prefix, bytes(self.instance_id), lease=self.lease),
410 OpGet(leader_prefix)
411 ],
412 failure=[]
413 )
414 newly_asserted = False
415 try:
416 result = yield self.etcd.submit(txn)
417 except Failed as failed:
418 # Leader key already present
419 pass
Richard Jankowski8b277c22017-12-19 09:49:27 -0500420 else:
Richard Jankowski57ebfdc2018-07-12 09:59:10 -0400421 newly_asserted = True
422 log.info('leader-key-absent')
Richard Jankowski8b277c22017-12-19 09:49:27 -0500423
Richard Jankowski57ebfdc2018-07-12 09:59:10 -0400424 # Confirm that the assertion succeeded by reading back
425 # the value of the leader key.
426 leader = None
427 result = yield self.etcd.get(leader_prefix)
428 if result.kvs:
429 kv = result.kvs[0]
430 leader = kv.value
431 log.info('get-leader-key', leader=leader, instance=self.instance_id)
Richard Jankowski8b277c22017-12-19 09:49:27 -0500432
Richard Jankowski57ebfdc2018-07-12 09:59:10 -0400433 if leader is None:
434 log.error('get-leader-failed')
435 elif leader == self.instance_id:
436 if newly_asserted:
437 log.info('leadership-seized')
438 yield self._assert_leadership()
439 else:
440 log.info('already-leader')
441 else:
442 log.info('leader-is-another', leader=leader)
443 yield self._assert_nonleadership(leader)
444
445 except Exception, e:
446 log.exception('unexpected-error-leader-tracking', e=e)
447
448 finally:
449 # Except in shutdown, the loop must continue (after a short delay)
450 if not self.shutting_down:
451 reactor.callLater(self.tracking_loop_delay,
452 self._leadership_tracking_loop)
Richard Jankowski8b277c22017-12-19 09:49:27 -0500453
454 @inlineCallbacks
455 def _assert_leadership(self):
456 """(Re-)assert leadership"""
457 if not self.i_am_leader:
458 self.i_am_leader = True
459 self._set_leader_id(self.instance_id)
460 yield self._just_gained_leadership()
461
462 @inlineCallbacks
463 def _assert_nonleadership(self, leader_id):
464 """(Re-)assert non-leader role"""
465
466 # update leader_id anyway
467 self._set_leader_id(leader_id)
468
469 if self.i_am_leader:
470 self.i_am_leader = False
471 yield self._just_lost_leadership()
472
473 def _set_leader_id(self, leader_id):
474 self.leader_id = leader_id
475 deferreds, self.wait_for_leader_deferreds = \
476 self.wait_for_leader_deferreds, []
477 for d in deferreds:
478 d.callback(leader_id)
479
    def _just_gained_leadership(self):
        """Instantiate and start the Leader; returns its start() result."""
        log.info('became-leader')
        self.leader = Leader(self)
        return self.leader.start()
484
    def _just_lost_leadership(self):
        """Tear down the Leader instance; returns its stop() result."""
        log.info('lost-leadership')
        return self._halt_leader()
488
489 def _halt_leader(self):
490 if self.leader:
491 d = self.leader.stop()
492 self.leader = None
493 return d
494
    def get_kv_client(self):
        """Return the current etcd client instance."""
        return self.etcd
497
498 @inlineCallbacks
499 def _retry(self, operation, *args, **kw):
500 prefix = False
Richard Jankowski4ea26632018-05-14 17:45:38 -0400501 keys_only = False
Richard Jankowski8b277c22017-12-19 09:49:27 -0500502 for name, value in kw.items():
503 if name == 'acquire':
504 lease = value
505 kw['lease'] = lease
506 kw.pop('acquire')
507 elif name == 'keys':
Richard Jankowski4ea26632018-05-14 17:45:38 -0400508 keys_only = True
509 prefix = True
510 keyset = KeySet(bytes(args[0]), prefix=True)
Richard Jankowski8b277c22017-12-19 09:49:27 -0500511 kw['keys_only'] = True
512 kw.pop('keys')
513 elif name=='recurse':
Richard Jankowski8b277c22017-12-19 09:49:27 -0500514 prefix = True
515 keyset = KeySet(bytes(args[0]), prefix=True)
516 kw.pop('recurse')
517 log.info('start-op', operation=operation, args=args, kw=kw)
518
519 while 1:
520 try:
521 etcd = yield self.get_kv_client()
522 if operation == 'GET':
523 key = bytes(args[0])
524 # If multiple keys requested, return a list
525 # else return a single record
526 if not prefix:
527 index = 0
528 record = dict()
529 res = yield etcd.get(key, **kw)
530 if res.kvs:
531 if len(res.kvs) == 1:
532 kv = res.kvs[0]
533 index = kv.mod_revision
534 record['Key'] = kv.key
535 record['Value'] = kv.value
536 record['ModifyIndex'] = index
537 record['Session'] = self.lease.lease_id if self.lease else ''
538 result = (index, record)
539 else:
540 # Get values for all keys that match the prefix
Richard Jankowski4ea26632018-05-14 17:45:38 -0400541 # If keys_only requested, get only the keys
Richard Jankowski8b277c22017-12-19 09:49:27 -0500542 index = 0
543 records = []
Richard Jankowski4ea26632018-05-14 17:45:38 -0400544 keys = []
Richard Jankowski8b277c22017-12-19 09:49:27 -0500545 res = yield etcd.get(keyset, **kw)
546 if args[0] == 'service/voltha/assignments/':
547 log.info('assignments', result=res)
548 if res.kvs and len(res.kvs) > 0:
549 for kv in res.kvs:
550 # Which index should be returned? The max over all keys?
551 if kv.mod_revision > index:
552 index = kv.mod_revision
Richard Jankowski4ea26632018-05-14 17:45:38 -0400553 if keys_only:
554 keys.append(kv.key)
555 else:
556 rec = dict()
557 rec['Key'] = kv.key
558 rec['Value'] = kv.value
559 rec['ModifyIndex'] = kv.mod_revision
560 rec['Session'] = self.lease.lease_id if self.lease else ''
561 records.append(rec)
562 result = (index, keys) if keys_only else (index, records)
Richard Jankowski8b277c22017-12-19 09:49:27 -0500563 elif operation == 'PUT':
564 key = bytes(args[0])
565 result = yield etcd.set(key, args[1], **kw)
566 elif operation == 'DELETE':
567 key = bytes(args[0])
Richard Jankowski4ea26632018-05-14 17:45:38 -0400568 result = yield etcd.delete(keyset)
Richard Jankowski8b277c22017-12-19 09:49:27 -0500569 else:
570 # Default case - consider operation as a function call
571 result = yield operation(*args, **kw)
572 self._clear_backoff()
573 break
574 except Exception, e:
575 if not self.shutting_down:
576 log.exception(e)
577 yield self._backoff('unknown-error')
578
Richard Jankowski9fb5b082018-05-16 17:58:00 -0400579 log.info('end-op', operation=operation, args=args, kw=kw)
Richard Jankowski8b277c22017-12-19 09:49:27 -0500580 returnValue(result)
581
582 @inlineCallbacks
583 def consul_get_with_timeout(self, key, timeout, **kw):
584 """
585 Query etcd with a timeout
586 :param key: Key to query
587 :param timeout: timeout value
588 :param kw: additional key-value params
589 :return: (is_timeout, (index, result)).
590
591 The Consul version of this method performed a 'wait-type' get operation
592 that returned a result when the key's value had a ModifyIndex greater
593 than the 'index' argument. Not sure etcd supports this functionality.
594 """
595
596 # Intercept 'index' argument
597 for name, value in kw.items():
598 if name == 'index':
599 mod_revision = value
600 log.info('consul_get_with_timeout', index=mod_revision)
601 kw.pop('index')
602 break
603
604 @inlineCallbacks
605 def _get(key, m_callback):
606 try:
607 (index, result) = yield self._retry('GET', key, **kw)
608 if index > mod_revision and not m_callback.called:
609 log.debug('got-result-cancelling-timer')
610 m_callback.callback((index, result))
611 except Exception as e:
612 log.exception('got-exception', e=e)
613
614 try:
615 rcvd = DeferredWithTimeout(timeout=timeout)
616 _get(key, rcvd)
617 try:
618 result = yield rcvd
619 log.debug('result-received', result=result)
620 returnValue((False, result))
621 except TimeOutError as e:
622 log.debug('timeout-or-no-data-change', key=key)
623 except Exception as e:
624 log.exception('exception', e=e)
625 except Exception as e:
626 log.exception('exception', e=e)
627
628 returnValue((True, (None, None)))