blob: d4844e8082fa50b0ddde48e9ec9b32f598ff0b39 [file] [log] [blame]
jcnelson266113a2014-07-16 15:50:27 -04001#!/usr/bin/python
2
3"""
4Define some common methods for the Syndicate observer.
5"""
6import os
7import sys
8import random
9import json
10import time
11import requests
12import traceback
13import base64
14import BaseHTTPServer
15import setproctitle
16import threading
17import urllib
18
19from Crypto.Hash import SHA256 as HashAlg
20from Crypto.PublicKey import RSA as CryptoKey
21from Crypto import Random
22from Crypto.Signature import PKCS1_PSS as CryptoSigner
23
24import logging
25from logging import Logger
26logging.basicConfig( format='[%(levelname)s] [%(module)s:%(lineno)d] %(message)s' )
27logger = logging.getLogger()
28logger.setLevel( logging.INFO )
29
30# get config package
31import syndicatelib_config.config as CONFIG
32
33# get the Syndicate modules
34import syndicate
35
36import syndicate.client.bin.syntool as syntool
37import syndicate.client.common.msconfig as msconfig
38import syndicate.client.common.api as api
39import syndicate.util.storage as syndicate_storage
40import syndicate.util.watchdog as syndicate_watchdog
41import syndicate.util.daemonize as syndicate_daemon
42import syndicate.util.crypto as syndicate_crypto
43import syndicate.util.provisioning as syndicate_provisioning
44import syndicate.syndicate as c_syndicate
45
46# for testing
TESTING = False   # set to True below if the Django/OpenCloud models cannot be imported

class FakeObject(object):
    """
    Blank stand-in object.  Testing code attaches arbitrary attributes to
    instances of it in place of the real Django model classes (see below).
    """
    def __init__(self):
        pass
51
# make the OpenCloud Django code importable, if its location is given
if os.getenv("OPENCLOUD_PYTHONPATH") is not None:
    sys.path.insert(0, os.getenv("OPENCLOUD_PYTHONPATH"))
else:
    logger.warning("No OPENCLOUD_PYTHONPATH set.  Assuming Syndicate models are in your PYTHONPATH")

try:
    # Django needs a settings module before any model import
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")

    # get our models
    import syndicate_storage.models as models

    # get OpenCloud models
    from core.models import Slice,Sliver

    from django.core.exceptions import ObjectDoesNotExist
    from django.db import IntegrityError

except ImportError, ie:
    logger.warning("Failed to import models; some tests will not work")

    # create a fake "models" package that has exactly the members we need for testing.
    # NOTE: only Volume capability constants are stubbed out here; anything else
    # touched by tests will raise AttributeError.
    models = FakeObject()
    models.Volume = FakeObject()
    models.Volume.CAP_READ_DATA = 1
    models.Volume.CAP_WRITE_DATA = 2
    models.Volume.CAP_HOST_DATA = 4

    TESTING = True
80
81
82#-------------------------------
class SyndicateObserverError( Exception ):
    """Base exception raised by the Syndicate observer helpers."""
    pass
85
86#-------------------------------
def get_config():
    """
    Accessor for the imported syndicatelib_config package.
    """
    return CONFIG
92
93
94#-------------------------------
def make_openid_url( email ):
    """
    Build the OpenID identity URL for a user, rooted at the configured
    trust root, under "/id/<email>".
    """
    # NOTE: os.path.join is used for URL construction, which assumes a
    # POSIX-style separator (same behavior as always).
    return os.path.join( CONFIG.SYNDICATE_OPENID_TRUSTROOT, "id", email )
100
101
102#-------------------------------
def connect_syndicate( username=CONFIG.SYNDICATE_OPENCLOUD_USER, password=CONFIG.SYNDICATE_OPENCLOUD_PASSWORD, user_pkey_pem=CONFIG.SYNDICATE_OPENCLOUD_PKEY ):
    """
    Connect to the OpenCloud Syndicate SMI, using the OpenCloud user
    credentials by default.  Returns a syntool client handle.
    """
    # honor CONFIG.DEBUG when present; default to verbose otherwise
    debug = getattr( CONFIG, "DEBUG", True )

    return syntool.Client( username, CONFIG.SYNDICATE_SMI_URL,
                           password=password,
                           user_pkey_pem=user_pkey_pem,
                           debug=debug )
117
118
119#-------------------------------
def opencloud_caps_to_syndicate_caps( cap_read, cap_write, cap_host ):
    """
    Translate the OpenCloud UI capability flags into Syndicate gateway
    capability bits.
    """
    # pair each UI flag with the Syndicate capability bits it implies
    flag_to_bits = [
        (cap_read,  msconfig.GATEWAY_CAP_READ_DATA | msconfig.GATEWAY_CAP_READ_METADATA),
        (cap_write, msconfig.GATEWAY_CAP_WRITE_DATA | msconfig.GATEWAY_CAP_WRITE_METADATA),
        (cap_host,  msconfig.GATEWAY_CAP_COORDINATE),
    ]

    syn_caps = 0
    for enabled, bits in flag_to_bits:
        if enabled:
            syn_caps |= bits

    return syn_caps
134
135#-------------------------------
def ensure_user_exists( user_email, **user_kw ):
    """
    Make sure the Syndicate user corresponding to the given OpenCloud user
    exists on the MS.  This method does NOT create any OpenCloud-specific data.

    Return (created, user), where created is True iff the user was newly
    created (False if it already existed).
    Raise an exception on error.
    """
    openid_url = make_openid_url( user_email )
    return syndicate_provisioning.ensure_user_exists( connect_syndicate(), user_email, openid_url, **user_kw )
151
152
153#-------------------------------
def ensure_user_absent( user_email ):
    """
    Delete the Syndicate user record associated with the given OpenCloud user.
    OpenCloud-specific data is left untouched.

    Returns True on success; raises an exception on error.
    """
    return connect_syndicate().delete_user( user_email )
166
167
168#-------------------------------
def make_volume_principal_id( user_email, volume_name ):
    """
    Build the principal ID for a Volume owner ("volume_<name>.<email>").
    """
    # URL-quote the volume name so separator characters cannot leak into the ID
    return "volume_%s.%s" % (urllib.quote( volume_name ), user_email)
177
178
179#-------------------------------
def make_slice_principal_id( user_email, slice_name ):
    """
    Build the principal ID for a slice owner ("slice_<name>.<email>").
    """
    slice_name_safe = urllib.quote( slice_name )

    # BUG FIX: interpolate the *quoted* slice name.  The original computed
    # slice_name_safe but then used the raw slice_name, so unquoted characters
    # could leak into the principal ID (compare make_volume_principal_id).
    return "slice_%s.%s" % (slice_name_safe, user_email)
188
189
190#-------------------------------
def ensure_principal_exists( user_email, observer_secret, **user_kw ):
    """
    Ensure that a Syndicate user exists, as well as its OpenCloud-specific data.

    Return (True, (None OR user)) on success; a user is returned if one was created.
    Return (False, None) on error.
    """
    try:
        created, new_user = ensure_user_exists( user_email, **user_kw )
    except Exception:
        traceback.print_exc()
        logger.error("Failed to ensure user '%s' exists" % user_email )
        return (False, None)

    if not created:
        # user already existed; nothing to store
        return (True, new_user)

    # newly-created user: seal its credentials and store them in the Django DB
    try:
        rc = put_principal_data( user_email, observer_secret, new_user['signing_public_key'], new_user['signing_private_key'] )
        assert rc == True, "Failed to save SyndicatePrincipal"
    except Exception:
        traceback.print_exc()
        logger.error("Failed to save private key for principal %s" % (user_email))
        return (False, None)

    return (True, new_user)
217
218
219
220#-------------------------------
def ensure_principal_absent( user_email ):
    """
    Remove a Syndicate user along with its OpenCloud-specific data.

    Always returns True.
    """
    # delete the MS-side user record first, then the local sealed credentials
    ensure_user_absent( user_email )
    delete_principal_data( user_email )
    return True
231
232#-------------------------------
def ensure_volume_exists( user_email, opencloud_volume, user=None ):
    """
    Given the email address of a user, ensure that the given
    Volume exists and is owned by that user.
    Do not try to ensure that the user exists.

    Return the Volume if we created it, or return None if we did not.
    Raise an exception on error (including: transport errors, and the volume
    existing but being owned by someone else).
    """
    client = connect_syndicate()

    try:
        volume = client.read_volume( opencloud_volume.name )
    except Exception, e:
        # transport error
        logger.exception(e)
        raise e

    if volume is None:
        # the volume does not exist....try to create it
        vol_name = opencloud_volume.name
        vol_blocksize = opencloud_volume.blocksize
        vol_description = opencloud_volume.description
        vol_private = opencloud_volume.private
        vol_archive = opencloud_volume.archive
        vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )

        try:
            # NOTE: store_private_key=False keeps the volume key out of the MS;
            # "MAKE_METADATA_KEY" asks the MS to generate the metadata key itself
            vol_info = client.create_volume( user_email, vol_name, vol_description, vol_blocksize,
                                             private=vol_private,
                                             archive=vol_archive,
                                             active=True,
                                             default_gateway_caps=vol_default_gateway_caps,
                                             store_private_key=False,
                                             metadata_private_key="MAKE_METADATA_KEY" )

        except Exception, e:
            # transport error
            logger.exception(e)
            raise e

        else:
            # successfully created the volume!
            return vol_info

    else:

        # volume already exists.  Verify its owned by this user.
        if user is None:
            try:
                user = client.read_user( volume['owner_id'] )
            except Exception, e:
                # transport error, or user doesn't exist (either is unacceptable)
                logger.exception(e)
                raise e

        if user is None or user['email'] != user_email:
            raise Exception("Volume '%s' already exists, but is NOT owned by '%s'" % (opencloud_volume.name, user_email) )

        # we're good!
        return None
294
295
296#-------------------------------
def ensure_volume_absent( volume_name ):
    """
    Delete the Syndicate Volume with the given name, if it exists.

    Idempotent: returns True even if the Volume was already absent.
    """
    client = connect_syndicate()
    return client.delete_volume( volume_name )
307
308
309#-------------------------------
def update_volume( opencloud_volume ):
    """
    Push an OpenCloud Volume model's attributes to the corresponding
    Syndicate Volume.  Fails if the Volume does not exist in Syndicate.

    Return True on success; False on transport or method error.
    """
    client = connect_syndicate()

    new_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data,
                                                 opencloud_volume.cap_write_data,
                                                 opencloud_volume.cap_host_data )

    try:
        rc = client.update_volume( opencloud_volume.name,
                                   description=opencloud_volume.description,
                                   private=opencloud_volume.private,
                                   archive=opencloud_volume.archive,
                                   default_gateway_caps=new_caps )

        if not rc:
            raise Exception("update_volume(%s) failed!" % opencloud_volume.name )

    except Exception as e:
        # transport or method error
        logger.exception(e)
        return False

    return True
341
342
343#-------------------------------
def ensure_volume_access_right_exists( user_email, volume_name, caps, allowed_gateways=None ):
    """
    Ensure that a particular user has particular access to a particular volume.
    Do not try to ensure that the user or volume exist, however!

    allowed_gateways defaults to [msconfig.GATEWAY_TYPE_UG] when not given.
    """
    # BUG FIX: resolve the default here instead of in the signature, so the
    # (mutable) list is not shared across calls and msconfig is not evaluated
    # at function-definition time.  Behavior for all existing callers is unchanged.
    if allowed_gateways is None:
        allowed_gateways = [msconfig.GATEWAY_TYPE_UG]

    client = connect_syndicate()
    return syndicate_provisioning.ensure_volume_access_right_exists( client, user_email, volume_name, caps, allowed_gateways )
351
352#-------------------------------
def ensure_volume_access_right_absent( user_email, volume_name ):
    """
    Ensure that a user's access to a particular volume is revoked.
    """
    return syndicate_provisioning.ensure_volume_access_right_absent( connect_syndicate(), user_email, volume_name )
359
360
361#-------------------------------
def setup_volume_access( user_email, volume_name, caps, RG_port, slice_secret, RG_closure=None ):
    """
    Set up the Volume to allow the slice to provision UGs in it, and to fire up RGs.
      * create the Volume Access Right for the user, so (s)he can create Gateways.
      * provision a single Replica Gateway, serving on localhost.

    Return True on success; False on error.
    """
    client = connect_syndicate()

    # grant the user the right to create gateways in the volume
    try:
        rc = ensure_volume_access_right_exists( user_email, volume_name, caps )
        assert rc is True, "Failed to create access right for %s in %s" % (user_email, volume_name)
    except Exception as e:
        logger.exception(e)
        return False

    # derive a deterministic RG name/key password for this volume on localhost
    RG_name = syndicate_provisioning.make_gateway_name( "OpenCloud", "RG", volume_name, "localhost" )
    RG_key_password = syndicate_provisioning.make_gateway_private_key_password( RG_name, slice_secret )

    try:
        syndicate_provisioning.ensure_RG_exists( client, user_email, volume_name, RG_name, "localhost", RG_port, RG_key_password, closure=RG_closure )
    except Exception as e:
        logger.exception(e)
        return False

    return True
388
389
390#-------------------------------
def teardown_volume_access( user_email, volume_name ):
    """
    Revoke access to a Volume for a User.
      * remove the user's Volume Access Right
      * remove the user's gateways

    Return True on success; False on error.
    """
    client = connect_syndicate()

    # removing the user from the volume both blocks new gateway creation
    # and deletes the existing gateways
    try:
        rc = client.remove_user_from_volume( user_email, volume_name )
        assert rc is True, "Failed to remove access right for %s in %s" % (user_email, volume_name)
    except Exception as e:
        logger.exception(e)
        return False

    return True
409
410
411#-------------------------------
def create_sealed_and_signed_blob( private_key_pem, secret, data ):
    """
    Seal data with a shared secret, then sign the sealed payload with a
    private key.  Return the serialized message, or None on error.
    """
    logger.info("Sealing credential data")

    rc, sealed = c_syndicate.password_seal( data, secret )
    if rc != 0:
        logger.error("Failed to seal data with the secret, rc = %s" % rc)
        return None

    signed = syndicate_crypto.sign_and_serialize_json( private_key_pem, sealed )
    if signed is None:
        logger.error("Failed to sign credential")
        return None

    return signed
431
432
433#-------------------------------
def verify_and_unseal_blob( public_key_pem, secret, blob_data ):
    """
    Verify a serialized JSON blob's signature with a public key, then
    unseal its payload with the shared secret.

    Return the plaintext on success; None on error.
    """
    rc, sealed = syndicate_crypto.verify_and_parse_json( public_key_pem, blob_data )
    if rc != 0:
        logger.error("Failed to verify and parse blob, rc = %s" % rc)
        return None

    logger.info("Unsealing credential data")

    rc, plaintext = c_syndicate.password_unseal( sealed, secret )
    if rc != 0:
        logger.error("Failed to unseal blob, rc = %s" % rc )
        return None

    return plaintext
453
454
455#-------------------------------
def create_volume_list_blob( private_key_pem, slice_secret, volume_list ):
    """
    Serialize a list of volume names, seal it with the slice secret, and
    sign it with the private key.

    Return the sealed blob; None on error.
    """
    serialized = json.dumps( {"volumes": volume_list} )

    blob = create_sealed_and_signed_blob( private_key_pem, slice_secret, serialized )
    if blob is None:
        logger.error("Failed to seal volume list")
        return None

    return blob
472
473
474#-------------------------------
def create_slice_credential_blob( private_key_pem, slice_name, slice_secret, syndicate_url, volume_name, volume_owner, UG_port, user_pkey_pem ):
    """
    Create a sealed, signed, encoded slice credentials blob.

    The blob carries everything a sliver needs to join the volume:
    the MS URL, volume identity/owner, slice identity, the UG port,
    and the principal's private key.

    Return the sealed blob; None on error.
    """

    # create and serialize the data
    cred_data = {
        "syndicate_url":       syndicate_url,
        "volume_name":         volume_name,
        "volume_owner":        volume_owner,
        "slice_name":          slice_name,
        "slice_UG_port":       UG_port,
        "principal_pkey_pem":  user_pkey_pem,
    }

    cred_data_str = json.dumps( cred_data )

    msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, cred_data_str )
    if msg is None:
        # BUG FIX: the original logged "Failed to seal volume list" here,
        # copy-pasted from create_volume_list_blob
        logger.error("Failed to seal slice credentials")
        return None

    return msg
498
499
500#-------------------------------
def put_principal_data( user_email, observer_secret, public_key_pem, private_key_pem ):
    """
    Seal and store the principal's private key into the database, in a SyndicatePrincipal object,
    so the sliver-side Syndicate daemon syndicated.py can get them later.
    Overwrite an existing principal if one exists.

    Returns True on success; False if the key could not be sealed.
    """

    # seal the private key with the observer secret, signed by the key itself
    sealed_private_key = create_sealed_and_signed_blob( private_key_pem, observer_secret, private_key_pem )
    if sealed_private_key is None:
        return False

    try:
        sp = models.SyndicatePrincipal( sealed_private_key=sealed_private_key, public_key_pem=public_key_pem, principal_id=user_email )
        sp.save()
    except IntegrityError, e:
        # NOTE(review): this deletes and re-saves the *new* (never-saved) object,
        # not the pre-existing row that caused the IntegrityError; unless
        # principal_id is the model's primary key, the old record is probably
        # not replaced here -- verify against the SyndicatePrincipal model.
        logger.error("WARN: overwriting existing principal %s" % user_email)
        sp.delete()
        sp.save()

    return True
521
522
523#-------------------------------
def delete_principal_data( user_email ):
    """
    Delete the SyndicatePrincipal record for a user, if one exists.

    Always returns True.
    """
    principal = get_principal_data( user_email )
    if principal is not None:
        principal.delete()

    return True
534
535
536#-------------------------------
def get_principal_data( user_email ):
    """
    Look up the SyndicatePrincipal record for a user in the database.

    Return the record, or None if it does not exist.
    """
    try:
        return models.SyndicatePrincipal.objects.get( principal_id=user_email )
    except ObjectDoesNotExist:
        logger.error("No SyndicatePrincipal record for %s" % user_email)
        return None
548
549
550
551#-------------------------------
def get_principal_pkey( user_email, observer_secret ):
    """
    Fetch and unseal the private key of a SyndicatePrincipal.

    Return the PEM-encoded private key; None if the principal is missing
    or the key cannot be unsealed.
    """
    sp = get_principal_data( user_email )
    if sp is None:
        logger.error("Failed to find private key for principal %s" % user_email )
        return None

    # verify the sealed key against the stored public key, then
    # decrypt it with the observer secret
    private_key_pem = verify_and_unseal_blob( sp.public_key_pem, observer_secret, sp.sealed_private_key )
    if private_key_pem is None:
        logger.error("Failed to unseal private key")

    return private_key_pem
571
572
573#-------------------------------
def get_private_key_pem( pkey_path ):
    """
    Load a private key from storage and return it PEM-encoded,
    or None if it could not be read.
    """
    observer_pkey = syndicate_storage.read_private_key( pkey_path )
    if observer_pkey is not None:
        return observer_pkey.exportKey()

    logger.error("Failed to load Observer private key")
    return None
588
589
590#-------------------------------
def encrypt_slice_secret( observer_pkey_pem, slice_secret ):
    """
    Encrypt the slice secret with the Observer key pair and return it
    base64-serialized, or None on error.
    """
    # derive the public key from the private key
    try:
        observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
    except Exception as e:
        logger.exception(e)
        logger.error("Failed to derive public key from private key")
        return None

    # encrypt the data
    rc, sealed_slice_secret = c_syndicate.encrypt_data( observer_pkey_pem, observer_pubkey_pem, slice_secret )
    if rc != 0:
        logger.error("Failed to encrypt slice secret")
        return None

    return base64.b64encode( sealed_slice_secret )
614
615
616#-------------------------------
def decrypt_slice_secret( observer_pkey_pem, sealed_slice_secret_b64 ):
    """
    Base64-decode and decrypt a sealed slice secret.

    Return the plaintext secret; None on error.
    """
    # derive the public key from the private key
    try:
        observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
    except Exception as e:
        logger.exception(e)
        logger.error("Failed to derive public key from private key")
        return None

    sealed_slice_secret = base64.b64decode( sealed_slice_secret_b64 )

    # decrypt it
    rc, slice_secret = c_syndicate.decrypt_data( observer_pubkey_pem, observer_pkey_pem, sealed_slice_secret )
    if rc != 0:
        logger.error("Failed to decrypt '%s', rc = %d" % (sealed_slice_secret_b64, rc))
        return None

    return slice_secret
640
641
642#--------------------------------
def get_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
    """
    Fetch the shared secret for a slice from the Django DB.

    Look up by foreign key when slice_fk is given; by slice name otherwise.
    Return the secret, or None if no record exists.
    """
    try:
        if slice_fk is not None:
            ss = models.SliceSecret.objects.get( slice_id=slice_fk )
        else:
            ss = models.SliceSecret.objects.get( slice_id__name=slice_name )
    except ObjectDoesNotExist:
        logger.error("Failed to load slice secret for (%s, %s)" % (slice_fk, slice_name) )
        return None

    return ss.secret
661
662
663#-------------------------------
def put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=None, opencloud_slice=None ):
    """
    Store the shared secret for a slice in the Django DB.

    NOTE(review): despite the original description ("encrypting it first"),
    the secret is stored exactly as given and observer_pkey_pem is unused
    here -- confirm whether callers are expected to pass an already-encrypted
    secret.

    Return True on success; False if the slice could not be loaded.
    """

    ss = None

    if opencloud_slice is None:
        # caller did not supply the slice model; look it up by FK or by name
        try:
            if slice_fk is None:
                opencloud_slice = models.Slice.objects.get( name=slice_name )
            else:
                opencloud_slice = models.Slice.objects.get( id=slice_fk.id )
        except Exception, e:
            logger.exception(e)
            logger.error("Failed to load slice (%s, %s)" % (slice_fk, slice_name) )
            return False

    ss = models.SliceSecret( slice_id=opencloud_slice, secret=slice_secret )

    ss.save()

    return True
688
689
690#-------------------------------
def get_or_create_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
    """
    Get a slice secret if it already exists, or generate and store one if not.

    Raise SyndicateObserverError if a newly-generated secret cannot be stored.
    """
    slice_secret = get_slice_secret( observer_pkey_pem, slice_name, slice_fk=slice_fk )
    if slice_secret is None or len(slice_secret) == 0:

        # SECURITY FIX: generate the secret with an OS-backed CSPRNG.
        # The original used random.sample on the default (predictable) RNG,
        # which additionally never repeats a character, reducing entropy.
        strong_rng = random.SystemRandom()
        alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
        slice_secret = "".join( strong_rng.choice(alphabet) for _ in range(32) )

        # store it
        rc = put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=slice_fk )

        if not rc:
            raise SyndicateObserverError("Failed to create slice secret for (%s, %s)" % (slice_fk, slice_name))

    return slice_secret
709
710
711#-------------------------------
def generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
    """
    Generate and return the set of credentials to be sent off to the slice VMs.
    existing_user is a Syndicate user, as a dictionary.

    Return the sealed credentials blob on success.
    Return None on failure.
    """

    # get the user's private key
    logger.info("Obtaining private key for %s" % user_email)

    # it might be in the existing_user...
    user_pkey_pem = None
    if existing_user is not None:
        user_pkey_pem = existing_user.get('signing_private_key', None)

    # no luck?  fall back to the sealed copy in the Django DB
    if user_pkey_pem is None:
        try:
            user_pkey_pem = get_principal_pkey( user_email, observer_secret )
            assert user_pkey_pem is not None, "No private key for %s" % user_email

        # BUG FIX: was a bare "except:", which would also swallow
        # SystemExit and KeyboardInterrupt
        except Exception:
            traceback.print_exc()
            logger.error("Failed to get private key; cannot generate credentials for %s in %s" % (user_email, volume_name) )
            return None

    # generate a credentials blob
    logger.info("Generating credentials for %s's slice" % (user_email))
    try:
        creds = create_slice_credential_blob( observer_pkey_pem, slice_name, slice_secret, syndicate_url, volume_name, user_email, UG_port, user_pkey_pem )
        assert creds is not None, "Failed to create credentials for %s" % user_email

    except Exception:
        traceback.print_exc()
        logger.error("Failed to generate credentials for %s in %s" % (user_email, volume_name))
        return None

    return creds
752
753
754#-------------------------------
def save_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
    """
    Create a credentials blob and save it to the matching VolumeSlice record.

    Return the credentials on success; None on failure.
    """
    creds = generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=existing_user )

    if creds is None:
        logger.error("Failed to generate credentials for %s, %s" % (volume_name, slice_name))
        return None

    vs = get_volumeslice( volume_name, slice_name )
    if vs is None:
        logger.error("Failed to look up VolumeSlice(%s, %s)" % (volume_name, slice_name))
        return None

    # persist the blob on the VolumeSlice
    vs.credentials_blob = creds
    vs.save()

    return creds
782
783
784#-------------------------------
def get_volumeslice_volume_names( slice_name ):
    """
    Return the names of every Volume mounted in the given slice,
    or None if the datastore query fails.
    """
    try:
        attachments = models.VolumeSlice.objects.filter( slice_id__name = slice_name )
        return [vs.volume_id.name for vs in attachments]
    except Exception as e:
        logger.exception(e)
        logger.error("Failed to query datastore for volumes mounted in %s" % slice_name)
        return None
800
801
802#-------------------------------
def get_volumeslice( volume_name, slice_name ):
    """
    Get a VolumeSlice record from the datastore.

    Return the record, or None if it does not exist or the query fails.
    """
    try:
        vs = models.VolumeSlice.objects.get( volume_id__name = volume_name, slice_id__name = slice_name )
        return vs
    except Exception as e:
        logger.exception(e)
        # BUG FIX: the original guard used "slice_name is not None or len(slice_name) > 0",
        # which evaluates len(None) (TypeError) when slice_name is None.  A simple
        # truthiness check covers both the None and empty-string cases.
        logger.error("Failed to query datastore for volumes (mounted in %s)" % (slice_name if slice_name else "UNKNOWN"))
        return None
814
815
816#-------------------------------
def do_push( sliver_hosts, portnum, payload ):
    """
    Push a payload to a list of slivers via HTTP POST.
    NOTE: this has to be done in one go, since we can't import grequests
    into the global namespace (without wrecking havoc on the credential server),
    but it has to stick around for the push to work.

    Returns True; per-host failures are logged and skipped.
    """

    global TESTING, CONFIG

    from gevent import monkey

    if TESTING:
        monkey.patch_all()

    else:
        # make gevents runnabale from multiple threads (or Django will complain)
        monkey.patch_all(socket=True, dns=True, time=True, select=True, thread=False, os=True, ssl=True, httplib=False, aggressive=True)

    # NOTE: grequests must be imported after monkey-patching
    import grequests

    # fan-out
    # NOTE(review): this local list shadows the module-level "requests" import
    requests = []
    for sh in sliver_hosts:
        rs = grequests.post( "http://" + sh + ":" + str(portnum), data={"observer_message": payload}, timeout=getattr(CONFIG, "SYNDICATE_HTTP_PUSH_TIMEOUT", 60) )
        requests.append( rs )

    # fan-in
    responses = grequests.map( requests )

    assert len(responses) == len(requests), "grequests error: len(responses) != len(requests)"

    for i in xrange(0,len(requests)):
        resp = responses[i]
        req = requests[i]

        if resp is None:
            # connection failure; log it and keep going
            logger.error("Failed to connect to %s" % (req.url))
            continue

        # verify they all worked
        if resp.status_code != 200:
            logger.error("Failed to POST to %s, status code = %s" % (resp.url, resp.status_code))
            continue

    return True
863
864
865#-------------------------------
def get_slice_hostnames( slice_name ):
    """
    Query the Django DB and get the list of hosts running in a slice.

    Returns a list of node hostnames.
    """

    openstack_slice = Slice.objects.get( name=slice_name )
    # NOTE(review): Django's .get() raises DoesNotExist rather than returning
    # None, so this branch is likely unreachable -- confirm intended behavior
    if openstack_slice is None:
        logger.error("No such slice '%s'" % slice_name)
        return None

    hostnames = [s.node.name for s in openstack_slice.slivers.all()]

    return hostnames
879
880
881#-------------------------------
def push_credentials_to_slice( slice_name, payload ):
    """
    Push a credentials payload to every VM in a slice.
    """
    return do_push( get_slice_hostnames( slice_name ), CONFIG.SYNDICATE_SLIVER_PORT, payload )
888
889
890#-------------------------------
class CredentialServerHandler( BaseHTTPServer.BaseHTTPRequestHandler ):
    """
    HTTP server handler that allows syndicated.py instances to poll
    for volume state.

    NOTE: this is a fall-back mechanism.  The observer should push new
    volume state to the slices' slivers.  However, if that fails, the
    slivers are configured to poll for volume state periodically.  This
    server allows them to do just that.

    Responses:
       GET /<slicename>               -- Reply with the signed sealed list of volume names, encrypted by the slice secret
       GET /<slicename>/<volumename>  -- Reply with the signed sealed volume access credentials, encrypted by the slice secret

       !!! TEMPORARY !!!
       GET /<slicename>/SYNDICATE_SLICE_SECRET    -- Reply with the slice secret (TEMPORARY)


    NOTE: We want to limit who can learn which Volumes a slice can access, so we'll seal its slivers'
    credentials with the SliceSecret secret.  The slivers (which have the slice-wide secret) can then decrypt it.
    However, sealing the listing is a time-consuming process (on the order of 10s), so we only want
    to do it when we have to.  Since *anyone* can ask for the ciphertext of the volume list,
    we will cache the list ciphertext for each slice for a long-ish amount of time, so we don't
    accidentally DDoS this server.  This necessarily means that the sliver might see a stale
    volume listing, but that's okay, since the Observer is eventually consistent anyway.
    """

    cached_volumes_json = {}                  # map slice_name --> (volume name, timeout)
    cached_volumes_json_lock = threading.Lock()

    CACHED_VOLUMES_JSON_LIFETIME = 3600        # one hour

    SLICE_SECRET_NAME = "SYNDICATE_SLICE_SECRET"

    def parse_request_path( self, path ):
        """
        Parse the URL path into a slice name and (possibly) a volume name or SLICE_SECRET_NAME

        Returns (slice_name, volume_name), where either or both may be None
        if the path is invalid.
        """
        path_parts = path.strip("/").split("/")

        # NOTE(review): str.split always yields at least one element, so this
        # branch appears unreachable; the empty-string case is caught below
        if len(path_parts) == 0:
            # invalid
            return (None, None)

        if len(path_parts) > 2:
            # invalid
            return (None, None)

        slice_name = path_parts[0]
        if len(slice_name) == 0:
            # empty string is invalid
            return (None, None)

        volume_name = None

        if len(path_parts) > 1:
            volume_name = path_parts[1]

        return slice_name, volume_name


    def reply_data( self, data, datatype="application/json" ):
        """
        Give back a 200 response with data.
        """
        self.send_response( 200 )
        self.send_header( "Content-Type", datatype )
        self.send_header( "Content-Length", len(data) )
        self.end_headers()

        self.wfile.write( data )
        return


    def get_volumes_message( self, private_key_pem, observer_secret, slice_name ):
        """
        Get the json-ized list of volumes this slice is attached to.
        Check the cache, evict stale data if necessary, and on miss,
        regenerate the slice volume list.

        Returns the sealed volume-list blob, or None on error.
        """

        # block the cache.
        # NOTE: don't release the lock until we've generated credentials.
        # Chances are, there's a thundering herd of slivers coming online.
        # Block them all until we've generated their slice's credentials,
        # and then serve them the cached one.

        self.cached_volumes_json_lock.acquire()

        ret = None
        volume_list_json, cache_timeout = self.cached_volumes_json.get( slice_name, (None, None) )

        if (cache_timeout is not None) and cache_timeout < time.time():
            # expired
            volume_list_json = None

        if volume_list_json is None:
            # generate a new list and cache it.

            volume_names = get_volumeslice_volume_names( slice_name )
            if volume_names is None:
                # nothing to do...
                ret = None

            else:
                # get the slice secret
                slice_secret = get_slice_secret( private_key_pem, slice_name )

                if slice_secret is None:
                    # no such slice
                    logger.error("No slice secret for %s" % slice_name)
                    ret = None

                else:
                    # seal and sign
                    ret = create_volume_list_blob( private_key_pem, slice_secret, volume_names )

            # cache this
            if ret is not None:
                self.cached_volumes_json[ slice_name ] = (ret, time.time() + self.CACHED_VOLUMES_JSON_LIFETIME )

        else:
            # hit the cache
            ret = volume_list_json

        self.cached_volumes_json_lock.release()

        return ret


    def do_GET( self ):
        """
        Handle one GET request.

        Dispatches on the parsed path: slice secret, volume list, or a
        single volume's credentials (see the class docstring for the URL
        scheme and response codes).
        """
        slice_name, volume_name = self.parse_request_path( self.path )

        # valid request?
        if volume_name is None and slice_name is None:
            self.send_error( 400 )

        # slice secret request?
        elif volume_name == self.SLICE_SECRET_NAME and slice_name is not None:

            # get the slice secret
            ret = get_slice_secret( self.server.private_key_pem, slice_name )

            if ret is not None:
                self.reply_data( ret )
                return
            else:
                self.send_error( 404 )

        # volume list request?
        elif volume_name is None and slice_name is not None:

            # get the list of volumes for this slice
            ret = self.get_volumes_message( self.server.private_key_pem, self.server.observer_secret, slice_name )

            if ret is not None:
                self.reply_data( ret )
                return
            else:
                self.send_error( 404 )

        # volume credential request?
        elif volume_name is not None and slice_name is not None:

            # get the VolumeSlice record
            vs = get_volumeslice( volume_name, slice_name )
            if vs is None:
                # not found
                self.send_error( 404 )
                return

            else:
                ret = vs.credentials_blob
                if ret is not None:
                    self.reply_data( vs.credentials_blob )
                else:
                    # not generated???
                    # NOTE(review): debugging prints left in; 503 tells the
                    # sliver to retry later
                    print ""
                    print vs
                    print ""
                    self.send_error( 503 )
                return

        else:
            # shouldn't get here...
            self.send_error( 500 )
            return
1081
1082
1083#-------------------------------
class CredentialServer( BaseHTTPServer.HTTPServer ):
   """
   HTTP server for handing out slice and volume credentials.
   Stashes the observer's private key and shared secret on the server
   object, where the request handler can reach them via self.server.
   """

   def __init__(self, private_key_pem, observer_secret, server, req_handler ):
      # keep the key material where the handler expects to find it
      self.private_key_pem, self.observer_secret = private_key_pem, observer_secret

      # delegate socket setup to the stock HTTPServer
      BaseHTTPServer.HTTPServer.__init__( self, server, req_handler )
1090
1091
1092#-------------------------------
def credential_server_spawn( old_exit_status ):
   """
   Start our credential server (i.e. in a separate process, started by the watchdog).
   Loads the observer private key, builds a CredentialServer on the configured
   port, and serves requests until killed.  Exits with 255 if the key is missing.
   """

   setproctitle.setproctitle( "syndicate-credential-server" )

   observer_key = syndicate_storage.read_private_key( CONFIG.SYNDICATE_PRIVATE_KEY )
   if observer_key is None:
      # exit code 255 will be ignored...
      logger.error("Cannot load private key. Exiting...")
      sys.exit(255)

   logger.info("Starting Syndicate Observer credential server on port %s" % CONFIG.SYNDICATE_HTTP_PORT)

   server = CredentialServer( observer_key.exportKey(),
                              CONFIG.SYNDICATE_OPENCLOUD_SECRET,
                              ('', CONFIG.SYNDICATE_HTTP_PORT),
                              CredentialServerHandler )
   server.serve_forever()
1110
1111
1112#-------------------------------
1113def ensure_credential_server_running( foreground=False, run_once=False ):
1114 """
1115 Instantiate our credential server and keep it running.
1116 """
1117
1118 # is the watchdog running?
1119 pids = syndicate_watchdog.find_by_attrs( "syndicate-credential-server-watchdog", {} )
1120 if len(pids) > 0:
1121 # it's running
1122 return True
1123
1124 if foreground:
1125 # run in foreground
1126
1127 if run_once:
1128 return credential_server_spawn( 0 )
1129
1130 else:
1131 return syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) )
1132
1133
1134 # not running, and not foregrounding. fork a new one
1135 try:
1136 watchdog_pid = os.fork()
1137 except OSError, oe:
1138 logger.error("Failed to fork, errno = %s" % oe.errno)
1139 return False
1140
1141 if watchdog_pid != 0:
1142
1143 # child--become watchdog
1144 setproctitle.setproctitle( "syndicate-credential-server-watchdog" )
1145
1146 if run_once:
1147 syndicate_daemon.daemonize( lambda: credential_server_spawn(0), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
1148
1149 else:
1150 syndicate_daemon.daemonize( lambda: syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) ), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
1151
1152
1153#-------------------------------
1154# Begin functional tests.
1155# Any method starting with ft_ is a functional test.
1156#-------------------------------
1157
1158#-------------------------------
1159def ft_syndicate_access():
1160 """
1161 Functional tests for ensuring objects exist and don't exist in Syndicate.
1162 """
1163
1164 fake_user = FakeObject()
1165 fake_user.email = "fakeuser@opencloud.us"
1166
1167 print "\nensure_user_exists(%s)\n" % fake_user.email
1168 ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
1169
1170 print "\nensure_user_exists(%s)\n" % fake_user.email
1171 ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
1172
1173 fake_volume = FakeObject()
1174 fake_volume.name = "fakevolume"
1175 fake_volume.description = "This is a fake volume, created for funtional testing"
1176 fake_volume.blocksize = 1024
1177 fake_volume.cap_read_data = True
1178 fake_volume.cap_write_data = True
1179 fake_volume.cap_host_data = False
1180 fake_volume.archive = False
1181 fake_volume.private = True
1182
1183 # test idempotency
1184 print "\nensure_volume_exists(%s)\n" % fake_volume.name
1185 ensure_volume_exists( fake_user.email, fake_volume )
1186
1187 print "\nensure_volume_exists(%s)\n" % fake_volume.name
1188 ensure_volume_exists( fake_user.email, fake_volume )
1189
1190 print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
1191 ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
1192
1193 print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
1194 ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
1195
1196 print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
1197 ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
1198
1199 print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
1200 ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
1201
1202 print "\nensure_volume_absent(%s)\n" % fake_volume.name
1203 ensure_volume_absent( fake_volume.name )
1204
1205 print "\nensure_volume_absent(%s)\n" % fake_volume.name
1206 ensure_volume_absent( fake_volume.name )
1207
1208 print "\nensure_user_absent(%s)\n" % fake_user.email
1209 ensure_user_absent( fake_user.email )
1210
1211 print "\nensure_user_absent(%s)\n" % fake_user.email
1212 ensure_user_absent( fake_user.email )
1213
1214
1215
1216
1217 print "\nensure_principal_exists(%s)\n" % fake_user.email
1218 ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
1219
1220 print "\nensure_principal_exists(%s)\n" % fake_user.email
1221 ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
1222
1223 print "\nensure_volume_exists(%s)\n" % fake_volume.name
1224 ensure_volume_exists( fake_user.email, fake_volume )
1225
1226 print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
1227 setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
1228
1229 print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
1230 setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
1231
1232 print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
1233 teardown_volume_access( fake_user.email, fake_volume.name )
1234
1235 print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
1236 teardown_volume_access( fake_user.email, fake_volume.name )
1237
1238 print "\nensure_volume_absent(%s)\n" % fake_volume.name
1239 ensure_volume_absent( fake_volume.name )
1240
1241 print "\nensure_principal_absent(%s)\n" % fake_user.email
1242 ensure_principal_absent( fake_user.email )
1243
1244
1245
1246#-------------------------------
1247def ft_volumeslice( slice_name ):
1248 """
1249 Functional tests for reading VolumeSlice information
1250 """
1251 print "slice: %s" % slice_name
1252
1253 volumes = get_volumeslice_volume_names( slice_name )
1254
1255 print "volumes mounted in slice %s:" % slice_name
1256 for v in volumes:
1257 print " %s:" % v
1258
1259 vs = get_volumeslice( v, slice_name )
1260
1261 print " %s" % dir(vs)
1262
1263
1264#-------------------------------
1265def ft_get_slice_hostnames( slice_name ):
1266 """
1267 Functional tests for getting slice hostnames
1268 """
1269
1270 print "Get slice hostnames for %s" % slice_name
1271
1272 hostnames = get_slice_hostnames( slice_name )
1273 import pprint
1274
1275 pp = pprint.PrettyPrinter()
1276
1277 pp.pprint( hostnames )
1278
1279
1280#-------------------------------
1281def ft_syndicate_principal():
1282 """
1283 Functional tests for creating, reading, and deleting SyndicatePrincipals.
1284 """
1285 print "generating key pair"
1286 pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
1287
1288 user_email = "fakeuser@opencloud.us"
1289
1290 print "saving principal"
1291 put_principal_data( user_email, "asdf", pubkey_pem, privkey_pem )
1292
1293 print "fetching principal private key"
1294 saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
1295
1296 assert saved_privkey_pem is not None, "Could not fetch saved private key"
1297 assert saved_privkey_pem == privkey_pem, "Saved private key does not match actual private key"
1298
1299 print "delete principal"
1300
1301 delete_principal_data( user_email )
1302
1303 print "make sure its deleted..."
1304
1305 saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
1306
1307 assert saved_privkey_pem is None, "Principal key not deleted"
1308
1309
1310#-------------------------------
def ft_credential_server():
   """
   Functional test for the credential server
   """
   # run one server instance in this process, in the foreground
   ensure_credential_server_running( foreground=True, run_once=True )
1316
1317
1318#-------------------------------
1319def ft_seal_and_unseal():
1320 """
1321 Functional test for sealing/unsealing data
1322 """
1323 print "generating key pair"
1324 pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
1325
1326 sealed_buf = create_sealed_and_signed_blob( privkey_pem, "foo", "hello world")
1327 print "sealed data is:\n\n%s\n\n" % sealed_buf
1328
1329 buf = verify_and_unseal_blob( pubkey_pem, "foo", sealed_buf )
1330 print "unsealed data is: \n\n%s\n\n" % buf
1331
1332
1333# run functional tests
1334if __name__ == "__main__":
1335 sys.path.append("/opt/planetstack")
1336 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
1337
1338 if len(sys.argv) < 2:
1339 print "Usage: %s testname [args]" % sys.argv[0]
1340
1341 # call a method starting with ft_, and then pass the rest of argv as its arguments
1342 testname = sys.argv[1]
1343 ft_testname = "ft_%s" % testname
1344
1345 test_call = "%s(%s)" % (ft_testname, ",".join(sys.argv[2:]))
1346
1347 print "calling %s" % test_call
1348
1349 rc = eval( test_call )
1350
1351 print "result = %s" % rc
1352
1353