Add leader, membership, and workload tracking
Still a work in progress, but the leader correctly detects workload
and/or membership change events and properly soaks before rerunning
the reassignment routine (which is not fleshed out yet).
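
For reference, the soak behaviour is essentially a debounce: every change
event restarts a timer, and the (potentially expensive) reassignment runs
only once the cluster has been quiet for the full soak interval. A minimal
Twisted sketch of that pattern, with purely illustrative names (this is not
the code in the patch below):

    from twisted.internet import reactor

    class SoakTimerSketch(object):
        """Debounce work until no events arrive for soak_time seconds."""

        def __init__(self, soak_time=5):
            self.soak_time = soak_time
            self.timer = None

        def on_change_event(self):
            # every event cancels the pending timer and starts a fresh one
            if self.timer is not None and self.timer.active():
                self.timer.cancel()
            self.timer = reactor.callLater(self.soak_time, self.do_work)

        def do_work(self):
            # the expensive reassignment would run here, once per burst
            pass
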
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 432c0fe..8992ccb 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -16,14 +16,6 @@
""" Consul-based coordinator services """
-# TODO move this to the consul.twisted async client once it is available.
-# Note:
-# We use https://github.com/cablehead/python-consul for consul client.
-# It's master branch already provides support for Twisted, but the latest
-# released version (0.6.1) was cut before twisted support was added. So keep
-# an eye on when 0.6.2 comes out and move over to the twisted interface once
-# it's available.
-
from consul import ConsulException
from consul.twisted import Consul
from requests import ConnectionError
@@ -33,6 +25,7 @@
from twisted.internet.task import LoopingCall
from asleep import asleep
+from leader import Leader
class StaleMembershipEntryException(Exception):
@@ -40,18 +33,32 @@
class Coordinator(object):
+ """
+ An app shall instantiate only one Coordinator (singleton).
+ A single instance of this object shall take care of all external
+ interactions with consul and, via consul, all coordination activities
+ with its clustered peers. Roles include:
+ - registering an ephemeral membership entry (k/v record) in consul
+ - participating in a symmetric leader election, and potentially assuming
+ the leader's role. What leadership entails is not a concern of the
+ coordinator; it simply instantiates (and shuts down) a leader class
+ when it gains (or loses) leadership.
+ """
CONNECT_RETRY_INTERVAL_SEC = 1
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
LEADER_KEY = 'service/voltha/leader'
MEMBERSHIP_PREFIX = 'service/voltha/members/'
+ # Public methods:
+
def __init__(self,
internal_host_address,
external_host_address,
instance_id,
rest_port,
- consul='localhost:8500'):
+ consul='localhost:8500',
+ leader_class=Leader):
self.retries = 0
self.instance_id = instance_id
@@ -64,6 +71,7 @@
self.i_am_leader = False
self.leader_id = None # will be the instance id of the current leader
self.shutting_down = False
+ self.leader = None
self.log = get_logger()
self.log.info('initializing-coordinator')
@@ -74,34 +82,50 @@
# TODO need to handle reconnect events properly
self.consul = Consul(host=host, port=port)
- reactor.callLater(0, self.async_init)
+ reactor.callLater(0, self._async_init)
self.log.info('initialized-coordinator')
@inlineCallbacks
- def async_init(self):
- yield self.create_session()
- yield self.create_membership_record()
- yield self.start_leader_tracking()
-
- @inlineCallbacks
def shutdown(self):
self.shutting_down = True
- yield self.delete_session() # this will delete the leader lock too
+ yield self._delete_session() # this will delete the leader lock too
+ if self.leader is not None:
+ yield self.leader.halt()
+ self.leader = None
- def backoff(self, msg):
+ # Proxy methods for consul with retry support
+
+ def kv_get(self, *args, **kw):
+ return self._retry(self.consul.kv.get, *args, **kw)
+
+ def kv_put(self, *args, **kw):
+ return self._retry(self.consul.kv.put, *args, **kw)
+
+ def kv_delete(self, *args, **kw):
+ return self._retry(self.consul.kv.delete, *args, **kw)
+
+ # Private (internal) methods:
+
+ @inlineCallbacks
+ def _async_init(self):
+ yield self._create_session()
+ yield self._create_membership_record()
+ yield self._start_leader_tracking()
+
+ def _backoff(self, msg):
wait_time = self.RETRY_BACKOFF[min(self.retries,
len(self.RETRY_BACKOFF) - 1)]
self.retries += 1
self.log.error(msg, retry_in=wait_time)
return asleep(wait_time)
- def clear_backoff(self):
+ def _clear_backoff(self):
if self.retries:
self.log.info('reconnected-to-consul', after_retries=self.retries)
self.retries = 0
@inlineCallbacks
- def create_session(self):
+ def _create_session(self):
@inlineCallbacks
def _renew_session():
@@ -127,16 +151,16 @@
yield self._retry(_create_session)
@inlineCallbacks
- def delete_session(self):
+ def _delete_session(self):
yield self.consul.session.destroy(self.session_id)
@inlineCallbacks
- def create_membership_record(self):
- yield self._retry(self._create_membership_record)
+ def _create_membership_record(self):
+ yield self._retry(self._do_create_membership_record)
reactor.callLater(0, self._maintain_membership_record)
@inlineCallbacks
- def _create_membership_record(self):
+ def _do_create_membership_record(self):
result = yield self.consul.kv.put(
self.membership_record_key, 'alive',
acquire=self.session_id)
@@ -155,16 +179,16 @@
index=index, record=record)
if record is None or record['Session'] != self.session_id:
self.log.debug('remaking-membership-record')
- yield self._create_membership_record()
+ yield self._do_create_membership_record()
except Exception, e:
self.log.exception('unexpected-error-leader-trackin', e=e)
finally:
# no matter what, the loop need to continue (after a short delay)
- reactor.callAfter(0.1, self._maintain_membership_record)
+ reactor.callLater(0.1, self._maintain_membership_record)
- def start_leader_tracking(self):
+ def _start_leader_tracking(self):
reactor.callLater(0, self._leadership_tracking_loop)
@inlineCallbacks
@@ -172,7 +196,7 @@
try:
- # Attempt to acquire leadership lock. True indicates success,
+ # Attempt to acquire leadership lock. True indicates success;
# False indicates there is already a leader. It's instance id
# is then the value under the leader key service/voltha/leader.
@@ -196,12 +220,12 @@
if record is not None:
if result is True:
if record['Session'] == self.session_id:
- self._assert_leadership()
+ yield self._assert_leadership()
else:
pass # confusion; need to retry leadership
else:
leader_id = record['Value']
- self._assert_nonleadership(leader_id)
+ yield self._assert_nonleadership(leader_id)
# if record was none, we shall try leadership again
@@ -227,13 +251,15 @@
# no matter what, the loop need to continue (after a short delay)
reactor.callLater(1, self._leadership_tracking_loop)
+ @inlineCallbacks
def _assert_leadership(self):
"""(Re-)assert leadership"""
if not self.i_am_leader:
self.i_am_leader = True
self.leader_id = self.instance_id
- self._just_gained_leadership()
+ yield self._just_gained_leadership()
+ @inlineCallbacks
def _assert_nonleadership(self, leader_id):
"""(Re-)assert non-leader role"""
@@ -242,13 +268,21 @@
if self.i_am_leader:
self.i_am_leader = False
- self._just_lost_leadership()
+ yield self._just_lost_leadership()
def _just_gained_leadership(self):
self.log.info('became-leader')
+ self.leader = Leader(self)
+ return self.leader.start()
def _just_lost_leadership(self):
self.log.info('lost-leadership')
+ return self._halt_leader()
+
+ def _halt_leader(self):
+ d = self.leader.halt()
+ self.leader = None
+ return d
@inlineCallbacks
def _retry(self, func, *args, **kw):
@@ -257,13 +291,13 @@
result = yield func(*args, **kw)
break
except ConsulException, e:
- yield self.backoff('consul-not-upC')
+ yield self._backoff('consul-not-up')
except ConnectionError, e:
- yield self.backoff('cannot-connect-to-consul')
+ yield self._backoff('cannot-connect-to-consul')
except StaleMembershipEntryException, e:
- yield self.backoff('stale-membership-record-in-the-way')
+ yield self._backoff('stale-membership-record-in-the-way')
except Exception, e:
self.log.exception(e)
- yield self.backoff('unknown-error')
+ yield self._backoff('unknown-error')
returnValue(result)
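
For readers unfamiliar with consul-based leader election: the mechanism above
hinges on a session-bound key, where kv.put(..., acquire=session_id) succeeds
only for the instance that wins the lock, and everyone else can read the
winner's id from the same key. A stripped-down, purely illustrative sketch of
that mechanism (not part of this patch):

    from consul.twisted import Consul
    from twisted.internet.defer import inlineCallbacks, returnValue

    LEADER_KEY = 'service/voltha/leader'

    @inlineCallbacks
    def try_to_become_leader(instance_id):
        consul = Consul()  # defaults to localhost:8500
        # a TTL-bound session makes the lock ephemeral: if this process
        # dies and stops renewing the session, consul releases the key
        session_id = yield consul.session.create(behavior='delete', ttl=60)
        acquired = yield consul.kv.put(LEADER_KEY, instance_id,
                                       acquire=session_id)
        if acquired:
            returnValue(instance_id)  # we won the election
        _index, record = yield consul.kv.get(LEADER_KEY)
        returnValue(record['Value'])  # somebody else holds the lock

Under a running reactor, yielding try_to_become_leader('voltha-1') returns
the id of whichever instance currently holds the lock; the actual coordinator
additionally renews the session periodically and watches the key for changes.
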
diff --git a/voltha/leader.py b/voltha/leader.py
new file mode 100644
index 0000000..e1c677a
--- /dev/null
+++ b/voltha/leader.py
@@ -0,0 +1,175 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.base import DelayedCall
+from twisted.internet.defer import inlineCallbacks, DeferredList
+
+from asleep import asleep
+
+
+class Leader(object):
+ """
+ A single instance of this object shall exist across the whole cluster.
+ This is guaranteed by the coordinator, which instantiates this class
+ only when it has secured the leadership lock, and which calls the halt()
+ method in case it loses the leadership lock.
+ """
+
+ ASSIGNMENT_PREFIX = 'service/voltha/assignments/'
+ WORKLOAD_PREFIX = 'service/voltha/workload/'
+
+ log = get_logger()
+
+ # Public methods:
+
+ def __init__(self, coordinator):
+ self.coordinator = coordinator
+ self.halted = False
+ self.soak_time = 5 # soak till membership/workload changes settle
+
+ self.workload = []
+ self.members = []
+ self.reassignment_soak_timer = None
+
+ @inlineCallbacks
+ def start(self):
+ self.log.info('leader-started')
+ yield self._validate_workload()
+ yield self._start_tracking_assignments()
+
+ def halt(self):
+ """Suspend leadership duties immediately"""
+ self.log.info('leader-halted')
+ self.halted = True
+
+ # any active cancellations, releases, etc., should happen here
+ # only cancel the soak timer if it is still pending; cancelling an
+ # already-fired DelayedCall would raise AlreadyCalled
+ if (isinstance(self.reassignment_soak_timer, DelayedCall)
+ and self.reassignment_soak_timer.active()):
+ self.reassignment_soak_timer.cancel()
+
+ # Private methods:
+
+ @inlineCallbacks
+ def _validate_workload(self):
+ """
+ The workload is defined as the set of k/v entries under the workload
+ prefix in consul. Under normal operation, only the leader shall edit
+ the workload list, but we make sure that if an administrator manually
+ edits the workload, we react to that properly.
+ """
+
+ # TODO for now we simply generate a fixed number of fake entries
+ yield DeferredList([
+ self.coordinator.kv_put(
+ self.WORKLOAD_PREFIX + 'device_group_%04d' % (i + 1),
+ 'placeholder for device group %d data' % (i + 1))
+ for i in xrange(100)
+ ])
+
+ def _start_tracking_assignments(self):
+ """
+ We must track both the cluster member list and the workload
+ list. Upon change in either, we must rerun our sharding algorithm
+ and reassign work as/if needed.
+ """
+ reactor.callLater(0, self._track_workload, 0)
+ reactor.callLater(0, self._track_members, 0)
+
+ @inlineCallbacks
+ def _track_workload(self, index):
+
+ try:
+ (index, results) = yield self.coordinator.kv_get(
+ self.WORKLOAD_PREFIX, index=index, recurse=True)
+
+ workload = [e['Key'] for e in results]
+
+ if workload != self.workload:
+ self.log.info('workload-changed', workload=workload)
+ self.workload = workload
+ self._restart_reassignment_soak_timer()
+
+ except Exception, e:
+ self.log.exception('workload-track-error', e=e)
+ yield asleep(1.0) # to prevent flood
+
+ finally:
+ if not self.halted:
+ reactor.callLater(0, self._track_workload, index)
+
+ @inlineCallbacks
+ def _track_members(self, index):
+
+ def is_member(entry):
+ key = entry['Key']
+ member_id = key[len(self.coordinator.MEMBERSHIP_PREFIX):]
+ return '/' not in member_id # otherwise it is a nested key
+
+ try:
+ (index, results) = yield self.coordinator.kv_get(
+ self.coordinator.MEMBERSHIP_PREFIX, index=index, recurse=True)
+
+ members = [e['Key'] for e in results if is_member(e)]
+
+ if members != self.members:
+ self.log.info('membership-changed', members=members)
+ self.members = members
+ self._restart_reassignment_soak_timer()
+
+ except Exception, e:
+ self.log.exception('members-track-error', e=e)
+ yield asleep(1.0) # to prevent flood
+
+ finally:
+ if not self.halted:
+ reactor.callLater(0, self._track_members, index)
+
+ def _restart_reassignment_soak_timer(self):
+
+ if self.reassignment_soak_timer is not None:
+ assert isinstance(self.reassignment_soak_timer, DelayedCall)
+ # an already-fired timer cannot be cancelled (AlreadyCalled)
+ if self.reassignment_soak_timer.active():
+ self.reassignment_soak_timer.cancel()
+
+ self.reassignment_soak_timer = reactor.callLater(
+ self.soak_time, self._reassign_work)
+
+ self.log.debug('reassignment-soak-timer-set')
+
+ @inlineCallbacks
+ def _reassign_work(self):
+ self.log.info('reassign-work')
+ yield None
+
+ # TODO continue from here
+
+ # Plan
+ # Step 1: collect current assignments from consul
+ # Step 2: calculate desired assignment from current members and
+ # workload list (e.g., using consistent hashing or any other
+ algorithm)
+ # Step 3: find the delta between the desired and actual assignments:
+ # these form two lists:
+ # 1. new assignments to be made
+ # 2. obsolete assignments to be revoked
+ graceful handling may be desirable when moving an existing
+ assignment from one member to another (to make sure it is
+ abandoned by the old member before the new one takes charge)
+ # Step 4: orchestrate the assignment by adding/deleting(/locking)
+ # entries in consul
+ #
+ # We must make sure that while we are working on this, we do not
+ # re-enter the same method!
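
As a rough illustration of Step 2 in the plan above, a hypothetical helper
could shard the workload across members with a simple stable hash (this is
only a sketch under assumed data shapes, not necessarily the consistent-hashing
scheme the plan may end up using, and not part of this change):

    from hashlib import md5

    def calculate_assignments(members, workload):
        """Map each workload key to one member id (hypothetical sketch)."""
        if not members:
            return {}
        assignments = dict((m, []) for m in members)
        for work_key in workload:
            # stable hash so a work item tends to stay with the same
            # member as long as the member list does not change
            idx = int(md5(work_key).hexdigest(), 16) % len(members)
            assignments[members[idx]].append(work_key)
        return assignments

The delta for Step 3 then falls out of comparing the (member, work_key) pairs
of the desired map against the ones currently recorded in consul.
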
diff --git a/voltha/northbound/rpc_dispatcher.py b/voltha/northbound/rpc_dispatcher.py
new file mode 100644
index 0000000..128ba07
--- /dev/null
+++ b/voltha/northbound/rpc_dispatcher.py
@@ -0,0 +1,29 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+RPC request dispatcher
+"""
+
+from twisted.internet.defer import DeferredQueue
+
+
+class RpcDispatcher(object):
+
+ def __init__(self):
+ self.queue = DeferredQueue()
+
+ # work in progress
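
The dispatcher above only creates the queue so far; a common way to drain a
Twisted DeferredQueue is a consumer loop that yields on get(). A possible,
purely hypothetical continuation (none of these method names are implied by
the patch):

    from twisted.internet.defer import DeferredQueue, inlineCallbacks

    class RpcDispatcherSketch(object):

        def __init__(self):
            self.queue = DeferredQueue()

        @inlineCallbacks
        def start(self):
            while True:
                # get() returns a Deferred that fires when an item arrives
                request = yield self.queue.get()
                if request is None:  # None used here as a shutdown sentinel
                    break
                yield self._dispatch(request)

        def _dispatch(self, request):
            # routing of the request to the right handler would go here
            pass
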