# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
pull_pods.py
Implements a syncstep to pull information about pods form Kubernetes.
"""
import json

from synchronizers.new_base.pullstep import PullStep
from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
    TrustDomain, Site, Image

from xosconfig import Config
from multistructlog import create_logger
from xoskafka import XOSKafkaProducer

log = create_logger(Config().get('logging'))


class KubernetesServiceInstancePullStep(PullStep):
    """
        KubernetesServiceInstancePullStep

        Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
        if one does not already exist. Additional support objects (Slices, TrustDomains, Principals) may be created
        as necessary to fill the required dependencies of the KubernetesServiceInstance.
    """

    def __init__(self):
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)

        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        from kubernetes import client as kubernetes_client, config as kubernetes_config
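        # load_incluster_config() picks up the service-account credentials and API endpoint that
        # Kubernetes mounts into the synchronizer's own pod, so this step is expected to run inside the cluster.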
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.
        """
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes """
        if kind == "ReplicaSet":
            resource = self.v1apps.read_namespaced_replica_set(name, trust_domain.name)
        elif kind == "StatefulSet":
            resource = self.v1apps.read_namespaced_stateful_set(name, trust_domain.name)
        elif kind == "DaemonSet":
            resource = self.v1apps.read_namespaced_daemon_set(name, trust_domain.name)
        elif kind == "Deployment":
            resource = self.v1apps.read_namespaced_deployment(name, trust_domain.name)
        elif kind == "Job":
            resource = self.v1batch.read_namespaced_job(name, trust_domain.name)
        else:
            resource = None
        return resource

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. The strategy is to walk up the chain of owner references
            until we find some object that is marked as a controller but has no owners of its own.

            This is adequate to cover the case where a ReplicaSet is owned by a Deployment, and we want to skip over
            the ReplicaSet and return the Deployment.
        """
        owner_references = obj.metadata.owner_references
        if not owner_references:
            if (depth == 0):
                # If depth is zero, then we're still looking at the original object, not a controller.
                return None
            return obj
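
        # Only follow owner references that are flagged as the controller; an object may list other
        # owners that are not its managing controller.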
        for owner_reference in owner_references:
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                # Failed to fetch the owner, probably because the owner's kind is something we do not understand. An
                # example is the etcd-cluster pod, which is owned by an object of kind "EtcdCluster".
                log.debug("failed to fetch owner", owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(owner, trust_domain, depth+1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name
        if hasattr(controller.metadata, "labels") and controller.metadata.labels is not None:
            if "xos_slice_name" in controller.metadata.labels:
                # Someone has labeled the controller with an xos slice name. Use it.
                slice_name = controller.metadata.labels["xos_slice_name"]
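
        # Reuse an existing Slice with this name if there is one; otherwise create a new Slice marked
        # xos_managed=False, i.e. a record of an object that was created outside of XOS.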
        existing_slices = Slice.objects.filter(name=slice_name)
        if not existing_slices:
            # TODO(smbaker): atomicity
            s = Slice(name=slice_name, site=Site.objects.first(),
                      trust_domain=trust_domain,
                      principal=principal,
                      backend_handle=self.obj_to_handle(controller),
                      controller_kind=controller.kind,
                      xos_managed=False)
            s.save()
            return s
        else:
            return existing_slices[0]

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it.
            If the TrustDomain doesn't exist, create it.
        """
        existing_trustdomains = TrustDomain.objects.filter(name=pod.metadata.namespace)
        if not existing_trustdomains:
            k8s_trust_domain = self.v1core.read_namespace(pod.metadata.namespace)

            # TODO(smbaker): atomicity
            t = TrustDomain(name=pod.metadata.namespace,
                            xos_managed=False,
                            owner=owner_service,
                            backend_handle=self.obj_to_handle(k8s_trust_domain))
            t.save()
            return t
        else:
            return existing_trustdomains[0]

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it.
            If the Principal doesn't exist, create it.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None
        existing_principals = Principal.objects.filter(name=principal_name)
        if not existing_principals:
            k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

            # TODO(smbaker): atomicity
            p = Principal(name=principal_name,
                          trust_domain=trust_domain,
                          xos_managed=False,
                          backend_handle=self.obj_to_handle(k8s_service_account))
            p.save()
            return p
        else:
            return existing_principals[0]

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.
        """
        containers = pod.spec.containers
        if containers:
            # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
            container = containers[0]
            if ":" in container.image:
                (name, tag) = container.image.rsplit(":", 1)
            else:
                # Is assuming a default necessary?
                name = container.image
                tag = "master"

            existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
            if not existing_images:
                i = Image(name=name, tag=tag, kind="container", xos_managed=False)
                i.save()
                return i
            else:
                return existing_images[0]
        else:
            return None

    def send_notification(self, xos_pod, k8s_pod, status):
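        # Publish a pod-details event to the Kafka topic "xos.kubernetes.pod-details". An illustrative
        # payload (field values are hypothetical) looks like:
        #   {"status": "created", "name": "my-pod", "producer": "k8s-sync",
        #    "kubernetesserviceinstance_id": 42, "labels": {"app": "my-app"},
        #    "netinterfaces": [{"name": "primary", "addresses": ["10.1.2.3"]}]}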
        event = {"status": status,
                 "name": xos_pod.name,
                 "producer": "k8s-sync"}

        if xos_pod.id:
            event["kubernetesserviceinstance_id"] = xos_pod.id

        if k8s_pod:
            event["labels"] = k8s_pod.metadata.labels

            if k8s_pod.status.pod_ip:
                event["netinterfaces"] = [{"name": "primary",
                                           "addresses": [k8s_pod.status.pod_ip]}]

        topic = "xos.kubernetes.pod-details"
        key = xos_pod.name
        value = json.dumps(event, default=lambda o: repr(o))

        XOSKafkaProducer.produce(topic, key, value)

    def pull_records(self):
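        # Reconcile Kubernetes pods with XOS KubernetesServiceInstances: create missing instances, update
        # pod IPs that have changed, emit Kafka events, and delete instances whose pods have disappeared.
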
        # Read all pods from Kubernetes, store them in k8s_pods_by_name
        k8s_pods_by_name = {}
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        for item in ret.items:
            k8s_pods_by_name[item.metadata.name] = item

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        existing_pods = KubernetesServiceInstance.objects.all()
        for pod in existing_pods:
            xos_pods_by_name[pod.name] = pod

        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
                    if not trust_domain:
                        # All kubernetes pods should belong to a namespace. If we can't find the namespace, then
                        # something is very wrong in K8s.
                        log.warning("Unable to determine trust_domain for pod %s. Ignoring." % k)
                        continue

                    principal = self.get_principal_from_pod(pod, trust_domain)
                    slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
                    image = self.get_image_from_pod(pod)

                    if not slice:
                        # We could get here if the pod doesn't have a controller, or if the controller is of a kind
                        # that we don't understand (such as the Etcd controller). If so, the pod is not something we
                        # are interested in.
                        log.debug("Unable to determine slice for pod %s. Ignoring." % k)
                        continue

                    xos_pod = KubernetesServiceInstance(name=k,
                                                        pod_ip=pod.status.pod_ip,
                                                        owner=kubernetes_service,
                                                        slice=slice,
                                                        image=image,
                                                        backend_handle=self.obj_to_handle(pod),
                                                        xos_managed=False,
                                                        need_event=True)
                    xos_pod.save()
                    xos_pods_by_name[k] = xos_pod
                    log.info("Created XOS POD %s" % xos_pod.name)

                xos_pod = xos_pods_by_name[k]

                # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The
                # IP isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that
                # case here.
                if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                    xos_pod.pod_ip = pod.status.pod_ip
                    xos_pod.need_event = True  # Trigger a new kafka event
                    xos_pod.save(update_fields=["pod_ip", "need_event"])
                    log.info("Updated XOS POD %s" % xos_pod.name)

                # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
                # so, then we'll try to send the event again later.
                if xos_pod.need_event:
                    if xos_pod.last_event_sent == "created":
                        event_kind = "updated"
                    else:
                        event_kind = "created"

                    self.send_notification(xos_pod, pod, event_kind)

                    xos_pod.need_event = False
                    xos_pod.last_event_sent = event_kind
                    xos_pod.save(update_fields=["need_event", "last_event_sent"])
            except:
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            try:
                if k not in k8s_pods_by_name:
                    if xos_pod.xos_managed:
                        # Should we do something so it gets re-created by the syncstep?
                        pass
                    else:
                        self.send_notification(xos_pod, None, "deleted")
                        xos_pod.delete()
                        log.info("Deleted XOS POD %s" % k)
            except:
                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)