blob: 54f1117a5f52de16a11767cd6d22ae85cc7cf8e8 [file] [log] [blame]
Zsolt Harasztia3410312016-09-18 23:29:04 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztia3410312016-09-18 23:29:04 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070017import re
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070018
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070019from hash_ring import HashRing
Zsolt Harasztia3410312016-09-18 23:29:04 -070020from structlog import get_logger
21from twisted.internet import reactor
22from twisted.internet.base import DelayedCall
khenaidoo5431e4c2017-08-17 15:05:40 -040023from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
khenaidoo032d3302017-06-09 14:50:04 -040024from simplejson import dumps, loads
Zsolt Harasztia3410312016-09-18 23:29:04 -070025
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070026from common.utils.asleep import asleep
khenaidoo08d48d22017-06-29 19:42:49 -040027from common.utils.id_generation import get_next_core_id
Zsolt Harasztia3410312016-09-18 23:29:04 -070028
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070029log = get_logger()
30
Zsolt Harasztia3410312016-09-18 23:29:04 -070031
class ConfigMappingException(Exception):
    """Raised when the core store assignment record cannot be created."""
34
35
Zsolt Harasztia3410312016-09-18 23:29:04 -070036class Leader(object):
37 """
38 A single instance of this object shall exist across the whole cluster.
39 This is guaranteed by the coordinator which instantiates this class
40 only when it secured the leadership lock, as well as calling the halt()
    method in cases it loses the leadership lock.
42 """
43
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070044 ID_EXTRACTOR = '^(%s)([^/]+)$'
Richard Jankowski4ea26632018-05-14 17:45:38 -040045 CORE_STORE_KEY_EXTRACTOR = '^%s/(?P<core_store_id>[^/]+)/root$'
46 START_TIMESTAMP_EXTRACTOR = '^.*_([0-9]+)$'
khenaidoo5431e4c2017-08-17 15:05:40 -040047 ASSIGNMENT_ID_EXTRACTOR = '^(%s)([^/]+)/core_store$'
Zsolt Harasztia3410312016-09-18 23:29:04 -070048
Zsolt Harasztia3410312016-09-18 23:29:04 -070049 # Public methods:
50
def __init__(self, coordinator):
    """
    Initialise leader state and pre-compile the key matchers.

    :param coordinator: the owning coordinator. Its ``workload_prefix``,
        ``membership_prefix``, ``core_store_prefix``, ``assignment_prefix``,
        ``container_name_regex`` and ``leader_config`` attributes are read
        here.
    """
    self.coord = coordinator
    self.halted = False
    self.soak_time = 3  # soak till membership/workload changes settle

    self.workload = []
    self.members = []
    self.core_store_ids = []
    self.core_store_assignment = None

    self.reassignment_soak_timer = None
    self.core_store_reassignment_soak_timer = None

    # Each *_match attribute is the bound ``match`` of a compiled pattern,
    # so callers get a match object (or None) straight from a key string.
    self.workload_id_match = re.compile(
        self.ID_EXTRACTOR % self.coord.workload_prefix).match

    self.member_id_match = re.compile(
        self.ID_EXTRACTOR % self.coord.membership_prefix).match

    self.core_data_id_match = re.compile(
        self.CORE_STORE_KEY_EXTRACTOR % self.coord.core_store_prefix).match

    self.core_match = re.compile(self.coord.container_name_regex).match
    self.timestamp_match = re.compile(self.START_TIMESTAMP_EXTRACTOR).match

    self.assignment_id_match = re.compile(
        self.ASSIGNMENT_ID_EXTRACTOR % self.coord.assignment_prefix).match

    # Delay handed to asleep() after a membership tracking error. Look the
    # setting up by its name and fall back to 1 when it is not configured
    # (the previous code used the configured *value* as the lookup key, so
    # the setting was effectively ignored).
    self.members_tracking_sleep_to_prevent_flood = \
        self.coord.leader_config.get(
            'members_track_error_to_prevent_flood', 1)
83
@inlineCallbacks
def start(self):
    """Take up leadership duties by starting assignment tracking."""
    log.debug('starting')
    # NOTE: workload validation (self._validate_workload) is disabled here.
    tracking = self._start_tracking_assignments()
    yield tracking
    log.info('started')
Zsolt Harasztia3410312016-09-18 23:29:04 -070090
def stop(self):
    """
    Suspend leadership duties immediately.

    Marks the leader as halted and cancels whichever soak timers are still
    pending. Calling it more than once is harmless.
    """
    log.debug('stopping')
    self.halted = True

    # any active cancellations, releases, etc., should happen here
    for timer in (self.reassignment_soak_timer,
                  self.core_store_reassignment_soak_timer):
        # active() is False once the call has fired *or* been cancelled.
        # Testing only ``called`` would re-cancel an already cancelled
        # timer on a second stop(), which raises AlreadyCancelled.
        if isinstance(timer, DelayedCall) and timer.active():
            timer.cancel()

    log.info('stopped')
106
Zsolt Harasztia3410312016-09-18 23:29:04 -0700107 # Private methods:
108
Zsolt Harasztia3410312016-09-18 23:29:04 -0700109
def _start_tracking_assignments(self):
    """
    Kick off membership tracking.

    Schedules the first _track_members call on the reactor, starting from
    index 0. Membership changes seen by that tracker are what drive the
    core store reassignment.
    """
    initial_index = 0
    reactor.callLater(0, self._track_members, initial_index)
117
@inlineCallbacks
def _get_core_store_mappings(self):
    """
    Load the core store assignment record into self.core_store_assignment.

    If the record does not exist yet it is created with an empty dictionary
    and then read back. Any failure is logged and swallowed, leaving
    self.core_store_assignment unchanged.
    """
    try:
        key = self.coord.core_store_assignment_key

        # Get the mapping record
        (_, mappings) = yield self.coord.kv_get(key, recurse=True)
        if mappings:
            self.core_store_assignment = loads(mappings[0]['Value'])
            return

        # Key has not been created yet: create it with an empty
        # dictionary value
        result = yield self.coord.kv_put(key, dumps(dict()))
        if not result:
            # Identify the record that could not be written. The previous
            # code referenced self.instance_id, which this class does not
            # define, so the real failure was masked by an AttributeError.
            raise ConfigMappingException(key)

        # Ensure the record was created
        (_, mappings) = yield self.coord.kv_get(key, recurse=True)
        self.core_store_assignment = loads(mappings[0]['Value'])

    except Exception as e:
        log.exception('error', e=e)
khenaidoo032d3302017-06-09 14:50:04 -0400144
@inlineCallbacks
def _update_core_store_references(self):
    """
    Refresh self.core_store_ids and self.core_store_assignment.

    The ids are extracted from the keys found under the core store prefix;
    keys that do not match the expected pattern are ignored. Failures are
    logged and swallowed.
    """
    try:
        # Get the current set of configs keys
        (_, results) = yield self.coord.kv_get(
            self.coord.core_store_prefix, recurse=False, keys=True)

        matches = (self.core_data_id_match(key) for key in results or [])
        core_ids = [m.group(1) for m in matches if m is not None]

        self.core_store_ids = core_ids

        # Update the config mapping. This must be waited on: the call
        # returns a Deferred, and without the yield the assignment logged
        # below (and seen by our caller) could still be the stale one.
        yield self._get_core_store_mappings()

        log.debug('core-data', core_ids=core_ids,
                  assignment=self.core_store_assignment)

    except Exception as e:
        log.exception('error-update-store', e=e)
khenaidoo032d3302017-06-09 14:50:04 -0400165
def _sanitize_member_list(self, members):
    """
    Drop duplicate entries for the same voltha instance number.

    The instance number and start timestamp are both extracted from the
    member id. When several members share an instance number, only the one
    with the highest timestamp is kept. This is meaningful only in a
    clustered environment (e.g. Docker swarm or Kubernetes); elsewhere the
    member id is formatted differently, extraction fails, and the list is
    returned untouched.

    :param members: list of dicts carrying at least 'id' and 'host'
    :return: ``members`` itself when nothing had to be removed or the ids
        could not be parsed; otherwise a new list of {'host', 'id'} dicts,
        one per instance number, in order of first appearance.
    """
    try:
        latest = {}        # voltha number -> newest entry seen so far
        first_seen = []    # voltha numbers, in order of first appearance
        duplicate_found = False
        log.info('members', members=members)
        for member in members:
            log.info('member', member=member)
            # Extract the swarm assigned number of the voltha instance
            voltha_number = self.core_match(member['id']).group(1)
            # Compare timestamps numerically: as strings, '999' would
            # sort after '1000'.
            timestamp = int(self.timestamp_match(member['id']).group(1))
            candidate = {'id': member['id'],
                         'timestamp': timestamp,
                         'host': member['host']}
            if voltha_number not in latest:
                first_seen.append(voltha_number)
                latest[voltha_number] = candidate
                continue

            # Any repeat means the input list must be rewritten, whether
            # or not this particular entry turns out to be the newest.
            duplicate_found = True
            if latest[voltha_number]['timestamp'] < timestamp:
                latest[voltha_number] = candidate

        if not duplicate_found:
            return members

        return [{'host': latest[number]['host'],
                 'id': latest[number]['id']}
                for number in first_seen]
    except Exception as e:
        log.exception('extraction-error', e=e)
        return members
208
@inlineCallbacks
def _is_temporal_state(self, members):
    """
    Decide whether the membership seen right now is only transitional.

    The stored core assignments are compared with ``members``. The result
    is False when there are no stored assignments, or when there are at
    least as many members as stored assignments. It is True otherwise,
    including when the check itself fails.

    :param members: current member list, or None
    :return: (via returnValue) True when the state is temporal
    """
    try:
        # First get the current core assignments
        (_, results) = yield self.coord.kv_get(
            self.coord.assignment_prefix,
            recurse=True)

        log.debug('core-assignments', assignment=results)

        # Nothing stored at all: not a temporal state
        if not results:
            returnValue(False)

        old_assignment = []
        for entry in results:
            old_assignment.append(
                {'id': self.assignment_id_match(entry['Key']).group(2),
                 'core': entry['Value']})

        # If there are no curr_assignments then we are starting the
        # system. In this case we should keep processing
        if not old_assignment:
            returnValue(False)

        # Tackle the simplest scenario - #members >= #old_assignment
        enough_members = (members is not None and
                          len(members) >= len(old_assignment))
        if enough_members:
            returnValue(False)

        # Everything else is a temporal state
        log.info('temporal-state-detected', members=members,
                 old_assignments=old_assignment)

        returnValue(True)
    except Exception as e:
        log.exception('temporal-state-error', e=e)
        returnValue(True)
243
@inlineCallbacks
def _track_members(self, index):
    """
    Run one round of membership tracking, then schedule the next one.

    Queries the membership prefix starting at ``index``. When the active
    member list differs from self.members (and the cluster is not in a
    temporal state), the core store references are refreshed, self.members
    is replaced and the core store reassignment soak timer is restarted.
    Unless the leader has been halted, another round is always scheduled.

    :param index: index to pass to the membership query
    """
    starting_index = index
    try:
        log.info('member-tracking-before')
        response = yield self.coord.consul_get_with_timeout(
            key=self.coord.membership_prefix,
            recurse=True,
            index=index,
            timeout=10)
        is_timeout, (fresh_index, results) = response

        # Check whether we are still the leader - a new regime may be in
        # place by the time we see a membership update
        if self.halted:
            log.info('I am no longer the leader')
            return

        if is_timeout:
            log.debug('timeout-or-no-membership-changed')
            return

        # This can happen if consul went down and came back with no data
        if not results:
            log.error('no-active-members')
            # Bail out of leadership and go for an early election
            self.coord._just_lost_leadership()
            return

        # After timeout event the index returned from
        # consul_get_with_timeout is None. If we are here it's not a
        # timeout, therefore the index is a valid one.
        index = fresh_index

        log.info('membership-tracking-data', index=index, results=results)

        if starting_index != index:
            log.info('membership-updated',
                     previous_index=starting_index, index=index)

        # Rebuild the membership, if any.
        # Only members with valid session are considered active
        members = []
        for entry in results:
            if 'Session' not in entry:
                continue
            members.append(
                {'id': self.member_id_match(entry['Key']).group(2),
                 'host': loads(entry['Value'])['host_address']})

        updated_members = \
            self._sanitize_member_list(members) if members else None

        log.info('active-members', active_members=members,
                 sanitized_members=updated_members)

        # Check if we are in a temporal state. If true wait for the
        # next membership changes
        temporal_state = yield self._is_temporal_state(updated_members)
        if temporal_state:
            # Wait for next member list change
            log.info('temporal-state-detected')
        elif updated_members == self.members:
            log.debug('no-membership-change', index=index)
        else:
            # The membership differs from what we hold: update the
            # current set of config before recording the new members
            yield self._update_core_store_references()
            log.info('membership-changed',
                     prev_members=self.members,
                     curr_members=updated_members,
                     core_store_mapping=self.core_store_assignment)
            self.members = updated_members
            self._restart_core_store_reassignment_soak_timer()

    except Exception as e:
        log.exception('members-track-error', e=e)
        # to prevent flood
        yield asleep(self.members_tracking_sleep_to_prevent_flood)
    finally:
        if not self.halted:
            reactor.callLater(1, self._track_members, index)
Zsolt Harasztia3410312016-09-18 23:29:04 -0700324
def _restart_reassignment_soak_timer(self):
    """
    (Re)arm the work reassignment soak timer.

    A timer that is still pending is cancelled first, then a new one is
    scheduled to call self._reassign_work after self.soak_time.
    """
    # NOTE(review): _reassign_work is not defined in the visible part of
    # this class - confirm it exists before relying on this timer.
    timer = self.reassignment_soak_timer
    # active() also covers an already cancelled timer (e.g. after stop()),
    # for which cancel() would raise AlreadyCancelled.
    if isinstance(timer, DelayedCall) and timer.active():
        timer.cancel()

    self.reassignment_soak_timer = reactor.callLater(
        self.soak_time, self._reassign_work)
334
def _restart_core_store_reassignment_soak_timer(self):
    """
    (Re)arm the core store reassignment soak timer.

    A timer that is still pending is cancelled first, then a new one is
    scheduled to call self._reassign_core_stores after self.soak_time.
    """
    timer = self.core_store_reassignment_soak_timer
    # active() also covers an already cancelled timer (e.g. after stop()),
    # for which cancel() would raise AlreadyCancelled.
    if isinstance(timer, DelayedCall) and timer.active():
        timer.cancel()

    self.core_store_reassignment_soak_timer = reactor.callLater(
        self.soak_time, self._reassign_core_stores)
344
@inlineCallbacks
def _reassign_core_stores(self):
    """
    Recompute the core store assignment from self.members and persist it.

    Steps: clear the slots of instances that are no longer members, give
    every unassigned member a slot (reusing cleared slots before creating
    new ids), save the mapping, write the assignment key of each newly
    assigned member, and delete the assignment and membership keys of the
    members that went away. On failure the soak timer is restarted so the
    reassignment is retried, unless the leader has been halted.
    """

    def _get_core_data_id_from_instance(instance_name):
        # Reverse lookup: core store id currently mapped to this member id
        for store_id, instance in self.core_store_assignment.items():
            if instance and instance['id'] == instance_name:
                return store_id

    # A halted leader must not touch the assignments any more.
    if self.halted:
        return

    # _track_members stores None when no member holds a session. There is
    # nothing to assign then; iterating None below would only raise and
    # re-arm this timer over and over.
    if self.members is None:
        log.info('core-members-none-skip-reassignment')
        return

    try:
        log.info('core-members', curr_members=self.members,
                 prev_members=self.core_store_assignment)

        # 1. clear the mapping for instances that are no longer running
        updated_mapping = dict()
        existing_active_config_members = set()
        cleared_config_ids = set()
        inactive_members = set()
        if self.core_store_assignment:
            for store_id, instance in self.core_store_assignment.items():
                if instance not in self.members:
                    updated_mapping[store_id] = None
                    cleared_config_ids.add(store_id)
                    if instance:
                        inactive_members.add(instance['id'])
                else:
                    updated_mapping[store_id] = instance
                    existing_active_config_members.add(instance['id'])

        # 2. Update the mapping with the new set
        current_id = max(self.core_store_assignment) \
            if self.core_store_assignment else '0000'
        for instance in self.members:
            if instance['id'] not in existing_active_config_members:
                # Add the member to the config map
                if cleared_config_ids:
                    # There is an empty slot
                    next_id = cleared_config_ids.pop()
                    updated_mapping[next_id] = instance
                else:
                    # There are no empty slot, create new ids
                    current_id = get_next_core_id(current_id)
                    updated_mapping[current_id] = instance

        self.core_store_assignment = updated_mapping
        log.info('updated-assignment',
                 core_store_assignment=self.core_store_assignment,
                 inactive_members=inactive_members)

        # 3. save the mapping into consul
        yield self.coord.kv_put(self.coord.core_store_assignment_key,
                                dumps(self.core_store_assignment))

        # 4. Assign the new workload to the newly created members
        curr_members_set = set(m['id'] for m in self.members)
        new_members = curr_members_set.difference(
            existing_active_config_members)
        for new_member_id in new_members:
            yield self.coord.kv_put(
                self.coord.assignment_prefix
                + new_member_id + '/' +
                self.coord.core_storage_suffix,
                _get_core_data_id_from_instance(new_member_id))

        # 5. Remove non-existent members
        for member_id in inactive_members:
            yield self.coord.kv_delete(
                self.coord.assignment_prefix + member_id, recurse=True)
            yield self.coord.kv_delete(
                self.coord.membership_prefix + member_id,
                recurse=True)

    except Exception as e:
        log.exception('config-reassignment-failure', e=e)
        # Retry later, but never on behalf of a leader that was halted
        if not self.halted:
            self._restart_core_store_reassignment_soak_timer()