adopt frappy_psi and frappy changes from wip

Change-Id: I4e6732e755398d88b73007fb53b758039c5d4483
This commit is contained in:
zolliker 2024-01-29 10:56:58 +01:00
parent 3eb5f90ce6
commit a429852c80
15 changed files with 597 additions and 12 deletions

View File

@ -391,16 +391,17 @@ class SecopClient(ProxyClient):
else:
module_param = self.internal.get(f'{ident}:value', None)
if module_param is not None:
now = time.time()
if action.startswith(ERRORPREFIX):
timestamp = data[2].get('t', None)
timestamp = data[2].get('t', now)
readerror = frappy.errors.make_secop_error(*data[0:2])
value = None
else:
timestamp = data[1].get('t', None)
timestamp = data[1].get('t', now)
value = data[0]
readerror = None
module, param = module_param
timestamp = min(time.time(), timestamp) # no timestamps in the future!
timestamp = min(now, timestamp) # no timestamps in the future!
try:
self.updateValue(module, param, value, timestamp, readerror)
except KeyError:

View File

@ -239,7 +239,7 @@ class StringIO(IOBase):
a flag to indicate whether the first message should be resent once to
avoid data that may still be in the buffer to garble the message''',
datatype=BoolType(), default=False)
datatype=BoolType(), default=True)
def _convert_eol(self, value):
if isinstance(value, str):

View File

@ -71,8 +71,11 @@ class HasOutputModule:
def initModule(self):
super().initModule()
if self.output_module:
self.output_module.register_input(self.name, self.deactivate_control)
try:
if self.output_module:
self.output_module.register_input(self.name, self.deactivate_control)
except Exception:
self.log.info(f'{self.name} has no output module')
def set_control_active(self, active):
"""to be overridden for switching hw control"""

View File

@ -489,6 +489,9 @@ class Dispatcher:
modobj.setRemoteLogging(conn, level)
def handle_logging(self, conn, specifier, level):
if specifier == '#':
self.log.handlers[1].setLevel(int(level))
return LOGGING_REPLY, specifier, level
if specifier and specifier != '.':
modobj = self._modules[specifier]
modobj.setRemoteLogging(conn, level)

View File

@ -143,6 +143,7 @@ class TCPRequestHandler(socketserver.BaseRequestHandler):
if not data:
self.log.error('should not reply empty data!')
return
self.log.debug('send %r', data)
outdata = encode_msg_frame(*data)
with self.send_lock:
if self.running:

67
frappy_psi/bkpower.py Normal file
View File

@ -0,0 +1,67 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# Jael Celia Lorenzana <jael-celia.lorenzana@psi.ch>
# *****************************************************************************
"""Powersupply B&K Precision BK168xB"""
from frappy.core import StringIO, Readable, Parameter, FloatRange, Writable, HasIO, BoolType
class IO(StringIO):
end_of_line = ('OK\r', '\r')
default_settings = {'baudrate': 9600}
class Power(HasIO, Readable):
value = Parameter(datatype=FloatRange(0,300,unit='W'))
def read_value(self):
reply = self.communicate('GETD')
volt = float(reply[0:4])/100
current = float(reply[4:8])/100
return volt*current
class Output(HasIO, Writable):
value = Parameter(datatype=FloatRange(0,100,unit='%'))
target = Parameter(datatype=FloatRange(0,100,unit='%'))
maxvolt = Parameter('voltage at 100%',datatype=FloatRange(0,60,unit='V'),default=50,readonly=False)
maxcurrent = Parameter('current at 100%',datatype=FloatRange(0,5,unit='A'),default=2,readonly=False)
output_enable = Parameter('control on/off', BoolType(), readonly=False)
def initModule(self):
super().initModule()
self.write_output_enable(False)
def write_target(self, target):
self.write_output_enable(target != 0)
self.communicate(f'VOLT{round(max(8,target*self.maxvolt/10)):03d}')
self.communicate(f'CURR{round(target*self.maxcurrent):03d}')
self.value = target
def write_output_enable(self, value):
self.communicate(f'SOUT{int(not value)}')
def write_maxvolt(self, maxvolt):
self.communicate(f'SOVP{round(maxvolt*10):03d}')
def write_maxcurrent(self, maxcurrent):
self.communicate(f'SOCP{round(maxcurrent*100):03d}')
def shutdown(self):
self.write_target(0)

65
frappy_psi/drums.py Normal file
View File

@ -0,0 +1,65 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""iono pi max relais drums
for demo purposes
"""
from frappy.core import Module, Attached, FloatRange, StringType, Writable, Parameter
class Drums(Writable):
target = Parameter('drum speed', FloatRange(unit='beats/min'))
left = Attached(Writable)
right = Attached(Writable)
pollinterval = Parameter('drum interval', FloatRange(0, 10, unit='s'), readonly=False)
pattern = Parameter('''pattern
a string containing:
L,R: left / right relais on
l,r: left / right relais off
blank: wait''', StringType(), readonly=False)
_pos = 0
_wait = 0
def initModule(self):
super().initModule()
self.actions = {'L': self.left, 'R': self.right}
def write_target(self, target):
self.pollinterval = 60 / max(1, target)
self.value = target
def doPoll(self):
if not self.target:
return
if self._wait:
self._wait -= 1
return
if self._pos >= len(self.pattern):
self._pos = 0
for i, action in enumerate(self.pattern[self._pos:]):
upper = action.upper()
relais = self.actions.get(action.upper())
if relais:
relais.write_target(upper == action) # True when capital letter
else:
self._wait = int(action) - 1
self._pos += i + 1
return

49
frappy_psi/furnace.py Normal file
View File

@ -0,0 +1,49 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""interlocks for furnance"""
import time
from frappy.core import Module, Writable, Attached, Parameter, FloatRange, Readable, BoolType, ERROR, IDLE
class Interlocks(Module):
input = Attached(Readable, 'the input module')
vacuum = Attached (Readable, 'the vacuum pressure')
wall_T = Attached (Readable, 'the wall temperature')
control = Attached(Module, 'the control module')
relais = Attached(Writable, 'the interlock relais')
wall_limit = Parameter('maximum wall temperature', FloatRange(0, unit='degC'),
default = 50, readonly = False)
vacuum_limit = Parameter('maximum vacuum pressure', FloatRange(0, unit='mbar'),
default = 0.1, readonly = False)
def doPoll(self):
super().doPoll()
if self.input.status[0] >= ERROR:
self.control.status = self.input.status
elif self.vacuum.value > self.vacuum_limit:
self.control.status = ERROR, 'bad vacuum'
elif self.wall_T.value > self.wall_limit:
self.control.status = ERROR, 'wall overheat'
else:
return
self.control.write_control_active(False)
self.relais.write_target(False)

125
frappy_psi/ionopimax.py Normal file
View File

@ -0,0 +1,125 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# Jael Celia Lorenzana <jael-celia.lorenzana@psi.ch>
# *****************************************************************************
from frappy.core import Readable, Writable, Parameter, BoolType, StringType,\
FloatRange, Property, TupleOf, ERROR, IDLE
from math import log
class Base:
addr = Property('address', StringType())
def read(self, addr, scale=None):
with open(f'/sys/class/ionopimax/{self.devclass}/{addr}') as f:
result = f.read()
if scale:
return float(result) / scale
return result
def write(self, addr, value, scale=None):
value = str(round(value * scale)) if scale else str(value)
with open(f'/sys/class/ionopimax/{self.devclass}/{addr}', 'w') as f:
f.write(value)
class DigitalInput(Base, Readable):
value = Parameter('input state', BoolType())
devclass = 'digital_in'
def read_value(self):
return self.read(self.addr, 1)
class DigitalOutput(DigitalInput, Writable):
target = Parameter('output state', BoolType(), readonly=False)
devclass = 'digital_out'
def write_target(self, value):
self.write(self.addr, value, 1)
class AnalogInput(Base, Readable):
value = Parameter('analog value', FloatRange())
rawrange = Property('raw range(electronic)', TupleOf(FloatRange(),FloatRange()))
valuerange = Property('value range(physical)', TupleOf(FloatRange(),FloatRange()))
devclass = 'analog_in'
def read_value(self):
x0, x1 = self.rawrange
y0, y1 = self.valuerange
self.x = self.read(self.addr, self.scale)
return y0 + (y1 - y0) * (self.x - x0) / (x1 - x0)
class VoltageInput(AnalogInput):
scale = 1e5
def initModule(self):
super().initModule()
self.write(f'{self.addr}_mode','U')
class LogVoltageInput(VoltageInput):
def read_value(self):
x0, x1 = self.rawrange
y0, y1 = self.valuerange
self.x = self.read(self.addr, self.scale)
a = (x1-x0)/log(y1/y0,10)
return 10**((self.x-x1)/a)*y1
class CurrentInput(AnalogInput):
scale = 1e6
rawrange = (0.004,0.02)
def initModule(self):
super().initModule()
self.write(f'{self.addr}_mode','U')
def read_value(self):
result = super().read_value()
if self.x > 0.021:
self.status = ERROR, 'sensor broken'
else:
self.status = IDLE, ''
return result
class AnalogOutput(AnalogInput, Writable):
target = Parameter('outputvalue', FloatRange())
devclass = 'analog_out'
def write_target(self, value):
x0, x1 = self.rawrange
y0, y1 = self.valuerange
self.write(self.addr, x0 + (x1 - x0) * (value - y0) / (y1 - y0),self.scale)
class VoltageOutput(AnalogOutput):
rawrange = (0,10)
scale = 1e3
def initModule(self):
super().initModule()
self.write(f'{self.addr}_enabled', '0')
self.write(f'{self.addr}_mode', 'V')
self.write(f'{self.addr}', '0')
self.write(f'{self.addr}_enabled', '1')

View File

@ -28,9 +28,9 @@ from frappy_psi.magfield import Magfield, SimpleMagfield, Status
from frappy_psi.mercury import MercuryChannel, off_on, Mapped
from frappy.states import Retry
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=3)
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=3, not_found=4)
hold_rtoz_rtos_clmp = Mapped(HOLD=Action.hold, RTOS=Action.run_to_set,
RTOZ=Action.run_to_zero, CLMP=Action.clamped)
RTOZ=Action.run_to_zero, CLMP=Action.clamped, NOT_FOUND=Action.not_found)
CURRENT_CHECK_SIZE = 2
@ -161,6 +161,7 @@ class Field(SimpleField, Magfield):
if self.switch_heater == self.switch_heater.on:
self.__persistent_field = current
self.forced_persistent_field = False
self._field_mismatch = False
return current
pf = self.query('DEV::PSU:SIG:PFLD')
if self.__persistent_field is None:

View File

@ -294,6 +294,7 @@ class HeaterOutput(HasInput, Writable):
volt = 0.0 # target voltage
_last_target = None
_volt_target = None
_resistivity = 10
def read_limit(self):
return self.query('DEV::HTR:VLIM') ** 2 / self.resistivity
@ -329,10 +330,15 @@ class HeaterOutput(HasInput, Writable):
res = volt / current
tol = res * max(max(0.0003, abs(volt - self._volt_target)) / volt, 0.0001 / current, 0.0001)
if abs(res - self.resistivity) > tol + 0.07 and self._last_target:
self.write_resistivity(round(res, 1))
if self.controlled_by == 0:
self._volt_target = math.sqrt(self._last_target * self.resistivity)
self.change('DEV::HTR:SIG:VOLT', self._volt_target, tolerance=2e-4)
res = round(res, 1)
if self._resistivity != res and 10 <= res <= 100:
# we want twice the same value before changing
self._resistivity = res
else:
self.write_resistivity(res)
if self.controlled_by == 0:
self._volt_target = math.sqrt(self._last_target * self.resistivity)
self.change('DEV::HTR:SIG:VOLT', self._volt_target, tolerance=2e-4)
return volt * current
def read_target(self):

68
frappy_psi/pdld.py Normal file
View File

@ -0,0 +1,68 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""PDLD Laser"""
from frappy.core import StringIO, Parameter, HasIO, \
Writable, FloatRange, BoolType, IDLE, ERROR
class IO(StringIO):
end_of_line = ('>', '\r')
class Laser(HasIO, Writable):
ioClass = IO
value = Parameter('on/off', BoolType())
target = Parameter('on/off', BoolType())
def get_par(self, cmd):
return float(self.communicate(cmd).split()[-1])
def read_value(self):
reply = float(self.communicate('MCM').split()[-1])
if reply == 10:
self.status = IDLE, ''
return True
if reply in (0, 40):
self.status = IDLE, ''
return False
return ERROR, 'bad OP mode %s', reply
def write_target(self, value):
if value:
self.communicate('SALO')
else:
self.communicate('SALS')
class LaserPower(HasIO, Writable):
value = Parameter('power readback', FloatRange(unit='mW'))
target = Parameter('power setpoint', FloatRange(0, 300, unit='mW'), readonly=False)
ioClass = IO
def read_value(self):
return float(self.communicate('MPO').split()[-1])
def read_target(self):
return float(self.communicate('MPR').split()[-1])
def write_target(self, value):
self.communicate(f'SPR{int(value):03d}')

64
frappy_psi/pfeiffer.py Normal file
View File

@ -0,0 +1,64 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""pfeiffer TPG vacuum pressure reading"""
from frappy.core import StringIO, HasIO, Command, StringType, IntRange, \
IDLE, WARN, ERROR, Readable, Parameter, Property
from frappy.errors import CommunicationFailedError
ACK = '\x06'
ENQ = '\x05'
class IO(StringIO):
end_of_line = '\r\n'
default_settings = {'baudrate': 9600}
def communicate(self, command, noreply=False):
with self._lock:
ack = super().communicate(command)
if ack != ACK:
raise CommunicationFailedError('no ack received')
if noreply:
# to be used for changing parameters when needed
return None
return super().communicate(ENQ)
class Pressure(HasIO, Readable):
value = Parameter(unit='mbar')
channel = Property('channel number', IntRange(1,2), default=1)
STATUS_MAP = {
'0': (IDLE, ''),
'1': (WARN, 'underrange'),
'2': (WARN, 'overrange'),
'3': (ERROR, 'sensor error'),
'4': (ERROR, 'sensor off'),
'5': (ERROR, 'no sensor'),
'6': (ERROR, 'identification error'),
}
ioClass = IO
def read_value(self):
reply = self.communicate(f'PR{self.channel}')
status, strvalue = reply.split(',')
self.status = self.STATUS_MAP.get(status, (ERROR, 'bad status'))
return float(strvalue)

70
frappy_psi/picontrol.py Normal file
View File

@ -0,0 +1,70 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# Jael Celia Lorenzana <jael-celia.lorenzana@psi.ch>
# *****************************************************************************
"""PI control for furnance"""
import time
from frappy.core import Writable, Attached, Parameter, FloatRange, Readable, BoolType, ERROR, IDLE
class PI(Writable):
input = Attached(Readable, 'the input module')
output = Attached(Writable, 'the output module')
relais = Attached(Writable, 'the interlock relais')
p = Parameter('proportional term', FloatRange(0), readonly=False)
i = Parameter('integral term', FloatRange(0), readonly=False)
control_active = Parameter('control flag', BoolType(), readonly=False, default=False)
value = Parameter(unit='degC')
_lastdiff = None
_lasttime = 0
def doPoll(self):
super().doPoll()
if not self.control_active:
return
self.value = self.input.value
self.status = IDLE, 'controlling'
now = time.time()
deltat = min(10, now-self._lasttime)
self._lasttime = now
diff = self.target - self.value
if self.value > 300:
self.write_control_active(False)
return
if self._lastdiff is None:
self._lastdiff = diff
deltadiff = diff - self._lastdiff
self._lastdiff = diff
output = self.output.target
output += self.p * deltadiff + self.i * deltat * diff
if output > 100:
output = 100
elif output < 0:
output = 0
self.output.write_target(output)
def write_control_active(self, value):
if not value:
self.output.write_target(0)
def write_target(self, value):
self.control_active = True
self.relais.write_target(1)

62
frappy_psi/tdkpower.py Normal file
View File

@ -0,0 +1,62 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# Leon Zimmermann <leon.zimmermann@psi.ch>
# *****************************************************************************
"""Powersupply TDK-Lambda GEN8-400-1P230"""
from frappy.core import StringIO, Readable, Parameter, FloatRange, Writable, HasIO, BoolType
class IO(StringIO):
end_of_line = ('OK\r', '\r')
default_settings = {'baudrate': 9600}
class Power(HasIO, Readable):
value = Parameter(datatype=FloatRange(0,3300,unit='W'))
def read_value(self):
reply_volt = self.communicate('MV?')
reply_current = self.communicate('MC?')
volt = float(reply_volt)
current = float(reply_current)
return volt*current
class Output(HasIO, Writable):
value = Parameter(datatype=FloatRange(0,100,unit='%'))
target = Parameter(datatype=FloatRange(0,100,unit='%'))
maxvolt = Parameter('voltage at 100%',datatype=FloatRange(0,8,unit='V'),readonly=False)
maxcurrent = Parameter('current at 100%',datatype=FloatRange(0,400,unit='A'),readonly=False)
output_enable = Parameter('control on/off', BoolType(), readonly=False)
def initModule(self):
super().initModule()
self.write_output_enable(False)
def write_target(self, target):
self.write_output_enable(target != 0)
self.communicate(f'PV {target*self.maxvolt:.5f}')
self.communicate(f'PC {target*self.maxcurrent:.5f}')
self.value = target
def write_output_enable(self, value):
self.communicate(f'OUT {int(value)}')
def shutdown(self):
self.write_target(0)