generalized set_using_dict

This commit is contained in:
Erik Frojdh 2020-09-24 11:01:51 +02:00
parent 97fea10ee2
commit 101f029eef
3 changed files with 171 additions and 54 deletions

View File

@ -1267,9 +1267,11 @@ class Detector(CppDetectorApi):
@settingspath.setter
def settingspath(self, path):
self.setSettingsPath(path)
path = ut.make_string_path(path)
ut.set_using_dict(self.setSettingsPath, path)
@property
@element
def status(self):
"""Gets detector status. Enum: runStatus
Note
@ -1279,9 +1281,10 @@ class Detector(CppDetectorApi):
>>> d.status
runStatus.IDLE
"""
return element_if_equal(self.getDetectorStatus())
return self.getDetectorStatus()
@property
@element
def rx_status(self):
"""Gets receiver listener status. Enum: runStatus
Note
@ -1290,21 +1293,23 @@ class Detector(CppDetectorApi):
>>> d.rx_status
runStatus.IDLE
"""
return element_if_equal(self.getReceiverStatus())
return self.getReceiverStatus()
@property
@element
def rx_udpsocksize(self):
"""UDP socket buffer size in receiver. Tune rmem_default and rmem_max accordingly."""
return element_if_equal(self.getRxUDPSocketBufferSize())
return self.getRxUDPSocketBufferSize()
@rx_udpsocksize.setter
def rx_udpsocksize(self, buffer_size):
self.setRxUDPSocketBufferSize(buffer_size)
ut.set_using_dict(self.setRxUDPSocketBufferSize, buffer_size)
@property
@element
def rx_realudpsocksize(self):
"""Gets actual udp socket buffer size. Double the size of rx_udpsocksize due to kernel bookkeeping."""
return element_if_equal(self.getRxRealUDPSocketBufferSize())
return self.getRxRealUDPSocketBufferSize()
@property
def trimbits(self):
@ -1324,7 +1329,7 @@ class Detector(CppDetectorApi):
@trimbits.setter
def trimbits(self, fname):
fname = ut.make_string_path(fname)
self.loadTrimbits(fname)
ut.set_using_dict(self.loadTrimbits, fname)
@property
@element
@ -1336,26 +1341,27 @@ class Detector(CppDetectorApi):
@trimval.setter
def trimval(self, value):
self.setAllTrimbits(value)
ut.set_using_dict(self.setAllTrimbits, value)
@property
@element
def lock(self):
"""Lock detector to one client IP, 1 locks, 0 unlocks. Default is unlocked."""
return element_if_equal(self.getDetectorLock())
return self.getDetectorLock()
@lock.setter
def lock(self, value):
self.setDetectorLock(value)
ut.set_using_dict(self.setDetectorLock, value)
@property
@element
def rx_lock(self):
"""Lock receiver to one client IP, 1 locks, 0 unlocks. Default is unlocked."""
return element_if_equal(self.getRxLock())
return self.getRxLock()
@rx_lock.setter
def rx_lock(self, value):
self.setRxLock(value)
ut.set_using_dict(self.setRxLock, value)
@property
@element
@ -1379,9 +1385,10 @@ class Detector(CppDetectorApi):
ut.set_using_dict(self.setRxZmqStartingFrame, value)
@property
@element
def lastclient(self):
"""Get Client IP Address that last communicated with the detector."""
return element_if_equal(self.getLastClientIP())
return self.getLastClientIP()
@property
def reg(self):
@ -1501,13 +1508,14 @@ class Detector(CppDetectorApi):
return self.getMeasurementTime()
@property
@element
def led(self):
"""[Ctb] Switches on/off all LEDs. Default is enabled. """
return element_if_equal(self.getLEDEnable())
return self.getLEDEnable()
@led.setter
def led(self, value):
self.setLEDEnable(value)
ut.set_using_dict(self.setLEDEnable, value)
def acquire(self):
"""
@ -1592,6 +1600,7 @@ class Detector(CppDetectorApi):
self.setRateCorrection(tau)
@property
@element
def speed(self):
"""
[Eiger][Jungfrau] Readout speed of chip. Enum: speedLevel
@ -1605,7 +1614,7 @@ class Detector(CppDetectorApi):
@speed.setter
def speed(self, value):
self.setSpeed(value)
ut.set_using_dict(self.setSpeed, value)
@property
def rx_jsonpara(self):
@ -1653,11 +1662,7 @@ class Detector(CppDetectorApi):
ut.set_using_dict(self.setAdditionalJsonHeader, args)
@property
def rx_frameindex(self):
"""Current frame index received in receiver during acquisition"""
return self.getRxCurrentFrameIndex()
@property
@element
def threshold(self):
"""[Eiger] Threshold in eV
Note
@ -1665,13 +1670,14 @@ class Detector(CppDetectorApi):
To change settings as well or set threshold without trimbits, use setThresholdEnergy.
:setter: It loads trim files from settingspath.
"""
return element_if_equal(self.getThresholdEnergy())
return self.getThresholdEnergy()
@threshold.setter
def threshold(self, eV):
self.setThresholdEnergy(eV)
ut.set_using_dict(self.setThresholdEnergy, eV)
@property
@element
def timing(self):
"""
Set Timing Mode of detector. Enum: timingMode
@ -1682,13 +1688,14 @@ class Detector(CppDetectorApi):
[Mythen3] AUTO_TIMING, TRIGGER_EXPOSURE, GATED, TRIGGER_GATED \n
[Eiger] AUTO_TIMING, TRIGGER_EXPOSURE, GATED, BURST_TRIGGER
"""
return element_if_equal(self.getTimingMode())
return self.getTimingMode()
@timing.setter
def timing(self, mode):
self.setTimingMode(mode)
ut.set_using_dict(self.setTimingMode, mode)
@property
@element
def trimen(self):
"""
[Eiger] List of trim energies, where corresponding default trim files exist in corresponding trim folders.
@ -1700,13 +1707,14 @@ class Detector(CppDetectorApi):
>>> d.trimen
[4500, 5400, 6400]
"""
return element_if_equal(self.getTrimEnergies())
return self.getTrimEnergies()
@trimen.setter
def trimen(self, energies):
self.setTrimEnergies(energies)
ut.set_using_dict(self.setTrimEnergies, energies)
@property
@element
def vthreshold(self):
"""
[Eiger][Mythen3] Detector threshold voltage for single photon counters in dac units.
@ -1715,13 +1723,19 @@ class Detector(CppDetectorApi):
[Eiger] Sets vcmp_ll, vcmp_lr, vcmp_rl, vcmp_rr and vcp to the same value. \n
[Mythen3] Sets vth1, vth2 and vth3 to the same value.
"""
return element_if_equal(self.getDAC(dacIndex.VTHRESHOLD, False))
return self.getDAC(dacIndex.VTHRESHOLD)
@vthreshold.setter
def vthreshold(self, value):
self.setDAC(dacIndex.VTHRESHOLD, value, False)
if isinstance(value, dict):
args = ({k:(dacIndex.VTHRESHOLD,v) for k,v in value.items()},)
else:
args = (dacIndex.VTHRESHOLD, value)
ut.set_using_dict(self.setDAC, *args)
@property
@element
def type(self):
""" Returns detector type. Enum: detectorType
Note
@ -1729,7 +1743,7 @@ class Detector(CppDetectorApi):
:setter: Not implemented
Values: EIGER, JUNGFRAU, GOTTHARD, MOENCH, MYTHEN3, GOTTHARD2, CHIPTESTBOARD
"""
return element_if_equal(self.getDetectorType())
return self.getDetectorType()
@property
def rx_frameindex(self):

View File

@ -3,9 +3,8 @@ Utility functions that are useful for testing and troubleshooting
but not directly used in controlling the detector
"""
from collections import namedtuple
import _slsdet #C++ lib
import _slsdet #C++ lib
import functools
import datetime as dt
import pathlib
@ -14,18 +13,21 @@ from pathlib import Path
Geometry = namedtuple('Geometry', ['x', 'y'])
def is_iterable(item):
try:
try:
iter(item)
except TypeError:
return False
return True
def get_set_bits(mask):
"""
Return a list of the set bits in a python integer
"""
return [i for i in range(mask.bit_length()) if (mask>>i)&1]
return [i for i in range(mask.bit_length()) if (mask >> i) & 1]
def list_to_bitmask(values):
"""
@ -33,17 +35,19 @@ def list_to_bitmask(values):
where the list indicates
"""
mask = int(0)
values = list(set(values)) #Remove duplicates
values = list(set(values)) #Remove duplicates
for v in values:
mask += 1 << v
return mask
def to_geo(value):
if isinstance(value, _slsdet.xy):
return Geometry(x = value.x, y = value.y)
return Geometry(x=value.x, y=value.y)
else:
raise ValueError("Can only convert slsdet.xy")
def all_equal(mylist):
"""If all elements are equal return true otherwise false"""
return all(x == mylist[0] for x in mylist)
@ -53,7 +57,7 @@ def element_if_equal(mylist):
"""If all elements are equal return only one element"""
if not is_iterable(mylist):
return mylist
if all_equal(mylist):
if len(mylist) == 0:
return None
@ -62,6 +66,7 @@ def element_if_equal(mylist):
else:
return mylist
def reduce_time(mylist):
res = element_if_equal(element_if_equal(mylist))
if isinstance(res, dt.timedelta):
@ -71,6 +76,7 @@ def reduce_time(mylist):
else:
return [r.total_seconds() for r in res]
def element(func):
"""
Wrapper to return either list or element
@ -78,6 +84,7 @@ def element(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
return element_if_equal(func(self, *args, **kwargs))
return wrapper
@ -90,14 +97,16 @@ def eiger_register_to_time(register):
"""
clocks = register >> 3
exponent = register & 0b111
return clocks*10**exponent / 100e6
return clocks * 10**exponent / 100e6
def make_timedelta(t):
if isinstance(t, dt.timedelta):
return t
return t
else:
return dt.timedelta(seconds=t)
def _make_string_path(path):
"""
Accepts either a pathlib.Path or a string, expands ~ to user and convert
@ -110,26 +119,28 @@ def _make_string_path(path):
else:
raise ValueError("Cannot convert argument to posix path")
def make_string_path(path):
if isinstance(path, dict):
return {key:_make_string_path(value) for key,value in path.items()}
else:
return _make_string_path(path)
return _make(path, _make_string_path)
def make_ip(arg):
return _make(arg, _slsdet.IpAddr)
def make_mac(arg):
return _make(arg, _slsdet.MacAddr)
def make_path(arg):
return _make(arg, Path)
def _make(arg, transform):
"""Helper function for make_mac and make_ip special cases for
dict, list and tuple. Otherwise just calls transform"""
if isinstance(arg, dict):
return {key:transform(value) for key,value in arg.items()}
return {key: transform(value) for key, value in arg.items()}
elif isinstance(arg, list):
return [transform(a) for a in arg]
elif isinstance(arg, tuple):
@ -137,15 +148,21 @@ def _make(arg, transform):
else:
return transform(arg)
def set_using_dict(func, args):
if isinstance(args, dict) and all(isinstance(k, int) for k in args.keys()):
for key, value in args.items():
def set_using_dict(func, *args):
if len(args) == 1 and isinstance(args[0], dict) and all(
isinstance(k, int) for k in args[0].keys()):
for key, value in args[0].items():
if not isinstance(value, tuple):
value = (value,)
try:
func(value, [key])
func(*value, [key])
except TypeError:
func(value, key)
func(*value, key)
else:
func(args)
func(*args)
def set_time_using_dict(func, args):
if isinstance(args, dict) and all(isinstance(k, int) for k in args.keys()):
@ -155,11 +172,13 @@ def set_time_using_dict(func, args):
func(value, [key])
else:
if isinstance(args, int):
args = float(args)
args = float(args)
func(args)
def lhex(iterable):
return [hex(item) for item in iterable]
def lpath(iterable):
return [Path(item) for item in iterable]

View File

@ -167,10 +167,12 @@ def test_make_ip_from_list():
arg = ["192.168.1.1", "192.168.1.2", "127.0.0.1"]
assert make_ip(arg) == [IpAddr(a) for a in arg]
def test_make_ip_from_tuple():
arg = ("127.0.0.1")
assert make_ip(arg) == (IpAddr(arg))
def test_make_mac_from_dict():
arg = {6: "84:a9:aa:24:32:88", 12: "84:a9:3e:24:32:aa"}
res = make_mac(arg)
@ -198,10 +200,92 @@ def test_make_mac_from_tuple():
assert make_mac(arg) == (MacAddr("84:a9:aa:24:32:88"),
MacAddr("84:a9:3e:24:32:aa"))
def test_make_path_from_str():
assert make_path("/") == Path("/")
assert make_path("/home") == Path("/home")
def test_make_path_from_list():
arg = ["/", "/home", "/another/path"]
assert make_path(arg) == [Path(p) for p in arg]
assert make_path(arg) == [Path(p) for p in arg]
def test_make_string_path_from_str():
arg = "/path/to/something"
assert make_string_path(arg) == arg
def test_make_string_path_from_Path():
s = "/path/to/something"
arg = Path(s)
assert make_string_path(arg) == s
def test_make_string_path_from_list_of_Path_and_string():
args = ["/path/to", Path("/something/something"), "else/"]
assert make_string_path(args) == [
"/path/to", "/something/something", "else/"
]
def test_make_string_path_from_Path_list():
s = "/path/to/something"
arg = [Path(s)]
assert make_string_path(arg) == [s]
def test_make_string_path_from_dict():
args = {0: "/path/to/something", 1: Path("/something/else")}
assert make_string_path(args) == {
0: "/path/to/something",
1: "/something/else"
}
class DummyClass:
def __init__(self):
self.args = []
def call(self, *args):
"""Normal type call in slsdet where list of detectors is passed"""
self.args.append(args)
def call_int_id(self, *args):
"""call where det_is is an int"""
*args, i = args
if isinstance(i, list):
raise TypeError
self.args.append((*args, i))
def test_set_using_dict_single_int():
c = DummyClass()
set_using_dict(c.call, 5)
assert c.args == [(5,)]
def test_set_using_dict_two_ints():
c = DummyClass()
set_using_dict(c.call, 1, 2)
assert c.args == [(1,2)]
def test_set_using_dict_passing_dict():
c = DummyClass()
set_using_dict(c.call, {0: 5, 8:3, 9:7})
assert len(c.args) == 3
assert c.args == [(5, [0]), (3, [8]), (7, [9])]
def test_set_using_dict_calling_int_id():
c = DummyClass()
set_using_dict(c.call_int_id, {0: "hej", 8:3, 9:7})
assert len(c.args) == 3
assert c.args == [("hej", 0), (3, 8), (7, 9)]
def test_set_using_dict_pass_multiple_args():
c = DummyClass()
set_using_dict(c.call, "a", "b", "c")
assert len(c.args) == 1
assert c.args == [("a", "b", "c")]
def test_set_using_dict_passing_dict_with_multiple_args():
c = DummyClass()
set_using_dict(c.call, {0: ("a", "b"), 1: ("c", "d")})
assert c.args == [("a", "b", [0]), ("c", "d", [1])]