feat(debug-tools): add debug tools and adjust logic from beamline tests
Some checks failed
CI for csaxs_bec / test (push) Failing after 1m23s
CI for csaxs_bec / test (pull_request) Failing after 1m27s

This commit is contained in:
2026-01-22 22:32:58 +01:00
parent 44e8456272
commit b5369682de
5 changed files with 426 additions and 2 deletions

View File

@@ -0,0 +1,257 @@
"""Module providing debugging tools for the BEC IPython client at cSAXS."""
from __future__ import annotations
import inspect
import json
import os
import re
import socket
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import TYPE_CHECKING, Literal
import numpy as np
from pydantic import BaseModel
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from slugify import slugify
if TYPE_CHECKING:
from bec_ipython_client.main import BECClient
from bec_lib.scans import Scans
from bec_widgets.cli.client_utils import BECGuiClient
scans: Scans # type: ignore[no-redef]
bec: BECClient # type: ignore[no-redef]
gui: BECGuiClient # type: ignore[no-redef]
dev: bec.device_manager
class Detector(BaseModel):
"""Model representing a detector configuration."""
name: str
hostnames: list[str]
cfg: dict
def to_identifier(text: str) -> str:
"""
Convert an unsafe string into a valid Python identifier.
"""
name = slugify(text.strip(), separator="_")
name = re.sub(r"[^a-zA-Z0-9_]", "", name)
if not name:
return "_"
if name[0].isdigit():
name = f"_{name}"
return name
class DebugTools:
"""A collection of debugging tools for the BEC IPython client at cSAXS."""
_PURPOSE = " ".join(
[
"Debugging helpers for the cSAXS BEC IPython client. These tools are intended for ",
"advanced users and developers to diagnose and troubleshoot issues within the BEC environment.",
"Below are the available methods together with a brief description of their functionality.",
]
)
######################
## Internal Methods ##
######################
def _describe(self) -> None:
"""Pretty-print a description of this debugging tool."""
console = Console()
# Offset for IPython prompt misplacement
console.print("\n\n", end="")
header = Text("DebugTools", style="bold cyan")
purpose = Text(self._PURPOSE, style="dim")
console.print(Panel(purpose, title=header, expand=False))
table = Table(show_header=True, header_style="bold magenta")
table.add_column("Method", style="bold", no_wrap=True)
table.add_column("Description")
for name, member in inspect.getmembers(self, predicate=inspect.ismethod):
if name.startswith("_"):
continue
doc = inspect.getdoc(member)
short_doc = doc.splitlines()[0] if doc else ""
table.add_row(name, short_doc)
console.print(table)
def _repr_pretty_(self, p, cycle: bool) -> None:
if cycle:
p.text("DebugTools(...)")
else:
self._describe()
#####################
### MCS Card Check ###
#####################
def _check_if_mcs_card_is_loaded(self):
"""Check if the MCS card device is loaded in the current BEC session."""
if "mcs" not in dev:
raise RuntimeError("MCS device is not loaded in the current active BEC session.")
def _check_if_ddg_is_loaded(self):
"""Check if the DDG1 device is loaded in the current BEC session."""
if "ddg1" not in dev:
raise RuntimeError("DDG1 device is not loaded in the current active BEC session.")
if "ddg2" not in dev:
raise RuntimeError("DDG2 device is not loaded in the current active BEC session.")
def mcs_test_acquire(
self, mode: Literal["high_frame", "medium_frame", "low_frame"] = "high_frame"
):
"""
Method to perform a test acquisition with randomized exposure time, burst frames, and cycles
on the MCS card using the DDG trigger setup.
Args:
mode (Literal["high_frame", "medium_frame", "low_frame"]): The mode of the test.
- 'high_frame': Tests high frame rates with short exposure times.
- 'medium_frame': Tests medium frame rates with moderate exposure times.
- 'low_frame': Tests low frame rates with longer exposure times.
"""
self._check_if_mcs_card_is_loaded()
self._check_if_ddg_is_loaded()
if mode == "high_frame":
burst_frames = np.random.randint(10000, 100000) # between 10000 and 100000
cycles = np.random.randint(5, 20) # between 5 and 20
exp_time = (
np.random.rand() * (0.001 - 0.000201) + 0.000201
) # between 0.000201 ms and 0.001 s
elif mode == "medium_frame":
burst_frames = np.random.randint(50, 500) # between 50 and 500
cycles = np.random.randint(1, 10) # between 1 and 10
exp_time = np.random.rand() * (0.01 - 0.001) + 0.001 # between 0.001 ms and 0.01 s
elif mode == "low_frame":
burst_frames = np.random.randint(5, 20) # between 5 and 20
cycles = np.random.randint(1, 5) # between 1 and 5
exp_time = np.random.rand() * (2 - 0.1) + 0.1 # between 0.1 ms and 2 s
else:
raise ValueError(f"Invalid mode '{mode}' specified for acquire scan test.")
print(
f"Starting acquire measurement with exp_time={exp_time}, burst_frames={burst_frames}, cycles={cycles}"
)
s = scans.acquire(
exp_time=exp_time, frames_per_trigger=burst_frames, burst_at_each_point=cycles
)
s.wait()
print("Acquire measurement finished.")
print("Checking MCS data...")
scan_data = bec.history.get_by_scan_id(s.scan.scan_id)
mcs_data = scan_data.devices.mcs
print(mcs_data)
shape = mcs_data._info["mcs_mca_mca1"]["value"]["shape"]
expected_shape = (cycles * burst_frames,)
# Assert will raise an error if the shapes do not match
assert (
shape == expected_shape
), f"MCS data shape {shape} does not match expected shape {expected_shape}."
########################
### JFJ/Eiger Checks ###
########################
def _get_jfj_eiger_config(self) -> dict[str, Detector]:
"""Retrieve the current JFJ/Eiger detector configuration from the BEC client."""
# FIXME: Implement REST API call once ready for use from Leo Sala's team.
ret = {}
base_path = os.path.dirname(__file__)
config_path = os.path.join(base_path, "jfj_config.json")
with open(config_path, "r", encoding="utf-8") as fh:
cfg = json.load(fh)
for entry in cfg["detector"]:
det = Detector(
name=to_identifier(entry["description"]), hostnames=entry["hostname"], cfg=cfg
)
ret[det.name] = det
return ret
def list_detectors(self) -> list[str]:
"""
List the names of all JFJ/Eiger detectors configured in the BEC client.
Returns:
list[str]: A list of detector names.
"""
detectors = self._get_jfj_eiger_config()
return list(detectors.keys())
def ping_detector(self, detector_name: str) -> bool:
"""
Ping a JFJ/Eiger detector to check if it is reachable.
Args:
detector_name (str): The name of the detector to ping.
Returns:
bool: True if the detector is reachable, False otherwise.
"""
detectors = self._get_jfj_eiger_config()
if detector_name not in detectors:
raise ValueError(f"Detector '{detector_name}' not found in configuration.")
det = detectors[detector_name]
results = self._ping_many(det.hostnames)
table = Table(title=f"Ping results for detector '{detector_name}'")
table.add_column("Hostname", style="cyan", no_wrap=True)
table.add_column("Status", style="magenta")
for host, alive in results.items():
status = "[green]OK[/green]" if alive else "[red]DOWN[/red]"
table.add_row(host, status)
console = Console()
console.print(table)
def _ping_many(self, hosts: list[str], port=22, timeout=2, max_workers=None):
max_workers = max_workers or len(hosts)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
primed_ping = partial(self._ping, port=port, timeout=timeout)
pings = executor.map(primed_ping, hosts)
return dict(zip(hosts, pings))
def _ping(self, host: str, port=23, timeout=2): # telnet is port 23
address = (host, port)
try:
with socket.create_connection(address, timeout):
return True
except OSError:
return False
def open_it_service_page(self):
"""Open the overview of IT services hosted by Science IT (Leo Sala) for cSAXS."""
gui = bec.gui
dock_area = gui.new()
print("Opening IT service page in new dock...")
url = "https://metrics.psi.ch/d/saf8mxv/x12sa?orgId=1&from=now-24h&to=now&timezone=browser&var-receiver_hosts=sls-jfjoch-001.psi.ch&var-writer_hosts=xbl-daq-34.psi.ch&var-beamline=X12SA&var-slurm_partitions=csaxs&var-receiver_services=broker&var-writer_services=writer&refresh=15m"
# FIXME BEC WIDGETS v3
dock = dock_area.new()
wb = dock.new(widget=gui.available_widgets.WebsiteWidget)
wb.set_url(url)

View File

@@ -0,0 +1,162 @@
{
"zeromq" : {
"image_socket": ["tcp://0.0.0.0:5500"]
},
"zeromq_preview": {
"socket_address": "tcp://0.0.0.0:5400",
"enabled": true,
"period_ms": 1000
},
"zeromq_metadata" : {
"socket_address": "tcp://0.0.0.0:5600",
"enabled": true,
"period_ms": 100
},
"instrument" : {
"source_name": "Swiss Light Source",
"instrument_name": "cSAXS",
"source_type": "Synchrotron X-ray Source"
},
"detector": [
{
"description": "EIGER 9M",
"serial_number": "E1",
"type": "EIGER",
"mirror_y": true,
"base_data_ipv4_address": "10.10.10.10",
"calibration_file":["/opt/jfjoch/calibration/"],
"standard_geometry" : {
"nmodules": 18,
"modules_in_row": 3,
"gap_x": 8,
"gap_y": 36
},
"hostname": [
"beb101",
"beb103",
"beb014",
"beb078",
"beb060",
"beb030",
"beb092",
"beb178",
"beb009",
"beb038",
"beb056",
"beb058",
"beb033",
"beb113",
"beb005",
"beb017",
"beb119",
"beb095",
"beb186",
"beb042",
"beb106",
"beb059",
"beb111",
"beb203",
"beb100",
"beb093",
"beb123",
"beb061",
"beb121",
"beb055",
"beb004",
"beb190",
"beb054",
"beb189",
"beb107",
"beb115"
]
},
{
"description": "EIGER 8.5M (tmp)",
"serial_number": "E1-tmp",
"type": "EIGER",
"mirror_y": true,
"base_data_ipv4_address": "10.10.10.10",
"calibration_file":["/opt/jfjoch/calibration/"],
"standard_geometry" : {
"nmodules": 17,
"modules_in_row": 3,
"gap_x": 8,
"gap_y": 36
},
"hostname": [
"beb101",
"beb103",
"beb014",
"beb078",
"beb060",
"beb030",
"beb092",
"beb178",
"beb009",
"beb038",
"beb056",
"beb058",
"beb033",
"beb113",
"beb005",
"beb017",
"beb119",
"beb095",
"beb186",
"beb042",
"beb106",
"beb059",
"beb100",
"beb093",
"beb123",
"beb061",
"beb121",
"beb055",
"beb004",
"beb190",
"beb054",
"beb189",
"beb107",
"beb115"
]
},
{
"description": "EIGER 1.5M",
"serial_number": "E2",
"type": "EIGER",
"mirror_y": true,
"base_data_ipv4_address": "10.10.11.10",
"calibration_file":["/opt/jfjoch/calibration_e1p5m/"],
"standard_geometry" : {
"nmodules": 3,
"modules_in_row": 1,
"gap_x": 8,
"gap_y": 36
},
"hostname": ["beb062", "beb026", "beb099", "beb084", "beb120", "beb108"]
}
],
"frontend_directory": "/usr/share/jfjoch/frontend/",
"image_pusher": "ZeroMQ",
"numa_policy": "n2g2",
"receiver_threads": 64,
"image_buffer_MiB": 96000,
"pcie": [
{
"blk": "/dev/jfjoch0",
"ipv4": "10.10.10.1"
},
{
"blk": "/dev/jfjoch1",
"ipv4": "10.10.10.2"
},
{
"blk": "/dev/jfjoch2",
"ipv4": "10.10.10.3"
},
{
"blk": "/dev/jfjoch3",
"ipv4": "10.10.10.4"
}
]
}

View File

@@ -48,6 +48,11 @@ elif _args.session.lower() == "csaxs":
logger.success("cSAXS session loaded.")
from csaxs_bec.bec_ipython_client.plugins.tool_box.debug_tools import DebugTools
debug = DebugTools()
logger.success("Debug tools loaded. Use 'debug' to access them.")
# SETUP BEAMLINE INFO
from bec_ipython_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo

View File

@@ -158,7 +158,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
def _compare_callback(self, *args, value, **kwargs) -> bool:
"""Callback for subscription status"""
logger.info(f"StatusBitsCompareStatus: Received value {value}")
logger.debug(f"StatusBitsCompareStatus: Received value {value}")
obj = kwargs.get("obj", None)
if obj is None:
name = "no object received"
@@ -173,7 +173,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
return False
if self._add_delay != 0:
time.sleep(self._add_delay)
logger.info(
logger.debug(
f"Returning comparison for {name}: {(STATUSBITS(value) & self._value) == self._value}"
)
return (STATUSBITS(value) & self._value) == self._value