#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
"""
A simple internal pub/sub event bus with topics and filter-based registration.
"""
20import re
21
22import structlog
23
24
# Module-level structlog logger; used to report subscriber callbacks that raise.
log = structlog.get_logger()
26
27
class _Subscription(object):
    """
    Record of a single registration on an event bus.

    Instances are created by EventBus.subscribe and handed back to the caller
    so that they can later be passed to EventBus.unsubscribe.

    :param bus: The bus the subscription was registered on
    :param predicate: Optional filter called as predicate(msg), or None
    :param callback: Function called as callback(topic, msg)
    :param topic: The topic as given at subscribe time (a string or an
    object with a ``match`` method, such as a compiled regexp)
    """

    # Fixed attribute set: no per-instance __dict__ is allocated.
    __slots__ = ('bus', 'predicate', 'callback', 'topic')

    def __init__(self, bus, predicate, callback, topic=None):
        self.bus = bus
        self.predicate = predicate
        self.callback = callback
        self.topic = topic
36
37
class EventBus(object):
    """
    In-process publish/subscribe bus.

    Subscriptions are stored in ``self.subscriptions``, a dict mapping a topic
    key to a list of subscription objects. A string topic is stored under the
    string itself; every regexp based subscription (any topic object that has
    a ``match`` attribute) is stored under the single key None.
    """

    def __init__(self):
        # topic -> list of _Subscription objects
        # topic None holds regexp based topic subs.
        self.subscriptions = {}
        # subscription -> topic key, to aid fast lookup when unsubscribing
        self.subs_topic_map = {}

    def list_subscribers(self, topic=None):
        """
        Return the current subscriptions.
        :param topic: Optional string topic. If None, the subscriptions of
        all topics (including the regexp based ones) are returned.
        :return: A new list of subscription objects; mutating it does not
        affect the bus.
        """
        if topic is None:
            return [subscription
                    for subscriptions in self.subscriptions.values()
                    for subscription in subscriptions]
        return list(self.subscriptions.get(topic, ()))

    @staticmethod
    def _get_topic_key(topic):
        """
        Map a topic to the key it is stored under in self.subscriptions.
        :param topic: String topic, or an object with a ``match`` attribute
        :return: The topic itself for strings, None for regexp like topics
        :raises AttributeError: if topic is neither of the above
        """
        # type(u'') is `unicode` on Python 2 and `str` on Python 3, so text
        # topics are accepted on both without naming a Python 2 only builtin.
        if isinstance(topic, (str, type(u''))):
            return topic
        elif hasattr(topic, 'match'):
            return None
        else:
            raise AttributeError('topic not a string nor a compiled regex')

    def subscribe(self, topic, callback, predicate=None):
        """
        Subscribe to given topic with predicate and register the callback
        :param topic: String topic (explicit) or regexp based topic filter.
        :param callback: Callback method with signature def func(topic, msg)
        :param predicate: Optional method/function signature def predicate(msg)
        :return: Subscription object which can be used to unsubscribe
        :raises AttributeError: if topic is not a string nor regexp like
        """
        subscription = _Subscription(self, predicate, callback, topic)
        topic_key = self._get_topic_key(topic)
        self.subscriptions.setdefault(topic_key, []).append(subscription)
        self.subs_topic_map[subscription] = topic_key
        return subscription

    def unsubscribe(self, subscription):
        """
        Remove given subscription
        :param subscription: subscription object as was returned by subscribe
        :return: None
        :raises KeyError: if the subscription is not (or is no longer)
        registered on this bus
        """
        # Drop the reverse-lookup entry as well; otherwise every subscription
        # ever made (and its callback) stays referenced by the bus.
        topic_key = self.subs_topic_map.pop(subscription)
        subscriptions = self.subscriptions[topic_key]
        subscriptions.remove(subscription)
        if not subscriptions:
            # Do not accumulate empty lists for topics nobody listens to.
            del self.subscriptions[topic_key]

    def publish(self, topic, msg):
        """
        Publish given message to all subscribers registered with topic taking
        the predicate functions into account.

        An exception raised by a callback is logged as a warning and does not
        prevent delivery to the remaining subscribers.
        :param topic: String topic
        :param msg: Arbitrary python data as message
        :return: None
        """

        def passes(msg, predicate):
            try:
                return predicate(msg)
            except Exception:
                return False  # failed predicate function treated as no match

        # lookup subscribers with explicit topic subscriptions; work on a
        # copy so that the stored list is never modified by publishing and
        # callbacks may subscribe/unsubscribe while we iterate
        subscribers = list(self.subscriptions.get(topic, ()))

        # add matching regexp topic subscribers
        subscribers.extend(s for s in self.subscriptions.get(None, ())
                           if s.topic.match(topic))

        for candidate in subscribers:
            predicate = candidate.predicate
            if predicate is None or passes(msg, predicate):
                try:
                    candidate.callback(topic, msg)
                except Exception as e:
                    log.warning('callback-failed', e=repr(e), topic=topic)
Zsolt Haraszti0df86c12016-11-03 23:03:35 -0700115
116
# Process-wide bus instance, used by EventBusClient when no bus is passed in.
default_bus = EventBus()
118
119
class EventBusClient(object):
    """
    Primary interface to the EventBus. Usage:

    Publish:
    >>> events = EventBusClient()
    >>> msg = dict(a=1, b='foo')
    >>> events.publish('a.topic', msg)

    Subscribe to get all messages on specific topic:
    >>> def got_event(topic, msg):
    ...     print('%s: %s' % (topic, msg))
    >>> events = EventBusClient()
    >>> events.subscribe('a.topic', got_event)

    Subscribe to get messages matching predicate on specific topic:
    >>> def got_event(topic, msg):
    ...     print('%s: %s' % (topic, msg))
    >>> events = EventBusClient()
    >>> events.subscribe('a.topic', got_event, lambda msg: len(msg) < 100)

    Use a DeferredQueue to buffer incoming messages
    >>> queue = DeferredQueue()
    >>> events = EventBusClient()
    >>> events.subscribe('a.topic', lambda _, msg: queue.put(msg))

    """
    def __init__(self, bus=None):
        """
        Obtain a client interface for the pub/sub event bus.
        :param bus: An optional specific event bus. Intended mainly for test
        use. If not provided, the process default bus will be used, which is
        the preferred use (a process shall not need more than one bus).
        """
        # Compare against None explicitly so that an explicitly supplied bus
        # is honoured even if the object happens to evaluate as falsy.
        self.bus = bus if bus is not None else default_bus

    def publish(self, topic, msg):
        """
        Publish given msg to given topic.
        :param topic: String topic
        :param msg: Arbitrary python data as message
        :return: None
        """
        self.bus.publish(topic, msg)

    def subscribe(self, topic, callback, predicate=None):
        """
        Subscribe to given topic with predicate and register the callback
        :param topic: String topic (explicit) or regexp based topic filter.
        :param callback: Callback method with signature def func(topic, msg)
        :param predicate: Optional method/function with signature
        def predicate(msg)
        :return: Subscription object which can be used to unsubscribe
        """
        return self.bus.subscribe(topic, callback, predicate)

    def unsubscribe(self, subscription):
        """
        Remove given subscription
        :param subscription: subscription object as was returned by subscribe
        :return: None
        """
        return self.bus.unsubscribe(subscription)

    def list_subscribers(self, topic=None):
        """
        Return list of subscribers. If topic is provided, it is filtered for
        those subscribing to the topic.
        :param topic: Optional topic
        :return: List of subscriptions
        """
        return self.bus.list_subscribers(topic)
191 return self.bus.list_subscribers(topic)