#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

""" Consul-based coordinator services """

from consul import ConsulException
from consul.twisted import Consul
from requests import ConnectionError
from simplejson import dumps, loads
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from twisted.internet.task import LoopingCall
from zope.interface import implementer

from leader import Leader
from common.utils.asleep import asleep
from common.utils.message_queue import MessageQueue
from voltha.registry import IComponent
from worker import Worker
from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError

# Module-level structured logger used by everything in this file.
log = get_logger()


class StaleMembershipEntryException(Exception):
    """Signals that a stale membership record is in the way.

    Coordinator._retry treats this exception as retryable: it logs the
    failure, backs off and tries the operation again.
    """


@implementer(IComponent)
class Coordinator(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interaction with consul, and via consul, all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in consul
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    # Successive wait times (seconds) used by _backoff(); the last entry is
    # reused once the number of retries exceeds the length of the list.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 config,
                 consul='localhost:8500'):
        """
        Set up the coordinator state; no consul traffic happens here.

        :param internal_host_address: stored as-is on the instance
        :param external_host_address: published in the membership record
        :param instance_id: id of this instance; used as the membership
            key suffix and as the value written to the leader key
        :param rest_port: stored as-is on the instance
        :param config: mapping that must contain the 'coordinator',
            'worker' and 'leader' sections
        :param consul: consul endpoint in 'host:port' form
        """
        log.info('initializing-coordinator')
        self.config = config['coordinator']
        self.worker_config = config['worker']
        self.leader_config = config['leader']

        # All timing knobs live in the 'coordinator' section. For backward
        # compatibility the relatch delay is still honoured when it is
        # only present at the top level of the config, where it used to be
        # looked up.
        self.membership_watch_relatch_delay = self.config.get(
            'membership_watch_relatch_delay',
            config.get('membership_watch_relatch_delay', 0.1))
        self.tracking_loop_delay = self.config.get(
            'tracking_loop_delay', 1)
        self.session_renewal_timeout = self.config.get(
            'session_renewal_timeout', 5)
        self.session_renewal_loop_delay = self.config.get(
            'session_renewal_loop_delay', 3)
        self.membership_maintenance_loop_delay = self.config.get(
            'membership_maintenance_loop_delay', 5)
        self.session_time_to_live = self.config.get(
            'session_time_to_live', 10)

        # Key layout in the consul k/v store. Each '<name>_key' config
        # entry is optional and overrides the default path component.
        # Prefixes joined with a trailing '' end with a '/'.
        self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
        self.leader_prefix = '/'.join((
            self.prefix,
            self.config.get('leader_key', 'leader')))
        self.membership_prefix = '/'.join((
            self.prefix,
            self.config.get('membership_key', 'members'),
            ''))
        self.assignment_prefix = '/'.join((
            self.prefix,
            self.config.get('assignment_key', 'assignments'),
            ''))
        self.workload_prefix = '/'.join((
            self.prefix,
            self.config.get('workload_key', 'work'),
            ''))
        self.core_store_prefix = '/'.join((
            self.prefix,
            self.config.get('core_store_key', 'data/core')))
        self.core_store_assignment_key = self.core_store_prefix + \
            '/assignment'
        self.core_storage_suffix = 'core_store'

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.membership_prefix + self.instance_id

        self.session_id = None
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader = None
        self.membership_callback = None

        self.worker = Worker(self.instance_id, self)

        consul_parts = consul.split(':')
        self.host = consul_parts[0].strip()
        self.port = int(consul_parts[1].strip())

        # TODO need to handle reconnect events properly
        self.consul = Consul(host=self.host, port=self.port)

        self.wait_for_leader_deferreds = []

        self.peers_mapping_queue = MessageQueue()

    def start(self):
        """
        Schedule the asynchronous initialization on the reactor.
        :return: self, so that calls can be chained.
        """
        log.debug('starting')
        reactor.callLater(0, self._async_init)
        log.info('started')
        return self

    @inlineCallbacks
    def stop(self):
        """
        Flag shutdown, delete the consul session and stop the worker and,
        if one is running, the leader.
        """
        log.debug('stopping')
        self.shutting_down = True
        yield self._delete_session()  # this will delete the leader lock too
        yield self.worker.stop()
        if self.leader is not None:
            yield self.leader.stop()
            self.leader = None
        log.info('stopped')

    def wait_for_a_leader(self):
        """
        Async wait till a leader is detected/elected. The deferred will be
        called with the leader's instance_id
        :return: Deferred.
        """
        d = Deferred()
        if self.leader_id is not None:
            # A leader is already known: fire right away
            d.callback(self.leader_id)
        else:
            # Fired later from _set_leader_id()
            self.wait_for_leader_deferreds.append(d)
        return d

    # Wait for a core data id to be assigned to this voltha instance
    @inlineCallbacks
    def get_core_store_id_and_prefix(self):
        """
        :return: (via deferred) tuple of the core store id obtained from
            the worker and the core store prefix.
        """
        core_store_id = yield self.worker.get_core_store_id()
        returnValue((core_store_id, self.core_store_prefix))

    def recv_peers_map(self):
        """Return whatever the peers mapping queue's get() returns."""
        return self.peers_mapping_queue.get()

    def publish_peers_map_change(self, msg):
        """Put msg on the peers mapping queue."""
        self.peers_mapping_queue.put(msg)

    # Proxy methods for consul with retry support

    def kv_get(self, *args, **kw):
        """consul kv.get(*args, **kw), retried until it succeeds."""
        return self._retry('GET', *args, **kw)

    def kv_put(self, *args, **kw):
        """consul kv.put(*args, **kw), retried until it succeeds."""
        return self._retry('PUT', *args, **kw)

    def kv_delete(self, *args, **kw):
        """consul kv.delete(*args, **kw), retried until it succeeds."""
        return self._retry('DELETE', *args, **kw)

    # Methods exposing key membership information

    @inlineCallbacks
    def get_members(self):
        """
        Return list of all members
        :return: (via deferred) list of the keys found under the membership
            prefix, with the prefix stripped; empty when there are none.
        """
        _, members = yield self.kv_get(self.membership_prefix, recurse=True)
        prefix_len = len(self.membership_prefix)
        # members is None when nothing is stored under the prefix
        returnValue([member['Key'][prefix_len:]
                     for member in members or []])

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        """Create session and membership record, then start the loops."""
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()

    def _backoff(self, msg):
        """
        Log msg as an error and sleep for the next back-off interval.
        :param msg: event name to log
        :return: the deferred returned by asleep()
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the retry counter after a successful operation."""
        if self.retries:
            log.info('reconnected-to-consul', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """
        Create the consul session (with retries) and start the loop that
        keeps renewing it.
        """

        @inlineCallbacks
        def _create_session():
            consul = yield self.get_consul()
            # create consul session
            self.session_id = yield consul.session.create(
                behavior='release', ttl=self.session_time_to_live,
                lock_delay=1)
            log.info('created-consul-session', session_id=self.session_id)
            self._start_session_tracking()

        yield self._retry(_create_session)

    @inlineCallbacks
    def _delete_session(self):
        """
        Destroy the current consul session. Failures are logged, never
        raised. Nothing is done when no session was created yet.
        """
        if self.session_id is None:
            return
        try:
            yield self.consul.session.destroy(self.session_id)
        except Exception:
            log.exception('failed-to-delete-session',
                          session_id=self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        """Create the membership record and start its maintenance loop."""
        yield self._do_create_membership_record_with_retries()
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """
        Periodically verify that the membership record still belongs to
        the current session and recreate it when it does not. The loop
        ends on shutdown; after an unexpected error it is rescheduled.
        """
        try:
            while not self.shutting_down:
                valid_membership = yield self._assert_membership_record_valid()
                if not valid_membership:
                    log.info('recreating-membership-before',
                             session=self.session_id)
                    yield self._do_create_membership_record_with_retries()
                    log.info('recreating-membership-after',
                             session=self.session_id)
                else:
                    log.debug('valid-membership', session=self.session_id)
                # Async sleep before checking the membership record again
                yield asleep(self.membership_maintenance_loop_delay)

        except Exception as e:
            log.exception('unexpected-error-membership-maintenance', e=e)
        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.membership_watch_relatch_delay,
                                  self._maintain_membership_record)

    def _create_membership_record_data(self):
        """
        :return: dict stored (JSON-encoded) as the membership record value.
        """
        return {
            'status': 'alive',
            'host_address': self.external_host_address,
        }

    @inlineCallbacks
    def _assert_membership_record_valid(self):
        """
        Check that the membership record exists and is held by the current
        session.
        :return: (via deferred) True when valid; False when the lookup
            timed out, failed, or the record is missing or held by another
            session.
        """
        try:
            log.info('membership-record-before')
            is_timeout, (_, record) = yield \
                self.consul_get_with_timeout(
                    key=self.membership_record_key,
                    index=0,
                    timeout=5)
            if is_timeout:
                returnValue(False)

            log.info('membership-record-after', record=record)
            if record is None or \
                    'Session' not in record or \
                    record['Session'] != self.session_id:
                log.info('membership-record-change-detected',
                         old_session=self.session_id,
                         record=record)
                returnValue(False)
            else:
                returnValue(True)
        except Exception as e:
            log.exception('membership-validation-exception', e=e)
            returnValue(False)

    @inlineCallbacks
    def _do_create_membership_record_with_retries(self):
        """
        Keep trying to write the membership record, acquired with the
        current session, until the PUT reports success or we shut down.
        """
        while not self.shutting_down:
            log.info('recreating-membership', session=self.session_id)
            result = yield self._retry(
                'PUT',
                self.membership_record_key,
                dumps(self._create_membership_record_data()),
                acquire=self.session_id)
            if result:
                log.info('new-membership-record-created',
                         session=self.session_id)
                break
            else:
                log.warn('cannot-create-membership-record')
                yield self._backoff('stale-membership-record')

    def _start_session_tracking(self):
        """Schedule the session renewal loop on the reactor."""
        reactor.callLater(0, self._session_tracking_loop)

    @inlineCallbacks
    def _session_tracking_loop(self):
        """
        Renew the consul session every session_renewal_loop_delay seconds.
        When a renewal does not complete within session_renewal_timeout
        the session is deleted and a new one is created on a fresh consul
        connection. The loop ends on shutdown; after an unexpected error
        it is rescheduled.
        """

        @inlineCallbacks
        def _redo_session():
            log.info('_redo_session-before')
            yield self._delete_session()

            # Create a new consul connection/session with a TTL of
            # session_time_to_live seconds
            try:
                self.consul = Consul(host=self.host, port=self.port)
                self.session_id = yield self.consul.session.create(
                    behavior='release',
                    ttl=self.session_time_to_live,
                    lock_delay=1)
                log.info('new-consul-session', session=self.session_id)

            except Exception as e:
                log.exception('could-not-create-a-consul-session', e=e)

        @inlineCallbacks
        def _renew_session(m_callback):
            try:
                log.debug('_renew_session-before')
                consul_ref = self.consul
                result = yield consul_ref.session.renew(
                    session_id=self.session_id)
                log.info('just-renewed-session', result=result)
                if not m_callback.called:
                    # Triggering callback will cancel the timeout timer
                    log.info('trigger-callback-to-cancel-timout-timer')
                    m_callback.callback(result)
                else:
                    # Timeout event has already been called. Just ignore
                    # this event
                    log.info('renew-called-after-timout',
                             new_consul_ref=self.consul,
                             old_consul_ref=consul_ref)
            except Exception as e:
                # Let the invoking method receive a timeout
                log.exception('could-not-renew-session', e=e)

        try:
            while not self.shutting_down:
                log.debug('session-tracking-start')
                rcvd = DeferredWithTimeout(
                    timeout=self.session_renewal_timeout)
                # Not yielded on purpose: rcvd is what we wait on
                _renew_session(rcvd)
                try:
                    _ = yield rcvd
                except TimeOutError as e:
                    log.info('session-renew-timeout', e=e)
                    # Redo the session, unless we are shutting down: then
                    # the session was destroyed on purpose by stop()
                    if not self.shutting_down:
                        yield _redo_session()
                except Exception as e:
                    log.exception('session-renew-exception', e=e)
                else:
                    log.debug('successfully-renewed-session')

                # Async sleep before the next session tracking
                yield asleep(self.session_renewal_loop_delay)

        except Exception as e:
            log.exception('renew-exception', e=e)
        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.session_renewal_loop_delay,
                                  self._session_tracking_loop)

    def _start_leader_tracking(self):
        """Schedule the leadership tracking loop on the reactor."""
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """
        Try to acquire the leader key, take on the resulting role, then
        watch the key until it changes. Unless shutting down, the whole
        procedure is rescheduled after tracking_loop_delay seconds.
        """
        try:
            # Attempt to acquire leadership lock. True indicates success;
            # False indicates there is already a leader. Its instance id
            # is then the value under the leader key service/voltha/leader.

            # attempt acquire leader lock
            log.info('leadership-attempt-before')
            result = yield self._retry('PUT',
                                       self.leader_prefix,
                                       self.instance_id,
                                       acquire=self.session_id)
            log.info('leadership-attempt-after')

            # read it back before being too happy; seeing our session id is a
            # proof and now we have the change id that we can use to reliably
            # track any changes. In an unlikely scenario where the leadership
            # key gets wiped out administratively since the previous line,
            # the returned record can be None. Handle it.
            (index, record) = yield self._retry('GET',
                                                self.leader_prefix)
            log.info('leader-prefix',
                     i_am_leader=result, index=index, record=record)

            if record is not None:
                if result is True:
                    if record['Session'] == self.session_id:
                        yield self._assert_leadership()
                    else:
                        pass  # confusion; need to retry leadership
                else:
                    leader_id = record['Value']
                    yield self._assert_nonleadership(leader_id)

            # if record was none, we shall try leadership again
            last = record
            while last is not None and not self.shutting_down:
                # this shall return only when update is made to leader key
                # or expires after 5 seconds wait
                is_timeout, (tmp_index, updated) = yield \
                    self.consul_get_with_timeout(
                        key=self.leader_prefix,
                        index=index,
                        timeout=5)
                # Timeout means either there is a lost connectivity to
                # consul or there are no change to that key. Do nothing.
                if is_timeout:
                    continue

                # After timeout event the index returned from
                # consul_get_with_timeout is None. If we are here it's not a
                # timeout, therefore the index is a valid one.
                index = tmp_index

                if updated is None or updated != last:
                    log.info('leader-key-change',
                             index=index, updated=updated, last=last)
                    # leadership has changed or vacated (or forcefully
                    # removed), apply now
                    # If I was previously the leader then assert a non
                    # leadership role before going for election
                    if self.i_am_leader:
                        log.info('leaving-leaderdhip',
                                 leader=self.instance_id)
                        yield self._assert_nonleadership(self.instance_id)

                    break
                last = updated

        except Exception as e:
            log.exception('unexpected-error-leader-trackin', e=e)

        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.tracking_loop_delay,
                                  self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self._set_leader_id(self.instance_id)
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self._set_leader_id(leader_id)

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _set_leader_id(self, leader_id):
        """
        Record the leader id and fire every deferred handed out by
        wait_for_a_leader() that is still pending.
        """
        self.leader_id = leader_id
        # Swap the list out first so that each deferred fires only once
        deferreds, self.wait_for_leader_deferreds = \
            self.wait_for_leader_deferreds, []
        for d in deferreds:
            d.callback(leader_id)

    def _just_gained_leadership(self):
        """Instantiate and start the leader; returns what start() returns."""
        log.info('became-leader')
        self.leader = Leader(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        """Stop the leader after leadership was lost."""
        log.info('lost-leadership')
        return self._halt_leader()

    def _halt_leader(self):
        """
        Stop and forget the leader, if any.
        :return: what leader.stop() returned, or None when there is no
            leader to stop.
        """
        if not self.leader:
            return None
        d = self.leader.stop()
        self.leader = None
        return d

    def get_consul(self):
        """Return the current consul client (replaced on session redo)."""
        return self.consul

    def _report_and_backoff(self, event, operation, args, e):
        """
        Log a failed consul operation with its traceback, then back off.
        Must be called from within an except block.
        :return: the deferred returned by _backoff()
        """
        log.exception(event,
                      operation=operation,
                      args=args,
                      session=self.session_id,
                      e=e)
        return self._backoff(event)

    @inlineCallbacks
    def _retry(self, operation, *args, **kw):
        """
        Run a consul operation, retrying with back-off until it succeeds.

        :param operation: 'GET', 'PUT' or 'DELETE' for the corresponding
            consul k/v call; anything else is called as a function
        :param args: positional arguments for the operation
        :param kw: keyword arguments for the operation. For 'PUT', an
            'acquire' value differing from the current session id is
            replaced by the current session id.
        :return: (via deferred) the result of the operation
        """
        while 1:
            try:
                consul = yield self.get_consul()
                log.info('start', operation=operation, args=args)
                if operation == 'GET':
                    result = yield consul.kv.get(*args, **kw)
                elif operation == 'PUT':
                    # The session may have been redone since the caller
                    # captured its id; always lock with the current one
                    if 'acquire' in kw and \
                            kw['acquire'] != self.session_id:
                        log.info('updating-session-in-put-operation',
                                 old_session=kw['acquire'],
                                 new_session=self.session_id)
                        kw['acquire'] = self.session_id
                    result = yield consul.kv.put(*args, **kw)
                elif operation == 'DELETE':
                    result = yield consul.kv.delete(*args, **kw)
                else:
                    # Default case - consider operation as a function call
                    result = yield operation(*args, **kw)
                self._clear_backoff()
                break
            except ConsulException as e:
                yield self._report_and_backoff(
                    'consul-not-up', operation, args, e)
            except ConnectionError as e:
                yield self._report_and_backoff(
                    'cannot-connect-to-consul', operation, args, e)
            except StaleMembershipEntryException as e:
                yield self._report_and_backoff(
                    'stale-membership-record-in-the-way', operation, args, e)
            except Exception as e:
                if not self.shutting_down:
                    log.exception(e)
                yield self._backoff('unknown-error')

        log.info('end', operation=operation, args=args)
        returnValue(result)

    @inlineCallbacks
    def consul_get_with_timeout(self, key, timeout, **kw):
        """
        Query consul with a timeout
        :param key: Key to query
        :param timeout: timeout value
        :param kw: additional key-value params, passed on to the GET
        :return: (is_timeout, (index, result)). On timeout or on any
            error this is (True, (None, None)).
        """

        @inlineCallbacks
        def _get(key, m_callback):
            try:
                (index, result) = yield self._retry('GET', key, **kw)
                # Ignore the answer if the timeout has fired already
                if not m_callback.called:
                    log.debug('got-result-cancelling-timer')
                    m_callback.callback((index, result))
            except Exception as e:
                log.exception('got-exception', e=e)

        try:
            rcvd = DeferredWithTimeout(timeout=timeout)
            # Not yielded on purpose: rcvd is what we wait on
            _get(key, rcvd)
            result = yield rcvd
            log.debug('result-received', result=result)
            returnValue((False, result))
        except TimeOutError:
            log.debug('timeout-or-no-data-change', key=key)
        except Exception as e:
            log.exception('exception', e=e)

        returnValue((True, (None, None)))