blob: 9379e2da31e96ffb9e918b2f952c3d4ed6f78dee [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Some docker related convenience functions
"""
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import os
import socket
from structlog import get_logger
from docker import Client, errors
docker_socket = os.environ.get('DOCKER_SOCK', 'unix://tmp/docker.sock')
log = get_logger()
def remove_container(id, force=True):
try:
docker_cli = Client(base_url=docker_socket)
containers = docker_cli.remove_container(id, force=force)
except Exception, e:
log.exception('failed', e=e)
raise
return containers
def get_my_containers_name():
"""
Return the docker containers name in which this process is running.
To look up the container name, we use the container ID extracted from the
$HOSTNAME environment variable (which is set by docker conventions).
:return: String with the docker container name (or None if any issue is
encountered)
"""
my_container_id = os.environ.get('HOSTNAME', None)
try:
docker_cli = Client(base_url=docker_socket)
info = docker_cli.inspect_container(my_container_id)
except Exception, e:
log.exception('failed', my_container_id=my_container_id, e=e)
raise
name = info['Name'].lstrip('/')
return name
def get_all_running_containers():
try:
docker_cli = Client(base_url=docker_socket)
containers = docker_cli.containers()
except Exception, e:
log.exception('failed', e=e)
raise
return containers
def inspect_container(id):
try:
docker_cli = Client(base_url=docker_socket)
info = docker_cli.inspect_container(id)
except Exception, e:
log.debug('failed: {}'.format(e.message))
raise
return info
def create_host_config(volumes, ports):
try:
port_bindings = ports
binds = ['{0}:{1}'.format(k, v) for k, v in volumes.iteritems()]
docker_cli = Client(base_url=docker_socket)
host_config = docker_cli.create_host_config(binds=binds,
port_bindings=port_bindings)
except Exception, e:
log.exception('failed host config creation', volumes, ports, e=e)
raise
return host_config
def connect_container_to_network(container, net_id, links):
try:
docker_cli = Client(base_url=docker_socket)
docker_cli.connect_container_to_network(container, net_id, links=links)
except:
log.exception('Failed to connect container {} to network {}'.format(container, net_id))
raise
def create_networking_config(name, links):
"""
Creates a container networks based on a set of containers.
:param name: the network name
:param links: the set of containers to link
:return: a network configuration
"""
try:
docker_cli = Client(base_url=docker_socket)
networking_config = docker_cli.create_networking_config({
name : docker_cli.create_endpoint_config(links=links)
})
except Exception, e:
log.exception('failed network creation', name, e=e)
raise
return networking_config
def stop_container(container, timeout=10):
try:
docker_cli = Client(base_url=docker_socket)
docker_cli.stop(container, timeout=timeout)
except Exception, e:
log.exception('failed', e=e)
raise
def create_container(args):
try:
docker_cli = Client(base_url=docker_socket)
container = docker_cli.create_container(**args)
except Exception, e:
log.exception('failed', e=e)
raise
return container
def start_container(container):
"""
Starts a requested container with the appropriate configuration.
:param args: contains arguments for container creation
(see https://docker-py.readthedocs.io/en/stable/api/#create_container)
:return: the containers name
"""
try:
docker_cli = Client(base_url=docker_socket)
response = docker_cli.start(container=container.get('Id'))
except Exception, e:
log.exception('failed', e=e)
raise
return response
class EventProcessor(object):
"""
This class handles the api session and allows for it to
be terminated.
"""
def __init__(self, threads=1):
self.client = CustomClient(base_url=docker_socket)
self.events = self.client.events(decode=True)
log.info("Starting event processor with {} threads".format(threads))
self.exec_service = ThreadPoolExecutor(max_workers=threads)
def stop_listening(self):
"""
Shuts down the socket.
:return: None
"""
if self.events is not None:
sock = self.client._get_raw_response_socket(self.events.response)
sock.shutdown(socket.SHUT_RDWR)
def listen_for_events(self, handlers):
"""
Listens to the docker event stream and applies the functions
in the passed handler to each event.
docker containers can report the following events:
attach, commit, copy, create, destroy, detach, die,
exec_create, exec_detach, exec_start, export,
health_status, kill, oom, pause, rename, resize,
restart, start, stop, top, unpause, update
:param handlers: a dict of functions
:return: None
"""
if not handlers or len(handlers) == 0:
raise ValueError("Handlers cannot be empty")
for event in self.events:
for k in ['time', 'Time']:
if k in event:
event[k] = datetime.fromtimestamp(event[k])
log.debug('docker event: {}'.format(event))
data = {}
i = get_id(event)
if i is not None:
try:
if 'from' in event or 'From' in event:
data = self.client.inspect_container(i)
else:
data = self.client.inspect_image(i)
data[i] = data
except errors.NotFound:
log.debug('No data for container {}'.format(i))
status = get_status(event)
if status in handlers:
self.exec_service.submit(handlers[get_status(event)], event,
data, handlers['podder_config'])
else:
log.debug("No handler for {}; skipping...".format(status))
class CustomGenerator(object):
"""
This is a custom ugly class that allows for the generator
to be (kind of) cleanly closed.
"""
def __init__(self, stream, response, decode):
self.stream = stream
self.response = response
self.decode = decode
def __iter__(self):
for item in super(CustomClient, self.stream).\
_stream_helper(self.response, self.decode):
yield item
class CustomClient(Client):
def _stream_helper(self, response, decode=False):
return CustomGenerator(self, response, decode)
def get_status(event):
for k in ['status', 'Status']:
if k in event:
return event[k]
def get_id(event):
for k in ['id', 'ID', 'Id']:
if k in event:
return event[k]