Internal universal pub/sub event bus
Change-Id: I0f295727add5675bef292f5fd764ecc766239107
diff --git a/tests/utests/voltha/core/__init__.py b/tests/utests/voltha/core/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/utests/voltha/core/__init__.py
diff --git a/tests/utests/voltha/core/test_event_bus.py b/tests/utests/voltha/core/test_event_bus.py
new file mode 100644
index 0000000..f937722
--- /dev/null
+++ b/tests/utests/voltha/core/test_event_bus.py
@@ -0,0 +1,197 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import re
+
+from mock import Mock
+from mock import call
+from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.trial.unittest import TestCase
+
+from voltha.core.event_bus import EventBusClient, EventBus
+
+
+class TestEventBus(TestCase):
+
+ def test_subscribe(self):
+
+ ebc = EventBusClient()
+ sub = ebc.subscribe('news', lambda msg, topic: None)
+ self.assertEqual(len(ebc.list_subscribers()), 1)
+ self.assertEqual(len(ebc.list_subscribers('news')), 1)
+ self.assertEqual(len(ebc.list_subscribers('other')), 0)
+
+ def test_unsubscribe(self):
+
+ ebc = EventBusClient(EventBus())
+ sub = ebc.subscribe('news', lambda msg, topic: None)
+ ebc.unsubscribe(sub)
+ self.assertEqual(ebc.list_subscribers(), [])
+ self.assertEqual(ebc.list_subscribers('news'), [])
+
+ def test_simple_publish(self):
+
+ ebc = EventBusClient(EventBus())
+
+ mock = Mock()
+ ebc.subscribe('news', mock)
+
+ ebc.publish('news', 'message')
+
+ self.assertEqual(mock.call_count, 1)
+ mock.assert_called_with('news', 'message')
+
+ def test_topic_filtering(self):
+
+ ebc = EventBusClient(EventBus())
+
+ mock = Mock()
+ ebc.subscribe('news', mock)
+
+ ebc.publish('news', 'msg1')
+ ebc.publish('alerts', 'msg2')
+ ebc.publish('logs', 'msg3')
+
+ self.assertEqual(mock.call_count, 1)
+ mock.assert_called_with('news', 'msg1')
+
+ def test_multiple_subscribers(self):
+
+ ebc = EventBusClient(EventBus())
+
+ mock1 = Mock()
+ ebc.subscribe('news', mock1)
+
+ mock2 = Mock()
+ ebc.subscribe('alerts', mock2)
+
+ mock3 = Mock()
+ ebc.subscribe('logs', mock3)
+
+ mock4 = Mock()
+ ebc.subscribe('logs', mock4)
+
+ ebc.publish('news', 'msg1')
+ ebc.publish('alerts', 'msg2')
+ ebc.publish('logs', 'msg3')
+
+ self.assertEqual(mock1.call_count, 1)
+ mock1.assert_called_with('news', 'msg1')
+
+ self.assertEqual(mock2.call_count, 1)
+ mock2.assert_called_with('alerts', 'msg2')
+
+ self.assertEqual(mock3.call_count, 1)
+ mock3.assert_called_with('logs', 'msg3')
+
+ self.assertEqual(mock4.call_count, 1)
+ mock4.assert_called_with('logs', 'msg3')
+
+ def test_predicates(self):
+
+ ebc = EventBusClient(EventBus())
+
+ get_foos = Mock()
+ ebc.subscribe('', get_foos, lambda msg: msg.startswith('foo'))
+
+ get_bars = Mock()
+ ebc.subscribe('', get_bars, lambda msg: msg.endswith('bar'))
+
+ get_all = Mock()
+ ebc.subscribe('', get_all)
+
+ get_none = Mock()
+ ebc.subscribe('', get_none, lambda msg: msg.find('zoo') >= 0)
+
+ errored = Mock()
+ ebc.subscribe('', errored, lambda msg: 1/0)
+
+ ebc.publish('', 'foo')
+ ebc.publish('', 'foobar')
+ ebc.publish('', 'bar')
+
+ c = call
+
+ self.assertEqual(get_foos.call_count, 2)
+ get_foos.assert_has_calls([c('', 'foo'), c('', 'foobar')])
+
+ self.assertEqual(get_bars.call_count, 2)
+ get_bars.assert_has_calls([c('', 'foobar'), c('', 'bar')])
+
+ self.assertEqual(get_all.call_count, 3)
+ get_all.assert_has_calls([c('', 'foo'), c('', 'foobar'), c('', 'bar')])
+
+ get_none.assert_not_called()
+
+ errored.assert_not_called()
+
+ def test_wildcard_topic(self):
+
+ ebc = EventBusClient(EventBus())
+ subs = []
+
+ wildcard_sub = Mock()
+ subs.append(ebc.subscribe(re.compile(r'.*'), wildcard_sub))
+
+ prefix_sub = Mock()
+ subs.append(ebc.subscribe(re.compile(r'ham.*'), prefix_sub))
+
+ contains_sub = Mock()
+ subs.append(ebc.subscribe(re.compile(r'.*burg.*'), contains_sub))
+
+ ebc.publish('news', 1)
+ ebc.publish('hamsters', 2)
+ ebc.publish('hamburgers', 3)
+ ebc.publish('nonsense', 4)
+
+ c = call
+
+ self.assertEqual(wildcard_sub.call_count, 4)
+ wildcard_sub.assert_has_calls([
+ c('news', 1),
+ c('hamsters', 2),
+ c('hamburgers', 3),
+ c('nonsense', 4)])
+
+ self.assertEqual(prefix_sub.call_count, 2)
+ prefix_sub.assert_has_calls([
+ c('hamsters', 2),
+ c('hamburgers', 3)])
+
+ self.assertEqual(contains_sub.call_count, 1)
+ contains_sub.assert_has_calls([c('hamburgers', 3)])
+
+ for sub in subs:
+ ebc.unsubscribe(sub)
+
+ self.assertEqual(ebc.list_subscribers(), [])
+
+ @inlineCallbacks
+ def test_deferred_queue_receiver(self):
+
+ ebc = EventBus()
+
+ queue = DeferredQueue()
+
+ ebc.subscribe('', lambda _, msg: queue.put(msg))
+
+ for i in xrange(10):
+ ebc.publish('', i)
+
+ self.assertEqual(len(queue.pending), 10)
+ for i in xrange(10):
+ msg = yield queue.get()
+ self.assertEqual(msg, i)
+ self.assertEqual(len(queue.pending), 0)
diff --git a/voltha/core/core.py b/voltha/core/core.py
new file mode 100644
index 0000000..14ee432
--- /dev/null
+++ b/voltha/core/core.py
@@ -0,0 +1,42 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Voltha's CORE components.
+"""
+import structlog
+
+log = structlog.get_logger()
+
+
+class VolthaCore(object):
+
+ def __init__(self):
+
+ self.stopped = False
+
+ def start(self):
+ log.debug('starting')
+ pass
+ log.info('started')
+
+ def stop(self):
+ log.debug('stopping')
+ self.stopped = True
+ log.info('stopped')
+
+ # TODO
+
diff --git a/voltha/core/event_bus.py b/voltha/core/event_bus.py
new file mode 100644
index 0000000..47e0dcd
--- /dev/null
+++ b/voltha/core/event_bus.py
@@ -0,0 +1,191 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple internal pub/sub event bus with topics and filter-based registration.
+"""
+import re
+
+import structlog
+
+
+log = structlog.get_logger()
+
+
+class _Subscription(object):
+
+ __slots__ = ('bus', 'predicate', 'callback', 'topic')
+ def __init__(self, bus, predicate, callback, topic=None):
+ self.bus = bus
+ self.predicate = predicate
+ self.callback = callback
+ self.topic = topic
+
+
+class EventBus(object):
+
+ def __init__(self):
+ self.subscriptions = {} # topic -> list of _Subscription objects
+ # topic None holds regexp based topic subs.
+ self.subs_topic_map = {} # to aid fast lookup when unsubscribing
+
+ def list_subscribers(self, topic=None):
+ if topic is None:
+ return sum(self.subscriptions.itervalues(), [])
+ else:
+ if topic in self.subscriptions:
+ return self.subscriptions[topic]
+ else:
+ return []
+
+ @staticmethod
+ def _get_topic_key(topic):
+ if isinstance(topic, str):
+ return topic
+ elif hasattr(topic, 'match'):
+ return None
+ else:
+ raise AttributeError('topic not a string nor a compiled regex')
+
+ def subscribe(self, topic, callback, predicate=None):
+ """
+ Subscribe to given topic with predicate and register the callback
+ :param topic: String topic (explicit) or regexp based topic filter.
+ :param callback: Callback method with signature def func(topic, msg)
+ :param predicate: Optional method/function signature def predicate(msg)
+ :return: Subscription object which can be used to unsubscribe
+ """
+ subscription = _Subscription(self, predicate, callback, topic)
+ topic_key = self._get_topic_key(topic)
+ self.subscriptions.setdefault(topic_key, []).append(subscription)
+ self.subs_topic_map[subscription] = topic_key
+ return subscription
+
+ def unsubscribe(self, subscription):
+ """
+ Remove given subscription
+ :param subscription: subscription object as was returned by subscribe
+ :return: None
+ """
+ topic_key = self.subs_topic_map[subscription]
+ self.subscriptions[topic_key].remove(subscription)
+
+ def publish(self, topic, msg):
+ """
+ Publish given message to all subscribers registered with topic taking
+ the predicate functions into account.
+ :param topic: String topic
+ :param msg: Arbitrary python data as message
+ :return: None
+ """
+
+ def passes(msg, predicate):
+ try:
+ return predicate(msg)
+ except Exception, e:
+ return False # failed predicate function treated as no match
+
+ # lookup subscribers with explicit topic subscriptions
+ subscribers = self.subscriptions.get(topic, [])
+
+ # add matching regexp topic subscribers
+ subscribers.extend(s for s in self.subscriptions.get(None, [])
+ if s.topic.match(topic))
+
+ for candidate in subscribers:
+ predicate = candidate.predicate
+ if predicate is None or passes(msg, predicate):
+ try:
+ candidate.callback(topic, msg)
+ except Exception, e:
+ log.warning('callback-failed', e=e, topic=topic)
+
+
+default_bus = EventBus()
+
+
+class EventBusClient(object):
+ """
+ Primary interface to the EventBus. Usage:
+
+ Publish:
+ >>> events = EventBusClient()
+ >>> msg = dict(a=1, b='foo')
+ >>> events.publish('a.topic', msg)
+
+ Subscribe to get all messages on specific topic:
+ >>> def got_event(topic, msg):
+ >>> print topic, ':', msg
+ >>> events = EventBusClient()
+ >>> events.subscribe('a.topic', got_event)
+
+ Subscribe to get messages matching predicate on specific topic:
+ >>> def got_event(topic, msg):
+ >>> print topic, ':', msg
+ >>> events = EventBusClient()
+ >>> events.subscribe('a.topic', got_event, lambda msg: msg.len() < 100)
+
+ Use a DeferredQueue to buffer incoming messages
+ >>> queue = DeferredQueue()
+ >>> events = EventBusClient()
+ >>> events.subscribe('a.topic', lambda _, msg: queue.put(msg))
+
+ """
+ def __init__(self, bus=None):
+ """
+ Obtain a client interface for the pub/sub event bus.
+ :param bus: An optional specific event bus. Inteded for mainly test
+ use. If not provided, the process default bus will be used, which is
+ the preferred use (a process shall not need more than one bus).
+ """
+ self.bus = bus or default_bus
+
+ def publish(self, topic, msg):
+ """
+ Publish given msg to given topic.
+ :param topic: String topic
+ :param msg: Arbitrary python data as message
+ :return: None
+ """
+ self.bus.publish(topic, msg)
+
+ def subscribe(self, topic, callback, predicate=None):
+ """
+ Subscribe to given topic with predicate and register the callback
+ :param topic: String topic (explicit) or regexp based topic filter.
+ :param callback: Callback method with signature def func(topic, msg)
+ :param predicate: Optional method/function with signature
+ def predicate(msg)
+ :return: Subscription object which can be used to unsubscribe
+ """
+ return self.bus.subscribe(topic, callback, predicate)
+
+ def unsubscribe(self, subscription):
+ """
+ Remove given subscription
+ :param subscription: subscription object as was returned by subscribe
+ :return: None
+ """
+ return self.bus.unsubscribe(subscription)
+
+ def list_subscribers(self, topic=None):
+ """
+ Return list of subscribers. If topci is provided, it is filtered for
+ those subscribing to the topic.
+ :param topic: Optional topic
+ :return: List of subscriptions
+ """
+ return self.bus.list_subscribers(topic)