Impl of config events.

Changes to the config store are advertised on the event bus
under the 'model-change-events' topic which is forwarded kafka.

Change-Id: Iad019b866eedb9e53a3ea9c70434afc7ec96d548
diff --git a/tests/utests/voltha/core/config/test_config.py b/tests/utests/voltha/core/config/test_config.py
index ff67283..ea112f1 100644
--- a/tests/utests/voltha/core/config/test_config.py
+++ b/tests/utests/voltha/core/config/test_config.py
@@ -7,13 +7,17 @@
 
 import gc
 
+from google.protobuf.json_format import MessageToDict
 from mock import Mock
+from simplejson import dumps
 
-from voltha.core.config.config_proxy import CallbackType, OperationContext
+from common.event_bus import EventBusClient
+from voltha.core.config.config_proxy import CallbackType
 from voltha.core.config.config_rev import _rev_cache
 from voltha.core.config.config_root import ConfigRoot, MergeConflictException
 from voltha.core.config.config_txn import ClosedTransactionError
 from voltha.protos import third_party
+from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
 from voltha.protos.openflow_13_pb2 import ofp_port
 from voltha.protos.voltha_pb2 import VolthaInstance, Adapter, HealthStatus, \
     AdapterConfig, LogicalDevice, LogicalPort
@@ -539,6 +543,42 @@
         pre_callback.assert_called_with(adapter)
         post_callback.assert_called_with(adapter)
 
+class TestEventLogic(DeepTestsBase):
+
+    def setUp(self):
+        super(TestEventLogic, self).setUp()
+        self.ebc = EventBusClient()
+        self.event_mock = Mock()
+        self.ebc.subscribe('model-change-events', self.event_mock)
+
+    def test_add_event(self):
+
+        data = Adapter(id='10', version='zoo')
+        self.node.add('/adapters', data)
+        event = ConfigEvent(
+            type=ConfigEventType.add,
+            hash=self.node.latest.hash,
+            data=dumps(MessageToDict(data, True, True))
+        )
+
+        self.event_mock.assert_called_once_with('model-change-events', event)
+
+    def test_remove_event(self):
+        data = Adapter(
+            id='1',
+            config=AdapterConfig(
+                log_level=3
+            )
+        )
+        self.node.remove('/adapters/1')
+        event = ConfigEvent(
+            type=ConfigEventType.remove,
+            hash=self.node.latest.hash,
+            data=dumps(MessageToDict(data, True, True))
+        )
+
+        self.event_mock.assert_called_once_with('model-change-events', event)
+
 
 class TestTransactionalLogic(DeepTestsBase):
 
diff --git a/voltha/core/config/config_event_bus.py b/voltha/core/config/config_event_bus.py
new file mode 100644
index 0000000..e000f2e
--- /dev/null
+++ b/voltha/core/config/config_event_bus.py
@@ -0,0 +1,53 @@
+import structlog
+from enum import Enum
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+
+from common.event_bus import EventBusClient
+from voltha.core.config.config_proxy import CallbackType
+from voltha.protos import third_party
+from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
+
+IGNORED_CALLBACKS = [CallbackType.PRE_ADD, CallbackType.GET,
+                     CallbackType.POST_LISTCHANGE, CallbackType.PRE_REMOVE,
+                     CallbackType.PRE_UPDATE]
+
+log = structlog.get_logger()
+
+class ConfigEventBus(object):
+
+    __slots__ = (
+        '_event_bus_client',  # The event bus client used to publish events.
+        '_topic'  # the topic to publish to
+    )
+
+    def __init__(self):
+        self._event_bus_client = EventBusClient()
+        self._topic = 'model-change-events'
+
+    def advertise(self, type, data, hash=None):
+        if type in IGNORED_CALLBACKS:
+            log.info('Ignoring event {} with data {}'.format(type, data))
+            return
+
+        if type is CallbackType.POST_ADD:
+            kind = ConfigEventType.add
+        elif type is CallbackType.POST_REMOVE:
+            kind = ConfigEventType.remove
+        else:
+            kind = ConfigEventType.update
+
+        if isinstance(data, Message):
+            msg = dumps(MessageToDict(data, True, True))
+        else:
+            msg = data
+
+        event = ConfigEvent(
+            type=kind,
+            hash=hash,
+            data=msg
+        )
+
+        self._event_bus_client.publish(self._topic, event)
+
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
index cba6d88..4e1023b 100644
--- a/voltha/core/config/config_node.py
+++ b/voltha/core/config/config_node.py
@@ -20,6 +20,7 @@
 
 from common.utils.json_format import MessageToDict
 from voltha.core.config.config_branch import ConfigBranch
+from voltha.core.config.config_event_bus import ConfigEventBus
 from voltha.core.config.config_proxy import CallbackType, ConfigProxy
 from voltha.core.config.config_rev import is_proto_message, children_fields, \
     ConfigRevision, access_rights
@@ -69,6 +70,7 @@
                       # branch
         '_tags',  # dict of tag-name to ref of ConfigRevision
         '_proxy',  # ref to proxy observer or None if no proxy assigned
+        '_event_bus',  # ref to event_bus or None if no event bus is assigned
         '_auto_prune'
     )
 
@@ -77,6 +79,7 @@
         self._branches = {}
         self._tags = {}
         self._proxy = None
+        self._event_bus = None
         self._auto_prune = auto_prune
 
         if isinstance(initial_data, type):
@@ -286,18 +289,28 @@
             branch._revs[rev.hash] = rev
 
         # announce only if this is main branch
-        if change_announcements and branch._txid is None and \
-                        self._proxy is not None:
+        if change_announcements and branch._txid is None:
+
+            if self._proxy is not None:
+                for change_type, data in change_announcements:
+                    # since the callback may operate on the config tree,
+                    # we have to defer the execution of the callbacks till
+                    # the change is propagated to the root, then root will
+                    # call the callbacks
+                    self._root.enqueue_callback(
+                        self._proxy.invoke_callbacks,
+                        change_type,
+                        data,
+                        proceed_on_errors=1,
+                    )
+
+
             for change_type, data in change_announcements:
-                # since the callback may operate on the config tree,
-                # we have to defer the execution of the callbacks till
-                # the change is propagated to the root, then root will
-                # call the callbacks
                 self._root.enqueue_callback(
-                    self._proxy.invoke_callbacks,
+                    self._mk_event_bus().advertise,
                     change_type,
                     data,
-                    proceed_on_errors=1
+                    hash=rev.hash
                 )
 
     # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -401,7 +414,7 @@
                             CallbackType.PRE_REMOVE, data)
                         post_anno = ((CallbackType.POST_REMOVE, data),)
                     else:
-                        post_anno = ()
+                        post_anno = ((CallbackType.POST_REMOVE, child_rev.data),)
                     del children[idx]
                     rev = rev.update_children(name, children, branch)
                     self._make_latest(branch, rev, post_anno)
@@ -567,6 +580,11 @@
                 raise ValueError('Node is already owned exclusively')
         return self._proxy
 
+    def _mk_event_bus(self):
+        if self._event_bus is None:
+            self._event_bus = ConfigEventBus()
+        return self._event_bus
+
     # ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
     def load_latest(self, latest_hash):
diff --git a/voltha/protos/events.proto b/voltha/protos/events.proto
index 8fe434c..0c5ac04 100644
--- a/voltha/protos/events.proto
+++ b/voltha/protos/events.proto
@@ -4,7 +4,6 @@
 
 import "meta.proto";
 import "google/api/annotations.proto";
-import "google/protobuf/any.proto";
 
 message ConfigEventType {
     enum ConfigEventType {
@@ -18,7 +17,7 @@
     ConfigEventType.ConfigEventType type = 1;
 
     string hash = 2; // hash for this change, can be used for quick lookup
-    google.protobuf.Any data = 3; // the actual new data, may be empty for a remove operation
+    string data = 3; // the actual new data, in json format
 }
 
 message KpiEventType {