Convert formatting automatically to f-strings
Automatically convert formatting with the following call: flynt -ll 2000 -v frappy* Result: 303/381 auto-converted. Failing conversions will be looked at manually in a follow-up commit. Change-Id: Icd996b27221202faccc15af78e0380cf52ee37f2 Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/30900 Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Georg Brandl <g.brandl@fz-juelich.de> Reviewed-by: Alexander Zaft <a.zaft@fz-juelich.de>
This commit is contained in:
@@ -84,9 +84,9 @@ class Capacitance(HasIO, Readable):
|
||||
return self.voltage
|
||||
|
||||
def write_freq(self, value):
|
||||
self.value = self.parse_reply(self.communicate('FR %g;SI' % value))
|
||||
self.value = self.parse_reply(self.communicate(f'FR {value:g};SI'))
|
||||
return self.freq
|
||||
|
||||
def write_voltage(self, value):
|
||||
self.value = self.parse_reply(self.communicate('V %g;SI' % value))
|
||||
self.value = self.parse_reply(self.communicate(f'V {value:g};SI'))
|
||||
return self.voltage
|
||||
|
||||
@@ -84,13 +84,13 @@ class HeLevel(HasIO, Readable):
|
||||
return self.query('hem')
|
||||
|
||||
def write_empty_length(self, value):
|
||||
return self.query('hem=%g' % value)
|
||||
return self.query(f'hem={value:g}')
|
||||
|
||||
def read_full_length(self):
|
||||
return self.query('hfu')
|
||||
|
||||
def write_full_length(self, value):
|
||||
return self.query('hfu=%g' % value)
|
||||
return self.query(f'hfu={value:g}')
|
||||
|
||||
def read_sample_rate(self):
|
||||
return self.query('hf')
|
||||
|
||||
@@ -145,7 +145,7 @@ class ChannelSwitcher(Drivable):
|
||||
|
||||
def write_target(self, channel):
|
||||
if channel not in self._channels:
|
||||
raise ValueError('%r is no valid channel' % channel)
|
||||
raise ValueError(f'{channel!r} is no valid channel')
|
||||
if channel == self.target and self._channels[channel].enabled:
|
||||
return channel
|
||||
chan = self._channels[channel]
|
||||
|
||||
@@ -31,11 +31,12 @@ def make_cvt_list(dt, tail=''):
|
||||
tail is a postfix to be appended in case of tuples and structs
|
||||
"""
|
||||
if isinstance(dt, (EnumType, IntRange, BoolType)):
|
||||
return[(int, tail, dict(type='NUM'))]
|
||||
return[(int, tail, {type: 'NUM'})]
|
||||
if isinstance(dt, (FloatRange, ScaledInteger)):
|
||||
return [(dt.import_value, tail, dict(type='NUM', unit=dt.unit, period=5) if dt.unit else {})]
|
||||
return [(dt.import_value, tail,
|
||||
{'type': 'NUM', 'unit': dt.unit, 'period': 5} if dt.unit else {})]
|
||||
if isinstance(dt, StringType):
|
||||
return [(lambda x: x, tail, dict(type='STR'))]
|
||||
return [(lambda x: x, tail, {'type': 'STR'})]
|
||||
if isinstance(dt, TupleOf):
|
||||
items = enumerate(dt.members)
|
||||
elif isinstance(dt, StructOf):
|
||||
@@ -44,7 +45,7 @@ def make_cvt_list(dt, tail=''):
|
||||
return [] # ArrayType, BlobType and TextType are ignored: too much data, probably not used
|
||||
result = []
|
||||
for subkey, elmtype in items:
|
||||
for fun, tail_, opts in make_cvt_list(elmtype, '%s.%s' % (tail, subkey)):
|
||||
for fun, tail_, opts in make_cvt_list(elmtype, f'{tail}.{subkey}'):
|
||||
result.append((lambda v, k=subkey, f=fun: f(v[k]), tail_, opts))
|
||||
return result
|
||||
|
||||
@@ -122,7 +123,7 @@ class FrappyHistoryWriter(frappyhistory.FrappyWriter):
|
||||
def send_reply(self, msg):
|
||||
action, ident, value = msg
|
||||
if not action.endswith('update'):
|
||||
print('unknown async message %r' % msg)
|
||||
print(f'unknown async message {msg!r}')
|
||||
return
|
||||
now = self._init_time or time.time() # on initialisation, use the same timestamp for all
|
||||
if action == 'update':
|
||||
|
||||
@@ -78,7 +78,7 @@ class SourceMeter(HasIO, Module):
|
||||
def write_ilimit(self, value):
|
||||
if self.mode == 'current':
|
||||
return self.ilimit
|
||||
return float(self.communicate('smua.source.limiti = %g print(smua.source.limiti)' % value))
|
||||
return float(self.communicate(f'smua.source.limiti = {value:g} print(smua.source.limiti)'))
|
||||
|
||||
def read_vlimit(self):
|
||||
if self.mode == 'voltage':
|
||||
@@ -88,7 +88,7 @@ class SourceMeter(HasIO, Module):
|
||||
def write_vlimit(self, value):
|
||||
if self.mode == 'voltage':
|
||||
return self.ilimit
|
||||
return float(self.communicate('smua.source.limitv = %g print(smua.source.limitv)' % value))
|
||||
return float(self.communicate(f'smua.source.limitv = {value:g} print(smua.source.limitv)'))
|
||||
|
||||
|
||||
class Power(HasIO, Readable):
|
||||
@@ -130,7 +130,7 @@ class Current(HasIO, Writable):
|
||||
raise ValueError('current exceeds limit')
|
||||
if not self.active:
|
||||
self.sourcemeter.write_mode('current') # triggers update_mode -> set active to True
|
||||
value = float(self.communicate('smua.source.leveli = %g print(smua.source.leveli)' % value))
|
||||
value = float(self.communicate(f'smua.source.leveli = {value:g} print(smua.source.leveli)'))
|
||||
return value
|
||||
|
||||
def read_limit(self):
|
||||
@@ -176,7 +176,7 @@ class Voltage(HasIO, Writable):
|
||||
raise ValueError('voltage exceeds limit')
|
||||
if not self.active:
|
||||
self.sourcemeter.write_mode('voltage') # triggers update_mode -> set active to True
|
||||
value = float(self.communicate('smua.source.levelv = %g print(smua.source.levelv)' % value))
|
||||
value = float(self.communicate(f'smua.source.levelv = {value:g} print(smua.source.levelv)'))
|
||||
return value
|
||||
|
||||
def read_limit(self):
|
||||
|
||||
@@ -40,7 +40,7 @@ SELF = 0
|
||||
def as_float(value):
|
||||
if isinstance(value, str):
|
||||
return float(VALUE_UNIT.match(value).group(1))
|
||||
return '%g' % value
|
||||
return f'{value:g}'
|
||||
|
||||
|
||||
def as_string(value):
|
||||
@@ -78,9 +78,9 @@ class MercuryChannel(HasIO):
|
||||
head, sep, tail = adr.partition(':')
|
||||
for i, (channel_type, slot) in enumerate(zip(self.channel_type.split(','), self.slot.split(','))):
|
||||
if head == str(i):
|
||||
return 'DEV:%s:%s%s%s' % (slot, channel_type, sep, tail)
|
||||
return f'DEV:{slot}:{channel_type}{sep}{tail}'
|
||||
if head == channel_type:
|
||||
return 'DEV:%s:%s%s%s' % (slot, head, sep, tail)
|
||||
return f'DEV:{slot}:{head}{sep}{tail}'
|
||||
return adr
|
||||
|
||||
def multiquery(self, adr, names=(), convert=as_float):
|
||||
@@ -101,9 +101,9 @@ class MercuryChannel(HasIO):
|
||||
-> query command will be READ:DEV:DB3.G1:PRES:SIG:PERC
|
||||
"""
|
||||
adr = self._complete_adr(adr)
|
||||
cmd = 'READ:%s:%s' % (adr, ':'.join(names))
|
||||
cmd = f"READ:{adr}:{':'.join(names)}"
|
||||
reply = self.communicate(cmd)
|
||||
head = 'STAT:%s:' % adr
|
||||
head = f'STAT:{adr}:'
|
||||
try:
|
||||
assert reply.startswith(head)
|
||||
replyiter = iter(reply[len(head):].split(':'))
|
||||
@@ -111,7 +111,7 @@ class MercuryChannel(HasIO):
|
||||
assert keys == tuple(names)
|
||||
return tuple(convert(r) for r in result)
|
||||
except (AssertionError, AttributeError, ValueError):
|
||||
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from None
|
||||
raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from None
|
||||
|
||||
def multichange(self, adr, values, convert=as_float):
|
||||
"""set parameter(s) in mercury syntax
|
||||
@@ -129,10 +129,10 @@ class MercuryChannel(HasIO):
|
||||
-> change command will be SET:DEV:DB6.T1:TEMP:LOOP:P:5:I:2:D:0
|
||||
"""
|
||||
adr = self._complete_adr(adr)
|
||||
params = ['%s:%s' % (k, convert(v)) for k, v in values]
|
||||
cmd = 'SET:%s:%s' % (adr, ':'.join(params))
|
||||
params = [f'{k}:{convert(v)}' for k, v in values]
|
||||
cmd = f"SET:{adr}:{':'.join(params)}"
|
||||
reply = self.communicate(cmd)
|
||||
head = 'STAT:SET:%s:' % adr
|
||||
head = f'STAT:SET:{adr}:'
|
||||
|
||||
try:
|
||||
assert reply.startswith(head)
|
||||
@@ -142,7 +142,7 @@ class MercuryChannel(HasIO):
|
||||
assert any(v == 'VALID' for v in valid)
|
||||
return tuple(convert(r) for r in result)
|
||||
except (AssertionError, AttributeError, ValueError) as e:
|
||||
raise HardwareError('invalid reply %r to cmd %r' % (reply, cmd)) from e
|
||||
raise HardwareError(f'invalid reply {reply!r} to cmd {cmd!r}') from e
|
||||
|
||||
def query(self, adr, convert=as_float):
|
||||
"""query a single parameter
|
||||
|
||||
@@ -72,7 +72,7 @@ class MotorValve(PersistentMixin, Drivable):
|
||||
|
||||
def write_target(self, target):
|
||||
if self.status[0] == ERROR:
|
||||
raise HardwareError('%s: need refrun' % self.status[1])
|
||||
raise HardwareError(f'{self.status[1]}: need refrun')
|
||||
self.target = target
|
||||
self._state.start(self.goto_target, count=3)
|
||||
return self.target
|
||||
|
||||
@@ -80,7 +80,7 @@ class Main(Communicator):
|
||||
def communicate(self, command):
|
||||
"""GPIB command"""
|
||||
with self.lock:
|
||||
self.comLog('> %s' % command)
|
||||
self.comLog(f'> {command}')
|
||||
reply = self._ppms_device.send(command)
|
||||
self.comLog("< %s", reply)
|
||||
return reply
|
||||
@@ -151,7 +151,7 @@ class PpmsBase(HasIO, Readable):
|
||||
"""write command and check if result is OK"""
|
||||
reply = self.communicate(command)
|
||||
if reply != 'OK':
|
||||
raise HardwareError('bad reply %r to command %r' % (reply, command))
|
||||
raise HardwareError(f'bad reply {reply!r} to command {command!r}')
|
||||
|
||||
|
||||
class PpmsDrivable(Drivable, PpmsBase):
|
||||
@@ -447,12 +447,12 @@ class Temp(PpmsDrivable):
|
||||
if status[0] == StatusType.IDLE:
|
||||
status = (status[0], 'stopped')
|
||||
else:
|
||||
status = (status[0], 'stopping (%s)' % status[1])
|
||||
status = (status[0], f'stopping ({status[1]})')
|
||||
if self._expected_target_time:
|
||||
# handle timeout
|
||||
if self.isDriving(status):
|
||||
if now > self._expected_target_time + self.timeout:
|
||||
status = (StatusType.WARN, 'timeout while %s' % status[1])
|
||||
status = (StatusType.WARN, f'timeout while {status[1]}')
|
||||
else:
|
||||
self._expected_target_time = 0
|
||||
self.status = status
|
||||
@@ -492,7 +492,7 @@ class Temp(PpmsDrivable):
|
||||
if newtarget != self.target:
|
||||
self.log.debug('stop at %s K', newtarget)
|
||||
self.write_target(newtarget)
|
||||
self.status = self.status[0], 'stopping (%s)' % self.status[1]
|
||||
self.status = self.status[0], f'stopping ({self.status[1]})'
|
||||
self._stopped = True
|
||||
|
||||
|
||||
@@ -577,7 +577,7 @@ class Field(PpmsDrivable):
|
||||
if status[0] == StatusType.IDLE:
|
||||
status = (status[0], 'stopped')
|
||||
else:
|
||||
status = (status[0], 'stopping (%s)' % status[1])
|
||||
status = (status[0], f'stopping ({status[1]})')
|
||||
self.status = status
|
||||
|
||||
def write_target(self, target):
|
||||
@@ -620,7 +620,7 @@ class Field(PpmsDrivable):
|
||||
if newtarget != self.target:
|
||||
self.log.debug('stop at %s T', newtarget)
|
||||
self.write_target(newtarget)
|
||||
self.status = (self.status[0], 'stopping (%s)' % self.status[1])
|
||||
self.status = (self.status[0], f'stopping ({self.status[1]})')
|
||||
self._stopped = True
|
||||
|
||||
|
||||
@@ -698,7 +698,7 @@ class Position(PpmsDrivable):
|
||||
if status[0] == StatusType.IDLE:
|
||||
status = (status[0], 'stopped')
|
||||
else:
|
||||
status = (status[0], 'stopping (%s)' % status[1])
|
||||
status = (status[0], f'stopping ({status[1]})')
|
||||
self.status = status
|
||||
|
||||
def write_target(self, target):
|
||||
@@ -722,5 +722,5 @@ class Position(PpmsDrivable):
|
||||
if newtarget != self.target:
|
||||
self.log.debug('stop at %s T', newtarget)
|
||||
self.write_target(newtarget)
|
||||
self.status = (self.status[0], 'stopping (%s)' % self.status[1])
|
||||
self.status = (self.status[0], f'stopping ({self.status[1]})')
|
||||
self._stopped = True
|
||||
|
||||
@@ -48,7 +48,7 @@ class NamedList:
|
||||
return setattr(self, self.__keys__[index], value)
|
||||
|
||||
def __repr__(self):
|
||||
return ",".join("%.7g" % val for val in self.aslist())
|
||||
return ",".join(f"{val:.7g}" for val in self.aslist())
|
||||
|
||||
|
||||
class PpmsSim:
|
||||
@@ -198,10 +198,10 @@ class PpmsSim:
|
||||
|
||||
def getdat(self, mask):
|
||||
mask = int(mask) & 0x8000ff # all channels up to i2 plus ts
|
||||
output = ['%d' % mask, '%.2f' % (time.time() - self.start)]
|
||||
output = ['%d' % mask, f'{time.time() - self.start:.2f}']
|
||||
for i, chan in self.CHANNELS.items():
|
||||
if (1 << i) & mask:
|
||||
output.append("%.7g" % getattr(self, chan))
|
||||
output.append(f"{getattr(self, chan):.7g}")
|
||||
return ",".join(output)
|
||||
|
||||
|
||||
@@ -219,12 +219,12 @@ class QDevice:
|
||||
name, args = command.split('?')
|
||||
name += args.strip()
|
||||
result = getattr(self.sim, name.lower()).aslist()
|
||||
result = ",".join("%.7g" % arg for arg in result)
|
||||
result = ",".join(f"{arg:.7g}" for arg in result)
|
||||
# print(command, '/', result)
|
||||
else:
|
||||
# print(command)
|
||||
name, args = command.split()
|
||||
args = json.loads("[%s]" % args)
|
||||
args = json.loads(f"[{args}]")
|
||||
if name.startswith('BRIDGE') or name.startswith('DRVOUT'):
|
||||
name = name + str(int(args[0]))
|
||||
getattr(self.sim, name.lower()).setvalues(args)
|
||||
|
||||
@@ -121,7 +121,7 @@ class CalCurve:
|
||||
# then try adding all kinds as extension
|
||||
for nam in calibname, calibname.upper(), calibname.lower():
|
||||
for kind in KINDS:
|
||||
filename = join(path.strip(), '%s.%s' % (nam, kind))
|
||||
filename = join(path.strip(), f'{nam}.{kind}')
|
||||
if exists(filename):
|
||||
break
|
||||
else:
|
||||
@@ -148,7 +148,7 @@ class CalCurve:
|
||||
for line in f:
|
||||
parser.parse(line)
|
||||
except Exception as e:
|
||||
raise ValueError('calib curve %s: %s' % (calibspec, e)) from e
|
||||
raise ValueError(f'calib curve {calibspec}: {e}') from e
|
||||
self.convert_x = nplog if parser.logx else linear
|
||||
self.convert_y = npexp if parser.logy else linear
|
||||
x = np.asarray(parser.xdata)
|
||||
@@ -157,11 +157,11 @@ class CalCurve:
|
||||
x = np.flip(x)
|
||||
y = np.flip(y)
|
||||
elif np.any(x[:-1] >= x[1:]): # some not increasing
|
||||
raise ValueError('calib curve %s is not monotonic' % calibspec)
|
||||
raise ValueError(f'calib curve {calibspec} is not monotonic')
|
||||
try:
|
||||
self.spline = splrep(x, y, s=0, k=min(3, len(x) - 1))
|
||||
except (ValueError, TypeError) as e:
|
||||
raise ValueError('invalid calib curve %s' % calibspec) from e
|
||||
raise ValueError(f'invalid calib curve {calibspec}') from e
|
||||
|
||||
def __call__(self, value):
|
||||
"""convert value
|
||||
@@ -194,7 +194,7 @@ class Sensor(Readable):
|
||||
self.rawsensor.registerCallbacks(self, ['status']) # auto update status
|
||||
self._calib = CalCurve(self.calib)
|
||||
if self.description == '_':
|
||||
self.description = '%r calibrated with curve %r' % (self.rawsensor, self.calib)
|
||||
self.description = f'{self.rawsensor!r} calibrated with curve {self.calib!r}'
|
||||
|
||||
def doPoll(self):
|
||||
self.read_status()
|
||||
|
||||
@@ -334,9 +334,9 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
return IDLE, ''
|
||||
if self.auto_reset:
|
||||
self._need_reset = True
|
||||
return IDLE, 'stalled: %s' % reason
|
||||
return IDLE, f'stalled: {reason}'
|
||||
self.log.error('out of tolerance by %.3g (%s)', diff, reason)
|
||||
return ERROR, 'out of tolerance (%s)' % reason
|
||||
return ERROR, f'out of tolerance ({reason})'
|
||||
|
||||
def write_target(self, target):
|
||||
for _ in range(2): # for auto reset
|
||||
@@ -345,8 +345,7 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
abs(target - self.encoder) > self.move_limit + self.tolerance):
|
||||
# pylint: disable=bad-string-format-type
|
||||
# pylint wrongly does not recognise encoder as a descriptor
|
||||
raise RangeError('can not move more than %g deg (%g -> %g)' %
|
||||
(self.move_limit, self.encoder, target))
|
||||
raise RangeError(f'can not move more than {self.move_limit:g} deg ({self.encoder:g} -> {target:g})')
|
||||
diff = self.encoder - self.steppos
|
||||
if self._need_reset:
|
||||
if self.auto_reset:
|
||||
@@ -359,7 +358,7 @@ class Motor(PersistentMixin, HasIO, Drivable):
|
||||
if self.status[0] == IDLE:
|
||||
continue
|
||||
raise HardwareError('auto reset failed')
|
||||
raise HardwareError('need reset (%s)' % self.status[1])
|
||||
raise HardwareError(f'need reset ({self.status[1]})')
|
||||
break
|
||||
if abs(diff) > self.tolerance:
|
||||
if abs(diff) > self.encoder_tolerance and self.has_encoder:
|
||||
|
||||
Reference in New Issue
Block a user