Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame^] | 1 | # |
| 2 | # Copyright 2018 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | from hashlib import md5 |
| 17 | from unittest import TestCase, main |
| 18 | from nose.tools import raises |
| 19 | from nose.twistedtools import deferred |
| 20 | from copy import deepcopy |
| 21 | from mock.mock_adapter_agent import MockAdapterAgent, MockCore |
| 22 | from mock.mock_onu_handler import MockOnuHandler |
| 23 | from mock.mock_olt_handler import MockOltHandler |
| 24 | from mock.mock_onu import MockOnu |
| 25 | from pyvoltha.adapters.extensions.omci.openomci_agent import OpenOMCIAgent, OpenOmciAgentDefaults |
| 26 | from pyvoltha.adapters.extensions.omci.onu_configuration import OMCCVersion |
| 27 | from pyvoltha.adapters.extensions.omci.omci_defs import * |
| 28 | from pyvoltha.adapters.extensions.omci.omci_entities import OntG, Ont2G, Cardholder, \ |
| 29 | CircuitPack, SoftwareImage, AniG, UniG |
| 30 | from pyvoltha.common.utils.asleep import asleep |
| 31 | from pyvoltha.adapters.extensions.omci.database.mib_db_dict import MibDbVolatileDict |
| 32 | from datetime import datetime |
| 33 | |
| 34 | DEFAULT_OLT_DEVICE_ID = 'default_olt_mock' |
| 35 | DEFAULT_ONU_DEVICE_ID = 'default_onu_mock' |
| 36 | DEFAULT_PON_ID = 0 |
| 37 | DEFAULT_ONU_ID = 0 |
| 38 | DEFAULT_ONU_SN = 'TEST00000001' |
| 39 | |
| 40 | OP = EntityOperations |
| 41 | RC = ReasonCodes |
| 42 | |
| 43 | |
| 44 | class TestOmciConfiguration(TestCase): |
| 45 | """ |
| 46 | Test the OMCI read-only Configuration library methods |
| 47 | """ |
| 48 | def setUp(self): |
| 49 | self.adapter_agent = MockAdapterAgent() |
| 50 | |
| 51 | custom = deepcopy(OpenOmciAgentDefaults) |
| 52 | custom['mib-synchronizer']['database'] = MibDbVolatileDict |
| 53 | |
| 54 | self.omci_agent = OpenOMCIAgent(MockCore, support_classes=custom) |
| 55 | self.omci_agent.start() |
| 56 | |
| 57 | def tearDown(self): |
| 58 | if self.omci_agent is not None: |
| 59 | self.omci_agent.stop() |
| 60 | |
| 61 | if self.adapter_agent is not None: |
| 62 | self.adapter_agent.tearDown() |
| 63 | |
| 64 | def setup_mock_olt(self, device_id=DEFAULT_OLT_DEVICE_ID): |
| 65 | handler = MockOltHandler(self.adapter_agent, device_id) |
| 66 | self.adapter_agent.add_device(handler.device) |
| 67 | return handler |
| 68 | |
| 69 | def setup_mock_onu(self, parent_id=DEFAULT_OLT_DEVICE_ID, |
| 70 | device_id=DEFAULT_ONU_DEVICE_ID, |
| 71 | pon_id=DEFAULT_PON_ID, |
| 72 | onu_id=DEFAULT_ONU_ID, |
| 73 | serial_no=DEFAULT_ONU_SN): |
| 74 | handler = MockOnuHandler(self.adapter_agent, parent_id, device_id, pon_id, onu_id) |
| 75 | handler.serial_number = serial_no |
| 76 | onu = MockOnu(serial_no, self.adapter_agent, handler.device_id) \ |
| 77 | if serial_no is not None else None |
| 78 | handler.onu_mock = onu |
| 79 | return handler |
| 80 | |
| 81 | def setup_one_of_each(self): |
| 82 | # Most tests will use at lease one or more OLT and ONU |
| 83 | self.olt_handler = self.setup_mock_olt() |
| 84 | self.onu_handler = self.setup_mock_onu(parent_id=self.olt_handler.device_id) |
| 85 | self.onu_device = self.onu_handler.onu_mock |
| 86 | |
| 87 | self.adapter_agent.add_child_device(self.olt_handler.device, |
| 88 | self.onu_handler.device) |
| 89 | # Add device to OpenOMCI |
| 90 | self.onu_device = self.omci_agent.add_device(DEFAULT_ONU_DEVICE_ID, |
| 91 | self.adapter_agent) |
| 92 | |
| 93 | # Allow timeout trigger support while in disabled state for mib sync |
| 94 | # to make tests run cleanly while profiling. |
| 95 | self.onu_device.mib_synchronizer.machine.add_transition('timeout', 'disabled', 'disabled') |
| 96 | |
| 97 | def not_called(self, _reason): |
| 98 | assert False, 'Should never be called' |
| 99 | |
| 100 | def _stuff_database(self, entries): |
| 101 | """ |
| 102 | Stuff the MIB database with some entries that we will use during tests |
| 103 | """ |
| 104 | database = self.onu_device.mib_synchronizer._database |
| 105 | |
| 106 | # Stuff a value into last in sync. This makes it look like |
| 107 | # the ONU has been in in-sync at least once. |
| 108 | self.onu_device.mib_synchronizer.last_mib_db_sync = datetime.utcnow() |
| 109 | |
| 110 | # Entry is a tuple of (class_id, instance_id, {attributes}) |
| 111 | for entry in entries: |
| 112 | database.set(DEFAULT_ONU_DEVICE_ID, entry[0], entry[1], entry[2]) |
| 113 | |
| 114 | def test_OMCCVersion(self): |
| 115 | for key, value in OMCCVersion.__members__.items(): |
| 116 | self.assertEqual(OMCCVersion.to_enum(OMCCVersion[key].value), value) |
| 117 | |
| 118 | self.assertEqual(OMCCVersion.to_enum(-1), OMCCVersion.Unknown) |
| 119 | |
| 120 | @deferred(timeout=50000) |
| 121 | def test_defaults(self): |
| 122 | self.setup_one_of_each() |
| 123 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 124 | |
| 125 | @raises(AssertionError) |
| 126 | def do_my_tests(_results): |
| 127 | config = self.onu_device.configuration |
| 128 | # Should raise assertion if never been synchronized |
| 129 | config.version |
| 130 | |
| 131 | # No capabilities available until started |
| 132 | self.assertIsNone(self.onu_device.configuration) |
| 133 | |
| 134 | # Yield context so that MIB Database callLater runs. This is a waiting |
| 135 | # Async task from when the OpenOMCIAgent was started. But also start the |
| 136 | # device so that it's queued async state machines can run as well |
| 137 | self.onu_device.start() |
| 138 | d = asleep(0.2) |
| 139 | d.addCallbacks(do_my_tests, self.not_called) |
| 140 | return d |
| 141 | |
| 142 | @deferred(timeout=5) |
| 143 | def test_in_sync_but_empty(self): |
| 144 | self.setup_one_of_each() |
| 145 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 146 | |
| 147 | def stuff_db(_results): |
| 148 | self._stuff_database([]) |
| 149 | |
| 150 | def do_my_tests(_results): |
| 151 | config = self.onu_device.configuration |
| 152 | |
| 153 | # On no Class ID for requested property, None should be |
| 154 | # returned |
| 155 | self.assertIsNone(config.version) |
| 156 | self.assertIsNone(config.traffic_management_option) |
| 157 | self.assertIsNone(config.onu_survival_time) |
| 158 | self.assertIsNone(config.equipment_id) |
| 159 | self.assertIsNone(config.omcc_version) |
| 160 | self.assertIsNone(config.vendor_product_code) |
| 161 | self.assertIsNone(config.total_priority_queues) |
| 162 | self.assertIsNone(config.total_traffic_schedulers) |
| 163 | self.assertIsNone(config.total_gem_ports) |
| 164 | self.assertIsNone(config.uptime) |
| 165 | self.assertIsNone(config.connectivity_capability) |
| 166 | self.assertIsNone(config.qos_configuration_flexibility) |
| 167 | self.assertIsNone(config.priority_queue_scale_factor) |
| 168 | self.assertIsNone(config.cardholder_entities) |
| 169 | self.assertIsNone(config.circuitpack_entities) |
| 170 | self.assertIsNone(config.software_images) |
| 171 | self.assertIsNone(config.ani_g_entities) |
| 172 | self.assertIsNone(config.uni_g_entities) |
| 173 | |
| 174 | # No capabilities available until started |
| 175 | self.assertIsNone(self.onu_device.configuration) |
| 176 | |
| 177 | # Yield context so that MIB Database callLater runs. |
| 178 | self.onu_device.start() |
| 179 | d = asleep(0.2) |
| 180 | d.addCallbacks(stuff_db, self.not_called) |
| 181 | d.addCallbacks(do_my_tests, self.not_called) |
| 182 | return d |
| 183 | |
| 184 | @deferred(timeout=5) |
| 185 | def test_in_sync_with_ont_g_values(self): |
| 186 | self.setup_one_of_each() |
| 187 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 188 | |
| 189 | version = 'abcDEF' |
| 190 | tm_opt = 2 |
| 191 | onu_survival = 123 |
| 192 | |
| 193 | def stuff_db(_results): |
| 194 | self._stuff_database([ |
| 195 | (OntG.class_id, 0, {'version': version, |
| 196 | 'traffic_management_options': tm_opt, |
| 197 | 'ont_survival_time': onu_survival |
| 198 | })]) |
| 199 | |
| 200 | def do_my_tests(_results): |
| 201 | config = self.onu_device.configuration |
| 202 | |
| 203 | # On no Class ID for requested property, None should be |
| 204 | # returned |
| 205 | self.assertEqual(config.version, version) |
| 206 | self.assertEqual(config.traffic_management_option, tm_opt) |
| 207 | self.assertEqual(config.onu_survival_time, onu_survival) |
| 208 | |
| 209 | # No capabilities available until started |
| 210 | self.assertIsNone(self.onu_device.configuration) |
| 211 | |
| 212 | # Yield context so that MIB Database callLater runs. |
| 213 | self.onu_device.start() |
| 214 | d = asleep(0.2) |
| 215 | d.addCallbacks(stuff_db, self.not_called) |
| 216 | d.addCallbacks(do_my_tests, self.not_called) |
| 217 | return d |
| 218 | |
| 219 | @deferred(timeout=5) |
| 220 | def test_in_sync_with_ont_2g_values(self): |
| 221 | self.setup_one_of_each() |
| 222 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 223 | |
| 224 | equip_id = 'br-549' |
| 225 | omcc_ver = OMCCVersion.G_988_2012 |
| 226 | vend_code = 0x1234 |
| 227 | queues = 64 |
| 228 | scheds = 8 |
| 229 | gem_ports = 24 |
| 230 | uptime = 12345 |
| 231 | conn_capp = 0x00aa |
| 232 | qos_flex = 0x001b |
| 233 | queue_scale = 1 |
| 234 | |
| 235 | def stuff_db(_results): |
| 236 | self._stuff_database([ |
| 237 | (Ont2G.class_id, 0, {'equipment_id': equip_id, |
| 238 | 'omcc_version': omcc_ver.value, |
| 239 | 'vendor_product_code': vend_code, |
| 240 | 'total_priority_queue_number': queues, |
| 241 | 'total_traffic_scheduler_number': scheds, |
| 242 | 'total_gem_port_id_number': gem_ports, |
| 243 | 'sys_uptime': uptime, |
| 244 | 'connectivity_capability': conn_capp, |
| 245 | 'qos_configuration_flexibility': qos_flex, |
| 246 | 'priority_queue_scale_factor': queue_scale |
| 247 | })]) |
| 248 | |
| 249 | def do_my_tests(_results): |
| 250 | config = self.onu_device.configuration |
| 251 | |
| 252 | self.assertEqual(config.equipment_id, equip_id) |
| 253 | self.assertEqual(config.omcc_version, omcc_ver) |
| 254 | self.assertEqual(config.vendor_product_code, vend_code) |
| 255 | self.assertEqual(config.total_priority_queues, queues) |
| 256 | self.assertEqual(config.total_traffic_schedulers, scheds) |
| 257 | self.assertEqual(config.total_gem_ports, gem_ports) |
| 258 | self.assertEqual(config.uptime, uptime) |
| 259 | self.assertEqual(config.connectivity_capability, conn_capp) |
| 260 | self.assertEqual(config.qos_configuration_flexibility, qos_flex) |
| 261 | self.assertEqual(config.priority_queue_scale_factor, queue_scale) |
| 262 | |
| 263 | # No capabilities available until started |
| 264 | self.assertIsNone(self.onu_device.configuration) |
| 265 | |
| 266 | # Yield context so that MIB Database callLater runs. |
| 267 | self.onu_device.start() |
| 268 | d = asleep(0.2) |
| 269 | d.addCallbacks(stuff_db, self.not_called) |
| 270 | d.addCallbacks(do_my_tests, self.not_called) |
| 271 | return d |
| 272 | |
| 273 | @deferred(timeout=5) |
| 274 | def test_in_sync_with_cardholder_values(self): |
| 275 | self.setup_one_of_each() |
| 276 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 277 | |
| 278 | ch_entity = 0x102 |
| 279 | unit_type = 255 |
| 280 | clie_code = 'abc123' |
| 281 | prot_ptr = 0 |
| 282 | |
| 283 | def stuff_db(_results): |
| 284 | self._stuff_database([ |
| 285 | (Cardholder.class_id, ch_entity, {'actual_plugin_unit_type': unit_type, |
| 286 | 'actual_equipment_id': clie_code, |
| 287 | 'protection_profile_pointer': prot_ptr, |
| 288 | })]) |
| 289 | |
| 290 | def do_my_tests(_results): |
| 291 | config = self.onu_device.configuration |
| 292 | |
| 293 | cardholder = config.cardholder_entities |
| 294 | self.assertTrue(isinstance(cardholder, dict)) |
| 295 | self.assertEqual(len(cardholder), 1) |
| 296 | self.assertEqual(cardholder[ch_entity]['entity-id'], ch_entity) |
| 297 | self.assertEqual(cardholder[ch_entity]['is-single-piece'], ch_entity >= 256) |
| 298 | self.assertEqual(cardholder[ch_entity]['slot-number'], ch_entity & 0xFF) |
| 299 | self.assertEqual(cardholder[ch_entity]['actual-plug-in-type'], unit_type) |
| 300 | self.assertEqual(cardholder[ch_entity]['actual-equipment-id'], clie_code) |
| 301 | self.assertEqual(cardholder[ch_entity]['protection-profile-ptr'], prot_ptr) |
| 302 | |
| 303 | # No capabilities available until started |
| 304 | self.assertIsNone(self.onu_device.configuration) |
| 305 | |
| 306 | # Yield context so that MIB Database callLater runs. |
| 307 | self.onu_device.start() |
| 308 | d = asleep(0.2) |
| 309 | d.addCallbacks(stuff_db, self.not_called) |
| 310 | d.addCallbacks(do_my_tests, self.not_called) |
| 311 | return d |
| 312 | |
| 313 | @deferred(timeout=5) |
| 314 | def test_in_sync_with_circuitpack_values(self): |
| 315 | self.setup_one_of_each() |
| 316 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 317 | |
| 318 | cp_entity = 0x100 |
| 319 | num_ports = 1 |
| 320 | serial_num = 'ABCD01234' |
| 321 | cp_version = '1234ABCD' |
| 322 | vendor_id = 'AB-9876' |
| 323 | tconts = 2 |
| 324 | pqueues = 64 |
| 325 | sched_count = 8 |
| 326 | |
| 327 | def stuff_db(_results): |
| 328 | self._stuff_database([ |
| 329 | (CircuitPack.class_id, cp_entity, {'number_of_ports': num_ports, |
| 330 | 'serial_number': serial_num, |
| 331 | 'version': cp_version, |
| 332 | 'vendor_id': vendor_id, |
| 333 | 'total_tcont_buffer_number': tconts, |
| 334 | 'total_priority_queue_number': pqueues, |
| 335 | 'total_traffic_scheduler_number': sched_count, |
| 336 | })]) |
| 337 | |
| 338 | def do_my_tests(_results): |
| 339 | config = self.onu_device.configuration |
| 340 | |
| 341 | circuitpack = config.circuitpack_entities |
| 342 | self.assertTrue(isinstance(circuitpack, dict)) |
| 343 | self.assertEqual(len(circuitpack), 1) |
| 344 | self.assertEqual(circuitpack[cp_entity]['entity-id'], cp_entity) |
| 345 | self.assertEqual(circuitpack[cp_entity]['number-of-ports'], num_ports) |
| 346 | self.assertEqual(circuitpack[cp_entity]['serial-number'], serial_num) |
| 347 | self.assertEqual(circuitpack[cp_entity]['version'], cp_version) |
| 348 | self.assertEqual(circuitpack[cp_entity]['vendor-id'], vendor_id) |
| 349 | self.assertEqual(circuitpack[cp_entity]['total-tcont-count'], tconts) |
| 350 | self.assertEqual(circuitpack[cp_entity]['total-priority-queue-count'], pqueues) |
| 351 | self.assertEqual(circuitpack[cp_entity]['total-traffic-sched-count'], sched_count) |
| 352 | |
| 353 | # No capabilities available until started |
| 354 | self.assertIsNone(self.onu_device.configuration) |
| 355 | |
| 356 | # Yield context so that MIB Database callLater runs. |
| 357 | self.onu_device.start() |
| 358 | d = asleep(0.2) |
| 359 | d.addCallbacks(stuff_db, self.not_called) |
| 360 | d.addCallbacks(do_my_tests, self.not_called) |
| 361 | return d |
| 362 | |
| 363 | @deferred(timeout=5) |
| 364 | def test_in_sync_with_software_values(self): |
| 365 | self.setup_one_of_each() |
| 366 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 367 | |
| 368 | sw_entity = 0x200 |
| 369 | sw_version = 'Beta-0.0.2' |
| 370 | sw_hash = md5("just_a_test").hexdigest() |
| 371 | prod_code = 'MySoftware' |
| 372 | sw_active = True |
| 373 | sw_committed = True |
| 374 | sw_valid = True |
| 375 | |
| 376 | def stuff_db(_results): |
| 377 | self._stuff_database([ |
| 378 | (SoftwareImage.class_id, sw_entity, {'version': sw_version, |
| 379 | 'is_committed': sw_committed, |
| 380 | 'is_active': sw_active, |
| 381 | 'is_valid': sw_valid, |
| 382 | 'product_code': prod_code, |
| 383 | 'image_hash': sw_hash, |
| 384 | })]) |
| 385 | |
| 386 | def do_my_tests(_results): |
| 387 | config = self.onu_device.configuration |
| 388 | |
| 389 | images = config.software_images |
| 390 | self.assertTrue(isinstance(images, list)) |
| 391 | self.assertEqual(len(images), 1) |
| 392 | self.assertEqual(images[0].name, 'running-revision' if sw_active else 'candidate-revision') |
| 393 | self.assertEqual(images[0].version, sw_version) |
| 394 | self.assertEqual(images[0].is_active, 1 if sw_active else 0) |
| 395 | self.assertEqual(images[0].is_committed, 1 if sw_committed else 0) |
| 396 | self.assertEqual(images[0].is_valid, 1 if sw_valid else 0) |
| 397 | self.assertEqual(images[0].hash, sw_hash) |
| 398 | |
| 399 | # No capabilities available until started |
| 400 | self.assertIsNone(self.onu_device.configuration) |
| 401 | |
| 402 | # Yield context so that MIB Database callLater runs. |
| 403 | self.onu_device.start() |
| 404 | d = asleep(0.2) |
| 405 | d.addCallbacks(stuff_db, self.not_called) |
| 406 | d.addCallbacks(do_my_tests, self.not_called) |
| 407 | return d |
| 408 | |
| 409 | @deferred(timeout=5) |
| 410 | def test_in_sync_with_ani_g_values(self): |
| 411 | self.setup_one_of_each() |
| 412 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 413 | |
| 414 | entity_id = 0x0106 |
| 415 | tconts = 4 |
| 416 | dba_report = 4 |
| 417 | |
| 418 | def stuff_db(_results): |
| 419 | self._stuff_database([ |
| 420 | (AniG.class_id, entity_id, {'total_tcont_number': tconts, |
| 421 | 'piggyback_dba_reporting': dba_report |
| 422 | }) |
| 423 | ]) |
| 424 | |
| 425 | def do_my_tests(_results): |
| 426 | config = self.onu_device.configuration |
| 427 | |
| 428 | anig = config.ani_g_entities |
| 429 | self.assertTrue(isinstance(anig, dict)) |
| 430 | self.assertEqual(len(anig), 1) |
| 431 | |
| 432 | self.assertEqual(anig[entity_id]['entity-id'], entity_id) |
| 433 | self.assertEqual(anig[entity_id]['slot-number'], (entity_id >> 8) & 0xff) |
| 434 | self.assertEqual(anig[entity_id]['port-number'], entity_id & 0xff) |
| 435 | self.assertEqual(anig[entity_id]['total-tcont-count'], tconts) |
| 436 | self.assertEqual(anig[entity_id]['piggyback-dba-reporting'], dba_report) |
| 437 | |
| 438 | # No capabilities available until started |
| 439 | self.assertIsNone(self.onu_device.configuration) |
| 440 | |
| 441 | # Yield context so that MIB Database callLater runs. |
| 442 | self.onu_device.start() |
| 443 | d = asleep(0.2) |
| 444 | d.addCallbacks(stuff_db, self.not_called) |
| 445 | d.addCallbacks(do_my_tests, self.not_called) |
| 446 | return d |
| 447 | |
| 448 | @deferred(timeout=5) |
| 449 | def test_in_sync_with_uni_g_values(self): |
| 450 | self.setup_one_of_each() |
| 451 | self.assertEqual(len(self.omci_agent.device_ids()), 1) |
| 452 | |
| 453 | entity_id = 0x4321 |
| 454 | mgmt_cap = 0 |
| 455 | |
| 456 | def stuff_db(_results): |
| 457 | self._stuff_database([ |
| 458 | (UniG.class_id, entity_id, {'management_capability': mgmt_cap}) |
| 459 | ]) |
| 460 | |
| 461 | def do_my_tests(_results): |
| 462 | config = self.onu_device.configuration |
| 463 | |
| 464 | unig = config.uni_g_entities |
| 465 | self.assertTrue(isinstance(unig, dict)) |
| 466 | self.assertEqual(len(unig), 1) |
| 467 | |
| 468 | self.assertEqual(unig[entity_id]['entity-id'], entity_id) |
| 469 | self.assertEqual(unig[entity_id]['management-capability'], mgmt_cap) |
| 470 | |
| 471 | # No capabilities available until started |
| 472 | self.assertIsNone(self.onu_device.configuration) |
| 473 | |
| 474 | # Yield context so that MIB Database callLater runs. |
| 475 | self.onu_device.start() |
| 476 | d = asleep(0.2) |
| 477 | d.addCallbacks(stuff_db, self.not_called) |
| 478 | d.addCallbacks(do_my_tests, self.not_called) |
| 479 | return d |
| 480 | |
| 481 | |
| 482 | if __name__ == '__main__': |
| 483 | main() |
| 484 | |