diff --git a/python/devsup/db.py b/python/devsup/db.py index d644021..5d6893b 100644 --- a/python/devsup/db.py +++ b/python/devsup/db.py @@ -1,15 +1,21 @@ + +import threading, sys, traceback + +from util import Worker + try: import _dbapi except ImportError: import _nullapi as _dbapi -from _dbapi import _Field, _Record _rec_cache = {} _no_such_field = object() __all__ = [ - 'Record', + 'Record', 'getRecord', 'Field', + 'IOScanListBlock', + 'IOScanListThread', ] def getRecord(name): @@ -20,7 +26,100 @@ def getRecord(name): _rec_cache[name] = rec return rec -class Record(_Record): +class IOScanListBlock(object): + def __init__(self): + super(IOScanListBlock,self).__init__() + self._recs, self._recs_add, self._recs_remove = set(), set(), set() + self._running = False + + def add(self, rec): + assert isinstance(rec, Record) + + if self._running: + self._recs_remove.discard(rec) + self._recs_add.add(rec) + + else: + self._recs.add(rec) + + return self._remove + + def _remove(self, rec): + if self._running: + self._recs_add.discard(rec) + self._recs_add._recs_remove(rec) + + else: + self._recs.discard(rec) + + def interrupt(self, reason=None, mask=None): + self._running = True + try: + for R in self._recs: + if mask and R in mask: + continue + R.scan(sync=True, reason=reason, force=2) + finally: + self._running = False + if self._recs_add or self._recs_remove: + assert len(self._recs_add.interaction(self._recs_remove))==0 + + self._recs.update(self._recs_add) + self._recs_add.clear() + self._recs.difference_update(self._recs_remove) + self._recs_remove.clear() + + +def _default_whendone(type, val, tb): + if type or val or tb: + traceback.print_exception(type, val, tb) + +class IOScanListThread(IOScanListBlock): + _worker = None + queuelength=100 + @classmethod + def getworker(cls): + if cls._worker: + return cls._worker + import hooks + T = Worker(max=cls.queuelength) + hooks.addHook('AtIocExit', T.stop) + T.start() + cls._worker = T + return T + + def __init__(self): + super(IOScanListThread,self).__init__() + self._lock = threading.Lock() + + def add(self, rec): + print self,'add',rec + with self._lock: + return super(IOScanListThread,self).add(rec) + + def _remove(self, rec): + print self,'remove',rec + with self._lock: + return super(IOScanListThread,self)._remove(rec) + + def interrupt(self, reason=None, mask=None, whendone=_default_whendone): + W = self.getworker() + try: + W.add(self._X, (reason, mask, whendone)) + return True + except RuntimeError: + return False + + def _X(self, reason, mask, whendone): + try: + #print 'XXXXX',self + with self._lock: + super(IOScanListThread,self).interrupt(reason, mask) + finally: + #print 'YYYYY',self,sys.exc_info() + whendone(*sys.exc_info()) + +class Record(_dbapi._Record): def __init__(self, *args, **kws): super(Record, self).__init__(*args, **kws) super(Record, self).__setattr__('_fld_cache', {}) @@ -64,7 +163,7 @@ class Record(_Record): def __repr__(self): return 'Record("%s")'%self.name() -class Field(_Field): +class Field(_dbapi._Field): @property def record(self): """Fetch the record associated with this field