fixes in mercury an triton

- Valve is now a drivable, as it will check success, and retry,
  which might take around 1 second
+ some more
This commit is contained in:
zolliker 2022-06-14 15:07:17 +02:00
parent 54b58f2188
commit dfbc1c757a
3 changed files with 107 additions and 25 deletions

View File

@ -9,6 +9,7 @@ uri = tcp://5000
class = secop_psi.mercury.IO
description = connection to triton software
uri = tcp://linse-dil5:33576
timeout = 5
[ts]
class = secop_psi.triton.TemperatureLoop
@ -20,7 +21,7 @@ io = triton
[htr_mix]
class = secop_psi.triton.HeaterOutput
description = mix. chamber heater
slot = H1
slot = H1,T5
io = triton
[T_sorb]

View File

@ -318,8 +318,8 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
return volt * current
def read_target(self):
if self.controlled_by != 0 and self.target:
return 0
if self.controlled_by != 0:
return Done
if self._last_target is not None:
return Done
self._volt_target = self.query('HTR:SIG:VOLT')
@ -344,25 +344,28 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
class TemperatureLoop(TemperatureSensor, Loop, Drivable):
channel_type = 'TEMP'
output_module = Attached(HeaterOutput, mandatory=False)
output_module = Attached(HasInput, mandatory=False)
ramp = Parameter('ramp rate', FloatRange(0, unit='K/min'), readonly=False)
enable_ramp = Parameter('enable ramp rate', BoolType(), readonly=False)
setpoint = Parameter('working setpoint (differs from target when ramping)', FloatRange(0, unit='$'))
tolerance = Parameter(default=0.1)
_last_setpoint_change = None
ENABLE = 'TEMP:LOOP:ENAB'
ENABLE_RAMP = 'TEMP:LOOP:RENA'
RAMP_RATE = 'TEMP:LOOP:RSET'
def doPoll(self):
super().doPoll()
self.read_setpoint()
def read_control_active(self):
active = self.query('TEMP:LOOP:ENAB', off_on)
active = self.query(self.ENABLE, off_on)
self.set_output(active)
return active
def write_control_active(self, value):
self.set_output(value)
return self.change('TEMP:LOOP:ENAB', value, off_on)
return self.change(self.ENABLE, value, off_on)
@nopoll # polled by read_setpoint
def read_target(self):
@ -396,10 +399,10 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
return Done
def read_enable_ramp(self):
return self.query('TEMP:LOOP:RENA', off_on)
return self.query(self.ENABLE_RAMP, off_on)
def write_enable_ramp(self, value):
return self.change('TEMP:LOOP:RENA', value, off_on)
return self.change(self.ENABLE_RAMP, value, off_on)
def set_output(self, active):
if active:
@ -413,7 +416,7 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
self.status = status
def read_ramp(self):
result = self.query('TEMP:LOOP:RSET')
result = self.query(self.RAMP_RATE)
return min(9e99, result)
def write_ramp(self, value):
@ -422,11 +425,11 @@ class TemperatureLoop(TemperatureSensor, Loop, Drivable):
self.write_enable_ramp(0)
return 0
if value >= 9e99:
self.change('TEMP:LOOP:RSET', 'inf', as_string)
self.change(self.RAMP_RATE, 'inf', as_string)
self.write_enable_ramp(0)
return 9e99
self.write_enable_ramp(1)
return self.change('TEMP:LOOP:RSET', max(1e-4, value))
return self.change(self.RAMP_RATE, max(1e-4, value))
class PressureSensor(MercuryChannel, Readable):
@ -542,6 +545,7 @@ class HeLevel(MercuryChannel, Readable):
(when filling) and slow (when consuming). We have to handle this by software.
"""
channel_type = 'LVL'
value = Parameter(unit='%')
sample_rate = Parameter('_', EnumType(slow=0, fast=1), readonly=False)
hysteresis = Parameter('hysteresis for detection of increase', FloatRange(0, 100, unit='%'),
default=5, readonly=False)
@ -595,9 +599,7 @@ class HeLevel(MercuryChannel, Readable):
class N2Level(MercuryChannel, Readable):
channel_type = 'LVL'
value = Parameter(unit='%')
def read_value(self):
return self.query('LVL:SIG:NIT:LEV')
# TODO: magnet power supply

View File

@ -20,15 +20,16 @@
# *****************************************************************************
"""oxford instruments triton (kelvinoxjt dil)"""
from secop.core import Drivable, HasIO, Writable, \
Parameter, Property, Readable, StringIO, Attached, Done, IDLE, WARN, nopoll
from secop.datatypes import EnumType, FloatRange, StringType, StructOf, BoolType
from math import sqrt
from secop.core import Writable, Parameter, Readable, Drivable, IDLE, WARN, BUSY, Done
from secop.errors import HardwareError
from secop.datatypes import EnumType, FloatRange
from secop.lib.enum import Enum
from secop_psi.mercury import MercuryChannel, Mapped, off_on
from secop_psi.mercury import MercuryChannel, Mapped, off_on, HasInput, SELF
import secop_psi.mercury as mercury
actions = Enum(none=0, condense=1, circulate=2, collect=3)
open_close = Mapped(CLOSE=False, OPEN=True)
open_close = Mapped(CLOSE=0, OPEN=1)
actions_map = Mapped(STOP=actions.none, COND=actions.condense, COLL=actions.collect)
actions_map.mapping['NONE'] = actions.none # when writing, STOP is used instead of NONE
@ -36,7 +37,7 @@ actions_map.mapping['NONE'] = actions.none # when writing, STOP is used instead
class Action(MercuryChannel, Writable):
channel_type = 'ACTN'
value = Parameter('running action', EnumType(actions))
target = Parameter('valve target', EnumType(none=0, condense=1, collect=3), readonly=False)
target = Parameter('action to do', EnumType(none=0, condense=1, collect=3), readonly=False)
_target = 0
def read_value(self):
@ -64,16 +65,50 @@ class Action(MercuryChannel, Writable):
# EPCL (empty pre-coll automation)
class Valve(MercuryChannel, Writable):
class Valve(MercuryChannel, Drivable):
channel_type = 'VALV'
value = Parameter('valve state', EnumType(closed=0, opened=1))
target = Parameter('valve target', EnumType(close=0, open=1))
_try_count = None
def doPoll(self):
self.read_status()
def read_value(self):
return self.query('VALV:SIG:STATE', open_close)
def read_status(self):
pos = self.read_value()
if self._try_count is None:
return IDLE, ''
if pos == self.target:
if self._try_count:
# make sure last sent command was not opposite
self.change('VALV:SIG:STATE', self.target, open_close)
self._try_count = None
self.setFastPoll(False)
return IDLE, ''
self._try_count += 1
if self._try_count % 4 == 0:
# send opposite position in order to unblock
self.change('VALV:SIG:STATE', pos, open_close)
return BUSY, 'unblock'
if self._try_count > 9:
# make sure system does not toggle later
self.change('VALV:SIG:STATE', pos, open_close)
return ERROR, 'can not %s valve' % self.target.name
self.change('VALV:SIG:STATE', self.target, open_close)
self._try_count += 1
return BUSY, 'waiting'
def write_target(self, value):
return self.change('VALV:SIG:STATE', value, open_close)
if value != self.read_value():
self._try_count = 0
self.setFastPoll(True, 0.25)
self.change('VALV:SIG:STATE', value, open_close)
self.status = BUSY, self.target.name
return value
class Pump(MercuryChannel, Writable):
@ -208,8 +243,52 @@ class TemperatureSensor(ScannerChannel, mercury.TemperatureSensor):
class TemperatureLoop(ScannerChannel, mercury.TemperatureLoop):
pass
ENABLE = 'TEMP:LOOP:MODE'
ENABLE_RAMP = 'TEMP:LOOP:RAMP:ENAB'
RAMP_RATE = 'TEMP:LOOP:RAMP:RATE'
enable_pid_table = None # remove, does not work on triton
def write_control_active(self, value):
self.change('SYS:DR:CHAN:MC', 'T5', str)
if value:
self.change('TEMP:LOOP:FILT:ENAB', 'ON', str)
if self.output_module:
limit = self.output_module.read_limit() or None # None: max. limit
self.output_module.write_limit(limit)
return super().write_control_active(value)
class HeaterOutput(mercury.HeaterOutput):
pass # not sure if we need special handling of triton heater output: to be checked!
class HeaterOutput(HasInput, MercuryChannel, Readable):
"""heater output"""
channel_type = 'HTR,TEMP'
value = Parameter('heater output', FloatRange(unit='W'))
target = Parameter('heater output', FloatRange(0, unit='$'), readonly=False)
limit = Parameter('max. heater power', FloatRange(unit='W'), readonly=False)
resistivity = Parameter('heater resistivity', FloatRange(unit='Ohm'))
def read_resistivity(self):
return self.query('HTR:RES')
def read_limit(self):
maxcur = self.query('TEMP:LOOP:RANGE') * 0.001 # mA -> A
return self.read_resistivity() * maxcur ** 2
def write_limit(self, value):
if value is None:
maxcur = 0.1 # max. allowed current 100mA
else:
maxcur = sqrt(value / self.read_resistivity())
self.change('TEMP:LOOP:RANGE', maxcur * 1000)
return self.read_limit()
def read_value(self):
return self.query('HTR:SIG:POWR') * 1e-6
def read_target(self):
if self.controlled_by != 0:
return Done
return self.value
def write_target(self, value):
self.write_controlled_by(SELF)
return self.change('HTR:SIG:POWR', value * 1e6)