Compare commits
3 Commits
fix/contro
...
test_gitea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79d2071094 | ||
|
|
d8d2417679 | ||
|
|
8aa0a52bda |
@@ -47,13 +47,25 @@ jobs:
|
||||
python-version: "${{ inputs.PYTHON_VERSION || '3.11' }}"
|
||||
|
||||
- name: Checkout BEC Core
|
||||
run: git clone --depth 1 --branch "${{ inputs.BEC_CORE_BRANCH || 'main' }}" https://github.com/bec-project/bec.git ./bec
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/bec
|
||||
ref: "${{ inputs.BEC_CORE_BRANCH || 'main' }}"
|
||||
path: ./bec
|
||||
|
||||
- name: Checkout Ophyd Devices
|
||||
run: git clone --depth 1 --branch "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}" https://github.com/bec-project/ophyd_devices.git ./ophyd_devices
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/ophyd_devices
|
||||
ref: "${{ inputs.OPHYD_DEVICES_BRANCH || 'main' }}"
|
||||
path: ./ophyd_devices
|
||||
|
||||
- name: Checkout BEC Widgets
|
||||
run: git clone --depth 1 --branch "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}" https://github.com/bec-project/bec_widgets.git ./bec_widgets
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
repository: bec/bec_widgets
|
||||
ref: "${{ inputs.BEC_WIDGETS_BRANCH || 'main' }}"
|
||||
path: ./bec_widgets
|
||||
|
||||
- name: Checkout BEC Plugin Repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
@@ -14,8 +14,6 @@ from typeguard import typechecked
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import cSAXSBeamlineChecks
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_optics_mixin import FlomniOpticsMixin
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni.x_ray_eye_align import XrayEyeAlign
|
||||
from csaxs_bec.bec_ipython_client.plugins.flomni.gui_tools import flomniGuiTools
|
||||
from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import OMNYTools
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -26,57 +24,27 @@ if builtins.__dict__.get("bec") is not None:
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class FlomniToolsError(Exception):
|
||||
pass
|
||||
|
||||
class FlomniInitError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FlomniError(Exception):
|
||||
pass
|
||||
|
||||
class FlomniTools:
|
||||
def yesno(self, message: str, default="none", autoconfirm=0) -> bool:
|
||||
if autoconfirm and default == "y":
|
||||
self.printgreen(message + " Automatically confirming default: yes")
|
||||
return True
|
||||
elif autoconfirm and default == "n":
|
||||
self.printgreen(message + " Automatically confirming default: no")
|
||||
return False
|
||||
if default == "y":
|
||||
message_ending = " [Y]/n? "
|
||||
elif default == "n":
|
||||
message_ending = " y/[N]? "
|
||||
else:
|
||||
message_ending = " y/n? "
|
||||
while True:
|
||||
user_input = input(self.OKBLUE + message + message_ending + self.ENDC)
|
||||
if (
|
||||
user_input == "Y" or user_input == "y" or user_input == "yes" or user_input == "Yes"
|
||||
) or (default == "y" and user_input == ""):
|
||||
return True
|
||||
if (
|
||||
user_input == "N" or user_input == "n" or user_input == "no" or user_input == "No"
|
||||
) or (default == "n" and user_input == ""):
|
||||
return False
|
||||
else:
|
||||
print("Please expicitely confirm y or n.")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class FlomniInitStagesMixin:
|
||||
|
||||
def flomni_init_stages(self):
|
||||
|
||||
if self.OMNYTools.yesno("Starting initialization of flOMNI stages. OK?"):
|
||||
user_input = input("Starting initialization of flOMNI stages. OK? [y/n]")
|
||||
if user_input == "y":
|
||||
print("staring...")
|
||||
else:
|
||||
return
|
||||
|
||||
if self.check_all_axes_of_fomni_referenced():
|
||||
if self.OMNYTools.yesno("All axes are referenced. Continue anyways?"):
|
||||
user_input = input("Continue anyways? [y/n]")
|
||||
if user_input == "y":
|
||||
print("ok then...")
|
||||
else:
|
||||
return
|
||||
@@ -106,8 +74,10 @@ class FlomniInitStagesMixin:
|
||||
dev.feyex.limits = [-30, -1]
|
||||
print("done")
|
||||
|
||||
|
||||
if self.OMNYTools.yesno("Init of foptz. Can the stage move to the upstream limit without collision?"):
|
||||
user_input = input(
|
||||
"Init of foptz. Can the stage move to the upstream limit without collision? [y/n]"
|
||||
)
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
else:
|
||||
return
|
||||
@@ -161,7 +131,10 @@ class FlomniInitStagesMixin:
|
||||
dev.fsamy.limits = [2, 3.1]
|
||||
print("done")
|
||||
|
||||
if self.OMNYTools.yesno("Init of tracking stages. Did you remove the outer laser flight tubes?"):
|
||||
user_input = input(
|
||||
"Init of tracking stages. Did you remove the outer laser flight tubes? [y/n]"
|
||||
)
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -177,7 +150,8 @@ class FlomniInitStagesMixin:
|
||||
dev.ftrackz.limits = [4.5, 5.5]
|
||||
print("done")
|
||||
|
||||
if self.OMNYTools.yesno("Init of sample stage. Is the piezo at about 0 deg?"):
|
||||
user_input = input("Init of sample stage. Is the piezo at about 0 deg? [y/n]")
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -194,7 +168,11 @@ class FlomniInitStagesMixin:
|
||||
print("done")
|
||||
|
||||
print("Initializing UPR stage.")
|
||||
if self.OMNYTools.yesno("To ensure that the end switches work, please check that they are currently not pushed. Is everything okay?"):
|
||||
user_input = input(
|
||||
"To ensure that the end switches work, please check that they are currently not pushed."
|
||||
" Is everything okay? [y/n]"
|
||||
)
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -215,7 +193,8 @@ class FlomniInitStagesMixin:
|
||||
time.sleep(1)
|
||||
continue
|
||||
break
|
||||
if self.OMNYTools.yesno("Shall I start the index search?"):
|
||||
user_input = input("Shall I start the index search? [y/n]")
|
||||
if user_input == "y":
|
||||
print("good then. Starting index search.")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -234,7 +213,11 @@ class FlomniInitStagesMixin:
|
||||
dev.fsamroy.limits = [-5, 365]
|
||||
print("done")
|
||||
|
||||
if self.OMNYTools.yesno("Init of foptx. Can the stage move to the positive limit without collision? Attention: tracker flight tube!"):
|
||||
user_input = input(
|
||||
"Init of foptx. Can the stage move to the positive limit without collision? Attention:"
|
||||
" tracker flight tube! [y/n]"
|
||||
)
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -258,7 +241,8 @@ class FlomniInitStagesMixin:
|
||||
continue
|
||||
break
|
||||
|
||||
if self.OMNYTools.yesno("Start limit switch search of fopty?"):
|
||||
user_input = input("Start limit switch search of fopty? [y/n]")
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -291,7 +275,8 @@ class FlomniInitStagesMixin:
|
||||
return False
|
||||
|
||||
def set_limits(self):
|
||||
if self.OMNYTools.yesno("Set default limits for flOMNI?"):
|
||||
user_input = input("Set default limits for flOMNI? [y/n]")
|
||||
if user_input == "y":
|
||||
print("setting limits...")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -318,7 +303,8 @@ class FlomniInitStagesMixin:
|
||||
dev.ftrackz.limits = [4.5, 5.5]
|
||||
|
||||
def _align_setup(self):
|
||||
if self.OMNYTools.yesno("Start moving stages to default initial positions?", "y"):
|
||||
user_input = input("Start moving stages to default initial positions? [y/n]")
|
||||
if user_input == "y":
|
||||
print("Start moving stages...")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -411,8 +397,7 @@ class FlomniSampleTransferMixin:
|
||||
raise FlomniError("Ftray is not at the 'IN' position. Aborting.")
|
||||
|
||||
def ftransfer_flomni_stage_in(self):
|
||||
sample_in_position = dev.flomni_samples.is_sample_slot_used(0)
|
||||
#bool(float(dev.flomni_samples.sample_placed.sample0.get()))
|
||||
sample_in_position = bool(float(dev.flomni_samples.sample_placed.sample0.get()))
|
||||
if not sample_in_position:
|
||||
raise FlomniError("There is no sample in the sample stage. Aborting.")
|
||||
self.reset_correction()
|
||||
@@ -425,8 +410,6 @@ class FlomniSampleTransferMixin:
|
||||
umv(dev.fsamx, fsamx_in)
|
||||
dev.fsamx.limits = [fsamx_in - 0.4, fsamx_in + 0.4]
|
||||
|
||||
self.flomnigui_idle()
|
||||
|
||||
def laser_tracker_show_all(self):
|
||||
dev.rtx.controller.laser_tracker_show_all()
|
||||
|
||||
@@ -466,10 +449,6 @@ class FlomniSampleTransferMixin:
|
||||
self.device_manager.devices.fsamx.controller.lights_on()
|
||||
|
||||
def ftransfer_flomni_stage_out(self):
|
||||
|
||||
self.flomnigui_show_cameras()
|
||||
self.flomnigui_raise()
|
||||
|
||||
target_pos = -162
|
||||
if np.isclose(dev.fsamx.readback.get(), target_pos, 0.01):
|
||||
return
|
||||
@@ -517,20 +496,22 @@ class FlomniSampleTransferMixin:
|
||||
self.check_tray_in()
|
||||
self.check_sensor_connected()
|
||||
|
||||
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
|
||||
sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
|
||||
if sample_in_gripper:
|
||||
raise FlomniError(
|
||||
"The gripper does carry a sample. Cannot proceed getting another sample."
|
||||
)
|
||||
|
||||
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
|
||||
sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
|
||||
sample_in_position = bool(float(sample_signal.get()))
|
||||
if not sample_in_position:
|
||||
raise FlomniError(f"The planned pick position [{position}] does not have a sample.")
|
||||
|
||||
self.flomnigui_show_cameras()
|
||||
|
||||
|
||||
if self.OMNYTools.yesno("Please confirm that there is currently no sample in the gripper. It would be dropped!", "y"):
|
||||
user_input = input(
|
||||
"Please confirm that there is currently no sample in the gripper. It would be dropped!"
|
||||
" [y/n]"
|
||||
)
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
else:
|
||||
print("Stopping.")
|
||||
@@ -574,12 +555,12 @@ class FlomniSampleTransferMixin:
|
||||
self.check_tray_in()
|
||||
self.check_sensor_connected()
|
||||
|
||||
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
|
||||
#bool(float(dev.flomni_samples.sample_in_gripper.get()))
|
||||
sample_in_gripper = bool(float(dev.flomni_samples.sample_in_gripper.get()))
|
||||
if not sample_in_gripper:
|
||||
raise FlomniError("The gripper does not carry a sample.")
|
||||
|
||||
sample_in_position = dev.flomni_samples.is_sample_slot_used(position)
|
||||
sample_signal = getattr(dev.flomni_samples.sample_placed, f"sample{position}")
|
||||
sample_in_position = bool(float(sample_signal.get()))
|
||||
if sample_in_position:
|
||||
raise FlomniError(f"The planned put position [{position}] already has a sample.")
|
||||
|
||||
@@ -612,9 +593,8 @@ class FlomniSampleTransferMixin:
|
||||
self.flomni_modify_storage_non_interactive(100, 0, "-")
|
||||
self.flomni_modify_storage_non_interactive(position, 1, sample_name)
|
||||
|
||||
if position == 0:
|
||||
self.ftransfer_flomni_stage_in()
|
||||
bec.queue.next_dataset_number += 1
|
||||
# TODO: flomni_stage_in if position == 0
|
||||
# bec.queue.next_dataset_number += 1
|
||||
|
||||
def sample_get_name(self, position: int = 0) -> str:
|
||||
"""
|
||||
@@ -625,51 +605,36 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
def ftransfer_sample_change(self, new_sample_position: int):
|
||||
self.check_tray_in()
|
||||
# sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
|
||||
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
|
||||
sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
|
||||
if sample_in_gripper:
|
||||
raise FlomniError("There is already a sample in the gripper. Aborting.")
|
||||
|
||||
self.check_position_is_valid(new_sample_position)
|
||||
|
||||
if new_sample_position == 0:
|
||||
raise FlomniError("The new sample to place cannot be the sample in the sample stage. Aborting.")
|
||||
|
||||
# sample_placed = getattr(
|
||||
# dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
|
||||
# ).get()
|
||||
sample_placed = dev.flomni_samples.is_sample_slot_used(new_sample_position)
|
||||
sample_placed = getattr(
|
||||
dev.flomni_samples.sample_placed, f"sample{new_sample_position}"
|
||||
).get()
|
||||
if not sample_placed:
|
||||
raise FlomniError(
|
||||
f"There is currently no sample in position [{new_sample_position}]. Aborting."
|
||||
)
|
||||
|
||||
# sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
|
||||
sample_in_sample_stage = dev.flomni_samples.is_sample_slot_used(0)
|
||||
sample_in_sample_stage = dev.flomni_samples.sample_placed.sample0.get()
|
||||
if sample_in_sample_stage:
|
||||
# find a new home for the sample...
|
||||
empty_slots = []
|
||||
# for name, val in dev.flomni_samples.read().items():
|
||||
# if "flomni_samples_sample_placed_sample" not in name:
|
||||
# continue
|
||||
# if val.get("value") == 0:
|
||||
# empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
|
||||
for j in range(1,20):
|
||||
if not dev.flomni_samples.is_sample_slot_used(j):
|
||||
empty_slots.append(j)
|
||||
for name, val in dev.flomni_samples.read().items():
|
||||
if "flomni_samples_sample_placed_sample" not in name:
|
||||
continue
|
||||
if val.get("value") == 0:
|
||||
empty_slots.append(int(name.split("flomni_samples_sample_placed_sample")[1]))
|
||||
if not empty_slots:
|
||||
raise FlomniError("There are no empty slots available. Aborting.")
|
||||
|
||||
print(f"The following slots are empty: {empty_slots}.")
|
||||
|
||||
while True:
|
||||
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}] ")
|
||||
|
||||
if user_input.strip() == "":
|
||||
# No entry: use default
|
||||
user_input = empty_slots[0]
|
||||
break
|
||||
|
||||
user_input = input(f"Where shall I put the sample? Default: [{empty_slots[0]}]")
|
||||
try:
|
||||
user_input = int(user_input)
|
||||
if user_input not in empty_slots:
|
||||
@@ -735,20 +700,20 @@ class FlomniSampleTransferMixin:
|
||||
if confirm != -1:
|
||||
return
|
||||
|
||||
if self.OMNYTools.yesno("All OK? Continue?", "y"):
|
||||
user_input = input("All OK? Continue? [y/n]")
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
dev.ftransy.controller.socket_put_confirmed("confirm=1")
|
||||
else:
|
||||
print("Stopping.")
|
||||
exit
|
||||
return
|
||||
|
||||
def ftransfer_gripper_is_open(self) -> bool:
|
||||
status = bool(float(dev.ftransy.controller.socket_put_and_receive("MG @OUT[9]").strip()))
|
||||
return status
|
||||
|
||||
def ftransfer_gripper_open(self):
|
||||
sample_in_gripper = dev.flomni_samples.is_sample_in_gripper()
|
||||
#dev.flomni_samples.sample_in_gripper.get()
|
||||
sample_in_gripper = dev.flomni_samples.sample_in_gripper.get()
|
||||
if sample_in_gripper:
|
||||
raise FlomniError(
|
||||
"Cannot open gripper. There is still a sample in the gripper! Aborting."
|
||||
@@ -768,8 +733,11 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
fsamx_pos = dev.fsamx.readback.get()
|
||||
if position == 0 and fsamx_pos > -160:
|
||||
|
||||
if self.OMNYTools.yesno("May the flomni stage be moved out for the sample change? Feedback will be disabled and alignment will be lost!", "y"):
|
||||
user_input = input(
|
||||
"May the flomni stage be moved out for the sample change? Feedback will be disabled"
|
||||
" and alignment will be lost! [y/n]"
|
||||
)
|
||||
if user_input == "y":
|
||||
print("good then")
|
||||
self.ftransfer_flomni_stage_out()
|
||||
else:
|
||||
@@ -924,20 +892,7 @@ class FlomniSampleTransferMixin:
|
||||
|
||||
|
||||
class FlomniAlignmentMixin:
|
||||
import csaxs_bec
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure this is a Path object, not a string
|
||||
csaxs_bec_basepath = Path(csaxs_bec.__file__)
|
||||
|
||||
default_correction_file_rel = "correction_flomni_20210300_360deg.txt"
|
||||
|
||||
# Build the absolute path correctly
|
||||
default_correction_file = (
|
||||
csaxs_bec_basepath.parent / 'bec_ipython_client' / 'plugins' / 'flomni' / default_correction_file_rel
|
||||
).resolve()
|
||||
|
||||
default_correction_file = "correction_flomni_20210300_360deg.txt"
|
||||
|
||||
def reset_correction(self, use_default_correction=True):
|
||||
"""
|
||||
@@ -1151,7 +1106,6 @@ class Flomni(
|
||||
FlomniAlignmentMixin,
|
||||
FlomniOpticsMixin,
|
||||
cSAXSBeamlineChecks,
|
||||
flomniGuiTools
|
||||
):
|
||||
def __init__(self, client):
|
||||
super().__init__()
|
||||
@@ -1174,23 +1128,16 @@ class Flomni(
|
||||
self.corr_pos_y_2 = []
|
||||
self.corr_angle_y_2 = []
|
||||
self.progress = {}
|
||||
self.progress["subtomo"] = 0
|
||||
self.progress["subtomo_projection"] = 0
|
||||
self.progress["subtomo_total_projections"] = 1
|
||||
self.progress["projection"] = 0
|
||||
self.progress["total_projections"] = 1
|
||||
self.progress["angle"] = 0
|
||||
self.progress["tomo_type"] = 0
|
||||
self.OMNYTools = OMNYTools(self.client)
|
||||
self.align = XrayEyeAlign(self.client, self)
|
||||
self.set_client(client)
|
||||
|
||||
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
|
||||
|
||||
if self.OMNYTools.yesno("Starting Xrayeye alignment. Deleting any potential existing alignment for this sample.", "y"):
|
||||
def start_x_ray_eye_alignment(self):
|
||||
user_input = input(
|
||||
"Starting Xrayeye alignment. Deleting any potential existing alignment for this sample. [Y/n]"
|
||||
)
|
||||
if user_input == "y" or user_input == "":
|
||||
self.align = XrayEyeAlign(self.client, self)
|
||||
try:
|
||||
self.align.align(keep_shutter_open)
|
||||
self.align.align()
|
||||
except KeyboardInterrupt as exc:
|
||||
fsamx_in = self._get_user_param_safe(dev.fsamx, "in")
|
||||
if np.isclose(fsamx_in, dev.fsamx.readback.get(), 0.5):
|
||||
@@ -1199,11 +1146,11 @@ class Flomni(
|
||||
umv(dev.fsamx, fsamx_in)
|
||||
raise exc
|
||||
|
||||
def xrayeye_update_frame(self,keep_shutter_open=False):
|
||||
self.align.update_frame(keep_shutter_open)
|
||||
def xrayeye_update_frame(self):
|
||||
self.align.update_frame()
|
||||
|
||||
def xrayeye_alignment_start(self, keep_shutter_open=False):
|
||||
self.start_x_ray_eye_alignment(keep_shutter_open)
|
||||
def xrayeye_alignment_start(self):
|
||||
self.start_x_ray_eye_alignment()
|
||||
|
||||
def drive_axis_to_limit(self, device, direction):
|
||||
axis_id = device._config["deviceConfig"].get("axis_Id")
|
||||
@@ -1623,9 +1570,6 @@ class Flomni(
|
||||
|
||||
def tomo_scan(self, subtomo_start=1, start_angle=None, projection_number=None):
|
||||
"""start a tomo scan"""
|
||||
|
||||
self.flomnigui_show_progress()
|
||||
|
||||
bec = builtins.__dict__.get("bec")
|
||||
scans = builtins.__dict__.get("scans")
|
||||
self._current_special_angles = self.special_angles.copy()
|
||||
@@ -1762,7 +1706,6 @@ class Flomni(
|
||||
print(f"Angle: ........................... {self.progress['angle']}")
|
||||
print(f"Current subtomo: ................. {self.progress['subtomo']}")
|
||||
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
|
||||
self._flomnigui_update_progress()
|
||||
|
||||
def add_sample_database(
|
||||
self, samplename, date, eaccount, scan_number, setup, sample_additional_info, user
|
||||
@@ -1852,7 +1795,7 @@ class Flomni(
|
||||
|
||||
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
|
||||
tomo_scan_numbers_file = os.path.expanduser(
|
||||
"~/tomography_scannumbers.txt"
|
||||
"~/Data10/specES1/dat-files/tomography_scannumbers.txt"
|
||||
)
|
||||
with open(tomo_scan_numbers_file, "a+") as out_file:
|
||||
# pylint: disable=undefined-variable
|
||||
@@ -1951,8 +1894,8 @@ class Flomni(
|
||||
)
|
||||
print(f"\nSample name: {self.sample_name}\n")
|
||||
|
||||
|
||||
if self.OMNYTools.yesno("Are these parameters correctly set for your scan?", "y"):
|
||||
user_input = input("Are these parameters correctly set for your scan? [Y/n]")
|
||||
if user_input == "y" or user_input == "":
|
||||
print("... excellent!")
|
||||
else:
|
||||
self.tomo_countingtime = self._get_val("<ctime> s", self.tomo_countingtime, float)
|
||||
|
||||
@@ -33,7 +33,7 @@ class FlomniOpticsMixin:
|
||||
feyex_in = self._get_user_param_safe("feyex", "in")
|
||||
feyey_in = self._get_user_param_safe("feyey", "in")
|
||||
umv(dev.feyex, feyex_in, dev.feyey, feyey_in)
|
||||
#self.align.update_frame()
|
||||
self.align.update_frame()
|
||||
|
||||
def _ffzp_in(self):
|
||||
foptx_in = self._get_user_param_safe("foptx", "in")
|
||||
|
||||
@@ -1,173 +0,0 @@
|
||||
import builtins
|
||||
|
||||
from bec_widgets.cli.client import BECDockArea
|
||||
|
||||
# from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
|
||||
|
||||
if builtins.__dict__.get("bec") is not None:
|
||||
bec = builtins.__dict__.get("bec")
|
||||
dev = builtins.__dict__.get("dev")
|
||||
umv = builtins.__dict__.get("umv")
|
||||
umvr = builtins.__dict__.get("umvr")
|
||||
|
||||
|
||||
class flomniGuiToolsError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class flomniGuiTools:
|
||||
|
||||
def __init__(self):
|
||||
self.text_box = None
|
||||
self.progressbar = None
|
||||
|
||||
def set_client(self, client):
|
||||
self.client = client
|
||||
self.gui = self.client.gui
|
||||
|
||||
def flomnigui_show_gui(self):
|
||||
if "flomni" in self.gui.windows:
|
||||
self.gui.flomni.show()
|
||||
else:
|
||||
self.gui.new("flomni")
|
||||
|
||||
def flomnigui_stop_gui(self):
|
||||
self.gui.flomni.hide()
|
||||
|
||||
def flomnigui_raise(self):
|
||||
self.gui.flomni.raise_window()
|
||||
|
||||
# def flomnigui_show_xeyealign(self):
|
||||
# self.flomnigui_show_gui()
|
||||
# if self.xeyegui is None:
|
||||
# self.flomnigui_remove_all_docks()
|
||||
# self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
|
||||
# # start live
|
||||
# if not dev.cam_xeye.live_mode:
|
||||
# dev.cam_xeye.live_mode = True
|
||||
|
||||
def flomnigui_show_xeyealign(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("xeyegui"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
self.xeyegui = self.gui.flomni.new("xeyegui").new("XRayEye")
|
||||
# start live
|
||||
if not dev.cam_xeye.live_mode:
|
||||
dev.cam_xeye.live_mode = True
|
||||
|
||||
|
||||
def _flomnigui_check_attribute_not_exists(self, attribute_name):
|
||||
if hasattr(self.gui,"flomni"):
|
||||
if hasattr(self.gui.flomni,attribute_name):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def flomnigui_show_cameras(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("camera_gripper") or self._flomnigui_check_attribute_not_exists("camera_overview"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
camera_gripper_image = self.gui.flomni.new("camera_gripper").new("Image")
|
||||
if self._flomnicam_check_device_exists(dev.cam_flomni_gripper):
|
||||
camera_gripper_image.image(("cam_flomni_gripper", "preview"))
|
||||
camera_gripper_image.lock_aspect_ratio = True
|
||||
camera_gripper_image.enable_fps_monitor = True
|
||||
camera_gripper_image.enable_toolbar = False
|
||||
camera_gripper_image.outer_axes = False
|
||||
camera_gripper_image.inner_axes = False
|
||||
dev.cam_flomni_gripper.start_live_mode()
|
||||
else:
|
||||
print("Cannot open camera_gripper. Device does not exist.")
|
||||
camera_overview_image = self.gui.flomni.new("camera_overview").new("Image")
|
||||
if self._flomnicam_check_device_exists(dev.cam_flomni_overview):
|
||||
camera_overview_image.image(("cam_flomni_overview", "preview"))
|
||||
camera_overview_image.lock_aspect_ratio = True
|
||||
camera_overview_image.enable_fps_monitor = True
|
||||
camera_overview_image.enable_toolbar = False
|
||||
camera_overview_image.outer_axes = False
|
||||
camera_overview_image.inner_axes = False
|
||||
dev.cam_flomni_overview.start_live_mode()
|
||||
else:
|
||||
print("Cannot open camera_overview. Device does not exist.")
|
||||
|
||||
def flomnigui_remove_all_docks(self):
|
||||
dev.cam_flomni_overview.stop_live_mode()
|
||||
dev.cam_flomni_gripper.stop_live_mode()
|
||||
dev.cam_xeye.live_mode = False
|
||||
self.gui.flomni.delete_all()
|
||||
self.progressbar = None
|
||||
self.text_box = None
|
||||
|
||||
def flomnigui_idle(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("idle_text_box"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
idle_text_box = self.gui.flomni.new("idle_textbox").new("TextBox")
|
||||
text = (
|
||||
"<pre>"
|
||||
+ " ,---.,--. ,-----. ,--. ,--.,--. ,--.,--. \n"
|
||||
+ "/ .-'| |' .-. '| `.' || ,'.| || | \n"
|
||||
+ "| `-,| || | | || |'.'| || |' ' || | \n"
|
||||
+ "| .-'| |' '-' '| | | || | ` || | \n"
|
||||
+ "`--' `--' `-----' `--' `--'`--' `--'`--' \n"
|
||||
+ "</pre>"
|
||||
)
|
||||
idle_text_box.set_html_text(text)
|
||||
|
||||
def _flomnicam_check_device_exists(self, device):
|
||||
try:
|
||||
device
|
||||
except:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def flomnigui_show_progress(self):
|
||||
self.flomnigui_show_gui()
|
||||
if self._flomnigui_check_attribute_not_exists("progressbar"):
|
||||
self.flomnigui_remove_all_docks()
|
||||
# Add a new dock with a RingProgressBar widget
|
||||
self.progressbar = self.gui.flomni.new("progressbar").new("RingProgressBar")
|
||||
# Customize the size of the progress ring
|
||||
self.progressbar.set_line_widths(20)
|
||||
# Disable automatic updates and manually set the self.progressbar value
|
||||
self.progressbar.enable_auto_updates(False)
|
||||
# Set precision for the self.progressbar display
|
||||
self.progressbar.set_precision(1) # Display self.progressbar with one decimal places
|
||||
# Setting multiple rigns with different values
|
||||
self.progressbar.set_number_of_bars(3)
|
||||
self.progressbar.rings[0].set_update("manual")
|
||||
self.progressbar.rings[1].set_update("manual")
|
||||
self.progressbar.rings[2].set_update("scan")
|
||||
# Set the values of the rings to 50, 75, and 25 from outer to inner ring
|
||||
# self.progressbar.set_value([50, 75])
|
||||
# Add a new dock with a TextBox widget
|
||||
self.text_box = self.gui.flomni.new(name="progress_text").new("TextBox")
|
||||
|
||||
self._flomnigui_update_progress()
|
||||
|
||||
def _flomnigui_update_progress(self):
|
||||
if self.progressbar is not None:
|
||||
progress = self.progress["projection"] / self.progress["total_projections"] * 100
|
||||
subtomo_progress = (
|
||||
self.progress["subtomo_projection"]
|
||||
/ self.progress["subtomo_total_projections"]
|
||||
* 100
|
||||
)
|
||||
self.progressbar.set_value([progress, subtomo_progress, 0])
|
||||
if self.text_box is not None:
|
||||
text = f"Progress report:\n Tomo type: ....................... {self.progress['tomo_type']}\n Projection: ...................... {self.progress['projection']:.0f}\n Total projections expected ....... {self.progress['total_projections']}\n Angle: ........................... {self.progress['angle']}\n Current subtomo: ................. {self.progress['subtomo']}\n Current projection within subtomo: {self.progress['subtomo_projection']}\n Total projections per subtomo: ... {self.progress['subtomo_total_projections']}"
|
||||
self.text_box.set_plain_text(text)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from bec_lib.client import BECClient
|
||||
from bec_widgets.cli.client_utils import BECGuiClient
|
||||
|
||||
client = BECClient()
|
||||
client.start()
|
||||
client.gui = BECGuiClient()
|
||||
|
||||
flomni_gui = flomniGuiTools(client)
|
||||
flomni_gui.flomnigui_show_gui()
|
||||
flomni_gui.flomnigui_show_progress()
|
||||
@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen, fshclose
|
||||
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
|
||||
|
||||
logger = bec_logger.logger
|
||||
# import builtins to avoid linter errors
|
||||
@@ -22,7 +22,6 @@ if TYPE_CHECKING:
|
||||
|
||||
class XrayEyeAlign:
|
||||
# pixel calibration, multiply to get mm
|
||||
labview=False
|
||||
PIXEL_CALIBRATION = 0.1 / 113 # .2 with binning
|
||||
|
||||
def __init__(self, client, flomni: Flomni) -> None:
|
||||
@@ -41,40 +40,28 @@ class XrayEyeAlign:
|
||||
def save_frame(self):
|
||||
epics_put("XOMNYI-XEYE-SAVFRAME:0", 1)
|
||||
|
||||
def update_frame(self,keep_shutter_open=False):
|
||||
if self.labview:
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
|
||||
if not self.labview:
|
||||
self.flomni.flomnigui_show_xeyealign()
|
||||
if not dev.cam_xeye.live_mode:
|
||||
dev.cam_xeye.live_mode = True
|
||||
|
||||
def update_frame(self):
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
# start live
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 1)
|
||||
if self.labview:
|
||||
# wait for start live
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
time.sleep(0.5)
|
||||
print("waiting for live view to start...")
|
||||
|
||||
# wait for start live
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
time.sleep(0.5)
|
||||
print("waiting for live view to start...")
|
||||
fshopen()
|
||||
|
||||
if self.labview:
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
epics_put("XOMNYI-XEYE-ACQDONE:0", 0)
|
||||
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
print("waiting for new frame...")
|
||||
time.sleep(0.5)
|
||||
while epics_get("XOMNYI-XEYE-ACQDONE:0") == 0:
|
||||
print("waiting for new frame...")
|
||||
time.sleep(0.5)
|
||||
|
||||
time.sleep(0.5)
|
||||
# stop live view
|
||||
if not keep_shutter_open:
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
time.sleep(0.1)
|
||||
fshclose()
|
||||
print("got new frame")
|
||||
else:
|
||||
print("Staying in live view, shutter is and remains open!")
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
time.sleep(1)
|
||||
# fshclose
|
||||
print("got new frame")
|
||||
|
||||
def tomo_rotate(self, val: float):
|
||||
# pylint: disable=undefined-variable
|
||||
@@ -100,23 +87,12 @@ class XrayEyeAlign:
|
||||
def send_message(self, msg: str):
|
||||
epics_put("XOMNYI-XEYE-MESSAGE:0.DESC", msg)
|
||||
|
||||
def align(self,keep_shutter_open=False):
|
||||
if not keep_shutter_open:
|
||||
print("This routine can be called with paramter keep_shutter_open=True to keep the shutter always open")
|
||||
self.send_message("Getting things ready. Please wait...")
|
||||
|
||||
#potential unresolved movement requests to zero
|
||||
epics_put("XOMNYI-XEYE-MVX:0", 0)
|
||||
epics_put("XOMNYI-XEYE-MVY:0", 0)
|
||||
|
||||
def align(self):
|
||||
# reset shift xy and fov params
|
||||
self._reset_init_values()
|
||||
|
||||
self.flomni.lights_off()
|
||||
|
||||
self.flomni.flomnigui_show_xeyealign()
|
||||
self.flomni.flomnigui_raise()
|
||||
|
||||
self.tomo_rotate(0)
|
||||
epics_put("XOMNYI-XEYE-ANGLE:0", 0)
|
||||
|
||||
@@ -143,7 +119,7 @@ class XrayEyeAlign:
|
||||
umv(dev.fsamx, fsamx_in - 0.25)
|
||||
|
||||
self.flomni.ffzp_in()
|
||||
self.update_frame(keep_shutter_open)
|
||||
self.update_frame()
|
||||
|
||||
# enable submit buttons
|
||||
self.movement_buttons_enabled = False
|
||||
@@ -176,18 +152,17 @@ class XrayEyeAlign:
|
||||
self.flomni.feedback_disable()
|
||||
umv(dev.fsamx, fsamx_in - 0.25)
|
||||
|
||||
if self.labview:
|
||||
self.update_frame(keep_shutter_open)
|
||||
epics_put("XOMNYI-XEYE-RECBG:0", 1)
|
||||
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
|
||||
time.sleep(0.5)
|
||||
print("waiting for background frame...")
|
||||
self.update_frame()
|
||||
epics_put("XOMNYI-XEYE-RECBG:0", 1)
|
||||
while epics_get("XOMNYI-XEYE-RECBG:0") == 1:
|
||||
time.sleep(0.5)
|
||||
print("waiting for background frame...")
|
||||
|
||||
umv(dev.fsamx, fsamx_in)
|
||||
time.sleep(0.5)
|
||||
self.flomni.feedback_enable_with_reset()
|
||||
|
||||
self.update_frame(keep_shutter_open)
|
||||
self.update_frame()
|
||||
self.send_message("Adjust sample height and submit center")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
self.movement_buttons_enabled = True
|
||||
@@ -200,7 +175,7 @@ class XrayEyeAlign:
|
||||
umv(dev.rtx, 0)
|
||||
self.tomo_rotate(k * 45)
|
||||
epics_put("XOMNYI-XEYE-ANGLE:0", self.get_tomo_angle())
|
||||
self.update_frame(keep_shutter_open)
|
||||
self.update_frame()
|
||||
self.send_message("Submit sample center")
|
||||
epics_put("XOMNYI-XEYE-SUBMIT:0", 0)
|
||||
epics_put("XOMNYI-XEYE-ENAMVX:0", 1)
|
||||
@@ -224,7 +199,7 @@ class XrayEyeAlign:
|
||||
if k > 0:
|
||||
epics_put(f"XOMNYI-XEYE-STAGEPOSX:{k}", dev.rtx.readback.get() / 1000)
|
||||
time.sleep(3)
|
||||
self.update_frame(keep_shutter_open)
|
||||
self.update_frame()
|
||||
|
||||
if k < 2:
|
||||
# allow movements, store movements to calculate center
|
||||
@@ -235,7 +210,7 @@ class XrayEyeAlign:
|
||||
time.sleep(2)
|
||||
epics_put("XOMNYI-XEYE-MVY:0", 0)
|
||||
self.flomni.feedback_enable_with_reset()
|
||||
self.update_frame(keep_shutter_open)
|
||||
self.update_frame()
|
||||
time.sleep(0.2)
|
||||
|
||||
self.write_output()
|
||||
@@ -246,16 +221,8 @@ class XrayEyeAlign:
|
||||
|
||||
umv(dev.rtx, 0)
|
||||
|
||||
# free camera
|
||||
if self.labview:
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 2)
|
||||
if keep_shutter_open and not self.labview:
|
||||
if self.flomni.OMNYTools.yesno("Close the shutter now?","y"):
|
||||
fshclose()
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 0)
|
||||
if not self.labview:
|
||||
self.flomni.flomnigui_idle()
|
||||
|
||||
# free camera
|
||||
epics_put("XOMNYI-XEYE-ACQ:0", 2)
|
||||
|
||||
print(
|
||||
f"The largest field of view from the xrayeyealign was \nfovx = {fovx:.0f} microns, fovy"
|
||||
|
||||
@@ -61,3 +61,25 @@ bec._beamline_mixin._bl_info_register(OperatorInfo)
|
||||
# SETUP PROMPTS
|
||||
bec._ip.prompts.session_name = _session_name
|
||||
bec._ip.prompts.status = 1
|
||||
|
||||
# REGISTER BEAMLINE CHECKS
|
||||
from bec_lib.bl_conditions import (
|
||||
FastOrbitFeedbackCondition,
|
||||
LightAvailableCondition,
|
||||
ShutterCondition,
|
||||
)
|
||||
|
||||
if "sls_machine_status" in dev:
|
||||
print("Registering light available condition for SLS machine status")
|
||||
_light_available_condition = LightAvailableCondition(dev.sls_machine_status)
|
||||
bec.bl_checks.register(_light_available_condition)
|
||||
|
||||
if "x12sa_es1_shutter_status" in dev:
|
||||
print("Registering shutter condition for X12SA ES1 shutter status")
|
||||
_shutter_condition = ShutterCondition(dev.x12sa_es1_shutter_status)
|
||||
bec.bl_checks.register(_shutter_condition)
|
||||
|
||||
# if hasattr(dev, "sls_fast_orbit_feedback"):
|
||||
# print("Registering fast orbit feedback condition for SLS fast orbit feedback")
|
||||
# _fast_orbit_feedback_condition = FastOrbitFeedbackCondition(dev.sls_fast_orbit_feedback)
|
||||
# bec.bl_checks.register(_fast_orbit_feedback_condition)
|
||||
|
||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call, rpc_timeout
|
||||
from bec_widgets.cli.rpc.rpc_base import RPCBase, rpc_call
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
@@ -14,7 +14,6 @@ logger = bec_logger.logger
|
||||
|
||||
_Widgets = {
|
||||
"OmnyAlignment": "OmnyAlignment",
|
||||
"XRayEye": "XRayEye",
|
||||
}
|
||||
|
||||
|
||||
@@ -74,75 +73,3 @@ class OmnyAlignment(RPCBase):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
|
||||
class XRayEye(RPCBase):
|
||||
@rpc_call
|
||||
def active_roi(self) -> "BaseROI | None":
|
||||
"""
|
||||
Return the currently active ROI, or None if no ROI is active.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def enable_live_view(self):
|
||||
"""
|
||||
Get or set the live view enabled state.
|
||||
"""
|
||||
|
||||
@enable_live_view.setter
|
||||
@rpc_call
|
||||
def enable_live_view(self):
|
||||
"""
|
||||
Get or set the live view enabled state.
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def user_message(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@user_message.setter
|
||||
@rpc_call
|
||||
def user_message(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def sample_name(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@sample_name.setter
|
||||
@rpc_call
|
||||
def sample_name(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@property
|
||||
@rpc_call
|
||||
def enable_move_buttons(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
@enable_move_buttons.setter
|
||||
@rpc_call
|
||||
def enable_move_buttons(self):
|
||||
"""
|
||||
None
|
||||
"""
|
||||
|
||||
|
||||
class XRayEye2DControl(RPCBase):
|
||||
@rpc_call
|
||||
def remove(self):
|
||||
"""
|
||||
Cleanup the BECConnector
|
||||
"""
|
||||
|
||||
@@ -63,7 +63,7 @@ class OmnyAlignment(BECWidget, QWidget):
|
||||
|
||||
self.ui.liveViewSwitch.enabled.connect(self.on_live_view_enabled)
|
||||
|
||||
# self.ui.moveUpButton.clicked.connect(self.on_move_up)
|
||||
self.ui.moveUpButton.clicked.connect(self.on_move_up)
|
||||
|
||||
|
||||
@property
|
||||
@@ -98,10 +98,10 @@ class OmnyAlignment(BECWidget, QWidget):
|
||||
logger.info(f"Live view is enabled: {enabled}")
|
||||
image: Image = self.ui.image
|
||||
if enabled:
|
||||
image.image("cam_xeye")
|
||||
image.image("cam200")
|
||||
return
|
||||
|
||||
image.disconnect_monitor("cam_xeye")
|
||||
image.disconnect_monitor("cam200")
|
||||
|
||||
|
||||
@property
|
||||
|
||||
@@ -86,7 +86,7 @@
|
||||
<bool>false</bool>
|
||||
</property>
|
||||
<property name="monitor" stdset="0">
|
||||
<string>cam_xeye</string>
|
||||
<string>cam200</string>
|
||||
</property>
|
||||
<property name="rotation" stdset="0">
|
||||
<number>3</number>
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
def main(): # pragma: no cover
|
||||
from qtpy import PYSIDE6
|
||||
|
||||
if not PYSIDE6:
|
||||
print("PYSIDE6 is not available in the environment. Cannot patch designer.")
|
||||
return
|
||||
from PySide6.QtDesigner import QPyDesignerCustomWidgetCollection
|
||||
|
||||
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye_plugin import XRayEyePlugin
|
||||
|
||||
QPyDesignerCustomWidgetCollection.addCustomWidget(XRayEyePlugin())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
@@ -1,426 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from bec_lib import bec_logger
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_qthemes import material_icon
|
||||
from bec_widgets import BECWidget, SafeProperty, SafeSlot
|
||||
from bec_widgets.widgets.plots.image.image import Image
|
||||
from bec_widgets.widgets.plots.image.setting_widgets.image_roi_tree import ROIPropertyTree
|
||||
from bec_widgets.widgets.plots.roi.image_roi import BaseROI, CircularROI, RectangularROI
|
||||
from bec_widgets.widgets.utility.toggle.toggle import ToggleSwitch
|
||||
from qtpy.QtCore import Qt, QTimer
|
||||
from qtpy.QtWidgets import (
|
||||
QFrame,
|
||||
QGridLayout,
|
||||
QHBoxLayout,
|
||||
QLabel,
|
||||
QLineEdit,
|
||||
QPushButton,
|
||||
QSizePolicy,
|
||||
QSpinBox,
|
||||
QToolButton,
|
||||
QVBoxLayout,
|
||||
QWidget,
|
||||
)
|
||||
|
||||
logger = bec_logger.logger
|
||||
CAMERA = ("cam_xeye", "image")
|
||||
|
||||
|
||||
class XRayEye2DControl(BECWidget, QWidget):
|
||||
def __init__(self, parent=None, step_size: int = 100, *arg, **kwargs):
|
||||
super().__init__(parent=parent, *arg, **kwargs)
|
||||
self.get_bec_shortcuts()
|
||||
self._step_size = step_size
|
||||
self.root_layout = QGridLayout(self)
|
||||
self.setStyleSheet("""
|
||||
QToolButton {
|
||||
border: 1px solid;
|
||||
border-radius: 4px;
|
||||
}
|
||||
""")
|
||||
# Up
|
||||
self.move_up_button = QToolButton(parent=self)
|
||||
self.move_up_button.setIcon(material_icon('keyboard_double_arrow_up'))
|
||||
self.root_layout.addWidget(self.move_up_button, 0, 2)
|
||||
# Up tweak button
|
||||
self.move_up_tweak_button = QToolButton(parent=self)
|
||||
self.move_up_tweak_button.setIcon(material_icon('keyboard_arrow_up'))
|
||||
self.root_layout.addWidget(self.move_up_tweak_button, 1, 2)
|
||||
|
||||
# Left
|
||||
self.move_left_button = QToolButton(parent=self)
|
||||
self.move_left_button.setIcon(material_icon('keyboard_double_arrow_left'))
|
||||
self.root_layout.addWidget(self.move_left_button, 2, 0)
|
||||
# Left tweak button
|
||||
self.move_left_tweak_button = QToolButton(parent=self)
|
||||
self.move_left_tweak_button.setIcon(material_icon('keyboard_arrow_left'))
|
||||
self.root_layout.addWidget(self.move_left_tweak_button, 2, 1)
|
||||
|
||||
# Right
|
||||
self.move_right_button = QToolButton(parent=self)
|
||||
self.move_right_button.setIcon(material_icon('keyboard_double_arrow_right'))
|
||||
self.root_layout.addWidget(self.move_right_button, 2, 4)
|
||||
# Right tweak button
|
||||
self.move_right_tweak_button = QToolButton(parent=self)
|
||||
self.move_right_tweak_button.setIcon(material_icon('keyboard_arrow_right'))
|
||||
self.root_layout.addWidget(self.move_right_tweak_button, 2, 3)
|
||||
|
||||
# Down
|
||||
self.move_down_button = QToolButton(parent=self)
|
||||
self.move_down_button.setIcon(material_icon('keyboard_double_arrow_down'))
|
||||
self.root_layout.addWidget(self.move_down_button, 4, 2)
|
||||
# Down tweak button
|
||||
self.move_down_tweak_button = QToolButton(parent=self)
|
||||
self.move_down_tweak_button.setIcon(material_icon('keyboard_arrow_down'))
|
||||
self.root_layout.addWidget(self.move_down_tweak_button, 3, 2)
|
||||
|
||||
# Connections
|
||||
self.move_up_button.clicked.connect(lambda: self.move("up", tweak=False))
|
||||
self.move_up_tweak_button.clicked.connect(lambda: self.move("up", tweak=True))
|
||||
self.move_down_button.clicked.connect(lambda: self.move("down", tweak=False))
|
||||
self.move_down_tweak_button.clicked.connect(lambda: self.move("down", tweak=True))
|
||||
self.move_left_button.clicked.connect(lambda: self.move("left", tweak=False))
|
||||
self.move_left_tweak_button.clicked.connect(lambda: self.move("left", tweak=True))
|
||||
self.move_right_button.clicked.connect(lambda: self.move("right", tweak=False))
|
||||
self.move_right_tweak_button.clicked.connect(lambda: self.move("right", tweak=True))
|
||||
|
||||
@SafeProperty(int)
|
||||
def step_size(self) -> int:
|
||||
return self._step_size
|
||||
|
||||
@step_size.setter
|
||||
def step_size(self, step_size: int):
|
||||
self._step_size = step_size
|
||||
|
||||
@SafeSlot(bool)
|
||||
def enable_controls_hor(self, enable: bool):
|
||||
self.move_left_button.setEnabled(enable)
|
||||
self.move_left_tweak_button.setEnabled(enable)
|
||||
self.move_right_button.setEnabled(enable)
|
||||
self.move_right_tweak_button.setEnabled(enable)
|
||||
|
||||
@SafeSlot(bool)
|
||||
def enable_controls_ver(self, enable: bool):
|
||||
self.move_up_button.setEnabled(enable)
|
||||
self.move_up_tweak_button.setEnabled(enable)
|
||||
self.move_down_button.setEnabled(enable)
|
||||
self.move_down_tweak_button.setEnabled(enable)
|
||||
|
||||
def move(self, direction: str, tweak: bool = False):
|
||||
step = self._step_size
|
||||
if tweak:
|
||||
step = int(self._step_size / 5)
|
||||
if direction == "up":
|
||||
self.dev.omny_xray_gui.mvy.set(step)
|
||||
elif direction == "down":
|
||||
self.dev.omny_xray_gui.mvy.set(-step)
|
||||
elif direction == "left":
|
||||
self.dev.omny_xray_gui.mvx.set(-step)
|
||||
elif direction == "right":
|
||||
self.dev.omny_xray_gui.mvx.set(step)
|
||||
else:
|
||||
logger.warning(f"Unknown direction {direction} for move command.")
|
||||
|
||||
|
||||
class XRayEye(BECWidget, QWidget):
|
||||
USER_ACCESS = ["active_roi", "enable_live_view", "enable_live_view.setter", "user_message", "user_message.setter",
|
||||
"sample_name", "sample_name.setter", "enable_move_buttons", "enable_move_buttons.setter"]
|
||||
PLUGIN = True
|
||||
|
||||
def __init__(self, parent=None, **kwargs):
|
||||
super().__init__(parent=parent, **kwargs)
|
||||
self.get_bec_shortcuts()
|
||||
|
||||
self._init_ui()
|
||||
self._make_connections()
|
||||
|
||||
# Connection to redis endpoints
|
||||
self.bec_dispatcher.connect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
|
||||
self.connect_motors()
|
||||
self.resize(800, 600)
|
||||
QTimer.singleShot(0, self._init_gui_trigger)
|
||||
|
||||
def _init_ui(self):
|
||||
self.core_layout = QHBoxLayout(self)
|
||||
|
||||
self.image = Image(parent=self)
|
||||
self.image.enable_toolbar = False # Disable default toolbar to not allow to user set anything
|
||||
self.image.inner_axes = False # Disable inner axes to maximize image area
|
||||
self.image.plot_item.vb.invertY(True) # #TODO Invert y axis to match logic of LabView GUI
|
||||
|
||||
# Control panel on the right: vertical layout inside a fixed-width widget
|
||||
self.control_panel = QWidget(parent=self)
|
||||
self.control_panel_layout = QVBoxLayout(self.control_panel)
|
||||
self.control_panel_layout.setContentsMargins(0, 0, 0, 0)
|
||||
self.control_panel_layout.setSpacing(10)
|
||||
|
||||
# ROI toolbar + Live toggle (header row)
|
||||
self.roi_manager = ROIPropertyTree(parent=self, image_widget=self.image, compact=True,
|
||||
compact_orientation="horizontal")
|
||||
header_row = QHBoxLayout()
|
||||
header_row.setContentsMargins(0, 0, 0, 0)
|
||||
header_row.setSpacing(8)
|
||||
header_row.addWidget(self.roi_manager, 0)
|
||||
header_row.addStretch()
|
||||
self.live_preview_label = QLabel("Live Preview", parent=self)
|
||||
self.live_preview_toggle = ToggleSwitch(parent=self)
|
||||
self.live_preview_toggle.checked = False
|
||||
header_row.addWidget(self.live_preview_label, 0, Qt.AlignVCenter)
|
||||
header_row.addWidget(self.live_preview_toggle, 0, Qt.AlignVCenter)
|
||||
self.control_panel_layout.addLayout(header_row)
|
||||
|
||||
# separator
|
||||
self.control_panel_layout.addWidget(self._create_separator())
|
||||
|
||||
# 2D Positioner (fixed size)
|
||||
self.motor_control_2d = XRayEye2DControl(parent=self)
|
||||
self.control_panel_layout.addWidget(self.motor_control_2d, 0, Qt.AlignTop | Qt.AlignCenter)
|
||||
|
||||
# separator
|
||||
self.control_panel_layout.addWidget(self._create_separator())
|
||||
|
||||
# Step size label
|
||||
step_size_form = QGridLayout()
|
||||
# General Step size
|
||||
self.step_size = QSpinBox(parent=self)
|
||||
self.step_size.setRange(10, 100)
|
||||
self.step_size.setSingleStep(10)
|
||||
self.step_size.setValue(100)
|
||||
# Submit button
|
||||
self.submit_button = QPushButton("Submit", parent=self)
|
||||
# Add to layout form
|
||||
step_size_form.addWidget(QLabel("Horizontal", parent=self), 0, 0)
|
||||
step_size_form.addWidget(self.step_size, 0, 1)
|
||||
step_size_form.addWidget(QLabel("Vertical", parent=self), 1, 0)
|
||||
step_size_form.addWidget(self.submit_button, 2, 0, 1, 2)
|
||||
|
||||
# Add form to control panel
|
||||
self.control_panel_layout.addLayout(step_size_form)
|
||||
|
||||
# Push form to bottom
|
||||
self.control_panel_layout.addStretch()
|
||||
|
||||
# Sample/Message form (bottom)
|
||||
form = QGridLayout()
|
||||
self.sample_name_line_edit = QLineEdit(parent=self)
|
||||
self.sample_name_line_edit.setReadOnly(True)
|
||||
form.addWidget(QLabel("Sample", parent=self), 0, 0)
|
||||
form.addWidget(self.sample_name_line_edit, 0, 1)
|
||||
self.message_line_edit = QLineEdit(parent=self)
|
||||
self.message_line_edit.setReadOnly(True)
|
||||
form.addWidget(QLabel("Message", parent=self), 1, 0)
|
||||
form.addWidget(self.message_line_edit, 1, 1)
|
||||
self.control_panel_layout.addLayout(form)
|
||||
|
||||
# Fix panel width and allow vertical expansion
|
||||
self.control_panel.adjustSize()
|
||||
p_hint = self.control_panel.sizeHint()
|
||||
self.control_panel.setFixedWidth(p_hint.width())
|
||||
self.control_panel.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)
|
||||
|
||||
# Core Layout: image (expanding) | control panel (fixed)
|
||||
self.core_layout.addWidget(self.image)
|
||||
self.core_layout.addWidget(self.control_panel)
|
||||
|
||||
def _make_connections(self):
|
||||
# Fetch initial state
|
||||
self.on_live_view_enabled(True)
|
||||
self.step_size.setValue(self.motor_control_2d.step_size)
|
||||
|
||||
# Make connections
|
||||
self.live_preview_toggle.enabled.connect(self.on_live_view_enabled)
|
||||
self.step_size.valueChanged.connect(lambda x: self.motor_control_2d.setProperty("step_size", x))
|
||||
self.submit_button.clicked.connect(self.submit)
|
||||
|
||||
def _create_separator(self):
|
||||
sep = QFrame(parent=self)
|
||||
sep.setFrameShape(QFrame.HLine)
|
||||
sep.setFrameShadow(QFrame.Sunken)
|
||||
sep.setLineWidth(1)
|
||||
return sep
|
||||
|
||||
def _init_gui_trigger(self):
|
||||
self.dev.omny_xray_gui.read()
|
||||
|
||||
################################################################################
|
||||
# Device Connection logic
|
||||
################################################################################
|
||||
|
||||
def connect_motors(self):
|
||||
""" Checks one of the possible motors for flomni, omny and lamni setup."""
|
||||
possible_motors = ['osamroy', 'lsamrot', 'fsamroy']
|
||||
|
||||
for motor in possible_motors:
|
||||
if motor in self.dev:
|
||||
self.bec_dispatcher.connect_slot(self.on_tomo_angle_readback, MessageEndpoints.device_readback(motor))
|
||||
logger.info(f"Succesfully connected to {motor}")
|
||||
|
||||
################################################################################
|
||||
# Properties ported from the original OmnyAlignment, can be adjusted as needed
|
||||
################################################################################
|
||||
@SafeProperty(str)
|
||||
def user_message(self):
|
||||
return self.message_line_edit.text()
|
||||
|
||||
@user_message.setter
|
||||
def user_message(self, message: str):
|
||||
self.message_line_edit.setText(message)
|
||||
|
||||
@SafeProperty(str)
|
||||
def sample_name(self):
|
||||
return self.sample_name_line_edit.text()
|
||||
|
||||
@sample_name.setter
|
||||
def sample_name(self, message: str):
|
||||
self.sample_name_line_edit.setText(message)
|
||||
|
||||
@SafeProperty(bool)
|
||||
def enable_move_buttons(self):
|
||||
return self.motor_control_2d.isEnabled()
|
||||
|
||||
@enable_move_buttons.setter
|
||||
def enable_move_buttons(self, enabled: bool):
|
||||
self.motor_control_2d.setEnabled(enabled)
|
||||
|
||||
def active_roi(self) -> BaseROI | None:
|
||||
"""Return the currently active ROI, or None if no ROI is active."""
|
||||
return self.roi_manager.single_active_roi
|
||||
|
||||
################################################################################
|
||||
# Slots ported from the original OmnyAlignment, can be adjusted as needed
|
||||
################################################################################
|
||||
|
||||
@SafeSlot()
|
||||
def get_roi_coordinates(self) -> dict | None:
|
||||
"""Get the coordinates of the currently active ROI."""
|
||||
roi = self.roi_manager.single_active_roi
|
||||
if roi is None:
|
||||
logger.warning("No active ROI")
|
||||
return None
|
||||
logger.info(f"Active ROI coordinates: {roi.get_coordinates()}")
|
||||
return roi.get_coordinates()
|
||||
|
||||
@SafeSlot(bool)
|
||||
def on_live_view_enabled(self, enabled: bool):
|
||||
logger.info(f"Live view is enabled: {enabled}")
|
||||
self.live_preview_toggle.blockSignals(True)
|
||||
if enabled:
|
||||
self.live_preview_toggle.checked = enabled
|
||||
self.image.image(CAMERA)
|
||||
self.live_preview_toggle.blockSignals(False)
|
||||
return
|
||||
|
||||
self.image.disconnect_monitor(CAMERA)
|
||||
self.live_preview_toggle.checked = enabled
|
||||
self.live_preview_toggle.blockSignals(False)
|
||||
|
||||
@SafeSlot(bool, bool)
|
||||
def on_motors_enable(self, x_enable: bool, y_enable: bool):
|
||||
"""
|
||||
Enable/Disable motor controls
|
||||
|
||||
Args:
|
||||
x_enable(bool): enable x motor controls
|
||||
y_enable(bool): enable y motor controls
|
||||
"""
|
||||
self.motor_control_2d.enable_controls_hor(x_enable)
|
||||
self.motor_control_2d.enable_controls_ver(y_enable)
|
||||
|
||||
@SafeSlot(int)
|
||||
def enable_submit_button(self, enable: int):
|
||||
"""
|
||||
Enable/disable submit button.
|
||||
Args:
|
||||
enable(int): -1 disable else enable
|
||||
"""
|
||||
if enable == -1:
|
||||
self.submit_button.setEnabled(False)
|
||||
else:
|
||||
self.submit_button.setEnabled(True)
|
||||
|
||||
@SafeSlot(bool, bool)
|
||||
def on_tomo_angle_readback(self, data: dict, meta: dict):
|
||||
#TODO implement if needed
|
||||
print(f"data: {data}")
|
||||
print(f"meta: {meta}")
|
||||
|
||||
@SafeSlot(dict, dict)
|
||||
def device_updates(self, data: dict, meta: dict):
|
||||
"""
|
||||
Slot to handle device updates from omny_xray_gui device.
|
||||
|
||||
Args:
|
||||
data(dict): data from device
|
||||
meta(dict): metadata from device
|
||||
"""
|
||||
|
||||
signals = data.get('signals')
|
||||
enable_live_preview = signals.get("omny_xray_gui_update_frame_acq").get('value')
|
||||
enable_x_motor = signals.get("omny_xray_gui_enable_mv_x").get('value')
|
||||
enable_y_motor = signals.get("omny_xray_gui_enable_mv_y").get('value')
|
||||
self.on_live_view_enabled(bool(enable_live_preview))
|
||||
self.on_motors_enable(bool(enable_x_motor), bool(enable_y_motor))
|
||||
|
||||
# Signals from epics gui device
|
||||
# send message
|
||||
user_message = signals.get("omny_xray_gui_send_message").get('value')
|
||||
self.user_message = user_message
|
||||
# sample name
|
||||
sample_message = signals.get("omny_xray_gui_sample_name").get('value')
|
||||
self.sample_name = sample_message
|
||||
# enable frame acquisition
|
||||
update_frame_acq = signals.get("omny_xray_gui_update_frame_acq").get('value')
|
||||
self.on_live_view_enabled(bool(update_frame_acq))
|
||||
# enable submit button
|
||||
enable_submit_button = signals.get("omny_xray_gui_submit").get('value')
|
||||
self.enable_submit_button(enable_submit_button)
|
||||
|
||||
@SafeSlot()
|
||||
def submit(self):
|
||||
"""Execute submit action by submit button."""
|
||||
if self.roi_manager.single_active_roi is None:
|
||||
logger.warning("No active ROI")
|
||||
return
|
||||
roi_coordinates = self.roi_manager.single_active_roi.get_coordinates()
|
||||
roi_center_x = roi_coordinates['center_x']
|
||||
roi_center_y = roi_coordinates['center_y']
|
||||
# Case of rectangular ROI
|
||||
if isinstance(self.roi_manager.single_active_roi, RectangularROI):
|
||||
roi_width = roi_coordinates['width']
|
||||
roi_height = roi_coordinates['height']
|
||||
elif isinstance(self.roi_manager.single_active_roi, CircularROI):
|
||||
roi_width = roi_coordinates['diameter']
|
||||
roi_height = roi_coordinates['radius']
|
||||
else:
|
||||
logger.warning("Unsupported ROI type for submit action.")
|
||||
return
|
||||
|
||||
print(f"current roi: x:{roi_center_x}, y:{roi_center_y}, w:{roi_width},h:{roi_height}") #TODO remove when will be not needed for debugging
|
||||
# submit roi coordinates
|
||||
step = int(self.dev.omny_xray_gui.step.read().get("omny_xray_gui_step").get('value'))
|
||||
|
||||
xval_x = getattr(self.dev.omny_xray_gui.xval_x, f"xval_x_{step}").set(roi_center_x)
|
||||
xval_y = getattr(self.dev.omny_xray_gui.yval_y, f"yval_y_{step}").set(roi_center_y)
|
||||
width_x = getattr(self.dev.omny_xray_gui.width_x, f"width_x_{step}").set(roi_width)
|
||||
width_y = getattr(self.dev.omny_xray_gui.width_y, f"width_y_{step}").set(roi_height)
|
||||
self.dev.omny_xray_gui.submit.set(1)
|
||||
|
||||
def cleanup(self):
|
||||
"""Cleanup connections on widget close -> disconnect slots and stop live mode of camera."""
|
||||
self.bec_dispatcher.disconnect_slot(self.device_updates, MessageEndpoints.device_readback("omny_xray_gui"))
|
||||
getattr(self.dev,CAMERA[0]).live_mode = False
|
||||
super().cleanup()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
from qtpy.QtWidgets import QApplication
|
||||
|
||||
app = QApplication(sys.argv)
|
||||
|
||||
win = XRayEye()
|
||||
|
||||
win.resize(1000, 800)
|
||||
win.show()
|
||||
sys.exit(app.exec_())
|
||||
@@ -1 +0,0 @@
|
||||
{'files': ['x_ray_eye.py']}
|
||||
@@ -1,57 +0,0 @@
|
||||
# Copyright (C) 2022 The Qt Company Ltd.
|
||||
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
|
||||
|
||||
from bec_widgets.utils.bec_designer import designer_material_icon
|
||||
from qtpy.QtDesigner import QDesignerCustomWidgetInterface
|
||||
from qtpy.QtWidgets import QWidget
|
||||
|
||||
from csaxs_bec.bec_widgets.widgets.xray_eye.x_ray_eye import XRayEye
|
||||
|
||||
DOM_XML = """
|
||||
<ui language='c++'>
|
||||
<widget class='XRayEye' name='x_ray_eye'>
|
||||
</widget>
|
||||
</ui>
|
||||
"""
|
||||
|
||||
|
||||
class XRayEyePlugin(QDesignerCustomWidgetInterface): # pragma: no cover
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._form_editor = None
|
||||
|
||||
def createWidget(self, parent):
|
||||
if parent is None:
|
||||
return QWidget()
|
||||
t = XRayEye(parent)
|
||||
return t
|
||||
|
||||
def domXml(self):
|
||||
return DOM_XML
|
||||
|
||||
def group(self):
|
||||
return ""
|
||||
|
||||
def icon(self):
|
||||
return designer_material_icon(XRayEye.ICON_NAME)
|
||||
|
||||
def includeFile(self):
|
||||
return "x_ray_eye"
|
||||
|
||||
def initialize(self, form_editor):
|
||||
self._form_editor = form_editor
|
||||
|
||||
def isContainer(self):
|
||||
return False
|
||||
|
||||
def isInitialized(self):
|
||||
return self._form_editor is not None
|
||||
|
||||
def name(self):
|
||||
return "XRayEye"
|
||||
|
||||
def toolTip(self):
|
||||
return "XRayEye"
|
||||
|
||||
def whatsThis(self):
|
||||
return self.toolTip()
|
||||
@@ -213,8 +213,6 @@ ftransy:
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
userParameter:
|
||||
sensor_voltage: -2.4
|
||||
ftransz:
|
||||
description: Sample transer Z
|
||||
deviceClass: csaxs_bec.devices.omny.galil.fgalil_ophyd.FlomniGalilMotor
|
||||
@@ -335,8 +333,8 @@ rtx:
|
||||
readOnly: false
|
||||
readoutPriority: on_request
|
||||
userParameter:
|
||||
low_signal: 10000
|
||||
min_signal: 9000
|
||||
low_signal: 11000
|
||||
min_signal: 10000
|
||||
rt_pid_voltage: -0.06219
|
||||
rty:
|
||||
description: flomni rt
|
||||
@@ -364,105 +362,3 @@ rtz:
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: on_request
|
||||
|
||||
############################################################
|
||||
####################### Cameras ############################
|
||||
############################################################
|
||||
|
||||
cam_flomni_gripper:
|
||||
description: Camera sample changer
|
||||
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
|
||||
deviceConfig:
|
||||
url: http://flomnicamserver:5000/video_high
|
||||
num_rotation_90: 3
|
||||
transpose: false
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: on_request
|
||||
|
||||
cam_flomni_overview:
|
||||
description: Camera flomni overview
|
||||
deviceClass: csaxs_bec.devices.omny.webcam_viewer.WebcamViewer
|
||||
deviceConfig:
|
||||
url: http://flomnicamserver:5001/video_high
|
||||
num_rotation_90: 3
|
||||
transpose: false
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: on_request
|
||||
|
||||
cam_xeye:
|
||||
description: Camera flOMNI Xray eye ID1
|
||||
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
|
||||
deviceConfig:
|
||||
camera_id: 1
|
||||
bits_per_pixel: 24
|
||||
num_rotation_90: 3
|
||||
transpose: false
|
||||
force_monochrome: true
|
||||
m_n_colormode: 1
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: async
|
||||
|
||||
cam_ids_rgb:
|
||||
description: Camera flOMNI Xray eye ID203
|
||||
deviceClass: csaxs_bec.devices.ids_cameras.ids_camera.IDSCamera
|
||||
deviceConfig:
|
||||
camera_id: 203
|
||||
bits_per_pixel: 24
|
||||
num_rotation_90: 3
|
||||
transpose: false
|
||||
force_monochrome: true
|
||||
m_n_colormode: 1
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: async
|
||||
|
||||
|
||||
# ############################################################
|
||||
# ################### flOMNI temperatures ####################
|
||||
# ############################################################
|
||||
flomni_temphum:
|
||||
description: flOMNI Temperatures and humidity
|
||||
deviceClass: csaxs_bec.devices.omny.flomni_temp_and_humidity.FlomniTempHum
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
# ############################################################
|
||||
# ########## OMNY / flOMNI / LamNI fast shutter ##############
|
||||
# ############################################################
|
||||
omnyfsh:
|
||||
description: omnyfsh connects to read fast shutter at X12 if in that network
|
||||
deviceClass: csaxs_bec.devices.omny.shutter.OMNYFastEpicsShutter
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
############################################################
|
||||
#################### GUI Signals ###########################
|
||||
############################################################
|
||||
omny_xray_gui:
|
||||
description: Gui Epics signals
|
||||
deviceClass: csaxs_bec.devices.omny.xray_epics_gui.OMNYXRayEpicsGUI
|
||||
deviceConfig: {}
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: on_request
|
||||
|
||||
calculated_signal:
|
||||
description: Calculated signal from alignment for fit
|
||||
deviceClass: ophyd_devices.ComputedSignal
|
||||
deviceConfig:
|
||||
compute_method: "def just_rand():\n return 42"
|
||||
enabled: true
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
35
csaxs_bec/device_configs/motors_copy_paste.yaml
Normal file
35
csaxs_bec/device_configs/motors_copy_paste.yaml
Normal file
@@ -0,0 +1,35 @@
|
||||
#Standard configuration of an Owis LTM80 linear stage
|
||||
samx:
|
||||
description: Owis LTM80 linear stage
|
||||
deviceClass: ophyd_devices.devices.EpicsMotorEx
|
||||
deviceConfig:
|
||||
prefix: X12SA-ES2-ES01
|
||||
motor_resolution: 0.00125
|
||||
base_velocity: 0.25
|
||||
velocity: 2.5
|
||||
backlash_distance: 0.125
|
||||
acceleration: 0.4
|
||||
user_offset_dir: 0
|
||||
direction_of_travel: 1
|
||||
deviceTags:
|
||||
- cSAXS
|
||||
- user_motor
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
#xbpm1x:
|
||||
# description: 'X-ray BPM1 in frontend translation x'
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: 'X12SA-FE-XBPM1:TRX'
|
||||
# onFailure: raise
|
||||
# enabled: true
|
||||
# readoutPriority: baseline
|
||||
# readOnly: false
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - cSAXS
|
||||
# - frontend
|
||||
@@ -26,6 +26,20 @@ dmmx:
|
||||
- cSAXS
|
||||
- optics
|
||||
|
||||
#E_kev:
|
||||
# description: 'Double Multilayer Monochromator, energy (keV)'
|
||||
# deviceClass: ophyd.EpicsMotor
|
||||
# deviceConfig:
|
||||
# prefix: 'X12SA-OP-DMM1:ENERGY'
|
||||
# onFailure: raise
|
||||
# enabled: true
|
||||
# readoutPriority: baseline
|
||||
# readOnly: true
|
||||
# softwareTrigger: false
|
||||
# deviceTags:
|
||||
# - cSAXS
|
||||
# - optics
|
||||
|
||||
dmmy:
|
||||
description: 'Double Multilayer Monochromator, translation Y'
|
||||
deviceClass: ophyd.EpicsMotor
|
||||
|
||||
@@ -37,7 +37,8 @@ import traceback
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd_devices import CompareStatus, DeviceStatus, TransitionStatus
|
||||
from ophyd import DeviceStatus
|
||||
from ophyd_devices import CompareStatus, TransitionStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import (
|
||||
|
||||
@@ -143,7 +143,7 @@ class StatusBitsCompareStatus(SubscriptionStatus):
|
||||
self._add_delay = add_delay or 0
|
||||
self._raise_states = raise_states or []
|
||||
super().__init__(
|
||||
obj=signal,
|
||||
device=signal,
|
||||
callback=self._compare_callback,
|
||||
timeout=timeout,
|
||||
settle_time=settle_time,
|
||||
|
||||
@@ -1 +1 @@
|
||||
from .ids_camera import IDSCamera
|
||||
from .ids_camera_new import IDSCamera
|
||||
|
||||
@@ -18,7 +18,6 @@ import atexit
|
||||
from typing import Literal
|
||||
|
||||
import numpy as np
|
||||
import time
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.utils import check_error
|
||||
@@ -67,8 +66,8 @@ class IDSCameraObject:
|
||||
check_error(ueye.is_SetDisplayMode(self.h_cam, ueye.IS_SET_DM_DIB), "IDSCameraObject")
|
||||
|
||||
if (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
):
|
||||
logger.info("Bayer color mode detected.")
|
||||
# setup the color depth to the current windows setting
|
||||
@@ -77,16 +76,16 @@ class IDSCameraObject:
|
||||
) # TODO This raises an error - maybe check the m_n_colormode value
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
|
||||
self.n_bits_per_pixel = self.ueye.INT(32)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
self.m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
@@ -160,17 +159,15 @@ class Camera:
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
camera_id: int,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: int = 24,
|
||||
connect: bool = True,
|
||||
force_monochrome: bool = False,
|
||||
self,
|
||||
camera_id: int,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: int = 24,
|
||||
connect: bool = True,
|
||||
):
|
||||
self.ueye = ueye
|
||||
self.camera_id = camera_id
|
||||
self._inputs = {"m_n_colormode": m_n_colormode, "bits_per_pixel": bits_per_pixel}
|
||||
self.force_monochrome = force_monochrome
|
||||
self._connected = False
|
||||
self.cam = None
|
||||
atexit.register(self.on_disconnect)
|
||||
@@ -200,16 +197,14 @@ class Camera:
|
||||
self.cam = IDSCameraObject(self.camera_id, **self._inputs)
|
||||
self._connected = True
|
||||
|
||||
def on_disconnect(self, delay_after: float = 0.3):
|
||||
"""Disconnect from the camera and optionally wait a short time for driver cleanup."""
|
||||
def on_disconnect(self):
|
||||
"""Disconnect from the camera."""
|
||||
try:
|
||||
if self.cam and self.cam.h_cam:
|
||||
check_error(self.ueye.is_ExitCamera(self.cam.h_cam), "IDSCameraObject")
|
||||
self._connected = False
|
||||
self.cam = None
|
||||
if delay_after > 0:
|
||||
time.sleep(delay_after)
|
||||
logger.debug(f"Waited {delay_after:.2f}s after camera disconnect for cleanup.")
|
||||
logger.info("Camera disconnected.")
|
||||
except Exception as e:
|
||||
logger.info(f"Error during camera disconnection: {e}")
|
||||
|
||||
@@ -268,19 +263,9 @@ class Camera:
|
||||
if array is None:
|
||||
logger.error("Failed to get image data from the camera.")
|
||||
return None
|
||||
img = np.reshape(
|
||||
return np.reshape(
|
||||
array, (self.cam.height.value, self.cam.width.value, self.cam.bytes_per_pixel)
|
||||
)
|
||||
# If RGB image (H, W, 3), reshuffle channels from BGR → RGB
|
||||
if img.ndim == 3 and img.shape[2] == 3:
|
||||
img = img[:, :, ::-1]
|
||||
if self.force_monochrome:
|
||||
gray = np.dot(img[..., :3], [0.2989, 0.5870, 0.1140]).astype(np.uint8)
|
||||
# expand to 3D shape (H, W, 1) for consistency with real mono cams
|
||||
img = np.expand_dims(gray, axis=-1)
|
||||
img = np.ascontiguousarray(img)
|
||||
|
||||
return img
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,225 +1,403 @@
|
||||
"""IDS Camera class for cSAXS IDS cameras."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, Kind, Signal, StatusBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from pydantic import ValidationInfo
|
||||
from ophyd_devices.utils.bec_signals import PreviewSignal
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class IDSCamera(PSIDeviceBase):
|
||||
"""IDS Camera class for cSAXS.
|
||||
|
||||
This class inherits from PSIDeviceBase and implements the necessary methods
|
||||
to interact with the IDS camera using the pyueye library.
|
||||
class ROISignal(Signal):
|
||||
"""
|
||||
Signal to handle the Region of Interest (ROI) for the IDS camera.
|
||||
It is a tuple of (x, y, width, height).
|
||||
"""
|
||||
|
||||
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.", num_rotation_90=0,
|
||||
transpose=False)
|
||||
roi_signal = Cpt(
|
||||
AsyncSignal,
|
||||
name="roi_signal",
|
||||
ndim=0,
|
||||
max_size=1000,
|
||||
doc="Signal for the region of interest (ROI).",
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
)
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name,
|
||||
roi: tuple | None = None,
|
||||
value=0,
|
||||
dtype=None,
|
||||
shape=None,
|
||||
timestamp=None,
|
||||
parent=None,
|
||||
labels=None,
|
||||
kind=Kind.hinted,
|
||||
tolerance=None,
|
||||
rtolerance=None,
|
||||
metadata=None,
|
||||
cl=None,
|
||||
attr_name="",
|
||||
):
|
||||
super().__init__(
|
||||
name=name,
|
||||
value=value,
|
||||
dtype=dtype,
|
||||
shape=shape,
|
||||
timestamp=timestamp,
|
||||
parent=parent,
|
||||
labels=labels,
|
||||
kind=kind,
|
||||
tolerance=tolerance,
|
||||
rtolerance=rtolerance,
|
||||
metadata=metadata,
|
||||
cl=cl,
|
||||
attr_name=attr_name,
|
||||
)
|
||||
self.roi = roi
|
||||
|
||||
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
|
||||
def get(self, **kwargs):
|
||||
image = self.parent.image_data.get().data
|
||||
if not isinstance(image, np.ndarray):
|
||||
return -1 # -1 if no valid image is available
|
||||
|
||||
if self.roi is None:
|
||||
roi = (0, 0, image.shape[1], image.shape[0])
|
||||
else:
|
||||
roi = self.roi
|
||||
if len(image.shape) > 2:
|
||||
image = np.sum(image, axis=2) # Convert to grayscale if it's a color image
|
||||
return np.sum(image[roi[1] : roi[1] + roi[3], roi[0] : roi[0] + roi[2]], (0, 1))
|
||||
|
||||
|
||||
class IDSCamera(PSIDeviceBase):
|
||||
""" "
|
||||
#---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
#Variables
|
||||
hCam = ueye.HIDS(202) #0: first available camera; 1-254: The camera with the specified camera ID
|
||||
sInfo = ueye.SENSORINFO()
|
||||
cInfo = ueye.CAMINFO()
|
||||
pcImageMemory = ueye.c_mem_p()
|
||||
MemID = ueye.int()
|
||||
rectAOI = ueye.IS_RECT()
|
||||
pitch = ueye.INT()
|
||||
nBitsPerPixel = ueye.INT(24) #24: bits per pixel for color mode; take 8 bits per pixel for monochrome
|
||||
channels = 3 #3: channels for color mode(RGB); take 1 channel for monochrome
|
||||
m_nColorMode = ueye.INT(1) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
|
||||
bytes_per_pixel = int(nBitsPerPixel / 8)
|
||||
|
||||
ids_cam
|
||||
...
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["start_live_mode", "stop_live_mode", "set_roi", "width", "height"]
|
||||
|
||||
image_data = Cpt(PreviewSignal, ndim=2, kind=Kind.omitted)
|
||||
# roi_bot_left = Cpt(ROISignal, roi=(400, 525, 118, 105), kind=Kind.normal)
|
||||
# roi_bot_right = Cpt(ROISignal, roi=(518, 525, 118, 105), kind=Kind.normal)
|
||||
# roi_top_left = Cpt(ROISignal, roi=(400, 630, 118, 105), kind=Kind.normal)
|
||||
# roi_top_right = Cpt(ROISignal, roi=(518, 630, 118, 105), kind=Kind.normal)
|
||||
# roi_signal = Cpt(ROISignal, kind=Kind.normal, doc="Region of Interest signal")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
camera_id: int,
|
||||
prefix: str = "",
|
||||
scan_info: ScanInfo | None = None,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: Literal[8, 24] = 24,
|
||||
live_mode: bool = False,
|
||||
num_rotation_90: int = 0,
|
||||
transpose: bool = False,
|
||||
force_monochrome: bool = False,
|
||||
**kwargs,
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name: str,
|
||||
camera_ID: int,
|
||||
bits_per_pixel: int,
|
||||
channels: int,
|
||||
m_n_colormode: int,
|
||||
kind=None,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the IDS Camera.
|
||||
|
||||
Args:
|
||||
name (str): Name of the device.
|
||||
camera_id (int): The ID of the camera device.
|
||||
prefix (str): Prefix for the device.
|
||||
scan_info (ScanInfo | None): Scan information for the device.
|
||||
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
|
||||
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
|
||||
live_mode (bool): Whether to enable live mode for the camera.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._live_mode_thread: threading.Thread | None = None
|
||||
self._stop_live_mode_event: threading.Event = threading.Event()
|
||||
self.cam = Camera(
|
||||
camera_id=camera_id,
|
||||
m_n_colormode=m_n_colormode,
|
||||
bits_per_pixel=bits_per_pixel,
|
||||
connect=False,
|
||||
super().__init__(
|
||||
prefix=prefix, name=name, kind=kind, device_manager=device_manager, **kwargs
|
||||
)
|
||||
self._live_mode = False
|
||||
self._inputs = {"live_mode": live_mode}
|
||||
self._mask = np.zeros((1, 1), dtype=np.uint8)
|
||||
self.image.num_rotation_90 = num_rotation_90
|
||||
self.image.transpose = transpose
|
||||
self._force_monochrome = force_monochrome
|
||||
|
||||
############## Live Mode Methods ##############
|
||||
|
||||
@property
|
||||
def mask(self) -> np.ndarray:
|
||||
"""Return the current region of interest (ROI) for the camera."""
|
||||
return self._mask
|
||||
|
||||
@mask.setter
|
||||
def mask(self, value: np.ndarray):
|
||||
"""
|
||||
Set the region of interest (ROI) for the camera.
|
||||
|
||||
Args:
|
||||
value (np.ndarray): The mask to set as the ROI.
|
||||
"""
|
||||
if value.ndim != 2:
|
||||
raise ValueError("ROI mask must be a 2D array.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
|
||||
raise ValueError(
|
||||
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
|
||||
)
|
||||
self._mask = value
|
||||
|
||||
@property
|
||||
def live_mode(self) -> bool:
|
||||
"""Return whether the camera is in live mode."""
|
||||
return self._live_mode
|
||||
|
||||
@live_mode.setter
|
||||
def live_mode(self, value: bool):
|
||||
"""Set the live mode for the camera."""
|
||||
if value != self._live_mode:
|
||||
if self.cam._connected is False: # $ pylint: disable=protected-access
|
||||
self.cam.on_connect()
|
||||
self._live_mode = value
|
||||
if value:
|
||||
self._start_live()
|
||||
else:
|
||||
self._stop_live()
|
||||
|
||||
def set_rect_roi(self, x: int, y: int, width: int, height: int):
|
||||
"""Set the rectangular region of interest (ROI) for the camera."""
|
||||
if x < 0 or y < 0 or width <= 0 or height <= 0:
|
||||
raise ValueError("ROI coordinates and dimensions must be positive integers.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
if x + width > img_shape[1] or y + height > img_shape[0]:
|
||||
raise ValueError("ROI exceeds camera dimensions.")
|
||||
mask = np.zeros(img_shape, dtype=np.uint8)
|
||||
mask[y: y + height, x: x + width] = 1
|
||||
self.mask = mask
|
||||
|
||||
def _start_live(self):
|
||||
"""Start the live mode for the camera."""
|
||||
if self._live_mode_thread is not None:
|
||||
logger.info("Live mode thread is already running.")
|
||||
return
|
||||
self._stop_live_mode_event.clear()
|
||||
self._live_mode_thread = threading.Thread(
|
||||
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
|
||||
self.camera_ID = camera_ID
|
||||
self.bits_per_pixel = bits_per_pixel
|
||||
self.bytes_per_pixel = None
|
||||
self.channels = channels
|
||||
self._m_n_colormode_input = m_n_colormode
|
||||
self.m_n_colormode = None
|
||||
self.ueye = ueye
|
||||
self.h_cam = None
|
||||
self.s_info = None
|
||||
self.data_thread = None
|
||||
self.c_info = None
|
||||
self.pc_image_memory = None
|
||||
self.mem_id = None
|
||||
self.rect_aoi = None
|
||||
self.pitch = None
|
||||
self.n_bits_per_pixel = None
|
||||
self.width = None
|
||||
self.height = None
|
||||
self.thread_event = threading.Event()
|
||||
self.data_thread = None
|
||||
self._roi: tuple | None = None # x, y, width, height
|
||||
logger.info(
|
||||
f"Deprecation warning: The IDSCamera class is deprecated. Use the new IDSCameraNew class instead."
|
||||
)
|
||||
self._live_mode_thread.start()
|
||||
|
||||
def _stop_live(self):
|
||||
"""Stop the live mode for the camera."""
|
||||
if self._live_mode_thread is None:
|
||||
logger.info("Live mode thread is not running.")
|
||||
return
|
||||
self._stop_live_mode_event.set()
|
||||
self._live_mode_thread.join(timeout=5)
|
||||
if self._live_mode_thread.is_alive():
|
||||
logger.warning("Live mode thread did not stop gracefully.")
|
||||
def set_roi(self, x: int, y: int, width: int, height: int):
|
||||
self._roi = (x, y, width, height)
|
||||
|
||||
def start_backend(self):
|
||||
if self.ueye is None:
|
||||
raise ImportError("The pyueye library is not installed.")
|
||||
self.h_cam = self.ueye.HIDS(
|
||||
self.camera_ID
|
||||
) # 0: first available camera; 1-254: The camera with the specified camera ID
|
||||
self.s_info = self.ueye.SENSORINFO()
|
||||
self.c_info = self.ueye.CAMINFO()
|
||||
self.pc_image_memory = self.ueye.c_mem_p()
|
||||
self.mem_id = self.ueye.int()
|
||||
self.rect_aoi = self.ueye.IS_RECT()
|
||||
self.pitch = self.ueye.INT()
|
||||
self.n_bits_per_pixel = self.ueye.INT(
|
||||
self.bits_per_pixel
|
||||
) # 24: bits per pixel for color mode; take 8 bits per pixel for monochrome
|
||||
self.m_n_colormode = self.ueye.INT(
|
||||
self._m_n_colormode_input
|
||||
) # Y8/RGB16/RGB24/REG32 (1 for our color cameras)
|
||||
self.bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
|
||||
# Starts the driver and establishes the connection to the camera
|
||||
ret = self.ueye.is_InitCamera(self.h_cam, None)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_InitCamera ERROR")
|
||||
|
||||
# Reads out the data hard-coded in the non-volatile camera memory and writes it to the data structure that c_info points to
|
||||
ret = self.ueye.is_GetCameraInfo(self.h_cam, self.c_info)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_GetCameraInfo ERROR")
|
||||
|
||||
# You can query additional information about the sensor type used in the camera
|
||||
ret = self.ueye.is_GetSensorInfo(self.h_cam, self.s_info)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_GetSensorInfo ERROR")
|
||||
|
||||
ret = self.ueye.is_ResetToDefault(self.h_cam)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_ResetToDefault ERROR")
|
||||
|
||||
# Set display mode to DIB
|
||||
ret = self.ueye.is_SetDisplayMode(self.h_cam, self.ueye.IS_SET_DM_DIB)
|
||||
|
||||
# Set the right color mode
|
||||
if (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_BAYER
|
||||
):
|
||||
# setup the color depth to the current windows setting
|
||||
self.ueye.is_GetColorDepth(self.h_cam, self.n_bits_per_pixel, self.m_n_colormode)
|
||||
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_BAYER: ")
|
||||
print("\tm_n_colormode: \t\t", self.m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", self.n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_CBYCRY
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
m_n_colormode = self.ueye.IS_CM_BGRA8_PACKED
|
||||
n_bits_per_pixel = self.ueye.INT(32)
|
||||
bytes_per_pixel = int(self.n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_CBYCRY: ")
|
||||
print("\tm_n_colormode: \t\t", m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
elif (
|
||||
int.from_bytes(self.s_info.nColorMode.value, byteorder="big")
|
||||
== self.ueye.IS_COLORMODE_MONOCHROME
|
||||
):
|
||||
# for color camera models use RGB32 mode
|
||||
m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
n_bits_per_pixel = self.ueye.INT(8)
|
||||
bytes_per_pixel = int(n_bits_per_pixel / 8)
|
||||
print("IS_COLORMODE_MONOCHROME: ")
|
||||
print("\tm_n_colormode: \t\t", m_n_colormode)
|
||||
print("\tn_bits_per_pixel: \t\t", n_bits_per_pixel)
|
||||
print("\tbytes_per_pixel: \t\t", bytes_per_pixel)
|
||||
print()
|
||||
|
||||
else:
|
||||
self._live_mode_thread = None
|
||||
logger.info("Live mode stopped.")
|
||||
# for monochrome camera models use Y8 mode
|
||||
m_n_colormode = self.ueye.IS_CM_MONO8
|
||||
n_bits_per_pixel = self.ueye.INT(8)
|
||||
bytes_per_pixel = int(n_bits_per_pixel / 8)
|
||||
print("else")
|
||||
|
||||
def _live_mode_loop(self, stop_event: threading.Event):
|
||||
"""Loop to capture images in live mode."""
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
self.process_data(self.cam.get_image_data())
|
||||
except Exception as e:
|
||||
logger.error(f"Error in live mode loop: {e}")
|
||||
break
|
||||
stop_event.wait(0.2) # 5 Hz
|
||||
# Can be used to set the size and position of an "area of interest"(AOI) within an image
|
||||
ret = self.ueye.is_AOI(
|
||||
self.h_cam,
|
||||
self.ueye.IS_AOI_IMAGE_GET_AOI,
|
||||
self.rect_aoi,
|
||||
self.ueye.sizeof(self.rect_aoi),
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_AOI ERROR")
|
||||
|
||||
def process_data(self, image: np.ndarray | None):
|
||||
"""Process the image data before sending it to the preview signal."""
|
||||
if image is None:
|
||||
return
|
||||
self.image.put(image)
|
||||
self.width = self.rect_aoi.s32Width
|
||||
self.height = self.rect_aoi.s32Height
|
||||
|
||||
def get_last_image(self) -> np.ndarray:
|
||||
"""Get the last captured image from the camera."""
|
||||
image = self.image.get()
|
||||
if image:
|
||||
return image.data
|
||||
# Prints out some information about the camera and the sensor
|
||||
print("Camera model:\t\t", self.s_info.strSensorName.decode("utf-8"))
|
||||
print("Camera serial no.:\t", self.c_info.SerNo.decode("utf-8"))
|
||||
print("Maximum image width:\t", self.width)
|
||||
print("Maximum image height:\t", self.height)
|
||||
print()
|
||||
|
||||
############## User Interface Methods ##############
|
||||
# ---------------------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
def on_connected(self):
|
||||
"""Connect to the camera."""
|
||||
self.cam.force_monochrome = self._force_monochrome
|
||||
self.cam.on_connect()
|
||||
self.live_mode = self._inputs.get("live_mode", False)
|
||||
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
|
||||
|
||||
def on_destroy(self):
|
||||
"""Clean up resources when the device is destroyed."""
|
||||
self.cam.on_disconnect()
|
||||
super().on_destroy()
|
||||
|
||||
def on_trigger(self):
|
||||
"""Handle the trigger event."""
|
||||
if not self.live_mode:
|
||||
return
|
||||
image = self.image.get()
|
||||
if image is not None:
|
||||
image: messages.DevicePreviewMessage
|
||||
if self.mask.shape[0:2] != image.data.shape[0:2]:
|
||||
logger.info(
|
||||
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
|
||||
)
|
||||
return
|
||||
|
||||
if len(image.data.shape) == 3:
|
||||
# If the image has multiple channels, apply the mask to each channel
|
||||
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
|
||||
n_channels = 3
|
||||
# Allocates an image memory for an image having its dimensions defined by width and height and its color depth defined by n_bits_per_pixel
|
||||
ret = self.ueye.is_AllocImageMem(
|
||||
self.h_cam,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pc_image_memory,
|
||||
self.mem_id,
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_AllocImageMem ERROR")
|
||||
else:
|
||||
# Makes the specified image memory the active memory
|
||||
ret = self.ueye.is_SetImageMem(self.h_cam, self.pc_image_memory, self.mem_id)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_SetImageMem ERROR")
|
||||
else:
|
||||
data = image.data * self.mask
|
||||
n_channels = 1
|
||||
self.roi_signal.put(np.sum(data) / (np.sum(self.mask) * n_channels))
|
||||
# Set the desired color mode
|
||||
ret = self.ueye.is_SetColorMode(self.h_cam, self.m_n_colormode)
|
||||
|
||||
# Activates the camera's live video mode (free run mode)
|
||||
ret = self.ueye.is_CaptureVideo(self.h_cam, self.ueye.IS_DONT_WAIT)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_CaptureVideo ERROR")
|
||||
|
||||
# Enables the queue mode for existing image memory sequences
|
||||
ret = self.ueye.is_InquireImageMem(
|
||||
self.h_cam,
|
||||
self.pc_image_memory,
|
||||
self.mem_id,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pitch,
|
||||
)
|
||||
if ret != self.ueye.IS_SUCCESS:
|
||||
print("is_InquireImageMem ERROR")
|
||||
else:
|
||||
print("Press q to leave the programm")
|
||||
# startmeasureframerate = True
|
||||
# Gain = False
|
||||
|
||||
# Start live mode of camera immediately
|
||||
self.start_live_mode()
|
||||
|
||||
def _start_data_thread(self):
|
||||
self.data_thread = threading.Thread(target=self._receive_data_from_camera, daemon=True)
|
||||
self.data_thread.start()
|
||||
|
||||
def _receive_data_from_camera(self):
|
||||
while not self.thread_event.is_set():
|
||||
if self.ueye is None:
|
||||
print("pyueye library not available.")
|
||||
return
|
||||
# In order to display the image in an OpenCV window we need to...
|
||||
# ...extract the data of our image memory
|
||||
array = self.ueye.get_data(
|
||||
self.pc_image_memory,
|
||||
self.width,
|
||||
self.height,
|
||||
self.n_bits_per_pixel,
|
||||
self.pitch,
|
||||
copy=False,
|
||||
)
|
||||
|
||||
# ...reshape it in an numpy array...
|
||||
frame = np.reshape(array, (self.height.value, self.width.value, self.bytes_per_pixel))
|
||||
self.image_data.put(frame)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
def wait_for_connection(self, all_signals=False, timeout=10):
|
||||
if ueye is None:
|
||||
raise ImportError(
|
||||
"The pyueye library is not installed or doesn't provide the necessary c libs"
|
||||
)
|
||||
super().wait_for_connection(all_signals, timeout)
|
||||
|
||||
def start_live_mode(self):
|
||||
if self.data_thread is not None:
|
||||
self.stop_live_mode()
|
||||
self._start_data_thread()
|
||||
|
||||
def stop_live_mode(self):
|
||||
"""Stopping the camera live mode."""
|
||||
self.thread_event.set()
|
||||
if self.data_thread is not None:
|
||||
self.data_thread.join()
|
||||
self.thread_event.clear()
|
||||
self.data_thread = None
|
||||
|
||||
########################################
|
||||
# Beamline Specific Implementations #
|
||||
########################################
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
Called when the device is initialized.
|
||||
|
||||
No signals are connected at this point. If you like to
|
||||
set default values on signals, please use on_connected instead.
|
||||
"""
|
||||
|
||||
def on_connected(self) -> None:
|
||||
"""
|
||||
Called after the device is connected and its signals are connected.
|
||||
Default values for signals should be set here.
|
||||
"""
|
||||
self.start_backend()
|
||||
self.start_live_mode()
|
||||
|
||||
def on_stage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""
|
||||
Called while staging the device.
|
||||
|
||||
Information about the upcoming scan can be accessed from the scan_info (self.scan_info.msg) object.
|
||||
"""
|
||||
|
||||
def on_unstage(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called while unstaging the device."""
|
||||
|
||||
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called right before the scan starts on all devices automatically."""
|
||||
|
||||
def on_trigger(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called when the device is triggered."""
|
||||
|
||||
def on_complete(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to inquire if a device has completed a scans."""
|
||||
|
||||
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
|
||||
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Called when the device is stopped."""
|
||||
|
||||
def on_destroy(self) -> None:
|
||||
"""Called when the device is destroyed. Cleanup resources here."""
|
||||
self.stop_live_mode()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage of the IDSCamera class
|
||||
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
|
||||
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")
|
||||
# Example usage
|
||||
camera = IDSCamera(name="camera", camera_ID=201, bits_per_pixel=24, channels=3, m_n_colormode=1)
|
||||
camera.wait_for_connection()
|
||||
|
||||
camera.on_destroy()
|
||||
|
||||
226
csaxs_bec/devices/ids_cameras/ids_camera_new.py
Normal file
226
csaxs_bec/devices/ids_cameras/ids_camera_new.py
Normal file
@@ -0,0 +1,226 @@
|
||||
"""IDS Camera class for cSAXS IDS cameras."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Literal, Tuple, TypedDict
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import messages
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
from ophyd_devices.utils.bec_signals import AsyncSignal, PreviewSignal
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.base_integration.camera import Camera
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from pydantic import ValidationInfo
|
||||
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class IDSCamera(PSIDeviceBase):
|
||||
"""IDS Camera class for cSAXS.
|
||||
|
||||
This class inherits from PSIDeviceBase and implements the necessary methods
|
||||
to interact with the IDS camera using the pyueye library.
|
||||
"""
|
||||
|
||||
image = Cpt(PreviewSignal, name="image", ndim=2, doc="Preview signal for the camera.")
|
||||
roi_signal = Cpt(
|
||||
AsyncSignal,
|
||||
name="roi_signal",
|
||||
ndim=0,
|
||||
max_size=1000,
|
||||
doc="Signal for the region of interest (ROI).",
|
||||
async_update={"type": "add", "max_shape": [None]},
|
||||
)
|
||||
|
||||
USER_ACCESS = ["live_mode", "mask", "set_rect_roi", "get_last_image"]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name: str,
|
||||
camera_id: int,
|
||||
prefix: str = "",
|
||||
scan_info: ScanInfo | None = None,
|
||||
m_n_colormode: Literal[0, 1, 2, 3] = 1,
|
||||
bits_per_pixel: Literal[8, 24] = 24,
|
||||
live_mode: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the IDS Camera.
|
||||
|
||||
Args:
|
||||
name (str): Name of the device.
|
||||
camera_id (int): The ID of the camera device.
|
||||
prefix (str): Prefix for the device.
|
||||
scan_info (ScanInfo | None): Scan information for the device.
|
||||
m_n_colormode (Literal[0, 1, 2, 3]): Color mode for the camera.
|
||||
bits_per_pixel (Literal[8, 24]): Number of bits per pixel for the camera.
|
||||
live_mode (bool): Whether to enable live mode for the camera.
|
||||
"""
|
||||
super().__init__(name=name, prefix=prefix, scan_info=scan_info, **kwargs)
|
||||
self._live_mode_thread: threading.Thread | None = None
|
||||
self._stop_live_mode_event: threading.Event = threading.Event()
|
||||
self.cam = Camera(
|
||||
camera_id=camera_id,
|
||||
m_n_colormode=m_n_colormode,
|
||||
bits_per_pixel=bits_per_pixel,
|
||||
connect=False,
|
||||
)
|
||||
self._live_mode = False
|
||||
self._inputs = {"live_mode": live_mode}
|
||||
self._mask = np.zeros((1, 1), dtype=np.uint8)
|
||||
|
||||
############## Live Mode Methods ##############
|
||||
|
||||
@property
|
||||
def mask(self) -> np.ndarray:
|
||||
"""Return the current region of interest (ROI) for the camera."""
|
||||
return self._mask
|
||||
|
||||
@mask.setter
|
||||
def mask(self, value: np.ndarray):
|
||||
"""
|
||||
Set the region of interest (ROI) for the camera.
|
||||
|
||||
Args:
|
||||
value (np.ndarray): The mask to set as the ROI.
|
||||
"""
|
||||
if value.ndim != 2:
|
||||
raise ValueError("ROI mask must be a 2D array.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
if value.shape[0] != img_shape[0] or value.shape[1] != img_shape[1]:
|
||||
raise ValueError(
|
||||
f"ROI mask shape {value.shape} does not match image shape {img_shape}."
|
||||
)
|
||||
self._mask = value
|
||||
|
||||
@property
|
||||
def live_mode(self) -> bool:
|
||||
"""Return whether the camera is in live mode."""
|
||||
return self._live_mode
|
||||
|
||||
@live_mode.setter
|
||||
def live_mode(self, value: bool):
|
||||
"""Set the live mode for the camera."""
|
||||
if value != self._live_mode:
|
||||
if self.cam._connected is False: # $ pylint: disable=protected-access
|
||||
self.cam.on_connect()
|
||||
self._live_mode = value
|
||||
if value:
|
||||
self._start_live()
|
||||
else:
|
||||
self._stop_live()
|
||||
|
||||
def set_rect_roi(self, x: int, y: int, width: int, height: int):
|
||||
"""Set the rectangular region of interest (ROI) for the camera."""
|
||||
if x < 0 or y < 0 or width <= 0 or height <= 0:
|
||||
raise ValueError("ROI coordinates and dimensions must be positive integers.")
|
||||
img_shape = (self.cam.cam.height.value, self.cam.cam.width.value)
|
||||
if x + width > img_shape[1] or y + height > img_shape[0]:
|
||||
raise ValueError("ROI exceeds camera dimensions.")
|
||||
mask = np.zeros(img_shape, dtype=np.uint8)
|
||||
mask[y : y + height, x : x + width] = 1
|
||||
self.mask = mask
|
||||
|
||||
def _start_live(self):
|
||||
"""Start the live mode for the camera."""
|
||||
if self._live_mode_thread is not None:
|
||||
logger.info("Live mode thread is already running.")
|
||||
return
|
||||
self._stop_live_mode_event.clear()
|
||||
self._live_mode_thread = threading.Thread(
|
||||
target=self._live_mode_loop, args=(self._stop_live_mode_event,)
|
||||
)
|
||||
self._live_mode_thread.start()
|
||||
|
||||
def _stop_live(self):
|
||||
"""Stop the live mode for the camera."""
|
||||
if self._live_mode_thread is None:
|
||||
logger.info("Live mode thread is not running.")
|
||||
return
|
||||
self._stop_live_mode_event.set()
|
||||
self._live_mode_thread.join(timeout=5)
|
||||
if self._live_mode_thread.is_alive():
|
||||
logger.warning("Live mode thread did not stop gracefully.")
|
||||
else:
|
||||
self._live_mode_thread = None
|
||||
logger.info("Live mode stopped.")
|
||||
|
||||
def _live_mode_loop(self, stop_event: threading.Event):
|
||||
"""Loop to capture images in live mode."""
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
self.process_data(self.cam.get_image_data())
|
||||
except Exception as e:
|
||||
logger.error(f"Error in live mode loop: {e}")
|
||||
break
|
||||
stop_event.wait(0.2) # 5 Hz
|
||||
|
||||
def process_data(self, image: np.ndarray | None):
|
||||
"""Process the image data before sending it to the preview signal."""
|
||||
if image is None:
|
||||
return
|
||||
self.image.put(image)
|
||||
|
||||
def get_last_image(self) -> np.ndarray:
|
||||
"""Get the last captured image from the camera."""
|
||||
image = self.image.get()
|
||||
if image:
|
||||
return image.data
|
||||
|
||||
############## User Interface Methods ##############
|
||||
|
||||
def on_connected(self):
|
||||
"""Connect to the camera."""
|
||||
self.cam.on_connect()
|
||||
self.live_mode = self._inputs.get("live_mode", False)
|
||||
self.set_rect_roi(0, 0, self.cam.cam.width.value, self.cam.cam.height.value)
|
||||
|
||||
def on_destroy(self):
|
||||
"""Clean up resources when the device is destroyed."""
|
||||
self.cam.on_disconnect()
|
||||
super().on_destroy()
|
||||
|
||||
def on_trigger(self):
|
||||
"""Handle the trigger event."""
|
||||
if not self.live_mode:
|
||||
return
|
||||
image = self.image.get()
|
||||
if image is not None:
|
||||
image: messages.DevicePreviewMessage
|
||||
if self.mask.shape[0:2] != image.data.shape[0:2]:
|
||||
logger.info(
|
||||
f"ROI shape does not match image shape, skipping ROI application for device {self.name}."
|
||||
)
|
||||
return
|
||||
|
||||
if len(image.data.shape) == 3:
|
||||
# If the image has multiple channels, apply the mask to each channel
|
||||
data = image.data * self.mask[:, :, np.newaxis] # Apply mask to the image data
|
||||
n_channels = 3
|
||||
else:
|
||||
data = image.data * self.mask
|
||||
n_channels = 1
|
||||
self.roi_signal.put(
|
||||
{
|
||||
self.roi_signal.name: {
|
||||
"value": np.sum(data)
|
||||
/ (np.sum(self.mask) * n_channels), # TODO could be optimized
|
||||
"timestamp": time.time(),
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example usage of the IDSCamera class
|
||||
camera = IDSCamera(name="TestCamera", camera_id=201, live_mode=False)
|
||||
print(f"Camera {camera.name} initialized with ID {camera.cam.camera_id}.")
|
||||
@@ -412,11 +412,10 @@ class NPointAxis(Device, PositionerBase):
|
||||
sign=1,
|
||||
socket_cls=SocketIO,
|
||||
tolerance: float = 0.05,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
self.controller = NPointController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.axis_Id = axis_Id
|
||||
self.sign = sign
|
||||
|
||||
@@ -24,25 +24,24 @@ class FlomniSampleStorage(Device):
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
sample_placed = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {"auto_monitor": True}) for i in range(21)
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET", {}) for i in range(21)
|
||||
}
|
||||
sample_placed = Dcpt(sample_placed)
|
||||
|
||||
sample_names = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True, "auto_monitor": True})
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_flomni{i}:GET.DESC", {"string": True})
|
||||
for i in range(21)
|
||||
}
|
||||
sample_names = Dcpt(sample_names)
|
||||
|
||||
sample_in_gripper = Cpt(
|
||||
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET", auto_monitor=True
|
||||
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_flomni100:GET"
|
||||
)
|
||||
sample_in_gripper_name = Cpt(
|
||||
EpicsSignal,
|
||||
name="sample_in_gripper_name",
|
||||
read_pv="XOMNY-SAMPLE_DB_flomni100:GET.DESC",
|
||||
string=True,
|
||||
auto_monitor=True
|
||||
)
|
||||
|
||||
def __init__(self, prefix="", *, name, **kwargs):
|
||||
|
||||
@@ -1,208 +0,0 @@
|
||||
import time
|
||||
import datetime
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device
|
||||
from ophyd import DynamicDeviceComponent as Dcpt
|
||||
from ophyd import EpicsSignal
|
||||
from prettytable import FRAME, PrettyTable
|
||||
import numpy as np
|
||||
|
||||
class FlomniTempHumError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FlomniTempHum(Device):
|
||||
USER_ACCESS = [
|
||||
"show_all",
|
||||
"help",
|
||||
]
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
|
||||
|
||||
temperature_mirror = Cpt(
|
||||
EpicsSignal, name="temperature_mirror", read_pv="XOMNI-TEMPHUM-MIRROR:0.VAL"
|
||||
)
|
||||
temperature_mirrorset_set = Cpt(
|
||||
EpicsSignal, name="temperature_mirrorset_set", read_pv="XOMNI-TEMPHUM-MIRRORSET_SET:0.VAL"
|
||||
)
|
||||
temperature_mirrorset_rb = Cpt(
|
||||
EpicsSignal, name="temperature_mirrorset_rb", read_pv="XOMNI-TEMPHUM-MIRRORSET_RB:0.VAL"
|
||||
)
|
||||
|
||||
temperature_osa = Cpt(
|
||||
EpicsSignal, name="temperature_osa", read_pv="XOMNI-TEMPHUM-OSA:0.VAL"
|
||||
)
|
||||
temperature_osaset_set = Cpt(
|
||||
EpicsSignal, name="temperature_osaset_set", read_pv="XOMNI-TEMPHUM-OSASET_SET:0.VAL"
|
||||
)
|
||||
temperature_osaset_rb = Cpt(
|
||||
EpicsSignal, name="temperature_osaset_rb", read_pv="XOMNI-TEMPHUM-OSASET_RB:0.VAL"
|
||||
)
|
||||
|
||||
omegactrl_alive = Cpt(
|
||||
EpicsSignal, name="omegactrl_alive", read_pv="XOMNI-TEMPHUM-OMEGACTRL-ALIVE:0.VAL"
|
||||
)
|
||||
galilctrl_alive = Cpt(
|
||||
EpicsSignal, name="galilctrl_alive", read_pv="XOMNI-TEMPHUM-GALILCTRL-ALIVE:0.VAL"
|
||||
)
|
||||
|
||||
temperature_heater = Cpt(
|
||||
EpicsSignal, name="temperature_heater", read_pv="XOMNI-TEMPHUM-HEATER:0.VAL"
|
||||
)
|
||||
temperature_heaterset_set = Cpt(
|
||||
EpicsSignal, name="temperature_heaterset_set", read_pv="XOMNI-TEMPHUM-HEATERSET_SET:0.VAL"
|
||||
)
|
||||
temperature_heaterset_rb = Cpt(
|
||||
EpicsSignal, name="temperature_heaterset_rb", read_pv="XOMNI-TEMPHUM-HEATERSET_RB:0.VAL"
|
||||
)
|
||||
temperature_heaterhousing = Cpt(
|
||||
EpicsSignal, name="temperature_heaterhousing", read_pv="XOMNI-TEMPHUM-HEATERHOUSE:0.VAL"
|
||||
)
|
||||
temperature_heaterhousing_alarm = Cpt(
|
||||
EpicsSignal, name="temperature_heaterhousing_alarm", read_pv="XOMNI-TEMPHUM-HEATERHOUSEALARM:0.VAL"
|
||||
)
|
||||
temperature_heater_enabled = Cpt(
|
||||
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
|
||||
)
|
||||
|
||||
temperature_heater_enabled = Cpt(
|
||||
EpicsSignal, name="temperature_heater_enabled", read_pv="XOMNI-TEMPHUM-HEAT_EN:0.VAL"
|
||||
)
|
||||
|
||||
###### GALIL CONTROLLER
|
||||
|
||||
|
||||
humidity_sensor1 = Cpt(
|
||||
EpicsSignal, name="humidity_sensor1", read_pv="XOMNI-TEMPHUM-HUM1:0.VAL"
|
||||
)
|
||||
|
||||
humidity_sensor2 = Cpt(
|
||||
EpicsSignal, name="humidity_sensor2", read_pv="XOMNI-TEMPHUM-HUM2:0.VAL"
|
||||
)
|
||||
|
||||
humidity_sensor1_temperature = Cpt(
|
||||
EpicsSignal, name="humidity_sensor1_temperature", read_pv="XOMNI-TEMPHUM-TEMP1:0.VAL"
|
||||
)
|
||||
|
||||
humidity_sensor2_temperature = Cpt(
|
||||
EpicsSignal, name="humidity_sensor2_temperature", read_pv="XOMNI-TEMPHUM-TEMP2:0.VAL"
|
||||
)
|
||||
|
||||
humidity_sensor1_err = Cpt(
|
||||
EpicsSignal, name="humidity_sensor1_err", read_pv="XOMNI-TEMPHUM-ERR1:0.VAL"
|
||||
)
|
||||
|
||||
humidity_sensor2_err = Cpt(
|
||||
EpicsSignal, name="humidity_sensor2_err", read_pv="XOMNI-TEMPHUM-ERR2:0.VAL"
|
||||
)
|
||||
|
||||
flow = Cpt(
|
||||
EpicsSignal, name="flow", read_pv="XOMNI-TEMPHUM-FLOW:0.VAL"
|
||||
)
|
||||
|
||||
flowset = Cpt(
|
||||
EpicsSignal, name="flowset", read_pv="XOMNI-TEMPHUM-FLOWSET:0.VAL"
|
||||
)
|
||||
|
||||
flowset_set = Cpt(
|
||||
EpicsSignal, name="flowset_set", read_pv="XOMNI-TEMPHUM-FLOWSETSET:0.VAL"
|
||||
)
|
||||
|
||||
humidityset = Cpt(
|
||||
EpicsSignal, name="humidityset", read_pv="XOMNI-TEMPHUM-HUMSET:0.VAL"
|
||||
)
|
||||
|
||||
humidityset_set = Cpt(
|
||||
EpicsSignal, name="humidityset_set", read_pv="XOMNI-TEMPHUM-HUMSETSET:0.VAL"
|
||||
)
|
||||
|
||||
suction = Cpt(
|
||||
EpicsSignal, name="suction", read_pv="XOMNI-TEMPHUM-SUCTION:0.VAL"
|
||||
)
|
||||
|
||||
valvedry = Cpt(
|
||||
EpicsSignal, name="valvedry", read_pv="XOMNI-TEMPHUM-VALVEDRY:0.VAL"
|
||||
)
|
||||
|
||||
valvewet = Cpt(
|
||||
EpicsSignal, name="valvewet", read_pv="XOMNI-TEMPHUM-VALVEWET:0.VAL"
|
||||
)
|
||||
|
||||
setuptemp = Cpt(
|
||||
EpicsSignal, name="setuptemp", read_pv="XOMNI-TEMPHUM-SETUPTEMP:0.VAL"
|
||||
)
|
||||
|
||||
|
||||
|
||||
def omega_controller_running(self):
|
||||
time_diff = np.fabs(float(self.omegactrl_alive.get()) - time.time())
|
||||
if time_diff > 120:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def galil_controller_running(self):
|
||||
time_diff = np.fabs(float(self.galilctrl_alive.get()) - time.time())
|
||||
if time_diff > 120:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
|
||||
def __init__(self, prefix="", *, name, **kwargs):
|
||||
super().__init__(prefix, name=name, **kwargs)
|
||||
self.temperature_mirror.subscribe(self._emit_value, run=False)
|
||||
|
||||
def _emit_value(self, **kwargs):
|
||||
timestamp = kwargs.pop("timestamp", time.time())
|
||||
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
|
||||
|
||||
|
||||
|
||||
def show_all(self):
|
||||
print("=== flOMNI Temperature & Humidity Overview ===")
|
||||
print("")
|
||||
print("Temperatures:")
|
||||
print(f" Mirror: {float(self.temperature_mirror.get()):7.2f} °C")
|
||||
print(f" Mirror Setpoint (RB): {float(self.temperature_mirrorset_rb.get()):7.2f} °C")
|
||||
print(f" OSA: {float(self.temperature_osa.get()):7.2f} °C")
|
||||
print(f" OSA Setpoint (RB): {float(self.temperature_osaset_rb.get()):7.2f} °C")
|
||||
print(f" Heater: {float(self.temperature_heater.get()):7.2f} °C")
|
||||
print(f" Heater Setpoint (RB): {float(self.temperature_heaterset_rb.get()):7.2f} °C")
|
||||
print(f" Heater Enabled: {float(self.temperature_heater_enabled.get()):.0f}")
|
||||
print(f" Heater Housing: {float(self.temperature_heaterhousing.get()):7.2f} °C")
|
||||
print(f" Heater Housing Alarm: {float(self.temperature_heaterhousing_alarm.get()):.0f}")
|
||||
print("")
|
||||
print("Humidity Sensors:")
|
||||
print(f" Sensor 1 Humidity: {float(self.humidity_sensor1.get()):7.2f} %RH")
|
||||
print(f" Sensor 1 Temperature: {float(self.humidity_sensor1_temperature.get()):7.2f} °C")
|
||||
print(f" Sensor 1 Error: {float(self.humidity_sensor1_err.get()):.0f}")
|
||||
print(f" Sensor 2 Humidity: {float(self.humidity_sensor2.get()):7.2f} %RH")
|
||||
print(f" Sensor 2 Temperature: {float(self.humidity_sensor2_temperature.get()):7.2f} °C")
|
||||
print(f" Sensor 2 Error: {float(self.humidity_sensor2_err.get()):.0f}")
|
||||
print(f" Humidity Setpoint: {float(self.humidityset.get()):7.2f} %RH")
|
||||
print("")
|
||||
print("Flow Control:")
|
||||
print(f" Flow: {float(self.flow.get()):7.2f} sccm")
|
||||
print(f" Flow Setpoint (RB): {float(self.flowset.get()):7.2f} sccm")
|
||||
print("")
|
||||
print("Suction:")
|
||||
print(f" Suction: {float(self.suction.get()):7.2f}")
|
||||
print("")
|
||||
print("Valves:")
|
||||
print(f" Dry Valve: {float(self.valvedry.get()):.0f}")
|
||||
print(f" Wet Valve: {float(self.valvewet.get()):.0f}")
|
||||
print("")
|
||||
print("Controller Heartbeats:")
|
||||
print(f" OMEGA Controller Alive: {self.omega_controller_running()}")
|
||||
print(f" GALIL Controller Alive: {self.galil_controller_running()}")
|
||||
print("==============================================")
|
||||
|
||||
|
||||
def help(self):
|
||||
print("Help for flOMNI temperature and humidity control system:")
|
||||
print("Available methods:")
|
||||
print(" show_all() - display all current values")
|
||||
@@ -175,7 +175,7 @@ class FlomniGalilMotor(Device, PositionerBase):
|
||||
**kwargs,
|
||||
):
|
||||
self.controller = FlomniGalilController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.axis_Id = axis_Id
|
||||
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
|
||||
@@ -212,10 +212,6 @@ class FlomniGalilMotor(Device, PositionerBase):
|
||||
self.low_limit_travel.put(limits[0])
|
||||
self.high_limit_travel.put(limits[1])
|
||||
|
||||
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
|
||||
"""Wait for the device to be connected."""
|
||||
self.controller.on(timeout=timeout)
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||
@@ -346,10 +342,10 @@ class FlomniGalilMotor(Device, PositionerBase):
|
||||
Drive an axis to the limit in a specified direction.
|
||||
|
||||
Args:
|
||||
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
|
||||
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
|
||||
"""
|
||||
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction)
|
||||
# now force position read to cache
|
||||
#now force position read to cache
|
||||
val = self.readback.read()
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
|
||||
|
||||
|
||||
@@ -149,7 +149,7 @@ class FuprGalilMotor(Device, PositionerBase):
|
||||
**kwargs,
|
||||
):
|
||||
self.controller = FuprGalilController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.axis_Id = axis_Id
|
||||
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
|
||||
@@ -185,10 +185,6 @@ class FuprGalilMotor(Device, PositionerBase):
|
||||
self.low_limit_travel.put(limits[0])
|
||||
self.high_limit_travel.put(limits[1])
|
||||
|
||||
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
|
||||
"""Wait for the device to be connected."""
|
||||
self.controller.on(timeout=timeout)
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||
|
||||
@@ -59,12 +59,12 @@ class GalilController(Controller):
|
||||
"all_axes_referenced",
|
||||
]
|
||||
|
||||
OKBLUE = "\033[94m"
|
||||
OKCYAN = "\033[96m"
|
||||
OKGREEN = "\033[92m"
|
||||
WARNING = "\033[93m"
|
||||
FAIL = "\033[91m"
|
||||
ENDC = "\033[0m"
|
||||
OKBLUE = '\033[94m'
|
||||
OKCYAN = '\033[96m'
|
||||
OKGREEN = '\033[92m'
|
||||
WARNING = '\033[93m'
|
||||
FAIL = '\033[91m'
|
||||
ENDC = '\033[0m'
|
||||
|
||||
@threadlocked
|
||||
def socket_put(self, val: str) -> None:
|
||||
@@ -115,29 +115,29 @@ class GalilController(Controller):
|
||||
|
||||
def axis_is_referenced(self, axis_Id_numeric) -> bool:
|
||||
return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip()))
|
||||
|
||||
|
||||
def folerr_status(self, axis_Id_numeric) -> bool:
|
||||
return bool(float(self.socket_put_and_receive(f"MG folaxerr[{axis_Id_numeric}]").strip()))
|
||||
|
||||
def motor_temperature(self, axis_Id_numeric) -> float:
|
||||
# this is only valid for omny. consider moving to ogalil
|
||||
#this is only valid for omny. consider moving to ogalil
|
||||
voltage = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip())
|
||||
voltage2 = float(self.socket_put_and_receive(f"MG @AN[{axis_Id_numeric+1}]").strip())
|
||||
if voltage2 < voltage:
|
||||
voltage = voltage2
|
||||
# convert from [-10,10]V to [0,300]degC
|
||||
temperature_degC = round((voltage + 10.0) / 20.0 * 300.0, 1)
|
||||
temperature_degC = round((voltage+10.0) / 20.0 * 300.0, 1)
|
||||
|
||||
# the motors of the parking station have a different offset
|
||||
# the range is reduced, so if at the limit, we show an extreme value
|
||||
#the motors of the parking station have a different offset
|
||||
#the range is reduced, so if at the limit, we show an extreme value
|
||||
if self.sock.port == 8082:
|
||||
# controller 2
|
||||
#controller 2
|
||||
if axis_Id_numeric == 6:
|
||||
temperature_degC = round((voltage + 10.0 - 11.4) / 20.0 * 300.0, 1)
|
||||
temperature_degC = round((voltage+10.0-11.4) / 20.0 * 300.0, 1)
|
||||
if voltage > 9.9:
|
||||
temperature_degC = 300
|
||||
if axis_Id_numeric == 7:
|
||||
temperature_degC = round((voltage + 0.0 - 12) / 20.0 * 300.0, 1)
|
||||
temperature_degC = round((voltage+.0-12) / 20.0 * 300.0, 1)
|
||||
if voltage > 9.9:
|
||||
temperature_degC = 300
|
||||
return temperature_degC
|
||||
@@ -147,15 +147,16 @@ class GalilController(Controller):
|
||||
Check if all axes are referenced.
|
||||
"""
|
||||
return bool(float(self.socket_put_and_receive("MG allaxref").strip()))
|
||||
|
||||
def _omny_get_microstep_position(self, axis_Id):
|
||||
|
||||
def _omny_get_microstep_position(self,axis_Id):
|
||||
return float(self.socket_put_and_receive(f"MG _TD{axis_Id}").strip())
|
||||
|
||||
def _omny_get_reference_limit(self, axis_Id):
|
||||
|
||||
|
||||
def _omny_get_reference_limit(self,axis_Id):
|
||||
get_axis_no = float(self.socket_put_and_receive(f"MG frmmv").strip())
|
||||
if get_axis_no > 0:
|
||||
if(get_axis_no>0):
|
||||
reference_is_before = float(self.socket_put_and_receive(f"MG _FL{axis_Id}").strip())
|
||||
elif get_axis_no < 0:
|
||||
elif(get_axis_no<0):
|
||||
reference_is_before = float(self.socket_put_and_receive(f"MG _BL{axis_Id}").strip())
|
||||
else:
|
||||
reference_is_before = 0
|
||||
@@ -186,11 +187,7 @@ class GalilController(Controller):
|
||||
while self.is_axis_moving(None, axis_Id_numeric):
|
||||
time.sleep(0.01)
|
||||
if verbose:
|
||||
self.device_manager.connector.send_client_info(
|
||||
f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f}",
|
||||
scope="drive axis to limit",
|
||||
show_asap=True,
|
||||
)
|
||||
self.get_device_manager().connector.send_client_info(f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f}", scope="drive axis to limit", show_asap=True)
|
||||
time.sleep(0.5)
|
||||
|
||||
# check if we actually hit the limit
|
||||
@@ -204,7 +201,13 @@ class GalilController(Controller):
|
||||
else:
|
||||
print("Limit reached.")
|
||||
|
||||
def find_reference(self, axis_Id_numeric: int, verbose=0, raise_error=1) -> None:
|
||||
def get_device_manager(self):
|
||||
for axis in self._axis:
|
||||
if hasattr(axis, "device_manager") and axis.device_manager:
|
||||
return axis.device_manager
|
||||
raise BECConfigError("Could not access the device_manager")
|
||||
|
||||
def find_reference(self, axis_Id_numeric: int, verbose=0, raise_error = 1) -> None:
|
||||
"""
|
||||
Find the reference of an axis.
|
||||
|
||||
@@ -221,11 +224,7 @@ class GalilController(Controller):
|
||||
while self.is_axis_moving(None, axis_Id_numeric):
|
||||
time.sleep(0.1)
|
||||
if verbose:
|
||||
self.device_manager.connector.send_client_info(
|
||||
f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f} reference is before {self._omny_get_reference_limit(axis_Id)}",
|
||||
scope="find axis reference",
|
||||
show_asap=True,
|
||||
)
|
||||
self.get_device_manager().connector.send_client_info(f"Current microstep position {self._omny_get_microstep_position(axis_Id):.0f} reference is before {self._omny_get_reference_limit(axis_Id)}", scope="find axis reference", show_asap=True)
|
||||
time.sleep(0.5)
|
||||
|
||||
if not self.axis_is_referenced(axis_Id_numeric):
|
||||
@@ -237,6 +236,7 @@ class GalilController(Controller):
|
||||
logger.info(f"Successfully found reference of axis {axis_Id_numeric}.")
|
||||
print(f"Successfully found reference of axis {axis_Id_numeric}.")
|
||||
|
||||
|
||||
def show_running_threads(self) -> None:
|
||||
t = PrettyTable()
|
||||
t.title = f"Threads on {self.sock.host}:{self.sock.port}"
|
||||
@@ -251,7 +251,7 @@ class GalilController(Controller):
|
||||
|
||||
def is_motor_on(self, axis_Id) -> bool:
|
||||
return not bool(float(self.socket_put_and_receive(f"MG _MO{axis_Id}").strip()))
|
||||
|
||||
|
||||
def get_motor_limit_switch(self, axis_Id) -> list:
|
||||
"""
|
||||
Get the status of the motor limit switches.
|
||||
@@ -269,7 +269,14 @@ class GalilController(Controller):
|
||||
def describe(self) -> None:
|
||||
t = PrettyTable()
|
||||
t.title = f"{self.__class__.__name__} on {self.sock.host}:{self.sock.port}"
|
||||
field_names = ["Axis", "Name", "Referenced", "Motor On", "Limits", "Position"]
|
||||
field_names = [
|
||||
"Axis",
|
||||
"Name",
|
||||
"Referenced",
|
||||
"Motor On",
|
||||
"Limits",
|
||||
"Position",
|
||||
]
|
||||
# in case of OMNY
|
||||
if self.sock.host == "mpc3217.psi.ch":
|
||||
field_names.append("Temperature")
|
||||
@@ -279,7 +286,7 @@ class GalilController(Controller):
|
||||
axis = self._axis[ax]
|
||||
if axis is not None:
|
||||
if self.sock.host == "mpc3217.psi.ch":
|
||||
# case of omny. possibly consider moving to ogalil
|
||||
#case of omny. possibly consider moving to ogalil
|
||||
motor_on = self.is_motor_on(axis.axis_Id)
|
||||
if motor_on == True:
|
||||
motor_on = self.WARNING + "ON" + self.ENDC
|
||||
@@ -292,7 +299,7 @@ class GalilController(Controller):
|
||||
else:
|
||||
folerr_status = "False"
|
||||
position = axis.readback.read().get(axis.name).get("value")
|
||||
position = f"{position:.3f}"
|
||||
position = f'{position:.3f}'
|
||||
t.add_row(
|
||||
[
|
||||
f"{axis.axis_Id_numeric}/{axis.axis_Id}",
|
||||
@@ -323,6 +330,8 @@ class GalilController(Controller):
|
||||
self.show_running_threads()
|
||||
self.show_status_other()
|
||||
|
||||
|
||||
|
||||
def show_status_other(self) -> None:
|
||||
"""
|
||||
Show additional device-specific status information.
|
||||
@@ -411,7 +420,7 @@ class GalilSetpointSignal(GalilSignalBase):
|
||||
while self.controller.is_thread_active(0):
|
||||
time.sleep(0.1)
|
||||
|
||||
# in the case of lamni, consider moving to lgalil
|
||||
#in the case of lamni, consider moving to lgalil
|
||||
if self.parent.axis_Id_numeric == 2 and self.controller.sock.host == "mpc2680.psi.ch":
|
||||
try:
|
||||
rt = self.parent.device_manager.devices[self.parent.rt]
|
||||
|
||||
@@ -73,7 +73,6 @@ class LamniGalilController(GalilController):
|
||||
air_off = bool(self.socket_put_and_receive("MG@OUT[13]"))
|
||||
return rt_not_blocked_by_galil and air_off
|
||||
|
||||
|
||||
class LamniGalilReadbackSignal(GalilSignalRO):
|
||||
@retry_once
|
||||
@threadlocked
|
||||
@@ -100,7 +99,6 @@ class LamniGalilReadbackSignal(GalilSignalRO):
|
||||
logger.warning("Failed to set RT value during readback.")
|
||||
return val
|
||||
|
||||
|
||||
class LamniGalilMotor(Device, PositionerBase):
|
||||
USER_ACCESS = ["controller", "drive_axis_to_limit", "find_reference"]
|
||||
readback = Cpt(LamniGalilReadbackSignal, signal_name="readback", kind="hinted")
|
||||
@@ -134,7 +132,7 @@ class LamniGalilMotor(Device, PositionerBase):
|
||||
**kwargs,
|
||||
):
|
||||
self.controller = LamniGalilController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.axis_Id = axis_Id
|
||||
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
|
||||
@@ -170,10 +168,6 @@ class LamniGalilMotor(Device, PositionerBase):
|
||||
self.low_limit_travel.put(limits[0])
|
||||
self.high_limit_travel.put(limits[1])
|
||||
|
||||
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
|
||||
"""Wait for the device to be connected."""
|
||||
self.controller.on(timeout=timeout)
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||
@@ -298,7 +292,7 @@ class LamniGalilMotor(Device, PositionerBase):
|
||||
Find the reference of the axis.
|
||||
"""
|
||||
self.controller.find_reference(self.axis_Id_numeric)
|
||||
# now force position read to cache
|
||||
#now force position read to cache
|
||||
val = self.readback.read()
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
|
||||
|
||||
@@ -307,10 +301,10 @@ class LamniGalilMotor(Device, PositionerBase):
|
||||
Drive an axis to the limit in a specified direction.
|
||||
|
||||
Args:
|
||||
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
|
||||
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
|
||||
"""
|
||||
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction)
|
||||
# now force position read to cache
|
||||
#now force position read to cache
|
||||
val = self.readback.read()
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ class GalilMotorResolution(GalilSignalRO):
|
||||
@threadlocked
|
||||
def _socket_get(self):
|
||||
if self.controller.sock.port == 8083 and self.parent.axis_Id_numeric == 2:
|
||||
# rotation stage
|
||||
# rotation stage
|
||||
return 89565.8666667
|
||||
else:
|
||||
return 51200
|
||||
@@ -69,43 +69,37 @@ class OMNYGalilReadbackSignal(GalilSignalRO):
|
||||
current_pos = float(self.controller.socket_put_and_receive(f"TP{self.parent.axis_Id}"))
|
||||
current_pos *= self.parent.sign
|
||||
step_mm = self.parent.motor_resolution.get()
|
||||
# here we introduce an offset of 25 to the rotation axis
|
||||
# when setting a position this is taken into account in the controller
|
||||
# that way we just do tomography from 0 to 180 degrees
|
||||
#here we introduce an offset of 25 to the rotation axis
|
||||
#when setting a position this is taken into account in the controller
|
||||
#that way we just do tomography from 0 to 180 degrees
|
||||
if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083:
|
||||
return (current_pos / step_mm) + 25
|
||||
return (current_pos / step_mm)+25
|
||||
else:
|
||||
return current_pos / step_mm
|
||||
|
||||
def read(self):
|
||||
self._metadata["timestamp"] = time.time()
|
||||
val = super().read()
|
||||
|
||||
# if reading rotation stage angle
|
||||
|
||||
#if reading rotation stage angle
|
||||
if self.parent.axis_Id_numeric == 2 and self.controller.sock.port == 8083:
|
||||
current_readback_value = val[self.parent.name]["value"]
|
||||
# print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.")
|
||||
#print (f"previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}.")
|
||||
|
||||
if np.fabs((self.previous_rotation_angle - current_readback_value) > 10):
|
||||
if np.fabs((self.previous_rotation_angle-current_readback_value)>10):
|
||||
message = f"Glitch detected in rotation stage. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}."
|
||||
print(message)
|
||||
self.parent.device_manager.connector.send_client_info(
|
||||
message, scope="glitch detector", show_asap=True
|
||||
)
|
||||
|
||||
self.parent.device_manager.connector.send_client_info(message, scope="glitch detector", show_asap=True)
|
||||
|
||||
val = super().read()
|
||||
current_readback_value = val[self.parent.name]["value"]
|
||||
|
||||
if np.fabs((self.previous_rotation_angle - current_readback_value) > 10):
|
||||
if np.fabs((self.previous_rotation_angle-current_readback_value)>10):
|
||||
message = f"Glitch detected in rotation stage second read. Previous rotation angle {self.previous_rotation_angle}, current readback {current_readback_value}. Disabling the controller."
|
||||
print(message)
|
||||
self.parent.device_manager.connector.send_client_info(
|
||||
message, scope="glitch detector", show_asap=True
|
||||
)
|
||||
|
||||
self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed(
|
||||
"allaxref=0"
|
||||
)
|
||||
self.parent.device_manager.connector.send_client_info(message, scope="glitch detector", show_asap=True)
|
||||
|
||||
self.parent.device_manager.devices["osamroy"].obj.controller.socket_put_confirmed("allaxref=0")
|
||||
self.parent.device_manager.devices["osamroy"].obj.enabled = False
|
||||
|
||||
return val
|
||||
@@ -114,12 +108,13 @@ class OMNYGalilReadbackSignal(GalilSignalRO):
|
||||
try:
|
||||
rt = self.parent.device_manager.devices["rtx"]
|
||||
if rt.enabled:
|
||||
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"] - 25 + 54)
|
||||
rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"]-25+54)
|
||||
except KeyError:
|
||||
logger.warning("Failed to set RT value during ogalil readback.")
|
||||
logger.warning("Failed to set RT value during ogalil readback.")
|
||||
return val
|
||||
|
||||
|
||||
|
||||
class OMNYGalilController(GalilController):
|
||||
USER_ACCESS = [
|
||||
"describe",
|
||||
@@ -137,18 +132,18 @@ class OMNYGalilController(GalilController):
|
||||
"_ogalil_folerr_not_ignore",
|
||||
]
|
||||
|
||||
OKBLUE = "\033[94m"
|
||||
OKCYAN = "\033[96m"
|
||||
OKGREEN = "\033[92m"
|
||||
WARNING = "\033[93m"
|
||||
FAIL = "\033[91m"
|
||||
ENDC = "\033[0m"
|
||||
|
||||
def on(self, timeout: int = 10) -> None:
|
||||
OKBLUE = '\033[94m'
|
||||
OKCYAN = '\033[96m'
|
||||
OKGREEN = '\033[92m'
|
||||
WARNING = '\033[93m'
|
||||
FAIL = '\033[91m'
|
||||
ENDC = '\033[0m'
|
||||
|
||||
def on(self) -> None:
|
||||
"""Open a new socket connection to the controller"""
|
||||
self._ogalil_switchsocket_switch_all_on()
|
||||
time.sleep(0.3)
|
||||
super().on(timeout=timeout)
|
||||
super().on()
|
||||
|
||||
def _ogalil_switchsocket(self, number: int, switch: bool):
|
||||
# number is socket number ranging from 1 to 4
|
||||
@@ -190,16 +185,15 @@ class OMNYGalilController(GalilController):
|
||||
self.socket_put_confirmed("IgNoFol=1")
|
||||
self.socket_put_confirmed("XQ#STOP,1")
|
||||
|
||||
def _ogalil_set_axis_to_pos_wo_reference_search(
|
||||
self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign
|
||||
):
|
||||
|
||||
def _ogalil_set_axis_to_pos_wo_reference_search(self, axis_id_numeric, axis_id, pos_mm, motor_resolution, motor_sign):
|
||||
|
||||
self.socket_put_confirmed("IgNoFol=1")
|
||||
|
||||
# pos_mm = pos_encoder / motor_resolution
|
||||
pos_encoder = pos_mm * motor_resolution * motor_sign
|
||||
# print(motor_resolution)
|
||||
|
||||
#print(motor_resolution)
|
||||
|
||||
self.socket_put_confirmed(f"DE{axis_id}={pos_encoder:.0f}")
|
||||
self.socket_put_confirmed(f"DP{axis_id}=_TP{axis_id}*ratio[{axis_id_numeric:.0f}]")
|
||||
|
||||
@@ -209,6 +203,7 @@ class OMNYGalilController(GalilController):
|
||||
|
||||
self._ogalil_folerr_not_ignore()
|
||||
|
||||
|
||||
def _ogalil_folerr_not_ignore(self):
|
||||
self.socket_put_confirmed("IgNoFol=0")
|
||||
|
||||
@@ -245,18 +240,7 @@ class OMNYGalilController(GalilController):
|
||||
|
||||
|
||||
class OMNYGalilMotor(Device, PositionerBase):
|
||||
USER_ACCESS = [
|
||||
"controller",
|
||||
"find_reference",
|
||||
"omny_osamx_to_scan_center",
|
||||
"drive_axis_to_limit",
|
||||
"_ogalil_folerr_reset_and_ignore",
|
||||
"_ogalil_set_axis_to_pos_wo_reference_search",
|
||||
"get_motor_limit_switch",
|
||||
"axis_is_referenced",
|
||||
"get_motor_temperature",
|
||||
"folerr_status",
|
||||
]
|
||||
USER_ACCESS = ["controller", "find_reference", "omny_osamx_to_scan_center", "drive_axis_to_limit", "_ogalil_folerr_reset_and_ignore", "_ogalil_set_axis_to_pos_wo_reference_search", "get_motor_limit_switch", "axis_is_referenced", "get_motor_temperature", "folerr_status"]
|
||||
readback = Cpt(OMNYGalilReadbackSignal, signal_name="readback", kind="hinted")
|
||||
user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
|
||||
motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config")
|
||||
@@ -288,7 +272,7 @@ class OMNYGalilMotor(Device, PositionerBase):
|
||||
**kwargs,
|
||||
):
|
||||
self.controller = OMNYGalilController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.axis_Id = axis_Id
|
||||
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
|
||||
@@ -324,10 +308,6 @@ class OMNYGalilMotor(Device, PositionerBase):
|
||||
self.low_limit_travel.put(limits[0])
|
||||
self.high_limit_travel.put(limits[1])
|
||||
|
||||
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
|
||||
"""Wait for the device to be connected."""
|
||||
self.controller.on(timeout=timeout)
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||
@@ -453,10 +433,8 @@ class OMNYGalilMotor(Device, PositionerBase):
|
||||
|
||||
def _ogalil_set_axis_to_pos_wo_reference_search(self, pos_mm):
|
||||
motor_resolution = self.motor_resolution.get()
|
||||
self.controller._ogalil_set_axis_to_pos_wo_reference_search(
|
||||
self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign
|
||||
)
|
||||
# now force position read to cache
|
||||
self.controller._ogalil_set_axis_to_pos_wo_reference_search(self.axis_Id_numeric, self.axis_Id, pos_mm, motor_resolution, self.sign)
|
||||
#now force position read to cache
|
||||
val = self.readback.read()
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
|
||||
|
||||
@@ -464,9 +442,9 @@ class OMNYGalilMotor(Device, PositionerBase):
|
||||
"""
|
||||
Find the reference of the axis.
|
||||
"""
|
||||
verbose = 1
|
||||
verbose=1
|
||||
self.controller.find_reference(self.axis_Id_numeric, verbose, raise_error)
|
||||
# now force position read to cache
|
||||
#now force position read to cache
|
||||
val = self.readback.read()
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
|
||||
|
||||
@@ -475,10 +453,10 @@ class OMNYGalilMotor(Device, PositionerBase):
|
||||
Drive an axis to the limit in a specified direction.
|
||||
|
||||
Args:
|
||||
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
|
||||
direction (str): Direction in which the axis should be driven to the limit. Either 'forward' or 'reverse'.
|
||||
"""
|
||||
self.controller.drive_axis_to_limit(self.axis_Id_numeric, direction, verbose=1)
|
||||
# now force position read to cache
|
||||
#now force position read to cache
|
||||
val = self.readback.read()
|
||||
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
|
||||
|
||||
@@ -509,31 +487,29 @@ class OMNYGalilMotor(Device, PositionerBase):
|
||||
def omny_osamx_to_scan_center(self, cenx):
|
||||
if self.controller.sock.port == 8082 and self.axis_Id_numeric == 0:
|
||||
# get last setpoint
|
||||
osamx = self.device_manager.devices["osamx"]
|
||||
osamx_current_setpoint = osamx.obj.readback.get()
|
||||
omny_samx_in = self._get_user_param_safe("osamx", "in")
|
||||
if np.fabs(osamx_current_setpoint - (omny_samx_in + cenx / 1000)) > 0.025:
|
||||
message = (
|
||||
f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}."
|
||||
)
|
||||
logger.info(message)
|
||||
osamx = self.device_manager.devices["osamx"]
|
||||
osamx_current_setpoint = osamx.obj.readback.get()
|
||||
omny_samx_in = self._get_user_param_safe("osamx","in")
|
||||
if np.fabs(osamx_current_setpoint-(omny_samx_in+cenx/1000)) > 0.025:
|
||||
message=f"Moving osamx to scan center. new osamx target {omny_samx_in+cenx/1000:.3f}."
|
||||
logger.info(message)
|
||||
|
||||
osamx.read_only = False
|
||||
# osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')")
|
||||
osamx.set(omny_samx_in + cenx / 1000)
|
||||
time.sleep(0.1)
|
||||
while osamx.motor_is_moving.get():
|
||||
time.sleep(0.05)
|
||||
osamx.read_only = True
|
||||
time.sleep(2)
|
||||
rt = self.device_manager.devices["rtx"]
|
||||
if rt.enabled:
|
||||
rt.obj.controller.laser_tracker_on()
|
||||
rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength()
|
||||
osamx.read_only = False
|
||||
#osamx.controller.("osamx", "controller.socket_put_confirmed('axspeed[0]=1000')")
|
||||
osamx.set(omny_samx_in+cenx/1000)
|
||||
time.sleep(0.1)
|
||||
while(osamx.motor_is_moving.get()):
|
||||
time.sleep(0.05)
|
||||
osamx.read_only = True
|
||||
time.sleep(2)
|
||||
rt = self.device_manager.devices["rtx"]
|
||||
if rt.enabled:
|
||||
rt.obj.controller.laser_tracker_on()
|
||||
rt.obj.controller.laser_tracker_check_and_wait_for_signalstrength()
|
||||
|
||||
def folerr_status(self) -> bool:
|
||||
return self.controller.folerr_status(self.axis_Id_numeric)
|
||||
|
||||
|
||||
def stop(self, *, success=False):
|
||||
self.controller.stop_all_axes()
|
||||
return super().stop(success=success)
|
||||
|
||||
@@ -52,12 +52,33 @@ class GalilController(Controller):
|
||||
"fly_grid_scan",
|
||||
"read_encoder_position",
|
||||
]
|
||||
_axes_per_controller = 8
|
||||
|
||||
def on(self, timeout: int = 10) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name="GalilController",
|
||||
kind=None,
|
||||
parent=None,
|
||||
socket=None,
|
||||
attr_name="",
|
||||
labels=None,
|
||||
):
|
||||
if not hasattr(self, "_initialized") or not self._initialized:
|
||||
self._galil_axis_per_controller = 8
|
||||
self._axis = [None for axis_num in range(self._galil_axis_per_controller)]
|
||||
super().__init__(
|
||||
name=name,
|
||||
socket=socket,
|
||||
attr_name=attr_name,
|
||||
parent=parent,
|
||||
labels=labels,
|
||||
kind=kind,
|
||||
)
|
||||
|
||||
def on(self, controller_num=0) -> None:
|
||||
"""Open a new socket connection to the controller"""
|
||||
if not self.connected:
|
||||
self.sock.open(timeout=timeout)
|
||||
self.sock.open()
|
||||
self.connected = True
|
||||
else:
|
||||
logger.info("The connection has already been established.")
|
||||
@@ -144,11 +165,11 @@ class GalilController(Controller):
|
||||
def show_running_threads(self) -> None:
|
||||
t = PrettyTable()
|
||||
t.title = f"Threads on {self.sock.host}:{self.sock.port}"
|
||||
t.field_names = [str(ax) for ax in range(self._axes_per_controller)]
|
||||
t.field_names = [str(ax) for ax in range(self._galil_axis_per_controller)]
|
||||
t.add_row(
|
||||
[
|
||||
"active" if self.is_thread_active(t) else "inactive"
|
||||
for t in range(self._axes_per_controller)
|
||||
for t in range(self._galil_axis_per_controller)
|
||||
]
|
||||
)
|
||||
print(t)
|
||||
@@ -178,7 +199,7 @@ class GalilController(Controller):
|
||||
"Limits",
|
||||
"Position",
|
||||
]
|
||||
for ax in range(self._axes_per_controller):
|
||||
for ax in range(self._galil_axis_per_controller):
|
||||
axis = self._axis[ax]
|
||||
if axis is not None:
|
||||
t.add_row(
|
||||
@@ -495,9 +516,7 @@ class SGalilMotor(Device, PositionerBase):
|
||||
):
|
||||
self.axis_Id = axis_Id
|
||||
self.sign = sign
|
||||
self.controller = GalilController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
)
|
||||
self.controller = GalilController(socket=socket_cls(host=host, port=port))
|
||||
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
|
||||
self.tolerance = kwargs.pop("tolerance", 0.5)
|
||||
self.device_mapping = kwargs.pop("device_mapping", {})
|
||||
@@ -530,10 +549,6 @@ class SGalilMotor(Device, PositionerBase):
|
||||
self.low_limit_travel.put(limits[0])
|
||||
self.high_limit_travel.put(limits[1])
|
||||
|
||||
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
|
||||
"""Wait for the device to be connected."""
|
||||
self.controller.on(timeout=timeout)
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||
|
||||
@@ -37,86 +37,84 @@ class OMNYSampleStorage(Device):
|
||||
_default_sub = SUB_VALUE
|
||||
|
||||
sample_shuttle_A_placed = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {"auto_monitor": True}) for i in range(1, 7)
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}", {}) for i in range(1, 7)
|
||||
}
|
||||
sample_shuttle_A_placed = Dcpt(sample_shuttle_A_placed)
|
||||
|
||||
sample_shuttle_B_placed = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {"auto_monitor": True}) for i in range(1, 7)
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}", {}) for i in range(1, 7)
|
||||
}
|
||||
sample_shuttle_B_placed = Dcpt(sample_shuttle_B_placed)
|
||||
|
||||
sample_shuttle_C_placed = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7)
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
|
||||
}
|
||||
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
|
||||
|
||||
sample_shuttle_C_placed = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {"auto_monitor": True}) for i in range(1, 7)
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}", {}) for i in range(1, 7)
|
||||
}
|
||||
sample_shuttle_C_placed = Dcpt(sample_shuttle_C_placed)
|
||||
|
||||
parking_placed = {
|
||||
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {"auto_monitor": True}) for i in range(1, 7)
|
||||
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}", {}) for i in range(1, 7)
|
||||
}
|
||||
parking_placed = Dcpt(parking_placed)
|
||||
|
||||
sample_placed = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {"auto_monitor": True})
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
|
||||
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
|
||||
}
|
||||
sample_placed = Dcpt(sample_placed)
|
||||
|
||||
sample_shuttle_A_names = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True, "auto_monitor": True})
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_A:{i}.DESC", {"string": True})
|
||||
for i in range(1, 7)
|
||||
}
|
||||
sample_shuttle_A_names = Dcpt(sample_shuttle_A_names)
|
||||
|
||||
sample_shuttle_B_names = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True, "auto_monitor": True})
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_B:{i}.DESC", {"string": True})
|
||||
for i in range(1, 7)
|
||||
}
|
||||
sample_shuttle_B_names = Dcpt(sample_shuttle_B_names)
|
||||
|
||||
sample_shuttle_C_names = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True, "auto_monitor": True})
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_shuttle_C:{i}.DESC", {"string": True})
|
||||
for i in range(1, 7)
|
||||
}
|
||||
sample_shuttle_C_names = Dcpt(sample_shuttle_C_names)
|
||||
|
||||
parking_names = {
|
||||
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True, "auto_monitor": True})
|
||||
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_parking:{i}.DESC", {"string": True})
|
||||
for i in range(1, 7)
|
||||
}
|
||||
parking_names = Dcpt(parking_names)
|
||||
|
||||
sample_names = {
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True, "auto_monitor": True})
|
||||
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}.DESC", {"string": True})
|
||||
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
|
||||
}
|
||||
sample_names = Dcpt(sample_names)
|
||||
|
||||
sample_in_gripper = Cpt(
|
||||
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL", auto_monitor=True
|
||||
EpicsSignal, name="sample_in_gripper", read_pv="XOMNY-SAMPLE_DB_omny:110.VAL"
|
||||
)
|
||||
sample_in_gripper_name = Cpt(
|
||||
EpicsSignal,
|
||||
name="sample_in_gripper_name",
|
||||
read_pv="XOMNY-SAMPLE_DB_omny:110.DESC",
|
||||
string=True,
|
||||
auto_monitor=True
|
||||
)
|
||||
|
||||
sample_in_samplestage = Cpt(
|
||||
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL", auto_monitor=True
|
||||
EpicsSignal, name="sample_in_samplestage", read_pv="XOMNY-SAMPLE_DB_omny:0.VAL"
|
||||
)
|
||||
sample_in_samplestage_name = Cpt(
|
||||
EpicsSignal,
|
||||
name="sample_in_samplestage_name",
|
||||
read_pv="XOMNY-SAMPLE_DB_omny:0.DESC",
|
||||
string=True,
|
||||
auto_monitor=True
|
||||
)
|
||||
|
||||
def __init__(self, prefix="", *, name, **kwargs):
|
||||
|
||||
@@ -57,7 +57,6 @@ class RtFlomniController(Controller):
|
||||
socket_cls=None,
|
||||
socket_host=None,
|
||||
socket_port=None,
|
||||
device_manager=None,
|
||||
attr_name="",
|
||||
parent=None,
|
||||
labels=None,
|
||||
@@ -68,7 +67,6 @@ class RtFlomniController(Controller):
|
||||
socket_cls=socket_cls,
|
||||
socket_host=socket_host,
|
||||
socket_port=socket_port,
|
||||
device_manager=device_manager,
|
||||
attr_name=attr_name,
|
||||
parent=parent,
|
||||
labels=labels,
|
||||
@@ -128,15 +126,15 @@ class RtFlomniController(Controller):
|
||||
while not self.slew_rate_limiters_on_target() or np.abs(self.pid_y()) > 0.1:
|
||||
time.sleep(0.05)
|
||||
|
||||
self.device_manager.devices.rty.update_user_parameter({"tomo_additional_offsety": 0})
|
||||
self.get_device_manager().devices.rty.update_user_parameter({"tomo_additional_offsety": 0})
|
||||
self.clear_trajectory_generator()
|
||||
|
||||
self.laser_tracker_on()
|
||||
|
||||
# move to 0. FUPR will set the rotation angle during readout
|
||||
self.device_manager.devices.fsamroy.obj.move(0, wait=True)
|
||||
self.get_device_manager().devices.fsamroy.obj.move(0, wait=True)
|
||||
|
||||
fsamx = self.device_manager.devices.fsamx
|
||||
fsamx = self.get_device_manager().devices.fsamx
|
||||
|
||||
fsamx.obj.pid_x_correction = 0
|
||||
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]")
|
||||
@@ -166,18 +164,18 @@ class RtFlomniController(Controller):
|
||||
self.show_cyclic_error_compensation()
|
||||
|
||||
self.rt_pid_voltage = self.get_pid_x()
|
||||
rtx = self.device_manager.devices.rtx
|
||||
rtx = self.get_device_manager().devices.rtx
|
||||
rtx.update_user_parameter({"rt_pid_voltage": self.rt_pid_voltage})
|
||||
|
||||
self.set_device_read_write("fsamx", False)
|
||||
self.set_device_read_write("fsamy", False)
|
||||
self.set_device_read_write("foptx", False)
|
||||
self.set_device_read_write("fopty", False)
|
||||
self.set_device_enabled("fsamx", False)
|
||||
self.set_device_enabled("fsamy", False)
|
||||
self.set_device_enabled("foptx", False)
|
||||
self.set_device_enabled("fopty", False)
|
||||
|
||||
def move_samx_to_scan_region(self, fovx: float, cenx: float):
|
||||
time.sleep(0.05)
|
||||
if self.rt_pid_voltage is None:
|
||||
rtx = self.device_manager.devices.rtx
|
||||
rtx = self.get_device_manager().devices.rtx
|
||||
self.rt_pid_voltage = rtx.user_parameter.get("rt_pid_voltage")
|
||||
if self.rt_pid_voltage is None:
|
||||
raise RtError(
|
||||
@@ -194,7 +192,7 @@ class RtFlomniController(Controller):
|
||||
break
|
||||
wait_on_exit = True
|
||||
self.socket_put("v0")
|
||||
fsamx = self.device_manager.devices.fsamx
|
||||
fsamx = self.get_device_manager().devices.fsamx
|
||||
fsamx.read_only = False
|
||||
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]")
|
||||
fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.007
|
||||
@@ -225,22 +223,22 @@ class RtFlomniController(Controller):
|
||||
print("Feedback is not running; likely an error in the interferometer.")
|
||||
raise RtError("Feedback is not running; likely an error in the interferometer.")
|
||||
|
||||
self.set_device_read_write("fsamx", False)
|
||||
self.set_device_read_write("fsamy", False)
|
||||
self.set_device_read_write("foptx", False)
|
||||
self.set_device_read_write("fopty", False)
|
||||
self.set_device_enabled("fsamx", False)
|
||||
self.set_device_enabled("fsamy", False)
|
||||
self.set_device_enabled("foptx", False)
|
||||
self.set_device_enabled("fopty", False)
|
||||
|
||||
def feedback_disable(self):
|
||||
self.clear_trajectory_generator()
|
||||
self.move_to_zero()
|
||||
self.socket_put("l0")
|
||||
|
||||
self.set_device_read_write("fsamx", True)
|
||||
self.set_device_read_write("fsamy", True)
|
||||
self.set_device_read_write("foptx", True)
|
||||
self.set_device_read_write("fopty", True)
|
||||
self.set_device_enabled("fsamx", True)
|
||||
self.set_device_enabled("fsamy", True)
|
||||
self.set_device_enabled("foptx", True)
|
||||
self.set_device_enabled("fopty", True)
|
||||
|
||||
fsamx = self.device_manager.devices.fsamx
|
||||
fsamx = self.get_device_manager().devices.fsamx
|
||||
fsamx.obj.controller.socket_put_confirmed("axspeed[4]=025*stppermm[4]")
|
||||
print("rt feedback is now disalbed.")
|
||||
|
||||
@@ -291,8 +289,12 @@ class RtFlomniController(Controller):
|
||||
self.socket_put("T1")
|
||||
time.sleep(0.5)
|
||||
|
||||
self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackyct=0")
|
||||
self.device_manager.devices.ftrackz.obj.controller.socket_put_confirmed("trackzct=0")
|
||||
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed(
|
||||
"trackyct=0"
|
||||
)
|
||||
self.get_device_manager().devices.ftrackz.obj.controller.socket_put_confirmed(
|
||||
"trackzct=0"
|
||||
)
|
||||
|
||||
self.laser_tracker_wait_on_target()
|
||||
logger.info("Laser tracker running!")
|
||||
@@ -339,7 +341,7 @@ class RtFlomniController(Controller):
|
||||
}
|
||||
|
||||
def laser_tracker_galil_enable(self):
|
||||
ftrackz_con = self.device_manager.devices.ftrackz.obj.controller
|
||||
ftrackz_con = self.get_device_manager().devices.ftrackz.obj.controller
|
||||
ftrackz_con.socket_put_confirmed("tracken=1")
|
||||
ftrackz_con.socket_put_confirmed("trackyct=0")
|
||||
ftrackz_con.socket_put_confirmed("trackzct=0")
|
||||
@@ -387,12 +389,9 @@ class RtFlomniController(Controller):
|
||||
self.laser_tracker_wait_on_target()
|
||||
|
||||
signal = self.read_ssi_interferometer(1)
|
||||
rtx = self.device_manager.devices.rtx
|
||||
rtx = self.get_device_manager().devices.rtx
|
||||
min_signal = rtx.user_parameter.get("min_signal")
|
||||
low_signal = rtx.user_parameter.get("low_signal")
|
||||
print(f"low signal: {low_signal}")
|
||||
print(f"min signal: {min_signal}")
|
||||
print(f"signal: {signal}")
|
||||
if signal < min_signal:
|
||||
time.sleep(1)
|
||||
if signal < min_signal:
|
||||
@@ -476,6 +475,12 @@ class RtFlomniController(Controller):
|
||||
current_position_in_scan = int(float(return_table[2]))
|
||||
return (mode, number_of_positions_planned, current_position_in_scan)
|
||||
|
||||
def get_device_manager(self):
|
||||
for axis in self._axis:
|
||||
if hasattr(axis, "device_manager") and axis.device_manager:
|
||||
return axis.device_manager
|
||||
raise BECConfigError("Could not access the device_manager")
|
||||
|
||||
def read_positions_from_sampler(self):
|
||||
# this was for reading after the scan completed
|
||||
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
|
||||
@@ -490,7 +495,7 @@ class RtFlomniController(Controller):
|
||||
|
||||
# if not (mode==2 or mode==3):
|
||||
# error
|
||||
self.device_manager.connector.set(
|
||||
self.get_device_manager().connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=1, metadata=self.readout_metadata
|
||||
@@ -525,7 +530,7 @@ class RtFlomniController(Controller):
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
self.device_manager.connector.set(
|
||||
self.get_device_manager().connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=0, metadata=self.readout_metadata
|
||||
@@ -539,7 +544,7 @@ class RtFlomniController(Controller):
|
||||
)
|
||||
|
||||
def publish_device_data(self, signals, point_id):
|
||||
self.device_manager.connector.set_and_publish(
|
||||
self.get_device_manager().connector.set_and_publish(
|
||||
MessageEndpoints.device_read("rt_flomni"),
|
||||
messages.DeviceMessage(
|
||||
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
|
||||
@@ -650,7 +655,7 @@ class RtFlomniMotor(Device, PositionerBase):
|
||||
self.axis_Id = axis_Id
|
||||
self.sign = sign
|
||||
self.controller = RtFlomniController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
|
||||
self.device_manager = device_manager
|
||||
@@ -678,10 +683,6 @@ class RtFlomniMotor(Device, PositionerBase):
|
||||
self.low_limit_travel.put(limits[0])
|
||||
self.high_limit_travel.put(limits[1])
|
||||
|
||||
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
|
||||
"""Wait for the device to be connected."""
|
||||
self.controller.on(timeout=timeout)
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||
@@ -809,7 +810,7 @@ class RtFlomniMotor(Device, PositionerBase):
|
||||
|
||||
if __name__ == "__main__":
|
||||
rtcontroller = RtFlomniController(
|
||||
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None
|
||||
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222
|
||||
)
|
||||
rtcontroller.on()
|
||||
rtcontroller.laser_tracker_on()
|
||||
|
||||
@@ -71,7 +71,6 @@ class RtLamniController(Controller):
|
||||
socket_cls=None,
|
||||
socket_host=None,
|
||||
socket_port=None,
|
||||
device_manager=None,
|
||||
attr_name="",
|
||||
parent=None,
|
||||
labels=None,
|
||||
@@ -82,7 +81,6 @@ class RtLamniController(Controller):
|
||||
socket_cls=socket_cls,
|
||||
socket_host=socket_host,
|
||||
socket_port=socket_port,
|
||||
device_manager=device_manager,
|
||||
attr_name=attr_name,
|
||||
parent=parent,
|
||||
labels=labels,
|
||||
@@ -94,11 +92,11 @@ class RtLamniController(Controller):
|
||||
def feedback_disable(self):
|
||||
self.socket_put("J0")
|
||||
logger.info("LamNI Feedback disabled.")
|
||||
self.set_device_read_write("lsamx", True)
|
||||
self.set_device_read_write("lsamy", True)
|
||||
self.set_device_read_write("loptx", True)
|
||||
self.set_device_read_write("lopty", True)
|
||||
self.set_device_read_write("loptz", True)
|
||||
self.set_device_enabled("lsamx", True)
|
||||
self.set_device_enabled("lsamy", True)
|
||||
self.set_device_enabled("loptx", True)
|
||||
self.set_device_enabled("lopty", True)
|
||||
self.set_device_enabled("loptz", True)
|
||||
|
||||
def is_axis_moving(self, axis_Id) -> bool:
|
||||
# this checks that axis is on target
|
||||
@@ -152,25 +150,25 @@ class RtLamniController(Controller):
|
||||
# set these as closed loop target position
|
||||
self.socket_put(f"pa0,{x_curr:.4f}")
|
||||
self.socket_put(f"pa1,{y_curr:.4f}")
|
||||
self.device_manager.devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr)
|
||||
self.device_manager.devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr)
|
||||
self.get_device_manager().devices.rtx.obj.user_setpoint.set_with_feedback_disabled(x_curr)
|
||||
self.get_device_manager().devices.rty.obj.user_setpoint.set_with_feedback_disabled(y_curr)
|
||||
self.socket_put("J5")
|
||||
logger.info("LamNI Feedback enabled (without reset).")
|
||||
self.set_device_read_write("lsamx", False)
|
||||
self.set_device_read_write("lsamy", False)
|
||||
self.set_device_read_write("loptx", False)
|
||||
self.set_device_read_write("lopty", False)
|
||||
self.set_device_read_write("loptz", False)
|
||||
self.set_device_enabled("lsamx", False)
|
||||
self.set_device_enabled("lsamy", False)
|
||||
self.set_device_enabled("loptx", False)
|
||||
self.set_device_enabled("lopty", False)
|
||||
self.set_device_enabled("loptz", False)
|
||||
|
||||
@threadlocked
|
||||
def feedback_disable_and_even_reset_lamni_angle_interferometer(self):
|
||||
self.socket_put("J6")
|
||||
logger.info("LamNI Feedback disabled including the angular interferometer.")
|
||||
self.set_device_read_write("lsamx", True)
|
||||
self.set_device_read_write("lsamy", True)
|
||||
self.set_device_read_write("loptx", True)
|
||||
self.set_device_read_write("lopty", True)
|
||||
self.set_device_read_write("loptz", True)
|
||||
self.set_device_enabled("lsamx", True)
|
||||
self.set_device_enabled("lsamy", True)
|
||||
self.set_device_enabled("loptx", True)
|
||||
self.set_device_enabled("lopty", True)
|
||||
self.set_device_enabled("loptz", True)
|
||||
|
||||
@threadlocked
|
||||
def clear_trajectory_generator(self):
|
||||
@@ -286,7 +284,7 @@ class RtLamniController(Controller):
|
||||
|
||||
# if not (mode==2 or mode==3):
|
||||
# error
|
||||
self.device_manager.connector.set(
|
||||
self.get_device_manager().connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=1, metadata=self.readout_metadata
|
||||
@@ -321,7 +319,7 @@ class RtLamniController(Controller):
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
self.device_manager.connector.set(
|
||||
self.get_device_manager().connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=0, metadata=self.readout_metadata
|
||||
@@ -333,7 +331,7 @@ class RtLamniController(Controller):
|
||||
)
|
||||
|
||||
def publish_device_data(self, signals, point_id):
|
||||
self.device_manager.connector.set_and_publish(
|
||||
self.get_device_manager().connector.set_and_publish(
|
||||
MessageEndpoints.device_read("rt_lamni"),
|
||||
messages.DeviceMessage(
|
||||
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
|
||||
@@ -368,10 +366,10 @@ class RtLamniController(Controller):
|
||||
) # we set all three outputs of the traj. gen. although in LamNI case only 0,1 are used
|
||||
self.clear_trajectory_generator()
|
||||
|
||||
self.device_manager.devices.lsamrot.obj.move(0, wait=True)
|
||||
self.get_device_manager().devices.lsamrot.obj.move(0, wait=True)
|
||||
|
||||
galil_controller_rt_status = (
|
||||
self.device_manager.devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled()
|
||||
self.get_device_manager().devices.lsamx.obj.controller.lgalil_is_air_off_and_orchestra_enabled()
|
||||
)
|
||||
|
||||
if galil_controller_rt_status == 0:
|
||||
@@ -384,16 +382,16 @@ class RtLamniController(Controller):
|
||||
|
||||
time.sleep(0.03)
|
||||
|
||||
lsamx_user_params = self.device_manager.devices.lsamx.user_parameter
|
||||
lsamx_user_params = self.get_device_manager().devices.lsamx.user_parameter
|
||||
if lsamx_user_params is None or lsamx_user_params.get("center") is None:
|
||||
raise RuntimeError("lsamx center is not defined")
|
||||
lsamy_user_params = self.device_manager.devices.lsamy.user_parameter
|
||||
lsamy_user_params = self.get_device_manager().devices.lsamy.user_parameter
|
||||
if lsamy_user_params is None or lsamy_user_params.get("center") is None:
|
||||
raise RuntimeError("lsamy center is not defined")
|
||||
lsamx_center = lsamx_user_params.get("center")
|
||||
lsamy_center = lsamy_user_params.get("center")
|
||||
self.device_manager.devices.lsamx.obj.move(lsamx_center, wait=True)
|
||||
self.device_manager.devices.lsamy.obj.move(lsamy_center, wait=True)
|
||||
self.get_device_manager().devices.lsamx.obj.move(lsamx_center, wait=True)
|
||||
self.get_device_manager().devices.lsamy.obj.move(lsamy_center, wait=True)
|
||||
self.socket_put("J1")
|
||||
|
||||
_waitforfeedbackctr = 0
|
||||
@@ -407,11 +405,11 @@ class RtLamniController(Controller):
|
||||
(self.socket_put_and_receive("J2")).split(",")[0]
|
||||
)
|
||||
|
||||
self.set_device_read_write("lsamx", False)
|
||||
self.set_device_read_write("lsamy", False)
|
||||
self.set_device_read_write("loptx", False)
|
||||
self.set_device_read_write("lopty", False)
|
||||
self.set_device_read_write("loptz", False)
|
||||
self.set_device_enabled("lsamx", False)
|
||||
self.set_device_enabled("lsamy", False)
|
||||
self.set_device_enabled("loptx", False)
|
||||
self.set_device_enabled("lopty", False)
|
||||
self.set_device_enabled("loptz", False)
|
||||
|
||||
if interferometer_feedback_not_running == 1:
|
||||
logger.error(
|
||||
@@ -561,7 +559,7 @@ class RtLamniMotor(Device, PositionerBase):
|
||||
self.axis_Id = axis_Id
|
||||
self.sign = sign
|
||||
self.controller = RtLamniController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
|
||||
self.device_manager = device_manager
|
||||
@@ -588,10 +586,6 @@ class RtLamniMotor(Device, PositionerBase):
|
||||
self.low_limit_travel.put(limits[0])
|
||||
self.high_limit_travel.put(limits[1])
|
||||
|
||||
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
|
||||
"""Wait for the device to be connected."""
|
||||
self.controller.on(timeout=timeout)
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import builtins
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
import builtins
|
||||
import socket
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger, messages
|
||||
@@ -34,15 +34,12 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import (
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class RtOMNY_mirror_switchbox_Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RtOMNY_Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RtOMNYController(Controller):
|
||||
_axes_per_controller = 3
|
||||
red = "\x1b[91m"
|
||||
@@ -90,7 +87,6 @@ class RtOMNYController(Controller):
|
||||
socket_cls=None,
|
||||
socket_host=None,
|
||||
socket_port=None,
|
||||
device_manager=None,
|
||||
attr_name="",
|
||||
parent=None,
|
||||
labels=None,
|
||||
@@ -101,7 +97,6 @@ class RtOMNYController(Controller):
|
||||
socket_cls=socket_cls,
|
||||
socket_host=socket_host,
|
||||
socket_port=socket_port,
|
||||
device_manager=device_manager,
|
||||
attr_name=attr_name,
|
||||
parent=parent,
|
||||
labels=labels,
|
||||
@@ -239,7 +234,7 @@ class RtOMNYController(Controller):
|
||||
"opt_amplitude1_neg": 3000,
|
||||
"opt_amplitude2_pos": 3000,
|
||||
"opt_amplitude2_neg": 3000,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
# def is_axis_moving(self, axis_Id) -> bool:
|
||||
@@ -266,60 +261,42 @@ class RtOMNYController(Controller):
|
||||
|
||||
threading.Thread(target=send_positions, args=(self, positions), daemon=True).start()
|
||||
|
||||
def get_mirror_parameters(self, channel):
|
||||
def get_mirror_parameters(self,channel):
|
||||
return self.mirror_parameters[channel]
|
||||
|
||||
|
||||
def laser_tracker_check_and_wait_for_signalstrength(self):
|
||||
self.device_manager.connector.send_client_info(
|
||||
"Checking laser tracker...", scope="", show_asap=True
|
||||
)
|
||||
self.get_device_manager().connector.send_client_info("Checking laser tracker...", scope="", show_asap=True)
|
||||
if not self.laser_tracker_check_enabled():
|
||||
print(
|
||||
"laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled."
|
||||
)
|
||||
print("laser_tracker_check_and_wait_for_signalstrength: The laser tracker is not even enabled.")
|
||||
return
|
||||
# first check on target
|
||||
#first check on target
|
||||
self.laser_tracker_wait_on_target()
|
||||
# when on target, check interferometer signal
|
||||
signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1)
|
||||
rtx = self.device_manager.devices.rtx
|
||||
#when on target, check interferometer signal
|
||||
signal = self._omny_interferometer_get_signalsample("ssi_4",0.1)
|
||||
rtx = self.get_device_manager().devices.rtx
|
||||
min_signal = rtx.user_parameter.get("min_signal")
|
||||
low_signal = rtx.user_parameter.get("low_signal")
|
||||
wait_counter = 0
|
||||
while signal < min_signal and wait_counter < 10:
|
||||
self.device_manager.connector.send_client_info(
|
||||
f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...",
|
||||
scope="laser_tracker_check_and_wait_for_signalstrength",
|
||||
show_asap=True,
|
||||
)
|
||||
while signal < min_signal and wait_counter<10:
|
||||
self.get_device_manager().connector.send_client_info(f"The signal of the tracker {signal} is below the minimum required signal of {min_signal}. Waiting...", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True)
|
||||
|
||||
wait_counter += 1
|
||||
wait_counter+=1
|
||||
time.sleep(0.2)
|
||||
signal = self._omny_interferometer_get_signalsample("ssi_4", 0.1)
|
||||
signal = self._omny_interferometer_get_signalsample("ssi_4",0.1)
|
||||
|
||||
if signal < low_signal:
|
||||
self.device_manager.connector.send_client_info(
|
||||
f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m",
|
||||
scope="laser_tracker_check_and_wait_for_signalstrength",
|
||||
show_asap=True,
|
||||
)
|
||||
self.get_device_manager().connector.send_client_info(f"\x1b[91mThe signal of the tracker {signal} is below the low limit of {low_signal}. Auto readjustment...\x1b[0m", scope="laser_tracker_check_and_wait_for_signalstrength", show_asap=True)
|
||||
|
||||
self.omny_interferometer_align_tracking()
|
||||
self.device_manager.connector.send_client_info(
|
||||
"Checking laser tracker completed.", scope="", show_asap=True
|
||||
)
|
||||
self.omny_interferometer_align_tracking()
|
||||
self.get_device_manager().connector.send_client_info("Checking laser tracker completed.", scope="", show_asap=True)
|
||||
|
||||
def omny_interferometer_align_tracking(self):
|
||||
mirror_channel = 6
|
||||
signal = self._omny_interferometer_get_signalsample(
|
||||
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
|
||||
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
|
||||
)
|
||||
mirror_channel=6
|
||||
signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
|
||||
|
||||
if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
|
||||
print(
|
||||
f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed."
|
||||
)
|
||||
print(f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed.")
|
||||
else:
|
||||
self._omny_interferometer_switch_channel(mirror_channel)
|
||||
time.sleep(0.1)
|
||||
@@ -330,19 +307,16 @@ class RtOMNYController(Controller):
|
||||
self._omny_interferometer_switch_alloff()
|
||||
self.show_signal_strength_interferometer()
|
||||
|
||||
mirror_channel = -1
|
||||
mirror_channel=-1
|
||||
|
||||
|
||||
|
||||
def omny_interferometer_align_incoupling_angle(self):
|
||||
mirror_channel = 1
|
||||
signal = self._omny_interferometer_get_signalsample(
|
||||
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
|
||||
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
|
||||
)
|
||||
mirror_channel=1
|
||||
signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
|
||||
|
||||
if signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
|
||||
print(
|
||||
f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed."
|
||||
)
|
||||
print(f"Interferometer signal of axis {self.mirror_parameters[mirror_channel]['opt_mirrorname']} is good, no alignment needed.")
|
||||
else:
|
||||
self._omny_interferometer_switch_channel(mirror_channel)
|
||||
time.sleep(0.1)
|
||||
@@ -353,18 +327,19 @@ class RtOMNYController(Controller):
|
||||
self._omny_interferometer_switch_alloff()
|
||||
self.show_signal_strength_interferometer()
|
||||
|
||||
mirror_channel = -1
|
||||
mirror_channel=-1
|
||||
|
||||
|
||||
def _omny_interferometer_openloop_steps(self, channel, steps, amplitude):
|
||||
if channel not in range(3, 5):
|
||||
if channel not in range(3,5):
|
||||
raise RtOMNY_Error(f"invalid channel number {channel}.")
|
||||
|
||||
if amplitude > 4090:
|
||||
amplitude = 4090
|
||||
elif amplitude < 10:
|
||||
amplitude = 10
|
||||
|
||||
oshield = self.device_manager.devices.oshield
|
||||
|
||||
oshield = self.get_device_manager().devices.oshield
|
||||
|
||||
oshield.obj.controller.move_open_loop_steps(
|
||||
channel, steps, amplitude=amplitude, frequency=500
|
||||
@@ -376,7 +351,7 @@ class RtOMNYController(Controller):
|
||||
def _omny_interferometer_optimize(self, mirror_channel, channel):
|
||||
if mirror_channel == -1:
|
||||
raise RtOMNY_Error("no mirror channel selected")
|
||||
# mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror
|
||||
#mirror channel is mirror number and channel is smaract channel, i.e. axis of the mirror
|
||||
if channel == 3:
|
||||
steps_pos = self.mirror_parameters[mirror_channel]["opt_steps1_pos"]
|
||||
steps_neg = self.mirror_parameters[mirror_channel]["opt_steps1_neg"]
|
||||
@@ -390,80 +365,67 @@ class RtOMNYController(Controller):
|
||||
else:
|
||||
raise RtOMNY_Error(f"invalid channel number {channel}.")
|
||||
|
||||
previous_signal = self._omny_interferometer_get_signalsample(
|
||||
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
|
||||
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
|
||||
)
|
||||
previous_signal = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
|
||||
|
||||
min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]
|
||||
min_begin = self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]
|
||||
if previous_signal < min_begin:
|
||||
# raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
|
||||
#raise OMNY_rt_clientError("error1") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
|
||||
print(f"\rMinimum signal for auto alignment {min_begin} not reached.")
|
||||
return
|
||||
elif previous_signal > self.mirror_parameters[mirror_channel]["opt_signal_stop"]:
|
||||
print(
|
||||
f"\rInterferometer signal of axis is good"
|
||||
) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.")
|
||||
return
|
||||
print(f"\rInterferometer signal of axis is good") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} is good.")
|
||||
return
|
||||
else:
|
||||
direction = 1
|
||||
cycle_counter = 0
|
||||
cycle_max = 20
|
||||
reversal_counter = 0
|
||||
reversal_max = 4
|
||||
self.mirror_amplitutde_increase = 0
|
||||
cycle_counter=0
|
||||
cycle_max=20
|
||||
reversal_counter=0
|
||||
reversal_max=4
|
||||
self.mirror_amplitutde_increase=0
|
||||
|
||||
current_sample = self._omny_interferometer_get_signalsample(
|
||||
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
|
||||
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
|
||||
)
|
||||
max = current_sample
|
||||
current_sample = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
|
||||
max=current_sample
|
||||
|
||||
while (
|
||||
current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"]
|
||||
and cycle_counter < cycle_max
|
||||
and reversal_counter < reversal_max
|
||||
):
|
||||
while current_sample < self.mirror_parameters[mirror_channel]["opt_signal_stop"] and cycle_counter<cycle_max and reversal_counter < reversal_max:
|
||||
# if current_sample < self.mirror_parameters[mirror_channel]["opt_signal_min_begin"]:
|
||||
# raise OMNY_rt_clientError("error2") #(f"Minimum signal of axis {self.mirror_parameters[mirror_channel]["opt_mirrorname"]} to start alignment not present.")
|
||||
|
||||
if direction > 0:
|
||||
if direction>0:
|
||||
self._omny_interferometer_openloop_steps(channel, steps_pos, opt_amplitude_pos)
|
||||
verbose_str = f"channel {channel}, steps {steps_pos}"
|
||||
else:
|
||||
self._omny_interferometer_openloop_steps(channel, -steps_neg, opt_amplitude_neg)
|
||||
verbose_str = f"auto action {channel}, steps {-steps_pos}"
|
||||
# print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
|
||||
#print(f"Aligning axis ") #{self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
|
||||
|
||||
current_sample = self._omny_interferometer_get_signalsample(
|
||||
self.mirror_parameters[mirror_channel]["opt_signalchannel"],
|
||||
self.mirror_parameters[mirror_channel]["opt_averaging_time"],
|
||||
)
|
||||
current_sample = self._omny_interferometer_get_signalsample(self.mirror_parameters[mirror_channel]["opt_signalchannel"], self.mirror_parameters[mirror_channel]["opt_averaging_time"])
|
||||
|
||||
opt_mirrorname = self.mirror_parameters[mirror_channel]["opt_mirrorname"]
|
||||
|
||||
info_str = f"\rAuto aligning Channel {mirror_channel}, {opt_mirrorname}, Current signal: {current_sample:.0f}"
|
||||
message = info_str + " (q)uit \r"
|
||||
self.device_manager.connector.send_client_info(
|
||||
message, scope="_omny_interferometer_optimize", show_asap=True
|
||||
)
|
||||
message=info_str +" (q)uit \r"
|
||||
self.get_device_manager().connector.send_client_info(message, scope="_omny_interferometer_optimize", show_asap=True)
|
||||
|
||||
|
||||
|
||||
if previous_signal>current_sample:
|
||||
if direction<0:
|
||||
steps_pos=int(steps_pos/2)
|
||||
steps_neg=int(steps_neg/2)
|
||||
if steps_pos<1:
|
||||
steps_pos=1
|
||||
if steps_neg<1:
|
||||
steps_neg=1
|
||||
direction=direction*(-1)
|
||||
reversal_counter+=1
|
||||
previous_signal=current_sample
|
||||
cycle_counter+=1
|
||||
|
||||
print(f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r") # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
|
||||
|
||||
|
||||
|
||||
if previous_signal > current_sample:
|
||||
if direction < 0:
|
||||
steps_pos = int(steps_pos / 2)
|
||||
steps_neg = int(steps_neg / 2)
|
||||
if steps_pos < 1:
|
||||
steps_pos = 1
|
||||
if steps_neg < 1:
|
||||
steps_neg = 1
|
||||
direction = direction * (-1)
|
||||
reversal_counter += 1
|
||||
previous_signal = current_sample
|
||||
cycle_counter += 1
|
||||
|
||||
print(
|
||||
f"\r\nFinished aligning channel {channel} of mirror {mirror_channel}\n\r"
|
||||
) # {self.mirror_parameters[mirror_channel]["opt_mirrorname"]}. Target: {self.mirror_parameters[mirror_channel]["opt_signal_stop"]}, current {current_sample}")
|
||||
|
||||
def move_to_zero(self):
|
||||
self.socket_put("pa0,0")
|
||||
@@ -495,7 +457,7 @@ class RtOMNYController(Controller):
|
||||
if ret == 1:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def feedback_is_running(self) -> bool:
|
||||
self.feedback_get_status_and_ssi()
|
||||
interferometer_feedback_not_running = int(self.ssi["feedback_error"])
|
||||
@@ -504,9 +466,7 @@ class RtOMNYController(Controller):
|
||||
return True
|
||||
|
||||
def feedback_enable_with_reset(self):
|
||||
self.device_manager.connector.send_client_info(
|
||||
f"Enabling the feedback...", scope="", show_asap=True
|
||||
)
|
||||
self.get_device_manager().connector.send_client_info(f"Enabling the feedback...", scope="", show_asap=True)
|
||||
|
||||
self.socket_put("J0") # disable feedback
|
||||
time.sleep(0.01)
|
||||
@@ -525,16 +485,14 @@ class RtOMNYController(Controller):
|
||||
self.laser_tracker_on()
|
||||
time.sleep(0.01)
|
||||
|
||||
osamroy = self.device_manager.devices.osamroy
|
||||
osamroy = self.get_device_manager().devices.osamroy
|
||||
# the following read will also upate the angle in rt during readout
|
||||
readback = osamroy.obj.readback.get()
|
||||
if np.fabs(readback) > 0.1:
|
||||
self.device_manager.connector.send_client_info(
|
||||
f"Rotating to zero", scope="feedback enable", show_asap=True
|
||||
)
|
||||
if (np.fabs(readback) > 0.1):
|
||||
self.get_device_manager().connector.send_client_info(f"Rotating to zero", scope="feedback enable", show_asap=True)
|
||||
osamroy.obj.move(0, wait=True)
|
||||
|
||||
osamx = self.device_manager.devices.osamx
|
||||
osamx = self.get_device_manager().devices.osamx
|
||||
|
||||
osamx_in = osamx.user_parameter.get("in")
|
||||
if not np.isclose(osamx.obj.readback.get(), osamx_in, atol=0.01):
|
||||
@@ -556,15 +514,16 @@ class RtOMNYController(Controller):
|
||||
|
||||
time.sleep(1.5)
|
||||
|
||||
self.set_device_read_write("osamx", False)
|
||||
self.set_device_read_write("osamy", False)
|
||||
self.set_device_read_write("ofzpx", False)
|
||||
self.set_device_read_write("ofzpy", False)
|
||||
self.set_device_read_write("oosax", False)
|
||||
self.set_device_read_write("oosax", False)
|
||||
self.set_device_enabled("osamx", False)
|
||||
self.set_device_enabled("osamy", False)
|
||||
self.set_device_enabled("ofzpx", False)
|
||||
self.set_device_enabled("ofzpy", False)
|
||||
self.set_device_enabled("oosax", False)
|
||||
self.set_device_enabled("oosax", False)
|
||||
|
||||
print("Feedback is running.")
|
||||
|
||||
|
||||
@threadlocked
|
||||
def clear_trajectory_generator(self):
|
||||
self.socket_put("sc")
|
||||
@@ -575,15 +534,16 @@ class RtOMNYController(Controller):
|
||||
self.move_to_zero()
|
||||
self.socket_put("J0")
|
||||
|
||||
self.set_device_read_write("osamx", True)
|
||||
self.set_device_read_write("osamy", True)
|
||||
self.set_device_read_write("ofzpx", True)
|
||||
self.set_device_read_write("ofzpy", True)
|
||||
self.set_device_read_write("oosax", True)
|
||||
self.set_device_read_write("oosax", True)
|
||||
self.set_device_enabled("osamx", True)
|
||||
self.set_device_enabled("osamy", True)
|
||||
self.set_device_enabled("ofzpx", True)
|
||||
self.set_device_enabled("ofzpy", True)
|
||||
self.set_device_enabled("oosax", True)
|
||||
self.set_device_enabled("oosax", True)
|
||||
|
||||
print("rt feedback is now disabled.")
|
||||
|
||||
|
||||
def set_rotation_angle(self, val: float) -> None:
|
||||
self.socket_put(f"a{val/180*np.pi}")
|
||||
|
||||
@@ -618,13 +578,12 @@ class RtOMNYController(Controller):
|
||||
"enabled_z": bool(tracker_values[10]),
|
||||
}
|
||||
|
||||
|
||||
def laser_tracker_on(self):
|
||||
# update variables and enable if not yet active
|
||||
#update variables and enable if not yet active
|
||||
if not self.laser_tracker_check_enabled():
|
||||
logger.info("Enabling the laser tracker. Please wait...")
|
||||
self.device_manager.connector.send_client_info(
|
||||
f"Enabling the laser tracker. Please wait...", scope="", show_asap=True
|
||||
)
|
||||
self.get_device_manager().connector.send_client_info(f"Enabling the laser tracker. Please wait...", scope="", show_asap=True)
|
||||
|
||||
tracker_intensity = self.tracker_info["tracker_intensity"]
|
||||
if (
|
||||
@@ -639,13 +598,18 @@ class RtOMNYController(Controller):
|
||||
self.socket_put("T1")
|
||||
time.sleep(0.5)
|
||||
|
||||
self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackyct=0")
|
||||
self.device_manager.devices.otracky.obj.controller.socket_put_confirmed("trackzct=0")
|
||||
self.get_device_manager().devices.otracky.obj.controller.socket_put_confirmed(
|
||||
"trackyct=0"
|
||||
)
|
||||
self.get_device_manager().devices.otracky.obj.controller.socket_put_confirmed(
|
||||
"trackzct=0"
|
||||
)
|
||||
|
||||
self.laser_tracker_wait_on_target()
|
||||
logger.info("Laser tracker running!")
|
||||
print("Laser tracker running!")
|
||||
|
||||
|
||||
def laser_tracker_check_enabled(self) -> bool:
|
||||
self.laser_update_tracker_info()
|
||||
if self.tracker_info["enabled_z"] and self.tracker_info["enabled_y"]:
|
||||
@@ -664,10 +628,11 @@ class RtOMNYController(Controller):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def laser_tracker_wait_on_target(self):
|
||||
max_repeat = 15
|
||||
count = 0
|
||||
while not self.laser_tracker_check_on_target() and count < max_repeat:
|
||||
while not self.laser_tracker_check_on_target() and count<max_repeat:
|
||||
logger.info("Waiting for laser tracker to reach target position.")
|
||||
time.sleep(0.5)
|
||||
count += 1
|
||||
@@ -676,74 +641,75 @@ class RtOMNYController(Controller):
|
||||
raise RtError("Failed to reach laser target position.")
|
||||
|
||||
def laser_tracker_print_intensity_for_otrack_tweaking(self):
|
||||
# self.laser_update_tracker_info()
|
||||
# _laser_tracker_intensity = self.tracker_info["tracker_intensity"]
|
||||
# print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r")
|
||||
#self.laser_update_tracker_info()
|
||||
#_laser_tracker_intensity = self.tracker_info["tracker_intensity"]
|
||||
#print(f"\r PSD beam intensity: {_laser_tracker_intensity:.2f}\r")
|
||||
self.laser_tracker_show_all(extra_endline="\r")
|
||||
|
||||
def laser_tracker_show_all(self, extra_endline=""):
|
||||
def laser_tracker_show_all(self,extra_endline=""):
|
||||
self.laser_update_tracker_info()
|
||||
enabled_y = self.tracker_info["enabled_y"]
|
||||
print(extra_endline + f"Tracker enabled: {bool(enabled_y)}" + extra_endline)
|
||||
print(extra_endline+f"Tracker enabled: {bool(enabled_y)}"+extra_endline)
|
||||
if self.tracker_info["tracker_intensity"] < self.tracker_info["threshold_intensity_y"]:
|
||||
print(self.red + " LOW INTENSITY" + self.white + extra_endline)
|
||||
print(self.red+" LOW INTENSITY"+self.white+extra_endline)
|
||||
_laser_tracker_intensity = self.tracker_info["tracker_intensity"]
|
||||
print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}" + extra_endline)
|
||||
print(f" PSD beam intensity: {_laser_tracker_intensity:.2f}"+extra_endline)
|
||||
_laser_tracker_y_beampos = self.tracker_info["beampos_y"]
|
||||
print(f" Y beam position: {_laser_tracker_y_beampos:.2f}" + extra_endline)
|
||||
print(f" Y beam position: {_laser_tracker_y_beampos:.2f}"+extra_endline)
|
||||
_laser_tracker_y_target = self.tracker_info["target_y"]
|
||||
print(f" target position: {_laser_tracker_y_target:.2f}" + extra_endline)
|
||||
print(f" target position: {_laser_tracker_y_target:.2f}"+extra_endline)
|
||||
_laser_tracker_y_threshold_intensity = self.tracker_info["threshold_intensity_y"]
|
||||
print(
|
||||
f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}" + extra_endline
|
||||
)
|
||||
print(f" threshold intensity: {_laser_tracker_y_threshold_intensity:.2f}"+extra_endline)
|
||||
_laser_tracker_y_piezo_voltage = self.tracker_info["piezo_voltage_y"]
|
||||
print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}" + extra_endline)
|
||||
print(f" Piezo voltage: {_laser_tracker_y_piezo_voltage:.2f}"+extra_endline)
|
||||
_laser_tracker_z_beampos = self.tracker_info["beampos_z"]
|
||||
print(f" Z beam position: {_laser_tracker_z_beampos:.2f}" + extra_endline)
|
||||
print(f" Z beam position: {_laser_tracker_z_beampos:.2f}"+extra_endline)
|
||||
_laser_tracker_z_target = self.tracker_info["target_z"]
|
||||
print(f" target position: {_laser_tracker_z_target:.2f}" + extra_endline)
|
||||
print(f" target position: {_laser_tracker_z_target:.2f}"+extra_endline)
|
||||
_laser_tracker_z_threshold_intensity = self.tracker_info["threshold_intensity_z"]
|
||||
print(
|
||||
f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}" + extra_endline
|
||||
)
|
||||
print(f" threshold intensity: {_laser_tracker_z_threshold_intensity:.2f}"+extra_endline)
|
||||
_laser_tracker_z_piezo_voltage = self.tracker_info["piezo_voltage_z"]
|
||||
print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}" + extra_endline)
|
||||
print(" Reminder - there is also an upper threshold active in rt\n" + extra_endline)
|
||||
print(f" Piezo voltage: {_laser_tracker_z_piezo_voltage:.2f}"+extra_endline)
|
||||
print(" Reminder - there is also an upper threshold active in rt\n"+extra_endline)
|
||||
if extra_endline == "":
|
||||
self.laser_tracker_galil_status()
|
||||
|
||||
|
||||
|
||||
def laser_tracker_galil_enable(self):
|
||||
otracky_con = self.device_manager.devices.otracky.obj.controller
|
||||
otracky_con = self.get_device_manager().devices.otracky.obj.controller
|
||||
otracky_con.socket_put_confirmed("tracken=1")
|
||||
otracky_con.socket_put_confirmed("trackyct=0")
|
||||
otracky_con.socket_put_confirmed("trackzct=0")
|
||||
|
||||
|
||||
|
||||
def laser_tracker_galil_disable(self):
|
||||
otracky_con = self.device_manager.devices.otracky.obj.controller
|
||||
otracky_con = self.get_device_manager().devices.otracky.obj.controller
|
||||
otracky_con.socket_put_confirmed("tracken=0")
|
||||
|
||||
|
||||
def laser_tracker_galil_status(self):
|
||||
otracky_con = self.device_manager.devices.otracky.obj.controller
|
||||
otracky_con = self.get_device_manager().devices.otracky.obj.controller
|
||||
if bool(float(otracky_con.socket_put_and_receive("MGtracken").strip())) == 0:
|
||||
print(self.red + "Tracking in the Galil Controller is disabled." + self.white)
|
||||
print(self.red+"Tracking in the Galil Controller is disabled."+self.white)
|
||||
print("Use dev.rtx.controller.laser_tracker_galil_enable to enable.\n")
|
||||
return 0
|
||||
return(0)
|
||||
else:
|
||||
print("Tracking in the Galil Controller is enabled.")
|
||||
trackyct = int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip()))
|
||||
trackzct = int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip()))
|
||||
trackyct=int(float(otracky_con.socket_put_and_receive("MGtrackyct").strip()))
|
||||
trackzct=int(float(otracky_con.socket_put_and_receive("MGtrackzct").strip()))
|
||||
print(f"Galil Trackcounters y={trackyct}, z={trackzct}")
|
||||
|
||||
def show_signal_strength_interferometer(self):
|
||||
channelnames = {1: "OSA FZP Y", 2: "ST OSA Y", 3: "OSA FZP X", 4: "ST OSA X", 5: "Angle"}
|
||||
channelnames={1:"OSA FZP Y",2:"ST OSA Y",3:"OSA FZP X",4:"ST OSA X",5:"Angle"}
|
||||
self.feedback_get_status_and_ssi()
|
||||
t = PrettyTable()
|
||||
t.title = f"Interferometer signal strength"
|
||||
t.field_names = ["Channel", "Name", "Value"]
|
||||
for i in range(1, 6):
|
||||
for i in range(1,6):
|
||||
ssi = self.ssi[f"ssi_{i}"]
|
||||
t.add_row([i, channelnames[i], ssi])
|
||||
t.add_row([i,channelnames[i], ssi])
|
||||
print(t)
|
||||
|
||||
def _omny_interferometer_switch_open_socket(self):
|
||||
@@ -756,42 +722,44 @@ class RtOMNYController(Controller):
|
||||
self._omny_interferometer_switch_put_and_receive("?000\r")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
|
||||
def _omny_interferometer_switch_channel(self, channel):
|
||||
self._omny_interferometer_switch_alloff()
|
||||
time.sleep(0.1)
|
||||
if channel == 1: # Relais 1 and 2
|
||||
if channel == 1: #Relais 1 and 2
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!0020003\r")
|
||||
# if "|0003\r" != self._omny_interferometer_switch_put_and_receive("!0020003\r"):
|
||||
#if "|0003\r" != self._omny_interferometer_switch_put_and_receive("!0020003\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
elif channel == 2: # Relais 3 and 4
|
||||
elif channel == 2: #Relais 3 and 4
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!002000C\r")
|
||||
# if "|000C\r" != self._omny_interferometer_switch_put_and_receive("!002000C\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
elif channel == 3: # Relais 5 and 6
|
||||
elif channel == 3: #Relais 5 and 6
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!0020030\r")
|
||||
# if "|0030\r" != self._omny_interferometer_switch_put_and_receive("!0020030\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
elif channel == 4: # Relais 7 and 8
|
||||
elif channel == 4: #Relais 7 and 8
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r")
|
||||
# if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
elif channel == 5: # Relais 9 and 10
|
||||
elif channel == 5: #Relais 9 and 10
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!0020300\r")
|
||||
# if "|0300\r" != self._omny_interferometer_switch_put_and_receive("!0020300\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
elif channel == 6: # Relais 11 and 12
|
||||
elif channel == 6: #Relais 11 and 12
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!0020C00\r")
|
||||
# if "|0C00\r" != self._omny_interferometer_switch_put_and_receive("!0020C00\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
elif channel == 7: # Relais 13 and 14
|
||||
elif channel == 7: #Relais 13 and 14
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!0023000\r")
|
||||
# if "|3000\r" != self._omny_interferometer_switch_put_and_receive("!0023000\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
elif channel == 8: # Relais 7 and 8 SPECIAL CASE use osafzp y signal to align osa y
|
||||
elif channel == 8: #Relais 7 and 8 SPECIAL CASE use osafzp y signal to align osa y
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!00200C0\r")
|
||||
# if "|00C0\r" != self._omny_interferometer_switch_put_and_receive("!00200C0\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
elif channel == 9: # Relais 15 and 16
|
||||
elif channel == 9: #Relais 15 and 16
|
||||
rback = self._omny_interferometer_switch_put_and_receive("!002C000\r")
|
||||
# if "|C000\r" != self._omny_interferometer_switch_put_and_receive("!002C000\r"):
|
||||
# raise RtOMNY_mirror_switchbox_Error("Channel switching failed.")
|
||||
@@ -817,14 +785,14 @@ class RtOMNYController(Controller):
|
||||
self._omny_interferometer_switch_put_and_receive("!00S01\r")
|
||||
|
||||
def _omny_interferometer_switch_alloff(self):
|
||||
# switch all off
|
||||
#switch all off
|
||||
self._omny_interferometer_switch_put_and_receive("!0020000\r")
|
||||
# LED OFF
|
||||
#LED OFF
|
||||
time.sleep(0.1)
|
||||
self._omny_interferometer_switch_put_and_receive("!00S00\r")
|
||||
self._omny_interferometer_switch_put_and_receive("!00S00\r")
|
||||
time.sleep(0.1)
|
||||
alloff = self._omny_interferometer_switch_put_and_receive("?002\r")
|
||||
# check all off
|
||||
alloff = self._omny_interferometer_switch_put_and_receive("?002\r")
|
||||
#check all off
|
||||
if "00" not in alloff:
|
||||
raise RtOMNY_mirror_switchbox_Error("Not all channels switched off.")
|
||||
|
||||
@@ -832,16 +800,17 @@ class RtOMNYController(Controller):
|
||||
self.socket_put("J3")
|
||||
|
||||
def _omny_interferometer_get_signalsample(self, channel, averaging_duration):
|
||||
# channel is string, eg ssi_1
|
||||
# ensure no averaging running currently
|
||||
#channel is string, eg ssi_1
|
||||
#ensure no averaging running currently
|
||||
self.feedback_is_running()
|
||||
|
||||
# measure first sample
|
||||
#measure first sample
|
||||
self._rt_start_averaging_SSI()
|
||||
time.sleep(averaging_duration)
|
||||
self.feedback_is_running()
|
||||
return self.ssi[channel]
|
||||
|
||||
|
||||
def _get_signals_from_table(self, return_table) -> dict:
|
||||
self.average_stdeviations_x_st_fzp += float(return_table[16])
|
||||
self.average_stdeviations_y_st_fzp += float(return_table[18])
|
||||
@@ -862,6 +831,7 @@ class RtOMNYController(Controller):
|
||||
"stdev_x_st_fzp": {"value": float(return_table[16])},
|
||||
"average_y_st_fzp": {"value": float(return_table[17])},
|
||||
"stdev_y_st_fzp": {"value": float(return_table[18])},
|
||||
|
||||
"average_stdeviations_x_st_fzp": {
|
||||
"value": self.average_stdeviations_x_st_fzp / (int(return_table[0]) + 1)
|
||||
},
|
||||
@@ -870,7 +840,7 @@ class RtOMNYController(Controller):
|
||||
},
|
||||
}
|
||||
return signals
|
||||
|
||||
|
||||
@threadlocked
|
||||
def start_scan(self):
|
||||
if not self.feedback_is_running():
|
||||
@@ -892,6 +862,7 @@ class RtOMNYController(Controller):
|
||||
# start a point-by-point scan (for cont scan in flomni it would be "sa")
|
||||
self.socket_put_and_receive("sd")
|
||||
|
||||
|
||||
@retry_once
|
||||
@threadlocked
|
||||
def get_scan_status(self):
|
||||
@@ -910,6 +881,13 @@ class RtOMNYController(Controller):
|
||||
current_position_in_scan = int(float(return_table[2]))
|
||||
return (mode, number_of_positions_planned, current_position_in_scan)
|
||||
|
||||
|
||||
def get_device_manager(self):
|
||||
for axis in self._axis:
|
||||
if hasattr(axis, "device_manager") and axis.device_manager:
|
||||
return axis.device_manager
|
||||
raise BECConfigError("Could not access the device_manager")
|
||||
|
||||
def read_positions_from_sampler(self):
|
||||
# this was for reading after the scan completed
|
||||
number_of_samples_to_read = 1 # self.get_scan_status()[1] #number of valid samples, will be updated upon first data read
|
||||
@@ -923,7 +901,7 @@ class RtOMNYController(Controller):
|
||||
|
||||
# if not (mode==2 or mode==3):
|
||||
# error
|
||||
self.device_manager.connector.set(
|
||||
self.get_device_manager().connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=1, metadata=self.readout_metadata
|
||||
@@ -958,7 +936,7 @@ class RtOMNYController(Controller):
|
||||
signals = self._get_signals_from_table(return_table)
|
||||
self.publish_device_data(signals=signals, point_id=int(return_table[0]))
|
||||
|
||||
self.device_manager.connector.set(
|
||||
self.get_device_manager().connector.set(
|
||||
MessageEndpoints.device_status("rt_scan"),
|
||||
messages.DeviceStatusMessage(
|
||||
device="rt_scan", status=0, metadata=self.readout_metadata
|
||||
@@ -971,16 +949,15 @@ class RtOMNYController(Controller):
|
||||
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}."
|
||||
)
|
||||
|
||||
self.device_manager.connector.send_client_info(
|
||||
self.get_device_manager().connector.send_client_info(
|
||||
"OMNY statistics: Average of all standard deviations [nm]: x"
|
||||
f" {self.average_stdeviations_x_st_fzp/read_counter*1000:.1f}, y"
|
||||
f" {self.average_stdeviations_y_st_fzp/read_counter*1000:.1f}.",
|
||||
scope="",
|
||||
show_asap=True,
|
||||
)
|
||||
scope="", show_asap=True)
|
||||
|
||||
|
||||
def publish_device_data(self, signals, point_id):
|
||||
self.device_manager.connector.set_and_publish(
|
||||
self.get_device_manager().connector.set_and_publish(
|
||||
MessageEndpoints.device_read("rt_omny"),
|
||||
messages.DeviceMessage(
|
||||
signals=signals, metadata={"point_id": point_id, **self.readout_metadata}
|
||||
@@ -1091,7 +1068,7 @@ class RtOMNYMotor(Device, PositionerBase):
|
||||
self.axis_Id = axis_Id
|
||||
self.sign = sign
|
||||
self.controller = RtOMNYController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
|
||||
self.device_manager = device_manager
|
||||
@@ -1119,10 +1096,6 @@ class RtOMNYMotor(Device, PositionerBase):
|
||||
self.low_limit_travel.put(limits[0])
|
||||
self.high_limit_travel.put(limits[1])
|
||||
|
||||
def wait_for_connection(self, timeout: int = 30, **kwargs) -> None:
|
||||
"""Wait for the device to be connected."""
|
||||
self.controller.on(timeout=timeout)
|
||||
|
||||
@property
|
||||
def limits(self):
|
||||
return (self.low_limit_travel.get(), self.high_limit_travel.get())
|
||||
@@ -1209,6 +1182,7 @@ class RtOMNYMotor(Device, PositionerBase):
|
||||
|
||||
return status
|
||||
|
||||
|
||||
@property
|
||||
def axis_Id(self):
|
||||
return self._axis_Id_alpha
|
||||
@@ -1253,7 +1227,7 @@ class RtOMNYMotor(Device, PositionerBase):
|
||||
|
||||
if __name__ == "__main__":
|
||||
rtcontroller = RtOMNYController(
|
||||
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222, device_manager=None
|
||||
socket_cls=SocketIO, socket_host="mpc2844.psi.ch", socket_port=2222
|
||||
)
|
||||
rtcontroller.on()
|
||||
rtcontroller.laser_tracker_on()
|
||||
|
||||
@@ -1,82 +0,0 @@
|
||||
import time
|
||||
import socket
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device
|
||||
from ophyd import EpicsSignal
|
||||
|
||||
|
||||
class OMNYFastEpicsShutterError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _detect_host_pv():
|
||||
"""Detect host subnet and return appropriate PV name."""
|
||||
try:
|
||||
hostname = socket.gethostname()
|
||||
local_ip = socket.gethostbyname(hostname)
|
||||
if local_ip.startswith("129.129.122."):
|
||||
return "X12SA-ES1-TTL:OUT_01"
|
||||
else:
|
||||
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
|
||||
except Exception as ex:
|
||||
print(f"Warning: could not detect IP subnet ({ex}), using dummy shutter.")
|
||||
return "XOMNYI-XEYE-DUMMYSHUTTER:0"
|
||||
|
||||
|
||||
class OMNYFastEpicsShutter(Device):
|
||||
"""
|
||||
Fast EPICS shutter with automatic PV selection based on host subnet.
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["fshopen", "fshclose", "fshstatus", "fshinfo", "help"]
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
|
||||
# PV is detected dynamically at import time
|
||||
shutter = Cpt(EpicsSignal, name="shutter", read_pv=_detect_host_pv(), auto_monitor=True)
|
||||
|
||||
def __init__(self, prefix="", *, name, **kwargs):
|
||||
super().__init__(prefix, name=name, **kwargs)
|
||||
self.shutter.subscribe(self._emit_value)
|
||||
|
||||
def _emit_value(self, **kwargs):
|
||||
timestamp = kwargs.pop("timestamp", time.time())
|
||||
self.wait_for_connection()
|
||||
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
|
||||
|
||||
# -----------------------------------------------------
|
||||
# User-facing shutter control functions
|
||||
# -----------------------------------------------------
|
||||
|
||||
def fshopen(self):
|
||||
"""Open the fast shutter."""
|
||||
try:
|
||||
self.shutter.put(1, wait=True)
|
||||
except Exception as ex:
|
||||
raise OMNYFastEpicsShutterError(f"Failed to open shutter: {ex}")
|
||||
|
||||
def fshclose(self):
|
||||
"""Close the fast shutter."""
|
||||
try:
|
||||
self.shutter.put(0, wait=True)
|
||||
except Exception as ex:
|
||||
raise OMNYFastEpicsShutterError(f"Failed to close shutter: {ex}")
|
||||
|
||||
def fshstatus(self):
|
||||
"""Return the fast shutter status (0=closed, 1=open)."""
|
||||
try:
|
||||
return self.shutter.get()
|
||||
except Exception as ex:
|
||||
raise OMNYFastEpicsShutterError(f"Failed to read shutter status: {ex}")
|
||||
|
||||
def fshinfo(self):
|
||||
"""Print information about which EPICS PV channel is being used."""
|
||||
pvname = self.shutter.pvname
|
||||
print(f"Fast shutter connected to EPICS channel: {pvname}")
|
||||
return pvname
|
||||
|
||||
def help(self):
|
||||
"""Display available user methods."""
|
||||
print("Available methods:")
|
||||
for method in self.USER_ACCESS:
|
||||
print(f" - {method}")
|
||||
@@ -1,65 +0,0 @@
|
||||
import requests
|
||||
import threading
|
||||
import cv2
|
||||
import numpy as np
|
||||
from ophyd import Device, Component as Cpt
|
||||
from ophyd_devices import PreviewSignal
|
||||
import traceback
|
||||
|
||||
from bec_lib.logger import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
class WebcamViewer(Device):
|
||||
USER_ACCESS = ["start_live_mode", "stop_live_mode"]
|
||||
preview = Cpt(PreviewSignal, ndim=2, num_rotation_90=0, transpose=False)
|
||||
|
||||
def __init__(self, url:str, name:str, num_rotation_90=0, transpose=False, **kwargs) -> None:
|
||||
super().__init__(name=name, **kwargs)
|
||||
self.url = url
|
||||
self._connection = None
|
||||
self._update_thread = None
|
||||
self._buffer = b""
|
||||
self._shutdown_event = threading.Event()
|
||||
self.preview.num_rotation_90 = num_rotation_90
|
||||
self.preview.transpose = transpose
|
||||
|
||||
def start_live_mode(self) -> None:
|
||||
if self._connection is not None:
|
||||
return
|
||||
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
|
||||
self._update_thread.start()
|
||||
|
||||
def _update_loop(self) -> None:
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
self._connection = requests.get(self.url, stream=True)
|
||||
for chunk in self._connection.iter_content(chunk_size=1024):
|
||||
self._buffer += chunk
|
||||
start = self._buffer.find(b'\xff\xd8') # JPEG start
|
||||
end = self._buffer.find(b'\xff\xd9') # JPEG end
|
||||
if start == -1 or end == -1:
|
||||
continue
|
||||
jpg = self._buffer[start:end+2]
|
||||
self._buffer = self._buffer[end+2:]
|
||||
|
||||
image = cv2.imdecode(np.frombuffer(jpg, np.uint8), cv2.IMREAD_COLOR)
|
||||
if image is not None:
|
||||
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
||||
self.preview.put(image)
|
||||
except Exception as exc:
|
||||
content = traceback.format_exc()
|
||||
logger.error(f"Image update loop failed: {content}")
|
||||
|
||||
def stop_live_mode(self) -> None:
|
||||
if self._connection is None:
|
||||
return
|
||||
self._shutdown_event.set()
|
||||
if self._connection is not None:
|
||||
self._connection.close()
|
||||
self._connection = None
|
||||
if self._update_thread is not None:
|
||||
self._update_thread.join()
|
||||
self._update_thread = None
|
||||
|
||||
self._shutdown_event.clear()
|
||||
@@ -1,74 +0,0 @@
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device
|
||||
from ophyd import DynamicDeviceComponent as Dcpt
|
||||
from ophyd import EpicsSignal
|
||||
|
||||
|
||||
|
||||
class OMNYXRayEpicsGUI(Device):
|
||||
|
||||
save_frame = Cpt(
|
||||
EpicsSignal, name="save_frame", read_pv="XOMNYI-XEYE-SAVFRAME:0",auto_monitor=True
|
||||
)
|
||||
update_frame_acqdone = Cpt(
|
||||
EpicsSignal, name="update_frame_acqdone", read_pv="XOMNYI-XEYE-ACQDONE:0",auto_monitor=True
|
||||
)
|
||||
update_frame_acq = Cpt(
|
||||
EpicsSignal, name="update_frame_acq", read_pv="XOMNYI-XEYE-ACQ:0",auto_monitor=True
|
||||
)
|
||||
width_y_dynamic = {
|
||||
f"width_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YWIDTH_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
|
||||
}
|
||||
width_y = Dcpt(width_y_dynamic)
|
||||
width_x_dynamic = {
|
||||
f"width_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XWIDTH_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
|
||||
}
|
||||
width_x = Dcpt(width_x_dynamic)
|
||||
enable_mv_x = Cpt(
|
||||
EpicsSignal, name="enable_mv_x", read_pv="XOMNYI-XEYE-ENAMVX:0",auto_monitor=True
|
||||
)
|
||||
enable_mv_y = Cpt(
|
||||
EpicsSignal, name="enable_mv_y", read_pv="XOMNYI-XEYE-ENAMVY:0",auto_monitor=True
|
||||
)
|
||||
send_message = Cpt(
|
||||
EpicsSignal, name="send_message", read_pv="XOMNYI-XEYE-MESSAGE:0.DESC",auto_monitor=True
|
||||
)
|
||||
sample_name = Cpt(
|
||||
EpicsSignal, name="sample_name", read_pv="XOMNYI-XEYE-SAMPLENAME:0.DESC",auto_monitor=True
|
||||
)
|
||||
angle = Cpt(
|
||||
EpicsSignal, name="angle", read_pv="XOMNYI-XEYE-ANGLE:0",auto_monitor=True
|
||||
)
|
||||
pixel_size = Cpt(
|
||||
EpicsSignal, name="pixel_size", read_pv="XOMNYI-XEYE-PIXELSIZE:0",auto_monitor=True
|
||||
)
|
||||
submit = Cpt(
|
||||
EpicsSignal, name="submit", read_pv="XOMNYI-XEYE-SUBMIT:0",auto_monitor=True
|
||||
)
|
||||
step = Cpt(
|
||||
EpicsSignal, name="step", read_pv="XOMNYI-XEYE-STEP:0",auto_monitor=True
|
||||
)
|
||||
xval_x_dynamic = {
|
||||
f"xval_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-XVAL_X:{i}", {"auto_monitor": True}) for i in range(0, 11)
|
||||
}
|
||||
xval_x = Dcpt(xval_x_dynamic)
|
||||
yval_y_dynamic = {
|
||||
f"yval_y_{i}": (EpicsSignal, f"XOMNYI-XEYE-YVAL_Y:{i}", {"auto_monitor": True}) for i in range(0, 11)
|
||||
}
|
||||
yval_y = Dcpt(yval_y_dynamic)
|
||||
recbg = Cpt(
|
||||
EpicsSignal, name="recbg", read_pv="XOMNYI-XEYE-RECBG:0",auto_monitor=True
|
||||
)
|
||||
stage_pos_x_dynamic = {
|
||||
f"stage_pos_x_{i}": (EpicsSignal, f"XOMNYI-XEYE-STAGEPOSX:{i}", {"auto_monitor": True}) for i in range(1, 6)
|
||||
}
|
||||
stage_pos_x = Dcpt(stage_pos_x_dynamic)
|
||||
mvx = Cpt(
|
||||
EpicsSignal, name="mvx", read_pv="XOMNYI-XEYE-MVX:0",auto_monitor=True
|
||||
)
|
||||
mvy = Cpt(
|
||||
EpicsSignal, name="mvy", read_pv="XOMNYI-XEYE-MVY:0",auto_monitor=True
|
||||
)
|
||||
|
||||
|
||||
@@ -93,7 +93,6 @@ class SmaractController(Controller):
|
||||
socket_cls=None,
|
||||
socket_host=None,
|
||||
socket_port=None,
|
||||
device_manager=None,
|
||||
attr_name="",
|
||||
labels=None,
|
||||
):
|
||||
@@ -103,7 +102,6 @@ class SmaractController(Controller):
|
||||
socket_cls=socket_cls,
|
||||
socket_host=socket_host,
|
||||
socket_port=socket_port,
|
||||
device_manager=device_manager,
|
||||
attr_name=attr_name,
|
||||
parent=parent,
|
||||
labels=labels,
|
||||
|
||||
@@ -123,11 +123,10 @@ class SmaractMotor(Device, PositionerBase):
|
||||
limits=None,
|
||||
sign=1,
|
||||
socket_cls=SocketIO,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
self.controller = SmaractController(
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port, device_manager=device_manager
|
||||
socket_cls=socket_cls, socket_host=host, socket_port=port
|
||||
)
|
||||
self.axis_Id = axis_Id
|
||||
self.sign = sign
|
||||
|
||||
@@ -33,11 +33,11 @@ logger = bec_logger.logger
|
||||
|
||||
class FlomniFermatScan(SyncFlyScanBase):
|
||||
scan_name = "flomni_fermat_scan"
|
||||
scan_report_hint = "table"
|
||||
scan_type = "fly"
|
||||
required_kwargs = ["fovx", "fovy", "exp_time", "step", "angle"]
|
||||
arg_input = {}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
|
||||
use_scan_progress_report = True
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -74,7 +74,6 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
"""
|
||||
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.show_live_table = False
|
||||
self.axis = []
|
||||
self.fovx = fovx
|
||||
self.fovy = fovy
|
||||
@@ -169,7 +168,6 @@ class FlomniFermatScan(SyncFlyScanBase):
|
||||
tracker_signal_status = yield from self.stubs.send_rpc_and_wait(
|
||||
"rtx", "controller.laser_tracker_check_signalstrength"
|
||||
)
|
||||
#self.device_manager.connector.send_client_info(tracker_signal_status)
|
||||
if tracker_signal_status == "low":
|
||||
self.device_manager.connector.raise_alarm(
|
||||
severity=0,
|
||||
|
||||
@@ -88,14 +88,6 @@ Depending on the tomo mode following parameters can be given to the `flomni.tomo
|
||||
| Golden ratio tomography (sorted in bunches) | projection_number=None |
|
||||
| Equally spaced with golden starting angle | projection_number=None |
|
||||
|
||||
### GUI tools
|
||||
|
||||
During operation the BEC GUI will show the relevant cameras or progress information. To manually switch view TAB completion on 'flomni.flomnigui_' will show all options to control the GUI. Most useful
|
||||
'flomni.flomnigui_show_cameras()' will show the cameras for sample transfer and interior overview
|
||||
'flomni.flomnigui_show_progress()' will show the measurement progress GUI
|
||||
'flomnigui_show_xeyealign()' will show the XrayEye alignment GUI
|
||||
|
||||
|
||||
## How to setup flOMNI (software)
|
||||
|
||||
This part of the manual is intended for beamline staff and expert users
|
||||
@@ -226,15 +218,9 @@ Update the values by, example for feyex and in position,
|
||||
To refresh the frame of the xray eye windows software
|
||||
`flomni.xrayeye_update_frame()`
|
||||
|
||||
This command can also be called to keep the shutter open and live view active
|
||||
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
|
||||
|
||||
To start the xray eye alignment (and clear any previous alignment)
|
||||
`flomni.xrayeye_alignment_start()`
|
||||
|
||||
This command can also be called to keep the shutter open and live view active. Warning: The dose to the sample will be significantly higher.
|
||||
`flomni.xrayeye_update_frame(keep_shutter_open=True)`
|
||||
|
||||
To load the fit parameters from directory _dir_path_ computed by _SPEC_ptycho_align.m_ in Matlab run
|
||||
`flomni.read_alignment_offset(dir_path='')`
|
||||
The loading routine uses default values for the vertical alignment. This behavior can be changed (e.g. for getting new default values) by the parameter `use_vertical_default_values=False`.
|
||||
|
||||
@@ -16,7 +16,7 @@ dependencies = [
|
||||
"bec_ipython_client",
|
||||
"bec_lib",
|
||||
"bec_server",
|
||||
"ophyd_devices~=1.29",
|
||||
"ophyd_devices",
|
||||
"std_daq_client",
|
||||
"jfjoch-client",
|
||||
"rich",
|
||||
@@ -24,7 +24,6 @@ dependencies = [
|
||||
"pyueye", # for the IDS uEye camera
|
||||
"bec_widgets",
|
||||
"zmq",
|
||||
"opencv-python",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
||||
@@ -7,15 +7,10 @@ from csaxs_bec.devices.omny.galil.fupr_ophyd import FuprGalilController, FuprGal
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fsamroy(dm_with_devices):
|
||||
def fsamroy():
|
||||
FuprGalilController._reset_controller()
|
||||
fsamroy_motor = FuprGalilMotor(
|
||||
"A",
|
||||
name="fsamroy",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
"A", name="fsamroy", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
)
|
||||
fsamroy_motor.controller.on()
|
||||
assert isinstance(fsamroy_motor.controller, FuprGalilController)
|
||||
|
||||
@@ -8,15 +8,10 @@ from csaxs_bec.devices.omny.galil.lgalil_ophyd import LamniGalilController, Lamn
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def leyey(dm_with_devices):
|
||||
def leyey():
|
||||
LamniGalilController._reset_controller()
|
||||
leyey_motor = LamniGalilMotor(
|
||||
"H",
|
||||
name="leyey",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
)
|
||||
leyey_motor.controller.on()
|
||||
yield leyey_motor
|
||||
@@ -25,15 +20,10 @@ def leyey(dm_with_devices):
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def leyex(dm_with_devices):
|
||||
def leyex():
|
||||
LamniGalilController._reset_controller()
|
||||
leyex_motor = LamniGalilMotor(
|
||||
"A",
|
||||
name="leyey",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
"A", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
)
|
||||
leyex_motor.controller.on()
|
||||
yield leyex_motor
|
||||
|
||||
@@ -7,15 +7,10 @@ from csaxs_bec.devices.omny.galil.fgalil_ophyd import FlomniGalilController, Flo
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def leyey(dm_with_devices):
|
||||
def leyey():
|
||||
FlomniGalilController._reset_controller()
|
||||
leyey_motor = FlomniGalilMotor(
|
||||
"H",
|
||||
name="leyey",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
)
|
||||
leyey_motor.controller.on()
|
||||
yield leyey_motor
|
||||
@@ -24,15 +19,10 @@ def leyey(dm_with_devices):
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def leyex(dm_with_devices):
|
||||
def leyex():
|
||||
FlomniGalilController._reset_controller()
|
||||
leyex_motor = FlomniGalilMotor(
|
||||
"H",
|
||||
name="leyey",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
"H", name="leyey", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
)
|
||||
leyex_motor.controller.on()
|
||||
yield leyex_motor
|
||||
@@ -167,16 +157,11 @@ def test_find_reference(leyex, axis_nr, socket_put_messages, socket_get_messages
|
||||
],
|
||||
)
|
||||
def test_fosaz_light_curtain_is_triggered(
|
||||
axis_Id, socket_put_messages, socket_get_messages, triggered, dm_with_devices
|
||||
axis_Id, socket_put_messages, socket_get_messages, triggered
|
||||
):
|
||||
"""test that the light curtain is triggered"""
|
||||
fosaz = FlomniGalilMotor(
|
||||
axis_Id,
|
||||
name="fosaz",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8081,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
axis_Id, name="fosaz", host="mpc2680.psi.ch", port=8081, socket_cls=SocketMock
|
||||
)
|
||||
fosaz.controller.on()
|
||||
fosaz.controller.sock.flush_buffer()
|
||||
|
||||
@@ -5,7 +5,7 @@ from unittest import mock
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from csaxs_bec.devices.ids_cameras.ids_camera import IDSCamera
|
||||
from csaxs_bec.devices.ids_cameras.ids_camera_new import IDSCamera
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
|
||||
@@ -16,10 +16,7 @@ def controller():
|
||||
"""
|
||||
with mock.patch("ophyd_devices.utils.socket.SocketIO") as socket_cls:
|
||||
controller = NPointController(
|
||||
socket_cls=socket_cls,
|
||||
socket_host="localhost",
|
||||
socket_port=1234,
|
||||
device_manager=mock.MagicMock(),
|
||||
socket_cls=socket_cls, socket_host="localhost", socket_port=1234
|
||||
)
|
||||
controller.on()
|
||||
controller.sock.reset_mock()
|
||||
@@ -28,18 +25,13 @@ def controller():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def npointx(dm_with_devices):
|
||||
def npointx():
|
||||
"""
|
||||
Fixture to create a NPointAxis object.
|
||||
"""
|
||||
controller = mock.MagicMock()
|
||||
npointx = NPointAxis(
|
||||
axis_Id="A",
|
||||
name="npointx",
|
||||
host="localhost",
|
||||
port=1234,
|
||||
socket_cls=controller,
|
||||
device_manager=dm_with_devices,
|
||||
axis_Id="A", name="npointx", host="localhost", port=1234, socket_cls=controller
|
||||
)
|
||||
npointx.controller.on()
|
||||
npointx.controller.sock.reset_mock()
|
||||
@@ -115,18 +107,13 @@ def test_axis_get_in(npointx, axis, msg_in, msg_out):
|
||||
npointx.controller.sock.put.assert_called_once_with(msg_in)
|
||||
|
||||
|
||||
def test_axis_out_of_range(dm_with_devices):
|
||||
def test_axis_out_of_range(controller):
|
||||
"""
|
||||
Test that an error is raised when trying to create an NPointAxis object with an invalid axis ID.
|
||||
"""
|
||||
with pytest.raises(ValueError):
|
||||
npointx = NPointAxis(
|
||||
axis_Id="G",
|
||||
name="npointx",
|
||||
host="localhost",
|
||||
port=1234,
|
||||
socket_cls=mock.MagicMock(),
|
||||
device_manager=dm_with_devices,
|
||||
axis_Id="G", name="npointx", host="localhost", port=1234, socket_cls=mock.MagicMock()
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -11,29 +11,26 @@ from csaxs_bec.devices.omny.rt.rt_ophyd import RtError
|
||||
def rt_flomni():
|
||||
RtFlomniController._reset_controller()
|
||||
rt_flomni = RtFlomniController(
|
||||
name="rt_flomni",
|
||||
socket_cls=SocketMock,
|
||||
socket_host="localhost",
|
||||
socket_port=8081,
|
||||
device_manager=mock.MagicMock(),
|
||||
name="rt_flomni", socket_cls=SocketMock, socket_host="localhost", socket_port=8081
|
||||
)
|
||||
with mock.patch.object(rt_flomni, "sock"):
|
||||
rtx = mock.MagicMock(spec=RtFlomniMotor)
|
||||
rtx.name = "rtx"
|
||||
rtx.axis_Id = "A"
|
||||
rtx.axis_Id_numeric = 0
|
||||
rty = mock.MagicMock(spec=RtFlomniMotor)
|
||||
rty.name = "rty"
|
||||
rty.axis_Id = "B"
|
||||
rty.axis_Id_numeric = 1
|
||||
rtz = mock.MagicMock(spec=RtFlomniMotor)
|
||||
rtz.name = "rtz"
|
||||
rtz.axis_Id = "C"
|
||||
rtz.axis_Id_numeric = 2
|
||||
rt_flomni.set_axis(axis=rtx, axis_nr=0)
|
||||
rt_flomni.set_axis(axis=rty, axis_nr=1)
|
||||
rt_flomni.set_axis(axis=rtz, axis_nr=2)
|
||||
yield rt_flomni
|
||||
with mock.patch.object(rt_flomni, "get_device_manager"):
|
||||
with mock.patch.object(rt_flomni, "sock"):
|
||||
rtx = mock.MagicMock(spec=RtFlomniMotor)
|
||||
rtx.name = "rtx"
|
||||
rtx.axis_Id = "A"
|
||||
rtx.axis_Id_numeric = 0
|
||||
rty = mock.MagicMock(spec=RtFlomniMotor)
|
||||
rty.name = "rty"
|
||||
rty.axis_Id = "B"
|
||||
rty.axis_Id_numeric = 1
|
||||
rtz = mock.MagicMock(spec=RtFlomniMotor)
|
||||
rtz.name = "rtz"
|
||||
rtz.axis_Id = "C"
|
||||
rtz.axis_Id_numeric = 2
|
||||
rt_flomni.set_axis(axis=rtx, axis_nr=0)
|
||||
rt_flomni.set_axis(axis=rty, axis_nr=1)
|
||||
rt_flomni.set_axis(axis=rtz, axis_nr=2)
|
||||
yield rt_flomni
|
||||
RtFlomniController._reset_controller()
|
||||
|
||||
|
||||
@@ -55,7 +52,7 @@ def test_rt_flomni_feedback_is_running(rt_flomni, return_value, is_running):
|
||||
|
||||
def test_feedback_enable_with_reset(rt_flomni):
|
||||
|
||||
device_manager = rt_flomni.device_manager
|
||||
device_manager = rt_flomni.get_device_manager()
|
||||
device_manager.devices.fsamx.user_parameter.get.return_value = 0.05
|
||||
device_manager.devices.fsamx.obj.readback.get.return_value = 0.05
|
||||
|
||||
@@ -71,7 +68,7 @@ def test_feedback_enable_with_reset(rt_flomni):
|
||||
|
||||
|
||||
def test_move_samx_to_scan_region(rt_flomni):
|
||||
device_manager = rt_flomni.device_manager
|
||||
device_manager = rt_flomni.get_device_manager()
|
||||
device_manager.devices.rtx.user_parameter.get.return_value = 1
|
||||
rt_flomni.move_samx_to_scan_region(20, 2)
|
||||
assert mock.call(b"v0\n") not in rt_flomni.sock.put.mock_calls
|
||||
@@ -79,16 +76,16 @@ def test_move_samx_to_scan_region(rt_flomni):
|
||||
|
||||
|
||||
def test_feedback_enable_without_reset(rt_flomni):
|
||||
with mock.patch.object(rt_flomni, "set_device_read_write") as set_device_read_write:
|
||||
with mock.patch.object(rt_flomni, "set_device_enabled") as set_device_enabled:
|
||||
with mock.patch.object(rt_flomni, "feedback_is_running", return_value=True):
|
||||
with mock.patch.object(rt_flomni, "laser_tracker_on") as laser_tracker_on:
|
||||
rt_flomni.feedback_enable_without_reset()
|
||||
laser_tracker_on.assert_called_once()
|
||||
assert mock.call(b"l3\n") in rt_flomni.sock.put.mock_calls
|
||||
assert mock.call("fsamx", False) in set_device_read_write.mock_calls
|
||||
assert mock.call("fsamy", False) in set_device_read_write.mock_calls
|
||||
assert mock.call("foptx", False) in set_device_read_write.mock_calls
|
||||
assert mock.call("fopty", False) in set_device_read_write.mock_calls
|
||||
assert mock.call("fsamx", False) in set_device_enabled.mock_calls
|
||||
assert mock.call("fsamy", False) in set_device_enabled.mock_calls
|
||||
assert mock.call("foptx", False) in set_device_enabled.mock_calls
|
||||
assert mock.call("fopty", False) in set_device_enabled.mock_calls
|
||||
|
||||
|
||||
def test_feedback_enable_without_reset_raises(rt_flomni):
|
||||
|
||||
@@ -10,27 +10,19 @@ from csaxs_bec.devices.smaract.smaract_ophyd import SmaractMotor
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def controller(dm_with_devices):
|
||||
def controller():
|
||||
SmaractController._reset_controller()
|
||||
controller = SmaractController(
|
||||
socket_cls=SocketMock, socket_host="dummy", socket_port=123, device_manager=dm_with_devices
|
||||
)
|
||||
controller = SmaractController(socket_cls=SocketMock, socket_host="dummy", socket_port=123)
|
||||
controller.on()
|
||||
controller.sock.flush_buffer()
|
||||
yield controller
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def lsmarA(dm_with_devices):
|
||||
def lsmarA():
|
||||
SmaractController._reset_controller()
|
||||
motor_a = SmaractMotor(
|
||||
"A",
|
||||
name="lsmarA",
|
||||
host="mpc2680.psi.ch",
|
||||
port=8085,
|
||||
sign=1,
|
||||
socket_cls=SocketMock,
|
||||
device_manager=dm_with_devices,
|
||||
"A", name="lsmarA", host="mpc2680.psi.ch", port=8085, sign=1, socket_cls=SocketMock
|
||||
)
|
||||
motor_a.controller.on()
|
||||
motor_a.controller.sock.flush_buffer()
|
||||
|
||||
Reference in New Issue
Block a user