boyoung | f42f2bf | 2017-10-16 19:06:30 +0900 | [diff] [blame^] | 1 | |
| 2 | # Copyright 2017-present Open Networking Foundation |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | |
| 17 | import os |
| 18 | import base64 |
| 19 | import socket |
| 20 | import threading |
| 21 | import time |
| 22 | import sys, traceback |
| 23 | |
| 24 | from synchronizers.swarm.swarmsyncstep import SwarmSyncStep |
| 25 | from synchronizers.new_base.ansible_helper import * |
| 26 | from synchronizers.new_base.syncstep import * |
| 27 | from xos.logger import observer_logger as logger |
| 28 | from synchronizers.new_base.modelaccessor import * |
| 29 | |
| 30 | import synchronizers.swarm.swarmlog as slog |
| 31 | |
| 32 | |
class SyncInstances(SwarmSyncStep):
    """Sync step that runs Instance objects as Docker swarm services.

    The work is done by the ``sync_instances.yaml`` playbook; the methods
    below compute its input variables and record its results.
    """
    observes = Instance
    provides = [Instance]
    playbook = 'sync_instances.yaml'
    requested_interval = 0

| 39 | def update_instance(self, instance): |
| 40 | slog.info("instance_name: %s Thread: %s instance.updated: %s" % ( |
| 41 | instance.instance_name, threading.current_thread().name, instance.updated)) |
| 42 | time.sleep(1) |
| 43 | |
| 44 | try: |
| 45 | inst_id = instance.id |
| 46 | slog.debug("inst_id: %s" % inst_id) |
| 47 | old_update = 0 |
| 48 | |
| 49 | for idx in range(1, 100, 1): |
| 50 | slog.debug("idx: %s" % idx) |
| 51 | new_inst = Instance.objects.get(id=inst_id) |
| 52 | |
| 53 | if old_update == 0: |
| 54 | old_update = new_inst.updated |
| 55 | slog.debug("updated(old): %s" % time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(old_update))) |
| 56 | |
| 57 | if (new_inst.volumes is not None) and (new_inst.instance_name is not None): |
| 58 | if (len(new_inst.volumes) > 2) and (len(new_inst.instance_name) > 2): |
| 59 | slog.debug("instance_name: %s (%s)" % (new_inst.instance_name, len(new_inst.instance_name))) |
| 60 | slog.debug("volumes : %s (%s)" % (new_inst.volumes, len(new_inst.volumes ))) |
| 61 | |
| 62 | new_inst.numberCores = idx |
| 63 | new_inst.updated = time.time() |
| 64 | slog.debug("updated to renew : %s (%s)" % ( |
| 65 | time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(new_inst.updated)), |
| 66 | new_inst.updated) |
| 67 | ) |
| 68 | new_inst.save(update_fields=['updated', 'numberCores']) |
| 69 | time.sleep(1) |
| 70 | clone_inst = Instance.objects.get(id=inst_id) |
| 71 | clone_inst.save(update_fields=['numberCores']) |
| 72 | slog.debug("updated : %s (%s)" % ( |
| 73 | time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(clone_inst.updated)), |
| 74 | clone_inst.updated) |
| 75 | ) |
| 76 | if clone_inst.updated == old_update: |
| 77 | slog.debug("updated date was not changed. Nothing is executed. Waiting 5 seconds.") |
| 78 | time.sleep(5) |
| 79 | else: |
| 80 | slog.debug("updated date was changed. Swarm synchronizer will run ansible") |
| 81 | return |
| 82 | except Exception as ex: |
| 83 | slog.error("Exception: %s %s %s" % (type(ex), str(ex), ex.args)) |
| 84 | slog.error("%s" % str(traceback.format_exc())) |
| 85 | |
| 86 | def chk_svc_exist(self, instance, swarm_manager_address): |
| 87 | duplicated_flag = False |
| 88 | try: |
| 89 | instance_name = '%s-%s-%d' % (instance.slice.service.name, instance.slice.name, instance.id) |
| 90 | slog.debug("Service name to chkeck: %s" % instance_name) |
| 91 | |
| 92 | import docker |
| 93 | docker_api_base_url = "tcp://%s:4243" % swarm_manager_address |
| 94 | slog.debug("Docker API Base URL: %s" % docker_api_base_url) |
| 95 | my_client = docker.DockerClient(base_url=docker_api_base_url) |
| 96 | swarm_svc = my_client.services.get(instance_name) |
| 97 | |
| 98 | slog.debug("swarm_svc.id : %s" % swarm_svc.id) |
| 99 | slog.debug("swarm_svc.name : %s" % swarm_svc.name) |
| 100 | slog.debug("swarm_svc.attrs : %s" % swarm_svc.attrs) |
| 101 | except Exception as ex: |
| 102 | slog.error("Exception: %s %s %s" % (type(ex), str(ex), ex.args)) |
| 103 | slog.error("%s" % str(traceback.format_exc())) |
| 104 | slog.info("There is no duplicated service name, I would create new service (%s)" % instance_name) |
| 105 | duplicated_flag = False |
| 106 | else: |
| 107 | slog.info("There is duplicated service name (%s)" % instance_name) |
| 108 | duplicated_flag = True |
| 109 | return duplicated_flag |
| 110 | |
| 111 | def fetch_pending(self, deletion=False): |
| 112 | slog.info("begin fetch_pending(%s)" % deletion) |
| 113 | objs = super(SyncInstances, self).fetch_pending(deletion) |
| 114 | slog.info("objects: %s" % objs) |
| 115 | objs = [x for x in objs if x.isolation == "vm"] |
| 116 | slog.info("VM objects: %s" % objs) |
| 117 | return objs |
| 118 | |
| 119 | def map_sync_inputs(self, instance): |
| 120 | try: |
| 121 | slog.debug("instance: %s slice: %s" % (instance, instance.slice.name)) |
| 122 | controller = instance.node.site_deployment.controller |
| 123 | swarm_manager_url = controller.auth_url |
| 124 | slog.info("swarm_manager_url: %s" % swarm_manager_url) |
| 125 | (swarm_manager_address, docker_registry_port) = swarm_manager_url.split(':') |
| 126 | slog.info("swarm_manager_address: %s docker_registry_port: %s" % ( |
| 127 | swarm_manager_address, docker_registry_port)) |
| 128 | |
| 129 | # if instance.instance_uuid is not None, |
| 130 | # then This method will update the instance with new configuration. |
| 131 | swarm_service_update_flag = False |
| 132 | if instance.instance_uuid is not None: |
| 133 | if len(instance.instance_uuid) > 2: |
| 134 | swarm_service_update_flag = True |
| 135 | |
| 136 | # check if this service is created already on swarm cluster. |
| 137 | duplicated_flag = self.chk_svc_exist(instance, swarm_manager_address) |
| 138 | if duplicated_flag is True: |
| 139 | swarm_service_update_flag = True |
| 140 | |
| 141 | # To get volume information |
| 142 | slog.debug("slice.mount_data_sets: %s" % instance.slice.mount_data_sets) |
| 143 | mount_dest_path = "/usr/local/etc" |
| 144 | if len(instance.slice.mount_data_sets) < 2: |
| 145 | slog.debug("instance.slice.mount_data_sets(%s) is too short" % instance.slice.mount_data_sets) |
| 146 | else: |
| 147 | mount_dest_path = instance.slice.mount_data_sets |
| 148 | slog.debug("volume mount destination path: %s" % mount_dest_path) |
| 149 | |
| 150 | # set options for volume mounting |
| 151 | # (example) --mount type=bind,src=/opt/xos/instance_volume/1,dst=/usr/local/etc/haproxy |
| 152 | volume_mount_opt = " " |
| 153 | if swarm_service_update_flag is True: |
| 154 | volume_mount_opt = "--mount-add type=bind,src=/opt/xos/instance_volume/%s,dst=%s" % ( |
| 155 | instance.id, mount_dest_path) |
| 156 | else: |
| 157 | volume_mount_opt = "--mount type=bind,src=/opt/xos/instance_volume/%s,dst=%s" % ( |
| 158 | instance.id, mount_dest_path) |
| 159 | slog.debug("volume_mount_opt: %s" % volume_mount_opt) |
| 160 | host_volume_path = "/opt/xos/instance_volume/%s" % instance.id |
| 161 | slog.debug("host_volume_path: %s" % host_volume_path) |
| 162 | |
| 163 | # sanity check - make sure model_policy for slice has run |
| 164 | if ((not instance.slice.policed) or (instance.slice.policed < instance.slice.updated)): |
| 165 | slog.info("Instance %s waiting on Slice %s to execute model policies" % ( |
| 166 | instance, instance.slice.name)) |
| 167 | raise DeferredException( |
| 168 | "Instance %s waiting on Slice %s to execute model policies" % ( |
| 169 | instance, instance.slice.name)) |
| 170 | |
| 171 | # sanity check - make sure model_policy for all slice networks have run |
| 172 | networks = instance.slice.ownedNetworks.all() |
| 173 | slog.debug("network list for slice of this instance(%s): %s" % (instance.name, str(networks))) |
| 174 | |
| 175 | for network in instance.slice.ownedNetworks.all(): |
| 176 | slog.info("instance: %s network of slice(%s): %s" % (instance.name, instance.slice.name, network.name)) |
| 177 | if ((not network.policed) or (network.policed < network.updated)): |
| 178 | slog.info("Instance %s waiting on Network %s to execute model policies" % ( |
| 179 | instance, network.name)) |
| 180 | raise DeferredException( |
| 181 | "Instance %s waiting on Network %s to execute model policies" % ( |
| 182 | instance, network.name)) |
| 183 | slog.debug("Model Policy checking is done successfully.") |
| 184 | |
| 185 | swarm_network = "" |
| 186 | for network in networks: |
| 187 | slog.debug("networkd.id: %s(%s, %s) controller.id: %s" % ( |
| 188 | network.id, network.name, network.subnet, |
| 189 | instance.node.site_deployment.controller.id)) |
| 190 | if not ControllerNetwork.objects.filter( |
| 191 | network_id=network.id, |
| 192 | controller_id=instance.node.site_deployment.controller.id).exists(): |
| 193 | raise DeferredException( |
| 194 | "Instance %s Private Network %s lacks ControllerNetwork object" % ( |
| 195 | instance, network.name)) |
| 196 | swarm_network += " --network %s " % network.name |
| 197 | slog.debug("swarm_network: %s" % swarm_network) |
| 198 | |
| 199 | image_name = None |
| 200 | controller_images = instance.image.controllerimages.all() |
| 201 | controller_images = [x for x in controller_images if |
| 202 | x.controller_id == instance.node.site_deployment.controller.id] |
| 203 | if controller_images: |
| 204 | image_name = controller_images[0].image.name |
| 205 | slog.info("using image from ControllerImage object: " + str(image_name)) |
| 206 | |
| 207 | host_filter = instance.node.name.strip() |
| 208 | slog.info("instance.node.name: %s" % instance.node.name) |
| 209 | |
| 210 | instance_name = '%s-%s-%d' % (instance.slice.service.name, instance.slice.name, instance.id) |
| 211 | slog.info("service name: %s instance.slice.name: %s instance.id: %s instance_name: %s" % ( |
| 212 | instance.slice.service.name, instance.slice.name, instance.id, instance_name)) |
| 213 | self.instance_name = instance_name |
| 214 | |
| 215 | input_fields = { |
| 216 | 'swarm_manager_address' : swarm_manager_address, |
| 217 | 'swarm_service_name' : instance_name, |
| 218 | 'network_name' : swarm_network, |
| 219 | 'replicas' : "--replicas 1", |
| 220 | 'restart_condition' : "--restart-condition any --restart-delay 9s ", |
| 221 | 'volume' : volume_mount_opt, |
| 222 | 'host_volume_path' : host_volume_path, |
| 223 | 'docker_registry_port' : docker_registry_port, |
| 224 | 'image_name' : instance.image.name, |
| 225 | 'image_tag' : instance.image.tag, |
| 226 | 'ansible_tag' : instance_name, |
| 227 | 'delete' : False, |
| 228 | 'update' : swarm_service_update_flag |
| 229 | } |
| 230 | slog.info("input_fields: %s" % input_fields) |
| 231 | |
| 232 | if swarm_service_update_flag is False: |
| 233 | slog.info("swarm_service_update_flag is %s, so I will update once more" % |
| 234 | swarm_service_update_flag) |
| 235 | try: |
| 236 | my_thr = threading.Thread(target=self.update_instance, args=(instance,)) |
| 237 | my_thr.start() |
| 238 | except Exception as ex: |
| 239 | slog.error("Exception: %s %s %s" % (type(ex), str(ex), ex.args)) |
| 240 | slog.error("%s" % str(traceback.format_exc())) |
| 241 | return input_fields |
| 242 | except Exception as ex: |
| 243 | slog.error("Exception: %s %s %s" % (type(ex), str(ex), ex.args)) |
| 244 | slog.error("%s" % str(traceback.format_exc())) |
| 245 | |
| 246 | def map_sync_outputs(self, instance, res): |
| 247 | try: |
| 248 | slog.debug("ansible playbook ressult: %s" % str(res)) |
| 249 | slog.debug("ansible playbook ressult[3][stdout]: %s" % str(res[3]['stdout'])) |
| 250 | res_stdout = res[3]['stdout'] |
| 251 | json_content = json.loads(res_stdout) |
| 252 | slog.debug("json_content: %s" % str(json_content)) |
| 253 | instance.instance_id = json_content[0]['Spec']["Name"] |
| 254 | slog.debug("instance.instance_id: %s" % str(instance.instance_id)) |
| 255 | instance.instance_uuid = json_content[0]['ID'] |
| 256 | slog.debug("instance.instance_uuid: %s" % str(instance.instance_uuid)) |
| 257 | |
| 258 | controller = instance.node.site_deployment.controller |
| 259 | swarm_manager_url = controller.auth_url |
| 260 | (swarm_manager_address, docker_registry_port) = swarm_manager_url.split(':') |
| 261 | slog.debug("swarm_manager_address: %s docker_registry_port: %s" % ( |
| 262 | swarm_manager_address, docker_registry_port)) |
| 263 | try: |
| 264 | instance.ip = socket.gethostbyname(swarm_manager_address) |
| 265 | except Exception,e: |
| 266 | slog.info(str(e)) |
| 267 | slog.info("hostname(%s) resolution fail" % swarm_manager_address) |
| 268 | pass |
| 269 | instance.instance_name = self.instance_name |
| 270 | instance.save() |
| 271 | except Exception as ex: |
| 272 | slog.error("Exception: %s %s %s" % (type(ex), str(ex), ex.args)) |
| 273 | slog.error("%s" % str(traceback.format_exc())) |
| 274 | |
| 275 | def map_delete_inputs(self, instance): |
| 276 | try: |
| 277 | controller_register = json.loads(instance.node.site_deployment.controller.backend_register) |
| 278 | slog.debug("controller_register: %s" % controller_register) |
| 279 | |
| 280 | if (controller_register.get('disabled', False)): |
| 281 | slog.info('Controller %s is disabled' % instance.node.site_deployment.controller.name) |
| 282 | raise InnocuousException('Controller %s is disabled' % instance.node.site_deployment.controller.name) |
| 283 | |
| 284 | instance_name = '%s-%s-%d' % (instance.slice.service.name, instance.slice.name, instance.id) |
| 285 | slog.debug("instance_name: %s" % instance_name) |
| 286 | |
| 287 | controller = instance.node.site_deployment.controller |
| 288 | slog.debug("controller: %s" % controller) |
| 289 | swarm_manager_url = controller.auth_url |
| 290 | slog.debug("swarm_manager_url: %s" % swarm_manager_url) |
| 291 | (swarm_manager_address, docker_registry_port) = swarm_manager_url.split(':') |
| 292 | slog.debug("swarm_manager_address: %s docker_registry_port: %s" % (swarm_manager_address, docker_registry_port)) |
| 293 | host_volume_path = "/opt/xos/instance_volume/%s" % instance.id |
| 294 | slog.debug("host_volume_path: %s" % host_volume_path) |
| 295 | input = { |
| 296 | 'swarm_manager_address' : swarm_manager_address, |
| 297 | 'swarm_service_name' : instance_name, |
| 298 | 'host_volume_path' : host_volume_path, |
| 299 | 'ansible_tag' : instance_name, |
| 300 | 'delete' : True |
| 301 | } |
| 302 | return input |
| 303 | except Exception as ex: |
| 304 | slog.error("Exception: %s %s %s" % (type(ex), str(ex), ex.args)) |
| 305 | slog.error("%s" % str(traceback.format_exc())) |