blob: 918709371d3aad1985003b9602cad1e269efb9ae [file] [log] [blame]
from consul.twisted import Consul
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import returnValue
from common.utils.dockerhelpers import create_container_network, start_container, create_host_config
containers = {
'chameleon' : {
'image' : 'cord/chameleon',
'command' : [ "/chameleon/main.py",
"-v",
"--consul=consul:8500",
"--fluentd=fluentd:24224",
"--rest-port=8881",
"--grpc-endpoint=@voltha-grpc",
"--instance-id-is-container-name",
"-v"],
'ports' : [ 8881 ],
'depends_on' : [ "consul", "voltha" ],
'links' : { "consul" : "consul", "fluentd" : "fluentd" },
'environment' : ["SERVICE_8881_NAME=chamleon-rest"],
'volumes' : { '/var/run/docker.sock' : '/tmp/docker.sock' }
}
}
class ConsulManager(object):
log = get_logger()
def __init__(self, arg):
self.log.info('Initializing consul manager')
self.running = False
self.index = 0
(host, port) = arg.split(':')
self.conn = Consul(host=host, port=port)
def run(self):
if self.running:
return
self.running = True
self.log.info('Running consul manager')
reactor.callLater(0, self.provision_voltha_instances, None)
reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
returnValue(self)
def shutdown(self):
self.log.info('Shutting down consul manager')
self.running = False
def provision_voltha_instances(self, _):
if not self.running:
return
# maintain index such that callbacks only happen is something has changed
# timeout is default to 5m
deferred = self.conn.catalog.service(wait='5s', service='voltha-grpc',
index=self.index)
deferred.addCallbacks(self.provision, self.fail)
deferred.addBoth(self.provision_voltha_instances)
def provision(self, result):
(self.index, data) = result
self.log.info('PROVISIONING {}'.format(data))
for item in data:
(service, id) = item['ServiceID'].split(':')[1].split('_')[1:3]
self.log.info('got {} {}'.format(service, id))
self.podProvisioned(service, id, item['ServiceTags'])
def fail(self, err):
self.log.info('Failure %s'.format(err))
def start_containers(self, result, service, id, tags):
self.log.info("wtf : {}".format(result))
(_, done) = result
self.log.info("result : {}, {}".format(done, type(done)))
if done:
return
self.log.info('provisioning voltha instance {}, with tags {}'.format(id, tags))
for tag in tags:
if tag in containers:
netcfg = self.create_network(id, tag)
if self.create_container(id, tag, netcfg):
self.markProvisioned(service, id)
def create_network(self, id, tag):
return create_container_network('podder_%s_%s' % (tag, id),
containers[tag]['links'])
def create_container(self, id, tag, netcfg):
args = {}
args['image'] = containers[tag]['image']
args['networking_config'] = netcfg
args['command'] = containers[tag]['command']
args['ports'] = containers[tag]['ports']
args['environment'] = containers[tag]['environment']
args['volumes'] = containers[tag]['volumes'].keys()
args['host_config'] = create_host_config(containers[tag]['volumes'],
containers[tag]['ports'])
args['name'] = 'podder_%s_%s' % (tag, id)
start_container(args)
#TODO check container is running
return True
def podProvisioned(self, service, id, tags):
d = self.conn.kv.get('podder/%s/%s/state' % (service, id))
d.addCallback(self.start_containers, service, id, tags)
d.addErrback(lambda err: self.log.info("FAIL {}".format(err)))
def markProvisioned(self, service, id):
self.conn.kv.put('podder/%s/%s/state' % (service, id), "started")