mirror of
https://github.com/slsdetectorgroup/slsDetectorPackage.git
synced 2025-05-12 15:32:13 +02:00
moved log out to utils
This commit is contained in:
parent
9889a31f81
commit
2c93d40e81
@ -1,26 +1,23 @@
|
||||
# SPDX-License-Identifier: LGPL-3.0-or-other
|
||||
# Copyright (C) 2021 Contributors to the SLS Detector Package
|
||||
'''
|
||||
This file is used to start up simulators, receivers and run all the tests on them and finally kill the simulators and receivers.
|
||||
This file is used to start up simulators, frame synchronizer, pull sockets, acquire, test and kill them finally.
|
||||
'''
|
||||
import argparse
|
||||
import os, sys, subprocess, time, colorama
|
||||
import os, sys, subprocess, time
|
||||
import shlex, traceback, json
|
||||
|
||||
from colorama import Fore, Style
|
||||
|
||||
from slsdet import Detector, detectorType, detectorSettings
|
||||
from slsdet.defines import DEFAULT_TCP_RX_PORTNO, DEFAULT_UDP_DST_PORTNO
|
||||
SERVER_START_PORTNO=1900
|
||||
|
||||
from utils_for_test import Log, LogLevel
|
||||
|
||||
colorama.init(autoreset=True)
|
||||
|
||||
def Log(color, message, stream=sys.stdout):
|
||||
print(f"{color}{message}{Style.RESET_ALL}", file=stream, flush=True)
|
||||
|
||||
class RuntimeException (Exception):
|
||||
def __init__ (self, message):
|
||||
super().__init__(Log(Fore.RED, message))
|
||||
super().__init__(Log(LogLevel.ERROR, message))
|
||||
|
||||
def checkIfProcessRunning(processName):
|
||||
cmd = f"pgrep -f {processName}"
|
||||
@ -31,25 +28,24 @@ def checkIfProcessRunning(processName):
|
||||
def killProcess(name, fp):
|
||||
pids = checkIfProcessRunning(name)
|
||||
if pids:
|
||||
Log(Fore.WHITE, f"Killing '{name}' processes with PIDs: {', '.join(pids)}", fp)
|
||||
Log(LogLevel.INFO, f"Killing '{name}' processes with PIDs: {', '.join(pids)}", fp)
|
||||
for pid in pids:
|
||||
try:
|
||||
p = subprocess.run(['kill', pid])
|
||||
if p.returncode != 0 and bool(checkIfProcessRunning(name)):
|
||||
raise RuntimeException(f"Could not kill {name} with pid {pid}")
|
||||
except Exception as e:
|
||||
Log(Fore.RED, f"Failed to kill process {name} pid:{pid}. Exception occured: [code:{e}, msg:{e.stderr}]")
|
||||
raise
|
||||
raise RuntimeException(f"Failed to kill process {name} pid:{pid}. Exception occured: [code:{e}, msg:{e.stderr}]")
|
||||
#else:
|
||||
# Log(Fore.WHITE, 'process not running : ' + name)
|
||||
# Log(LogLevel.INFO, 'process not running : ' + name)
|
||||
|
||||
|
||||
def cleanup(fp):
|
||||
'''
|
||||
kill both servers, receivers and clean shared memory
|
||||
'''
|
||||
Log(Fore.WHITE, 'Cleaning up')
|
||||
Log(Fore.WHITE, 'Cleaning up', fp)
|
||||
Log(LogLevel.INFO, 'Cleaning up')
|
||||
Log(LogLevel.INFO, 'Cleaning up', fp)
|
||||
killProcess('DetectorServer_virtual', fp)
|
||||
killProcess('slsReceiver', fp)
|
||||
killProcess('slsMultiReceiver', fp)
|
||||
@ -58,43 +54,40 @@ def cleanup(fp):
|
||||
cleanSharedmemory(fp)
|
||||
|
||||
def cleanSharedmemory(fp):
|
||||
Log(Fore.WHITE, 'Cleaning up shared memory...', fp)
|
||||
Log(LogLevel.INFO, 'Cleaning up shared memory...', fp)
|
||||
try:
|
||||
p = subprocess.run(['sls_detector_get', 'free'], stdout=fp, stderr=fp)
|
||||
except:
|
||||
Log(Fore.RED, 'Could not free shared memory')
|
||||
raise
|
||||
raise RuntimeException('Could not free shared memory')
|
||||
|
||||
def startProcessInBackground(name, fp):
|
||||
try:
|
||||
# in background and dont print output
|
||||
p = subprocess.Popen(shlex.split(name), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, restore_signals=False)
|
||||
Log(Fore.WHITE, 'Starting up ' + name + ' ...', fp)
|
||||
Log(LogLevel.INFO, 'Starting up ' + name + ' ...', fp)
|
||||
except Exception as e:
|
||||
Log(Fore.RED, f'Could not start {name}:{e}')
|
||||
raise
|
||||
raise RuntimeException(f'Could not start {name}:{e}')
|
||||
|
||||
def startServers(name, num_mods):
|
||||
Log(Fore.WHITE, 'Starting server')
|
||||
Log(LogLevel.INFO, 'Starting server')
|
||||
for i in range(num_mods):
|
||||
port_no = SERVER_START_PORTNO + (i * 2)
|
||||
startProcessInBackground(name + 'DetectorServer_virtual -p' + str(port_no), fp)
|
||||
time.sleep(6)
|
||||
|
||||
def startFrameSynchronizerPullSocket(fname, fp):
|
||||
Log(Fore.WHITE, 'Starting sync pull socket')
|
||||
Log(Fore.WHITE, f"Starting up Synchronizer pull socket. Log: {fname}", fp)
|
||||
Log(Fore.WHITE, f"Synchronizer pull socket log: {fname}")
|
||||
Log(LogLevel.INFO, 'Starting sync pull socket')
|
||||
Log(LogLevel.INFO, f"Starting up Synchronizer pull socket. Log: {fname}", fp)
|
||||
Log(LogLevel.INFO, f"Synchronizer pull socket log: {fname}")
|
||||
cmd = ['python', '-u', 'frameSynchronizerPullSocket.py']
|
||||
try:
|
||||
with open(fname, 'w') as fp:
|
||||
subprocess.Popen(cmd, stdout=fp, stderr=fp, text=True)
|
||||
except Exception as e:
|
||||
Log(Fore.RED, f"failed to start synchronizer pull socket: {e}")
|
||||
raise
|
||||
raise RuntimeException(f"failed to start synchronizer pull socket: {e}")
|
||||
|
||||
def startFrameSynchronizer(num_mods, fp):
|
||||
Log(Fore.WHITE, 'Starting frame synchronizer')
|
||||
Log(LogLevel.INFO, 'Starting frame synchronizer')
|
||||
# in 10.0.0
|
||||
#startProcessInBackground('slsFrameSynchronizer -n ' + str(num_mods) + ' -p ' + str(DEFAULT_TCP_RX_PORTNO))
|
||||
startProcessInBackground('slsFrameSynchronizer ' + str(DEFAULT_TCP_RX_PORTNO) + ' ' + str(num_mods), fp)
|
||||
@ -102,8 +95,8 @@ def startFrameSynchronizer(num_mods, fp):
|
||||
time.sleep(tStartup)
|
||||
|
||||
def loadConfig(name, num_mods, rx_hostname, settingsdir, num_frames, fp):
|
||||
Log(Fore.WHITE, 'Loading config')
|
||||
Log(Fore.WHITE, 'Loading config', fp)
|
||||
Log(LogLevel.INFO, 'Loading config')
|
||||
Log(LogLevel.INFO, 'Loading config', fp)
|
||||
try:
|
||||
d = Detector()
|
||||
d.virtual = [num_mods, SERVER_START_PORTNO]
|
||||
@ -125,8 +118,7 @@ def loadConfig(name, num_mods, rx_hostname, settingsdir, num_frames, fp):
|
||||
|
||||
d.frames = num_frames
|
||||
except Exception as e:
|
||||
Log(Fore.RED, f'Could not load config for {name}. Error: {str(e)}')
|
||||
raise
|
||||
raise RuntimeException(f'Could not load config for {name}. Error: {str(e)}')
|
||||
|
||||
def validate_htype_counts(log_path, num_mods, num_ports_per_module, num_frames):
|
||||
htype_counts = {
|
||||
@ -152,12 +144,11 @@ def validate_htype_counts(log_path, num_mods, num_ports_per_module, num_frames):
|
||||
for htype, expected_count in [("header", num_mods), ("series_end", num_mods), ("module", num_ports_per_module * num_mods * num_frames)]:
|
||||
if htype_counts[htype] != expected_count:
|
||||
msg = f"Expected {expected_count} '{htype}' entries, found {htype_counts[htype]}"
|
||||
Log(Fore.RED, msg)
|
||||
raise RuntimeError(msg)
|
||||
raise RuntimeException(msg)
|
||||
|
||||
def startTests(name, num_mods, num_frames, fp, file_pull_socket):
|
||||
Log(Fore.WHITE, 'Tests for ' + name)
|
||||
Log(Fore.WHITE, 'Tests for ' + name, fp)
|
||||
Log(LogLevel.INFO, 'Tests for ' + name)
|
||||
Log(LogLevel.INFO, 'Tests for ' + name, fp)
|
||||
cmd = 'tests --abort [.cmdcall] -s -o ' + fname
|
||||
|
||||
d = Detector()
|
||||
@ -167,11 +158,10 @@ def startTests(name, num_mods, num_frames, fp, file_pull_socket):
|
||||
d.acquire()
|
||||
fnum = d.rx_framescaught[0]
|
||||
if fnum != num_frames:
|
||||
Log(Fore.RED, f"{name} caught only {fnum}. Expected {num_frames}")
|
||||
raise
|
||||
raise RuntimeException(f"{name} caught only {fnum}. Expected {num_frames}")
|
||||
|
||||
validate_htype_counts(file_pull_socket, num_mods, num_ports_per_module, num_frames)
|
||||
Log(Fore.GREEN, f"Log file htype checks passed for {name}", fp)
|
||||
Log(LogLevel.INFOGREEN, f"Log file htype checks passed for {name}", fp)
|
||||
|
||||
|
||||
# parse cmd line for rx_hostname and settingspath using the argparse library
|
||||
@ -197,7 +187,7 @@ else:
|
||||
servers = args.servers
|
||||
|
||||
|
||||
Log(Fore.WHITE, 'Arguments:\nrx_hostname: ' + args.rx_hostname + '\nsettingspath: \'' + args.settingspath + '\nservers: \'' + ' '.join(servers) + '\nnum_mods: \'' + str(args.num_mods) + '\nnum_frames: \'' + str(args.num_frames) + '\'')
|
||||
Log(LogLevel.INFO, 'Arguments:\nrx_hostname: ' + args.rx_hostname + '\nsettingspath: \'' + args.settingspath + '\nservers: \'' + ' '.join(servers) + '\nnum_mods: \'' + str(args.num_mods) + '\nnum_frames: \'' + str(args.num_frames) + '\'')
|
||||
|
||||
|
||||
# redirect to file
|
||||
@ -205,7 +195,7 @@ prefix_fname = '/tmp/slsFrameSynchronizer_test'
|
||||
original_stdout = sys.stdout
|
||||
original_stderr = sys.stderr
|
||||
fname = prefix_fname + '_log.txt'
|
||||
Log(Fore.BLUE, '\nLog File: ' + fname)
|
||||
Log(LogLevel.INFOBLUE, '\nLog File: ' + fname)
|
||||
|
||||
with open(fname, 'w') as fp:
|
||||
|
||||
@ -213,8 +203,8 @@ with open(fname, 'w') as fp:
|
||||
testError = False
|
||||
for server in servers:
|
||||
try:
|
||||
Log(Fore.BLUE, '\nSynchonizer tests for ' + server, fp)
|
||||
Log(Fore.BLUE, '\nSynchonizer tests for ' + server)
|
||||
Log(LogLevel.INFOBLUE, '\nSynchonizer tests for ' + server, fp)
|
||||
Log(LogLevel.INFOBLUE, '\nSynchonizer tests for ' + server)
|
||||
|
||||
# cmd tests for det
|
||||
cleanup(fp)
|
||||
@ -230,7 +220,7 @@ with open(fname, 'w') as fp:
|
||||
# redirect to terminal
|
||||
sys.stdout = original_stdout
|
||||
sys.stderr = original_stderr
|
||||
Log(Fore.RED, f'Exception caught while testing {server}. Cleaning up...')
|
||||
Log(LogLevel.INFORED, f'Exception caught while testing {server}. Cleaning up...')
|
||||
with open(fname, 'a') as fp_error:
|
||||
traceback.print_exc(file=fp_error) # This will log the full traceback
|
||||
|
||||
@ -242,13 +232,13 @@ with open(fname, 'w') as fp:
|
||||
sys.stdout = original_stdout
|
||||
sys.stderr = original_stderr
|
||||
if not testError:
|
||||
Log(Fore.GREEN, 'Passed all sync tests\n' + str(servers))
|
||||
Log(LogLevel.INFOGREEN, 'Passed all sync tests\n' + str(servers))
|
||||
|
||||
|
||||
except Exception as e:
|
||||
# redirect to terminal
|
||||
sys.stdout = original_stdout
|
||||
sys.stderr = original_stderr
|
||||
Log(Fore.RED, f'Exception caught with general testing. Cleaning up...')
|
||||
Log(LogLevel.INFORED, f'Exception caught with general testing. Cleaning up...')
|
||||
cleanup(fp)
|
||||
|
@ -4,22 +4,18 @@
|
||||
This file is used to start up simulators, receivers and run all the tests on them and finally kill the simulators and receivers.
|
||||
'''
|
||||
import argparse
|
||||
import os, sys, subprocess, time, colorama
|
||||
import os, sys, subprocess, time
|
||||
|
||||
from colorama import Fore, Style
|
||||
from slsdet import Detector, detectorType, detectorSettings
|
||||
from slsdet.defines import DEFAULT_TCP_CNTRL_PORTNO, DEFAULT_TCP_RX_PORTNO, DEFAULT_UDP_DST_PORTNO
|
||||
HALFMOD2_TCP_CNTRL_PORTNO=1955
|
||||
HALFMOD2_TCP_RX_PORTNO=1957
|
||||
|
||||
colorama.init(autoreset=True)
|
||||
|
||||
def Log(color, message):
|
||||
print(f"{color}{message}{Style.RESET_ALL}", flush=True)
|
||||
from utils_for_test import Log, LogLevel
|
||||
|
||||
class RuntimeException (Exception):
|
||||
def __init__ (self, message):
|
||||
super().__init__(Log(Fore.RED, message))
|
||||
super().__init__(Log(LogLevel.INFORED, message))
|
||||
|
||||
def checkIfProcessRunning(processName):
|
||||
cmd = f"pgrep -f {processName}"
|
||||
@ -30,45 +26,42 @@ def checkIfProcessRunning(processName):
|
||||
def killProcess(name):
|
||||
pids = checkIfProcessRunning(name)
|
||||
if pids:
|
||||
Log(Fore.GREEN, f"Killing '{name}' processes with PIDs: {', '.join(pids)}")
|
||||
Log(LogLevel.INFOGREEN, f"Killing '{name}' processes with PIDs: {', '.join(pids)}")
|
||||
for pid in pids:
|
||||
try:
|
||||
p = subprocess.run(['kill', pid])
|
||||
if p.returncode != 0 and bool(checkIfProcessRunning(name)):
|
||||
raise RuntimeException(f"Could not kill {name} with pid {pid}")
|
||||
except Exception as e:
|
||||
Log(Fore.RED, f"Failed to kill process {name} pid:{pid}. Exception occured: [code:{e}, msg:{e.stderr}]")
|
||||
raise
|
||||
raise RuntimeException(f"Failed to kill process {name} pid:{pid}. Exception occured: [code:{e}, msg:{e.stderr}]")
|
||||
#else:
|
||||
# Log(Fore.WHITE, 'process not running : ' + name)
|
||||
# Log(LogLevel.INFO, 'process not running : ' + name)
|
||||
|
||||
|
||||
def cleanup(fp):
|
||||
'''
|
||||
kill both servers, receivers and clean shared memory
|
||||
'''
|
||||
Log(Fore.GREEN, 'Cleaning up...')
|
||||
Log(LogLevel.INFOGREEN, 'Cleaning up...')
|
||||
killProcess('DetectorServer_virtual')
|
||||
killProcess('slsReceiver')
|
||||
killProcess('slsMultiReceiver')
|
||||
cleanSharedmemory(fp)
|
||||
|
||||
def cleanSharedmemory(fp):
|
||||
Log(Fore.GREEN, 'Cleaning up shared memory...')
|
||||
Log(LogLevel.INFOGREEN, 'Cleaning up shared memory...')
|
||||
try:
|
||||
p = subprocess.run(['sls_detector_get', 'free'], stdout=fp, stderr=fp)
|
||||
except:
|
||||
Log(Fore.RED, 'Could not free shared memory')
|
||||
raise
|
||||
raise RuntimeException('Could not free shared memory')
|
||||
|
||||
def startProcessInBackground(name):
|
||||
try:
|
||||
# in background and dont print output
|
||||
p = subprocess.Popen(name.split(), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, restore_signals=False)
|
||||
Log(Fore.GREEN, 'Starting up ' + name + ' ...')
|
||||
Log(LogLevel.INFOGREEN, 'Starting up ' + name + ' ...')
|
||||
except Exception as e:
|
||||
Log(Fore.RED, f'Could not start {name}:{e}')
|
||||
raise
|
||||
raise RuntimeException(f'Could not start {name}:{e}')
|
||||
|
||||
def startServer(name):
|
||||
|
||||
@ -77,7 +70,7 @@ def startServer(name):
|
||||
if name == 'eiger':
|
||||
startProcessInBackground(name + 'DetectorServer_virtual -p' + str(HALFMOD2_TCP_CNTRL_PORTNO))
|
||||
tStartup = 6
|
||||
Log(Fore.WHITE, 'Takes ' + str(tStartup) + ' seconds... Please be patient')
|
||||
Log(LogLevel.INFO, 'Takes ' + str(tStartup) + ' seconds... Please be patient')
|
||||
time.sleep(tStartup)
|
||||
|
||||
def startReceiver(name):
|
||||
@ -88,7 +81,7 @@ def startReceiver(name):
|
||||
time.sleep(2)
|
||||
|
||||
def loadConfig(name, rx_hostname, settingsdir):
|
||||
Log(Fore.GREEN, 'Loading config')
|
||||
Log(LogLevel.INFOGREEN, 'Loading config')
|
||||
try:
|
||||
d = Detector()
|
||||
if name == 'eiger':
|
||||
@ -116,11 +109,10 @@ def loadConfig(name, rx_hostname, settingsdir):
|
||||
if d.type == detectorType.XILINX_CHIPTESTBOARD:
|
||||
d.configureTransceiver()
|
||||
except:
|
||||
Log(Fore.RED, 'Could not load config for ' + name)
|
||||
raise
|
||||
raise RuntimeException('Could not load config for ' + name)
|
||||
|
||||
def startCmdTests(name, fp, fname):
|
||||
Log(Fore.GREEN, 'Cmd Tests for ' + name)
|
||||
Log(LogLevel.INFOGREEN, 'Cmd Tests for ' + name)
|
||||
cmd = 'tests --abort [.cmdcall] -s -o ' + fname
|
||||
try:
|
||||
subprocess.run(cmd.split(), stdout=fp, stderr=fp, check=True, text=True)
|
||||
@ -132,15 +124,14 @@ def startCmdTests(name, fp, fname):
|
||||
if "FAILED" in line:
|
||||
msg = 'Cmd tests failed for ' + name + '!!!'
|
||||
sys.stdout = original_stdout
|
||||
Log(Fore.RED, msg)
|
||||
Log(Fore.RED, line)
|
||||
Log(LogLevel.ERROR, f"{msg}\n{line}")
|
||||
sys.stdout = fp
|
||||
raise Exception(msg)
|
||||
|
||||
Log(Fore.GREEN, 'Cmd Tests successful for ' + name)
|
||||
Log(LogLevel.INFOGREEN, 'Cmd Tests successful for ' + name)
|
||||
|
||||
def startGeneralTests(fp, fname):
|
||||
Log(Fore.GREEN, 'General Tests')
|
||||
Log(LogLevel.INFOGREEN, 'General Tests')
|
||||
cmd = 'tests --abort -s -o ' + fname
|
||||
try:
|
||||
subprocess.run(cmd.split(), stdout=fp, stderr=fp, check=True, text=True)
|
||||
@ -152,11 +143,11 @@ def startGeneralTests(fp, fname):
|
||||
if "FAILED" in line:
|
||||
msg = 'General tests failed !!!'
|
||||
sys.stdout = original_stdout
|
||||
Log(Fore.RED, msg + '\n' + line)
|
||||
Log(LogLevel.ERROR, msg + '\n' + line)
|
||||
sys.stdout = fp
|
||||
raise Exception(msg)
|
||||
|
||||
Log(Fore.GREEN, 'General Tests successful')
|
||||
Log(LogLevel.INFOGREEN, 'General Tests successful')
|
||||
|
||||
|
||||
|
||||
@ -182,7 +173,7 @@ else:
|
||||
servers = args.servers
|
||||
|
||||
|
||||
Log(Fore.WHITE, 'Arguments:\nrx_hostname: ' + args.rx_hostname + '\nsettingspath: \'' + args.settingspath + '\nservers: \'' + ' '.join(servers) + '\'')
|
||||
Log(LogLevel.INFO, 'Arguments:\nrx_hostname: ' + args.rx_hostname + '\nsettingspath: \'' + args.settingspath + '\nservers: \'' + ' '.join(servers) + '\'')
|
||||
|
||||
|
||||
# redirect to file
|
||||
@ -190,7 +181,7 @@ prefix_fname = '/tmp/slsDetectorPackage_virtual_test'
|
||||
original_stdout = sys.stdout
|
||||
original_stderr = sys.stderr
|
||||
fname = prefix_fname + '_log.txt'
|
||||
Log(Fore.BLUE, '\nLog File: ' + fname)
|
||||
Log(LogLevel.INFOBLUE, '\nLog File: ' + fname)
|
||||
|
||||
with open(fname, 'w') as fp:
|
||||
|
||||
@ -198,10 +189,10 @@ with open(fname, 'w') as fp:
|
||||
|
||||
# general tests
|
||||
file_results = prefix_fname + '_results_general.txt'
|
||||
Log(Fore.BLUE, 'General tests (results: ' + file_results + ')')
|
||||
Log(LogLevel.INFOBLUE, 'General tests (results: ' + file_results + ')')
|
||||
sys.stdout = fp
|
||||
sys.stderr = fp
|
||||
Log(Fore.BLUE, 'General tests (results: ' + file_results + ')')
|
||||
Log(LogLevel.INFOBLUE, 'General tests (results: ' + file_results + ')')
|
||||
|
||||
try:
|
||||
cleanup(fp)
|
||||
@ -215,10 +206,10 @@ with open(fname, 'w') as fp:
|
||||
sys.stdout = original_stdout
|
||||
sys.stderr = original_stderr
|
||||
file_results = prefix_fname + '_results_cmd_' + server + '.txt'
|
||||
Log(Fore.BLUE, 'Cmd tests for ' + server + ' (results: ' + file_results + ')')
|
||||
Log(LogLevel.INFOBLUE, 'Cmd tests for ' + server + ' (results: ' + file_results + ')')
|
||||
sys.stdout = fp
|
||||
sys.stderr = fp
|
||||
Log(Fore.BLUE, 'Cmd tests for ' + server + ' (results: ' + file_results + ')')
|
||||
Log(LogLevel.INFOBLUE, 'Cmd tests for ' + server + ' (results: ' + file_results + ')')
|
||||
|
||||
# cmd tests for det
|
||||
cleanup(fp)
|
||||
@ -232,7 +223,7 @@ with open(fname, 'w') as fp:
|
||||
# redirect to terminal
|
||||
sys.stdout = original_stdout
|
||||
sys.stderr = original_stderr
|
||||
Log(Fore.RED, f'Exception caught while testing {server}. Cleaning up...')
|
||||
Log(LogLevel.INFORED, f'Exception caught while testing {server}. Cleaning up...')
|
||||
testError = True
|
||||
break
|
||||
|
||||
@ -240,13 +231,13 @@ with open(fname, 'w') as fp:
|
||||
sys.stdout = original_stdout
|
||||
sys.stderr = original_stderr
|
||||
if not testError:
|
||||
Log(Fore.GREEN, 'Passed all tests for virtual detectors \n' + str(servers))
|
||||
Log(LogLevel.INFOGREEN, 'Passed all tests for virtual detectors \n' + str(servers))
|
||||
|
||||
|
||||
except Exception as e:
|
||||
# redirect to terminal
|
||||
sys.stdout = original_stdout
|
||||
sys.stderr = original_stderr
|
||||
Log(Fore.RED, f'Exception caught with general testing. Cleaning up...')
|
||||
Log(LogLevel.INFORED, f'Exception caught with general testing. Cleaning up...')
|
||||
cleanSharedmemory(sys.stdout)
|
||||
|
41
tests/scripts/utils_for_test.py
Normal file
41
tests/scripts/utils_for_test.py
Normal file
@ -0,0 +1,41 @@
|
||||
# SPDX-License-Identifier: LGPL-3.0-or-other
|
||||
# Copyright (C) 2021 Contributors to the SLS Detector Package
|
||||
'''
|
||||
This file is used for common utils used for integration tests between simulators and receivers.
|
||||
'''
|
||||
|
||||
import sys
|
||||
from enum import Enum
|
||||
from colorama import Fore, Style, init
|
||||
|
||||
init(autoreset=True)
|
||||
|
||||
class LogLevel(Enum):
|
||||
INFO = 0
|
||||
INFORED = 1
|
||||
INFOGREEN = 2
|
||||
INFOBLUE = 3
|
||||
WARNING = 4
|
||||
ERROR = 5
|
||||
DEBUG = 6
|
||||
|
||||
LOG_LABELS = {
|
||||
LogLevel.WARNING: "WARNING: ",
|
||||
LogLevel.ERROR: "ERROR: ",
|
||||
LogLevel.DEBUG: "DEBUG: "
|
||||
}
|
||||
|
||||
LOG_COLORS = {
|
||||
LogLevel.INFO: Fore.WHITE,
|
||||
LogLevel.INFORED: Fore.RED,
|
||||
LogLevel.INFOGREEN: Fore.GREEN,
|
||||
LogLevel.INFOBLUE: Fore.BLUE,
|
||||
LogLevel.WARNING: Fore.YELLOW,
|
||||
LogLevel.ERROR: Fore.RED,
|
||||
LogLevel.DEBUG: Fore.CYAN
|
||||
}
|
||||
|
||||
def Log(level: LogLevel, message: str, stream=sys.stdout):
|
||||
color = LOG_COLORS.get(level, Fore.WHITE)
|
||||
label = LOG_LABELS.get(level, "")
|
||||
print(f"{color}{label}{message}{Style.RESET_ALL}", file=stream, flush=True)
|
Loading…
x
Reference in New Issue
Block a user