blob: a0e5a4dc56c6c0f2fa89641e443aa27306ff2745 [file] [log] [blame]
Matteo Scandolocc94e902018-05-22 15:25:25 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import unittest
Scott Bakerb1671512019-04-01 19:16:54 -070016import os
17import sys
Matteo Scandolocc94e902018-05-22 15:25:25 -070018from mock import patch, Mock, MagicMock
19
Scott Bakerb1671512019-04-01 19:16:54 -070020test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
21service_dir = os.path.join(test_path, "../../../..")
22xos_dir = os.path.join(test_path, "../../..")
Matteo Scandolo0813b882018-07-16 14:38:01 -040023if not os.path.exists(os.path.join(test_path, "new_base")):
Scott Bakerb1671512019-04-01 19:16:54 -070024 xos_dir = os.path.join(test_path, "../../../../../../orchestration/xos/xos")
25 services_dir = os.path.join(xos_dir, "../../xos_services")
Matteo Scandolo0813b882018-07-16 14:38:01 -040026
Matteo Scandolocc94e902018-05-22 15:25:25 -070027# mocking XOS exception, as they're based in Django
Scott Bakerb1671512019-04-01 19:16:54 -070028
29
Matteo Scandolocc94e902018-05-22 15:25:25 -070030class Exceptions:
31 XOSValidationError = Exception
32 XOSProgrammingError = Exception
33 XOSPermissionDenied = Exception
vigneshethiraj4d3e5972019-04-04 19:22:14 +053034 XOSConfigurationError = Exception
Matteo Scandolocc94e902018-05-22 15:25:25 -070035
Scott Bakerb1671512019-04-01 19:16:54 -070036
Matteo Scandolocc94e902018-05-22 15:25:25 -070037class XOS:
38 exceptions = Exceptions
39
Scott Bakerb1671512019-04-01 19:16:54 -070040
Matteo Scandolocc94e902018-05-22 15:25:25 -070041class TestRCORDModels(unittest.TestCase):
42 def setUp(self):
Matteo Scandolo0813b882018-07-16 14:38:01 -040043
44 self.sys_path_save = sys.path
45 sys.path.append(xos_dir)
46
Matteo Scandolocc94e902018-05-22 15:25:25 -070047 self.xos = XOS
48
49 self.models_decl = Mock()
50 self.models_decl.RCORDSubscriber_decl = MagicMock
51 self.models_decl.RCORDSubscriber_decl.save = Mock()
52 self.models_decl.RCORDSubscriber_decl.objects = Mock()
53 self.models_decl.RCORDSubscriber_decl.objects.filter.return_value = []
54
Luca Preted6700ba2018-09-12 16:40:49 -070055 self.models_decl.RCORDIpAddress_decl = MagicMock
56 self.models_decl.RCORDIpAddress_decl.save = Mock()
57 self.models_decl.RCORDIpAddress_decl.objects = Mock()
58 self.models_decl.RCORDIpAddress_decl.objects.filter.return_value = []
59
Matteo Scandolocc94e902018-05-22 15:25:25 -070060 modules = {
61 'xos.exceptions': self.xos.exceptions,
62 'models_decl': self.models_decl
63 }
64
65 self.module_patcher = patch.dict('sys.modules', modules)
66 self.module_patcher.start()
67
68 self.volt = Mock()
69
Luca Preted6700ba2018-09-12 16:40:49 -070070 from models import RCORDSubscriber, RCORDIpAddress
Matteo Scandolocc94e902018-05-22 15:25:25 -070071
72 self.rcord_subscriber_class = RCORDSubscriber
73
74 self.rcord_subscriber = RCORDSubscriber()
Matteo Scandolo10d94512019-03-04 15:54:43 -080075 self.rcord_subscriber.deleted = False
Scott Bakerb1671512019-04-01 19:16:54 -070076 self.rcord_subscriber.id = None # this is a new model
Matteo Scandoloaa37b422018-05-23 15:36:56 -070077 self.rcord_subscriber.is_new = True
Matteo Scandolocc94e902018-05-22 15:25:25 -070078 self.rcord_subscriber.onu_device = "BRCM1234"
Matteo Scandoloaa37b422018-05-23 15:36:56 -070079 self.rcord_subscriber.c_tag = 111
Matteo Scandoloc348b3f2018-07-29 09:35:11 -070080 self.rcord_subscriber.s_tag = 222
Luca Preted6700ba2018-09-12 16:40:49 -070081 self.rcord_subscriber.ips = Mock()
82 self.rcord_subscriber.ips.all.return_value = []
Matteo Scandolocc94e902018-05-22 15:25:25 -070083 self.rcord_subscriber.mac_address = "00:AA:00:00:00:01"
84 self.rcord_subscriber.owner.leaf_model.access = "voltha"
85 self.rcord_subscriber.owner.provider_services = [self.volt]
vigneshethiraj4d3e5972019-04-04 19:22:14 +053086 self.rcord_subscriber.list_of_unused_c_tags_for_s_tag = []
Matteo Scandolocc94e902018-05-22 15:25:25 -070087
Luca Preted6700ba2018-09-12 16:40:49 -070088 self.rcord_ip = RCORDIpAddress()
Scott Bakerb1671512019-04-01 19:16:54 -070089 self.rcord_ip.subscriber = 1
Luca Preted6700ba2018-09-12 16:40:49 -070090
Matteo Scandolo0813b882018-07-16 14:38:01 -040091 def tearDown(self):
92 sys.path = self.sys_path_save
Matteo Scandolocc94e902018-05-22 15:25:25 -070093
94 def test_save(self):
95 self.rcord_subscriber.save()
96 self.models_decl.RCORDSubscriber_decl.save.assert_called()
97
Luca Preted6700ba2018-09-12 16:40:49 -070098 def _test_validate_ipv4_address(self):
99 self.rcord_ip.ip = "192.168.0."
Matteo Scandolocc94e902018-05-22 15:25:25 -0700100 with self.assertRaises(Exception) as e:
Luca Preted6700ba2018-09-12 16:40:49 -0700101 self.rcord_ip.save()
Matteo Scandolocc94e902018-05-22 15:25:25 -0700102
Luca Preted6700ba2018-09-12 16:40:49 -0700103 self.assertEqual(e.exception.message, "The IP specified is not valid: 192.168.0.")
104 self.models_decl.RCORDIpAddress.save.assert_not_called()
105
106 def test_validate_ipv6_address(self):
107 self.rcord_ip.ip = "2001:0db8:85a3:0000:0000:8a2e:03"
108 with self.assertRaises(Exception) as e:
109 self.rcord_ip.save()
110
111 self.assertEqual(e.exception.message, "The IP specified is not valid: 2001:0db8:85a3:0000:0000:8a2e:03")
112 self.models_decl.RCORDIpAddress.save.assert_not_called()
Matteo Scandolocc94e902018-05-22 15:25:25 -0700113
114 def test_validate_mac_address(self):
115 self.rcord_subscriber.mac_address = "invalid"
116 with self.assertRaises(Exception) as e:
117 self.rcord_subscriber.save()
118
Luca Preted6700ba2018-09-12 16:40:49 -0700119 self.assertEqual(e.exception.message, "The MAC address specified is not valid: invalid")
Matteo Scandolocc94e902018-05-22 15:25:25 -0700120 self.models_decl.RCORDSubscriber_decl.save.assert_not_called()
121
122 def test_valid_onu_device(self):
123 self.rcord_subscriber.save()
124 self.models_decl.RCORDSubscriber_decl.save.assert_called()
125
126 def test_invalid_onu_device(self):
127 self.volt.leaf_model.has_access_device.return_value = False
128 with self.assertRaises(Exception) as e:
129 self.rcord_subscriber.save()
130
131 self.assertEqual(e.exception.message, "The onu_device you specified (BRCM1234) does not exists")
132 self.models_decl.RCORDSubscriber_decl.save.assert_not_called()
133
Matteo Scandolo10d94512019-03-04 15:54:43 -0800134 def test_missing_onu_device_on_delete(self):
135 self.volt.leaf_model.has_access_device.return_value = False
136 self.rcord_subscriber.deleted = True
137 self.rcord_subscriber.save()
138 self.models_decl.RCORDSubscriber_decl.save.assert_called()
139
Matteo Scandoloa1875602018-10-16 07:35:04 -0700140 def test_validate_c_tag_pass(self):
141 """
142 check that other subscriber attached to the same ONU don't have the same c_tag
143 """
144
145 self.models_decl.RCORDSubscriber_decl.objects.filter.return_value = [self.rcord_subscriber]
146
Matteo Scandoloa1875602018-10-16 07:35:04 -0700147 self.rcord_subscriber.save()
148
149 self.models_decl.RCORDSubscriber_decl.save.assert_called()
150
151 def test_validate_c_tag_fail(self):
Matteo Scandolocc94e902018-05-22 15:25:25 -0700152 """
153 check that other subscriber attached to the same ONU don't have the same c_tag
154 """
155
156 s = Mock()
Matteo Scandoloaa37b422018-05-23 15:36:56 -0700157 s.c_tag = 111
Matteo Scandolocc94e902018-05-22 15:25:25 -0700158 s.onu_device = "BRCM1234"
159
Matteo Scandoloa1875602018-10-16 07:35:04 -0700160 self.models_decl.RCORDSubscriber_decl.objects.filter.return_value = [s, self.rcord_subscriber]
Matteo Scandolocc94e902018-05-22 15:25:25 -0700161
162 with self.assertRaises(Exception) as e:
163 self.rcord_subscriber.save()
164
165 self.assertEqual(e.exception.message, "The c_tag you specified (111) has already been used on device BRCM1234")
166 self.models_decl.RCORDSubscriber_decl.save.assert_not_called()
167
HARDIK WINDLASSabe3a9c2019-03-18 16:17:28 +0530168 def test_validate_get_used_s_c_tag_subscriber_id(self):
Matteo Scandoloa1875602018-10-16 07:35:04 -0700169 """
170 check that other subscriber using the same s_tag don't have the same c_tag
171 """
HARDIK WINDLASSabe3a9c2019-03-18 16:17:28 +0530172 s1 = Mock()
173 s1.id = 456
174 s1.c_tag = 999
175 s1.s_tag = 222
176 s1.onu_device = "BRCM1234"
177
178 s2 = Mock()
179 s2.id = 123
180 s2.c_tag = 111
181 s2.s_tag = 222
182 s2.onu_device = "BRCM12345"
183
184 self.rcord_subscriber.get_same_onu_subscribers = Mock()
185 self.rcord_subscriber.get_same_onu_subscribers.return_value = [s1]
186
187 self.rcord_subscriber.get_same_s_c_tag_subscribers = Mock()
188 self.rcord_subscriber.get_same_s_c_tag_subscribers.return_value = [s2]
Matteo Scandoloa1875602018-10-16 07:35:04 -0700189
190 with self.assertRaises(Exception) as e:
191 self.rcord_subscriber.save()
192
Scott Bakerb1671512019-04-01 19:16:54 -0700193 self.assertEqual(e.exception.message,
194 "The c_tag(111) and s_tag(222) pair you specified,has already been used by Subscriber with Id (123)")
Matteo Scandoloa1875602018-10-16 07:35:04 -0700195 self.models_decl.RCORDSubscriber_decl.save.assert_not_called()
196
Matteo Scandoloaa37b422018-05-23 15:36:56 -0700197 def test_validate_c_tag_on_update(self):
198 s = Mock()
199 s.c_tag = 111
200 s.onu_device = "BRCM1234"
201 s.id = 1
202
203 self.models_decl.RCORDSubscriber_decl.objects.filter.return_value = [s]
204
205 self.rcord_subscriber.is_new = False
206 self.rcord_subscriber.id = 1
207 self.rcord_subscriber.save()
208
209 self.models_decl.RCORDSubscriber_decl.save.assert_called()
210
211 def test_validate_c_tag_on_update_fail(self):
212 s = Mock()
213 s.c_tag = 222
214 s.onu_device = "BRCM1234"
215 s.id = 2
216
217 self.models_decl.RCORDSubscriber_decl.objects.filter.return_value = [s]
218
219 self.rcord_subscriber.id = 1
220 self.rcord_subscriber.is_new = False
221 self.rcord_subscriber.c_tag = 222
222 with self.assertRaises(Exception) as e:
223 self.rcord_subscriber.save()
224
225 self.assertEqual(e.exception.message, "The c_tag you specified (222) has already been used on device BRCM1234")
226 self.models_decl.RCORDSubscriber_decl.save.assert_not_called()
Matteo Scandolocc94e902018-05-22 15:25:25 -0700227
228 def test_generate_c_tag(self):
229 s = Mock()
230 s.c_tag = "111"
HARDIK WINDLASSabe3a9c2019-03-18 16:17:28 +0530231 s.s_tag = "223"
Matteo Scandolocc94e902018-05-22 15:25:25 -0700232 s.onu_device = "BRCM1234"
vigneshethiraj4d3e5972019-04-04 19:22:14 +0530233
HARDIK WINDLASSabe3a9c2019-03-18 16:17:28 +0530234 self.rcord_subscriber.get_same_onu_subscribers = Mock()
235 self.rcord_subscriber.get_same_onu_subscribers.return_value = [s]
236
237 self.rcord_subscriber.get_same_s_c_tag_subscribers = Mock()
238 self.rcord_subscriber.get_same_s_c_tag_subscribers.return_value = []
239
Matteo Scandolocc94e902018-05-22 15:25:25 -0700240 self.rcord_subscriber.c_tag = None
241
242 self.rcord_subscriber.save()
243
244 self.models_decl.RCORDSubscriber_decl.save.assert_called()
245 self.assertNotEquals(self.rcord_subscriber.c_tag, "111")
246 self.assertGreater(self.rcord_subscriber.c_tag, 16)
247 self.assertLess(self.rcord_subscriber.c_tag, 4097)
248
vigneshethiraj4d3e5972019-04-04 19:22:14 +0530249 #Testing whether the random generation choses c_tag from the list provided
250 self.rcord_subscriber.c_tag = None
251 self.rcord_subscriber.s_tag = None
252 self.rcord_subscriber.unused_c_tags_for_s_tag = Mock()
253 self.rcord_subscriber.unused_c_tags_for_s_tag.return_value = [18,19]
254
255 self.rcord_subscriber.save()
256 self.models_decl.RCORDSubscriber_decl.save.assert_called()
257 self.assertGreater(self.rcord_subscriber.c_tag, 17)
258 self.assertLess(self.rcord_subscriber.c_tag, 20)
259
260 self.rcord_subscriber.c_tag = None
261 self.rcord_subscriber.s_tag = None
262
263 self.rcord_subscriber.save()
264 self.models_decl.RCORDSubscriber_decl.save.assert_called()
265 self.assertGreater(self.rcord_subscriber.c_tag, 16)
266 self.assertLess(self.rcord_subscriber.c_tag, 4096)
267
268 def test_unused_c_tags_for_s_tag(self):
269 s=[]
270 for i in range(16,4097):
271 d = Mock()
272 d.c_tag = i
273 d.s_tag = "222"
274 d.onu_device = "BRCM1234"
275 s.append(d)
276
277 self.rcord_subscriber.get_same_onu_subscribers = Mock()
278 self.rcord_subscriber.get_same_onu_subscribers.return_value = []
279 self.rcord_subscriber.get_same_s_c_tag_subscribers = Mock()
280 self.rcord_subscriber.get_same_s_c_tag_subscribers.return_value = []
281
282 self.models_decl.RCORDSubscriber_decl.objects.filter.return_value = s
283 self.rcord_subscriber.s_tag = 222
284 self.rcord_subscriber.c_tag = None
285 with self.assertRaises(Exception) as e:
286 self.rcord_subscriber.save()
287 self.assertEqual(e.exception.message, "All the c_tags are exhausted for this s_tag: 222")
288 self.models_decl.RCORDSubscriber_decl.save.assert_not_called()
289
290 def test_unused_s_tags_for_c_tag(self):
291 s=[]
292 for i in range(16,4097):
293 d = Mock()
294 d.s_tag = i
295 d.c_tag = "111"
296 d.onu_device = "BRCM1234"
297 s.append(d)
298
299 self.rcord_subscriber.get_same_onu_subscribers = Mock()
300 self.rcord_subscriber.get_same_onu_subscribers.return_value = []
301 self.rcord_subscriber.get_same_s_c_tag_subscribers = Mock()
302 self.rcord_subscriber.get_same_s_c_tag_subscribers.return_value = []
303
304 self.models_decl.RCORDSubscriber_decl.objects.filter.return_value = s
305 self.rcord_subscriber.c_tag = 111
306 self.rcord_subscriber.s_tag = None
307 with self.assertRaises(Exception) as e:
308 self.rcord_subscriber.save()
309 self.assertEqual(e.exception.message, "All the s_tags are exhausted for this c_tag: 111")
310 self.models_decl.RCORDSubscriber_decl.save.assert_not_called()
311
312
313
Matteo Scandoloc348b3f2018-07-29 09:35:11 -0700314 def test_generate_s_tag(self):
HARDIK WINDLASSabe3a9c2019-03-18 16:17:28 +0530315 self.rcord_subscriber.s_tag = None
Matteo Scandoloc348b3f2018-07-29 09:35:11 -0700316
317 self.rcord_subscriber.save()
318
319 self.models_decl.RCORDSubscriber_decl.save.assert_called()
320 self.assertNotEqual(self.rcord_subscriber.s_tag, None)
321
vigneshethiraj4d3e5972019-04-04 19:22:14 +0530322
Matteo Scandoloc348b3f2018-07-29 09:35:11 -0700323 def test_provisioned_s_stag(self):
324 self.rcord_subscriber.save()
325 self.models_decl.RCORDSubscriber_decl.save.assert_called()
326 self.assertEqual(self.rcord_subscriber.s_tag, 222)
327
328
Matteo Scandolocc94e902018-05-22 15:25:25 -0700329if __name__ == '__main__':
330 unittest.main()