VOL-1451 Initial checkin of openonu build
Produced docker container capable of building and running
openonu/brcm_openomci_onu. Copied over current onu code
and resolved all imports by copying into the local source tree.
Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/common/event_bus.py b/python/common/event_bus.py
new file mode 100644
index 0000000..e717c16
--- /dev/null
+++ b/python/common/event_bus.py
@@ -0,0 +1,194 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple internal pub/sub event bus with topics and filter-based registration.
+"""
+import re
+
+import structlog
+
+
+log = structlog.get_logger()  # module-level logger, used by EventBus.publish
+
+
+class _Subscription(object):
+    """Record of one registration: owning bus, filter, callback, topic."""
+    __slots__ = ('bus', 'predicate', 'callback', 'topic')
+
+    def __init__(self, bus, predicate, callback, topic=None):
+        # paired assignments keep the four slots visibly in step
+        self.bus, self.predicate = bus, predicate
+        self.callback, self.topic = callback, topic
+
+
+class EventBus(object):
+    """In-process pub/sub bus keyed by exact topic or compiled regexp."""
+
+    def __init__(self):
+        self.subscriptions = {}  # topic -> list of _Subscription objects
+        # topic None holds regexp based topic subs.
+        self.subs_topic_map = {}  # to aid fast lookup when unsubscribing
+
+    def list_subscribers(self, topic=None):
+        """
+        Return a new list of subscriptions; the bus's internal lists are
+        never handed out, so callers may mutate the result freely.
+        :param topic: Exact topic key, or None for every subscription
+        :return: List of _Subscription objects (possibly empty)
+        """
+        if topic is None:
+            return [s for subs in self.subscriptions.values() for s in subs]
+        return list(self.subscriptions.get(topic, ()))
+
+    @staticmethod
+    def _get_topic_key(topic):
+        """
+        Return the subscriptions key: the str itself, None for a regexp.
+        :raises AttributeError: if topic is not a str and has no .match
+        """
+        if isinstance(topic, str):
+            return topic
+        if hasattr(topic, 'match'):
+            return None
+        raise AttributeError('topic not a string nor a compiled regex')
+
+    def subscribe(self, topic, callback, predicate=None):
+        """
+        Subscribe to given topic with predicate and register the callback
+        :param topic: String topic (explicit) or regexp based topic filter.
+        :param callback: Callback method with signature def func(topic, msg)
+        :param predicate: Optional method/function signature def predicate(msg)
+        :return: Subscription object which can be used to unsubscribe
+        """
+        subscription = _Subscription(self, predicate, callback, topic)
+        topic_key = self._get_topic_key(topic)
+        self.subscriptions.setdefault(topic_key, []).append(subscription)
+        self.subs_topic_map[subscription] = topic_key
+        return subscription
+
+    def unsubscribe(self, subscription):
+        """
+        Remove given subscription together with its lookup-map entry.
+        :param subscription: subscription object as was returned by subscribe
+        :return: None
+        :raises KeyError: if subscription is not registered on this bus
+        """
+        topic_key = self.subs_topic_map.pop(subscription)
+        self.subscriptions[topic_key].remove(subscription)
+
+    def publish(self, topic, msg):
+        """
+        Publish given message to all subscribers registered with topic taking
+        the predicate functions into account.
+        :param topic: String topic
+        :param msg: Arbitrary python data as message
+        :return: None
+        """
+        # Work on a copy: extending the list stored under `topic` would
+        # re-register the regexp subscribers there on every publish.
+        subscribers = list(self.subscriptions.get(topic, ()))
+        subscribers.extend(s for s in self.subscriptions.get(None, ())
+                           if s.topic.match(topic))
+        for candidate in subscribers:
+            predicate = candidate.predicate
+            try:
+                if predicate is not None and not predicate(msg):
+                    continue
+            except Exception:
+                continue  # failed predicate function treated as no match
+            try:
+                candidate.callback(topic, msg)
+            except Exception as e:
+                log.exception('callback-failed', e=repr(e), topic=topic)
+
+
+
+default_bus = EventBus()  # process-wide bus; EventBusClient's fallback
+
+
+class EventBusClient(object):
+    """
+    Primary interface to the EventBus. Usage:
+
+    Publish:
+    >>> events = EventBusClient()
+    >>> msg = dict(a=1, b='foo')
+    >>> events.publish('a.topic', msg)
+
+    Subscribe to get all messages on specific topic:
+    >>> def got_event(topic, msg):
+    ...     print('%s: %s' % (topic, msg))
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', got_event)
+
+    Subscribe to get messages matching predicate on specific topic:
+    >>> def got_event(topic, msg):
+    ...     print('%s: %s' % (topic, msg))
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', got_event, lambda msg: len(msg) < 100)
+
+    Use a DeferredQueue to buffer incoming messages
+    >>> queue = DeferredQueue()
+    >>> events = EventBusClient()
+    >>> events.subscribe('a.topic', lambda _, msg: queue.put(msg))
+    """
+    def __init__(self, bus=None):
+        """
+        Obtain a client interface for the pub/sub event bus.
+        :param bus: An optional specific event bus. Intended mainly for test
+        use. If not provided, the process default bus will be used, which is
+        the preferred use (a process shall not need more than one bus).
+        """
+        # identity test: a falsy-but-valid bus must not be swapped out
+        self.bus = default_bus if bus is None else bus
+
+    def publish(self, topic, msg):
+        """
+        Publish given msg to given topic.
+        :param topic: String topic
+        :param msg: Arbitrary python data as message
+        :return: None
+        """
+        self.bus.publish(topic, msg)
+
+    def subscribe(self, topic, callback, predicate=None):
+        """
+        Subscribe to given topic with predicate and register the callback
+        :param topic: String topic (explicit) or regexp based topic filter.
+        :param callback: Callback method with signature def func(topic, msg)
+        :param predicate: Optional method/function with signature
+        def predicate(msg)
+        :return: Subscription object which can be used to unsubscribe
+        """
+        return self.bus.subscribe(topic, callback, predicate)
+
+    def unsubscribe(self, subscription):
+        """
+        Remove given subscription
+        :param subscription: subscription object as was returned by subscribe
+        :return: None
+        """
+        return self.bus.unsubscribe(subscription)
+
+    def list_subscribers(self, topic=None):
+        """
+        Return list of subscribers. If topic is provided, it is filtered for
+        those subscribing to the topic.
+        :param topic: Optional topic
+        :return: List of subscriptions
+        """
+        return self.bus.list_subscribers(topic)