#!/usr/bin/env python # -*- coding: utf-8 -*- # ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** """oxford instruments triton (kelvinoxjt dil)""" from math import sqrt from secop.core import Writable, Parameter, Readable, Drivable, IDLE, WARN, BUSY, ERROR, Done from secop.datatypes import EnumType, FloatRange from secop.lib.enum import Enum from secop_psi.mercury import MercuryChannel, Mapped, off_on, HasInput, SELF import secop_psi.mercury as mercury actions = Enum(none=0, condense=1, circulate=2, collect=3) open_close = Mapped(CLOSE=0, OPEN=1) actions_map = Mapped(STOP=actions.none, COND=actions.condense, COLL=actions.collect) actions_map.mapping['NONE'] = actions.none # when writing, STOP is used instead of NONE class Action(MercuryChannel, Writable): channel_type = 'ACTN' value = Parameter('running action', EnumType(actions)) target = Parameter('action to do', EnumType(none=0, condense=1, collect=3), readonly=False) _target = 0 def read_value(self): return self.query('SYS:DR:ACTN', actions_map) def read_target(self): return self._target def write_target(self, value): self._target = value return self.change('SYS:DR:ACTN', value, actions_map) # actions: # NONE (no action) # COND (condense mixture) # COLL (collect mixture) # STOP (go to NONE) # # not yet used (would need a subclass of Action): # CLDN (cool down) # PCL (precool automation) # PCOND (pause pre-cool (not condense?) automation) # RCOND (resume pre-cool (not condense?) automation) # WARM (warm-up) # EPCL (empty pre-cool automation) class Valve(MercuryChannel, Drivable): channel_type = 'VALV' value = Parameter('valve state', EnumType(closed=0, opened=1)) target = Parameter('valve target', EnumType(close=0, open=1)) _try_count = None def doPoll(self): self.read_status() def read_value(self): return self.query('VALV:SIG:STATE', open_close) def read_status(self): pos = self.read_value() if self._try_count is None: # not switching return IDLE, '' if pos == self.target: # success if self._try_count: # make sure last sent command was not opposite self.change('VALV:SIG:STATE', self.target, open_close) self._try_count = None self.setFastPoll(False) return IDLE, '' self._try_count += 1 if self._try_count % 4 == 0: # send to opposite position in order to unblock self.change('VALV:SIG:STATE', pos, open_close) return BUSY, 'unblock' if self._try_count > 9: # make sure system does not toggle later self.change('VALV:SIG:STATE', pos, open_close) return ERROR, 'can not %s valve' % self.target.name self.change('VALV:SIG:STATE', self.target, open_close) return BUSY, 'waiting' def write_target(self, value): if value != self.read_value(): self._try_count = 0 self.setFastPoll(True, 0.25) self.change('VALV:SIG:STATE', value, open_close) self.status = BUSY, self.target.name return value class Pump(MercuryChannel, Writable): channel_type = 'PUMP' value = Parameter('pump state', EnumType(off=0, on=1)) target = Parameter('pump target', EnumType(off=0, on=1)) def read_value(self): return self.query('PUMP:SIG:STATE', off_on) def write_target(self, value): return self.change('PUMP:SIG:STATE', value, off_on) def read_status(self): return IDLE, '' class TurboPump(Pump): power = Parameter('pump power', FloatRange(unit='W')) freq = Parameter('pump frequency', FloatRange(unit='Hz')) powerstage_temp = Parameter('temperature of power stage', FloatRange(unit='K')) motor_temp = Parameter('temperature of motor', FloatRange(unit='K')) bearing_temp = Parameter('temperature of bearing', FloatRange(unit='K')) pumpbase_temp = Parameter('temperature of pump base', FloatRange(unit='K')) electronics_temp = Parameter('temperature of electronics', FloatRange(unit='K')) def read_status(self): status = self.query('PUMP:STATUS', str) if status == 'OK': return IDLE, '' return WARN, status def read_power(self): return self.query('PUMP:SIG:POWR') def read_freq(self): return self.query('PUMP:SIG:SPD') def read_powerstage_temp(self): return self.query('PUMP:SIG:PST') def read_motor_temp(self): return self.query('PUMP:SIG:MT') def read_bearing_temp(self): return self.query('PUMP:SIG:BT') def read_pumpbase_temp(self): return self.query('PUMP:SIG:PBT') def read_electronics_temp(self): return self.query('PUMP:SIG:ET') # class PulseTubeCompressor(MercuryChannel, Writable): # channel_type = 'PTC' # value = Parameter('compressor state', EnumType(closed=0, opened=1)) # target = Parameter('compressor target', EnumType(close=0, open=1)) # water_in_temp = Parameter('temperature of water inlet', FloatRange(unit='K')) # water_out_temp = Parameter('temperature of water outlet', FloatRange(unit='K')) # helium_temp = Parameter('temperature of helium', FloatRange(unit='K')) # helium_low_pressure = Parameter('helium pressure (low side)', FloatRange(unit='mbar')) # helium_high_pressure = Parameter('helium pressure (high side)', FloatRange(unit='mbar')) # motor_current = Parameter('motor current', FloatRange(unit='A')) # # def read_value(self): # return self.query('PTC:SIG:STATE', off_on) # # def write_target(self, value): # return self.change('PTC:SIG:STATE', value, off_on) # # def read_status(self): # # TODO: check possible status values # return self.WARN, self.query('PTC:SIG:STATUS') # # def read_water_in_temp(self): # return self.query('PTC:SIG:WIT') # # def read_water_out_temp(self): # return self.query('PTC:SIG:WOT') # # def read_helium_temp(self): # return self.query('PTC:SIG:HT') # # def read_helium_low_pressure(self): # return self.query('PTC:SIG:HLP') # # def read_helium_high_pressure(self): # return self.query('PTC:SIG:HHP') # # def read_motor_current(self): # return self.query('PTC:SIG:MCUR') class FlowMeter(MercuryChannel, Readable): channel_type = 'FLOW' def read_value(self): return self.query('FLOW:SIG:FLOW') class ScannerChannel: # TODO: excitation, enable # TODO: switch on/off filter, check filter_time = Parameter('filter time', FloatRange(1, 200, unit='sec'), readonly=False) dwell_time = Parameter('dwell time', FloatRange(1, 200, unit='sec'), readonly=False) pause_time = Parameter('pause time', FloatRange(3, 200, unit='sec'), readonly=False) def read_filter_time(self): return self.query('TEMP:FILT:TIME') def write_filter_time(self, value): self.change('TEMP:FILT:WIN', 80) return self.change('TEMP:FILT:TIME', value) def read_dwell_time(self): return self.query('TEMP:MEAS:DWEL') def write_dwell_time(self, value): self.change('TEMP:FILT:WIN', 80) return self.change('TEMP:MEAS:DWEL', value) def read_pause_time(self): return self.query('TEMP:MEAS:PAUS') def write_pause_time(self, value): return self.change('TEMP:MEAS:PAUS', value) class TemperatureSensor(ScannerChannel, mercury.TemperatureSensor): pass class TemperatureLoop(ScannerChannel, mercury.TemperatureLoop): ENABLE = 'TEMP:LOOP:MODE' ENABLE_RAMP = 'TEMP:LOOP:RAMP:ENAB' RAMP_RATE = 'TEMP:LOOP:RAMP:RATE' enable_pid_table = None # remove, does not work on triton def write_control_active(self, value): self.change('SYS:DR:CHAN:MC', 'T5', str) if value: self.change('TEMP:LOOP:FILT:ENAB', 'ON', str) if self.output_module: limit = self.output_module.read_limit() or None # None: max. limit self.output_module.write_limit(limit) return super().write_control_active(value) class HeaterOutput(HasInput, MercuryChannel, Writable): """heater output""" channel_type = 'HTR' value = Parameter('heater output', FloatRange(unit='W')) target = Parameter('heater output', FloatRange(0, unit='$'), readonly=False) resistivity = Parameter('heater resistivity', FloatRange(unit='Ohm')) def read_resistivity(self): return self.query('HTR:RES') def read_value(self): return self.query('HTR:SIG:POWR') * 1e-6 def read_target(self): if self.controlled_by != 0: return Done return self.value def write_target(self, value): self.write_controlled_by(SELF) return self.change('HTR:SIG:POWR', value * 1e6) class HeaterOutputWithRange(HeaterOutput): """heater output with heater range""" channel_type = 'HTR,TEMP' limit = Parameter('max. heater power', FloatRange(unit='W'), readonly=False) def read_limit(self): maxcur = self.query('TEMP:LOOP:RANGE') * 0.001 # mA -> A return self.read_resistivity() * maxcur ** 2 def write_limit(self, value): if value is None: maxcur = 0.1 # max. allowed current 100mA else: maxcur = sqrt(value / self.read_resistivity()) self.change('TEMP:LOOP:RANGE', maxcur * 1000) return self.read_limit()