blob: 2b4257acbd8cfbc6134ad55f8506de86a59cc8e4 [file] [log] [blame]
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Haraszti023ea7c2016-10-16 19:30:34 -07003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from twisted.internet.defer import Deferred
17from twisted.internet.defer import succeed
18
19
20class MessageQueue(object):
21 """
22 An event driven queue, similar to twisted.internet.defer.DeferredQueue
23 but which allows selective dequeing based on a predicate function.
24 Unlike DeferredQueue, there is no limit on backlog, and there is no queue
25 limit.
26 """
27
28 def __init__(self):
29 self.waiting = [] # tuples of (d, predicate)
30 self.queue = [] # messages piling up here if no one is waiting
31
32 def reset(self):
33 """
34 Purge all content as well as waiters (by errback-ing their entries).
35 :return: None
36 """
37 for d, _ in self.waiting:
38 d.errback(Exception('mesage queue reset() was called'))
39 self.waiting = []
40 self.queue = []
41
42 def _cancelGet(self, d):
43 """
44 Remove a deferred from our waiting list.
45 :param d: The deferred that was been canceled.
46 :return: None
47 """
48 for i in range(len(self.waiting)):
49 if self.waiting[i][0] is d:
50 self.waiting.pop(i)
51
52 def put(self, obj):
53 """
54 Add an object to this queue
55 :param obj: arbitrary object that will be added to the queue
56 :return:
57 """
58
59 # if someone is waiting for this, return right away
60 for i in range(len(self.waiting)):
61 d, predicate = self.waiting[i]
62 if predicate is None or predicate(obj):
63 self.waiting.pop(i)
64 d.callback(obj)
65 return
66
67 # otherwise...
68 self.queue.append(obj)
69
70 def get(self, predicate=None):
71 """
72 Attempt to retrieve and remove an object from the queue that
73 matches the optional predicate.
74 :return: Deferred which fires with the next object available.
75 If predicate was provided, only objects for which
76 predicate(obj) is True will be considered.
77 """
78 for i in range(len(self.queue)):
79 msg = self.queue[i]
80 if predicate is None or predicate(msg):
81 self.queue.pop(i)
82 return succeed(msg)
83
84 # there were no matching entries if we got here, so we wait
85 d = Deferred(canceller=self._cancelGet)
86 self.waiting.append((d, predicate))
87 return d
88
89