13 Commits

Author SHA1 Message Date
17a44ef42a fixed dilsc, fist try before tests
Change-Id: Iaac5f07c27879c7c17be2e25defde19e9ddd1566
2024-04-29 12:18:12 +02:00
fcdee8e3ec gui: sort qt imports
Change-Id: Id8dfdd7b07bff6e337eb71a436dd51762f9b5fa7
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33389
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
2024-04-08 17:01:20 +02:00
dddf74df9e frappy.client: catch errors on callbacks
buggy updates or errors in callbacks are now converted into
error updates, buggy error updates are skipped.

-> the receive thread should no longer crash

Change-Id: I97e3999db73e64f73dfbc380fac3d7685b6ca31c
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33386
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
2024-04-08 17:01:03 +02:00
ac251ea515 test: add uri attach test
Change-Id: I8a021c53c9987ed03ce31c8481f73573b943d1f3
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33417
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
2024-04-08 17:01:03 +02:00
9e4f9b7b95 gui: more specialized input widgets
Change-Id: I8b768b2069ae28a58d540165bd95f31ad7e984e0
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33390
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
2024-04-08 17:01:03 +02:00
4f65ae7e46 frappy.client: cleanup properly after a reply timeout
when a reply to a request is not received within 10 s a timeout
error is raised, but any further requests with the same identifier
will be blocked - fix this

Change-Id: Id2fbc8bb6e6ecfd54bba1fa57b62e626e49b54c8
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33385
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
2024-04-08 17:01:03 +02:00
a73b7e7d88 gui: catch invalid inputs
Change-Id: Ie0525e4ef9a94085da811e7eaa2e0b7430bade95
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33388
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
2024-04-08 17:01:03 +02:00
76a78871b4 core: cover errors in handler setup()
Change-Id: I0bb2f07e26717205c013dfedec6e1beca2947d17
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33239
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
2024-04-08 17:00:24 +02:00
118e22ee44 core: introduce common handler class
- make RequestHanlder based on socketserver.BaserequestHandler
- split handle() into subfunctions
- rework TCPRequestHandler

Change-Id: I62452e21c03b9cb9937673ce9c8663765798f863
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/32984
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
2024-04-08 17:00:24 +02:00
c63f98f3cb dispatcher: consistent handling of missing timestamps
Like in `read` replies (line 193), `update` now omits the timestamp
instead of returning {"t": 0}.

Change-Id: Iaee0fccae81040cdd6075b0e4a8600c032aec03d
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33382
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
2024-04-08 17:00:09 +02:00
6514a1b2ee core: add websocket interface
Change-Id: Ic62abeef6fb73f4a1b3d29f9225ba164de9e3e93
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33240
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
2024-04-08 17:00:09 +02:00
aeec940659 follow up fix: handler export=True correctly
this seems to be broken in change 33266

Change-Id: I4da78f297976daeac0a0709b9c86e6e28fc122bf
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33268
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
2024-04-08 17:00:09 +02:00
4571af8534 follow up fix for change 33168
triggerPoll must be set on attached io

Change-Id: If3fa1016efa6047371380790f60db246d87b3628
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/33269
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
2024-04-08 16:59:19 +02:00
15 changed files with 869 additions and 233 deletions

231
cfg/dilsc_cfg.py Normal file
View File

@ -0,0 +1,231 @@
Node('cfg/dilsc1.cfg',
'triton test',
interface='5000',
name='dilsc1',
)
Mod('triton',
'frappy_psi.mercury.IO',
'connection to triton software',
uri='tcp://192.168.2.33:33576',
)
Mod('T_mix',
'frappy_psi.triton.TemperatureSensor',
'mix. chamber temperature',
slot='T8',
io='triton',
)
Mod('T_pt2head',
'frappy_psi.triton.TemperatureSensor',
'PTR2 head temperature',
slot='T1',
io='triton',
)
Mod('T_pt2plate',
'frappy_psi.triton.TemperatureSensor',
'PTR2 plate temperature',
slot='T2',
io='triton',
)
Mod('T_still',
'frappy_psi.triton.TemperatureSensor',
'still temperature',
slot='T3',
io='triton',
)
Mod('htr_still',
'frappy_psi.triton.HeaterOutput',
'still heater',
slot='H2',
io='triton',
)
Mod('T_coldpl',
'frappy_psi.triton.TemperatureSensor',
'cold plate temperature',
slot='T4',
io='triton',
)
Mod('T_mixcx',
'frappy_psi.triton.TemperatureSensor',
'mix. chamber cernox',
slot='T5',
io='triton',
)
Mod('T_pt1head',
'frappy_psi.triton.TemperatureSensor',
'PTR1 head temperature',
slot='T6',
io='triton',
)
Mod('T_pt1plate',
'frappy_psi.triton.TemperatureSensor',
'PTR1 plate temperature',
slot='T7',
io='triton',
)
Mod('T_pucksensor',
'frappy_psi.triton.TemperatureLoop',
'puck sensor temperature',
output_module='htr_pucksensor',
slot='TA',
io='triton',
)
Mod('htr_pucksensor',
'frappy_psi.triton.HeaterOutputWithRange',
'mix. chamber heater',
slot='H1,TA',
io='triton',
)
Mod('T_magnet',
'frappy_psi.triton.TemperatureSensor',
'magnet temperature',
slot='T13',
io='triton',
)
Mod('action',
'frappy_psi.triton.Action',
'higher level scripts',
io='triton',
slot='DR',
)
Mod('p_dump',
'frappy_psi.mercury.PressureSensor',
'dump pressure',
slot='P1',
io='triton',
)
Mod('p_cond',
'frappy_psi.mercury.PressureSensor',
'condenser pressure',
slot='P2',
io='triton',
)
Mod('p_still',
'frappy_psi.mercury.PressureSensor',
'still pressure',
slot='P3',
io='triton',
)
Mod('p_fore',
'frappy_psi.mercury.PressureSensor',
'pressure on the pump side',
slot='P5',
io='triton',
)
Mod('p_back',
'frappy_psi.mercury.PressureSensor',
'pressure on the back side of the pump',
slot='P4',
io='triton',
)
Mod('p_ovc',
'frappy_psi.mercury.PressureSensor',
'outer vacuum pressure',
slot='P6',
io='triton',
)
Mod('V1',
'frappy_psi.triton.Valve',
'valve V1',
slot='V1',
io='triton',
)
Mod('V2',
'frappy_psi.triton.Valve',
'valve V2',
slot='V2',
io='triton',
)
Mod('V4',
'frappy_psi.triton.Valve',
'valve V4',
slot='V4',
io='triton',
)
Mod('V5',
'frappy_psi.triton.Valve',
'valve V5',
slot='V5',
io='triton',
)
Mod('V9',
'frappy_psi.triton.Valve',
'valve V9',
slot='V9',
io='triton',
)
Mod('ips',
'frappy_psi.mercury.IO',
'IPS for magnet',
uri='192.168.127.254:3001',
)
Mod('mf',
'frappy_psi.dilsc.VectorField',
'vector field',
x='mfx',
y='mfy',
z='mfz',
sphere_radius=0.6,
cylinders=((0.23, 5.2), (0.45, 0.8)),
)
Mod('mfx',
'frappy_psi.ips_mercury.SimpleField',
'magnetic field, x-axis',
slot='GRPX',
io='ips',
tolerance=0.0001,
wait_stable_field=0.0,
nunits=2,
target=Param(max=0.6),
ramp=0.225,
)
Mod('mfy',
'frappy_psi.ips_mercury.SimpleField',
'magnetic field, y axis',
slot='GRPY',
io='ips',
tolerance=0.0001,
wait_stable_field=0.0,
nunits=2,
target=Param(max=0.6),
ramp=0.225,
)
Mod('mfz',
'frappy_psi.ips_mercury.Field',
'magnetic field, z-axis',
slot='GRPZ',
io='ips',
tolerance=0.0001,
target=Param(max=5.2),
mode='DRIVEN',
ramp=0.52,
)

View File

@ -282,6 +282,7 @@ class SecopClient(ProxyClient):
self.nodename = uri
self._lock = RLock()
self._shutdown = Event()
self.cleanup = []
def __del__(self):
try:
@ -297,6 +298,10 @@ class SecopClient(ProxyClient):
with self._lock:
if self.io:
return
self.txq = queue.Queue(30)
self.pending = queue.Queue(30)
self.active_requests.clear()
self.cleanup.clear()
if self.online:
self._set_state(True, 'reconnecting')
else:
@ -368,6 +373,12 @@ class SecopClient(ProxyClient):
noactivity = 0
try:
while self._running:
while self.cleanup:
entry = self.cleanup.pop()
for key, prev in self.active_requests.items():
if prev is entry:
self.active_requests.pop(key)
break
# may raise ConnectionClosed
reply = self.io.readline()
if reply is None:
@ -405,6 +416,14 @@ class SecopClient(ProxyClient):
self.updateValue(module, param, value, timestamp, readerror)
except KeyError:
pass # ignore updates of unknown parameters
except Exception as e:
self.log.debug(f'error when updating %s:%s %r', module, param, value)
try:
# catch errors in callback functions
self.updateValue(module, param, None, timestamp,
type(e)(f'{e} - raised on client side'))
except Exception as ee:
self.log.warn(f'can not handle error update %r for %s:%s: %r', e, module, param, ee)
if action in (EVENTREPLY, ERRORPREFIX + EVENTREPLY):
continue
try:
@ -591,8 +610,10 @@ class SecopClient(ProxyClient):
def get_reply(self, entry):
"""wait for reply and return it"""
if not entry[1].wait(10): # event
self.cleanup.append(entry)
raise TimeoutError('no response within 10s')
if not entry[2]: # reply
# no cleanup needed as self.active_requests will be cleared on connect
raise ConnectionError('connection closed before reply')
action, _, data = entry[2] # pylint: disable=unpacking-non-sequence
if action.startswith(ERRORPREFIX):

View File

@ -1,6 +1,10 @@
from frappy.gui.qt import QCheckBox, QComboBox, QLineEdit, pyqtSignal
import sys
from frappy.datatypes import BoolType, EnumType
from frappy.gui.qt import QCheckBox, QComboBox, QDoubleSpinBox, QLineEdit, \
QSpinBox, pyqtSignal
from frappy.datatypes import BoolType, EnumType, FloatRange, IntRange, \
StringType, TextType
# ArrayOf, BLOBType, FloatRange, IntRange, StringType, StructOf, TextType, TupleOf
@ -9,11 +13,24 @@ def get_input_widget(datatype, parent=None):
return {
EnumType: EnumInput,
BoolType: BoolInput,
IntRange: IntInput,
StringType: StringInput,
TextType: StringInput,
}.get(datatype.__class__, GenericInput)(datatype, parent)
class GenericInput(QLineEdit):
class InputBase:
submitted = pyqtSignal()
input_feedback = pyqtSignal(str)
def get_input(self):
raise NotImplementedError
def submit(self):
self.submitted.emit()
class GenericInput(InputBase, QLineEdit):
def __init__(self, datatype, parent=None):
super().__init__(parent)
self.datatype = datatype
@ -23,12 +40,28 @@ class GenericInput(QLineEdit):
def get_input(self):
return self.datatype.from_string(self.text())
def submit(self):
self.submitted.emit()
class StringInput(GenericInput):
def __init__(self, datatype, parent=None):
super().__init__(datatype, parent)
class EnumInput(QComboBox):
submitted = pyqtSignal()
class IntInput(InputBase, QSpinBox):
def __init__(self, datatype, parent=None):
super().__init__(parent)
self.datatype = datatype
# we dont use setMaximum and setMinimum because it is quite restrictive
# when typing, so set it as high as possible
self.setMaximum(2147483647)
self.setMinimum(-2147483648)
self.lineEdit().returnPressed.connect(self.submit)
def get_input(self):
return self.datatype(self.value())
class EnumInput(InputBase, QComboBox):
def __init__(self, datatype, parent=None):
super().__init__(parent)
self.setPlaceholderText('choose value')
@ -45,18 +78,11 @@ class EnumInput(QComboBox):
def get_input(self):
return self._map[self.currentIndex()].value
def submit(self):
self.submitted.emit()
class BoolInput(QCheckBox):
submitted = pyqtSignal()
class BoolInput(InputBase, QCheckBox):
def __init__(self, datatype, parent=None):
super().__init__(parent)
self.datatype = datatype
def get_input(self):
return self.isChecked()
def submit(self):
self.submitted.emit()

View File

@ -24,9 +24,9 @@ from frappy.gui.qt import QColor, QDialog, QHBoxLayout, QIcon, QLabel, \
QLineEdit, QMessageBox, QPropertyAnimation, QPushButton, Qt, QToolButton, \
QWidget, pyqtProperty, pyqtSignal
from frappy.gui.inputwidgets import get_input_widget
from frappy.gui.util import Colors, loadUi
from frappy.gui.valuewidgets import get_widget
from frappy.gui.inputwidgets import get_input_widget
class CommandDialog(QDialog):
@ -54,7 +54,11 @@ class CommandDialog(QDialog):
self.resize(self.sizeHint())
def get_value(self):
return True, self.widgets[0].get_value()
try:
return self.widgets[0].get_value()
except Exception as e:
QMessageBox.warning(self.parent(), 'Operation failed', str(e))
return None
def exec(self):
if super().exec():
@ -95,8 +99,9 @@ class CommandButton(QPushButton):
if self._argintype:
dlg = CommandDialog(self._cmdname, self._argintype)
args = dlg.exec()
if args: # not 'Cancel' clicked
self._cb(self._cmdname, args[1])
if args is not None:
# no errors when converting value and 'Cancel' wasn't clicked
self._cb(self._cmdname, args)
else:
# no need for arguments
self._cb(self._cmdname, None)
@ -442,8 +447,8 @@ class ModuleWidget(QWidget):
self.paramDetails.emit(self._name, param)
def _button_pressed(self, param):
target = self._paramInputs[param].get_input()
try:
target = self._paramInputs[param].get_input()
self._node.setParameter(self._name, param, target)
except Exception as e:
QMessageBox.warning(self.parent(), 'Operation failed', str(e))

View File

@ -42,10 +42,10 @@ try:
QDialogButtonBox, QDoubleSpinBox, QFileDialog, QFrame, QGridLayout, \
QGroupBox, QHBoxLayout, QInputDialog, QLabel, QLineEdit, QMainWindow, \
QMenu, QMessageBox, QPlainTextEdit, QPushButton, QRadioButton, \
QScrollArea, QSizePolicy, QSpacerItem, QSpinBox, QStyle, \
QScrollArea, QSizePolicy, QSlider, QSpacerItem, QSpinBox, QStyle, \
QStyleOptionTab, QStylePainter, QTabBar, QTabWidget, QTextEdit, \
QToolButton, QTreeView, QTreeWidget, QTreeWidgetItem, QVBoxLayout, \
QWidget,QSlider
QWidget
import frappy.gui.resources_qt6
@ -62,9 +62,9 @@ except ImportError as e:
QDialog, QDialogButtonBox, QDoubleSpinBox, QFileDialog, QFrame, \
QGridLayout, QGroupBox, QHBoxLayout, QInputDialog, QLabel, QLineEdit, \
QMainWindow, QMenu, QMessageBox, QPlainTextEdit, QPushButton, \
QRadioButton, QScrollArea, QShortcut, QSizePolicy, QSpacerItem, \
QSpinBox, QStyle, QStyleOptionTab, QStylePainter, QTabBar, \
QTabWidget, QTextEdit, QToolButton, QTreeView, QTreeWidget, \
QTreeWidgetItem, QVBoxLayout, QWidget, QSlider
QRadioButton, QScrollArea, QShortcut, QSizePolicy, QSlider, \
QSpacerItem, QSpinBox, QStyle, QStyleOptionTab, QStylePainter, \
QTabBar, QTabWidget, QTextEdit, QToolButton, QTreeView, QTreeWidget, \
QTreeWidgetItem, QVBoxLayout, QWidget
import frappy.gui.resources_qt5

View File

@ -592,7 +592,7 @@ class Module(HasAccessibles):
if not self.io.triggerPoll:
# when self.io.enablePoll is False, triggerPoll is not
# created for self.io in the else clause below
self.triggerPoll = threading.Event()
self.io.triggerPoll = threading.Event()
else:
self.triggerPoll = threading.Event()
self.polledModules.append(self)

View File

@ -98,6 +98,16 @@ class Accessible(HasProperties):
props.append(f'{k}={v!r}')
return f"{self.__class__.__name__}({', '.join(props)})"
def fixExport(self):
if self.export is True:
predefined_cls = PREDEFINED_ACCESSIBLES.get(self.name)
if predefined_cls is None:
self.export = '_' + self.name
elif isinstance(self, predefined_cls):
self.export = self.name
else:
raise ProgrammingError(f'can not use {self.name!r} as name of a {type(self).__name__}')
class Parameter(Accessible):
"""defines a parameter
@ -225,18 +235,7 @@ class Parameter(Accessible):
self.name = name
if isinstance(self.datatype, EnumType):
self.datatype.set_name(name)
if self.export is True:
predefined_cls = PREDEFINED_ACCESSIBLES.get(self.name, None)
if predefined_cls is Parameter:
self.export = self.name
elif predefined_cls is None:
self.export = '_' + self.name
else:
raise ProgrammingError(f'can not use {self.name!r} as name of a Parameter')
if 'export' in self.ownProperties:
# avoid export=True overrides export=<name>
self.ownProperties['export'] = self.export
self.fixExport()
def clone(self, properties, **kwds):
"""return a clone of ourselfs with inherited properties"""
@ -280,7 +279,7 @@ class Parameter(Accessible):
:param modobj: final call, called from Module.__init__
"""
self.fixExport()
if self.constant is not None:
constant = self.datatype(self.constant)
# The value of the `constant` property should be the
@ -407,18 +406,8 @@ class Command(Accessible):
if self.func is None:
raise ProgrammingError(f'Command {owner.__name__}.{name} must be used as a method decorator')
self.fixExport()
self.datatype = CommandType(self.argument, self.result)
if self.export is True:
predefined_cls = PREDEFINED_ACCESSIBLES.get(name, None)
if predefined_cls is Command:
self.export = name
elif predefined_cls is None:
self.export = '_' + name
else:
raise ProgrammingError(f'can not use {name!r} as name of a Command') from None
if 'export' in self.ownProperties:
# avoid export=True overrides export=<name>
self.ownProperties['export'] = self.export
if not self._inherit:
for key, pobj in self.properties.items():
if key not in self.propertyValues:
@ -455,6 +444,7 @@ class Command(Accessible):
"""return a clone of ourselfs with inherited properties"""
res = type(self)(**kwds)
res.name = self.name
self.fixExport()
res.func = self.func
res.init(properties)
res.init(res.ownProperties)

View File

@ -47,9 +47,11 @@ def make_update(modulename, pobj):
if pobj.readerror:
return (ERRORPREFIX + EVENTREPLY, f'{modulename}:{pobj.export}',
# error-report !
[pobj.readerror.name, str(pobj.readerror), {'t': pobj.timestamp}])
[pobj.readerror.name, str(pobj.readerror),
{'t': pobj.timestamp} if pobj.timestamp else {}])
return (EVENTREPLY, f'{modulename}:{pobj.export}',
[pobj.export_value(), {'t': pobj.timestamp}])
[pobj.export_value(),
{'t': pobj.timestamp} if pobj.timestamp else {}])
class Dispatcher:

View File

@ -0,0 +1,237 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""The common parts of the SECNodes outside interfaces"""
import sys
import threading
from frappy.errors import SECoPError
from frappy.lib import formatException, formatExtendedStack, \
formatExtendedTraceback
from frappy.protocol.messages import ERRORPREFIX, HELPREPLY, HELPREQUEST, \
HelpMessage
class DecodeError(Exception):
def __init__(self, message, raw_msg):
super().__init__(message)
self._raw_msg = raw_msg
@property
def raw_msg(self):
return self._raw_msg
class ConnectionClose(Exception):
"""Indicates that receive quit due to an error."""
class RequestHandler:
"""Base class for the request handlers.
This is an extended copy of the BaseRequestHandler from socketserver.
To make a new interface, implement these methods:
ingest, next_message, decode_message, receive, send_reply and format
and extend (override) setup() and finish() if needed.
For an example, have a look at TCPRequestHandler.
"""
# Methods from BaseRequestHandler
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.log = None
try:
self.setup()
self.handle()
except Exception:
if self.log:
self.log.error(formatException())
else:
server.log.error(formatException())
finally:
self.finish()
def setup(self):
self.log = self.server.log
self.log.info("new connection %s", self.format())
# notify dispatcher of us
self.server.dispatcher.add_connection(self)
self.send_lock = threading.Lock()
self.running = True
# overwrite this with an appropriate buffer if needed
self.data = None
def handle(self):
"""handle a new connection"""
# copy state info
serverobj = self.server
# copy relevant settings from Interface
detailed_errors = serverobj.detailed_errors
# start serving
while self.running:
try:
newdata = self.receive()
if newdata is None:
# no new data during read, continue
continue
self.ingest(newdata)
except ConnectionClose:
# either normal close or error in receive
return
# put data into (de-) framer,
# de-frame data with next_message() and decode it
# call dispatcher.handle_request(self, message)
# dispatcher will queue the reply before returning
while self.running:
try:
msg = self.next_message()
if msg is None:
break # no more messages to process
except DecodeError as err:
# we have to decode 'origin' here
# use latin-1, as utf-8 or ascii may lead to encoding errors
msg = err.raw_msg.decode('latin-1').split(' ', 3) + [
None
] # make sure len(msg) > 1
result = (
ERRORPREFIX + msg[0],
msg[1],
[
'InternalError', str(err),
{
'exception': formatException(),
'traceback': formatExtendedStack()
}
]
)
print('--------------------')
print(formatException())
print('--------------------')
print(formatExtendedTraceback(sys.exc_info()))
print('====================')
else:
try:
if msg[0] == HELPREQUEST:
self.handle_help()
result = (HELPREPLY, None, None)
else:
result = serverobj.dispatcher.handle_request(self,
msg)
except SECoPError as err:
result = (
ERRORPREFIX + msg[0],
msg[1],
[
err.name,
str(err),
{
'exception': formatException(),
'traceback': formatExtendedStack()
}
]
)
except Exception as err:
# create Error Obj instead
result = (
ERRORPREFIX + msg[0],
msg[1],
[
'InternalError',
repr(err),
{
'exception': formatException(),
'traceback': formatExtendedStack()
}
]
)
print('--------------------')
print(formatException())
print('--------------------')
print(formatExtendedTraceback(sys.exc_info()))
print('====================')
if not result:
self.log.error('empty result upon msg %s', repr(msg))
if result[0].startswith(ERRORPREFIX) and not detailed_errors:
# strip extra information
result[2][2].clear()
self.send_reply(result)
def handle_help(self):
for idx, line in enumerate(HelpMessage.splitlines()):
# not sending HELPREPLY here, as there should be only one reply for
# every request
self.send_reply(('_', f'{idx + 1}', line))
def finish(self):
"""called when handle() terminates, i.e. the socket closed"""
self.log.info('closing connection %s', self.format())
# notify dispatcher
self.server.dispatcher.remove_connection(self)
# Methods for implementing in derived classes:
def ingest(self, newdata):
"""Put the new data into the buffer."""
raise NotImplementedError
def next_message(self):
"""Get the next decoded message from the buffer.
Has to return a triple of (MESSAGE, specifier, data) or None, in case
there are no further messages in the receive queue.
If there is an Error during decoding, this method has to raise a
DecodeError.
"""
raise NotImplementedError
def receive(self):
"""Receive data from the link.
Should return the received data or None if there was nothing new. Has
to raise a ConnectionClose on shutdown of the connection or on errors
that are not recoverable.
"""
raise NotImplementedError
def send_reply(self, data):
"""send reply
stops recv loop on error
"""
raise NotImplementedError
def format(self):
"""
Format available connection data into something recognizable for
logging.
For example, the remote IP address or a connection identifier.
"""
raise NotImplementedError
# TODO: server baseclass?

View File

@ -18,122 +18,77 @@
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""provides tcp interface to the SECoP Server"""
"""TCP interface to the SECoP Server"""
import errno
import os
import socket
import socketserver
import sys
import threading
import time
from frappy.datatypes import BoolType, StringType
from frappy.errors import SECoPError
from frappy.lib import formatException, formatExtendedStack, \
formatExtendedTraceback, SECoP_DEFAULT_PORT
from frappy.lib import SECoP_DEFAULT_PORT
from frappy.properties import Property
from frappy.protocol.interface import decode_msg, encode_msg_frame, get_msg
from frappy.protocol.messages import ERRORPREFIX, HELPREPLY, HELPREQUEST, \
HelpMessage
from frappy.protocol.interface.handler import ConnectionClose, \
RequestHandler, DecodeError
from frappy.protocol.messages import HELPREQUEST
MESSAGE_READ_SIZE = 1024
HELP = HELPREQUEST.encode()
class TCPRequestHandler(socketserver.BaseRequestHandler):
def format_address(addr):
if len(addr) == 2:
return '%s:%d' % addr
address, port = addr[0:2]
if address.startswith('::ffff'):
return '%s:%d' % (address[7:], port)
return '[%s]:%d' % (address, port)
class TCPRequestHandler(RequestHandler):
def setup(self):
self.log = self.server.log
self.running = True
self.send_lock = threading.Lock()
super().setup()
self.request.settimeout(1)
self.data = b''
def handle(self):
"""handle a new tcp-connection"""
# copy state info
mysocket = self.request
clientaddr = self.client_address
serverobj = self.server
def finish(self):
"""called when handle() terminates, i.e. the socket closed"""
super().finish()
# close socket
try:
self.request.shutdown(socket.SHUT_RDWR)
except Exception:
pass
finally:
self.request.close()
self.log.info("handling new connection from %s", format_address(clientaddr))
data = b''
def ingest(self, newdata):
self.data += newdata
# notify dispatcher of us
serverobj.dispatcher.add_connection(self)
def next_message(self):
try:
message, self.data = get_msg(self.data)
if message is None:
return None
if message.strip() == b'':
return (HELPREQUEST, None, None)
return decode_msg(message)
except Exception as e:
raise DecodeError('exception in receive', raw_msg=message) from e
# copy relevant settings from Interface
detailed_errors = serverobj.detailed_errors
mysocket.settimeout(1)
# start serving
while self.running:
try:
newdata = mysocket.recv(MESSAGE_READ_SIZE)
if not newdata:
# no timeout error, but no new data -> connection closed
return
data = data + newdata
except socket.timeout:
continue
except socket.error as e:
self.log.exception(e)
return
def receive(self):
try:
data = self.request.recv(MESSAGE_READ_SIZE)
if not data:
continue
# put data into (de-) framer,
# put frames into (de-) coder and if a message appear,
# call dispatcher.handle_request(self, message)
# dispatcher will queue the reply before returning
while self.running:
origin, data = get_msg(data)
if origin is None:
break # no more messages to process
origin = origin.strip()
if origin in (HELP, b''): # empty string -> send help message
for idx, line in enumerate(HelpMessage.splitlines()):
# not sending HELPREPLY here, as there should be only one reply for every request
self.send_reply(('_', f'{idx + 1}', line))
# ident matches request
self.send_reply((HELPREPLY, None, None))
continue
try:
msg = decode_msg(origin)
except Exception as err:
# we have to decode 'origin' here
# use latin-1, as utf-8 or ascii may lead to encoding errors
msg = origin.decode('latin-1').split(' ', 3) + [None] # make sure len(msg) > 1
result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', str(err),
{'exception': formatException(),
'traceback': formatExtendedStack()}])
print('--------------------')
print(formatException())
print('--------------------')
print(formatExtendedTraceback(sys.exc_info()))
print('====================')
else:
try:
result = serverobj.dispatcher.handle_request(self, msg)
except SECoPError as err:
result = (ERRORPREFIX + msg[0], msg[1], [err.name, str(err),
{'exception': formatException(),
'traceback': formatExtendedStack()}])
except Exception as err:
# create Error Obj instead
result = (ERRORPREFIX + msg[0], msg[1], ['InternalError', repr(err),
{'exception': formatException(),
'traceback': formatExtendedStack()}])
print('--------------------')
print(formatException())
print('--------------------')
print(formatExtendedTraceback(sys.exc_info()))
print('====================')
if not result:
self.log.error('empty result upon msg %s', repr(msg))
if result[0].startswith(ERRORPREFIX) and not detailed_errors:
# strip extra information
result[2][2].clear()
self.send_reply(result)
raise ConnectionClose('socket was closed')
return data
except socket.timeout:
return None
except socket.error as e:
self.log.exception(e)
raise ConnectionClose() from e
def send_reply(self, data):
"""send reply
@ -156,18 +111,9 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
self.log.error('ERROR in send_reply %r', e)
self.running = False
def finish(self):
"""called when handle() terminates, i.e. the socket closed"""
self.log.info('closing connection from %s', format_address(self.client_address))
# notify dispatcher
self.server.dispatcher.remove_connection(self)
# close socket
try:
self.request.shutdown(socket.SHUT_RDWR)
except Exception:
pass
finally:
self.request.close()
def format(self):
return f'from {format_address(self.client_address)}'
class DualStackTCPServer(socketserver.ThreadingTCPServer):
"""Subclassed to provide IPv6 capabilities as socketserver only uses IPv4"""
@ -230,12 +176,3 @@ class TCPServer(DualStackTCPServer):
if ntry:
self.log.warning('tried again %d times after "Address already in use"', ntry)
self.log.info("TCPServer initiated")
def format_address(addr):
if len(addr) == 2:
return '%s:%d' % addr
address, port = addr[0:2]
if address.startswith('::ffff'):
return '%s:%d' % (address[7:], port)
return '[%s]:%d' % (address, port)

View File

@ -0,0 +1,160 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Alexander Zaft <a.zaft@fz-juelich.de>
#
# *****************************************************************************
import json
from functools import partial
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
from websockets.sync.server import CloseCode, serve
from frappy.protocol.interface.handler import ConnectionClose, \
RequestHandler, DecodeError
from frappy.protocol.messages import HELPREQUEST
def encode_msg_frame_str(action, specifier=None, data=None):
""" encode a msg_triple into an msg_frame, ready to be sent
action (and optional specifier) are str strings,
data may be an json-yfied python object"""
msg = (action, specifier or '', '' if data is None else json.dumps(data))
return ' '.join(msg).strip()
class WSRequestHandler(RequestHandler):
"""Handles a Websocket connection."""
def __init__(self, conn, server):
self.conn = conn
client_address = conn.remote_address
request = conn.socket
super().__init__(request, client_address, server)
def setup(self):
super().setup()
self.server.connections.add(self)
def finish(self):
"""called when handle() terminates, i.e. the socket closed"""
super().finish()
self.server.connections.discard(self)
# this will be called for a second time if the server is shutting down,
# but in that case it will be a no-op
self.conn.close()
def ingest(self, newdata):
# recv on the websocket connection returns one message, we don't save
# anything in data
self.data = newdata
def next_message(self):
"""split the string into a message triple."""
if self.data is None:
return None
try:
message = self.data.strip()
if message == '':
return HELPREQUEST, None, None
res = message.split(' ', 2) + ['', '']
action, specifier, data = res[0:3]
self.data = None
return (
action,
specifier or None,
None if data == '' else json.loads(data)
)
except Exception as e:
raise DecodeError('exception when reading in message',
raw_msg=bytes(message, 'utf-8')) from e
def receive(self):
"""receives one message from the websocket."""
try:
return self.conn.recv()
except TimeoutError:
return None
except ConnectionClosedOK:
raise ConnectionClose from None
except ConnectionClosedError as e:
self.log.error('No close frame received from %s', self.format())
raise ConnectionClose from e
except OSError as e:
self.log.exception(e)
raise ConnectionClose from e
def send_reply(self, data):
"""send reply
stops recv loop on error (including timeout when output buffer full for
more than 1 sec)
"""
if not data:
self.log.error('should not reply empty data!')
return
outdata = encode_msg_frame_str(*data)
with self.send_lock:
if self.running:
try:
self.conn.send(outdata)
except (BrokenPipeError, IOError) as e:
self.log.debug('send_reply got an %r, connection closed?',
e)
self.running = False
except Exception as e:
self.log.error('ERROR in send_reply %r', e)
self.running = False
def format(self):
return f'{self.conn.id} from {self.client_address}'
class WSServer:
"""Server for providing a websocket interface.
Implementation note:
The websockets library doesn't provide an option to subclass its server, so
we take the returned value as an attribute and provide the neccessary
function calls.
"""
def __init__(self, name, logger, options, srv):
self.connections = set() # keep track for shutting down
self.dispatcher = srv.dispatcher
self.name = name
self.log = logger
self.port = int(options.pop('uri').split('://', 1)[-1])
self.detailed_errors = options.pop('detailed_errors', False)
handle = partial(WSRequestHandler, server=self)
# websockets only gives the serve method without an option to subclass
self.ws_server = serve(handle, '', self.port, logger=logger)
self.log.info("Websocket server %s binding to port %d", name, self.port)
def serve_forever(self):
self.ws_server.serve_forever()
def shutdown(self):
for c in list(self.connections):
c.conn.close(code=CloseCode.GOING_AWAY, reason='shutting down')
self.ws_server.shutdown()
def __enter__(self):
return self
def __exit__(self, *args):
return self.shutdown()

View File

@ -53,6 +53,7 @@ except ImportError:
class Server:
INTERFACES = {
'tcp': 'protocol.interface.tcp.TCPServer',
'ws': 'protocol.interface.ws.WSServer',
}
_restart = True

View File

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
@ -18,79 +19,78 @@
# *****************************************************************************
"""vector field"""
from frappy.core import Drivable, Done, BUSY, IDLE, WARN, ERROR
from frappy.errors import BadValueError
import math
from frappy.core import Drivable, Done, BUSY, IDLE, ERROR, Parameter, TupleOf, ArrayOf, FloatRange
from frappy.errors import RangeError
from frappy_psi.vector import Vector
from frappy.states import HasStates, Retry, status_code
DECREASE = 1
INCREASE = 2
class VectorField(Vector, Drivable):
_state = None
class VectorField(HasStates, Vector, Drivable):
sphere_radius = Parameter('max. sphere', datatype=FloatRange(0, 0.7, unit='T'), readonly=True, default=0.6)
cylinders = Parameter('allowed cylinders (list of radius and height)',
datatype=ArrayOf(TupleOf(FloatRange(0, 0.6, unit='T'), FloatRange(0, 5.2, unit='T')), 1, 9),
readonly=True, default=((0.23, 5.2), (0.45, 0.8)))
def doPoll(self):
"""periodically called method"""
try:
if self._starting:
# first decrease components
driving = False
for target, component in zip(self.target, self.components):
if target * component.value < 0:
# change sign: drive to zero first
target = 0
if abs(target) < abs(component.target):
if target != component.target:
component.write_target(target)
if component.isDriving():
driving = True
if driving:
return
# now we can go to the final targets
for target, component in zip(self.target, self.components):
component.write_target(target)
self._starting = False
else:
for component in self.components:
if component.isDriving():
return
self.setFastPoll(False)
finally:
super().doPoll()
def initModule(self):
super().initModule()
# override check_limits of the components with a check for restrictions on the vector
for idx, component in enumerate(self.components):
def outer_check(target, vector=self, i=idx, inner_check=component.check_target):
inner_check(target)
value = [c.value - math.copysign(c.tolerance, c.value)
for c in vector.components]
value[i] = target
vector._check_limits(value)
component.check_target = outer_check
def merge_status(self):
names = [c.name for c in self.components if c.status[0] >= ERROR]
if names:
return ERROR, 'error in %s' % ', '.join(names)
names = [c.name for c in self.components if c.isDriving()]
if self._state:
# self.log.info('merge %r', [c.status for c in self.components])
if names:
direction = 'down ' if self._state == DECREASE else ''
return BUSY, 'ramping %s%s' % (direction, ', '.join(names))
if self.status[0] == BUSY:
return self.status
return BUSY, 'driving'
if names:
return WARN, 'moving %s directly' % ', '.join(names)
names = [c.name for c in self.components if c.status[0] >= WARN]
if names:
return WARN, 'warnings in %s' % ', '.join(names)
return IDLE, ''
return self.status
def write_target(self, value):
def _check_limits(self, value):
"""check if value is within one of the safe shapes"""
if sum((v ** 2 for v in value)) <= self.sphere_radius ** 2:
return
for r, h in self.cylinders:
if sum(v ** 2 for v in value[0:2]) <= r ** 2 and abs(value[2]) <= h:
return
raise RangeError('vector %s does not fit in any limiting shape' % repr(value))
def write_target(self, target):
"""initiate target change"""
# first make sure target is valid
for target, component in zip(self.target, self.components):
# check against limits if individual components
component.check_limits(target)
if sum(v * v for v in value) > 1:
raise BadValueError('norm of vector too high')
self.log.info('decrease')
self.setFastPoll(True)
self.target = value
self._state = DECREASE
self.doPoll()
self.log.info('done write_target %r', value)
return Done
# check limits first
for component_target, component in zip(target, self.components):
# check against limits of individual components
component.check_target(component_target, vector=None) # no outer check here!
self._check_limits(target)
for component_target, component in zip(target, self.components):
if component_target * component.value < 0:
# change sign: drive to zero first
component_target = 0
if abs(component_target) > abs(component.value):
continue # do not drive yet
component.write_target(component_target)
self.start_machine(self.ramp_down, target=target)
return target
@status_code(BUSY)
def ramp_down(self, state):
for target, component in zip(state.target, self.components):
if component.isDriving():
return Retry()
for target, component in zip(state.target, self.components):
component.write_target(target)
return self.final_ramp
@status_code(BUSY)
def final_ramp(self, state):
for component in self.components:
if component.isDriving():
return Retry()
return self.final_status()

View File

@ -5,6 +5,8 @@ mlzlog >=0.2.0
# daemonizing
psutil
python-daemon >=2.0
# websocket interface:
websockets>=11.0
# for zmq interface
#pyzmq>=13.1.0
#for ppms on windows

View File

@ -19,6 +19,7 @@
#
# *****************************************************************************
from frappy.io import HasIO
from frappy.modules import Module, Attached
from frappy.protocol.dispatcher import Dispatcher
@ -29,6 +30,9 @@ class LoggerStub:
info = warning = exception = debug
handlers = []
def getChild(self, name):
return self
logger = LoggerStub()
@ -51,6 +55,7 @@ class ServerStub:
def __init__(self):
self.secnode = SecNodeStub()
self.dispatcher = Dispatcher('dispatcher', logger, {}, self)
self.log = logger
def test_attach():
@ -64,3 +69,22 @@ def test_attach():
srv.secnode.add_module(a, 'a')
srv.secnode.add_module(m, 'm')
assert m.att == a
def test_attach_hasio_uri():
class TestIO(Module):
def __init__(self, name, logger, cfgdict, srv):
self._uri = cfgdict.pop('uri')
super().__init__(name, logger, cfgdict, srv)
class HasIOTest(HasIO):
ioClass = TestIO
srv = ServerStub()
m = HasIOTest('m', logger, {'description': '', 'uri': 'abc'}, srv)
assert srv.secnode.modules['m_io']._uri == 'abc'
assert m.io == srv.secnode.modules['m_io']
# two modules with the same IO should use the same io module
m2 = HasIOTest('m', logger, {'description': '', 'uri': 'abc'}, srv)
assert m2.io == srv.secnode.modules['m_io']