3 Commits

Author SHA1 Message Date
1bb869b43e capillary heater: heater is now a writable
The value should show the actual heater power, but
we do not know yet the address. Currently the value
is just equal to the maxheater parameter.
2025-11-27 17:55:52 +01:00
3ede9eb9f4 frappy_psi.logo: revised version after merging capillary heater
- check that dil5 still works!

Change-Id: Ibe98e64088f2f886888af170a1f38d699927eb58
2025-11-27 09:32:37 +01:00
f57400feb9 frappy_psi.picontrol: stop when switching to manual mode
Change-Id: I3ffb9a109fb3b04fbca06f5a72acbfbd19525aae
2025-11-27 07:49:29 +01:00
15 changed files with 498 additions and 1003 deletions

View File

@@ -1,22 +0,0 @@
Node('pdld_laser.psi.ch',
'PDLD laser',
interface = 'tcp://5000',
)
Mod('laser_io',
'frappy_psi.pdld.IO',
'laser IO',
uri='serial:///dev/ttyUSB0?baudrate=9600',
)
Mod('laser',
'frappy_psi.pdld.Laser',
'laser switch',
io='laser_io',
)
Mod('laser_power',
'frappy_psi.pdld.LaserPower',
'laser power',
io='laser_io',
)

View File

@@ -1,19 +0,0 @@
Node('bronkhorsttest.psi.ch',
'bronkhorst test',
'tcp://5000',
)
Mod('io',
'frappy_psi.bronkhorst.IO',
'bronkhorst communication',
uri='tcp://localhost:3005',
)
Mod('p',
'frappy_psi.bronkhorst.Controller',
'pressure controller',
io='io',
adr=128,
scale=18,
value=Param(unit='mbar')
)

View File

@@ -83,38 +83,37 @@ Mod('compressor',
)
Mod('p2',
'frappy_psi.logo.Value',
'frappy_psi.logo.Pressure',
'pressure after compressor',
io = 'logo',
addr ="VW0",
value = Param(unit='mbar'),
)
pollinterval=0.5,
)
Mod('p1',
'frappy_psi.logo.Value',
'frappy_psi.logo.Pressure',
'dump pressure',
io = 'logo',
addr ="VW28",
value = Param(unit='mbar'),
)
pollinterval=0.5,
)
Mod('p5',
'frappy_psi.logo.Value',
'frappy_psi.logo.Pressure',
'pressure after forepump',
io = 'logo',
addr ="VW4",
value = Param(unit='mbar'),
)
pollinterval = 0.5,
)
Mod('airpressure',
'frappy_psi.logo.DigitalValue',
'frappy_psi.logo.Comparator',
'Airpressure state',
io = 'logo',
addr ="V1024.7",
)
threshold = 500,
pollinterval = 0.5,
)
Mod('io_pfeiffer',
'frappy_psi.pfeiffer_new.PfeifferProtocol',

94
cfg/gas10ka_cfg.py Normal file
View File

@@ -0,0 +1,94 @@
Node('gas10ka.psi.ch',
'10kBar Gas pressure stick',
interface='tcp://5010',
)
Mod('io',
'frappy_psi.logo.IO',
'',
ip_address = "192.168.1.1",
tcap_client = 0x3000,
tsap_server = 0x2000
)
Mod('R_pt10k',
'frappy_psi.logo.Resistor',
'raw sensor value of T_p10k',
io = 'io',
addr = "VW0",
)
Mod('T_pt10k',
'frappy_psi.softcal.Sensor',
'temperature close to sample',
value=Param(unit='K'),
rawsensor='R_pt10k',
calcurve='pt10000e',
)
Mod('R_top',
'frappy_psi.logo.Resistor',
'raw sensor value of T_top',
io = 'io',
addr = "VW2",
)
Mod('T_top',
'frappy_psi.softcal.Sensor',
'capillary temperature at highest position',
value=Param(unit='K'),
rawsensor='R_top',
calcurve='pt1000e',
)
Mod('R_mid',
'frappy_psi.logo.Resistor',
'raw sensor value of T_mid',
io = 'io',
addr = "VW6",
)
Mod('T_mid',
'frappy_psi.softcal.Sensor',
'capillary temperature at mid position',
value=Param(unit='K'),
rawsensor='R_mid',
calcurve='pt1000e',
)
Mod('R_bot',
'frappy_psi.logo.Resistor',
'raw sensor value of T_bot',
io = 'io',
addr = "VW4",
)
Mod('T_bot',
'frappy_psi.softcal.Sensor',
'capillary temperature at lower position',
value=Param(unit='K'),
rawsensor='R_bot',
calcurve='pt1000e',
)
Mod('R_sam_cx',
'frappy_psi.logo.Resistor',
'sensor',
io = 'io',
addr = "VW16",
)
Mod('T_sam_cx',
'frappy_psi.softcal.Sensor',
'?',
value=Param(unit='K'),
rawsensor='R_sam_cx',
calcurve='X174785',
)
Mod('heater',
'frappy_psi.capillary_heater.Heater',
'the capillary heater',
io = 'io',
)

View File

@@ -1,16 +0,0 @@
Node('hcptest.psi.ch',
'high voltage supply test',
'tcp://5000',
)
Mod('io',
'frappy_psi.hcp.IO',
'hcp communication',
uri='serial:///dev/tty.usbserial-21440?baudrate=9600',
)
Mod('voltage',
'frappy_psi.hcp.Voltage',
'fug hcp 14-6500 voltage',
io='io',
)

View File

@@ -35,8 +35,8 @@ Mod('ts',
'frappy_psi.parmod.Converging',
'test for parmod',
unit='K',
read='th.value',
write='th.setsamp',
value_param='th.value',
target_param='th.setsamp',
meaning=['temperature', 20],
settling_time=20,
tolerance=1,

View File

@@ -1,37 +0,0 @@
Node('fibrestick.psi.ch',
'stick with laser fibre',
)
Mod('sea_stick',
'frappy_psi.sea.SeaClient',
'SEA stick connection',
config='fibre.stick',
service='stick',
)
Mod('ts',
'frappy_psi.sea.SeaReadable', '',
meaning=['temperature', 30],
io='sea_stick',
sea_object='tt',
json_file='ma11.config.json',
rel_paths=['ts'],
)
Mod('laser_io',
'frappy_psi.pdld.IO',
'laser IO',
uri='serial:///dev/serial/by-path/pci-0000:00:14.0-usb-0:4.4.4.2:1.0-port0?baudrate=9600',
)
Mod('laser',
'frappy_psi.pdld.Laser',
'laser switch',
io='laser_io',
)
Mod('laser_power',
'frappy_psi.pdld.LaserPower',
'laser power',
io='laser_io',
)

View File

@@ -1,17 +0,0 @@
Node('test_ips.psi.ch',
'ips test',
'tcp://5000',
)
Mod('io',
'frappy_psi.oiclassic.IPS_IO',
'',
uri='ma11-ts:3002',
)
Mod('B',
'frappy_psi.oiclassic.Field',
'magnetic field',
io='io',
target=Param(max=0.2),
)

View File

@@ -1,160 +0,0 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Anik Stark <anik.stark@psi.ch>
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Bronkhorst flow or pressure regulators. Communication via ProPar protocol.
write command: :LnAd01PrTpData\r\n
read command: :LnAd04CopyPrTp\r\n (Ln = 06)
answer: :LnAd02CopyData\r\n
Ln: number of bytes (hex digits pairs) following
Ad: node address (3...120, always a reply if message is sent to node address 128)
Copy: values just to be copied by the reply (first and third digit < 8)
recommended practice: Use PrTp for Copy
Pr: Process number (<80, manual page 24pp)
Tp: Type + parameter number. Type: 00 byte, 20 int, 40 long/float, 60 string
for strings either 00 (for nul terminated) or the max. number of chars
has to be appended to the type
Data: length depending on type.
read command for direct communication: :06800401210120
Send values on a scale from 0-32000 (0-100%).
"""
from frappy.core import StringIO, HasIO, Readable, Writable, Drivable, Parameter, Property, \
FloatRange, BoolType, EnumType, IntRange, IDLE, BUSY
from frappy.errors import CommunicationFailedError
class IO(StringIO):
end_of_line = '\r\n' # hex: 0D0A
adr = 128
identification = [(f':07{adr:02X}047163716300', f':10{adr:02X}02716300.*')] # serial number
default_settings = {'baudrate': 38400}
def intpar(process, parameter):
return '06', f'{process:02X}{parameter|0x20:02X}'
def longpar(process, parameter):
return '08', f'{process:02X}{parameter|0x40:04X}'
MEASURE = intpar(1, 0)
SETPOINT = intpar(1, 1)
RAMP = intpar(1, 2)
CONTROL = longpar(114, 1)
class Sensor(HasIO, Readable):
ioClass = IO
value = Parameter('pressure', FloatRange())
scale = Property('scale factor', FloatRange(), default=1)
adr = Property('node adress', IntRange(0, 255))
def get_par(self, length, param, scale):
reply = self.communicate(f':{length}{self.adr:02X}04{param}{param}')
if reply[:11] != f':{length}{self.adr:02X}02{param}':
return CommunicationFailedError(f'bad reply: {reply}')
val = int(reply[11:14], 16) / 32000 * scale
return val
def read_value(self):
return self.get_par(*MEASURE, self.scale)
class Controller(Sensor, Writable):
def set_par(self, length, param, scale, value):
reply = self.communicate(f':{length}{self.adr:02X}01{param}{round(value/scale):04X}')
if reply[:8] != f':04{self.adr:02X}0000':
raise CommunicationFailedError(f'bad reply: {reply}')
return self.get_par(length, param, scale)
def read_target(self):
return self.get_par(*SETPOINT, self.scale)
def write_target(self, value):
val = value / self.scale * 32000
return self.set_par(*SETPOINT, self.scale, val)
class HasRamp(Drivable):
setpoint = Parameter('running setpoint', FloatRange())
ramp_enable = Parameter('enable ramp mode', BoolType())
ramp = Parameter('slope of ramp', FloatRange(1e-6, unit='mbar/min'))
tolerance = Property('tolerance for target vs. running setpoint', FloatRange(), default=1)
def read_target(self):
# overwrite Controller.read_target() as setpoint is running
return self.read_target
def write_target(self, target):
super().write_target(target)
self.status = BUSY, 'ramping'
def read_setpoint(self):
return super().read_target()
def read_ramp(self):
if abs(self.read_setpoint() - self.target) < self.tolerance:
self.status = IDLE, ''
def write_ramp(self, ramp):
if self.ramp_enable:
time = min(self.scale / ramp, 3000)
return self.set_par(*RAMP, (60 / 0.1), time)
def write_ramp_enable(self, flag):
if flag:
self.write_ramp(self.ramp)
else:
self.set_par(*RAMP, (60 / 0.1), 0)
class HasControlMode():
control_active = Parameter('control mode active', BoolType())
control = Property('control mode', EnumType(manual=4, loop=11))
output = Parameter('valve output', FloatRange(), readonly=False)
def write_control(self, value):
if self.control_active:
val = self.control.get(value, 4)
return self.set_par(*CONTROL, 1, val)
def write_output(self, value):
scale = (2**24 - 1) / 100
self.set_par(*CONTROL, scale, value)
class ControllerRamp(HasRamp, Controller):
pass
class ControllerControlMode(HasControlMode, Controller):
pass
class ControllerRampControlMode(HasRamp, HasControlMode, Controller):
pass

View File

@@ -0,0 +1,53 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from frappy.core import Writable, BoolType, FloatRange, HasIO, Property, StringType, Parameter
from frappy_psi.logo import Sensor
class Heater(Sensor):
addr = 'VW10' # heater readback address
scale = 0.1
switch_addr = Property('address for switch', StringType(), default='V0.1')
enable_addr = Property('address for enable', StringType(), default='V0.0')
maxheater_addr = Property('address for target', StringType(), default='VW10')
value = Parameter(unit='%')
switch = Parameter('heater is enabled', BoolType())
enable = Parameter('heater enabled', BoolType(), readonly=False)
maxheater = Parameter('max. heater power', FloatRange(unit='%'), readonly=False)
def read_switch(self):
return self.get_vm_value(self.switch_addr)
def read_enable(self):
return self.get_vm_value(self.enable_addr)
def write_enable(self, value):
self.set_vm_value(self.enable_addr, value)
return self.read_enable()
def read_maxheater(self):
return self.get_vm_value(self.maxheater_addr, self.scale)
def write_maxheater(self, value):
self.io.set_vm_value(self.maxheater_addr, value, self.scale)
return self.read_maxheater()

290
frappy_psi/ips_classic.py Normal file
View File

@@ -0,0 +1,290 @@
#!/usr/bin/env python
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""oxford instruments mercury IPS power supply"""
import time
from frappy.core import Parameter, EnumType, FloatRange, BoolType, IntRange, Property, Module
from frappy.lib.enum import Enum
from frappy.errors import BadValueError, HardwareError
from frappy_psi.magfield import Magfield, SimpleMagfield, Status
from frappy_psi.mercury import MercuryChannel, off_on, Mapped
from frappy.states import Retry
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=3)
hold_rtoz_rtos_clmp = Mapped(HOLD=Action.hold, RTOS=Action.run_to_set,
RTOZ=Action.run_to_zero, CLMP=Action.clamped)
CURRENT_CHECK_SIZE = 2
class Field(Magfield):
action = Parameter('action', EnumType(Action), readonly=False)
setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0)
working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0)
persistent_field = Parameter(
'persistent field at last switch off', FloatRange(unit='$'), readonly=False)
wait_switch_on = Parameter(
'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=True, default=60)
wait_switch_off = Parameter(
'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=True, default=60)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
_field_mismatch = None
__persistent_field = None # internal value of persistent field
__switch_fixed_until = 0
def initModule(self):
super().initModule()
try:
self.write_action(Action.hold)
except Exception as e:
self.log.error('can not set to hold %r', e)
def doPoll(self):
super().doPoll()
self.read_current()
def initialReads(self):
# on restart, assume switch is changed long time ago, if not, the mercury
# will complain and this will be handled in start_ramp_to_field
self.switch_on_time = 0
self.switch_off_time = 0
self.switch_heater = self.query('DEV::PSU:SIG:SWHT', off_on)
super().initialReads()
def read_value(self):
return self.query('DEV::PSU:SIG:FLD')
def read_ramp(self):
return self.query('DEV::PSU:SIG:RFST')
def write_ramp(self, value):
return self.change('DEV::PSU:SIG:RFST', value)
def read_action(self):
return self.query('DEV::PSU:ACTN', hold_rtoz_rtos_clmp)
def write_action(self, value):
return self.change('DEV::PSU:ACTN', value, hold_rtoz_rtos_clmp)
def read_atob(self):
return self.query('DEV::PSU:ATOB')
def read_voltage(self):
return self.query('DEV::PSU:SIG:VOLT')
def read_working_ramp(self):
return self.query('DEV::PSU:SIG:RFLD')
def read_setpoint(self):
return self.query('DEV::PSU:SIG:FSET')
def set_and_go(self, value):
self.setpoint = self.change('DEV::PSU:SIG:FSET', value)
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_set) == Action.run_to_set
def start_ramp_to_target(self, sm):
# if self.action != Action.hold:
# assert self.write_action(Action.hold) == Action.hold
# return Retry
self.set_and_go(sm.target)
sm.try_cnt = 5
return self.ramp_to_target
def ramp_to_target(self, sm):
try:
return super().ramp_to_target(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(sm.target)
return Retry
def final_status(self, *args, **kwds):
self.write_action(Action.hold)
return super().final_status(*args, **kwds)
def on_restart(self, sm):
self.write_action(Action.hold)
return super().on_restart(sm)
def read_value(self):
current = self.query('DEV::PSU:SIG:FLD')
if self.switch_heater == self.switch_heater.on:
self.__persistent_field = current
self.forced_persistent_field = False
return current
pf = self.query('DEV::PSU:SIG:PFLD')
if self.__persistent_field is None:
self.__persistent_field = pf
self._field_mismatch = False
else:
self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
self.persistent_field = self.__persistent_field
return self.__persistent_field
def _check_adr(self, adr):
"""avoid complains about bad slot"""
if adr.startswith('DEV:PSU.M'):
return
super()._check_adr(adr)
def read_current(self):
current = self.query('DEV::PSU:SIG:CURR')
if self.atob:
return current / self.atob
return 0
def write_persistent_field(self, value):
if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10:
self._field_mismatch = False
self.__persistent_field = value
return value
raise BadValueError('changing persistent field needs forced_persistent_field=True')
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise BadValueError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
def read_switch_heater(self):
value = self.query('DEV::PSU:SIG:SWHT', off_on)
now = time.time()
if value != self.switch_heater:
if now < self.__switch_fixed_until:
self.log.debug('correct fixed switch time')
# probably switch heater was changed, but IPS reply is not yet updated
if self.switch_heater:
self.switch_on_time = time.time()
else:
self.switch_off_time = time.time()
return self.switch_heater
return value
def read_wait_switch_on(self):
return self.query('DEV::PSU:SWONT') * 0.001
def read_wait_switch_off(self):
return self.query('DEV::PSU:SWOFT') * 0.001
def write_switch_heater(self, value):
if value == self.read_switch_heater():
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.__switch_fixed_until = time.time() + 10
self.log.debug('switch time fixed for 10 sec')
result = self.change('DEV::PSU:SIG:SWHT', value, off_on, n_retry=0) # no readback check
return result
def start_ramp_to_field(self, sm):
if abs(self.current - self.__persistent_field) <= self.tolerance:
self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
return self.ramp_to_field
try:
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError) as e:
if self.switch_heater:
self.log.warn('switch is already on!')
return self.ramp_to_field
self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e)
sm.after_wait = self.ramp_to_field
return self.wait_for_switch
return self.ramp_to_field
def start_ramp_to_target(self, sm):
sm.try_cnt = 5
try:
self.set_and_go(sm.target)
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch on'
sm.after_wait = self.ramp_to_target
return self.wait_for_switch
return self.ramp_to_target
def ramp_to_field(self, sm):
try:
return super().ramp_to_field(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(self.__persistent_field)
return Retry
def wait_for_switch(self, sm):
if not sm.delta(10):
return Retry
try:
self.log.warn('try again')
# try again
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError):
return Retry
return sm.after_wait
def wait_for_switch_on(self, sm):
self.read_switch_heater() # trigger switch_on/off_time
if self.switch_heater == self.switch_heater.off:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned off manually?')
return self.start_switch_on
return super().wait_for_switch_on(sm)
def wait_for_switch_off(self, sm):
self.read_switch_heater()
if self.switch_heater == self.switch_heater.on:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned on manually?')
return self.start_switch_off
return super().wait_for_switch_off(sm)
def start_ramp_to_zero(self, sm):
pf = self.query('DEV::PSU:SIG:PFLD')
if abs(pf - self.value) > self.tolerance * 10:
self.log.warning('persistent field %g does not match %g after switch off', pf, self.value)
try:
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch off'
sm.after_wait = self.ramp_to_zero
return self.wait_for_switch
return self.ramp_to_zero
def ramp_to_zero(self, sm):
try:
return super().ramp_to_zero(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
return Retry

View File

@@ -21,8 +21,9 @@ import sys
from time import monotonic
from ast import literal_eval
import snap7
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, StringType, \
IDLE, BUSY, WARN, ERROR, Writable, Drivable, BoolType, IntRange, Communicator, StatusType
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, \
IDLE, BUSY, WARN, ERROR, Writable, Drivable, Communicator
from frappy.datatypes import StringType, BoolType, IntRange, NoneOr, Int32
from frappy.errors import CommunicationFailedError, ConfigError
from threading import RLock
@@ -80,11 +81,16 @@ class IO(Communicator):
class LogoMixin(HasIO):
ioclass = IO
def get_vm_value(self, vm_address):
return literal_eval(self.io.communicate(vm_address))
def get_vm_value(self, addr, scale=None):
if scale is None:
return int(self.io.communicate(addr))
return float(self.io.communicate(addr)) * scale
def set_vm_value(self, vm_address, value):
return literal_eval(self.io.communicate(f'{vm_address} {round(value)}'))
def set_vm_value(self, addr, value, scale=None):
if scale is None:
return int(self.io.communicate(f'{addr} {value}'))
reply = self.io.communicate(f'{addr} {round(value / scale)}')
return int(reply) * scale
class DigitalActuator(LogoMixin, Writable):
@@ -219,195 +225,47 @@ class DelayedActuator(DigitalActuator, Drivable):
self._pulse_end = now + delay
class Value(LogoMixin, Readable):
class Sensor(LogoMixin, Readable):
addr = Property('VM address', datatype=StringType())
scale = Property('scale to multiply with raw integer value',
NoneOr(FloatRange()), default=None)
def read_value(self):
return self.get_vm_value(self.addr)
return self.get_vm_value(self.addr, self.scale)
def read_status(self):
return IDLE, ''
class DigitalValue(Value):
value = Parameter('airpressure state', datatype=BoolType())
class AnalogOutput(Sensor, Writable):
output_addr = Property('VM address output', datatype=StringType(), default='')
def checkProperties(self):
super().checkProperties()
if not self.output_addr:
self.output_addr = self.addr
def read_value(self):
return self.get_vm_value(self.addr, self.scale)
def write_target(self, target):
return self.set_vm_value(self.output_addr, target, self.scale)
# TODO: the following classes are too specific, they have to be moved
class Pressure(LogoMixin, Drivable):
vm_address = Property('VM address', datatype=StringType())
class Pressure(Sensor):
value = Parameter('pressure', datatype=FloatRange(unit='mbar'))
# pollinterval = 0.5
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class Airpressure(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('airpressure state', datatype=BoolType())
# pollinterval = 0.5
def read_value(self):
if (self.get_vm_value(self.vm_address) > 500):
return 1
else:
return 0
def read_status(self):
return IDLE, ''
class Valve(LogoMixin, Drivable):
vm_address_input = Property('VM address input', datatype=StringType())
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Value state', datatype=BoolType())
_remaining_tries = None
def read_value(self):
return self.get_vm_value(self.vm_address_input)
def write_target(self, target):
self.set_vm_value(self.vm_address_output, target)
self._remaining_tries = 5
self.status = BUSY, 'switching'
self.setFastPoll(True, 0.5)
def read_status(self):
self.log.debug('read_status')
value = self.read_value()
self.log.debug('value %d target %d', value, self.target)
if value != self.target:
if self._remaining_tries is None:
self.target = self.read_value()
return IDLE, ''
self._remaining_tries -= 1
if self._remaining_tries < 0:
self.setFastPoll(False)
return ERROR, 'too many tries to switch'
self.set_vm_value(self.vm_address_output, self.target)
return BUSY, 'switching (try again)'
self.setFastPoll(False)
return IDLE, ''
class FluidMachines(LogoMixin, Drivable):
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Valve state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_output)
def write_target(self, target):
return self.set_vm_value(self.vm_address_output, target)
def read_status(self):
return IDLE, ''
class TempSensor(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
class Resistor(Sensor):
value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class HeaterParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('Heater target', datatype=IntRange())
value = Parameter('Heater Param', datatype=IntRange())
class Comparator(LogoMixin, Readable):
addr = Property('VM address', datatype=StringType())
scale = Property('scale to multiply with raw integer value',
NoneOr(FloatRange()), default=None)
value = Parameter('airpressure state', datatype=BoolType())
threshold = Property('threshold for True', FloatRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
class controlHeater(LogoMixin, Writable):
vm_address = Property('VM address on switch', datatype=StringType())
target = Parameter('Heater state', datatype=BoolType())
value = Parameter('Heater state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_on)
def write_target(self, target):
if (target):
return self.set_vm_value(self.vm_address, True)
else:
return self.set_vm_value(self.vm_address, False)
def read_status(self):
return IDLE, ''
class safetyfeatureState(LogoMixin, Readable):
vm_address = Property('VM address state', datatype=StringType())
value = Parameter('safety Feature state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class safetyfeatureParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
class comparatorgekoppeltParam(LogoMixin, Writable):
vm_address_1 = Property('VM address output', datatype=StringType())
vm_address_2 = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address_1)
def write_target(self, target):
self.set_vm_value(self.vm_address_1, target)
return self.set_vm_value(self.vm_address_2, target)
def read_status(self):
return IDLE, ''
return self.get_vm_value(self.addr, self.scale) > self.threshold

View File

@@ -61,7 +61,6 @@ class SimpleMagfield(HasStates, Drivable):
'trained field (positive)',
TupleOf(FloatRange(-99, 0, unit='$'), FloatRange(0, unit='$')),
readonly=False, default=(0, 0))
trainmode = Parameter('train mode flag', EnumType(off=0, on=1, undef=2), default=2)
wait_stable_field = Parameter(
'wait time to ensure field is stable', FloatRange(0, unit='s'), readonly=False, default=31)
ramp_tmo = Parameter(
@@ -150,24 +149,10 @@ class SimpleMagfield(HasStates, Drivable):
"""
raise NotImplementedError
def handle_train_mode(self):
self.log.info('handle %r %r', self.trained, self.value)
if self.trained[0] < self.value < self.trained[1]:
trainmode = 'off'
else:
trainmode = 'on'
if self.value > 0:
self.trained = (self.trained[0], max(self.trained[1], self.value))
else:
self.trained = (min(self.trained[0], self.value), self.trained[1])
if self.trainmode != trainmode:
self.write_trainmode(trainmode)
@status_code(BUSY, 'ramping field')
def ramp_to_target(self, sm):
if sm.init:
self.init_progress(sm, self.value)
self.handle_train_mode()
# Remarks: assume there is a ramp limiting feature
if abs(self.value - sm.target) > self.tolerance:
if self.get_progress(sm, self.value) > self.ramp_tmo:
@@ -181,15 +166,11 @@ class SimpleMagfield(HasStates, Drivable):
def stabilize_field(self, sm):
if sm.now - sm.stabilize_start < self.wait_stable_field:
return Retry
self.handle_train_mode()
return self.final_status()
def read_workingramp(self):
return self.ramp
def write_trainmode(self, value):
"""overwrite when needed"""
class Magfield(SimpleMagfield):
status = Parameter(datatype=StatusType(Status))
@@ -354,7 +335,6 @@ class Magfield(SimpleMagfield):
@status_code(Status.RAMPING)
def ramp_to_target(self, sm):
self.handle_train_mode()
dif = abs(self.value - sm.target)
if sm.init:
sm.stabilize_start = 0 # in case current is already at target
@@ -373,7 +353,6 @@ class Magfield(SimpleMagfield):
@status_code(Status.STABILIZING)
def stabilize_field(self, sm):
self.handle_train_mode()
if sm.now < sm.stabilize_start + self.wait_stable_field:
return Retry
return self.check_switch_off

View File

@@ -1,514 +0,0 @@
#!/usr/bin/env python
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# Anik Stark <anik.stark@psi.ch>
# *****************************************************************************
"""older generation (classic) oxford instruments"""
import time
import re
from frappy.core import Parameter, Property, EnumType, FloatRange, IntRange, BoolType, \
StringIO, HasIO, Readable, Writable, Drivable, IDLE, WARN, ERROR
from frappy.lib import formatStatusBits
from frappy.lib.enum import Enum
from frappy.errors import BadValueError, HardwareError, CommunicationFailedError
from frappy_psi.magfield import Magfield, Status
from frappy.states import Retry
def bit(x, pos):
return bool(x & (1 << pos))
class Base(HasIO):
def query(self, cmd, scale=None):
reply = self.communicate(cmd)
if reply[0] != cmd[0]:
raise CommunicationFailedError(f'bad reply: {reply} to command {cmd}')
try:
return float(reply[1:]) * scale
except Exception:
pass
def change(self, cmd, value, scale=None):
try:
self.communicate('C3')
reply = self.communicate(f'{cmd}{round(value / scale)}')
if reply[0] != cmd[0]:
raise CommunicationFailedError(f'bad reply: {reply}')
finally:
self.communicate('C0')
def command(self, *cmds):
try:
self.communicate('C3')
for cmd in cmds:
self.communicate(cmd)
finally:
self.communicate('C0')
class IPS_IO(StringIO):
"""oxford instruments power supply IPS120-10"""
end_of_line = '\r'
identification = [('V', r'IPS120-10.*')] # instrument type and software version
default_settings = {'baudrate': 9600}
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=4)
status_map = {'0': (IDLE, ''),
'1': (ERROR, 'quenched'),
'2': (ERROR, 'overheated'),
'4': (WARN, 'warming up'),
'8': (ERROR, '')
}
limit_map = {'0': (IDLE, ''),
'1': (WARN, 'on positive voltage limit'),
'2': (WARN, 'on negative voltage limit'),
'4': (ERROR, 'outside negative current limit'),
'8': (ERROR, 'outside positive current limit')
}
class Field(Base, Magfield):
""" read commands:
R1 measured power supply voltage (V)
R7 demand field (output field) (T)
R8 setpoint (target field) (T)
R9 sweep field rate (T/min)
R18 persistent field (T)
X Status
control commands:
A set activity
T set field sweep rate
H set switch heater
J set target field """
ioClass = IPS_IO
action = Parameter('action', EnumType(Action), readonly=False)
setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
persistent_field = Parameter(
'persistent field at last switch off', FloatRange(unit='T'), readonly=False)
wait_switch_on = Parameter(default=15)
wait_switch_off = Parameter(default=15)
wait_stable_field = Parameter(default=10)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
switch_heater = Parameter('turn switch heater on/off', EnumType(off=0, on=1, forced=2), default=0)
_field_mismatch = None
__persistent_field = None # internal value of persistent field
_status = '00'
def initModule(self):
super().initModule()
try:
self.write_action(Action.hold)
except Exception as e:
self.log.error('can not set to hold %r', e)
def doPoll(self):
super().doPoll()
self.read_current()
def initialReads(self):
# on restart, assume switch is changed long time ago, if not, the mercury
# will complain and this will be handled in start_ramp_to_field
self.switch_on_time = 0
self.switch_off_time = 0
super().initialReads()
def read_value(self):
if self.switch_heater:
self.__persistent_field = self.query('R7')
self.forced_persistent_field = False
self._field_mismatch = False
return self.__persistent_field
pf = self.query('R18')
if self.__persistent_field is None:
self.__persistent_field = pf
self._field_mismatch = False
else:
self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
self.persistent_field = self.__persistent_field
return self.__persistent_field
def read_ramp(self):
return self.query('R9')
def write_ramp(self, value):
self.change('T', value)
return self.read_ramp()
def write_action(self, value):
self.change('A', int(value))
self.read_status()
def read_voltage(self):
return self.query('R1')
def read_setpoint(self):
return self.query('R8')
def read_current(self):
return self.query('R7')
def write_persistent_field(self, value):
if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10:
self._field_mismatch = False
self.__persistent_field = value
return value
raise BadValueError('changing persistent field needs forced_persistent_field=True')
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise BadValueError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
def read_switch_heater(self):
self.read_status()
return self.switch_heater
def read_status(self):
status = self.communicate('X')
match = re.match(r'X(\d\d)A(\d)C\dH(\d)M\d\dP\d\d', status)
if match is None:
raise CommunicationFailedError(f'unexpected status: {status}')
self._status = match.group(1)
self.action = int(match.group(2))
self.switch_heater = match.group(3) == '1'
if self._status[0] != '0':
self._state_machine.stop()
return status_map.get(self._status[0], (ERROR, f'bad status: {self._status}'))
if self._status[1] != '0':
return limit_map.get(self._status[1], (ERROR, f'bad status: {self._status}')) # need to stop sm too?
return super().read_status()
def write_switch_heater(self, value):
if value == self.read_switch_heater():
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.log.debug('switch time fixed for 10 sec')
self.change('H', int(value))
#return result
return int(value)
def set_and_go(self, value):
self.change('J', value)
self.setpoint = self.read_current()
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_set) == Action.run_to_set
def ramp_to_target(self, sm):
try:
return super().ramp_to_target(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(sm.target)
return Retry
def final_status(self, *args, **kwds):
self.write_action(Action.hold)
return super().final_status(*args, **kwds)
def on_restart(self, sm):
self.write_action(Action.hold)
return super().on_restart(sm)
def start_ramp_to_field(self, sm):
if abs(self.current - self.__persistent_field) <= self.tolerance:
self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
return self.ramp_to_field
try:
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError) as e:
if self.switch_heater:
self.log.warn('switch is already on!')
return self.ramp_to_field
self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e)
sm.after_wait = self.ramp_to_field
return self.wait_for_switch
return self.ramp_to_field
def start_ramp_to_target(self, sm):
sm.try_cnt = 5
try:
self.set_and_go(sm.target)
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch on'
sm.after_wait = self.ramp_to_target
return self.wait_for_switch
return self.ramp_to_target
def ramp_to_field(self, sm):
try:
return super().ramp_to_field(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(self.__persistent_field)
return Retry
def wait_for_switch(self, sm):
if not sm.delta(10):
return Retry
try:
self.log.warn('try again')
# try again
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError):
return Retry
return sm.after_wait
def wait_for_switch_on(self, sm):
self.read_switch_heater() # trigger switch_on/off_time
if self.switch_heater == self.switch_heater.off:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned off manually?')
return self.start_switch_on
return super().wait_for_switch_on(sm)
def wait_for_switch_off(self, sm):
self.read_switch_heater()
if self.switch_heater == self.switch_heater.on:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned on manually?')
return self.start_switch_off
return super().wait_for_switch_off(sm)
def start_ramp_to_zero(self, sm):
pf = self.query('R18')
if abs(pf - self.value) > self.tolerance * 10:
self.log.warning('persistent field %g does not match %g after switch off', pf, self.value)
try:
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch off'
sm.after_wait = self.ramp_to_zero
return self.wait_for_switch
return self.ramp_to_zero
def ramp_to_zero(self, sm):
try:
return super().ramp_to_zero(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
return Retry
def write_trainmode(self, value):
self.change('M', '5' if value == 'off' else '1')
class ILM_IO(StringIO):
"""oxford instruments level meter ILM200"""
end_of_line = '\r'
identification = [('V', r'ILM200.*')] # instrument type and software version
default_settings = {'baudrate': 9600}
timeout = 5
class Level(Base, Readable):
""" X code: XcccSuuvvwwRzz
c: position corresponds to channel 1, 2, 3
possible values in each position are 0, 1, 2, 3, 9
vv, uu, ww: channel status for channel 1, 2, 3 respectively, 2 bits each
zz: relay status """
ioClass = ILM_IO
value = Parameter('level', datatype=FloatRange(unit='%'))
CHANNEL = None
X_PATTERN = re.compile(r'X(\d)(\d)(\d)S([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})R\d\d$')
MEDIUM = None
_statusbits = None
def read_value(self):
return self.query(f'R{self.CHANNEL}', 0.1)
def write_fast(self, fast):
self.command(f'T{self.CHANNEL}' if fast else f'S{self.CHANNEL}')
def get_status(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
statuslist = match.groups()
if statuslist[self.CHANNEL] == '9':
return ERROR, f'error on {self.MEDIUM} level channel (not connected?)'
if (statuslist[self.CHANNEL] == '1') != (self.MEDIUM == 'N2'):
# '1': channel is used for N2
return ERROR, f'{self.MEDIUM} level channel not configured properly'
self._statusbits = int(statuslist[self.CHANNEL + 3], 16)
return None
return ERROR, f'bad status message {reply}'
class HeLevel(Level):
fast = Parameter('switching fast/slow', datatype=BoolType(), readonly=False)
CHANNEL = 1
MEDIUM = 'He'
def read_status(self):
status = self.get_status()
if status is not None:
return status
return IDLE, formatStatusBits(self._statusbits, ['meas', 'fast', 'slow'])
class N2Level(Level):
ioClass = ILM_IO
value = Parameter('N2 level', FloatRange(unit='%'))
CHANNEL = 2
MEDIUM = 'N2'
def read_status(self):
status = self.get_status()
if status is not None:
return status
return IDLE, ''
VALVE_MAP = {'01': 'V9',
'02': 'V8',
'03': 'V7',
'04': 'V11A',
'05': 'V13A',
'06': 'V13B',
'07': 'V11B',
'08': 'V12B',
'09': 'He4 rotary pump',
'10': 'V1',
'11': 'V5',
'12': 'V4',
'13': 'V3',
'14': 'V14',
'15': 'V10',
'16': 'V2',
'17': 'V2A He4',
'18': 'V1A He4',
'19': 'V5A He4',
'20': 'V4A He4',
'21': 'V3A He4',
'22': 'roots pump',
'23': 'unlabeled pump',
'24': 'He3 rotary pump',
}
class IGH_IO(StringIO):
"""oxford instruments dilution gas handling Kelvinox IGH"""
end_of_line = '\r'
identification = [('V', r'IGH.*')]
default_settings = {'baudrate': 9600}
X_PATTERN = re.compile(r'X(\d)A\dC\dP([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})([0-9A-F]{2})S[0-9A-F]O\dE\d$')
statusbits = 0
ini_valves = 0
def doPoll(self):
reply = self.communicate('X')
match = self.X_PATTERN.match(reply)
if match:
statuslist = match.groups()
# REVISE THE METHOD FROM HERE ON
# CHECK JUPYTER NOTEBOOK
# a hex digit indicating if motorized valves are still initializing
self.ini_valves = int(statuslist[0], 16)
class Valve(Base, Writable):
ioClass = IGH_IO
target = Parameter('open or close valve', EnumType(open=1, close=0))
addr = Property('valve number', IntRange(1,24))
def write_target(self, val):
# open: 2N, close: 2N + 1
self.change('P', (2 * self.addr + 1 - int(val)), 1)
def read_status(self):
# CHECK
status = self.io.statusbits & (1 << self.addr)
if status is not None:
return status
return IDLE, ''
class PulsedValve(Valve):
delay = Parameter('time valve is open', FloatRange(unit='s'))
_start = 0
def write_target(self, val):
if val:
self._start = time.time()
self.setFastPoll(True, 0.01)
else:
self.setFastPoll(False)
self.change('P', (2 * self.addr + 1 - int(val)), 1)
def doPoll(self):
super().doPoll()
if self._start:
if time.time() > self._start + self.delay:
self.write_target(False) # turn valve off
#self.setFastPoll(False)
self._start = 0
class MotorValve(Base, Drivable):
pass
class Pressure(Base, Readable):
pass
class Heater(Base, Writable):
pass
class N2Sensor():
# ask Markus
pass
class Pump(Base, Writable):
pass

View File

@@ -61,7 +61,7 @@ example cfg:
import time
import math
import numpy as np
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, WARN, Property
from frappy.lib import clamp, merge_status
from frappy.datatypes import LimitsType, EnumType, FloatRange
from frappy.errors import SECoPError
@@ -296,6 +296,13 @@ class PI(HasConvergence, PImixin):
super().write_target(target)
self.convergence_start()
def deactivate_control(self, source=None):
super().deactivate_control(source)
if self.isBusy():
self.convergence_state.stop_status = (
WARN, f'switched to manual mode by {source or self.name}')
self.convergence_state.start(self.convergence_interrupt)
# unchecked!
class PI2(PI):