blob: 69020e5e3134b19ddd7e3251b74ab11c384c77b5 [file] [log] [blame]
Zsolt Harasztiac9310d2016-09-20 12:56:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import re
17
18from structlog import get_logger
19from twisted.internet import reactor
20from twisted.internet.base import DelayedCall
21from twisted.internet.defer import inlineCallbacks
22
23from asleep import asleep
24
25
26class Worker(object):
27 """
28 Worker side of the coordinator. An instance of this class runs in every
29 voltha instance. It monitors what work is assigned to this instance by
30 the leader. This is all done via consul.
31 """
32
33 ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
34
35 log = get_logger()
36
37 # Public methods:
38
39 def __init__(self, instance_id, coordinator):
40
41 self.instance_id = instance_id
42 self.coord = coordinator
43 self.halted = False
44 self.soak_time = 0.5 # soak till assignment list settles
45
46 self.my_workload = set() # list of work_id's assigned to me
47
48 self.assignment_soak_timer = None
49 self.my_candidate_workload = set() # we stash here during soaking
50
51 self.assignment_match = re.compile(
52 self.ASSIGNMENT_EXTRACTOR % self.coord.ASSIGNMENT_PREFIX).match
53
54 @inlineCallbacks
55 def start(self):
56 self.log.info('worker-started')
57 yield self._start_tracking_my_assignments()
58
59 def halt(self):
60 self.log.info('worker-halted')
61 if isinstance(self.assignment_soak_timer, DelayedCall):
62 if not self.assignment_soak_timer.called:
63 self.assignment_soak_timer.cancel()
64
65 # Private methods:
66
67 def _start_tracking_my_assignments(self):
68 reactor.callLater(0, self._track_my_assignments, 0)
69
70 @inlineCallbacks
71 def _track_my_assignments(self, index):
72
73 try:
74
75 # if there is no leader yet, wait for a stable leader
76 d = self.coord.wait_for_a_leader()
77 if not d.called:
78 yield d
79 # additional time to let leader update
80 # assignments, to minimize potential churn
81 yield asleep(5)
82
83 (index, results) = yield self.coord.kv_get(
84 self.coord.ASSIGNMENT_PREFIX + self.instance_id,
85 index=index, recurse=True)
86
87 matches = [
88 (self.assignment_match(e['Key']), e) for e in results or []]
89
90 my_workload = set([
91 m.groupdict()['work_id'] for m, e in matches if m is not None
92 ])
93
94 if my_workload != self.my_workload:
95 self._stash_and_restart_soak_timer(my_workload)
96
97 except Exception, e:
98 self.log.exception('assignments-track-error', e=e)
99 yield asleep(1) # to prevent flood
100
101 finally:
102 if not self.halted:
103 reactor.callLater(0, self._track_my_assignments, index)
104
105 def _stash_and_restart_soak_timer(self, candidate_workload):
106
107 self.log.debug('re-start-assignment-soaking')
108
109 if self.assignment_soak_timer is not None:
110 if not self.assignment_soak_timer.called:
111 self.assignment_soak_timer.cancel()
112
113 self.my_candidate_workload = candidate_workload
114 self.assignment_soak_timer = reactor.callLater(
115 self.soak_time, self._update_assignments)
116
117 def _update_assignments(self):
118 """
119 Called when finally the dust has settled on our assignments.
120 :return: None
121 """
122 self.log.info('my-assignments-changed',
123 old_count=len(self.my_workload),
124 new_count=len(self.my_candidate_workload))
125 self.my_workload, self.my_candidate_workload = \
126 self.my_candidate_workload, None