diff --git a/secop_psi/mercury.py b/secop_psi/mercury.py index 0aebc82..431d721 100644 --- a/secop_psi/mercury.py +++ b/secop_psi/mercury.py @@ -38,6 +38,7 @@ SELF = 0 def as_float(value): + """converts string (with unit) to float and float to string""" if isinstance(value, str): return float(VALUE_UNIT.match(value).group(1)) return '%g' % value @@ -82,7 +83,7 @@ class MercuryChannel(HasIO): return 'DEV:%s:%s%s%s' % (slot, head, sep, tail) return adr - def multiquery(self, adr, names=(), convert=as_float): + def multiquery(self, adr, names=(), convert=as_float, debug=None): """get parameter(s) in mercury syntax :param adr: the 'address part' of the SCPI command @@ -99,26 +100,36 @@ class MercuryChannel(HasIO): self.slot='DB5.P1,DB3.G1' # -> take second slot -> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC """ + # TODO: if the need arises: allow convert to be a list adr = self._complete_adr(adr) cmd = 'READ:%s:%s' % (adr, ':'.join(names)) - reply = self.communicate(cmd) - head = 'STAT:%s:' % adr - try: - assert reply.startswith(head) - replyiter = iter(reply[len(head):].split(':')) - keys, result = zip(*zip(replyiter, replyiter)) - assert keys == tuple(names) - return tuple(convert(r) for r in result) - except (AssertionError, AttributeError, ValueError): - time.sleep(0.1) # in case this was the answer of a previous command - raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from None + msg = '' + for _ in range(3): + if msg: + self.log.warning('%s', msg) + reply = self.communicate(cmd) + if debug is not None: + debug.append(reply) + head = 'STAT:%s:' % adr + try: + assert reply.startswith(head) + replyiter = iter(reply[len(head):].split(':')) + keys, result = zip(*zip(replyiter, replyiter)) + assert keys == tuple(names) + return tuple(convert(r) for r in result) + except (AssertionError, AttributeError, ValueError): + time.sleep(0.1) # in case this was the answer of a previous command + msg = 'invalid reply %r to cmd %r' % (reply, cmd) + else: + raise HardwareError(msg) from None - def multichange(self, adr, values, convert=as_float): + def multichange(self, adr, values, convert=as_float, tolerance=0): """set parameter(s) in mercury syntax :param adr: as in see multiquery method :param values: [(name1, value1), (name2, value2) ...] :param convert: a converter function (converts given value to string and replied string to value) + :param tolerance: tolerance for readback check :return: the values as tuple Example: @@ -128,22 +139,40 @@ class MercuryChannel(HasIO): self.slot='DB6.T1,DB1.H1' # and take first slot -> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0 """ + # TODO: if the need arises: allow convert and or tolerance to be a list adr = self._complete_adr(adr) params = ['%s:%s' % (k, convert(v)) for k, v in values] cmd = 'SET:%s:%s' % (adr, ':'.join(params)) - reply = self.communicate(cmd) - head = 'STAT:SET:%s:' % adr - - try: - assert reply.startswith(head) - replyiter = iter(reply[len(head):].split(':')) - keys, result, valid = zip(*zip(replyiter, replyiter, replyiter)) - assert keys == tuple(k for k, _ in values) - assert any(v == 'VALID' for v in valid) - return tuple(convert(r) for r in result) - except (AssertionError, AttributeError, ValueError) as e: - time.sleep(0.1) # in case of missed replies this might help to skip garbage - raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from e + for _ in range(3): # try 3 times or until readback result matches + t = time.time() + reply = self.communicate(cmd) + head = 'STAT:SET:%s:' % adr + try: + assert reply.startswith(head) + replyiter = iter(reply[len(head):].split(':')) + # reshuffle reply=(k1, r1, v1, k2, r2, v1) --> keys = (k1, k2), result = (r1, r2), valid = (v1, v2) + keys, result, valid = zip(*zip(replyiter, replyiter, replyiter)) + assert keys == tuple(k for k, _ in values) + assert any(v == 'VALID' for v in valid) + result = tuple(convert(r) for r in result) + except (AssertionError, AttributeError, ValueError) as e: + time.sleep(0.1) # in case of missed replies this might help to skip garbage + raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from e + keys = [v[0] for v in values] + debug = [] + readback = self.multiquery(adr, keys, convert, debug) + for k, r, b in zip(keys, result, readback): + if convert == as_float: + tol = max(abs(r) * 1e-3, abs(b) * 1e-3, tolerance) + if abs(r - b) > tol: + break + elif r != b: + break + else: + return readback + self.log.warning('sent: %s', cmd) + self.log.warning('got: %s', debug[0]) + return readback def query(self, adr, convert=as_float): """query a single parameter @@ -153,9 +182,9 @@ class MercuryChannel(HasIO): adr, _, name = adr.rpartition(':') return self.multiquery(adr, [name], convert)[0] - def change(self, adr, value, convert=as_float): + def change(self, adr, value, convert=as_float, tolerance=0): adr, _, name = adr.rpartition(':') - return self.multichange(adr, [(name, value)], convert)[0] + return self.multichange(adr, [(name, value)], convert, tolerance)[0] class TemperatureSensor(MercuryChannel, Readable):