blob: 61fee2695d1186c899888a3a9a31c1119e5a722f [file] [log] [blame]
Zsolt Harasztif2da1d02016-09-13 23:21:35 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztif2da1d02016-09-13 23:21:35 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17""" Consul-based coordinator services """
18
Zsolt Haraszti109db832016-09-16 16:32:36 -070019from consul import ConsulException
Zsolt Harasztie060a7d2016-09-16 11:08:24 -070020from consul.twisted import Consul
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070021from requests import ConnectionError
22from structlog import get_logger
23from twisted.internet import reactor
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070024from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Jonathan Hartdd9daf52018-05-21 17:04:45 -070025from twisted.internet.error import DNSLookupError
Zsolt Harasztidafefe12016-11-14 21:29:58 -080026from zope.interface import implementer
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070027
Zsolt Harasztia3410312016-09-18 23:29:04 -070028from leader import Leader
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070029from common.utils.asleep import asleep
khenaidoo08d48d22017-06-29 19:42:49 -040030from common.utils.message_queue import MessageQueue
Zsolt Harasztidafefe12016-11-14 21:29:58 -080031from voltha.registry import IComponent
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070032from worker import Worker
khenaidooa8588f22017-06-16 12:13:34 -040033from simplejson import dumps, loads
khenaidoob1602a32017-07-27 16:59:52 -040034from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070035
# Module-level structlog logger used by every function in this file.
log = get_logger()
37
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070038
class StaleMembershipEntryException(Exception):
    """
    Marker exception for a stale membership entry. Coordinator._retry
    treats it as a retryable condition (logs, backs off, tries again).
    """
41
42
Zsolt Harasztidafefe12016-11-14 21:29:58 -080043@implementer(IComponent)
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070044class Coordinator(object):
Zsolt Harasztia3410312016-09-18 23:29:04 -070045 """
46 An app shall instantiate only one Coordinator (singleton).
47 A single instance of this object shall take care of all external
48 communication with consul, and via consul, all coordination activities with its
49 clustered peers. Roles include:
50 - registering an ephemeral membership entry (k/v record) in consul
51 - participating in a symmetric leader election, and potentially assuming
52 the leader's role. What leadership entails is not a concern for the
53 coordination, it simply instantiates (and shuts down) a leader class
54 when it gains (or loses) leadership.
55 """
Zsolt Harasztif2da1d02016-09-13 23:21:35 -070056
57 CONNECT_RETRY_INTERVAL_SEC = 1
58 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
59
Zsolt Harasztia3410312016-09-18 23:29:04 -070060 # Public methods:
61
def __init__(self,
             internal_host_address,
             external_host_address,
             instance_id,
             rest_port,
             config,
             consul='localhost:8500',
             container_name_regex=r'^.*\.([0-9]+)\..*$'):
    """
    Capture configuration and build the in-memory state of the
    coordinator. No request is sent to consul here; that happens once
    start() schedules _async_init.

    :param internal_host_address: stored as self.internal_host_address
    :param external_host_address: stored as self.external_host_address;
        published in the membership record
    :param instance_id: id of this instance; used as the membership key
        suffix and as the value written under the leader key
    :param rest_port: stored as self.rest_port
    :param config: dict with 'coordinator', 'worker' and 'leader' sections
    :param consul: consul endpoint as a 'host:port' string
    :param container_name_regex: stored as self.container_name_regex
    """
    log.info('initializing-coordinator')
    self.config = config['coordinator']
    self.worker_config = config['worker']
    self.leader_config = config['leader']

    # All tunables live in the coordinator section. The relatch delay was
    # historically looked up in the top-level config, so that location is
    # kept as a fallback for existing deployments.
    self.membership_watch_relatch_delay = self.config.get(
        'membership_watch_relatch_delay',
        config.get('membership_watch_relatch_delay', 0.1))
    self.tracking_loop_delay = self.config.get(
        'tracking_loop_delay', 1)
    self.session_renewal_timeout = self.config.get(
        'session_renewal_timeout', 5)
    self.session_renewal_loop_delay = self.config.get(
        'session_renewal_loop_delay', 3)
    self.membership_maintenance_loop_delay = self.config.get(
        'membership_maintenance_loop_delay', 5)
    self.session_time_to_live = self.config.get(
        'session_time_to_live', 10)

    # KV key layout. Each '<name>_key' option is optional and falls back
    # to its default; the option's *value* is the path component (the
    # previous code used the value as a second lookup key, which raised
    # KeyError when the option was absent and otherwise ignored it).
    self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
    self.leader_prefix = '/'.join((
        self.prefix,
        self.config.get('leader_key', 'leader')))
    # The trailing '' yields a trailing '/' so ids can be appended directly
    self.membership_prefix = '/'.join((
        self.prefix,
        self.config.get('membership_key', 'members'),
        ''))
    self.assignment_prefix = '/'.join((
        self.prefix,
        self.config.get('assignment_key', 'assignments'),
        ''))
    self.workload_prefix = '/'.join((
        self.prefix,
        self.config.get('workload_key', 'work'),
        ''))
    self.core_store_prefix = '/'.join((
        self.prefix,
        self.config.get('core_store_key', 'data/core')))
    self.core_store_assignment_key = self.core_store_prefix + \
        '/assignment'
    self.core_storage_suffix = 'core_store'

    self.retries = 0
    self.instance_id = instance_id
    self.internal_host_address = internal_host_address
    self.external_host_address = external_host_address
    self.rest_port = rest_port
    self.membership_record_key = self.membership_prefix + self.instance_id

    self.session_id = None
    self.i_am_leader = False
    self.leader_id = None  # will be the instance id of the current leader
    self.shutting_down = False
    self.leader = None
    self.membership_callback = None

    self.worker = Worker(self.instance_id, self)

    # 'host:port' -> host string and integer port
    endpoint = consul.split(':')
    self.host = endpoint[0].strip()
    self.port = int(endpoint[1].strip())

    # TODO need to handle reconnect events properly
    self.consul = Consul(host=self.host, port=self.port)

    self.container_name_regex = container_name_regex

    # Deferreds handed out by wait_for_a_leader() while no leader is known
    self.wait_for_leader_deferreds = []

    self.peers_mapping_queue = MessageQueue()
129
def start(self):
    """
    Schedule the asynchronous initialisation (_async_init) on the
    reactor and return immediately.
    :return: self, so the call can be chained.
    """
    log.debug('starting')
    # Deferred to the next reactor iteration; nothing is awaited here.
    reactor.callLater(0, self._async_init)
    log.info('started')
    return self
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700135
@inlineCallbacks
def stop(self):
    """
    Shut the coordinator down: flag shutdown, destroy the consul
    session, then stop the worker and (if present) the leader.
    """
    log.debug('stopping')
    # Set first so the periodic loops stop rescheduling themselves
    self.shutting_down = True
    # this will delete the leader lock too
    yield self._delete_session()
    yield self.worker.stop()
    if self.leader is not None:
        yield self.leader.stop()
        self.leader = None
    log.info('stopped')
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700146
def wait_for_a_leader(self):
    """
    Async wait until a leader is known. If a leader id is already
    recorded the returned deferred has already fired; otherwise it is
    queued and fired by _set_leader_id.
    :return: Deferred called back with the leader's instance_id.
    """
    pending = Deferred()
    if self.leader_id is None:
        # No leader yet: park the deferred until one is detected/elected
        self.wait_for_leader_deferreds.append(pending)
    else:
        pending.callback(self.leader_id)
    return pending
160
khenaidoo032d3302017-06-09 14:50:04 -0400161 # Wait for a core data id to be assigned to this voltha instance
@inlineCallbacks
def get_core_store_id_and_prefix(self):
    """
    Wait for the worker to provide this instance's core store id.
    :return: (core_store_id, core_store_prefix) tuple.
    """
    assigned_id = yield self.worker.get_core_store_id()
    returnValue((assigned_id, self.core_store_prefix))
166
def recv_peers_map(self):
    """
    Fetch the next item from the peers mapping queue.
    :return: whatever self.peers_mapping_queue.get() returns.
    """
    queue = self.peers_mapping_queue
    return queue.get()
169
def publish_peers_map_change(self, msg):
    """
    Put a message on the peers mapping queue.
    :param msg: item handed to self.peers_mapping_queue.put()
    """
    queue = self.peers_mapping_queue
    queue.put(msg)
172
Zsolt Harasztia3410312016-09-18 23:29:04 -0700173 # Proxy methods for consul with retry support
174
def kv_get(self, *args, **kw):
    """
    Consul KV read routed through _retry.
    :return: whatever self._retry('GET', ...) returns.
    """
    operation = 'GET'
    return self._retry(operation, *args, **kw)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700177
def kv_put(self, *args, **kw):
    """
    Consul KV write routed through _retry.
    :return: whatever self._retry('PUT', ...) returns.
    """
    operation = 'PUT'
    return self._retry(operation, *args, **kw)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700180
def kv_delete(self, *args, **kw):
    """
    Consul KV delete routed through _retry.
    :return: whatever self._retry('DELETE', ...) returns.
    """
    operation = 'DELETE'
    return self._retry(operation, *args, **kw)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700183
Zsolt Haraszti00d9a842016-11-23 11:18:23 -0800184 # Methods exposing key membership information
185
@inlineCallbacks
def get_members(self):
    """
    Return list of all members.

    Member ids are the keys found under the membership prefix with that
    prefix stripped off.
    :return: list of member ids; empty when no membership key exists.
    """
    _, members = yield self.kv_get(self.membership_prefix, recurse=True)
    # consul reports "nothing under this prefix" as None, not as []
    prefix_len = len(self.membership_prefix)
    returnValue([member['Key'][prefix_len:] for member in members or []])
192
Zsolt Harasztia3410312016-09-18 23:29:04 -0700193 # Private (internal) methods:
194
@inlineCallbacks
def _async_init(self):
    """
    Asynchronous part of start-up, run strictly in this order: consul
    session, membership record, leader tracking, then the worker.
    """
    # The membership record and leader lock are acquired with the
    # session id, so the session has to exist first.
    yield self._create_session()
    yield self._create_membership_record()
    yield self._start_leader_tracking()
    yield self.worker.start()
Zsolt Harasztia3410312016-09-18 23:29:04 -0700201
def _backoff(self, msg):
    """
    Sleep asynchronously for the next interval of RETRY_BACKOFF and
    count the retry. Once the table is exhausted its last entry is
    reused.
    :param msg: log event name describing why we are backing off
    :return: the deferred returned by asleep()
    """
    last_slot = len(self.RETRY_BACKOFF) - 1
    wait_time = self.RETRY_BACKOFF[min(self.retries, last_slot)]
    self.retries += 1
    log.info(msg, retry_in=wait_time)
    return asleep(wait_time)
208
Zsolt Harasztia3410312016-09-18 23:29:04 -0700209 def _clear_backoff(self):
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700210 if self.retries:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700211 log.info('reconnected-to-consul', after_retries=self.retries)
Zsolt Harasztif2da1d02016-09-13 23:21:35 -0700212 self.retries = 0
213
@inlineCallbacks
def _create_session(self):
    """
    Create the consul session (retried via _retry until it succeeds),
    record its id in self.session_id and start session tracking.
    """

    # NOTE: the inner function's name shows up in _retry's log output
    # (operation=<function ...>), so it is deliberately left unchanged.
    @inlineCallbacks
    def _create_session():
        client = yield self.get_consul()
        # create consul session
        self.session_id = yield client.session.create(
            behavior='release',
            ttl=self.session_time_to_live,
            lock_delay=1)
        log.info('created-consul-session', session_id=self.session_id)
        self._start_session_tracking()

    yield self._retry(_create_session)
228
@inlineCallbacks
def _delete_session(self):
    """
    Best-effort destruction of the current consul session; any failure
    is logged (with traceback) and swallowed.
    """
    try:
        yield self.consul.session.destroy(self.session_id)
    except Exception:
        log.exception('failed-to-delete-session',
                      session_id=self.session_id)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700236
@inlineCallbacks
def _create_membership_record(self):
    """
    Write the membership record (retrying until it is accepted) and
    then schedule the loop that keeps it valid.
    """
    yield self._do_create_membership_record_with_retries()
    # Maintenance runs on its own reactor call; it is not awaited here
    reactor.callLater(0, self._maintain_membership_record)
241
@inlineCallbacks
def _maintain_membership_record(self):
    """
    Periodically verify that the membership record is still owned by
    the current session and recreate it when it is not.

    The loop only ends through an exception; in that case it is logged
    and, unless we are shutting down, the whole loop is rescheduled.
    """
    try:
        while 1:
            valid_membership = yield self._assert_membership_record_valid()
            if valid_membership:
                log.debug('valid-membership', session=self.session_id)
            else:
                log.info('recreating-membership-before',
                         session=self.session_id)
                yield self._do_create_membership_record_with_retries()
                log.info('recreating-membership-after',
                         session=self.session_id)

            # Async sleep before checking the membership record again
            yield asleep(self.membership_maintenance_loop_delay)

    except Exception as e:
        # Event name used to be a copy of the leader-tracking one, which
        # pointed operators at the wrong loop.
        log.exception('unexpected-error-membership-maintenance', e=e)
    finally:
        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.membership_watch_relatch_delay,
                              self._maintain_membership_record)
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700265
khenaidooa8588f22017-06-16 12:13:34 -0400266 def _create_membership_record_data(self):
267 member_record = dict()
268 member_record['status'] = 'alive'
269 member_record['host_address'] = self.external_host_address
270 return member_record
271
@inlineCallbacks
def _assert_membership_record_valid(self):
    """
    Check that the membership record exists and is held by the current
    session.
    :return: True when valid; False on timeout, on a missing or foreign
        record, or when any exception occurs during the check.
    """
    try:
        log.info('membership-record-before')
        is_timeout, (_, record) = yield \
            self.coordinator_get_with_timeout(
                key=self.membership_record_key,
                index=0,
                timeout=5)
        if is_timeout:
            returnValue(False)

        log.info('membership-record-after', record=record)
        owned_by_us = (record is not None and
                       'Session' in record and
                       record['Session'] == self.session_id)
        if owned_by_us:
            returnValue(True)

        log.info('membership-record-change-detected',
                 old_session=self.session_id,
                 record=record)
        returnValue(False)
    except Exception as e:
        log.exception('membership-validation-exception', e=e)
        returnValue(False)
297
@inlineCallbacks
def _do_create_membership_record_with_retries(self):
    """
    Keep trying to PUT the membership record, acquired with the current
    session, until consul accepts it. A rejected PUT is followed by a
    backoff sleep before the next attempt.
    """
    while 1:
        log.info('recreating-membership', session=self.session_id)
        payload = dumps(self._create_membership_record_data())
        created = yield self._retry(
            'PUT',
            self.membership_record_key,
            payload,
            acquire=self.session_id)
        if created:
            log.info('new-membership-record-created',
                     session=self.session_id)
            break

        log.warn('cannot-create-membership-record')
        yield self._backoff('stale-membership-record')
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700314
def _start_session_tracking(self):
    """Schedule the session renewal loop on the next reactor iteration."""
    reactor.callLater(0, self._session_tracking_loop)
317
@inlineCallbacks
def _session_tracking_loop(self):
    """
    Keep the consul session alive: renew it periodically and, when a
    renewal does not complete within session_renewal_timeout, drop the
    session and create a fresh connection and session.

    The loop ends once shutting_down is set; it is rescheduled after an
    unexpected error only while the coordinator is still running, like
    the membership and leadership loops.
    """

    @inlineCallbacks
    def _redo_session():
        log.info('_redo_session-before')
        yield self._delete_session()

        # Create a new consul connection/session; the TTL comes from
        # self.session_time_to_live
        try:
            self.consul = Consul(host=self.host, port=self.port)
            self.session_id = yield self.consul.session.create(
                behavior='release',
                ttl=self.session_time_to_live,
                lock_delay=1)
            log.info('new-consul-session', session=self.session_id)

        except Exception as e:
            log.exception('could-not-create-a-consul-session', e=e)

    @inlineCallbacks
    def _renew_session(m_callback):
        try:
            log.debug('_renew_session-before')
            # Remember which client issued the renew; self.consul may be
            # replaced by _redo_session while we wait.
            consul_ref = self.consul
            result = yield consul_ref.session.renew(
                session_id=self.session_id)
            log.info('just-renewed-session', result=result)
            if not m_callback.called:
                # Triggering callback will cancel the timeout timer
                log.info('trigger-callback-to-cancel-timout-timer')
                m_callback.callback(result)
            else:
                # Timeout event has already been called. Just ignore
                # this event
                log.info('renew-called-after-timout',
                         new_consul_ref=self.consul,
                         old_consul_ref=consul_ref)
        except Exception as e:
            # Let the invoking method receive a timeout
            log.exception('could-not-renew-session', e=e)

    try:
        # Stop renewing (and, on timeout, recreating) the session once
        # stop() has flagged shutdown and destroyed it.
        while not self.shutting_down:
            log.debug('session-tracking-start')
            rcvd = DeferredWithTimeout(
                timeout=self.session_renewal_timeout)
            # Not yielded on purpose: completion is observed via rcvd
            _renew_session(rcvd)
            try:
                _ = yield rcvd
            except TimeOutError as e:
                log.info('session-renew-timeout', e=e)
                # Redo the session
                yield _redo_session()
            except Exception as e:
                log.exception('session-renew-exception', e=e)
            else:
                log.debug('successfully-renewed-session')

            # Async sleep before the next session tracking
            yield asleep(self.session_renewal_loop_delay)

    except Exception as e:
        log.exception('renew-exception', e=e)
    finally:
        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.session_renewal_loop_delay,
                              self._session_tracking_loop)
385
def _start_leader_tracking(self):
    """Schedule the leadership tracking loop on the next reactor iteration."""
    reactor.callLater(0, self._leadership_tracking_loop)
388
@inlineCallbacks
def _leadership_tracking_loop(self):
    """
    One round of leader election followed by watching the leader key.

    Tries to acquire the leader key with our session, reads the key
    back to decide whether we lead or follow, then polls the key until
    it changes. Whatever way the round ends, the loop is rescheduled
    after tracking_loop_delay unless the coordinator is shutting down.
    """
    try:
        # Attempt to acquire the leadership lock. True indicates
        # success; False indicates there is already a leader, whose
        # instance id is then the value under the leader key
        # (service/voltha/leader).
        log.info('leadership-attempt-before')
        acquired = yield self._retry('PUT',
                                     self.leader_prefix,
                                     self.instance_id,
                                     acquire=self.session_id)
        log.info('leadership-attempt-after')

        # Read the key back before trusting the result: seeing our own
        # session id is the proof, and the returned index lets us track
        # later changes reliably. The record can be None if the key was
        # wiped administratively in the meantime.
        index, record = yield self._retry('GET', self.leader_prefix)
        log.info('leader-prefix',
                 i_am_leader=acquired, index=index, record=record)

        if record is not None:
            if acquired is not True:
                # Somebody else holds the lock; its value is their id
                yield self._assert_nonleadership(record['Value'])
            elif record['Session'] == self.session_id:
                yield self._assert_leadership()
            # otherwise: confusion; need to retry leadership

        # If the record was None we fall straight through and try for
        # leadership again on the next round.
        previous = record
        while previous is not None:
            # Returns when the leader key is updated, or after the 5
            # second wait expires.
            timed_out, (new_index, current) = yield \
                self.coordinator_get_with_timeout(
                    key=self.leader_prefix,
                    index=index,
                    timeout=5)
            # A timeout means either lost connectivity to consul or no
            # change to the key. Do nothing.
            if timed_out:
                continue

            # On timeout the returned index is None; here it was not a
            # timeout, so the index is a valid one.
            index = new_index

            if current is None or current != previous:
                log.info('leader-key-change',
                         index=index, updated=current, last=previous)
                # Leadership has changed or been vacated (or forcefully
                # removed). If we were the leader, step down before
                # going for election again.
                if self.i_am_leader:
                    log.info('leaving-leaderdhip',
                             leader=self.instance_id)
                    yield self._assert_nonleadership(self.instance_id)

                break
            previous = current

    except Exception as e:
        log.exception('unexpected-error-leader-trackin', e=e)

    finally:
        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.tracking_loop_delay,
                              self._leadership_tracking_loop)
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700467
@inlineCallbacks
def _assert_leadership(self):
    """
    (Re-)assert leadership. Does nothing if we already are the leader;
    otherwise records ourselves as leader and runs the gained-leadership
    hook.
    """
    if self.i_am_leader:
        return
    self.i_am_leader = True
    self._set_leader_id(self.instance_id)
    yield self._just_gained_leadership()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700475
@inlineCallbacks
def _assert_nonleadership(self, leader_id):
    """
    (Re-)assert non-leader role.
    :param leader_id: instance id to record as the current leader
    """
    # The leader id is refreshed unconditionally
    self._set_leader_id(leader_id)

    if not self.i_am_leader:
        return
    # We were leading until now: step down
    self.i_am_leader = False
    yield self._just_lost_leadership()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700486
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700487 def _set_leader_id(self, leader_id):
488 self.leader_id = leader_id
489 deferreds, self.wait_for_leader_deferreds = \
490 self.wait_for_leader_deferreds, []
491 for d in deferreds:
492 d.callback(leader_id)
493
def _just_gained_leadership(self):
    """
    Hook run when leadership is gained: create the Leader and start it.
    :return: whatever Leader.start() returns.
    """
    log.info('became-leader')
    leader = Leader(self)
    self.leader = leader
    return leader.start()
Zsolt Haraszti1420def2016-09-18 00:07:31 -0700498
def _just_lost_leadership(self):
    """
    Hook run when leadership is lost: halt the leader.
    :return: whatever _halt_leader() returns.
    """
    log.info('lost-leadership')
    return self._halt_leader()
502
503 def _halt_leader(self):
khenaidoob1602a32017-07-27 16:59:52 -0400504 if self.leader:
505 d = self.leader.stop()
506 self.leader = None
Zsolt Harasztia3410312016-09-18 23:29:04 -0700507 return d
Zsolt Harasztie060a7d2016-09-16 11:08:24 -0700508
def get_consul(self):
    """
    :return: the consul client currently held in self.consul.
    """
    client = self.consul
    return client
511
@inlineCallbacks
def _retry(self, operation, *args, **kw):
    """
    Run a consul operation, retrying (with backoff) until it succeeds.

    :param operation: 'GET', 'PUT' or 'DELETE' for the matching consul
        kv call, or a callable that is invoked as operation(*args, **kw)
    :param args: positional arguments passed to the operation
    :param kw: keyword arguments passed to the operation; for 'PUT', an
        'acquire' value differing from the current session id is
        replaced by the current session id
    :return: the result of the operation.

    Every exception is treated as retryable, so this only returns on
    success.
    """
    while 1:
        try:
            consul = yield self.get_consul()
            log.info('start', operation=operation, args=args)
            if operation == 'GET':
                result = yield consul.kv.get(*args, **kw)
            elif operation == 'PUT':
                # The session may have been recreated since the caller
                # captured its id; always lock with the live session.
                if 'acquire' in kw and kw['acquire'] != self.session_id:
                    log.info('updating-session-in-put-operation',
                             old_session=kw['acquire'],
                             new_session=self.session_id)
                    kw['acquire'] = self.session_id
                result = yield consul.kv.put(*args, **kw)
            elif operation == 'DELETE':
                result = yield consul.kv.delete(*args, **kw)
            else:
                # Default case - consider operation as a function call
                result = yield operation(*args, **kw)
            self._clear_backoff()
            break
        # The handlers below log the current session id; the original
        # code logged self.consul.Session, which is a class object
        # rather than the session in use.
        except ConsulException as e:
            log.exception('consul-not-up',
                          operation=operation,
                          args=args,
                          session=self.session_id,
                          e=e)
            yield self._backoff('consul-not-up')
        except ConnectionError as e:
            log.exception('cannot-connect-to-consul',
                          operation=operation,
                          args=args,
                          session=self.session_id,
                          e=e)
            yield self._backoff('cannot-connect-to-consul')
        except DNSLookupError:
            log.info('dns-lookup-failed', operation=operation, args=args,
                     host=self.host)
            yield self._backoff('dns-lookup-failed')
        except StaleMembershipEntryException as e:
            log.exception('stale-membership-record-in-the-way',
                          operation=operation,
                          args=args,
                          session=self.session_id,
                          e=e)
            yield self._backoff('stale-membership-record-in-the-way')
        except Exception as e:
            # Errors while shutting down are retried silently
            if not self.shutting_down:
                log.exception(e)
            yield self._backoff('unknown-error')

    log.info('end', operation=operation, args=args)
    returnValue(result)
khenaidood6e0e802017-08-29 19:55:44 -0400569
@inlineCallbacks
def coordinator_get_with_timeout(self, key, timeout, **kw):
    """
    Query consul with a timeout
    :param key: Key to query
    :param timeout: timeout value
    :param kw: additional key-value params, forwarded to the GET
    :return: (is_timeout, (index, result)). On timeout, or on any
        error, this is (True, (None, None)).
    """

    @inlineCallbacks
    def _get(key, m_callback):
        # Runs in the background; the outcome is delivered through
        # m_callback unless it has already fired (timed out).
        try:
            (index, result) = yield self._retry('GET', key, **kw)
            if not m_callback.called:
                log.debug('got-result-cancelling-timer')
                m_callback.callback((index, result))
        except Exception as e:
            log.exception('got-exception', e=e)

    try:
        rcvd = DeferredWithTimeout(timeout=timeout)
        # Not yielded on purpose: we wait on rcvd, which carries the
        # timeout, rather than on the GET itself.
        _get(key, rcvd)
        try:
            result = yield rcvd
        except TimeOutError:
            log.debug('timeout-or-no-data-change', consul_key=key)
        except Exception as e:
            log.exception('exception', e=e)
        else:
            log.debug('result-received', result=result)
            returnValue((False, result))
    except Exception as e:
        log.exception('exception', e=e)

    returnValue((True, (None, None)))
604 returnValue((True, (None, None)))