Config/management model framework initial version
This is the initial commit of the internal config
model tree for Voltha.
A system that borrows key concepts from Docker, repo,
and git for multi-revision configuration handling with
transaction (commit/abort) logic, tagging, and the
ability to diff between config versions.
Key features:
* Stored model is defined using *annotated* Google protobuf
(*.proto) files
* Model is kept in memory as Python protobuf message objects
* The whole configuration is arranged in a nested (tree)
structure, made up of ConfigNode objects. Each
ConfigNode holds its config data as (possibly nested)
protobuf message object, as well as lists of "pointers"
to its logically nested children nodes. What message
fields are considered part of the node-local config vs.
what is stored as a child node is controlled by "child_node"
annotations in the *.proto files.
* Each ConifgNode stores its config in immutable
ConfigRevision obects, each revision being identified
by a unique hash value, calculated from a unique
hash value of its local configuration as well as
the list of hash values of all its children node.
* Collection type children nodes can be indexed (keyed)
so that they can be addressed with "path" notation
friendly to REST and other interfaces/APIs. Which
field is used as key is defined in the parent message
using "key" attribute of the "child_node" annotation.
Currently string and integer type fields can be used
as keys.
* Changes to the model create new revisions in all
affected nodes, which are rolled up as new revisions
to the root node.
* Root revisions can be tagged, tags can be moved
* The root node by default keeps a rev, but provides
a mechanism to purge untagged revs.
* All branch and leaf nodes auto-purge interim revs
not needed. A rev is not needed if no one refers
to it.
* Diffing between revs is supported, it yields RFC6902
jsonpatch objects. Diffing can be done between any
revs.
* The main operations are: CRUD (we call them .add,
.get, .update, .remove)
* Get can be recursive to an optionally limited depth.
* There is support for Read-Only attributes (fields)
* All CRUD operation support "path" based addressing.
* There is support for an branch/leaf node perspective
implemented by ConfigProxy. This proxy, when attached
to an arbitrary node in the tree, provides all the
CRUD operations in that context, that is, path args
are used relative to that node.
* Transaction support: All mutations made in a transaction
are invisible to others until the transaction is committed.
The commit is atomic (either all changes are applied
to the tree or none). Conflicts between transactions
are detected at the per-node level and a conflict
results in rejecting the conflicting transaction (first
one wins).
* Registered callbacks: via the proxy objects an
observer can register for pre- and post- operation
callbacks. Also, there is a post-get callback which
can be used to augment stored data with real-time
data.
I started hooking up the new config infrastructure to
Voltha's CORE, but this is still in progress, as not
all existing APIs have bee moved over yet.
Note: I also lumped in some experimental files working
with "Any" types in protobufs
Change-Id: Ic547b36e9b893d54e6d9ce67bdfcb32a6e8acd4c
diff --git a/Makefile b/Makefile
index 6a02676..a25668b 100644
--- a/Makefile
+++ b/Makefile
@@ -133,7 +133,7 @@
utest: venv protos
@ echo "Executing all unit tests"
. ${VENVDIR}/bin/activate && \
- nosetests tests --exclude-dir=./tests/itests/
+ for d in $$(find ./tests/utests -type d -depth 1); do echo $$d:; nosetests -v $$d; done
itest: venv run-as-root-tests
@ echo "Executing all integration tests"
diff --git a/common/utils/json_format.py b/common/utils/json_format.py
new file mode 100644
index 0000000..d1672e4
--- /dev/null
+++ b/common/utils/json_format.py
@@ -0,0 +1,92 @@
+"""
+Monkey patched json_format to allow best effort decoding of Any fields.
+Use the additional flag (strict_any_handling=False) to trigger the
+best-effort behavior. Omit the flag, or just use the original json_format
+module fot the strict behavior.
+"""
+
+from google.protobuf import json_format
+
+class _PatchedPrinter(json_format._Printer):
+
+ def __init__(self, including_default_value_fields=False,
+ preserving_proto_field_name=False,
+ strict_any_handling=False):
+ super(_PatchedPrinter, self).__init__(including_default_value_fields,
+ preserving_proto_field_name)
+ self.strict_any_handling = strict_any_handling
+
+ def _BestEffortAnyMessageToJsonObject(self, msg):
+ try:
+ res = self._AnyMessageToJsonObject(msg)
+ except TypeError:
+ res = self._RegularMessageToJsonObject(msg, {})
+ return res
+
+
+def MessageToDict(message,
+ including_default_value_fields=False,
+ preserving_proto_field_name=False,
+ strict_any_handling=False):
+ """Converts protobuf message to a JSON dictionary.
+
+ Args:
+ message: The protocol buffers message instance to serialize.
+ including_default_value_fields: If True, singular primitive fields,
+ repeated fields, and map fields will always be serialized. If
+ False, only serialize non-empty fields. Singular message fields
+ and oneof fields are not affected by this option.
+ preserving_proto_field_name: If True, use the original proto field
+ names as defined in the .proto file. If False, convert the field
+ names to lowerCamelCase.
+ strict_any_handling: If True, converion will error out (like in the
+ original method) if an Any field with value for which the Any type
+ is not loaded is encountered. If False, the conversion will leave
+ the field un-packed, but otherwise will continue.
+
+ Returns:
+ A dict representation of the JSON formatted protocol buffer message.
+ """
+ printer = _PatchedPrinter(including_default_value_fields,
+ preserving_proto_field_name,
+ strict_any_handling=strict_any_handling)
+ # pylint: disable=protected-access
+ return printer._MessageToJsonObject(message)
+
+
+def MessageToJson(message,
+ including_default_value_fields=False,
+ preserving_proto_field_name=False,
+ strict_any_handling=False):
+ """Converts protobuf message to JSON format.
+
+ Args:
+ message: The protocol buffers message instance to serialize.
+ including_default_value_fields: If True, singular primitive fields,
+ repeated fields, and map fields will always be serialized. If
+ False, only serialize non-empty fields. Singular message fields
+ and oneof fields are not affected by this option.
+ preserving_proto_field_name: If True, use the original proto field
+ names as defined in the .proto file. If False, convert the field
+ names to lowerCamelCase.
+ strict_any_handling: If True, converion will error out (like in the
+ original method) if an Any field with value for which the Any type
+ is not loaded is encountered. If False, the conversion will leave
+ the field un-packed, but otherwise will continue.
+
+ Returns:
+ A string containing the JSON formatted protocol buffer message.
+ """
+ printer = _PatchedPrinter(including_default_value_fields,
+ preserving_proto_field_name,
+ strict_any_handling=strict_any_handling)
+ return printer.ToJsonString(message)
+
+
+json_format._WKTJSONMETHODS['google.protobuf.Any'] = [
+ '_BestEffortAnyMessageToJsonObject',
+ '_ConvertAnyMessage'
+]
+
+json_format._Printer._BestEffortAnyMessageToJsonObject = \
+ json_format._Printer._AnyMessageToJsonObject
diff --git a/common/utils/ordered_weakvalue_dict.py b/common/utils/ordered_weakvalue_dict.py
new file mode 100644
index 0000000..0c63f67
--- /dev/null
+++ b/common/utils/ordered_weakvalue_dict.py
@@ -0,0 +1,48 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from _weakref import ref
+from weakref import KeyedRef
+from collections import OrderedDict
+
+
+class OrderedWeakValueDict(OrderedDict):
+ """
+ Modified OrderedDict to use weak references as values. Entries disappear
+ automatically if the referred value has no more strong reference pointing
+ ot it.
+
+ Warning, this is not a complete implementation, only what is needed for
+ now. See test_ordered_wealvalue_dict.py to see what is tested behavior.
+ """
+ def __init__(self, *args, **kw):
+ def remove(wr, selfref=ref(self)):
+ self = selfref()
+ if self is not None:
+ super(OrderedWeakValueDict, self).__delitem__(wr.key)
+ self._remove = remove
+ super(OrderedWeakValueDict, self).__init__(*args, **kw)
+
+ def __setitem__(self, key, value):
+ super(OrderedWeakValueDict, self).__setitem__(
+ key, KeyedRef(value, self._remove, key))
+
+ def __getitem__(self, key):
+ o = super(OrderedWeakValueDict, self).__getitem__(key)()
+ if o is None:
+ raise KeyError, key
+ else:
+ return o
+
diff --git a/experiments/extensions/Makefile b/experiments/extensions/Makefile
new file mode 100644
index 0000000..67a06c9
--- /dev/null
+++ b/experiments/extensions/Makefile
@@ -0,0 +1,22 @@
+default: test
+
+build:
+ protoc -I . --python_out=. ext1.proto
+ protoc -I . --python_out=. ext2.proto
+
+test: build
+ ./write_generic.py | ./read_generic.py
+ ./write_generic.py | ./read_ext1.py
+ ./write_generic.py | ./read_ext2.py
+ ./write_generic.py | ./read_both.py
+
+ ./write_ext1.py | ./read_generic.py
+ ./write_ext1.py | ./read_ext1.py
+ ./write_ext1.py | ./read_ext2.py
+ ./write_ext1.py | ./read_both.py
+
+ ./write_ext2.py | ./read_generic.py
+ ./write_ext2.py | ./read_ext1.py
+ ./write_ext2.py | ./read_ext2.py
+ ./write_ext2.py | ./read_both.py
+
diff --git a/experiments/extensions/README.md b/experiments/extensions/README.md
new file mode 100644
index 0000000..058aae3
--- /dev/null
+++ b/experiments/extensions/README.md
@@ -0,0 +1,6 @@
+This is an experiment to work with Any type fields in protobuf and
+in Python. Run 'make' to see how a set of writers emit data with
+no or one of two extensions, while four readers try to decode the
+data. Depending on which of them pre-loads the extension protobufs,
+they can decode the extension data or leave it in the packed value
+format.
diff --git a/experiments/extensions/__init__.py b/experiments/extensions/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/experiments/extensions/__init__.py
diff --git a/experiments/extensions/ext1.proto b/experiments/extensions/ext1.proto
new file mode 100644
index 0000000..d8fd448
--- /dev/null
+++ b/experiments/extensions/ext1.proto
@@ -0,0 +1,17 @@
+syntax = "proto3";
+
+package vendor.x1;
+
+// Vendor specific adapter descriptor data
+message AdapterDescription {
+ string internal_name = 1;
+ string internal_code = 2;
+ uint32 price = 3;
+}
+
+// Vendor specific adapter configuration data
+message AdapterConfig {
+ uint32 volume = 1;
+ uint32 bass = 2;
+ uint32 treble = 3;
+}
diff --git a/experiments/extensions/ext2.proto b/experiments/extensions/ext2.proto
new file mode 100644
index 0000000..ac9f1ec
--- /dev/null
+++ b/experiments/extensions/ext2.proto
@@ -0,0 +1,24 @@
+syntax = "proto3";
+
+package vendor.x2;
+
+// Vendor specific adapter descriptor data
+message AdapterDescription {
+ string foo = 1;
+ uint32 arg1 = 2;
+ uint32 arg2 = 3;
+ uint32 arg3 = 4;
+ uint32 arg4 = 5;
+ uint32 arg5 = 6;
+ uint32 arg6 = 7;
+}
+
+// Vendor specific adapter configuration data
+message AdapterConfig {
+ uint64 conf1 = 1;
+ uint64 conf2 = 2;
+ uint64 conf3 = 3;
+ uint64 conf4 = 4;
+ uint64 conf5 = 5;
+ repeated string things = 6;
+}
diff --git a/experiments/extensions/read_both.py b/experiments/extensions/read_both.py
new file mode 100755
index 0000000..682e51a
--- /dev/null
+++ b/experiments/extensions/read_both.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+"""
+Read adapter data while decoding both known custom fields
+"""
+
+import sys
+from json import dumps
+
+from common.utils.json_format import MessageToDict
+from voltha.protos import adapter_pb2
+
+adapter = adapter_pb2.Adapter()
+binary = sys.stdin.read()
+adapter.ParseFromString(binary)
+
+print dumps(MessageToDict(adapter, strict_any_handling=False))
+
diff --git a/experiments/extensions/read_ext1.py b/experiments/extensions/read_ext1.py
new file mode 100755
index 0000000..8032615
--- /dev/null
+++ b/experiments/extensions/read_ext1.py
@@ -0,0 +1,16 @@
+#!/usr/bin/env python
+"""
+Read adapter data while decoding ext1 custom fields.
+"""
+import sys
+from json import dumps
+
+from common.utils.json_format import MessageToDict
+from voltha.protos import adapter_pb2
+
+adapter = adapter_pb2.Adapter()
+binary = sys.stdin.read()
+adapter.ParseFromString(binary)
+
+print dumps(MessageToDict(adapter, strict_any_handling=False))
+
diff --git a/experiments/extensions/read_ext2.py b/experiments/extensions/read_ext2.py
new file mode 100755
index 0000000..a187137
--- /dev/null
+++ b/experiments/extensions/read_ext2.py
@@ -0,0 +1,16 @@
+#!/usr/bin/env python
+"""
+Read adapter data while decoding ext2 custom fields
+"""
+import sys
+from json import dumps
+
+from common.utils.json_format import MessageToDict
+from voltha.protos import adapter_pb2
+
+adapter = adapter_pb2.Adapter()
+binary = sys.stdin.read()
+adapter.ParseFromString(binary)
+
+print dumps(MessageToDict(adapter, strict_any_handling=False))
+
diff --git a/experiments/extensions/read_generic.py b/experiments/extensions/read_generic.py
new file mode 100755
index 0000000..e910188
--- /dev/null
+++ b/experiments/extensions/read_generic.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+"""
+Read adapter data while ignoring custom fields
+"""
+import sys
+from json import dumps
+
+from common.utils.json_format import MessageToDict
+from voltha.protos import adapter_pb2
+
+adapter = adapter_pb2.Adapter()
+binary = sys.stdin.read()
+adapter.ParseFromString(binary)
+print dumps(MessageToDict(adapter, strict_any_handling=False))
+
diff --git a/experiments/extensions/write_ext1.py b/experiments/extensions/write_ext1.py
new file mode 100755
index 0000000..57131ef
--- /dev/null
+++ b/experiments/extensions/write_ext1.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+"""
+Write adapter data without any custom fields
+"""
+import sys
+
+import ext1_pb2
+from voltha.protos import adapter_pb2
+from google.protobuf import any_pb2
+
+def custom_config():
+ any = any_pb2.Any()
+ any.Pack(ext1_pb2.AdapterConfig(
+ volume=20,
+ bass=50,
+ treble=50
+ ))
+ return any
+
+
+def custom_description():
+ any = any_pb2.Any()
+ any.Pack(ext1_pb2.AdapterDescription(
+ internal_name='hulu',
+ internal_code='foo',
+ price=42
+ ))
+ return any
+
+
+adapter = adapter_pb2.Adapter(
+ id='42',
+ config=adapter_pb2.AdapterConfig(
+ additional_config=custom_config()
+ ),
+ additional_description=custom_description()
+)
+
+sys.stdout.write(adapter.SerializeToString())
+
diff --git a/experiments/extensions/write_ext2.py b/experiments/extensions/write_ext2.py
new file mode 100755
index 0000000..b9316a1
--- /dev/null
+++ b/experiments/extensions/write_ext2.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+"""
+Write adapter data without any custom fields
+"""
+import sys
+
+import ext2_pb2
+from voltha.protos import adapter_pb2
+from google.protobuf import any_pb2
+
+
+def custom_config():
+ any = any_pb2.Any()
+ any.Pack(ext2_pb2.AdapterConfig(
+ conf1=1,
+ conf2=42,
+ conf3=0,
+ conf4=11111111111,
+ conf5=11231231,
+ things = ['foo', 'bar', 'baz', 'zoo']
+ ))
+ return any
+
+
+def custom_description():
+ any = any_pb2.Any()
+ any.Pack(ext2_pb2.AdapterDescription(
+ foo='hulu',
+ arg1=42,
+ arg2=42,
+ arg3=42,
+ arg4=42,
+ arg5=42
+ ))
+ return any
+
+
+adapter = adapter_pb2.Adapter(
+ id='42',
+ config=adapter_pb2.AdapterConfig(
+ additional_config=custom_config()
+ ),
+ additional_description=custom_description()
+)
+
+sys.stdout.write(adapter.SerializeToString())
+
diff --git a/experiments/extensions/write_generic.py b/experiments/extensions/write_generic.py
new file mode 100755
index 0000000..1cfca6c
--- /dev/null
+++ b/experiments/extensions/write_generic.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+"""
+Write adapter data without any custom fields
+"""
+import sys
+
+from voltha.protos import adapter_pb2
+
+adapter = adapter_pb2.Adapter(
+ id='42',
+ config=adapter_pb2.AdapterConfig()
+)
+
+sys.stdout.write(adapter.SerializeToString())
+
diff --git a/requirements.txt b/requirements.txt
index cbdf9c1..3302065 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,6 +10,7 @@
hash_ring>=1.3.1
hexdump>=3.3
jinja2>=2.8
+jsonpatch>=1.14
klein>=15.3.1
nose>=1.3.7
nose-exclude>=0.5.0
diff --git a/tests/utests/common/utils/__init__.py b/tests/utests/common/utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/utests/common/utils/__init__.py
diff --git a/tests/utests/common/utils/test_ordered_weakvalue_dict.py b/tests/utests/common/utils/test_ordered_weakvalue_dict.py
new file mode 100644
index 0000000..30579c7
--- /dev/null
+++ b/tests/utests/common/utils/test_ordered_weakvalue_dict.py
@@ -0,0 +1,38 @@
+from unittest import TestCase, main
+
+from common.utils.ordered_weakvalue_dict import OrderedWeakValueDict
+
+
+class O(object):
+ def __init__(self, a):
+ self.a = a
+
+
+class TestOrderedWeakValueDict(TestCase):
+
+ def test_standard_behavior(self):
+ holder = dict() # a real dict so we can control which object real ref
+ def mk(k):
+ o = O(k)
+ holder[k] = o
+ return o
+ o = OrderedWeakValueDict((k, mk(k)) for k in xrange(10))
+ self.assertEqual(len(o), 10)
+ self.assertEqual(o[3].a, 3)
+ o[3] = mk(-3)
+ self.assertEqual(o[3].a, -3)
+ del o[3]
+ self.assertEqual(len(o), 9)
+ o[100] = mk(100)
+ self.assertEqual(len(o), 10)
+ self.assertEqual(o.keys(), [0, 1, 2, 4, 5, 6, 7, 8, 9, 100])
+
+ # remove a few items from the holder, they should be gone from o too
+ del holder[1]
+ del holder[5]
+ del holder[6]
+
+ self.assertEqual(o.keys(), [0, 2, 4, 7, 8, 9, 100])
+ self.assertEqual([v.a for v in o.values()], [0, 2, 4, 7, 8, 9, 100])
+
+
diff --git a/tests/utests/voltha/core/test_config.py b/tests/utests/voltha/core/test_config.py
new file mode 100644
index 0000000..7baa3fe
--- /dev/null
+++ b/tests/utests/voltha/core/test_config.py
@@ -0,0 +1,1167 @@
+from collections import OrderedDict
+from copy import copy
+import resource
+from random import randint, seed
+from time import time
+from unittest import TestCase
+from unittest import main
+
+import gc
+
+from mock import Mock
+
+from voltha.core.config.config_proxy import CallbackType
+from voltha.core.config.config_rev import _rev_cache
+from voltha.core.config.config_root import ConfigRoot, MergeConflictException
+from voltha.core.config.config_txn import ClosedTransactionError
+from voltha.protos import third_party
+from voltha.protos.openflow_13_pb2 import ofp_port
+from voltha.protos.voltha_pb2 import Voltha, Adapter, HealthStatus, \
+ AdapterConfig, LogicalDevice
+
+
+def memusage():
+ return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+
+
+def rev_count():
+ return len(_rev_cache)
+
+
+def probe():
+ return time(), memusage() / 1024. / 1024, rev_count()
+
+
+def print_metrics():
+ print '%20f %20f %8d' % probe()
+
+
+class TestConfigNodeShallow(TestCase):
+
+ def setUp(self):
+ self.empty = Voltha()
+ self.other = Voltha(instance_id='other')
+ self.node = ConfigRoot(Voltha())
+
+ def test_init(self):
+ pass
+
+ def test_immutability(self):
+ self.assertEqual(self.node.latest.data, self.empty)
+ self.empty.instance_id = 'overwritten id'
+ self.assertEqual(self.node.latest.data, Voltha())
+
+ def test_retrieve_latest(self):
+ self.assertEqual(self.node.latest.data, self.empty)
+ hash = self.node.latest.hash
+ self.assertEqual(self.node.revisions, [hash])
+
+ def test_update(self):
+ hash0 = self.node.latest.hash
+ self.node.update('/', copy(self.other))
+ hash1 = self.node.latest.hash
+ self.assertEqual(len(self.node.revisions), 2)
+ self.assertNotEqual(hash0, hash1)
+ self.assertEqual(self.node.latest.data, Voltha(instance_id='other'))
+
+ def test_update_with_bad_data(self):
+ self.assertRaises(ValueError, self.node.update, '/', Adapter())
+
+ def test_many_simple_updates(self):
+ n = 100
+ for i in xrange(n):
+ self.node.update('/', Voltha(instance_id='id%d' % i))
+ self.node.update('/', self.other)
+ self.assertEqual(len(self.node.revisions), 102)
+ self.assertEqual(self.node.latest.data, self.other)
+
+ def test_retrieve_by_rev_hash(self):
+ n = 100
+ for i in xrange(n):
+ self.node.update('/', Voltha(instance_id='id%d' % i))
+ self.node.update('/', self.other)
+ hashes = self.node.revisions
+ self.assertEqual(self.node[hashes[0]].data, self.empty)
+ self.assertEqual(self.node[hashes[10]].data, Voltha(instance_id='id9'))
+ self.assertEqual(self.node[hashes[-1]].data, self.other)
+
+ def test_diffs(self):
+ self.node.update('/', self.other)
+ self.assertEqual(self.node.diff(self.node.latest.hash).patch, [])
+ hashes = self.node.revisions
+ self.assertEqual(self.node.diff(hashes[0]).patch, [
+ dict(op='replace', path='/instance_id', value='other')
+ ])
+ self.assertEqual(self.node.diff(hashes[0], hashes[1]).patch, [
+ dict(op='replace', path='/instance_id', value='other')
+ ])
+ self.assertEqual(self.node.diff(hashes[1], hashes[0]).patch, [
+ dict(op='replace', path='/instance_id', value='')
+ ])
+ self.assertEqual(self.node.diff(hashes[1], hashes[1]).patch, [])
+
+ def test_tagging(self):
+ self.node.tag('original')
+ hash1 = self.node.latest.hash
+
+ # add a bunch of changes
+ for a in xrange(10):
+ self.node.update('/', Voltha(instance_id=str(a)))
+ hash2 = self.node.latest.hash
+
+ # apply tag to latest
+ self.node.tag('latest')
+
+ # apply another tag to latest
+ self.node.tag('other')
+
+ # apply tag to specific rev hash
+ self.node.tag('yetmore', hash2)
+
+ # invalid hash
+ self.assertRaises(KeyError, self.node.tag, 'sometag', 'badhash')
+
+ # retrieve data based on tag
+ self.assertEqual(self.node.by_tag('original').hash, hash1)
+ self.assertEqual(self.node.by_tag('latest').hash, hash2)
+ self.assertEqual(self.node.by_tag('other').hash, hash2)
+ self.assertEqual(self.node.by_tag('yetmore').hash, hash2)
+
+ # generate diff from tags
+ self.assertEqual(self.node.diff_by_tag('original', 'latest').patch, [
+ dict(op='replace', path='/instance_id', value='9')
+ ])
+
+ # move tags to given revision
+ self.node.tag('original', self.node.revisions[2])
+ self.node.tag('latest', self.node.revisions[9])
+
+ # add another tag
+ self.node.tag('another', self.node.revisions[7])
+
+ # list tags
+ self.assertEqual(self.node.tags,
+ ['another', 'latest', 'original', 'other', 'yetmore'])
+
+ # delete a tag
+ self.node.delete_tag('another')
+ self.node.delete_tags('yetmore', 'other')
+ self.assertEqual(self.node.tags, ['latest', 'original'])
+
+ # prune untagged revisions from revision list
+ self.node.prune_untagged()
+ self.assertEqual(len(self.node.revisions), 3) # latest is always kept
+
+ # retrieve and compare working tagged revs
+ self.assertEqual(self.node.diff_by_tag('original', 'latest').patch, [
+ dict(op='replace', path='/instance_id', value='8')
+ ])
+
+
+class DeepTestsBase(TestCase):
+ """Shared test class for test using a simple node tree"""
+
+ def setUp(self):
+ gc.collect()
+ _rev_cache.clear()
+ self.health = HealthStatus(state=HealthStatus.DYING)
+ self.base_shallow = Voltha(instance_id='1')
+ self.base_deep = copy(self.base_shallow)
+ self.base_deep.health.state = HealthStatus.DYING # = self.health
+ for i in xrange(5):
+ self.base_deep.adapters.add().MergeFrom(Adapter(
+ id=str(i),
+ config=AdapterConfig(
+ log_level=3
+ )
+ ))
+ self.node = ConfigRoot(self.base_deep)
+ self.hash_orig = self.node.latest.hash
+
+ def tearDown(self):
+ del self.node
+
+
+class TestConfigNodeDeep(DeepTestsBase):
+
+ def test_init(self):
+ pass
+
+ def test_reject_duplicate_keys(self):
+ data = Voltha(
+ instance_id='42', adapters=[Adapter(id='same') for _ in xrange(5)])
+ self.assertRaises(ValueError, ConfigRoot, data)
+
+ def test_shallow_get(self):
+ self.assertEqual(self.node.latest.data, self.base_shallow)
+ self.assertEqual(self.node.get(), self.base_shallow)
+ self.assertEqual(self.node.get(hash=self.hash_orig), self.base_shallow)
+
+ def test_deep_get(self):
+ self.assertEqual(self.node.get(deep=True), self.base_deep)
+
+ def test_top_level_update(self):
+ # test that top-level update retains children
+ self.node.update('/', Voltha(version='1.2.3'))
+ hash_new = self.node.latest.hash
+ self.assertNotEqual(self.hash_orig, hash_new)
+ self.assertEqual(self.node.get(
+ hash=self.hash_orig, deep=1), self.base_deep)
+ latest = self.node.get(deep=1)
+ self.assertNotEqual(latest, self.base_deep)
+ self.assertEqual(len(latest.adapters), 5)
+ self.assertEqual(len(latest.logical_devices), 0)
+ self.assertEqual(latest.version, '1.2.3')
+
+ def test_path_based_get_access(self):
+ self.assertEqual(self.node.get(path='/'), self.node.get())
+ self.assertEqual(self.node.get(path='/health'), self.health)
+
+ def test_path_list_retrieval(self):
+ adapters = self.node.get(path='/adapters')
+ self.assertEqual(len(adapters), 5)
+ self.assertEqual(adapters[2].id, '2')
+
+ def test_indexing_into_containers(self):
+ adapter = self.node.get(path='/adapters/3')
+ self.assertEqual(adapter.id, '3')
+
+ def test_deep_update_non_container(self):
+
+ self.node.update('/health', HealthStatus(state=HealthStatus.HEALTHY))
+
+ # root hash is now different
+ hash_new = self.node.latest.hash
+ self.assertNotEqual(self.hash_orig, hash_new)
+
+ # original tree is still intact
+ orig = self.node.get(hash=self.hash_orig, deep=1)
+ self.assertEqual(orig, self.base_deep)
+ self.assertEqual(orig.health.state, HealthStatus.DYING)
+
+ # but the latest contains the change
+ new = self.node.get(deep=1)
+ self.assertNotEqual(new, orig)
+ self.assertEqual(new.health.state, HealthStatus.HEALTHY)
+
+ def test_deep_update_container(self):
+
+ self.node.update('/adapters/0', Adapter(id='0', version='new'))
+
+ # root hash is now different
+ hash_new = self.node.latest.hash
+ self.assertNotEqual(self.hash_orig, hash_new)
+
+ # original tree is still intact
+ orig = self.node.get(hash=self.hash_orig, deep=1)
+ self.assertEqual(orig, self.base_deep)
+ self.assertEqual(orig.adapters[0].id, '0')
+
+ # but the new tree contains the change
+ new = self.node.get(deep=1)
+ self.assertNotEqual(new, orig)
+ self.assertEqual(new.adapters[0].version, 'new')
+
+ def test_update_handle_invalid_paths(self):
+ self.assertRaises(KeyError, self.node.update, 'foo', None)
+ self.assertRaises(KeyError, self.node.update, '/foo', None)
+ self.assertRaises(KeyError, self.node.update, '/health/foo', None)
+ self.assertRaises(ValueError, self.node.update, '/adapters', None)
+ self.assertRaises(KeyError, self.node.update, '/adapters/foo', None)
+ self.assertRaises(KeyError, self.node.update, '/adapters/1/foo', None)
+
+ def test_update_handle_invalid_type(self):
+ self.assertRaises(ValueError, self.node.update, '/', Adapter())
+ self.assertRaises(ValueError, self.node.update, '/health', Adapter())
+ self.assertRaises(ValueError, self.node.update, '/adapters/1', Voltha())
+
+ def test_update_handle_key_change_attempt(self):
+ self.assertRaises(
+ ValueError, self.node.update, '/adapters/1', Adapter(id='changed'))
+
+ def test_add_node(self):
+ new = Adapter(id='new')
+ self.node.add('/adapters', new)
+ self.assertNotEqual(self.node.latest.hash, self.hash_orig)
+ self.assertEqual(len(self.node.get('/adapters')), 6)
+ self.assertEqual(
+ len(self.node.get('/adapters', hash=self.hash_orig)), 5)
+ self.assertEqual(self.node.get('/adapters/new'), new)
+
+ def test_add_handle_invalid_cases(self):
+ # invalid paths
+ self.assertRaises(KeyError, self.node.add, 'foo', None)
+ self.assertRaises(KeyError, self.node.add, '/foo', None)
+ self.assertRaises(KeyError, self.node.add, '/adapters/foo', None)
+
+ # cannot add to non-container nodes
+ self.assertRaises(ValueError, self.node.add, '/health', None)
+ self.assertRaises(ValueError, self.node.add, '/adapters/1', None)
+
+ # cannot add to container data with duplicate key
+ self.assertRaises(
+ ValueError, self.node.add, '/adapters', Adapter(id='1'))
+
+ def test_remove_node(self):
+ self.node.remove('/adapters/3')
+ self.assertNotEqual(self.node.latest.hash, self.hash_orig)
+ self.assertEqual(len(self.node.get('/adapters')), 4)
+ self.assertEqual(
+ len(self.node.get('/adapters', hash=self.hash_orig)), 5)
+ self.assertRaises(KeyError, self.node.get, '/adapters/3')
+
+ def test_remove_handle_invalid_cases(self):
+ # invalid paths
+ self.assertRaises(KeyError, self.node.remove, 'foo')
+ self.assertRaises(KeyError, self.node.remove, '/foo')
+ self.assertRaises(KeyError, self.node.remove, '/adapters/foo')
+ self.assertRaises(KeyError, self.node.remove, '/adapters/1/id')
+
+ # cannot add to non-container nodes
+ self.assertRaises(ValueError, self.node.remove, '/health')
+
+ def test_pruning_after_shallow_change(self):
+
+ self.node.update('/', Voltha(version='10.1'))
+
+ # sanity check
+ self.assertEqual(len(self.node.revisions), 2)
+
+ # prune
+ self.node.prune_untagged()
+
+ self.assertEqual(len(self.node.revisions), 1)
+
+ # we can nevertheless access the whole tree
+ new = self.node.get('/', deep=1)
+ self.assertEqual(new.adapters, self.base_deep.adapters)
+ self.assertEqual(new.version, '10.1')
+
+ def test_pruning_after_deep_change(self):
+
+ self.node.update('/adapters/3', Adapter(id='3', version='changed'))
+
+ # sanity check
+ self.assertEqual(len(self.node.revisions), 2)
+
+ # prune
+ self.node.prune_untagged()
+
+ self.assertEqual(len(self.node.revisions), 1)
+
+ # we can nevertheless access the whole tree
+ new = self.node.get('/', deep=1)
+ self.assertEqual(len(new.adapters), 5)
+ self.assertEqual(new.adapters[2], self.base_deep.adapters[2])
+ self.assertEqual(new.adapters[3].version, 'changed')
+
+
+class TestPruningPerformance(DeepTestsBase):
+
+ def test_repeated_prunning_keeps_memory_stable(self):
+ # The auto-pruning feature of the config system is that leaf nodes
+ # in the config tree that are no longer in use are pruned from memory,
+ # once they revision is removed from root using the .prune_untagged()
+ # method. This test is to verify that.
+
+ n = 1000
+
+ seed(0) # makes things consistently random
+
+ # this should be the number of nodes in the Voltha tree
+ self.assertLess(rev_count(), 10)
+ print; print_metrics()
+
+ def mk_change():
+ key = str(randint(0, 4))
+ path = '/adapters/' + key
+ adapter = self.node.get(path)
+ adapter.version = 'v{}'.format(randint(0, 100000))
+ self.node.update(path, adapter)
+
+ # first we perform many changes without pruning
+ for i in xrange(n):
+ mk_change()
+
+ # at this point we shall have more than 2*n revs laying around
+ self.assertGreater(rev_count(), 2 * n)
+ print_metrics()
+
+ # prune now
+ self.node.prune_untagged()
+
+ # at this point the rev count shall fall back to the original
+ self.assertLess(rev_count(), 10)
+ print_metrics()
+
+ # no make an additional set of modifications while constantly pruning
+ for i in xrange(n):
+ mk_change()
+ self.node.prune_untagged()
+
+ # the rev count should not have increased
+ self.assertLess(rev_count(), 10)
+ print_metrics()
+
+ def test_churn_efficiency(self):
+ # Two config revisions that hash to the same hash value also share the
+ # same in-memory object. So if the same config node goes through churn
+ # (like flip-flopping fields), we don't eat up memory unnecessarily.
+ # This test is to verify that behavior.
+
+ n = 1000
+ modulo = 2
+
+ self.assertEqual(rev_count(), 7)
+ print_metrics()
+
+ def mk_change(seq):
+ # make change module of the sequence number so we periodically
+ # return to the same config
+ path = '/adapters/3'
+ adapter = self.node.get(path)
+ adapter.version = 'v{}'.format(seq % modulo)
+ self.node.update(path, adapter)
+
+ # make n changes back and forth
+ for i in xrange(n):
+ _tmp_rc = rev_count()
+ mk_change(i)
+
+ _tmp_rc = rev_count()
+ # verify that the node count did not increase significantly, yet we
+ # have access to all ditinct revisions
+ self.assertEqual(rev_count(), 11)
+ print_metrics()
+
+ def test_strict_read_only(self):
+ # it shall not be possible to change a read-only field
+ self.assertRaises(ValueError, self.node.update,
+ '/', Voltha(version='foo'), strict=True)
+ self.assertRaises(ValueError, self.node.update,
+ '/adapters/1', Adapter(version='foo'), strict=True)
+
+
+class TestNodeOwnershipAndHooks(DeepTestsBase):
+
+ def test_init(self):
+ pass
+
+ def test_passive_ownership(self):
+
+ # grab a proxy for a given node
+ proxy = self.node.get_proxy('/health')
+
+ # able to read the value directly using this proxy
+ self.assertEqual(proxy.get(), HealthStatus(state=HealthStatus.DYING))
+
+ # able to update value directly using this proxy, but the whole tree
+ # updates
+ proxy.update('/', HealthStatus(state=HealthStatus.HEALTHY))
+ self.assertEqual(proxy.get().state, HealthStatus.HEALTHY)
+ self.assertNotEqual(self.node.latest.hash, self.hash_orig)
+
+ # access constraints are still enforced
+ self.assertRaises(
+ ValueError, proxy.update,
+ '/', HealthStatus(state=HealthStatus.OVERLOADED), strict=1)
+
+ def test_exclusivity(self):
+ proxy = self.node.get_proxy('/adapters/1', exclusive=True)
+ self.assertRaises(ValueError, self.node.get_proxy, '/adapters/1')
+
+ def test_get_hook(self):
+
+ proxy = self.node.get_proxy('/health')
+
+ # getting health without callback just returns what's stored in node
+ self.assertEqual(proxy.get().state, HealthStatus.DYING)
+
+ # register callback
+ def get_health_callback(msg):
+ msg.state = HealthStatus.OVERLOADED
+ return msg
+
+ proxy.register_callback(CallbackType.GET, get_health_callback)
+
+ # once registered, callback can touch up object
+ self.assertEqual(proxy.get().state, HealthStatus.OVERLOADED)
+
+ def test_pre_update_hook(self):
+
+ proxy = self.node.get_proxy('/adapters/1')
+
+ # before hook, change is allowed
+ adapter = proxy.get()
+ adapter.version = 'foo'
+ proxy.update('/', adapter)
+
+ # and for sanity, check if update made it through
+ self.assertEqual(self.node.get('/adapters/1').version, 'foo')
+
+ # regsiter hook that rejects all changes
+ def bully(msg):
+ raise RuntimeError('bully')
+ proxy.register_callback(CallbackType.PRE_UPDATE, bully)
+
+ # test that rejection applies
+ adapter.version = 'bar'
+ self.assertRaises(RuntimeError, proxy.update, '/', adapter)
+ self.assertRaises(RuntimeError, self.node.update, '/adapters/1', adapter)
+
+ def test_post_update_hook(self):
+ proxy = self.node.get_proxy('/adapters/1')
+ callback = Mock()
+ proxy.register_callback(CallbackType.POST_UPDATE, callback,
+ 'zizi', 42, x=1, y='baz')
+ data = Adapter(id='1', version='zoo')
+ proxy.update('/', data)
+ callback.assert_called_once_with(data, 'zizi', 42, x=1, y='baz')
+
+ def test_pre_and_post_add_hooks(self):
+ proxy = self.node.get_proxy('/')
+ pre_callback = Mock()
+ post_callback = Mock()
+ proxy.register_callback(CallbackType.PRE_ADD, pre_callback)
+ proxy.register_callback(CallbackType.POST_ADD, post_callback)
+ new_adapter = Adapter(id='99', version='12.2', vendor='ace')
+ proxy.add('/adapters', new_adapter)
+ pre_callback.assert_called_with(new_adapter)
+ post_callback.assert_called_with(new_adapter)
+
+ def test_pre_and_post_remove_hooks(self):
+ proxy = self.node.get_proxy('/')
+ pre_callback = Mock()
+ post_callback = Mock()
+ proxy.register_callback(CallbackType.PRE_REMOVE, pre_callback)
+ proxy.register_callback(CallbackType.POST_REMOVE, post_callback)
+ adapter = proxy.get('/adapters/1') # so that we can verify callback
+ proxy.remove('/adapters/1')
+ pre_callback.assert_called_with(adapter)
+ post_callback.assert_called_with(adapter)
+
+
+class TestTransactionalLogic(DeepTestsBase):
+
+ def make_change(self, tx, path, attr_name, new_value):
+ data = o = tx.get(path)
+ rest = attr_name
+ while 1:
+ subfield, _, rest = rest.partition('.')
+ if rest:
+ o = getattr(o, subfield)
+ attr_name = rest
+ else:
+ setattr(o, attr_name, new_value)
+ break
+ tx.update(path, data)
+
+ def check_no_tx_branches(self):
+ visited = set()
+
+ def check_node(n):
+ if n not in visited:
+ self.assertEqual(n._branches.keys(), [None])
+ for rev in n._branches[None]._revs.itervalues():
+ for children in rev._children.itervalues():
+ if isinstance(children, OrderedDict):
+ children = children.itervalues()
+ for child_rev in children:
+ child_node = child_rev.node
+ check_node(child_node)
+ visited.add(n)
+
+ check_node(self.node)
+
+ def log_levels(self):
+ return OrderedDict(
+ (a.id, a.config.log_level)
+ for a in self.node.get('/adapters', deep=1))
+
+ def tearDown(self):
+ self.check_no_tx_branches()
+ super(TestTransactionalLogic, self).tearDown()
+
+ def test_transaction_isolation(self):
+ """
+ Test that changes made in a transaction mode are not visible to others
+ """
+ proxy = self.node.get_proxy('/')
+
+ # look under the hood to verify that branches are added
+ # recursively
+ _latest_root_rev = self.node._branches[None].latest
+ adapter_node = _latest_root_rev._children['adapters']['2'].node
+ self.assertEqual(len(self.node._branches.keys()), 1)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+
+ tx = proxy.open_transaction()
+ self.assertEqual(len(self.node._branches.keys()), 2)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+
+ path = '/adapters/2'
+ self.make_change(tx, path, 'config.log_level', 0)
+ self.assertEqual(len(self.node._branches.keys()), 2)
+ self.assertEqual(len(adapter_node._branches.keys()), 2)
+
+ # verify that reading from the transaction exposes the change
+ self.assertEqual(tx.get(path).config.log_level, 0)
+
+ # but that reading from the proxy or directly from tree does not
+ self.assertEqual(self.node.latest.hash, self.hash_orig)
+ self.assertEqual(proxy.get(path).config.log_level, 3)
+ self.assertEqual(self.node.get(path).config.log_level, 3)
+
+ tx.cancel()
+
+ def test_cannot_reuse_tx(self):
+ proxy = self.node.get_proxy('/')
+ tx = proxy.open_transaction()
+ tx.cancel()
+ self.assertRaises(ClosedTransactionError, tx.get, '/')
+ self.assertRaises(ClosedTransactionError, tx.add, '/', None)
+ self.assertRaises(ClosedTransactionError, tx.remove, '/')
+
+ def test_multiple_concurrent_transactions(self):
+ """
+ Test that two transactions can make independent changes, without
+ affecting each other, until committed.
+ """
+ proxy1 = self.node.get_proxy('/')
+ proxy2 = self.node.get_proxy('/')
+
+ tx1 = proxy1.open_transaction()
+ tx2 = proxy1.open_transaction()
+ tx3 = proxy2.open_transaction()
+
+ path = '/adapters/2'
+ self.make_change(tx1, path, 'config.log_level', 0)
+
+ # the other transaction does not see the change
+ self.assertEqual(tx2.get(path).config.log_level, 3)
+ self.assertEqual(tx3.get(path).config.log_level, 3)
+ self.assertEqual(proxy1.get(path).config.log_level, 3)
+ self.assertEqual(proxy2.get(path).config.log_level, 3)
+
+ # we can attempt to make change in other txs
+ self.make_change(tx2, path, 'config.log_level', 1)
+ self.make_change(tx3, path, 'config.log_level', 2)
+
+ # each can see its own tree, but no one else can see theirs
+ self.assertEqual(tx1.get(path).config.log_level, 0)
+ self.assertEqual(tx2.get(path).config.log_level, 1)
+ self.assertEqual(tx3.get(path).config.log_level, 2)
+ self.assertEqual(proxy1.get(path).config.log_level, 3)
+ self.assertEqual(proxy2.get(path).config.log_level, 3)
+ self.assertEqual(self.node.latest.hash, self.hash_orig)
+
+ tx1.cancel()
+ tx2.cancel()
+ tx3.cancel()
+
+ def test_transaction_canceling(self):
+ """After abort, transaction is no longer stored"""
+
+ proxy = self.node.get_proxy('/')
+
+ # look under the hood to verify that branches are added
+ # recursively
+ _latest_root_rev = self.node._branches[None].latest
+ adapter_node = _latest_root_rev._children['adapters']['2'].node
+ self.assertEqual(len(self.node._branches.keys()), 1)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+
+ tx = proxy.open_transaction()
+ self.assertEqual(len(self.node._branches.keys()), 2)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+
+ self.make_change(tx, '/adapters/2', 'config.log_level', 4)
+
+ self.assertEqual(len(self.node._branches.keys()), 2)
+ self.assertEqual(len(adapter_node._branches.keys()), 2)
+
+ del tx
+
+ self.assertEqual(len(self.node._branches.keys()), 1)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+
+ def test_transaction_explitic_canceling(self):
+ """After abort, transaction is no longer stored"""
+
+ proxy = self.node.get_proxy('/')
+
+ # look under the hood to verify that branches are added
+ # recursively
+ _latest_root_rev = self.node._branches[None].latest
+ adapter_node = _latest_root_rev._children['adapters']['2'].node
+ self.assertEqual(len(self.node._branches.keys()), 1)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+
+ tx = proxy.open_transaction()
+ self.assertEqual(len(self.node._branches.keys()), 2)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+
+ self.make_change(tx, '/adapters/2', 'config.log_level', 4)
+
+ self.assertEqual(len(self.node._branches.keys()), 2)
+ self.assertEqual(len(adapter_node._branches.keys()), 2)
+
+ tx.cancel()
+
+ self.assertEqual(len(self.node._branches.keys()), 1)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+
+ def test_transaction_commit(self):
+ """Once committed, changes become latest"""
+
+ proxy = self.node.get_proxy('/')
+ _latest_root_rev = self.node._branches[None].latest
+ adapter_node = _latest_root_rev._children['adapters']['2'].node
+ tx = proxy.open_transaction()
+
+ # publicly visible value before change
+ path = '/adapters/2'
+ self.assertEqual(proxy.get(path).config.log_level, 3)
+ self.assertEqual(self.node.get(path).config.log_level, 3)
+
+ # make the change, but not commit yet
+ self.make_change(tx, path, 'config.log_level', 4)
+ self.assertEqual(proxy.get(path).config.log_level, 3)
+ self.assertEqual(self.node.get(path).config.log_level, 3)
+
+ # commit the change
+ tx.commit()
+ self.assertNotEqual(self.node.latest.hash, self.hash_orig)
+ self.assertEqual(proxy.get('/adapters/2').config.log_level, 4)
+ self.assertEqual(len(self.node._branches.keys()), 1)
+ self.assertEqual(len(adapter_node._branches.keys()), 1)
+ self.assertEqual(proxy.get(path).config.log_level, 4)
+ self.assertEqual(self.node.get(path).config.log_level, 4)
+
+ def test_collision_detection(self):
+ """Correctly detect transaction collision and abort the 2nd tx"""
+
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ path = '/adapters/2'
+ self.make_change(tx1, path, 'config.log_level', 0)
+
+ # make another tx before tx1 is committed
+ tx2 = proxy.open_transaction()
+ print tx2._txid
+ self.make_change(tx2, path, 'config.log_level', 4)
+
+ # commit first
+ print tx1._txid
+ tx1.commit()
+
+ # committing 2nd one should fail
+ self.assertRaises(MergeConflictException, tx2.commit)
+ self.check_no_tx_branches()
+
+ def test_nonconfliciting_changes(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/1', 'config.log_level', 1)
+ self.make_change(tx2, '/adapters/2', 'config.log_level', 2)
+ tx1.commit()
+ tx2.commit()
+ self.assertEqual(self.log_levels(), {
+ '0': 3, '1': 1, '2': 2, '3': 3, '4': 3
+ })
+
+ def test_additive_changes(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx1.add('/adapters', Adapter(id='new'))
+ tx1.add('/adapters', Adapter(id='new2'))
+ self.assertEqual(len(proxy.get('/adapters')), 5)
+ self.assertEqual(len(self.node.get('/adapters')), 5)
+ self.assertEqual(len(tx1.get('/adapters')), 7)
+ tx1.commit()
+ self.assertEqual(len(proxy.get('/adapters')), 7)
+ self.assertEqual(len(self.node.get('/adapters')), 7)
+ self.assertEqual(self.log_levels().keys(),
+ ['0', '1', '2', '3', '4', 'new', 'new2'])
+
+ def test_remove_changes(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx1.remove('/adapters/2')
+ tx1.remove('/adapters/4')
+ self.assertEqual(len(proxy.get('/adapters')), 5)
+ self.assertEqual(len(self.node.get('/adapters')), 5)
+ self.assertEqual(len(tx1.get('/adapters')), 3)
+ tx1.commit()
+ self.assertEqual(len(proxy.get('/adapters')), 3)
+ self.assertEqual(len(self.node.get('/adapters')), 3)
+ self.assertEqual(self.log_levels().keys(), ['0', '1', '3'])
+
+ def test_mixed_add_remove_update_changes(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/2', 'config.log_level', 2)
+ tx1.remove('/adapters/0')
+ tx1.add('/adapters', Adapter(id='new'))
+ tx1.remove('/adapters/4')
+ tx1.add('/adapters', Adapter(id='new2'))
+ tx1.add('/adapters', Adapter(id='new3'))
+ self.assertEqual(len(proxy.get('/adapters')), 5)
+ self.assertEqual(len(self.node.get('/adapters')), 5)
+ self.assertEqual(len(tx1.get('/adapters')), 6)
+ tx1.commit()
+ self.assertEqual(len(proxy.get('/adapters')), 6)
+ self.assertEqual(len(self.node.get('/adapters')), 6)
+ self.assertEqual(self.log_levels(), {
+ '1': 3, '2': 2, '3': 3, 'new': 0, 'new2': 0, 'new3': 0
+ })
+
+ def test_compatible_updates(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx4 = proxy.open_transaction()
+ tx5 = proxy.open_transaction()
+ tx1.update('/health', HealthStatus(state=HealthStatus.OVERLOADED))
+ self.make_change(tx2, '/adapters/1', 'version', '42')
+ self.make_change(tx3, '/adapters/2', 'config.log_level', 2)
+ self.make_change(tx4, '/adapters/1', 'version', '42')
+ self.make_change(tx5, '/adapters/1', 'version', '422')
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ tx4.commit()
+ self.assertRaises(MergeConflictException, tx5.commit)
+
+ # verify outcome
+ self.assertEqual(self.node.get('/health').state, 1)
+ self.assertEqual(self.node.get('/', deep=1).adapters[1].version, '42')
+ self.assertEqual(self.log_levels(), {
+ '0': 3, '1': 3, '2': 2, '3': 3, '4': 3
+ })
+
+ def test_conflciting_updates(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx1.update('/health', HealthStatus(state=HealthStatus.OVERLOADED))
+ self.make_change(tx2, '/adapters/1', 'version', '42')
+ self.make_change(tx3, '/adapters/1', 'config.log_level', 2)
+ tx1.commit()
+ tx2.commit()
+ self.assertRaises(MergeConflictException, tx3.commit)
+
+ # verify outcome
+ self.assertEqual(self.node.get('/health').state, 1)
+ self.assertEqual(self.node.get('/', deep=1).adapters[1].version, '42')
+ self.assertEqual(self.log_levels(), {
+ '0': 3, '1': 3, '2': 3, '3': 3, '4': 3
+ })
+
+ def test_compatible_adds(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx1.add('/adapters', Adapter(id='new1'))
+ tx2.add('/adapters', Adapter(id='new2'))
+ tx3.add('/adapters', Adapter(id='new3'))
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), [
+ '0', '1', '2', '3', '4', 'new1', 'new2', 'new3'
+ ])
+
+ def test_colliding_adds(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx4 = proxy.open_transaction()
+ tx1.add('/adapters', Adapter(id='new1'))
+ tx2.add('/adapters', Adapter(id='new2'))
+ tx3.add('/adapters', Adapter(id='new1', version='foobar'))
+ tx4.add('/adapters', Adapter(id='new1'))
+ tx1.commit()
+ tx2.commit()
+ self.assertRaises(MergeConflictException, tx3.commit)
+ tx4.commit() # is fine since it add the same data
+ self.assertEqual(self.log_levels().keys(), [
+ '0', '1', '2', '3', '4', 'new1', 'new2'
+ ])
+
+ def test_compatible_removes(self):
+ # removes are always compatible with each other
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx1.remove('/adapters/0')
+ tx2.remove('/adapters/3')
+ tx3.remove('/adapters/0')
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), ['1', '2', '4'])
+
+ def test_update_remove_conflict(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/0', 'version', '42')
+ tx1.remove('/adapters/1')
+ self.make_change(tx2, '/adapters/1', 'version', '13')
+ tx3.remove('/adapters/0')
+ tx1.commit()
+ self.assertRaises(MergeConflictException, tx2.commit)
+ self.assertRaises(MergeConflictException, tx3.commit)
+ self.assertEqual(self.log_levels().keys(), ['0', '2', '3', '4'])
+
+ def test_compatible_update_remove_mix(self):
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/0', 'version', '42')
+ tx1.remove('/adapters/1')
+ self.make_change(tx2, '/adapters/2', 'version', '13')
+ tx3.remove('/adapters/3')
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), ['0', '2', '4'])
+
+ def test_update_add_mix(self):
+ # at same nodes updates are always compatible with adds
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ self.make_change(tx1, '/adapters/0', 'config.log_level', 4)
+ self.make_change(tx1, '/adapters/2', 'config.log_level', 4)
+ tx2.add('/adapters', Adapter(id='new1'))
+ tx3.add('/adapters', Adapter(id='new2'))
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), [
+ '0', '1', '2', '3', '4', 'new1', 'new2'
+ ])
+
+ def test_remove_add_mix(self):
+ # at same node, adds are always compatible with removes
+ proxy = self.node.get_proxy('/')
+ tx1 = proxy.open_transaction()
+ tx2 = proxy.open_transaction()
+ tx3 = proxy.open_transaction()
+ tx1.remove('/adapters/0')
+ tx2.add('/adapters', Adapter(id='new1'))
+ tx3.add('/adapters', Adapter(id='new2'))
+ tx1.remove('/adapters/4')
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertEqual(self.log_levels().keys(), [
+ '1', '2', '3', 'new1', 'new2'
+ ])
+
+ def make_complex_changes(self):
+
+ # Plan:
+ # Have two root proxies and two proxies on specific adapters
+ # Make several transactions, including conflicting ones
+ # Check as much as possible in terms of expected operations
+
+ proxy1 = self.node.get_proxy('/')
+ proxy2 = self.node.get_proxy('/')
+ proxy3 = self.node.get_proxy('/adapters/0')
+ proxy4 = self.node.get_proxy('/adapters/1')
+
+ tx1 = proxy1.open_transaction()
+ tx2 = proxy1.open_transaction()
+ tx3 = proxy2.open_transaction()
+ tx4 = proxy3.open_transaction()
+ tx5 = proxy4.open_transaction()
+
+ # Make multiple changes via tx1
+ self.make_change(tx1, '/adapters/0', 'config.log_level', 1)
+ tx1.add('/adapters', Adapter(id='new1'))
+ tx1.remove('/adapters/2')
+
+ # Make a non-conflicting change from tx2
+ self.make_change(tx2, '/adapters/3', 'config.log_level', 0)
+
+ # Make some conflicting changes via tx3 now
+ self.make_change(tx3, '/adapters/1', 'config.log_level', 1)
+
+ # Make some changes via leaf proxies
+ my_adapter = tx4.get('/')
+ my_adapter.version = 'zulu'
+ my_adapter.config.log_level = 0
+ tx4.update('/', my_adapter)
+
+ # Make some changes via leaf proxies
+ my_adapter = tx5.get('/')
+ my_adapter.version = 'brand new'
+ my_adapter.config.log_level = 4
+ tx5.update('/', my_adapter)
+
+ # Make some more changes on tx2
+ tx2.add('/adapters', Adapter(id='new2'))
+
+ # Conflicts:
+ # - tx4 conflicts with tx0
+ # - tx5 conflicts with tx3
+ return tx1, tx2, tx3, tx4, tx5
+
+ def test_complex_changes_seq1(self):
+ tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
+ tx1.commit()
+ tx2.commit()
+ tx3.commit()
+ self.assertRaises(MergeConflictException, tx4.commit)
+ self.assertRaises(MergeConflictException, tx5.commit)
+ self.assertEqual(self.log_levels(), {
+ '0': 1, '1': 1, '3': 0, '4': 3, 'new1': 0, 'new2': 0
+ })
+
+ def test_complex_changes_seq2(self):
+ tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
+ tx5.commit()
+ tx4.commit()
+ self.assertRaises(MergeConflictException, tx3.commit)
+ tx2.commit()
+ self.assertRaises(MergeConflictException, tx1.commit)
+ self.assertEqual(self.log_levels(), {
+ '0': 0, '1': 4, '2': 3, '3': 0, '4': 3, 'new2': 0
+ })
+
+ def test_complex_changes_seq3(self):
+ tx1, tx2, tx3, tx4, tx5 = self.make_complex_changes()
+ tx4.commit()
+ tx3.commit()
+ tx2.commit()
+ self.assertRaises(MergeConflictException, tx1.commit)
+ self.assertRaises(MergeConflictException, tx5.commit)
+ self.assertEqual(self.log_levels(), {
+ '0': 0, '1': 1, '2': 3, '3': 0, '4': 3, 'new2': 0
+ })
+
+ def test_canceling_adds(self):
+ proxy = self.node.get_proxy('/')
+ tx = proxy.open_transaction()
+ tx.add('/adapters', Adapter(id='new'))
+ tx.add('/adapters', Adapter(id='new2'))
+ tx.cancel()
+ self.assertEqual(self.log_levels().keys(), ['0', '1', '2', '3', '4'])
+
+ def test_nested_adds(self):
+ self.node.add('/logical_devices', LogicalDevice(id='0'))
+ self.node.add('/logical_devices', LogicalDevice(id='1'))
+ proxy0 = self.node.get_proxy('/logical_devices/0')
+ proxy1 = self.node.get_proxy('/logical_devices/1')
+ tx0 = proxy0.open_transaction()
+ tx1 = proxy1.open_transaction()
+
+ tx0.add('/ports', ofp_port(port_no=0, name='/0'))
+ tx0.add('/ports', ofp_port(port_no=1, name='/1'))
+ tx1.add('/ports', ofp_port(port_no=0, name='/0'))
+
+ # at this point none of these are visible outside of tx
+ self.assertEqual(len(proxy0.get('/', deep=1).ports), 0)
+ self.assertEqual(len(proxy1.get('/', deep=1).ports), 0)
+
+ tx0.commit()
+ self.assertEqual(len(proxy0.get('/', deep=1).ports), 2)
+ self.assertEqual(len(proxy1.get('/', deep=1).ports), 0)
+
+ tx1.commit()
+ self.assertEqual(len(proxy0.get('/', deep=1).ports), 2)
+ self.assertEqual(len(proxy1.get('/', deep=1).ports), 1)
+
+ def test_nested_removes(self):
+ self.node.add('/logical_devices', LogicalDevice(id='0'))
+ proxy0 = self.node.get_proxy('/logical_devices/0')
+
+ # add some ports to a device
+ tx0 = proxy0.open_transaction()
+ for i in xrange(10):
+ tx0.add('/ports', ofp_port(port_no=i, name='/{}'.format(i)))
+ self.assertRaises(ValueError, tx0.add,
+ '/ports', ofp_port(port_no=1, name='/1'))
+ tx0.commit()
+
+ # now to the removal
+
+ tx0 = proxy0.open_transaction()
+ tx0.remove('/ports/0')
+ tx0.remove('/ports/5')
+
+ tx1 = proxy0.open_transaction()
+ tx1.remove('/ports/2')
+ tx1.remove('/ports/7')
+
+ tx0.commit()
+ tx1.commit()
+
+ port_ids = [
+ p.port_no for p
+ in self.node.get(deep=1).logical_devices[0].ports
+ ]
+ self.assertEqual(port_ids, [1, 3, 4, 6, 8, 9])
+
+ # TODO need more tests to hammer out potential issues with transactions \
+ # on nested nodes
+
+ def test_transactions_defer_post_op_callbacks(self):
+
+ proxy = self.node.get_proxy('/')
+
+ pre_update = Mock()
+ post_update = Mock()
+ pre_add = Mock()
+ post_add = Mock()
+ pre_remove = Mock()
+ post_remove = Mock()
+
+ proxy.register_callback(CallbackType.PRE_UPDATE, pre_update)
+ proxy.register_callback(CallbackType.POST_UPDATE, post_update)
+ proxy.register_callback(CallbackType.PRE_ADD, pre_add)
+ proxy.register_callback(CallbackType.POST_ADD, post_add)
+ proxy.register_callback(CallbackType.PRE_REMOVE, pre_remove)
+ proxy.register_callback(CallbackType.POST_REMOVE, post_remove)
+
+ tx = proxy.open_transaction()
+
+ # make some changes of each type
+ v = tx.get('/')
+ v.version = '42'
+ tx.update('/', v)
+ ad = tx.get('/adapters/1')
+ tx.remove('/adapters/1')
+ ld = LogicalDevice(id='1')
+ tx.add('/logical_devices', ld)
+
+ # each pre_* should have been called exactly once, but none of the
+ # post_* callbacks have been called yet
+ pre_update.assert_called_once_with(v)
+ pre_add.assert_called_once_with(ld)
+ pre_remove.assert_called_once_with(ad)
+ post_update.assert_not_called()
+ post_add.assert_not_called()
+ post_remove.assert_not_called()
+
+ # once we commit, we shall get the other callbacks
+ tx.commit()
+ post_update.assert_called_once_with(v)
+ post_add.assert_called_once_with(ld)
+ post_remove.assert_called_once_with(ad)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/voltha/adapters/loader.py b/voltha/adapters/loader.py
index 2222f62..c90f17f 100644
--- a/voltha/adapters/loader.py
+++ b/voltha/adapters/loader.py
@@ -25,12 +25,15 @@
import structlog
from twisted.internet.defer import inlineCallbacks, returnValue
+from zope.interface import implementer
from zope.interface.verify import verifyClass
+from common.utils.grpc_utils import twisted_async
from voltha.adapters.interface import IAdapterInterface
-from voltha.northbound.grpc.grpc_server import get_nbi_server
+from voltha.protos import third_party
from voltha.protos.adapter_pb2 import add_AdapterServiceServicer_to_server, \
AdapterServiceServicer, Adapters
+from voltha.registry import IComponent, registry
log = structlog.get_logger()
@@ -38,21 +41,25 @@
mydir = os.path.abspath(os.path.dirname(__file__))
+@implementer(IComponent)
class AdapterLoader(AdapterServiceServicer):
def __init__(self, config):
self.config = config
self.adapters = {} # adapter-name -> adapter instance
- get_nbi_server().register(add_AdapterServiceServicer_to_server, self)
+ registry('grpc_server').register(
+ add_AdapterServiceServicer_to_server, self)
+ self.root_proxy = registry('core').get_proxy('/')
@inlineCallbacks
def start(self):
log.debug('starting')
- for adapter_name, adapter_class in self.find_adapters():
+ for adapter_name, adapter_class in self._find_adapters():
config = self.load_adapter_config(adapter_name)
adapter = adapter_class(config)
yield adapter.start()
self.adapters[adapter_name] = adapter
+ self.expose_adapter(adapter_name)
log.info('started')
returnValue(self)
@@ -64,7 +71,7 @@
self.adapters = {}
log.info('stopped')
- def find_adapters(self):
+ def _find_adapters(self):
subdirs = os.walk(mydir).next()[1]
for subdir in subdirs:
adapter_name = subdir
@@ -85,7 +92,6 @@
verifyClass(IAdapterInterface, cls)
yield adapter_name, cls
-
def load_adapter_config(self, adapter_name):
"""
Opportunistically load persisted adapter configuration
@@ -94,10 +100,15 @@
"""
# TODO
+ def expose_adapter(self, name):
+ adapter_descriptor = self.adapters[name].adapter_descriptor()
+ self.root_proxy.add('/adapters', adapter_descriptor)
+
# gRPC service method implementations. BE CAREFUL; THESE ARE CALLED ON
# the gRPC threadpool threads.
+ @twisted_async
def ListAdapters(self, request, context):
log.info('list-adapters', request=request)
- items = [a.adapter_descriptor() for a in self.adapters.itervalues()]
+ items = self.root_proxy.get('/adapters')
return Adapters(items=items)
diff --git a/voltha/adapters/simulated/simulated.py b/voltha/adapters/simulated/simulated.py
index aa71da1..805d250 100644
--- a/voltha/adapters/simulated/simulated.py
+++ b/voltha/adapters/simulated/simulated.py
@@ -23,6 +23,7 @@
from voltha.adapters.interface import IAdapterInterface
from voltha.protos.adapter_pb2 import Adapter, DeviceTypes, AdapterConfig
from voltha.protos.health_pb2 import HealthStatus
+from voltha.protos.common_pb2 import INFO
log = structlog.get_logger()
@@ -34,8 +35,9 @@
self.config = config
self.descriptor = Adapter(
id='simulated',
- config=AdapterConfig()
- # TODO
+ vendor='Voltha project',
+ version='0.1',
+ config=AdapterConfig(log_level=INFO)
)
def start(self):
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 39836b8..b268a20 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -23,9 +23,11 @@
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from twisted.internet.task import LoopingCall
+from zope.interface import implementer
from leader import Leader
from common.utils.asleep import asleep
+from voltha.registry import IComponent
from worker import Worker
log = get_logger()
@@ -35,6 +37,7 @@
pass
+@implementer(IComponent)
class Coordinator(object):
"""
An app shall instantiate only one Coordinator (singleton).
@@ -106,6 +109,7 @@
log.debug('starting')
reactor.callLater(0, self._async_init)
log.info('started')
+ return self
@inlineCallbacks
def stop(self):
@@ -114,7 +118,7 @@
yield self._delete_session() # this will delete the leader lock too
yield self.worker.stop()
if self.leader is not None:
- yield self.leader.halt()
+ yield self.leader.stop()
self.leader = None
log.info('stopped')
diff --git a/voltha/core/config/__init__.py b/voltha/core/config/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/core/config/__init__.py
diff --git a/voltha/core/config/config_branch.py b/voltha/core/config/config_branch.py
new file mode 100644
index 0000000..468f56e
--- /dev/null
+++ b/voltha/core/config/config_branch.py
@@ -0,0 +1,54 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Class to hold revisions, latest revision, etc., for a config node, used
+for the active committed revisions or revisions part of a transaction.
+"""
+
+from collections import OrderedDict
+
+from common.utils.ordered_weakvalue_dict import OrderedWeakValueDict
+
+
+class ConfigBranch(object):
+
+ __slots__ = (
+ '_node', # ref to node
+ '_txid', # txid for this branch (None for the committed branch)
+ '_origin', # _latest at time of branching on default branch
+ '_revs', # dict of rev-hash to ref of ConfigRevision
+ '_latest', # ref to latest committed ConfigRevision
+ '__weakref__'
+ )
+
+ def __init__(self, node, txid=None, origin=None, auto_prune=True):
+ self._node = node
+ self._txid = txid
+ self._origin = origin
+ self._revs = OrderedWeakValueDict() if auto_prune else OrderedDict()
+ self._latest = origin
+
+ def __getitem__(self, hash):
+ return self._revs[hash]
+
+ @property
+ def latest(self):
+ return self._latest
+
+ @property
+ def origin(self):
+ return self._origin
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
new file mode 100644
index 0000000..9f7f498
--- /dev/null
+++ b/voltha/core/config/config_node.py
@@ -0,0 +1,646 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from collections import OrderedDict
+from copy import copy
+
+from jsonpatch import JsonPatch
+from jsonpatch import make_patch
+
+from common.utils.json_format import MessageToDict
+from voltha.core.config.config_branch import ConfigBranch
+from voltha.core.config.config_proxy import CallbackType, ConfigProxy
+from voltha.core.config.config_rev import is_proto_message, children_fields, \
+ ConfigRevision, access_rights
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+
+class MergeConflictException(Exception):
+ pass
+
+
+def message_to_dict(m):
+ return MessageToDict(m, True, True, False)
+
+
+def check_access_violation(new_msg, old_msg):
+ """Raise ValueError if attempt is made to change a read-only field"""
+ access_map = access_rights(new_msg.__class__)
+ violated_fields = []
+ for field_name, access in access_map.iteritems():
+ if access == meta_pb2.READ_ONLY:
+ if getattr(new_msg, field_name) != getattr(old_msg, field_name):
+ violated_fields.append(field_name)
+ if violated_fields:
+ raise ValueError('Cannot change read-only field(s) %s' %
+ ', '.join('"%s"' % f for f in violated_fields))
+
+
+class ConfigNode(object):
+ """
+ Represents a configuration node which can hold a number of revisions
+ of the configuration for this node.
+ When the configuration changes, the new version is appended to the
+ node.
+ Initial data must be a protobuf message and it will determine the type of
+ this node.
+ """
+ __slots__ = (
+ '_type', # node type, as __class__ of protobuf message
+ '_branches', # dict of transaction branches and a default (committed)
+ # branch
+ '_tags', # dict of tag-name to ref of ConfigRevision
+ '_proxy', # ref to proxy observer or None if no proxy assigned
+ )
+
+ def __init__(self, initial_data, auto_prune=True, txid=None):
+ assert is_proto_message(initial_data)
+ self._type = initial_data.__class__
+ self._branches = {}
+ self._tags = {}
+ self._proxy = None
+
+ self._initialize(copy(initial_data), auto_prune, txid)
+
+ def _initialize(self, data, auto_prune, txid):
+ # separate external children data away from locally stored data
+ # based on child_node annotations in protobuf
+ children = {}
+ for field_name, field in children_fields(self._type).iteritems():
+ field_value = getattr(data, field_name)
+ if field.is_container:
+ if field.key:
+ children[field_name] = od = OrderedDict()
+ for v in field_value:
+ rev = ConfigNode(v, txid=txid).latest
+ key = getattr(v, field.key)
+ if key in od:
+ raise ValueError('Duplicate key "{}"'.format(key))
+ od[key] = rev
+ else:
+ children[field_name] = [
+ ConfigNode(v, txid=txid).latest for v in field_value]
+ else:
+ children[field_name] = [
+ ConfigNode(field_value, txid=txid).latest]
+ data.ClearField(field_name)
+
+
+ branch = ConfigBranch(self, auto_prune=auto_prune)
+ rev = ConfigRevision(branch, data, children)
+ branch._latest = rev
+ branch._revs[rev.hash] = rev
+ self._branches[txid] = branch
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ accessors ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # these convenience short-cuts only work for the committed branch
+
+ @property
+ def revisions(self):
+ return [r._hash for r in self._branches[None]._revs.itervalues()]
+
+ @property
+ def latest(self):
+ return self._branches[None]._latest
+
+ def __getitem__(self, hash):
+ return self._branches[None]._revs[hash]
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ get operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path=None, hash=None, depth=0, deep=False, txid=None):
+
+ # depth preparation
+ if deep:
+ depth = -1
+
+ # path preparation
+ path = '' if path is None else path
+ while path.startswith('/'):
+ path = path[1:]
+
+ # determine branch; if lookup fails, it is ok to use default branch
+ branch = self._branches.get(txid, None) or self._branches[None]
+
+ # determine rev
+ if hash is not None:
+ rev = branch._revs[hash]
+ else:
+ rev = branch.latest
+
+ return self._get(rev, path, depth)
+
+ def _get(self, rev, path, depth):
+
+ if not path:
+ return self._do_get(rev, depth)
+
+ # ... otherwise
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if field.key:
+ children_od = rev._children[name]
+ if path:
+ # need to escalate further
+ key, _, path = path.partition('/')
+ child_rev = children_od[field.key_from_str(key)]
+ child_node = child_rev.node
+ return child_node._get(child_rev, path, depth)
+ else:
+ # we are the node of interest
+ response = []
+ for child_rev in children_od.itervalues():
+ child_node = child_rev.node
+ value = child_node._do_get(child_rev, depth)
+ response.append(value)
+ return response
+ else:
+ if path:
+ raise LookupError(
+ 'Cannot index into container with no key defined')
+ response = []
+ for child_rev in rev._children[name]:
+ child_node = child_rev.node
+ value = child_node._do_get(child_rev, depth)
+ response.append(value)
+ return response
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ return child_node._get(child_rev, path, depth)
+
+ def _do_get(self, rev, depth):
+ msg = rev.get(depth)
+ if self._proxy is not None:
+ msg = self._proxy.invoke_callbacks(CallbackType.GET, msg)
+ return msg
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ update operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def update(self, path, data, strict=False, txid=None, mk_branch=None):
+
+ while path.startswith('/'):
+ path = path[1:]
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ if not path:
+ return self._do_update(branch, data, strict)
+
+ rev = branch._latest # change is always made to the latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError('Cannot update a list')
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children_od = copy(rev._children[name])
+ child_rev = children_od[key]
+ child_node = child_rev.node
+ new_child_rev = child_node.update(
+ path, data, strict, txid, mk_branch)
+ if new_child_rev.hash == child_rev.hash:
+ # no change, we can return
+ return branch._latest
+ if getattr(new_child_rev.data, field.key) != key:
+ raise ValueError('Cannot change key field')
+ children_od[key] = new_child_rev
+ rev = rev.update_children(name, children_od, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ raise ValueError('Cannot index into container with no keys')
+
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ new_child_rev = child_node.update(
+ path, data, strict, txid, mk_branch)
+ rev = rev.update_children(name, [new_child_rev], branch)
+ self._make_latest(branch, rev)
+ return rev
+
+ def _do_update(self, branch, data, strict):
+ if not isinstance(data, self._type):
+ raise ValueError(
+ '"{}" is not a valid data type for this node'.format(
+ data.__class__.__name__))
+ self._test_no_children(data)
+ if self._proxy is not None:
+ self._proxy.invoke_callbacks(CallbackType.PRE_UPDATE, data)
+
+ if branch._latest.data != data:
+ if strict:
+ # check if attempt is made to change read-only field
+ check_access_violation(data, branch._latest.data)
+ rev = branch._latest.update_data(data, branch)
+ self._make_latest(branch, rev,
+ ((CallbackType.POST_UPDATE, rev.data),))
+ return rev
+ else:
+ return branch._latest
+
+ def _make_latest(self, branch, rev, change_announcements=()):
+ branch._latest = rev
+ if rev.hash not in branch._revs:
+ branch._revs[rev.hash] = rev
+
+ # announce only if this is main branch
+ if change_announcements and branch._txid is None and \
+ self._proxy is not None:
+ for change_type, data in change_announcements:
+ self._proxy.invoke_callbacks(
+ change_type, data, proceed_on_errors=1)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def add(self, path, data, txid=None, mk_branch=None):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ raise ValueError('Cannot add to non-container node')
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ rev = branch._latest # change is always made to latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ # we do need to add a new child to the field
+ if field.key:
+ if self._proxy is not None:
+ self._proxy.invoke_callbacks(
+ CallbackType.PRE_ADD, data)
+ children_od = copy(rev._children[name])
+ key = getattr(data, field.key)
+ if key in children_od:
+ raise ValueError('Duplicate key "{}"'.format(key))
+ child_rev = ConfigNode(data).latest
+ children_od[key] = child_rev
+ rev = rev.update_children(name, children_od, branch)
+ self._make_latest(branch, rev,
+ ((CallbackType.POST_ADD, data),))
+ return rev
+ else:
+ # adding to non-keyed containers not implemented yet
+ raise ValueError('Cannot add to non-keyed container')
+ else:
+ if field.key:
+ # need to escalate
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ children_od = copy(rev._children[name])
+ child_rev = children_od[key]
+ child_node = child_rev.node
+ new_child_rev = child_node.add(path, data, txid, mk_branch)
+ children_od[key] = new_child_rev
+ rev = rev.update_children(name, children_od, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ raise ValueError(
+ 'Cannot index into container with no keys')
+ else:
+ raise ValueError('Cannot add to non-container field')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ remove operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def remove(self, path, txid=None, mk_branch=None):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ raise ValueError('Cannot remove from non-container node')
+
+ try:
+ branch = self._branches[txid]
+ except KeyError:
+ branch = mk_branch(self)
+
+ rev = branch._latest # change is always made to latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError("Cannot remove without a key")
+ if field.key:
+ key, _, path = path.partition('/')
+ key = field.key_from_str(key)
+ if path:
+ # need to escalate
+ children_od = copy(rev._children[name])
+ child_rev = children_od[key]
+ child_node = child_rev.node
+ new_child_rev = child_node.remove(path, txid, mk_branch)
+ children_od[key] = new_child_rev
+ rev = rev.update_children(name, children_od, branch)
+ self._make_latest(branch, rev)
+ return rev
+ else:
+ # need to remove from this very node
+ children_od = copy(rev._children[name])
+ if self._proxy is not None:
+ data = children_od[field.key_from_str(key)].data
+ self._proxy.invoke_callbacks(
+ CallbackType.PRE_REMOVE, data)
+ post_anno = ((CallbackType.POST_REMOVE, data),)
+ else:
+ post_anno = ()
+ del children_od[field.key_from_str(key)]
+ rev = rev.update_children(name, children_od, branch)
+ self._make_latest(branch, rev, post_anno)
+ return rev
+ else:
+ raise ValueError('Cannot remove from non-keyed container')
+ else:
+ raise ValueError('Cannot remove non-conatiner field')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Branching ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _mk_txbranch(self, txid):
+ branch_point = self._branches[None].latest
+ branch = ConfigBranch(self, txid, branch_point)
+ self._branches[txid] = branch
+ return branch
+
+ def _del_txbranch(self, txid):
+ del self._branches[txid]
+
+ # def can_txbranch_be_merged(self, txid):
+ # try:
+ # self._merge_txbranch(txid, dry_run=True)
+ # except MergeConflictException:
+ # return False
+ # else:
+ # return True
+
+ def _merge_txbranch(self, txid, dry_run=False):
+ """
+ Make latest in branch to be latest in the common branch, but only
+ if no conflict is detected. Conflict is where the txbranch branch
+ point no longer matches the latest in the default branch. This has
+ to be verified recursively.
+ """
+
+ """
+ A transaction branch can be merged only if none of the following
+ happened with the master branch since the fork rev:
+ - the local data was changed both in the incoming node and in the
+ default branch since the branch point, and they differ now
+ - both branches changed the same children nodes in any way (local or
+ deep)
+ """
+
+ announcements = []
+
+ def _get_od_changes(lst1, lst2):
+ assert isinstance(lst2, OrderedDict)
+ added_keys = [k for k in lst2.iterkeys() if k not in lst1]
+ removed_keys = [k for k in lst1.iterkeys() if k not in lst2]
+ changed_keys = [k for k in lst1.iterkeys()
+ if k in lst2 and lst1[k].hash != lst2[k].hash]
+ return added_keys, removed_keys, changed_keys
+
+ def _get_changes(lst1, lst2):
+ if isinstance(lst1, OrderedDict):
+ return _get_od_changes(lst1, lst2)
+ assert isinstance(lst1, list)
+ assert isinstance(lst2, list)
+ set1 = set(lst1)
+ set2 = set(lst2)
+ added = set2.difference(set1)
+ removed = set1.difference(set2)
+ changed = set() # no such thing in plain (unkeyed) lists
+ return added, removed, changed
+
+ def _escalate(child_rev):
+ child_branch = child_rev._branch
+ if child_branch._txid == txid:
+ child_rev = child_branch._node._merge_txbranch(txid, dry_run)
+ return child_rev
+
+ def _escalate_list(src_list):
+ if isinstance(src_list, list):
+ lst = []
+ for child_rev in src_list:
+ lst.append(_escalate(child_rev))
+ return lst
+ else: # OrderedDict
+ od = OrderedDict()
+ for key, child_rev in src_list.iteritems():
+ od[key] = _escalate(child_rev)
+ return od
+
+ def _add(dst, rev_or_key, src):
+ if isinstance(dst, list):
+ dst.append(_escalate(rev_or_key))
+ announcements.append((CallbackType.POST_ADD, rev_or_key.data))
+ else: # OrderedDict key, data is in lst
+ rev = src[rev_or_key]
+ dst[rev_or_key] = _escalate(rev)
+ announcements.append((CallbackType.POST_ADD, rev.data))
+
+ def _remove(dst, rev_or_key):
+ if isinstance(dst, list):
+ dst.remove(rev_or_key)
+ announcements.append((CallbackType.POST_REMOVE, rev_or_key))
+ else:
+ rev = dst[rev_or_key]
+ del dst[rev_or_key]
+ announcements.append((CallbackType.POST_REMOVE, rev.data))
+
+ src_branch = self._branches[txid]
+ dst_branch = self._branches[None]
+
+ fork_rev = src_branch.origin # rev from which src branch was made
+ src_rev = src_branch.latest # head rev of source branch
+ dst_rev = dst_branch.latest # head rev of target branch
+
+ # deal with config data first
+ if dst_rev._config is fork_rev._config:
+ # no change in master, accept src if different
+ config_changed = dst_rev._config != src_rev._config
+ else:
+ if dst_rev._config.hash != src_rev._config.hash:
+ raise MergeConflictException('Config collision')
+ config_changed = True
+
+ new_children = copy(dst_rev._children)
+ for field_name, field in children_fields(self._type).iteritems():
+ fork_list = fork_rev._children[field_name]
+ src_list = src_rev._children[field_name]
+ dst_list = dst_rev._children[field_name]
+ if 0: #dst_list == fork_list:
+ # no change in master, accept src if different
+ if src_list != fork_list:
+ new_children[field_name] = _escalate_list(src_list)
+ else:
+ src_added, src_removed, src_changed = _get_changes(
+ fork_list, src_list)
+ dst_added, dst_removed, dst_changed = _get_changes(
+ fork_list, dst_list)
+
+ lst = copy(new_children[field_name])
+ for to_add in src_added:
+ # we cannot add if it has been added and is different
+ if to_add in dst_added:
+ # this can happen only to keyed containers
+ assert isinstance(src_list, OrderedDict)
+ if src_list[to_add].hash != dst_list[to_add].hash:
+ raise MergeConflictException(
+ 'Cannot add because it has been added and '
+ 'different'
+ )
+ _add(lst, to_add, src_list)
+ for to_remove in src_removed:
+ # we cannot remove if it has changed in dst
+ if to_remove in dst_changed:
+ raise MergeConflictException(
+ 'Cannot remove because it has changed')
+ if to_remove not in dst_removed:
+ _remove(lst, to_remove)
+ for to_change in src_changed:
+ # we cannot change if it was removed in dst
+ if to_change in dst_removed:
+ raise MergeConflictException(
+ 'Cannot change because it has been removed')
+ # change can only be in keyed containers (OrderedDict)
+ lst[to_change] = _escalate(src_list[to_change])
+ new_children[field_name] = lst
+
+ if not dry_run:
+ rev = src_rev if config_changed else dst_rev
+ rev = rev.update_all_children(new_children, dst_branch)
+ if config_changed:
+ announcements.append((CallbackType.POST_UPDATE, rev.data))
+ self._make_latest(dst_branch, rev, announcements)
+ del self._branches[txid]
+ return rev
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def diff(self, hash1, hash2=None, txid=None):
+ branch = self._branches[txid]
+ rev1 = branch[hash1]
+ rev2 = branch[hash2] if hash2 else branch._latest
+ if rev1.hash == rev2.hash:
+ return JsonPatch([])
+ else:
+ dict1 = message_to_dict(rev1.data)
+ dict2 = message_to_dict(rev2.data)
+ return make_patch(dict1, dict2)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Tagging utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def tag(self, tag, hash=None):
+ branch = self._branches[None] # tag only what has been committed
+ rev = branch._latest if hash is None else branch._revs[hash]
+ self._tags[tag] = rev
+ return self
+
+ @property
+ def tags(self):
+ return sorted(self._tags.iterkeys())
+
+ def by_tag(self, tag):
+ """
+ Return revision based on tag
+ :param tag: previously registered tag value
+ :return: revision object
+ """
+ return self._tags[tag]
+
+ def diff_by_tag(self, tag1, tag2):
+ return self.diff(self._tags[tag1].hash, self._tags[tag2].hash)
+
+ def delete_tag(self, tag):
+ del self._tags[tag]
+
+ def delete_tags(self, *tags):
+ for tag in tags:
+ del self._tags[tag]
+
+ def prune_untagged(self):
+ branch = self._branches[None]
+ keep = set(rev.hash for rev in self._tags.itervalues())
+ keep.add(branch._latest.hash)
+ for hash in branch._revs.keys():
+ if hash not in keep:
+ del branch._revs[hash]
+ return self
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def _test_no_children(self, data):
+ for field_name, field in children_fields(self._type).items():
+ field_value = getattr(data, field_name)
+ if field.is_container:
+ if len(field_value):
+ raise NotImplementedError(
+ 'Cannot update external children')
+ else:
+ if data.HasField(field_name):
+ raise NotImplementedError(
+ 'Cannot update externel children')
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Node proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get_proxy(self, path, exclusive=False):
+ return self._get_proxy(path, self, path, exclusive)
+
+ def _get_proxy(self, path, root, full_path, exclusive):
+ while path.startswith('/'):
+ path = path[1:]
+ if not path:
+ return self._mk_proxy(root, full_path, exclusive)
+
+ # need to escalate
+ rev = self._branches[None]._latest
+ name, _, path = path.partition('/')
+ field = children_fields(self._type)[name]
+ if field.is_container:
+ if not path:
+ raise ValueError('Cannot proxy a container field')
+ if field.key:
+ key, _, path = path.partition('/')
+ children_od = rev._children[name]
+ child_rev = children_od[key]
+ child_node = child_rev.node
+ return child_node._get_proxy(path, root, full_path, exclusive)
+
+ raise ValueError('Cannot index into container with no keys')
+
+ else:
+ child_rev = rev._children[name][0]
+ child_node = child_rev.node
+ return child_node._get_proxy(path, root, full_path, exclusive)
+
+ def _mk_proxy(self, root, full_path, exclusive):
+ if self._proxy is None:
+ self._proxy = ConfigProxy(root, self, full_path, exclusive)
+ else:
+ if self._proxy.exclusive:
+ raise ValueError('Node is already owned exclusively')
+ return self._proxy
diff --git a/voltha/core/config/config_proxy.py b/voltha/core/config/config_proxy.py
new file mode 100644
index 0000000..e4c6245
--- /dev/null
+++ b/voltha/core/config/config_proxy.py
@@ -0,0 +1,130 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from enum import Enum
+
+from voltha.core.config.config_txn import ConfigTransaction
+
+log = structlog.get_logger()
+
+
+class CallbackType(Enum):
+
+ # GET hooks are called after the data is retrieved and can be used to
+ # augment the data (they should only augment fields marked as REAL_TIME
+ GET = 1
+
+ # PRE_UPDATE hooks are called before the change is made and are supposed
+ # to be used to reject the data by raising an exception. If they don't,
+ # the change will be applied.
+ PRE_UPDATE = 2
+
+ # POST_UPDATE hooks are called after the update has occurred and can
+ # be used to deal with the change. For instance, an adapter can use the
+ # callback to trigger the south-bound configuration
+ POST_UPDATE = 3
+
+ # These behave similarly to the update callbacks as described above.
+ PRE_ADD = 4
+ POST_ADD = 5
+ PRE_REMOVE = 6
+ POST_REMOVE = 7
+
+
+class ConfigProxy(object):
+ """
+ Allows an entity to look at a sub-tree and see it as it was the whole tree
+ """
+ __slots__ = (
+ '_root',
+ '_node',
+ '_path',
+ '_exclusive',
+ '_callbacks'
+ )
+
+ def __init__(self, root, node, path, exclusive):
+ self._root = root
+ self._node = node
+ self._exclusive = exclusive
+ self._path = path # full path to proxied node
+ self._callbacks = {} # call back type -> list of callbacks
+
+ @property
+ def exclusive(self):
+ return self._exclusive
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~ CRUD handlers ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path='/', depth=None, deep=None, txid=None):
+ return self._node.get(path, depth=depth, deep=deep, txid=txid)
+
+ def update(self, path, data, strict=False, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.update(full_path, data, strict, txid=txid)
+
+ def add(self, path, data, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.add(full_path, data, txid=txid)
+
+ def remove(self, path, txid=None):
+ assert path.startswith('/')
+ full_path = self._path if path == '/' else self._path + path
+ return self._root.remove(full_path, txid=txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~ Transaction support ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def open_transaction(self):
+ """Open a new transaction"""
+ txid = self._root.mk_txbranch()
+ return ConfigTransaction(self, txid)
+
+ def commit_transaction(self, txid):
+ """
+ If having an open transaction, commit it now. Will raise exception
+ if conflict is detected. Either way, transaction will be deleted.
+ """
+ self._root.fold_txbranch(txid)
+
+ def cancel_transaction(self, txid):
+ """
+ Cancel current transaction if we are in a transaction. Always succeeds.
+ """
+ self._root.del_txbranch(txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~ Callbacks registrations ~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def register_callback(self, callback_type, callback, *args, **kw):
+ lst = self._callbacks.setdefault(callback_type, [])
+ lst.append((callback, args, kw))
+
+ # ~~~~~~~~~~~~~~~~~~~~~ Callback dispatch ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def invoke_callbacks(self, callback_type, msg, proceed_on_errors=False):
+ lst = self._callbacks.get(callback_type, [])
+ for callback, args, kw in lst:
+ try:
+ msg = callback(msg, *args, **kw)
+ except Exception, e:
+ if proceed_on_errors:
+ log.exception(
+ 'call-back-error', callback_type=callback_type,
+ msg=msg, e=e)
+ else:
+ raise
+ return msg
diff --git a/voltha/core/config/config_rev.py b/voltha/core/config/config_rev.py
new file mode 100644
index 0000000..35f0ef5
--- /dev/null
+++ b/voltha/core/config/config_rev.py
@@ -0,0 +1,295 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Immutable classes to store config revision information arranged in a tree.
+
+Immutability cannot be enforced in Python, so anyoen working with these
+classes directly must obey the rules.
+"""
+
+import weakref
+from collections import OrderedDict
+from copy import copy
+from time import time
+from hashlib import md5
+
+from google.protobuf.descriptor import Descriptor
+from simplejson import dumps
+
+from common.utils.json_format import MessageToJson
+from voltha.protos import third_party
+from voltha.protos import meta_pb2
+
+
+def is_proto_message(o):
+ """
+ Return True if object o appears to be a protobuf message; False otherwise.
+ """
+ # use a somewhat empirical approach to decide if something looks like
+ # a protobuf message
+ return isinstance(getattr(o, 'DESCRIPTOR', None), Descriptor)
+
+
+def message_to_json_concise(m):
+ """
+ Return the most concise string representation of a protobuf. Good for
+ things where size matters (e.g., generating hash).
+ """
+ return MessageToJson(m, False, True, False)
+
+
+_rev_cache = weakref.WeakValueDictionary() # cache of config revs
+
+
+_children_fields_cache = {} # to memoize externally stored field name info
+
+
+class _ChildType(object):
+ """Used to store key metadata about child_node fields in protobuf messages.
+ """
+ __slots__ = ('_is_container', '_key', '_key_from_str')
+ def __init__(self, is_container, key=None, key_from_str=None):
+ self._is_container = is_container
+ self._key = key
+ self._key_from_str = key_from_str
+ @property
+ def is_container(self):
+ return self._is_container
+ @property
+ def key(self):
+ return self._key
+ @property
+ def key_from_str(self):
+ return self._key_from_str
+
+
+def children_fields(cls):
+ """
+ Return a map of externally stored fields for this protobuf message type.
+ What is stored as branch node is determined by the "child_node"
+ annotation in the protobuf definitions.
+ With each external field, we store if the field is a container, if a
+ container is keyed (indexed), and what is the function that converts
+ path substring back to the key.
+ """
+ names = _children_fields_cache.get(cls)
+ if names is None:
+ names = {}
+ for field in cls.DESCRIPTOR.fields:
+ if field.has_options:
+ options = field.GetOptions()
+ if options.HasExtension(meta_pb2.child_node):
+ is_container = field.label == 3
+ meta = options.Extensions[meta_pb2.child_node]
+ key_from_str = None
+ if meta.key:
+ key_field = field.message_type.fields_by_name[meta.key]
+ key_type = key_field.type
+ if key_type == key_field.TYPE_STRING:
+ key_from_str = lambda s: s
+ elif key_type in (
+ key_field.TYPE_FIXED32,
+ key_field.TYPE_FIXED64,
+ key_field.TYPE_INT32,
+ key_field.TYPE_INT64,
+ key_field.TYPE_SFIXED32,
+ key_field.TYPE_SFIXED64,
+ key_field.TYPE_SINT32,
+ key_field.TYPE_SINT64,
+ key_field.TYPE_UINT32,
+ key_field.TYPE_UINT64):
+ key_from_str = lambda s: int(s)
+ else:
+ raise NotImplementedError()
+ names[field.name] = _ChildType(is_container, meta.key,
+ key_from_str)
+ _children_fields_cache[cls] = names
+ return names
+
+
+_access_right_cache = {} # to memoize field access right restrictions
+
+
+def access_rights(cls):
+ """
+ Determine the access rights for each field and cache these maps for
+ fast retrieval.
+ """
+ access_map = _access_right_cache.get(cls)
+ if access_map is None:
+ access_map = {}
+ for field in cls.DESCRIPTOR.fields:
+ if field.has_options:
+ options = field.GetOptions()
+ if options.HasExtension(meta_pb2.access):
+ access = options.Extensions[meta_pb2.access]
+ access_map[field.name] = access
+ _access_right_cache[cls] = access_map
+ return access_map
+
+
+class ConfigDataRevision(object):
+ """
+ Holds a specific snapshot of the local configuration for config node.
+ It shall be treated as an immutable object, although in Python this is
+ very difficult to enforce!
+ As such, we can compute a unique hash based on the config data which
+ can be used to establish equivalence. It also has a time-stamp to track
+ changes.
+
+ This object must be treated as immutable, including its nested config data.
+ This is very important. The entire config module depends on hashes
+ we create over the data, so altering the data can lead to unpredictable
+ detriments.
+ """
+
+ __slots__ = ('_data', '_ts', '_hash')
+
+ def __init__(self, data):
+ self._data = data
+ self._ts = time()
+ self._hash = self._hash_data(data)
+
+ @property
+ def data(self):
+ return self._data
+
+ @property
+ def ts(self):
+ return self._ts
+
+ @property
+ def hash(self):
+ return self._hash
+
+ @staticmethod
+ def _hash_data(data):
+ """Hash function to be used to track version changes of config nodes"""
+ if isinstance(data, (dict, list)):
+ signature = dumps(data)
+ elif is_proto_message(data):
+ signature = message_to_json_concise(data)
+ else:
+ signature = str(hash(data))
+ return md5(signature).hexdigest()[:12]
+
+
+class ConfigRevision(object):
+ """
+ Holds not only the local config data, but also the external children
+ reference lists, per field name.
+ Recall that externally stored fields are those marked "child_node" in
+ the protobuf definition.
+ This object must be treated as immutable, including its config data.
+ """
+
+ __slots__ = (
+ '_config',
+ '_children',
+ '_hash',
+ '_branch',
+ '__weakref__'
+ )
+
+ def __init__(self, branch, data, children=None):
+ self._branch = branch
+ self._config = ConfigDataRevision(data)
+ self._children = children
+ self._update_metainfo()
+
+ def _update_metainfo(self):
+ self._hash = self._hash_content()
+ if self._hash not in _rev_cache:
+ _rev_cache[self._hash] = self
+
+ def _hash_content(self):
+ # hash is derived from config hash and hashes of all children
+ m = md5('' if self._config is None else self._config.hash)
+ if self._children is not None:
+ for children in self._children.itervalues():
+ if isinstance(children, OrderedDict):
+ for child in children.itervalues():
+ m.update(child.hash)
+ else:
+ for child in children:
+ m.update(child.hash)
+ return m.hexdigest()[:12]
+
+ @property
+ def hash(self):
+ return self._hash
+
+ @property
+ def data(self):
+ return None if self._config is None else self._config.data
+
+ @property
+ def node(self):
+ return self._branch._node
+
+ @property
+ def type(self):
+ return self._config.data.__class__
+
+ def get(self, depth):
+ """
+ Get config data of node. If depth > 0, recursively assemble the
+ branch nodes. If depth is < 0, this results in a fully exhaustive
+ "complete config".
+ """
+ data = copy(self._config.data)
+ if depth:
+ # collect children
+ cfields = children_fields(self.type).iteritems()
+ for field_name, field in cfields:
+ if field.is_container:
+ if field.key:
+ children = self._children[field_name].itervalues()
+ else:
+ children = self._children[field_name]
+ for rev in children:
+ child_data = rev.get(depth=depth - 1)
+ child_data_holder = getattr(data, field_name).add()
+ child_data_holder.MergeFrom(child_data)
+ else:
+ rev = self._children[field_name][0]
+ child_data = rev.get(depth=depth - 1)
+ child_data_holder = getattr(data, field_name)
+ child_data_holder.MergeFrom(child_data)
+ return data
+
+ def update_data(self, data, branch):
+ """Return a NEW revision which is updated for the modified data"""
+ return ConfigRevision(branch, data=data, children=self._children)
+
+ def update_children(self, name, children, branch):
+ """Return a NEW revision which is updated for the modified children"""
+ new_children = copy(self._children)
+ new_children[name] = children
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._children = new_children
+ new_rev._update_metainfo()
+ return new_rev
+
+ def update_all_children(self, children, branch):
+ """Return a NEW revision which is updated for all children entries"""
+ new_rev = copy(self)
+ new_rev._branch = branch
+ new_rev._children = children
+ new_rev._update_metainfo()
+ return new_rev
diff --git a/voltha/core/config/config_root.py b/voltha/core/config/config_root.py
new file mode 100644
index 0000000..92e50cc
--- /dev/null
+++ b/voltha/core/config/config_root.py
@@ -0,0 +1,96 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from uuid import uuid4
+
+import structlog
+
+from voltha.core.config.config_node import ConfigNode, MergeConflictException
+
+log = structlog.get_logger()
+
+
+class ConfigRoot(ConfigNode):
+
+ __slots__ = (
+ '_dirty_nodes', # holds set of modified nodes per transaction branch
+ )
+
+ def __init__(self, initial_data):
+ super(ConfigRoot, self).__init__(initial_data, False)
+ self._dirty_nodes = {}
+
+ def mk_txbranch(self):
+ txid = uuid4().hex[:12]
+ self._dirty_nodes[txid] = {self}
+ self._mk_txbranch(txid)
+ return txid
+
+ def del_txbranch(self, txid):
+ for dirty_node in self._dirty_nodes[txid]:
+ dirty_node._del_txbranch(txid)
+ del self._dirty_nodes[txid]
+
+ def fold_txbranch(self, txid):
+ try:
+ self._merge_txbranch(txid, dry_run=1)
+ except MergeConflictException:
+ self.del_txbranch(txid)
+ raise
+
+ self._merge_txbranch(txid)
+
+ # ~~~~~~ Overridden, root-level CRUD methods to handle transactions ~~~~~~~
+
+ def update(self, path, data, strict=None, txid=None, mk_branch=None):
+ assert mk_branch is None
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ return super(ConfigRoot, self).update(path, data, strict,
+ txid, track_dirty)
+ else:
+ return super(ConfigRoot, self).update(path, data, strict)
+
+ def add(self, path, data, txid=None, mk_branch=None):
+ assert mk_branch is None
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ return super(ConfigRoot, self).add(path, data, txid, track_dirty)
+ else:
+ return super(ConfigRoot, self).add(path, data)
+
+ def remove(self, path, txid=None, mk_branch=None):
+ assert mk_branch is None
+ if txid is not None:
+ dirtied = self._dirty_nodes[txid]
+
+ def track_dirty(node):
+ dirtied.add(node)
+ return node._mk_txbranch(txid)
+
+ return super(ConfigRoot, self).remove(path, txid, track_dirty)
+ else:
+ return super(ConfigRoot, self).remove(path)
+
diff --git a/voltha/core/config/config_txn.py b/voltha/core/config/config_txn.py
new file mode 100644
index 0000000..5e37233
--- /dev/null
+++ b/voltha/core/config/config_txn.py
@@ -0,0 +1,73 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class ClosedTransactionError(Exception):
+ pass
+
+
+class ConfigTransaction(object):
+
+ __slots__ = (
+ '_proxy',
+ '_txid'
+ )
+
+ def __init__(self, proxy, txid):
+ self._proxy = proxy
+ self._txid = txid
+
+ def __del__(self):
+ if self._txid:
+ try:
+ self.cancel()
+ except:
+ raise
+
+ # ~~~~~~~~~~~~~~~~~~~~ CRUD ops within the transaction ~~~~~~~~~~~~~~~~~~~~
+
+ def get(self, path='/', depth=None, deep=None):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.get(path, depth=depth, deep=deep, txid=self._txid)
+
+ def update(self, path, data, strict=False):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.update(path, data, strict, self._txid)
+
+ def add(self, path, data):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.add(path, data, self._txid)
+
+ def remove(self, path):
+ if self._txid is None:
+ raise ClosedTransactionError()
+ return self._proxy.remove(path, self._txid)
+
+ # ~~~~~~~~~~~~~~~~~~~~ transaction finalization ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def cancel(self):
+ """Explicitly cancel the transaction"""
+ self._proxy.cancel_transaction(self._txid)
+ self._txid = None
+
+ def commit(self):
+ """Commit all transaction changes"""
+ try:
+ self._proxy.commit_transaction(self._txid)
+ finally:
+ self._txid = None
diff --git a/voltha/core/core.py b/voltha/core/core.py
index 14ee432..9eb82ff 100644
--- a/voltha/core/core.py
+++ b/voltha/core/core.py
@@ -18,25 +18,50 @@
Voltha's CORE components.
"""
import structlog
+from zope.interface import implementer
+
+from common.utils.grpc_utils import twisted_async
+from voltha.core.config.config_root import ConfigRoot
+from voltha.protos import third_party
+from voltha.protos.voltha_pb2 import add_VolthaServiceServicer_to_server, \
+ Voltha, VolthaServiceServicer
+from voltha.registry import IComponent, registry
log = structlog.get_logger()
-class VolthaCore(object):
+@implementer(IComponent)
+class VolthaCore(VolthaServiceServicer):
- def __init__(self):
+ def __init__(self, **kw):
self.stopped = False
+ self.config_root = self._mk_config_root(**kw)
+ registry('grpc_server').register(
+ add_VolthaServiceServicer_to_server, self)
def start(self):
log.debug('starting')
pass
log.info('started')
+ return self
def stop(self):
log.debug('stopping')
self.stopped = True
log.info('stopped')
- # TODO
+ def get_proxy(self, path, exclusive=False):
+ return self.config_root.get_proxy(path, exclusive)
+ def _mk_config_root(self, **kw):
+ root_data = Voltha(**kw)
+ return ConfigRoot(root_data)
+
+ # gRPC service method implementations. BE CAREFUL; THESE ARE CALLED ON
+ # the gRPC threadpool threads.
+
+ @twisted_async
+ def GetVoltha(self, request, context):
+ log.info('get-voltha', request=request)
+ return self.config_root.get('/', deep=1)
diff --git a/voltha/main.py b/voltha/main.py
index a19c1ba..39ff1f4 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -30,9 +30,14 @@
get_my_primary_local_ipv4
from voltha.adapters.loader import AdapterLoader
from voltha.coordinator import Coordinator
+from voltha.core.core import VolthaCore
from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from voltha.northbound.rest.health_check import init_rest_service
+from voltha.protos.common_pb2 import INFO
+from voltha.registry import registry
+
+VERSION = '0.9.0'
defs = dict(
config=os.environ.get('CONFIG', './voltha.yml'),
@@ -214,12 +219,6 @@
# configurable variables from voltha.yml file
#self.configurable_vars = self.config.get('Constants', {})
- # components
- self.coordinator = None
- self.grpc_server = None
- self.kafka_proxy = None
- self.adapter_loader = None
-
if not args.no_banner:
print_banner(self.log)
@@ -236,40 +235,49 @@
def startup_components(self):
try:
self.log.info('starting-internal-components')
- self.coordinator = yield Coordinator(
+
+ coordinator = yield Coordinator(
internal_host_address=self.args.internal_host_address,
external_host_address=self.args.external_host_address,
rest_port=self.args.rest_port,
instance_id=self.args.instance_id,
config=self.config,
consul=self.args.consul).start()
+ registry.register('coordinator', coordinator)
+
init_rest_service(self.args.rest_port)
- self.grpc_server = yield VolthaGrpcServer(self.args.grpc_port).start()
+ grpc_server = \
+ yield VolthaGrpcServer(self.args.grpc_port).start()
+ registry.register('grpc_server', grpc_server)
- # initialize kafka proxy singleton
- self.kafka_proxy = yield KafkaProxy(self.args.consul, self.args.kafka)
+ core = \
+ yield VolthaCore(
+ instance_id=self.args.instance_id,
+ version=VERSION,
+ log_level=INFO
+ ).start()
+ registry.register('core', core)
- # adapter loader
- self.adapter_loader = yield AdapterLoader(
+ kafka_proxy = \
+ yield KafkaProxy(self.args.consul, self.args.kafka).start()
+ registry.register('kafka_proxy', kafka_proxy)
+
+ adapter_loader = yield AdapterLoader(
config=self.config.get('adapter_loader', {})).start()
+ registry.register('adapter_loader', adapter_loader)
self.log.info('started-internal-services')
except Exception as e:
self.log.exception('Failure to start all components {}'.format(e))
-
@inlineCallbacks
def shutdown_components(self):
"""Execute before the reactor is shut down"""
self.log.info('exiting-on-keyboard-interrupt')
- if self.adapter_loader is not None:
- yield self.adapter_loader.stop()
- if self.coordinator is not None:
- yield self.coordinator.stop()
- if self.grpc_server is not None:
- yield self.grpc_server.stop()
+ for component in reversed(registry.iterate()):
+ yield component.stop()
def start_reactor(self):
from twisted.internet import reactor
diff --git a/voltha/northbound/grpc/grpc_server.py b/voltha/northbound/grpc/grpc_server.py
index a0b3c8a..27b9a4b 100644
--- a/voltha/northbound/grpc/grpc_server.py
+++ b/voltha/northbound/grpc/grpc_server.py
@@ -25,20 +25,18 @@
from structlog import get_logger
import zlib
+from zope.interface import implementer
+
from common.utils.grpc_utils import twisted_async
from voltha.core.device_model import DeviceModel
from voltha.protos import voltha_pb2, schema_pb2
from google.protobuf.empty_pb2 import Empty
+from voltha.registry import IComponent
log = get_logger()
-_nbi_server = None # this will become the default server instance
-
-def get_nbi_server(): return _nbi_server
-
-
class SchemaService(schema_pb2.SchemaServiceServicer):
def __init__(self, thread_pool):
@@ -239,22 +237,16 @@
self.packet_in_queue.put(packet_in)
+@implementer(IComponent)
class VolthaGrpcServer(object):
def __init__(self, port=50055):
- self._register_singleton()
self.port = port
log.info('init-grpc-server', port=self.port)
self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
self.server = grpc.server(self.thread_pool)
self.services = []
- def _register_singleton(self):
- global _nbi_server
- if _nbi_server is not None:
- raise RuntimeError('cannot-run-two-servers')
- _nbi_server = self
-
def start(self):
log.debug('starting')
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 0ce66e5..a710400 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -21,11 +21,15 @@
from afkak.producer import Producer as _kafkaProducer
from structlog import get_logger
from twisted.internet.defer import inlineCallbacks
+from zope.interface import implementer
from common.utils.consulhelpers import get_endpoint_from_consul
+from voltha.registry import IComponent
log = get_logger()
+
+@implementer(IComponent)
class KafkaProxy(object):
"""
This is a singleton proxy kafka class to hide the kafka client details.
@@ -50,9 +54,17 @@
self.kclient = None
self.kproducer = None
+ def start(self):
+ log.debug('starting')
self._get_kafka_producer()
-
KafkaProxy._kafka_instance = self
+ log.info('started')
+ return self
+
+ def stop(self):
+ log.debug('stopping')
+ pass
+ log.info('stopped')
def _get_kafka_producer(self):
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
diff --git a/voltha/protos/adapter.proto b/voltha/protos/adapter.proto
index 5c53f76..8791ee9 100644
--- a/voltha/protos/adapter.proto
+++ b/voltha/protos/adapter.proto
@@ -6,11 +6,15 @@
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "common.proto";
+import "meta.proto";
message AdapterConfig {
- // Common adapter config attributes here
+ // Common adapter config attributes here
+ LogLevel log_level = 1;
+
+ // Custom (vendor-specific) configuration attributes
google.protobuf.Any additional_config = 64;
}
@@ -20,16 +24,17 @@
// Unique name of adapter, matching the python packate name under
// voltha/adapters.
- string id = 1;
-
- // TODO
- // ...
+ string id = 1 [(access) = READ_ONLY];
+ string vendor = 2 [(access) = READ_ONLY];
+ string version = 3 [(access) = READ_ONLY];
// Adapter configuration
AdapterConfig config = 16;
// Custom descriptors and custom configuration
- google.protobuf.Any additional_description = 64;
+ google.protobuf.Any additional_description = 64 [(access) = READ_ONLY];
+
+ repeated string logical_device_ids = 4; // Logical devices "owned"
}
diff --git a/voltha/protos/common.proto b/voltha/protos/common.proto
index 64e8a3e..b3e1e34 100644
--- a/voltha/protos/common.proto
+++ b/voltha/protos/common.proto
@@ -7,4 +7,10 @@
string id = 1;
}
-
+enum LogLevel {
+ DEBUG = 0;
+ INFO = 1;
+ WARNING = 2;
+ ERROR = 3;
+ CRITICAL = 4;
+}
diff --git a/voltha/protos/health.proto b/voltha/protos/health.proto
index c4d2198..1dff496 100644
--- a/voltha/protos/health.proto
+++ b/voltha/protos/health.proto
@@ -4,9 +4,11 @@
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
+import "meta.proto";
// Encode health status of a Voltha instance
message HealthStatus {
+
// Health states
enum HealthState {
HEALTHY = 0; // The instance is healthy
@@ -15,7 +17,7 @@
}
// Current state of health of this Voltha instance
- HealthState state = 1;
+ HealthState state = 1 [(access) = READ_ONLY];
}
// Health related services
diff --git a/voltha/protos/logical_layer.proto b/voltha/protos/logical_layer.proto
index 16b5b46..f86a39e 100644
--- a/voltha/protos/logical_layer.proto
+++ b/voltha/protos/logical_layer.proto
@@ -2,6 +2,7 @@
package voltha;
+import "meta.proto";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "common.proto";
@@ -10,7 +11,12 @@
message LogicalDevice {
string id = 1;
uint64 datapath_id = 2;
+
openflow_13.ofp_desc desc = 3;
+
+ repeated openflow_13.ofp_port ports = 4 [(child_node) = {key:"port_no"}];
+ repeated openflow_13.ofp_flow_stats flows = 5;
+ repeated openflow_13.ofp_group_entry flow_groups = 6;
}
message LogicalDevices {
diff --git a/voltha/protos/meta.proto b/voltha/protos/meta.proto
new file mode 100644
index 0000000..755a087
--- /dev/null
+++ b/voltha/protos/meta.proto
@@ -0,0 +1,47 @@
+// Copyright (c) 2015, Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file contains annotation definitions that can be used to describe
+// a configuration tree.
+
+syntax = "proto3";
+
+package voltha;
+
+import "google/api/http.proto";
+import "google/protobuf/descriptor.proto";
+
+message ChildNode {
+ string key = 1;
+}
+
+enum Access {
+ CONFIG = 0; // read-write, stored
+ READ_ONLY = 1; // read-only field
+}
+
+extend google.protobuf.FieldOptions {
+
+ // If present, it indicates that this field is stored as external child node
+ // or children nodes in Voltha's internal configuration tree.
+ // If the field is a container field and if the option specifies a key
+ // the child objects will be addressible by that key.
+ ChildNode child_node = 7761772;
+
+ // This annotation can be used to indicate that a field is read-only,
+ // from the perspective of NBI access. Backend plugins and system
+ // internals can update the field but the update requests through the
+ // NBI will ignore for instance a field that is marked as read-only (RO).
+ Access access = 7761773;
+}
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index d707780..163734c 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -8,6 +8,10 @@
package voltha;
+import "google/protobuf/empty.proto";
+import "google/api/annotations.proto";
+
+import public "meta.proto";
import public "common.proto";
import public "health.proto";
import public "logical_layer.proto";
@@ -17,3 +21,26 @@
option java_package = "org.opencord.voltha";
option java_outer_classname = "VolthaProtos";
option csharp_namespace = "Opencord.Voltha.Voltha";
+
+// Top-level (root) config node for Voltha
+message Voltha {
+
+ string instance_id = 1 [(access) = READ_ONLY];
+ string version = 2 [(access) = READ_ONLY];
+ LogLevel log_level = 3;
+
+ HealthStatus health = 10 [(child_node) = {}];
+ repeated Adapter adapters = 11 [(child_node) = { key: "id" }];
+ repeated LogicalDevice logical_devices = 12 [(child_node) = {key:"id"}];
+
+}
+
+service VolthaService {
+
+ rpc GetVoltha(google.protobuf.Empty) returns(Voltha) {
+ option (google.api.http) = {
+ get: "/voltha"
+ };
+ }
+
+}
diff --git a/voltha/registry.py b/voltha/registry.py
new file mode 100644
index 0000000..767cb7e
--- /dev/null
+++ b/voltha/registry.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Simple component registry to provide centralized access to any registered
+components.
+"""
+from collections import OrderedDict
+from zope.interface import Interface
+
+
+class IComponent(Interface):
+ """
+ A Voltha Component
+ """
+
+ def start():
+ """
+ Called once the componet is instantiated. Can be used for async
+ initialization.
+ :return: (None or Deferred)
+ """
+
+ def stop():
+ """
+ Called once before the component is unloaded. Can be used for async
+ cleanup operations.
+ :return: (None or Deferred)
+ """
+
+
+class Registry(object):
+
+ def __init__(self):
+ self.components = OrderedDict()
+
+ def register(self, name, component):
+ assert IComponent.providedBy(component)
+ assert name not in self.components
+ self.components[name] = component
+
+ def unregister(self, name):
+ if name in self.components:
+ del self.components[name]
+
+ def __call__(self, name):
+ return self.components[name]
+
+ def iterate(self):
+ return self.components.values()
+
+
+# public shared registry
+registry = Registry()