CORD-2965 Kubernetes Synchronizer
Change-Id: Ie5c02b9ad1c65af686598bab0b36350ac1caef64
diff --git a/xos/synchronizer/steps/sync_kubernetesserviceinstance.py b/xos/synchronizer/steps/sync_kubernetesserviceinstance.py
index 400396d..27badfa 100644
--- a/xos/synchronizer/steps/sync_kubernetesserviceinstance.py
+++ b/xos/synchronizer/steps/sync_kubernetesserviceinstance.py
@@ -17,6 +17,10 @@
sync_kubernetesserviceinstance.py
Synchronize KubernetesServiceInstance. See also the related pull_step.
+
+ This sync_step is intended to handle the case where callers are creating pods directly, as opposed to using
+ a controller to manage pods for them. It makes some simplifying assumptions, such as each pod has one
+ container and uses one image.
"""
from synchronizers.new_base.syncstep import SyncStep
@@ -25,6 +29,7 @@
from xosconfig import Config
from multistructlog import create_logger
+from kubernetes.client.rest import ApiException
from kubernetes import client as kubernetes_client, config as kubernetes_config
log = create_logger(Config().get('logging'))
@@ -43,16 +48,62 @@
def __init__(self, *args, **kwargs):
super(SyncKubernetesServiceInstance, self).__init__(*args, **kwargs)
-
kubernetes_config.load_incluster_config()
self.v1 = kubernetes_client.CoreV1Api()
- def sync_record(self, o):
- # TODO(smbaker): implement sync step here
- pass
+    def get_pod(self, o):
+        """ Given a KubernetesServiceInstance, read the pod from Kubernetes.
+
+            The pod is looked up by o.name in the namespace named after o.slice.trust_domain.
+
+            Return None if the pod does not exist (the API answers with status 404). Any other
+            ApiException is re-raised to the caller.
+        """
+        try:
+            return self.v1.read_namespaced_pod(o.name, o.slice.trust_domain.name)
+        except ApiException as e:
+            # A 404 only means the pod has not been created yet; anything else is a real failure.
+            if e.status == 404:
+                return None
+            raise
+
+    def sync_record(self, o):
+        """ Make sure a pod exists in Kubernetes for an XOS-managed KubernetesServiceInstance.
+
+            Instances that are not xos_managed are left untouched. For managed instances the pod is
+            looked up and, if absent, created with a single container running o.image in the namespace
+            named after the slice's trust domain. The pod's self_link is stored in o.backend_handle
+            if no handle has been recorded yet.
+
+            Raises Exception if the instance has no trust domain, no name, or (when a pod has to be
+            created) no image.
+        """
+        if not o.xos_managed:
+            return
+
+        # Exception() does not accept keyword arguments, so the offending object is folded into the
+        # message instead of being passed as o=o (which would raise a TypeError).
+        if (not o.slice) or (not o.slice.trust_domain):
+            raise Exception("No trust domain for service instance %s" % o)
+
+        if not o.name:
+            raise Exception("No name for service instance %s" % o)
+
+        namespace = o.slice.trust_domain.name
+
+        pod = self.get_pod(o)
+        if not pod:
+            # make a pod!
+            if not o.image:
+                raise Exception("No image for service instance %s" % o)
+
+            if o.image.tag:
+                image_name = o.image.name + ":" + o.image.tag
+            else:
+                # TODO(smbaker): Is this case possible?
+                image_name = o.image.name
+
+            # Simplifying assumption of this step: one container, one image, per pod.
+            container = kubernetes_client.V1Container(name=o.name, image=image_name)
+
+            pod = kubernetes_client.V1Pod()
+            pod.metadata = kubernetes_client.V1ObjectMeta(name=o.name, namespace=namespace)
+            pod.spec = kubernetes_client.V1PodSpec(containers=[container])
+
+            if o.slice.principal:
+                pod.spec.service_account = o.slice.principal.name
+
+            log.info("Creating pod", o=o, pod=pod)
+
+            pod = self.v1.create_namespaced_pod(namespace, pod)
+
+        # Record the handle for both freshly created and pre-existing pods.
+        if not o.backend_handle:
+            o.backend_handle = pod.metadata.self_link
+            o.save(update_fields=["backend_handle"])
+
def delete_record(self, port):
- # TODO(smbaker): implement delete sync step here
+ # TODO(smbaker): Implement delete step
pass
diff --git a/xos/synchronizer/steps/sync_principal.py b/xos/synchronizer/steps/sync_principal.py
new file mode 100644
index 0000000..d3e61af
--- /dev/null
+++ b/xos/synchronizer/steps/sync_principal.py
@@ -0,0 +1,94 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ sync_principal.py
+
+ Synchronize Principals. Principals correspond roughly to Kubernetes ServiceAccounts.
+"""
+
+from synchronizers.new_base.syncstep import SyncStep
+from synchronizers.new_base.modelaccessor import Principal
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+from kubernetes.client.rest import ApiException
+from kubernetes import client as kubernetes_client, config as kubernetes_config
+
+log = create_logger(Config().get('logging'))
+
+class SyncPrincipal(SyncStep):
+
+    """
+        SyncPrincipal
+
+        Implements sync step for syncing Principals. A Principal is realized as a Kubernetes
+        ServiceAccount in the namespace named after the Principal's TrustDomain.
+    """
+
+    provides = [Principal]
+    observes = Principal
+    requested_interval = 0
+
+    def __init__(self, *args, **kwargs):
+        super(SyncPrincipal, self).__init__(*args, **kwargs)
+        kubernetes_config.load_incluster_config()
+        self.v1 = kubernetes_client.CoreV1Api()
+
+    def get_service_account(self, o):
+        """ Given an XOS Principal object, read the corresponding ServiceAccount from Kubernetes.
+
+            Return None if no ServiceAccount exists (the API answers with status 404). Any other
+            ApiException is re-raised to the caller.
+        """
+        try:
+            return self.v1.read_namespaced_service_account(o.name, o.trust_domain.name)
+        except ApiException as e:
+            if e.status == 404:
+                return None
+            raise
+
+    def fetch_pending(self, deleted):
+        """ Figure out which Principals are interesting to the K8s synchronizer.
+
+            As each Principal exists within a Trust Domain, this reduces to figuring out which Trust Domains are
+            interesting.
+
+            Returns the list obtained from the base class with the uninteresting Principals removed.
+        """
+        objs = super(SyncPrincipal, self).fetch_pending(deleted)
+        # Iterate over a copy so that entries can be removed from objs while looping.
+        for obj in objs[:]:
+            # If the Principal isn't in a TrustDomain, then the K8s synchronizer can't do anything with it
+            if not obj.trust_domain:
+                objs.remove(obj)
+                continue
+
+            # If the Principal's TrustDomain isn't part of the K8s service, then it's someone else's principal
+            if "KubernetesService" not in obj.trust_domain.owner.leaf_model.class_names:
+                objs.remove(obj)
+        return objs
+
+    def sync_record(self, o):
+        """ Create the ServiceAccount for Principal o if it does not exist yet, and store the
+            ServiceAccount's self_link in o.backend_handle if no handle has been recorded.
+        """
+        service_account = self.get_service_account(o)
+        if not service_account:
+            service_account = kubernetes_client.V1ServiceAccount()
+            service_account.metadata = kubernetes_client.V1ObjectMeta(name=o.name)
+
+            service_account = self.v1.create_namespaced_service_account(o.trust_domain.name, service_account)
+
+        if not o.backend_handle:
+            o.backend_handle = service_account.metadata.self_link
+            o.save(update_fields=["backend_handle"])
+
+    def delete_record(self, port):
+        """ Deletion is not implemented; the ServiceAccount is left in place. """
+        # TODO(smbaker): Implement delete step
+        pass
+
diff --git a/xos/synchronizer/steps/sync_service.py b/xos/synchronizer/steps/sync_service.py
new file mode 100644
index 0000000..333f675
--- /dev/null
+++ b/xos/synchronizer/steps/sync_service.py
@@ -0,0 +1,130 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ sync_service.py
+
+ Synchronize Services. The only type of Service this step knows how to deal with is one that uses Kubernetes
+ NodePort to expose ports.
+"""
+
+from synchronizers.new_base.syncstep import SyncStep
+from synchronizers.new_base.modelaccessor import Service
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+from kubernetes.client.rest import ApiException
+from kubernetes import client as kubernetes_client, config as kubernetes_config
+
+log = create_logger(Config().get('logging'))
+
+class SyncService(SyncStep):
+
+    """
+        SyncService
+
+        Implements sync step for syncing Services. A Service is realized as a Kubernetes Service of
+        type NodePort in the namespace named after the Service's Trust Domain.
+    """
+
+    provides = [Service]
+    observes = Service
+    requested_interval = 0
+
+    def __init__(self, *args, **kwargs):
+        super(SyncService, self).__init__(*args, **kwargs)
+        kubernetes_config.load_incluster_config()
+        self.v1 = kubernetes_client.CoreV1Api()
+
+    def fetch_pending(self, deletion=False):
+        """ Filter the set of pending objects.
+
+            As this syncstep can only create Services that exist within Trust Domains, filter out those services
+            that don't have Trust Domains associated with them, as well as services without any ServicePorts.
+            No filtering is applied when fetching deletions.
+        """
+        models = super(SyncService, self).fetch_pending(deletion)
+
+        if not deletion:
+            # Iterate over a copy so that entries can be removed from models while looping.
+            for model in models[:]:
+                if not self.get_trust_domain(model):
+                    log.info("Unable to determine Trust Domain for service %s. Ignoring." % model.name)
+                    models.remove(model)
+                elif not model.serviceports.exists():
+                    # If there are no ServicePorts, then there's not much for us to do at this time...
+                    log.info("Service %s is not interesting. Ignoring." % model.name)
+                    models.remove(model)
+
+        return models
+
+    def get_trust_domain(self, o):
+        """ Given a service, determine its Trust Domain.
+
+            The design we've chosen to go with is that a service is pinned to a Trust Domain based on the slices
+            that it contains. It's an error for a service to be directly comprised of slices from multiple
+            trust domains.
+
+            This allows for "logical services", that contain no slices of their own, but are comprised of multiple
+            subservices. For example, EPC.
+
+            Returns the Trust Domain shared by the service's slices, or None if no slice has a Trust Domain or
+            the slices span more than one.
+        """
+
+        trust_domain = None
+        for xos_slice in o.slices.all():
+            if not xos_slice.trust_domain:
+                continue
+
+            if trust_domain is None:
+                trust_domain = xos_slice.trust_domain
+            elif trust_domain.id != xos_slice.trust_domain.id:
+                # Bail out if we've encountered a situation where a service spans multiple trust domains.
+                log.warning("Service %s is comprised of slices from multiple trust domains." % o.name)
+                return None
+
+        return trust_domain
+
+    def get_service(self, o, trust_domain):
+        """ Given an XOS Service, read the associated Service from Kubernetes.
+
+            If no Kubernetes service exists (the API answers with status 404), return None. Any other
+            ApiException is re-raised to the caller.
+        """
+        try:
+            return self.v1.read_namespaced_service(o.name, trust_domain.name)
+        except ApiException as e:
+            if e.status == 404:
+                return None
+            raise
+
+    def sync_record(self, o):
+        """ Create a NodePort Kubernetes Service for XOS Service o if one does not exist yet.
+
+            One V1ServicePort is generated per XOS ServicePort. An existing Kubernetes Service is left
+            unchanged.
+
+            Raises Exception if the service's Trust Domain cannot be determined.
+        """
+        trust_domain = self.get_trust_domain(o)
+        if not trust_domain:
+            # fetch_pending() normally filters these out; fail with a clear message rather than an
+            # AttributeError on None further down.
+            raise Exception("Unable to determine Trust Domain for service %s" % o.name)
+
+        if self.get_service(o, trust_domain):
+            return
+
+        k8s_service = kubernetes_client.V1Service()
+        k8s_service.metadata = kubernetes_client.V1ObjectMeta(name=o.name)
+
+        ports = [kubernetes_client.V1ServicePort(name=service_port.name,
+                                                 node_port=service_port.external_port,
+                                                 port=service_port.internal_port,
+                                                 target_port=service_port.internal_port,
+                                                 protocol=service_port.protocol)
+                 for service_port in o.serviceports.all()]
+
+        k8s_service.spec = kubernetes_client.V1ServiceSpec(ports=ports,
+                                                           type="NodePort")
+
+        self.v1.create_namespaced_service(trust_domain.name, k8s_service)
+
+    def delete_record(self, o):
+        """ Deletion is not implemented; the Kubernetes Service is left in place. """
+        # TODO(smbaker): Implement delete step
+        pass
+
diff --git a/xos/synchronizer/steps/sync_trustdomain.py b/xos/synchronizer/steps/sync_trustdomain.py
new file mode 100644
index 0000000..e2d0a1d
--- /dev/null
+++ b/xos/synchronizer/steps/sync_trustdomain.py
@@ -0,0 +1,90 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ sync_trustdomain.py
+
+ Synchronize TrustDomain. TrustDomains correspond roughly to Kubernetes namespaces.
+"""
+
+from synchronizers.new_base.syncstep import SyncStep
+from synchronizers.new_base.modelaccessor import TrustDomain
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+from kubernetes.client.rest import ApiException
+from kubernetes import client as kubernetes_client, config as kubernetes_config
+
+log = create_logger(Config().get('logging'))
+
+class SyncTrustDomain(SyncStep):
+
+    """
+        SyncTrustDomain
+
+        Implements sync step for syncing trust domains. A TrustDomain is realized as a Kubernetes
+        namespace of the same name.
+    """
+
+    provides = [TrustDomain]
+    observes = TrustDomain
+    requested_interval = 0
+
+    def __init__(self, *args, **kwargs):
+        super(SyncTrustDomain, self).__init__(*args, **kwargs)
+        kubernetes_config.load_incluster_config()
+        self.v1 = kubernetes_client.CoreV1Api()
+
+    def fetch_pending(self, deleted):
+        """ Figure out which TrustDomains are interesting to the K8s synchronizer. It's necessary to filter as we're
+            synchronizing a core model, and we only want to synchronize trust domains that will exist within
+            Kubernetes.
+
+            Returns the list obtained from the base class with the uninteresting TrustDomains removed.
+        """
+        objs = super(SyncTrustDomain, self).fetch_pending(deleted)
+        # Iterate over a copy so that entries can be removed from objs while looping.
+        for obj in objs[:]:
+            # If the TrustDomain isn't part of the K8s service, then it's someone else's trust domain
+            if "KubernetesService" not in obj.owner.leaf_model.class_names:
+                objs.remove(obj)
+        return objs
+
+    def get_namespace(self, o):
+        """ Given an XOS TrustDomain object, return the corresponding namespace from Kubernetes.
+
+            Return None if no namespace exists (the API answers with status 404). Any other
+            ApiException is re-raised to the caller.
+        """
+        try:
+            return self.v1.read_namespace(o.name)
+        except ApiException as e:
+            if e.status == 404:
+                return None
+            raise
+
+    def sync_record(self, o):
+        """ Create the namespace for TrustDomain o if it does not exist yet, and store the
+            namespace's self_link in o.backend_handle if no handle has been recorded.
+        """
+        ns = self.get_namespace(o)
+        if not ns:
+            ns = kubernetes_client.V1Namespace()
+            ns.metadata = kubernetes_client.V1ObjectMeta(name=o.name)
+
+            log.info("creating namespace %s" % o.name)
+            ns = self.v1.create_namespace(ns)
+
+        if not o.backend_handle:
+            o.backend_handle = ns.metadata.self_link
+            o.save(update_fields=["backend_handle"])
+
+    def delete_record(self, port):
+        """ Deletion is not implemented; the namespace is left in place. """
+        # TODO(smbaker): Implement delete step
+        pass
+