blob: 6e09344e08e5fc070b8998194a2a66d2a23d4e39 [file] [log] [blame]
Zack Williams41513bf2018-07-07 20:08:35 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
Richard Jankowski461cb972018-04-11 15:36:27 -040014from kubernetes import client, config
15from common.utils.consulhelpers import get_all_instances_of_service, \
16 verify_all_services_healthy
17
18VOLTHA_NAMESPACE = 'voltha'
19
20def get_orch_environment(orch_env):
21 if orch_env == 'k8s-single-node':
22 return KubernetesEnvironment()
23 else:
24 return DockerComposeEnvironment()
25
26class OrchestrationEnvironment:
27
28 def verify_all_services_healthy(self, service_name=None,
29 number_of_expected_services=None):
30 raise NotImplementedError('verify_all_services_healthy must be defined')
31
32 def get_all_instances_of_service(self, service_name, port_name=None):
33 raise NotImplementedError('get_all_instances_of_service must be defined')
34
35class DockerComposeEnvironment(OrchestrationEnvironment):
36
37 LOCAL_CONSUL = "localhost:8500"
38
39 def verify_all_services_healthy(self, service_name=None,
40 number_of_expected_services=None):
41 return verify_all_services_healthy(self.LOCAL_CONSUL, service_name,
42 number_of_expected_services)
43
44 def get_all_instances_of_service(self, service_name, port_name=None):
45 return get_all_instances_of_service(self.LOCAL_CONSUL, service_name)
46
47class KubernetesEnvironment(OrchestrationEnvironment):
48
49 config.load_kube_config()
50 k8s_client = client.CoreV1Api()
51
52 def verify_all_services_healthy(self, service_name=None,
53 number_of_expected_services=None):
54
55 def check_health(service):
56 healthy = True
57 if service is None:
58 healthy = False
59 else:
60 pods = self.get_all_pods_for_service(service.metadata.name)
61 for pod in pods:
62 if pod.status.phase != 'Running':
63 healthy = False
64 return healthy
65
66 if service_name is not None:
67 return check_health(self.k8s_client.read_namespaced_service(service_name, VOLTHA_NAMESPACE))
68
69 services = self.k8s_client.list_namespaced_service(VOLTHA_NAMESPACE, watch=False)
70 if number_of_expected_services is not None and \
71 len(services.items) != number_of_expected_services:
72 return False
73
74 for svc in services.items:
75 if not check_health(svc):
76 return False
77
78 return True
79
80 def get_all_instances_of_service(self, service_name, port_name=None):
81 # Get service ports
82 port_num = None
83 svc = self.k8s_client.read_namespaced_service(service_name, VOLTHA_NAMESPACE)
84 if svc is not None:
85 ports = svc.spec.ports
86 for port in ports:
87 if port.name == port_name:
88 port_num = port.port
89
90 pods = self.get_all_pods_for_service(service_name)
91 services = []
92 for pod in pods:
93 service = {}
94 service['ServiceAddress'] = pod.status.pod_ip
95 service['ServicePort'] = port_num
96 services.append(service)
97 return services
98
99 def get_all_pods_for_service(self, service_name):
100 '''
101 A Service is tied to the Pods that handle it via the Service's spec.selector.app
102 property, whose value matches that of the spec.template.metadata.labels.app property
103 of the Pods' controller. The controller, in turn, sets each pod's metadata.labels.app
104 property to that same value. In Voltha, the 'app' property is set to the service's
105 name. This function extracts the value of the service's 'app' selector and then
106 searches all pods that have an 'app' label set to the same value.
107
108 :param service_name
109 :return: A list of the pods handling service_name
110 '''
111 pods = []
112 svc = self.k8s_client.read_namespaced_service(service_name, VOLTHA_NAMESPACE)
113 if svc is not None and 'app' in svc.spec.selector:
114 app_label = svc.spec.selector['app']
115 ret = self.k8s_client.list_namespaced_pod(VOLTHA_NAMESPACE, watch=False)
116 for pod in ret.items:
117 labels = pod.metadata.labels
118 if 'app' in labels and labels['app'] == app_label:
119 pods.append(pod)
120 return pods