#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16import re
17
18from mock import Mock
19from mock import call
20from twisted.internet.defer import DeferredQueue, inlineCallbacks
21from twisted.trial.unittest import TestCase
22
Zsolt Harasztid70cd4d2016-11-03 23:23:36 -070023from common.event_bus import EventBusClient, EventBus
Zsolt Haraszti0df86c12016-11-03 23:03:35 -070024
25
class TestEventBus(TestCase):
    """Unit tests for EventBus / EventBusClient subscribe and publish.

    Throughout these tests a subscriber callback is expected to be invoked
    as ``callback(topic, msg)``.
    """

    def test_subscribe(self):
        """A single subscription is listed overall and under its topic."""
        ebc = EventBusClient()
        sub = ebc.subscribe('news', lambda topic, msg: None)
        # This client is created without an explicit bus. If it is backed by
        # a bus shared with other clients (cannot tell from this file), a
        # lingering subscription would leak into other tests, so always
        # remove it again once this test finishes.
        self.addCleanup(ebc.unsubscribe, sub)

        self.assertEqual(len(ebc.list_subscribers()), 1)
        self.assertEqual(len(ebc.list_subscribers('news')), 1)
        self.assertEqual(len(ebc.list_subscribers('other')), 0)

    def test_unsubscribe(self):
        """Unsubscribing removes the subscription from every listing."""
        ebc = EventBusClient(EventBus())
        sub = ebc.subscribe('news', lambda topic, msg: None)
        ebc.unsubscribe(sub)

        self.assertEqual(ebc.list_subscribers(), [])
        self.assertEqual(ebc.list_subscribers('news'), [])

    def test_simple_publish(self):
        """A published message reaches the subscriber of that topic once."""
        ebc = EventBusClient(EventBus())

        mock = Mock()
        ebc.subscribe('news', mock)

        ebc.publish('news', 'message')

        mock.assert_called_once_with('news', 'message')

    def test_topic_filtering(self):
        """A subscriber only receives messages published on its own topic."""
        ebc = EventBusClient(EventBus())

        mock = Mock()
        ebc.subscribe('news', mock)

        ebc.publish('news', 'msg1')
        ebc.publish('alerts', 'msg2')
        ebc.publish('logs', 'msg3')

        mock.assert_called_once_with('news', 'msg1')

    def test_multiple_subscribers(self):
        """Each subscriber gets exactly the messages of its topic.

        Two subscribers share the 'logs' topic and both must be notified.
        """
        ebc = EventBusClient(EventBus())

        mock1 = Mock()
        ebc.subscribe('news', mock1)

        mock2 = Mock()
        ebc.subscribe('alerts', mock2)

        mock3 = Mock()
        ebc.subscribe('logs', mock3)

        mock4 = Mock()
        ebc.subscribe('logs', mock4)

        ebc.publish('news', 'msg1')
        ebc.publish('alerts', 'msg2')
        ebc.publish('logs', 'msg3')

        mock1.assert_called_once_with('news', 'msg1')
        mock2.assert_called_once_with('alerts', 'msg2')
        mock3.assert_called_once_with('logs', 'msg3')
        mock4.assert_called_once_with('logs', 'msg3')

    def test_predicates(self):
        """A predicate passed to subscribe() filters delivered messages.

        All subscribers listen on the same (empty-string) topic and differ
        only in their predicate.
        """
        ebc = EventBusClient(EventBus())

        get_foos = Mock()
        ebc.subscribe('', get_foos, lambda msg: msg.startswith('foo'))

        get_bars = Mock()
        ebc.subscribe('', get_bars, lambda msg: msg.endswith('bar'))

        # No predicate at all: expected to receive every message.
        get_all = Mock()
        ebc.subscribe('', get_all)

        # Predicate that matches none of the published messages.
        get_none = Mock()
        ebc.subscribe('', get_none, lambda msg: msg.find('zoo') >= 0)

        # Predicate that always raises (ZeroDivisionError); the callback is
        # expected never to be invoked and publish() must not propagate it.
        errored = Mock()
        ebc.subscribe('', errored, lambda msg: 1/0)

        ebc.publish('', 'foo')
        ebc.publish('', 'foobar')
        ebc.publish('', 'bar')

        self.assertEqual(get_foos.call_count, 2)
        get_foos.assert_has_calls([call('', 'foo'), call('', 'foobar')])

        self.assertEqual(get_bars.call_count, 2)
        get_bars.assert_has_calls([call('', 'foobar'), call('', 'bar')])

        self.assertEqual(get_all.call_count, 3)
        get_all.assert_has_calls(
            [call('', 'foo'), call('', 'foobar'), call('', 'bar')])

        get_none.assert_not_called()

        errored.assert_not_called()

    def test_wildcard_topic(self):
        """Compiled regular expressions can be used as topic subscriptions."""
        ebc = EventBusClient(EventBus())
        subs = []

        wildcard_sub = Mock()
        subs.append(ebc.subscribe(re.compile(r'.*'), wildcard_sub))

        prefix_sub = Mock()
        subs.append(ebc.subscribe(re.compile(r'ham.*'), prefix_sub))

        contains_sub = Mock()
        subs.append(ebc.subscribe(re.compile(r'.*burg.*'), contains_sub))

        ebc.publish('news', 1)
        ebc.publish('hamsters', 2)
        ebc.publish('hamburgers', 3)
        ebc.publish('nonsense', 4)

        self.assertEqual(wildcard_sub.call_count, 4)
        wildcard_sub.assert_has_calls([
            call('news', 1),
            call('hamsters', 2),
            call('hamburgers', 3),
            call('nonsense', 4),
        ])

        self.assertEqual(prefix_sub.call_count, 2)
        prefix_sub.assert_has_calls([
            call('hamsters', 2),
            call('hamburgers', 3),
        ])

        contains_sub.assert_called_once_with('hamburgers', 3)

        # Pattern-based subscriptions must be removable like plain ones.
        for sub in subs:
            ebc.unsubscribe(sub)

        self.assertEqual(ebc.list_subscribers(), [])

    @inlineCallbacks
    def test_deferred_queue_receiver(self):
        """Messages can be funneled into a DeferredQueue and read in order.

        This test talks to an EventBus directly rather than through an
        EventBusClient.
        """
        bus = EventBus()

        queue = DeferredQueue()

        bus.subscribe('', lambda _, msg: queue.put(msg))

        # range() rather than xrange() so the test also runs on Python 3.
        for i in range(10):
            bus.publish('', i)

        # Nothing has consumed from the queue yet, so all ten are buffered.
        self.assertEqual(len(queue.pending), 10)
        for i in range(10):
            msg = yield queue.get()
            self.assertEqual(msg, i)
        self.assertEqual(len(queue.pending), 0)