Finish leadership tracking
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index ab85947..432c0fe 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -35,10 +35,16 @@
from asleep import asleep
+class StaleMembershipEntryException(Exception):
+ pass
+
+
class Coordinator(object):
CONNECT_RETRY_INTERVAL_SEC = 1
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+ LEADER_KEY = 'service/voltha/leader'
+ MEMBERSHIP_PREFIX = 'service/voltha/members/'
def __init__(self,
internal_host_address,
@@ -52,9 +58,12 @@
self.internal_host_address = internal_host_address
self.external_host_address = external_host_address
self.rest_port = rest_port
+ self.membership_record_key = self.MEMBERSHIP_PREFIX + self.instance_id
self.session_id = None
self.i_am_leader = False
+ self.leader_id = None # will be the instance id of the current leader
+ self.shutting_down = False
self.log = get_logger()
self.log.info('initializing-coordinator')
@@ -69,54 +78,29 @@
self.log.info('initialized-coordinator')
@inlineCallbacks
- def shutdown(self):
- yield self.delete_session()
-
- @inlineCallbacks
def async_init(self):
yield self.create_session()
yield self.create_membership_record()
- yield self.elect_leader()
+ yield self.start_leader_tracking()
+
+ @inlineCallbacks
+ def shutdown(self):
+ self.shutting_down = True
+ yield self.delete_session() # this will delete the leader lock too
def backoff(self, msg):
wait_time = self.RETRY_BACKOFF[min(self.retries,
len(self.RETRY_BACKOFF) - 1)]
self.retries += 1
- self.log.error(msg + ', retrying in %s second(s)' % wait_time)
+ self.log.error(msg, retry_in=wait_time)
return asleep(wait_time)
def clear_backoff(self):
if self.retries:
- self.log.info('Reconnected to consul agent after %d retries'
- % self.retries)
+ self.log.info('reconnected-to-consul', after_retries=self.retries)
self.retries = 0
@inlineCallbacks
- def kv_put(self, key, value, retry=True):
- while 1:
- try:
- response = yield self.consul.kv.put(key, value)
- self.clear_backoff()
- returnValue(response)
-
- except ConsulException, e:
- if retry:
- yield self.backoff('Consul not yet up')
- else:
- raise e
- except ConnectionError, e:
- if retry:
- yield self.backoff('Cannot connect to consul agent')
- else:
- raise e
- except Exception, e:
- self.log.exception(e)
- if retry:
- yield self.backoff('Unknown error')
- else:
- raise e
-
- @inlineCallbacks
def create_session(self):
@inlineCallbacks
@@ -133,7 +117,7 @@
# create consul session
self.session_id = yield self.consul.session.create(
- behavior='delete', ttl=10)
+ behavior='delete', ttl=10, lock_delay=1)
self.log.info('created-consul-session', session_id=self.session_id)
# start renewing session it 3 times within the ttl
@@ -148,32 +132,123 @@
@inlineCallbacks
def create_membership_record(self):
- # create ephemeral k/v registering this instance in the
- # service/voltha/members/<instance-id> node
- result = yield self.consul.kv.put(
- 'service/voltha/members/%s' % self.instance_id, 'alive',
- acquire=self.session_id)
- assert result is True
+ yield self._retry(self._create_membership_record)
+ reactor.callLater(0, self._maintain_membership_record)
@inlineCallbacks
- def elect_leader(self):
- """
- Attempt to become the leader by acquiring the leader key and
- track the leader anyway
- """
+ def _create_membership_record(self):
+ result = yield self.consul.kv.put(
+ self.membership_record_key, 'alive',
+ acquire=self.session_id)
+ if not result:
+ raise StaleMembershipEntryException(self.instance_id)
- # attempt acquire leader lock
- result = yield self.consul.kv.put('service/voltha/leader',
- self.instance_id,
- acquire=self.session_id)
+ @inlineCallbacks
+ def _maintain_membership_record(self):
+ index = None
+ try:
+ while 1:
+ (index, record) = yield self._retry(self.consul.kv.get,
+ self.membership_record_key,
+ index=index)
+ self.log.debug('membership-record-change-detected',
+ index=index, record=record)
+ if record is None or record['Session'] != self.session_id:
+ self.log.debug('remaking-membership-record')
+ yield self._create_membership_record()
- # read it back before being too happy; seeing our session id is a
- # proof and now we have the change id that we can use to reliably
- # track any changes
+ except Exception, e:
+ self.log.exception('unexpected-error-membership-tracking', e=e)
- # TODO continue from here !!!
- if result is True:
+ finally:
+ # no matter what, the loop needs to continue (after a short delay)
+ reactor.callLater(0.1, self._maintain_membership_record)
+
+ def start_leader_tracking(self):
+ reactor.callLater(0, self._leadership_tracking_loop)
+
+ @inlineCallbacks
+ def _leadership_tracking_loop(self):
+
+ try:
+
+ # Attempt to acquire leadership lock. True indicates success,
+ # False indicates there is already a leader. Its instance id
+ # is then the value under the leader key service/voltha/leader.
+
+ # attempt to acquire the leader lock
+ self.log.debug('leadership-attempt')
+ result = yield self._retry(self.consul.kv.put,
+ self.LEADER_KEY,
+ self.instance_id,
+ acquire=self.session_id)
+
+ # read it back before being too happy; seeing our session id is a
+ # proof and now we have the change id that we can use to reliably
+ # track any changes. In an unlikely scenario where the leadership
+ # key gets wiped out administratively since the previous line,
+ # the returned record can be None. Handle it.
+ (index, record) = yield self._retry(self.consul.kv.get,
+ self.LEADER_KEY)
+ self.log.debug('leadership-key',
+ i_am_leader=result, index=index, record=record)
+
+ if record is not None:
+ if result is True:
+ if record['Session'] == self.session_id:
+ self._assert_leadership()
+ else:
+ pass # confusion; need to retry leadership
+ else:
+ leader_id = record['Value']
+ self._assert_nonleadership(leader_id)
+
+ # if record was None, we shall try leadership again
+
+ # using consul's watch feature, start tracking any changes to key
+ last = record
+ while last is not None:
+ # this shall return only when update is made to leader key
+ (index, updated) = yield self._retry(self.consul.kv.get,
+ self.LEADER_KEY,
+ index=index)
+ self.log.debug('leader-key-change',
+ index=index, updated=updated)
+ if updated is None or updated != last:
+ # leadership has changed or vacated (or forcefully
+ # removed), apply now
+ break
+ last = updated
+
+ except Exception, e:
+ self.log.exception('unexpected-error-leader-trackin', e=e)
+
+ finally:
+ # no matter what, the loop needs to continue (after a short delay)
+ reactor.callLater(1, self._leadership_tracking_loop)
+
+ def _assert_leadership(self):
+ """(Re-)assert leadership"""
+ if not self.i_am_leader:
self.i_am_leader = True
+ self.leader_id = self.instance_id
+ self._just_gained_leadership()
+
+ def _assert_nonleadership(self, leader_id):
+ """(Re-)assert non-leader role"""
+
+ # update leader_id anyway
+ self.leader_id = leader_id
+
+ if self.i_am_leader:
+ self.i_am_leader = False
+ self._just_lost_leadership()
+
+ def _just_gained_leadership(self):
+ self.log.info('became-leader')
+
+ def _just_lost_leadership(self):
+ self.log.info('lost-leadership')
@inlineCallbacks
def _retry(self, func, *args, **kw):
@@ -182,11 +257,13 @@
result = yield func(*args, **kw)
break
except ConsulException, e:
- yield self.backoff('Consul not yet up')
+ yield self.backoff('consul-not-up')
except ConnectionError, e:
- yield self.backoff('Cannot connect to consul agent')
+ yield self.backoff('cannot-connect-to-consul')
+ except StaleMembershipEntryException, e:
+ yield self.backoff('stale-membership-record-in-the-way')
except Exception, e:
self.log.exception(e)
- yield self.backoff('Unknown error')
+ yield self.backoff('unknown-error')
returnValue(result)