blob: bd3e21ede7fd7995e761b457243f0f2bd43bc89a [file] [log] [blame]
Chetan Gaonkercd86bdd2016-03-17 00:08:12 -07001import threading
2import Queue
3
class PoolThread(threading.Thread):
    """Worker thread that executes callables taken from a shared queue.

    The worker starts itself at the end of construction and keeps polling
    the queue until its ``_finished`` event is set.
    """

    def __init__(self, requests_queue, wait_timeout, daemon, **kwds):
        """Create the worker and start it immediately.

        requests_queue -- queue the worker takes callables from
        wait_timeout   -- seconds to block on an empty queue before the
                          stop flag is checked again
        daemon         -- value assigned to the thread's ``daemon`` flag
        kwds           -- extra keyword arguments passed to threading.Thread
        """
        super(PoolThread, self).__init__(**kwds)
        self.daemon = daemon
        self._queue = requests_queue
        self._wait_timeout = wait_timeout
        # Set by the owner of the thread to ask run() to return.
        self._finished = threading.Event()
        self.start()

    def run(self):
        """Run queued callables until ``_finished`` is set.

        A callable that raises has its traceback printed to stderr and the
        worker keeps going; letting the exception escape would end the
        thread and shrink the pool for the rest of its lifetime.
        """
        # Function-scope import: keeps the change local to this class.
        import traceback

        while not self._finished.is_set():
            try:
                # Bounded wait so the stop flag is re-checked periodically.
                work = self._queue.get(block=True, timeout=self._wait_timeout)
            except Queue.Empty:
                continue

            try:
                work()
            except Exception:
                traceback.print_exc()
            finally:
                # Always balance get() so Queue.join() can return.
                self._queue.task_done()
28
29
30
class ThreadPool(object):
    """Fixed-size pool of PoolThread workers fed from one shared queue."""

    def __init__(self, pool_size, daemon=False, queue_size=0, wait_timeout=5):
        """Set up the thread pool and create pool_size threads

        pool_size    -- number of worker threads to start
        daemon       -- daemon flag handed to every worker
        queue_size   -- maxsize given to the task queue
        wait_timeout -- seconds a worker blocks on an empty queue per poll
        """
        self._queue = Queue.Queue(queue_size)
        self._daemon = daemon
        self._threads = []
        self._pool_size = pool_size
        self._wait_timeout = wait_timeout
        self.createThreads()

    def addTask(self, callableObject):
        """Queue callableObject for execution by one of the workers.

        Blocks while the queue is full. An object that is not callable is
        ignored without raising.
        """
        if callable(callableObject):
            self._queue.put(callableObject, block=True)

    def cleanUpThreads(self):
        """Wait for every queued task, then stop and join all workers.

        Workers only notice the stop flag between queue polls, so this call
        can take up to wait_timeout seconds after the last task finishes.
        """
        self._queue.join()

        # Signal everyone first so the workers wind down in parallel.
        for worker in self._threads:
            worker._finished.set()

        # Do not return while worker threads are still running.
        for worker in self._threads:
            worker.join()

    def createThreads(self):
        """Start pool_size workers and keep them in self._threads."""
        for _ in range(self._pool_size):
            self._threads.append(
                PoolThread(self._queue, self._wait_timeout, self._daemon))
58
59
class CallObject(object):
    """Demo task holder: stores a value and reports it from callCb."""

    def __init__(self, v = 0):
        """Remember v, the number shown by callCb (defaults to 0)."""
        self.v = v

    def callCb(self):
        """Print the callback message containing this object's value."""
        # A single parenthesised argument gives the same output with the
        # Python 2 print statement and the Python 3 print function.
        print('Inside callback for %d' % self.v)
65
if __name__ == '__main__':
    import multiprocessing

    # Demo run: two workers per CPU, one CallObject per worker, and 40
    # callbacks handed out to the objects in round-robin order.
    worker_total = multiprocessing.cpu_count() * 2
    callList = [CallObject(idx) for idx in range(worker_total)]
    tp = ThreadPool(worker_total, queue_size=1, wait_timeout=1)
    for task_no in range(40):
        tp.addTask(callList[task_no % worker_total].callCb)

    # Block until every queued callback has run, then stop the workers.
    tp.cleanUpThreads()
79
80