[SEBA-412] Automated reformat of Python code
Ran passes of modernize, autopep8, and black, then checked the result with flake8.
flake8 + manual fixes applied to:
lib/xos-config
lib/xos-kafka
lib/xos-util
xos/coreapi
xos/api
xos/xos_client
Change-Id: Ib23cf84cb13beb3c6381fa0d79594dc9131dc815
diff --git a/lib/xos-genx/xosgenx/jinja2_extensions/base.py b/lib/xos-genx/xosgenx/jinja2_extensions/base.py
index e11d2ec..96e8dc2 100644
--- a/lib/xos-genx/xosgenx/jinja2_extensions/base.py
+++ b/lib/xos-genx/xosgenx/jinja2_extensions/base.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,49 +13,56 @@
# limitations under the License.
+from __future__ import print_function
import pdb
import re
from inflect import engine as inflect_engine_class
inflect_engine = inflect_engine_class()
+
class FieldNotFound(Exception):
def __init__(self, message):
super(FieldNotFound, self).__init__(message)
+
def xproto_debug(**kwargs):
- print kwargs
+ print(kwargs)
pdb.set_trace()
+
def xproto_unquote(s):
return unquote(s)
+
def unquote(s):
- if (s.startswith('"') and s.endswith('"')):
+ if s.startswith('"') and s.endswith('"'):
return s[1:-1]
else:
return s
+
def xproto_singularize(field):
try:
# The user has set a singular, as an exception that cannot be handled automatically
- singular = field['options']['singular']
+ singular = field["options"]["singular"]
singular = unquote(singular)
except KeyError:
- singular = inflect_engine.singular_noun(field['name'])
+ singular = inflect_engine.singular_noun(field["name"])
if singular is False:
# singular_noun returns False on a noun it can't singularize
singular = field["name"]
return singular
+
def xproto_singularize_pluralize(field):
try:
# The user has set a plural, as an exception that cannot be handled automatically
- plural = field['options']['plural']
+ plural = field["options"]["plural"]
plural = unquote(plural)
except KeyError:
- singular = inflect_engine.singular_noun(field['name'])
+ singular = inflect_engine.singular_noun(field["name"])
if singular is False:
# singular_noun returns False on a noun it can't singularize
singular = field["name"]
@@ -65,86 +71,103 @@
return plural
+
def xproto_pluralize(field):
try:
# The user has set a plural, as an exception that cannot be handled automatically
- plural = field['options']['plural']
+ plural = field["options"]["plural"]
plural = unquote(plural)
except KeyError:
- plural = inflect_engine.plural_noun(field['name'])
+ plural = inflect_engine.plural_noun(field["name"])
return plural
-def xproto_base_def(model_name, base, suffix='', suffix_list=[]):
- if (model_name=='XOSBase'):
- return '(models.Model, PlModelMixIn)'
- elif (not base):
- return ''
+
+def xproto_base_def(model_name, base, suffix="", suffix_list=[]):
+ if model_name == "XOSBase":
+ return "(models.Model, PlModelMixIn)"
+ elif not base:
+ return ""
else:
- int_base = [i['name']+suffix for i in base if i['name'] in suffix_list]
- ext_base = [i['name'] for i in base if i['name'] not in suffix_list]
- return '(' + ','.join(int_base + ext_base) + ')'
+ int_base = [i["name"] + suffix for i in base if i["name"] in suffix_list]
+ ext_base = [i["name"] for i in base if i["name"] not in suffix_list]
+ return "(" + ",".join(int_base + ext_base) + ")"
+
def xproto_first_non_empty(lst):
for l in lst:
- if l: return l
+ if l:
+ return l
+
def xproto_api_type(field):
try:
- if (unquote(field['options']['content_type'])=='date'):
- return 'double'
+ if unquote(field["options"]["content_type"]) == "date":
+ return "double"
except KeyError:
pass
- return field['type']
+ return field["type"]
def xproto_base_name(n):
# Hack - Refactor NetworkParameter* to make this go away
- if (n.startswith('NetworkParameter')):
- return '_'
+ if n.startswith("NetworkParameter"):
+ return "_"
- expr = r'^[A-Z]+[a-z]*'
+ expr = r"^[A-Z]+[a-z]*"
try:
match = re.findall(expr, n)[0]
- except:
- return '_'
+ except BaseException:
+ return "_"
return match
+
def xproto_base_fields(m, table):
fields = []
- for b in m['bases']:
- option1 = b['fqn']
+ for b in m["bases"]:
+ option1 = b["fqn"]
try:
- option2 = m['package'] + '.' + b['name']
+ option2 = m["package"] + "." + b["name"]
except TypeError:
option2 = option1
accessor = None
- if option1 in table: accessor = option1
- elif option2 in table: accessor = option2
+ if option1 in table:
+ accessor = option1
+ elif option2 in table:
+ accessor = option2
if accessor:
base_fields = xproto_base_fields(table[accessor], table)
- model_fields = [x.copy() for x in table[accessor]['fields']]
+ model_fields = [x.copy() for x in table[accessor]["fields"]]
for field in model_fields:
field["accessor"] = accessor
fields.extend(base_fields)
fields.extend(model_fields)
- if 'no_sync' in m['options'] and m['options']['no_sync']:
- fields = [f for f in fields if f['name'] != 'backend_status' and f['name'] != 'backend_code']
+ if "no_sync" in m["options"] and m["options"]["no_sync"]:
+ fields = [
+ f
+ for f in fields
+ if f["name"] != "backend_status" and f["name"] != "backend_code"
+ ]
- if 'no_policy' in m['options'] and m['options']['no_policy']:
- fields = [f for f in fields if f['name'] != 'policy_status' and f['name'] != 'policy_code']
+ if "no_policy" in m["options"] and m["options"]["no_policy"]:
+ fields = [
+ f
+ for f in fields
+ if f["name"] != "policy_status" and f["name"] != "policy_code"
+ ]
return fields
+
def xproto_fields(m, table):
""" Generate the full list of models for the xproto message `m` including fields from the classes it inherits.
@@ -165,9 +188,17 @@
# The "id" field is a special field. Every model has one. Put it up front and pretend it's part of the
if not fields:
- raise Exception("Model %s has no fields. Check for missing base class." % m["name"])
+ raise Exception(
+ "Model %s has no fields. Check for missing base class." % m["name"]
+ )
- id_field = {'type': 'int32', 'name': 'id', 'options': {}, "id": "1", "accessor": fields[0]["accessor"]}
+ id_field = {
+ "type": "int32",
+ "name": "id",
+ "options": {},
+ "id": "1",
+ "accessor": fields[0]["accessor"],
+ }
fields = [id_field] + fields
@@ -176,12 +207,15 @@
offset = 0
last_accessor = fields[0]["accessor"]
for field in fields:
- if (field["accessor"] != last_accessor):
+ if field["accessor"] != last_accessor:
last_accessor = field["accessor"]
offset += 100
field_id = int(field["id"])
if (field_id < 1) or (field_id >= 100):
- raise Exception("Only field numbers from 1 to 99 are permitted, field %s in model %s" % (field["name"], field["accessor"]))
+ raise Exception(
+ "Only field numbers from 1 to 99 are permitted, field %s in model %s"
+ % (field["name"], field["accessor"])
+ )
field["id"] = int(field["id"]) + offset
# Check for duplicates
@@ -190,20 +224,24 @@
id = field["id"]
dup = fields_by_number.get(id)
if dup:
- raise Exception("Field %s has duplicate number %d with field %s in model %s" % (field["name"], id, dup["name"], field["accessor"]))
+ raise Exception(
+ "Field %s has duplicate number %d with field %s in model %s"
+ % (field["name"], id, dup["name"], field["accessor"])
+ )
fields_by_number[id] = field
return fields
+
def xproto_base_rlinks(m, table):
links = []
- for base in m['bases']:
- b = base['name']
+ for base in m["bases"]:
+ b = base["name"]
if b in table:
base_rlinks = xproto_base_rlinks(table[b], table)
- model_rlinks = [x.copy() for x in table[b]['rlinks']]
+ model_rlinks = [x.copy() for x in table[b]["rlinks"]]
for link in model_rlinks:
link["accessor"] = b
@@ -212,6 +250,7 @@
return links
+
def xproto_rlinks(m, table):
""" Return the reverse links for the xproto message `m`.
@@ -228,14 +267,16 @@
links = xproto_base_rlinks(m, table) + model_rlinks
- links = [x for x in links if ("+" not in x["src_port"]) and ("+" not in x["dst_port"])]
+ links = [
+ x for x in links if ("+" not in x["src_port"]) and ("+" not in x["dst_port"])
+ ]
if links:
last_accessor = links[0]["accessor"]
offset = 0
index = 1900
for link in links:
- if (link["accessor"] != last_accessor):
+ if link["accessor"] != last_accessor:
last_accessor = link["accessor"]
offset += 100
@@ -249,13 +290,15 @@
index += 1
# check for duplicates
- links_by_number={}
+ links_by_number = {}
for link in links:
id = link["id"]
- dup=links_by_number.get(id)
+ dup = links_by_number.get(id)
if dup:
- raise Exception("Field %s has duplicate number %d in model %s with reverse field %s" %
- (link["src_port"], id, m["name"], dup["src_port"]))
+ raise Exception(
+ "Field %s has duplicate number %d in model %s with reverse field %s"
+ % (link["src_port"], id, m["name"], dup["src_port"])
+ )
links_by_number[id] = link
return links
@@ -264,40 +307,45 @@
def xproto_base_links(m, table):
links = []
- for base in m['bases']:
- b = base['name']
+ for base in m["bases"]:
+ b = base["name"]
if b in table:
base_links = xproto_base_links(table[b], table)
- model_links = table[b]['links']
+ model_links = table[b]["links"]
links.extend(base_links)
links.extend(model_links)
return links
+
def xproto_string_type(xptags):
try:
- max_length = eval(xptags['max_length'])
- except:
+ max_length = eval(xptags["max_length"])
+ except BaseException:
max_length = 1024
- if ('varchar' not in xptags):
- return 'string'
+ if "varchar" not in xptags:
+ return "string"
else:
- return 'text'
+ return "text"
+
def xproto_tuplify(nested_list_or_set):
- if not isinstance(nested_list_or_set, list) and not isinstance(nested_list_or_set, set):
+ if not isinstance(nested_list_or_set, list) and not isinstance(
+ nested_list_or_set, set
+ ):
return nested_list_or_set
else:
return tuple([xproto_tuplify(i) for i in nested_list_or_set])
-def xproto_field_graph_components(fields, model, tag='unique_with'):
+
+def xproto_field_graph_components(fields, model, tag="unique_with"):
def find_components(graph):
pending = set(graph.keys())
components = []
while pending:
- front = { pending.pop() }
+ front = {pending.pop()}
component = set()
while front:
@@ -308,87 +356,96 @@
pending -= neighbours
component |= neighbours
-
+
components.append(component)
return components
field_graph = {}
- field_names = {f['name'] for f in fields}
+ field_names = {f["name"] for f in fields}
for f in fields:
try:
- tagged_str = unquote(f['options'][tag])
- tagged_fields = tagged_str.split(',')
+ tagged_str = unquote(f["options"][tag])
+ tagged_fields = tagged_str.split(",")
for uf in tagged_fields:
if uf not in field_names:
- raise FieldNotFound('Field "%s" not found in model "%s", referenced from field "%s" by option "%s"' % (uf, model['name'], f['name'], tag))
+ raise FieldNotFound(
+ 'Field "%s" not found in model "%s", referenced from field "%s" by option "%s"'
+ % (uf, model["name"], f["name"], tag)
+ )
- field_graph.setdefault(f['name'], set()).add(uf)
- field_graph.setdefault(uf, set()).add(f['name'])
+ field_graph.setdefault(f["name"], set()).add(uf)
+ field_graph.setdefault(uf, set()).add(f["name"])
except KeyError:
pass
return find_components(field_graph)
+
def xproto_api_opts(field):
options = []
- if 'max_length' in field['options'] and field['type']=='string':
- options.append('(val).maxLength = %s'%field['options']['max_length'])
+ if "max_length" in field["options"] and field["type"] == "string":
+ options.append("(val).maxLength = %s" % field["options"]["max_length"])
try:
- if field['options']['null'] == 'False':
- options.append('(val).nonNull = true')
+ if field["options"]["null"] == "False":
+ options.append("(val).nonNull = true")
except KeyError:
pass
- if 'link' in field and 'model' in field['options']:
- options.append('(foreignKey).modelName = "%s"'%field['options']['model'])
+ if "link" in field and "model" in field["options"]:
+ options.append('(foreignKey).modelName = "%s"' % field["options"]["model"])
if ("options" in field) and ("port" in field["options"]):
- options.append('(foreignKey).reverseFieldName = "%s"' % field['options']['port'])
+ options.append(
+ '(foreignKey).reverseFieldName = "%s"' % field["options"]["port"]
+ )
if options:
- options_str = '[' + ', '.join(options) + ']'
+ options_str = "[" + ", ".join(options) + "]"
else:
- options_str = ''
+ options_str = ""
return options_str
+
def xproto_type_to_swagger_type(f):
try:
- content_type = f['options']['content_type']
+ content_type = f["options"]["content_type"]
content_type = eval(content_type)
- except:
+ except BaseException:
content_type = None
pass
- if 'choices' in f['options']:
- return 'string'
- elif content_type == 'date':
- return 'string'
- elif f['type'] == 'bool':
- return 'boolean'
- elif f['type'] == 'string':
- return 'string'
- elif f['type'] in ['int','uint32','int32'] or 'link' in f:
- return 'integer'
- elif f['type'] in ['double','float']:
- return 'string'
+ if "choices" in f["options"]:
+ return "string"
+ elif content_type == "date":
+ return "string"
+ elif f["type"] == "bool":
+ return "boolean"
+ elif f["type"] == "string":
+ return "string"
+ elif f["type"] in ["int", "uint32", "int32"] or "link" in f:
+ return "integer"
+ elif f["type"] in ["double", "float"]:
+ return "string"
+
def xproto_field_to_swagger_enum(f):
- if 'choices' in f['options']:
+ if "choices" in f["options"]:
list = []
- for c in eval(xproto_unquote(f['options']['choices'])):
+ for c in eval(xproto_unquote(f["options"]["choices"])):
list.append(c[0])
return list
else:
return False
+
def xproto_is_true(x):
# TODO: Audit xproto and make specification of trueness more uniform
- if (x==True) or (x=="True") or (x=='"True"'):
+ if x is True or (x == "True") or (x == '"True"'):
return True
return False