Files
slsDetectorPackage/tests/scripts/utils_for_test.py
Dhanya Thattil b759c72bb6
Some checks failed
Run Simulator Tests on local RHEL9 / build (push) Failing after 3m22s
Build on RHEL9 / build (push) Successful in 3m24s
Build on RHEL8 / build (push) Successful in 4m47s
Run Simulator Tests on local RHEL8 / build (push) Failing after 5m2s
more fixes
2026-02-06 15:27:14 +01:00

429 lines
14 KiB
Python

# SPDX-License-Identifier: LGPL-3.0-or-other
# Copyright (C) 2021 Contributors to the SLS Detector Package
'''
This file is used for common utils used for integration tests between simulators and receivers.
'''
import os
from pathlib import Path
import sys, subprocess, time, argparse
from enum import Enum
from colorama import Fore, Style, init
from datetime import timedelta
from contextlib import contextmanager
from slsdet import Detector, Ctb, detectorSettings, burstMode
from slsdet.defines import DEFAULT_TCP_RX_PORTNO, DEFAULT_UDP_DST_PORTNO
SERVER_START_PORTNO=1900
LOG_PREFIX_FNAME = "/tmp/slsDetectorPackage_"
init(autoreset=True)
build_dir = Path(__file__).resolve().parents[2] / "build" / "bin"
class LogLevel(Enum):
INFO = 0
INFORED = 1
INFOGREEN = 2
INFOBLUE = 3
WARNING = 4
ERROR = 5
DEBUG = 6
LOG_LABELS = {
LogLevel.WARNING: "WARNING: ",
LogLevel.ERROR: "ERROR: ",
LogLevel.DEBUG: "DEBUG: "
}
LOG_COLORS = {
LogLevel.INFO: Fore.WHITE,
LogLevel.INFORED: Fore.RED,
LogLevel.INFOGREEN: Fore.GREEN,
LogLevel.INFOBLUE: Fore.BLUE,
LogLevel.WARNING: Fore.YELLOW,
LogLevel.ERROR: Fore.RED,
LogLevel.DEBUG: Fore.CYAN
}
def Log(level: LogLevel, message: str, stream=sys.stdout, both=False):
color = LOG_COLORS.get(level, Fore.WHITE)
label = LOG_LABELS.get(level, "")
print(f"{color}{label}{message}{Style.RESET_ALL}", file=stream, flush=True)
if both and stream != sys.stdout:
print(f"{color}{label}{message}{Style.RESET_ALL}", file=sys.stdout, flush=True)
class RuntimeException (Exception):
def __init__ (self, message):
Log(LogLevel.ERROR, message)
super().__init__(message)
@contextmanager
def optional_file(file_path=None, mode='w', quiet_mode=False):
if file_path:
f = open(file_path, mode)
try:
yield f
finally:
f.close()
else:
if quiet_mode:
f = open(os.devnull, mode)
try:
yield f
finally:
f.close()
else:
yield sys.stdout
def checkIfProcessRunning(processName):
res = subprocess.getoutput(f"pgrep -f {processName}")
return res.strip().splitlines()
def killProcess(name, fp):
'''
Kill all processes matching name.
Does not fail if process is already gone.
'''
Log(LogLevel.INFO, f"Attempting to kill '{name}' (if running)", fp)
# pkill returns:
# 0 -> process killed
# 1 -> no process found OK
subprocess.run(['pkill', '-f', name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
def cleanSharedmemory(fp):
Log(LogLevel.INFO, 'Cleaning up shared memory', fp)
try:
p = subprocess.run([build_dir / 'sls_detector_get', 'free'], stdout=fp, stderr=fp)
except Exception as e:
raise RuntimeException(f'Could not free shared memory: {str(e)}')
def cleanup(fp):
Log(LogLevel.INFO, 'Cleaning up', fp, True)
killProcess('DetectorServer_virtual', fp)
killProcess('slsReceiver', fp)
killProcess('slsMultiReceiver', fp)
killProcess('slsFrameSynchronizer', fp)
killProcess('frameSynchronizerPullSocket', fp)
cleanSharedmemory(fp)
def startProcessInBackground(cmd, fp, log_file_name: str, quiet_mode=False):
info_text = 'Starting up ' + ' '.join(cmd)
if log_file_name:
info_text += '. Log: ' + log_file_name
Log(LogLevel.INFO, f'{info_text}', fp, True)
try:
with optional_file(log_file_name, 'w', quiet_mode) as log_fp:
subprocess.Popen(cmd, stdout=log_fp, stderr=log_fp, text=True)
except Exception as e:
raise RuntimeException(f'Failed to start {cmd}:{str(e)}') from e
def checkLogForErrors(fp, log_file_path: str):
try:
with open(log_file_path, 'r') as log_file:
for line in log_file:
if 'Error' in line:
Log(LogLevel.ERROR, f"Error found in log: {line.strip()}", fp, True)
raise RuntimeException("Error found in log file")
except FileNotFoundError:
print(f"Log file not found: {log_file_path}")
raise
except Exception as e:
print(f"Exception while reading log: {e}")
raise
def checkLogForErrorsOrSummary(fp, lines, source_name=""):
failed = False # if it found "failed" or "FAILED" in file
failed_msg = ""
printing_error = False # print every line in file after failure
printing_summary = False # print summary if no failure
for line in lines:
line_stripped = line.rstrip()
# Detect failure (case-insensitive)
if not failed and (": FAILED:" in line or " failed\nassertions" in line):
failed = True
failed_msg = line_stripped
printing_error = True
Log(LogLevel.ERROR, line_stripped, fp)
if source_name:
Log(LogLevel.ERROR, f"Error log from file: {source_name}")
Log(LogLevel.ERROR, "="*79)
# After failure, log everything as ERROR
if printing_error:
print(f"{line_stripped}")
continue
# Summary delimiter
if line_stripped.startswith("====="):
printing_summary = True
# No failure - print summary lines
if printing_summary:
print(f"{line_stripped}")
if failed:
Log(LogLevel.ERROR, "="*79)
raise RuntimeException(f'Test failed: {failed_msg}')
else:
print("="*79)
def runProcess(name, cmd, fp, log_file_name = None, quiet_mode=False):
info_text = 'Running ' + name + '.'
if log_file_name:
info_text += ' Log: ' + log_file_name
Log(LogLevel.INFOBLUE, info_text, fp, True)
Log(LogLevel.INFOBLUE, 'Cmd: ' + ' '.join(cmd), fp, True)
captured_log = []
try:
if log_file_name:
with optional_file(log_file_name, 'w', quiet_mode) as log_fp:
subprocess.run(cmd, stdout=log_fp, stderr=log_fp, check=True, text=True)
else:
capture = subprocess.run(cmd, check=True, text=True, capture_output=True)
captured_log = capture.stdout.splitlines()
except subprocess.CalledProcessError as e:
print("error: ", str(e))
if not log_file_name and e.stdout:
captured_log = e.stdout.splitlines()
except Exception as e:
print("something else failed")
Log(LogLevel.ERROR, f'Failed to run {name}:{str(e)}', fp)
raise RuntimeException(f'Failed to run {name}:{str(e)}')
if log_file_name:
with optional_file(log_file_name, 'r') as log_fp:
checkLogForErrorsOrSummary(fp, log_fp, log_file_name)
else:
checkLogForErrorsOrSummary(fp, captured_log)
Log(LogLevel.INFOGREEN, name + ' successful!\n', fp, True)
def startDetectorVirtualServer(name :str, num_mods, fp, no_log_file = False, quiet_mode=False):
for i in range(num_mods):
port_no = SERVER_START_PORTNO + (i * 2)
cmd = [str(build_dir / (name + 'DetectorServer_virtual')), '-p', str(port_no)]
fname = LOG_PREFIX_FNAME + "virtual_det_" + name + "_" + str(SERVER_START_PORTNO) + ".txt"
if no_log_file:
fname = None
startProcessInBackground(cmd, fp, fname, quiet_mode)
match name:
case 'jungfrau':
time.sleep(7)
case 'gotthard2':
time.sleep(5)
case _:
time.sleep(3)
def connectToVirtualServers(name, num_mods, ctb_object=False):
try:
if ctb_object:
d = Ctb()
else:
d = Detector()
except Exception as e:
raise RuntimeException(f'Could not create Detector object for {name}. Error: {str(e)}') from e
counts_sec = 5
while (counts_sec != 0):
try:
d.virtual = [num_mods, SERVER_START_PORTNO]
break
except Exception as e:
# stop server still not up, wait a bit longer
if "Cannot connect to" in str(e):
Log(LogLevel.WARNING, f'Still waiting for {name} virtual server to be up...{counts_sec}s left')
time.sleep(1)
counts_sec -= 1
else:
raise
return d
def startReceiver(num_mods, fp, no_log_file = False, quiet_mode=False):
if num_mods == 1:
cmd = [str(build_dir / 'slsReceiver')]
fname = LOG_PREFIX_FNAME + "slsReceiver.txt"
else:
cmd = [str(build_dir / 'slsMultiReceiver'), str(DEFAULT_TCP_RX_PORTNO), str(num_mods)]
fname = LOG_PREFIX_FNAME + "slsMultiReceiver.txt"
# in 10.0.0
#cmd = ['slsMultiReceiver', '-p', str(DEFAULT_TCP_RX_PORTNO), '-n', str(num_mods)]
if no_log_file:
fname = None
startProcessInBackground(cmd, fp, fname, quiet_mode)
time.sleep(1)
def loadConfig(name, rx_hostname = 'localhost', settingsdir = None, log_file_fp = None, num_mods = 1, num_frames = 1, num_interfaces = 1):
Log(LogLevel.INFO, 'Loading config', log_file_fp, True)
try:
d = connectToVirtualServers(name, num_mods)
if name == 'jungfrau' or name == 'moench':
d.numinterfaces = num_interfaces
d.udp_dstport = DEFAULT_UDP_DST_PORTNO
if name == 'eiger' or num_interfaces == 2:
d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1
d.rx_hostname = rx_hostname
d.udp_dstip = 'auto'
if name != "eiger":
d.udp_srcip = 'auto'
if num_interfaces == 2:
d.udp_dstip2 = 'auto'
if name == "jungfrau" or name == "moench" or name == "xilinx_ctb":
d.powerchip = 1
if name == "xilinx_ctb":
d.configureTransceiver()
if settingsdir is not None and name in ['eiger', 'mythen3']:
d.settingspath = settingsdir + '/' + name + '/'
d.trimen = [4500, 5400, 6400] if name == 'eiger' else [4000, 6000, 8000, 12000]
d.setThresholdEnergy(4500, detectorSettings.STANDARD)
d.frames = num_frames
except Exception as e:
raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}') from e
return d
# for easy acquire
def loadBasicSettings(name, d, fp):
Log(LogLevel.INFO, 'Loading basic settings for ' + name, fp, True)
try:
# basic settings for easy acquire
if name == "jungfrau":
d.exptime = timedelta(microseconds = 200)
d.readnrows = 512
elif name == "moench":
d.exptime = timedelta(microseconds = 200)
d.readnrows = 400
elif name == "eiger":
d.exptime = timedelta(microseconds = 200)
d.readnrows = 256
d.dr = 16
elif name == "mythen3":
d.setExptime(-1, timedelta(microseconds = 200))
d.dr = 16
d.counters = [0, 1, 2]
elif name == "gotthard2":
d.exptime = timedelta(microseconds = 200)
d.burstmode = burstMode.CONTINUOUS_EXTERNAL
d.bursts = 1
d.burstperiod = 0
d.period = timedelta(milliseconds = 2)
except Exception as e:
raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}') from e
def ParseArguments(description, default_num_mods=2, specific_tests=False, general_tests_option=False):
parser = argparse.ArgumentParser(description)
default_settings_path = Path(__file__).resolve().parents[2] / "settingsdir"
parser.add_argument('rx_hostname', nargs='?', default='localhost',
help='Hostname/IP of the current machine')
parser.add_argument('settingspath', nargs='?', default=str(default_settings_path),
help='Relative or absolute path to the settings directory')
parser.add_argument('-n', '--num-mods', nargs='?', default=default_num_mods, type=int,
help='Number of modules to test with')
parser.add_argument('-f', '--num-frames', nargs='?', default=1, type=int,
help='Number of frames to test with')
parser.add_argument('-s', '--servers', nargs='*',
help='Detector servers to run')
parser.add_argument('-nlf', '--no-log-file', action='store_true',
help='Dont write output to log file')
parser.add_argument('-q', '--quiet', action='store_true',
help='Dont write to stdout when possible.')
if specific_tests:
parser.add_argument('-t', '--tests', nargs='?', default ='[.detectorintegration]',
help = 'Test markers or specific test name to use for tests, default: [.detectorintegration]')
if general_tests_option:
parser.add_argument('-g', '--general-tests', action='store_true',
help = 'Enable general tests (no value needed)')
args = parser.parse_args()
# Set default server list if not provided
if args.servers is None:
args.servers = [
'eiger',
'jungfrau',
'mythen3',
'gotthard2',
'ctb',
'moench',
'xilinx_ctb'
]
msg = (
'Arguments:\n'
f'rx_hostname: {args.rx_hostname}\n'
f"settingspath: '{args.settingspath}'\n"
f"servers: '{' '.join(args.servers)}'\n"
f"num_mods: '{args.num_mods}'\n"
f"num_frames: '{args.num_frames}'"
)
if args.no_log_file:
msg += f"\nLog File: Disabled"
else:
msg += f"\nLog File: Enabled"
if args.quiet:
msg += f"\nQuiet mode: Enabled"
else:
msg += f"\nQuiet mode: Disabled"
if specific_tests:
msg += f"\ntests: '{args.tests}'"
if general_tests_option:
msg += f"\ngeneral_tests: '{args.general_tests}'"
Log(LogLevel.INFO, msg)
return args