Merge branch 'wip' of gitlab.psi.ch:samenv/frappy into wip

This commit is contained in:
l_samenv 2022-11-21 14:38:40 +01:00
commit d845fedc03
4 changed files with 118 additions and 68 deletions

View File

@ -1,16 +1,19 @@
[NODE]
description = MB11 standard sample stick
id = mb11.stick.sea.psi.ch
# DO NOT USE
# use 'mb11std' instead of 'mb11', 'mb11stick'
# as the communication proxy for itc does not work yet
[NODE]
description = MB11 standard sample stick (do not use)
id = mb11.stick.sea.psi.ch
[INTERFACE]
uri = tcp://5000
[itc]
class = secop.proxy.Proxy
remote_class = secop_psi.mercury.IO
description = connection to MB11 mercury
module = itc1
#[itc]
#class = secop.proxy.Proxy
#remote_class = secop_psi.mercury.IO
#description = connection to MB11 mercury
#module = itc1
#uri = mb11-ts:3001
#timeout = 5
@ -25,19 +28,17 @@ module = itc1
#calib = /home/l_samenv/sea/tcl/calcurves/X70197.340
#svalue.unit = K
#[ts]
#class = secop.proxy.Proxy
#remote_class = secop_psi.mercury.TemperatureLoop
#description = sample temperature
#module = T_sample
#io = itc1
#[htr_ts]
#class = secop.proxy.Proxy
#remote_class = secop_psi.mercury.HeaterOutput
#description = sample stick heater power
#module = htr_sample
#io = itc1
[ts]
class = secop_psi.mercury.TemperatureLoop
description = sample temperature
output_module = htr_ts
slot = MB1.T1
io = itc
tolerance = 1
[htr_ts]
class = secop_psi.mercury.HeaterOutput
description = sample stick heater power
slot = MB0.H1
io = itc

View File

@ -38,6 +38,7 @@ SELF = 0
def as_float(value):
"""converts string (with unit) to float and float to string"""
if isinstance(value, str):
return float(VALUE_UNIT.match(value).group(1))
return '%g' % value
@ -82,7 +83,7 @@ class MercuryChannel(HasIO):
return 'DEV:%s:%s%s%s' % (slot, head, sep, tail)
return adr
def multiquery(self, adr, names=(), convert=as_float):
def multiquery(self, adr, names=(), convert=as_float, debug=None):
"""get parameter(s) in mercury syntax
:param adr: the 'address part' of the SCPI command
@ -99,9 +100,16 @@ class MercuryChannel(HasIO):
self.slot='DB5.P1,DB3.G1' # -> take second slot
-> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC
"""
# TODO: if the need arises: allow convert to be a list
adr = self._complete_adr(adr)
cmd = 'READ:%s:%s' % (adr, ':'.join(names))
msg = ''
for _ in range(3):
if msg:
self.log.warning('%s', msg)
reply = self.communicate(cmd)
if debug is not None:
debug.append(reply)
head = 'STAT:%s:' % adr
try:
assert reply.startswith(head)
@ -111,14 +119,17 @@ class MercuryChannel(HasIO):
return tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError):
time.sleep(0.1) # in case this was the answer of a previous command
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from None
msg = 'invalid reply %r to cmd %r' % (reply, cmd)
else:
raise HardwareError(msg) from None
def multichange(self, adr, values, convert=as_float):
def multichange(self, adr, values, convert=as_float, tolerance=0):
"""set parameter(s) in mercury syntax
:param adr: as in see multiquery method
:param values: [(name1, value1), (name2, value2) ...]
:param convert: a converter function (converts given value to string and replied string to value)
:param tolerance: tolerance for readback check
:return: the values as tuple
Example:
@ -128,22 +139,40 @@ class MercuryChannel(HasIO):
self.slot='DB6.T1,DB1.H1' # and take first slot
-> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0
"""
# TODO: if the need arises: allow convert and or tolerance to be a list
adr = self._complete_adr(adr)
params = ['%s:%s' % (k, convert(v)) for k, v in values]
cmd = 'SET:%s:%s' % (adr, ':'.join(params))
for _ in range(3): # try 3 times or until readback result matches
t = time.time()
reply = self.communicate(cmd)
head = 'STAT:SET:%s:' % adr
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
# reshuffle reply=(k1, r1, v1, k2, r2, v1) --> keys = (k1, k2), result = (r1, r2), valid = (v1, v2)
keys, result, valid = zip(*zip(replyiter, replyiter, replyiter))
assert keys == tuple(k for k, _ in values)
assert any(v == 'VALID' for v in valid)
return tuple(convert(r) for r in result)
result = tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError) as e:
time.sleep(0.1) # in case of missed replies this might help to skip garbage
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from e
keys = [v[0] for v in values]
debug = []
readback = self.multiquery(adr, keys, convert, debug)
for k, r, b in zip(keys, result, readback):
if convert == as_float:
tol = max(abs(r) * 1e-3, abs(b) * 1e-3, tolerance)
if abs(r - b) > tol:
break
elif r != b:
break
else:
return readback
self.log.warning('sent: %s', cmd)
self.log.warning('got: %s', debug[0])
return readback
def query(self, adr, convert=as_float):
"""query a single parameter
@ -153,9 +182,9 @@ class MercuryChannel(HasIO):
adr, _, name = adr.rpartition(':')
return self.multiquery(adr, [name], convert)[0]
def change(self, adr, value, convert=as_float):
def change(self, adr, value, convert=as_float, tolerance=0):
adr, _, name = adr.rpartition(':')
return self.multichange(adr, [(name, value)], convert)[0]
return self.multichange(adr, [(name, value)], convert, tolerance)[0]
class TemperatureSensor(MercuryChannel, Readable):

View File

@ -533,11 +533,14 @@ class SeaModule(Module):
if key == 'target':
kwds['readonly'] = False
prev = cls.accessibles[key]
if key == 'status':
# special case: status from sea is a string, not the status tuple
pobj = prev.copy()
else:
pobj = Parameter(**kwds)
merged_properties = prev.propertyValues.copy()
pobj.updateProperties(merged_properties)
pobj.merge(merged_properties)
datatype = kwds.get('datatype', cls.accessibles[key].datatype)
else:
pobj = Parameter(**kwds)
datatype = pobj.datatype

View File

@ -20,11 +20,13 @@
# *****************************************************************************
"""oxford instruments triton (kelvinoxjt dil)"""
from math import sqrt
from secop.core import Writable, Parameter, Readable, Drivable, IDLE, WARN, BUSY, ERROR, Done
from secop.datatypes import EnumType, FloatRange
from math import sqrt, log10
from secop.core import Writable, Parameter, Readable, Drivable, IDLE, WARN, BUSY, ERROR, \
Done, Property
from secop.datatypes import EnumType, FloatRange, StringType
from secop.lib.enum import Enum
from secop_psi.mercury import MercuryChannel, Mapped, off_on, HasInput, SELF
from secop.lib import clamp
import secop_psi.mercury as mercury
actions = Enum(none=0, condense=1, circulate=2, collect=3)
@ -35,6 +37,7 @@ actions_map.mapping['NONE'] = actions.none # when writing, STOP is used instead
class Action(MercuryChannel, Writable):
channel_type = 'ACTN'
cooldown_channel = Property('cool down channel', StringType(), 'T5')
value = Parameter('running action', EnumType(actions))
target = Parameter('action to do', EnumType(none=0, condense=1, collect=3), readonly=False)
_target = 0
@ -47,6 +50,7 @@ class Action(MercuryChannel, Writable):
def write_target(self, value):
self._target = value
self.change('SYS:DR:CHAN:COOL', self.cooldown_channel, str)
return self.change('SYS:DR:ACTN', value, actions_map)
# actions:
@ -245,10 +249,14 @@ class TemperatureLoop(ScannerChannel, mercury.TemperatureLoop):
ENABLE = 'TEMP:LOOP:MODE'
ENABLE_RAMP = 'TEMP:LOOP:RAMP:ENAB'
RAMP_RATE = 'TEMP:LOOP:RAMP:RATE'
enable_pid_table = None # remove, does not work on triton
ctrlpars = Parameter('pid (gain, integral (inv. time), differential time')
system_channel = Property('system channel name', StringType(), 'MC')
def write_control_active(self, value):
self.change('SYS:DR:CHAN:MC', 'T5', str)
if self.system_channel:
self.change('SYS:DR:CHAN:%s' % self.system_channel, self.slot.split(',')[0], str)
if value:
self.change('TEMP:LOOP:FILT:ENAB', 'ON', str)
if self.output_module:
@ -260,7 +268,7 @@ class TemperatureLoop(ScannerChannel, mercury.TemperatureLoop):
class HeaterOutput(HasInput, MercuryChannel, Writable):
"""heater output"""
channel_type = 'HTR'
value = Parameter('heater output', FloatRange(unit='W'))
value = Parameter('heater output', FloatRange(unit='uW'))
target = Parameter('heater output', FloatRange(0, unit='$'), readonly=False)
resistivity = Parameter('heater resistivity', FloatRange(unit='Ohm'))
@ -268,7 +276,7 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
return self.query('HTR:RES')
def read_value(self):
return self.query('HTR:SIG:POWR') * 1e-6
return round(self.query('HTR:SIG:POWR'), 3)
def read_target(self):
if self.controlled_by != 0:
@ -277,24 +285,33 @@ class HeaterOutput(HasInput, MercuryChannel, Writable):
def write_target(self, value):
self.write_controlled_by(SELF)
return self.change('HTR:SIG:POWR', value * 1e6)
if self.resistivity:
# round to the next voltage step
value = round(sqrt(value * self.resistivity)) ** 2 / self.resistivity
return round(self.change('HTR:SIG:POWR', value), 3)
class HeaterOutputWithRange(HeaterOutput):
"""heater output with heater range"""
channel_type = 'HTR,TEMP'
limit = Parameter('max. heater power', FloatRange(unit='W'), readonly=False)
limit = Parameter('max. heater power', FloatRange(unit='uW'), readonly=False)
def read_limit(self):
maxcur = self.query('TEMP:LOOP:RANGE') * 0.001 # mA -> A
return self.read_resistivity() * maxcur ** 2
maxcur = self.query('TEMP:LOOP:RANGE') # mA
return self.read_resistivity() * maxcur ** 2 # uW
def write_limit(self, value):
if value is None:
maxcur = 0.1 # max. allowed current 100mA
maxcur = 100 # max. allowed current 100mA
else:
maxcur = sqrt(value / self.read_resistivity())
self.change('TEMP:LOOP:RANGE', maxcur * 1000)
for cur in 0.0316, 0.1, 0.316, 1, 3.16, 10, 31.6, 100:
if cur > maxcur * 0.999:
maxcur = cur
break
else:
maxcur = cur
self.change('TEMP:LOOP:RANGE', maxcur)
return self.read_limit()