# -*- coding: utf-8 -*- # ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # # ***************************************************************************** """modules to access parameters""" from frappy.core import Drivable, EnumType, IDLE, Attached, StringType, Property, \ Parameter, FloatRange, Readable, ERROR from frappy.errors import ConfigError from frappy_psi.convergence import HasConvergence from frappy_psi.mixins import HasRamp from frappy.lib import merge_status class Par(Readable): value = Parameter(datatype=FloatRange(unit='$')) read = Attached(description='. for read') unit = Property('main unit', StringType()) def setProperty(self, key, value): if key == 'read': value, param = value.split('.') setattr(self, f'{key}_param', param) super().setProperty(key, value) def checkProperties(self): self.applyMainUnit(self.unit) if self.read == self.name : raise ConfigError('illegal recursive read/write module') super().checkProperties() def read_value(self): return getattr(self.read, f'{self.read_param}') def read_status(self): return IDLE, '' class Driv(Drivable): value = Parameter(datatype=FloatRange(unit='$')) target = Parameter(datatype=FloatRange(unit='$')) read = Attached(description='. for read') write = Attached(description='. for read') unit = Property('main unit', StringType()) def setProperty(self, key, value): if key in ('read', 'write'): value, param = value.split('.') setattr(self, f'{key}_param', param) super().setProperty(key, value) def checkProperties(self): self.applyMainUnit(self.unit) if self.read == self.name or self.write == self.name: raise ConfigError('illegal recursive read/write module') super().checkProperties() #def registerUpdates(self): # self.read.valueCallbacks[self.read_param].append(self.update_value) # self.write.valueCallbacks[self.write_param].append(self.update_target) # #def startModule(self, start_events): # start_events.queue(self.registerUpdates) # super().startModule(start_events) def read_value(self): return getattr(self.read, f'{self.read_param}') def read_target(self): return getattr(self.write, f'{self.write_param}') def read_status(self): return IDLE, '' def write_target(self, target): return getattr(self.write, f'write_{self.write_param}')(target) class Converging(HasConvergence, Driv): """drivable with convergence""" pollinterval = 1 def checkProperties(self): self.parameters['tolerance'].setProperty('unit', self.unit) super().checkProperties() #def update_value(self, value): # print('UV', value) # self.value = value #def error_update_value(self, err): # raise err #def update_target(self, value): # self.target = value #def error_update_target(self, err): # raise err def write_target(self, target): self.convergence_start() return super().write_target(target) class RampDriv(HasRamp, Driv): pass def set_enabled(modobj, value): """set enabled on module if available""" if hasattr(modobj, 'enabled') and modobj.enabled != value: modobj.write_enabled(value) def get_value(obj, default): """get the value of given module. if not valid, return the limit (min_high or max_low)""" if not getattr(obj, 'enabled', True): return default return obj.value if IDLE <= obj.status[0] < ERROR else default class SwitchDriv(HasConvergence, Drivable): low = Attached(description='low range module') high = Attached(description='high range module') min_high = Parameter('minimum high target', FloatRange(unit='$'), readonly=False) max_low = Parameter('maximum low target', FloatRange(unit='$'), readonly=False) # disable_other = Parameter('whether to disable unused channel', BoolType(), readonly=False) selected = Parameter('selected module', EnumType(low=0, high=1), readonly=False, default=0) # TODO: copy units from attached module # TODO: callbacks for updates def doPoll(self): super().doPoll() if not self.isBusy(): low = get_value(self.low, self.max_low) high = get_value(self.high, self.min_high) low_valid = low < self.max_low high_valid = high > self.min_high if high_valid and high > self.max_low: if not low_valid: set_enabled(self.low, False) return if low_valid and low < self.min_high: if not high_valid: set_enabled(self.high, False) return set_enabled(self.low, True) set_enabled(self.high, True) def read_value(self): return self.low.value if self.selected == self.selected.low else self.high.value def read_status(self): status = self.low.status if self.selected == self.selected.low else self.high.status if status[0] >= ERROR: return status return super().read_status() # convergence status def read_target(self): if self.selected == self.selected.low: return self.low.target return self.high.target def write_target(self, target): this, other = self.low, self.high selected = self.selected if target > self.max_low: this, other = other, this selected = self.selected.high elif target < self.min_high: selected = self.selected.low elif self.selected == self.selected.high: this, other = other, this if hasattr(other, 'control_off'): other.control_off() set_enabled(this, True) set_enabled(other, False) self.write_selected(selected) self.convergence_start() return this.write_target(target)