software ramp mixin

+ fix frappy.lib.merge_status

Change-Id: I550eaeaab460a0d9ac1b027d59d4223dac4c0663
This commit is contained in:
zolliker 2023-05-25 16:14:06 +02:00
parent e4dbb90065
commit bef190b77d
2 changed files with 99 additions and 65 deletions

View File

@ -405,4 +405,8 @@ def merge_status(*args):
texts matching maximal code are joined with ', '
"""
maxcode = max(a[0] for a in args)
return maxcode, ', '.join([a[1] for a in args if a[0] == maxcode and a[1]])
# take status value matching highest status code
merged = [a[1] for a in args if a[0] == maxcode and a[1]]
# merge the split texts. use dict instead of set for keeping order
merged = {m: 0 for mm in merged for m in mm.split(', ')}
return maxcode, ', '.join(merged)

View File

@ -20,78 +20,108 @@
#
# *****************************************************************************
from frappy.datatypes import BoolType, EnumType, Enum
from frappy.core import Parameter, Writable, Attached
import time
from math import copysign
from frappy.datatypes import BoolType, FloatRange
from frappy.core import Parameter, BUSY
from frappy.lib import merge_status, clamp
from frappy.errors import RangeError
class HasControlledBy(Writable):
"""mixin for modules with controlled_by
class HasRamp:
"""software ramp"""
# make sure it is a drivable
status = Parameter()
target = Parameter()
ramp = Parameter('ramp ratge', FloatRange(0), default=0, readonly=False)
ramp_used = Parameter('False: infinite ramp', BoolType(), default=False, readonly=False)
setpoint = Parameter('ramping setpoint', FloatRange())
maxlag = Parameter('max lag between setpoint and value',
FloatRange(0, unit='s'), default=60, readonly=False)
rampinterval = Parameter('interval for changing the setpoint', FloatRange(0, unit='s'),
default=1, readonly=False)
_ramp_status = None
_last_time = None
in the :meth:`write_target` the hardware action to switch to own control should be done
and in addition self.self_controlled() should be called
"""
controlled_by = Parameter('source of target value', EnumType(members={'self': 0}), default=0)
inputCallbacks = ()
def checkProperties(self):
unit = self.parameters['value'].datatype.unit
self.parameters['setpoint'].setProperty('unit', unit)
self.writeDict['setpoint'] = self.parameters['setpoint'].default
super().checkProperties()
def register_input(self, name, control_off):
"""register input
def doPoll(self):
super().doPoll() # suppose that this is reading value and status
self.ramp_step(self.target)
:param name: the name of the module (for controlled_by enum)
:param control_off: a method on the input module to switch off control
"""
if not self.inputCallbacks:
self.inputCallbacks = {}
self.inputCallbacks[name] = control_off
prev_enum = self.parameters['controlled_by'].datatype.export_datatype()['members']
# add enum member, using autoincrement feature of Enum
self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{name: None}))
def write_setpoint(self, value):
# written only once at startup
self._last_time = time.time()
self.setpoint = self.read_value()
def self_controlled(self):
"""method to change controlled_by to self
def ramp_step(self, target):
now = time.time()
setpoint = self.setpoint
if self._ramp_status is not None:
if setpoint == target:
self._ramp_status = None # at target
else:
sign = copysign(1, target - setpoint)
delay = now - self._last_time
dif = (setpoint - self.value) * sign
ramp_sec = self.ramp / 60.0
if dif < self.maxlag * ramp_sec:
setpoint += delay * sign * ramp_sec
if sign * (setpoint - target) >= 0:
self.setpoint = target
self._ramp_status = None # at target
else:
self.setpoint = setpoint
self._ramp_status = 'ramping'
super().write_target(self.setpoint)
else:
self._ramp_status = 'holding'
self._last_time = now
self.read_status()
must be called from the write_target method
"""
if self.controlled_by:
self.controlled_by = 0
for name, control_off in self.inputCallbacks.items():
control_off(self.name)
def read_status(self):
status = super().read_status()
if self._ramp_status is None:
if self.pollInfo.fast_flag:
self.setFastPoll(False)
return status
if self.pollInfo.interval != self.rampinterval:
self.setFastPoll(True, self.rampinterval)
return merge_status((BUSY, self._ramp_status), status)
def write_ramp(self, ramp):
if ramp:
self.write_ramp_used(True)
else:
raise RangeError('ramp must not 0, use ramp_used = False to disable ramping')
return ramp
class HasOutputModule(Writable):
"""mixin for modules having an output module
def write_ramp_used(self, used):
if used != self.ramp_used:
self.ramp_used = used
if self._ramp_status:
self.write_target(self.target)
in the :meth:`write_target` the hardware action to switch to own control should be done
and in addition self.activate_output() should be called
"""
# allow unassigned output module, it should be possible to configure a
# module with fixed control
output_module = Attached(HasControlledBy, mandatory=False)
control_active = Parameter('control mode', BoolType())
def read_target(self):
if not self._ramp_status:
return super().read_target()
return self.target
def initModule(self):
super().initModule()
if self.output_module:
self.output_module.register_input(self.name, self.control_off)
def activate_output(self):
"""method to switch control_active on
self.activate_output() must be called from the write_target method
"""
out = self.output_module
if out:
for name, control_off in out.inputCallbacks.items():
if name != self.name:
control_off(self.name)
out.controlled_by = self.name
self.control_active = True
def control_off(self, switched_by):
"""control_off is called, when an other module takes over control
if possible avoid hardware access in an overriding method in an overriding method
as this might lead to a deadlock with the modules accessLock
"""
if self.control_active:
self.control_active = False
self.log.warning(f'switched to manual mode by {switched_by}')
def write_target(self, target):
if not self.ramp_used:
self._ramp_status = None
self.setpoint = target
super().write_target(target)
return target
self._ramp_status = 'changed target'
v = self.read_value()
maxdif = self.maxlag * self.ramp / 60
# setpoint must not differ too much from value
self.setpoint = clamp(v - maxdif, self.setpoint, v + maxdif)
self.setFastPoll(True, self.rampinterval)
self.ramp_step(target)
return target