more intuitive and doc

This commit is contained in:
2026-06-16 11:41:31 +02:00
parent cf91156e70
commit 1cb8f5ce5d
5 changed files with 61 additions and 50 deletions
+2 -2
View File
@@ -2431,9 +2431,9 @@ class Detector(CppDetectorApi):
@property
def udp_datastream(self):
"""
[Eiger] Enable or disable UDP data streaming from the left and/or right detector ports in 10 GbE mode. Options: LEFT, RIGHT. Both ports are enabled by default.
[Eiger] Enable or disable UDP data streaming from the left and/or right detector ports in 10 GbE mode. Both ports are enabled by default. Options: LEFT, RIGHT.
[Jungfrau][Moench] Enable or disable UDP data streaming from the top and/or bottom receiver interfaces. This option is available only when numinterfaces is set to 2. Options: TOP, BOTTOM. Both interfaces are enabled by default.
[Jungfrau][Moench] Enable or disable UDP data streaming from the top and/or bottom receiver interfaces. This option is available only when numinterfaces is set to 2. Both ports are enabled by default. Options: TOP, BOTTOM.
Enum: portPosition
"""
+1 -1
View File
@@ -60,7 +60,7 @@ def to_geo(value):
def all_equal(mylist):
"""If all elements are equal return true otherwise false"""
return all(x == mylist[0] for x in mylist)
return all(x == list(mylist[0]) for x in mylist)
def element_if_equal(mylist):
+51 -41
View File
@@ -11,9 +11,8 @@ from utils_for_test import (
LogLevel,
)
from slsdet import Detector
from slsdet._slsdet import slsDetectorDefs
from slsdet.utils import all_equal, element_if_equal
detectorType = slsDetectorDefs.detectorType
@@ -931,61 +930,72 @@ def test_udp_datastream(session_simulator, request):
from slsdet import portPosition
if det_type in ['eiger']:
prev = d.udp_datastream
assert all(isinstance(v, bool) for v in prev.values())
ports = [portPosition.LEFT, portPosition.RIGHT]
prev = [d.getUDPDataStream(i) for i in ports]
# ensure all equal for each value in prev
assert all_equal(prev)
prev_val = [element_if_equal(v) for v in prev]
#list
with pytest.raises(Exception) as exc_info:
d.udp_datastream = (portPosition.LEFT, [True, False]) #list
with pytest.raises(Exception) as exc_info:
d.udp_datastream = (portPosition.TOP, True) # invalid port position
d.udp_datastream = (ports[0], [True, False])
# invalid port position
with pytest.raises(Exception) as exc_info:
d.udp_datastream = (portPosition.BOTTOM, True) # invalid port position
d.udp_datastream = (portPosition.TOP, True)
with pytest.raises(Exception) as exc_info:
d.udp_datastream = True # without port position
d.udp_datastream = (portPosition.BOTTOM, True)
d.udp_datastream = (portPosition.LEFT, False)
assert d.udp_datastream[portPosition.LEFT] is False
d.udp_datastream = (portPosition.RIGHT, False)
assert d.udp_datastream[portPosition.RIGHT] is False
d.udp_datastream = (portPosition.LEFT, True)
assert d.udp_datastream[portPosition.LEFT] is True
d.udp_datastream = (portPosition.RIGHT, True)
assert d.udp_datastream[portPosition.RIGHT] is True
# without port position
with pytest.raises(Exception) as exc_info:
d.udp_datastream = True
d.setUDPDataStream(portPosition.LEFT, prev[portPosition.LEFT])
d.setUDPDataStream(portPosition.RIGHT, prev[portPosition.RIGHT])
d.udp_datastream = (ports[0], False)
assert d.udp_datastream[ports[0]] is False
d.udp_datastream = (ports[1], False)
assert d.udp_datastream[ports[1]] is False
d.udp_datastream = (ports[0], True)
assert d.udp_datastream[ports[0]] is True
d.udp_datastream = (ports[1], True)
assert d.udp_datastream[ports[1]] is True
d.setUDPDataStream(ports[0], element_if_equal(prev[0]))
d.setUDPDataStream(ports[1], element_if_equal(prev[1]))
elif det_type in ['jungfrau', 'moench'] and num_interfaces == 2:
prev = d.udp_datastream
assert all(isinstance(v, bool) for v in prev.values())
ports = [portPosition.TOP, portPosition.BOTTOM]
prev = [d.getUDPDataStream(i) for i in ports]
# ensure all equal for each value in prev
assert all_equal(prev)
prev_val = [element_if_equal(v) for v in prev]
#list
with pytest.raises(Exception) as exc_info:
d.udp_datastream = (portPosition.TOP, [True, False]) #list
with pytest.raises(Exception) as exc_info:
d.udp_datastream = (portPosition.LEFT, True) # invalid port position
d.udp_datastream = (ports[0], [True, False])
# invalid port position
with pytest.raises(Exception) as exc_info:
d.udp_datastream = (portPosition.RIGHT, True) # invalid port position
d.udp_datastream = (portPosition.LEFT, True)
with pytest.raises(Exception) as exc_info:
d.udp_datastream = True # without port position
d.udp_datastream = (portPosition.RIGHT, True)
d.udp_datastream = (portPosition.TOP, False)
assert d.udp_datastream[portPosition.TOP] is False
d.udp_datastream = (portPosition.BOTTOM, False)
assert d.udp_datastream[portPosition.BOTTOM] is False
d.udp_datastream = (portPosition.TOP, True)
assert d.udp_datastream[portPosition.TOP] is True
d.udp_datastream = (portPosition.BOTTOM, True)
assert d.udp_datastream[portPosition.BOTTOM] is True
# without port position
with pytest.raises(Exception) as exc_info:
d.udp_datastream = True
d.setUDPDataStream(portPosition.TOP, prev[portPosition.TOP])
d.setUDPDataStream(portPosition.BOTTOM, prev[portPosition.BOTTOM])
d.udp_datastream = (ports[0], False)
assert d.udp_datastream[ports[0]] is False
d.udp_datastream = (ports[1], False)
assert d.udp_datastream[ports[1]] is False
d.udp_datastream = (ports[0], True)
assert d.udp_datastream[ports[0]] is True
d.udp_datastream = (ports[1], True)
assert d.udp_datastream[ports[1]] is True
d.setUDPDataStream(ports[0], element_if_equal(prev[0]))
d.setUDPDataStream(ports[1], element_if_equal(prev[1]))
else:
with pytest.raises(Exception) as exc_info: