diff --git a/secop/errors.py b/secop/errors.py index 30ad77b..e262425 100644 --- a/secop/errors.py +++ b/secop/errors.py @@ -98,6 +98,14 @@ class CommunicationFailedError(SECoPError): pass +class SilentError(SECoPError): + pass + + +class CommunicationSilentError(SilentError, CommunicationFailedError): + name = 'CommunicationFailed' + + class IsBusyError(SECoPError): pass diff --git a/secop/modules.py b/secop/modules.py index 5e79f41..ebe5aa8 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -28,7 +28,7 @@ from collections import OrderedDict from secop.datatypes import EnumType, FloatRange, BoolType, IntRange, \ StringType, TupleOf, get_datatype, ArrayOf, TextType -from secop.errors import ConfigError, ProgrammingError, SECoPError, BadValueError +from secop.errors import ConfigError, ProgrammingError, SECoPError, BadValueError, SilentError from secop.lib import formatException, formatExtendedStack, mkthread from secop.lib.enum import Enum from secop.metaclass import ModuleMeta @@ -282,6 +282,8 @@ class Module(HasProperties, metaclass=ModuleMeta): """poll parameter with proper error handling""" try: return getattr(self, 'read_'+ pname)() + except SilentError as e: + pass except SECoPError as e: self.log.error(str(e)) except Exception as e: @@ -298,16 +300,18 @@ class Module(HasProperties, metaclass=ModuleMeta): pnames = pobj.handler.parameters valuedict = {n: self.writeDict.pop(n) for n in pnames if n in self.writeDict} if valuedict: - self.log.info('write parameters %r', valuedict) + self.log.debug('write parameters %r', valuedict) pobj.handler.write(self, valuedict, force_read=True) return pobj.handler.read(self) else: if pname in self.writeDict: - self.log.info('write parameter %s', pname) + self.log.debug('write parameter %s', pname) getattr(self, 'write_'+ pname)(self.writeDict.pop(pname)) else: getattr(self, 'read_'+ pname)() + except SilentError as e: + pass except SECoPError as e: self.log.error(str(e)) except Exception as e: diff --git a/secop/params.py b/secop/params.py index 7c7df4c..1395116 100644 --- a/secop/params.py +++ b/secop/params.py @@ -258,7 +258,6 @@ class Override(CountedObj): class Command(Accessible): """storage for Commands settings (description + call signature...) """ - # datatype is not listed (handled separately) properties = { 'description': Property('Description of the Command', TextType(), extname='description', export=True, mandatory=True), @@ -326,4 +325,5 @@ PREDEFINED_ACCESSIBLES = dict( go = Command, abort = Command, shutdown = Command, + communicate = Command, ) diff --git a/secop/protocol/interface/tcp.py b/secop/protocol/interface/tcp.py index 0772d4f..db1dcc3 100644 --- a/secop/protocol/interface/tcp.py +++ b/secop/protocol/interface/tcp.py @@ -124,7 +124,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler): except Exception as err: # we have to decode 'origin' here # use latin-1, as utf-8 or ascii may lead to encoding errors - msg = origin.decode('latin-1').split(' ', 3) + (None,) # make sure len(msg) > 1 + msg = origin.decode('latin-1').split(' ', 3) + [None] # make sure len(msg) > 1 result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', str(err), {'exception': formatException(), 'traceback': formatExtendedStack()}]) diff --git a/secop/stringio.py b/secop/stringio.py new file mode 100644 index 0000000..73ab5a2 --- /dev/null +++ b/secop/stringio.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""Line oriented stream communication + +implements TCP/IP and is be used as a base for SerialIO +""" + +import time +import socket +import threading +import re +from secop.modules import Module, Communicator, Parameter, Command, Property, Attached +from secop.datatypes import StringType, FloatRange, ArrayOf, BoolType, TupleOf +from secop.errors import CommunicationFailedError, CommunicationSilentError +from secop.poller import REGULAR +from secop.metaclass import Done + + + +class StringIO(Communicator): + """line oriented communicator + + implementation for TCP/IP streams. + other types have to override the following methods: + createConnection, readWithTimeout, writeBytes, closeConnection + + self healing is assured by polling the parameter 'is_connected' + """ + properties = { + 'uri': + Property('hostname:portnumber', datatype=StringType()), + 'end_of_line': + Property('end_of_line character', datatype=StringType(), + default='\n', settable=True), + 'encoding': + Property('used encoding', datatype=StringType(), + default='ascii', settable=True), + 'identification': + Property('a list of tuples with commands and expected responses as regexp', + datatype=ArrayOf(TupleOf(StringType(),StringType())), default=[], export=False), + } + parameters = { + 'timeout': + Parameter('timeout', datatype=FloatRange(0), default=1), + 'wait_before': + Parameter('wait time before sending', datatype=FloatRange(), default=0), + 'is_connected': + Parameter('connection state', datatype=BoolType(), readonly=False, poll=REGULAR), + 'pollinterval': + Parameter('reconnect interval', datatype=FloatRange(0), readonly=False, default=10), + } + commands = { + 'multicomm': + Command('execute multiple commands in one go', + argument=ArrayOf(StringType()), result= ArrayOf(StringType())) + } + + def earlyInit(self): + self._stream = None + self._lock = threading.RLock() + self._end_of_line = self.end_of_line.encode(self.encoding) + self._connect_error = None + self._last_error = 'not connected' + + def createConnection(self): + """create connection + + in case of success, self.is_connected MUST be set to True by implementors + """ + uri = self.uri + if uri.startswith('tcp://'): + uri = uri[6:] + try: + host, port = uri.split(':') + self._stream = socket.create_connection((host, int(port)), 10) + self.is_connected = True + except (ConnectionRefusedError, socket.gaierror) as e: + raise CommunicationFailedError(str(e)) + except Exception as e: + # this is really bad, do not try again + self._connect_error = e + raise + + def readWithTimeout(self, timeout): + """read with timeout + + Read bytes available now, or wait at most the specified timeout until some bytes + are available. Throw an error, if disconnected. + If no bytes are available, return b'' + + to be overwritten for other stream types + """ + if timeout is None or timeout < 0: + raise ValueError('illegal timeout %r' % timeout) + if not self.is_connected: + raise CommunicationSilentError(self._last_error) + self._stream.settimeout(timeout) + try: + reply = self._stream.recv(4096) + if reply: + return reply + except (BlockingIOError, socket.timeout): + return b'' + except Exception as e: + self.closeConnection() + raise CommunicationFailedError('disconnected because of %s' % e) + # other end disconnected + self.closeConnection() + raise CommunicationFailedError('other end disconnected') + + def writeBytes(self, data): + """write bytes + + to be overwritten for other stream types + """ + self._stream.sendall(data) + + def closeConnection(self): + """close connection + + self.is_connected MUST be set to False by implementors + """ + if not self._stream: + return + self.log.debug('disconnect %s' % self.uri) + try: + self._stream.shutdown(socket.SHUT_RDWR) + except socket.error: + pass + try: + self._stream.close() + except socket.error: + pass + self._stream = None + self.is_connected = False + + def read_is_connected(self): + """try to reconnect, when not connected + + self.is_connected is changed only by self.connectStart or self.closeConnection + """ + if self.is_connected: + return Done # no need for intermediate updates + try: + self.connectStart() + if self._last_error: + self.log.info('connected') + self._last_error = 'connected' + return Done + except Exception as e: + if str(e) == self._last_error: + raise CommunicationSilentError(str(e)) + self._last_error = str(e) + self.log.error(self._last_error) + raise + return Done + + def write_is_connected(self, value): + """value = True: connect if not yet done + value = False: disconnect (will be reconnected automatically) + """ + if not value: + self.closeConnection() + return False + return self.read_is_connected() + + def connectStart(self): + if not self.is_connected: + self.createConnection() + for command, regexp in self.identification: + reply = self.do_communicate(command) + if not re.match(regexp, reply): + self.closeConnection() + raise CommunicationFailedError('bad response: %s does not match %s' % + (reply, regexp)) + + def do_communicate(self, command): + '''send a command and receive a reply + + using end_of_line, encoding and self._lock + for commands without reply, join it with a query command, + wait_before is respected for end_of_lines within a command. + ''' + if not self.is_connected: + self.read_is_connected() # try to reconnect + try: + with self._lock: + # read garbage and wait before send + if self.wait_before: + cmds = command.split(self.end_of_line) + else: + cmds = [command] + garbage = None + for cmd in cmds: + if self.wait_before: + time.sleep(self.wait_before) + if garbage is None: # read garbage only once + garbage = '' + data = self.readWithTimeout(0) + while data: + garbage += data + data = self.readWithTimeout(0) + if garbage: + self.log.debug('garbage: %s', garbage.decode(self.encoding)) + self.writeBytes((cmd + self.end_of_line).encode(self.encoding)) + timeout = self.timeout + buffer = b'' + data = True + while data: + data = self.readWithTimeout(timeout) + buffer += data + if self._end_of_line in buffer: + break + else: + raise CommunicationFailedError('timeout') + reply = buffer.split(self._end_of_line, 1)[0].decode(self.encoding) + self.log.debug('recv: %s', reply) + return reply + except Exception as e: + if str(e) == self._last_error: + raise CommunicationSilentError(str(e)) + self._last_error = str(e) + self.log.error(self._last_error) + raise + + def do_multicomm(self, commands): + replies = [] + with self._lock: + for cmd in commands: + replies.append(self.do_communicate(cmd)) + return replies + + +class HasIodev(Module): + """Mixin for modules using a communicator""" + properties = { + 'iodev': Attached(), + } + + def initModule(self): + try: + self._iodev.read_is_connected() + except CommunicationFailedError: + pass + super().initModule() + + def sendRecv(self, command): + return self._iodev.do_communicate(command)