[VOL-2833]  Reporting total number of instances and current adapter instance during Adapter Registration (needs VOL-2834)

Change-Id: I550696439a167e99c705850bd7fdb23bee8f9069
diff --git a/Makefile b/Makefile
index 5ddfc8f..196da74 100644
--- a/Makefile
+++ b/Makefile
@@ -32,6 +32,14 @@
 # ignore these directories
 .PHONY: test dist
 
+local-protos:
+	mkdir -p local_imports
+ifdef LOCAL_PROTOS
+	mkdir -p local_imports/voltha-protos/dist
+	rm -f local_imports/voltha-protos/dist/*.tar.gz
+	cp ${LOCAL_PROTOS}/dist/*.tar.gz local_imports/voltha-protos/dist/
+endif
+
 dist:
 	@ echo "Creating python source distribution"
 	rm -rf dist/
@@ -43,10 +51,14 @@
 
 VENVDIR := venv-pyvoltha
 
-venv:
+venv: local-protos
 	virtualenv --python=python3.6 ${VENVDIR};\
     source ./${VENVDIR}/bin/activate ; set -u ;\
     pip install -r requirements.txt
+ifdef LOCAL_PROTOS
+	source ./${VENVDIR}/bin/activate ; set -u ;\
+	pip install local_imports/voltha-protos/dist/*.tar.gz
+endif
 
 test:
 	@ echo "Executing unit tests w/tox"
diff --git a/VERSION b/VERSION
index 197c4d5..005119b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.0
+2.4.1
diff --git a/pyvoltha/adapters/kafka/core_proxy.py b/pyvoltha/adapters/kafka/core_proxy.py
index 8244ad3..5c6ab33 100644
--- a/pyvoltha/adapters/kafka/core_proxy.py
+++ b/pyvoltha/adapters/kafka/core_proxy.py
@@ -68,6 +68,24 @@
     @inlineCallbacks
     def register(self, adapter, deviceTypes):
         log.debug("register")
+
+        if adapter.totalReplicas == 0 and adapter.currentReplica != 0:
+            raise Exception("totalReplicas can't be 0, since you're here you have at least one")
+
+        if adapter.currentReplica == 0 and adapter.totalReplicas != 0:
+            raise Exception("currentReplica can't be 0, it has to start from 1")
+
+        if adapter.currentReplica == 0 and adapter.totalReplicas == 0:
+            # if the adapter is not setting these fields they default to 0,
+            # in that case it means the adapter is not ready to be scaled
+            # and thus it defaults to a single instance
+            adapter.currentReplica = 1
+            adapter.totalReplicas = 1
+
+        if adapter.currentReplica > adapter.totalReplicas:
+            raise Exception("currentReplica (%d) can't be greater than totalReplicas (%d)"
+                          % (adapter.currentReplica, adapter.totalReplicas))
+
         try:
             res = yield self.invoke(rpc="Register",
                                     adapter=adapter,
diff --git a/pyvoltha/adapters/kafka/kafka_inter_container_library.py b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
index b896aa4..9e9a6be 100644
--- a/pyvoltha/adapters/kafka/kafka_inter_container_library.py
+++ b/pyvoltha/adapters/kafka/kafka_inter_container_library.py
@@ -557,10 +557,10 @@
                 self.transaction_id_deferred_map[
                     self._to_string(request.header.id)] = wait_for_result
             log.debug("message-send", transaction_id=transaction_id, to_topic=to_topic,
-                      from_topic=reply_topic)
+                      from_topic=reply_topic, rpc=rpc)
             yield self._send_kafka_message(to_topic, request)
             log.debug("message-sent", transaction_id=transaction_id, to_topic=to_topic,
-                      from_topic=reply_topic)
+                      from_topic=reply_topic, rpc=rpc)
 
             if response_required:
                 res = yield wait_for_result
@@ -570,7 +570,7 @@
 
                 if res is not None:
                     if res.success:
-                        log.debug("send-message-response", rpc=rpc)
+                        log.debug("send-message-response", transaction_id=transaction_id, rpc=rpc)
                         if callback:
                             callback((res.success, res.result))
                         else:
@@ -578,7 +578,7 @@
                     else:
                         # this is the case where the core API returns a grpc code.NotFound.  Return or callback
                         # so the caller can act appropriately (i.e add whatever was not found)
-                        log.warn("send-message-response-error-result", kafka_request=request, kafka_result=res)
+                        log.warn("send-message-response-error-result", transaction_id=transaction_id, rpc=rpc, kafka_request=request, kafka_result=res)
                         if callback:
                             callback((res.success, None))
                         else:
diff --git a/requirements.txt b/requirements.txt
index 259f140..b9243fb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,4 +18,4 @@
 structlog==19.2.0
 transitions==0.6.4
 txaioetcd==0.3.0
-voltha-protos==3.2.8
+voltha-protos==3.3.0
diff --git a/test/unit/kafka/core_proxy_test.py b/test/unit/kafka/core_proxy_test.py
new file mode 100644
index 0000000..10fe1e5
--- /dev/null
+++ b/test/unit/kafka/core_proxy_test.py
@@ -0,0 +1,150 @@
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import sys
+from unittest import TestCase, main
+from unittest.mock import patch
+from twisted.internet import defer
+from voltha_protos.adapter_pb2 import Adapter
+from voltha_protos.device_pb2 import DeviceType
+
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../../../")))
+
+def mock_decorator(f):
+    def real_wrapper(func):
+        return func
+    return real_wrapper
+
+
+patch('pyvoltha.adapters.kafka.container_proxy.ContainerProxy.wrap_request', mock_decorator).start()
+from pyvoltha.adapters.kafka.core_proxy import CoreProxy
+
+
+class TestCoreProxy(TestCase):
+
+    def setUp(self):
+        self.core_proxy = CoreProxy(
+            kafka_proxy=None,
+            default_core_topic='test_core',
+            default_event_topic='test.events',
+            my_listening_topic='test_openonu')
+
+        self.supported_device_types = [
+            DeviceType(
+                id="brmc_openonu",
+                vendor_ids=['BBSM'],
+                adapter="openonu",
+                accepts_bulk_flow_update=False,
+                accepts_add_remove_flow_updates=True
+            )
+        ]
+
+    @defer.inlineCallbacks
+    def test_register_defaults(self):
+        adapter = Adapter(
+            id="testAdapter",
+            vendor="ONF",
+            version="test",
+        )
+
+        expected_adapter = Adapter(
+            id="testAdapter",
+            vendor="ONF",
+            version="test",
+            currentReplica=1,
+            totalReplicas=1
+        )
+
+        with patch.object(self.core_proxy, "invoke") as mock_invoke:
+
+            mock_invoke.return_value = "success"
+
+            res = yield self.core_proxy.register(adapter, self.supported_device_types)
+            mock_invoke.assert_called_with(
+                rpc="Register",
+                adapter=expected_adapter,
+                deviceTypes=self.supported_device_types
+            )
+            self.assertTrue(mock_invoke.call_count, 1)
+            self.assertEqual(res, "success")
+
+    @defer.inlineCallbacks
+    def test_register_multiple(self):
+
+        adapter = Adapter(
+            id="testAdapter",
+            vendor="ONF",
+            version="test",
+            currentReplica=4,
+            totalReplicas=8
+        )
+
+        with patch.object(self.core_proxy, "invoke") as mock_invoke:
+            mock_invoke.return_value = "success"
+
+            res = yield self.core_proxy.register(adapter, self.supported_device_types)
+            mock_invoke.assert_called_with(
+                rpc="Register",
+                adapter=adapter,
+                deviceTypes=self.supported_device_types
+            )
+
+    @defer.inlineCallbacks
+    def test_register_misconfigured(self):
+        """
+        In case the operator sets wrong parameter, eg: currentReplica=10, totalReplicas=2
+        raise an exception
+        """
+        adapter = Adapter(
+            id="testAdapter",
+            vendor="ONF",
+            version="test",
+            currentReplica=10,
+            totalReplicas=8
+        )
+
+        with self.assertRaises(Exception) as e:
+            res = yield self.core_proxy.register(adapter, self.supported_device_types)
+
+        self.assertEqual(str(e.exception), "currentReplica (10) can't be greater than totalReplicas (8)")
+
+        adapter = Adapter(
+            id="testAdapter",
+            vendor="ONF",
+            version="test",
+            totalReplicas=0,
+            currentReplica=1
+        )
+
+        with self.assertRaises(Exception) as e:
+            res = yield self.core_proxy.register(adapter, self.supported_device_types)
+
+        self.assertEqual(str(e.exception), "totalReplicas can't be 0, since you're here you have at least one")
+
+        adapter = Adapter(
+            id="testAdapter",
+            vendor="ONF",
+            version="test",
+            totalReplicas=1,
+            currentReplica=0
+        )
+
+        with self.assertRaises(Exception) as e:
+            res = yield self.core_proxy.register(adapter, self.supported_device_types)
+
+        self.assertEqual(str(e.exception), "currentReplica can't be 0, it has to start from 1")
+
+
+if __name__ == '__main__':
+    main()