blob: 0dda65a5aa1cb18bd4c65960692f7444ad59543e [file] [log] [blame]
Zsolt Harasztia3410312016-09-18 23:29:04 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztia3410312016-09-18 23:29:04 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070017import re
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070018
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070019from hash_ring import HashRing
Zsolt Harasztia3410312016-09-18 23:29:04 -070020from structlog import get_logger
21from twisted.internet import reactor
22from twisted.internet.base import DelayedCall
23from twisted.internet.defer import inlineCallbacks, DeferredList
khenaidoo032d3302017-06-09 14:50:04 -040024from simplejson import dumps, loads
Zsolt Harasztia3410312016-09-18 23:29:04 -070025
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070026from common.utils.asleep import asleep
khenaidoo08d48d22017-06-29 19:42:49 -040027from common.utils.id_generation import get_next_core_id
Zsolt Harasztia3410312016-09-18 23:29:04 -070028
# Module-level structlog logger used by every log call in this file.
log = get_logger()
30
Zsolt Harasztia3410312016-09-18 23:29:04 -070031
class ConfigMappingException(Exception):
    """Signals that the core-store assignment record could not be created."""
34
35
class Leader(object):
    """
    A single instance of this object shall exist across the whole cluster.
    This is guaranteed by the coordinator which instantiates this class
    only when it secured the leadership lock, and which is expected to call
    stop() when it loses the leadership lock.

    While active, the leader watches the membership prefix in the KV store
    and keeps the core-store assignment record (core store id -> member)
    in sync with the set of live members.
    """

    # Regex templates. The '%s' placeholders are filled in __init__ with the
    # matching key prefix taken from the coordinator.
    ID_EXTRACTOR = '^(%s)([^/]+)$'
    ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
    CORE_STORE_KEY_EXTRACTOR = '^%s(?P<core_store_id>[^/]+)/root$'
    # Applied to member ids: the number between two dots, and the trailing
    # "_<digits>" start timestamp.
    CORE_NUMBER_EXTRACTOR = r'^.*\.([0-9]+)\..*$'
    START_TIMESTAMP_EXTRACTOR = r'^.*\..*\..*_([0-9]+)$'

    # Public methods:

    def __init__(self, coordinator):
        """
        :param coordinator: object providing the KV helpers (kv_get, kv_put,
            kv_delete), the key prefixes and leader_config used below.
        """
        self.coord = coordinator
        self.halted = False
        self.soak_time = 3  # soak till membership/workload changes settle

        self.workload = []
        self.members = []
        self.core_store_ids = []
        self.core_store_assignment = None

        self.reassignment_soak_timer = None
        self.core_store_reassignment_soak_timer = None

        self.workload_id_match = re.compile(
            self.ID_EXTRACTOR % self.coord.workload_prefix).match

        self.member_id_match = re.compile(
            self.ID_EXTRACTOR % self.coord.membership_prefix).match

        self.core_data_id_match = re.compile(
            self.CORE_STORE_KEY_EXTRACTOR % self.coord.core_store_prefix).match

        self.assignment_match = re.compile(
            self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match

        self.core_match = re.compile(self.CORE_NUMBER_EXTRACTOR).match
        self.timestamp_match = re.compile(
            self.START_TIMESTAMP_EXTRACTOR).match

    @inlineCallbacks
    def start(self):
        """Begin leadership duties by scheduling the membership watch."""
        log.debug('starting')
        yield self._start_tracking_assignments()
        log.info('started')

    def stop(self):
        """Suspend leadership duties immediately"""
        log.debug('stopping')
        self.halted = True

        # any active cancellations, releases, etc., should happen here
        self._cancel_timer(self.reassignment_soak_timer)
        self._cancel_timer(self.core_store_reassignment_soak_timer)

        log.info('stopped')

    # Private methods:

    @staticmethod
    def _cancel_timer(timer):
        """
        Cancel `timer` if it is a DelayedCall that is still pending.

        active() is False once the call has fired *or* been cancelled, so
        this is safe to invoke repeatedly (e.g. stop() called twice), where
        checking only `.called` would raise AlreadyCancelled.
        """
        if isinstance(timer, DelayedCall) and timer.active():
            timer.cancel()

    def _start_tracking_assignments(self):
        """
        We must track both the cluster member list as well as the workload
        list. Upon change in either, we must rerun our sharding algorithm
        and reassign work as/if needed.
        """
        reactor.callLater(0, self._track_members, 0)

    @inlineCallbacks
    def _get_core_store_mappings(self):
        """
        Load the core-store assignment record into
        self.core_store_assignment, creating it with an empty dictionary
        if it does not exist yet. Errors are logged, not propagated.
        """
        try:
            key = self.coord.core_store_assignment_key

            # Get the mapping record
            (_, mappings) = yield self.coord.kv_get(key, recurse=True)

            if not mappings:  # Key has not been created yet
                # Create the key with an empty dictionary value
                created = yield self.coord.kv_put(key, dumps(dict()))
                if not created:
                    raise ConfigMappingException(
                        'failed to create {}'.format(key))

                # Ensure the record was created
                (_, mappings) = yield self.coord.kv_get(key, recurse=True)

            self.core_store_assignment = loads(mappings[0]['Value'])

        except Exception as e:
            log.exception('error', e=e)

    @inlineCallbacks
    def _update_core_store_references(self):
        """
        Refresh self.core_store_ids from the keys under the core store
        prefix, then reload the assignment record. Errors are logged.
        """
        try:
            # Get the current set of configs keys
            (_, results) = yield self.coord.kv_get(
                self.coord.core_store_prefix, recurse=False, keys=True)

            matches = (self.core_data_id_match(e) for e in results or [])
            core_ids = [m.group(1) for m in matches if m is not None]

            self.core_store_ids = core_ids

            # Update the config mapping. This must be waited on, otherwise
            # the log below (and the caller) would see the stale assignment.
            yield self._get_core_store_mappings()

            log.debug('core-data', core_ids=core_ids,
                      assignment=self.core_store_assignment)

        except Exception as e:
            log.exception('error-update-store', e=e)

    def _sanitize_member_list(self, members):
        """
        Remove duplicate voltha instances from the member list.

        Duplicates are detected using the voltha number embedded in the
        member id; among duplicates the entry with the latest start
        timestamp (also taken from the member id) is kept. This is
        meaningful only in the swarm cluster. In a non-cluster environment
        the member id is formatted differently; extraction then fails and
        the member list is returned as is.

        :param members: list of {'id': ..., 'host': ...} dictionaries
        :return: `members` itself when it holds no duplicates (or cannot be
            parsed), otherwise a new list of {'host', 'id'} dictionaries in
            first-seen order of the voltha numbers.
        """
        try:
            latest = {}  # voltha number -> (start timestamp, member)
            order = []   # voltha numbers in first-seen order
            log.info('members', members=members)
            for member in members:
                log.info('member', member=member)
                # Extract the swarm assigned number of the voltha instance
                voltha_number = self.core_match(member['id']).group(1)
                # Compare timestamps numerically, not as strings
                timestamp = int(self.timestamp_match(member['id']).group(1))
                known = latest.get(voltha_number)
                if known is None:
                    order.append(voltha_number)
                    latest[voltha_number] = (timestamp, member)
                elif known[0] < timestamp:
                    # This member has the latest timestamp: it wins
                    latest[voltha_number] = (timestamp, member)

            if len(latest) == len(members):
                # No duplicates were found
                return members

            return [{'host': latest[number][1]['host'],
                     'id': latest[number][1]['id']}
                    for number in order]
        except Exception as e:
            # Best effort only: ids that do not follow the swarm format
            # leave the list untouched.
            log.debug('member-list-not-sanitized', e=e)
            return members

    @inlineCallbacks
    def _track_members(self, index):
        """
        One iteration of the membership watch. Reschedules itself (with the
        latest index) unless the leader has been halted.

        :param index: index passed to the blocking kv_get on the membership
            prefix
        """
        previous_index = index
        try:
            # Put a wait of 5 seconds to wait for a change of membership,
            # if any. Without it, if all consul nodes go down then we will
            # never get out of this watch.
            (index, results) = yield self.coord.kv_get(
                self.coord.membership_prefix, wait='5s', index=index,
                recurse=True)

            # This can happen if consul went down and came back with no data
            if not results:
                log.error('no-active-members')
                # Bail out of leadership and go for an early election
                self.coord._just_lost_leadership()
                return

            if previous_index != index:
                log.info('membership-updated',
                         previous_index=previous_index, index=index)

            # Rebuild the membership, if any

            # Only members with valid session are considered active
            members = [{'id': self.member_id_match(e['Key']).group(2),
                        'host': loads(e['Value'])['host_address']}
                       for e in results if 'Session' in e]

            # Always keep a list here: self.members is iterated by
            # _reassign_core_stores, which would fail on None.
            if members:
                updated_members = self._sanitize_member_list(members)
            else:
                updated_members = []

            log.info('active-members', active_members=members,
                     sanitized_members=updated_members)

            # Check if the two sets are the same
            if updated_members != self.members:
                # update the current set of config
                yield self._update_core_store_references()
                log.info('membership-changed',
                         prev_members=self.members,
                         curr_members=updated_members,
                         core_store_mapping=self.core_store_assignment)
                self.members = updated_members
                self._restart_core_store_reassignment_soak_timer()
            else:
                log.debug('no-membership-change', index=index)

        except Exception as e:
            log.exception('members-track-error', e=e)
            # Back off before retrying, to prevent flood. Defaults to 1
            # when the setting is absent from leader_config.
            yield asleep(self.coord.leader_config.get(
                'members_track_error_to_prevent_flood', 1))
        finally:
            if not self.halted:
                reactor.callLater(0, self._track_members, index)

    def _restart_reassignment_soak_timer(self):
        """(Re)arm the soak timer that triggers work reassignment."""
        self._cancel_timer(self.reassignment_soak_timer)

        # NOTE(review): _reassign_work is not defined in the part of the
        # class visible here - confirm it exists before calling this method.
        self.reassignment_soak_timer = reactor.callLater(
            self.soak_time, self._reassign_work)

    def _restart_core_store_reassignment_soak_timer(self):
        """(Re)arm the soak timer that triggers _reassign_core_stores."""
        self._cancel_timer(self.core_store_reassignment_soak_timer)

        self.core_store_reassignment_soak_timer = reactor.callLater(
            self.soak_time, self._reassign_core_stores)

    @inlineCallbacks
    def _reassign_core_stores(self):
        """
        Recompute the core-store assignment from self.members, save it in
        the KV store, publish the core store id of every newly added member
        under the assignment prefix and delete the assignment/membership
        keys of members that are gone. On failure the error is logged and,
        unless the leader was halted, the soak timer is re-armed to retry.
        """

        def _get_core_data_id_from_instance(instance_name):
            # Returns None when the instance has no core store assigned
            for store_id, instance in self.core_store_assignment.items():
                if instance and instance['id'] == instance_name:
                    return store_id

        try:
            members = self.members or []

            log.info('core-members', curr_members=self.members,
                     prev_members=self.core_store_assignment)

            # 1. clear the mapping for instances that are no longer running
            updated_mapping = dict()
            existing_active_config_members = set()
            cleared_config_ids = set()
            inactive_members = set()
            if self.core_store_assignment:
                for store_id, instance in self.core_store_assignment.items():
                    if instance not in members:
                        updated_mapping[store_id] = None
                        cleared_config_ids.add(store_id)
                        if instance:
                            inactive_members.add(instance['id'])
                    else:
                        updated_mapping[store_id] = instance
                        existing_active_config_members.add(instance['id'])

            # 2. Update the mapping with the new set
            current_id = max(self.core_store_assignment) \
                if self.core_store_assignment else '0000'
            for instance in members:
                if instance['id'] not in existing_active_config_members:
                    # Add the member to the config map
                    if cleared_config_ids:
                        # There is an empty slot
                        next_id = cleared_config_ids.pop()
                        updated_mapping[next_id] = instance
                    else:
                        # There are no empty slot, create new ids
                        current_id = get_next_core_id(current_id)
                        updated_mapping[current_id] = instance

            self.core_store_assignment = updated_mapping
            log.info('updated-assignment',
                     core_store_assignment=self.core_store_assignment,
                     inactive_members=inactive_members)

            # 3. save the mapping into consul
            yield self.coord.kv_put(self.coord.core_store_assignment_key,
                                    dumps(self.core_store_assignment))

            # 4. Assign the new workload to the newly created members
            curr_members_set = set(m['id'] for m in members)
            new_members = curr_members_set.difference(
                existing_active_config_members)
            for new_member_id in new_members:
                yield self.coord.kv_put(
                    self.coord.assignment_prefix
                    + new_member_id + '/' +
                    self.coord.core_storage_suffix,
                    _get_core_data_id_from_instance(new_member_id))

            # 5. Remove non-existent members
            for member_id in inactive_members:
                yield self.coord.kv_delete(
                    self.coord.assignment_prefix + member_id, recurse=True)
                yield self.coord.kv_delete(
                    self.coord.membership_prefix + member_id,
                    recurse=True)

        except Exception as e:
            log.exception('config-reassignment-failure', e=e)
            # Retry later, but never re-arm timers once leadership duties
            # have been suspended by stop().
            if not self.halted:
                self._restart_core_store_reassignment_soak_timer()