Compare commits

..

3 Commits

Author SHA1 Message Date
x12sa
79d2071094 correct epics channel for user motor and add direction_of_travel parameter in device config file
Some checks failed
CI for csaxs_bec / test (push) Failing after 34s
2025-10-01 16:01:14 +02:00
x12sa
d8d2417679 add motors_copy_paste.yaml for user motors in experimental hutch
Some checks failed
CI for csaxs_bec / test (push) Failing after 31s
2025-09-30 17:07:40 +02:00
x12sa
8aa0a52bda adding energy motor for double nultilayer mono, for the moment just commented
Some checks failed
CI for csaxs_bec / test (push) Failing after 40s
2025-09-22 16:26:56 +02:00
51 changed files with 1272 additions and 2290 deletions

View File

@@ -47,13 +47,25 @@ jobs:
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}" python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
- name: Checkout BEC Core - name: Checkout BEC Core
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec uses: actions/checkout@v4
with:
repository: bec/bec
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
path: ./bec
- name: Checkout Ophyd Devices - name: Checkout Ophyd Devices
run: git clone --depth 1 --branch "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}" https://github.com/bec-project/ophyd_devices.git ./ophyd_devices uses: actions/checkout@v4
with:
repository: bec/ophyd_devices
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
path: ./ophyd_devices
- name: Checkout BEC Widgets - name: Checkout BEC Widgets
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets uses: actions/checkout@v4
with:
repository: bec/bec_widgets
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
path: ./bec_widgets
- name: Checkout BEC Plugin Repository - name: Checkout BEC Plugin Repository
uses: actions/checkout@v4 uses: actions/checkout@v4

View File

@@ -14,8 +14,6 @@ from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin
from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign
from csaxs_bec.bec_ipython_client.plugins.flomni.gui_tools import flomniGuiTools
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
logger = bec_logger.logger logger = bec_logger.logger
@@ -26,57 +24,27 @@ if builtins.__dict__.get("bec") is not None:
umvr = builtins.__dict__.get("umvr") umvr = builtins.__dict__.get("umvr")
class FlomniToolsError(Exception):
pass
class FlomniInitError(Exception): class FlomniInitError(Exception):
pass pass
class FlomniError(Exception): class FlomniError(Exception):
pass pass
class FlomniTools:
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
if autoconfirm and default == "y":
self.printgreen(message + " Automatically confirming default: yes")
return True
elif autoconfirm and default == "n":
self.printgreen(message + " Automatically confirming default: no")
return False
if default == "y":
message_ending = " [Y]/n? "
elif default == "n":
message_ending = " y/[N]? "
else:
message_ending = " y/n? "
while True:
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
if (
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
) or (default == "y" and user_input == ""):
return True
if (
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
) or (default == "n" and user_input == ""):
return False
else:
print("Please expicitely confirm y or n.")
class FlomniInitStagesMixin: class FlomniInitStagesMixin:
def flomni_init_stages(self): def flomni_init_stages(self):
if self.OMNYTools.yesno("Starting initialization of flOMNI stages. OK?"): user_input = input("Starting initialization of flOMNI stages. OK? [y/n]")
if user_input == "y":
print("staring...") print("staring...")
else: else:
return return
if self.check_all_axes_of_fomni_referenced(): if self.check_all_axes_of_fomni_referenced():
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"): user_input = input("Continue anyways? [y/n]")
if user_input == "y":
print("ok then...") print("ok then...")
else: else:
return return
@@ -106,8 +74,10 @@ class FlomniInitStagesMixin:
dev.feyex.limits = [-30, -1] dev.feyex.limits = [-30, -1]
print("done") print("done")
user_input = input(
if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"): "Init of foptz. Can the stage move to the upstream limit without collision? [y/n]"
)
if user_input == "y":
print("good then") print("good then")
else: else:
return return
@@ -161,7 +131,10 @@ class FlomniInitStagesMixin:
dev.fsamy.limits = [2, 3.1] dev.fsamy.limits = [2, 3.1]
print("done") print("done")
if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"): user_input = input(
"Init of tracking stages. Did you remove the outer laser flight tubes? [y/n]"
)
if user_input == "y":
print("good then") print("good then")
else: else:
print("Stopping.") print("Stopping.")
@@ -177,7 +150,8 @@ class FlomniInitStagesMixin:
dev.ftrackz.limits = [4.5, 5.5] dev.ftrackz.limits = [4.5, 5.5]
print("done") print("done")
if self.OMNYTools.yesno("Init of sample stage. Is the piezo at about 0 deg?"): user_input = input("Init of sample stage. Is the piezo at about 0 deg? [y/n]")
if user_input == "y":
print("good then") print("good then")
else: else:
print("Stopping.") print("Stopping.")
@@ -194,7 +168,11 @@ class FlomniInitStagesMixin:
print("done") print("done")
print("Initializing UPR stage.") print("Initializing UPR stage.")
if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"): user_input = input(
"To ensure that the end switches work, please check that they are currently not pushed."
" Is everything okay? [y/n]"
)
if user_input == "y":
print("good then") print("good then")
else: else:
print("Stopping.") print("Stopping.")
@@ -215,7 +193,8 @@ class FlomniInitStagesMixin:
time.sleep(1) time.sleep(1)
continue continue
break break
if self.OMNYTools.yesno("Shall I start the index search?"): user_input = input("Shall I start the index search? [y/n]")
if user_input == "y":
print("good then. Starting index search.") print("good then. Starting index search.")
else: else:
print("Stopping.") print("Stopping.")
@@ -234,7 +213,11 @@ class FlomniInitStagesMixin:
dev.fsamroy.limits = [-5, 365] dev.fsamroy.limits = [-5, 365]
print("done") print("done")
if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"): user_input = input(
"Init of foptx. Can the stage move to the positive limit without collision? Attention:"
" tracker flight tube! [y/n]"
)
if user_input == "y":
print("good then") print("good then")
else: else:
print("Stopping.") print("Stopping.")
@@ -258,7 +241,8 @@ class FlomniInitStagesMixin:
continue continue
break break
if self.OMNYTools.yesno("Start limit switch search of fopty?"): user_input = input("Start limit switch search of fopty? [y/n]")
if user_input == "y":
print("good then") print("good then")
else: else:
print("Stopping.") print("Stopping.")
@@ -291,7 +275,8 @@ class FlomniInitStagesMixin:
return False return False
def set_limits(self): def set_limits(self):
if self.OMNYTools.yesno("Set default limits for flOMNI?"): user_input = input("Set default limits for flOMNI? [y/n]")
if user_input == "y":
print("setting limits...") print("setting limits...")
else: else:
print("Stopping.") print("Stopping.")
@@ -318,7 +303,8 @@ class FlomniInitStagesMixin:
dev.ftrackz.limits = [4.5, 5.5] dev.ftrackz.limits = [4.5, 5.5]
def _align_setup(self): def _align_setup(self):
if self.OMNYTools.yesno("Start moving stages to default initial positions?", "y"): user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
print("Start moving stages...") print("Start moving stages...")
else: else:
print("Stopping.") print("Stopping.")
@@ -411,8 +397,7 @@ class FlomniSampleTransferMixin:
raise FlomniError("Ftray is not at the 'IN' position. Aborting.") raise FlomniError("Ftray is not at the 'IN' position. Aborting.")
def ftransfer_flomni_stage_in(self): def ftransfer_flomni_stage_in(self):
sample_in_position = dev.flomni_samples.is_sample_slot_used(0) sample_in_position = bool(float(dev.flomni_samples.sample_placed.sample0.get()))
#bool(float(dev.flomni_samples.sample_placed.sample0.get()))
if not sample_in_position: if not sample_in_position:
raise FlomniError("There is no sample in the sample stage. Aborting.") raise FlomniError("There is no sample in the sample stage. Aborting.")
self.reset_correction() self.reset_correction()
@@ -425,8 +410,6 @@ class FlomniSampleTransferMixin:
umv(dev.fsamx, fsamx_in) umv(dev.fsamx, fsamx_in)
dev.fsamx.limits = [fsamx_in - 0.4, fsamx_in + 0.4] dev.fsamx.limits = [fsamx_in - 0.4, fsamx_in + 0.4]
self.flomnigui_idle()
def laser_tracker_show_all(self): def laser_tracker_show_all(self):
dev.rtx.controller.laser_tracker_show_all() dev.rtx.controller.laser_tracker_show_all()
@@ -466,10 +449,6 @@ class FlomniSampleTransferMixin:
self.device_manager.devices.fsamx.controller.lights_on() self.device_manager.devices.fsamx.controller.lights_on()
def ftransfer_flomni_stage_out(self): def ftransfer_flomni_stage_out(self):
self.flomnigui_show_cameras()
self.flomnigui_raise()
target_pos = -162 target_pos = -162
if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01): if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01):
return return
@@ -517,20 +496,22 @@ class FlomniSampleTransferMixin:
self.check_tray_in() self.check_tray_in()
self.check_sensor_connected() self.check_sensor_connected()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
if sample_in_gripper: if sample_in_gripper:
raise FlomniError( raise FlomniError(
"The gripper does carry a sample. Cannot proceed getting another sample." "The gripper does carry a sample. Cannot proceed getting another sample."
) )
sample_in_position = dev.flomni_samples.is_sample_slot_used(position) sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
sample_in_position = bool(float(sample_signal.get()))
if not sample_in_position: if not sample_in_position:
raise FlomniError(f"The planned pick position [{position}] does not have a sample.") raise FlomniError(f"The planned pick position [{position}] does not have a sample.")
self.flomnigui_show_cameras() user_input = input(
"Please confirm that there is currently no sample in the gripper. It would be dropped!"
" [y/n]"
if self.OMNYTools.yesno("Please confirm that there is currently no sample in the gripper. It would be dropped!", "y"): )
if user_input == "y":
print("good then") print("good then")
else: else:
print("Stopping.") print("Stopping.")
@@ -574,12 +555,12 @@ class FlomniSampleTransferMixin:
self.check_tray_in() self.check_tray_in()
self.check_sensor_connected() self.check_sensor_connected()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
#bool(float(dev.flomni_samples.sample_in_gripper.get()))
if not sample_in_gripper: if not sample_in_gripper:
raise FlomniError("The gripper does not carry a sample.") raise FlomniError("The gripper does not carry a sample.")
sample_in_position = dev.flomni_samples.is_sample_slot_used(position) sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
sample_in_position = bool(float(sample_signal.get()))
if sample_in_position: if sample_in_position:
raise FlomniError(f"The planned put position [{position}] already has a sample.") raise FlomniError(f"The planned put position [{position}] already has a sample.")
@@ -612,9 +593,8 @@ class FlomniSampleTransferMixin:
self.flomni_modify_storage_non_interactive(100, 0, "-") self.flomni_modify_storage_non_interactive(100, 0, "-")
self.flomni_modify_storage_non_interactive(position, 1, sample_name) self.flomni_modify_storage_non_interactive(position, 1, sample_name)
if position == 0: # TODO: flomni_stage_in if position == 0
self.ftransfer_flomni_stage_in() # bec.queue.next_dataset_number += 1
bec.queue.next_dataset_number += 1
def sample_get_name(self, position: int = 0) -> str: def sample_get_name(self, position: int = 0) -> str:
""" """
@@ -625,51 +605,36 @@ class FlomniSampleTransferMixin:
def ftransfer_sample_change(self, new_sample_position: int): def ftransfer_sample_change(self, new_sample_position: int):
self.check_tray_in() self.check_tray_in()
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get() sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
if sample_in_gripper: if sample_in_gripper:
raise FlomniError("There is already a sample in the gripper. Aborting.") raise FlomniError("There is already a sample in the gripper. Aborting.")
self.check_position_is_valid(new_sample_position) self.check_position_is_valid(new_sample_position)
if new_sample_position == 0: sample_placed = getattr(
raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.") dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
).get()
# sample_placed = getattr(
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
# ).get()
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
if not sample_placed: if not sample_placed:
raise FlomniError( raise FlomniError(
f"There is currently no sample in position [{new_sample_position}]. Aborting." f"There is currently no sample in position [{new_sample_position}]. Aborting."
) )
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get() sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
if sample_in_sample_stage: if sample_in_sample_stage:
# find a new home for the sample... # find a new home for the sample...
empty_slots = [] empty_slots = []
# for name, val in dev.flomni_samples.read().items(): for name, val in dev.flomni_samples.read().items():
# if "flomni_samples_sample_placed_sample" not in name: if "flomni_samples_sample_placed_sample" not in name:
# continue continue
# if val.get("value") == 0: if val.get("value") == 0:
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1])) empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
for j in range(1,20):
if not dev.flomni_samples.is_sample_slot_used(j):
empty_slots.append(j)
if not empty_slots: if not empty_slots:
raise FlomniError("There are no empty slots available. Aborting.") raise FlomniError("There are no empty slots available. Aborting.")
print(f"The following slots are empty: {empty_slots}.") print(f"The following slots are empty: {empty_slots}.")
while True: while True:
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}] ") user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}]")
if user_input.strip() == "":
# No entry: use default
user_input = empty_slots[0]
break
try: try:
user_input = int(user_input) user_input = int(user_input)
if user_input not in empty_slots: if user_input not in empty_slots:
@@ -735,20 +700,20 @@ class FlomniSampleTransferMixin:
if confirm != -1: if confirm != -1:
return return
if self.OMNYTools.yesno("All OK? Continue?", "y"): user_input = input("All OK? Continue? [y/n]")
if user_input == "y":
print("good then") print("good then")
dev.ftransy.controller.socket_put_confirmed("confirm=1") dev.ftransy.controller.socket_put_confirmed("confirm=1")
else: else:
print("Stopping.") print("Stopping.")
exit return
def ftransfer_gripper_is_open(self) -> bool: def ftransfer_gripper_is_open(self) -> bool:
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip())) status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
return status return status
def ftransfer_gripper_open(self): def ftransfer_gripper_open(self):
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper() sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
#dev.flomni_samples.sample_in_gripper.get()
if sample_in_gripper: if sample_in_gripper:
raise FlomniError( raise FlomniError(
"Cannot open gripper. There is still a sample in the gripper! Aborting." "Cannot open gripper. There is still a sample in the gripper! Aborting."
@@ -768,8 +733,11 @@ class FlomniSampleTransferMixin:
fsamx_pos = dev.fsamx.readback.get() fsamx_pos = dev.fsamx.readback.get()
if position == 0 and fsamx_pos > -160: if position == 0 and fsamx_pos > -160:
user_input = input(
if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"): "May the flomni stage be moved out for the sample change? Feedback will be disabled"
" and alignment will be lost! [y/n]"
)
if user_input == "y":
print("good then") print("good then")
self.ftransfer_flomni_stage_out() self.ftransfer_flomni_stage_out()
else: else:
@@ -924,20 +892,7 @@ class FlomniSampleTransferMixin:
class FlomniAlignmentMixin: class FlomniAlignmentMixin:
import csaxs_bec default_correction_file = "correction_flomni_20210300_360deg.txt"
import os
from pathlib import Path
# Ensure this is a Path object, not a string
csaxs_bec_basepath = Path(csaxs_bec.__file__)
default_correction_file_rel = "correction_flomni_20210300_360deg.txt"
# Build the absolute path correctly
default_correction_file = (
csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel
).resolve()
def reset_correction(self, use_default_correction=True): def reset_correction(self, use_default_correction=True):
""" """
@@ -1151,7 +1106,6 @@ class Flomni(
FlomniAlignmentMixin, FlomniAlignmentMixin,
FlomniOpticsMixin, FlomniOpticsMixin,
cSAXSBeamlineChecks, cSAXSBeamlineChecks,
flomniGuiTools
): ):
def __init__(self, client): def __init__(self, client):
super().__init__() super().__init__()
@@ -1174,23 +1128,16 @@ class Flomni(
self.corr_pos_y_2 = [] self.corr_pos_y_2 = []
self.corr_angle_y_2 = [] self.corr_angle_y_2 = []
self.progress = {} self.progress = {}
self.progress["subtomo"] = 0
self.progress["subtomo_projection"] = 0
self.progress["subtomo_total_projections"] = 1
self.progress["projection"] = 0
self.progress["total_projections"] = 1
self.progress["angle"] = 0
self.progress["tomo_type"] = 0
self.OMNYTools = OMNYTools(self.client)
self.align = XrayEyeAlign(self.client, self) self.align = XrayEyeAlign(self.client, self)
self.set_client(client)
def start_x_ray_eye_alignment(self, keep_shutter_open=False): def start_x_ray_eye_alignment(self):
user_input = input(
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"): "Starting Xrayeye alignment. Deleting any potential existing alignment for this sample. [Y/n]"
)
if user_input == "y" or user_input == "":
self.align = XrayEyeAlign(self.client, self) self.align = XrayEyeAlign(self.client, self)
try: try:
self.align.align(keep_shutter_open) self.align.align()
except KeyboardInterrupt as exc: except KeyboardInterrupt as exc:
fsamx_in = self._get_user_param_safe(dev.fsamx, "in") fsamx_in = self._get_user_param_safe(dev.fsamx, "in")
if np.isclose(fsamx_in, dev.fsamx.readback.get(), 0.5): if np.isclose(fsamx_in, dev.fsamx.readback.get(), 0.5):
@@ -1199,11 +1146,11 @@ class Flomni(
umv(dev.fsamx, fsamx_in) umv(dev.fsamx, fsamx_in)
raise exc raise exc
def xrayeye_update_frame(self,keep_shutter_open=False): def xrayeye_update_frame(self):
self.align.update_frame(keep_shutter_open) self.align.update_frame()
def xrayeye_alignment_start(self, keep_shutter_open=False): def xrayeye_alignment_start(self):
self.start_x_ray_eye_alignment(keep_shutter_open) self.start_x_ray_eye_alignment()
def drive_axis_to_limit(self, device, direction): def drive_axis_to_limit(self, device, direction):
axis_id = device._config["deviceConfig"].get("axis_Id") axis_id = device._config["deviceConfig"].get("axis_Id")
@@ -1623,9 +1570,6 @@ class Flomni(
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None): def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
"""start a tomo scan""" """start a tomo scan"""
self.flomnigui_show_progress()
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
scans = builtins.__dict__.get("scans") scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy() self._current_special_angles = self.special_angles.copy()
@@ -1762,7 +1706,6 @@ class Flomni(
print(f"Angle: ........................... {self.progress['angle']}") print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}") print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m") print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
self._flomnigui_update_progress()
def add_sample_database( def add_sample_database(
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
@@ -1852,7 +1795,7 @@ class Flomni(
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None: def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser( tomo_scan_numbers_file = os.path.expanduser(
"~/tomography_scannumbers.txt" "~/Data10/specES1/dat-files/tomography_scannumbers.txt"
) )
with open(tomo_scan_numbers_file, "a+") as out_file: with open(tomo_scan_numbers_file, "a+") as out_file:
# pylint: disable=undefined-variable # pylint: disable=undefined-variable
@@ -1951,8 +1894,8 @@ class Flomni(
) )
print(f"\nSample name: {self.sample_name}\n") print(f"\nSample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? [Y/n]")
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"): if user_input == "y" or user_input == "":
print("... excellent!") print("... excellent!")
else: else:
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float) self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)

View File

@@ -33,7 +33,7 @@ class FlomniOpticsMixin:
feyex_in = self._get_user_param_safe("feyex", "in") feyex_in = self._get_user_param_safe("feyex", "in")
feyey_in = self._get_user_param_safe("feyey", "in") feyey_in = self._get_user_param_safe("feyey", "in")
umv(dev.feyex, feyex_in, dev.feyey, feyey_in) umv(dev.feyex, feyex_in, dev.feyey, feyey_in)
#self.align.update_frame() self.align.update_frame()
def _ffzp_in(self): def _ffzp_in(self):
foptx_in = self._get_user_param_safe("foptx", "in") foptx_in = self._get_user_param_safe("foptx", "in")

View File

@@ -1,173 +0,0 @@
import builtins
from bec_widgets.cli.client import BECDockArea
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
if builtins.__dict__.get("bec") is not None:
bec = builtins.__dict__.get("bec")
dev = builtins.__dict__.get("dev")
umv = builtins.__dict__.get("umv")
umvr = builtins.__dict__.get("umvr")
class flomniGuiToolsError(Exception):
pass
class flomniGuiTools:
def __init__(self):
self.text_box = None
self.progressbar = None
def set_client(self, client):
self.client = client
self.gui = self.client.gui
def flomnigui_show_gui(self):
if "flomni" in self.gui.windows:
self.gui.flomni.show()
else:
self.gui.new("flomni")
def flomnigui_stop_gui(self):
self.gui.flomni.hide()
def flomnigui_raise(self):
self.gui.flomni.raise_window()
# def flomnigui_show_xeyealign(self):
# self.flomnigui_show_gui()
# if self.xeyegui is None:
# self.flomnigui_remove_all_docks()
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# # start live
# if not dev.cam_xeye.live_mode:
# dev.cam_xeye.live_mode = True
def flomnigui_show_xeyealign(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("xeyegui"):
self.flomnigui_remove_all_docks()
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
# start live
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
def _flomnigui_check_attribute_not_exists(self, attribute_name):
if hasattr(self.gui,"flomni"):
if hasattr(self.gui.flomni,attribute_name):
return False
return True
def flomnigui_show_cameras(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
self.flomnigui_remove_all_docks()
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
camera_gripper_image.image(("cam_flomni_gripper", "preview"))
camera_gripper_image.lock_aspect_ratio = True
camera_gripper_image.enable_fps_monitor = True
camera_gripper_image.enable_toolbar = False
camera_gripper_image.outer_axes = False
camera_gripper_image.inner_axes = False
dev.cam_flomni_gripper.start_live_mode()
else:
print("Cannot open camera_gripper. Device does not exist.")
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
camera_overview_image.image(("cam_flomni_overview", "preview"))
camera_overview_image.lock_aspect_ratio = True
camera_overview_image.enable_fps_monitor = True
camera_overview_image.enable_toolbar = False
camera_overview_image.outer_axes = False
camera_overview_image.inner_axes = False
dev.cam_flomni_overview.start_live_mode()
else:
print("Cannot open camera_overview. Device does not exist.")
def flomnigui_remove_all_docks(self):
dev.cam_flomni_overview.stop_live_mode()
dev.cam_flomni_gripper.stop_live_mode()
dev.cam_xeye.live_mode = False
self.gui.flomni.delete_all()
self.progressbar = None
self.text_box = None
def flomnigui_idle(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
self.flomnigui_remove_all_docks()
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
text = (
"<pre>"
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
+ "/ .-'| |' .-. '| `.' || ,'.| || | \n"
+ "| `-,| || | | || |'.'| || |' ' || | \n"
+ "| .-'| |' '-' '| | | || | ` || | \n"
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n"
+ "</pre>"
)
idle_text_box.set_html_text(text)
def _flomnicam_check_device_exists(self, device):
try:
device
except:
return False
else:
return True
def flomnigui_show_progress(self):
self.flomnigui_show_gui()
if self._flomnigui_check_attribute_not_exists("progressbar"):
self.flomnigui_remove_all_docks()
# Add a new dock with a RingProgressBar widget
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
# Customize the size of the progress ring
self.progressbar.set_line_widths(20)
# Disable automatic updates and manually set the self.progressbar value
self.progressbar.enable_auto_updates(False)
# Set precision for the self.progressbar display
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
# Setting multiple rigns with different values
self.progressbar.set_number_of_bars(3)
self.progressbar.rings[0].set_update("manual")
self.progressbar.rings[1].set_update("manual")
self.progressbar.rings[2].set_update("scan")
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
# self.progressbar.set_value([50, 75])
# Add a new dock with a TextBox widget
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
self._flomnigui_update_progress()
def _flomnigui_update_progress(self):
if self.progressbar is not None:
progress = self.progress["projection"] / self.progress["total_projections"] * 100
subtomo_progress = (
self.progress["subtomo_projection"]
/ self.progress["subtomo_total_projections"]
* 100
)
self.progressbar.set_value([progress, subtomo_progress, 0])
if self.text_box is not None:
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
self.text_box.set_plain_text(text)
if __name__ == "__main__":
from bec_lib.client import BECClient
from bec_widgets.cli.client_utils import BECGuiClient
client = BECClient()
client.start()
client.gui = BECGuiClient()
flomni_gui = flomniGuiTools(client)
flomni_gui.flomnigui_show_gui()
flomni_gui.flomnigui_show_progress()

View File

@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
from bec_lib import bec_logger from bec_lib import bec_logger
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
logger = bec_logger.logger logger = bec_logger.logger
# import builtins to avoid linter errors # import builtins to avoid linter errors
@@ -22,7 +22,6 @@ if TYPE_CHECKING:
class XrayEyeAlign: class XrayEyeAlign:
# pixel calibration, multiply to get mm # pixel calibration, multiply to get mm
labview=False
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
def __init__(self, client, flomni: Flomni) -> None: def __init__(self, client, flomni: Flomni) -> None:
@@ -41,40 +40,28 @@ class XrayEyeAlign:
def save_frame(self): def save_frame(self):
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1) epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
def update_frame(self,keep_shutter_open=False): def update_frame(self):
if self.labview: epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
epics_put("XOMNYI-XEYE-ACQDONE:0", 0) # start live
if not self.labview:
self.flomni.flomnigui_show_xeyealign()
if not dev.cam_xeye.live_mode:
dev.cam_xeye.live_mode = True
epics_put("XOMNYI-XEYE-ACQ:0", 1) epics_put("XOMNYI-XEYE-ACQ:0", 1)
if self.labview: # wait for start live
# wait for start live while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0: time.sleep(0.5)
time.sleep(0.5) print("waiting for live view to start...")
print("waiting for live view to start...")
fshopen() fshopen()
if self.labview: epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0: while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
print("waiting for new frame...") print("waiting for new frame...")
time.sleep(0.5) time.sleep(0.5)
time.sleep(0.5) time.sleep(0.5)
# stop live view # stop live view
if not keep_shutter_open: epics_put("XOMNYI-XEYE-ACQ:0", 0)
epics_put("XOMNYI-XEYE-ACQ:0", 0) time.sleep(1)
time.sleep(0.1) # fshclose
fshclose() print("got new frame")
print("got new frame")
else:
print("Staying in live view, shutter is and remains open!")
def tomo_rotate(self, val: float): def tomo_rotate(self, val: float):
# pylint: disable=undefined-variable # pylint: disable=undefined-variable
@@ -100,23 +87,12 @@ class XrayEyeAlign:
def send_message(self, msg: str): def send_message(self, msg: str):
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg) epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
def align(self,keep_shutter_open=False): def align(self):
if not keep_shutter_open:
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
self.send_message("Getting things ready. Please wait...")
#potential unresolved movement requests to zero
epics_put("XOMNYI-XEYE-MVX:0", 0)
epics_put("XOMNYI-XEYE-MVY:0", 0)
# reset shift xy and fov params # reset shift xy and fov params
self._reset_init_values() self._reset_init_values()
self.flomni.lights_off() self.flomni.lights_off()
self.flomni.flomnigui_show_xeyealign()
self.flomni.flomnigui_raise()
self.tomo_rotate(0) self.tomo_rotate(0)
epics_put("XOMNYI-XEYE-ANGLE:0", 0) epics_put("XOMNYI-XEYE-ANGLE:0", 0)
@@ -143,7 +119,7 @@ class XrayEyeAlign:
umv(dev.fsamx, fsamx_in - 0.25) umv(dev.fsamx, fsamx_in - 0.25)
self.flomni.ffzp_in() self.flomni.ffzp_in()
self.update_frame(keep_shutter_open) self.update_frame()
# enable submit buttons # enable submit buttons
self.movement_buttons_enabled = False self.movement_buttons_enabled = False
@@ -176,18 +152,17 @@ class XrayEyeAlign:
self.flomni.feedback_disable() self.flomni.feedback_disable()
umv(dev.fsamx, fsamx_in - 0.25) umv(dev.fsamx, fsamx_in - 0.25)
if self.labview: self.update_frame()
self.update_frame(keep_shutter_open) epics_put("XOMNYI-XEYE-RECBG:0", 1)
epics_put("XOMNYI-XEYE-RECBG:0", 1) while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
while epics_get("XOMNYI-XEYE-RECBG:0") == 1: time.sleep(0.5)
time.sleep(0.5) print("waiting for background frame...")
print("waiting for background frame...")
umv(dev.fsamx, fsamx_in) umv(dev.fsamx, fsamx_in)
time.sleep(0.5) time.sleep(0.5)
self.flomni.feedback_enable_with_reset() self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open) self.update_frame()
self.send_message("Adjust sample height and submit center") self.send_message("Adjust sample height and submit center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0) epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
self.movement_buttons_enabled = True self.movement_buttons_enabled = True
@@ -200,7 +175,7 @@ class XrayEyeAlign:
umv(dev.rtx, 0) umv(dev.rtx, 0)
self.tomo_rotate(k * 45) self.tomo_rotate(k * 45)
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle()) epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
self.update_frame(keep_shutter_open) self.update_frame()
self.send_message("Submit sample center") self.send_message("Submit sample center")
epics_put("XOMNYI-XEYE-SUBMIT:0", 0) epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
epics_put("XOMNYI-XEYE-ENAMVX:0", 1) epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
@@ -224,7 +199,7 @@ class XrayEyeAlign:
if k > 0: if k > 0:
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000) epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
time.sleep(3) time.sleep(3)
self.update_frame(keep_shutter_open) self.update_frame()
if k < 2: if k < 2:
# allow movements, store movements to calculate center # allow movements, store movements to calculate center
@@ -235,7 +210,7 @@ class XrayEyeAlign:
time.sleep(2) time.sleep(2)
epics_put("XOMNYI-XEYE-MVY:0", 0) epics_put("XOMNYI-XEYE-MVY:0", 0)
self.flomni.feedback_enable_with_reset() self.flomni.feedback_enable_with_reset()
self.update_frame(keep_shutter_open) self.update_frame()
time.sleep(0.2) time.sleep(0.2)
self.write_output() self.write_output()
@@ -246,16 +221,8 @@ class XrayEyeAlign:
umv(dev.rtx, 0) umv(dev.rtx, 0)
# free camera # free camera
if self.labview: epics_put("XOMNYI-XEYE-ACQ:0", 2)
epics_put("XOMNYI-XEYE-ACQ:0", 2)
if keep_shutter_open and not self.labview:
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
fshclose()
epics_put("XOMNYI-XEYE-ACQ:0", 0)
if not self.labview:
self.flomni.flomnigui_idle()
print( print(
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy" f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"

View File

@@ -61,3 +61,25 @@ bec._beamline_mixin._bl_info_register(OperatorInfo)
# SETUP PROMPTS # SETUP PROMPTS
bec._ip.prompts.session_name = _session_name bec._ip.prompts.session_name = _session_name
bec._ip.prompts.status = 1 bec._ip.prompts.status = 1
# REGISTER BEAMLINE CHECKS
from bec_lib.bl_conditions import (
FastOrbitFeedbackCondition,
LightAvailableCondition,
ShutterCondition,
)
if "sls_machine_status" in dev:
print("Registering light available condition for SLS machine status")
_light_available_condition = LightAvailableCondition(dev.sls_machine_status)
bec.bl_checks.register(_light_available_condition)
if "x12sa_es1_shutter_status" in dev:
print("Registering shutter condition for X12SA ES1 shutter status")
_shutter_condition = ShutterCondition(dev.x12sa_es1_shutter_status)
bec.bl_checks.register(_shutter_condition)
# if hasattr(dev, "sls_fast_orbit_feedback"):
# print("Registering fast orbit feedback condition for SLS fast orbit feedback")
# _fast_orbit_feedback_condition = FastOrbitFeedbackCondition(dev.sls_fast_orbit_feedback)
# bec.bl_checks.register(_fast_orbit_feedback_condition)

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
logger = bec_logger.logger logger = bec_logger.logger
@@ -14,7 +14,6 @@ logger = bec_logger.logger
_Widgets = { _Widgets = {
"OmnyAlignment": "OmnyAlignment", "OmnyAlignment": "OmnyAlignment",
"XRayEye": "XRayEye",
} }
@@ -74,75 +73,3 @@ class OmnyAlignment(RPCBase):
""" """
None None
""" """
class XRayEye(RPCBase):
@rpc_call
def active_roi(self) -> "BaseROI | None":
"""
Return the currently active ROI, or None if no ROI is active.
"""
@property
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@enable_live_view.setter
@rpc_call
def enable_live_view(self):
"""
Get or set the live view enabled state.
"""
@property
@rpc_call
def user_message(self):
"""
None
"""
@user_message.setter
@rpc_call
def user_message(self):
"""
None
"""
@property
@rpc_call
def sample_name(self):
"""
None
"""
@sample_name.setter
@rpc_call
def sample_name(self):
"""
None
"""
@property
@rpc_call
def enable_move_buttons(self):
"""
None
"""
@enable_move_buttons.setter
@rpc_call
def enable_move_buttons(self):
"""
None
"""
class XRayEye2DControl(RPCBase):
@rpc_call
def remove(self):
"""
Cleanup the BECConnector
"""

View File

@@ -63,7 +63,7 @@ class OmnyAlignment(BECWidget, QWidget):
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled) self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
# self.ui.moveUpButton.clicked.connect(self.on_move_up) self.ui.moveUpButton.clicked.connect(self.on_move_up)
@property @property
@@ -98,10 +98,10 @@ class OmnyAlignment(BECWidget, QWidget):
logger.info(f"Live view is enabled: {enabled}") logger.info(f"Live view is enabled: {enabled}")
image: Image = self.ui.image image: Image = self.ui.image
if enabled: if enabled:
image.image("cam_xeye") image.image("cam200")
return return
image.disconnect_monitor("cam_xeye") image.disconnect_monitor("cam200")
@property @property

View File

@@ -86,7 +86,7 @@
<bool>false</bool> <bool>false</bool>
</property> </property>
<property name="monitor" stdset="0"> <property name="monitor" stdset="0">
<string>cam_xeye</string> <string>cam200</string>
</property> </property>
<property name="rotation" stdset="0"> <property name="rotation" stdset="0">
<number>3</number> <number>3</number>

View File

@@ -1,15 +0,0 @@
def main(): # pragma: no cover
from qtpy import PYSIDE6
if not PYSIDE6:
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
return
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye_plugin import XRayEyePlugin
QPyDesignerCustomWidgetCollection.addCustomWidget(XRayEyePlugin())
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -1,426 +0,0 @@
from __future__ import annotations
from bec_lib import bec_logger
from bec_lib.endpoints import MessageEndpoints
from bec_qthemes import material_icon
from bec_widgets import BECWidget, SafeProperty, SafeSlot
from bec_widgets.widgets.plots.image.image import Image
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import (
QFrame,
QGridLayout,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QSizePolicy,
QSpinBox,
QToolButton,
QVBoxLayout,
QWidget,
)
logger = bec_logger.logger
CAMERA = ("cam_xeye", "image")
class XRayEye2DControl(BECWidget, QWidget):
def __init__(self, parent=None, step_size: int = 100, *arg, **kwargs):
super().__init__(parent=parent, *arg, **kwargs)
self.get_bec_shortcuts()
self._step_size = step_size
self.root_layout = QGridLayout(self)
self.setStyleSheet("""
QToolButton {
border: 1px solid;
border-radius: 4px;
}
""")
# Up
self.move_up_button = QToolButton(parent=self)
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
self.root_layout.addWidget(self.move_up_button, 0, 2)
# Up tweak button
self.move_up_tweak_button = QToolButton(parent=self)
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
# Left
self.move_left_button = QToolButton(parent=self)
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
self.root_layout.addWidget(self.move_left_button, 2, 0)
# Left tweak button
self.move_left_tweak_button = QToolButton(parent=self)
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
# Right
self.move_right_button = QToolButton(parent=self)
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
self.root_layout.addWidget(self.move_right_button, 2, 4)
# Right tweak button
self.move_right_tweak_button = QToolButton(parent=self)
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
# Down
self.move_down_button = QToolButton(parent=self)
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
self.root_layout.addWidget(self.move_down_button, 4, 2)
# Down tweak button
self.move_down_tweak_button = QToolButton(parent=self)
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
# Connections
self.move_up_button.clicked.connect(lambda: self.move("up", tweak=False))
self.move_up_tweak_button.clicked.connect(lambda: self.move("up", tweak=True))
self.move_down_button.clicked.connect(lambda: self.move("down", tweak=False))
self.move_down_tweak_button.clicked.connect(lambda: self.move("down", tweak=True))
self.move_left_button.clicked.connect(lambda: self.move("left", tweak=False))
self.move_left_tweak_button.clicked.connect(lambda: self.move("left", tweak=True))
self.move_right_button.clicked.connect(lambda: self.move("right", tweak=False))
self.move_right_tweak_button.clicked.connect(lambda: self.move("right", tweak=True))
@SafeProperty(int)
def step_size(self) -> int:
return self._step_size
@step_size.setter
def step_size(self, step_size: int):
self._step_size = step_size
@SafeSlot(bool)
def enable_controls_hor(self, enable: bool):
self.move_left_button.setEnabled(enable)
self.move_left_tweak_button.setEnabled(enable)
self.move_right_button.setEnabled(enable)
self.move_right_tweak_button.setEnabled(enable)
@SafeSlot(bool)
def enable_controls_ver(self, enable: bool):
self.move_up_button.setEnabled(enable)
self.move_up_tweak_button.setEnabled(enable)
self.move_down_button.setEnabled(enable)
self.move_down_tweak_button.setEnabled(enable)
def move(self, direction: str, tweak: bool = False):
step = self._step_size
if tweak:
step = int(self._step_size / 5)
if direction == "up":
self.dev.omny_xray_gui.mvy.set(step)
elif direction == "down":
self.dev.omny_xray_gui.mvy.set(-step)
elif direction == "left":
self.dev.omny_xray_gui.mvx.set(-step)
elif direction == "right":
self.dev.omny_xray_gui.mvx.set(step)
else:
logger.warning(f"Unknown direction {direction} for move command.")
class XRayEye(BECWidget, QWidget):
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
PLUGIN = True
def __init__(self, parent=None, **kwargs):
super().__init__(parent=parent, **kwargs)
self.get_bec_shortcuts()
self._init_ui()
self._make_connections()
# Connection to redis endpoints
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
self.connect_motors()
self.resize(800, 600)
QTimer.singleShot(0, self._init_gui_trigger)
def _init_ui(self):
self.core_layout = QHBoxLayout(self)
self.image = Image(parent=self)
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
self.image.inner_axes = False # Disable inner axes to maximize image area
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
# Control panel on the right: vertical layout inside a fixed-width widget
self.control_panel = QWidget(parent=self)
self.control_panel_layout = QVBoxLayout(self.control_panel)
self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
self.control_panel_layout.setSpacing(10)
# ROI toolbar + Live toggle (header row)
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
compact_orientation="horizontal")
header_row = QHBoxLayout()
header_row.setContentsMargins(0, 0, 0, 0)
header_row.setSpacing(8)
header_row.addWidget(self.roi_manager, 0)
header_row.addStretch()
self.live_preview_label = QLabel("Live Preview", parent=self)
self.live_preview_toggle = ToggleSwitch(parent=self)
self.live_preview_toggle.checked = False
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
self.control_panel_layout.addLayout(header_row)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# 2D Positioner (fixed size)
self.motor_control_2d = XRayEye2DControl(parent=self)
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
# separator
self.control_panel_layout.addWidget(self._create_separator())
# Step size label
step_size_form = QGridLayout()
# General Step size
self.step_size = QSpinBox(parent=self)
self.step_size.setRange(10, 100)
self.step_size.setSingleStep(10)
self.step_size.setValue(100)
# Submit button
self.submit_button = QPushButton("Submit", parent=self)
# Add to layout form
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
step_size_form.addWidget(self.step_size, 0, 1)
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
# Add form to control panel
self.control_panel_layout.addLayout(step_size_form)
# Push form to bottom
self.control_panel_layout.addStretch()
# Sample/Message form (bottom)
form = QGridLayout()
self.sample_name_line_edit = QLineEdit(parent=self)
self.sample_name_line_edit.setReadOnly(True)
form.addWidget(QLabel("Sample", parent=self), 0, 0)
form.addWidget(self.sample_name_line_edit, 0, 1)
self.message_line_edit = QLineEdit(parent=self)
self.message_line_edit.setReadOnly(True)
form.addWidget(QLabel("Message", parent=self), 1, 0)
form.addWidget(self.message_line_edit, 1, 1)
self.control_panel_layout.addLayout(form)
# Fix panel width and allow vertical expansion
self.control_panel.adjustSize()
p_hint = self.control_panel.sizeHint()
self.control_panel.setFixedWidth(p_hint.width())
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
# Core Layout: image (expanding) | control panel (fixed)
self.core_layout.addWidget(self.image)
self.core_layout.addWidget(self.control_panel)
def _make_connections(self):
# Fetch initial state
self.on_live_view_enabled(True)
self.step_size.setValue(self.motor_control_2d.step_size)
# Make connections
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
self.submit_button.clicked.connect(self.submit)
def _create_separator(self):
sep = QFrame(parent=self)
sep.setFrameShape(QFrame.HLine)
sep.setFrameShadow(QFrame.Sunken)
sep.setLineWidth(1)
return sep
def _init_gui_trigger(self):
self.dev.omny_xray_gui.read()
################################################################################
# Device Connection logic
################################################################################
def connect_motors(self):
""" Checks one of the possible motors for flomni, omny and lamni setup."""
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
for motor in possible_motors:
if motor in self.dev:
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
logger.info(f"Succesfully connected to {motor}")
################################################################################
# Properties ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeProperty(str)
def user_message(self):
return self.message_line_edit.text()
@user_message.setter
def user_message(self, message: str):
self.message_line_edit.setText(message)
@SafeProperty(str)
def sample_name(self):
return self.sample_name_line_edit.text()
@sample_name.setter
def sample_name(self, message: str):
self.sample_name_line_edit.setText(message)
@SafeProperty(bool)
def enable_move_buttons(self):
return self.motor_control_2d.isEnabled()
@enable_move_buttons.setter
def enable_move_buttons(self, enabled: bool):
self.motor_control_2d.setEnabled(enabled)
def active_roi(self) -> BaseROI | None:
"""Return the currently active ROI, or None if no ROI is active."""
return self.roi_manager.single_active_roi
################################################################################
# Slots ported from the original OmnyAlignment, can be adjusted as needed
################################################################################
@SafeSlot()
def get_roi_coordinates(self) -> dict | None:
"""Get the coordinates of the currently active ROI."""
roi = self.roi_manager.single_active_roi
if roi is None:
logger.warning("No active ROI")
return None
logger.info(f"Active ROI coordinates: {roi.get_coordinates()}")
return roi.get_coordinates()
@SafeSlot(bool)
def on_live_view_enabled(self, enabled: bool):
logger.info(f"Live view is enabled: {enabled}")
self.live_preview_toggle.blockSignals(True)
if enabled:
self.live_preview_toggle.checked = enabled
self.image.image(CAMERA)
self.live_preview_toggle.blockSignals(False)
return
self.image.disconnect_monitor(CAMERA)
self.live_preview_toggle.checked = enabled
self.live_preview_toggle.blockSignals(False)
@SafeSlot(bool, bool)
def on_motors_enable(self, x_enable: bool, y_enable: bool):
"""
Enable/Disable motor controls
Args:
x_enable(bool): enable x motor controls
y_enable(bool): enable y motor controls
"""
self.motor_control_2d.enable_controls_hor(x_enable)
self.motor_control_2d.enable_controls_ver(y_enable)
@SafeSlot(int)
def enable_submit_button(self, enable: int):
"""
Enable/disable submit button.
Args:
enable(int): -1 disable else enable
"""
if enable == -1:
self.submit_button.setEnabled(False)
else:
self.submit_button.setEnabled(True)
@SafeSlot(bool, bool)
def on_tomo_angle_readback(self, data: dict, meta: dict):
#TODO implement if needed
print(f"data: {data}")
print(f"meta: {meta}")
@SafeSlot(dict, dict)
def device_updates(self, data: dict, meta: dict):
"""
Slot to handle device updates from omny_xray_gui device.
Args:
data(dict): data from device
meta(dict): metadata from device
"""
signals = data.get('signals')
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
self.on_live_view_enabled(bool(enable_live_preview))
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
# Signals from epics gui device
# send message
user_message = signals.get("omny_xray_gui_send_message").get('value')
self.user_message = user_message
# sample name
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
self.sample_name = sample_message
# enable frame acquisition
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
self.on_live_view_enabled(bool(update_frame_acq))
# enable submit button
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
self.enable_submit_button(enable_submit_button)
@SafeSlot()
def submit(self):
"""Execute submit action by submit button."""
if self.roi_manager.single_active_roi is None:
logger.warning("No active ROI")
return
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
roi_center_x = roi_coordinates['center_x']
roi_center_y = roi_coordinates['center_y']
# Case of rectangular ROI
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
roi_width = roi_coordinates['width']
roi_height = roi_coordinates['height']
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
roi_width = roi_coordinates['diameter']
roi_height = roi_coordinates['radius']
else:
logger.warning("Unsupported ROI type for submit action.")
return
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
# submit roi coordinates
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
self.dev.omny_xray_gui.submit.set(1)
def cleanup(self):
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
getattr(self.dev,CAMERA[0]).live_mode = False
super().cleanup()
if __name__ == "__main__":
import sys
from qtpy.QtWidgets import QApplication
app = QApplication(sys.argv)
win = XRayEye()
win.resize(1000, 800)
win.show()
sys.exit(app.exec_())

View File

@@ -1 +0,0 @@
{'files': ['x_ray_eye.py']}

View File

@@ -1,57 +0,0 @@
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from bec_widgets.utils.bec_designer import designer_material_icon
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
from qtpy.QtWidgets import QWidget
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye import XRayEye
DOM_XML = """
<ui language='c++'>
<widget class='XRayEye' name='x_ray_eye'>
</widget>
</ui>
"""
class XRayEyePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
def __init__(self):
super().__init__()
self._form_editor = None
def createWidget(self, parent):
if parent is None:
return QWidget()
t = XRayEye(parent)
return t
def domXml(self):
return DOM_XML
def group(self):
return ""
def icon(self):
return designer_material_icon(XRayEye.ICON_NAME)
def includeFile(self):
return "x_ray_eye"
def initialize(self, form_editor):
self._form_editor = form_editor
def isContainer(self):
return False
def isInitialized(self):
return self._form_editor is not None
def name(self):
return "XRayEye"
def toolTip(self):
return "XRayEye"
def whatsThis(self):
return self.toolTip()

View File

@@ -213,8 +213,6 @@ ftransy:
onFailure: buffer onFailure: buffer
readOnly: false readOnly: false
readoutPriority: baseline readoutPriority: baseline
userParameter:
sensor_voltage: -2.4
ftransz: ftransz:
description: Sample transer Z description: Sample transer Z
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
@@ -335,8 +333,8 @@ rtx:
readOnly: false readOnly: false
readoutPriority: on_request readoutPriority: on_request
userParameter: userParameter:
low_signal: 10000 low_signal: 11000
min_signal: 9000 min_signal: 10000
rt_pid_voltage: -0.06219 rt_pid_voltage: -0.06219
rty: rty:
description: flomni rt description: flomni rt
@@ -364,105 +362,3 @@ rtz:
onFailure: buffer onFailure: buffer
readOnly: false readOnly: false
readoutPriority: on_request readoutPriority: on_request
############################################################
####################### Cameras ############################
############################################################
cam_flomni_gripper:
description: Camera sample changer
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
deviceConfig:
url: http://flomnicamserver:5000/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_flomni_overview:
description: Camera flomni overview
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
deviceConfig:
url: http://flomnicamserver:5001/video_high
num_rotation_90: 3
transpose: false
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
cam_xeye:
description: Camera flOMNI Xray eye ID1
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
deviceConfig:
camera_id: 1
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
cam_ids_rgb:
description: Camera flOMNI Xray eye ID203
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
deviceConfig:
camera_id: 203
bits_per_pixel: 24
num_rotation_90: 3
transpose: false
force_monochrome: true
m_n_colormode: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: async
# ############################################################
# ################### flOMNI temperatures ####################
# ############################################################
flomni_temphum:
description: flOMNI Temperatures and humidity
deviceClass: csaxs_bec.devices.omny.flomni_temp_and_humidity.FlomniTempHum
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
# ############################################################
# ########## OMNY / flOMNI / LamNI fast shutter ##############
# ############################################################
omnyfsh:
description: omnyfsh connects to read fast shutter at X12 if in that network
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastEpicsShutter
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
############################################################
#################### GUI Signals ###########################
############################################################
omny_xray_gui:
description: Gui Epics signals
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
deviceConfig: {}
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: on_request
calculated_signal:
description: Calculated signal from alignment for fit
deviceClass: ophyd_devices.ComputedSignal
deviceConfig:
compute_method: "def just_rand():\n return 42"
enabled: true
readOnly: false
readoutPriority: baseline

View File

@@ -0,0 +1,35 @@
#Standard configuration of an Owis LTM80 linear stage
samx:
description: Owis LTM80 linear stage
deviceClass: ophyd_devices.devices.EpicsMotorEx
deviceConfig:
prefix: X12SA-ES2-ES01
motor_resolution: 0.00125
base_velocity: 0.25
velocity: 2.5
backlash_distance: 0.125
acceleration: 0.4
user_offset_dir: 0
direction_of_travel: 1
deviceTags:
- cSAXS
- user_motor
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
#xbpm1x:
# description: 'X-ray BPM1 in frontend translation x'
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: 'X12SA-FE-XBPM1:TRX'
# onFailure: raise
# enabled: true
# readoutPriority: baseline
# readOnly: false
# softwareTrigger: false
# deviceTags:
# - cSAXS
# - frontend

View File

@@ -26,6 +26,20 @@ dmmx:
- cSAXS - cSAXS
- optics - optics
#E_kev:
# description: 'Double Multilayer Monochromator, energy (keV)'
# deviceClass: ophyd.EpicsMotor
# deviceConfig:
# prefix: 'X12SA-OP-DMM1:ENERGY'
# onFailure: raise
# enabled: true
# readoutPriority: baseline
# readOnly: true
# softwareTrigger: false
# deviceTags:
# - cSAXS
# - optics
dmmy: dmmy:
description: 'Double Multilayer Monochromator, translation Y' description: 'Double Multilayer Monochromator, translation Y'
deviceClass: ophyd.EpicsMotor deviceClass: ophyd.EpicsMotor

View File

@@ -37,7 +37,8 @@ import traceback
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd_devices import CompareStatus, DeviceStatus, TransitionStatus from ophyd import DeviceStatus
from ophyd_devices import CompareStatus, TransitionStatus
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import ( from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (

View File

@@ -143,7 +143,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
self._add_delay = add_delay or 0 self._add_delay = add_delay or 0
self._raise_states = raise_states or [] self._raise_states = raise_states or []
super().__init__( super().__init__(
obj=signal, device=signal,
callback=self._compare_callback, callback=self._compare_callback,
timeout=timeout, timeout=timeout,
settle_time=settle_time, settle_time=settle_time,

View File

@@ -1 +1 @@
from .ids_camera import IDSCamera from .ids_camera_new import IDSCamera

View File

@@ -18,7 +18,6 @@ import atexit
from typing import Literal from typing import Literal
import numpy as np import numpy as np
import time
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error
@@ -67,8 +66,8 @@ class IDSCameraObject:
check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject") check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject")
if ( if (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big") int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER == self.ueye.IS_COLORMODE_BAYER
): ):
logger.info("Bayer color mode detected.") logger.info("Bayer color mode detected.")
# setup the color depth to the current windows setting # setup the color depth to the current windows setting
@@ -77,16 +76,16 @@ class IDSCameraObject:
) # TODO This raises an error - maybe check the m_n_colormode value ) # TODO This raises an error - maybe check the m_n_colormode value
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
elif ( elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big") int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY == self.ueye.IS_COLORMODE_CBYCRY
): ):
# for color camera models use RGB32 mode # for color camera models use RGB32 mode
self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
self.n_bits_per_pixel = self.ueye.INT(32) self.n_bits_per_pixel = self.ueye.INT(32)
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8) self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
elif ( elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big") int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME == self.ueye.IS_COLORMODE_MONOCHROME
): ):
# for color camera models use RGB32 mode # for color camera models use RGB32 mode
self.m_n_colormode = self.ueye.IS_CM_MONO8 self.m_n_colormode = self.ueye.IS_CM_MONO8
@@ -160,17 +159,15 @@ class Camera:
""" """
def __init__( def __init__(
self, self,
camera_id: int, camera_id: int,
m_n_colormode: Literal[0, 1, 2, 3] = 1, m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: int = 24, bits_per_pixel: int = 24,
connect: bool = True, connect: bool = True,
force_monochrome: bool = False,
): ):
self.ueye = ueye self.ueye = ueye
self.camera_id = camera_id self.camera_id = camera_id
self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel} self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel}
self.force_monochrome = force_monochrome
self._connected = False self._connected = False
self.cam = None self.cam = None
atexit.register(self.on_disconnect) atexit.register(self.on_disconnect)
@@ -200,16 +197,14 @@ class Camera:
self.cam = IDSCameraObject(self.camera_id, **self._inputs) self.cam = IDSCameraObject(self.camera_id, **self._inputs)
self._connected = True self._connected = True
def on_disconnect(self, delay_after: float = 0.3): def on_disconnect(self):
"""Disconnect from the camera and optionally wait a short time for driver cleanup.""" """Disconnect from the camera."""
try: try:
if self.cam and self.cam.h_cam: if self.cam and self.cam.h_cam:
check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject") check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject")
self._connected = False self._connected = False
self.cam = None self.cam = None
if delay_after > 0: logger.info("Camera disconnected.")
time.sleep(delay_after)
logger.debug(f"Waited {delay_after:.2f}s after camera disconnect for cleanup.")
except Exception as e: except Exception as e:
logger.info(f"Error during camera disconnection: {e}") logger.info(f"Error during camera disconnection: {e}")
@@ -268,19 +263,9 @@ class Camera:
if array is None: if array is None:
logger.error("Failed to get image data from the camera.") logger.error("Failed to get image data from the camera.")
return None return None
img = np.reshape( return np.reshape(
array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel) array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel)
) )
# If RGB image (H, W, 3), reshuffle channels from BGR → RGB
if img.ndim == 3 and img.shape[2] == 3:
img = img[:, :, ::-1]
if self.force_monochrome:
gray = np.dot(img[..., :3], [0.2989, 0.5870, 0.1140]).astype(np.uint8)
# expand to 3D shape (H, W, 1) for consistency with real mono cams
img = np.expand_dims(gray, axis=-1)
img = np.ascontiguousarray(img)
return img
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,225 +1,403 @@
"""IDS Camera class for cSAXS IDS cameras."""
from __future__ import annotations
import threading import threading
import time import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np import numpy as np
from bec_lib import messages
from bec_lib.logger import bec_logger from bec_lib.logger import bec_logger
from ophyd import Component as Cpt from ophyd import Component as Cpt
from ophyd import DeviceStatus, Kind, Signal, StatusBase
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal from ophyd_devices.utils.bec_signals import PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger logger = bec_logger.logger
class IDSCamera(PSIDeviceBase): class ROISignal(Signal):
"""IDS Camera class for cSAXS. """
Signal to handle the Region of Interest (ROI) for the IDS camera.
This class inherits from PSIDeviceBase and implements the necessary methods It is a tuple of (x, y, width, height).
to interact with the IDS camera using the pyueye library.
""" """
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.", num_rotation_90=0, def __init__(
transpose=False) self,
roi_signal = Cpt( *,
AsyncSignal, name,
name="roi_signal", roi: tuple | None = None,
ndim=0, value=0,
max_size=1000, dtype=None,
doc="Signal for the region of interest (ROI).", shape=None,
async_update={"type": "add", "max_shape": [None]}, timestamp=None,
) parent=None,
labels=None,
kind=Kind.hinted,
tolerance=None,
rtolerance=None,
metadata=None,
cl=None,
attr_name="",
):
super().__init__(
name=name,
value=value,
dtype=dtype,
shape=shape,
timestamp=timestamp,
parent=parent,
labels=labels,
kind=kind,
tolerance=tolerance,
rtolerance=rtolerance,
metadata=metadata,
cl=cl,
attr_name=attr_name,
)
self.roi = roi
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"] def get(self, **kwargs):
image = self.parent.image_data.get().data
if not isinstance(image, np.ndarray):
return -1 # -1 if no valid image is available
if self.roi is None:
roi = (0, 0, image.shape[1], image.shape[0])
else:
roi = self.roi
if len(image.shape) > 2:
image = np.sum(image, axis=2) # Convert to grayscale if it's a color image
return np.sum(image[roi[1] : roi[1] + roi[3], roi[0] : roi[0] + roi[2]], (0, 1))
class IDSCamera(PSIDeviceBase):
""" "
#---------------------------------------------------------------------------------------------------------------------------------------
#Variables
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
sInfo = ueye.SENSORINFO()
cInfo = ueye.CAMINFO()
pcImageMemory = ueye.c_mem_p()
MemID = ueye.int()
rectAOI = ueye.IS_RECT()
pitch = ueye.INT()
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
bytes_per_pixel = int(nBitsPerPixel / 8)
ids_cam
...
"""
USER_ACCESS = ["start_live_mode", "stop_live_mode", "set_roi", "width", "height"]
image_data = Cpt(PreviewSignal, ndim=2, kind=Kind.omitted)
# roi_bot_left = Cpt(ROISignal, roi=(400, 525, 118, 105), kind=Kind.normal)
# roi_bot_right = Cpt(ROISignal, roi=(518, 525, 118, 105), kind=Kind.normal)
# roi_top_left = Cpt(ROISignal, roi=(400, 630, 118, 105), kind=Kind.normal)
# roi_top_right = Cpt(ROISignal, roi=(518, 630, 118, 105), kind=Kind.normal)
# roi_signal = Cpt(ROISignal, kind=Kind.normal, doc="Region of Interest signal")
def __init__( def __init__(
self, self,
*, prefix="",
name: str, *,
camera_id: int, name: str,
prefix: str = "", camera_ID: int,
scan_info: ScanInfo | None = None, bits_per_pixel: int,
m_n_colormode: Literal[0, 1, 2, 3] = 1, channels: int,
bits_per_pixel: Literal[8, 24] = 24, m_n_colormode: int,
live_mode: bool = False, kind=None,
num_rotation_90: int = 0, device_manager=None,
transpose: bool = False, **kwargs,
force_monochrome: bool = False,
**kwargs,
): ):
"""Initialize the IDS Camera.
Args: super().__init__(
name (str): Name of the device. prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
camera_id (int): The ID of the camera device.
prefix (str): Prefix for the device.
scan_info (ScanInfo | None): Scan information for the device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self._live_mode_thread: threading.Thread | None = None
self._stop_live_mode_event: threading.Event = threading.Event()
self.cam = Camera(
camera_id=camera_id,
m_n_colormode=m_n_colormode,
bits_per_pixel=bits_per_pixel,
connect=False,
) )
self._live_mode = False self.camera_ID = camera_ID
self._inputs = {"live_mode": live_mode} self.bits_per_pixel = bits_per_pixel
self._mask = np.zeros((1, 1), dtype=np.uint8) self.bytes_per_pixel = None
self.image.num_rotation_90 = num_rotation_90 self.channels = channels
self.image.transpose = transpose self._m_n_colormode_input = m_n_colormode
self._force_monochrome = force_monochrome self.m_n_colormode = None
self.ueye = ueye
############## Live Mode Methods ############## self.h_cam = None
self.s_info = None
@property self.data_thread = None
def mask(self) -> np.ndarray: self.c_info = None
"""Return the current region of interest (ROI) for the camera.""" self.pc_image_memory = None
return self._mask self.mem_id = None
self.rect_aoi = None
@mask.setter self.pitch = None
def mask(self, value: np.ndarray): self.n_bits_per_pixel = None
""" self.width = None
Set the region of interest (ROI) for the camera. self.height = None
self.thread_event = threading.Event()
Args: self.data_thread = None
value (np.ndarray): The mask to set as the ROI. self._roi: tuple | None = None # x, y, width, height
""" logger.info(
if value.ndim != 2: f"Deprecation warning: The IDSCamera class is deprecated. Use the new IDSCameraNew class instead."
raise ValueError("ROI mask must be a 2D array.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
raise ValueError(
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
)
self._mask = value
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
if x < 0 or y < 0 or width <= 0 or height <= 0:
raise ValueError("ROI coordinates and dimensions must be positive integers.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if x + width > img_shape[1] or y + height > img_shape[0]:
raise ValueError("ROI exceeds camera dimensions.")
mask = np.zeros(img_shape, dtype=np.uint8)
mask[y: y + height, x: x + width] = 1
self.mask = mask
def _start_live(self):
"""Start the live mode for the camera."""
if self._live_mode_thread is not None:
logger.info("Live mode thread is already running.")
return
self._stop_live_mode_event.clear()
self._live_mode_thread = threading.Thread(
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
) )
self._live_mode_thread.start()
def _stop_live(self): def set_roi(self, x: int, y: int, width: int, height: int):
"""Stop the live mode for the camera.""" self._roi = (x, y, width, height)
if self._live_mode_thread is None:
logger.info("Live mode thread is not running.") def start_backend(self):
return if self.ueye is None:
self._stop_live_mode_event.set() raise ImportError("The pyueye library is not installed.")
self._live_mode_thread.join(timeout=5) self.h_cam = self.ueye.HIDS(
if self._live_mode_thread.is_alive(): self.camera_ID
logger.warning("Live mode thread did not stop gracefully.") ) # 0: first available camera; 1-254: The camera with the specified camera ID
self.s_info = self.ueye.SENSORINFO()
self.c_info = self.ueye.CAMINFO()
self.pc_image_memory = self.ueye.c_mem_p()
self.mem_id = self.ueye.int()
self.rect_aoi = self.ueye.IS_RECT()
self.pitch = self.ueye.INT()
self.n_bits_per_pixel = self.ueye.INT(
self.bits_per_pixel
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
self.m_n_colormode = self.ueye.INT(
self._m_n_colormode_input
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
# Starts the driver and establishes the connection to the camera
ret = self.ueye.is_InitCamera(self.h_cam, None)
if ret != self.ueye.IS_SUCCESS:
print("is_InitCamera ERROR")
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to
ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetCameraInfo ERROR")
# You can query additional information about the sensor type used in the camera
ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
if ret != self.ueye.IS_SUCCESS:
print("is_GetSensorInfo ERROR")
ret = self.ueye.is_ResetToDefault(self.h_cam)
if ret != self.ueye.IS_SUCCESS:
print("is_ResetToDefault ERROR")
# Set display mode to DIB
ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
# Set the right color mode
if (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_BAYER
):
# setup the color depth to the current windows setting
self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_BAYER: ")
print("\tm_n_colormode: \t\t", self.m_n_colormode)
print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_CBYCRY
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
n_bits_per_pixel = self.ueye.INT(32)
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
print("IS_COLORMODE_CBYCRY: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
elif (
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
== self.ueye.IS_COLORMODE_MONOCHROME
):
# for color camera models use RGB32 mode
m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("IS_COLORMODE_MONOCHROME: ")
print("\tm_n_colormode: \t\t", m_n_colormode)
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
print()
else: else:
self._live_mode_thread = None # for monochrome camera models use Y8 mode
logger.info("Live mode stopped.") m_n_colormode = self.ueye.IS_CM_MONO8
n_bits_per_pixel = self.ueye.INT(8)
bytes_per_pixel = int(n_bits_per_pixel / 8)
print("else")
def _live_mode_loop(self, stop_event: threading.Event): # Can be used to set the size and position of an "area of interest"(AOI) within an image
"""Loop to capture images in live mode.""" ret = self.ueye.is_AOI(
while not stop_event.is_set(): self.h_cam,
try: self.ueye.IS_AOI_IMAGE_GET_AOI,
self.process_data(self.cam.get_image_data()) self.rect_aoi,
except Exception as e: self.ueye.sizeof(self.rect_aoi),
logger.error(f"Error in live mode loop: {e}") )
break if ret != self.ueye.IS_SUCCESS:
stop_event.wait(0.2) # 5 Hz print("is_AOI ERROR")
def process_data(self, image: np.ndarray | None): self.width = self.rect_aoi.s32Width
"""Process the image data before sending it to the preview signal.""" self.height = self.rect_aoi.s32Height
if image is None:
return
self.image.put(image)
def get_last_image(self) -> np.ndarray: # Prints out some information about the camera and the sensor
"""Get the last captured image from the camera.""" print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
image = self.image.get() print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
if image: print("Maximum image width:\t", self.width)
return image.data print("Maximum image height:\t", self.height)
print()
############## User Interface Methods ############## # ---------------------------------------------------------------------------------------------------------------------------------------
def on_connected(self): # Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel
"""Connect to the camera.""" ret = self.ueye.is_AllocImageMem(
self.cam.force_monochrome = self._force_monochrome self.h_cam,
self.cam.on_connect() self.width,
self.live_mode = self._inputs.get("live_mode", False) self.height,
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value) self.n_bits_per_pixel,
self.pc_image_memory,
def on_destroy(self): self.mem_id,
"""Clean up resources when the device is destroyed.""" )
self.cam.on_disconnect() if ret != self.ueye.IS_SUCCESS:
super().on_destroy() print("is_AllocImageMem ERROR")
else:
def on_trigger(self): # Makes the specified image memory the active memory
"""Handle the trigger event.""" ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
if not self.live_mode: if ret != self.ueye.IS_SUCCESS:
return print("is_SetImageMem ERROR")
image = self.image.get()
if image is not None:
image: messages.DevicePreviewMessage
if self.mask.shape[0:2] != image.data.shape[0:2]:
logger.info(
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
)
return
if len(image.data.shape) == 3:
# If the image has multiple channels, apply the mask to each channel
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
n_channels = 3
else: else:
data = image.data * self.mask # Set the desired color mode
n_channels = 1 ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
self.roi_signal.put(np.sum(data) / (np.sum(self.mask) * n_channels))
# Activates the camera's live video mode (free run mode)
ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
if ret != self.ueye.IS_SUCCESS:
print("is_CaptureVideo ERROR")
# Enables the queue mode for existing image memory sequences
ret = self.ueye.is_InquireImageMem(
self.h_cam,
self.pc_image_memory,
self.mem_id,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
)
if ret != self.ueye.IS_SUCCESS:
print("is_InquireImageMem ERROR")
else:
print("Press q to leave the programm")
# startmeasureframerate = True
# Gain = False
# Start live mode of camera immediately
self.start_live_mode()
def _start_data_thread(self):
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
self.data_thread.start()
def _receive_data_from_camera(self):
while not self.thread_event.is_set():
if self.ueye is None:
print("pyueye library not available.")
return
# In order to display the image in an OpenCV window we need to...
# ...extract the data of our image memory
array = self.ueye.get_data(
self.pc_image_memory,
self.width,
self.height,
self.n_bits_per_pixel,
self.pitch,
copy=False,
)
# ...reshape it in an numpy array...
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
self.image_data.put(frame)
time.sleep(0.1)
def wait_for_connection(self, all_signals=False, timeout=10):
if ueye is None:
raise ImportError(
"The pyueye library is not installed or doesn't provide the necessary c libs"
)
super().wait_for_connection(all_signals, timeout)
def start_live_mode(self):
if self.data_thread is not None:
self.stop_live_mode()
self._start_data_thread()
def stop_live_mode(self):
"""Stopping the camera live mode."""
self.thread_event.set()
if self.data_thread is not None:
self.data_thread.join()
self.thread_event.clear()
self.data_thread = None
########################################
# Beamline Specific Implementations #
########################################
def on_init(self) -> None:
"""
Called when the device is initialized.
No signals are connected at this point. If you like to
set default values on signals, please use on_connected instead.
"""
def on_connected(self) -> None:
"""
Called after the device is connected and its signals are connected.
Default values for signals should be set here.
"""
self.start_backend()
self.start_live_mode()
def on_stage(self) -> DeviceStatus | StatusBase | None:
"""
Called while staging the device.
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
"""
def on_unstage(self) -> DeviceStatus | StatusBase | None:
"""Called while unstaging the device."""
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
def on_trigger(self) -> DeviceStatus | StatusBase | None:
"""Called when the device is triggered."""
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
def on_stop(self) -> None:
"""Called when the device is stopped."""
def on_destroy(self) -> None:
"""Called when the device is destroyed. Cleanup resources here."""
self.stop_live_mode()
if __name__ == "__main__": if __name__ == "__main__":
# Example usage of the IDSCamera class # Example usage
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False) camera = IDSCamera(name="camera", camera_ID=201, bits_per_pixel=24, channels=3, m_n_colormode=1)
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.") camera.wait_for_connection()
camera.on_destroy()

View File

@@ -0,0 +1,226 @@
"""IDS Camera class for cSAXS IDS cameras."""
from __future__ import annotations
import threading
import time
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
import numpy as np
from bec_lib import messages
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
if TYPE_CHECKING:
from bec_lib.devicemanager import ScanInfo
from pydantic import ValidationInfo
logger = bec_logger.logger
class IDSCamera(PSIDeviceBase):
"""IDS Camera class for cSAXS.
This class inherits from PSIDeviceBase and implements the necessary methods
to interact with the IDS camera using the pyueye library.
"""
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.")
roi_signal = Cpt(
AsyncSignal,
name="roi_signal",
ndim=0,
max_size=1000,
doc="Signal for the region of interest (ROI).",
async_update={"type": "add", "max_shape": [None]},
)
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
def __init__(
self,
*,
name: str,
camera_id: int,
prefix: str = "",
scan_info: ScanInfo | None = None,
m_n_colormode: Literal[0, 1, 2, 3] = 1,
bits_per_pixel: Literal[8, 24] = 24,
live_mode: bool = False,
**kwargs,
):
"""Initialize the IDS Camera.
Args:
name (str): Name of the device.
camera_id (int): The ID of the camera device.
prefix (str): Prefix for the device.
scan_info (ScanInfo | None): Scan information for the device.
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
live_mode (bool): Whether to enable live mode for the camera.
"""
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
self._live_mode_thread: threading.Thread | None = None
self._stop_live_mode_event: threading.Event = threading.Event()
self.cam = Camera(
camera_id=camera_id,
m_n_colormode=m_n_colormode,
bits_per_pixel=bits_per_pixel,
connect=False,
)
self._live_mode = False
self._inputs = {"live_mode": live_mode}
self._mask = np.zeros((1, 1), dtype=np.uint8)
############## Live Mode Methods ##############
@property
def mask(self) -> np.ndarray:
"""Return the current region of interest (ROI) for the camera."""
return self._mask
@mask.setter
def mask(self, value: np.ndarray):
"""
Set the region of interest (ROI) for the camera.
Args:
value (np.ndarray): The mask to set as the ROI.
"""
if value.ndim != 2:
raise ValueError("ROI mask must be a 2D array.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
raise ValueError(
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
)
self._mask = value
@property
def live_mode(self) -> bool:
"""Return whether the camera is in live mode."""
return self._live_mode
@live_mode.setter
def live_mode(self, value: bool):
"""Set the live mode for the camera."""
if value != self._live_mode:
if self.cam._connected is False: # $ pylint: disable=protected-access
self.cam.on_connect()
self._live_mode = value
if value:
self._start_live()
else:
self._stop_live()
def set_rect_roi(self, x: int, y: int, width: int, height: int):
"""Set the rectangular region of interest (ROI) for the camera."""
if x < 0 or y < 0 or width <= 0 or height <= 0:
raise ValueError("ROI coordinates and dimensions must be positive integers.")
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
if x + width > img_shape[1] or y + height > img_shape[0]:
raise ValueError("ROI exceeds camera dimensions.")
mask = np.zeros(img_shape, dtype=np.uint8)
mask[y : y + height, x : x + width] = 1
self.mask = mask
def _start_live(self):
"""Start the live mode for the camera."""
if self._live_mode_thread is not None:
logger.info("Live mode thread is already running.")
return
self._stop_live_mode_event.clear()
self._live_mode_thread = threading.Thread(
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
)
self._live_mode_thread.start()
def _stop_live(self):
"""Stop the live mode for the camera."""
if self._live_mode_thread is None:
logger.info("Live mode thread is not running.")
return
self._stop_live_mode_event.set()
self._live_mode_thread.join(timeout=5)
if self._live_mode_thread.is_alive():
logger.warning("Live mode thread did not stop gracefully.")
else:
self._live_mode_thread = None
logger.info("Live mode stopped.")
def _live_mode_loop(self, stop_event: threading.Event):
"""Loop to capture images in live mode."""
while not stop_event.is_set():
try:
self.process_data(self.cam.get_image_data())
except Exception as e:
logger.error(f"Error in live mode loop: {e}")
break
stop_event.wait(0.2) # 5 Hz
def process_data(self, image: np.ndarray | None):
"""Process the image data before sending it to the preview signal."""
if image is None:
return
self.image.put(image)
def get_last_image(self) -> np.ndarray:
"""Get the last captured image from the camera."""
image = self.image.get()
if image:
return image.data
############## User Interface Methods ##############
def on_connected(self):
"""Connect to the camera."""
self.cam.on_connect()
self.live_mode = self._inputs.get("live_mode", False)
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
def on_destroy(self):
"""Clean up resources when the device is destroyed."""
self.cam.on_disconnect()
super().on_destroy()
def on_trigger(self):
"""Handle the trigger event."""
if not self.live_mode:
return
image = self.image.get()
if image is not None:
image: messages.DevicePreviewMessage
if self.mask.shape[0:2] != image.data.shape[0:2]:
logger.info(
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
)
return
if len(image.data.shape) == 3:
# If the image has multiple channels, apply the mask to each channel
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
n_channels = 3
else:
data = image.data * self.mask
n_channels = 1
self.roi_signal.put(
{
self.roi_signal.name: {
"value": np.sum(data)
/ (np.sum(self.mask) * n_channels), # TODO could be optimized
"timestamp": time.time(),
}
}
)
if __name__ == "__main__":
# Example usage of the IDSCamera class
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")

View File

@@ -412,11 +412,10 @@ class NPointAxis(Device, PositionerBase):
sign=1, sign=1,
socket_cls=SocketIO, socket_cls=SocketIO,
tolerance: float = 0.05, tolerance: float = 0.05,
device_manager=None,
**kwargs, **kwargs,
): ):
self.controller = NPointController( self.controller = NPointController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.sign = sign self.sign = sign

View File

@@ -24,25 +24,24 @@ class FlomniSampleStorage(Device):
SUB_VALUE = "value" SUB_VALUE = "value"
_default_sub = SUB_VALUE _default_sub = SUB_VALUE
sample_placed = { sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {"auto_monitor": True}) for i in range(21) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {}) for i in range(21)
} }
sample_placed = Dcpt(sample_placed) sample_placed = Dcpt(sample_placed)
sample_names = { sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True, "auto_monitor": True}) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True})
for i in range(21) for i in range(21)
} }
sample_names = Dcpt(sample_names) sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt( sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET", auto_monitor=True EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET"
) )
sample_in_gripper_name = Cpt( sample_in_gripper_name = Cpt(
EpicsSignal, EpicsSignal,
name="sample_in_gripper_name", name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC", read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC",
string=True, string=True,
auto_monitor=True
) )
def __init__(self, prefix="", *, name, **kwargs): def __init__(self, prefix="", *, name, **kwargs):

View File

@@ -1,208 +0,0 @@
import time
import datetime
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
from prettytable import FRAME, PrettyTable
import numpy as np
class FlomniTempHumError(Exception):
pass
class FlomniTempHum(Device):
USER_ACCESS = [
"show_all",
"help",
]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
temperature_mirror = Cpt(
EpicsSignal, name="temperature_mirror", read_pv="XOMNI-TEMPHUM-MIRROR:0.VAL"
)
temperature_mirrorset_set = Cpt(
EpicsSignal, name="temperature_mirrorset_set", read_pv="XOMNI-TEMPHUM-MIRRORSET_SET:0.VAL"
)
temperature_mirrorset_rb = Cpt(
EpicsSignal, name="temperature_mirrorset_rb", read_pv="XOMNI-TEMPHUM-MIRRORSET_RB:0.VAL"
)
temperature_osa = Cpt(
EpicsSignal, name="temperature_osa", read_pv="XOMNI-TEMPHUM-OSA:0.VAL"
)
temperature_osaset_set = Cpt(
EpicsSignal, name="temperature_osaset_set", read_pv="XOMNI-TEMPHUM-OSASET_SET:0.VAL"
)
temperature_osaset_rb = Cpt(
EpicsSignal, name="temperature_osaset_rb", read_pv="XOMNI-TEMPHUM-OSASET_RB:0.VAL"
)
omegactrl_alive = Cpt(
EpicsSignal, name="omegactrl_alive", read_pv="XOMNI-TEMPHUM-OMEGACTRL-ALIVE:0.VAL"
)
galilctrl_alive = Cpt(
EpicsSignal, name="galilctrl_alive", read_pv="XOMNI-TEMPHUM-GALILCTRL-ALIVE:0.VAL"
)
temperature_heater = Cpt(
EpicsSignal, name="temperature_heater", read_pv="XOMNI-TEMPHUM-HEATER:0.VAL"
)
temperature_heaterset_set = Cpt(
EpicsSignal, name="temperature_heaterset_set", read_pv="XOMNI-TEMPHUM-HEATERSET_SET:0.VAL"
)
temperature_heaterset_rb = Cpt(
EpicsSignal, name="temperature_heaterset_rb", read_pv="XOMNI-TEMPHUM-HEATERSET_RB:0.VAL"
)
temperature_heaterhousing = Cpt(
EpicsSignal, name="temperature_heaterhousing", read_pv="XOMNI-TEMPHUM-HEATERHOUSE:0.VAL"
)
temperature_heaterhousing_alarm = Cpt(
EpicsSignal, name="temperature_heaterhousing_alarm", read_pv="XOMNI-TEMPHUM-HEATERHOUSEALARM:0.VAL"
)
temperature_heater_enabled = Cpt(
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
)
temperature_heater_enabled = Cpt(
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
)
###### GALIL CONTROLLER
humidity_sensor1 = Cpt(
EpicsSignal, name="humidity_sensor1", read_pv="XOMNI-TEMPHUM-HUM1:0.VAL"
)
humidity_sensor2 = Cpt(
EpicsSignal, name="humidity_sensor2", read_pv="XOMNI-TEMPHUM-HUM2:0.VAL"
)
humidity_sensor1_temperature = Cpt(
EpicsSignal, name="humidity_sensor1_temperature", read_pv="XOMNI-TEMPHUM-TEMP1:0.VAL"
)
humidity_sensor2_temperature = Cpt(
EpicsSignal, name="humidity_sensor2_temperature", read_pv="XOMNI-TEMPHUM-TEMP2:0.VAL"
)
humidity_sensor1_err = Cpt(
EpicsSignal, name="humidity_sensor1_err", read_pv="XOMNI-TEMPHUM-ERR1:0.VAL"
)
humidity_sensor2_err = Cpt(
EpicsSignal, name="humidity_sensor2_err", read_pv="XOMNI-TEMPHUM-ERR2:0.VAL"
)
flow = Cpt(
EpicsSignal, name="flow", read_pv="XOMNI-TEMPHUM-FLOW:0.VAL"
)
flowset = Cpt(
EpicsSignal, name="flowset", read_pv="XOMNI-TEMPHUM-FLOWSET:0.VAL"
)
flowset_set = Cpt(
EpicsSignal, name="flowset_set", read_pv="XOMNI-TEMPHUM-FLOWSETSET:0.VAL"
)
humidityset = Cpt(
EpicsSignal, name="humidityset", read_pv="XOMNI-TEMPHUM-HUMSET:0.VAL"
)
humidityset_set = Cpt(
EpicsSignal, name="humidityset_set", read_pv="XOMNI-TEMPHUM-HUMSETSET:0.VAL"
)
suction = Cpt(
EpicsSignal, name="suction", read_pv="XOMNI-TEMPHUM-SUCTION:0.VAL"
)
valvedry = Cpt(
EpicsSignal, name="valvedry", read_pv="XOMNI-TEMPHUM-VALVEDRY:0.VAL"
)
valvewet = Cpt(
EpicsSignal, name="valvewet", read_pv="XOMNI-TEMPHUM-VALVEWET:0.VAL"
)
setuptemp = Cpt(
EpicsSignal, name="setuptemp", read_pv="XOMNI-TEMPHUM-SETUPTEMP:0.VAL"
)
def omega_controller_running(self):
time_diff = np.fabs(float(self.omegactrl_alive.get()) - time.time())
if time_diff > 120:
return False
else:
return True
def galil_controller_running(self):
time_diff = np.fabs(float(self.galilctrl_alive.get()) - time.time())
if time_diff > 120:
return False
else:
return True
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.temperature_mirror.subscribe(self._emit_value, run=False)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
def show_all(self):
print("=== flOMNI Temperature & Humidity Overview ===")
print("")
print("Temperatures:")
print(f" Mirror: {float(self.temperature_mirror.get()):7.2f} °C")
print(f" Mirror Setpoint (RB): {float(self.temperature_mirrorset_rb.get()):7.2f} °C")
print(f" OSA: {float(self.temperature_osa.get()):7.2f} °C")
print(f" OSA Setpoint (RB): {float(self.temperature_osaset_rb.get()):7.2f} °C")
print(f" Heater: {float(self.temperature_heater.get()):7.2f} °C")
print(f" Heater Setpoint (RB): {float(self.temperature_heaterset_rb.get()):7.2f} °C")
print(f" Heater Enabled: {float(self.temperature_heater_enabled.get()):.0f}")
print(f" Heater Housing: {float(self.temperature_heaterhousing.get()):7.2f} °C")
print(f" Heater Housing Alarm: {float(self.temperature_heaterhousing_alarm.get()):.0f}")
print("")
print("Humidity Sensors:")
print(f" Sensor 1 Humidity: {float(self.humidity_sensor1.get()):7.2f} %RH")
print(f" Sensor 1 Temperature: {float(self.humidity_sensor1_temperature.get()):7.2f} °C")
print(f" Sensor 1 Error: {float(self.humidity_sensor1_err.get()):.0f}")
print(f" Sensor 2 Humidity: {float(self.humidity_sensor2.get()):7.2f} %RH")
print(f" Sensor 2 Temperature: {float(self.humidity_sensor2_temperature.get()):7.2f} °C")
print(f" Sensor 2 Error: {float(self.humidity_sensor2_err.get()):.0f}")
print(f" Humidity Setpoint: {float(self.humidityset.get()):7.2f} %RH")
print("")
print("Flow Control:")
print(f" Flow: {float(self.flow.get()):7.2f} sccm")
print(f" Flow Setpoint (RB): {float(self.flowset.get()):7.2f} sccm")
print("")
print("Suction:")
print(f" Suction: {float(self.suction.get()):7.2f}")
print("")
print("Valves:")
print(f" Dry Valve: {float(self.valvedry.get()):.0f}")
print(f" Wet Valve: {float(self.valvewet.get()):.0f}")
print("")
print("Controller Heartbeats:")
print(f" OMEGA Controller Alive: {self.omega_controller_running()}")
print(f" GALIL Controller Alive: {self.galil_controller_running()}")
print("==============================================")
def help(self):
print("Help for flOMNI temperature and humidity control system:")
print("Available methods:")
print(" show_all() - display all current values")

View File

@@ -175,7 +175,7 @@ class FlomniGalilMotor(Device, PositionerBase):
**kwargs, **kwargs,
): ):
self.controller = FlomniGalilController( self.controller = FlomniGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
@@ -212,10 +212,6 @@ class FlomniGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property @property
def limits(self): def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get()) return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -346,10 +342,10 @@ class FlomniGalilMotor(Device, PositionerBase):
Drive an axis to the limit in a specified direction. Drive an axis to the limit in a specified direction.
Args: Args:
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
""" """
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction) self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction)
# now force position read to cache #now force position read to cache
val = self.readback.read() val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())

View File

@@ -149,7 +149,7 @@ class FuprGalilMotor(Device, PositionerBase):
**kwargs, **kwargs,
): ):
self.controller = FuprGalilController( self.controller = FuprGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
@@ -185,10 +185,6 @@ class FuprGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property @property
def limits(self): def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get()) return (self.low_limit_travel.get(), self.high_limit_travel.get())

View File

@@ -59,12 +59,12 @@ class GalilController(Controller):
"all_axes_referenced", "all_axes_referenced",
] ]
OKBLUE = "\033[94m" OKBLUE = '\033[94m'
OKCYAN = "\033[96m" OKCYAN = '\033[96m'
OKGREEN = "\033[92m" OKGREEN = '\033[92m'
WARNING = "\033[93m" WARNING = '\033[93m'
FAIL = "\033[91m" FAIL = '\033[91m'
ENDC = "\033[0m" ENDC = '\033[0m'
@threadlocked @threadlocked
def socket_put(self, val: str) -> None: def socket_put(self, val: str) -> None:
@@ -115,29 +115,29 @@ class GalilController(Controller):
def axis_is_referenced(self, axis_Id_numeric) -> bool: def axis_is_referenced(self, axis_Id_numeric) -> bool:
return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip())) return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip()))
def folerr_status(self, axis_Id_numeric) -> bool: def folerr_status(self, axis_Id_numeric) -> bool:
return bool(float(self.socket_put_and_receive(f"MG folaxerr[{axis_Id_numeric}]").strip())) return bool(float(self.socket_put_and_receive(f"MG folaxerr[{axis_Id_numeric}]").strip()))
def motor_temperature(self, axis_Id_numeric) -> float: def motor_temperature(self, axis_Id_numeric) -> float:
# this is only valid for omny. consider moving to ogalil #this is only valid for omny. consider moving to ogalil
voltage = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip()) voltage = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip())
voltage2 = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip()) voltage2 = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip())
if voltage2 < voltage: if voltage2 < voltage:
voltage = voltage2 voltage = voltage2
# convert from [-10,10]V to [0,300]degC # convert from [-10,10]V to [0,300]degC
temperature_degC = round((voltage + 10.0) / 20.0 * 300.0, 1) temperature_degC = round((voltage+10.0) / 20.0 * 300.0, 1)
# the motors of the parking station have a different offset #the motors of the parking station have a different offset
# the range is reduced, so if at the limit, we show an extreme value #the range is reduced, so if at the limit, we show an extreme value
if self.sock.port == 8082: if self.sock.port == 8082:
# controller 2 #controller 2
if axis_Id_numeric == 6: if axis_Id_numeric == 6:
temperature_degC = round((voltage + 10.0 - 11.4) / 20.0 * 300.0, 1) temperature_degC = round((voltage+10.0-11.4) / 20.0 * 300.0, 1)
if voltage > 9.9: if voltage > 9.9:
temperature_degC = 300 temperature_degC = 300
if axis_Id_numeric == 7: if axis_Id_numeric == 7:
temperature_degC = round((voltage + 0.0 - 12) / 20.0 * 300.0, 1) temperature_degC = round((voltage+.0-12) / 20.0 * 300.0, 1)
if voltage > 9.9: if voltage > 9.9:
temperature_degC = 300 temperature_degC = 300
return temperature_degC return temperature_degC
@@ -147,15 +147,16 @@ class GalilController(Controller):
Check if all axes are referenced. Check if all axes are referenced.
""" """
return bool(float(self.socket_put_and_receive("MG allaxref").strip())) return bool(float(self.socket_put_and_receive("MG allaxref").strip()))
def _omny_get_microstep_position(self, axis_Id): def _omny_get_microstep_position(self,axis_Id):
return float(self.socket_put_and_receive(f"MG _TD{axis_Id}").strip()) return float(self.socket_put_and_receive(f"MG _TD{axis_Id}").strip())
def _omny_get_reference_limit(self, axis_Id):
def _omny_get_reference_limit(self,axis_Id):
get_axis_no = float(self.socket_put_and_receive(f"MG frmmv").strip()) get_axis_no = float(self.socket_put_and_receive(f"MG frmmv").strip())
if get_axis_no > 0: if(get_axis_no>0):
reference_is_before = float(self.socket_put_and_receive(f"MG _FL{axis_Id}").strip()) reference_is_before = float(self.socket_put_and_receive(f"MG _FL{axis_Id}").strip())
elif get_axis_no < 0: elif(get_axis_no<0):
reference_is_before = float(self.socket_put_and_receive(f"MG _BL{axis_Id}").strip()) reference_is_before = float(self.socket_put_and_receive(f"MG _BL{axis_Id}").strip())
else: else:
reference_is_before = 0 reference_is_before = 0
@@ -186,11 +187,7 @@ class GalilController(Controller):
while self.is_axis_moving(None, axis_Id_numeric): while self.is_axis_moving(None, axis_Id_numeric):
time.sleep(0.01) time.sleep(0.01)
if verbose: if verbose:
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f}", scope="drive axis to limit", show_asap=True)
f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f}",
scope="drive axis to limit",
show_asap=True,
)
time.sleep(0.5) time.sleep(0.5)
# check if we actually hit the limit # check if we actually hit the limit
@@ -204,7 +201,13 @@ class GalilController(Controller):
else: else:
print("Limit reached.") print("Limit reached.")
def find_reference(self, axis_Id_numeric: int, verbose=0, raise_error=1) -> None: def get_device_manager(self):
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager:
return axis.device_manager
raise BECConfigError("Could not access the device_manager")
def find_reference(self, axis_Id_numeric: int, verbose=0, raise_error = 1) -> None:
""" """
Find the reference of an axis. Find the reference of an axis.
@@ -221,11 +224,7 @@ class GalilController(Controller):
while self.is_axis_moving(None, axis_Id_numeric): while self.is_axis_moving(None, axis_Id_numeric):
time.sleep(0.1) time.sleep(0.1)
if verbose: if verbose:
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f} reference is before {self._omny_get_reference_limit(axis_Id)}", scope="find axis reference", show_asap=True)
f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f} reference is before {self._omny_get_reference_limit(axis_Id)}",
scope="find axis reference",
show_asap=True,
)
time.sleep(0.5) time.sleep(0.5)
if not self.axis_is_referenced(axis_Id_numeric): if not self.axis_is_referenced(axis_Id_numeric):
@@ -237,6 +236,7 @@ class GalilController(Controller):
logger.info(f"Successfully found reference of axis {axis_Id_numeric}.") logger.info(f"Successfully found reference of axis {axis_Id_numeric}.")
print(f"Successfully found reference of axis {axis_Id_numeric}.") print(f"Successfully found reference of axis {axis_Id_numeric}.")
def show_running_threads(self) -> None: def show_running_threads(self) -> None:
t = PrettyTable() t = PrettyTable()
t.title = f"Threads on {self.sock.host}:{self.sock.port}" t.title = f"Threads on {self.sock.host}:{self.sock.port}"
@@ -251,7 +251,7 @@ class GalilController(Controller):
def is_motor_on(self, axis_Id) -> bool: def is_motor_on(self, axis_Id) -> bool:
return not bool(float(self.socket_put_and_receive(f"MG _MO{axis_Id}").strip())) return not bool(float(self.socket_put_and_receive(f"MG _MO{axis_Id}").strip()))
def get_motor_limit_switch(self, axis_Id) -> list: def get_motor_limit_switch(self, axis_Id) -> list:
""" """
Get the status of the motor limit switches. Get the status of the motor limit switches.
@@ -269,7 +269,14 @@ class GalilController(Controller):
def describe(self) -> None: def describe(self) -> None:
t = PrettyTable() t = PrettyTable()
t.title = f"{self.__class__.__name__} on {self.sock.host}:{self.sock.port}" t.title = f"{self.__class__.__name__} on {self.sock.host}:{self.sock.port}"
field_names = ["Axis", "Name", "Referenced", "Motor On", "Limits", "Position"] field_names = [
"Axis",
"Name",
"Referenced",
"Motor On",
"Limits",
"Position",
]
# in case of OMNY # in case of OMNY
if self.sock.host == "mpc3217.psi.ch": if self.sock.host == "mpc3217.psi.ch":
field_names.append("Temperature") field_names.append("Temperature")
@@ -279,7 +286,7 @@ class GalilController(Controller):
axis = self._axis[ax] axis = self._axis[ax]
if axis is not None: if axis is not None:
if self.sock.host == "mpc3217.psi.ch": if self.sock.host == "mpc3217.psi.ch":
# case of omny. possibly consider moving to ogalil #case of omny. possibly consider moving to ogalil
motor_on = self.is_motor_on(axis.axis_Id) motor_on = self.is_motor_on(axis.axis_Id)
if motor_on == True: if motor_on == True:
motor_on = self.WARNING + "ON" + self.ENDC motor_on = self.WARNING + "ON" + self.ENDC
@@ -292,7 +299,7 @@ class GalilController(Controller):
else: else:
folerr_status = "False" folerr_status = "False"
position = axis.readback.read().get(axis.name).get("value") position = axis.readback.read().get(axis.name).get("value")
position = f"{position:.3f}" position = f'{position:.3f}'
t.add_row( t.add_row(
[ [
f"{axis.axis_Id_numeric}/{axis.axis_Id}", f"{axis.axis_Id_numeric}/{axis.axis_Id}",
@@ -323,6 +330,8 @@ class GalilController(Controller):
self.show_running_threads() self.show_running_threads()
self.show_status_other() self.show_status_other()
def show_status_other(self) -> None: def show_status_other(self) -> None:
""" """
Show additional device-specific status information. Show additional device-specific status information.
@@ -411,7 +420,7 @@ class GalilSetpointSignal(GalilSignalBase):
while self.controller.is_thread_active(0): while self.controller.is_thread_active(0):
time.sleep(0.1) time.sleep(0.1)
# in the case of lamni, consider moving to lgalil #in the case of lamni, consider moving to lgalil
if self.parent.axis_Id_numeric == 2 and self.controller.sock.host == "mpc2680.psi.ch": if self.parent.axis_Id_numeric == 2 and self.controller.sock.host == "mpc2680.psi.ch":
try: try:
rt = self.parent.device_manager.devices[self.parent.rt] rt = self.parent.device_manager.devices[self.parent.rt]

View File

@@ -73,7 +73,6 @@ class LamniGalilController(GalilController):
air_off = bool(self.socket_put_and_receive("MG@OUT[13]")) air_off = bool(self.socket_put_and_receive("MG@OUT[13]"))
return rt_not_blocked_by_galil and air_off return rt_not_blocked_by_galil and air_off
class LamniGalilReadbackSignal(GalilSignalRO): class LamniGalilReadbackSignal(GalilSignalRO):
@retry_once @retry_once
@threadlocked @threadlocked
@@ -100,7 +99,6 @@ class LamniGalilReadbackSignal(GalilSignalRO):
logger.warning("Failed to set RT value during readback.") logger.warning("Failed to set RT value during readback.")
return val return val
class LamniGalilMotor(Device, PositionerBase): class LamniGalilMotor(Device, PositionerBase):
USER_ACCESS = ["controller", "drive_axis_to_limit", "find_reference"] USER_ACCESS = ["controller", "drive_axis_to_limit", "find_reference"]
readback = Cpt(LamniGalilReadbackSignal, signal_name="readback", kind="hinted") readback = Cpt(LamniGalilReadbackSignal, signal_name="readback", kind="hinted")
@@ -134,7 +132,7 @@ class LamniGalilMotor(Device, PositionerBase):
**kwargs, **kwargs,
): ):
self.controller = LamniGalilController( self.controller = LamniGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
@@ -170,10 +168,6 @@ class LamniGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property @property
def limits(self): def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get()) return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -298,7 +292,7 @@ class LamniGalilMotor(Device, PositionerBase):
Find the reference of the axis. Find the reference of the axis.
""" """
self.controller.find_reference(self.axis_Id_numeric) self.controller.find_reference(self.axis_Id_numeric)
# now force position read to cache #now force position read to cache
val = self.readback.read() val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
@@ -307,10 +301,10 @@ class LamniGalilMotor(Device, PositionerBase):
Drive an axis to the limit in a specified direction. Drive an axis to the limit in a specified direction.
Args: Args:
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
""" """
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction) self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction)
# now force position read to cache #now force position read to cache
val = self.readback.read() val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())

View File

@@ -46,7 +46,7 @@ class GalilMotorResolution(GalilSignalRO):
@threadlocked @threadlocked
def _socket_get(self): def _socket_get(self):
if self.controller.sock.port == 8083 and self.parent.axis_Id_numeric == 2: if self.controller.sock.port == 8083 and self.parent.axis_Id_numeric == 2:
# rotation stage # rotation stage
return 89565.8666667 return 89565.8666667
else: else:
return 51200 return 51200
@@ -69,43 +69,37 @@ class OMNYGalilReadbackSignal(GalilSignalRO):
current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}")) current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
current_pos *= self.parent.sign current_pos *= self.parent.sign
step_mm = self.parent.motor_resolution.get() step_mm = self.parent.motor_resolution.get()
# here we introduce an offset of 25 to the rotation axis #here we introduce an offset of 25 to the rotation axis
# when setting a position this is taken into account in the controller #when setting a position this is taken into account in the controller
# that way we just do tomography from 0 to 180 degrees #that way we just do tomography from 0 to 180 degrees
if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083: if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083:
return (current_pos / step_mm) + 25 return (current_pos / step_mm)+25
else: else:
return current_pos / step_mm return current_pos / step_mm
def read(self): def read(self):
self._metadata["timestamp"] = time.time() self._metadata["timestamp"] = time.time()
val = super().read() val = super().read()
# if reading rotation stage angle #if reading rotation stage angle
if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083: if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083:
current_readback_value = val[self.parent.name]["value"] current_readback_value = val[self.parent.name]["value"]
# print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.") #print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.")
if np.fabs((self.previous_rotation_angle - current_readback_value) > 10): if np.fabs((self.previous_rotation_angle-current_readback_value)>10):
message = f"Glitch detected in rotation stage. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}." message = f"Glitch detected in rotation stage. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}."
print(message) print(message)
self.parent.device_manager.connector.send_client_info( self.parent.device_manager.connector.send_client_info(message, scope="glitch detector", show_asap=True)
message, scope="glitch detector", show_asap=True
)
val = super().read() val = super().read()
current_readback_value = val[self.parent.name]["value"] current_readback_value = val[self.parent.name]["value"]
if np.fabs((self.previous_rotation_angle - current_readback_value) > 10): if np.fabs((self.previous_rotation_angle-current_readback_value)>10):
message = f"Glitch detected in rotation stage second read. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}. Disabling the controller." message = f"Glitch detected in rotation stage second read. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}. Disabling the controller."
print(message) print(message)
self.parent.device_manager.connector.send_client_info( self.parent.device_manager.connector.send_client_info(message, scope="glitch detector", show_asap=True)
message, scope="glitch detector", show_asap=True
) self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed("allaxref=0")
self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed(
"allaxref=0"
)
self.parent.device_manager.devices["osamroy"].obj.enabled = False self.parent.device_manager.devices["osamroy"].obj.enabled = False
return val return val
@@ -114,12 +108,13 @@ class OMNYGalilReadbackSignal(GalilSignalRO):
try: try:
rt = self.parent.device_manager.devices["rtx"] rt = self.parent.device_manager.devices["rtx"]
if rt.enabled: if rt.enabled:
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"] - 25 + 54) rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"]-25+54)
except KeyError: except KeyError:
logger.warning("Failed to set RT value during ogalil readback.") logger.warning("Failed to set RT value during ogalil readback.")
return val return val
class OMNYGalilController(GalilController): class OMNYGalilController(GalilController):
USER_ACCESS = [ USER_ACCESS = [
"describe", "describe",
@@ -137,18 +132,18 @@ class OMNYGalilController(GalilController):
"_ogalil_folerr_not_ignore", "_ogalil_folerr_not_ignore",
] ]
OKBLUE = "\033[94m" OKBLUE = '\033[94m'
OKCYAN = "\033[96m" OKCYAN = '\033[96m'
OKGREEN = "\033[92m" OKGREEN = '\033[92m'
WARNING = "\033[93m" WARNING = '\033[93m'
FAIL = "\033[91m" FAIL = '\033[91m'
ENDC = "\033[0m" ENDC = '\033[0m'
def on(self, timeout: int = 10) -> None: def on(self) -> None:
"""Open a new socket connection to the controller""" """Open a new socket connection to the controller"""
self._ogalil_switchsocket_switch_all_on() self._ogalil_switchsocket_switch_all_on()
time.sleep(0.3) time.sleep(0.3)
super().on(timeout=timeout) super().on()
def _ogalil_switchsocket(self, number: int, switch: bool): def _ogalil_switchsocket(self, number: int, switch: bool):
# number is socket number ranging from 1 to 4 # number is socket number ranging from 1 to 4
@@ -190,16 +185,15 @@ class OMNYGalilController(GalilController):
self.socket_put_confirmed("IgNoFol=1") self.socket_put_confirmed("IgNoFol=1")
self.socket_put_confirmed("XQ#STOP,1") self.socket_put_confirmed("XQ#STOP,1")
def _ogalil_set_axis_to_pos_wo_reference_search(
self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign def _ogalil_set_axis_to_pos_wo_reference_search(self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign):
):
self.socket_put_confirmed("IgNoFol=1") self.socket_put_confirmed("IgNoFol=1")
# pos_mm = pos_encoder / motor_resolution # pos_mm = pos_encoder / motor_resolution
pos_encoder = pos_mm * motor_resolution * motor_sign pos_encoder = pos_mm * motor_resolution * motor_sign
# print(motor_resolution) #print(motor_resolution)
self.socket_put_confirmed(f"DE{axis_id}={pos_encoder:.0f}") self.socket_put_confirmed(f"DE{axis_id}={pos_encoder:.0f}")
self.socket_put_confirmed(f"DP{axis_id}=_TP{axis_id}*ratio[{axis_id_numeric:.0f}]") self.socket_put_confirmed(f"DP{axis_id}=_TP{axis_id}*ratio[{axis_id_numeric:.0f}]")
@@ -209,6 +203,7 @@ class OMNYGalilController(GalilController):
self._ogalil_folerr_not_ignore() self._ogalil_folerr_not_ignore()
def _ogalil_folerr_not_ignore(self): def _ogalil_folerr_not_ignore(self):
self.socket_put_confirmed("IgNoFol=0") self.socket_put_confirmed("IgNoFol=0")
@@ -245,18 +240,7 @@ class OMNYGalilController(GalilController):
class OMNYGalilMotor(Device, PositionerBase): class OMNYGalilMotor(Device, PositionerBase):
USER_ACCESS = [ USER_ACCESS = ["controller", "find_reference", "omny_osamx_to_scan_center", "drive_axis_to_limit", "_ogalil_folerr_reset_and_ignore", "_ogalil_set_axis_to_pos_wo_reference_search", "get_motor_limit_switch", "axis_is_referenced", "get_motor_temperature", "folerr_status"]
"controller",
"find_reference",
"omny_osamx_to_scan_center",
"drive_axis_to_limit",
"_ogalil_folerr_reset_and_ignore",
"_ogalil_set_axis_to_pos_wo_reference_search",
"get_motor_limit_switch",
"axis_is_referenced",
"get_motor_temperature",
"folerr_status",
]
readback = Cpt(OMNYGalilReadbackSignal, signal_name="readback", kind="hinted") readback = Cpt(OMNYGalilReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint") user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config") motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config")
@@ -288,7 +272,7 @@ class OMNYGalilMotor(Device, PositionerBase):
**kwargs, **kwargs,
): ):
self.controller = OMNYGalilController( self.controller = OMNYGalilController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
@@ -324,10 +308,6 @@ class OMNYGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property @property
def limits(self): def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get()) return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -453,10 +433,8 @@ class OMNYGalilMotor(Device, PositionerBase):
def _ogalil_set_axis_to_pos_wo_reference_search(self, pos_mm): def _ogalil_set_axis_to_pos_wo_reference_search(self, pos_mm):
motor_resolution = self.motor_resolution.get() motor_resolution = self.motor_resolution.get()
self.controller._ogalil_set_axis_to_pos_wo_reference_search( self.controller._ogalil_set_axis_to_pos_wo_reference_search(self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign)
self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign #now force position read to cache
)
# now force position read to cache
val = self.readback.read() val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
@@ -464,9 +442,9 @@ class OMNYGalilMotor(Device, PositionerBase):
""" """
Find the reference of the axis. Find the reference of the axis.
""" """
verbose = 1 verbose=1
self.controller.find_reference(self.axis_Id_numeric, verbose, raise_error) self.controller.find_reference(self.axis_Id_numeric, verbose, raise_error)
# now force position read to cache #now force position read to cache
val = self.readback.read() val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
@@ -475,10 +453,10 @@ class OMNYGalilMotor(Device, PositionerBase):
Drive an axis to the limit in a specified direction. Drive an axis to the limit in a specified direction.
Args: Args:
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'. direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
""" """
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction, verbose=1) self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction, verbose=1)
# now force position read to cache #now force position read to cache
val = self.readback.read() val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
@@ -509,31 +487,29 @@ class OMNYGalilMotor(Device, PositionerBase):
def omny_osamx_to_scan_center(self, cenx): def omny_osamx_to_scan_center(self, cenx):
if self.controller.sock.port == 8082 and self.axis_Id_numeric == 0: if self.controller.sock.port == 8082 and self.axis_Id_numeric == 0:
# get last setpoint # get last setpoint
osamx = self.device_manager.devices["osamx"] osamx = self.device_manager.devices["osamx"]
osamx_current_setpoint = osamx.obj.readback.get() osamx_current_setpoint = osamx.obj.readback.get()
omny_samx_in = self._get_user_param_safe("osamx", "in") omny_samx_in = self._get_user_param_safe("osamx","in")
if np.fabs(osamx_current_setpoint - (omny_samx_in + cenx / 1000)) > 0.025: if np.fabs(osamx_current_setpoint-(omny_samx_in+cenx/1000)) > 0.025:
message = ( message=f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}."
f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}." logger.info(message)
)
logger.info(message)
osamx.read_only = False osamx.read_only = False
# osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')") #osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')")
osamx.set(omny_samx_in + cenx / 1000) osamx.set(omny_samx_in+cenx/1000)
time.sleep(0.1) time.sleep(0.1)
while osamx.motor_is_moving.get(): while(osamx.motor_is_moving.get()):
time.sleep(0.05) time.sleep(0.05)
osamx.read_only = True osamx.read_only = True
time.sleep(2) time.sleep(2)
rt = self.device_manager.devices["rtx"] rt = self.device_manager.devices["rtx"]
if rt.enabled: if rt.enabled:
rt.obj.controller.laser_tracker_on() rt.obj.controller.laser_tracker_on()
rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength() rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength()
def folerr_status(self) -> bool: def folerr_status(self) -> bool:
return self.controller.folerr_status(self.axis_Id_numeric) return self.controller.folerr_status(self.axis_Id_numeric)
def stop(self, *, success=False): def stop(self, *, success=False):
self.controller.stop_all_axes() self.controller.stop_all_axes()
return super().stop(success=success) return super().stop(success=success)

View File

@@ -52,12 +52,33 @@ class GalilController(Controller):
"fly_grid_scan", "fly_grid_scan",
"read_encoder_position", "read_encoder_position",
] ]
_axes_per_controller = 8
def on(self, timeout: int = 10) -> None: def __init__(
self,
*,
name="GalilController",
kind=None,
parent=None,
socket=None,
attr_name="",
labels=None,
):
if not hasattr(self, "_initialized") or not self._initialized:
self._galil_axis_per_controller = 8
self._axis = [None for axis_num in range(self._galil_axis_per_controller)]
super().__init__(
name=name,
socket=socket,
attr_name=attr_name,
parent=parent,
labels=labels,
kind=kind,
)
def on(self, controller_num=0) -> None:
"""Open a new socket connection to the controller""" """Open a new socket connection to the controller"""
if not self.connected: if not self.connected:
self.sock.open(timeout=timeout) self.sock.open()
self.connected = True self.connected = True
else: else:
logger.info("The connection has already been established.") logger.info("The connection has already been established.")
@@ -144,11 +165,11 @@ class GalilController(Controller):
def show_running_threads(self) -> None: def show_running_threads(self) -> None:
t = PrettyTable() t = PrettyTable()
t.title = f"Threads on {self.sock.host}:{self.sock.port}" t.title = f"Threads on {self.sock.host}:{self.sock.port}"
t.field_names = [str(ax) for ax in range(self._axes_per_controller)] t.field_names = [str(ax) for ax in range(self._galil_axis_per_controller)]
t.add_row( t.add_row(
[ [
"active" if self.is_thread_active(t) else "inactive" "active" if self.is_thread_active(t) else "inactive"
for t in range(self._axes_per_controller) for t in range(self._galil_axis_per_controller)
] ]
) )
print(t) print(t)
@@ -178,7 +199,7 @@ class GalilController(Controller):
"Limits", "Limits",
"Position", "Position",
] ]
for ax in range(self._axes_per_controller): for ax in range(self._galil_axis_per_controller):
axis = self._axis[ax] axis = self._axis[ax]
if axis is not None: if axis is not None:
t.add_row( t.add_row(
@@ -495,9 +516,7 @@ class SGalilMotor(Device, PositionerBase):
): ):
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.sign = sign self.sign = sign
self.controller = GalilController( self.controller = GalilController(socket=socket_cls(host=host, port=port))
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
)
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = kwargs.pop("tolerance", 0.5) self.tolerance = kwargs.pop("tolerance", 0.5)
self.device_mapping = kwargs.pop("device_mapping", {}) self.device_mapping = kwargs.pop("device_mapping", {})
@@ -530,10 +549,6 @@ class SGalilMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property @property
def limits(self): def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get()) return (self.low_limit_travel.get(), self.high_limit_travel.get())

View File

@@ -37,86 +37,84 @@ class OMNYSampleStorage(Device):
_default_sub = SUB_VALUE _default_sub = SUB_VALUE
sample_shuttle_A_placed = { sample_shuttle_A_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {"auto_monitor": True}) for i in range(1, 7) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {}) for i in range(1, 7)
} }
sample_shuttle_A_placed = Dcpt(sample_shuttle_A_placed) sample_shuttle_A_placed = Dcpt(sample_shuttle_A_placed)
sample_shuttle_B_placed = { sample_shuttle_B_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {"auto_monitor": True}) for i in range(1, 7) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {}) for i in range(1, 7)
} }
sample_shuttle_B_placed = Dcpt(sample_shuttle_B_placed) sample_shuttle_B_placed = Dcpt(sample_shuttle_B_placed)
sample_shuttle_C_placed = { sample_shuttle_C_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
} }
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed) sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
sample_shuttle_C_placed = { sample_shuttle_C_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
} }
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed) sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
parking_placed = { parking_placed = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {"auto_monitor": True}) for i in range(1, 7) f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {}) for i in range(1, 7)
} }
parking_placed = Dcpt(parking_placed) parking_placed = Dcpt(parking_placed)
sample_placed = { sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {"auto_monitor": True}) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101] for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
} }
sample_placed = Dcpt(sample_placed) sample_placed = Dcpt(sample_placed)
sample_shuttle_A_names = { sample_shuttle_A_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True, "auto_monitor": True}) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True})
for i in range(1, 7) for i in range(1, 7)
} }
sample_shuttle_A_names = Dcpt(sample_shuttle_A_names) sample_shuttle_A_names = Dcpt(sample_shuttle_A_names)
sample_shuttle_B_names = { sample_shuttle_B_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True, "auto_monitor": True}) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True})
for i in range(1, 7) for i in range(1, 7)
} }
sample_shuttle_B_names = Dcpt(sample_shuttle_B_names) sample_shuttle_B_names = Dcpt(sample_shuttle_B_names)
sample_shuttle_C_names = { sample_shuttle_C_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True, "auto_monitor": True}) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True})
for i in range(1, 7) for i in range(1, 7)
} }
sample_shuttle_C_names = Dcpt(sample_shuttle_C_names) sample_shuttle_C_names = Dcpt(sample_shuttle_C_names)
parking_names = { parking_names = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True, "auto_monitor": True}) f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True})
for i in range(1, 7) for i in range(1, 7)
} }
parking_names = Dcpt(parking_names) parking_names = Dcpt(parking_names)
sample_names = { sample_names = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True, "auto_monitor": True}) f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101] for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
} }
sample_names = Dcpt(sample_names) sample_names = Dcpt(sample_names)
sample_in_gripper = Cpt( sample_in_gripper = Cpt(
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL", auto_monitor=True EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL"
) )
sample_in_gripper_name = Cpt( sample_in_gripper_name = Cpt(
EpicsSignal, EpicsSignal,
name="sample_in_gripper_name", name="sample_in_gripper_name",
read_pv="XOMNY-SAMPLE_DB_omny:110.DESC", read_pv="XOMNY-SAMPLE_DB_omny:110.DESC",
string=True, string=True,
auto_monitor=True
) )
sample_in_samplestage = Cpt( sample_in_samplestage = Cpt(
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL", auto_monitor=True EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL"
) )
sample_in_samplestage_name = Cpt( sample_in_samplestage_name = Cpt(
EpicsSignal, EpicsSignal,
name="sample_in_samplestage_name", name="sample_in_samplestage_name",
read_pv="XOMNY-SAMPLE_DB_omny:0.DESC", read_pv="XOMNY-SAMPLE_DB_omny:0.DESC",
string=True, string=True,
auto_monitor=True
) )
def __init__(self, prefix="", *, name, **kwargs): def __init__(self, prefix="", *, name, **kwargs):

View File

@@ -57,7 +57,6 @@ class RtFlomniController(Controller):
socket_cls=None, socket_cls=None,
socket_host=None, socket_host=None,
socket_port=None, socket_port=None,
device_manager=None,
attr_name="", attr_name="",
parent=None, parent=None,
labels=None, labels=None,
@@ -68,7 +67,6 @@ class RtFlomniController(Controller):
socket_cls=socket_cls, socket_cls=socket_cls,
socket_host=socket_host, socket_host=socket_host,
socket_port=socket_port, socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name, attr_name=attr_name,
parent=parent, parent=parent,
labels=labels, labels=labels,
@@ -128,15 +126,15 @@ class RtFlomniController(Controller):
while not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1: while not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1:
time.sleep(0.05) time.sleep(0.05)
self.device_manager.devices.rty.update_user_parameter({"tomo_additional_offsety": 0}) self.get_device_manager().devices.rty.update_user_parameter({"tomo_additional_offsety": 0})
self.clear_trajectory_generator() self.clear_trajectory_generator()
self.laser_tracker_on() self.laser_tracker_on()
# move to 0. FUPR will set the rotation angle during readout # move to 0. FUPR will set the rotation angle during readout
self.device_manager.devices.fsamroy.obj.move(0, wait=True) self.get_device_manager().devices.fsamroy.obj.move(0, wait=True)
fsamx = self.device_manager.devices.fsamx fsamx = self.get_device_manager().devices.fsamx
fsamx.obj.pid_x_correction = 0 fsamx.obj.pid_x_correction = 0
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]")
@@ -166,18 +164,18 @@ class RtFlomniController(Controller):
self.show_cyclic_error_compensation() self.show_cyclic_error_compensation()
self.rt_pid_voltage = self.get_pid_x() self.rt_pid_voltage = self.get_pid_x()
rtx = self.device_manager.devices.rtx rtx = self.get_device_manager().devices.rtx
rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage}) rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage})
self.set_device_read_write("fsamx", False) self.set_device_enabled("fsamx", False)
self.set_device_read_write("fsamy", False) self.set_device_enabled("fsamy", False)
self.set_device_read_write("foptx", False) self.set_device_enabled("foptx", False)
self.set_device_read_write("fopty", False) self.set_device_enabled("fopty", False)
def move_samx_to_scan_region(self, fovx: float, cenx: float): def move_samx_to_scan_region(self, fovx: float, cenx: float):
time.sleep(0.05) time.sleep(0.05)
if self.rt_pid_voltage is None: if self.rt_pid_voltage is None:
rtx = self.device_manager.devices.rtx rtx = self.get_device_manager().devices.rtx
self.rt_pid_voltage = rtx.user_parameter.get("rt_pid_voltage") self.rt_pid_voltage = rtx.user_parameter.get("rt_pid_voltage")
if self.rt_pid_voltage is None: if self.rt_pid_voltage is None:
raise RtError( raise RtError(
@@ -194,7 +192,7 @@ class RtFlomniController(Controller):
break break
wait_on_exit = True wait_on_exit = True
self.socket_put("v0") self.socket_put("v0")
fsamx = self.device_manager.devices.fsamx fsamx = self.get_device_manager().devices.fsamx
fsamx.read_only = False fsamx.read_only = False
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]")
fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.007 fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.007
@@ -225,22 +223,22 @@ class RtFlomniController(Controller):
print("Feedback is not running; likely an error in the interferometer.") print("Feedback is not running; likely an error in the interferometer.")
raise RtError("Feedback is not running; likely an error in the interferometer.") raise RtError("Feedback is not running; likely an error in the interferometer.")
self.set_device_read_write("fsamx", False) self.set_device_enabled("fsamx", False)
self.set_device_read_write("fsamy", False) self.set_device_enabled("fsamy", False)
self.set_device_read_write("foptx", False) self.set_device_enabled("foptx", False)
self.set_device_read_write("fopty", False) self.set_device_enabled("fopty", False)
def feedback_disable(self): def feedback_disable(self):
self.clear_trajectory_generator() self.clear_trajectory_generator()
self.move_to_zero() self.move_to_zero()
self.socket_put("l0") self.socket_put("l0")
self.set_device_read_write("fsamx", True) self.set_device_enabled("fsamx", True)
self.set_device_read_write("fsamy", True) self.set_device_enabled("fsamy", True)
self.set_device_read_write("foptx", True) self.set_device_enabled("foptx", True)
self.set_device_read_write("fopty", True) self.set_device_enabled("fopty", True)
fsamx = self.device_manager.devices.fsamx fsamx = self.get_device_manager().devices.fsamx
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]") fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]")
print("rt feedback is now disalbed.") print("rt feedback is now disalbed.")
@@ -291,8 +289,12 @@ class RtFlomniController(Controller):
self.socket_put("T1") self.socket_put("T1")
time.sleep(0.5) time.sleep(0.5)
self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackyct=0") self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed(
self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackzct=0") "trackyct=0"
)
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed(
"trackzct=0"
)
self.laser_tracker_wait_on_target() self.laser_tracker_wait_on_target()
logger.info("Laser tracker running!") logger.info("Laser tracker running!")
@@ -339,7 +341,7 @@ class RtFlomniController(Controller):
} }
def laser_tracker_galil_enable(self): def laser_tracker_galil_enable(self):
ftrackz_con = self.device_manager.devices.ftrackz.obj.controller ftrackz_con = self.get_device_manager().devices.ftrackz.obj.controller
ftrackz_con.socket_put_confirmed("tracken=1") ftrackz_con.socket_put_confirmed("tracken=1")
ftrackz_con.socket_put_confirmed("trackyct=0") ftrackz_con.socket_put_confirmed("trackyct=0")
ftrackz_con.socket_put_confirmed("trackzct=0") ftrackz_con.socket_put_confirmed("trackzct=0")
@@ -387,12 +389,9 @@ class RtFlomniController(Controller):
self.laser_tracker_wait_on_target() self.laser_tracker_wait_on_target()
signal = self.read_ssi_interferometer(1) signal = self.read_ssi_interferometer(1)
rtx = self.device_manager.devices.rtx rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal") min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal") low_signal = rtx.user_parameter.get("low_signal")
print(f"low signal: {low_signal}")
print(f"min signal: {min_signal}")
print(f"signal: {signal}")
if signal < min_signal: if signal < min_signal:
time.sleep(1) time.sleep(1)
if signal < min_signal: if signal < min_signal:
@@ -476,6 +475,12 @@ class RtFlomniController(Controller):
current_position_in_scan = int(float(return_table[2])) current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan) return (mode, number_of_positions_planned, current_position_in_scan)
def get_device_manager(self):
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager:
return axis.device_manager
raise BECConfigError("Could not access the device_manager")
def read_positions_from_sampler(self): def read_positions_from_sampler(self):
# this was for reading after the scan completed # this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
@@ -490,7 +495,7 @@ class RtFlomniController(Controller):
# if not (mode==2 or mode==3): # if not (mode==2 or mode==3):
# error # error
self.device_manager.connector.set( self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"), MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage( messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata device="rt_scan", status=1, metadata=self.readout_metadata
@@ -525,7 +530,7 @@ class RtFlomniController(Controller):
signals = self._get_signals_from_table(return_table) signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0])) self.publish_device_data(signals=signals, point_id=int(return_table[0]))
self.device_manager.connector.set( self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"), MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage( messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata device="rt_scan", status=0, metadata=self.readout_metadata
@@ -539,7 +544,7 @@ class RtFlomniController(Controller):
) )
def publish_device_data(self, signals, point_id): def publish_device_data(self, signals, point_id):
self.device_manager.connector.set_and_publish( self.get_device_manager().connector.set_and_publish(
MessageEndpoints.device_read("rt_flomni"), MessageEndpoints.device_read("rt_flomni"),
messages.DeviceMessage( messages.DeviceMessage(
signals=signals, metadata={"point_id": point_id, **self.readout_metadata} signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
@@ -650,7 +655,7 @@ class RtFlomniMotor(Device, PositionerBase):
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.sign = sign self.sign = sign
self.controller = RtFlomniController( self.controller = RtFlomniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager self.device_manager = device_manager
@@ -678,10 +683,6 @@ class RtFlomniMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property @property
def limits(self): def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get()) return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -809,7 +810,7 @@ class RtFlomniMotor(Device, PositionerBase):
if __name__ == "__main__": if __name__ == "__main__":
rtcontroller = RtFlomniController( rtcontroller = RtFlomniController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222
) )
rtcontroller.on() rtcontroller.on()
rtcontroller.laser_tracker_on() rtcontroller.laser_tracker_on()

View File

@@ -71,7 +71,6 @@ class RtLamniController(Controller):
socket_cls=None, socket_cls=None,
socket_host=None, socket_host=None,
socket_port=None, socket_port=None,
device_manager=None,
attr_name="", attr_name="",
parent=None, parent=None,
labels=None, labels=None,
@@ -82,7 +81,6 @@ class RtLamniController(Controller):
socket_cls=socket_cls, socket_cls=socket_cls,
socket_host=socket_host, socket_host=socket_host,
socket_port=socket_port, socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name, attr_name=attr_name,
parent=parent, parent=parent,
labels=labels, labels=labels,
@@ -94,11 +92,11 @@ class RtLamniController(Controller):
def feedback_disable(self): def feedback_disable(self):
self.socket_put("J0") self.socket_put("J0")
logger.info("LamNI Feedback disabled.") logger.info("LamNI Feedback disabled.")
self.set_device_read_write("lsamx", True) self.set_device_enabled("lsamx", True)
self.set_device_read_write("lsamy", True) self.set_device_enabled("lsamy", True)
self.set_device_read_write("loptx", True) self.set_device_enabled("loptx", True)
self.set_device_read_write("lopty", True) self.set_device_enabled("lopty", True)
self.set_device_read_write("loptz", True) self.set_device_enabled("loptz", True)
def is_axis_moving(self, axis_Id) -> bool: def is_axis_moving(self, axis_Id) -> bool:
# this checks that axis is on target # this checks that axis is on target
@@ -152,25 +150,25 @@ class RtLamniController(Controller):
# set these as closed loop target position # set these as closed loop target position
self.socket_put(f"pa0,{x_curr:.4f}") self.socket_put(f"pa0,{x_curr:.4f}")
self.socket_put(f"pa1,{y_curr:.4f}") self.socket_put(f"pa1,{y_curr:.4f}")
self.device_manager.devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr) self.get_device_manager().devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr)
self.device_manager.devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr) self.get_device_manager().devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr)
self.socket_put("J5") self.socket_put("J5")
logger.info("LamNI Feedback enabled (without reset).") logger.info("LamNI Feedback enabled (without reset).")
self.set_device_read_write("lsamx", False) self.set_device_enabled("lsamx", False)
self.set_device_read_write("lsamy", False) self.set_device_enabled("lsamy", False)
self.set_device_read_write("loptx", False) self.set_device_enabled("loptx", False)
self.set_device_read_write("lopty", False) self.set_device_enabled("lopty", False)
self.set_device_read_write("loptz", False) self.set_device_enabled("loptz", False)
@threadlocked @threadlocked
def feedback_disable_and_even_reset_lamni_angle_interferometer(self): def feedback_disable_and_even_reset_lamni_angle_interferometer(self):
self.socket_put("J6") self.socket_put("J6")
logger.info("LamNI Feedback disabled including the angular interferometer.") logger.info("LamNI Feedback disabled including the angular interferometer.")
self.set_device_read_write("lsamx", True) self.set_device_enabled("lsamx", True)
self.set_device_read_write("lsamy", True) self.set_device_enabled("lsamy", True)
self.set_device_read_write("loptx", True) self.set_device_enabled("loptx", True)
self.set_device_read_write("lopty", True) self.set_device_enabled("lopty", True)
self.set_device_read_write("loptz", True) self.set_device_enabled("loptz", True)
@threadlocked @threadlocked
def clear_trajectory_generator(self): def clear_trajectory_generator(self):
@@ -286,7 +284,7 @@ class RtLamniController(Controller):
# if not (mode==2 or mode==3): # if not (mode==2 or mode==3):
# error # error
self.device_manager.connector.set( self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"), MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage( messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata device="rt_scan", status=1, metadata=self.readout_metadata
@@ -321,7 +319,7 @@ class RtLamniController(Controller):
signals = self._get_signals_from_table(return_table) signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0])) self.publish_device_data(signals=signals, point_id=int(return_table[0]))
self.device_manager.connector.set( self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"), MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage( messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata device="rt_scan", status=0, metadata=self.readout_metadata
@@ -333,7 +331,7 @@ class RtLamniController(Controller):
) )
def publish_device_data(self, signals, point_id): def publish_device_data(self, signals, point_id):
self.device_manager.connector.set_and_publish( self.get_device_manager().connector.set_and_publish(
MessageEndpoints.device_read("rt_lamni"), MessageEndpoints.device_read("rt_lamni"),
messages.DeviceMessage( messages.DeviceMessage(
signals=signals, metadata={"point_id": point_id, **self.readout_metadata} signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
@@ -368,10 +366,10 @@ class RtLamniController(Controller):
) # we set all three outputs of the traj. gen. although in LamNI case only 0,1 are used ) # we set all three outputs of the traj. gen. although in LamNI case only 0,1 are used
self.clear_trajectory_generator() self.clear_trajectory_generator()
self.device_manager.devices.lsamrot.obj.move(0, wait=True) self.get_device_manager().devices.lsamrot.obj.move(0, wait=True)
galil_controller_rt_status = ( galil_controller_rt_status = (
self.device_manager.devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled() self.get_device_manager().devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled()
) )
if galil_controller_rt_status == 0: if galil_controller_rt_status == 0:
@@ -384,16 +382,16 @@ class RtLamniController(Controller):
time.sleep(0.03) time.sleep(0.03)
lsamx_user_params = self.device_manager.devices.lsamx.user_parameter lsamx_user_params = self.get_device_manager().devices.lsamx.user_parameter
if lsamx_user_params is None or lsamx_user_params.get("center") is None: if lsamx_user_params is None or lsamx_user_params.get("center") is None:
raise RuntimeError("lsamx center is not defined") raise RuntimeError("lsamx center is not defined")
lsamy_user_params = self.device_manager.devices.lsamy.user_parameter lsamy_user_params = self.get_device_manager().devices.lsamy.user_parameter
if lsamy_user_params is None or lsamy_user_params.get("center") is None: if lsamy_user_params is None or lsamy_user_params.get("center") is None:
raise RuntimeError("lsamy center is not defined") raise RuntimeError("lsamy center is not defined")
lsamx_center = lsamx_user_params.get("center") lsamx_center = lsamx_user_params.get("center")
lsamy_center = lsamy_user_params.get("center") lsamy_center = lsamy_user_params.get("center")
self.device_manager.devices.lsamx.obj.move(lsamx_center, wait=True) self.get_device_manager().devices.lsamx.obj.move(lsamx_center, wait=True)
self.device_manager.devices.lsamy.obj.move(lsamy_center, wait=True) self.get_device_manager().devices.lsamy.obj.move(lsamy_center, wait=True)
self.socket_put("J1") self.socket_put("J1")
_waitforfeedbackctr = 0 _waitforfeedbackctr = 0
@@ -407,11 +405,11 @@ class RtLamniController(Controller):
(self.socket_put_and_receive("J2")).split(",")[0] (self.socket_put_and_receive("J2")).split(",")[0]
) )
self.set_device_read_write("lsamx", False) self.set_device_enabled("lsamx", False)
self.set_device_read_write("lsamy", False) self.set_device_enabled("lsamy", False)
self.set_device_read_write("loptx", False) self.set_device_enabled("loptx", False)
self.set_device_read_write("lopty", False) self.set_device_enabled("lopty", False)
self.set_device_read_write("loptz", False) self.set_device_enabled("loptz", False)
if interferometer_feedback_not_running == 1: if interferometer_feedback_not_running == 1:
logger.error( logger.error(
@@ -561,7 +559,7 @@ class RtLamniMotor(Device, PositionerBase):
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.sign = sign self.sign = sign
self.controller = RtLamniController( self.controller = RtLamniController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager self.device_manager = device_manager
@@ -588,10 +586,6 @@ class RtLamniMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property @property
def limits(self): def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get()) return (self.low_limit_travel.get(), self.high_limit_travel.get())

View File

@@ -1,8 +1,8 @@
import builtins
import socket
import threading import threading
import time import time
from typing import List from typing import List
import builtins
import socket
import numpy as np import numpy as np
from bec_lib import bec_logger, messages from bec_lib import bec_logger, messages
@@ -34,15 +34,12 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import (
logger = bec_logger.logger logger = bec_logger.logger
class RtOMNY_mirror_switchbox_Error(Exception): class RtOMNY_mirror_switchbox_Error(Exception):
pass pass
class RtOMNY_Error(Exception): class RtOMNY_Error(Exception):
pass pass
class RtOMNYController(Controller): class RtOMNYController(Controller):
_axes_per_controller = 3 _axes_per_controller = 3
red = "\x1b[91m" red = "\x1b[91m"
@@ -90,7 +87,6 @@ class RtOMNYController(Controller):
socket_cls=None, socket_cls=None,
socket_host=None, socket_host=None,
socket_port=None, socket_port=None,
device_manager=None,
attr_name="", attr_name="",
parent=None, parent=None,
labels=None, labels=None,
@@ -101,7 +97,6 @@ class RtOMNYController(Controller):
socket_cls=socket_cls, socket_cls=socket_cls,
socket_host=socket_host, socket_host=socket_host,
socket_port=socket_port, socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name, attr_name=attr_name,
parent=parent, parent=parent,
labels=labels, labels=labels,
@@ -239,7 +234,7 @@ class RtOMNYController(Controller):
"opt_amplitude1_neg": 3000, "opt_amplitude1_neg": 3000,
"opt_amplitude2_pos": 3000, "opt_amplitude2_pos": 3000,
"opt_amplitude2_neg": 3000, "opt_amplitude2_neg": 3000,
}, }
} }
# def is_axis_moving(self, axis_Id) -> bool: # def is_axis_moving(self, axis_Id) -> bool:
@@ -266,60 +261,42 @@ class RtOMNYController(Controller):
threading.Thread(target=send_positions, args=(self, positions), daemon=True).start() threading.Thread(target=send_positions, args=(self, positions), daemon=True).start()
def get_mirror_parameters(self, channel): def get_mirror_parameters(self,channel):
return self.mirror_parameters[channel] return self.mirror_parameters[channel]
def laser_tracker_check_and_wait_for_signalstrength(self): def laser_tracker_check_and_wait_for_signalstrength(self):
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info("Checking laser tracker...", scope="", show_asap=True)
"Checking laser tracker...", scope="", show_asap=True
)
if not self.laser_tracker_check_enabled(): if not self.laser_tracker_check_enabled():
print( print("laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled.")
"laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled."
)
return return
# first check on target #first check on target
self.laser_tracker_wait_on_target() self.laser_tracker_wait_on_target()
# when on target, check interferometer signal #when on target, check interferometer signal
signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1) signal = self._omny_interferometer_get_signalsample("ssi_4",0.1)
rtx = self.device_manager.devices.rtx rtx = self.get_device_manager().devices.rtx
min_signal = rtx.user_parameter.get("min_signal") min_signal = rtx.user_parameter.get("min_signal")
low_signal = rtx.user_parameter.get("low_signal") low_signal = rtx.user_parameter.get("low_signal")
wait_counter = 0 wait_counter = 0
while signal < min_signal and wait_counter < 10: while signal < min_signal and wait_counter<10:
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True)
f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...",
scope="laser_tracker_check_and_wait_for_signalstrength",
show_asap=True,
)
wait_counter += 1 wait_counter+=1
time.sleep(0.2) time.sleep(0.2)
signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1) signal = self._omny_interferometer_get_signalsample("ssi_4",0.1)
if signal < low_signal: if signal < low_signal:
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True)
f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m",
scope="laser_tracker_check_and_wait_for_signalstrength",
show_asap=True,
)
self.omny_interferometer_align_tracking() self.omny_interferometer_align_tracking()
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info("Checking laser tracker completed.", scope="", show_asap=True)
"Checking laser tracker completed.", scope="", show_asap=True
)
def omny_interferometer_align_tracking(self): def omny_interferometer_align_tracking(self):
mirror_channel = 6 mirror_channel=6
signal = self._omny_interferometer_get_signalsample( signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print( print(f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed.")
f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed."
)
else: else:
self._omny_interferometer_switch_channel(mirror_channel) self._omny_interferometer_switch_channel(mirror_channel)
time.sleep(0.1) time.sleep(0.1)
@@ -330,19 +307,16 @@ class RtOMNYController(Controller):
self._omny_interferometer_switch_alloff() self._omny_interferometer_switch_alloff()
self.show_signal_strength_interferometer() self.show_signal_strength_interferometer()
mirror_channel = -1 mirror_channel=-1
def omny_interferometer_align_incoupling_angle(self): def omny_interferometer_align_incoupling_angle(self):
mirror_channel = 1 mirror_channel=1
signal = self._omny_interferometer_get_signalsample( signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print( print(f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed.")
f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed."
)
else: else:
self._omny_interferometer_switch_channel(mirror_channel) self._omny_interferometer_switch_channel(mirror_channel)
time.sleep(0.1) time.sleep(0.1)
@@ -353,18 +327,19 @@ class RtOMNYController(Controller):
self._omny_interferometer_switch_alloff() self._omny_interferometer_switch_alloff()
self.show_signal_strength_interferometer() self.show_signal_strength_interferometer()
mirror_channel = -1 mirror_channel=-1
def _omny_interferometer_openloop_steps(self, channel, steps, amplitude): def _omny_interferometer_openloop_steps(self, channel, steps, amplitude):
if channel not in range(3, 5): if channel not in range(3,5):
raise RtOMNY_Error(f"invalid channel number {channel}.") raise RtOMNY_Error(f"invalid channel number {channel}.")
if amplitude > 4090: if amplitude > 4090:
amplitude = 4090 amplitude = 4090
elif amplitude < 10: elif amplitude < 10:
amplitude = 10 amplitude = 10
oshield = self.device_manager.devices.oshield oshield = self.get_device_manager().devices.oshield
oshield.obj.controller.move_open_loop_steps( oshield.obj.controller.move_open_loop_steps(
channel, steps, amplitude=amplitude, frequency=500 channel, steps, amplitude=amplitude, frequency=500
@@ -376,7 +351,7 @@ class RtOMNYController(Controller):
def _omny_interferometer_optimize(self, mirror_channel, channel): def _omny_interferometer_optimize(self, mirror_channel, channel):
if mirror_channel == -1: if mirror_channel == -1:
raise RtOMNY_Error("no mirror channel selected") raise RtOMNY_Error("no mirror channel selected")
# mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror #mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror
if channel == 3: if channel == 3:
steps_pos = self.mirror_parameters[mirror_channel]["opt_steps1_pos"] steps_pos = self.mirror_parameters[mirror_channel]["opt_steps1_pos"]
steps_neg = self.mirror_parameters[mirror_channel]["opt_steps1_neg"] steps_neg = self.mirror_parameters[mirror_channel]["opt_steps1_neg"]
@@ -390,80 +365,67 @@ class RtOMNYController(Controller):
else: else:
raise RtOMNY_Error(f"invalid channel number {channel}.") raise RtOMNY_Error(f"invalid channel number {channel}.")
previous_signal = self._omny_interferometer_get_signalsample( previous_signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"] min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]
if previous_signal < min_begin: if previous_signal < min_begin:
# raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.") #raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
print(f"\rMinimum signal for auto alignment {min_begin} not reached.") print(f"\rMinimum signal for auto alignment {min_begin} not reached.")
return return
elif previous_signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]: elif previous_signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
print( print(f"\rInterferometer signal of axis is good") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.")
f"\rInterferometer signal of axis is good" return
) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.")
return
else: else:
direction = 1 direction = 1
cycle_counter = 0 cycle_counter=0
cycle_max = 20 cycle_max=20
reversal_counter = 0 reversal_counter=0
reversal_max = 4 reversal_max=4
self.mirror_amplitutde_increase = 0 self.mirror_amplitutde_increase=0
current_sample = self._omny_interferometer_get_signalsample( current_sample = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
self.mirror_parameters[mirror_channel]["opt_signalchannel"], max=current_sample
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
max = current_sample
while ( while current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"] and cycle_counter<cycle_max and reversal_counter < reversal_max:
current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"]
and cycle_counter < cycle_max
and reversal_counter < reversal_max
):
# if current_sample < self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]: # if current_sample < self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]:
# raise OMNY_rt_clientError("error2") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.") # raise OMNY_rt_clientError("error2") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
if direction > 0: if direction>0:
self._omny_interferometer_openloop_steps(channel, steps_pos, opt_amplitude_pos) self._omny_interferometer_openloop_steps(channel, steps_pos, opt_amplitude_pos)
verbose_str = f"channel {channel}, steps {steps_pos}" verbose_str = f"channel {channel}, steps {steps_pos}"
else: else:
self._omny_interferometer_openloop_steps(channel, -steps_neg, opt_amplitude_neg) self._omny_interferometer_openloop_steps(channel, -steps_neg, opt_amplitude_neg)
verbose_str = f"auto action {channel}, steps {-steps_pos}" verbose_str = f"auto action {channel}, steps {-steps_pos}"
# print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}") #print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
current_sample = self._omny_interferometer_get_signalsample( current_sample = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
)
opt_mirrorname = self.mirror_parameters[mirror_channel]["opt_mirrorname"] opt_mirrorname = self.mirror_parameters[mirror_channel]["opt_mirrorname"]
info_str = f"\rAuto aligning Channel {mirror_channel}, {opt_mirrorname}, Current signal: {current_sample:.0f}" info_str = f"\rAuto aligning Channel {mirror_channel}, {opt_mirrorname}, Current signal: {current_sample:.0f}"
message = info_str + " (q)uit \r" message=info_str +" (q)uit \r"
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(message, scope="_omny_interferometer_optimize", show_asap=True)
message, scope="_omny_interferometer_optimize", show_asap=True
)
if previous_signal>current_sample:
if direction<0:
steps_pos=int(steps_pos/2)
steps_neg=int(steps_neg/2)
if steps_pos<1:
steps_pos=1
if steps_neg<1:
steps_neg=1
direction=direction*(-1)
reversal_counter+=1
previous_signal=current_sample
cycle_counter+=1
print(f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
if previous_signal > current_sample:
if direction < 0:
steps_pos = int(steps_pos / 2)
steps_neg = int(steps_neg / 2)
if steps_pos < 1:
steps_pos = 1
if steps_neg < 1:
steps_neg = 1
direction = direction * (-1)
reversal_counter += 1
previous_signal = current_sample
cycle_counter += 1
print(
f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r"
) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
def move_to_zero(self): def move_to_zero(self):
self.socket_put("pa0,0") self.socket_put("pa0,0")
@@ -495,7 +457,7 @@ class RtOMNYController(Controller):
if ret == 1: if ret == 1:
return True return True
return False return False
def feedback_is_running(self) -> bool: def feedback_is_running(self) -> bool:
self.feedback_get_status_and_ssi() self.feedback_get_status_and_ssi()
interferometer_feedback_not_running = int(self.ssi["feedback_error"]) interferometer_feedback_not_running = int(self.ssi["feedback_error"])
@@ -504,9 +466,7 @@ class RtOMNYController(Controller):
return True return True
def feedback_enable_with_reset(self): def feedback_enable_with_reset(self):
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(f"Enabling the feedback...", scope="", show_asap=True)
f"Enabling the feedback...", scope="", show_asap=True
)
self.socket_put("J0") # disable feedback self.socket_put("J0") # disable feedback
time.sleep(0.01) time.sleep(0.01)
@@ -525,16 +485,14 @@ class RtOMNYController(Controller):
self.laser_tracker_on() self.laser_tracker_on()
time.sleep(0.01) time.sleep(0.01)
osamroy = self.device_manager.devices.osamroy osamroy = self.get_device_manager().devices.osamroy
# the following read will also upate the angle in rt during readout # the following read will also upate the angle in rt during readout
readback = osamroy.obj.readback.get() readback = osamroy.obj.readback.get()
if np.fabs(readback) > 0.1: if (np.fabs(readback) > 0.1):
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(f"Rotating to zero", scope="feedback enable", show_asap=True)
f"Rotating to zero", scope="feedback enable", show_asap=True
)
osamroy.obj.move(0, wait=True) osamroy.obj.move(0, wait=True)
osamx = self.device_manager.devices.osamx osamx = self.get_device_manager().devices.osamx
osamx_in = osamx.user_parameter.get("in") osamx_in = osamx.user_parameter.get("in")
if not np.isclose(osamx.obj.readback.get(), osamx_in, atol=0.01): if not np.isclose(osamx.obj.readback.get(), osamx_in, atol=0.01):
@@ -556,15 +514,16 @@ class RtOMNYController(Controller):
time.sleep(1.5) time.sleep(1.5)
self.set_device_read_write("osamx", False) self.set_device_enabled("osamx", False)
self.set_device_read_write("osamy", False) self.set_device_enabled("osamy", False)
self.set_device_read_write("ofzpx", False) self.set_device_enabled("ofzpx", False)
self.set_device_read_write("ofzpy", False) self.set_device_enabled("ofzpy", False)
self.set_device_read_write("oosax", False) self.set_device_enabled("oosax", False)
self.set_device_read_write("oosax", False) self.set_device_enabled("oosax", False)
print("Feedback is running.") print("Feedback is running.")
@threadlocked @threadlocked
def clear_trajectory_generator(self): def clear_trajectory_generator(self):
self.socket_put("sc") self.socket_put("sc")
@@ -575,15 +534,16 @@ class RtOMNYController(Controller):
self.move_to_zero() self.move_to_zero()
self.socket_put("J0") self.socket_put("J0")
self.set_device_read_write("osamx", True) self.set_device_enabled("osamx", True)
self.set_device_read_write("osamy", True) self.set_device_enabled("osamy", True)
self.set_device_read_write("ofzpx", True) self.set_device_enabled("ofzpx", True)
self.set_device_read_write("ofzpy", True) self.set_device_enabled("ofzpy", True)
self.set_device_read_write("oosax", True) self.set_device_enabled("oosax", True)
self.set_device_read_write("oosax", True) self.set_device_enabled("oosax", True)
print("rt feedback is now disabled.") print("rt feedback is now disabled.")
def set_rotation_angle(self, val: float) -> None: def set_rotation_angle(self, val: float) -> None:
self.socket_put(f"a{val/180*np.pi}") self.socket_put(f"a{val/180*np.pi}")
@@ -618,13 +578,12 @@ class RtOMNYController(Controller):
"enabled_z": bool(tracker_values[10]), "enabled_z": bool(tracker_values[10]),
} }
def laser_tracker_on(self): def laser_tracker_on(self):
# update variables and enable if not yet active #update variables and enable if not yet active
if not self.laser_tracker_check_enabled(): if not self.laser_tracker_check_enabled():
logger.info("Enabling the laser tracker. Please wait...") logger.info("Enabling the laser tracker. Please wait...")
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(f"Enabling the laser tracker. Please wait...", scope="", show_asap=True)
f"Enabling the laser tracker. Please wait...", scope="", show_asap=True
)
tracker_intensity = self.tracker_info["tracker_intensity"] tracker_intensity = self.tracker_info["tracker_intensity"]
if ( if (
@@ -639,13 +598,18 @@ class RtOMNYController(Controller):
self.socket_put("T1") self.socket_put("T1")
time.sleep(0.5) time.sleep(0.5)
self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackyct=0") self.get_device_manager().devices.otracky.obj.controller.socket_put_confirmed(
self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackzct=0") "trackyct=0"
)
self.get_device_manager().devices.otracky.obj.controller.socket_put_confirmed(
"trackzct=0"
)
self.laser_tracker_wait_on_target() self.laser_tracker_wait_on_target()
logger.info("Laser tracker running!") logger.info("Laser tracker running!")
print("Laser tracker running!") print("Laser tracker running!")
def laser_tracker_check_enabled(self) -> bool: def laser_tracker_check_enabled(self) -> bool:
self.laser_update_tracker_info() self.laser_update_tracker_info()
if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]: if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]:
@@ -664,10 +628,11 @@ class RtOMNYController(Controller):
return True return True
return False return False
def laser_tracker_wait_on_target(self): def laser_tracker_wait_on_target(self):
max_repeat = 15 max_repeat = 15
count = 0 count = 0
while not self.laser_tracker_check_on_target() and count < max_repeat: while not self.laser_tracker_check_on_target() and count<max_repeat:
logger.info("Waiting for laser tracker to reach target position.") logger.info("Waiting for laser tracker to reach target position.")
time.sleep(0.5) time.sleep(0.5)
count += 1 count += 1
@@ -676,74 +641,75 @@ class RtOMNYController(Controller):
raise RtError("Failed to reach laser target position.") raise RtError("Failed to reach laser target position.")
def laser_tracker_print_intensity_for_otrack_tweaking(self): def laser_tracker_print_intensity_for_otrack_tweaking(self):
# self.laser_update_tracker_info() #self.laser_update_tracker_info()
# _laser_tracker_intensity = self.tracker_info["tracker_intensity"] #_laser_tracker_intensity = self.tracker_info["tracker_intensity"]
# print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r") #print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r")
self.laser_tracker_show_all(extra_endline="\r") self.laser_tracker_show_all(extra_endline="\r")
def laser_tracker_show_all(self, extra_endline=""): def laser_tracker_show_all(self,extra_endline=""):
self.laser_update_tracker_info() self.laser_update_tracker_info()
enabled_y = self.tracker_info["enabled_y"] enabled_y = self.tracker_info["enabled_y"]
print(extra_endline + f"Tracker enabled: {bool(enabled_y)}" + extra_endline) print(extra_endline+f"Tracker enabled: {bool(enabled_y)}"+extra_endline)
if self.tracker_info["tracker_intensity"] < self.tracker_info["threshold_intensity_y"]: if self.tracker_info["tracker_intensity"] < self.tracker_info["threshold_intensity_y"]:
print(self.red + " LOW INTENSITY" + self.white + extra_endline) print(self.red+" LOW INTENSITY"+self.white+extra_endline)
_laser_tracker_intensity = self.tracker_info["tracker_intensity"] _laser_tracker_intensity = self.tracker_info["tracker_intensity"]
print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}" + extra_endline) print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}"+extra_endline)
_laser_tracker_y_beampos = self.tracker_info["beampos_y"] _laser_tracker_y_beampos = self.tracker_info["beampos_y"]
print(f" Y beam position: {_laser_tracker_y_beampos:.2f}" + extra_endline) print(f" Y beam position: {_laser_tracker_y_beampos:.2f}"+extra_endline)
_laser_tracker_y_target = self.tracker_info["target_y"] _laser_tracker_y_target = self.tracker_info["target_y"]
print(f" target position: {_laser_tracker_y_target:.2f}" + extra_endline) print(f" target position: {_laser_tracker_y_target:.2f}"+extra_endline)
_laser_tracker_y_threshold_intensity = self.tracker_info["threshold_intensity_y"] _laser_tracker_y_threshold_intensity = self.tracker_info["threshold_intensity_y"]
print( print(f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}"+extra_endline)
f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}" + extra_endline
)
_laser_tracker_y_piezo_voltage = self.tracker_info["piezo_voltage_y"] _laser_tracker_y_piezo_voltage = self.tracker_info["piezo_voltage_y"]
print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}" + extra_endline) print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}"+extra_endline)
_laser_tracker_z_beampos = self.tracker_info["beampos_z"] _laser_tracker_z_beampos = self.tracker_info["beampos_z"]
print(f" Z beam position: {_laser_tracker_z_beampos:.2f}" + extra_endline) print(f" Z beam position: {_laser_tracker_z_beampos:.2f}"+extra_endline)
_laser_tracker_z_target = self.tracker_info["target_z"] _laser_tracker_z_target = self.tracker_info["target_z"]
print(f" target position: {_laser_tracker_z_target:.2f}" + extra_endline) print(f" target position: {_laser_tracker_z_target:.2f}"+extra_endline)
_laser_tracker_z_threshold_intensity = self.tracker_info["threshold_intensity_z"] _laser_tracker_z_threshold_intensity = self.tracker_info["threshold_intensity_z"]
print( print(f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}"+extra_endline)
f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}" + extra_endline
)
_laser_tracker_z_piezo_voltage = self.tracker_info["piezo_voltage_z"] _laser_tracker_z_piezo_voltage = self.tracker_info["piezo_voltage_z"]
print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}" + extra_endline) print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}"+extra_endline)
print(" Reminder - there is also an upper threshold active in rt\n" + extra_endline) print(" Reminder - there is also an upper threshold active in rt\n"+extra_endline)
if extra_endline == "": if extra_endline == "":
self.laser_tracker_galil_status() self.laser_tracker_galil_status()
def laser_tracker_galil_enable(self): def laser_tracker_galil_enable(self):
otracky_con = self.device_manager.devices.otracky.obj.controller otracky_con = self.get_device_manager().devices.otracky.obj.controller
otracky_con.socket_put_confirmed("tracken=1") otracky_con.socket_put_confirmed("tracken=1")
otracky_con.socket_put_confirmed("trackyct=0") otracky_con.socket_put_confirmed("trackyct=0")
otracky_con.socket_put_confirmed("trackzct=0") otracky_con.socket_put_confirmed("trackzct=0")
def laser_tracker_galil_disable(self): def laser_tracker_galil_disable(self):
otracky_con = self.device_manager.devices.otracky.obj.controller otracky_con = self.get_device_manager().devices.otracky.obj.controller
otracky_con.socket_put_confirmed("tracken=0") otracky_con.socket_put_confirmed("tracken=0")
def laser_tracker_galil_status(self): def laser_tracker_galil_status(self):
otracky_con = self.device_manager.devices.otracky.obj.controller otracky_con = self.get_device_manager().devices.otracky.obj.controller
if bool(float(otracky_con.socket_put_and_receive("MGtracken").strip())) == 0: if bool(float(otracky_con.socket_put_and_receive("MGtracken").strip())) == 0:
print(self.red + "Tracking in the Galil Controller is disabled." + self.white) print(self.red+"Tracking in the Galil Controller is disabled."+self.white)
print("Use dev.rtx.controller.laser_tracker_galil_enable to enable.\n") print("Use dev.rtx.controller.laser_tracker_galil_enable to enable.\n")
return 0 return(0)
else: else:
print("Tracking in the Galil Controller is enabled.") print("Tracking in the Galil Controller is enabled.")
trackyct = int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip())) trackyct=int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip()))
trackzct = int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip())) trackzct=int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip()))
print(f"Galil Trackcounters y={trackyct}, z={trackzct}") print(f"Galil Trackcounters y={trackyct}, z={trackzct}")
def show_signal_strength_interferometer(self): def show_signal_strength_interferometer(self):
channelnames = {1: "OSA FZP Y", 2: "ST OSA Y", 3: "OSA FZP X", 4: "ST OSA X", 5: "Angle"} channelnames={1:"OSA FZP Y",2:"ST OSA Y",3:"OSA FZP X",4:"ST OSA X",5:"Angle"}
self.feedback_get_status_and_ssi() self.feedback_get_status_and_ssi()
t = PrettyTable() t = PrettyTable()
t.title = f"Interferometer signal strength" t.title = f"Interferometer signal strength"
t.field_names = ["Channel", "Name", "Value"] t.field_names = ["Channel", "Name", "Value"]
for i in range(1, 6): for i in range(1,6):
ssi = self.ssi[f"ssi_{i}"] ssi = self.ssi[f"ssi_{i}"]
t.add_row([i, channelnames[i], ssi]) t.add_row([i,channelnames[i], ssi])
print(t) print(t)
def _omny_interferometer_switch_open_socket(self): def _omny_interferometer_switch_open_socket(self):
@@ -756,42 +722,44 @@ class RtOMNYController(Controller):
self._omny_interferometer_switch_put_and_receive("?000\r") self._omny_interferometer_switch_put_and_receive("?000\r")
time.sleep(1) time.sleep(1)
def _omny_interferometer_switch_channel(self, channel): def _omny_interferometer_switch_channel(self, channel):
self._omny_interferometer_switch_alloff() self._omny_interferometer_switch_alloff()
time.sleep(0.1) time.sleep(0.1)
if channel == 1: # Relais 1 and 2 if channel == 1: #Relais 1 and 2
rback = self._omny_interferometer_switch_put_and_receive("!0020003\r") rback = self._omny_interferometer_switch_put_and_receive("!0020003\r")
# if "|0003\r" != self._omny_interferometer_switch_put_and_receive("!0020003\r"): #if "|0003\r" != self._omny_interferometer_switch_put_and_receive("!0020003\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 2: # Relais 3 and 4 elif channel == 2: #Relais 3 and 4
rback = self._omny_interferometer_switch_put_and_receive("!002000C\r") rback = self._omny_interferometer_switch_put_and_receive("!002000C\r")
# if "|000C\r" != self._omny_interferometer_switch_put_and_receive("!002000C\r"): # if "|000C\r" != self._omny_interferometer_switch_put_and_receive("!002000C\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 3: # Relais 5 and 6 elif channel == 3: #Relais 5 and 6
rback = self._omny_interferometer_switch_put_and_receive("!0020030\r") rback = self._omny_interferometer_switch_put_and_receive("!0020030\r")
# if "|0030\r" != self._omny_interferometer_switch_put_and_receive("!0020030\r"): # if "|0030\r" != self._omny_interferometer_switch_put_and_receive("!0020030\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 4: # Relais 7 and 8 elif channel == 4: #Relais 7 and 8
rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r") rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r")
# if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"): # if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 5: # Relais 9 and 10 elif channel == 5: #Relais 9 and 10
rback = self._omny_interferometer_switch_put_and_receive("!0020300\r") rback = self._omny_interferometer_switch_put_and_receive("!0020300\r")
# if "|0300\r" != self._omny_interferometer_switch_put_and_receive("!0020300\r"): # if "|0300\r" != self._omny_interferometer_switch_put_and_receive("!0020300\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 6: # Relais 11 and 12 elif channel == 6: #Relais 11 and 12
rback = self._omny_interferometer_switch_put_and_receive("!0020C00\r") rback = self._omny_interferometer_switch_put_and_receive("!0020C00\r")
# if "|0C00\r" != self._omny_interferometer_switch_put_and_receive("!0020C00\r"): # if "|0C00\r" != self._omny_interferometer_switch_put_and_receive("!0020C00\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 7: # Relais 13 and 14 elif channel == 7: #Relais 13 and 14
rback = self._omny_interferometer_switch_put_and_receive("!0023000\r") rback = self._omny_interferometer_switch_put_and_receive("!0023000\r")
# if "|3000\r" != self._omny_interferometer_switch_put_and_receive("!0023000\r"): # if "|3000\r" != self._omny_interferometer_switch_put_and_receive("!0023000\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 8: # Relais 7 and 8 SPECIAL CASE use osafzp y signal to align osa y elif channel == 8: #Relais 7 and 8 SPECIAL CASE use osafzp y signal to align osa y
rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r") rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r")
# if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"): # if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
elif channel == 9: # Relais 15 and 16 elif channel == 9: #Relais 15 and 16
rback = self._omny_interferometer_switch_put_and_receive("!002C000\r") rback = self._omny_interferometer_switch_put_and_receive("!002C000\r")
# if "|C000\r" != self._omny_interferometer_switch_put_and_receive("!002C000\r"): # if "|C000\r" != self._omny_interferometer_switch_put_and_receive("!002C000\r"):
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.") # raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
@@ -817,14 +785,14 @@ class RtOMNYController(Controller):
self._omny_interferometer_switch_put_and_receive("!00S01\r") self._omny_interferometer_switch_put_and_receive("!00S01\r")
def _omny_interferometer_switch_alloff(self): def _omny_interferometer_switch_alloff(self):
# switch all off #switch all off
self._omny_interferometer_switch_put_and_receive("!0020000\r") self._omny_interferometer_switch_put_and_receive("!0020000\r")
# LED OFF #LED OFF
time.sleep(0.1) time.sleep(0.1)
self._omny_interferometer_switch_put_and_receive("!00S00\r") self._omny_interferometer_switch_put_and_receive("!00S00\r")
time.sleep(0.1) time.sleep(0.1)
alloff = self._omny_interferometer_switch_put_and_receive("?002\r") alloff = self._omny_interferometer_switch_put_and_receive("?002\r")
# check all off #check all off
if "00" not in alloff: if "00" not in alloff:
raise RtOMNY_mirror_switchbox_Error("Not all channels switched off.") raise RtOMNY_mirror_switchbox_Error("Not all channels switched off.")
@@ -832,16 +800,17 @@ class RtOMNYController(Controller):
self.socket_put("J3") self.socket_put("J3")
def _omny_interferometer_get_signalsample(self, channel, averaging_duration): def _omny_interferometer_get_signalsample(self, channel, averaging_duration):
# channel is string, eg ssi_1 #channel is string, eg ssi_1
# ensure no averaging running currently #ensure no averaging running currently
self.feedback_is_running() self.feedback_is_running()
# measure first sample #measure first sample
self._rt_start_averaging_SSI() self._rt_start_averaging_SSI()
time.sleep(averaging_duration) time.sleep(averaging_duration)
self.feedback_is_running() self.feedback_is_running()
return self.ssi[channel] return self.ssi[channel]
def _get_signals_from_table(self, return_table) -> dict: def _get_signals_from_table(self, return_table) -> dict:
self.average_stdeviations_x_st_fzp += float(return_table[16]) self.average_stdeviations_x_st_fzp += float(return_table[16])
self.average_stdeviations_y_st_fzp += float(return_table[18]) self.average_stdeviations_y_st_fzp += float(return_table[18])
@@ -862,6 +831,7 @@ class RtOMNYController(Controller):
"stdev_x_st_fzp": {"value": float(return_table[16])}, "stdev_x_st_fzp": {"value": float(return_table[16])},
"average_y_st_fzp": {"value": float(return_table[17])}, "average_y_st_fzp": {"value": float(return_table[17])},
"stdev_y_st_fzp": {"value": float(return_table[18])}, "stdev_y_st_fzp": {"value": float(return_table[18])},
"average_stdeviations_x_st_fzp": { "average_stdeviations_x_st_fzp": {
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1) "value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
}, },
@@ -870,7 +840,7 @@ class RtOMNYController(Controller):
}, },
} }
return signals return signals
@threadlocked @threadlocked
def start_scan(self): def start_scan(self):
if not self.feedback_is_running(): if not self.feedback_is_running():
@@ -892,6 +862,7 @@ class RtOMNYController(Controller):
# start a point-by-point scan (for cont scan in flomni it would be "sa") # start a point-by-point scan (for cont scan in flomni it would be "sa")
self.socket_put_and_receive("sd") self.socket_put_and_receive("sd")
@retry_once @retry_once
@threadlocked @threadlocked
def get_scan_status(self): def get_scan_status(self):
@@ -910,6 +881,13 @@ class RtOMNYController(Controller):
current_position_in_scan = int(float(return_table[2])) current_position_in_scan = int(float(return_table[2]))
return (mode, number_of_positions_planned, current_position_in_scan) return (mode, number_of_positions_planned, current_position_in_scan)
def get_device_manager(self):
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager:
return axis.device_manager
raise BECConfigError("Could not access the device_manager")
def read_positions_from_sampler(self): def read_positions_from_sampler(self):
# this was for reading after the scan completed # this was for reading after the scan completed
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
@@ -923,7 +901,7 @@ class RtOMNYController(Controller):
# if not (mode==2 or mode==3): # if not (mode==2 or mode==3):
# error # error
self.device_manager.connector.set( self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"), MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage( messages.DeviceStatusMessage(
device="rt_scan", status=1, metadata=self.readout_metadata device="rt_scan", status=1, metadata=self.readout_metadata
@@ -958,7 +936,7 @@ class RtOMNYController(Controller):
signals = self._get_signals_from_table(return_table) signals = self._get_signals_from_table(return_table)
self.publish_device_data(signals=signals, point_id=int(return_table[0])) self.publish_device_data(signals=signals, point_id=int(return_table[0]))
self.device_manager.connector.set( self.get_device_manager().connector.set(
MessageEndpoints.device_status("rt_scan"), MessageEndpoints.device_status("rt_scan"),
messages.DeviceStatusMessage( messages.DeviceStatusMessage(
device="rt_scan", status=0, metadata=self.readout_metadata device="rt_scan", status=0, metadata=self.readout_metadata
@@ -971,16 +949,15 @@ class RtOMNYController(Controller):
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}." f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}."
) )
self.device_manager.connector.send_client_info( self.get_device_manager().connector.send_client_info(
"OMNY statistics: Average of all standard deviations [nm]: x" "OMNY statistics: Average of all standard deviations [nm]: x"
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y" f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}.", f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}.",
scope="", scope="", show_asap=True)
show_asap=True,
)
def publish_device_data(self, signals, point_id): def publish_device_data(self, signals, point_id):
self.device_manager.connector.set_and_publish( self.get_device_manager().connector.set_and_publish(
MessageEndpoints.device_read("rt_omny"), MessageEndpoints.device_read("rt_omny"),
messages.DeviceMessage( messages.DeviceMessage(
signals=signals, metadata={"point_id": point_id, **self.readout_metadata} signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
@@ -1091,7 +1068,7 @@ class RtOMNYMotor(Device, PositionerBase):
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.sign = sign self.sign = sign
self.controller = RtOMNYController( self.controller = RtOMNYController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric) self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.device_manager = device_manager self.device_manager = device_manager
@@ -1119,10 +1096,6 @@ class RtOMNYMotor(Device, PositionerBase):
self.low_limit_travel.put(limits[0]) self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1]) self.high_limit_travel.put(limits[1])
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
"""Wait for the device to be connected."""
self.controller.on(timeout=timeout)
@property @property
def limits(self): def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get()) return (self.low_limit_travel.get(), self.high_limit_travel.get())
@@ -1209,6 +1182,7 @@ class RtOMNYMotor(Device, PositionerBase):
return status return status
@property @property
def axis_Id(self): def axis_Id(self):
return self._axis_Id_alpha return self._axis_Id_alpha
@@ -1253,7 +1227,7 @@ class RtOMNYMotor(Device, PositionerBase):
if __name__ == "__main__": if __name__ == "__main__":
rtcontroller = RtOMNYController( rtcontroller = RtOMNYController(
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222
) )
rtcontroller.on() rtcontroller.on()
rtcontroller.laser_tracker_on() rtcontroller.laser_tracker_on()

View File

@@ -1,82 +0,0 @@
import time
import socket
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import EpicsSignal
class OMNYFastEpicsShutterError(Exception):
pass
def _detect_host_pv():
"""Detect host subnet and return appropriate PV name."""
try:
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
if local_ip.startswith("129.129.122."):
return "X12SA-ES1-TTL:OUT_01"
else:
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
except Exception as ex:
print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.")
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
class OMNYFastEpicsShutter(Device):
"""
Fast EPICS shutter with automatic PV selection based on host subnet.
"""
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"]
SUB_VALUE = "value"
_default_sub = SUB_VALUE
# PV is detected dynamically at import time
shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True)
def __init__(self, prefix="", *, name, **kwargs):
super().__init__(prefix, name=name, **kwargs)
self.shutter.subscribe(self._emit_value)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self.wait_for_connection()
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
# -----------------------------------------------------
# User-facing shutter control functions
# -----------------------------------------------------
def fshopen(self):
"""Open the fast shutter."""
try:
self.shutter.put(1, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}")
def fshclose(self):
"""Close the fast shutter."""
try:
self.shutter.put(0, wait=True)
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}")
def fshstatus(self):
"""Return the fast shutter status (0=closed, 1=open)."""
try:
return self.shutter.get()
except Exception as ex:
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
def fshinfo(self):
"""Print information about which EPICS PV channel is being used."""
pvname = self.shutter.pvname
print(f"Fast shutter connected to EPICS channel: {pvname}")
return pvname
def help(self):
"""Display available user methods."""
print("Available methods:")
for method in self.USER_ACCESS:
print(f" - {method}")

View File

@@ -1,65 +0,0 @@
import requests
import threading
import cv2
import numpy as np
from ophyd import Device, Component as Cpt
from ophyd_devices import PreviewSignal
import traceback
from bec_lib.logger import bec_logger
logger = bec_logger.logger
class WebcamViewer(Device):
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
super().__init__(name=name, **kwargs)
self.url = url
self._connection = None
self._update_thread = None
self._buffer = b""
self._shutdown_event = threading.Event()
self.preview.num_rotation_90 = num_rotation_90
self.preview.transpose = transpose
def start_live_mode(self) -> None:
if self._connection is not None:
return
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
def _update_loop(self) -> None:
while not self._shutdown_event.is_set():
try:
self._connection = requests.get(self.url, stream=True)
for chunk in self._connection.iter_content(chunk_size=1024):
self._buffer += chunk
start = self._buffer.find(b'\xff\xd8') # JPEG start
end = self._buffer.find(b'\xff\xd9') # JPEG end
if start == -1 or end == -1:
continue
jpg = self._buffer[start:end+2]
self._buffer = self._buffer[end+2:]
image = cv2.imdecode(np.frombuffer(jpg, np.uint8), cv2.IMREAD_COLOR)
if image is not None:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
self.preview.put(image)
except Exception as exc:
content = traceback.format_exc()
logger.error(f"Image update loop failed: {content}")
def stop_live_mode(self) -> None:
if self._connection is None:
return
self._shutdown_event.set()
if self._connection is not None:
self._connection.close()
self._connection = None
if self._update_thread is not None:
self._update_thread.join()
self._update_thread = None
self._shutdown_event.clear()

View File

@@ -1,74 +0,0 @@
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
class OMNYXRayEpicsGUI(Device):
save_frame = Cpt(
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
)
update_frame_acqdone = Cpt(
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
)
update_frame_acq = Cpt(
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
)
width_y_dynamic = {
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_y = Dcpt(width_y_dynamic)
width_x_dynamic = {
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
width_x = Dcpt(width_x_dynamic)
enable_mv_x = Cpt(
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
)
enable_mv_y = Cpt(
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
)
send_message = Cpt(
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
)
sample_name = Cpt(
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
)
angle = Cpt(
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
)
pixel_size = Cpt(
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
)
submit = Cpt(
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
)
step = Cpt(
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
)
xval_x_dynamic = {
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
xval_x = Dcpt(xval_x_dynamic)
yval_y_dynamic = {
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
}
yval_y = Dcpt(yval_y_dynamic)
recbg = Cpt(
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
)
stage_pos_x_dynamic = {
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
}
stage_pos_x = Dcpt(stage_pos_x_dynamic)
mvx = Cpt(
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
)
mvy = Cpt(
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
)

View File

@@ -93,7 +93,6 @@ class SmaractController(Controller):
socket_cls=None, socket_cls=None,
socket_host=None, socket_host=None,
socket_port=None, socket_port=None,
device_manager=None,
attr_name="", attr_name="",
labels=None, labels=None,
): ):
@@ -103,7 +102,6 @@ class SmaractController(Controller):
socket_cls=socket_cls, socket_cls=socket_cls,
socket_host=socket_host, socket_host=socket_host,
socket_port=socket_port, socket_port=socket_port,
device_manager=device_manager,
attr_name=attr_name, attr_name=attr_name,
parent=parent, parent=parent,
labels=labels, labels=labels,

View File

@@ -123,11 +123,10 @@ class SmaractMotor(Device, PositionerBase):
limits=None, limits=None,
sign=1, sign=1,
socket_cls=SocketIO, socket_cls=SocketIO,
device_manager=None,
**kwargs, **kwargs,
): ):
self.controller = SmaractController( self.controller = SmaractController(
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager socket_cls=socket_cls, socket_host=host, socket_port=port
) )
self.axis_Id = axis_Id self.axis_Id = axis_Id
self.sign = sign self.sign = sign

View File

@@ -33,11 +33,11 @@ logger = bec_logger.logger
class FlomniFermatScan(SyncFlyScanBase): class FlomniFermatScan(SyncFlyScanBase):
scan_name = "flomni_fermat_scan" scan_name = "flomni_fermat_scan"
scan_report_hint = "table"
scan_type = "fly" scan_type = "fly"
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"] required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
arg_input = {} arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
use_scan_progress_report = True
def __init__( def __init__(
self, self,
@@ -74,7 +74,6 @@ class FlomniFermatScan(SyncFlyScanBase):
""" """
super().__init__(parameter=parameter, **kwargs) super().__init__(parameter=parameter, **kwargs)
self.show_live_table = False
self.axis = [] self.axis = []
self.fovx = fovx self.fovx = fovx
self.fovy = fovy self.fovy = fovy
@@ -169,7 +168,6 @@ class FlomniFermatScan(SyncFlyScanBase):
tracker_signal_status = yield from self.stubs.send_rpc_and_wait( tracker_signal_status = yield from self.stubs.send_rpc_and_wait(
"rtx", "controller.laser_tracker_check_signalstrength" "rtx", "controller.laser_tracker_check_signalstrength"
) )
#self.device_manager.connector.send_client_info(tracker_signal_status)
if tracker_signal_status == "low": if tracker_signal_status == "low":
self.device_manager.connector.raise_alarm( self.device_manager.connector.raise_alarm(
severity=0, severity=0,

View File

@@ -88,14 +88,6 @@ Depending on the tomo mode following parameters can be given to the `flomni.tomo
| Golden ratio tomography (sorted in bunches) | projection_number=None | | Golden ratio tomography (sorted in bunches) | projection_number=None |
| Equally spaced with golden starting angle | projection_number=None | | Equally spaced with golden starting angle | projection_number=None |
### GUI tools
During operation the BEC GUI will show the relevant cameras or progress information. To manually switch view TAB completion on 'flomni.flomnigui_' will show all options to control the GUI. Most useful
'flomni.flomnigui_show_cameras()' will show the cameras for sample transfer and interior overview
'flomni.flomnigui_show_progress()' will show the measurement progress GUI
'flomnigui_show_xeyealign()' will show the XrayEye alignment GUI
## How to setup flOMNI (software) ## How to setup flOMNI (software)
This part of the manual is intended for beamline staff and expert users This part of the manual is intended for beamline staff and expert users
@@ -226,15 +218,9 @@ Update the values by, example for feyex and in position,
To refresh the frame of the xray eye windows software To refresh the frame of the xray eye windows software
`flomni.xrayeye_update_frame()` `flomni.xrayeye_update_frame()`
This command can also be called to keep the shutter open and live view active
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
To start the xray eye alignment (and clear any previous alignment) To start the xray eye alignment (and clear any previous alignment)
`flomni.xrayeye_alignment_start()` `flomni.xrayeye_alignment_start()`
This command can also be called to keep the shutter open and live view active. Warning: The dose to the sample will be significantly higher.
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
To load the fit parameters from directory _dir_path_ computed by _SPEC_ptycho_align.m_ in Matlab run To load the fit parameters from directory _dir_path_ computed by _SPEC_ptycho_align.m_ in Matlab run
`flomni.read_alignment_offset(dir_path='')` `flomni.read_alignment_offset(dir_path='')`
The loading routine uses default values for the vertical alignment. This behavior can be changed (e.g. for getting new default values) by the parameter `use_vertical_default_values=False`. The loading routine uses default values for the vertical alignment. This behavior can be changed (e.g. for getting new default values) by the parameter `use_vertical_default_values=False`.

View File

@@ -16,7 +16,7 @@ dependencies = [
"bec_ipython_client", "bec_ipython_client",
"bec_lib", "bec_lib",
"bec_server", "bec_server",
"ophyd_devices~=1.29", "ophyd_devices",
"std_daq_client", "std_daq_client",
"jfjoch-client", "jfjoch-client",
"rich", "rich",
@@ -24,7 +24,6 @@ dependencies = [
"pyueye", # for the IDS uEye camera "pyueye", # for the IDS uEye camera
"bec_widgets", "bec_widgets",
"zmq", "zmq",
"opencv-python",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -7,15 +7,10 @@ from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGal
@pytest.fixture @pytest.fixture
def fsamroy(dm_with_devices): def fsamroy():
FuprGalilController._reset_controller() FuprGalilController._reset_controller()
fsamroy_motor = FuprGalilMotor( fsamroy_motor = FuprGalilMotor(
"A", "A", name="fsamroy", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
name="fsamroy",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
) )
fsamroy_motor.controller.on() fsamroy_motor.controller.on()
assert isinstance(fsamroy_motor.controller, FuprGalilController) assert isinstance(fsamroy_motor.controller, FuprGalilController)

View File

@@ -8,15 +8,10 @@ from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, Lamn
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def leyey(dm_with_devices): def leyey():
LamniGalilController._reset_controller() LamniGalilController._reset_controller()
leyey_motor = LamniGalilMotor( leyey_motor = LamniGalilMotor(
"H", "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
name="leyey",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
) )
leyey_motor.controller.on() leyey_motor.controller.on()
yield leyey_motor yield leyey_motor
@@ -25,15 +20,10 @@ def leyey(dm_with_devices):
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def leyex(dm_with_devices): def leyex():
LamniGalilController._reset_controller() LamniGalilController._reset_controller()
leyex_motor = LamniGalilMotor( leyex_motor = LamniGalilMotor(
"A", "A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
name="leyey",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
) )
leyex_motor.controller.on() leyex_motor.controller.on()
yield leyex_motor yield leyex_motor

View File

@@ -7,15 +7,10 @@ from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, Flo
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def leyey(dm_with_devices): def leyey():
FlomniGalilController._reset_controller() FlomniGalilController._reset_controller()
leyey_motor = FlomniGalilMotor( leyey_motor = FlomniGalilMotor(
"H", "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
name="leyey",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
) )
leyey_motor.controller.on() leyey_motor.controller.on()
yield leyey_motor yield leyey_motor
@@ -24,15 +19,10 @@ def leyey(dm_with_devices):
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def leyex(dm_with_devices): def leyex():
FlomniGalilController._reset_controller() FlomniGalilController._reset_controller()
leyex_motor = FlomniGalilMotor( leyex_motor = FlomniGalilMotor(
"H", "H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
name="leyey",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
) )
leyex_motor.controller.on() leyex_motor.controller.on()
yield leyex_motor yield leyex_motor
@@ -167,16 +157,11 @@ def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages
], ],
) )
def test_fosaz_light_curtain_is_triggered( def test_fosaz_light_curtain_is_triggered(
axis_Id, socket_put_messages, socket_get_messages, triggered, dm_with_devices axis_Id, socket_put_messages, socket_get_messages, triggered
): ):
"""test that the light curtain is triggered""" """test that the light curtain is triggered"""
fosaz = FlomniGalilMotor( fosaz = FlomniGalilMotor(
axis_Id, axis_Id, name="fosaz", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
name="fosaz",
host="mpc2680.psi.ch",
port=8081,
socket_cls=SocketMock,
device_manager=dm_with_devices,
) )
fosaz.controller.on() fosaz.controller.on()
fosaz.controller.sock.flush_buffer() fosaz.controller.sock.flush_buffer()

View File

@@ -5,7 +5,7 @@ from unittest import mock
import numpy as np import numpy as np
import pytest import pytest
from csaxs_bec.devices.ids_cameras.ids_camera import IDSCamera from csaxs_bec.devices.ids_cameras.ids_camera_new import IDSCamera
@pytest.fixture(scope="function") @pytest.fixture(scope="function")

View File

@@ -16,10 +16,7 @@ def controller():
""" """
with mock.patch("ophyd_devices.utils.socket.SocketIO") as socket_cls: with mock.patch("ophyd_devices.utils.socket.SocketIO") as socket_cls:
controller = NPointController( controller = NPointController(
socket_cls=socket_cls, socket_cls=socket_cls, socket_host="localhost", socket_port=1234
socket_host="localhost",
socket_port=1234,
device_manager=mock.MagicMock(),
) )
controller.on() controller.on()
controller.sock.reset_mock() controller.sock.reset_mock()
@@ -28,18 +25,13 @@ def controller():
@pytest.fixture @pytest.fixture
def npointx(dm_with_devices): def npointx():
""" """
Fixture to create a NPointAxis object. Fixture to create a NPointAxis object.
""" """
controller = mock.MagicMock() controller = mock.MagicMock()
npointx = NPointAxis( npointx = NPointAxis(
axis_Id="A", axis_Id="A", name="npointx", host="localhost", port=1234, socket_cls=controller
name="npointx",
host="localhost",
port=1234,
socket_cls=controller,
device_manager=dm_with_devices,
) )
npointx.controller.on() npointx.controller.on()
npointx.controller.sock.reset_mock() npointx.controller.sock.reset_mock()
@@ -115,18 +107,13 @@ def test_axis_get_in(npointx, axis, msg_in, msg_out):
npointx.controller.sock.put.assert_called_once_with(msg_in) npointx.controller.sock.put.assert_called_once_with(msg_in)
def test_axis_out_of_range(dm_with_devices): def test_axis_out_of_range(controller):
""" """
Test that an error is raised when trying to create an NPointAxis object with an invalid axis ID. Test that an error is raised when trying to create an NPointAxis object with an invalid axis ID.
""" """
with pytest.raises(ValueError): with pytest.raises(ValueError):
npointx = NPointAxis( npointx = NPointAxis(
axis_Id="G", axis_Id="G", name="npointx", host="localhost", port=1234, socket_cls=mock.MagicMock()
name="npointx",
host="localhost",
port=1234,
socket_cls=mock.MagicMock(),
device_manager=dm_with_devices,
) )

View File

@@ -11,29 +11,26 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import RtError
def rt_flomni(): def rt_flomni():
RtFlomniController._reset_controller() RtFlomniController._reset_controller()
rt_flomni = RtFlomniController( rt_flomni = RtFlomniController(
name="rt_flomni", name="rt_flomni", socket_cls=SocketMock, socket_host="localhost", socket_port=8081
socket_cls=SocketMock,
socket_host="localhost",
socket_port=8081,
device_manager=mock.MagicMock(),
) )
with mock.patch.object(rt_flomni, "sock"): with mock.patch.object(rt_flomni, "get_device_manager"):
rtx = mock.MagicMock(spec=RtFlomniMotor) with mock.patch.object(rt_flomni, "sock"):
rtx.name = "rtx" rtx = mock.MagicMock(spec=RtFlomniMotor)
rtx.axis_Id = "A" rtx.name = "rtx"
rtx.axis_Id_numeric = 0 rtx.axis_Id = "A"
rty = mock.MagicMock(spec=RtFlomniMotor) rtx.axis_Id_numeric = 0
rty.name = "rty" rty = mock.MagicMock(spec=RtFlomniMotor)
rty.axis_Id = "B" rty.name = "rty"
rty.axis_Id_numeric = 1 rty.axis_Id = "B"
rtz = mock.MagicMock(spec=RtFlomniMotor) rty.axis_Id_numeric = 1
rtz.name = "rtz" rtz = mock.MagicMock(spec=RtFlomniMotor)
rtz.axis_Id = "C" rtz.name = "rtz"
rtz.axis_Id_numeric = 2 rtz.axis_Id = "C"
rt_flomni.set_axis(axis=rtx, axis_nr=0) rtz.axis_Id_numeric = 2
rt_flomni.set_axis(axis=rty, axis_nr=1) rt_flomni.set_axis(axis=rtx, axis_nr=0)
rt_flomni.set_axis(axis=rtz, axis_nr=2) rt_flomni.set_axis(axis=rty, axis_nr=1)
yield rt_flomni rt_flomni.set_axis(axis=rtz, axis_nr=2)
yield rt_flomni
RtFlomniController._reset_controller() RtFlomniController._reset_controller()
@@ -55,7 +52,7 @@ def test_rt_flomni_feedback_is_running(rt_flomni, return_value, is_running):
def test_feedback_enable_with_reset(rt_flomni): def test_feedback_enable_with_reset(rt_flomni):
device_manager = rt_flomni.device_manager device_manager = rt_flomni.get_device_manager()
device_manager.devices.fsamx.user_parameter.get.return_value = 0.05 device_manager.devices.fsamx.user_parameter.get.return_value = 0.05
device_manager.devices.fsamx.obj.readback.get.return_value = 0.05 device_manager.devices.fsamx.obj.readback.get.return_value = 0.05
@@ -71,7 +68,7 @@ def test_feedback_enable_with_reset(rt_flomni):
def test_move_samx_to_scan_region(rt_flomni): def test_move_samx_to_scan_region(rt_flomni):
device_manager = rt_flomni.device_manager device_manager = rt_flomni.get_device_manager()
device_manager.devices.rtx.user_parameter.get.return_value = 1 device_manager.devices.rtx.user_parameter.get.return_value = 1
rt_flomni.move_samx_to_scan_region(20, 2) rt_flomni.move_samx_to_scan_region(20, 2)
assert mock.call(b"v0\n") not in rt_flomni.sock.put.mock_calls assert mock.call(b"v0\n") not in rt_flomni.sock.put.mock_calls
@@ -79,16 +76,16 @@ def test_move_samx_to_scan_region(rt_flomni):
def test_feedback_enable_without_reset(rt_flomni): def test_feedback_enable_without_reset(rt_flomni):
with mock.patch.object(rt_flomni, "set_device_read_write") as set_device_read_write: with mock.patch.object(rt_flomni, "set_device_enabled") as set_device_enabled:
with mock.patch.object(rt_flomni, "feedback_is_running", return_value=True): with mock.patch.object(rt_flomni, "feedback_is_running", return_value=True):
with mock.patch.object(rt_flomni, "laser_tracker_on") as laser_tracker_on: with mock.patch.object(rt_flomni, "laser_tracker_on") as laser_tracker_on:
rt_flomni.feedback_enable_without_reset() rt_flomni.feedback_enable_without_reset()
laser_tracker_on.assert_called_once() laser_tracker_on.assert_called_once()
assert mock.call(b"l3\n") in rt_flomni.sock.put.mock_calls assert mock.call(b"l3\n") in rt_flomni.sock.put.mock_calls
assert mock.call("fsamx", False) in set_device_read_write.mock_calls assert mock.call("fsamx", False) in set_device_enabled.mock_calls
assert mock.call("fsamy", False) in set_device_read_write.mock_calls assert mock.call("fsamy", False) in set_device_enabled.mock_calls
assert mock.call("foptx", False) in set_device_read_write.mock_calls assert mock.call("foptx", False) in set_device_enabled.mock_calls
assert mock.call("fopty", False) in set_device_read_write.mock_calls assert mock.call("fopty", False) in set_device_enabled.mock_calls
def test_feedback_enable_without_reset_raises(rt_flomni): def test_feedback_enable_without_reset_raises(rt_flomni):

View File

@@ -10,27 +10,19 @@ from csaxs_bec.devices.smaract.smaract_ophyd import SmaractMotor
@pytest.fixture @pytest.fixture
def controller(dm_with_devices): def controller():
SmaractController._reset_controller() SmaractController._reset_controller()
controller = SmaractController( controller = SmaractController(socket_cls=SocketMock, socket_host="dummy", socket_port=123)
socket_cls=SocketMock, socket_host="dummy", socket_port=123, device_manager=dm_with_devices
)
controller.on() controller.on()
controller.sock.flush_buffer() controller.sock.flush_buffer()
yield controller yield controller
@pytest.fixture @pytest.fixture
def lsmarA(dm_with_devices): def lsmarA():
SmaractController._reset_controller() SmaractController._reset_controller()
motor_a = SmaractMotor( motor_a = SmaractMotor(
"A", "A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock
name="lsmarA",
host="mpc2680.psi.ch",
port=8085,
sign=1,
socket_cls=SocketMock,
device_manager=dm_with_devices,
) )
motor_a.controller.on() motor_a.controller.on()
motor_a.controller.sock.flush_buffer() motor_a.controller.sock.flush_buffer()