138 lines
3.4 KiB
Python
138 lines
3.4 KiB
Python
|
|
from __future__ import print_function
|
|
import threading, traceback
|
|
try:
|
|
import Queue as queue
|
|
except ImportError:
|
|
import queue
|
|
|
|
class StoppableThread(threading.Thread):
|
|
"""A thread which can be requested to stop.
|
|
|
|
The thread run() method should periodically call the shouldRun()
|
|
method and return if this yields False.
|
|
|
|
>>> class TestThread(StoppableThread):
|
|
... def __init__(self):
|
|
... super(TestThread,self).__init__()
|
|
... self.E=threading.Event()
|
|
... def run(self):
|
|
... import time
|
|
... self.cur = threading.current_thread()
|
|
... self.E.set()
|
|
... while self.shouldRun():
|
|
... time.sleep(0.01)
|
|
>>> T = TestThread()
|
|
>>> T.start()
|
|
>>> T.E.wait(1.0)
|
|
True
|
|
>>> T.cur is T
|
|
True
|
|
>>> T.join()
|
|
>>> T.is_alive()
|
|
False
|
|
"""
|
|
def __init__(self, max=0):
|
|
super(StoppableThread, self).__init__()
|
|
self.__stop = True
|
|
self.__lock = threading.Lock()
|
|
|
|
def start(self):
|
|
with self.__lock:
|
|
self.__stop = False
|
|
|
|
super(StoppableThread, self).start()
|
|
|
|
def join(self):
|
|
with self.__lock:
|
|
self.__stop = True
|
|
|
|
super(StoppableThread, self).join()
|
|
|
|
def shouldRun(self):
|
|
with self.__lock:
|
|
return not self.__stop
|
|
|
|
class Worker(threading.Thread):
|
|
"""A threaded work queue.
|
|
|
|
>>> w = Worker()
|
|
>>> w = Worker()
|
|
>>> w.start()
|
|
>>> w.join()
|
|
>>> import threading
|
|
>>> E = threading.Event()
|
|
>>> w = Worker()
|
|
>>> w.start()
|
|
>>> w.add(E.set)
|
|
True
|
|
>>> E.wait(1.0)
|
|
True
|
|
>>> w.join()
|
|
"""
|
|
StopWorker = object()
|
|
|
|
def __init__(self, max=0):
|
|
super(Worker, self).__init__()
|
|
self._Q = queue.Queue(maxsize=max)
|
|
self.__stop = False
|
|
self.__lock = threading.Lock()
|
|
|
|
|
|
def join(self, flush=True):
|
|
"""Stop accepting new jobs and join the worker thread
|
|
|
|
Blocks until currently queued work is complete.
|
|
"""
|
|
with self.__lock:
|
|
self.__stop = True
|
|
self._Q.put((self.StopWorker,None,None))
|
|
super(Worker, self).join()
|
|
|
|
def __len__(self):
|
|
return self._Q.qsize()
|
|
|
|
def add(self, func, args=(), kws={}, block=True):
|
|
"""Attempt to send a job to the worker.
|
|
|
|
:returns: True if the job was queued. False if the queue is full,
|
|
or has been joined.
|
|
|
|
When ``block=True`` then this method will only return
|
|
False if the Worker has been joined.
|
|
"""
|
|
with self.__lock:
|
|
if self.__stop is True:
|
|
return False
|
|
try:
|
|
self._Q.put((func,args,kws), block)
|
|
return True
|
|
except queue.Full:
|
|
return False
|
|
|
|
def run(self):
|
|
Q = self._Q
|
|
block = True
|
|
try:
|
|
while True:
|
|
F, A, K = Q.get(block)
|
|
|
|
if F is self.StopWorker:
|
|
block = False
|
|
Q.task_done()
|
|
continue
|
|
|
|
try:
|
|
F(*A, **K)
|
|
except:
|
|
print('Error running',F,A,K)
|
|
traceback.print_exc()
|
|
finally:
|
|
Q.task_done()
|
|
except queue.Empty:
|
|
pass # We are done now
|
|
|
|
if __name__=='__main__':
|
|
import doctest
|
|
doctest.testmod()
|