258 lines
9.3 KiB
Python
258 lines
9.3 KiB
Python
# *****************************************************************************
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Module authors:
|
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
|
#
|
|
# *****************************************************************************
|
|
|
|
"""modules to access parameters"""
|
|
|
|
from frappy.core import Drivable, EnumType, IDLE, Attached, StringType, Property, \
|
|
Parameter, BoolType, FloatRange, Readable, ERROR, nopoll
|
|
from frappy.errors import ConfigError
|
|
from frappy_psi.convergence import HasConvergence
|
|
from frappy_psi.mixins import HasRamp
|
|
from frappy.lib import merge_status
|
|
|
|
|
|
class Par(Readable):
|
|
value = Parameter(datatype=FloatRange(unit='$'))
|
|
read = Attached(description='<module>.<parameter> for read')
|
|
unit = Property('main unit', StringType())
|
|
|
|
def setProperty(self, key, value):
|
|
if key == 'read':
|
|
value, param = value.split('.')
|
|
setattr(self, f'{key}_param', param)
|
|
super().setProperty(key, value)
|
|
|
|
def checkProperties(self):
|
|
self.applyMainUnit(self.unit)
|
|
if self.read == self.name :
|
|
raise ConfigError('illegal recursive read/write module')
|
|
super().checkProperties()
|
|
|
|
def read_value(self):
|
|
return getattr(self.read, f'{self.read_param}')
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
|
|
class Driv(Drivable):
|
|
# this should be Writable instead!
|
|
value = Parameter(datatype=FloatRange(unit='$'))
|
|
target = Parameter(datatype=FloatRange(unit='$'))
|
|
read = Attached(description='<module>.<parameter> for read')
|
|
write = Attached(description='<module>.<parameter> for read')
|
|
unit = Property('main unit', StringType())
|
|
__error = None
|
|
__status = None
|
|
|
|
def setProperty(self, key, value):
|
|
# split properties read/write (including .<param>)
|
|
# into read/write (modules) and _read_param/_write_param (parameters)
|
|
if key in ('read', 'write'):
|
|
value, param = value.split('.')
|
|
setattr(self, f'_{key}_param', param)
|
|
super().setProperty(key, value)
|
|
|
|
def checkProperties(self):
|
|
self.applyMainUnit(self.unit)
|
|
if self.read == self.name or self.write == self.name:
|
|
raise ConfigError('illegal recursive read/write module')
|
|
super().checkProperties()
|
|
|
|
def initModule(self):
|
|
super().initModule()
|
|
if self._read_param == 'value':
|
|
try:
|
|
self.read.addCallback('status', self.updateReadStatus)
|
|
except KeyError:
|
|
pass # may be not needed: value is present but not status
|
|
self.read.addCallback(self._read_param, self.updateValue)
|
|
self.write.addCallback(self._write_param, self.announceUpdate, 'target')
|
|
|
|
def updateReadStatus(self, status, err=None):
|
|
self.__status = status
|
|
self.read_status()
|
|
|
|
def updateValue(self, value, err=None):
|
|
self.announceUpdate('value', value, err)
|
|
if err != self.__error:
|
|
self.__error = err
|
|
self.read_status()
|
|
|
|
#def startModule(self, start_events):
|
|
# start_events.queue(self.registerUpdates)
|
|
# super().startModule(start_events)
|
|
|
|
@nopoll
|
|
def read_value(self):
|
|
return getattr(self.read, f'read_{self._read_param}')()
|
|
|
|
@nopoll
|
|
def read_target(self):
|
|
return getattr(self.write, f'read_{self._write_param}')()
|
|
|
|
@nopoll
|
|
def read_status(self):
|
|
if self.__status:
|
|
if self.__status[0] < ERROR and self.__error:
|
|
return ERROR, repr(self.__error)
|
|
return self.__status
|
|
return (ERROR, repr(self.__error)) if self.__error else (IDLE, '')
|
|
|
|
def write_target(self, target):
|
|
return getattr(self.write, f'write_{self._write_param}')(target)
|
|
|
|
|
|
class Converging(HasConvergence, Driv):
|
|
"""drivable with convergence"""
|
|
pollinterval = 1
|
|
|
|
def checkProperties(self):
|
|
self.parameters['tolerance'].setProperty('unit', self.unit)
|
|
super().checkProperties()
|
|
|
|
def write_target(self, target):
|
|
self.convergence_start()
|
|
return super().write_target(target)
|
|
|
|
|
|
class RampDriv(HasRamp, Driv):
|
|
pass
|
|
|
|
|
|
def set_enabled(modobj, value):
|
|
"""set enabled on module if available"""
|
|
if hasattr(modobj, 'enabled') and modobj.enabled != value:
|
|
modobj.write_enabled(value)
|
|
|
|
|
|
def get_value(obj, default):
|
|
"""get the value of given module. if not valid, return the default"""
|
|
if not getattr(obj, 'enabled', True):
|
|
return default
|
|
# consider also that a value 0 is invalid
|
|
return (obj.value if IDLE <= obj.status[0] < ERROR else 0) or default
|
|
|
|
|
|
LOW = 0
|
|
HIGH = 1
|
|
|
|
|
|
class SwitchDriv(HasConvergence, Drivable):
|
|
low = Attached(description='low range module')
|
|
high = Attached(description='high range module')
|
|
min_high = Parameter('minimum high target', FloatRange(unit='$'), readonly=False)
|
|
max_low = Parameter('maximum low target', FloatRange(unit='$'), readonly=False)
|
|
# disable_other = Parameter('whether to disable unused channel', BoolType(), readonly=False)
|
|
selected = Parameter('selected module', EnumType(low=LOW, high=HIGH), readonly=False, default=0)
|
|
autoswitch = Parameter('switch sensor automatically', BoolType(), readonly=False, default=True)
|
|
_switch_target = None # if not None, switch to selection when mid range is reached
|
|
|
|
# TODO: copy units from attached module
|
|
# TODO: callbacks for updates
|
|
|
|
def doPoll(self):
|
|
super().doPoll()
|
|
if self._switch_target is not None:
|
|
mid = (self.min_high + self.max_low) * 0.5
|
|
if self._switch_target == HIGH:
|
|
low = get_value(self.low, mid) # returns mid when low is invalid
|
|
if low > mid:
|
|
self.value = self.low.value
|
|
self._switch_target = None
|
|
self.write_target(self.target)
|
|
return
|
|
else:
|
|
high = get_value(self.high, mid) # return mid when high is invalid
|
|
if high < mid: # change to self.max_low
|
|
self.value = self.high.value
|
|
self._switch_target = None
|
|
self.write_target(self.target)
|
|
return
|
|
if not self.isBusy() and self.autoswitch:
|
|
low = get_value(self.low, self.max_low)
|
|
high = get_value(self.high, self.min_high)
|
|
low_valid = low < self.max_low
|
|
high_valid = high > self.min_high
|
|
if high_valid and high > self.max_low:
|
|
if not low_valid and not self.low.control_active:
|
|
set_enabled(self.low, False)
|
|
return
|
|
if low_valid and low < self.min_high:
|
|
if not high_valid and not self.high.control_active:
|
|
set_enabled(self.high, False)
|
|
return
|
|
# keep only one channel on
|
|
#set_enabled(self.low, True)
|
|
#set_enabled(self.high, True)
|
|
|
|
def get_selected(self):
|
|
low = get_value(self.low, self.max_low)
|
|
high = get_value(self.high, self.min_high)
|
|
if low < self.min_high:
|
|
return 0
|
|
if high > self.max_low:
|
|
return 1
|
|
return self.selected
|
|
|
|
def read_value(self):
|
|
return self.low.value if self.get_selected() == LOW else self.high.value
|
|
|
|
def read_status(self):
|
|
status = self.low.status if self.get_selected() == LOW else self.high.status
|
|
if status[0] >= ERROR:
|
|
return status
|
|
return super().read_status() # convergence status
|
|
|
|
def write_target(self, target):
|
|
this, other = self.low, self.high
|
|
selected = self.selected
|
|
target1 = target
|
|
self._switch_target = None
|
|
if target > self.max_low * 0.75 + self.min_high * 0.25:
|
|
if self.value < self.min_high:
|
|
target1 = min(target, self.max_low)
|
|
self._switch_target = HIGH
|
|
selected = LOW
|
|
else:
|
|
this, other = other, this
|
|
selected = HIGH
|
|
elif target < self.min_high * 0.75 + self.max_low * 0.25:
|
|
# reinstall this with higher threshold (e.g. 4 K)?
|
|
#if self.value > self.max_low:
|
|
# target1 = max(self.min_high, target)
|
|
# self._switch_target = LOW
|
|
# this, other = other, this
|
|
# selected = HIGH
|
|
#else:
|
|
selected = LOW
|
|
elif self.selected == HIGH:
|
|
this, other = other, this
|
|
if hasattr(other, 'control_off'):
|
|
other.control_off()
|
|
set_enabled(this, True)
|
|
set_enabled(other, False)
|
|
self.write_selected(selected)
|
|
self.convergence_start()
|
|
self.log.info('target=%g (%s)', target, this.name)
|
|
this.write_target(target1)
|
|
return target
|