Merge pull request #1205 from slsdetectorgroup/911/fix_frame_sync
Some checks failed
CMake / Configure and build using cmake (push) Failing after 2s

911/fix frame sync
This commit is contained in:
maliakal_d 2025-05-12 01:01:16 +02:00 committed by GitHub
commit f3e5804d02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 523 additions and 275 deletions

View File

@ -64,6 +64,10 @@ configure_file( scripts/test_virtual.py
${CMAKE_BINARY_DIR}/test_virtual.py ${CMAKE_BINARY_DIR}/test_virtual.py
) )
configure_file(scripts/frameSynchronizerPullSocket.py
${CMAKE_BINARY_DIR}/bin/frameSynchronizerPullSocket.py COPYONLY)
configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/../VERSION configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/../VERSION
${CMAKE_BINARY_DIR}/bin/slsdet/VERSION ${CMAKE_BINARY_DIR}/bin/slsdet/VERSION
) )
@ -76,4 +80,5 @@ if(SLS_INSTALL_PYTHONEXT)
install(FILES ${PYTHON_FILES} DESTINATION ${CMAKE_INSTALL_PREFIX}/python/slsdet) install(FILES ${PYTHON_FILES} DESTINATION ${CMAKE_INSTALL_PREFIX}/python/slsdet)
install(FILES ../VERSION DESTINATION ${CMAKE_INSTALL_PREFIX}/python/slsdet) install(FILES ../VERSION DESTINATION ${CMAKE_INSTALL_PREFIX}/python/slsdet)
endif() endif()

View File

@ -445,23 +445,25 @@ TEST_CASE("rx_arping", "[.cmdcall][.rx]") {
Detector det; Detector det;
Caller caller(&det); Caller caller(&det);
auto prev_val = det.getRxArping(); auto prev_val = det.getRxArping();
{ if (det.getDestinationUDPIP()[0].str() != "127.0.0.1") {
std::ostringstream oss; {
caller.call("rx_arping", {"1"}, -1, PUT, oss); std::ostringstream oss;
REQUIRE(oss.str() == "rx_arping 1\n"); caller.call("rx_arping", {"1"}, -1, PUT, oss);
} REQUIRE(oss.str() == "rx_arping 1\n");
{ }
std::ostringstream oss; {
caller.call("rx_arping", {}, -1, GET, oss); std::ostringstream oss;
REQUIRE(oss.str() == "rx_arping 1\n"); caller.call("rx_arping", {}, -1, GET, oss);
} REQUIRE(oss.str() == "rx_arping 1\n");
{ }
std::ostringstream oss; {
caller.call("rx_arping", {"0"}, -1, PUT, oss); std::ostringstream oss;
REQUIRE(oss.str() == "rx_arping 0\n"); caller.call("rx_arping", {"0"}, -1, PUT, oss);
} REQUIRE(oss.str() == "rx_arping 0\n");
for (int i = 0; i != det.size(); ++i) { }
det.setRxArping(prev_val[i], {i}); for (int i = 0; i != det.size(); ++i) {
det.setRxArping(prev_val[i], {i});
}
} }
} }

View File

@ -107,11 +107,11 @@ void zmq_free(void *data, void *hint) { delete[] static_cast<char *>(data); }
void print_frames(const PortFrameMap &frame_port_map) { void print_frames(const PortFrameMap &frame_port_map) {
LOG(sls::logDEBUG) << "Printing frames"; LOG(sls::logDEBUG) << "Printing frames";
for (const auto &it : frame_port_map) { for (const auto &it : frame_port_map) {
uint16_t udpPort = it.first; const uint16_t udpPort = it.first;
const auto &frame_map = it.second; const auto &frame_map = it.second;
LOG(sls::logDEBUG) << "UDP port: " << udpPort; LOG(sls::logDEBUG) << "UDP port: " << udpPort;
for (const auto &frame : frame_map) { for (const auto &frame : frame_map) {
uint64_t fnum = frame.first; const uint64_t fnum = frame.first;
const auto &msg_list = frame.second; const auto &msg_list = frame.second;
LOG(sls::logDEBUG) LOG(sls::logDEBUG)
<< " acq index: " << fnum << '[' << msg_list.size() << ']'; << " acq index: " << fnum << '[' << msg_list.size() << ']';
@ -130,30 +130,26 @@ std::set<uint64_t> get_valid_fnums(const PortFrameMap &port_frame_map) {
// collect all unique frame numbers from all ports // collect all unique frame numbers from all ports
std::set<uint64_t> unique_fnums; std::set<uint64_t> unique_fnums;
for (auto it = port_frame_map.begin(); it != port_frame_map.begin(); ++it) { for (const auto &it : port_frame_map) {
const FrameMap &frame_map = it->second; const FrameMap &frame_map = it.second;
for (auto frame = frame_map.begin(); frame != frame_map.end(); for (const auto &frame : frame_map) {
++frame) { unique_fnums.insert(frame.first);
unique_fnums.insert(frame->first);
} }
} }
// collect valid frame numbers // collect valid frame numbers
for (auto &fnum : unique_fnums) { for (auto &fnum : unique_fnums) {
bool is_valid = true; bool is_valid = true;
for (auto it = port_frame_map.begin(); it != port_frame_map.end(); for (const auto &it : port_frame_map) {
++it) { const uint16_t port = it.first;
uint16_t port = it->first; const FrameMap &frame_map = it.second;
const FrameMap &frame_map = it->second;
auto frame = frame_map.find(fnum); auto frame = frame_map.find(fnum);
// invalid: fnum missing in one port // invalid: fnum missing in one port
if (frame == frame_map.end()) { if (frame == frame_map.end()) {
LOG(sls::logDEBUG) LOG(sls::logDEBUG)
<< "Fnum " << fnum << " is missing in port " << port; << "Fnum " << fnum << " is missing in port " << port;
// invalid: fnum greater than all in that port auto upper_frame = frame_map.upper_bound(fnum);
auto last_frame = std::prev(frame_map.end()); if (upper_frame == frame_map.end()) {
auto last_fnum = last_frame->first;
if (fnum > last_fnum) {
LOG(sls::logDEBUG) << "And no larger fnum found. Fnum " LOG(sls::logDEBUG) << "And no larger fnum found. Fnum "
<< fnum << " is invalid.\n"; << fnum << " is invalid.\n";
is_valid = false; is_valid = false;
@ -223,18 +219,26 @@ void Correlate(FrameStatus *stat) {
// sending all valid fnum data packets // sending all valid fnum data packets
for (const auto &fnum : valid_fnums) { for (const auto &fnum : valid_fnums) {
ZmqMsgList msg_list; ZmqMsgList msg_list;
PortFrameMap &port_frame_map = stat->frames; for (const auto &it : stat->frames) {
for (auto it = port_frame_map.begin(); const uint16_t port = it.first;
it != port_frame_map.end(); ++it) { const FrameMap &frame_map = it.second;
uint16_t port = it->first;
const FrameMap &frame_map = it->second;
auto frame = frame_map.find(fnum); auto frame = frame_map.find(fnum);
if (frame != frame_map.end()) { if (frame != frame_map.end()) {
msg_list.insert(msg_list.end(), msg_list.insert(msg_list.end(),
stat->frames[port][fnum].begin(), stat->frames[port][fnum].begin(),
stat->frames[port][fnum].end()); stat->frames[port][fnum].end());
// clean up }
for (zmq_msg_t *msg : stat->frames[port][fnum]) { }
LOG(printHeadersLevel)
<< "Sending data packets for fnum " << fnum;
zmq_send_multipart(socket, msg_list);
// clean up
for (const auto &it : stat->frames) {
const uint16_t port = it.first;
const FrameMap &frame_map = it.second;
auto frame = frame_map.find(fnum);
if (frame != frame_map.end()) {
for (zmq_msg_t *msg : frame->second) {
if (msg) { if (msg) {
zmq_msg_close(msg); zmq_msg_close(msg);
delete msg; delete msg;
@ -243,9 +247,6 @@ void Correlate(FrameStatus *stat) {
stat->frames[port].erase(fnum); stat->frames[port].erase(fnum);
} }
} }
LOG(printHeadersLevel)
<< "Sending data packets for fnum " << fnum;
zmq_send_multipart(socket, msg_list);
} }
} }
// sending all end packets // sending all end packets
@ -259,6 +260,21 @@ void Correlate(FrameStatus *stat) {
} }
} }
stat->ends.clear(); stat->ends.clear();
// clean up old frames
for (auto &it : stat->frames) {
FrameMap &frame_map = it.second;
for (auto &frame : frame_map) {
for (zmq_msg_t *msg : frame.second) {
if (msg) {
zmq_msg_close(msg);
delete msg;
}
}
frame.second.clear();
}
frame_map.clear();
}
stat->frames.clear();
} }
} }
} }

View File

@ -60,3 +60,5 @@ include(Catch)
catch_discover_tests(tests) catch_discover_tests(tests)
configure_file(scripts/test_simulators.py ${CMAKE_BINARY_DIR}/bin/test_simulators.py COPYONLY) configure_file(scripts/test_simulators.py ${CMAKE_BINARY_DIR}/bin/test_simulators.py COPYONLY)
configure_file(scripts/test_frame_synchronizer.py ${CMAKE_BINARY_DIR}/bin/test_frame_synchronizer.py COPYONLY)
configure_file(scripts/utils_for_test.py ${CMAKE_BINARY_DIR}/bin/utils_for_test.py COPYONLY)

View File

@ -0,0 +1,140 @@
# SPDX-License-Identifier: LGPL-3.0-or-other
# Copyright (C) 2021 Contributors to the SLS Detector Package
'''
This file is used to start up simulators, frame synchronizer, pull sockets, acquire, test and kill them finally.
'''
import sys, time
import traceback, json
from slsdet import Detector
from slsdet.defines import DEFAULT_TCP_RX_PORTNO
from utils_for_test import (
Log,
LogLevel,
RuntimeException,
checkIfProcessRunning,
killProcess,
cleanup,
cleanSharedmemory,
startProcessInBackground,
startProcessInBackgroundWithLogFile,
startDetectorVirtualServer,
loadConfig,
ParseArguments
)
LOG_PREFIX_FNAME = '/tmp/slsFrameSynchronizer_test'
MAIN_LOG_FNAME = LOG_PREFIX_FNAME + '_log.txt'
PULL_SOCKET_PREFIX_FNAME = LOG_PREFIX_FNAME + '_pull_socket_'
def startFrameSynchronizerPullSocket(name, fp):
fname = PULL_SOCKET_PREFIX_FNAME + name + '.txt'
cmd = ['python', '-u', 'frameSynchronizerPullSocket.py']
startProcessInBackgroundWithLogFile(cmd, fp, fname)
def startFrameSynchronizer(num_mods, fp):
cmd = ['slsFrameSynchronizer', str(DEFAULT_TCP_RX_PORTNO), str(num_mods)]
# in 10.0.0
#cmd = ['slsFrameSynchronizer', '-p', str(DEFAULT_TCP_RX_PORTNO), '-n', str(num_mods)]
startProcessInBackground(cmd, fp)
time.sleep(1)
def acquire(fp):
Log(LogLevel.INFO, 'Acquiring')
Log(LogLevel.INFO, 'Acquiring', fp)
d = Detector()
d.acquire()
def testFramesCaught(name, num_frames):
d = Detector()
fnum = d.rx_framescaught[0]
if fnum != num_frames:
raise RuntimeException(f"{name} caught only {fnum}. Expected {num_frames}")
Log(LogLevel.INFOGREEN, f'Frames caught test passed for {name}')
Log(LogLevel.INFOGREEN, f'Frames caught test passed for {name}', fp)
def testZmqHeadetTypeCount(name, num_mods, num_frames, fp):
Log(LogLevel.INFO, f"Testing Zmq Header type count for {name}")
Log(LogLevel.INFO, f"Testing Zmq Header type count for {name}", fp)
htype_counts = {
"header": 0,
"series_end": 0,
"module": 0
}
try:
# get a count of each htype from file
pull_socket_fname = PULL_SOCKET_PREFIX_FNAME + name + '.txt'
with open(pull_socket_fname, 'r') as log_fp:
for line in log_fp:
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
htype = data.get("htype")
if htype in htype_counts:
htype_counts[htype] += 1
except json.JSONDecodeError:
continue
# test if file contents matches expected counts
d = Detector()
num_ports_per_module = 1 if name == "gotthard2" else d.numinterfaces
total_num_frame_parts = num_ports_per_module * num_mods * num_frames
for htype, expected_count in [("header", num_mods), ("series_end", num_mods), ("module", total_num_frame_parts)]:
if htype_counts[htype] != expected_count:
msg = f"Expected {expected_count} '{htype}' entries, found {htype_counts[htype]}"
raise RuntimeException(msg)
except Exception as e:
raise RuntimeException(f'Failed to get zmq header count type. Error:{str(e)}') from e
Log(LogLevel.INFOGREEN, f"Zmq Header type count test passed for {name}")
Log(LogLevel.INFOGREEN, f"Zmq Header type count test passed for {name}", fp)
def startTestsForAll(args, fp):
for server in args.servers:
try:
Log(LogLevel.INFOBLUE, f'Synchronizer Tests for {server}')
Log(LogLevel.INFOBLUE, f'Synchronizer Tests for {server}', fp)
cleanup(fp)
startDetectorVirtualServer(server, args.num_mods, fp)
startFrameSynchronizerPullSocket(server, fp)
startFrameSynchronizer(args.num_mods, fp)
loadConfig(name=server, rx_hostname=args.rx_hostname, settingsdir=args.settingspath, fp=fp, num_mods=args.num_mods, num_frames=args.num_frames)
acquire(fp)
testFramesCaught(server, args.num_frames)
testZmqHeadetTypeCount(server, args.num_mods, args.num_frames, fp)
Log(LogLevel.INFO, '\n')
except Exception as e:
raise RuntimeException(f'Synchronizer Tests failed') from e
Log(LogLevel.INFOGREEN, 'Passed all synchronizer tests for all detectors \n' + str(args.servers))
if __name__ == '__main__':
args = ParseArguments(description='Automated tests to test frame synchronizer', default_num_mods=2)
Log(LogLevel.INFOBLUE, '\nLog File: ' + MAIN_LOG_FNAME + '\n')
with open(MAIN_LOG_FNAME, 'w') as fp:
try:
startTestsForAll(args, fp)
cleanup(fp)
except Exception as e:
with open(MAIN_LOG_FNAME, 'a') as fp_error:
traceback.print_exc(file=fp_error)
cleanup(fp)
Log(LogLevel.ERROR, f'Tests Failed.')

View File

@ -4,250 +4,86 @@
This file is used to start up simulators, receivers and run all the tests on them and finally kill the simulators and receivers. This file is used to start up simulators, receivers and run all the tests on them and finally kill the simulators and receivers.
''' '''
import argparse import argparse
import os, sys, subprocess, time, colorama, signal import sys, subprocess, time, traceback
from colorama import Fore from slsdet import Detector
from slsdet import Detector, detectorType, detectorSettings from slsdet.defines import DEFAULT_TCP_RX_PORTNO
from slsdet.defines import DEFAULT_TCP_CNTRL_PORTNO, DEFAULT_TCP_RX_PORTNO, DEFAULT_UDP_DST_PORTNO
HALFMOD2_TCP_CNTRL_PORTNO=1955
HALFMOD2_TCP_RX_PORTNO=1957
colorama.init(autoreset=True) from utils_for_test import (
Log,
class RuntimeException (Exception): LogLevel,
def __init__ (self, message): RuntimeException,
super().__init__(Fore.RED + message) checkIfProcessRunning,
killProcess,
def Log(color, message): cleanup,
print('\n' + color + message, flush=True) cleanSharedmemory,
startProcessInBackground,
runProcessWithLogFile,
startDetectorVirtualServer,
loadConfig,
ParseArguments
)
def checkIfProcessRunning(processName): LOG_PREFIX_FNAME = '/tmp/slsDetectorPackage_virtual_test'
cmd = "ps -ef | grep " + processName MAIN_LOG_FNAME = LOG_PREFIX_FNAME + '_log.txt'
print(cmd) GENERAL_TESTS_LOG_FNAME = LOG_PREFIX_FNAME + '_results_general.txt'
res=subprocess.getoutput(cmd) CMD_TEST_LOG_PREFIX_FNAME = LOG_PREFIX_FNAME + '_results_cmd_'
print(res)
# eg. of output
#l_user 250506 243295 0 14:38 pts/5 00:00:00 /bin/sh -c ps -ef | grep slsReceiver
#l_user 250508 250506 0 14:38 pts/5 00:00:00 grep slsReceiver
print('how many')
cmd = "ps -ef | grep " + processName + " | wc -l"
print(cmd)
res=subprocess.getoutput(cmd)
print(res)
if res == '2':
return False
return True
def killProcess(name): def startReceiver(num_mods, fp):
if checkIfProcessRunning(name): if num_mods == 1:
Log(Fore.GREEN, 'killing ' + name) cmd = ['slsReceiver']
p = subprocess.run(['killall', name])
if p.returncode != 0:
raise RuntimeException('killall failed for ' + name)
else: else:
print('process not running : ' + name) cmd = ['slsMultiReceiver', str(DEFAULT_TCP_RX_PORTNO), str(num_mods)]
# in 10.0.0
#cmd = ['slsMultiReceiver', '-p', str(DEFAULT_TCP_RX_PORTNO), '-n', str(num_mods)]
startProcessInBackground(cmd, fp)
time.sleep(1)
def startGeneralTests(fp):
def killAllStaleProcesses(): fname = GENERAL_TESTS_LOG_FNAME
killProcess('eigerDetectorServer_virtual') cmd = ['tests', '--abort', '-s']
killProcess('jungfrauDetectorServer_virtual')
killProcess('mythen3DetectorServer_virtual')
killProcess('gotthard2DetectorServer_virtual')
killProcess('gotthardDetectorServer_virtual')
killProcess('ctbDetectorServer_virtual')
killProcess('moenchDetectorServer_virtual')
killProcess('xilinx_ctbDetectorServer_virtual')
killProcess('slsReceiver')
killProcess('slsMultiReceiver')
cleanSharedmemory()
def cleanup(name):
'''
kill both servers, receivers and clean shared memory
'''
Log(Fore.GREEN, 'Cleaning up...')
killProcess(name + 'DetectorServer_virtual')
killProcess('slsReceiver')
killProcess('slsMultiReceiver')
cleanSharedmemory()
def cleanSharedmemory():
Log(Fore.GREEN, 'Cleaning up shared memory...')
try: try:
p = subprocess.run(['sls_detector_get', 'free'], stdout=fp, stderr=fp) cleanup(fp)
except: runProcessWithLogFile('General Tests', cmd, fp, fname)
Log(Fore.RED, 'Could not free shared memory') except Exception as e:
raise raise RuntimeException(f'General tests failed.') from e
def startProcessInBackground(name):
try:
# in background and dont print output
p = subprocess.Popen(name.split(), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, restore_signals=False)
Log(Fore.GREEN, 'Starting up ' + name + ' ...')
except:
Log(Fore.RED, 'Could not start ' + name)
raise
def startServer(name):
startProcessInBackground(name + 'DetectorServer_virtual')
# second half
if name == 'eiger':
startProcessInBackground(name + 'DetectorServer_virtual -p' + str(HALFMOD2_TCP_CNTRL_PORTNO))
tStartup = 6
Log(Fore.WHITE, 'Takes ' + str(tStartup) + ' seconds... Please be patient')
time.sleep(tStartup)
def startReceiver(name):
startProcessInBackground('slsReceiver')
# second half
if name == 'eiger':
startProcessInBackground('slsReceiver -t' + str(HALFMOD2_TCP_RX_PORTNO))
time.sleep(2)
def loadConfig(name, rx_hostname, settingsdir):
Log(Fore.GREEN, 'Loading config')
try:
d = Detector()
if name == 'eiger':
d.hostname = 'localhost:' + str(DEFAULT_TCP_CNTRL_PORTNO) + '+localhost:' + str(HALFMOD2_TCP_CNTRL_PORTNO)
#d.udp_dstport = {2: 50003}
# will set up for every module
d.udp_dstport = DEFAULT_UDP_DST_PORTNO
d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1
d.rx_hostname = rx_hostname + ':' + str(DEFAULT_TCP_RX_PORTNO) + '+' + rx_hostname + ':' + str(HALFMOD2_TCP_RX_PORTNO)
d.udp_dstip = 'auto'
d.trimen = [4500, 5400, 6400]
d.settingspath = settingsdir + '/eiger/'
d.setThresholdEnergy(4500, detectorSettings.STANDARD)
else:
d.hostname = 'localhost'
d.rx_hostname = rx_hostname
d.udp_dstip = 'auto'
if d.type == detectorType.GOTTHARD:
d.udp_srcip = d.udp_dstip
else:
d.udp_srcip = 'auto'
if d.type == detectorType.JUNGFRAU or d.type == detectorType.MOENCH or d.type == detectorType.XILINX_CHIPTESTBOARD:
d.powerchip = 1
if d.type == detectorType.XILINX_CHIPTESTBOARD:
d.configureTransceiver()
except:
Log(Fore.RED, 'Could not load config for ' + name)
raise
def startCmdTests(name, fp, fname):
Log(Fore.GREEN, 'Cmd Tests for ' + name)
cmd = 'tests --abort [.cmdcall] -s -o ' + fname
p = subprocess.run(cmd.split(), stdout=fp, stderr=fp, check=True, text=True)
p.check_returncode()
with open (fname, 'r') as f:
for line in f:
if "FAILED" in line:
msg = 'Cmd tests failed for ' + name + '!!!'
Log(Fore.RED, msg)
raise Exception(msg)
Log(Fore.GREEN, 'Cmd Tests successful for ' + name)
def startGeneralTests(fp, fname):
Log(Fore.GREEN, 'General Tests')
cmd = 'tests --abort -s -o ' + fname
p = subprocess.run(cmd.split(), stdout=fp, stderr=fp, check=True, text=True)
p.check_returncode()
with open (fname, 'r') as f:
for line in f:
if "FAILED" in line:
msg = 'General tests failed !!!'
Log(Fore.RED, msg)
raise Exception(msg)
Log(Fore.GREEN, 'General Tests successful')
def startCmdTestsForAll(args, fp):
# parse cmd line for rx_hostname and settingspath using the argparse library for server in args.servers:
parser = argparse.ArgumentParser(description = 'automated tests with the virtual detector servers')
parser.add_argument('rx_hostname', help = 'hostname/ip of the current machine')
parser.add_argument('settingspath', help = 'Relative or absolut path to the settingspath')
parser.add_argument('-s', '--servers', help='Detector servers to run', nargs='*')
args = parser.parse_args()
if args.rx_hostname == 'localhost':
raise RuntimeException('Cannot use localhost for rx_hostname for the tests (fails for rx_arping for eg.)')
if args.servers is None:
servers = [
'eiger',
'jungfrau',
'mythen3',
'gotthard2',
'gotthard',
'ctb',
'moench',
'xilinx_ctb'
]
else:
servers = args.servers
Log(Fore.WHITE, 'Arguments:\nrx_hostname: ' + args.rx_hostname + '\nsettingspath: \'' + args.settingspath + '\'')
# redirect to file
prefix_fname = '/tmp/slsDetectorPackage_virtual_test'
original_stdout = sys.stdout
original_stderr = sys.stderr
fname = prefix_fname + '_log.txt'
Log(Fore.BLUE, '\nLog File: ' + fname)
with open(fname, 'w') as fp:
# general tests
file_results = prefix_fname + '_results_general.txt'
Log(Fore.BLUE, 'General tests (results: ' + file_results + ')')
sys.stdout = fp
sys.stderr = fp
Log(Fore.BLUE, 'General tests (results: ' + file_results + ')')
startGeneralTests(fp, file_results)
killAllStaleProcesses()
for server in servers:
try: try:
# print to terminal for progress num_mods = 2 if server == 'eiger' else 1
sys.stdout = original_stdout fname = CMD_TEST_LOG_PREFIX_FNAME + server + '.txt'
sys.stderr = original_stderr cmd = ['tests', '--abort', '[.cmdcall]', '-s']
file_results = prefix_fname + '_results_cmd_' + server + '.txt'
Log(Fore.BLUE, 'Cmd tests for ' + server + ' (results: ' + file_results + ')') Log(LogLevel.INFOBLUE, f'Starting Cmd Tests for {server}')
sys.stdout = fp cleanup(fp)
sys.stderr = fp startDetectorVirtualServer(name=server, num_mods=num_mods, fp=fp)
Log(Fore.BLUE, 'Cmd tests for ' + server + ' (results: ' + file_results + ')') startReceiver(num_mods, fp)
loadConfig(name=server, rx_hostname=args.rx_hostname, settingsdir=args.settingspath, fp=fp, num_mods=num_mods)
# cmd tests for det runProcessWithLogFile('Cmd Tests for ' + server, cmd, fp, fname)
cleanup(server) except Exception as e:
startServer(server) raise RuntimeException(f'Cmd Tests failed for {server}.') from e
startReceiver(server)
loadConfig(server, args.rx_hostname, args.settingspath) Log(LogLevel.INFOGREEN, 'Passed all tests for all detectors \n' + str(args.servers))
startCmdTests(server, fp, file_results)
cleanup(server)
except:
Log(Fore.RED, 'Exception caught. Cleaning up.')
cleanup(server)
sys.stdout = original_stdout
sys.stderr = original_stderr
Log(Fore.RED, 'Cmd tests failed for ' + server + '!!!')
raise
Log(Fore.GREEN, 'Passed all tests for virtual detectors \n' + str(servers)) if __name__ == '__main__':
args = ParseArguments('Automated tests with the virtual detector servers')
if args.num_mods > 1:
raise RuntimeException(f'Cannot support multiple modules at the moment (except Eiger).')
# redirect to terminal Log(LogLevel.INFOBLUE, '\nLog File: ' + MAIN_LOG_FNAME + '\n')
sys.stdout = original_stdout
sys.stderr = original_stderr with open(MAIN_LOG_FNAME, 'w') as fp:
Log(Fore.GREEN, 'Passed all tests for virtual detectors \n' + str(servers) + '\nYayyyy! :) ') try:
startGeneralTests(fp)
startCmdTestsForAll(args, fp)
cleanup(fp)
except Exception as e:
with open(MAIN_LOG_FNAME, 'a') as fp_error:
traceback.print_exc(file=fp_error)
cleanup(fp)
Log(LogLevel.ERROR, f'Tests Failed.')

View File

@ -0,0 +1,247 @@
# SPDX-License-Identifier: LGPL-3.0-or-other
# Copyright (C) 2021 Contributors to the SLS Detector Package
'''
This file is used for common utils used for integration tests between simulators and receivers.
'''
import sys, subprocess, time, argparse
from enum import Enum
from colorama import Fore, Style, init
from slsdet import Detector, detectorSettings
from slsdet.defines import DEFAULT_TCP_RX_PORTNO, DEFAULT_UDP_DST_PORTNO
SERVER_START_PORTNO=1900
init(autoreset=True)
class LogLevel(Enum):
INFO = 0
INFORED = 1
INFOGREEN = 2
INFOBLUE = 3
WARNING = 4
ERROR = 5
DEBUG = 6
LOG_LABELS = {
LogLevel.WARNING: "WARNING: ",
LogLevel.ERROR: "ERROR: ",
LogLevel.DEBUG: "DEBUG: "
}
LOG_COLORS = {
LogLevel.INFO: Fore.WHITE,
LogLevel.INFORED: Fore.RED,
LogLevel.INFOGREEN: Fore.GREEN,
LogLevel.INFOBLUE: Fore.BLUE,
LogLevel.WARNING: Fore.YELLOW,
LogLevel.ERROR: Fore.RED,
LogLevel.DEBUG: Fore.CYAN
}
def Log(level: LogLevel, message: str, stream=sys.stdout):
color = LOG_COLORS.get(level, Fore.WHITE)
label = LOG_LABELS.get(level, "")
print(f"{color}{label}{message}{Style.RESET_ALL}", file=stream, flush=True)
class RuntimeException (Exception):
def __init__ (self, message):
Log(LogLevel.ERROR, message)
super().__init__(message)
def checkIfProcessRunning(processName):
cmd = f"pgrep -f {processName}"
res = subprocess.getoutput(cmd)
return res.strip().splitlines()
def killProcess(name, fp):
pids = checkIfProcessRunning(name)
if pids:
Log(LogLevel.INFO, f"Killing '{name}' processes with PIDs: {', '.join(pids)}", fp)
for pid in pids:
try:
p = subprocess.run(['kill', pid])
if p.returncode != 0 and bool(checkIfProcessRunning(name)):
raise RuntimeException(f"Could not kill {name} with pid {pid}")
except Exception as e:
raise RuntimeException(f"Failed to kill process {name} pid:{pid}. Error: {str(e)}") from e
#else:
# Log(LogLevel.INFO, 'process not running : ' + name)
def cleanSharedmemory(fp):
Log(LogLevel.INFO, 'Cleaning up shared memory', fp)
try:
p = subprocess.run(['sls_detector_get', 'free'], stdout=fp, stderr=fp)
except:
raise RuntimeException('Could not free shared memory')
def cleanup(fp):
Log(LogLevel.INFO, 'Cleaning up')
Log(LogLevel.INFO, 'Cleaning up', fp)
killProcess('DetectorServer_virtual', fp)
killProcess('slsReceiver', fp)
killProcess('slsMultiReceiver', fp)
killProcess('slsFrameSynchronizer', fp)
killProcess('frameSynchronizerPullSocket', fp)
cleanSharedmemory(fp)
def startProcessInBackground(cmd, fp):
Log(LogLevel.INFO, 'Starting up ' + ' '.join(cmd))
Log(LogLevel.INFO, 'Starting up ' + ' '.join(cmd), fp)
try:
p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, restore_signals=False)
except Exception as e:
raise RuntimeException(f'Failed to start {cmd}:{str(e)}') from e
def startProcessInBackgroundWithLogFile(cmd, fp, log_file_name):
Log(LogLevel.INFOBLUE, 'Starting up ' + ' '.join(cmd) + '. Log: ' + log_file_name)
Log(LogLevel.INFOBLUE, 'Starting up ' + ' '.join(cmd) + '. Log: ' + log_file_name, fp)
try:
with open(log_file_name, 'w') as log_fp:
subprocess.Popen(cmd, stdout=log_fp, stderr=log_fp, text=True)
except Exception as e:
raise RuntimeException(f'Failed to start {cmd}:{str(e)}') from e
def runProcessWithLogFile(name, cmd, fp, log_file_name):
Log(LogLevel.INFOBLUE, 'Running ' + name + '. Log: ' + log_file_name)
Log(LogLevel.INFOBLUE, 'Running ' + name + '. Log: ' + log_file_name, fp)
Log(LogLevel.INFOBLUE, 'Cmd: ' + ' '.join(cmd), fp)
try:
with open(log_file_name, 'w') as log_fp:
subprocess.run(cmd, stdout=log_fp, stderr=log_fp, check=True, text=True)
except subprocess.CalledProcessError as e:
pass
except Exception as e:
Log(LogLevel.ERROR, f'Failed to run {name}:{str(e)}', fp)
raise RuntimeException(f'Failed to run {name}:{str(e)}')
with open (log_file_name, 'r') as f:
for line in f:
if "FAILED" in line:
raise RuntimeException(f'{line}')
Log(LogLevel.INFOGREEN, name + ' successful!\n')
Log(LogLevel.INFOGREEN, name + ' successful!\n', fp)
def startDetectorVirtualServer(name :str, num_mods, fp):
for i in range(num_mods):
port_no = SERVER_START_PORTNO + (i * 2)
cmd = [name + 'DetectorServer_virtual', '-p', str(port_no)]
if name == 'gotthard':
cmd += ['-m', '1']
startProcessInBackground(cmd, fp)
match name:
case 'jungfrau':
time.sleep(7)
case 'gotthard2':
time.sleep(5)
case _:
time.sleep(3)
def connectToVirtualServers(name, num_mods):
try:
d = Detector()
except Exception as e:
raise RuntimeException(f'Could not create Detector object for {name}. Error: {str(e)}') from e
counts_sec = 5
while (counts_sec != 0):
try:
d.virtual = [num_mods, SERVER_START_PORTNO]
break
except Exception as e:
# stop server still not up, wait a bit longer
if "Cannot connect to" in str(e):
Log(LogLevel.WARNING, f'Still waiting for {name} virtual server to be up...{counts_sec}s left')
time.sleep(1)
counts_sec -= 1
else:
raise
return d
def loadConfig(name, rx_hostname, settingsdir, fp, num_mods = 1, num_frames = 1):
Log(LogLevel.INFO, 'Loading config')
Log(LogLevel.INFO, 'Loading config', fp)
try:
d = connectToVirtualServers(name, num_mods)
d.udp_dstport = DEFAULT_UDP_DST_PORTNO
if name == 'eiger':
d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1
d.rx_hostname = rx_hostname
d.udp_dstip = 'auto'
if name != "eiger":
if name == "gotthard":
d.udp_srcip = d.udp_dstip
else:
d.udp_srcip = 'auto'
if name == "jungfrau" or name == "moench" or name == "xilinx_ctb":
d.powerchip = 1
if name == "xilinx_ctb":
d.configureTransceiver()
if name == "eiger":
d.trimen = [4500, 5400, 6400]
d.settingspath = settingsdir + '/eiger/'
d.setThresholdEnergy(4500, detectorSettings.STANDARD)
d.frames = num_frames
except Exception as e:
raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}') from e
def ParseArguments(description, default_num_mods=1):
parser = argparse.ArgumentParser(description)
parser.add_argument('rx_hostname', nargs='?', default='localhost',
help='Hostname/IP of the current machine')
parser.add_argument('settingspath', nargs='?', default='../../settingsdir',
help='Relative or absolute path to the settings directory')
parser.add_argument('-n', '--num-mods', nargs='?', default=default_num_mods, type=int,
help='Number of modules to test with')
parser.add_argument('-f', '--num-frames', nargs='?', default=1, type=int,
help='Number of frames to test with')
parser.add_argument('-s', '--servers', nargs='*',
help='Detector servers to run')
args = parser.parse_args()
# Set default server list if not provided
if args.servers is None:
args.servers = [
'eiger',
'jungfrau',
'mythen3',
'gotthard2',
'gotthard',
'ctb',
'moench',
'xilinx_ctb'
]
Log(LogLevel.INFO, 'Arguments:\n' +
'rx_hostname: ' + args.rx_hostname +
'\nsettingspath: \'' + args.settingspath +
'\nservers: \'' + ' '.join(args.servers) +
'\nnum_mods: \'' + str(args.num_mods) +
'\nnum_frames: \'' + str(args.num_frames) + '\'')
return args