From ec226a91248d5a9a7559335f71a3175152e348be Mon Sep 17 00:00:00 2001 From: Anik Stark Date: Wed, 10 Dec 2025 17:11:50 +0100 Subject: [PATCH] frappy_psi.ips_classic: converted file to oiclassic --- frappy_psi/ips_classic.py | 290 -------------------------------------- 1 file changed, 290 deletions(-) delete mode 100644 frappy_psi/ips_classic.py diff --git a/frappy_psi/ips_classic.py b/frappy_psi/ips_classic.py deleted file mode 100644 index 8a59586a..00000000 --- a/frappy_psi/ips_classic.py +++ /dev/null @@ -1,290 +0,0 @@ -#!/usr/bin/env python -# ***************************************************************************** -# This program is free software; you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free Software -# Foundation; either version 2 of the License, or (at your option) any later -# version. -# -# This program is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more -# details. -# -# You should have received a copy of the GNU General Public License along with -# this program; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Module authors: -# Markus Zolliker -# ***************************************************************************** -"""oxford instruments mercury IPS power supply""" - -import time -from frappy.core import Parameter, EnumType, FloatRange, BoolType, IntRange, Property, Module -from frappy.lib.enum import Enum -from frappy.errors import BadValueError, HardwareError -from frappy_psi.magfield import Magfield, SimpleMagfield, Status -from frappy_psi.mercury import MercuryChannel, off_on, Mapped -from frappy.states import Retry - -Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=3) -hold_rtoz_rtos_clmp = Mapped(HOLD=Action.hold, RTOS=Action.run_to_set, - RTOZ=Action.run_to_zero, CLMP=Action.clamped) -CURRENT_CHECK_SIZE = 2 - - -class Field(Magfield): - action = Parameter('action', EnumType(Action), readonly=False) - setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0) - voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0) - atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0) - working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0) - persistent_field = Parameter( - 'persistent field at last switch off', FloatRange(unit='$'), readonly=False) - wait_switch_on = Parameter( - 'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=True, default=60) - wait_switch_off = Parameter( - 'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=True, default=60) - forced_persistent_field = Parameter( - 'manual indication that persistent field is bad', BoolType(), readonly=False, default=False) - - _field_mismatch = None - __persistent_field = None # internal value of persistent field - __switch_fixed_until = 0 - - def initModule(self): - super().initModule() - try: - self.write_action(Action.hold) - except Exception as e: - self.log.error('can not set to hold %r', e) - - def doPoll(self): - super().doPoll() - self.read_current() - - def initialReads(self): - # on restart, assume switch is changed long time ago, if not, the mercury - # will complain and this will be handled in start_ramp_to_field - self.switch_on_time = 0 - self.switch_off_time = 0 - self.switch_heater = self.query('DEV::PSU:SIG:SWHT', off_on) - super().initialReads() - - def read_value(self): - return self.query('DEV::PSU:SIG:FLD') - - def read_ramp(self): - return self.query('DEV::PSU:SIG:RFST') - - def write_ramp(self, value): - return self.change('DEV::PSU:SIG:RFST', value) - - def read_action(self): - return self.query('DEV::PSU:ACTN', hold_rtoz_rtos_clmp) - - def write_action(self, value): - return self.change('DEV::PSU:ACTN', value, hold_rtoz_rtos_clmp) - - def read_atob(self): - return self.query('DEV::PSU:ATOB') - - def read_voltage(self): - return self.query('DEV::PSU:SIG:VOLT') - - def read_working_ramp(self): - return self.query('DEV::PSU:SIG:RFLD') - - def read_setpoint(self): - return self.query('DEV::PSU:SIG:FSET') - - def set_and_go(self, value): - self.setpoint = self.change('DEV::PSU:SIG:FSET', value) - assert self.write_action(Action.hold) == Action.hold - assert self.write_action(Action.run_to_set) == Action.run_to_set - - def start_ramp_to_target(self, sm): - # if self.action != Action.hold: - # assert self.write_action(Action.hold) == Action.hold - # return Retry - self.set_and_go(sm.target) - sm.try_cnt = 5 - return self.ramp_to_target - - def ramp_to_target(self, sm): - try: - return super().ramp_to_target(sm) - except HardwareError: - sm.try_cnt -= 1 - if sm.try_cnt < 0: - raise - self.set_and_go(sm.target) - return Retry - - def final_status(self, *args, **kwds): - self.write_action(Action.hold) - return super().final_status(*args, **kwds) - - def on_restart(self, sm): - self.write_action(Action.hold) - return super().on_restart(sm) - - def read_value(self): - current = self.query('DEV::PSU:SIG:FLD') - if self.switch_heater == self.switch_heater.on: - self.__persistent_field = current - self.forced_persistent_field = False - return current - pf = self.query('DEV::PSU:SIG:PFLD') - if self.__persistent_field is None: - self.__persistent_field = pf - self._field_mismatch = False - else: - self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10 - self.persistent_field = self.__persistent_field - return self.__persistent_field - - def _check_adr(self, adr): - """avoid complains about bad slot""" - if adr.startswith('DEV:PSU.M'): - return - super()._check_adr(adr) - - def read_current(self): - current = self.query('DEV::PSU:SIG:CURR') - if self.atob: - return current / self.atob - return 0 - - def write_persistent_field(self, value): - if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10: - self._field_mismatch = False - self.__persistent_field = value - return value - raise BadValueError('changing persistent field needs forced_persistent_field=True') - - def write_target(self, target): - if self._field_mismatch: - self.forced_persistent_field = True - raise BadValueError('persistent field does not match - set persistent field to guessed value first') - return super().write_target(target) - - def read_switch_heater(self): - value = self.query('DEV::PSU:SIG:SWHT', off_on) - now = time.time() - if value != self.switch_heater: - if now < self.__switch_fixed_until: - self.log.debug('correct fixed switch time') - # probably switch heater was changed, but IPS reply is not yet updated - if self.switch_heater: - self.switch_on_time = time.time() - else: - self.switch_off_time = time.time() - return self.switch_heater - return value - - def read_wait_switch_on(self): - return self.query('DEV::PSU:SWONT') * 0.001 - - def read_wait_switch_off(self): - return self.query('DEV::PSU:SWOFT') * 0.001 - - def write_switch_heater(self, value): - if value == self.read_switch_heater(): - self.log.info('switch heater already %r', value) - # we do not want to restart the timer - return value - self.__switch_fixed_until = time.time() + 10 - self.log.debug('switch time fixed for 10 sec') - result = self.change('DEV::PSU:SIG:SWHT', value, off_on, n_retry=0) # no readback check - return result - - def start_ramp_to_field(self, sm): - if abs(self.current - self.__persistent_field) <= self.tolerance: - self.log.info('leads %g are already at %g', self.current, self.__persistent_field) - return self.ramp_to_field - try: - self.set_and_go(self.__persistent_field) - except (HardwareError, AssertionError) as e: - if self.switch_heater: - self.log.warn('switch is already on!') - return self.ramp_to_field - self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e) - sm.after_wait = self.ramp_to_field - return self.wait_for_switch - return self.ramp_to_field - - def start_ramp_to_target(self, sm): - sm.try_cnt = 5 - try: - self.set_and_go(sm.target) - except (HardwareError, AssertionError) as e: - self.log.warn('switch not yet ready %r', e) - self.status = Status.PREPARING, 'wait for switch on' - sm.after_wait = self.ramp_to_target - return self.wait_for_switch - return self.ramp_to_target - - def ramp_to_field(self, sm): - try: - return super().ramp_to_field(sm) - except HardwareError: - sm.try_cnt -= 1 - if sm.try_cnt < 0: - raise - self.set_and_go(self.__persistent_field) - return Retry - - def wait_for_switch(self, sm): - if not sm.delta(10): - return Retry - try: - self.log.warn('try again') - # try again - self.set_and_go(self.__persistent_field) - except (HardwareError, AssertionError): - return Retry - return sm.after_wait - - def wait_for_switch_on(self, sm): - self.read_switch_heater() # trigger switch_on/off_time - if self.switch_heater == self.switch_heater.off: - if sm.init: # avoid too many states chained - return Retry - self.log.warning('switch turned off manually?') - return self.start_switch_on - return super().wait_for_switch_on(sm) - - def wait_for_switch_off(self, sm): - self.read_switch_heater() - if self.switch_heater == self.switch_heater.on: - if sm.init: # avoid too many states chained - return Retry - self.log.warning('switch turned on manually?') - return self.start_switch_off - return super().wait_for_switch_off(sm) - - def start_ramp_to_zero(self, sm): - pf = self.query('DEV::PSU:SIG:PFLD') - if abs(pf - self.value) > self.tolerance * 10: - self.log.warning('persistent field %g does not match %g after switch off', pf, self.value) - try: - assert self.write_action(Action.hold) == Action.hold - assert self.write_action(Action.run_to_zero) == Action.run_to_zero - except (HardwareError, AssertionError) as e: - self.log.warn('switch not yet ready %r', e) - self.status = Status.PREPARING, 'wait for switch off' - sm.after_wait = self.ramp_to_zero - return self.wait_for_switch - return self.ramp_to_zero - - def ramp_to_zero(self, sm): - try: - return super().ramp_to_zero(sm) - except HardwareError: - sm.try_cnt -= 1 - if sm.try_cnt < 0: - raise - assert self.write_action(Action.hold) == Action.hold - assert self.write_action(Action.run_to_zero) == Action.run_to_zero - return Retry