#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
17import binascii
18import structlog
19from unittest import TestCase, TestSuite, skip
20from pyvoltha.adapters.extensions.omci.openomci_agent import OpenOMCIAgent
21from pyvoltha.adapters.extensions.omci.omci_entities import SoftwareImage
22from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame
23from pyvoltha.adapters.extensions.omci.omci_messages import \
24 OmciStartSoftwareDownload, OmciStartSoftwareDownloadResponse, \
25 OmciEndSoftwareDownload, OmciEndSoftwareDownloadResponse, \
26 OmciDownloadSection, OmciDownloadSectionLast, OmciDownloadSectionResponse, \
27 OmciActivateImage, OmciActivateImageResponse, \
28 OmciCommitImage, OmciCommitImageResponse
29from pyvoltha.protos.voltha_pb2 import ImageDownload
30from pyvoltha.protos.device_pb2 import Device
31
32from tests.utests.voltha.extensions.omci.mock.mock_adapter_agent import MockAdapterAgent, MockCore
33from twisted.internet import reactor
34from twisted.internet.defer import Deferred
35from twisted.internet.epollreactor import EPollReactor
36from time import sleep
37
class TestOmciDownload(TestCase):
    """Drive the OpenOMCI ONU software download / activate flow against a mock.

    ``setUp`` wires an ``OpenOMCIAgent`` device to a ``MockAdapterAgent`` and a
    private ``EPollReactor``.  A ``Deferred`` handed to the mock adapter agent
    is used as a "message sent" hook: its callback, :meth:`cb_after_send_omci`,
    decodes the hex-encoded request and schedules the matching simulated ONU
    response on the reactor, then installs a fresh ``Deferred`` for the next
    request.

    NOTE(review): the hook presumably fires once per OMCI frame sent through
    the mock adapter agent -- confirm against ``MockAdapterAgent``.
    """

    # Canned "start software download" response, one hex string per field.
    # The key order used to assemble the frame is _SW_DWLD_RESP_FIELDS.
    sw_dwld_resp = {
        'tid': '0001',
        'mid': '33',
        'did': '0A',
        'entity_class': '0007',
        'entity_id': '0000',
        'reason': '0000',
        'window_size': '001F',
        'inst_num': '0001',
        'inst_id': '0000',
        'trailer': '00000028',
        'mic': '00000000'
    }

    # Order in which the sw_dwld_resp fields are concatenated into a frame.
    _SW_DWLD_RESP_FIELDS = ('tid', 'mid', 'did', 'entity_class', 'entity_id',
                            'reason', 'window_size', 'inst_num', 'inst_id')

    # Entity class placed in every simulated response; same value as
    # sw_dwld_resp['entity_class'].
    _SW_IMAGE_ENTITY_CLASS = 0x7

    # Result code returned while the simulated ONU is "busy" (see the
    # _end_image_busy_try / _act_image_busy_try counters).
    _RESULT_DEVICE_BUSY = 6

    ### Test Functions ###
    def _deliver_response(self, tid, response_cls, **fields):
        """Build a response frame of ``response_cls`` and hand it to the device.

        :param tid: transaction id to echo back in the frame
        :param response_cls: OMCI response message class; its ``message_id``
                             is used as the frame's message type
        :param fields: message fields other than ``entity_class``
        """
        frame = OmciFrame(
            transaction_id=tid,
            message_type=response_cls.message_id,
            omci_message=response_cls(
                entity_class=self._SW_IMAGE_ENTITY_CLASS, **fields)
        )
        self.device.omci_cc.receive_message(frame)

    def sim_receive_start_sw_download_resp(self, tid, eid, r=0):
        """Simulate a Start Software Download response (window size 0x1F).

        :param tid: transaction id of the request being answered
        :param eid: entity id of the request; also used as ``instance_id``
        :param r: result code to report (0 by default)
        """
        self._deliver_response(tid, OmciStartSoftwareDownloadResponse,
                               entity_id=eid,
                               result=r,
                               window_size=0x1F,
                               image_number=1,
                               instance_id=eid)

    def sim_receive_download_section_resp(self, tid, eid, section, r=0):
        """Simulate a Download Section response for ``section``.

        :param tid: transaction id of the request being answered
        :param eid: entity id of the request
        :param section: section number to acknowledge
        :param r: result code to report (0 by default)
        """
        self._deliver_response(tid, OmciDownloadSectionResponse,
                               entity_id=eid,
                               result=r,
                               section_number=section)

    def sim_receive_end_sw_download_resp(self, tid, eid, r=0):
        """Simulate an End Software Download response.

        :param tid: transaction id of the request being answered
        :param eid: entity id of the request; also used as ``instance_id``
        :param r: result code to report (0 by default)
        """
        self._deliver_response(tid, OmciEndSoftwareDownloadResponse,
                               entity_id=eid,
                               result=r,
                               image_number=0x1,
                               instance_id=eid,
                               result0=0x0)

    def sim_receive_activate_image_resp(self, tid, eid, r=0):
        """Simulate an Activate Image response.

        :param tid: transaction id of the request being answered
        :param eid: entity id of the request
        :param r: result code to report (0 by default)
        """
        self._deliver_response(tid, OmciActivateImageResponse,
                               entity_id=eid,
                               result=r)

    def sim_receive_commit_image_resp(self, tid, eid, r=0):
        """Simulate a Commit Image response.

        :param tid: transaction id of the request being answered
        :param eid: entity id of the request
        :param r: result code to report (0 by default)
        """
        self._deliver_response(tid, OmciCommitImageResponse,
                               entity_id=eid,
                               result=r)

    def cb_after_send_omci(self, msg):
        """React to an outgoing OMCI request by scheduling a simulated reply.

        :param msg: hex-encoded OMCI frame that was sent

        Replies are scheduled with ``callLater`` rather than delivered inline.
        Plain Download Section requests are deliberately not answered; only
        the last section of a window gets a response.  After dispatching, a
        new ``Deferred`` is installed on the adapter agent so the next request
        triggers this callback again (a ``Deferred`` fires only once).
        """
        self.log.debug("cb_after_send_omci")
        frame = OmciFrame(binascii.unhexlify(msg))
        tid = frame.fields['transaction_id']
        mid = frame.fields['message_type']
        body = frame.fields['omci_message']
        eid = body.fields['entity_id']
        call_later = self.reactor.callLater

        if mid == OmciStartSoftwareDownload.message_id:
            self.log.debug("response start download")
            call_later(0, self.sim_receive_start_sw_download_resp, tid, eid)
        elif mid == OmciEndSoftwareDownload.message_id:
            self.log.debug("response end download")
            if self._end_image_busy_try > 0:
                # Answer "busy" until the configured number of tries is used up.
                call_later(0, self.sim_receive_end_sw_download_resp, tid, eid,
                           r=self._RESULT_DEVICE_BUSY)
                self._end_image_busy_try -= 1
            else:
                call_later(0, self.sim_receive_end_sw_download_resp, tid, eid)
        elif mid == OmciDownloadSection.message_id:
            self.log.debug("receive download section, not respond")
        elif mid == OmciDownloadSectionLast.message_id:
            self.log.debug("response download last section")
            call_later(0, self.sim_receive_download_section_resp, tid, eid,
                       section=body.fields["section_number"])
        elif mid == OmciActivateImage.message_id:
            self.log.debug("response activate image")
            if self._act_image_busy_try > 0:
                call_later(0, self.sim_receive_activate_image_resp, tid, eid,
                           r=self._RESULT_DEVICE_BUSY)
                self._act_image_busy_try -= 1
            else:
                call_later(0, self.sim_receive_activate_image_resp, tid, eid)
                # Two seconds after a successful activation, report ONU boot-up.
                call_later(2, self.device.image_agent.onu_bootup)
        elif mid == OmciCommitImage.message_id:
            self.log.debug("response commit image")
            call_later(0, self.sim_receive_commit_image_resp, tid, eid)
        else:
            self.log.debug("Unsupported message type", message_type=mid)

        # Re-arm the hook for the next outgoing message.
        self.defer = Deferred()
        self.defer.addCallback(self.cb_after_send_omci)
        self.adapter_agent.send_omci_defer = self.defer

    def setUp(self):
        """Create the image descriptor, reactor, mock adapter and OMCI device.

        The OMCI database is seeded with two SoftwareImage instances: instance
        0 is committed, active and valid; instance 1 is valid only.
        """
        self.log = structlog.get_logger()
        self.log.debug("do setup")
        self.device_id = '1'

        # NOTE(review): the URL and local_dir below are developer/lab specific;
        # the only test that uses them is currently skipped.
        self._image_dnld = ImageDownload()
        self._image_dnld.id = '1'
        self._image_dnld.name = 'switchd_1012'
        self._image_dnld.url = 'http://192.168.100.222:9090/load/4.4.4.006.img'
        self._image_dnld.crc = 0
        self._image_dnld.local_dir = '/home/lcui/work/tmp'
        self._image_dnld.state = ImageDownload.DOWNLOAD_SUCCEEDED

        # Number of "busy" replies to give before answering successfully.
        self._end_image_busy_try = 2
        self._act_image_busy_try = 0

        self.reactor = EPollReactor()
        self.defer = Deferred()
        self.adapter_agent = MockAdapterAgent(self.defer)
        self.defer.addCallback(self.cb_after_send_omci)
        self.adapter_agent.add_device(Device(id=self.device_id))
        self.core = self.adapter_agent.core
        self.omci_agent = OpenOMCIAgent(self.core, clock=self.reactor)
        self.device = self.omci_agent.add_device(self.device_id,
                                                 self.adapter_agent)
        self.omci_agent.start()

        database = self.omci_agent.database
        database.add(self.device_id)
        database.set(self.device_id, SoftwareImage.class_id, 0,
                     {"is_committed": 1, "is_active": 1, "is_valid": 1})
        database.set(self.device_id, SoftwareImage.class_id, 1,
                     {"is_committed": 0, "is_active": 0, "is_valid": 1})

    def tearDown(self):
        """Remove the seeded device from the database and drop the device."""
        self.log.debug("Test is Done")
        self.omci_agent.database.remove(self.device_id)
        self.device = None

    def stop(self):
        """Stop the test's private reactor."""
        self.reactor.stop()
        self.log.debug("stopped")

    def get_omci_msg(self, *args, **kargs):
        """Concatenate hex fields into a frame string.

        :param args: hex strings, joined in order and right-padded with '0'
                     to 80 characters (longer input is left as is)
        :param kargs: must contain ``trailer`` and ``mic`` hex strings, which
                      are appended after the padded part
        :return: the assembled hex string
        :raises KeyError: if ``trailer`` or ``mic`` is missing
        """
        padded = ''.join(args).ljust(80, '0')
        return padded + kargs['trailer'] + kargs['mic']

    def _sw_dwld_resp_frame(self):
        """Return the canned ``sw_dwld_resp`` message as raw (unhexlified) data."""
        resp = self.sw_dwld_resp
        parts = [resp[key] for key in self._SW_DWLD_RESP_FIELDS]
        hex_msg = self.get_omci_msg(*parts,
                                    trailer=resp['trailer'],
                                    mic=resp['mic'])
        return binascii.unhexlify(hex_msg)

    def sim_receive_sw_download_resp2(self):
        """Deliver the canned ``sw_dwld_resp`` frame to the device as raw data."""
        self.device.omci_cc.receive_message(self._sw_dwld_resp_frame())

    def sw_action_success(self, instance_id, device_id):
        """Download succeeded: log it and schedule the activation step."""
        self.log.debug("Action Success", device_id=device_id, entity_id=instance_id)
        self.reactor.callLater(0, self.onu_do_activate)

    def sw_action2_success(self, instance_id, device_id):
        """Activation succeeded: log it."""
        self.log.debug("Action2 Success", device_id=device_id, entity_id=instance_id)

    def sw_action_fail(self, fail, device_id):
        """Errback shared by download and activation: log the failure."""
        self.log.debug("Finally Failed", device_id=device_id)
        self.log.debug(fail)

    def onu_do_activate(self):
        """Start image activation and schedule a reactor stop 100 s later.

        Not a test by itself (no ``test_`` prefix); it is chained from
        :meth:`sw_action_success`.
        """
        self.log.debug("do test_onu_do_activate")
        self.defer = self.device.do_onu_image_activate(self._image_dnld.name)
        self.defer.addCallbacks(self.sw_action2_success, self.sw_action_fail,
                                callbackArgs=(self.device_id,),
                                errbackArgs=(self.device_id,))
        self.reactor.callLater(100, self.stop)

    @skip("for Jenkins Verification")
    def test_onu_do_software_upgrade(self):
        # Full download -> activate run; blocks in reactor.run() until stop().
        self.log.debug("do test_onu_do_software_upgrade", download=self._image_dnld)
        # The query result is not used; the call is retained so the sequence
        # of database accesses is unchanged.
        self.omci_agent.database.query(self.device_id, SoftwareImage.class_id,
                                       0, "is_committed")
        self.defer = self.device.do_onu_software_download(self._image_dnld)
        self.defer.addCallbacks(self.sw_action_success, self.sw_action_fail,
                                callbackArgs=(self.device_id,),
                                errbackArgs=(self.device_id,))
        self.reactor.run()

    @skip("Not used")
    def test_omci_message(self):
        # Decode the canned response and log its re-serialised hex form.
        self.log.debug("do test_omci_message")
        msg = OmciFrame(self._sw_dwld_resp_frame())
        # bytes() is the same as str() on Python 2 and, unlike str(), yields
        # something hexlify() accepts on Python 3.
        self.log.debug(binascii.hexlify(bytes(msg)))

    @skip("Not used")
    def test_omci_message2(self):
        # Build a start-download response from fields and log its hex form.
        self.log.debug("do test_omci_message2")
        msg = OmciFrame(
            transaction_id=0x0001,
            message_type=OmciStartSoftwareDownloadResponse.message_id,
            omci_message=OmciStartSoftwareDownloadResponse(
                entity_class=0x7,
                entity_id=0x0,
                result=0x0,
                window_size=0x1F,
                image_number=1,
                instance_id=0
            )
        )
        self.log.debug(binascii.hexlify(bytes(msg)))
291
# Module-level suite object. It is created empty: the addTest() calls below
# are commented out, so no test is registered through it.
this_suite = TestSuite()
# this_suite.addTest(TestOmciDownload('test_onu_do_software_upgrade'))
# NOTE(review): TestOmciDownload defines onu_do_activate, not
# test_onu_do_activate, so the name below must be updated before this line
# can be re-enabled.
# this_suite.addTest(TestOmciDownload('test_onu_do_activate'))
295