SEBA-497 delayering, Makefile, and tox for rcord-synchronizer
Change-Id: I1005db39de51e43bdfd9b17fe6290a37131e4605
diff --git a/xos/synchronizer/models/__init__.py b/xos/synchronizer/models/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/xos/synchronizer/models/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/models/convenience/rcordsubscriber.py b/xos/synchronizer/models/convenience/rcordsubscriber.py
index 47c29ed..e6e7bf0 100644
--- a/xos/synchronizer/models/convenience/rcordsubscriber.py
+++ b/xos/synchronizer/models/convenience/rcordsubscriber.py
@@ -14,9 +14,9 @@
# limitations under the License.
-import json
from xosapi.orm import ORMWrapper, register_convenience_wrapper
+
class ORMWrapperRCORDSubscriber(ORMWrapper):
@property
def volt(self):
@@ -24,7 +24,7 @@
for link in links:
# FIXME: hardcoded service dependency
# cast from ServiceInstance to VOLTServiceInstance
- volts = self.stub.VOLTServiceInstance.objects.filter(id = link.provider_service_instance.id)
+ volts = self.stub.VOLTServiceInstance.objects.filter(id=link.provider_service_instance.id)
if volts:
return volts[0]
return None
@@ -38,4 +38,5 @@
"downlink_speed",
"status")
+
register_convenience_wrapper("RCORDSubscriber", ORMWrapperRCORDSubscriber)
diff --git a/xos/synchronizer/models/models.py b/xos/synchronizer/models/models.py
index 7227ab9..f5ac079 100644
--- a/xos/synchronizer/models/models.py
+++ b/xos/synchronizer/models/models.py
@@ -16,17 +16,20 @@
import socket
import random
-from xos.exceptions import XOSValidationError, XOSProgrammingError, XOSPermissionDenied
+from xos.exceptions import XOSValidationError, XOSProgrammingError
from models_decl import RCORDService_decl, RCORDSubscriber_decl, RCORDIpAddress_decl, BandwidthProfile_decl
+
class BandwidthProfile(BandwidthProfile_decl):
class Meta:
proxy = True
+
class RCORDService(RCORDService_decl):
class Meta:
proxy = True
+
class RCORDIpAddress(RCORDIpAddress_decl):
class Meta:
proxy = True
@@ -44,6 +47,7 @@
super(RCORDIpAddress, self).save(*args, **kwargs)
return
+
class RCORDSubscriber(RCORDSubscriber_decl):
class Meta:
@@ -149,7 +153,9 @@
is_update_with_same_tag = True
if self.c_tag in self.get_used_c_tags() and not is_update_with_same_tag:
- raise XOSValidationError("The c_tag you specified (%s) has already been used on device %s" % (self.c_tag, self.onu_device))
+ raise XOSValidationError(
+ "The c_tag you specified (%s) has already been used on device %s" %
+ (self.c_tag, self.onu_device))
# validate s_tag and c_tag combination
if self.c_tag and self.s_tag:
@@ -173,10 +179,14 @@
self.set_owner()
- if self.status != "pre-provisioned" and hasattr(self.owner.leaf_model, "access") and self.owner.leaf_model.access == "voltha" and not self.deleted:
+ if self.status != "pre-provisioned" and \
+ hasattr(self.owner.leaf_model, "access") and \
+ self.owner.leaf_model.access == "voltha" and \
+ not self.deleted:
# if the access network is managed by voltha, validate that onu_device actually exist
- volt_service = self.owner.provider_services[0].leaf_model # we assume RCORDService is connected only to the vOLTService
+ # we assume RCORDService is connected only to the vOLTService
+ volt_service = self.owner.provider_services[0].leaf_model
if not volt_service.has_access_device(self.onu_device):
raise XOSValidationError("The onu_device you specified (%s) does not exists" % self.onu_device)
diff --git a/xos/synchronizer/models/test_models.py b/xos/synchronizer/models/test_models.py
index 072b7f5..4fe767f 100644
--- a/xos/synchronizer/models/test_models.py
+++ b/xos/synchronizer/models/test_models.py
@@ -13,25 +13,30 @@
# limitations under the License.
import unittest
-import os, sys
+import os
+import sys
from mock import patch, Mock, MagicMock
-test_path=os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-service_dir=os.path.join(test_path, "../../../..")
-xos_dir=os.path.join(test_path, "../../..")
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+service_dir = os.path.join(test_path, "../../../..")
+xos_dir = os.path.join(test_path, "../../..")
if not os.path.exists(os.path.join(test_path, "new_base")):
- xos_dir=os.path.join(test_path, "../../../../../../orchestration/xos/xos")
- services_dir=os.path.join(xos_dir, "../../xos_services")
+ xos_dir = os.path.join(test_path, "../../../../../../orchestration/xos/xos")
+ services_dir = os.path.join(xos_dir, "../../xos_services")
# mocking XOS exception, as they're based in Django
+
+
class Exceptions:
XOSValidationError = Exception
XOSProgrammingError = Exception
XOSPermissionDenied = Exception
+
class XOS:
exceptions = Exceptions
+
class TestRCORDModels(unittest.TestCase):
def setUp(self):
@@ -51,7 +56,6 @@
self.models_decl.RCORDIpAddress_decl.objects = Mock()
self.models_decl.RCORDIpAddress_decl.objects.filter.return_value = []
-
modules = {
'xos.exceptions': self.xos.exceptions,
'models_decl': self.models_decl
@@ -68,7 +72,7 @@
self.rcord_subscriber = RCORDSubscriber()
self.rcord_subscriber.deleted = False
- self.rcord_subscriber.id = None # this is a new model
+ self.rcord_subscriber.id = None # this is a new model
self.rcord_subscriber.is_new = True
self.rcord_subscriber.onu_device = "BRCM1234"
self.rcord_subscriber.c_tag = 111
@@ -80,7 +84,7 @@
self.rcord_subscriber.owner.provider_services = [self.volt]
self.rcord_ip = RCORDIpAddress()
- self.rcord_ip.subscriber = 1;
+ self.rcord_ip.subscriber = 1
def tearDown(self):
sys.path = self.sys_path_save
@@ -184,7 +188,8 @@
with self.assertRaises(Exception) as e:
self.rcord_subscriber.save()
- self.assertEqual(e.exception.message, "The c_tag(111) and s_tag(222) pair you specified,has already been used by Subscriber with Id (123)")
+ self.assertEqual(e.exception.message,
+ "The c_tag(111) and s_tag(222) pair you specified,has already been used by Subscriber with Id (123)")
self.models_decl.RCORDSubscriber_decl.save.assert_not_called()
def test_validate_c_tag_on_update(self):