Richard Jankowski | 461cb97 | 2018-04-11 15:36:27 -0400 | [diff] [blame] | 1 | from kubernetes import client, config |
| 2 | from common.utils.consulhelpers import get_all_instances_of_service, \ |
| 3 | verify_all_services_healthy |
| 4 | |
# Kubernetes namespace passed to every namespaced API call made by
# KubernetesEnvironment below (service reads, service lists and pod lists).
VOLTHA_NAMESPACE = 'voltha'
| 6 | |
def get_orch_environment(orch_env):
    '''
    Build the orchestration-environment helper matching orch_env.

    :param orch_env: name of the orchestration environment in use
    :return: a KubernetesEnvironment when orch_env is exactly 'k8s-single-node';
             a DockerComposeEnvironment for any other value
    '''
    environment_cls = (KubernetesEnvironment if orch_env == 'k8s-single-node'
                       else DockerComposeEnvironment)
    return environment_cls()
| 12 | |
class OrchestrationEnvironment:
    '''
    Interface shared by the orchestration environments in this module.

    Both methods raise NotImplementedError here; concrete environments are
    expected to override them.
    '''

    def verify_all_services_healthy(self, service_name=None,
                                    number_of_expected_services=None):
        '''
        Health check entry point; must be overridden.

        :param service_name: optional name of a single service to check
        :param number_of_expected_services: optional expected service count
        :raises NotImplementedError: always, in this base class
        '''
        raise NotImplementedError(
            'verify_all_services_healthy must be defined')

    def get_all_instances_of_service(self, service_name, port_name=None):
        '''
        Service instance lookup entry point; must be overridden.

        :param service_name: name of the service to look up
        :param port_name: optional name of the service port of interest
        :raises NotImplementedError: always, in this base class
        '''
        raise NotImplementedError(
            'get_all_instances_of_service must be defined')
| 21 | |
class DockerComposeEnvironment(OrchestrationEnvironment):
    '''
    Orchestration environment that delegates to the helpers imported from
    common.utils.consulhelpers, always passing LOCAL_CONSUL as their first
    argument.
    '''

    LOCAL_CONSUL = "localhost:8500"

    def verify_all_services_healthy(self, service_name=None,
                                    number_of_expected_services=None):
        '''
        Delegate the health check to the consulhelpers function of the same name.

        :param service_name: optional service name, forwarded unchanged
        :param number_of_expected_services: optional count, forwarded unchanged
        :return: whatever the consulhelpers function returns
        '''
        # Inside the method body the bare name resolves to the module-level
        # import, not to this method, so this is not a recursive call.
        consul_endpoint = self.LOCAL_CONSUL
        outcome = verify_all_services_healthy(consul_endpoint, service_name,
                                              number_of_expected_services)
        return outcome

    def get_all_instances_of_service(self, service_name, port_name=None):
        '''
        Delegate the instance lookup to the consulhelpers function of the same name.

        :param service_name: name of the service to look up
        :param port_name: accepted for interface compatibility; not used here
        :return: whatever the consulhelpers function returns
        '''
        consul_endpoint = self.LOCAL_CONSUL
        instances = get_all_instances_of_service(consul_endpoint, service_name)
        return instances
| 33 | |
class KubernetesEnvironment(OrchestrationEnvironment):
    '''
    Orchestration environment backed by the Kubernetes core API.

    Every lookup is made in VOLTHA_NAMESPACE.
    '''

    # NOTE: these two statements execute when the class body is evaluated
    # (i.e. when this module is imported), so importing the module requires a
    # loadable kube config. Kept unchanged so that the class-level k8s_client
    # attribute stays available to existing callers.
    config.load_kube_config()
    k8s_client = client.CoreV1Api()

    def verify_all_services_healthy(self, service_name=None,
                                    number_of_expected_services=None):
        '''
        Check that the pods behind one service, or behind every service in the
        namespace, are all in the 'Running' phase.

        A service that selects no pods is reported healthy, since there is no
        pod in a non-running phase.

        :param service_name: when given, only this service is checked; a
                             service that does not exist is reported unhealthy
        :param number_of_expected_services: when given (and service_name is
                             None), the number of services in the namespace
                             must equal this value
        :return: True if the check passes, False otherwise
        '''
        if service_name is not None:
            return self._is_service_healthy(self._read_service(service_name))

        services = self.k8s_client.list_namespaced_service(VOLTHA_NAMESPACE,
                                                           watch=False)
        if number_of_expected_services is not None and \
                len(services.items) != number_of_expected_services:
            return False

        # all() stops at the first unhealthy service.
        return all(self._is_service_healthy(svc) for svc in services.items)

    def get_all_instances_of_service(self, service_name, port_name=None):
        '''
        Describe each pod that handles service_name.

        :param service_name: name of the service to look up
        :param port_name: name of the service port to report; the port number
                          is None when no port of the service has this name
        :return: a list with one dict per pod, holding the pod's IP under
                 'ServiceAddress' and the matched service port number under
                 'ServicePort'; empty if the service does not exist or
                 selects no pods
        '''
        svc = self._read_service(service_name)
        if svc is None:
            return []

        # spec.ports may be None for services that expose no ports.
        port_num = None
        for port in svc.spec.ports or []:
            if port.name == port_name:
                port_num = port.port

        return [{'ServiceAddress': pod.status.pod_ip, 'ServicePort': port_num}
                for pod in self._pods_selected_by(svc)]

    def get_all_pods_for_service(self, service_name):
        '''
        A Service is tied to the Pods that handle it via the Service's spec.selector.app
        property, whose value matches that of the spec.template.metadata.labels.app property
        of the Pods' controller. The controller, in turn, sets each pod's metadata.labels.app
        property to that same value. In Voltha, the 'app' property is set to the service's
        name. This function extracts the value of the service's 'app' selector and then
        searches all pods that have an 'app' label set to the same value.

        :param service_name: name of the service whose pods are wanted
        :return: A list of the pods handling service_name; empty if the service
                 does not exist or has no 'app' selector
        '''
        return self._pods_selected_by(self._read_service(service_name))

    def _read_service(self, service_name):
        '''
        Fetch a service from VOLTHA_NAMESPACE.

        :param service_name: name of the service to read
        :return: the service object, or None if the API reports 404 (not found)
        :raises ApiException: for any API error other than 404
        '''
        # Local import: only this helper needs the exception type.
        from kubernetes.client.rest import ApiException
        try:
            return self.k8s_client.read_namespaced_service(service_name,
                                                           VOLTHA_NAMESPACE)
        except ApiException as e:
            # The client signals a missing service by raising, not by
            # returning None; translate that so callers' None checks work.
            if e.status == 404:
                return None
            raise

    def _pods_selected_by(self, svc):
        '''
        List the pods whose 'app' label equals the 'app' selector of svc.

        :param svc: a service object, or None
        :return: list of matching pods; empty if svc is None or has no 'app'
                 selector
        '''
        if svc is None:
            return []

        # spec.selector is None for services defined without a selector.
        selector = svc.spec.selector or {}
        if 'app' not in selector:
            return []
        app_label = selector['app']

        ret = self.k8s_client.list_namespaced_pod(VOLTHA_NAMESPACE, watch=False)
        pods = []
        for pod in ret.items:
            # metadata.labels is None for pods that carry no labels at all.
            labels = pod.metadata.labels or {}
            if 'app' in labels and labels['app'] == app_label:
                pods.append(pod)
        return pods

    def _is_service_healthy(self, service):
        '''
        Report whether every pod selected by service is in the 'Running' phase.

        :param service: a service object, or None
        :return: False if service is None or any selected pod is not running;
                 True otherwise (including when no pods are selected)
        '''
        if service is None:
            return False
        return all(pod.status.phase == 'Running'
                   for pod in self._pods_selected_by(service))