#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
| 16 | |
| 17 | import binascii |
| 18 | import structlog |
| 19 | from unittest import TestCase, TestSuite, skip |
| 20 | from pyvoltha.adapters.extensions.omci.openomci_agent import OpenOMCIAgent |
| 21 | from pyvoltha.adapters.extensions.omci.omci_entities import SoftwareImage |
| 22 | from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame |
| 23 | from pyvoltha.adapters.extensions.omci.omci_messages import \ |
| 24 | OmciStartSoftwareDownload, OmciStartSoftwareDownloadResponse, \ |
| 25 | OmciEndSoftwareDownload, OmciEndSoftwareDownloadResponse, \ |
| 26 | OmciDownloadSection, OmciDownloadSectionLast, OmciDownloadSectionResponse, \ |
| 27 | OmciActivateImage, OmciActivateImageResponse, \ |
| 28 | OmciCommitImage, OmciCommitImageResponse |
| 29 | from pyvoltha.protos.voltha_pb2 import ImageDownload |
| 30 | from pyvoltha.protos.device_pb2 import Device |
| 31 | |
| 32 | from tests.utests.voltha.extensions.omci.mock.mock_adapter_agent import MockAdapterAgent, MockCore |
| 33 | from twisted.internet import reactor |
| 34 | from twisted.internet.defer import Deferred |
| 35 | from twisted.internet.epollreactor import EPollReactor |
| 36 | from time import sleep |
| 37 | |
class TestOmciDownload(TestCase):
    """Exercise the ONU software download/activate flow against a simulated ONU.

    ``setUp`` wires an ``OpenOMCIAgent`` to a ``MockAdapterAgent`` and a
    private ``EPollReactor``.  ``cb_after_send_omci`` is attached to the mock
    agent's deferred (presumably fired with the hex-encoded frame on every
    OMCI send -- confirm against ``MockAdapterAgent``) and answers each
    decoded request by scheduling the matching ``sim_receive_*`` response on
    that reactor.
    """

    # Hex-string fields of a canned Start Software Download response.  They
    # are concatenated by get_omci_msg(), zero-padded to 80 hex characters and
    # followed by 'trailer' and 'mic'.
    sw_dwld_resp = {
        'tid': '0001',
        'mid': '33',
        'did': '0A',
        'entity_class': '0007',
        'entity_id': '0000',
        'reason': '0000',
        'window_size': '001F',
        'inst_num': '0001',
        'inst_id': '0000',
        'trailer': '00000028',
        'mic': '00000000'
    }

    # Order in which the sw_dwld_resp fields appear in the frame (the dict
    # itself carries no ordering guarantee on older interpreters).
    _SW_DWLD_RESP_FIELD_ORDER = (
        'tid', 'mid', 'did', 'entity_class', 'entity_id',
        'reason', 'window_size', 'inst_num', 'inst_id',
    )

    ### Test Functions ###
    def _deliver_response(self, tid, response_cls, **fields):
        """Build a response frame and hand it to the device's OMCI channel.

        :param tid: transaction id to place in the frame.
        :param response_cls: OMCI response message class; its ``message_id``
                             becomes the frame's ``message_type``.
        :param fields: keyword arguments forwarded to ``response_cls``.
        """
        frame = OmciFrame(
            transaction_id=tid,
            message_type=response_cls.message_id,
            omci_message=response_cls(**fields)
        )
        self.device.omci_cc.receive_message(frame)

    def sim_receive_start_sw_download_resp(self, tid, eid, r=0):
        """Simulate receipt of a Start Software Download response.

        :param tid: transaction id of the request being answered.
        :param eid: entity id; also used as the response's ``instance_id``.
        :param r: result code placed in the response (default 0).
        """
        self._deliver_response(
            tid, OmciStartSoftwareDownloadResponse,
            entity_class=0x7,
            entity_id=eid,
            result=r,
            window_size=0x1F,
            image_number=1,
            instance_id=eid
        )

    def sim_receive_download_section_resp(self, tid, eid, section, r=0):
        """Simulate receipt of a Download Section response.

        :param tid: transaction id of the request being answered.
        :param eid: entity id of the software image.
        :param section: section number echoed back in the response.
        :param r: result code placed in the response (default 0).
        """
        self._deliver_response(
            tid, OmciDownloadSectionResponse,
            entity_class=0x7,
            entity_id=eid,
            result=r,
            section_number=section
        )

    def sim_receive_end_sw_download_resp(self, tid, eid, r=0):
        """Simulate receipt of an End Software Download response.

        :param tid: transaction id of the request being answered.
        :param eid: entity id; also used as the response's ``instance_id``.
        :param r: result code placed in the response (default 0).
        """
        self._deliver_response(
            tid, OmciEndSoftwareDownloadResponse,
            entity_class=0x7,
            entity_id=eid,
            result=r,
            image_number=0x1,
            instance_id=eid,
            result0=0x0
        )

    def sim_receive_activate_image_resp(self, tid, eid, r=0):
        """Simulate receipt of an Activate Image response.

        :param tid: transaction id of the request being answered.
        :param eid: entity id of the software image.
        :param r: result code placed in the response (default 0).
        """
        self._deliver_response(
            tid, OmciActivateImageResponse,
            entity_class=0x7,
            entity_id=eid,
            result=r
        )

    def sim_receive_commit_image_resp(self, tid, eid, r=0):
        """Simulate receipt of a Commit Image response.

        :param tid: transaction id of the request being answered.
        :param eid: entity id of the software image.
        :param r: result code placed in the response (default 0).
        """
        self._deliver_response(
            tid, OmciCommitImageResponse,
            entity_class=0x7,
            entity_id=eid,
            result=r
        )

    def cb_after_send_omci(self, msg):
        """Answer an outgoing OMCI request with a simulated ONU response.

        Decodes the frame, schedules the matching ``sim_receive_*`` call on
        the test reactor, then installs a fresh deferred on the adapter agent
        so that the next send triggers this callback again.

        :param msg: hex-encoded OMCI frame (it is passed to
                    ``binascii.unhexlify``).
        """
        self.log.debug("cb_after_send_omci")
        dmsg = OmciFrame(binascii.unhexlify(msg))
        tid = dmsg.fields['transaction_id']
        mid = dmsg.fields['message_type']
        dmsg_body = dmsg.fields['omci_message']
        eid = dmsg_body.fields['entity_id']

        if mid == OmciStartSoftwareDownload.message_id:
            self.log.debug("response start download")
            self.reactor.callLater(0, self.sim_receive_start_sw_download_resp, tid, eid)
        elif mid == OmciEndSoftwareDownload.message_id:
            self.log.debug("response end download")
            if self._end_image_busy_try > 0:
                # Reply with result code 6 (presumably "device busy", going by
                # the *_busy_try counter name) until the counter runs out.
                self.reactor.callLater(0, self.sim_receive_end_sw_download_resp, tid, eid, r=6)
                self._end_image_busy_try -= 1
            else:
                self.reactor.callLater(0, self.sim_receive_end_sw_download_resp, tid, eid)
        elif mid == OmciDownloadSection.message_id:
            # Intermediate sections are not acknowledged; only the last
            # section of a window gets a response.
            self.log.debug("receive download section, not respond")
        elif mid == OmciDownloadSectionLast.message_id:
            self.log.debug("response download last section")
            self.reactor.callLater(0, self.sim_receive_download_section_resp, tid, eid,
                                   section=dmsg_body.fields["section_number"])
        elif mid == OmciActivateImage.message_id:
            self.log.debug("response activate image")
            if self._act_image_busy_try > 0:
                self.reactor.callLater(0, self.sim_receive_activate_image_resp, tid, eid, r=6)
                self._act_image_busy_try -= 1
            else:
                self.reactor.callLater(0, self.sim_receive_activate_image_resp, tid, eid)
                # Two seconds after a successful activation, notify the image
                # agent that the ONU has booted.
                self.reactor.callLater(2, self.device.image_agent.onu_bootup)
        elif mid == OmciCommitImage.message_id:
            self.log.debug("response commit image")
            self.reactor.callLater(0, self.sim_receive_commit_image_resp, tid, eid)
        else:
            self.log.debug("Unsupported message type", message_type=mid)

        # A Deferred fires only once, so re-arm with a new one for the next
        # outgoing message.
        self.defer = Deferred()
        self.defer.addCallback(self.cb_after_send_omci)
        self.adapter_agent.send_omci_defer = self.defer

    def setUp(self):
        """Create the mock adapter, OMCI agent, device and MIB entries."""
        self.log = structlog.get_logger()
        self.log.debug("do setup")
        self.device_id = '1'
        self._image_dnld = ImageDownload()
        self._image_dnld.id = '1'
        self._image_dnld.name = 'switchd_1012'
        self._image_dnld.url = 'http://192.168.100.222:9090/load/4.4.4.006.img'
        self._image_dnld.crc = 0
        # NOTE(review): developer-local directory; the download test
        # presumably needs the image file to exist here -- confirm.
        self._image_dnld.local_dir = '/home/lcui/work/tmp'
        self._image_dnld.state = ImageDownload.DOWNLOAD_SUCCEEDED
        # Number of "busy" (result 6) replies to give before answering
        # End Software Download / Activate Image normally.
        self._end_image_busy_try = 2
        self._act_image_busy_try = 0
        self.reactor = EPollReactor()
        self.defer = Deferred()
        self.adapter_agent = MockAdapterAgent(self.defer)
        self.defer.addCallback(self.cb_after_send_omci)
        pb2_dev = Device(id='1')
        self.adapter_agent.add_device(pb2_dev)
        self.core = self.adapter_agent.core
        self.omci_agent = OpenOMCIAgent(self.core, clock=self.reactor)
        self.device = self.omci_agent.add_device(self.device_id, self.adapter_agent)
        self.omci_agent.start()
        self.omci_agent.database.add('1')
        # Image 0 is the committed, active image; image 1 is valid but
        # neither committed nor active.
        self.omci_agent.database.set('1', SoftwareImage.class_id, 0, {"is_committed": 1, "is_active": 1, "is_valid": 1})
        self.omci_agent.database.set('1', SoftwareImage.class_id, 1, {"is_committed": 0, "is_active": 0, "is_valid": 1})

    def tearDown(self):
        """Remove the device's database entry and drop the device reference."""
        self.log.debug("Test is Done")
        self.omci_agent.database.remove('1')
        self.device = None

    def stop(self):
        """Stop the test reactor if it is running, then log."""
        # Guarded so that a second stop request (failure path plus the
        # activation timeout, for example) cannot raise ReactorNotRunning.
        if self.reactor.running:
            self.reactor.stop()
        self.log.debug("stopped")

    def get_omci_msg(self, *args, **kargs):
        """Assemble a hex frame string from field fragments.

        :param args: hex-string fragments, concatenated in the order given.
        :param kargs: must contain ``trailer`` and ``mic`` hex strings.
        :return: the concatenated fragments right-padded with ``'0'`` to 80
                 characters, followed by ``trailer`` and then ``mic``.
        :raises KeyError: if ``trailer`` or ``mic`` is missing.
        """
        body = ''.join(args).ljust(80, '0')
        return body + kargs['trailer'] + kargs['mic']

    def _sw_dwld_resp_hex(self):
        """Return the canned ``sw_dwld_resp`` fields as one hex frame string."""
        fields = self.sw_dwld_resp
        ordered = [fields[key] for key in self._SW_DWLD_RESP_FIELD_ORDER]
        return self.get_omci_msg(*ordered, trailer=fields['trailer'], mic=fields['mic'])

    def sim_receive_sw_download_resp2(self):
        """Deliver the canned ``sw_dwld_resp`` frame as raw bytes."""
        data = binascii.unhexlify(self._sw_dwld_resp_hex())
        self.device.omci_cc.receive_message(data)

    def sw_action_success(self, instance_id, device_id):
        """Download callback: log, then schedule image activation.

        :param instance_id: result delivered by the download deferred.
        :param device_id: id supplied through ``callbackArgs``.
        """
        self.log.debug("Action Success", device_id=device_id, entity_id=instance_id)
        self.reactor.callLater(0, self.onu_do_activate)

    def sw_action2_success(self, instance_id, device_id):
        """Activation callback: log the successful activation.

        :param instance_id: result delivered by the activation deferred.
        :param device_id: id supplied through ``callbackArgs``.
        """
        self.log.debug("Action2 Success", device_id=device_id, entity_id=instance_id)

    def sw_action_fail(self, fail, device_id):
        """Errback for download and activation: log and end the reactor run.

        :param fail: the failure delivered by the deferred.
        :param device_id: id supplied through ``errbackArgs``.
        """
        self.log.debug("Finally Failed", device_id=device_id)
        self.log.debug(fail)
        # Nothing further will happen after a final failure; without this the
        # reactor.run() in the test would never return.  Scheduled rather than
        # called directly so it also works if the errback fires before the
        # reactor has been started.
        self.reactor.callLater(0, self.stop)

    def onu_do_activate(self):
        """Start image activation and arm a 100 second stop timeout."""
        self.log.debug("do test_onu_do_activate")
        self.defer = self.device.do_onu_image_activate(self._image_dnld.name)
        self.defer.addCallbacks(self.sw_action2_success, self.sw_action_fail,
                                callbackArgs=(self.device_id,), errbackArgs=(self.device_id,))
        self.reactor.callLater(100, self.stop)

    @skip("for Jenkins Verification")
    def test_onu_do_software_upgrade(self):
        """Run download followed by activation on the private reactor."""
        self.log.debug("do test_onu_do_software_upgrade", download=self._image_dnld)
        # NOTE(review): the query result was never used; the call is kept in
        # case it has side effects the download relies on -- confirm and drop.
        self.omci_agent.database.query('1', SoftwareImage.class_id, 0, "is_committed")
        self.defer = self.device.do_onu_software_download(self._image_dnld)
        self.defer.addCallbacks(self.sw_action_success, self.sw_action_fail,
                                callbackArgs=(self.device_id,), errbackArgs=(self.device_id,))
        self.reactor.run()

    @skip("Not used")
    def test_omci_message(self):
        """Decode the canned hex frame with ``OmciFrame`` and log it."""
        self.log.debug("do test_omci_message")
        data = binascii.unhexlify(self._sw_dwld_resp_hex())
        msg = OmciFrame(data)
        # NOTE(review): str(msg) presumably yields the wire bytes only on
        # Python 2 -- confirm the target runtime before enabling this test.
        self.log.debug(binascii.hexlify(str(msg)))

    @skip("Not used")
    def test_omci_message2(self):
        """Build a Start Software Download response frame and log it."""
        self.log.debug("do test_omci_message2")
        msg = OmciFrame(
            transaction_id=0x0001,
            message_type=OmciStartSoftwareDownloadResponse.message_id,
            omci_message=OmciStartSoftwareDownloadResponse(
                entity_class=0x7,
                entity_id=0x0,
                result=0x0,
                window_size=0x1F,
                image_number=1,
                instance_id=0
            )
        )
        self.log.debug(binascii.hexlify(str(msg)))
| 291 | |
# Module-level suite object.  It is created empty: both registrations below
# are commented out, so nothing is added to it here.
# NOTE(review): the class defines onu_do_activate (its test_onu_do_activate
# name is commented out), so the second registration would need updating
# before it could be re-enabled.
this_suite = TestSuite()
# this_suite.addTest(TestOmciDownload('test_onu_do_software_upgrade'))
# this_suite.addTest(TestOmciDownload('test_onu_do_activate'))
| 295 | |