install py files
This commit is contained in:
@ -50,9 +50,18 @@ devsup_SRCS += dbdset.c
|
||||
# Finally link to the EPICS Base libraries
|
||||
devsup_LIBS += $(EPICS_BASE_IOC_LIBS)
|
||||
|
||||
|
||||
PY += devsup/__init__.py
|
||||
PY += devsup/_nullapi.py
|
||||
PY += devsup/db.py
|
||||
PY += devsup/hooks.py
|
||||
PY += devsup/interfaces.py
|
||||
PY += devsup/util.py
|
||||
|
||||
#===========================
|
||||
|
||||
include $(TOP)/configure/RULES
|
||||
include $(TOP)/configure/RULES_PY
|
||||
#----------------------------------------
|
||||
# ADD RULES AFTER THIS LINE
|
||||
|
||||
|
13
devsupApp/src/devsup/__init__.py
Normal file
13
devsupApp/src/devsup/__init__.py
Normal file
@ -0,0 +1,13 @@
|
||||
try:
|
||||
import _dbapi
|
||||
except ImportError:
|
||||
import devsup._nullapi as _dbapi
|
||||
|
||||
try:
|
||||
from _dbconstants import *
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__all__ = ['verinfo']
|
||||
|
||||
verinfo = _dbapi.verinfo
|
184
devsupApp/src/devsup/_nullapi.py
Normal file
184
devsupApp/src/devsup/_nullapi.py
Normal file
@ -0,0 +1,184 @@
|
||||
|
||||
def verinfo():
|
||||
"""(VER, REV, MOD, PATH, "site") = verinfo()
|
||||
|
||||
Query EPICS version information
|
||||
"""
|
||||
return (0,0,0,0,'invalid')
|
||||
|
||||
class _Record(object):
|
||||
"""Handle for record operations
|
||||
|
||||
r = _Record("rec:name")
|
||||
"""
|
||||
|
||||
def __init__(self, rec):
|
||||
pass
|
||||
def name(self):
|
||||
"""Record name string.
|
||||
|
||||
>>> R = getRecord("my:record:name")
|
||||
>>> R.name()
|
||||
"my:record:name"
|
||||
"""
|
||||
def rtype(self):
|
||||
"""Record type name string.
|
||||
|
||||
>>> R = getRecord("my:record:name")
|
||||
>>> R.type()
|
||||
"longin"
|
||||
"""
|
||||
def isPyRecord(self):
|
||||
"""Is this record using Python device support.
|
||||
|
||||
:rtype: bool
|
||||
"""
|
||||
def info(self, key):
|
||||
"""info(key [,default])
|
||||
|
||||
:rtype: str
|
||||
:throws: KeyError
|
||||
|
||||
Lookup record info tag. If no default
|
||||
is provided then an exception is raised
|
||||
if the info key does not exist.
|
||||
"""
|
||||
def infos(self):
|
||||
"""Return a dictionary of all info tags
|
||||
for this record
|
||||
"""
|
||||
|
||||
def setSevr(self, sevr=3, stat=15):
|
||||
"""setSevr(sevr=INVALID_ALARM, stat=COMM_ALARM)
|
||||
|
||||
Signal a new alarm condition. The effect of this
|
||||
call depends on the current alarm condition.
|
||||
|
||||
See :c:func:`recGblSetSevr` in EPICS Base.
|
||||
"""
|
||||
|
||||
def scan(self, sync=False, reason=None, force=0):
|
||||
"""Scan this record.
|
||||
|
||||
:param sync: scan in current thread (``True``), or queue (``False``).
|
||||
:param reason: Reason object passed to :meth:`process <DeviceSupport.process>` (sync=True only)
|
||||
:param force: Record processing condtion (0=Passive, 1=Force, 2=I/O Intr)
|
||||
:throws: ``RuntimeError`` when ``sync=True``, but ``force`` prevents scanning.
|
||||
|
||||
If ``sync`` is False then a
|
||||
scan request is queued to run in another thread..
|
||||
If ``sync`` is True then the record
|
||||
is scannined immidately on the current thread.
|
||||
|
||||
For ``reason`` argument must be used in conjunction with ``sync=True``
|
||||
on records with Python device support. This provides a means
|
||||
of providing extra contextual information to the record's
|
||||
:meth:`process <DeviceSupport.process>` method.
|
||||
|
||||
``force`` is used to decide if the record will actuall be processed,
|
||||
``force=0`` will only process records with SCAN=Passive.
|
||||
``force=1`` will process any record if at all possible.
|
||||
``force=2`` will only process records with Python device support and
|
||||
SCAN=I/O Intr.
|
||||
|
||||
It is **never** safe to use ``sync=True`` while holding record locks,
|
||||
including from within a *process* method.
|
||||
"""
|
||||
|
||||
def asyncStart(self):
|
||||
"""Start asynchronous processing
|
||||
|
||||
This method may be called from a device support
|
||||
:meth:`process <DeviceSupport.process>` method
|
||||
to indicate that processing will continue
|
||||
later.
|
||||
|
||||
.. important::
|
||||
This method is **only** safe to call within a *process* method.
|
||||
"""
|
||||
def asyncFinish(self, reason=None):
|
||||
"""Indicate that asynchronous processing can complete
|
||||
|
||||
Similar to :meth:`scan`. Used to conclude asynchronous
|
||||
process started with :meth:`asyncStart`.
|
||||
|
||||
Processing is completed on the current thread.
|
||||
|
||||
.. important::
|
||||
This method should **never** be called within
|
||||
a :meth:`process <DeviceSupport.process>` method,
|
||||
or any other context where a Record lock is held.
|
||||
Doing so will result in a deadlock.
|
||||
|
||||
Typically a *reason* will be passed to *process* as a way
|
||||
of indicating that this is the completion of an async action. ::
|
||||
|
||||
AsyncDone = object()
|
||||
class MySup(object):
|
||||
def process(record, reason):
|
||||
if reason is AsyncDone:
|
||||
record.VAL = ... # store result
|
||||
else:
|
||||
threading.Timer(1.0, record.asyncFinish, kwargs={'reason':AsyncDone})
|
||||
record.asyncStart()
|
||||
"""
|
||||
|
||||
class _Field(object):
|
||||
"""Handle for field operations
|
||||
|
||||
f = Field("rec:name.HOPR")
|
||||
|
||||
Field objects implement the buffer protocol.
|
||||
"""
|
||||
def __init__(self, fld):
|
||||
pass
|
||||
def name(self):
|
||||
"""Fetch the record and field names.
|
||||
|
||||
>>> FLD = getRecord("rec").field("FLD")
|
||||
>>> FLD.name()
|
||||
("rec", "FLD")
|
||||
"""
|
||||
def fieldinfo(self):
|
||||
"""(type, size, #elements) = fieldinfo()
|
||||
|
||||
Type is DBF type code
|
||||
size is number of bytes to start a single element
|
||||
#elements is the maximum number of elements the field can hold
|
||||
"""
|
||||
|
||||
def getval(self):
|
||||
"""Fetch the current field value as a scalar.
|
||||
|
||||
:rtype: int, float, or str
|
||||
|
||||
Returned type depends of field DBF type.
|
||||
An ``int`` is returned for CHAR, SHORT, LONG, and ENUM.
|
||||
A ``float`` is returned for FLOAT and DOUBLE.
|
||||
A ``str`` is returned for STRING.
|
||||
"""
|
||||
|
||||
def putval(self, val):
|
||||
"""Update the field value
|
||||
|
||||
Must be an Int, Float or str. Strings will be truncated to 39 charactors.
|
||||
"""
|
||||
|
||||
def getarray(self):
|
||||
"""Return a numpy ndarray refering to this field for in-place operations.
|
||||
|
||||
The dtype of the ndarray will correspond to the field's DBF type.
|
||||
Its size will be the **maximum** number of elements.
|
||||
|
||||
.. important::
|
||||
It is only safe to read or write to this ndarray while the record
|
||||
lock is held (ie withing :meth:`process <DeviceSupport.process>`).
|
||||
"""
|
||||
|
||||
def getAlarm(self):
|
||||
"""Returns a tuple (severity, status) with the condtion of the linked field.
|
||||
|
||||
Only works for fields of type DBF_INLINK.
|
||||
"""
|
||||
|
||||
_hooks = {}
|
327
devsupApp/src/devsup/db.py
Normal file
327
devsupApp/src/devsup/db.py
Normal file
@ -0,0 +1,327 @@
|
||||
|
||||
import threading, sys, traceback, time
|
||||
|
||||
from devsup.util import Worker
|
||||
|
||||
try:
|
||||
import _dbapi
|
||||
except ImportError:
|
||||
import _nullapi as _dbapi
|
||||
|
||||
_rec_cache = {}
|
||||
_no_such_field = object()
|
||||
|
||||
__all__ = [
|
||||
'Record', 'getRecord',
|
||||
'Field',
|
||||
'IOScanListBlock',
|
||||
'IOScanListThread',
|
||||
]
|
||||
|
||||
def getRecord(name):
|
||||
"""Retrieve a :class:`Record` instance by the
|
||||
full record name.
|
||||
|
||||
The result is cached so the future calls will return the same instance.
|
||||
This is the prefered way to get :class:`Record` instances.
|
||||
|
||||
>>> R = getRecord("my:record:name")
|
||||
Record("my:record:name")
|
||||
"""
|
||||
try:
|
||||
return _rec_cache[name]
|
||||
except KeyError:
|
||||
rec = Record(name)
|
||||
_rec_cache[name] = rec
|
||||
return rec
|
||||
|
||||
class IOScanListBlock(object):
|
||||
"""A list of records which will be processed together.
|
||||
|
||||
This convienence class to handle the accounting to
|
||||
maintain a list of records.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(IOScanListBlock,self).__init__()
|
||||
self._recs, self._recs_add, self._recs_remove = set(), set(), set()
|
||||
self.force, self._running = 2, False
|
||||
|
||||
def add(self, rec):
|
||||
"""Add a record to the scan list.
|
||||
|
||||
This method is designed to be consistent
|
||||
with :meth:`allowScan <DeviceSupport.allowScan>`
|
||||
by returning its :meth:`remove` method.
|
||||
If fact this function can be completely delegated. ::
|
||||
|
||||
class MyDriver(util.StoppableThread):
|
||||
def __init__(self):
|
||||
super(MyDriver,self).__init__()
|
||||
self.lock = threading.Lock()
|
||||
self.scan1 = IOScanListBlock()
|
||||
def run(self):
|
||||
while self.shouldRun():
|
||||
time.sleep(1)
|
||||
with self.lock:
|
||||
self.scan1.interrupt()
|
||||
|
||||
class MySup(object):
|
||||
def __init__(self, driver):
|
||||
self.driver = driver
|
||||
def allowScan(rec):
|
||||
with self.driver.lock:
|
||||
return self.driver.scan1.add(rec)
|
||||
"""
|
||||
assert isinstance(rec, Record)
|
||||
|
||||
if self._running:
|
||||
self._recs_remove.discard(rec)
|
||||
self._recs_add.add(rec)
|
||||
|
||||
else:
|
||||
self._recs.add(rec)
|
||||
|
||||
return self.remove
|
||||
|
||||
def remove(self, rec):
|
||||
"""Remove a record from the scan list.
|
||||
"""
|
||||
if self._running:
|
||||
self._recs_add.discard(rec)
|
||||
self._recs_add._recs_remove(rec)
|
||||
|
||||
else:
|
||||
self._recs.discard(rec)
|
||||
|
||||
def interrupt(self, reason=None, mask=None):
|
||||
"""Scan the records in this list.
|
||||
|
||||
:param reason: Passed to :meth:`Record.scan`.
|
||||
:param mask: A *list* or *set* or records which should not be scanned.
|
||||
|
||||
This method will call :meth:`Record.scan` of each of the records
|
||||
currently in the list. This is done synchronously in the current
|
||||
thread. It should **never** be call when any record locks are held.
|
||||
"""
|
||||
self._running = True
|
||||
try:
|
||||
for R in self._recs:
|
||||
if mask and R in mask:
|
||||
continue
|
||||
R.scan(sync=True, reason=reason, force=self.force)
|
||||
finally:
|
||||
self._running = False
|
||||
if self._recs_add or self._recs_remove:
|
||||
assert len(self._recs_add.interaction(self._recs_remove))==0
|
||||
|
||||
self._recs.update(self._recs_add)
|
||||
self._recs_add.clear()
|
||||
self._recs.difference_update(self._recs_remove)
|
||||
self._recs_remove.clear()
|
||||
|
||||
|
||||
def _default_whendone(type, val, tb):
|
||||
if type or val or tb:
|
||||
traceback.print_exception(type, val, tb)
|
||||
|
||||
class IOScanListThread(IOScanListBlock):
|
||||
"""A list of records w/ a worker thread to run them.
|
||||
|
||||
All methods are thread-safe.
|
||||
"""
|
||||
_worker = None
|
||||
_worker_lock = threading.Lock()
|
||||
queuelength=100
|
||||
@classmethod
|
||||
def getworker(cls):
|
||||
with cls._worker_lock:
|
||||
if cls._worker is not None:
|
||||
return cls._worker
|
||||
import devsup.hooks
|
||||
T = Worker(max=cls.queuelength)
|
||||
devsup.hooks.addHook('AtIocExit', T.join)
|
||||
T.start()
|
||||
cls._worker = T
|
||||
return T
|
||||
|
||||
def __init__(self):
|
||||
super(IOScanListThread,self).__init__()
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def add(self, rec):
|
||||
"""Add a record to the scan list.
|
||||
|
||||
This method is thread-safe and may be used
|
||||
without additional locking. ::
|
||||
|
||||
class MyDriver(util.StoppableThread):
|
||||
def __init__(self):
|
||||
super(MyDriver,self).__init__()
|
||||
self.scan1 = IOScanListThread()
|
||||
def run(self):
|
||||
while self.shouldRun():
|
||||
time.sleep(1)
|
||||
self.scan1.interrupt()
|
||||
|
||||
class MySup(object):
|
||||
def __init__(self, driver):
|
||||
self.driver = driver
|
||||
self.allowScan = self.driver.scan1.add
|
||||
"""
|
||||
with self._lock:
|
||||
return super(IOScanListThread,self).add(rec)
|
||||
|
||||
def remove(self, rec):
|
||||
with self._lock:
|
||||
return super(IOScanListThread,self).remove(rec)
|
||||
|
||||
def interrupt(self, reason=None, mask=None, whendone=_default_whendone):
|
||||
"""Queue a request to process the scan list.
|
||||
|
||||
:param reason: Passed to :meth:`Record.scan`.
|
||||
:param mask: A *list* or *set* or records which should not be scanned.
|
||||
:param whendone: A callable which will be invoked after all records are processed.
|
||||
:throws: RuntimeError is the request can't be queued.
|
||||
|
||||
Calling this method will cause a request to be sent to a
|
||||
worker thread. This method can be called several times
|
||||
to queue several requests.
|
||||
|
||||
If provided, the *whendone* callable is invoked with three arguments.
|
||||
These will be None except in the case an interrupt is raised in the
|
||||
worker in which case they are: exception type, value, and traceback.
|
||||
|
||||
.. note::
|
||||
This method may be safely called while record locks are held.
|
||||
"""
|
||||
W = self.getworker()
|
||||
W.add(self._X, (reason, mask, whendone))
|
||||
|
||||
def _X(self, reason, mask, whendone):
|
||||
try:
|
||||
#print 'XXXXX',self
|
||||
with self._lock:
|
||||
super(IOScanListThread,self).interrupt(reason, mask)
|
||||
finally:
|
||||
#print 'YYYYY',self,sys.exc_info()
|
||||
whendone(*sys.exc_info())
|
||||
|
||||
class Record(_dbapi._Record):
|
||||
def __init__(self, *args, **kws):
|
||||
super(Record, self).__init__(*args, **kws)
|
||||
super(Record, self).__setattr__('_fld_cache', {})
|
||||
|
||||
def field(self, name):
|
||||
"""Lookup field in this record
|
||||
|
||||
:rtype: :class:`Field`
|
||||
:throws: KeyError for non-existant fields.
|
||||
|
||||
The returned object is cached so future calls will
|
||||
return the same instance.
|
||||
|
||||
>>> getRecord("rec").field('HOPR')
|
||||
Field("rec.HOPR")
|
||||
"""
|
||||
try:
|
||||
F = self._fld_cache[name]
|
||||
if F is _no_such_field:
|
||||
raise ValueError()
|
||||
return F
|
||||
except KeyError as e:
|
||||
try:
|
||||
fld = Field("%s.%s"%(self.name(), name))
|
||||
except ValueError:
|
||||
self._fld_cache[name] = _no_such_field
|
||||
raise e
|
||||
else:
|
||||
self._fld_cache[name] = fld
|
||||
return fld
|
||||
|
||||
def setTime(self, ts):
|
||||
"""Set record timestamp.
|
||||
|
||||
:param ts: datetime, float, or (sec, nsec).
|
||||
|
||||
Has not effect if the TSE field is not set to -2.
|
||||
All inputs must be referenced to the posix epoch.
|
||||
"""
|
||||
if hasattr(ts, 'timetuple'):
|
||||
ts = time.mktime(ts.timetuple())
|
||||
|
||||
try:
|
||||
sec, nsec = ts
|
||||
except TypeError:
|
||||
sec = int(ts)
|
||||
nsec = int(ts*1e9)%1000000000
|
||||
|
||||
super(Record, self).setTime(sec, nsec)
|
||||
|
||||
def __getattr__(self, name):
|
||||
try:
|
||||
F = self.field(name)
|
||||
except ValueError:
|
||||
raise AttributeError('No such field')
|
||||
else:
|
||||
return F.getval()
|
||||
|
||||
def __setattr__(self, name, val):
|
||||
try:
|
||||
F=self.field(name)
|
||||
except ValueError:
|
||||
super(Record, self).__setattr__(name, val)
|
||||
else:
|
||||
F.putval(val)
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return 'Record("%s")'%self.name()
|
||||
|
||||
class Field(_dbapi._Field):
|
||||
@property
|
||||
def record(self):
|
||||
"""Fetch the :class:`Record` associated with this field
|
||||
"""
|
||||
try:
|
||||
return self._record
|
||||
except AttributeError:
|
||||
rec, _ = self.name()
|
||||
self._record = getRecord(rec)
|
||||
return self._record
|
||||
|
||||
def getTime(self):
|
||||
"""Get timestamp of link target.
|
||||
|
||||
Only works for DBF_INLINK fields.
|
||||
Returns the time in seconds since the posix epoch.
|
||||
|
||||
:rtype: float
|
||||
"""
|
||||
|
||||
def __cmp__(self, B):
|
||||
if isinstance(B, Field):
|
||||
B=B.getval()
|
||||
return cmp(self.getval(), B)
|
||||
|
||||
def __int__(self):
|
||||
return int(self.getval())
|
||||
def __long__(self):
|
||||
return long(self.getval())
|
||||
def __float__(self):
|
||||
return float(self.getval())
|
||||
|
||||
def __repr__(self):
|
||||
return 'Field("%s.%s")'%self.name()
|
||||
|
||||
def processLink(name, lstr):
|
||||
"""Process the INP or OUT link
|
||||
|
||||
Expects lstr to be "module arg1 arg2"
|
||||
|
||||
Returns (callable, Record, "arg1 arg2")
|
||||
"""
|
||||
rec = getRecord(name)
|
||||
parts = lstr.split(None,1)
|
||||
modname, args = parts[0], parts[1] if len(parts)>1 else None
|
||||
mod = __import__(modname, fromlist=['__should_not_exist'])
|
||||
return rec, mod.build(rec, args)
|
60
devsupApp/src/devsup/hooks.py
Normal file
60
devsupApp/src/devsup/hooks.py
Normal file
@ -0,0 +1,60 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import traceback
|
||||
from collections import defaultdict
|
||||
|
||||
try:
|
||||
import _dbapi
|
||||
except ImportError:
|
||||
import devsup._nullapi as _dbapi
|
||||
|
||||
__all__ = [
|
||||
"hooknames",
|
||||
"addHook",
|
||||
"debugHooks",
|
||||
]
|
||||
|
||||
hooknames = _dbapi._hooks.keys()
|
||||
|
||||
_revnames = dict([(v,k) for k,v in _dbapi._hooks.iteritems()])
|
||||
|
||||
_hooktable = defaultdict(list)
|
||||
|
||||
def addHook(state, func):
|
||||
"""addHook("stats", funcion)
|
||||
Add callable to IOC start sequence.
|
||||
|
||||
Callables are run in the order in
|
||||
which they were added (except for 'AtIocExit').
|
||||
|
||||
>>> def show():
|
||||
... print 'State Occurred'
|
||||
>>> addHook("AfterIocRunning", show)
|
||||
|
||||
An additional special hook 'AtIocExit' may be used
|
||||
for cleanup actions during IOC shutdown.
|
||||
"""
|
||||
sid = _dbapi._hooks[state]
|
||||
_hooktable[sid].append(func)
|
||||
|
||||
|
||||
def debugHooks():
|
||||
"""Install debugging print to hooks
|
||||
"""
|
||||
for h in hooknames:
|
||||
def _showstate(state=h):
|
||||
print('Reached state',state)
|
||||
addHook(h, _showstate)
|
||||
|
||||
def _runhook(sid):
|
||||
name = _revnames[sid]
|
||||
pop = -1 if name=='AtIocExit' else 0
|
||||
fns = _hooktable.get(sid)
|
||||
if fns is not None:
|
||||
while len(fns)>0:
|
||||
fn = fns.pop(pop)
|
||||
try:
|
||||
fn()
|
||||
except:
|
||||
print("Error running",name,"hook",fn)
|
||||
traceback.print_exc()
|
26
devsupApp/src/devsup/interfaces.py
Normal file
26
devsupApp/src/devsup/interfaces.py
Normal file
@ -0,0 +1,26 @@
|
||||
|
||||
from zope.interface import Interface, Attribute
|
||||
|
||||
class DeviceSupport(Interface):
|
||||
|
||||
raw = Attribute('True if this support modifies VAL instead of RVAL')
|
||||
|
||||
def detach(record):
|
||||
"""Disconnect from the record.
|
||||
|
||||
This is the last method called.
|
||||
"""
|
||||
|
||||
def allowScan(record):
|
||||
"""Return True to allow SCAN='I/O Intr'
|
||||
or False to prevent this.
|
||||
|
||||
If a callable object is returned then if
|
||||
will be invoked when I/O Intr scanning
|
||||
is disabled. A Record instance is passed
|
||||
as the first (and only) argument.
|
||||
"""
|
||||
|
||||
def process(record, reason):
|
||||
"""Callback for record processing action.
|
||||
"""
|
136
devsupApp/src/devsup/util.py
Normal file
136
devsupApp/src/devsup/util.py
Normal file
@ -0,0 +1,136 @@
|
||||
|
||||
from __future__ import print_function
|
||||
import threading, traceback
|
||||
try:
|
||||
import Queue as queue
|
||||
except ImportError:
|
||||
import queue
|
||||
|
||||
class StoppableThread(threading.Thread):
|
||||
"""A thread which can be requested to stop.
|
||||
|
||||
The thread run() method should periodically call the shouldRun()
|
||||
method and return if this yields False.
|
||||
|
||||
>>> class TestThread(StoppableThread):
|
||||
... def __init__(self):
|
||||
... super(TestThread,self).__init__()
|
||||
... self.E=threading.Event()
|
||||
... def run(self):
|
||||
... import time
|
||||
... self.cur = threading.current_thread()
|
||||
... self.E.set()
|
||||
... while self.shouldRun():
|
||||
... time.sleep(0.01)
|
||||
>>> T = TestThread()
|
||||
>>> T.start()
|
||||
>>> T.E.wait(1.0)
|
||||
True
|
||||
>>> T.cur is T
|
||||
True
|
||||
>>> T.join()
|
||||
>>> T.is_alive()
|
||||
False
|
||||
"""
|
||||
def __init__(self, max=0):
|
||||
super(StoppableThread, self).__init__()
|
||||
self.__stop = True
|
||||
self.__lock = threading.Lock()
|
||||
|
||||
def start(self):
|
||||
with self.__lock:
|
||||
self.__stop = False
|
||||
|
||||
super(StoppableThread, self).start()
|
||||
|
||||
def join(self):
|
||||
with self.__lock:
|
||||
self.__stop = True
|
||||
|
||||
super(StoppableThread, self).join()
|
||||
|
||||
def shouldRun(self):
|
||||
with self.__lock:
|
||||
return not self.__stop
|
||||
|
||||
class Worker(threading.Thread):
|
||||
"""A threaded work queue.
|
||||
|
||||
>>> w = Worker()
|
||||
>>> w.start()
|
||||
>>> w.join()
|
||||
>>> import threading
|
||||
>>> E = threading.Event()
|
||||
>>> w = Worker()
|
||||
>>> w.start()
|
||||
>>> w.add(E.set)
|
||||
True
|
||||
>>> E.wait(1.0)
|
||||
True
|
||||
>>> w.join()
|
||||
"""
|
||||
StopWorker = object()
|
||||
|
||||
def __init__(self, max=0):
|
||||
super(Worker, self).__init__()
|
||||
self._Q = queue.Queue(maxsize=max)
|
||||
self.__stop = False
|
||||
self.__lock = threading.Lock()
|
||||
|
||||
|
||||
def join(self, flush=True):
|
||||
"""Stop accepting new jobs and join the worker thread
|
||||
|
||||
Blocks until currently queued work is complete.
|
||||
"""
|
||||
with self.__lock:
|
||||
self.__stop = True
|
||||
self._Q.put((self.StopWorker,None,None))
|
||||
super(Worker, self).join()
|
||||
|
||||
def __len__(self):
|
||||
return self._Q.qsize()
|
||||
|
||||
def add(self, func, args=(), kws={}, block=True):
|
||||
"""Attempt to send a job to the worker.
|
||||
|
||||
:returns: True if the job was queued. False if the queue is full,
|
||||
or has been joined.
|
||||
|
||||
When ``block=True`` then this method will only return
|
||||
False if the Worker has been joined.
|
||||
"""
|
||||
with self.__lock:
|
||||
if self.__stop is True:
|
||||
return False
|
||||
try:
|
||||
self._Q.put((func,args,kws), block)
|
||||
return True
|
||||
except queue.Full:
|
||||
return False
|
||||
|
||||
def run(self):
|
||||
Q = self._Q
|
||||
block = True
|
||||
try:
|
||||
while True:
|
||||
F, A, K = Q.get(block)
|
||||
|
||||
if F is self.StopWorker:
|
||||
block = False
|
||||
Q.task_done()
|
||||
continue
|
||||
|
||||
try:
|
||||
F(*A, **K)
|
||||
except:
|
||||
print('Error running',F,A,K)
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
Q.task_done()
|
||||
except queue.Empty:
|
||||
pass # We are done now
|
||||
|
||||
if __name__=='__main__':
|
||||
import doctest
|
||||
doctest.testmod()
|
Reference in New Issue
Block a user