fs (and other furnaces): fixes on interlock

- try to make interlock right
- merge status where ever possbile
This commit is contained in:
2025-07-07 16:05:27 +02:00
parent 1e73440149
commit cf151dd324
5 changed files with 143 additions and 57 deletions

View File

@@ -10,7 +10,6 @@ Mod('T',
meaning = ['temperature', 30], meaning = ['temperature', 30],
input_module = 'T_sam', input_module = 'T_sam',
output_module = 'T_reg', output_module = 'T_reg',
relais = 'relais',
p = 1.2, p = 1.2,
i = 0.005, i = 0.005,
) )
@@ -23,7 +22,6 @@ Mod('T_reg',
output_module = 't_out', output_module = 't_out',
output_min = 0, output_min = 0,
output_max = 100, output_max = 100,
relais = 'relais',
p = 1, p = 1,
i = 0.003, i = 0.003,
) )
@@ -33,7 +31,6 @@ Mod('T_reg',
# 'controlled pressure', # 'controlled pressure',
# input_module = 'p', # input_module = 'p',
# output_module = 't_out', # output_module = 't_out',
# relais = 'relais',
# p = 1, # p = 1,
# i = 0.005, # i = 0.005,
# ) # )
@@ -50,7 +47,7 @@ Mod('T_htr',
Mod('T_sam', Mod('T_sam',
'frappy_psi.furnace.PRtransmitter', 'frappy_psi.furnace.PRtransmitter',
'sample temperature', 'sample temperature',
addr = 'ai3', addr = 'ai2',
valuerange = (0, 1372), valuerange = (0, 1372),
value = Param(unit='degC'), value = Param(unit='degC'),
@@ -59,7 +56,7 @@ Mod('T_sam',
Mod('T_extra', Mod('T_extra',
'frappy_psi.furnace.PRtransmitter', 'frappy_psi.furnace.PRtransmitter',
'extra temperature', 'extra temperature',
addr = 'ai2', addr = 'ai3',
valuerange = (0, 1372), valuerange = (0, 1372),
value = Param(unit='degC'), value = Param(unit='degC'),
) )
@@ -100,13 +97,14 @@ Mod('relais',
addr = 'o2', addr = 'o2',
) )
Mod('interlocks', Mod('interlock',
'frappy_psi.furnace.Interlocks', 'frappy_psi.furnace.Interlocks',
'interlock parameters', 'interlock parameters',
input = 'T_htr', input = 'T_htr',
wall_T = 'T_wall', wall_T = 'T_wall',
htr_T = 'T_htr', htr_T = 'T_htr',
main_T = 'T_sam', main_T = 'T_sam',
reg_T = 'T_reg',
extra_T = 'T_extra', extra_T = 'T_extra',
htr = 't_out', htr = 't_out',
vacuum = 'p', vacuum = 'p',

View File

@@ -82,8 +82,11 @@ class WrapControlledBy:
on a FloatRange() the default value is 0 on a FloatRange() the default value is 0
""" """
self.self_controlled() zero = self.parameters['target'].datatype.default
self.internal_set_target(self.parameters['target'].datatype.default) try:
self.internal_set_target(zero)
except Exception as e:
self.target = zero
def update_target(self, module, value): def update_target(self, module, value):
"""update internal target value """update internal target value
@@ -94,7 +97,6 @@ class WrapControlledBy:
override and super call, if other actions are needed override and super call, if other actions are needed
""" """
if self.controlled_by != module: if self.controlled_by != module:
self.log.warning('UT %r %r', self.controlled_by, module)
deactivate_control = self.inputCallbacks.get(self.controlled_by) deactivate_control = self.inputCallbacks.get(self.controlled_by)
if deactivate_control: if deactivate_control:
deactivate_control(module) deactivate_control(module)

View File

@@ -80,10 +80,9 @@ class HasConvergence(Drivable):
self.read_status() self.read_status()
def read_status(self): def read_status(self):
try: if hasattr(super(), 'read_status'):
return merge_status(super().read_status(), self.convergence_state.status) return merge_status(super().read_status(), self.convergence_state.status)
except AttributeError: return self.convergence_state.status
return self.convergence_state.status # no super().read_status
def convergence_min_slope(self, dif): def convergence_min_slope(self, dif):
"""calculate minimum expected slope""" """calculate minimum expected slope"""

View File

@@ -23,6 +23,7 @@ from frappy.core import Module, Writable, Attached, Parameter, FloatRange, Reada
BoolType, ERROR, IDLE, Command, StringType BoolType, ERROR, IDLE, Command, StringType
from frappy.errors import ImpossibleError from frappy.errors import ImpossibleError
from frappy.ctrlby import WrapControlledBy from frappy.ctrlby import WrapControlledBy
from frappy.lib import clamp, merge_status
from frappy_psi.picontrol import PImixin from frappy_psi.picontrol import PImixin
from frappy_psi.convergence import HasConvergence from frappy_psi.convergence import HasConvergence
from frappy_psi.ionopimax import CurrentInput, LogVoltageInput from frappy_psi.ionopimax import CurrentInput, LogVoltageInput
@@ -40,6 +41,7 @@ class Interlocks(Writable):
main_T = Attached(Readable, 'the main temperature') main_T = Attached(Readable, 'the main temperature')
extra_T = Attached(Readable, 'the extra temperature') extra_T = Attached(Readable, 'the extra temperature')
control = Attached(Module, 'the control module') control = Attached(Module, 'the control module')
reg_T = Attached(Module, 'an other controlled module', mandatory = False)
htr = Attached(Module, 'the heater module', mandatory=False) htr = Attached(Module, 'the heater module', mandatory=False)
relais = Attached(Writable, 'the interlock relais', mandatory=False) relais = Attached(Writable, 'the interlock relais', mandatory=False)
flowswitch = Attached(Readable, 'the flow switch', mandatory=False) flowswitch = Attached(Readable, 'the flow switch', mandatory=False)
@@ -52,13 +54,19 @@ class Interlocks(Writable):
main_T_limit = Parameter('maximum main temperature', FloatRange(0, unit='degC'), main_T_limit = Parameter('maximum main temperature', FloatRange(0, unit='degC'),
default = 530, readonly = False) default = 530, readonly = False)
extra_T_limit = Parameter('maximum extra temperature', FloatRange(0, unit='degC'), extra_T_limit = Parameter('maximum extra temperature', FloatRange(0, unit='degC'),
default = 530, readonly = False) default = 530, readonly = False)
disabled_checks = Parameter('checks to disable', StringType(), disabled_checks = Parameter('checks to disable', StringType(),
value = '', readonly = False) value = '', readonly = False)
autoreset = Parameter('flag: reset on writing target', BoolType(),
value = False, readonly = False)
_off_reason = None # reason triggering interlock _off_reason = None # reason triggering interlock
_conditions = '' # summary of reasons why locked now _violations = '' # summary of reasons why locked now
_all_violations = '' # violations disregarding disabled_checks
_sensor_checks = () _sensor_checks = ()
_controllers = ()
_disable_flow_check = False
__inside_switch_off = False
SENSOR_MAP = { SENSOR_MAP = {
'wall_T': 'wall_limit', 'wall_T': 'wall_limit',
@@ -68,14 +76,53 @@ class Interlocks(Writable):
'vacuum': 'vacuum_limit', 'vacuum': 'vacuum_limit',
} }
def initModule(self):
super().initModule()
self._controllers = [self.control]
if self.reg_T:
self._controllers .append(self.reg_T)
for mod in self._controllers:
mod.callback_func = self.callback_func
def doPoll(self):
self.read_value() # this includes read_status
if self._violations:
if self.relais.value or any(c.control_active for c in self._controllers) or self.htr.target:
self.switch_off()
def callback_func(self, query=False, reset=False):
"""fujnction to be called from controlling modules
query = True: return violations
reset = True: try to reset
default: try to reset if autoreset is on
"""
if not self.read_value():
if not query:
if self.__inside_switch_off:
return 'switch_off'
if self.autoreset or reset:
if not self._violations and not reset:
self.log.warning('reset interlock')
self.reset()
else:
if self._violations:
return self._violations
return f'need reset after {self._off_reason}'
return self._violations
def read_value(self):
self.read_status()
return self.value
def write_target(self, value): def write_target(self, value):
if value: if value:
self.read_status() self.read_status()
if self._conditions: if self._violations:
raise ImpossibleError('not ready to start') raise ImpossibleError(f'not ready to start: {self._violations}')
self._off_reason = None self._off_reason = None
self.value = True self.value = True
elif self.value: elif self.value and not value:
self.switch_off() self.switch_off()
self._off_reason = 'switched off' self._off_reason = 'switched off'
self.value = False self.value = False
@@ -83,12 +130,20 @@ class Interlocks(Writable):
def switch_off(self): def switch_off(self):
if self.value: if self.value:
self._off_reason = self._conditions self._off_reason = self._violations
self.value = False self.value = False
if self.control.control_active: self.__inside_switch_off = True
self.log.error('switch control off %r', self.control.status) try:
self.control.write_control_active(False) for mod in self._controllers:
if mod.control_active:
try:
mod.write_control_active(False)
except Exception as e:
self.log.warning('can not shut %s', mod.name)
finally:
del self.__inside_switch_off
if self.htr and self.htr.target: if self.htr and self.htr.target:
self.log.info('switch heater off')
self.htr.write_target(0) self.htr.write_target(0)
if self.relais.value or self.relais.target: if self.relais.value or self.relais.target:
self.log.warning('switch off relais %r %r', self.relais.value, self.relais.target) self.log.warning('switch off relais %r %r', self.relais.value, self.relais.target)
@@ -99,26 +154,29 @@ class Interlocks(Writable):
self._sensor_checks = [] self._sensor_checks = []
for att, limitname in self.SENSOR_MAP.items(): for att, limitname in self.SENSOR_MAP.items():
obj = getattr(self, att) obj = getattr(self, att)
if obj and obj.name not in disabled: if obj:
self.log.info('info %r %r %r', att, obj.name, disabled) self._sensor_checks.append((obj, limitname, obj.name in disabled))
self._sensor_checks.append((obj, limitname)) self._disable_flow_check = 'flow' in disabled
def get_violations(self):
violations = {False: [], True: []}
if self.flowswitch and self.flowswitch.value == 0:
violations[seld._disable_flow_check].append('no cooling water')
for sensor, limitname, disabled in self._sensor_checks:
if sensor.status[0] >= ERROR:
violations[disabled].append(f'error at {sensor.name}: {sensor.status[1]}')
continue
if sensor.value > getattr(self, limitname):
violations[disabled].append(f'above {sensor.name} limit')
continue
self._violations = ', '.join(violations[False])
self._all_violations = ', '.join(violations[False] + violations[True])
return violations[False]
def read_status(self): def read_status(self):
conditions = [] self.get_violations()
if self.flowswitch and self.flowswitch.value == 0:
conditions.append('no cooling water')
for sensor, limitname in self._sensor_checks:
if sensor.value > getattr(self, limitname):
conditions.append(f'above {sensor.name} limit')
if sensor.status[0] >= ERROR:
conditions.append(f'error at {sensor.name}: {sensor.status[1]}')
break
self._conditions = ', '.join(conditions)
if conditions and (self.control.control_active or self.htr.target):
self.switch_off()
if self.value: if self.value:
return IDLE, '; '.join(conditions) return IDLE, self._all_violations
return ERROR, self._off_reason return ERROR, self._off_reason
@Command @Command
@@ -132,20 +190,37 @@ class Interlocks(Writable):
class PI(HasConvergence, PImixin, Writable): class PI(HasConvergence, PImixin, Writable):
input_module = Attached(Readable, 'the input module') input_module = Attached(Readable, 'the input module')
relais = Attached(Writable, 'the interlock relais', mandatory=False) callback_func = None
def read_value(self): def read_value(self):
return self.input_module.value return self.input_module.value
def read_status(self): def read_status(self):
return self.input_module.status violations = self.callback_func(query=True)
status = (ERROR, violations) if violations else super().read_status()
return merge_status(status, self.input_module.status)
def set_target(self, value): def write_target(self, target):
super().set_target(value) zero = self.parameters['target'].datatype.default
if self.relais: if target == zero:
if not self.relais.value or not self.relais.target: target = zero
self.log.warning('switch on relais %r %r', self.relais.value, self.relais.target) elif self.callback_func:
self.relais.write_target(1) errtxt = self.callback_func()
if errtxt == 'switch_off':
target = zero
elif errtxt:
msg = f'can not set target: {errtxt}'
self.log.warning(msg)
raise ImpossibleError(msg)
return super().write_target(target)
@Command
def reset(self):
"""reset interlock after error"""
if self.callback_func:
self.callback_func(reset=True)
else:
raise ConfigError('reset is not configured')
class PIctrl(WrapControlledBy, PI): class PIctrl(WrapControlledBy, PI):
@@ -156,13 +231,18 @@ class PI2(PI):
maxovershoot = Parameter('max. overshoot', FloatRange(0, 100, unit='%'), readonly=False, default=20) maxovershoot = Parameter('max. overshoot', FloatRange(0, 100, unit='%'), readonly=False, default=20)
def doPoll(self): def doPoll(self):
self.output_max = self.target * (1 + 0.01 * self.maxovershoot) maxdif = 0.01 * self.maxovershoot * self.target
self.output_min = self.target * (1 - 0.01 * self.maxovershoot) self.output_max = self.target + maxdif
super().doPoll() self.output_min = self.target - maxdif
if self.control_active:
super().doPoll()
def write_target(self, target): def write_target(self, target):
if not self.control_active: if not self.control_active:
self.output_module.write_target(target) # guess start value based on current T difference
maxdif = 0.01 * self.maxovershoot * target
self.output_module.target = target + clamp(
-maxdif, maxdif, self.output_module.value - self.input_module.value)
super().write_target(target) super().write_target(target)

View File

@@ -61,7 +61,7 @@ example cfg:
import time import time
import math import math
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property
from frappy.lib import clamp from frappy.lib import clamp, merge_status
from frappy.datatypes import LimitsType, EnumType, BoolType, FloatRange from frappy.datatypes import LimitsType, EnumType, BoolType, FloatRange
from frappy.ctrlby import HasOutputModule from frappy.ctrlby import HasOutputModule
from frappy_psi.convergence import HasConvergence from frappy_psi.convergence import HasConvergence
@@ -94,7 +94,6 @@ class PImixin(HasOutputModule, Writable):
if not self.control_active: if not self.control_active:
return return
out = self.output_module out = self.output_module
self.status = IDLE, 'controlling'
now = time.time() now = time.time()
deltat = clamp(0, now-self._lasttime, 10) deltat = clamp(0, now-self._lasttime, 10)
self._lasttime = now self._lasttime = now
@@ -115,6 +114,12 @@ class PImixin(HasOutputModule, Writable):
self._overflow = 0 self._overflow = 0
out.update_target(self.name, self._cvt2ext(output)) out.update_target(self.name, self._cvt2ext(output))
def read_status(self):
status = IDLE, 'controlling' if self.control_active else 'inactive'
if hasattr(super(), 'read_status'):
status = merge_status(super().read_status(), status)
return status
def cvt2int_square(self, output): def cvt2int_square(self, output):
return (math.sqrt(max(0, clamp(x, *self._get_range()))) for x in (output, self.output_min, self.output_max)) return (math.sqrt(max(0, clamp(x, *self._get_range()))) for x in (output, self.output_min, self.output_max))
@@ -146,10 +151,12 @@ class PImixin(HasOutputModule, Writable):
self._cvt2int = getattr(self, f'cvt2int_{self.output_func.name}') self._cvt2int = getattr(self, f'cvt2int_{self.output_func.name}')
self._cvt2ext = getattr(self, f'cvt2ext_{self.output_func.name}') self._cvt2ext = getattr(self, f'cvt2ext_{self.output_func.name}')
def write_control_active(self, value): # not needed, done by HasOutputModule.write_control_active
super().write_control_active(value) # def write_control_active(self, value):
if not value: # super().write_control_active(value)
self.output_module.write_target(0) # out = self.output_module
# if not value:
# out.write_target(out.parameters['target'].datatype.default)
def set_target(self, value): def set_target(self, value):
if not self.control_active: if not self.control_active: