blob: e717c1652ab2f8444397cc8fee5ca42ec2493356 [file] [log] [blame]
Zsolt Haraszti0df86c12016-11-03 23:03:35 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti0df86c12016-11-03 23:03:35 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18A simple internal pub/sub event bus with topics and filter-based registration.
19"""
20import re
21
22import structlog
23
24
25log = structlog.get_logger()
26
27
28class _Subscription(object):
29
30 __slots__ = ('bus', 'predicate', 'callback', 'topic')
31 def __init__(self, bus, predicate, callback, topic=None):
32 self.bus = bus
33 self.predicate = predicate
34 self.callback = callback
35 self.topic = topic
36
37
38class EventBus(object):
39
40 def __init__(self):
41 self.subscriptions = {} # topic -> list of _Subscription objects
42 # topic None holds regexp based topic subs.
43 self.subs_topic_map = {} # to aid fast lookup when unsubscribing
44
45 def list_subscribers(self, topic=None):
46 if topic is None:
47 return sum(self.subscriptions.itervalues(), [])
48 else:
49 if topic in self.subscriptions:
50 return self.subscriptions[topic]
51 else:
52 return []
53
54 @staticmethod
55 def _get_topic_key(topic):
56 if isinstance(topic, str):
57 return topic
58 elif hasattr(topic, 'match'):
59 return None
60 else:
61 raise AttributeError('topic not a string nor a compiled regex')
62
63 def subscribe(self, topic, callback, predicate=None):
64 """
65 Subscribe to given topic with predicate and register the callback
66 :param topic: String topic (explicit) or regexp based topic filter.
67 :param callback: Callback method with signature def func(topic, msg)
68 :param predicate: Optional method/function signature def predicate(msg)
69 :return: Subscription object which can be used to unsubscribe
70 """
71 subscription = _Subscription(self, predicate, callback, topic)
72 topic_key = self._get_topic_key(topic)
73 self.subscriptions.setdefault(topic_key, []).append(subscription)
74 self.subs_topic_map[subscription] = topic_key
75 return subscription
76
77 def unsubscribe(self, subscription):
78 """
79 Remove given subscription
80 :param subscription: subscription object as was returned by subscribe
81 :return: None
82 """
83 topic_key = self.subs_topic_map[subscription]
84 self.subscriptions[topic_key].remove(subscription)
85
86 def publish(self, topic, msg):
87 """
88 Publish given message to all subscribers registered with topic taking
89 the predicate functions into account.
90 :param topic: String topic
91 :param msg: Arbitrary python data as message
92 :return: None
93 """
Chip Boling170630c2018-06-11 12:03:55 -050094 from copy import copy
Zsolt Haraszti0df86c12016-11-03 23:03:35 -070095
96 def passes(msg, predicate):
97 try:
98 return predicate(msg)
99 except Exception, e:
100 return False # failed predicate function treated as no match
101
102 # lookup subscribers with explicit topic subscriptions
103 subscribers = self.subscriptions.get(topic, [])
104
105 # add matching regexp topic subscribers
106 subscribers.extend(s for s in self.subscriptions.get(None, [])
107 if s.topic.match(topic))
108
Chip Boling170630c2018-06-11 12:03:55 -0500109 # iterate over a shallow-copy of subscribers
110 for candidate in copy(subscribers):
Zsolt Haraszti0df86c12016-11-03 23:03:35 -0700111 predicate = candidate.predicate
112 if predicate is None or passes(msg, predicate):
113 try:
114 candidate.callback(topic, msg)
115 except Exception, e:
khenaidoo032d3302017-06-09 14:50:04 -0400116 log.exception('callback-failed', e=repr(e), topic=topic)
117
Zsolt Haraszti0df86c12016-11-03 23:03:35 -0700118
119
120default_bus = EventBus()
121
122
123class EventBusClient(object):
124 """
125 Primary interface to the EventBus. Usage:
126
127 Publish:
128 >>> events = EventBusClient()
129 >>> msg = dict(a=1, b='foo')
130 >>> events.publish('a.topic', msg)
131
132 Subscribe to get all messages on specific topic:
133 >>> def got_event(topic, msg):
134 >>> print topic, ':', msg
135 >>> events = EventBusClient()
136 >>> events.subscribe('a.topic', got_event)
137
138 Subscribe to get messages matching predicate on specific topic:
139 >>> def got_event(topic, msg):
140 >>> print topic, ':', msg
141 >>> events = EventBusClient()
142 >>> events.subscribe('a.topic', got_event, lambda msg: msg.len() < 100)
143
144 Use a DeferredQueue to buffer incoming messages
145 >>> queue = DeferredQueue()
146 >>> events = EventBusClient()
147 >>> events.subscribe('a.topic', lambda _, msg: queue.put(msg))
148
149 """
150 def __init__(self, bus=None):
151 """
152 Obtain a client interface for the pub/sub event bus.
153 :param bus: An optional specific event bus. Inteded for mainly test
154 use. If not provided, the process default bus will be used, which is
155 the preferred use (a process shall not need more than one bus).
156 """
157 self.bus = bus or default_bus
158
159 def publish(self, topic, msg):
160 """
161 Publish given msg to given topic.
162 :param topic: String topic
163 :param msg: Arbitrary python data as message
164 :return: None
165 """
166 self.bus.publish(topic, msg)
167
168 def subscribe(self, topic, callback, predicate=None):
169 """
170 Subscribe to given topic with predicate and register the callback
171 :param topic: String topic (explicit) or regexp based topic filter.
172 :param callback: Callback method with signature def func(topic, msg)
173 :param predicate: Optional method/function with signature
174 def predicate(msg)
175 :return: Subscription object which can be used to unsubscribe
176 """
177 return self.bus.subscribe(topic, callback, predicate)
178
179 def unsubscribe(self, subscription):
180 """
181 Remove given subscription
182 :param subscription: subscription object as was returned by subscribe
183 :return: None
184 """
185 return self.bus.unsubscribe(subscription)
186
187 def list_subscribers(self, topic=None):
188 """
189 Return list of subscribers. If topci is provided, it is filtered for
190 those subscribing to the topic.
191 :param topic: Optional topic
192 :return: List of subscriptions
193 """
194 return self.bus.list_subscribers(topic)