blob: fcb6f17140720a839a7d50cdd34dbe9503db1563 [file] [log] [blame]
Scott Baker31acc652016-06-23 15:47:56 -07001import hashlib
2import os
3import socket
4import sys
5import base64
6import time
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +00007#import threading
8import subprocess
9import random
10import tempfile
11#from sshtunnel import SSHTunnelForwarder
Scott Baker31acc652016-06-23 15:47:56 -070012from django.db.models import F, Q
13from xos.config import Config
14from synchronizers.base.syncstep import SyncStep
15from synchronizers.base.ansible import run_template_ssh
16from synchronizers.base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
17from core.models import Service, Slice
Srikanth Vavilapallid84b7b72016-06-28 00:19:07 +000018from services.monitoring.models import CeilometerService, MonitoringChannel
Scott Baker31acc652016-06-23 15:47:56 -070019from xos.logger import Logger, logging
20
# Put the directory above this file at the front of the module search path.
parentdir = os.path.join(os.path.dirname(__file__), "..")
sys.path.insert(0, parentdir)

# Module-wide logger, INFO level.
logger = Logger(level=logging.INFO)

# FIXME: Is this the right approach?
# Global registry of live SSH tunnels, keyed by the MonitoringChannel object
# id, kept so that a tunnel can be torn down when its object is deleted.
ssh_tunnel_db = dict()
29
class SSHTunnel(object):
    """Background SSH port-forwarding tunnel through a jump host.

    ``start()`` launches an ``ssh -MfN`` master that forwards
    ``localip:localport`` to ``remoteip:remote_port`` via ``jumphost``.
    The master is controlled afterwards through its control socket
    (``ssh -S <socket> -O <cmd>``), which is how ``stop()`` shuts it down.

    Instances can be used as context managers: the tunnel is started on
    entry and stopped on exit.
    """

    def __init__(self, localip, localport, key, remoteip, remote_port, jumpuser, jumphost):
        """Record the tunnel endpoints; nothing is started yet.

        Args:
            localip: Local address the forwarded port listens on.
            localport: Local port to listen on.
            key: Path of the private key passed to ``ssh -i``. When falsy,
                no ``-i`` option is passed at all.
            remoteip: Destination address, as seen from the jump host.
            remote_port: Destination port.
            jumpuser: User name to log in with on the jump host.
            jumphost: Host the SSH connection is made to.
        """
        self.key = key
        self.remote_host = remoteip  # Remote ip on remotehost
        self.remote_port = remote_port
        # Only a unique, currently unused path is needed for the control
        # socket, so the temporary file is created and closed (which removes
        # it) just to obtain its name.
        # NOTE(review): another process could claim this path between here
        # and start(); a private tempfile.mkdtemp() directory would avoid it.
        tmpfile = tempfile.NamedTemporaryFile()
        tmpfile.close()
        self.socket = tmpfile.name
        self.local_port = localport
        self.local_host = localip
        self.jump_user = jumpuser  # Remote user on remotehost
        self.jump_host = jumphost  # What host do we send traffic to
        self.open = False

    def start(self):
        """Launch the tunnel and verify that its control master responds.

        Raises:
            Exception: if ``ssh`` exits non-zero or the control ``check``
                command fails.
        """
        command = ['ssh', '-MfN', '-S', self.socket]
        if self.key:
            # A None key would make subprocess raise TypeError; without -i
            # ssh falls back to its default identities.
            command += ['-i', self.key]
        command += [
            '-L', '{}:{}:{}:{}'.format(self.local_host, self.local_port,
                                       self.remote_host, self.remote_port),
            '-o', 'ExitOnForwardFailure=True',
            '{}@{}'.format(self.jump_user, self.jump_host),
        ]
        exit_status = subprocess.call(command)
        if exit_status != 0:
            raise Exception('SSH tunnel failed with status: {}'.format(exit_status))
        if self.send_control_command('check') != 0:
            raise Exception('SSH tunnel failed to check')
        self.open = True

    def stop(self):
        """Ask the control master to exit; a no-op if the tunnel is not open.

        Raises:
            Exception: if the control ``exit`` command fails. ``open`` is
                left True in that case.
        """
        if not self.open:
            return
        if self.send_control_command('exit') != 0:
            raise Exception('SSH tunnel failed to exit')
        self.open = False

    def send_control_command(self, cmd):
        """Send ``cmd`` to the control master and return ssh's exit status.

        The status is returned rather than raised (``subprocess.call``, not
        ``check_call``) so that callers can compare it against 0 and raise
        their own, more descriptive errors.
        """
        return subprocess.call(['ssh', '-S', self.socket, '-O', cmd,
                                '-l', self.jump_user, self.jump_host])

    def __enter__(self):
        """Start the tunnel and return this object."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the tunnel; exceptions from the ``with`` body propagate."""
        self.stop()
75
76
77#class SshTunnel(threading.Thread):
78# def __init__(self, localip, localport, remoteip, remoteport, proxy_ssh_key, jumpuser, jumphost):
79# threading.Thread.__init__(self)
80# self.localip = localip # Local ip to listen to
81# self.localport = localport # Local port to listen to
82# self.remoteip = remoteip # Remote ip on remotehost
83# self.remoteport = remoteport # Remote port on remotehost
84# self.proxy_ssh_key = proxy_ssh_key
85# self.jumpuser = jumpuser # Remote user on remotehost
86# self.jumphost = jumphost # What host do we send traffic to
87# self.daemon = True # So that thread will exit when
88# # main non-daemon thread finishes
89#
90# def run(self):
91# if subprocess.call([
92# 'ssh', '-N',
93# '-i', self.proxy_ssh_key,
94# '-L', self.localip + ':' + str(self.localport) + ':' + self.remoteip + ':' + str(self.remoteport),
95# jumpuser + '@' + jumphost ]):
96# raise Exception ('ssh tunnel setup failed')
97
class SyncMonitoringChannel(SyncInstanceUsingAnsible):
    """Sync step that enacts MonitoringChannel objects through Ansible.

    When the ``observer_proxy_ssh`` config option is set, syncing a channel
    also opens an SSH tunnel to it (see ``sync_fields``); the tunnel is
    closed again when the channel's delete is synchronized (see
    ``run_playbook``).
    """

    provides = [MonitoringChannel]
    observes = MonitoringChannel
    requested_interval = 0
    template_name = "sync_monitoringchannel.yaml"
    service_key_name = "/opt/xos/synchronizers/monitoring/monitoring_channel_private_key"

    def __init__(self, *args, **kwargs):
        super(SyncMonitoringChannel, self).__init__(*args, **kwargs)

    def fetch_pending(self, deleted):
        """Return the MonitoringChannel objects that need syncing.

        Args:
            deleted: When true, return the deleted tenant objects; otherwise
                return the objects that were never enacted or were updated
                after their last enactment, and are not lazy-blocked.
        """
        if deleted:
            return MonitoringChannel.get_deleted_tenant_objects()

        return MonitoringChannel.get_tenant_objects().filter(
            Q(enacted__lt=F('updated')) | Q(enacted=None),
            Q(lazy_blocked=False))

    def get_extra_attributes(self, o):
        """Build the extra template fields for a MonitoringChannel.

        The fields carry:
          1) the allowed tenant ids,
          2) the Ceilometer pub/sub URL ('' when the service has none),
          3) the credentials used to access the Ceilometer API service,
        plus the channel's id and the ``full_setup`` flag.

        Raises:
            Exception: if the channel's provider service is not a
                CeilometerService.
        """
        ceilometer_services = CeilometerService.get_service_objects().filter(id=o.provider_service.id)
        if not ceilometer_services:
            # Raising a bare string is itself a TypeError; raise a real
            # exception carrying the same message.
            raise Exception("No associated Ceilometer service")
        ceilometer_service = ceilometer_services[0]
        ceilometer_pub_sub_url = ceilometer_service.ceilometer_pub_sub_url or ''

        # The result is not used here; the call is kept in case it has side
        # effects or validates the object -- TODO confirm and drop if not.
        self.get_instance(o)

        try:
            full_setup = Config().observer_full_setup
        except Exception:
            # Best effort: any failure reading the option means "full setup".
            # Unlike a bare except, this lets KeyboardInterrupt/SystemExit
            # propagate.
            full_setup = True

        return {"unique_id": o.id,
                "allowed_tenant_ids": o.tenant_list,
                "auth_url": ceilometer_service.ceilometer_auth_url,
                "admin_user": ceilometer_service.ceilometer_admin_user,
                "admin_password": ceilometer_service.ceilometer_admin_password,
                "admin_tenant": ceilometer_service.ceilometer_admin_tenant,
                "ceilometer_pub_sub_url": ceilometer_pub_sub_url,
                "full_setup": full_setup}

    def sync_fields(self, o, fields):
        """Run the base sync, then open an SSH tunnel if one is required.

        A tunnel is created only when the ``observer_proxy_ssh`` config
        option is set and the object does not already record one. The jump
        host is ``fields["hostname"]``; the tunnel listens on this machine's
        address at the same port number as the channel's ceilometer port.
        On success the tunnel is stored in ``ssh_tunnel_db`` and its address
        is written to the object's ``ssh_*`` attributes.

        Raises:
            Exception: any failure is re-raised wrapped in a plain
                ``Exception``.
        """
        try:
            super(SyncMonitoringChannel, self).sync_fields(o, fields)

            # Check if ssh tunnel is needed
            proxy_ssh = getattr(Config(), "observer_proxy_ssh", False)

            if proxy_ssh and (not o.ssh_proxy_tunnel):
                proxy_ssh_key = getattr(Config(), "observer_proxy_ssh_key", None)
                proxy_ssh_user = getattr(Config(), "observer_proxy_ssh_user", "root")
                jump_hostname = fields["hostname"]

                # Get the tunnel destination
                remote_host = o.private_ip
                remote_port = o.ceilometer_port
                # FIXME: For now, trying to setup the tunnel on the local port same as the remote port
                local_port = remote_port
                local_ip = socket.gethostbyname(socket.gethostname())

                tunnel = SSHTunnel(local_ip, local_port, proxy_ssh_key, remote_host, remote_port, proxy_ssh_user, jump_hostname)
                tunnel.start()
                logger.info("SSH Tunnel created for Monitoring channel-%s at local port:%s"%(o.id,local_port))

                # FIXME: Store the tunnel handle in global tunnel database
                ssh_tunnel_db[o.id] = tunnel

                # Update the model with ssh tunnel info
                o.ssh_proxy_tunnel = True
                o.ssh_tunnel_ip = local_ip
                o.ssh_tunnel_port = local_port

        except Exception as error:
            # Wrapping is kept on purpose so callers keep seeing a plain
            # Exception; only the Python-2-only "except X, e" syntax changed.
            raise Exception(error)

    def run_playbook(self, o, fields):
        """Run the playbook; on delete, first close the object's SSH tunnel.

        If ``fields`` has a truthy ``delete`` entry and a tunnel for ``o.id``
        is registered in ``ssh_tunnel_db``, the tunnel is stopped and
        unregistered before the base playbook runs. A failure to stop the
        tunnel propagates and leaves the registry entry in place.
        """
        #ansible_hash = hashlib.md5(repr(sorted(fields.items()))).hexdigest()
        #quick_update = (o.last_ansible_hash == ansible_hash)

        #if quick_update:
        #    logger.info("quick_update triggered; skipping ansible recipe")
        #else:
        if fields.get('delete'):
            logger.info("Delete for Monitoring channel-%s is getting synchronized"%(o.id))
            if o.id in ssh_tunnel_db:
                tunnel = ssh_tunnel_db[o.id]
                tunnel.stop()
                logger.info("Deleted SSH Tunnel for Monitoring channel-%s at local port:%s"%(o.id,o.ssh_tunnel_port))
                o.ssh_proxy_tunnel = False
                del ssh_tunnel_db[o.id]
        super(SyncMonitoringChannel, self).run_playbook(o, fields)

        #o.last_ansible_hash = ansible_hash

    def map_delete_inputs(self, o):
        """Return the template fields used when deleting ``o``."""
        return {"unique_id": o.id,
                "delete": True}