Redesign of Podder.
Podder now only listens to events from the Docker
API. The Docker event API uses a callback mechanism
to take action when a container is started or stopped.
Also adds a Dockerfile for Podder and an entry in the
compose file.
Change-Id: Ib5735078a69eab3af6076af94bc697ab3e82a239
diff --git a/podder/consul_mgr.py b/podder/consul_mgr.py
deleted file mode 100644
index 9187093..0000000
--- a/podder/consul_mgr.py
+++ /dev/null
@@ -1,117 +0,0 @@
-from consul.twisted import Consul
-from structlog import get_logger
-from twisted.internet import reactor
-from twisted.internet.defer import returnValue
-
-from common.utils.dockerhelpers import create_container_network, start_container, create_host_config
-
-containers = {
- 'chameleon' : {
- 'image' : 'cord/chameleon',
- 'command' : [ "/chameleon/main.py",
- "-v",
- "--consul=consul:8500",
- "--fluentd=fluentd:24224",
- "--rest-port=8881",
- "--grpc-endpoint=@voltha-grpc",
- "--instance-id-is-container-name",
- "-v"],
- 'ports' : [ 8881 ],
- 'depends_on' : [ "consul", "voltha" ],
- 'links' : { "consul" : "consul", "fluentd" : "fluentd" },
- 'environment' : ["SERVICE_8881_NAME=chamleon-rest"],
- 'volumes' : { '/var/run/docker.sock' : '/tmp/docker.sock' }
- }
-}
-
-class ConsulManager(object):
-
- log = get_logger()
-
- def __init__(self, arg):
- self.log.info('Initializing consul manager')
- self.running = False
- self.index = 0
- (host, port) = arg.split(':')
- self.conn = Consul(host=host, port=port)
-
- def run(self):
- if self.running:
- return
- self.running = True
-
- self.log.info('Running consul manager')
-
- reactor.callLater(0, self.provision_voltha_instances, None)
-
- reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
- returnValue(self)
-
-
- def shutdown(self):
- self.log.info('Shutting down consul manager')
- self.running = False
-
-
- def provision_voltha_instances(self, _):
- if not self.running:
- return
- # maintain index such that callbacks only happen is something has changed
- # timeout is default to 5m
- deferred = self.conn.catalog.service(wait='5s', service='voltha-grpc',
- index=self.index)
- deferred.addCallbacks(self.provision, self.fail)
- deferred.addBoth(self.provision_voltha_instances)
-
- def provision(self, result):
- (self.index, data) = result
- self.log.info('PROVISIONING {}'.format(data))
- for item in data:
- (service, id) = item['ServiceID'].split(':')[1].split('_')[1:3]
- self.log.info('got {} {}'.format(service, id))
- self.podProvisioned(service, id, item['ServiceTags'])
-
- def fail(self, err):
- self.log.info('Failure %s'.format(err))
-
- def start_containers(self, result, service, id, tags):
- self.log.info("wtf : {}".format(result))
- (_, done) = result
- self.log.info("result : {}, {}".format(done, type(done)))
- if done:
- return
- self.log.info('provisioning voltha instance {}, with tags {}'.format(id, tags))
- for tag in tags:
- if tag in containers:
- netcfg = self.create_network(id, tag)
- if self.create_container(id, tag, netcfg):
- self.markProvisioned(service, id)
-
-
- def create_network(self, id, tag):
- return create_container_network('podder_%s_%s' % (tag, id),
- containers[tag]['links'])
-
- def create_container(self, id, tag, netcfg):
- args = {}
- args['image'] = containers[tag]['image']
- args['networking_config'] = netcfg
- args['command'] = containers[tag]['command']
- args['ports'] = containers[tag]['ports']
- args['environment'] = containers[tag]['environment']
- args['volumes'] = containers[tag]['volumes'].keys()
- args['host_config'] = create_host_config(containers[tag]['volumes'],
- containers[tag]['ports'])
- args['name'] = 'podder_%s_%s' % (tag, id)
- start_container(args)
- #TODO check container is running
-
- return True
-
- def podProvisioned(self, service, id, tags):
- d = self.conn.kv.get('podder/%s/%s/state' % (service, id))
- d.addCallback(self.start_containers, service, id, tags)
- d.addErrback(lambda err: self.log.info("FAIL {}".format(err)))
-
- def markProvisioned(self, service, id):
- self.conn.kv.put('podder/%s/%s/state' % (service, id), "started")
diff --git a/podder/handlers.py b/podder/handlers.py
new file mode 100644
index 0000000..6b261a6
--- /dev/null
+++ b/podder/handlers.py
@@ -0,0 +1,139 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from common.utils.dockerhelpers import create_host_config, create_container, start_container, create_networking_config, \
+ get_all_running_containers, inspect_container, remove_container
+
+from structlog import get_logger
+
+log = get_logger()
+
+INSTANCE_ID_KEY = 'com.docker.compose.container-number'
+INSTANCE_NAME_KEY = 'name'
+
+
+def check(event):
+    """Tell whether a docker event carries everything the handlers need.
+
+    An event is usable when it has a 'from' entry and its
+    event['Actor']['Attributes'] mapping contains both INSTANCE_ID_KEY
+    and INSTANCE_NAME_KEY. Returns True or False.
+    """
+    if 'from' not in event:
+        return False
+    if 'Actor' not in event or 'Attributes' not in event['Actor']:
+        return False
+    attributes = event['Actor']['Attributes']
+    return INSTANCE_ID_KEY in attributes and INSTANCE_NAME_KEY in attributes
+
+def get_entry(key, dico, mandatory = False, noneval=None):
+    """Look up key in dico.
+
+    Returns dico[key] when the key is present. When it is absent, raises
+    Exception if mandatory is true, otherwise returns noneval.
+    """
+    if key not in dico:
+        if mandatory:
+            raise Exception('Key {} must be in container config'.format(key))
+        return noneval
+    return dico[key]
+
+def obtain_network_name(data):
+    """Return the keys of data['NetworkSettings']['Networks'].
+
+    Presumably these keys are the names of the networks the container is
+    attached to -- verify against the caller that supplies data.
+    """
+    networks = data['NetworkSettings']['Networks']
+    return networks.keys()
+
+
+def create_network_config(network, links):
+    """Build the networking config for a slave container.
+
+    Returns None when links is None. Otherwise calls
+    create_networking_config with the first entry of network and a
+    mapping of every link name to itself.
+    """
+    if links is None:
+        return None
+    # Assuming only one network exists: just the first entry is used.
+    first_network = network[0]
+    link_map = dict((name, name) for name in links)
+    return create_networking_config(first_network, link_map)
+
+
+def process_value(value):
+    """Normalise a configuration value into a dict.
+
+    None and dicts are returned unchanged. A list is turned into a dict:
+    a string item containing ':' maps the text before the first ':' to
+    the text between the first and second ':' (anything further is
+    dropped); every other item, integers included, maps to None.
+    Any other kind of value raises Exception.
+    """
+    if value is None or isinstance(value, dict):
+        return value
+    if not isinstance(value, list):
+        raise Exception('Cannot handle {}'.format(value))
+    mapping = {}
+    for entry in value:
+        if isinstance(entry, int) or ':' not in entry:
+            mapping[entry] = None
+            continue
+        parts = entry.split(':')
+        mapping[parts[0]] = parts[1]
+    return mapping
+
+def construct_container_spec(config):
+    """Extract the container creation settings from a container config.
+
+    'image' and 'command' are required: get_entry raises Exception when
+    either is missing. 'environment' falls back to an empty dict, while
+    'ports' and 'volumes' fall back to None.
+    """
+    #TODO need to rewrite command to connect to right service instance
+    return {
+        'image': get_entry('image', config, mandatory=True),
+        'command': get_entry('command', config, mandatory=True),
+        'environment': get_entry('environment', config, noneval={}),
+        'ports': get_entry('ports', config),
+        'volumes': get_entry('volumes', config),
+    }
+
+def service_shutdown(service, instance_name, config):
+    """Remove the running containers that belong to a master instance.
+
+    Every running container is inspected. A container is removed when any
+    of its environment entries, split on '=', has a piece equal to
+    instance_name (start_slaves tags its containers with
+    PODDER_MASTER=<instance_name>). service and config are accepted but
+    not used.
+    """
+    for container in get_all_running_containers():
+        info = inspect_container(container['Id'])
+        # NOTE(review): Env is treated as optional here on the assumption
+        # that inspect can report it as None -- confirm against the API.
+        envs = info['Config'].get('Env') or []
+        if any(instance_name in env.split('=') for env in envs):
+            log.info('Removing container {}'.format(container['Names']))
+            # Remove once per container, even when several entries match;
+            # a second removal of the same id would hit a missing container.
+            remove_container(container['Id'])
+
+def start_slaves(service, instance_name, instance_id, data, config):
+    """Create and start the slave containers configured for a service.
+
+    Only logs at debug level and returns when service is not listed under
+    config['services']. Otherwise, for each slave name of that service
+    that has an entry under config['slaves'], a container named
+    'podder_<slave>_<instance_id>' is created and started. Its networking
+    config comes from create_network_config, and its environment gets
+    PODDER_MASTER=<instance_name>, which is what service_shutdown later
+    matches on. Slave names without a config entry are skipped.
+    """
+    if service not in config['services']:
+        log.debug('Unknown service {}'.format(service))
+        return
+    for slave in config['services'][service]['slaves']:
+        if slave not in config['slaves']:
+            log.debug('Unknown slave service {}'.format(slave))
+            continue
+        slave_config = config['slaves'][slave]
+        network = obtain_network_name(data)
+        netcfg = create_network_config(network, get_entry('links', slave_config))
+        container_spec = construct_container_spec(slave_config)
+        container_spec['networking_config'] = netcfg
+        if 'volumes' in container_spec:
+            container_spec['host_config'] = create_host_config(
+                process_value(container_spec['volumes']),
+                process_value(container_spec['ports']))
+        container_spec['name'] = 'podder_%s_%s' % (slave, instance_id)
+
+        # Work on a copy: the environment dict can be the very object held
+        # by the shared slave configuration, and a per-instance value must
+        # not leak into it.
+        environment = dict(container_spec['environment'])
+        environment['PODDER_MASTER'] = instance_name
+        container_spec['environment'] = environment
+
+        container = create_container(container_spec)
+        start_container(container)
+
+
+def stop_slaves(service, instance_name, instance_id, data, config):
+    """Shut down the slaves of a master instance that has stopped.
+
+    When service is listed under config['services'], service_shutdown is
+    called for instance_name. For any other service nothing is done.
+    instance_id and data are accepted but not used.
+    """
+    log.info('Stopping slaves for {}'.format(instance_name))
+    if service not in config['services']:
+        # handle slave shutdown; restart him (not implemented)
+        return
+    service_shutdown(service, instance_name, config)
+
+
+def handler_start(event, data, config):
+    """Handle a container start event by starting the matching slaves.
+
+    Events that fail check() are logged at debug level and ignored.
+    Otherwise the service is read from event['from'], the instance name
+    and id from event['Actor']['Attributes'], and all of them are passed
+    to start_slaves together with data and config.
+    """
+    if not check(event):
+        log.debug('event {} is invalid'.format(event))
+        return
+    attributes = event['Actor']['Attributes']
+    start_slaves(event['from'],
+                 attributes[INSTANCE_NAME_KEY],
+                 attributes[INSTANCE_ID_KEY],
+                 data,
+                 config)
+
+def handler_stop(event, data, config):
+    """Handle a container stop event by stopping the matching slaves.
+
+    Events that fail check() are logged at debug level and ignored.
+    Otherwise the service is read from event['from'], the instance name
+    and id from event['Actor']['Attributes'], and all of them are passed
+    to stop_slaves together with data and config.
+    """
+    if not check(event):
+        log.debug('event {} is invalid'.format(event))
+        return
+    attributes = event['Actor']['Attributes']
+    stop_slaves(event['from'],
+                attributes[INSTANCE_NAME_KEY],
+                attributes[INSTANCE_ID_KEY],
+                data,
+                config)
+
diff --git a/podder/main.py b/podder/main.py
index 1da0b41..3f74446 100755
--- a/podder/main.py
+++ b/podder/main.py
@@ -19,13 +19,13 @@
import os
import yaml
-from twisted.internet.defer import inlineCallbacks
+from podder import Podder
from common.structlog_setup import setup_logging
from common.utils.nethelpers import get_my_primary_local_ipv4
-from consul_mgr import ConsulManager
defs = dict(
+ slaves=os.environ.get('SLAVES', './slaves.yml'),
config=os.environ.get('CONFIG', './podder.yml'),
consul=os.environ.get('CONSUL', 'localhost:8500'),
external_host_address=os.environ.get('EXTERNAL_HOST_ADDRESS',
@@ -51,6 +51,15 @@
default=defs['config'],
help=_help)
+ _help = ('Path to slaves configuration file (default %s).'
+ 'If relative, it is relative to main.py of podder.'
+ % defs['slaves'])
+ parser.add_argument('-s', '--slaves',
+ dest='slaves',
+ action='store',
+ default=defs['slaves'],
+ help=_help)
+
_help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
parser.add_argument(
'-C', '--consul', dest='consul', action='store',
@@ -101,8 +110,8 @@
return args
-def load_config(args):
- path = args.config
+def load_config(config):
+ path = config
if path.startswith('.'):
dir = os.path.dirname(os.path.abspath(__file__))
path = os.path.join(dir, path)
@@ -129,7 +138,8 @@
def __init__(self):
self.args = args = parse_args()
- self.config = load_config(args)
+ self.config = load_config(args.config)
+ self.slave_config = load_config(args.slaves)
verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
self.log = setup_logging(self.config.get('logging', {}),
@@ -142,33 +152,15 @@
if not args.no_banner:
print_banner(self.log)
+ def start(self):
self.startup_components()
- def start(self):
- self.start_reactor()
-
- @inlineCallbacks
def startup_components(self):
self.log.info('starting-internal-components')
args = self.args
- self.consul_manager = yield ConsulManager(args.consul).run()
+ self.podder = Podder(args, self.slave_config)
self.log.info('started-internal-components')
-
- @inlineCallbacks
- def shutdown_components(self):
- """Execute before the reactor is shut down"""
- self.log.info('exiting-on-keyboard-interrupt')
- if self.consul_manager is not None:
- yield self.consul_manager.shutdown()
-
- def start_reactor(self):
- from twisted.internet import reactor
- reactor.callWhenRunning(
- lambda: self.log.info('twisted-reactor-started'))
-
- reactor.addSystemEventTrigger('before', 'shutdown',
- self.shutdown_components)
- reactor.run()
+ self.podder.run()
if __name__ == '__main__':
diff --git a/podder/podder.py b/podder/podder.py
new file mode 100644
index 0000000..daa9e91
--- /dev/null
+++ b/podder/podder.py
@@ -0,0 +1,59 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from handlers import handler_start, handler_stop
+from structlog import get_logger
+
+from common.utils.dockerhelpers import EventProcessor
+
+class Podder(object):
+    """Feeds docker container events to the podder handlers.
+
+    Holds an EventProcessor and a handler table. The table maps the
+    'start' and 'stop' keys to handler_start and handler_stop and carries
+    the slave configuration under the 'podder_config' key.
+    """
+
+    log = get_logger()
+
+    def __init__(self, args, slave_config):
+        """Create the event processor and the initial handler table.
+
+        args is accepted but not used here; slave_config is stored under
+        the 'podder_config' key of the handler table.
+        """
+        self.log.info('Initializing Podder')
+        self.running = False
+        self.events = EventProcessor()
+        self.handlers = { 'podder_config' : slave_config }
+
+    def run(self):
+        """Start the listen loop; does nothing when already running.
+
+        Does not return until the loop in initialize() ends.
+        """
+        if self.running:
+            return
+        self.running = True
+
+        self.initialize()
+
+    def shutdown(self):
+        """Stop listening for events; a failure is logged, not raised."""
+        # Clear the flag so that run() is not permanently disabled.
+        self.running = False
+        try:
+            self.events.stop_listening()
+        except Exception as e:
+            # Best effort: failing to stop cleanly must not abort shutdown,
+            # but interpreter-exit exceptions are no longer swallowed.
+            self.log.info('Shutting down', error=str(e))
+
+    def initialize(self):
+        """Register the handlers, then listen for events until interrupted.
+
+        An exception raised while listening is logged and listening is
+        resumed. A KeyboardInterrupt triggers shutdown() and ends the loop.
+        """
+        self.define_handlers()
+        while True:
+            try:
+                self.events.listen_for_events(self.handlers)
+            except KeyboardInterrupt:
+                self.shutdown()
+                break
+            except Exception as e:
+                # The error is passed as a keyword: handing it to the logger
+                # positionally can raise inside this handler and end the loop.
+                self.log.info('Handler exception', error=str(e))
+
+    def define_handlers(self):
+        """Bind the 'start' and 'stop' events to their handler functions."""
+        self.handlers['start'] = handler_start
+        self.handlers['stop'] = handler_stop
+
diff --git a/podder/slaves.yml b/podder/slaves.yml
new file mode 100644
index 0000000..2adc46b
--- /dev/null
+++ b/podder/slaves.yml
@@ -0,0 +1,51 @@
+services:
+ cord/voltha:
+ slaves: ["chameleon", "ofagent"]
+
+
+slaves:
+ chameleon:
+ image: cord/chameleon
+ command: [
+ "/chameleon/main.py",
+ "-v",
+ "--consul=consul:8500",
+ "--fluentd=fluentd:24224",
+ "--rest-port=8881",
+ "--grpc-endpoint=@voltha-grpc",
+ "--instance-id-is-container-name",
+ "-v"
+ ]
+ ports:
+ - 8881
+ depends_on:
+ - consul
+ - voltha
+ links:
+ - consul
+ - fluentd
+ environment:
+ SERVICE_8881_NAME: "chameleon-rest"
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+
+ ofagent:
+ image: cord/ofagent
+ command: [
+ "/ofagent/main.py",
+ "-v",
+ "--consul=${DOCKER_HOST_IP}:8500",
+ "--fluentd=fluentd:24224",
+ "--controller=${DOCKER_HOST_IP}:6633",
+ "--grpc-endpoint=@voltha-grpc",
+ "--instance-id-is-container-name",
+ "-v"
+ ]
+ depends_on:
+ - consul
+ - voltha
+ links:
+ - consul
+ - fluentd
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"