[CORD-3131] Reading uni_port_id from voltha based info
Change-Id: I0fecf37b3fe1286b12f50fc97046164eb5a9bf70
diff --git a/xos/synchronizer/pull_steps/pull_onus.py b/xos/synchronizer/pull_steps/pull_onus.py
index b762297..13a051b 100644
--- a/xos/synchronizer/pull_steps/pull_onus.py
+++ b/xos/synchronizer/pull_steps/pull_onus.py
@@ -13,7 +13,7 @@
# limitations under the License.
from synchronizers.new_base.pullstep import PullStep
-from synchronizers.new_base.modelaccessor import model_accessor, ONUDevice, VOLTService, OLTDevice, PONPort
+from synchronizers.new_base.modelaccessor import model_accessor, ONUDevice, VOLTService, OLTDevice, PONPort, PONONUPort, UNIPort
from xosconfig import Config
from multistructlog import create_logger
@@ -57,7 +57,7 @@
log.debug("received devices", onus=devices)
# TODO
- # [ ] delete ONUS as ONUDevice.objects.all() - updated OLTs
+ # [ ] delete ONUS as ONUDevice.objects.all() - updated ONUs
if r.status_code != 200:
log.info("It was not possible to fetch devices from VOLTHA")
@@ -83,6 +83,8 @@
if model.enacted < model.updated:
log.info("Skipping pull on ONUDevice %s as enacted < updated" % model.serial_number, serial_number=model.serial_number, id=model.id, enacted=model.enacted, updated=model.updated)
+ # if we are not updating the device we still need to pull ports
+ self.fetch_onu_ports(model)
return
except IndexError:
@@ -109,12 +111,83 @@
model.save()
+ self.fetch_onu_ports(model)
+
updated_onus.append(model)
return updated_onus
+ def fetch_onu_ports(self, onu):
+ voltha_url = Helpers.get_voltha_info(self.volt_service)['url']
+ voltha_port = Helpers.get_voltha_info(self.volt_service)['port']
+ try:
+ r = requests.get("%s:%s/api/v1/devices/%s/ports" % (voltha_url, voltha_port, onu.device_id))
+ if r.status_code != 200:
+ log.info("It was not possible to fetch ports from VOLTHA for ONUDevice %s" % onu.device_id)
+ ports = r.json()['items']
+
+ log.debug("received ports", ports=ports, onu=onu.device_id)
+
+ self.create_or_update_ports(ports, onu)
+
+ except ConnectionError, e:
+ log.warn("It was not possible to connect to VOLTHA", reason=e)
+ return
+ except InvalidURL, e:
+ log.warn("VOLTHA url is invalid, is it configured in the VOLTService?", reason=e)
+ return
+ return
+
+ def create_or_update_ports(self, ports, onu):
+ uni_ports = [p for p in ports if "ETHERNET_UNI" in p["type"]]
+ pon_onu_ports = [p for p in ports if "PON_ONU" in p["type"]]
+
+ self.create_or_update_uni_port(uni_ports, onu)
+ self.create_or_update_pon_onu_port(pon_onu_ports, onu)
+
+ def create_or_update_uni_port(self, uni_ports, onu):
+ update_ports = []
+
+ for port in uni_ports:
+ try:
+ model = UNIPort.objects.filter(port_no=port["port_no"], onu_device_id=onu.id)[0]
+ log.debug("UNIPort already exists, updating it", port_no=port["port_no"], onu_device_id=onu.id)
+ except IndexError:
+ model = UNIPort()
+ model.port_no = port["port_no"]
+ model.onu_device_id = onu.id
+ model.name = port["label"]
+ log.debug("UNIPort is new, creating it", port_no=port["port_no"], onu_device_id=onu.id)
+
+ model.admin_state = port["admin_state"]
+ model.oper_status = port["oper_status"]
+ model.save()
+ update_ports.append(model)
+ return update_ports
+
+ def create_or_update_pon_onu_port(self, pon_onu_ports, onu):
+ update_ports = []
+
+ for port in pon_onu_ports:
+ try:
+ model = PONONUPort.objects.filter(port_no=port["port_no"], onu_device_id=onu.id)[0]
+ model.xos_managed = False
+ log.debug("PONONUPort already exists, updating it", port_no=port["port_no"], onu_device_id=onu.id)
+ except IndexError:
+ model = PONONUPort()
+ model.port_no = port["port_no"]
+ model.onu_device_id = onu.id
+ model.name = port["label"]
+ model.xos_managed = False
+ log.debug("PONONUPort is new, creating it", port_no=port["port_no"], onu_device_id=onu.id)
+
+ model.admin_state = port["admin_state"]
+ model.oper_status = port["oper_status"]
+ model.save()
+ update_ports.append(model)
+ return update_ports
diff --git a/xos/synchronizer/pull_steps/test_pull_onus.py b/xos/synchronizer/pull_steps/test_pull_onus.py
index a2b948b..c888639 100644
--- a/xos/synchronizer/pull_steps/test_pull_onus.py
+++ b/xos/synchronizer/pull_steps/test_pull_onus.py
@@ -109,6 +109,11 @@
]
}
+ # TODO add ports
+ self.ports = {
+ "items": []
+ }
+
def tearDown(self):
sys.path = self.sys_path_save
@@ -122,70 +127,27 @@
with patch.object(VOLTService.objects, "all") as olt_service_mock, \
patch.object(OLTDevice.objects, "get") as mock_olt_device, \
patch.object(PONPort.objects, "get") as mock_pon_port, \
- patch.object(ONUDevice, "save") as mock_save:
+ patch.object(ONUDevice, "save", autospec=True) as mock_save:
olt_service_mock.return_value = [self.volt_service]
mock_pon_port.return_value = self.pon_port
mock_olt_device.return_value = self.olt
m.get("http://voltha_url:1234/api/v1/devices", status_code=200, json=self.devices)
+ m.get("http://voltha_url:1234/api/v1/devices/0001130158f01b2d/ports", status_code=200, json=self.ports)
self.sync_step().pull_records()
- # TODO how to asster this?
- # self.assertEqual(existing_olt.admin_state, "ENABLED")
- # self.assertEqual(existing_olt.oper_status, "ACTIVE")
- # self.assertEqual(existing_olt.volt_service_id, "volt_service_id")
- # self.assertEqual(existing_olt.device_id, "test_id")
- # self.assertEqual(existing_olt.of_id, "of_id")
- # self.assertEqual(existing_olt.dp_id, "of:0000000ce2314000")
+ saved_onu = mock_save.call_args[0][0]
- mock_save.assert_called_with()
+ self.assertEqual(saved_onu.admin_state, "ENABLED")
+ self.assertEqual(saved_onu.oper_status, "ACTIVE")
+ self.assertEqual(saved_onu.connect_status, "REACHABLE")
+ self.assertEqual(saved_onu.device_type, "broadcom_onu")
+ self.assertEqual(saved_onu.vendor, "Broadcom")
+ self.assertEqual(saved_onu.device_id, "0001130158f01b2d")
- @requests_mock.Mocker()
- def _test_pull_existing(self, m):
+ self.assertEqual(mock_save.call_count, 1)
- existing_olt = Mock()
- existing_olt.enacted = 2
- existing_olt.updated = 1
-
- with patch.object(VOLTService.objects, "all") as olt_service_mock, \
- patch.object(OLTDevice.objects, "filter") as mock_get, \
- patch.object(existing_olt, "save") as mock_save:
- olt_service_mock.return_value = [self.volt_service]
- mock_get.return_value = [existing_olt]
-
- m.get("http://voltha_url:1234/api/v1/devices", status_code=200, json=self.devices)
- m.get("http://voltha_url:1234/api/v1/logical_devices", status_code=200, json=self.logical_devices)
-
- self.sync_step().pull_records()
-
- self.assertEqual(existing_olt.admin_state, "ENABLED")
- self.assertEqual(existing_olt.oper_status, "ACTIVE")
- self.assertEqual(existing_olt.volt_service_id, "volt_service_id")
- self.assertEqual(existing_olt.device_id, "test_id")
- self.assertEqual(existing_olt.of_id, "of_id")
- self.assertEqual(existing_olt.dp_id, "of:0000000ce2314000")
-
- mock_save.assert_called()
-
- @requests_mock.Mocker()
- def _test_pull_existing_do_not_sync(self, m):
- existing_olt = Mock()
- existing_olt.enacted = 1
- existing_olt.updated = 2
-
- with patch.object(VOLTService.objects, "all") as olt_service_mock, \
- patch.object(OLTDevice.objects, "get") as mock_get, \
- patch.object(existing_olt, "save") as mock_save:
- olt_service_mock.return_value = [self.volt_service]
- mock_get.return_value = existing_olt
-
- m.get("http://voltha_url:1234/api/v1/devices", status_code=200, json=self.devices)
- m.get("http://voltha_url:1234/api/v1/logical_devices", status_code=200, json=self.logical_devices)
-
- self.sync_step().pull_records()
-
- mock_save.assert_not_called()
if __name__ == "__main__":
unittest.main()
\ No newline at end of file