Add worker side of coordinator to track local work assignments
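
Every voltha instance now runs a Worker (voltha/worker.py) that watches
its own assignment subtree in consul and soaks rapid changes before
acting on them. The Coordinator creates, starts and halts this Worker
and exposes wait_for_a_leader() so callers can defer work until a
leader is known. The Leader computes the wanted work-to-member mapping
with a consistent hash ring (new hash_ring dependency) and reconciles
it against the assignments currently stored in consul, deleting revoked
entries and adding missing ones per member.

Roughly, the wanted owner of each work item is derived as follows
(illustrative sketch, generic names):

    ring = HashRing(members)            # members: list of instance ids
    member_id = ring.get_node(work_id)  # consistent-hash lookup

Tracking loops no longer reschedule themselves during shutdown, and
membership record re-creation now goes through the retry/backoff
helper.
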
diff --git a/requirements.txt b/requirements.txt
index 963c799..17bb0d7 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,7 @@
 decorator>=3.4.0
 docker-py
 fluent-logger>=0.4.3
+hash_ring>=1.3.1
 hexdump>=3.3
 klein>=15.3.1
 nose>=1.3.7
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 8992ccb..fe0cc52 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -21,11 +21,12 @@
 from requests import ConnectionError
 from structlog import get_logger
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
 from twisted.internet.task import LoopingCall
 
 from asleep import asleep
 from leader import Leader
+from worker import Worker
 
 
 class StaleMembershipEntryException(Exception):
@@ -48,7 +49,10 @@
     CONNECT_RETRY_INTERVAL_SEC = 1
     RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
     LEADER_KEY = 'service/voltha/leader'
+
     MEMBERSHIP_PREFIX = 'service/voltha/members/'
+    ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
+    WORKLOAD_PREFIX = 'service/voltha/work/'
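+    # Key layout in consul (the leader writes assignments; each worker
+    # watches its own member_id subtree):
+    #   MEMBERSHIP_PREFIX + <member_id>              membership records
+    #   WORKLOAD_PREFIX + <work_id>                  work items
+    #   ASSIGNMENT_PREFIX + <member_id>/<work_id>    assigned work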
 
     # Public methods:
 
@@ -73,6 +77,8 @@
         self.shutting_down = False
         self.leader = None
 
+        self.worker = Worker(self.instance_id, self)
+
         self.log = get_logger()
         self.log.info('initializing-coordinator')
 
@@ -85,14 +91,31 @@
         reactor.callLater(0, self._async_init)
         self.log.info('initialized-coordinator')
 
+        self.wait_for_leader_deferreds = []  # fired by _set_leader_id()
+
     @inlineCallbacks
     def shutdown(self):
         self.shutting_down = True
         yield self._delete_session()  # this will delete the leader lock too
+        yield self.worker.halt()
         if self.leader is not None:
             yield self.leader.halt()
             self.leader = None
 
+    def wait_for_a_leader(self):
+        """
+        Wait asynchronously until a leader is detected/elected. The returned
+        deferred is called back with the leader's instance_id.
+        :return: Deferred that fires with the leader's instance_id.
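+
+        Example use from an @inlineCallbacks method (see
+        Worker._track_my_assignments for an actual caller):
+            leader_id = yield coordinator.wait_for_a_leader()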
+        """
+        d = Deferred()
+        if self.leader_id is not None:
+            d.callback(self.leader_id)
+            return d
+        else:
+            self.wait_for_leader_deferreds.append(d)
+            return d
+
     # Proxy methods for consul with retry support
 
     def kv_get(self, *args, **kw):
@@ -111,6 +134,7 @@
         yield self._create_session()
         yield self._create_membership_record()
         yield self._start_leader_tracking()
+        yield self.worker.start()
 
     def _backoff(self, msg):
         wait_time = self.RETRY_BACKOFF[min(self.retries,
@@ -179,14 +203,15 @@
                                index=index, record=record)
                 if record is None or record['Session'] != self.session_id:
                     self.log.debug('remaking-membership-record')
-                    yield self._do_create_membership_record()
+                    yield self._retry(self._do_create_membership_record)
 
         except Exception, e:
             self.log.exception('unexpected-error-leader-trackin', e=e)
 
         finally:
-            # no matter what, the loop need to continue (after a short delay)
-            reactor.callLater(0.1, self._maintain_membership_record)
+            # except in shutdown, the loop must continue (after a short delay)
+            if not self.shutting_down:
+                reactor.callLater(0.1, self._maintain_membership_record)
 
     def _start_leader_tracking(self):
         reactor.callLater(0, self._leadership_tracking_loop)
@@ -248,15 +273,16 @@
             self.log.exception('unexpected-error-leader-trackin', e=e)
 
         finally:
-            # no matter what, the loop need to continue (after a short delay)
-            reactor.callLater(1, self._leadership_tracking_loop)
+            # except in shutdown, the loop must continue (after a short delay)
+            if not self.shutting_down:
+                reactor.callLater(1, self._leadership_tracking_loop)
 
     @inlineCallbacks
     def _assert_leadership(self):
         """(Re-)assert leadership"""
         if not self.i_am_leader:
             self.i_am_leader = True
-            self.leader_id = self.instance_id
+            self._set_leader_id(self.instance_id)
             yield self._just_gained_leadership()
 
     @inlineCallbacks
@@ -264,12 +290,19 @@
         """(Re-)assert non-leader role"""
 
         # update leader_id anyway
-        self.leader_id = leader_id
+        self._set_leader_id(leader_id)
 
         if self.i_am_leader:
             self.i_am_leader = False
             yield self._just_lost_leadership()
 
+    def _set_leader_id(self, leader_id):
+        self.leader_id = leader_id
+        deferreds, self.wait_for_leader_deferreds = \
+            self.wait_for_leader_deferreds, []
+        for d in deferreds:
+            d.callback(leader_id)
+
     def _just_gained_leadership(self):
         self.log.info('became-leader')
         self.leader = Leader(self)
@@ -291,13 +324,14 @@
                 result = yield func(*args, **kw)
                 break
             except ConsulException, e:
-                yield self._backoff('consul-not-upC')
+                yield self._backoff('consul-not-up')
             except ConnectionError, e:
                 yield self._backoff('cannot-connect-to-consul')
             except StaleMembershipEntryException, e:
                 yield self._backoff('stale-membership-record-in-the-way')
             except Exception, e:
-                self.log.exception(e)
+                if not self.shutting_down:
+                    self.log.exception(e)
                 yield self._backoff('unknown-error')
 
         returnValue(result)
diff --git a/voltha/leader.py b/voltha/leader.py
index a1d970d..6a4ef68 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 #
 
+import re
+from hash_ring import HashRing
 from structlog import get_logger
 from twisted.internet import reactor
 from twisted.internet.base import DelayedCall
@@ -30,22 +32,32 @@
     method in cases it looses the leadership lock.
     """
 
-    ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
-    WORKLOAD_PREFIX = 'service/voltha/workload/'
+    ID_EXTRACTOR = '^(%s)([^/]+)$'
+    ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
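+    # e.g. ID_EXTRACTOR % WORKLOAD_PREFIX matches
+    # 'service/voltha/work/device_group_0001' and group(2) is the work id;
+    # ASSIGNMENT_EXTRACTOR yields named groups 'member_id' and 'work_id'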
 
     log = get_logger()
 
     # Public methods:
 
     def __init__(self, coordinator):
-        self.coorinator = coordinator
+
+        self.coord = coordinator
         self.halted = False
-        self.soak_time = 5  # soak till membership/workload changes settle
+        self.soak_time = 3  # soak till membership/workload changes settle
 
         self.workload = []
         self.members = []
         self.reassignment_soak_timer = None
 
+        self.workload_id_match = re.compile(
+            self.ID_EXTRACTOR % self.coord.WORKLOAD_PREFIX).match
+
+        self.member_id_match = re.compile(
+            self.ID_EXTRACTOR % self.coord.MEMBERSHIP_PREFIX).match
+
+        self.assignment_match = re.compile(
+            self.ASSIGNMENT_EXTRACTOR % self.coord.ASSIGNMENT_PREFIX).match
+
     @inlineCallbacks
     def start(self):
         self.log.info('leader-started')
@@ -59,7 +71,8 @@
 
         # any active cancellations, releases, etc., should happen here
         if isinstance(self.reassignment_soak_timer, DelayedCall):
-            self.reassignment_soak_timer.cancel()
+            if not self.reassignment_soak_timer.called:
+                self.reassignment_soak_timer.cancel()
 
     # Private methods:
 
@@ -74,8 +87,8 @@
 
         # TODO for now we simply generate a fixed number of fake entries
         yield DeferredList([
-            self.coorinator.kv_put(
-                self.WORKLOAD_PREFIX + 'device_group_%04d' % (i + 1),
+            self.coord.kv_put(
+                self.coord.WORKLOAD_PREFIX + 'device_group_%04d' % (i + 1),
                 'placeholder for device group %d data' % (i + 1))
             for i in xrange(100)
         ])
@@ -93,13 +106,16 @@
     def _track_workload(self, index):
 
         try:
-            (index, results) = yield self.coorinator.kv_get(
-                self.WORKLOAD_PREFIX, index=index, recurse=True)
+            (index, results) = yield self.coord.kv_get(
+                self.coord.WORKLOAD_PREFIX, index=index, recurse=True)
 
-            workload = [e['Key'] for e in results]
+            matches = (self.workload_id_match(e['Key'])
+                       for e in results or [])
+            workload = [m.group(2) for m in matches if m is not None]
 
             if workload != self.workload:
-                self.log.info('workload-changed', workload=workload)
+                self.log.info('workload-changed',
+                              old_workload_count=len(self.workload),
+                              new_workload_count=len(workload))
                 self.workload = workload
                 self._restart_reassignment_soak_timer()
 
@@ -114,19 +130,17 @@
     @inlineCallbacks
     def _track_members(self, index):
 
-        def is_member(entry):
-            key = entry['Key']
-            member_id = key[len(self.coorinator.MEMBERSHIP_PREFIX):]
-            return '/' not in member_id  # otherwise it is a nested key
-
         try:
-            (index, results) = yield self.coorinator.kv_get(
-                self.coorinator.MEMBERSHIP_PREFIX, index=index, recurse=True)
+            (index, results) = yield self.coord.kv_get(
+                self.coord.MEMBERSHIP_PREFIX, index=index, recurse=True)
 
-            members = [e['Key'] for e in results if is_member(e)]
+            matches = (self.member_id_match(e['Key']) for e in results or [])
+            members = [m.group(2) for m in matches if m is not None]
 
             if members != self.members:
-                self.log.info('membership-changed', members=members)
+                self.log.info('membership-changed',
+                              old_members_count=len(self.members),
+                              new_members_count=len(members))
                 self.members = members
                 self._restart_reassignment_soak_timer()
 
@@ -148,20 +162,17 @@
         self.reassignment_soak_timer = reactor.callLater(
             self.soak_time, self._reassign_work)
 
-        print self.reassignment_soak_timer
-
     @inlineCallbacks
     def _reassign_work(self):
-        self.log.info('reassign-work')
-        yield None
 
-        # TODO continue from here
+        self.log.info('reassign-work')
 
         # Plan
-        # Step 1: collect current assignments from consul
-        # Step 2: calculate desired assignment from current members and
+        #
+        # Step 1: calculate desired assignment from current members and
         #         workload list (e.g., using consistent hashing or any other
         #         algorithm
+        # Step 2: collect current assignments from consul
         # Step 3: find the delta between the desired and actual assignments:
         #         these form two lists:
         #         1. new assignments to be made
@@ -174,3 +185,65 @@
         #
         # We must make sure while we are working on this, we do not re-enter
         # into same method!
+
+        try:
+
+            # Step 1: generate wanted assignment (mapping work to members)
+
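+            # hash_ring provides consistent hashing: every work item maps
+            # deterministically to one member, and membership changes only
+            # move a small fraction of the work between members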
+            ring = HashRing(self.members)
+            wanted_assignments = dict()  # member_id -> set(work_id)
+            for work in self.workload:
+                member_id = ring.get_node(work)
+                wanted_assignments.setdefault(member_id, set()).add(work)
+            for (member, work) in sorted(wanted_assignments.iteritems()):
+                self.log.info('assignment',
+                              member=member, work_count=len(work))
+
+            # Step 2: discover current assignment (from consul)
+
+            (_, results) = yield self.coord.kv_get(
+                self.coord.ASSIGNMENT_PREFIX, recurse=True)
+
+            matches = [
+                (self.assignment_match(e['Key']), e) for e in results or []]
+
+            current_assignments = dict()  # member_id -> set(work_id)
+            for m, e in matches:
+                if m is None:
+                    continue
+                groups = m.groupdict()
+                current_assignments.setdefault(
+                    groups['member_id'], set()).add(groups['work_id'])
+
+            # Step 3: handle revoked assignments first on a per member basis
+
+            for member_id, current_work in current_assignments.iteritems():
+                assert isinstance(current_work, set)
+                wanted_work = wanted_assignments.get(member_id, set())
+                work_to_revoke = current_work.difference(wanted_work)
+
+                # revoking work by simply deleting the assignment entry
+                # TODO if we want some feedback to see that member abandoned
+                # work, we could add a consul-based protocol here
+                for work_id in work_to_revoke:
+                    yield self.coord.kv_delete(
+                        self.coord.ASSIGNMENT_PREFIX
+                        + member_id + '/' + work_id)
+
+            # Step 4: assign new work as needed
+
+            for member_id, wanted_work in wanted_assignments.iteritems():
+                assert isinstance(wanted_work, set)
+                current_work = current_assignments.get(member_id, set())
+                work_to_assign = wanted_work.difference(current_work)
+
+                for work_id in work_to_assign:
+                    yield self.coord.kv_put(
+                        self.coord.ASSIGNMENT_PREFIX
+                        + member_id + '/' + work_id, '')
+
+        except Exception, e:
+            self.log.exception('failed-reassignment', e=e)
+            self._restart_reassignment_soak_timer()  # try again in a while
diff --git a/voltha/worker.py b/voltha/worker.py
new file mode 100644
index 0000000..69020e5
--- /dev/null
+++ b/voltha/worker.py
@@ -0,0 +1,126 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import re
+
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.base import DelayedCall
+from twisted.internet.defer import inlineCallbacks
+
+from asleep import asleep
+
+
+class Worker(object):
+    """
+    Worker side of the coordinator. An instance of this class runs in every
+    voltha instance. It monitors what work is assigned to this instance by
+    the leader. This is all done via consul.
+    """
+
+    ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
+
+    log = get_logger()
+
+    # Public methods:
+
+    def __init__(self, instance_id, coordinator):
+
+        self.instance_id = instance_id
+        self.coord = coordinator
+        self.halted = False
+        self.soak_time = 0.5  # soak till assignment list settles
+
+        self.my_workload = set()  # set of work_id's assigned to me
+
+        self.assignment_soak_timer = None
+        self.my_candidate_workload = set()  # we stash here during soaking
+
+        self.assignment_match = re.compile(
+            self.ASSIGNMENT_EXTRACTOR % self.coord.ASSIGNMENT_PREFIX).match
+
+    @inlineCallbacks
+    def start(self):
+        self.log.info('worker-started')
+        yield self._start_tracking_my_assignments()
+
+    def halt(self):
+        self.log.info('worker-halted')
+        if isinstance(self.assignment_soak_timer, DelayedCall):
+            if not self.assignment_soak_timer.called:
+                self.assignment_soak_timer.cancel()
+
+    # Private methods:
+
+    def _start_tracking_my_assignments(self):
+        reactor.callLater(0, self._track_my_assignments, 0)
+
+    @inlineCallbacks
+    def _track_my_assignments(self, index):
+
+        try:
+
+            # if there is no leader yet, wait for a stable leader
+            d = self.coord.wait_for_a_leader()
+            if not d.called:
+                yield d
+                # additional time to let leader update
+                # assignments, to minimize potential churn
+                yield asleep(5)
+
+            (index, results) = yield self.coord.kv_get(
+                self.coord.ASSIGNMENT_PREFIX + self.instance_id,
+                index=index, recurse=True)
+
+            matches = [
+                (self.assignment_match(e['Key']), e) for e in results or []]
+
+            my_workload = set([
+                m.groupdict()['work_id'] for m, e in matches if m is not None
+            ])
+
+            if my_workload != self.my_workload:
+                self._stash_and_restart_soak_timer(my_workload)
+
+        except Exception, e:
+            self.log.exception('assignments-track-error', e=e)
+            yield asleep(1)  # to prevent flood
+
+        finally:
+            if not self.halted:
+                reactor.callLater(0, self._track_my_assignments, index)
+
+    def _stash_and_restart_soak_timer(self, candidate_workload):
+
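+        # debounce: each change stashes the latest candidate workload and
+        # restarts the timer, so _update_assignments runs only once the
+        # assignment list has been quiet for soak_time seconds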
+        self.log.debug('re-start-assignment-soaking')
+
+        if self.assignment_soak_timer is not None:
+            if not self.assignment_soak_timer.called:
+                self.assignment_soak_timer.cancel()
+
+        self.my_candidate_workload = candidate_workload
+        self.assignment_soak_timer = reactor.callLater(
+            self.soak_time, self._update_assignments)
+
+    def _update_assignments(self):
+        """
+        Called once the dust has finally settled on our assignments.
+        :return: None
+        """
+        self.log.info('my-assignments-changed',
+                      old_count=len(self.my_workload),
+                      new_count=len(self.my_candidate_workload))
+        self.my_workload, self.my_candidate_workload = \
+            self.my_candidate_workload, None