blob: a82ebea73a2d74deb172fdc98cf9ae3efbe6c74c [file] [log] [blame]
Zsolt Harasztif2da1d02016-09-13 23:21:35 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztif2da1d02016-09-13 23:21:35 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17""" Consul-based coordinator services """
18
Zsolt Haraszti109db832016-09-16 16:32:36 -070019from consul import ConsulException
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070020from consul.twisted import Consul
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070021from requests import ConnectionError
22from structlog import get_logger
23from twisted.internet import reactor
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070024from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070025from twisted.internet.task import LoopingCall
Zsolt Harasztidafefe12016-11-14 21:29:58 -080026from zope.interface import implementer
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070027
Zsolt Harasztia3410312016-09-18 23:29:04 -070028from leader import Leader
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070029from common.utils.asleep import asleep
khenaidoo08d48d22017-06-29 19:42:49 -040030from common.utils.message_queue import MessageQueue
Zsolt Harasztidafefe12016-11-14 21:29:58 -080031from voltha.registry import IComponent
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070032from worker import Worker
khenaidooa8588f22017-06-16 12:13:34 -040033from simplejson import dumps, loads
khenaidoob1602a32017-07-27 16:59:52 -040034from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070035
# Module-level structlog logger used by all code in this file.
log = get_logger()
37
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070038
class StaleMembershipEntryException(Exception):
    """
    Signals that a stale membership record is in the way.

    Coordinator._retry treats this exception as retryable (it backs off and
    tries the operation again). Nothing in this file raises it directly.
    """
41
42
Zsolt Harasztidafefe12016-11-14 21:29:58 -080043@implementer(IComponent)
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070044class Coordinator(object):
Zsolt Harasztia3410312016-09-18 23:29:04 -070045 """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    interactions with consul, and via consul, all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in consul
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.
55 """
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070056
57 CONNECT_RETRY_INTERVAL_SEC = 1
58 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
59
Zsolt Harasztia3410312016-09-18 23:29:04 -070060 # Public methods:
61
def __init__(self,
             internal_host_address,
             external_host_address,
             instance_id,
             rest_port,
             config,
             consul='localhost:8500'):
    """
    Initialize coordinator state, the worker and the consul client.

    :param internal_host_address: stored on the instance as-is
    :param external_host_address: stored on the instance; published as
        'host_address' in the membership record
    :param instance_id: id of this instance; it is the suffix of the
        membership record key and the value written under the leader key
    :param rest_port: stored on the instance as-is
    :param config: dict-like object with 'coordinator', 'worker' and
        'leader' sections
    :param consul: consul endpoint as 'host:port'; the port falls back to
        8500 when it is omitted
    """
    log.info('initializing-coordinator')
    self.config = config['coordinator']
    self.worker_config = config['worker']
    self.leader_config = config['leader']

    # Loop delays and timeouts. The relatch delay used to be looked up on
    # the top-level config only; the coordinator section now wins and the
    # old location is kept as a fallback for existing deployments.
    self.membership_watch_relatch_delay = self.config.get(
        'membership_watch_relatch_delay',
        config.get('membership_watch_relatch_delay', 0.1))
    self.tracking_loop_delay = self.config.get(
        'tracking_loop_delay', 1)
    self.session_renewal_timeout = self.config.get(
        'session_renewal_timeout', 5)
    self.session_renewal_loop_delay = self.config.get(
        'session_renewal_loop_delay', 3)
    self.membership_maintenance_loop_delay = self.config.get(
        'membership_maintenance_loop_delay', 5)
    self.session_time_to_live = self.config.get(
        'session_time_to_live', 10)

    # KV key layout. Each '*_key' setting is looked up by its own name;
    # previously the configured *value* was used as the lookup key, which
    # always produced the default and raised KeyError when the setting
    # was absent.
    self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
    self.leader_prefix = '/'.join((
        self.prefix, self.config.get('leader_key', 'leader')))
    # the trailing '' yields a trailing '/' on these three prefixes
    self.membership_prefix = '/'.join((
        self.prefix, self.config.get('membership_key', 'members'), ''))
    self.assignment_prefix = '/'.join((
        self.prefix, self.config.get('assignment_key', 'assignments'), ''))
    self.workload_prefix = '/'.join((
        self.prefix, self.config.get('workload_key', 'work'), ''))
    self.core_store_prefix = '/'.join((
        self.prefix, self.config.get('core_store_key', 'data/core')))
    self.core_store_assignment_key = self.core_store_prefix + \
                                     '/assignment'
    self.core_storage_suffix = 'core_store'

    self.retries = 0
    self.instance_id = instance_id
    self.internal_host_address = internal_host_address
    self.external_host_address = external_host_address
    self.rest_port = rest_port
    self.membership_record_key = self.membership_prefix + self.instance_id

    self.session_id = None
    self.i_am_leader = False
    self.leader_id = None  # will be the instance id of the current leader
    self.shutting_down = False
    self.leader = None
    self.membership_callback = None

    self.worker = Worker(self.instance_id, self)

    # Split the endpoint once; tolerate a bare host name without a port.
    endpoint = consul.split(':')
    self.host = endpoint[0].strip()
    self.port = int(endpoint[1].strip()) if len(endpoint) > 1 else 8500

    # TODO need to handle reconnect events properly
    self.consul = Consul(host=self.host, port=self.port)

    # deferreds handed out by wait_for_a_leader() while no leader is known
    self.wait_for_leader_deferreds = []

    self.peers_mapping_queue = MessageQueue()
126
def start(self):
    """
    Schedule the asynchronous initialization on the reactor.

    The call only queues _async_init; it does not wait for it to complete.
    :return: this Coordinator instance
    """
    log.debug('starting')
    # zero delay: run on the next reactor iteration
    reactor.callLater(0, self._async_init)
    log.info('started')
    return self
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700132
@inlineCallbacks
def stop(self):
    """
    Shut the coordinator down.

    Flags the instance as shutting down, destroys the consul session,
    stops the worker and, if one is running, the leader.
    """
    log.debug('stopping')
    self.shutting_down = True

    # this will delete the leader lock too
    yield self._delete_session()
    yield self.worker.stop()

    active_leader = self.leader
    if active_leader is not None:
        yield active_leader.stop()
        self.leader = None

    log.info('stopped')
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700143
def wait_for_a_leader(self):
    """
    Async wait until a leader is detected/elected.

    When a leader is already known the returned deferred has already
    fired; otherwise it is parked until _set_leader_id() is called.
    :return: Deferred that fires with the leader's instance_id.
    """
    d = Deferred()
    if self.leader_id is None:
        # no leader yet: remember the deferred so it can be fired later
        self.wait_for_leader_deferreds.append(d)
    else:
        d.callback(self.leader_id)
    return d
157
@inlineCallbacks
def get_core_store_id_and_prefix(self):
    """
    Wait for a core data id to be assigned to this voltha instance.

    :return: (via deferred) tuple of (core_store_id, core_store_prefix),
        where the id is obtained from the worker.
    """
    assigned_id = yield self.worker.get_core_store_id()
    returnValue((assigned_id, self.core_store_prefix))
163
def recv_peers_map(self):
    """
    Take the next entry from the peers-mapping queue.

    :return: whatever peers_mapping_queue.get() returns
        (NOTE(review): presumably a Deferred -- confirm against
        common.utils.message_queue.MessageQueue)
    """
    mapping_queue = self.peers_mapping_queue
    return mapping_queue.get()
166
def publish_peers_map_change(self, msg):
    """
    Queue a peers-map change for consumers of recv_peers_map().

    :param msg: the message to put on the peers-mapping queue
    """
    mapping_queue = self.peers_mapping_queue
    mapping_queue.put(msg)
169
Zsolt Harasztia3410312016-09-18 23:29:04 -0700170 # Proxy methods for consul with retry support
171
def kv_get(self, *args, **kw):
    """
    Run a consul KV get through the retry wrapper.

    All positional and keyword arguments are forwarded unchanged.
    :return: the result of _retry('GET', ...)
    """
    operation = 'GET'
    return self._retry(operation, *args, **kw)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700174
def kv_put(self, *args, **kw):
    """
    Run a consul KV put through the retry wrapper.

    All positional and keyword arguments are forwarded unchanged.
    :return: the result of _retry('PUT', ...)
    """
    operation = 'PUT'
    return self._retry(operation, *args, **kw)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700177
def kv_delete(self, *args, **kw):
    """
    Run a consul KV delete through the retry wrapper.

    All positional and keyword arguments are forwarded unchanged.
    :return: the result of _retry('DELETE', ...)
    """
    operation = 'DELETE'
    return self._retry(operation, *args, **kw)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700180
Zsolt Haraszti00d9a842016-11-23 11:18:23 -0800181 # Methods exposing key membership information
182
@inlineCallbacks
def get_members(self):
    """
    Return list of all members.

    :return: (via deferred) list of member ids, i.e. the keys found under
        the membership prefix with that prefix stripped; an empty list
        when no membership record exists.
    """
    _, members = yield self.kv_get(self.membership_prefix, recurse=True)
    prefix_len = len(self.membership_prefix)
    # consul yields None (not an empty list) when nothing matches the prefix
    returnValue([member['Key'][prefix_len:] for member in members or []])
189
Zsolt Harasztia3410312016-09-18 23:29:04 -0700190 # Private (internal) methods:
191
@inlineCallbacks
def _async_init(self):
    """
    Bring the coordinator up, one step at a time and strictly in order:
    consul session, membership record, leader tracking, then the worker.
    """
    # each step is waited on before the next one starts
    yield self._create_session()
    yield self._create_membership_record()
    yield self._start_leader_tracking()
    yield self.worker.start()
Zsolt Harasztia3410312016-09-18 23:29:04 -0700198
def _backoff(self, msg):
    """
    Log msg as an error and sleep for the current back-off interval.

    The interval is taken from RETRY_BACKOFF using the retry counter as
    the index, clamped to the last entry; the counter is then incremented.
    :param msg: event name to log
    :return: the result of asleep(wait_time); callers yield on it
    """
    last_step = len(self.RETRY_BACKOFF) - 1
    step = min(self.retries, last_step)
    wait_time = self.RETRY_BACKOFF[step]
    self.retries += 1
    log.error(msg, retry_in=wait_time)
    return asleep(wait_time)
205
Zsolt Harasztia3410312016-09-18 23:29:04 -0700206 def _clear_backoff(self):
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700207 if self.retries:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700208 log.info('reconnected-to-consul', after_retries=self.retries)
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700209 self.retries = 0
210
@inlineCallbacks
def _create_session(self):
    """
    Create the consul session for this instance, retrying until it works.

    On success the session id is stored in self.session_id and the
    session tracking loop is started.
    """

    @inlineCallbacks
    def _create_session():
        client = yield self.get_consul()
        # create consul session
        self.session_id = yield client.session.create(
            behavior='release',
            ttl=self.session_time_to_live,
            lock_delay=1)
        log.info('created-consul-session', session_id=self.session_id)
        self._start_session_tracking()

    # a callable operation is invoked by _retry as a plain function call
    yield self._retry(_create_session)
225
@inlineCallbacks
def _delete_session(self):
    """
    Destroy the current consul session (best effort).

    Any failure is logged with the session id and otherwise ignored.
    """
    try:
        yield self.consul.session.destroy(self.session_id)
    except Exception:
        log.exception('failed-to-delete-session',
                      session_id=self.session_id)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700233
@inlineCallbacks
def _create_membership_record(self):
    """
    Create this instance's membership record, then schedule the
    maintenance loop that keeps it valid.
    """
    yield self._do_create_membership_record_with_retries()
    # the maintenance loop starts on the next reactor iteration
    reactor.callLater(0, self._maintain_membership_record)
238
@inlineCallbacks
def _maintain_membership_record(self):
    """
    Periodically verify the membership record and re-create it when it
    is no longer valid for the current session.

    Runs until an unexpected error occurs; in that case the error is
    logged and, unless the coordinator is shutting down, the loop is
    re-scheduled after membership_watch_relatch_delay.
    """
    try:
        while 1:
            valid_membership = yield self._assert_membership_record_valid()
            if not valid_membership:
                log.info('recreating-membership-before',
                         session=self.session_id)
                yield self._do_create_membership_record_with_retries()
                log.info('recreating-membership-after',
                         session=self.session_id)
            else:
                log.debug('valid-membership', session=self.session_id)
            # Async sleep before checking the membership record again
            yield asleep(self.membership_maintenance_loop_delay)

    except Exception as e:
        # previously logged under the leader-tracking event name
        log.exception('unexpected-error-membership-maintenance', e=e)
    finally:
        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.membership_watch_relatch_delay,
                              self._maintain_membership_record)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700262
khenaidooa8588f22017-06-16 12:13:34 -0400263 def _create_membership_record_data(self):
264 member_record = dict()
265 member_record['status'] = 'alive'
266 member_record['host_address'] = self.external_host_address
267 return member_record
268
@inlineCallbacks
def _assert_membership_record_valid(self):
    """
    Check that the membership record exists and is held by our session.

    :return: (via deferred) True when the record's 'Session' equals
        self.session_id; False when the record is missing, carries no or
        a different session, or the check itself raised.
    """
    try:
        log.info('membership-record-before')
        (_, record) = yield self._retry('GET',
                                        self.membership_record_key,
                                        wait='5s',
                                        index=0)
        log.info('membership-record-after', record=record)

        held_by_us = (record is not None and
                      'Session' in record and
                      record['Session'] == self.session_id)
        if held_by_us:
            returnValue(True)

        log.info('membership-record-change-detected',
                 old_session=self.session_id,
                 record=record)
        returnValue(False)
    except Exception as e:
        log.exception('membership-validation-exception', e=e)
        returnValue(False)
291
@inlineCallbacks
def _do_create_membership_record_with_retries(self):
    """
    Write the membership record under the current session, backing off
    and trying again for as long as the PUT is not accepted.
    """
    while True:
        log.info('recreating-membership', session=self.session_id)
        created = yield self._retry(
            'PUT',
            self.membership_record_key,
            dumps(self._create_membership_record_data()),
            acquire=self.session_id)
        if not created:
            # the PUT was refused: wait, then try again
            log.warn('cannot-create-membership-record')
            yield self._backoff('stale-membership-record')
            continue
        log.info('new-membership-record-created',
                 session=self.session_id)
        break
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700308
def _start_session_tracking(self):
    """Schedule the session tracking loop on the next reactor iteration."""
    loop = self._session_tracking_loop
    reactor.callLater(0, loop)
311
@inlineCallbacks
def _session_tracking_loop(self):
    """
    Keep the consul session alive.

    Each round starts a session renewal guarded by a timeout of
    session_renewal_timeout. If the renewal does not complete in time the
    session is destroyed and re-created on a fresh consul client. Rounds
    are separated by session_renewal_loop_delay.

    The loop ends once the coordinator is shutting down, so the session
    destroyed by stop() is not re-created. After an unexpected error the
    loop is re-scheduled, again only when not shutting down.
    """

    @inlineCallbacks
    def _redo_session():
        log.info('_redo_session-before')
        yield self._delete_session()

        # Create a new consul connection/session with the configured TTL
        try:
            self.consul = Consul(host=self.host, port=self.port)
            self.session_id = yield self.consul.session.create(
                behavior='release',
                ttl=self.session_time_to_live,
                lock_delay=1)
            log.info('new-consul-session', session=self.session_id)

        except Exception as e:
            log.exception('could-not-create-a-consul-session', e=e)

    @inlineCallbacks
    def _renew_session(m_callback):
        try:
            log.debug('_renew_session-before')
            # remember which client was used; _redo_session may swap it
            consul_ref = self.consul
            result = yield consul_ref.session.renew(
                session_id=self.session_id)
            log.info('just-renewed-session', result=result)
            if not m_callback.called:
                # Triggering callback will cancel the timeout timer
                log.info('trigger-callback-to-cancel-timout-timer')
                m_callback.callback(result)
            else:
                # Timeout event has already been called. Just ignore
                # this event
                log.info('renew-called-after-timout',
                         new_consul_ref=self.consul,
                         old_consul_ref=consul_ref)
        except Exception as e:
            # Let the invoking method receive a timeout
            log.exception('could-not-renew-session', e=e)

    try:
        while not self.shutting_down:
            log.debug('session-tracking-start')
            rcvd = DeferredWithTimeout(
                timeout=self.session_renewal_timeout)
            # deliberately not yielded: rcvd carries the outcome
            _renew_session(rcvd)
            try:
                _ = yield rcvd
            except TimeOutError as e:
                log.info('session-renew-timeout', e=e)
                # Redo the session
                yield _redo_session()
            except Exception as e:
                log.exception('session-renew-exception', e=e)
            else:
                log.debug('successfully-renewed-session')

            # Async sleep before the next session tracking
            yield asleep(self.session_renewal_loop_delay)

    except Exception as e:
        log.exception('renew-exception', e=e)
    finally:
        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.session_renewal_loop_delay,
                              self._session_tracking_loop)
379
def _start_leader_tracking(self):
    """Schedule the leadership tracking loop on the next reactor iteration."""
    loop = self._leadership_tracking_loop
    reactor.callLater(0, loop)
382
@inlineCallbacks
def _leadership_tracking_loop(self):
    """
    One round of leader election followed by watching the leader key.

    Tries to acquire the leader key with our session, reads the key back
    to settle on the leader / non-leader role, then polls the key until
    it changes. Unless the coordinator is shutting down, the method
    re-schedules itself after tracking_loop_delay when the round ends,
    including when it ends with an error.
    """
    try:
        # Attempt to acquire leadership lock. True indicates success;
        # False indicates there is already a leader. It's instance id
        # is then the value under the leader key service/voltha/leader.

        # attempt acquire leader lock
        log.info('leadership-attempt-before')
        result = yield self._retry('PUT',
                                   self.leader_prefix,
                                   self.instance_id,
                                   acquire=self.session_id)
        log.info('leadership-attempt-after')

        # read it back before being too happy; seeing our session id is a
        # proof and now we have the change id that we can use to reliably
        # track any changes. In an unlikely scenario where the leadership
        # key gets wiped out administratively since the previous line,
        # the returned record can be None. Handle it.
        (index, record) = yield self._retry('GET',
                                            self.leader_prefix)
        log.info('leader-prefix',
                 i_am_leader=result, index=index, record=record)

        if record is not None:
            if result is True:
                if record['Session'] == self.session_id:
                    yield self._assert_leadership()
                else:
                    pass  # confusion; need to retry leadership
            else:
                leader_id = record['Value']
                yield self._assert_nonleadership(leader_id)

        # if record was none, we shall try leadership again
        last = record
        # stop watching on shutdown; otherwise timeouts would keep this
        # loop polling consul forever
        while last is not None and not self.shutting_down:
            # this shall return only when update is made to leader key
            # or expires after 5 seconds wait
            is_timeout, (tmp_index, updated) = yield \
                self.consul_get_with_timeout(
                    key=self.leader_prefix,
                    index=index,
                    timeout=5)
            # Timeout means either there is a lost connectivity to
            # consul or there are no change to that key. Do nothing.
            if is_timeout:
                continue

            # After timeout event the index returned from
            # consul_get_with_timeout is None. If we are here it's not a
            # timeout, therefore the index is a valid one.
            index = tmp_index

            if updated is None or updated != last:
                log.info('leader-key-change',
                         index=index, updated=updated, last=last)
                # leadership has changed or vacated (or forcefully
                # removed), apply now
                # If I was previously the leader then assert a non
                # leadership role before going for election
                if self.i_am_leader:
                    log.info('leaving-leaderdhip',
                             leader=self.instance_id)
                    yield self._assert_nonleadership(self.instance_id)

                break
            last = updated

    except Exception as e:
        log.exception('unexpected-error-leader-trackin', e=e)

    finally:
        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.tracking_loop_delay,
                              self._leadership_tracking_loop)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700461
@inlineCallbacks
def _assert_leadership(self):
    """
    (Re-)assert leadership.

    Does nothing when this instance already considers itself the leader;
    otherwise records itself as leader and starts the leader role.
    """
    if self.i_am_leader:
        return
    self.i_am_leader = True
    self._set_leader_id(self.instance_id)
    yield self._just_gained_leadership()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700469
@inlineCallbacks
def _assert_nonleadership(self, leader_id):
    """
    (Re-)assert non-leader role.

    :param leader_id: instance id to record as the current leader
    """
    # the leader id is refreshed regardless of our previous role
    self._set_leader_id(leader_id)

    if not self.i_am_leader:
        return
    self.i_am_leader = False
    yield self._just_lost_leadership()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700480
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700481 def _set_leader_id(self, leader_id):
482 self.leader_id = leader_id
483 deferreds, self.wait_for_leader_deferreds = \
484 self.wait_for_leader_deferreds, []
485 for d in deferreds:
486 d.callback(leader_id)
487
def _just_gained_leadership(self):
    """
    Create and start the Leader object for this coordinator.

    :return: the result of Leader.start()
    """
    log.info('became-leader')
    new_leader = Leader(self)
    self.leader = new_leader
    return new_leader.start()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700492
def _just_lost_leadership(self):
    """
    React to losing leadership by halting the leader.

    :return: the result of _halt_leader()
    """
    log.info('lost-leadership')
    halted = self._halt_leader()
    return halted
496
497 def _halt_leader(self):
khenaidoob1602a32017-07-27 16:59:52 -0400498 if self.leader:
499 d = self.leader.stop()
500 self.leader = None
Zsolt Harasztia3410312016-09-18 23:29:04 -0700501 return d
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700502
def get_consul(self):
    """
    Return the consul client currently in use.

    The client can be replaced while running (the session tracking loop
    assigns a new one), so callers fetch it through this accessor.
    """
    current_client = self.consul
    return current_client
505
@inlineCallbacks
def _retry(self, operation, *args, **kw):
    """
    Run a consul operation, retrying with back-off until it succeeds.

    :param operation: 'GET', 'PUT' or 'DELETE' for the matching consul KV
        call; anything else is treated as a callable and invoked with
        the given arguments
    :param args: positional arguments forwarded to the operation
    :param kw: keyword arguments forwarded to the operation. For 'PUT',
        an 'acquire' value that differs from the current session id is
        replaced by the current session id.
    :return: (via deferred) the result of the operation
    """
    while 1:
        try:
            consul = yield self.get_consul()
            log.info('start', operation=operation, args=args)
            if operation == 'GET':
                result = yield consul.kv.get(*args, **kw)
            elif operation == 'PUT':
                # The session may have been re-created since the caller
                # captured its id; always lock with the current one.
                if 'acquire' in kw and kw['acquire'] != self.session_id:
                    log.info('updating-session-in-put-operation',
                             old_session=kw['acquire'],
                             new_session=self.session_id)
                    kw['acquire'] = self.session_id
                result = yield consul.kv.put(*args, **kw)
            elif operation == 'DELETE':
                result = yield consul.kv.delete(*args, **kw)
            else:
                # Default case - consider operation as a function call
                result = yield operation(*args, **kw)
            self._clear_backoff()
            break
        except ConsulException as e:
            # log the session id; self.consul.Session is the client's
            # nested class, not the session in use
            log.exception('consul-not-up',
                          operation=operation,
                          args=args,
                          session=self.session_id,
                          e=e)
            yield self._backoff('consul-not-up')
        except ConnectionError as e:
            log.exception('cannot-connect-to-consul',
                          operation=operation,
                          args=args,
                          session=self.session_id,
                          e=e)
            yield self._backoff('cannot-connect-to-consul')
        except StaleMembershipEntryException as e:
            log.exception('stale-membership-record-in-the-way',
                          operation=operation,
                          args=args,
                          session=self.session_id,
                          e=e)
            yield self._backoff('stale-membership-record-in-the-way')
        except Exception as e:
            # errors are expected while shutting down; keep the log quiet
            if not self.shutting_down:
                log.exception(e)
            yield self._backoff('unknown-error')

    log.info('end', operation=operation, args=args)
    returnValue(result)
khenaidood6e0e802017-08-29 19:55:44 -0400559
@inlineCallbacks
def consul_get_with_timeout(self, key, timeout, **kw):
    """
    Query consul with a timeout

    The GET itself goes through _retry and is not waited on directly;
    its outcome is delivered through a DeferredWithTimeout.
    :param key: Key to query
    :param timeout: timeout value
    :param kw: additional key-value params, forwarded to the GET
    :return: (is_timeout, (index, result)). On success is_timeout is
        False; on timeout or any error it is (True, (None, None)).
    """

    @inlineCallbacks
    def _get(key, m_callback):
        try:
            (index, result) = yield self._retry('GET', key, **kw)
            # a late answer (after the timeout fired) is dropped
            if not m_callback.called:
                log.debug('got-result-cancelling-timer')
                m_callback.callback((index, result))
        except Exception as e:
            log.exception('got-exception', e=e)

    timed_out = (True, (None, None))

    try:
        rcvd = DeferredWithTimeout(timeout=timeout)
        _get(key, rcvd)
        try:
            result = yield rcvd
            log.debug('result-received', result=result)
        except TimeOutError as e:
            log.debug('timeout-or-no-data-change', key=key)
        except Exception as e:
            log.exception('exception', e=e)
        else:
            returnValue((False, result))
    except Exception as e:
        log.exception('exception', e=e)

    returnValue(timed_out)
594 returnValue((True, (None, None)))