introduced StringIO
introduce line oriented communication + HasIodev Mixin + minor fixes Change-Id: I22605824d2f45147224840172bae281f85c80a11 Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/21953 Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
@ -98,6 +98,14 @@ class CommunicationFailedError(SECoPError):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SilentError(SECoPError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CommunicationSilentError(SilentError, CommunicationFailedError):
|
||||||
|
name = 'CommunicationFailed'
|
||||||
|
|
||||||
|
|
||||||
class IsBusyError(SECoPError):
|
class IsBusyError(SECoPError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ from collections import OrderedDict
|
|||||||
|
|
||||||
from secop.datatypes import EnumType, FloatRange, BoolType, IntRange, \
|
from secop.datatypes import EnumType, FloatRange, BoolType, IntRange, \
|
||||||
StringType, TupleOf, get_datatype, ArrayOf, TextType
|
StringType, TupleOf, get_datatype, ArrayOf, TextType
|
||||||
from secop.errors import ConfigError, ProgrammingError, SECoPError, BadValueError
|
from secop.errors import ConfigError, ProgrammingError, SECoPError, BadValueError, SilentError
|
||||||
from secop.lib import formatException, formatExtendedStack, mkthread
|
from secop.lib import formatException, formatExtendedStack, mkthread
|
||||||
from secop.lib.enum import Enum
|
from secop.lib.enum import Enum
|
||||||
from secop.metaclass import ModuleMeta
|
from secop.metaclass import ModuleMeta
|
||||||
@ -282,6 +282,8 @@ class Module(HasProperties, metaclass=ModuleMeta):
|
|||||||
"""poll parameter <pname> with proper error handling"""
|
"""poll parameter <pname> with proper error handling"""
|
||||||
try:
|
try:
|
||||||
return getattr(self, 'read_'+ pname)()
|
return getattr(self, 'read_'+ pname)()
|
||||||
|
except SilentError as e:
|
||||||
|
pass
|
||||||
except SECoPError as e:
|
except SECoPError as e:
|
||||||
self.log.error(str(e))
|
self.log.error(str(e))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -298,16 +300,18 @@ class Module(HasProperties, metaclass=ModuleMeta):
|
|||||||
pnames = pobj.handler.parameters
|
pnames = pobj.handler.parameters
|
||||||
valuedict = {n: self.writeDict.pop(n) for n in pnames if n in self.writeDict}
|
valuedict = {n: self.writeDict.pop(n) for n in pnames if n in self.writeDict}
|
||||||
if valuedict:
|
if valuedict:
|
||||||
self.log.info('write parameters %r', valuedict)
|
self.log.debug('write parameters %r', valuedict)
|
||||||
pobj.handler.write(self, valuedict, force_read=True)
|
pobj.handler.write(self, valuedict, force_read=True)
|
||||||
return
|
return
|
||||||
pobj.handler.read(self)
|
pobj.handler.read(self)
|
||||||
else:
|
else:
|
||||||
if pname in self.writeDict:
|
if pname in self.writeDict:
|
||||||
self.log.info('write parameter %s', pname)
|
self.log.debug('write parameter %s', pname)
|
||||||
getattr(self, 'write_'+ pname)(self.writeDict.pop(pname))
|
getattr(self, 'write_'+ pname)(self.writeDict.pop(pname))
|
||||||
else:
|
else:
|
||||||
getattr(self, 'read_'+ pname)()
|
getattr(self, 'read_'+ pname)()
|
||||||
|
except SilentError as e:
|
||||||
|
pass
|
||||||
except SECoPError as e:
|
except SECoPError as e:
|
||||||
self.log.error(str(e))
|
self.log.error(str(e))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -258,7 +258,6 @@ class Override(CountedObj):
|
|||||||
class Command(Accessible):
|
class Command(Accessible):
|
||||||
"""storage for Commands settings (description + call signature...)
|
"""storage for Commands settings (description + call signature...)
|
||||||
"""
|
"""
|
||||||
# datatype is not listed (handled separately)
|
|
||||||
properties = {
|
properties = {
|
||||||
'description': Property('Description of the Command', TextType(),
|
'description': Property('Description of the Command', TextType(),
|
||||||
extname='description', export=True, mandatory=True),
|
extname='description', export=True, mandatory=True),
|
||||||
@ -326,4 +325,5 @@ PREDEFINED_ACCESSIBLES = dict(
|
|||||||
go = Command,
|
go = Command,
|
||||||
abort = Command,
|
abort = Command,
|
||||||
shutdown = Command,
|
shutdown = Command,
|
||||||
|
communicate = Command,
|
||||||
)
|
)
|
||||||
|
@ -124,7 +124,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
|
|||||||
except Exception as err:
|
except Exception as err:
|
||||||
# we have to decode 'origin' here
|
# we have to decode 'origin' here
|
||||||
# use latin-1, as utf-8 or ascii may lead to encoding errors
|
# use latin-1, as utf-8 or ascii may lead to encoding errors
|
||||||
msg = origin.decode('latin-1').split(' ', 3) + (None,) # make sure len(msg) > 1
|
msg = origin.decode('latin-1').split(' ', 3) + [None] # make sure len(msg) > 1
|
||||||
result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', str(err),
|
result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', str(err),
|
||||||
{'exception': formatException(),
|
{'exception': formatException(),
|
||||||
'traceback': formatExtendedStack()}])
|
'traceback': formatExtendedStack()}])
|
||||||
|
266
secop/stringio.py
Normal file
266
secop/stringio.py
Normal file
@ -0,0 +1,266 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# *****************************************************************************
|
||||||
|
# This program is free software; you can redistribute it and/or modify it under
|
||||||
|
# the terms of the GNU General Public License as published by the Free Software
|
||||||
|
# Foundation; either version 2 of the License, or (at your option) any later
|
||||||
|
# version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||||
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||||
|
# details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License along with
|
||||||
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||||
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
#
|
||||||
|
# Module authors:
|
||||||
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||||
|
# *****************************************************************************
|
||||||
|
"""Line oriented stream communication
|
||||||
|
|
||||||
|
implements TCP/IP and is be used as a base for SerialIO
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
import re
|
||||||
|
from secop.modules import Module, Communicator, Parameter, Command, Property, Attached
|
||||||
|
from secop.datatypes import StringType, FloatRange, ArrayOf, BoolType, TupleOf
|
||||||
|
from secop.errors import CommunicationFailedError, CommunicationSilentError
|
||||||
|
from secop.poller import REGULAR
|
||||||
|
from secop.metaclass import Done
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class StringIO(Communicator):
|
||||||
|
"""line oriented communicator
|
||||||
|
|
||||||
|
implementation for TCP/IP streams.
|
||||||
|
other types have to override the following methods:
|
||||||
|
createConnection, readWithTimeout, writeBytes, closeConnection
|
||||||
|
|
||||||
|
self healing is assured by polling the parameter 'is_connected'
|
||||||
|
"""
|
||||||
|
properties = {
|
||||||
|
'uri':
|
||||||
|
Property('hostname:portnumber', datatype=StringType()),
|
||||||
|
'end_of_line':
|
||||||
|
Property('end_of_line character', datatype=StringType(),
|
||||||
|
default='\n', settable=True),
|
||||||
|
'encoding':
|
||||||
|
Property('used encoding', datatype=StringType(),
|
||||||
|
default='ascii', settable=True),
|
||||||
|
'identification':
|
||||||
|
Property('a list of tuples with commands and expected responses as regexp',
|
||||||
|
datatype=ArrayOf(TupleOf(StringType(),StringType())), default=[], export=False),
|
||||||
|
}
|
||||||
|
parameters = {
|
||||||
|
'timeout':
|
||||||
|
Parameter('timeout', datatype=FloatRange(0), default=1),
|
||||||
|
'wait_before':
|
||||||
|
Parameter('wait time before sending', datatype=FloatRange(), default=0),
|
||||||
|
'is_connected':
|
||||||
|
Parameter('connection state', datatype=BoolType(), readonly=False, poll=REGULAR),
|
||||||
|
'pollinterval':
|
||||||
|
Parameter('reconnect interval', datatype=FloatRange(0), readonly=False, default=10),
|
||||||
|
}
|
||||||
|
commands = {
|
||||||
|
'multicomm':
|
||||||
|
Command('execute multiple commands in one go',
|
||||||
|
argument=ArrayOf(StringType()), result= ArrayOf(StringType()))
|
||||||
|
}
|
||||||
|
|
||||||
|
def earlyInit(self):
|
||||||
|
self._stream = None
|
||||||
|
self._lock = threading.RLock()
|
||||||
|
self._end_of_line = self.end_of_line.encode(self.encoding)
|
||||||
|
self._connect_error = None
|
||||||
|
self._last_error = 'not connected'
|
||||||
|
|
||||||
|
def createConnection(self):
|
||||||
|
"""create connection
|
||||||
|
|
||||||
|
in case of success, self.is_connected MUST be set to True by implementors
|
||||||
|
"""
|
||||||
|
uri = self.uri
|
||||||
|
if uri.startswith('tcp://'):
|
||||||
|
uri = uri[6:]
|
||||||
|
try:
|
||||||
|
host, port = uri.split(':')
|
||||||
|
self._stream = socket.create_connection((host, int(port)), 10)
|
||||||
|
self.is_connected = True
|
||||||
|
except (ConnectionRefusedError, socket.gaierror) as e:
|
||||||
|
raise CommunicationFailedError(str(e))
|
||||||
|
except Exception as e:
|
||||||
|
# this is really bad, do not try again
|
||||||
|
self._connect_error = e
|
||||||
|
raise
|
||||||
|
|
||||||
|
def readWithTimeout(self, timeout):
|
||||||
|
"""read with timeout
|
||||||
|
|
||||||
|
Read bytes available now, or wait at most the specified timeout until some bytes
|
||||||
|
are available. Throw an error, if disconnected.
|
||||||
|
If no bytes are available, return b''
|
||||||
|
|
||||||
|
to be overwritten for other stream types
|
||||||
|
"""
|
||||||
|
if timeout is None or timeout < 0:
|
||||||
|
raise ValueError('illegal timeout %r' % timeout)
|
||||||
|
if not self.is_connected:
|
||||||
|
raise CommunicationSilentError(self._last_error)
|
||||||
|
self._stream.settimeout(timeout)
|
||||||
|
try:
|
||||||
|
reply = self._stream.recv(4096)
|
||||||
|
if reply:
|
||||||
|
return reply
|
||||||
|
except (BlockingIOError, socket.timeout):
|
||||||
|
return b''
|
||||||
|
except Exception as e:
|
||||||
|
self.closeConnection()
|
||||||
|
raise CommunicationFailedError('disconnected because of %s' % e)
|
||||||
|
# other end disconnected
|
||||||
|
self.closeConnection()
|
||||||
|
raise CommunicationFailedError('other end disconnected')
|
||||||
|
|
||||||
|
def writeBytes(self, data):
|
||||||
|
"""write bytes
|
||||||
|
|
||||||
|
to be overwritten for other stream types
|
||||||
|
"""
|
||||||
|
self._stream.sendall(data)
|
||||||
|
|
||||||
|
def closeConnection(self):
|
||||||
|
"""close connection
|
||||||
|
|
||||||
|
self.is_connected MUST be set to False by implementors
|
||||||
|
"""
|
||||||
|
if not self._stream:
|
||||||
|
return
|
||||||
|
self.log.debug('disconnect %s' % self.uri)
|
||||||
|
try:
|
||||||
|
self._stream.shutdown(socket.SHUT_RDWR)
|
||||||
|
except socket.error:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
self._stream.close()
|
||||||
|
except socket.error:
|
||||||
|
pass
|
||||||
|
self._stream = None
|
||||||
|
self.is_connected = False
|
||||||
|
|
||||||
|
def read_is_connected(self):
|
||||||
|
"""try to reconnect, when not connected
|
||||||
|
|
||||||
|
self.is_connected is changed only by self.connectStart or self.closeConnection
|
||||||
|
"""
|
||||||
|
if self.is_connected:
|
||||||
|
return Done # no need for intermediate updates
|
||||||
|
try:
|
||||||
|
self.connectStart()
|
||||||
|
if self._last_error:
|
||||||
|
self.log.info('connected')
|
||||||
|
self._last_error = 'connected'
|
||||||
|
return Done
|
||||||
|
except Exception as e:
|
||||||
|
if str(e) == self._last_error:
|
||||||
|
raise CommunicationSilentError(str(e))
|
||||||
|
self._last_error = str(e)
|
||||||
|
self.log.error(self._last_error)
|
||||||
|
raise
|
||||||
|
return Done
|
||||||
|
|
||||||
|
def write_is_connected(self, value):
|
||||||
|
"""value = True: connect if not yet done
|
||||||
|
value = False: disconnect (will be reconnected automatically)
|
||||||
|
"""
|
||||||
|
if not value:
|
||||||
|
self.closeConnection()
|
||||||
|
return False
|
||||||
|
return self.read_is_connected()
|
||||||
|
|
||||||
|
def connectStart(self):
|
||||||
|
if not self.is_connected:
|
||||||
|
self.createConnection()
|
||||||
|
for command, regexp in self.identification:
|
||||||
|
reply = self.do_communicate(command)
|
||||||
|
if not re.match(regexp, reply):
|
||||||
|
self.closeConnection()
|
||||||
|
raise CommunicationFailedError('bad response: %s does not match %s' %
|
||||||
|
(reply, regexp))
|
||||||
|
|
||||||
|
def do_communicate(self, command):
|
||||||
|
'''send a command and receive a reply
|
||||||
|
|
||||||
|
using end_of_line, encoding and self._lock
|
||||||
|
for commands without reply, join it with a query command,
|
||||||
|
wait_before is respected for end_of_lines within a command.
|
||||||
|
'''
|
||||||
|
if not self.is_connected:
|
||||||
|
self.read_is_connected() # try to reconnect
|
||||||
|
try:
|
||||||
|
with self._lock:
|
||||||
|
# read garbage and wait before send
|
||||||
|
if self.wait_before:
|
||||||
|
cmds = command.split(self.end_of_line)
|
||||||
|
else:
|
||||||
|
cmds = [command]
|
||||||
|
garbage = None
|
||||||
|
for cmd in cmds:
|
||||||
|
if self.wait_before:
|
||||||
|
time.sleep(self.wait_before)
|
||||||
|
if garbage is None: # read garbage only once
|
||||||
|
garbage = ''
|
||||||
|
data = self.readWithTimeout(0)
|
||||||
|
while data:
|
||||||
|
garbage += data
|
||||||
|
data = self.readWithTimeout(0)
|
||||||
|
if garbage:
|
||||||
|
self.log.debug('garbage: %s', garbage.decode(self.encoding))
|
||||||
|
self.writeBytes((cmd + self.end_of_line).encode(self.encoding))
|
||||||
|
timeout = self.timeout
|
||||||
|
buffer = b''
|
||||||
|
data = True
|
||||||
|
while data:
|
||||||
|
data = self.readWithTimeout(timeout)
|
||||||
|
buffer += data
|
||||||
|
if self._end_of_line in buffer:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise CommunicationFailedError('timeout')
|
||||||
|
reply = buffer.split(self._end_of_line, 1)[0].decode(self.encoding)
|
||||||
|
self.log.debug('recv: %s', reply)
|
||||||
|
return reply
|
||||||
|
except Exception as e:
|
||||||
|
if str(e) == self._last_error:
|
||||||
|
raise CommunicationSilentError(str(e))
|
||||||
|
self._last_error = str(e)
|
||||||
|
self.log.error(self._last_error)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def do_multicomm(self, commands):
|
||||||
|
replies = []
|
||||||
|
with self._lock:
|
||||||
|
for cmd in commands:
|
||||||
|
replies.append(self.do_communicate(cmd))
|
||||||
|
return replies
|
||||||
|
|
||||||
|
|
||||||
|
class HasIodev(Module):
|
||||||
|
"""Mixin for modules using a communicator"""
|
||||||
|
properties = {
|
||||||
|
'iodev': Attached(),
|
||||||
|
}
|
||||||
|
|
||||||
|
def initModule(self):
|
||||||
|
try:
|
||||||
|
self._iodev.read_is_connected()
|
||||||
|
except CommunicationFailedError:
|
||||||
|
pass
|
||||||
|
super().initModule()
|
||||||
|
|
||||||
|
def sendRecv(self, command):
|
||||||
|
return self._iodev.do_communicate(command)
|
Reference in New Issue
Block a user