[SEBA-412] Automated reformat of Python code
Passes of modernize, autopep8, black, then check with flake8
flake8 + manual fixes:
lib/xos-config
lib/xos-kafka
lib/xos-util
xos/coreapi
xos/api
xos/xos_client
Change-Id: Ib23cf84cb13beb3c6381fa0d79594dc9131dc815
diff --git a/lib/xos-genx/xosgenx/__init__.py b/lib/xos-genx/xosgenx/__init__.py
index d4e8062..b0fb0b2 100644
--- a/lib/xos-genx/xosgenx/__init__.py
+++ b/lib/xos-genx/xosgenx/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,5 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
diff --git a/lib/xos-genx/xosgenx/generator.py b/lib/xos-genx/xosgenx/generator.py
index 3355fb5..3e650be 100644
--- a/lib/xos-genx/xosgenx/generator.py
+++ b/lib/xos-genx/xosgenx/generator.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,6 +13,7 @@
# limitations under the License.
+from __future__ import print_function
import plyxproto.parser as plyxproto
import jinja2
import os
@@ -23,9 +23,10 @@
import yaml
from colorama import Fore
-loader = jinja2.PackageLoader(__name__, 'templates')
+loader = jinja2.PackageLoader(__name__, "templates")
env = jinja2.Environment(loader=loader)
+
class XOSProcessorArgs:
""" Helper class for use cases that want to call XOSProcessor directly, rather than executing xosgenx from the
command line.
@@ -40,9 +41,13 @@
default_dest_extension = None
default_target = None
default_checkers = None
- default_verbosity = 0 # Higher numbers = more verbosity, lower numbers = less verbosity
- default_include_models = [] # If neither include_models nor include_apps is specified, then all models will
- default_include_apps = [] # be included.
+ default_verbosity = (
+ 0
+ ) # Higher numbers = more verbosity, lower numbers = less verbosity
+ default_include_models = (
+ []
+ ) # If neither include_models nor include_apps is specified, then all models will
+ default_include_apps = [] # be included.
def __init__(self, **kwargs):
# set defaults
@@ -60,14 +65,14 @@
self.include_apps = XOSProcessorArgs.default_include_apps
# override defaults with kwargs
- for (k,v) in kwargs.items():
+ for (k, v) in kwargs.items():
setattr(self, k, v)
-class XOSProcessor:
+class XOSProcessor:
@staticmethod
def _read_input_from_files(files):
- input = ''
+ input = ""
for fname in files:
with open(fname) as infile:
input += infile.read()
@@ -75,7 +80,7 @@
@staticmethod
def _attach_parser(ast, args):
- if hasattr(args, 'rev') and args.rev:
+ if hasattr(args, "rev") and args.rev:
v = Proto2XProto()
ast.accept(v)
@@ -86,7 +91,9 @@
@staticmethod
def _get_template(target):
if not os.path.isabs(target):
- return os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/targets/' + target)
+ return os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/targets/" + target
+ )
return target
@staticmethod
@@ -94,10 +101,10 @@
# NOTE this method can be used in the jinja template
def file_exists2(name):
if attic is not None:
- path = attic + '/' + name
+ path = attic + "/" + name
else:
path = name
- return (os.path.exists(path))
+ return os.path.exists(path)
return file_exists2
@@ -106,60 +113,65 @@
# NOTE this method can be used in the jinja template
def include_file2(name):
if attic is not None:
- path = attic + '/' + name
+ path = attic + "/" + name
else:
path = name
return open(path).read()
+
return include_file2
@staticmethod
def _load_jinja2_extensions(os_template_env, attic):
- os_template_env.globals['include_file'] = XOSProcessor._include_file(attic) # Generates a function
- os_template_env.globals['file_exists'] = XOSProcessor._file_exists(attic) # Generates a function
+ os_template_env.globals["include_file"] = XOSProcessor._include_file(
+ attic
+ ) # Generates a function
+ os_template_env.globals["file_exists"] = XOSProcessor._file_exists(
+ attic
+ ) # Generates a function
- os_template_env.filters['yaml'] = yaml.dump
+ os_template_env.filters["yaml"] = yaml.dump
for f in dir(jinja2_extensions):
- if f.startswith('xproto'):
+ if f.startswith("xproto"):
os_template_env.globals[f] = getattr(jinja2_extensions, f)
return os_template_env
@staticmethod
def _add_context(args):
- if not hasattr(args, 'kv') or not args.kv:
+ if not hasattr(args, "kv") or not args.kv:
return
try:
context = {}
- for s in args.kv.split(','):
- k, val = s.split(':')
+ for s in args.kv.split(","):
+ k, val = s.split(":")
context[k] = val
return context
- except Exception, e:
- print e.message
+ except Exception as e:
+ print(e.message)
@staticmethod
def _write_single_file(rendered, dir, dest_file, quiet):
file_name = "%s/%s" % (dir, dest_file)
- file = open(file_name, 'w')
+ file = open(file_name, "w")
file.write(rendered)
file.close()
- if quiet == False:
- print "Saved: %s" % file_name
+ if not quiet:
+ print("Saved: %s" % file_name)
@staticmethod
def _write_file_per_model(rendered, dir, suffix, quiet):
for m in rendered:
file_name = "%s/%s%s" % (dir, m.lower(), suffix)
if not rendered[m]:
- if quiet == False:
- print "Not saving %s as it is empty" % file_name
+ if not quiet:
+ print("Not saving %s as it is empty" % file_name)
else:
- file = open(file_name, 'w')
+ file = open(file_name, "w")
file.write(rendered[m])
file.close()
- if quiet == False:
- print "Saved: %s" % file_name
+ if not quiet:
+ print("Saved: %s" % file_name)
@staticmethod
def _write_split_target(rendered, dir, quiet):
@@ -167,21 +179,21 @@
lines = rendered.splitlines()
current_buffer = []
for l in lines:
- if (l.startswith('+++')):
+ if l.startswith("+++"):
if dir:
- path = dir + '/' + l[4:].lower()
+ path = dir + "/" + l[4:].lower()
- fil = open(path, 'w')
- buf = '\n'.join(current_buffer)
+ fil = open(path, "w")
+ buf = "\n".join(current_buffer)
obuf = buf
fil.write(obuf)
fil.close()
- if quiet == False:
- print "Save file to: %s" % path
+ if not quiet:
+ print("Save file to: %s" % path)
current_buffer = []
else:
@@ -189,50 +201,55 @@
@staticmethod
def _find_message_by_model_name(messages, model):
- return next((x for x in messages if x['name'] == model), None)
+ return next((x for x in messages if x["name"] == model), None)
@staticmethod
def _find_last_nonempty_line(text, pointer):
ne_pointer = pointer
found = False
- while ne_pointer!=0 and not found:
- ne_pointer = text[:(ne_pointer-1)].rfind('\n')
- if ne_pointer<0: ne_pointer = 0
- if text[ne_pointer-1]!='\n':
+ while ne_pointer != 0 and not found:
+ ne_pointer = text[: (ne_pointer - 1)].rfind("\n")
+ if ne_pointer < 0:
+ ne_pointer = 0
+ if text[ne_pointer - 1] != "\n":
found = True
return ne_pointer
@staticmethod
- def process(args, operator = None):
+ def process(args, operator=None):
# Setting defaults
- if not hasattr(args, 'attic'):
+ if not hasattr(args, "attic"):
args.attic = None
- if not hasattr(args, 'write_to_file'):
+ if not hasattr(args, "write_to_file"):
args.write_to_file = None
- if not hasattr(args, 'dest_file'):
+ if not hasattr(args, "dest_file"):
args.dest_file = None
- if not hasattr(args, 'dest_extension'):
+ if not hasattr(args, "dest_extension"):
args.dest_extension = None
- if not hasattr(args, 'output'):
+ if not hasattr(args, "output"):
args.output = None
- if not hasattr(args, 'quiet'):
+ if not hasattr(args, "quiet"):
args.quiet = True
# Validating
- if args.write_to_file == 'single' and args.dest_file is None:
- raise Exception("[XosGenX] write_to_file option is specified as 'single' but no dest_file is provided")
- if args.write_to_file == 'model' and (args.dest_extension is None):
- raise Exception("[XosGenX] write_to_file option is specified as 'model' but no dest_extension is provided")
+ if args.write_to_file == "single" and args.dest_file is None:
+ raise Exception(
+ "[XosGenX] write_to_file option is specified as 'single' but no dest_file is provided"
+ )
+ if args.write_to_file == "model" and (args.dest_extension is None):
+ raise Exception(
+ "[XosGenX] write_to_file option is specified as 'model' but no dest_extension is provided"
+ )
if args.output is not None and not os.path.isabs(args.output):
raise Exception("[XosGenX] The output dir must be an absolute path!")
if args.output is not None and not os.path.isdir(args.output):
raise Exception("[XosGenX] The output dir must be a directory!")
- if hasattr(args, 'files'):
+ if hasattr(args, "files"):
inputs = XOSProcessor._read_input_from_files(args.files)
- elif hasattr(args, 'inputs'):
+ elif hasattr(args, "inputs"):
inputs = args.inputs
else:
raise Exception("[XosGenX] No inputs provided!")
@@ -243,28 +260,29 @@
else:
template_path = operator
-
[template_folder, template_name] = os.path.split(template_path)
os_template_loader = jinja2.FileSystemLoader(searchpath=[template_folder])
os_template_env = jinja2.Environment(loader=os_template_loader)
- os_template_env = XOSProcessor._load_jinja2_extensions(os_template_env, args.attic)
+ os_template_env = XOSProcessor._load_jinja2_extensions(
+ os_template_env, args.attic
+ )
template = os_template_env.get_template(template_name)
context = XOSProcessor._add_context(args)
parser = plyxproto.ProtobufAnalyzer()
try:
ast = parser.parse_string(inputs, debug=0)
- except plyxproto.ParsingError, e:
+ except plyxproto.ParsingError as e:
line, start, end = e.error_range
ptr = XOSProcessor._find_last_nonempty_line(inputs, start)
if start == 0:
- beginning = ''
+ beginning = ""
else:
- beginning = inputs[ptr:start-1]
+ beginning = inputs[ptr: start - 1]
- line_end_char = inputs[start+end:].find('\n')
+ line_end_char = inputs[start + end:].find("\n")
line_end = inputs[line_end_char]
if e.message:
@@ -272,11 +290,16 @@
else:
error = "xproto parsing error"
- print error + "\n" + Fore.YELLOW + "Line %d:"%line + Fore.WHITE
- print beginning + Fore.YELLOW + inputs[start-1:start+end] + Fore.WHITE + line_end
+ print(error + "\n" + Fore.YELLOW + "Line %d:" % line + Fore.WHITE)
+ print(
+ beginning
+ + Fore.YELLOW
+ + inputs[start - 1: start + end]
+ + Fore.WHITE
+ + line_end
+ )
exit(1)
-
v = XOSProcessor._attach_parser(ast, args)
if args.include_models or args.include_apps:
@@ -300,38 +323,42 @@
messages = [XOSProcessor._find_message_by_model_name(v.messages, model)]
rendered[model] = template.render(
- {"proto":
- {
- 'message_table': models,
- 'messages': messages,
- 'policies': v.policies,
- 'message_names': [m['name'] for m in v.messages]
+ {
+ "proto": {
+ "message_table": models,
+ "messages": messages,
+ "policies": v.policies,
+ "message_names": [m["name"] for m in v.messages],
},
"context": context,
- "options": v.options
+ "options": v.options,
}
)
- if (str(v.options.get("legacy", "false")).strip('"').lower() == "true"):
+ if str(v.options.get("legacy", "false")).strip('"').lower() == "true":
suffix = "_decl." + args.dest_extension
else:
suffix = "." + args.dest_extension
- XOSProcessor._write_file_per_model(rendered, args.output, suffix, args.quiet)
+ XOSProcessor._write_file_per_model(
+ rendered, args.output, suffix, args.quiet
+ )
else:
rendered = template.render(
- {"proto":
- {
- 'message_table': v.models,
- 'messages': v.messages,
- 'policies': v.policies,
- 'message_names': [m['name'] for m in v.messages]
+ {
+ "proto": {
+ "message_table": v.models,
+ "messages": v.messages,
+ "policies": v.policies,
+ "message_names": [m["name"] for m in v.messages],
},
"context": context,
- "options": v.options
+ "options": v.options,
}
)
if args.output is not None and args.write_to_file == "target":
XOSProcessor._write_split_target(rendered, args.output, args.quiet)
elif args.output is not None and args.write_to_file == "single":
- XOSProcessor._write_single_file(rendered, args.output, args.dest_file, args.quiet)
+ XOSProcessor._write_single_file(
+ rendered, args.output, args.dest_file, args.quiet
+ )
return rendered
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/__init__.py b/lib/xos-genx/xosgenx/jinja2_extensions/__init__.py
index 859594c..bf7a812 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/__init__.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/__init__.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
from .django import *
from .base import *
from .fol2 import *
from .gui import *
from .tosca import *
from .checklib import *
+
+__all__ = ["django", "base", "fol2", "gui", "tosca", "checklib"]
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/base.py b/lib/xos-genx/xosgenx/jinja2_extensions/base.py
index e11d2ec..96e8dc2 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/base.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/base.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,49 +13,56 @@
# limitations under the License.
+from __future__ import print_function
import pdb
import re
from inflect import engine as inflect_engine_class
inflect_engine = inflect_engine_class()
+
class FieldNotFound(Exception):
def __init__(self, message):
super(FieldNotFound, self).__init__(message)
+
def xproto_debug(**kwargs):
- print kwargs
+ print(kwargs)
pdb.set_trace()
+
def xproto_unquote(s):
return unquote(s)
+
def unquote(s):
- if (s.startswith('"') and s.endswith('"')):
+ if s.startswith('"') and s.endswith('"'):
return s[1:-1]
else:
return s
+
def xproto_singularize(field):
try:
# The user has set a singular, as an exception that cannot be handled automatically
- singular = field['options']['singular']
+ singular = field["options"]["singular"]
singular = unquote(singular)
except KeyError:
- singular = inflect_engine.singular_noun(field['name'])
+ singular = inflect_engine.singular_noun(field["name"])
if singular is False:
# singular_noun returns False on a noun it can't singularize
singular = field["name"]
return singular
+
def xproto_singularize_pluralize(field):
try:
# The user has set a plural, as an exception that cannot be handled automatically
- plural = field['options']['plural']
+ plural = field["options"]["plural"]
plural = unquote(plural)
except KeyError:
- singular = inflect_engine.singular_noun(field['name'])
+ singular = inflect_engine.singular_noun(field["name"])
if singular is False:
# singular_noun returns False on a noun it can't singularize
singular = field["name"]
@@ -65,86 +71,103 @@
return plural
+
def xproto_pluralize(field):
try:
# The user has set a plural, as an exception that cannot be handled automatically
- plural = field['options']['plural']
+ plural = field["options"]["plural"]
plural = unquote(plural)
except KeyError:
- plural = inflect_engine.plural_noun(field['name'])
+ plural = inflect_engine.plural_noun(field["name"])
return plural
-def xproto_base_def(model_name, base, suffix='', suffix_list=[]):
- if (model_name=='XOSBase'):
- return '(models.Model, PlModelMixIn)'
- elif (not base):
- return ''
+
+def xproto_base_def(model_name, base, suffix="", suffix_list=[]):
+ if model_name == "XOSBase":
+ return "(models.Model, PlModelMixIn)"
+ elif not base:
+ return ""
else:
- int_base = [i['name']+suffix for i in base if i['name'] in suffix_list]
- ext_base = [i['name'] for i in base if i['name'] not in suffix_list]
- return '(' + ','.join(int_base + ext_base) + ')'
+ int_base = [i["name"] + suffix for i in base if i["name"] in suffix_list]
+ ext_base = [i["name"] for i in base if i["name"] not in suffix_list]
+ return "(" + ",".join(int_base + ext_base) + ")"
+
def xproto_first_non_empty(lst):
for l in lst:
- if l: return l
+ if l:
+ return l
+
def xproto_api_type(field):
try:
- if (unquote(field['options']['content_type'])=='date'):
- return 'double'
+ if unquote(field["options"]["content_type"]) == "date":
+ return "double"
except KeyError:
pass
- return field['type']
+ return field["type"]
def xproto_base_name(n):
# Hack - Refactor NetworkParameter* to make this go away
- if (n.startswith('NetworkParameter')):
- return '_'
+ if n.startswith("NetworkParameter"):
+ return "_"
- expr = r'^[A-Z]+[a-z]*'
+ expr = r"^[A-Z]+[a-z]*"
try:
match = re.findall(expr, n)[0]
- except:
- return '_'
+ except BaseException:
+ return "_"
return match
+
def xproto_base_fields(m, table):
fields = []
- for b in m['bases']:
- option1 = b['fqn']
+ for b in m["bases"]:
+ option1 = b["fqn"]
try:
- option2 = m['package'] + '.' + b['name']
+ option2 = m["package"] + "." + b["name"]
except TypeError:
option2 = option1
accessor = None
- if option1 in table: accessor = option1
- elif option2 in table: accessor = option2
+ if option1 in table:
+ accessor = option1
+ elif option2 in table:
+ accessor = option2
if accessor:
base_fields = xproto_base_fields(table[accessor], table)
- model_fields = [x.copy() for x in table[accessor]['fields']]
+ model_fields = [x.copy() for x in table[accessor]["fields"]]
for field in model_fields:
field["accessor"] = accessor
fields.extend(base_fields)
fields.extend(model_fields)
- if 'no_sync' in m['options'] and m['options']['no_sync']:
- fields = [f for f in fields if f['name'] != 'backend_status' and f['name'] != 'backend_code']
+ if "no_sync" in m["options"] and m["options"]["no_sync"]:
+ fields = [
+ f
+ for f in fields
+ if f["name"] != "backend_status" and f["name"] != "backend_code"
+ ]
- if 'no_policy' in m['options'] and m['options']['no_policy']:
- fields = [f for f in fields if f['name'] != 'policy_status' and f['name'] != 'policy_code']
+ if "no_policy" in m["options"] and m["options"]["no_policy"]:
+ fields = [
+ f
+ for f in fields
+ if f["name"] != "policy_status" and f["name"] != "policy_code"
+ ]
return fields
+
def xproto_fields(m, table):
""" Generate the full list of models for the xproto message `m` including fields from the classes it inherits.
@@ -165,9 +188,17 @@
# The "id" field is a special field. Every model has one. Put it up front and pretend it's part of the
if not fields:
- raise Exception("Model %s has no fields. Check for missing base class." % m["name"])
+ raise Exception(
+ "Model %s has no fields. Check for missing base class." % m["name"]
+ )
- id_field = {'type': 'int32', 'name': 'id', 'options': {}, "id": "1", "accessor": fields[0]["accessor"]}
+ id_field = {
+ "type": "int32",
+ "name": "id",
+ "options": {},
+ "id": "1",
+ "accessor": fields[0]["accessor"],
+ }
fields = [id_field] + fields
@@ -176,12 +207,15 @@
offset = 0
last_accessor = fields[0]["accessor"]
for field in fields:
- if (field["accessor"] != last_accessor):
+ if field["accessor"] != last_accessor:
last_accessor = field["accessor"]
offset += 100
field_id = int(field["id"])
if (field_id < 1) or (field_id >= 100):
- raise Exception("Only field numbers from 1 to 99 are permitted, field %s in model %s" % (field["name"], field["accessor"]))
+ raise Exception(
+ "Only field numbers from 1 to 99 are permitted, field %s in model %s"
+ % (field["name"], field["accessor"])
+ )
field["id"] = int(field["id"]) + offset
# Check for duplicates
@@ -190,20 +224,24 @@
id = field["id"]
dup = fields_by_number.get(id)
if dup:
- raise Exception("Field %s has duplicate number %d with field %s in model %s" % (field["name"], id, dup["name"], field["accessor"]))
+ raise Exception(
+ "Field %s has duplicate number %d with field %s in model %s"
+ % (field["name"], id, dup["name"], field["accessor"])
+ )
fields_by_number[id] = field
return fields
+
def xproto_base_rlinks(m, table):
links = []
- for base in m['bases']:
- b = base['name']
+ for base in m["bases"]:
+ b = base["name"]
if b in table:
base_rlinks = xproto_base_rlinks(table[b], table)
- model_rlinks = [x.copy() for x in table[b]['rlinks']]
+ model_rlinks = [x.copy() for x in table[b]["rlinks"]]
for link in model_rlinks:
link["accessor"] = b
@@ -212,6 +250,7 @@
return links
+
def xproto_rlinks(m, table):
""" Return the reverse links for the xproto message `m`.
@@ -228,14 +267,16 @@
links = xproto_base_rlinks(m, table) + model_rlinks
- links = [x for x in links if ("+" not in x["src_port"]) and ("+" not in x["dst_port"])]
+ links = [
+ x for x in links if ("+" not in x["src_port"]) and ("+" not in x["dst_port"])
+ ]
if links:
last_accessor = links[0]["accessor"]
offset = 0
index = 1900
for link in links:
- if (link["accessor"] != last_accessor):
+ if link["accessor"] != last_accessor:
last_accessor = link["accessor"]
offset += 100
@@ -249,13 +290,15 @@
index += 1
# check for duplicates
- links_by_number={}
+ links_by_number = {}
for link in links:
id = link["id"]
- dup=links_by_number.get(id)
+ dup = links_by_number.get(id)
if dup:
- raise Exception("Field %s has duplicate number %d in model %s with reverse field %s" %
- (link["src_port"], id, m["name"], dup["src_port"]))
+ raise Exception(
+ "Field %s has duplicate number %d in model %s with reverse field %s"
+ % (link["src_port"], id, m["name"], dup["src_port"])
+ )
links_by_number[id] = link
return links
@@ -264,40 +307,45 @@
def xproto_base_links(m, table):
links = []
- for base in m['bases']:
- b = base['name']
+ for base in m["bases"]:
+ b = base["name"]
if b in table:
base_links = xproto_base_links(table[b], table)
- model_links = table[b]['links']
+ model_links = table[b]["links"]
links.extend(base_links)
links.extend(model_links)
return links
+
def xproto_string_type(xptags):
try:
- max_length = eval(xptags['max_length'])
- except:
+ max_length = eval(xptags["max_length"])
+ except BaseException:
max_length = 1024
- if ('varchar' not in xptags):
- return 'string'
+ if "varchar" not in xptags:
+ return "string"
else:
- return 'text'
+ return "text"
+
def xproto_tuplify(nested_list_or_set):
- if not isinstance(nested_list_or_set, list) and not isinstance(nested_list_or_set, set):
+ if not isinstance(nested_list_or_set, list) and not isinstance(
+ nested_list_or_set, set
+ ):
return nested_list_or_set
else:
return tuple([xproto_tuplify(i) for i in nested_list_or_set])
-def xproto_field_graph_components(fields, model, tag='unique_with'):
+
+def xproto_field_graph_components(fields, model, tag="unique_with"):
def find_components(graph):
pending = set(graph.keys())
components = []
while pending:
- front = { pending.pop() }
+ front = {pending.pop()}
component = set()
while front:
@@ -308,87 +356,96 @@
pending -= neighbours
component |= neighbours
-
+
components.append(component)
return components
field_graph = {}
- field_names = {f['name'] for f in fields}
+ field_names = {f["name"] for f in fields}
for f in fields:
try:
- tagged_str = unquote(f['options'][tag])
- tagged_fields = tagged_str.split(',')
+ tagged_str = unquote(f["options"][tag])
+ tagged_fields = tagged_str.split(",")
for uf in tagged_fields:
if uf not in field_names:
- raise FieldNotFound('Field "%s" not found in model "%s", referenced from field "%s" by option "%s"' % (uf, model['name'], f['name'], tag))
+ raise FieldNotFound(
+ 'Field "%s" not found in model "%s", referenced from field "%s" by option "%s"'
+ % (uf, model["name"], f["name"], tag)
+ )
- field_graph.setdefault(f['name'], set()).add(uf)
- field_graph.setdefault(uf, set()).add(f['name'])
+ field_graph.setdefault(f["name"], set()).add(uf)
+ field_graph.setdefault(uf, set()).add(f["name"])
except KeyError:
pass
return find_components(field_graph)
+
def xproto_api_opts(field):
options = []
- if 'max_length' in field['options'] and field['type']=='string':
- options.append('(val).maxLength = %s'%field['options']['max_length'])
+ if "max_length" in field["options"] and field["type"] == "string":
+ options.append("(val).maxLength = %s" % field["options"]["max_length"])
try:
- if field['options']['null'] == 'False':
- options.append('(val).nonNull = true')
+ if field["options"]["null"] == "False":
+ options.append("(val).nonNull = true")
except KeyError:
pass
- if 'link' in field and 'model' in field['options']:
- options.append('(foreignKey).modelName = "%s"'%field['options']['model'])
+ if "link" in field and "model" in field["options"]:
+ options.append('(foreignKey).modelName = "%s"' % field["options"]["model"])
if ("options" in field) and ("port" in field["options"]):
- options.append('(foreignKey).reverseFieldName = "%s"' % field['options']['port'])
+ options.append(
+ '(foreignKey).reverseFieldName = "%s"' % field["options"]["port"]
+ )
if options:
- options_str = '[' + ', '.join(options) + ']'
+ options_str = "[" + ", ".join(options) + "]"
else:
- options_str = ''
+ options_str = ""
return options_str
+
def xproto_type_to_swagger_type(f):
try:
- content_type = f['options']['content_type']
+ content_type = f["options"]["content_type"]
content_type = eval(content_type)
- except:
+ except BaseException:
content_type = None
pass
- if 'choices' in f['options']:
- return 'string'
- elif content_type == 'date':
- return 'string'
- elif f['type'] == 'bool':
- return 'boolean'
- elif f['type'] == 'string':
- return 'string'
- elif f['type'] in ['int','uint32','int32'] or 'link' in f:
- return 'integer'
- elif f['type'] in ['double','float']:
- return 'string'
+ if "choices" in f["options"]:
+ return "string"
+ elif content_type == "date":
+ return "string"
+ elif f["type"] == "bool":
+ return "boolean"
+ elif f["type"] == "string":
+ return "string"
+ elif f["type"] in ["int", "uint32", "int32"] or "link" in f:
+ return "integer"
+ elif f["type"] in ["double", "float"]:
+ return "string"
+
def xproto_field_to_swagger_enum(f):
- if 'choices' in f['options']:
+ if "choices" in f["options"]:
list = []
- for c in eval(xproto_unquote(f['options']['choices'])):
+ for c in eval(xproto_unquote(f["options"]["choices"])):
list.append(c[0])
return list
else:
return False
+
def xproto_is_true(x):
# TODO: Audit xproto and make specification of trueness more uniform
- if (x==True) or (x=="True") or (x=='"True"'):
+ if x is True or (x == "True") or (x == '"True"'):
return True
return False
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/checklib.py b/lib/xos-genx/xosgenx/jinja2_extensions/checklib.py
index a61f7ca..db61f01 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/checklib.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/checklib.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,62 +14,67 @@
import ast
+
def xproto_check_synchronizer(m):
try:
- sync_step_path = 'synchronizer/steps/sync_%s.py'%m['name'].lower()
+ sync_step_path = "synchronizer/steps/sync_%s.py" % m["name"].lower()
sync_step = open(sync_step_path).read()
except IOError:
- return '510 Model needs a sync step %s'%sync_step_path
+ return "510 Model needs a sync step %s" % sync_step_path
try:
sync_step_ast = ast.parse(sync_step)
except SyntaxError:
- return '511 Could not parse sync step %s'%sync_step_path
+ return "511 Could not parse sync step %s" % sync_step_path
- classes = filter(lambda x:isinstance(x, ast.ClassDef), sync_step_ast.body)
+ classes = filter(lambda x: isinstance(x, ast.ClassDef), sync_step_ast.body)
found_sync_step_class = False
for c in classes:
base_names = [v.id for v in c.bases]
- if 'SyncStep' in base_names or 'SyncInstanceUsingAnsible' in base_names:
- attributes = filter(lambda x:isinstance(x, ast.Assign), c.body)
+ if "SyncStep" in base_names or "SyncInstanceUsingAnsible" in base_names:
+ attributes = filter(lambda x: isinstance(x, ast.Assign), c.body)
for a in attributes:
target_names = [t.id for t in a.targets]
values = a.value.elts if isinstance(a.value, ast.List) else [a.value]
value_names = [v.id for v in values]
- if 'observes' in target_names and m['name'] in value_names:
+ if "observes" in target_names and m["name"] in value_names:
found_sync_step_class = True
break
if not found_sync_step_class:
- return '512 Synchronizer needs a sync step class with an observes field containing %s'%m['name']
+ return (
+ "512 Synchronizer needs a sync step class with an observes field containing %s"
+ % m["name"]
+ )
else:
- return '200 OK'
+ return "200 OK"
def xproto_check_policy(m):
try:
- model_policy_path = 'synchronizer/model_policies/model_policy_%s.py'%m['name'].lower()
+ model_policy_path = (
+ "synchronizer/model_policies/model_policy_%s.py" % m["name"].lower()
+ )
model_policy = open(model_policy_path).read()
except IOError:
- return '510 Model needs a model policy %s'%model_policy_path
+ return "510 Model needs a model policy %s" % model_policy_path
try:
model_policy_ast = ast.parse(model_policy)
except SyntaxError:
- return '511 Could not parse sync step %s'%model_policy_path
+ return "511 Could not parse sync step %s" % model_policy_path
- classes = filter(lambda x:isinstance(x, ast.ClassDef), model_policy_ast.body)
+ classes = filter(lambda x: isinstance(x, ast.ClassDef), model_policy_ast.body)
found_model_policy_class = False
for c in classes:
base_names = [v.id for v in c.bases]
- if 'Policy' in base_names or 'TenantWithContainerPolicy' in base_names:
+ if "Policy" in base_names or "TenantWithContainerPolicy" in base_names:
found_model_policy_class = True
break
if not found_model_policy_class:
- return '513 Synchronizer needs a model policy class'
+ return "513 Synchronizer needs a model policy class"
else:
- return '200 OK'
-
+ return "200 OK"
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/django.py b/lib/xos-genx/xosgenx/jinja2_extensions/django.py
index 64ab51a..d71ea51 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/django.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/django.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -18,183 +17,217 @@
import pdb
import re
+
def django_content_type_string(xptags):
# Check possibility of KeyError in caller
- content_type = xptags['content_type']
+ content_type = xptags["content_type"]
try:
content_type = eval(content_type)
- except:
+ except BaseException:
pass
- if (content_type=='url'):
- return 'URLField'
- if (content_type=='date'):
- return 'DateTimeField'
- elif (content_type=='ip'):
- return 'GenericIPAddressField'
- elif (content_type=='stripped' or content_type=='"stripped"'):
- return 'StrippedCharField'
+ if content_type == "url":
+ return "URLField"
+ if content_type == "date":
+ return "DateTimeField"
+ elif content_type == "ip":
+ return "GenericIPAddressField"
+ elif content_type == "stripped" or content_type == '"stripped"':
+ return "StrippedCharField"
else:
- raise Exception('Unknown Type: %s'%content_type)
+ raise Exception("Unknown Type: %s" % content_type)
+
def django_string_type(xptags):
try:
- max_length = eval(xptags['max_length'])
- except:
+ max_length = eval(xptags["max_length"])
+ except BaseException:
max_length = 1024 * 1024
- if ('content_type' in xptags):
+ if "content_type" in xptags:
return django_content_type_string(xptags)
- elif (max_length<1024*1024):
- return 'CharField'
+ elif max_length < 1024 * 1024:
+ return "CharField"
else:
- return 'TextField'
+ return "TextField"
+
def xproto_django_type(xptype, xptags):
- if (xptype=='string'):
+ if xptype == "string":
return django_string_type(xptags)
- elif (xptype=='float'):
- return 'FloatField'
- elif (xptype=='bool'):
- return 'BooleanField'
- elif (xptype=='uint32'):
- return 'IntegerField'
- elif (xptype=='int32'):
- return 'IntegerField'
- elif (xptype=='int64'):
- return 'BigIntegerField'
+ elif xptype == "float":
+ return "FloatField"
+ elif xptype == "bool":
+ return "BooleanField"
+ elif xptype == "uint32":
+ return "IntegerField"
+ elif xptype == "int32":
+ return "IntegerField"
+ elif xptype == "int64":
+ return "BigIntegerField"
else:
- raise Exception('Unknown Type: %s'%xptype)
+ raise Exception("Unknown Type: %s" % xptype)
+
def xproto_django_link_type(f):
- if (f['link_type']=='manytoone'):
- return 'ForeignKey'
- elif (f['link_type']=='onetoone'):
- return 'OneToOneField'
- elif (f['link_type']=='manytomany'):
- if (f['dst_port']):
- return 'ManyToManyField'
+ if f["link_type"] == "manytoone":
+ return "ForeignKey"
+ elif f["link_type"] == "onetoone":
+ return "OneToOneField"
+ elif f["link_type"] == "manytomany":
+ if f["dst_port"]:
+ return "ManyToManyField"
else:
- return 'GenericRelation'
+ return "GenericRelation"
+
def map_xproto_to_django(f):
- allowed_keys=['help_text','default','max_length','modifier','blank','choices','db_index','null','editable','on_delete','verbose_name', 'auto_now_add', 'unique', 'min_value', 'max_value']
+ allowed_keys = [
+ "help_text",
+ "default",
+ "max_length",
+ "modifier",
+ "blank",
+ "choices",
+ "db_index",
+ "null",
+ "editable",
+ "on_delete",
+ "verbose_name",
+ "auto_now_add",
+ "unique",
+ "min_value",
+ "max_value",
+ ]
# TODO evaluate if setting Null = False for all strings
- m = {'modifier':{'optional':True, 'required':False, '_targets': ['null', 'blank']}}
+ m = {
+ "modifier": {"optional": True, "required": False, "_targets": ["null", "blank"]}
+ }
out = {}
- for k,v in f['options'].items():
+ for k, v in f["options"].items():
if k in allowed_keys:
try:
- # NOTE this will be used to parse xproto optional/required field prefix and apply it to the null and blank fields
+ # NOTE this will be used to parse xproto optional/required field prefix
+ # and apply it to the null and blank fields
kv2 = m[k]
- for t in kv2['_targets']:
+ for t in kv2["_targets"]:
out[t] = kv2[v]
- except:
+ except BaseException:
out[k] = v
return out
+
def xproto_django_link_options_str(field, dport=None):
output_dict = map_xproto_to_django(field)
- if (dport and (dport=='+' or '+' not in dport)):
- output_dict['related_name'] = '%r'%dport
+ if dport and (dport == "+" or "+" not in dport):
+ output_dict["related_name"] = "%r" % dport
try:
- if field['through']:
+ if field["through"]:
d = {}
- if isinstance(field['through'], str):
- split = field['through'].rsplit('.',1)
- d['name'] = split[-1]
- if len(split)==2:
- d['package'] = split[0]
- d['fqn'] = 'package' + '.' + d['name']
+ if isinstance(field["through"], str):
+ split = field["through"].rsplit(".", 1)
+ d["name"] = split[-1]
+ if len(split) == 2:
+ d["package"] = split[0]
+ d["fqn"] = "package" + "." + d["name"]
else:
- d['fqn'] = d['name']
- d['package'] = ''
+ d["fqn"] = d["name"]
+ d["package"] = ""
else:
- d = field['through']
+ d = field["through"]
- if not d['name'].endswith('_'+field['name']):
- output_dict['through'] = '%r'%d['fqn']
+ if not d["name"].endswith("_" + field["name"]):
+ output_dict["through"] = "%r" % d["fqn"]
except KeyError:
pass
return format_options_string(output_dict)
+
def use_native_django_validators(k, v):
validators_map = {
- 'min_value': 'MinValueValidator',
- 'max_value': 'MaxValueValidator'
+ "min_value": "MinValueValidator",
+ "max_value": "MaxValueValidator",
}
return "%s(%s)" % (validators_map[k], v)
+
def format_options_string(d):
- known_validators = ['min_value', 'max_value']
+ known_validators = ["min_value", "max_value"]
validator_lst = []
- if (not d):
- return ''
+ if not d:
+ return ""
else:
lst = []
- for k,v in d.items():
+ for k, v in d.items():
if k in known_validators:
validator_lst.append(use_native_django_validators(k, v))
- elif (type(v)==str and k=='default' and v.endswith('()"')):
- lst.append('%s = %s'%(k,v[1:-3]))
- elif (type(v)==str and v.startswith('"')):
+ elif isinstance(v, str) and k == "default" and v.endswith('()"'):
+ lst.append("%s = %s" % (k, v[1:-3]))
+ elif isinstance(v, str) and v.startswith('"'):
try:
# unquote the value if necessary
tup = eval(v[1:-1])
- if (type(tup)==tuple):
- lst.append('%s = %r'%(k,tup))
+ if isinstance(tup, tuple):
+ lst.append("%s = %r" % (k, tup))
else:
- lst.append('%s = %s'%(k,v))
- except:
- lst.append('%s = %s'%(k,v))
- elif (type(v)==bool):
- lst.append('%s = %r'%(k,bool(v)))
+ lst.append("%s = %s" % (k, v))
+ except BaseException:
+ lst.append("%s = %s" % (k, v))
+ elif isinstance(v, bool):
+ lst.append("%s = %r" % (k, bool(v)))
else:
try:
- lst.append('%s = %r'%(k,int(v)))
+ lst.append("%s = %r" % (k, int(v)))
except ValueError:
- lst.append('%s = %s'%(k,v))
- validator_string = "validators=[%s]" % ', '.join(validator_lst)
- option_string = ', '.join(lst)
+ lst.append("%s = %s" % (k, v))
+ validator_string = "validators=[%s]" % ", ".join(validator_lst)
+ option_string = ", ".join(lst)
if len(validator_lst) == 0:
return option_string
elif len(lst) == 0:
return validator_string
else:
- return option_string + ", " + validator_string
+ return option_string + ", " + validator_string
+
def xproto_django_options_str(field, dport=None):
output_dict = map_xproto_to_django(field)
- if (dport=='_'):
- dport = '+'
+ if dport == "_":
+ dport = "+"
- if (dport and (dport=='+' or '+' not in dport)):
- output_dict['related_name'] = '%r'%dport
+ if dport and (dport == "+" or "+" not in dport):
+ output_dict["related_name"] = "%r" % dport
return format_options_string(output_dict)
+
def xproto_camel_to_underscore(name):
- return re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
+ return re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
+
def xproto_validations(options):
try:
- return [map(str.strip, validation.split(':')) for validation in unquote(options['validators']).split(',')]
+ return [
+ map(str.strip, validation.split(":"))
+ for validation in unquote(options["validators"]).split(",")
+ ]
except KeyError:
return []
+
def xproto_optioned_fields_to_list(fields, option, val):
"""
List all the field that have a particural option
@@ -207,14 +240,15 @@
optioned_fields = []
for f in fields:
option_names = []
- for k, v in f['options'].items():
+ for k, v in f["options"].items():
option_names.append(k)
- if option in option_names and f['options'][option] == val:
- optioned_fields.append(f['name'])
+ if option in option_names and f["options"][option] == val:
+ optioned_fields.append(f["name"])
return optioned_fields
+
# TODO
# - in modeldefs add info about this fields
# - update the gui to have this fields as readonly
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py b/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
index 73d04af..6d66117 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/fol2.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,6 +13,7 @@
# limitations under the License.
+from __future__ import print_function
import astunparse
import ast
import random
@@ -21,18 +21,22 @@
import jinja2
from plyxproto.parser import *
-BINOPS = ['|', '&', '->']
-QUANTS = ['exists', 'forall']
+BINOPS = ["|", "&", "->"]
+QUANTS = ["exists", "forall"]
+
class PolicyException(Exception):
pass
+
class ConstructNotHandled(Exception):
pass
+
class TrivialPolicy(Exception):
pass
+
class AutoVariable:
def __init__(self, base):
self.base = base
@@ -42,25 +46,29 @@
return self
def next(self):
- var = 'i%d' % self.idx
+ var = "i%d" % self.idx
self.idx += 1
return var
+
def gen_random_string():
- return ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(5))
+ return "".join(
+ random.choice(string.ascii_lowercase + string.digits) for _ in range(5)
+ )
+
class FOL2Python:
def __init__(self, context_map=None):
# This will produce i0, i1, i2 etc.
- self.loopvar = iter(AutoVariable('i'))
- self.verdictvar = iter(AutoVariable('result'))
+ self.loopvar = iter(AutoVariable("i"))
+ self.verdictvar = iter(AutoVariable("result"))
self.loop_variable = self.loopvar.next()
self.verdict_variable = self.verdictvar.next()
self.context_map = context_map
if not self.context_map:
- self.context_map = {'user': 'self', 'obj': 'obj'}
+ self.context_map = {"user": "self", "obj": "obj"}
def loop_next(self):
self.loop_variable = self.loopvar.next()
@@ -72,12 +80,12 @@
pass
def format_term_for_query(self, model, term, django=False):
- if term.startswith(model + '.'):
+ if term.startswith(model + "."):
term = term[len(model) + 1:]
if django:
- term = term.replace('.', '__')
+ term = term.replace(".", "__")
else:
- term = '__elt' + '.' + term
+ term = "__elt" + "." + term
return term
def fol_to_python_filter(self, model, e, django=False, negate=False):
@@ -89,109 +97,114 @@
if django:
if negate:
# De Morgan's negation
- q_bracket = '~Q(%s)'
- or_expr = ','
- and_expr = '|'
+ q_bracket = "~Q(%s)"
+ or_expr = ","
+ and_expr = "|"
else:
- q_bracket = 'Q(%s)'
- or_expr = '|'
- and_expr = ','
+ q_bracket = "Q(%s)"
+ or_expr = "|"
+ and_expr = ","
else:
if negate:
# De Morgan's negation
- q_bracket = 'not %s'
- or_expr = ' and '
- and_expr = ' or '
+ q_bracket = "not %s"
+ or_expr = " and "
+ and_expr = " or "
else:
- q_bracket = '%s'
- or_expr = ' or '
- and_expr = ' and '
+ q_bracket = "%s"
+ or_expr = " or "
+ and_expr = " and "
- if k in ['=','in']:
- v = [self.format_term_for_query(
- model, term, django=django) for term in v]
+ if k in ["=", "in"]:
+ v = [self.format_term_for_query(model, term, django=django) for term in v]
if django:
- operator_map = {'=':' = ','in':'__in'}
+ operator_map = {"=": " = ", "in": "__in"}
else:
- operator_map = {'=':' == ','in':'in'}
+ operator_map = {"=": " == ", "in": "in"}
operator = operator_map[k]
return [q_bracket % operator.join(v)]
- elif k == '|':
- components = [self.fol_to_python_filter(
- model, x, django=django).pop() for x in v]
+ elif k == "|":
+ components = [
+ self.fol_to_python_filter(model, x, django=django).pop() for x in v
+ ]
return [or_expr.join(components)]
- elif k == '&':
- components = [self.fol_to_python_filter(
- model, x, django=django).pop() for x in v]
+ elif k == "&":
+ components = [
+ self.fol_to_python_filter(model, x, django=django).pop() for x in v
+ ]
return [and_expr.join(components)]
- elif k == '->':
- components = [self.fol_to_python_filter(
- model, x, django=django).pop() for x in v]
- return ['~%s | %s' % (components[0], components[1])]
+ elif k == "->":
+ components = [
+ self.fol_to_python_filter(model, x, django=django).pop() for x in v
+ ]
+ return ["~%s | %s" % (components[0], components[1])]
""" Convert a single leaf node from a string
to an AST"""
+
def str_to_ast(self, s):
ast_module = ast.parse(s)
return ast_module.body[0]
def reduce_operands(self, operands):
- if operands[0] in ['True','False']:
- return (operands[0],operands[1])
- elif operands[1] in ['True','False']:
- return (operands[1],operands[0])
+ if operands[0] in ["True", "False"]:
+ return (operands[0], operands[1])
+ elif operands[1] in ["True", "False"]:
+ return (operands[1], operands[0])
else:
return None
""" Simplify binops with constants """
+
def simplify_binop(self, binop):
- (k,v), = binop.items()
- if k == '->':
+ (k, v), = binop.items()
+ if k == "->":
lhs, rhs = v
- if lhs == 'True':
+ if lhs == "True":
return rhs
- elif rhs == 'True':
- return 'True'
- elif lhs == 'False':
- return 'True'
- elif rhs == 'False':
- return {'not': lhs}
+ elif rhs == "True":
+ return "True"
+ elif lhs == "False":
+ return "True"
+ elif rhs == "False":
+ return {"not": lhs}
var_expr = self.reduce_operands(v)
- if not var_expr: return binop
+ if not var_expr:
+ return binop
else:
constant, var = var_expr
- if k=='|':
- if constant=='True':
- return 'True'
- elif constant=='False':
+ if k == "|":
+ if constant == "True":
+ return "True"
+ elif constant == "False":
return var
else:
raise Exception("Internal error - variable read as constant")
- elif k=='&':
- if constant=='True':
+ elif k == "&":
+ if constant == "True":
return var
- elif constant=='False':
- return 'False'
+ elif constant == "False":
+ return "False"
def is_constant(self, var, fol):
try:
(k, v), = fol.items()
except AttributeError:
- k = 'term'
+ k = "term"
v = fol
-
- if k in ['python', 'policy']:
- # Treat as a constant and hoist, since it cannot be quantified
- return True
- elif k == 'term':
+
+ if k in ["python", "policy"]:
+ # Treat as a constant and hoist, since it cannot be quantified
+ return True
+ elif k == "term":
return not v.startswith(var)
- elif k == 'not':
+ elif k == "not":
return self.is_constant(var, fol)
- elif k in ['in', '=']:
+ elif k in ["in", "="]:
lhs, rhs = v
- return self.is_constant(var,lhs) and self.is_constant(var, rhs)
+ return self.is_constant(var, lhs) and self.is_constant(var, rhs)
elif k in BINOPS:
lhs, rhs = v
return self.is_constant(lhs, var) and self.is_constant(rhs, var)
@@ -205,21 +218,21 @@
try:
(k, v), = fol.items()
except AttributeError:
- k = 'term'
+ k = "term"
v = fol
- if k in ['python', 'policy']:
- # Treat as a constant and hoist, since it cannot be quantified
- if fol not in constants:
- constants.append(fol)
- return constants
- elif k == 'term':
- if not v.startswith(var):
- constants.append(v)
- return constants
- elif k == 'not':
+ if k in ["python", "policy"]:
+ # Treat as a constant and hoist, since it cannot be quantified
+ if fol not in constants:
+ constants.append(fol)
+ return constants
+ elif k == "term":
+ if not v.startswith(var):
+ constants.append(v)
+ return constants
+ elif k == "not":
return self.find_constants(var, v, constants)
- elif k in ['in', '=']:
+ elif k in ["in", "="]:
lhs, rhs = v
if isinstance(lhs, str) and isinstance(rhs, str):
if not lhs.startswith(var) and not rhs.startswith(var):
@@ -235,32 +248,34 @@
return constants
elif k in QUANTS:
is_constant = self.is_constant(var, v[1])
- if is_constant: constants.append(fol)
+ if is_constant:
+ constants.append(fol)
return constants
else:
raise ConstructNotHandled(k)
""" Hoist constants out of quantifiers. Depth-first. """
+
def hoist_outer(self, fol):
try:
(k, v), = fol.items()
except AttributeError:
- k = 'term'
+ k = "term"
v = fol
- if k in ['python', 'policy']:
- # Tainted, optimization and distribution not possible
- return fol
- elif k == 'term':
- return fol
- elif k == 'not':
+ if k in ["python", "policy"]:
+ # Tainted, optimization and distribution not possible
+ return fol
+ elif k == "term":
+ return fol
+ elif k == "not":
vprime = self.hoist_outer(v)
- return {'not': vprime}
- elif k in ['in', '=']:
+ return {"not": vprime}
+ elif k in ["in", "="]:
lhs, rhs = v
rlhs = self.hoist_outer(lhs)
rrhs = self.hoist_outer(rhs)
- return {k:[rlhs,rrhs]}
+ return {k: [rlhs, rrhs]}
elif k in BINOPS:
lhs, rhs = v
rlhs = self.hoist_outer(lhs)
@@ -271,7 +286,7 @@
return fol_simplified
elif k in QUANTS:
rexpr = self.hoist_outer(v[1])
- return self.hoist_quant(k, [v[0],rexpr])
+ return self.hoist_quant(k, [v[0], rexpr])
else:
raise ConstructNotHandled(k)
@@ -282,27 +297,29 @@
try:
(k, v), = fol.items()
except AttributeError:
- k = 'term'
+ k = "term"
v = fol
- if k == 'term':
- if v == c: return value
- else: return v
- elif k == 'not':
+ if k == "term":
+ if v == c:
+ return value
+ else:
+ return v
+ elif k == "not":
new_expr = self.replace_const(v, c, value)
- if new_expr=='True':
- return 'False'
- elif new_expr=='False':
- return 'True'
- else:
- return {'not': new_expr}
- elif k in ['in', '=']:
+ if new_expr == "True":
+ return "False"
+ elif new_expr == "False":
+ return "True"
+ else:
+ return {"not": new_expr}
+ elif k in ["in", "="]:
lhs, rhs = v
rlhs = self.replace_const(lhs, c, value)
rrhs = self.replace_const(rhs, c, value)
- if rlhs==rrhs:
- return 'True'
+ if rlhs == rrhs:
+ return "True"
else:
return {k: [rlhs, rrhs]}
elif k in BINOPS:
@@ -310,12 +327,12 @@
rlhs = self.replace_const(lhs, c, value)
rrhs = self.replace_const(rhs, c, value)
-
- return self.simplify_binop({k:[rlhs,rrhs]})
+
+ return self.simplify_binop({k: [rlhs, rrhs]})
elif k in QUANTS:
var, expr = v
new_expr = self.replace_const(expr, c, value)
- if new_expr in ['True', 'False']:
+ if new_expr in ["True", "False"]:
return new_expr
else:
return {k: [var, new_expr]}
@@ -323,16 +340,16 @@
raise ConstructNotHandled(k)
def shannon_expand(self, c, fol):
- lhs = self.replace_const(fol, c, 'True')
- rhs = self.replace_const(fol, c, 'False')
- not_c = {'not': c}
- rlhs = {'&': [c, lhs]}
+ lhs = self.replace_const(fol, c, "True")
+ rhs = self.replace_const(fol, c, "False")
+ not_c = {"not": c}
+ rlhs = {"&": [c, lhs]}
rlhs = self.simplify_binop(rlhs)
- rrhs = {'&': [not_c, rhs]}
+ rrhs = {"&": [not_c, rhs]}
rrhs = self.simplify_binop(rrhs)
- combined = {'|': [rlhs, rrhs]}
+ combined = {"|": [rlhs, rrhs]}
return self.simplify_binop(combined)
def hoist_quant(self, k, expr):
@@ -418,17 +435,24 @@
if not tag:
tag = gen_random_string()
- policy_function_name_template = 'policy_%s_' + '%(random_string)s' % {'random_string': tag}
+ policy_function_name_template = "policy_%s_" + "%(random_string)s" % {
+ "random_string": tag
+ }
policy_function_name = policy_function_name_template % policy_name
self.verdict_next()
function_str = """
def %(fn_name)s(obj, ctx):
if not %(vvar)s: raise XOSValidationError("%(message)s".format(obj=obj, ctx=ctx))
- """ % {'fn_name': policy_function_name, 'vvar': self.verdict_variable, 'message': message}
+ """ % {
+ "fn_name": policy_function_name,
+ "vvar": self.verdict_variable,
+ "message": message,
+ }
function_ast = self.str_to_ast(function_str)
- policy_code = self.gen_test(policy_function_name_template, fol, self.verdict_variable)
-
+ policy_code = self.gen_test(
+ policy_function_name_template, fol, self.verdict_variable
+ )
function_ast.body = [policy_code] + function_ast.body
@@ -438,17 +462,24 @@
if not tag:
tag = gen_random_string()
- policy_function_name_template = '%s_' + '%(random_string)s' % {'random_string': tag}
+ policy_function_name_template = "%s_" + "%(random_string)s" % {
+ "random_string": tag
+ }
policy_function_name = policy_function_name_template % policy_name
self.verdict_next()
function_str = """
def %(fn_name)s(obj, ctx):
return %(vvar)s
- """ % {'fn_name': policy_function_name, 'vvar': self.verdict_variable}
+ """ % {
+ "fn_name": policy_function_name,
+ "vvar": self.verdict_variable,
+ }
function_ast = self.str_to_ast(function_str)
- policy_code = self.gen_test(policy_function_name_template, fol, self.verdict_variable)
+ policy_code = self.gen_test(
+ policy_function_name_template, fol, self.verdict_variable
+ )
function_ast.body = [policy_code] + function_ast.body
@@ -456,11 +487,14 @@
def gen_test(self, fn_template, fol, verdict_var, bindings=None):
if isinstance(fol, str):
- return self.str_to_ast('%(verdict_var)s = %(constant)s' % {'verdict_var': verdict_var, 'constant': fol})
+ return self.str_to_ast(
+ "%(verdict_var)s = %(constant)s"
+ % {"verdict_var": verdict_var, "constant": fol}
+ )
(k, v), = fol.items()
- if k == 'policy':
+ if k == "policy":
policy_name, object_name = v
policy_fn = fn_template % policy_name
@@ -470,39 +504,48 @@
else:
# Everybody has access to null objects
%(verdict_var)s = True
- """ % {'verdict_var': verdict_var, 'policy_fn': policy_fn, 'object_name': object_name}
+ """ % {
+ "verdict_var": verdict_var,
+ "policy_fn": policy_fn,
+ "object_name": object_name,
+ }
call_ast = self.str_to_ast(call_str)
return call_ast
- if k == 'python':
+ if k == "python":
try:
expr_ast = self.str_to_ast(v)
except SyntaxError:
- raise PolicyException('Syntax error in %s' % v)
+ raise PolicyException("Syntax error in %s" % v)
if not isinstance(expr_ast, ast.Expr):
- raise PolicyException(
- '%s is not an expression' % expr_ast)
+ raise PolicyException("%s is not an expression" % expr_ast)
assignment_str = """
%(verdict_var)s = (%(escape_expr)s)
- """ % {'verdict_var': verdict_var, 'escape_expr': v}
+ """ % {
+ "verdict_var": verdict_var,
+ "escape_expr": v,
+ }
assignment_ast = self.str_to_ast(assignment_str)
return assignment_ast
- elif k == 'not':
+ elif k == "not":
top_vvar = verdict_var
self.verdict_next()
sub_vvar = self.verdict_variable
block = self.gen_test(fn_template, v, sub_vvar)
assignment_str = """
%(verdict_var)s = not (%(subvar)s)
- """ % {'verdict_var': top_vvar, 'subvar': sub_vvar}
+ """ % {
+ "verdict_var": top_vvar,
+ "subvar": sub_vvar,
+ }
assignment_ast = self.str_to_ast(assignment_str)
return ast.Module(body=[block, assignment_ast])
- elif k in ['=','in']:
+ elif k in ["=", "in"]:
# This is the simplest case, we don't recurse further
# To use terms that are not simple variables, use
# the Python escape, e.g. {{ slice.creator is not None }}
@@ -512,7 +555,7 @@
try:
for t in lhs, rhs:
- py_expr = t['python']
+ py_expr = t["python"]
self.verdict_next()
vv = self.verdict_variable
@@ -520,15 +563,17 @@
try:
expr_ast = self.str_to_ast(py_expr)
except SyntaxError:
- raise PolicyException('Syntax error in %s' % v)
+ raise PolicyException("Syntax error in %s" % v)
if not isinstance(expr_ast, ast.Expr):
- raise PolicyException(
- '%s is not an expression' % expr_ast)
+ raise PolicyException("%s is not an expression" % expr_ast)
assignment_str = """
%(verdict_var)s = (%(escape_expr)s)
- """ % {'verdict_var': vv, 'escape_expr': py_expr}
+ """ % {
+ "verdict_var": vv,
+ "escape_expr": py_expr,
+ }
if t == lhs:
lhs = vv
@@ -540,14 +585,19 @@
except TypeError:
pass
- if k=='=':
- operator='=='
- elif k=='in':
- operator='in'
+ if k == "=":
+ operator = "=="
+ elif k == "in":
+ operator = "in"
comparison_str = """
%(verdict_var)s = (%(lhs)s %(operator)s %(rhs)s)
- """ % {'verdict_var': verdict_var, 'lhs': lhs, 'rhs': rhs, 'operator':operator}
+ """ % {
+ "verdict_var": verdict_var,
+ "lhs": lhs,
+ "rhs": rhs,
+ "operator": operator,
+ }
comparison_ast = self.str_to_ast(comparison_str)
combined_ast = ast.Module(body=assignments + [comparison_ast])
@@ -567,24 +617,30 @@
lblock = self.gen_test(fn_template, lhs, lvar)
rblock = self.gen_test(fn_template, rhs, rvar)
- invert = ''
- if k == '&':
- binop = 'and'
- elif k == '|':
- binop = 'or'
- elif k == '->':
- binop = 'or'
- invert = 'not'
+ invert = ""
+ if k == "&":
+ binop = "and"
+ elif k == "|":
+ binop = "or"
+ elif k == "->":
+ binop = "or"
+ invert = "not"
binop_str = """
%(verdict_var)s = %(invert)s %(lvar)s %(binop)s %(rvar)s
- """ % {'verdict_var': top_vvar, 'invert': invert, 'lvar': lvar, 'binop': binop, 'rvar': rvar}
+ """ % {
+ "verdict_var": top_vvar,
+ "invert": invert,
+ "lvar": lvar,
+ "binop": binop,
+ "rvar": rvar,
+ }
binop_ast = self.str_to_ast(binop_str)
combined_ast = ast.Module(body=[lblock, rblock, binop_ast])
return combined_ast
- elif k == 'exists':
+ elif k == "exists":
# If the variable starts with a capital letter,
# we assume that it is a model. If it starts with
# a small letter, we assume it is an enumerable
@@ -599,7 +655,11 @@
python_str = """
%(verdict_var)s = not not %(model)s.objects.filter(%(query)s)
- """ % {'verdict_var': verdict_var, 'model': var, 'query': entry}
+ """ % {
+ "verdict_var": verdict_var,
+ "model": var,
+ "query": entry,
+ }
python_ast = ast.parse(python_str)
else:
@@ -608,16 +668,20 @@
python_str = """
%(verdict_var)s = filter(lambda __elt:%(query)s, %(model)s)
- """ % {'verdict_var': verdict_var, 'model': var, 'query': entry}
+ """ % {
+ "verdict_var": verdict_var,
+ "model": var,
+ "query": entry,
+ }
python_ast = ast.parse(python_str)
return python_ast
- elif k=='forall':
+ elif k == "forall":
var, expr = v
if var.istitle():
- f = self.fol_to_python_filter(var, expr, django=True, negate = True)
+ f = self.fol_to_python_filter(var, expr, django=True, negate=True)
entry = f.pop()
self.verdict_next()
@@ -625,71 +689,95 @@
python_str = """
%(verdict_var)s = not not %(model)s.objects.filter(%(query)s)
- """ % {'verdict_var': vvar, 'model': var, 'query': entry}
+ """ % {
+ "verdict_var": vvar,
+ "model": var,
+ "query": entry,
+ }
python_ast = ast.parse(python_str)
else:
- f = self.fol_to_python_filter(var, expr, django=False, negate = True)
+ f = self.fol_to_python_filter(var, expr, django=False, negate=True)
entry = f.pop()
python_str = """
%(verdict_var)s = next(elt for elt in %(model)s if %(query)s)
- """ % {'verdict_var': vvar, 'model': var, 'query': entry}
+ """ % {
+ "verdict_var": vvar,
+ "model": var,
+ "query": entry,
+ }
python_ast = ast.parse(python_str)
negate_str = """
%(verdict_var)s = not %(vvar)s
- """ % {'verdict_var': verdict_var, 'vvar': vvar}
+ """ % {
+ "verdict_var": verdict_var,
+ "vvar": vvar,
+ }
negate_ast = ast.parse(negate_str)
return ast.Module(body=[python_ast, negate_ast])
+
def xproto_fol_to_python_test(policy, fol, model, tag=None):
if isinstance(fol, jinja2.Undefined):
- raise Exception('Could not find policy:', policy)
+ raise Exception("Could not find policy:", policy)
f2p = FOL2Python()
fol_reduced = f2p.hoist_outer(fol)
- if fol_reduced in ['True','False'] and fol != fol_reduced:
- raise TrivialPolicy("Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s"%{'name':policy, 'reduced':fol_reduced})
+ if fol_reduced in ["True", "False"] and fol != fol_reduced:
+ raise TrivialPolicy(
+ "Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s" % {
+ "name": policy,
+ "reduced": fol_reduced})
- a = f2p.gen_test_function(fol_reduced, policy, tag='security_check')
+ a = f2p.gen_test_function(fol_reduced, policy, tag="security_check")
return astunparse.unparse(a)
+
def xproto_fol_to_python_validator(policy, fol, model, message, tag=None):
if isinstance(fol, jinja2.Undefined):
- raise Exception('Could not find policy:', policy)
+ raise Exception("Could not find policy:", policy)
f2p = FOL2Python()
fol_reduced = f2p.hoist_outer(fol)
- if fol_reduced in ['True','False'] and fol != fol_reduced:
- raise TrivialPolicy("Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s"%{'name':policy, 'reduced':fol_reduced})
+ if fol_reduced in ["True", "False"] and fol != fol_reduced:
+ raise TrivialPolicy(
+ "Policy %(name)s trivially reduces to %(reduced)s. If this is what you want, replace its contents with %(reduced)s" % {
+ "name": policy,
+ "reduced": fol_reduced})
- a = f2p.gen_validation_function(fol_reduced, policy, message, tag='validator')
-
+ a = f2p.gen_validation_function(fol_reduced, policy, message, tag="validator")
+
return astunparse.unparse(a)
+
def main():
while True:
- inp = ''
+ inp = ""
while True:
inp_line = raw_input()
- if inp_line=='EOF': break
- else: inp+=inp_line
-
+ if inp_line == "EOF":
+ break
+ else:
+ inp += inp_line
+
fol_lexer = lex.lex(module=FOLLexer())
- fol_parser = yacc.yacc(module=FOLParser(), start='goal', outputdir='/tmp', debug=0)
+ fol_parser = yacc.yacc(
+ module=FOLParser(), start="goal", outputdir="/tmp", debug=0
+ )
val = fol_parser.parse(inp, lexer=fol_lexer)
- a = xproto_fol_to_python_test('pol', val, 'output', 'Test')
- #f2p = FOL2Python()
- #a = f2p.hoist_outer(val)
- print a
+ a = xproto_fol_to_python_test("pol", val, "output", "Test")
+ # f2p = FOL2Python()
+ # a = f2p.hoist_outer(val)
+ print(a)
if __name__ == "__main__":
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/gui.py b/lib/xos-genx/xosgenx/jinja2_extensions/gui.py
index 50bcf0e..245bbda 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/gui.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/gui.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,74 +15,78 @@
from base import xproto_string_type, unquote
+
def xproto_type_to_ui_type(f):
try:
- content_type = f['options']['content_type']
+ content_type = f["options"]["content_type"]
content_type = eval(content_type)
- except:
+ except BaseException:
content_type = None
pass
- if 'choices' in f['options']:
- return 'select';
- elif content_type == 'date':
- return 'date'
- elif f['type'] == 'bool':
- return 'boolean'
- elif f['type'] == 'string':
- return xproto_string_type(f['options'])
- elif f['type'] in ['int','uint32','int32'] or 'link' in f:
- return 'number'
- elif f['type'] in ['double','float']:
- return 'string'
+ if "choices" in f["options"]:
+ return "select"
+ elif content_type == "date":
+ return "date"
+ elif f["type"] == "bool":
+ return "boolean"
+ elif f["type"] == "string":
+ return xproto_string_type(f["options"])
+ elif f["type"] in ["int", "uint32", "int32"] or "link" in f:
+ return "number"
+ elif f["type"] in ["double", "float"]:
+ return "string"
+
def xproto_options_choices_to_dict(choices):
list = []
for c in eval(choices):
- list.append({'id': c[0], 'label': c[1]})
+ list.append({"id": c[0], "label": c[1]})
if len(list) > 0:
return list
else:
return None
+
def xproto_validators(f):
# To be cleaned up when we formalize validation in xproto
validators = []
# bound-based validators
- bound_validators = [('max_length','maxlength'), ('min', 'min'), ('max', 'max')]
+ bound_validators = [("max_length", "maxlength"), ("min", "min"), ("max", "max")]
for v0, v1 in bound_validators:
try:
- validators.append({'name':v1, 'int_value':int(f['options'][v0])})
+ validators.append({"name": v1, "int_value": int(f["options"][v0])})
except KeyError:
pass
# validators based on content_type
- content_type_validators = ['ip', 'url', 'email']
+ content_type_validators = ["ip", "url", "email"]
for v in content_type_validators:
- #if f['name']=='ip': pdb.set_trace()
+ # if f['name']=='ip': pdb.set_trace()
try:
- val = unquote(f['options']['content_type'])==v
+ val = unquote(f["options"]["content_type"]) == v
if not val:
raise KeyError
- validators.append({'name':v, 'bool_value': True})
+ validators.append({"name": v, "bool_value": True})
except KeyError:
pass
# required validator
try:
- required = f['options']['blank']=='False' and f['options']['null']=='False'
+ required = f["options"]["blank"] == "False" and f["options"]["null"] == "False"
if required:
- validators.append({'name':'required', 'bool_value':required})
+ validators.append({"name": "required", "bool_value": required})
except KeyError:
pass
return validators
+
def is_number(s):
try:
float(s)
@@ -91,16 +94,17 @@
except ValueError:
return False
+
def xproto_default_to_gui(default):
val = "null"
if is_number(default):
val = str(default)
- elif eval(default) == True:
- val = 'true'
- elif eval(default) == False:
- val = 'false'
- elif eval(default) == None:
- val = 'null'
+ elif eval(default) is True:
+ val = "true"
+ elif eval(default) is False:
+ val = "false"
+ elif eval(default) is None:
+ val = "null"
else:
val = str(default)
return val
@@ -111,17 +115,20 @@
seen = []
for l in llst:
try:
- t = l['link_type']
- except KeyError, e:
+ t = l["link_type"]
+ except KeyError as e:
raise e
- if l['peer']['fqn'] not in seen and t != 'manytomany':
- on_field = 'null'
- if l['link_type'] == 'manytoone':
- on_field = l['src_port']
- elif l['link_type'] == 'onetomany':
- on_field = l['dst_port']
- outlist.append('- {model: %s, type: %s, on_field: %s}\n' % (l['peer']['name'], l['link_type'], on_field))
- seen.append(l['peer'])
+ if l["peer"]["fqn"] not in seen and t != "manytomany":
+ on_field = "null"
+ if l["link_type"] == "manytoone":
+ on_field = l["src_port"]
+ elif l["link_type"] == "onetomany":
+ on_field = l["dst_port"]
+ outlist.append(
+ "- {model: %s, type: %s, on_field: %s}\n"
+ % (l["peer"]["name"], l["link_type"], on_field)
+ )
+ seen.append(l["peer"])
return outlist
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/tosca.py b/lib/xos-genx/xosgenx/jinja2_extensions/tosca.py
index 996d63d..d8eada7 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/tosca.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/tosca.py
@@ -14,12 +14,14 @@
from xosgenx.jinja2_extensions import xproto_field_graph_components
+
def xproto_tosca_required(null, blank, default=None):
- if null == 'True' or blank == 'True' or default != 'False':
+ if null == "True" or blank == "True" or default != "False":
return "false"
return "true"
+
def xproto_tosca_field_type(type):
"""
TOSCA requires fields of type 'bool' to be 'boolean'
@@ -33,30 +35,41 @@
else:
return type
+
def xproto_fields_to_tosca_keys(fields, m):
keys = []
# look for one_of keys
- _one_of = xproto_field_graph_components(fields, m, 'tosca_key_one_of')
+ _one_of = xproto_field_graph_components(fields, m, "tosca_key_one_of")
one_of = [list(i) for i in _one_of]
# look for explicit keys
for f in fields:
- if 'tosca_key' in f['options'] and f['options']['tosca_key'] and 'link' not in f:
- keys.append(f['name'])
- if 'tosca_key' in f['options'] and f['options']['tosca_key'] and ('link' in f and f['link']):
- keys.append("%s_id" % f['name'])
+ if (
+ "tosca_key" in f["options"]
+ and f["options"]["tosca_key"]
+ and "link" not in f
+ ):
+ keys.append(f["name"])
+ if (
+ "tosca_key" in f["options"]
+ and f["options"]["tosca_key"]
+ and ("link" in f and f["link"])
+ ):
+ keys.append("%s_id" % f["name"])
# if not keys are specified and there is a name field, use that as key.
- if len(keys) == 0 and 'name' in map(lambda f: f['name'], fields):
- keys.append('name')
+ if len(keys) == 0 and "name" in map(lambda f: f["name"], fields):
+ keys.append("name")
for of in one_of:
# check if the field is a link, and in case add _id
for index, f in enumerate(of):
try:
- field = [x for x in fields if x['name'] == f and ('link' in x and x['link'])][0]
+ field = [
+ x for x in fields if x["name"] == f and ("link" in x and x["link"])
+ ][0]
of[index] = "%s_id" % f
- except IndexError, e:
+ except IndexError as e:
# the field is not a link
pass
diff --git a/lib/xos-genx/xosgenx/proto2xproto.py b/lib/xos-genx/xosgenx/proto2xproto.py
index d2bde67..f1dc959 100644
--- a/lib/xos-genx/xosgenx/proto2xproto.py
+++ b/lib/xos-genx/xosgenx/proto2xproto.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,78 +26,90 @@
import ply.lex as lex
import ply.yacc as yacc
+
class Stack(list):
- def push(self,x):
+ def push(self, x):
self.append(x)
+
def str_to_dict(s):
- lst = s.rsplit('.',1)
+ lst = s.rsplit(".", 1)
name = lst[-1]
- if len(lst)==2:
+ if len(lst) == 2:
package = lst[0]
else:
- package = ''
+ package = ""
- return {'name': name, 'package': package, 'fqn': s}
+ return {"name": name, "package": package, "fqn": s}
def replace_link(obj):
+ try:
+ link = obj.link
try:
- link = obj.link
- try:
- through = link['through']
- except KeyError:
- through = None
+ through = link["through"]
+ except KeyError:
+ through = None
- try:
- through_str = through[1:-1]
- except TypeError:
- through_str = None
+ try:
+ through_str = through[1:-1]
+ except TypeError:
+ through_str = None
- if through_str:
- through_dict = str_to_dict(through_str)
- else:
- through_dict = {}
+ if through_str:
+ through_dict = str_to_dict(through_str)
+ else:
+ through_dict = {}
- model_dict = str_to_dict(link['model'][1:-1])
+ model_dict = str_to_dict(link["model"][1:-1])
- ls = m.LinkSpec(obj, m.LinkDefinition(link['link'][1:-1],obj.name,model_dict,link['port'][1:-1],through_dict))
- return ls
- except:
- return obj
+ ls = m.LinkSpec(
+ obj,
+ m.LinkDefinition(
+ link["link"][1:-1],
+ obj.name,
+ model_dict,
+ link["port"][1:-1],
+ through_dict,
+ ),
+ )
+ return ls
+ except BaseException:
+ return obj
+
class Proto2XProto(Visitor):
fol_lexer = lex.lex(module=FOLLexer())
- fol_parser = yacc.yacc(module=FOLParser(), start='goal', debug=0, outputdir='/tmp')
+ fol_parser = yacc.yacc(module=FOLParser(), start="goal", debug=0, outputdir="/tmp")
def __init__(self):
super(Proto2XProto, self).__init__()
self.stack = Stack()
self.count_stack = Stack()
- self.content=""
- self.offset=0
- self.statementsChanged=0
+ self.content = ""
+ self.offset = 0
+ self.statementsChanged = 0
self.message_options = {}
self.options = {}
self.current_message_name = None
- self.xproto_message_options = ['bases']
- self.xproto_field_options = ['model']
+ self.xproto_message_options = ["bases"]
+ self.xproto_field_options = ["model"]
self.verbose = 0
self.first_field = True
self.first_method = True
-
+
def replace_policy(self, obj):
if isinstance(obj, m.OptionStatement):
rhs = obj.value.value.pval
if rhs.startswith('"') and rhs.endswith('"'):
rhs = rhs[1:-1]
- if rhs.startswith('policy:'):
- str = rhs.split(':',1)[1]
- val = self.fol_parser.parse(str, lexer = self.fol_lexer)
+ if rhs.startswith("policy:"):
+ str = rhs.split(":", 1)[1]
+ val = self.fol_parser.parse(str, lexer=self.fol_lexer)
return m.PolicyDefinition(obj.name, val)
@@ -110,9 +121,9 @@
for fd in obj.fieldDirective:
k = fd.pval.name.value.pval
v = fd.pval.value.value.pval
- opts[k]=v
+ opts[k] = v
- if ('model' in opts and 'link' in opts and 'port' in opts):
+ if "model" in opts and "link" in opts and "port" in opts:
obj.link = opts
pass
except KeyError:
@@ -121,41 +132,42 @@
def proto_to_xproto_message(self, obj):
try:
try:
- bases = self.message_options['bases'].split(',')
+ bases = self.message_options["bases"].split(",")
except KeyError:
bases = []
- bases = map(lambda x:str_to_dict(x[1:-1]), bases)
+ bases = map(lambda x: str_to_dict(x[1:-1]), bases)
obj.bases = bases
except KeyError:
raise
def map_field(self, obj, s):
- if 'model' in s:
- link = m.LinkDefinition('onetoone','src','name','dst', obj.linespan, obj.lexspan, obj.p)
+ if "model" in s:
+ link = m.LinkDefinition(
+ "onetoone", "src", "name", "dst", obj.linespan, obj.lexspan, obj.p
+ )
lspec = m.LinkSpec(link, obj)
else:
lspec = obj
return lspec
-
def get_stack(self):
return self.stack
def visit_PackageStatement(self, obj):
- '''Ignore'''
+ """Ignore"""
return True
def visit_ImportStatement(self, obj):
- '''Ignore'''
+ """Ignore"""
return True
def visit_OptionStatement(self, obj):
- if (self.current_message_name):
+ if self.current_message_name:
k = obj.name.value.pval
self.message_options[k] = obj.value.value.pval
- if (k in self.xproto_message_options):
- obj.mark_for_deletion = True
+ if k in self.xproto_message_options:
+ obj.mark_for_deletion = True
else:
self.options[obj.name.value.pval] = obj.value.value.pval
@@ -197,10 +209,10 @@
self.message_options = {}
return True
-
+
def visit_MessageDefinition_post(self, obj):
self.proto_to_xproto_message(obj)
- obj.body = filter(lambda x:not hasattr(x, 'mark_for_deletion'), obj.body)
+ obj.body = filter(lambda x: not hasattr(x, "mark_for_deletion"), obj.body)
obj.body = map(replace_link, obj.body)
self.current_message_name = None
@@ -230,7 +242,7 @@
def visit_Proto(self, obj):
self.count_stack.push(len(obj.body))
return True
-
+
def visit_Proto_post(self, obj):
obj.body = [self.replace_policy(o) for o in obj.body]
diff --git a/lib/xos-genx/xosgenx/version.py b/lib/xos-genx/xosgenx/version.py
index a118c43..79c5702 100644
--- a/lib/xos-genx/xosgenx/version.py
+++ b/lib/xos-genx/xosgenx/version.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/lib/xos-genx/xosgenx/xos2jinja.py b/lib/xos-genx/xosgenx/xos2jinja.py
index aca2468..471be93 100644
--- a/lib/xos-genx/xosgenx/xos2jinja.py
+++ b/lib/xos-genx/xosgenx/xos2jinja.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,6 +13,7 @@
# limitations under the License.
+from __future__ import print_function
import plyxproto.model as m
from plyxproto.helpers import Visitor
import argparse
@@ -25,15 +25,20 @@
import copy
import pdb
+
class MissingPolicyException(Exception):
pass
+
def find_missing_policy_calls(name, policies, policy):
if isinstance(policy, dict):
(k, lst), = policy.items()
- if k=='policy':
+ if k == "policy":
policy_name = lst[0]
- if policy_name not in policies: raise MissingPolicyException("Policy %s invoked missing policy %s"%(name, policy_name))
+ if policy_name not in policies:
+ raise MissingPolicyException(
+ "Policy %s invoked missing policy %s" % (name, policy_name)
+ )
else:
for p in lst:
find_missing_policy_calls(name, policies, p)
@@ -41,26 +46,27 @@
for p in lst:
find_missing_policy_calls(name, policies, p)
+
def dotname_to_fqn(dotname):
b_names = [part.pval for part in dotname]
- package = '.'.join(b_names[:-1])
+ package = ".".join(b_names[:-1])
name = b_names[-1]
if package:
- fqn = package + '.' + name
+ fqn = package + "." + name
else:
fqn = name
- return {'name': name, 'fqn': fqn, 'package': package}
+ return {"name": name, "fqn": fqn, "package": package}
def dotname_to_name(dotname):
b_names = [part.pval for part in dotname]
- return '.'.join(b_names)
+ return ".".join(b_names)
def count_messages(body):
count = 0
for e in body:
- if (type(e) == m.MessageDefinition):
+ if isinstance(e, m.MessageDefinition):
count += 1
return count
@@ -68,7 +74,7 @@
def count_fields(body):
count = 0
for e in body:
- if (type(e) in [m.LinkDefinition, m.FieldDefinition, m.LinkSpec]):
+ if type(e) in [m.LinkDefinition, m.FieldDefinition, m.LinkSpec]:
count += 1
return count
@@ -90,8 +96,9 @@
self.append(x)
-''' XOS2Jinja overrides the underlying visitor pattern to transform the tree
- in addition to traversing it '''
+""" XOS2Jinja overrides the underlying visitor pattern to transform the tree
+ in addition to traversing it """
+
class XOS2Jinja(Visitor):
def __init__(self, args):
@@ -114,7 +121,7 @@
def visit_PolicyDefinition(self, obj):
if self.package:
- pname = '.'.join([self.package, obj.name.value.pval])
+ pname = ".".join([self.package, obj.name.value.pval])
else:
pname = obj.name.value.pval
@@ -126,17 +133,17 @@
def visit_PackageStatement(self, obj):
dotlist = obj.name.value
dotlist2 = [f.pval for f in dotlist]
- dotstr = '.'.join(dotlist2)
+ dotstr = ".".join(dotlist2)
self.package = dotstr
return True
def visit_ImportStatement(self, obj):
- '''Ignore'''
+ """Ignore"""
return True
def visit_OptionStatement(self, obj):
- if not hasattr(obj, 'mark_for_deletion'):
- if (self.current_message_name):
+ if not hasattr(obj, "mark_for_deletion"):
+ if self.current_message_name:
self.message_options[obj.name.value.pval] = obj.value.value.pval
else:
self.options[obj.name.value.pval] = obj.value.value.pval
@@ -159,7 +166,7 @@
except AttributeError:
name = obj.name.value
- if type(obj.value) == list:
+ if isinstance(obj.value, list):
value = dotname_to_name(obj.value)
else:
value = name_to_value(obj)
@@ -168,52 +175,52 @@
return True
def visit_FieldType(self, obj):
- '''Field type, if type is name, then it may need refactoring consistent with refactoring rules according to the table'''
+ """Field type, if type is name, then it may need refactoring consistent with refactoring rules according to the table"""
return True
def visit_LinkDefinition(self, obj):
s = {}
try:
- s['link_type'] = obj.link_type.pval
+ s["link_type"] = obj.link_type.pval
except AttributeError:
- s['link_type'] = obj.link_type
+ s["link_type"] = obj.link_type
- s['src_port'] = obj.src_port.value.pval
- s['name'] = obj.src_port.value.pval
+ s["src_port"] = obj.src_port.value.pval
+ s["name"] = obj.src_port.value.pval
try:
- s['policy'] = obj.policy.pval
+ s["policy"] = obj.policy.pval
except AttributeError:
- s['policy'] = None
+ s["policy"] = None
try:
- s['dst_port'] = obj.dst_port.value.pval
+ s["dst_port"] = obj.dst_port.value.pval
except AttributeError:
- s['dst_port'] = obj.dst_port
+ s["dst_port"] = obj.dst_port
- if type(obj.through) == list:
- s['through'] = dotname_to_fqn(obj.through)
+ if isinstance(obj.through, list):
+ s["through"] = dotname_to_fqn(obj.through)
else:
try:
- s['through'] = obj.through.pval
+ s["through"] = obj.through.pval
except AttributeError:
- s['through'] = obj.through
+ s["through"] = obj.through
- if type(obj.name) == list:
- s['peer'] = dotname_to_fqn(obj.name)
+ if isinstance(obj.name, list):
+ s["peer"] = dotname_to_fqn(obj.name)
else:
try:
- s['peer'] = obj.name.pval
+ s["peer"] = obj.name.pval
except AttributeError:
- s['peer'] = obj.name
+ s["peer"] = obj.name
try:
- s['reverse_id'] = obj.reverse_id.pval
+ s["reverse_id"] = obj.reverse_id.pval
except AttributeError:
- s['reverse_id'] = obj.reverse_id
+ s["reverse_id"] = obj.reverse_id
- s['_type'] = 'link'
- s['options'] = {'modifier': 'optional'}
+ s["_type"] = "link"
+ s["options"] = {"modifier": "optional"}
self.stack.push(s)
return True
@@ -226,21 +233,21 @@
s = {}
if isinstance(obj.ftype, m.Name):
- s['type'] = obj.ftype.value
+ s["type"] = obj.ftype.value
else:
- s['type'] = obj.ftype.name.pval
+ s["type"] = obj.ftype.name.pval
- s['name'] = obj.name.value.pval
+ s["name"] = obj.name.value.pval
try:
- s['policy'] = obj.policy.pval
+ s["policy"] = obj.policy.pval
except AttributeError:
- s['policy'] = None
+ s["policy"] = None
- s['modifier'] = obj.field_modifier.pval
- s['id'] = obj.fieldId.pval
+ s["modifier"] = obj.field_modifier.pval
+ s["id"] = obj.fieldId.pval
- opts = {'modifier': s['modifier']}
+ opts = {"modifier": s["modifier"]}
n = self.count_stack.pop()
for i in range(0, n):
k, v = self.stack.pop()
@@ -253,28 +260,28 @@
opts[k] = v
- s['options'] = opts
+ s["options"] = opts
try:
- last_link = self.stack[-1]['_type']
- if (last_link == 'link'):
- s['link'] = True
- except:
+ last_link = self.stack[-1]["_type"]
+ if last_link == "link":
+ s["link"] = True
+ except BaseException:
pass
- s['_type'] = 'field'
+ s["_type"] = "field"
self.stack.push(s)
return True
def visit_EnumFieldDefinition(self, obj):
if self.verbose > 4:
- print "\tEnumField: name=%s, %s" % (obj.name, obj)
+ print("\tEnumField: name=%s, %s" % (obj.name, obj))
return True
def visit_EnumDefinition(self, obj):
- '''New enum definition, refactor name'''
+ """New enum definition, refactor name"""
if self.verbose > 3:
- print "Enum, [%s] body=%s\n\n" % (obj.name, obj.body)
+ print("Enum, [%s] body=%s\n\n" % (obj.name, obj.body))
return True
@@ -297,28 +304,39 @@
last_field = {}
for i in range(0, stack_num):
f = self.stack.pop()
- if (f['_type'] == 'link'):
- f['options'] = {i: d[i] for d in [f['options'], last_field['options']] for i in d}
- assert (last_field == fields[0])
- fields[0].setdefault('options', {})['link_type'] = f['link_type']
+ if f["_type"] == "link":
+ f["options"] = {
+ i: d[i] for d in [f["options"], last_field["options"]] for i in d
+ }
+ assert last_field == fields[0]
+ fields[0].setdefault("options", {})["link_type"] = f["link_type"]
links.insert(0, f)
else:
fields.insert(0, f)
last_field = f
if self.package:
- model_name = '.'.join([self.package, obj.name.value.pval])
+ model_name = ".".join([self.package, obj.name.value.pval])
else:
model_name = obj.name.value.pval
- model_def = {'name':obj.name.value.pval,'fields':fields,'links':links, 'bases':obj.bases, 'options':self.message_options, 'package':self.package, 'fqn': model_name, 'rlinks': []}
+ model_def = {
+ "name": obj.name.value.pval,
+ "fields": fields,
+ "links": links,
+ "bases": obj.bases,
+ "options": self.message_options,
+ "package": self.package,
+ "fqn": model_name,
+ "rlinks": [],
+ }
try:
- model_def['policy'] = obj.policy.pval
+ model_def["policy"] = obj.policy.pval
except AttributeError:
- model_def['policy'] = None
+ model_def["policy"] = None
self.stack.push(model_def)
-
+
self.models[model_name] = model_def
# Set message options
@@ -382,40 +400,53 @@
rev_links = {}
link_opposite = {
- 'manytomany': 'manytomany',
- 'manytoone': 'onetomany',
- 'onetoone': 'onetoone',
- 'onetomany': 'manytoone'
+ "manytomany": "manytomany",
+ "manytoone": "onetomany",
+ "onetoone": "onetoone",
+ "onetomany": "manytoone",
}
for m in messages:
- for l in m['links']:
+ for l in m["links"]:
rlink = copy.deepcopy(l)
- rlink['_type'] = 'rlink' # An implicit link, not declared in the model
- rlink['src_port'] = l['dst_port']
- rlink['dst_port'] = l['src_port']
- rlink['peer'] = {'name': m['name'], 'package': m['package'], 'fqn': m['fqn']}
- rlink['link_type'] = link_opposite[l['link_type']]
- rlink["reverse_id"] = l['reverse_id']
+ rlink["_type"] = "rlink" # An implicit link, not declared in the model
+ rlink["src_port"] = l["dst_port"]
+ rlink["dst_port"] = l["src_port"]
+ rlink["peer"] = {
+ "name": m["name"],
+ "package": m["package"],
+ "fqn": m["fqn"],
+ }
+ rlink["link_type"] = link_opposite[l["link_type"]]
+ rlink["reverse_id"] = l["reverse_id"]
- if (not l['reverse_id']) and (self.args.verbosity >= 1):
- print >> sys.stderr, "WARNING: Field %s in model %s has no reverse_id" % (l["src_port"], m["name"])
+ if (not l["reverse_id"]) and (self.args.verbosity >= 1):
+ print(
+ "WARNING: Field %s in model %s has no reverse_id"
+ % (l["src_port"], m["name"]),
+ file=sys.stderr,
+ )
- if l["reverse_id"] and ((int(l["reverse_id"]) < 1000) or (int(l["reverse_id"]) >= 1900)):
- raise Exception("reverse id for field %s in model %s should be between 1000 and 1899" % (l["src_port"], m["name"]))
+ if l["reverse_id"] and (
+ (int(l["reverse_id"]) < 1000) or (int(l["reverse_id"]) >= 1900)
+ ):
+ raise Exception(
+ "reverse id for field %s in model %s should be between 1000 and 1899"
+ % (l["src_port"], m["name"])
+ )
try:
try:
- rev_links[l['peer']['fqn']].append(rlink)
+ rev_links[l["peer"]["fqn"]].append(rlink)
except TypeError:
pass
except KeyError:
- rev_links[l['peer']['fqn']] = [rlink]
+ rev_links[l["peer"]["fqn"]] = [rlink]
for m in messages:
try:
- m['rlinks'] = rev_links[m['name']]
- message_dict[m['name']]['rlinks'] = m['rlinks']
+ m["rlinks"] = rev_links[m["name"]]
+ message_dict[m["name"]]["rlinks"] = m["rlinks"]
except KeyError:
pass
diff --git a/lib/xos-genx/xosgenx/xosgen.py b/lib/xos-genx/xosgenx/xosgen.py
index 27a7fb4..1318242 100755
--- a/lib/xos-genx/xosgenx/xosgen.py
+++ b/lib/xos-genx/xosgenx/xosgen.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,49 +15,116 @@
#!/usr/bin/python
+from __future__ import print_function
import argparse
from generator import *
from version import __version__
-parse = argparse.ArgumentParser(description='XOS Generative Toolchain')
-parse.add_argument('--rev', dest='rev', action='store_true',
- default=XOSProcessorArgs.default_rev, help='Convert proto to xproto')
-parse.add_argument('--output', dest='output', action='store',
- default=XOSProcessorArgs.default_output, help='Destination dir')
-parse.add_argument('--attic', dest='attic', action='store',
- default=XOSProcessorArgs.default_attic, help='The location at which static files are stored')
-parse.add_argument('--kvpairs', dest='kv', action='store',
- default=XOSProcessorArgs.default_kvpairs, help='Key value pairs to make available to the target')
-parse.add_argument('--write-to-file', dest='write_to_file', choices = ['single', 'model', 'target'], action='store',
- default=XOSProcessorArgs.default_write_to_file,
- help='Single output file (single) or output file per model (model) or let target decide (target)')
-parse.add_argument('--version', action='version', version=__version__)
-parse.add_argument("-v", "--verbosity", action="count",
- default=XOSProcessorArgs.default_verbosity, help="increase output verbosity")
-parse.add_argument("--include-models", dest="include_models", action="append",
- default=XOSProcessorArgs.default_include_models, help="list of models to include")
-parse.add_argument("--include-apps", dest="include_apps", action="append",
- default=XOSProcessorArgs.default_include_apps, help="list of models to include")
+parse = argparse.ArgumentParser(description="XOS Generative Toolchain")
+parse.add_argument(
+ "--rev",
+ dest="rev",
+ action="store_true",
+ default=XOSProcessorArgs.default_rev,
+ help="Convert proto to xproto",
+)
+parse.add_argument(
+ "--output",
+ dest="output",
+ action="store",
+ default=XOSProcessorArgs.default_output,
+ help="Destination dir",
+)
+parse.add_argument(
+ "--attic",
+ dest="attic",
+ action="store",
+ default=XOSProcessorArgs.default_attic,
+ help="The location at which static files are stored",
+)
+parse.add_argument(
+ "--kvpairs",
+ dest="kv",
+ action="store",
+ default=XOSProcessorArgs.default_kvpairs,
+ help="Key value pairs to make available to the target",
+)
+parse.add_argument(
+ "--write-to-file",
+ dest="write_to_file",
+ choices=["single", "model", "target"],
+ action="store",
+ default=XOSProcessorArgs.default_write_to_file,
+ help="Single output file (single) or output file per model (model) or let target decide (target)",
+)
+parse.add_argument("--version", action="version", version=__version__)
+parse.add_argument(
+ "-v",
+ "--verbosity",
+ action="count",
+ default=XOSProcessorArgs.default_verbosity,
+ help="increase output verbosity",
+)
+parse.add_argument(
+ "--include-models",
+ dest="include_models",
+ action="append",
+ default=XOSProcessorArgs.default_include_models,
+ help="list of models to include",
+)
+parse.add_argument(
+ "--include-apps",
+ dest="include_apps",
+ action="append",
+ default=XOSProcessorArgs.default_include_apps,
+ help="list of models to include",
+)
group = parse.add_mutually_exclusive_group()
-group.add_argument('--dest-file', dest='dest_file', action='store',
- default=XOSProcessorArgs.default_dest_file, help='Output file name (if write-to-file is set to single)')
-group.add_argument('--dest-extension', dest='dest_extension', action='store',
- default=XOSProcessorArgs.default_dest_extension, help='Output file extension (if write-to-file is set to single)')
+group.add_argument(
+ "--dest-file",
+ dest="dest_file",
+ action="store",
+ default=XOSProcessorArgs.default_dest_file,
+ help="Output file name (if write-to-file is set to single)",
+)
+group.add_argument(
+ "--dest-extension",
+ dest="dest_extension",
+ action="store",
+ default=XOSProcessorArgs.default_dest_extension,
+ help="Output file extension (if write-to-file is set to single)",
+)
group = parse.add_mutually_exclusive_group(required=True)
-group.add_argument('--target', dest='target', action='store',
- default=XOSProcessorArgs.default_target, help='Output format, corresponding to <output>.yaml file')
-group.add_argument('--checkers', dest='checkers', action='store',
- default=XOSProcessorArgs.default_checkers, help='Comma-separated list of static checkers')
+group.add_argument(
+ "--target",
+ dest="target",
+ action="store",
+ default=XOSProcessorArgs.default_target,
+ help="Output format, corresponding to <output>.yaml file",
+)
+group.add_argument(
+ "--checkers",
+ dest="checkers",
+ action="store",
+ default=XOSProcessorArgs.default_checkers,
+ help="Comma-separated list of static checkers",
+)
-parse.add_argument('files', metavar='<input file>', nargs='+', action='store', help='xproto files to compile')
+parse.add_argument(
+ "files",
+ metavar="<input file>",
+ nargs="+",
+ action="store",
+ help="xproto files to compile",
+)
CHECK = 1
GEN = 2
-class XosGen:
+class XosGen:
@staticmethod
def init(args=None):
if not args:
@@ -68,77 +134,94 @@
if args.target:
op = GEN
- subdir = '/targets/'
+ subdir = "/targets/"
elif args.checkers:
op = CHECK
- subdir = '/checkers/'
+ subdir = "/checkers/"
else:
parse.error("At least one of --target and --checkers is required")
- operators = args.checkers.split(',') if hasattr(args, 'checkers') and args.checkers else [args.target]
+ operators = (
+ args.checkers.split(",")
+ if hasattr(args, "checkers") and args.checkers
+ else [args.target]
+ )
for i in xrange(len(operators)):
- if not '/' in operators[i]:
+ if "/" not in operators[i]:
# if the target is not a path, it refer to a library included one
- operators[i] = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + subdir + operators[i])
+ operators[i] = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + subdir + operators[i]
+ )
if not os.path.isabs(operators[i]):
- operators[i] = os.path.abspath(os.getcwd() + '/' + operators[i])
+ operators[i] = os.path.abspath(os.getcwd() + "/" + operators[i])
if op == GEN:
# convert output to absolute path
if args.output is not None and not os.path.isabs(args.output):
- args.output = os.path.abspath(os.getcwd() + '/' + args.output)
+ args.output = os.path.abspath(os.getcwd() + "/" + args.output)
operator = operators[0]
-
+
# check if there's a line that starts with +++ in the target
# if so, then the output file names are left to the target to decide
# also, if dest-file or dest-extension are supplied, then an error is generated.
- plusplusplus = reduce(lambda acc, line: True if line.startswith('+++') else acc, open(operator).read().splitlines(), False)
+ plusplusplus = reduce(
+ lambda acc, line: True if line.startswith("+++") else acc,
+ open(operator).read().splitlines(),
+ False,
+ )
- if plusplusplus and args.write_to_file != 'target':
- parse.error('%s chooses the names of the files that it generates, you must set --write-to-file to "target"' % operator)
+ if plusplusplus and args.write_to_file != "target":
+ parse.error(
+ '%s chooses the names of the files that it generates, you must set --write-to-file to "target"'
+ % operator
+ )
- if args.write_to_file != 'single' and (args.dest_file):
- parse.error('--dest-file requires --write-to-file to be set to "single"')
+ if args.write_to_file != "single" and (args.dest_file):
+ parse.error(
+ '--dest-file requires --write-to-file to be set to "single"'
+ )
- if args.write_to_file != 'model' and (args.dest_extension):
- parse.error('--dest-extension requires --write-to-file to be set to "model"')
-
+ if args.write_to_file != "model" and (args.dest_extension):
+ parse.error(
+ '--dest-extension requires --write-to-file to be set to "model"'
+ )
+
else:
if args.write_to_file or args.dest_extension:
- parse.error('Checkers cannot write to files')
+ parse.error("Checkers cannot write to files")
inputs = []
for fname in args.files:
if not os.path.isabs(fname):
- inputs.append(os.path.abspath(os.getcwd() + '/' + fname))
+ inputs.append(os.path.abspath(os.getcwd() + "/" + fname))
else:
inputs.append(fname)
args.files = inputs
- if op==GEN:
+ if op == GEN:
generated = XOSProcessor.process(args, operators[0])
if not args.output and not args.write_to_file:
- print generated
- elif op==CHECK:
+ print(generated)
+ elif op == CHECK:
for o in operators:
verdict_str = XOSProcessor.process(args, o)
- vlst = verdict_str.split('\n')
+ vlst = verdict_str.split("\n")
try:
verdict = next(v for v in vlst if v.strip())
- status_code, status_string = verdict.split(' ', 1)
+ status_code, status_string = verdict.split(" ", 1)
status_code = int(status_code)
- except:
- print "Checker %s returned mangled output" % o
+ except BaseException:
+ print("Checker %s returned mangled output" % o)
exit(1)
if status_code != 200:
- print '%s: %s - %s' % (o, status_code, status_string)
+ print("%s: %s - %s" % (o, status_code, status_string))
exit(1)
else:
- print '%s: OK'%o
+ print("%s: OK" % o)