Podder now runs with an associated thread pool.
This allows podder to parallelize requests to docker.
Change-Id: Iacae80a63bcd68ef7b471d63dbb41b7314d3af20
diff --git a/common/utils/dockerhelpers.py b/common/utils/dockerhelpers.py
index 9708e9e..ffd3601 100644
--- a/common/utils/dockerhelpers.py
+++ b/common/utils/dockerhelpers.py
@@ -18,6 +18,7 @@
Some docker related convenience functions
"""
from datetime import datetime
+from concurrent.futures import ThreadPoolExecutor
import os
import socket
@@ -78,9 +79,8 @@
try:
docker_cli = Client(base_url=docker_socket)
info = docker_cli.inspect_container(id)
-
except Exception, e:
- log.exception('failed', e=e)
+ log.debug('failed: {}'.format(e.message))
raise
return info
@@ -162,9 +162,11 @@
This class handles the api session and allows for it to
be terminated.
"""
- def __init__(self):
+ def __init__(self, threads=1):
self.client = CustomClient(base_url=docker_socket)
self.events = self.client.events(decode=True)
+ log.info("Starting event processor with {} threads".format(threads))
+ self.exec_service = ThreadPoolExecutor(max_workers=threads)
def stop_listening(self):
"""
@@ -214,7 +216,8 @@
status = get_status(event)
if status in handlers:
- handlers[get_status(event)](event, data, handlers['podder_config'])
+ self.exec_service.submit(handlers[get_status(event)], event,
+ data, handlers['podder_config'])
else:
log.debug("No handler for {}; skipping...".format(status))
diff --git a/podder/handlers.py b/podder/handlers.py
index 2cafc7e..825cd60 100644
--- a/podder/handlers.py
+++ b/podder/handlers.py
@@ -15,6 +15,7 @@
#
from common.utils.dockerhelpers import create_host_config, create_container, start_container, create_networking_config, \
get_all_running_containers, inspect_container, remove_container
+from docker import errors
from structlog import get_logger
import yaml
@@ -77,7 +78,10 @@
def service_shutdown(service, instance_name, config):
containers = get_all_running_containers()
for container in containers:
- info = inspect_container(container['Id'])
+ try:
+ info = inspect_container(container['Id'])
+ except errors.NotFound, e:
+ continue
envs = info['Config']['Env']
for env in envs:
for name in env.split('='):
diff --git a/podder/main.py b/podder/main.py
index d02fc63..412ae58 100755
--- a/podder/main.py
+++ b/podder/main.py
@@ -35,7 +35,8 @@
instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
internal_host_address=os.environ.get('INTERNAL_HOST_ADDRESS',
get_my_primary_local_ipv4()),
- work_dir=os.environ.get('WORK_DIR', '/tmp/podder')
+ work_dir=os.environ.get('WORK_DIR', '/tmp/podder'),
+ threads=os.environ.get('PODDER_THREADS', 5)
)
def parse_args():
@@ -103,6 +104,14 @@
action='count',
help=_help)
+ _help = 'Number of events to handle in parallel'
+ parser.add_argument('-e', '--events-in-parallel',
+ dest='threads',
+ type=int,
+ default=defs['threads'],
+ action='store',
+ help=_help)
+
args = parser.parse_args()
diff --git a/podder/podder.py b/podder/podder.py
index 7cd166d..e3a2fe4 100644
--- a/podder/podder.py
+++ b/podder/podder.py
@@ -27,7 +27,7 @@
def __init__(self, args, slave_config):
self.log.info('Initializing Podder')
self.running = False
- self.events = EventProcessor()
+ self.events = EventProcessor(threads=args.threads)
self.handlers = { 'podder_config' : Template(slave_config) }
def run(self):