#!/usr/bin/env python3 # pylint: disable=invalid-name # -*- coding: utf-8 -*- # ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** """server for a string communicator Usage: bin/stringio-server open a server on to communicate with the string based over TCP/IP. Use cases, mainly for test purposes: - as a T, if the hardware allows only one connection, and more than one is needed - relay to a communicator not using TCP/IP, if Frappy should run on an other host - relay to a hardware simulation written as a communicator """ import sys from os import path import asyncore import socket import ast # Add import path for inplace usage sys.path.insert(0, path.abspath(path.join(path.dirname(__file__), '..'))) from secop.lib import get_class, formatException class LineHandler(asyncore.dispatcher_with_send): def __init__(self, sock): self.buffer = b"" asyncore.dispatcher_with_send.__init__(self, sock) self.crlf = 0 def handle_read(self): data = self.recv(8192) if data: parts = data.split(b"\n") if len(parts) == 1: self.buffer += data else: self.handle_line((self.buffer + parts[0]).decode('latin_1')) for part in parts[1:-1]: if part[-1] == b"\r": self.crlf = True part = part[:-1] else: self.crlf = False self.handle_line(part.decode('latin_1')) self.buffer = parts[-1] def send_line(self, line): self.send((line + ("\r\n" if self.crlf else "\n")).encode('latin_1')) class LineServer(asyncore.dispatcher): def __init__(self, host, port, lineHandlerClass): asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((host, port)) self.listen(5) self.lineHandlerClass = lineHandlerClass def handle_accept(self): pair = self.accept() if pair is not None: sock, addr = pair print("Incoming connection from %s" % repr(addr)) self.lineHandlerClass(sock) def loop(self): asyncore.loop() class Server(LineServer): class Dispatcher: def announce_update(self, *_): pass def announce_update_error(self, *_): pass def __init__(self, *args, **kwds): super().__init__(*args, **kwds) self.dispatcher = self.Dispatcher() class Handler(LineHandler): def handle_line(self, line): try: reply = module.do_communicate(line.strip()) if verbose: print('%-40s | %s' % (line, reply)) except Exception: print(formatException(verbose=True)) self.send_line(reply) class Logger: def debug(self, *args): print(*args) info = exception = debug opts = {'description': 'simulator'} args = [] for arg in sys.argv[1:]: k, sep, v = arg.partition('=') if not k: args.append(v) try: v = ast.literal_eval(v) except Exception: pass opts[k] = v verbose = opts.pop('verbose', False) opts['cls'] = 'secop_psi.ls370sim.Ls370Sim' opts['port'] = 4567 if len(args) > 2: raise ValueError('do not know about: %s' % ' '.join(args[2:])) if len(args) == 2: opts['port'] = int(args[1]) if len(args) > 0: opts['cls'] = args[0] args.append(opts) cls = opts.pop('cls') port = opts.pop('port') srv = Server('localhost', int(port), Handler) module = get_class(cls)(cls, Logger(), opts, srv) module.earlyInit() srv.loop()