blob: ed4fd6d6fbf0d8076289f05277221e9f01dc3ebf [file] [log] [blame]
Zsolt Harasztiac9310d2016-09-20 12:56:35 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztiac9310d2016-09-20 12:56:35 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import re
17
18from structlog import get_logger
19from twisted.internet import reactor
20from twisted.internet.base import DelayedCall
khenaidoo032d3302017-06-09 14:50:04 -040021from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
khenaidoo08d48d22017-06-29 19:42:49 -040022from simplejson import dumps, loads
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070023
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070024from common.utils.asleep import asleep
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070025
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070026log = get_logger()
27
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070028
class Worker(object):
    """
    Worker side of the coordinator. An instance of this class runs in every
    voltha instance. It monitors what work is assigned to this instance by
    the leader. This is all done via consul.
    """

    # Regex template used to pull member_id and work_id out of an
    # assignment key once the configured assignment prefix is substituted in.
    ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'

    # Public methods:

    def __init__(self, instance_id, coordinator):
        """
        :param instance_id: unique id of this voltha instance
        :param coordinator: coordinator object providing kv access, leader
            tracking, config and pub/sub for peer-map changes
        """
        self.instance_id = instance_id
        self.coord = coordinator
        self.halted = False
        self.soak_time = 0.5  # soak till assignment list settles

        self.my_workload = set()  # list of work_id's assigned to me

        self.assignment_soak_timer = None
        self.assignment_core_store_soak_timer = None
        self.my_candidate_workload = set()  # we stash here during soaking

        self.assignment_match = re.compile(
            self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match

        # Core store id assigned to this instance by the leader; None until
        # the assignment shows up under the assignment prefix in consul.
        self.mycore_store_id = None

        # Fired (once) with the store id when the leader assigns us one.
        self.wait_for_core_store_assignment = Deferred()

        # Last published map of peer instances (excluding ourselves).
        self.peers_map = None

    @inlineCallbacks
    def start(self):
        """Kick off assignment and peer tracking loops."""
        log.debug('starting')
        yield self._start_tracking_my_assignments()
        yield self._start_tracking_my_peers()
        log.info('started')
        returnValue(self)

    def stop(self):
        """Halt the tracking loops and cancel any pending soak timers."""
        log.debug('stopping')
        self.halted = True
        if isinstance(self.assignment_soak_timer, DelayedCall):
            if not self.assignment_soak_timer.called:
                self.assignment_soak_timer.cancel()

        if isinstance(self.assignment_core_store_soak_timer, DelayedCall):
            if not self.assignment_core_store_soak_timer.called:
                self.assignment_core_store_soak_timer.cancel()

        log.info('stopped')

    @inlineCallbacks
    def get_core_store_id(self):
        """
        Return the core store id assigned to this instance, waiting for the
        leader to make the assignment if it has not happened yet.
        """
        if self.mycore_store_id:
            returnValue(self.mycore_store_id)
        else:
            # Let's wait until we get assigned a store_id from the leader
            val = yield self.wait_for_core_store_assignment
            returnValue(val)

    # Private methods:

    def _start_tracking_my_assignments(self):
        reactor.callLater(0, self._track_my_assignments, 0)

    def _start_tracking_my_peers(self):
        reactor.callLater(0, self._track_my_peers, 0)

    @inlineCallbacks
    def _track_my_assignments(self, index):
        """
        Long-poll loop on our assignment subtree in consul; reschedules
        itself until a core store id has been assigned (or we are halted).
        """
        try:
            # if there is no leader yet, wait for a stable leader
            d = self.coord.wait_for_a_leader()
            if not d.called:
                yield d
                # additional time to let leader update
                # assignments, to minimize potential churn
                # NOTE: fixed — the config *key* (not its value) must be
                # passed to .get() so the default applies when unset
                yield asleep(self.coord.worker_config.get(
                    'time_to_let_leader_update', 5))

            (index, results) = yield self.coord.kv_get(
                self.coord.assignment_prefix + self.instance_id,
                index=index, recurse=True)

            # 1. Check whether we have been assigned a full voltha instance
            if results and not self.mycore_store_id:
                # We have no store id set yet
                core_stores = [c['Value'] for c in results if
                               c['Key'] == self.coord.assignment_prefix +
                               self.instance_id + '/' +
                               self.coord.core_storage_suffix and c['Value']]
                if core_stores:
                    self.mycore_store_id = core_stores[0]
                    log.debug('store-assigned',
                              mycore_store_id=self.mycore_store_id)
                    self._stash_and_restart_core_store_soak_timer()

            # 2. Check whether we have been assigned a work item
            if results and self.mycore_store_id:
                # Check for difference between current workload and newer one
                # TODO: Depending on how workload gets load balanced we may
                # need to add workload distribution here
                pass

        except Exception as e:
            log.exception('assignments-track-error', e=e)
            # throttle the retry loop to prevent a flood on persistent errors
            yield asleep(
                self.coord.worker_config.get(
                    'assignments_track_error_to_avoid_flood', 1))

        finally:
            if not self.halted and not self.mycore_store_id:
                reactor.callLater(0, self._track_my_assignments, index)

    @inlineCallbacks
    def _track_my_peers(self, index):
        """
        Long-poll loop on the leader's core-store assignment map; publishes
        a peers-map change whenever the map (minus ourselves) differs from
        what we last published. Reschedules itself until halted.
        """
        try:
            prev_index = index
            if self.mycore_store_id:
                # Wait for updates to the store assigment key
                is_timeout, (tmp_index, mappings) = yield \
                    self.coord.coordinator_get_with_timeout(
                        key=self.coord.core_store_assignment_key,
                        recurse=True,
                        index=index,
                        timeout=10)

                if is_timeout:
                    return

                # After timeout event the index returned from
                # coordinator_get_with_timeout is None. If we are here it's
                # not a timeout, therefore the index is a valid one.
                index = tmp_index

                if mappings and index != prev_index:
                    new_map = loads(mappings[0]['Value'])
                    # Remove my id from my peers list; tolerate our id not
                    # being present in the published map
                    new_map.pop(self.mycore_store_id, None)
                    if self.peers_map is None or self.peers_map != new_map:
                        self.coord.publish_peers_map_change(new_map)
                        self.peers_map = new_map
                        log.info('peer-mapping-changed', mapping=new_map)
                else:
                    log.debug('no-mapping-change', mappings=mappings,
                              index=index, prev_index=prev_index)

        except Exception as e:
            log.exception('peer-track-error', e=e)
            # throttle the retry loop to prevent a flood on persistent errors
            yield asleep(
                self.coord.worker_config.get(
                    'assignments_track_error_to_avoid_flood', 1))
        finally:
            if not self.halted:
                # Wait longer if we have not received a core id yet
                reactor.callLater(1 if self.mycore_store_id else 5,
                                  self._track_my_peers, index)

    def _stash_and_restart_soak_timer(self, candidate_workload):
        """
        Stash the new candidate workload and (re)arm the soak timer so that
        assignments are only applied once the list has settled.
        """
        log.debug('re-start-assignment-soaking')

        if self.assignment_soak_timer is not None:
            if not self.assignment_soak_timer.called:
                self.assignment_soak_timer.cancel()

        self.my_candidate_workload = candidate_workload
        self.assignment_soak_timer = reactor.callLater(
            self.soak_time, self._update_assignments)

    def _update_assignments(self):
        """
        Called when finally the dust has settled on our assignments.
        :return: None
        """
        log.debug('my-assignments-changed',
                  old_count=len(self.my_workload),
                  new_count=len(self.my_candidate_workload),
                  workload=self.my_workload)
        self.my_workload, self.my_candidate_workload = \
            self.my_candidate_workload, None

    def _stash_and_restart_core_store_soak_timer(self):
        """(Re)arm the soak timer for the core store assignment."""
        log.debug('re-start-assignment-config-soaking')

        if self.assignment_core_store_soak_timer is not None:
            if not self.assignment_core_store_soak_timer.called:
                self.assignment_core_store_soak_timer.cancel()

        self.assignment_core_store_soak_timer = reactor.callLater(
            self.soak_time, self._process_config_assignment)

    def _process_config_assignment(self):
        """
        Fire the waiter deferred with the assigned store id. Guarded so a
        repeated soak cannot raise AlreadyCalledError on the deferred.
        """
        log.debug('process-config-assignment',
                  mycore_store_id=self.mycore_store_id)
        if not self.wait_for_core_store_assignment.called:
            self.wait_for_core_store_assignment.callback(self.mycore_store_id)