the readonly class frappy_psi.parmod.Par represents a parameter or a component of a tuple parameter Change-Id: I47208c9d7a6fc377cd56b82cc6a9e8cdb433fe8e
124 lines
3.9 KiB
Python
124 lines
3.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
# *****************************************************************************
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Module authors:
|
|
# Markus Zolliker <markus.zolliker@psi.ch>
|
|
#
|
|
# *****************************************************************************
|
|
|
|
"""modules to access parameters"""
|
|
|
|
from frappy.core import Drivable, IDLE, Attached, StringType, Property, \
|
|
Parameter, FloatRange, Readable
|
|
from frappy.errors import ConfigError
|
|
from frappy_psi.convergence import HasConvergence
|
|
from frappy_psi.mixins import HasRamp
|
|
|
|
|
|
class Par(Readable):
    """Readable whose value is taken from a parameter of another module.

    The ``read`` property is configured as ``<module>.<parameter>``.
    The module part is stored in the attached property ``read``, the
    parameter part is kept in the attribute ``read_param``.
    """

    value = Parameter(datatype=FloatRange(unit='$'))
    read = Attached(description='<module>.<parameter> for read')
    unit = Property('main unit', StringType())

    def setProperty(self, key, value):
        """Set a property; a ``read`` value is split at the dot first.

        :param key: name of the property
        :param value: property value, ``<module>.<parameter>`` for ``read``
        """
        if key == 'read':
            # exactly one dot is expected, otherwise the unpacking fails
            modname, parname = value.split('.')
            self.read_param = parname
            value = modname
        super().setProperty(key, value)

    def checkProperties(self):
        """Apply the main unit and refuse a module reading from itself.

        :raises ConfigError: when ``read`` equals the name of this module
        """
        self.applyMainUnit(self.unit)
        is_recursive = self.read == self.name
        if is_recursive:
            raise ConfigError('illegal recursive read/write module')
        super().checkProperties()

    def read_value(self):
        """Return the attribute ``read_param`` of the attached module."""
        source = self.read
        return getattr(source, self.read_param)

    def read_status(self):
        """Return a constant idle status with an empty text."""
        return (IDLE, '')
|
|
|
|
|
|
class Driv(Drivable):
    """Drivable forwarding value and target to parameters of other modules.

    ``read`` and ``write`` are configured as ``<module>.<parameter>``.
    The module part is stored in the attached property of the same name,
    the parameter part in ``read_param`` / ``write_param``.
    """

    value = Parameter(datatype=FloatRange(unit='$'))
    target = Parameter(datatype=FloatRange(unit='$'))
    read = Attached(description='<module>.<parameter> for read')
    # description fixed: this attachment is the one used for writing
    write = Attached(description='<module>.<parameter> for write')
    unit = Property('main unit', StringType())

    def setProperty(self, key, value):
        """Set a property; ``read``/``write`` values are split at the dot.

        :param key: name of the property
        :param value: property value, ``<module>.<parameter>`` for ``read``
            and ``write``
        """
        if key in ('read', 'write'):
            # exactly one dot is expected, otherwise the unpacking fails
            value, param = value.split('.')
            setattr(self, f'{key}_param', param)
        super().setProperty(key, value)

    def checkProperties(self):
        """Apply the main unit and refuse a module attached to itself.

        :raises ConfigError: when ``read`` or ``write`` equals the name of
            this module
        """
        self.applyMainUnit(self.unit)
        # NOTE(review): this compares the attached attributes with the module
        # name - confirm that ``Attached`` yields the configured name (and not
        # a module object) at this stage, else the check never triggers
        if self.read == self.name or self.write == self.name:
            raise ConfigError('illegal recursive read/write module')
        super().checkProperties()

    # NOTE: callback based updating is disabled, kept for reference
    #def registerUpdates(self):
    #    self.read.valueCallbacks[self.read_param].append(self.update_value)
    #    self.write.valueCallbacks[self.write_param].append(self.update_target)
    #
    #def startModule(self, start_events):
    #    start_events.queue(self.registerUpdates)
    #    super().startModule(start_events)

    def read_value(self):
        """Return the attribute ``read_param`` of the ``read`` module."""
        return getattr(self.read, self.read_param)

    def read_target(self):
        """Return the attribute ``write_param`` of the ``write`` module."""
        return getattr(self.write, self.write_param)

    def read_status(self):
        """Return a constant idle status with an empty text."""
        return IDLE, ''

    def write_target(self, target):
        """Forward the target to ``write_<write_param>`` of the ``write`` module.

        :param target: the new target value
        :return: whatever the forwarded write method returns
        """
        return getattr(self.write, f'write_{self.write_param}')(target)
|
|
|
|
|
|
class Converging(HasConvergence, Driv):
    """Driv variant combined with the HasConvergence mixin"""

    pollinterval = 1

    def checkProperties(self):
        """Give the ``tolerance`` parameter the unit of this module, then check."""
        tolerance = self.parameters['tolerance']
        tolerance.setProperty('unit', self.unit)
        super().checkProperties()

    # NOTE: disabled update callbacks, kept for reference
    #def update_value(self, value):
    #    print('UV', value)
    #    self.value = value

    #def error_update_value(self, err):
    #    raise err

    #def update_target(self, value):
    #    self.target = value

    #def error_update_target(self, err):
    #    raise err

    def write_target(self, target):
        """Call ``convergence_start`` and then forward the target.

        :param target: the new target value
        :return: the result of the inherited ``write_target``
        """
        self.convergence_start()
        written = super().write_target(target)
        return written
|
|
|
|
|
|
class RampDriv(HasRamp, Driv):
    """Driv combined with the HasRamp mixin; adds nothing of its own."""
|