Add worker-side coordinator for tracking locally assigned work
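
A Worker now runs in every voltha instance and watches its own subtree
under service/voltha/assignments/<instance_id>/ in consul, soaking
changes briefly before adopting them as its local workload. The Leader
computes the desired work-to-member mapping with consistent hashing
(new hash_ring>=1.3.1 dependency) and reconciles it against the
assignments currently stored in consul, revoking stale entries before
writing new ones. The Coordinator gains wait_for_a_leader() so workers
can defer tracking until a leader is known, and its tracking loops now
stop cleanly on shutdown.

The assignment calculation is plain consistent hashing over the member
list. A minimal illustrative sketch of the idea (not part of the patch;
desired_assignments and the example instance ids are hypothetical, but
the HashRing calls are the same ones leader.py uses):

    from hash_ring import HashRing

    def desired_assignments(members, workload):
        # map each work_id onto a member via consistent hashing
        ring = HashRing(members)
        wanted = {}  # member_id -> set(work_id)
        for work in workload:
            wanted.setdefault(ring.get_node(work), set()).add(work)
        return wanted

    # e.g. desired_assignments(['voltha-1', 'voltha-2'],
    #                          ['device_group_0001', 'device_group_0002'])

In _reassign_work the same mapping is diffed against the keys found
under service/voltha/assignments/ to decide which entries to delete and
which to add.
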
diff --git a/requirements.txt b/requirements.txt
index 963c799..17bb0d7 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,7 @@
decorator>=3.4.0
docker-py
fluent-logger>=0.4.3
+hash_ring>=1.3.1
hexdump>=3.3
klein>=15.3.1
nose>=1.3.7
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 8992ccb..fe0cc52 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -21,11 +21,12 @@
from requests import ConnectionError
from structlog import get_logger
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from twisted.internet.task import LoopingCall
from asleep import asleep
from leader import Leader
+from worker import Worker
class StaleMembershipEntryException(Exception):
@@ -48,7 +49,10 @@
CONNECT_RETRY_INTERVAL_SEC = 1
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
LEADER_KEY = 'service/voltha/leader'
+
MEMBERSHIP_PREFIX = 'service/voltha/members/'
+ ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
+ WORKLOAD_PREFIX = 'service/voltha/work/'
# Public methods:
@@ -73,6 +77,8 @@
self.shutting_down = False
self.leader = None
+ self.worker = Worker(self.instance_id, self)
+
self.log = get_logger()
self.log.info('initializing-coordinator')
@@ -85,14 +91,31 @@
reactor.callLater(0, self._async_init)
self.log.info('initialized-coordinator')
+ self.wait_for_leader_deferreds = []
+
@inlineCallbacks
def shutdown(self):
self.shutting_down = True
yield self._delete_session() # this will delete the leader lock too
+ yield self.worker.halt()
if self.leader is not None:
yield self.leader.halt()
self.leader = None
+ def wait_for_a_leader(self):
+ """
+ Async wait until a leader is detected/elected. The returned Deferred
+ will be fired with the leader's instance_id.
+ :return: Deferred.
+ """
+ d = Deferred()
+ if self.leader_id is not None:
+ d.callback(self.leader_id)
+ return d
+ else:
+ self.wait_for_leader_deferreds.append(d)
+ return d
+
# Proxy methods for consul with retry support
def kv_get(self, *args, **kw):
@@ -111,6 +134,7 @@
yield self._create_session()
yield self._create_membership_record()
yield self._start_leader_tracking()
+ yield self.worker.start()
def _backoff(self, msg):
wait_time = self.RETRY_BACKOFF[min(self.retries,
@@ -179,14 +203,15 @@
index=index, record=record)
if record is None or record['Session'] != self.session_id:
self.log.debug('remaking-membership-record')
- yield self._do_create_membership_record()
+ yield self._retry(self._do_create_membership_record)
except Exception, e:
self.log.exception('unexpected-error-leader-trackin', e=e)
finally:
- # no matter what, the loop need to continue (after a short delay)
- reactor.callLater(0.1, self._maintain_membership_record)
+ # except during shutdown, the loop must continue (after a short delay)
+ if not self.shutting_down:
+ reactor.callLater(0.1, self._maintain_membership_record)
def _start_leader_tracking(self):
reactor.callLater(0, self._leadership_tracking_loop)
@@ -248,15 +273,16 @@
self.log.exception('unexpected-error-leader-trackin', e=e)
finally:
- # no matter what, the loop need to continue (after a short delay)
- reactor.callLater(1, self._leadership_tracking_loop)
+ # except during shutdown, the loop must continue (after a short delay)
+ if not self.shutting_down:
+ reactor.callLater(1, self._leadership_tracking_loop)
@inlineCallbacks
def _assert_leadership(self):
"""(Re-)assert leadership"""
if not self.i_am_leader:
self.i_am_leader = True
- self.leader_id = self.instance_id
+ self._set_leader_id(self.instance_id)
yield self._just_gained_leadership()
@inlineCallbacks
@@ -264,12 +290,19 @@
"""(Re-)assert non-leader role"""
# update leader_id anyway
- self.leader_id = leader_id
+ self._set_leader_id(leader_id)
if self.i_am_leader:
self.i_am_leader = False
yield self._just_lost_leadership()
+ def _set_leader_id(self, leader_id):
+ self.leader_id = leader_id
+ deferreds, self.wait_for_leader_deferreds = \
+ self.wait_for_leader_deferreds, []
+ for d in deferreds:
+ d.callback(leader_id)
+
def _just_gained_leadership(self):
self.log.info('became-leader')
self.leader = Leader(self)
@@ -291,13 +324,14 @@
result = yield func(*args, **kw)
break
except ConsulException, e:
- yield self._backoff('consul-not-upC')
+ yield self._backoff('consul-not-up')
except ConnectionError, e:
yield self._backoff('cannot-connect-to-consul')
except StaleMembershipEntryException, e:
yield self._backoff('stale-membership-record-in-the-way')
except Exception, e:
- self.log.exception(e)
+ if not self.shutting_down:
+ self.log.exception(e)
yield self._backoff('unknown-error')
returnValue(result)
diff --git a/voltha/leader.py b/voltha/leader.py
index a1d970d..6a4ef68 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -14,6 +14,8 @@
# limitations under the License.
#
+import re
+from hash_ring import HashRing
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.base import DelayedCall
@@ -30,22 +32,32 @@
method in cases it looses the leadership lock.
"""
- ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
- WORKLOAD_PREFIX = 'service/voltha/workload/'
+ ID_EXTRACTOR = '^(%s)([^/]+)$'
+ ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
log = get_logger()
# Public methods:
def __init__(self, coordinator):
- self.coorinator = coordinator
+
+ self.coord = coordinator
self.halted = False
- self.soak_time = 5 # soak till membership/workload changes settle
+ self.soak_time = 3 # soak till membership/workload changes settle
self.workload = []
self.members = []
self.reassignment_soak_timer = None
+ self.workload_id_match = re.compile(
+ self.ID_EXTRACTOR % self.coord.WORKLOAD_PREFIX).match
+
+ self.member_id_match = re.compile(
+ self.ID_EXTRACTOR % self.coord.MEMBERSHIP_PREFIX).match
+
+ self.assignment_match = re.compile(
+ self.ASSIGNMENT_EXTRACTOR % self.coord.ASSIGNMENT_PREFIX).match
+
@inlineCallbacks
def start(self):
self.log.info('leader-started')
@@ -59,7 +71,8 @@
# any active cancellations, releases, etc., should happen here
if isinstance(self.reassignment_soak_timer, DelayedCall):
- self.reassignment_soak_timer.cancel()
+ if not self.reassignment_soak_timer.called:
+ self.reassignment_soak_timer.cancel()
# Private methods:
@@ -74,8 +87,8 @@
# TODO for now we simply generate a fixed number of fake entries
yield DeferredList([
- self.coorinator.kv_put(
- self.WORKLOAD_PREFIX + 'device_group_%04d' % (i + 1),
+ self.coord.kv_put(
+ self.coord.WORKLOAD_PREFIX + 'device_group_%04d' % (i + 1),
'placeholder for device group %d data' % (i + 1))
for i in xrange(100)
])
@@ -93,13 +106,16 @@
def _track_workload(self, index):
try:
- (index, results) = yield self.coorinator.kv_get(
- self.WORKLOAD_PREFIX, index=index, recurse=True)
+ (index, results) = yield self.coord.kv_get(
+ self.coord.WORKLOAD_PREFIX, index=index, recurse=True)
- workload = [e['Key'] for e in results]
+ matches = (self.workload_id_match(e['Key']) for e in results)
+ workload = [m.group(2) for m in matches if m is not None]
if workload != self.workload:
- self.log.info('workload-changed', workload=workload)
+ self.log.info('workload-changed',
+ old_workload_count=len(self.workload),
+ new_workload_count=len(workload))
self.workload = workload
self._restart_reassignment_soak_timer()
@@ -114,19 +130,17 @@
@inlineCallbacks
def _track_members(self, index):
- def is_member(entry):
- key = entry['Key']
- member_id = key[len(self.coorinator.MEMBERSHIP_PREFIX):]
- return '/' not in member_id # otherwise it is a nested key
-
try:
- (index, results) = yield self.coorinator.kv_get(
- self.coorinator.MEMBERSHIP_PREFIX, index=index, recurse=True)
+ (index, results) = yield self.coord.kv_get(
+ self.coord.MEMBERSHIP_PREFIX, index=index, recurse=True)
- members = [e['Key'] for e in results if is_member(e)]
+ matches = (self.member_id_match(e['Key']) for e in results or [])
+ members = [m.group(2) for m in matches if m is not None]
if members != self.members:
- self.log.info('membership-changed', members=members)
+ self.log.info('membership-changed',
+ old_members_count=len(self.members),
+ new_members_count=len(members))
self.members = members
self._restart_reassignment_soak_timer()
@@ -148,20 +162,17 @@
self.reassignment_soak_timer = reactor.callLater(
self.soak_time, self._reassign_work)
- print self.reassignment_soak_timer
-
@inlineCallbacks
def _reassign_work(self):
- self.log.info('reassign-work')
- yield None
- # TODO continue from here
+ self.log.info('reassign-work')
# Plan
- # Step 1: collect current assignments from consul
- # Step 2: calculate desired assignment from current members and
+ #
+ # Step 1: calculate desired assignment from current members and
# workload list (e.g., using consistent hashing or any other
# algorithm
+ # Step 2: collect current assignments from consul
# Step 3: find the delta between the desired and actual assignments:
# these form two lists:
# 1. new assignments to be made
@@ -174,3 +185,65 @@
#
# We must make sure while we are working on this, we do not re-enter
# into same method!
+
+ try:
+
+ # Step 1: generate wanted assignment (mapping work to members)
+
+ ring = HashRing(self.members)
+ wanted_assignments = dict() # member_id -> set(work_id)
+ _ = [
+ wanted_assignments.setdefault(ring.get_node(work), set())
+ .add(work)
+ for work in self.workload
+ ]
+ for (member, work) in sorted(wanted_assignments.iteritems()):
+ self.log.info('assignment',
+ member=member, work_count=len(work))
+
+ # Step 2: discover current assignment (from consul)
+
+ (_, results) = yield self.coord.kv_get(
+ self.coord.ASSIGNMENT_PREFIX, recurse=True)
+
+ matches = [
+ (self.assignment_match(e['Key']), e) for e in results or []]
+
+ current_assignments = dict() # member_id -> set(work_id)
+ _ = [
+ current_assignments.setdefault(
+ m.groupdict()['member_id'], set())
+ .add(m.groupdict()['work_id'])
+ for m, e in matches if m is not None
+ ]
+
+ # Step 3: handle revoked assignments first on a per member basis
+
+ for member_id, current_work in current_assignments.iteritems():
+ assert isinstance(current_work, set)
+ wanted_work = wanted_assignments.get(member_id, set())
+ work_to_revoke = current_work.difference(wanted_work)
+
+ # revoking work by simply deleting the assignment entry
+ # TODO if we want some feedback to see that member abandoned
+ # work, we could add a consul-based protocol here
+ for work_id in work_to_revoke:
+ yield self.coord.kv_delete(
+ self.coord.ASSIGNMENT_PREFIX
+ + member_id + '/' + work_id)
+
+ # Step 4: assign new work as needed
+
+ for member_id, wanted_work in wanted_assignments.iteritems():
+ assert isinstance(wanted_work, set)
+ current_work = current_assignments.get(member_id, set())
+ work_to_assign = wanted_work.difference(current_work)
+
+ for work_id in work_to_assign:
+ yield self.coord.kv_put(
+ self.coord.ASSIGNMENT_PREFIX
+ + member_id + '/' + work_id, '')
+
+ except Exception, e:
+ self.log.exception('failed-reassignment', e=e)
+ self._restart_reassignment_soak_timer() # try again in a while
diff --git a/voltha/worker.py b/voltha/worker.py
new file mode 100644
index 0000000..69020e5
--- /dev/null
+++ b/voltha/worker.py
@@ -0,0 +1,126 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import re
+
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.base import DelayedCall
+from twisted.internet.defer import inlineCallbacks
+
+from asleep import asleep
+
+
+class Worker(object):
+ """
+ Worker side of the coordinator. An instance of this class runs in every
+ voltha instance. It monitors what work is assigned to this instance by
+ the leader. This is all done via consul.
+ """
+
+ ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
+
+ log = get_logger()
+
+ # Public methods:
+
+ def __init__(self, instance_id, coordinator):
+
+ self.instance_id = instance_id
+ self.coord = coordinator
+ self.halted = False
+ self.soak_time = 0.5 # soak till assignment list settles
+
+ self.my_workload = set() # set of work_ids assigned to me
+
+ self.assignment_soak_timer = None
+ self.my_candidate_workload = set() # we stash here during soaking
+
+ self.assignment_match = re.compile(
+ self.ASSIGNMENT_EXTRACTOR % self.coord.ASSIGNMENT_PREFIX).match
+
+ @inlineCallbacks
+ def start(self):
+ self.log.info('worker-started')
+ yield self._start_tracking_my_assignments()
+
+ def halt(self):
+ self.log.info('worker-halted')
+ if isinstance(self.assignment_soak_timer, DelayedCall):
+ if not self.assignment_soak_timer.called:
+ self.assignment_soak_timer.cancel()
+
+ # Private methods:
+
+ def _start_tracking_my_assignments(self):
+ reactor.callLater(0, self._track_my_assignments, 0)
+
+ @inlineCallbacks
+ def _track_my_assignments(self, index):
+
+ try:
+
+ # if there is no leader yet, wait for a stable leader
+ d = self.coord.wait_for_a_leader()
+ if not d.called:
+ yield d
+ # additional time to let leader update
+ # assignments, to minimize potential churn
+ yield asleep(5)
+
+ (index, results) = yield self.coord.kv_get(
+ self.coord.ASSIGNMENT_PREFIX + self.instance_id,
+ index=index, recurse=True)
+
+ matches = [
+ (self.assignment_match(e['Key']), e) for e in results or []]
+
+ my_workload = set([
+ m.groupdict()['work_id'] for m, e in matches if m is not None
+ ])
+
+ if my_workload != self.my_workload:
+ self._stash_and_restart_soak_timer(my_workload)
+
+ except Exception, e:
+ self.log.exception('assignments-track-error', e=e)
+ yield asleep(1) # to prevent flood
+
+ finally:
+ if not self.halted:
+ reactor.callLater(0, self._track_my_assignments, index)
+
+ def _stash_and_restart_soak_timer(self, candidate_workload):
+
+ self.log.debug('re-start-assignment-soaking')
+
+ if self.assignment_soak_timer is not None:
+ if not self.assignment_soak_timer.called:
+ self.assignment_soak_timer.cancel()
+
+ self.my_candidate_workload = candidate_workload
+ self.assignment_soak_timer = reactor.callLater(
+ self.soak_time, self._update_assignments)
+
+ def _update_assignments(self):
+ """
+ Called when finally the dust has settled on our assignments.
+ :return: None
+ """
+ self.log.info('my-assignments-changed',
+ old_count=len(self.my_workload),
+ new_count=len(self.my_candidate_workload))
+ self.my_workload, self.my_candidate_workload = \
+ self.my_candidate_workload, None