| # |
| # Copyright 2017 the original author or authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| A simple internal pub/sub event bus with topics and filter-based registration. |
| """ |
| from __future__ import absolute_import |
| import re |
| |
| import structlog |
| import six |
| |
| |
# Module-level structured logger; EventBus.publish uses it to report
# callbacks that raise.
log = structlog.get_logger()
| |
| |
class _Subscription(object):
    """
    Record of a single registration on an event bus.

    A plain value holder: it stores the bus, the optional predicate, the
    callback and the topic exactly as they were handed to the constructor.
    """

    # fixed attribute set; instances carry no per-instance __dict__
    __slots__ = ('bus', 'predicate', 'callback', 'topic')

    def __init__(self, bus, predicate, callback, topic=None):
        """
        :param bus: The bus this subscription was made on
        :param predicate: Optional message filter, or None
        :param callback: Callable to invoke for matching messages
        :param topic: Topic the subscription was made with (default None)
        """
        self.bus, self.predicate = bus, predicate
        self.callback, self.topic = callback, topic
| |
| |
class EventBus(object):
    """
    In-process publish/subscribe dispatcher.

    A subscription is made either on an explicit string topic or on a
    compiled regular expression that is matched against the topic of every
    published message.
    """

    def __init__(self):
        # topic -> list of _Subscription objects; the None key holds the
        # regexp based topic subscriptions.
        self.subscriptions = {}
        # subscription -> topic key, to aid fast lookup when unsubscribing
        self.subs_topic_map = {}

    def list_subscribers(self, topic=None):
        """
        Return the registered subscriptions.
        :param topic: Optional string topic. If omitted (None), the
        subscriptions stored under every key, including the regexp based
        ones, are returned.
        :return: A new list of subscription objects (empty if there are none)
        """
        if topic is None:
            # flatten all buckets in a single linear pass
            return [subscription
                    for bucket in self.subscriptions.values()
                    for subscription in bucket]
        # hand out a copy so that callers cannot alter the registry by
        # mutating the returned list
        return list(self.subscriptions.get(topic, ()))

    @staticmethod
    def _get_topic_key(topic):
        """
        Map a topic to the key under which its subscriptions are stored.
        :param topic: String topic, or an object with a 'match' attribute
        (such as a compiled regexp)
        :return: The topic itself for strings, None for regexp topics
        :raises AttributeError: if topic is neither of the above
        """
        if isinstance(topic, str):
            return topic
        if hasattr(topic, 'match'):
            return None
        raise AttributeError('topic not a string nor a compiled regex')

    @staticmethod
    def _passes(msg, predicate):
        """Evaluate predicate(msg); a raising predicate counts as no match."""
        try:
            return predicate(msg)
        except Exception:
            return False

    def subscribe(self, topic, callback, predicate=None):
        """
        Subscribe to given topic with predicate and register the callback
        :param topic: String topic (explicit) or regexp based topic filter.
        :param callback: Callback method with signature def func(topic, msg)
        :param predicate: Optional method/function signature def predicate(msg)
        :return: Subscription object which can be used to unsubscribe
        :raises AttributeError: if topic is neither a string nor a regexp
        """
        subscription = _Subscription(self, predicate, callback, topic)
        topic_key = self._get_topic_key(topic)
        self.subscriptions.setdefault(topic_key, []).append(subscription)
        self.subs_topic_map[subscription] = topic_key
        return subscription

    def unsubscribe(self, subscription):
        """
        Remove given subscription
        :param subscription: subscription object as was returned by subscribe
        :return: None
        :raises KeyError: if the subscription is not (or no longer) registered
        """
        topic_key = self.subs_topic_map[subscription]
        bucket = self.subscriptions[topic_key]
        bucket.remove(subscription)
        # Forget the reverse-lookup entry and any bucket that became empty;
        # otherwise removed subscriptions (and their callbacks) stay
        # referenced by the bus forever.
        del self.subs_topic_map[subscription]
        if not bucket:
            del self.subscriptions[topic_key]

    def publish(self, topic, msg):
        """
        Publish given message to all subscribers registered with topic taking
        the predicate functions into account. Explicit topic subscribers are
        called first, followed by the matching regexp subscribers. An
        exception raised by a callback is logged and does not stop delivery
        to the remaining subscribers.
        :param topic: String topic
        :param msg: Arbitrary python data as message
        :return: None
        """
        # subscribers with explicit topic subscriptions
        explicit = self.subscriptions.get(topic, ())

        # matching regexp topic subscribers
        by_regexp = [s for s in self.subscriptions.get(None, ())
                     if s.topic.match(topic)]

        # Combine into a NEW list. Extending the stored list in place would
        # permanently register the regexp subscribers under this topic and
        # deliver duplicates on later publishes. The fresh list also acts as
        # a snapshot, so callbacks may subscribe/unsubscribe while we iterate.
        for candidate in list(explicit) + by_regexp:
            predicate = candidate.predicate
            if predicate is not None and not self._passes(msg, predicate):
                continue
            try:
                candidate.callback(topic, msg)
            except Exception as e:
                log.exception('callback-failed', e=repr(e), topic=topic)
| |
| |
| |
# Process-wide bus instance; EventBusClient falls back to it when it is
# constructed without an explicit bus.
default_bus = EventBus()
| |
| |
class EventBusClient(object):
    """
    Primary interface to the EventBus. Every method simply forwards to the
    underlying bus. Usage:

    Publish:
    >>> events = EventBusClient()
    >>> msg = dict(a=1, b='foo')
    >>> events.publish('a.topic', msg)

    Subscribe to get all messages on specific topic:
    >>> def got_event(topic, msg):
    ...     print(topic, ':', msg)
    >>> events = EventBusClient()
    >>> subscription = events.subscribe('a.topic', got_event)

    Subscribe to get messages matching predicate on specific topic:
    >>> events = EventBusClient()
    >>> subscription = events.subscribe(
    ...     'a.topic', got_event, lambda msg: len(msg) < 100)

    Use a DeferredQueue to buffer incoming messages:
    >>> queue = DeferredQueue()
    >>> events = EventBusClient()
    >>> subscription = events.subscribe(
    ...     'a.topic', lambda _, msg: queue.put(msg))

    """

    def __init__(self, bus=None):
        """
        Obtain a client interface for the pub/sub event bus.
        :param bus: An optional specific event bus. Intended mainly for test
        use. If not provided, the process default bus will be used, which is
        the preferred use (a process shall not need more than one bus).
        """
        # any falsy value for bus selects the process default bus
        self.bus = bus if bus else default_bus

    def publish(self, topic, msg):
        """
        Forward msg to the bus for delivery on topic.
        :param topic: String topic
        :param msg: Arbitrary python data as message
        :return: None
        """
        target = self.bus
        target.publish(topic, msg)

    def subscribe(self, topic, callback, predicate=None):
        """
        Register callback on the bus for the given topic.
        :param topic: String topic (explicit) or regexp based topic filter.
        :param callback: Callback method with signature def func(topic, msg)
        :param predicate: Optional method/function with signature
        def predicate(msg)
        :return: Subscription object which can be used to unsubscribe
        """
        subscription = self.bus.subscribe(topic, callback, predicate)
        return subscription

    def unsubscribe(self, subscription):
        """
        Remove a subscription from the bus.
        :param subscription: subscription object as was returned by subscribe
        :return: Whatever the bus returns from unsubscribe
        """
        outcome = self.bus.unsubscribe(subscription)
        return outcome

    def list_subscribers(self, topic=None):
        """
        Return list of subscribers. If topic is provided, the result is
        limited to those subscribing to that topic.
        :param topic: Optional topic
        :return: List of subscriptions
        """
        subscribers = self.bus.list_subscribers(topic)
        return subscribers