blob: 8d0206c512c2041edd49b20c8ca3839b748dbc41 [file] [log] [blame]
Zsolt Harasztiac9310d2016-09-20 12:56:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import re
17
18from structlog import get_logger
19from twisted.internet import reactor
20from twisted.internet.base import DelayedCall
21from twisted.internet.defer import inlineCallbacks
22
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -070023from common.utils.asleep import asleep
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070024
25
26class Worker(object):
27 """
28 Worker side of the coordinator. An instance of this class runs in every
29 voltha instance. It monitors what work is assigned to this instance by
30 the leader. This is all done via consul.
31 """
32
33 ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
34
35 log = get_logger()
36
37 # Public methods:
38
39 def __init__(self, instance_id, coordinator):
40
41 self.instance_id = instance_id
42 self.coord = coordinator
43 self.halted = False
44 self.soak_time = 0.5 # soak till assignment list settles
45
46 self.my_workload = set() # list of work_id's assigned to me
47
48 self.assignment_soak_timer = None
49 self.my_candidate_workload = set() # we stash here during soaking
50
51 self.assignment_match = re.compile(
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040052 self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070053
54 @inlineCallbacks
55 def start(self):
56 self.log.info('worker-started')
57 yield self._start_tracking_my_assignments()
58
59 def halt(self):
60 self.log.info('worker-halted')
61 if isinstance(self.assignment_soak_timer, DelayedCall):
62 if not self.assignment_soak_timer.called:
63 self.assignment_soak_timer.cancel()
64
65 # Private methods:
66
67 def _start_tracking_my_assignments(self):
68 reactor.callLater(0, self._track_my_assignments, 0)
69
70 @inlineCallbacks
71 def _track_my_assignments(self, index):
72
73 try:
74
75 # if there is no leader yet, wait for a stable leader
76 d = self.coord.wait_for_a_leader()
77 if not d.called:
78 yield d
79 # additional time to let leader update
80 # assignments, to minimize potential churn
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040081 yield asleep(self.coord.worker_config.get(
82 self.coord.worker_config['time_to_let_leader_update'], 5))
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070083
84 (index, results) = yield self.coord.kv_get(
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -040085 self.coord.assignment_prefix + self.instance_id,
Zsolt Harasztiac9310d2016-09-20 12:56:35 -070086 index=index, recurse=True)
87
88 matches = [
89 (self.assignment_match(e['Key']), e) for e in results or []]
90
91 my_workload = set([
92 m.groupdict()['work_id'] for m, e in matches if m is not None
93 ])
94
95 if my_workload != self.my_workload:
96 self._stash_and_restart_soak_timer(my_workload)
97
98 except Exception, e:
99 self.log.exception('assignments-track-error', e=e)
Rouzbahan Rashidi-Tabrizi1c3eba82016-10-27 21:47:18 -0400100 yield asleep(
101 self.coord.worker_config.get(
102 self.coord.worker_config[
103 'assignments_track_error_to_avoid_flood'], 1))
104 # to prevent flood
Zsolt Harasztiac9310d2016-09-20 12:56:35 -0700105
106 finally:
107 if not self.halted:
108 reactor.callLater(0, self._track_my_assignments, index)
109
110 def _stash_and_restart_soak_timer(self, candidate_workload):
111
112 self.log.debug('re-start-assignment-soaking')
113
114 if self.assignment_soak_timer is not None:
115 if not self.assignment_soak_timer.called:
116 self.assignment_soak_timer.cancel()
117
118 self.my_candidate_workload = candidate_workload
119 self.assignment_soak_timer = reactor.callLater(
120 self.soak_time, self._update_assignments)
121
122 def _update_assignments(self):
123 """
124 Called when finally the dust has settled on our assignments.
125 :return: None
126 """
127 self.log.info('my-assignments-changed',
128 old_count=len(self.my_workload),
129 new_count=len(self.my_candidate_workload))
130 self.my_workload, self.my_candidate_workload = \
131 self.my_candidate_workload, None