VOL-582: Update the Voltha core to support either consul or etcd in a kubernetes cluster
The voltha core has been refactored so that it can interwork with either consul
or etcd as its KV store; the backend is selected with the new --backend=etcd and
--etcd=<host>:<port> command line options. The refactoring should not have
changed any of voltha's behaviour with consul. In cluster mode the update was
tested with only the vcli, envoy, vcore, and KV-store (consul or etcd)
containers:
- executed itests on a single-node system
- tested with consul on a kubernetes 3-node cluster
- tested with etcd on a kubernetes 3-node cluster
- tested with consul on a 3-node docker swarm
- container and node failure testing not performed
I believe Consul is still required in single-node operation because the
combination of consul and registrator is used to provide a DNS-like service;
that service would have to be migrated to an etcd-based equivalent before
consul could be dropped.
This update enables some experimentation with voltha in the kubernetes
environment. There is more work to be done; future updates could:
- replace the current polling of etcd keys with an asynchronous technique
  (e.g. etcd watches; a rough sketch follows)
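As a rough illustration, the etcd3 client already exposes a watch API that
could serve as that asynchronous technique. The sketch below is only an
example (the prefix is the assignments prefix, the handling is hypothetical,
and the twisted-based coordinator would need to run this off the reactor
thread or use txaioetcd's own watch support):

    import etcd3

    client = etcd3.client(host='localhost', port=2379)
    # The returned iterator blocks until a key under the prefix changes,
    # instead of re-reading the keys on a timer.
    events, cancel = client.watch_prefix('service/voltha/assignments/')
    for event in events:
        print(event.key, event.value)   # hypothetical handling
        break
    cancel()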
Change-Id: Ieea5b34acd9bcf70b760a4d7d2be9b1f88026cd5
diff --git a/compose/docker-compose-system-test-with-etcd.yml b/compose/docker-compose-system-test-with-etcd.yml
new file mode 100644
index 0000000..a02a9cf
--- /dev/null
+++ b/compose/docker-compose-system-test-with-etcd.yml
@@ -0,0 +1,340 @@
+version: '2'
+services:
+ #
+ # Single-node zookeeper service
+ #
+ zookeeper:
+ image: wurstmeister/zookeeper
+ ports:
+ - 2181
+ environment:
+ SERVICE_2181_NAME: "zookeeper"
+ #
+ # Single-node kafka service
+ #
+ kafka:
+ image: wurstmeister/kafka
+ ports:
+ - 9092
+ environment:
+ KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M"
+ SERVICE_9092_NAME: "kafka"
+ depends_on:
+ - consul
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
+ #
+ # Single-node consul agent
+ #
+ consul:
+ image: consul:latest
+ command: agent -server -bootstrap -client 0.0.0.0 -ui
+ ports:
+ - "8300:8300"
+ - "8400:8400"
+ - "8500:8500"
+ - "8600:8600/udp"
+ environment:
+ #SERVICE_53_IGNORE: "yes"
+ SERVICE_8300_IGNORE: "yes"
+ SERVICE_8400_IGNORE: "yes"
+ SERVICE_8500_NAME: "consul-rest"
+ #
+ # Single-node etcd server
+ #
+ etcd:
+ image: quay.io/coreos/etcd:v3.2.9
+ command: [
+ "etcd",
+ "--name=etcd0",
+ "--advertise-client-urls=http://${DOCKER_HOST_IP}:2379,http://${DOCKER_HOST_IP}:4001",
+ "--listen-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001",
+ "--initial-advertise-peer-urls=http://${DOCKER_HOST_IP}:2380",
+ "--listen-peer-urls=http://0.0.0.0:2380",
+ "--initial-cluster-token=etcd-cluster-1",
+ "--initial-cluster=etcd0=http://${DOCKER_HOST_IP}:2380",
+ "--initial-cluster-state=new"
+ ]
+ ports:
+ - "2379:2379"
+ - 2380
+ - 4001
+ #
+ # Registrator
+ #
+ registrator:
+ image: gliderlabs/registrator:latest
+ command: [
+ "-ip=${DOCKER_HOST_IP}",
+ "-retry-attempts", "100",
+ "-cleanup",
+ # "-internal",
+ "consul://consul:8500"
+ ]
+ links:
+ - consul
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+
+ #
+ # Fluentd log server
+ #
+ fluentd:
+ image: fluent/fluentd
+ ports:
+ - "24224:24224"
+ volumes:
+ - "/tmp/fluentd:/fluentd/log"
+ environment:
+ SERVICE_24224_NAME: "fluentd-intake"
+
+ #
+ # Graphite-Grafana-statsd service instance
+ # (demo place-holder for external KPI system)
+ #
+ grafana:
+ image: voltha/grafana
+ ports:
+ - "8883:80"
+ - "2003:2003"
+ - "2004:2004"
+ - "8126:8126"
+ - "8125:8125/udp"
+ environment:
+ SERVICE_80_NAME: "grafana-web-ui"
+ SERVICE_2003_NAME: "carbon-plain-text-intake"
+ SERVICE_2004_NAME: "carbon-pickle-intake"
+ SERVICE_8126_NAME: "statsd-tcp-intake"
+ SERVICE_8125_NAME: "statsd-udp-intake"
+ GR_SERVER_ROOT_URL: "http://localhost:80/grafana/"
+
+ #
+ # Shovel (Kafka-graphite-gateway)
+ #
+ shovel:
+ image: cord/shovel
+ command: [
+ "/shovel/shovel/main.py",
+ "--kafka=@kafka",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--topic=voltha.kpis",
+ "--host=${DOCKER_HOST_IP}"
+ ]
+ depends_on:
+ - consul
+ - kafka
+ - grafana
+ restart: unless-stopped
+
+ #
+ # Voltha server instance(s)
+ #
+ voltha:
+ image: cord/voltha
+ command: [
+ "/voltha/voltha/main.py",
+ "-v",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--etcd=${DOCKER_HOST_IP}:2379",
+ "--fluentd=fluentd:24224",
+ "--rest-port=8880",
+ "--grpc-port=50556",
+ "--kafka=@kafka",
+ "--instance-id-is-container-name",
+ "--interface=eth1",
+ "--backend=etcd",
+ "-v"
+ ]
+ ports:
+ - 8880
+ - 50556
+ - 18880
+ - "60001:60001"
+ depends_on:
+ - consul
+ - etcd
+ links:
+ - consul
+ - etcd
+ - fluentd
+ environment:
+ SERVICE_8880_NAME: "voltha-health"
+ SERVICE_8880_CHECK_HTTP: "/health"
+ SERVICE_8880_CHECK_INTERVAL: "5s"
+ SERVICE_8880_CHECK_TIMEOUT: "1s"
+ SERVICE_18880_NAME: "voltha-sim-rest"
+ SERVICE_HOST_IP: "${DOCKER_HOST_IP}"
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ networks:
+ - default
+ - ponmgmt
+
+ envoy:
+ image: voltha/envoy
+ entrypoint:
+ - /usr/local/bin/envoyd
+ - -envoy-cfg-template
+ - "/envoy/voltha-grpc-proxy.template.json"
+ - -envoy-config
+ - "/envoy/voltha-grpc-proxy.json"
+ - -kv
+ - "etcd"
+ - -kv-svc-name
+ - "etcd"
+ - -kv-port
+ - "2379"
+ ports:
+ - "50555:50555"
+ - "8882:8882"
+ - "8443:8443"
+ - "8001:8001"
+ environment:
+ SERVICE_50555_NAME: "voltha-grpc"
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ networks:
+ - default
+ - ponmgmt
+ links:
+ - voltha:vcore
+ #
+ # Voltha cli container
+ #
+ vcli:
+ image: cord/vcli
+ command: [
+ "/cli/cli/setup.sh",
+ "-L",
+ "-G"
+ ]
+ environment:
+ DOCKER_HOST_IP: "${DOCKER_HOST_IP}"
+ ports:
+ - "5022:22"
+ depends_on:
+ - voltha
+
+#############################################
+# Items below this line will soon be removed.#
+#############################################
+
+ #
+ # ofagent server instance
+ #
+ ofagent:
+ image: cord/ofagent
+ command: [
+ "/ofagent/ofagent/main.py",
+ "-v",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--fluentd=fluentd:24224",
+ "--controller=${DOCKER_HOST_IP}:6653",
+ "--grpc-endpoint=@voltha-grpc",
+ "--instance-id-is-container-name",
+ "--enable-tls",
+ "--key-file=/ofagent/pki/voltha.key",
+ "--cert-file=/ofagent/pki/voltha.crt",
+ "-v"
+ ]
+ depends_on:
+ - consul
+ - voltha
+ links:
+ - consul
+ - fluentd
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ restart: unless-stopped
+
+ #
+ # Netconf server instance(s)
+ #
+ netconf:
+ image: cord/netconf
+ privileged: true
+ command: [
+ "/netconf/netconf/main.py",
+ "-v",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--fluentd=fluentd:24224",
+ "--grpc-endpoint=@voltha-grpc",
+ "--instance-id-is-container-name",
+ "-v"
+ ]
+ ports:
+ - "830:1830"
+ depends_on:
+ - consul
+ - voltha
+ links:
+ - consul
+ - fluentd
+ environment:
+ SERVICE_1830_NAME: "netconf-server"
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+
+ #
+ # Dashboard daemon
+ #
+ dashd:
+ image: cord/dashd
+ command: [
+ "/dashd/dashd/main.py",
+ "--kafka=@kafka",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--grafana_url=http://admin:admin@${DOCKER_HOST_IP}:8883/api",
+ "--topic=voltha.kpis",
+ "--docker_host=${DOCKER_HOST_IP}"
+ ]
+ depends_on:
+ - consul
+ - kafka
+ - grafana
+ restart: unless-stopped
+
+ #
+ # Nginx service consolidation
+ #
+ nginx:
+ image: voltha/nginx
+ ports:
+ - "80:80"
+ environment:
+ CONSUL_ADDR: "${DOCKER_HOST_IP}:8500"
+ command: [
+ "/nginx_config/start_service.sh"
+ ]
+ depends_on:
+ - consul
+ - grafana
+ - portainer
+ restart: unless-stopped
+
+ #
+ # Docker ui
+ #
+ portainer:
+ image: voltha/portainer
+ ports:
+ - "9000:9000"
+ environment:
+ CONSUL_ADDR: "${DOCKER_HOST_IP}:8500"
+ restart: unless-stopped
+ entrypoint: ["/portainer", "--logo", "/docker/images/logo_alt.png"]
+ volumes:
+ - "/var/run/docker.sock:/var/run/docker.sock"
+
+networks:
+ default:
+ driver: bridge
+ ponmgmt:
+ driver: bridge
+ driver_opts:
+ com.docker.network.bridge.name: "ponmgmt"
diff --git a/requirements.txt b/requirements.txt
index 545f400..b3c8b86 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -42,6 +42,7 @@
termcolor==1.1.0
treq==17.8.0
Twisted==17.9.0
+txaioetcd==0.3.0
urllib3==1.22
pyang==1.7.3
lxml==3.6.4
@@ -52,6 +53,7 @@
ncclient==0.5.3
xmltodict==0.11.0
dicttoxml==1.7.4
+etcd3==0.7.0
# python-consul>=0.6.1 we need the pre-released version for now, because 0.6.1 does not
# yet support Twisted. Once this is released, it will be the 0.6.2 version
diff --git a/voltha/coordinator_etcd.py b/voltha/coordinator_etcd.py
new file mode 100644
index 0000000..cb424fd
--- /dev/null
+++ b/voltha/coordinator_etcd.py
@@ -0,0 +1,621 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Etcd-based coordinator services """
+
+from consul import ConsulException
+from consul.twisted import Consul
+from requests import ConnectionError
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from txaioetcd import Client, KeySet, Transaction, CompVersion, OpGet, OpSet, Failed
+from zope.interface import implementer
+
+from leader import Leader
+from common.utils.asleep import asleep
+from common.utils.message_queue import MessageQueue
+from voltha.registry import IComponent
+from worker import Worker
+from simplejson import dumps, loads
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+
+log = get_logger()
+
+
+class StaleMembershipEntryException(Exception):
+ pass
+
+
+@implementer(IComponent)
+class CoordinatorEtcd(object):
+    """
+    An app shall instantiate only one CoordinatorEtcd (singleton).
+    A single instance of this object shall take care of all external
+    communication with etcd and, via etcd, of all coordination activities
+    with its clustered peers. Roles include:
+    - registering an ephemeral membership entry (k/v record) in etcd
+    - participating in a symmetric leader election, and potentially assuming
+      the leader's role. What leadership entails is not a concern for the
+      coordinator; it simply instantiates (and shuts down) a leader class
+      when it gains (or loses) leadership.
+    """
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ # Public methods:
+
+ def __init__(self,
+ internal_host_address,
+ external_host_address,
+ instance_id,
+ rest_port,
+ config,
+ consul='localhost:8500',
+ etcd='localhost:2379'):
+
+ log.info('initializing-coordinator')
+ self.config = config['coordinator']
+ self.worker_config = config['worker']
+ self.leader_config = config['leader']
+ self.membership_watch_relatch_delay = config.get(
+ 'membership_watch_relatch_delay', 0.1)
+ self.tracking_loop_delay = self.config.get(
+ 'tracking_loop_delay', 1)
+ self.session_renewal_timeout = self.config.get(
+ 'session_renewal_timeout', 5)
+ self.session_renewal_loop_delay = self.config.get(
+ 'session_renewal_loop_delay', 3)
+ self.membership_maintenance_loop_delay = self.config.get(
+ 'membership_maintenance_loop_delay', 5)
+ self.session_time_to_live = self.config.get(
+ 'session_time_to_live', 10)
+ self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
+ self.leader_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['leader_key'], 'leader')))
+ self.membership_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['membership_key'], 'members'), ''))
+ self.assignment_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['assignment_key'], 'assignments'), ''))
+ self.workload_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['workload_key'], 'work'), ''))
+ self.core_store_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['core_store_key'], 'data/core')))
+ self.core_store_assignment_key = self.core_store_prefix + \
+ '/assignment'
+ self.core_storage_suffix = 'core_store'
+
+ self.retries = 0
+ self.instance_id = instance_id
+ self.internal_host_address = internal_host_address
+ self.external_host_address = external_host_address
+ self.rest_port = rest_port
+ self.membership_record_key = self.membership_prefix + self.instance_id
+
+ self.lease = None
+ # session_id refers to either a Consul session ID or an Etcd lease object
+ self.session_id = None
+ self.i_am_leader = False
+ self.leader_id = None # will be the instance id of the current leader
+ self.shutting_down = False
+ self.leader = None
+ self.membership_callback = None
+
+ self.worker = Worker(self.instance_id, self)
+
+ self.host = consul.split(':')[0].strip()
+ self.port = int(consul.split(':')[1].strip())
+
+ # TODO need to handle reconnect events properly
+ self.consul = Consul(host=self.host, port=self.port)
+
+ # Create etcd client
+ kv_host = etcd.split(':')[0].strip()
+ kv_port = etcd.split(':')[1].strip()
+ self.etcd_url = u'http://' + kv_host + u':' + kv_port
+ self.etcd = Client(reactor, self.etcd_url)
+
+ self.wait_for_leader_deferreds = []
+
+ self.peers_mapping_queue = MessageQueue()
+
+ def start(self):
+ log.debug('starting')
+ reactor.callLater(0, self._async_init)
+ log.info('started')
+ return self
+
+ @inlineCallbacks
+ def stop(self):
+ log.debug('stopping')
+ self.shutting_down = True
+ yield self._delete_session() # this will delete the leader lock too
+ yield self.worker.stop()
+ if self.leader is not None:
+ yield self.leader.stop()
+ self.leader = None
+ log.info('stopped')
+
+ def wait_for_a_leader(self):
+ """
+ Async wait till a leader is detected/elected. The deferred will be
+ called with the leader's instance_id
+ :return: Deferred.
+ """
+ d = Deferred()
+ if self.leader_id is not None:
+ d.callback(self.leader_id)
+ return d
+ else:
+ self.wait_for_leader_deferreds.append(d)
+ return d
+
+ # Wait for a core data id to be assigned to this voltha instance
+ @inlineCallbacks
+ def get_core_store_id_and_prefix(self):
+ core_store_id = yield self.worker.get_core_store_id()
+ returnValue((core_store_id, self.core_store_prefix))
+
+ def recv_peers_map(self):
+ return self.peers_mapping_queue.get()
+
+ def publish_peers_map_change(self, msg):
+ self.peers_mapping_queue.put(msg)
+
+    # Proxy methods for etcd with retry support (the consul-style API is preserved)
+
+ def kv_get(self, *args, **kw):
+        # Intercept and drop consul's 'index' (blocking-query) argument,
+        # which etcd get operations do not accept
+ for name, value in kw.items():
+ if name == 'index':
+ kw.pop('index')
+ break
+ return self._retry('GET', *args, **kw)
+
+ def kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ # Methods exposing key membership information
+
+ @inlineCallbacks
+ def get_members(self):
+ """Return list of all members"""
+ _, members = yield self.kv_get(self.membership_prefix, recurse=True)
+ returnValue([member['Key'][len(self.membership_prefix):]
+ for member in members])
+
+ # Private (internal) methods:
+
+ @inlineCallbacks
+ def _async_init(self):
+ yield self._create_session()
+ yield self._create_membership_record()
+ yield self._start_leader_tracking()
+ yield self.worker.start()
+
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ log.error(msg, retry_in=wait_time)
+ return asleep(wait_time)
+
+ def _clear_backoff(self):
+ if self.retries:
+ log.info('reconnected-to-consul', after_retries=self.retries)
+ self.retries = 0
+
+ @inlineCallbacks
+ def _create_session(self):
+
+ @inlineCallbacks
+ def _create_session():
+ etcd = yield self.get_kv_client()
+ # Create etcd lease
+ self.lease = yield etcd.lease(self.session_time_to_live)
+ self.session_id = self.lease
+ log.info('created-etcd-lease', lease=self.session_id)
+ self._start_session_tracking()
+
+ yield self._retry(_create_session)
+
+ @inlineCallbacks
+ def _delete_session(self):
+ try:
+ yield self.lease.revoke()
+ except Exception as e:
+ log.exception('failed-to-delete-session',
+ session_id=self.session_id)
+
+ @inlineCallbacks
+ def _create_membership_record(self):
+ yield self._do_create_membership_record_with_retries()
+ reactor.callLater(0, self._maintain_membership_record)
+
+ @inlineCallbacks
+ def _maintain_membership_record(self):
+ try:
+ while 1:
+ valid_membership = yield self._assert_membership_record_valid()
+ if not valid_membership:
+ log.info('recreating-membership-before',
+ session=self.session_id)
+ yield self._do_create_membership_record_with_retries()
+ log.info('recreating-membership-after',
+ session=self.session_id)
+ else:
+ log.debug('valid-membership', session=self.session_id)
+ # Async sleep before checking the membership record again
+ yield asleep(self.membership_maintenance_loop_delay)
+
+        except Exception as e:
+            log.exception('unexpected-error-membership-tracking', e=e)
+ finally:
+ # except in shutdown, the loop must continue (after a short delay)
+ if not self.shutting_down:
+ reactor.callLater(self.membership_watch_relatch_delay,
+ self._maintain_membership_record)
+
+ def _create_membership_record_data(self):
+ member_record = dict()
+ member_record['status'] = 'alive'
+ member_record['host_address'] = self.external_host_address
+ return member_record
+
+ @inlineCallbacks
+ def _assert_membership_record_valid(self):
+ try:
+ log.info('membership-record-before')
+ is_timeout, (_, record) = yield \
+ self.consul_get_with_timeout(
+ key=self.membership_record_key,
+ index=0,
+ timeout=5)
+ if is_timeout:
+ returnValue(False)
+
+ log.info('membership-record-after', record=record)
+ if record is None or \
+ 'Session' not in record:
+ log.info('membership-record-change-detected',
+ old_session=self.session_id,
+ record=record)
+ returnValue(False)
+ else:
+ returnValue(True)
+ except Exception as e:
+ log.exception('membership-validation-exception', e=e)
+ returnValue(False)
+
+ @inlineCallbacks
+ def _do_create_membership_record_with_retries(self):
+ while 1:
+ log.info('recreating-membership', session=self.session_id)
+ result = yield self._retry(
+ 'PUT',
+ self.membership_record_key,
+ dumps(self._create_membership_record_data()),
+ acquire=self.session_id)
+ if result:
+ log.info('new-membership-record-created',
+ session=self.session_id)
+ break
+ else:
+ log.warn('cannot-create-membership-record')
+ yield self._backoff('stale-membership-record')
+
+ def _start_session_tracking(self):
+ reactor.callLater(0, self._session_tracking_loop)
+
+ @inlineCallbacks
+ def _session_tracking_loop(self):
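+        # Periodically refresh the etcd lease that backs this instance's
+        # membership record and leader-key claim; if a refresh does not
+        # complete within session_renewal_timeout, drop the lease and create
+        # a fresh client connection and lease.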
+
+ @inlineCallbacks
+ def _redo_session():
+ log.info('_redo_session-before')
+ yield self._delete_session()
+
+ # Create a new etcd connection/session with a new lease
+ try:
+ self.etcd = Client(reactor, self.etcd_url)
+ self.lease = yield self.etcd.lease(self.session_time_to_live)
+ self.session_id = self.lease
+ log.info('new-etcd-session', session=self.session_id)
+
+ except Exception as e:
+ log.exception('could-not-create-an-etcd-lease', e=e)
+
+ @inlineCallbacks
+ def _renew_session(m_callback):
+ try:
+ time_left = yield self.lease.remaining()
+ log.info('_renew_session', time_left=time_left)
+ result = yield self.lease.refresh()
+ log.info('just-renewed-session', result=result)
+ if not m_callback.called:
+ # Triggering callback will cancel the timeout timer
+ log.info('trigger-callback-to-cancel-timeout-timer')
+ m_callback.callback(result)
+ else:
+ # Timeout event has already been called. Just ignore
+ # this event
+ log.info('renew-called-after-timeout, etcd ref changed?')
+            except Exception as e:
+ # Let the invoking method receive a timeout
+ log.exception('could-not-renew-session', e=e)
+
+ try:
+ while 1:
+ log.debug('session-tracking-start')
+ rcvd = DeferredWithTimeout(
+ timeout=self.session_renewal_timeout)
+ _renew_session(rcvd)
+ try:
+ _ = yield rcvd
+ except TimeOutError as e:
+ log.info('session-renew-timeout', e=e)
+ # Redo the session
+ yield _redo_session()
+ except Exception as e:
+ log.exception('session-renew-exception', e=e)
+ else:
+ log.debug('successfully-renewed-session')
+
+ # Async sleep before the next session tracking
+ yield asleep(self.session_renewal_loop_delay)
+
+ except Exception as e:
+ log.exception('renew-exception', e=e)
+ finally:
+ reactor.callLater(self.session_renewal_loop_delay,
+ self._session_tracking_loop)
+
+ def _start_leader_tracking(self):
+ reactor.callLater(0, self._leadership_tracking_loop)
+
+ @inlineCallbacks
+ def _leadership_tracking_loop(self):
+ log.info('leadership-attempt-before')
+
+        # Try to acquire the leadership lease via a test-and-set transaction.
+        # Success means the leader key was previously absent and has just
+        # been created by this instance.
+ leader_prefix = bytes(self.leader_prefix)
+ txn = Transaction(
+ compare=[
+ CompVersion(leader_prefix, '==', 0)
+ ],
+ success=[
+ OpSet(leader_prefix, bytes(self.instance_id), lease=self.lease),
+ OpGet(leader_prefix)
+ ],
+ failure=[]
+ )
+ newly_asserted = False
+ try:
+ result = yield self.etcd.submit(txn)
+ except Failed as failed:
+ log.info('Leader key PRESENT')
+ for response in failed.responses:
+ log.info('Leader key already present', response=response)
+ else:
+ newly_asserted = True
+ log.info('Leader key ABSENT')
+ for response in result.responses:
+ log.info('Leader key was absent', response=response)
+
+ log.info('leadership-attempt-after')
+
+ # Confirm that the assertion succeeded by reading back the value
+ # of the leader key.
+ leader = None
+        result = yield self.etcd.get(leader_prefix)
+ if result.kvs:
+ kv = result.kvs[0]
+ leader = kv.value
+ log.info('Leader readback', leader=leader, instance=self.instance_id)
+
+ if leader is None:
+ log.info('Failed to read leader key')
+ elif leader == self.instance_id:
+ if newly_asserted:
+ log.info("I JUST BECAME LEADER!")
+ yield self._assert_leadership()
+ else:
+ log.info("I'm an aging LEADER")
+ else:
+ log.info('The LEADER is another', leader=leader)
+ yield self._assert_nonleadership(leader)
+
+ # May have to add code here to handle case where, for some reason, the lease
+ # had been blown away and the txn failed for that reason
+
+ # except in shutdown, the loop must continue (after a short delay)
+ if not self.shutting_down:
+ reactor.callLater(self.tracking_loop_delay,
+ self._leadership_tracking_loop)
+
+ @inlineCallbacks
+ def _assert_leadership(self):
+ """(Re-)assert leadership"""
+ if not self.i_am_leader:
+ self.i_am_leader = True
+ self._set_leader_id(self.instance_id)
+ yield self._just_gained_leadership()
+
+ @inlineCallbacks
+ def _assert_nonleadership(self, leader_id):
+ """(Re-)assert non-leader role"""
+
+ # update leader_id anyway
+ self._set_leader_id(leader_id)
+
+ if self.i_am_leader:
+ self.i_am_leader = False
+ yield self._just_lost_leadership()
+
+ def _set_leader_id(self, leader_id):
+ self.leader_id = leader_id
+ deferreds, self.wait_for_leader_deferreds = \
+ self.wait_for_leader_deferreds, []
+ for d in deferreds:
+ d.callback(leader_id)
+
+ def _just_gained_leadership(self):
+ log.info('became-leader')
+ self.leader = Leader(self)
+ return self.leader.start()
+
+ def _just_lost_leadership(self):
+ log.info('lost-leadership')
+ return self._halt_leader()
+
+ def _halt_leader(self):
+ if self.leader:
+ d = self.leader.stop()
+ self.leader = None
+ return d
+
+ def get_kv_client(self):
+ return self.etcd
+
+ @inlineCallbacks
+ def _retry(self, operation, *args, **kw):
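+        # Translate the consul-style keyword arguments used by callers into
+        # their etcd equivalents: 'acquire' (a consul session) becomes an
+        # etcd lease, 'keys' becomes a keys-only get, and 'recurse' becomes
+        # a prefix-based KeySet query.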
+ prefix = False
+ for name, value in kw.items():
+ if name == 'acquire':
+ lease = value
+ kw['lease'] = lease
+ kw.pop('acquire')
+ elif name == 'keys':
+ kw['keys_only'] = True
+ kw.pop('keys')
+            elif name == 'recurse':
+                prefix = True
+                keyset = KeySet(bytes(args[0]), prefix=True)
+                kw.pop('recurse')
+ log.info('start-op', operation=operation, args=args, kw=kw)
+
+ while 1:
+ try:
+ etcd = yield self.get_kv_client()
+ if operation == 'GET':
+ key = bytes(args[0])
+ # If multiple keys requested, return a list
+ # else return a single record
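+                    # In both cases the etcd response is repackaged into a
+                    # consul-style (index, record) result with Key/Value/
+                    # ModifyIndex/Session fields, so existing callers written
+                    # against the consul API keep working unchanged.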
+ if not prefix:
+ index = 0
+ record = dict()
+ res = yield etcd.get(key, **kw)
+ if res.kvs:
+ if len(res.kvs) == 1:
+ kv = res.kvs[0]
+ index = kv.mod_revision
+ record['Key'] = kv.key
+ record['Value'] = kv.value
+ record['ModifyIndex'] = index
+ record['Session'] = self.lease.lease_id if self.lease else ''
+ result = (index, record)
+ else:
+ # Get values for all keys that match the prefix
+ index = 0
+ records = []
+ res = yield etcd.get(keyset, **kw)
+ if args[0] == 'service/voltha/assignments/':
+ log.info('assignments', result=res)
+ if res.kvs and len(res.kvs) > 0:
+ for kv in res.kvs:
+ # Which index should be returned? The max over all keys?
+ if kv.mod_revision > index:
+ index = kv.mod_revision
+ rec = dict()
+ rec['Key'] = kv.key
+ rec['Value'] = kv.value
+ rec['ModifyIndex'] = kv.mod_revision
+ rec['Session'] = self.lease.lease_id if self.lease else ''
+ records.append(rec)
+ result = (index, records)
+ elif operation == 'PUT':
+ key = bytes(args[0])
+ result = yield etcd.set(key, args[1], **kw)
+ elif operation == 'DELETE':
+ key = bytes(args[0])
+ result = yield etcd.delete(key, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = yield operation(*args, **kw)
+ self._clear_backoff()
+ break
+            except Exception as e:
+ if not self.shutting_down:
+ log.exception(e)
+ yield self._backoff('unknown-error')
+
+ log.info('end-op', operation=operation, args=args, kw=kw, result=result)
+ returnValue(result)
+
+ @inlineCallbacks
+ def consul_get_with_timeout(self, key, timeout, **kw):
+ """
+ Query etcd with a timeout
+ :param key: Key to query
+ :param timeout: timeout value
+ :param kw: additional key-value params
+ :return: (is_timeout, (index, result)).
+
+        The Consul version of this method performed a 'wait-type' get operation
+        that returned a result when the key's value had a ModifyIndex greater
+        than the 'index' argument. etcd can provide similar semantics via its
+        Watch API (a watch can start from a given revision); moving to watches
+        is left as a future improvement over the current polling.
+ """
+
+        # Intercept the 'index' argument; default to 0 so the comparison
+        # below is well defined even when no index is supplied
+        mod_revision = 0
+        for name, value in kw.items():
+            if name == 'index':
+                mod_revision = value
+                log.info('consul_get_with_timeout', index=mod_revision)
+                kw.pop('index')
+                break
+
+ @inlineCallbacks
+ def _get(key, m_callback):
+ try:
+ (index, result) = yield self._retry('GET', key, **kw)
+ if index > mod_revision and not m_callback.called:
+ log.debug('got-result-cancelling-timer')
+ m_callback.callback((index, result))
+ except Exception as e:
+ log.exception('got-exception', e=e)
+
+ try:
+ rcvd = DeferredWithTimeout(timeout=timeout)
+ _get(key, rcvd)
+ try:
+ result = yield rcvd
+ log.debug('result-received', result=result)
+ returnValue((False, result))
+ except TimeOutError as e:
+ log.debug('timeout-or-no-data-change', key=key)
+ except Exception as e:
+ log.exception('exception', e=e)
+ except Exception as e:
+ log.exception('exception', e=e)
+
+ returnValue((True, (None, None)))
diff --git a/voltha/core/config/config_backend.py b/voltha/core/config/config_backend.py
index 1360375..f7a5805 100644
--- a/voltha/core/config/config_backend.py
+++ b/voltha/core/config/config_backend.py
@@ -17,6 +17,7 @@
from requests import ConnectionError
from twisted.internet.defer import inlineCallbacks, returnValue
+import etcd3
import structlog
log = structlog.get_logger()
@@ -141,6 +142,120 @@
return result
+class EtcdStore(object):
+    """ Config kv store for etcd with a cache for quicker subsequent reads
+
+    TODO: The etcd3 client used here is synchronous, so these calls block the
+    reactor. We should either change the whole call stack to yield, or queue
+    the put/delete operations and write them later from twisted (which would
+    need a transaction log to ensure nothing is lost). Making the whole call
+    stack yield is troublesome because other tasks could come in on the side
+    and start modifying things, which could be bad.
+    """
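+    # Illustrative sketch only (an assumption, not part of this change): one
+    # way to avoid blocking the reactor would be to push the synchronous
+    # etcd3 calls onto twisted's thread pool and have callers yield the
+    # resulting Deferred, e.g.:
+    #
+    #     from twisted.internet.threads import deferToThread
+    #     d = deferToThread(self._etcd.put, self.make_path(key), value)
+    #
+    # The dict-style interface of this class would then have to become
+    # asynchronous as well.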
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def __init__(self, host, port, path_prefix):
+ self._etcd = etcd3.client(host=host, port=port)
+ self.host = host
+ self.port = port
+ self._path_prefix = path_prefix
+ self._cache = {}
+ self.retries = 0
+
+ def make_path(self, key):
+ return '{}/{}'.format(self._path_prefix, key)
+
+ def __getitem__(self, key):
+ if key in self._cache:
+ return self._cache[key]
+ (value, meta) = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value
+ return value
+ else:
+ raise KeyError(key)
+
+ def __contains__(self, key):
+ if key in self._cache:
+ return True
+ (value, meta) = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value
+ return True
+ else:
+ return False
+
+ def __setitem__(self, key, value):
+ try:
+ assert isinstance(value, basestring)
+ self._cache[key] = value
+ self._kv_put(self.make_path(key), value)
+        except Exception as e:
+ log.exception('cannot-set-item', e=e)
+
+ def __delitem__(self, key):
+ self._cache.pop(key, None)
+ self._kv_delete(self.make_path(key))
+
+ @inlineCallbacks
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ log.error(msg, retry_in=wait_time)
+ yield asleep(wait_time)
+
+ def _redo_etcd_connection(self):
+ self._etcd = etcd3.client(host=self.host, port=self.port)
+ self._cache.clear()
+
+ def _clear_backoff(self):
+ if self.retries:
+ log.info('reconnected-to-etcd', after_retries=self.retries)
+ self.retries = 0
+
+ def _get_etcd(self):
+ return self._etcd
+
+    # Proxy methods for etcd with retry support
+ def _kv_get(self, *args, **kw):
+ return self._retry('GET', *args, **kw)
+
+ def _kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def _kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ def _retry(self, operation, *args, **kw):
+ log.info('backend-op', operation=operation, args=args, kw=kw)
+ while 1:
+ try:
+ etcd = self._get_etcd()
+ log.debug('etcd', etcd=etcd, operation=operation,
+ args=args)
+ if operation == 'GET':
+ (value, meta) = etcd.get(*args, **kw)
+ result = (value, meta)
+ elif operation == 'PUT':
+ result = etcd.put(*args, **kw)
+ elif operation == 'DELETE':
+ result = etcd.delete(*args, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = operation(*args, **kw)
+ self._clear_backoff()
+ break
+            except Exception as e:
+ log.exception(e)
+ self._backoff('unknown-error-with-etcd')
+ self._redo_etcd_connection()
+
+ return result
+
+
def load_backend(store_id, store_prefix, args):
""" Return the kv store backend based on the command line arguments
"""
@@ -151,9 +266,16 @@
host, port = args.consul.split(':', 1)
return ConsulStore(host, int(port), instance_core_store_prefix)
+ def load_etcd_store():
+ instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+ host, port = args.etcd.split(':', 1)
+ return EtcdStore(host, int(port), instance_core_store_prefix)
+
loaders = {
'none': lambda: None,
- 'consul': load_consul_store
+ 'consul': load_consul_store,
+ 'etcd': load_etcd_store
}
return loaders[args.backend]()
diff --git a/voltha/main.py b/voltha/main.py
index 27b29f9..de2acfc 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -36,6 +36,7 @@
get_my_primary_local_ipv4
from voltha.adapters.loader import AdapterLoader
from voltha.coordinator import Coordinator
+from voltha.coordinator_etcd import CoordinatorEtcd
from voltha.core.core import VolthaCore
from voltha.core.config.config_backend import load_backend
from voltha.northbound.diagnostics import Diagnostics
@@ -52,6 +53,7 @@
defs = dict(
config=os.environ.get('CONFIG', './voltha.yml'),
consul=os.environ.get('CONSUL', 'localhost:8500'),
+ etcd=os.environ.get('ETCD', 'localhost:2379'),
inter_core_subnet=os.environ.get('INTER_CORE_SUBNET', None),
pon_subnet=os.environ.get('PON_SUBNET', None),
external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
@@ -87,6 +89,11 @@
default=defs['consul'],
help=_help)
+ _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
+ parser.add_argument(
+ '-e', '--etcd', dest='etcd', action='store',
+ default=defs['etcd'],
+ help=_help)
_help = ('<inter_core_subnet> is the subnet connecting all the voltha '
'instances in a cluster (default: %s)' % defs['inter_core_subnet'])
@@ -215,7 +222,7 @@
_help = 'backend to use for config persitence'
parser.add_argument('-b', '--backend',
default=defs['backend'],
- choices=['none', 'consul'],
+ choices=['none', 'consul', 'etcd'],
help=_help)
args = parser.parse_args()
@@ -321,20 +328,34 @@
internal_host=self.args.internal_host_address,
external_host=self.args.external_host_address,
interface=self.args.interface,
- consul=self.args.consul)
+ consul=self.args.consul,
+ etcd=self.args.etcd)
registry.register('main', self)
- yield registry.register(
- 'coordinator',
- Coordinator(
- internal_host_address=self.args.internal_host_address,
- external_host_address=self.args.external_host_address,
- rest_port=self.args.rest_port,
- instance_id=self.instance_id,
- config=self.config,
- consul=self.args.consul)
- ).start()
+ if self.args.backend == 'consul':
+ yield registry.register(
+ 'coordinator',
+ Coordinator(
+ internal_host_address=self.args.internal_host_address,
+ external_host_address=self.args.external_host_address,
+ rest_port=self.args.rest_port,
+ instance_id=self.instance_id,
+ config=self.config,
+ consul=self.args.consul)
+ ).start()
+ elif self.args.backend == 'etcd':
+ yield registry.register(
+ 'coordinator',
+ CoordinatorEtcd(
+ internal_host_address=self.args.internal_host_address,
+ external_host_address=self.args.external_host_address,
+ rest_port=self.args.rest_port,
+ instance_id=self.instance_id,
+ config=self.config,
+ consul=self.args.consul,
+ etcd=self.args.etcd)
+ ).start()
self.log.info('waiting-for-config-assignment')