frappy/bin/stringio-server
Markus Zolliker cb4874331b driver for Lakeshore Model 370 resistivity measurement
- this does not (yet) include temperatures and control loop
- including stringio-server for test purposes

Change-Id: I414ae2e6663bb0773fe60db1798401dfc9dde018
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22005
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
2019-12-13 17:13:05 +01:00

142 lines
4.2 KiB
Python
Executable File

#!/usr/bin/env python3
# pylint: disable=invalid-name
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""server for a string communicator
Usage:
bin/stringio-server <communciator> <server port>
open a server on <server port> to communicate with the string based <communicator> over TCP/IP.
Use cases, mainly for test purposes:
- as a T, if the hardware allows only one connection, and more than one is needed
- relay to a communicator not using TCP/IP, if Frappy should run on an other host
- relay to a hardware simulation written as a communicator
"""
import sys
from os import path
import asyncore
import socket
# Add import path for inplace usage
sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..')))
from secop.lib import get_class, formatException
class LineHandler(asyncore.dispatcher_with_send):
def __init__(self, sock):
self.buffer = b""
asyncore.dispatcher_with_send.__init__(self, sock)
self.crlf = 0
def handle_read(self):
data = self.recv(8192)
if data:
parts = data.split(b"\n")
if len(parts) == 1:
self.buffer += data
else:
self.handle_line((self.buffer + parts[0]).decode('latin_1'))
for part in parts[1:-1]:
if part[-1] == b"\r":
self.crlf = True
part = part[:-1]
else:
self.crlf = False
self.handle_line(part.decode('latin_1'))
self.buffer = parts[-1]
def send_line(self, line):
self.send((line + ("\r\n" if self.crlf else "\n")).encode('latin_1'))
class LineServer(asyncore.dispatcher):
def __init__(self, host, port, lineHandlerClass):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
self.lineHandlerClass = lineHandlerClass
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
print ("Incoming connection from %s" % repr(addr))
self.lineHandlerClass(sock)
def loop(self):
asyncore.loop()
class Server(LineServer):
class Dispatcher:
def announce_update(self, *_):
pass
def announce_update_error(self, *_):
pass
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
self.dispatcher = self.Dispatcher()
class Handler(LineHandler):
def handle_line(self, line):
try:
reply = module.do_communicate(line.strip())
print('%-40s | %s' % (line, reply))
except Exception:
print(formatException(verbose=True))
self.send_line(reply)
class Logger:
def debug(self, *args):
print(*args)
info = exception = debug
if len(sys.argv) < 2:
sys.argv.append('secop_psi.ls370sim.Ls370Sim')
if len(sys.argv) < 3:
sys.argv.append('4567')
communicatorname = sys.argv[1]
serverport = int(sys.argv[2])
opts = {'.description':'simulator'}
for arg in sys.argv[3:]:
k, v = arg.split('=',1)
opts[k] = v
srv = Server('localhost', serverport, Handler)
module = get_class(communicatorname)(communicatorname, Logger(), opts, srv)
module.earlyInit()
srv.loop()