CORD-2965 Kubernetes Synchronizer
Change-Id: Ie5c02b9ad1c65af686598bab0b36350ac1caef64
diff --git a/xos/examples/make_pod.xossh b/xos/examples/make_pod.xossh
new file mode 100644
index 0000000..6394147
--- /dev/null
+++ b/xos/examples/make_pod.xossh
@@ -0,0 +1,31 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is intended to be pasted into an xossh session
+
+t=TrustDomain(name="demo-trust", owner=KubernetesService.objects.first())
+t.save()
+p=Principal(name="demo-account", trust_domain=t)
+p.save()
+existing_images=Image.objects.filter(name="k8s.gcr.io/pause-amd64")
+if existing_images:
+ img = existing_images[0]
+else:
+ img=Image(name="k8s.gcr.io/pause-amd64", tag="3.0")
+ img.save()
+
+s=Slice(name="mysite_demo1", site=Site.objects.first(), trust_domain=t, principal=p)
+s.save()
+i=KubernetesServiceInstance(name="demo-pod", slice=s, image=img, owner=KubernetesService.objects.first(), xos_managed=True)
+i.save()
diff --git a/xos/examples/make_pod.yaml b/xos/examples/make_pod.yaml
new file mode 100644
index 0000000..2e05f32
--- /dev/null
+++ b/xos/examples/make_pod.yaml
@@ -0,0 +1,96 @@
+---
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+tosca_definitions_version: tosca_simple_yaml_1_0
+
+description: Make a pod using Kubernetes Synchronizer
+
+imports:
+ - custom_types/trustdomain.yaml
+ - custom_types/principal.yaml
+ - custom_types/image.yaml
+ - custom_types/site.yaml
+ - custom_types/slice.yaml
+ - custom_types/kubernetesservice.yaml
+ - custom_types/kubernetesserviceinstance.yaml
+
+topology_template:
+ node_templates:
+ service#kubernetes:
+ type: tosca.nodes.KubernetesService
+ properties:
+ name: kubernetes
+ must-exist: true
+
+ mysite:
+ type: tosca.nodes.Site
+ properties:
+ name: placeholder-sitename
+ must-exist: true
+
+ demo_trustdomain:
+ type: tosca.nodes.TrustDomain
+ properties:
+ name: "demo-trust"
+ requirements:
+ - owner:
+ node: service#kubernetes
+ relationship: tosca.relationships.BelongsToOne
+
+ demo_principal:
+ type: tosca.nodes.Principal
+ properties:
+ name: "demo-account"
+ requirements:
+ - trust_domain:
+ node: demo_trustdomain
+ relationship: tosca.relationships.BelongsToOne
+
+ image_pause:
+ type: tosca.nodes.Image
+ properties:
+ name: "k8s.gcr.io/pause-amd64"
+ tag: "3.0"
+
+ mysite_demo1:
+ type: tosca.nodes.Slice
+ properties:
+ name: "mysite_demo1"
+ requirements:
+ - site:
+ node: mysite
+ relationship: tosca.relationships.BelongsToOne
+ - trust_domain:
+ node: demo_trustdomain
+ relationship: tosca.relationships.BelongsToOne
+ - principal:
+ node: demo_principal
+ relationship: tosca.relationships.BelongsToOne
+
+ demo_pod:
+ type: tosca.nodes.KubernetesServiceInstance
+ properties:
+ name: "demo-pod"
+ xos_managed: True
+ requirements:
+ - slice:
+ node: mysite_demo1
+ relationship: tosca.relationships.BelongsToOne
+ - owner:
+ node: service#kubernetes
+ relationship: tosca.relationships.BelongsToOne
+ - image:
+ node: image_pause
+ relationship: tosca.relationships.BelongsToOne
diff --git a/xos/examples/make_service.xossh b/xos/examples/make_service.xossh
new file mode 100644
index 0000000..b7191ef
--- /dev/null
+++ b/xos/examples/make_service.xossh
@@ -0,0 +1,24 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is intended to be pasted into an xossh session
+
+t=TrustDomain(name="service1-trust", owner=KubernetesService.objects.first())
+t.save()
+service=Service(name="service1")
+service.save()
+slice=Slice(name="mysite_service1_slice1", trust_domain=t, service=service, site=Site.objects.first())
+slice.save()
+port=ServicePort(name="the-web", service=service, internal_port=80, external_port=30080, protocol="TCP")
+port.save()
diff --git a/xos/examples/make_service.yaml b/xos/examples/make_service.yaml
new file mode 100644
index 0000000..69f41e2
--- /dev/null
+++ b/xos/examples/make_service.yaml
@@ -0,0 +1,81 @@
+---
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+tosca_definitions_version: tosca_simple_yaml_1_0
+
+description: Make a new Service and a ServicePort for it
+
+imports:
+ - custom_types/trustdomain.yaml
+ - custom_types/site.yaml
+ - custom_types/slice.yaml
+ - custom_types/service.yaml
+ - custom_types/serviceport.yaml
+ - custom_types/kubernetesservice.yaml
+
+topology_template:
+ node_templates:
+ mysite:
+ type: tosca.nodes.Site
+ properties:
+ name: placeholder-sitename
+ must-exist: true
+
+ service#kubernetes:
+ type: tosca.nodes.KubernetesService
+ properties:
+ name: kubernetes
+ must-exist: true
+
+ service#demo:
+ type: tosca.nodes.Service
+ properties:
+ name: demo-service
+
+ demo_trustdomain:
+ type: tosca.nodes.TrustDomain
+ properties:
+ name: "demo-trust"
+ requirements:
+ - owner:
+ node: service#kubernetes
+ relationship: tosca.relationships.BelongsToOne
+
+ mysite_demo1:
+ type: tosca.nodes.Slice
+ properties:
+ name: "mysite_demo1"
+ requirements:
+ - site:
+ node: mysite
+ relationship: tosca.relationships.BelongsToOne
+ - trust_domain:
+ node: demo_trustdomain
+ relationship: tosca.relationships.BelongsToOne
+ - service:
+ node: service#demo
+ relationship: tosca.relationships.BelongsToOne
+
+ demo_service_web_port:
+ type: tosca.nodes.ServicePort
+ properties:
+ name: demo-service-web-port
+ internal_port: 80
+ external_port: 30080
+ protocol: "TCP"
+ requirements:
+ - service:
+ node: service#demo
+ relationship: tosca.relationships.BelongsToOne
\ No newline at end of file
diff --git a/xos/synchronizer/kubernetes-synchronizer.py b/xos/synchronizer/kubernetes-synchronizer.py
index 5c83aaf..8287263 100644
--- a/xos/synchronizer/kubernetes-synchronizer.py
+++ b/xos/synchronizer/kubernetes-synchronizer.py
@@ -19,12 +19,12 @@
This is the main entrypoint for the synchronizer. It loads the config file, and then starts the synchronizer.
"""
-
#!/usr/bin/env python
# Runs the standard XOS synchronizer
import importlib
+import logging
import os
import sys
from xosconfig import Config
@@ -32,6 +32,9 @@
config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/kubernetes_config.yaml')
Config.init(config_file, 'synchronizer-config-schema.yaml')
+# prevent logging noise from k8s API calls
+logging.getLogger("kubernetes.client.rest").setLevel(logging.WARNING)
+
synchronizer_path = os.path.join(os.path.dirname(
os.path.realpath(__file__)), "../../synchronizers/new_base")
sys.path.append(synchronizer_path)
diff --git a/xos/synchronizer/models/kubernetes.xproto b/xos/synchronizer/models/kubernetes.xproto
index 1005057..8c2fb2c 100644
--- a/xos/synchronizer/models/kubernetes.xproto
+++ b/xos/synchronizer/models/kubernetes.xproto
@@ -6,6 +6,7 @@
}
-message KubernetesServiceInstance (ServiceInstance){
+message KubernetesServiceInstance (ComputeServiceInstance){
option verbose_name = "Kubernetes Service Instance";
+ optional string pod_ip = 1 [max_length=32, db_index = False, null = True, blank = True];
}
diff --git a/xos/synchronizer/pull_steps/pull_pods.py b/xos/synchronizer/pull_steps/pull_pods.py
index bea45d7..8d5d544 100644
--- a/xos/synchronizer/pull_steps/pull_pods.py
+++ b/xos/synchronizer/pull_steps/pull_pods.py
@@ -19,7 +19,8 @@
"""
from synchronizers.new_base.pullstep import PullStep
-from synchronizers.new_base.modelaccessor import KubernetesServiceInstance
+from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
+ TrustDomain, Site, Image
from xosconfig import Config
from multistructlog import create_logger
@@ -32,16 +33,208 @@
"""
KubernetesServiceInstancePullStep
- Pull information from Kubernetes.
+ Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
+ if one does not already exist. Additional support objects (Slices, TrustDomains, Principals) may be created
+ as necessary to fill the required dependencies of the KubernetesServiceInstance.
"""
def __init__(self):
super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)
kubernetes_config.load_incluster_config()
- self.v1 = kubernetes_client.CoreV1Api()
+ self.v1core = kubernetes_client.CoreV1Api()
+ self.v1apps = kubernetes_client.AppsV1Api()
+
+ def obj_to_handle(self, obj):
+ """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
+ Kubernetes.
+ """
+ return obj.metadata.self_link
+
+ def read_obj_kind(self, kind, name, trust_domain):
+ """ Given an object kind and name, read it from Kubernetes """
+ if kind == "ReplicaSet":
+ resource = self.v1apps.read_namespaced_replica_set(name, trust_domain.name)
+ elif kind == "StatefulSet":
+ resource = self.v1apps.read_namespaced_statefule_set(name, trust_domain.name)
+ elif kind == "DaemonSet":
+ resource = self.v1apps.read_namespaced_daemon_set(name, trust_domain.name)
+ elif kind == "Deployment":
+ resource = self.v1apps.read_namespaced_deployment(name, trust_domain.name)
+ else:
+ resource = None
+ return resource
+
+ def get_controller_from_obj(self, obj, trust_domain, depth=0):
+ """ Given an object, Search for its controller. Strategy is to walk backward until we find some object that
+ is marked as a controller, but does not have any owners.
+
+ This seems adequate to cover the case where ReplicaSet is owned by a Deployment, and we want to skup over
+ the ReplicaSet and return the Deployment.
+ """
+
+ owner_references = obj.metadata.owner_references
+ if not owner_references:
+ if (depth==0):
+ # If depth is zero, then we're still looking at the object, not a controller.
+ return None
+ return obj
+
+ for owner_reference in owner_references:
+ if not getattr(owner_reference, "controller", False):
+ continue
+ owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
+ controller = self.get_controller_from_obj(owner, trust_domain, depth+1)
+ if controller:
+ return controller
+
+ return None
+
+ def get_slice_from_pod(self, pod, trust_domain, principal):
+ """ Given a pod, determine which XOS Slice goes with it
+ If the Slice doesn't exist, create it.
+ """
+ controller = self.get_controller_from_obj(pod, trust_domain)
+ if not controller:
+ return None
+
+ slice_name = controller.metadata.name
+ if hasattr(controller.metadata, "labels"):
+ if "xos_slice_name" in controller.metadata.labels:
+ # Someone has labeled the controller with an xos slice name. Use it.
+ slice_name = controller.metadata.labels["xos_slice_name"]
+
+ existing_slices = Slice.objects.filter(name = slice_name)
+ if not existing_slices:
+ # TODO(smbaker): atomicity
+ s = Slice(name=slice_name, site = Site.objects.first(),
+ trust_domain=trust_domain,
+ principal=principal,
+ backend_handle=self.obj_to_handle(controller),
+ controller_kind=controller.kind,
+ xos_managed=False)
+ s.save()
+ return s
+ else:
+ return existing_slices[0]
+
+ def get_trustdomain_from_pod(self, pod, owner_service):
+ """ Given a pod, determine which XOS TrustDomain goes with it
+ If the TrustDomain doesn't exist, create it.
+ """
+ existing_trustdomains = TrustDomain.objects.filter(name = pod.metadata.namespace)
+ if not existing_trustdomains:
+ k8s_trust_domain = self.v1core.read_namespace(pod.metadata.namespace)
+
+ # TODO(smbaker): atomicity
+ t = TrustDomain(name = pod.metadata.namespace,
+ xos_managed=False,
+ owner=owner_service,
+ backend_handle = self.obj_to_handle(k8s_trust_domain))
+ t.save()
+ return t
+ else:
+ return existing_trustdomains[0]
+
+ def get_principal_from_pod(self, pod, trust_domain):
+ """ Given a pod, determine which XOS Principal goes with it
+ If the Principal doesn't exist, create it.
+ """
+ principal_name = getattr(pod.spec, "service_account", None)
+ if not principal_name:
+ return None
+ existing_principals = Principal.objects.filter(name = principal_name)
+ if not existing_principals:
+ k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)
+
+ # TODO(smbaker): atomicity
+ p = Principal(name = principal_name,
+ trust_domain = trust_domain,
+ xos_managed = False,
+ backend_handle = self.obj_to_handle(k8s_service_account))
+ p.save()
+ return p
+ else:
+ return existing_principals[0]
+
+ def get_image_from_pod(self, pod):
+ """ Given a pod, determine which XOS Image goes with it
+ If the Image doesn't exist, create it.
+ """
+ containers = pod.spec.containers
+ if containers:
+ # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
+ container = containers[0]
+ if ":" in container.image:
+ (name, tag) = container.image.split(":")
+ else:
+ # Is assuming a default necessary?
+ name = container.image
+ tag = "master"
+
+ existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
+ if not existing_images:
+ i = Image(name=name, tag=tag, kind="container", xos_managed=False)
+ i.save()
+ return i
+ else:
+ return existing_images[0]
+ else:
+ return None
def pull_records(self):
- # TODO(smbaker): implement pull step here
- pass
+ # Read all pods from Kubernetes, store them in k8s_pods_by_name
+ k8s_pods_by_name = {}
+ ret = self.v1core.list_pod_for_all_namespaces(watch=False)
+ for item in ret.items:
+ k8s_pods_by_name[item.metadata.name] = item
+ # Read all pods from XOS, store them in xos_pods_by_name
+ xos_pods_by_name = {}
+ existing_pods = KubernetesServiceInstance.objects.all()
+ for pod in existing_pods:
+ xos_pods_by_name[pod.name] = pod
+
+ kubernetes_services = KubernetesService.objects.all()
+ if len(kubernetes_services)==0:
+ raise Exception("There are no Kubernetes Services yet")
+ if len(kubernetes_services)>1:
+ # Simplifying assumption -- there is only one Kubernetes Service
+ raise Exception("There are too many Kubernetes Services")
+ kubernetes_service = kubernetes_services[0]
+
+ # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
+ for (k,pod) in k8s_pods_by_name.items():
+ if not k in xos_pods_by_name:
+ trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
+ if not trust_domain:
+ log.warning("Unable to determine trust_domain for %s" % k)
+ continue
+
+ principal = self.get_principal_from_pod(pod, trust_domain)
+ slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
+ image = self.get_image_from_pod(pod)
+
+ if not slice:
+ log.warning("Unable to determine slice for %s" % k)
+ continue
+
+ xos_pod = KubernetesServiceInstance(name=k,
+ pod_ip = pod.status.pod_ip,
+ owner = kubernetes_service,
+ slice = slice,
+ image = image,
+ backend_handle = self.obj_to_handle(pod),
+ xos_managed = False)
+ xos_pod.save()
+ log.info("Created XOS POD %s" % xos_pod.name)
+
+ # For each xos pod, see if there is no k8s pod. If that's the case, then the pud must have been deleted.
+ for (k,xos_pod) in xos_pods_by_name.items():
+ if (not k in k8s_pods_by_name):
+ if (xos_pod.xos_managed):
+ # Should we do something so it gets re-created by the syncstep?
+ pass
+ else:
+ xos_pod.delete()
+ log.info("Deleted XOS POD %s" % k)
diff --git a/xos/synchronizer/steps/sync_kubernetesserviceinstance.py b/xos/synchronizer/steps/sync_kubernetesserviceinstance.py
index 400396d..27badfa 100644
--- a/xos/synchronizer/steps/sync_kubernetesserviceinstance.py
+++ b/xos/synchronizer/steps/sync_kubernetesserviceinstance.py
@@ -17,6 +17,10 @@
sync_kubernetesserviceinstance.py
Synchronize KubernetesServiceInstance. See also the related pull_step.
+
+ This sync_step is intended to handle the case where callers are creating pods directly, as opposed to using
+ a controller to manage pods for them. It makes some simplifying assumptions, such as each pod has one
+ container and uses one image.
"""
from synchronizers.new_base.syncstep import SyncStep
@@ -25,6 +29,7 @@
from xosconfig import Config
from multistructlog import create_logger
+from kubernetes.client.rest import ApiException
from kubernetes import client as kubernetes_client, config as kubernetes_config
log = create_logger(Config().get('logging'))
@@ -43,16 +48,62 @@
def __init__(self, *args, **kwargs):
super(SyncKubernetesServiceInstance, self).__init__(*args, **kwargs)
-
kubernetes_config.load_incluster_config()
self.v1 = kubernetes_client.CoreV1Api()
- def sync_record(self, o):
- # TODO(smbaker): implement sync step here
- pass
+ def get_pod(self, o):
+ """ Given a KubernetesServiceInstance, read the pod from Kubernetes.
+ Return None if the pod does not exist.
+ """
+ try:
+ pod = self.v1.read_namespaced_pod(o.name, o.slice.trust_domain.name)
+ except ApiException, e:
+ if e.status == 404:
+ return None
+ raise
+ return pod
+ def sync_record(self, o):
+ if o.xos_managed:
+ if (not o.slice) or (not o.slice.trust_domain):
+ raise Exception("No trust domain for service instance", o=o)
+
+ if (not o.name):
+ raise Exception("No name for service instance", o=o)
+
+ pod = self.get_pod(o)
+ if not pod:
+ # make a pod!
+ pod = kubernetes_client.V1Pod()
+ pod.metadata = kubernetes_client.V1ObjectMeta(name=o.name)
+
+ if o.slice.trust_domain:
+ pod.metadata.namespace = o.slice.trust_domain.name
+
+ if o.image.tag:
+ imageName = o.image.name + ":" + o.image.tag
+ else:
+ # TODO(smbaker): Is this case possible?
+ imageName = o.image.name
+
+ container=kubernetes_client.V1Container(name=o.name,
+ image=imageName)
+
+ spec = kubernetes_client.V1PodSpec(containers=[container])
+ pod.spec = spec
+
+ if o.slice.principal:
+ pod.spec.service_account = o.slice.principal.name
+
+ log.info("Creating pod", o=o, pod=pod)
+
+ pod = self.v1.create_namespaced_pod(o.slice.trust_domain.name, pod)
+
+ if (not o.backend_handle):
+ o.backend_handle = pod.metadata.self_link
+ o.save(update_fields=["backend_handle"])
def delete_record(self, port):
- # TODO(smbaker): implement delete sync step here
+ # TODO(smbaker): Implement delete step
pass
diff --git a/xos/synchronizer/steps/sync_principal.py b/xos/synchronizer/steps/sync_principal.py
new file mode 100644
index 0000000..d3e61af
--- /dev/null
+++ b/xos/synchronizer/steps/sync_principal.py
@@ -0,0 +1,94 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ sync_principal.py
+
+ Synchronize Principals. Principals correspond roughly to Kubernetes ServiceAccounts.
+"""
+
+from synchronizers.new_base.syncstep import SyncStep
+from synchronizers.new_base.modelaccessor import Principal
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+from kubernetes.client.rest import ApiException
+from kubernetes import client as kubernetes_client, config as kubernetes_config
+
+log = create_logger(Config().get('logging'))
+
+class SyncPrincipal(SyncStep):
+
+ """
+ SyncPrincipal
+
+ Implements sync step for syncing Principals.
+ """
+
+ provides = [Principal]
+ observes = Principal
+ requested_interval = 0
+
+ def __init__(self, *args, **kwargs):
+ super(SyncPrincipal, self).__init__(*args, **kwargs)
+ kubernetes_config.load_incluster_config()
+ self.v1 = kubernetes_client.CoreV1Api()
+
+ def get_service_account(self, o):
+ """ Given an XOS Principal object, read the corresponding ServiceAccount from Kubernetes.
+ return None if no ServiceAccount exists.
+ """
+ try:
+ service_account = self.v1.read_namespaced_service_account(o.name, o.trust_domain.name)
+ except ApiException, e:
+ if e.status == 404:
+ return None
+ raise
+ return service_account
+
+ def fetch_pending(self, deleted):
+ """ Figure out which Principals are interesting to the K8s synchronizer.
+ As each Principal exists within a Trust Domain, this reduces to figuring out which Trust Domains are
+ interesting.
+ """
+ objs = super(SyncPrincipal, self).fetch_pending(deleted)
+ for obj in objs[:]:
+ # If the Principal isn't in a TrustDomain, then the K8s synchronizer can't do anything with it
+ if not obj.trust_domain:
+ objs.remove(obj)
+ continue
+
+ # If the Principal's TrustDomain isn't part of the K8s service, then it's someone else's principal
+ if "KubernetesService" not in obj.trust_domain.owner.leaf_model.class_names:
+ objs.remove(obj)
+ return objs
+
+ def sync_record(self, o):
+ service_account = self.get_service_account(o)
+ if not service_account:
+ service_account = kubernetes_client.V1ServiceAccount()
+ service_account.metadata = kubernetes_client.V1ObjectMeta(name=o.name)
+
+ service_account = self.v1.create_namespaced_service_account(o.trust_domain.name, service_account)
+
+ if (not o.backend_handle):
+ o.backend_handle = service_account.metadata.self_link
+ o.save(update_fields=["backend_handle"])
+
+ def delete_record(self, port):
+ # TODO(smbaker): Implement delete step
+ pass
+
diff --git a/xos/synchronizer/steps/sync_service.py b/xos/synchronizer/steps/sync_service.py
new file mode 100644
index 0000000..333f675
--- /dev/null
+++ b/xos/synchronizer/steps/sync_service.py
@@ -0,0 +1,130 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ sync_service.py
+
+ Synchronize Services. The only type of Service this step knows how to deal with are services that use Kubernetes
+ NodePort to expose ports.
+"""
+
+from synchronizers.new_base.syncstep import SyncStep
+from synchronizers.new_base.modelaccessor import Service
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+from kubernetes.client.rest import ApiException
+from kubernetes import client as kubernetes_client, config as kubernetes_config
+
+log = create_logger(Config().get('logging'))
+
+class SyncService(SyncStep):
+
+ """
+ SyncService
+
+ Implements sync step for syncing Services.
+ """
+
+ provides = [Service]
+ observes = Service
+ requested_interval = 0
+
+ def __init__(self, *args, **kwargs):
+ super(SyncService, self).__init__(*args, **kwargs)
+ kubernetes_config.load_incluster_config()
+ self.v1 = kubernetes_client.CoreV1Api()
+
+ def fetch_pending(self, deletion=False):
+ """ Filter the set of pending objects.
+ As this syncstep can only create Service that exist within Trust Domains, filter out those services that
+ don't have Trust Domains associated with them.
+ """
+ models = super(SyncService, self).fetch_pending(deletion)
+
+ if (not deletion):
+ for model in models[:]:
+ if not self.get_trust_domain(model):
+ log.info("Unable to determine Trust Domain for service %s. Ignoring." % model.name)
+ models.remove(model)
+ elif not model.serviceports.exists():
+ # If there are not ServicePorts, then there's not much for us to do at this time...
+ log.info("Service %s is not interesting. Ignoring." % model.name)
+ models.remove(model)
+
+ return models
+
+ def get_trust_domain(self, o):
+ """ Given a service, determine its Trust Domain.
+
+ The design we've chosen to go with is that a service is pinned to a Trust Domain based on the slices
+ that it contains. It's an error for a service to be directly comprised of slices from multiple
+ trust domains.
+
+ This allows for "logical services", that contain no slices of their own, but are comprised of multiple
+ subservices. For example, EPC.
+ """
+
+ trust_domain = None
+ for slice in o.slices.all():
+ if slice.trust_domain:
+ if (trust_domain is None):
+ trust_domain = slice.trust_domain
+ elif (trust_domain.id != slice.trust_domain.id):
+ # Bail out of we've encountered a situation where a service spans multiple trust domains.
+ log.warning("Service %s is comprised of slices from multiple trust domains." % o.name)
+ return None
+
+ return trust_domain
+
+ def get_service(self, o, trust_domain):
+ """ Given an XOS Service, read the associated Service from Kubernetes.
+ If no Kubernetes service exists, return None
+ """
+ try:
+ k8s_service = self.v1.read_namespaced_service(o.name, trust_domain.name)
+ except ApiException, e:
+ if e.status == 404:
+ return None
+ raise
+ return k8s_service
+
+ def sync_record(self, o):
+ trust_domain = self.get_trust_domain(o)
+ k8s_service = self.get_service(o,trust_domain)
+
+ if not k8s_service:
+ k8s_service = kubernetes_client.V1Service()
+ k8s_service.metadata = kubernetes_client.V1ObjectMeta(name=o.name)
+
+ ports=[]
+ for service_port in o.serviceports.all():
+ port=kubernetes_client.V1ServicePort(name = service_port.name,
+ node_port = service_port.external_port,
+ port = service_port.internal_port,
+ target_port = service_port.internal_port,
+ protocol = service_port.protocol)
+ ports.append(port)
+
+ k8s_service.spec = kubernetes_client.V1ServiceSpec(ports=ports,
+ type="NodePort")
+
+ self.v1.create_namespaced_service(trust_domain.name, k8s_service)
+
+ def delete_record(self, o):
+ # TODO(smbaker): Implement delete step
+ pass
+
diff --git a/xos/synchronizer/steps/sync_trustdomain.py b/xos/synchronizer/steps/sync_trustdomain.py
new file mode 100644
index 0000000..e2d0a1d
--- /dev/null
+++ b/xos/synchronizer/steps/sync_trustdomain.py
@@ -0,0 +1,90 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ sync_trustdomain.py
+
+ Synchronize TrustDomain. TrustDomains correspond roughly to Kubernetes namespaces.
+"""
+
+from synchronizers.new_base.syncstep import SyncStep
+from synchronizers.new_base.modelaccessor import TrustDomain
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+from kubernetes.client.rest import ApiException
+from kubernetes import client as kubernetes_client, config as kubernetes_config
+
+log = create_logger(Config().get('logging'))
+
+class SyncTrustDomain(SyncStep):
+
+ """
+ SyncTrustsDomain
+
+ Implements sync step for syncing trust domains.
+ """
+
+ provides = [TrustDomain]
+ observes = TrustDomain
+ requested_interval = 0
+
+ def __init__(self, *args, **kwargs):
+ super(SyncTrustDomain, self).__init__(*args, **kwargs)
+ kubernetes_config.load_incluster_config()
+ self.v1 = kubernetes_client.CoreV1Api()
+
+ def fetch_pending(self, deleted):
+ """ Figure out which TrustDomains are interesting to the K8s synchronizer. It's necessary to filter as we're
+ synchronizing a core model, and we only want to synchronize trust domains that will exist within
+ Kubernetes.
+ """
+ objs = super(SyncTrustDomain, self).fetch_pending(deleted)
+ for obj in objs[:]:
+ # If the TrustDomain isn't part of the K8s service, then it's someone else's trust domain
+ if "KubernetesService" not in obj.owner.leaf_model.class_names:
+ objs.remove(obj)
+ return objs
+
+ def get_namespace(self, o):
+ """ Give an XOS TrustDomain object, return the corresponding namespace from Kubernetes.
+ Return None if no namespace exists.
+ """
+ try:
+ ns = self.v1.read_namespace(o.name)
+ except ApiException, e:
+ if e.status == 404:
+ return None
+ raise
+ return ns
+
+ def sync_record(self, o):
+ ns = self.get_namespace(o)
+ if not ns:
+ ns = kubernetes_client.V1Namespace()
+ ns.metadata = kubernetes_client.V1ObjectMeta(name=o.name)
+
+ log.info("creating namespace %s" % o.name)
+ ns=self.v1.create_namespace(ns)
+
+ if (not o.backend_handle):
+ o.backend_handle = ns.metadata.self_link
+ o.save(update_fields=["backend_handle"])
+
+ def delete_record(self, port):
+ # TODO(smbaker): Implement delete step
+ pass
+