#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.base import DelayedCall
from twisted.internet.defer import inlineCallbacks, DeferredList

from asleep import asleep


class Leader(object):
    """
    A single instance of this object shall exist across the whole cluster.
    This is guaranteed by the coordinator, which instantiates this class
    only once it has secured the leadership lock, and which calls the
    halt() method if it ever loses that lock.
    """

    ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
    WORKLOAD_PREFIX = 'service/voltha/workload/'

    log = get_logger()

    # Public methods:

    def __init__(self, coordinator):
        self.coordinator = coordinator
        self.halted = False
        self.soak_time = 5  # soak until membership/workload changes settle

        self.workload = []
        self.members = []
        self.reassignment_soak_timer = None

    @inlineCallbacks
    def start(self):
        self.log.info('leader-started')
        yield self._validate_workload()
        yield self._start_tracking_assignments()

    def halt(self):
        """Suspend leadership duties immediately"""
        self.log.info('leader-halted')
        self.halted = True

        # any active cancellations, releases, etc., should happen here
        if isinstance(self.reassignment_soak_timer, DelayedCall):
            self.reassignment_soak_timer.cancel()

    # Private methods:

    @inlineCallbacks
    def _validate_workload(self):
        """
        Workload is defined as any k/v entries under the workload prefix
        in consul. Under normal operation, only the leader shall edit the
        workload list, but we make sure that if an administrator manually
        edits the workload, we react to that properly.
        """

        # TODO for now we simply generate a fixed number of fake entries
        yield DeferredList([
            self.coordinator.kv_put(
                self.WORKLOAD_PREFIX + 'device_group_%04d' % (i + 1),
                'placeholder for device group %d data' % (i + 1))
            for i in xrange(100)
        ])

    def _start_tracking_assignments(self):
        """
        We must track both the cluster member list and the workload list.
        Upon a change in either, we must rerun our sharding algorithm and
        reassign work as/if needed.
        """
        reactor.callLater(0, self._track_workload, 0)
        reactor.callLater(0, self._track_members, 0)

    @inlineCallbacks
    def _track_workload(self, index):

        try:
            # blocking read: returns once entries under the prefix change
            # (or the watch times out), along with the new consul index
            (index, results) = yield self.coordinator.kv_get(
                self.WORKLOAD_PREFIX, index=index, recurse=True)

            workload = [e['Key'] for e in results]

            if workload != self.workload:
                self.log.info('workload-changed', workload=workload)
                self.workload = workload
                self._restart_reassignment_soak_timer()

        except Exception as e:
            self.log.exception('workload-track-error', e=e)
            yield asleep(1.0)  # to prevent flooding consul with retries

        finally:
            if not self.halted:
                reactor.callLater(0, self._track_workload, index)

    @inlineCallbacks
    def _track_members(self, index):

        def is_member(entry):
            key = entry['Key']
            member_id = key[len(self.coordinator.MEMBERSHIP_PREFIX):]
            return '/' not in member_id  # otherwise it is a nested key

        try:
            (index, results) = yield self.coordinator.kv_get(
                self.coordinator.MEMBERSHIP_PREFIX, index=index, recurse=True)

            members = [e['Key'] for e in results if is_member(e)]

            if members != self.members:
                self.log.info('membership-changed', members=members)
                self.members = members
                self._restart_reassignment_soak_timer()

        except Exception as e:
            self.log.exception('members-track-error', e=e)
            yield asleep(1.0)  # to prevent flooding consul with retries

        finally:
            if not self.halted:
                reactor.callLater(0, self._track_members, index)

    def _restart_reassignment_soak_timer(self):
        # debounce: each change pushes the reassignment out by soak_time
        # seconds, so a burst of changes triggers only one reassignment

        if self.reassignment_soak_timer is not None:
            assert isinstance(self.reassignment_soak_timer, DelayedCall)
            if not self.reassignment_soak_timer.called:
                self.reassignment_soak_timer.cancel()

        self.reassignment_soak_timer = reactor.callLater(
            self.soak_time, self._reassign_work)

        self.log.debug('reassignment-timer-restarted',
                       timer=self.reassignment_soak_timer)

    @inlineCallbacks
    def _reassign_work(self):
        self.log.info('reassign-work')
        yield None

        # TODO continue from here

        # Plan (a sketch of steps 2 and 3 follows after this method)
        # Step 1: collect current assignments from consul
        # Step 2: calculate desired assignments from the current member and
        #         workload lists (e.g., using consistent hashing or any
        #         other algorithm)
        # Step 3: find the delta between the desired and actual assignments;
        #         these form two lists:
        #         1. new assignments to be made
        #         2. obsolete assignments to be revoked
        #         graceful handling may be desirable when moving an existing
        #         assignment from one member to another (to make sure it is
        #         abandoned by the old member before the new one takes charge)
        # Step 4: orchestrate the assignment by adding/deleting(/locking)
        #         entries in consul
        #
        # We must make sure that while we are working on this, we do not
        # re-enter this method!
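
    # --- Illustrative sketch (an assumption, not part of the plan's code) ---
    # A minimal sketch of steps 2 and 3 above, using rendezvous
    # (highest-random-weight) hashing as one possible assignment algorithm.
    # The helper names and the (to_add, to_revoke) delta shape are made up
    # for illustration; the real algorithm may differ.

    def _calculate_desired_assignments(self):
        """Map each workload key to exactly one member (step 2)."""
        import hashlib

        def weight(member, work):
            # deterministic pseudo-random weight for a (member, work) pair
            return hashlib.md5('%s:%s' % (member, work)).hexdigest()

        if not self.members:
            return {}
        return dict(
            (work, max(self.members, key=lambda m: weight(m, work)))
            for work in self.workload)

    def _calculate_assignment_delta(self, actual_assignments):
        """
        Compare the desired assignments against the actual ones read from
        consul under ASSIGNMENT_PREFIX (step 1), returning the two lists
        described in step 3: assignments to make and assignments to revoke.
        """
        desired = self._calculate_desired_assignments()
        to_add = [
            (work, member) for (work, member) in desired.iteritems()
            if actual_assignments.get(work) != member]
        to_revoke = [
            work for work in actual_assignments if work not in desired]
        return to_add, to_revoke

    # Re-entry into _reassign_work could be prevented with a simple guard
    # flag (initialized in __init__), e.g.:
    #
    #     if self._reassignment_in_progress:
    #         return
    #     self._reassignment_in_progress = True
    #     try:
    #         ...  # steps 1-4
    #     finally:
    #         self._reassignment_in_progress = False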