Impl of config events.
Changes to the config store are advertised on the event bus
under the 'model-change-events' topic which is forwarded kafka.
Change-Id: Iad019b866eedb9e53a3ea9c70434afc7ec96d548
diff --git a/tests/utests/voltha/core/config/test_config.py b/tests/utests/voltha/core/config/test_config.py
index ff67283..ea112f1 100644
--- a/tests/utests/voltha/core/config/test_config.py
+++ b/tests/utests/voltha/core/config/test_config.py
@@ -7,13 +7,17 @@
import gc
+from google.protobuf.json_format import MessageToDict
from mock import Mock
+from simplejson import dumps
-from voltha.core.config.config_proxy import CallbackType, OperationContext
+from common.event_bus import EventBusClient
+from voltha.core.config.config_proxy import CallbackType
from voltha.core.config.config_rev import _rev_cache
from voltha.core.config.config_root import ConfigRoot, MergeConflictException
from voltha.core.config.config_txn import ClosedTransactionError
from voltha.protos import third_party
+from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
from voltha.protos.openflow_13_pb2 import ofp_port
from voltha.protos.voltha_pb2 import VolthaInstance, Adapter, HealthStatus, \
AdapterConfig, LogicalDevice, LogicalPort
@@ -539,6 +543,42 @@
pre_callback.assert_called_with(adapter)
post_callback.assert_called_with(adapter)
+class TestEventLogic(DeepTestsBase):
+
+ def setUp(self):
+ super(TestEventLogic, self).setUp()
+ self.ebc = EventBusClient()
+ self.event_mock = Mock()
+ self.ebc.subscribe('model-change-events', self.event_mock)
+
+ def test_add_event(self):
+
+ data = Adapter(id='10', version='zoo')
+ self.node.add('/adapters', data)
+ event = ConfigEvent(
+ type=ConfigEventType.add,
+ hash=self.node.latest.hash,
+ data=dumps(MessageToDict(data, True, True))
+ )
+
+ self.event_mock.assert_called_once_with('model-change-events', event)
+
+ def test_remove_event(self):
+ data = Adapter(
+ id='1',
+ config=AdapterConfig(
+ log_level=3
+ )
+ )
+ self.node.remove('/adapters/1')
+ event = ConfigEvent(
+ type=ConfigEventType.remove,
+ hash=self.node.latest.hash,
+ data=dumps(MessageToDict(data, True, True))
+ )
+
+ self.event_mock.assert_called_once_with('model-change-events', event)
+
class TestTransactionalLogic(DeepTestsBase):
diff --git a/voltha/core/config/config_event_bus.py b/voltha/core/config/config_event_bus.py
new file mode 100644
index 0000000..e000f2e
--- /dev/null
+++ b/voltha/core/config/config_event_bus.py
@@ -0,0 +1,53 @@
+import structlog
+from enum import Enum
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+
+from common.event_bus import EventBusClient
+from voltha.core.config.config_proxy import CallbackType
+from voltha.protos import third_party
+from voltha.protos.events_pb2 import ConfigEvent, ConfigEventType
+
+IGNORED_CALLBACKS = [CallbackType.PRE_ADD, CallbackType.GET,
+ CallbackType.POST_LISTCHANGE, CallbackType.PRE_REMOVE,
+ CallbackType.PRE_UPDATE]
+
+log = structlog.get_logger()
+
+class ConfigEventBus(object):
+
+ __slots__ = (
+ '_event_bus_client', # The event bus client used to publish events.
+ '_topic' # the topic to publish to
+ )
+
+ def __init__(self):
+ self._event_bus_client = EventBusClient()
+ self._topic = 'model-change-events'
+
+ def advertise(self, type, data, hash=None):
+ if type in IGNORED_CALLBACKS:
+ log.info('Ignoring event {} with data {}'.format(type, data))
+ return
+
+ if type is CallbackType.POST_ADD:
+ kind = ConfigEventType.add
+ elif type is CallbackType.POST_REMOVE:
+ kind = ConfigEventType.remove
+ else:
+ kind = ConfigEventType.update
+
+ if isinstance(data, Message):
+ msg = dumps(MessageToDict(data, True, True))
+ else:
+ msg = data
+
+ event = ConfigEvent(
+ type=kind,
+ hash=hash,
+ data=msg
+ )
+
+ self._event_bus_client.publish(self._topic, event)
+
diff --git a/voltha/core/config/config_node.py b/voltha/core/config/config_node.py
index cba6d88..4e1023b 100644
--- a/voltha/core/config/config_node.py
+++ b/voltha/core/config/config_node.py
@@ -20,6 +20,7 @@
from common.utils.json_format import MessageToDict
from voltha.core.config.config_branch import ConfigBranch
+from voltha.core.config.config_event_bus import ConfigEventBus
from voltha.core.config.config_proxy import CallbackType, ConfigProxy
from voltha.core.config.config_rev import is_proto_message, children_fields, \
ConfigRevision, access_rights
@@ -69,6 +70,7 @@
# branch
'_tags', # dict of tag-name to ref of ConfigRevision
'_proxy', # ref to proxy observer or None if no proxy assigned
+ '_event_bus', # ref to event_bus or None if no event bus is assigned
'_auto_prune'
)
@@ -77,6 +79,7 @@
self._branches = {}
self._tags = {}
self._proxy = None
+ self._event_bus = None
self._auto_prune = auto_prune
if isinstance(initial_data, type):
@@ -286,18 +289,28 @@
branch._revs[rev.hash] = rev
# announce only if this is main branch
- if change_announcements and branch._txid is None and \
- self._proxy is not None:
+ if change_announcements and branch._txid is None:
+
+ if self._proxy is not None:
+ for change_type, data in change_announcements:
+ # since the callback may operate on the config tree,
+ # we have to defer the execution of the callbacks till
+ # the change is propagated to the root, then root will
+ # call the callbacks
+ self._root.enqueue_callback(
+ self._proxy.invoke_callbacks,
+ change_type,
+ data,
+ proceed_on_errors=1,
+ )
+
+
for change_type, data in change_announcements:
- # since the callback may operate on the config tree,
- # we have to defer the execution of the callbacks till
- # the change is propagated to the root, then root will
- # call the callbacks
self._root.enqueue_callback(
- self._proxy.invoke_callbacks,
+ self._mk_event_bus().advertise,
change_type,
data,
- proceed_on_errors=1
+ hash=rev.hash
)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ add operation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -401,7 +414,7 @@
CallbackType.PRE_REMOVE, data)
post_anno = ((CallbackType.POST_REMOVE, data),)
else:
- post_anno = ()
+ post_anno = ((CallbackType.POST_REMOVE, child_rev.data),)
del children[idx]
rev = rev.update_children(name, children, branch)
self._make_latest(branch, rev, post_anno)
@@ -567,6 +580,11 @@
raise ValueError('Node is already owned exclusively')
return self._proxy
+ def _mk_event_bus(self):
+ if self._event_bus is None:
+ self._event_bus = ConfigEventBus()
+ return self._event_bus
+
# ~~~~~~~~~~~~~~~~~~~~~~~~ Persistence loading ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def load_latest(self, latest_hash):
diff --git a/voltha/protos/events.proto b/voltha/protos/events.proto
index 8fe434c..0c5ac04 100644
--- a/voltha/protos/events.proto
+++ b/voltha/protos/events.proto
@@ -4,7 +4,6 @@
import "meta.proto";
import "google/api/annotations.proto";
-import "google/protobuf/any.proto";
message ConfigEventType {
enum ConfigEventType {
@@ -18,7 +17,7 @@
ConfigEventType.ConfigEventType type = 1;
string hash = 2; // hash for this change, can be used for quick lookup
- google.protobuf.Any data = 3; // the actual new data, may be empty for a remove operation
+ string data = 3; // the actual new data, in json format
}
message KpiEventType {