improvements on secop.client.Client

- add unregister_callback
  (this is needed for the SECoP client in nicos)
- improve error handling
- imporve error handling in AsynTcp
- also improve error handling on StringIO

Change-Id: If4f3632a93cbc0e7fbc55a966e09fcc3e69c09b7
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22852
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
2020-04-03 07:53:33 +02:00
parent 3d2333f731
commit fbc0270b15
3 changed files with 43 additions and 23 deletions

View File

@ -145,6 +145,21 @@ class ProxyClient:
if kwds: if kwds:
raise TypeError('unknown callback: %s' % (', '.join(kwds))) raise TypeError('unknown callback: %s' % (', '.join(kwds)))
def unregister_callback(self, key, *args, **kwds):
"""unregister a callback
for the arguments see register_callback
"""
for cbfunc in args:
kwds[cbfunc.__name__] = cbfunc
for cbname, func in kwds.items():
cblist = self.callbacks[cbname][key]
if func in cblist:
cblist.remove(func)
if not cblist:
self.callbacks[cbname].pop(key)
def callback(self, key, cbname, *args): def callback(self, key, cbname, *args):
"""perform callbacks """perform callbacks
@ -332,6 +347,8 @@ class SecopClient(ProxyClient):
self._rxthread = None self._rxthread = None
self.disconnect(False) self.disconnect(False)
if self._shutdown:
return
if self.activate: if self.activate:
self.log.info('try to reconnect to %s', self.uri) self.log.info('try to reconnect to %s', self.uri)
self._connthread = mkthread(self._reconnect) self._connthread = mkthread(self._reconnect)
@ -440,10 +457,11 @@ class SecopClient(ProxyClient):
self.modules[modname] = dict(accessibles=accessibles, parameters=parameters, self.modules[modname] = dict(accessibles=accessibles, parameters=parameters,
commands=commands, properties=properties) commands=commands, properties=properties)
if changed_modules is not None: if changed_modules is not None:
done = self.callback(None, 'descriptiveDataChange', None, self) done = done_main = self.callback(None, 'descriptiveDataChange', None, self)
for mname in changed_modules: for mname in changed_modules:
if not self.callback(mname, 'descriptiveDataChange', mname, self): if not self.callback(mname, 'descriptiveDataChange', mname, self):
self.log.warning('descriptive data changed on module %r', mname) if not done_main:
self.log.warning('descriptive data changed on module %r', mname)
done = True done = True
if not done: if not done:
self.log.warning('descriptive data of %r changed', self.nodename) self.log.warning('descriptive data of %r changed', self.nodename)

View File

@ -23,8 +23,9 @@
"""asynchronous connections """asynchronous connections
generic class for byte oriented communication generic class for byte oriented communication
includes implementation for TCP connections includes implementation for TCP and Serial connections
support for asynchronous communication, but may be used also for StringIO support for asynchronous communication, but may be used also for
synchronous IO (see secop.stringio.StringIO)
""" """
import socket import socket
@ -34,7 +35,7 @@ import ast
from serial import Serial from serial import Serial
from secop.lib import parseHostPort, tcpSocket, closeSocket from secop.lib import parseHostPort, tcpSocket, closeSocket
from secop.errors import ConfigError from secop.errors import ConfigError, CommunicationFailedError
class ConnectionClosed(ConnectionError): class ConnectionClosed(ConnectionError):
@ -133,7 +134,11 @@ class AsynTcp(AsynConn):
if uri.startswith('tcp://'): if uri.startswith('tcp://'):
# should be the case always # should be the case always
uri = uri[6:] uri = uri[6:]
self.connection = tcpSocket(uri, self.defaultport, self.timeout) try:
self.connection = tcpSocket(uri, self.defaultport, self.timeout)
except (ConnectionRefusedError, socket.gaierror) as e:
# indicate that retrying might make sense
raise CommunicationFailedError(str(e))
def disconnect(self): def disconnect(self):
if self.connection: if self.connection:
@ -215,6 +220,7 @@ class AsynSerial(AsynConn):
self.connection = Serial(dev, **options) self.connection = Serial(dev, **options)
except ValueError as e: except ValueError as e:
raise ConfigError(e) raise ConfigError(e)
# TODO: turn exceptions into ConnectionFailedError, where a retry makes sense
def disconnect(self): def disconnect(self):
if self.connection: if self.connection:

View File

@ -74,19 +74,13 @@ class StringIO(Communicator):
self._conn = None self._conn = None
self._lock = threading.RLock() self._lock = threading.RLock()
self._end_of_line = self.end_of_line.encode(self.encoding) self._end_of_line = self.end_of_line.encode(self.encoding)
self._connect_error = None
self._last_error = None self._last_error = None
def connectStart(self): def connectStart(self):
if not self.is_connected: if not self.is_connected:
uri = self.uri uri = self.uri
try: self._conn = AsynConn(uri, self._end_of_line)
self._conn = AsynConn(uri, self._end_of_line) self.is_connected = True
self.is_connected = True
except Exception as e:
# this is really bad, do not try again
self._connect_error = e
raise
for command, regexp in self.identification: for command, regexp in self.identification:
reply = self.do_communicate(command) reply = self.do_communicate(command)
if not re.match(regexp, reply): if not re.match(regexp, reply):
@ -157,7 +151,7 @@ class StringIO(Communicator):
"""send a command and receive a reply """send a command and receive a reply
using end_of_line, encoding and self._lock using end_of_line, encoding and self._lock
for commands without reply, join it with a query command, for commands without reply, the command must be joined with a query command,
wait_before is respected for end_of_lines within a command. wait_before is respected for end_of_lines within a command.
""" """
if not self.is_connected: if not self.is_connected:
@ -170,17 +164,18 @@ class StringIO(Communicator):
else: else:
cmds = [command] cmds = [command]
garbage = None garbage = None
for cmd in cmds:
if self.wait_before:
time.sleep(self.wait_before)
if garbage is None: # read garbage only once
garbage = self._conn.flush_recv()
if garbage:
self.log.debug('garbage: %s', garbage.decode(self.encoding))
self._conn.send((cmd + self.end_of_line).encode(self.encoding))
try: try:
for cmd in cmds:
if self.wait_before:
time.sleep(self.wait_before)
if garbage is None: # read garbage only once
garbage = self._conn.flush_recv()
if garbage:
self.log.debug('garbage: %s', garbage.decode(self.encoding))
self._conn.send((cmd + self.end_of_line).encode(self.encoding))
reply = self._conn.readline(self.timeout) reply = self._conn.readline(self.timeout)
except ConnectionClosed: except ConnectionClosed:
self.closeConnection()
raise CommunicationFailedError('disconnected') raise CommunicationFailedError('disconnected')
reply = reply.decode(self.encoding) reply = reply.decode(self.encoding)
self.log.debug('recv: %s', reply) self.log.debug('recv: %s', reply)
@ -221,6 +216,7 @@ class HasIodev(Module):
try: try:
self._iodev.read_is_connected() self._iodev.read_is_connected()
except (CommunicationFailedError, AttributeError): except (CommunicationFailedError, AttributeError):
# AttributeError: for missing _iodev?
pass pass
super().initModule() super().initModule()