Files
pyDevSup/python/devsup/db.py
Michael Davidsaver 4facd4339a documentation
2013-04-01 09:16:59 -04:00

306 lines
9.0 KiB
Python

import threading, sys, traceback
from util import Worker
try:
import _dbapi
except ImportError:
import _nullapi as _dbapi
_rec_cache = {}
_no_such_field = object()
__all__ = [
'Record', 'getRecord',
'Field',
'IOScanListBlock',
'IOScanListThread',
]
def getRecord(name):
"""Retrieve a :class:`Record` instance by the
full record name.
The result is cached so the future calls will return the same instance.
This is the prefered way to get :class:`Record` instances.
>>> R = getRecord("my:record:name")
Record("my:record:name")
"""
try:
return _rec_cache[name]
except KeyError:
rec = Record(name)
_rec_cache[name] = rec
return rec
class IOScanListBlock(object):
"""A list of records which will be processed together.
This convienence class to handle the accounting to
maintain a list of records.
"""
def __init__(self):
super(IOScanListBlock,self).__init__()
self._recs, self._recs_add, self._recs_remove = set(), set(), set()
self.force, self._running = 2, False
def add(self, rec):
"""Add a record to the scan list.
This method is designed to be consistent
with :meth:`allowScan <DeviceSupport.allowScan>`
by returning its :meth:`remove` method.
If fact this function can be completely delegated. ::
class MyDriver(util.StoppableThread):
def __init__(self):
super(MyDriver,self).__init__()
self.lock = threading.Lock()
self.scan1 = IOScanListBlock()
def run(self):
try:
while self.shouldRun():
time.sleep(1)
with self.lock:
self.scan1.interrupt()
finally:
self.finish()
class MySup(object):
def __init__(self, driver):
self.driver = driver
def allowScan(rec):
with self.driver.lock:
return self.driver.scan1.add(rec)
"""
assert isinstance(rec, Record)
if self._running:
self._recs_remove.discard(rec)
self._recs_add.add(rec)
else:
self._recs.add(rec)
return self.remove
def remove(self, rec):
"""Remove a record from the scan list.
"""
if self._running:
self._recs_add.discard(rec)
self._recs_add._recs_remove(rec)
else:
self._recs.discard(rec)
def interrupt(self, reason=None, mask=None):
"""Scan the records in this list.
:param reason: Passed to :meth:`Record.scan`.
:param mask: A *list* or *set* or records which should not be scanned.
This method will call :meth:`Record.scan` of each of the records
currently in the list. This is done synchronously in the current
thread. It should **never** be call when any record locks are held.
"""
self._running = True
try:
for R in self._recs:
if mask and R in mask:
continue
R.scan(sync=True, reason=reason, force=self.force)
finally:
self._running = False
if self._recs_add or self._recs_remove:
assert len(self._recs_add.interaction(self._recs_remove))==0
self._recs.update(self._recs_add)
self._recs_add.clear()
self._recs.difference_update(self._recs_remove)
self._recs_remove.clear()
def _default_whendone(type, val, tb):
if type or val or tb:
traceback.print_exception(type, val, tb)
class IOScanListThread(IOScanListBlock):
"""A list of records w/ a worker thread to run them.
All methods are thread-safe.
"""
_worker = None
_worker_lock = threading.Lock()
queuelength=100
@classmethod
def getworker(cls):
with cls._worker_lock:
if cls._worker is not None:
return cls._worker
import hooks
T = Worker(max=cls.queuelength)
hooks.addHook('AtIocExit', T.join)
T.start()
cls._worker = T
return T
def __init__(self):
super(IOScanListThread,self).__init__()
self._lock = threading.Lock()
def add(self, rec):
"""Add a record to the scan list.
This method is thread-safe and may be used
without additional locking. ::
class MyDriver(util.StoppableThread):
def __init__(self):
super(MyDriver,self).__init__()
self.scan1 = IOScanListThread()
def run(self):
try:
while self.shouldRun():
time.sleep(1)
self.scan1.interrupt()
finally:
self.finish()
class MySup(object):
def __init__(self, driver):
self.driver = driver
self.allowScan = self.driver.scan1.add
"""
with self._lock:
return super(IOScanListThread,self).add(rec)
def remove(self, rec):
with self._lock:
return super(IOScanListThread,self).remove(rec)
def interrupt(self, reason=None, mask=None, whendone=_default_whendone):
"""Queue a request to process the scan list.
:param reason: Passed to :meth:`Record.scan`.
:param mask: A *list* or *set* or records which should not be scanned.
:param whendone: A callable which will be invoked after all records are processed.
:throws: RuntimeError is the request can't be queued.
Calling this method will cause a request to be sent to a
worker thread. This method can be called several times
to queue several requests.
If provided, the *whendone* callable is invoked with three arguments.
These will be None except in the case an interrupt is raised in the
worker in which case they are: exception type, value, and traceback.
.. note::
This method may be safely called while record locks are held.
"""
W = self.getworker()
W.add(self._X, (reason, mask, whendone))
def _X(self, reason, mask, whendone):
try:
#print 'XXXXX',self
with self._lock:
super(IOScanListThread,self).interrupt(reason, mask)
finally:
#print 'YYYYY',self,sys.exc_info()
whendone(*sys.exc_info())
class Record(_dbapi._Record):
def __init__(self, *args, **kws):
super(Record, self).__init__(*args, **kws)
super(Record, self).__setattr__('_fld_cache', {})
def field(self, name):
"""Lookup field in this record
:rtype: :class:`Field`
:throws: KeyError for non-existant fields.
The returned object is cached so future calls will
return the same instance.
>>> getRecord("rec").field('HOPR')
Field("rec.HOPR")
"""
try:
F = self._fld_cache[name]
if F is _no_such_field:
raise ValueError()
return F
except KeyError, e:
try:
fld = Field("%s.%s"%(self.name(), name))
except ValueError:
self._fld_cache[name] = _no_such_field
raise e
else:
self._fld_cache[name] = fld
return fld
def __getattr__(self, name):
try:
F = self.field(name)
except ValueError:
raise AttributeError('No such field')
else:
return F.getval()
def __setattr__(self, name, val):
try:
F=self.field(name)
except ValueError:
super(Record, self).__setattr__(name, val)
else:
F.putval(val)
def __repr__(self):
return 'Record("%s")'%self.name()
class Field(_dbapi._Field):
@property
def record(self):
"""Fetch the :class:`Record` associated with this field
"""
try:
return self._record
except AttributeError:
rec, _ = self.name()
self._record = getRecord(rec)
return self._record
def __cmp__(self, B):
if isinstance(B, Field):
B=B.getval()
return cmp(self.getval(), B)
def __int__(self):
return int(self.getval())
def __long__(self):
return long(self.getval())
def __float__(self):
return float(self.getval())
def __repr__(self):
return 'Field("%s.%s")'%self.name()
def processLink(name, lstr):
"""Process the INP or OUT link
Expects lstr to be "module arg1 arg2"
Returns (callable, Record, "arg1 arg2")
"""
rec = getRecord(name)
parts = lstr.split(None,1)
modname, args = parts[0], parts[1] if len(parts)>1 else None
mod = __import__(modname)
return rec, mod.build(rec, args)