fix: fix std_daq live processing and simple scans

This commit is contained in:
gac-x02da
2025-06-16 21:21:40 +02:00
committed by wakonig_k
parent c5799cb37e
commit 2a0b5c2a61
4 changed files with 199 additions and 44 deletions

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
from collections import deque
from typing import TYPE_CHECKING, Literal, cast from typing import TYPE_CHECKING, Literal, cast
import numpy as np import numpy as np
@@ -69,7 +70,14 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
analysis_signal = Cpt(Signal, name="analysis_signal", kind=Kind.hinted, doc="Analysis Signal") analysis_signal = Cpt(Signal, name="analysis_signal", kind=Kind.hinted, doc="Analysis Signal")
analysis_signal2 = Cpt(Signal, name="analysis_signal2", kind=Kind.hinted, doc="Analysis Signal") analysis_signal2 = Cpt(Signal, name="analysis_signal2", kind=Kind.hinted, doc="Analysis Signal")
preview = Cpt(PreviewSignal, ndim=2, name="preview", doc="Camera raw data preview signal") preview = Cpt(PreviewSignal, ndim=2, name="preview", doc="Camera raw data preview signal", num_rotation_90=1, transpose=False)
preview_corrected = Cpt(
PreviewSignal,
ndim=2,
name="preview_corrected",
doc="Camera preview signal with flat and dark correction",
num_rotation_90=1, transpose=False
)
progress = Cpt( progress = Cpt(
ProgressSignal, ProgressSignal,
name="progress", name="progress",
@@ -117,6 +125,8 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
self.backend = StdDaqClient(parent=self, ws_url=std_daq_ws, rest_url=std_daq_rest) self.backend = StdDaqClient(parent=self, ws_url=std_daq_ws, rest_url=std_daq_rest)
self.backend.add_count_callback(self._on_count_update) self.backend.add_count_callback(self._on_count_update)
self.live_preview = None self.live_preview = None
self.converted_files = deque(maxlen=100) # Store the last 10 converted files
self.target_files = deque(maxlen=100) # Store the last 10 target files
self.acq_configs = {} self.acq_configs = {}
if std_daq_live is not None: if std_daq_live is not None:
self.live_preview = StdDaqPreview(url=std_daq_live, cb=self._on_preview_update) self.live_preview = StdDaqPreview(url=std_daq_live, cb=self._on_preview_update)
@@ -207,8 +217,8 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
def _on_preview_update(self, img: np.ndarray): def _on_preview_update(self, img: np.ndarray):
corrected_img = self.live_processing.apply_flat_dark_correction(img) corrected_img = self.live_processing.apply_flat_dark_correction(img)
self.live_processing.on_new_data(corrected_img) self.live_processing.on_new_data(corrected_img)
self.preview.put(corrected_img) self.preview.put(img)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=corrected_img) self.preview_corrected.put(corrected_img)
def _on_count_update(self, count: int): def _on_count_update(self, count: int):
""" """
@@ -245,7 +255,8 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
self, self,
name: str, name: str,
file_path: str = "", file_path: str = "",
file_prefix: str = "", file_name: str | None = None,
file_suffix: str = "",
num_images: int | None = None, num_images: int | None = None,
frames_per_trigger: int | None = None, frames_per_trigger: int | None = None,
) -> StatusBase: ) -> StatusBase:
@@ -263,14 +274,20 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
Returns: Returns:
DeviceStatus: The status of the restart operation. It resolves when the camera is ready to receive the first image. DeviceStatus: The status of the restart operation. It resolves when the camera is ready to receive the first image.
""" """
if file_name is not None and file_suffix:
raise ValueError("Both file_name and file_suffix are specified. Please choose one.")
self.acq_configs[name] = {} self.acq_configs[name] = {}
conf = {} conf = {}
if file_path: if file_path:
self.acq_configs[name]["file_path"] = self.file_path.get() self.acq_configs[name]["file_path"] = self.file_path.get()
conf["file_path"] = file_path conf["file_path"] = file_path
if file_prefix: if file_suffix:
self.acq_configs[name]["file_prefix"] = self.file_prefix.get() self.acq_configs[name]["file_prefix"] = self.file_prefix.get()
conf["file_prefix"] = file_prefix conf["file_prefix"] = "_".join([self.file_prefix.get(), file_suffix])
if file_name:
self.acq_configs[name]["file_prefix"] = self.file_prefix.get()
conf["file_prefix"] = file_name
if num_images is not None: if num_images is not None:
self.acq_configs[name]["num_images"] = self.num_images.get() self.acq_configs[name]["num_images"] = self.num_images.get()
conf["num_images"] = num_images conf["num_images"] = num_images
@@ -346,6 +363,7 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
num_images=self.num_images.get(), # type: ignore num_images=self.num_images.get(), # type: ignore
) )
self.camera_status.set(CameraStatus.RUNNING).wait() self.camera_status.set(CameraStatus.RUNNING).wait()
self.target_files.append(self.target_file)
def is_running(*, value, timestamp, **_): def is_running(*, value, timestamp, **_):
return bool(value == CameraStatusCode.RUNNING) return bool(value == CameraStatusCode.RUNNING)
@@ -369,7 +387,9 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
status = DeviceStatus(self) status = DeviceStatus(self)
if self.backend.status != StdDaqStatus.IDLE: if self.backend.status != StdDaqStatus.IDLE:
self.backend.add_status_callback( self.backend.add_status_callback(
status, success=[StdDaqStatus.IDLE], error=[StdDaqStatus.REJECTED, StdDaqStatus.ERROR] status,
success=[StdDaqStatus.IDLE],
error=[StdDaqStatus.REJECTED, StdDaqStatus.ERROR],
) )
self.backend.stop() self.backend.stop()
else: else:
@@ -434,11 +454,16 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
"/gpfs/test/test-beamline" # FIXME: This should be from the scan message "/gpfs/test/test-beamline" # FIXME: This should be from the scan message
) )
if "file_prefix" not in scan_args: if "file_prefix" not in scan_args:
scan_args["file_prefix"] = scan_msg.info["file_components"][0].split("/")[-1] + "_" file_base = scan_msg.info["file_components"][0].split("/")[-1]
file_suffix = scan_msg.info.get("file_suffix") or ""
comps = [file_base, self.name]
if file_suffix:
comps.append(file_suffix)
scan_args["file_prefix"] = "_".join(comps)
self.configure(scan_args) self.configure(scan_args)
if scan_msg.scan_type == "step": if scan_msg.scan_type == "step":
num_points = self.frames_per_trigger.get() * scan_msg.num_points # type: ignore num_points = self.frames_per_trigger.get() * max(scan_msg.num_points, 1) # type: ignore
else: else:
num_points = self.frames_per_trigger.get() num_points = self.frames_per_trigger.get()
@@ -519,6 +544,12 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
"""Called to inquire if a device has completed a scans.""" """Called to inquire if a device has completed a scans."""
def _create_dataset(_status: DeviceStatus): def _create_dataset(_status: DeviceStatus):
if (
self.target_file in self.converted_files
or self.target_file not in self.target_files
):
logger.info(f"File {self.target_file} already processed or not in target files.")
return
self.backend.create_virtual_datasets( self.backend.create_virtual_datasets(
self.file_path.get(), file_prefix=self.file_prefix.get() # type: ignore self.file_path.get(), file_prefix=self.file_prefix.get() # type: ignore
) )
@@ -527,8 +558,9 @@ class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
file_path=self.target_file, file_path=self.target_file,
done=True, done=True,
successful=True, successful=True,
hinted_location={"data": "data"}, hinted_location={"data": "tomcat-pco/data"},
) )
self.converted_files.append(self.target_file)
logger.info(f"Finished writing to {self.target_file}") logger.info(f"Finished writing to {self.target_file}")
status = self.acq_done() status = self.acq_done()

View File

@@ -122,10 +122,11 @@ class StdDaqLiveProcessing:
return corrected_data return corrected_data
# Ensure that the division does not lead to division by zero # Ensure that the division does not lead to division by zero
flat_corr = np.abs(flat-dark)
corrected_data = np.divide( corrected_data = np.divide(
data - dark, flat - dark, out=np.zeros_like(data), where=(flat - dark) != 0 data - dark, flat_corr, out=np.zeros_like(data, dtype=np.float32), where=flat_corr != 0
) )
return corrected_data return np.clip(corrected_data, a_min=0, a_max=None)
@typechecked @typechecked
def _load_and_update_reference( def _load_and_update_reference(

View File

@@ -1,9 +1,10 @@
from .simple_scans import TomoFlyScan, TomoScan from .simple_scans import AcquireDark, AcquireFlat, AcquireReferences, TomoFlyScan, TomoScan
from .tomcat_scans import TomcatSimpleSequence, TomcatSnapNStep from .tomcat_scans import TomcatSimpleSequence, TomcatSnapNStep
from .tutorial_fly_scan import (
AcquireDark, # from .tutorial_fly_scan import (
AcquireProjections, # # AcquireDark,
AcquireRefs, # AcquireProjections,
AcquireWhite, # AcquireRefs,
TutorialFlyScanContLine, # AcquireWhite,
) # TutorialFlyScanContLine,
# )

View File

@@ -45,28 +45,59 @@ class TomoComponents:
self, self,
name: str, name: str,
num_images: int, num_images: int,
prefix: str = "", file_suffix: str = "",
file_path: str = "", file_path: str = "",
frames_per_trigger: int = 1, frames_per_trigger: int = 1,
): ):
if not prefix: """
return Restart the cameras with a new configuration.
This is typically used to reset the cameras during another scan, e.g. before acquiring dark or flat images.
Args:
name (str): Name of the configuration to restart with.
num_images (int): Number of images to acquire.
file_suffix (str): Suffix for the file names.
file_path (str): Path where the files will be saved.
frames_per_trigger (int): Number of frames to acquire per trigger.
"""
for cam in self.cameras: for cam in self.cameras:
yield from self.stubs.send_rpc_and_wait( yield from self.stubs.send_rpc_and_wait(
device=cam, device=cam,
func_name="restart_with_new_config", func_name="restart_with_new_config",
name=name, name=name,
file_prefix=prefix, file_suffix=file_suffix,
file_path=file_path, file_path=file_path,
num_images=num_images, num_images=num_images,
frames_per_trigger=frames_per_trigger, frames_per_trigger=frames_per_trigger,
) )
def scan_report_instructions(self):
"""
Generate scan report instructions for the acquisition.
This method provides the necessary instructions to listen to the camera progress during the scan.
"""
if not self.cameras:
return
# Use the first camera or "gfcam" if available for reporting
report_camera = "gfcam" if "gfcam" in self.cameras else self.cameras[0]
yield from self.stubs.scan_report_instruction({"device_progress": [report_camera]})
def complete(self): def complete(self):
"""
Complete the acquisition by sending an RPC to each camera.
This method is typically called after the acquisition is done to finalize the process and start
writing the virtual dataset.
"""
for cam in self.cameras: for cam in self.cameras:
yield from self.stubs.send_rpc_and_wait(device=cam, func_name="on_complete") yield from self.stubs.send_rpc_and_wait(device=cam, func_name="on_complete")
def restore_configs(self, name: str): def restore_configs(self, name: str):
"""
Restore the camera configurations after an acquisition.
Args:
name (str): Name of the configuration to restore.
"""
for cam in self.cameras: for cam in self.cameras:
yield from self.stubs.send_rpc_and_wait( yield from self.stubs.send_rpc_and_wait(
device=cam, func_name="restore_config", name=name device=cam, func_name="restore_config", name=name
@@ -88,7 +119,7 @@ class TomoComponents:
device=cam, func_name="update_live_processing_reference", reference_type=ref_type device=cam, func_name="update_live_processing_reference", reference_type=ref_type
) )
def acquire_dark(self, num_images: int, exposure_time: float, name="dark"): def acquire_dark(self, num_images: int, exposure_time: float, name="dark", restart=True, restore=True):
""" """
Acquire dark images. Acquire dark images.
@@ -101,22 +132,22 @@ class TomoComponents:
logger.info(f"Acquiring {num_images} dark images with exposure time {exposure_time}s.") logger.info(f"Acquiring {num_images} dark images with exposure time {exposure_time}s.")
self.connector.send_client_info(f"Acquiring {num_images} dark images.") self.connector.send_client_info(f"Acquiring {num_images} dark images.")
yield from self.restart_cameras( if restart:
name=name, prefix=name, num_images=num_images, frames_per_trigger=1 yield from self.restart_cameras(
) name=name, file_suffix=name, num_images=num_images, frames_per_trigger=num_images
)
# yield from self.close_shutter() # yield from self.close_shutter()
for i in range(num_images): yield from self.stubs.trigger(min_wait=exposure_time * num_images)
logger.debug(f"Acquiring dark image {i+1}/{num_images}.")
yield from self.stubs.trigger(min_wait=exposure_time)
yield from self.complete() yield from self.complete()
yield from self.update_live_processing_references(ref_type="dark") yield from self.update_live_processing_references(ref_type="dark")
yield from self.restore_configs(name=name) if restore:
yield from self.restore_configs(name=name)
# yield from self.open_shutter() # yield from self.open_shutter()
self.connector.send_client_info("") self.connector.send_client_info("")
logger.info("Dark image acquisition complete.") logger.info("Dark image acquisition complete.")
def acquire_flat(self, num_images: int, exposure_time: float, name="flat"): def acquire_flat(self, num_images: int, exposure_time: float, name="flat", restart=True, restore=True):
""" """
Acquire flat images. Acquire flat images.
@@ -129,22 +160,112 @@ class TomoComponents:
logger.info(f"Acquiring {num_images} flat images with exposure time {exposure_time}s.") logger.info(f"Acquiring {num_images} flat images with exposure time {exposure_time}s.")
self.connector.send_client_info(f"Acquiring {num_images} flat images.") self.connector.send_client_info(f"Acquiring {num_images} flat images.")
yield from self.restart_cameras( if restart:
name=name, prefix=name, num_images=num_images, frames_per_trigger=1 yield from self.restart_cameras(
) name=name, file_suffix=name, num_images=num_images, frames_per_trigger=num_images
)
# yield from self.open_shutter() # yield from self.open_shutter()
for i in range(num_images): yield from self.stubs.trigger(min_wait=exposure_time * num_images)
logger.debug(f"Acquiring flat image {i+1}/{num_images}.")
yield from self.stubs.trigger(min_wait=exposure_time)
yield from self.complete() yield from self.complete()
yield from self.update_live_processing_references(ref_type="flat") yield from self.update_live_processing_references(ref_type="flat")
yield from self.restore_configs(name=name)
if restore:
yield from self.restore_configs(name=name)
logger.info("Flat image acquisition complete.") logger.info("Flat image acquisition complete.")
self.connector.send_client_info("") self.connector.send_client_info("")
def acquire_references(self, num_darks: int, num_flats: int, exp_time: float, name: str): def acquire_references(self, num_darks: int, num_flats: int, exp_time: float, restart=True, restore=True):
yield from self.acquire_dark(num_darks, exposure_time=exp_time, name=name) yield from self.acquire_dark(num_darks, exposure_time=exp_time, restart=restart, restore=restore)
yield from self.acquire_flat(num_flats, exposure_time=exp_time, name=name) yield from self.acquire_flat(num_flats, exposure_time=exp_time, restart=restart, restore=restore)
class AcquireDark(ScanBase):
scan_name = "acquire_dark"
gui_config = {"Acquisition Parameters": ["num_images", "exp_time"]}
def __init__(self, num_images: int, exp_time: float, **kwargs):
"""
Acquire dark images.
Args:
num_images (int): Number of dark images to acquire.
exp_time (float): Exposure time for each dark image in seconds.
Returns:
ScanReport
"""
frames_per_trigger = num_images if num_images > 0 else 1
super().__init__(frames_per_trigger=frames_per_trigger, exp_time=exp_time, **kwargs)
self.components = TomoComponents(self)
def scan_report_instructions(self):
yield from self.components.scan_report_instructions()
def scan_core(self):
yield from self.components.acquire_dark(
self.frames_per_trigger, self.exp_time, restart=False
)
class AcquireFlat(ScanBase):
scan_name = "acquire_flat"
gui_config = {"Acquisition Parameters": ["num_images", "exp_time"]}
def __init__(self, num_images: int, exp_time: float, **kwargs):
"""
Acquire flat images.
Args:
num_images (int): Number of flat images to acquire.
exp_time (float): Exposure time for each flat image in seconds.
frames_per_trigger (int): Number of frames to acquire per trigger.
Returns:
ScanReport
"""
frames_per_trigger = num_images if num_images > 0 else 1
super().__init__(frames_per_trigger=frames_per_trigger, exp_time=exp_time, **kwargs)
self.components = TomoComponents(self)
def scan_report_instructions(self):
yield from self.components.scan_report_instructions()
def scan_core(self):
yield from self.components.acquire_flat(
self.frames_per_trigger, self.exp_time, restart=False
)
class AcquireReferences(ScanBase):
scan_name = "acquire_refs"
gui_config = {"Acquisition Parameters": ["num_darks", "num_flats", "exp_time"]}
def __init__(self, num_darks: int, num_flats: int, exp_time: float, **kwargs):
"""
Acquire flats and darks.
Args:
num_darks (int): Number of dark images to acquire.
num_flats (int): Number of flat images to acquire.
exp_time (float): Exposure time for each flat image in seconds.
frames_per_trigger (int): Number of frames to acquire per trigger.
Returns:
ScanReport
"""
super().__init__(exp_time=exp_time, **kwargs)
self.num_darks = num_darks
self.num_flats = num_flats
self.components = TomoComponents(self)
def scan_report_instructions(self):
yield from self.components.scan_report_instructions()
def pre_scan(self):
yield from self.components.acquire_references(self.num_darks, self.num_flats, self.exp_time)
def scan_core(self):
yield None
class TomoScan(LineScan): class TomoScan(LineScan):