mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2026-01-13 18:01:43 +01:00
* basic ctb config api for register and bit names * tests for define and definelist pass. yet to implement using them for reg, setbit, clearbit and getbit * improved autocomplete for getbit,setbit, clearbit * validate autocomplete * definelist has no put * updating help * converting char array+int in runtimeerror compiles but throws at runtime.Fixed.Tested for it. Also check if string or int before using getregisterdefinitonbyvalue to see if it threw to call the other function. because both of it can throw and we should differentiate the issues for both * removed std::vector<std::pair<string,int> to std::map<string, int> for defiitions list * Dev/define cmd tie bit to reg (#1328) * strong type * moved everythign to bit_utils class * pybindings * added tests for python * removed duplicates * removed bit names in reg * changed BitPosition to BitAddress * Using define reg/bit from python (#1344) * define_bit, define_addr in python. * setBit/clearBit takes int or addr * added example using bits * split define into 2 commands define_reg and define_bit, definelist into 2: definelist_reg and definelist_bit * allow string for register and bit names in c++ api * refactor from github comments * naming refactoring (getRegisterDefnition to retunr name and address specifically * added marker for 8 cmd tests connected to define, changed macro to static constexpr * changed bitPosition from int to uint32_t * got rid of setbitposition and setaddress, instead overloaded constructor to take in strings so that the conversion from string to bit address members, takes place within the class for easy maintainance in case type changes * Removing implicit conversions: RegisterAddresss and RegisterValue: Removed the implicit conversions. RegisterAddress: Changed member name from address_ to value_ and method as well to value(). RegisterValue: Also added | operator to be able to concatenate with uint32_t. Same in python bindings (but could not find the tests to modify * Allowed concatenation with other RegisterValue, made them all constexpr * fix a ctbConfig test * Maponstack works with integration tests, but need unit tests * tests on mapstack * fixed ctb tests and FixedString being initialized with gibberish * removing parsing from string inside the class RegisterAddress, BitAddress and RegisterValue * updated python bindings * fixed bit utils test * renaming getRegisterDefintiionAddress/Name=>getRegisterAddress/Name and similary for getBitDefinitionAddress/Name * updated python bindings * fix tests (format) * a few python tests added and python bindings corrected * replaceing str with __str__ for bit.cpp * repr reimplemented for bit.cpp * removed make with registerAddress etc * starting server for tests per session and nor module * killprocess throws if no process found-> github runs fails, changed to pkill and not throw * clean shm shouldnt raise, in ci binary not found * ignoring these tests for CI, which fail on CI because simulators are not generated in CI. This is in another PR, where it should work --------- Co-authored-by: Erik Fröjdh <erik.frojdh@gmail.com> Co-authored-by: froejdh_e <erik.frojdh@psi.ch>
336 lines
11 KiB
Python
336 lines
11 KiB
Python
# SPDX-License-Identifier: LGPL-3.0-or-other
|
|
# Copyright (C) 2021 Contributors to the SLS Detector Package
|
|
'''
|
|
This file is used for common utils used for integration tests between simulators and receivers.
|
|
'''
|
|
|
|
from pathlib import Path
|
|
import sys, subprocess, time, argparse
|
|
from enum import Enum
|
|
from colorama import Fore, Style, init
|
|
from datetime import timedelta
|
|
|
|
from slsdet import Detector, Ctb, detectorSettings, burstMode
|
|
from slsdet.defines import DEFAULT_TCP_RX_PORTNO, DEFAULT_UDP_DST_PORTNO
|
|
SERVER_START_PORTNO=1900
|
|
|
|
init(autoreset=True)
|
|
|
|
class LogLevel(Enum):
|
|
INFO = 0
|
|
INFORED = 1
|
|
INFOGREEN = 2
|
|
INFOBLUE = 3
|
|
WARNING = 4
|
|
ERROR = 5
|
|
DEBUG = 6
|
|
|
|
|
|
LOG_LABELS = {
|
|
LogLevel.WARNING: "WARNING: ",
|
|
LogLevel.ERROR: "ERROR: ",
|
|
LogLevel.DEBUG: "DEBUG: "
|
|
}
|
|
|
|
|
|
LOG_COLORS = {
|
|
LogLevel.INFO: Fore.WHITE,
|
|
LogLevel.INFORED: Fore.RED,
|
|
LogLevel.INFOGREEN: Fore.GREEN,
|
|
LogLevel.INFOBLUE: Fore.BLUE,
|
|
LogLevel.WARNING: Fore.YELLOW,
|
|
LogLevel.ERROR: Fore.RED,
|
|
LogLevel.DEBUG: Fore.CYAN
|
|
}
|
|
|
|
|
|
def Log(level: LogLevel, message: str, stream=sys.stdout):
|
|
color = LOG_COLORS.get(level, Fore.WHITE)
|
|
label = LOG_LABELS.get(level, "")
|
|
print(f"{color}{label}{message}{Style.RESET_ALL}", file=stream, flush=True)
|
|
|
|
|
|
class RuntimeException (Exception):
|
|
def __init__ (self, message):
|
|
Log(LogLevel.ERROR, message)
|
|
super().__init__(message)
|
|
|
|
|
|
def checkIfProcessRunning(processName):
|
|
cmd = f"pgrep -f {processName}"
|
|
res = subprocess.getoutput(cmd)
|
|
return res.strip().splitlines()
|
|
|
|
|
|
def killProcess(name, fp):
|
|
'''
|
|
Kill all processes matching name.
|
|
Does not fail if process is already gone.
|
|
'''
|
|
Log(LogLevel.INFO, f"Attempting to kill '{name}' (if running)", fp)
|
|
|
|
# pkill returns:
|
|
# 0 -> process killed
|
|
# 1 -> no process found OK
|
|
subprocess.run(['pkill', '-f', name],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL)
|
|
|
|
|
|
def cleanSharedmemory(fp):
|
|
Log(LogLevel.INFO, 'Cleaning up shared memory', fp)
|
|
try:
|
|
p = subprocess.run(['sls_detector_get', 'free'], stdout=fp, stderr=fp, check = False)
|
|
except FileNotFoundError:
|
|
# Binary not available (e.g. on CI) → ignore
|
|
Log(LogLevel.INFO, 'sls_detector_get not found, skipping shared memory cleanup', fp)
|
|
except Exception as e:
|
|
# Any other cleanup failure should NEVER fail tests
|
|
Log(LogLevel.WARN, f'Ignoring shared memory cleanup error: {e}', fp)
|
|
|
|
|
|
def cleanup(fp):
|
|
Log(LogLevel.INFO, 'Cleaning up')
|
|
Log(LogLevel.INFO, 'Cleaning up', fp)
|
|
killProcess('DetectorServer_virtual', fp)
|
|
killProcess('slsReceiver', fp)
|
|
killProcess('slsMultiReceiver', fp)
|
|
killProcess('slsFrameSynchronizer', fp)
|
|
killProcess('frameSynchronizerPullSocket', fp)
|
|
cleanSharedmemory(fp)
|
|
|
|
|
|
def startProcessInBackground(cmd, fp):
|
|
Log(LogLevel.INFO, 'Starting up ' + ' '.join(cmd))
|
|
Log(LogLevel.INFO, 'Starting up ' + ' '.join(cmd), fp)
|
|
try:
|
|
p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, restore_signals=False)
|
|
except Exception as e:
|
|
raise RuntimeException(f'Failed to start {cmd}:{str(e)}') from e
|
|
|
|
|
|
def startProcessInBackgroundWithLogFile(cmd, fp, log_file_name: str):
|
|
Log(LogLevel.INFOBLUE, 'Starting up ' + ' '.join(cmd) + '. Log: ' + log_file_name)
|
|
Log(LogLevel.INFOBLUE, 'Starting up ' + ' '.join(cmd) + '. Log: ' + log_file_name, fp)
|
|
try:
|
|
with open(log_file_name, 'w') as log_fp:
|
|
subprocess.Popen(cmd, stdout=log_fp, stderr=log_fp, text=True)
|
|
except Exception as e:
|
|
raise RuntimeException(f'Failed to start {cmd}:{str(e)}') from e
|
|
|
|
|
|
def checkLogForErrors(fp, log_file_path: str):
|
|
try:
|
|
with open(log_file_path, 'r') as log_file:
|
|
for line in log_file:
|
|
if 'Error' in line:
|
|
Log(LogLevel.ERROR, f"Error found in log: {line.strip()}")
|
|
Log(LogLevel.ERROR, f"Error found in log: {line.strip()}", fp)
|
|
raise RuntimeException("Error found in log file")
|
|
except FileNotFoundError:
|
|
print(f"Log file not found: {log_file_path}")
|
|
raise
|
|
except Exception as e:
|
|
print(f"Exception while reading log: {e}")
|
|
raise
|
|
|
|
|
|
def runProcessWithLogFile(name, cmd, fp, log_file_name):
|
|
Log(LogLevel.INFOBLUE, 'Running ' + name + '. Log: ' + log_file_name)
|
|
Log(LogLevel.INFOBLUE, 'Running ' + name + '. Log: ' + log_file_name, fp)
|
|
Log(LogLevel.INFOBLUE, 'Cmd: ' + ' '.join(cmd), fp)
|
|
try:
|
|
with open(log_file_name, 'w') as log_fp:
|
|
subprocess.run(cmd, stdout=log_fp, stderr=log_fp, check=True, text=True)
|
|
except subprocess.CalledProcessError as e:
|
|
pass
|
|
except Exception as e:
|
|
Log(LogLevel.ERROR, f'Failed to run {name}:{str(e)}', fp)
|
|
raise RuntimeException(f'Failed to run {name}:{str(e)}')
|
|
|
|
with open (log_file_name, 'r') as f:
|
|
for line in f:
|
|
if "FAILED" in line:
|
|
raise RuntimeException(f'{line}')
|
|
|
|
Log(LogLevel.INFOGREEN, name + ' successful!\n')
|
|
Log(LogLevel.INFOGREEN, name + ' successful!\n', fp)
|
|
|
|
|
|
def startDetectorVirtualServer(name :str, num_mods, fp):
|
|
for i in range(num_mods):
|
|
port_no = SERVER_START_PORTNO + (i * 2)
|
|
cmd = [name + 'DetectorServer_virtual', '-p', str(port_no)]
|
|
startProcessInBackgroundWithLogFile(cmd, fp, "/tmp/virtual_det_" + name + "_" + str(i) + ".txt")
|
|
match name:
|
|
case 'jungfrau':
|
|
time.sleep(7)
|
|
case 'gotthard2':
|
|
time.sleep(5)
|
|
case _:
|
|
time.sleep(3)
|
|
|
|
|
|
def connectToVirtualServers(name, num_mods, ctb_object=False):
|
|
try:
|
|
if ctb_object:
|
|
d = Ctb()
|
|
else:
|
|
d = Detector()
|
|
except Exception as e:
|
|
raise RuntimeException(f'Could not create Detector object for {name}. Error: {str(e)}') from e
|
|
|
|
counts_sec = 5
|
|
while (counts_sec != 0):
|
|
try:
|
|
d.virtual = [num_mods, SERVER_START_PORTNO]
|
|
break
|
|
except Exception as e:
|
|
# stop server still not up, wait a bit longer
|
|
if "Cannot connect to" in str(e):
|
|
Log(LogLevel.WARNING, f'Still waiting for {name} virtual server to be up...{counts_sec}s left')
|
|
time.sleep(1)
|
|
counts_sec -= 1
|
|
else:
|
|
raise
|
|
|
|
return d
|
|
|
|
def startReceiver(num_mods, fp):
|
|
if num_mods == 1:
|
|
cmd = ['slsReceiver']
|
|
else:
|
|
cmd = ['slsMultiReceiver', str(DEFAULT_TCP_RX_PORTNO), str(num_mods)]
|
|
# in 10.0.0
|
|
#cmd = ['slsMultiReceiver', '-p', str(DEFAULT_TCP_RX_PORTNO), '-n', str(num_mods)]
|
|
startProcessInBackground(cmd, fp)
|
|
time.sleep(1)
|
|
|
|
|
|
def loadConfig(name, rx_hostname = 'localhost', settingsdir = None, log_file_fp = None, num_mods = 1, num_frames = 1, num_interfaces = 1):
|
|
Log(LogLevel.INFO, 'Loading config')
|
|
Log(LogLevel.INFO, 'Loading config', log_file_fp)
|
|
try:
|
|
d = connectToVirtualServers(name, num_mods)
|
|
|
|
if name == 'jungfrau' or name == 'moench':
|
|
d.numinterfaces = num_interfaces
|
|
|
|
d.udp_dstport = DEFAULT_UDP_DST_PORTNO
|
|
if name == 'eiger' or name == 'jungfrau' or name == 'moench':
|
|
d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1
|
|
|
|
d.rx_hostname = rx_hostname
|
|
|
|
d.udp_dstip = 'auto'
|
|
|
|
if name != "eiger":
|
|
d.udp_srcip = 'auto'
|
|
|
|
if name == "jungfrau" or name == "moench":
|
|
d.udp_dstip2 = 'auto'
|
|
|
|
if name == "jungfrau" or name == "moench" or name == "xilinx_ctb":
|
|
d.powerchip = 1
|
|
|
|
if name == "xilinx_ctb":
|
|
d.configureTransceiver()
|
|
|
|
if settingsdir is not None and name in ['eiger', 'mythen3']:
|
|
d.settingspath = settingsdir + '/' + name + '/'
|
|
d.trimen = [4500, 5400, 6400] if name == 'eiger' else [4000, 6000, 8000, 12000]
|
|
d.setThresholdEnergy(4500, detectorSettings.STANDARD)
|
|
|
|
d.frames = num_frames
|
|
|
|
except Exception as e:
|
|
raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}') from e
|
|
|
|
return d
|
|
|
|
# for easy acquire
|
|
def loadBasicSettings(name, d, fp):
|
|
Log(LogLevel.INFO, 'Loading basic settings for ' + name)
|
|
Log(LogLevel.INFO, 'Loading basic settings for ' + name, fp)
|
|
try:
|
|
# basic settings for easy acquire
|
|
if name == "jungfrau":
|
|
d.exptime = timedelta(microseconds = 200)
|
|
d.readnrows = 512
|
|
elif name == "moench":
|
|
d.exptime = timedelta(microseconds = 200)
|
|
d.readnrows = 400
|
|
elif name == "eiger":
|
|
d.exptime = timedelta(microseconds = 200)
|
|
d.readnrows = 256
|
|
d.dr = 16
|
|
elif name == "mythen3":
|
|
d.setExptime(-1, timedelta(microseconds = 200))
|
|
d.dr = 16
|
|
d.counters = [0, 1, 2]
|
|
elif name == "gotthard2":
|
|
d.exptime = timedelta(microseconds = 200)
|
|
d.burstmode = burstMode.CONTINUOUS_EXTERNAL
|
|
d.bursts = 1
|
|
d.burstperiod = 0
|
|
d.period = timedelta(milliseconds = 2)
|
|
|
|
except Exception as e:
|
|
raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}') from e
|
|
|
|
def ParseArguments(description, default_num_mods=1, markers=False, general_tests_option=False):
|
|
parser = argparse.ArgumentParser(description)
|
|
|
|
default_settings_path = Path(__file__).resolve().parents[2] / "settingsdir"
|
|
|
|
parser.add_argument('rx_hostname', nargs='?', default='localhost',
|
|
help='Hostname/IP of the current machine')
|
|
parser.add_argument('settingspath', nargs='?', default=str(default_settings_path),
|
|
help='Relative or absolute path to the settings directory')
|
|
parser.add_argument('-n', '--num-mods', nargs='?', default=default_num_mods, type=int,
|
|
help='Number of modules to test with')
|
|
parser.add_argument('-f', '--num-frames', nargs='?', default=1, type=int,
|
|
help='Number of frames to test with')
|
|
parser.add_argument('-s', '--servers', nargs='*',
|
|
help='Detector servers to run')
|
|
if markers:
|
|
parser.add_argument('-m', '--markers', nargs='?', default ='[.cmdcall]',
|
|
help = 'Markers to use for cmd tests, default: [.cmdcall]')
|
|
|
|
if general_tests_option:
|
|
parser.add_argument('-g', '--general_tests', action='store_true',
|
|
help = 'Enable general tests (no value needed)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Set default server list if not provided
|
|
if args.servers is None:
|
|
args.servers = [
|
|
'eiger',
|
|
'jungfrau',
|
|
'mythen3',
|
|
'gotthard2',
|
|
'ctb',
|
|
'moench',
|
|
'xilinx_ctb'
|
|
]
|
|
|
|
msg = (
|
|
'Arguments:\n'
|
|
f'rx_hostname: {args.rx_hostname}\n'
|
|
f"settingspath: '{args.settingspath}'\n"
|
|
f"servers: '{' '.join(args.servers)}'\n"
|
|
f"num_mods: '{args.num_mods}'\n"
|
|
f"num_frames: '{args.num_frames}'"
|
|
)
|
|
if markers:
|
|
msg += f"\nmarkers: '{args.markers}'"
|
|
|
|
if general_tests_option:
|
|
msg += f"\ngeneral_tests: '{args.general_tests}'"
|
|
|
|
Log(LogLevel.INFO, msg)
|
|
|
|
|
|
return args
|