Podder now runs with an associated thread pool.

This allows podder to parallelize requests to docker.

Change-Id: Iacae80a63bcd68ef7b471d63dbb41b7314d3af20
diff --git a/common/utils/dockerhelpers.py b/common/utils/dockerhelpers.py
index 9708e9e..ffd3601 100644
--- a/common/utils/dockerhelpers.py
+++ b/common/utils/dockerhelpers.py
@@ -18,6 +18,7 @@
 Some docker related convenience functions
 """
 from datetime import datetime
+from concurrent.futures import ThreadPoolExecutor
 
 import os
 import socket
@@ -78,9 +79,8 @@
     try:
         docker_cli = Client(base_url=docker_socket)
         info = docker_cli.inspect_container(id)
-
     except Exception, e:
-        log.exception('failed', e=e)
+        log.debug('failed: {}'.format(e.message))
         raise
 
     return info
@@ -162,9 +162,11 @@
     This class handles the api session and allows for it to
     be terminated.
     """
-    def __init__(self):
+    def __init__(self, threads=1):
         self.client = CustomClient(base_url=docker_socket)
         self.events = self.client.events(decode=True)
+        log.info("Starting event processor with {} threads".format(threads))
+        self.exec_service = ThreadPoolExecutor(max_workers=threads)
 
     def stop_listening(self):
         """
@@ -214,7 +216,8 @@
 
             status = get_status(event)
             if status in handlers:
-                handlers[get_status(event)](event, data, handlers['podder_config'])
+                self.exec_service.submit(handlers[get_status(event)], event,
+                                         data, handlers['podder_config'])
             else:
                 log.debug("No handler for {}; skipping...".format(status))
 
diff --git a/podder/handlers.py b/podder/handlers.py
index 2cafc7e..825cd60 100644
--- a/podder/handlers.py
+++ b/podder/handlers.py
@@ -15,6 +15,7 @@
 #
 from common.utils.dockerhelpers import create_host_config, create_container, start_container, create_networking_config, \
     get_all_running_containers, inspect_container, remove_container
+from docker import errors
 
 from structlog import get_logger
 import yaml
@@ -77,7 +78,10 @@
 def service_shutdown(service, instance_name, config):
     containers = get_all_running_containers()
     for container in containers:
-        info = inspect_container(container['Id'])
+        try:
+            info = inspect_container(container['Id'])
+        except errors.NotFound, e:
+            continue
         envs = info['Config']['Env']
         for env in envs:
             for name in env.split('='):
diff --git a/podder/main.py b/podder/main.py
index d02fc63..412ae58 100755
--- a/podder/main.py
+++ b/podder/main.py
@@ -35,7 +35,8 @@
     instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
     internal_host_address=os.environ.get('INTERNAL_HOST_ADDRESS',
                                          get_my_primary_local_ipv4()),
-    work_dir=os.environ.get('WORK_DIR', '/tmp/podder')
+    work_dir=os.environ.get('WORK_DIR', '/tmp/podder'),
+    threads=os.environ.get('PODDER_THREADS', 5)
 )
 
 def parse_args():
@@ -103,6 +104,14 @@
                         action='count',
                         help=_help)
 
+    _help = 'Number of events to handle in parallel'
+    parser.add_argument('-e', '--events-in-parallel',
+                        dest='threads',
+                        type=int,
+                        default=defs['threads'],
+                        action='store',
+                        help=_help)
+
 
     args = parser.parse_args()
 
diff --git a/podder/podder.py b/podder/podder.py
index 7cd166d..e3a2fe4 100644
--- a/podder/podder.py
+++ b/podder/podder.py
@@ -27,7 +27,7 @@
     def __init__(self, args, slave_config):
         self.log.info('Initializing Podder')
         self.running = False
-        self.events = EventProcessor()
+        self.events = EventProcessor(threads=args.threads)
         self.handlers = { 'podder_config' : Template(slave_config) }
 
     def run(self):