blob: 331e9258828f4cb0cb27ce52f8e98da670eea375 [file] [log] [blame]
#!/usr/bin/env python
"""
Define some common methods for the Syndicate observer.
"""
import os
import sys
import random
import json
import time
import requests
import traceback
import base64
import BaseHTTPServer
import setproctitle
import threading
import urllib
from Crypto.Hash import SHA256 as HashAlg
from Crypto.PublicKey import RSA as CryptoKey
from Crypto import Random
from Crypto.Signature import PKCS1_PSS as CryptoSigner
import logging
from logging import Logger
logging.basicConfig( format='[%(levelname)s] [%(module)s:%(lineno)d] %(message)s' )
logger = logging.getLogger()
logger.setLevel( logging.INFO )
# get config package
import syndicatelib_config.config as CONFIG
# get the Syndicate modules
import syndicate
import syndicate.client.bin.syntool as syntool
import syndicate.client.common.msconfig as msconfig
import syndicate.client.common.api as api
import syndicate.util.storage as syndicate_storage
import syndicate.util.watchdog as syndicate_watchdog
import syndicate.util.daemonize as syndicate_daemon
import syndicate.util.crypto as syndicate_crypto
import syndicate.util.provisioning as syndicate_provisioning
import syndicate.syndicate as c_syndicate
# for testing
TESTING = False
class FakeObject(object):
def __init__(self):
pass
if os.getenv("OPENCLOUD_PYTHONPATH") is not None:
sys.path.insert(0, os.getenv("OPENCLOUD_PYTHONPATH"))
else:
logger.warning("No OPENCLOUD_PYTHONPATH set. Assuming Syndicate models are in your PYTHONPATH")
try:
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
# get our models
import syndicate_storage.models as models
# get OpenCloud models
from core.models import Slice,Sliver
from django.core.exceptions import ObjectDoesNotExist
from django.db import IntegrityError
except ImportError, ie:
logger.warning("Failed to import models; some tests will not work")
# create a fake "models" package that has exactly the members we need for testing.
models = FakeObject()
models.Volume = FakeObject()
models.Volume.CAP_READ_DATA = 1
models.Volume.CAP_WRITE_DATA = 2
models.Volume.CAP_HOST_DATA = 4
TESTING = True
#-------------------------------
class SyndicateObserverError( Exception ):
pass
#-------------------------------
def get_config():
"""
Return the imported config
"""
return CONFIG
#-------------------------------
def make_openid_url( email ):
"""
Generate an OpenID identity URL from an email address.
"""
return os.path.join( CONFIG.SYNDICATE_OPENID_TRUSTROOT, "id", email )
#-------------------------------
def connect_syndicate( username=CONFIG.SYNDICATE_OPENCLOUD_USER, password=CONFIG.SYNDICATE_OPENCLOUD_PASSWORD, user_pkey_pem=CONFIG.SYNDICATE_OPENCLOUD_PKEY ):
"""
Connect to the OpenCloud Syndicate SMI, using the OpenCloud user credentials.
"""
debug = True
if hasattr(CONFIG, "DEBUG"):
debug = CONFIG.DEBUG
client = syntool.Client( username, CONFIG.SYNDICATE_SMI_URL,
password=password,
user_pkey_pem=user_pkey_pem,
debug=debug )
return client
#-------------------------------
def opencloud_caps_to_syndicate_caps( cap_read, cap_write, cap_host ):
"""
Convert OpenCloud capability bits from the UI into Syndicate's capability bits.
"""
syn_caps = 0
if cap_read:
syn_caps |= (msconfig.GATEWAY_CAP_READ_DATA | msconfig.GATEWAY_CAP_READ_METADATA)
if cap_write:
syn_caps |= (msconfig.GATEWAY_CAP_WRITE_DATA | msconfig.GATEWAY_CAP_WRITE_METADATA)
if cap_host:
syn_caps |= (msconfig.GATEWAY_CAP_COORDINATE)
return syn_caps
#-------------------------------
def ensure_user_exists( user_email, **user_kw ):
"""
Given an OpenCloud user, ensure that the corresponding
Syndicate user exists on the MS. This method does NOT
create any OpenCloud-specific data.
Return the (created, user), where created==True if the user
was created and created==False if the user was read.
Raise an exception on error.
"""
client = connect_syndicate()
user_openid_url = make_openid_url( user_email )
return syndicate_provisioning.ensure_user_exists( client, user_email, user_openid_url, **user_kw )
#-------------------------------
def ensure_user_absent( user_email ):
"""
Ensure that a given OpenCloud user's associated Syndicate user record
has been deleted. This method does NOT delete any OpenCloud-specific data.
Returns True on success
Raises an exception on error
"""
client = connect_syndicate()
return client.delete_user( user_email )
#-------------------------------
def make_volume_principal_id( user_email, volume_name ):
"""
Create a principal id for a Volume owner.
"""
volume_name_safe = urllib.quote( volume_name )
return "volume_%s.%s" % (volume_name_safe, user_email)
#-------------------------------
def make_slice_principal_id( user_email, slice_name ):
"""
Create a principal id for a slice owner.
"""
slice_name_safe = urllib.quote( slice_name )
return "slice_%s.%s" % (slice_name, user_email)
#-------------------------------
def ensure_principal_exists( user_email, observer_secret, **user_kw ):
"""
Ensure that a Syndicate user exists, as well as its OpenCloud-specific data.
Return (True, (None OR user)) on success. Returns a user if the user was created.
Return (False, None) on error
"""
try:
created, new_user = ensure_user_exists( user_email, **user_kw )
except Exception, e:
traceback.print_exc()
logger.error("Failed to ensure user '%s' exists" % user_email )
return (False, None)
# if we created a new user, then save its (sealed) credentials to the Django DB
if created:
try:
rc = put_principal_data( user_email, observer_secret, new_user['signing_public_key'], new_user['signing_private_key'] )
assert rc == True, "Failed to save SyndicatePrincipal"
except Exception, e:
traceback.print_exc()
logger.error("Failed to save private key for principal %s" % (user_email))
return (False, None)
return (True, new_user)
#-------------------------------
def ensure_principal_absent( user_email ):
"""
Ensure that a Syndicate user does not exists, and remove the OpenCloud-specific data.
Return True on success.
"""
ensure_user_absent( user_email )
delete_principal_data( user_email )
return True
#-------------------------------
def ensure_volume_exists( user_email, opencloud_volume, user=None ):
"""
Given the email address of a user, ensure that the given
Volume exists and is owned by that user.
Do not try to ensure that the user exists.
Return the Volume if we created it, or return None if we did not.
Raise an exception on error.
"""
client = connect_syndicate()
try:
volume = client.read_volume( opencloud_volume.name )
except Exception, e:
# transport error
logger.exception(e)
raise e
if volume is None:
# the volume does not exist....try to create it
vol_name = opencloud_volume.name
vol_blocksize = opencloud_volume.blocksize
vol_description = opencloud_volume.description
vol_private = opencloud_volume.private
vol_archive = opencloud_volume.archive
vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )
try:
vol_info = client.create_volume( user_email, vol_name, vol_description, vol_blocksize,
private=vol_private,
archive=vol_archive,
active=True,
default_gateway_caps=vol_default_gateway_caps,
store_private_key=False,
metadata_private_key="MAKE_METADATA_KEY" )
except Exception, e:
# transport error
logger.exception(e)
raise e
else:
# successfully created the volume!
return vol_info
else:
# volume already exists. Verify its owned by this user.
if user is None:
try:
user = client.read_user( volume['owner_id'] )
except Exception, e:
# transport error, or user doesn't exist (either is unacceptable)
logger.exception(e)
raise e
if user is None or user['email'] != user_email:
raise Exception("Volume '%s' already exists, but is NOT owned by '%s'" % (opencloud_volume.name, user_email) )
# we're good!
return None
#-------------------------------
def ensure_volume_absent( volume_name ):
"""
Given an OpenCloud volume, ensure that the corresponding Syndicate
Volume does not exist.
"""
client = connect_syndicate()
# this is idempotent, and returns True even if the Volume doesn't exist
return client.delete_volume( volume_name )
#-------------------------------
def update_volume( opencloud_volume ):
"""
Update a Syndicate Volume from an OpenCloud Volume model.
Fails if the Volume does not exist in Syndicate.
"""
client = connect_syndicate()
vol_name = opencloud_volume.name
vol_description = opencloud_volume.description
vol_private = opencloud_volume.private
vol_archive = opencloud_volume.archive
vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )
try:
rc = client.update_volume( vol_name,
description=vol_description,
private=vol_private,
archive=vol_archive,
default_gateway_caps=vol_default_gateway_caps )
if not rc:
raise Exception("update_volume(%s) failed!" % vol_name )
except Exception, e:
# transort or method error
logger.exception(e)
return False
else:
return True
#-------------------------------
def ensure_volume_access_right_exists( user_email, volume_name, caps, allowed_gateways=[msconfig.GATEWAY_TYPE_UG] ):
"""
Ensure that a particular user has particular access to a particular volume.
Do not try to ensure that the user or volume exist, however!
"""
client = connect_syndicate()
return syndicate_provisioning.ensure_volume_access_right_exists( client, user_email, volume_name, caps, allowed_gateways )
#-------------------------------
def ensure_volume_access_right_absent( user_email, volume_name ):
"""
Ensure that acess to a particular volume is revoked.
"""
client = connect_syndicate()
return syndicate_provisioning.ensure_volume_access_right_absent( client, user_email, volume_name )
#-------------------------------
def setup_volume_access( user_email, volume_name, caps, RG_port, slice_secret, RG_closure=None ):
"""
Set up the Volume to allow the slice to provision UGs in it, and to fire up RGs.
* create the Volume Access Right for the user, so (s)he can create Gateways.
* provision a single Replica Gateway, serving on localhost.
"""
client = connect_syndicate()
try:
rc = ensure_volume_access_right_exists( user_email, volume_name, caps )
assert rc is True, "Failed to create access right for %s in %s" % (user_email, volume_name)
except Exception, e:
logger.exception(e)
return False
RG_name = syndicate_provisioning.make_gateway_name( "OpenCloud", "RG", volume_name, "localhost" )
RG_key_password = syndicate_provisioning.make_gateway_private_key_password( RG_name, slice_secret )
try:
rc = syndicate_provisioning.ensure_RG_exists( client, user_email, volume_name, RG_name, "localhost", RG_port, RG_key_password, closure=RG_closure )
except Exception, e:
logger.exception(e)
return False
return True
#-------------------------------
def teardown_volume_access( user_email, volume_name ):
"""
Revoke access to a Volume for a User.
* remove the user's Volume Access Right
* remove the use'rs gateways
"""
client = connect_syndicate()
# block the user from creating more gateways, and delete the gateways
try:
rc = client.remove_user_from_volume( user_email, volume_name )
assert rc is True, "Failed to remove access right for %s in %s" % (user_email, volume_name)
except Exception, e:
logger.exception(e)
return False
return True
#-------------------------------
def create_sealed_and_signed_blob( private_key_pem, secret, data ):
"""
Create a sealed and signed message.
"""
# seal it with the password
logger.info("Sealing credential data")
rc, sealed_data = c_syndicate.password_seal( data, secret )
if rc != 0:
logger.error("Failed to seal data with the secret, rc = %s" % rc)
return None
msg = syndicate_crypto.sign_and_serialize_json( private_key_pem, sealed_data )
if msg is None:
logger.error("Failed to sign credential")
return None
return msg
#-------------------------------
def verify_and_unseal_blob( public_key_pem, secret, blob_data ):
"""
verify and unseal a serialized string of JSON
"""
# verify it
rc, sealed_data = syndicate_crypto.verify_and_parse_json( public_key_pem, blob_data )
if rc != 0:
logger.error("Failed to verify and parse blob, rc = %s" % rc)
return None
logger.info("Unsealing credential data")
rc, data = c_syndicate.password_unseal( sealed_data, secret )
if rc != 0:
logger.error("Failed to unseal blob, rc = %s" % rc )
return None
return data
#-------------------------------
def create_volume_list_blob( private_key_pem, slice_secret, volume_list ):
"""
Create a sealed volume list, signed with the private key.
"""
list_data = {
"volumes": volume_list
}
list_data_str = json.dumps( list_data )
msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, list_data_str )
if msg is None:
logger.error("Failed to seal volume list")
return None
return msg
#-------------------------------
def create_slice_credential_blob( private_key_pem, slice_name, slice_secret, syndicate_url, volume_name, volume_owner, UG_port, user_pkey_pem ):
"""
Create a sealed, signed, encoded slice credentials blob.
"""
# create and serialize the data
cred_data = {
"syndicate_url": syndicate_url,
"volume_name": volume_name,
"volume_owner": volume_owner,
"slice_name": slice_name,
"slice_UG_port": UG_port,
"principal_pkey_pem": user_pkey_pem,
}
cred_data_str = json.dumps( cred_data )
msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, cred_data_str )
if msg is None:
logger.error("Failed to seal volume list")
return None
return msg
#-------------------------------
def put_principal_data( user_email, observer_secret, public_key_pem, private_key_pem ):
"""
Seal and store the principal's private key into the database, in a SyndicatePrincipal object,
so the sliver-side Syndicate daemon syndicated.py can get them later.
Overwrite an existing principal if one exists.
"""
sealed_private_key = create_sealed_and_signed_blob( private_key_pem, observer_secret, private_key_pem )
if sealed_private_key is None:
return False
try:
sp = models.SyndicatePrincipal( sealed_private_key=sealed_private_key, public_key_pem=public_key_pem, principal_id=user_email )
sp.save()
except IntegrityError, e:
logger.error("WARN: overwriting existing principal %s" % user_email)
sp.delete()
sp.save()
return True
#-------------------------------
def delete_principal_data( user_email ):
"""
Delete an OpenCloud SyndicatePrincipal object.
"""
sp = get_principal_data( user_email )
if sp is not None:
sp.delete()
return True
#-------------------------------
def get_principal_data( user_email ):
"""
Get a SyndicatePrincipal record from the database
"""
try:
sp = models.SyndicatePrincipal.objects.get( principal_id=user_email )
return sp
except ObjectDoesNotExist:
logger.error("No SyndicatePrincipal record for %s" % user_email)
return None
#-------------------------------
def get_principal_pkey( user_email, observer_secret ):
"""
Fetch and unseal the private key of a SyndicatePrincipal.
"""
sp = get_principal_data( user_email )
if sp is None:
logger.error("Failed to find private key for principal %s" % user_email )
return None
public_key_pem = sp.public_key_pem
sealed_private_key_pem = sp.sealed_private_key
# unseal
private_key_pem = verify_and_unseal_blob(public_key_pem, observer_secret, sealed_private_key_pem)
if private_key_pem is None:
logger.error("Failed to unseal private key")
return private_key_pem
#-------------------------------
def get_private_key_pem( pkey_path ):
"""
Get a private key from storage, PEM-encoded.
"""
# get the OpenCloud private key
observer_pkey = syndicate_storage.read_private_key( pkey_path )
if observer_pkey is None:
logger.error("Failed to load Observer private key")
return None
observer_pkey_pem = observer_pkey.exportKey()
return observer_pkey_pem
#-------------------------------
def encrypt_slice_secret( observer_pkey_pem, slice_secret ):
"""
Encrypt and serialize the slice secret with the Observer private key
"""
# get the public key
try:
observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
except Exception, e:
logger.exception(e)
logger.error("Failed to derive public key from private key")
return None
# encrypt the data
rc, sealed_slice_secret = c_syndicate.encrypt_data( observer_pkey_pem, observer_pubkey_pem, slice_secret )
if rc != 0:
logger.error("Failed to encrypt slice secret")
return None
sealed_slice_secret_b64 = base64.b64encode( sealed_slice_secret )
return sealed_slice_secret_b64
#-------------------------------
def decrypt_slice_secret( observer_pkey_pem, sealed_slice_secret_b64 ):
"""
Unserialize and decrypt a slice secret
"""
# get the public key
try:
observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
except Exception, e:
logger.exception(e)
logger.error("Failed to derive public key from private key")
return None
sealed_slice_secret = base64.b64decode( sealed_slice_secret_b64 )
# decrypt it
rc, slice_secret = c_syndicate.decrypt_data( observer_pubkey_pem, observer_pkey_pem, sealed_slice_secret )
if rc != 0:
logger.error("Failed to decrypt '%s', rc = %d" % (sealed_slice_secret_b64, rc))
return None
return slice_secret
#--------------------------------
def get_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
"""
Get the shared secret for a slice.
"""
ss = None
# get the sealed slice secret from Django
try:
if slice_fk is not None:
ss = models.SliceSecret.objects.get( slice_id=slice_fk )
else:
ss = models.SliceSecret.objects.get( slice_id__name=slice_name )
except ObjectDoesNotExist, e:
logger.error("Failed to load slice secret for (%s, %s)" % (slice_fk, slice_name) )
return None
return ss.secret
#-------------------------------
def put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=None, opencloud_slice=None ):
"""
Put the shared secret for a slice, encrypting it first.
"""
ss = None
if opencloud_slice is None:
# look up the slice
try:
if slice_fk is None:
opencloud_slice = models.Slice.objects.get( name=slice_name )
else:
opencloud_slice = models.Slice.objects.get( id=slice_fk.id )
except Exception, e:
logger.exception(e)
logger.error("Failed to load slice (%s, %s)" % (slice_fk, slice_name) )
return False
ss = models.SliceSecret( slice_id=opencloud_slice, secret=slice_secret )
ss.save()
return True
#-------------------------------
def get_or_create_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
"""
Get a slice secret if it already exists, or generate a slice secret if one does not.
"""
slice_secret = get_slice_secret( observer_pkey_pem, slice_name, slice_fk=slice_fk )
if slice_secret is None or len(slice_secret) == 0:
# generate a slice secret
slice_secret = "".join( random.sample("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 32) )
# store it
rc = put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=slice_fk )
if not rc:
raise SyndicateObserverError("Failed to create slice secret for (%s, %s)" % (slice_fk, slice_name))
return slice_secret
#-------------------------------
def generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
"""
Generate and return the set of credentials to be sent off to the slice VMs.
exisitng_user is a Syndicate user, as a dictionary.
Return None on failure
"""
# get the user's private key
logger.info("Obtaining private key for %s" % user_email)
# it might be in the existing_user...
user_pkey_pem = None
if existing_user is not None:
user_pkey_pem = existing_user.get('signing_private_key', None)
# no luck?
if user_pkey_pem is None:
try:
# get it from Django DB
user_pkey_pem = get_principal_pkey( user_email, observer_secret )
assert user_pkey_pem is not None, "No private key for %s" % user_email
except:
traceback.print_exc()
logger.error("Failed to get private key; cannot generate credentials for %s in %s" % (user_email, volume_name) )
return None
# generate a credetials blob
logger.info("Generating credentials for %s's slice" % (user_email))
try:
creds = create_slice_credential_blob( observer_pkey_pem, slice_name, slice_secret, syndicate_url, volume_name, user_email, UG_port, user_pkey_pem )
assert creds is not None, "Failed to create credentials for %s" % user_email
except:
traceback.print_exc()
logger.error("Failed to generate credentials for %s in %s" % (user_email, volume_name))
return None
return creds
#-------------------------------
def save_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
"""
Create and save a credentials blob to a VolumeSlice.
Return the creds on success.
Return None on failure
"""
creds = generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=existing_user )
ret = None
if creds is not None:
# save it
vs = get_volumeslice( volume_name, slice_name )
if vs is not None:
vs.credentials_blob = creds
vs.save()
# success!
ret = creds
else:
logger.error("Failed to look up VolumeSlice(%s, %s)" % (volume_name, slice_name))
else:
logger.error("Failed to generate credentials for %s, %s" % (volume_name, slice_name))
return ret
#-------------------------------
def get_volumeslice_volume_names( slice_name ):
"""
Get the list of Volume names from the datastore.
"""
try:
all_vs = models.VolumeSlice.objects.filter( slice_id__name = slice_name )
volume_names = []
for vs in all_vs:
volume_names.append( vs.volume_id.name )
return volume_names
except Exception, e:
logger.exception(e)
logger.error("Failed to query datastore for volumes mounted in %s" % slice_name)
return None
#-------------------------------
def get_volumeslice( volume_name, slice_name ):
"""
Get a volumeslice record from the datastore.
"""
try:
vs = models.VolumeSlice.objects.get( volume_id__name = volume_name, slice_id__name = slice_name )
return vs
except Exception, e:
logger.exception(e)
logger.error("Failed to query datastore for volumes (mounted in %s)" % (slice_name if (slice_name is not None or len(slice_name) > 0) else "UNKNOWN"))
return None
#-------------------------------
def do_push( sliver_hosts, portnum, payload ):
"""
Push a payload to a list of slivers.
NOTE: this has to be done in one go, since we can't import grequests
into the global namespace (without wrecking havoc on the credential server),
but it has to stick around for the push to work.
"""
global TESTING, CONFIG
from gevent import monkey
if TESTING:
monkey.patch_all()
else:
# make gevents runnabale from multiple threads (or Django will complain)
monkey.patch_all(socket=True, dns=True, time=True, select=True, thread=False, os=True, ssl=True, httplib=False, aggressive=True)
import grequests
# fan-out
requests = []
for sh in sliver_hosts:
rs = grequests.post( "http://" + sh + ":" + str(portnum), data={"observer_message": payload}, timeout=getattr(CONFIG, "SYNDICATE_HTTP_PUSH_TIMEOUT", 60) )
requests.append( rs )
# fan-in
responses = grequests.map( requests )
assert len(responses) == len(requests), "grequests error: len(responses) != len(requests)"
for i in xrange(0,len(requests)):
resp = responses[i]
req = requests[i]
if resp is None:
logger.error("Failed to connect to %s" % (req.url))
continue
# verify they all worked
if resp.status_code != 200:
logger.error("Failed to POST to %s, status code = %s" % (resp.url, resp.status_code))
continue
return True
#-------------------------------
def get_slice_hostnames( slice_name ):
"""
Query the Django DB and get the list of hosts running in a slice.
"""
openstack_slice = Slice.objects.get( name=slice_name )
if openstack_slice is None:
logger.error("No such slice '%s'" % slice_name)
return None
hostnames = [s.node.name for s in openstack_slice.slivers.all()]
return hostnames
#-------------------------------
def push_credentials_to_slice( slice_name, payload ):
"""
Push a credentials payload to the VMs in a slice.
"""
hostnames = get_slice_hostnames( slice_name )
return do_push( hostnames, CONFIG.SYNDICATE_SLIVER_PORT, payload )
#-------------------------------
class CredentialServerHandler( BaseHTTPServer.BaseHTTPRequestHandler ):
"""
HTTP server handler that allows syndicated.py instances to poll
for volume state.
NOTE: this is a fall-back mechanism. The observer should push new
volume state to the slices' slivers. However, if that fails, the
slivers are configured to poll for volume state periodically. This
server allows them to do just that.
Responses:
GET /<slicename> -- Reply with the signed sealed list of volume names, encrypted by the slice secret
GET /<slicename>/<volumename> -- Reply with the signed sealed volume access credentials, encrypted by the slice secret
!!! TEMPORARY !!!
GET /<slicename>/SYNDICATE_SLICE_SECRET -- Reply with the slice secret (TEMPORARY)
NOTE: We want to limit who can learn which Volumes a slice can access, so we'll seal its slivers'
credentials with the SliceSecret secret. The slivers (which have the slice-wide secret) can then decrypt it.
However, sealing the listing is a time-consuming process (on the order of 10s), so we only want
to do it when we have to. Since *anyone* can ask for the ciphertext of the volume list,
we will cache the list ciphertext for each slice for a long-ish amount of time, so we don't
accidentally DDoS this server. This necessarily means that the sliver might see a stale
volume listing, but that's okay, since the Observer is eventually consistent anyway.
"""
cached_volumes_json = {} # map slice_name --> (volume name, timeout)
cached_volumes_json_lock = threading.Lock()
CACHED_VOLUMES_JSON_LIFETIME = 3600 # one hour
SLICE_SECRET_NAME = "SYNDICATE_SLICE_SECRET"
def parse_request_path( self, path ):
"""
Parse the URL path into a slice name and (possibly) a volume name or SLICE_SECRET_NAME
"""
path_parts = path.strip("/").split("/")
if len(path_parts) == 0:
# invalid
return (None, None)
if len(path_parts) > 2:
# invalid
return (None, None)
slice_name = path_parts[0]
if len(slice_name) == 0:
# empty string is invalid
return (None, None)
volume_name = None
if len(path_parts) > 1:
volume_name = path_parts[1]
return slice_name, volume_name
def reply_data( self, data, datatype="application/json" ):
"""
Give back a 200 response with data.
"""
self.send_response( 200 )
self.send_header( "Content-Type", datatype )
self.send_header( "Content-Length", len(data) )
self.end_headers()
self.wfile.write( data )
return
def get_volumes_message( self, private_key_pem, observer_secret, slice_name ):
"""
Get the json-ized list of volumes this slice is attached to.
Check the cache, evict stale data if necessary, and on miss,
regenerate the slice volume list.
"""
# block the cache.
# NOTE: don't release the lock until we've generated credentials.
# Chances are, there's a thundering herd of slivers coming online.
# Block them all until we've generated their slice's credentials,
# and then serve them the cached one.
self.cached_volumes_json_lock.acquire()
ret = None
volume_list_json, cache_timeout = self.cached_volumes_json.get( slice_name, (None, None) )
if (cache_timeout is not None) and cache_timeout < time.time():
# expired
volume_list_json = None
if volume_list_json is None:
# generate a new list and cache it.
volume_names = get_volumeslice_volume_names( slice_name )
if volume_names is None:
# nothing to do...
ret = None
else:
# get the slice secret
slice_secret = get_slice_secret( private_key_pem, slice_name )
if slice_secret is None:
# no such slice
logger.error("No slice secret for %s" % slice_name)
ret = None
else:
# seal and sign
ret = create_volume_list_blob( private_key_pem, slice_secret, volume_names )
# cache this
if ret is not None:
self.cached_volumes_json[ slice_name ] = (ret, time.time() + self.CACHED_VOLUMES_JSON_LIFETIME )
else:
# hit the cache
ret = volume_list_json
self.cached_volumes_json_lock.release()
return ret
def do_GET( self ):
"""
Handle one GET
"""
slice_name, volume_name = self.parse_request_path( self.path )
# valid request?
if volume_name is None and slice_name is None:
self.send_error( 400 )
# slice secret request?
elif volume_name == self.SLICE_SECRET_NAME and slice_name is not None:
# get the slice secret
ret = get_slice_secret( self.server.private_key_pem, slice_name )
if ret is not None:
self.reply_data( ret )
return
else:
self.send_error( 404 )
# volume list request?
elif volume_name is None and slice_name is not None:
# get the list of volumes for this slice
ret = self.get_volumes_message( self.server.private_key_pem, self.server.observer_secret, slice_name )
if ret is not None:
self.reply_data( ret )
return
else:
self.send_error( 404 )
# volume credential request?
elif volume_name is not None and slice_name is not None:
# get the VolumeSlice record
vs = get_volumeslice( volume_name, slice_name )
if vs is None:
# not found
self.send_error( 404 )
return
else:
ret = vs.credentials_blob
if ret is not None:
self.reply_data( vs.credentials_blob )
else:
# not generated???
print ""
print vs
print ""
self.send_error( 503 )
return
else:
# shouldn't get here...
self.send_error( 500 )
return
#-------------------------------
class CredentialServer( BaseHTTPServer.HTTPServer ):
def __init__(self, private_key_pem, observer_secret, server, req_handler ):
self.private_key_pem = private_key_pem
self.observer_secret = observer_secret
BaseHTTPServer.HTTPServer.__init__( self, server, req_handler )
#-------------------------------
def credential_server_spawn( old_exit_status ):
"""
Start our credential server (i.e. in a separate process, started by the watchdog)
"""
setproctitle.setproctitle( "syndicate-credential-server" )
private_key = syndicate_storage.read_private_key( CONFIG.SYNDICATE_PRIVATE_KEY )
if private_key is None:
# exit code 255 will be ignored...
logger.error("Cannot load private key. Exiting...")
sys.exit(255)
logger.info("Starting Syndicate Observer credential server on port %s" % CONFIG.SYNDICATE_HTTP_PORT)
srv = CredentialServer( private_key.exportKey(), CONFIG.SYNDICATE_OPENCLOUD_SECRET, ('', CONFIG.SYNDICATE_HTTP_PORT), CredentialServerHandler)
srv.serve_forever()
#-------------------------------
def ensure_credential_server_running( foreground=False, run_once=False ):
"""
Instantiate our credential server and keep it running.
"""
# is the watchdog running?
pids = syndicate_watchdog.find_by_attrs( "syndicate-credential-server-watchdog", {} )
if len(pids) > 0:
# it's running
return True
if foreground:
# run in foreground
if run_once:
return credential_server_spawn( 0 )
else:
return syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) )
# not running, and not foregrounding. fork a new one
try:
watchdog_pid = os.fork()
except OSError, oe:
logger.error("Failed to fork, errno = %s" % oe.errno)
return False
if watchdog_pid != 0:
# child--become watchdog
setproctitle.setproctitle( "syndicate-credential-server-watchdog" )
if run_once:
syndicate_daemon.daemonize( lambda: credential_server_spawn(0), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
else:
syndicate_daemon.daemonize( lambda: syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) ), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
#-------------------------------
# Begin functional tests.
# Any method starting with ft_ is a functional test.
#-------------------------------
#-------------------------------
def ft_syndicate_access():
"""
Functional tests for ensuring objects exist and don't exist in Syndicate.
"""
fake_user = FakeObject()
fake_user.email = "fakeuser@opencloud.us"
print "\nensure_user_exists(%s)\n" % fake_user.email
ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
print "\nensure_user_exists(%s)\n" % fake_user.email
ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
fake_volume = FakeObject()
fake_volume.name = "fakevolume"
fake_volume.description = "This is a fake volume, created for funtional testing"
fake_volume.blocksize = 1024
fake_volume.cap_read_data = True
fake_volume.cap_write_data = True
fake_volume.cap_host_data = False
fake_volume.archive = False
fake_volume.private = True
# test idempotency
print "\nensure_volume_exists(%s)\n" % fake_volume.name
ensure_volume_exists( fake_user.email, fake_volume )
print "\nensure_volume_exists(%s)\n" % fake_volume.name
ensure_volume_exists( fake_user.email, fake_volume )
print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
print "\nensure_volume_absent(%s)\n" % fake_volume.name
ensure_volume_absent( fake_volume.name )
print "\nensure_volume_absent(%s)\n" % fake_volume.name
ensure_volume_absent( fake_volume.name )
print "\nensure_user_absent(%s)\n" % fake_user.email
ensure_user_absent( fake_user.email )
print "\nensure_user_absent(%s)\n" % fake_user.email
ensure_user_absent( fake_user.email )
print "\nensure_principal_exists(%s)\n" % fake_user.email
ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
print "\nensure_principal_exists(%s)\n" % fake_user.email
ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
print "\nensure_volume_exists(%s)\n" % fake_volume.name
ensure_volume_exists( fake_user.email, fake_volume )
print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
teardown_volume_access( fake_user.email, fake_volume.name )
print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
teardown_volume_access( fake_user.email, fake_volume.name )
print "\nensure_volume_absent(%s)\n" % fake_volume.name
ensure_volume_absent( fake_volume.name )
print "\nensure_principal_absent(%s)\n" % fake_user.email
ensure_principal_absent( fake_user.email )
#-------------------------------
def ft_volumeslice( slice_name ):
"""
Functional tests for reading VolumeSlice information
"""
print "slice: %s" % slice_name
volumes = get_volumeslice_volume_names( slice_name )
print "volumes mounted in slice %s:" % slice_name
for v in volumes:
print " %s:" % v
vs = get_volumeslice( v, slice_name )
print " %s" % dir(vs)
#-------------------------------
def ft_get_slice_hostnames( slice_name ):
"""
Functional tests for getting slice hostnames
"""
print "Get slice hostnames for %s" % slice_name
hostnames = get_slice_hostnames( slice_name )
import pprint
pp = pprint.PrettyPrinter()
pp.pprint( hostnames )
#-------------------------------
def ft_syndicate_principal():
"""
Functional tests for creating, reading, and deleting SyndicatePrincipals.
"""
print "generating key pair"
pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
user_email = "fakeuser@opencloud.us"
print "saving principal"
put_principal_data( user_email, "asdf", pubkey_pem, privkey_pem )
print "fetching principal private key"
saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
assert saved_privkey_pem is not None, "Could not fetch saved private key"
assert saved_privkey_pem == privkey_pem, "Saved private key does not match actual private key"
print "delete principal"
delete_principal_data( user_email )
print "make sure its deleted..."
saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
assert saved_privkey_pem is None, "Principal key not deleted"
#-------------------------------
def ft_credential_server():
"""
Functional test for the credential server
"""
ensure_credential_server_running( run_once=True, foreground=True )
#-------------------------------
def ft_seal_and_unseal():
"""
Functional test for sealing/unsealing data
"""
print "generating key pair"
pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
sealed_buf = create_sealed_and_signed_blob( privkey_pem, "foo", "hello world")
print "sealed data is:\n\n%s\n\n" % sealed_buf
buf = verify_and_unseal_blob( pubkey_pem, "foo", sealed_buf )
print "unsealed data is: \n\n%s\n\n" % buf
# run functional tests
if __name__ == "__main__":
sys.path.append("/opt/xos")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
if len(sys.argv) < 2:
print "Usage: %s testname [args]" % sys.argv[0]
# call a method starting with ft_, and then pass the rest of argv as its arguments
testname = sys.argv[1]
ft_testname = "ft_%s" % testname
test_call = "%s(%s)" % (ft_testname, ",".join(sys.argv[2:]))
print "calling %s" % test_call
rc = eval( test_call )
print "result = %s" % rc