VOL-582: Update the Voltha core to support either consul or etcd in a kubernetes cluster
This is an initial commit adding etcd support to voltha.
Although this code works on a single node or in a kubernetes cluster, it is NOT
ready for prime time: the integration tests have yet to be written and a
kubernetes install is not yet available.
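
A quick way to sanity-check the etcd backend is to dump the keys voltha
writes under the coordinator's default 'service/voltha' prefix. A rough
sketch using the python-etcd3 client added to requirements.txt (the host
and port are assumptions matching the compose file defaults):

    import etcd3

    # Assumes etcd is reachable on localhost:2379, as in the compose file.
    client = etcd3.client(host='localhost', port=2379)
    for value, meta in client.get_prefix('service/voltha/'):
        print('{} -> {}'.format(meta.key, value))
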
Change-Id: I666a29a0a68cd7c2ab3c79aff8ccc6e96db96419
diff --git a/compose/docker-compose-system-test-with-etcd.yml b/compose/docker-compose-system-test-with-etcd.yml
new file mode 100644
index 0000000..a02a9cf
--- /dev/null
+++ b/compose/docker-compose-system-test-with-etcd.yml
@@ -0,0 +1,340 @@
+version: '2'
+services:
+ #
+ # Single-node zookeeper service
+ #
+ zookeeper:
+ image: wurstmeister/zookeeper
+ ports:
+ - 2181
+ environment:
+ SERVICE_2181_NAME: "zookeeper"
+ #
+ # Single-node kafka service
+ #
+ kafka:
+ image: wurstmeister/kafka
+ ports:
+ - 9092
+ environment:
+ KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M"
+ SERVICE_9092_NAME: "kafka"
+ depends_on:
+ - consul
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
+ #
+ # Single-node consul agent
+ #
+ consul:
+ image: consul:latest
+ command: agent -server -bootstrap -client 0.0.0.0 -ui
+ ports:
+ - "8300:8300"
+ - "8400:8400"
+ - "8500:8500"
+ - "8600:8600/udp"
+ environment:
+ #SERVICE_53_IGNORE: "yes"
+ SERVICE_8300_IGNORE: "yes"
+ SERVICE_8400_IGNORE: "yes"
+ SERVICE_8500_NAME: "consul-rest"
+ #
+ # Single-node etcd server
+ #
+ etcd:
+ image: quay.io/coreos/etcd:v3.2.9
+ command: [
+ "etcd",
+ "--name=etcd0",
+ "--advertise-client-urls=http://${DOCKER_HOST_IP}:2379,http://${DOCKER_HOST_IP}:4001",
+ "--listen-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001",
+ "--initial-advertise-peer-urls=http://${DOCKER_HOST_IP}:2380",
+ "--listen-peer-urls=http://0.0.0.0:2380",
+ "--initial-cluster-token=etcd-cluster-1",
+ "--initial-cluster=etcd0=http://${DOCKER_HOST_IP}:2380",
+ "--initial-cluster-state=new"
+ ]
+ ports:
+ - "2379:2379"
+ - 2380
+ - 4001
+ #
+ # Registrator
+ #
+ registrator:
+ image: gliderlabs/registrator:latest
+ command: [
+ "-ip=${DOCKER_HOST_IP}",
+ "-retry-attempts", "100",
+ "-cleanup",
+ # "-internal",
+ "consul://consul:8500"
+ ]
+ links:
+ - consul
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+
+ #
+ # Fluentd log server
+ #
+ fluentd:
+ image: fluent/fluentd
+ ports:
+ - "24224:24224"
+ volumes:
+ - "/tmp/fluentd:/fluentd/log"
+ environment:
+ SERVICE_24224_NAME: "fluentd-intake"
+
+ #
+ # Graphite-Grafana-statsd service instance
+ # (demo place-holder for external KPI system)
+ #
+ grafana:
+ image: voltha/grafana
+ ports:
+ - "8883:80"
+ - "2003:2003"
+ - "2004:2004"
+ - "8126:8126"
+ - "8125:8125/udp"
+ environment:
+ SERVICE_80_NAME: "grafana-web-ui"
+ SERVICE_2003_NAME: "carbon-plain-text-intake"
+ SERVICE_2004_NAME: "carbon-pickle-intake"
+ SERVICE_8126_NAME: "statsd-tcp-intake"
+ SERVICE_8125_NAME: "statsd-udp-intake"
+ GR_SERVER_ROOT_URL: "http://localhost:80/grafana/"
+
+ #
+ # Shovel (Kafka-graphite-gateway)
+ #
+ shovel:
+ image: cord/shovel
+ command: [
+ "/shovel/shovel/main.py",
+ "--kafka=@kafka",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--topic=voltha.kpis",
+ "--host=${DOCKER_HOST_IP}"
+ ]
+ depends_on:
+ - consul
+ - kafka
+ - grafana
+ restart: unless-stopped
+
+ #
+ # Voltha server instance(s)
+ #
+ voltha:
+ image: cord/voltha
+ command: [
+ "/voltha/voltha/main.py",
+ "-v",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--etcd=${DOCKER_HOST_IP}:2379",
+ "--fluentd=fluentd:24224",
+ "--rest-port=8880",
+ "--grpc-port=50556",
+ "--kafka=@kafka",
+ "--instance-id-is-container-name",
+ "--interface=eth1",
+ "--backend=etcd",
+ "-v"
+ ]
+ ports:
+ - 8880
+ - 50556
+ - 18880
+ - "60001:60001"
+ depends_on:
+ - consul
+ - etcd
+ links:
+ - consul
+ - etcd
+ - fluentd
+ environment:
+ SERVICE_8880_NAME: "voltha-health"
+ SERVICE_8880_CHECK_HTTP: "/health"
+ SERVICE_8880_CHECK_INTERVAL: "5s"
+ SERVICE_8880_CHECK_TIMEOUT: "1s"
+ SERVICE_18880_NAME: "voltha-sim-rest"
+ SERVICE_HOST_IP: "${DOCKER_HOST_IP}"
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ networks:
+ - default
+ - ponmgmt
+
+ envoy:
+ image: voltha/envoy
+ entrypoint:
+ - /usr/local/bin/envoyd
+ - -envoy-cfg-template
+ - "/envoy/voltha-grpc-proxy.template.json"
+ - -envoy-config
+ - "/envoy/voltha-grpc-proxy.json"
+ - -kv
+ - "etcd"
+ - -kv-svc-name
+ - "etcd"
+ - -kv-port
+ - "2379"
+
+ ports:
+ - "50555:50555"
+ - "8882:8882"
+ - "8443:8443"
+ - "8001:8001"
+ environment:
+ SERVICE_50555_NAME: "voltha-grpc"
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ networks:
+ - default
+ - ponmgmt
+ links:
+ - voltha:vcore
+ #
+ # Voltha cli container
+ #
+ vcli:
+ image: cord/vcli
+ command: [
+ "/cli/cli/setup.sh",
+ "-L",
+ "-G"
+ ]
+ environment:
+ DOCKER_HOST_IP: "${DOCKER_HOST_IP}"
+ ports:
+ - "5022:22"
+ depends_on:
+ - voltha
+
+##############################################
+# Items below this line will soon be removed.#
+##############################################
+
+ #
+ # ofagent server instance
+ #
+ ofagent:
+ image: cord/ofagent
+ command: [
+ "/ofagent/ofagent/main.py",
+ "-v",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--fluentd=fluentd:24224",
+ "--controller=${DOCKER_HOST_IP}:6653",
+ "--grpc-endpoint=@voltha-grpc",
+ "--instance-id-is-container-name",
+ "--enable-tls",
+ "--key-file=/ofagent/pki/voltha.key",
+ "--cert-file=/ofagent/pki/voltha.crt",
+ "-v"
+ ]
+ depends_on:
+ - consul
+ - voltha
+ links:
+ - consul
+ - fluentd
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ restart: unless-stopped
+
+ #
+ # Netconf server instance(s)
+ #
+ netconf:
+ image: cord/netconf
+ privileged: true
+ command: [
+ "/netconf/netconf/main.py",
+ "-v",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--fluentd=fluentd:24224",
+ "--grpc-endpoint=@voltha-grpc",
+ "--instance-id-is-container-name",
+ "-v"
+ ]
+ ports:
+ - "830:1830"
+ depends_on:
+ - consul
+ - voltha
+ links:
+ - consul
+ - fluentd
+ environment:
+ SERVICE_1830_NAME: "netconf-server"
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+
+ #
+ # Dashboard daemon
+ #
+ dashd:
+ image: cord/dashd
+ command: [
+ "/dashd/dashd/main.py",
+ "--kafka=@kafka",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--grafana_url=http://admin:admin@${DOCKER_HOST_IP}:8883/api",
+ "--topic=voltha.kpis",
+ "--docker_host=${DOCKER_HOST_IP}"
+ ]
+ depends_on:
+ - consul
+ - kafka
+ - grafana
+ restart: unless-stopped
+
+ #
+ # Nginx service consolidation
+ #
+ nginx:
+ image: voltha/nginx
+ ports:
+ - "80:80"
+ environment:
+ CONSUL_ADDR: "${DOCKER_HOST_IP}:8500"
+ command: [
+ "/nginx_config/start_service.sh"
+ ]
+ depends_on:
+ - consul
+ - grafana
+ - portainer
+ restart: unless-stopped
+
+ #
+ # Docker ui
+ #
+ portainer:
+ image: voltha/portainer
+ ports:
+ - "9000:9000"
+ environment:
+ CONSUL_ADDR: "${DOCKER_HOST_IP}:8500"
+ restart: unless-stopped
+ entrypoint: ["/portainer", "--logo", "/docker/images/logo_alt.png"]
+ volumes:
+ - "/var/run/docker.sock:/var/run/docker.sock"
+
+networks:
+ default:
+ driver: bridge
+ ponmgmt:
+ driver: bridge
+ driver_opts:
+ com.docker.network.bridge.name: "ponmgmt"
diff --git a/requirements.txt b/requirements.txt
index 545f400..b3c8b86 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -42,6 +42,7 @@
termcolor==1.1.0
treq==17.8.0
Twisted==17.9.0
+txaioetcd==0.3.0
urllib3==1.22
pyang==1.7.3
lxml==3.6.4
@@ -52,6 +53,7 @@
ncclient==0.5.3
xmltodict==0.11.0
dicttoxml==1.7.4
+etcd3==0.7.0
# python-consul>=0.6.1 we need the pre-released version for now, because 0.6.1 does not
# yet support Twisted. Once this is released, it will be the 0.6.2 version
diff --git a/voltha/coordinator_etcd.py b/voltha/coordinator_etcd.py
new file mode 100644
index 0000000..238223f
--- /dev/null
+++ b/voltha/coordinator_etcd.py
@@ -0,0 +1,621 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Etcd-based coordinator services """
+
+from consul import ConsulException
+from consul.twisted import Consul
+from requests import ConnectionError
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from txaioetcd import Client, KeySet, Transaction, CompVersion, OpGet, OpSet, Failed
+from zope.interface import implementer
+
+from leader import Leader
+from common.utils.asleep import asleep
+from common.utils.message_queue import MessageQueue
+from voltha.registry import IComponent
+from worker import Worker
+from simplejson import dumps, loads
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+
+log = get_logger()
+
+
+class StaleMembershipEntryException(Exception):
+ pass
+
+
+@implementer(IComponent)
+class CoordinatorEtcd(object):
+    """
+    An app shall instantiate only one Coordinator (singleton).
+    A single instance of this object shall take care of all external
+    communication with etcd, and via etcd, all coordination activities with
+    its clustered peers. Roles include:
+        - registering an ephemeral membership entry (k/v record) in etcd
+        - participating in a symmetric leader election, and potentially
+          assuming the leader's role. What leadership entails is not a
+          concern for the coordinator; it simply instantiates (and shuts
+          down) a leader class when it gains (or loses) leadership.
+    """
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ # Public methods:
+
+ def __init__(self,
+ internal_host_address,
+ external_host_address,
+ instance_id,
+ rest_port,
+ config,
+ consul='localhost:8500',
+ etcd='localhost:2379'):
+
+ log.info('initializing-coordinator')
+ self.config = config['coordinator']
+ self.worker_config = config['worker']
+ self.leader_config = config['leader']
+ self.membership_watch_relatch_delay = config.get(
+ 'membership_watch_relatch_delay', 0.1)
+ self.tracking_loop_delay = self.config.get(
+ 'tracking_loop_delay', 1)
+ self.session_renewal_timeout = self.config.get(
+ 'session_renewal_timeout', 5)
+ self.session_renewal_loop_delay = self.config.get(
+ 'session_renewal_loop_delay', 3)
+ self.membership_maintenance_loop_delay = self.config.get(
+ 'membership_maintenance_loop_delay', 5)
+ self.session_time_to_live = self.config.get(
+ 'session_time_to_live', 10)
+ self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
+ self.leader_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['leader_key'], 'leader')))
+ self.membership_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['membership_key'], 'members'), ''))
+ self.assignment_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['assignment_key'], 'assignments'), ''))
+ self.workload_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['workload_key'], 'work'), ''))
+ self.core_store_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['core_store_key'], 'data/core')))
+ self.core_store_assignment_key = self.core_store_prefix + \
+ '/assignment'
+ self.core_storage_suffix = 'core_store'
+
+ self.retries = 0
+ self.instance_id = instance_id
+ self.internal_host_address = internal_host_address
+ self.external_host_address = external_host_address
+ self.rest_port = rest_port
+ self.membership_record_key = self.membership_prefix + self.instance_id
+
+ self.lease = None
+ # session_id refers to either a Consul session ID or an Etcd lease object
+ self.session_id = None
+ self.i_am_leader = False
+ self.leader_id = None # will be the instance id of the current leader
+ self.shutting_down = False
+ self.leader = None
+ self.membership_callback = None
+
+ self.worker = Worker(self.instance_id, self)
+
+ self.host = consul.split(':')[0].strip()
+ self.port = int(consul.split(':')[1].strip())
+
+ # TODO need to handle reconnect events properly
+ self.consul = Consul(host=self.host, port=self.port)
+
+ # Create etcd client
+ kv_host = etcd.split(':')[0].strip()
+ kv_port = etcd.split(':')[1].strip()
+ self.etcd_url = u'http://' + kv_host + u':' + kv_port
+ self.etcd = Client(reactor, self.etcd_url)
+
+ self.wait_for_leader_deferreds = []
+
+ self.peers_mapping_queue = MessageQueue()
+
+ def start(self):
+ log.debug('starting')
+ reactor.callLater(0, self._async_init)
+ log.info('started')
+ return self
+
+ @inlineCallbacks
+ def stop(self):
+ log.debug('stopping')
+ self.shutting_down = True
+ yield self._delete_session() # this will delete the leader lock too
+ yield self.worker.stop()
+ if self.leader is not None:
+ yield self.leader.stop()
+ self.leader = None
+ log.info('stopped')
+
+ def wait_for_a_leader(self):
+ """
+ Async wait till a leader is detected/elected. The deferred will be
+ called with the leader's instance_id
+ :return: Deferred.
+ """
+ d = Deferred()
+ if self.leader_id is not None:
+ d.callback(self.leader_id)
+ return d
+ else:
+ self.wait_for_leader_deferreds.append(d)
+ return d
+
+ # Wait for a core data id to be assigned to this voltha instance
+ @inlineCallbacks
+ def get_core_store_id_and_prefix(self):
+ core_store_id = yield self.worker.get_core_store_id()
+ returnValue((core_store_id, self.core_store_prefix))
+
+ def recv_peers_map(self):
+ return self.peers_mapping_queue.get()
+
+ def publish_peers_map_change(self, msg):
+ self.peers_mapping_queue.put(msg)
+
+ # Proxy methods for etcd with retry support
+
+    def kv_get(self, *args, **kw):
+        # Drop the Consul-style 'index' argument; the etcd get() call
+        # does not accept it (see consul_get_with_timeout below).
+        kw.pop('index', None)
+        return self._retry('GET', *args, **kw)
+
+ def kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ # Methods exposing key membership information
+
+ @inlineCallbacks
+ def get_members(self):
+ """Return list of all members"""
+ _, members = yield self.kv_get(self.membership_prefix, recurse=True)
+ returnValue([member['Key'][len(self.membership_prefix):]
+ for member in members])
+
+ # Private (internal) methods:
+
+ @inlineCallbacks
+ def _async_init(self):
+ yield self._create_session()
+ yield self._create_membership_record()
+ yield self._start_leader_tracking()
+ yield self.worker.start()
+
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ log.error(msg, retry_in=wait_time)
+ return asleep(wait_time)
+
+ def _clear_backoff(self):
+ if self.retries:
+ log.info('reconnected-to-consul', after_retries=self.retries)
+ self.retries = 0
+
+ @inlineCallbacks
+ def _create_session(self):
+
+ @inlineCallbacks
+ def _create_session():
+ etcd = yield self.get_kv_client()
+ # Create etcd lease
+ self.lease = yield etcd.lease(self.session_time_to_live)
+ self.session_id = self.lease
+ log.info('created-etcd-lease', lease=self.session_id)
+ self._start_session_tracking()
+
+ yield self._retry(_create_session)
+
+ @inlineCallbacks
+ def _delete_session(self):
+ try:
+ yield self.lease.revoke()
+ except Exception as e:
+ log.exception('failed-to-delete-session',
+ session_id=self.session_id)
+
+ @inlineCallbacks
+ def _create_membership_record(self):
+ yield self._do_create_membership_record_with_retries()
+ reactor.callLater(0, self._maintain_membership_record)
+
+ @inlineCallbacks
+ def _maintain_membership_record(self):
+ try:
+ while 1:
+ valid_membership = yield self._assert_membership_record_valid()
+ if not valid_membership:
+ log.info('recreating-membership-before',
+ session=self.session_id)
+ yield self._do_create_membership_record_with_retries()
+ log.info('recreating-membership-after',
+ session=self.session_id)
+ else:
+ log.debug('valid-membership', session=self.session_id)
+ # Async sleep before checking the membership record again
+ yield asleep(self.membership_maintenance_loop_delay)
+
+        except Exception as e:
+            log.exception('unexpected-error-membership-tracking', e=e)
+ finally:
+ # except in shutdown, the loop must continue (after a short delay)
+ if not self.shutting_down:
+ reactor.callLater(self.membership_watch_relatch_delay,
+ self._maintain_membership_record)
+
+ def _create_membership_record_data(self):
+ member_record = dict()
+ member_record['status'] = 'alive'
+ member_record['host_address'] = self.external_host_address
+ return member_record
+
+ @inlineCallbacks
+ def _assert_membership_record_valid(self):
+ try:
+ log.info('membership-record-before')
+ is_timeout, (_, record) = yield \
+ self.consul_get_with_timeout(
+ key=self.membership_record_key,
+ index=0,
+ timeout=5)
+ if is_timeout:
+ returnValue(False)
+
+ log.info('membership-record-after', record=record)
+ if record is None or \
+ 'Session' not in record:
+ log.info('membership-record-change-detected',
+ old_session=self.session_id,
+ record=record)
+ returnValue(False)
+ else:
+ returnValue(True)
+ except Exception as e:
+ log.exception('membership-validation-exception', e=e)
+ returnValue(False)
+
+ @inlineCallbacks
+ def _do_create_membership_record_with_retries(self):
+ while 1:
+ log.info('recreating-membership', session=self.session_id)
+ result = yield self._retry(
+ 'PUT',
+ self.membership_record_key,
+ dumps(self._create_membership_record_data()),
+ acquire=self.session_id)
+ if result:
+ log.info('new-membership-record-created',
+ session=self.session_id)
+ break
+ else:
+ log.warn('cannot-create-membership-record')
+ yield self._backoff('stale-membership-record')
+
+ def _start_session_tracking(self):
+ reactor.callLater(0, self._session_tracking_loop)
+
+ @inlineCallbacks
+ def _session_tracking_loop(self):
+
+ @inlineCallbacks
+ def _redo_session():
+ log.info('_redo_session-before')
+ yield self._delete_session()
+
+ # Create a new etcd connection/session with a new lease
+ try:
+ self.etcd = Client(reactor, self.etcd_url)
+ self.lease = yield self.etcd.lease(self.session_time_to_live)
+ self.session_id = self.lease
+ log.info('new-etcd-session', session=self.session_id)
+
+ except Exception as e:
+ log.exception('could-not-create-an-etcd-lease', e=e)
+
+ @inlineCallbacks
+ def _renew_session(m_callback):
+ try:
+ time_left = yield self.lease.remaining()
+ log.info('_renew_session', time_left=time_left)
+ result = yield self.lease.refresh()
+ log.info('just-renewed-session', result=result)
+ if not m_callback.called:
+ # Triggering callback will cancel the timeout timer
+ log.info('trigger-callback-to-cancel-timeout-timer')
+ m_callback.callback(result)
+ else:
+ # Timeout event has already been called. Just ignore
+ # this event
+ log.info('renew-called-after-timeout, etcd ref changed?')
+            except Exception as e:
+ # Let the invoking method receive a timeout
+ log.exception('could-not-renew-session', e=e)
+
+ try:
+ while 1:
+ log.debug('session-tracking-start')
+ rcvd = DeferredWithTimeout(
+ timeout=self.session_renewal_timeout)
+ _renew_session(rcvd)
+ try:
+ _ = yield rcvd
+ except TimeOutError as e:
+ log.info('session-renew-timeout', e=e)
+ # Redo the session
+ yield _redo_session()
+ except Exception as e:
+ log.exception('session-renew-exception', e=e)
+ else:
+ log.debug('successfully-renewed-session')
+
+ # Async sleep before the next session tracking
+ yield asleep(self.session_renewal_loop_delay)
+
+ except Exception as e:
+ log.exception('renew-exception', e=e)
+ finally:
+ reactor.callLater(self.session_renewal_loop_delay,
+ self._session_tracking_loop)
+
+ def _start_leader_tracking(self):
+ reactor.callLater(0, self._leadership_tracking_loop)
+
+ @inlineCallbacks
+ def _leadership_tracking_loop(self):
+ log.info('leadership-attempt-before')
+
+ # Try to acquire leadership lease via test-and-set operation.
+ # Success means the leader key was previously absent and was
+ # just re-created by this instance.
+ leader_prefix = bytes(self.leader_prefix)
+ txn = Transaction(
+ compare=[
+ CompVersion(leader_prefix, '==', 0)
+ ],
+ success=[
+ OpSet(leader_prefix, bytes(self.instance_id), lease=self.lease),
+ OpGet(leader_prefix)
+ ],
+ failure=[]
+ )
+ newly_asserted = False
+ try:
+ result = yield self.etcd.submit(txn)
+ except Failed as failed:
+ log.info('Leader key PRESENT')
+ for response in failed.responses:
+ log.info('Leader key already present', response=response)
+ else:
+ newly_asserted = True
+ log.info('Leader key ABSENT')
+ for response in result.responses:
+ log.info('Leader key was absent', response=response)
+
+ log.info('leadership-attempt-after')
+
+ # Confirm that the assertion succeeded by reading back the value
+ # of the leader key.
+ leader = None
+        result = yield self.etcd.get(leader_prefix)
+ if result.kvs:
+ kv = result.kvs[0]
+ leader = kv.value
+ log.info('Leader readback', leader=leader, instance=self.instance_id)
+
+ if leader is None:
+ log.info('Failed to read leader key')
+ elif leader == self.instance_id:
+ if newly_asserted:
+ log.info("I JUST BECAME LEADER!")
+ yield self._assert_leadership()
+ else:
+ log.info("I'm an aging LEADER")
+ else:
+ log.info('The LEADER is another', leader=leader)
+ yield self._assert_nonleadership(leader)
+
+ # May have to add code here to handle case where, for some reason, the lease
+ # had been blown away and the txn failed for that reason
+
+ # except in shutdown, the loop must continue (after a short delay)
+ if not self.shutting_down:
+ reactor.callLater(self.tracking_loop_delay,
+ self._leadership_tracking_loop)
+
+ @inlineCallbacks
+ def _assert_leadership(self):
+ """(Re-)assert leadership"""
+ if not self.i_am_leader:
+ self.i_am_leader = True
+ self._set_leader_id(self.instance_id)
+ yield self._just_gained_leadership()
+
+ @inlineCallbacks
+ def _assert_nonleadership(self, leader_id):
+ """(Re-)assert non-leader role"""
+
+ # update leader_id anyway
+ self._set_leader_id(leader_id)
+
+ if self.i_am_leader:
+ self.i_am_leader = False
+ yield self._just_lost_leadership()
+
+ def _set_leader_id(self, leader_id):
+ self.leader_id = leader_id
+ deferreds, self.wait_for_leader_deferreds = \
+ self.wait_for_leader_deferreds, []
+ for d in deferreds:
+ d.callback(leader_id)
+
+ def _just_gained_leadership(self):
+ log.info('became-leader')
+ self.leader = Leader(self)
+ return self.leader.start()
+
+ def _just_lost_leadership(self):
+ log.info('lost-leadership')
+ return self._halt_leader()
+
+ def _halt_leader(self):
+ if self.leader:
+ d = self.leader.stop()
+ self.leader = None
+ return d
+
+ def get_kv_client(self):
+ return self.etcd
+
+ @inlineCallbacks
+ def _retry(self, operation, *args, **kw):
+ prefix = False
+ for name, value in kw.items():
+ if name == 'acquire':
+ lease = value
+ kw['lease'] = lease
+ kw.pop('acquire')
+ elif name == 'keys':
+ kw['keys_only'] = True
+ kw.pop('keys')
+            elif name == 'recurse':
+                prefix = True
+                keyset = KeySet(bytes(args[0]), prefix=True)
+                kw.pop('recurse')
+ log.info('start-op', operation=operation, args=args, kw=kw)
+
+ while 1:
+ try:
+ etcd = yield self.get_kv_client()
+ if operation == 'GET':
+ key = bytes(args[0])
+ # If multiple keys requested, return a list
+ # else return a single record
+ if not prefix:
+ index = 0
+ record = dict()
+ res = yield etcd.get(key, **kw)
+ if res.kvs:
+ if len(res.kvs) == 1:
+ kv = res.kvs[0]
+ index = kv.mod_revision
+ record['Key'] = kv.key
+ record['Value'] = kv.value
+ record['ModifyIndex'] = index
+ record['Session'] = self.lease.lease_id if self.lease else ''
+ result = (index, record)
+ else:
+ # Get values for all keys that match the prefix
+ index = 0
+ records = []
+ res = yield etcd.get(keyset, **kw)
+ if args[0] == 'service/voltha/assignments/':
+ log.info('assignments', result=res)
+ if res.kvs and len(res.kvs) > 0:
+ for kv in res.kvs:
+ # Which index should be returned? The max over all keys?
+ if kv.mod_revision > index:
+ index = kv.mod_revision
+ rec = dict()
+ rec['Key'] = kv.key
+ rec['Value'] = kv.value
+ rec['ModifyIndex'] = kv.mod_revision
+ rec['Session'] = self.lease.lease_id if self.lease else ''
+ records.append(rec)
+ result = (index, records)
+ elif operation == 'PUT':
+ key = bytes(args[0])
+ result = yield etcd.set(key, args[1], **kw)
+ elif operation == 'DELETE':
+ key = bytes(args[0])
+ result = yield etcd.delete(key, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = yield operation(*args, **kw)
+ self._clear_backoff()
+ break
+            except Exception as e:
+ if not self.shutting_down:
+ log.exception(e)
+ yield self._backoff('unknown-error')
+
+ log.info('end-op', operation=operation, args=args, kw=kw, result=result)
+ returnValue(result)
+
+ @inlineCallbacks
+ def consul_get_with_timeout(self, key, timeout, **kw):
+ """
+ Query etcd with a timeout
+ :param key: Key to query
+ :param timeout: timeout value
+ :param kw: additional key-value params
+ :return: (is_timeout, (index, result)).
+
+ The Consul version of this method performed a 'wait-type' get operation
+ that returned a result when the key's value had a ModifyIndex greater
+ than the 'index' argument. Not sure etcd supports this functionality.
+ """
+
+        # Intercept the Consul-style 'index' argument (etcd's mod_revision);
+        # default to 0 if the caller did not supply one.
+        mod_revision = kw.pop('index', 0)
+        log.info('consul_get_with_timeout', index=mod_revision)
+
+ @inlineCallbacks
+ def _get(key, m_callback):
+ try:
+ (index, result) = yield self._retry('GET', key, **kw)
+ if index > mod_revision and not m_callback.called:
+ log.debug('got-result-cancelling-timer')
+ m_callback.callback((index, result))
+ except Exception as e:
+ log.exception('got-exception', e=e)
+
+ try:
+ rcvd = DeferredWithTimeout(timeout=timeout)
+ _get(key, rcvd)
+ try:
+ result = yield rcvd
+ log.debug('result-received', result=result)
+ returnValue((False, result))
+ except TimeOutError as e:
+ log.debug('timeout-or-no-data-change', key=key)
+ except Exception as e:
+ log.exception('exception', e=e)
+ except Exception as e:
+ log.exception('exception', e=e)
+
+ returnValue((True, (None, None)))
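
The leadership election above rests on txaioetcd's lease and transaction
primitives (see _create_session and _leadership_tracking_loop). A minimal
sketch of that pattern, not part of this change, assuming a local etcd at
localhost:2379 and the same txaioetcd 0.3.0 API imported by the coordinator:

    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks
    from txaioetcd import Client, Transaction, CompVersion, OpSet, OpGet, Failed

    @inlineCallbacks
    def try_become_leader(instance_id):
        etcd = Client(reactor, u'http://localhost:2379')
        # A lease that expires unless refreshed, like the coordinator's session
        lease = yield etcd.lease(10)
        # Test-and-set: succeeds only if the leader key does not exist yet
        txn = Transaction(
            compare=[CompVersion(b'service/voltha/leader', '==', 0)],
            success=[OpSet(b'service/voltha/leader', bytes(instance_id),
                           lease=lease),
                     OpGet(b'service/voltha/leader')],
            failure=[])
        try:
            yield etcd.submit(txn)   # key was absent: this instance is leader
            yield lease.refresh()    # keep the claim alive while leading
        except Failed:
            pass                     # another instance already holds the key
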
diff --git a/voltha/core/config/config_backend.py b/voltha/core/config/config_backend.py
index 1360375..71eeddb 100644
--- a/voltha/core/config/config_backend.py
+++ b/voltha/core/config/config_backend.py
@@ -17,6 +17,7 @@
from requests import ConnectionError
from twisted.internet.defer import inlineCallbacks, returnValue
+import etcd3
import structlog
log = structlog.get_logger()
@@ -141,6 +142,120 @@
return result
+class EtcdStore(object):
+ """ Config kv store for etcd with a cache for quicker subsequent reads
+
+    TODO: This will block the reactor. Should either change the
+    whole call stack to yield, or put the put/delete transactions into a
+    queue to write later with twisted. A transaction log will be needed
+    to ensure we don't lose anything.
+    Making the whole call stack yield is troublesome because other tasks
+    can come in on the side and start modifying things, which could be bad.
+ """
+
+ CONNECT_RETRY_INTERVAL_SEC = 1
+ RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+ def __init__(self, host, port, path_prefix):
+ self._etcd = etcd3.client(host=host, port=port)
+ self.host = host
+ self.port = port
+ self._path_prefix = path_prefix
+ self._cache = {}
+ self.retries = 0
+
+ def make_path(self, key):
+ return '{}/{}'.format(self._path_prefix, key)
+
+ def __getitem__(self, key):
+ if key in self._cache:
+ return self._cache[key]
+ (value, meta) = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value
+ return value
+ else:
+ raise KeyError(key)
+
+ def __contains__(self, key):
+ if key in self._cache:
+ return True
+ (value, meta) = self._kv_get(self.make_path(key))
+ if value is not None:
+ self._cache[key] = value
+ return True
+ else:
+ return False
+
+ def __setitem__(self, key, value):
+ try:
+ assert isinstance(value, basestring)
+ self._cache[key] = value
+ self._kv_put(self.make_path(key), value)
+        except Exception as e:
+ log.exception('cannot-set-item', e=e)
+
+ def __delitem__(self, key):
+ self._cache.pop(key, None)
+ self._kv_delete(self.make_path(key))
+
+ @inlineCallbacks
+ def _backoff(self, msg):
+ wait_time = self.RETRY_BACKOFF[min(self.retries,
+ len(self.RETRY_BACKOFF) - 1)]
+ self.retries += 1
+ log.error(msg, retry_in=wait_time)
+ yield asleep(wait_time)
+
+ def _redo_etcd_connection(self):
+ self._etcd = etcd3.client(host=self.host, port=self.port)
+ self._cache.clear()
+
+ def _clear_backoff(self):
+ if self.retries:
+ log.info('reconnected-to-etcd', after_retries=self.retries)
+ self.retries = 0
+
+ def _get_etcd(self):
+ return self._etcd
+
+ # Proxy methods for etcd with retry support
+ def _kv_get(self, *args, **kw):
+ return self._retry('GET', *args, **kw)
+
+ def _kv_put(self, *args, **kw):
+ return self._retry('PUT', *args, **kw)
+
+ def _kv_delete(self, *args, **kw):
+ return self._retry('DELETE', *args, **kw)
+
+ def _retry(self, operation, *args, **kw):
+ log.info('backend-op', operation=operation, args=args, kw=kw)
+ while 1:
+ try:
+ etcd = self._get_etcd()
+ log.debug('etcd', etcd=etcd, operation=operation,
+ args=args)
+ if operation == 'GET':
+ (value, meta) = etcd.get(*args, **kw)
+ result = (value, meta)
+ elif operation == 'PUT':
+ result = etcd.put(*args, **kw)
+ elif operation == 'DELETE':
+ result = etcd.delete(*args, **kw)
+ else:
+ # Default case - consider operation as a function call
+ result = operation(*args, **kw)
+ self._clear_backoff()
+ break
+            except Exception as e:
+ log.exception(e)
+ self._backoff('unknown-error-with-etcd')
+ self._redo_etcd_connection()
+
+ return result
+
+
def load_backend(store_id, store_prefix, args):
""" Return the kv store backend based on the command line arguments
"""
@@ -151,9 +266,16 @@
host, port = args.consul.split(':', 1)
return ConsulStore(host, int(port), instance_core_store_prefix)
+ def load_etcd_store():
+ instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+ host, port = args.etcd.split(':', 1)
+ return EtcdStore(host, int(port), instance_core_store_prefix)
+
loaders = {
'none': lambda: None,
- 'consul': load_consul_store
+ 'consul': load_consul_store,
+ 'etcd': load_etcd_store
}
return loaders[args.backend]()
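
EtcdStore keeps ConsulStore's dict-like interface, so config consumers are
unchanged. A small usage sketch (the host, port and prefix below are
hypothetical; in voltha they come from the --etcd argument and the assigned
core store id):

    # Hypothetical endpoint and per-core prefix
    store = EtcdStore('localhost', 2379, 'service/voltha/data/core/0001')

    store['my_key'] = 'my_value'   # written through to etcd, cached locally
    assert 'my_key' in store
    print(store['my_key'])         # re-reads are served from the local cache
    del store['my_key']            # removes from both the cache and etcd
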
diff --git a/voltha/main.py b/voltha/main.py
index 27b29f9..de2acfc 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -36,6 +36,7 @@
get_my_primary_local_ipv4
from voltha.adapters.loader import AdapterLoader
from voltha.coordinator import Coordinator
+from voltha.coordinator_etcd import CoordinatorEtcd
from voltha.core.core import VolthaCore
from voltha.core.config.config_backend import load_backend
from voltha.northbound.diagnostics import Diagnostics
@@ -52,6 +53,7 @@
defs = dict(
config=os.environ.get('CONFIG', './voltha.yml'),
consul=os.environ.get('CONSUL', 'localhost:8500'),
+ etcd=os.environ.get('ETCD', 'localhost:2379'),
inter_core_subnet=os.environ.get('INTER_CORE_SUBNET', None),
pon_subnet=os.environ.get('PON_SUBNET', None),
external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
@@ -87,6 +89,11 @@
default=defs['consul'],
help=_help)
+ _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
+ parser.add_argument(
+ '-e', '--etcd', dest='etcd', action='store',
+ default=defs['etcd'],
+ help=_help)
_help = ('<inter_core_subnet> is the subnet connecting all the voltha '
'instances in a cluster (default: %s)' % defs['inter_core_subnet'])
@@ -215,7 +222,7 @@
_help = 'backend to use for config persitence'
parser.add_argument('-b', '--backend',
default=defs['backend'],
- choices=['none', 'consul'],
+ choices=['none', 'consul', 'etcd'],
help=_help)
args = parser.parse_args()
@@ -321,20 +328,34 @@
internal_host=self.args.internal_host_address,
external_host=self.args.external_host_address,
interface=self.args.interface,
- consul=self.args.consul)
+ consul=self.args.consul,
+ etcd=self.args.etcd)
registry.register('main', self)
- yield registry.register(
- 'coordinator',
- Coordinator(
- internal_host_address=self.args.internal_host_address,
- external_host_address=self.args.external_host_address,
- rest_port=self.args.rest_port,
- instance_id=self.instance_id,
- config=self.config,
- consul=self.args.consul)
- ).start()
+ if self.args.backend == 'consul':
+ yield registry.register(
+ 'coordinator',
+ Coordinator(
+ internal_host_address=self.args.internal_host_address,
+ external_host_address=self.args.external_host_address,
+ rest_port=self.args.rest_port,
+ instance_id=self.instance_id,
+ config=self.config,
+ consul=self.args.consul)
+ ).start()
+ elif self.args.backend == 'etcd':
+ yield registry.register(
+ 'coordinator',
+ CoordinatorEtcd(
+ internal_host_address=self.args.internal_host_address,
+ external_host_address=self.args.external_host_address,
+ rest_port=self.args.rest_port,
+ instance_id=self.instance_id,
+ config=self.config,
+ consul=self.args.consul,
+ etcd=self.args.etcd)
+ ).start()
self.log.info('waiting-for-config-assignment')