diff --git a/secop_psi/k2601b.py b/secop_psi/k2601b.py index d82a36a..926f359 100644 --- a/secop_psi/k2601b.py +++ b/secop_psi/k2601b.py @@ -41,7 +41,7 @@ SOURCECMDS = { ' smua.source.output = 0 print("ok")', 1: 'reset()' ' smua.source.func = smua.OUTPUT_DCAMPS' - ' display.smua.measure.func = display.MEASURE_VOLTS' + ' display.smua.measure.func = display.MEASURE_DCVOLTS' ' smua.source.autorangei = 1' ' smua.source.output = 1 print("ok")', 2: 'reset()' @@ -65,11 +65,11 @@ class SourceMeter(HasIO, Module): return float(self.communicate('print((smua.source.func+1)*smua.source.output)')) def write_mode(self, value): + assert self.communicate(SOURCECMDS[value]) == 'ok' if value == 'current': self.write_vlimit(self.vlimit) elif value == 'voltage': self.write_ilimit(self.ilimit) - assert self.communicate(SOURCECMDS[value]) == 'ok' return self.read_mode() def read_ilimit(self): @@ -118,6 +118,7 @@ class Current(HasIO, Writable): limit = Parameter('current limit', FloatRange(0, 2.0, unit='A'), default=2) def initModule(self): + super().initModule() self.sourcemeter.registerCallbacks(self) def read_value(self): @@ -129,9 +130,9 @@ class Current(HasIO, Writable): def write_target(self, value): if value > self.sourcemeter.ilimit: raise ValueError('current exceeds limit') - value = float(self.communicate('smua.source.leveli = %g print(smua.source.leveli)' % value)) if not self.active: self.sourcemeter.write_mode('current') # triggers update_mode -> set active to True + value = float(self.communicate('smua.source.leveli = %g print(smua.source.leveli)' % value)) return value def read_limit(self): @@ -163,6 +164,7 @@ class Voltage(HasIO, Writable): limit = Parameter('voltage limit', FloatRange(0, 2.0, unit='V'), default=2) def initModule(self): + super().initModule() self.sourcemeter.registerCallbacks(self) def read_value(self): @@ -174,9 +176,9 @@ class Voltage(HasIO, Writable): def write_target(self, value): if value > self.sourcemeter.vlimit: raise ValueError('voltage exceeds limit') - value = float(self.communicate('smua.source.levelv = %g print(smua.source.levelv)' % value)) if not self.active: self.sourcemeter.write_mode('voltage') # triggers update_mode -> set active to True + value = float(self.communicate('smua.source.levelv = %g print(smua.source.levelv)' % value)) return value def read_limit(self): diff --git a/secop_psi/triton.py b/secop_psi/triton.py new file mode 100644 index 0000000..93a0e48 --- /dev/null +++ b/secop_psi/triton.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""oxford instruments triton (kelvinoxjt dil)""" + +from secop.core import Drivable, HasIO, Writable, \ + Parameter, Property, Readable, StringIO, Attached, Done, IDLE, nopoll +from secop.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType +from secop_psi.mercury import MercuryChannel, Mapped, off_on +import secop_psi.mercury as mercury + + +opened_closed = Mapped(opened=False, closed=True) +open_close = Mapped(open=False, clos=True) + + +class DilState(MercuryChannel): + pass + # states: + # CLDN (cool down) + # PCL (precool automation) + # COND (condense mixture) + # PCOND (pause pre-cool (not condense?) automation + # RCOND (resume pre-cool (not condense?) automation + # WARM (warm-up) + # COLL (collect mixture) + # EPCL (empty pre-coll automation) + # STOP + # + + +class Valve(MercuryChannel, Writable): + channel_type = 'VALV' + value = Parameter('valve state', EnumType(closed=0, opened=1)) + target = Parameter('valve target', EnumType(close=0, open=1)) + + def read_value(self): + return self.query('VALV:SIG:STATE', opened_closed) + + def write_target(self, value): + return self.change('VALV:SIG:STATE', value, open_close) + + +class Pump(MercuryChannel, Writable): + channel_type = 'PUMP' + value = Parameter('pump state', EnumType(closed=0, opened=1)) + target = Parameter('pump target', EnumType(close=0, open=1)) + power = Parameter('pump power', FloatRange(unit='W')) + freq = Parameter('pump frequency', FloatRange(unit='Hz')) + powerstage_temp = Parameter('temperature of power stage', FloatRange(unit='K')) + motor_temp = Parameter('temperature of motor', FloatRange(unit='K')) + bearing_temp = Parameter('temperature of bearing', FloatRange(unit='K')) + pumpbase_temp = Parameter('temperature of pump base', FloatRange(unit='K')) + electronics_temp = Parameter('temperature of electronics', FloatRange(unit='K')) + + def read_value(self): + return self.query('PUMP:SIG:STATE', off_on) + + def write_target(self, value): + return self.change('PUMP:SIG:STATE', value, off_on) + + def read_status(self): + # TODO: check possible status values + return self.WARN, self.query('PUMP:SIG:STATUS') + + def read_power(self): + return self.query('PUMP:SIG:POWR') + + def read_freq(self): + return self.query('PUMP:SIG:SPD') + + def read_powerstage_temp(self): + return self.query('PUMP:SIG:PST') + + def read_motor_temp(self): + return self.query('PUMP:SIG:MT') + + def read_bearing_temp(self): + return self.query('PUMP:SIG:BT') + + def read_pumpbase_temp(self): + return self.query('PUMP:SIG:PBT') + + def read_electronics_temp(self): + return self.query('PUMP:SIG:ET') + + +class PulseTubeCompressor(MercuryChannel, Writable): + channel_type = 'PTC' + value = Parameter('compressor state', EnumType(closed=0, opened=1)) + target = Parameter('compressor target', EnumType(close=0, open=1)) + water_in_temp = Parameter('temperature of water inlet', FloatRange(unit='K')) + water_out_temp = Parameter('temperature of water outlet', FloatRange(unit='K')) + helium_temp = Parameter('temperature of helium', FloatRange(unit='K')) + helium_low_pressure = Parameter('helium pressure (low side)', FloatRange(unit='mbar')) + helium_high_pressure = Parameter('helium pressure (high side)', FloatRange(unit='mbar')) + motor_current = Parameter('motor current', FloatRange(unit='A')) + + def read_value(self): + return self.query('PTC:SIG:STATE', off_on) + + def write_target(self, value): + return self.change('PTC:SIG:STATE', value, off_on) + + def read_status(self): + # TODO: check possible status values + return self.WARN, self.query('PTC:SIG:STATUS') + + def read_water_in_temp(self): + return self.query('PTC:SIG:WIT') + + def read_water_out_temp(self): + return self.query('PTC:SIG:WOT') + + def read_helium_temp(self): + return self.query('PTC:SIG:HT') + + def read_helium_low_pressure(self): + return self.query('PTC:SIG:HLP') + + def read_helium_high_pressure(self): + return self.query('PTC:SIG:HHP') + + def read_motor_current(self): + return self.query('PTC:SIG:MCUR') + + +class FlowMeter(MercuryChannel, Readable): + channel_type = 'FLOW' + + def read_value(self): + return self.query('FLOW:SIG:FLOW') + + +class TemperatureSensor(mercury.TemperatureSensor): + # TODO: excitation, enable + filter_time = Parameter('filter time', FloatRange(1, 200, unit='sec'), readonly=False) + dwell_time = Parameter('dwell time', FloatRange(1, 200, unit='sec'), readonly=False) + pause_time = Parameter('pause time', FloatRange(3, 200, unit='sec'), readonly=False) + + def read_filter_time(self): + return self.query('TEMP:FILT:TIME') + + def write_filter_time(self, value): + self.change('TEMP:FILT:WIN', 80) + return self.change('TEMP:FILT:TIME', value) + + def read_dwell_time(self): + return self.query('TEMP:FILT:DWEL') + + def write_dwell_time(self, value): + self.change('TEMP:FILT:WIN', 80) + return self.change('TEMP:FILT:DWEL', value) + + def read_pause_time(self): + return self.query('TEMP:FILT:PAUS') + + def write_pause_time(self, value): + self.change('TEMP:FILT:WIN', 80) + return self.change('TEMP:FILT:PAUS', value) + + +class TemperatureLoop(mercury.TemperatureLoop): + pass # TODO: switch on/off filter, check