blob: 62bf1d542eaca7799ed8a29e94e4164b624c909e [file] [log] [blame]
Zsolt Harasztia3410312016-09-18 23:29:04 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztia3410312016-09-18 23:29:04 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070017import re
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070018
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070019from hash_ring import HashRing
Zsolt Harasztia3410312016-09-18 23:29:04 -070020from structlog import get_logger
21from twisted.internet import reactor
22from twisted.internet.base import DelayedCall
khenaidoo5431e4c2017-08-17 15:05:40 -040023from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
khenaidoo032d3302017-06-09 14:50:04 -040024from simplejson import dumps, loads
Zsolt Harasztia3410312016-09-18 23:29:04 -070025
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070026from common.utils.asleep import asleep
khenaidoo08d48d22017-06-29 19:42:49 -040027from common.utils.id_generation import get_next_core_id
Zsolt Harasztia3410312016-09-18 23:29:04 -070028
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070029log = get_logger()
30
Zsolt Harasztia3410312016-09-18 23:29:04 -070031
class ConfigMappingException(Exception):
    """Raised when the core-store assignment record cannot be created."""
34
35
class Leader(object):
    """
    A single instance of this object shall exist across the whole cluster.
    This is guaranteed by the coordinator which instantiates this class
    only when it secured the leadership lock, as well as calling the stop()
    method in cases it loses the leadership lock.
    """

    # Regex templates. The ones containing '%s' are completed with one of
    # the coordinator's key prefixes in __init__. Raw strings keep the '\.'
    # escapes literal instead of relying on unknown-escape pass-through.
    ID_EXTRACTOR = r'^(%s)([^/]+)$'
    CORE_STORE_KEY_EXTRACTOR = r'^%s(?P<core_store_id>[^/]+)/root$'
    CORE_NUMBER_EXTRACTOR = r'^.*\.([0-9]+)\..*$'
    START_TIMESTAMP_EXTRACTOR = r'^.*\..*\..*_([0-9]+)$'
    ASSIGNMENT_ID_EXTRACTOR = r'^(%s)([^/]+)/core_store$'

    # Public methods:

    def __init__(self, coordinator):
        """
        Initialise leader state and pre-compile the key-matching regexes.

        :param coordinator: object providing the KV accessors, the key
            prefixes and the ``leader_config`` mapping used by this class.
        """
        self.coord = coordinator
        self.halted = False
        self.soak_time = 3  # soak till membership/workload changes settle

        self.workload = []
        self.members = []
        self.core_store_ids = []
        self.core_store_assignment = None

        self.reassignment_soak_timer = None
        self.core_store_reassignment_soak_timer = None

        self.workload_id_match = re.compile(
            self.ID_EXTRACTOR % self.coord.workload_prefix).match

        self.member_id_match = re.compile(
            self.ID_EXTRACTOR % self.coord.membership_prefix).match

        self.core_data_id_match = re.compile(
            self.CORE_STORE_KEY_EXTRACTOR % self.coord.core_store_prefix).match

        self.core_match = re.compile(self.CORE_NUMBER_EXTRACTOR).match
        self.timestamp_match = re.compile(
            self.START_TIMESTAMP_EXTRACTOR).match

        self.assignment_id_match = re.compile(
            self.ASSIGNMENT_ID_EXTRACTOR % self.coord.assignment_prefix).match

        # Seconds to sleep after a member-tracking error. Look the setting
        # up by its *name*; fall back to 1 second when it is not configured.
        self.members_tracking_sleep_to_prevent_flood = \
            self.coord.leader_config.get(
                'members_track_error_to_prevent_flood', 1)

    @inlineCallbacks
    def start(self):
        """Start leadership duties by scheduling membership tracking."""
        log.debug('starting')
        yield self._start_tracking_assignments()
        log.info('started')

    def stop(self):
        """Suspend leadership duties immediately"""
        log.debug('stopping')
        self.halted = True

        # any active cancellations, releases, etc., should happen here
        self._cancel_timer(self.reassignment_soak_timer)
        self._cancel_timer(self.core_store_reassignment_soak_timer)

        log.info('stopped')

    # Private methods:

    @staticmethod
    def _cancel_timer(timer):
        """Cancel `timer` if it is a DelayedCall that is still pending."""
        # active() is False once the call has either fired or been
        # cancelled, so cancel() can no longer raise AlreadyCalled or
        # AlreadyCancelled here.
        if isinstance(timer, DelayedCall) and timer.active():
            timer.cancel()

    def _start_tracking_assignments(self):
        """
        We must track both the cluster member list as well as the workload
        list. Upon change in either, we must rerun our sharding algorithm
        and reassign work as/if needed.
        """
        reactor.callLater(0, self._track_members, 0)

    @inlineCallbacks
    def _get_core_store_mappings(self):
        """
        Load the core-store assignment record into
        ``self.core_store_assignment``, creating an empty record first when
        none exists. Failures are logged, not propagated.
        """
        try:
            # Get the mapping record
            (_, mappings) = yield self.coord.kv_get(
                self.coord.core_store_assignment_key, recurse=True)

            if not mappings:
                # Key has not been created yet: create it with an empty
                # dictionary value
                result = yield self.coord.kv_put(
                    self.coord.core_store_assignment_key,
                    dumps(dict()))
                if not result:
                    raise ConfigMappingException(
                        self.coord.core_store_assignment_key)

                # Ensure the record was created
                (_, mappings) = yield self.coord.kv_get(
                    self.coord.core_store_assignment_key, recurse=True)
                if not mappings:
                    raise ConfigMappingException(
                        self.coord.core_store_assignment_key)

            self.core_store_assignment = loads(mappings[0]['Value'])

        except Exception as e:
            log.exception('error', e=e)

    @inlineCallbacks
    def _update_core_store_references(self):
        """
        Refresh ``self.core_store_ids`` from the keys under the core-store
        prefix and reload the core-store assignment record. Failures are
        logged, not propagated.
        """
        try:
            # Get the current set of configs keys
            (_, results) = yield self.coord.kv_get(
                self.coord.core_store_prefix, recurse=False, keys=True)

            matches = (self.core_data_id_match(e) for e in results or [])
            core_ids = [m.group('core_store_id')
                        for m in matches if m is not None]

            self.core_store_ids = core_ids

            # Update the config mapping. This must be yielded: the call
            # returns a Deferred and the assignment is only refreshed once
            # it has fired.
            yield self._get_core_store_mappings()

            log.debug('core-data', core_ids=core_ids,
                      assignment=self.core_store_assignment)

        except Exception as e:
            log.exception('error-update-store', e=e)

    def _sanitize_member_list(self, members):
        """
        Return `members` with duplicate voltha instances removed.

        Duplicates are detected using the voltha number embedded in the
        member id; among duplicates the entry with the latest start
        timestamp (also embedded in the member id) is kept. This is
        meaningful only in the swarm cluster. In a non-cluster environment
        the member id is formatted differently; extraction then fails and
        the member list is returned as is.

        :param members: list of ``{'id': ..., 'host': ...}`` dicts
        :return: `members` itself when nothing had to be removed, otherwise
            a new list of ``{'host': ..., 'id': ...}`` dicts in first-seen
            order of the voltha numbers
        """
        try:
            unique_members = {}
            first_seen_order = []
            duplicate_found = False
            log.info('members', members=members)
            for member in members:
                log.info('member', member=member)
                # Extract the swarm assigned number of the voltha instance
                voltha_number = self.core_match(member['id']).group(1)
                # Compare timestamps numerically; as strings, values of
                # different length would order incorrectly.
                timestamp = int(self.timestamp_match(member['id']).group(1))
                candidate = {'id': member['id'],
                             'timestamp': timestamp,
                             'host': member['host']}
                if voltha_number not in unique_members:
                    first_seen_order.append(voltha_number)
                    unique_members[voltha_number] = candidate
                else:
                    # Any repeat means the list must be rebuilt, whichever
                    # of the two entries turns out to be the newest.
                    duplicate_found = True
                    known = unique_members[voltha_number]
                    if known['timestamp'] < timestamp:
                        unique_members[voltha_number] = candidate

            if not duplicate_found:
                return members

            return [{'host': unique_members[number]['host'],
                     'id': unique_members[number]['id']}
                    for number in first_seen_order]
        except Exception:
            # Best effort: ids that do not follow the swarm format end up
            # here and the list is left untouched.
            return members

    @inlineCallbacks
    def _is_temporal_state(self, members):
        """
        Fire with True when the membership looks transient, i.e. there are
        fewer members than previously stored core assignments, or when the
        assignments cannot be evaluated; fire with False otherwise.

        :param members: current member list; may be None
        """
        try:
            # First get the current core assignments
            (_, results) = yield self.coord.kv_get(
                self.coord.assignment_prefix,
                recurse=True)

            log.debug('core-assignments', assignment=results)
            if not results:
                # If there are no current assignments then we are starting
                # the system. In this case we should keep processing
                returnValue(False)

            old_assignment = [
                {'id': self.assignment_id_match(e['Key']).group(2),
                 'core': e['Value']}
                for e in results]

            # Tackle the simplest scenario - #members >= #old_assignment
            # (a None member list counts as empty)
            if len(members or []) >= len(old_assignment):
                returnValue(False)

            # Everything else is a temporal state
            log.info('temporal-state-detected', members=members,
                     old_assignments=old_assignment)

            returnValue(True)
        except Exception as e:
            # returnValue() signals through a BaseException subclass, so it
            # is not intercepted by this handler.
            log.exception('temporal-state-error', e=e)
            returnValue(True)

    @inlineCallbacks
    def _track_members(self, index):
        """
        Watch the membership prefix starting from `index`, react to any
        change, and reschedule itself for as long as this leader is not
        halted.

        :param index: index handed to the blocking membership query
        """
        previous_index = index
        try:
            log.info('member-tracking-before')
            is_timeout, (tmp_index, results) = yield \
                self.coord.consul_get_with_timeout(
                    key=self.coord.membership_prefix,
                    recurse=True,
                    index=index,
                    timeout=10)
            # Check whether we are still the leader - a new regime may be in
            # place by the time we see a membership update
            if self.halted:
                log.info('I am no longer the leader')
                return

            if is_timeout:
                log.debug('timeout-or-no-membership-changed')
                return

            # This can happen if consul went down and came back with no data
            if not results:
                log.error('no-active-members')
                # Bail out of leadership and go for an early election
                self.coord._just_lost_leadership()
                return

            # After timeout event the index returned from
            # consul_get_with_timeout is None. If we are here it's not a
            # timeout, therefore the index is a valid one.
            index = tmp_index

            log.info('membership-tracking-data', index=index, results=results)

            if previous_index != index:
                log.info('membership-updated',
                         previous_index=previous_index, index=index)

                # Rebuild the membership, if any

                # Only members with valid session are considered active
                members = [{'id': self.member_id_match(e['Key']).group(2),
                            'host': loads(e['Value'])['host_address']}
                           for e in results if 'Session' in e]

                if members:
                    updated_members = self._sanitize_member_list(members)
                else:
                    updated_members = None

                log.info('active-members', active_members=members,
                         sanitized_members=updated_members)

                # Check if we are in a temporal state. If true wait for the
                # next membership changes
                temporal_state = yield self._is_temporal_state(updated_members)
                if temporal_state:
                    log.info('temporal-state-detected')
                    pass  # Wait for next member list change
                elif updated_members != self.members:
                    # The member list differs from the one we hold:
                    # update the current set of config
                    yield self._update_core_store_references()
                    log.info('membership-changed',
                             prev_members=self.members,
                             curr_members=updated_members,
                             core_store_mapping=self.core_store_assignment)
                    self.members = updated_members
                    self._restart_core_store_reassignment_soak_timer()
                else:
                    log.debug('no-membership-change', index=index)

        except Exception as e:
            log.exception('members-track-error', e=e)
            # to prevent flood
            yield asleep(self.members_tracking_sleep_to_prevent_flood)
        finally:
            # Runs on every exit path, including the early returns above
            if not self.halted:
                reactor.callLater(1, self._track_members, index)

    def _restart_reassignment_soak_timer(self):
        """(Re)arm the workload reassignment soak timer."""
        self._cancel_timer(self.reassignment_soak_timer)

        # NOTE(review): _reassign_work is not defined in the visible part
        # of this class - confirm it exists before relying on this timer.
        self.reassignment_soak_timer = reactor.callLater(
            self.soak_time, self._reassign_work)

    def _restart_core_store_reassignment_soak_timer(self):
        """(Re)arm the core-store reassignment soak timer."""
        self._cancel_timer(self.core_store_reassignment_soak_timer)

        self.core_store_reassignment_soak_timer = reactor.callLater(
            self.soak_time, self._reassign_core_stores)

    @inlineCallbacks
    def _reassign_core_stores(self):
        """
        Recompute the core-store-id to member mapping from the current
        member list, persist it, publish the core store id of every newly
        added member and remove the keys of members that went away. On
        failure the error is logged and the soak timer is re-armed.
        """

        def _get_core_data_id_from_instance(instance_name):
            for core_id, instance in self.core_store_assignment.items():
                if instance and instance['id'] == instance_name:
                    return core_id

        try:
            # _track_members stores None when no member has a valid
            # session; treat that as an empty member list.
            members = self.members or []

            log.info('core-members', curr_members=self.members,
                     prev_members=self.core_store_assignment)

            # 1. clear the mapping for instances that are no longer running
            updated_mapping = dict()
            existing_active_config_members = set()
            cleared_config_ids = set()
            inactive_members = set()
            if self.core_store_assignment:
                for core_id, instance in self.core_store_assignment.items():
                    if instance not in members:
                        updated_mapping[core_id] = None
                        cleared_config_ids.add(core_id)
                        if instance:
                            inactive_members.add(instance['id'])
                    else:
                        updated_mapping[core_id] = instance
                        existing_active_config_members.add(instance['id'])

            # 2. Update the mapping with the new set
            current_id = max(self.core_store_assignment) \
                if self.core_store_assignment else '0000'
            for instance in members:
                if instance['id'] not in existing_active_config_members:
                    # Add the member to the config map
                    if cleared_config_ids:
                        # There is an empty slot
                        next_id = cleared_config_ids.pop()
                        updated_mapping[next_id] = instance
                    else:
                        # There are no empty slot, create new ids
                        current_id = get_next_core_id(current_id)
                        updated_mapping[current_id] = instance

            # NOTE(review): the in-memory mapping is replaced before the
            # writes below succeed; a retry after a partial failure starts
            # from the new mapping and will not re-issue the skipped writes.
            self.core_store_assignment = updated_mapping
            log.info('updated-assignment',
                     core_store_assignment=self.core_store_assignment,
                     inactive_members=inactive_members)

            # 3. save the mapping into consul
            yield self.coord.kv_put(self.coord.core_store_assignment_key,
                                    dumps(self.core_store_assignment))

            # 4. Assign the new workload to the newly created members
            curr_members_set = set([m['id'] for m in members])
            new_members = curr_members_set.difference(
                existing_active_config_members)
            for new_member_id in new_members:
                yield self.coord.kv_put(
                    self.coord.assignment_prefix
                    + new_member_id + '/' +
                    self.coord.core_storage_suffix,
                    _get_core_data_id_from_instance(new_member_id))

            # 5. Remove non-existent members
            for member_id in inactive_members:
                yield self.coord.kv_delete(
                    self.coord.assignment_prefix + member_id, recurse=True)
                yield self.coord.kv_delete(
                    self.coord.membership_prefix + member_id,
                    recurse=True)

        except Exception as e:
            log.exception('config-reassignment-failure', e=e)
            # Retry after the soak time, but only while still the leader;
            # a stopped leader must not keep writing assignments.
            if not self.halted:
                self._restart_core_store_reassignment_soak_timer()