blob: 8c903d9cb2dfdbbba28533af73b6407538b09c1a [file] [log] [blame]
Zsolt Haraszti0df86c12016-11-03 23:03:35 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti0df86c12016-11-03 23:03:35 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18A simple internal pub/sub event bus with topics and filter-based registration.
19"""
20import re
21
22import structlog
23
24
25log = structlog.get_logger()
26
27
28class _Subscription(object):
29
30 __slots__ = ('bus', 'predicate', 'callback', 'topic')
31 def __init__(self, bus, predicate, callback, topic=None):
32 self.bus = bus
33 self.predicate = predicate
34 self.callback = callback
35 self.topic = topic
36
37
38class EventBus(object):
39
40 def __init__(self):
41 self.subscriptions = {} # topic -> list of _Subscription objects
42 # topic None holds regexp based topic subs.
43 self.subs_topic_map = {} # to aid fast lookup when unsubscribing
44
45 def list_subscribers(self, topic=None):
46 if topic is None:
47 return sum(self.subscriptions.itervalues(), [])
48 else:
49 if topic in self.subscriptions:
50 return self.subscriptions[topic]
51 else:
52 return []
53
54 @staticmethod
55 def _get_topic_key(topic):
56 if isinstance(topic, str):
57 return topic
58 elif hasattr(topic, 'match'):
59 return None
60 else:
61 raise AttributeError('topic not a string nor a compiled regex')
62
63 def subscribe(self, topic, callback, predicate=None):
64 """
65 Subscribe to given topic with predicate and register the callback
66 :param topic: String topic (explicit) or regexp based topic filter.
67 :param callback: Callback method with signature def func(topic, msg)
68 :param predicate: Optional method/function signature def predicate(msg)
69 :return: Subscription object which can be used to unsubscribe
70 """
71 subscription = _Subscription(self, predicate, callback, topic)
72 topic_key = self._get_topic_key(topic)
73 self.subscriptions.setdefault(topic_key, []).append(subscription)
74 self.subs_topic_map[subscription] = topic_key
75 return subscription
76
77 def unsubscribe(self, subscription):
78 """
79 Remove given subscription
80 :param subscription: subscription object as was returned by subscribe
81 :return: None
82 """
83 topic_key = self.subs_topic_map[subscription]
84 self.subscriptions[topic_key].remove(subscription)
85
86 def publish(self, topic, msg):
87 """
88 Publish given message to all subscribers registered with topic taking
89 the predicate functions into account.
90 :param topic: String topic
91 :param msg: Arbitrary python data as message
92 :return: None
93 """
94
95 def passes(msg, predicate):
96 try:
97 return predicate(msg)
98 except Exception, e:
99 return False # failed predicate function treated as no match
100
101 # lookup subscribers with explicit topic subscriptions
102 subscribers = self.subscriptions.get(topic, [])
103
104 # add matching regexp topic subscribers
105 subscribers.extend(s for s in self.subscriptions.get(None, [])
106 if s.topic.match(topic))
107
108 for candidate in subscribers:
109 predicate = candidate.predicate
110 if predicate is None or passes(msg, predicate):
111 try:
112 candidate.callback(topic, msg)
113 except Exception, e:
khenaidoo032d3302017-06-09 14:50:04 -0400114 log.exception('callback-failed', e=repr(e), topic=topic)
115
Zsolt Haraszti0df86c12016-11-03 23:03:35 -0700116
117
118default_bus = EventBus()
119
120
121class EventBusClient(object):
122 """
123 Primary interface to the EventBus. Usage:
124
125 Publish:
126 >>> events = EventBusClient()
127 >>> msg = dict(a=1, b='foo')
128 >>> events.publish('a.topic', msg)
129
130 Subscribe to get all messages on specific topic:
131 >>> def got_event(topic, msg):
132 >>> print topic, ':', msg
133 >>> events = EventBusClient()
134 >>> events.subscribe('a.topic', got_event)
135
136 Subscribe to get messages matching predicate on specific topic:
137 >>> def got_event(topic, msg):
138 >>> print topic, ':', msg
139 >>> events = EventBusClient()
140 >>> events.subscribe('a.topic', got_event, lambda msg: msg.len() < 100)
141
142 Use a DeferredQueue to buffer incoming messages
143 >>> queue = DeferredQueue()
144 >>> events = EventBusClient()
145 >>> events.subscribe('a.topic', lambda _, msg: queue.put(msg))
146
147 """
148 def __init__(self, bus=None):
149 """
150 Obtain a client interface for the pub/sub event bus.
151 :param bus: An optional specific event bus. Inteded for mainly test
152 use. If not provided, the process default bus will be used, which is
153 the preferred use (a process shall not need more than one bus).
154 """
155 self.bus = bus or default_bus
156
157 def publish(self, topic, msg):
158 """
159 Publish given msg to given topic.
160 :param topic: String topic
161 :param msg: Arbitrary python data as message
162 :return: None
163 """
164 self.bus.publish(topic, msg)
165
166 def subscribe(self, topic, callback, predicate=None):
167 """
168 Subscribe to given topic with predicate and register the callback
169 :param topic: String topic (explicit) or regexp based topic filter.
170 :param callback: Callback method with signature def func(topic, msg)
171 :param predicate: Optional method/function with signature
172 def predicate(msg)
173 :return: Subscription object which can be used to unsubscribe
174 """
175 return self.bus.subscribe(topic, callback, predicate)
176
177 def unsubscribe(self, subscription):
178 """
179 Remove given subscription
180 :param subscription: subscription object as was returned by subscribe
181 :return: None
182 """
183 return self.bus.unsubscribe(subscription)
184
185 def list_subscribers(self, topic=None):
186 """
187 Return list of subscribers. If topci is provided, it is filtered for
188 those subscribing to the topic.
189 :param topic: Optional topic
190 :return: List of subscriptions
191 """
192 return self.bus.list_subscribers(topic)