blob: cb8883a3d4270a20967f6f005f409383919eb9c5 [file] [log] [blame]
Richard Jankowski461cb972018-04-11 15:36:27 -04001from kubernetes import client, config
2from common.utils.consulhelpers import get_all_instances_of_service, \
3 verify_all_services_healthy
4
5VOLTHA_NAMESPACE = 'voltha'
6
7def get_orch_environment(orch_env):
8 if orch_env == 'k8s-single-node':
9 return KubernetesEnvironment()
10 else:
11 return DockerComposeEnvironment()
12
13class OrchestrationEnvironment:
14
15 def verify_all_services_healthy(self, service_name=None,
16 number_of_expected_services=None):
17 raise NotImplementedError('verify_all_services_healthy must be defined')
18
19 def get_all_instances_of_service(self, service_name, port_name=None):
20 raise NotImplementedError('get_all_instances_of_service must be defined')
21
22class DockerComposeEnvironment(OrchestrationEnvironment):
23
24 LOCAL_CONSUL = "localhost:8500"
25
26 def verify_all_services_healthy(self, service_name=None,
27 number_of_expected_services=None):
28 return verify_all_services_healthy(self.LOCAL_CONSUL, service_name,
29 number_of_expected_services)
30
31 def get_all_instances_of_service(self, service_name, port_name=None):
32 return get_all_instances_of_service(self.LOCAL_CONSUL, service_name)
33
34class KubernetesEnvironment(OrchestrationEnvironment):
35
36 config.load_kube_config()
37 k8s_client = client.CoreV1Api()
38
39 def verify_all_services_healthy(self, service_name=None,
40 number_of_expected_services=None):
41
42 def check_health(service):
43 healthy = True
44 if service is None:
45 healthy = False
46 else:
47 pods = self.get_all_pods_for_service(service.metadata.name)
48 for pod in pods:
49 if pod.status.phase != 'Running':
50 healthy = False
51 return healthy
52
53 if service_name is not None:
54 return check_health(self.k8s_client.read_namespaced_service(service_name, VOLTHA_NAMESPACE))
55
56 services = self.k8s_client.list_namespaced_service(VOLTHA_NAMESPACE, watch=False)
57 if number_of_expected_services is not None and \
58 len(services.items) != number_of_expected_services:
59 return False
60
61 for svc in services.items:
62 if not check_health(svc):
63 return False
64
65 return True
66
67 def get_all_instances_of_service(self, service_name, port_name=None):
68 # Get service ports
69 port_num = None
70 svc = self.k8s_client.read_namespaced_service(service_name, VOLTHA_NAMESPACE)
71 if svc is not None:
72 ports = svc.spec.ports
73 for port in ports:
74 if port.name == port_name:
75 port_num = port.port
76
77 pods = self.get_all_pods_for_service(service_name)
78 services = []
79 for pod in pods:
80 service = {}
81 service['ServiceAddress'] = pod.status.pod_ip
82 service['ServicePort'] = port_num
83 services.append(service)
84 return services
85
86 def get_all_pods_for_service(self, service_name):
87 '''
88 A Service is tied to the Pods that handle it via the Service's spec.selector.app
89 property, whose value matches that of the spec.template.metadata.labels.app property
90 of the Pods' controller. The controller, in turn, sets each pod's metadata.labels.app
91 property to that same value. In Voltha, the 'app' property is set to the service's
92 name. This function extracts the value of the service's 'app' selector and then
93 searches all pods that have an 'app' label set to the same value.
94
95 :param service_name
96 :return: A list of the pods handling service_name
97 '''
98 pods = []
99 svc = self.k8s_client.read_namespaced_service(service_name, VOLTHA_NAMESPACE)
100 if svc is not None and 'app' in svc.spec.selector:
101 app_label = svc.spec.selector['app']
102 ret = self.k8s_client.list_namespaced_pod(VOLTHA_NAMESPACE, watch=False)
103 for pod in ret.items:
104 labels = pod.metadata.labels
105 if 'app' in labels and labels['app'] == app_label:
106 pods.append(pod)
107 return pods