diff --git a/secop/client/__init__.py b/secop/client/__init__.py index f0b54c4..d3c9729 100644 --- a/secop/client/__init__.py +++ b/secop/client/__init__.py @@ -145,6 +145,21 @@ class ProxyClient: if kwds: raise TypeError('unknown callback: %s' % (', '.join(kwds))) + def unregister_callback(self, key, *args, **kwds): + """unregister a callback + + for the arguments see register_callback + """ + for cbfunc in args: + kwds[cbfunc.__name__] = cbfunc + for cbname, func in kwds.items(): + cblist = self.callbacks[cbname][key] + if func in cblist: + cblist.remove(func) + if not cblist: + self.callbacks[cbname].pop(key) + + def callback(self, key, cbname, *args): """perform callbacks @@ -332,6 +347,8 @@ class SecopClient(ProxyClient): self._rxthread = None self.disconnect(False) + if self._shutdown: + return if self.activate: self.log.info('try to reconnect to %s', self.uri) self._connthread = mkthread(self._reconnect) @@ -440,10 +457,11 @@ class SecopClient(ProxyClient): self.modules[modname] = dict(accessibles=accessibles, parameters=parameters, commands=commands, properties=properties) if changed_modules is not None: - done = self.callback(None, 'descriptiveDataChange', None, self) + done = done_main = self.callback(None, 'descriptiveDataChange', None, self) for mname in changed_modules: if not self.callback(mname, 'descriptiveDataChange', mname, self): - self.log.warning('descriptive data changed on module %r', mname) + if not done_main: + self.log.warning('descriptive data changed on module %r', mname) done = True if not done: self.log.warning('descriptive data of %r changed', self.nodename) diff --git a/secop/lib/asynconn.py b/secop/lib/asynconn.py index 5569fe2..8e68624 100644 --- a/secop/lib/asynconn.py +++ b/secop/lib/asynconn.py @@ -23,8 +23,9 @@ """asynchronous connections generic class for byte oriented communication -includes implementation for TCP connections -support for asynchronous communication, but may be used also for StringIO +includes implementation for TCP and Serial connections +support for asynchronous communication, but may be used also for +synchronous IO (see secop.stringio.StringIO) """ import socket @@ -34,7 +35,7 @@ import ast from serial import Serial from secop.lib import parseHostPort, tcpSocket, closeSocket -from secop.errors import ConfigError +from secop.errors import ConfigError, CommunicationFailedError class ConnectionClosed(ConnectionError): @@ -133,7 +134,11 @@ class AsynTcp(AsynConn): if uri.startswith('tcp://'): # should be the case always uri = uri[6:] - self.connection = tcpSocket(uri, self.defaultport, self.timeout) + try: + self.connection = tcpSocket(uri, self.defaultport, self.timeout) + except (ConnectionRefusedError, socket.gaierror) as e: + # indicate that retrying might make sense + raise CommunicationFailedError(str(e)) def disconnect(self): if self.connection: @@ -215,6 +220,7 @@ class AsynSerial(AsynConn): self.connection = Serial(dev, **options) except ValueError as e: raise ConfigError(e) + # TODO: turn exceptions into ConnectionFailedError, where a retry makes sense def disconnect(self): if self.connection: diff --git a/secop/stringio.py b/secop/stringio.py index adcc67b..cb6bd9f 100644 --- a/secop/stringio.py +++ b/secop/stringio.py @@ -74,19 +74,13 @@ class StringIO(Communicator): self._conn = None self._lock = threading.RLock() self._end_of_line = self.end_of_line.encode(self.encoding) - self._connect_error = None self._last_error = None def connectStart(self): if not self.is_connected: uri = self.uri - try: - self._conn = AsynConn(uri, self._end_of_line) - self.is_connected = True - except Exception as e: - # this is really bad, do not try again - self._connect_error = e - raise + self._conn = AsynConn(uri, self._end_of_line) + self.is_connected = True for command, regexp in self.identification: reply = self.do_communicate(command) if not re.match(regexp, reply): @@ -157,7 +151,7 @@ class StringIO(Communicator): """send a command and receive a reply using end_of_line, encoding and self._lock - for commands without reply, join it with a query command, + for commands without reply, the command must be joined with a query command, wait_before is respected for end_of_lines within a command. """ if not self.is_connected: @@ -170,17 +164,18 @@ class StringIO(Communicator): else: cmds = [command] garbage = None - for cmd in cmds: - if self.wait_before: - time.sleep(self.wait_before) - if garbage is None: # read garbage only once - garbage = self._conn.flush_recv() - if garbage: - self.log.debug('garbage: %s', garbage.decode(self.encoding)) - self._conn.send((cmd + self.end_of_line).encode(self.encoding)) try: + for cmd in cmds: + if self.wait_before: + time.sleep(self.wait_before) + if garbage is None: # read garbage only once + garbage = self._conn.flush_recv() + if garbage: + self.log.debug('garbage: %s', garbage.decode(self.encoding)) + self._conn.send((cmd + self.end_of_line).encode(self.encoding)) reply = self._conn.readline(self.timeout) except ConnectionClosed: + self.closeConnection() raise CommunicationFailedError('disconnected') reply = reply.decode(self.encoding) self.log.debug('recv: %s', reply) @@ -221,6 +216,7 @@ class HasIodev(Module): try: self._iodev.read_is_connected() except (CommunicationFailedError, AttributeError): + # AttributeError: for missing _iodev? pass super().initModule()