3 Commits

Author SHA1 Message Date
1bb869b43e capillary heater: heater is now a writable
The value should show the actual heater power, but
we do not know yet the address. Currently the value
is just equal to the maxheater parameter.
2025-11-27 17:55:52 +01:00
3ede9eb9f4 frappy_psi.logo: revised version after merging capillary heater
- check that dil5 still works!

Change-Id: Ibe98e64088f2f886888af170a1f38d699927eb58
2025-11-27 09:32:37 +01:00
f57400feb9 frappy_psi.picontrol: stop when switching to manual mode
Change-Id: I3ffb9a109fb3b04fbca06f5a72acbfbd19525aae
2025-11-27 07:49:29 +01:00
21 changed files with 498 additions and 1809 deletions

View File

@@ -1,22 +0,0 @@
Node('pdld_laser.psi.ch',
'PDLD laser',
interface = 'tcp://5000',
)
Mod('laser_io',
'frappy_psi.pdld.IO',
'laser IO',
uri='serial:///dev/ttyUSB0?baudrate=9600',
)
Mod('laser',
'frappy_psi.pdld.Laser',
'laser switch',
io='laser_io',
)
Mod('laser_power',
'frappy_psi.pdld.LaserPower',
'laser power',
io='laser_io',
)

View File

@@ -1,19 +0,0 @@
Node('bronkhorsttest.psi.ch',
'bronkhorst test',
'tcp://5000',
)
Mod('io',
'frappy_psi.bronkhorst.IO',
'bronkhorst communication',
uri='tcp://localhost:3005',
)
Mod('p',
'frappy_psi.bronkhorst.Controller',
'pressure controller',
io='io',
adr=128,
scale=18,
value=Param(unit='mbar')
)

View File

@@ -1,17 +0,0 @@
Node('bronkhorst_test.psi.ch',
'bronkhorst test',
'tcp://5000',
)
Mod('io',
'frappy_psi.bronkhorst.IO',
'',
uri='dil4-ts:3002',
)
Mod('flow',
'frappy_psi.bronkhorst.Sensor',
'flow',
io='io',
value=Param(unit='ln/min'),
)

View File

@@ -1,100 +0,0 @@
Node('dil4_test.psi.ch',
'dil4 test',
'tcp://5000',
)
Mod('io',
'frappy_psi.oiclassic.IGH_IO',
'',
uri='dil4-ts:3005',
)
Mod('p',
'frappy_psi.oiclassic.Pressure',
'pressure',
io='io',
addr='G2',
)
Mod('P_mix',
'frappy_psi.oiclassic.MixPower',
'mix power',
io='io',
)
Mod('P_sorb',
'frappy_psi.oiclassic.SorbPower',
'sorb power',
io='io',
)
Mod('P_still',
'frappy_psi.oiclassic.StillPower',
'still power',
io='io',
)
Mod('mot_fast',
'frappy_psi.oiclassic.MotorValve',
'fast motor valve',
io='io',
)
Mod('mot_slow',
'frappy_psi.oiclassic.SlowMotorValve',
'slow motor valve',
io='io',
)
Mod('valve',
'frappy_psi.oiclassic.Valve',
'solenoid valve',
io='io',
addr='V2',
)
Mod('valve_pulsed',
'frappy_psi.oiclassic.PulsedValve',
'pulsed valve',
io='io',
addr='V2',
)
Mod('upperLN2',
'frappy_psi.oiclassic.N2Sensor',
'upper LN2',
)
Mod('lowerLN2',
'frappy_psi.oiclassic.N2Sensor',
'lower LN2',
)
Mod('pump',
'frappy_psi.oiclassic.Pump',
'pump feedback',
io='io',
addr='rotary_pump_He3',
upper_LN2='upperLN2',
lower_LN2='lowerLN2',
)
Mod('io_flow',
'frappy_psi.bronkhorst.IO',
'',
uri='dil4-ts:3002',
)
Mod('flow',
'frappy_psi.bronkhorst.Sensor',
'flow',
io='io_flow',
value=Param(unit='ln/min'),
)
Mod('fun',
'frappy_psi.softcal.Function',
'modified flow',
rawsensor='flow',
formula='x',
)

View File

@@ -83,38 +83,37 @@ Mod('compressor',
)
Mod('p2',
'frappy_psi.logo.Value',
'frappy_psi.logo.Pressure',
'pressure after compressor',
io = 'logo',
addr ="VW0",
value = Param(unit='mbar'),
)
pollinterval=0.5,
)
Mod('p1',
'frappy_psi.logo.Value',
'frappy_psi.logo.Pressure',
'dump pressure',
io = 'logo',
addr ="VW28",
value = Param(unit='mbar'),
)
pollinterval=0.5,
)
Mod('p5',
'frappy_psi.logo.Value',
'frappy_psi.logo.Pressure',
'pressure after forepump',
io = 'logo',
addr ="VW4",
value = Param(unit='mbar'),
)
pollinterval = 0.5,
)
Mod('airpressure',
'frappy_psi.logo.DigitalValue',
'frappy_psi.logo.Comparator',
'Airpressure state',
io = 'logo',
addr ="V1024.7",
)
threshold = 500,
pollinterval = 0.5,
)
Mod('io_pfeiffer',
'frappy_psi.pfeiffer_new.PfeifferProtocol',

94
cfg/gas10ka_cfg.py Normal file
View File

@@ -0,0 +1,94 @@
Node('gas10ka.psi.ch',
'10kBar Gas pressure stick',
interface='tcp://5010',
)
Mod('io',
'frappy_psi.logo.IO',
'',
ip_address = "192.168.1.1",
tcap_client = 0x3000,
tsap_server = 0x2000
)
Mod('R_pt10k',
'frappy_psi.logo.Resistor',
'raw sensor value of T_p10k',
io = 'io',
addr = "VW0",
)
Mod('T_pt10k',
'frappy_psi.softcal.Sensor',
'temperature close to sample',
value=Param(unit='K'),
rawsensor='R_pt10k',
calcurve='pt10000e',
)
Mod('R_top',
'frappy_psi.logo.Resistor',
'raw sensor value of T_top',
io = 'io',
addr = "VW2",
)
Mod('T_top',
'frappy_psi.softcal.Sensor',
'capillary temperature at highest position',
value=Param(unit='K'),
rawsensor='R_top',
calcurve='pt1000e',
)
Mod('R_mid',
'frappy_psi.logo.Resistor',
'raw sensor value of T_mid',
io = 'io',
addr = "VW6",
)
Mod('T_mid',
'frappy_psi.softcal.Sensor',
'capillary temperature at mid position',
value=Param(unit='K'),
rawsensor='R_mid',
calcurve='pt1000e',
)
Mod('R_bot',
'frappy_psi.logo.Resistor',
'raw sensor value of T_bot',
io = 'io',
addr = "VW4",
)
Mod('T_bot',
'frappy_psi.softcal.Sensor',
'capillary temperature at lower position',
value=Param(unit='K'),
rawsensor='R_bot',
calcurve='pt1000e',
)
Mod('R_sam_cx',
'frappy_psi.logo.Resistor',
'sensor',
io = 'io',
addr = "VW16",
)
Mod('T_sam_cx',
'frappy_psi.softcal.Sensor',
'?',
value=Param(unit='K'),
rawsensor='R_sam_cx',
calcurve='X174785',
)
Mod('heater',
'frappy_psi.capillary_heater.Heater',
'the capillary heater',
io = 'io',
)

View File

@@ -1,16 +0,0 @@
Node('hcptest.psi.ch',
'high voltage supply test',
'tcp://5000',
)
Mod('io',
'frappy_psi.hcp.IO',
'hcp communication',
uri='serial:///dev/tty.usbserial-21440?baudrate=9600',
)
Mod('voltage',
'frappy_psi.hcp.Voltage',
'fug hcp 14-6500 voltage',
io='io',
)

View File

@@ -35,8 +35,8 @@ Mod('ts',
'frappy_psi.parmod.Converging',
'test for parmod',
unit='K',
read='th.value',
write='th.setsamp',
value_param='th.value',
target_param='th.setsamp',
meaning=['temperature', 20],
settling_time=20,
tolerance=1,

View File

@@ -1,149 +0,0 @@
Node('sim_dil_test.test',
'simulated dil4 state machine test',
'tcp://5000',
)
Mod('V1',
'frappy_psi.sim_dil.Valve',
'condense valve',
value='close',
)
Mod('V2',
'frappy_psi.sim_dil.Valve',
'circuitshort valve',
value='close',
)
Mod('V3',
'frappy_psi.sim_dil.Valve',
'circuitshort valve',
value='close',
)
Mod('V4',
'frappy_psi.sim_dil.Valve',
'still valve',
value='close',
)
Mod('V5',
'frappy_psi.sim_dil.Valve',
'still valve',
value='close',
)
Mod('V7',
'frappy_psi.sim_dil.Valve',
'dump valve',
value='close',
)
Mod('V8',
'frappy_psi.sim_dil.Valve',
'dump valve',
value='close',
)
Mod('V9',
'frappy_psi.sim_dil.Valve',
'dump valve',
value='close',
)
Mod('V10',
'frappy_psi.sim_dil.Valve',
'dump valve',
value='close',
)
Mod('V11A',
'frappy_psi.sim_dil.Valve',
'dump valve',
value='close',
)
Mod('V12A',
'frappy_psi.sim_dil.Valve',
'dump valve',
value='close',
)
Mod('V13A',
'frappy_psi.sim_dil.Valve',
'dump valve',
value='close',
)
Mod('pump_He3',
'frappy_psi.sim_dil.Valve',
'rotary_pump_He3',
value='close',
)
Mod('V14',
'frappy_psi.sim_dil.PulsedValve',
'pulsed valve',
value='close',
)
Mod('V6_motor',
'frappy_psi.sim_dil.Sensor',
'motor valve',
value=Param(0.0, unit='%'),
)
Mod('G1',
'frappy_psi.sim_dil.Sensor',
'condensline pressure',
value=Param(0, unit='mbar')
)
Mod('G3',
'frappy_psi.sim_dil.Sensor',
'dump pressure',
value=Param(0, unit='mbar')
)
Mod('P1',
'frappy_psi.sim_dil.Sensor',
'still pressure',
value=Param(0, unit='mbar')
)
Mod('T_oneK',
'frappy_psi.sim_dil.Sensor',
'temp one Kelvin chamber',
value=Param(4, unit='K'),
)
Mod('T_still',
'frappy_psi.sim_dil.Sensor',
'temp still chamber',
value=Param(4, unit='K'),
)
Mod('T_mix',
'frappy_psi.sim_dil.Sensor',
'temp mix chamber',
value=Param(4, unit='K'),
)
Mod('dil',
'frappy_psi.dilution_new.DIL4',
'dilution state machine',
condenseline_pressure='G1', # G1
condense_valve='V1', # V1
dump_valve='V9', # V9
forepump='pump_He3', # rotary_pump_He3 (24)
condenseline_valve='V1', # V1
circuitshort_valve='V3', # V3
still_valve='V6_motor', # V6
pumpout_valve='V14', # V14
still_pressure='P1', # P1
dump_pressure='G3', # G3
oneK_temp='T_oneK',
still_temp='T_still',
mix_temp='T_mix',
sorb_pump_time=30,
)

View File

@@ -1,37 +0,0 @@
Node('fibrestick.psi.ch',
'stick with laser fibre',
)
Mod('sea_stick',
'frappy_psi.sea.SeaClient',
'SEA stick connection',
config='fibre.stick',
service='stick',
)
Mod('ts',
'frappy_psi.sea.SeaReadable', '',
meaning=['temperature', 30],
io='sea_stick',
sea_object='tt',
json_file='ma11.config.json',
rel_paths=['ts'],
)
Mod('laser_io',
'frappy_psi.pdld.IO',
'laser IO',
uri='serial:///dev/serial/by-path/pci-0000:00:14.0-usb-0:4.4.4.2:1.0-port0?baudrate=9600',
)
Mod('laser',
'frappy_psi.pdld.Laser',
'laser switch',
io='laser_io',
)
Mod('laser_power',
'frappy_psi.pdld.LaserPower',
'laser power',
io='laser_io',
)

View File

@@ -1,17 +0,0 @@
Node('test_ips.psi.ch',
'ips test',
'tcp://5000',
)
Mod('io',
'frappy_psi.oiclassic.IPS_IO',
'',
uri='ma11-ts:3002',
)
Mod('B',
'frappy_psi.oiclassic.Field',
'magnetic field',
io='io',
target=Param(max=0.2),
)

View File

@@ -37,13 +37,6 @@ class MathParser:
ast.Div: op.truediv,
ast.Pow: op.pow,
ast.FloorDiv: op.floordiv,
ast.Lt: op.lt,
ast.Gt: op.gt,
ast.LtE: op.le,
ast.GtE: op.ge,
ast.Eq: op.eq,
ast.NotEq: op.ne,
ast.Not: op.not_,
ast.USub: op.neg,
ast.UAdd: lambda a:a}
@@ -81,15 +74,6 @@ class MathParser:
if isinstance(node, ast.BinOp): # evaluate binary operations
method = self._operators2method[type(node.op)]
return method( self.eval_(node.left), self.eval_(node.right))
if isinstance(node, ast.Compare): # evaluate binary operations
left = self.eval_(node.left)
for oper, value in zip(node.ops, node.comparators):
method = self._operators2method[type(oper)]
right = self.eval_(value)
if not method(left, right):
return False
left = right
return True
if isinstance(node, ast.UnaryOp): # handle operators
method = self._operators2method[type(node.op)]
return method( self.eval_(node.operand) )

View File

@@ -1,160 +0,0 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Anik Stark <anik.stark@psi.ch>
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Bronkhorst flow or pressure regulators. Communication via ProPar protocol.
write command: :LnAd01PrTpData\r\n
read command: :LnAd04CopyPrTp\r\n (Ln = 06)
answer: :LnAd02CopyData\r\n
Ln: number of bytes (hex digits pairs) following
Ad: node address (3...120, always a reply if message is sent to node address 128)
Copy: values just to be copied by the reply (first and third digit < 8)
recommended practice: Use PrTp for Copy
Pr: Process number (<80, manual page 24pp)
Tp: Type + parameter number. Type: 00 byte, 20 int, 40 long/float, 60 string
for strings either 00 (for nul terminated) or the max. number of chars
has to be appended to the type
Data: length depending on type.
read command for direct communication: :06800401210120
Send values on a scale from 0-32000 (0-100%).
"""
from frappy.core import StringIO, HasIO, Readable, Writable, Drivable, Parameter, Property, \
FloatRange, BoolType, EnumType, IntRange, IDLE, BUSY
from frappy.errors import CommunicationFailedError
class IO(StringIO):
end_of_line = '\r\n' # hex: 0D0A
addr = 128
identification = [(f':07{addr:02X}047163716300', f':10{addr:02X}02716300.*')] # serial number
default_settings = {'baudrate': 38400}
def intpar(process, parameter):
return '06', f'{process:02X}{parameter|0x20:02X}'
def longpar(process, parameter):
return '08', f'{process:02X}{parameter|0x40:04X}'
MEASURE = intpar(1, 0)
SETPOINT = intpar(1, 1)
RAMP = intpar(1, 2)
CONTROL = longpar(114, 1)
class Sensor(HasIO, Readable):
ioClass = IO
value = Parameter('pressure', FloatRange())
scale = Property('scale factor', FloatRange(), default=1)
addr = Property('node adress', IntRange(0, 255), default=128)
def get_par(self, length, param, scale):
reply = self.communicate(f':{length}{self.addr:02X}04{param}{param}')
if reply[:11] != f':{length}{self.addr:02X}02{param}':
return CommunicationFailedError(f'bad reply: {reply}')
val = int(reply[11:14], 16) / 32000 * scale
return val
def read_value(self):
return self.get_par(*MEASURE, self.scale)
class Controller(Sensor, Writable):
def set_par(self, length, param, scale, value):
reply = self.communicate(f':{length}{self.addr:02X}01{param}{round(value/scale):04X}')
if reply[:8] != f':04{self.addr:02X}0000':
raise CommunicationFailedError(f'bad reply: {reply}')
return self.get_par(length, param, scale)
def read_target(self):
return self.get_par(*SETPOINT, self.scale)
def write_target(self, value):
val = value / self.scale * 32000
return self.set_par(*SETPOINT, self.scale, val)
class HasRamp(Drivable):
setpoint = Parameter('running setpoint', FloatRange())
ramp_enable = Parameter('enable ramp mode', BoolType())
ramp = Parameter('slope of ramp', FloatRange(1e-6, unit='mbar/min'))
tolerance = Property('tolerance for target vs. running setpoint', FloatRange(), default=1)
def read_target(self):
# overwrite Controller.read_target() as setpoint is running
return self.read_target
def write_target(self, target):
super().write_target(target)
self.status = BUSY, 'ramping'
def read_setpoint(self):
return super().read_target()
def read_ramp(self):
if abs(self.read_setpoint() - self.target) < self.tolerance:
self.status = IDLE, ''
def write_ramp(self, ramp):
if self.ramp_enable:
time = min(self.scale / ramp, 3000)
return self.set_par(*RAMP, (60 / 0.1), time)
def write_ramp_enable(self, flag):
if flag:
self.write_ramp(self.ramp)
else:
self.set_par(*RAMP, (60 / 0.1), 0)
class HasControlMode():
control_active = Parameter('control mode active', BoolType())
control = Property('control mode', EnumType(manual=4, loop=11))
output = Parameter('valve output', FloatRange(), readonly=False)
def write_control(self, value):
if self.control_active:
val = self.control.get(value, 4)
return self.set_par(*CONTROL, 1, val)
def write_output(self, value):
scale = (2**24 - 1) / 100
self.set_par(*CONTROL, scale, value)
class ControllerRamp(HasRamp, Controller):
pass
class ControllerControlMode(HasControlMode, Controller):
pass
class ControllerRampControlMode(HasRamp, HasControlMode, Controller):
pass

View File

@@ -0,0 +1,53 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from frappy.core import Writable, BoolType, FloatRange, HasIO, Property, StringType, Parameter
from frappy_psi.logo import Sensor
class Heater(Sensor):
addr = 'VW10' # heater readback address
scale = 0.1
switch_addr = Property('address for switch', StringType(), default='V0.1')
enable_addr = Property('address for enable', StringType(), default='V0.0')
maxheater_addr = Property('address for target', StringType(), default='VW10')
value = Parameter(unit='%')
switch = Parameter('heater is enabled', BoolType())
enable = Parameter('heater enabled', BoolType(), readonly=False)
maxheater = Parameter('max. heater power', FloatRange(unit='%'), readonly=False)
def read_switch(self):
return self.get_vm_value(self.switch_addr)
def read_enable(self):
return self.get_vm_value(self.enable_addr)
def write_enable(self, value):
self.set_vm_value(self.enable_addr, value)
return self.read_enable()
def read_maxheater(self):
return self.get_vm_value(self.maxheater_addr, self.scale)
def write_maxheater(self, value):
self.io.set_vm_value(self.maxheater_addr, value, self.scale)
return self.read_maxheater()

View File

@@ -1,260 +0,0 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Andrea Plank <andrea.plank@psi.ch>
# Anik Stark <anik.stark@psi.ch>
#
# *****************************************************************************
from frappy.core import Drivable, Parameter, Attached, FloatRange, \
IDLE, BUSY, WARN, ERROR
from frappy.datatypes import EnumType, BoolType, StructOf, StringType
from frappy.states import Retry, Finish, status_code, HasStates
from frappy.lib.enum import Enum
from frappy.errors import ImpossibleError
from frappy.lib.mathparser import MathParser
T = Enum( # target states
off = 0,
sorbpump = 2,
condense = 5,
remove = 7,
remove_and_sorbpump = 9,
remove_and_condense = 10,
manual = 11,
test = 12,
)
V = Enum(T, # value status inherits from target status
sorbpumping=1,
condensing=4,
circulating=6,
removing=8,
)
class Dilution(HasStates, Drivable):
condenseline_pressure = Attached() # G1
condense_valve = Attached() # V1
dump_valve = Attached() # V9
forepump = Attached() # rotary_pump_He3 (24)
condenseline_valve = Attached() # V1
circuitshort_valve = Attached() # V3
still_valve = Attached() # V6
pumpout_valve = Attached() # V14
still_pressure = Attached() # P1
dump_pressure = Attached() # G3
oneK_temp = Attached()
still_temp = Attached()
mix_temp = Attached()
value = Parameter('current state', EnumType(V), default=0)
target = Parameter('target state', EnumType(T), default=0)
sorbpumped = Parameter('sorb pump done', BoolType(), default=False, readonly=False)
sorb_cond = Parameter('sorb condition', StringType(), default='oneK>4', readonly=False)
sorb_pump_time = Parameter('sorb pump time', FloatRange(), default=2400, readonly=False)
dump_target = Parameter('low dump pressure limit indicating end of condensation phase',
FloatRange(unit='mbar * min'), readonly=False, default=50)
pulse_factor = Parameter('factor for calculating pump out pulse length',
FloatRange(unit='mbar'), readonly=False, default=20)
end_condense_pressure = Parameter('low condense pressure indicating end of condensation phase',
FloatRange(unit='mbar'), readonly=False, default=50)
end_remove_pressure = Parameter('pressure reached before end of remove (before fore pump)',
FloatRange(unit='mbar'), readonly=False, default=0.02)
condensing_p_low = Parameter('when to start pumping dump', datatype=FloatRange(unit='mbar'), default=500, readonly=False)
st = StringType()
valve_set = StructOf(close=st, open=st, check_open=st, check_closed=st)
condense_valves = Parameter('valve to act when condensing', valve_set)
valves_after_remove = Parameter('valve to act after remove', valve_set)
check_after_remove = Parameter('check for manual valves after remove', valve_set)
init = True
_start_time = 0
_warn_manual_work = None
def write_target(self, target):
"""
if (target == Targetstates.SORBPUMP):
if self.value == target:
return self.target
self.start_machine(self.sorbpump)
self.value = Targetstates.SORBPUMP
return self.value
"""
self.log.info('start %s', target.name)
if self.value == target:
return target
try:
self.start_machine(getattr(self, target.name, None))
except Exception as e:
self.log.exception('error %s', e)
self.log.info('started %s', target.name)
return target
@status_code(BUSY, 'sorbpump state')
def sorbpump(self, state):
""" heat up to Tsorb and wait """
if state.init:
#self.ls372.write_target(40) # set Tsorb to 40K
self.start_time = state.now
self.handle_valves(**self.condense_valves)
return Retry
parser = MathParser(oneK=self.oneK_temp.value, still=self.still_temp.value, mix=self.mix_temp.value)
if parser.calculate(self.sorb_cond):
self.start_time = state.now
if state.now - self.start_time < self.sorb_pump_time:
return Retry
return self.condense
@status_code(BUSY)
def condense(self, state):
""" condense process """
if state.init:
# self.value = V.condensing
self.handle_valves(**self.condense_valves)
self.still_valve.write_target(100)
self._start_time = state.now
return Retry
pdump = self.dump_pressure.value
pcond = self.condenseline_pressure.read_value()
if pcond < self.condensing_p_low and state.now > self._start_time + 5:
pulse_time = 60 * self.pulse_factor / pdump
if pulse_time > 59:
pulse_time = 3600
self.pumpout_valve.delay = pulse_time
self.pumpout_valve.write_target(1)
if pdump > self.dump_target:
return Retry
return self.wait_for_condense_line_pressure
@status_code(BUSY)
def wait_for_condense_line_pressure(self, state):
if self.condenseline_pressure.read_value() > self.end_condense_pressure:
return Retry
self.condense_valve.write_target(0)
return self.circulate
@status_code(BUSY)
def circulate(self, state):
"""Zirkuliert die Mischung."""
if state.init:
self.handle_valves(**self.condense_valves)
if self.wait_valves():
return Retry
self.check_valve_result()
self.value = V.circulating
return Finish
@status_code(BUSY, 'remove (wait for turbo shut down)')
def remove(self, state):
"""Entfernt die Mischung."""
if state.init:
self.handle_valves(**self.remove_valves)
return Retry
self.circuitshort_valve.write_target(1)
return self.remove_endsequence
@status_code(BUSY)
def remove_endsequence(self, state):
if self.still_pressure.read_value() > self.end_remove_pressure:
return Retry
self.circuitshort_valve.write_target(0)
self.dump_valve.write_target(0)
return self.close_valves_after_remove
@status_code(BUSY)
def close_valves_after_remove(self, state):
if state.init:
self.handle_valves(**self.valves_after_remove)
self._warn_manual_work = True
return self.final_status(WARN, 'please check manual valves')
def read_status(self):
status = super().read_status()
if status[0] < ERROR and self._warn_manual_work:
try:
self.handle_valves(**self.check_after_remove)
self._warn_manual_work = False
except ImpossibleError:
return WARN, f'please close manual valves {",".join(self._valves_failed[False])}'
return status
def handle_valves(self, check_closed=(), check_open=(), close=(), open=()):
"""check ot set given valves
raises ImpossibleError, when checks fails """
self._valves_to_wait_for = {}
self._valves_failed = {True: [], False: []}
for flag, valves in enumerate([check_closed, check_open]):
for vname in valves.split():
if self.secNode.modules[vname].read_value() != flag:
self._valves_failed[flag].append(vname)
for flag, valves in enumerate([close, open]):
for vname in valves.split():
valve = self.secNode.modules[vname]
valve.write_target(flag)
if valve.isBusy():
self._valves_to_wait_for[vname] = (valve, flag)
elif valve.read_value() != flag:
self._valves_failed[flag].append(vname)
def wait_valves(self):
busy = False
for vname, (valve, flag) in dict(self._valves_to_wait_for.items()):
statuscode = valve.read_status()[0]
if statuscode == BUSY:
busy = True
continue
if valve.read_value() == flag and statuscode == IDLE:
self._valves_to_wait_for.pop(vname)
else:
self._valves_failed[flag].append(vname)
return busy
def check_valve_result(self):
result = []
for flag, valves in self._valves_failed.items():
if valves:
result.append(f"{','.join(valves)} not {'open' if flag else 'closed'}")
if result:
raise ImpossibleError(f"failed: {', '.join(result)}")
class DIL4(Dilution):
condense_valves = {
'close': 'V2 V3 V4 V7 V8 V10 V11A V12A V13A',
'check_closed': '',
'check_open': '',
'open': 'V1 V5 V9',
}
remove_valves = {
'close': '',
'check_closed': '',
'check_open': '',
'open': '',
}
valves_after_remove = {
'close': '',
'check_closed': '',
'open': '',
'check_open': '',
}
check_after_remove = {
'close': '',
'check_closed': '',
'open': '',
'check_open': '',
}

290
frappy_psi/ips_classic.py Normal file
View File

@@ -0,0 +1,290 @@
#!/usr/bin/env python
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""oxford instruments mercury IPS power supply"""
import time
from frappy.core import Parameter, EnumType, FloatRange, BoolType, IntRange, Property, Module
from frappy.lib.enum import Enum
from frappy.errors import BadValueError, HardwareError
from frappy_psi.magfield import Magfield, SimpleMagfield, Status
from frappy_psi.mercury import MercuryChannel, off_on, Mapped
from frappy.states import Retry
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=3)
hold_rtoz_rtos_clmp = Mapped(HOLD=Action.hold, RTOS=Action.run_to_set,
RTOZ=Action.run_to_zero, CLMP=Action.clamped)
CURRENT_CHECK_SIZE = 2
class Field(Magfield):
action = Parameter('action', EnumType(Action), readonly=False)
setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0)
working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0)
persistent_field = Parameter(
'persistent field at last switch off', FloatRange(unit='$'), readonly=False)
wait_switch_on = Parameter(
'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=True, default=60)
wait_switch_off = Parameter(
'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=True, default=60)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
_field_mismatch = None
__persistent_field = None # internal value of persistent field
__switch_fixed_until = 0
def initModule(self):
super().initModule()
try:
self.write_action(Action.hold)
except Exception as e:
self.log.error('can not set to hold %r', e)
def doPoll(self):
super().doPoll()
self.read_current()
def initialReads(self):
# on restart, assume switch is changed long time ago, if not, the mercury
# will complain and this will be handled in start_ramp_to_field
self.switch_on_time = 0
self.switch_off_time = 0
self.switch_heater = self.query('DEV::PSU:SIG:SWHT', off_on)
super().initialReads()
def read_value(self):
return self.query('DEV::PSU:SIG:FLD')
def read_ramp(self):
return self.query('DEV::PSU:SIG:RFST')
def write_ramp(self, value):
return self.change('DEV::PSU:SIG:RFST', value)
def read_action(self):
return self.query('DEV::PSU:ACTN', hold_rtoz_rtos_clmp)
def write_action(self, value):
return self.change('DEV::PSU:ACTN', value, hold_rtoz_rtos_clmp)
def read_atob(self):
return self.query('DEV::PSU:ATOB')
def read_voltage(self):
return self.query('DEV::PSU:SIG:VOLT')
def read_working_ramp(self):
return self.query('DEV::PSU:SIG:RFLD')
def read_setpoint(self):
return self.query('DEV::PSU:SIG:FSET')
def set_and_go(self, value):
self.setpoint = self.change('DEV::PSU:SIG:FSET', value)
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_set) == Action.run_to_set
def start_ramp_to_target(self, sm):
# if self.action != Action.hold:
# assert self.write_action(Action.hold) == Action.hold
# return Retry
self.set_and_go(sm.target)
sm.try_cnt = 5
return self.ramp_to_target
def ramp_to_target(self, sm):
try:
return super().ramp_to_target(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(sm.target)
return Retry
def final_status(self, *args, **kwds):
self.write_action(Action.hold)
return super().final_status(*args, **kwds)
def on_restart(self, sm):
self.write_action(Action.hold)
return super().on_restart(sm)
def read_value(self):
current = self.query('DEV::PSU:SIG:FLD')
if self.switch_heater == self.switch_heater.on:
self.__persistent_field = current
self.forced_persistent_field = False
return current
pf = self.query('DEV::PSU:SIG:PFLD')
if self.__persistent_field is None:
self.__persistent_field = pf
self._field_mismatch = False
else:
self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
self.persistent_field = self.__persistent_field
return self.__persistent_field
def _check_adr(self, adr):
"""avoid complains about bad slot"""
if adr.startswith('DEV:PSU.M'):
return
super()._check_adr(adr)
def read_current(self):
current = self.query('DEV::PSU:SIG:CURR')
if self.atob:
return current / self.atob
return 0
def write_persistent_field(self, value):
if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10:
self._field_mismatch = False
self.__persistent_field = value
return value
raise BadValueError('changing persistent field needs forced_persistent_field=True')
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise BadValueError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
def read_switch_heater(self):
value = self.query('DEV::PSU:SIG:SWHT', off_on)
now = time.time()
if value != self.switch_heater:
if now < self.__switch_fixed_until:
self.log.debug('correct fixed switch time')
# probably switch heater was changed, but IPS reply is not yet updated
if self.switch_heater:
self.switch_on_time = time.time()
else:
self.switch_off_time = time.time()
return self.switch_heater
return value
def read_wait_switch_on(self):
return self.query('DEV::PSU:SWONT') * 0.001
def read_wait_switch_off(self):
return self.query('DEV::PSU:SWOFT') * 0.001
def write_switch_heater(self, value):
if value == self.read_switch_heater():
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.__switch_fixed_until = time.time() + 10
self.log.debug('switch time fixed for 10 sec')
result = self.change('DEV::PSU:SIG:SWHT', value, off_on, n_retry=0) # no readback check
return result
def start_ramp_to_field(self, sm):
if abs(self.current - self.__persistent_field) <= self.tolerance:
self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
return self.ramp_to_field
try:
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError) as e:
if self.switch_heater:
self.log.warn('switch is already on!')
return self.ramp_to_field
self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e)
sm.after_wait = self.ramp_to_field
return self.wait_for_switch
return self.ramp_to_field
def start_ramp_to_target(self, sm):
sm.try_cnt = 5
try:
self.set_and_go(sm.target)
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch on'
sm.after_wait = self.ramp_to_target
return self.wait_for_switch
return self.ramp_to_target
def ramp_to_field(self, sm):
try:
return super().ramp_to_field(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(self.__persistent_field)
return Retry
def wait_for_switch(self, sm):
if not sm.delta(10):
return Retry
try:
self.log.warn('try again')
# try again
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError):
return Retry
return sm.after_wait
def wait_for_switch_on(self, sm):
self.read_switch_heater() # trigger switch_on/off_time
if self.switch_heater == self.switch_heater.off:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned off manually?')
return self.start_switch_on
return super().wait_for_switch_on(sm)
def wait_for_switch_off(self, sm):
self.read_switch_heater()
if self.switch_heater == self.switch_heater.on:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned on manually?')
return self.start_switch_off
return super().wait_for_switch_off(sm)
def start_ramp_to_zero(self, sm):
pf = self.query('DEV::PSU:SIG:PFLD')
if abs(pf - self.value) > self.tolerance * 10:
self.log.warning('persistent field %g does not match %g after switch off', pf, self.value)
try:
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch off'
sm.after_wait = self.ramp_to_zero
return self.wait_for_switch
return self.ramp_to_zero
def ramp_to_zero(self, sm):
try:
return super().ramp_to_zero(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
return Retry

View File

@@ -21,8 +21,9 @@ import sys
from time import monotonic
from ast import literal_eval
import snap7
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, StringType, \
IDLE, BUSY, WARN, ERROR, Writable, Drivable, BoolType, IntRange, Communicator, StatusType
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, \
IDLE, BUSY, WARN, ERROR, Writable, Drivable, Communicator
from frappy.datatypes import StringType, BoolType, IntRange, NoneOr, Int32
from frappy.errors import CommunicationFailedError, ConfigError
from threading import RLock
@@ -80,11 +81,16 @@ class IO(Communicator):
class LogoMixin(HasIO):
ioclass = IO
def get_vm_value(self, vm_address):
return literal_eval(self.io.communicate(vm_address))
def get_vm_value(self, addr, scale=None):
if scale is None:
return int(self.io.communicate(addr))
return float(self.io.communicate(addr)) * scale
def set_vm_value(self, vm_address, value):
return literal_eval(self.io.communicate(f'{vm_address} {round(value)}'))
def set_vm_value(self, addr, value, scale=None):
if scale is None:
return int(self.io.communicate(f'{addr} {value}'))
reply = self.io.communicate(f'{addr} {round(value / scale)}')
return int(reply) * scale
class DigitalActuator(LogoMixin, Writable):
@@ -219,195 +225,47 @@ class DelayedActuator(DigitalActuator, Drivable):
self._pulse_end = now + delay
class Value(LogoMixin, Readable):
class Sensor(LogoMixin, Readable):
addr = Property('VM address', datatype=StringType())
scale = Property('scale to multiply with raw integer value',
NoneOr(FloatRange()), default=None)
def read_value(self):
return self.get_vm_value(self.addr)
return self.get_vm_value(self.addr, self.scale)
def read_status(self):
return IDLE, ''
class DigitalValue(Value):
value = Parameter('airpressure state', datatype=BoolType())
class AnalogOutput(Sensor, Writable):
output_addr = Property('VM address output', datatype=StringType(), default='')
def checkProperties(self):
super().checkProperties()
if not self.output_addr:
self.output_addr = self.addr
def read_value(self):
return self.get_vm_value(self.addr, self.scale)
def write_target(self, target):
return self.set_vm_value(self.output_addr, target, self.scale)
# TODO: the following classes are too specific, they have to be moved
class Pressure(LogoMixin, Drivable):
vm_address = Property('VM address', datatype=StringType())
class Pressure(Sensor):
value = Parameter('pressure', datatype=FloatRange(unit='mbar'))
# pollinterval = 0.5
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class Airpressure(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('airpressure state', datatype=BoolType())
# pollinterval = 0.5
def read_value(self):
if (self.get_vm_value(self.vm_address) > 500):
return 1
else:
return 0
def read_status(self):
return IDLE, ''
class Valve(LogoMixin, Drivable):
vm_address_input = Property('VM address input', datatype=StringType())
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Value state', datatype=BoolType())
_remaining_tries = None
def read_value(self):
return self.get_vm_value(self.vm_address_input)
def write_target(self, target):
self.set_vm_value(self.vm_address_output, target)
self._remaining_tries = 5
self.status = BUSY, 'switching'
self.setFastPoll(True, 0.5)
def read_status(self):
self.log.debug('read_status')
value = self.read_value()
self.log.debug('value %d target %d', value, self.target)
if value != self.target:
if self._remaining_tries is None:
self.target = self.read_value()
return IDLE, ''
self._remaining_tries -= 1
if self._remaining_tries < 0:
self.setFastPoll(False)
return ERROR, 'too many tries to switch'
self.set_vm_value(self.vm_address_output, self.target)
return BUSY, 'switching (try again)'
self.setFastPoll(False)
return IDLE, ''
class FluidMachines(LogoMixin, Drivable):
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Valve state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_output)
def write_target(self, target):
return self.set_vm_value(self.vm_address_output, target)
def read_status(self):
return IDLE, ''
class TempSensor(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
class Resistor(Sensor):
value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class HeaterParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('Heater target', datatype=IntRange())
value = Parameter('Heater Param', datatype=IntRange())
class Comparator(LogoMixin, Readable):
addr = Property('VM address', datatype=StringType())
scale = Property('scale to multiply with raw integer value',
NoneOr(FloatRange()), default=None)
value = Parameter('airpressure state', datatype=BoolType())
threshold = Property('threshold for True', FloatRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
class controlHeater(LogoMixin, Writable):
vm_address = Property('VM address on switch', datatype=StringType())
target = Parameter('Heater state', datatype=BoolType())
value = Parameter('Heater state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_on)
def write_target(self, target):
if (target):
return self.set_vm_value(self.vm_address, True)
else:
return self.set_vm_value(self.vm_address, False)
def read_status(self):
return IDLE, ''
class safetyfeatureState(LogoMixin, Readable):
vm_address = Property('VM address state', datatype=StringType())
value = Parameter('safety Feature state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class safetyfeatureParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
class comparatorgekoppeltParam(LogoMixin, Writable):
vm_address_1 = Property('VM address output', datatype=StringType())
vm_address_2 = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address_1)
def write_target(self, target):
self.set_vm_value(self.vm_address_1, target)
return self.set_vm_value(self.vm_address_2, target)
def read_status(self):
return IDLE, ''
return self.get_vm_value(self.addr, self.scale) > self.threshold

View File

@@ -61,7 +61,6 @@ class SimpleMagfield(HasStates, Drivable):
'trained field (positive)',
TupleOf(FloatRange(-99, 0, unit='$'), FloatRange(0, unit='$')),
readonly=False, default=(0, 0))
trainmode = Parameter('train mode flag', EnumType(off=0, on=1, undef=2), default=2)
wait_stable_field = Parameter(
'wait time to ensure field is stable', FloatRange(0, unit='s'), readonly=False, default=31)
ramp_tmo = Parameter(
@@ -150,24 +149,10 @@ class SimpleMagfield(HasStates, Drivable):
"""
raise NotImplementedError
def handle_train_mode(self):
self.log.info('handle %r %r', self.trained, self.value)
if self.trained[0] < self.value < self.trained[1]:
trainmode = 'off'
else:
trainmode = 'on'
if self.value > 0:
self.trained = (self.trained[0], max(self.trained[1], self.value))
else:
self.trained = (min(self.trained[0], self.value), self.trained[1])
if self.trainmode != trainmode:
self.write_trainmode(trainmode)
@status_code(BUSY, 'ramping field')
def ramp_to_target(self, sm):
if sm.init:
self.init_progress(sm, self.value)
self.handle_train_mode()
# Remarks: assume there is a ramp limiting feature
if abs(self.value - sm.target) > self.tolerance:
if self.get_progress(sm, self.value) > self.ramp_tmo:
@@ -181,15 +166,11 @@ class SimpleMagfield(HasStates, Drivable):
def stabilize_field(self, sm):
if sm.now - sm.stabilize_start < self.wait_stable_field:
return Retry
self.handle_train_mode()
return self.final_status()
def read_workingramp(self):
return self.ramp
def write_trainmode(self, value):
"""overwrite when needed"""
class Magfield(SimpleMagfield):
status = Parameter(datatype=StatusType(Status))
@@ -354,7 +335,6 @@ class Magfield(SimpleMagfield):
@status_code(Status.RAMPING)
def ramp_to_target(self, sm):
self.handle_train_mode()
dif = abs(self.value - sm.target)
if sm.init:
sm.stabilize_start = 0 # in case current is already at target
@@ -373,7 +353,6 @@ class Magfield(SimpleMagfield):
@status_code(Status.STABILIZING)
def stabilize_field(self, sm):
self.handle_train_mode()
if sm.now < sm.stabilize_start + self.wait_stable_field:
return Retry
return self.check_switch_off

View File

@@ -1,715 +0,0 @@
#!/usr/bin/env python
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# Anik Stark <anik.stark@psi.ch>
# *****************************************************************************
"""oxford instruments old (classic) devices (ILM, IGH, IPS)"""
import time
import re
from frappy.core import Parameter, Property, EnumType, FloatRange, BoolType, \
StringIO, HasIO, Readable, Writable, Drivable, IDLE, BUSY, WARN, ERROR, Attached
from frappy.lib import formatStatusBits
from frappy.lib.enum import Enum
from frappy.errors import BadValueError, HardwareError, CommunicationFailedError
from frappy_psi.magfield import Magfield, Status
from frappy.states import Retry
def bit(x, pos):
"""Check if the bit at a certain position is set"""
return bool(x & (1 << pos))
class OxBase(HasIO):
def query(self, cmd, scale=None):
reply = self.communicate(cmd)
if reply[0] != cmd[0]:
raise CommunicationFailedError(f'bad reply: {reply} to command {cmd}')
if scale is None:
return int(reply[1:])
return float(reply[1:]) * scale
def change(self, cmd, value, scale=None):
try:
self.communicate('C3')
reply = self.communicate(f'{cmd}{round(value / scale)}')
if reply[0] != cmd[0]:
raise CommunicationFailedError(f'bad reply: {reply}')
finally:
self.communicate('C0')
def command(self, *cmds):
try:
self.communicate('C3')
for cmd in cmds:
self.communicate(cmd)
finally:
self.communicate('C0')
class IPS_IO(StringIO):
"""oxford instruments power supply IPS120-10"""
end_of_line = '\r'
identification = [('V', r'IPS120-10.*')] # instrument type and software version
default_settings = {'baudrate': 9600}
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4)
status_map = {'0': (IDLE, ''),
'1': (ERROR, 'quenched'),
'2': (ERROR, 'overheated'),
'4': (WARN, 'warming up'),
'8': (ERROR, '')
}
limit_map = {'0': (IDLE, ''),
'1': (WARN, 'on positive voltage limit'),
'2': (WARN, 'on negative voltage limit'),
'4': (ERROR, 'outside negative current limit'),
'8': (ERROR, 'outside positive current limit')
}
class Field(OxBase, Magfield):
""" read commands:
R1 measured power supply voltage (V)
R7 demand field (output field) (T)
R8 setpoint (target field) (T)
R9 sweep field rate (T/min)
R18 persistent field (T)
X Status
control commands:
A set activity
T set field sweep rate
H set switch heater
J set target field """
ioClass = IPS_IO
action = Parameter('action', EnumType(Action), readonly=False)
setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
persistent_field = Parameter(
'persistent field at last switch off', FloatRange(unit='T'), readonly=False)
wait_switch_on = Parameter(default=15)
wait_switch_off = Parameter(default=15)
wait_stable_field = Parameter(default=10)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
switch_heater = Parameter('turn switch heater on/off', EnumType(off=0, on=1, forced=2), default=0)
_field_mismatch = None
__persistent_field = None # internal value of persistent field
_status = '00'
def initModule(self):
super().initModule()
try:
self.write_action(Action.hold)
except Exception as e:
self.log.error('can not set to hold %r', e)
def doPoll(self):
super().doPoll()
self.read_current()
def initialReads(self):
# on restart, assume switch is changed long time ago, if not, the mercury
# will complain and this will be handled in start_ramp_to_field
self.switch_on_time = 0
self.switch_off_time = 0
super().initialReads()
def read_value(self):
if self.switch_heater:
self.__persistent_field = self.query('R7')
self.forced_persistent_field = False
self._field_mismatch = False
return self.__persistent_field
pf = self.query('R18')
if self.__persistent_field is None:
self.__persistent_field = pf
self._field_mismatch = False
else:
self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
self.persistent_field = self.__persistent_field
return self.__persistent_field
def read_ramp(self):
return self.query('R9')
def write_ramp(self, value):
self.change('T', value)
return self.read_ramp()
def write_action(self, value):
self.change('A', int(value))
self.read_status()
def read_voltage(self):
return self.query('R1')
def read_setpoint(self):
return self.query('R8')
def read_current(self):
return self.query('R7')
def write_persistent_field(self, value):
if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10:
self._field_mismatch = False
self.__persistent_field = value
return value
raise BadValueError('changing persistent field needs forced_persistent_field=True')
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise BadValueError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
def read_switch_heater(self):
self.read_status()
return self.switch_heater
def read_status(self):
status = self.communicate('X')
match = re.match(r'X(\d\d)A(\d)C\dH(\d)M\d\dP\d\d', status)
if match is None:
raise CommunicationFailedError(f'unexpected status: {status}')
self._status = match.group(1)
self.action = int(match.group(2))
self.switch_heater = match.group(3) == '1'
if self._status[0] != '0':
self._state_machine.stop()
return status_map.get(self._status[0], (ERROR, f'bad status: {self._status}'))
if self._status[1] != '0':
return limit_map.get(self._status[1], (ERROR, f'bad status: {self._status}')) # need to stop sm too?
return super().read_status()
def write_switch_heater(self, value):
if value == self.read_switch_heater():
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.log.debug('switch time fixed for 10 sec')
self.change('H', int(value))
#return result
return int(value)
def set_and_go(self, value):
self.change('J', value)
self.setpoint = self.read_current()
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_set) == Action.run_to_set
def ramp_to_target(self, sm):
try:
return super().ramp_to_target(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(sm.target)
return Retry
def final_status(self, *args, **kwds):
self.write_action(Action.hold)
return super().final_status(*args, **kwds)
def on_restart(self, sm):
self.write_action(Action.hold)
return super().on_restart(sm)
def start_ramp_to_field(self, sm):
if abs(self.current - self.__persistent_field) <= self.tolerance:
self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
return self.ramp_to_field
try:
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError) as e:
if self.switch_heater:
self.log.warn('switch is already on!')
return self.ramp_to_field
self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e)
sm.after_wait = self.ramp_to_field
return self.wait_for_switch
return self.ramp_to_field
def start_ramp_to_target(self, sm):
sm.try_cnt = 5
try:
self.set_and_go(sm.target)
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch on'
sm.after_wait = self.ramp_to_target
return self.wait_for_switch
return self.ramp_to_target
def ramp_to_field(self, sm):
try:
return super().ramp_to_field(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(self.__persistent_field)
return Retry
def wait_for_switch(self, sm):
if not sm.delta(10):
return Retry
try:
self.log.warn('try again')
# try again
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError):
return Retry
return sm.after_wait
def wait_for_switch_on(self, sm):
self.read_switch_heater() # trigger switch_on/off_time
if self.switch_heater == self.switch_heater.off:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned off manually?')
return self.start_switch_on
return super().wait_for_switch_on(sm)
def wait_for_switch_off(self, sm):
self.read_switch_heater()
if self.switch_heater == self.switch_heater.on:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned on manually?')
return self.start_switch_off
return super().wait_for_switch_off(sm)
def start_ramp_to_zero(self, sm):
pf = self.query('R18')
if abs(pf - self.value) > self.tolerance * 10:
self.log.warning('persistent field %g does not match %g after switch off', pf, self.value)
try:
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch off'
sm.after_wait = self.ramp_to_zero
return self.wait_for_switch
return self.ramp_to_zero
def ramp_to_zero(self, sm):
try:
return super().ramp_to_zero(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
return Retry
def write_trainmode(self, value):
self.change('M', '5' if value == 'off' else '1')
class ILM_IO(StringIO):
"""oxford instruments level meter ILM200"""
end_of_line = '\r'
identification = [('V', r'ILM200.*')] # instrument type and software version
default_settings = {'baudrate': 9600}
timeout = 5
class Level(OxBase, Readable):
""" X code: XcccSuuvvwwRzz
c: position corresponds to channel 1, 2, 3
possible values in each position are 0, 1, 2, 3, 9
vv, uu, ww: channel status for channel 1, 2, 3 respectively, 2 bits each
zz: relay status """
ioClass = ILM_IO
value = Parameter('level', datatype=FloatRange(unit='%'))
fast = Parameter('fast reading', datatype=BoolType())
CHANNEL = None
X_PATTERN = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$')
MEDIUM = None
_statusbits = None
def read_value(self):
return self.query(f'R{self.CHANNEL}', 0.1)
def write_fast(self, fast):
self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}')
def get_status(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
statuslist = match.groups()
if statuslist[self.CHANNEL] == '9':
return ERROR, f'error on {self.MEDIUM} level channel (not connected?)'
if (statuslist[self.CHANNEL] == '1') != (self.MEDIUM == 'N2'):
# '1': channel is used for N2
return ERROR, f'{self.MEDIUM} level channel not configured properly'
self._statusbits = int(statuslist[self.CHANNEL + 3], 16)
return None
return ERROR, f'bad status message {reply}'
class HeLevel(Level):
value = Parameter('He level', FloatRange(unit='%'))
fast = Parameter('switching fast/slow', datatype=BoolType(), readonly=False)
CHANNEL = 1
MEDIUM = 'He'
def read_status(self):
status = self.get_status()
if status is not None:
return status
return IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow'])
class N2Level(Level):
ioClass = ILM_IO
value = Parameter('N2 level', FloatRange(unit='%'))
CHANNEL = 2
MEDIUM = 'N2'
def read_status(self):
status = self.get_status()
if status is not None:
return status
return IDLE, ''
VALVE_MAP = {'V9': 1,
'V8': 2,
'V7': 3,
'V11A': 4,
'V13A': 5,
'V13B': 6,
'V11B': 7,
'V12B': 8,
'rotary_pump_He4': 9,
'V1': 10,
'V5': 11,
'V4': 12,
'V3': 13,
'V14' : 14,
'V10': 15,
'V2': 16,
'V2A_He4': 17,
'V1A_He4': 18,
'V5A_He4': 19,
'V4A_He4': 20,
'V3A_He4': 21,
'roots_pump': 22,
'unlabeled_pump': 23,
'rotary_pump_He3': 24,
}
class IGH_IO(StringIO):
""" oxford instruments dilution gas handling Kelvinox IGH
X code: XxAaCcPpppSsOoEe
x motorized valves are still initializing
a mix heater activity
c control status (0, 1, 2, 3)
pppp 4 hex numbers (two digits each), state of solenoid valves and pumps
s hex digit, state of the 3 motorized valves
o still and sorb heater information
e mix heater power range """
end_of_line = '\r'
identification = [('V', r'IGH.*')]
default_settings = {'baudrate': 9600}
X_PATTERN = re.compile(r'X(\d)A(\d)C\dP([0-9A-F]{8})S([0-9A-F])O(\d)E(\d)$')
_ini_valves = 0 # ini status of motorized valves
_mix_status = 0
_valves = 0 # status of solenoid valves and pumps
_motor_status = 0
_heater_status = 0
_heater_range = 0
def doPoll(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
ini_valves, mix_status, valves, motor_status, heater_status, heater_range = match.groups()
self._ini_valves = int(ini_valves, 16)
self._mix_status = int(mix_status)
self._valves = int(valves, 16)
self._motor_status = int(motor_status, 16)
self._heater_status = int(heater_status)
self._heater_range = int(heater_range)
class Valve(OxBase, Writable):
ioClass = IGH_IO
value = Parameter('state of valve (open or close)', datatype=EnumType(open=1, close=0))
target = Parameter('open or close valve', datatype=EnumType(open=1, close=0))
addr = Property('valve name', datatype=EnumType(VALVE_MAP))
def read_value(self):
# hex -> int -> check if bit in bin(integer) is set at the addr position
return bit(self.io._valves, self.addr.value - 1)
def write_target(self, target):
# open: 2N, close: 2N + 1
self.change('P', (2 * self.addr.value + 1 - int(target)), 1)
class PulsedValve(Valve):
delay = Parameter('delay (time valve is open)', FloatRange(unit='s'), readonly=False)
_start = 0
def write_target(self, target):
if target:
self._start = time.time()
self.setFastPoll(True, 0.01)
else:
self.setFastPoll(False)
self.change('P', (2 * self.addr.value + 1 - int(target)), 1)
def doPoll(self):
super().doPoll()
if self._start:
if time.time() > self._start + self.delay:
self.write_target(0)
self._start = 0
class MotorValve(OxBase, Writable):
ioClass = IGH_IO
target = Parameter('target of motor valve', datatype=FloatRange(0, 100, unit='%'))
value = Parameter('position of fast valve', datatype=FloatRange(0, 100, unit='%'))
def write_target(self, target):
self.change('H', target, 0.1) # valve V12A
self.value = target
def read_value(self):
return self.target
def read_status(self):
if bit(self.io._ini_valves, 1):
self.value = 0
return BUSY, 'valve V12A is initializing'
return IDLE, ''
class SlowMotorValve(OxBase, Drivable):
ioClass = IGH_IO
target = Parameter('target of slow motor valve', datatype=FloatRange(0, 100, unit='%', fmtstr='%.1f'))
value = Parameter('position of slow valve', datatype=FloatRange(0, 100, unit='%', fmtstr='%.1f'))
_prev_time = 0
def read_target(self):
return self.query('R7', 0.1)
def write_target(self, target):
self.change('G', target, 0.1) # valve V6
self.read_status()
def read_status(self):
if bit(self.io._ini_valves, 0):
self.value = 0
return BUSY, 'valve V6 is initializing'
now = time.time()
if self._prev_time == 0:
self.value = self.read_target()
delta_t = 0
else:
delta_t = now - self._prev_time
self._prev_time = now
if (self.io._motor_status >> 0) & 1:
if self.target > self.value:
self.value = min(self.target, self.value + delta_t / 300 * 100)
else:
self.value = max(self.target, self.value - delta_t / 300 * 100)
return BUSY, 'valve V6 is moving'
self.value = self.target
return IDLE, ''
def stop(self):
"""stop moving"""
self.write_target(self.value)
GAUGE_MAP = {'G1': 14,
'G2': 15,
'G3': 16,
'P1': 20,
'P2': 21,
}
class Pressure(OxBase, Readable):
addr = Property('pressure gauge address', datatype=EnumType(GAUGE_MAP))
def read_value(self):
nr = self.addr.value
if self.addr.name.startswith('G'):
return self.query(f'R{nr}', 0.1)
return self.query(f'R{nr}', 1)
class MixPower(OxBase, Writable):
ioClass = IGH_IO
target = Parameter('mix power', datatype=FloatRange(0, 0.02, unit='W'))
value = Parameter('mix power', datatype=FloatRange(0, 0.02, unit='W'))
def read_value(self):
scale = 10**-(7 - self.io._heater_range)
return self.query('R4', scale)
def write_target(self, target):
if target:
self.command('A1') # on, fixed heater power
target = min(0.01999, target)
target_nW = str(int(target * 1e9))
range_mix = max(1, len(target_nW) - 3)
if target_nW >= '2000':
range_mix += 1
scale = 10**-(10 - range_mix)
self.command(f'E{range_mix}')
self.change('M', target, scale)
else:
self.command('A0') # turn off
def read_status(self):
if self.io._mix_status:
return IDLE, 'on'
return IDLE, 'off'
class SorbPower(OxBase, Writable):
""" heater status:
bit 0 still on
bit 1 sorb in temperature control (this ctr mode is not used)
bit 2 sorb in power control """
ioClass = IGH_IO
target = Parameter('sorb power', datatype=FloatRange(0, 2, unit='W')) # Werte 0.001, 2
writecmd = 'B' # in units of 1mW (range 0000 to 1999)
scale = 1e-3
def read_value(self):
if self.io._heater_status & 6:
return self.query('R6', self.scale)
return 0
def write_target(self, target):
self.change('O', self.io._heater_status & 1 | 4 * (target > 0), 1)
self.change('B', target, self.scale)
def read_status(self):
sorb_status = self.io._heater_status & 6
if sorb_status == 2:
return WARN, 'sorb in temperature control mode'
return IDLE, ('on' if sorb_status else 'off')
class StillPower(OxBase, Writable):
""" heater status:
bit 0 still on
bit 1 sorb in temperature control (this ctr mode is not used)
bit 2 sorb in power control """
ioClass = IGH_IO
target = Parameter('still power', datatype=FloatRange(0, 0.2, unit='W'))
readcmd = 'R5'
writecmd = 'S' # in units of 0.1mW (range 0000 to 1999)
scale = 1e-4
def read_value(self):
if self.io._heater_status & 1:
return self.query('R5', self.scale)
return 0
def write_target(self, target):
self.change('O', self.io._heater_status & 6 | (target > 0), 1)
self.change('S', target, self.scale)
def read_status(self):
sorb_status = self.io._heater_status & 1
return IDLE, ('on' if sorb_status else 'off')
class N2Sensor(Readable):
value = Parameter(datatype=FloatRange(unit='K'))
class Pump(Valve):
value = Parameter('state of valve (open or close)', datatype=EnumType(on=1, off=0))
target = Parameter('open or close valve', datatype=EnumType(on=1, off=0))
upper_LN2 = Attached()
lower_LN2 = Attached()
PATTERN = re.compile(r'\?\{(\d),(\d+),(\d+)\}')
def read_value(self):
reply = self.communicate('{r}')
match = self.PATTERN.match(reply)
if match:
value, upper_LN2, lower_LN2 = match.groups()
self.upper_LN2.value = 0.1 * int(upper_LN2)
self.lower_LN2.value = 0.1 * int(lower_LN2)
return int(value)
raise CommunicationFailedError('bad reply to {r}')
def read_target(self):
# hex -> int -> check if bit in bin(integer) is set at the addr position
return bit(self.io._valves, self.addr.value - 1)
def write_target(self, target):
# open: 2 * 24, close: 2 * 24 + 1
self.change('P', 2 * self.addr.value + 1 - target, 1)
self.value = target
def read_status(self):
if self.target and not self.value:
return WARN, 'pump switched off'
return IDLE, ''

View File

@@ -61,7 +61,7 @@ example cfg:
import time
import math
import numpy as np
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, WARN, Property
from frappy.lib import clamp, merge_status
from frappy.datatypes import LimitsType, EnumType, FloatRange
from frappy.errors import SECoPError
@@ -296,6 +296,13 @@ class PI(HasConvergence, PImixin):
super().write_target(target)
self.convergence_start()
def deactivate_control(self, source=None):
super().deactivate_control(source)
if self.isBusy():
self.convergence_state.stop_status = (
WARN, f'switched to manual mode by {source or self.name}')
self.convergence_state.start(self.convergence_interrupt)
# unchecked!
class PI2(PI):

View File

@@ -1,63 +0,0 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Anik Stark <anik.stark@psi.ch>
#
# *****************************************************************************
import time
from frappy.core import Parameter, EnumType, FloatRange, Writable
class Valve(Writable):
value = Parameter('state of valve (open or close)', datatype=EnumType(open=1, close=0))
target = Parameter('open or close valve', datatype=EnumType(open=1, close=0))
def write_target(self, target):
self.value = target
class PulsedValve(Valve):
delay = Parameter('delay (time valve is open)', FloatRange(unit='s'), readonly=False)
_start = 0
def write_target(self, target):
if target:
self._start = time.time()
self.setFastPoll(True, 0.01)
self.value = target
else:
self.setFastPoll(False)
def doPoll(self):
super().doPoll()
if self._start:
if time.time() > self._start + self.delay:
self.write_target(0)
self.value = 0
self._start = 0
class Sensor(Writable):
""" Pressure and motor valve. """
value = Parameter('sensor value', datatype=FloatRange(), readonly=False)
def write_target(self, target):
self.value = target