Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2017 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | import binascii |
| 18 | import structlog |
| 19 | from unittest import TestCase, TestSuite, skip |
| 20 | from pyvoltha.adapters.extensions.omci.openomci_agent import OpenOMCIAgent |
| 21 | from pyvoltha.adapters.extensions.omci.omci_entities import SoftwareImage |
| 22 | from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame |
| 23 | from pyvoltha.adapters.extensions.omci.omci_messages import \ |
| 24 | OmciStartSoftwareDownload, OmciStartSoftwareDownloadResponse, \ |
| 25 | OmciEndSoftwareDownload, OmciEndSoftwareDownloadResponse, \ |
| 26 | OmciDownloadSection, OmciDownloadSectionLast, OmciDownloadSectionResponse, \ |
| 27 | OmciActivateImage, OmciActivateImageResponse, \ |
| 28 | OmciCommitImage, OmciCommitImageResponse |
Zack Williams | a95e2c8 | 2019-04-17 15:43:54 -0700 | [diff] [blame] | 29 | from voltha_protos.device_pb2 import Device, ImageDownload |
Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 30 | |
Chip Boling | ce2daf6 | 2019-02-12 13:53:39 -0600 | [diff] [blame] | 31 | from test.unit.extensions.omci.mock.mock_adapter_agent import MockAdapterAgent, MockCore |
Chip Boling | 67b674a | 2019-02-08 11:42:18 -0600 | [diff] [blame] | 32 | from twisted.internet import reactor |
| 33 | from twisted.internet.defer import Deferred |
| 34 | from twisted.internet.epollreactor import EPollReactor |
| 35 | from time import sleep |
| 36 | |
class TestOmciDownload(TestCase):
    """Drive the OpenOMCI ONU software download/activate flow against mocks.

    ``setUp`` wires an ``OpenOMCIAgent`` to a ``MockAdapterAgent``.  Every OMCI
    request the device sends is handed to ``cb_after_send_omci`` through a
    ``Deferred``; that callback plays the role of the ONU by scheduling the
    matching response on the test's private reactor.

    All ``test_*`` methods are currently decorated with ``skip``.
    """

    # Result code sent while the simulated ONU is still "busy".  The name is
    # derived from the ``_*_busy_try`` counters that gate its use; 6 is the
    # "device busy" result in ITU-T G.988 -- NOTE(review): confirm.
    _RESULT_DEVICE_BUSY = 6

    # Hex-string fields of a hand-built Start Software Download response.
    # ``get_omci_msg`` joins the leading fields, pads them to 80 hex digits
    # and appends ``trailer`` and ``mic``.
    sw_dwld_resp = {
        'tid': '0001',
        'mid': '33',
        'did': '0A',
        'entity_class': '0007',
        'entity_id': '0000',
        'reason': '0000',
        'window_size': '001F',
        'inst_num': '0001',
        'inst_id': '0000',
        'trailer': '00000028',
        'mic': '00000000'
    }

    # Order in which the ``sw_dwld_resp`` fields are concatenated.
    _SW_DWLD_RESP_FIELD_ORDER = (
        'tid', 'mid', 'did', 'entity_class', 'entity_id',
        'reason', 'window_size', 'inst_num', 'inst_id',
    )

    # sw_dwld_resp = '0001330A000700000000001f010000'

    ### Test Functions ###
    def _receive_response(self, tid, response):
        """Wrap ``response`` in an ``OmciFrame`` and feed it to the device.

        :param tid: transaction id to place in the frame header
        :param response: an OMCI response message instance; its class's
                         ``message_id`` is used as the frame message type
        """
        frame = OmciFrame(
            transaction_id=tid,
            message_type=type(response).message_id,
            omci_message=response
        )
        self.device.omci_cc.receive_message(frame)

    def sim_receive_start_sw_download_resp(self, tid, eid, r=0):
        """Simulate reception of a Start Software Download response.

        :param tid: transaction id of the request being answered
        :param eid: entity id (also used as the response ``instance_id``)
        :param r: result code to report (default 0)
        """
        self._receive_response(tid, OmciStartSoftwareDownloadResponse(
            entity_class=0x7,
            entity_id=eid,
            result=r,
            window_size=0x1F,
            image_number=1,
            instance_id=eid
        ))

    def sim_receive_download_section_resp(self, tid, eid, section, r=0):
        """Simulate reception of a Download Section response.

        :param tid: transaction id of the request being answered
        :param eid: entity id
        :param section: section number being acknowledged
        :param r: result code to report (default 0)
        """
        self._receive_response(tid, OmciDownloadSectionResponse(
            entity_class=0x7,
            entity_id=eid,
            result=r,
            section_number=section
        ))

    def sim_receive_end_sw_download_resp(self, tid, eid, r=0):
        """Simulate reception of an End Software Download response.

        :param tid: transaction id of the request being answered
        :param eid: entity id (also used as the response ``instance_id``)
        :param r: result code to report (default 0)
        """
        self._receive_response(tid, OmciEndSoftwareDownloadResponse(
            entity_class=0x7,
            entity_id=eid,
            result=r,
            image_number=0x1,
            instance_id=eid,
            result0=0x0
        ))

    def sim_receive_activate_image_resp(self, tid, eid, r=0):
        """Simulate reception of an Activate Image response.

        :param tid: transaction id of the request being answered
        :param eid: entity id
        :param r: result code to report (default 0)
        """
        self._receive_response(tid, OmciActivateImageResponse(
            entity_class=0x7,
            entity_id=eid,
            result=r
        ))

    def sim_receive_commit_image_resp(self, tid, eid, r=0):
        """Simulate reception of a Commit Image response.

        :param tid: transaction id of the request being answered
        :param eid: entity id
        :param r: result code to report (default 0)
        """
        self._receive_response(tid, OmciCommitImageResponse(
            entity_class=0x7,
            entity_id=eid,
            result=r
        ))

    def cb_after_send_omci(self, msg):
        """React to an OMCI request sent by the device under test.

        The request is decoded and, depending on its message type, the
        matching simulated response is scheduled on ``self.reactor``.  A
        fresh ``Deferred`` is then installed on the mock adapter agent so the
        next request triggers this callback again.

        :param msg: hex-encoded OMCI frame (it is passed to
                    ``binascii.unhexlify`` before decoding)
        """
        self.log.debug("cb_after_send_omci")
        request = OmciFrame(binascii.unhexlify(msg))
        tid = request.fields['transaction_id']
        mid = request.fields['message_type']
        request_body = request.fields['omci_message']
        eid = request_body.fields['entity_id']

        if mid == OmciStartSoftwareDownload.message_id:
            self.log.debug("response start download")
            self.reactor.callLater(0, self.sim_receive_start_sw_download_resp, tid, eid)
        elif mid == OmciEndSoftwareDownload.message_id:
            self.log.debug("response end download")
            if self._end_image_busy_try > 0:
                # Answer "busy" for the first few attempts.
                self.reactor.callLater(0, self.sim_receive_end_sw_download_resp, tid, eid,
                                       r=self._RESULT_DEVICE_BUSY)
                self._end_image_busy_try -= 1
            else:
                self.reactor.callLater(0, self.sim_receive_end_sw_download_resp, tid, eid)
        elif mid == OmciDownloadSection.message_id:
            # Intermediate sections are only logged; no response is scheduled.
            self.log.debug("receive download section, not respond")
        elif mid == OmciDownloadSectionLast.message_id:
            self.log.debug("response download last section")
            self.reactor.callLater(0, self.sim_receive_download_section_resp, tid, eid,
                                   section=request_body.fields["section_number"])
        elif mid == OmciActivateImage.message_id:
            self.log.debug("response activate image")
            if self._act_image_busy_try > 0:
                self.reactor.callLater(0, self.sim_receive_activate_image_resp, tid, eid,
                                       r=self._RESULT_DEVICE_BUSY)
                self._act_image_busy_try -= 1
            else:
                self.reactor.callLater(0, self.sim_receive_activate_image_resp, tid, eid)
                # After a successful activate response, notify the image
                # agent of an ONU boot-up two seconds later.
                self.reactor.callLater(2, self.device.image_agent.onu_bootup)
        elif mid == OmciCommitImage.message_id:
            self.log.debug("response commit image")
            self.reactor.callLater(0, self.sim_receive_commit_image_resp, tid, eid)
        else:
            self.log.debug("Unsupported message type", message_type=mid)

        # A Deferred fires only once: re-arm for the next outgoing request.
        self.defer = Deferred()
        self.defer.addCallback(self.cb_after_send_omci)
        self.adapter_agent.send_omci_defer = self.defer

    def setUp(self):
        """Build the mock adapter, the OMCI agent and seed the MIB database."""
        self.log = structlog.get_logger()
        self.log.debug("do setup")
        self.device_id = '1'

        # NOTE(review): url and local_dir are developer-specific values.
        self._image_dnld = ImageDownload()
        self._image_dnld.id = '1'
        self._image_dnld.name = 'switchd_1012'
        self._image_dnld.url = 'http://192.168.100.222:9090/load/4.4.4.006.img'
        self._image_dnld.crc = 0
        self._image_dnld.local_dir = '/home/lcui/work/tmp'
        self._image_dnld.state = ImageDownload.DOWNLOAD_SUCCEEDED  # ImageDownload.DOWNLOAD_UNKNOWN

        # Number of "busy" answers given before End Download / Activate
        # requests are answered with the default result.
        self._end_image_busy_try = 2
        self._act_image_busy_try = 0

        # A private reactor instance is used as the agent's clock.
        self.reactor = EPollReactor()
        self.defer = Deferred()
        self.adapter_agent = MockAdapterAgent(self.defer)
        self.defer.addCallback(self.cb_after_send_omci)
        self.adapter_agent.add_device(Device(id=self.device_id))
        self.core = self.adapter_agent.core
        self.omci_agent = OpenOMCIAgent(self.core, clock=self.reactor)
        self.device = self.omci_agent.add_device(self.device_id, self.adapter_agent)
        self.omci_agent.start()

        # Seed two software image instances: 0 is committed and active,
        # 1 is valid but neither committed nor active.
        database = self.omci_agent.database
        database.add(self.device_id)
        database.set(self.device_id, SoftwareImage.class_id, 0,
                     {"is_committed": 1, "is_active": 1, "is_valid": 1})
        database.set(self.device_id, SoftwareImage.class_id, 1,
                     {"is_committed": 0, "is_active": 0, "is_valid": 1})

    def tearDown(self):
        """Remove the device from the MIB database and drop the device ref."""
        self.log.debug("Test is Done")
        self.omci_agent.database.remove(self.device_id)
        self.device = None

    def stop(self):
        """Stop the test's private reactor."""
        self.reactor.stop()
        self.log.debug("stopped")

    def get_omci_msg(self, *args, **kargs):
        """Assemble a hex-string OMCI message.

        The positional hex fragments are concatenated, right-padded with
        ``'0'`` to 80 characters, then followed by the trailer and MIC.

        :param args: hex-string fragments, in wire order
        :param kargs: must contain ``trailer`` and ``mic`` hex strings
        :return: the assembled hex string
        :raises KeyError: if ``trailer`` or ``mic`` is missing
        """
        payload = ''.join(args).ljust(80, '0')
        return payload + kargs['trailer'] + kargs['mic']

    def _sw_dwld_resp_hex(self):
        """Return the ``sw_dwld_resp`` fields assembled into one hex string."""
        fields = self.sw_dwld_resp
        leading = [fields[name] for name in self._SW_DWLD_RESP_FIELD_ORDER]
        return self.get_omci_msg(*leading,
                                 trailer=fields['trailer'],
                                 mic=fields['mic'])

    def sim_receive_sw_download_resp2(self):
        """Deliver the hand-built ``sw_dwld_resp`` message as raw bytes."""
        data = binascii.unhexlify(self._sw_dwld_resp_hex())
        self.device.omci_cc.receive_message(data)

    def sw_action_success(self, instance_id, device_id):
        """Download success callback: log, then schedule image activation."""
        self.log.debug("Action Success", device_id=device_id, entity_id=instance_id)
        self.reactor.callLater(0, self.onu_do_activate)

    def sw_action2_success(self, instance_id, device_id):
        """Activation success callback: log only."""
        self.log.debug("Action2 Success", device_id=device_id, entity_id=instance_id)

    def sw_action_fail(self, fail, device_id):
        """Errback shared by download and activation: log the failure."""
        self.log.debug("Finally Failed", device_id=device_id)
        self.log.debug(fail)

    def onu_do_activate(self):
        """Start image activation and schedule a reactor stop 100 s later.

        Not a test on its own (no ``test_`` prefix); it is scheduled by
        ``sw_action_success`` once the download has succeeded.
        """
        self.log.debug("do test_onu_do_activate")
        self.defer = self.device.do_onu_image_activate(self._image_dnld.name)
        self.defer.addCallbacks(self.sw_action2_success, self.sw_action_fail,
                                callbackArgs=(self.device_id,),
                                errbackArgs=(self.device_id,))
        self.reactor.callLater(100, self.stop)

    @skip("for Jenkins Verification")
    def test_onu_do_software_upgrade(self):
        """Run the full download -> activate sequence on the private reactor."""
        self.log.debug("do test_onu_do_software_upgrade", download=self._image_dnld)
        # NOTE(review): the original never used this query's result; the call
        # is kept in case the lookup itself is relied upon -- confirm.
        self.omci_agent.database.query(self.device_id, SoftwareImage.class_id, 0, "is_committed")
        self.defer = self.device.do_onu_software_download(self._image_dnld)
        self.defer.addCallbacks(self.sw_action_success, self.sw_action_fail,
                                callbackArgs=(self.device_id,),
                                errbackArgs=(self.device_id,))
        self.reactor.run()

    @skip("Not used")
    def test_omci_message(self):
        """Decode the hand-built hex message and log its re-encoded form."""
        self.log.debug("do test_omci_message")
        data = binascii.unhexlify(self._sw_dwld_resp_hex())
        msg = OmciFrame(data)
        # bytes() rather than str(): same as str() on Python 2, and what
        # hexlify requires on Python 3.
        self.log.debug(binascii.hexlify(bytes(msg)))

    @skip("Not used")
    def test_omci_message2(self):
        """Build a Start Software Download response frame and log its hex."""
        self.log.debug("do test_omci_message2")
        msg = OmciFrame(
            transaction_id=0x0001,
            message_type=OmciStartSoftwareDownloadResponse.message_id,
            omci_message=OmciStartSoftwareDownloadResponse(
                entity_class=0x7,
                entity_id=0x0,
                result=0x0,
                window_size=0x1F,
                image_number=1,
                instance_id=0
            )
        )
        self.log.debug(binascii.hexlify(bytes(msg)))
| 290 | |
# Module-level suite object. Both addTest() calls below are commented out, so
# the suite is created empty. Note that the second one names
# 'test_onu_do_activate', while the class defines that method as
# 'onu_do_activate' (no test_ prefix).
this_suite = TestSuite()
# this_suite.addTest(TestOmciDownload('test_onu_do_software_upgrade'))
# this_suite.addTest(TestOmciDownload('test_onu_do_activate'))
| 294 | |