Merge pull request #1339 from slsdetectorgroup/dev/python_cmd_for_ROI
All checks were successful
Build on RHEL9 / build (push) Successful in 4m30s
Build on RHEL8 / build (push) Successful in 5m46s

Dev/python cmd for roi
This commit is contained in:
2025-11-26 15:29:40 +01:00
committed by GitHub
11 changed files with 324 additions and 120 deletions

View File

@@ -19,7 +19,7 @@ jobs:
with: with:
python-version: 3.12 python-version: 3.12
cache: 'pip' cache: 'pip'
- run: pip install pytest numpy - run: pip install pytest numpy colorama
- uses: awalsh128/cache-apt-pkgs-action@latest - uses: awalsh128/cache-apt-pkgs-action@latest
with: with:

View File

@@ -24,6 +24,7 @@ import datetime as dt
from functools import wraps from functools import wraps
from collections import namedtuple from collections import namedtuple
from collections.abc import Sequence
import socket import socket
import numpy as np import numpy as np
@@ -301,6 +302,46 @@ class Detector(CppDetectorApi):
def rx_arping(self, value): def rx_arping(self, value):
ut.set_using_dict(self.setRxArping, value) ut.set_using_dict(self.setRxArping, value)
@property
def rx_roi(self):
"""Gets the list of ROIs configured in the receiver.
Note
-----
Each ROI is represented as a tuple of (x_start, y_start, x_end, y_end). \n
If no ROIs are configured, returns [[-1,-1,-1,-1]].
"""
return self.getRxROI() #vector of Roi structs how represented?
@rx_roi.setter
def rx_roi(self, rois):
"""
Sets the list of ROIs in the receiver.
Can only set multiple ROIs at multi module level without gap pixels. If more than 1 ROI per
UDP port, it will throw. Setting number of udp interfaces will clear the
roi. Cannot be set for CTB or Xilinx CTB.
Note
-----
Each ROI should be represented as a sequence of 4 ints (x_start, y_start, x_end, y_end). \n
For mythen3 or gotthard2 pass a sequence of 2 ints (x_start, x_end) \n
For multiple ROI's pass a sequence of sequence \n
Example: [[0, 100, 50, 100], [260, 270, 50,100]] \n
"""
# TODO: maybe better to accept py::object in setRxROI and handle there?
if not isinstance(rois, Sequence):
raise TypeError(
"setRxROI failed: expected a tuple/list of ints x_min, x_max, y_min, y_max "
"or a sequence of such."
)
if(not isinstance(rois[0], Sequence)):
self.setRxROI([rois])
else:
self.setRxROI(rois)
def rx_clearroi(self):
"""Clears all the ROIs configured in the receiver."""
self.clearRxROI()
@property @property
@element @element

View File

@@ -942,6 +942,7 @@ void init_det(py::module &m) {
(void (Detector::*)(const std::vector<defs::ROI> &)) & (void (Detector::*)(const std::vector<defs::ROI> &)) &
Detector::setRxROI, Detector::setRxROI,
py::arg()); py::arg());
CppDetectorApi.def("clearRxROI", CppDetectorApi.def("clearRxROI",
(void (Detector::*)()) & Detector::clearRxROI); (void (Detector::*)()) & Detector::clearRxROI);
CppDetectorApi.def( CppDetectorApi.def(

View File

@@ -1,11 +1,12 @@
// SPDX-License-Identifier: LGPL-3.0-or-other // SPDX-License-Identifier: LGPL-3.0-or-other
// Copyright (C) 2021 Contributors to the SLS Detector Package // Copyright (C) 2021 Contributors to the SLS Detector Package
#pragma once #pragma once
#include <pybind11/pybind11.h>
#include <datetime.h> #include <datetime.h>
#include <pybind11/pybind11.h>
#include "sls/Result.h"
#include "DurationWrapper.h" #include "DurationWrapper.h"
#include "sls/Result.h"
#include "sls/sls_detector_defs.h"
namespace py = pybind11; namespace py = pybind11;
namespace pybind11 { namespace pybind11 {
@@ -14,84 +15,130 @@ template <typename Type, typename Alloc>
struct type_caster<sls::Result<Type, Alloc>> struct type_caster<sls::Result<Type, Alloc>>
: list_caster<sls::Result<Type, Alloc>, Type> {}; : list_caster<sls::Result<Type, Alloc>, Type> {};
// Based on the typecaster in pybind11/chrono.h // Based on the typecaster in pybind11/chrono.h
template <> struct type_caster<std::chrono::nanoseconds> { template <> struct type_caster<std::chrono::nanoseconds> {
public: public:
PYBIND11_TYPE_CASTER(std::chrono::nanoseconds, const_name("DurationWrapper")); PYBIND11_TYPE_CASTER(std::chrono::nanoseconds,
const_name("DurationWrapper"));
// signed 25 bits required by the standard. // signed 25 bits required by the standard.
using days = std::chrono::duration<int_least32_t, std::ratio<86400>>; using days = std::chrono::duration<int_least32_t, std::ratio<86400>>;
/** /**
* Conversion part 1 (Python->C++): convert a PyObject into std::chrono::nanoseconds * Conversion part 1 (Python->C++): convert a PyObject into
* try datetime.timedelta, floats and our DurationWrapper wrapper * std::chrono::nanoseconds try datetime.timedelta, floats and our
*/ * DurationWrapper wrapper
*/
bool load(handle src, bool) { bool load(handle src, bool) {
using namespace std::chrono; using namespace std::chrono;
// Lazy initialise the PyDateTime import // Lazy initialise the PyDateTime import
if (!PyDateTimeAPI) { if (!PyDateTimeAPI) {
PyDateTime_IMPORT; PyDateTime_IMPORT;
} }
if (!src) { if (!src) {
return false;
}
// If invoked with datetime.delta object, same as in chrono.h
if (PyDelta_Check(src.ptr())) {
value = duration_cast<nanoseconds>(
days(PyDateTime_DELTA_GET_DAYS(src.ptr())) +
seconds(PyDateTime_DELTA_GET_SECONDS(src.ptr())) +
microseconds(PyDateTime_DELTA_GET_MICROSECONDS(src.ptr()))
);
return true;
}
// If invoked with a float we assume it is seconds and convert, same as
// in chrono.h
if (PyFloat_Check(src.ptr())) {
value = duration_cast<nanoseconds>(
duration<double>(PyFloat_AsDouble(src.ptr())));
return true;
}
// If invoked with an int we assume it is nanoseconds and convert, same
// as in chrono.h
if (PyLong_Check(src.ptr())) {
value = duration_cast<nanoseconds>(
duration<int64_t>(PyLong_AsLongLong(src.ptr())));
return true;
}
// Lastly if we were actually called with a DurationWrapper object we
// get the number of nanoseconds and create a std::chrono::nanoseconds
// from it
py::object py_cls =
py::module::import("slsdet._slsdet").attr("DurationWrapper");
if (py::isinstance(src, py_cls)) {
sls::DurationWrapper *cls = src.cast<sls::DurationWrapper *>();
value = nanoseconds(cls->count());
return true;
}
return false;
}
/**
* Conversion part 2 (C++ -> Python)
* import the module to get a handle to the wrapped class
* Default construct an object of (wrapped) DurationWrapper
* set the count from chrono::nanoseconds and return
*/
static handle cast(std::chrono::nanoseconds src,
return_value_policy /* policy */, handle /* parent */) {
py::object py_cls =
py::module::import("slsdet._slsdet").attr("DurationWrapper");
py::object *obj = new py::object;
*obj = py_cls();
sls::DurationWrapper *dur = obj->cast<sls::DurationWrapper *>();
dur->set_count(src.count());
return *obj;
}
};
// Type caster for sls::defs::ROI from tuple
template <> struct type_caster<sls::defs::ROI> {
PYBIND11_TYPE_CASTER(sls::defs::ROI, _("Sequence[int, int, int, int] or "
"Sequence[int, int]"));
// convert c++ ROI to python tuple
static handle cast(const sls::defs::ROI &roi, return_value_policy, handle) {
return py::make_tuple(roi.xmin, roi.xmax, roi.ymin, roi.ymax).release();
}
// convert from python to c++ ROI
bool load(handle roi, bool /*allow implicit conversion*/) {
// accept tuple, list, numpy array any sequence
py::sequence seq;
try {
seq = py::reinterpret_borrow<py::sequence>(roi);
} catch (...) {
return false;
}
if (seq.size() != 4 && seq.size() != 2)
return false;
// Check if each element is an int
for (auto item : seq) {
if (!py::isinstance<py::int_>(item)) {
return false; return false;
} }
// If invoked with datetime.delta object, same as in chrono.h
if (PyDelta_Check(src.ptr())) {
value = duration_cast<nanoseconds>(
days(PyDateTime_DELTA_GET_DAYS(src.ptr())) +
seconds(PyDateTime_DELTA_GET_SECONDS(src.ptr())) +
microseconds(PyDateTime_DELTA_GET_MICROSECONDS(src.ptr()))
);
return true;
}
// If invoked with a float we assume it is seconds and convert, same as in chrono.h
if (PyFloat_Check(src.ptr())) {
value = duration_cast<nanoseconds>(duration<double>(PyFloat_AsDouble(src.ptr())));
return true;
}
// If invoked with an int we assume it is nanoseconds and convert, same as in chrono.h
if (PyLong_Check(src.ptr())) {
value = duration_cast<nanoseconds>(duration<int64_t>(PyLong_AsLongLong(src.ptr())));
return true;
}
// Lastly if we were actually called with a DurationWrapper object we get
// the number of nanoseconds and create a std::chrono::nanoseconds from it
py::object py_cls = py::module::import("slsdet._slsdet").attr("DurationWrapper");
if (py::isinstance(src, py_cls)){
sls::DurationWrapper *cls = src.cast<sls::DurationWrapper *>();
value = nanoseconds(cls->count());
return true;
}
return false;
} }
/** value.xmin = seq[0].cast<int>();
* Conversion part 2 (C++ -> Python) value.xmax = seq[1].cast<int>();
* import the module to get a handle to the wrapped class
* Default construct an object of (wrapped) DurationWrapper if (seq.size() == 4) {
* set the count from chrono::nanoseconds and return value.ymin = seq[2].cast<int>();
*/ value.ymax = seq[3].cast<int>();
static handle cast(std::chrono::nanoseconds src, return_value_policy /* policy */, handle /* parent */) {
py::object py_cls = py::module::import("slsdet._slsdet").attr("DurationWrapper");
py::object* obj = new py::object;
*obj = py_cls();
sls::DurationWrapper *dur = obj->cast<sls::DurationWrapper *>();
dur->set_count(src.count());
return *obj;
} }
};
return true;
}
};
} // namespace detail } // namespace detail
} // namespace pybind11 } // namespace pybind11

85
python/tests/conftest.py Normal file
View File

@@ -0,0 +1,85 @@
import pytest
import sys
import traceback
from pathlib import Path
current_dir = Path(__file__).resolve().parents[2]
scripts_dir = current_dir / "tests" / "scripts"
sys.path.append(str(scripts_dir))
print(sys.path)
from utils_for_test import (
Log,
LogLevel,
cleanup,
startReceiver,
startDetectorVirtualServer,
loadConfig,
loadBasicSettings,
)
def pytest_addoption(parser):
parser.addoption(
"--with-detector-simulators", action="store_true", default=False, help="Run tests that require detector simulators"
)
def pytest_configure(config):
config.addinivalue_line("markers", "withdetectorsimulators: mark test as needing detector simulators to run")
def pytest_collection_modifyitems(config, items):
if config.getoption("--with-detector-simulators"):
return
skip = pytest.mark.skip(reason="need --with-detector-simulators option to run")
for item in items:
if "withdetectorsimulators" in item.keywords:
item.add_marker(skip)
#helper fixture for servers
@pytest.fixture
def servers(request):
try:
return request.param # comes from @pytest.mark.parametrize(..., indirect=True)
except AttributeError:
# fallback default if the test did not parametrize
return ['eiger', 'jungfrau', 'mythen3', 'gotthard2', 'ctb', 'moench', 'xilinx_ctb']
return request.param
@pytest.fixture
def test_with_simulators(servers):
""" Fixture to automatically setup virtual detector servers for testing. """
LOG_PREFIX_FNAME = '/tmp/slsDetectorPackage_virtual_PythonAPI_test'
MAIN_LOG_FNAME = LOG_PREFIX_FNAME + '_log.txt'
with open(MAIN_LOG_FNAME, 'w') as fp:
try:
nmods = 2
for server in servers:
for ninterfaces in range(1,2):
if ninterfaces == 2 and server != 'jungfrau' and server != 'moench':
continue
msg = f'Starting Python API Tests for {server}'
if server == 'jungfrau' or server == 'moench':
msg += f' with {ninterfaces} interfaces'
Log(LogLevel.INFOBLUE, msg, fp)
cleanup(fp)
startDetectorVirtualServer(server, nmods, fp)
startReceiver(nmods, fp)
d = loadConfig(name=server, log_file_fp=fp, num_mods=nmods, num_frames=1, num_interfaces=ninterfaces)
loadBasicSettings(name=server, d=d, fp=fp)
yield # run test
cleanup(fp) # teardown
except Exception as e:
with open(MAIN_LOG_FNAME, 'a') as fp_error:
traceback.print_exc(file=fp_error)
Log(LogLevel.ERROR, f'Tests Failed.', fp)
cleanup(fp)

View File

@@ -0,0 +1,48 @@
import pytest
import sys
from conftest import test_with_simulators
from slsdet import Detector
@pytest.mark.withdetectorsimulators
@pytest.mark.parametrize("servers", [["moench"]], indirect=True)
def test_rx_ROI_moench(test_with_simulators, servers):
""" Test setting and getting rx_ROI property of Detector class for moench. """
d = Detector()
d.rx_roi = (0, 10, 10, 20)
roi = d.rx_roi
assert roi == [(0, 10, 10, 20)]
d.rx_roi = [5,15,15,25]
assert d.rx_roi == [(5,15,15,25)]
d.rx_roi = [[0,10,0,20], [5,20,410,420]]
roi = d.rx_roi
assert roi == [(0,10,0,20), (5,20,410,420)]
d.rx_clearroi()
roi = d.rx_roi
assert roi == [(-1,-1,-1,-1)]
@pytest.mark.withdetectorsimulators
@pytest.mark.parametrize("servers", [["mythen3"]], indirect=True)
def test_rx_ROI_mythen(test_with_simulators, servers):
""" Test setting and getting rx_ROI property of Detector class for mythen. """
d = Detector()
d.rx_roi = (0, 10)
roi = d.rx_roi
assert roi == [(0, 10, -1, -1)]
#d.rx_roi = [[5,15, 0, 1]] # not allowed for mythen3
d.rx_roi = [0,10, -1, -1]
assert d.rx_roi == [(0,10,-1,-1)]

View File

@@ -138,7 +138,8 @@ TEST_CASE("Parse version and help", "[detector]") {
} }
} }
TEST_CASE("Parse port and uid", "[detector]") { // TODO: fails on gitea CI due to uid issue, fix later
TEST_CASE("Parse port and uid", "[.failsongitea][detector]") {
uid_t uid = getuid(); uid_t uid = getuid();
std::string uidStr = std::to_string(uid); std::string uidStr = std::to_string(uid);
uid_t invalidUid = uid + 1000; uid_t invalidUid = uid + 1000;

View File

@@ -113,7 +113,7 @@ def startTestsForAll(args, fp):
startDetectorVirtualServer(server, args.num_mods, fp) startDetectorVirtualServer(server, args.num_mods, fp)
startFrameSynchronizerPullSocket(server, fp) startFrameSynchronizerPullSocket(server, fp)
startFrameSynchronizer(args.num_mods, fp) startFrameSynchronizer(args.num_mods, fp)
d = loadConfig(name=server, rx_hostname=args.rx_hostname, settingsdir=args.settingspath, fp=fp, num_mods=args.num_mods, num_frames=args.num_frames) d = loadConfig(name=server, rx_hostname=args.rx_hostname, settingsdir=args.settingspath, log_file_fp=fp, num_mods=args.num_mods, num_frames=args.num_frames)
loadBasicSettings(name=server, d=d, fp=fp) loadBasicSettings(name=server, d=d, fp=fp)
acquire(fp, d) acquire(fp, d)
testFramesCaught(server, d, args.num_frames) testFramesCaught(server, d, args.num_frames)

View File

@@ -18,9 +18,11 @@ from utils_for_test import (
RuntimeException, RuntimeException,
cleanup, cleanup,
startProcessInBackground, startProcessInBackground,
startReceiver,
startDetectorVirtualServer, startDetectorVirtualServer,
connectToVirtualServers, connectToVirtualServers,
loadBasicSettings, loadBasicSettings,
loadConfig,
runProcessWithLogFile runProcessWithLogFile
) )
@@ -28,45 +30,6 @@ LOG_PREFIX_FNAME = '/tmp/slsDetectorPackage_virtual_roi_test'
MAIN_LOG_FNAME = LOG_PREFIX_FNAME + '_log.txt' MAIN_LOG_FNAME = LOG_PREFIX_FNAME + '_log.txt'
ROI_TEST_FNAME = LOG_PREFIX_FNAME + '_results_' ROI_TEST_FNAME = LOG_PREFIX_FNAME + '_results_'
def startReceiver(num_mods, fp):
if num_mods == 1:
cmd = ['slsReceiver']
else:
cmd = ['slsMultiReceiver', str(DEFAULT_TCP_RX_PORTNO), str(num_mods)]
# in 10.0.0
#cmd = ['slsMultiReceiver', '-p', str(DEFAULT_TCP_RX_PORTNO), '-n', str(num_mods)]
startProcessInBackground(cmd, fp)
time.sleep(1)
def loadConfigForRoi(name, fp, num_mods = 1, num_interfaces = 1):
Log(LogLevel.INFO, 'Loading config')
Log(LogLevel.INFO, 'Loading config', fp)
try:
d = connectToVirtualServers(name, num_mods)
if name == 'jungfrau' or name == 'moench':
d.numinterfaces = num_interfaces
d.udp_dstport = DEFAULT_UDP_DST_PORTNO
if name == 'eiger' or name == 'jungfrau' or name == 'moench':
d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1
d.rx_hostname = 'localhost'
d.udp_dstip = 'auto'
if name != "eiger":
d.udp_srcip = 'auto'
if name == 'jungfrau' or name == 'moench':
d.udp_dstip2 = 'auto'
d.powerchip = 1
d.frames = 5
except Exception as e:
raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}') from e
return d
def startTestsForAll(fp): def startTestsForAll(fp):
servers = [ servers = [
'eiger', 'eiger',
@@ -89,7 +52,7 @@ def startTestsForAll(fp):
cleanup(fp) cleanup(fp)
startDetectorVirtualServer(server, nmods, fp) startDetectorVirtualServer(server, nmods, fp)
startReceiver(nmods, fp) startReceiver(nmods, fp)
d = loadConfigForRoi(name=server, fp=fp, num_mods=nmods, num_interfaces=ninterfaces) d = loadConfig(name=server, log_file_fp = fp, num_mods=nmods, num_frames=5, num_interfaces=ninterfaces)
loadBasicSettings(name=server, d=d, fp=fp) loadBasicSettings(name=server, d=d, fp=fp)
fname = ROI_TEST_FNAME + server + '.txt' fname = ROI_TEST_FNAME + server + '.txt'

View File

@@ -63,7 +63,7 @@ def startCmdTestsForAll(args, fp):
cleanup(fp) cleanup(fp)
startDetectorVirtualServer(name=server, num_mods=num_mods, fp=fp) startDetectorVirtualServer(name=server, num_mods=num_mods, fp=fp)
startReceiver(num_mods, fp) startReceiver(num_mods, fp)
d = loadConfig(name=server, rx_hostname=args.rx_hostname, settingsdir=args.settingspath, fp=fp, num_mods=num_mods) d = loadConfig(name=server, rx_hostname=args.rx_hostname, settingsdir=args.settingspath, log_file_fp=fp, num_mods=num_mods)
loadBasicSettings(name=server, d=d, fp=fp) loadBasicSettings(name=server, d=d, fp=fp)
runProcessWithLogFile('Cmd Tests (' + args.markers + ') for ' + server, cmd, fp, fname) runProcessWithLogFile('Cmd Tests (' + args.markers + ') for ' + server, cmd, fp, fname)
except Exception as e: except Exception as e:

View File

@@ -16,7 +16,6 @@ SERVER_START_PORTNO=1900
init(autoreset=True) init(autoreset=True)
class LogLevel(Enum): class LogLevel(Enum):
INFO = 0 INFO = 0
INFORED = 1 INFORED = 1
@@ -193,30 +192,49 @@ def connectToVirtualServers(name, num_mods, ctb_object=False):
return d return d
def startReceiver(num_mods, fp):
if num_mods == 1:
cmd = ['slsReceiver']
else:
cmd = ['slsMultiReceiver', str(DEFAULT_TCP_RX_PORTNO), str(num_mods)]
# in 10.0.0
#cmd = ['slsMultiReceiver', '-p', str(DEFAULT_TCP_RX_PORTNO), '-n', str(num_mods)]
startProcessInBackground(cmd, fp)
time.sleep(1)
def loadConfig(name, rx_hostname, settingsdir, fp, num_mods = 1, num_frames = 1):
def loadConfig(name, rx_hostname = 'localhost', settingsdir = None, log_file_fp = None, num_mods = 1, num_frames = 1, num_interfaces = 1):
Log(LogLevel.INFO, 'Loading config') Log(LogLevel.INFO, 'Loading config')
Log(LogLevel.INFO, 'Loading config', fp) Log(LogLevel.INFO, 'Loading config', log_file_fp)
try: try:
d = connectToVirtualServers(name, num_mods) d = connectToVirtualServers(name, num_mods)
if name == 'jungfrau' or name == 'moench':
d.numinterfaces = num_interfaces
d.udp_dstport = DEFAULT_UDP_DST_PORTNO d.udp_dstport = DEFAULT_UDP_DST_PORTNO
if name == 'eiger': if name == 'eiger' or name == 'jungfrau' or name == 'moench':
d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1 d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1
d.rx_hostname = rx_hostname d.rx_hostname = rx_hostname
d.udp_dstip = 'auto' d.udp_dstip = 'auto'
if name != "eiger": if name != "eiger":
d.udp_srcip = 'auto' d.udp_srcip = 'auto'
if name == "jungfrau" or name == "moench":
d.udp_dstip2 = 'auto'
if name == "jungfrau" or name == "moench" or name == "xilinx_ctb": if name == "jungfrau" or name == "moench" or name == "xilinx_ctb":
d.powerchip = 1 d.powerchip = 1
if name == "xilinx_ctb": if name == "xilinx_ctb":
d.configureTransceiver() d.configureTransceiver()
if name == "eiger": if settingsdir is not None and name in ['eiger', 'mythen3']:
d.trimen = [4500, 5400, 6400] d.settingspath = settingsdir + '/' + name + '/'
d.settingspath = settingsdir + '/eiger/' d.trimen = [4500, 5400, 6400] if name == 'eiger' else [4000, 6000, 8000, 12000]
d.setThresholdEnergy(4500, detectorSettings.STANDARD) d.setThresholdEnergy(4500, detectorSettings.STANDARD)
d.frames = num_frames d.frames = num_frames