Merge branch 'main' into 'feature/revisiting-aerotech'

# Conflicts:
#   tomcat_bec/device_configs/microxas_test_bed.yaml
#   tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py
#   tomcat_bec/devices/gigafrost/gigafrostcamera.py
#   tomcat_bec/devices/gigafrost/pcoedgecamera.py
#   tomcat_bec/scans/__init__.py
#   tomcat_bec/scans/tutorial_fly_scan.py
This commit was merged in pull request #22.
This commit is contained in:
2025-04-16 14:10:42 +02:00
17 changed files with 2689 additions and 1249 deletions

View File

@@ -12,10 +12,27 @@ classifiers = [
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = ["ophyd_devices", "bec_lib", "requests", "websockets", "pyzmq", "jinja2"]
dependencies = [
"ophyd_devices",
"bec_lib",
"requests",
"websockets",
"pyzmq",
"jinja2",
]
[project.optional-dependencies]
dev = ["black", "isort", "coverage", "pylint", "pytest", "pytest-random-order", "ophyd_devices", "bec_server"]
dev = [
"black",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"ophyd_devices",
"bec_server",
"requests-mock",
]
[project.entry-points."bec"]
plugin_bec = "tomcat_bec"

View File

@@ -0,0 +1,353 @@
import json
from unittest import mock
import pytest
import requests
import requests_mock
import typeguard
from ophyd import StatusBase
from websockets import WebSocketException
from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqError, StdDaqStatus
@pytest.fixture
def client():
parent_device = mock.MagicMock()
_client = StdDaqClient(
parent=parent_device, ws_url="ws://localhost:5001", rest_url="http://localhost:5000"
)
yield _client
_client.shutdown()
@pytest.fixture
def full_config():
full_config = dict(
detector_name="tomcat-gf",
detector_type="gigafrost",
n_modules=8,
bit_depth=16,
image_pixel_height=2016,
image_pixel_width=2016,
start_udp_port=2000,
writer_user_id=18600,
max_number_of_forwarders_spawned=8,
use_all_forwarders=True,
module_sync_queue_size=4096,
number_of_writers=12,
module_positions={},
ram_buffer_gb=150,
delay_filter_timeout=10,
live_stream_configs={
"tcp://129.129.95.111:20000": {"type": "periodic", "config": [1, 5]},
"tcp://129.129.95.111:20001": {"type": "periodic", "config": [1, 5]},
"tcp://129.129.95.38:20000": {"type": "periodic", "config": [1, 1]},
},
)
return full_config
def test_stddaq_client(client):
assert client is not None
def test_stddaq_client_get_daq_config(client, full_config):
with requests_mock.Mocker() as m:
response = full_config
m.get("http://localhost:5000/api/config/get?user=ioc", json=response.model_dump())
out = client.get_config()
# Check that the response is simply the json response
assert out == response.model_dump()
assert client._config == response
def test_stddaq_client_set_config_pydantic(client, full_config):
"""Test setting configurations through the StdDAQ client"""
with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with StdDaqConfig object
config = full_config
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config)
# Verify the last request
assert m.last_request.json() == full_config.model_dump()
def test_std_daq_client_set_config_dict(client, full_config):
"""
Test setting configurations through the StdDAQ client with a dictionary input.
"""
with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input
config_dict = full_config.model_dump()
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config_dict)
assert m.last_request.json() == full_config.model_dump()
def test_stddaq_client_set_config_ignores_extra_keys(client, full_config):
"""
Test that the set_config method ignores extra keys in the input dictionary.
"""
with requests_mock.Mocker() as m:
m.post("http://localhost:5000/api/config/set?user=ioc")
# Test with dictionary input
config_dict = full_config.model_dump()
config_dict["extra_key"] = "extra_value"
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
client.set_config(config_dict)
assert m.last_request.json() == full_config.model_dump()
def test_stddaq_client_set_config_error(client, full_config):
"""
Test error handling in the set_config method.
"""
with requests_mock.Mocker() as m:
config = full_config
m.post("http://localhost:5000/api/config/set?user=ioc", status_code=500)
with mock.patch.object(client, "_pre_restart"), mock.patch.object(client, "_post_restart"):
with pytest.raises(requests.exceptions.HTTPError):
client.set_config(config)
def test_stddaq_client_get_config_cached(client, full_config):
"""
Test that the client returns the cached configuration if it is available.
"""
# Set the cached configuration
config = full_config
client._config = config
# Test that the client returns the cached configuration
assert client.get_config(cached=True) == config
def test_stddaq_client_status(client):
client._status = StdDaqStatus.FILE_CREATED
assert client.status == StdDaqStatus.FILE_CREATED
def test_stddaq_client_start(client):
with mock.patch("tomcat_bec.devices.gigafrost.std_daq_client.StatusBase") as StatusBase:
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images=10)
out = client._send_queue.get()
assert out == {
"command": "start",
"path": "test_file_path",
"file_prefix": "test_file_prefix",
"n_image": 10,
}
StatusBase().wait.assert_called_once()
def test_stddaq_client_start_type_error(client):
with pytest.raises(typeguard.TypeCheckError):
client.start(file_path="test_file_path", file_prefix="test_file_prefix", num_images="10")
def test_stddaq_client_stop(client):
"""
Check that the stop method puts the stop command in the send queue.
"""
client.stop()
client._send_queue.get() == {"command": "stop"}
def test_stddaq_client_update_config(client, full_config):
"""
Test that the update_config method updates the configuration with the provided dictionary.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {"detector_name": "new_name"}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
assert set_config.call_count == 1
def test_stddaq_client_updates_only_changed_configs(client, full_config):
"""
Test that the update_config method only updates the configuration if the config has changed.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {"detector_name": "tomcat-gf"}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
assert set_config.call_count == 0
def test_stddaq_client_updates_only_changed_configs_empty(client, full_config):
"""
Test that the update_config method only updates the configuration if the config has changed.
"""
config = full_config
with requests_mock.Mocker() as m:
m.get("http://localhost:5000/api/config/get?user=ioc", json=config.model_dump())
# Update the configuration
update_dict = {}
with mock.patch.object(client, "set_config") as set_config:
client.update_config(update_dict)
assert set_config.call_count == 0
def test_stddaq_client_pre_restart(client):
"""
Test that the pre_restart method sets the status to RESTARTING.
"""
# let's assume the websocket loop is already idle
client._ws_idle_event.set()
client.ws_client = mock.MagicMock()
client._pre_restart()
client.ws_client.close.assert_called_once()
def test_stddaq_client_post_restart(client):
"""
Test that the post_restart method sets the status to IDLE.
"""
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._post_restart()
wait_for_connection.assert_called_once()
assert client._daq_is_running.is_set()
def test_stddaq_client_reset(client):
"""
Test that the reset method calls get_config and set_config.
"""
with (
mock.patch.object(client, "get_config") as get_config,
mock.patch.object(client, "set_config") as set_config,
):
client.reset()
get_config.assert_called_once()
set_config.assert_called_once()
def test_stddaq_client_run_status_callbacks(client):
"""
Test that the run_status_callback method runs the status callback.
"""
status = StatusBase()
client.add_status_callback(status, success=[StdDaqStatus.FILE_CREATED], error=[])
client._status = StdDaqStatus.FILE_CREATED
client._run_status_callbacks()
status.wait()
assert len(status._callbacks) == 0
def test_stddaq_client_run_status_callbacks_error(client):
"""
Test that the run_status_callback method runs the status callback.
"""
status = StatusBase()
client.add_status_callback(status, success=[], error=[StdDaqStatus.FILE_CREATED])
client._status = StdDaqStatus.FILE_CREATED
client._run_status_callbacks()
with pytest.raises(StdDaqError):
status.wait()
assert len(status._callbacks) == 0
@pytest.mark.parametrize(
"msg, updated",
[({"status": "IDLE"}, False), (json.dumps({"status": "waiting_for_first_image"}), True)],
)
def test_stddaq_client_on_received_ws_message(client, msg, updated):
"""
Test that the on_received_ws_message method runs the status callback.
"""
client._status = None
with mock.patch.object(client, "_run_status_callbacks") as run_status_callbacks:
client._on_received_ws_message(msg)
if updated:
run_status_callbacks.assert_called_once()
assert client._status == StdDaqStatus.WAITING_FOR_FIRST_IMAGE
else:
run_status_callbacks.assert_not_called()
assert client._status is None
def test_stddaq_client_ws_send_and_receive(client):
client.ws_client = mock.MagicMock()
client._send_queue.put({"command": "test"})
client._ws_send_and_receive()
# queue is not empty, so we should send the message
client.ws_client.send.assert_called_once()
client.ws_client.recv.assert_called_once()
client.ws_client.reset_mock()
client._ws_send_and_receive()
# queue is empty, so we should not send the message
client.ws_client.send.assert_not_called()
client.ws_client.recv.assert_called_once()
def test_stddaq_client_ws_send_and_receive_websocket_error(client):
"""
Test that the ws_send_and_receive method handles websocket errors.
"""
client.ws_client = mock.MagicMock()
client.ws_client.send.side_effect = WebSocketException()
client._send_queue.put({"command": "test"})
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._ws_send_and_receive()
wait_for_connection.assert_called_once()
def test_stddaq_client_ws_send_and_receive_timeout_error(client):
"""
Test that the ws_send_and_receive method handles timeout errors.
"""
client.ws_client = mock.MagicMock()
client.ws_client.recv.side_effect = TimeoutError()
client._send_queue.put({"command": "test"})
with mock.patch.object(client, "wait_for_connection") as wait_for_connection:
client._ws_send_and_receive()
wait_for_connection.assert_not_called()
def test_stddaq_client_ws_update_loop(client):
"""
Test that the ws_update_loop method runs the status callback.
"""
client._shutdown_event = mock.MagicMock()
client._shutdown_event.is_set.side_effect = [False, True]
with (
mock.patch.object(client, "_ws_send_and_receive") as ws_send_and_receive,
mock.patch.object(client, "_wait_for_server_running") as wait_for_server_running,
):
client._ws_update_loop()
ws_send_and_receive.assert_called_once()
wait_for_server_running.assert_called_once()

View File

@@ -64,59 +64,59 @@ femto_mean_curr:
# readOnly: false
# softwareTrigger: false
# es1_ismc:
# description: 'Automation1 iSMC interface'
# deviceClass: tomcat_bec.devices.aa1Controller
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:CTRL:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
es1_ismc:
description: 'Automation1 iSMC interface'
deviceClass: tomcat_bec.devices.aa1Controller
deviceConfig:
prefix: 'X02DA-ES1-SMP1:CTRL:'
deviceTags:
- es1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: false
# es1_tasks:
# description: 'Automation1 task management interface'
# deviceClass: tomcat_bec.devices.aa1Tasks
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:TASK:'
# deviceTags:
# - es1
# enabled: false
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
es1_tasks:
description: 'Automation1 task management interface'
deviceClass: tomcat_bec.devices.aa1Tasks
deviceConfig:
prefix: 'X02DA-ES1-SMP1:TASK:'
deviceTags:
- es1
enabled: false
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: false
# es1_psod:
# description: 'AA1 PSO output interface (trigger)'
# deviceClass: tomcat_bec.devices.aa1AxisPsoDistance
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:ROTY:PSO:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: true
es1_psod:
description: 'AA1 PSO output interface (trigger)'
deviceClass: tomcat_bec.devices.aa1AxisPsoDistance
deviceConfig:
prefix: 'X02DA-ES1-SMP1:ROTY:PSO:'
deviceTags:
- es1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: true
# es1_ddaq:
# description: 'Automation1 position recording interface'
# deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
es1_ddaq:
description: 'Automation1 position recording interface'
deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection
deviceConfig:
prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'
deviceTags:
- es1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: false
#camera:
@@ -132,69 +132,73 @@ femto_mean_curr:
# readoutPriority: monitored
# softwareTrigger: true
gfcam:
description: GigaFrost camera client
deviceClass: tomcat_bec.devices.GigaFrostCamera
deviceConfig:
prefix: 'X02DA-CAM-GF2:'
backend_url: 'http://sls-daq-001:8080'
auto_soft_enable: true
deviceTags:
- camera
- trigger
- gfcam
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: true
# gfcam:
# description: GigaFrost camera client
# deviceClass: tomcat_bec.devices.GigaFrostCamera
# deviceConfig:
# prefix: 'X02DA-CAM-GF2:'
# backend_url: 'http://sls-daq-001:8080'
# auto_soft_enable: true
# std_daq_live: 'tcp://129.129.95.111:20000'
# std_daq_ws: 'ws://129.129.95.111:8080'
# std_daq_rest: 'http://129.129.95.111:5000'
# deviceTags:
# - camera
# - trigger
# - gfcam
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: true
gfdaq:
description: GigaFrost stdDAQ client
deviceClass: tomcat_bec.devices.StdDaqClient
deviceConfig:
ws_url: 'ws://129.129.95.111:8080'
rest_url: 'http://129.129.95.111:5000'
data_source_name: 'gfcam'
deviceTags:
- std-daq
- daq
- gfcam
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: false
# gfdaq:
# description: GigaFrost stdDAQ client
# deviceClass: tomcat_bec.devices.StdDaqClient
# deviceConfig:
# ws_url: 'ws://129.129.95.111:8080'
# rest_url: 'http://129.129.95.111:5000'
# data_source_name: 'gfcam'
# deviceTags:
# - std-daq
# - gfcam
# enabled: false
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
gf_stream0:
description: stdDAQ preview (2 every 555)
deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
deviceConfig:
url: 'tcp://129.129.95.111:20000'
deviceTags:
- std-daq
- preview
- gfcam
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: false
# gf_stream0:
# description: stdDAQ preview (2 every 555)
# deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
# deviceConfig:
# url: 'tcp://129.129.95.111:20000'
# deviceTags:
# - std-daq
# - gfcam
# enabled: false
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
pcocam:
description: PCO.edge camera client
deviceClass: tomcat_bec.devices.PcoEdge5M
deviceConfig:
prefix: 'X02DA-CCDCAM2:'
deviceTags:
- camera
- trigger
- pcocam
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: true
# pcocam:
# description: PCO.edge camera client
# deviceClass: tomcat_bec.devices.PcoEdge5M
# deviceConfig:
# prefix: 'X02DA-CCDCAM2:'
# std_daq_live: 'tcp://129.129.95.111:20010'
# std_daq_ws: 'ws://129.129.95.111:8081'
# std_daq_rest: 'http://129.129.95.111:5010'
# deviceTags:
# - camera
# - trigger
# - pcocam
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: true
# pcodaq:
# description: GigaFrost stdDAQ client
@@ -202,28 +206,25 @@ pcocam:
# deviceConfig:
# ws_url: 'ws://129.129.95.111:8081'
# rest_url: 'http://129.129.95.111:5010'
# data_source_name: 'pcocam'
# deviceTags:
# - std-daq
# - daq
# - pcocam
# enabled: false
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
pco_stream0:
description: stdDAQ preview (2 every 555)
deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
deviceConfig:
url: 'tcp://129.129.95.111:20010'
deviceTags:
- std-daq
- preview
- pcocam
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: false
# pco_stream0:
# description: stdDAQ preview (2 every 555)
# deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
# deviceConfig:
# url: 'tcp://129.129.95.111:20010'
# deviceTags:
# - std-daq
# - pcocam
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false

View File

@@ -0,0 +1,191 @@
eyex:
readoutPriority: baseline
description: X-ray eye axis X
deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
deviceConfig:
prefix: MTEST-X05LA-ES2-XRAYEYE:M1
deviceTags:
- xray-eye
onFailure: buffer
enabled: true
readOnly: false
softwareTrigger: false
# eyey:
# readoutPriority: baseline
# description: X-ray eye axis Y
# deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
# deviceConfig:
# prefix: MTEST-X05LA-ES2-XRAYEYE:M2
# deviceTags:
# - xray-eye
# onFailure: buffer
# enabled: true
# readOnly: false
# softwareTrigger: false
# eyez:
# readoutPriority: baseline
# description: X-ray eye axis Z
# deviceClass: tomcat_bec.devices.psimotor.EpicsMotorEC
# deviceConfig:
# prefix: MTEST-X05LA-ES2-XRAYEYE:M3
# deviceTags:
# - xray-eye
# onFailure: buffer
# enabled: true
# readOnly: false
# softwareTrigger: false
femto_mean_curr:
readoutPriority: monitored
description: Femto mean current
deviceClass: ophyd.EpicsSignal
deviceConfig:
auto_monitor: true
read_pv: MTEST-X05LA-ES2-XRAYEYE:FEMTO-MEAN-CURR
deviceTags:
- xray-eye
onFailure: buffer
enabled: true
readOnly: true
softwareTrigger: false
# es1_roty:
# readoutPriority: baseline
# description: 'Test rotation stage'
# deviceClass: tomcat_bec.devices.psimotor.EpicsMotorMR
# deviceConfig:
# prefix: X02DA-ES1-SMP1:ROTY
# deviceTags:
# - es1-sam
# onFailure: buffer
# enabled: true
# readOnly: false
# softwareTrigger: false
# es1_ismc:
# description: 'Automation1 iSMC interface'
# deviceClass: tomcat_bec.devices.aa1Controller
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:CTRL:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
# es1_tasks:
# description: 'Automation1 task management interface'
# deviceClass: tomcat_bec.devices.aa1Tasks
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:TASK:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
# es1_psod:
# description: 'AA1 PSO output interface (trigger)'
# deviceClass: tomcat_bec.devices.aa1AxisPsoDistance
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:ROTY:PSO:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: true
# es1_ddaq:
# description: 'Automation1 position recording interface'
# deviceClass: tomcat_bec.devices.aa1AxisDriveDataCollection
# deviceConfig:
# prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'
# deviceTags:
# - es1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
#camera:
# description: Grashopper Camera
# deviceClass: tomcat_bec.devices.GrashopperTOMCAT
# deviceConfig:
# prefix: 'X02DA-PG-USB:'
# deviceTags:
# - camera
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: true
gfcam:
description: GigaFrost camera client
deviceClass: tomcat_bec.devices.GigaFrostCamera
deviceConfig:
prefix: 'X02DA-CAM-GF2:'
backend_url: 'http://sls-daq-001:8080'
std_daq_ws: 'ws://129.129.95.111:8080'
std_daq_rest: 'http://129.129.95.111:5000'
std_daq_live: 'tcp://129.129.95.111:20001'
auto_soft_enable: true
deviceTags:
- camera
- trigger
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: monitored
softwareTrigger: true
# gfdaq:
# description: GigaFrost stdDAQ client
# deviceClass: tomcat_bec.devices.StdDaqClient
# deviceConfig:
# ws_url: 'ws://129.129.95.111:8080'
# rest_url: 'http://129.129.95.111:5000'
# deviceTags:
# - std-daq
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
# daq_stream0:
# description: stdDAQ preview (2 every 555)
# deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
# deviceConfig:
# url: 'tcp://129.129.95.111:20000'
# deviceTags:
# - std-daq
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false
# daq_stream1:
# description: stdDAQ preview (1 at 5 Hz)
# deviceClass: tomcat_bec.devices.StdDaqPreviewDetector
# deviceConfig:
# url: 'tcp://129.129.95.111:20001'
# deviceTags:
# - std-daq
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: monitored
# softwareTrigger: false

View File

@@ -57,7 +57,7 @@ es1_roty:
deviceTags:
- es1-sam
onFailure: buffer
enabled: true
enabled: false
readOnly: false
softwareTrigger: false
@@ -81,7 +81,7 @@ es1_tasks:
prefix: 'X02DA-ES1-SMP1:TASK:'
deviceTags:
- es1
enabled: true
enabled: false
onFailure: buffer
readOnly: false
readoutPriority: monitored
@@ -109,7 +109,7 @@ es1_ddaq:
prefix: 'X02DA-ES1-SMP1:ROTY:DDC:'
deviceTags:
- es1
enabled: true
enabled: false
onFailure: buffer
readOnly: false
readoutPriority: monitored

View File

@@ -8,74 +8,17 @@ drive data collection (DDC) interface.
import time
from collections import OrderedDict
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib import bec_logger
logger = bec_logger.logger
class AerotechDriveDataCollectionMixin(CustomDeviceMixin):
"""Mixin class for self-configuration and staging
NOTE: scripted scans start drive data collection internally
"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
# NOTE: Scans don't have to fully configure the device
if "ddc_trigger" in scanargs:
d["ddc_trigger"] = scanargs["ddc_trigger"]
if "ddc_source0" in scanargs:
d["ddc_source0"] = scanargs["ddc_source0"]
if "ddc_source1" in scanargs:
d["ddc_source1"] = scanargs["ddc_source1"]
# Number of points
if "ddc_num_points" in scanargs and scanargs["ddc_num_points"] is not None:
d["num_points_total"] = scanargs["ddc_num_points"]
elif "daq_num_points" in scanargs and scanargs["daq_num_points"] is not None:
d["num_points_total"] = scanargs["daq_num_points"]
else:
# Try to figure out number of points
num_points = 1
points_valid = False
if "steps" in scanargs and scanargs["steps"] is not None:
num_points *= scanargs["steps"]
points_valid = True
if "exp_burst" in scanargs and scanargs["exp_burst"] is not None:
num_points *= scanargs["exp_burst"]
points_valid = True
if "repeats" in scanargs and scanargs["repeats"] is not None:
num_points *= scanargs["repeats"]
points_valid = True
if points_valid:
d["num_points_total"] = num_points
# Perform bluesky-style configuration
if len(d) > 0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Stage the data collection if not in internally launced mode
# NOTE: Scripted scans might configure and start acquiring from the scrits
self.parent.arm()
def on_unstage(self):
"""Standard bluesky unstage"""
self.parent.disarm()
class aa1AxisDriveDataCollection(PSIDeviceBase):
class aa1AxisDriveDataCollection(PSIDeviceBase, Device):
"""Axis data collection
This class provides convenience wrappers around the Aerotech API's axis
@@ -94,9 +37,10 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
...
ret = yield from ddc.collect()
NOTE: scripted scans start drive data collection internally
NOTE: Expected behavior is that the device is disabled when not in use,
i.e. there's avtive enable/disable management.
i.e. there's active enable/disable management.
"""
# ########################################################################
@@ -117,8 +61,31 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
_buffer0 = Component(EpicsSignalRO, "BUFFER0", auto_monitor=True, kind=Kind.normal)
_buffer1 = Component(EpicsSignalRO, "BUFFER1", auto_monitor=True, kind=Kind.normal)
custom_prepare_cls = AerotechDriveDataCollectionMixin
USER_ACCESS = ["configure", "reset", "collect", "arm", "disarm"]
USER_ACCESS = ["configure", "reset", "arm", "disarm"]
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# Need to call super() to call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def configure(self, d: dict = None) -> tuple:
"""Configure data capture
@@ -134,24 +101,67 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
if "num_points_total" in d:
self.npoints.set(d["num_points_total"]).wait()
if "ddc_trigger" in d:
self._trigger.set(d['ddc_trigger']).wait()
self._trigger.set(d["ddc_trigger"]).wait()
if "ddc_source0" in d:
self._input0.set(d['ddc_source0']).wait()
self._input0.set(d["ddc_source0"]).wait()
if "ddc_source1" in d:
self._input1.set(d['ddc_source1']).wait()
self._input1.set(d["ddc_source1"]).wait()
# Reset incremental readback
self._switch.set("ResetRB", settle_time=0.1).wait()
new = self.read_configuration()
return (old, new)
def on_stage(self) -> None:
"""Configuration and staging"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
# NOTE: Scans don't have to fully configure the device
if "ddc_trigger" in scanargs:
d["ddc_trigger"] = scanargs["ddc_trigger"]
if "ddc_num_points" in scanargs:
d["num_points_total"] = scanargs["ddc_num_points"]
else:
# Try to figure out number of points
num_points = 1
points_valid = False
if "steps" in scanargs and scanargs["steps"] is not None:
num_points *= scanargs["steps"]
points_valid = True
elif "exp_burst" in scanargs and scanargs["exp_burst"] is not None:
num_points *= scanargs["exp_burst"]
points_valid = True
elif "repeats" in scanargs and scanargs["repeats"] is not None:
num_points *= scanargs["repeats"]
points_valid = True
if points_valid:
d["num_points_total"] = num_points
# Perform bluesky-style configuration
if len(d) > 0:
logger.warning(f"[{self.name}] Configuring with:\n{d}")
self.configure(d=d)
# Stage the data collection if not in internally launced mode
# NOTE: Scripted scans start acquiring from the scrits
if self.scaninfo.scan_type not in ("script", "scripted"):
self.arm()
# Reset readback
self.reset()
def on_unstage(self):
"""Standard bluesky unstage"""
self.disarm()
def arm(self) -> None:
"""Bluesky-style stage"""
self._switch.set("ResetRB", settle_time=0.1).wait()
self._switch.set("Start", settle_time=0.2).wait()
def disarm(self):
"""Bluesky-style unstage"""
"""Standard bluesky unstage"""
self._switch.set("Stop", settle_time=0.2).wait()
def reset(self):
@@ -175,6 +185,7 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
timestamp_ = timestamp
return result
status = None
if index == 0:
status = SubscriptionStatus(self._readstatus0, neg_edge, settle_time=0.5)
self._readback0.set(1).wait()
@@ -182,15 +193,14 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
status = SubscriptionStatus(self._readstatus1, neg_edge, settle_time=0.5)
self._readback1.set(1).wait()
else:
raise ValueError(f"[{self.name}] Invalid data acquisition channel {index}")
raise RuntimeError(f"Unsupported drive data collection channel: {index}")
# Start asynchronous readback
status.wait()
return status
def describe_collect(self) -> OrderedDict:
"""Describes collected array format according to JSONschema
"""
"""Describes collected array format according to JSONschema"""
ret = OrderedDict()
ret["buffer0"] = {
"source": "internal",

View File

@@ -0,0 +1,22 @@
{
"detector_name": "tomcat-gf",
"detector_type": "gigafrost",
"n_modules": 8,
"bit_depth": 16,
"image_pixel_height": 2016,
"image_pixel_width": 2016,
"start_udp_port": 2000,
"writer_user_id": 18600,
"max_number_of_forwarders_spawned": 8,
"use_all_forwarders": true,
"module_sync_queue_size": 4096,
"number_of_writers": 12,
"module_positions": {},
"ram_buffer_gb": 150,
"delay_filter_timeout": 10,
"live_stream_configs": {
"tcp://129.129.95.111:20000": { "type": "periodic", "config": [1, 5] },
"tcp://129.129.95.111:20001": { "type": "periodic", "config": [1, 5] },
"tcp://129.129.95.38:20000": { "type": "periodic", "config": [1, 1] }
}
}

View File

@@ -0,0 +1,317 @@
"""
This module contains the PV definitions for the Gigafrost camera at Tomcat. It
does not contain any logic to control the camera.
"""
from ophyd import Component as Cpt
from ophyd import Device, DynamicDeviceComponent, EpicsSignal, EpicsSignalRO, Kind, Signal
import tomcat_bec.devices.gigafrost.gfconstants as const
from tomcat_bec.devices.gigafrost.gfutils import extend_header_table
class GigaFrostBase(Device):
"""Ophyd device class to control Gigafrost cameras at Tomcat
The actual hardware is implemented by an IOC based on an old fork of Helge's
cameras. This means that the camera behaves differently than the SF cameras
in particular it provides even less feedback about it's internal progress.
Helge will update the GigaFrost IOC after working beamline.
The ophyd class is based on the 'gfclient' package and has a lot of Tomcat
specific additions. It does behave differently though, as ophyd swallows the
errors from failed PV writes.
Parameters
----------
use_soft_enable : bool
Flag to use the camera's soft enable (default: False)
backend_url : str
Backend url address necessary to set up the camera's udp header.
(default: http://xbl-daq-23:8080)
Bugs:
----------
FRAMERATE : Ignored in soft trigger mode, period becomes 2xExposure time
"""
# pylint: disable=too-many-instance-attributes
# Standard camera configs
acquire = Cpt(EpicsSignal, "START_CAM", put_complete=True, kind=Kind.omitted)
acquire_time = Cpt(
EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config
)
acquire_period = Cpt(
EpicsSignal, "FRAMERATE", put_complete=True, auto_monitor=True, kind=Kind.config
)
num_exposures = Cpt(
EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config
)
array_size = DynamicDeviceComponent(
{
"array_size_x": (EpicsSignal, "ROIX", {"auto_monitor": True, "put_complete": True}),
"array_size_y": (EpicsSignal, "ROIY", {"auto_monitor": True, "put_complete": True}),
},
doc="Size of the array in the XY dimensions",
)
# DAQ parameters
file_path = Cpt(Signal, kind=Kind.config, value="/gpfs/test/test-beamline")
file_prefix = Cpt(Signal, kind=Kind.config, value="scan_")
num_images = Cpt(Signal, kind=Kind.config, value=1000)
num_images_counter = Cpt(Signal, kind=Kind.hinted, value=0)
# GF specific interface
acquire_block = Cpt(Signal, kind=Kind.config, value=0)
busy_stat = Cpt(EpicsSignalRO, "BUSY_STAT", auto_monitor=True)
sync_flag = Cpt(EpicsSignalRO, "SYNC_FLAG", auto_monitor=True)
sync_swhw = Cpt(EpicsSignal, "SYNC_SWHW.PROC", put_complete=True, kind=Kind.omitted)
set_param = Cpt(EpicsSignal, "SET_PARAM.PROC", put_complete=True, kind=Kind.omitted)
acqmode = Cpt(EpicsSignal, "ACQMODE", put_complete=True, kind=Kind.config)
scan_id = Cpt(EpicsSignal, "SCAN_ID", put_complete=True, auto_monitor=True, kind=Kind.config)
corr_mode = Cpt(
EpicsSignal, "CORR_MODE", put_complete=True, auto_monitor=True, kind=Kind.config
)
# Software signals
soft_enable = Cpt(EpicsSignal, "SOFT_ENABLE", put_complete=True)
soft_trig = Cpt(EpicsSignal, "SOFT_TRIG.PROC", put_complete=True, kind=Kind.omitted)
soft_exp = Cpt(EpicsSignal, "SOFT_EXP", put_complete=True)
###############################################################################################
# Automatically set modes on camera init
auto_soft_enable = Cpt(Signal, kind=Kind.config, metadata={"write_access": False})
###############################################################################################
# Enable schemes
# NOTE: 0 physical, 1 virtual (i.e. always running, but logs enable signal)
mode_enbl_exp = Cpt(
EpicsSignal,
"MODE_ENBL_EXP_RBV",
write_pv="MODE_ENBL_EXP",
put_complete=True,
kind=Kind.config,
)
# Enable signals (combined by OR gate)
mode_enbl_ext = Cpt(
EpicsSignal,
"MODE_ENBL_EXT_RBV",
write_pv="MODE_ENBL_EXT",
put_complete=True,
kind=Kind.config,
)
mode_endbl_soft = Cpt(
EpicsSignal,
"MODE_ENBL_SOFT_RBV",
write_pv="MODE_ENBL_SOFT",
put_complete=True,
kind=Kind.config,
)
mode_enbl_auto = Cpt(
EpicsSignal,
"MODE_ENBL_AUTO_RBV",
write_pv="MODE_ENBL_AUTO",
put_complete=True,
kind=Kind.config,
)
###############################################################################################
# Trigger modes
mode_trig_ext = Cpt(
EpicsSignal,
"MODE_TRIG_EXT_RBV",
write_pv="MODE_TRIG_EXT",
put_complete=True,
kind=Kind.config,
)
mode_trig_soft = Cpt(
EpicsSignal,
"MODE_TRIG_SOFT_RBV",
write_pv="MODE_TRIG_SOFT",
put_complete=True,
kind=Kind.config,
)
mode_trig_timer = Cpt(
EpicsSignal,
"MODE_TRIG_TIMER_RBV",
write_pv="MODE_TRIG_TIMER",
put_complete=True,
kind=Kind.config,
)
mode_trig_auto = Cpt(
EpicsSignal,
"MODE_TRIG_AUTO_RBV",
write_pv="MODE_TRIG_AUTO",
put_complete=True,
kind=Kind.config,
)
###############################################################################################
# Exposure modes
# NOTE: I.e.exposure time control, usually TIMER
mode_exp_ext = Cpt(
EpicsSignal,
"MODE_EXP_EXT_RBV",
write_pv="MODE_EXP_EXT",
put_complete=True,
kind=Kind.config,
)
mode_exp_soft = Cpt(
EpicsSignal,
"MODE_EXP_SOFT_RBV",
write_pv="MODE_EXP_SOFT",
put_complete=True,
kind=Kind.config,
)
mode_exp_timer = Cpt(
EpicsSignal,
"MODE_EXP_TIMER_RBV",
write_pv="MODE_EXP_TIMER",
put_complete=True,
kind=Kind.config,
)
###############################################################################################
# Trigger configuration PVs
# NOTE: Theese PVs set the behavior on posedge and negedge of the trigger signal
cnt_startbit = Cpt(
EpicsSignal,
"CNT_STARTBIT_RBV",
write_pv="CNT_STARTBIT",
put_complete=True,
kind=Kind.config,
)
cnt_endbit = Cpt(
EpicsSignal, "CNT_ENDBIT_RBV", write_pv="CNT_ENDBIT", put_complete=True, kind=Kind.config
)
# HW settings as read only
pixrate = Cpt(EpicsSignalRO, "PIXRATE", auto_monitor=True, kind=Kind.config)
trig_delay = Cpt(EpicsSignalRO, "TRIG_DELAY", auto_monitor=True, kind=Kind.config)
syncout_dly = Cpt(EpicsSignalRO, "SYNCOUT_DLY", auto_monitor=True, kind=Kind.config)
bnc0_rbv = Cpt(EpicsSignalRO, "BNC0_RBV", auto_monitor=True, kind=Kind.config)
bnc1_rbv = Cpt(EpicsSignalRO, "BNC1_RBV", auto_monitor=True, kind=Kind.config)
bnc2_rbv = Cpt(EpicsSignalRO, "BNC2_RBV", auto_monitor=True, kind=Kind.config)
bnc3_rbv = Cpt(EpicsSignalRO, "BNC3_RBV", auto_monitor=True, kind=Kind.config)
bnc4_rbv = Cpt(EpicsSignalRO, "BNC4_RBV", auto_monitor=True, kind=Kind.config)
bnc5_rbv = Cpt(EpicsSignalRO, "BNC5_RBV", auto_monitor=True, kind=Kind.config)
t_board = Cpt(EpicsSignalRO, "T_BOARD", auto_monitor=True)
### HW configuration parameters
# TODO: Only used at INIT, signals not needed
# UDP header configuration parameters
mac_north = Cpt(Signal, kind=Kind.config)
mac_south = Cpt(Signal, kind=Kind.config)
ip_north = Cpt(Signal, kind=Kind.config)
ip_south = Cpt(Signal, kind=Kind.config)
udp_backend_url = Cpt(Signal, kind=Kind.config, metadata={"write_access": False})
udp_ports = Cpt(EpicsSignal, "PORTS", put_complete=True, kind=Kind.config)
udp_framenum = Cpt(EpicsSignal, "FRAMENUM", put_complete=True, kind=Kind.config)
udp_ht_offset = Cpt(EpicsSignal, "HT_OFFSET", put_complete=True, kind=Kind.config)
udp_write_srv = Cpt(EpicsSignal, "WRITE_SRV.PROC", put_complete=True, kind=Kind.omitted)
conn_parm = Cpt(EpicsSignal, "CONN_PARM", string=True, put_complete=True, kind=Kind.config)
# Line swap selection
ls_sw = Cpt(EpicsSignal, "LS_SW", put_complete=True, kind=Kind.config)
ls_nw = Cpt(EpicsSignal, "LS_NW", put_complete=True, kind=Kind.config)
ls_se = Cpt(EpicsSignal, "LS_SE", put_complete=True, kind=Kind.config)
ls_ne = Cpt(EpicsSignal, "LS_NE", put_complete=True, kind=Kind.config)
# pylint: disable=protected-access
def _define_backend_ip(self):
"""Select backend IP address for UDP stream"""
if self.udp_backend_url.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_IP, const.BE3_SOUTH_IP
if self.udp_backend_url.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_IP, const.BE999_SOUTH_IP
raise RuntimeError(f"Backend {self.udp_backend_url.get()} not recognized.")
def _define_backend_mac(self):
"""Select backend MAC address for UDP stream"""
if self.udp_backend_url.get() == const.BE3_DAFL_CLIENT: # xbl-daq-33
return const.BE3_NORTH_MAC, const.BE3_SOUTH_MAC
if self.udp_backend_url.get() == const.BE999_DAFL_CLIENT:
return const.BE999_NORTH_MAC, const.BE999_SOUTH_MAC
raise RuntimeError(f"Backend {self.udp_backend_url.get()} not recognized.")
def _build_udp_header_table(self):
"""Build the header table for the UDP communication"""
udp_header_table = []
for i in range(0, 64, 1):
for j in range(0, 8, 1):
dest_port = 2000 + 8 * i + j
source_port = 3000 + j
if j < 4:
extend_header_table(
udp_header_table,
self.mac_south.get(),
self.ip_south.get(),
dest_port,
source_port,
)
else:
extend_header_table(
udp_header_table,
self.mac_north.get(),
self.ip_north.get(),
dest_port,
source_port,
)
return udp_header_table
def initialize_gigafrost(self) -> None:
"""Initialize the camera, set channel values"""
# Stop acquisition
self.acquire.set(0).wait()
# set entry to UDP table
# number of UDP ports to use
self.udp_ports.set(2).wait()
# number of images to send to each UDP port before switching to next
self.udp_framenum.set(5).wait()
# offset in UDP table - where to find the first entry
self.udp_ht_offset.set(0).wait()
# activate changes
self.udp_write_srv.set(1).wait()
# Configure triggering if needed
if self.auto_soft_enable.get():
# Set modes
# self.fix_nframes_mode = "start"
self.cnt_startbit.set(1).wait()
self.cnt_endbit.set(0).wait()
# self.enable_mode = "soft"
self.mode_enbl_ext.set(0).wait()
self.mode_endbl_soft.set(1).wait()
self.mode_enbl_auto.set(0).wait()
# self.trigger_mode = "auto"
self.mode_trig_auto.set(1).wait()
self.mode_trig_soft.set(0).wait()
self.mode_trig_timer.set(0).wait()
self.mode_trig_ext.set(0).wait()
# self.exposure_mode = "timer"
self.mode_exp_ext.set(0).wait()
self.mode_exp_soft.set(0).wait()
self.mode_exp_timer.set(1).wait()
# line swap - on for west, off for east
self.ls_sw.set(1).wait()
self.ls_nw.set(1).wait()
self.ls_se.set(0).wait()
self.ls_ne.set(0).wait()
# Commit parameters
self.set_param.set(1).wait()
# Initialize data backend
n, s = self._define_backend_ip()
self.ip_north.put(n, force=True)
self.ip_south.put(s, force=True)
n, s = self._define_backend_mac()
self.mac_north.put(n, force=True)
self.mac_south.put(s, force=True)
# Set udp header table (data communication parameters)
self.conn_parm.set(self._build_udp_header_table()).wait()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
"""
Standard DAQ preview image stream module
Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
from time import sleep, time
import threading
import zmq
import json
ZMQ_TOPIC_FILTER = b""
class PcoTestConsumer:
"""Detector wrapper class around the StdDaq preview image stream.
This was meant to provide live image stream directly from the StdDAQ.
Note that the preview stream must be already throtled in order to cope
with the incoming data and the python class might throttle it further.
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# Subscriptions for plotting image
_shutdown_event = threading.Event()
_monitor_mutex = threading.Lock()
_monitor_thread = None
# Status attributes
_url = None
_image = None
_frame = None
_socket = None
def __init__(self, url: str = "tcp://129.129.95.38:20000") -> None:
super().__init__()
self._url = url
def connect(self):
"""Connect to te StDAQs PUB-SUB streaming interface"""
# Socket to talk to server
context = zmq.Context()
self._socket = context.socket(zmq.PULL)
try:
self._socket.connect(self.url)
except ConnectionRefusedError:
sleep(1)
self._socket.connect(self.url)
def disconnect(self):
"""Disconnect"""
try:
if self._socket is not None:
self._socket.disconnect(self.url)
except zmq.ZMQError:
pass
finally:
self._socket = None
@property
def url(self):
return self._url
@property
def image(self):
return self._image
@property
def frame(self):
return self._frame
# pylint: disable=protected-access
def start(self):
"""Start listening for preview data stream"""
if self._monitor_mutex.locked():
raise RuntimeError("Only one consumer permitted")
self.connect()
self._mon = threading.Thread(target=self.poll, daemon=True)
self._mon.start()
def stop(self):
"""Stop a running preview"""
self._shutdown_event.set()
if self._mon is not None:
self._stop_polling = True
# Might hang on recv_multipart
self._mon.join(timeout=1)
# So also disconnect the socket
self.disconnect()
self._shutdown_event.clear()
def poll(self):
"""Collect streamed updates"""
try:
t_last = time()
print("Starting monitor")
with self._monitor_mutex:
while not self._shutdown_event.is_set():
try:
# pylint: disable=no-member
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
# Length and throtling checks
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed < self.parent.throttle.get():
continue
# # Unpack the Array V1 reply to metadata and array data
meta, data = r
# Update image and update subscribers
header = json.loads(meta)
self.header = header
# if header["type"] == "uint16":
# image = np.frombuffer(data, dtype=np.uint16)
# if image.size != np.prod(header['shape']):
# err = f"Unexpected array size of {image.size} for header: {header}"
# raise ValueError(err)
# image = image.reshape(header['shape'])
# # Update image and update subscribers
# self._frame = header['frame']
# self._image = image
t_last = t_curr
# print(
# f"[{self.name}] Updated frame {header['frame']}\t"
# f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
# )
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
print(f"{str(ex)}")
raise
finally:
try:
self._socket.disconnect(self.url)
except RuntimeError:
pass
self._monitor_thread = None
print(f"Detaching monitor")
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
daq = PcoTestConsumer(url="tcp://10.4.0.82:8080")
daq.start()
sleep(500)
daq.stop()

View File

@@ -9,6 +9,7 @@ Created on Thu Jun 27 17:28:43 2024
from time import sleep, time
from threading import Thread
import zmq
import json
from ophyd import Device, Signal, Component, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
@@ -55,6 +56,7 @@ class PcoTestConsumerMixin(CustomDetectorMixin):
"""Collect streamed updates"""
try:
t_last = time()
print("Starting monitor")
while True:
try:
# Exit loop and finish monitoring
@@ -63,7 +65,7 @@ class PcoTestConsumerMixin(CustomDetectorMixin):
break
# pylint: disable=no-member
r = self.parent._socket.recv()
r = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK)
# Length and throtling checks
t_curr = time()
@@ -71,11 +73,11 @@ class PcoTestConsumerMixin(CustomDetectorMixin):
if t_elapsed < self.parent.throttle.get():
continue
# # Unpack the Array V1 reply to metadata and array data
# meta, data = r
# print(meta)
meta, data = r
# # Update image and update subscribers
# header = json.loads(meta)
# Update image and update subscribers
header = json.loads(meta)
self.parent.header = header
# if header["type"] == "uint16":
# image = np.frombuffer(data, dtype=np.uint16)
# if image.size != np.prod(header['shape']):
@@ -105,7 +107,7 @@ class PcoTestConsumerMixin(CustomDetectorMixin):
raise
finally:
try:
self.parent._socket.disconnect()
self.parent._socket.disconnect(self.parent.url.get())
except RuntimeError:
pass
self.parent._mon = None
@@ -128,6 +130,8 @@ class PcoTestConsumer(PSIDetectorBase):
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
header = None
custom_prepare_cls = PcoTestConsumerMixin
# Status attributes

View File

@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 6 11:33:54 2023
@author: mohacsi_i
"""
from ophyd import Component as Cpt
from ophyd import Device, DynamicDeviceComponent, EpicsSignal, EpicsSignalRO, Kind, Signal
class PcoEdgeBase(Device):
"""Ophyd baseclass for Helge camera IOCs
This class provides wrappers for Helge's camera IOCs around SwissFEL and
for high performance SLS 2.0 cameras. The IOC's operation is a bit arcane
and there are different versions and cameras all around. So this device
only covers the absolute basics.
Probably the most important part is the configuration state machine. As
the SET_PARAMS takes care of buffer allocations it might take some time,
as well as a full re-configuration is required every time we change the
binning, roi, etc... This is automatically performed upon starting an
exposure (if it heven't been done before).
The status flag state machine during re-configuration is:
BUSY low, SET low -> BUSY high, SET low -> BUSY low, SET high -> BUSY low, SET low
UPDATE: Data sending operation modes
- Switch to ZMQ streaming by setting FILEFORMAT to ZEROMQ
- Set SAVESTART and SAVESTOP to select a ROI of image indices
- Start file transfer with FTRANSFER.
The ZMQ connection operates in PUSH-PULL mode, i.e. it needs incoming connection.
STOREMODE sets the acquisition mode:
if STOREMODE == Recorder
Fills up the buffer with images. Here SAVESTART and SAVESTOP selects a ROI
of image indices to be streamed out (i.e. maximum buffer_size number of images)
if STOREMODE == FIFO buffer
Continously streams out data using the buffer as a FIFO queue.
Here SAVESTART and SAVESTOP selects a ROI of image indices to be streamed continously
(i.e. a large SAVESTOP streams indefinitely). Note that in FIFO mode buffer reads are
destructive. to prevent this, we don't have EPICS preview
"""
# ########################################################################
# General hardware info (in AD nomenclature)
manufacturer = Cpt(EpicsSignalRO, "QUERY", kind=Kind.config, doc="Camera manufacturer info")
model = Cpt(EpicsSignalRO, "BOARD", kind=Kind.omitted, doc="Camera board info")
# ########################################################################
# Acquisition configuration (in AD nomenclature)
acquire = Cpt(EpicsSignal, "CAMERASTATUS", put_complete=True, kind=Kind.omitted)
acquire_time = Cpt(
EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config
)
acquire_delay = Cpt(
EpicsSignal, "DELAY", put_complete=True, auto_monitor=True, kind=Kind.config
)
trigger_mode = Cpt(
EpicsSignal, "TRIGGER", put_complete=True, auto_monitor=True, kind=Kind.config
)
# num_exposures = Cpt(
# EpicsSignal, "CNT_NUM", put_complete=True, auto_monitor=True, kind=Kind.config
# )
array_size = DynamicDeviceComponent(
{
"array_size_x": (EpicsSignal, "WIDTH", {"auto_monitor": True, "put_complete": True}),
"array_size_y": (EpicsSignal, "HEIGHT", {"auto_monitor": True, "put_complete": True}),
},
doc="Size of the array in the XY dimensions",
)
# DAQ parameters
file_path = Cpt(Signal, kind=Kind.config, value="/gpfs/test/test-beamline")
file_prefix = Cpt(Signal, kind=Kind.config, value="scan_")
num_images = Cpt(Signal, kind=Kind.config, value=1000)
num_images_counter = Cpt(Signal, kind=Kind.hinted, value=0)
# GF specific interface
acquire_block = Cpt(Signal, kind=Kind.config, value=0)
# ########################################################################
# Image size configuration (in AD nomenclature)
bin_x = Cpt(EpicsSignal, "BINX", put_complete=True, auto_monitor=True, kind=Kind.config)
bin_y = Cpt(EpicsSignal, "BINY", put_complete=True, auto_monitor=True, kind=Kind.config)
# ########################################################################
# Additional status info
busy = Cpt(EpicsSignalRO, "BUSY", auto_monitor=True, kind=Kind.config)
camState = Cpt(EpicsSignalRO, "SS_CAMERA", auto_monitor=True, kind=Kind.config)
camProgress = Cpt(EpicsSignalRO, "CAMPROGRESS", auto_monitor=True, kind=Kind.config)
# ########################################################################
# Configuration state maschine with separate transition states
set_param = Cpt(
EpicsSignal,
"BUSY_SET_PARAM",
write_pv="SET_PARAM",
put_complete=True,
auto_monitor=True,
kind=Kind.config,
)
camera_statuscode = Cpt(EpicsSignalRO, "STATUSCODE", auto_monitor=True, kind=Kind.config)
camera_init = Cpt(EpicsSignalRO, "INIT", auto_monitor=True, kind=Kind.config)
camera_init_busy = Cpt(EpicsSignalRO, "BUSY_INIT", auto_monitor=True, kind=Kind.config)
# camCamera = Cpt(EpicsSignalRO, "CAMERA", auto_monitor=True, kind=Kind.config)
# camCameraBusy = Component(EpicsSignalRO, "BUSY_CAMERA", auto_monitor=True, kind=Kind.config)
# ########################################################################
# Acquisition configuration
acquire_mode = Cpt(EpicsSignalRO, "ACQMODE", auto_monitor=True, kind=Kind.config)
acquire_trigger = Cpt(EpicsSignalRO, "TRIGGER", auto_monitor=True, kind=Kind.config)
# acqTriggerSource = Component(
# EpicsSignalRO, "TRIGGERSOURCE", auto_monitor=True, kind=Kind.config)
# acqTriggerEdge = Component(EpicsSignalRO, "TRIGGEREDGE", auto_monitor=True, kind=Kind.config)
# ########################################################################
# Buffer configuration
bufferRecMode = Cpt(EpicsSignalRO, "RECMODE", auto_monitor=True, kind=Kind.config)
bufferStoreMode = Cpt(EpicsSignal, "STOREMODE", auto_monitor=True, kind=Kind.config)
fileRecMode = Cpt(EpicsSignalRO, "RECMODE", auto_monitor=True, kind=Kind.config)
buffer_used = Cpt(EpicsSignalRO, "PIC_BUFFER", auto_monitor=True, kind=Kind.normal)
buffer_size = Cpt(EpicsSignalRO, "PIC_MAX", auto_monitor=True, kind=Kind.normal)
buffer_clear = Cpt(EpicsSignal, "CLEARMEM", put_complete=True, kind=Kind.omitted)
# ########################################################################
# File saving/streaming interface
cam_data_rate = Cpt(EpicsSignalRO, "CAMRATE", auto_monitor=True, kind=Kind.normal)
file_data_rate = Cpt(EpicsSignalRO, "FILERATE", auto_monitor=True, kind=Kind.normal)
file_savestart = Cpt(EpicsSignal, "SAVESTART", put_complete=True, kind=Kind.config)
file_savestop = Cpt(EpicsSignal, "SAVESTOP", put_complete=True, kind=Kind.config)
file_format = Cpt(EpicsSignal, "FILEFORMAT", put_complete=True, kind=Kind.config)
file_transfer = Cpt(EpicsSignal, "FTRANSFER", put_complete=True, kind=Kind.config)
file_savebusy = Cpt(EpicsSignalRO, "FILESAVEBUSY", auto_monitor=True, kind=Kind.normal)
# ########################################################################
# Throtled image preview
image = Cpt(EpicsSignalRO, "FPICTURE", kind=Kind.omitted, doc="Throttled image preview")
# ########################################################################
# General hardware info
camError = Cpt(EpicsSignalRO, "ERRCODE", auto_monitor=True, kind=Kind.config)
camWarning = Cpt(EpicsSignalRO, "WARNCODE", auto_monitor=True, kind=Kind.config)
@property
def state(self) -> str:
"""Single word camera state"""
if self.set_param.value:
return "BUSY"
if self.camera_statuscode.value == 2 and self.camera_init.value == 1:
return "IDLE"
if self.camera_statuscode.value == 6 and self.camera_init.value == 1:
return "RUNNING"
# if self.camRemoval.value==0 and self.camInit.value==0:
if self.camera_init.value == 0:
return "OFFLINE"
# if self.camRemoval.value:
# return "REMOVED"
return "UNKNOWN"
@state.setter
def state(self):
raise RuntimeError("State is a ReadOnly property")
# Automatically connect to test camera if directly invoked
if __name__ == "__main__":
# Drive data collection
cam = PcoEdgeBase("X02DA-CCDCAM2:", name="mcpcam")
cam.wait_for_connection()

View File

@@ -5,145 +5,21 @@ Created on Wed Dec 6 11:33:54 2023
@author: mohacsi_i
"""
import time
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
import numpy as np
from ophyd.status import SubscriptionStatus, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as BECDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomPrepare,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("PcoEdgeCam")
from tomcat_bec.devices.gigafrost.pcoedge_base import PcoEdgeBase
from tomcat_bec.devices.gigafrost.std_daq_preview import StdDaqPreview
from tomcat_bec.devices.gigafrost.std_daq_client import StdDaqClient, StdDaqStatus
class PcoEdgeCameraMixin(CustomPrepare):
"""Mixin class to setup the Helge camera bae class.
from bec_lib.logger import bec_logger
This class will be called by the custom_prepare_cls attribute of the detector class.
"""
# pylint: disable=protected-access
def on_stage(self) -> None:
"""Configure and arm PCO.Edge camera for acquisition"""
# PCO can finish a run without explicit unstaging
if self.parent.state not in ("IDLE"):
logger.warning(
f"Trying to stage the camera from state {self.parent.state}, unstaging it first!"
)
self.parent.unstage()
time.sleep(0.5)
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scanparam = self.parent.scaninfo.scan_msg.info
d = {}
if "kwargs" in scanparam:
scanargs = scanparam["kwargs"]
if "exp_burst" in scanargs and scanargs["exp_burst"] is not None:
d["exposure_num_burst"] = scanargs["exp_burst"]
if "image_width" in scanargs and scanargs["image_width"] is not None:
d["image_width"] = scanargs["image_width"]
if "image_height" in scanargs and scanargs["image_height"] is not None:
d["image_height"] = scanargs["image_height"]
if "exp_time" in scanargs and scanargs["exp_time"] is not None:
d["exposure_time_ms"] = scanargs["exp_time"]
if "exp_period" in scanargs and scanargs["exp_period"] is not None:
d["exposure_period_ms"] = scanargs["exp_period"]
if "acq_time" in scanargs and scanargs["acq_time"] is not None:
d["exposure_time_ms"] = scanargs["acq_time"]
if "acq_period" in scanargs and scanargs["acq_period"] is not None:
d["exposure_period_ms"] = scanargs["acq_period"]
# if 'exp_burst' in scanargs and scanargs['exp_burst'] is not None:
# d['exposure_num_burst'] = scanargs['exp_burst']
# if 'acq_mode' in scanargs and scanargs['acq_mode'] is not None:
# d['acq_mode'] = scanargs['acq_mode']
# elif self.parent.scaninfo.scan_type == "step":
# d['acq_mode'] = "default"
if "pco_store_mode" in scanargs and scanargs["pco_store_mode"] is not None:
d["store_mode"] = scanargs["pco_store_mode"]
if "pco_data_format" in scanargs and scanargs["pco_data_format"] is not None:
d["data_format"] = scanargs["pco_data_format"]
# Perform bluesky-style configuration
if len(d) > 0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# ARM the camera
self.parent.arm()
def on_unstage(self) -> None:
"""Disarm the PCO.Edge camera"""
self.parent.disarm()
def on_stop(self) -> None:
"""Stop the PCO.Edge camera"""
self.parent.disarm()
def on_trigger(self) -> None | DeviceStatus:
"""Trigger mode operation
Use it to repeatedly record a fixed number of frames and send it to stdDAQ. The method waits
for the acquisition and data transfer to complete.
NOTE: Maciej confirmed that sparse data is no problem to the stdDAQ.
TODO: Optimize data transfer to launch at end and check completion at the beginning.
"""
# Ensure that previous data transfer finished
# def sentIt(*args, value, timestamp, **kwargs):
# return value==0
# status = SubscriptionStatus(self.parent.file_savebusy, sentIt, timeout=120)
# status.wait()
# Not sure if it always sends the first batch of images or the newest
def wait_bufferreset(*, old_value, value, timestamp, **_):
return (value < old_value) or (value == 0)
self.parent.buffer_clear.set(1).wait()
status = SubscriptionStatus(self.parent.buffer_used, wait_bufferreset, timeout=5)
status.wait()
t_expected = (
self.parent.acquire_time.get() + self.parent.acquire_delay.get()
) * self.parent.file_savestop.get()
# Wait until the buffer fills up with enough images
def wait_acquisition(*, value, timestamp, **_):
num_target = self.parent.file_savestop.get()
# logger.warning(f"{value} of {num_target}")
return bool(value >= num_target)
max_wait = max(5, 5 * t_expected)
status = SubscriptionStatus(
self.parent.buffer_used, wait_acquisition, timeout=max_wait, settle_time=0.2
)
status.wait()
# Then start file transfer (need to get the save busy flag update)
# self.parent.file_transfer.set(1, settle_time=0.2).wait()
self.parent.file_transfer.set(1).wait()
# And wait until the images have been sent
# NOTE: this does not wait for new value, the first check will be
# against values from the previous cycle, i.e. pass automatically.
t_start = time.time()
def wait_sending(*args, old_value, value, timestamp, **kwargs):
t_elapsed = timestamp - t_start
# logger.warning(f"{old_value}\t{value}\t{t_elapsed}")
return old_value == 1 and value == 0 and t_elapsed > 0
status = SubscriptionStatus(
self.parent.file_savebusy, wait_sending, timeout=120, settle_time=0.2
)
status.wait()
logger = bec_logger.logger
class HelgeCameraBase(BECDeviceBase):
class PcoEdge5M(PSIDeviceBase, PcoEdgeBase):
"""Ophyd baseclass for Helge camera IOCs
This class provides wrappers for Helge's camera IOCs around SwissFEL and
@@ -179,105 +55,57 @@ class HelgeCameraBase(BECDeviceBase):
destructive. to prevent this, we don't have EPICS preview
"""
# ########################################################################
# General hardware info (in AD nomenclature)
manufacturer = Component(EpicsSignalRO, "QUERY", kind=Kind.config, doc="Camera model info")
model = Component(EpicsSignalRO, "BOARD", kind=Kind.omitted, doc="Camera board info")
# pylint: disable=too-many-instance-attributes
USER_ACCESS = [
"complete",
"backend",
# "acq_done",
"live_preview",
"arm",
"disarm",
]
# ########################################################################
# Acquisition commands
camStatusCmd = Component(EpicsSignal, "CAMERASTATUS", put_complete=True, kind=Kind.config)
# Placeholders for stdDAQ and livestream clients
backend = None
live_preview = None
# ########################################################################
# Acquisition configuration (in AD nomenclature)
acquire_time = Component(
EpicsSignal, "EXPOSURE", put_complete=True, auto_monitor=True, kind=Kind.config
)
acquire_delay = Component(
EpicsSignal, "DELAY", put_complete=True, auto_monitor=True, kind=Kind.config
)
trigger_mode = Component(
EpicsSignal, "TRIGGER", put_complete=True, auto_monitor=True, kind=Kind.config
)
# ########################################################################
# Image size configuration (in AD nomenclature)
bin_x = Component(EpicsSignal, "BINX", put_complete=True, auto_monitor=True, kind=Kind.config)
bin_y = Component(EpicsSignal, "BINY", put_complete=True, auto_monitor=True, kind=Kind.config)
array_size_x = Component(
EpicsSignalRO, "WIDTH", auto_monitor=True, kind=Kind.config, doc="Final image width"
)
array_size_y = Component(
EpicsSignalRO, "HEIGHT", auto_monitor=True, kind=Kind.config, doc="Final image height"
)
# ########################################################################
# General hardware info
camError = Component(EpicsSignalRO, "ERRCODE", auto_monitor=True, kind=Kind.config)
camWarning = Component(EpicsSignalRO, "WARNCODE", auto_monitor=True, kind=Kind.config)
# ########################################################################
# Buffer configuration
bufferRecMode = Component(EpicsSignalRO, "RECMODE", auto_monitor=True, kind=Kind.config)
bufferStoreMode = Component(EpicsSignal, "STOREMODE", auto_monitor=True, kind=Kind.config)
fileRecMode = Component(EpicsSignalRO, "RECMODE", auto_monitor=True, kind=Kind.config)
buffer_used = Component(EpicsSignalRO, "PIC_BUFFER", auto_monitor=True, kind=Kind.normal)
buffer_size = Component(EpicsSignalRO, "PIC_MAX", auto_monitor=True, kind=Kind.normal)
buffer_clear = Component(EpicsSignal, "CLEARMEM", put_complete=True, kind=Kind.omitted)
# ########################################################################
# File saving interface
cam_data_rate = Component(EpicsSignalRO, "CAMRATE", auto_monitor=True, kind=Kind.normal)
file_data_rate = Component(EpicsSignalRO, "FILERATE", auto_monitor=True, kind=Kind.normal)
file_savestart = Component(EpicsSignal, "SAVESTART", put_complete=True, kind=Kind.config)
file_savestop = Component(EpicsSignal, "SAVESTOP", put_complete=True, kind=Kind.config)
file_format = Component(EpicsSignal, "FILEFORMAT", put_complete=True, kind=Kind.config)
file_transfer = Component(EpicsSignal, "FTRANSFER", put_complete=True, kind=Kind.config)
file_savebusy = Component(EpicsSignalRO, "FILESAVEBUSY", auto_monitor=True, kind=Kind.normal)
# ########################################################################
# Configuration state maschine with separate transition states
camStatusCode = Component(EpicsSignalRO, "STATUSCODE", auto_monitor=True, kind=Kind.config)
camSetParam = Component(EpicsSignal, "SET_PARAM", auto_monitor=True, kind=Kind.config)
camSetParamBusy = Component(
EpicsSignalRO, "BUSY_SET_PARAM", auto_monitor=True, kind=Kind.config
)
camCamera = Component(EpicsSignalRO, "CAMERA", auto_monitor=True, kind=Kind.config)
camCameraBusy = Component(EpicsSignalRO, "BUSY_CAMERA", auto_monitor=True, kind=Kind.config)
camInit = Component(EpicsSignalRO, "INIT", auto_monitor=True, kind=Kind.config)
camInitBusy = Component(EpicsSignalRO, "BUSY_INIT", auto_monitor=True, kind=Kind.config)
# ########################################################################
# Throtled image preview
image = Component(EpicsSignalRO, "FPICTURE", kind=Kind.omitted, doc="Throttled image preview")
# ########################################################################
# Misc PVs
# camRemoval = Component(EpicsSignalRO, "REMOVAL", auto_monitor=True, kind=Kind.config)
camStateString = Component(
EpicsSignalRO, "SS_CAMERA", string=True, auto_monitor=True, kind=Kind.config
)
@property
def state(self) -> str:
"""Single word camera state"""
if self.camSetParamBusy.value:
return "BUSY"
if self.camStatusCode.value == 2 and self.camInit.value == 1:
return "IDLE"
if self.camStatusCode.value == 6 and self.camInit.value == 1:
return "RUNNING"
# if self.camRemoval.value==0 and self.camInit.value==0:
if self.camInit.value == 0:
return "OFFLINE"
# if self.camRemoval.value:
# return "REMOVED"
return "UNKNOWN"
@state.setter
def state(self):
raise RuntimeError("State is a ReadOnly property")
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
std_daq_rest: str | None = None,
std_daq_ws: str | None = None,
std_daq_live: str | None = None,
**kwargs,
):
# super() will call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
# Configure the stdDAQ client
if std_daq_rest is None or std_daq_ws is None:
# raise ValueError("Both std_daq_rest and std_daq_ws must be provided")
logger.error("No stdDAQ address provided, launching without data backend!")
else:
self.backend = StdDaqClient(parent=self, ws_url=std_daq_ws, rest_url=std_daq_rest)
# Configure image preview
if std_daq_live is not None:
self.live_preview = StdDaqPreview(url=std_daq_live, cb=self._on_preview_update)
else:
logger.error("No stdDAQ stream address provided, launching without preview!")
def configure(self, d: dict = {}) -> tuple:
"""Configure the base Helge camera device
@@ -312,6 +140,10 @@ class HelgeCameraBase(BECDeviceBase):
self.acquire_delay.set(d["exposure_period_ms"]).wait()
if "exposure_period_ms" in d:
self.acquire_delay.set(d["exposure_period_ms"]).wait()
if "image_width" in d:
self.array_size.array_size_x.set(d["image_width"]).wait()
if "image_height" in d:
self.array_size.array_size_y.set(d["image_height"]).wait()
if "store_mode" in d:
self.bufferStoreMode.set(d["store_mode"]).wait()
if "data_format" in d:
@@ -324,13 +156,13 @@ class HelgeCameraBase(BECDeviceBase):
# 2. BUSY goes low, SET goes high
# 3. BUSY stays low, SET goes low
# So we need a 'negedge' on SET_PARAM
self.camSetParam.set(1).wait()
def negedge(*, old_value, value, timestamp, **_):
return bool(old_value and not value)
# Subscribe and wait for update
status = SubscriptionStatus(self.camSetParam, negedge, timeout=5, settle_time=0.5)
status = SubscriptionStatus(self.set_param, negedge, timeout=5, settle_time=0.5)
self.set_param.set(1).wait()
status.wait()
def arm(self):
@@ -349,129 +181,220 @@ class HelgeCameraBase(BECDeviceBase):
)
# Start the acquisition (this sets parameers and starts acquisition)
self.camStatusCmd.set("Running").wait()
self.acquire.set("Running").wait()
# Subscribe and wait for update
def is_running(*, value, timestamp, **_):
return bool(value == 6)
status = SubscriptionStatus(self.camStatusCode, is_running, timeout=5, settle_time=0.2)
status = SubscriptionStatus(self.camera_statuscode, is_running, timeout=5, settle_time=0.2)
status.wait()
def disarm(self):
"""Bluesky style unstage: stop the detector"""
self.camStatusCmd.set("Idle").wait()
self.acquire.set("Idle").wait()
# Data streaming is stopped by setting the max index to 0
# FIXME: This might interrupt data transfer
self.file_savestop.set(0).wait()
def launch(self):
def destroy(self):
logger.warning("Destroy called")
if self.backend is not None:
self.backend.shutdown()
super().destroy()
def _on_preview_update(self, img: np.ndarray, header: dict):
"""Send preview stream and update frame index counter"""
self.num_images_counter.put(header["frame"], force=True)
self._run_subs(sub_type=self.SUB_DEVICE_MONITOR_2D, obj=self, value=img)
def acq_done(self) -> DeviceStatus:
"""
Check if the acquisition is done. For the GigaFrost camera, this is
done by checking the status of the backend as the camera does not
provide any feedback about its internal state.
Returns:
DeviceStatus: The status of the acquisition
"""
status = DeviceStatus(self)
if self.backend is not None:
self.backend.add_status_callback(
status,
success=[StdDaqStatus.IDLE, StdDaqStatus.FILE_SAVED],
error=[StdDaqStatus.REJECTED, StdDaqStatus.ERROR],
)
return status
########################################
# Beamline Specific Implementations #
########################################
# pylint: disable=protected-access
def on_stage(self) -> None:
"""Configure and arm PCO.Edge camera for acquisition"""
# PCO can finish a run without explicit unstaging
if self.state not in ("IDLE"):
logger.warning(
f"Trying to stage the camera from state {self.state}, unstaging it first!"
)
self.unstage()
time.sleep(0.5)
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scan_args = {
**self.scan_info.msg.request_inputs["inputs"],
**self.scan_info.msg.request_inputs["kwargs"],
**self.scan_info.msg.scan_parameters,
}
d = {}
if "exp_burst" in scan_args and scan_args["exp_burst"] is not None:
d["exposure_num_burst"] = scan_args["exp_burst"]
if "image_width" in scan_args and scan_args["image_width"] is not None:
d["image_width"] = scan_args["image_width"]
if "image_height" in scan_args and scan_args["image_height"] is not None:
d["image_height"] = scan_args["image_height"]
if "exp_time" in scan_args and scan_args["exp_time"] is not None:
d["exposure_time_ms"] = scan_args["exp_time"]
if "exp_period" in scan_args and scan_args["exp_period"] is not None:
d["exposure_period_ms"] = scan_args["exp_period"]
# if 'exp_burst' in scan_args and scan_args['exp_burst'] is not None:
# d['exposure_num_burst'] = scan_args['exp_burst']
# if 'acq_mode' in scan_args and scan_args['acq_mode'] is not None:
# d['acq_mode'] = scan_args['acq_mode']
# elif self.scaninfo.scan_type == "step":
# d['acq_mode'] = "default"
if "pco_store_mode" in scan_args and scan_args["pco_store_mode"] is not None:
d["store_mode"] = scan_args["pco_store_mode"]
if "pco_data_format" in scan_args and scan_args["pco_data_format"] is not None:
d["data_format"] = scan_args["pco_data_format"]
# Perform bluesky-style configuration
if d:
logger.warning(f"[{self.name}] Configuring with:\n{d}")
self.configure(d=d)
# stdDAQ backend parameters
num_points = (
1
* scan_args.get("steps", 1)
* scan_args.get("exp_burst", 1)
* scan_args.get("repeats", 1)
* scan_args.get("burst_at_each_point", 1)
)
self.num_images.set(num_points).wait()
if "daq_file_path" in scan_args and scan_args["daq_file_path"] is not None:
self.file_path.set(scan_args["daq_file_path"]).wait()
if "daq_file_prefix" in scan_args and scan_args["daq_file_prefix"] is not None:
self.file_prefix.set(scan_args["daq_file_prefix"]).wait()
if "daq_num_images" in scan_args and scan_args["daq_num_images"] is not None:
self.num_images.set(scan_args["daq_num_images"]).wait()
# Start stdDAQ preview
if self.live_preview is not None:
self.live_preview.start()
def on_unstage(self) -> None:
"""Disarm the PCO.Edge camera"""
self.disarm()
if self.backend is not None:
logger.info(f"StdDaq status before unstage: {self.backend.status}")
self.backend.stop()
def on_pre_scan(self) -> DeviceStatus | None:
"""Called right before the scan starts on all devices automatically."""
# First start the stdDAQ
if self.backend is not None:
self.backend.start(
file_path=self.file_path.get(),
file_prefix=self.file_prefix.get(),
num_images=self.num_images.get(),
)
# Then start the camera
self.arm()
def on_trigger(self) -> None | DeviceStatus:
"""Trigger mode operation
Use it to repeatedly record a fixed number of frames and send it to stdDAQ. The method waits
for the acquisition and data transfer to complete.
NOTE: Maciej confirmed that sparse data is no problem to the stdDAQ.
TODO: Optimize data transfer to launch at end and check completion at the beginning.
"""
# Ensure that previous data transfer finished
# def sentIt(*args, value, timestamp, **kwargs):
# return value==0
# status = SubscriptionStatus(self.file_savebusy, sentIt, timeout=120)
# status.wait()
# Not sure if it always sends the first batch of images or the newest
def wait_bufferreset(*, old_value, value, timestamp, **_):
return (value < old_value) or (value == 0)
self.buffer_clear.set(1).wait()
status = SubscriptionStatus(self.buffer_used, wait_bufferreset, timeout=5)
status.wait()
t_expected = (self.acquire_time.get() + self.acquire_delay.get()) * self.file_savestop.get()
# Wait until the buffer fills up with enough images
def wait_acquisition(*, value, timestamp, **_):
num_target = self.file_savestop.get()
# logger.warning(f"{value} of {num_target}")
return bool(value >= num_target)
max_wait = max(5, 5 * t_expected)
status = SubscriptionStatus(
self.buffer_used, wait_acquisition, timeout=max_wait, settle_time=0.2
)
status.wait()
# Then start file transfer (need to get the save busy flag update)
# self.file_transfer.set(1, settle_time=0.2).wait()
self.file_transfer.set(1).wait()
# And wait until the images have been sent
# NOTE: this does not wait for new value, the first check will be
# against values from the previous cycle, i.e. pass automatically.
t_start = time.time()
def wait_sending(*, old_value, value, timestamp, **kwargs):
t_elapsed = timestamp - t_start
# logger.warning(f"{old_value}\t{value}\t{t_elapsed}")
return old_value == 1 and value == 0 and t_elapsed > 0
status = SubscriptionStatus(self.file_savebusy, wait_sending, timeout=120, settle_time=0.2)
status.wait()
def on_complete(self) -> DeviceStatus | None:
"""Called to inquire if a device has completed a scans."""
# return self.acq_done()
return None
def on_kickoff(self) -> DeviceStatus | None:
"""Start data transfer
TODO: Need to revisit this once triggering is complete
"""
self.file_transfer.set(1).wait()
class PcoEdge5M(HelgeCameraBase):
"""Ophyd baseclass for PCO.Edge cameras
This class provides wrappers for Helge's camera IOCs around SwissFEL and
for high performance SLS 2.0 cameras. Theese are mostly PCO cameras running
on a special Windows IOC host with lots of RAM and CPU power.
"""
custom_prepare_cls = PcoEdgeCameraMixin
USER_ACCESS = ["arm", "disarm", "launch"]
# ########################################################################
# Additional status info
busy = Component(EpicsSignalRO, "BUSY", auto_monitor=True, kind=Kind.config)
camState = Component(EpicsSignalRO, "SS_CAMERA", auto_monitor=True, kind=Kind.config)
camProgress = Component(EpicsSignalRO, "CAMPROGRESS", auto_monitor=True, kind=Kind.config)
camRate = Component(EpicsSignalRO, "CAMRATE", auto_monitor=True, kind=Kind.config)
# ########################################################################
# Acquisition configuration
acqMode = Component(EpicsSignalRO, "ACQMODE", auto_monitor=True, kind=Kind.config)
acqDelay = Component(EpicsSignalRO, "DELAY", auto_monitor=True, kind=Kind.config)
acqTriggerEna = Component(EpicsSignalRO, "TRIGGER", auto_monitor=True, kind=Kind.config)
# acqTriggerSource = Component(
# EpicsSignalRO, "TRIGGERSOURCE", auto_monitor=True, kind=Kind.config)
# acqTriggerEdge = Component(EpicsSignalRO, "TRIGGEREDGE", auto_monitor=True, kind=Kind.config)
# ########################################################################
# Image size settings
# Priority is: binning -> roi -> final size
pxRoiX_lo = Component(
EpicsSignal, "REGIONX_START", put_complete=True, auto_monitor=True, kind=Kind.config
)
pxRoiX_hi = Component(
EpicsSignal, "REGIONX_END", put_complete=True, auto_monitor=True, kind=Kind.config
)
pxRoiY_lo = Component(
EpicsSignal, "REGIONY_START", put_complete=True, auto_monitor=True, kind=Kind.config
)
pxRoiY_hi = Component(
EpicsSignal, "REGIONY_END", put_complete=True, auto_monitor=True, kind=Kind.config
)
def configure(self, d: dict = {}) -> tuple:
"""
Camera configuration instructions:
After setting the corresponding PVs, one needs to process SET_PARAM and wait until
BUSY_SET_PARAM goes high and low, followed by SET_PARAM goes high and low. This will
both send the settings to the camera and allocate the necessary buffers in the correct
size and shape (that takes time). Starting the exposure with CAMERASTATUS will also
call SET_PARAM, but it might take long.
NOTE:
The camera IOC will automatically round up RoiX coordinates to the
next multiple of 160. This means that configure can only change image
width in steps of 320 pixels (or manually of 160). Roi
Parameters as 'd' dictionary
----------------------------
exposure_time_ms : float, optional
Exposure time [ms].
exposure_period_ms : float, optional
Exposure period [ms], ignored in soft trigger mode.
image_width : int, optional
ROI size in the x-direction, multiple of 320 [pixels]
image_height : int, optional
ROI size in the y-direction, multiple of 2 [pixels]
image_binx : int optional
Binning along image width [pixels]
image_biny: int, optional
Binning along image height [pixels]
acq_mode : str, not yet implemented
Select one of the pre-configured trigger behavior
"""
if d is not None:
# Need to be smart how we set the ROI....
# Image sensor is 2560x2160 (X and Y)
# Values are rounded to multiples of 16
if "image_width" in d and d["image_width"] is not None:
width = d["image_width"]
self.pxRoiX_lo.set(2560 / 2 - width / 2).wait()
self.pxRoiX_hi.set(2560 / 2 + width / 2).wait()
if "image_height" in d and d["image_height"] is not None:
height = d["image_height"]
self.pxRoiY_lo.set(2160 / 2 - height / 2).wait()
self.pxRoiY_hi.set(2160 / 2 + height / 2).wait()
if "image_binx" in d and d["image_binx"] is not None:
self.bin_x.set(d["image_binx"]).wait()
if "image_biny" in d and d["image_biny"] is not None:
self.bin_y.set(d["image_biny"]).wait()
# Call super() to commit the changes
super().configure(d)
def on_stop(self) -> None:
"""Called when the device is stopped."""
return self.on_unstage()
# Automatically connect to test camera if directly invoked
if __name__ == "__main__":
# Drive data collection
cam = PcoEdge5M(prefix="X02DA-CCDCAM2:", name="mcpcam")
cam = PcoEdge5M(
"X02DA-CCDCAM2:",
name="mcpcam",
std_daq_ws="ws://129.129.95.111:8081",
std_daq_rest="http://129.129.95.111:5010",
std_daq_live="tcp://129.129.95.111:20010",
)
cam.wait_for_connection()

View File

@@ -0,0 +1,362 @@
from __future__ import annotations
import copy
import enum
import json
import threading
import time
import traceback
from typing import TYPE_CHECKING
import requests
from bec_lib.logger import bec_logger
from ophyd import StatusBase
from typeguard import typechecked
from websockets import State
from websockets.exceptions import WebSocketException
import websockets.sync.client as ws
if TYPE_CHECKING: # pragma: no cover
from ophyd import Device, DeviceStatus
logger = bec_logger.logger
class StdDaqError(Exception): ...
class StdDaqStatus(str, enum.Enum):
"""
Status of the StdDAQ.
Extracted from https://git.psi.ch/controls-ci/std_detector_buffer/-/blob/master/source/std-det-driver/src/driver_state.hpp
"""
CREATING_FILE = "creating_file"
ERROR = "error"
FILE_CREATED = "file_created"
FILE_SAVED = "file_saved"
IDLE = "idle"
RECORDING = "recording"
REJECTED = "rejected"
SAVING_FILE = "saving_file"
STARTED = "started"
STOP = "stop"
UNDEFINED = "undefined"
WAITING_FOR_FIRST_IMAGE = "waiting_for_first_image"
class StdDaqClient:
USER_ACCESS = ["status", "start", "stop", "get_config", "set_config", "reset", "_status"]
_ws_client: ws.ClientConnection | None = None
_status: StdDaqStatus = StdDaqStatus.UNDEFINED
_status_timestamp: float | None = None
_ws_recv_mutex = threading.Lock()
_ws_monitor_thread: threading.Thread | None = None
_shutdown_event = threading.Event()
_ws_idle_event = threading.Event()
_daq_is_running = threading.Event()
_config: dict | None = None
_status_callbacks: dict[str, tuple[DeviceStatus, list[StdDaqStatus], list[StdDaqStatus]]] = {}
def __init__(self, parent: Device, ws_url: str, rest_url: str):
self.parent = parent
self.ws_url = ws_url
self.rest_url = rest_url
self._daq_is_running.set()
# Connect to WS interface and start status monitoring
self.wait_for_connection()
self._ws_monitor_thread = threading.Thread(
target=self._ws_monitor_loop, name=f"{self.parent.name}_stddaq_ws_monitor", daemon=True
)
self._ws_monitor_thread.start()
@property
def status(self) -> StdDaqStatus:
"""
Get the status of the StdDAQ.
"""
return self._status
def add_status_callback(
self, status: DeviceStatus, success: list[StdDaqStatus], error: list[StdDaqStatus]
):
"""
Add a DeviceStatus callback for the StdDAQ. The status will be updated when the StdDAQ status changes and
set to finished when the status matches one of the specified success statuses and to exception when the status
matches one of the specified error statuses.
Args:
status (DeviceStatus): DeviceStatus object
success (list[StdDaqStatus]): list of statuses that indicate success
error (list[StdDaqStatus]): list of statuses that indicate error
"""
self._status_callbacks[id(status)] = (status, success, error)
@typechecked
def start(
self, file_path: str, file_prefix: str, num_images: int, timeout: float = 20, wait=True
) -> StatusBase | None:
"""Start acquisition on the StdDAQ.
Args:
file_path (str): path to save the files
file_prefix (str): prefix of the files
num_images (int): number of images to acquire
timeout (float): timeout for the request
Returns:
status (StatusBase): Ophyd status object with attached monitor
"""
# Ensure connection
self.wait_for_connection()
logger.info(f"Starting StdDaq backend. Current status: {self.status}")
status = StatusBase()
self.add_status_callback(status, success=["waiting_for_first_image"], error=["rejected"])
message = {
"command": "start",
"path": file_path,
"file_prefix": file_prefix,
"n_image": num_images,
}
self._ws_client.send(json.dumps(message))
if wait:
status.wait(timeout=timeout)
return None
return status
@typechecked
def stop(self, timeout: float = 5, wait=True, stop_cmd="stop") -> StatusBase | None:
"""Stop acquisition on the StdDAQ.
Args:
timeout (float): timeout for the request
Returns:
status (StatusBase): Ophyd status object with attached monitor
"""
# Ensure connection
self.wait_for_connection()
logger.info(f"Stopping StdDaq backend. Current status: {self.status}")
status = StatusBase()
self.add_status_callback(status, success=["idle"], error=["error"])
message = {"command": stop_cmd}
self._ws_client.send(json.dumps(message))
if wait:
status.wait(timeout=timeout)
return None
return status
def get_config(self, timeout: float = 2) -> dict:
"""Get the current configuration of the StdDAQ.
Args:
timeout (float): timeout for the request
Returns:
config (dict): configuration of the StdDAQ
"""
response = requests.get(
self.rest_url + "/api/config/get", params={"user": "ioc"}, timeout=timeout
)
response.raise_for_status()
self._config = response.json()
return self._config
def set_config(self, config: dict, timeout: float = 2, update: bool = True, force: bool=True) -> None:
"""
Set the configuration of the StdDAQ. This will overwrite the current configuration.
Args:
config (StdDaqConfig | dict): configuration to set
timeout (float): timeout for the request
"""
old_config = self.get_config()
if update:
cfg = copy.deepcopy(self._config)
cfg.update(config)
new_config = cfg
else:
new_config = config
# Escape unnecesary restarts
if not force and new_config == old_config:
return
if not new_config:
return
self._pre_restart()
# new_jason = json.dumps(new_config)
logger.warning(new_config)
response = requests.post(
self.rest_url + "/api/config/set", params={"user": "ioc"}, json=new_config, timeout=timeout
)
response.raise_for_status()
# Setting a new config will reboot the backend; we therefore have to restart the websocket
self._post_restart()
def _pre_restart(self):
"""Stop monitor before restart"""
self._daq_is_running.clear()
self._ws_idle_event.wait()
if self._ws_client is not None:
self._ws_client.close()
def _post_restart(self):
"""Start monitor after a restart"""
time.sleep(2)
self.wait_for_connection()
self._daq_is_running.set()
def reset(self, min_wait: float = 5) -> None:
"""
Reset the StdDAQ.
Args:
min_wait (float): minimum wait time after reset
"""
self.set_config(self.get_config())
time.sleep(min_wait)
def wait_for_connection(self, timeout: float = 20) -> None:
"""
Wait for the connection to the StdDAQ to be established.
Args:
timeout (float): timeout for the request
"""
start_time = time.time()
while True:
if self._ws_client is not None and self._ws_client.state == State.OPEN:
return
try:
self._ws_client = ws.connect(self.ws_url)
break
except ConnectionRefusedError as exc:
if time.time() - start_time > timeout:
raise TimeoutError("Timeout while waiting for connection to StdDAQ") from exc
time.sleep(2)
def create_virtual_datasets(self, file_path: str, file_prefix: str, timeout: float = 5) -> None:
"""
Combine the stddaq written files in a given folder in an interleaved
h5 virtual dataset.
Args:
file_path (str): path to the folder containing the files
file_prefix (str): prefix of the files to combine
timeout (float): timeout for the request
"""
# TODO: Add wait for 'idle' state
response = requests.post(
self.rest_url + "/api/h5/create_interleaved_vds",
params={"user": "ioc"},
json={
"base_path": file_path,
"file_prefix": file_prefix,
"output_file": file_prefix.rstrip("_") + ".h5",
},
timeout=timeout,
headers={"Content-type": "application/json"},
)
response.raise_for_status()
def shutdown(self):
"""
Shutdown the StdDAQ client.
"""
logger.warning("Shutting down sdtDAQ monitor")
self._shutdown_event.set()
if self._ws_monitor_thread is not None:
self._ws_monitor_thread.join()
logger.warning("Thread joined")
if self._ws_client is not None:
self._ws_client.close()
self._ws_client = None
logger.warning("Shutdown complete")
def _wait_for_server_running(self):
"""
Wait for the StdDAQ to be running. If the StdDaq is not running, the
websocket loop will be set to idle.
"""
while not self._shutdown_event.is_set():
if self._daq_is_running.wait(0.1):
self._ws_idle_event.clear()
break
self._ws_idle_event.set()
def _ws_monitor_loop(self):
"""Loop to update the status property of the StdDAQ.
This is a persistent monitor that updates the status and calls attached
callbacks. It also handles stdDAQ restarts and reconnection by itself.
"""
if self._ws_recv_mutex.locked():
logger.warning("stdDAQ WS monitor loop already locked")
return
with self._ws_recv_mutex:
while not self._shutdown_event.is_set():
self._wait_for_server_running()
try:
msg = self._ws_client.recv(timeout=0.1)
msg_timestamp = time.time()
except TimeoutError:
continue
except WebSocketException:
content = traceback.format_exc()
# TODO: this is expected to happen on every reconfiguration
logger.warning(f"Websocket connection closed unexpectedly: {content}")
self.wait_for_connection()
continue
msg = json.loads(msg)
if self._status != msg["status"]:
logger.info(f"stdDAQ state transition by: {msg}")
self._status = msg["status"]
self._status_timestamp = msg_timestamp
self._run_status_callbacks()
def _run_status_callbacks(self):
"""
Update the DeviceStatus objects based on the current status of the StdDAQ.
If the status matches one of the success or error statuses, the DeviceStatus
object will be set to finished or exception, respectively and removed from
the list of callbacks.
"""
status = self._status
completed_callbacks = []
for dev_status, success, error in self._status_callbacks.values():
if status in success:
dev_status.set_finished()
logger.info(f"StdDaq status is {status}")
completed_callbacks.append(dev_status)
elif status in error:
logger.warning(f"StdDaq status is {status}")
dev_status.set_exception(StdDaqError(f"StdDaq status is {status}"))
completed_callbacks.append(dev_status)
for cb in completed_callbacks:
self._status_callbacks.pop(id(cb))
# Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__":
# pylint: disable=disallowed-name,too-few-public-methods
class foo:
"""Dummy"""
name="bar"
daq = StdDaqClient(
parent=foo(), ws_url='ws://129.129.95.111:8080', rest_url='http://129.129.95.111:5000'
)

View File

@@ -0,0 +1,128 @@
import json
import threading
import time
from typing import Callable
import traceback
import numpy as np
import zmq
from bec_lib.logger import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
class StdDaqPreview:
USER_ACCESS = ["start", "stop", "image", "frameno"]
_socket = None
_zmq_thread = None
_monitor_mutex = threading.Lock()
_shutdown_event = threading.Event()
_throttle = 0.2
image = None
frameno = None
def __init__(self, url: str, cb: Callable):
self.url = url
self._on_update_callback = cb
def connect(self):
"""Connect to te StDAQs PUB-SUB streaming interface
StdDAQ may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
try:
self._socket.connect(self.url)
except ConnectionRefusedError:
time.sleep(1)
self._socket.connect(self.url)
def start(self):
# Only one consumer thread
if self._zmq_thread:
self.stop()
self._shutdown_event.clear()
self._zmq_thread = threading.Thread(
target=self._zmq_monitor, daemon=True, name="StdDaq_live_preview"
)
self._zmq_thread.start()
def stop(self):
self._shutdown_event.set()
if self._zmq_thread:
self._zmq_thread.join()
self._zmq_thread = None
def _zmq_monitor(self):
"""ZMQ stream monitor"""
# Exit if another monitor is running
if self._monitor_mutex.locked():
return
with self._monitor_mutex:
# Open a new connection
self.connect()
try:
# Run the monitor loop
t_last = time.time()
while not self._shutdown_event.is_set():
try:
# pylint: disable=no-member
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
# Throttle parsing and callbacks
t_curr = time.time()
if t_curr - t_last > self._throttle:
self._parse_data(r)
t_last = t_curr
except ValueError:
# Happens when ZMQ partially delivers the multipart message
content = traceback.format_exc()
logger.warning(f"Websocket connection closed unexpectedly: {content}")
continue
except zmq.error.Again:
# Happens when receive queue is empty
time.sleep(0.1)
finally:
# Stop receiving incoming data
self._socket.close()
logger.warning("Detached live_preview monitoring")
def _parse_data(self, data):
# Length and throtling checks
if len(data) != 2:
logger.warning(f"Received incomplete ZMQ message of length {len(data)}")
# Unpack the Array V1 reply to metadata and array data
meta, img_data = data
# Update image and update subscribers
header = json.loads(meta)
if header["type"] == "uint16":
image = np.frombuffer(img_data, dtype=np.uint16)
elif header["type"] == "uint8":
image = np.frombuffer(img_data, dtype=np.uint8)
else:
raise ValueError(f"Unexpected type {header['type']}")
if image.size != np.prod(header["shape"]):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header["shape"])
# Print diadnostics and run callback
logger.info(
f"Live update: frame {header['frame']}\tShape: {header['shape']}\t"
f"Mean: {np.mean(image):.3f}"
)
self.image = image
self.frameno = header['frame']
self._on_update_callback(image, header)

View File

@@ -1,3 +1,2 @@
from .tutorial_fly_scan import AcquireDark, AcquireWhite, AcquireRefs, AcquireProjections, TutorialFlyScanContLine
from .tomcat_scans import TomcatSnapNStep, TomcatSimpleSequence
from .advanced_scans import AcquireRefsV2, AcquireDarkV2, AcquireWhiteV2

View File

@@ -1,11 +1,10 @@
import time
import numpy as np
from bec_lib import bec_logger
from bec_lib.device import DeviceBase
from bec_server.scan_server.scans import Acquire, AsyncFlyScanBase
from bec_lib import bec_logger
logger = bec_logger.logger
@@ -14,7 +13,7 @@ class AcquireDark(Acquire):
required_kwargs = ["exp_burst"]
gui_config = {"Acquisition parameters": ["exp_burst"]}
def __init__(self, exp_burst: int, **kwargs):
def __init__(self, exp_burst: int, exp_time: float = 0, **kwargs):
"""
Acquire dark images. This scan is used to acquire dark images. Dark images are images taken with the shutter
closed and no beam on the sample. Dark images are used to correct the data images for dark current.
@@ -47,16 +46,17 @@ class AcquireDark(Acquire):
>>> scans.acquire_dark(5)
"""
super().__init__(**kwargs)
self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger
self.exp_time = exp_time / 1000 # In BEC, the exp time is always in s, not ms.
super().__init__(self.exp_time, **kwargs)
self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger
self.scan_motors = ["eyex"] # change to the correct shutter device
#self.shutter = "eyex" # change to the correct shutter device
self.dark_shutter_pos = 0 ### change with a variable
# self.shutter = "eyex" # change to the correct shutter device
self.dark_shutter_pos = 0 ### change with a variable
def scan_core(self):
# close the shutter
yield from self._move_scan_motors_and_wait(self.dark_shutter_pos)
#yield from self.stubs.set_and_wait(device=[self.shutter], positions=[0])
# yield from self.stubs.set_and_wait(device=[self.shutter], positions=[0])
yield from super().scan_core()
@@ -64,7 +64,15 @@ class AcquireWhite(Acquire):
scan_name = "acquire_white"
gui_config = {"Acquisition parameters": ["exp_burst"]}
def __init__(self, exp_burst: int, sample_position_out: float, sample_angle_out: float, motor: DeviceBase, **kwargs):
def __init__(
self,
exp_burst: int,
sample_position_out: float,
sample_angle_out: float,
motor: DeviceBase,
exp_time: float = 0,
**kwargs,
):
"""
Acquire flat field images. This scan is used to acquire flat field images. The flat field image is an image taken
with the shutter open but the sample out of the beam. Flat field images are used to correct the data images for
@@ -103,47 +111,54 @@ class AcquireWhite(Acquire):
>>> scans.acquire_white(5, 20)
"""
super().__init__(**kwargs)
self.exp_time = exp_time / 1000 # In BEC, the exp time is always in s, not ms.
super().__init__(exp_time=exp_time, **kwargs)
self.burst_at_each_point = 1
self.sample_position_out = sample_position_out
self.sample_angle_out = sample_angle_out
self.motor_sample = motor
self.dark_shutter_pos_out = 1 ### change with a variable
self.dark_shutter_pos_in = 0 ### change with a variable
self.dark_shutter_pos_out = 1 ### change with a variable
self.dark_shutter_pos_in = 0 ### change with a variable
def scan_core(self):
# move the sample stage to the out position and correct angular position
status_sample_out_angle = yield from self.stubs.set(device=[self.motor_sample, "es1_roty"], value=[self.sample_position_out, self.sample_angle_out], wait=False)
status_sample_out_angle = yield from self.stubs.set(
device=[self.motor_sample, "es1_roty"],
value=[self.sample_position_out, self.sample_angle_out],
wait=False,
)
# open the main shutter (TODO change to the correct shutter device)
yield from self.stubs.set(device=["eyex"], value=[self.dark_shutter_pos_out])
status_sample_out_angle.wait()
# TODO add opening of fast shutter
yield from super().scan_core()
# TODO add closing of fast shutter
yield from self.stubs.set(device=["eyex"], value=[self.dark_shutter_pos_in])
class AcquireProjections(AsyncFlyScanBase):
scan_name = "acquire_projections"
gui_config = {
"Motor": ["motor"],
"Acquisition parameters": ["sample_position_in", "start_angle", "angular_range" ],
"Camera": ["exp_time", "exp_burst"]
"Acquisition parameters": ["sample_position_in", "start_angle", "angular_range"],
"Camera": ["exp_time", "exp_burst"],
}
def __init__(self,
motor: DeviceBase,
exp_burst: int,
sample_position_in: float,
start_angle: float,
angular_range: float,
exp_time:float,
**kwargs):
def __init__(
self,
motor: DeviceBase,
exp_burst: int,
sample_position_in: float,
start_angle: float,
angular_range: float,
exp_time: float,
**kwargs,
):
"""
Acquire projection images.
Acquire projection images.
Args:
motor : DeviceBase
@@ -181,28 +196,30 @@ class AcquireProjections(AsyncFlyScanBase):
"""
self.motor = motor
super().__init__(exp_time=exp_time,**kwargs)
super().__init__(exp_time=exp_time, **kwargs)
self.burst_at_each_point = 1
self.sample_position_in = sample_position_in
self.start_angle = start_angle
self.angular_range = angular_range
self.dark_shutter_pos_out = 1 ### change with a variable
self.dark_shutter_pos_in = 0 ### change with a variable
self.dark_shutter_pos_out = 1 ### change with a variable
self.dark_shutter_pos_in = 0 ### change with a variable
def update_scan_motors(self):
return [self.motor]
def prepare_positions(self):
self.positions = np.array([[self.start_angle], [self.start_angle+self.angular_range]])
self.positions = np.array([[self.start_angle], [self.start_angle + self.angular_range]])
self.num_pos = None
yield from self._set_position_offset()
def scan_core(self):
# move to in position and go to start angular position
yield from self.stubs.set(device=["eyez", self.motor], value=[self.sample_position_in, self.positions[0][0]])
yield from self.stubs.set(
device=["eyez", self.motor], value=[self.sample_position_in, self.positions[0][0]]
)
# open the shutter
yield from self.stubs.set(device="eyex", value=self.dark_shutter_pos_out)
@@ -215,15 +232,13 @@ class AcquireProjections(AsyncFlyScanBase):
self.connector.send_client_info(
"Starting the scan", show_asap=True, rid=self.metadata.get("RID")
)
)
yield from self.stubs.trigger()
while not flyer_request.done:
yield from self.stubs.read(
group="monitored", point_id=self.point_id
)
yield from self.stubs.read(group="monitored", point_id=self.point_id)
time.sleep(1)
# increase the point id
@@ -244,13 +259,13 @@ class AcquireRefs(Acquire):
sample_angle_out: float = 0,
sample_position_in: float = 0,
sample_position_out: float = 1,
file_prefix_dark: str = 'tmp_dark',
file_prefix_white: str = 'tmp_white',
**kwargs
file_prefix_dark: str = "tmp_dark",
file_prefix_white: str = "tmp_white",
**kwargs,
):
"""
Acquire reference images (darks + whites) and return to beam position.
Reference images are acquired automatically in an optimized sequence and
the sample is returned to the sample_in_position afterwards.
@@ -268,16 +283,16 @@ class AcquireRefs(Acquire):
sample_position_out : float ,optional
Sample stage X position for sample out of the beam [um]
exp_time : float, optional
Exposure time [ms]. If not specified, the currently configured value
Exposure time [ms]. If not specified, the currently configured value
on the camera will be used
exp_period : float, optional
Exposure period [ms]. If not specified, the currently configured value
on the camera will be used
image_width : int, optional
ROI size in the x-direction [pixels]. If not specified, the currently
ROI size in the x-direction [pixels]. If not specified, the currently
configured value on the camera will be used
image_height : int, optional
ROI size in the y-direction [pixels]. If not specified, the currently
ROI size in the y-direction [pixels]. If not specified, the currently
configured value on the camera will be used
acq_mode : str, optional
Predefined acquisition mode (default= 'default')
@@ -300,11 +315,20 @@ class AcquireRefs(Acquire):
self.num_flats = num_flats
self.file_prefix_dark = file_prefix_dark
self.file_prefix_white = file_prefix_white
self.scan_parameters["std_daq_params"] = {"reconnect": False}
def scan_core(self):
status_sample_out_angle = yield from self.stubs.set(device=[self.motor, "es1_roty"], value=[self.sample_position_out, self.sample_angle_out], wait=False)
status_sample_out_angle = yield from self.stubs.set(
device=[self.motor, "es1_roty"],
value=[self.sample_position_out, self.sample_angle_out],
wait=False,
)
cameras = [
cam.name
for cam in self.device_manager.devices.get_devices_with_tags("camera")
if cam.enabled
]
if self.num_darks:
self.connector.send_client_info(
@@ -314,13 +338,6 @@ class AcquireRefs(Acquire):
)
# to set signals on a device
cameras = [cam.name for cam in self.device_manager.devices.get_devices_with_tags("camera") if cam.enabled]
for cam in cameras:
yield from self.stubs.send_rpc_and_wait(cam, "file_prefix.set", f"{self.file_prefix}_{cam}_dark")
yield from self.stubs.send_rpc_and_wait(cam, "num_images.set", self.num_darks)
yield from self.stubs.send_rpc_and_wait(cam, "bluestage") # rename to arm
darks = AcquireDark(
exp_burst=self.num_darks,
# file_prefix=self.file_prefix_dark,
@@ -329,12 +346,17 @@ class AcquireRefs(Acquire):
instruction_handler=self.stubs._instruction_handler,
**self.caller_kwargs,
)
yield from darks.scan_core()
yield from self.stubs.send_rpc_and_wait("gfdaq", "complete")
yield from self.stubs.send_rpc_and_wait("gfdaq", "unstage")
# reconfigure the cameras to write to a different file
for cam in cameras:
yield from self.stubs.send_rpc_and_wait(cam, "configure", **darks.scan_parameters)
yield from darks.pre_scan() # prepare for the upcoming scan
yield from darks.scan_core() # do the scan
yield from darks.finalize() # wait for everything to finish
self.point_id = darks.point_id
status_sample_out_angle.wait()
if self.num_flats:
self.connector.send_client_info(
@@ -342,36 +364,27 @@ class AcquireRefs(Acquire):
show_asap=True,
rid=self.metadata.get("RID"),
)
yield from self.stubs.send_rpc_and_wait("gfdaq", "file_prefix.set", self.file_prefix_white)
yield from self.stubs.send_rpc_and_wait("gfdaq", "num_images.set", self.num_flats)
yield from self.stubs.send_rpc_and_wait("gfdaq", "bluestage")
logger.warning("Calling AcquireWhite")
flats = AcquireWhite(
exp_burst=self.num_flats,
#sample_position_out=self.sample_position_out,
#sample_angle_out=self.sample_angle_out,
#motor=self.motor,
# sample_position_out=self.sample_position_out,
# sample_angle_out=self.sample_angle_out,
# motor=self.motor,
device_manager=self.device_manager,
metadata=self.metadata,
instruction_handler=self.stubs._instruction_handler,
**self.caller_kwargs,
)
flats.point_id = self.point_id
yield from flats.scan_core()
logger.warning("Unstaging the DAQ")
# reconfigure the cameras to write to a different file
for cam in cameras:
yield from self.stubs.send_rpc_and_wait(cam, "configure", **flats.scan_parameters)
yield from self.stubs.send_rpc_and_wait("gfdaq", "complete")
yield from self.stubs.send_rpc_and_wait("gfdaq", "unstage")
logger.warning("Completing the DAQ")
logger.warning("Finished the DAQ")
yield from flats.pre_scan() # prepare for the upcoming scan
yield from flats.scan_core() # do the scan
yield from flats.finalize() # wait for everything to finish
self.point_id = flats.point_id
## TODO move sample in beam and do not wait