blob: 918709371d3aad1985003b9602cad1e269efb9ae [file] [log] [blame]
alshabibc61999a2016-10-27 16:44:27 -07001from consul.twisted import Consul
alshabibc67ee3a2016-10-25 23:24:03 -07002from structlog import get_logger
3from twisted.internet import reactor
alshabibc61999a2016-10-27 16:44:27 -07004from twisted.internet.defer import returnValue
alshabibc67ee3a2016-10-25 23:24:03 -07005
alshabibc61999a2016-10-27 16:44:27 -07006from common.utils.dockerhelpers import create_container_network, start_container, create_host_config
alshabibc67ee3a2016-10-25 23:24:03 -07007
8containers = {
9 'chameleon' : {
alshabibc61999a2016-10-27 16:44:27 -070010 'image' : 'cord/chameleon',
alshabibc67ee3a2016-10-25 23:24:03 -070011 'command' : [ "/chameleon/main.py",
12 "-v",
13 "--consul=consul:8500",
14 "--fluentd=fluentd:24224",
15 "--rest-port=8881",
16 "--grpc-endpoint=@voltha-grpc",
17 "--instance-id-is-container-name",
18 "-v"],
19 'ports' : [ 8881 ],
20 'depends_on' : [ "consul", "voltha" ],
alshabibc61999a2016-10-27 16:44:27 -070021 'links' : { "consul" : "consul", "fluentd" : "fluentd" },
alshabibc67ee3a2016-10-25 23:24:03 -070022 'environment' : ["SERVICE_8881_NAME=chamleon-rest"],
alshabibc61999a2016-10-27 16:44:27 -070023 'volumes' : { '/var/run/docker.sock' : '/tmp/docker.sock' }
alshabibc67ee3a2016-10-25 23:24:03 -070024 }
25}
26
27class ConsulManager(object):
28
29 log = get_logger()
30
31 def __init__(self, arg):
32 self.log.info('Initializing consul manager')
33 self.running = False
34 self.index = 0
35 (host, port) = arg.split(':')
alshabibc61999a2016-10-27 16:44:27 -070036 self.conn = Consul(host=host, port=port)
alshabibc67ee3a2016-10-25 23:24:03 -070037
alshabibc67ee3a2016-10-25 23:24:03 -070038 def run(self):
39 if self.running:
40 return
41 self.running = True
42
43 self.log.info('Running consul manager')
44
alshabibc61999a2016-10-27 16:44:27 -070045 reactor.callLater(0, self.provision_voltha_instances, None)
alshabibc67ee3a2016-10-25 23:24:03 -070046
47 reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
48 returnValue(self)
49
alshabibc61999a2016-10-27 16:44:27 -070050
alshabibc67ee3a2016-10-25 23:24:03 -070051 def shutdown(self):
52 self.log.info('Shutting down consul manager')
53 self.running = False
54
alshabibc67ee3a2016-10-25 23:24:03 -070055
alshabibc61999a2016-10-27 16:44:27 -070056 def provision_voltha_instances(self, _):
57 if not self.running:
58 return
59 # maintain index such that callbacks only happen is something has changed
60 # timeout is default to 5m
61 deferred = self.conn.catalog.service(wait='5s', service='voltha-grpc',
62 index=self.index)
63 deferred.addCallbacks(self.provision, self.fail)
64 deferred.addBoth(self.provision_voltha_instances)
65
66 def provision(self, result):
67 (self.index, data) = result
68 self.log.info('PROVISIONING {}'.format(data))
alshabibc67ee3a2016-10-25 23:24:03 -070069 for item in data:
alshabibc61999a2016-10-27 16:44:27 -070070 (service, id) = item['ServiceID'].split(':')[1].split('_')[1:3]
71 self.log.info('got {} {}'.format(service, id))
72 self.podProvisioned(service, id, item['ServiceTags'])
73
74 def fail(self, err):
75 self.log.info('Failure %s'.format(err))
76
77 def start_containers(self, result, service, id, tags):
78 self.log.info("wtf : {}".format(result))
79 (_, done) = result
80 self.log.info("result : {}, {}".format(done, type(done)))
81 if done:
82 return
83 self.log.info('provisioning voltha instance {}, with tags {}'.format(id, tags))
84 for tag in tags:
85 if tag in containers:
86 netcfg = self.create_network(id, tag)
87 if self.create_container(id, tag, netcfg):
88 self.markProvisioned(service, id)
alshabibc67ee3a2016-10-25 23:24:03 -070089
90
91 def create_network(self, id, tag):
92 return create_container_network('podder_%s_%s' % (tag, id),
93 containers[tag]['links'])
94
95 def create_container(self, id, tag, netcfg):
96 args = {}
alshabibc61999a2016-10-27 16:44:27 -070097 args['image'] = containers[tag]['image']
alshabibc67ee3a2016-10-25 23:24:03 -070098 args['networking_config'] = netcfg
alshabibc61999a2016-10-27 16:44:27 -070099 args['command'] = containers[tag]['command']
100 args['ports'] = containers[tag]['ports']
101 args['environment'] = containers[tag]['environment']
102 args['volumes'] = containers[tag]['volumes'].keys()
103 args['host_config'] = create_host_config(containers[tag]['volumes'],
104 containers[tag]['ports'])
105 args['name'] = 'podder_%s_%s' % (tag, id)
106 start_container(args)
107 #TODO check container is running
108
109 return True
110
111 def podProvisioned(self, service, id, tags):
112 d = self.conn.kv.get('podder/%s/%s/state' % (service, id))
113 d.addCallback(self.start_containers, service, id, tags)
114 d.addErrback(lambda err: self.log.info("FAIL {}".format(err)))
115
116 def markProvisioned(self, service, id):
117 self.conn.kv.put('podder/%s/%s/state' % (service, id), "started")