check readback and try up to 3 times when it does not match

This commit is contained in:
l_samenv 2022-11-15 16:57:00 +01:00
parent 0c93ca8c75
commit df428f4c0c

View File

@ -38,6 +38,7 @@ SELF = 0
def as_float(value):
"""converts string (with unit) to float and float to string"""
if isinstance(value, str):
return float(VALUE_UNIT.match(value).group(1))
return '%g' % value
@ -82,7 +83,7 @@ class MercuryChannel(HasIO):
return 'DEV:%s:%s%s%s' % (slot, head, sep, tail)
return adr
def multiquery(self, adr, names=(), convert=as_float):
def multiquery(self, adr, names=(), convert=as_float, debug=None):
"""get parameter(s) in mercury syntax
:param adr: the 'address part' of the SCPI command
@ -99,26 +100,36 @@ class MercuryChannel(HasIO):
self.slot='DB5.P1,DB3.G1' # -> take second slot
-> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC
"""
# TODO: if the need arises: allow convert to be a list
adr = self._complete_adr(adr)
cmd = 'READ:%s:%s' % (adr, ':'.join(names))
reply = self.communicate(cmd)
head = 'STAT:%s:' % adr
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
keys, result = zip(*zip(replyiter, replyiter))
assert keys == tuple(names)
return tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError):
time.sleep(0.1) # in case this was the answer of a previous command
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from None
msg = ''
for _ in range(3):
if msg:
self.log.warning('%s', msg)
reply = self.communicate(cmd)
if debug is not None:
debug.append(reply)
head = 'STAT:%s:' % adr
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
keys, result = zip(*zip(replyiter, replyiter))
assert keys == tuple(names)
return tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError):
time.sleep(0.1) # in case this was the answer of a previous command
msg = 'invalid reply %r to cmd %r' % (reply, cmd)
else:
raise HardwareError(msg) from None
def multichange(self, adr, values, convert=as_float):
def multichange(self, adr, values, convert=as_float, tolerance=0):
"""set parameter(s) in mercury syntax
:param adr: as in see multiquery method
:param values: [(name1, value1), (name2, value2) ...]
:param convert: a converter function (converts given value to string and replied string to value)
:param tolerance: tolerance for readback check
:return: the values as tuple
Example:
@ -128,22 +139,40 @@ class MercuryChannel(HasIO):
self.slot='DB6.T1,DB1.H1' # and take first slot
-> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0
"""
# TODO: if the need arises: allow convert and or tolerance to be a list
adr = self._complete_adr(adr)
params = ['%s:%s' % (k, convert(v)) for k, v in values]
cmd = 'SET:%s:%s' % (adr, ':'.join(params))
reply = self.communicate(cmd)
head = 'STAT:SET:%s:' % adr
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
keys, result, valid = zip(*zip(replyiter, replyiter, replyiter))
assert keys == tuple(k for k, _ in values)
assert any(v == 'VALID' for v in valid)
return tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError) as e:
time.sleep(0.1) # in case of missed replies this might help to skip garbage
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from e
for _ in range(3): # try 3 times or until readback result matches
t = time.time()
reply = self.communicate(cmd)
head = 'STAT:SET:%s:' % adr
try:
assert reply.startswith(head)
replyiter = iter(reply[len(head):].split(':'))
# reshuffle reply=(k1, r1, v1, k2, r2, v1) --> keys = (k1, k2), result = (r1, r2), valid = (v1, v2)
keys, result, valid = zip(*zip(replyiter, replyiter, replyiter))
assert keys == tuple(k for k, _ in values)
assert any(v == 'VALID' for v in valid)
result = tuple(convert(r) for r in result)
except (AssertionError, AttributeError, ValueError) as e:
time.sleep(0.1) # in case of missed replies this might help to skip garbage
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from e
keys = [v[0] for v in values]
debug = []
readback = self.multiquery(adr, keys, convert, debug)
for k, r, b in zip(keys, result, readback):
if convert == as_float:
tol = max(abs(r) * 1e-3, abs(b) * 1e-3, tolerance)
if abs(r - b) > tol:
break
elif r != b:
break
else:
return readback
self.log.warning('sent: %s', cmd)
self.log.warning('got: %s', debug[0])
return readback
def query(self, adr, convert=as_float):
"""query a single parameter
@ -153,9 +182,9 @@ class MercuryChannel(HasIO):
adr, _, name = adr.rpartition(':')
return self.multiquery(adr, [name], convert)[0]
def change(self, adr, value, convert=as_float):
def change(self, adr, value, convert=as_float, tolerance=0):
adr, _, name = adr.rpartition(':')
return self.multichange(adr, [(name, value)], convert)[0]
return self.multichange(adr, [(name, value)], convert, tolerance)[0]
class TemperatureSensor(MercuryChannel, Readable):