[SEBA-412] Automated reformat of Python code
Passes of modernize, autopep8, black, then check with flake8
flake8 + manual fixes:
lib/xos-config
lib/xos-kafka
lib/xos-util
xos/coreapi
xos/api
xos/xos_client
Change-Id: Ib23cf84cb13beb3c6381fa0d79594dc9131dc815
diff --git a/lib/__init__.py b/lib/__init__.py
index 42722a8..b0fb0b2 100644
--- a/lib/__init__.py
+++ b/lib/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/lib/xos-config/setup.py b/lib/xos-config/setup.py
index 777a7a1..1e683f1 100644
--- a/lib/xos-config/setup.py
+++ b/lib/xos-config/setup.py
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from setuptools import setup
-
try:
from xosutil.autoversion_setup import setup_with_auto_version as setup
except ImportError:
@@ -25,15 +23,14 @@
from xosconfig.version import __version__
-setup(name='XosConfig',
- version=__version__,
- description='XOS Config Library',
- author='Matteo Scandolo',
- author_email='teo@onlab.us',
- packages=['xosconfig'],
- include_package_data=True,
- # TODO add all deps to the install_requires section
- install_requires=[
- 'pykwalify>=1.6.0'
- ]
- )
+setup(
+ name="XosConfig",
+ version=__version__,
+ description="XOS Config Library",
+ author="Matteo Scandolo",
+ author_email="teo@onlab.us",
+ packages=["xosconfig"],
+ include_package_data=True,
+ # TODO add all deps to the install_requires section
+ install_requires=["pykwalify>=1.6.0"],
+)
diff --git a/lib/xos-config/tests/__init__.py b/lib/xos-config/tests/__init__.py
index d4e8062..b0fb0b2 100644
--- a/lib/xos-config/tests/__init__.py
+++ b/lib/xos-config/tests/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,5 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
diff --git a/lib/xos-config/tests/test_config.py b/lib/xos-config/tests/test_config.py
index ee80b52..5eb86af 100644
--- a/lib/xos-config/tests/test_config.py
+++ b/lib/xos-config/tests/test_config.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,39 +14,50 @@
import unittest
-from mock import patch
import os
from xosconfig import Config
from xosconfig import Config as Config2
-basic_conf = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/confs/basic_conf.yaml")
-yaml_not_valid = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/confs/yaml_not_valid.yaml")
-invalid_format = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/confs/invalid_format.yaml")
-sample_conf = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/confs/sample_conf.yaml")
-override_conf = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/confs/override_conf.yaml")
-extend_conf = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/confs/extend_conf.yaml")
+basic_conf = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/confs/basic_conf.yaml"
+)
+yaml_not_valid = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/confs/yaml_not_valid.yaml"
+)
+invalid_format = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/confs/invalid_format.yaml"
+)
+sample_conf = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/confs/sample_conf.yaml"
+)
+override_conf = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/confs/override_conf.yaml"
+)
+extend_conf = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/confs/extend_conf.yaml"
+)
-small_schema = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/schemas/small_schema.yaml")
+small_schema = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/schemas/small_schema.yaml"
+)
-services_list = {
- "xos-ws": [],
- "xos-db": [],
-}
+services_list = {"xos-ws": [], "xos-db": []}
db_service = [
- {
- "ModifyIndex": 6,
- "CreateIndex": 6,
- "Node": "0152982c3159",
- "Address": "172.19.0.2",
- "ServiceID": "0d53ce210785:frontend_xos_db_1:5432",
- "ServiceName": "xos-db",
- "ServiceTags": [],
- "ServiceAddress": "172.18.0.4",
- "ServicePort": 5432,
- "ServiceEnableTagOverride": "false"
- }
- ]
+ {
+ "ModifyIndex": 6,
+ "CreateIndex": 6,
+ "Node": "0152982c3159",
+ "Address": "172.19.0.2",
+ "ServiceID": "0d53ce210785:frontend_xos_db_1:5432",
+ "ServiceName": "xos-db",
+ "ServiceTags": [],
+ "ServiceAddress": "172.18.0.4",
+ "ServicePort": 5432,
+ "ServiceEnableTagOverride": "false",
+ }
+]
+
class XOSConfigTest(unittest.TestCase):
"""
@@ -77,15 +87,19 @@
"""
with self.assertRaises(Exception) as e:
Config.get("database")
- self.assertEqual(e.exception.message, "[XOS-Config] Module has not been initialized")
+ self.assertEqual(
+ e.exception.message, "[XOS-Config] Module has not been initialized"
+ )
def test_missing_file_exception(self):
"""
- [XOS-Config] Raise if file not found
+ [XOS-Config] Raise if file not found
"""
with self.assertRaises(Exception) as e:
Config.init("missing_conf")
- self.assertEqual(e.exception.message, "[XOS-Config] Config file not found at: missing_conf")
+ self.assertEqual(
+ e.exception.message, "[XOS-Config] Config file not found at: missing_conf"
+ )
def test_yaml_not_valid(self):
"""
@@ -93,7 +107,9 @@
"""
with self.assertRaises(Exception) as e:
Config.init(yaml_not_valid)
- self.assertTrue(e.exception.message.startswith("[XOS-Config] The config format is wrong:"))
+ self.assertTrue(
+ e.exception.message.startswith("[XOS-Config] The config format is wrong:")
+ )
def test_invalid_format(self):
"""
@@ -101,7 +117,13 @@
"""
with self.assertRaises(Exception) as e:
Config.init(invalid_format)
- self.assertEqual(e.exception.message, "[XOS-Config] The config format is wrong: Schema validation failed:\n - Value '['I am', 'a yaml', 'but the', 'format is not', 'correct']' is not a dict. Value path: ''.")
+ self.assertEqual(
+ e.exception.message,
+ (
+ "[XOS-Config] The config format is wrong: Schema validation failed:\n"
+ " - Value '['I am', 'a yaml', 'but the', 'format is not', 'correct']' is not a dict. Value path: ''."
+ ),
+ )
def test_env_override(self):
"""
@@ -110,7 +132,9 @@
os.environ["XOS_CONFIG_FILE"] = "env.yaml"
with self.assertRaises(Exception) as e:
Config.init("missing_conf")
- self.assertEqual(e.exception.message, "[XOS-Config] Config file not found at: env.yaml")
+ self.assertEqual(
+ e.exception.message, "[XOS-Config] Config file not found at: env.yaml"
+ )
del os.environ["XOS_CONFIG_FILE"]
def test_schema_override(self):
@@ -120,7 +144,10 @@
os.environ["XOS_CONFIG_SCHEMA"] = "env-schema.yaml"
with self.assertRaises(Exception) as e:
Config.init(basic_conf)
- self.assertRegexpMatches(e.exception.message, '\[XOS\-Config\] Config schema not found at: (.+)env-schema\.yaml')
+ self.assertRegexpMatches(
+ e.exception.message,
+ r"\[XOS\-Config\] Config schema not found at: (.+)env-schema\.yaml",
+ )
# self.assertEqual(e.exception.message, "[XOS-Config] Config schema not found at: env-schema.yaml")
del os.environ["XOS_CONFIG_SCHEMA"]
@@ -131,7 +158,13 @@
os.environ["XOS_CONFIG_SCHEMA"] = small_schema
with self.assertRaises(Exception) as e:
Config.init(basic_conf)
- self.assertEqual(e.exception.message, "[XOS-Config] The config format is wrong: Schema validation failed:\n - Key 'database' was not defined. Path: ''.")
+ self.assertEqual(
+ e.exception.message,
+ (
+ "[XOS-Config] The config format is wrong: Schema validation failed:\n"
+ " - Key 'database' was not defined. Path: ''."
+ ),
+ )
del os.environ["XOS_CONFIG_SCHEMA"]
def test_get_cli_param(self):
@@ -174,11 +207,7 @@
# NOTE we are using Config2 here to be sure that the configuration is readable from any import,
# not only from the one that has been used to initialize it
res = Config2.get("database")
- self.assertEqual(res, {
- "name": "xos",
- "username": "test",
- "password": "safe"
- })
+ self.assertEqual(res, {"name": "xos", "username": "test", "password": "safe"})
def test_get_child_level(self):
"""
@@ -192,7 +221,7 @@
"""
[XOS-Config] If an override is provided for the config, it should return the overridden value
"""
- Config.init(sample_conf, 'xos-config-schema.yaml', override_conf)
+ Config.init(sample_conf, "xos-config-schema.yaml", override_conf)
res = Config.get("logging.level")
self.assertEqual(res, "info")
res = Config.get("database.password")
@@ -200,13 +229,16 @@
def test_config_extend(self):
"""
- [XOS-Config] If an override is provided for the config, it should return the overridden value (also if not defined in the base one)
+ [XOS-Config] If an override is provided for the config, it should
+ return the overridden value (also if not defined in the base one)
"""
- Config.init(sample_conf, 'xos-config-schema.yaml', extend_conf)
+
+ Config.init(sample_conf, "xos-config-schema.yaml", extend_conf)
res = Config.get("xos_dir")
self.assertEqual(res, "/opt/xos")
res = Config.get("database.password")
self.assertEqual(res, "safe")
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-config/xosconfig/__init__.py b/lib/xos-config/xosconfig/__init__.py
index d4e8305..9a0b30c 100644
--- a/lib/xos-config/xosconfig/__init__.py
+++ b/lib/xos-config/xosconfig/__init__.py
@@ -13,3 +13,5 @@
# limitations under the License.
from .config import Config
+
+__all__ = ["Config"]
diff --git a/lib/xos-config/xosconfig/config.py b/lib/xos-config/xosconfig/config.py
index ebc696f..aac6ffb 100644
--- a/lib/xos-config/xosconfig/config.py
+++ b/lib/xos-config/xosconfig/config.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,7 +16,6 @@
import os
import sys
import yaml
-import requests
import default
from pykwalify.core import Core as PyKwalify
import pykwalify
@@ -25,20 +23,25 @@
pykwalify.init_logging(1)
DEFAULT_CONFIG_FILE = "/opt/xos/xos_config.yaml"
-DEFAULT_CONFIG_SCHEMA = 'xos-config-schema.yaml'
+DEFAULT_CONFIG_SCHEMA = "xos-config-schema.yaml"
INITIALIZED = False
CONFIG_FILE = None
CONFIG = {}
OVERRIDE_CONFIG = {}
+
class Config:
"""
XOS Configuration APIs
"""
@staticmethod
- def init(config_file=DEFAULT_CONFIG_FILE, config_schema=DEFAULT_CONFIG_SCHEMA, override_config_file=None):
+ def init(
+ config_file=DEFAULT_CONFIG_FILE,
+ config_schema=DEFAULT_CONFIG_SCHEMA,
+ override_config_file=None,
+ ):
# make schema relative to this directory
# TODO give the possibility to specify an absolute path
@@ -58,37 +61,40 @@
# the config module can be initialized only one
if INITIALIZED:
- raise Exception('[XOS-Config] Module already initialized')
+ raise Exception("[XOS-Config] Module already initialized")
INITIALIZED = True
# if XOS_CONFIG_FILE is defined override the config_file
# FIXME shouldn't this stay in whatever module call this one? and then just pass the file to the init method
- if os.environ.get('XOS_CONFIG_FILE'):
- config_file = os.environ['XOS_CONFIG_FILE']
+ if os.environ.get("XOS_CONFIG_FILE"):
+ config_file = os.environ["XOS_CONFIG_FILE"]
# if XOS_CONFIG_SCHEMA is defined override the config_schema
# FIXME shouldn't this stay in whatever module call this one? and then just pass the file to the init method
- if os.environ.get('XOS_CONFIG_SCHEMA'):
- config_schema = Config.get_abs_path(os.environ['XOS_CONFIG_SCHEMA'])
+ if os.environ.get("XOS_CONFIG_SCHEMA"):
+ config_schema = Config.get_abs_path(os.environ["XOS_CONFIG_SCHEMA"])
# allow OVERRIDE_CONFIG_* to be overridden by env vars
- if os.environ.get('XOS_OVERRIDE_CONFIG_FILE'):
- OVERRIDE_CONFIG_FILE = os.environ['XOS_OVERRIDE_CONFIG_FILE']
- if os.environ.get('XOS_OVERRIDE_CONFIG_SCHEMA'):
- OVERRIDE_CONFIG_SCHEMA = Config.get_abs_path(os.environ['XOS_OVERRIDE_CONFIG_SCHEMA'])
+ if os.environ.get("XOS_OVERRIDE_CONFIG_FILE"):
+ OVERRIDE_CONFIG_FILE = os.environ["XOS_OVERRIDE_CONFIG_FILE"]
+ if os.environ.get("XOS_OVERRIDE_CONFIG_SCHEMA"):
+ OVERRIDE_CONFIG_SCHEMA = Config.get_abs_path(
+ os.environ["XOS_OVERRIDE_CONFIG_SCHEMA"]
+ )
# if a -C parameter is set in the cli override the config_file
# FIXME shouldn't this stay in whatever module call this one? and then just pass the file to the init method
if Config.get_cli_param(sys.argv):
config_schema = Config.get_cli_param(sys.argv)
-
CONFIG_FILE = config_file
CONFIG = Config.read_config(config_file, config_schema)
# if an override is set
if OVERRIDE_CONFIG_FILE is not None:
- OVERRIDE_CONFIG = Config.read_config(OVERRIDE_CONFIG_FILE, OVERRIDE_CONFIG_SCHEMA, True)
+ OVERRIDE_CONFIG = Config.read_config(
+ OVERRIDE_CONFIG_FILE, OVERRIDE_CONFIG_SCHEMA, True
+ )
@staticmethod
def get_config_file():
@@ -103,7 +109,7 @@
def get_abs_path(path):
if os.path.isabs(path):
return path
- return os.path.dirname(os.path.realpath(__file__)) + '/' + path
+ return os.path.dirname(os.path.realpath(__file__)) + "/" + path
@staticmethod
def validate_config_format(config_file, config_schema):
@@ -115,7 +121,7 @@
def get_cli_param(args):
last = None
for arg in args:
- if last == '-C':
+ if last == "-C":
return arg
last = arg
@@ -127,25 +133,27 @@
:return: dict
"""
- if(not os.path.exists(config_file) and ignore_if_not_found):
+ if not os.path.exists(config_file) and ignore_if_not_found:
return {}
if not os.path.exists(config_file):
- raise Exception('[XOS-Config] Config file not found at: %s' % config_file)
+ raise Exception("[XOS-Config] Config file not found at: %s" % config_file)
if not os.path.exists(config_schema):
- raise Exception('[XOS-Config] Config schema not found at: %s' % config_schema)
+ raise Exception(
+ "[XOS-Config] Config schema not found at: %s" % config_schema
+ )
try:
Config.validate_config_format(config_file, config_schema)
- except Exception, e:
+ except Exception as e:
try:
error_msg = e.msg
except AttributeError:
error_msg = str(e)
- raise Exception('[XOS-Config] The config format is wrong: %s' % error_msg)
+ raise Exception("[XOS-Config] The config format is wrong: %s" % error_msg)
- with open(config_file, 'r') as stream:
+ with open(config_file, "r") as stream:
return yaml.safe_load(stream)
@staticmethod
@@ -161,7 +169,7 @@
global OVERRIDE_CONFIG_FILE
if not INITIALIZED:
- raise Exception('[XOS-Config] Module has not been initialized')
+ raise Exception("[XOS-Config] Module has not been initialized")
val = Config.get_param(query, CONFIG)
if OVERRIDE_CONFIG_FILE or not val:
@@ -186,10 +194,10 @@
:param config: the config source to read from (can be the config file or the defaults)
:return: the requested parameter in any format the parameter is specified
"""
- keys = query.split('.')
+ keys = query.split(".")
if len(keys) == 1:
key = keys[0]
- if not config.has_key(key):
+ if key not in config:
return None
return config[key]
else:
@@ -205,10 +213,11 @@
"""
param = config
for k in keys:
- if not param.has_key(k):
+ if k not in param:
return None
param = param[k]
return param
-if __name__ == '__main__':
+
+if __name__ == "__main__":
Config.init()
diff --git a/lib/xos-config/xosconfig/default.py b/lib/xos-config/xosconfig/default.py
index 2c73b26..afed387 100644
--- a/lib/xos-config/xosconfig/default.py
+++ b/lib/xos-config/xosconfig/default.py
@@ -13,52 +13,34 @@
# limitations under the License.
DEFAULT_VALUES = {
- 'xos_dir': '/opt/xos',
-
+ "xos_dir": "/opt/xos",
# The configuration below inherits from the standard config of the Python logging module
# See: https://docs.python.org/2/library/logging.config.html
# multistructlog supports this config in all of its generality
# So for instance, you can add new handlers. Note that all handlers will
# receive logs simultaneously.
-
- 'blueprints': {},
- 'logging': {
- 'version': 1,
- 'handlers': {
- 'console': {
- 'class': 'logging.StreamHandler',
+ "blueprints": {},
+ "logging": {
+ "version": 1,
+ "handlers": {
+ "console": {"class": "logging.StreamHandler"},
+ "file": {
+ "class": "logging.handlers.RotatingFileHandler",
+ "filename": "/var/log/xos.log",
+ "maxBytes": 10485760,
+ "backupCount": 5,
},
- 'file': {
- 'class': 'logging.handlers.RotatingFileHandler',
- 'filename': '/var/log/xos.log',
- 'maxBytes': 10485760,
- 'backupCount': 5
- }
},
- 'loggers': {
- '': {
- 'handlers': ['console', 'file'],
- 'level': 'DEBUG'
- }
- }
+ "loggers": {"": {"handlers": ["console", "file"], "level": "DEBUG"}},
},
- 'accessor': {
- 'endpoint': 'xos-core.cord.lab:50051',
- 'kind': 'grpcapi',
- },
- 'keep_temp_files': False,
- 'dependency_graph': None,
- 'error_map_path': '/opt/xos/error_map.txt',
- 'feefie': {
- 'client_user': 'pl'
- },
- 'proxy_ssh': {
- 'enabled': True,
- 'key': '/opt/cord_profile/node_key',
- 'user': 'root'
- },
- 'node_key': '/opt/cord_profile/node_key',
- 'config_dir': '/etc/xos/sync',
- 'backoff_disabled': True,
- 'kafka_bootstrap_servers': ['cord-kafka:9092'],
+ "accessor": {"endpoint": "xos-core.cord.lab:50051", "kind": "grpcapi"},
+ "keep_temp_files": False,
+ "dependency_graph": None,
+ "error_map_path": "/opt/xos/error_map.txt",
+ "feefie": {"client_user": "pl"},
+ "proxy_ssh": {"enabled": True, "key": "/opt/cord_profile/node_key", "user": "root"},
+ "node_key": "/opt/cord_profile/node_key",
+ "config_dir": "/etc/xos/sync",
+ "backoff_disabled": True,
+ "kafka_bootstrap_servers": ["cord-kafka:9092"],
}
diff --git a/lib/xos-config/xosconfig/version.py b/lib/xos-config/xosconfig/version.py
index a118c43..2c84950 100644
--- a/lib/xos-config/xosconfig/version.py
+++ b/lib/xos-config/xosconfig/version.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-
# This file will be replaced by setup.py
__version__ = "unknown"
diff --git a/lib/xos-genx/__init__.py b/lib/xos-genx/__init__.py
index 42722a8..b0fb0b2 100644
--- a/lib/xos-genx/__init__.py
+++ b/lib/xos-genx/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/lib/xos-genx/bin/xosgenx b/lib/xos-genx/bin/xosgenx
index d48f6b0..46392d7 100644
--- a/lib/xos-genx/bin/xosgenx
+++ b/lib/xos-genx/bin/xosgenx
@@ -1,4 +1,4 @@
#!/usr/bin/env python
from xosgenx.xosgen import XosGen
-XosGen.init()
\ No newline at end of file
+XosGen.init()
diff --git a/lib/xos-genx/setup.py b/lib/xos-genx/setup.py
index c97ba52..c294ca4 100644
--- a/lib/xos-genx/setup.py
+++ b/lib/xos-genx/setup.py
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from setuptools import setup
-
try:
from xosutil.autoversion_setup import setup_with_auto_version as setup
except ImportError:
@@ -25,18 +23,15 @@
from xosgenx.version import __version__
-setup(name='XosGenX',
- version=__version__,
- description='XOS Generative Toolchain',
- author='Sapan Bhatia, Matteo Scandolo',
- author_email='sapan@opennetworking.org, teo@opennetworking.org',
- packages=['xosgenx'],
- scripts=['bin/xosgenx'],
- include_package_data=True,
- # TODO add all deps to the install_requires section
- install_requires=[
- 'inflect>=1.0.1',
- 'astunparse>=1.5.0'
- ]
- )
-
+setup(
+ name="XosGenX",
+ version=__version__,
+ description="XOS Generative Toolchain",
+ author="Sapan Bhatia, Matteo Scandolo",
+ author_email="sapan@opennetworking.org, teo@opennetworking.org",
+ packages=["xosgenx"],
+ scripts=["bin/xosgenx"],
+ include_package_data=True,
+ # TODO add all deps to the install_requires section
+ install_requires=["inflect>=1.0.1", "astunparse>=1.5.0"],
+)
diff --git a/lib/xos-genx/tox.ini b/lib/xos-genx/tox.ini
new file mode 100644
index 0000000..56c85a5
--- /dev/null
+++ b/lib/xos-genx/tox.ini
@@ -0,0 +1,21 @@
+; Copyright 2017-present Open Networking Foundation
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+; using tox.ini to store config information for other tools
+
+[flake8]
+; F821, ignoring undefined names would be valuable, but our testing dynamically loads them
+; W503, allow breaks before binary operators (see: https://github.com/PyCQA/pycodestyle/issues/498)
+ignore = F821, W503
+max-line-length = 200
diff --git a/lib/xos-genx/xos-genx-tests/__init__.py b/lib/xos-genx/xos-genx-tests/__init__.py
index d4e8062..b0fb0b2 100644
--- a/lib/xos-genx/xos-genx-tests/__init__.py
+++ b/lib/xos-genx/xos-genx-tests/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,5 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
diff --git a/lib/xos-genx/xos-genx-tests/attics/xosmodel_bottom.py b/lib/xos-genx/xos-genx-tests/attics/xosmodel_bottom.py
index 734aed5..b4fbd54 100644
--- a/lib/xos-genx/xos-genx-tests/attics/xosmodel_bottom.py
+++ b/lib/xos-genx/xos-genx-tests/attics/xosmodel_bottom.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,4 +14,4 @@
def bottom():
- return 'bottom'
+ return "bottom"
diff --git a/lib/xos-genx/xos-genx-tests/attics/xosmodel_header.py b/lib/xos-genx/xos-genx-tests/attics/xosmodel_header.py
index 5cee4a3..9ac13cc 100644
--- a/lib/xos-genx/xos-genx-tests/attics/xosmodel_header.py
+++ b/lib/xos-genx/xos-genx-tests/attics/xosmodel_header.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,4 +14,4 @@
def header():
- return 'header'
+ return "header"
diff --git a/lib/xos-genx/xos-genx-tests/attics/xosmodel_model.py b/lib/xos-genx/xos-genx-tests/attics/xosmodel_model.py
index 048641a..5d5ab91 100644
--- a/lib/xos-genx/xos-genx-tests/attics/xosmodel_model.py
+++ b/lib/xos-genx/xos-genx-tests/attics/xosmodel_model.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,4 +14,4 @@
def model():
- return 'model'
+ return "model"
diff --git a/lib/xos-genx/xos-genx-tests/attics/xosmodel_top.py b/lib/xos-genx/xos-genx-tests/attics/xosmodel_top.py
index 79752d0..e25560a 100644
--- a/lib/xos-genx/xos-genx-tests/attics/xosmodel_top.py
+++ b/lib/xos-genx/xos-genx-tests/attics/xosmodel_top.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,4 +14,4 @@
def top():
- return 'top'
+ return "top"
diff --git a/lib/xos-genx/xos-genx-tests/counts b/lib/xos-genx/xos-genx-tests/counts
index 8ab44d4..bfbbd7c 100755
--- a/lib/xos-genx/xos-genx-tests/counts
+++ b/lib/xos-genx/xos-genx-tests/counts
@@ -5,14 +5,16 @@
from core.models import XOSBase, PlModelMixIn
import pdb
+
def count(lst):
c = 0
for l in lst[0]:
- ll = l.lstrip()
- if (ll and not ll.startswith('#') and ll.rstrip()!='pass' and 'ModelLink' not in ll and 'CHOICES' not in ll):
- c+=1
+ ll = l.lstrip()
+ if (ll and not ll.startswith('#') and ll.rstrip() != 'pass' and 'ModelLink' not in ll and 'CHOICES' not in ll):
+ c += 1
return c
+
def is_model_class(model):
""" Return True if 'model' is something that we're interested in """
if not inspect.isclass(model):
@@ -26,9 +28,9 @@
return False
+
for a in dir(core.models):
- x = getattr(core.models,a)
+ x = getattr(core.models, a)
if (is_model_class(x)):
lines = inspect.getsourcelines(x)
- print x.__name__,":",count(lines)
-
+ print x.__name__, ":", count(lines)
diff --git a/lib/xos-genx/xos-genx-tests/helpers.py b/lib/xos-genx/xos-genx-tests/helpers.py
index ae52076..4687bb3 100644
--- a/lib/xos-genx/xos-genx-tests/helpers.py
+++ b/lib/xos-genx/xos-genx-tests/helpers.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,18 +18,19 @@
# Constants
OUTPUT_DIR = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/out/")
-TMP_TARGET_PATH = os.path.join(OUTPUT_DIR, 'tmp.xtarget')
+TMP_TARGET_PATH = os.path.join(OUTPUT_DIR, "tmp.xtarget")
# Passed around in various security / validation checks
+
+
class FakeObject:
pass
-class XProtoTestHelpers:
+class XProtoTestHelpers:
@staticmethod
def write_tmp_target(target):
- tmp_file = open(TMP_TARGET_PATH, 'w')
+ tmp_file = open(TMP_TARGET_PATH, "w")
tmp_file.write(target)
tmp_file.close()
return TMP_TARGET_PATH
-
diff --git a/lib/xos-genx/xos-genx-tests/test_cli.py b/lib/xos-genx/xos-genx-tests/test_cli.py
index 4beef06..3f94865 100644
--- a/lib/xos-genx/xos-genx-tests/test_cli.py
+++ b/lib/xos-genx/xos-genx-tests/test_cli.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,33 +18,39 @@
from mock import patch
from xosgenx.xosgen import XosGen
+
class Args:
pass
+
class XOSProcessorTest(unittest.TestCase):
"""
Testing the CLI binding for the XOS Generative Toolchain
"""
def setUp(self):
- os.chdir(os.path.join(os.path.abspath(os.path.dirname(os.path.realpath(__file__))), ".."))
+ os.chdir(
+ os.path.join(
+ os.path.abspath(os.path.dirname(os.path.realpath(__file__))), ".."
+ )
+ )
def test_generator(self):
"""
[XOS-GenX] The CLI entry point should correctly parse params
"""
args = Args()
- args.files = ['xos-genx-tests/xproto/test.xproto']
- args.target = 'xos-genx-tests/xtarget/test.xtarget'
- args.output = 'xos-genx-tests/out/dir/'
+ args.files = ["xos-genx-tests/xproto/test.xproto"]
+ args.target = "xos-genx-tests/xtarget/test.xtarget"
+ args.output = "xos-genx-tests/out/dir/"
args.write_to_file = "target"
args.dest_file = None
args.dest_extension = None
expected_args = Args()
- expected_args.files = [os.path.abspath(os.getcwd() + '/' + args.files[0])]
- expected_args.target = os.path.abspath(os.getcwd() + '/' + args.target)
- expected_args.output = os.path.abspath(os.getcwd() + '/' + args.output)
+ expected_args.files = [os.path.abspath(os.getcwd() + "/" + args.files[0])]
+ expected_args.target = os.path.abspath(os.getcwd() + "/" + args.target)
+ expected_args.output = os.path.abspath(os.getcwd() + "/" + args.output)
with patch("xosgenx.xosgen.XOSProcessor.process") as generator:
XosGen.init(args)
@@ -53,5 +58,6 @@
self.assertEqual(actual_args.files, expected_args.files)
self.assertEqual(actual_args.output, expected_args.output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-genx/xos-genx-tests/test_django_generator.py b/lib/xos-genx/xos-genx-tests/test_django_generator.py
index 4f4dae4..a81f80c 100644
--- a/lib/xos-genx/xos-genx-tests/test_django_generator.py
+++ b/lib/xos-genx/xos-genx-tests/test_django_generator.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,34 +17,36 @@
import os
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
-VROUTER_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/vrouterport.xproto")
+VROUTER_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/vrouterport.xproto"
+)
# Generate Protobuf from Xproto and then parse the resulting Protobuf
+
+
class XProtoProtobufGeneratorTest(unittest.TestCase):
def test_proto_generator(self):
"""
[XOS-GenX] Generate DJANGO models, verify Fields and Foreign Keys
"""
- args = XOSProcessorArgs(files = [VROUTER_XPROTO],
- target = 'django.xtarget')
+ args = XOSProcessorArgs(files=[VROUTER_XPROTO], target="django.xtarget")
output = XOSProcessor.process(args)
- fields = filter(lambda s:'Field(' in s, output.splitlines())
+ fields = filter(lambda s: "Field(" in s, output.splitlines())
self.assertEqual(len(fields), 2)
- links = filter(lambda s:'Key(' in s, output.splitlines())
+ links = filter(lambda s: "Key(" in s, output.splitlines())
self.assertEqual(len(links), 2)
def test_optional_relations(self):
"""
[XOS-GenX] Generate DJANGO models, verify relations
"""
- xproto = \
- """
+ xproto = """
option app_label = "test";
message ENodeB {
}
-
+
message Handover {
}
@@ -55,15 +56,14 @@
}
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = 'django.xtarget')
+ args = XOSProcessorArgs(inputs=xproto, target="django.xtarget")
output = XOSProcessor.process(args)
- null_true = filter(lambda s: 'null = True' in s, output.splitlines())
- null_false = filter(lambda s: 'null = False' in s, output.splitlines())
+ null_true = filter(lambda s: "null = True" in s, output.splitlines())
+ null_false = filter(lambda s: "null = False" in s, output.splitlines())
- blank_true = filter(lambda s: 'blank = True' in s, output.splitlines())
- blank_false = filter(lambda s: 'blank = False' in s, output.splitlines())
+ blank_true = filter(lambda s: "blank = True" in s, output.splitlines())
+ blank_false = filter(lambda s: "blank = False" in s, output.splitlines())
self.assertEqual(len(null_true), 1)
self.assertEqual(len(null_false), 1)
@@ -74,8 +74,7 @@
"""
[XOS-GenX] Generate DJANGO models, verify feedback_state fields
"""
- xproto = \
- """
+ xproto = """
option app_label = "test";
message ParentFoo {
@@ -87,8 +86,7 @@
}
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = 'django.xtarget')
+ args = XOSProcessorArgs(inputs=xproto, target="django.xtarget")
output = XOSProcessor.process(args)
self.assertIn("feedback_state_fields = ['parent_name', 'name']", output)
@@ -97,8 +95,7 @@
"""
[XOS-GenX] Use django validors for min and max values
"""
- xproto = \
- """
+ xproto = """
option app_label = "test";
message Foo (ParentFoo) {
@@ -106,15 +103,13 @@
}
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = 'django.xtarget')
+ args = XOSProcessorArgs(inputs=xproto, target="django.xtarget")
output = XOSProcessor.process(args)
self.assertIn("validators=[", output)
self.assertIn("MinValueValidator(1)", output)
self.assertIn("MaxValueValidator(10)", output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_field_graph.py b/lib/xos-genx/xos-genx-tests/test_field_graph.py
index 478708a..cfb4c8b 100644
--- a/lib/xos-genx/xos-genx-tests/test_field_graph.py
+++ b/lib/xos-genx/xos-genx-tests/test_field_graph.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,11 +17,12 @@
from xosgenx.jinja2_extensions import FieldNotFound
from helpers import XProtoTestHelpers
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
+from functools import reduce
+
class XProtoFieldGraphTest(unittest.TestCase):
def _test_field_graph(self):
- xproto = \
-"""
+ xproto = """
message VRouterDevice (PlCoreBase){
optional string name = 1 [help_text = "device friendly name", max_length = 20, null = True, db_index = False, blank = True, unique_with="openflow_id"];
required string openflow_id = 2 [help_text = "device identifier in ONOS", max_length = 20, null = False, db_index = False, blank = False, unique_with="name"];
@@ -39,45 +39,46 @@
}
"""
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{{ xproto_field_graph_components(proto.messages.0.fields, proto.messages.0) }}
-""")
+"""
+ )
- args = XOSProcessorArgs(inputs = xproto,
- target = target)
+ args = XOSProcessorArgs(inputs=xproto, target=target)
output = XOSProcessor.process(args)
- output = eval(output)
- self.assertIn({'A','B','C'}, output)
- self.assertIn({'openflow_id','name'}, output)
- self.assertIn({'config_key','vrouter_service','driver'}, output)
- self.assertIn({'E','F','G'}, output)
-
- union = reduce(lambda acc,x: acc | x, output)
- self.assertNotIn('D', union)
+ output = eval(output)
+ self.assertIn({"A", "B", "C"}, output)
+ self.assertIn({"openflow_id", "name"}, output)
+ self.assertIn({"config_key", "vrouter_service", "driver"}, output)
+ self.assertIn({"E", "F", "G"}, output)
+
+ union = reduce(lambda acc, x: acc | x, output)
+ self.assertNotIn("D", union)
def test_missing_field(self):
- xproto = \
-"""
+ xproto = """
message Foo (PlCoreBase){
required string A = 6 [unique_with="B"];
}
"""
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{{ xproto_field_graph_components(proto.messages.0.fields, proto.messages.0) }}
-""")
+"""
+ )
def generate():
- args = XOSProcessorArgs(inputs = xproto,
- target = target)
+ args = XOSProcessorArgs(inputs=xproto, target=target)
output = XOSProcessor.process(args)
with self.assertRaises(FieldNotFound) as e:
- generate()
+ generate()
- self.assertEqual(e.exception.message, 'Field "B" not found in model "Foo", referenced from field "A" by option "unique_with"')
+ self.assertEqual(
+ e.exception.message,
+ 'Field "B" not found in model "Foo", referenced from field "A" by option "unique_with"',
+ )
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_general_security.py b/lib/xos-genx/xos-genx-tests/test_general_security.py
index 1a7b7ca..c675b16 100644
--- a/lib/xos-genx/xos-genx-tests/test_general_security.py
+++ b/lib/xos-genx/xos-genx-tests/test_general_security.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,33 +20,38 @@
"""The function below is for eliminating warnings arising due to the missing output_security_check,
which is generated and loaded dynamically.
"""
+
+
def output_security_check(x, y):
raise Exception("Security enforcer not generated. Test failed.")
return False
+
"""
-The tests below use the Python code target to generate
+The tests below use the Python code target to generate
Python security policies, set up an appropriate environment and execute the Python.
"""
+
+
class XProtoSecurityTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("""
+ self.target = XProtoTestHelpers.write_tmp_target(
+ """
{% for name, policy in proto.policies.items() %}
{{ xproto_fol_to_python_test(name, policy, None, '0') }}
{% endfor %}
-""")
+"""
+ )
def test_constant(self):
- xproto = \
-"""
+ xproto = """
policy output < True >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def output_security_check(obj, ctx):
@@ -59,17 +63,15 @@
self.assertTrue(verdict)
def test_equal(self):
- xproto = \
-"""
+ xproto = """
policy output < ctx.user = obj.user >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def output_security_check(obj, ctx):
@@ -85,18 +87,18 @@
verdict = output_security_check(obj, ctx)
def test_call_policy(self):
- xproto = \
-"""
+ xproto = """
policy sub_policy < ctx.user = obj.user >
policy output < *sub_policy(child) >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output,globals()) # This loads the generated function, which should look like this:
+ exec(
+ output, globals()
+ ) # This loads the generated function, which should look like this:
"""
def sub_policy_security_check(obj, ctx):
@@ -105,10 +107,10 @@
def output_security_check(obj, ctx):
if obj.child:
- i1 = sub_policy_security_check(obj.child, ctx)
- else:
- i1 = True
- return i1
+ i1 = sub_policy_security_check(obj.child, ctx)
+ else:
+ i1 = True
+ return i1
"""
obj = FakeObject()
@@ -122,18 +124,18 @@
self.assertTrue(verdict)
def test_call_policy_child_none(self):
- xproto = \
-"""
+ xproto = """
policy sub_policy < ctx.user = obj.user >
policy output < *sub_policy(child) >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output,globals()) # This loads the generated function, which should look like this:
+ exec(
+ output, globals()
+ ) # This loads the generated function, which should look like this:
"""
def sub_policy_security_check(obj, ctx):
@@ -142,10 +144,10 @@
def output_security_check(obj, ctx):
if obj.child:
- i1 = sub_policy_security_check(obj.child, ctx)
- else:
- i1 = True
- return i1
+ i1 = sub_policy_security_check(obj.child, ctx)
+ else:
+ i1 = True
+ return i1
"""
obj = FakeObject()
@@ -158,23 +160,21 @@
self.assertTrue(verdict)
def test_bin(self):
- xproto = \
-"""
+ xproto = """
policy output < ctx.is_admin = True | obj.empty = True>
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
- def output_security_check(obj, ctx):
- i2 = (ctx.is_admin == True)
- i3 = (obj.empty == True)
- i1 = (i2 or i3)
- return i1
+ def output_security_check(obj, ctx):
+ i2 = (ctx.is_admin == True)
+ i3 = (obj.empty == True)
+ i1 = (i2 or i3)
+ return i1
"""
obj = FakeObject()
@@ -187,35 +187,30 @@
self.assertTrue(verdict)
-
def test_exists(self):
- xproto = \
-"""
+ xproto = """
policy output < exists Privilege: Privilege.object_id = obj.id >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
- def output_security_check(obj, ctx):
- i1 = Privilege.objects.filter(object_id=obj.id)
- return i1
+ def output_security_check(obj, ctx):
+ i1 = Privilege.objects.filter(object_id=obj.id)
+ return i1
"""
self.assertTrue(output_security_check is not None)
-
+
def test_python(self):
- xproto = \
-"""
+ xproto = """
policy output < {{ "jack" in ["the", "box"] }} = False >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def output_security_check(obj, ctx):
@@ -228,13 +223,11 @@
def test_forall(self):
# This one we only parse
- xproto = \
-"""
+ xproto = """
policy output < forall Credential: Credential.obj_id = obj_id >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
"""
@@ -245,5 +238,6 @@
"""
exec(output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-genx/xos-genx-tests/test_general_validation.py b/lib/xos-genx/xos-genx-tests/test_general_validation.py
index 048167a..0e2a785 100644
--- a/lib/xos-genx/xos-genx-tests/test_general_validation.py
+++ b/lib/xos-genx/xos-genx-tests/test_general_validation.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,33 +21,38 @@
"""The function below is for eliminating warnings arising due to the missing policy_output_validator,
which is generated and loaded dynamically.
"""
+
+
def policy_output_validator(x, y):
raise Exception("Validator not generated. Test failed.")
return False
+
"""
-The tests below use the Python code target to generate
+The tests below use the Python code target to generate
Python validation policies, set up an appropriate environment and execute the Python.
"""
+
+
class XProtoGeneralValidationTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("""
+ self.target = XProtoTestHelpers.write_tmp_target(
+ """
{% for name, policy in proto.policies.items() %}
{{ xproto_fol_to_python_validator(name, policy, None, 'Necessary Failure') }}
{% endfor %}
-""")
+"""
+ )
def test_constant(self):
- xproto = \
-"""
+ xproto = """
policy output < False >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -58,20 +62,18 @@
"""
with self.assertRaises(Exception):
- policy_output_validator({}, {})
-
+ policy_output_validator({}, {})
+
def test_equal(self):
- xproto = \
-"""
+ xproto = """
policy output < not (ctx.user = obj.user) >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -87,20 +89,18 @@
ctx.user = 1
with self.assertRaises(Exception):
- policy_output_validator(obj, ctx)
+ policy_output_validator(obj, ctx)
def test_equal(self):
- xproto = \
-"""
+ xproto = """
policy output < not (ctx.user = obj.user) >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -116,11 +116,10 @@
ctx.user = 1
with self.assertRaises(Exception):
- policy_output_validator(obj, ctx)
+ policy_output_validator(obj, ctx)
def test_bin(self):
- xproto = \
-"""
+ xproto = """
policy output < (ctx.is_admin = True | obj.empty = True) | False>
"""
@@ -129,7 +128,7 @@
args.target = self.target
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -149,17 +148,14 @@
with self.assertRaises(Exception):
verdict = policy_output_validator(obj, ctx)
-
def test_exists(self):
- xproto = \
-"""
+ xproto = """
policy output < exists Privilege: Privilege.object_id = obj.id >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -169,16 +165,14 @@
"""
self.assertTrue(policy_output_validator is not None)
-
+
def test_python(self):
- xproto = \
-"""
+ xproto = """
policy output < {{ "jack" in ["the", "box"] }} = True >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -192,18 +186,18 @@
self.assertTrue(policy_output_validator({}, {}) is True)
def test_call_policy(self):
- xproto = \
-"""
+ xproto = """
policy sub_policy < ctx.user = obj.user >
policy output < *sub_policy(child) >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
- exec(output,globals()) # This loads the generated function, which should look like this:
+ exec(
+ output, globals()
+ ) # This loads the generated function, which should look like this:
"""
def policy_sub_policy_validator(obj, ctx):
@@ -229,13 +223,11 @@
def test_forall(self):
# This one we only parse
- xproto = \
-"""
+ xproto = """
policy output < forall Credential: Credential.obj_id = obj_id >
"""
- args = XOSProcessorArgs(inputs = xproto,
- target = self.target)
+ args = XOSProcessorArgs(inputs=xproto, target=self.target)
output = XOSProcessor.process(args)
@@ -246,7 +238,8 @@
return i1
"""
- self.assertIn('policy_output_validator', output)
+ self.assertIn("policy_output_validator", output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-genx/xos-genx-tests/test_generator.py b/lib/xos-genx/xos-genx-tests/test_generator.py
index 3daa594..1de6bd8 100644
--- a/lib/xos-genx/xos-genx-tests/test_generator.py
+++ b/lib/xos-genx/xos-genx-tests/test_generator.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -30,38 +29,64 @@
description: "Help Files"
"""
-BASE_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/base.xproto")
-TEST_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/test.xproto")
-FIELDTEST_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/fieldtest.xproto")
-REVERSEFIELDTEST_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/reversefieldtest.xproto")
-FILTERTEST_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/filtertest.xproto")
-SKIP_DJANGO_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/skip_django.xproto")
-VROUTER_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/vrouterport.xproto")
-TEST_TARGET = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xtarget/test.xtarget")
-FIELDTEST_TARGET = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xtarget/fieldtest.xtarget")
-FILTERTEST_TARGET = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xtarget/filtertest.xtarget")
-SPLIT_TARGET = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xtarget/split.xtarget")
+BASE_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/base.xproto"
+)
+TEST_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/test.xproto"
+)
+FIELDTEST_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/fieldtest.xproto"
+)
+REVERSEFIELDTEST_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/reversefieldtest.xproto"
+)
+FILTERTEST_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/filtertest.xproto"
+)
+SKIP_DJANGO_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/skip_django.xproto"
+)
+VROUTER_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/vrouterport.xproto"
+)
+TEST_TARGET = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xtarget/test.xtarget"
+)
+FIELDTEST_TARGET = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xtarget/fieldtest.xtarget"
+)
+FILTERTEST_TARGET = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xtarget/filtertest.xtarget"
+)
+SPLIT_TARGET = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xtarget/split.xtarget"
+)
TEST_ATTICS = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/attics/")
+
class XOSProcessorTest(unittest.TestCase):
"""
Testing the XOS Generative Toolchain
"""
def setUp(self):
- os.chdir(os.path.join(os.path.abspath(os.path.dirname(os.path.realpath(__file__))), ".."))
+ os.chdir(
+ os.path.join(
+ os.path.abspath(os.path.dirname(os.path.realpath(__file__))), ".."
+ )
+ )
filesToRemove = [f for f in os.listdir(OUTPUT_DIR)]
for f in filesToRemove:
- if not f.startswith('.'):
- os.remove(OUTPUT_DIR + '/' + f)
+ if not f.startswith("."):
+ os.remove(OUTPUT_DIR + "/" + f)
def test_generator_custom_target_from_file(self):
"""
[XOS-GenX] Generate output from base.xproto
"""
- args = XOSProcessorArgs(files = [TEST_XPROTO],
- target = TEST_TARGET)
+ args = XOSProcessorArgs(files=[TEST_XPROTO], target=TEST_TARGET)
output = XOSProcessor.process(args)
self.assertEqual(output, TEST_EXPECTED_OUTPUT)
@@ -69,8 +94,7 @@
"""
[XOS-GenX] Generate output from base.xproto
"""
- args = XOSProcessorArgs(inputs = open(TEST_XPROTO).read(),
- target = TEST_TARGET)
+ args = XOSProcessorArgs(inputs=open(TEST_XPROTO).read(), target=TEST_TARGET)
output = XOSProcessor.process(args)
self.assertEqual(output, TEST_EXPECTED_OUTPUT)
@@ -78,73 +102,79 @@
"""
[XOS-GenX] Generate django output from test.xproto
"""
- args = XOSProcessorArgs(files = [TEST_XPROTO, VROUTER_XPROTO],
- target = 'django.xtarget',
- attic = TEST_ATTICS,
- output = OUTPUT_DIR,
- dest_extension = 'py',
- write_to_file = 'model')
+ args = XOSProcessorArgs(
+ files=[TEST_XPROTO, VROUTER_XPROTO],
+ target="django.xtarget",
+ attic=TEST_ATTICS,
+ output=OUTPUT_DIR,
+ dest_extension="py",
+ write_to_file="model",
+ )
output = XOSProcessor.process(args)
# xosmodel has custom header attic
- self.assertIn('from xosmodel_header import *', output['XOSModel'])
- self.assertIn('class XOSModel(XOSBase):', output['XOSModel'])
+ self.assertIn("from xosmodel_header import *", output["XOSModel"])
+ self.assertIn("class XOSModel(XOSBase):", output["XOSModel"])
# vrouter port use the default header
- self.assertIn('header import *', output['VRouterPort'])
- self.assertIn('class VRouterPort(XOSBase):', output['VRouterPort'])
+ self.assertIn("header import *", output["VRouterPort"])
+ self.assertIn("class VRouterPort(XOSBase):", output["VRouterPort"])
- #verify files
- xosmodel = OUTPUT_DIR + '/xosmodel.py'
+ # verify files
+ xosmodel = OUTPUT_DIR + "/xosmodel.py"
self.assertTrue(os.path.isfile(xosmodel))
xmf = open(xosmodel).read()
- self.assertIn('from xosmodel_header import *', xmf)
- self.assertIn('class XOSModel(XOSBase):', xmf)
+ self.assertIn("from xosmodel_header import *", xmf)
+ self.assertIn("class XOSModel(XOSBase):", xmf)
- vrouterport = OUTPUT_DIR + '/vrouterport.py'
+ vrouterport = OUTPUT_DIR + "/vrouterport.py"
self.assertTrue(os.path.isfile(vrouterport))
vrpf = open(vrouterport).read()
- self.assertIn('header import *', vrpf)
- self.assertIn('class VRouterPort(XOSBase):', vrpf)
+ self.assertIn("header import *", vrpf)
+ self.assertIn("class VRouterPort(XOSBase):", vrpf)
def test_django_with_base(self):
- args = XOSProcessorArgs(files = [TEST_XPROTO, BASE_XPROTO],
- target = 'django.xtarget',
- attic = TEST_ATTICS,
- output = OUTPUT_DIR,
- dest_extension = 'py',
- write_to_file = 'model')
+ args = XOSProcessorArgs(
+ files=[TEST_XPROTO, BASE_XPROTO],
+ target="django.xtarget",
+ attic=TEST_ATTICS,
+ output=OUTPUT_DIR,
+ dest_extension="py",
+ write_to_file="model",
+ )
output = XOSProcessor.process(args)
# verify files
- xosmodel = OUTPUT_DIR + '/xosmodel.py'
+ xosmodel = OUTPUT_DIR + "/xosmodel.py"
self.assertTrue(os.path.isfile(xosmodel))
xmf = open(xosmodel).read()
- self.assertIn('from xosmodel_header import *', xmf)
- self.assertIn('class XOSModel(XOSBase):', xmf)
+ self.assertIn("from xosmodel_header import *", xmf)
+ self.assertIn("class XOSModel(XOSBase):", xmf)
- xosbase = OUTPUT_DIR + '/xosbase.py'
+ xosbase = OUTPUT_DIR + "/xosbase.py"
self.assertTrue(os.path.isfile(xosbase))
xbf = open(xosbase).read()
- self.assertIn('header import *', xbf)
- self.assertIn('class XOSBase(models.Model, PlModelMixIn):', xbf)
+ self.assertIn("header import *", xbf)
+ self.assertIn("class XOSBase(models.Model, PlModelMixIn):", xbf)
def test_write_multiple_files(self):
"""
[XOS-GenX] read multiple models as input, print one file per model
"""
- args = XOSProcessorArgs(files = [TEST_XPROTO, VROUTER_XPROTO],
- target = TEST_TARGET,
- output = OUTPUT_DIR,
- dest_extension = 'txt',
- write_to_file = 'model')
+ args = XOSProcessorArgs(
+ files=[TEST_XPROTO, VROUTER_XPROTO],
+ target=TEST_TARGET,
+ output=OUTPUT_DIR,
+ dest_extension="txt",
+ write_to_file="model",
+ )
XOSProcessor.process(args)
- generated_files = [f for f in os.listdir(OUTPUT_DIR) if not f.startswith('.')]
+ generated_files = [f for f in os.listdir(OUTPUT_DIR) if not f.startswith(".")]
self.assertEqual(len(generated_files), 2)
- xosmodel = open(os.path.join(OUTPUT_DIR, 'xosmodel.txt'), "r").read()
- vrouterport = open(os.path.join(OUTPUT_DIR, 'vrouterport.txt'), "r").read()
+ xosmodel = open(os.path.join(OUTPUT_DIR, "xosmodel.txt"), "r").read()
+ vrouterport = open(os.path.join(OUTPUT_DIR, "vrouterport.txt"), "r").read()
self.assertIn("name: XOSModel", xosmodel)
self.assertIn("name: VRouterPort", vrouterport)
@@ -153,58 +183,63 @@
"""
[XOS-GenX] read multiple models as input, print separate files based on +++
"""
- args = XOSProcessorArgs(files = [TEST_XPROTO, VROUTER_XPROTO],
- target = SPLIT_TARGET,
- output = OUTPUT_DIR,
- write_to_file = 'target')
+ args = XOSProcessorArgs(
+ files=[TEST_XPROTO, VROUTER_XPROTO],
+ target=SPLIT_TARGET,
+ output=OUTPUT_DIR,
+ write_to_file="target",
+ )
XOSProcessor.process(args)
- generated_files = [f for f in os.listdir(OUTPUT_DIR) if not f.startswith('.')]
+ generated_files = [f for f in os.listdir(OUTPUT_DIR) if not f.startswith(".")]
self.assertEqual(len(generated_files), 2)
- xosmodel = open(os.path.join(OUTPUT_DIR, 'xosmodel.txt'), "r").read()
- vrouterport = open(os.path.join(OUTPUT_DIR, 'vrouterport.txt'), "r").read()
+ xosmodel = open(os.path.join(OUTPUT_DIR, "xosmodel.txt"), "r").read()
+ vrouterport = open(os.path.join(OUTPUT_DIR, "vrouterport.txt"), "r").read()
self.assertIn("name: XOSModel", xosmodel)
self.assertIn("name: VRouterPort", vrouterport)
def test_skip_django(self):
- args = XOSProcessorArgs(files = [SKIP_DJANGO_XPROTO],
- target = 'django.xtarget',
- output = OUTPUT_DIR,
- dest_extension = 'py',
- write_to_file = 'model')
+ args = XOSProcessorArgs(
+ files=[SKIP_DJANGO_XPROTO],
+ target="django.xtarget",
+ output=OUTPUT_DIR,
+ dest_extension="py",
+ write_to_file="model",
+ )
output = XOSProcessor.process(args)
# should not print a file if options.skip_django = True
- file = OUTPUT_DIR + '/user.py'
+ file = OUTPUT_DIR + "/user.py"
self.assertFalse(os.path.isfile(file))
def test_service_order(self):
- args = XOSProcessorArgs(files = [BASE_XPROTO, TEST_XPROTO, VROUTER_XPROTO],
- target = 'service.xtarget',
- output = OUTPUT_DIR,
- write_to_file = 'target')
+ args = XOSProcessorArgs(
+ files=[BASE_XPROTO, TEST_XPROTO, VROUTER_XPROTO],
+ target="service.xtarget",
+ output=OUTPUT_DIR,
+ write_to_file="target",
+ )
output = XOSProcessor.process(args)
- model = OUTPUT_DIR + '/models.py'
+ model = OUTPUT_DIR + "/models.py"
self.assertTrue(os.path.isfile(model))
line_num = 0
for line in open(model).readlines():
line_num += 1
- if line.find('class XOSBase(models.Model, PlModelMixIn):') >= 0:
+ if line.find("class XOSBase(models.Model, PlModelMixIn):") >= 0:
base_line = line_num
- if line.find('XOSModel(XOSBase):') >= 0:
+ if line.find("XOSModel(XOSBase):") >= 0:
xosmodel_line = line_num
- if line.find('class VRouterPort(XOSBase):') >= 0:
+ if line.find("class VRouterPort(XOSBase):") >= 0:
vrouter_line = line_num
self.assertLess(base_line, xosmodel_line)
self.assertLess(xosmodel_line, vrouter_line)
def test_field_numbers(self):
- args = XOSProcessorArgs(files = [FIELDTEST_XPROTO],
- target = FIELDTEST_TARGET)
+ args = XOSProcessorArgs(files=[FIELDTEST_XPROTO], target=FIELDTEST_TARGET)
output = XOSProcessor.process(args)
def _assert_field(modelname, fieldname, id):
@@ -224,8 +259,9 @@
_assert_field("Slice", "site", 102)
def test_field_numbers(self):
- args = XOSProcessorArgs(files = [REVERSEFIELDTEST_XPROTO],
- target = FIELDTEST_TARGET)
+ args = XOSProcessorArgs(
+ files=[REVERSEFIELDTEST_XPROTO], target=FIELDTEST_TARGET
+ )
output = XOSProcessor.process(args)
def _assert_field(modelname, fieldname, id):
@@ -250,32 +286,31 @@
def test_unfiltered(self):
""" With no include_* args, should get all models """
- args = XOSProcessorArgs(files = [FILTERTEST_XPROTO],
- target = FILTERTEST_TARGET)
+ args = XOSProcessorArgs(files=[FILTERTEST_XPROTO], target=FILTERTEST_TARGET)
output = XOSProcessor.process(args)
self.assertEqual(output, "Model1,Model2,Model3,")
def test_filter_models(self):
""" Should only get models specified by include_models """
- args = XOSProcessorArgs(files = [FILTERTEST_XPROTO],
- target = FILTERTEST_TARGET,
- include_models = ["Model1", "Model3"])
+ args = XOSProcessorArgs(
+ files=[FILTERTEST_XPROTO],
+ target=FILTERTEST_TARGET,
+ include_models=["Model1", "Model3"],
+ )
output = XOSProcessor.process(args)
self.assertEqual(output, "Model1,Model3,")
def test_filter_apps(self):
""" Should only get models whose apps are specified by include_apps """
- args = XOSProcessorArgs(files = [FILTERTEST_XPROTO],
- target = FILTERTEST_TARGET,
- include_apps = ["core"])
+ args = XOSProcessorArgs(
+ files=[FILTERTEST_XPROTO], target=FILTERTEST_TARGET, include_apps=["core"]
+ )
output = XOSProcessor.process(args)
self.assertEqual(output, "Model1,Model2,")
-
-
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-genx/xos-genx-tests/test_graph.py b/lib/xos-genx/xos-genx-tests/test_graph.py
index fda3d99..c6cfea7 100644
--- a/lib/xos-genx/xos-genx-tests/test_graph.py
+++ b/lib/xos-genx/xos-genx-tests/test_graph.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,10 +17,11 @@
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
from helpers import XProtoTestHelpers
+
class XProtoGraphTests(unittest.TestCase):
def test_cross_model(self):
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{% for m in proto.messages %}
{{ m.name }} {
{%- for l in m.links %}
@@ -34,10 +34,10 @@
{% endfor %}
}
{% endfor %}
-""")
-
- proto = \
"""
+ )
+
+ proto = """
message Port (PlCoreBase,ParameterMixin){
required manytoone network->Network:links = 1 [db_index = True, null = False, blank = False];
optional manytoone instance->Instance:ports = 2 [db_index = True, null = True, blank = True];
@@ -116,12 +116,13 @@
args.inputs = proto
args.target = target
output = XOSProcessor.process(args)
- num_semis = output.count(';')
- self.assertGreater(num_semis, 3) # 3 is the number of links, each of which contains at least one field
+ num_semis = output.count(";")
+ self.assertGreater(
+ num_semis, 3
+ ) # 3 is the number of links, each of which contains at least one field
def test_base_class_fields(self):
- target = \
-"""
+ target = """
{% for m in proto.messages %}
{{ m.name }} {
{%- for l in m.links %}
@@ -137,8 +138,7 @@
"""
xtarget = XProtoTestHelpers.write_tmp_target(target)
- proto = \
-"""
+ proto = """
message Port (PlCoreBase,ParameterMixin){
required manytoone network->Network:links = 1 [db_index = True, null = False, blank = False];
optional manytoone instance->Instance:ports = 2 [db_index = True, null = True, blank = True];
@@ -218,19 +218,17 @@
args.target = xtarget
output = XOSProcessor.process(args)
- num_semis = output.count(';')
+ num_semis = output.count(";")
self.assertGreater(num_semis, 3)
def test_from_base(self):
- target = \
-"""
+ target = """
{% for f in xproto_base_fields(proto.messages.3, proto.message_table) %}
{{ f.type }} {{ f.name }};
{% endfor %}
"""
xtarget = XProtoTestHelpers.write_tmp_target(target)
- proto = \
-"""
+ proto = """
message Port (PlCoreBase,ParameterMixin){
required string easter_egg = 1;
required manytoone network->Network:links = 1 [db_index = True, null = False, blank = False];
@@ -309,9 +307,8 @@
args.inputs = proto
args.target = xtarget
output = XOSProcessor.process(args)
- self.assertIn('easter_egg', output)
+ self.assertIn("easter_egg", output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_jinja2_base.py b/lib/xos-genx/xos-genx-tests/test_jinja2_base.py
index 4f26ac9..859d640 100644
--- a/lib/xos-genx/xos-genx-tests/test_jinja2_base.py
+++ b/lib/xos-genx/xos-genx-tests/test_jinja2_base.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -29,6 +28,7 @@
f["options"]["plural"] = plural
return f
+
class Jinja2BaseTests(unittest.TestCase):
def test_xproto_is_true(self):
self.assertTrue(xproto_is_true(True))
@@ -68,14 +68,16 @@
self.assertEqual(xproto_singularize_pluralize(_field("sheep")), "sheep")
self.assertEqual(xproto_singularize_pluralize(_field("slices")), "slices")
self.assertEqual(xproto_singularize_pluralize(_field("networks")), "networks")
- self.assertEqual(xproto_singularize_pluralize(_field("omf_friendlies")), "omf_friendlies")
+ self.assertEqual(
+ xproto_singularize_pluralize(_field("omf_friendlies")), "omf_friendlies"
+ )
# invalid words, should usually return <word>-es
self.assertEqual(xproto_singularize_pluralize(_field("xxx")), "xxxes")
# if a field option is set, use that
- self.assertEqual(xproto_singularize(_field("sheep", singular="turtle")), "turtle")
+ self.assertEqual(
+ xproto_singularize(_field("sheep", singular="turtle")), "turtle"
+ )
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_jinja2_django.py b/lib/xos-genx/xos-genx-tests/test_jinja2_django.py
index ab47443..108ae4e 100644
--- a/lib/xos-genx/xos-genx-tests/test_jinja2_django.py
+++ b/lib/xos-genx/xos-genx-tests/test_jinja2_django.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,103 +16,70 @@
import unittest
from xosgenx.jinja2_extensions.django import *
+
class Jinja2BaseTests(unittest.TestCase):
def test_xproto_optioned_fields_to_list(self):
fields = [
- {
- 'name': 'has_feedback_1',
- 'options': {
- 'feedback_state': 'True',
- }
- },
- {
- 'name': 'has_feedback_2',
- 'options': {
- 'feedback_state': 'True',
- }
- },
- {
- 'name': 'no_feedback',
- 'options': {
- 'feedback_state': 'False',
- }
- }
+ {"name": "has_feedback_1", "options": {"feedback_state": "True"}},
+ {"name": "has_feedback_2", "options": {"feedback_state": "True"}},
+ {"name": "no_feedback", "options": {"feedback_state": "False"}},
]
- res = xproto_optioned_fields_to_list(fields, 'feedback_state', 'True')
+ res = xproto_optioned_fields_to_list(fields, "feedback_state", "True")
self.assertEqual(res, ["has_feedback_1", "has_feedback_2"])
def test_xproto_required_to_django(self):
- field = {
- 'name': 'foo',
- 'options': {
- 'modifier': 'required'
- }
- }
+ field = {"name": "foo", "options": {"modifier": "required"}}
res = map_xproto_to_django(field)
- self.assertEqual(res, {'blank': False, 'null': False})
+ self.assertEqual(res, {"blank": False, "null": False})
def test_xproto_optional_to_django(self):
- field = {
- 'name': 'foo',
- 'options': {
- 'modifier': 'optional'
- }
- }
+ field = {"name": "foo", "options": {"modifier": "optional"}}
res = map_xproto_to_django(field)
- self.assertEqual(res, {'blank': True, 'null': True})
-
+ self.assertEqual(res, {"blank": True, "null": True})
def test_map_xproto_to_django(self):
options = {
- 'help_text': 'bar',
- 'default': 'default_value',
- 'null': True,
- 'db_index': False,
- 'blank': False,
- 'min_value': 16,
- 'max_value': 16
+ "help_text": "bar",
+ "default": "default_value",
+ "null": True,
+ "db_index": False,
+ "blank": False,
+ "min_value": 16,
+ "max_value": 16,
}
- field = {
- 'name': 'foo',
- 'options': options
- }
+ field = {"name": "foo", "options": options}
res = map_xproto_to_django(field)
self.assertEqual(res, options)
def test_format_options_string(self):
- options = {
- 'null': True,
- 'min_value': 16,
- 'max_value': 16
- }
+ options = {"null": True, "min_value": 16, "max_value": 16}
res = format_options_string(options)
- self.assertEqual(res, "null = True, validators=[MaxValueValidator(16), MinValueValidator(16)]")
+ self.assertEqual(
+ res,
+ "null = True, validators=[MaxValueValidator(16), MinValueValidator(16)]",
+ )
- options = {
- 'min_value': 16,
- 'max_value': 16
- }
+ options = {"min_value": 16, "max_value": 16}
res = format_options_string(options)
- self.assertEqual(res, "validators=[MaxValueValidator(16), MinValueValidator(16)]")
+ self.assertEqual(
+ res, "validators=[MaxValueValidator(16), MinValueValidator(16)]"
+ )
- options = {
- 'null': True,
- }
+ options = {"null": True}
res = format_options_string(options)
self.assertEqual(res, "null = True")
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_optimize.py b/lib/xos-genx/xos-genx-tests/test_optimize.py
index e31deb8..c86b736 100644
--- a/lib/xos-genx/xos-genx-tests/test_optimize.py
+++ b/lib/xos-genx/xos-genx-tests/test_optimize.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,81 +16,120 @@
import unittest
from xosgenx.jinja2_extensions.fol2 import FOL2Python
+
class XProtoOptimizeTest(unittest.TestCase):
def setUp(self):
self.f2p = FOL2Python()
- self.maxDiff=None
+ self.maxDiff = None
def test_constant(self):
- input = 'True'
+ input = "True"
output = self.f2p.hoist_outer(input)
self.assertEqual(output, input)
def test_exists(self):
- input = {'exists': ['X',{'|':['X.foo','y']}]}
+ input = {"exists": ["X", {"|": ["X.foo", "y"]}]}
output = self.f2p.hoist_outer(input)
- expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', 'X.foo']}]}]}
+ expected = {"|": ["y", {"&": [{"not": "y"}, {"exists": ["X", "X.foo"]}]}]}
self.assertEqual(output, expected)
-
+
def test_exists_implies(self):
- input = {'exists': ['Foo', {'&': [{'=': ('Foo.a', '1')}, {'->': ['write_access', {'=': ('Foo.b', '1')}]}]}]}
+ input = {
+ "exists": [
+ "Foo",
+ {
+ "&": [
+ {"=": ("Foo.a", "1")},
+ {"->": ["write_access", {"=": ("Foo.b", "1")}]},
+ ]
+ },
+ ]
+ }
output = self.f2p.hoist_outer(input)
- expected = {'|': [{'&': ['write_access', {'exists': ['Foo', {'&': [{'=': ['Foo.a', '1']}, {'=': ['Foo.b', '1']}]}]}]}, {'&': [{'not': 'write_access'}, {'exists': ['Foo', {'=': ['Foo.a', '1']}]}]}]}
+ expected = {
+ "|": [
+ {
+ "&": [
+ "write_access",
+ {
+ "exists": [
+ "Foo",
+ {"&": [{"=": ["Foo.a", "1"]}, {"=": ["Foo.b", "1"]}]},
+ ]
+ },
+ ]
+ },
+ {
+ "&": [
+ {"not": "write_access"},
+ {"exists": ["Foo", {"=": ["Foo.a", "1"]}]},
+ ]
+ },
+ ]
+ }
self.assertEqual(output, expected)
def test_forall(self):
- input = {'forall': ['X',{'|':['X.foo','y']}]}
+ input = {"forall": ["X", {"|": ["X.foo", "y"]}]}
output = self.f2p.hoist_outer(input)
- expected = {'|': ['y', {'&': [{'not': 'y'}, {'forall': ['X', 'X.foo']}]}]}
+ expected = {"|": ["y", {"&": [{"not": "y"}, {"forall": ["X", "X.foo"]}]}]}
self.assertEqual(output, expected)
def test_exists_embedded(self):
- input = {'&':['True',{'exists': ['X',{'|':['X.foo','y']}]}]}
+ input = {"&": ["True", {"exists": ["X", {"|": ["X.foo", "y"]}]}]}
output = self.f2p.hoist_outer(input)
- expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', 'X.foo']}]}]}
+ expected = {"|": ["y", {"&": [{"not": "y"}, {"exists": ["X", "X.foo"]}]}]}
self.assertEqual(output, expected)
-
+
def test_exists_equals(self):
- input = {'&':['True',{'exists': ['X',{'|':['X.foo',{'=':['y','z']}]}]}]}
+ input = {"&": ["True", {"exists": ["X", {"|": ["X.foo", {"=": ["y", "z"]}]}]}]}
output = self.f2p.hoist_outer(input)
- expected = {'|': [{'=': ['y', 'z']}, {'&': [{'not': {'=': ['y', 'z']}}, {'exists': ['X', 'X.foo']}]}]}
+ expected = {
+ "|": [
+ {"=": ["y", "z"]},
+ {"&": [{"not": {"=": ["y", "z"]}}, {"exists": ["X", "X.foo"]}]},
+ ]
+ }
self.assertEqual(output, expected)
def test_exists_nested_constant(self):
- input = {'&':['True',{'exists': ['X',{'|':['y',{'=':['y','X.foo']}]}]}]}
+ input = {"&": ["True", {"exists": ["X", {"|": ["y", {"=": ["y", "X.foo"]}]}]}]}
output = self.f2p.hoist_outer(input)
- expected = {'|': ['y', {'&': [{'not': 'y'}, {'exists': ['X', {'=': ['False', 'X.foo']}]}]}]}
+ expected = {
+ "|": [
+ "y",
+ {"&": [{"not": "y"}, {"exists": ["X", {"=": ["False", "X.foo"]}]}]},
+ ]
+ }
self.assertEqual(output, expected)
def test_exists_nested(self):
- input = {'exists': ['X',{'exists':['Y',{'=':['Y.foo','X.foo']}]}]}
+ input = {"exists": ["X", {"exists": ["Y", {"=": ["Y.foo", "X.foo"]}]}]}
output = self.f2p.hoist_outer(input)
expected = input
self.assertEqual(input, output)
def test_exists_nested2(self):
- input = {'exists': ['X',{'exists':['Y',{'=':['Z','Y']}]}]}
+ input = {"exists": ["X", {"exists": ["Y", {"=": ["Z", "Y"]}]}]}
output = self.f2p.hoist_outer(input)
- expected = {'exists': ['Y', {'=': ['Z', 'Y']}]}
+ expected = {"exists": ["Y", {"=": ["Z", "Y"]}]}
self.assertEqual(output, expected)
def test_exists_nested3(self):
- input = {'exists': ['X',{'exists':['Y',{'=':['Z','X']}]}]}
+ input = {"exists": ["X", {"exists": ["Y", {"=": ["Z", "X"]}]}]}
output = self.f2p.hoist_outer(input)
- expected = {'exists': ['X', {'=': ['Z', 'X']}]}
+ expected = {"exists": ["X", {"=": ["Z", "X"]}]}
self.assertEqual(output, expected)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_package.py b/lib/xos-genx/xos-genx-tests/test_package.py
index f1cf426..03911bd 100644
--- a/lib/xos-genx/xos-genx-tests/test_package.py
+++ b/lib/xos-genx/xos-genx-tests/test_package.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,18 +18,19 @@
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
from helpers import XProtoTestHelpers
+
class XProtoPackageTest(unittest.TestCase):
def test_package_fqn(self):
args = XOSProcessorArgs()
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{% for m in proto.messages %}
{{ m.name }},{{ m.package }},{{ m.fqn }}
{% endfor %}
-""")
-
- xproto =\
"""
+ )
+
+ xproto = """
package xos.core;
message Port (PlCoreBase,ParameterMixin) {
@@ -48,11 +48,11 @@
output = XOSProcessor.process(args)
- self.assertIn('Port,xos.core,xos.core.Port', output)
+ self.assertIn("Port,xos.core,xos.core.Port", output)
def test_cross_model(self):
- target = XProtoTestHelpers.write_tmp_target( \
-"""
+ target = XProtoTestHelpers.write_tmp_target(
+ """
{% for m in proto.messages %}
{{ m.fqn }} {
{%- for l in m.links %}
@@ -75,10 +75,10 @@
{% endfor %}
}
{% endfor %}
-""")
-
- xproto = \
"""
+ )
+
+ xproto = """
package xos.network;
message Port (PlCoreBase,ParameterMixin){
@@ -164,13 +164,17 @@
args.target = target
output = XOSProcessor.process(args)
- self.assertIn('numberCores', output) # Instance showed up via cross-package call
- self.assertIn('ip;', output) # Network showed up via cross-package call
- self.assertIn('max_instances', output) # Slice showed up via implicit in-package call
+ self.assertIn(
+ "numberCores", output
+ ) # Instance showed up via cross-package call
+ self.assertIn("ip;", output) # Network showed up via cross-package call
+ self.assertIn(
+ "max_instances", output
+ ) # Slice showed up via implicit in-package call
def test_base_class_fields(self):
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{% for m in proto.messages %}
{{ m.name }} {
{%- for b in m.bases %}
@@ -183,10 +187,10 @@
{% endfor %}
}
{% endfor %}
-""")
-
- xproto =\
"""
+ )
+
+ xproto = """
package xos.network;
message Port (PlCoreBase,ParameterMixin){
@@ -225,18 +229,18 @@
args.target = target
output = XOSProcessor.process(args)
- self.assertIn('xos_created', output)
+ self.assertIn("xos_created", output)
def test_from_base(self):
- target = XProtoTestHelpers.write_tmp_target( \
-"""
+ target = XProtoTestHelpers.write_tmp_target(
+ """
{% for f in xproto_base_fields(proto.messages.3, proto.message_table) %}
{{ f.type }} {{ f.name }};
{% endfor %}
-""")
-
- xproto =\
"""
+ )
+
+ xproto = """
option app_name = "firstapp";
message Port (PlCoreBase,ParameterMixin){
@@ -326,21 +330,21 @@
args.target = target
output = XOSProcessor.process(args)
- self.assertIn('easter_egg', output)
+ self.assertIn("easter_egg", output)
def test_model_options(self):
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
Options:
{{ proto.options }}
{% for m in proto.messages %}
{{ m.options.app_name }}
{% endfor %}
-""")
-
- xproto =\
"""
+ )
+
+ xproto = """
option app_name = "firstapp";
message Port (PlCoreBase,ParameterMixin){
@@ -426,17 +430,15 @@
required manytomany tags->Tag = 18 [db_index = False, null = False, blank = True];
}
"""
-
+
args = XOSProcessorArgs()
args.inputs = xproto
args.target = target
output = XOSProcessor.process(args)
- self.assertEqual(output.count('firstapp'), 2)
- self.assertEqual(output.count('networkapp'), 2)
+ self.assertEqual(output.count("firstapp"), 2)
+ self.assertEqual(output.count("networkapp"), 2)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_parse.py b/lib/xos-genx/xos-genx-tests/test_parse.py
index d7edcb7..8d1ccf5 100644
--- a/lib/xos-genx/xos-genx-tests/test_parse.py
+++ b/lib/xos-genx/xos-genx-tests/test_parse.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,13 +17,13 @@
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
from helpers import XProtoTestHelpers
+
class XProtoParseTests(unittest.TestCase):
def test_global_options(self):
xtarget = XProtoTestHelpers.write_tmp_target("{{ options }}")
- xproto = \
-"""
+ xproto = """
option kind = "vsg";
option verbose_name = "vSG Service";
"""
@@ -38,8 +37,7 @@
def test_basic_proto(self):
xtarget = XProtoTestHelpers.write_tmp_target("{{ proto }}")
- xproto = \
-"""
+ xproto = """
message Person {
required string name = 1;
required int32 id = 2; // Unique ID number for this person.
@@ -66,8 +64,7 @@
def test_link_extensions(self):
xtarget = XProtoTestHelpers.write_tmp_target("{{ proto.messages.0.links }}")
- xproto = \
-"""
+ xproto = """
message links {
required manytoone vrouter_service->VRouterService:device_ports = 4 [db_index = True, null = False, blank = False];
}
@@ -79,9 +76,10 @@
self.assertIn("VRouterService", output)
def test_through_extensions(self):
- xtarget = XProtoTestHelpers.write_tmp_target("{{ proto.messages.0.links.0.through }}")
- xproto = \
-"""
+ xtarget = XProtoTestHelpers.write_tmp_target(
+ "{{ proto.messages.0.links.0.through }}"
+ )
+ xproto = """
message links {
required manytomany vrouter_service->VRouterService/ServiceProxy:device_ports = 4 [db_index = True, null = False, blank = False];
}
@@ -93,9 +91,10 @@
self.assertIn("ServiceProxy", output)
def test_message_options(self):
- xtarget = XProtoTestHelpers.write_tmp_target("{{ proto.messages.0.options.type }}")
- xproto = \
-"""
+ xtarget = XProtoTestHelpers.write_tmp_target(
+ "{{ proto.messages.0.options.type }}"
+ )
+ xproto = """
message link {
option type = "e1000";
}
@@ -108,8 +107,7 @@
def test_message_base(self):
xtarget = XProtoTestHelpers.write_tmp_target("{{ proto.messages.0.bases }}")
- xproto = \
-"""
+ xproto = """
message base(Base) {
}
"""
@@ -120,7 +118,6 @@
output = XOSProcessor.process(args)
self.assertIn("Base", output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_policy.py b/lib/xos-genx/xos-genx-tests/test_policy.py
index d83ab6c..e8b5a76 100644
--- a/lib/xos-genx/xos-genx-tests/test_policy.py
+++ b/lib/xos-genx/xos-genx-tests/test_policy.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,10 +23,10 @@
into Python, set up an appropriate environment and execute the Python.
"""
+
class XProtoPolicyTest(unittest.TestCase):
def test_annotation(self):
- xproto = \
-"""
+ xproto = """
policy true_policy < True >
message always::true_policy {
@@ -45,8 +44,7 @@
self.assertIn("true_policy", output)
def test_constant(self):
- xproto = \
-"""
+ xproto = """
policy true_policy < True >
"""
@@ -56,12 +54,11 @@
args.inputs = xproto
args.target = target
- output = XOSProcessor.process(args).replace('t','T')
- self.assertTrue(eval(output))
+ output = XOSProcessor.process(args).replace("t", "T")
+ self.assertTrue(eval(output))
def test_function_term(self):
- xproto = \
-"""
+ xproto = """
policy slice_user < slice.user.compute_is_admin() >
"""
@@ -71,7 +68,7 @@
args.target = target
output = XOSProcessor.process(args)
-
+
slice = FakeObject()
slice.user = FakeObject()
slice.user.compute_is_admin = lambda: True
@@ -80,8 +77,7 @@
self.assertTrue(expr)
def test_term(self):
- xproto = \
-"""
+ xproto = """
policy slice_user < slice.user.is_admin >
"""
@@ -91,7 +87,7 @@
args.target = target
output = XOSProcessor.process(args)
-
+
slice = FakeObject()
slice.user = FakeObject()
slice.user.is_admin = True
@@ -100,8 +96,7 @@
self.assertTrue(expr)
def test_num_constant(self):
- xproto = \
-"""
+ xproto = """
policy slice_user < slice.user.age = 57 >
"""
@@ -111,7 +106,7 @@
args.target = target
output = XOSProcessor.process(args)
-
+
slice = FakeObject()
slice.user = FakeObject()
slice.user.is_admin = True
@@ -120,8 +115,7 @@
self.assertTrue(expr)
def test_string_constant(self):
- xproto = \
-"""
+ xproto = """
policy slice_user < slice.user.email = "admin@opencord.org" >
"""
@@ -131,7 +125,7 @@
args.target = target
output = XOSProcessor.process(args)
-
+
slice = FakeObject()
slice.user = FakeObject()
slice.user.is_admin = True
@@ -140,8 +134,7 @@
self.assertTrue(expr)
def test_equal(self):
- xproto = \
-"""
+ xproto = """
policy slice_user < slice.user = obj.user >
"""
@@ -151,20 +144,19 @@
args.target = target
output = XOSProcessor.process(args)
-
+
slice = FakeObject()
- slice.user = 'twin'
+ slice.user = "twin"
obj = FakeObject()
- obj.user = 'twin'
+ obj.user = "twin"
(op, operands), = eval(output).items()
- expr = op.join(operands).replace('=','==')
+ expr = op.join(operands).replace("=", "==")
self.assertTrue(eval(expr))
def test_bin(self):
- xproto = \
-"""
+ xproto = """
policy slice_admin < slice.is_admin | obj.empty >
"""
target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.slice_admin }}")
@@ -180,13 +172,12 @@
obj.empty = []
(op, operands), = eval(output).items()
- expr = op.join(operands).replace('|',' or ')
+ expr = op.join(operands).replace("|", " or ")
self.assertFalse(eval(expr))
def test_implies(self):
- xproto = \
-"""
+ xproto = """
policy implies < obj.name -> obj.creator >
"""
target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.implies }}")
@@ -199,17 +190,16 @@
slice = FakeObject()
slice.is_admin = False
obj = FakeObject()
- obj.name = 'Thing 1'
+ obj.name = "Thing 1"
obj.creator = None
(op, operands), = eval(output).items()
- expr = 'not ' + op.join(operands).replace('->',' or ')
+ expr = "not " + op.join(operands).replace("->", " or ")
self.assertFalse(eval(expr))
-
+
def test_exists(self):
- xproto = \
-"""
+ xproto = """
policy privilege < exists Privilege: Privilege.object_id = obj.id >
"""
@@ -219,7 +209,7 @@
args.target = target
output = XOSProcessor.process(args)
-
+
Privilege = FakeObject()
Privilege.object_id = 1
obj = FakeObject()
@@ -227,49 +217,49 @@
(op, operands), = eval(output).items()
(op2, operands2), = operands[1].items()
- expr = op2.join(operands2).replace('=','==')
+ expr = op2.join(operands2).replace("=", "==")
self.assertTrue(eval(expr))
def test_policy_function(self):
- xproto = \
-"""
+ xproto = """
policy slice_policy < exists Privilege: Privilege.object_id = obj.id >
policy network_slice_policy < *slice_policy(slice) >
"""
- target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.network_slice_policy }} ")
+ target = XProtoTestHelpers.write_tmp_target(
+ "{{ proto.policies.network_slice_policy }} "
+ )
args = XOSProcessorArgs()
args.inputs = xproto
args.target = target
output = XOSProcessor.process(args)
-
+
(op, operands), = eval(output).items()
- self.assertIn('slice_policy', operands)
- self.assertIn('slice', operands)
+ self.assertIn("slice_policy", operands)
+ self.assertIn("slice", operands)
def test_policy_missing_function(self):
- xproto = \
-"""
+ xproto = """
policy slice_policy < exists Privilege: Privilege.object_id = obj.id >
policy network_slice_policy < *slice_policyX(slice) >
"""
- target = XProtoTestHelpers.write_tmp_target("{{ proto.policies.network_slice_policy }} ")
+ target = XProtoTestHelpers.write_tmp_target(
+ "{{ proto.policies.network_slice_policy }} "
+ )
args = XOSProcessorArgs()
args.inputs = xproto
args.target = target
with self.assertRaises(Exception):
output = XOSProcessor.process(args)
-
def test_forall(self):
# This one we only parse
- xproto = \
-"""
+ xproto = """
policy instance < forall Instance: exists Credential: Credential.obj_id = Instance.obj_id >
"""
@@ -282,10 +272,8 @@
output = XOSProcessor.process(args)
(op, operands), = eval(output).items()
- self.assertEqual(op,'forall')
+ self.assertEqual(op, "forall")
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_pure_proto.py b/lib/xos-genx/xos-genx-tests/test_pure_proto.py
index ade4957..c4f680d 100644
--- a/lib/xos-genx/xos-genx-tests/test_pure_proto.py
+++ b/lib/xos-genx/xos-genx-tests/test_pure_proto.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,16 +13,16 @@
# limitations under the License.
-
import unittest
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
from helpers import XProtoTestHelpers
# Generate from xproto, then generate from equivalent proto
+
+
class XPureProtobufGenerator(unittest.TestCase):
def test_pure_proto(self):
- xproto = \
-"""
+ xproto = """
message VRouterPort (XOSBase){
optional string name = 1 [help_text = "port friendly name", max_length = 20, null = True, db_index = False, blank = True];
required string openflow_id = 2 [help_text = "port identifier in ONOS", max_length = 21, null = False, db_index = False, blank = False];
@@ -32,8 +31,7 @@
}
"""
- proto = \
-"""
+ proto = """
message VRouterPort {
option bases = "XOSBase";
optional string name = 1 [ null = "True", max_length = "20", blank = "True", help_text = "port friendly name", modifier = "optional", db_index = "False" ];
@@ -42,8 +40,8 @@
required int32 vrouter_service = 4 [ null = "False", blank = "False", model = "VRouterService", modifier = "required", type = "link", port = "device_ports", db_index = "True", link = "manytoone"];
}
"""
- target = XProtoTestHelpers.write_tmp_target(
-"""
+ target = XProtoTestHelpers.write_tmp_target(
+ """
from header import *
{% for m in proto.messages %}
{% if file_exists(xproto_base_name(m.name)|lower+'_header.py') -%}from {{xproto_base_name(m.name)|lower }}_header import *{% endif %}
@@ -81,53 +79,52 @@
{% if file_exists(xproto_base_name(m.name)|lower+'_bottom.py') -%}{{ include_file(xproto_base_name(m.name)|lower+'_bottom.py') }}{% endif %}
{% endfor %}
-""")
+"""
+ )
- args_xproto = XOSProcessorArgs()
- args_xproto.inputs = xproto
- args_xproto.target = target
- xproto_gen = XOSProcessor.process(args_xproto)
+ args_xproto = XOSProcessorArgs()
+ args_xproto.inputs = xproto
+ args_xproto.target = target
+ xproto_gen = XOSProcessor.process(args_xproto)
- count1 = len(xproto_gen.split('\n'))
+ count1 = len(xproto_gen.split("\n"))
- args_proto = XOSProcessorArgs()
- args_proto.inputs = proto
- args_proto.target = target
- args_proto.rev = True
- proto_gen = XOSProcessor.process(args_proto)
- count2 = len(proto_gen.split('\n'))
+ args_proto = XOSProcessorArgs()
+ args_proto.inputs = proto
+ args_proto.target = target
+ args_proto.rev = True
+ proto_gen = XOSProcessor.process(args_proto)
+ count2 = len(proto_gen.split("\n"))
- self.assertEqual(count1, count2)
+ self.assertEqual(count1, count2)
def test_pure_policies(self):
- xproto = \
-"""
+ xproto = """
policy my_policy < exists x:a=b >
"""
- proto = \
-"""
+ proto = """
option my_policy = "policy:< exists x:a=b >";
"""
- target = XProtoTestHelpers.write_tmp_target(
-"""
+ target = XProtoTestHelpers.write_tmp_target(
+ """
{{ policies }}
-""")
+"""
+ )
- args_xproto = XOSProcessorArgs()
- args_xproto.inputs = xproto
- args_xproto.target = target
- xproto_gen = XOSProcessor.process(args_xproto)
+ args_xproto = XOSProcessorArgs()
+ args_xproto.inputs = xproto
+ args_xproto.target = target
+ xproto_gen = XOSProcessor.process(args_xproto)
- args_proto = XOSProcessorArgs()
- args_proto.inputs = proto
- args_proto.target = target
- args_proto.rev = True
- proto_gen = XOSProcessor.process(args_proto)
+ args_proto = XOSProcessorArgs()
+ args_proto.inputs = proto
+ args_proto.target = target
+ args_proto.rev = True
+ proto_gen = XOSProcessor.process(args_proto)
- self.assertEqual(proto_gen, xproto_gen)
+ self.assertEqual(proto_gen, xproto_gen)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_rlinks.py b/lib/xos-genx/xos-genx-tests/test_rlinks.py
index 635c81f..c0ad406 100644
--- a/lib/xos-genx/xos-genx-tests/test_rlinks.py
+++ b/lib/xos-genx/xos-genx-tests/test_rlinks.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,18 +17,20 @@
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
from helpers import XProtoTestHelpers
+
class XProtoRlinkTests(unittest.TestCase):
def test_proto_generator(self):
- target = XProtoTestHelpers.write_tmp_target("""
+ target = XProtoTestHelpers.write_tmp_target(
+ """
{% for m in proto.messages %}
{% for r in m.rlinks %}
{{ r }}
{% endfor %}
{% endfor %}
-""")
-
- xproto = \
"""
+ )
+
+ xproto = """
message VRouterPort (PlCoreBase){
optional string name = 1 [help_text = "port friendly name", max_length = 20, null = True, db_index = False, blank = True];
required string openflow_id = 2 [help_text = "port identifier in ONOS", max_length = 21, null = False, db_index = False, blank = False];
@@ -60,7 +61,6 @@
self.assertIn("'src_port': 'device_ports'", output)
self.assertIn("'src_port': 'ports'", output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_swagger.py b/lib/xos-genx/xos-genx-tests/test_swagger.py
index 00b0684..3af997e 100644
--- a/lib/xos-genx/xos-genx-tests/test_swagger.py
+++ b/lib/xos-genx/xos-genx-tests/test_swagger.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -20,11 +19,12 @@
from xosgenx.generator import XOSProcessor, XOSProcessorArgs
from helpers import OUTPUT_DIR
+
class Args:
pass
-class XOSProcessorTest(unittest.TestCase):
+class XOSProcessorTest(unittest.TestCase):
def test_swagger_target(self):
"""
[XOS-GenX] The swagger xtarget should generate the appropriate json
@@ -32,10 +32,9 @@
# xosgenx --output . --target xosgenx/targets/swagger.xtarget --write-to-file single --dest-file swagger.yaml ../../xos/core/models/core.xproto
# http-server --cors Users/teone/Sites/opencord/orchestration/xos/lib/xos-genx/
- xproto = \
- """
+ xproto = """
option app_label = "core";
-
+
message Instance::instance_policy (XOSBase) {
option validators = "instance_creator:Instance has no creator, instance_isolation: Container instance {obj.name} must use container image, instance_isolation_container_vm_parent:Container-vm instance {obj.name} must have a parent, instance_parent_isolation_container_vm:Parent field can only be set on Container-vm instances ({obj.name}), instance_isolation_vm: VM Instance {obj.name} must use VM image, instance_creator_privilege: instance creator has no privileges on slice";
optional string instance_id = 1 [max_length = 200, content_type = "stripped", blank = True, help_text = "Nova instance id", null = True, db_index = False];
@@ -58,7 +57,7 @@
"""
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'swagger.xtarget'
+ args.target = "swagger.xtarget"
args.output = OUTPUT_DIR
args.write_to_file = "single"
args.dest_file = "swagger.yaml"
@@ -68,5 +67,6 @@
self.assertIn("/xosapi/v1/core/instances/{id}:", output)
self.assertIn("Instance:", output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-genx/xos-genx-tests/test_target.py b/lib/xos-genx/xos-genx-tests/test_target.py
index d729be7..c468018 100644
--- a/lib/xos-genx/xos-genx-tests/test_target.py
+++ b/lib/xos-genx/xos-genx-tests/test_target.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,24 +22,25 @@
TEST_OUTPUT = "Do re mi fa so la ti do"
-class XProtoTargetTests(unittest.TestCase):
+class XProtoTargetTests(unittest.TestCase):
def setUp(self):
- test_file = open(os.path.join(OUTPUT_DIR, TEST_FILE), 'w')
+ test_file = open(os.path.join(OUTPUT_DIR, TEST_FILE), "w")
test_file.write(TEST_OUTPUT)
test_file.close()
def test_file_methods(self):
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{%% if file_exists("%s") %%}
{{ include_file("%s") }}
{%% endif %%}
-"""%(TEST_FILE, TEST_FILE)
+"""
+ % (TEST_FILE, TEST_FILE)
)
args = XOSProcessorArgs()
- args.inputs = ''
+ args.inputs = ""
args.target = target
args.attic = OUTPUT_DIR
output = XOSProcessor.process(args)
@@ -48,30 +48,31 @@
def test_xproto_lib(self):
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{{ xproto_first_non_empty([None, None, None, None, None, None, "Eureka"]) }}
-""")
+"""
+ )
args = XOSProcessorArgs()
- args.inputs = ''
+ args.inputs = ""
args.target = target
output = XOSProcessor.process(args)
self.assertIn("Eureka", output)
def test_context(self):
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{{ context.what }}
-""")
+"""
+ )
args = XOSProcessorArgs()
- args.inputs = ''
+ args.inputs = ""
args.target = target
- args.kv='what:what is what'
+ args.kv = "what:what is what"
output = XOSProcessor.process(args)
self.assertIn("what is what", output)
def test_singularize(self):
- proto = \
-"""
+ proto = """
message TestSingularize {
// The following field has an explicitly specified singular
required int many = 1 [singular = "one"];
@@ -84,20 +85,22 @@
"""
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{% for m in proto.messages.0.fields -%}
{{ xproto_singularize(m) }},
{%- endfor %}
-""")
+"""
+ )
args = XOSProcessorArgs()
args.inputs = proto
args.target = target
output = XOSProcessor.process(args)
- self.assertEqual("one,sheep,slice,network,omf_friendly", output.lstrip().rstrip().rstrip(','))
+ self.assertEqual(
+ "one,sheep,slice,network,omf_friendly", output.lstrip().rstrip().rstrip(",")
+ )
def test_pluralize(self):
- proto = \
-"""
+ proto = """
message TestPluralize {
// The following field has an explicitly specified plural
required int anecdote = 1 [plural = "data"];
@@ -110,18 +113,21 @@
"""
target = XProtoTestHelpers.write_tmp_target(
-"""
+ """
{% for m in proto.messages.0.fields -%}
{{ xproto_pluralize(m) }},
{%- endfor %}
-""")
+"""
+ )
args = XOSProcessorArgs()
args.inputs = proto
args.target = target
output = XOSProcessor.process(args)
- self.assertEqual("data,sheep,slices,networks,omf_friendlies", output.lstrip().rstrip().rstrip(','))
+ self.assertEqual(
+ "data,sheep,slices,networks,omf_friendlies",
+ output.lstrip().rstrip().rstrip(","),
+ )
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_tosca.py b/lib/xos-genx/xos-genx-tests/test_tosca.py
index 270d126..c5a0f17 100644
--- a/lib/xos-genx/xos-genx-tests/test_tosca.py
+++ b/lib/xos-genx/xos-genx-tests/test_tosca.py
@@ -18,7 +18,6 @@
class XProtoToscaTypeTest(unittest.TestCase):
-
def setUp(self):
self.target_tosca_type = XProtoTestHelpers.write_tmp_target(
"""
@@ -27,13 +26,14 @@
{{ xproto_tosca_field_type(f.type) }}
{% endfor -%}
{% endfor -%}
- """)
+ """
+ )
+
def test_tosca_fields(self):
"""
[XOS-GenX] should convert xproto types to tosca know types
"""
- xproto = \
- """
+ xproto = """
option app_label = "test";
message Foo {
@@ -47,26 +47,26 @@
args.inputs = xproto
args.target = self.target_tosca_type
output = XOSProcessor.process(args)
- self.assertIn('string', output)
- self.assertIn('boolean', output)
- self.assertIn('integer', output)
+ self.assertIn("string", output)
+ self.assertIn("boolean", output)
+ self.assertIn("integer", output)
+
class XProtoToscaKeyTest(unittest.TestCase):
-
def setUp(self):
self.target_tosca_keys = XProtoTestHelpers.write_tmp_target(
"""
{%- for m in proto.messages %}
{{ xproto_fields_to_tosca_keys(m.fields, m) }}
{% endfor -%}
- """)
+ """
+ )
def test_xproto_fields_to_tosca_keys_default(self):
"""
[XOS-GenX] if no "tosca_key" is specified, and a name attribute is present in the model, use that
"""
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -78,16 +78,15 @@
args.inputs = xproto
args.target = self.target_tosca_keys
output = XOSProcessor.process(args)
- self.assertIn('name', output)
+ self.assertIn("name", output)
def test_xproto_fields_to_tosca_keys_custom(self):
"""
[XOS-GenX] if "tosca_key" is specified, use it
"""
- xproto = \
- """
+ xproto = """
option app_label = "test";
-
+
message Foo {
required string name = 1 [ null = "False", blank="False"];
required string key_1 = 2 [ null = "False", blank="False", tosca_key=True];
@@ -99,16 +98,15 @@
args.inputs = xproto
args.target = self.target_tosca_keys
output = XOSProcessor.process(args)
- self.assertNotIn('name', output)
- self.assertIn('key_1', output)
- self.assertIn('key_2', output)
+ self.assertNotIn("name", output)
+ self.assertIn("key_1", output)
+ self.assertIn("key_2", output)
def test_xproto_fields_link_to_tosca_keys_custom(self):
"""
[XOS-GenX] if "tosca_key" is specified, use it
"""
- xproto = \
- """
+ xproto = """
option app_label = "test";
message Foo {
@@ -121,21 +119,20 @@
args.inputs = xproto
args.target = self.target_tosca_keys
output = XOSProcessor.process(args)
- self.assertNotIn('name', output)
- self.assertIn('provider_service_instance_id', output)
+ self.assertNotIn("name", output)
+ self.assertIn("provider_service_instance_id", output)
def test_xproto_model_to_oneof_key(self):
"""
[XOS-GenX] in some models we need to have a combine key on variable fields, for example, keys can be subscriber_service_id + oneof(provider_service_id, provider_network_id)
"""
- xproto = \
- """
+ xproto = """
option app_label = "test";
message Foo {
-
+
option tosca_key = "key1, oneof(key_2, key_3)";
-
+
required string name = 1 [ null = "False", blank="False"];
required string key_1 = 2 [ null = "False", blank="False", tosca_key_one_of = "key_2"];
required string key_2 = 3 [ null = "False", blank="False", tosca_key_one_of = "key_1"];
@@ -150,8 +147,7 @@
output = XOSProcessor.process(args)
self.assertIn("['name', ['key_4', 'key_3'], ['key_1', 'key_2']]", output)
- xproto = \
- """
+ xproto = """
option app_label = "test";
message Foo {
diff --git a/lib/xos-genx/xos-genx-tests/test_translator.py b/lib/xos-genx/xos-genx-tests/test_translator.py
index f3476cd..320021b 100644
--- a/lib/xos-genx/xos-genx-tests/test_translator.py
+++ b/lib/xos-genx/xos-genx-tests/test_translator.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,20 +27,23 @@
required int32 vrouter_service = 4 [ null = "False", blank = "False", model = "VRouterService", modifier = "required", type = "link", port = "device_ports", link_type = "manytoone", db_index = "True" ];
}
"""
-VROUTER_XPROTO = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/xproto/vrouterport.xproto")
+VROUTER_XPROTO = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/xproto/vrouterport.xproto"
+)
# Generate other formats from xproto
+
+
class XProtoTranslatorTest(unittest.TestCase):
def _test_proto_generator(self):
args = XOSProcessorArgs()
args.files = [VROUTER_XPROTO]
- args.target = 'proto.xtarget'
+ args.target = "proto.xtarget"
output = XOSProcessor.process(args)
self.assertEqual(output, PROTO_EXPECTED_OUTPUT)
def test_yaml_generator(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Port (PlCoreBase,ParameterMixin){
@@ -120,15 +122,14 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
yaml_ir = yaml.load(output)
- self.assertEqual(len(yaml_ir['items']), 4)
+ self.assertEqual(len(yaml_ir["items"]), 4)
def test_gui_hidden_models(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -143,16 +144,15 @@
"""
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
yaml_ir = yaml.load(output)
- self.assertEqual(len(yaml_ir['items']), 1)
- self.assertIn('Bar', output)
- self.assertNotIn('Foo', output)
+ self.assertEqual(len(yaml_ir["items"]), 1)
+ self.assertIn("Bar", output)
+ self.assertNotIn("Foo", output)
def test_gui_hidden_model_fields(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -162,16 +162,15 @@
"""
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
yaml_ir = yaml.load(output)
- self.assertEqual(len(yaml_ir['items']), 1)
- self.assertIn('name', output)
- self.assertNotIn('secret', output)
+ self.assertEqual(len(yaml_ir["items"]), 1)
+ self.assertIn("name", output)
+ self.assertNotIn("secret", output)
def test_static_options(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -182,14 +181,13 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
self.assertIn("options:", output)
self.assertIn(" {'id': 'container_vm', 'label': 'Container In VM'}", output)
def test_not_static_options(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -199,13 +197,12 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
self.assertNotIn("options:", output)
def test_default_value_in_modeldef(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -219,7 +216,7 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
self.assertIn('default: "bar"', output)
self.assertIn('default: "false"', output)
@@ -228,8 +225,7 @@
self.assertIn('default: "0"', output)
def test_not_default_value_in_modeldef(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -239,13 +235,12 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
- self.assertNotIn('default:', output)
+ self.assertNotIn("default:", output)
def test_one_to_many_in_modeldef(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message ServiceDependency {
@@ -260,19 +255,28 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
# Service deps model
- self.assertIn('{model: Service, type: manytoone, on_field: provider_service}', output)
- self.assertIn('{model: Service, type: manytoone, on_field: provider_service}', output)
+ self.assertIn(
+ "{model: Service, type: manytoone, on_field: provider_service}", output
+ )
+ self.assertIn(
+ "{model: Service, type: manytoone, on_field: provider_service}", output
+ )
# Service model
- self.assertIn('{model: ServiceDependency, type: onetomany, on_field: provider_service}', output)
- self.assertIn('{model: ServiceDependency, type: onetomany, on_field: provider_service}', output)
+ self.assertIn(
+ "{model: ServiceDependency, type: onetomany, on_field: provider_service}",
+ output,
+ )
+ self.assertIn(
+ "{model: ServiceDependency, type: onetomany, on_field: provider_service}",
+ output,
+ )
def test_model_description(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -288,13 +292,12 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
self.assertIn('description: "This is the Foo model"', output)
def test_model_verbose_name(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message Foo {
@@ -310,13 +313,12 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
self.assertIn('verbose_name: "Verbose Foo Name"', output)
def test_feedback_field(self):
- xproto = \
-"""
+ xproto = """
option app_label = "test";
message ParentFoo {
@@ -330,13 +332,12 @@
args = XOSProcessorArgs()
args.inputs = xproto
- args.target = 'modeldefs.xtarget'
+ args.target = "modeldefs.xtarget"
output = XOSProcessor.process(args)
- read_only = filter(lambda s: 'read_only: True' in s, output.splitlines())
- self.assertEqual(len(read_only), 3) # readonly is 1 for ParentFoo and 2 for Foo
+ read_only = filter(lambda s: "read_only: True" in s, output.splitlines())
+ self.assertEqual(len(read_only), 3) # readonly is 1 for ParentFoo and 2 for Foo
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
-
-
diff --git a/lib/xos-genx/xos-genx-tests/test_xos_security.py b/lib/xos-genx/xos-genx-tests/test_xos_security.py
index 766e102..3bd4653 100644
--- a/lib/xos-genx/xos-genx-tests/test_xos_security.py
+++ b/lib/xos-genx/xos-genx-tests/test_xos_security.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,25 +20,32 @@
"""The function below is for eliminating warnings arising due to the missing policy_output_enforcer,
which is generated and loaded dynamically.
"""
+
+
def policy_output_enforcer(x, y):
raise Exception("Security enforcer not generated. Test failed.")
return False
+
"""
-The tests below use the Python code target to generate
+The tests below use the Python code target to generate
Python security policies, set up an appropriate environment and execute the Python.
The security policies here deliberately made complex in order to stress the processor.
"""
+
+
class XProtoXOSSecurityTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_test('output',proto.policies.test_policy, None, '0') }}")
+ self.target = XProtoTestHelpers.write_tmp_target(
+ "{{ xproto_fol_to_python_test('output',proto.policies.test_policy, None, '0') }}"
+ )
"""
This is the security policy for controllers
"""
+
def test_controller_policy(self):
- xproto = \
-"""
+ xproto = """
policy test_policy < ctx.user.is_admin | exists Privilege: Privilege.accessor_id = ctx.user.id & Privilege.object_type = "Deployment" & Privilege.permission = "role:admin" & Privilege.object_id = obj.id >
"""
args = XOSProcessorArgs()
@@ -48,7 +54,7 @@
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_enforcer(obj, ctx):
@@ -64,10 +70,10 @@
"""
This is the security policy for ControllerNetworks
"""
+
def test_controller_network_policy(self):
- xproto = \
-"""
- policy test_policy <
+ xproto = """
+ policy test_policy <
ctx.user.is_admin
| (exists Privilege:
Privilege.accessor_id = ctx.user.id
@@ -86,7 +92,7 @@
args.target = self.target
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_enforcer(obj, ctx):
@@ -104,9 +110,9 @@
"""
This is the security policy for Slices
"""
+
def test_slice_policy(self):
- xproto = \
-"""
+ xproto = """
policy site_policy <
ctx.user.is_admin
| (ctx.write_access -> exists Privilege: Privilege.object_type = "Site" & Privilege.object_id = obj.id & Privilege.accessor_id = ctx.user.id & Privilege.permission_id = "role:admin") >
@@ -127,7 +133,7 @@
& Privilege.object_id = obj.site.id
& Privilege.permission = "role:admin"))
)>
-
+
"""
args = XOSProcessorArgs()
args.inputs = xproto
@@ -135,25 +141,25 @@
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_enforcer(obj, ctx):
- i2 = ctx.user.is_admin
- i4 = policy_site_policy_enforcer(obj.site, ctx)
- i10 = ctx.write_access
- i11 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id), Q(permission='role:admin'))))
- i8 = (i10 and i11)
- i14 = ctx.write_access
- i12 = (not i14)
- i13 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id))))
- i9 = (i12 and i13)
- i6 = (i8 or i9)
- i7 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.site.id), Q(permission='role:admin'))))
- i5 = (i6 or i7)
- i3 = (i4 and i5)
- i1 = (i2 or i3)
- return i1
+ i2 = ctx.user.is_admin
+ i4 = policy_site_policy_enforcer(obj.site, ctx)
+ i10 = ctx.write_access
+ i11 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id), Q(permission='role:admin'))))
+ i8 = (i10 and i11)
+ i14 = ctx.write_access
+ i12 = (not i14)
+ i13 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Slice'), Q(object_id=obj.id))))
+ i9 = (i12 and i13)
+ i6 = (i8 or i9)
+ i7 = (not (not Privilege.objects.filter(Q(accessor_id=ctx.user.id), Q(accessor_type='User'), Q(object_type='Site'), Q(object_id=obj.site.id), Q(permission='role:admin'))))
+ i5 = (i6 or i7)
+ i3 = (i4 and i5)
+ i1 = (i2 or i3)
+ return i1
"""
# FIXME: Test this policy by executing it
@@ -162,13 +168,13 @@
"""
This is the security policy for Users
"""
+
def test_user_policy(self):
- xproto = \
-"""
+ xproto = """
policy test_policy <
ctx.user.is_admin
| ctx.user.id = obj.id
- | (exists Privilege:
+ | (exists Privilege:
Privilege.accessor_id = ctx.user.id
& Privilege.accessor_type = "User"
& Privilege.permission = "role:admin"
@@ -181,7 +187,7 @@
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_enforcer(obj, ctx):
@@ -196,5 +202,6 @@
# FIXME: Test this policy by executing it
self.assertTrue(policy_output_enforcer is not None)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-genx/xos-genx-tests/test_xos_validation.py b/lib/xos-genx/xos-genx-tests/test_xos_validation.py
index f2f8ce3..257eb4d 100644
--- a/lib/xos-genx/xos-genx-tests/test_xos_validation.py
+++ b/lib/xos-genx/xos-genx-tests/test_xos_validation.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,21 +20,27 @@
"""The function below is for eliminating warnings arising due to the missing policy_output_validator,
which is generated and loaded dynamically.
"""
+
+
def policy_output_validator(x, y):
raise Exception("Validator not generated. Test failed.")
return False
+
"""
-The tests below use the Python code target to generate
+The tests below use the Python code target to generate
Python validation policies, set up an appropriate environment and execute the Python.
"""
+
+
class XProtoXOSModelValidationTest(unittest.TestCase):
def setUp(self):
- self.target = XProtoTestHelpers.write_tmp_target("{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}")
+ self.target = XProtoTestHelpers.write_tmp_target(
+ "{{ xproto_fol_to_python_validator('output', proto.policies.test_policy, None, 'Necessary Failure') }}"
+ )
def test_instance_container(self):
- xproto = \
-"""
+ xproto = """
policy test_policy < (obj.isolation = "container" | obj.isolation = "container_vm" ) -> (obj.image.kind = "container") >
"""
args = XOSProcessorArgs()
@@ -45,10 +50,10 @@
output = XOSProcessor.process(args)
obj = FakeObject()
- obj.isolation = 'container'
- obj.kind = 'not a container'
+ obj.isolation = "container"
+ obj.kind = "not a container"
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -61,11 +66,10 @@
"""
with self.assertRaises(Exception):
- policy_output_validator(obj, {})
-
+ policy_output_validator(obj, {})
+
def test_slice_name_validation(self):
- xproto = \
-"""
+ xproto = """
policy test_policy < not obj.id -> {{ obj.name.startswith(obj.site.login_base) }} >
"""
args = XOSProcessorArgs()
@@ -75,10 +79,10 @@
output = XOSProcessor.process(args)
obj = FakeObject()
- obj.isolation = 'container'
- obj.kind = 'not a container'
+ obj.isolation = "container"
+ obj.kind = "not a container"
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -91,11 +95,10 @@
"""
with self.assertRaises(Exception):
- policy_output_validator(obj, {})
+ policy_output_validator(obj, {})
def test_equal(self):
- xproto = \
-"""
+ xproto = """
policy test_policy < not (ctx.user = obj.user) >
"""
@@ -105,7 +108,7 @@
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -121,11 +124,10 @@
ctx.user = 1
with self.assertRaises(Exception):
- policy_output_validator(obj, ctx)
+ policy_output_validator(obj, ctx)
def test_bin(self):
- xproto = \
-"""
+ xproto = """
policy test_policy < not (ctx.is_admin = True | obj.empty = True) | False>
"""
@@ -134,7 +136,7 @@
args.target = self.target
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -154,10 +156,8 @@
with self.assertRaises(Exception):
verdict = policy_output_validator(obj, ctx)
-
def test_exists(self):
- xproto = \
-"""
+ xproto = """
policy test_policy < exists Privilege: Privilege.object_id = obj.id >
"""
args = XOSProcessorArgs()
@@ -165,7 +165,7 @@
args.target = self.target
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -175,17 +175,16 @@
"""
self.assertTrue(policy_output_validator is not None)
-
+
def test_python(self):
- xproto = \
-"""
+ xproto = """
policy test_policy < {{ "jack" in ["the", "box"] }} = True >
"""
args = XOSProcessorArgs()
args.inputs = xproto
args.target = self.target
output = XOSProcessor.process(args)
- exec(output) # This loads the generated function, which should look like this:
+ exec(output) # This loads the generated function, which should look like this:
"""
def policy_output_validator(obj, ctx):
@@ -200,8 +199,7 @@
def test_forall(self):
# This one we only parse
- xproto = \
-"""
+ xproto = """
policy test_policy < forall Credential: Credential.obj_id = obj_id >
"""
@@ -218,7 +216,8 @@
return i1
"""
- self.assertIn('policy_output_validator', output)
+ self.assertIn("policy_output_validator", output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-genx/xosgenx/__init__.py b/lib/xos-genx/xosgenx/__init__.py
index d4e8062..b0fb0b2 100644
--- a/lib/xos-genx/xosgenx/__init__.py
+++ b/lib/xos-genx/xosgenx/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,5 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
diff --git a/lib/xos-genx/xosgenx/generator.py b/lib/xos-genx/xosgenx/generator.py
index 3355fb5..3e650be 100644
--- a/lib/xos-genx/xosgenx/generator.py
+++ b/lib/xos-genx/xosgenx/generator.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,6 +13,7 @@
# limitations under the License.
+from __future__ import print_function
import plyxproto.parser as plyxproto
import jinja2
import os
@@ -23,9 +23,10 @@
import yaml
from colorama import Fore
-loader = jinja2.PackageLoader(__name__, 'templates')
+loader = jinja2.PackageLoader(__name__, "templates")
env = jinja2.Environment(loader=loader)
+
class XOSProcessorArgs:
""" Helper class for use cases that want to call XOSProcessor directly, rather than executing xosgenx from the
command line.
@@ -40,9 +41,13 @@
default_dest_extension = None
default_target = None
default_checkers = None
- default_verbosity = 0 # Higher numbers = more verbosity, lower numbers = less verbosity
- default_include_models = [] # If neither include_models nor include_apps is specified, then all models will
- default_include_apps = [] # be included.
+ default_verbosity = (
+ 0
+ ) # Higher numbers = more verbosity, lower numbers = less verbosity
+ default_include_models = (
+ []
+ ) # If neither include_models nor include_apps is specified, then all models will
+ default_include_apps = [] # be included.
def __init__(self, **kwargs):
# set defaults
@@ -60,14 +65,14 @@
self.include_apps = XOSProcessorArgs.default_include_apps
# override defaults with kwargs
- for (k,v) in kwargs.items():
+ for (k, v) in kwargs.items():
setattr(self, k, v)
-class XOSProcessor:
+class XOSProcessor:
@staticmethod
def _read_input_from_files(files):
- input = ''
+ input = ""
for fname in files:
with open(fname) as infile:
input += infile.read()
@@ -75,7 +80,7 @@
@staticmethod
def _attach_parser(ast, args):
- if hasattr(args, 'rev') and args.rev:
+ if hasattr(args, "rev") and args.rev:
v = Proto2XProto()
ast.accept(v)
@@ -86,7 +91,9 @@
@staticmethod
def _get_template(target):
if not os.path.isabs(target):
- return os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/targets/' + target)
+ return os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/targets/" + target
+ )
return target
@staticmethod
@@ -94,10 +101,10 @@
# NOTE this method can be used in the jinja template
def file_exists2(name):
if attic is not None:
- path = attic + '/' + name
+ path = attic + "/" + name
else:
path = name
- return (os.path.exists(path))
+ return os.path.exists(path)
return file_exists2
@@ -106,60 +113,65 @@
# NOTE this method can be used in the jinja template
def include_file2(name):
if attic is not None:
- path = attic + '/' + name
+ path = attic + "/" + name
else:
path = name
return open(path).read()
+
return include_file2
@staticmethod
def _load_jinja2_extensions(os_template_env, attic):
- os_template_env.globals['include_file'] = XOSProcessor._include_file(attic) # Generates a function
- os_template_env.globals['file_exists'] = XOSProcessor._file_exists(attic) # Generates a function
+ os_template_env.globals["include_file"] = XOSProcessor._include_file(
+ attic
+ ) # Generates a function
+ os_template_env.globals["file_exists"] = XOSProcessor._file_exists(
+ attic
+ ) # Generates a function
- os_template_env.filters['yaml'] = yaml.dump
+ os_template_env.filters["yaml"] = yaml.dump
for f in dir(jinja2_extensions):
- if f.startswith('xproto'):
+ if f.startswith("xproto"):
os_template_env.globals[f] = getattr(jinja2_extensions, f)
return os_template_env
@staticmethod
def _add_context(args):
- if not hasattr(args, 'kv') or not args.kv:
+ if not hasattr(args, "kv") or not args.kv:
return
try:
context = {}
- for s in args.kv.split(','):
- k, val = s.split(':')
+ for s in args.kv.split(","):
+ k, val = s.split(":")
context[k] = val
return context
- except Exception, e:
- print e.message
+ except Exception as e:
+ print(e.message)
@staticmethod
def _write_single_file(rendered, dir, dest_file, quiet):
file_name = "%s/%s" % (dir, dest_file)
- file = open(file_name, 'w')
+ file = open(file_name, "w")
file.write(rendered)
file.close()
- if quiet == False:
- print "Saved: %s" % file_name
+ if not quiet:
+ print("Saved: %s" % file_name)
@staticmethod
def _write_file_per_model(rendered, dir, suffix, quiet):
for m in rendered:
file_name = "%s/%s%s" % (dir, m.lower(), suffix)
if not rendered[m]:
- if quiet == False:
- print "Not saving %s as it is empty" % file_name
+ if not quiet:
+ print("Not saving %s as it is empty" % file_name)
else:
- file = open(file_name, 'w')
+ file = open(file_name, "w")
file.write(rendered[m])
file.close()
- if quiet == False:
- print "Saved: %s" % file_name
+ if not quiet:
+ print("Saved: %s" % file_name)
@staticmethod
def _write_split_target(rendered, dir, quiet):
@@ -167,21 +179,21 @@
lines = rendered.splitlines()
current_buffer = []
for l in lines:
- if (l.startswith('+++')):
+ if l.startswith("+++"):
if dir:
- path = dir + '/' + l[4:].lower()
+ path = dir + "/" + l[4:].lower()
- fil = open(path, 'w')
- buf = '\n'.join(current_buffer)
+ fil = open(path, "w")
+ buf = "\n".join(current_buffer)
obuf = buf
fil.write(obuf)
fil.close()
- if quiet == False:
- print "Save file to: %s" % path
+ if not quiet:
+ print("Save file to: %s" % path)
current_buffer = []
else:
@@ -189,50 +201,55 @@
@staticmethod
def _find_message_by_model_name(messages, model):
- return next((x for x in messages if x['name'] == model), None)
+ return next((x for x in messages if x["name"] == model), None)
@staticmethod
def _find_last_nonempty_line(text, pointer):
ne_pointer = pointer
found = False
- while ne_pointer!=0 and not found:
- ne_pointer = text[:(ne_pointer-1)].rfind('\n')
- if ne_pointer<0: ne_pointer = 0
- if text[ne_pointer-1]!='\n':
+ while ne_pointer != 0 and not found:
+ ne_pointer = text[: (ne_pointer - 1)].rfind("\n")
+ if ne_pointer < 0:
+ ne_pointer = 0
+ if text[ne_pointer - 1] != "\n":
found = True
return ne_pointer
@staticmethod
- def process(args, operator = None):
+ def process(args, operator=None):
# Setting defaults
- if not hasattr(args, 'attic'):
+ if not hasattr(args, "attic"):
args.attic = None
- if not hasattr(args, 'write_to_file'):
+ if not hasattr(args, "write_to_file"):
args.write_to_file = None
- if not hasattr(args, 'dest_file'):
+ if not hasattr(args, "dest_file"):
args.dest_file = None
- if not hasattr(args, 'dest_extension'):
+ if not hasattr(args, "dest_extension"):
args.dest_extension = None
- if not hasattr(args, 'output'):
+ if not hasattr(args, "output"):
args.output = None
- if not hasattr(args, 'quiet'):
+ if not hasattr(args, "quiet"):
args.quiet = True
# Validating
- if args.write_to_file == 'single' and args.dest_file is None:
- raise Exception("[XosGenX] write_to_file option is specified as 'single' but no dest_file is provided")
- if args.write_to_file == 'model' and (args.dest_extension is None):
- raise Exception("[XosGenX] write_to_file option is specified as 'model' but no dest_extension is provided")
+ if args.write_to_file == "single" and args.dest_file is None:
+ raise Exception(
+ "[XosGenX] write_to_file option is specified as 'single' but no dest_file is provided"
+ )
+ if args.write_to_file == "model" and (args.dest_extension is None):
+ raise Exception(
+ "[XosGenX] write_to_file option is specified as 'model' but no dest_extension is provided"
+ )
if args.output is not None and not os.path.isabs(args.output):
raise Exception("[XosGenX] The output dir must be an absolute path!")
if args.output is not None and not os.path.isdir(args.output):
raise Exception("[XosGenX] The output dir must be a directory!")
- if hasattr(args, 'files'):
+ if hasattr(args, "files"):
inputs = XOSProcessor._read_input_from_files(args.files)
- elif hasattr(args, 'inputs'):
+ elif hasattr(args, "inputs"):
inputs = args.inputs
else:
raise Exception("[XosGenX] No inputs provided!")
@@ -243,28 +260,29 @@
else:
template_path = operator
-
[template_folder, template_name] = os.path.split(template_path)
os_template_loader = jinja2.FileSystemLoader(searchpath=[template_folder])
os_template_env = jinja2.Environment(loader=os_template_loader)
- os_template_env = XOSProcessor._load_jinja2_extensions(os_template_env, args.attic)
+ os_template_env = XOSProcessor._load_jinja2_extensions(
+ os_template_env, args.attic
+ )
template = os_template_env.get_template(template_name)
context = XOSProcessor._add_context(args)
parser = plyxproto.ProtobufAnalyzer()
try:
ast = parser.parse_string(inputs, debug=0)
- except plyxproto.ParsingError, e:
+ except plyxproto.ParsingError as e:
line, start, end = e.error_range
ptr = XOSProcessor._find_last_nonempty_line(inputs, start)
if start == 0:
- beginning = ''
+ beginning = ""
else:
- beginning = inputs[ptr:start-1]
+ beginning = inputs[ptr: start - 1]
- line_end_char = inputs[start+end:].find('\n')
+ line_end_char = inputs[start + end:].find("\n")
line_end = inputs[line_end_char]
if e.message:
@@ -272,11 +290,16 @@
else:
error = "xproto parsing error"
- print error + "\n" + Fore.YELLOW + "Line %d:"%line + Fore.WHITE
- print beginning + Fore.YELLOW + inputs[start-1:start+end] + Fore.WHITE + line_end
+ print(error + "\n" + Fore.YELLOW + "Line %d:" % line + Fore.WHITE)
+ print(
+ beginning
+ + Fore.YELLOW
+ + inputs[start - 1: start + end]
+ + Fore.WHITE
+ + line_end
+ )
exit(1)
-
v = XOSProcessor._attach_parser(ast, args)
if args.include_models or args.include_apps:
@@ -300,38 +323,42 @@
messages = [XOSProcessor._find_message_by_model_name(v.messages, model)]
rendered[model] = template.render(
- {"proto":
- {
- 'message_table': models,
- 'messages': messages,
- 'policies': v.policies,
- 'message_names': [m['name'] for m in v.messages]
+ {
+ "proto": {
+ "message_table": models,
+ "messages": messages,
+ "policies": v.policies,
+ "message_names": [m["name"] for m in v.messages],
},
"context": context,
- "options": v.options
+ "options": v.options,
}
)
- if (str(v.options.get("legacy", "false")).strip('"').lower() == "true"):
+ if str(v.options.get("legacy", "false")).strip('"').lower() == "true":
suffix = "_decl." + args.dest_extension
else:
suffix = "." + args.dest_extension
- XOSProcessor._write_file_per_model(rendered, args.output, suffix, args.quiet)
+ XOSProcessor._write_file_per_model(
+ rendered, args.output, suffix, args.quiet
+ )
else:
rendered = template.render(
- {"proto":
- {
- 'message_table': v.models,
- 'messages': v.messages,
- 'policies': v.policies,
- 'message_names': [m['name'] for m in v.messages]
+ {
+ "proto": {
+ "message_table": v.models,
+ "messages": v.messages,
+ "policies": v.policies,
+ "message_names": [m["name"] for m in v.messages],
},
"context": context,
- "options": v.options
+ "options": v.options,
}
)
if args.output is not None and args.write_to_file == "target":
XOSProcessor._write_split_target(rendered, args.output, args.quiet)
elif args.output is not None and args.write_to_file == "single":
- XOSProcessor._write_single_file(rendered, args.output, args.dest_file, args.quiet)
+ XOSProcessor._write_single_file(
+ rendered, args.output, args.dest_file, args.quiet
+ )
return rendered
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/__init__.py b/lib/xos-genx/xosgenx/jinja2_extensions/__init__.py
index 859594c..bf7a812 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/__init__.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
from .django import *
from .base import *
from .fol2 import *
from .gui import *
from .tosca import *
from .checklib import *
+
+__all__ = ["django", "base", "fol2", "gui", "tosca", "checklib"]
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/base.py b/lib/xos-genx/xosgenx/jinja2_extensions/base.py
index e11d2ec..96e8dc2 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/base.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/base.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,49 +13,56 @@
# limitations under the License.
+from __future__ import print_function
import pdb
import re
from inflect import engine as inflect_engine_class
inflect_engine = inflect_engine_class()
+
class FieldNotFound(Exception):
def __init__(self, message):
super(FieldNotFound, self).__init__(message)
+
def xproto_debug(**kwargs):
- print kwargs
+ print(kwargs)
pdb.set_trace()
+
def xproto_unquote(s):
return unquote(s)
+
def unquote(s):
- if (s.startswith('"') and s.endswith('"')):
+ if s.startswith('"') and s.endswith('"'):
return s[1:-1]
else:
return s
+
def xproto_singularize(field):
try:
# The user has set a singular, as an exception that cannot be handled automatically
- singular = field['options']['singular']
+ singular = field["options"]["singular"]
singular = unquote(singular)
except KeyError:
- singular = inflect_engine.singular_noun(field['name'])
+ singular = inflect_engine.singular_noun(field["name"])
if singular is False:
# singular_noun returns False on a noun it can't singularize
singular = field["name"]
return singular
+
def xproto_singularize_pluralize(field):
try:
# The user has set a plural, as an exception that cannot be handled automatically
- plural = field['options']['plural']
+ plural = field["options"]["plural"]
plural = unquote(plural)
except KeyError:
- singular = inflect_engine.singular_noun(field['name'])
+ singular = inflect_engine.singular_noun(field["name"])
if singular is False:
# singular_noun returns False on a noun it can't singularize
singular = field["name"]
@@ -65,86 +71,103 @@
return plural
+
def xproto_pluralize(field):
try:
# The user has set a plural, as an exception that cannot be handled automatically
- plural = field['options']['plural']
+ plural = field["options"]["plural"]
plural = unquote(plural)
except KeyError:
- plural = inflect_engine.plural_noun(field['name'])
+ plural = inflect_engine.plural_noun(field["name"])
return plural
-def xproto_base_def(model_name, base, suffix='', suffix_list=[]):
- if (model_name=='XOSBase'):
- return '(models.Model, PlModelMixIn)'
- elif (not base):
- return ''
+
+def xproto_base_def(model_name, base, suffix="", suffix_list=[]):
+ if model_name == "XOSBase":
+ return "(models.Model, PlModelMixIn)"
+ elif not base:
+ return ""
else:
- int_base = [i['name']+suffix for i in base if i['name'] in suffix_list]
- ext_base = [i['name'] for i in base if i['name'] not in suffix_list]
- return '(' + ','.join(int_base + ext_base) + ')'
+ int_base = [i["name"] + suffix for i in base if i["name"] in suffix_list]
+ ext_base = [i["name"] for i in base if i["name"] not in suffix_list]
+ return "(" + ",".join(int_base + ext_base) + ")"
+
def xproto_first_non_empty(lst):
for l in lst:
- if l: return l
+ if l:
+ return l
+
def xproto_api_type(field):
try:
- if (unquote(field['options']['content_type'])=='date'):
- return 'double'
+ if unquote(field["options"]["content_type"]) == "date":
+ return "double"
except KeyError:
pass
- return field['type']
+ return field["type"]
def xproto_base_name(n):
# Hack - Refactor NetworkParameter* to make this go away
- if (n.startswith('NetworkParameter')):
- return '_'
+ if n.startswith("NetworkParameter"):
+ return "_"
- expr = r'^[A-Z]+[a-z]*'
+ expr = r"^[A-Z]+[a-z]*"
try:
match = re.findall(expr, n)[0]
- except:
- return '_'
+ except BaseException:
+ return "_"
return match
+
def xproto_base_fields(m, table):
fields = []
- for b in m['bases']:
- option1 = b['fqn']
+ for b in m["bases"]:
+ option1 = b["fqn"]
try:
- option2 = m['package'] + '.' + b['name']
+ option2 = m["package"] + "." + b["name"]
except TypeError:
option2 = option1
accessor = None
- if option1 in table: accessor = option1
- elif option2 in table: accessor = option2
+ if option1 in table:
+ accessor = option1
+ elif option2 in table:
+ accessor = option2
if accessor:
base_fields = xproto_base_fields(table[accessor], table)
- model_fields = [x.copy() for x in table[accessor]['fields']]
+ model_fields = [x.copy() for x in table[accessor]["fields"]]
for field in model_fields:
field["accessor"] = accessor
fields.extend(base_fields)
fields.extend(model_fields)
- if 'no_sync' in m['options'] and m['options']['no_sync']:
- fields = [f for f in fields if f['name'] != 'backend_status' and f['name'] != 'backend_code']
+ if "no_sync" in m["options"] and m["options"]["no_sync"]:
+ fields = [
+ f
+ for f in fields
+ if f["name"] != "backend_status" and f["name"] != "backend_code"
+ ]
- if 'no_policy' in m['options'] and m['options']['no_policy']:
- fields = [f for f in fields if f['name'] != 'policy_status' and f['name'] != 'policy_code']
+ if "no_policy" in m["options"] and m["options"]["no_policy"]:
+ fields = [
+ f
+ for f in fields
+ if f["name"] != "policy_status" and f["name"] != "policy_code"
+ ]
return fields
+
def xproto_fields(m, table):
""" Generate the full list of models for the xproto message `m` including fields from the classes it inherits.
@@ -165,9 +188,17 @@
# The "id" field is a special field. Every model has one. Put it up front and pretend it's part of the
if not fields:
- raise Exception("Model %s has no fields. Check for missing base class." % m["name"])
+ raise Exception(
+ "Model %s has no fields. Check for missing base class." % m["name"]
+ )
- id_field = {'type': 'int32', 'name': 'id', 'options': {}, "id": "1", "accessor": fields[0]["accessor"]}
+ id_field = {
+ "type": "int32",
+ "name": "id",
+ "options": {},
+ "id": "1",
+ "accessor": fields[0]["accessor"],
+ }
fields = [id_field] + fields
@@ -176,12 +207,15 @@
offset = 0
last_accessor = fields[0]["accessor"]
for field in fields:
- if (field["accessor"] != last_accessor):
+ if field["accessor"] != last_accessor:
last_accessor = field["accessor"]
offset += 100
field_id = int(field["id"])
if (field_id < 1) or (field_id >= 100):
- raise Exception("Only field numbers from 1 to 99 are permitted, field %s in model %s" % (field["name"], field["accessor"]))
+ raise Exception(
+ "Only field numbers from 1 to 99 are permitted, field %s in model %s"
+ % (field["name"], field["accessor"])
+ )
field["id"] = int(field["id"]) + offset
# Check for duplicates
@@ -190,20 +224,24 @@
id = field["id"]
dup = fields_by_number.get(id)
if dup:
- raise Exception("Field %s has duplicate number %d with field %s in model %s" % (field["name"], id, dup["name"], field["accessor"]))
+ raise Exception(
+ "Field %s has duplicate number %d with field %s in model %s"
+ % (field["name"], id, dup["name"], field["accessor"])
+ )
fields_by_number[id] = field
return fields
+
def xproto_base_rlinks(m, table):
links = []
- for base in m['bases']:
- b = base['name']
+ for base in m["bases"]:
+ b = base["name"]
if b in table:
base_rlinks = xproto_base_rlinks(table[b], table)
- model_rlinks = [x.copy() for x in table[b]['rlinks']]
+ model_rlinks = [x.copy() for x in table[b]["rlinks"]]
for link in model_rlinks:
link["accessor"] = b
@@ -212,6 +250,7 @@
return links
+
def xproto_rlinks(m, table):
""" Return the reverse links for the xproto message `m`.
@@ -228,14 +267,16 @@
links = xproto_base_rlinks(m, table) + model_rlinks
- links = [x for x in links if ("+" not in x["src_port"]) and ("+" not in x["dst_port"])]
+ links = [
+ x for x in links if ("+" not in x["src_port"]) and ("+" not in x["dst_port"])
+ ]
if links:
last_accessor = links[0]["accessor"]
offset = 0
index = 1900
for link in links:
- if (link["accessor"] != last_accessor):
+ if link["accessor"] != last_accessor:
last_accessor = link["accessor"]
offset += 100
@@ -249,13 +290,15 @@
index += 1
# check for duplicates
- links_by_number={}
+ links_by_number = {}
for link in links:
id = link["id"]
- dup=links_by_number.get(id)
+ dup = links_by_number.get(id)
if dup:
- raise Exception("Field %s has duplicate number %d in model %s with reverse field %s" %
- (link["src_port"], id, m["name"], dup["src_port"]))
+ raise Exception(
+ "Field %s has duplicate number %d in model %s with reverse field %s"
+ % (link["src_port"], id, m["name"], dup["src_port"])
+ )
links_by_number[id] = link
return links
@@ -264,40 +307,45 @@
def xproto_base_links(m, table):
links = []
- for base in m['bases']:
- b = base['name']
+ for base in m["bases"]:
+ b = base["name"]
if b in table:
base_links = xproto_base_links(table[b], table)
- model_links = table[b]['links']
+ model_links = table[b]["links"]
links.extend(base_links)
links.extend(model_links)
return links
+
def xproto_string_type(xptags):
try:
- max_length = eval(xptags['max_length'])
- except:
+ max_length = eval(xptags["max_length"])
+ except BaseException:
max_length = 1024
- if ('varchar' not in xptags):
- return 'string'
+ if "varchar" not in xptags:
+ return "string"
else:
- return 'text'
+ return "text"
+
def xproto_tuplify(nested_list_or_set):
- if not isinstance(nested_list_or_set, list) and not isinstance(nested_list_or_set, set):
+ if not isinstance(nested_list_or_set, list) and not isinstance(
+ nested_list_or_set, set
+ ):
return nested_list_or_set
else:
return tuple([xproto_tuplify(i) for i in nested_list_or_set])
-def xproto_field_graph_components(fields, model, tag='unique_with'):
+
+def xproto_field_graph_components(fields, model, tag="unique_with"):
def find_components(graph):
pending = set(graph.keys())
components = []
while pending:
- front = { pending.pop() }
+ front = {pending.pop()}
component = set()
while front:
@@ -308,87 +356,96 @@
pending -= neighbours
component |= neighbours
-
+
components.append(component)
return components
field_graph = {}
- field_names = {f['name'] for f in fields}
+ field_names = {f["name"] for f in fields}
for f in fields:
try:
- tagged_str = unquote(f['options'][tag])
- tagged_fields = tagged_str.split(',')
+ tagged_str = unquote(f["options"][tag])
+ tagged_fields = tagged_str.split(",")
for uf in tagged_fields:
if uf not in field_names:
- raise FieldNotFound('Field "%s" not found in model "%s", referenced from field "%s" by option "%s"' % (uf, model['name'], f['name'], tag))
+ raise FieldNotFound(
+ 'Field "%s" not found in model "%s", referenced from field "%s" by option "%s"'
+ % (uf, model["name"], f["name"], tag)
+ )
- field_graph.setdefault(f['name'], set()).add(uf)
- field_graph.setdefault(uf, set()).add(f['name'])
+ field_graph.setdefault(f["name"], set()).add(uf)
+ field_graph.setdefault(uf, set()).add(f["name"])
except KeyError:
pass
return find_components(field_graph)
+
def xproto_api_opts(field):
options = []
- if 'max_length' in field['options'] and field['type']=='string':
- options.append('(val).maxLength = %s'%field['options']['max_length'])
+ if "max_length" in field["options"] and field["type"] == "string":
+ options.append("(val).maxLength = %s" % field["options"]["max_length"])
try:
- if field['options']['null'] == 'False':
- options.append('(val).nonNull = true')
+ if field["options"]["null"] == "False":
+ options.append("(val).nonNull = true")
except KeyError:
pass
- if 'link' in field and 'model' in field['options']:
- options.append('(foreignKey).modelName = "%s"'%field['options']['model'])
+ if "link" in field and "model" in field["options"]:
+ options.append('(foreignKey).modelName = "%s"' % field["options"]["model"])
if ("options" in field) and ("port" in field["options"]):
- options.append('(foreignKey).reverseFieldName = "%s"' % field['options']['port'])
+ options.append(
+ '(foreignKey).reverseFieldName = "%s"' % field["options"]["port"]
+ )
if options:
- options_str = '[' + ', '.join(options) + ']'
+ options_str = "[" + ", ".join(options) + "]"
else:
- options_str = ''
+ options_str = ""
return options_str
+
def xproto_type_to_swagger_type(f):
try:
- content_type = f['options']['content_type']
+ content_type = f["options"]["content_type"]
content_type = eval(content_type)
- except:
+ except BaseException:
content_type = None
pass
- if 'choices' in f['options']:
- return 'string'
- elif content_type == 'date':
- return 'string'
- elif f['type'] == 'bool':
- return 'boolean'
- elif f['type'] == 'string':
- return 'string'
- elif f['type'] in ['int','uint32','int32'] or 'link' in f:
- return 'integer'
- elif f['type'] in ['double','float']:
- return 'string'
+ if "choices" in f["options"]:
+ return "string"
+ elif content_type == "date":
+ return "string"
+ elif f["type"] == "bool":
+ return "boolean"
+ elif f["type"] == "string":
+ return "string"
+ elif f["type"] in ["int", "uint32", "int32"] or "link" in f:
+ return "integer"
+ elif f["type"] in ["double", "float"]:
+ return "string"
+
def xproto_field_to_swagger_enum(f):
- if 'choices' in f['options']:
+ if "choices" in f["options"]:
list = []
- for c in eval(xproto_unquote(f['options']['choices'])):
+ for c in eval(xproto_unquote(f["options"]["choices"])):
list.append(c[0])
return list
else:
return False
+
def xproto_is_true(x):
# TODO: Audit xproto and make specification of trueness more uniform
- if (x==True) or (x=="True") or (x=='"True"'):
+ if x is True or (x == "True") or (x == '"True"'):
return True
return False
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/checklib.py b/lib/xos-genx/xosgenx/jinja2_extensions/checklib.py
index a61f7ca..db61f01 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/checklib.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/checklib.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,62 +14,67 @@
import ast
+
def xproto_check_synchronizer(m):
try:
- sync_step_path = 'synchronizer/steps/sync_%s.py'%m['name'].lower()
+ sync_step_path = "synchronizer/steps/sync_%s.py" % m["name"].lower()
sync_step = open(sync_step_path).read()
except IOError:
- return '510 Model needs a sync step %s'%sync_step_path
+ return "510 Model needs a sync step %s" % sync_step_path
try:
sync_step_ast = ast.parse(sync_step)
except SyntaxError:
- return '511 Could not parse sync step %s'%sync_step_path
+ return "511 Could not parse sync step %s" % sync_step_path
- classes = filter(lambda x:isinstance(x, ast.ClassDef), sync_step_ast.body)
+ classes = filter(lambda x: isinstance(x, ast.ClassDef), sync_step_ast.body)
found_sync_step_class = False
for c in classes:
base_names = [v.id for v in c.bases]
- if 'SyncStep' in base_names or 'SyncInstanceUsingAnsible' in base_names:
- attributes = filter(lambda x:isinstance(x, ast.Assign), c.body)
+ if "SyncStep" in base_names or "SyncInstanceUsingAnsible" in base_names:
+ attributes = filter(lambda x: isinstance(x, ast.Assign), c.body)
for a in attributes:
target_names = [t.id for t in a.targets]
values = a.value.elts if isinstance(a.value, ast.List) else [a.value]
value_names = [v.id for v in values]
- if 'observes' in target_names and m['name'] in value_names:
+ if "observes" in target_names and m["name"] in value_names:
found_sync_step_class = True
break
if not found_sync_step_class:
- return '512 Synchronizer needs a sync step class with an observes field containing %s'%m['name']
+ return (
+ "512 Synchronizer needs a sync step class with an observes field containing %s"
+ % m["name"]
+ )
else:
- return '200 OK'
+ return "200 OK"
def xproto_check_policy(m):
try:
- model_policy_path = 'synchronizer/model_policies/model_policy_%s.py'%m['name'].lower()
+ model_policy_path = (
+ "synchronizer/model_policies/model_policy_%s.py" % m["name"].lower()
+ )
model_policy = open(model_policy_path).read()
except IOError:
- return '510 Model needs a model policy %s'%model_policy_path
+ return "510 Model needs a model policy %s" % model_policy_path
try:
model_policy_ast = ast.parse(model_policy)
except SyntaxError:
- return '511 Could not parse sync step %s'%model_policy_path
+ return "511 Could not parse sync step %s" % model_policy_path
- classes = filter(lambda x:isinstance(x, ast.ClassDef), model_policy_ast.body)
+ classes = filter(lambda x: isinstance(x, ast.ClassDef), model_policy_ast.body)
found_model_policy_class = False
for c in classes:
base_names = [v.id for v in c.bases]
- if 'Policy' in base_names or 'TenantWithContainerPolicy' in base_names:
+ if "Policy" in base_names or "TenantWithContainerPolicy" in base_names:
found_model_policy_class = True
break
if not found_model_policy_class:
- return '513 Synchronizer needs a model policy class'
+ return "513 Synchronizer needs a model policy class"
else:
- return '200 OK'
-
+ return "200 OK"
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/django.py b/lib/xos-genx/xosgenx/jinja2_extensions/django.py
index 64ab51a..d71ea51 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/django.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/django.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,183 +17,217 @@
import pdb
import re
+
def django_content_type_string(xptags):
# Check possibility of KeyError in caller
- content_type = xptags['content_type']
+ content_type = xptags["content_type"]
try:
content_type = eval(content_type)
- except:
+ except BaseException:
pass
- if (content_type=='url'):
- return 'URLField'
- if (content_type=='date'):
- return 'DateTimeField'
- elif (content_type=='ip'):
- return 'GenericIPAddressField'
- elif (content_type=='stripped' or content_type=='"stripped"'):
- return 'StrippedCharField'
+ if content_type == "url":
+ return "URLField"
+ if content_type == "date":
+ return "DateTimeField"
+ elif content_type == "ip":
+ return "GenericIPAddressField"
+ elif content_type == "stripped" or content_type == '"stripped"':
+ return "StrippedCharField"
else:
- raise Exception('Unknown Type: %s'%content_type)
+ raise Exception("Unknown Type: %s" % content_type)
+
def django_string_type(xptags):
try:
- max_length = eval(xptags['max_length'])
- except:
+ max_length = eval(xptags["max_length"])
+ except BaseException:
max_length = 1024 * 1024
- if ('content_type' in xptags):
+ if "content_type" in xptags:
return django_content_type_string(xptags)
- elif (max_length<1024*1024):
- return 'CharField'
+ elif max_length < 1024 * 1024:
+ return "CharField"
else:
- return 'TextField'
+ return "TextField"
+
def xproto_django_type(xptype, xptags):
- if (xptype=='string'):
+ if xptype == "string":
return django_string_type(xptags)
- elif (xptype=='float'):
- return 'FloatField'
- elif (xptype=='bool'):
- return 'BooleanField'
- elif (xptype=='uint32'):
- return 'IntegerField'
- elif (xptype=='int32'):
- return 'IntegerField'
- elif (xptype=='int64'):
- return 'BigIntegerField'
+ elif xptype == "float":
+ return "FloatField"
+ elif xptype == "bool":
+ return "BooleanField"
+ elif xptype == "uint32":
+ return "IntegerField"
+ elif xptype == "int32":
+ return "IntegerField"
+ elif xptype == "int64":
+ return "BigIntegerField"
else:
- raise Exception('Unknown Type: %s'%xptype)
+ raise Exception("Unknown Type: %s" % xptype)
+
def xproto_django_link_type(f):
- if (f['link_type']=='manytoone'):
- return 'ForeignKey'
- elif (f['link_type']=='onetoone'):
- return 'OneToOneField'
- elif (f['link_type']=='manytomany'):
- if (f['dst_port']):
- return 'ManyToManyField'
+ if f["link_type"] == "manytoone":
+ return "ForeignKey"
+ elif f["link_type"] == "onetoone":
+ return "OneToOneField"
+ elif f["link_type"] == "manytomany":
+ if f["dst_port"]:
+ return "ManyToManyField"
else:
- return 'GenericRelation'
+ return "GenericRelation"
+
def map_xproto_to_django(f):
- allowed_keys=['help_text','default','max_length','modifier','blank','choices','db_index','null','editable','on_delete','verbose_name', 'auto_now_add', 'unique', 'min_value', 'max_value']
+ allowed_keys = [
+ "help_text",
+ "default",
+ "max_length",
+ "modifier",
+ "blank",
+ "choices",
+ "db_index",
+ "null",
+ "editable",
+ "on_delete",
+ "verbose_name",
+ "auto_now_add",
+ "unique",
+ "min_value",
+ "max_value",
+ ]
# TODO evaluate if setting Null = False for all strings
- m = {'modifier':{'optional':True, 'required':False, '_targets': ['null', 'blank']}}
+ m = {
+ "modifier": {"optional": True, "required": False, "_targets": ["null", "blank"]}
+ }
out = {}
- for k,v in f['options'].items():
+ for k, v in f["options"].items():
if k in allowed_keys:
try:
- # NOTE this will be used to parse xproto optional/required field prefix and apply it to the null and blank fields
+ # NOTE this will be used to parse xproto optional/required field prefix
+ # and apply it to the null and blank fields
kv2 = m[k]
- for t in kv2['_targets']:
+ for t in kv2["_targets"]:
out[t] = kv2[v]
- except:
+ except BaseException:
out[k] = v
return out
+
def xproto_django_link_options_str(field, dport=None):
output_dict = map_xproto_to_django(field)
- if (dport and (dport=='+' or '+' not in dport)):
- output_dict['related_name'] = '%r'%dport
+ if dport and (dport == "+" or "+" not in dport):
+ output_dict["related_name"] = "%r" % dport
try:
- if field['through']:
+ if field["through"]:
d = {}
- if isinstance(field['through'], str):
- split = field['through'].rsplit('.',1)
- d['name'] = split[-1]
- if len(split)==2:
- d['package'] = split[0]
- d['fqn'] = 'package' + '.' + d['name']
+ if isinstance(field["through"], str):
+ split = field["through"].rsplit(".", 1)
+ d["name"] = split[-1]
+ if len(split) == 2:
+ d["package"] = split[0]
+ d["fqn"] = "package" + "." + d["name"]
else:
- d['fqn'] = d['name']
- d['package'] = ''
+ d["fqn"] = d["name"]
+ d["package"] = ""
else:
- d = field['through']
+ d = field["through"]
- if not d['name'].endswith('_'+field['name']):
- output_dict['through'] = '%r'%d['fqn']
+ if not d["name"].endswith("_" + field["name"]):
+ output_dict["through"] = "%r" % d["fqn"]
except KeyError:
pass
return format_options_string(output_dict)
+
def use_native_django_validators(k, v):
validators_map = {
- 'min_value': 'MinValueValidator',
- 'max_value': 'MaxValueValidator'
+ "min_value": "MinValueValidator",
+ "max_value": "MaxValueValidator",
}
return "%s(%s)" % (validators_map[k], v)
+
def format_options_string(d):
- known_validators = ['min_value', 'max_value']
+ known_validators = ["min_value", "max_value"]
validator_lst = []
- if (not d):
- return ''
+ if not d:
+ return ""
else:
lst = []
- for k,v in d.items():
+ for k, v in d.items():
if k in known_validators:
validator_lst.append(use_native_django_validators(k, v))
- elif (type(v)==str and k=='default' and v.endswith('()"')):
- lst.append('%s = %s'%(k,v[1:-3]))
- elif (type(v)==str and v.startswith('"')):
+ elif isinstance(v, str) and k == "default" and v.endswith('()"'):
+ lst.append("%s = %s" % (k, v[1:-3]))
+ elif isinstance(v, str) and v.startswith('"'):
try:
# unquote the value if necessary
tup = eval(v[1:-1])
- if (type(tup)==tuple):
- lst.append('%s = %r'%(k,tup))
+ if isinstance(tup, tuple):
+ lst.append("%s = %r" % (k, tup))
else:
- lst.append('%s = %s'%(k,v))
- except:
- lst.append('%s = %s'%(k,v))
- elif (type(v)==bool):
- lst.append('%s = %r'%(k,bool(v)))
+ lst.append("%s = %s" % (k, v))
+ except BaseException:
+ lst.append("%s = %s" % (k, v))
+ elif isinstance(v, bool):
+ lst.append("%s = %r" % (k, bool(v)))
else:
try:
- lst.append('%s = %r'%(k,int(v)))
+ lst.append("%s = %r" % (k, int(v)))
except ValueError:
- lst.append('%s = %s'%(k,v))
- validator_string = "validators=[%s]" % ', '.join(validator_lst)
- option_string = ', '.join(lst)
+ lst.append("%s = %s" % (k, v))
+ validator_string = "validators=[%s]" % ", ".join(validator_lst)
+ option_string = ", ".join(lst)
if len(validator_lst) == 0:
return option_string
elif len(lst) == 0:
return validator_string
else:
- return option_string + ", " + validator_string
+ return option_string + ", " + validator_string
+
def xproto_django_options_str(field, dport=None):
output_dict = map_xproto_to_django(field)
- if (dport=='_'):
- dport = '+'
+ if dport == "_":
+ dport = "+"
- if (dport and (dport=='+' or '+' not in dport)):
- output_dict['related_name'] = '%r'%dport
+ if dport and (dport == "+" or "+" not in dport):
+ output_dict["related_name"] = "%r" % dport
return format_options_string(output_dict)
+
def xproto_camel_to_underscore(name):
- return re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
+ return re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
+
def xproto_validations(options):
try:
- return [map(str.strip, validation.split(':')) for validation in unquote(options['validators']).split(',')]
+ return [
+ map(str.strip, validation.split(":"))
+ for validation in unquote(options["validators"]).split(",")
+ ]
except KeyError:
return []
+
def xproto_optioned_fields_to_list(fields, option, val):
"""
List all the field that have a particural option
@@ -207,14 +240,15 @@
optioned_fields = []
for f in fields:
option_names = []
- for k, v in f['options'].items():
+ for k, v in f["options"].items():
option_names.append(k)
- if option in option_names and f['options'][option] == val:
- optioned_fields.append(f['name'])
+ if option in option_names and f["options"][option] == val:
+ optioned_fields.append(f["name"])
return optioned_fields
+
# TODO
# - in modeldefs add info about this fields
# - update the gui to have this fields as readonly
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py b/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
index 73d04af..6d66117 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,6 +13,7 @@
# limitations under the License.
+from __future__ import print_function
import astunparse
import ast
import random
@@ -21,18 +21,22 @@
import jinja2
from plyxproto.parser import *
-BINOPS = ['|', '&', '->']
-QUANTS = ['exists', 'forall']
+BINOPS = ["|", "&", "->"]
+QUANTS = ["exists", "forall"]
+
class PolicyException(Exception):
pass
+
class ConstructNotHandled(Exception):
pass
+
class TrivialPolicy(Exception):
pass
+
class AutoVariable:
def __init__(self, base):
self.base = base
@@ -42,25 +46,29 @@
return self
def next(self):
- var = 'i%d' % self.idx
+ var = "i%d" % self.idx
self.idx += 1
return var
+
def gen_random_string():
- return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(5))
+ return "".join(
+ random.choice(string.ascii_lowercase + string.digits) for _ in range(5)
+ )
+
class FOL2Python:
def __init__(self, context_map=None):
# This will produce i0, i1, i2 etc.
- self.loopvar = iter(AutoVariable('i'))
- self.verdictvar = iter(AutoVariable('result'))
+ self.loopvar = iter(AutoVariable("i"))
+ self.verdictvar = iter(AutoVariable("result"))
self.loop_variable = self.loopvar.next()
self.verdict_variable = self.verdictvar.next()
self.context_map = context_map
if not self.context_map:
- self.context_map = {'user': 'self', 'obj': 'obj'}
+ self.context_map = {"user": "self", "obj": "obj"}
def loop_next(self):
self.loop_variable = self.loopvar.next()
@@ -72,12 +80,12 @@
pass
def format_term_for_query(self, model, term, django=False):
- if term.startswith(model + '.'):
+ if term.startswith(model + "."):
term = term[len(model) + 1:]
if django:
- term = term.replace('.', '__')
+ term = term.replace(".", "__")
else:
- term = '__elt' + '.' + term
+ term = "__elt" + "." + term
return term
def fol_to_python_filter(self, model, e, django=False, negate=False):
@@ -89,109 +97,114 @@
if django:
if negate:
# De Morgan's negation
- q_bracket = '~Q(%s)'
- or_expr = ','
- and_expr = '|'
+ q_bracket = "~Q(%s)"
+ or_expr = ","
+ and_expr = "|"
else:
- q_bracket = 'Q(%s)'
- or_expr = '|'
- and_expr = ','
+ q_bracket = "Q(%s)"
+ or_expr = "|"
+ and_expr = ","
else:
if negate:
# De Morgan's negation
- q_bracket = 'not %s'
- or_expr = ' and '
- and_expr = ' or '
+ q_bracket = "not %s"
+ or_expr = " and "
+ and_expr = " or "
else:
- q_bracket = '%s'
- or_expr = ' or '
- and_expr = ' and '
+ q_bracket = "%s"
+ or_expr = " or "
+ and_expr = " and "
- if k in ['=','in']:
- v = [self.format_term_for_query(
- model, term, django=django) for term in v]
+ if k in ["=", "in"]:
+ v = [self.format_term_for_query(model, term, django=django) for term in v]
if django:
- operator_map = {'=':' = ','in':'__in'}
+ operator_map = {"=": " = ", "in": "__in"}
else:
- operator_map = {'=':' == ','in':'in'}
+ operator_map = {"=": " == ", "in": "in"}
operator = operator_map[k]
return [q_bracket % operator.join(v)]
- elif k == '|':
- components = [self.fol_to_python_filter(
- model, x, django=django).pop() for x in v]
+ elif k == "|":
+ components = [
+ self.fol_to_python_filter(model, x, django=django).pop() for x in v
+ ]
return [or_expr.join(components)]
- elif k == '&':
- components = [self.fol_to_python_filter(
- model, x, django=django).pop() for x in v]
+ elif k == "&":
+ components = [
+ self.fol_to_python_filter(model, x, django=django).pop() for x in v
+ ]
return [and_expr.join(components)]
- elif k == '->':
- components = [self.fol_to_python_filter(
- model, x, django=django).pop() for x in v]
- return ['~%s | %s' % (components[0], components[1])]
+ elif k == "->":
+ components = [
+ self.fol_to_python_filter(model, x, django=django).pop() for x in v
+ ]
+ return ["~%s | %s" % (components[0], components[1])]
""" Convert a single leaf node from a string
to an AST"""
+
def str_to_ast(self, s):
ast_module = ast.parse(s)
return ast_module.body[0]
def reduce_operands(self, operands):
- if operands[0] in ['True','False']:
- return (operands[0],operands[1])
- elif operands[1] in ['True','False']:
- return (operands[1],operands[0])
+ if operands[0] in ["True", "False"]:
+ return (operands[0], operands[1])
+ elif operands[1] in ["True", "False"]:
+ return (operands[1], operands[0])
else:
return None
""" Simplify binops with constants """
+
def simplify_binop(self, binop):
- (k,v), = binop.items()
- if k == '->':
+ (k, v), = binop.items()
+ if k == "->":
lhs, rhs = v
- if lhs == 'True':
+ if lhs == "True":
return rhs
- elif rhs == 'True':
- return 'True'
- elif lhs == 'False':
- return 'True'
- elif rhs == 'False':
- return {'not': lhs}
+ elif rhs == "True":
+ return "True"
+ elif lhs == "False":
+ return "True"
+ elif rhs == "False":
+ return {"not": lhs}
var_expr = self.reduce_operands(v)
- if not var_expr: return binop
+ if not var_expr:
+ return binop
else:
constant, var = var_expr
- if k=='|':
- if constant=='True':
- return 'True'
- elif constant=='False':
+ if k == "|":
+ if constant == "True":
+ return "True"
+ elif constant == "False":
return var
else:
raise Exception("Internal error - variable read as constant")
- elif k=='&':
- if constant=='True':
+ elif k == "&":
+ if constant == "True":
return var
- elif constant=='False':
- return 'False'
+ elif constant == "False":
+ return "False"
def is_constant(self, var, fol):
try:
(k, v), = fol.items()
except AttributeError:
- k = 'term'
+ k = "term"
v = fol
-
- if k in ['python', 'policy']:
- # Treat as a constant and hoist, since it cannot be quantified
- return True
- elif k == 'term':
+
+ if k in ["python", "policy"]:
+ # Treat as a constant and hoist, since it cannot be quantified
+ return True
+ elif k == "term":
return not v.startswith(var)
- elif k == 'not':
+ elif k == "not":
return self.is_constant(var, fol)
- elif k in ['in', '=']:
+ elif k in ["in", "="]:
lhs, rhs = v
- return self.is_constant(var,lhs) and self.is_constant(var, rhs)
+ return self.is_constant(var, lhs) and self.is_constant(var, rhs)
elif k in BINOPS:
lhs, rhs = v
return self.is_constant(lhs, var) and self.is_constant(rhs, var)
@@ -205,21 +218,21 @@
try:
(k, v), = fol.items()
except AttributeError:
- k = 'term'
+ k = "term"
v = fol
- if k in ['python', 'policy']:
- # Treat as a constant and hoist, since it cannot be quantified
- if fol not in constants:
- constants.append(fol)
- return constants
- elif k == 'term':
- if not v.startswith(var):
- constants.append(v)
- return constants
- elif k == 'not':
+ if k in ["python", "policy"]:
+ # Treat as a constant and hoist, since it cannot be quantified
+ if fol not in constants:
+ constants.append(fol)
+ return constants
+ elif k == "term":
+ if not v.startswith(var):
+ constants.append(v)
+ return constants
+ elif k == "not":
return self.find_constants(var, v, constants)
- elif k in ['in', '=']:
+ elif k in ["in", "="]:
lhs, rhs = v
if isinstance(lhs, str) and isinstance(rhs, str):
if not lhs.startswith(var) and not rhs.startswith(var):
@@ -235,32 +248,34 @@
return constants
elif k in QUANTS:
is_constant = self.is_constant(var, v[1])
- if is_constant: constants.append(fol)
+ if is_constant:
+ constants.append(fol)
return constants
else:
raise ConstructNotHandled(k)
""" Hoist constants out of quantifiers. Depth-first. """
+
def hoist_outer(self, fol):
try:
(k, v), = fol.items()
except AttributeError:
- k = 'term'
+ k = "term"
v = fol
- if k in ['python', 'policy']:
- # Tainted, optimization and distribution not possible
- return fol
- elif k == 'term':
- return fol
- elif k == 'not':
+ if k in ["python", "policy"]:
+ # Tainted, optimization and distribution not possible
+ return fol
+ elif k == "term":
+ return fol
+ elif k == "not":
vprime = self.hoist_outer(v)
- return {'not': vprime}
- elif k in ['in', '=']:
+ return {"not": vprime}
+ elif k in ["in", "="]:
lhs, rhs = v
rlhs = self.hoist_outer(lhs)
rrhs = self.hoist_outer(rhs)
- return {k:[rlhs,rrhs]}
+ return {k: [rlhs, rrhs]}
elif k in BINOPS:
lhs, rhs = v
rlhs = self.hoist_outer(lhs)
@@ -271,7 +286,7 @@
return fol_simplified
elif k in QUANTS:
rexpr = self.hoist_outer(v[1])
- return self.hoist_quant(k, [v[0],rexpr])
+ return self.hoist_quant(k, [v[0], rexpr])
else:
raise ConstructNotHandled(k)
@@ -282,27 +297,29 @@
try:
(k, v), = fol.items()
except AttributeError:
- k = 'term'
+ k = "term"
v = fol
- if k == 'term':
- if v == c: return value
- else: return v
- elif k == 'not':
+ if k == "term":
+ if v == c:
+ return value
+ else:
+ return v
+ elif k == "not":
new_expr = self.replace_const(v, c, value)
- if new_expr=='True':
- return 'False'
- elif new_expr=='False':
- return 'True'
- else:
- return {'not': new_expr}
- elif k in ['in', '=']:
+ if new_expr == "True":
+ return "False"
+ elif new_expr == "False":
+ return "True"
+ else:
+ return {"not": new_expr}
+ elif k in ["in", "="]:
lhs, rhs = v
rlhs = self.replace_const(lhs, c, value)
rrhs = self.replace_const(rhs, c, value)
- if rlhs==rrhs:
- return 'True'
+ if rlhs == rrhs:
+ return "True"
else:
return {k: [rlhs, rrhs]}
elif k in BINOPS:
@@ -310,12 +327,12 @@
rlhs = self.replace_const(lhs, c, value)
rrhs = self.replace_const(rhs, c, value)
-
- return self.simplify_binop({k:[rlhs,rrhs]})
+
+ return self.simplify_binop({k: [rlhs, rrhs]})
elif k in QUANTS:
var, expr = v
new_expr = self.replace_const(expr, c, value)
- if new_expr in ['True', 'False']:
+ if new_expr in ["True", "False"]:
return new_expr
else:
return {k: [var, new_expr]}
@@ -323,16 +340,16 @@
raise ConstructNotHandled(k)
def shannon_expand(self, c, fol):
- lhs = self.replace_const(fol, c, 'True')
- rhs = self.replace_const(fol, c, 'False')
- not_c = {'not': c}
- rlhs = {'&': [c, lhs]}
+ lhs = self.replace_const(fol, c, "True")
+ rhs = self.replace_const(fol, c, "False")
+ not_c = {"not": c}
+ rlhs = {"&": [c, lhs]}
rlhs = self.simplify_binop(rlhs)
- rrhs = {'&': [not_c, rhs]}
+ rrhs = {"&": [not_c, rhs]}
rrhs = self.simplify_binop(rrhs)
- combined = {'|': [rlhs, rrhs]}
+ combined = {"|": [rlhs, rrhs]}
return self.simplify_binop(combined)
def hoist_quant(self, k, expr):
@@ -418,17 +435,24 @@
if not tag:
tag = gen_random_string()
- policy_function_name_template = 'policy_%s_' + '%(random_string)s' % {'random_string': tag}
+ policy_function_name_template = "policy_%s_" + "%(random_string)s" % {
+ "random_string": tag
+ }
policy_function_name = policy_function_name_template % policy_name
self.verdict_next()
function_str = """
def %(fn_name)s(obj, ctx):
if not %(vvar)s: raise XOSValidationError("%(message)s".format(obj=obj, ctx=ctx))
- """ % {'fn_name': policy_function_name, 'vvar': self.verdict_variable, 'message': message}
+ """ % {
+ "fn_name": policy_function_name,
+ "vvar": self.verdict_variable,
+ "message": message,
+ }
function_ast = self.str_to_ast(function_str)
- policy_code = self.gen_test(policy_function_name_template, fol, self.verdict_variable)
-
+ policy_code = self.gen_test(
+ policy_function_name_template, fol, self.verdict_variable
+ )
function_ast.body = [policy_code] + function_ast.body
@@ -438,17 +462,24 @@
if not tag:
tag = gen_random_string()
- policy_function_name_template = '%s_' + '%(random_string)s' % {'random_string': tag}
+ policy_function_name_template = "%s_" + "%(random_string)s" % {
+ "random_string": tag
+ }
policy_function_name = policy_function_name_template % policy_name
self.verdict_next()
function_str = """
def %(fn_name)s(obj, ctx):
return %(vvar)s
- """ % {'fn_name': policy_function_name, 'vvar': self.verdict_variable}
+ """ % {
+ "fn_name": policy_function_name,
+ "vvar": self.verdict_variable,
+ }
function_ast = self.str_to_ast(function_str)
- policy_code = self.gen_test(policy_function_name_template, fol, self.verdict_variable)
+ policy_code = self.gen_test(
+ policy_function_name_template, fol, self.verdict_variable
+ )
function_ast.body = [policy_code] + function_ast.body
@@ -456,11 +487,14 @@
def gen_test(self, fn_template, fol, verdict_var, bindings=None):
if isinstance(fol, str):
- return self.str_to_ast('%(verdict_var)s = %(constant)s' % {'verdict_var': verdict_var, 'constant': fol})
+ return self.str_to_ast(
+ "%(verdict_var)s = %(constant)s"
+ % {"verdict_var": verdict_var, "constant": fol}
+ )
(k, v), = fol.items()
- if k == 'policy':
+ if k == "policy":
policy_name, object_name = v
policy_fn = fn_template % policy_name
@@ -470,39 +504,48 @@
else:
# Everybody has access to null objects
%(verdict_var)s = True
- """ % {'verdict_var': verdict_var, 'policy_fn': policy_fn, 'object_name': object_name}
+ """ % {
+ "verdict_var": verdict_var,
+ "policy_fn": policy_fn,
+ "object_name": object_name,
+ }
call_ast = self.str_to_ast(call_str)
return call_ast
- if k == 'python':
+ if k == "python":
try:
expr_ast = self.str_to_ast(v)
except SyntaxError:
- raise PolicyException('Syntax error in %s' % v)
+ raise PolicyException("Syntax error in %s" % v)
if not isinstance(expr_ast, ast.Expr):
- raise PolicyException(
- '%s is not an expression' % expr_ast)
+ raise PolicyException("%s is not an expression" % expr_ast)
assignment_str = """
%(verdict_var)s = (%(escape_expr)s)
- """ % {'verdict_var': verdict_var, 'escape_expr': v}
+ """ % {
+ "verdict_var": verdict_var,
+ "escape_expr": v,
+ }
assignment_ast = self.str_to_ast(assignment_str)
return assignment_ast
- elif k == 'not':
+ elif k == "not":
top_vvar = verdict_var
self.verdict_next()
sub_vvar = self.verdict_variable
block = self.gen_test(fn_template, v, sub_vvar)
assignment_str = """
%(verdict_var)s = not (%(subvar)s)
- """ % {'verdict_var': top_vvar, 'subvar': sub_vvar}
+ """ % {
+ "verdict_var": top_vvar,
+ "subvar": sub_vvar,
+ }
assignment_ast = self.str_to_ast(assignment_str)
return ast.Module(body=[block, assignment_ast])
- elif k in ['=','in']:
+ elif k in ["=", "in"]:
# This is the simplest case, we don't recurse further
# To use terms that are not simple variables, use
# the Python escape, e.g. {{ slice.creator is not None }}
@@ -512,7 +555,7 @@
try:
for t in lhs, rhs:
- py_expr = t['python']
+ py_expr = t["python"]
self.verdict_next()
vv = self.verdict_variable
@@ -520,15 +563,17 @@
try:
expr_ast = self.str_to_ast(py_expr)
except SyntaxError:
- raise PolicyException('Syntax error in %s' % v)
+ raise PolicyException("Syntax error in %s" % v)
if not isinstance(expr_ast, ast.Expr):
- raise PolicyException(
- '%s is not an expression' % expr_ast)
+ raise PolicyException("%s is not an expression" % expr_ast)
assignment_str = """
%(verdict_var)s = (%(escape_expr)s)
- """ % {'verdict_var': vv, 'escape_expr': py_expr}
+ """ % {
+ "verdict_var": vv,
+ "escape_expr": py_expr,
+ }
if t == lhs:
lhs = vv
@@ -540,14 +585,19 @@
except TypeError:
pass
- if k=='=':
- operator='=='
- elif k=='in':
- operator='in'
+ if k == "=":
+ operator = "=="
+ elif k == "in":
+ operator = "in"
comparison_str = """
%(verdict_var)s = (%(lhs)s %(operator)s %(rhs)s)
- """ % {'verdict_var': verdict_var, 'lhs': lhs, 'rhs': rhs, 'operator':operator}
+ """ % {
+ "verdict_var": verdict_var,
+ "lhs": lhs,
+ "rhs": rhs,
+ "operator": operator,
+ }
comparison_ast = self.str_to_ast(comparison_str)
combined_ast = ast.Module(body=assignments + [comparison_ast])
@@ -567,24 +617,30 @@
lblock = self.gen_test(fn_template, lhs, lvar)
rblock = self.gen_test(fn_template, rhs, rvar)
- invert = ''
- if k == '&':
- binop = 'and'
- elif k == '|':
- binop = 'or'
- elif k == '->':
- binop = 'or'
- invert = 'not'
+ invert = ""
+ if k == "&":
+ binop = "and"
+ elif k == "|":
+ binop = "or"
+ elif k == "->":
+ binop = "or"
+ invert = "not"
binop_str = """
%(verdict_var)s = %(invert)s %(lvar)s %(binop)s %(rvar)s
- """ % {'verdict_var': top_vvar, 'invert': invert, 'lvar': lvar, 'binop': binop, 'rvar': rvar}
+ """ % {
+ "verdict_var": top_vvar,
+ "invert": invert,
+ "lvar": lvar,
+ "binop": binop,
+ "rvar": rvar,
+ }
binop_ast = self.str_to_ast(binop_str)
combined_ast = ast.Module(body=[lblock, rblock, binop_ast])
return combined_ast
- elif k == 'exists':
+ elif k == "exists":
# If the variable starts with a capital letter,
# we assume that it is a model. If it starts with
# a small letter, we assume it is an enumerable
@@ -599,7 +655,11 @@
python_str = """
%(verdict_var)s = not not %(model)s.objects.filter(%(query)s)
- """ % {'verdict_var': verdict_var, 'model': var, 'query': entry}
+ """ % {
+ "verdict_var": verdict_var,
+ "model": var,
+ "query": entry,
+ }
python_ast = ast.parse(python_str)
else:
@@ -608,16 +668,20 @@
python_str = """
%(verdict_var)s = filter(lambda __elt:%(query)s, %(model)s)
- """ % {'verdict_var': verdict_var, 'model': var, 'query': entry}
+ """ % {
+ "verdict_var": verdict_var,
+ "model": var,
+ "query": entry,
+ }
python_ast = ast.parse(python_str)
return python_ast
- elif k=='forall':
+ elif k == "forall":
var, expr = v
if var.istitle():
- f = self.fol_to_python_filter(var, expr, django=True, negate = True)
+ f = self.fol_to_python_filter(var, expr, django=True, negate=True)
entry = f.pop()
self.verdict_next()
@@ -625,71 +689,95 @@
python_str = """
%(verdict_var)s = not not %(model)s.objects.filter(%(query)s)
- """ % {'verdict_var': vvar, 'model': var, 'query': entry}
+ """ % {
+ "verdict_var": vvar,
+ "model": var,
+ "query": entry,
+ }
python_ast = ast.parse(python_str)
else:
- f = self.fol_to_python_filter(var, expr, django=False, negate = True)
+ f = self.fol_to_python_filter(var, expr, django=False, negate=True)
entry = f.pop()
python_str = """
%(verdict_var)s = next(elt for elt in %(model)s if %(query)s)
- """ % {'verdict_var': vvar, 'model': var, 'query': entry}
+ """ % {
+ "verdict_var": vvar,
+ "model": var,
+ "query": entry,
+ }
python_ast = ast.parse(python_str)
negate_str = """
%(verdict_var)s = not %(vvar)s
- """ % {'verdict_var': verdict_var, 'vvar': vvar}
+ """ % {
+ "verdict_var": verdict_var,
+ "vvar": vvar,
+ }
negate_ast = ast.parse(negate_str)
return ast.Module(body=[python_ast, negate_ast])
+
def xproto_fol_to_python_test(policy, fol, model, tag=None):
if isinstance(fol, jinja2.Undefined):
- raise Exception('Could not find policy:', policy)
+ raise Exception("Could not find policy:", policy)
f2p = FOL2Python()
fol_reduced = f2p.hoist_outer(fol)
- if fol_reduced in ['True','False'] and fol != fol_reduced:
- raise TrivialPolicy("Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s"%{'name':policy, 'reduced':fol_reduced})
+ if fol_reduced in ["True", "False"] and fol != fol_reduced:
+ raise TrivialPolicy(
+ "Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s" % {
+ "name": policy,
+ "reduced": fol_reduced})
- a = f2p.gen_test_function(fol_reduced, policy, tag='security_check')
+ a = f2p.gen_test_function(fol_reduced, policy, tag="security_check")
return astunparse.unparse(a)
+
def xproto_fol_to_python_validator(policy, fol, model, message, tag=None):
if isinstance(fol, jinja2.Undefined):
- raise Exception('Could not find policy:', policy)
+ raise Exception("Could not find policy:", policy)
f2p = FOL2Python()
fol_reduced = f2p.hoist_outer(fol)
- if fol_reduced in ['True','False'] and fol != fol_reduced:
- raise TrivialPolicy("Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s"%{'name':policy, 'reduced':fol_reduced})
+ if fol_reduced in ["True", "False"] and fol != fol_reduced:
+ raise TrivialPolicy(
+ "Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s" % {
+ "name": policy,
+ "reduced": fol_reduced})
- a = f2p.gen_validation_function(fol_reduced, policy, message, tag='validator')
-
+ a = f2p.gen_validation_function(fol_reduced, policy, message, tag="validator")
+
return astunparse.unparse(a)
+
def main():
while True:
- inp = ''
+ inp = ""
while True:
inp_line = raw_input()
- if inp_line=='EOF': break
- else: inp+=inp_line
-
+ if inp_line == "EOF":
+ break
+ else:
+ inp += inp_line
+
fol_lexer = lex.lex(module=FOLLexer())
- fol_parser = yacc.yacc(module=FOLParser(), start='goal', outputdir='/tmp', debug=0)
+ fol_parser = yacc.yacc(
+ module=FOLParser(), start="goal", outputdir="/tmp", debug=0
+ )
val = fol_parser.parse(inp, lexer=fol_lexer)
- a = xproto_fol_to_python_test('pol', val, 'output', 'Test')
- #f2p = FOL2Python()
- #a = f2p.hoist_outer(val)
- print a
+ a = xproto_fol_to_python_test("pol", val, "output", "Test")
+ # f2p = FOL2Python()
+ # a = f2p.hoist_outer(val)
+ print(a)
if __name__ == "__main__":
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/gui.py b/lib/xos-genx/xosgenx/jinja2_extensions/gui.py
index 50bcf0e..245bbda 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/gui.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/gui.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,74 +15,78 @@
from base import xproto_string_type, unquote
+
def xproto_type_to_ui_type(f):
try:
- content_type = f['options']['content_type']
+ content_type = f["options"]["content_type"]
content_type = eval(content_type)
- except:
+ except BaseException:
content_type = None
pass
- if 'choices' in f['options']:
- return 'select';
- elif content_type == 'date':
- return 'date'
- elif f['type'] == 'bool':
- return 'boolean'
- elif f['type'] == 'string':
- return xproto_string_type(f['options'])
- elif f['type'] in ['int','uint32','int32'] or 'link' in f:
- return 'number'
- elif f['type'] in ['double','float']:
- return 'string'
+ if "choices" in f["options"]:
+ return "select"
+ elif content_type == "date":
+ return "date"
+ elif f["type"] == "bool":
+ return "boolean"
+ elif f["type"] == "string":
+ return xproto_string_type(f["options"])
+ elif f["type"] in ["int", "uint32", "int32"] or "link" in f:
+ return "number"
+ elif f["type"] in ["double", "float"]:
+ return "string"
+
def xproto_options_choices_to_dict(choices):
list = []
for c in eval(choices):
- list.append({'id': c[0], 'label': c[1]})
+ list.append({"id": c[0], "label": c[1]})
if len(list) > 0:
return list
else:
return None
+
def xproto_validators(f):
# To be cleaned up when we formalize validation in xproto
validators = []
# bound-based validators
- bound_validators = [('max_length','maxlength'), ('min', 'min'), ('max', 'max')]
+ bound_validators = [("max_length", "maxlength"), ("min", "min"), ("max", "max")]
for v0, v1 in bound_validators:
try:
- validators.append({'name':v1, 'int_value':int(f['options'][v0])})
+ validators.append({"name": v1, "int_value": int(f["options"][v0])})
except KeyError:
pass
# validators based on content_type
- content_type_validators = ['ip', 'url', 'email']
+ content_type_validators = ["ip", "url", "email"]
for v in content_type_validators:
- #if f['name']=='ip': pdb.set_trace()
+ # if f['name']=='ip': pdb.set_trace()
try:
- val = unquote(f['options']['content_type'])==v
+ val = unquote(f["options"]["content_type"]) == v
if not val:
raise KeyError
- validators.append({'name':v, 'bool_value': True})
+ validators.append({"name": v, "bool_value": True})
except KeyError:
pass
# required validator
try:
- required = f['options']['blank']=='False' and f['options']['null']=='False'
+ required = f["options"]["blank"] == "False" and f["options"]["null"] == "False"
if required:
- validators.append({'name':'required', 'bool_value':required})
+ validators.append({"name": "required", "bool_value": required})
except KeyError:
pass
return validators
+
def is_number(s):
try:
float(s)
@@ -91,16 +94,17 @@
except ValueError:
return False
+
def xproto_default_to_gui(default):
val = "null"
if is_number(default):
val = str(default)
- elif eval(default) == True:
- val = 'true'
- elif eval(default) == False:
- val = 'false'
- elif eval(default) == None:
- val = 'null'
+ elif eval(default) is True:
+ val = "true"
+ elif eval(default) is False:
+ val = "false"
+ elif eval(default) is None:
+ val = "null"
else:
val = str(default)
return val
@@ -111,17 +115,20 @@
seen = []
for l in llst:
try:
- t = l['link_type']
- except KeyError, e:
+ t = l["link_type"]
+ except KeyError as e:
raise e
- if l['peer']['fqn'] not in seen and t != 'manytomany':
- on_field = 'null'
- if l['link_type'] == 'manytoone':
- on_field = l['src_port']
- elif l['link_type'] == 'onetomany':
- on_field = l['dst_port']
- outlist.append('- {model: %s, type: %s, on_field: %s}\n' % (l['peer']['name'], l['link_type'], on_field))
- seen.append(l['peer'])
+ if l["peer"]["fqn"] not in seen and t != "manytomany":
+ on_field = "null"
+ if l["link_type"] == "manytoone":
+ on_field = l["src_port"]
+ elif l["link_type"] == "onetomany":
+ on_field = l["dst_port"]
+ outlist.append(
+ "- {model: %s, type: %s, on_field: %s}\n"
+ % (l["peer"]["name"], l["link_type"], on_field)
+ )
+ seen.append(l["peer"])
return outlist
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/tosca.py b/lib/xos-genx/xosgenx/jinja2_extensions/tosca.py
index 996d63d..d8eada7 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/tosca.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/tosca.py
@@ -14,12 +14,14 @@
from xosgenx.jinja2_extensions import xproto_field_graph_components
+
def xproto_tosca_required(null, blank, default=None):
- if null == 'True' or blank == 'True' or default != 'False':
+ if null == "True" or blank == "True" or default != "False":
return "false"
return "true"
+
def xproto_tosca_field_type(type):
"""
TOSCA requires fields of type 'bool' to be 'boolean'
@@ -33,30 +35,41 @@
else:
return type
+
def xproto_fields_to_tosca_keys(fields, m):
keys = []
# look for one_of keys
- _one_of = xproto_field_graph_components(fields, m, 'tosca_key_one_of')
+ _one_of = xproto_field_graph_components(fields, m, "tosca_key_one_of")
one_of = [list(i) for i in _one_of]
# look for explicit keys
for f in fields:
- if 'tosca_key' in f['options'] and f['options']['tosca_key'] and 'link' not in f:
- keys.append(f['name'])
- if 'tosca_key' in f['options'] and f['options']['tosca_key'] and ('link' in f and f['link']):
- keys.append("%s_id" % f['name'])
+ if (
+ "tosca_key" in f["options"]
+ and f["options"]["tosca_key"]
+ and "link" not in f
+ ):
+ keys.append(f["name"])
+ if (
+ "tosca_key" in f["options"]
+ and f["options"]["tosca_key"]
+ and ("link" in f and f["link"])
+ ):
+ keys.append("%s_id" % f["name"])
# if not keys are specified and there is a name field, use that as key.
- if len(keys) == 0 and 'name' in map(lambda f: f['name'], fields):
- keys.append('name')
+ if len(keys) == 0 and "name" in map(lambda f: f["name"], fields):
+ keys.append("name")
for of in one_of:
# check if the field is a link, and in case add _id
for index, f in enumerate(of):
try:
- field = [x for x in fields if x['name'] == f and ('link' in x and x['link'])][0]
+ field = [
+ x for x in fields if x["name"] == f and ("link" in x and x["link"])
+ ][0]
of[index] = "%s_id" % f
- except IndexError, e:
+ except IndexError as e:
# the field is not a link
pass
diff --git a/lib/xos-genx/xosgenx/proto2xproto.py b/lib/xos-genx/xosgenx/proto2xproto.py
index d2bde67..f1dc959 100644
--- a/lib/xos-genx/xosgenx/proto2xproto.py
+++ b/lib/xos-genx/xosgenx/proto2xproto.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,78 +26,90 @@
import ply.lex as lex
import ply.yacc as yacc
+
class Stack(list):
- def push(self,x):
+ def push(self, x):
self.append(x)
+
def str_to_dict(s):
- lst = s.rsplit('.',1)
+ lst = s.rsplit(".", 1)
name = lst[-1]
- if len(lst)==2:
+ if len(lst) == 2:
package = lst[0]
else:
- package = ''
+ package = ""
- return {'name': name, 'package': package, 'fqn': s}
+ return {"name": name, "package": package, "fqn": s}
def replace_link(obj):
+ try:
+ link = obj.link
try:
- link = obj.link
- try:
- through = link['through']
- except KeyError:
- through = None
+ through = link["through"]
+ except KeyError:
+ through = None
- try:
- through_str = through[1:-1]
- except TypeError:
- through_str = None
+ try:
+ through_str = through[1:-1]
+ except TypeError:
+ through_str = None
- if through_str:
- through_dict = str_to_dict(through_str)
- else:
- through_dict = {}
+ if through_str:
+ through_dict = str_to_dict(through_str)
+ else:
+ through_dict = {}
- model_dict = str_to_dict(link['model'][1:-1])
+ model_dict = str_to_dict(link["model"][1:-1])
- ls = m.LinkSpec(obj, m.LinkDefinition(link['link'][1:-1],obj.name,model_dict,link['port'][1:-1],through_dict))
- return ls
- except:
- return obj
+ ls = m.LinkSpec(
+ obj,
+ m.LinkDefinition(
+ link["link"][1:-1],
+ obj.name,
+ model_dict,
+ link["port"][1:-1],
+ through_dict,
+ ),
+ )
+ return ls
+ except BaseException:
+ return obj
+
class Proto2XProto(Visitor):
fol_lexer = lex.lex(module=FOLLexer())
- fol_parser = yacc.yacc(module=FOLParser(), start='goal', debug=0, outputdir='/tmp')
+ fol_parser = yacc.yacc(module=FOLParser(), start="goal", debug=0, outputdir="/tmp")
def __init__(self):
super(Proto2XProto, self).__init__()
self.stack = Stack()
self.count_stack = Stack()
- self.content=""
- self.offset=0
- self.statementsChanged=0
+ self.content = ""
+ self.offset = 0
+ self.statementsChanged = 0
self.message_options = {}
self.options = {}
self.current_message_name = None
- self.xproto_message_options = ['bases']
- self.xproto_field_options = ['model']
+ self.xproto_message_options = ["bases"]
+ self.xproto_field_options = ["model"]
self.verbose = 0
self.first_field = True
self.first_method = True
-
+
def replace_policy(self, obj):
if isinstance(obj, m.OptionStatement):
rhs = obj.value.value.pval
if rhs.startswith('"') and rhs.endswith('"'):
rhs = rhs[1:-1]
- if rhs.startswith('policy:'):
- str = rhs.split(':',1)[1]
- val = self.fol_parser.parse(str, lexer = self.fol_lexer)
+ if rhs.startswith("policy:"):
+ str = rhs.split(":", 1)[1]
+ val = self.fol_parser.parse(str, lexer=self.fol_lexer)
return m.PolicyDefinition(obj.name, val)
@@ -110,9 +121,9 @@
for fd in obj.fieldDirective:
k = fd.pval.name.value.pval
v = fd.pval.value.value.pval
- opts[k]=v
+ opts[k] = v
- if ('model' in opts and 'link' in opts and 'port' in opts):
+ if "model" in opts and "link" in opts and "port" in opts:
obj.link = opts
pass
except KeyError:
@@ -121,41 +132,42 @@
def proto_to_xproto_message(self, obj):
try:
try:
- bases = self.message_options['bases'].split(',')
+ bases = self.message_options["bases"].split(",")
except KeyError:
bases = []
- bases = map(lambda x:str_to_dict(x[1:-1]), bases)
+ bases = map(lambda x: str_to_dict(x[1:-1]), bases)
obj.bases = bases
except KeyError:
raise
def map_field(self, obj, s):
- if 'model' in s:
- link = m.LinkDefinition('onetoone','src','name','dst', obj.linespan, obj.lexspan, obj.p)
+ if "model" in s:
+ link = m.LinkDefinition(
+ "onetoone", "src", "name", "dst", obj.linespan, obj.lexspan, obj.p
+ )
lspec = m.LinkSpec(link, obj)
else:
lspec = obj
return lspec
-
def get_stack(self):
return self.stack
def visit_PackageStatement(self, obj):
- '''Ignore'''
+ """Ignore"""
return True
def visit_ImportStatement(self, obj):
- '''Ignore'''
+ """Ignore"""
return True
def visit_OptionStatement(self, obj):
- if (self.current_message_name):
+ if self.current_message_name:
k = obj.name.value.pval
self.message_options[k] = obj.value.value.pval
- if (k in self.xproto_message_options):
- obj.mark_for_deletion = True
+ if k in self.xproto_message_options:
+ obj.mark_for_deletion = True
else:
self.options[obj.name.value.pval] = obj.value.value.pval
@@ -197,10 +209,10 @@
self.message_options = {}
return True
-
+
def visit_MessageDefinition_post(self, obj):
self.proto_to_xproto_message(obj)
- obj.body = filter(lambda x:not hasattr(x, 'mark_for_deletion'), obj.body)
+ obj.body = filter(lambda x: not hasattr(x, "mark_for_deletion"), obj.body)
obj.body = map(replace_link, obj.body)
self.current_message_name = None
@@ -230,7 +242,7 @@
def visit_Proto(self, obj):
self.count_stack.push(len(obj.body))
return True
-
+
def visit_Proto_post(self, obj):
obj.body = [self.replace_policy(o) for o in obj.body]
diff --git a/lib/xos-genx/xosgenx/version.py b/lib/xos-genx/xosgenx/version.py
index a118c43..79c5702 100644
--- a/lib/xos-genx/xosgenx/version.py
+++ b/lib/xos-genx/xosgenx/version.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/lib/xos-genx/xosgenx/xos2jinja.py b/lib/xos-genx/xosgenx/xos2jinja.py
index aca2468..471be93 100644
--- a/lib/xos-genx/xosgenx/xos2jinja.py
+++ b/lib/xos-genx/xosgenx/xos2jinja.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,6 +13,7 @@
# limitations under the License.
+from __future__ import print_function
import plyxproto.model as m
from plyxproto.helpers import Visitor
import argparse
@@ -25,15 +25,20 @@
import copy
import pdb
+
class MissingPolicyException(Exception):
pass
+
def find_missing_policy_calls(name, policies, policy):
if isinstance(policy, dict):
(k, lst), = policy.items()
- if k=='policy':
+ if k == "policy":
policy_name = lst[0]
- if policy_name not in policies: raise MissingPolicyException("Policy %s invoked missing policy %s"%(name, policy_name))
+ if policy_name not in policies:
+ raise MissingPolicyException(
+ "Policy %s invoked missing policy %s" % (name, policy_name)
+ )
else:
for p in lst:
find_missing_policy_calls(name, policies, p)
@@ -41,26 +46,27 @@
for p in lst:
find_missing_policy_calls(name, policies, p)
+
def dotname_to_fqn(dotname):
b_names = [part.pval for part in dotname]
- package = '.'.join(b_names[:-1])
+ package = ".".join(b_names[:-1])
name = b_names[-1]
if package:
- fqn = package + '.' + name
+ fqn = package + "." + name
else:
fqn = name
- return {'name': name, 'fqn': fqn, 'package': package}
+ return {"name": name, "fqn": fqn, "package": package}
def dotname_to_name(dotname):
b_names = [part.pval for part in dotname]
- return '.'.join(b_names)
+ return ".".join(b_names)
def count_messages(body):
count = 0
for e in body:
- if (type(e) == m.MessageDefinition):
+ if isinstance(e, m.MessageDefinition):
count += 1
return count
@@ -68,7 +74,7 @@
def count_fields(body):
count = 0
for e in body:
- if (type(e) in [m.LinkDefinition, m.FieldDefinition, m.LinkSpec]):
+ if type(e) in [m.LinkDefinition, m.FieldDefinition, m.LinkSpec]:
count += 1
return count
@@ -90,8 +96,9 @@
self.append(x)
-''' XOS2Jinja overrides the underlying visitor pattern to transform the tree
- in addition to traversing it '''
+""" XOS2Jinja overrides the underlying visitor pattern to transform the tree
+ in addition to traversing it """
+
class XOS2Jinja(Visitor):
def __init__(self, args):
@@ -114,7 +121,7 @@
def visit_PolicyDefinition(self, obj):
if self.package:
- pname = '.'.join([self.package, obj.name.value.pval])
+ pname = ".".join([self.package, obj.name.value.pval])
else:
pname = obj.name.value.pval
@@ -126,17 +133,17 @@
def visit_PackageStatement(self, obj):
dotlist = obj.name.value
dotlist2 = [f.pval for f in dotlist]
- dotstr = '.'.join(dotlist2)
+ dotstr = ".".join(dotlist2)
self.package = dotstr
return True
def visit_ImportStatement(self, obj):
- '''Ignore'''
+ """Ignore"""
return True
def visit_OptionStatement(self, obj):
- if not hasattr(obj, 'mark_for_deletion'):
- if (self.current_message_name):
+ if not hasattr(obj, "mark_for_deletion"):
+ if self.current_message_name:
self.message_options[obj.name.value.pval] = obj.value.value.pval
else:
self.options[obj.name.value.pval] = obj.value.value.pval
@@ -159,7 +166,7 @@
except AttributeError:
name = obj.name.value
- if type(obj.value) == list:
+ if isinstance(obj.value, list):
value = dotname_to_name(obj.value)
else:
value = name_to_value(obj)
@@ -168,52 +175,52 @@
return True
def visit_FieldType(self, obj):
- '''Field type, if type is name, then it may need refactoring consistent with refactoring rules according to the table'''
+ """Field type, if type is name, then it may need refactoring consistent with refactoring rules according to the table"""
return True
def visit_LinkDefinition(self, obj):
s = {}
try:
- s['link_type'] = obj.link_type.pval
+ s["link_type"] = obj.link_type.pval
except AttributeError:
- s['link_type'] = obj.link_type
+ s["link_type"] = obj.link_type
- s['src_port'] = obj.src_port.value.pval
- s['name'] = obj.src_port.value.pval
+ s["src_port"] = obj.src_port.value.pval
+ s["name"] = obj.src_port.value.pval
try:
- s['policy'] = obj.policy.pval
+ s["policy"] = obj.policy.pval
except AttributeError:
- s['policy'] = None
+ s["policy"] = None
try:
- s['dst_port'] = obj.dst_port.value.pval
+ s["dst_port"] = obj.dst_port.value.pval
except AttributeError:
- s['dst_port'] = obj.dst_port
+ s["dst_port"] = obj.dst_port
- if type(obj.through) == list:
- s['through'] = dotname_to_fqn(obj.through)
+ if isinstance(obj.through, list):
+ s["through"] = dotname_to_fqn(obj.through)
else:
try:
- s['through'] = obj.through.pval
+ s["through"] = obj.through.pval
except AttributeError:
- s['through'] = obj.through
+ s["through"] = obj.through
- if type(obj.name) == list:
- s['peer'] = dotname_to_fqn(obj.name)
+ if isinstance(obj.name, list):
+ s["peer"] = dotname_to_fqn(obj.name)
else:
try:
- s['peer'] = obj.name.pval
+ s["peer"] = obj.name.pval
except AttributeError:
- s['peer'] = obj.name
+ s["peer"] = obj.name
try:
- s['reverse_id'] = obj.reverse_id.pval
+ s["reverse_id"] = obj.reverse_id.pval
except AttributeError:
- s['reverse_id'] = obj.reverse_id
+ s["reverse_id"] = obj.reverse_id
- s['_type'] = 'link'
- s['options'] = {'modifier': 'optional'}
+ s["_type"] = "link"
+ s["options"] = {"modifier": "optional"}
self.stack.push(s)
return True
@@ -226,21 +233,21 @@
s = {}
if isinstance(obj.ftype, m.Name):
- s['type'] = obj.ftype.value
+ s["type"] = obj.ftype.value
else:
- s['type'] = obj.ftype.name.pval
+ s["type"] = obj.ftype.name.pval
- s['name'] = obj.name.value.pval
+ s["name"] = obj.name.value.pval
try:
- s['policy'] = obj.policy.pval
+ s["policy"] = obj.policy.pval
except AttributeError:
- s['policy'] = None
+ s["policy"] = None
- s['modifier'] = obj.field_modifier.pval
- s['id'] = obj.fieldId.pval
+ s["modifier"] = obj.field_modifier.pval
+ s["id"] = obj.fieldId.pval
- opts = {'modifier': s['modifier']}
+ opts = {"modifier": s["modifier"]}
n = self.count_stack.pop()
for i in range(0, n):
k, v = self.stack.pop()
@@ -253,28 +260,28 @@
opts[k] = v
- s['options'] = opts
+ s["options"] = opts
try:
- last_link = self.stack[-1]['_type']
- if (last_link == 'link'):
- s['link'] = True
- except:
+ last_link = self.stack[-1]["_type"]
+ if last_link == "link":
+ s["link"] = True
+ except BaseException:
pass
- s['_type'] = 'field'
+ s["_type"] = "field"
self.stack.push(s)
return True
def visit_EnumFieldDefinition(self, obj):
if self.verbose > 4:
- print "\tEnumField: name=%s, %s" % (obj.name, obj)
+ print("\tEnumField: name=%s, %s" % (obj.name, obj))
return True
def visit_EnumDefinition(self, obj):
- '''New enum definition, refactor name'''
+ """New enum definition, refactor name"""
if self.verbose > 3:
- print "Enum, [%s] body=%s\n\n" % (obj.name, obj.body)
+ print("Enum, [%s] body=%s\n\n" % (obj.name, obj.body))
return True
@@ -297,28 +304,39 @@
last_field = {}
for i in range(0, stack_num):
f = self.stack.pop()
- if (f['_type'] == 'link'):
- f['options'] = {i: d[i] for d in [f['options'], last_field['options']] for i in d}
- assert (last_field == fields[0])
- fields[0].setdefault('options', {})['link_type'] = f['link_type']
+ if f["_type"] == "link":
+ f["options"] = {
+ i: d[i] for d in [f["options"], last_field["options"]] for i in d
+ }
+ assert last_field == fields[0]
+ fields[0].setdefault("options", {})["link_type"] = f["link_type"]
links.insert(0, f)
else:
fields.insert(0, f)
last_field = f
if self.package:
- model_name = '.'.join([self.package, obj.name.value.pval])
+ model_name = ".".join([self.package, obj.name.value.pval])
else:
model_name = obj.name.value.pval
- model_def = {'name':obj.name.value.pval,'fields':fields,'links':links, 'bases':obj.bases, 'options':self.message_options, 'package':self.package, 'fqn': model_name, 'rlinks': []}
+ model_def = {
+ "name": obj.name.value.pval,
+ "fields": fields,
+ "links": links,
+ "bases": obj.bases,
+ "options": self.message_options,
+ "package": self.package,
+ "fqn": model_name,
+ "rlinks": [],
+ }
try:
- model_def['policy'] = obj.policy.pval
+ model_def["policy"] = obj.policy.pval
except AttributeError:
- model_def['policy'] = None
+ model_def["policy"] = None
self.stack.push(model_def)
-
+
self.models[model_name] = model_def
# Set message options
@@ -382,40 +400,53 @@
rev_links = {}
link_opposite = {
- 'manytomany': 'manytomany',
- 'manytoone': 'onetomany',
- 'onetoone': 'onetoone',
- 'onetomany': 'manytoone'
+ "manytomany": "manytomany",
+ "manytoone": "onetomany",
+ "onetoone": "onetoone",
+ "onetomany": "manytoone",
}
for m in messages:
- for l in m['links']:
+ for l in m["links"]:
rlink = copy.deepcopy(l)
- rlink['_type'] = 'rlink' # An implicit link, not declared in the model
- rlink['src_port'] = l['dst_port']
- rlink['dst_port'] = l['src_port']
- rlink['peer'] = {'name': m['name'], 'package': m['package'], 'fqn': m['fqn']}
- rlink['link_type'] = link_opposite[l['link_type']]
- rlink["reverse_id"] = l['reverse_id']
+ rlink["_type"] = "rlink" # An implicit link, not declared in the model
+ rlink["src_port"] = l["dst_port"]
+ rlink["dst_port"] = l["src_port"]
+ rlink["peer"] = {
+ "name": m["name"],
+ "package": m["package"],
+ "fqn": m["fqn"],
+ }
+ rlink["link_type"] = link_opposite[l["link_type"]]
+ rlink["reverse_id"] = l["reverse_id"]
- if (not l['reverse_id']) and (self.args.verbosity >= 1):
- print >> sys.stderr, "WARNING: Field %s in model %s has no reverse_id" % (l["src_port"], m["name"])
+ if (not l["reverse_id"]) and (self.args.verbosity >= 1):
+ print(
+ "WARNING: Field %s in model %s has no reverse_id"
+ % (l["src_port"], m["name"]),
+ file=sys.stderr,
+ )
- if l["reverse_id"] and ((int(l["reverse_id"]) < 1000) or (int(l["reverse_id"]) >= 1900)):
- raise Exception("reverse id for field %s in model %s should be between 1000 and 1899" % (l["src_port"], m["name"]))
+ if l["reverse_id"] and (
+ (int(l["reverse_id"]) < 1000) or (int(l["reverse_id"]) >= 1900)
+ ):
+ raise Exception(
+ "reverse id for field %s in model %s should be between 1000 and 1899"
+ % (l["src_port"], m["name"])
+ )
try:
try:
- rev_links[l['peer']['fqn']].append(rlink)
+ rev_links[l["peer"]["fqn"]].append(rlink)
except TypeError:
pass
except KeyError:
- rev_links[l['peer']['fqn']] = [rlink]
+ rev_links[l["peer"]["fqn"]] = [rlink]
for m in messages:
try:
- m['rlinks'] = rev_links[m['name']]
- message_dict[m['name']]['rlinks'] = m['rlinks']
+ m["rlinks"] = rev_links[m["name"]]
+ message_dict[m["name"]]["rlinks"] = m["rlinks"]
except KeyError:
pass
diff --git a/lib/xos-genx/xosgenx/xosgen.py b/lib/xos-genx/xosgenx/xosgen.py
index 27a7fb4..1318242 100755
--- a/lib/xos-genx/xosgenx/xosgen.py
+++ b/lib/xos-genx/xosgenx/xosgen.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,49 +15,116 @@
#!/usr/bin/python
+from __future__ import print_function
import argparse
from generator import *
from version import __version__
-parse = argparse.ArgumentParser(description='XOS Generative Toolchain')
-parse.add_argument('--rev', dest='rev', action='store_true',
- default=XOSProcessorArgs.default_rev, help='Convert proto to xproto')
-parse.add_argument('--output', dest='output', action='store',
- default=XOSProcessorArgs.default_output, help='Destination dir')
-parse.add_argument('--attic', dest='attic', action='store',
- default=XOSProcessorArgs.default_attic, help='The location at which static files are stored')
-parse.add_argument('--kvpairs', dest='kv', action='store',
- default=XOSProcessorArgs.default_kvpairs, help='Key value pairs to make available to the target')
-parse.add_argument('--write-to-file', dest='write_to_file', choices = ['single', 'model', 'target'], action='store',
- default=XOSProcessorArgs.default_write_to_file,
- help='Single output file (single) or output file per model (model) or let target decide (target)')
-parse.add_argument('--version', action='version', version=__version__)
-parse.add_argument("-v", "--verbosity", action="count",
- default=XOSProcessorArgs.default_verbosity, help="increase output verbosity")
-parse.add_argument("--include-models", dest="include_models", action="append",
- default=XOSProcessorArgs.default_include_models, help="list of models to include")
-parse.add_argument("--include-apps", dest="include_apps", action="append",
- default=XOSProcessorArgs.default_include_apps, help="list of models to include")
+parse = argparse.ArgumentParser(description="XOS Generative Toolchain")
+parse.add_argument(
+ "--rev",
+ dest="rev",
+ action="store_true",
+ default=XOSProcessorArgs.default_rev,
+ help="Convert proto to xproto",
+)
+parse.add_argument(
+ "--output",
+ dest="output",
+ action="store",
+ default=XOSProcessorArgs.default_output,
+ help="Destination dir",
+)
+parse.add_argument(
+ "--attic",
+ dest="attic",
+ action="store",
+ default=XOSProcessorArgs.default_attic,
+ help="The location at which static files are stored",
+)
+parse.add_argument(
+ "--kvpairs",
+ dest="kv",
+ action="store",
+ default=XOSProcessorArgs.default_kvpairs,
+ help="Key value pairs to make available to the target",
+)
+parse.add_argument(
+ "--write-to-file",
+ dest="write_to_file",
+ choices=["single", "model", "target"],
+ action="store",
+ default=XOSProcessorArgs.default_write_to_file,
+ help="Single output file (single) or output file per model (model) or let target decide (target)",
+)
+parse.add_argument("--version", action="version", version=__version__)
+parse.add_argument(
+ "-v",
+ "--verbosity",
+ action="count",
+ default=XOSProcessorArgs.default_verbosity,
+ help="increase output verbosity",
+)
+parse.add_argument(
+ "--include-models",
+ dest="include_models",
+ action="append",
+ default=XOSProcessorArgs.default_include_models,
+ help="list of models to include",
+)
+parse.add_argument(
+ "--include-apps",
+ dest="include_apps",
+ action="append",
+ default=XOSProcessorArgs.default_include_apps,
+ help="list of models to include",
+)
group = parse.add_mutually_exclusive_group()
-group.add_argument('--dest-file', dest='dest_file', action='store',
- default=XOSProcessorArgs.default_dest_file, help='Output file name (if write-to-file is set to single)')
-group.add_argument('--dest-extension', dest='dest_extension', action='store',
- default=XOSProcessorArgs.default_dest_extension, help='Output file extension (if write-to-file is set to single)')
+group.add_argument(
+ "--dest-file",
+ dest="dest_file",
+ action="store",
+ default=XOSProcessorArgs.default_dest_file,
+ help="Output file name (if write-to-file is set to single)",
+)
+group.add_argument(
+ "--dest-extension",
+ dest="dest_extension",
+ action="store",
+ default=XOSProcessorArgs.default_dest_extension,
+ help="Output file extension (if write-to-file is set to single)",
+)
group = parse.add_mutually_exclusive_group(required=True)
-group.add_argument('--target', dest='target', action='store',
- default=XOSProcessorArgs.default_target, help='Output format, corresponding to <output>.yaml file')
-group.add_argument('--checkers', dest='checkers', action='store',
- default=XOSProcessorArgs.default_checkers, help='Comma-separated list of static checkers')
+group.add_argument(
+ "--target",
+ dest="target",
+ action="store",
+ default=XOSProcessorArgs.default_target,
+ help="Output format, corresponding to <output>.yaml file",
+)
+group.add_argument(
+ "--checkers",
+ dest="checkers",
+ action="store",
+ default=XOSProcessorArgs.default_checkers,
+ help="Comma-separated list of static checkers",
+)
-parse.add_argument('files', metavar='<input file>', nargs='+', action='store', help='xproto files to compile')
+parse.add_argument(
+ "files",
+ metavar="<input file>",
+ nargs="+",
+ action="store",
+ help="xproto files to compile",
+)
CHECK = 1
GEN = 2
-class XosGen:
+class XosGen:
@staticmethod
def init(args=None):
if not args:
@@ -68,77 +134,94 @@
if args.target:
op = GEN
- subdir = '/targets/'
+ subdir = "/targets/"
elif args.checkers:
op = CHECK
- subdir = '/checkers/'
+ subdir = "/checkers/"
else:
parse.error("At least one of --target and --checkers is required")
- operators = args.checkers.split(',') if hasattr(args, 'checkers') and args.checkers else [args.target]
+ operators = (
+ args.checkers.split(",")
+ if hasattr(args, "checkers") and args.checkers
+ else [args.target]
+ )
for i in xrange(len(operators)):
- if not '/' in operators[i]:
+ if "/" not in operators[i]:
# if the target is not a path, it refer to a library included one
- operators[i] = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + subdir + operators[i])
+ operators[i] = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + subdir + operators[i]
+ )
if not os.path.isabs(operators[i]):
- operators[i] = os.path.abspath(os.getcwd() + '/' + operators[i])
+ operators[i] = os.path.abspath(os.getcwd() + "/" + operators[i])
if op == GEN:
# convert output to absolute path
if args.output is not None and not os.path.isabs(args.output):
- args.output = os.path.abspath(os.getcwd() + '/' + args.output)
+ args.output = os.path.abspath(os.getcwd() + "/" + args.output)
operator = operators[0]
-
+
# check if there's a line that starts with +++ in the target
# if so, then the output file names are left to the target to decide
# also, if dest-file or dest-extension are supplied, then an error is generated.
- plusplusplus = reduce(lambda acc, line: True if line.startswith('+++') else acc, open(operator).read().splitlines(), False)
+ plusplusplus = reduce(
+ lambda acc, line: True if line.startswith("+++") else acc,
+ open(operator).read().splitlines(),
+ False,
+ )
- if plusplusplus and args.write_to_file != 'target':
- parse.error('%s chooses the names of the files that it generates, you must set --write-to-file to "target"' % operator)
+ if plusplusplus and args.write_to_file != "target":
+ parse.error(
+ '%s chooses the names of the files that it generates, you must set --write-to-file to "target"'
+ % operator
+ )
- if args.write_to_file != 'single' and (args.dest_file):
- parse.error('--dest-file requires --write-to-file to be set to "single"')
+ if args.write_to_file != "single" and (args.dest_file):
+ parse.error(
+ '--dest-file requires --write-to-file to be set to "single"'
+ )
- if args.write_to_file != 'model' and (args.dest_extension):
- parse.error('--dest-extension requires --write-to-file to be set to "model"')
-
+ if args.write_to_file != "model" and (args.dest_extension):
+ parse.error(
+ '--dest-extension requires --write-to-file to be set to "model"'
+ )
+
else:
if args.write_to_file or args.dest_extension:
- parse.error('Checkers cannot write to files')
+ parse.error("Checkers cannot write to files")
inputs = []
for fname in args.files:
if not os.path.isabs(fname):
- inputs.append(os.path.abspath(os.getcwd() + '/' + fname))
+ inputs.append(os.path.abspath(os.getcwd() + "/" + fname))
else:
inputs.append(fname)
args.files = inputs
- if op==GEN:
+ if op == GEN:
generated = XOSProcessor.process(args, operators[0])
if not args.output and not args.write_to_file:
- print generated
- elif op==CHECK:
+ print(generated)
+ elif op == CHECK:
for o in operators:
verdict_str = XOSProcessor.process(args, o)
- vlst = verdict_str.split('\n')
+ vlst = verdict_str.split("\n")
try:
verdict = next(v for v in vlst if v.strip())
- status_code, status_string = verdict.split(' ', 1)
+ status_code, status_string = verdict.split(" ", 1)
status_code = int(status_code)
- except:
- print "Checker %s returned mangled output" % o
+ except BaseException:
+ print("Checker %s returned mangled output" % o)
exit(1)
if status_code != 200:
- print '%s: %s - %s' % (o, status_code, status_string)
+ print("%s: %s - %s" % (o, status_code, status_string))
exit(1)
else:
- print '%s: OK'%o
+ print("%s: OK" % o)
diff --git a/lib/xos-kafka/setup.py b/lib/xos-kafka/setup.py
index 4486f19..3d05854 100644
--- a/lib/xos-kafka/setup.py
+++ b/lib/xos-kafka/setup.py
@@ -19,27 +19,25 @@
def readme():
- with open('README.rst') as f:
+ with open("README.rst") as f:
return f.read()
setup_with_auto_version(
- name='xoskafka',
+ name="xoskafka",
version=__version__,
- description='Wrapper around kafka for XOS',
+ description="Wrapper around kafka for XOS",
long_description=readme(),
- classifiers=[
- 'License :: OSI Approved :: Apache Software License',
- ],
- author='Zack Williams',
- author_email='zdw@opennetworking.org',
- packages=['xoskafka'],
- license='Apache v2',
+ classifiers=["License :: OSI Approved :: Apache Software License"],
+ author="Zack Williams",
+ author_email="zdw@opennetworking.org",
+ packages=["xoskafka"],
+ license="Apache v2",
install_requires=[
- 'confluent-kafka>=0.11.5',
- 'xosconfig>=2.1.0',
- 'multistructlog>=1.5',
- ],
+ "confluent-kafka>=0.11.5",
+ "xosconfig>=2.1.0",
+ "multistructlog>=1.5",
+ ],
include_package_data=True,
zip_safe=False,
- )
+)
diff --git a/lib/xos-kafka/xoskafka/__init__.py b/lib/xos-kafka/xoskafka/__init__.py
index 69f5a32..293a26e 100644
--- a/lib/xos-kafka/xoskafka/__init__.py
+++ b/lib/xos-kafka/xoskafka/__init__.py
@@ -13,3 +13,5 @@
# limitations under the License.
from .xoskafkaproducer import XOSKafkaProducer
+
+__all__ = ["XOSKafkaProducer"]
diff --git a/lib/xos-kafka/xoskafka/xoskafkaproducer.py b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
index b4134d5..4611547 100644
--- a/lib/xos-kafka/xoskafka/xoskafkaproducer.py
+++ b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
@@ -18,7 +18,8 @@
from xosconfig import Config
from multistructlog import create_logger
-log = create_logger(Config().get('logging'))
+
+log = create_logger(Config().get("logging"))
kafka_producer = None
@@ -34,23 +35,24 @@
global kafka_producer
if kafka_producer:
- raise Exception('XOSKafkaProducer already initialized')
+ raise Exception("XOSKafkaProducer already initialized")
else:
- log.info('Connecting to Kafka with bootstrap servers: %s' %
- Config.get('kafka_bootstrap_servers'))
+ log.info(
+ "Connecting to Kafka with bootstrap servers: %s"
+ % Config.get("kafka_bootstrap_servers")
+ )
try:
producer_config = {
- 'bootstrap.servers':
- ','.join(Config.get('kafka_bootstrap_servers')),
+ "bootstrap.servers": ",".join(Config.get("kafka_bootstrap_servers"))
}
kafka_producer = confluent_kafka.Producer(**producer_config)
- log.info('Connected to Kafka: %s' % kafka_producer)
+ log.info("Connected to Kafka: %s" % kafka_producer)
- except confluent_kafka.KafkaError, e:
+ except confluent_kafka.KafkaError as e:
log.exception("Kafka Error: %s" % e)
@classmethod
@@ -58,25 +60,22 @@
try:
kafka_producer.produce(
- topic,
- value,
- key,
- callback=cls._kafka_delivery_callback
- )
+ topic, value, key, callback=cls._kafka_delivery_callback
+ )
# see https://github.com/confluentinc/confluent-kafka-python/issues/16
kafka_producer.poll(0)
- except confluent_kafka.KafkaError, err:
+ except confluent_kafka.KafkaError as err:
log.exception("Kafka Error", err)
def __del__(self):
- if kafka_producer is not None:
+ if kafka_producer is not None:
kafka_producer.flush()
@staticmethod
def _kafka_delivery_callback(err, msg):
if err:
- log.error('Message failed delivery: %s' % err)
+ log.error("Message failed delivery: %s" % err)
else:
- log.trace('Message delivered', message=msg)
+ log.trace("Message delivered", message=msg)
diff --git a/lib/xos-util/setup.py b/lib/xos-util/setup.py
index b60435d..a3707ef 100644
--- a/lib/xos-util/setup.py
+++ b/lib/xos-util/setup.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python
# Copyright 2017-present Open Networking Foundation
#
@@ -15,18 +15,15 @@
# limitations under the License.
-import os
-
-from setuptools import setup
-
from xosutil.autoversion_setup import setup_with_auto_version
from xosutil.version import __version__
-setup_with_auto_version(name='XosUtil',
- version=__version__,
- description='XOS Utility Library',
- author='Scott Baker',
- author_email='scottb@opennetworking.org',
- packages=['xosutil'],
- include_package_data=True
- )
+setup_with_auto_version(
+ name="XosUtil",
+ version=__version__,
+ description="XOS Utility Library",
+ author="Scott Baker",
+ author_email="scottb@opennetworking.org",
+ packages=["xosutil"],
+ include_package_data=True,
+)
diff --git a/lib/xos-util/tests/test_util.py b/lib/xos-util/tests/test_util.py
index ff1ce58..2b5be07 100644
--- a/lib/xos-util/tests/test_util.py
+++ b/lib/xos-util/tests/test_util.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -20,6 +19,7 @@
test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
class XOSUtilTest(unittest.TestCase):
"""
Testing the XOS Util Module
@@ -40,9 +40,15 @@
test_save_fn = os.path.join(test_path, "test_version.py")
if os.path.exists(test_save_fn):
os.remove(test_save_fn)
- self.assertEqual(version, autodiscover_version.autodiscover_version_of_caller(save_to="test_version.py"))
+ self.assertEqual(
+ version,
+ autodiscover_version.autodiscover_version_of_caller(
+ save_to="test_version.py"
+ ),
+ )
self.assertTrue(os.path.exists(test_save_fn))
self.assertTrue(version in open(test_save_fn).read())
+
if __name__ == "__main__":
unittest.main()
diff --git a/lib/xos-util/xosutil/__init__.py b/lib/xos-util/xosutil/__init__.py
index 42722a8..b0fb0b2 100644
--- a/lib/xos-util/xosutil/__init__.py
+++ b/lib/xos-util/xosutil/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/lib/xos-util/xosutil/autodiscover_version.py b/lib/xos-util/xosutil/autodiscover_version.py
index 2e606e1..8f4ea47 100644
--- a/lib/xos-util/xosutil/autodiscover_version.py
+++ b/lib/xos-util/xosutil/autodiscover_version.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,6 +23,7 @@
import inspect
import os
+
def autodiscover_version(caller_filename=None, save_to=None, max_parent_depth=None):
""" walk back along the path to the current module, searching for a VERSION file """
if not caller_filename:
@@ -42,8 +42,8 @@
return version
# limit_parent_depth can be used to limit how far back we search the tree for a VERSION file.
- if (max_parent_depth is not None):
- if (max_parent_depth <= 0):
+ if max_parent_depth is not None:
+ if max_parent_depth <= 0:
return None
max_parent_depth -= 1
@@ -51,13 +51,16 @@
if not remainder:
return None
+
def autodiscover_version_of_caller(*args, **kwargs):
frame = inspect.stack()[1]
module = inspect.getmodule(frame[0])
return autodiscover_version(module.__file__, *args, **kwargs)
+
def autodiscover_version_of_main(*args, **kwargs):
import __main__
+
if hasattr(__main__, "__file__"):
return autodiscover_version(__main__.__file__, *args, **kwargs)
else:
diff --git a/lib/xos-util/xosutil/autoversion_setup.py b/lib/xos-util/xosutil/autoversion_setup.py
index 5a7ea44..e027d5c 100644
--- a/lib/xos-util/xosutil/autoversion_setup.py
+++ b/lib/xos-util/xosutil/autoversion_setup.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -33,42 +32,47 @@
import inspect
from autodiscover_version import autodiscover_version
+
class SdistCommand(sdist):
def copy_file(self, infile, outfile, *args, **kwargs):
if kwargs.get("dry_run"):
return (outfile, 1)
- if (os.path.split(outfile)[1] == "version.py"):
- open(outfile, "w").write("# do not edit. Autogenerated file.\n" \
- "__version__ = '%s'\n" % self.distribution.metadata.version)
+ if os.path.split(outfile)[1] == "version.py":
+ open(outfile, "w").write(
+ "# do not edit. Autogenerated file.\n"
+ "__version__ = '%s'\n" % self.distribution.metadata.version
+ )
return (outfile, 1)
else:
return sdist.copy_file(self, infile, outfile, *args, **kwargs)
+
class BuildPyCommand(build_py):
def copy_file(self, infile, outfile, *args, **kwargs):
if kwargs.get("dry_run"):
return (outfile, 1)
- if (os.path.split(outfile)[1] == "version.py"):
- open(outfile, "w").write("# do not edit. Autogenerated file.\n" \
- "__version__ = '%s'\n" % self.distribution.metadata.version)
+ if os.path.split(outfile)[1] == "version.py":
+ open(outfile, "w").write(
+ "# do not edit. Autogenerated file.\n"
+ "__version__ = '%s'\n" % self.distribution.metadata.version
+ )
return (outfile, 1)
else:
return build_py.copy_file(self, infile, outfile, *args, **kwargs)
+
def setup_with_auto_version(*args, **kwargs):
# Learn the module that called this function, so we can search for any VERSION files in it.
frame = inspect.stack()[1]
caller_module = inspect.getmodule(frame[0])
# Search for a VERSION file and extract the version number from it.
- version = autodiscover_version(caller_filename = caller_module.__file__)
+ version = autodiscover_version(caller_filename=caller_module.__file__)
if version:
kwargs["version"] = version
cmdclass = kwargs.get("cmdclass", {}).copy()
- cmdclass.update( {"sdist": SdistCommand,
- "build_py": BuildPyCommand} )
+ cmdclass.update({"sdist": SdistCommand, "build_py": BuildPyCommand})
kwargs["cmdclass"] = cmdclass
return setup(*args, **kwargs)
-
diff --git a/lib/xos-util/xosutil/version.py b/lib/xos-util/xosutil/version.py
index 57833b8..8456fc5 100644
--- a/lib/xos-util/xosutil/version.py
+++ b/lib/xos-util/xosutil/version.py
@@ -13,4 +13,4 @@
# limitations under the License.
# This file is autogenerated. Do not edit.
-__version__ = 'unknown'
+__version__ = "unknown"