Files
slsDetectorPackage/tests/scripts/utils_for_test.py
Dhanya Thattil ee27f0bc1b readoutspeed in rx master file and other master file inconsistencies (#1245)
readout speed added to json and h5 master files.
Also fixed master file inconsistencies

Server binaries
- update server binaries because readoutspeed needs to be sent to the receiver with the rx_hostname command
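
From the Python client this implies the following ordering (a minimal sketch; the speedLevel enum and HALF_SPEED value are assumptions about the slsdet bindings, only the ordering matters here):

    from slsdet import Detector, speedLevel

    d = Detector()
    d.readoutspeed = speedLevel.HALF_SPEED   # cached on the detector/client side
    # (re)configuring the receiver now forwards readoutspeed as well,
    # so it can be written into the receiver's master file
    d.rx_hostname = 'localhost'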

API
- added const to Detector class set/getburstmode

Python
- updated python bindings (burstmode const and roi arguments)
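
A short sketch of the burstmode binding as seen from Python (same import style as the test utilities below; the value is illustrative):

    from slsdet import Detector, burstMode

    d = Detector()
    d.burstmode = burstMode.BURST_INTERNAL   # setter
    print(d.burstmode)                       # getter (now const on the C++ side)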

Cmd generation
- added pragma once in Caller.in.h as Caller is included in test files

m3: num channels when #counters < 3
* workaround for m3 for the messed up num channels (the client always assumes all counters are enabled and adds them to num channels); fix also applied for hdf5 (see the sketch below)
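
Client-side view of the assumption (a sketch; countermask is assumed to be the slsdet property name, exact channel counts are deliberately left out):

    from slsdet import Detector

    d = Detector()
    d.counters = [0, 1]    # only two of the three m3 counters enabled
    print(d.countermask)   # e.g. 0x3; the client still sizes the data for all three counters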

g2: exptime master file inconsistency
- exptime didn't match because of rounding when setting burst mode (it sets a different clk divider)
- so the actual time is now updated for all timers (exptime, period, subexptime, etc.) in the Module class: timer values are read back from the detector when setting them and then sent to the receiver to write into the master file
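
The read-back is also visible from Python: after setting a timer, the detector may round it to the clk divider in use, and the rounded value is what reaches the master file. A minimal sketch:

    from datetime import timedelta
    from slsdet import Detector, burstMode

    d = Detector()
    d.burstmode = burstMode.CONTINUOUS_EXTERNAL   # may switch the clk divider
    d.exptime = timedelta(microseconds=200)
    print(d.exptime)   # actual (possibly rounded) exptime as sent to the receiver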

ctb image size incorrect:
- write the actual size into the master file and not the reserved size (digital data size reduces depending on the dbit list and dbit offset)
- added a free function in generalData.h to calculate the ctb image size, used both there and in the tests
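
From the Python API the reduction is driven by the digital bit selection (a sketch; rx_dbitlist/rx_dbitoffset are the existing receiver properties, the size formula itself lives in generalData.h):

    from slsdet import Detector

    d = Detector()
    d.rx_dbitlist = [0, 4, 8, 12]   # keep only these digital bits
    d.rx_dbitoffset = 16            # bytes discarded at the start of the digital data
    # the master file now records the resulting (reduced) image size,
    # not the full reserved digital size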


master file inconsistencies
- refactored master attributes writing using templates
- names changed to keep them consistent between the json and hdf5 master files (Version, Pixels, Exposure Times, GateDelays, Acquisition Period, etc.)
- datatypes changed to keep it simple where possible: imageSize, dynamicRange, tengiga, quad, readnrows, analog, analogsamples, digital, digitalsamples, dbitreorder, dbitoffset, transceivermask, transceiver, transceiversamples, countermask, gates => int
- replaced "toString" with arrays, objects, etc. (e.g. for scan, rois); see the sketch after this list
- json header always written (empty dataset or empty brackets)
- hdf5 needs const char*, so strings have to be converted to it, taking care that the strings still exist prior to push_back
- master attributes refactored to avoid redundant string literals (error prone)
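
What this looks like to a consumer of the json master file, as a heavily hedged sketch (the file name is hypothetical and the key names are only illustrative, taken from the list above):

    import json

    with open('run_master_0.json') as f:   # hypothetical master file name
        master = json.load(f)

    print(master['Version'], master['Pixels'])
    # numeric attributes are plain ints and scan/rois are json objects/arrays,
    # not pre-formatted strings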

tests for master file
- suppressed rapidjson deprecated-function warnings just for the tests
- added slsReceiverSoftware/src (to allow access to receiver_defs.h) to test the binary/hdf5 version
- refactored acquire tests by moving all the acquire tests from the individual detector type files into a single file: test-Caller-acquire.cpp
- set some default settings (loadBasicSettings) for a basic acquire in the load-config part of the test_simulator python scripts, so only a minimum number of settings has to be set on the detector for any acquire test
- added tests for the json and hdf5 master files: test-Caller-master-attributes.cpp
- added an option to pass '-m' markers to tests using the test_simulator python script (usage sketch below)
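
Usage sketch of the new option from a test_simulator-style script (ParseArguments is the helper defined in the file below; the description string is illustrative):

    from utils_for_test import ParseArguments

    # markers=1 enables the optional '-m' argument (default '[.cmdcall]')
    args = ParseArguments('cmd tests with markers', markers=1)
    print(args.markers)   # e.g. forwarded to the test binary as a tag filter
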
2025-07-25 11:45:26 +02:00


# SPDX-License-Identifier: LGPL-3.0-or-other
# Copyright (C) 2021 Contributors to the SLS Detector Package
'''
Common utils used for integration tests between simulators and receivers.
'''
import sys, subprocess, time, argparse
from enum import Enum
from colorama import Fore, Style, init
from datetime import timedelta
from slsdet import Detector, detectorSettings, burstMode
from slsdet.defines import DEFAULT_TCP_RX_PORTNO, DEFAULT_UDP_DST_PORTNO
SERVER_START_PORTNO=1900
init(autoreset=True)

class LogLevel(Enum):
    INFO = 0
    INFORED = 1
    INFOGREEN = 2
    INFOBLUE = 3
    WARNING = 4
    ERROR = 5
    DEBUG = 6

LOG_LABELS = {
    LogLevel.WARNING: "WARNING: ",
    LogLevel.ERROR: "ERROR: ",
    LogLevel.DEBUG: "DEBUG: "
}

LOG_COLORS = {
    LogLevel.INFO: Fore.WHITE,
    LogLevel.INFORED: Fore.RED,
    LogLevel.INFOGREEN: Fore.GREEN,
    LogLevel.INFOBLUE: Fore.BLUE,
    LogLevel.WARNING: Fore.YELLOW,
    LogLevel.ERROR: Fore.RED,
    LogLevel.DEBUG: Fore.CYAN
}

def Log(level: LogLevel, message: str, stream=sys.stdout):
    color = LOG_COLORS.get(level, Fore.WHITE)
    label = LOG_LABELS.get(level, "")
    print(f"{color}{label}{message}{Style.RESET_ALL}", file=stream, flush=True)

class RuntimeException(Exception):
    def __init__(self, message):
        Log(LogLevel.ERROR, message)
        super().__init__(message)

def checkIfProcessRunning(processName):
    cmd = f"pgrep -f {processName}"
    res = subprocess.getoutput(cmd)
    return res.strip().splitlines()

def killProcess(name, fp):
    pids = checkIfProcessRunning(name)
    if pids:
        Log(LogLevel.INFO, f"Killing '{name}' processes with PIDs: {', '.join(pids)}", fp)
        for pid in pids:
            try:
                p = subprocess.run(['kill', pid])
                if p.returncode != 0 and bool(checkIfProcessRunning(name)):
                    raise RuntimeException(f"Could not kill {name} with pid {pid}")
            except Exception as e:
                raise RuntimeException(f"Failed to kill process {name} pid:{pid}. Error: {str(e)}") from e
    #else:
    #    Log(LogLevel.INFO, 'process not running : ' + name)

def cleanSharedmemory(fp):
    Log(LogLevel.INFO, 'Cleaning up shared memory', fp)
    try:
        p = subprocess.run(['sls_detector_get', 'free'], stdout=fp, stderr=fp)
    except:
        raise RuntimeException('Could not free shared memory')

def cleanup(fp):
    Log(LogLevel.INFO, 'Cleaning up')
    Log(LogLevel.INFO, 'Cleaning up', fp)
    killProcess('DetectorServer_virtual', fp)
    killProcess('slsReceiver', fp)
    killProcess('slsMultiReceiver', fp)
    killProcess('slsFrameSynchronizer', fp)
    killProcess('frameSynchronizerPullSocket', fp)
    cleanSharedmemory(fp)

def startProcessInBackground(cmd, fp):
    Log(LogLevel.INFO, 'Starting up ' + ' '.join(cmd))
    Log(LogLevel.INFO, 'Starting up ' + ' '.join(cmd), fp)
    try:
        p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, restore_signals=False)
    except Exception as e:
        raise RuntimeException(f'Failed to start {cmd}:{str(e)}') from e

def startProcessInBackgroundWithLogFile(cmd, fp, log_file_name: str):
    Log(LogLevel.INFOBLUE, 'Starting up ' + ' '.join(cmd) + '. Log: ' + log_file_name)
    Log(LogLevel.INFOBLUE, 'Starting up ' + ' '.join(cmd) + '. Log: ' + log_file_name, fp)
    try:
        with open(log_file_name, 'w') as log_fp:
            subprocess.Popen(cmd, stdout=log_fp, stderr=log_fp, text=True)
    except Exception as e:
        raise RuntimeException(f'Failed to start {cmd}:{str(e)}') from e

def checkLogForErrors(fp, log_file_path: str):
    try:
        with open(log_file_path, 'r') as log_file:
            for line in log_file:
                if 'Error' in line:
                    Log(LogLevel.ERROR, f"Error found in log: {line.strip()}")
                    Log(LogLevel.ERROR, f"Error found in log: {line.strip()}", fp)
                    raise RuntimeException("Error found in log file")
    except FileNotFoundError:
        print(f"Log file not found: {log_file_path}")
        raise
    except Exception as e:
        print(f"Exception while reading log: {e}")
        raise

def runProcessWithLogFile(name, cmd, fp, log_file_name):
    Log(LogLevel.INFOBLUE, 'Running ' + name + '. Log: ' + log_file_name)
    Log(LogLevel.INFOBLUE, 'Running ' + name + '. Log: ' + log_file_name, fp)
    Log(LogLevel.INFOBLUE, 'Cmd: ' + ' '.join(cmd), fp)
    try:
        with open(log_file_name, 'w') as log_fp:
            subprocess.run(cmd, stdout=log_fp, stderr=log_fp, check=True, text=True)
    except subprocess.CalledProcessError as e:
        pass
    except Exception as e:
        Log(LogLevel.ERROR, f'Failed to run {name}:{str(e)}', fp)
        raise RuntimeException(f'Failed to run {name}:{str(e)}')
    with open(log_file_name, 'r') as f:
        for line in f:
            if "FAILED" in line:
                raise RuntimeException(f'{line}')
    Log(LogLevel.INFOGREEN, name + ' successful!\n')
    Log(LogLevel.INFOGREEN, name + ' successful!\n', fp)

def startDetectorVirtualServer(name: str, num_mods, fp):
    for i in range(num_mods):
        port_no = SERVER_START_PORTNO + (i * 2)
        cmd = [name + 'DetectorServer_virtual', '-p', str(port_no)]
        startProcessInBackgroundWithLogFile(cmd, fp, "/tmp/virtual_det_" + name + "_" + str(i) + ".txt")
    match name:
        case 'jungfrau':
            time.sleep(7)
        case 'gotthard2':
            time.sleep(5)
        case _:
            time.sleep(3)

def connectToVirtualServers(name, num_mods):
    try:
        d = Detector()
    except Exception as e:
        raise RuntimeException(f'Could not create Detector object for {name}. Error: {str(e)}') from e
    counts_sec = 5
    while (counts_sec != 0):
        try:
            d.virtual = [num_mods, SERVER_START_PORTNO]
            break
        except Exception as e:
            # server still not up, wait a bit longer
            if "Cannot connect to" in str(e):
                Log(LogLevel.WARNING, f'Still waiting for {name} virtual server to be up...{counts_sec}s left')
                time.sleep(1)
                counts_sec -= 1
            else:
                raise
    return d

def loadConfig(name, rx_hostname, settingsdir, fp, num_mods=1, num_frames=1):
    Log(LogLevel.INFO, 'Loading config')
    Log(LogLevel.INFO, 'Loading config', fp)
    try:
        d = connectToVirtualServers(name, num_mods)
        d.udp_dstport = DEFAULT_UDP_DST_PORTNO
        if name == 'eiger':
            d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1
        d.rx_hostname = rx_hostname
        d.udp_dstip = 'auto'
        if name != "eiger":
            d.udp_srcip = 'auto'
        if name == "jungfrau" or name == "moench" or name == "xilinx_ctb":
            d.powerchip = 1
        if name == "xilinx_ctb":
            d.configureTransceiver()
        if name == "eiger":
            d.trimen = [4500, 5400, 6400]
            d.settingspath = settingsdir + '/eiger/'
            d.setThresholdEnergy(4500, detectorSettings.STANDARD)
        d.frames = num_frames
    except Exception as e:
        raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}') from e
    return d

# for easy acquire
def loadBasicSettings(name, d, fp):
    Log(LogLevel.INFO, 'Loading basic settings for ' + name)
    Log(LogLevel.INFO, 'Loading basic settings for ' + name, fp)
    try:
        # basic settings for easy acquire
        if name == "jungfrau":
            d.exptime = timedelta(microseconds=200)
            d.readnrows = 512
        elif name == "moench":
            d.exptime = timedelta(microseconds=200)
            d.readnrows = 400
        elif name == "eiger":
            d.exptime = timedelta(microseconds=200)
            d.readnrows = 256
            d.dr = 16
        elif name == "mythen3":
            d.setExptime(-1, timedelta(microseconds=200))
            d.dr = 16
            d.counters = [0, 1]
        elif name == "gotthard2":
            d.exptime = timedelta(microseconds=200)
            d.burstmode = burstMode.CONTINUOUS_EXTERNAL
            d.bursts = 1
            d.burstperiod = 0
            d.period = timedelta(milliseconds=2)
    except Exception as e:
        raise RuntimeException(f'Could not load basic settings for {name}. Error: {str(e)}') from e

def ParseArguments(description, default_num_mods=1, markers=0):
    parser = argparse.ArgumentParser(description)
    parser.add_argument('rx_hostname', nargs='?', default='localhost',
                        help='Hostname/IP of the current machine')
    parser.add_argument('settingspath', nargs='?', default='../../settingsdir',
                        help='Relative or absolute path to the settings directory')
    parser.add_argument('-n', '--num-mods', nargs='?', default=default_num_mods, type=int,
                        help='Number of modules to test with')
    parser.add_argument('-f', '--num-frames', nargs='?', default=1, type=int,
                        help='Number of frames to test with')
    parser.add_argument('-s', '--servers', nargs='*',
                        help='Detector servers to run')
    if markers == 1:
        parser.add_argument('-m', '--markers', nargs='?', default='[.cmdcall]',
                            help='Markers to use for cmd tests, default: [.cmdcall]')
    args = parser.parse_args()

    # Set default server list if not provided
    if args.servers is None:
        args.servers = [
            'eiger',
            'jungfrau',
            'mythen3',
            'gotthard2',
            'ctb',
            'moench',
            'xilinx_ctb'
        ]

    msg = (
        'Arguments:\n'
        f'rx_hostname: {args.rx_hostname}\n'
        f"settingspath: '{args.settingspath}'\n"
        f"servers: '{' '.join(args.servers)}'\n"
        f"num_mods: '{args.num_mods}'\n"
        f"num_frames: '{args.num_frames}'"
    )
    if markers == 1:
        msg += f"\nmarkers: '{args.markers}'"
    Log(LogLevel.INFO, msg)
    return args
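
A rough sketch of how a test_simulator-style script might tie these helpers together (order and error handling are illustrative, not part of the file above):

    from utils_for_test import (ParseArguments, cleanup, startDetectorVirtualServer,
                                loadConfig, loadBasicSettings)

    args = ParseArguments('Basic acquire test')
    with open('/tmp/test_log.txt', 'w') as fp:
        for server in args.servers:
            cleanup(fp)
            startDetectorVirtualServer(server, args.num_mods, fp)
            d = loadConfig(server, args.rx_hostname, args.settingspath, fp,
                           num_mods=args.num_mods, num_frames=args.num_frames)
            loadBasicSettings(server, d, fp)
            # ... start a receiver and run the actual acquire here ...
            cleanup(fp)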