[SEBA-367] OLT, ONU synchronization fix
Change-Id: If7a5709537bda3b634e8aa1f710ce619886e26f7
diff --git a/xos/synchronizer/pull_steps/test_pull_olts.py b/xos/synchronizer/pull_steps/test_pull_olts.py
index 2fb4a49..7197f8d 100644
--- a/xos/synchronizer/pull_steps/test_pull_olts.py
+++ b/xos/synchronizer/pull_steps/test_pull_olts.py
@@ -217,6 +217,46 @@
mock_olt_delete.assert_called()
+#[SEBA-367] Unit test for blank response received from Voltha
+
+ @requests_mock.Mocker()
+ def test_blank_response_received(self, m):
+ # Scenario: the Voltha devices endpoint answers HTTP 200 with an empty body.
+ # The assertions below require that pull_records() still queries the VOLT
+ # services but saves no PON/NNI ports and never calls
+ # OLTDevice.objects.get_items.
+
+ # Register the mocked GET: status 200, empty text payload.
+ m.get("http://voltha_url:1234/api/v1/devices", status_code=200, text="")
+ # Replace the service lookup, both port save() methods and the OLT query
+ # with mocks so their calls can be inspected after the pull.
+ with patch.object(VOLTService.objects, "all") as olt_service_mock, \
+ patch.object(PONPort, "save") as mock_pon_save, \
+ patch.object(NNIPort, "save") as mock_nni_save, \
+ patch.object(OLTDevice.objects, "get_items") as mock_get:
+
+ # The mocked service lookup returns the single fixture service.
+ olt_service_mock.return_value = [self.volt_service]
+
+ self.sync_step(model_accessor=self.model_accessor).pull_records()
+
+ # The service list was read, but nothing was saved or looked up.
+ olt_service_mock.assert_called()
+ mock_pon_save.assert_not_called()
+ mock_nni_save.assert_not_called()
+ mock_get.assert_not_called()
+
+#[SEBA-367] Unit test for invalid JSON received from Voltha
+
+ @requests_mock.Mocker()
+ def test_invalid_json(self, m):
+ # Scenario: the Voltha devices endpoint answers HTTP 200 with a body that is
+ # not valid JSON. The assertions below require that pull_records() still
+ # queries the VOLT services but saves no PON/NNI ports and never calls
+ # OLTDevice.objects.get_items.
+
+ # Register the mocked GET; the payload has a bare, unquoted token and an
+ # unclosed "[" so it cannot be parsed as JSON.
+ m.get("http://voltha_url:1234/api/v1/devices", status_code=200, text="{\"items\" : [host_and_port }")
+ # Replace the service lookup, both port save() methods and the OLT query
+ # with mocks so their calls can be inspected after the pull.
+ with patch.object(VOLTService.objects, "all") as olt_service_mock, \
+ patch.object(PONPort, "save") as mock_pon_save, \
+ patch.object(NNIPort, "save") as mock_nni_save, \
+ patch.object(OLTDevice.objects, "get_items") as mock_get:
+
+ # The mocked service lookup returns the single fixture service.
+ olt_service_mock.return_value = [self.volt_service]
+
+ self.sync_step(model_accessor=self.model_accessor).pull_records()
+
+ # The service list was read, but nothing was saved or looked up.
+ olt_service_mock.assert_called()
+ mock_pon_save.assert_not_called()
+ mock_nni_save.assert_not_called()
+ mock_get.assert_not_called()
if __name__ == "__main__":
- unittest.main()
\ No newline at end of file
+ unittest.main()
+