use Queue for worker
This commit is contained in:
@ -1,6 +1,10 @@
|
|||||||
|
|
||||||
from __future__ import print_function
|
from __future__ import print_function
|
||||||
import threading, traceback
|
import threading, traceback
|
||||||
|
try:
|
||||||
|
import Queue as queue
|
||||||
|
except ImportError:
|
||||||
|
import queue
|
||||||
|
|
||||||
class StoppableThread(threading.Thread):
|
class StoppableThread(threading.Thread):
|
||||||
"""A thread which can be requested to stop.
|
"""A thread which can be requested to stop.
|
||||||
@ -20,7 +24,7 @@ class StoppableThread(threading.Thread):
|
|||||||
... time.sleep(0.01)
|
... time.sleep(0.01)
|
||||||
>>> T = TestThread()
|
>>> T = TestThread()
|
||||||
>>> T.start()
|
>>> T.start()
|
||||||
>>> T.E.wait()
|
>>> T.E.wait(1.0)
|
||||||
True
|
True
|
||||||
>>> T.cur is T
|
>>> T.cur is T
|
||||||
True
|
True
|
||||||
@ -61,83 +65,72 @@ class Worker(threading.Thread):
|
|||||||
>>> w = Worker()
|
>>> w = Worker()
|
||||||
>>> w.start()
|
>>> w.start()
|
||||||
>>> w.add(E.set)
|
>>> w.add(E.set)
|
||||||
|
True
|
||||||
>>> E.wait(1.0)
|
>>> E.wait(1.0)
|
||||||
True
|
True
|
||||||
>>> w.join()
|
>>> w.join()
|
||||||
"""
|
"""
|
||||||
|
StopWorker = object()
|
||||||
|
|
||||||
def __init__(self, max=0):
|
def __init__(self, max=0):
|
||||||
super(Worker, self).__init__()
|
super(Worker, self).__init__()
|
||||||
self.__stop = None
|
self._Q = queue.Queue(maxsize=max)
|
||||||
|
self.__stop = False
|
||||||
self.__lock = threading.Lock()
|
self.__lock = threading.Lock()
|
||||||
self.__update = threading.Condition(self.__lock)
|
|
||||||
self.maxQ, self._Q = max, []
|
|
||||||
|
|
||||||
|
|
||||||
def join(self, flush=True):
|
def join(self, flush=True):
|
||||||
self.__update.acquire()
|
"""Stop accepting new jobs and join the worker thread
|
||||||
try:
|
|
||||||
if self.__stop is not None:
|
|
||||||
raise RuntimeError("Someone else is already trying to stop me")
|
|
||||||
|
|
||||||
self.__stop = threading.Event()
|
Blocks until currently queued work is complete.
|
||||||
self.__update.notify()
|
"""
|
||||||
|
with self.__lock:
|
||||||
self.__update.release()
|
self.__stop = True
|
||||||
try:
|
self._Q.put((self.StopWorker,None,None))
|
||||||
self.__stop.wait()
|
super(Worker, self).join()
|
||||||
finally:
|
|
||||||
self.__update.acquire()
|
|
||||||
|
|
||||||
self.__stop = None
|
|
||||||
|
|
||||||
if flush:
|
|
||||||
self._Q = []
|
|
||||||
|
|
||||||
finally:
|
|
||||||
self.__update.release()
|
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
with self.__lock:
|
return self._Q.qsize()
|
||||||
return len(self._Q)
|
|
||||||
|
|
||||||
def add(self, func, args=(), kws={}):
|
def add(self, func, args=(), kws={}, block=True):
|
||||||
with self.__lock:
|
"""Attempt to send a job to the worker.
|
||||||
if self.__stop is not None:
|
|
||||||
return
|
|
||||||
elif self.maxQ>0 and len(self._Q)>=self.maxQ:
|
|
||||||
raise RuntimeError('Worker queue full')
|
|
||||||
|
|
||||||
self._Q.append((func,args,kws))
|
:returns: True if the job was queued. False if the queue is full,
|
||||||
self.__update.notify()
|
or has been joined.
|
||||||
|
|
||||||
|
When ``block=True`` then this method will only return
|
||||||
|
False if the Worker has been joined.
|
||||||
|
"""
|
||||||
|
with self.__lock:
|
||||||
|
if self.__stop is True:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
self._Q.put((func,args,kws), block)
|
||||||
|
return True
|
||||||
|
except queue.Full:
|
||||||
|
return False
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.__update.acquire()
|
Q = self._Q
|
||||||
|
block = True
|
||||||
try:
|
try:
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
F, A, K = Q.get(block)
|
||||||
|
|
||||||
while self.__stop is None and len(self._Q)==0:
|
if F is self.StopWorker:
|
||||||
self.__update.wait()
|
block = False
|
||||||
|
Q.task_done()
|
||||||
|
continue
|
||||||
|
|
||||||
if self.__stop is not None:
|
|
||||||
break
|
|
||||||
|
|
||||||
F, A, K = self._Q.pop(0)
|
|
||||||
|
|
||||||
self.__update.release()
|
|
||||||
try:
|
try:
|
||||||
F(*A,**K)
|
F(*A, **K)
|
||||||
except:
|
except:
|
||||||
print('Error running',F,A,K)
|
print('Error running',F,A,K)
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
finally:
|
finally:
|
||||||
self.__update.acquire()
|
Q.task_done()
|
||||||
|
except queue.Empty:
|
||||||
self.__stop.set()
|
pass # We are done now
|
||||||
except:
|
|
||||||
traceback.print_exc()
|
|
||||||
finally:
|
|
||||||
self.__update.release()
|
|
||||||
|
|
||||||
if __name__=='__main__':
|
if __name__=='__main__':
|
||||||
import doctest
|
import doctest
|
||||||
|
Reference in New Issue
Block a user