blob: a889be7df7345866665e1975786089391eaed117 [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import binascii
import structlog
from unittest import TestCase, TestSuite, skip
from pyvoltha.adapters.extensions.omci.openomci_agent import OpenOMCIAgent
from pyvoltha.adapters.extensions.omci.omci_entities import SoftwareImage
from pyvoltha.adapters.extensions.omci.omci_frame import OmciFrame
from pyvoltha.adapters.extensions.omci.omci_messages import \
OmciStartSoftwareDownload, OmciStartSoftwareDownloadResponse, \
OmciEndSoftwareDownload, OmciEndSoftwareDownloadResponse, \
OmciDownloadSection, OmciDownloadSectionLast, OmciDownloadSectionResponse, \
OmciActivateImage, OmciActivateImageResponse, \
OmciCommitImage, OmciCommitImageResponse
from voltha_protos.device_pb2 import Device, ImageDownload
from test.unit.extensions.omci.mock.mock_adapter_agent import MockAdapterAgent, MockCore
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.epollreactor import EPollReactor
from time import sleep
class TestOmciDownload(TestCase):
# def __init__(self, device_id='1', image_file='/home/lcui/work/tmp/v_change.txt', **kwargs):
# self.device_id = device_id
# self.image_file = image_file
# super(TestOmciDownload, self).__init__(**kwargs)
sw_dwld_resp = {
'tid': '0001',
'mid': '33',
'did': '0A',
'entity_class': '0007',
'entity_id' : '0000',
'reason' : '0000',
'window_size' : '001F',
'inst_num' : '0001',
'inst_id' : '0000',
'trailer' : '00000028',
'mic' : '00000000'
}
# sw_dwld_resp = '0001330A000700000000001f010000'
### Test Functions ###
def sim_receive_start_sw_download_resp(self, tid, eid, r=0):
msg = OmciFrame(
transaction_id=tid,
message_type=OmciStartSoftwareDownloadResponse.message_id,
omci_message=OmciStartSoftwareDownloadResponse(
entity_class=0x7,
entity_id=eid,
result=r,
window_size=0x1F,
image_number=1,
instance_id=eid
)
)
self.device.omci_cc.receive_message(msg)
def sim_receive_download_section_resp(self, tid, eid, section, r=0):
msg = OmciFrame(
transaction_id=tid,
message_type=OmciDownloadSectionResponse.message_id,
omci_message=OmciDownloadSectionResponse(
entity_class=0x7,
entity_id=eid,
result=r,
section_number=section
)
)
self.device.omci_cc.receive_message(msg)
def sim_receive_end_sw_download_resp(self, tid, eid, r=0):
msg = OmciFrame(
transaction_id=tid,
message_type=OmciEndSoftwareDownloadResponse.message_id,
omci_message=OmciEndSoftwareDownloadResponse(
entity_class=0x7,
entity_id=eid,
result=r,
image_number=0x1,
instance_id=eid,
result0=0x0
)
)
self.device.omci_cc.receive_message(msg)
def sim_receive_activate_image_resp(self, tid, eid, r=0):
msg = OmciFrame(
transaction_id=tid,
message_type=OmciActivateImageResponse.message_id,
omci_message=OmciActivateImageResponse(
entity_class=0x7,
entity_id=eid,
result = r
))
self.device.omci_cc.receive_message(msg)
def sim_receive_commit_image_resp(self, tid, eid, r=0):
msg = OmciFrame(
transaction_id=tid,
message_type=OmciCommitImageResponse.message_id,
omci_message=OmciCommitImageResponse(
entity_class=0x7,
entity_id=eid,
result = r
))
self.device.omci_cc.receive_message(msg)
def cb_after_send_omci(self, msg):
self.log.debug("cb_after_send_omci")
dmsg = OmciFrame(binascii.unhexlify(msg))
tid = dmsg.fields['transaction_id']
mid = dmsg.fields['message_type']
dmsg_body = dmsg.fields['omci_message']
eid = dmsg_body.fields['entity_id']
# print("%X" % dmsg.fields['transaction_id'])
# print("%X" % dmsg.fields['message_type'])
# print("%X" % OmciActivateImage.message_id)
# print("%X" % dmsg_body.fields['entity_id'])
if mid == OmciStartSoftwareDownload.message_id:
self.log.debug("response start download")
self.reactor.callLater(0, self.sim_receive_start_sw_download_resp, tid, eid)
elif mid == OmciEndSoftwareDownload.message_id:
self.log.debug("response end download")
if self._end_image_busy_try > 0:
self.reactor.callLater(0, self.sim_receive_end_sw_download_resp, tid, eid, r=6)
self._end_image_busy_try -= 1
else:
self.reactor.callLater(0, self.sim_receive_end_sw_download_resp, tid, eid)
elif mid == OmciDownloadSection.message_id:
self.log.debug("receive download section, not respond")
elif mid == OmciDownloadSectionLast.message_id:
self.log.debug("response download last section")
self.reactor.callLater(0, self.sim_receive_download_section_resp, tid, eid,
section=dmsg_body.fields["section_number"])
elif mid == OmciActivateImage.message_id:
self.log.debug("response activate image")
if self._act_image_busy_try > 0:
self.reactor.callLater(0, self.sim_receive_activate_image_resp, tid, eid, r=6)
self._act_image_busy_try -= 1
else:
self.reactor.callLater(0, self.sim_receive_activate_image_resp, tid, eid)
self.reactor.callLater(2, self.device.image_agent.onu_bootup)
elif mid == OmciCommitImage.message_id:
self.log.debug("response commit image")
self.reactor.callLater(0, self.sim_receive_commit_image_resp, tid, eid)
else:
self.log.debug("Unsupported message type", message_type=mid)
self.defer = Deferred()
self.defer.addCallback(self.cb_after_send_omci)
self.adapter_agent.send_omci_defer = self.defer
def setUp(self):
self.log = structlog.get_logger()
self.log.debug("do setup")
self.device_id = '1'
self._image_dnld = ImageDownload()
self._image_dnld.id = '1'
self._image_dnld.name = 'switchd_1012'
# self._image_dnld.name = 'xgsont_4.4.4.006.img'
self._image_dnld.url = 'http://192.168.100.222:9090/load/4.4.4.006.img'
self._image_dnld.crc = 0
self._image_dnld.local_dir = '/home/lcui/work/tmp'
self._image_dnld.state = ImageDownload.DOWNLOAD_SUCCEEDED # ImageDownload.DOWNLOAD_UNKNOWN
self._end_image_busy_try = 2
self._act_image_busy_try = 0
# self.image_file = '/home/lcui/work/tmp/v_change.txt'
self.reactor = EPollReactor()
self.defer = Deferred()
self.adapter_agent = MockAdapterAgent(self.defer)
self.defer.addCallback(self.cb_after_send_omci)
pb2_dev = Device(id='1')
self.adapter_agent.add_device(pb2_dev)
self.core = self.adapter_agent.core
self.omci_agent = OpenOMCIAgent(self.core, clock=self.reactor)
self.device = self.omci_agent.add_device(self.device_id, self.adapter_agent)
self.omci_agent.start()
self.omci_agent.database.add('1')
self.omci_agent.database.set('1', SoftwareImage.class_id, 0, {"is_committed": 1, "is_active": 1, "is_valid": 1})
self.omci_agent.database.set('1', SoftwareImage.class_id, 1, {"is_committed": 0, "is_active": 0, "is_valid": 1})
def tearDown(self):
self.log.debug("Test is Done")
self.omci_agent.database.remove('1')
self.device = None
def stop(self):
self.reactor.stop()
self.log.debug("stopped");
def get_omci_msg(self, *args, **kargs):
m = ''
for s in args:
m += s
m = m.ljust(80, '0')
return m + kargs['trailer'] + kargs['mic']
def sim_receive_sw_download_resp2(self):
r = self.get_omci_msg(self.sw_dwld_resp['tid'], self.sw_dwld_resp['mid'],
self.sw_dwld_resp['did'], self.sw_dwld_resp['entity_class'],
self.sw_dwld_resp['entity_id'], self.sw_dwld_resp['reason'],
self.sw_dwld_resp['window_size'], self.sw_dwld_resp['inst_num'], self.sw_dwld_resp['inst_id'],
trailer=self.sw_dwld_resp['trailer'], mic=self.sw_dwld_resp['mic'])
data = binascii.unhexlify(r)
#msg = OmciFrame(data)
#print(msg.fields['transaction_id'])
#print(msg.fields['omci'])
self.device.omci_cc.receive_message(data)
def sw_action_success(self, instance_id, device_id):
self.log.debug("Action Success", device_id=device_id, entity_id=instance_id)
self.reactor.callLater(0, self.onu_do_activate)
def sw_action2_success(self, instance_id, device_id):
self.log.debug("Action2 Success", device_id=device_id, entity_id=instance_id)
def sw_action_fail(self, fail, device_id):
self.log.debug("Finally Failed", device_id=device_id)
self.log.debug(fail)
# def test_onu_do_activate(self):
def onu_do_activate(self):
self.log.debug("do test_onu_do_activate")
self.defer = self.device.do_onu_image_activate(self._image_dnld.name)
self.defer.addCallbacks(self.sw_action2_success, self.sw_action_fail, callbackArgs=(self.device_id,), errbackArgs=(self.device_id,))
self.reactor.callLater(100, self.stop)
# self.reactor.run()
@skip("for Jenkins Verification")
def test_onu_do_software_upgrade(self):
self.log.debug("do test_onu_do_software_upgrade", download=self._image_dnld)
dr = self.omci_agent.database.query('1', SoftwareImage.class_id, 0, "is_committed")
self.defer = self.device.do_onu_software_download(self._image_dnld)
self.defer.addCallbacks(self.sw_action_success, self.sw_action_fail, callbackArgs=(self.device_id,), errbackArgs=(self.device_id,))
# self.defer.addCallbacks(self.sw_action_success, self.sw_action_fail) #, errbackArgs=(self.device_id,))
# self.reactor.callLater(1, self.sim_receive_start_sw_download_resp)
# self.reactor.callLater(12, self.stop)
self.reactor.run()
@skip("Not used")
def test_omci_message(self):
self.log.debug("do test_omci_message")
r = self.get_omci_msg(self.sw_dwld_resp['tid'], self.sw_dwld_resp['mid'],
self.sw_dwld_resp['did'], self.sw_dwld_resp['entity_class'],
self.sw_dwld_resp['entity_id'], self.sw_dwld_resp['reason'],
self.sw_dwld_resp['window_size'], self.sw_dwld_resp['inst_num'], self.sw_dwld_resp['inst_id'],
trailer=self.sw_dwld_resp['trailer'], mic=self.sw_dwld_resp['mic'])
data = binascii.unhexlify(r)
msg = OmciFrame(data)
self.log.debug(binascii.hexlify(str(msg)))
# print("%04X" % msg.fields['transaction_id'])
# print("%02X" % msg.fields['message_type'])
# print("%02X" % msg.fields['omci'])
# print("%X" % msg.fields['omci_message'])
@skip("Not used")
def test_omci_message2(self):
self.log.debug("do test_omci_message2")
msg = OmciFrame(
transaction_id=0x0001,
message_type=OmciStartSoftwareDownloadResponse.message_id,
omci_message=OmciStartSoftwareDownloadResponse(
entity_class=0x7,
entity_id=0x0,
result=0x0,
window_size=0x1F,
image_number=1,
instance_id=0
)
)
self.log.debug(binascii.hexlify(str(msg)))
this_suite = TestSuite()
# this_suite.addTest(TestOmciDownload('test_onu_do_software_upgrade'))
# this_suite.addTest(TestOmciDownload('test_onu_do_activate'))