blob: 32d8e1f8f2247e3e63227478ab843822c5ffda96 [file] [log] [blame]
Matteo Scandolo04287a82018-10-02 15:25:26 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import unittest
16import json
17import functools
18from mock import patch, call, Mock, PropertyMock
19import requests_mock
20
21import os, sys
22
23# Hack to load synchronizer framework
24test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
25xos_dir=os.path.join(test_path, "../../..")
26if not os.path.exists(os.path.join(test_path, "new_base")):
27 xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
28 services_dir = os.path.join(xos_dir, "../../xos_services")
29sys.path.append(xos_dir)
30sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
31# END Hack to load synchronizer framework
32
33# generate model from xproto
34def get_models_fn(service_name, xproto_name):
35 name = os.path.join(service_name, "xos", xproto_name)
36 if os.path.exists(os.path.join(services_dir, name)):
37 return name
38 else:
39 name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
40 if os.path.exists(os.path.join(services_dir, name)):
41 return name
42 raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
43# END generate model from xproto
44
45class TestKubernetesEvent(unittest.TestCase):
46
47 def setUp(self):
48 global DeferredException
49
50 self.sys_path_save = sys.path
51 sys.path.append(xos_dir)
52 sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
53
54 config = os.path.join(test_path, "../test_config.yaml")
55 from xosconfig import Config
56 Config.clear()
57 Config.init(config, 'synchronizer-config-schema.yaml')
58
59 from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor
60 build_mock_modelaccessor(xos_dir, services_dir, [
61 get_models_fn("olt-service", "volt.xproto"),
62 get_models_fn("vsg", "vsg.xproto"),
63 get_models_fn("../profiles/rcord", "rcord.xproto"),
64 get_models_fn("onos-service", "onos.xproto"),
65 ])
66
67 import synchronizers.new_base.mock_modelaccessor
68 reload(synchronizers.new_base.mock_modelaccessor) # in case nose2 loaded it in a previous test
69
70 import synchronizers.new_base.modelaccessor
71 reload(synchronizers.new_base.modelaccessor) # in case nose2 loaded it in a previous test
72
73 from synchronizers.new_base.modelaccessor import model_accessor
74 from mock_modelaccessor import MockObjectList
75
76 from kubernetes_event import KubernetesPodDetailsEventStep
77
78 # import all class names to globals
79 for (k, v) in model_accessor.all_model_classes.items():
80 globals()[k] = v
81
82 self.event_step = KubernetesPodDetailsEventStep
83
84 self.onos = ONOSService(name="myonos",
85 id=1111,
86 rest_hostname = "onos-url",
87 rest_port = "8181",
88 rest_username = "karaf",
89 rest_password = "karaf",
90 backend_code=1,
91 backend_status="succeeded")
92
93 self.fcservice = VOLTService(name="myoltservice",
94 id=1112,
95 backend_code=1,
96 backend_status="succeeded",
97 subscriber_services=[self.onos])
98
99 self.fcsi1 = VOLTServiceInstance(name="myfcsi1",
100 owner=self.fcservice,
101 backend_code=1,
102 backend_status="succeeded")
103
104 self.fcsi2 = VOLTServiceInstance(name="myfcsi2",
105 owner=self.fcservice,
106 backend_code=1,
107 backend_status="succeeded")
108
109 self.fcservice.service_instances = MockObjectList([self.fcsi1, self.fcsi2])
110
111 self.log = Mock()
112
113 def tearDown(self):
114 sys.path = self.sys_path_save
115
116 def test_process_event(self):
117 with patch.object(VOLTService.objects, "get_items") as fcservice_objects, \
118 patch.object(Service.objects, "get_items") as service_objects, \
119 patch.object(VOLTServiceInstance, "save", autospec=True) as fcsi_save:
120 fcservice_objects.return_value = [self.fcservice]
121 service_objects.return_value = [self.onos, self.fcservice]
122
123 event_dict = {"status": "created",
124 "labels": {"xos_service": "myonos"}}
125 event = Mock()
126 event.value = json.dumps(event_dict)
127
128 step = self.event_step(log=self.log)
129 step.process_event(event)
130
131 self.assertEqual(self.fcsi1.backend_code, 0)
132 self.assertEqual(self.fcsi1.backend_status, "resynchronize due to kubernetes event")
133
134 self.assertEqual(self.fcsi2.backend_code, 0)
135 self.assertEqual(self.fcsi2.backend_status, "resynchronize due to kubernetes event")
136
137 fcsi_save.assert_has_calls([call(self.fcsi1, update_fields=["updated", "backend_code", "backend_status"],
138 always_update_timestamp=True),
139 call(self.fcsi2, update_fields=["updated", "backend_code", "backend_status"],
140 always_update_timestamp=True)])
141
142 def test_process_event_unknownstatus(self):
143 with patch.object(VOLTService.objects, "get_items") as fcservice_objects, \
144 patch.object(Service.objects, "get_items") as service_objects, \
145 patch.object(VOLTServiceInstance, "save") as fcsi_save:
146 fcservice_objects.return_value = [self.fcservice]
147 service_objects.return_value = [self.onos, self.fcservice]
148
149 event_dict = {"status": "something_else",
150 "labels": {"xos_service": "myonos"}}
151 event = Mock()
152 event.value = json.dumps(event_dict)
153
154 step = self.event_step(log=self.log)
155 step.process_event(event)
156
157 self.assertEqual(self.fcsi1.backend_code, 1)
158 self.assertEqual(self.fcsi1.backend_status, "succeeded")
159
160 self.assertEqual(self.fcsi2.backend_code, 1)
161 self.assertEqual(self.fcsi2.backend_status, "succeeded")
162
163 fcsi_save.assert_not_called()
164
165 def test_process_event_unknownservice(self):
166 with patch.object(VOLTService.objects, "get_items") as fcservice_objects, \
167 patch.object(Service.objects, "get_items") as service_objects, \
168 patch.object(VOLTServiceInstance, "save") as fcsi_save:
169 fcservice_objects.return_value = [self.fcservice]
170 service_objects.return_value = [self.onos, self.fcservice]
171
172 event_dict = {"status": "created",
173 "labels": {"xos_service": "something_else"}}
174 event = Mock()
175 event.value = json.dumps(event_dict)
176
177 step = self.event_step(log=self.log)
178 step.process_event(event)
179
180 self.assertEqual(self.fcsi1.backend_code, 1)
181 self.assertEqual(self.fcsi1.backend_status, "succeeded")
182
183 self.assertEqual(self.fcsi2.backend_code, 1)
184 self.assertEqual(self.fcsi2.backend_status, "succeeded")
185
186 fcsi_save.assert_not_called()
187
188 def test_process_event_nolabels(self):
189 with patch.object(VOLTService.objects, "get_items") as fcservice_objects, \
190 patch.object(Service.objects, "get_items") as service_objects, \
191 patch.object(VOLTServiceInstance, "save") as fcsi_save:
192 fcservice_objects.return_value = [self.fcservice]
193 service_objects.return_value = [self.onos, self.fcservice]
194
195 event_dict = {"status": "created"}
196 event = Mock()
197 event.value = json.dumps(event_dict)
198
199 step = self.event_step(log=self.log)
200 step.process_event(event)
201
202 self.assertEqual(self.fcsi1.backend_code, 1)
203 self.assertEqual(self.fcsi1.backend_status, "succeeded")
204
205 self.assertEqual(self.fcsi2.backend_code, 1)
206 self.assertEqual(self.fcsi2.backend_status, "succeeded")
207
208 fcsi_save.assert_not_called()
209
210if __name__ == '__main__':
211 unittest.main()
212
213
214