frappy/secop_psi/triton.py
Markus Zolliker 5951312d40 fixes to make pylint happy
Change-Id: I95baf4e585603a640d4ec71076a4d509082775ed
2022-06-14 15:24:42 +02:00

306 lines
11 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""oxford instruments triton (kelvinoxjt dil)"""
from math import sqrt
from secop.core import Writable, Parameter, Readable, Drivable, IDLE, WARN, BUSY, ERROR, Done
from secop.datatypes import EnumType, FloatRange
from secop.lib.enum import Enum
from secop_psi.mercury import MercuryChannel, Mapped, off_on, HasInput, SELF
import secop_psi.mercury as mercury
actions = Enum(none=0, condense=1, circulate=2, collect=3)
open_close = Mapped(CLOSE=0, OPEN=1)
actions_map = Mapped(STOP=actions.none, COND=actions.condense, COLL=actions.collect)
actions_map.mapping['NONE'] = actions.none # when writing, STOP is used instead of NONE
class Action(MercuryChannel, Writable):
channel_type = 'ACTN'
value = Parameter('running action', EnumType(actions))
target = Parameter('action to do', EnumType(none=0, condense=1, collect=3), readonly=False)
_target = 0
def read_value(self):
return self.query('SYS:DR:ACTN', actions_map)
def read_target(self):
return self._target
def write_target(self, value):
self._target = value
return self.change('SYS:DR:ACTN', value, actions_map)
# actions:
# NONE (no action)
# COND (condense mixture)
# COLL (collect mixture)
# STOP (go to NONE)
#
# not yet used (would need a subclass of Action):
# CLDN (cool down)
# PCL (precool automation)
# PCOND (pause pre-cool (not condense?) automation)
# RCOND (resume pre-cool (not condense?) automation)
# WARM (warm-up)
# EPCL (empty pre-cool automation)
class Valve(MercuryChannel, Drivable):
channel_type = 'VALV'
value = Parameter('valve state', EnumType(closed=0, opened=1))
target = Parameter('valve target', EnumType(close=0, open=1))
_try_count = None
def doPoll(self):
self.read_status()
def read_value(self):
pos = self.query('VALV:SIG:STATE', open_close)
if pos == self.target:
self.status = IDLE, ''
self._try_count = 0
self.setFastPoll(False)
elif self._try_count <= 7: # odd number: last try is previous position
# toggle new/previous position until success or too many tries
self.change('VALV:SIG:STATE', pos if self._try_count % 2 else self.target, open_close)
self._try_count += 1
self.status = BUSY, 'opening' if self.target else 'closing'
else:
self.status = ERROR, 'can not %s valve' % self.target.name
return pos
def read_status(self):
pos = self.read_value()
if self._try_count is None:
return IDLE, ''
if pos == self.target:
if self._try_count:
# make sure last sent command was not opposite
self.change('VALV:SIG:STATE', self.target, open_close)
self._try_count = None
self.setFastPoll(False)
return IDLE, ''
self._try_count += 1
if self._try_count % 4 == 0:
# send opposite position in order to unblock
self.change('VALV:SIG:STATE', pos, open_close)
return BUSY, 'unblock'
if self._try_count > 9:
# make sure system does not toggle later
self.change('VALV:SIG:STATE', pos, open_close)
return ERROR, 'can not %s valve' % self.target.name
self.change('VALV:SIG:STATE', self.target, open_close)
self._try_count += 1
return BUSY, 'waiting'
def write_target(self, value):
if value != self.read_value():
self._try_count = 0
self.setFastPoll(True, 0.25)
self.change('VALV:SIG:STATE', value, open_close)
self.status = BUSY, self.target.name
return value
class Pump(MercuryChannel, Writable):
channel_type = 'PUMP'
value = Parameter('pump state', EnumType(off=0, on=1))
target = Parameter('pump target', EnumType(off=0, on=1))
def read_value(self):
return self.query('PUMP:SIG:STATE', off_on)
def write_target(self, value):
return self.change('PUMP:SIG:STATE', value, off_on)
def read_status(self):
return IDLE, ''
class TurboPump(Pump):
power = Parameter('pump power', FloatRange(unit='W'))
freq = Parameter('pump frequency', FloatRange(unit='Hz'))
powerstage_temp = Parameter('temperature of power stage', FloatRange(unit='K'))
motor_temp = Parameter('temperature of motor', FloatRange(unit='K'))
bearing_temp = Parameter('temperature of bearing', FloatRange(unit='K'))
pumpbase_temp = Parameter('temperature of pump base', FloatRange(unit='K'))
electronics_temp = Parameter('temperature of electronics', FloatRange(unit='K'))
def read_status(self):
status = self.query('PUMP:STATUS', str)
if status == 'OK':
return IDLE, ''
return WARN, status
def read_power(self):
return self.query('PUMP:SIG:POWR')
def read_freq(self):
return self.query('PUMP:SIG:SPD')
def read_powerstage_temp(self):
return self.query('PUMP:SIG:PST')
def read_motor_temp(self):
return self.query('PUMP:SIG:MT')
def read_bearing_temp(self):
return self.query('PUMP:SIG:BT')
def read_pumpbase_temp(self):
return self.query('PUMP:SIG:PBT')
def read_electronics_temp(self):
return self.query('PUMP:SIG:ET')
# class PulseTubeCompressor(MercuryChannel, Writable):
# channel_type = 'PTC'
# value = Parameter('compressor state', EnumType(closed=0, opened=1))
# target = Parameter('compressor target', EnumType(close=0, open=1))
# water_in_temp = Parameter('temperature of water inlet', FloatRange(unit='K'))
# water_out_temp = Parameter('temperature of water outlet', FloatRange(unit='K'))
# helium_temp = Parameter('temperature of helium', FloatRange(unit='K'))
# helium_low_pressure = Parameter('helium pressure (low side)', FloatRange(unit='mbar'))
# helium_high_pressure = Parameter('helium pressure (high side)', FloatRange(unit='mbar'))
# motor_current = Parameter('motor current', FloatRange(unit='A'))
#
# def read_value(self):
# return self.query('PTC:SIG:STATE', off_on)
#
# def write_target(self, value):
# return self.change('PTC:SIG:STATE', value, off_on)
#
# def read_status(self):
# # TODO: check possible status values
# return self.WARN, self.query('PTC:SIG:STATUS')
#
# def read_water_in_temp(self):
# return self.query('PTC:SIG:WIT')
#
# def read_water_out_temp(self):
# return self.query('PTC:SIG:WOT')
#
# def read_helium_temp(self):
# return self.query('PTC:SIG:HT')
#
# def read_helium_low_pressure(self):
# return self.query('PTC:SIG:HLP')
#
# def read_helium_high_pressure(self):
# return self.query('PTC:SIG:HHP')
#
# def read_motor_current(self):
# return self.query('PTC:SIG:MCUR')
class FlowMeter(MercuryChannel, Readable):
channel_type = 'FLOW'
def read_value(self):
return self.query('FLOW:SIG:FLOW')
class ScannerChannel:
# TODO: excitation, enable
# TODO: switch on/off filter, check
filter_time = Parameter('filter time', FloatRange(1, 200, unit='sec'), readonly=False)
dwell_time = Parameter('dwell time', FloatRange(1, 200, unit='sec'), readonly=False)
pause_time = Parameter('pause time', FloatRange(3, 200, unit='sec'), readonly=False)
def read_filter_time(self):
return self.query('TEMP:FILT:TIME')
def write_filter_time(self, value):
self.change('TEMP:FILT:WIN', 80)
return self.change('TEMP:FILT:TIME', value)
def read_dwell_time(self):
return self.query('TEMP:MEAS:DWEL')
def write_dwell_time(self, value):
self.change('TEMP:FILT:WIN', 80)
return self.change('TEMP:MEAS:DWEL', value)
def read_pause_time(self):
return self.query('TEMP:MEAS:PAUS')
def write_pause_time(self, value):
return self.change('TEMP:MEAS:PAUS', value)
class TemperatureSensor(ScannerChannel, mercury.TemperatureSensor):
pass
class TemperatureLoop(ScannerChannel, mercury.TemperatureLoop):
ENABLE = 'TEMP:LOOP:MODE'
ENABLE_RAMP = 'TEMP:LOOP:RAMP:ENAB'
RAMP_RATE = 'TEMP:LOOP:RAMP:RATE'
enable_pid_table = None # remove, does not work on triton
def write_control_active(self, value):
self.change('SYS:DR:CHAN:MC', 'T5', str)
if value:
self.change('TEMP:LOOP:FILT:ENAB', 'ON', str)
if self.output_module:
limit = self.output_module.read_limit() or None # None: max. limit
self.output_module.write_limit(limit)
return super().write_control_active(value)
class HeaterOutput(HasInput, MercuryChannel, Readable):
"""heater output"""
channel_type = 'HTR,TEMP'
value = Parameter('heater output', FloatRange(unit='W'))
target = Parameter('heater output', FloatRange(0, unit='$'), readonly=False)
limit = Parameter('max. heater power', FloatRange(unit='W'), readonly=False)
resistivity = Parameter('heater resistivity', FloatRange(unit='Ohm'))
def read_resistivity(self):
return self.query('HTR:RES')
def read_limit(self):
maxcur = self.query('TEMP:LOOP:RANGE') * 0.001 # mA -> A
return self.read_resistivity() * maxcur ** 2
def write_limit(self, value):
if value is None:
maxcur = 0.1 # max. allowed current 100mA
else:
maxcur = sqrt(value / self.read_resistivity())
self.change('TEMP:LOOP:RANGE', maxcur * 1000)
return self.read_limit()
def read_value(self):
return self.query('HTR:SIG:POWR') * 1e-6
def read_target(self):
if self.controlled_by != 0:
return Done
return self.value
def write_target(self, value):
self.write_controlled_by(SELF)
return self.change('HTR:SIG:POWR', value * 1e6)