blob: c833bf0d2081ce7964d30defdc6228329d63d01a [file] [log] [blame]
Chip Boling67b674a2019-02-08 11:42:18 -06001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
Zack Williams84a71e92019-11-15 09:00:19 -070016from __future__ import absolute_import
Chip Boling67b674a2019-02-08 11:42:18 -060017import re
18
19from mock import Mock
20from mock import call
21from twisted.internet.defer import DeferredQueue, inlineCallbacks
22from twisted.trial.unittest import TestCase
23
24from pyvoltha.common.event_bus import EventBusClient, EventBus
Zack Williams84a71e92019-11-15 09:00:19 -070025from six.moves import range
Chip Boling67b674a2019-02-08 11:42:18 -060026
27
28class TestEventBus(TestCase):
29
30 def test_subscribe(self):
31
32 ebc = EventBusClient()
33 sub = ebc.subscribe('news', lambda msg, topic: None)
34 self.assertEqual(len(ebc.list_subscribers()), 1)
35 self.assertEqual(len(ebc.list_subscribers('news')), 1)
36 self.assertEqual(len(ebc.list_subscribers('other')), 0)
37
38 def test_unsubscribe(self):
39
40 ebc = EventBusClient(EventBus())
41 sub = ebc.subscribe('news', lambda msg, topic: None)
42 ebc.unsubscribe(sub)
43 self.assertEqual(ebc.list_subscribers(), [])
44 self.assertEqual(ebc.list_subscribers('news'), [])
45
46 def test_simple_publish(self):
47
48 ebc = EventBusClient(EventBus())
49
50 mock = Mock()
51 ebc.subscribe('news', mock)
52
53 ebc.publish('news', 'message')
54
55 self.assertEqual(mock.call_count, 1)
56 mock.assert_called_with('news', 'message')
57
58 def test_topic_filtering(self):
59
60 ebc = EventBusClient(EventBus())
61
62 mock = Mock()
63 ebc.subscribe('news', mock)
64
65 ebc.publish('news', 'msg1')
66 ebc.publish('alerts', 'msg2')
67 ebc.publish('logs', 'msg3')
68
69 self.assertEqual(mock.call_count, 1)
70 mock.assert_called_with('news', 'msg1')
71
72 def test_multiple_subscribers(self):
73
74 ebc = EventBusClient(EventBus())
75
76 mock1 = Mock()
77 ebc.subscribe('news', mock1)
78
79 mock2 = Mock()
80 ebc.subscribe('alerts', mock2)
81
82 mock3 = Mock()
83 ebc.subscribe('logs', mock3)
84
85 mock4 = Mock()
86 ebc.subscribe('logs', mock4)
87
88 ebc.publish('news', 'msg1')
89 ebc.publish('alerts', 'msg2')
90 ebc.publish('logs', 'msg3')
91
92 self.assertEqual(mock1.call_count, 1)
93 mock1.assert_called_with('news', 'msg1')
94
95 self.assertEqual(mock2.call_count, 1)
96 mock2.assert_called_with('alerts', 'msg2')
97
98 self.assertEqual(mock3.call_count, 1)
99 mock3.assert_called_with('logs', 'msg3')
100
101 self.assertEqual(mock4.call_count, 1)
102 mock4.assert_called_with('logs', 'msg3')
103
104 def test_predicates(self):
105
106 ebc = EventBusClient(EventBus())
107
108 get_foos = Mock()
109 ebc.subscribe('', get_foos, lambda msg: msg.startswith('foo'))
110
111 get_bars = Mock()
112 ebc.subscribe('', get_bars, lambda msg: msg.endswith('bar'))
113
114 get_all = Mock()
115 ebc.subscribe('', get_all)
116
117 get_none = Mock()
118 ebc.subscribe('', get_none, lambda msg: msg.find('zoo') >= 0)
119
120 errored = Mock()
121 ebc.subscribe('', errored, lambda msg: 1/0)
122
123 ebc.publish('', 'foo')
124 ebc.publish('', 'foobar')
125 ebc.publish('', 'bar')
126
127 c = call
128
129 self.assertEqual(get_foos.call_count, 2)
130 get_foos.assert_has_calls([c('', 'foo'), c('', 'foobar')])
131
132 self.assertEqual(get_bars.call_count, 2)
133 get_bars.assert_has_calls([c('', 'foobar'), c('', 'bar')])
134
135 self.assertEqual(get_all.call_count, 3)
136 get_all.assert_has_calls([c('', 'foo'), c('', 'foobar'), c('', 'bar')])
137
138 get_none.assert_not_called()
139
140 errored.assert_not_called()
141
142 def test_wildcard_topic(self):
143
144 ebc = EventBusClient(EventBus())
145 subs = []
146
147 wildcard_sub = Mock()
148 subs.append(ebc.subscribe(re.compile(r'.*'), wildcard_sub))
149
150 prefix_sub = Mock()
151 subs.append(ebc.subscribe(re.compile(r'ham.*'), prefix_sub))
152
153 contains_sub = Mock()
154 subs.append(ebc.subscribe(re.compile(r'.*burg.*'), contains_sub))
155
156 ebc.publish('news', 1)
157 ebc.publish('hamsters', 2)
158 ebc.publish('hamburgers', 3)
159 ebc.publish('nonsense', 4)
160
161 c = call
162
163 self.assertEqual(wildcard_sub.call_count, 4)
164 wildcard_sub.assert_has_calls([
165 c('news', 1),
166 c('hamsters', 2),
167 c('hamburgers', 3),
168 c('nonsense', 4)])
169
170 self.assertEqual(prefix_sub.call_count, 2)
171 prefix_sub.assert_has_calls([
172 c('hamsters', 2),
173 c('hamburgers', 3)])
174
175 self.assertEqual(contains_sub.call_count, 1)
176 contains_sub.assert_has_calls([c('hamburgers', 3)])
177
178 for sub in subs:
179 ebc.unsubscribe(sub)
180
181 self.assertEqual(ebc.list_subscribers(), [])
182
183 @inlineCallbacks
184 def test_deferred_queue_receiver(self):
185
186 ebc = EventBus()
187
188 queue = DeferredQueue()
189
190 ebc.subscribe('', lambda _, msg: queue.put(msg))
191
Zack Williams84a71e92019-11-15 09:00:19 -0700192 for i in range(10):
Chip Boling67b674a2019-02-08 11:42:18 -0600193 ebc.publish('', i)
194
195 self.assertEqual(len(queue.pending), 10)
Zack Williams84a71e92019-11-15 09:00:19 -0700196 for i in range(10):
Chip Boling67b674a2019-02-08 11:42:18 -0600197 msg = yield queue.get()
198 self.assertEqual(msg, i)
199 self.assertEqual(len(queue.pending), 0)
200
201 def test_subscribers_that_unsubscribe_when_called(self):
202 # VOL-943 bug fix check
203 ebc = EventBusClient(EventBus())
204
205 class UnsubscribeWhenCalled(object):
206 def __init__(self):
207 self.subscription = ebc.subscribe('news', self.unsubscribe)
208 self.called = False
209
210 def unsubscribe(self, _topic, _msg):
211 self.called = True
212 ebc.unsubscribe(self.subscription)
213
214 ebc1 = UnsubscribeWhenCalled()
215 ebc2 = UnsubscribeWhenCalled()
216 ebc3 = UnsubscribeWhenCalled()
217
218 ebc.publish('news', 'msg1')
219
220 self.assertTrue(ebc1.called)
221 self.assertTrue(ebc2.called)
222 self.assertTrue(ebc3.called)