From 977e857802b25c5569b7b81b9f06e48f41d3b4e0 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sun, 31 Mar 2013 15:12:20 -0400 Subject: [PATCH] add queue worker --- python/devsup/__init__.py | 2 +- python/devsup/_nullapi.py | 2 +- python/devsup/util.py | 108 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 python/devsup/util.py diff --git a/python/devsup/__init__.py b/python/devsup/__init__.py index 700a242..d5ffaa0 100644 --- a/python/devsup/__init__.py +++ b/python/devsup/__init__.py @@ -5,4 +5,4 @@ except ImportError: __all__ = ['verinfo'] -from _dbapi import verinfo +verinfo = _dbapi.verinfo diff --git a/python/devsup/_nullapi.py b/python/devsup/_nullapi.py index 1b2c645..dce02f2 100644 --- a/python/devsup/_nullapi.py +++ b/python/devsup/_nullapi.py @@ -4,7 +4,7 @@ def verinfo(): Query EPICS version information """ - return (0,0,0,0.'invalid') + return (0,0,0,0,'invalid') class _Record(object): """Handle for record operations diff --git a/python/devsup/util.py b/python/devsup/util.py new file mode 100644 index 0000000..317b30a --- /dev/null +++ b/python/devsup/util.py @@ -0,0 +1,108 @@ + +import threading, traceback + +class Worker(threading.Thread): + """A threaded work queue. + + >>> w = Worker() + >>> w = Worker() + >>> w.start() + >>> w.stop() + >>> import threading + >>> E = threading.Event() + >>> w = Worker() + >>> w.start() + >>> w.add(E.set) + >>> E.wait(1.0) + True + >>> w.stop() + """ + def __init__(self, max=0): + super(Worker, self).__init__() + self._run, self._stop = False, None + self._lock = threading.Lock() + self._update = threading.Condition(self._lock) + self.maxQ, self._Q = max, [] + + def running(self): + with self._lock: + return self._run and not self._stop + + def start(self): + with self._lock: + if self._run or self._stop: + return + super(Worker, self).start() + self._run = True + + def stop(self, flush=True): + self._update.acquire() + try: + if self._stop: + raise RuntimeError("Someone else is already trying to stop me") + + self._stop = threading.Event() + self._update.notify() + + self._update.release() + try: + self._stop.wait() + finally: + self._update.acquire() + + self._stop = None + assert not self._run + + if flush: + self._Q = [] + + finally: + self._update.release() + + def __len__(self): + with self._lock: + return len(self._Q) + + def add(self, func, args=(), kws={}): + with self._lock: + if not self._run or self._stop: + return + elif self.maxQ>0 and len(self._Q)>=self.maxQ: + raise RuntimeError('Worker queue full') + + self._Q.append((func,args,kws)) + self._update.notify() + + def run(self): + self._update.acquire() + try: + assert self._run + + while True: + + while self._stop is None and len(self._Q)==0: + self._update.wait() + + if self._stop is not None: + break + + F, A, K = self._Q.pop(0) + + self._update.release() + try: + F(*A,**K) + except: + print 'Error running',F,A,K + traceback.print_exc() + finally: + self._update.acquire() + + self._run = False + self._stop.set() + + finally: + self._update.release() + +if __name__=='__main__': + import doctest + doctest.testmod()