blob: 49a196254a69aa4f365be6fe8d9efe6ce546bafa [file] [log] [blame]
Scott Baker31acc652016-06-23 15:47:56 -07001import hashlib
2import os
3import socket
4import sys
5import base64
6import time
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +00007#import threading
8import subprocess
9import random
10import tempfile
11#from sshtunnel import SSHTunnelForwarder
Scott Baker31acc652016-06-23 15:47:56 -070012from django.db.models import F, Q
13from xos.config import Config
14from synchronizers.base.syncstep import SyncStep
Sapan Bhatia7e9f4d32017-01-24 19:32:59 +010015from synchronizers.base.ansible_helper import run_template_ssh
Scott Baker31acc652016-06-23 15:47:56 -070016from synchronizers.base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
Srikanth Vavilapallib2a50192017-02-03 18:25:59 +000017from core.models import Service, Slice, ModelLink
Srikanth Vavilapallid84b7b72016-06-28 00:19:07 +000018from services.monitoring.models import CeilometerService, MonitoringChannel
Scott Baker31acc652016-06-23 15:47:56 -070019from xos.logger import Logger, logging
20
21parentdir = os.path.join(os.path.dirname(__file__),"..")
22sys.path.insert(0,parentdir)
23
24logger = Logger(level=logging.INFO)
25
rdudyala996d70b2016-10-13 17:40:55 +000026#FIXME: Is this right approach?
27#Maintaining a global SSH tunnel database in order to handle tunnel deletions during the object delete
28ssh_tunnel_db = {}
29
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +000030class SSHTunnel:
31
32 def __init__(self, localip, localport, key, remoteip, remote_port, jumpuser, jumphost):
33 self.key = key
34 self.remote_host = remoteip # Remote ip on remotehost
35 self.remote_port = remote_port
36 # Get a temporary file name
37 tmpfile = tempfile.NamedTemporaryFile()
38 tmpfile.close()
39 self.socket = tmpfile.name
40 self.local_port = localport
41 self.local_host = localip
42 self.jump_user = jumpuser # Remote user on remotehost
43 self.jump_host = jumphost # What host do we send traffic to
44 self.open = False
45
46 def start(self):
Srikanth Vavilapallib2a50192017-02-03 18:25:59 +000047 logger.info("Creating SSH Tunnel: ssh -MfN -S %s -i %s -L %s:%s:%s;%s -o ExitOnForwardFailure=True %s@%s"%(self.socket, self.key, self.local_host, self.local_port, self.remote_host, self.remote_port,self.jump_user,self.jump_host))
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +000048 exit_status = subprocess.call(['ssh', '-MfN',
49 '-S', self.socket,
50 '-i', self.key,
51 '-L', '{}:{}:{}:{}'.format(self.local_host, self.local_port, self.remote_host, self.remote_port),
52 '-o', 'ExitOnForwardFailure=True',
53 self.jump_user + '@' + self.jump_host
54 ])
55 if exit_status != 0:
56 raise Exception('SSH tunnel failed with status: {}'.format(exit_status))
57 if self.send_control_command('check') != 0:
58 raise Exception('SSH tunnel failed to check')
59 self.open = True
60
61 def stop(self):
62 if self.open:
63 if self.send_control_command('exit') != 0:
64 raise Exception('SSH tunnel failed to exit')
65 self.open = False
66
67 def send_control_command(self, cmd):
68 return subprocess.check_call(['ssh', '-S', self.socket, '-O', cmd, '-l', self.jump_user, self.jump_host])
69
70 def __enter__(self):
71 self.start()
72 return self
73
74 def __exit__(self, type, value, traceback):
75 self.stop()
76
77
78#class SshTunnel(threading.Thread):
79# def __init__(self, localip, localport, remoteip, remoteport, proxy_ssh_key, jumpuser, jumphost):
80# threading.Thread.__init__(self)
81# self.localip = localip # Local ip to listen to
82# self.localport = localport # Local port to listen to
83# self.remoteip = remoteip # Remote ip on remotehost
84# self.remoteport = remoteport # Remote port on remotehost
85# self.proxy_ssh_key = proxy_ssh_key
86# self.jumpuser = jumpuser # Remote user on remotehost
87# self.jumphost = jumphost # What host do we send traffic to
88# self.daemon = True # So that thread will exit when
89# # main non-daemon thread finishes
90#
91# def run(self):
92# if subprocess.call([
93# 'ssh', '-N',
94# '-i', self.proxy_ssh_key,
95# '-L', self.localip + ':' + str(self.localport) + ':' + self.remoteip + ':' + str(self.remoteport),
96# jumpuser + '@' + jumphost ]):
97# raise Exception ('ssh tunnel setup failed')
98
Scott Baker31acc652016-06-23 15:47:56 -070099class SyncMonitoringChannel(SyncInstanceUsingAnsible):
100 provides=[MonitoringChannel]
101 observes=MonitoringChannel
102 requested_interval=0
103 template_name = "sync_monitoringchannel.yaml"
Srikanth Vavilapallid84b7b72016-06-28 00:19:07 +0000104 service_key_name = "/opt/xos/synchronizers/monitoring/monitoring_channel_private_key"
Srikanth Vavilapallib2a50192017-02-03 18:25:59 +0000105 watches = [ModelLink(Slice,via='slice')]
Scott Baker31acc652016-06-23 15:47:56 -0700106
107 def __init__(self, *args, **kwargs):
108 super(SyncMonitoringChannel, self).__init__(*args, **kwargs)
109
110 def fetch_pending(self, deleted):
111 if (not deleted):
112 objs = MonitoringChannel.get_tenant_objects().filter(Q(enacted__lt=F('updated')) | Q(enacted=None),Q(lazy_blocked=False))
113 else:
114 objs = MonitoringChannel.get_deleted_tenant_objects()
115
116 return objs
117
118 def get_extra_attributes(self, o):
119 # This is a place to include extra attributes. In the case of Monitoring Channel, we need to know
120 # 1) Allowed tenant ids
121 # 2) Ceilometer API service endpoint URL if running externally
122 # 3) Credentials to access Ceilometer API service
123
124 ceilometer_services = CeilometerService.get_service_objects().filter(id=o.provider_service.id)
125 if not ceilometer_services:
126 raise "No associated Ceilometer service"
127 ceilometer_service = ceilometer_services[0]
128 ceilometer_pub_sub_url = ceilometer_service.ceilometer_pub_sub_url
129 if not ceilometer_pub_sub_url:
130 ceilometer_pub_sub_url = ''
131 instance = self.get_instance(o)
132
133 try:
134 full_setup = Config().observer_full_setup
135 except:
136 full_setup = True
137
138 fields = {"unique_id": o.id,
139 "allowed_tenant_ids": o.tenant_list,
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +0000140 "auth_url":ceilometer_service.ceilometer_auth_url,
141 "admin_user":ceilometer_service.ceilometer_admin_user,
142 "admin_password":ceilometer_service.ceilometer_admin_password,
143 "admin_tenant":ceilometer_service.ceilometer_admin_tenant,
Scott Baker31acc652016-06-23 15:47:56 -0700144 "ceilometer_pub_sub_url": ceilometer_pub_sub_url,
145 "full_setup": full_setup}
146
147 return fields
148
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +0000149 def sync_fields(self, o, fields):
150 try:
151 super(SyncMonitoringChannel, self).sync_fields(o, fields)
152
153 #Check if ssh tunnel is needed
154 proxy_ssh = getattr(Config(), "observer_proxy_ssh", False)
155
rdudyala996d70b2016-10-13 17:40:55 +0000156 if proxy_ssh and (not o.ssh_proxy_tunnel):
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +0000157 proxy_ssh_key = getattr(Config(), "observer_proxy_ssh_key", None)
158 proxy_ssh_user = getattr(Config(), "observer_proxy_ssh_user", "root")
159 jump_hostname = fields["hostname"]
160
161 #Get the tunnel detsination
Srikanth Vavilapalli71aa28d2017-01-31 00:43:13 +0000162 remote_host = o.nat_ip
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +0000163 remote_port = o.ceilometer_port
164 #FIXME: For now, trying to setup the tunnel on the local port same as the remote port
165 local_port = remote_port
166 local_ip = socket.gethostbyname(socket.gethostname())
167
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +0000168 tunnel = SSHTunnel(local_ip, local_port, proxy_ssh_key, remote_host, remote_port, proxy_ssh_user, jump_hostname)
169 tunnel.start()
rdudyala996d70b2016-10-13 17:40:55 +0000170 logger.info("SSH Tunnel created for Monitoring channel-%s at local port:%s"%(o.id,local_port))
171
172 #FIXME:Store the tunnel handle in global tunnel database
173 ssh_tunnel_db[o.id] = tunnel
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +0000174
175 #Update the model with ssh tunnel info
176 o.ssh_proxy_tunnel = True
177 o.ssh_tunnel_ip = local_ip
178 o.ssh_tunnel_port = local_port
179
180 except Exception,error:
181 raise Exception(error)
182
Scott Baker31acc652016-06-23 15:47:56 -0700183 def run_playbook(self, o, fields):
184 #ansible_hash = hashlib.md5(repr(sorted(fields.items()))).hexdigest()
185 #quick_update = (o.last_ansible_hash == ansible_hash)
186
187 #if quick_update:
188 # logger.info("quick_update triggered; skipping ansible recipe")
189 #else:
rdudyala996d70b2016-10-13 17:40:55 +0000190 if ('delete' in fields) and (fields['delete']):
191 logger.info("Delete for Monitoring channel-%s is getting synchronized"%(o.id))
192 if o.id in ssh_tunnel_db:
193 tunnel = ssh_tunnel_db[o.id]
194 tunnel.stop()
195 logger.info("Deleted SSH Tunnel for Monitoring channel-%s at local port:%s"%(o.id,o.ssh_tunnel_port))
196 o.ssh_proxy_tunnel = False
197 del ssh_tunnel_db[o.id]
Scott Baker31acc652016-06-23 15:47:56 -0700198 super(SyncMonitoringChannel, self).run_playbook(o, fields)
199
200 #o.last_ansible_hash = ansible_hash
201
202 def map_delete_inputs(self, o):
203 fields = {"unique_id": o.id,
204 "delete": True}
205 return fields
Srikanth Vavilapallib2a50192017-02-03 18:25:59 +0000206
207 def handle_watched_object(self, o):
208 logger.info("handle_watched_object is invoked for object %s" % (str(o)),extra=o.tologdict())
209 if (type(o) is Slice):
210 self.handle_slice_watch_notification(o)
211 pass
212
213 def handle_slice_watch_notification(self, sliceobj):
214 logger.info("handle_slice_watch_notification: A slice %s is created or updated or deleted" % (sliceobj))
215 for obj in MonitoringChannel.get_tenant_objects().all():
216 #Save the monitoring channel object to reflect the newly updated slice
217 obj.save()
218 pass
219