[VOL-2833] Reporting total number of instances and current adapter instance during Adapter Registration (needs VOL-2834)
Change-Id: I550696439a167e99c705850bd7fdb23bee8f9069
diff --git a/Makefile b/Makefile
index 5ddfc8f..196da74 100644
--- a/Makefile
+++ b/Makefile
@@ -32,6 +32,14 @@
# ignore these directories
.PHONY: test dist
+local-protos:
+ mkdir -p local_imports
+ifdef LOCAL_PROTOS
+ mkdir -p local_imports/voltha-protos/dist
+ rm -f local_imports/voltha-protos/dist/*.tar.gz
+ cp ${LOCAL_PROTOS}/dist/*.tar.gz local_imports/voltha-protos/dist/
+endif
+
dist:
@ echo "Creating python source distribution"
rm -rf dist/
@@ -43,10 +51,14 @@
VENVDIR := venv-pyvoltha
-venv:
+venv: local-protos
virtualenv --python=python3.6 ${VENVDIR};\
source ./${VENVDIR}/bin/activate ; set -u ;\
pip install -r requirements.txt
+ifdef LOCAL_PROTOS
+ source ./${VENVDIR}/bin/activate ; set -u ;\
+ pip install local_imports/voltha-protos/dist/*.tar.gz
+endif
test:
@ echo "Executing unit tests w/tox"
diff --git a/VERSION b/VERSION
index 197c4d5..005119b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.0
+2.4.1
diff --git a/pyvoltha/adapters/kafka/core_proxy.py b/pyvoltha/adapters/kafka/core_proxy.py
index 8244ad3..5c6ab33 100644
--- a/pyvoltha/adapters/kafka/core_proxy.py
+++ b/pyvoltha/adapters/kafka/core_proxy.py
@@ -68,6 +68,24 @@
@inlineCallbacks
def register(self, adapter, deviceTypes):
log.debug("register")
+
+ if adapter.totalReplicas == 0 and adapter.currentReplica != 0:
+ raise Exception("totalReplicas can't be 0, since you're here you have at least one")
+
+ if adapter.currentReplica == 0 and adapter.totalReplicas != 0:
+ raise Exception("currentReplica can't be 0, it has to start from 1")
+
+ if adapter.currentReplica == 0 and adapter.totalReplicas == 0:
+ # if the adapter is not setting these fields they default to 0,
+ # in that case it means the adapter is not ready to be scaled
+ # and thus it defaults to a single instance
+ adapter.currentReplica = 1
+ adapter.totalReplicas = 1
+
+ if adapter.currentReplica > adapter.totalReplicas:
+ raise Exception("currentReplica (%d) can't be greater than totalReplicas (%d)"
+ % (adapter.currentReplica, adapter.totalReplicas))
+
try:
res = yield self.invoke(rpc="Register",
adapter=adapter,
diff --git a/pyvoltha/adapters/kafka/kafka_inter_container_library.py b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
index b896aa4..9e9a6be 100644
--- a/pyvoltha/adapters/kafka/kafka_inter_container_library.py
+++ b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
@@ -557,10 +557,10 @@
self.transaction_id_deferred_map[
self._to_string(request.header.id)] = wait_for_result
log.debug("message-send", transaction_id=transaction_id, to_topic=to_topic,
- from_topic=reply_topic)
+ from_topic=reply_topic, rpc=rpc)
yield self._send_kafka_message(to_topic, request)
log.debug("message-sent", transaction_id=transaction_id, to_topic=to_topic,
- from_topic=reply_topic)
+ from_topic=reply_topic, rpc=rpc)
if response_required:
res = yield wait_for_result
@@ -570,7 +570,7 @@
if res is not None:
if res.success:
- log.debug("send-message-response", rpc=rpc)
+ log.debug("send-message-response", transaction_id=transaction_id, rpc=rpc)
if callback:
callback((res.success, res.result))
else:
@@ -578,7 +578,7 @@
else:
# this is the case where the core API returns a grpc code.NotFound. Return or callback
# so the caller can act appropriately (i.e add whatever was not found)
- log.warn("send-message-response-error-result", kafka_request=request, kafka_result=res)
+ log.warn("send-message-response-error-result", transaction_id=transaction_id, rpc=rpc, kafka_request=request, kafka_result=res)
if callback:
callback((res.success, None))
else:
diff --git a/requirements.txt b/requirements.txt
index 259f140..b9243fb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,4 +18,4 @@
structlog==19.2.0
transitions==0.6.4
txaioetcd==0.3.0
-voltha-protos==3.2.8
+voltha-protos==3.3.0
diff --git a/test/unit/kafka/core_proxy_test.py b/test/unit/kafka/core_proxy_test.py
new file mode 100644
index 0000000..10fe1e5
--- /dev/null
+++ b/test/unit/kafka/core_proxy_test.py
@@ -0,0 +1,150 @@
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import sys
+from unittest import TestCase, main
+from unittest.mock import patch
+from twisted.internet import defer
+from voltha_protos.adapter_pb2 import Adapter
+from voltha_protos.device_pb2 import DeviceType
+
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../../../")))
+
+def mock_decorator(f):
+ def real_wrapper(func):
+ return func
+ return real_wrapper
+
+
+patch('pyvoltha.adapters.kafka.container_proxy.ContainerProxy.wrap_request', mock_decorator).start()
+from pyvoltha.adapters.kafka.core_proxy import CoreProxy
+
+
+class TestCoreProxy(TestCase):
+
+ def setUp(self):
+ self.core_proxy = CoreProxy(
+ kafka_proxy=None,
+ default_core_topic='test_core',
+ default_event_topic='test.events',
+ my_listening_topic='test_openonu')
+
+ self.supported_device_types = [
+ DeviceType(
+ id="brmc_openonu",
+ vendor_ids=['BBSM'],
+ adapter="openonu",
+ accepts_bulk_flow_update=False,
+ accepts_add_remove_flow_updates=True
+ )
+ ]
+
+ @defer.inlineCallbacks
+ def test_register_defaults(self):
+ adapter = Adapter(
+ id="testAdapter",
+ vendor="ONF",
+ version="test",
+ )
+
+ expected_adapter = Adapter(
+ id="testAdapter",
+ vendor="ONF",
+ version="test",
+ currentReplica=1,
+ totalReplicas=1
+ )
+
+ with patch.object(self.core_proxy, "invoke") as mock_invoke:
+
+ mock_invoke.return_value = "success"
+
+ res = yield self.core_proxy.register(adapter, self.supported_device_types)
+ mock_invoke.assert_called_with(
+ rpc="Register",
+ adapter=expected_adapter,
+ deviceTypes=self.supported_device_types
+ )
+ self.assertTrue(mock_invoke.call_count, 1)
+ self.assertEqual(res, "success")
+
+ @defer.inlineCallbacks
+ def test_register_multiple(self):
+
+ adapter = Adapter(
+ id="testAdapter",
+ vendor="ONF",
+ version="test",
+ currentReplica=4,
+ totalReplicas=8
+ )
+
+ with patch.object(self.core_proxy, "invoke") as mock_invoke:
+ mock_invoke.return_value = "success"
+
+ res = yield self.core_proxy.register(adapter, self.supported_device_types)
+ mock_invoke.assert_called_with(
+ rpc="Register",
+ adapter=adapter,
+ deviceTypes=self.supported_device_types
+ )
+
+ @defer.inlineCallbacks
+ def test_register_misconfigured(self):
+ """
+ In case the operator sets wrong parameter, eg: currentReplica=10, totalReplicas=2
+ raise an exception
+ """
+ adapter = Adapter(
+ id="testAdapter",
+ vendor="ONF",
+ version="test",
+ currentReplica=10,
+ totalReplicas=8
+ )
+
+ with self.assertRaises(Exception) as e:
+ res = yield self.core_proxy.register(adapter, self.supported_device_types)
+
+ self.assertEqual(str(e.exception), "currentReplica (10) can't be greater than totalReplicas (8)")
+
+ adapter = Adapter(
+ id="testAdapter",
+ vendor="ONF",
+ version="test",
+ totalReplicas=0,
+ currentReplica=1
+ )
+
+ with self.assertRaises(Exception) as e:
+ res = yield self.core_proxy.register(adapter, self.supported_device_types)
+
+ self.assertEqual(str(e.exception), "totalReplicas can't be 0, since you're here you have at least one")
+
+ adapter = Adapter(
+ id="testAdapter",
+ vendor="ONF",
+ version="test",
+ totalReplicas=1,
+ currentReplica=0
+ )
+
+ with self.assertRaises(Exception) as e:
+ res = yield self.core_proxy.register(adapter, self.supported_device_types)
+
+ self.assertEqual(str(e.exception), "currentReplica can't be 0, it has to start from 1")
+
+
+if __name__ == '__main__':
+ main()