blob: ec7e81df40dc2c38c273ea96d9aa67ee0f71af52 [file] [log] [blame]
Matteo Scandolo48d3d2d2017-08-08 13:05:27 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
Chetan Gaonkercfcce782016-05-10 10:10:42 -070017#
18# Copyright 2016-present Ciena Corporation
19#
20# Licensed under the Apache License, Version 2.0 (the "License");
21# you may not use this file except in compliance with the License.
22# You may obtain a copy of the License at
23#
24# http://www.apache.org/licenses/LICENSE-2.0
25#
26# Unless required by applicable law or agreed to in writing, software
27# distributed under the License is distributed on an "AS IS" BASIS,
28# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29# See the License for the specific language governing permissions and
30# limitations under the License.
31#
Chetan Gaonkercd86bdd2016-03-17 00:08:12 -070032import threading
33import Queue
34
class PoolThread(threading.Thread):
    """Worker thread that runs callables taken from a shared queue.

    The thread starts itself at the end of construction and keeps polling
    the queue until its ``_finished`` event is set.

    Args:
        requests_queue: Queue-like object yielding zero-argument callables.
            It must provide ``get(block, timeout)`` and ``task_done()``.
        wait_timeout: Seconds to block on an empty queue before the stop
            flag is checked again.
        daemon: Value assigned to the thread's ``daemon`` attribute.
        **kwds: Extra keyword arguments forwarded to ``threading.Thread``.
    """

    def __init__(self, requests_queue, wait_timeout, daemon, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.daemon = daemon
        self._queue = requests_queue
        self._wait_timeout = wait_timeout
        self._finished = threading.Event()
        # Workers begin servicing the queue as soon as they are built.
        self.start()

    def run(self):
        """Execute queued callables until ``_finished`` is set.

        An exception raised by a task is reported on stderr and the worker
        moves on to the next task; ``task_done()`` is always called so that
        ``join()`` on the queue can complete.
        """
        # Local import keeps this block independent of file-level imports.
        import traceback

        while not self._finished.is_set():
            try:
                work = self._queue.get(block=True, timeout=self._wait_timeout)
            except Queue.Empty:
                # Nothing arrived within the timeout; re-check the stop flag.
                continue

            try:
                work()
            except Exception:
                # A failing task must not kill the worker: a dead worker
                # shrinks the pool and can leave queued tasks unserviced,
                # which would block the queue's join() forever.
                traceback.print_exc()
            finally:
                self._queue.task_done()
59
60
61
class ThreadPool(object):
    """Fixed-size pool of ``PoolThread`` workers fed from one shared queue.

    Args:
        pool_size: Number of worker threads created at construction.
        daemon: ``daemon`` flag handed to every worker.
        queue_size: ``maxsize`` passed to ``Queue.Queue``.
        wait_timeout: Seconds a worker blocks on an empty queue before it
            re-checks whether it has been asked to stop.
    """

    def __init__(self, pool_size, daemon=False, queue_size=0, wait_timeout=5):
        """Set up the thread pool and create pool_size threads."""
        self._queue = Queue.Queue(queue_size)
        self._daemon = daemon
        self._threads = []
        self._pool_size = pool_size
        self._wait_timeout = wait_timeout
        self.createThreads()

    def addTask(self, callableObject):
        """Queue ``callableObject`` for execution by a worker.

        Objects that are not callable are ignored. The ``put`` is blocking,
        so the call waits while a bounded queue is full.
        """
        if callable(callableObject):
            self._queue.put(callableObject, block=True)

    def cleanUpThreads(self):
        """Wait for all queued work, then stop and join the workers.

        Workers notice the stop request the next time their queue wait
        times out, so this may block for up to ``wait_timeout`` seconds
        after the queue has drained.
        """
        self._queue.join()

        # Signal every worker first so they all wind down in parallel.
        for worker in self._threads:
            worker._finished.set()

        # Joining makes cleanup deterministic: no worker is still running
        # when this returns. A thread cannot join itself, so skip the
        # caller in case cleanup is triggered from inside a task.
        current = threading.current_thread()
        for worker in self._threads:
            if worker is not current:
                worker.join()

    def createThreads(self):
        """Create ``pool_size`` workers; each starts itself when built."""
        for _ in range(self._pool_size):
            self._threads.append(
                PoolThread(self._queue, self._wait_timeout, self._daemon))
89
90
class CallObject:
    """Sample task holder: stores a value and reports it when called back."""

    def __init__(self, v=0):
        self.v = v

    def callCb(self):
        """Print the stored value; used as a demo task for the pool."""
        message = 'Inside callback for %d' % self.v
        # Parenthesised single-argument form prints the same text as the
        # bare print statement on Python 2.
        print(message)
96
if __name__ == '__main__':
    # Demo: run 40 callbacks through a pool with two workers per CPU core.
    import multiprocessing

    worker_count = multiprocessing.cpu_count() * 2
    callList = [CallObject(index) for index in range(worker_count)]

    # queue_size=1 makes addTask block until a worker frees the slot.
    tp = ThreadPool(worker_count, queue_size=1, wait_timeout=1)
    for task_number in range(40):
        # Cycle through the prepared objects round-robin.
        tp.addTask(callList[task_number % worker_count].callCb)

    tp.cleanUpThreads()
110
111