Membership ephemeral entry and initial leader election

Also:
- env.sh to load what's needed for dev
- refactored main
- async consul client instead of blocking
diff --git a/Dockerfile b/Dockerfile
index 1a9ecc4..03537b8 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -27,9 +27,9 @@
 COPY requirements.txt /tmp/requirements.txt
 
 # Install app dependencies
-RUN apk add build-base gcc abuild binutils python-dev libffi-dev openssl-dev && \
+RUN apk add build-base gcc abuild binutils python-dev libffi-dev openssl-dev git && \
     pip install -r /tmp/requirements.txt && \
-    apk del --purge build-base gcc abuild binutils python-dev libffi-dev openssl-dev
+    apk del --purge build-base gcc abuild binutils python-dev libffi-dev openssl-dev git
 
 # Bundle app source
 COPY voltha /voltha
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index d06aea3..3f7fb0f 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -10,9 +10,9 @@
     - "8300:8300"
     - "8400:8400"
     - "8500:8500"
-    - "8600:53/udp"
+    - "8600:8600/udp"
     environment:
-      SERVICE_53_IGNORE: "yes"
+      #SERVICE_53_IGNORE: "yes"
       SERVICE_8300_IGNORE: "yes"
       SERVICE_8400_IGNORE: "yes"
       SERVICE_8500_NAME: "consul-rest"
@@ -21,7 +21,12 @@
   #
   registrator:
     image: gliderlabs/registrator:latest
-    command: -ip=10.0.2.15 -retry-attempts 100 -internal consul://consul:8500
+    command: [
+      "-ip=${DOCKER_HOST_IP}",
+      "-retry-attempts", "100",
+      # "-internal",
+      "consul://consul:8500"
+    ]
     links:
     - consul
     volumes:
@@ -35,6 +40,8 @@
     - "24224:24224"
     volumes:
     - "/tmp/fluentd:/fluentd/log"
+    environment:
+      SERVICE_24224_NAME: "fluentd-intake"
   #
   # Single-node voltha
   #
@@ -45,7 +52,9 @@
       "-v",
       "--consul=consul:8500",
       "--fluentd=fluentd:24224",
-      "--rest-port=8880"
+      "--rest-port=8880",
+      "--instance-id-is-container-name",
+      "-v"
     ]
     ports:
     - 8880
@@ -59,17 +68,20 @@
       SERVICE_8880_CHECK_HTTP: "/health"
       SERVICE_8880_CHECK_INTERVAL: "5s"
       SERVICE_8880_CHECK_TIMEOUT: "1s"
+      IMAGE_NAME: "{{DCOKER_IMAGE_NAME}}"
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
   #
   # Test container to see services available
   #
-  test:
-    image: busybox
-    command: tail -F /dev/null
-    depends_on:
-    - consul
-    - voltha
-    links:
-    - consul
-    - registrator
-    - fluentd
-    - voltha
+#  test:
+#    image: busybox
+#    command: tail -F /dev/null
+#    depends_on:
+#    - consul
+#    - voltha
+#    links:
+#    - consul
+#    - registrator
+#    - fluentd
+#    - voltha
diff --git a/env.sh b/env.sh
new file mode 100644
index 0000000..19c99fa
--- /dev/null
+++ b/env.sh
@@ -0,0 +1,9 @@
+# sourcing this file is needed to make local development and integration testing work
+
+# load local python virtualenv 
+. venv/bin/activate
+
+
+# assign DOCKER_HOST_IP to be the main ip address of this host
+export DOCKER_HOST_IP=$(python voltha/nethelpers.py)
+
diff --git a/requirements.txt b/requirements.txt
index 3d312e2..d3643a6 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
 argparse==1.2.1
 colorama>=0.2.5
 decorator>=3.4.0
+docker-py
 flake8>=2.5.1
 fluent-logger>=0.4.3
 klein>=15.3.1
@@ -12,7 +13,6 @@
 pyflakes>=1.0.0
 pylint>=1.5.2
 pyOpenSSL>=0.13
-python-consul>=0.6.1
 PyYAML>=3.10
 requests
 scapy>=2.3.2
@@ -20,5 +20,10 @@
 simplejson>=3.8.1
 six>=1.10.0
 structlog>=16.1.0
+treq>=15.1.0
 Twisted>=13.2.0
 urllib3>=1.7.1
+
+# python-consul>=0.6.1 is replaced by the pre-release version from git for now, because 0.6.1
+# does not yet support Twisted. Once that support is released, it will be the 0.6.2 version
+git+git://github.com/cablehead/python-consul.git
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 1fa2b18..bd55049 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -23,11 +23,13 @@
 # was cut before twisted support was added. So keep an eye on when 0.6.2 comes out and
 # move over to the twisted interface once it's available.
 
-from consul import Consul, ConsulException
+from consul import Check, ConsulException
+from consul.twisted import Consul
 from requests import ConnectionError
 from structlog import get_logger
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.task import LoopingCall
 
 from asleep import asleep
 
@@ -50,6 +52,9 @@
         self.external_host_address = external_host_address
         self.rest_port = rest_port
 
+        self.session_id = None
+        self.i_am_leader = False
+
         self.log = get_logger()
         self.log.info('initializing-coordinator')
 
@@ -61,9 +66,15 @@
         self.log.info('initialized-coordinator')
 
     @inlineCallbacks
+    def shutdown(self):
+        yield self.delete_session()
+
+    @inlineCallbacks
     def async_init(self):
-        yield self.kv_put('voltha/instances/%s/status' % self.instance_id, 'up')
-        yield self.register()
+        # yield self.kv_put('voltha/instances/%s/status' % self.instance_id, 'up')
+        yield self.create_session()
+        yield self.create_membership_record()
+        yield self.elect_leader()
 
     def backoff(self, msg):
         wait_time = self.RETRY_BACKOFF[min(self.retries, len(self.RETRY_BACKOFF) - 1)]
@@ -80,9 +91,10 @@
     def kv_put(self, key, value, retry=True):
         while 1:
             try:
-                self.consul.kv.put(key, value)
+                response = yield self.consul.kv.put(key, value)
                 self.clear_backoff()
-                break
+                returnValue(response)
+
             except ConsulException, e:
                 if retry:
                     yield self.backoff('Consul not yet up')
@@ -101,30 +113,67 @@
                     raise e
 
     @inlineCallbacks
-    def register(self, retry=True):
+    def create_session(self):
+
+        @inlineCallbacks
+        def _renew_session():
+            try:
+                result = yield self.consul.session.renew(session_id=self.session_id)
+                self.log.debug('just renewed session', result=result)
+            except Exception, e:
+                self.log.exception('could-not-renew-session', e=e)
+
+        @inlineCallbacks
+        def _create_session():
+
+            # create consul session
+            self.session_id = yield self.consul.session.create(behavior='delete', ttl=10)
+            self.log.info('created-consul-session', session_id=self.session_id)
+
+            # renew the session every 3 s, i.e. about 3 times within the 10 s ttl
+            lc = LoopingCall(_renew_session)
+            lc.start(3)
+
+        yield self._retry(_create_session)
+
+    @inlineCallbacks
+    def delete_session(self):
+        yield self.consul.session.destroy(self.session_id)
+
+    @inlineCallbacks
+    def create_membership_record(self):
+        # create ephemeral k/v registering this instance in the service/voltha/members/<instance-id> node
+        result = yield self.consul.kv.put('service/voltha/members/%s' % self.instance_id, 'alive',
+                                          acquire=self.session_id)
+        assert result is True
+
+    @inlineCallbacks
+    def elect_leader(self):
+        """Attempt to become the leader by acquiring the leader key and track the leader anyway"""
+
+        # attempt acquire leader lock
+        result = yield self.consul.kv.put('service/voltha/leader', self.instance_id,
+                                          acquire=self.session_id)
+
+        # read the key back before trusting the result: seeing our session id on it is the proof,
+        # and it also gives us the change id that we can use to reliably track any changes
+
+        # TODO continue from here !!!
+        if result is True:
+            self.i_am_leader = True
+
+    @inlineCallbacks
+    def _retry(self, func, *args, **kw):
         while 1:
             try:
-                kw = dict(
-                    name='voltha-%s' % self.instance_id,
-                    address=self.internal_host_address,
-                    port=self.rest_port
-                )
-                self.consul.agent.service.register(**kw)
-                self.log.info('registered-with-consul', **kw)
+                result = yield func(*args, **kw)
                 break
             except ConsulException, e:
-                if retry:
-                    yield self.backoff('Consul not yet up')
-                else:
-                    raise e
+                yield self.backoff('Consul not yet up')
             except ConnectionError, e:
-                if retry:
-                    yield self.backoff('Cannot connect to consul agent')
-                else:
-                    raise e
+                yield self.backoff('Cannot connect to consul agent')
             except Exception, e:
                 self.log.exception(e)
-                if retry:
-                    yield self.backoff('Unknown error')
-                else:
-                    raise e
+                yield self.backoff('Unknown error')
+
+        returnValue(result)
diff --git a/voltha/dockerhelpers.py b/voltha/dockerhelpers.py
new file mode 100644
index 0000000..ee6cd7a
--- /dev/null
+++ b/voltha/dockerhelpers.py
@@ -0,0 +1,49 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Some docker related convenience functions
+"""
+
+import os
+from structlog import get_logger
+
+from docker import Client
+
+
+log = get_logger()
+
+
+def get_my_containers_name():
+    """
+    Return the name of the docker container in which this process is running.
+    To look up the container name, we use the container ID extracted from the $HOSTNAME
+    environment variable (which is set by docker conventions).
+    :return: String with the docker container name (any failure is logged and re-raised)
+    """
+    my_container_id = os.environ.get('HOSTNAME', None)
+
+    try:
+        docker_cli = Client(base_url='unix://tmp/docker.sock')
+        info = docker_cli.inspect_container(my_container_id)
+
+    except Exception, e:
+        log.exception('failed', my_container_id=my_container_id, e=e)
+        raise
+
+    name = info['Name'].lstrip('/')
+
+    return name
diff --git a/voltha/main.py b/voltha/main.py
index 188e6aa..a85fb2f 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -21,11 +21,14 @@
 import os
 import time
 import yaml
+from twisted.internet.defer import inlineCallbacks
 
 from structlog_setup import setup_logging
 from coordinator import Coordinator
 from northbound.rest.health_check import init_rest_service
 from nethelpers import get_my_primary_interface, get_my_primary_local_ipv4
+from dockerhelpers import get_my_containers_name
+
 
 defs = dict(
     consul=os.environ.get('CONSUL', 'localhost:8500'),
@@ -93,7 +96,18 @@
     parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', default=False,
                         help='enable verbose logging')
 
-    return parser.parse_args()
+    parser.add_argument('--instance-id-is-container-name', dest='instance_id_is_container_name', action='store_true',
+                        default=False, help='use docker container name as voltha instance id'
+                        ' (overrides -i/--instance-id option)')
+
+    args = parser.parse_args()
+
+    # post-processing
+
+    if args.instance_id_is_container_name:
+        args.instance_id = get_my_containers_name()
+
+    return args
 
 
 def load_config(args):
@@ -107,7 +121,7 @@
     return config
 
 
-def print_banner(args, log):
+def print_banner(log):
     log.info(' _    ______  __  ________  _____ ')
     log.info('| |  / / __ \/ / /_  __/ / / /   |')
     log.info('| | / / / / / /   / / / /_/ / /| |')
@@ -116,61 +130,62 @@
     log.info('(to stop: press Ctrl-C)')
 
 
-def startup(log, args, config):
-    log.info('starting-internal-services')
-    coordinator = Coordinator(
-        internal_host_address=args.internal_host_address,
-        external_host_address=args.external_host_address,
-        rest_port=args.rest_port,
-        instance_id=args.instance_id,
-        consul=args.consul)
-    init_rest_service(args.rest_port)
-    log.info('started-internal-services')
+class Main(object):
 
+    def __init__(self):
+        self.args = args = parse_args()
+        self.config = load_config(args)
+        self.log = setup_logging(self.config.get('logging', {}), args.instance_id, fluentd=args.fluentd)
 
-def cleanup(log):
-    """Execute before the reactor is shut down"""
-    log.info('exiting-on-keyboard-interrupt')
+        # components
+        self.coordinator = None
 
+        if not args.no_banner:
+            print_banner(self.log)
 
-def start_reactor(args, log):
-    from twisted.internet import reactor
-    reactor.callWhenRunning(lambda: log.info('twisted-reactor-started'))
-    reactor.addSystemEventTrigger('before', 'shutdown', lambda: cleanup(log))
-    reactor.run()
+        if not args.no_heartbeat:
+            self.start_heartbeat()
 
+        self.startup_components()
 
-def start_heartbeat(log):
+    def start(self):
+        self.start_reactor()  # will not return except Keyboard interrupt
 
-    t0 = time.time()
-    t0s = time.ctime(t0)
+    def startup_components(self):
+        self.log.info('starting-internal-components')
+        self.coordinator = Coordinator(
+            internal_host_address=self.args.internal_host_address,
+            external_host_address=self.args.external_host_address,
+            rest_port=self.args.rest_port,
+            instance_id=self.args.instance_id,
+            consul=self.args.consul)
+        init_rest_service(self.args.rest_port)
+        self.log.info('started-internal-services')
 
-    def heartbeat():
-        log.info(status='up', since=t0s, uptime=time.time() - t0)
+    @inlineCallbacks
+    def shutdown_components(self):
+        """Execute before the reactor is shut down"""
+        self.log.info('exiting-on-keyboard-interrupt')
+        yield self.coordinator.shutdown()
 
-    from twisted.internet.task import LoopingCall
-    lc = LoopingCall(heartbeat)
-    lc.start(10)
+    def start_reactor(self):
+        from twisted.internet import reactor
+        reactor.callWhenRunning(lambda: self.log.info('twisted-reactor-started'))
+        reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown_components)
+        reactor.run()
 
+    def start_heartbeat(self):
 
-def main():
+        t0 = time.time()
+        t0s = time.ctime(t0)
 
-    args = parse_args()
+        def heartbeat():
+            self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
 
-    config = load_config(args)
-
-    log = setup_logging(config.get('logging', {}), fluentd=args.fluentd)
-
-    if not args.no_banner:
-        print_banner(args, log)
-
-    if not args.no_heartbeat:
-        start_heartbeat(log)
-
-    startup(log, args, config)
-
-    start_reactor(args, log)  # will not return except Keyboard interrupt
+        from twisted.internet.task import LoopingCall
+        lc = LoopingCall(heartbeat)
+        lc.start(10)
 
 
 if __name__ == '__main__':
-    main()
+    Main().start()
diff --git a/voltha/nethelpers.py b/voltha/nethelpers.py
index 09f73c1..2fa7bd5 100644
--- a/voltha/nethelpers.py
+++ b/voltha/nethelpers.py
@@ -39,3 +39,7 @@
     addresses = ni.ifaddresses(iface_name)
     ipv4 = addresses[AF_INET][0]['addr']
     return ipv4
+
+
+if __name__ == '__main__':
+    print get_my_primary_local_ipv4()
diff --git a/voltha/northbound/rest/health_check.py b/voltha/northbound/rest/health_check.py
index ea5bff9..5038e30 100644
--- a/voltha/northbound/rest/health_check.py
+++ b/voltha/northbound/rest/health_check.py
@@ -18,6 +18,7 @@
 """ Rest API to check health of Voltha instance """
 
 from klein import Klein
+from structlog import get_logger
 from twisted.internet import endpoints
 from twisted.internet import reactor
 from twisted.web.server import Site
@@ -26,10 +27,12 @@
 class HealthCheck(object):
 
     app = Klein()
+    log = get_logger()
 
     @app.route('/health')
     def health_check(self, request):
         # TODO this is just a placeholder, very crude health check
+        self.log.debug("health-check-received")
         return '{"status": "ok"}'
 
     def get_site(self):
diff --git a/voltha/structlog_setup.py b/voltha/structlog_setup.py
index 53bfc72..299b0ac 100644
--- a/voltha/structlog_setup.py
+++ b/voltha/structlog_setup.py
@@ -57,7 +57,7 @@
             del _repr_running[call_key]
 
 
-def setup_logging(log_config, fluentd=None):
+def setup_logging(log_config, instance_id, fluentd=None):
     """
     Set up logging such that:
     - The primary logging entry method is structlog (see http://structlog.readthedocs.io/en/stable/index.html)
@@ -66,11 +66,14 @@
       bridge to a fluent logging agent.
     """
 
-    def add_exc_info_flag_for_exception(logger, name, event_dict):
+    def add_exc_info_flag_for_exception(_, name, event_dict):
         if name == 'exception':
             event_dict['exc_info'] = True
         return event_dict
 
+    def add_instance_id(_, __, event_dict):
+        event_dict['instance_id'] = instance_id
+        return event_dict
     # if fluentd is specified, we need to override the config data with its host and port info
     if fluentd is not None:
         fluentd_host = fluentd.split(':')[0].strip()
@@ -90,6 +93,7 @@
         add_exc_info_flag_for_exception,
         structlog.processors.StackInfoRenderer(),
         structlog.processors.format_exc_info,
+        add_instance_id,
         FluentRenderer(),
     ]
     structlog.configure(logger_factory=structlog.stdlib.LoggerFactory(),