Add leader, membership, and workload tracking

Still work in progress, but leader correctly detects workload
and/or membership change events and properly soaks before
rerunning reassignment routine (which is not flashed out yet).
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 432c0fe..8992ccb 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -16,14 +16,6 @@
 
 """ Consul-based coordinator services """
 
-# TODO move this to the consul.twisted async client once it is available.
-# Note:
-# We use https://github.com/cablehead/python-consul for consul client.
-# It's master branch already provides support for Twisted, but the latest
-# released version (0.6.1) was cut before twisted support was added. So keep
-# an eye on when 0.6.2 comes out and move over to the twisted interface once
-# it's available.
-
 from consul import ConsulException
 from consul.twisted import Consul
 from requests import ConnectionError
@@ -33,6 +25,7 @@
 from twisted.internet.task import LoopingCall
 
 from asleep import asleep
+from leader import Leader
 
 
 class StaleMembershipEntryException(Exception):
@@ -40,18 +33,32 @@
 
 
 class Coordinator(object):
+    """
+    An app shall instantiate only one Coordinator (singleton).
+    A single instance of this object shall take care of all external
+    with consul, and via consul, all coordination activities with its
+    clustered peers. Roles include:
+    - registering an ephemeral membership entry (k/v record) in consul
+    - participating in a symmetric leader election, and potentially assuming
+      the leader's role. What leadership entails is not a concern for the
+      coordination, it simply instantiates (and shuts down) a leader class
+      when it gains (or looses) leadership.
+    """
 
     CONNECT_RETRY_INTERVAL_SEC = 1
     RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
     LEADER_KEY = 'service/voltha/leader'
     MEMBERSHIP_PREFIX = 'service/voltha/members/'
 
+    # Public methods:
+
     def __init__(self,
                  internal_host_address,
                  external_host_address,
                  instance_id,
                  rest_port,
-                 consul='localhost:8500'):
+                 consul='localhost:8500',
+                 leader_class=Leader):
 
         self.retries = 0
         self.instance_id = instance_id
@@ -64,6 +71,7 @@
         self.i_am_leader = False
         self.leader_id = None  # will be the instance id of the current leader
         self.shutting_down = False
+        self.leader = None
 
         self.log = get_logger()
         self.log.info('initializing-coordinator')
@@ -74,34 +82,50 @@
         # TODO need to handle reconnect events properly
         self.consul = Consul(host=host, port=port)
 
-        reactor.callLater(0, self.async_init)
+        reactor.callLater(0, self._async_init)
         self.log.info('initialized-coordinator')
 
     @inlineCallbacks
-    def async_init(self):
-        yield self.create_session()
-        yield self.create_membership_record()
-        yield self.start_leader_tracking()
-
-    @inlineCallbacks
     def shutdown(self):
         self.shutting_down = True
-        yield self.delete_session()  # this will delete the leader lock too
+        yield self._delete_session()  # this will delete the leader lock too
+        if self.leader is not None:
+            yield self.leader.halt()
+            self.leader = None
 
-    def backoff(self, msg):
+    # Proxy methods for consul with retry support
+
+    def kv_get(self, *args, **kw):
+        return self._retry(self.consul.kv.get, *args, **kw)
+
+    def kv_put(self, *args, **kw):
+        return self._retry(self.consul.kv.put, *args, **kw)
+
+    def kv_delete(self, *args, **kw):
+        return self._retry(self.consul.kv.delete, *args, **kw)
+
+    # Private (internal) methods:
+
+    @inlineCallbacks
+    def _async_init(self):
+        yield self._create_session()
+        yield self._create_membership_record()
+        yield self._start_leader_tracking()
+
+    def _backoff(self, msg):
         wait_time = self.RETRY_BACKOFF[min(self.retries,
                                            len(self.RETRY_BACKOFF) - 1)]
         self.retries += 1
         self.log.error(msg, retry_in=wait_time)
         return asleep(wait_time)
 
-    def clear_backoff(self):
+    def _clear_backoff(self):
         if self.retries:
             self.log.info('reconnected-to-consul', after_retries=self.retries)
             self.retries = 0
 
     @inlineCallbacks
-    def create_session(self):
+    def _create_session(self):
 
         @inlineCallbacks
         def _renew_session():
@@ -127,16 +151,16 @@
         yield self._retry(_create_session)
 
     @inlineCallbacks
-    def delete_session(self):
+    def _delete_session(self):
         yield self.consul.session.destroy(self.session_id)
 
     @inlineCallbacks
-    def create_membership_record(self):
-        yield self._retry(self._create_membership_record)
+    def _create_membership_record(self):
+        yield self._retry(self._do_create_membership_record)
         reactor.callLater(0, self._maintain_membership_record)
 
     @inlineCallbacks
-    def _create_membership_record(self):
+    def _do_create_membership_record(self):
         result = yield self.consul.kv.put(
             self.membership_record_key, 'alive',
             acquire=self.session_id)
@@ -155,16 +179,16 @@
                                index=index, record=record)
                 if record is None or record['Session'] != self.session_id:
                     self.log.debug('remaking-membership-record')
-                    yield self._create_membership_record()
+                    yield self._do_create_membership_record()
 
         except Exception, e:
             self.log.exception('unexpected-error-leader-trackin', e=e)
 
         finally:
             # no matter what, the loop need to continue (after a short delay)
-            reactor.callAfter(0.1, self._maintain_membership_record)
+            reactor.callLater(0.1, self._maintain_membership_record)
 
-    def start_leader_tracking(self):
+    def _start_leader_tracking(self):
         reactor.callLater(0, self._leadership_tracking_loop)
 
     @inlineCallbacks
@@ -172,7 +196,7 @@
 
         try:
 
-            # Attempt to acquire leadership lock. True indicates success,
+            # Attempt to acquire leadership lock. True indicates success;
             # False indicates there is already a leader. It's instance id
             # is then the value under the leader key service/voltha/leader.
 
@@ -196,12 +220,12 @@
             if record is not None:
                 if result is True:
                     if record['Session'] == self.session_id:
-                        self._assert_leadership()
+                        yield self._assert_leadership()
                     else:
                         pass  # confusion; need to retry leadership
                 else:
                     leader_id = record['Value']
-                    self._assert_nonleadership(leader_id)
+                    yield self._assert_nonleadership(leader_id)
 
             # if record was none, we shall try leadership again
 
@@ -227,13 +251,15 @@
             # no matter what, the loop need to continue (after a short delay)
             reactor.callLater(1, self._leadership_tracking_loop)
 
+    @inlineCallbacks
     def _assert_leadership(self):
         """(Re-)assert leadership"""
         if not self.i_am_leader:
             self.i_am_leader = True
             self.leader_id = self.instance_id
-            self._just_gained_leadership()
+            yield self._just_gained_leadership()
 
+    @inlineCallbacks
     def _assert_nonleadership(self, leader_id):
         """(Re-)assert non-leader role"""
 
@@ -242,13 +268,21 @@
 
         if self.i_am_leader:
             self.i_am_leader = False
-            self._just_lost_leadership()
+            yield self._just_lost_leadership()
 
     def _just_gained_leadership(self):
         self.log.info('became-leader')
+        self.leader = Leader(self)
+        return self.leader.start()
 
     def _just_lost_leadership(self):
         self.log.info('lost-leadership')
+        return self._halt_leader()
+
+    def _halt_leader(self):
+        d = self.leader.halt()
+        self.leader = None
+        return d
 
     @inlineCallbacks
     def _retry(self, func, *args, **kw):
@@ -257,13 +291,13 @@
                 result = yield func(*args, **kw)
                 break
             except ConsulException, e:
-                yield self.backoff('consul-not-upC')
+                yield self._backoff('consul-not-upC')
             except ConnectionError, e:
-                yield self.backoff('cannot-connect-to-consul')
+                yield self._backoff('cannot-connect-to-consul')
             except StaleMembershipEntryException, e:
-                yield self.backoff('stale-membership-record-in-the-way')
+                yield self._backoff('stale-membership-record-in-the-way')
             except Exception, e:
                 self.log.exception(e)
-                yield self.backoff('unknown-error')
+                yield self._backoff('unknown-error')
 
         returnValue(result)
diff --git a/voltha/leader.py b/voltha/leader.py
new file mode 100644
index 0000000..e1c677a
--- /dev/null
+++ b/voltha/leader.py
@@ -0,0 +1,175 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.base import DelayedCall
+from twisted.internet.defer import inlineCallbacks, DeferredList
+
+from asleep import asleep
+
+
+class Leader(object):
+    """
+    A single instance of this object shall exist across the whole cluster.
+    This is guaranteed by the coordinator which instantiates this class
+    only when it secured the leadership lock, as well as calling the halt()
+    method in cases it looses the leadership lock.
+    """
+
+    ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
+    WORKLOAD_PREFIX = 'service/voltha/workload/'
+
+    log = get_logger()
+
+    # Public methods:
+
+    def __init__(self, coordinator):
+        self.coorinator = coordinator
+        self.halted = False
+        self.soak_time = 5  # soak till membership/workload changes settle
+
+        self.workload = []
+        self.members = []
+        self.reassignment_soak_timer = None
+
+    @inlineCallbacks
+    def start(self):
+        self.log.info('leader-started')
+        yield self._validate_workload()
+        yield self._start_tracking_assignments()
+
+    def halt(self):
+        """Suspend leadership duties immediately"""
+        self.log.info('leader-halted')
+        self.halted = True
+
+        # any active cancellations, releases, etc., should happen here
+        if isinstance(self.reassignment_soak_timer, DelayedCall):
+            self.reassignment_soak_timer.cancel()
+
+    # Private methods:
+
+    @inlineCallbacks
+    def _validate_workload(self):
+        """
+        Workload is defined as any k/v entries under the workload prefix
+        in consul. Under normal operation, only the leader shall edit the
+        workload list. But we make sure that in case an administrator
+        manually edits the workload, we react to that properly.
+        """
+
+        # TODO for now we simply generate a fixed number of fake entries
+        yield DeferredList([
+            self.coorinator.kv_put(
+                self.WORKLOAD_PREFIX + 'device_group_%04d' % (i + 1),
+                'placeholder for device group %d data' % (i + 1))
+            for i in xrange(100)
+        ])
+
+    def _start_tracking_assignments(self):
+        """
+        We must track both the cluster member list as well as the workload
+        list. Upon change in either, we must rerun our sharding algorithm
+        and reassign work as/if needed.
+        """
+        reactor.callLater(0, self._track_workload, 0)
+        reactor.callLater(0, self._track_members, 0)
+
+    @inlineCallbacks
+    def _track_workload(self, index):
+
+        try:
+            (index, results) = yield self.coorinator.kv_get(
+                self.WORKLOAD_PREFIX, index=index, recurse=True)
+
+            workload = [e['Key'] for e in results]
+
+            if workload != self.workload:
+                self.log.info('workload-changed', workload=workload)
+                self.workload = workload
+                self._restart_reassignment_soak_timer()
+
+        except Exception, e:
+            self.log.exception('workload-track-error', e=e)
+            yield asleep(1.0)  # to prevent flood
+
+        finally:
+            if not self.halted:
+                reactor.callLater(0, self._track_workload, index)
+
+    @inlineCallbacks
+    def _track_members(self, index):
+
+        def is_member(entry):
+            key = entry['Key']
+            member_id = key[len(self.coorinator.MEMBERSHIP_PREFIX):]
+            return '/' not in member_id  # otherwise it is a nested key
+
+        try:
+            (index, results) = yield self.coorinator.kv_get(
+                self.coorinator.MEMBERSHIP_PREFIX, index=index, recurse=True)
+
+            members = [e['Key'] for e in results if is_member(e)]
+
+            if members != self.members:
+                self.log.info('membership-changed', members=members)
+                self.members = members
+                self._restart_reassignment_soak_timer()
+
+        except Exception, e:
+            self.log.exception('members-track-error', e=e)
+            yield asleep(1.0)  # to prevent flood
+
+        finally:
+            if not self.halted:
+                reactor.callLater(0, self._track_members, index)
+
+    def _restart_reassignment_soak_timer(self):
+
+        if self.reassignment_soak_timer is not None:
+            assert isinstance(self.reassignment_soak_timer, DelayedCall)
+            self.reassignment_soak_timer.cancel()
+
+        self.reassignment_soak_timer = reactor.callLater(
+            self.soak_time, self._reassign_work)
+
+        print self.reassignment_soak_timer
+
+    @inlineCallbacks
+    def _reassign_work(self):
+        self.log.info('reassign-work')
+        yield None
+
+        # TODO continue from here
+
+        # Plan
+        # Step 1: collect current assignments from consul
+        # Step 2: calculate desired assignment from current members and
+        #         workload list (e.g., using consistent hashing or any other
+        #         algorithm
+        # Step 3: find the delta between the desired and actual assignments:
+        #         these form two lists:
+        #         1. new assignments to be made
+        #         2. obsolete assignments to be revoked
+        #         graceful handling may be desirable when moving existing
+        #         assignment from existing member to another member (to make
+        #         sure it is abandoned by old member before new takes charge)
+        # Step 4: orchestrate the assignment by adding/deleting(/locking)
+        #         entries in consul
+        #
+        # We must make sure while we are working on this, we do not re-enter
+        # into same method!
diff --git a/voltha/northbound/rpc_dispatcher.py b/voltha/northbound/rpc_dispatcher.py
new file mode 100644
index 0000000..128ba07
--- /dev/null
+++ b/voltha/northbound/rpc_dispatcher.py
@@ -0,0 +1,29 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+RPC request dispatcher
+"""
+
+from twisted.internet.defer import DeferredQueue
+
+
+class RpcDispatcher(object):
+
+    def __init__(self):
+        self.queue = DeferredQueue()
+
+        # work in progress