fs: improve and fix implementation

+ introduce WrapControlledBy and fix HasControlledBy

this in a new module before mercury/triton have been fixed
This commit is contained in:
2025-06-27 14:47:21 +02:00
parent 8385461163
commit a3d0549199
5 changed files with 261 additions and 54 deletions

View File

@@ -4,34 +4,38 @@ Node('fs.psi.ch',
) )
Mod('T', Mod('T',
'frappy_psi.picontrol.PI2', 'frappy_psi.furnace.PI2',
'controlled Temperature on sample (2nd loop)', 'controlled Temperature on sample (2nd loop)',
input = 'T_sample', value = Param(unit='degC'),
output = 'T_reg', input_module = 'T_sam',
output_module = 'T_reg',
relais = 'relais', relais = 'relais',
p = 1.2, p = 1.2,
i = 0.005, i = 0.005,
) )
Mod('T_reg', Mod('T_reg',
'frappy_psi.picontrol.PI', 'frappy_psi.furnace.PIctrl',
'controlled Temperature on heater', 'controlled Temperature on heater',
input = 'T_htr', value = Param(unit='degC'),
output = 't_out', input_module = 'T_htr',
output_module = 't_out',
output_min = 0,
output_max = 100,
relais = 'relais', relais = 'relais',
p = 1, p = 1,
i = 0.003, i = 0.003,
) )
Mod('p_reg', #Mod('p_reg',
'frappy_psi.picontrol.PI', # 'frappy_psi.furnace.PI',
'controlled pressure', # 'controlled pressure',
input = 'p', # input_module = 'p',
output = 'p_out', # output_module = 't_out',
relais = 'relais', # relais = 'relais',
p = 1, # p = 1,
i = 0.005, # i = 0.005,
) # )
Mod('T_htr', Mod('T_htr',
'frappy_psi.ionopimax.CurrentInput', 'frappy_psi.ionopimax.CurrentInput',
@@ -39,11 +43,10 @@ Mod('T_htr',
addr = 'ai4', addr = 'ai4',
valuerange = (0, 1372), valuerange = (0, 1372),
value = Param(unit='degC'), value = Param(unit='degC'),
) )
Mod('T_sample', Mod('T_sam',
'frappy_psi.ionopimax.CurrentInput', 'frappy_psi.ionopimax.CurrentInput',
'sample temperature', 'sample temperature',
addr = 'ai3', addr = 'ai3',
@@ -58,10 +61,8 @@ Mod('T_extra',
addr = 'ai2', addr = 'ai2',
valuerange = (0, 1372), valuerange = (0, 1372),
value = Param(unit='degC'), value = Param(unit='degC'),
) )
Mod('T_wall', Mod('T_wall',
'frappy_psi.ionopimax.VoltageInput', 'frappy_psi.ionopimax.VoltageInput',
'furnace wall temperature', 'furnace wall temperature',
@@ -86,7 +87,7 @@ Mod('htr',
Mod('t_out', Mod('t_out',
'frappy_psi.bkpower.Output', 'frappy_psi.bkpower.Output',
'heater output', 'heater output',
p_value = 'p_out', # p_value = 'p_out',
io = 'htr_io', io = 'htr_io',
maxvolt = 50, maxvolt = 50,
maxcurrent = 2, maxcurrent = 2,
@@ -104,13 +105,14 @@ Mod('interlocks',
input = 'T_htr', input = 'T_htr',
wall_T = 'T_wall', wall_T = 'T_wall',
htr_T = 'T_htr', htr_T = 'T_htr',
main_T = 'T_sample', main_T = 'T_sam',
extra_T = 'T_extra', extra_T = 'T_extra',
htr = 't_out',
vacuum = 'p', vacuum = 'p',
relais = 'relais', relais = 'relais',
control = 'T', control = 'T',
wall_limit = 100, wall_limit = 60,
vacuum_limit = 0.1, vacuum_limit = 0.001,
) )
Mod('p', Mod('p',

193
frappy/ctrlby.py Normal file
View File

@@ -0,0 +1,193 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from frappy.datatypes import BoolType, EnumType, Enum
from frappy.core import Parameter, Attached
class WrapControlledBy:
"""mixin for modules with controlled_by
Two use cases:
1) on the implementation of a hardware module it is already known that
HasControlledBy is wanted. In this case, functionality to apply the
target to the hardware has to be implemented in method 'set_target'
class MyWritable(HasControlledBy, ...):
def set_target(self, target):
"apply target to HW"
# no supercall needed!
# do not override write_target !
2) a hardware module is already available, and we extend it with the
controlled_by stuff
class Enhanced(HasControlledBy, BaseWritable):
set_target = BaseWritable.write_target
# nothing else is needed.
"""
controlled_by = Parameter('source of target value', EnumType(members={'self': 0}), default=0)
target = Parameter() # make sure target is a parameter
inputCallbacks = ()
def register_input(self, name, deactivate_control):
"""register input
:param name: the name of the module (for controlled_by enum)
:param deactivate_control: a method on the input module to switch off control
called by <controller module>.initModule
"""
if not self.inputCallbacks:
self.inputCallbacks = {}
self.inputCallbacks[name] = deactivate_control
prev_enum = self.parameters['controlled_by'].datatype.export_datatype()['members']
# add enum member, using autoincrement feature of Enum
self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{name: None}))
def write_controlled_by(self, modulename):
result = modulename
if modulename in ('self', self.name):
# inform the deactivate_control methods, that we have already switched
self.controlled_by = result = 'self'
for name, deactivate_control in self.inputCallbacks.items():
if name != modulename:
deactivate_control(modulename)
return result
def self_controlled(self):
"""method to change controlled_by to self
to be called from the write_target method
"""
if self.controlled_by != 0:
self.write_controlled_by('self')
def set_off(self):
"""to be overriden if the off state should be different than the default
on a FloatRange() the default value is 0
"""
self.self_controlled()
self.set_target_cby(self.parameters['target'].datatype.default)
def update_target(self, module, value):
"""update internal target value
as write_target would switch to manual mode, the controlling module
has to use this method to update the value
override and super call, if other actions are needed
"""
if self.controlled_by != module:
self.log.warning('UT %r %r', self.controlled_by, module)
deactivate_control = self.inputCallbacks.get(self.controlled_by)
if deactivate_control:
deactivate_control(module)
self.controlled_by = module
target = self.set_target_cby(value)
self.target = value if target is None else target
def write_target(self, target):
self.self_controlled()
return self.set_target_cby(target)
def set_target_cby(self, target):
return super().write_target(target)
class HasControlledBy(WrapControlledBy):
def set_target(self, value):
"""to be overridden for setting target of HW"""
raise NotImplementedError
def set_target_cby(self, value):
"""to be overridden in case this mixin is not added on top"""
return self.set_target(value)
class WrapOutputModule:
"""mixin for modules having an output module
this module will call the update_target method of an output module
"""
# mandatory=False: it should be possible to configure a module with fixed control
output_module = Attached(WrapControlledBy, mandatory=False)
control_active = Parameter('control mode', BoolType(), default=False)
target = Parameter() # make sure target is a parameter
def initModule(self):
super().initModule()
if self.output_module:
self.output_module.register_input(self.name, self.deactivate_control)
def write_control_active(self, value):
"""override with supercall if needed"""
out = self.output_module
if out:
if value:
if out.controlled_by != self.name:
# deactivate control an all modules controlling our output_module
out.write_controlled_by(self.name)
else:
if out.controlled_by == self.name:
out.set_off() # this sets out.controlled_by to 0 (=self)
def set_control_active(self, active):
"""to be overridden for switching hw control"""
self.control_active = active
def activate_control(self):
"""method to switch control_active on
to be called from the write_target method, with the target as argument
"""
self.write_control_active(True)
def deactivate_control(self, source=None):
"""called when another module takes over control
registered to be called from the controlled module(s)
"""
if self.control_active:
self.write_control_active(False)
self.log.warning(f'switched to manual mode by {source or self.name}')
def write_target(self, target):
self.write_control_active(True)
return self.set_target_out(target)
def set_target_out(self, target):
return super().write_target(target)
class HasOutputModule(WrapOutputModule):
def set_target(self, target):
"""to be overridden except for WrapOutputModule"""
raise NotImplementedError
def set_target_out(self, target):
return self.set_target(target)

View File

@@ -47,6 +47,7 @@ Mod('out',
from frappy.core import StringIO, Readable, Parameter, FloatRange, Writable, HasIO, BoolType from frappy.core import StringIO, Readable, Parameter, FloatRange, Writable, HasIO, BoolType
from frappy.ctrlby import HasControlledBy
# define the IO class # define the IO class
@@ -65,10 +66,10 @@ class Power(HasIO, Readable):
return volt*current return volt*current
class Output(HasIO, Writable): class Output(HasIO, HasControlledBy, Writable):
value = Parameter(datatype=FloatRange(0,100,unit='%'), default=0) value = Parameter(datatype=FloatRange(0,100,unit='%'), default=0)
target = Parameter(datatype=FloatRange(0,100,unit='%')) target = Parameter(datatype=FloatRange(0,100,unit='%'))
p_value = Parameter(datatype=FloatRange(0,100,unit='%'), default=0) p_value = Parameter('?', datatype=FloatRange(0,100,unit='%'), default=0)
maxvolt = Parameter('voltage at 100%',datatype=FloatRange(0,60,unit='V'),default=50,readonly=False) maxvolt = Parameter('voltage at 100%',datatype=FloatRange(0,60,unit='V'),default=50,readonly=False)
maxcurrent = Parameter('current at 100%',datatype=FloatRange(0,5,unit='A'),default=2,readonly=False) maxcurrent = Parameter('current at 100%',datatype=FloatRange(0,5,unit='A'),default=2,readonly=False)
output_enable = Parameter('control on/off', BoolType(), readonly=False) output_enable = Parameter('control on/off', BoolType(), readonly=False)
@@ -77,7 +78,7 @@ class Output(HasIO, Writable):
super().initModule() super().initModule()
self.write_output_enable(False) self.write_output_enable(False)
def write_target(self, target): def set_target(self, target):
self.write_output_enable(target != 0) self.write_output_enable(target != 0)
self.communicate(f'VOLT{round(max(8,(target)**0.5 * self.maxvolt)):03d}') self.communicate(f'VOLT{round(max(8,(target)**0.5 * self.maxvolt)):03d}')
self.communicate(f'CURR{round((target)**0.5* 10 * self.maxcurrent):03d}') self.communicate(f'CURR{round((target)**0.5* 10 * self.maxcurrent):03d}')

View File

@@ -20,9 +20,9 @@
"""interlocks for furnace""" """interlocks for furnace"""
from frappy.core import Module, Writable, Attached, Parameter, FloatRange, Readable,\ from frappy.core import Module, Writable, Attached, Parameter, FloatRange, Readable,\
BoolType, ERROR, IDLE BoolType, ERROR, IDLE, Command
from frappy.errors import ImpossibleError from frappy.errors import ImpossibleError
from frappy.mixins import HasControlledBy from frappy.ctrlby import WrapControlledBy
from frappy_psi.picontrol import PImixin from frappy_psi.picontrol import PImixin
from frappy_psi.convergence import HasConvergence from frappy_psi.convergence import HasConvergence
from frappy_psi.ionopimax import CurrentInput, LogVoltageInput from frappy_psi.ionopimax import CurrentInput, LogVoltageInput
@@ -86,12 +86,12 @@ class Interlocks(Writable):
self.value = False self.value = False
if self.control.control_active: if self.control.control_active:
self.log.error('switch control off %r', self.control.status) self.log.error('switch control off %r', self.control.status)
self.control.write_control_active(False) self.control.write_control_active(False)
self.control.status = ERROR, self._conditions
if self.htr and self.htr.target: if self.htr and self.htr.target:
self.htr.write_target(0) self.htr.write_target(0)
if self.relais and (self.relais.value or self.relais.target): if self.relais.value or self.relais.target:
self.relais.write_target(False) self.log.warning('switch off relais %r %r', self.relais.value, self.relais.target)
self.relais.write_target(False)
def read_status(self): def read_status(self):
conditions = [] conditions = []
@@ -112,35 +112,46 @@ class Interlocks(Writable):
return IDLE, '; '.join(conditions) return IDLE, '; '.join(conditions)
return ERROR, self._off_reason return ERROR, self._off_reason
@Command
def reset(self):
"""reset interlock after error
class PI(HasConvergence, PImixin): will fail if error conditions still apply
"""
self.write_target(1)
class PI(HasConvergence, PImixin, Writable):
input_module = Attached(Readable, 'the input module') input_module = Attached(Readable, 'the input module')
relais = Attached(Writable, 'the interlock relais', mandatory=False) relais = Attached(Writable, 'the interlock relais', mandatory=False)
def read_value(self): def read_value(self):
return self.input.value return self.input_module.value
def write_target(self, value): def read_status(self):
super().write_target(value) return self.input_module.status
def set_target(self, value):
super().set_target(value)
if self.relais: if self.relais:
if not self.relais.value or not self.relais.target:
self.log.warning('switch on relais %r %r', self.relais.value, self.relais.target)
self.relais.write_target(1) self.relais.write_target(1)
class TdkOutput(HasControlledBy, tdkpower.Output): class PIctrl(WrapControlledBy, PI):
pass pass
class BkOutput(HasControlledBy, bkpower.Output): class PI2(PI):
pass maxovershoot = Parameter('max. overshoot', FloatRange(0, 100, unit='%'), readonly=False, default=20)
def doPoll(self):
self.output_max = self.target * (1 + 0.01 * self.maxovershoot)
self.output_min = self.target * (1 - 0.01 * self.maxovershoot)
super().doPoll()
class PRtransmitter(CurrentInput): def write_target(self, target):
rawrange = (0.004, 0.02) if not self.control_active:
extendedrange = (0.0036, 0.021) self.output_module.write_target(target)
super().write_target(target)
class PKRgauge(LogVoltageInput):
rawrange = (1.82, 8.6)
valuerange = (5e-9, 1000)
extendedrange = (0.5, 9.5)
value = Parameter(unit='mbar')

View File

@@ -63,7 +63,7 @@ import math
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property
from frappy.lib import clamp from frappy.lib import clamp
from frappy.datatypes import LimitsType, EnumType, BoolType, FloatRange from frappy.datatypes import LimitsType, EnumType, BoolType, FloatRange
from frappy.newmixins import HasOutputModule from frappy.ctrlby import HasOutputModule
from frappy_psi.convergence import HasConvergence from frappy_psi.convergence import HasConvergence
@@ -75,12 +75,14 @@ class PImixin(HasOutputModule, Writable):
output_min = Parameter('min output', FloatRange(), default=0, readonly=False) output_min = Parameter('min output', FloatRange(), default=0, readonly=False)
output_max = Parameter('max output', FloatRange(), default=0, readonly=False) output_max = Parameter('max output', FloatRange(), default=0, readonly=False)
output_func = Parameter('output function', output_func = Parameter('output function',
EnumType(lin=0, square=1), readonly=False, default=0) EnumType(lin=0, square=1), readonly=False, value=0)
value = Parameter(unit='K') value = Parameter(unit='K')
_lastdiff = None _lastdiff = None
_lasttime = 0 _lasttime = 0
_get_range = None # a function get output range from output_module _get_range = None # a function get output range from output_module
_overflow = 0 _overflow = 0
_cvt2int = None
_cvt2ext = None
def initModule(self): def initModule(self):
super().initModule() super().initModule()
@@ -149,11 +151,10 @@ class PImixin(HasOutputModule, Writable):
if not value: if not value:
self.output_module.write_target(0) self.output_module.write_target(0)
def write_target(self, _): def set_target(self, value):
if not self.control_active: if not self.control_active:
self.activate_control() self.activate_control()
# unchecked! # unchecked!
class PI(HasConvergence, PImixin): class PI(HasConvergence, PImixin):
@@ -178,4 +179,3 @@ class PI2(PI):
if not self.control_active: if not self.control_active:
self.output.write_target(target) self.output.write_target(target)
super().write_target(target) super().write_target(target)