blob: 016b873efd97b6d67588f36dc32e932f83b2abdf [file] [log] [blame]
Chetan Gaonkercfcce782016-05-10 10:10:42 -07001#
2# Copyright 2016-present Ciena Corporation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
Chetan Gaonkercd86bdd2016-03-17 00:08:12 -070016import threading
17import Queue
18
class PoolThread(threading.Thread):
    """Worker thread that executes callables pulled from a shared queue.

    The thread starts itself at the end of construction and keeps polling
    the queue until its ``_finished`` event is set.
    """

    def __init__(self, requests_queue, wait_timeout, daemon, **kwds):
        """Create the worker and start it immediately.

        Args:
            requests_queue: queue the worker takes callables from; it must
                provide ``get(block=..., timeout=...)`` and ``task_done()``.
            wait_timeout: seconds to block on an empty queue before the
                stop flag is checked again.
            daemon: value assigned to the thread's ``daemon`` attribute.
            **kwds: forwarded unchanged to ``threading.Thread.__init__``.
        """
        # Imported here, in the creating thread, so the worker thread never
        # has to perform an import of its own while it is running.
        import traceback

        threading.Thread.__init__(self, **kwds)
        self.daemon = daemon
        self._queue = requests_queue
        self._wait_timeout = wait_timeout
        # Setting this event asks the worker to leave its loop.
        self._finished = threading.Event()
        self._report_failure = traceback.print_exc
        # Must stay last: run() uses every attribute assigned above.
        self.start()

    def run(self):
        """Execute queued callables until the stop flag is set.

        A task that raises is reported on stderr and the worker carries on
        with the next task; ``task_done()`` is called for every task taken
        from the queue, whether it succeeded or not.
        """
        while not self._finished.is_set():
            try:
                work = self._queue.get(block=True, timeout=self._wait_timeout)
            except Queue.Empty:
                # Nothing arrived within the timeout; re-check the stop flag.
                continue

            try:
                work()
            except Exception:
                # Keep the worker alive: letting the exception escape would
                # end this thread and permanently shrink the pool, leaving
                # queued work unconsumed once every worker has died.
                self._report_failure()
            finally:
                self._queue.task_done()
43
44
45
class ThreadPool(object):
    """Fixed-size pool of PoolThread workers fed from one shared queue."""

    def __init__(self, pool_size, daemon=False, queue_size=0, wait_timeout=5):
        """Set up the thread pool and create pool_size threads.

        Args:
            pool_size: number of worker threads to create.
            daemon: daemon flag handed to every worker thread.
            queue_size: ``maxsize`` of the task queue; per the Queue module
                a value of 0 or less makes the queue unbounded.
            wait_timeout: seconds a worker blocks on an empty queue before
                it checks its stop flag again.
        """
        self._queue = Queue.Queue(queue_size)
        self._daemon = daemon
        self._threads = []
        self._pool_size = pool_size
        self._wait_timeout = wait_timeout
        self.createThreads()

    def addTask(self, callableObject):
        """Queue callableObject for execution by one of the workers.

        Blocks while the queue is full. An object that is not callable is
        silently ignored.
        """
        if callable(callableObject):
            self._queue.put(callableObject, block=True)

    def cleanUpThreads(self):
        """Wait for all queued tasks, then stop and join every worker."""
        # Every task queued so far must be marked done before workers stop.
        self._queue.join()

        # Signal all workers first so they wind down in parallel.
        for worker in self._threads:
            worker._finished.set()

        # Join them so no worker is still running once this returns; an idle
        # worker may take up to wait_timeout seconds to notice the flag.
        for worker in self._threads:
            worker.join()

    def createThreads(self):
        """Create pool_size workers; each one starts itself on creation."""
        for _ in range(self._pool_size):
            self._threads.append(
                PoolThread(self._queue, self._wait_timeout, self._daemon))
73
74
class CallObject(object):
    """Demo task holder whose callback prints the value it was given."""

    def __init__(self, v=0):
        """Store v, the number reported by callCb."""
        self.v = v

    def callCb(self):
        """Print a line identifying this object by its stored value."""
        # Parenthesised single-argument form: prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print('Inside callback for %d' % self.v)
80
if __name__ == '__main__':
    import multiprocessing

    # Demo: run 40 callbacks through a pool of twice as many workers as
    # there are CPUs, cycling through one CallObject per worker.
    worker_count = multiprocessing.cpu_count() * 2
    # range (not xrange) so the loop is valid on both Python 2 and Python 3.
    callList = [CallObject(i) for i in range(worker_count)]
    tp = ThreadPool(worker_count, queue_size=1, wait_timeout=1)
    for i in range(40):
        # queue_size=1 makes addTask block until a worker takes the
        # previously queued task.
        tp.addTask(callList[i % worker_count].callCb)

    tp.cleanUpThreads()
94
95