diff --git a/secop/io.py b/secop/io.py index 695643f..d7e09df 100644 --- a/secop/io.py +++ b/secop/io.py @@ -118,6 +118,7 @@ class IOBase(Communicator): def earlyInit(self): super().earlyInit() + self._reconnectCallbacks = {} self._lock = threading.RLock() def connectStart(self): @@ -171,10 +172,7 @@ class IOBase(Communicator): if the callback fails or returns False, it is cleared """ - if self._reconnectCallbacks is None: - self._reconnectCallbacks = {name: func} - else: - self._reconnectCallbacks[name] = func + self._reconnectCallbacks[name] = func def callCallbacks(self): for key, cb in list(self._reconnectCallbacks.items()): diff --git a/secop/modules.py b/secop/modules.py index f7b4436..101f50e 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -24,7 +24,6 @@ import time -from queue import Queue, Empty import threading from collections import OrderedDict from functools import wraps @@ -144,6 +143,7 @@ class HasAccessibles(HasProperties): self.log.debug("read_%s returned %r", pname, value) except Exception as e: self.log.debug("read_%s failed with %r", pname, e) + self.announceUpdate(pname, None, e) raise if value is Done: return getattr(self, pname) @@ -228,6 +228,26 @@ class HasAccessibles(HasProperties): cls.configurables = res +class PollInfo: + def __init__(self, pollinterval, trigger_event): + self.interval = pollinterval + self.last_main = 0 + self.last_slow = 0 + self.last_error = None + self.polled_parameters = [] + self.fast_flag = False + self.trigger_event = trigger_event + + def trigger(self): + """trigger a recalculation of poll due times""" + self.trigger_event.set() + + def update_interval(self, pollinterval): + if not self.fast_flag: + self.interval = pollinterval + self.trigger() + + class Module(HasAccessibles): """basic module @@ -283,6 +303,8 @@ class Module(HasAccessibles): # reference to the dispatcher (used for sending async updates) DISPATCHER = None attachedModules = None + pollInfo = None + triggerPoll = None # trigger event for polls. used on io modules and modules without io def __init__(self, name, logger, cfgdict, srv): # remember the dispatcher object (for the async callbacks) @@ -296,8 +318,8 @@ class Module(HasAccessibles): self.initModuleDone = False self.startModuleDone = False self.remoteLogHandler = None - self.changePollinterval = Queue() # used for waiting between polls and transmit info to the thread self.accessLock = threading.RLock() + self.polledModules = [] # modules polled by thread started in self.startModules errors = [] # handle module properties @@ -568,6 +590,13 @@ class Module(HasAccessibles): def initModule(self): """initialise module with stuff to be done after all modules are created""" self.initModuleDone = True + if self.enablePoll or self.writeDict: + # enablePoll == False: we still need the poll thread for writing values from writeDict + if hasattr(self, 'io'): + self.io.polledModules.append(self) + else: + self.triggerPoll = threading.Event() + self.polledModules = [self] def startModule(self, start_events): """runs after init of all modules @@ -578,9 +607,8 @@ class Module(HasAccessibles): registers it in the server for waiting defaults to 30 seconds """ - if self.enablePoll or self.writeDict: - # enablePoll == False: start poll thread for writing values from writeDict only - mkthread(self.__pollThread, start_events.get_trigger()) + if self.polledModules: + mkthread(self.__pollThread, self.polledModules, start_events.get_trigger()) self.startModuleDone = True def doPoll(self): @@ -589,17 +617,16 @@ class Module(HasAccessibles): all other parameters are polled automatically """ - def setFastPoll(self, pollinterval): + def setFastPoll(self, flag, fast_interval=0.25): """change poll interval - :param pollinterval: a new (typically lower) pollinterval - special values: True: set to 0.25 (default fast poll interval) - False: set to self.pollinterval (value for idle) + :param flag: enable/disable fast poll mode + :param fast_interval: fast poll interval """ - if pollinterval is False: - self.changePollinterval.put(self.pollinterval) - return - self.changePollinterval.put(0.25 if pollinterval is True else pollinterval) + if self.pollInfo: + self.pollInfo.fast_flag = flag + self.pollInfo.interval = fast_interval if flag else self.pollinterval + self.pollInfo.trigger() def callPollFunc(self, rfunc): """call read method with proper error handling""" @@ -612,64 +639,92 @@ class Module(HasAccessibles): except Exception: self.log.error(formatException()) - def __pollThread(self, started_callback): - self.writeInitParams() - if not self.enablePoll: + def __pollThread(self, modules, started_callback): + """poll thread body + + :param modules: list of modules to be handled by this thread + :param started_callback: to be called after all polls are done once + + before polling, parameters which need hardware initialisation are written + """ + for mobj in modules: + mobj.writeInitParams() + modules = [m for m in modules if m.enablePoll] + if not modules: # no polls needed - exit thread + started_callback() return - polled_parameters = [] + if hasattr(self, 'registerReconnectCallback'): + # self is a communicator supporting reconnections + def trigger_all(trg=self.triggerPoll, polled_modules=modules): + for m in polled_modules: + m.pollInfo.last_main = 0 + m.pollInfo.last_slow = 0 + trg.set() + self.registerReconnectCallback('trigger_polls', trigger_all) + # collect and call all read functions a first time - for pname, pobj in self.parameters.items(): - rfunc = getattr(self, 'read_' + pname) - if rfunc.poll: - polled_parameters.append((rfunc, pobj)) - self.callPollFunc(rfunc) + for mobj in modules: + pinfo = mobj.pollInfo = PollInfo(mobj.pollinterval, self.triggerPoll) + # trigger a poll interval change when self.pollinterval changes. + if 'pollinterval' in mobj.valueCallbacks: + mobj.valueCallbacks['pollinterval'].append(pinfo.update_interval) + + for pname, pobj in mobj.parameters.items(): + rfunc = getattr(mobj, 'read_' + pname) + if rfunc.poll: + pinfo.polled_parameters.append((mobj, rfunc, pobj)) + mobj.callPollFunc(rfunc) started_callback() - pollinterval = self.pollinterval - last_slow = last_main = 0 - last_error = None - error_count = 0 to_poll = () while True: now = time.time() - wait_main = last_main + pollinterval - now - wait_slow = last_slow + self.slowinterval - now - wait_time = min(wait_main, wait_slow) + wait_time = 999 + for mobj in modules: + pinfo = mobj.pollInfo + wait_time = min(pinfo.last_main + pinfo.interval - now, wait_time, + pinfo.last_slow + mobj.slowinterval - now) if wait_time > 0: - try: - result = self.changePollinterval.get(timeout=wait_time) - except Empty: - result = None - if result is not None: - pollinterval = result + self.triggerPoll.wait(wait_time) + self.triggerPoll.clear() continue - # call doPoll, if due - if wait_main <= 0: - last_main = (now // pollinterval) * pollinterval - try: - self.doPoll() - if last_error and error_count > 1: - self.log.info('recovered after %d calls to doPoll (%r)', error_count, last_error) - last_error = None - except Exception as e: - if repr(e) != last_error: - error_count = 0 - self.log.error('error in doPoll: %r', e) - error_count += 1 - last_error = repr(e) + # call doPoll of all modules where due + for mobj in modules: + pinfo = mobj.pollInfo + if now > pinfo.last_main + pinfo.interval: + pinfo.last_main = (now // pinfo.interval) * pinfo.interval + try: + mobj.doPoll() + pinfo.last_error = None + except Exception as e: + if str(e) != str(pinfo.last_error) and not isinstance(e, SilentError): + mobj.log.error('doPoll: %r', e) + pinfo.last_error = e now = time.time() # find ONE due slow poll and call it loop = True while loop: # loops max. 2 times, when to_poll is at end - for rfunc, pobj in to_poll: - if now > pobj.timestamp + self.slowinterval * 0.5: - self.callPollFunc(rfunc) - loop = False + for mobj, rfunc, pobj in to_poll: + if now > pobj.timestamp + mobj.slowinterval * 0.5: + try: + prev_err = pobj.readerror + rfunc() + except Exception as e: + if not isinstance(e, SilentError) and str(pobj.readerror) != str(prev_err): + mobj.log.error('%s: %r', pobj.name, e) + loop = False # one poll done break else: - if now < last_slow + self.slowinterval: - break - last_slow = (now // self.slowinterval) * self.slowinterval - to_poll = iter(polled_parameters) + to_poll = [] + # collect due slow polls + for mobj in modules: + pinfo = mobj.pollInfo + if now > pinfo.last_slow + mobj.slowinterval: + to_poll.extend(pinfo.polled_parameters) + pinfo.last_slow = (now // mobj.slowinterval) * mobj.slowinterval + if to_poll: + to_poll = iter(to_poll) + else: + loop = False # no slow polls ready def writeInitParams(self, started_callback=None): """write values for parameters with configured values @@ -722,12 +777,6 @@ class Readable(Module): pollinterval = Parameter('default poll interval', FloatRange(0.1, 120), default=5, readonly=False, export=True) - def earlyInit(self): - super().earlyInit() - # trigger a poll interval change when self.pollinterval changes. - # self.setFastPoll with a float argument does the job here - self.valueCallbacks['pollinterval'].append(self.setFastPoll) - def doPoll(self): self.read_value() self.read_status() diff --git a/secop/properties.py b/secop/properties.py index f12d5a9..d491f65 100644 --- a/secop/properties.py +++ b/secop/properties.py @@ -136,10 +136,11 @@ class HasProperties(HasDescriptors): properties.update({k: v for k, v in base.__dict__.items() if isinstance(v, Property)}) cls.propertyDict = properties # treat overriding properties with bare values - for pn, po in properties.items(): + for pn, po in list(properties.items()): value = getattr(cls, pn, po) - if not isinstance(value, (Property, HasProperties)): # attribute may be a bare value - # HasProperties is a base class of Parameter -> allow a Parameter to override a Property () + if isinstance(value, HasProperties): # value is a Parameter, allow override + properties.pop(pn) + elif not isinstance(value, Property): # attribute may be a bare value po = po.copy() try: # try to apply bare value to Property diff --git a/test/test_modules.py b/test/test_modules.py index 7760404..c2230e1 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -69,6 +69,7 @@ class ServerStub: class DummyMultiEvent(threading.Event): def get_trigger(self): + def trigger(event=self): event.set() sys.exit() @@ -80,8 +81,9 @@ def test_Communicator(): o.earlyInit() o.initModule() event = DummyMultiEvent() + o.initModule() o.startModule(event) - assert event.is_set() # event should be set immediately + assert event.wait(timeout=0.1) def test_ModuleMagic(): @@ -193,8 +195,9 @@ def test_ModuleMagic(): assert updates.pop('o1') == expectedBeforeStart o1.earlyInit() event = DummyMultiEvent() + o1.initModule() o1.startModule(event) - event.wait() + assert event.wait(timeout=0.1) # should contain polled values expectedAfterStart = { 'status': (Drivable.Status.IDLE, 'ok'), 'value': 'second', @@ -209,8 +212,9 @@ def test_ModuleMagic(): assert updates.pop('o2') == expectedBeforeStart o2.earlyInit() event = DummyMultiEvent() + o2.initModule() o2.startModule(event) - event.wait() + assert event.wait(timeout=0.1) # value has changed type, b2 and a1 are written expectedAfterStart.update(value=0, b2='EMPTY', a1=True) # ramerk: a1=True: this behaviour is a Porgamming error diff --git a/test/test_poller.py b/test/test_poller.py index ae94e29..bea914c 100644 --- a/test/test_poller.py +++ b/test/test_poller.py @@ -23,6 +23,7 @@ import sys import threading +from time import time as current_time import time import logging @@ -33,43 +34,19 @@ from secop.lib.multievent import MultiEvent class Time: - STARTTIME = 1000 # artificial time zero - + """artificial time, forwarded on sleep instead of waiting""" def __init__(self): - self.reset() - self.finish = float('inf') - self.stop = lambda : None - self.commtime = 0.05 # time needed for 1 poll - - def reset(self, lifetime=10): - self.seconds = self.STARTTIME - self.idletime = 0.0 - self.busytime = 0.0 - self.finish = self.STARTTIME + lifetime + self.offset = 0 def time(self): - if self.seconds > self.finish: - self.finish = float('inf') - self.stop() - return self.seconds + return current_time() + self.offset def sleep(self, seconds): assert 0 <= seconds <= 24*3600 - self.idletime += seconds - self.seconds += seconds - - def busy(self, seconds): - assert seconds >= 0 - self.seconds += seconds - self.busytime += seconds + self.offset += seconds -artime = Time() # artificial test time - - -class Event(threading.Event): - def wait(self, timeout=None): - artime.sleep(max(0, timeout)) +artime = Time() # artificial test time class DispatcherStub: @@ -97,13 +74,25 @@ class Base(Module): srv = ServerStub() super().__init__('mod', logging.getLogger('dummy'), dict(description=''), srv) self.dispatcher = srv.dispatcher - self.nextPollEvent = Event() def run(self, maxcycles): self.dispatcher.maxcycles = maxcycles self.dispatcher.finish_event = threading.Event() + self.initModule() + + def wait(timeout=None, base=self.triggerPoll): + """simplified simulation + + when an event is already set return True, else forward artificial time + """ + if base.is_set(): + return True + artime.sleep(max(0.0, 99.9 if timeout is None else timeout)) + return base.is_set() + + self.triggerPoll.wait = wait self.startModule(MultiEvent()) - self.dispatcher.finish_event.wait(1) + assert self.dispatcher.finish_event.wait(1) class Mod1(Base, Readable): @@ -132,14 +121,13 @@ class Mod1(Base, Readable): @pytest.mark.parametrize( 'ncycles, pollinterval, slowinterval, mspan, pspan', - [ # normal case: 5+-1 15+-1 - ( 60, 5, 15, (4, 6), (14, 16)), - # pollinterval faster then reading: mspan max 3 s (polls of value, status and ONE other parameter) - ( 60, 1, 5, (1, 3), (5, 16)), + [ # normal case: + ( 60, 5, 15, (4.9, 5.1), (14, 16)), + # pollinterval faster then reading: mspan max ~ 3 s (polls of value, status and ONE other parameter) + ( 60, 1, 5, (0.9, 3.1), (5, 17)), ]) def test_poll(ncycles, pollinterval, slowinterval, mspan, pspan, monkeypatch): monkeypatch.setattr(time, 'time', artime.time) - artime.reset() m = Mod1() m.pollinterval = pollinterval m.slowInterval = slowinterval @@ -148,18 +136,18 @@ def test_poll(ncycles, pollinterval, slowinterval, mspan, pspan, monkeypatch): for pname in ['value', 'status']: pobj = m.parameters[pname] lowcnt = 0 + print(pname, [t2 - t1 for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1])]) for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1]): if t2 - t1 < mspan[0]: - print(t2 - t1) lowcnt += 1 assert t2 - t1 <= mspan[1] - assert lowcnt <= 1 + assert lowcnt <= 2 for pname in ['param1', 'param2', 'param3']: pobj = m.parameters[pname] lowcnt = 0 + print(pname, [t2 - t1 for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1])]) for t1, t2 in zip(pobj.stat[1:], pobj.stat[2:-1]): if t2 - t1 < pspan[0]: - print(pname, t2 - t1) lowcnt += 1 assert t2 - t1 <= pspan[1] - assert lowcnt <= 1 + assert lowcnt <= 2