blob: d8b11a944e2de79a00c0b7d3506ae9bca1158105 [file] [log] [blame]
Zsolt Harasztiac9310d2016-09-20 12:56:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import re
17
18from structlog import get_logger
19from twisted.internet import reactor
20from twisted.internet.base import DelayedCall
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070021from twisted.internet.defer import inlineCallbacks, returnValue
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070022
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070023from common.utils.asleep import asleep
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070024
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070025log = get_logger()
26
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070027
28class Worker(object):
29 """
30 Worker side of the coordinator. An instance of this class runs in every
31 voltha instance. It monitors what work is assigned to this instance by
32 the leader. This is all done via consul.
33 """
34
35 ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
36
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070037 # Public methods:
38
39 def __init__(self, instance_id, coordinator):
40
41 self.instance_id = instance_id
42 self.coord = coordinator
43 self.halted = False
44 self.soak_time = 0.5 # soak till assignment list settles
45
46 self.my_workload = set() # list of work_id's assigned to me
47
48 self.assignment_soak_timer = None
49 self.my_candidate_workload = set() # we stash here during soaking
50
51 self.assignment_match = re.compile(
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040052 self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070053
54 @inlineCallbacks
55 def start(self):
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070056 log.debug('starting')
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070057 yield self._start_tracking_my_assignments()
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070058 log.info('started')
59 returnValue(self)
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070060
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070061 def stop(self):
62 log.debug('stopping')
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070063 if isinstance(self.assignment_soak_timer, DelayedCall):
64 if not self.assignment_soak_timer.called:
65 self.assignment_soak_timer.cancel()
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -070066 log.info('stopped')
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070067
68 # Private methods:
69
70 def _start_tracking_my_assignments(self):
71 reactor.callLater(0, self._track_my_assignments, 0)
72
73 @inlineCallbacks
74 def _track_my_assignments(self, index):
75
76 try:
77
78 # if there is no leader yet, wait for a stable leader
79 d = self.coord.wait_for_a_leader()
80 if not d.called:
81 yield d
82 # additional time to let leader update
83 # assignments, to minimize potential churn
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040084 yield asleep(self.coord.worker_config.get(
85 self.coord.worker_config['time_to_let_leader_update'], 5))
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070086
87 (index, results) = yield self.coord.kv_get(
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040088 self.coord.assignment_prefix + self.instance_id,
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070089 index=index, recurse=True)
90
91 matches = [
92 (self.assignment_match(e['Key']), e) for e in results or []]
93
94 my_workload = set([
95 m.groupdict()['work_id'] for m, e in matches if m is not None
96 ])
97
98 if my_workload != self.my_workload:
99 self._stash_and_restart_soak_timer(my_workload)
100
101 except Exception, e:
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700102 log.exception('assignments-track-error', e=e)
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -0400103 yield asleep(
104 self.coord.worker_config.get(
105 self.coord.worker_config[
106 'assignments_track_error_to_avoid_flood'], 1))
107 # to prevent flood
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700108
109 finally:
110 if not self.halted:
111 reactor.callLater(0, self._track_my_assignments, index)
112
113 def _stash_and_restart_soak_timer(self, candidate_workload):
114
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700115 log.debug('re-start-assignment-soaking')
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700116
117 if self.assignment_soak_timer is not None:
118 if not self.assignment_soak_timer.called:
119 self.assignment_soak_timer.cancel()
120
121 self.my_candidate_workload = candidate_workload
122 self.assignment_soak_timer = reactor.callLater(
123 self.soak_time, self._update_assignments)
124
125 def _update_assignments(self):
126 """
127 Called when finally the dust has settled on our assignments.
128 :return: None
129 """
Zsolt Haraszti2bdb6b32016-11-03 16:56:17 -0700130 log.info('my-assignments-changed',
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700131 old_count=len(self.my_workload),
132 new_count=len(self.my_candidate_workload))
133 self.my_workload, self.my_candidate_workload = \
134 self.my_candidate_workload, None