poll all parameters after reconnect

poller.Poller triggers polling of all parameters right after its iodev
reconnects, if the iodev supports registerReconnectCallback and
has the is_connected parameter

Change-Id: I59bb05cefdbea5efd4f3966ffe5237a75c2174bf
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/21995
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
This commit is contained in:
2019-12-11 13:14:21 +01:00
parent bc089d2ae0
commit 07eb252b78
3 changed files with 54 additions and 9 deletions

View File

@ -113,7 +113,8 @@ class Poller(PollerBase):
def __init__(self, name): def __init__(self, name):
'''create a poller''' '''create a poller'''
self.queues = {polltype: [] for polltype in self.DEFAULT_FACTORS} self.queues = {polltype: [] for polltype in self.DEFAULT_FACTORS}
self._stopped = Event() self._event = Event()
self._stopped = False
self.maxwait = 3600 self.maxwait = 3600
self.name = name self.name = name
@ -142,6 +143,11 @@ class Poller(PollerBase):
if not hasattr(module, 'pollinterval'): if not hasattr(module, 'pollinterval'):
raise ProgrammingError("module %s must have a pollinterval" raise ProgrammingError("module %s must have a pollinterval"
% module.name) % module.name)
if pname == 'is_connected':
if hasattr(module, 'registerReconnectCallback'):
module.registerReconnectCallback(self.name, self.trigger_all)
else:
module.log.warning("%r has 'is_connected' but no 'registerReconnectCallback'" % module)
if polltype == AUTO: # covers also pobj.poll == True if polltype == AUTO: # covers also pobj.poll == True
if pname in ('value', 'status'): if pname in ('value', 'status'):
polltype = DYNAMIC polltype = DYNAMIC
@ -184,7 +190,10 @@ class Poller(PollerBase):
else: else:
interval = module.pollinterval * factor interval = module.pollinterval * factor
mininterval = interval mininterval = interval
due = max(lastdue + interval, pobj.timestamp + interval * 0.5) if due == 0:
due = now # do not look at timestamp after trigger_all
else:
due = max(lastdue + interval, pobj.timestamp + interval * 0.5)
if now >= due: if now >= due:
module.pollOneParam(pname) module.pollOneParam(pname)
done = True done = True
@ -194,6 +203,13 @@ class Poller(PollerBase):
heapreplace(queue, (due, lastdue, pollitem)) heapreplace(queue, (due, lastdue, pollitem))
return 0 return 0
def trigger_all(self):
for _, queue in sorted(self.queues.items()):
for idx, (_, lastdue, pollitem) in enumerate(queue):
queue[idx] = (0, lastdue, pollitem)
self._event.set()
return True
def run(self, started_callback): def run(self, started_callback):
'''start poll loop '''start poll loop
@ -222,7 +238,7 @@ class Poller(PollerBase):
heapify(queue) heapify(queue)
started_callback() # signal end of startup started_callback() # signal end of startup
nregular = len(self.queues[REGULAR]) nregular = len(self.queues[REGULAR])
while not self._stopped.is_set(): while not self._stopped:
due = float('inf') due = float('inf')
for _ in range(nregular): for _ in range(nregular):
due = min(self.poll_next(DYNAMIC), self.poll_next(REGULAR)) due = min(self.poll_next(DYNAMIC), self.poll_next(REGULAR))
@ -231,10 +247,12 @@ class Poller(PollerBase):
due = min(due, self.poll_next(DYNAMIC), self.poll_next(SLOW)) due = min(due, self.poll_next(DYNAMIC), self.poll_next(SLOW))
delay = due - time.time() delay = due - time.time()
if delay > 0: if delay > 0:
self._stopped.wait(delay) self._event.wait(delay)
self._event.clear()
def stop(self): def stop(self):
self._stopped.set() self._event.set()
self._stopped = True
def __bool__(self): def __bool__(self):
'''is there any poll item?''' '''is there any poll item?'''

View File

@ -73,12 +73,14 @@ class StringIO(Communicator):
argument=ArrayOf(StringType()), result= ArrayOf(StringType())) argument=ArrayOf(StringType()), result= ArrayOf(StringType()))
} }
_reconnectCallbacks = None
def earlyInit(self): def earlyInit(self):
self._stream = None self._stream = None
self._lock = threading.RLock() self._lock = threading.RLock()
self._end_of_line = self.end_of_line.encode(self.encoding) self._end_of_line = self.end_of_line.encode(self.encoding)
self._connect_error = None self._connect_error = None
self._last_error = 'not connected' self._last_error = None
def createConnection(self): def createConnection(self):
"""create connection """create connection
@ -111,7 +113,7 @@ class StringIO(Communicator):
if timeout is None or timeout < 0: if timeout is None or timeout < 0:
raise ValueError('illegal timeout %r' % timeout) raise ValueError('illegal timeout %r' % timeout)
if not self.is_connected: if not self.is_connected:
raise CommunicationSilentError(self._last_error) raise CommunicationSilentError(self._last_error or 'not connected')
self._stream.settimeout(timeout) self._stream.settimeout(timeout)
try: try:
reply = self._stream.recv(4096) reply = self._stream.recv(4096)
@ -164,6 +166,7 @@ class StringIO(Communicator):
if self._last_error: if self._last_error:
self.log.info('connected') self.log.info('connected')
self._last_error = 'connected' self._last_error = 'connected'
self.callCallbacks()
return Done return Done
except Exception as e: except Exception as e:
if str(e) == self._last_error: if str(e) == self._last_error:
@ -192,6 +195,26 @@ class StringIO(Communicator):
raise CommunicationFailedError('bad response: %s does not match %s' % raise CommunicationFailedError('bad response: %s does not match %s' %
(reply, regexp)) (reply, regexp))
def registerReconnectCallback(self, name, func):
"""register reconnect callback
if the callback fails or returns False, it is cleared
"""
if self._reconnectCallbacks is None:
self._reconnectCallbacks = {name: func}
else:
self._reconnectCallbacks[name] = func
def callCallbacks(self):
for key, cb in list(self._reconnectCallbacks.items()):
try:
removeme = not cb()
except Exception as e:
self.log.error('callback: %s' % e)
removeme = True
if removeme:
self._reconnectCallbacks.pop(key)
def do_communicate(self, command): def do_communicate(self, command):
'''send a command and receive a reply '''send a command and receive a reply

View File

@ -73,7 +73,10 @@ class Event:
artime.sleep(max(0,timeout)) artime.sleep(max(0,timeout))
def set(self): def set(self):
self.flag=True self.flag = True
def clear(self):
self.flag = False
def is_set(self): def is_set(self):
return self.flag return self.flag
@ -198,7 +201,7 @@ def test_Poller(modules):
assert len(pollTable) == 1 assert len(pollTable) == 1
poller = pollTable[(Poller, 'common_iodev')] poller = pollTable[(Poller, 'common_iodev')]
artime.stop = poller.stop artime.stop = poller.stop
poller._stopped = Event() # patch Event.wait poller._event = Event() # patch Event.wait
assert (sum(count.values()) > 0) == bool(poller) assert (sum(count.values()) > 0) == bool(poller)
@ -233,6 +236,7 @@ def test_Poller(modules):
for module in modules: for module in modules:
for pobj in module.parameters.values(): for pobj in module.parameters.values():
if pobj.poll: if pobj.poll:
assert pobj.cnt > 0
assert pobj.maxspan <= maxspan[pobj.polltype] * 1.1 assert pobj.maxspan <= maxspan[pobj.polltype] * 1.1
assert (pobj.cnt + 1) * pobj.interval >= total * 0.99 assert (pobj.cnt + 1) * pobj.interval >= total * 0.99
assert abs(pobj.span - pobj.interval) < 0.01 assert abs(pobj.span - pobj.interval) < 0.01