diff --git a/python/devsup/db.py b/python/devsup/db.py index 5d6893b..91289cf 100644 --- a/python/devsup/db.py +++ b/python/devsup/db.py @@ -83,7 +83,7 @@ class IOScanListThread(IOScanListBlock): return cls._worker import hooks T = Worker(max=cls.queuelength) - hooks.addHook('AtIocExit', T.stop) + hooks.addHook('AtIocExit', T.join) T.start() cls._worker = T return T diff --git a/python/devsup/util.py b/python/devsup/util.py index 317b30a..7030d28 100644 --- a/python/devsup/util.py +++ b/python/devsup/util.py @@ -1,13 +1,82 @@ import threading, traceback +class StoppableThread(threading.Thread): + """A thread which can be required to stop. + + The thread run() method should periodically call the shouldRun() + method if this yields False, the finish() should be called before returning. + This is really only feasible by sub-classing. + + >>> class TestThread(StoppableThread): + ... def __init__(self): + ... super(TestThread,self).__init__() + ... self.E=threading.Event() + ... def run(self): + ... try: + ... import time + ... self.E.set() + ... while self.shouldRun(): + ... time.sleep(0.01) + ... finally: + ... self.finish() + >>> T = TestThread() + >>> T.start() + >>> T.E.wait() + True + >>> T.join() + >>> T.is_alive() + False + """ + def __init__(self, max=0): + super(StoppableThread, self).__init__() + self._run, self._stop = False, False + self._sevt, self._lock = threading.Event(), threading.Lock() + + def running(self): + with self._lock: + return self._run and not self._stop + + def start(self): + with self._lock: + assert not self._stop + self._run = True + self._sevt.clear() + + super(StoppableThread, self).start() + + def join(self): + with self._lock: + if not self._run: + return + self._stop = True + + self._sevt.wait() + + super(StoppableThread, self).join() + + def shouldRun(self): + with self._lock: + return not self._stop + + def finish(self): + with self._lock: + self._run = self._stop = False + self._sevt.set() + + def run(self, *args, **kws): + try: + super(StoppableThread, self).start(*args, **kws) + finally: + self.finish() + class Worker(threading.Thread): """A threaded work queue. >>> w = Worker() >>> w = Worker() >>> w.start() - >>> w.stop() + >>> w.join() >>> import threading >>> E = threading.Event() >>> w = Worker() @@ -15,7 +84,7 @@ class Worker(threading.Thread): >>> w.add(E.set) >>> E.wait(1.0) True - >>> w.stop() + >>> w.join() """ def __init__(self, max=0): super(Worker, self).__init__() @@ -35,7 +104,7 @@ class Worker(threading.Thread): super(Worker, self).start() self._run = True - def stop(self, flush=True): + def join(self, flush=True): self._update.acquire() try: if self._stop: