blob: 45c72a6e0300d97fc2e1d6f6425e346ccbc1abe7 [file] [log] [blame]
Scott Baker31acc652016-06-23 15:47:56 -07001import hashlib
2import os
3import socket
4import sys
5import base64
6import time
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +00007#import threading
8import subprocess
9import random
10import tempfile
11#from sshtunnel import SSHTunnelForwarder
Scott Baker31acc652016-06-23 15:47:56 -070012from django.db.models import F, Q
13from xos.config import Config
Murat Parlakisik638c65f2017-05-31 11:10:24 +030014from synchronizers.new_base.syncstep import SyncStep
15from synchronizers.new_base.ansible_helper import run_template_ssh
16from synchronizers.new_base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
17from modelaccessor import *
18#from core.models import Service, Slice, ModelLink
19#from services.monitoring.models import CeilometerService, MonitoringChannel
Scott Baker31acc652016-06-23 15:47:56 -070020from xos.logger import Logger, logging
21
# Put the directory above this file at the front of the import path.
parentdir = os.path.join(os.path.dirname(__file__), "..")
sys.path.insert(0, parentdir)

# Module-wide logger, emitting at INFO level and above.
logger = Logger(level=logging.INFO)

# FIXME: Is this the right approach?
# Process-wide registry of open SSH tunnels, keyed by monitoring channel id,
# kept so that a tunnel can be found and torn down when its object is deleted.
ssh_tunnel_db = {}
30
class SSHTunnel(object):
    """Local port forward through a jump host, driven by the `ssh` binary.

    The tunnel is started as a backgrounded ssh control master
    (``ssh -MfN -S <socket> ... -L <local>:<remote>``) and is later queried
    or shut down by sending control commands (``ssh -S <socket> -O <cmd>``)
    over the same control socket.

    Instances can be used as context managers: the tunnel is started on
    entry and stopped on exit.
    """

    def __init__(self, localip, localport, key, remoteip, remote_port, jumpuser, jumphost):
        """Record the tunnel endpoints; no process is started here.

        :param localip: local address the forward listens on
        :param localport: local port the forward listens on
        :param key: path of the private key passed to ``ssh -i``
        :param remoteip: address the traffic is forwarded to
        :param remote_port: port the traffic is forwarded to
        :param jumpuser: user name used to log in to the jump host
        :param jumphost: host the ssh connection is made to
        """
        self.key = key
        self.remote_host = remoteip  # Remote ip on remotehost
        self.remote_port = remote_port
        # Only a unique file *name* is needed for the control socket: the
        # temporary file is closed (and thereby removed) right away and ssh
        # creates the socket at that path when the tunnel starts.
        tmpfile = tempfile.NamedTemporaryFile()
        tmpfile.close()
        self.socket = tmpfile.name
        self.local_port = localport
        self.local_host = localip
        self.jump_user = jumpuser  # Remote user on remotehost
        self.jump_host = jumphost  # What host do we send traffic to
        self.open = False

    def start(self):
        """Launch the backgrounded ssh master and verify it is reachable.

        :raises Exception: if ssh exits with a non-zero status or the
            control socket does not answer a ``check`` command.
        """
        logger.info(
            "Creating SSH Tunnel: ssh -MfN -S %s -i %s -L %s:%s:%s:%s -o ExitOnForwardFailure=True %s@%s"
            % (self.socket, self.key, self.local_host, self.local_port,
               self.remote_host, self.remote_port, self.jump_user, self.jump_host))
        forward_spec = '{}:{}:{}:{}'.format(
            self.local_host, self.local_port, self.remote_host, self.remote_port)
        exit_status = subprocess.call([
            'ssh', '-MfN',
            '-S', self.socket,
            '-i', self.key,
            '-L', forward_spec,
            '-o', 'ExitOnForwardFailure=True',
            self.jump_user + '@' + self.jump_host,
        ])
        if exit_status != 0:
            raise Exception('SSH tunnel failed with status: {}'.format(exit_status))
        if self.send_control_command('check') != 0:
            raise Exception('SSH tunnel failed to check')
        self.open = True

    def stop(self):
        """Ask the ssh master to exit; a no-op when the tunnel is not open.

        :raises Exception: if the ``exit`` control command fails. The
            tunnel is then still considered open so stop can be retried.
        """
        if not self.open:
            return
        if self.send_control_command('exit') != 0:
            raise Exception('SSH tunnel failed to exit')
        self.open = False

    def send_control_command(self, cmd):
        """Send ``cmd`` to the ssh master through the control socket.

        :param cmd: control command understood by ``ssh -O`` (this class
            uses ``check`` and ``exit``)
        :returns: the exit status of the ssh invocation; 0 means success
        """
        # subprocess.call (not check_call) so that a failure is reported as
        # a non-zero status, which start() and stop() turn into their own
        # errors, instead of raising CalledProcessError from here.
        return subprocess.call(
            ['ssh', '-S', self.socket, '-O', cmd, '-l', self.jump_user, self.jump_host])

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, type, value, traceback):
        self.stop()
77
78
79#class SshTunnel(threading.Thread):
80# def __init__(self, localip, localport, remoteip, remoteport, proxy_ssh_key, jumpuser, jumphost):
81# threading.Thread.__init__(self)
82# self.localip = localip # Local ip to listen to
83# self.localport = localport # Local port to listen to
84# self.remoteip = remoteip # Remote ip on remotehost
85# self.remoteport = remoteport # Remote port on remotehost
86# self.proxy_ssh_key = proxy_ssh_key
87# self.jumpuser = jumpuser # Remote user on remotehost
88# self.jumphost = jumphost # What host do we send traffic to
89# self.daemon = True # So that thread will exit when
90# # main non-daemon thread finishes
91#
92# def run(self):
93# if subprocess.call([
94# 'ssh', '-N',
95# '-i', self.proxy_ssh_key,
96# '-L', self.localip + ':' + str(self.localport) + ':' + self.remoteip + ':' + str(self.remoteport),
97# jumpuser + '@' + jumphost ]):
98# raise Exception ('ssh tunnel setup failed')
99
class SyncMonitoringChannel(SyncInstanceUsingAnsible):
    """Sync step for MonitoringChannel objects.

    Builds the variables handed to the ``sync_monitoringchannel.yaml``
    template, optionally opens an SSH tunnel towards the channel when
    ``observer_proxy_ssh`` is configured, closes that tunnel again when the
    channel is deleted, and re-saves all channels whenever a watched Slice
    changes.
    """
    provides = [MonitoringChannel]
    observes = MonitoringChannel
    requested_interval = 0
    template_name = "sync_monitoringchannel.yaml"
    service_key_name = "/opt/xos/synchronizers/monitoring/monitoring_channel_private_key"
    watches = [ModelLink(Slice, via='slice')]

    def __init__(self, *args, **kwargs):
        super(SyncMonitoringChannel, self).__init__(*args, **kwargs)

    def fetch_pending(self, deleted):
        """Return the channels that still need to be synchronized.

        :param deleted: when true, return the deleted channels; otherwise
            return the channels that were never enacted or were updated
            after they were last enacted, and that are not lazy-blocked.
        """
        if deleted:
            return MonitoringChannel.get_deleted_tenant_objects()
        return MonitoringChannel.get_tenant_objects().filter(
            Q(enacted__lt=F('updated')) | Q(enacted=None), Q(lazy_blocked=False))

    def get_extra_attributes(self, o):
        """Build the extra template variables for monitoring channel ``o``.

        In the case of Monitoring Channel, we need to know
        1) Allowed tenant ids
        2) Ceilometer API service endpoint URL if running externally
        3) Credentials to access Ceilometer API service

        :returns: dict of template variables
        :raises Exception: if no Ceilometer service matches the channel's
            provider service.
        """
        ceilometer_services = CeilometerService.get_service_objects().filter(id=o.provider_service.id)
        if not ceilometer_services:
            # Raising a bare string is not a valid exception.
            raise Exception("No associated Ceilometer service")
        ceilometer_service = ceilometer_services[0]
        ceilometer_pub_sub_url = ceilometer_service.ceilometer_pub_sub_url or ''

        # NOTE(review): the result is not used here; the call is retained in
        # case get_instance validates or has side effects - confirm.
        self.get_instance(o)

        # Fall back to a full setup when the option cannot be read.
        try:
            full_setup = Config().observer_full_setup
        except Exception:
            full_setup = True

        return {
            "unique_id": o.id,
            "allowed_tenant_ids": o.tenant_list,
            "auth_url": ceilometer_service.ceilometer_auth_url,
            "admin_user": ceilometer_service.ceilometer_admin_user,
            "admin_password": ceilometer_service.ceilometer_admin_password,
            "admin_tenant": ceilometer_service.ceilometer_admin_tenant,
            "ceilometer_pub_sub_url": ceilometer_pub_sub_url,
            "full_setup": full_setup,
        }

    def sync_fields(self, o, fields):
        """Run the base sync, then open an SSH tunnel for ``o`` if required.

        A tunnel is created only when ``observer_proxy_ssh`` is set in the
        configuration and the object does not already have one. Errors from
        the base sync or from tunnel creation propagate to the caller with
        their original type and traceback.
        """
        super(SyncMonitoringChannel, self).sync_fields(o, fields)

        # Check if ssh tunnel is needed
        proxy_ssh = getattr(Config(), "observer_proxy_ssh", False)
        if not proxy_ssh or o.ssh_proxy_tunnel:
            return

        proxy_ssh_key = getattr(Config(), "observer_proxy_ssh_key", None)
        proxy_ssh_user = getattr(Config(), "observer_proxy_ssh_user", "root")
        jump_hostname = fields["hostname"]

        # Get the tunnel destination
        remote_host = o.nat_ip
        remote_port = o.ceilometer_port
        # FIXME: For now, trying to setup the tunnel on the local port same as the remote port
        local_port = remote_port
        local_ip = socket.gethostbyname(socket.gethostname())

        tunnel = SSHTunnel(local_ip, local_port, proxy_ssh_key, remote_host, remote_port,
                           proxy_ssh_user, jump_hostname)
        tunnel.start()
        logger.info("SSH Tunnel created for Monitoring channel-%s at local port:%s" % (o.id, local_port))

        # FIXME: Store the tunnel handle in global tunnel database
        ssh_tunnel_db[o.id] = tunnel

        # Update the model with ssh tunnel info
        o.ssh_proxy_tunnel = True
        o.ssh_tunnel_ip = local_ip
        o.ssh_tunnel_port = local_port

    def run_playbook(self, o, fields):
        """Close the object's SSH tunnel on delete, then run the playbook."""
        if fields.get('delete'):
            logger.info("Delete for Monitoring channel-%s is getting synchronized" % (o.id))
            if o.id in ssh_tunnel_db:
                tunnel = ssh_tunnel_db[o.id]
                # The entry is removed only after a successful stop, so a
                # failed stop leaves the tunnel registered.
                tunnel.stop()
                logger.info("Deleted SSH Tunnel for Monitoring channel-%s at local port:%s" % (o.id, o.ssh_tunnel_port))
                o.ssh_proxy_tunnel = False
                del ssh_tunnel_db[o.id]
        super(SyncMonitoringChannel, self).run_playbook(o, fields)

    def map_delete_inputs(self, o):
        """Return the template variables used when ``o`` is being deleted."""
        return {"unique_id": o.id,
                "delete": True}

    def handle_watched_object(self, o):
        """Dispatch a watch notification; only Slice objects are handled."""
        logger.info("handle_watched_object is invoked for object %s" % (str(o)), extra=o.tologdict())
        if type(o) is Slice:
            self.handle_slice_watch_notification(o)

    def handle_slice_watch_notification(self, sliceobj):
        """Re-save every monitoring channel after a slice change."""
        logger.info("handle_slice_watch_notification: A slice %s is created or updated or deleted" % (sliceobj))
        for obj in MonitoringChannel.get_tenant_objects().all():
            # Save the monitoring channel object to reflect the newly updated slice
            obj.save()