Major rework of gRPC handling (do not merge yet)
Includes the following changes:
* Refactored proto files
- separation of logical devices vs devices
- common flow related message types moved to openflow_13
- most RPC is defined in voltha.proto now
* Expanded RPC definitions to now cover most of what we
  need (a few device provisioning RPCs are still missing)
* Reworked RPC handlers to work with new config tree
* Implemented test cases for all existing RPCs, tested via
chameleon's REST service
* Did away with the OrderedDict internal representation
in the config nodes (3x performance boost on bulk
add, and negligible penalty in other ops)
* Refactored transaction merge handling to align with
new structures
Change-Id: I3740ec13b8296943b307782e86e6b596af78140e
diff --git a/chameleon/grpc_client/grpc_client.py b/chameleon/grpc_client/grpc_client.py
index 36618be..74933cd 100644
--- a/chameleon/grpc_client/grpc_client.py
+++ b/chameleon/grpc_client/grpc_client.py
@@ -30,7 +30,7 @@
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
from werkzeug.exceptions import ServiceUnavailable
from common.utils.asleep import asleep
@@ -251,7 +251,8 @@
log.debug('test-import', modname=modname)
_ = __import__(modname)
- def invoke(self, stub, method_name, request):
+ @inlineCallbacks
+ def invoke(self, stub, method_name, request, retry=1):
"""
Invoke a gRPC call to the remote server and return the response.
:param stub: Reference to the *_pb2 service stub
@@ -265,16 +266,22 @@
try:
response = getattr(stub(self.channel), method_name)(request)
- return response
+ returnValue(response)
except grpc._channel._Rendezvous, e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
e = ServiceUnavailable()
+
+ if self.connected:
+ self.connected = False
+ yield self.connect()
+ if retry > 0:
+ response = yield self.invoke(stub, method_name,
+ request,
+ retry=retry - 1)
+ returnValue(response)
+
else:
log.exception(e)
- if self.connected :
- self.connected = False
- reactor.callLater(0, self.connect)
-
raise e
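
Note that invoke() is now an inlineCallbacks generator: it returns a Deferred
rather than the raw response, so every caller has to yield it (the gateway
template below is updated accordingly). A minimal caller sketch; the stub and
request names are assumptions here (health_pb2 stub generated by the grpc
protoc plugin of this era), shown only to illustrate the new calling
convention:

    from google.protobuf.empty_pb2 import Empty
    from twisted.internet.defer import inlineCallbacks, returnValue
    # assumption: the health service stub is generated into health_pb2
    from voltha.protos.health_pb2 import HealthServiceStub

    @inlineCallbacks
    def fetch_health(grpc_client):
        # invoke() now yields a Deferred that fires with the response
        res = yield grpc_client.invoke(HealthServiceStub, 'GetHealthStatus',
                                       Empty())
        returnValue(res)
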
diff --git a/chameleon/protoc_plugins/gw_gen.py b/chameleon/protoc_plugins/gw_gen.py
index 4596bef..c5a8875 100755
--- a/chameleon/protoc_plugins/gw_gen.py
+++ b/chameleon/protoc_plugins/gw_gen.py
@@ -32,7 +32,9 @@
from simplejson import dumps, load
from structlog import get_logger
-from protobuf_to_dict import protobuf_to_dict, dict_to_protobuf
+from protobuf_to_dict import dict_to_protobuf
+from google.protobuf.json_format import MessageToDict
+from twisted.internet.defer import inlineCallbacks, returnValue
{% set package = file_name.replace('.proto', '') %}
@@ -54,6 +56,7 @@
{% set method_name = method['service'].rpartition('.')[2] + '_' + method['method'] %}
{% set path = method['path'].replace('{', '<string:').replace('}', '>') %}
@app.route('{{ path }}', methods=['{{ method['verb'].upper() }}'])
+ @inlineCallbacks
def {{ method_name }}(server, request, **kw):
log.debug('{{ method_name }}', request=request, server=server, **kw)
{% if method['body'] == '*' %}
@@ -69,11 +72,11 @@
except Exception, e:
log.error('cannot-convert-to-protobuf', e=e, data=data)
raise
- res = grpc_client.invoke(
+ res = yield grpc_client.invoke(
{{ type_map[method['service']] }}Stub,
'{{ method['method'] }}', req)
try:
- out_data = protobuf_to_dict(res, use_enum_labels=True)
+ out_data = MessageToDict(res, True, True)
except AttributeError, e:
filename = '/tmp/chameleon_failed_to_convert_data.pbd'
with file(filename, 'w') as f:
@@ -82,7 +85,7 @@
raise
request.setHeader('Content-Type', 'application/json')
log.debug('{{ method_name }}', **out_data)
- return dumps(out_data)
+ returnValue(dumps(out_data))
{% endfor %}
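
The positional arguments in MessageToDict(res, True, True) follow the
signature of google.protobuf.json_format.MessageToDict; spelled out with
keywords, the call is equivalent to:

    from google.protobuf.json_format import MessageToDict

    out_data = MessageToDict(res,
                             including_default_value_fields=True,
                             preserving_proto_field_name=True)

which matches how the new test suite serializes its request bodies.
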
diff --git a/common/utils/grpc_utils.py b/common/utils/grpc_utils.py
index 86abdba..b9cc8f7 100644
--- a/common/utils/grpc_utils.py
+++ b/common/utils/grpc_utils.py
@@ -20,6 +20,7 @@
from concurrent.futures import Future
from twisted.internet import reactor
from twisted.internet.defer import Deferred
+from twisted.python.threadable import isInIOThread
def twisted_async(func):
@@ -60,6 +61,10 @@
"""
def in_thread_wrapper(*args, **kw):
+ if isInIOThread():
+ return func(*args, **kw)
+
f = Future()
def twisted_wrapper():
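
The isInIOThread() guard makes the wrapped function safe to call from the
reactor thread as well: without it, a call originating on the reactor would
block on a Future that only the reactor itself could complete, i.e., deadlock.
A condensed sketch of the resulting control flow (the real wrapper also
unwraps Deferred results; this is illustration only):

    from concurrent.futures import Future
    from twisted.internet import reactor
    from twisted.python.threadable import isInIOThread

    def twisted_async_sketch(func):
        def in_thread_wrapper(*args, **kw):
            if isInIOThread():
                # already on the reactor thread: call synchronously
                return func(*args, **kw)

            # on a gRPC threadpool thread: marshal the call onto the
            # reactor and block this worker thread until it completes
            f = Future()

            def twisted_wrapper():
                try:
                    f.set_result(func(*args, **kw))
                except Exception, e:
                    f.set_exception(e)

            reactor.callFromThread(twisted_wrapper)
            return f.result()
        return in_thread_wrapper
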
diff --git a/tests/itests/voltha/__init__.py b/tests/itests/voltha/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/itests/voltha/__init__.py
diff --git a/tests/itests/voltha/test_voltha_rest.py b/tests/itests/voltha/test_voltha_rest.py
new file mode 100644
index 0000000..78b1891
--- /dev/null
+++ b/tests/itests/voltha/test_voltha_rest.py
@@ -0,0 +1,347 @@
+from google.protobuf.json_format import MessageToDict
+from requests import get, post, put, patch, delete
+from unittest import TestCase, main
+
+from voltha.protos.openflow_13_pb2 import FlowTableUpdate, ofp_flow_mod, \
+ OFPFC_ADD, ofp_instruction, OFPIT_APPLY_ACTIONS, ofp_instruction_actions, \
+ ofp_action, OFPAT_OUTPUT, ofp_action_output, FlowGroupTableUpdate, \
+ ofp_group_mod, OFPGC_ADD, OFPGT_ALL, ofp_bucket
+
+
+class TestRestCases(TestCase):
+
+ base_url = 'http://localhost:8881'
+
+ def url(self, path):
+ while path.startswith('/'):
+ path = path[1:]
+ return self.base_url + '/' + path
+
+ def verify_content_type_and_return(self, response, expected_content_type):
+ if 200 <= response.status_code < 300:
+ self.assertEqual(
+ response.headers['Content-Type'],
+ expected_content_type,
+ msg='Content-Type %s != %s; msg:%s' % (
+ response.headers['Content-Type'],
+ expected_content_type,
+ response.content))
+ if expected_content_type == 'application/json':
+ return response.json()
+ else:
+ return response.content
+
+ def get(self, path, expected_code=200,
+ expected_content_type='application/json'):
+ r = get(self.url(path))
+ self.assertEqual(r.status_code, expected_code,
+ msg='Code %d!=%d; msg:%s' % (
+ r.status_code, expected_code, r.content))
+ return self.verify_content_type_and_return(r, expected_content_type)
+
+ def post(self, path, json_dict, expected_code=201):
+ r = post(self.url(path), json=json_dict)
+ self.assertEqual(r.status_code, expected_code,
+ msg='Code %d!=%d; msg:%s' % (
+ r.status_code, expected_code, r.content))
+ return self.verify_content_type_and_return(r, 'application/json')
+
+ def put(self, path, json_dict, expected_code=200):
+ r = put(self.url(path), json=json_dict)
+ self.assertEqual(r.status_code, expected_code,
+ msg='Code %d!=%d; msg:%s' % (
+ r.status_code, expected_code, r.content))
+ return self.verify_content_type_and_return(r, 'application/json')
+
+ def delete(self, path, expected_code=209):
+ r = delete(self.url(path))
+ self.assertEqual(r.status_code, expected_code,
+ msg='Code %d!=%d; msg:%s' % (
+ r.status_code, expected_code, r.content))
+
+ # ~~~~~~~~~~~~~~~~~~~~~ GLOBAL TOP-LEVEL SERVICES ~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def test_get_root(self):
+ res = self.get('/', expected_content_type='text/html')
+ self.assertGreaterEqual(res.find('swagger'), 0)
+
+ def test_get_schema(self):
+ res = self.get('/schema')
+ self.assertEqual(set(res.keys()), {'protos', 'swagger_from'})
+
+ def test_get_health(self):
+ res = self.get('/health')
+ self.assertEqual(res['state'], 'HEALTHY')
+
+ # ~~~~~~~~~~~~~~~~~~~~~ TOP LEVEL VOLTHA OPERATIONS ~~~~~~~~~~~~~~~~~~~~~~~
+
+ def test_get_voltha(self):
+ res = self.get('/api/v1')
+ self.assertEqual(res['version'], '0.9.0')
+
+ def test_list_voltha_instances(self):
+ res = self.get('/api/v1/instances')
+ self.assertEqual(len(res['items']), 1)
+
+ def test_get_voltha_instance(self):
+ res = self.get('/api/v1/instances/1')
+ self.assertEqual(res['version'], '0.9.0')
+
+ def test_list_logical_devices(self):
+ res = self.get('/api/v1/logical_devices')
+ self.assertGreaterEqual(len(res['items']), 1)
+
+ def test_get_logical_device(self):
+ res = self.get('/api/v1/logical_devices/simulated1')
+ self.assertEqual(res['datapath_id'], '1') # TODO should be int
+
+ def test_list_logical_device_ports(self):
+ res = self.get('/api/v1/logical_devices/simulated1/ports')
+ self.assertGreaterEqual(len(res['items']), 3)
+
+ def test_list_and_update_logical_device_flows(self):
+
+ # retrieve flow list
+ res = self.get('/api/v1/logical_devices/simulated1/flows')
+ len_before = len(res['items'])
+
+ # add some flows
+ req = FlowTableUpdate(
+ id='simulated1',
+ flow_mod=ofp_flow_mod(
+ command=OFPFC_ADD,
+ instructions=[
+ ofp_instruction(
+ type=OFPIT_APPLY_ACTIONS,
+ actions=ofp_instruction_actions(
+ actions=[
+ ofp_action(
+ type=OFPAT_OUTPUT,
+ output=ofp_action_output(
+ port=1
+ )
+ )
+ ]
+ )
+ )
+ ]
+ )
+ )
+
+ res = self.post('/api/v1/logical_devices/simulated1/flows',
+ MessageToDict(req, preserving_proto_field_name=True))
+ # TODO check some stuff on res
+
+ res = self.get('/api/v1/logical_devices/simulated1/flows')
+ len_after = len(res['items'])
+ self.assertGreater(len_after, len_before)
+
+ def test_list_and_update_logical_device_flow_groups(self):
+
+ # retrieve flow list
+ res = self.get('/api/v1/logical_devices/simulated1/flow_groups')
+ len_before = len(res['items'])
+
+ # add some flows
+ req = FlowGroupTableUpdate(
+ id='simulated1',
+ group_mod=ofp_group_mod(
+ command=OFPGC_ADD,
+ type=OFPGT_ALL,
+ group_id=1,
+ buckets=[
+ ofp_bucket(
+ actions=[
+ ofp_action(
+ type=OFPAT_OUTPUT,
+ output=ofp_action_output(
+ port=1
+ )
+ )
+ ]
+ )
+ ]
+ )
+ )
+
+ res = self.post('/api/v1/logical_devices/simulated1/flow_groups',
+ MessageToDict(req, preserving_proto_field_name=True))
+ # TODO check some stuff on res
+
+ res = self.get('/api/v1/logical_devices/simulated1/flow_groups')
+ len_after = len(res['items'])
+ self.assertGreater(len_after, len_before)
+
+ def test_list_devices(self):
+ res = self.get('/api/v1/devices')
+ self.assertGreaterEqual(len(res['items']), 2)
+
+ def test_get_device(self):
+ res = self.get('/api/v1/devices/simulated_olt_1')
+ # TODO test result
+
+ def test_list_device_ports(self):
+ res = self.get('/api/v1/devices/simulated_olt_1/ports')
+ self.assertGreaterEqual(len(res['items']), 2)
+
+ def test_list_device_flows(self):
+ res = self.get('/api/v1/devices/simulated_olt_1/flows')
+ self.assertGreaterEqual(len(res['items']), 0)
+
+ def test_list_device_flow_groups(self):
+ res = self.get('/api/v1/devices/simulated_olt_1/flow_groups')
+ self.assertGreaterEqual(len(res['items']), 0)
+
+ def test_list_device_types(self):
+ res = self.get('/api/v1/device_types')
+ self.assertGreaterEqual(len(res['items']), 2)
+
+ def test_get_device_type(self):
+ res = self.get('/api/v1/device_types/simulated_olt')
+ # TODO test the result
+
+ def test_list_device_groups(self):
+ res = self.get('/api/v1/device_groups')
+ self.assertGreaterEqual(len(res['items']), 1)
+
+ def test_get_device_group(self):
+ res = self.get('/api/v1/device_groups/1')
+ # TODO test the result
+
+ # ~~~~~~~~~~~~~~~~~~ VOLTHA INSTANCE LEVEL OPERATIONS ~~~~~~~~~~~~~~~~~~~~~
+
+ def test_get_local(self):
+ self.assertEqual(self.get('/api/v1/local')['version'], '0.9.0')
+
+ def test_get_local_health(self):
+ d = self.get('/api/v1/local/health')
+ self.assertEqual(d['state'], 'HEALTHY')
+
+ def test_list_local_adapters(self):
+ self.assertGreaterEqual(
+ len(self.get('/api/v1/local/adapters')['items']), 1)
+
+ def test_list_local_logical_devices(self):
+ self.assertGreaterEqual(
+ len(self.get('/api/v1/local/logical_devices')['items']), 1)
+
+ def test_get_local_logical_device(self):
+ res = self.get('/api/v1/local/logical_devices/simulated1')
+ self.assertEqual(res['datapath_id'], '1') # TODO this should be a long int
+
+ def test_list_local_logical_device_ports(self):
+ res = self.get('/api/v1/local/logical_devices/simulated1/ports')
+ self.assertGreaterEqual(len(res['items']), 3)
+
+ def test_list_and_update_local_logical_device_flows(self):
+
+ # retrieve flow list
+ res = self.get('/api/v1/local/logical_devices/simulated1/flows')
+ len_before = len(res['items'])
+
+ # add some flows
+ req = FlowTableUpdate(
+ id='simulated1',
+ flow_mod=ofp_flow_mod(
+ command=OFPFC_ADD,
+ instructions=[
+ ofp_instruction(
+ type=OFPIT_APPLY_ACTIONS,
+ actions=ofp_instruction_actions(
+ actions=[
+ ofp_action(
+ type=OFPAT_OUTPUT,
+ output=ofp_action_output(
+ port=1
+ )
+ )
+ ]
+ )
+ )
+ ]
+ )
+ )
+
+ res = self.post('/api/v1/local/logical_devices/simulated1/flows',
+ MessageToDict(req, preserving_proto_field_name=True))
+ # TODO check some stuff on res
+
+ res = self.get('/api/v1/local/logical_devices/simulated1/flows')
+ len_after = len(res['items'])
+ self.assertGreater(len_after, len_before)
+
+ def test_list_and_update_local_logical_device_flow_groups(self):
+
+ # retrieve flow list
+ res = self.get('/api/v1/local/logical_devices/simulated1/flow_groups')
+ len_before = len(res['items'])
+
+ # add some flows
+ req = FlowGroupTableUpdate(
+ id='simulated1',
+ group_mod=ofp_group_mod(
+ command=OFPGC_ADD,
+ type=OFPGT_ALL,
+ group_id=1,
+ buckets=[
+ ofp_bucket(
+ actions=[
+ ofp_action(
+ type=OFPAT_OUTPUT,
+ output=ofp_action_output(
+ port=1
+ )
+ )
+ ]
+ )
+ ]
+ )
+ )
+
+ res = self.post('/api/v1/local/logical_devices/simulated1/flow_groups',
+ MessageToDict(req, preserving_proto_field_name=True))
+ # TODO check some stuff on res
+
+ res = self.get('/api/v1/local/logical_devices/simulated1/flow_groups')
+ len_after = len(res['items'])
+ self.assertGreater(len_after, len_before)
+
+ def test_list_local_devices(self):
+ res = self.get('/api/v1/local/devices')
+ self.assertGreaterEqual(len(res['items']), 2)
+
+ def test_get_local_device(self):
+ res = self.get('/api/v1/local/devices/simulated_olt_1')
+ # TODO test result
+
+ def test_list_local_device_ports(self):
+ res = self.get('/api/v1/local/devices/simulated_olt_1/ports')
+ self.assertGreaterEqual(len(res['items']), 2)
+
+ def test_list_local_device_flows(self):
+ res = self.get('/api/v1/local/devices/simulated_olt_1/flows')
+ self.assertGreaterEqual(len(res['items']), 0)
+
+ def test_list_local_device_flow_groups(self):
+ res = self.get('/api/v1/local/devices/simulated_olt_1/flow_groups')
+ self.assertGreaterEqual(len(res['items']), 0)
+
+ def test_list_local_device_types(self):
+ res = self.get('/api/v1/local/device_types')
+ self.assertGreaterEqual(len(res['items']), 2)
+
+ def test_get_local_device_type(self):
+ res = self.get('/api/v1/local/device_types/simulated_olt')
+ # TODO test the result
+
+ def test_list_local_device_groups(self):
+ res = self.get('/api/v1/local/device_groups')
+ self.assertGreaterEqual(len(res['items']), 1)
+
+ def test_get_local_device_group(self):
+ res = self.get('/api/v1/local/device_groups/1')
+ # TODO test the result
+
+
+if __name__ == '__main__':
+ main()
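
These integration tests assume a running Voltha instance with the simulated
adapter loaded and Chameleon's REST gateway listening on localhost:8881 (the
base_url above). A standalone sanity check against the same endpoints, under
the same assumptions:

    from requests import get

    BASE = 'http://localhost:8881'

    # global health endpoint, as exercised by test_get_health above
    assert get(BASE + '/health').json()['state'] == 'HEALTHY'

    res = get(BASE + '/api/v1/logical_devices').json()
    print 'logical devices:', len(res['items'])
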
diff --git a/tests/utests/voltha/core/config/test_config.py b/tests/utests/voltha/core/config/test_config.py
index d76e511..7ec3dbc 100644
--- a/tests/utests/voltha/core/config/test_config.py
+++ b/tests/utests/voltha/core/config/test_config.py
@@ -9,13 +9,13 @@
from mock import Mock
-from voltha.core.config.config_proxy import CallbackType
+from voltha.core.config.config_proxy import CallbackType, OperationContext
from voltha.core.config.config_rev import _rev_cache
from voltha.core.config.config_root import ConfigRoot, MergeConflictException
from voltha.core.config.config_txn import ClosedTransactionError
from voltha.protos import third_party
from voltha.protos.openflow_13_pb2 import ofp_port
-from voltha.protos.voltha_pb2 import Voltha, Adapter, HealthStatus, \
+from voltha.protos.voltha_pb2 import VolthaInstance, Adapter, HealthStatus, \
AdapterConfig, LogicalDevice
@@ -38,9 +38,9 @@
class TestConfigNodeShallow(TestCase):
def setUp(self):
- self.empty = Voltha()
- self.other = Voltha(instance_id='other')
- self.node = ConfigRoot(Voltha())
+ self.empty = VolthaInstance()
+ self.other = VolthaInstance(instance_id='other')
+ self.node = ConfigRoot(VolthaInstance())
def test_init(self):
pass
@@ -48,7 +48,7 @@
def test_immutability(self):
self.assertEqual(self.node.latest.data, self.empty)
self.empty.instance_id = 'overwritten id'
- self.assertEqual(self.node.latest.data, Voltha())
+ self.assertEqual(self.node.latest.data, VolthaInstance())
def test_retrieve_latest(self):
self.assertEqual(self.node.latest.data, self.empty)
@@ -61,7 +61,7 @@
hash1 = self.node.latest.hash
self.assertEqual(len(self.node.revisions), 2)
self.assertNotEqual(hash0, hash1)
- self.assertEqual(self.node.latest.data, Voltha(instance_id='other'))
+ self.assertEqual(self.node.latest.data, VolthaInstance(instance_id='other'))
def test_update_with_bad_data(self):
self.assertRaises(ValueError, self.node.update, '/', Adapter())
@@ -69,7 +69,7 @@
def test_many_simple_updates(self):
n = 1000
for i in xrange(n):
- self.node.update('/', Voltha(instance_id='id%d' % i))
+ self.node.update('/', VolthaInstance(instance_id='id%d' % i))
self.node.update('/', self.other)
self.assertEqual(len(self.node.revisions), 1002)
self.assertEqual(self.node.latest.data, self.other)
@@ -77,11 +77,11 @@
def test_retrieve_by_rev_hash(self):
n = 1000
for i in xrange(n):
- self.node.update('/', Voltha(instance_id='id%d' % i))
+ self.node.update('/', VolthaInstance(instance_id='id%d' % i))
self.node.update('/', self.other)
hashes = self.node.revisions
self.assertEqual(self.node[hashes[0]].data, self.empty)
- self.assertEqual(self.node[hashes[10]].data, Voltha(instance_id='id9'))
+ self.assertEqual(self.node[hashes[10]].data, VolthaInstance(instance_id='id9'))
self.assertEqual(self.node[hashes[-1]].data, self.other)
def test_diffs(self):
@@ -105,7 +105,7 @@
# add a bunch of changes
for a in xrange(10):
- self.node.update('/', Voltha(instance_id=str(a)))
+ self.node.update('/', VolthaInstance(instance_id=str(a)))
hash2 = self.node.latest.hash
# apply tag to latest
@@ -164,7 +164,7 @@
gc.collect()
_rev_cache.clear()
self.health = HealthStatus(state=HealthStatus.DYING)
- self.base_shallow = Voltha(instance_id='1')
+ self.base_shallow = VolthaInstance(instance_id='1')
self.base_deep = copy(self.base_shallow)
self.base_deep.health.state = HealthStatus.DYING # = self.health
for i in xrange(5):
@@ -187,7 +187,7 @@
pass
def test_reject_duplicate_keys(self):
- data = Voltha(
+ data = VolthaInstance(
instance_id='42', adapters=[Adapter(id='same') for _ in xrange(5)])
self.assertRaises(ValueError, ConfigRoot, data)
@@ -201,7 +201,7 @@
def test_top_level_update(self):
# test that top-level update retains children
- self.node.update('/', Voltha(version='1.2.3'))
+ self.node.update('/', VolthaInstance(version='1.2.3'))
hash_new = self.node.latest.hash
self.assertNotEqual(self.hash_orig, hash_new)
self.assertEqual(self.node.get(
@@ -272,7 +272,7 @@
def test_update_handle_invalid_type(self):
self.assertRaises(ValueError, self.node.update, '/', Adapter())
self.assertRaises(ValueError, self.node.update, '/health', Adapter())
- self.assertRaises(ValueError, self.node.update, '/adapters/1', Voltha())
+ self.assertRaises(ValueError, self.node.update, '/adapters/1', VolthaInstance())
def test_update_handle_key_change_attempt(self):
self.assertRaises(
@@ -321,7 +321,7 @@
def test_pruning_after_shallow_change(self):
- self.node.update('/', Voltha(version='10.1'))
+ self.node.update('/', VolthaInstance(version='10.1'))
# sanity check
self.assertEqual(len(self.node.revisions), 2)
@@ -367,7 +367,7 @@
seed(0) # makes things consistently random
- # this should be the number of nodes in the Voltha tree
+ # this should be the number of nodes in the VolthaInstance tree
self.assertLess(rev_count(), 20)
print; print_metrics()
@@ -436,7 +436,7 @@
def test_strict_read_only(self):
# it shall not be possible to change a read-only field
self.assertRaises(ValueError, self.node.update,
- '/', Voltha(version='foo'), strict=True)
+ '/', VolthaInstance(version='foo'), strict=True)
self.assertRaises(ValueError, self.node.update,
'/adapters/1', Adapter(version='foo'), strict=True)
@@ -590,7 +590,7 @@
# look under the hood to verify that branches are added
# recursively
_latest_root_rev = self.node._branches[None].latest
- adapter_node = _latest_root_rev._children['adapters']['2'].node
+ adapter_node = _latest_root_rev._children['adapters'][2].node
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
@@ -666,7 +666,7 @@
# look under the hood to verify that branches are added
# recursively
_latest_root_rev = self.node._branches[None].latest
- adapter_node = _latest_root_rev._children['adapters']['2'].node
+ adapter_node = _latest_root_rev._children['adapters'][2].node
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
@@ -692,7 +692,7 @@
# look under the hood to verify that branches are added
# recursively
_latest_root_rev = self.node._branches[None].latest
- adapter_node = _latest_root_rev._children['adapters']['2'].node
+ adapter_node = _latest_root_rev._children['adapters'][2].node
self.assertEqual(len(self.node._branches.keys()), 1)
self.assertEqual(len(adapter_node._branches.keys()), 1)
@@ -715,7 +715,7 @@
proxy = self.node.get_proxy('/')
_latest_root_rev = self.node._branches[None].latest
- adapter_node = _latest_root_rev._children['adapters']['2'].node
+ adapter_node = _latest_root_rev._children['adapters'][2].node
tx = proxy.open_transaction()
# publicly visible value before change
@@ -888,7 +888,7 @@
tx1.commit()
tx2.commit()
self.assertRaises(MergeConflictException, tx3.commit)
- tx4.commit() # is fine since it add the same data
+ tx4.commit() # is fine since it added the same data
self.assertEqual(self.log_levels().keys(), [
'0', '1', '2', '3', '4', 'new1', 'new2'
])
@@ -1159,6 +1159,11 @@
tx.commit()
post_update.assert_called_once_with(v)
post_add.assert_called_once_with(ld)
+ # OperationContext(
+ # data=ld,
+ # field_name='logical_devices',
+ # child_key='1'
+ # ))
post_remove.assert_called_once_with(ad)
diff --git a/tests/utests/voltha/core/config/test_persistence.py b/tests/utests/voltha/core/config/test_persistence.py
index ce2dec5..7926c6c 100644
--- a/tests/utests/voltha/core/config/test_persistence.py
+++ b/tests/utests/voltha/core/config/test_persistence.py
@@ -5,19 +5,19 @@
from voltha.core.config.config_root import ConfigRoot
from voltha.protos.openflow_13_pb2 import ofp_desc
-from voltha.protos.voltha_pb2 import Voltha, HealthStatus, Adapter, \
+from voltha.protos.voltha_pb2 import VolthaInstance, HealthStatus, Adapter, \
AdapterConfig, LogicalDevice
-n_adapters = 100
-n_logical_nodes = 100
+n_adapters = 1000
+n_logical_nodes = 1000
class TestPersistence(TestCase):
def pump_some_data(self, node):
seed(0)
- node.update('/', Voltha(
+ node.update('/', VolthaInstance(
instance_id='1',
version='42',
log_level=1
@@ -53,7 +53,7 @@
kv_store = dict()
# create node and pump data
- node = ConfigRoot(Voltha(), kv_store=kv_store)
+ node = ConfigRoot(VolthaInstance(), kv_store=kv_store)
pt('init')
self.pump_some_data(node)
pt('pump')
@@ -79,10 +79,11 @@
# self.assertEqual(size2, 1 + 2 * (1 + 1 + n_adapters + n_logical_nodes))
# recreate tree from persistence
- node = ConfigRoot.load(Voltha, kv_store)
+ node = ConfigRoot.load(VolthaInstance, kv_store)
pt('load from kv store')
self.assertEqual(node.get('/', deep=1), all_latest_data)
pt('deep get')
+
if __name__ == '__main__':
main()
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index e28b59c..60eec7d 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -17,7 +17,19 @@
"""
Interface definition for Voltha Adapters
"""
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
from zope.interface import Interface
+from zope.interface import implementer
+
+from voltha.protos import third_party
+from voltha.protos.device_pb2 import Device, Port
+from voltha.protos.openflow_13_pb2 import ofp_port
+from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice
+from voltha.registry import registry
+
+
+log = structlog.get_logger()
class IAdapterInterface(Interface):
@@ -97,3 +109,147 @@
# TODO work in progress
# ...
+
+
+class IAdapterProxy(Interface):
+ """
+ This object is passed in to the __init__ function of each adapter,
+ and can be used by the adapter implementation to initiate async calls
+ toward Voltha's CORE via the APIs defined here.
+ """
+
+ def create_device(device):
+ # TODO add doc
+ """"""
+
+ def add_port(device_id, port):
+ # TODO add doc
+ """"""
+
+ def create_logical_device(logical_device):
+ # TODO add doc
+ """"""
+
+ def add_logical_port(logical_device_id, port):
+ # TODO add doc
+ """"""
+
+ # TODO work in progress
+ pass
+
+
+@implementer(IAdapterProxy)
+class AdapterProxy(object):
+ """
+ Gate-keeper between CORE and device adapters.
+
+ On one side it interacts with Core's internal model and update/dispatch
+ mechanisms.
+
+ On the other side, it interacts with the adapter's standard interface,
+ as defined in IAdapterInterface.
+ """
+
+ def __init__(self, adapter_name, adapter_cls):
+ self.adapter_name = adapter_name
+ self.adapter_cls = adapter_cls
+ self.core = registry('core')
+ self.adapter = None
+ self.adapter_node_proxy = None
+
+ @inlineCallbacks
+ def start(self):
+ log.debug('starting')
+ config = self._get_adapter_config() # this may be None
+ adapter = self.adapter_cls(self, config)
+ yield adapter.start()
+ self.adapter = adapter
+ self.adapter_node_proxy = self._update_adapter_node()
+ self._update_device_types()
+ log.info('started')
+ returnValue(self)
+
+ @inlineCallbacks
+ def stop(self):
+ log.debug('stopping')
+ if self.adapter is not None:
+ yield self.adapter.stop()
+ self.adapter = None
+ log.info('stopped')
+
+ def _get_adapter_config(self):
+ """
+ Opportunistically load persisted adapter configuration.
+ Return None if no configuration exists yet.
+ """
+ proxy = self.core.get_proxy('/')
+ try:
+ config = proxy.get('/adapters/' + self.adapter_name)
+ return config
+ except KeyError:
+ return None
+
+ def _update_adapter_node(self):
+ """
+ Creates or updates the adapter node object based on the
+ self-description from the adapter.
+ """
+
+ adapter_desc = self.adapter.adapter_descriptor()
+ assert adapter_desc.id == self.adapter_name
+ path = self._make_up_to_date(
+ '/adapters', self.adapter_name, adapter_desc)
+ return self.core.get_proxy(path)
+
+ def _update_device_types(self):
+ """
+ Make sure device types are registered in Core
+ """
+ device_types = self.adapter.device_types()
+ for device_type in device_types.items:
+ key = device_type.id
+ self._make_up_to_date('/device_types', key, device_type)
+
+ def _make_up_to_date(self, container_path, key, data):
+ full_path = container_path + '/' + str(key)
+ root_proxy = self.core.get_proxy('/')
+ try:
+ root_proxy.get(full_path)
+ root_proxy.update(full_path, data)
+ except KeyError:
+ root_proxy.add(container_path, data)
+ return full_path
+
+ # ~~~~~~~~~~~~~~~~~ Adapter-Facing Service ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def create_device(self, device):
+ assert isinstance(device, Device)
+ self._make_up_to_date('/devices', device.id, device)
+
+ # TODO for now, just map everything into a single device group
+ # which we create if it does not yet exist
+
+ dg = DeviceGroup(id='1')
+ self._make_up_to_date('/device_groups', dg.id, dg)
+
+ # add device to device group
+ # TODO how to do that?
+
+ def create_logical_device(self, logical_device):
+ assert isinstance(logical_device, LogicalDevice)
+ self._make_up_to_date('/logical_devices',
+ logical_device.id, logical_device)
+
+ # TODO link logical device to root device and back...
+
+ def add_port(self, device_id, port):
+ assert isinstance(port, Port)
+ self._make_up_to_date('/devices/{}/ports'.format(device_id),
+ port.id, port)
+
+ def add_logical_port(self, logical_device_id, port):
+ assert isinstance(port, ofp_port)
+ self._make_up_to_date(
+ '/logical_devices/{}/ports'.format(logical_device_id),
+ port.port_no, port)
+
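
Adapters now talk to the core exclusively through this proxy, which they
receive as the first constructor argument. A minimal adapter skeleton against
the new (proxy, config) convention; this is a sketch, not a complete
IAdapterInterface implementation, and 'my_adapter'/'my_device_1' are made-up
names:

    from voltha.protos.device_pb2 import Device

    class MyAdapter(object):
        name = 'my_adapter'

        def __init__(self, proxy, config):
            self.proxy = proxy    # AdapterProxy, gate-keeper to the core
            self.config = config  # persisted AdapterConfig, or None

        def start(self):
            # announce a discovered device to the core via the proxy
            device = Device(id='my_device_1', type='my_type',
                            root=True, adapter=self.name)
            self.proxy.create_device(device)
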
diff --git a/voltha/adapters/loader.py b/voltha/adapters/loader.py
index c90f17f..889b94f 100644
--- a/voltha/adapters/loader.py
+++ b/voltha/adapters/loader.py
@@ -29,10 +29,10 @@
from zope.interface.verify import verifyClass
from common.utils.grpc_utils import twisted_async
-from voltha.adapters.interface import IAdapterInterface
+from voltha.adapters.interface import IAdapterInterface, AdapterProxy
from voltha.protos import third_party
-from voltha.protos.adapter_pb2 import add_AdapterServiceServicer_to_server, \
- AdapterServiceServicer, Adapters
+# from voltha.protos.adapter_pb2 import add_AdapterServiceServicer_to_server, \
+# AdapterServiceServicer, Adapters
from voltha.registry import IComponent, registry
log = structlog.get_logger()
@@ -42,33 +42,27 @@
@implementer(IComponent)
-class AdapterLoader(AdapterServiceServicer):
+class AdapterLoader(object): # AdapterServiceServicer):
def __init__(self, config):
self.config = config
- self.adapters = {} # adapter-name -> adapter instance
- registry('grpc_server').register(
- add_AdapterServiceServicer_to_server, self)
- self.root_proxy = registry('core').get_proxy('/')
+ self.adapter_proxies = {} # adapter-name -> AdapterProxy instance
@inlineCallbacks
def start(self):
log.debug('starting')
for adapter_name, adapter_class in self._find_adapters():
- config = self.load_adapter_config(adapter_name)
- adapter = adapter_class(config)
- yield adapter.start()
- self.adapters[adapter_name] = adapter
- self.expose_adapter(adapter_name)
+ proxy = AdapterProxy(adapter_name, adapter_class)
+ yield proxy.start()
+ self.adapter_proxies[adapter_name] = proxy
log.info('started')
returnValue(self)
@inlineCallbacks
def stop(self):
log.debug('stopping')
- for adapter in self.adapters.values():
- yield adapter.stop()
- self.adapters = {}
+ for proxy in self.adapter_proxies.values():
+ yield proxy.stop()
+ self.adapter_proxies = {}
log.info('stopped')
def _find_adapters(self):
@@ -91,24 +85,3 @@
IAdapterInterface.implementedBy(cls):
verifyClass(IAdapterInterface, cls)
yield adapter_name, cls
-
- def load_adapter_config(self, adapter_name):
- """
- Opportunistically load persisted adapter configuration
- :param adapter_name: name of adapter
- :return: AdapterConfig
- """
- # TODO
-
- def expose_adapter(self, name):
- adapter_descriptor = self.adapters[name].adapter_descriptor()
- self.root_proxy.add('/adapters', adapter_descriptor)
-
- # gRPC service method implementations. BE CAREFUL; THESE ARE CALLED ON
- # the gRPC threadpool threads.
-
- @twisted_async
- def ListAdapters(self, request, context):
- log.info('list-adapters', request=request)
- items = self.root_proxy.get('/adapters')
- return Adapters(items=items)
diff --git a/voltha/adapters/simulated/simulated.py b/voltha/adapters/simulated/simulated.py
index 805d250..ad28898 100644
--- a/voltha/adapters/simulated/simulated.py
+++ b/voltha/adapters/simulated/simulated.py
@@ -17,13 +17,20 @@
"""
Mock device adapter for testing.
"""
+from uuid import uuid4
+
import structlog
from zope.interface import implementer
from voltha.adapters.interface import IAdapterInterface
-from voltha.protos.adapter_pb2 import Adapter, DeviceTypes, AdapterConfig
+from voltha.core.device_model import mac_str_to_tuple
+from voltha.protos.adapter_pb2 import Adapter, AdapterConfig
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Device, Port
from voltha.protos.health_pb2 import HealthStatus
from voltha.protos.common_pb2 import INFO
+from voltha.protos.logical_device_pb2 import LogicalDevice
+from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_port, OFPPF_1GB_FD, \
+ OFPPF_FIBER, OFPPS_LIVE
log = structlog.get_logger()
@@ -31,10 +38,13 @@
@implementer(IAdapterInterface)
class SimulatedAdapter(object):
- def __init__(self, config):
+ name = 'simulated'
+
+ def __init__(self, proxy, config):
+ self.proxy = proxy
self.config = config
self.descriptor = Adapter(
- id='simulated',
+ id=self.name,
vendor='Voltha project',
version='0.1',
config=AdapterConfig(log_level=INFO)
@@ -42,7 +52,8 @@
def start(self):
log.debug('starting')
- # pass
+ # TODO tmp: populate some devices and logical devices
+ self._tmp_populate_stuff()
log.info('started')
def stop(self):
@@ -53,9 +64,10 @@
return self.descriptor
def device_types(self):
- return DeviceTypes(
- items=[] # TODO
- )
+ return DeviceTypes(items=[
+ DeviceType(id='simulated_olt', adapter=self.name),
+ DeviceType(id='simulated_onu', adapter=self.name)
+ ])
def health(self):
return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
@@ -72,3 +84,93 @@
def deactivate_device(self, device):
raise NotImplementedError()
+ def _tmp_populate_stuff(self):
+ """
+ pretend that we discovered some devices and create:
+ - devices
+ - device ports for each
+ - logical device
+ - logical device ports
+ """
+
+ olt = Device(
+ id='simulated_olt_1',
+ type='simulated_olt',
+ root=True,
+ vendor='simulated',
+ model='n/a',
+ hardware_version='n/a',
+ firmware_version='n/a',
+ software_version='1.0',
+ serial_number=uuid4().hex,
+ adapter=self.name
+ )
+ self.proxy.create_device(olt)
+ for id in ['eth', 'pon']:
+ port = Port(id=id)
+ self.proxy.add_port(olt.id, port)
+
+ onu1 = Device(
+ id='simulated_onu_1',
+ type='simulated_onu',
+ root=False,
+ parent_id=olt.id,
+ vendor='simulated',
+ model='n/a',
+ hardware_version='n/a',
+ firmware_version='n/a',
+ software_version='1.0',
+ serial_number=uuid4().hex,
+ adapter=self.name
+ )
+ self.proxy.create_device(onu1)
+ for id in ['eth', 'pon']:
+ port = Port(id=id)
+ self.proxy.add_port(onu1.id, port)
+
+ onu2 = Device(
+ id='simulated_onu_2',
+ type='simulated_onu',
+ root=False,
+ parent_id=olt.id,
+ vendor='simulated',
+ model='n/a',
+ hardware_version='n/a',
+ firmware_version='n/a',
+ software_version='1.0',
+ serial_number=uuid4().hex,
+ adapter=self.name
+ )
+ self.proxy.create_device(onu2)
+ for id in ['eth', 'pon']:
+ port = Port(id=id)
+ self.proxy.add_port(onu2.id, port)
+
+ ld = LogicalDevice(
+ id='simulated1',
+ datapath_id=1,
+ desc=ofp_desc(
+ mfr_desc='cord project',
+ hw_desc='simulated pon',
+ sw_desc='simulated pon',
+ serial_num=uuid4().hex,
+ dp_desc='n/a'
+ )
+ )
+ self.proxy.create_logical_device(ld)
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ for port_no, name in [(1, 'onu1'), (2, 'onu2'), (129, 'olt1')]:
+ port = ofp_port(
+ port_no=port_no,
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+ name=name,
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ )
+ self.proxy.add_logical_port(ld.id, port)
+
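
mac_str_to_tuple (from voltha.core.device_model) fills ofp_port.hw_addr, which
the OpenFlow 1.3 protos model as a repeated integer field. It presumably just
splits the colon notation into byte values, along these lines (assumed
behavior, not quoted from device_model):

    def mac_str_to_tuple(mac):
        """'00:00:00:00:00:81' -> (0, 0, 0, 0, 0, 129)"""
        return tuple(int(d, 16) for d in mac.split(':'))
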
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index b268a20..5ab7ae9 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -73,14 +73,14 @@
self.tracking_loop_delay = config.get(
'tracking_loop_delay', 1)
self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
- self.leader_prefix = '/'.join([self.prefix, self.config.get(
- self.config['leader_key'], 'leader')])
- self.membership_prefix = '/'.join([self.prefix, self.config.get(
- self.config['membership_key'], 'members')])
- self.assignment_prefix = '/'.join([self.prefix, self.config.get(
- self.config['assignment_key'], 'assignments')])
- self.workload_prefix = '/'.join([self.prefix, self.config.get(
- self.config['workload_key'], 'work')])
+ self.leader_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['leader_key'], 'leader')))
+ self.membership_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['membership_key'], 'members'), ''))
+ self.assignment_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['assignment_key'], 'assignments'), ''))
+ self.workload_prefix = '/'.join((self.prefix, self.config.get(
+ self.config['workload_key'], 'work'), ''))
self.retries = 0
self.instance_id = instance_id
@@ -147,6 +147,15 @@
def kv_delete(self, *args, **kw):
return self._retry(self.consul.kv.delete, *args, **kw)
+ # Methods exposing key membership information
+
+ @inlineCallbacks
+ def get_members(self):
+ """Return list of all members"""
+ _, members = yield self.kv_get(self.membership_prefix, recurse=True)
+ returnValue([member['Key'][len(self.membership_prefix):]
+ for member in members])
+
# Private (internal) methods:
@inlineCallbacks
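
get_members() recovers instance ids by slicing the membership prefix off each
returned key; the trailing '' element added to the membership, assignment, and
workload joins above gives those prefixes a trailing slash so the slice is
exact. For example:

    # kv_get(prefix, recurse=True) returns full keys under the prefix
    prefix = 'service/voltha/members/'
    key = 'service/voltha/members/instance-1'
    assert key[len(prefix):] == 'instance-1'
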
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
index 6df8b4c..969ba8e 100644
--- a/voltha/core/config/config_node.py
+++ b/voltha/core/config/config_node.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from collections import OrderedDict
from copy import copy
from jsonpatch import JsonPatch
@@ -25,14 +24,11 @@
from voltha.core.config.config_rev import is_proto_message, children_fields, \
ConfigRevision, access_rights
from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import merge_3way
from voltha.protos import third_party
from voltha.protos import meta_pb2
-class MergeConflictException(Exception):
- pass
-
-
def message_to_dict(m):
return MessageToDict(m, True, True, False)
@@ -50,6 +46,13 @@
', '.join('"%s"' % f for f in violated_fields))
+def find_rev_by_key(revs, keyname, value):
+ for i, rev in enumerate(revs):
+ if getattr(rev._config._data, keyname) == value:
+ return i, rev
+ raise KeyError('key {}={} not found'.format(keyname, value))
+
+
class ConfigNode(object):
"""
Represents a configuration node which can hold a number of revisions
@@ -80,7 +83,9 @@
self._type = initial_data
elif is_proto_message(initial_data):
self._type = initial_data.__class__
- self._initialize(copy(initial_data), txid)
+ copied_data = initial_data.__class__()
+ copied_data.CopyFrom(initial_data)
+ self._initialize(copied_data, txid)
else:
raise NotImplementedError()
@@ -98,13 +103,15 @@
field_value = getattr(data, field_name)
if field.is_container:
if field.key:
- children[field_name] = od = OrderedDict()
+ keys_seen = set()
+ children[field_name] = lst = []
for v in field_value:
rev = self._mknode(v, txid=txid).latest
key = getattr(v, field.key)
- if key in od:
+ if key in keys_seen:
raise ValueError('Duplicate key "{}"'.format(key))
- od[key] = rev
+ lst.append(rev)
+ keys_seen.add(key)
else:
children[field_name] = [
self._mknode(v, txid=txid).latest for v in field_value]
@@ -166,17 +173,18 @@
field = children_fields(self._type)[name]
if field.is_container:
if field.key:
- children_od = rev._children[name]
+ children = rev._children[name]
if path:
# need to escalate further
key, _, path = path.partition('/')
- child_rev = children_od[field.key_from_str(key)]
+ key = field.key_from_str(key)
+ _, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
return child_node._get(child_rev, path, depth)
else:
# we are the node of interest
response = []
- for child_rev in children_od.itervalues():
+ for child_rev in children:
child_node = child_rev.node
value = child_node._do_get(child_rev, depth)
response.append(value)
@@ -226,8 +234,8 @@
if field.key:
key, _, path = path.partition('/')
key = field.key_from_str(key)
- children_od = copy(rev._children[name])
- child_rev = children_od[key]
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
new_child_rev = child_node.update(
path, data, strict, txid, mk_branch)
@@ -236,8 +244,8 @@
return branch._latest
if getattr(new_child_rev.data, field.key) != key:
raise ValueError('Cannot change key field')
- children_od[key] = new_child_rev
- rev = rev.update_children(name, children_od, branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
@@ -307,13 +315,17 @@
if self._proxy is not None:
self._proxy.invoke_callbacks(
CallbackType.PRE_ADD, data)
- children_od = copy(rev._children[name])
+ children = copy(rev._children[name])
key = getattr(data, field.key)
- if key in children_od:
+ try:
+ find_rev_by_key(children, field.key, key)
+ except KeyError:
+ pass
+ else:
raise ValueError('Duplicate key "{}"'.format(key))
child_rev = self._mknode(data).latest
- children_od[key] = child_rev
- rev = rev.update_children(name, children_od, branch)
+ children.append(child_rev)
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev,
((CallbackType.POST_ADD, data),))
return rev
@@ -325,12 +337,12 @@
# need to escalate
key, _, path = path.partition('/')
key = field.key_from_str(key)
- children_od = copy(rev._children[name])
- child_rev = children_od[key]
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
new_child_rev = child_node.add(path, data, txid, mk_branch)
- children_od[key] = new_child_rev
- rev = rev.update_children(name, children_od, branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
@@ -363,26 +375,27 @@
key = field.key_from_str(key)
if path:
# need to escalate
- children_od = copy(rev._children[name])
- child_rev = children_od[key]
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
new_child_rev = child_node.remove(path, txid, mk_branch)
- children_od[key] = new_child_rev
- rev = rev.update_children(name, children_od, branch)
+ children[idx] = new_child_rev
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev)
return rev
else:
# need to remove from this very node
- children_od = copy(rev._children[name])
+ children = copy(rev._children[name])
+ idx, child_rev = find_rev_by_key(children, field.key, key)
if self._proxy is not None:
- data = children_od[field.key_from_str(key)].data
+ data = child_rev.data
self._proxy.invoke_callbacks(
CallbackType.PRE_REMOVE, data)
post_anno = ((CallbackType.POST_REMOVE, data),)
else:
post_anno = ()
- del children_od[field.key_from_str(key)]
- rev = rev.update_children(name, children_od, branch)
+ del children[idx]
+ rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev, post_anno)
return rev
else:
@@ -401,14 +414,6 @@
def _del_txbranch(self, txid):
del self._branches[txid]
- # def can_txbranch_be_merged(self, txid):
- # try:
- # self._merge_txbranch(txid, dry_run=True)
- # except MergeConflictException:
- # return False
- # else:
- # return True
-
def _merge_txbranch(self, txid, dry_run=False):
"""
Make latest in branch to be latest in the common branch, but only
@@ -417,73 +422,12 @@
to be verified recursively.
"""
- """
- A transaction branch can be merged only if none of the following
- happened with the master branch since the fork rev:
- - the local data was changed both in the incoming node and in the
- default branch since the branch point, and they differ now
- - both branches changed the same children nodes in any way (local or
- deep)
- """
-
- announcements = []
-
- def _get_od_changes(lst1, lst2):
- assert isinstance(lst2, dict)
- added_keys = [k for k in lst2.iterkeys() if k not in lst1]
- removed_keys = [k for k in lst1.iterkeys() if k not in lst2]
- changed_keys = [k for k in lst1.iterkeys()
- if k in lst2 and lst1[k].hash != lst2[k].hash]
- return added_keys, removed_keys, changed_keys
-
- def _get_changes(lst1, lst2):
- if isinstance(lst1, dict):
- return _get_od_changes(lst1, lst2)
- assert isinstance(lst1, list)
- assert isinstance(lst2, list)
- set1 = set(lst1)
- set2 = set(lst2)
- added = set2.difference(set1)
- removed = set1.difference(set2)
- changed = set() # no such thing in plain (unkeyed) lists
- return added, removed, changed
-
- def _escalate(child_rev):
+ def merge_child(child_rev):
child_branch = child_rev._branch
if child_branch._txid == txid:
child_rev = child_branch._node._merge_txbranch(txid, dry_run)
return child_rev
- def _escalate_list(src_list):
- if isinstance(src_list, list):
- lst = []
- for child_rev in src_list:
- lst.append(_escalate(child_rev))
- return lst
- else: # OrderedDict
- od = OrderedDict()
- for key, child_rev in src_list.iteritems():
- od[key] = _escalate(child_rev)
- return od
-
- def _add(dst, rev_or_key, src):
- if isinstance(dst, list):
- dst.append(_escalate(rev_or_key))
- announcements.append((CallbackType.POST_ADD, rev_or_key.data))
- else: # OrderedDict key, data is in lst
- rev = src[rev_or_key]
- dst[rev_or_key] = _escalate(rev)
- announcements.append((CallbackType.POST_ADD, rev.data))
-
- def _remove(dst, rev_or_key):
- if isinstance(dst, list):
- dst.remove(rev_or_key)
- announcements.append((CallbackType.POST_REMOVE, rev_or_key))
- else:
- rev = dst[rev_or_key]
- del dst[rev_or_key]
- announcements.append((CallbackType.POST_REMOVE, rev.data))
-
src_branch = self._branches[txid]
dst_branch = self._branches[None]
@@ -491,66 +435,14 @@
src_rev = src_branch.latest # head rev of source branch
dst_rev = dst_branch.latest # head rev of target branch
- # deal with config data first
- if dst_rev._config is fork_rev._config:
- # no change in master, accept src if different
- config_changed = dst_rev._config != src_rev._config
- else:
- if dst_rev._config.hash != src_rev._config.hash:
- raise MergeConflictException('Config collision')
- config_changed = True
-
- new_children = copy(dst_rev._children)
- for field_name, field in children_fields(self._type).iteritems():
- fork_list = fork_rev._children[field_name]
- src_list = src_rev._children[field_name]
- dst_list = dst_rev._children[field_name]
- if 0: #dst_list == fork_list:
- # no change in master, accept src if different
- if src_list != fork_list:
- new_children[field_name] = _escalate_list(src_list)
- else:
- src_added, src_removed, src_changed = _get_changes(
- fork_list, src_list)
- dst_added, dst_removed, dst_changed = _get_changes(
- fork_list, dst_list)
-
- lst = copy(new_children[field_name])
- for to_add in src_added:
- # we cannot add if it has been added and is different
- if to_add in dst_added:
- # this can happen only to keyed containers
- assert isinstance(src_list, dict)
- if src_list[to_add].hash != dst_list[to_add].hash:
- raise MergeConflictException(
- 'Cannot add because it has been added and '
- 'different'
- )
- _add(lst, to_add, src_list)
- for to_remove in src_removed:
- # we cannot remove if it has changed in dst
- if to_remove in dst_changed:
- raise MergeConflictException(
- 'Cannot remove because it has changed')
- if to_remove not in dst_removed:
- _remove(lst, to_remove)
- for to_change in src_changed:
- # we cannot change if it was removed in dst
- if to_change in dst_removed:
- raise MergeConflictException(
- 'Cannot change because it has been removed')
- # change can only be in keyed containers (OrderedDict)
- lst[to_change] = _escalate(src_list[to_change])
- new_children[field_name] = lst
+ rev, changes = merge_3way(
+ fork_rev, src_rev, dst_rev, merge_child, dry_run)
if not dry_run:
- rev = src_rev if config_changed else dst_rev
- rev = rev.update_all_children(new_children, dst_branch)
- if config_changed:
- announcements.append((CallbackType.POST_UPDATE, rev.data))
- self._make_latest(dst_branch, rev, announcements)
+ self._make_latest(dst_branch, rev, change_announcements=changes)
del self._branches[txid]
- return rev
+
+ return rev
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Diff utility ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -638,8 +530,9 @@
raise ValueError('Cannot proxy a container field')
if field.key:
key, _, path = path.partition('/')
- children_od = rev._children[name]
- child_rev = children_od[key]
+ key = field.key_from_str(key)
+ children = rev._children[name]
+ _, child_rev = find_rev_by_key(children, field.key, key)
child_node = child_rev.node
return child_node._get_proxy(path, root, full_path, exclusive)
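
With the OrderedDict gone, keyed children are held as plain lists of revisions
and located by a linear scan in find_rev_by_key. That is the trade-off cited
in the commit message: bulk adds no longer maintain a keyed index (hence the
~3x speedup), while point lookups become O(n). In spirit:

    class FakeRev(object):  # stand-in for ConfigRevision, illustration only
        def __init__(self, key):
            self.key = key

    def find_by_key(revs, value):
        for i, rev in enumerate(revs):
            if rev.key == value:
                return i, rev
        raise KeyError(value)

    revs = [FakeRev('a'), FakeRev('b')]
    revs.append(FakeRev('c'))          # add: simple append, no index upkeep
    idx, rev = find_by_key(revs, 'b')  # lookup: linear scan
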
diff --git a/voltha/core/config/config_proxy.py b/voltha/core/config/config_proxy.py
index e4c6245..0769a94 100644
--- a/voltha/core/config/config_proxy.py
+++ b/voltha/core/config/config_proxy.py
@@ -21,6 +21,22 @@
log = structlog.get_logger()
+class OperationContext(object):
+ def __init__(self, path=None, data=None, field_name=None, child_key=None):
+ self.path = path
+ self._data = data
+ self.field_name = field_name
+ self.child_key = child_key
+ @property
+ def data(self):
+ return self._data
+ def update(self, data):
+ self._data = data
+ return self
+ def __repr__(self):
+ return 'OperationContext({})'.format(self.__dict__)
+
+
class CallbackType(Enum):
# GET hooks are called after the data is retrieved and can be used to
@@ -43,6 +59,10 @@
PRE_REMOVE = 6
POST_REMOVE = 7
+ # Bulk list change due to transaction commit that changed items in
+ # non-keyed container fields
+ POST_LISTCHANGE = 8
+
class ConfigProxy(object):
"""
@@ -115,16 +135,16 @@
# ~~~~~~~~~~~~~~~~~~~~~ Callback dispatch ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- def invoke_callbacks(self, callback_type, msg, proceed_on_errors=False):
+ def invoke_callbacks(self, callback_type, context, proceed_on_errors=False):
lst = self._callbacks.get(callback_type, [])
for callback, args, kw in lst:
try:
- msg = callback(msg, *args, **kw)
+ context = callback(context, *args, **kw)
except Exception, e:
if proceed_on_errors:
log.exception(
'call-back-error', callback_type=callback_type,
- msg=msg, e=e)
+ context=context, e=e)
else:
raise
- return msg
+ return context
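
Callbacks registered on a proxy now receive, and must return, whatever context
object the dispatcher passes them; for merge-driven events this is intended to
become an OperationContext (see the commented-out blocks in merge_3way.py
below), while most events today still deliver the bare message. A registration
sketch, assuming ConfigProxy's register_callback hook as exercised by the unit
tests:

    from voltha.core.config.config_proxy import CallbackType
    from voltha.core.config.config_root import ConfigRoot
    from voltha.protos.voltha_pb2 import VolthaInstance, Adapter

    node = ConfigRoot(VolthaInstance(instance_id='1'))
    proxy = node.get_proxy('/')

    def on_post_add(data):
        print 'added:', data
        return data  # callbacks must pass their argument through

    proxy.register_callback(CallbackType.POST_ADD, on_post_add)
    proxy.add('/adapters', Adapter(id='a1'))  # triggers POST_ADD
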
diff --git a/voltha/core/config/config_rev.py b/voltha/core/config/config_rev.py
index 411ede2..dc0eb5e 100644
--- a/voltha/core/config/config_rev.py
+++ b/voltha/core/config/config_rev.py
@@ -263,10 +263,8 @@
m = md5('' if self._config is None else self._config._hash)
if self._children is not None:
for children in self._children.itervalues():
- if isinstance(children, dict):
- m.update(''.join(c._hash for c in children.itervalues()))
- else:
- m.update(''.join(c._hash for c in children))
+ assert isinstance(children, list)
+ m.update(''.join(c._hash for c in children))
return m.hexdigest()[:12]
@property
@@ -291,17 +289,15 @@
branch nodes. If depth is < 0, this results in a fully exhaustive
"complete config".
"""
- data = copy(self._config.data)
+ orig_data = self._config.data
+ data = orig_data.__class__()
+ data.CopyFrom(orig_data)
if depth:
# collect children
cfields = children_fields(self.type).iteritems()
for field_name, field in cfields:
if field.is_container:
- if field.key:
- children = self._children[field_name].itervalues()
- else:
- children = self._children[field_name]
- for rev in children:
+ for rev in self._children[field_name]:
child_data = rev.get(depth=depth - 1)
child_data_holder = getattr(data, field_name).add()
child_data_holder.MergeFrom(child_data)
@@ -322,7 +318,7 @@
def update_children(self, name, children, branch):
"""Return a NEW revision which is updated for the modified children"""
- new_children = copy(self._children)
+ new_children = self._children.copy()
new_children[name] = children
new_rev = copy(self)
new_rev._branch = branch
diff --git a/voltha/core/config/config_rev_persisted.py b/voltha/core/config/config_rev_persisted.py
index f1fad1c..d3983a1 100644
--- a/voltha/core/config/config_rev_persisted.py
+++ b/voltha/core/config/config_rev_persisted.py
@@ -18,7 +18,6 @@
A config rev object that persists itself
"""
from bz2 import compress, decompress
-from collections import OrderedDict
import structlog
from simplejson import dumps, loads
@@ -61,11 +60,8 @@
children_lists = {}
for field_name, children in self._children.iteritems():
- if isinstance(children, list):
- lst = [rev.hash for rev in children]
- else:
- lst = [rev.hash for rev in children.itervalues()]
- children_lists[field_name] = lst
+ hashes = [rev.hash for rev in children]
+ children_lists[field_name] = hashes
data = dict(
children=children_lists,
@@ -92,25 +88,13 @@
node = branch._node
for field_name, meta in children_fields(msg_cls).iteritems():
child_msg_cls = tmp_cls_loader(meta.module, meta.type)
- if meta.key:
- # we need to assemble an ordered dict using the key
- lst = OrderedDict()
- for child_hash in children_list[field_name]:
- child_node = node._mknode(child_msg_cls)
- child_node.load_latest(child_hash)
- child_rev = child_node.latest
- key = getattr(child_rev.data, meta.key)
- lst[key] = child_rev
- else:
- lst = []
- for child_hash in children_list[field_name]:
- child_node = node._mknode(child_msg_cls)
- child_node.load_latest(child_hash)
- child_rev = child_node.latest
- lst.append(child_rev)
-
- assembled_children[field_name] = lst
-
+ children = []
+ for child_hash in children_list[field_name]:
+ child_node = node._mknode(child_msg_cls)
+ child_node.load_latest(child_hash)
+ child_rev = child_node.latest
+ children.append(child_rev)
+ assembled_children[field_name] = children
rev = cls(branch, config_data, assembled_children)
return rev
@@ -139,5 +123,5 @@
def tmp_cls_loader(module_name, cls_name):
# TODO this shall be generalized
from voltha.protos import voltha_pb2, health_pb2, adapter_pb2, \
- logical_layer_pb2, openflow_13_pb2
+ logical_device_pb2, device_pb2, openflow_13_pb2
return getattr(locals()[module_name], cls_name)
diff --git a/voltha/core/config/config_root.py b/voltha/core/config/config_root.py
index c229a42..6b45a90 100644
--- a/voltha/core/config/config_root.py
+++ b/voltha/core/config/config_root.py
@@ -18,9 +18,10 @@
import structlog
from simplejson import dumps, loads
-from voltha.core.config.config_node import ConfigNode, MergeConflictException
+from voltha.core.config.config_node import ConfigNode
from voltha.core.config.config_rev import ConfigRevision
from voltha.core.config.config_rev_persisted import PersistedConfigRevision
+from voltha.core.config.merge_3way import MergeConflictException
log = structlog.get_logger()
diff --git a/voltha/core/config/merge_3way.py b/voltha/core/config/merge_3way.py
new file mode 100644
index 0000000..be87f5c
--- /dev/null
+++ b/voltha/core/config/merge_3way.py
@@ -0,0 +1,267 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+3-way merge function for config rev objects.
+"""
+from collections import OrderedDict
+from copy import copy
+
+from voltha.core.config.config_proxy import CallbackType, OperationContext
+from voltha.core.config.config_rev import children_fields
+
+
+class MergeConflictException(Exception):
+ pass
+
+
+def merge_3way(fork_rev, src_rev, dst_rev, merge_child_func, dry_run=False):
+ """
+ Attempt to merge src_rev into dst_rev, taking into account what has
+ changed in both revs since the last known common point, the fork_rev.
+ In case of conflict, raise a MergeConflictException(). If dry_run is True,
+ don't actually perform the merge, but detect potential conflicts.
+
+ This function recurses into all child nodes stored under the rev and
+ performs the merge if the child is also part of a transaction branch.
+
+ :param fork_rev: Point of forking (last known common state between
+ branches)
+ :param src_rev: Latest rev from which we merge to dst_rev
+ :param dst_rev: Target (destination) rev
+ :param merge_child_func: To run a potential merge in all children that
+ may need a merge (determined from the local changes)
+ :param dry_run: If True, do not perform the merge, but detect merge
+ conflicts.
+ :return: The new dst_rev (a new rev instance) and the list of changes that
+ occurred in this node or any of its children as part of this merge.
+ """
+
+ # to collect change tuples of (<callback-type>, <op-context>)
+ changes = []
+
+ class AnalyzeChanges(object):
+ def __init__(self, lst1, lst2, keyname):
+ self.keymap1 = OrderedDict((getattr(rev._config._data, keyname), i)
+ for i, rev in enumerate(lst1))
+ self.keymap2 = OrderedDict((getattr(rev._config._data, keyname), i)
+ for i, rev in enumerate(lst2))
+ self.added_keys = [
+ k for k in self.keymap2.iterkeys() if k not in self.keymap1]
+ self.removed_keys = [
+ k for k in self.keymap1.iterkeys() if k not in self.keymap2]
+ self.changed_keys = [
+ k for k in self.keymap1.iterkeys()
+ if k in self.keymap2 and
+ lst1[self.keymap1[k]]._hash != lst2[self.keymap2[k]]._hash
+ ]
+
+ # Note: there are a couple of special cases that could be optimized
+ # later on. But since premature optimization is a bad idea, we
+ # defer them.
+
+ # deal with config data first
+ if dst_rev._config is fork_rev._config:
+ # no change in master, accept src if different
+ config_changed = dst_rev._config != src_rev._config
+ else:
+ if dst_rev._config.hash != src_rev._config.hash:
+ raise MergeConflictException('Config collision')
+ config_changed = True
+
+ # now to the external children fields
+ new_children = dst_rev._children.copy()
+ _children_fields = children_fields(fork_rev.data.__class__)
+
+ for field_name, field in _children_fields.iteritems():
+
+ fork_list = fork_rev._children[field_name]
+ src_list = src_rev._children[field_name]
+ dst_list = dst_rev._children[field_name]
+
+ if dst_list == src_list:
+            # we do not need to change the dst, however we still need
+            # to complete the branch purging in child nodes so as not
+            # to leave dangling branches around
+            for rev in src_list:
+                merge_child_func(rev)
+ continue
+
+ if not field.key:
+            # If the list is not keyed, we really should not merge. We merely
+            # check for a collision, i.e., whether both branches changed it
+            # (and not to the same value)
+ if dst_list == fork_list:
+ # dst branch did not change since fork
+
+ assert src_list != fork_list, 'We should not be here otherwise'
+
+ # the incoming (src) rev changed, and we have to apply it
+ new_children[field_name] = [
+ merge_child_func(rev) for rev in src_list]
+
+ if field.is_container:
+ changes.append((CallbackType.POST_LISTCHANGE,
+ OperationContext(field_name=field_name)))
+
+ else:
+ if src_list != fork_list:
+ raise MergeConflictException(
+                    'Cannot merge because a single child node or un-keyed '
+                    'children list has changed')
+
+ else:
+
+ if dst_list == fork_list:
+ # Destination did not change
+
+ # We need to analyze only the changes on the incoming rev
+ # since fork
+ src = AnalyzeChanges(fork_list, src_list, field.key)
+
+ new_list = copy(src_list) # we start from the source list
+
+ for key in src.added_keys:
+ idx = src.keymap2[key]
+ new_rev = merge_child_func(new_list[idx])
+ new_list[idx] = new_rev
+ changes.append(
+ (CallbackType.POST_ADD,
+ new_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=new_rev.data)))
+
+ for key in src.removed_keys:
+ old_rev = fork_list[src.keymap1[key]]
+ changes.append((
+ CallbackType.POST_REMOVE,
+ old_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=old_rev.data)))
+
+ for key in src.changed_keys:
+ idx = src.keymap2[key]
+ new_rev = merge_child_func(new_list[idx])
+ new_list[idx] = new_rev
+ # updated child gets its own change event
+
+ new_children[field_name] = new_list
+
+ else:
+
+ # For keyed fields we can really investigate what has been
+ # added, removed, or changed in both branches and do a
+ # fine-grained collision detection and merge
+
+ src = AnalyzeChanges(fork_list, src_list, field.key)
+ dst = AnalyzeChanges(fork_list, dst_list, field.key)
+
+ new_list = copy(dst_list) # this time we start with the dst
+
+ for key in src.added_keys:
+ # we cannot add if it has been added and is different
+ if key in dst.added_keys:
+ # it has been added to both, we need to check if
+ # they are the same
+ child_dst_rev = dst_list[dst.keymap2[key]]
+ child_src_rev = src_list[src.keymap2[key]]
+ if child_dst_rev.hash == child_src_rev.hash:
+ # they match, so we do not need to change the
+ # dst list, but we still need to purge the src
+ # branch
+ merge_child_func(child_dst_rev)
+ else:
+ raise MergeConflictException(
+                            'Cannot add because it has been added in both '
+                            'branches and differs'
+ )
+ else:
+ # this is a brand new key, need to add it
+ new_rev = merge_child_func(src_list[src.keymap2[key]])
+ new_list.append(new_rev)
+ changes.append((
+ CallbackType.POST_ADD,
+ new_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=new_rev.data)))
+
+ for key in src.changed_keys:
+ # we cannot change if it was removed in dst
+ if key in dst.removed_keys:
+ raise MergeConflictException(
+ 'Cannot change because it has been removed')
+
+                    # if it changed in dst as well, we need to check if they
+                    # match (same change)
+ elif key in dst.changed_keys:
+ child_dst_rev = dst_list[dst.keymap2[key]]
+ child_src_rev = src_list[src.keymap2[key]]
+ if child_dst_rev.hash == child_src_rev.hash:
+ # they match, so we do not need to change the
+ # dst list, but we still need to purge the src
+ # branch
+ merge_child_func(child_src_rev)
+ elif child_dst_rev._config.hash != child_src_rev._config.hash:
+ raise MergeConflictException(
+                            'Cannot update because it has been changed in '
+                            'both branches and differs'
+ )
+ else:
+ new_rev = merge_child_func(
+ src_list[src.keymap2[key]])
+ new_list[dst.keymap2[key]] = new_rev
+ # no announcement for child update
+
+ else:
+ # it only changed in src branch
+ new_rev = merge_child_func(src_list[src.keymap2[key]])
+ new_list[dst.keymap2[key]] = new_rev
+ # no announcement for child update
+
+                # go from highest index to lowest
+                for key in reversed(src.removed_keys):
+
+ # we cannot remove if it has changed in dst
+ if key in dst.changed_keys:
+ raise MergeConflictException(
+ 'Cannot remove because it has changed')
+
+ # if it has not been removed yet from dst, then remove it
+ if key not in dst.removed_keys:
+ dst_idx = dst.keymap2[key]
+ old_rev = new_list.pop(dst_idx)
+ changes.append((
+ CallbackType.POST_REMOVE,
+ old_rev.data))
+ # OperationContext(
+ # field_name=field_name,
+ # child_key=key,
+ # data=old_rev.data)))
+
+ new_children[field_name] = new_list
+
+ if not dry_run:
+ rev = src_rev if config_changed else dst_rev
+ rev = rev.update_all_children(new_children, dst_rev._branch)
+ if config_changed:
+ changes.append((CallbackType.POST_UPDATE, rev.data))
+ return rev, changes
+
+ else:
+ return None, None
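
For illustration, a minimal sketch of how a caller might drive merge_3way
when committing a transaction branch. The three rev objects would come from
the config tree; purge_child is a hypothetical stand-in for merge_child_func
that merely finalizes each child rev (a real one would recurse into the
child's transaction branch):

    from voltha.core.config.merge_3way import (
        merge_3way, MergeConflictException)

    def purge_child(child_rev):
        # hypothetical no-op merge_child_func; a real implementation would
        # merge and purge the child's branch, returning the rev to keep
        return child_rev

    def try_commit(fork_rev, src_rev, dst_rev):
        try:
            # first pass: detect conflicts without touching the dst branch
            merge_3way(fork_rev, src_rev, dst_rev, purge_child, dry_run=True)
        except MergeConflictException:
            raise  # caller decides whether to rebase or abandon
        # no conflict detected; perform the merge for real
        new_rev, changes = merge_3way(fork_rev, src_rev, dst_rev, purge_child)
        for callback_type, data in changes:
            pass  # dispatch POST_ADD / POST_REMOVE / POST_UPDATE callbacks
        return new_rev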
diff --git a/voltha/core/core.py b/voltha/core/core.py
index 9eb82ff..02e9c02 100644
--- a/voltha/core/core.py
+++ b/voltha/core/core.py
@@ -18,31 +18,343 @@
Voltha's CORE components.
"""
import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
from zope.interface import implementer
from common.utils.grpc_utils import twisted_async
from voltha.core.config.config_root import ConfigRoot
from voltha.protos import third_party
-from voltha.protos.voltha_pb2 import add_VolthaServiceServicer_to_server, \
- Voltha, VolthaServiceServicer
+from voltha.protos.voltha_pb2 import \
+ add_VolthaGlobalServiceServicer_to_server, \
+ add_VolthaLocalServiceServicer_to_server, \
+ VolthaGlobalServiceServicer, VolthaLocalServiceStub, \
+ VolthaLocalServiceServicer, Voltha, VolthaInstance, VolthaInstances, \
+ Adapters, LogicalDevices, Ports, LogicalPorts, Flows, FlowGroups, Devices, \
+ DeviceTypes, DeviceGroups
from voltha.registry import IComponent, registry
+from google.protobuf.empty_pb2 import Empty
log = structlog.get_logger()
@implementer(IComponent)
-class VolthaCore(VolthaServiceServicer):
+class VolthaCore(object):
- def __init__(self, **kw):
-
+ def __init__(self, instance_id, version, log_level):
+ self.instance_id = instance_id
self.stopped = False
- self.config_root = self._mk_config_root(**kw)
- registry('grpc_server').register(
- add_VolthaServiceServicer_to_server, self)
+ self.global_service = VolthaGlobalServiceHandler(
+ dispatcher=self,
+ instance_id=instance_id,
+ version=version,
+ log_level=log_level)
+ self.local_service = VolthaLocalServiceHandler(
+ instance_id=instance_id,
+ version=version,
+ log_level=log_level)
+
+ @inlineCallbacks
+ def start(self):
+ log.debug('starting')
+ yield self.global_service.start()
+ yield self.local_service.start()
+ log.info('started')
+ returnValue(self)
+
+ def stop(self):
+ log.debug('stopping')
+ self.stopped = True
+ log.info('stopped')
+
+ def get_proxy(self, path, exclusive=False):
+ return self.local_service.get_proxy(path, exclusive)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~ DISPATCH LOGIC ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ # TODO this shall be moved into its own module
+
+ def dispatch(self, instance_id, stub, method_name, input):
+ log.debug('dispatch', instance_id=instance_id, stub=stub,
+ _method_name=method_name, input=input)
+ # special case if instance_id is us
+ if instance_id == self.instance_id:
+ # for now, we assume it is always the local stub
+ assert stub == VolthaLocalServiceStub
+ method = getattr(self.local_service, method_name)
+ log.debug('dispatching', method=method)
+ res = method(input, context=None)
+ log.debug('dispatch-success', res=res)
+ return res
+
+ else:
+ raise NotImplementedError('cannot handle real dispatch yet')
+
+ def instance_id_by_logical_device_id(self, logical_device_id):
+ log.warning('temp-mapping-logical-device-id')
+        # TODO no true dispatching yet, we blindly map everything to self
+ return self.instance_id
+
+ def instance_id_by_device_id(self, device_id):
+        log.warning('temp-mapping-device-id')
+        # TODO no true dispatching yet, we blindly map everything to self
+ return self.instance_id
+
+
+class VolthaGlobalServiceHandler(VolthaGlobalServiceServicer):
+
+ def __init__(self, dispatcher, instance_id, **init_kw):
+ self.dispatcher = dispatcher
+ self.instance_id = instance_id
+ self.init_kw = init_kw
+ self.root = None
+ self.stopped = False
def start(self):
log.debug('starting')
- pass
+ self.root = ConfigRoot(Voltha(**self.init_kw))
+ registry('grpc_server').register(
+ add_VolthaGlobalServiceServicer_to_server, self)
+ log.info('started')
+ return self
+
+ def stop(self):
+ log.debug('stopping')
+ self.stopped = True
+ log.info('stopped')
+
+ # gRPC service method implementations. BE CAREFUL; THESE ARE CALLED ON
+ # the gRPC threadpool threads.
+
+ @twisted_async
+ def GetVoltha(self, request, context):
+ log.info('grpc-request', request=request)
+ return self.root.get('/', depth=1)
+
+ @twisted_async
+ @inlineCallbacks
+ def ListVolthaInstances(self, request, context):
+ log.info('grpc-request', request=request)
+ items = yield registry('coordinator').get_members()
+ returnValue(VolthaInstances(items=items))
+
+ @twisted_async
+ def GetVolthaInstance(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = request.id
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'GetVolthaInstance',
+ Empty())
+
+ @twisted_async
+ def ListLogicalDevices(self, request, context):
+ log.warning('temp-limited-implementation')
+ # TODO dispatching to local instead of collecting all
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'ListLogicalDevices',
+ Empty())
+
+ @twisted_async
+ def GetLogicalDevice(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_logical_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'GetLogicalDevice',
+ request
+ )
+
+ @twisted_async
+ def ListLogicalDevicePorts(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_logical_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'ListLogicalDevicePorts',
+ request
+ )
+
+ @twisted_async
+ def ListLogicalDeviceFlows(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_logical_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'ListLogicalDeviceFlows',
+ request
+ )
+
+ @twisted_async
+ def UpdateLogicalDeviceFlowTable(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_logical_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'UpdateLogicalDeviceFlowTable',
+ request
+ )
+
+ @twisted_async
+ def ListLogicalDeviceFlowGroups(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_logical_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'ListLogicalDeviceFlowGroups',
+ request
+ )
+
+ @twisted_async
+ def UpdateLogicalDeviceFlowGroupTable(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_logical_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'UpdateLogicalDeviceFlowGroupTable',
+ request
+ )
+
+ @twisted_async
+ def ListDevices(self, request, context):
+ log.warning('temp-limited-implementation')
+ # TODO dispatching to local instead of collecting all
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'ListDevices',
+ Empty())
+
+ @twisted_async
+ def GetDevice(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'GetDevice',
+ request
+ )
+
+ @twisted_async
+ def ListDevicePorts(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'ListDevicePorts',
+ request
+ )
+
+ @twisted_async
+ def ListDeviceFlows(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'ListDeviceFlows',
+ request
+ )
+
+ @twisted_async
+ def ListDeviceFlowGroups(self, request, context):
+ log.info('grpc-request', request=request)
+ instance_id = self.dispatcher.instance_id_by_device_id(
+ request.id
+ )
+ return self.dispatcher.dispatch(
+ instance_id,
+ VolthaLocalServiceStub,
+ 'ListDeviceFlowGroups',
+ request
+ )
+
+ @twisted_async
+ def ListDeviceTypes(self, request, context):
+ log.info('grpc-request', request=request)
+        # we always dispatch this to the local instance, as we assume
+        # all instances have loaded the same adapters and hence support
+        # the same device types
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'ListDeviceTypes',
+ request
+ )
+
+ @twisted_async
+ def GetDeviceType(self, request, context):
+ log.info('grpc-request', request=request)
+        # we always dispatch this to the local instance, as we assume
+        # all instances have loaded the same adapters and hence support
+        # the same device types
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'GetDeviceType',
+ request
+ )
+
+ @twisted_async
+ def ListDeviceGroups(self, request, context):
+ log.warning('temp-limited-implementation')
+ # TODO dispatching to local instead of collecting all
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'ListDeviceGroups',
+ Empty())
+
+ @twisted_async
+ def GetDeviceGroup(self, request, context):
+ log.warning('temp-limited-implementation')
+ # TODO dispatching to local instead of collecting all
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'GetDeviceGroup',
+ request)
+
+
+class VolthaLocalServiceHandler(VolthaLocalServiceServicer):
+
+ def __init__(self, **init_kw):
+ self.init_kw = init_kw
+ self.root = None
+ self.stopped = False
+
+ def start(self):
+ log.debug('starting')
+ self.root = ConfigRoot(VolthaInstance(**self.init_kw))
+ registry('grpc_server').register(
+ add_VolthaLocalServiceServicer_to_server, self)
log.info('started')
return self
@@ -52,16 +364,136 @@
log.info('stopped')
def get_proxy(self, path, exclusive=False):
- return self.config_root.get_proxy(path, exclusive)
-
- def _mk_config_root(self, **kw):
- root_data = Voltha(**kw)
- return ConfigRoot(root_data)
+ return self.root.get_proxy(path, exclusive)
# gRPC service method implementations. BE CAREFUL; THESE ARE CALLED ON
# the gRPC threadpool threads.
@twisted_async
- def GetVoltha(self, request, context):
- log.info('get-voltha', request=request)
- return self.config_root.get('/', deep=1)
+ def GetVolthaInstance(self, request, context):
+ log.info('grpc-request', request=request)
+ return self.root.get('/', depth=1)
+
+ @twisted_async
+ def GetHealth(self, request, context):
+ log.info('grpc-request', request=request)
+ return self.root.get('/health')
+
+ @twisted_async
+ def ListAdapters(self, request, context):
+ log.info('grpc-request', request=request)
+ items = self.root.get('/adapters')
+ return Adapters(items=items)
+
+ @twisted_async
+ def ListLogicalDevices(self, request, context):
+ log.info('grpc-request', request=request)
+ items = self.root.get('/logical_devices')
+ return LogicalDevices(items=items)
+
+ @twisted_async
+ def GetLogicalDevice(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ return self.root.get('/logical_devices/' + request.id)
+
+ @twisted_async
+ def ListLogicalDevicePorts(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ items = self.root.get('/logical_devices/{}/ports'.format(request.id))
+ return LogicalPorts(items=items)
+
+ @twisted_async
+ def ListLogicalDeviceFlows(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ flows = self.root.get('/logical_devices/{}/flows'.format(request.id))
+ return flows
+
+ @twisted_async
+ def UpdateLogicalDeviceFlowTable(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ raise NotImplementedError()
+
+ @twisted_async
+ def ListLogicalDeviceFlowGroups(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ groups = self.root.get(
+ '/logical_devices/{}/flow_groups'.format(request.id))
+ return groups
+
+ @twisted_async
+ def UpdateLogicalDeviceFlowGroupTable(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ raise NotImplementedError()
+
+ @twisted_async
+ def ListDevices(self, request, context):
+ log.info('grpc-request', request=request)
+ items = self.root.get('/devices')
+ return Devices(items=items)
+
+ @twisted_async
+ def GetDevice(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ return self.root.get('/devices/' + request.id)
+
+ @twisted_async
+ def ListDevicePorts(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ items = self.root.get('/devices/{}/ports'.format(request.id))
+ return Ports(items=items)
+
+ @twisted_async
+ def ListDeviceFlows(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ flows = self.root.get('/devices/{}/flows'.format(request.id))
+ return flows
+
+ @twisted_async
+ def ListDeviceFlowGroups(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ groups = self.root.get('/devices/{}/flow_groups'.format(request.id))
+ return groups
+
+ @twisted_async
+ def ListDeviceTypes(self, request, context):
+ log.info('grpc-request', request=request)
+ items = self.root.get('/device_types')
+ return DeviceTypes(items=items)
+
+ @twisted_async
+ def GetDeviceType(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ return self.root.get('/device_types/' + request.id)
+
+ @twisted_async
+ def ListDeviceGroups(self, request, context):
+ log.info('grpc-request', request=request)
+ # TODO is this mapped to tree or taken from coordinator?
+ items = self.root.get('/device_groups')
+ return DeviceGroups(items=items)
+
+ @twisted_async
+ def GetDeviceGroup(self, request, context):
+ log.info('grpc-request', request=request)
+ assert '/' not in request.id
+ # TODO is this mapped to tree or taken from coordinator?
+ return self.root.get('/device_groups/' + request.id)
+
+ @twisted_async
+ def StreamPacketsOut(self, request_iterator, context):
+ raise NotImplementedError()
+
+ @twisted_async
+ def ReceivePacketsIn(self, request, context):
+ raise NotImplementedError()
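
The dispatch pattern above keeps the global service free of device state of
its own: every global RPC resolves the owning instance id and forwards the
request to that instance's VolthaLocalService. Only the local leg exists so
far; a sketch of what the remote leg might eventually look like, assuming a
hypothetical peer_channel() helper that yields a gRPC channel to the peer
instance:

    def dispatch(self, instance_id, stub, method_name, input):
        if instance_id == self.instance_id:
            # local leg, as implemented above
            return getattr(self.local_service, method_name)(
                input, context=None)
        # hypothetical remote leg: invoke the same method on the peer's
        # VolthaLocalService via its generated stub
        channel = self.peer_channel(instance_id)  # hypothetical helper
        return getattr(stub(channel), method_name)(input)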
diff --git a/voltha/northbound/grpc/grpc_server.py b/voltha/northbound/grpc/grpc_server.py
index 27b9a4b..c25429d 100644
--- a/voltha/northbound/grpc/grpc_server.py
+++ b/voltha/northbound/grpc/grpc_server.py
@@ -103,54 +103,7 @@
)
return res
-
-class ExampleService(voltha_pb2.ExampleServiceServicer):
-
- def __init__(self, thread_pool):
- from random import randint
- self.thread_pool = thread_pool
- self.db = dict((id, voltha_pb2.Address(
- id=id,
- street="%d 1st Street" % randint(1, 4000),
- city="Petaluma",
- zip=94954,
- state="CA"
- )) for id in (uuid.uuid5(uuid.NAMESPACE_OID, str(i)).get_hex()
- for i in xrange(1000, 1005)))
-
- def stop(self):
- pass
-
- def GetAddress(self, request, context):
- log.info('get-address', request=request)
- return self.db[request.id]
-
- def ListAddresses(self, request, context):
- log.info('list-addresses', request=request)
- res = voltha_pb2.Addresses(
- addresses=self.db.values()
- )
- return res
-
- def CreateAddress(self, request, context):
- log.info('create-address', request=request)
- id = uuid.uuid4().get_hex()
- request.id = id
- self.db[id] = request
- return request
-
- def DeleteAddress(self, request, context):
- log.info('delete-address', request=request)
- del self.db[request.id]
- return Empty()
-
- def UpdateAddress(self, request, context):
- log.info('update-address', request=request)
- updated = self.db[request.id]
- updated.MergeFrom(request)
- return updated
-
-
+'''
class VolthaLogicalLayer(voltha_pb2.VolthaLogicalLayerServicer):
# TODO still a mock
@@ -235,7 +188,7 @@
"""Must be called on the twisted thread"""
packet_in = voltha_pb2.PacketIn(id=device_id, packet_in=ofp_packet_in)
self.packet_in_queue.put(packet_in)
-
+'''
@implementer(IComponent)
class VolthaGrpcServer(object):
@@ -254,8 +207,6 @@
for activator_func, service_class in (
(schema_pb2.add_SchemaServiceServicer_to_server, SchemaService),
(voltha_pb2.add_HealthServiceServicer_to_server, HealthService),
- (voltha_pb2.add_ExampleServiceServicer_to_server, ExampleService),
- (voltha_pb2.add_VolthaLogicalLayerServicer_to_server, VolthaLogicalLayer)
):
service = service_class(self.thread_pool)
self.register(activator_func, service)
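
The same register() hook is how the new core handlers attach themselves (see
VolthaGlobalServiceHandler.start and VolthaLocalServiceHandler.start above).
A minimal sketch of registering a custom servicer from any component,
assuming a generated my_service_pb2 module and a MyServiceHandler servicer
class (both hypothetical):

    from voltha.registry import registry

    # typically called from the component's start() method
    registry('grpc_server').register(
        my_service_pb2.add_MyServiceServicer_to_server,  # generated activator
        MyServiceHandler())                              # servicer instance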
diff --git a/voltha/protos/adapter.proto b/voltha/protos/adapter.proto
index 8791ee9..2e16f96 100644
--- a/voltha/protos/adapter.proto
+++ b/voltha/protos/adapter.proto
@@ -2,8 +2,6 @@
package voltha;
-import "google/api/annotations.proto";
-import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "common.proto";
import "meta.proto";
@@ -41,66 +39,3 @@
message Adapters {
repeated Adapter items = 1;
}
-
-// A Device Type
-message DeviceType {
-
- // Unique name for the device type
- string id = 1;
-
- // Name of the adapter that handles device type
- string adapter = 2;
-
- // TODO
- // ...
-
-}
-
-// A plurality of device types
-message DeviceTypes {
- repeated DeviceType items = 1;
-}
-
-// A Physical Device instance
-message Device {
-
- // Voltha's device identifier
- string id = 1;
-
- // Device type, refers to one of the registered device types
- string type = 2;
-
- // Is this device a root device. Each logical switch has one root
- // device that is associated with the logical flow switch.
- bool root = 3;
-
- // Parent device id, in the device tree
- string parent_id = 4;
-
- // Vendor, version, serial number, etc.
- string vendor = 5;
- string model = 6;
- string hardware_version = 7;
- string firmware_version = 8;
- string software_version = 9;
- string serial_number = 10;
-
- // Addapter that takes care of device
- string adapter = 11;
-
- // TODO additional common attribute here
- // ...
-
- // Device type specific attributes
- google.protobuf.Any custom = 64;
-
-}
-
-service AdapterService {
-
- rpc ListAdapters(google.protobuf.Empty) returns(Adapters) {
- option (google.api.http) = {
- get: "/local/adapters"
- };
- }
-}
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
new file mode 100644
index 0000000..2aeae24
--- /dev/null
+++ b/voltha/protos/device.proto
@@ -0,0 +1,80 @@
+syntax = "proto3";
+
+package voltha;
+
+import "meta.proto";
+import "google/protobuf/any.proto";
+import "openflow_13.proto";
+
+// A Device Type
+message DeviceType {
+
+ // Unique name for the device type
+ string id = 1;
+
+ // Name of the adapter that handles device type
+ string adapter = 2;
+
+ // TODO
+ // ...
+
+}
+
+// A plurality of device types
+message DeviceTypes {
+ repeated DeviceType items = 1;
+}
+
+message Port {
+ string id = 1;
+ // TODO
+}
+
+message Ports {
+ repeated Port items = 1;
+}
+
+// A Physical Device instance
+message Device {
+
+ // Voltha's device identifier
+ string id = 1;
+
+ // Device type, refers to one of the registered device types
+ string type = 2;
+
+  // Whether this device is a root device. Each logical switch has one
+  // root device that is associated with the logical flow switch.
+ bool root = 3;
+
+ // Parent device id, in the device tree
+ string parent_id = 4;
+
+ // Vendor, version, serial number, etc.
+ string vendor = 5;
+ string model = 6;
+ string hardware_version = 7;
+ string firmware_version = 8;
+ string software_version = 9;
+ string serial_number = 10;
+
+  // Adapter that takes care of the device
+ string adapter = 11;
+
+  // TODO additional common attributes here
+ // ...
+
+ // Device type specific attributes
+ google.protobuf.Any custom = 64;
+
+ repeated Port ports = 128 [(child_node) = {key: "id"}];
+ openflow_13.Flows flows = 129 [(child_node) = {}];
+// repeated openflow_13.ofp_flow_stats flows = 129;
+ openflow_13.FlowGroups flow_groups = 130 [(child_node) = {}];
+// repeated openflow_13.ofp_group_entry flow_groups = 130;
+
+}
+
+message Devices {
+ repeated Device items = 1;
+}
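
The (child_node) annotations are what expose these fields in the config
tree: a keyed repeated field becomes a map-like child node addressable by
that key, while an un-keyed message field becomes a singleton child. A
sketch of the resulting access paths through a config root (olt-1 and
port-1 are made-up ids; path segments follow the field names):

    device = root.get('/devices/olt-1')              # keyed child of root
    port = root.get('/devices/olt-1/ports/port-1')   # keyed by Port.id
    flows = root.get('/devices/olt-1/flows')         # un-keyed singleton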
diff --git a/voltha/protos/example_service.proto b/voltha/protos/example_service.proto
deleted file mode 100644
index aac8efc..0000000
--- a/voltha/protos/example_service.proto
+++ /dev/null
@@ -1,77 +0,0 @@
-//
-// This is a temporary service to illustrate the generic approach to defining
-// Voltha's top level service APIs usign gRPC
-//
-// To add a new service, the following steps shall be followed
-//
-// Step 1: Define a proto file like this
-// Step 2: Include proto file in voltha.proto as public
-// Step 3: Implement the backend in grpc_server.py
-//
-
-syntax = "proto3";
-
-package voltha;
-
-import "common.proto";
-
-import "google/api/annotations.proto";
-import "google/protobuf/empty.proto";
-
-
-// (placeholder) Address as example message
-message Address {
- string id = 7; // ID of address record
- string street = 1; // Street address
- string street2 = 2; // Apartment, suite, building, etc.
- string street3 = 3; // Apartment, suite, building, etc.
- string city = 4; // City
- string state = 5; // State
- uint32 zip = 6; // Zip code
-}
-
-message Addresses {
- repeated Address addresses = 1;
-}
-
-// (placeholder) This is an example service
-service ExampleService {
-
- // Create an address record
- rpc CreateAddress(Address) returns (Address) {
- option (google.api.http) = {
- post: "/addresses"
- body: "*"
- };
- }
-
- // Return an address by ID
- rpc GetAddress(ID) returns (Address) {
- option (google.api.http) = {
- get: "/addresses/{id}"
- };
- }
-
- // Update an existing address record by ID
- rpc UpdateAddress(Address) returns (Address) {
- option (google.api.http) = {
- patch: "/addresses/{id}"
- body: "*"
- };
- }
-
- // Delete an address record by ID
- rpc DeleteAddress(ID) returns (google.protobuf.Empty) {
- option (google.api.http) = {
- delete: "/addresses/{id}"
- };
- }
-
- // Return a bit more complex objects
- rpc ListAddresses(google.protobuf.Empty) returns (Addresses) {
- option (google.api.http) = {
- get: "/addresses"
- };
- }
-
-}
diff --git a/voltha/protos/health.proto b/voltha/protos/health.proto
index 1dff496..19383a9 100644
--- a/voltha/protos/health.proto
+++ b/voltha/protos/health.proto
@@ -22,11 +22,13 @@
// Health related services
service HealthService {
+
// Return current health status of a Voltha instance
rpc GetHealthStatus(google.protobuf.Empty) returns (HealthStatus) {
option (google.api.http) = {
get: "/health"
};
}
+
}
diff --git a/voltha/protos/logical_device.proto b/voltha/protos/logical_device.proto
new file mode 100644
index 0000000..49d555a
--- /dev/null
+++ b/voltha/protos/logical_device.proto
@@ -0,0 +1,36 @@
+syntax = "proto3";
+
+package voltha;
+
+import "meta.proto";
+import "google/api/annotations.proto";
+import "openflow_13.proto";
+
+message LogicalDevice {
+ string id = 1;
+ uint64 datapath_id = 2;
+
+ openflow_13.ofp_desc desc = 3;
+
+ repeated openflow_13.ofp_port ports = 4 [(child_node) = {key: "port_no"}];
+ openflow_13.Flows flows = 5 [(child_node) = {}];
+// repeated openflow_13.ofp_flow_stats flows = 129;
+ openflow_13.FlowGroups flow_groups = 6 [(child_node) = {}];
+// repeated openflow_13.ofp_group_entry flow_groups = 130;
+
+}
+
+message LogicalDevices {
+ repeated LogicalDevice items = 1;
+}
+
+message LogicalPorts {
+ repeated openflow_13.ofp_port items = 1;
+}
+
+message LogicalDeviceDetails {
+ string id = 1;
+ uint64 datapath_id = 2;
+ openflow_13.ofp_desc desc = 3;
+ openflow_13.ofp_switch_features switch_features = 4;
+}
diff --git a/voltha/protos/logical_layer.proto b/voltha/protos/logical_layer.proto
deleted file mode 100644
index f86a39e..0000000
--- a/voltha/protos/logical_layer.proto
+++ /dev/null
@@ -1,128 +0,0 @@
-syntax = "proto3";
-
-package voltha;
-
-import "meta.proto";
-import "google/api/annotations.proto";
-import "google/protobuf/empty.proto";
-import "common.proto";
-import "openflow_13.proto";
-
-message LogicalDevice {
- string id = 1;
- uint64 datapath_id = 2;
-
- openflow_13.ofp_desc desc = 3;
-
- repeated openflow_13.ofp_port ports = 4 [(child_node) = {key:"port_no"}];
- repeated openflow_13.ofp_flow_stats flows = 5;
- repeated openflow_13.ofp_group_entry flow_groups = 6;
-}
-
-message LogicalDevices {
- repeated LogicalDevice items = 1;
-}
-
-message LogicalPorts {
- repeated openflow_13.ofp_port items = 1;
-}
-
-message LogicalDeviceDetails {
- string id = 1;
- uint64 datapath_id = 2;
- openflow_13.ofp_desc desc = 3;
- openflow_13.ofp_switch_features switch_features = 4;
-}
-
-message FlowTableUpdate {
- string id = 1; // device id
- openflow_13.ofp_flow_mod flow_mod = 2;
-}
-
-message GroupTableUpdate {
- string id = 1; // device id
- openflow_13.ofp_group_mod group_mod = 2;
-}
-
-message Flows {
- repeated openflow_13.ofp_flow_stats items = 1;
-}
-
-message FlowGroups {
- repeated openflow_13.ofp_group_entry items = 1;
-}
-
-message PacketIn {
- string id = 1; // device id
- openflow_13.ofp_packet_in packet_in = 2;
-}
-
-message PacketOut {
- string id = 1; // device id
- openflow_13.ofp_packet_out packet_out = 2;
-}
-
-service VolthaLogicalLayer {
-
- // List logical devices owned by this Voltha instance
- rpc ListLogicalDevices(google.protobuf.Empty) returns(LogicalDevices) {
- option (google.api.http) = {
- get: "/local/devices"
- };
- }
-
- // Get detailed info on logical device owned by this Voltha instance
- rpc GetLogicalDevice(ID) returns(LogicalDeviceDetails) {
- option (google.api.http) = {
- get: "/local/devices/{id}"
- };
- }
-
- // List ports of a logical device
- rpc ListLogicalDevicePorts(ID) returns(LogicalPorts) {
- option (google.api.http) = {
- get: "/local/devices/{id}/ports"
- };
- }
-
- // Update flow table for device
- rpc UpdateFlowTable(FlowTableUpdate) returns(google.protobuf.Empty) {
- option (google.api.http) = {
- post: "/local/devices/{id}/flows"
- body: "*"
- };
- }
-
- // List all flows of a logical device
- rpc ListDeviceFlows(ID) returns(Flows) {
- option (google.api.http) = {
- get: "/local/devices/{id}/flows"
- };
- }
-
- // Update group tabel for device
- rpc UpdateGroupTable(GroupTableUpdate) returns(google.protobuf.Empty) {
- option (google.api.http) = {
- post: "/local/devices/{id}/groups"
- body: "*"
- };
- }
-
- // List all flow groups of a logical device
- rpc ListDeviceFlowGroups(ID) returns(FlowGroups) {
- option (google.api.http) = {
- get: "/local/devices/{id}/groups"
- };
- }
-
- // Stream control packets to the dataplane
- rpc StreamPacketsOut(stream PacketOut) returns(google.protobuf.Empty) {
- // This does not have an HTTP representation
- }
-
- // Receive control packet stream
- rpc ReceivePacketsIn(google.protobuf.Empty) returns(stream PacketIn) {
- // This does not have an HTTP representation
- }
-
-}
diff --git a/voltha/protos/meta.proto b/voltha/protos/meta.proto
index 755a087..d16668a 100644
--- a/voltha/protos/meta.proto
+++ b/voltha/protos/meta.proto
@@ -19,7 +19,6 @@
package voltha;
-import "google/api/http.proto";
import "google/protobuf/descriptor.proto";
message ChildNode {
diff --git a/voltha/protos/openflow_13.proto b/voltha/protos/openflow_13.proto
index 355cfcd..e110147 100644
--- a/voltha/protos/openflow_13.proto
+++ b/voltha/protos/openflow_13.proto
@@ -2241,3 +2241,34 @@
repeated uint32 port_status_mask = 2; /* Bitmasks of OFPPR_* values. */
repeated uint32 flow_removed_mask = 3;/* Bitmasks of OFPRR_* values. */
};
+
+
+/* ADDITIONAL VOLTHA SPECIFIC MESSAGE TYPES, AIDING RPC CALLS */
+
+message FlowTableUpdate {
+ string id = 1; // Device.id or LogicalDevice.id
+ ofp_flow_mod flow_mod = 2;
+}
+
+message FlowGroupTableUpdate {
+ string id = 1; // Device.id or LogicalDevice.id
+ ofp_group_mod group_mod = 2;
+}
+
+message Flows {
+ repeated ofp_flow_stats items = 1;
+}
+
+message FlowGroups {
+ repeated ofp_group_entry items = 1;
+}
+
+message PacketIn {
+ string id = 1; // LogicalDevice.id
+ ofp_packet_in packet_in = 2;
+}
+
+message PacketOut {
+ string id = 1; // LogicalDevice.id
+ ofp_packet_out packet_out = 2;
+}
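
These wrapper messages exist so a single RPC request can carry both the
target id and the OpenFlow payload. A sketch of building a FlowTableUpdate
from Python, assuming the generated openflow_13_pb2 module and the standard
OpenFlow 1.3 ofp_flow_mod fields (ld-1 is a made-up LogicalDevice.id):

    from voltha.protos import openflow_13_pb2 as ofp

    update = ofp.FlowTableUpdate(
        id='ld-1',
        flow_mod=ofp.ofp_flow_mod(
            command=ofp.OFPFC_ADD,  # add a new flow entry
            priority=1000))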
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index 163734c..2490974 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -14,33 +14,359 @@
import public "meta.proto";
import public "common.proto";
import public "health.proto";
-import public "logical_layer.proto";
+import public "logical_device.proto";
+import public "device.proto";
import public "adapter.proto";
-import public "example_service.proto";
+import public "openflow_13.proto";
option java_package = "org.opencord.voltha";
option java_outer_classname = "VolthaProtos";
option csharp_namespace = "Opencord.Voltha.Voltha";
-// Top-level (root) config node for Voltha
-message Voltha {
+message DeviceGroup {
+
+ string id = 1 [(access) = READ_ONLY];
+
+ repeated LogicalDevice logical_devices = 2 [(child_node) = {key: "id"}];
+
+ repeated Device devices = 3 [(child_node) = {key: "id"}];
+}
+
+message DeviceGroups {
+ repeated DeviceGroup items = 1;
+}
+
+// Top-level (root) node for a Voltha Instance
+message VolthaInstance {
string instance_id = 1 [(access) = READ_ONLY];
+
string version = 2 [(access) = READ_ONLY];
+
LogLevel log_level = 3;
HealthStatus health = 10 [(child_node) = {}];
- repeated Adapter adapters = 11 [(child_node) = { key: "id" }];
- repeated LogicalDevice logical_devices = 12 [(child_node) = {key:"id"}];
+
+    repeated Adapter adapters = 11 [(child_node) = {key: "id"}];
+
+ repeated LogicalDevice logical_devices = 12 [(child_node) = {key: "id"}];
+
+ repeated Device devices = 13 [(child_node) = {key: "id"}];
+
+ repeated DeviceType device_types = 14 [(child_node) = {key: "id"}];
+
+ repeated DeviceGroup device_groups = 15 [(child_node) = {key: "id"}];
+}
+
+message VolthaInstances {
+ repeated string items = 1;
+}
+
+// Top-level (root) node representing the entire Voltha cluster
+message Voltha {
+
+ string version = 1 [(access) = READ_ONLY];
+
+ LogLevel log_level = 2;
+
+ repeated VolthaInstance instances = 3 [(child_node) = {key: "instance_id"}];
+
+ repeated Adapter adapters = 11 [(child_node) = {key: "id"}];
+
+ repeated LogicalDevice logical_devices = 12 [(child_node) = {key: "id"}];
+
+ repeated Device devices = 13 [(child_node) = {key: "id"}];
+
+ repeated DeviceGroup device_groups = 15 [(child_node) = {key: "id"}];
}
-service VolthaService {
+
+/*
+ * Cluster-wide Voltha APIs
+ *
+ * These APIs are potentially dispatched to the leader of the Voltha cluster,
+ * or to the specific Voltha instance that owns the given device or logical
+ * device.
+ *
+ */
+service VolthaGlobalService {
rpc GetVoltha(google.protobuf.Empty) returns(Voltha) {
option (google.api.http) = {
- get: "/voltha"
+ get: "/api/v1"
};
}
+ rpc ListVolthaInstances(google.protobuf.Empty) returns(VolthaInstances) {
+ option (google.api.http) = {
+ get: "/api/v1/instances"
+ };
+ }
+
+ rpc GetVolthaInstance(ID) returns(VolthaInstance) {
+ option (google.api.http) = {
+ get: "/api/v1/instances/{id}"
+ };
+ }
+
+ rpc ListLogicalDevices(google.protobuf.Empty) returns(LogicalDevices) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices"
+ };
+ }
+
+ rpc GetLogicalDevice(ID) returns(LogicalDevice) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices/{id}"
+ };
+ }
+
+ // List ports of a logical device
+ rpc ListLogicalDevicePorts(ID) returns(LogicalPorts) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices/{id}/ports"
+ };
+ }
+
+ // List all flows of a logical device
+ rpc ListLogicalDeviceFlows(ID) returns(openflow_13.Flows) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices/{id}/flows"
+ };
+ }
+
+  // Update the flow table of a logical device
+ rpc UpdateLogicalDeviceFlowTable(openflow_13.FlowTableUpdate)
+ returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/logical_devices/{id}/flows"
+ body: "*"
+ };
+ }
+
+ // List all flow groups of a logical device
+ rpc ListLogicalDeviceFlowGroups(ID) returns(openflow_13.FlowGroups) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices/{id}/flow_groups"
+ };
+ }
+
+  // Update the group table of a logical device
+ rpc UpdateLogicalDeviceFlowGroupTable(openflow_13.FlowGroupTableUpdate)
+ returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/logical_devices/{id}/flow_groups"
+ body: "*"
+ };
+ }
+
+ rpc ListDevices(google.protobuf.Empty) returns(Devices) {
+ option (google.api.http) = {
+ get: "/api/v1/devices"
+ };
+ }
+
+ rpc GetDevice(ID) returns(Device) {
+ option (google.api.http) = {
+ get: "/api/v1/devices/{id}"
+ };
+ }
+
+  // List ports of a device
+ rpc ListDevicePorts(ID) returns(Ports) {
+ option (google.api.http) = {
+ get: "/api/v1/devices/{id}/ports"
+ };
+ }
+
+  // List all flows of a device
+ rpc ListDeviceFlows(ID) returns(openflow_13.Flows) {
+ option (google.api.http) = {
+ get: "/api/v1/devices/{id}/flows"
+ };
+ }
+
+  // List all flow groups of a device
+ rpc ListDeviceFlowGroups(ID) returns(openflow_13.FlowGroups) {
+ option (google.api.http) = {
+ get: "/api/v1/devices/{id}/flow_groups"
+ };
+ }
+
+ rpc ListDeviceTypes(google.protobuf.Empty) returns(DeviceTypes) {
+ option (google.api.http) = {
+ get: "/api/v1/device_types"
+ };
+ }
+
+ rpc GetDeviceType(ID) returns(DeviceType) {
+ option (google.api.http) = {
+ get: "/api/v1/device_types/{id}"
+ };
+ }
+
+ rpc ListDeviceGroups(google.protobuf.Empty) returns(DeviceGroups) {
+ option (google.api.http) = {
+ get: "/api/v1/device_groups"
+ };
+ }
+
+ rpc GetDeviceGroup(ID) returns(DeviceGroup) {
+ option (google.api.http) = {
+ get: "/api/v1/device_groups/{id}"
+ };
+ }
+
+ // TODO other top-level APIs to be added here
+
+}
+
+/*
+ * Per-instance APIs
+ *
+ * These APIs are always served locally by the Voltha instance on which the
+ * call is made.
+ */
+service VolthaLocalService {
+
+ rpc GetVolthaInstance(google.protobuf.Empty) returns(VolthaInstance) {
+ option (google.api.http) = {
+ get: "/api/v1/local"
+ };
+ }
+
+ rpc GetHealth(google.protobuf.Empty) returns(HealthStatus) {
+ option (google.api.http) = {
+ get: "/api/v1/local/health"
+ };
+ }
+
+ rpc ListAdapters(google.protobuf.Empty) returns(Adapters) {
+ option (google.api.http) = {
+ get: "/api/v1/local/adapters"
+ };
+ }
+
+ rpc ListLogicalDevices(google.protobuf.Empty) returns(LogicalDevices) {
+ option (google.api.http) = {
+ get: "/api/v1/local/logical_devices"
+ };
+ }
+
+ rpc GetLogicalDevice(ID) returns(LogicalDevice) {
+ option (google.api.http) = {
+ get: "/api/v1/local/logical_devices/{id}"
+ };
+ }
+
+ // List ports of a logical device
+ rpc ListLogicalDevicePorts(ID) returns(LogicalPorts) {
+ option (google.api.http) = {
+ get: "/api/v1/local/logical_devices/{id}/ports"
+ };
+ }
+
+ // List all flows of a logical device
+ rpc ListLogicalDeviceFlows(ID) returns(openflow_13.Flows) {
+ option (google.api.http) = {
+ get: "/api/v1/local/logical_devices/{id}/flows"
+ };
+ }
+
+  // Update the flow table of a logical device
+ rpc UpdateLogicalDeviceFlowTable(openflow_13.FlowTableUpdate)
+ returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/local/logical_devices/{id}/flows"
+ body: "*"
+ };
+ }
+
+ // List all flow groups of a logical device
+ rpc ListLogicalDeviceFlowGroups(ID) returns(openflow_13.FlowGroups) {
+ option (google.api.http) = {
+ get: "/api/v1/local/logical_devices/{id}/flow_groups"
+ };
+ }
+
+  // Update the group table of a logical device
+ rpc UpdateLogicalDeviceFlowGroupTable(openflow_13.FlowGroupTableUpdate)
+ returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/local/logical_devices/{id}/flow_groups"
+ body: "*"
+ };
+ }
+
+ rpc ListDevices(google.protobuf.Empty) returns(Devices) {
+ option (google.api.http) = {
+ get: "/api/v1/local/devices"
+ };
+ }
+
+ rpc GetDevice(ID) returns(Device) {
+ option (google.api.http) = {
+ get: "/api/v1/local/devices/{id}"
+ };
+ }
+
+  // List ports of a device
+ rpc ListDevicePorts(ID) returns(Ports) {
+ option (google.api.http) = {
+ get: "/api/v1/local/devices/{id}/ports"
+ };
+ }
+
+  // List all flows of a device
+ rpc ListDeviceFlows(ID) returns(openflow_13.Flows) {
+ option (google.api.http) = {
+ get: "/api/v1/local/devices/{id}/flows"
+ };
+ }
+
+  // List all flow groups of a device
+ rpc ListDeviceFlowGroups(ID) returns(openflow_13.FlowGroups) {
+ option (google.api.http) = {
+ get: "/api/v1/local/devices/{id}/flow_groups"
+ };
+ }
+
+ rpc ListDeviceTypes(google.protobuf.Empty) returns(DeviceTypes) {
+ option (google.api.http) = {
+ get: "/api/v1/local/device_types"
+ };
+ }
+
+ rpc GetDeviceType(ID) returns(DeviceType) {
+ option (google.api.http) = {
+ get: "/api/v1/local/device_types/{id}"
+ };
+ }
+
+ rpc ListDeviceGroups(google.protobuf.Empty) returns(DeviceGroups) {
+ option (google.api.http) = {
+ get: "/api/v1/local/device_groups"
+ };
+ }
+
+ rpc GetDeviceGroup(ID) returns(DeviceGroup) {
+ option (google.api.http) = {
+ get: "/api/v1/local/device_groups/{id}"
+ };
+ }
+
+ // Stream control packets to the dataplane
+ rpc StreamPacketsOut(stream openflow_13.PacketOut)
+ returns(google.protobuf.Empty) {
+ // This does not have an HTTP representation
+ }
+
+ // Receive control packet stream
+ rpc ReceivePacketsIn(google.protobuf.Empty)
+ returns(stream openflow_13.PacketIn) {
+ // This does not have an HTTP representation
+ }
+
+ // TODO other local APIs to be added here
+
}
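
Because every RPC above carries a google.api.http option, the same API is
reachable over REST through the HTTP-to-gRPC gateway. A sketch using the
requests library, assuming a hypothetical gateway listening on
localhost:8881 (ld-1 is a made-up logical device id):

    import requests

    base = 'http://localhost:8881'
    print requests.get(base + '/api/v1').json()        # GetVoltha
    print requests.get(base + '/api/v1/local').json()  # GetVolthaInstance
    # ListLogicalDeviceFlows for a made-up logical device id
    print requests.get(
        base + '/api/v1/logical_devices/ld-1/flows').json()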