VOL-340: Update voltha core to recover from a consul failure. This update also addresses VOL-341
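
The coordinator now runs a session tracking loop that renews the consul
session on a fixed cadence; if a renewal does not complete within
session_renewal_timeout, it rebuilds the consul connection, creates a
new session and re-registers its membership record. KV access in the
coordinator and in the consul-backed config store goes through retrying
proxies that back off while consul is unreachable, and the membership
and leadership watches use a bounded 5s wait so they cannot block
forever when consul is down. The leader also relinquishes leadership
when the membership list comes back empty and now restarts the correct
soak timer for core store reassignment.
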
Change-Id: I9176916e1e66bc35616314985a01f9d2305a5ea8
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 6eecc11..227be5d 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -31,6 +31,7 @@
from voltha.registry import IComponent
from worker import Worker
from simplejson import dumps, loads
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
log = get_logger()
@@ -74,6 +75,10 @@
'membership_watch_relatch_delay', 0.1)
self.tracking_loop_delay = config.get(
'tracking_loop_delay', 1)
+ self.session_renewal_timeout = config.get(
+ 'session_renewal_timeout', 2)
+ self.session_renewal_loop_delay = config.get(
+ 'session_renewal_loop_delay', 3)
self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
self.leader_prefix = '/'.join((self.prefix, self.config.get(
self.config['leader_key'], 'leader')))
@@ -105,11 +110,11 @@
self.worker = Worker(self.instance_id, self)
- host = consul.split(':')[0].strip()
- port = int(consul.split(':')[1].strip())
+ self.host = consul.split(':')[0].strip()
+ self.port = int(consul.split(':')[1].strip())
# TODO need to handle reconnect events properly
- self.consul = Consul(host=host, port=port)
+ self.consul = Consul(host=self.host, port=self.port)
self.wait_for_leader_deferreds = []
@@ -162,13 +167,13 @@
# Proxy methods for consul with retry support
def kv_get(self, *args, **kw):
- return self._retry(self.consul.kv.get, *args, **kw)
+ return self._retry('GET', *args, **kw)
def kv_put(self, *args, **kw):
- return self._retry(self.consul.kv.put, *args, **kw)
+ return self._retry('PUT', *args, **kw)
def kv_delete(self, *args, **kw):
- return self._retry(self.consul.kv.delete, *args, **kw)
+ return self._retry('DELETE', *args, **kw)
# Methods exposing key membership information
@@ -204,25 +209,13 @@
def _create_session(self):
@inlineCallbacks
- def _renew_session():
- try:
- result = yield self.consul.session.renew(
- session_id=self.session_id)
- log.debug('just renewed session', result=result)
- except Exception, e:
- log.exception('could-not-renew-session', e=e)
-
- @inlineCallbacks
def _create_session():
-
+ consul = yield self.get_consul()
# create consul session
- self.session_id = yield self.consul.session.create(
- behavior='release', ttl=60, lock_delay=1)
+ self.session_id = yield consul.session.create(
+ behavior='release', ttl=10, lock_delay=1)
log.info('created-consul-session', session_id=self.session_id)
-
- # start renewing session it 3 times within the ttl
- self.session_renew_timer = LoopingCall(_renew_session)
- self.session_renew_timer.start(3)
+ self._start_session_tracking()
yield self._retry(_create_session)
@@ -232,7 +225,7 @@
@inlineCallbacks
def _create_membership_record(self):
- yield self._retry(self._do_create_membership_record)
+ yield self._do_create_membership_record_with_retries()
reactor.callLater(0, self._maintain_membership_record)
def _create_membership_record_data(self):
@@ -242,12 +235,29 @@
return member_record
@inlineCallbacks
+ def _do_create_membership_record_with_retries(self):
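+        # Keep retrying until the record is written under the current
+        # session, backing off between attempts so a consul outage does
+        # not turn this into a tight loop.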
+ while 1:
+ result = yield self._retry(
+ 'PUT',
+ self.membership_record_key,
+ dumps(self._create_membership_record_data()),
+ acquire=self.session_id)
+ if result:
+ log.debug('new-membership-record', session=self.session_id)
+ break
+ else:
+ log.warn('cannot-create-membership-record')
+ yield self._backoff('stale-membership-record')
+
+ @inlineCallbacks
def _do_create_membership_record(self):
- result = yield self.consul.kv.put(
+ consul = yield self.get_consul()
+ result = yield consul.kv.put(
self.membership_record_key,
dumps(self._create_membership_record_data()),
acquire=self.session_id)
if not result:
+ log.warn('cannot-create-membership-record')
raise StaleMembershipEntryException(self.instance_id)
@inlineCallbacks
@@ -255,19 +265,22 @@
index = None
try:
while 1:
- (index, record) = yield self._retry(self.consul.kv.get,
+ (index, record) = yield self._retry('GET',
self.membership_record_key,
+ wait='5s',
index=index)
- log.debug('membership-record-change-detected',
- index=index, record=record)
if record is None or \
'Session' not in record or \
record['Session'] != self.session_id:
- log.debug('remaking-membership-record')
- yield self._retry(self._do_create_membership_record)
+ log.debug('membership-record-change-detected',
+ old_session=self.session_id,
+ index=index,
+ record=record)
+
+ yield self._do_create_membership_record_with_retries()
except Exception, e:
- log.exception('unexpected-error-leader-trackin', e=e)
+ log.exception('unexpected-error-leader-tracking', e=e)
finally:
# except in shutdown, the loop must continue (after a short delay)
@@ -275,33 +288,98 @@
reactor.callLater(self.membership_watch_relatch_delay,
self._maintain_membership_record)
+ def _start_session_tracking(self):
+ reactor.callLater(0, self._session_tracking_loop)
+
+ @inlineCallbacks
+ def _session_tracking_loop(self):
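+        # Each pass attempts a session renewal bounded by
+        # session_renewal_timeout; if the renewal times out, the session and
+        # the consul connection are rebuilt via _redo_session, and the loop
+        # reschedules itself every session_renewal_loop_delay seconds.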
+
+ @inlineCallbacks
+ def _redo_session():
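+            # Drop the old session, then build a fresh consul
+            # connection/session and re-register our membership under it.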
+ log.info('_redo_session-before')
+ try:
+ yield self._delete_session()
+ except Exception as e:
+ log.exception('could-not-delete-old-session', e=e)
+
+ # Create a new consul connection/session
+ try:
+ self.consul = Consul(host=self.host, port=self.port)
+ self.session_id = yield self.consul.session.create(
+ behavior='release',
+ ttl=10,
+ lock_delay=1)
+ log.debug('new-consul-session', session=self.session_id)
+
+ # Update my membership
+ yield self._do_create_membership_record_with_retries()
+
+ except Exception as e:
+ log.exception('could-not-create-a-consul-session', e=e)
+
+ @inlineCallbacks
+ def _renew_session(m_callback):
+ try:
+ log.info('_renew_session-before')
+ result = yield self.consul.session.renew(
+ session_id=self.session_id)
+ log.debug('just-renewed-session', result=result)
+ if not m_callback.called:
+ # Triggering callback will cancel the timeout timer
+                    log.debug('trigger-callback-to-cancel-timeout-timer')
+ m_callback.callback(result)
+ except Exception, e:
+ # Let the invoking method receive a timeout
+ log.exception('could-not-renew-session', e=e)
+
+ try:
+ log.debug('session-tracking-start')
+            # Set a timeout timer (session_renewal_timeout, 2 secs by
+            # default) - should be more than enough to renew a session
+ rcvd = DeferredWithTimeout(timeout=self.session_renewal_timeout)
+ yield _renew_session(rcvd)
+ try:
+ _ = yield rcvd
+ except TimeOutError as e:
+ log.info('session-renew-timeout', e=e)
+ # Redo the session
+ yield _redo_session()
+ except Exception as e:
+ log.exception('session-renew-exception', e=e)
+ else:
+ log.debug('successfully-renewed-session')
+ except Exception as e:
+            log.exception('session-tracking-unexpected-error', e=e)
+ finally:
+ reactor.callLater(self.session_renewal_loop_delay,
+ self._session_tracking_loop)
+
def _start_leader_tracking(self):
reactor.callLater(0, self._leadership_tracking_loop)
@inlineCallbacks
def _leadership_tracking_loop(self):
-
try:
-
# Attempt to acquire leadership lock. True indicates success;
# False indicates there is already a leader. It's instance id
# is then the value under the leader key service/voltha/leader.
# attempt acquire leader lock
- log.debug('leadership-attempt')
- result = yield self._retry(self.consul.kv.put,
+ log.debug('leadership-attempt-before')
+ result = yield self._retry('PUT',
self.leader_prefix,
self.instance_id,
acquire=self.session_id)
+ log.debug('leadership-attempt-after')
# read it back before being too happy; seeing our session id is a
# proof and now we have the change id that we can use to reliably
# track any changes. In an unlikely scenario where the leadership
# key gets wiped out administratively since the previous line,
# the returned record can be None. Handle it.
- (index, record) = yield self._retry(self.consul.kv.get,
+ (index, record) = yield self._retry('GET',
self.leader_prefix)
- log.debug('leadership-key',
+ log.debug('leader-prefix',
i_am_leader=result, index=index, record=record)
if record is not None:
@@ -320,14 +398,24 @@
last = record
while last is not None:
# this shall return only when update is made to leader key
- (index, updated) = yield self._retry(self.consul.kv.get,
+                    # or expires after a 5 second wait (in case consul took
+                    # an unexpected vacation)
+ (index, updated) = yield self._retry('GET',
self.leader_prefix,
+ wait='5s',
index=index)
- log.debug('leader-key-change',
- index=index, updated=updated)
if updated is None or updated != last:
+ log.debug('leader-key-change',
+ index=index, updated=updated, last=last)
# leadership has changed or vacated (or forcefully
# removed), apply now
+                    # If I was previously the leader then assert a
+                    # non-leadership role before going for election
+                    if self.i_am_leader:
+                        log.debug('leaving-leadership',
+ leader=self.instance_id)
+ yield self._assert_nonleadership(self.instance_id)
+
break
last = updated
@@ -376,22 +464,51 @@
return self._halt_leader()
def _halt_leader(self):
- d = self.leader.stop()
- self.leader = None
+ if self.leader:
+ d = self.leader.stop()
+ self.leader = None
return d
+ def get_consul(self):
+ return self.consul
+
@inlineCallbacks
- def _retry(self, func, *args, **kw):
+ def _retry(self, operation, *args, **kw):
while 1:
try:
- result = yield func(*args, **kw)
+ consul = yield self.get_consul()
+ log.debug('consul', consul=consul, operation=operation,
+ args=args)
+ if operation == 'GET':
+ result = yield consul.kv.get(*args, **kw)
+ elif operation == 'PUT':
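+                    # If the session was recreated after this call was
+                    # queued, swap in the current session id so the lock
+                    # is acquired under a live session.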
+ for name, value in kw.items():
+ if name == 'acquire':
+ if value != self.session_id:
+ log.info('updating-session-in-put-operation',
+ old_session=value,
+ new_session=self.session_id)
+ kw['acquire'] = self.session_id
+ break
+ result = yield consul.kv.put(*args, **kw)
+ elif operation == 'DELETE':
+ result = yield consul.kv.delete(*args, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = yield operation(*args, **kw)
self._clear_backoff()
break
except ConsulException, e:
+ log.exception('consul-not-up', consul=self.consul,
+ session=self.consul.Session, e=e)
yield self._backoff('consul-not-up')
except ConnectionError, e:
+ log.exception('cannot-connect-to-consul',
+ consul=self.consul, e=e)
yield self._backoff('cannot-connect-to-consul')
except StaleMembershipEntryException, e:
+ log.exception('stale-membership-record-in-the-way',
+ consul=self.consul, e=e)
yield self._backoff('stale-membership-record-in-the-way')
except Exception, e:
if not self.shutting_down:
diff --git a/voltha/core/config/config_backend.py b/voltha/core/config/config_backend.py
index 0b1a38f..366b299 100644
--- a/voltha/core/config/config_backend.py
+++ b/voltha/core/config/config_backend.py
@@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from consul import Consul
+from consul import Consul, ConsulException
+from common.utils.asleep import asleep
+from requests import ConnectionError
+from twisted.internet.defer import inlineCallbacks, returnValue
import structlog
@@ -30,10 +33,16 @@
come in on the side and start modifying things which could be bad.
"""
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
def __init__(self, host, port, path_prefix):
self._consul = Consul(host=host, port=port)
+ self.host = host
+ self.port = port
self._path_prefix = path_prefix
self._cache = {}
+ self.retries = 0
def make_path(self, key):
return '{}/{}'.format(self._path_prefix, key)
@@ -41,7 +50,7 @@
def __getitem__(self, key):
if key in self._cache:
return self._cache[key]
- index, value = self._consul.kv.get(self.make_path(key))
+ value = self._kv_get(self.make_path(key))
if value is not None:
# consul turns empty strings to None, so we do the reverse here
self._cache[key] = value['Value'] or ''
@@ -52,7 +61,7 @@
def __contains__(self, key):
if key in self._cache:
return True
- index, value = self._consul.kv.get(self.make_path(key))
+ value = self._kv_get(self.make_path(key))
if value is not None:
self._cache[key] = value['Value']
return True
@@ -63,19 +72,78 @@
try:
assert isinstance(value, basestring)
self._cache[key] = value
- self._consul.kv.put(self.make_path(key), value)
+ self._kv_put(self.make_path(key), value)
except Exception, e:
log.exception('cannot-set-item', e=e)
def __delitem__(self, key):
self._cache.pop(key, None)
- self._consul.kv.delete(self.make_path(key))
+ self._kv_delete(self.make_path(key))
+
+ @inlineCallbacks
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ log.error(msg, retry_in=wait_time)
+ yield asleep(wait_time)
+
+ def _redo_consul_connection(self):
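+        # Recreate the consul client and drop anything cached from the
+        # previous connection.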
+ self._consul = Consul(host=self.host, port=self.port)
+ self._cache.clear()
+
+ def _clear_backoff(self):
+ if self.retries:
+ log.info('reconnected-to-consul', after_retries=self.retries)
+ self.retries = 0
+
+ def _get_consul(self):
+ return self._consul
+
+ # Proxy methods for consul with retry support
+ def _kv_get(self, *args, **kw):
+ return self._retry('GET', *args, **kw)
+
+ def _kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def _kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ def _retry(self, operation, *args, **kw):
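+        # Retry the named kv operation until it succeeds; consul errors are
+        # logged with backoff bookkeeping, and unexpected errors also force
+        # a new consul connection.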
+ while 1:
+ try:
+ consul = self._get_consul()
+ log.info('consul', consul=consul, operation=operation,
+ args=args)
+ if operation == 'GET':
+ index, result = consul.kv.get(*args, **kw)
+ elif operation == 'PUT':
+ result = consul.kv.put(*args, **kw)
+ elif operation == 'DELETE':
+ result = consul.kv.delete(*args, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = operation(*args, **kw)
+ self._clear_backoff()
+ break
+ except ConsulException, e:
+ log.exception('consul-not-up', e=e)
+ self._backoff('consul-not-up')
+ except ConnectionError, e:
+ log.exception('cannot-connect-to-consul', e=e)
+ self._backoff('cannot-connect-to-consul')
+ except Exception, e:
+            log.exception('unknown-error', e=e)
+ self._backoff('unknown-error')
+ self._redo_consul_connection()
+
+ return result
def load_backend(store_id, store_prefix, args):
""" Return the kv store backend based on the command line arguments
"""
- # TODO: Make this more dynamic
def load_consul_store():
instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
diff --git a/voltha/leader.py b/voltha/leader.py
index ca8d9b4..a576a0b 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -59,6 +59,7 @@
self.core_store_assignment = None
self.reassignment_soak_timer = None
+ self.core_store_reassignment_soak_timer = None
self.workload_id_match = re.compile(
self.ID_EXTRACTOR % self.coord.workload_prefix).match
@@ -89,26 +90,14 @@
if not self.reassignment_soak_timer.called:
self.reassignment_soak_timer.cancel()
+ if isinstance(self.core_store_reassignment_soak_timer, DelayedCall):
+ if not self.core_store_reassignment_soak_timer.called:
+ self.core_store_reassignment_soak_timer.cancel()
+
log.info('stopped')
# Private methods:
- # @inlineCallbacks
- # def _validate_workload(self):
- # """
- # Workload is defined as any k/v entries under the workload prefix
- # in consul. Under normal operation, only the leader shall edit the
- # workload list. But we make sure that in case an administrator
- # manually edits the workload, we react to that properly.
- # """
- #
- # # TODO for now we simply generate a fixed number of fake entries
- # yield DeferredList([
- # self.coord.kv_put(
- # self.coord.workload_prefix + 'device_group_%04d' % (i + 1),
- # 'placeholder for device group %d data' % (i + 1))
- # for i in xrange(100)
- # ])
def _start_tracking_assignments(self):
"""
@@ -116,38 +105,8 @@
list. Upon change in either, we must rerun our sharding algorithm
and reassign work as/if needed.
"""
- # reactor.callLater(0, self._track_workload, 0)
reactor.callLater(0, self._track_members, 0)
- # @inlineCallbacks
- # def _track_workload(self, index):
- #
- # try:
- # (index, results) = yield self.coord.kv_get(
- # self.coord.workload_prefix, index=index, recurse=True)
- #
- # matches = (self.workload_id_match(e['Key']) for e in results)
- # workload = [m.group(2) for m in matches if m is not None]
- #
- # if workload != self.workload:
- # log.info('workload-changed',
- # old_workload_count=len(self.workload),
- # new_workload_count=len(workload))
- # self.workload = workload
- # self._restart_reassignment_soak_timer()
- #
- # except Exception, e:
- # log.exception('workload-track-error', e=e)
- # yield asleep(
- # self.coord.leader_config.get(
- # self.coord.leader_config[
- # 'workload_track_error_to_prevent_flood'], 1))
- # # to prevent flood
- #
- # finally:
- # if not self.halted:
- # reactor.callLater(0, self._track_workload, index)
-
@inlineCallbacks
def _get_core_store_mappings(self):
try:
@@ -198,28 +157,46 @@
@inlineCallbacks
def _track_members(self, index):
-
+ previous_index = index
try:
+            # Wait up to 5 seconds for a change of membership, if any.
+            # Without it, if all consul nodes go down then we would never
+            # get out of this watch.
(index, results) = yield self.coord.kv_get(
- self.coord.membership_prefix, index=index, recurse=True)
+ self.coord.membership_prefix, wait='5s', index=index, recurse=True)
- # Only members with valid session are considered active
- members = [{'id': self.member_id_match(e['Key']).group(2),
- 'host': loads(e['Value'])['host_address']}
- for e in results if 'Session' in e]
+ # This can happen if consul went down and came back with no data
+ if not results:
+ log.error('no-active-members')
+ # Bail out of leadership and go for an early election
+ self.coord._just_lost_leadership()
+ return
- log.info('active-members', active_members=members)
+ if previous_index != index:
+ log.info('membership-updated',
+ previous_index=previous_index, index=index)
- # Check if the two sets are the same
- if members != self.members:
- # update the current set of config
- yield self._update_core_store_references()
- log.info('membership-changed',
- prev_members=self.members,
- curr_members=members,
- core_store_mapping=self.core_store_assignment)
- self.members = members
- self._restart_core_store_reassignment_soak_timer()
+ # Rebuild the membership, if any
+
+ # Only members with valid session are considered active
+ members = [{'id': self.member_id_match(e['Key']).group(2),
+ 'host': loads(e['Value'])['host_address']}
+ for e in results if 'Session' in e]
+
+ log.info('active-members', active_members=members)
+
+ # Check if the two sets are the same
+ if members != self.members:
+ # update the current set of config
+ yield self._update_core_store_references()
+ log.info('membership-changed',
+ prev_members=self.members,
+ curr_members=members,
+ core_store_mapping=self.core_store_assignment)
+ self.members = members
+ self._restart_core_store_reassignment_soak_timer()
+ else:
+ log.debug('no-membership-change', index=index)
except Exception, e:
log.exception('members-track-error', e=e)
@@ -228,7 +205,6 @@
self.coord.leader_config[
'members_track_error_to_prevent_flood']), 1)
# to prevent flood
-
finally:
if not self.halted:
reactor.callLater(0, self._track_members, index)
@@ -245,12 +221,12 @@
def _restart_core_store_reassignment_soak_timer(self):
- if self.reassignment_soak_timer is not None:
- assert isinstance(self.reassignment_soak_timer, DelayedCall)
- if not self.reassignment_soak_timer.called:
- self.reassignment_soak_timer.cancel()
+ if self.core_store_reassignment_soak_timer is not None:
+ assert isinstance(self.core_store_reassignment_soak_timer, DelayedCall)
+ if not self.core_store_reassignment_soak_timer.called:
+ self.core_store_reassignment_soak_timer.cancel()
- self.reassignment_soak_timer = reactor.callLater(
+ self.core_store_reassignment_soak_timer = reactor.callLater(
self.soak_time, self._reassign_core_stores)
@inlineCallbacks
@@ -327,89 +303,3 @@
except Exception as e:
log.exception('config-reassignment-failure', e=e)
self._restart_core_store_reassignment_soak_timer()
-
- # @inlineCallbacks
- # def _reassign_work(self):
- #
- # log.info('reassign-work')
- #
- # # Plan
- # #
- # # Step 1: calculate desired assignment from current members and
- # # workload list (e.g., using consistent hashing or any other
- # # algorithm
- # # Step 2: collect current assignments from consul
- # # Step 3: find the delta between the desired and actual assignments:
- # # these form two lists:
- # # 1. new assignments to be made
- # # 2. obsolete assignments to be revoked
- # # graceful handling may be desirable when moving existing
- # # assignment from existing member to another member (to make
- # # sure it is abandoned by old member before new takes charge)
- # # Step 4: orchestrate the assignment by adding/deleting(/locking)
- # # entries in consul
- # #
- # # We must make sure while we are working on this, we do not re-enter
- # # into same method!
- #
- # try:
- #
- # # Step 1: generate wanted assignment (mapping work to members)
- #
- # ring = HashRing(self.members)
- # wanted_assignments = dict() # member_id -> set(work_id)
- # _ = [
- # wanted_assignments.setdefault(ring.get_node(work), set())
- # .add(work)
- # for work in self.workload
- # ]
- # for (member, work) in sorted(wanted_assignments.iteritems()):
- # log.info('assignment',
- # member=member, work_count=len(work))
- #
- # # Step 2: discover current assignment (from consul)
- #
- # (_, results) = yield self.coord.kv_get(
- # self.coord.assignment_prefix, recurse=True)
- #
- # matches = [
- # (self.assignment_match(e['Key']), e) for e in results or []]
- #
- # current_assignments = dict() # member_id -> set(work_id)
- # _ = [
- # current_assignments.setdefault(
- # m.groupdict()['member_id'], set())
- # .add(m.groupdict()['work_id'])
- # for m, e in matches if m is not None
- # ]
- #
- # # Step 3: handle revoked assignments first on a per member basis
- #
- # for member_id, current_work in current_assignments.iteritems():
- # assert isinstance(current_work, set)
- # wanted_work = wanted_assignments.get(member_id, set())
- # work_to_revoke = current_work.difference(wanted_work)
- #
- # # revoking work by simply deleting the assignment entry
- # # TODO if we want some feedback to see that member abandoned
- # # work, we could add a consul-based protocol here
- # for work_id in work_to_revoke:
- # yield self.coord.kv_delete(
- # self.coord.assignment_prefix
- # + member_id + '/' + work_id)
- #
- # # Step 4: assign new work as needed
- #
- # for member_id, wanted_work in wanted_assignments.iteritems():
- # assert isinstance(wanted_work, set)
- # current_work = current_assignments.get(member_id, set())
- # work_to_assign = wanted_work.difference(current_work)
- #
- # for work_id in work_to_assign:
- # yield self.coord.kv_put(
- # self.coord.assignment_prefix
- # + member_id + '/' + work_id, '')
- #
- # except Exception, e:
- # log.exception('failed-reassignment', e=e)
- # self._restart_reassignment_soak_timer() # try again in a while
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index bd91ec4..edb1669 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -62,6 +62,8 @@
workload_key: 'work'
membership_watch_relatch_delay: 0.1
tracking_loop_delay: 1
+  session_renewal_loop_delay: 3  # seconds between session renewal attempts
+  session_renewal_timeout: 2  # max seconds to wait for a renewal before recreating the session
worker:
time_to_let_leader_update: 5