Compare commits

..

1 Commits

Author SHA1 Message Date
f40fe32317 fix: flomni async readout
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m58s
CI for csaxs_bec / test (pull_request) Successful in 2m0s
2026-03-19 11:11:11 +01:00
12 changed files with 82 additions and 334 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 562 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 124 KiB

View File

@@ -778,9 +778,7 @@ class FlomniSampleTransferMixin:
dev.ftransy.controller.socket_put_confirmed("confirm=1")
else:
print("Stopping.")
raise FlomniError(
"User abort sample transfer."
)
exit
def ftransfer_gripper_is_open(self) -> bool:
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
@@ -803,8 +801,7 @@ class FlomniSampleTransferMixin:
def ftransfer_gripper_move(self, position: int):
self.check_position_is_valid(position)
#this is not used for sample stage position!
self._ftransfer_shiftx = -0.15
self._ftransfer_shiftx = -0.2
self._ftransfer_shiftz = -0.5
fsamx_pos = dev.fsamx.readback.get()
@@ -824,7 +821,7 @@ class FlomniSampleTransferMixin:
self.check_tray_in()
if position == 0:
umv(dev.ftransx, 11, dev.ftransz, 3.5950)
umv(dev.ftransx, 10.715 + 0.2, dev.ftransz, 3.5950)
if position == 1:
umv(
dev.ftransx,
@@ -969,6 +966,8 @@ class FlomniSampleTransferMixin:
class FlomniAlignmentMixin:
import csaxs_bec
import os
from pathlib import Path
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
@@ -1209,76 +1208,6 @@ class FlomniAlignmentMixin:
return additional_correction_shift
class _ProgressProxy:
"""Dict-like proxy that persists the flOMNI progress dict as a BEC global variable.
Every read (`proxy["key"]`) fetches the current dict from the global var store,
and every write (`proxy["key"] = val`) fetches, updates, and saves it back.
This makes the progress state visible to all BEC client sessions via
``client.get_global_var("tomo_progress")``.
"""
_GLOBAL_VAR_KEY = "tomo_progress"
_DEFAULTS: dict = {
"subtomo": 0,
"subtomo_projection": 0,
"subtomo_total_projections": 1,
"projection": 0,
"total_projections": 1,
"angle": 0,
"tomo_type": 0,
"tomo_start_time": None,
"estimated_remaining_time": None,
"heartbeat": None,
}
def __init__(self, client):
self._client = client
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _load(self) -> dict:
val = self._client.get_global_var(self._GLOBAL_VAR_KEY)
if val is None:
return dict(self._DEFAULTS)
return val
def _save(self, data: dict) -> None:
self._client.set_global_var(self._GLOBAL_VAR_KEY, data)
# ------------------------------------------------------------------
# Dict-like interface
# ------------------------------------------------------------------
def __getitem__(self, key):
return self._load()[key]
def __setitem__(self, key, value) -> None:
data = self._load()
data[key] = value
self._save(data)
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._load()!r})"
def get(self, key, default=None):
return self._load().get(key, default)
def update(self, *args, **kwargs) -> None:
"""Update multiple fields in a single round-trip."""
data = self._load()
data.update(*args, **kwargs)
self._save(data)
def reset(self) -> None:
"""Reset all progress fields to their default values."""
self._save(dict(self._DEFAULTS))
def as_dict(self) -> dict:
"""Return a plain copy of the current progress state."""
return self._load()
class Flomni(
FlomniInitStagesMixin,
FlomniSampleTransferMixin,
@@ -1301,8 +1230,14 @@ class Flomni(
self.corr_angle_y = []
self.corr_pos_y_2 = []
self.corr_angle_y_2 = []
self._progress_proxy = _ProgressProxy(self.client)
self._progress_proxy.reset()
self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self.OMNYTools = OMNYTools(self.client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
@@ -1358,42 +1293,6 @@ class Flomni(
self.special_angles = []
self.special_angle_repeats = 1
@property
def progress(self) -> _ProgressProxy:
"""Proxy dict backed by the BEC global variable ``tomo_progress``.
Readable from any BEC client session via::
client.get_global_var("tomo_progress")
Individual fields can be read and written just like a regular dict::
flomni.progress["projection"] # read
flomni.progress["projection"] = 42 # write (persists immediately)
To update multiple fields atomically use :py:meth:`_ProgressProxy.update`::
flomni.progress.update(projection=42, angle=90.0)
To reset all fields to their defaults::
flomni.progress.reset()
"""
return self._progress_proxy
@progress.setter
def progress(self, val: dict) -> None:
"""Replace the entire progress dict.
Accepts a plain :class:`dict` and persists it to the global var store.
Example::
flomni.progress = {"projection": 0, "total_projections": 100, ...}
"""
if not isinstance(val, dict):
raise TypeError(f"progress must be a dict, got {type(val).__name__!r}")
self._progress_proxy._save(val)
@property
def tomo_shellstep(self):
val = self.client.get_global_var("tomo_shellstep")
@@ -1580,11 +1479,21 @@ class Flomni(
def sample_name(self):
return self.sample_get_name(0)
def write_to_scilog(self, content, tags: list = None):
try:
if tags is not None:
tags.append("BEC")
else:
tags = ["BEC"]
msg = bec.logbook.LogbookMessage()
msg.add_text(content).add_tag(tags)
self.client.logbook.send_logbook_message(msg)
except Exception:
logger.warning("Failed to write to scilog.")
def tomo_alignment_scan(self):
"""
Performs a tomogram alignment scan.
Collects all scan numbers acquired during the alignment, prints them at the end,
and creates a BEC scilog text entry summarising the alignment scan numbers.
"""
if self.get_alignment_offset(0) == (0, 0, 0):
print("It appears that the xrayeye alignemtn was not performend or loaded. Aborting.")
@@ -1594,9 +1503,11 @@ class Flomni(
self.feye_out()
tags = ["BEC_alignment_tomo", self.sample_name]
self.write_to_scilog(
f"Starting alignment scan. First scan number: {bec.queue.next_scan_number}.", tags
)
start_angle = 0
alignment_scan_numbers = []
angle_end = start_angle + 180
for angle in np.linspace(start_angle, angle_end, num=int(180 / 45) + 1, endpoint=True):
@@ -1608,6 +1519,7 @@ class Flomni(
try:
start_scan_number = bec.queue.next_scan_number
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
@@ -1621,27 +1533,24 @@ class Flomni(
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
#self._write_tomo_scan_number(scan_nr, angle, 0)
alignment_scan_numbers.append(scan_nr)
self._write_tomo_scan_number(scan_nr, angle, 0)
umv(dev.fsamroy, 0)
self.OMNYTools.printgreenbold(
"\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit by flomni.read_alignment_offset() ."
"\n\nAlignment scan finished. Please run SPEC_ptycho_align and load the new fit."
)
# summary of alignment scan numbers
scan_list_str = ", ".join(str(s) for s in alignment_scan_numbers)
#print(f"\nAlignment scan numbers ({len(alignment_scan_numbers)} total): {scan_list_str}")
# BEC scilog entry (no logo)
scilog_content = (
f"Alignment scan finished.\n"
f"Sample: {self.sample_name}\n"
f"Number of alignment scans: {len(alignment_scan_numbers)}\n"
f"Alignment scan numbers: {scan_list_str}\n"
def _write_subtomo_to_scilog(self, subtomo_number):
dev = builtins.__dict__.get("dev")
bec = builtins.__dict__.get("bec")
if self.tomo_id > 0:
tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
else:
tags = ["BEC_subtomo", self.sample_name]
self.write_to_scilog(
f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
tags,
)
print(scliog_content)
bec.messaging.scilog.new().add_text(scilog_content.replace("\n", "<br>")).add_tags("alignmentscan").send()
def sub_tomo_scan(self, subtomo_number, start_angle=None):
"""
@@ -1650,6 +1559,18 @@ class Flomni(
subtomo_number (int): The sub tomogram number.
start_angle (float, optional): The start angle of the scan. Defaults to None.
"""
# dev = builtins.__dict__.get("dev")
# bec = builtins.__dict__.get("bec")
# if self.tomo_id > 0:
# tags = ["BEC_subtomo", self.sample_name, f"tomo_id_{self.tomo_id}"]
# else:
# tags = ["BEC_subtomo", self.sample_name]
# self.write_to_scilog(
# f"Starting subtomo: {subtomo_number}. First scan number: {bec.queue.next_scan_number}.",
# tags,
# )
self._write_subtomo_to_scilog(subtomo_number)
if start_angle is not None:
print(f"Sub tomo scan with start angle {start_angle} requested.")
@@ -1749,7 +1670,6 @@ class Flomni(
successful = False
error_caught = False
if 0 <= angle < 180.05:
self.progress["heartbeat"] = datetime.datetime.now().isoformat()
print(f"Starting flOMNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
@@ -1793,9 +1713,9 @@ class Flomni(
)
if self.OMNYTools.yesno("Shall I continue?", "n"):
print("OK")
else:
print("Stopping.")
return
else:
print("Stopping.")
return
self.flomnigui_show_progress()
@@ -1823,8 +1743,6 @@ class Flomni(
# self.write_pdf_report()
# else:
self.tomo_id = 0
self.write_pdf_report()
self.progress["tomo_start_time"] = datetime.datetime.now().isoformat()
with scans.dataset_id_on_hold:
if self.tomo_type == 1:
@@ -1844,6 +1762,7 @@ class Flomni(
while True:
angle, subtomo_number = self._golden(ii, self.golden_ratio_bunch_size, 180, 1)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
@@ -1891,6 +1810,7 @@ class Flomni(
ii, int(180 / self.tomo_angle_stepsize), 180, 1, 0
)
if previous_subtomo_number != subtomo_number:
self._write_subtomo_to_scilog(subtomo_number)
if (
subtomo_number % 2 == 1
and ii > 10
@@ -1932,42 +1852,14 @@ class Flomni(
self._print_progress()
self.OMNYTools.printgreenbold("Tomoscan finished")
@staticmethod
def _format_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string, e.g. '2h 03m 15s'."""
seconds = int(seconds)
h, remainder = divmod(seconds, 3600)
m, s = divmod(remainder, 60)
if h > 0:
return f"{h}h {m:02d}m {s:02d}s"
if m > 0:
return f"{m}m {s:02d}s"
return f"{s}s"
def _print_progress(self):
# --- compute and store estimated remaining time -----------------------
start_str = self.progress.get("tomo_start_time")
projection = self.progress["projection"]
total = self.progress["total_projections"]
if start_str is not None and total > 0 and projection > 9:
elapsed = (
datetime.datetime.now() - datetime.datetime.fromisoformat(start_str)
).total_seconds()
rate = projection / elapsed # projections per second
remaining_s = (total - projection) / rate
self.progress["estimated_remaining_time"] = remaining_s
eta_str = self._format_duration(remaining_s)
else:
eta_str = "N/A"
# ----------------------------------------------------------------------
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']:.0f}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}")
print(f"Estimated remaining time: ........ {eta_str}\x1b[0m")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
self._flomnigui_update_progress()
def add_sample_database(
@@ -1991,6 +1883,7 @@ class Flomni(
return
self.tomo_scan_projection(angle)
self.tomo_reconstruct()
def _golden(self, ii, howmany_sorted, maxangle, reverse=False):
"""returns the iis golden ratio angle of sorted bunches of howmany_sorted and its subtomo number"""
@@ -2095,7 +1988,7 @@ class Flomni(
f"{str(datetime.datetime.now())}: flomni scan projection at angle {angle}, scan"
f" number {bec.queue.next_scan_number}.\n"
)
# self.write_to_scilog(log_message, ["BEC_scans", self.sample_name])
scans.flomni_fermat_scan(
fovx=self.fovx,
fovy=self.fovy,
@@ -2108,9 +2001,6 @@ class Flomni(
corridor_size=corridor_size,
)
self.tomo_reconstruct()
def tomo_parameters(self):
"""print and update the tomo parameters"""
print("Current settings:")
@@ -2252,58 +2142,38 @@ class Flomni(
fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}"
stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}"
dataset_id = str(self.client.queue.next_dataset_number)
account = bec.active_account
content = [
f"{'Sample Name:':<{padding}}{self.sample_name:>{padding}}\n",
f"{'Measurement ID:':<{padding}}{str(self.tomo_id):>{padding}}\n",
f"{'Dataset ID:':<{padding}}{dataset_id:>{padding}}\n",
f"{'Sample Info:':<{padding}}{'Sample Info':>{padding}}\n",
f"{'e-account:':<{padding}}{str(account):>{padding}}\n",
f"{'e-account:':<{padding}}{str(self.client.username):>{padding}}\n",
f"{'Number of projections:':<{padding}}{int(180 / self.tomo_angle_stepsize * 8):>{padding}}\n",
f"{'First scan number:':<{padding}}{self.client.queue.next_scan_number:>{padding}}\n",
f"{'Last scan number approx.:':<{padding}}{self.client.queue.next_scan_number + int(180 / self.tomo_angle_stepsize * 8) + 10:>{padding}}\n",
f"{'Current photon energy:':<{padding}}To be implemented\n",
#f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Current photon energy:':<{padding}}{dev.mokev.read()['mokev']['value']:>{padding}.4f}\n",
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}.0f}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
]
content = "".join(content)
user_target = os.path.expanduser(f"~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
user_target = os.path.expanduser(f"~/Data10/documentation/tomo_scan_ID_{self.tomo_id}.pdf")
with PDFWriter(user_target) as file:
file.write(header)
file.write(content)
# subprocess.run(
# "xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
# )
subprocess.run(
"xterm /work/sls/spec/local/XOMNY/bin/upload/upload_last_pon.sh &", shell=True
)
# status = subprocess.run(f"cp /tmp/spec-e20131-specES1.pdf {user_target}", shell=True)
# msg = bec.tomo_progress.tomo_progressMessage()
# logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
# msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
# ["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "flOMNI", self.sample_name]
# )
# self.client.tomo_progress.send_tomo_progress_message("~/data/raw/documentation/tomo_scan_ID_{self.tomo_id}.pdf").send()
import csaxs_bec
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
logo_file_rel = "flOMNI.png"
# Build the absolute path correctly
logo_file = (
csaxs_bec_basepath.parent
/ "bec_ipython_client"
/ "plugins"
/ "flomni"
/ logo_file_rel
).resolve()
print(logo_file)
bec.messaging.scilog.new().add_attachment(logo_file, width=200).add_text(content.replace("\n", "<br>")).add_tags("tomoscan").send()
msg = bec.logbook.LogbookMessage()
logo_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LamNI_logo.png")
msg.add_file(logo_path).add_text("".join(content).replace("\n", "</p><p>")).add_tag(
["BEC", "tomo_parameters", f"dataset_id_{dataset_id}", "LamNI", self.sample_name]
)
self.client.logbook.send_logbook_message(msg)
if __name__ == "__main__":

View File

@@ -50,6 +50,8 @@ class FlomniOpticsMixin:
# move both axes to the desired "in" positions
umv(dev.feyex, feyex_in, dev.feyey, feyey_in)
self.xrayeye_update_frame()
def _ffzp_in(self):
foptx_in = self._get_user_param_safe("foptx", "in")
fopty_in = self._get_user_param_safe("fopty", "in")

View File

@@ -223,14 +223,6 @@ class flomniGuiTools:
self._flomnigui_update_progress()
def _flomnigui_update_progress(self):
"""Update the progress ring bar and center label from the current progress state.
``self.progress`` is backed by the BEC global variable ``tomo_progress``
(see :class:`_ProgressProxy` in ``flomni.py``), so this method reflects
the live state that is also accessible from other BEC client sessions via::
client.get_global_var("tomo_progress")
"""
main_progress_ring = self.progressbar.rings[0]
subtomo_progress_ring = self.progressbar.rings[1]
if self.progressbar is not None:
@@ -243,31 +235,6 @@ class flomniGuiTools:
main_progress_ring.set_value(progress)
subtomo_progress_ring.set_value(subtomo_progress)
# --- format start time for display --------------------------------
start_str = self.progress.get("tomo_start_time")
if start_str is not None:
import datetime as _dt
start_display = _dt.datetime.fromisoformat(start_str).strftime("%Y-%m-%d %H:%M:%S")
else:
start_display = "N/A"
# --- format estimated remaining time ------------------------------
remaining_s = self.progress.get("estimated_remaining_time")
if remaining_s is not None and remaining_s >= 0:
import datetime as _dt
remaining_s = int(remaining_s)
h, rem = divmod(remaining_s, 3600)
m, s = divmod(rem, 60)
if h > 0:
eta_display = f"{h}h {m:02d}m {s:02d}s"
elif m > 0:
eta_display = f"{m}m {s:02d}s"
else:
eta_display = f"{s}s"
else:
eta_display = "N/A"
# ------------------------------------------------------------------
text = (
f"Progress report:\n"
f" Tomo type: {self.progress['tomo_type']}\n"
@@ -276,9 +243,7 @@ class flomniGuiTools:
f" Angle: {self.progress['angle']:.1f}\n"
f" Current subtomo: {self.progress['subtomo']}\n"
f" Current projection within subtomo: {self.progress['subtomo_projection']}\n"
f" Total projections per subtomo: {int(self.progress['subtomo_total_projections'])}\n"
f" Scan started: {start_display}\n"
f" Est. remaining: {eta_display}"
f" Total projections per subtomo: {int(self.progress['subtomo_total_projections'])}"
)
self.progressbar.set_center_label(text)

View File

@@ -253,8 +253,6 @@ class XrayEyeAlign:
umv(dev.rtx, 0)
print("You are ready to remove the xray eye and start ptychography scans.")
print("Fine alignment: flomni.tomo_parameters() , then flomni.tomo_alignment_scan()")
print("After that, run the fit in Matlab and load the new fit flomni.read_alignment_offset()")
def write_output(self):
file = os.path.expanduser("~/Data10/specES1/internal/xrayeye_alignmentvalues")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 359 KiB

View File

@@ -317,6 +317,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
try:
scan_done = bool(value == self._num_total_triggers)
self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done)
if scan_done:
self._scan_done_event.set()
except Exception:
content = traceback.format_exc()
logger.info(f"Device {self.name} error: {content}")
@@ -391,7 +393,6 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
self._current_data_index = 0
# NOTE Make sure that the signal that omits mca callbacks is cleared
# DO NOT REMOVE!!
self._omit_mca_callbacks.clear()
# For a fly scan we need to start the mcs card ourselves
@@ -562,9 +563,8 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard):
def on_stop(self) -> None:
"""Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here."""
with suppress_mca_callbacks(self):
self.stop_all.put(1)
self.erase_all.put(1)
self.stop_all.put(1)
self.erase_all.put(1)
def mcs_recovery(self, timeout: int = 1) -> None:
"""

View File

@@ -13,14 +13,6 @@ from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
class MonitorSignal(Signal):
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
def __init__(self, *, name, auto_monitor=False, **kwargs):
super().__init__(name=name, **kwargs)
self.auto_monitor = auto_monitor
class OMNYFastShutter(PSIDeviceBase, Device):
"""
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
@@ -34,7 +26,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
shutter = Cpt(Signal, name="shutter")
# -----------------------------------------------------
# User-facing shutter control functions

View File

@@ -217,16 +217,6 @@ def test_mcs_card_csaxs_complete_and_stop(mock_mcs_csaxs: MCSCardCSAXS):
assert not mcs._start_monitor_async_data_emission.is_set()
def test_mcs_on_stop(mock_mcs_csaxs: MCSCardCSAXS):
"""Test that on stop sets the omit_mca_callbacks flag. Also test that on stage clears the omit_mca_callbacks flag."""
mcs = mock_mcs_csaxs
assert mcs._omit_mca_callbacks.is_set() is False
mcs.stop()
assert mcs._omit_mca_callbacks.is_set() is True
mcs.stage()
assert mcs._omit_mca_callbacks.is_set() is False
def test_mcs_recovery(mock_mcs_csaxs: MCSCardCSAXS):
mcs = mock_mcs_csaxs
# Simulate ongoing acquisition

View File

@@ -1,69 +0,0 @@
import pytest
from bec_server.device_server.tests.utils import DMMock
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
@pytest.mark.parametrize("auto_monitor", [False, True])
def test_monitor_signal_stores_auto_monitor(auto_monitor):
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
assert signal.auto_monitor is auto_monitor
def test_monitor_signal_put_propagates_value_to_readback_callback():
signal = MonitorSignal(name="signal", auto_monitor=True)
initial_value = signal.read()[signal.name]["value"]
callback_values = []
callback_reads = []
def _test_cb(value, old_value, **kwargs):
callback_values.append((value, old_value))
callback_reads.append(kwargs["obj"].read())
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
signal.put(1)
assert callback_values == [(1, initial_value)]
assert len(callback_reads) == 1
assert callback_reads[0][signal.name]["value"] == 1
assert signal.read()[signal.name]["value"] == 1
signal.put(0)
assert callback_values == [(1, initial_value), (0, 1)]
assert len(callback_reads) == 2
assert callback_reads[1][signal.name]["value"] == 0
assert signal.read()[signal.name]["value"] == 0
@pytest.fixture
def omny_fast_shutter():
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
try:
yield shutter
finally:
shutter.destroy()
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
assert omny_fast_shutter.shutter.auto_monitor is True
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
signal_name = omny_fast_shutter.shutter.name
callback_reads = []
def _test_cb(**kwargs):
callback_reads.append(omny_fast_shutter.read())
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
omny_fast_shutter.shutter.put(1)
assert len(callback_reads) == 1
assert callback_reads[0][signal_name]["value"] == 1
assert omny_fast_shutter.read()[signal_name]["value"] == 1
assert omny_fast_shutter.fshstatus() == 1