refactored test_simulator and test_frame_synchronizer tests

This commit is contained in:
2025-05-07 13:57:12 +02:00
parent 684f511bf8
commit 03eded30f9
3 changed files with 391 additions and 446 deletions

View File

@ -3,242 +3,137 @@
'''
This file is used to start up simulators, frame synchronizer, pull sockets, acquire, test and kill them finally.
'''
import argparse
import os, sys, subprocess, time
import shlex, traceback, json
import sys, time
import traceback, json
from slsdet import Detector
from slsdet.defines import DEFAULT_TCP_RX_PORTNO
from utils_for_test import (
Log,
LogLevel,
RuntimeException,
checkIfProcessRunning,
killProcess,
cleanup,
cleanSharedmemory,
startProcessInBackground,
startProcessInBackgroundWithLogFile,
startDetectorVirtualServer,
loadConfig,
ParseArguments
)
LOG_PREFIX_FNAME = '/tmp/slsFrameSynchronizer_test'
MAIN_LOG_FNAME = LOG_PREFIX_FNAME + '_log.txt'
PULL_SOCKET_PREFIX_FNAME = LOG_PREFIX_FNAME + '_pull_socket_'
from slsdet import Detector, detectorType, detectorSettings
from slsdet.defines import DEFAULT_TCP_RX_PORTNO, DEFAULT_UDP_DST_PORTNO
SERVER_START_PORTNO=1900
from utils_for_test import Log, LogLevel
class RuntimeException (Exception):
def __init__ (self, message):
super().__init__(Log(LogLevel.ERROR, message))
def checkIfProcessRunning(processName):
cmd = f"pgrep -f {processName}"
res = subprocess.getoutput(cmd)
return res.strip().splitlines()
def killProcess(name, fp):
pids = checkIfProcessRunning(name)
if pids:
Log(LogLevel.INFO, f"Killing '{name}' processes with PIDs: {', '.join(pids)}", fp)
for pid in pids:
try:
p = subprocess.run(['kill', pid])
if p.returncode != 0 and bool(checkIfProcessRunning(name)):
raise RuntimeException(f"Could not kill {name} with pid {pid}")
except Exception as e:
raise RuntimeException(f"Failed to kill process {name} pid:{pid}. Exception occured: [code:{e}, msg:{e.stderr}]")
#else:
# Log(LogLevel.INFO, 'process not running : ' + name)
def cleanup(fp):
'''
kill both servers, receivers and clean shared memory
'''
Log(LogLevel.INFO, 'Cleaning up')
Log(LogLevel.INFO, 'Cleaning up', fp)
killProcess('DetectorServer_virtual', fp)
killProcess('slsReceiver', fp)
killProcess('slsMultiReceiver', fp)
killProcess('slsFrameSynchronizer', fp)
killProcess('frameSynchronizerPullSocket', fp)
cleanSharedmemory(fp)
def cleanSharedmemory(fp):
Log(LogLevel.INFO, 'Cleaning up shared memory...', fp)
try:
p = subprocess.run(['sls_detector_get', 'free'], stdout=fp, stderr=fp)
except:
raise RuntimeException('Could not free shared memory')
def startProcessInBackground(name, fp):
try:
# in background and dont print output
p = subprocess.Popen(shlex.split(name), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, restore_signals=False)
Log(LogLevel.INFO, 'Starting up ' + name + ' ...', fp)
except Exception as e:
raise RuntimeException(f'Could not start {name}:{e}')
def startServers(name, num_mods):
Log(LogLevel.INFO, 'Starting server')
for i in range(num_mods):
port_no = SERVER_START_PORTNO + (i * 2)
startProcessInBackground(name + 'DetectorServer_virtual -p' + str(port_no), fp)
time.sleep(6)
def startFrameSynchronizerPullSocket(fname, fp):
Log(LogLevel.INFO, 'Starting sync pull socket')
Log(LogLevel.INFO, f"Starting up Synchronizer pull socket. Log: {fname}", fp)
Log(LogLevel.INFO, f"Synchronizer pull socket log: {fname}")
def startFrameSynchronizerPullSocket(name, fp):
fname = PULL_SOCKET_PREFIX_FNAME + name + '.txt'
cmd = ['python', '-u', 'frameSynchronizerPullSocket.py']
try:
with open(fname, 'w') as fp:
subprocess.Popen(cmd, stdout=fp, stderr=fp, text=True)
except Exception as e:
raise RuntimeException(f"failed to start synchronizer pull socket: {e}")
startProcessInBackgroundWithLogFile(cmd, fp, fname)
def startFrameSynchronizer(num_mods, fp):
Log(LogLevel.INFO, 'Starting frame synchronizer')
cmd = ['slsFrameSynchronizer', str(DEFAULT_TCP_RX_PORTNO), str(num_mods)]
# in 10.0.0
#startProcessInBackground('slsFrameSynchronizer -n ' + str(num_mods) + ' -p ' + str(DEFAULT_TCP_RX_PORTNO))
startProcessInBackground('slsFrameSynchronizer ' + str(DEFAULT_TCP_RX_PORTNO) + ' ' + str(num_mods), fp)
tStartup = 1 * num_mods
time.sleep(tStartup)
#cmd = ['slsFrameSynchronizer', '-p', str(DEFAULT_TCP_RX_PORTNO), '-n', str(num_mods)]
startProcessInBackground(cmd, fp)
time.sleep(1)
def loadConfig(name, num_mods, rx_hostname, settingsdir, num_frames, fp):
Log(LogLevel.INFO, 'Loading config')
Log(LogLevel.INFO, 'Loading config', fp)
try:
d = Detector()
d.virtual = [num_mods, SERVER_START_PORTNO]
d.udp_dstport = DEFAULT_UDP_DST_PORTNO
if name == 'eiger':
d.udp_dstport2 = DEFAULT_UDP_DST_PORTNO + 1
def acquire():
Log(LogLevel.INFO, 'Acquiring')
Log(LogLevel.INFO, 'Acquiring', fp)
d = Detector()
d.acquire()
d.rx_hostname = rx_hostname
d.udp_dstip = 'auto'
if name != "eiger":
d.udp_srcip = 'auto'
if name == "jungfrau" or name == "moench" or name == "xilinx_ctb":
d.powerchip = 1
def testFramesCaught(name, num_frames):
d = Detector()
fnum = d.rx_framescaught[0]
if fnum != num_frames:
raise RuntimeException(f"{name} caught only {fnum}. Expected {num_frames}")
Log(LogLevel.INFOGREEN, f'Frames caught test passed for {name}')
Log(LogLevel.INFOGREEN, f'Frames caught test passed for {name}', fp)
if d.type == detectorType.XILINX_CHIPTESTBOARD:
d.configureTransceiver()
d.frames = num_frames
except Exception as e:
raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}')
def testZmqHeadetTypeCount(name, num_mods, num_frames, fp):
def validate_htype_counts(log_path, num_mods, num_ports_per_module, num_frames):
Log(LogLevel.INFO, f"Testing Zmq Header type count for {name}")
Log(LogLevel.INFO, f"Testing Zmq Header type count for {name}", fp)
htype_counts = {
"header": 0,
"series_end": 0,
"module": 0
}
# get a count of each htype from file
with open(log_path, 'r') as f:
for line in f:
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
htype = data.get("htype")
if htype in htype_counts:
htype_counts[htype] += 1
except json.JSONDecodeError:
continue # or log malformed line
for htype, expected_count in [("header", num_mods), ("series_end", num_mods), ("module", num_ports_per_module * num_mods * num_frames)]:
if htype_counts[htype] != expected_count:
msg = f"Expected {expected_count} '{htype}' entries, found {htype_counts[htype]}"
raise RuntimeException(msg)
def startTests(name, num_mods, num_frames, fp, file_pull_socket):
Log(LogLevel.INFO, 'Tests for ' + name)
Log(LogLevel.INFO, 'Tests for ' + name, fp)
cmd = 'tests --abort [.cmdcall] -s -o ' + fname
d = Detector()
num_ports_per_module = d.numinterfaces
if name == "gotthard2":
num_ports_per_module = 1
d.acquire()
fnum = d.rx_framescaught[0]
if fnum != num_frames:
raise RuntimeException(f"{name} caught only {fnum}. Expected {num_frames}")
validate_htype_counts(file_pull_socket, num_mods, num_ports_per_module, num_frames)
Log(LogLevel.INFOGREEN, f"Log file htype checks passed for {name}", fp)
# parse cmd line for rx_hostname and settingspath using the argparse library
parser = argparse.ArgumentParser(description = 'automated tests with the virtual detector servers')
parser.add_argument('rx_hostname', nargs='?', default='localhost', help = 'hostname/ip of the current machine')
parser.add_argument('settingspath', nargs='?', default='../../settingsdir', help = 'Relative or absolut path to the settingspath')
parser.add_argument('-n', '--num-mods', nargs='?', default=2, type=int, help = 'Number of modules to test with')
parser.add_argument('-f', '--num-frames', nargs='?', default=1, type=int, help = 'Number of frames to test with')
parser.add_argument('-s', '--servers', nargs='*', help='Detector servers to run')
args = parser.parse_args()
if args.servers is None:
servers = [
'eiger',
'jungfrau',
'mythen3',
'gotthard2',
'ctb',
'moench',
'xilinx_ctb'
]
else:
servers = args.servers
Log(LogLevel.INFO, 'Arguments:\nrx_hostname: ' + args.rx_hostname + '\nsettingspath: \'' + args.settingspath + '\nservers: \'' + ' '.join(servers) + '\nnum_mods: \'' + str(args.num_mods) + '\nnum_frames: \'' + str(args.num_frames) + '\'')
# redirect to file
prefix_fname = '/tmp/slsFrameSynchronizer_test'
original_stdout = sys.stdout
original_stderr = sys.stderr
fname = prefix_fname + '_log.txt'
Log(LogLevel.INFOBLUE, '\nLog File: ' + fname)
with open(fname, 'w') as fp:
try:
testError = False
for server in servers:
try:
Log(LogLevel.INFOBLUE, '\nSynchonizer tests for ' + server, fp)
Log(LogLevel.INFOBLUE, '\nSynchonizer tests for ' + server)
# cmd tests for det
cleanup(fp)
startServers(server, args.num_mods)
file_pull_socket = prefix_fname + '_pull_socket_' + server + '.txt'
startFrameSynchronizerPullSocket(file_pull_socket, fp)
startFrameSynchronizer(args.num_mods, fp)
loadConfig(server, args.num_mods, args.rx_hostname, args.settingspath, args.num_frames, fp)
startTests(server, args.num_mods, args.num_frames, fp, file_pull_socket)
cleanup(fp)
except Exception as e:
# redirect to terminal
sys.stdout = original_stdout
sys.stderr = original_stderr
Log(LogLevel.INFORED, f'Exception caught while testing {server}. Cleaning up...')
with open(fname, 'a') as fp_error:
traceback.print_exc(file=fp_error) # This will log the full traceback
testError = True
cleanup(fp)
break
# redirect to terminal
sys.stdout = original_stdout
sys.stderr = original_stderr
if not testError:
Log(LogLevel.INFOGREEN, 'Passed all sync tests\n' + str(servers))
# get a count of each htype from file
pull_socket_fname = PULL_SOCKET_PREFIX_FNAME + name + '.txt'
with open(pull_socket_fname, 'r') as log_fp:
for line in log_fp:
line = line.strip()
if not line or not line.startswith('{'):
continue
try:
data = json.loads(line)
htype = data.get("htype")
if htype in htype_counts:
htype_counts[htype] += 1
except json.JSONDecodeError:
continue
# test if file contents matches expected counts
d = Detector()
num_ports_per_module = 1 if name == "gotthard2" else d.numinterfaces
total_num_frame_parts = num_ports_per_module * num_mods * num_frames
for htype, expected_count in [("header", num_mods), ("series_end", num_mods), ("module", total_num_frame_parts)]:
if htype_counts[htype] != expected_count:
msg = f"Expected {expected_count} '{htype}' entries, found {htype_counts[htype]}"
raise RuntimeException(msg)
except Exception as e:
# redirect to terminal
sys.stdout = original_stdout
sys.stderr = original_stderr
Log(LogLevel.INFORED, f'Exception caught with general testing. Cleaning up...')
cleanup(fp)
raise RuntimeException(f'Failed to get zmq header count type. Error:{e}')
Log(LogLevel.INFOGREEN, f"Zmq Header type count test passed for {name}")
Log(LogLevel.INFOGREEN, f"Zmq Header type count test passed for {name}", fp)
def startTestsForAll(args, fp):
for server in args.servers:
try:
Log(LogLevel.INFOBLUE, f'Synchronizer Tests for {server}')
Log(LogLevel.INFOBLUE, f'Synchronizer Tests for {server}', fp)
cleanup(fp)
startDetectorVirtualServer(server, args.num_mods, fp)
startFrameSynchronizerPullSocket(server, fp)
startFrameSynchronizer(args.num_mods, fp)
loadConfig(name=server, rx_hostname=args.rx_hostname, settingsdir=args.settingspath, fp=fp, num_mods=args.num_mods, num_frames=args.num_frames)
acquire()
testFramesCaught(server, args.num_frames)
testZmqHeadetTypeCount(server, args.num_mods, args.num_frames, fp)
except Exception as e:
raise RuntimeException(f'Synchronizer Tests failed')
Log(LogLevel.INFOGREEN, 'Passed all synchronizer tests for all detectors \n' + str(args.servers))
if __name__ == '__main__':
args = ParseArguments(description='Automated tests to test frame synchronizer', default_num_mods=2)
Log(LogLevel.INFOBLUE, '\nLog File: ' + MAIN_LOG_FNAME + '\n')
with open(MAIN_LOG_FNAME, 'w') as fp:
try:
startTestsForAll(args, fp)
cleanup(fp)
except Exception as e:
with open(MAIN_LOG_FNAME, 'a') as fp_error:
traceback.print_exc(file=fp_error)
cleanup(fp)
Log(LogLevel.ERROR, f'Tests Failed.')