Merge branch 'wip' of gitlab.psi.ch-samenv:samenv/frappy into wip
This commit is contained in:
@ -25,7 +25,7 @@ from frappy.core import Readable, Parameter, IntRange, EnumType, FloatRange, \
|
||||
Attached, StructOf, WARN, Done, BoolType, Enum
|
||||
|
||||
from frappy_psi.convergence import HasConvergence
|
||||
from frappy_psi.mixins import HasOutputModule, HasControlledBy
|
||||
from frappy.mixins import HasOutputModule, HasControlledBy
|
||||
|
||||
|
||||
class Ls340IO(StringIO):
|
||||
|
@ -20,78 +20,120 @@
|
||||
#
|
||||
# *****************************************************************************
|
||||
|
||||
from frappy.datatypes import BoolType, EnumType, Enum
|
||||
from frappy.core import Parameter, Writable, Attached
|
||||
import time
|
||||
from math import copysign
|
||||
from frappy.datatypes import BoolType, FloatRange
|
||||
from frappy.core import Parameter, BUSY
|
||||
from frappy.lib import merge_status, clamp
|
||||
from frappy.errors import RangeError
|
||||
|
||||
|
||||
class HasControlledBy(Writable):
|
||||
"""mixin for modules with controlled_by
|
||||
class HasRamp:
|
||||
"""software ramp"""
|
||||
# make sure it is a drivable
|
||||
status = Parameter()
|
||||
target = Parameter()
|
||||
ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), default=0, readonly=False)
|
||||
ramp_used = Parameter('False: infinite ramp', BoolType(), default=False, readonly=False)
|
||||
setpoint = Parameter('ramping setpoint', FloatRange(unit='$'), readonly=False)
|
||||
maxdif = Parameter('''max. difference between setpoint and value
|
||||
|
||||
stop ramp then value lags behind setpoint by more than maxdif
|
||||
maxdif=0: use 'ramp' value (value lags 1 minute behind setpoint)
|
||||
''',
|
||||
FloatRange(0, unit='$'), default=0, readonly=False)
|
||||
rampinterval = Parameter('interval for changing the setpoint', FloatRange(0, unit='s'),
|
||||
default=1, readonly=False)
|
||||
workingramp = Parameter('effective ramp', FloatRange(unit='$/min'))
|
||||
|
||||
in the :meth:`write_target` the hardware action to switch to own control should be done
|
||||
and in addition self.self_controlled() should be called
|
||||
"""
|
||||
controlled_by = Parameter('source of target value', EnumType(members={'self': 0}), default=0)
|
||||
inputCallbacks = ()
|
||||
_ramp_status = None
|
||||
_last_time = None
|
||||
_buffer = 0
|
||||
|
||||
def register_input(self, name, control_off):
|
||||
"""register input
|
||||
def doPoll(self):
|
||||
super().doPoll() # suppose that this is reading value and status
|
||||
self.ramp_step(self.target)
|
||||
|
||||
:param name: the name of the module (for controlled_by enum)
|
||||
:param control_off: a method on the input module to switch off control
|
||||
"""
|
||||
if not self.inputCallbacks:
|
||||
self.inputCallbacks = {}
|
||||
self.inputCallbacks[name] = control_off
|
||||
prev_enum = self.parameters['controlled_by'].datatype.export_datatype()['members']
|
||||
# add enum member, using autoincrement feature of Enum
|
||||
self.parameters['controlled_by'].datatype = EnumType(Enum(prev_enum, **{name: None}))
|
||||
def ramp_step(self, target):
|
||||
now = time.time()
|
||||
setpoint = self.setpoint
|
||||
if self._ramp_status is not None:
|
||||
if setpoint == target:
|
||||
self._ramp_status = None # at target
|
||||
self.workingramp = 0
|
||||
else:
|
||||
sign = copysign(1, target - setpoint)
|
||||
prev_t, prev_v = self._last_point
|
||||
if self.value == prev_v:
|
||||
return # no reads happened
|
||||
delay = (now - prev_t) / 60.0 # minutes !
|
||||
slope = (self.value - prev_v) / max(1e-5, delay)
|
||||
dif = (setpoint - self.value) * sign
|
||||
maxdif = self.maxdif or self.ramp
|
||||
if dif < maxdif:
|
||||
# reduce ramp when slope is bigger than ramp
|
||||
ramp = max(2 * self.ramp - sign * slope, 0) + self._buffer
|
||||
if ramp > self.ramp:
|
||||
self._buffer = min(ramp - self.ramp, self.ramp)
|
||||
ramp = self.ramp
|
||||
else:
|
||||
self._buffer = 0
|
||||
setpoint += sign * delay * ramp
|
||||
if sign * (setpoint - target) >= 0:
|
||||
self.write_setpoint(setpoint)
|
||||
self.workingramp = 0
|
||||
self._ramp_status = None # at target
|
||||
else:
|
||||
if ramp != self.workingramp:
|
||||
self.workingramp = sign * ramp
|
||||
self.write_setpoint(setpoint)
|
||||
self._ramp_status = 'ramping'
|
||||
else:
|
||||
self._ramp_status = 'holding'
|
||||
self._last_point = now, self.value
|
||||
self.read_status()
|
||||
|
||||
def self_controlled(self):
|
||||
"""method to change controlled_by to self
|
||||
def read_status(self):
|
||||
status = super().read_status()
|
||||
if self._ramp_status is None:
|
||||
if self.pollInfo.fast_flag:
|
||||
self.setFastPoll(False)
|
||||
return status
|
||||
if self.pollInfo.interval != self.rampinterval:
|
||||
self.setFastPoll(True, self.rampinterval)
|
||||
return merge_status((BUSY, self._ramp_status), status)
|
||||
|
||||
must be called from the write_target method
|
||||
"""
|
||||
if self.controlled_by:
|
||||
self.controlled_by = 0
|
||||
for name, control_off in self.inputCallbacks.items():
|
||||
control_off(self.name)
|
||||
def write_ramp(self, ramp):
|
||||
if ramp:
|
||||
self.write_ramp_used(True)
|
||||
else:
|
||||
raise RangeError('ramp must not 0, use ramp_used = False to disable ramping')
|
||||
return ramp
|
||||
|
||||
def write_ramp_used(self, used):
|
||||
if used != self.ramp_used:
|
||||
self.ramp_used = used
|
||||
if self._ramp_status:
|
||||
self.write_target(self.target)
|
||||
|
||||
class HasOutputModule(Writable):
|
||||
"""mixin for modules having an output module
|
||||
def write_setpoint(self, setpoint):
|
||||
super().write_target(setpoint)
|
||||
return setpoint
|
||||
|
||||
in the :meth:`write_target` the hardware action to switch to own control should be done
|
||||
and in addition self.activate_output() should be called
|
||||
"""
|
||||
# allow unassigned output module, it should be possible to configure a
|
||||
# module with fixed control
|
||||
output_module = Attached(HasControlledBy, mandatory=False)
|
||||
control_active = Parameter('control mode', BoolType())
|
||||
def read_target(self):
|
||||
if not self._ramp_status:
|
||||
return super().read_target()
|
||||
return self.target
|
||||
|
||||
def initModule(self):
|
||||
super().initModule()
|
||||
if self.output_module:
|
||||
self.output_module.register_input(self.name, self.control_off)
|
||||
|
||||
def activate_output(self):
|
||||
"""method to switch control_active on
|
||||
|
||||
self.activate_output() must be called from the write_target method
|
||||
"""
|
||||
out = self.output_module
|
||||
if out:
|
||||
for name, control_off in out.inputCallbacks.items():
|
||||
if name != self.name:
|
||||
control_off(self.name)
|
||||
out.controlled_by = self.name
|
||||
self.control_active = True
|
||||
|
||||
def control_off(self, switched_by):
|
||||
"""control_off is called, when an other module takes over control
|
||||
|
||||
if possible avoid hardware access in an overriding method in an overriding method
|
||||
as this might lead to a deadlock with the modules accessLock
|
||||
"""
|
||||
if self.control_active:
|
||||
self.control_active = False
|
||||
self.log.warning(f'switched to manual mode by {switched_by}')
|
||||
def write_target(self, target):
|
||||
if self.ramp_used:
|
||||
if self.parameters['setpoint'].readerror:
|
||||
self.write_setpoint(self.read_value())
|
||||
self._ramp_status = 'changed target'
|
||||
self._last_time = time.time()
|
||||
self.setFastPoll(True, self.rampinterval)
|
||||
self.ramp_step(target)
|
||||
return target
|
||||
self._ramp_status = None
|
||||
self.write_setpoint(target)
|
||||
return target
|
||||
|
99
frappy_psi/parmod.py
Normal file
99
frappy_psi/parmod.py
Normal file
@ -0,0 +1,99 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Markus Zolliker <markus.zolliker@psi.ch>
|
||||
#
|
||||
# *****************************************************************************
|
||||
|
||||
"""modules to access parameters"""
|
||||
|
||||
from frappy.core import Drivable, IDLE, Attached, StringType, Property, \
|
||||
Parameter, FloatRange
|
||||
from frappy.errors import ConfigError
|
||||
from frappy_psi.convergence import HasConvergence
|
||||
from frappy_psi.mixins import HasRamp
|
||||
|
||||
|
||||
class Driv(Drivable):
|
||||
value = Parameter(datatype=FloatRange(unit='$'))
|
||||
target = Parameter(datatype=FloatRange(unit='$'))
|
||||
read = Attached(description='<module>.<parameter> for read')
|
||||
write = Attached(description='<module>.<parameter> for read')
|
||||
unit = Property('main unit', StringType())
|
||||
|
||||
def setProperty(self, key, value):
|
||||
if key in ('read', 'write'):
|
||||
value, param = value.split('.')
|
||||
setattr(self, f'{key}_param', param)
|
||||
super().setProperty(key, value)
|
||||
|
||||
def checkProperties(self):
|
||||
self.applyMainUnit(self.unit)
|
||||
if self.read == self.name or self.write == self.name:
|
||||
raise ConfigError('illegal recursive read/write module')
|
||||
super().checkProperties()
|
||||
|
||||
#def registerUpdates(self):
|
||||
# self.read.valueCallbacks[self.read_param].append(self.update_value)
|
||||
# self.write.valueCallbacks[self.write_param].append(self.update_target)
|
||||
#
|
||||
#def startModule(self, start_events):
|
||||
# start_events.queue(self.registerUpdates)
|
||||
# super().startModule(start_events)
|
||||
|
||||
def read_value(self):
|
||||
return getattr(self.read, f'{self.read_param}')
|
||||
|
||||
def read_target(self):
|
||||
return getattr(self.write, f'{self.write_param}')
|
||||
|
||||
def read_status(self):
|
||||
return IDLE, ''
|
||||
|
||||
def write_target(self, target):
|
||||
return getattr(self.write, f'write_{self.write_param}')(target)
|
||||
|
||||
|
||||
class Converging(HasConvergence, Driv):
|
||||
"""drivable with convergence"""
|
||||
pollinterval = 1
|
||||
|
||||
def checkProperties(self):
|
||||
self.parameters['tolerance'].setProperty('unit', self.unit)
|
||||
super().checkProperties()
|
||||
|
||||
#def update_value(self, value):
|
||||
# print('UV', value)
|
||||
# self.value = value
|
||||
|
||||
#def error_update_value(self, err):
|
||||
# raise err
|
||||
|
||||
#def update_target(self, value):
|
||||
# self.target = value
|
||||
|
||||
#def error_update_target(self, err):
|
||||
# raise err
|
||||
|
||||
def write_target(self, target):
|
||||
self.convergence_start()
|
||||
return super().write_target(target)
|
||||
|
||||
|
||||
class RampDriv(HasRamp, Driv):
|
||||
pass
|
@ -48,9 +48,8 @@ from frappy.modules import Attached, Command, Done, Drivable, \
|
||||
from frappy.protocol.dispatcher import make_update
|
||||
|
||||
|
||||
CFG_HEADER = """Node(
|
||||
description = '''%(nodedescr)s''',
|
||||
id = %(config)s.sea.psi.ch,
|
||||
CFG_HEADER = """Node('%(config)s.sea.psi.ch',
|
||||
'''%(nodedescr)s''',
|
||||
)
|
||||
Mod(%(seaconn)r,
|
||||
'frappy_psi.sea.SeaClient',
|
||||
|
Reference in New Issue
Block a user