refactor(gf): cleanup of the std daq integration

This commit is contained in:
2025-03-06 15:08:40 +01:00
parent b8dcda1696
commit d28f515c0d
10 changed files with 1983 additions and 672 deletions

View File

@@ -12,10 +12,27 @@ classifiers = [
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = ["ophyd_devices", "bec_lib", "requests", "websockets", "pyzmq", "jinja2"]
dependencies = [
"ophyd_devices",
"bec_lib",
"requests",
"websockets",
"pyzmq",
"jinja2",
]
[project.optional-dependencies]
dev = ["black", "isort", "coverage", "pylint", "pytest", "pytest-random-order", "ophyd_devices", "bec_server"]
dev = [
"black",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"ophyd_devices",
"bec_server",
"requests-mock",
]
[project.entry-points."bec"]
plugin_bec = "tomcat_bec"

View File

@@ -0,0 +1,358 @@
import json
from unittest import mock
import pytest
import requests
import requests_mock
import typeguard
from ophyd import StatusBase
from websockets import WebSocketException
from tomcat_bec.devices.gigafrost.std_daq_client import (
StdDaqClient,
StdDaqConfig,
StdDaqError,
StdDaqStatus,
)
@pytest.fixture
def client():
    """Yield a StdDaqClient with a mocked parent device and localhost URLs.

    The client is shut down on teardown so background websocket threads do
    not leak between tests.
    """
    parent_device = mock.MagicMock()
    _client = StdDaqClient(
        parent=parent_device, ws_url="http://localhost:5000", rest_url="http://localhost:5000"
    )
    yield _client
    # Teardown: stop the client's background machinery.
    _client.shutdown()
@pytest.fixture
def full_config():
    """Return a complete StdDaqConfig populated with tomcat-gf values.

    The values mirror the deployment JSON config used for the GigaFrost
    detector so tests exercise a realistic configuration payload.
    """
    full_config = StdDaqConfig(
        detector_name="tomcat-gf",
        detector_type="gigafrost",
        n_modules=8,
        bit_depth=16,
        image_pixel_height=2016,
        image_pixel_width=2016,
        start_udp_port=2000,
        writer_user_id=18600,
        max_number_of_forwarders_spawned=8,
        use_all_forwarders=True,
        module_sync_queue_size=4096,
        number_of_writers=12,
        module_positions={},
        ram_buffer_gb=150,
        delay_filter_timeout=10,
        live_stream_configs={
            "tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
            "tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
            "tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
        },
    )
    return full_config
def test_stddaq_client(client):
assert client is not None
def test_stddaq_client_get_daq_config(client, full_config):
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
out = client.get_config()
# Check that the response is simply the json response
assert out == response.model_dump()
assert client._config == response
def test_stddaq_client_set_config_pydantic(client, full_config):
"""Test setting configurations through the StdDAQ client"""
with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with StdDaqConfig object
config = full_config
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config)
# Verify the last request
assert m.last_request.json() == full_config.model_dump()
def test_std_daq_client_set_config_dict(client, full_config):
"""
Test setting configurations through the StdDAQ client with a dictionary input.
"""
with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input
config_dict = full_config.model_dump()
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config_dict)
assert m.last_request.json() == full_config.model_dump()
def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
"""
Test that the set_config method ignores extra keys in the input dictionary.
"""
with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input
config_dict = full_config.model_dump()
config_dict["extra_key"] = "extra_value"
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config_dict)
assert m.last_request.json() == full_config.model_dump()
def test_stddaq_client_set_config_error(client, full_config):
"""
Test error handling in the set_config method.
"""
with requests_mock.Mocker() as m:
config = full_config
m.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
with pytest.raises(requests.exceptions.HTTPError):
client.set_config(config)
def test_stddaq_client_get_config_cached(client, full_config):
"""
Test that the client returns the cached configuration if it is available.
"""
# Set the cached configuration
config = full_config
client._config = config
# Test that the client returns the cached configuration
assert client.get_config(cached=True) == config
def test_stddaq_client_status(client):
client._status = StdDaqStatus.FILE_CREATED
assert client.status == StdDaqStatus.FILE_CREATED
def test_stddaq_client_start(client):
with mock.patch("tomcat_bec.devices.gigafrost.std_daq_client.StatusBase") as StatusBase:
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)
out = client._send_queue.get()
assert out == {
"command": "start",
"path": "test_file_path",
"file_prefix": "test_file_prefix",
"n_image": 10,
}
StatusBase().wait.assert_called_once()
def test_stddaq_client_start_type_error(client):
with pytest.raises(typeguard.TypeCheckError):
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10")
def test_stddaq_client_stop(client):
    """
    Check that the stop method puts the stop command in the send queue.
    """
    client.stop()
    # Bug fix: the original used a bare ``==`` expression whose result was
    # discarded, so the test could never fail. Assert the comparison.
    assert client._send_queue.get() == {"command": "stop"}
def test_stddaq_client_update_config(client, full_config):
"""
Test that the update_config method updates the configuration with the provided dictionary.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {"detector_name": "new_name"}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
assert set_config.call_count == 1
def test_stddaq_client_updates_only_changed_configs(client, full_config):
"""
Test that the update_config method only updates the configuration if the config has changed.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {"detector_name": "tomcat-gf"}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
assert set_config.call_count == 0
def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
"""
Test that the update_config method only updates the configuration if the config has changed.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
assert set_config.call_count == 0
def test_stddaq_client_pre_restart(client):
"""
Test that the pre_restart method sets the status to RESTARTING.
"""
# let's assume the websocket loop is already idle
client._ws_idle_event.set()
client.ws_client = mock.MagicMock()
client._pre_restart()
client.ws_client.close.assert_called_once()
def test_stddaq_client_post_restart(client):
"""
Test that the post_restart method sets the status to IDLE.
"""
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._post_restart()
wait_for_connection.assert_called_once()
assert client._daq_is_running.is_set()
def test_stddaq_client_reset(client):
"""
Test that the reset method calls get_config and set_config.
"""
with (
mock.patch.object(client, "get_config") as get_config,
mock.patch.object(client, "set_config") as set_config,
):
client.reset()
get_config.assert_called_once()
set_config.assert_called_once()
def test_stddaq_client_run_status_callbacks(client):
"""
Test that the run_status_callback method runs the status callback.
"""
status = StatusBase()
client.add_status_callback(status, success=[StdDaqStatus.FILE_CREATED], error=[])
client._status = StdDaqStatus.FILE_CREATED
client._run_status_callbacks()
status.wait()
assert len(status._callbacks) == 0
def test_stddaq_client_run_status_callbacks_error(client):
"""
Test that the run_status_callback method runs the status callback.
"""
status = StatusBase()
client.add_status_callback(status, success=[], error=[StdDaqStatus.FILE_CREATED])
client._status = StdDaqStatus.FILE_CREATED
client._run_status_callbacks()
with pytest.raises(StdDaqError):
status.wait()
assert len(status._callbacks) == 0
@pytest.mark.parametrize(
"msg, updated",
[({"status": "IDLE"}, False), (json.dumps({"status": "waiting_for_first_image"}), True)],
)
def test_stddaq_client_on_received_ws_message(client, msg, updated):
"""
Test that the on_received_ws_message method runs the status callback.
"""
client._status = None
with mock.patch.object(client, "_run_status_callbacks") as run_status_callbacks:
client._on_received_ws_message(msg)
if updated:
run_status_callbacks.assert_called_once()
assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
else:
run_status_callbacks.assert_not_called()
assert client._status is None
def test_stddaq_client_ws_send_and_receive(client):
client.ws_client = mock.MagicMock()
client._send_queue.put({"command": "test"})
client._ws_send_and_receive()
# queue is not empty, so we should send the message
client.ws_client.send.assert_called_once()
client.ws_client.recv.assert_called_once()
client.ws_client.reset_mock()
client._ws_send_and_receive()
# queue is empty, so we should not send the message
client.ws_client.send.assert_not_called()
client.ws_client.recv.assert_called_once()
def test_stddaq_client_ws_send_and_receive_websocket_error(client):
"""
Test that the ws_send_and_receive method handles websocket errors.
"""
client.ws_client = mock.MagicMock()
client.ws_client.send.side_effect = WebSocketException()
client._send_queue.put({"command": "test"})
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._ws_send_and_receive()
wait_for_connection.assert_called_once()
def test_stddaq_client_ws_send_and_receive_timeout_error(client):
"""
Test that the ws_send_and_receive method handles timeout errors.
"""
client.ws_client = mock.MagicMock()
client.ws_client.recv.side_effect = TimeoutError()
client._send_queue.put({"command": "test"})
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._ws_send_and_receive()
wait_for_connection.assert_not_called()
def test_stddaq_client_ws_update_loop(client):
"""
Test that the ws_update_loop method runs the status callback.
"""
client._shutdown_event = mock.MagicMock()
client._shutdown_event.is_set.side_effect = [False, True]
with (
mock.patch.object(client, "_ws_send_and_receive") as ws_send_and_receive,
mock.patch.object(client, "_wait_for_server_running") as wait_for_server_running,
):
client._ws_update_loop()
ws_send_and_receive.assert_called_once()
wait_for_server_running.assert_called_once()

View File

@@ -0,0 +1,191 @@
eyex:
readoutPriority: baseline
description: X-ray eye axis X
deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
deviceConfig:
prefix: MTEST-X05LA-ES2-XRAYEYE:M1
deviceTags:
- xray-eye
onFailure: buffer
enabled: true
readOnly: false
softwareTrigger: false
# eyey:
# readoutPriority: baseline
# description: X-ray eye axis Y
# deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
# deviceConfig:
# prefix: MTEST-X05LA-ES2-XRAYEYE:M2
# deviceTags:
# - xray-eye
# onFailure: buffer
# enabled: true
# readOnly: false
# softwareTrigger: false
# eyez:
# readoutPriority: baseline
# description: X-ray eye axis Z
# deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
# deviceConfig:
# prefix: MTEST-X05LA-ES2-XRAYEYE:M3
# deviceTags:
# - xray-eye
# onFailure: buffer
# enabled: true
# readOnly: false
# softwareTrigger: false
femto_mean_curr:
readoutPriority: monitored
description: Femto mean current
deviceClass: ophyd.EpicsSignal
deviceConfig:
auto_monitor: true
read_pv: MTEST-X05LA-ES2-XRAYEYE:FEMTO-MEAN-CURR
deviceTags:
- xray-eye
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false
# es1_roty:
# readoutPriority: baseline
# description: 'Test rotation stage'
# deviceClass: tomcat_bec.devices.psimotor.EpicsMotorMR
# deviceConfig:
# prefix: X02DA-ES1-SMP1:ROTY
# deviceTags:
# - es1-sam
# onFailure: buffer
# enabled: true
# readOnly: false
# softwareTrigger: false
# es1_ismc:
# description: 'Automation1 iSMC interface'
# deviceClass: tomcat_bec.devices.aa1Controller
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:CTRL:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
# es1_tasks:
# description: 'Automation1 task management interface'
# deviceClass: tomcat_bec.devices.aa1Tasks
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:TASK:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
# es1_psod:
# description: 'AA1 PSO output interface (trigger)'
# deviceClass: tomcat_bec.devices.aa1AxisPsoDistance
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:ROTY:PSO:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: true
# es1_ddaq:
# description: 'Automation1 position recording interface'
# deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
#camera:
# description: Grashopper Camera
# deviceClass: tomcat_bec.devices.GrashopperTOMCAT
# deviceConfig:
# prefix: 'X02DA-PG-USB:'
# deviceTags:
# - camera
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: true
gfcam:
description: GigaFrost camera client
deviceClass: tomcat_bec.devices.GigaFrostCamera
deviceConfig:
prefix: 'X02DA-CAM-GF2:'
backend_url: 'http://sls-daq-001:8080'
std_daq_ws: 'ws://129.129.95.111:8080'
std_daq_rest: 'http://129.129.95.111:5000'
std_daq_live: 'tcp://129.129.95.111:20001'
auto_soft_enable: true
deviceTags:
- camera
- trigger
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: true
# gfdaq:
# description: GigaFrost stdDAQ client
# deviceClass: tomcat_bec.devices.StdDaqClient
# deviceConfig:
# ws_url: 'ws://129.129.95.111:8080'
# rest_url: 'http://129.129.95.111:5000'
# deviceTags:
# - std-daq
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
# daq_stream0:
# description: stdDAQ preview (2 every 555)
# deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
# deviceConfig:
# url: 'tcp://129.129.95.111:20000'
# deviceTags:
# - std-daq
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
# daq_stream1:
# description: stdDAQ preview (1 at 5 Hz)
# deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
# deviceConfig:
# url: 'tcp://129.129.95.111:20001'
# deviceTags:
# - std-daq
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false

View File

@@ -0,0 +1,22 @@
{
"detector_name": "tomcat-gf",
"detector_type": "gigafrost",
"n_modules": 8,
"bit_depth": 16,
"image_pixel_height": 2016,
"image_pixel_width": 2016,
"start_udp_port": 2000,
"writer_user_id": 18600,
"max_number_of_forwarders_spawned": 8,
"use_all_forwarders": true,
"module_sync_queue_size": 4096,
"number_of_writers": 12,
"module_positions": {},
"ram_buffer_gb": 150,
"delay_filter_timeout": 10,
"live_stream_configs": {
"tcp://129.129.95.111:20000": { "type": "periodic", "config": [1, 5] },
"tcp://129.129.95.111:20001": { "type": "periodic", "config": [1, 5] },
"tcp://129.129.95.38:20000": { "type": "periodic", "config": [1, 1] }
}
}

View File

@@ -0,0 +1,292 @@
"""
This module contains the PV definitions for the Gigafrost camera at Tomcat. It
does not contain any logic to control the camera.
"""
from ophyd import Component as Cpt
from ophyd import Device, DynamicDeviceComponent, EpicsSignal, EpicsSignalRO, Kind, Signal
import tomcat_bec.devices.gigafrost.gfconstants as const
class GigaFrostSignalWithValidation(EpicsSignal):
    """EpicsSignal that runs a user-supplied validator before every put.

    The optional ``validator`` callable receives the candidate value ahead of
    the regular ophyd checks and is expected to raise to reject it.
    """

    def __init__(
        self,
        read_pv,
        write_pv=None,
        *,
        put_complete=False,
        string=False,
        limits=False,
        name=None,
        validator=None,
        **kwargs,
    ):
        # Stored before delegating so check_value() can consult it.
        self._validator = validator
        super().__init__(
            read_pv,
            write_pv,
            put_complete=put_complete,
            string=string,
            limits=limits,
            name=name,
            **kwargs,
        )

    def check_value(self, value):
        """Apply the custom validator (if configured), then the base checks."""
        validator = self._validator
        if validator is not None:
            validator(value)
        return super().check_value(value)
def check_image_width(value):
    """Validate a Gigafrost image width.

    The camera only accepts widths that are multiples of 48 pixels.

    Raises:
        ValueError: if ``value`` is not a multiple of 48.
    """
    remainder = value % 48
    if remainder:
        raise ValueError("Image width must be a multiple of 48")
def check_image_height(value):
    """Validate a Gigafrost image height.

    The camera only accepts heights that are multiples of 16 pixels.

    Raises:
        ValueError: if ``value`` is not a multiple of 16.
    """
    remainder = value % 16
    if remainder:
        raise ValueError("Image height must be a multiple of 16")
class GigaFrostBase(Device):
"""Ophyd device class to control Gigafrost cameras at Tomcat
The actual hardware is implemented by an IOC based on an old fork of Helge's
cameras. This means that the camera behaves differently than the SF cameras;
in particular, it provides even less feedback about its internal progress.
Helge will update the GigaFrost IOC once the beamline is operational.
The ophyd class is based on the 'gfclient' package and has a lot of Tomcat
specific additions. It does behave differently though, as ophyd swallows the
errors from failed PV writes.
Parameters
----------
use_soft_enable : bool
Flag to use the camera's soft enable (default: False)
backend_url : str
Backend url address necessary to set up the camera's udp header.
(default: http://xbl-daq-23:8080)
Bugs:
----------
FRAMERATE : Ignored in soft trigger mode, period becomes 2xExposure time
"""
# pylint: disable=too-many-instance-attributes
busy_stat = Cpt(EpicsSignalRO, "BUSY_STAT", auto_monitor=True)
sync_flag = Cpt(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True)
sync_swhw = Cpt(EpicsSignal, "SYNC_SWHW.PROC", put_complete=True, kind=Kind.omitted)
start_cam = Cpt(EpicsSignal, "START_CAM", put_complete=True, kind=Kind.omitted)
set_param = Cpt(EpicsSignal, "SET_PARAM.PROC", put_complete=True, kind=Kind.omitted)
acqmode = Cpt(EpicsSignal, "ACQMODE", put_complete=True, kind=Kind.config)
array_size = DynamicDeviceComponent(
{
"array_size_x": (EpicsSignalRO, "ROIX", {"auto_monitor": True}),
"array_size_y": (EpicsSignalRO, "ROIY", {"auto_monitor": True}),
},
doc="Size of the array in the XY dimensions",
)
# UDP header
ports = Cpt(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config)
framenum = Cpt(EpicsSignal, "FRAMENUM", put_complete=True, kind=Kind.config)
ht_offset = Cpt(EpicsSignal, "HT_OFFSET", put_complete=True, kind=Kind.config)
write_srv = Cpt(EpicsSignal, "WRITE_SRV.PROC", put_complete=True, kind=Kind.omitted)
# Standard camera configs
exposure = Cpt(EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config)
framerate = Cpt(
EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config
)
roix = Cpt(
GigaFrostSignalWithValidation,
"ROIX",
put_complete=True,
auto_monitor=True,
kind=Kind.config,
validator=check_image_width,
)
roiy = Cpt(
GigaFrostSignalWithValidation,
"ROIY",
put_complete=True,
auto_monitor=True,
kind=Kind.config,
validator=check_image_height,
)
scan_id = Cpt(EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config)
cnt_num = Cpt(EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config)
corr_mode = Cpt(
EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config
)
# Software signals
soft_enable = Cpt(EpicsSignal, "SOFT_ENABLE", put_complete=True)
soft_trig = Cpt(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True, kind=Kind.omitted)
soft_exp = Cpt(EpicsSignal, "SOFT_EXP", put_complete=True)
###############################################################################################
# Enable schemes
# NOTE: 0 physical, 1 virtual (i.e. always running, but logs enable signal)
mode_enbl_exp = Cpt(
EpicsSignal,
"MODE_ENBL_EXP_RBV",
write_pv="MODE_ENBL_EXP",
put_complete=True,
kind=Kind.config,
)
# Enable signals (combined by OR gate)
mode_enbl_ext = Cpt(
EpicsSignal,
"MODE_ENBL_EXT_RBV",
write_pv="MODE_ENBL_EXT",
put_complete=True,
kind=Kind.config,
)
mode_endbl_soft = Cpt(
EpicsSignal,
"MODE_ENBL_SOFT_RBV",
write_pv="MODE_ENBL_SOFT",
put_complete=True,
kind=Kind.config,
)
mode_enbl_auto = Cpt(
EpicsSignal,
"MODE_ENBL_AUTO_RBV",
write_pv="MODE_ENBL_AUTO",
put_complete=True,
kind=Kind.config,
)
###############################################################################################
# Trigger modes
mode_trig_ext = Cpt(
EpicsSignal,
"MODE_TRIG_EXT_RBV",
write_pv="MODE_TRIG_EXT",
put_complete=True,
kind=Kind.config,
)
mode_trig_soft = Cpt(
EpicsSignal,
"MODE_TRIG_SOFT_RBV",
write_pv="MODE_TRIG_SOFT",
put_complete=True,
kind=Kind.config,
)
mode_trig_timer = Cpt(
EpicsSignal,
"MODE_TRIG_TIMER_RBV",
write_pv="MODE_TRIG_TIMER",
put_complete=True,
kind=Kind.config,
)
mode_trig_auto = Cpt(
EpicsSignal,
"MODE_TRIG_AUTO_RBV",
write_pv="MODE_TRIG_AUTO",
put_complete=True,
kind=Kind.config,
)
###############################################################################################
# Exposure modes
# NOTE: i.e. exposure time control, usually TIMER
mode_exp_ext = Cpt(
EpicsSignal,
"MODE_EXP_EXT_RBV",
write_pv="MODE_EXP_EXT",
put_complete=True,
kind=Kind.config,
)
mode_exp_soft = Cpt(
EpicsSignal,
"MODE_EXP_SOFT_RBV",
write_pv="MODE_EXP_SOFT",
put_complete=True,
kind=Kind.config,
)
mode_exp_timer = Cpt(
EpicsSignal,
"MODE_EXP_TIMER_RBV",
write_pv="MODE_EXP_TIMER",
put_complete=True,
kind=Kind.config,
)
###############################################################################################
# Trigger configuration PVs
# NOTE: These PVs set the behavior on posedge and negedge of the trigger signal
cnt_startbit = Cpt(
EpicsSignal,
"CNT_STARTBIT_RBV",
write_pv="CNT_STARTBIT",
put_complete=True,
kind=Kind.config,
)
cnt_endbit = Cpt(
EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config
)
# Line swap selection
ls_sw = Cpt(EpicsSignal, "LS_SW", put_complete=True, kind=Kind.config)
ls_nw = Cpt(EpicsSignal, "LS_NW", put_complete=True, kind=Kind.config)
ls_se = Cpt(EpicsSignal, "LS_SE", put_complete=True, kind=Kind.config)
ls_ne = Cpt(EpicsSignal, "LS_NE", put_complete=True, kind=Kind.config)
conn_parm = Cpt(EpicsSignal, "CONN_PARM", string=True, put_complete=True, kind=Kind.config)
# HW settings as read only
pixrate = Cpt(EpicsSignalRO, "PIXRATE", auto_monitor=True, kind=Kind.config)
trig_delay = Cpt(EpicsSignalRO, "TRIG_DELAY", auto_monitor=True, kind=Kind.config)
syncout_dly = Cpt(EpicsSignalRO, "SYNCOUT_DLY", auto_monitor=True, kind=Kind.config)
bnc0_rbv = Cpt(EpicsSignalRO, "BNC0_RBV", auto_monitor=True, kind=Kind.config)
bnc1_rbv = Cpt(EpicsSignalRO, "BNC1_RBV", auto_monitor=True, kind=Kind.config)
bnc2_rbv = Cpt(EpicsSignalRO, "BNC2_RBV", auto_monitor=True, kind=Kind.config)
bnc3_rbv = Cpt(EpicsSignalRO, "BNC3_RBV", auto_monitor=True, kind=Kind.config)
bnc4_rbv = Cpt(EpicsSignalRO, "BNC4_RBV", auto_monitor=True, kind=Kind.config)
bnc5_rbv = Cpt(EpicsSignalRO, "BNC5_RBV", auto_monitor=True, kind=Kind.config)
t_board = Cpt(EpicsSignalRO, "T_BOARD", auto_monitor=True)
auto_soft_enable = Cpt(Signal, kind=Kind.config)
backend_url = Cpt(Signal, kind=Kind.config)
mac_north = Cpt(Signal, kind=Kind.config)
mac_south = Cpt(Signal, kind=Kind.config)
ip_north = Cpt(Signal, kind=Kind.config)
ip_south = Cpt(Signal, kind=Kind.config)
file_path = Cpt(Signal, kind=Kind.config, value="")
file_prefix = Cpt(Signal, kind=Kind.config, value="")
num_images = Cpt(Signal, kind=Kind.config, value=1)
# pylint: disable=protected-access
def _define_backend_ip(self):
"""Select backend IP address for UDP stream"""
if self.backend_url.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_IP, const.BE3_SOUTH_IP
if self.backend_url.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_IP, const.BE999_SOUTH_IP
raise RuntimeError(f"Backend {self.backend_url.get()} not recognized.")
def _define_backend_mac(self):
"""Select backend MAC address for UDP stream"""
if self.backend_url.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC
if self.backend_url.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC
raise RuntimeError(f"Backend {self.backend_url.get()} not recognized.")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,433 @@
from __future__ import annotations
import copy
import enum
import json
import queue
import threading
import time
import traceback
from typing import TYPE_CHECKING, Callable, Literal
import requests
from bec_lib.logger import bec_logger
from ophyd import StatusBase
from pydantic import BaseModel, ConfigDict, Field, model_validator
from typeguard import typechecked
from websockets import State
from websockets.exceptions import WebSocketException
from websockets.sync.client import ClientConnection, connect
if TYPE_CHECKING: # pragma: no cover
from ophyd import Device, DeviceStatus
logger = bec_logger.logger
class StdDaqError(Exception): ...
class StdDaqStatus(str, enum.Enum):
    """
    Status of the StdDAQ.

    Extracted from https://git.psi.ch/controls-ci/std_detector_buffer/-/blob/master/source/std-det-driver/src/driver_state.hpp
    """

    # Values must match the status strings emitted by the std-daq driver verbatim,
    # since incoming websocket messages are parsed into this enum.
    CREATING_FILE = "creating_file"
    ERROR = "error"
    FILE_CREATED = "file_created"
    FILE_SAVED = "file_saved"
    IDLE = "idle"
    RECORDING = "recording"
    REJECTED = "rejected"
    SAVING_FILE = "saving_file"
    STARTED = "started"
    STOP = "stop"
    UNDEFINED = "undefined"
    WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image"
class StdDaqConfig(BaseModel):
    """
    Configuration for the StdDAQ.

    The camera ROI aliases ``roix``/``roiy`` are accepted as inputs and mapped
    onto ``image_pixel_width``/``image_pixel_height`` respectively, matching
    the aliases declared on StdDaqConfigPartial.
    """

    detector_name: str
    detector_type: str
    n_modules: int
    bit_depth: int
    image_pixel_height: int
    image_pixel_width: int
    start_udp_port: int
    writer_user_id: int
    max_number_of_forwarders_spawned: int
    use_all_forwarders: bool
    module_sync_queue_size: int
    number_of_writers: int
    module_positions: dict
    ram_buffer_gb: float
    delay_filter_timeout: float
    live_stream_configs: dict[str, dict[Literal["type", "config"], str | list]]
    model_config = ConfigDict(extra="ignore")

    @model_validator(mode="before")
    @classmethod
    def resolve_aliases(cls, values):
        """Map the camera ROI aliases onto the pixel-size fields.

        Bug fix: the original checked for ``roix`` but popped ``roiy`` (and
        vice versa), swapping width/height and — when both aliases were
        present — leaving ``roix`` unresolved so ``image_pixel_width`` was
        missing. roix is the width, roiy the height.
        """
        if "roix" in values:
            values["image_pixel_width"] = values.pop("roix")
        if "roiy" in values:
            values["image_pixel_height"] = values.pop("roiy")
        return values
class StdDaqConfigPartial(BaseModel):
    """
    Partial configuration for the StdDAQ.

    All fields default to ``None``; only explicitly provided values are kept
    when dumped with ``exclude_none=True``. The pixel-size fields also accept
    the camera ROI aliases ``roix``/``roiy``.
    """

    detector_name: str | None = None
    detector_type: str | None = None
    n_modules: int | None = None
    bit_depth: int | None = None
    image_pixel_height: int | None = Field(default=None, alias="roiy")
    image_pixel_width: int | None = Field(default=None, alias="roix")
    start_udp_port: int | None = None
    writer_user_id: int | None = None
    max_number_of_forwarders_spawned: int | None = None
    use_all_forwarders: bool | None = None
    module_sync_queue_size: int | None = None
    number_of_writers: int | None = None
    module_positions: dict | None = None
    ram_buffer_gb: float | None = None
    delay_filter_timeout: float | None = None
    live_stream_configs: dict[str, dict[Literal["type", "config"], str | list]] | None = None
    # Bug fix: without populate_by_name, aliased fields can ONLY be set via
    # "roix"/"roiy"; an input like {"image_pixel_width": 96} was silently
    # dropped by extra="ignore". Allow both the field name and the alias.
    model_config = ConfigDict(extra="ignore", populate_by_name=True)
class StdDaqWsResponse(BaseModel):
    """
    Response from the StdDAQ websocket.

    Unknown fields from the server are retained (extra="allow").
    """

    status: StdDaqStatus
    # Optional human-readable explanation, e.g. for rejected commands.
    reason: str | None = None
    model_config = ConfigDict(extra="allow")
class StdDaqClient:
USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset"]
    def __init__(self, parent: Device, ws_url: str, rest_url: str):
        """
        Client for the StdDAQ backend.

        Args:
            parent (Device): ophyd device that owns this client
            ws_url (str): websocket URL used for status updates and commands
            rest_url (str): base URL of the REST configuration API
        """
        self.parent = parent
        self.ws_url = ws_url
        self.rest_url = rest_url
        self.ws_client: ClientConnection | None = None
        self._status: StdDaqStatus = StdDaqStatus.UNDEFINED
        self._ws_update_thread: threading.Thread | None = None
        self._shutdown_event = threading.Event()
        # Set while the websocket loop is idle; _pre_restart waits on it
        # before closing the socket.
        self._ws_idle_event = threading.Event()
        # Cleared during config-induced backend restarts.
        self._daq_is_running = threading.Event()
        self._config: StdDaqConfig | None = None
        # id(status) -> (status object, success statuses, error statuses)
        self._status_callbacks: dict[
            str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]
        ] = {}
        self._send_queue = queue.Queue()
        # Assume the backend is up until a restart is initiated.
        self._daq_is_running.set()
@property
def status(self) -> StdDaqStatus:
"""
Get the status of the StdDAQ.
"""
return self._status
def add_status_callback(
self, status: DeviceStatus, success: list[StdDaqStatus], error: list[StdDaqStatus]
):
"""
Add a DeviceStatus callback for the StdDAQ. The status will be updated when the StdDAQ status changes and
set to finished when the status matches one of the specified success statuses and to exception when the status
matches one of the specified error statuses.
Args:
status (DeviceStatus): DeviceStatus object
success (list[StdDaqStatus]): list of statuses that indicate success
error (list[StdDaqStatus]): list of statuses that indicate error
"""
self._status_callbacks[id(status)] = (status, success, error)
@typechecked
def start(
self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True
) -> StatusBase:
"""
Start acquisition on the StdDAQ.
Args:
file_path (str): path to save the files
file_prefix (str): prefix of the files
num_images (int): number of images to acquire
timeout (float): timeout for the request
"""
logger.info(f"Starting StdDaq backend. Current status: {self.status}")
status = StatusBase()
self.add_status_callback(status, success=["waiting_for_first_image"], error=[])
message = {
"command": "start",
"path": file_path,
"file_prefix": file_prefix,
"n_image": num_images,
}
self._send_queue.put(message)
if wait:
return status.wait(timeout=timeout)
return status
def stop(self):
"""
Stop acquisition on the StdDAQ.
Args:
timeout (float): timeout for the request
"""
message = {"command": "stop"}
return self._send_queue.put(message)
    def get_config(self, cached=False, timeout: float = 2) -> dict:
        """
        Get the current configuration of the StdDAQ.

        Args:
            cached (bool): whether to use the cached configuration
            timeout (float): timeout for the request

        Returns:
            StdDaqConfig: configuration of the StdDAQ
        """
        # NOTE(review): the cached branch returns the StdDaqConfig model
        # itself, while the fresh branch returns a plain dict (model_dump).
        # Tests currently rely on this asymmetry -- confirm before unifying.
        if cached and self._config is not None:
            return self._config
        response = requests.get(
            self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout
        )
        # Surface HTTP errors (e.g. backend down) to the caller.
        response.raise_for_status()
        # Validate and cache the server response before returning it as a dict.
        self._config = StdDaqConfig(**response.json())
        return self._config.model_dump()
    def set_config(self, config: StdDaqConfig | dict, timeout: float = 2) -> None:
        """
        Set the configuration of the StdDAQ. This will overwrite the current configuration.

        Note: applying a config reboots the backend, so the websocket
        connection is quiesced before and re-established after the request.

        Args:
            config (StdDaqConfig | dict): configuration to set
            timeout (float): timeout for the request
        """
        if not isinstance(config, StdDaqConfig):
            # Validate plain dicts; unknown keys are dropped (extra="ignore").
            config = StdDaqConfig(**config)
        out = config.model_dump(exclude_none=True)
        if not out:
            logger.info(
                "The provided config does not contain relevant values for the StdDaq. Skipping set_config."
            )
            return
        # Stop the websocket loop and close the socket before the reboot.
        self._pre_restart()
        response = requests.post(
            self.rest_url + "/api/config/set", params={"user": "ioc"}, json=out, timeout=timeout
        )
        response.raise_for_status()
        # Setting a new config will reboot the backend; we therefore have to restart the websocket
        self._post_restart()
    def _pre_restart(self):
        """Quiesce the client ahead of a backend restart: pause queue
        processing, wait for the websocket loop to go idle, then close the
        socket."""
        self._daq_is_running.clear()
        self._ws_idle_event.wait()
        if self.ws_client is not None:
            self.ws_client.close()
    def _post_restart(self):
        """Reconnect the websocket after a backend restart and resume the loop."""
        self.wait_for_connection()
        self._daq_is_running.set()
def update_config(self, config: StdDaqConfigPartial | dict, timeout: float = 2) -> None:
    """
    Apply a partial update to the current StdDAQ configuration.

    Only the fields present in ``config`` are changed; if the merged
    configuration equals the current one, no request is sent at all.

    Args:
        config (StdDaqConfigPartial | dict): configuration to update
        timeout (float): timeout for the request
    """
    if not isinstance(config, StdDaqConfigPartial):
        config = StdDaqConfigPartial(**config)
    updates = config.model_dump(exclude_none=True)
    if not updates:
        return
    current = copy.deepcopy(self.get_config())
    merged = copy.deepcopy(current)
    merged.update(updates)
    if merged == current:
        # Nothing actually changes; avoid an unnecessary backend reboot.
        return
    self.set_config(StdDaqConfig(**merged), timeout=timeout)
def reset(self, min_wait: float = 5) -> None:
    """
    Reset the StdDAQ by re-applying its current configuration, which
    forces a backend restart.

    Args:
        min_wait (float): minimum wait time after the reset [s]
    """
    current_config = self.get_config()
    self.set_config(current_config)
    time.sleep(min_wait)
def wait_for_connection(self, timeout: float = 20) -> None:
    """
    Block until a websocket connection to the StdDAQ is established,
    retrying every 2 s while the server refuses connections.

    Args:
        timeout (float): maximum time to keep retrying [s]

    Raises:
        TimeoutError: if no connection could be made within ``timeout``
    """
    deadline = time.time() + timeout
    while True:
        # An already-open connection means there is nothing to do.
        if self.ws_client is not None and self.ws_client.state == State.OPEN:
            return
        try:
            self.ws_client = connect(self.ws_url)
            return
        except ConnectionRefusedError as exc:
            if time.time() > deadline:
                raise TimeoutError("Timeout while waiting for connection to StdDAQ") from exc
            time.sleep(2)
def create_virtual_datasets(self, file_path: str, file_prefix: str, timeout: float = 5) -> None:
    """
    Combine the stddaq written files in a given folder in an interleaved
    h5 virtual dataset.

    Args:
        file_path (str): path to the folder containing the files
        file_prefix (str): prefix of the files to combine
        timeout (float): timeout for the request
    """
    # TODO: Add wait for 'idle' state
    payload = {
        "base_path": file_path,
        "file_prefix": file_prefix,
        "output_file": file_prefix.rstrip("_") + ".h5",
    }
    response = requests.post(
        self.rest_url + "/api/h5/create_interleaved_vds",
        params={"user": "ioc"},
        json=payload,
        timeout=timeout,
        headers={"Content-type": "application/json"},
    )
    response.raise_for_status()
def connect(self):
    """
    Connect to the StdDAQ. This method should be called after the client
    is created; it spawns a daemon thread that exchanges data with the
    StdDAQ over the websocket.
    """
    worker = threading.Thread(
        target=self._ws_update_loop, name=f"{self.parent.name}_stddaq_ws_loop", daemon=True
    )
    self._ws_update_thread = worker
    worker.start()
def shutdown(self):
    """
    Shutdown the StdDAQ client: stop the background websocket loop and
    close the websocket connection.
    """
    # Bug fix: signal the update loop to exit BEFORE joining. Previously
    # _shutdown_event was never set here, so _ws_update_loop kept running
    # and the join() below blocked forever.
    self._shutdown_event.set()
    if self._ws_update_thread is not None:
        self._ws_update_thread.join()
    if self.ws_client is not None:
        self.ws_client.close()
        self.ws_client = None
def _wait_for_server_running(self):
"""
Wait for the StdDAQ to be running. If the StdDaq is not running, the
websocket loop will be set to idle.
"""
while not self._shutdown_event.is_set():
if self._daq_is_running.wait(0.1):
self._ws_idle_event.clear()
break
self._ws_idle_event.set()
def _ws_send_and_receive(self):
    """
    Perform one send/receive cycle on the StdDAQ websocket.

    Drains at most one queued outgoing command, then polls briefly for an
    incoming status message. On a websocket failure the connection is
    re-established instead of propagating the error to the update loop.
    """
    if not self.ws_client:
        self.wait_for_connection()
    try:
        try:
            # Non-blocking get: only send if a command is already queued.
            msg = self._send_queue.get(block=False)
            logger.trace(f"Sending to stddaq ws: {msg}")
            self.ws_client.send(json.dumps(msg))
            logger.trace(f"Sent to stddaq ws: {msg}")
        except queue.Empty:
            pass
        try:
            # Short recv timeout keeps the loop responsive to shutdown
            # and idle events.
            recv_msgs = self.ws_client.recv(timeout=0.1)
        except TimeoutError:
            # Nothing received within the poll window; try again next cycle.
            return
        logger.trace(f"Received from stddaq ws: {recv_msgs}")
        if recv_msgs is not None:
            self._on_received_ws_message(recv_msgs)
    except WebSocketException:
        content = traceback.format_exc()
        logger.warning(f"Websocket connection closed unexpectedly: {content}")
        # Reconnect so the next cycle can continue working.
        self.wait_for_connection()
def _ws_update_loop(self):
"""
Loop to update the status property of the StdDAQ.
"""
while not self._shutdown_event.is_set():
self._wait_for_server_running()
self._ws_send_and_receive()
def _on_received_ws_message(self, msg: str):
    """
    Decode a status message received from the StdDAQ and notify pending
    status callbacks. Malformed messages are logged and dropped.
    """
    try:
        response = StdDaqWsResponse(**json.loads(msg))
    except Exception:
        content = traceback.format_exc()
        logger.warning(f"Failed to decode websocket message: {content}")
        return
    self._status = response.status
    self._run_status_callbacks()
def _run_status_callbacks(self):
    """
    Resolve pending DeviceStatus objects against the current StdDAQ status.

    A status contained in a callback's success set finishes the
    DeviceStatus; one in its error set fails it with a StdDaqError.
    Resolved callbacks are removed from the registry.
    """
    status = self._status
    resolved = []
    for dev_status, success, error in self._status_callbacks.values():
        if status in success:
            dev_status.set_finished()
            logger.info(f"StdDaq status is {status}")
            resolved.append(dev_status)
        elif status in error:
            logger.warning(f"StdDaq status is {status}")
            dev_status.set_exception(StdDaqError(f"StdDaq status is {status}"))
            resolved.append(dev_status)
    # Drop resolved callbacks; the registry is keyed by id(DeviceStatus).
    for dev_status in resolved:
        self._status_callbacks.pop(id(dev_status))

View File

@@ -0,0 +1,108 @@
import json
import threading
import time
from typing import Callable
import numpy as np
import zmq
from bec_lib.logger import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
class StdDaqPreview:
    """Live preview of the StdDAQ image stream.

    Subscribes to the StdDAQ ZMQ PUB socket in a background thread and
    forwards each decoded frame to a user-supplied callback.
    """

    USER_ACCESS = ["start", "stop"]

    def __init__(self, url: str, cb: Callable):
        """
        Args:
            url (str): ZMQ address of the StdDAQ live stream (e.g. tcp://host:port)
            cb (Callable): callback invoked with each decoded image (numpy array)
        """
        self.url = url
        self._socket = None
        self._shutdown_event = threading.Event()
        self._zmq_thread = None
        self._on_update_callback = cb

    def connect(self):
        """Connect to the StdDAQ's PUB-SUB streaming interface.

        StdDAQ may reject the connection for a few seconds when it restarts,
        so if it fails, wait a bit and try to connect again.
        """
        # pylint: disable=no-member
        context = zmq.Context()
        self._socket = context.socket(zmq.SUB)
        self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
        try:
            self._socket.connect(self.url)
        except ConnectionRefusedError:
            time.sleep(1)
            self._socket.connect(self.url)

    def start(self):
        """Start the background thread that receives and dispatches frames."""
        # Bug fix: re-arm the shutdown event so that start() works again
        # after a previous stop(); otherwise the loop would exit instantly.
        self._shutdown_event.clear()
        self._zmq_thread = threading.Thread(
            target=self._zmq_update_loop, daemon=True, name="StdDaq_live_preview"
        )
        self._zmq_thread.start()

    def stop(self):
        """Stop the background thread and wait for it to exit."""
        self._shutdown_event.set()
        if self._zmq_thread:
            self._zmq_thread.join()

    def _zmq_update_loop(self):
        # Receive loop: connect lazily, then keep polling until shutdown.
        while not self._shutdown_event.is_set():
            if self._socket is None:
                self.connect()
            try:
                self._poll()
            except ValueError:
                # Happens when ZMQ partially delivers the multipart message
                pass
            except zmq.error.Again:
                # Happens when receive queue is empty
                time.sleep(0.1)

    def _poll(self):
        """
        Poll the ZMQ socket for new data. It will throttle the data update and
        only subscribe to the topic for a single update. This is not very nice
        but it seems like there is currently no option to set the update rate on
        the backend.
        """
        if self._shutdown_event.wait(0.2):
            return
        try:
            # subscribe to the topic
            self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
            # pylint: disable=no-member
            r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
            self._parse_data(r)
        finally:
            # Unsubscribe from the topic
            self._socket.setsockopt(zmq.UNSUBSCRIBE, ZMQ_TOPIC_FILTER)

    def _parse_data(self, data):
        # Sanity check: the Array V1 reply must be [metadata, image_bytes].
        if len(data) != 2:
            logger.warning(f"Received malformed array of length {len(data)}")
            # Bug fix: previously execution fell through to the unpacking
            # below and raised an unrelated ValueError; bail out instead.
            return
        # Unpack the Array V1 reply to metadata and array data
        meta, img_data = data
        # Update image and update subscribers
        header = json.loads(meta)
        if header["type"] == "uint16":
            image = np.frombuffer(img_data, dtype=np.uint16)
        else:
            raise ValueError(f"Unexpected type {header['type']}")
        if image.size != np.prod(header["shape"]):
            err = f"Unexpected array size of {image.size} for header: {header}"
            raise ValueError(err)
        image = image.reshape(header["shape"])
        logger.info(f"Live update: frame {header['frame']}")
        self._on_update_callback(image)

View File

@@ -1,2 +1,2 @@
from .tutorial_fly_scan import AcquireDark, AcquireWhite, AcquireRefs, TutorialFlyScanContLine
from .tomcat_scans import TomcatStepScan, TomcatSnapNStep, TomcatSimpleSequence
from .tutorial_fly_scan import AcquireDark, AcquireWhite, AcquireRefs, AcquireProjections, TutorialFlyScanContLine
from .tomcat_scans import TomcatSnapNStep, TomcatSimpleSequence

View File

@@ -1,15 +1,19 @@
import time
import numpy as np
from bec_lib import bec_logger
from bec_lib.device import DeviceBase
from bec_server.scan_server.scans import Acquire, AsyncFlyScanBase
logger = bec_logger.logger
class AcquireDark(Acquire):
scan_name = "acquire_dark"
required_kwargs = ["exp_burst"]
gui_config = {"Acquisition parameters": ["exp_burst"]}
def __init__(self, exp_burst: int, **kwargs):
def __init__(self, exp_burst: int, exp_time: float = 0, **kwargs):
"""
Acquire dark images. This scan is used to acquire dark images. Dark images are images taken with the shutter
closed and no beam on the sample. Dark images are used to correct the data images for dark current.
@@ -30,6 +34,10 @@ class AcquireDark(Acquire):
Predefined acquisition mode (default= 'default')
file_path : str, optional
File path for standard daq
ddc_trigger : int, optional
Drive Data Capture Trigger
ddc_source0 : int, optional
Drive Data capture Input0
Returns:
ScanReport
@@ -38,25 +46,33 @@ class AcquireDark(Acquire):
>>> scans.acquire_dark(5)
"""
super().__init__(**kwargs)
self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger
self.exp_time = exp_time / 1000 # In BEC, the exp time is always in s, not ms.
super().__init__(self.exp_time, **kwargs)
self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger
self.scan_motors = ["eyex"] # change to the correct shutter device
#self.shutter = "eyex" # change to the correct shutter device
self.dark_shutter_pos = 0 ### change with a variable
# self.shutter = "eyex" # change to the correct shutter device
self.dark_shutter_pos = 0 ### change with a variable
def scan_core(self):
# close the shutter
yield from self._move_scan_motors_and_wait(self.dark_shutter_pos)
#yield from self.stubs.set_and_wait(device=[self.shutter], positions=[0])
# yield from self.stubs.set_and_wait(device=[self.shutter], positions=[0])
yield from super().scan_core()
class AcquireWhite(Acquire):
scan_name = "acquire_white"
required_kwargs = ["exp_burst", "sample_position_out", "sample_angle_out"]
gui_config = {"Acquisition parameters": ["exp_burst"]}
def __init__(self, exp_burst: int, sample_position_out: float, sample_angle_out: float, **kwargs):
def __init__(
self,
exp_burst: int,
sample_position_out: float,
sample_angle_out: float,
motor: DeviceBase,
exp_time: float = 0,
**kwargs,
):
"""
Acquire flat field images. This scan is used to acquire flat field images. The flat field image is an image taken
with the shutter open but the sample out of the beam. Flat field images are used to correct the data images for
@@ -69,6 +85,8 @@ class AcquireWhite(Acquire):
Position to move the sample stage to position the sample out of beam and take flat field images
sample_angle_out : float
Angular position where to take the flat field images
motor : DeviceBase
Motor to be moved to move the sample out of beam
exp_time : float, optional
Exposure time [ms]. If not specified, the currently configured value on the camera will be used
exp_period : float, optional
@@ -81,6 +99,10 @@ class AcquireWhite(Acquire):
Predefined acquisition mode (default= 'default')
file_path : str, optional
File path for standard daq
ddc_trigger : int, optional
Drive Data Capture Trigger
ddc_source0 : int, optional
Drive Data capture Input0
Returns:
ScanReport
@@ -89,48 +111,63 @@ class AcquireWhite(Acquire):
>>> scans.acquire_white(5, 20)
"""
super().__init__(**kwargs)
self.exp_time = exp_time / 1000 # In BEC, the exp time is always in s, not ms.
super().__init__(exp_time=exp_time, **kwargs)
self.burst_at_each_point = 1
self.sample_position_out = sample_position_out
self.sample_angle_out = sample_angle_out
self.motor_sample = motor
self.scan_motors = ["eyex", "eyez", "es1_roty"] # change to the correct shutter device
self.dark_shutter_pos_out = 1 ### change with a variable
self.dark_shutter_pos_in = 0 ### change with a variable
self.dark_shutter_pos_out = 1 ### change with a variable
self.dark_shutter_pos_in = 0 ### change with a variable
def scan_core(self):
# open the shutter and move the sample stage to the out position
self.scan_motors = ["eyez", "es1_roty"] # change to the correct shutter device
yield from self._move_scan_motors_and_wait([self.sample_position_out, self.sample_angle_out])
self.scan_motors = ["eyex"] # change to the correct shutter device
yield from self._move_scan_motors_and_wait([self.dark_shutter_pos_out])
# move the sample stage to the out position and correct angular position
status_sample_out_angle = yield from self.stubs.set(
device=[self.motor_sample, "es1_roty"],
value=[self.sample_position_out, self.sample_angle_out],
wait=False,
)
# open the main shutter (TODO change to the correct shutter device)
yield from self.stubs.set(device=["eyex"], value=[self.dark_shutter_pos_out])
status_sample_out_angle.wait()
# TODO add opening of fast shutter
yield from super().scan_core()
# TODO add closing of fast shutter
yield from self._move_scan_motors_and_wait([self.dark_shutter_pos_in])
yield from self.stubs.set(device=["eyex"], value=[self.dark_shutter_pos_in])
class AcquireProjectins(Acquire):
class AcquireProjections(AsyncFlyScanBase):
scan_name = "acquire_projections"
required_kwargs = ["exp_burst", "sample_position_in", "start_position", "angular_range"]
gui_config = {"Acquisition parameters": ["exp_burst"]}
gui_config = {
"Motor": ["motor"],
"Acquisition parameters": ["sample_position_in", "start_angle", "angular_range"],
"Camera": ["exp_time", "exp_burst"],
}
def __init__(self,
exp_burst: int,
sample_position_in: float,
start_position: float,
angular_range: float,
**kwargs):
def __init__(
self,
motor: DeviceBase,
exp_burst: int,
sample_position_in: float,
start_angle: float,
angular_range: float,
exp_time: float,
**kwargs,
):
"""
Acquire projection images.
Acquire projection images.
Args:
motor : DeviceBase
Motor to move continuously from start to stop position
exp_burst : int
Number of flat field images to acquire (no default)
sample_position_in : float
Position to move the sample stage to position the sample in the beam
start_position : float
start_angle : float
Angular start position for the scan
angular_range : float
Angular range
@@ -146,87 +183,117 @@ class AcquireProjectins(Acquire):
Predefined acquisition mode (default= 'default')
file_path : str, optional
File path for standard daq
ddc_trigger : int, optional
Drive Data Capture Trigger
ddc_source0 : int, optional
Drive Data capture Input0
Returns:
ScanReport
Examples:
>>> scans.acquire_white(5, 20)
>>> scans.acquire_projections()
"""
super().__init__(**kwargs)
self.motor = motor
super().__init__(exp_time=exp_time, **kwargs)
self.burst_at_each_point = 1
self.sample_position_in = sample_position_in
self.start_position = start_position
self.start_angle = start_angle
self.angular_range = angular_range
self.scan_motors = ["eyex", "eyez", "es1_roty"] # change to the correct shutter device
self.dark_shutter_pos_out = 1 ### change with a variable
self.dark_shutter_pos_in = 0 ### change with a variable
self.dark_shutter_pos_out = 1 ### change with a variable
self.dark_shutter_pos_in = 0 ### change with a variable
def update_scan_motors(self):
return [self.motor]
def prepare_positions(self):
self.positions = np.array([[self.start_angle], [self.start_angle + self.angular_range]])
self.num_pos = None
yield from self._set_position_offset()
def scan_core(self):
# open the shutter and move the sample stage to the out position
self.scan_motors = ["eyez", "es1_roty"] # change to the correct shutter device
yield from self._move_scan_motors_and_wait([self.sample_position_out, self.sample_angle_out])
self.scan_motors = ["eyex"] # change to the correct shutter device
yield from self._move_scan_motors_and_wait([self.dark_shutter_pos_out])
# move to in position and go to start angular position
yield from self.stubs.set(
device=["eyez", self.motor], value=[self.sample_position_in, self.positions[0][0]]
)
# open the shutter
yield from self.stubs.set(device="eyex", value=self.dark_shutter_pos_out)
# TODO add opening of fast shutter
yield from super().scan_core()
# TODO add closing of fast shutter
yield from self._move_scan_motors_and_wait([self.dark_shutter_pos_in])
# start the flyer
flyer_request = yield from self.stubs.set(
device=self.motor, value=self.positions[1][0], wait=False
)
self.connector.send_client_info(
"Starting the scan", show_asap=True, rid=self.metadata.get("RID")
)
yield from self.stubs.trigger()
while not flyer_request.done:
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(1)
# increase the point id
self.point_id += 1
self.num_pos = self.point_id
class AcquireRefs(Acquire):
scan_name = "acquire_refs"
required_kwargs = []
gui_config = {}
def __init__(
self,
motor: DeviceBase,
num_darks: int = 0,
num_flats: int = 0,
sample_angle_out: float = 0,
sample_position_in: float = 0,
sample_position_out: float = 5000,
file_prefix_dark: str = 'tmp_dark',
file_prefix_white: str = 'tmp_white',
exp_time: float = 0,
exp_period: float = 0,
image_width: int = 2016,
image_height: int = 2016,
acq_mode: str = 'default',
file_path: str = 'tmp',
nr_writers: int = 2,
base_path: str = 'tmp',
**kwargs
sample_position_out: float = 1,
file_prefix_dark: str = "tmp_dark",
file_prefix_white: str = "tmp_white",
**kwargs,
):
"""
Acquire reference images (darks + whites) and return to beam position.
Reference images are acquired automatically in an optimized sequence and
the sample is returned to the sample_in_position afterwards.
Args:
motor : DeviceBase
Motor to be moved to move the sample out of beam
num_darks : int , optional
Number of dark field images to acquire
num_flats : int , optional
Number of white field images to acquire
sample_angle_out : float , optional
Angular position where to take the flat field images
sample_position_in : float , optional
Sample stage X position for sample in beam [um]
sample_position_out : float ,optional
Sample stage X position for sample out of the beam [um]
sample_angle_out : float , optional
Angular position where to take the flat field images
exp_time : float, optional
Exposure time [ms]. If not specified, the currently configured value on the camera will be used
Exposure time [ms]. If not specified, the currently configured value
on the camera will be used
exp_period : float, optional
Exposure period [ms]. If not specified, the currently configured value on the camera will be used
Exposure period [ms]. If not specified, the currently configured value
on the camera will be used
image_width : int, optional
ROI size in the x-direction [pixels]. If not specified, the currently configured value on the camera will be used
ROI size in the x-direction [pixels]. If not specified, the currently
configured value on the camera will be used
image_height : int, optional
ROI size in the y-direction [pixels]. If not specified, the currently configured value on the camera will be used
ROI size in the y-direction [pixels]. If not specified, the currently
configured value on the camera will be used
acq_mode : str, optional
Predefined acquisition mode (default= 'default')
file_path : str, optional
@@ -239,6 +306,7 @@ class AcquireRefs(Acquire):
>>> scans.acquire_refs(sample_angle_out=90, sample_position_in=10, num_darks=5, num_flats=5, exp_time=0.1)
"""
self.motor = motor
super().__init__(**kwargs)
self.sample_position_in = sample_position_in
self.sample_position_out = sample_position_out
@@ -247,50 +315,77 @@ class AcquireRefs(Acquire):
self.num_flats = num_flats
self.file_prefix_dark = file_prefix_dark
self.file_prefix_white = file_prefix_white
self.exp_time = exp_time
self.exp_period = exp_period
self.image_width = image_width
self.image_height = image_height
self.acq_mode = acq_mode
self.file_path = file_path
self.nr_writers = nr_writers
self.base_path = base_path
def scan_core(self):
## TODO move sample in position and do not wait
## TODO move angle in position and do not wait
status_sample_out_angle = yield from self.stubs.set(
device=[self.motor, "es1_roty"],
value=[self.sample_position_out, self.sample_angle_out],
wait=False,
)
cameras = [
cam.name
for cam in self.device_manager.devices.get_devices_with_tags("camera")
if cam.enabled
]
if self.num_darks:
self.connector.send_client_info(
f"Acquiring {self.num_darks} dark images",
show_asap=True,
rid=self.metadata.get("RID"),
)
# to set signals on a device
darks = AcquireDark(
exp_burst=self.num_darks,
file_prefix=self.file_prefix_dark,
device_manager=self.device_manager,
metadata=self.metadata
metadata=self.metadata,
instruction_handler=self.stubs._instruction_handler,
**self.caller_kwargs,
)
yield from darks.scan_core()
# reconfigure the cameras to write to a different file
for cam in cameras:
yield from self.stubs.send_rpc_and_wait(cam, "configure", **darks.scan_parameters)
yield from darks.pre_scan() # prepare for the upcoming scan
yield from darks.scan_core() # do the scan
yield from darks.finalize() # wait for everything to finish
self.point_id = darks.point_id
status_sample_out_angle.wait()
if self.num_flats:
self.connector.send_client_info(
f"Acquiring {self.num_flats} flat field images",
show_asap=True,
rid=self.metadata.get("RID"),
)
flats = AcquireWhite(
exp_burst=self.num_flats,
sample_position_out=self.sample_position_out,
sample_angle_out=self.sample_angle_out,
file_prefix=self.file_prefix_white,
# sample_position_out=self.sample_position_out,
# sample_angle_out=self.sample_angle_out,
# motor=self.motor,
device_manager=self.device_manager,
metadata=self.metadata,
instruction_handler=self.stubs._instruction_handler,
**self.caller_kwargs,
)
flats.point_id = self.point_id
yield from flats.scan_core()
# reconfigure the cameras to write to a different file
for cam in cameras:
yield from self.stubs.send_rpc_and_wait(cam, "configure", **flats.scan_parameters)
yield from flats.pre_scan() # prepare for the upcoming scan
yield from flats.scan_core() # do the scan
yield from flats.finalize() # wait for everything to finish
self.point_id = flats.point_id
## TODO move sample in beam and do not wait
## TODO move rotation to angle and do not wait