Internal universal pub/sub event bus

Change-Id: I0f295727add5675bef292f5fd764ecc766239107
diff --git a/tests/utests/voltha/core/__init__.py b/tests/utests/voltha/core/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/utests/voltha/core/__init__.py
diff --git a/tests/utests/voltha/core/test_event_bus.py b/tests/utests/voltha/core/test_event_bus.py
new file mode 100644
index 0000000..f937722
--- /dev/null
+++ b/tests/utests/voltha/core/test_event_bus.py
@@ -0,0 +1,197 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import re
+
+from mock import Mock
+from mock import call
+from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.trial.unittest import TestCase
+
+from voltha.core.event_bus import EventBusClient, EventBus
+
+
+class TestEventBus(TestCase):
+
+    def test_subscribe(self):
+
+        ebc = EventBusClient()
+        sub = ebc.subscribe('news', lambda msg, topic: None)
+        self.assertEqual(len(ebc.list_subscribers()), 1)
+        self.assertEqual(len(ebc.list_subscribers('news')), 1)
+        self.assertEqual(len(ebc.list_subscribers('other')), 0)
+
+    def test_unsubscribe(self):
+
+        ebc = EventBusClient(EventBus())
+        sub = ebc.subscribe('news', lambda msg, topic: None)
+        ebc.unsubscribe(sub)
+        self.assertEqual(ebc.list_subscribers(), [])
+        self.assertEqual(ebc.list_subscribers('news'), [])
+
+    def test_simple_publish(self):
+
+        ebc = EventBusClient(EventBus())
+
+        mock = Mock()
+        ebc.subscribe('news', mock)
+
+        ebc.publish('news', 'message')
+
+        self.assertEqual(mock.call_count, 1)
+        mock.assert_called_with('news', 'message')
+
+    def test_topic_filtering(self):
+
+        ebc = EventBusClient(EventBus())
+
+        mock = Mock()
+        ebc.subscribe('news', mock)
+
+        ebc.publish('news', 'msg1')
+        ebc.publish('alerts', 'msg2')
+        ebc.publish('logs', 'msg3')
+
+        self.assertEqual(mock.call_count, 1)
+        mock.assert_called_with('news', 'msg1')
+
+    def test_multiple_subscribers(self):
+
+        ebc = EventBusClient(EventBus())
+
+        mock1 = Mock()
+        ebc.subscribe('news', mock1)
+
+        mock2 = Mock()
+        ebc.subscribe('alerts', mock2)
+
+        mock3 = Mock()
+        ebc.subscribe('logs', mock3)
+
+        mock4 = Mock()
+        ebc.subscribe('logs', mock4)
+
+        ebc.publish('news', 'msg1')
+        ebc.publish('alerts', 'msg2')
+        ebc.publish('logs', 'msg3')
+
+        self.assertEqual(mock1.call_count, 1)
+        mock1.assert_called_with('news', 'msg1')
+
+        self.assertEqual(mock2.call_count, 1)
+        mock2.assert_called_with('alerts', 'msg2')
+
+        self.assertEqual(mock3.call_count, 1)
+        mock3.assert_called_with('logs', 'msg3')
+
+        self.assertEqual(mock4.call_count, 1)
+        mock4.assert_called_with('logs', 'msg3')
+
+    def test_predicates(self):
+
+        ebc = EventBusClient(EventBus())
+
+        get_foos = Mock()
+        ebc.subscribe('', get_foos, lambda msg: msg.startswith('foo'))
+
+        get_bars = Mock()
+        ebc.subscribe('', get_bars, lambda msg: msg.endswith('bar'))
+
+        get_all = Mock()
+        ebc.subscribe('', get_all)
+
+        get_none = Mock()
+        ebc.subscribe('', get_none, lambda msg: msg.find('zoo') >= 0)
+
+        errored = Mock()
+        ebc.subscribe('', errored, lambda msg: 1/0)
+
+        ebc.publish('', 'foo')
+        ebc.publish('', 'foobar')
+        ebc.publish('', 'bar')
+
+        c = call
+
+        self.assertEqual(get_foos.call_count, 2)
+        get_foos.assert_has_calls([c('', 'foo'), c('', 'foobar')])
+
+        self.assertEqual(get_bars.call_count, 2)
+        get_bars.assert_has_calls([c('', 'foobar'), c('', 'bar')])
+
+        self.assertEqual(get_all.call_count, 3)
+        get_all.assert_has_calls([c('', 'foo'), c('', 'foobar'), c('', 'bar')])
+
+        get_none.assert_not_called()
+
+        errored.assert_not_called()
+
+    def test_wildcard_topic(self):
+
+        ebc = EventBusClient(EventBus())
+        subs = []
+
+        wildcard_sub = Mock()
+        subs.append(ebc.subscribe(re.compile(r'.*'), wildcard_sub))
+
+        prefix_sub = Mock()
+        subs.append(ebc.subscribe(re.compile(r'ham.*'), prefix_sub))
+
+        contains_sub = Mock()
+        subs.append(ebc.subscribe(re.compile(r'.*burg.*'), contains_sub))
+
+        ebc.publish('news', 1)
+        ebc.publish('hamsters', 2)
+        ebc.publish('hamburgers', 3)
+        ebc.publish('nonsense', 4)
+
+        c = call
+
+        self.assertEqual(wildcard_sub.call_count, 4)
+        wildcard_sub.assert_has_calls([
+            c('news', 1),
+            c('hamsters', 2),
+            c('hamburgers', 3),
+            c('nonsense', 4)])
+
+        self.assertEqual(prefix_sub.call_count, 2)
+        prefix_sub.assert_has_calls([
+            c('hamsters', 2),
+            c('hamburgers', 3)])
+
+        self.assertEqual(contains_sub.call_count, 1)
+        contains_sub.assert_has_calls([c('hamburgers', 3)])
+
+        for sub in subs:
+            ebc.unsubscribe(sub)
+
+        self.assertEqual(ebc.list_subscribers(), [])
+
+    @inlineCallbacks
+    def test_deferred_queue_receiver(self):
+
+        ebc = EventBus()
+
+        queue = DeferredQueue()
+
+        ebc.subscribe('', lambda _, msg: queue.put(msg))
+
+        for i in xrange(10):
+            ebc.publish('', i)
+
+        self.assertEqual(len(queue.pending), 10)
+        for i in xrange(10):
+            msg = yield queue.get()
+            self.assertEqual(msg, i)
+        self.assertEqual(len(queue.pending), 0)
diff --git a/voltha/core/core.py b/voltha/core/core.py
new file mode 100644
index 0000000..14ee432
--- /dev/null
+++ b/voltha/core/core.py
@@ -0,0 +1,42 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Voltha's CORE components.
+"""
+import structlog
+
+log = structlog.get_logger()
+
+
+class VolthaCore(object):
+
+    def __init__(self):
+
+        self.stopped = False
+
+    def start(self):
+        log.debug('starting')
+        pass
+        log.info('started')
+
+    def stop(self):
+        log.debug('stopping')
+        self.stopped = True
+        log.info('stopped')
+
+    # TODO
+
diff --git a/voltha/core/event_bus.py b/voltha/core/event_bus.py
new file mode 100644
index 0000000..47e0dcd
--- /dev/null
+++ b/voltha/core/event_bus.py
@@ -0,0 +1,191 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple internal pub/sub event bus with topics and filter-based registration.
+"""
+import re
+
+import structlog
+
+
+log = structlog.get_logger()
+
+
+class _Subscription(object):
+
+    __slots__ = ('bus', 'predicate', 'callback', 'topic')
+    def __init__(self, bus, predicate, callback, topic=None):
+        self.bus = bus
+        self.predicate = predicate
+        self.callback = callback
+        self.topic = topic
+
+
+class EventBus(object):
+
+    def __init__(self):
+        self.subscriptions = {}  # topic -> list of _Subscription objects
+                                 # topic None holds regexp based topic subs.
+        self.subs_topic_map = {} # to aid fast lookup when unsubscribing
+
+    def list_subscribers(self, topic=None):
+        if topic is None:
+            return sum(self.subscriptions.itervalues(), [])
+        else:
+            if topic in self.subscriptions:
+                return self.subscriptions[topic]
+            else:
+                return []
+
+    @staticmethod
+    def _get_topic_key(topic):
+        if isinstance(topic, str):
+            return topic
+        elif hasattr(topic, 'match'):
+            return None
+        else:
+            raise AttributeError('topic not a string nor a compiled regex')
+
+    def subscribe(self, topic, callback, predicate=None):
+        """
+        Subscribe to given topic with predicate and register the callback
+        :param topic: String topic (explicit) or regexp based topic filter.
+        :param callback: Callback method with signature def func(topic, msg)
+        :param predicate: Optional method/function signature def predicate(msg)
+        :return: Subscription object which can be used to unsubscribe
+        """
+        subscription = _Subscription(self, predicate, callback, topic)
+        topic_key = self._get_topic_key(topic)
+        self.subscriptions.setdefault(topic_key, []).append(subscription)
+        self.subs_topic_map[subscription] = topic_key
+        return subscription
+
+    def unsubscribe(self, subscription):
+        """
+        Remove given subscription
+        :param subscription: subscription object as was returned by subscribe
+        :return: None
+        """
+        topic_key = self.subs_topic_map[subscription]
+        self.subscriptions[topic_key].remove(subscription)
+
+    def publish(self, topic, msg):
+        """
+        Publish given message to all subscribers registered with topic taking
+        the predicate functions into account.
+        :param topic: String topic
+        :param msg: Arbitrary python data as message
+        :return: None
+        """
+
+        def passes(msg, predicate):
+            try:
+                return predicate(msg)
+            except Exception, e:
+                return False  # failed predicate function treated as no match
+
+        # lookup subscribers with explicit topic subscriptions
+        subscribers = self.subscriptions.get(topic, [])
+
+        # add matching regexp topic subscribers
+        subscribers.extend(s for s in self.subscriptions.get(None, [])
+                           if s.topic.match(topic))
+
+        for candidate in subscribers:
+            predicate = candidate.predicate
+            if predicate is None or passes(msg, predicate):
+                try:
+                    candidate.callback(topic, msg)
+                except Exception, e:
+                    log.warning('callback-failed', e=e, topic=topic)
+
+
+default_bus = EventBus()
+
+
+class EventBusClient(object):
+    """
+    Primary interface to the EventBus. Usage:
+
+    Publish:
+    >>> events = EventBusClient()
+    >>> msg = dict(a=1, b='foo')
+    >>> events.publish('a.topic', msg)
+
+    Subscribe to get all messages on specific topic:
+    >>> def got_event(topic, msg):
+    >>>     print topic, ':', msg
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', got_event)
+
+    Subscribe to get messages matching predicate on specific topic:
+    >>> def got_event(topic, msg):
+    >>>     print topic, ':', msg
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', got_event, lambda msg: msg.len() < 100)
+
+    Use a DeferredQueue to buffer incoming messages
+    >>> queue = DeferredQueue()
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', lambda _, msg: queue.put(msg))
+
+    """
+    def __init__(self, bus=None):
+        """
+        Obtain a client interface for the pub/sub event bus.
+        :param bus: An optional specific event bus. Inteded for mainly test
+        use. If not provided, the process default bus will be used, which is
+        the preferred use (a process shall not need more than one bus).
+        """
+        self.bus = bus or default_bus
+
+    def publish(self, topic, msg):
+        """
+        Publish given msg to given topic.
+        :param topic: String topic
+        :param msg: Arbitrary python data as message
+        :return: None
+        """
+        self.bus.publish(topic, msg)
+
+    def subscribe(self, topic, callback, predicate=None):
+        """
+        Subscribe to given topic with predicate and register the callback
+        :param topic: String topic (explicit) or regexp based topic filter.
+        :param callback: Callback method with signature def func(topic, msg)
+        :param predicate: Optional method/function with signature
+        def predicate(msg)
+        :return: Subscription object which can be used to unsubscribe
+        """
+        return self.bus.subscribe(topic, callback, predicate)
+
+    def unsubscribe(self, subscription):
+        """
+        Remove given subscription
+        :param subscription: subscription object as was returned by subscribe
+        :return: None
+        """
+        return self.bus.unsubscribe(subscription)
+
+    def list_subscribers(self, topic=None):
+        """
+        Return list of subscribers. If topci is provided, it is filtered for
+        those subscribing to the topic.
+        :param topic: Optional topic
+        :return: List of subscriptions
+        """
+        return self.bus.list_subscribers(topic)