diff --git a/python/devsup/util.py b/python/devsup/util.py index 0d85424..effb83f 100644 --- a/python/devsup/util.py +++ b/python/devsup/util.py @@ -1,6 +1,10 @@ from __future__ import print_function import threading, traceback +try: + import Queue as queue +except ImportError: + import queue class StoppableThread(threading.Thread): """A thread which can be requested to stop. @@ -20,7 +24,7 @@ class StoppableThread(threading.Thread): ... time.sleep(0.01) >>> T = TestThread() >>> T.start() - >>> T.E.wait() + >>> T.E.wait(1.0) True >>> T.cur is T True @@ -61,83 +65,72 @@ class Worker(threading.Thread): >>> w = Worker() >>> w.start() >>> w.add(E.set) + True >>> E.wait(1.0) True >>> w.join() """ + StopWorker = object() + def __init__(self, max=0): super(Worker, self).__init__() - self.__stop = None + self._Q = queue.Queue(maxsize=max) + self.__stop = False self.__lock = threading.Lock() - self.__update = threading.Condition(self.__lock) - self.maxQ, self._Q = max, [] def join(self, flush=True): - self.__update.acquire() - try: - if self.__stop is not None: - raise RuntimeError("Someone else is already trying to stop me") - - self.__stop = threading.Event() - self.__update.notify() - - self.__update.release() - try: - self.__stop.wait() - finally: - self.__update.acquire() - - self.__stop = None - - if flush: - self._Q = [] - - finally: - self.__update.release() + """Stop accepting new jobs and join the worker thread + + Blocks until currently queued work is complete. + """ + with self.__lock: + self.__stop = True + self._Q.put((self.StopWorker,None,None)) + super(Worker, self).join() def __len__(self): - with self.__lock: - return len(self._Q) + return self._Q.qsize() - def add(self, func, args=(), kws={}): + def add(self, func, args=(), kws={}, block=True): + """Attempt to send a job to the worker. + + :returns: True if the job was queued. False if the queue is full, + or has been joined. + + When ``block=True`` then this method will only return + False if the Worker has been joined. + """ with self.__lock: - if self.__stop is not None: - return - elif self.maxQ>0 and len(self._Q)>=self.maxQ: - raise RuntimeError('Worker queue full') - - self._Q.append((func,args,kws)) - self.__update.notify() + if self.__stop is True: + return False + try: + self._Q.put((func,args,kws), block) + return True + except queue.Full: + return False def run(self): - self.__update.acquire() + Q = self._Q + block = True try: - while True: + F, A, K = Q.get(block) - while self.__stop is None and len(self._Q)==0: - self.__update.wait() + if F is self.StopWorker: + block = False + Q.task_done() + continue - if self.__stop is not None: - break - - F, A, K = self._Q.pop(0) - - self.__update.release() try: - F(*A,**K) + F(*A, **K) except: print('Error running',F,A,K) traceback.print_exc() finally: - self.__update.acquire() - - self.__stop.set() - except: - traceback.print_exc() - finally: - self.__update.release() + Q.task_done() + except queue.Empty: + pass # We are done now if __name__=='__main__': import doctest