[7697]
VOL-582: Update the Voltha core to support either consul or etcd in a kubernetes cluster

This is an initial commit adding etcd support to voltha.

Although this code works on a single node or in a kubernetes cluster, it is NOT ready
for prime time: the integration tests have yet to be written and a kubernetes install
is not yet available.
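
For local testing, compose/docker-compose-system-test-with-etcd.yml (added below) brings
up a single-node etcd (quay.io/coreos/etcd:v3.2.9) next to the existing consul agent and
starts the voltha core with the new etcd backend selected:

    --backend=etcd --etcd=${DOCKER_HOST_IP}:2379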

Change-Id: I666a29a0a68cd7c2ab3c79aff8ccc6e96db96419
diff --git a/compose/docker-compose-system-test-with-etcd.yml b/compose/docker-compose-system-test-with-etcd.yml
new file mode 100644
index 0000000..a02a9cf
--- /dev/null
+++ b/compose/docker-compose-system-test-with-etcd.yml
@@ -0,0 +1,340 @@
+version: '2'
+services:
+  #
+  # Single-node zookeeper service
+  #
+  zookeeper:
+    image: wurstmeister/zookeeper
+    ports:
+    - 2181
+    environment:
+      SERVICE_2181_NAME: "zookeeper"
+  #
+  # Single-node kafka service
+  #
+  kafka:
+    image: wurstmeister/kafka
+    ports:
+     - 9092
+    environment:
+      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M"
+      SERVICE_9092_NAME: "kafka"
+    depends_on:
+    - consul
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+  #
+  # Single-node consul agent
+  #
+  consul:
+    image: consul:latest
+    command: agent -server -bootstrap -client 0.0.0.0 -ui
+    ports:
+    - "8300:8300"
+    - "8400:8400"
+    - "8500:8500"
+    - "8600:8600/udp"
+    environment:
+      #SERVICE_53_IGNORE: "yes"
+      SERVICE_8300_IGNORE: "yes"
+      SERVICE_8400_IGNORE: "yes"
+      SERVICE_8500_NAME: "consul-rest"
+  #
+  # Single-node etcd server
+  #
+  etcd:
+    image: quay.io/coreos/etcd:v3.2.9
+    command: [
+      "etcd",
+      "--name=etcd0",
+      "--advertise-client-urls=http://${DOCKER_HOST_IP}:2379,http://${DOCKER_HOST_IP}:4001",
+      "--listen-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001",
+      "--initial-advertise-peer-urls=http://${DOCKER_HOST_IP}:2380",
+      "--listen-peer-urls=http://0.0.0.0:2380",
+      "--initial-cluster-token=etcd-cluster-1",
+      "--initial-cluster=etcd0=http://${DOCKER_HOST_IP}:2380",
+      "--initial-cluster-state=new"
+    ]
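+    # 2379/4001 are the advertised client ports and 2380 the peer port,
+    # matching the URLs in the command above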
+    ports:
+    - "2379:2379"
+    - 2380
+    - 4001
+  #
+  # Registrator
+  #
+  registrator:
+    image: gliderlabs/registrator:latest
+    command: [
+      "-ip=${DOCKER_HOST_IP}",
+      "-retry-attempts", "100",
+      "-cleanup",
+      # "-internal",
+      "consul://consul:8500"
+    ]
+    links:
+    - consul
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+
+  #
+  # Fluentd log server
+  #
+  fluentd:
+    image: fluent/fluentd
+    ports:
+    - "24224:24224"
+    volumes:
+    - "/tmp/fluentd:/fluentd/log"
+    environment:
+      SERVICE_24224_NAME: "fluentd-intake"
+
+  #
+  # Graphite-Grafana-statsd service instance
+  # (demo place-holder for external KPI system)
+  #
+  grafana:
+    image: voltha/grafana
+    ports:
+    - "8883:80"
+    - "2003:2003"
+    - "2004:2004"
+    - "8126:8126"
+    - "8125:8125/udp"
+    environment:
+      SERVICE_80_NAME:   "grafana-web-ui"
+      SERVICE_2003_NAME: "carbon-plain-text-intake"
+      SERVICE_2004_NAME: "carbon-pickle-intake"
+      SERVICE_8126_NAME: "statsd-tcp-intake"
+      SERVICE_8125_NAME: "statsd-udp-intake"
+      GR_SERVER_ROOT_URL: "http://localhost:80/grafana/"
+
+  #
+  # Shovel (Kafka-graphite-gateway)
+  #
+  shovel:
+    image: cord/shovel
+    command: [
+      "/shovel/shovel/main.py",
+      "--kafka=@kafka",
+      "--consul=${DOCKER_HOST_IP}:8500",
+      "--topic=voltha.kpis",
+      "--host=${DOCKER_HOST_IP}"
+    ]
+    depends_on:
+    - consul
+    - kafka
+    - grafana
+    restart: unless-stopped
+
+  #
+  # Voltha server instance(s)
+  #
+  voltha:
+    image: cord/voltha
+    command: [
+      "/voltha/voltha/main.py",
+      "-v",
+      "--consul=${DOCKER_HOST_IP}:8500",
+      "--etcd=${DOCKER_HOST_IP}:2379",
+      "--fluentd=fluentd:24224",
+      "--rest-port=8880",
+      "--grpc-port=50556",
+      "--kafka=@kafka",
+      "--instance-id-is-container-name",
+      "--interface=eth1",
+      "--backend=etcd",
+      "-v"
+    ]
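+    # Both --consul and --etcd are passed above; --backend=etcd selects which
+    # KV store the core actually uses.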
+    ports:
+    - 8880
+    - 50556
+    - 18880
+    - "60001:60001"
+    depends_on:
+    - consul
+    - etcd
+    links:
+    - consul
+    - etcd
+    - fluentd
+    environment:
+      SERVICE_8880_NAME: "voltha-health"
+      SERVICE_8880_CHECK_HTTP: "/health"
+      SERVICE_8880_CHECK_INTERVAL: "5s"
+      SERVICE_8880_CHECK_TIMEOUT: "1s"
+      SERVICE_18880_NAME: "voltha-sim-rest"
+      SERVICE_HOST_IP: "${DOCKER_HOST_IP}"
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    networks:
+    - default
+    - ponmgmt
+
+  envoy:
+    image: voltha/envoy
+    entrypoint:
+      - /usr/local/bin/envoyd
+      - -envoy-cfg-template
+      - "/envoy/voltha-grpc-proxy.template.json"
+      - -envoy-config
+      - "/envoy/voltha-grpc-proxy.json"
+      - -kv
+      - "etcd"
+      - -kv-svc-name
+      - "etcd"
+      - -kv-port
+      - "2379"
+
+    ports:
+      - "50555:50555"
+      - "8882:8882"
+      - "8443:8443"
+      - "8001:8001"
+    environment:
+      SERVICE_50555_NAME: "voltha-grpc"
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    networks:
+    - default
+    - ponmgmt
+    links:
+    - voltha:vcore
+  #
+  # Voltha cli container
+  #
+  vcli:
+    image: cord/vcli
+    command: [
+      "/cli/cli/setup.sh",
+      "-L",
+      "-G"
+    ]
+    environment:
+      DOCKER_HOST_IP: "${DOCKER_HOST_IP}"
+    ports:
+    - "5022:22"
+    depends_on:
+    - voltha
+
+#############################################
+# Item below this line will soon be removed.#
+#############################################
+
+  #
+  # ofagent server instance
+  #
+  ofagent:
+    image: cord/ofagent
+    command: [
+      "/ofagent/ofagent/main.py",
+      "-v",
+      "--consul=${DOCKER_HOST_IP}:8500",
+      "--fluentd=fluentd:24224",
+      "--controller=${DOCKER_HOST_IP}:6653",
+      "--grpc-endpoint=@voltha-grpc",
+      "--instance-id-is-container-name",
+      "--enable-tls",
+      "--key-file=/ofagent/pki/voltha.key",
+      "--cert-file=/ofagent/pki/voltha.crt",
+      "-v"
+    ]
+    depends_on:
+    - consul
+    - voltha
+    links:
+    - consul
+    - fluentd
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    restart: unless-stopped
+
+  #
+  # Netconf server instance(s)
+  #
+  netconf:
+    image: cord/netconf
+    privileged: true
+    command: [
+      "/netconf/netconf/main.py",
+      "-v",
+      "--consul=${DOCKER_HOST_IP}:8500",
+      "--fluentd=fluentd:24224",
+      "--grpc-endpoint=@voltha-grpc",
+      "--instance-id-is-container-name",
+      "-v"
+    ]
+    ports:
+    - "830:1830"
+    depends_on:
+    - consul
+    - voltha
+    links:
+    - consul
+    - fluentd
+    environment:
+      SERVICE_1830_NAME: "netconf-server"
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+
+  #
+  # Dashboard daemon
+  #
+  dashd:
+    image: cord/dashd
+    command: [
+      "/dashd/dashd/main.py",
+      "--kafka=@kafka",
+      "--consul=${DOCKER_HOST_IP}:8500",
+      "--grafana_url=http://admin:admin@${DOCKER_HOST_IP}:8883/api",
+      "--topic=voltha.kpis",
+      "--docker_host=${DOCKER_HOST_IP}"
+    ]
+    depends_on:
+    - consul
+    - kafka
+    - grafana
+    restart: unless-stopped
+
+  #
+  # Nginx service consolidation
+  #
+  nginx:
+    image: voltha/nginx
+    ports:
+    - "80:80"
+    environment:
+      CONSUL_ADDR: "${DOCKER_HOST_IP}:8500"
+    command: [
+      "/nginx_config/start_service.sh"
+    ]
+    depends_on:
+    - consul
+    - grafana
+    - portainer
+    restart: unless-stopped
+
+  #
+  # Docker ui
+  #
+  portainer:
+    image: voltha/portainer
+    ports:
+    - "9000:9000"
+    environment:
+      CONSUL_ADDR: "${DOCKER_HOST_IP}:8500"
+    restart: unless-stopped
+    entrypoint: ["/portainer", "--logo", "/docker/images/logo_alt.png"]
+    volumes:
+    - "/var/run/docker.sock:/var/run/docker.sock"
+
+networks:
+  default:
+    driver: bridge
+  ponmgmt:
+    driver: bridge
+    driver_opts:
+      com.docker.network.bridge.name: "ponmgmt"
diff --git a/requirements.txt b/requirements.txt
index 545f400..b3c8b86 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -42,6 +42,7 @@
 termcolor==1.1.0
 treq==17.8.0
 Twisted==17.9.0
+txaioetcd==0.3.0
 urllib3==1.22
 pyang==1.7.3
 lxml==3.6.4
@@ -52,6 +53,7 @@
 ncclient==0.5.3
 xmltodict==0.11.0
 dicttoxml==1.7.4
+etcd3==0.7.0
 
 # python-consul>=0.6.1  we need the pre-released version for now, because 0.6.1 does not
 # yet support Twisted. Once this is released, it will be the 0.6.2 version
diff --git a/voltha/coordinator_etcd.py b/voltha/coordinator_etcd.py
new file mode 100644
index 0000000..238223f
--- /dev/null
+++ b/voltha/coordinator_etcd.py
@@ -0,0 +1,621 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Etcd-based coordinator services """
+
+from consul import ConsulException
+from consul.twisted import Consul
+from requests import ConnectionError
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from txaioetcd import Client, KeySet, Transaction, CompVersion, OpGet, OpSet, Failed
+from zope.interface import implementer
+
+from leader import Leader
+from common.utils.asleep import asleep
+from common.utils.message_queue import MessageQueue
+from voltha.registry import IComponent
+from worker import Worker
+from simplejson import dumps, loads
+from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+
+log = get_logger()
+
+
+class StaleMembershipEntryException(Exception):
+    pass
+
+
+@implementer(IComponent)
+class CoordinatorEtcd(object):
+    """
+    An app shall instantiate only one Coordinator (singleton).
+    A single instance of this object shall take care of all external
+    with consul, and via consul, all coordination activities with its
+    clustered peers. Roles include:
+    - registering an ephemeral membership entry (k/v record) in consul
+    - participating in a symmetric leader election, and potentially assuming
+      the leader's role. What leadership entails is not a concern for the
+      coordination, it simply instantiates (and shuts down) a leader class
+      when it gains (or looses) leadership.
+    """
+
+    CONNECT_RETRY_INTERVAL_SEC = 1
+    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+    # Public methods:
+
+    def __init__(self,
+                 internal_host_address,
+                 external_host_address,
+                 instance_id,
+                 rest_port,
+                 config,
+                 consul='localhost:8500',
+                 etcd='localhost:2379'):
+
+        log.info('initializing-coordinator')
+        self.config = config['coordinator']
+        self.worker_config = config['worker']
+        self.leader_config = config['leader']
+        self.membership_watch_relatch_delay = config.get(
+            'membership_watch_relatch_delay', 0.1)
+        self.tracking_loop_delay = self.config.get(
+            'tracking_loop_delay', 1)
+        self.session_renewal_timeout = self.config.get(
+            'session_renewal_timeout', 5)
+        self.session_renewal_loop_delay = self.config.get(
+            'session_renewal_loop_delay', 3)
+        self.membership_maintenance_loop_delay = self.config.get(
+            'membership_maintenance_loop_delay', 5)
+        self.session_time_to_live = self.config.get(
+            'session_time_to_live', 10)
+        self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
+        self.leader_prefix = '/'.join((self.prefix, self.config.get(
+            self.config['leader_key'], 'leader')))
+        self.membership_prefix = '/'.join((self.prefix, self.config.get(
+            self.config['membership_key'], 'members'), ''))
+        self.assignment_prefix = '/'.join((self.prefix, self.config.get(
+            self.config['assignment_key'], 'assignments'), ''))
+        self.workload_prefix = '/'.join((self.prefix, self.config.get(
+            self.config['workload_key'], 'work'), ''))
+        self.core_store_prefix = '/'.join((self.prefix, self.config.get(
+            self.config['core_store_key'], 'data/core')))
+        self.core_store_assignment_key = self.core_store_prefix + \
+                                         '/assignment'
+        self.core_storage_suffix = 'core_store'
+
+        self.retries = 0
+        self.instance_id = instance_id
+        self.internal_host_address = internal_host_address
+        self.external_host_address = external_host_address
+        self.rest_port = rest_port
+        self.membership_record_key = self.membership_prefix + self.instance_id
+
+        self.lease = None
+        # session_id refers to either a Consul session ID or an Etcd lease object
+        self.session_id = None
+        self.i_am_leader = False
+        self.leader_id = None  # will be the instance id of the current leader
+        self.shutting_down = False
+        self.leader = None
+        self.membership_callback = None
+
+        self.worker = Worker(self.instance_id, self)
+
+        self.host = consul.split(':')[0].strip()
+        self.port = int(consul.split(':')[1].strip())
+
+        # TODO need to handle reconnect events properly
+        self.consul = Consul(host=self.host, port=self.port)
+
+        # Create etcd client
+        kv_host = etcd.split(':')[0].strip()
+        kv_port = etcd.split(':')[1].strip()
+        self.etcd_url = u'http://' + kv_host + u':' + kv_port
+        self.etcd = Client(reactor, self.etcd_url)
+
+        self.wait_for_leader_deferreds = []
+
+        self.peers_mapping_queue = MessageQueue()
+
+    def start(self):
+        log.debug('starting')
+        reactor.callLater(0, self._async_init)
+        log.info('started')
+        return self
+
+    @inlineCallbacks
+    def stop(self):
+        log.debug('stopping')
+        self.shutting_down = True
+        yield self._delete_session()  # this will delete the leader lock too
+        yield self.worker.stop()
+        if self.leader is not None:
+            yield self.leader.stop()
+            self.leader = None
+        log.info('stopped')
+
+    def wait_for_a_leader(self):
+        """
+        Async wait till a leader is detected/elected. The deferred will be
+        called with the leader's instance_id
+        :return: Deferred.
+        """
+        d = Deferred()
+        if self.leader_id is not None:
+            d.callback(self.leader_id)
+            return d
+        else:
+            self.wait_for_leader_deferreds.append(d)
+            return d
+
+    # Wait for a core data id to be assigned to this voltha instance
+    @inlineCallbacks
+    def get_core_store_id_and_prefix(self):
+        core_store_id = yield self.worker.get_core_store_id()
+        returnValue((core_store_id, self.core_store_prefix))
+
+    def recv_peers_map(self):
+        return self.peers_mapping_queue.get()
+
+    def publish_peers_map_change(self, msg):
+        self.peers_mapping_queue.put(msg)
+
+    # Proxy methods for etcd with retry support
+
+    def kv_get(self, *args, **kw):
+        # Drop the Consul-style 'index' argument; it is not used with etcd here
+        kw.pop('index', None)
+        return self._retry('GET', *args, **kw)
+
+    def kv_put(self, *args, **kw):
+        return self._retry('PUT', *args, **kw)
+
+    def kv_delete(self, *args, **kw):
+        return self._retry('DELETE', *args, **kw)
+
+    # Methods exposing key membership information
+
+    @inlineCallbacks
+    def get_members(self):
+        """Return list of all members"""
+        _, members = yield self.kv_get(self.membership_prefix, recurse=True)
+        returnValue([member['Key'][len(self.membership_prefix):]
+                     for member in members])
+
+    # Private (internal) methods:
+
+    @inlineCallbacks
+    def _async_init(self):
+        yield self._create_session()
+        yield self._create_membership_record()
+        yield self._start_leader_tracking()
+        yield self.worker.start()
+
+    def _backoff(self, msg):
+        wait_time = self.RETRY_BACKOFF[min(self.retries,
+                                           len(self.RETRY_BACKOFF) - 1)]
+        self.retries += 1
+        log.error(msg, retry_in=wait_time)
+        return asleep(wait_time)
+
+    def _clear_backoff(self):
+        if self.retries:
+            log.info('reconnected-to-consul', after_retries=self.retries)
+            self.retries = 0
+
+    @inlineCallbacks
+    def _create_session(self):
+
+        @inlineCallbacks
+        def _create_session():
+            etcd = yield self.get_kv_client()
+            # Create etcd lease
+            self.lease = yield etcd.lease(self.session_time_to_live)
+            self.session_id = self.lease
+            log.info('created-etcd-lease', lease=self.session_id)
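+            # The lease expires after session_time_to_live (10s by default)
+            # unless it is refreshed; _session_tracking_loop refreshes it
+            # every session_renewal_loop_delay seconds.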
+            self._start_session_tracking()
+
+        yield self._retry(_create_session)
+
+    @inlineCallbacks
+    def _delete_session(self):
+        try:
+            yield self.lease.revoke()
+        except Exception as e:
+            log.exception('failed-to-delete-session',
+                          session_id=self.session_id)
+
+    @inlineCallbacks
+    def _create_membership_record(self):
+        yield self._do_create_membership_record_with_retries()
+        reactor.callLater(0, self._maintain_membership_record)
+
+    @inlineCallbacks
+    def _maintain_membership_record(self):
+        try:
+            while 1:
+                valid_membership = yield self._assert_membership_record_valid()
+                if not valid_membership:
+                    log.info('recreating-membership-before',
+                             session=self.session_id)
+                    yield self._do_create_membership_record_with_retries()
+                    log.info('recreating-membership-after',
+                             session=self.session_id)
+                else:
+                    log.debug('valid-membership', session=self.session_id)
+                # Async sleep before checking the membership record again
+                yield asleep(self.membership_maintenance_loop_delay)
+
+        except Exception as e:
+            log.exception('unexpected-error-membership-maintenance', e=e)
+        finally:
+            # except in shutdown, the loop must continue (after a short delay)
+            if not self.shutting_down:
+                reactor.callLater(self.membership_watch_relatch_delay,
+                                  self._maintain_membership_record)
+
+    def _create_membership_record_data(self):
+        member_record = dict()
+        member_record['status'] = 'alive'
+        member_record['host_address'] = self.external_host_address
+        return member_record
+
+    @inlineCallbacks
+    def _assert_membership_record_valid(self):
+        try:
+            log.info('membership-record-before')
+            is_timeout, (_, record) = yield \
+                                        self.consul_get_with_timeout(
+                                                key=self.membership_record_key,
+                                                index=0,
+                                                timeout=5)
+            if is_timeout:
+                returnValue(False)
+
+            log.info('membership-record-after', record=record)
+            if record is None or 'Session' not in record:
+                log.info('membership-record-change-detected',
+                         old_session=self.session_id,
+                         record=record)
+                returnValue(False)
+            else:
+                returnValue(True)
+        except Exception as e:
+            log.exception('membership-validation-exception', e=e)
+            returnValue(False)
+
+    @inlineCallbacks
+    def _do_create_membership_record_with_retries(self):
+        while 1:
+            log.info('recreating-membership', session=self.session_id)
+            result = yield self._retry(
+                'PUT',
+                self.membership_record_key,
+                dumps(self._create_membership_record_data()),
+                acquire=self.session_id)
+            if result:
+                log.info('new-membership-record-created',
+                         session=self.session_id)
+                break
+            else:
+                log.warn('cannot-create-membership-record')
+                yield self._backoff('stale-membership-record')
+
+    def _start_session_tracking(self):
+        reactor.callLater(0, self._session_tracking_loop)
+
+    @inlineCallbacks
+    def _session_tracking_loop(self):
+
+        @inlineCallbacks
+        def _redo_session():
+            log.info('_redo_session-before')
+            yield self._delete_session()
+
+            # Create a new etcd connection/session with a new lease
+            try:
+                self.etcd = Client(reactor, self.etcd_url)
+                self.lease = yield self.etcd.lease(self.session_time_to_live)
+                self.session_id = self.lease
+                log.info('new-etcd-session', session=self.session_id)
+
+            except Exception as e:
+                log.exception('could-not-create-an-etcd-lease', e=e)
+
+        @inlineCallbacks
+        def _renew_session(m_callback):
+            try:
+                time_left = yield self.lease.remaining()
+                log.info('_renew_session', time_left=time_left)
+                result = yield self.lease.refresh()
+                log.info('just-renewed-session', result=result)
+                if not m_callback.called:
+                    # Triggering callback will cancel the timeout timer
+                    log.info('trigger-callback-to-cancel-timeout-timer')
+                    m_callback.callback(result)
+                else:
+                    # Timeout event has already been called.  Just ignore
+                    # this event
+                    log.info('renew-called-after-timeout, etcd ref changed?')
+            except Exception as e:
+                # Let the invoking method receive a timeout
+                log.exception('could-not-renew-session', e=e)
+
+        try:
+            while 1:
+                log.debug('session-tracking-start')
+                rcvd = DeferredWithTimeout(
+                    timeout=self.session_renewal_timeout)
+                _renew_session(rcvd)
+                try:
+                    _ = yield rcvd
+                except TimeOutError as e:
+                    log.info('session-renew-timeout', e=e)
+                    # Redo the session
+                    yield _redo_session()
+                except Exception as e:
+                    log.exception('session-renew-exception', e=e)
+                else:
+                    log.debug('successfully-renewed-session')
+
+                # Async sleep before the next session tracking
+                yield asleep(self.session_renewal_loop_delay)
+
+        except Exception as e:
+            log.exception('renew-exception', e=e)
+        finally:
+            reactor.callLater(self.session_renewal_loop_delay,
+                              self._session_tracking_loop)
+
+    def _start_leader_tracking(self):
+        reactor.callLater(0, self._leadership_tracking_loop)
+
+    @inlineCallbacks
+    def _leadership_tracking_loop(self):
+        log.info('leadership-attempt-before')
+
+        # Try to acquire leadership lease via test-and-set operation.
+        # Success means the leader key was previously absent and was
+        # just re-created by this instance.
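+        # The CompVersion compare succeeds only while the leader key does not
+        # exist (version 0); the OpSet then writes our instance id under the
+        # current lease, so the key is removed automatically if this instance
+        # stops refreshing its lease.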
+        leader_prefix = bytes(self.leader_prefix)
+        txn = Transaction(
+            compare=[
+                CompVersion(leader_prefix, '==', 0)
+            ],
+            success=[
+                OpSet(leader_prefix, bytes(self.instance_id), lease=self.lease),
+                OpGet(leader_prefix)
+            ],
+            failure=[]
+        )
+        newly_asserted = False
+        try:
+            result = yield self.etcd.submit(txn)
+        except Failed as failed:
+            log.info('Leader key PRESENT')
+            for response in failed.responses:
+                log.info('Leader key already present', response=response)
+        else:
+            newly_asserted = True
+            log.info('Leader key ABSENT')
+            for response in result.responses:
+                log.info('Leader key was absent', response=response)
+
+        log.info('leadership-attempt-after')
+
+        # Confirm that the assertion succeeded by reading back the value
+        # of the leader key.
+        leader = None
+        result = yield self.etcd.get(leader_prefix)
+        if result.kvs:
+            kv = result.kvs[0]
+            leader = kv.value
+            log.info('Leader readback', leader=leader, instance=self.instance_id)
+
+        if leader is None:
+            log.info('Failed to read leader key')
+        elif leader == self.instance_id:
+            if newly_asserted:
+                log.info("I JUST BECAME LEADER!")
+                yield self._assert_leadership()
+            else:
+                log.info("I'm an aging LEADER")
+        else:
+            log.info('The LEADER is another', leader=leader)
+            yield self._assert_nonleadership(leader)
+
+        # May have to add code here to handle case where, for some reason, the lease
+        # had been blown away and the txn failed for that reason
+
+        # except in shutdown, the loop must continue (after a short delay)
+        if not self.shutting_down:
+            reactor.callLater(self.tracking_loop_delay,
+                              self._leadership_tracking_loop)
+
+    @inlineCallbacks
+    def _assert_leadership(self):
+        """(Re-)assert leadership"""
+        if not self.i_am_leader:
+            self.i_am_leader = True
+            self._set_leader_id(self.instance_id)
+            yield self._just_gained_leadership()
+
+    @inlineCallbacks
+    def _assert_nonleadership(self, leader_id):
+        """(Re-)assert non-leader role"""
+
+        # update leader_id anyway
+        self._set_leader_id(leader_id)
+
+        if self.i_am_leader:
+            self.i_am_leader = False
+            yield self._just_lost_leadership()
+
+    def _set_leader_id(self, leader_id):
+        self.leader_id = leader_id
+        deferreds, self.wait_for_leader_deferreds = \
+            self.wait_for_leader_deferreds, []
+        for d in deferreds:
+            d.callback(leader_id)
+
+    def _just_gained_leadership(self):
+        log.info('became-leader')
+        self.leader = Leader(self)
+        return self.leader.start()
+
+    def _just_lost_leadership(self):
+        log.info('lost-leadership')
+        return self._halt_leader()
+
+    def _halt_leader(self):
+        d = None
+        if self.leader:
+            d = self.leader.stop()
+            self.leader = None
+        return d
+
+    def get_kv_client(self):
+        return self.etcd
+
+    @inlineCallbacks
+    def _retry(self, operation, *args, **kw):
+        # Translate Consul-style keyword arguments into their etcd equivalents
+        prefix = False
+        keyset = None
+        if 'acquire' in kw:
+            kw['lease'] = kw.pop('acquire')
+        if 'keys' in kw:
+            kw.pop('keys')
+            kw['keys_only'] = True
+        if 'recurse' in kw:
+            kw.pop('recurse')
+            prefix = True
+            keyset = KeySet(bytes(args[0]), prefix=True)
+        log.info('start-op', operation=operation, args=args, kw=kw)
+
+        while 1:
+            try:
+                etcd = yield self.get_kv_client()
+                if operation == 'GET':
+                    key = bytes(args[0])
+                    # If multiple keys requested, return a list
+                    # else return a single record
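+                    # Records are shaped like python-consul KV entries
+                    # (Key/Value/ModifyIndex/Session) so that callers written
+                    # against the Consul coordinator can consume them unchanged.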
+                    if not prefix:
+                        index = 0
+                        record = dict()
+                        res = yield etcd.get(key, **kw)
+                        if res.kvs:
+                            if len(res.kvs) == 1:
+                                kv = res.kvs[0]
+                                index = kv.mod_revision
+                                record['Key'] = kv.key
+                                record['Value'] = kv.value
+                                record['ModifyIndex'] = index
+                                record['Session'] = self.lease.lease_id if self.lease else ''
+                                result = (index, record)
+                    else:
+                        # Get values for all keys that match the prefix
+                        index = 0
+                        records = []
+                        res = yield etcd.get(keyset, **kw)
+                        if args[0] == 'service/voltha/assignments/':
+                            log.info('assignments', result=res)
+                        if res.kvs and len(res.kvs) > 0:
+                            for kv in res.kvs:
+                                # Which index should be returned? The max over all keys?
+                                if kv.mod_revision > index:
+                                    index = kv.mod_revision
+                                rec = dict()
+                                rec['Key'] = kv.key
+                                rec['Value'] = kv.value
+                                rec['ModifyIndex'] = kv.mod_revision
+                                rec['Session'] = self.lease.lease_id if self.lease else ''
+                                records.append(rec)
+                        result = (index, records)
+                elif operation == 'PUT':
+                    key = bytes(args[0])
+                    result = yield etcd.set(key, args[1], **kw)
+                elif operation == 'DELETE':
+                    key = bytes(args[0])
+                    result = yield etcd.delete(key, **kw)
+                else:
+                    # Default case - consider operation as a function call
+                    result = yield operation(*args, **kw)
+                self._clear_backoff()
+                break
+            except Exception as e:
+                if not self.shutting_down:
+                    log.exception(e)
+                yield self._backoff('unknown-error')
+
+        log.info('end-op', operation=operation, args=args, kw=kw, result=result)
+        returnValue(result)
+
+    @inlineCallbacks
+    def consul_get_with_timeout(self, key, timeout, **kw):
+        """
+        Query etcd with a timeout
+        :param key: Key to query
+        :param timeout: timeout value
+        :param kw: additional key-value params
+        :return: (is_timeout, (index, result)).
+
+        The Consul version of this method performed a 'wait-type' get operation
+        that returned a result when the key's value had a ModifyIndex greater
+        than the 'index' argument. Not sure etcd supports this functionality.
+        """
+
+        # Intercept the Consul-style 'index' argument; default to 0 so the
+        # comparison against mod_revision below still works when no index is
+        # supplied
+        mod_revision = kw.pop('index', 0)
+        log.info('consul_get_with_timeout', index=mod_revision)
+
+        @inlineCallbacks
+        def _get(key, m_callback):
+            try:
+                (index, result) = yield self._retry('GET', key, **kw)
+                if index > mod_revision and not m_callback.called:
+                    log.debug('got-result-cancelling-timer')
+                    m_callback.callback((index, result))
+            except Exception as e:
+                log.exception('got-exception', e=e)
+
+        try:
+            rcvd = DeferredWithTimeout(timeout=timeout)
+            _get(key, rcvd)
+            try:
+                result = yield rcvd
+                log.debug('result-received', result=result)
+                returnValue((False, result))
+            except TimeOutError as e:
+                log.debug('timeout-or-no-data-change', key=key)
+            except Exception as e:
+                log.exception('exception', e=e)
+        except Exception as e:
+            log.exception('exception', e=e)
+
+        returnValue((True, (None, None)))
diff --git a/voltha/core/config/config_backend.py b/voltha/core/config/config_backend.py
index 1360375..71eeddb 100644
--- a/voltha/core/config/config_backend.py
+++ b/voltha/core/config/config_backend.py
@@ -17,6 +17,7 @@
 from requests import ConnectionError
 from twisted.internet.defer import inlineCallbacks, returnValue
 
+import etcd3
 import structlog
 
 log = structlog.get_logger()
@@ -141,6 +142,120 @@
         return result
 
 
+class EtcdStore(object):
+    """ Config kv store for etcd with a cache for quicker subsequent reads
+
+        TODO: This will block the reactor. Should either change
+        whole call stack to yield or put the put/delete transactions into a
+        queue to write later with twisted. Will need a transaction
+        log to ensure we don't lose anything.
+        Making the whole callstack yield is troublesome because other tasks can
+        come in on the side and start modifying things which could be bad.
+    """
+
+    CONNECT_RETRY_INTERVAL_SEC = 1
+    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+    def __init__(self, host, port, path_prefix):
+        self._etcd = etcd3.client(host=host, port=port)
+        self.host = host
+        self.port = port
+        self._path_prefix = path_prefix
+        self._cache = {}
+        self.retries = 0
+
+    def make_path(self, key):
+        return '{}/{}'.format(self._path_prefix, key)
+
+    def __getitem__(self, key):
+        if key in self._cache:
+            return self._cache[key]
+        (value, meta) = self._kv_get(self.make_path(key))
+        if value is not None:
+            self._cache[key] = value
+            return value
+        else:
+            raise KeyError(key)
+
+    def __contains__(self, key):
+        if key in self._cache:
+            return True
+        (value, meta) = self._kv_get(self.make_path(key))
+        if value is not None:
+            self._cache[key] = value
+            return True
+        else:
+            return False
+
+    def __setitem__(self, key, value):
+        try:
+            assert isinstance(value, basestring)
+            self._cache[key] = value
+            self._kv_put(self.make_path(key), value)
+        except Exception as e:
+            log.exception('cannot-set-item', e=e)
+
+    def __delitem__(self, key):
+        self._cache.pop(key, None)
+        self._kv_delete(self.make_path(key))
+
+    @inlineCallbacks
+    def _backoff(self, msg):
+        wait_time = self.RETRY_BACKOFF[min(self.retries,
+                                           len(self.RETRY_BACKOFF) - 1)]
+        self.retries += 1
+        log.error(msg, retry_in=wait_time)
+        yield asleep(wait_time)
+
+    def _redo_etcd_connection(self):
+        self._etcd = etcd3.client(host=self.host, port=self.port)
+        self._cache.clear()
+
+    def _clear_backoff(self):
+        if self.retries:
+            log.info('reconnected-to-etcd', after_retries=self.retries)
+            self.retries = 0
+
+    def _get_etcd(self):
+        return self._etcd
+
+    # Proxy methods for etcd with retry support
+    def _kv_get(self, *args, **kw):
+        return self._retry('GET', *args, **kw)
+
+    def _kv_put(self, *args, **kw):
+        return self._retry('PUT', *args, **kw)
+
+    def _kv_delete(self, *args, **kw):
+        return self._retry('DELETE', *args, **kw)
+
+    def _retry(self, operation, *args, **kw):
+        log.info('backend-op', operation=operation, args=args, kw=kw)
+        while 1:
+            try:
+                etcd = self._get_etcd()
+                log.debug('etcd', etcd=etcd, operation=operation,
+                          args=args)
+                if operation == 'GET':
+                    (value, meta) = etcd.get(*args, **kw)
+                    result = (value, meta)
+                elif operation == 'PUT':
+                    result = etcd.put(*args, **kw)
+                elif operation == 'DELETE':
+                    result = etcd.delete(*args, **kw)
+                else:
+                    # Default case - consider operation as a function call
+                    result = operation(*args, **kw)
+                self._clear_backoff()
+                break
+            except Exception as e:
+                log.exception(e)
+                self._backoff('unknown-error-with-etcd')
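+            # Only reached after a failure: a successful operation breaks out
+            # of the loop above before this point. Rebuild the client before
+            # the next attempt.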
+            self._redo_etcd_connection()
+
+        return result
+
+
 def load_backend(store_id, store_prefix, args):
     """ Return the kv store backend based on the command line arguments
     """
@@ -151,9 +266,16 @@
         host, port = args.consul.split(':', 1)
         return ConsulStore(host, int(port), instance_core_store_prefix)
 
+    def load_etcd_store():
+        instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
+
+        host, port = args.etcd.split(':', 1)
+        return EtcdStore(host, int(port), instance_core_store_prefix)
+
     loaders = {
         'none': lambda: None,
-        'consul': load_consul_store
+        'consul': load_consul_store,
+        'etcd': load_etcd_store
     }
 
     return loaders[args.backend]()
diff --git a/voltha/main.py b/voltha/main.py
index 27b29f9..de2acfc 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -36,6 +36,7 @@
     get_my_primary_local_ipv4
 from voltha.adapters.loader import AdapterLoader
 from voltha.coordinator import Coordinator
+from voltha.coordinator_etcd import CoordinatorEtcd
 from voltha.core.core import VolthaCore
 from voltha.core.config.config_backend import load_backend
 from voltha.northbound.diagnostics import Diagnostics
@@ -52,6 +53,7 @@
 defs = dict(
     config=os.environ.get('CONFIG', './voltha.yml'),
     consul=os.environ.get('CONSUL', 'localhost:8500'),
+    etcd=os.environ.get('ETCD', 'localhost:2379'),
     inter_core_subnet=os.environ.get('INTER_CORE_SUBNET', None),
     pon_subnet=os.environ.get('PON_SUBNET', None),
     external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
@@ -87,6 +89,11 @@
         default=defs['consul'],
         help=_help)
 
+    _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
+    parser.add_argument(
+        '-e', '--etcd', dest='etcd', action='store',
+        default=defs['etcd'],
+        help=_help)
 
     _help = ('<inter_core_subnet> is the subnet connecting all the voltha '
              'instances in a cluster (default: %s)' % defs['inter_core_subnet'])
@@ -215,7 +222,7 @@
     _help = 'backend to use for config persitence'
     parser.add_argument('-b', '--backend',
                         default=defs['backend'],
-                        choices=['none', 'consul'],
+                        choices=['none', 'consul', 'etcd'],
                         help=_help)
 
     args = parser.parse_args()
@@ -321,20 +328,34 @@
                           internal_host=self.args.internal_host_address,
                           external_host=self.args.external_host_address,
                           interface=self.args.interface,
-                          consul=self.args.consul)
+                          consul=self.args.consul,
+                          etcd=self.args.etcd)
 
             registry.register('main', self)
 
-            yield registry.register(
-                'coordinator',
-                Coordinator(
-                    internal_host_address=self.args.internal_host_address,
-                    external_host_address=self.args.external_host_address,
-                    rest_port=self.args.rest_port,
-                    instance_id=self.instance_id,
-                    config=self.config,
-                    consul=self.args.consul)
-            ).start()
+            if self.args.backend == 'consul':
+                yield registry.register(
+                    'coordinator',
+                    Coordinator(
+                        internal_host_address=self.args.internal_host_address,
+                        external_host_address=self.args.external_host_address,
+                        rest_port=self.args.rest_port,
+                        instance_id=self.instance_id,
+                        config=self.config,
+                        consul=self.args.consul)
+                ).start()
+            elif self.args.backend == 'etcd':
+                yield registry.register(
+                    'coordinator',
+                    CoordinatorEtcd(
+                        internal_host_address=self.args.internal_host_address,
+                        external_host_address=self.args.external_host_address,
+                        rest_port=self.args.rest_port,
+                        instance_id=self.instance_id,
+                        config=self.config,
+                        consul=self.args.consul,
+                        etcd=self.args.etcd)
+                ).start()
 
             self.log.info('waiting-for-config-assignment')