blob: aff83b1eab55a219ab878d399acc40818110d8a7 [file] [log] [blame]
Zsolt Harasztiac9310d2016-09-20 12:56:35 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztiac9310d2016-09-20 12:56:35 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import re
17
18from structlog import get_logger
19from twisted.internet import reactor
20from twisted.internet.base import DelayedCall
khenaidoo032d3302017-06-09 14:50:04 -040021from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070022
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070023from common.utils.asleep import asleep
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070024
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070025log = get_logger()
26
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070027
class Worker(object):
    """
    Worker side of the coordinator. An instance of this class runs in every
    voltha instance. It monitors what work is assigned to this instance by
    the leader. This is all done via consul.
    """

    # Regex template for splitting an assignment key into its member id and
    # work id parts; instantiated with the coordinator's assignment prefix.
    ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'

    # Public methods:

    def __init__(self, instance_id, coordinator):
        """
        :param instance_id: unique id of this voltha instance
        :param coordinator: coordinator object providing consul access
            (kv_get), worker_config, assignment_prefix, core_storage_suffix
            and leader tracking (wait_for_a_leader)
        """
        self.instance_id = instance_id
        self.coord = coordinator
        self.halted = False
        self.soak_time = 0.5  # soak till assignment list settles

        self.my_workload = set()  # set of work_id's assigned to me

        self.assignment_soak_timer = None
        self.my_candidate_workload = set()  # we stash here during soaking

        self.assignment_match = re.compile(
            self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match

        # core store id handed out by the leader; None until assigned
        self.mycore_store_id = None

        # fires with the store id once the leader assigns one to us
        self.wait_for_core_store_assignment = Deferred()

    @inlineCallbacks
    def start(self):
        """Begin tracking assignments made to this instance via consul."""
        log.debug('starting')
        yield self._start_tracking_my_assignments()
        log.info('started')
        returnValue(self)

    def stop(self):
        """Stop the worker, cancelling any pending soak timer."""
        log.debug('stopping')
        if isinstance(self.assignment_soak_timer, DelayedCall):
            if not self.assignment_soak_timer.called:
                self.assignment_soak_timer.cancel()
        log.info('stopped')

    @inlineCallbacks
    def get_core_store_id(self):
        """
        Return the core store id assigned to this instance, waiting for
        the leader's assignment if it has not been made yet.
        """
        if self.mycore_store_id:
            returnValue(self.mycore_store_id)
        else:
            # Let's wait until we get assigned a store_id from the leader
            val = yield self.wait_for_core_store_assignment
            returnValue(val)

    # Private methods:

    def _start_tracking_my_assignments(self):
        reactor.callLater(0, self._track_my_assignments, 0)

    @inlineCallbacks
    def _track_my_assignments(self, index):
        """
        One long-poll iteration against consul for this instance's
        assignment subtree; reschedules itself until halted.

        :param index: consul blocking-query index from the previous read
        """
        try:

            # if there is no leader yet, wait for a stable leader
            d = self.coord.wait_for_a_leader()
            if not d.called:
                yield d
            # additional time to let leader update
            # assignments, to minimize potential churn
            # NOTE: fixed lookup — the key itself (not the value stored
            # under it) is passed to .get(), so the 5s default applies
            # when the config entry is missing
            yield asleep(self.coord.worker_config.get(
                'time_to_let_leader_update', 5))

            (index, results) = yield self.coord.kv_get(
                self.coord.assignment_prefix + self.instance_id,
                index=index, recurse=True)

            # 1. Check whether we have been assigned a full voltha instance
            if results and not self.mycore_store_id:
                # We have no store id set yet
                core_stores = [c['Value'] for c in results if
                               c['Key'] == self.coord.assignment_prefix +
                               self.instance_id + '/' +
                               self.coord.core_storage_suffix]
                if core_stores:
                    self.mycore_store_id = core_stores[0]
                    log.debug('store-assigned',
                              mycore_store_id=self.mycore_store_id)
                    self._stash_and_restart_core_store_soak_timer()

            # 2. Check whether we have been assigned a work item
            if results and self.mycore_store_id:
                # Check for difference between current workload and newer one
                # TODO: Depending on how workload gets load balanced we may
                # need to add workload distribution here
                pass

        except Exception as e:
            log.exception('assignments-track-error', e=e)
            # sleep to prevent flooding consul with failing requests;
            # same .get() key fix as above (1s default when unconfigured)
            yield asleep(self.coord.worker_config.get(
                'assignments_track_error_to_avoid_flood', 1))

        finally:
            if not self.halted:
                reactor.callLater(0, self._track_my_assignments, index)

    def _stash_and_restart_soak_timer(self, candidate_workload):
        """
        Stash a freshly observed workload and (re)arm the soak timer so
        rapid consecutive updates collapse into one assignment change.
        """
        log.debug('re-start-assignment-soaking')

        if self.assignment_soak_timer is not None:
            if not self.assignment_soak_timer.called:
                self.assignment_soak_timer.cancel()

        self.my_candidate_workload = candidate_workload
        self.assignment_soak_timer = reactor.callLater(
            self.soak_time, self._update_assignments)

    def _update_assignments(self):
        """
        Called when finally the dust has settled on our assignments.
        :return: None
        """
        log.debug('my-assignments-changed',
                  old_count=len(self.my_workload),
                  new_count=len(self.my_candidate_workload),
                  workload=self.my_workload)
        self.my_workload, self.my_candidate_workload = \
            self.my_candidate_workload, None

    def _stash_and_restart_core_store_soak_timer(self):
        """(Re)arm the soak timer for processing a core store assignment."""
        log.debug('re-start-assignment-config-soaking')

        if self.assignment_soak_timer is not None:
            if not self.assignment_soak_timer.called:
                self.assignment_soak_timer.cancel()

        self.assignment_soak_timer = reactor.callLater(
            self.soak_time, self._process_config_assignment)

    def _process_config_assignment(self):
        """Fire the deferred so waiters in get_core_store_id() unblock."""
        log.debug('process-config-assignment',
                  mycore_store_id=self.mycore_store_id)
        self.wait_for_core_store_assignment.callback(self.mycore_store_id)