blob: b2ef19eb7479401bd5dbeca457bf63d9458e4e3f [file] [log] [blame]
boyoungf42f2bf2017-10-16 19:06:30 +09001
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16
17import os
18import base64
19import socket
20import threading
21import time
22import sys, traceback
23
24from synchronizers.swarm.swarmsyncstep import SwarmSyncStep
25from synchronizers.new_base.ansible_helper import *
26from synchronizers.new_base.syncstep import *
27from xos.logger import observer_logger as logger
28from synchronizers.new_base.modelaccessor import *
29
30import synchronizers.swarm.swarmlog as slog
31
32
class SyncInstances(SwarmSyncStep):
    """Sync step that maps XOS ``Instance`` objects onto Docker Swarm services.

    The step builds the variable dictionaries for creating/updating and for
    deleting a swarm service (presumably consumed by the
    ``sync_instances.yaml`` playbook through ``SwarmSyncStep`` -- confirm
    against the base class) and stores the identifiers reported by the
    playbook back on the instance.
    """

    provides = [Instance]
    requested_interval = 0
    observes = Instance
    playbook = 'sync_instances.yaml'

    @staticmethod
    def _log_exception(ex):
        """Log *ex* plus the traceback of the exception being handled.

        Must be called from inside an ``except`` block, otherwise
        ``traceback.format_exc()`` has no active exception to format.
        """
        slog.error("Exception: %s %s %s" % (type(ex), str(ex), ex.args))
        slog.error("%s" % str(traceback.format_exc()))

    @staticmethod
    def _swarm_service_name(instance):
        """Return the swarm service name ``<service>-<slice>-<instance id>``."""
        return '%s-%s-%d' % (instance.slice.service.name, instance.slice.name, instance.id)

    @staticmethod
    def _parse_manager_url(swarm_manager_url):
        """Split ``"<address>:<port>"`` into ``(address, port)``.

        Raises:
            ValueError: if the value does not contain exactly one ``':'``.
        """
        (swarm_manager_address, docker_registry_port) = swarm_manager_url.split(':')
        return swarm_manager_address, docker_registry_port

    def update_instance(self, instance):
        """Touch *instance* so that it is picked up for another sync pass.

        Started in a background thread by ``map_sync_inputs`` after a service
        has been created for the first time.  The instance is re-read up to 99
        times; once both ``volumes`` and ``instance_name`` hold more than two
        characters, ``numberCores`` and ``updated`` are rewritten and saved.
        The method returns as soon as the re-read ``updated`` value differs
        from the first one observed; otherwise it waits five seconds and
        retries.  All exceptions are logged and swallowed because nothing
        joins this thread.
        """
        slog.info("instance_name: %s Thread: %s instance.updated: %s" % (
            instance.instance_name, threading.current_thread().name, instance.updated))
        time.sleep(1)

        time_format = "%Y-%m-%d %H:%M:%S"
        try:
            inst_id = instance.id
            slog.debug("inst_id: %s" % inst_id)
            old_update = 0  # 0 means "baseline timestamp not captured yet"

            for idx in range(1, 100):
                slog.debug("idx: %s" % idx)
                new_inst = Instance.objects.get(id=inst_id)

                if old_update == 0:
                    old_update = new_inst.updated
                    slog.debug("updated(old): %s" % time.strftime(time_format, time.gmtime(old_update)))

                # Nothing to bump until the first sync has populated both fields.
                if new_inst.volumes is None or new_inst.instance_name is None:
                    continue
                if len(new_inst.volumes) <= 2 or len(new_inst.instance_name) <= 2:
                    continue

                slog.debug("instance_name: %s (%s)" % (new_inst.instance_name, len(new_inst.instance_name)))
                slog.debug("volumes : %s (%s)" % (new_inst.volumes, len(new_inst.volumes)))

                new_inst.numberCores = idx
                new_inst.updated = time.time()
                slog.debug("updated to renew : %s (%s)" % (
                    time.strftime(time_format, time.gmtime(new_inst.updated)),
                    new_inst.updated))
                new_inst.save(update_fields=['updated', 'numberCores'])
                time.sleep(1)

                # Re-read to find out whether the new timestamp was persisted.
                clone_inst = Instance.objects.get(id=inst_id)
                clone_inst.save(update_fields=['numberCores'])
                slog.debug("updated : %s (%s)" % (
                    time.strftime(time_format, time.gmtime(clone_inst.updated)),
                    clone_inst.updated))

                if clone_inst.updated != old_update:
                    slog.debug("updated date was changed. Swarm synchronizer will run ansible")
                    return
                slog.debug("updated date was not changed. Nothing is executed. Waiting 5 seconds.")
                time.sleep(5)
        except Exception as ex:
            self._log_exception(ex)

    def chk_svc_exist(self, instance, swarm_manager_address):
        """Return True if a swarm service named after *instance* already exists.

        The Docker API is queried at ``tcp://<swarm_manager_address>:4243``.
        Any failure (service not found, unreachable daemon, missing ``docker``
        module, incomplete instance relations, ...) is logged and reported as
        "does not exist".
        """
        # Bound before the try block so the failure log below can never hit an
        # unbound local when building the name is what failed.
        instance_name = None
        try:
            instance_name = self._swarm_service_name(instance)
            slog.debug("Service name to chkeck: %s" % instance_name)

            import docker
            docker_api_base_url = "tcp://%s:4243" % swarm_manager_address
            slog.debug("Docker API Base URL: %s" % docker_api_base_url)
            my_client = docker.DockerClient(base_url=docker_api_base_url)
            swarm_svc = my_client.services.get(instance_name)

            slog.debug("swarm_svc.id : %s" % swarm_svc.id)
            slog.debug("swarm_svc.name : %s" % swarm_svc.name)
            slog.debug("swarm_svc.attrs : %s" % swarm_svc.attrs)
        except Exception as ex:
            self._log_exception(ex)
            slog.info("There is no duplicated service name, I would create new service (%s)" % instance_name)
            return False

        slog.info("There is duplicated service name (%s)" % instance_name)
        return True

    def fetch_pending(self, deletion=False):
        """Return the pending objects of the base step whose isolation is "vm"."""
        slog.info("begin fetch_pending(%s)" % deletion)
        objs = super(SyncInstances, self).fetch_pending(deletion)
        slog.info("objects: %s" % objs)
        objs = [x for x in objs if x.isolation == "vm"]
        slog.info("VM objects: %s" % objs)
        return objs

    def map_sync_inputs(self, instance):
        """Build the variables used to create or update the swarm service.

        Returns:
            dict of playbook variables, or ``None`` when an unexpected error
            occurred (the error is logged).

        Raises:
            DeferredException: when the model policies of the slice or of one
                of its networks have not run yet, or a network has no
                ``ControllerNetwork`` object for this controller.
        """
        try:
            slog.debug("instance: %s slice: %s" % (instance, instance.slice.name))
            controller = instance.node.site_deployment.controller
            swarm_manager_url = controller.auth_url
            slog.info("swarm_manager_url: %s" % swarm_manager_url)
            swarm_manager_address, docker_registry_port = self._parse_manager_url(swarm_manager_url)
            slog.info("swarm_manager_address: %s docker_registry_port: %s" % (
                swarm_manager_address, docker_registry_port))

            # An instance that already carries a service UUID, or whose service
            # already exists on the swarm cluster, is updated instead of created.
            swarm_service_update_flag = (
                instance.instance_uuid is not None and len(instance.instance_uuid) > 2)
            if self.chk_svc_exist(instance, swarm_manager_address) is True:
                swarm_service_update_flag = True

            # To get volume information
            slog.debug("slice.mount_data_sets: %s" % instance.slice.mount_data_sets)
            mount_dest_path = "/usr/local/etc"
            if len(instance.slice.mount_data_sets) < 2:
                slog.debug("instance.slice.mount_data_sets(%s) is too short" % instance.slice.mount_data_sets)
            else:
                mount_dest_path = instance.slice.mount_data_sets
            slog.debug("volume mount destination path: %s" % mount_dest_path)

            # set options for volume mounting
            # (example) --mount type=bind,src=/opt/xos/instance_volume/1,dst=/usr/local/etc/haproxy
            mount_flag = "--mount-add" if swarm_service_update_flag else "--mount"
            volume_mount_opt = "%s type=bind,src=/opt/xos/instance_volume/%s,dst=%s" % (
                mount_flag, instance.id, mount_dest_path)
            slog.debug("volume_mount_opt: %s" % volume_mount_opt)
            host_volume_path = "/opt/xos/instance_volume/%s" % instance.id
            slog.debug("host_volume_path: %s" % host_volume_path)

            # sanity check - make sure model_policy for slice has run
            if (not instance.slice.policed) or (instance.slice.policed < instance.slice.updated):
                slog.info("Instance %s waiting on Slice %s to execute model policies" % (
                    instance, instance.slice.name))
                raise DeferredException(
                    "Instance %s waiting on Slice %s to execute model policies" % (
                        instance, instance.slice.name))

            # sanity check - make sure model_policy for all slice networks have run
            networks = instance.slice.ownedNetworks.all()
            slog.debug("network list for slice of this instance(%s): %s" % (instance.name, str(networks)))

            for network in networks:
                slog.info("instance: %s network of slice(%s): %s" % (
                    instance.name, instance.slice.name, network.name))
                if (not network.policed) or (network.policed < network.updated):
                    slog.info("Instance %s waiting on Network %s to execute model policies" % (
                        instance, network.name))
                    raise DeferredException(
                        "Instance %s waiting on Network %s to execute model policies" % (
                            instance, network.name))
            slog.debug("Model Policy checking is done successfully.")

            # One "--network <name>" option per slice network.
            network_opts = []
            for network in networks:
                slog.debug("networkd.id: %s(%s, %s) controller.id: %s" % (
                    network.id, network.name, network.subnet, controller.id))
                if not ControllerNetwork.objects.filter(
                        network_id=network.id,
                        controller_id=controller.id).exists():
                    raise DeferredException(
                        "Instance %s Private Network %s lacks ControllerNetwork object" % (
                            instance, network.name))
                network_opts.append(" --network %s " % network.name)
            swarm_network = "".join(network_opts)
            slog.debug("swarm_network: %s" % swarm_network)

            # NOTE: informational only -- the playbook variables below are
            # filled from instance.image directly.
            controller_images = [x for x in instance.image.controllerimages.all()
                                 if x.controller_id == controller.id]
            if controller_images:
                slog.info("using image from ControllerImage object: " +
                          str(controller_images[0].image.name))

            slog.info("instance.node.name: %s" % instance.node.name)

            instance_name = self._swarm_service_name(instance)
            slog.info("service name: %s instance.slice.name: %s instance.id: %s instance_name: %s" % (
                instance.slice.service.name, instance.slice.name, instance.id, instance_name))
            # Still recorded on the step for any external reader of the attribute.
            self.instance_name = instance_name

            input_fields = {
                'swarm_manager_address': swarm_manager_address,
                'swarm_service_name': instance_name,
                'network_name': swarm_network,
                'replicas': "--replicas 1",
                'restart_condition': "--restart-condition any --restart-delay 9s ",
                'volume': volume_mount_opt,
                'host_volume_path': host_volume_path,
                'docker_registry_port': docker_registry_port,
                'image_name': instance.image.name,
                'image_tag': instance.image.tag,
                'ansible_tag': instance_name,
                'delete': False,
                'update': swarm_service_update_flag,
            }
            slog.info("input_fields: %s" % input_fields)

            if swarm_service_update_flag is False:
                # First creation: schedule a follow-up pass through the
                # background "touch" thread.
                slog.info("swarm_service_update_flag is %s, so I will update once more" %
                          swarm_service_update_flag)
                try:
                    my_thr = threading.Thread(target=self.update_instance, args=(instance,))
                    my_thr.start()
                except Exception as ex:
                    self._log_exception(ex)
            return input_fields
        except DeferredException:
            # Raised on purpose above; it must reach the caller instead of
            # being swallowed by the generic handler below.
            raise
        except Exception as ex:
            self._log_exception(ex)

    def map_sync_outputs(self, instance, res):
        """Store the identifiers reported by the playbook on *instance*.

        ``res[3]['stdout']`` is parsed as JSON; element 0 supplies
        ``Spec.Name`` (saved as ``instance_id``) and ``ID`` (saved as
        ``instance_uuid``).  The instance IP is set to the resolved address of
        the swarm manager when resolution succeeds.  Errors are logged and
        swallowed.
        """
        try:
            slog.debug("ansible playbook ressult: %s" % str(res))
            slog.debug("ansible playbook ressult[3][stdout]: %s" % str(res[3]['stdout']))
            res_stdout = res[3]['stdout']
            json_content = json.loads(res_stdout)
            slog.debug("json_content: %s" % str(json_content))
            instance.instance_id = json_content[0]['Spec']["Name"]
            slog.debug("instance.instance_id: %s" % str(instance.instance_id))
            instance.instance_uuid = json_content[0]['ID']
            slog.debug("instance.instance_uuid: %s" % str(instance.instance_uuid))

            controller = instance.node.site_deployment.controller
            swarm_manager_address, docker_registry_port = self._parse_manager_url(controller.auth_url)
            slog.debug("swarm_manager_address: %s docker_registry_port: %s" % (
                swarm_manager_address, docker_registry_port))
            try:
                instance.ip = socket.gethostbyname(swarm_manager_address)
            except Exception as e:
                # Best effort: an unresolved manager host leaves the IP untouched.
                slog.info(str(e))
                slog.info("hostname(%s) resolution fail" % swarm_manager_address)

            # Derived from this instance rather than read from state left on
            # the step by the most recent map_sync_inputs() call.
            instance.instance_name = self._swarm_service_name(instance)
            instance.save()
        except Exception as ex:
            self._log_exception(ex)

    def map_delete_inputs(self, instance):
        """Build the variables used to remove the swarm service of *instance*.

        Returns:
            dict of playbook variables, or ``None`` when an unexpected error
            occurred (the error is logged).

        Raises:
            InnocuousException: when the controller's ``backend_register``
                marks it as disabled.
        """
        try:
            controller = instance.node.site_deployment.controller
            controller_register = json.loads(controller.backend_register)
            slog.debug("controller_register: %s" % controller_register)

            if controller_register.get('disabled', False):
                slog.info('Controller %s is disabled' % controller.name)
                raise InnocuousException('Controller %s is disabled' % controller.name)

            instance_name = self._swarm_service_name(instance)
            slog.debug("instance_name: %s" % instance_name)

            slog.debug("controller: %s" % controller)
            swarm_manager_url = controller.auth_url
            slog.debug("swarm_manager_url: %s" % swarm_manager_url)
            swarm_manager_address, docker_registry_port = self._parse_manager_url(swarm_manager_url)
            slog.debug("swarm_manager_address: %s docker_registry_port: %s" % (
                swarm_manager_address, docker_registry_port))
            host_volume_path = "/opt/xos/instance_volume/%s" % instance.id
            slog.debug("host_volume_path: %s" % host_volume_path)

            return {
                'swarm_manager_address': swarm_manager_address,
                'swarm_service_name': instance_name,
                'host_volume_path': host_volume_path,
                'ansible_tag': instance_name,
                'delete': True,
            }
        except InnocuousException:
            # Raised on purpose above; let the caller see it.
            raise
        except Exception as ex:
            self._log_exception(ex)