blob: c833bf0d2081ce7964d30defdc6228329d63d01a [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import re
from mock import Mock
from mock import call
from twisted.internet.defer import DeferredQueue, inlineCallbacks
from twisted.trial.unittest import TestCase
from pyvoltha.common.event_bus import EventBusClient, EventBus
from six.moves import range
class TestEventBus(TestCase):
def test_subscribe(self):
ebc = EventBusClient()
sub = ebc.subscribe('news', lambda msg, topic: None)
self.assertEqual(len(ebc.list_subscribers()), 1)
self.assertEqual(len(ebc.list_subscribers('news')), 1)
self.assertEqual(len(ebc.list_subscribers('other')), 0)
def test_unsubscribe(self):
ebc = EventBusClient(EventBus())
sub = ebc.subscribe('news', lambda msg, topic: None)
ebc.unsubscribe(sub)
self.assertEqual(ebc.list_subscribers(), [])
self.assertEqual(ebc.list_subscribers('news'), [])
def test_simple_publish(self):
ebc = EventBusClient(EventBus())
mock = Mock()
ebc.subscribe('news', mock)
ebc.publish('news', 'message')
self.assertEqual(mock.call_count, 1)
mock.assert_called_with('news', 'message')
def test_topic_filtering(self):
ebc = EventBusClient(EventBus())
mock = Mock()
ebc.subscribe('news', mock)
ebc.publish('news', 'msg1')
ebc.publish('alerts', 'msg2')
ebc.publish('logs', 'msg3')
self.assertEqual(mock.call_count, 1)
mock.assert_called_with('news', 'msg1')
def test_multiple_subscribers(self):
ebc = EventBusClient(EventBus())
mock1 = Mock()
ebc.subscribe('news', mock1)
mock2 = Mock()
ebc.subscribe('alerts', mock2)
mock3 = Mock()
ebc.subscribe('logs', mock3)
mock4 = Mock()
ebc.subscribe('logs', mock4)
ebc.publish('news', 'msg1')
ebc.publish('alerts', 'msg2')
ebc.publish('logs', 'msg3')
self.assertEqual(mock1.call_count, 1)
mock1.assert_called_with('news', 'msg1')
self.assertEqual(mock2.call_count, 1)
mock2.assert_called_with('alerts', 'msg2')
self.assertEqual(mock3.call_count, 1)
mock3.assert_called_with('logs', 'msg3')
self.assertEqual(mock4.call_count, 1)
mock4.assert_called_with('logs', 'msg3')
def test_predicates(self):
ebc = EventBusClient(EventBus())
get_foos = Mock()
ebc.subscribe('', get_foos, lambda msg: msg.startswith('foo'))
get_bars = Mock()
ebc.subscribe('', get_bars, lambda msg: msg.endswith('bar'))
get_all = Mock()
ebc.subscribe('', get_all)
get_none = Mock()
ebc.subscribe('', get_none, lambda msg: msg.find('zoo') >= 0)
errored = Mock()
ebc.subscribe('', errored, lambda msg: 1/0)
ebc.publish('', 'foo')
ebc.publish('', 'foobar')
ebc.publish('', 'bar')
c = call
self.assertEqual(get_foos.call_count, 2)
get_foos.assert_has_calls([c('', 'foo'), c('', 'foobar')])
self.assertEqual(get_bars.call_count, 2)
get_bars.assert_has_calls([c('', 'foobar'), c('', 'bar')])
self.assertEqual(get_all.call_count, 3)
get_all.assert_has_calls([c('', 'foo'), c('', 'foobar'), c('', 'bar')])
get_none.assert_not_called()
errored.assert_not_called()
def test_wildcard_topic(self):
ebc = EventBusClient(EventBus())
subs = []
wildcard_sub = Mock()
subs.append(ebc.subscribe(re.compile(r'.*'), wildcard_sub))
prefix_sub = Mock()
subs.append(ebc.subscribe(re.compile(r'ham.*'), prefix_sub))
contains_sub = Mock()
subs.append(ebc.subscribe(re.compile(r'.*burg.*'), contains_sub))
ebc.publish('news', 1)
ebc.publish('hamsters', 2)
ebc.publish('hamburgers', 3)
ebc.publish('nonsense', 4)
c = call
self.assertEqual(wildcard_sub.call_count, 4)
wildcard_sub.assert_has_calls([
c('news', 1),
c('hamsters', 2),
c('hamburgers', 3),
c('nonsense', 4)])
self.assertEqual(prefix_sub.call_count, 2)
prefix_sub.assert_has_calls([
c('hamsters', 2),
c('hamburgers', 3)])
self.assertEqual(contains_sub.call_count, 1)
contains_sub.assert_has_calls([c('hamburgers', 3)])
for sub in subs:
ebc.unsubscribe(sub)
self.assertEqual(ebc.list_subscribers(), [])
@inlineCallbacks
def test_deferred_queue_receiver(self):
ebc = EventBus()
queue = DeferredQueue()
ebc.subscribe('', lambda _, msg: queue.put(msg))
for i in range(10):
ebc.publish('', i)
self.assertEqual(len(queue.pending), 10)
for i in range(10):
msg = yield queue.get()
self.assertEqual(msg, i)
self.assertEqual(len(queue.pending), 0)
def test_subscribers_that_unsubscribe_when_called(self):
# VOL-943 bug fix check
ebc = EventBusClient(EventBus())
class UnsubscribeWhenCalled(object):
def __init__(self):
self.subscription = ebc.subscribe('news', self.unsubscribe)
self.called = False
def unsubscribe(self, _topic, _msg):
self.called = True
ebc.unsubscribe(self.subscription)
ebc1 = UnsubscribeWhenCalled()
ebc2 = UnsubscribeWhenCalled()
ebc3 = UnsubscribeWhenCalled()
ebc.publish('news', 'msg1')
self.assertTrue(ebc1.called)
self.assertTrue(ebc2.called)
self.assertTrue(ebc3.called)