add sim-server again based on socketserver

- fix ls370test config file
+ fix issues with frappy_psi.ls370res
+ add frappy_psi.ls370sim

Change-Id: Ie61e3ea01c4b9c7c1286426504e50acf9413a8ba
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/34957
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
This commit is contained in:
2024-11-14 11:42:29 +01:00
parent 3bf1a838d4
commit 21b8fd6518
3 changed files with 307 additions and 8 deletions

197
bin/sim-server Executable file
View File

@@ -0,0 +1,197 @@
#!/usr/bin/env python3
# pylint: disable=invalid-name
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""server for a string communicator
Usage:
bin/sim-server <communicator class> -p <server port> [-o <option1>=<value> <option2>=<value>]
open a server on <server port> to communicate with the string based <communicator> over TCP/IP.
Use cases, mainly for test purposes:
- relay to a hardware simulation written as a communicator
> bin/sim-server frappy_psi.ls370sim.Ls370Sim
- relay to a communicator not using TCP/IP, if Frappy should run on an other host
> bin/sim-server frappy.io.StringIO -o uri=serial:///dev/tty...
- as a T, if the hardware allows only one connection, and more than one is needed:
> bin/sim-server frappy.io.StringIO -o uri=tcp://<host>:<port>
typically using communicator class frappy.io.StringIO
"""
import sys
import argparse
from pathlib import Path
import socket
import time
import os
from ast import literal_eval
from socketserver import BaseRequestHandler, ThreadingTCPServer
# Add import path for inplace usage
sys.path.insert(0, str(Path(__file__).absolute().parents[1]))
from frappy.lib import get_class, formatException, mkthread
class Logger:
def debug(self, *args):
pass
def log(self, level, *args):
pass
def info(self, *args):
print(*args)
exception = error = warn = info
class TcpRequestHandler(BaseRequestHandler):
def setup(self):
print(f'connection opened from {self.client_address}')
self.running = True
self.request.settimeout(1)
self.data = b''
def finish(self):
"""called when handle() terminates, i.e. the socket closed"""
# close socket
try:
self.request.shutdown(socket.SHUT_RDWR)
except Exception:
pass
finally:
print(f'connection closed from {self.client_address}')
self.request.close()
def poller(self):
while True:
time.sleep(1.0)
self.module.doPoll()
def handle(self):
"""handle a new connection"""
# do a copy of the options, as they are consumed
self.module = self.server.modulecls(
'mod', Logger(), dict(self.server.options), self.server)
self.module.earlyInit()
mkthread(self.poller)
while self.running:
try:
newdata = self.request.recv(1024)
if not newdata:
return
except socket.timeout:
# no new data during read, continue
continue
self.data += newdata
while self.running:
message, sep, self.data = self.data.partition(b'\n')
if not sep:
break
cmd = message.decode('latin-1')
try:
reply = self.module.communicate(cmd.strip())
if self.server.verbose:
print('%-40s | %s' % (cmd, reply))
except Exception:
print(formatException(verbose=True))
return
outdata = reply.encode('latin-1') + b'\n'
try:
self.request.sendall(outdata)
except Exception as e:
print(repr(e))
self.running = False
class Server(ThreadingTCPServer):
allow_reuse_address = os.name != 'nt' # False on Windows systems
class Dispatcher:
def announce_update(self, *_):
pass
def announce_update_error(self, *_):
pass
def __init__(self, port, modulecls, options, verbose=False):
super().__init__(('', port), TcpRequestHandler,
bind_and_activate=True)
self.secnode = None
self.dispatcher = self.Dispatcher()
self.verbose = verbose
self.modulecls = get_class(modulecls)
self.options = options
print(f'started sim-server listening on port {port}')
def parse_argv(argv):
parser = argparse.ArgumentParser(description="Relay to a communicator (simulated HW or other)")
parser.add_argument("-v", "--verbose",
help="output full communication",
action='store_true', default=False)
parser.add_argument("cls",
type=str,
help="communicator class.\n",)
parser.add_argument('-p',
'--port',
action='store',
help='server port or uri',
default=2089)
parser.add_argument('-o',
'--options',
action='store',
nargs='*',
help='options in the form key=value',
default=None)
return parser.parse_args(argv)
def main(argv=None):
if argv is None:
argv = sys.argv
args = parse_argv(argv[1:])
options = {'description': ''}
for item in args.options or ():
key, eq, value = item.partition('=')
if not eq:
raise ValueError(f"missing '=' in {item}")
try:
value = literal_eval(value)
except Exception:
pass
options[key] = value
srv = Server(int(args.port), args.cls, options, args.verbose)
srv.serve_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv))

View File

@@ -63,7 +63,12 @@ def parse_result(reply):
class LakeShoreIO(HasIO):
def set_param(self, cmd, *args):
head = ','.join([cmd] + [f'{a:g}' for a in args])
args = [f'{a:g}' for a in args]
if ' ' in cmd.strip():
args.insert(0, cmd)
else:
args[0] = cmd + args[0]
head = ','.join(args)
tail = cmd.replace(' ', '?')
reply = self.io.communicate(f'{head};{tail}')
return parse_result(reply)
@@ -99,7 +104,7 @@ class Switcher(LakeShoreIO, ChannelSwitcher):
if channelno is None:
self.status = 'ERROR', 'no enabled channel'
return
self.set_param(f'SCAN {channelno},0')
self.set_param('SCAN ', channelno, 0)
def doPoll(self):
"""poll buttons
@@ -160,7 +165,7 @@ class Switcher(LakeShoreIO, ChannelSwitcher):
self.measure_delay = chan.dwell
def set_active_channel(self, chan):
self.set_param(f'SCAN {chan.channel},0')
self.set_param('SCAN ', chan.channel, 0)
chan._last_range_change = time.monotonic()
self.set_delays(chan)
@@ -227,7 +232,7 @@ class ResChannel(LakeShoreIO, Channel):
now = time.monotonic()
if now + 0.5 < max(self._last_range_change, self.switcher._start_switch) + self.pause:
return None
result = self.get_param(f'RDGR{self.channel}')
result = self.get_param(f'RDGR?{self.channel}')
if self.autorange:
self.fix_autorange()
if now + 0.5 > self._last_range_change + self.pause:
@@ -251,7 +256,7 @@ class ResChannel(LakeShoreIO, Channel):
def read_value(self):
if self.channel == self.switcher.value == self.switcher.target:
value = self._read_value()
value = self.get_value()
if value is not None:
return value
return self.value # return previous value
@@ -264,7 +269,7 @@ class ResChannel(LakeShoreIO, Channel):
@CommonReadHandler(rdgrng_params)
def read_rdgrng(self):
iscur, exc, rng, autorange, excoff = self.get_param(f'RDGRNG{self.channel}')
iscur, exc, rng, autorange, excoff = self.get_param(f'RDGRNG?{self.channel}')
self._prev_rdgrng = iscur, exc
if autorange: # pressed autorange button
if not self._toggle_autorange:
@@ -293,8 +298,7 @@ class ResChannel(LakeShoreIO, Channel):
excoff = 1
rng = change['range']
if self.autorange:
if rng < self.minrange:
rng = self.minrange
rng = max(rng, self.minrange)
self.set_param(f'RDGRNG {self.channel}', iscur, exc, rng, 0, excoff)
self.read_range()

98
frappy_psi/ls370sim.py Normal file
View File

@@ -0,0 +1,98 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""a very simple simulator for LakeShore Models 370 and 372
reduced to the functionality actually used in e.g. frappy_psi.ls370res
"""
import time
from frappy.modules import Communicator
class _Ls37xSim(Communicator):
# commands containing %d for the channel number
CHANNEL_COMMANDS = [
('RDGR?%d', '1.0'),
('RDGK?%d', '1.5'),
('RDGST?%d', '0'),
('RDGRNG?%d', '0,5,5,0,0'),
('INSET?%d', '1,3,3,0,0'),
('FILTER?%d', '1,5,80'),
]
# commands not related to a channel
OTHER_COMMANDS = [
('SCAN?', '3,1'),
('*OPC?', '1'),
]
def earlyInit(self):
super().earlyInit()
self._res = {}
self._start = time.time()
self._data = dict(self.OTHER_COMMANDS)
for fmt, v in self.CHANNEL_COMMANDS:
for chan in range(1, 17):
self._data[fmt % chan] = v
def communicate(self, command):
self.comLog('> %s' % command)
# simulation part, time independent
for channel in range(1,17):
_, _, _, _, excoff = self._data['RDGRNG?%d' % channel].split(',')
if excoff == '1':
self._data['RDGST?%d' % channel] = '6'
else:
self._data['RDGST?%d' % channel] = '0'
channel = int(self._data['SCAN?'].split(',', 1)[0])
self._res[channel] = channel + (time.time() - self._start) / 3600
strvalue = f'{self._res[channel]:g}'
self._data['RDGR?%d' % channel] = self._data['RDGK?%d' % channel] = strvalue
chunks = command.split(';')
reply = []
for chunk in chunks:
if '?' in chunk:
reply.append(self._data[chunk])
else:
for nqarg in (1, 0):
if nqarg == 0:
qcmd, arg = chunk.split(' ', 1)
qcmd += '?'
else:
qcmd, arg = chunk.split(',', nqarg)
qcmd = qcmd.replace(' ', '?', 1)
if qcmd in self._data:
self._data[qcmd] = arg
break
reply = ';'.join(reply)
self.comLog('< %s' % reply)
return reply
class Ls370Sim(_Ls37xSim):
OTHER_COMMANDS = _Ls37xSim.OTHER_COMMANDS + [
('*IDN?', 'LSCI,MODEL370,370184,05302003'),
]
class Ls372Sim(_Ls37xSim):
OTHER_COMMANDS = _Ls37xSim.OTHER_COMMANDS + [
('*IDN?', 'LSCI,MODEL372,372184,05302003'),
('PID?1', '10,10,0'),
]