fix: move lamni fly scan to dedicated flyer device #192
@@ -271,6 +271,16 @@ rty:
|
||||
enabled: true
|
||||
readOnly: False
|
||||
|
||||
rt_positions:
|
||||
deviceClass: csaxs_bec.devices.omny.rt.rt_lamni_ophyd.RtLamniFlyer
|
||||
deviceConfig:
|
||||
host: mpc2680.psi.ch
|
||||
port: 3333
|
||||
readoutPriority: async
|
||||
connectionTimeout: 20
|
||||
enabled: true
|
||||
readOnly: False
|
||||
|
||||
############################################################
|
||||
######################### Cameras ##########################
|
||||
############################################################
|
||||
|
||||
@@ -101,7 +101,17 @@ rtz:
|
||||
readOnly: false
|
||||
readoutPriority: on_request
|
||||
connectionTimeout: 20
|
||||
|
||||
|
||||
rt_positions:
|
||||
deviceClass: csaxs_bec.devices.omny.rt.rt_omny_ophyd.RtOMNYFlyer
|
||||
deviceConfig:
|
||||
host: mpc3217.psi.ch
|
||||
port: 3333
|
||||
readoutPriority: async
|
||||
connectionTimeout: 20
|
||||
enabled: true
|
||||
readOnly: False
|
||||
|
||||
# ############################################################
|
||||
# ##################### OMNY samples #########################
|
||||
# ############################################################
|
||||
|
||||
@@ -783,7 +783,6 @@ class RtFlomniFlyer(Device):
|
||||
)
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.average_lamni_angle = 0
|
||||
self.readout_thread = None
|
||||
self.scan_done_event = threading.Event()
|
||||
self.scan_done_event.set()
|
||||
@@ -800,7 +799,6 @@ class RtFlomniFlyer(Device):
|
||||
read_counter = 0
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.average_lamni_angle = 0
|
||||
|
||||
mode, number_of_positions_planned, current_position_in_scan = (
|
||||
self.controller.get_scan_status()
|
||||
|
||||
@@ -3,12 +3,12 @@ import threading
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger, messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, PositionerBase, Signal
|
||||
from ophyd import Device, DeviceStatus, PositionerBase, Signal
|
||||
from ophyd.status import wait as status_wait
|
||||
from ophyd.utils import LimitError, ReadOnlyError
|
||||
from ophyd_devices.utils.bec_signals import AsyncMultiSignal, ProgressSignal
|
||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
|
||||
from prettytable import PrettyTable
|
||||
@@ -186,7 +186,8 @@ class RtLamniController(Controller):
|
||||
def send_positions(parent, positions):
|
||||
parent._min_scan_buffer_reached = False
|
||||
for pos_index, pos in enumerate(positions):
|
||||
parent.socket_put_and_receive(f"s{pos[0]:.05f},{pos[1]:.05f},0")
|
||||
cmd = f"s{pos[0]:.05f},{pos[1]:.05f},0"
|
||||
parent.socket_put_and_receive(cmd)
|
||||
if pos_index > 100:
|
||||
parent._min_scan_buffer_reached = True
|
||||
parent._min_scan_buffer_reached = True
|
||||
@@ -215,7 +216,7 @@ class RtLamniController(Controller):
|
||||
def start_scan(self):
|
||||
# interferometer_feedback_not_running = int((self.socket_put_and_receive("J2")).split(",")[0])
|
||||
# if interferometer_feedback_not_running == 1:
|
||||
if not self.feedback_is_running():
|
||||
if not self.feedback_is_running():
|
||||
logger.error(
|
||||
"Cannot start scan because feedback loop is not running or there is an interferometer error."
|
||||
)
|
||||
@@ -232,51 +233,6 @@ class RtLamniController(Controller):
|
||||
# start a point-by-point scan (for cont scan in flomni it would be "sa")
|
||||
self.socket_put_and_receive("sd")
|
||||
|
||||
def start_readout(self):
|
||||
readout = threading.Thread(target=self.read_positions_from_sampler)
|
||||
readout.start()
|
||||
|
||||
def kickoff(self, metadata):
|
||||
self.readout_metadata = metadata
|
||||
while not self._min_scan_buffer_reached:
|
||||
time.sleep(0.001)
|
||||
self.start_scan()
|
||||
time.sleep(0.1)
|
||||
self.start_readout()
|
||||
|
||||
def _get_signals_from_table(self, return_table) -> dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[5])
|
||||
self.average_stdeviations_y_st_fzp += float(return_table[8])
|
||||
self.average_lamni_angle += float(return_table[19])
|
||||
signals = {
|
||||
"target_x": {"value": float(return_table[3])},
|
||||
"average_x_st_fzp": {"value": float(return_table[4])},
|
||||
"stdev_x_st_fzp": {"value": float(return_table[5])},
|
||||
"target_y": {"value": float(return_table[6])},
|
||||
"average_y_st_fzp": {"value": float(return_table[7])},
|
||||
"stdev_y_st_fzp": {"value": float(return_table[8])},
|
||||
"average_cap1": {"value": float(return_table[9])},
|
||||
"stdev_cap1": {"value": float(return_table[10])},
|
||||
"average_cap2": {"value": float(return_table[11])},
|
||||
"stdev_cap2": {"value": float(return_table[12])},
|
||||
"average_cap3": {"value": float(return_table[13])},
|
||||
"stdev_cap3": {"value": float(return_table[14])},
|
||||
"average_cap4": {"value": float(return_table[15])},
|
||||
"stdev_cap4": {"value": float(return_table[16])},
|
||||
"average_cap5": {"value": float(return_table[17])},
|
||||
"stdev_cap5": {"value": float(return_table[18])},
|
||||
"average_angle_interf_ST": {"value": float(return_table[19])},
|
||||
"stdev_angle_interf_ST": {"value": float(return_table[20])},
|
||||
"average_stdeviations_x_st_fzp": {
|
||||
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_stdeviations_y_st_fzp": {
|
||||
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
|
||||
}
|
||||
return signals
|
||||
|
||||
def feedback_is_running(self) -> bool:
|
||||
status = int(float((self.socket_put_and_receive("J2")).split(",")[0]))
|
||||
return status == 0 # 0 means running, 1 means error/disabled
|
||||
@@ -315,75 +271,6 @@ class RtLamniController(Controller):
|
||||
|
||||
return
|
||||
|
||||
def read_positions_from_sampler(self):
|
||||
# this was for reading after the scan completed
|
||||
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
|
||||
|
||||
read_counter = 0
|
||||
previous_point_in_scan = 0
|
||||
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.average_lamni_angle = 0
|
||||
|
||||
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
|
||||
|
||||
# if not (mode==2 or mode==3):
|
||||
# error
|
||||
self.device_manager.connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=1, metadata=self.readout_metadata
|
||||
),
|
||||
)
|
||||
# while scan is running
|
||||
while mode > 0:
|
||||
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
|
||||
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
|
||||
time.sleep(0.01)
|
||||
if current_position_in_scan > 5:
|
||||
while current_position_in_scan > read_counter + 1:
|
||||
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
# logger.info(f"{return_table}")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
# read the last samples even though scan is finished already
|
||||
while number_of_positions_planned > read_counter:
|
||||
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
# logger.info(f"{return_table}")
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
self.device_manager.connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=0, metadata=self.readout_metadata
|
||||
),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp/number_of_samples_to_read}, y {self.average_stdeviations_y_st_fzp/number_of_samples_to_read}, angle {self.average_lamni_angle/number_of_samples_to_read}."
|
||||
)
|
||||
|
||||
def publish_device_data(self, signals, point_id):
|
||||
self.device_manager.connector.set_and_publish(
|
||||
MessageEndpoints.device_read("rt_lamni"),
|
||||
messages.DeviceMessage(
|
||||
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
|
||||
),
|
||||
)
|
||||
|
||||
def feedback_status_angle_lamni(self) -> bool:
|
||||
return_table = (self.socket_put_and_receive("J7")).split(",")
|
||||
logger.debug(
|
||||
@@ -792,27 +679,214 @@ class RtLamniMotor(Device, PositionerBase):
|
||||
else:
|
||||
raise TypeError(f"Expected value of type int but received {type(val)}")
|
||||
|
||||
def kickoff(self, metadata, **kwargs) -> None:
|
||||
self.controller.kickoff(metadata)
|
||||
|
||||
@property
|
||||
def egu(self):
|
||||
"""The engineering units (EGU) for positions"""
|
||||
return "um"
|
||||
|
||||
# how is this used later?
|
||||
|
||||
def stage(self) -> list[object]:
|
||||
return super().stage()
|
||||
|
||||
def unstage(self) -> list[object]:
|
||||
return super().unstage()
|
||||
|
||||
def stop(self, *, success=False):
|
||||
self.controller.stop_all_axes()
|
||||
return super().stop(success=success)
|
||||
|
||||
|
||||
class RtLamniFlyer(Device):
|
||||
USER_ACCESS = ["controller"]
|
||||
data = Cpt(
|
||||
AsyncMultiSignal,
|
||||
name="data",
|
||||
signals=[
|
||||
"target_x",
|
||||
"average_x_st_fzp",
|
||||
"stdev_x_st_fzp",
|
||||
"target_y",
|
||||
"average_y_st_fzp",
|
||||
"stdev_y_st_fzp",
|
||||
"average_cap1",
|
||||
"stdev_cap1",
|
||||
"average_cap2",
|
||||
"stdev_cap2",
|
||||
"average_cap3",
|
||||
"stdev_cap3",
|
||||
"average_cap4",
|
||||
"stdev_cap4",
|
||||
"average_cap5",
|
||||
"stdev_cap5",
|
||||
"average_angle_interf_ST",
|
||||
"stdev_angle_interf_ST",
|
||||
"average_stdeviations_x_st_fzp",
|
||||
"average_stdeviations_y_st_fzp",
|
||||
"average_lamni_angle",
|
||||
],
|
||||
ndim=1,
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
max_size=1000,
|
||||
)
|
||||
progress = Cpt(ProgressSignal, doc="ProgressSignal indicating of the device during the scan.")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name,
|
||||
kind=None,
|
||||
read_attrs=None,
|
||||
configuration_attrs=None,
|
||||
parent=None,
|
||||
host="mpc2680.psi.ch",
|
||||
port=3333,
|
||||
socket_cls=SocketIO,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(prefix=prefix, name=name, parent=parent, **kwargs)
|
||||
self.shutdown_event = threading.Event()
|
||||
self.controller = RtLamniController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
)
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.average_lamni_angle = 0
|
||||
self.readout_thread = None
|
||||
self.scan_done_event = threading.Event()
|
||||
self.scan_done_event.set()
|
||||
|
||||
def stage(self):
|
||||
self.shutdown_event.clear()
|
||||
self.scan_done_event.set()
|
||||
return super().stage()
|
||||
|
||||
def start_readout(self, status: DeviceStatus):
|
||||
self.readout_thread = threading.Thread(
|
||||
target=self.read_positions_from_sampler, args=(status,)
|
||||
)
|
||||
self.readout_thread.start()
|
||||
|
||||
def kickoff(self) -> DeviceStatus:
|
||||
self.shutdown_event.clear()
|
||||
self.scan_done_event.clear()
|
||||
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
|
||||
...
|
||||
self.controller.start_scan()
|
||||
self.shutdown_event.wait(0.1)
|
||||
status = DeviceStatus(self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
def complete(self) -> DeviceStatus:
|
||||
"""Wait until the flyer is done."""
|
||||
if self.scan_done_event.is_set():
|
||||
# if the scan_done_event is already set, we can return a finished status immediately
|
||||
status = DeviceStatus(self)
|
||||
status.set_finished()
|
||||
return status
|
||||
status = DeviceStatus(self)
|
||||
self.start_readout(status)
|
||||
status.add_callback(lambda *args, **kwargs: self.scan_done_event.set())
|
||||
return status
|
||||
|
||||
def stop(self, *, success=False):
|
||||
self.shutdown_event.set()
|
||||
self.scan_done_event.set()
|
||||
if self.readout_thread is not None:
|
||||
self.readout_thread.join()
|
||||
return super().stop(success=success)
|
||||
|
||||
def read_positions_from_sampler(self, status: DeviceStatus):
|
||||
"""
|
||||
Read the positions from the sampler and update the data signal.
|
||||
This function runs in a separate thread and continuously checks the
|
||||
scan status.
|
||||
|
||||
Args:
|
||||
status (DeviceStatus): The status object to update when the readout is complete.
|
||||
"""
|
||||
read_counter = 0
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.average_lamni_angle = 0
|
||||
|
||||
mode, number_of_positions_planned, current_position_in_scan = (
|
||||
self.controller.get_scan_status()
|
||||
)
|
||||
|
||||
# while scan is running
|
||||
while mode > 0 and not self.shutdown_event.wait(0.01):
|
||||
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
|
||||
mode, number_of_positions_planned, current_position_in_scan = (
|
||||
self.controller.get_scan_status()
|
||||
)
|
||||
if current_position_in_scan > 5:
|
||||
while current_position_in_scan > read_counter + 1:
|
||||
return_table = (
|
||||
self.controller.socket_put_and_receive(f"r{read_counter}")
|
||||
).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
self.progress.put(
|
||||
value=read_counter, max_value=number_of_positions_planned, done=False
|
||||
)
|
||||
read_counter = read_counter + 1
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.data.set(signals)
|
||||
|
||||
if self.shutdown_event.wait(0.05):
|
||||
logger.info("Shutdown event set, stopping readout.")
|
||||
# if we are here, the shutdown_event is set. We can exit the readout loop.
|
||||
status.set_finished()
|
||||
return
|
||||
|
||||
# read the last samples even though scan is finished already
|
||||
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
|
||||
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False)
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.data.set(signals)
|
||||
|
||||
# NOTE: No need to set the status to failed if the shutdown_event is set.
|
||||
# The stop() method will take care of that.
|
||||
status.set_finished()
|
||||
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True)
|
||||
|
||||
logger.info(
|
||||
f"LamNI statistics: Average of all standard deviations: x {self.average_stdeviations_x_st_fzp}, y {self.average_stdeviations_y_st_fzp}, angle {self.average_lamni_angle}."
|
||||
)
|
||||
|
||||
def _get_signals_from_table(self, return_table) -> dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[5])
|
||||
self.average_stdeviations_y_st_fzp += float(return_table[8])
|
||||
self.average_lamni_angle += float(return_table[19])
|
||||
signals = {
|
||||
"target_x": {"value": float(return_table[3])},
|
||||
"average_x_st_fzp": {"value": float(return_table[4])},
|
||||
"stdev_x_st_fzp": {"value": float(return_table[5])},
|
||||
"target_y": {"value": float(return_table[6])},
|
||||
"average_y_st_fzp": {"value": float(return_table[7])},
|
||||
"stdev_y_st_fzp": {"value": float(return_table[8])},
|
||||
"average_cap1": {"value": float(return_table[9])},
|
||||
"stdev_cap1": {"value": float(return_table[10])},
|
||||
"average_cap2": {"value": float(return_table[11])},
|
||||
"stdev_cap2": {"value": float(return_table[12])},
|
||||
"average_cap3": {"value": float(return_table[13])},
|
||||
"stdev_cap3": {"value": float(return_table[14])},
|
||||
"average_cap4": {"value": float(return_table[15])},
|
||||
"stdev_cap4": {"value": float(return_table[16])},
|
||||
"average_cap5": {"value": float(return_table[17])},
|
||||
"stdev_cap5": {"value": float(return_table[18])},
|
||||
"average_angle_interf_ST": {"value": float(return_table[19])},
|
||||
"stdev_angle_interf_ST": {"value": float(return_table[20])},
|
||||
"average_stdeviations_x_st_fzp": {
|
||||
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_stdeviations_y_st_fzp": {
|
||||
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_lamni_angle": {"value": self.average_lamni_angle / (int(return_table[0]) + 1)},
|
||||
}
|
||||
return signals
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
mock = False
|
||||
if not mock:
|
||||
|
||||
@@ -2,15 +2,14 @@ import builtins
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger, messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, PositionerBase, Signal
|
||||
from ophyd import Device, DeviceStatus, PositionerBase, Signal
|
||||
from ophyd.status import wait as status_wait
|
||||
from ophyd.utils import LimitError
|
||||
from ophyd_devices import AsyncMultiSignal, ProgressSignal
|
||||
from ophyd_devices.utils.controller import Controller, threadlocked
|
||||
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
|
||||
from prettytable import PrettyTable
|
||||
@@ -23,7 +22,6 @@ if builtins.__dict__.get("bec") is not None:
|
||||
|
||||
|
||||
from csaxs_bec.devices.omny.rt.rt_ophyd import (
|
||||
BECConfigError,
|
||||
RtCommunicationError,
|
||||
RtError,
|
||||
RtReadbackSignal,
|
||||
@@ -256,7 +254,8 @@ class RtOMNYController(Controller):
|
||||
parent._min_scan_buffer_reached = False
|
||||
start_time = time.time()
|
||||
for pos_index, pos in enumerate(positions):
|
||||
parent.socket_put_and_receive(f"s{pos[0]:.05f},{pos[1]:.05f},{pos[2]:.05f}")
|
||||
cmd = f"s{pos[0]:.05f},{pos[1]:.05f},{pos[2]:.05f}"
|
||||
parent.socket_put_and_receive(cmd)
|
||||
if pos_index > 100:
|
||||
parent._min_scan_buffer_reached = True
|
||||
parent._min_scan_buffer_reached = True
|
||||
@@ -842,35 +841,6 @@ class RtOMNYController(Controller):
|
||||
self.feedback_is_running()
|
||||
return self.ssi[channel]
|
||||
|
||||
def _get_signals_from_table(self, return_table) -> dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[16])
|
||||
self.average_stdeviations_y_st_fzp += float(return_table[18])
|
||||
signals = {
|
||||
"target_x": {"value": float(return_table[3])},
|
||||
"average_x_st_osa": {"value": float(return_table[4])},
|
||||
"stdev_x_st_osa": {"value": float(return_table[5])},
|
||||
"target_y": {"value": float(return_table[6])},
|
||||
"average_y_st_osa": {"value": float(return_table[7])},
|
||||
"stdev_y_st_osa": {"value": float(return_table[8])},
|
||||
"average_x_osa_fzp": {"value": float(return_table[9])},
|
||||
"stdev_x_osa_fzp": {"value": float(return_table[10])},
|
||||
"average_y_osa_fzp": {"value": float(return_table[11])},
|
||||
"stdev_y_osa_fzp": {"value": float(return_table[12])},
|
||||
"average_rotz_st": {"value": float(return_table[13])},
|
||||
"stdev_rotz_st": {"value": float(return_table[14])},
|
||||
"average_x_st_fzp": {"value": float(return_table[15])},
|
||||
"stdev_x_st_fzp": {"value": float(return_table[16])},
|
||||
"average_y_st_fzp": {"value": float(return_table[17])},
|
||||
"stdev_y_st_fzp": {"value": float(return_table[18])},
|
||||
"average_stdeviations_x_st_fzp": {
|
||||
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_stdeviations_y_st_fzp": {
|
||||
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
}
|
||||
return signals
|
||||
|
||||
@threadlocked
|
||||
def start_scan(self):
|
||||
if not self.feedback_is_running():
|
||||
@@ -910,95 +880,6 @@ class RtOMNYController(Controller):
|
||||
current_position_in_scan = int(float(return_table[2]))
|
||||
return (mode, number_of_positions_planned, current_position_in_scan)
|
||||
|
||||
def read_positions_from_sampler(self):
|
||||
# this was for reading after the scan completed
|
||||
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
|
||||
|
||||
read_counter = 0
|
||||
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
|
||||
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
|
||||
|
||||
# if not (mode==2 or mode==3):
|
||||
# error
|
||||
self.device_manager.connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=1, metadata=self.readout_metadata
|
||||
),
|
||||
)
|
||||
# while scan is running
|
||||
while mode > 0:
|
||||
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
|
||||
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
|
||||
time.sleep(0.01)
|
||||
if current_position_in_scan > 5:
|
||||
while current_position_in_scan > read_counter + 1:
|
||||
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
# logger.info(f"{return_table}")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
# read the last samples even though scan is finished already
|
||||
while number_of_positions_planned > read_counter:
|
||||
return_table = (self.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
# logger.info(f"{return_table}")
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
self.device_manager.connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=0, metadata=self.readout_metadata
|
||||
),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"OMNY statistics: Average of all standard deviations [nm]: x"
|
||||
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
|
||||
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}."
|
||||
)
|
||||
|
||||
self.device_manager.connector.send_client_info(
|
||||
"OMNY statistics: Average of all standard deviations [nm]: x"
|
||||
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
|
||||
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}.",
|
||||
scope="",
|
||||
show_asap=True,
|
||||
)
|
||||
|
||||
def publish_device_data(self, signals, point_id):
|
||||
self.device_manager.connector.set_and_publish(
|
||||
MessageEndpoints.device_read("rt_omny"),
|
||||
messages.DeviceMessage(
|
||||
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
|
||||
),
|
||||
)
|
||||
|
||||
def start_readout(self):
|
||||
readout = threading.Thread(target=self.read_positions_from_sampler)
|
||||
readout.start()
|
||||
|
||||
def kickoff(self, metadata):
|
||||
self.readout_metadata = metadata
|
||||
while not self._min_scan_buffer_reached:
|
||||
time.sleep(0.001)
|
||||
self.start_scan()
|
||||
time.sleep(0.1)
|
||||
self.start_readout()
|
||||
|
||||
|
||||
class RtOMNYReadbackSignal(RtReadbackSignal):
|
||||
@retry_once
|
||||
@@ -1241,9 +1122,6 @@ class RtOMNYMotor(Device, PositionerBase):
|
||||
else:
|
||||
raise TypeError(f"Expected value of type int but received {type(val)}")
|
||||
|
||||
def kickoff(self, metadata, **kwargs) -> None:
|
||||
self.controller.kickoff(metadata)
|
||||
|
||||
@property
|
||||
def egu(self):
|
||||
"""The engineering units (EGU) for positions"""
|
||||
@@ -1255,6 +1133,194 @@ class RtOMNYMotor(Device, PositionerBase):
|
||||
return super().stop(success=success)
|
||||
|
||||
|
||||
class RtOMNYFlyer(Device):
|
||||
USER_ACCESS = ["controller"]
|
||||
|
||||
data = Cpt(
|
||||
AsyncMultiSignal,
|
||||
name="data",
|
||||
signals=[
|
||||
"target_x",
|
||||
"average_x_st_osa",
|
||||
"stdev_x_st_osa",
|
||||
"target_y",
|
||||
"average_y_st_osa",
|
||||
"stdev_y_st_osa",
|
||||
"average_x_osa_fzp",
|
||||
"stdev_x_osa_fzp",
|
||||
"average_y_osa_fzp",
|
||||
"stdev_y_osa_fzp",
|
||||
"average_rotz_st",
|
||||
"stdev_rotz_st",
|
||||
"average_x_st_fzp",
|
||||
"stdev_x_st_fzp",
|
||||
"average_y_st_fzp",
|
||||
"stdev_y_st_fzp",
|
||||
"average_stdeviations_x_st_fzp",
|
||||
"average_stdeviations_y_st_fzp",
|
||||
],
|
||||
ndim=1,
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
max_size=1000,
|
||||
)
|
||||
progress = Cpt(ProgressSignal, doc="Signal to track the progress of the device during a scan")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name,
|
||||
kind=None,
|
||||
read_attrs=None,
|
||||
configuration_attrs=None,
|
||||
parent=None,
|
||||
host="mpc3217.psi.ch",
|
||||
port=2222,
|
||||
socket_cls=SocketIO,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(prefix=prefix, name=name, parent=parent, **kwargs)
|
||||
self.shutdown_event = threading.Event()
|
||||
self.controller = RtOMNYController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
)
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
self.readout_thread = None
|
||||
self.scan_done_event = threading.Event()
|
||||
self.scan_done_event.set()
|
||||
|
||||
def stage(self):
|
||||
self.shutdown_event.clear()
|
||||
self.scan_done_event.set()
|
||||
return super().stage()
|
||||
|
||||
def start_readout(self, status: DeviceStatus):
|
||||
self.readout_thread = threading.Thread(
|
||||
target=self.read_positions_from_sampler, args=(status,)
|
||||
)
|
||||
self.readout_thread.start()
|
||||
|
||||
def kickoff(self) -> DeviceStatus:
|
||||
self.shutdown_event.clear()
|
||||
self.scan_done_event.clear()
|
||||
while not self.controller._min_scan_buffer_reached and not self.shutdown_event.wait(0.001):
|
||||
...
|
||||
self.controller.start_scan()
|
||||
self.shutdown_event.wait(0.1)
|
||||
status = DeviceStatus(self)
|
||||
status.set_finished()
|
||||
return status
|
||||
|
||||
def complete(self) -> DeviceStatus:
|
||||
"""Wait until the flyer is done."""
|
||||
if self.scan_done_event.is_set():
|
||||
# if the scan_done_event is already set, we can return a finished status immediately
|
||||
status = DeviceStatus(self)
|
||||
status.set_finished()
|
||||
return status
|
||||
status = DeviceStatus(self)
|
||||
self.start_readout(status)
|
||||
status.add_callback(lambda *args, **kwargs: self.scan_done_event.set())
|
||||
return status
|
||||
|
||||
def stop(self, *, success=False):
|
||||
self.shutdown_event.set()
|
||||
self.scan_done_event.set()
|
||||
if self.readout_thread is not None:
|
||||
self.readout_thread.join()
|
||||
return super().stop(success=success)
|
||||
|
||||
def read_positions_from_sampler(self, status: DeviceStatus):
|
||||
"""
|
||||
Read the positions from the sampler and update the data signal.
|
||||
This function runs in a separate thread and continuously checks the
|
||||
scan status.
|
||||
|
||||
Args:
|
||||
status (DeviceStatus): The status object to update when the readout is complete.
|
||||
"""
|
||||
read_counter = 0
|
||||
self.average_stdeviations_x_st_fzp = 0
|
||||
self.average_stdeviations_y_st_fzp = 0
|
||||
|
||||
mode, number_of_positions_planned, current_position_in_scan = self.get_scan_status()
|
||||
while mode > 0 and not self.shutdown_event.wait(0.01):
|
||||
# logger.info(f"Current scan position {current_position_in_scan} out of {number_of_positions_planned}")
|
||||
mode, number_of_positions_planned, current_position_in_scan = (
|
||||
self.controller.get_scan_status()
|
||||
)
|
||||
if current_position_in_scan > 5:
|
||||
while current_position_in_scan > read_counter + 1:
|
||||
return_table = (
|
||||
self.controller.socket_put_and_receive(f"r{read_counter}")
|
||||
).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
self.progress.put(
|
||||
value=read_counter, max_value=number_of_positions_planned, done=False
|
||||
)
|
||||
read_counter = read_counter + 1
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.data.set(signals)
|
||||
|
||||
if self.shutdown_event.wait(0.05):
|
||||
logger.info("Shutdown event set, stopping readout.")
|
||||
# if we are here, the shutdown_event is set. We can exit the readout loop.
|
||||
status.set_finished()
|
||||
return
|
||||
|
||||
# read the last samples even though scan is finished already
|
||||
while number_of_positions_planned > read_counter and not self.shutdown_event.is_set():
|
||||
return_table = (self.controller.socket_put_and_receive(f"r{read_counter}")).split(",")
|
||||
logger.info(f"Read {read_counter} out of {number_of_positions_planned}")
|
||||
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=False)
|
||||
read_counter = read_counter + 1
|
||||
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.data.set(signals)
|
||||
|
||||
# NOTE: No need to set the status to failed if the shutdown_event is set.
|
||||
# The stop() method will take care of that.
|
||||
status.set_finished()
|
||||
self.progress.put(value=read_counter, max_value=number_of_positions_planned, done=True)
|
||||
|
||||
logger.info(
|
||||
"OMNY statistics: Average of all standard deviations: x"
|
||||
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
|
||||
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}"
|
||||
)
|
||||
|
||||
def _get_signals_from_table(self, return_table) -> dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[16])
|
||||
self.average_stdeviations_y_st_fzp += float(return_table[18])
|
||||
signals = {
|
||||
"target_x": {"value": float(return_table[3])},
|
||||
"average_x_st_osa": {"value": float(return_table[4])},
|
||||
"stdev_x_st_osa": {"value": float(return_table[5])},
|
||||
"target_y": {"value": float(return_table[6])},
|
||||
"average_y_st_osa": {"value": float(return_table[7])},
|
||||
"stdev_y_st_osa": {"value": float(return_table[8])},
|
||||
"average_x_osa_fzp": {"value": float(return_table[9])},
|
||||
"stdev_x_osa_fzp": {"value": float(return_table[10])},
|
||||
"average_y_osa_fzp": {"value": float(return_table[11])},
|
||||
"stdev_y_osa_fzp": {"value": float(return_table[12])},
|
||||
"average_rotz_st": {"value": float(return_table[13])},
|
||||
"stdev_rotz_st": {"value": float(return_table[14])},
|
||||
"average_x_st_fzp": {"value": float(return_table[15])},
|
||||
"stdev_x_st_fzp": {"value": float(return_table[16])},
|
||||
"average_y_st_fzp": {"value": float(return_table[17])},
|
||||
"stdev_y_st_fzp": {"value": float(return_table[18])},
|
||||
"average_stdeviations_x_st_fzp": {
|
||||
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
"average_stdeviations_y_st_fzp": {
|
||||
"value": self.average_stdeviations_y_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
}
|
||||
return signals
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
rtcontroller = RtOMNYController(
|
||||
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None
|
||||
|
||||
@@ -169,8 +169,8 @@ class LamNIMixin:
|
||||
|
||||
self.device_manager.devices.lsamx.read_only = True
|
||||
self.device_manager.devices.lsamy.read_only = True
|
||||
|
||||
#update angle readback before start of the scan
|
||||
|
||||
# update angle readback before start of the scan
|
||||
yield from self.stubs.send_rpc_and_wait("lsamrot", "readback.get")
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait("rtx", "controller.feedback_enable_without_reset")
|
||||
@@ -339,7 +339,7 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
|
||||
|
||||
def _transfer_positions_to_LamNI(self):
|
||||
yield from self.stubs.send_rpc_and_wait(
|
||||
"rtx", "controller.add_pos_to_scan", (self.positions.tolist(),)
|
||||
"rtx", "controller.add_pos_to_scan", self.positions.tolist()
|
||||
)
|
||||
|
||||
def _calculate_positions(self):
|
||||
@@ -455,27 +455,17 @@ class LamNIFermatScan(ScanBase, LamNIMixin):
|
||||
yield from self._at_each_point(ind, pos)
|
||||
self.burst_index = 0
|
||||
elif self.scan_type == "fly":
|
||||
# use a device message to receive the scan number and
|
||||
# scan ID before sending the message to the device server
|
||||
yield from self.stubs.kickoff(device="rtx")
|
||||
while True:
|
||||
yield from self.stubs.read(group="monitored")
|
||||
msg = self.device_manager.connector.get(MessageEndpoints.device_status("rt_scan"))
|
||||
if msg:
|
||||
status = msg
|
||||
status_id = status.content.get("status", 1)
|
||||
request_id = status.metadata.get("RID")
|
||||
if status_id == 0 and self.metadata.get("RID") == request_id:
|
||||
break
|
||||
if status_id == 2 and self.metadata.get("RID") == request_id:
|
||||
raise ScanAbortion(
|
||||
"An error occured during the LamNI readout:"
|
||||
f" {status.metadata.get('error')}"
|
||||
)
|
||||
# fly scan mode
|
||||
yield from self.stubs.kickoff(device="rt_positions")
|
||||
|
||||
# start the readout loop of the flyer
|
||||
status = yield from self.stubs.complete(device="rt_positions", wait=False)
|
||||
|
||||
while not status.done:
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
self.point_id += 1
|
||||
time.sleep(1)
|
||||
logger.debug("reading monitors")
|
||||
# yield from self.device_rpc("rtx", "controller.kickoff")
|
||||
|
||||
def run(self):
|
||||
self.initialize()
|
||||
|
||||
@@ -51,7 +51,7 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
angle: float = None,
|
||||
corridor_size: float = 3,
|
||||
parameter: dict = None,
|
||||
frames_per_trigger:int=1,
|
||||
frames_per_trigger: int = 1,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
@@ -75,7 +75,9 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
>>> scans.omny_fermat_scan(fovx=20, fovy=25, cenx=10, ceny=0, zshift=0, angle=0, step=2, exp_time=0.01)
|
||||
"""
|
||||
|
||||
super().__init__(parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs)
|
||||
super().__init__(
|
||||
parameter=parameter, exp_time=exp_time, frames_per_trigger=frames_per_trigger, **kwargs
|
||||
)
|
||||
self.axis = []
|
||||
self.fovx = fovx
|
||||
self.fovy = fovy
|
||||
@@ -265,22 +267,16 @@ class OMNYFermatScan(SyncFlyScanBase):
|
||||
# scan ID before sending the message to the device server
|
||||
yield from self.stubs.kickoff(device="rtx")
|
||||
while True:
|
||||
yield from self.stubs.read(group="monitored")
|
||||
status = self.connector.get(MessageEndpoints.device_status("rt_scan"))
|
||||
if status:
|
||||
status_id = status.content.get("status", 1)
|
||||
request_id = status.metadata.get("RID")
|
||||
if status_id == 0 and self.metadata.get("RID") == request_id:
|
||||
break
|
||||
if status_id == 2 and self.metadata.get("RID") == request_id:
|
||||
raise ScanAbortion(
|
||||
"An error occured during the omny readout:"
|
||||
f" {status.metadata.get('error')}"
|
||||
)
|
||||
yield from self.stubs.kickoff(device="rt_positions")
|
||||
|
||||
time.sleep(1)
|
||||
logger.debug("reading monitors")
|
||||
# yield from self.device_rpc("rtx", "controller.kickoff")
|
||||
# start the readout loop of the flyer
|
||||
status = yield from self.stubs.complete(device="rt_positions", wait=False)
|
||||
|
||||
while not status.done:
|
||||
yield from self.stubs.read(group="monitored", point_id=self.point_id)
|
||||
self.point_id += 1
|
||||
time.sleep(1)
|
||||
logger.debug("reading monitors")
|
||||
|
||||
def move_to_start(self):
|
||||
"""return to the start position"""
|
||||
|
||||
Reference in New Issue
Block a user