Feature/revisiting aerotech #26

Merged
mohacsi_i merged 15 commits from feature/revisiting-aerotech into development 2025-04-16 15:58:30 +02:00
13 changed files with 1123 additions and 488 deletions

View File

@@ -1,248 +1,281 @@
import json
from unittest import mock
# import json
# from unittest import mock
import pytest
import requests
import requests_mock
import typeguard
from ophyd import StatusBase
from websockets import WebSocketException
# import pytest
# import requests
# import requests_mock
# import typeguard
# from ophyd import StatusBase
# from websockets import WebSocketException
from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqError, StdDaqStatus
# from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqError, StdDaqStatus
@pytest.fixture
def client():
parent_device = mock.MagicMock()
_client = StdDaqClient(
parent=parent_device, ws_url="ws://localhost:5001", rest_url="http://localhost:5000"
)
yield _client
_client.shutdown()
# @pytest.fixture
# def client():
# parent_device = mock.MagicMock()
# _client = StdDaqClient(
# parent=parent_device, ws_url="ws://localhost:5001", rest_url="http://localhost:5000"
# )
# yield _client
# _client.shutdown()
@pytest.fixture
def full_config():
full_config = dict(
detector_name="tomcat-gf",
detector_type="gigafrost",
n_modules=8,
bit_depth=16,
image_pixel_height=2016,
image_pixel_width=2016,
start_udp_port=2000,
writer_user_id=18600,
max_number_of_forwarders_spawned=8,
use_all_forwarders=True,
module_sync_queue_size=4096,
number_of_writers=12,
module_positions={},
ram_buffer_gb=150,
delay_filter_timeout=10,
live_stream_configs={
"tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
"tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
"tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
},
)
return full_config
# @pytest.fixture
# def full_config():
# full_config = dict(
# detector_name="tomcat-gf",
# detector_type="gigafrost",
# n_modules=8,
# bit_depth=16,
# image_pixel_height=2016,
# image_pixel_width=2016,
# start_udp_port=2000,
# writer_user_id=18600,
# max_number_of_forwarders_spawned=8,
# use_all_forwarders=True,
# module_sync_queue_size=4096,
# number_of_writers=12,
# module_positions={},
# ram_buffer_gb=150,
# delay_filter_timeout=10,
# live_stream_configs={
# "tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
# "tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
# "tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
# },
# )
# return full_config
def test_stddaq_client(client):
assert client is not None
# def test_stddaq_client(client):
# assert client is not None
def test_stddaq_client_get_daq_config(client, full_config):
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
out = client.get_config()
# def test_stddaq_client_get_daq_config(client, full_config):
# with requests_mock.Mocker() as m:
# response = full_config
# m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
# out = client.get_config()
# Check that the response is simply the json response
assert out == response.model_dump()
# # Check that the response is simply the json response
# assert out == response.model_dump()
assert client._config == response
# assert client._config == response
def test_stddaq_client_set_config_pydantic(client, full_config):
"""Test setting configurations through the StdDAQ client"""
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
m.post("http://localhost:5000/api/config/set?user=ioc")
# def test_stddaq_client_set_config_pydantic(client, full_config):
# """Test setting configurations through the StdDAQ client"""
# with requests_mock.Mocker() as m:
# m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with StdDaqConfig object
config = full_config
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config)
# # Test with StdDaqConfig object
# config = full_config
# with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
# client.set_config(config)
# Verify the last request
assert m.last_request.json() == full_config.model_dump()
# # Verify the last request
# assert m.last_request.json() == full_config.model_dump()
def test_std_daq_client_set_config_dict(client, full_config):
"""
Test setting configurations through the StdDAQ client with a dictionary input.
"""
# def test_std_daq_client_set_config_dict(client, full_config):
# """
# Test setting configurations through the StdDAQ client with a dictionary input.
# """
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
m.post("http://localhost:5000/api/config/set?user=ioc")
# with requests_mock.Mocker() as m:
# m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input
config_dict = full_config.model_dump()
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config_dict)
assert m.last_request.json() == full_config.model_dump()
# # Test with dictionary input
# config_dict = full_config.model_dump()
# with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
# client.set_config(config_dict)
# assert m.last_request.json() == full_config.model_dump()
def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
"""
Test that the set_config method ignores extra keys in the input dictionary.
"""
# def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
# """
# Test that the set_config method ignores extra keys in the input dictionary.
# """
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
m.post("http://localhost:5000/api/config/set?user=ioc")
# with requests_mock.Mocker() as m:
# m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input
config_dict = full_config.model_dump()
config_dict["extra_key"] = "extra_value"
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config_dict)
assert m.last_request.json() == full_config.model_dump()
# # Test with dictionary input
# config_dict = full_config.model_dump()
# config_dict["extra_key"] = "extra_value"
# with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
# client.set_config(config_dict)
# assert m.last_request.json() == full_config.model_dump()
def test_stddaq_client_set_config_error(client, full_config):
"""
Test error handling in the set_config method.
"""
with requests_mock.Mocker() as m:
config = full_config
m.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
with pytest.raises(requests.exceptions.HTTPError):
client.set_config(config)
# def test_stddaq_client_set_config_error(client, full_config):
# """
# Test error handling in the set_config method.
# """
# with requests_mock.Mocker() as m:
# config = full_config
# m.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)
# with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
# with pytest.raises(requests.exceptions.HTTPError):
# client.set_config(config)
def test_stddaq_client_get_config_cached(client, full_config):
"""
Test that the client returns the cached configuration if it is available.
"""
# def test_stddaq_client_get_config_cached(client, full_config):
# """
# Test that the client returns the cached configuration if it is available.
# """
# Set the cached configuration
config = full_config
client._config = config
# # Set the cached configuration
# config = full_config
# client._config = config
# Test that the client returns the cached configuration
assert client.get_config(cached=True) == config
# # Test that the client returns the cached configuration
# assert client.get_config(cached=True) == config
def test_stddaq_client_status(client):
client._status = StdDaqStatus.FILE_CREATED
assert client.status == StdDaqStatus.FILE_CREATED
# def test_stddaq_client_status(client):
# client._status = StdDaqStatus.FILE_CREATED
# assert client.status == StdDaqStatus.FILE_CREATED
def test_stddaq_client_start_type_error(client):
with pytest.raises(typeguard.TypeCheckError):
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10")
# def test_stddaq_client_start(client):
# with mock.patch("tomcat_bec.devices.gigafrost.std_daq_client.StatusBase") as StatusBase:
# client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)
# out = client._send_queue.get()
# assert out == {
# "command": "start",
# "path": "test_file_path",
# "file_prefix": "test_file_prefix",
# "n_image": 10,
# }
# StatusBase().wait.assert_called_once()
def test_stddaq_client_update_config(client, full_config):
"""
Test that the update_config method updates the configuration with the provided dictionary.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {"detector_name": "new_name"}
with mock.patch.object(client, "set_config") as set_config:
client.set_config(update_dict, update=True)
assert set_config.call_count == 1
# def test_stddaq_client_start_type_error(client):
# with pytest.raises(typeguard.TypeCheckError):
# client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10")
def test_stddaq_client_updates_only_changed_configs(client, full_config):
"""
Test that the update_config method only updates the configuration if the config has changed.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {"detector_name": "tomcat-gf"}
with mock.patch.object(client, "set_config") as set_config:
client.set_config(update_dict, update=True)
assert set_config.call_count == 0
# def test_stddaq_client_stop(client):
# """
# Check that the stop method puts the stop command in the send queue.
# """
# client.stop()
# client._send_queue.get() == {"command": "stop"}
def test_stddaq_client_pre_restart(client):
"""
Test that the pre_restart method sets the status to RESTARTING.
"""
# let's assume the websocket loop is already idle
client._ws_idle_event.set()
client.ws_client = mock.MagicMock()
client._pre_restart()
client.ws_client.close.assert_called_once()
# def test_stddaq_client_update_config(client, full_config):
# """
# Test that the update_config method updates the configuration with the provided dictionary.
# """
# config = full_config
# with requests_mock.Mocker() as m:
# m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# # Update the configuration
# update_dict = {"detector_name": "new_name"}
# with mock.patch.object(client, "set_config") as set_config:
# client.update_config(update_dict)
# assert set_config.call_count == 1
def test_stddaq_client_post_restart(client):
"""
Test that the post_restart method sets the status to IDLE.
"""
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._post_restart()
wait_for_connection.assert_called_once()
assert client._daq_is_running.is_set()
# def test_stddaq_client_updates_only_changed_configs(client, full_config):
# """
# Test that the update_config method only updates the configuration if the config has changed.
# """
# config = full_config
# with requests_mock.Mocker() as m:
# m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# # Update the configuration
# update_dict = {"detector_name": "tomcat-gf"}
# with mock.patch.object(client, "set_config") as set_config:
# client.update_config(update_dict)
# assert set_config.call_count == 0
def test_stddaq_client_reset(client):
"""
Test that the reset method calls get_config and set_config.
"""
with (
mock.patch.object(client, "get_config") as get_config,
mock.patch.object(client, "set_config") as set_config,
):
client.reset()
get_config.assert_called_once()
set_config.assert_called_once()
# def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
# """
# Test that the update_config method only updates the configuration if the config has changed.
# """
# config = full_config
# with requests_mock.Mocker() as m:
# m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# # Update the configuration
# update_dict = {}
# with mock.patch.object(client, "set_config") as set_config:
# client.update_config(update_dict)
# assert set_config.call_count == 0
def test_stddaq_client_run_status_callbacks(client):
"""
Test that the run_status_callback method runs the status callback.
"""
status = StatusBase()
client.add_status_callback(status, success=[StdDaqStatus.FILE_CREATED], error=[])
client._status = StdDaqStatus.FILE_CREATED
client._run_status_callbacks()
status.wait()
assert len(status._callbacks) == 0
# def test_stddaq_client_pre_restart(client):
# """
# Test that the pre_restart method sets the status to RESTARTING.
# """
# # let's assume the websocket loop is already idle
# client._ws_idle_event.set()
# client.ws_client = mock.MagicMock()
# client._pre_restart()
# client.ws_client.close.assert_called_once()
def test_stddaq_client_run_status_callbacks_error(client):
"""
Test that the run_status_callback method runs the status callback.
"""
status = StatusBase()
client.add_status_callback(status, success=[], error=[StdDaqStatus.FILE_CREATED])
client._status = StdDaqStatus.FILE_CREATED
client._run_status_callbacks()
with pytest.raises(StdDaqError):
status.wait()
# def test_stddaq_client_post_restart(client):
# """
# Test that the post_restart method sets the status to IDLE.
# """
# with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
# client._post_restart()
# wait_for_connection.assert_called_once()
# assert client._daq_is_running.is_set()
assert len(status._callbacks) == 0
# def test_stddaq_client_reset(client):
# """
# Test that the reset method calls get_config and set_config.
# """
# with (
# mock.patch.object(client, "get_config") as get_config,
# mock.patch.object(client, "set_config") as set_config,
# ):
# client.reset()
# get_config.assert_called_once()
# set_config.assert_called_once()
# def test_stddaq_client_run_status_callbacks(client):
# """
# Test that the run_status_callback method runs the status callback.
# """
# status = StatusBase()
# client.add_status_callback(status, success=[StdDaqStatus.FILE_CREATED], error=[])
# client._status = StdDaqStatus.FILE_CREATED
# client._run_status_callbacks()
# status.wait()
# assert len(status._callbacks) == 0
# def test_stddaq_client_run_status_callbacks_error(client):
# """
# Test that the run_status_callback method runs the status callback.
# """
# status = StatusBase()
# client.add_status_callback(status, success=[], error=[StdDaqStatus.FILE_CREATED])
# client._status = StdDaqStatus.FILE_CREATED
# client._run_status_callbacks()
# with pytest.raises(StdDaqError):
# status.wait()
# assert len(status._callbacks) == 0
# @pytest.mark.parametrize(
@@ -262,3 +295,59 @@ def test_stddaq_client_run_status_callbacks_error(client):
# else:
# run_status_callbacks.assert_not_called()
# assert client._status is None
# def test_stddaq_client_ws_send_and_receive(client):
# client.ws_client = mock.MagicMock()
# client._send_queue.put({"command": "test"})
# client._ws_send_and_receive()
# # queue is not empty, so we should send the message
# client.ws_client.send.assert_called_once()
# client.ws_client.recv.assert_called_once()
# client.ws_client.reset_mock()
# client._ws_send_and_receive()
# # queue is empty, so we should not send the message
# client.ws_client.send.assert_not_called()
# client.ws_client.recv.assert_called_once()
# def test_stddaq_client_ws_send_and_receive_websocket_error(client):
# """
# Test that the ws_send_and_receive method handles websocket errors.
# """
# client.ws_client = mock.MagicMock()
# client.ws_client.send.side_effect = WebSocketException()
# client._send_queue.put({"command": "test"})
# with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
# client._ws_send_and_receive()
# wait_for_connection.assert_called_once()
# def test_stddaq_client_ws_send_and_receive_timeout_error(client):
# """
# Test that the ws_send_and_receive method handles timeout errors.
# """
# client.ws_client = mock.MagicMock()
# client.ws_client.recv.side_effect = TimeoutError()
# client._send_queue.put({"command": "test"})
# with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
# client._ws_send_and_receive()
# wait_for_connection.assert_not_called()
# def test_stddaq_client_ws_update_loop(client):
# """
# Test that the ws_update_loop method runs the status callback.
# """
# client._shutdown_event = mock.MagicMock()
# client._shutdown_event.is_set.side_effect = [False, True]
# with (
# mock.patch.object(client, "_ws_send_and_receive") as ws_send_and_receive,
# mock.patch.object(client, "_wait_for_server_running") as wait_for_server_running,
# ):
# client._ws_update_loop()
# ws_send_and_receive.assert_called_once()
# wait_for_server_running.assert_called_once()

View File

@@ -51,6 +51,19 @@ es1_roty:
readOnly: false
softwareTrigger: false
es1_trx:
readoutPriority: monitored
description: 'Test translation stage'
deviceClass: ophyd.EpicsMotor
deviceConfig:
prefix: X02DA-ES1-SMP1:TRX
deviceTags:
- es1-sam
onFailure: buffer
enabled: true
readOnly: false
softwareTrigger: false
es1_ismc:
description: 'Automation1 iSMC interface'
deviceClass: tomcat_bec.devices.aa1Controller

View File

@@ -1,7 +1,6 @@
from .aerotech import (
aa1AxisDriveDataCollection,
aa1AxisPsoDistance,
aa1Controller,
aa1GlobalVariableBindings,
aa1GlobalVariables,
aa1Tasks,

View File

@@ -63,6 +63,7 @@ class aa1AxisDriveDataCollection(PSIDeviceBase, Device):
USER_ACCESS = ["configure", "reset", "arm", "disarm"]
# pylint: disable=duplicate-code, too-many-arguments
def __init__(
self,
prefix="",
@@ -157,6 +158,7 @@ class aa1AxisDriveDataCollection(PSIDeviceBase, Device):
def arm(self) -> None:
"""Bluesky-style stage"""
self._switch.set("ResetRB", settle_time=0.1).wait()
self._switch.set("Start", settle_time=0.2).wait()
def disarm(self):

View File

@@ -7,78 +7,17 @@ synchronized output (PSO) interface.
"""
from time import sleep
import numpy as np
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib import bec_logger
logger = bec_logger.logger
class AerotechPsoDistanceMixin(CustomDeviceMixin):
"""Mixin class for self-configuration and staging
"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs["pso_wavemode"]
if "pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
if "pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
if "pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
# Perform bluesky-style configuration
if len(d) > 0:
logger.info(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Stage the PSO distance module
self.parent.bluestage()
def on_unstage(self):
"""Standard bluesky unstage"""
# Ensure output is set to low
# if self.parent.output.value:
# self.parent.toggle()
# Turn off window mode
self.parent.winOutput.set("Off").wait()
self.parent.winEvents.set("Off").wait()
# Turn off distance mode
self.parent.dstEventsEna.set("Off").wait()
self.parent.dstCounterEna.set("Off").wait()
# Disable output
self.parent.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)
def on_trigger(self) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
logger.warning(f"[{self.parent.name}] Triggerin...")
if self.parent.dstDistanceVal.get() == 0:
status = self.parent._eventSingle.set(1, settle_time=0.1)
return status
class aa1AxisPsoBase(PSIDeviceBase):
class AerotechPsoBase(PSIDeviceBase, Device):
"""Position Sensitive Output - Base class
This class provides convenience wrappers around the Aerotech IOC's PSO
@@ -152,8 +91,35 @@ class aa1AxisPsoBase(PSIDeviceBase):
outPin = Component(EpicsSignalRO, "PIN", auto_monitor=True, kind=Kind.config)
outSource = Component(EpicsSignal, "SOURCE", put_complete=True, kind=Kind.config)
def trigger(self, settle_time=0.1) -> DeviceStatus:
# pylint: disable=duplicate-code, too-many-arguments
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# Need to call super() to call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def fire(self, settle_time=0.1) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
logger.warning(f"[{self.name}] Triggerin...")
self._eventSingle.set(1, settle_time=settle_time).wait()
status = DeviceStatus(self)
status.set_finished()
@@ -163,7 +129,7 @@ class aa1AxisPsoBase(PSIDeviceBase):
"""Toggle waveform outup"""
orig_wave_mode = self.waveMode.get()
self.waveMode.set("Toggle").wait()
self.trigger(0.1)
self.fire(0.1)
self.waveMode.set(orig_wave_mode).wait()
def configure(self, d: dict):
@@ -203,7 +169,7 @@ class aa1AxisPsoBase(PSIDeviceBase):
self.outSource.set("Window").wait()
class aa1AxisPsoDistance(aa1AxisPsoBase):
class aa1AxisPsoDistance(AerotechPsoBase):
"""Position Sensitive Output - Distance mode
This class provides convenience wrappers around the Aerotech API's PSO functionality in
@@ -232,13 +198,12 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
```
"""
custom_prepare_cls = AerotechPsoDistanceMixin
USER_ACCESS = ["configure", "prepare", "toggle"]
USER_ACCESS = ["configure", "fire", "toggle", "arm", "disarm"]
_distance_value = None
# ########################################################################
# PSO high level interface
def configure(self, d: dict = {}) -> tuple:
def configure(self, d: dict = None) -> tuple:
"""Simplified configuration interface to access the most common
functionality for distance mode PSO.
@@ -282,7 +247,49 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
logger.info(f"[{self.name}] PSO configured to {pso_wavemode} mode")
return (old, new)
def bluestage(self) -> None:
def on_stage(self) -> None:
"""Configuration and staging
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs["pso_wavemode"]
if "pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
if "pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
if "pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
# Perform bluesky-style configuration
if d:
# logger.info(f"[{self.name}] Configuring with:\n{d}")
self.configure(d=d)
# Stage the PSO distance module
self.arm()
def on_unstage(self):
"""Turn off the PSO module"""
self.disarm()
def on_trigger(self, settle_time=0.1) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
# if self.dstDistanceVal.get() == 0:
logger.warning(f"[{self.name}] Triggerin...")
return self.fire(settle_time)
def arm(self) -> None:
"""Bluesky style stage"""
# Stage the PSO distance module and zero counter
if isinstance(self._distance_value, (np.ndarray, list, tuple)):
@@ -293,3 +300,19 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
if self.dstDistanceVal.get() > 0:
self.dstEventsEna.set("On").wait()
self.dstCounterEna.set("On").wait()
def disarm(self):
"""Standard bluesky unstage"""
# Ensure output is set to low
# if self.output.value:
# self.toggle()
# Turn off window mode
self.winOutput.set("Off").wait()
self.winEvents.set("Off").wait()
# Turn off distance mode
self.dstEventsEna.set("Off").wait()
self.dstCounterEna.set("Off").wait()
# Disable output
self.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)

View File

@@ -34,7 +34,8 @@ program
//////////////////////////////////////////////////////////////////////////
// Internal parameters - dont use
var $axis as axis = ROTY
var $ii as integer
var $ii as integer
var $axisFaults as integer = 0
var $iDdcSafeSpace as integer = 4096
// Set acceleration
@@ -126,7 +127,12 @@ program
if $eScanType == ScanType.POS || $eScanType == ScanType.NEG
PsoDistanceConfigureArrayDistances($axis, $iPsoArrayPosAddr, $iPsoArrayPosSize, 0)
MoveAbsolute($axis, $fPosEnd, $fVelScan)
WaitForInPosition($axis)
WaitForMotionDone($axis)
$axisFaults = StatusGetAxisItem($axis, AxisDataSignal.AxisFault)
if $axisFaults
TaskSetError(TaskGetIndex(), "AxisFault on axis ROTY")
end
elseif $eScanType == ScanType.POSNEG || $eScanType == ScanType.NEGPOS
for $ii = 0 to ($iNumRepeat-1)
// Feedback on progress
@@ -134,11 +140,15 @@ program
if ($ii % 2) == 0
PsoDistanceConfigureArrayDistances($axis, $iPsoArrayPosAddr, $iPsoArrayPosSize, 0)
MoveAbsolute($axis, $fPosEnd, $fVelScan)
WaitForInPosition($axis)
elseif ($ii % 2) == 1
PsoDistanceConfigureArrayDistances($axis, $iPsoArrayNegAddr, $iPsoArrayNegSize, 0)
MoveAbsolute($axis, $fPosStart, $fVelScan)
WaitForInPosition($axis)
end
WaitForMotionDone($axis)
$axisFaults = StatusGetAxisItem($axis, AxisDataSignal.AxisFault)
if $axisFaults
TaskSetError(TaskGetIndex(), "AxisFault on axis ROTY")
end
Dwell(0.2)
end

View File

@@ -78,6 +78,7 @@ program
///////////////////////////////////////////////////////////
$iglobal[2] = $iNumSteps
for $ii = 0 to ($iNumSteps-1)
$rglobal[4] = $ii
MoveAbsolute($axis, $fStartPosition + $ii * $fStepSize, $fVelScan)
WaitForMotionDone($axis)

View File

@@ -5,67 +5,17 @@ interface.
@author: mohacsi_i
"""
from time import sleep
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
from bec_lib import bec_logger
logger = bec_logger.logger
class AerotechTasksMixin(CustomDeviceMixin):
"""Mixin class for self-configuration and staging
"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# Fish out our configuration from scaninfo (via explicit or generic addressing)
d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if self.parent.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
# Perform bluesky-style configuration
if len(d) > 0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# The actual staging
self.parent.bluestage()
def on_unstage(self):
"""Stop the currently selected task"""
self.parent.switch.set("Stop").wait()
def on_stop(self):
"""Stop the currently selected task"""
self.parent.switch.set("Stop").wait()
class aa1Tasks(PSIDeviceBase):
class aa1Tasks(PSIDeviceBase, Device):
"""Task management API
The place to manage tasks and AeroScript user files on the controller.
@@ -96,7 +46,7 @@ class aa1Tasks(PSIDeviceBase):
"""
custom_prepare_cls = AerotechTasksMixin
USER_ACCESS = ["arm", "disarm", "launch", "kickoff"]
_failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal)
errStatus = Component(EpicsSignalRO, "ERRW", auto_monitor=True, kind=Kind.normal)
@@ -109,27 +59,55 @@ class aa1Tasks(PSIDeviceBase):
_executeReply = Component(EpicsSignalRO, "EXECUTE_RBV", string=True, auto_monitor=True)
fileName = Component(EpicsSignal, "FILENAME", string=True, kind=Kind.omitted, put_complete=True)
# _fileRead = Component(EpicsPassiveRO, "FILEREAD", string=True, kind=Kind.omitted)
_fileWrite = Component(
EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True
)
# pylint: disable=duplicate-code, too-many-arguments
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# Need to call super() to call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def configure(self, d: dict) -> tuple:
"""Configuration interface for flying"""
"""Configure the scripting interface
Handles AeroScript loading and the launching of existing script files
on the Automation1 iSMC. The interface is meant to be used for flying.
"""
# Common operations
old = self.read_configuration()
self.switch.set("Reset").wait()
# Check what we got
if "script_task" in d:
if d['script_task'] < 3 or d['script_task'] > 21:
if d["script_task"] < 3 or d["script_task"] > 21:
raise RuntimeError(f"Invalid task index: {d['script_task']}")
self.taskIndex.set(d['script_task']).wait()
self.taskIndex.set(d["script_task"]).wait()
if "script_file" in d:
self.fileName.set(d["script_file"]).wait()
if "script_text" in d:
# Compile text for syntax checking
# NOTE: This will load to 'script_file'
self._fileWrite.set(d['script_text'], settle_time=0.2).wait()
self._fileWrite.set(d["script_text"], settle_time=0.2).wait()
self.switch.set("Load").wait()
# Check the result of load
if self._failure.value:
@@ -138,25 +116,82 @@ class aa1Tasks(PSIDeviceBase):
new = self.read_configuration()
return (old, new)
def bluestage(self) -> None:
"""Bluesky style stage"""
def on_stage(self) -> None:
"""Configuration and staging
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out our configuration from scaninfo (via explicit or generic addressing)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
if self.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
# FIXME: The above should be exchanged with:
# d = self.scan_info.scan_msg.scan_parameters.get("aerotech_config")
# Perform bluesky-style configuration
if d:
self.configure(d=d)
# The actual staging
self.arm()
def on_unstage(self):
"""Stop the currently selected task"""
self.disarm()
def on_stop(self):
"""Stop the currently selected task"""
self.unstage()
def on_kickoff(self):
"""Start execution of the selected task"""
self.launch()
def arm(self) -> None:
"""Bluesky style stage, prepare, but does not execute"""
if self.taskIndex.get() in (0, 1, 2):
logger.error(f"[{self.name}] Launching AeroScript on system task. Daring today are we?")
# Launch and check success
status = self.switch.set("Run", settle_time=0.2)
logger.error(f"[{self.name}] Loading AeroScript on system task. Daring today are we?")
# Load and check success
status = self.switch.set("Load", settle_time=0.2)
status.wait()
if self._failure.value:
raise RuntimeError("Failed to kick off task, please check the Aerotech IOC")
raise RuntimeError("Failed to load task, please check the Aerotech IOC")
return status
##########################################################################
# Bluesky flyer interface
def complete(self) -> DeviceStatus:
def disarm(self):
"""Bluesky style unstage, stops execution"""
self.switch.set("Stop").wait()
def launch(self):
"""Bluesky style kickoff"""
# Launch and check success
status = self.switch.set("Start", settle_time=0.2)
status.wait()
if self._failure.value:
raise RuntimeError("Failed to load task, please check the Aerotech IOC")
return status
def complete(self) -> SubscriptionStatus:
"""Wait for a RUNNING task"""
timestamp_ = 0
task_idx = int(self.taskIndex.get())
def not_running(*args, value, timestamp, **kwargs):
def not_running(*, value, timestamp, **_):
nonlocal timestamp_
result = value[task_idx] not in ["Running", 4]
timestamp_ = timestamp

View File

@@ -1,4 +1,9 @@
from .AerotechTasks import aa1Tasks
from .AerotechPso import aa1AxisPsoDistance
from .AerotechDriveDataCollection import aa1AxisDriveDataCollection
from .AerotechAutomation1 import aa1Controller, aa1GlobalVariables, aa1GlobalVariableBindings, aa1AxisIo
from .AerotechAutomation1 import (
aa1Controller,
aa1GlobalVariables,
aa1GlobalVariableBindings,
aa1AxisIo,
)

View File

@@ -0,0 +1,410 @@
import time
import numpy as np
from bec_lib.device import DeviceBase
from bec_server.scan_server.scans import Acquire, AsyncFlyScanBase
from bec_lib import bec_logger
logger = bec_logger.logger
class Shutter:
"""Shutter status"""
CLOSED = 0
OPEN = 1
class AcquireDarkV2(Acquire):
scan_name = "acquire_dark_v2"
required_kwargs = ["exp_burst"]
gui_config = {"Acquisition parameters": ["exp_burst"]}
def __init__(self, exp_burst: int, file_prefix="", **kwargs):
"""
Acquire dark images. This scan is used to acquire dark images. Dark images are images taken with the shutter
closed and no beam on the sample. Dark images are used to correct the data images for dark current.
NOTE: this scan has a special operation mode that does not call
Args:
exp_burst : int
Number of dark images to acquire (no default)
file_prefix : str
File prefix
Examples:
>>> scans.acquire_dark(5)
"""
super().__init__(exp_burst=exp_burst, file_prefix="", **kwargs)
self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger
self.exp_burst = exp_burst
self.file_prefix = file_prefix
def pre_scan(self):
"""Close the shutter before scan"""
yield from self.stubs.set(device=["eyex"], value=[Shutter.CLOSED])
return super().pre_scan()
def direct(self):
"""Direct scan procedure call"""
# Collect relevant devices
self.cams = [
cam.name
for cam in self.device_manager.devices.get_devices_with_tags("camera")
if cam.enabled
]
self.prev = [
pre.name
for pre in self.device_manager.devices.get_devices_with_tags("preview")
if pre.enabled
]
self.daqs = [
daq.name
for daq in self.device_manager.devices.get_devices_with_tags("daq")
if daq.enabled
]
# Do not call stage, as there's no ScanInfo emitted for direct call
for daq in self.daqs:
cam = yield from self.stubs.send_rpc_and_wait(daq, "datasource.get")
prefix = f"{self.file_prefix}_{cam}_dark"
yield from self.stubs.send_rpc_and_wait(daq, "file_prefix.set", prefix)
yield from self.stubs.send_rpc_and_wait(daq, "num_images.set", self.exp_burst)
yield from self.stubs.send_rpc_and_wait(daq, "arm")
for prev in self.prev:
yield from self.stubs.send_rpc_and_wait(prev, "arm")
for cam in self.cams:
yield from self.stubs.send_rpc_and_wait(
cam, "configure", {"exposure_num_burst": self.exp_burst}
)
yield from self.stubs.send_rpc_and_wait(cam, "arm")
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()
yield from self.cleanup()
class AcquireWhiteV2(Acquire):
scan_name = "acquire_white_v2"
gui_config = {"Acquisition parameters": ["exp_burst"]}
def __init__(
self,
motor: DeviceBase,
exp_burst: int,
sample_position_out: float,
sample_angle_out: float,
file_prefix: str = "",
**kwargs,
):
"""
Acquire flat field images. This scan is used to acquire flat field images. The flat field
image is an image taken with the shutter open but the sample out of the beam. Flat field
images are used to correct the data images for non-uniformity in the detector.
Args:
motor : DeviceBase
Motor to be moved to move the sample out of beam
exp_burst : int
Number of flat field images to acquire (no default)
sample_position_out : float
Position to move the sample stage out of beam and take flat field images
sample_angle_out : float
Angular position where to take the flat field images
Examples:
>>> scans.acquire_white(dev.samx, 5, 20)
"""
super().__init__(exp_burst=exp_burst, **kwargs)
self.exp_burst = exp_burst
self.file_prefix = file_prefix
self.burst_at_each_point = 1
self.scan_motors = [motor, "eyez"]
# self.scan_motors = [motor, "es1_roty"]
self.out_position = [sample_position_out, sample_angle_out]
def pre_scan(self):
"""Open the shutter before scan"""
# Move sample out
yield from self._move_scan_motors_and_wait(self.out_position)
# Open the main shutter (TODO change to the correct shutter device)
yield from self.stubs.set(device=["eyex"], value=[Shutter.OPEN])
return super().pre_scan()
def cleanup(self):
"""Close the shutter after scan"""
# Close fast shutter
yield from self.stubs.set(device=["eyex"], value=[Shutter.CLOSED])
return super().cleanup()
def direct(self):
"""Direct scan procedure call"""
# Collect relevant devices
self.cams = [
cam.name
for cam in self.device_manager.devices.get_devices_with_tags("camera")
if cam.enabled
]
self.prev = [
pre.name
for pre in self.device_manager.devices.get_devices_with_tags("preview")
if pre.enabled
]
self.daqs = [
daq.name
for daq in self.device_manager.devices.get_devices_with_tags("daq")
if daq.enabled
]
# Do not call stage, as there's no ScanInfo emitted for direct call
for daq in self.daqs:
cam = yield from self.stubs.send_rpc_and_wait(daq, "datasource.get")
prefix = f"{self.file_prefix}_{cam}_white"
yield from self.stubs.send_rpc_and_wait(daq, "file_prefix.set", prefix)
yield from self.stubs.send_rpc_and_wait(daq, "num_images.set", self.exp_burst)
yield from self.stubs.send_rpc_and_wait(daq, "arm")
for prev in self.prev:
yield from self.stubs.send_rpc_and_wait(prev, "arm")
for cam in self.cams:
yield from self.stubs.send_rpc_and_wait(
cam, "configure", {"exposure_num_burst": self.exp_burst}
)
yield from self.stubs.send_rpc_and_wait(cam, "arm")
yield from self.pre_scan()
yield from self.scan_core()
yield from self.finalize()
yield from self.unstage()
yield from self.cleanup()
# class AcquireProjections(AsyncFlyScanBase):
# scan_name = "acquire_projections"
# gui_config = {
# "Motor": ["motor"],
# "Acquisition parameters": ["sample_position_in", "start_angle", "angular_range" ],
# "Camera": ["exp_time", "exp_burst"]
# }
# def __init__(self,
# motor: DeviceBase,
# exp_burst: int,
# sample_position_in: float,
# start_angle: float,
# angular_range: float,
# exp_time:float,
# **kwargs):
# """
# Acquire projection images.
# Args:
# motor : DeviceBase
# Motor to move continuously from start to stop position
# exp_burst : int
# Number of flat field images to acquire (no default)
# sample_position_in : float
# Position to move the sample stage to position the sample in the beam
# start_angle : float
# Angular start position for the scan
# angular_range : float
# Angular range
# exp_time : float, optional
# Exposure time [ms]. If not specified, the currently configured value on the camera will be used
# exp_period : float, optional
# Exposure period [ms]. If not specified, the currently configured value on the camera will be used
# image_width : int, optional
# ROI size in the x-direction [pixels]. If not specified, the currently configured value on the camera will be used
# image_height : int, optional
# ROI size in the y-direction [pixels]. If not specified, the currently configured value on the camera will be used
# acq_mode : str, optional
# Predefined acquisition mode (default= 'default')
# file_path : str, optional
# File path for standard daq
# ddc_trigger : int, optional
# Drive Data Capture Trigger
# ddc_source0 : int, optional
# Drive Data capture Input0
# Returns:
# ScanReport
# Examples:
# >>> scans.acquire_projections()
# """
# self.motor = motor
# super().__init__(exp_time=exp_time,**kwargs)
# self.burst_at_each_point = 1
# self.sample_position_in = sample_position_in
# self.start_angle = start_angle
# self.angular_range = angular_range
# self.dark_shutter_pos_out = 1 ### change with a variable
# self.dark_shutter_pos_in = 0 ### change with a variable
# def update_scan_motors(self):
# return [self.motor]
# def prepare_positions(self):
# self.positions = np.array([[self.start_angle], [self.start_angle+self.angular_range]])
# self.num_pos = None
# yield from self._set_position_offset()
# def scan_core(self):
# # move to in position and go to start angular position
# yield from self.stubs.set(device=["eyez", self.motor], value=[self.sample_position_in, self.positions[0][0]])
# # open the shutter
# yield from self.stubs.set(device="eyex", value=self.dark_shutter_pos_out)
# # TODO add opening of fast shutter
# # start the flyer
# flyer_request = yield from self.stubs.set(
# device=self.motor, value=self.positions[1][0], wait=False
# )
# self.connector.send_client_info(
# "Starting the scan", show_asap=True, rid=self.metadata.get("RID")
# )
# yield from self.stubs.trigger()
# while not flyer_request.done:
# yield from self.stubs.read(
# group="monitored", point_id=self.point_id
# )
# time.sleep(1)
# # increase the point id
# self.point_id += 1
# self.num_pos = self.point_id
class AcquireRefsV2(Acquire):
scan_name = "acquire_refs_v2"
gui_config = {}
def __init__(
self,
motor: DeviceBase,
num_darks: int = 0,
num_flats: int = 0,
sample_angle_out: float = 0,
sample_position_in: float = 0,
sample_position_out: float = 1,
file_prefix_dark: str = "tmp_dark",
file_prefix_white: str = "tmp_white",
**kwargs,
):
"""
Acquire reference images (darks + whites) and return to beam position.
Reference images are acquired automatically in an optimized sequence and
the sample is returned to the sample_in_position afterwards.
Args:
motor : DeviceBase
Motor to be moved to move the sample out of beam
num_darks : int , optional
Number of dark field images to acquire
num_flats : int , optional
Number of white field images to acquire
sample_angle_out : float , optional
Angular position where to take the flat field images
sample_position_in : float , optional
Sample stage X position for sample in beam [um]
sample_position_out : float ,optional
Sample stage X position for sample out of the beam [um]
exp_time : float, optional
Exposure time [ms]. If not specified, the currently configured value
on the camera will be used
exp_period : float, optional
Exposure period [ms]. If not specified, the currently configured value
on the camera will be used
image_width : int, optional
ROI size in the x-direction [pixels]. If not specified, the currently
configured value on the camera will be used
image_height : int, optional
ROI size in the y-direction [pixels]. If not specified, the currently
configured value on the camera will be used
acq_mode : str, optional
Predefined acquisition mode (default= 'default')
file_path : str, optional
File path for standard daq
Returns:
ScanReport
Examples:
>>> scans.acquire_refs(sample_angle_out=90, sample_position_in=10, num_darks=5, num_flats=5, exp_time=0.1)
"""
self.motor = motor
super().__init__(**kwargs)
self.sample_position_in = sample_position_in
self.sample_position_out = sample_position_out
self.sample_angle_out = sample_angle_out
self.num_darks = num_darks
self.num_flats = num_flats
self.file_prefix_dark = file_prefix_dark
self.file_prefix_white = file_prefix_white
self.scan_parameters["std_daq_params"] = {"reconnect": False}
def stage(self):
"""Wrapped scan doesn't need staging"""
yield None
def scan_core(self):
if self.num_darks:
msg = f"Acquiring {self.num_darks} dark images"
logger.warning(msg)
self.connector.send_client_info(msg, show_asap=True, rid=self.metadata.get("RID"))
darks = AcquireDarkV2(
exp_burst=self.num_darks,
# file_prefix=self.file_prefix_dark,
device_manager=self.device_manager,
metadata=self.metadata,
instruction_handler=self.stubs._instruction_handler,
**self.caller_kwargs,
)
yield from darks.direct()
self.point_id = darks.point_id
if self.num_flats:
msg = f"Acquiring {self.num_flats} flat field images"
logger.warning(msg)
self.connector.send_client_info(msg, show_asap=True, rid=self.metadata.get("RID"))
logger.warning("Calling AcquireWhite")
flats = AcquireWhiteV2(
motor=self.motor,
exp_burst=self.num_flats,
sample_position_out=self.sample_position_out,
# sample_angle_out=self.sample_angle_out,
device_manager=self.device_manager,
metadata=self.metadata,
instruction_handler=self.stubs._instruction_handler,
**self.caller_kwargs,
)
flats.point_id = self.point_id
yield from flats.direct()
self.point_id = flats.point_id
## TODO move sample in beam and do not wait
## TODO move rotation to angle and do not wait
logger.warning("[AcquireRefsV2] Done with scan_core")

View File

@@ -18,95 +18,11 @@ import os
import time
from bec_lib import bec_logger
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanBase, ScanArgType
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanArgType
logger = bec_logger.logger
class TomcatStepScan(ScanBase):
"""Simple software step scan for Tomcat
Example class for simple BEC-based step scan using the low-level API. It's just a standard
'line_scan' with the only difference that overrides burst behavior to use camera burst instead
of individual software triggers.
NOTE: As decided by Tomcat, the scans should not manage the scope of devices
- All enabled devices are expected to be configured for acquisition by the end of stage
- Some devices can configure themselves from mandatory scan parameters (steps, burst)
- Other devices can be optionally configured by keyword arguments
- Devices will try to stage using whatever was set on them before
Example:
--------
>>> scans.tomcatstepscan(scan_start=-25, scan_end=155, steps=180, exp_time=0.005, exp_burst=5)
Common keyword arguments:
-------------------------
image_width : int
image_height : int
ddc_trigger : str
"""
scan_name = "tomcatstepscan"
scan_type = "step"
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["steps"],
"Acquisition parameters": ["exp_time", "exp_burst"],
}
def update_scan_motors(self):
self.scan_motors = ["es1_roty"]
def _get_scan_motors(self):
self.scan_motors = ["es1_roty"]
def __init__(
self,
scan_start: float,
scan_end: float,
steps: int,
exp_time: float=0.005,
settling_time: float=0.2,
exp_burst: int=1,
**kwargs,
):
# Converting generic kwargs to tomcat device configuration parameters
super().__init__(
exp_time=exp_time,
settling_time=settling_time,
burst_at_each_point=1,
optim_trajectory=None,
**kwargs,
)
# For position calculation
self.motor = "es1_roty"
self.scan_start = scan_start
self.scan_end = scan_end
self.scan_steps = steps
self.scan_stepsize = (scan_end - scan_start) / steps
def _calculate_positions(self) -> None:
"""Pre-calculate scan positions"""
for ii in range(self.scan_steps + 1):
self.positions.append(self.scan_start + ii * self.scan_stepsize)
def _at_each_point(self, ind=None, pos=None):
""" Overriden at_each_point, using detector burst instaead of manual triggering"""
yield from self._move_scan_motors_and_wait(pos)
time.sleep(self.settling_time)
trigger_time = 0.001*self.exp_time * self.burst_at_each_point
yield from self.stubs.trigger(min_wait=trigger_time)
# yield from self.stubs.trigger(group='trigger', point_id=self.point_id)
# time.sleep(trigger_time)
yield from self.stubs.read(group="monitored", point_id=self.point_id)
# yield from self.stubs.read(group="monitored", point_id=self.point_id, wait_group=None)
self.point_id += 1
class TomcatSnapNStep(AsyncFlyScanBase):
"""Simple software step scan forTomcat
@@ -115,21 +31,21 @@ class TomcatSnapNStep(AsyncFlyScanBase):
Example
-------
>>> scans.tomcatsnapnstepscan(scan_start=-25, scan_end=155, steps=180, exp_time=0.005, exp_burst=5)
>>> scans.tomcatsnapnstepscan(scan_start=-25, scan_end=155, steps=180, acq_time=5, exp_burst=5)
"""
scan_name = "tomcatsnapnstepscan"
scan_type = "scripted"
# scan_type = "scripted"
# arg_input = {"camera" : ScanArgType.DEVICE,
# "exp_time" : ScanArgType.FLOAT}
# arg_bundle_size= {"bundle": len(arg_input), "min": 1, "max": None}
required_kwargs = ["scan_start", "scan_end", "steps"]
gui_config = {
"Movement parameters": ["steps"],
"Acquisition parameters": ["exp_time", "exp_burst"],
"Acquisition parameters": ["acq_time", "exp_burst"],
}
def _get_scan_motors(self):
def _update_scan_motors(self):
self.scan_motors = ["es1_roty"]
def __init__(
@@ -137,8 +53,8 @@ class TomcatSnapNStep(AsyncFlyScanBase):
scan_start: float,
scan_end: float,
steps: int,
exp_time:float=0.005,
settling_time:float=0.2,
acq_time:float=5.0,
exp_burst:int=1,
sync:str="event",
**kwargs,
@@ -147,25 +63,30 @@ class TomcatSnapNStep(AsyncFlyScanBase):
self.scan_start = scan_start
self.scan_end = scan_end
self.scan_steps = steps
self.scan_stepsize = (scan_end - scan_start) / steps
self.scan_ntotal = exp_burst * (steps + 1)
self.exp_time = exp_time
self.exp_time = acq_time
self.exp_burst = exp_burst
self.settling_time = settling_time
self.scan_sync = sync
# General device configuration
# Gigafrost trigger mode
kwargs["parameter"]["kwargs"]["acq_mode"] = "ext_enable"
# Used for Aeroscript file substitutions for the task interface
filename = "AerotechSnapAndStepTemplate.ascript"
filesubs = self.get_filesubs()
filetext = self.render_file(filename, filesubs)
kwargs["parameter"]["kwargs"]["script_text"] = filetext
kwargs["parameter"]["kwargs"]["script_file"] = "bec.ascript"
self.scan_parameters["aerotech_config"] = {
"script_text":filetext,
"script_file":"bec.ascript",
"script_task": 4,
}
# self.scan_parameters["script_file"] = "bec.ascript"
# kwargs["parameter"]["kwargs"]["script_text"] = filetext
# kwargs["parameter"]["kwargs"]["script_file"] = "bec.ascript"
super().__init__(
exp_time=exp_time,
acq_time=acq_time,
exp_burst=exp_burst,
settling_time=settling_time,
burst_at_each_point=1,
optim_trajectory=None,
@@ -182,12 +103,12 @@ class TomcatSnapNStep(AsyncFlyScanBase):
}
filesubs = {
"startpos": self.scan_start,
"stepsize": self.scan_stepsize,
"stepsize": (self.scan_end - self.scan_start) / self.scan_steps,
"numsteps": self.scan_steps,
"exptime": 2 * self.exp_time * self.exp_burst,
"exptime": 0.002 * self.exp_time * self.exp_burst,
"settling": self.settling_time,
"psotrigger": p_modes[self.scan_sync],
"npoints": self.scan_ntotal,
"npoints": self.exp_burst * (self.scan_steps + 1),
}
return filesubs
@@ -196,7 +117,7 @@ class TomcatSnapNStep(AsyncFlyScanBase):
# Load the test file
filename = os.path.join(os.path.dirname(__file__), "../devices/aerotech/" + filename)
logger.info(f"Attempting to load file {filename}")
with open(filename) as f:
with open(filename, "r", encoding="utf-8") as f:
templatetext = f.read()
# Substitute jinja template
@@ -208,12 +129,15 @@ class TomcatSnapNStep(AsyncFlyScanBase):
"""The actual scan routine"""
print("TOMCAT Running scripted scan (via Jinjad AeroScript)")
t_start = time.time()
# Kickoff
yield from self.stubs.send_rpc_and_wait("es1_tasks", "kickoff")
# Complete
# FIXME: this will swallow errors
# yield from self.stubs.complete(device="es1_tasks")
st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "complete")
# st.wait()
yield from self.stubs.send_rpc_and_wait("es1_tasks", "complete")
# Check final state
task_states = yield from self.stubs.send_rpc_and_wait("es1_tasks", "taskStates.get")
if task_states[4] == 8:
raise RuntimeError(f"Task {4} finished in ERROR state")
@@ -243,11 +167,11 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
Example
-------
>>> scans.tomcatsimplesequencescan(33, 180, 180, exp_time=0.005, exp_burst=1800, repeats=10)
>>> scans.tomcatsimplesequencescan(33, 180, 180, acq_time=5, exp_burst=1800, repeats=10)
"""
scan_name = "tomcatsimplesequencescan"
scan_type = "scripted"
# scan_type = "scripted"
scan_report_hint = "table"
required_kwargs = ["scan_start", "gate_high", "gate_low"]
gui_config = {
@@ -255,7 +179,7 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
"Acquisition parameters": [
"gate_high",
"gate_low",
"exp_time",
"acq_time",
"exp_burst",
"sync",
],
@@ -271,7 +195,7 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
gate_low: float,
repeats: int = 1,
repmode: str = "PosNeg",
exp_time: float = 0.005,
acq_time: float = 5,
exp_burst: float = 180,
sync: str = "pso",
**kwargs,
@@ -282,13 +206,13 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
self.gate_low = gate_low
self.scan_repnum = repeats
self.scan_repmode = repmode.upper()
self.exp_time = exp_time
self.exp_time = acq_time
self.exp_burst = exp_burst
self.scan_sync = sync
# Synthetic values
self.scan_ntotal = exp_burst * repeats
self.scan_velocity = gate_high / (exp_time * exp_burst)
self.scan_velocity = gate_high / (0.001*acq_time * exp_burst)
self.scan_acceleration = 500
self.scan_safedistance = 10
self.scan_accdistance = (
@@ -314,7 +238,7 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
kwargs["parameter"]["kwargs"]["script_file"] = "bec.ascript"
super().__init__(
exp_time=exp_time,
acq_time=acq_time,
settling_time=0.5,
relative=False,
burst_at_each_point=1,
@@ -327,7 +251,7 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
# Load the test file
filename = os.path.join(os.path.dirname(__file__), "../devices/aerotech/" + filename)
logger.info(f"Attempting to load file {filename}")
with open(filename) as f:
with open(filename, 'r', encoding="utf-8") as f:
templatetext = f.read()
# Substitute jinja template
@@ -339,12 +263,14 @@ class TomcatSimpleSequence(AsyncFlyScanBase):
"""The actual scan routine"""
print("TOMCAT Running scripted scan (via Jinjad AeroScript)")
t_start = time.time()
# Kickoff
yield from self.stubs.send_rpc_and_wait("es1_tasks", "kickoff")
# Complete
# FIXME: this will swallow errors
# yield from self.stubs.complete(device="es1_tasks")
st = yield from self.stubs.send_rpc_and_wait("es1_tasks", "complete")
# st.wait()
yield from self.stubs.send_rpc_and_wait("es1_tasks", "complete")
task_states = yield from self.stubs.send_rpc_and_wait("es1_tasks", "taskStates.get")
if task_states[4] == 8:
raise RuntimeError(f"Task {4} finished in ERROR state")

View File

@@ -340,7 +340,7 @@ class AcquireRefs(Acquire):
# to set signals on a device
darks = AcquireDark(
exp_burst=self.num_darks,
file_prefix=self.file_prefix_dark,
# file_prefix=self.file_prefix_dark,
device_manager=self.device_manager,
metadata=self.metadata,
instruction_handler=self.stubs._instruction_handler,
@@ -389,6 +389,7 @@ class AcquireRefs(Acquire):
self.point_id = flats.point_id
## TODO move sample in beam and do not wait
## TODO move rotation to angle and do not wait
logger.warning("Done with scan_core")
class TutorialFlyScanContLine(AsyncFlyScanBase):

View File

@@ -16,6 +16,7 @@ class Measurement:
self.nimages_white = 100
self.start_angle = 0
self.angular_range = 180
self.sample_angle_out = 0
self.sample_position_in = 0
self.sample_position_out = 1
@@ -43,6 +44,10 @@ class Measurement:
self.exposure_period = self.det.cfgFramerate.get()
self.roix = self.det.cfgRoiX.get()
self.roiy = self.det.cfgRoiY.get()
self.get_position_rb()
#self.position_rb = False
#self.disable_position_rb_device()
def build_filename(self, acquisition_type='data'):
@@ -74,7 +79,8 @@ class Measurement:
exposure_period=None, roix=None, roiy=None,nimages=None,
nimages_dark=None, nimages_white=None,
start_angle=None, sample_angle_out=None,
sample_position_in=None, sample_position_out=None):
sample_position_in=None, sample_position_out=None,
position_rb=None):
"""
Reconfigure the measurement with any number of new parameter
@@ -110,6 +116,8 @@ class Measurement:
sample_position_out : float, optional
Sample stage X position for sample out of the beam [um]
(default = None)
position_rb : bool, optional
Enable position readback (default = None)
"""
if sample_name != None:
@@ -138,6 +146,13 @@ class Measurement:
self.sample_position_in = sample_position_in
if sample_position_out != None:
self.sample_position_out = sample_position_out
if position_rb != None:
if position_rb == True:
self.enable_position_rb_device()
elif position_rb == False:
self.disable_position_rb_device()
else:
print("WARNING! Position readback should be either True, False or None")
self.build_filename()
@@ -168,6 +183,12 @@ class Measurement:
self.device_name = self.enabled_detectors[0].name
self.build_filename()
def disable_position_rb_device(self):
"Disable position readback device"
dev.es1_ddaq.enabled= False
self.position_rb = False
def enable_detector(self, detector_name):
"""
@@ -197,7 +218,12 @@ class Measurement:
self.device_name = self.enabled_detectors[0].name
self.build_filename()
def enable_position_rb_device(self):
"Enable position readback device"
dev.es1_ddaq.enabled= True
self.position_rb = True
def get_available_detectors(self):
"""
@@ -215,6 +241,14 @@ class Measurement:
"""
self.enabled_detectors = [obj for obj in dev.get_devices_with_tags('camera') if obj.enabled]
def get_position_rb(self):
"""
Get position rb
"""
if dev.es1_ddaq.enabled == True:
self.position_rb = True
else:
self.position_rb = False
def show_available_detectors(self):
"""
@@ -242,39 +276,41 @@ class Measurement:
TODO: make it work for multiple devices
"""
print("Sample name: " + self.sample_name)
print("Data path: " + self.data_path)
print("Number of images: " + str(self.nimages))
print("Number of darks: " + str(self.nimages_dark))
print("Number of flats: " + str(self.nimages_white))
print("Sample name (sample_name): " + self.sample_name)
print("Data path (data_path): " + self.data_path)
print("Number of images (nimages): " + str(self.nimages))
print("Number of darks (nimages_dark): " + str(self.nimages_dark))
print("Number of flats (nimages_flat): " + str(self.nimages_white))
if self.exposure_time == None:
print("Exposure time: " + str(self.det.cfgExposure.get()))
print("Exposure time (exposure_time): " + str(self.det.cfgExposure.get()))
self.exposure_time = self.det.cfgExposure.get()
else:
print("Exposure time: " + str(self.exposure_time))
print("Exposure time (exposure_time): " + str(self.exposure_time))
if self.exposure_period == None:
print("Exposure period: " + str(self.det.cfgFramerate.get()))
print("Exposure period (exposure_period): " + str(self.det.cfgFramerate.get()))
self.exposure_period = self.det.cfgFramerate.get()
else:
print("Exposure period: " + str(self.exposure_period))
print("Exposure period (exposure_period): " + str(self.exposure_period))
if self.roix == None:
print("Roix: " + str(self.det.cfgRoiX.get()))
print("Roix (roix): " + str(self.det.cfgRoiX.get()))
self.roix = self.det.cfgRoiX.get()
else:
print("Roix: " + str(self.roix))
print("Roix (roix): " + str(self.roix))
if self.roiy == None:
print("Roiy: " + str(self.det.cfgRoiY.get()))
print("Roiy (roiy): " + str(self.det.cfgRoiY.get()))
self.roiy = self.det.cfgRoiY.get()
else:
print("Roiy: " + str(self.roiy))
print("Start angle: " + str(self.start_angle))
print("Sample angle out: " + str(self.sample_angle_out))
print("Sample position in: " + str(self.sample_position_in))
print("Sample position out: " + str(self.sample_position_out))
print("Roiy (roiy): " + str(self.roiy))
print("Start angle (start_angle): " + str(self.start_angle))
print("Angular range (angular_range): " + str(self.angular_range))
print("Sample angle out (sample_angle_out): " + str(self.sample_angle_out))
print("Sample position in (sample_position_in): " + str(self.sample_position_in))
print("Sample position out (sample_position_out): " + str(self.sample_position_out))
print("Position readback (position_rb): " + str(self.position_rb))
def acquire_darks(self,nimages_dark=None, exposure_time=None, exposure_period=None,
roix=None, roiy=None, acq_mode=None):
roix=None, roiy=None, acq_mode=None, **kwargs):
"""
Acquire a set of dark images with shutters closed.
@@ -315,17 +351,19 @@ class Measurement:
print("Handing over to 'scans.acquire_dark")
scans.acquire_dark(exp_burst=self.nimages_dark, exp_time=self.exposure_time, exp_period=self.exposure_period, image_width=self.roix,
image_height=self.roiy, acq_mode=acq_mode, file_path=self.file_path, nr_writers=2, base_path=self.base_path,
file_prefix=self.file_prefix)
file_prefix=self.file_prefix, ddc_trigger=4, ddc_source0=1, **kwargs)
def acquire_whites(self,nimages_white=None, sample_angle_out=None, sample_position_out=None,
def acquire_whites(self,motor="eyez", nimages_white=None, sample_angle_out=None, sample_position_out=None,
exposure_time=None, exposure_period=None,
roix=None, roiy=None, acq_mode=None):
roix=None, roiy=None, acq_mode=None, **kwargs):
"""
Acquire a set of whites images with shutters open and sample out of beam.
Parameters
----------
motor : DeviceBase
Motor to be moved to move the sample out of beam
nimages_whites : int, optional
Number of white images to acquire (no default)
sample_angle_out : float, optional
@@ -348,6 +386,7 @@ class Measurement:
m.acquire_whites(nimages_whites=100, exposure_time=5)
"""
self.motor_sample = motor
if nimages_white != None:
self.nimages_white = nimages_white
if sample_angle_out != None:
@@ -367,16 +406,76 @@ class Measurement:
### TODO: camera reset
print("Handing over to 'scans.acquire_whites")
scans.acquire_white(exp_burst=self.nimages_white, sample_angle_out=self.sample_angle_out, sample_position_out= self.sample_position_out,
scans.acquire_white(motor=self.motor_sample, exp_burst=self.nimages_white, sample_angle_out=self.sample_angle_out, sample_position_out= self.sample_position_out,
exp_time=self.exposure_time, exp_period=self.exposure_period, image_width=self.roix,
image_height=self.roiy, acq_mode=acq_mode, file_path=self.file_path, nr_writers=2, base_path=self.base_path,
file_prefix=self.file_prefix)
file_prefix=self.file_prefix, ddc_trigger=4, ddc_source0=1, **kwargs)
def acquire_projections(self, nimages=None, sample_position_in=None,
start_angle=None, angular_range=None,
exposure_time=None, exposure_period=None,
roix=None, roiy=None, acq_mode=None, **kwargs):
"""
Acquire a set of whites images with shutters open and sample out of beam.
def acquire_refs(self,nimages_dark=None, nimages_white=None, sample_angle_out=None,
Parameters
----------
nimages : int, optional
Number of projection images to acquire (no default)
sample_position_in : float, optional
Sample stage X position for sample in the beam [um]
start_angle : float, optional
Starting angular position [deg]
angular_range : float, optional
Angular range [deg]
exposure_time : float, optional
Exposure time [ms]. If not specified, the currently configured value on the camera will be used
exposure_period : float, optional
Exposure period [ms]
roix : int, optional
ROI size in the x-direction [pixels]
roiy : int, optional
ROI size in the y-direction [pixels]
acq_mode : str, optional
Predefined acquisition mode (default=None)
Example:
--------
m.acquire_projections(nimages_projections=100, exposure_time=5)
"""
if nimages != None:
self.nimages = nimages
if sample_position_in != None:
self.sample_position_in = sample_position_in
if start_angle != None:
self.start_angle = start_angle
if angular_range != None:
self.angular_range = angular_range
if exposure_time != None:
self.exposure_time = exposure_time
if exposure_period != None:
self.exposure_period = exposure_period
if roix != None:
self.roix = roix
if roiy != None:
self.roiy = roiy
self.build_filename(acquisition_type='data')
### TODO: camera reset
print("Handing over to 'scans.acquire_projections")
scans.acquire_projections(motor="es1_roty", exp_burst=self.nimages, sample_position_in= self.sample_position_in,
start_angle = self.start_angle, angular_range = self.angular_range,
exp_time=self.exposure_time, exp_period=self.exposure_period, image_width=self.roix,
image_height=self.roiy, acq_mode=acq_mode, file_path=self.file_path, nr_writers=2,
base_path=self.base_path,file_prefix=self.file_prefix, ddc_trigger=4, ddc_source0=1, **kwargs)
def acquire_refs(self, motor="eyez", nimages_dark=None, nimages_white=None, sample_angle_out=None,
sample_position_in=None, sample_position_out=None,
exposure_time=None, exposure_period=None,
roix=None, roiy=None, acq_mode=None):
roix=None, roiy=None, acq_mode=None, **kwargs):
"""
Acquire reference images (darks + whites) and return to beam position.
@@ -385,6 +484,8 @@ class Measurement:
Parameters
----------
motor : DeviceBase
Motor to be moved to move the sample out of beam
darks : int, optional
Number of dark images to acquire (no default)
nimages_whites : int, optional
@@ -435,10 +536,30 @@ class Measurement:
self.build_filename(acquisition_type='white')
file_prefix_white = self.file_prefix
print(file_prefix_dark)
print(file_prefix_white)
### TODO: camera reset
print("Handing over to 'scans.acquire_refs")
scans.acquire_refs(num_darks=self.nimages_dark, num_flats=self.nimages_white, sample_angle_out=self.sample_angle_out,
scans.acquire_refs(motor=motor, num_darks=self.nimages_dark, num_flats=self.nimages_white, sample_angle_out=self.sample_angle_out,
sample_position_in=self.sample_position_in, sample_position_out=self.sample_position_out,
exp_time=self.exposure_time, exp_period=self.exposure_period, image_width=self.roix,
image_height=self.roiy, acq_mode='default', file_path=self.file_path, nr_writers=2, base_path=self.base_path,
file_prefix_dark=file_prefix_dark, file_prefix_white=file_prefix_white)
file_prefix_dark=file_prefix_dark, file_prefix_white=file_prefix_white,
ddc_trigger=4, ddc_source0=1, **kwargs)
def start_preview(self, exposure_time=None, exposure_period=None,
preview_strategy='', preview_paramters=200, **kwargs):
"""
Start the camera in preview mode, no data will be written.
Parameters
----------
exposure_time : float, optional
"""
if exposure_time is None:
exposure_time = self.exposure_time
if exposure_period is None:
exposure_period = 50 # no need to go faster for a preview