blob: be76942df7b9a3bc482c0284180e5f1fe917bcb0 [file] [log] [blame]
Matteo Scandoloeb0d11c2017-08-08 13:05:26 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
Scott Baker31acc652016-06-23 15:47:56 -070017import hashlib
18import os
19import socket
20import sys
21import base64
22import time
Srikanth Vavilapallifd8c9b32016-08-15 22:59:28 +000023#import threading
24import subprocess
25import random
26import tempfile
27#from sshtunnel import SSHTunnelForwarder
Scott Baker31acc652016-06-23 15:47:56 -070028from django.db.models import F, Q
29from xos.config import Config
Murat Parlakisik638c65f2017-05-31 11:10:24 +030030from synchronizers.new_base.syncstep import SyncStep
31from synchronizers.new_base.ansible_helper import run_template_ssh
32from synchronizers.new_base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
33from modelaccessor import *
34#from core.models import Service, Slice, ModelLink
35#from services.monitoring.models import CeilometerService, MonitoringChannel
Scott Baker31acc652016-06-23 15:47:56 -070036from xos.logger import Logger, logging
37
# Put the directory above this file at the front of the import search path.
parentdir = os.path.join(os.path.dirname(__file__), os.pardir)
sys.path.insert(0, parentdir)

# Module-wide logger; messages at INFO level and above are emitted.
logger = Logger(level=logging.INFO)

# FIXME: Is this the right approach?
# Global registry of open SSH tunnels, keyed by monitoring channel object id,
# kept so that a tunnel can be torn down when its object is deleted.
ssh_tunnel_db = {}
46
class SSHTunnel(object):
    """Local port forward through a jump host, driven by the ``ssh`` client.

    The tunnel is started as a backgrounded ssh master process (``-MfN``)
    bound to a control socket (``-S``); the same socket is later used to
    send ``-O check`` / ``-O exit`` control commands to that process.

    Can be used as a context manager: the tunnel is started on entry and
    stopped on exit.
    """

    def __init__(self, localip, localport, key, remoteip, remote_port, jumpuser, jumphost):
        """Record the tunnel endpoints; no process is started here.

        localip/localport    -- local address and port the forward listens on
        key                  -- path of the private key passed to ``ssh -i``
        remoteip/remote_port -- destination of the forward
        jumpuser/jumphost    -- login user and host the ssh session connects to
        """
        self.key = key
        self.remote_host = remoteip
        self.remote_port = remote_port
        # Only a unique path is needed for the control socket: the temporary
        # file is created and closed straight away (closing removes it), and
        # its name is reused as the socket path handed to ssh.
        tmpfile = tempfile.NamedTemporaryFile()
        tmpfile.close()
        self.socket = tmpfile.name
        self.local_port = localport
        self.local_host = localip
        self.jump_user = jumpuser
        self.jump_host = jumphost
        self.open = False

    def start(self):
        """Start the tunnel and verify that the master process answers.

        Raises Exception if ssh exits with a non-zero status or if the
        follow-up ``check`` control command fails.
        """
        logger.info("Creating SSH Tunnel: ssh -MfN -S %s -i %s -L %s:%s:%s:%s -o ExitOnForwardFailure=True %s@%s" % (
            self.socket, self.key, self.local_host, self.local_port,
            self.remote_host, self.remote_port, self.jump_user, self.jump_host))
        forward_spec = '{}:{}:{}:{}'.format(
            self.local_host, self.local_port, self.remote_host, self.remote_port)
        exit_status = subprocess.call([
            'ssh', '-MfN',
            '-S', self.socket,
            '-i', self.key,
            '-L', forward_spec,
            '-o', 'ExitOnForwardFailure=True',
            self.jump_user + '@' + self.jump_host,
        ])
        if exit_status != 0:
            raise Exception('SSH tunnel failed with status: {}'.format(exit_status))
        if self.send_control_command('check') != 0:
            raise Exception('SSH tunnel failed to check')
        self.open = True

    def stop(self):
        """Ask the master process to exit; a no-op if the tunnel is not open.

        Raises Exception if the ``exit`` control command fails, in which case
        the tunnel is still considered open.
        """
        if not self.open:
            return
        if self.send_control_command('exit') != 0:
            raise Exception('SSH tunnel failed to exit')
        self.open = False

    def send_control_command(self, cmd):
        """Send control command ``cmd`` over the control socket.

        Returns the exit status of the ssh client. ``subprocess.call`` is
        used rather than ``check_call`` so that a failure is reported through
        the return value, which is what ``start`` and ``stop`` test.
        """
        return subprocess.call(
            ['ssh', '-S', self.socket, '-O', cmd, '-l', self.jump_user, self.jump_host])

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, type, value, traceback):
        self.stop()
94
95#class SshTunnel(threading.Thread):
96# def __init__(self, localip, localport, remoteip, remoteport, proxy_ssh_key, jumpuser, jumphost):
97# threading.Thread.__init__(self)
98# self.localip = localip # Local ip to listen to
99# self.localport = localport # Local port to listen to
100# self.remoteip = remoteip # Remote ip on remotehost
101# self.remoteport = remoteport # Remote port on remotehost
102# self.proxy_ssh_key = proxy_ssh_key
103# self.jumpuser = jumpuser # Remote user on remotehost
104# self.jumphost = jumphost # What host do we send traffic to
105# self.daemon = True # So that thread will exit when
106# # main non-daemon thread finishes
107#
108# def run(self):
109# if subprocess.call([
110# 'ssh', '-N',
111# '-i', self.proxy_ssh_key,
112# '-L', self.localip + ':' + str(self.localport) + ':' + self.remoteip + ':' + str(self.remoteport),
113# jumpuser + '@' + jumphost ]):
114# raise Exception ('ssh tunnel setup failed')
115
class SyncMonitoringChannel(SyncInstanceUsingAnsible):
    """Sync step for MonitoringChannel objects, realised through Ansible.

    Besides running the ``sync_monitoringchannel.yaml`` playbook, this step
    can open an SSH tunnel towards the channel (when ``observer_proxy_ssh``
    is set in the configuration) and tears that tunnel down again when the
    channel is deleted. It also watches Slice objects and re-saves every
    monitoring channel when one changes.
    """
    provides = [MonitoringChannel]
    observes = MonitoringChannel
    requested_interval = 0
    template_name = "sync_monitoringchannel.yaml"
    service_key_name = "/opt/xos/synchronizers/monitoring/monitoring_channel_private_key"
    watches = [ModelLink(Slice, via='slice')]

    def __init__(self, *args, **kwargs):
        super(SyncMonitoringChannel, self).__init__(*args, **kwargs)

    def fetch_pending(self, deleted):
        """Return the MonitoringChannel objects that need to be processed.

        With ``deleted`` true, the deleted tenant objects are returned.
        Otherwise the result is the tenant objects that were never enacted or
        were updated after being enacted, excluding lazy-blocked ones.
        """
        if deleted:
            return MonitoringChannel.get_deleted_tenant_objects()
        return MonitoringChannel.get_tenant_objects().filter(
            Q(enacted__lt=F('updated')) | Q(enacted=None),
            Q(lazy_blocked=False))

    def get_extra_attributes(self, o):
        """Build the extra playbook fields for monitoring channel ``o``.

        The fields carry:
          1) the allowed tenant ids,
          2) the Ceilometer pub/sub endpoint URL ('' when unset),
          3) the credentials used to access the Ceilometer API service.

        Raises Exception when no CeilometerService matches the provider
        service of ``o``.
        """
        ceilometer_services = CeilometerService.get_service_objects().filter(id=o.provider_service.id)
        if not ceilometer_services:
            # The original raised a bare string, which is itself a TypeError.
            raise Exception("No associated Ceilometer service")
        ceilometer_service = ceilometer_services[0]
        ceilometer_pub_sub_url = ceilometer_service.ceilometer_pub_sub_url or ''

        # NOTE(review): the returned instance is not used; the call is kept
        # in case the lookup itself matters (e.g. raising when there is no
        # instance) -- confirm and drop if not.
        self.get_instance(o)

        try:
            full_setup = Config().observer_full_setup
        except Exception:
            # Default to a full setup when the option cannot be read.
            full_setup = True

        return {
            "unique_id": o.id,
            "allowed_tenant_ids": o.tenant_list,
            "auth_url": ceilometer_service.ceilometer_auth_url,
            "admin_user": ceilometer_service.ceilometer_admin_user,
            "admin_password": ceilometer_service.ceilometer_admin_password,
            "admin_tenant": ceilometer_service.ceilometer_admin_tenant,
            "ceilometer_pub_sub_url": ceilometer_pub_sub_url,
            "full_setup": full_setup,
        }

    def sync_fields(self, o, fields):
        """Run the base sync, then open an SSH tunnel for ``o`` if required.

        A tunnel is created only when ``observer_proxy_ssh`` is set in the
        configuration and ``o`` does not already have one. The tunnel handle
        is stored in ``ssh_tunnel_db`` under ``o.id`` and the tunnel details
        are recorded on ``o``.

        Any exception from the base class or from the tunnel setup propagates
        unchanged, keeping its type and traceback.
        """
        super(SyncMonitoringChannel, self).sync_fields(o, fields)

        # Check if an ssh tunnel is needed
        proxy_ssh = getattr(Config(), "observer_proxy_ssh", False)
        if not proxy_ssh or o.ssh_proxy_tunnel:
            return

        proxy_ssh_key = getattr(Config(), "observer_proxy_ssh_key", None)
        if not proxy_ssh_key:
            # Fail with a clear message instead of an opaque TypeError from
            # subprocess when the key path is handed to ssh.
            raise Exception("observer_proxy_ssh is enabled but observer_proxy_ssh_key is not configured")
        proxy_ssh_user = getattr(Config(), "observer_proxy_ssh_user", "root")
        jump_hostname = fields["hostname"]

        # Get the tunnel destination
        remote_host = o.nat_ip
        remote_port = o.ceilometer_port
        # FIXME: For now, the tunnel is set up on the local port equal to the remote port
        local_port = remote_port
        local_ip = socket.gethostbyname(socket.gethostname())

        tunnel = SSHTunnel(local_ip, local_port, proxy_ssh_key, remote_host, remote_port, proxy_ssh_user, jump_hostname)
        tunnel.start()
        logger.info("SSH Tunnel created for Monitoring channel-%s at local port:%s" % (o.id, local_port))

        # FIXME: Store the tunnel handle in the global tunnel database
        ssh_tunnel_db[o.id] = tunnel

        # Update the model with the ssh tunnel info
        o.ssh_proxy_tunnel = True
        o.ssh_tunnel_ip = local_ip
        o.ssh_tunnel_port = local_port

    def run_playbook(self, o, fields):
        """Run the playbook for ``o``; on delete, stop its SSH tunnel first.

        When ``fields`` has a truthy ``delete`` entry and a tunnel is
        registered for ``o.id``, the tunnel is stopped and unregistered
        before the base class playbook runs.
        """
        # The quick-update shortcut below is disabled in the original code:
        # ansible_hash = hashlib.md5(repr(sorted(fields.items()))).hexdigest()
        # quick_update = (o.last_ansible_hash == ansible_hash)
        # if quick_update:
        #     logger.info("quick_update triggered; skipping ansible recipe")
        if fields.get('delete'):
            logger.info("Delete for Monitoring channel-%s is getting synchronized" % (o.id))
            if o.id in ssh_tunnel_db:
                tunnel = ssh_tunnel_db[o.id]
                tunnel.stop()
                logger.info("Deleted SSH Tunnel for Monitoring channel-%s at local port:%s" % (o.id, o.ssh_tunnel_port))
                o.ssh_proxy_tunnel = False
                del ssh_tunnel_db[o.id]
        super(SyncMonitoringChannel, self).run_playbook(o, fields)

        # o.last_ansible_hash = ansible_hash

    def map_delete_inputs(self, o):
        """Return the playbook fields used when ``o`` is being deleted."""
        return {"unique_id": o.id,
                "delete": True}

    def handle_watched_object(self, o):
        """Dispatch a watch notification; only Slice objects are handled."""
        logger.info("handle_watched_object is invoked for object %s" % (str(o)), extra=o.tologdict())
        # Exact type comparison is kept as in the original code.
        if type(o) is Slice:
            self.handle_slice_watch_notification(o)

    def handle_slice_watch_notification(self, sliceobj):
        """Re-save every monitoring channel after a slice change."""
        logger.info("handle_slice_watch_notification: A slice %s is created or updated or deleted" % (sliceobj))
        for obj in MonitoringChannel.get_tenant_objects().all():
            # Save the monitoring channel object to reflect the newly updated slice
            obj.save()
235 pass