Compare commits

...

14 Commits

Author SHA1 Message Date
Holler Mirko
d5e01418d1 variables for progress 2024-05-08 11:34:57 +02:00
Holler Mirko
d6f88b599a improve show all 2024-05-07 15:24:55 +02:00
Holler Mirko
ab2270683f feature_lamni_init 2024-05-07 15:21:06 +02:00
Holler Mirko
7b6652f867 feature_lamni_init 2024-05-07 15:21:06 +02:00
Holler Mirko
cb525e8778 feature: initialization of lamni stages 2024-05-07 15:21:06 +02:00
21c3e3cab4 Merge branch 'wakonig_k-main-patch-ef88' into 'main'
moved gitlab ci to awi-utils

See merge request bec/csaxs_bec!16
2024-05-07 09:30:43 +02:00
adfdb6fe5d ci: moved to awi utils template 2024-05-07 09:27:55 +02:00
73d9a967d6 Merge branch 'test/async_readback_fix' into 'main'
test: fixed async readback dummy data

See merge request bec/csaxs_bec!17
2024-05-07 09:25:40 +02:00
787e0afcec test: fixed async readback dummy data 2024-05-07 09:23:04 +02:00
74d941c249 Merge branch 'backlashcommandfix' into 'main'
backlashcommandfix

See merge request bec/csaxs_bec!13
2024-04-26 10:32:17 +02:00
f601593c77 test(galil): fixed wrong backlash assert 2024-04-26 10:29:57 +02:00
Holler Mirko
e0d93cd84e backlashcommandfix 2024-04-26 10:00:44 +02:00
803dffb9cd Merge branch 'fix' into 'main'
fix

See merge request bec/csaxs_bec!11
2024-04-25 23:32:09 +02:00
Holler Mirko
3cbafa51be fix 2024-04-25 15:40:53 +02:00
7 changed files with 351 additions and 173 deletions

View File

@@ -1,105 +1,7 @@
# This file is a template, and might need editing before it works on your project.
# Official language image. Look for the different tagged releases at:
# https://hub.docker.com/r/library/python/tags/
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.10
workflow:
rules:
- if: $CI_PIPELINE_SOURCE == "schedule"
- if: $CI_PIPELINE_SOURCE == "web"
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
- if: $CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS
when: never
- if: $CI_COMMIT_BRANCH
include:
- template: Security/Secret-Detection.gitlab-ci.yml
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "csaxs"
target: "csaxs_bec"
#commands to run in the Docker container before starting each job.
before_script:
- pip install -e .[dev]
# different stages in the pipeline
stages:
- Formatter
- test # must be called test for security/secret-detection to work
- AdditionalTests
- Deploy
pylint:
stage: Formatter
script:
- pip install pylint pylint-exit anybadge
- pip install -e .[dev]
- mkdir ./pylint
- pylint ./csaxs_bec --output-format=text --output=./pylint/pylint.log | tee ./pylint/pylint.log || pylint-exit $?
- PYLINT_SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint.log)
- anybadge --label=Pylint --file=pylint/pylint.svg --value=$PYLINT_SCORE 2=red 4=orange 8=yellow 10=green
- echo "Pylint score is $PYLINT_SCORE"
artifacts:
paths:
- ./pylint/
expire_in: 1 week
pylint-check:
stage: Formatter
needs: []
allow_failure: true
before_script:
- pip install pylint pylint-exit anybadge
- apt-get update
- apt-get install -y bc
script:
# Identify changed Python files
- if [ "$CI_PIPELINE_SOURCE" == "merge_request_event" ]; then
TARGET_BRANCH_COMMIT_SHA=$(git rev-parse $CI_MERGE_REQUEST_TARGET_BRANCH_NAME);
CHANGED_FILES=$(git diff --name-only $SOURCE_BRANCH_COMMIT_SHA $TARGET_BRANCH_COMMIT_SHA | grep '\.py$' || true);
else
CHANGED_FILES=$(git diff --name-only $CI_COMMIT_BEFORE_SHA $CI_COMMIT_SHA | grep '\.py$' || true);
fi
- if [ -z "$CHANGED_FILES" ]; then echo "No Python files changed."; exit 0; fi
# Run pylint only on changed files
- mkdir ./pylint
- pylint $CHANGED_FILES --output-format=text . | tee ./pylint/pylint_changed_files.log || pylint-exit $?
- PYLINT_SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint_changed_files.log)
- echo "Pylint score is $PYLINT_SCORE"
# Fail the job if the pylint score is below 9
- if [ "$(echo "$PYLINT_SCORE < 9" | bc)" -eq 1 ]; then echo "Your pylint score is below the acceptable threshold (9)."; exit 1; fi
artifacts:
paths:
- ./pylint/
expire_in: 1 week
secret_detection:
before_script:
- ''
config_test:
stage: test
script:
- ophyd_test --config ./csaxs_bec/device_configs/ --output ./config_tests
artifacts:
paths:
- ./config_tests
when: on_failure
expire_in: "30 days"
allow_failure: true
pytest:
stage: test
script:
- pip install coverage
- coverage run --source=./csaxs_bec -m pytest -v --junitxml=report.xml --random-order --full-trace ./tests
- coverage report
- coverage xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml

View File

@@ -13,6 +13,166 @@ umv = builtins.__dict__.get("umv")
bec = builtins.__dict__.get("bec")
class LamNIInitError(Exception):
pass
class LaMNIInitStagesMixin:
def lamni_init_stages(self):
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
if user_input == "y":
print("staring...")
dev.lsamrot.enabled=True
else:
return
if self.check_all_axes_of_lamni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
print("ok then...")
else:
return
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
user_input = input("The rotation stage will be moved to one limit [y/n]")
if user_input == "y":
print("starting...")
else:
return
self.drive_axis_to_limit(dev.lsamrot, "forward")
dev.lsamrot.enabled=False
print("Now hard reboot the controller and run the initialization routine again.")
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
return
user_input = input(
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
)
if user_input == "y":
print("ok then...")
else:
return
print("Referencing loptz")
self.drive_axis_to_limit(dev.loptz, "forward")
self.find_reference_mark(dev.loptz)
print("Referencing loptx")
self.drive_axis_to_limit(dev.loptx, "reverse")
self.find_reference_mark(dev.loptx)
print("Referencing lopty")
self.drive_axis_to_limit(dev.lopty, "forward")
self.find_reference_mark(dev.lopty)
print("Referencing lsamx")
self.drive_axis_to_limit(dev.lsamx, "forward")
self.find_reference_mark(dev.lsamx)
print("Referencing lsamy")
self.drive_axis_to_limit(dev.lsamy, "reverse")
self.find_reference_mark(dev.lsamy)
# the dual encoder requires the reference mark to pass on both encoders
print("Referencing lsamrot")
self.drive_axis_to_limit(dev.lsamrot, "reverse")
time.sleep(0.1)
self.find_reference_mark(dev.lsamrot)
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
if user_input == "y":
print("starting...")
else:
return
print("Referencing leyex")
self.drive_axis_to_limit(dev.leyex, "forward")
print("Referencing leyey")
self.drive_axis_to_limit(dev.leyey, "forward")
# set_lm lsamx 6 14
# set_lm lsamy 6 14
# set_lm lsamrot -3 362
# set_lm loptx -1 -0.2
# set_lm lopty 3.0 3.6
# set_lm loptz 82 87
# set_lm leyex 0 25
# set_lm leyey 0.5 50
print("Init of Smaract stages")
dev.losax.controller.find_reference_mark(2, 0, 1000, 1)
time.sleep(1)
dev.losax.controller.find_reference_mark(0, 0, 1000, 1)
time.sleep(1)
dev.losax.controller.find_reference_mark(1, 0, 1000, 1)
time.sleep(1)
#dev.losax.controller.find_reference_mark(3, 1, 1000, 1)
#time.sleep(1)
#dev.losax.controller.find_reference_mark(4, 1, 1000, 1)
#time.sleep(1)
# set_lm losax -1.5 0.25
# set_lm losay -2.5 4.1
# set_lm losaz -4.1 -0.5
# set_lm lcsy -1.5 5
self._align_setup()
def find_reference_mark(self, device):
axis_id = device._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
device.controller.find_reference(axis_id_numeric)
def drive_axis_to_limit(self, device, direction):
axis_id = device._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
device.controller.drive_axis_to_limit(axis_id_numeric, direction)
def axis_id_to_numeric(self, axis_id) -> int:
return ord(axis_id.lower()) - 97
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
print("Start moving stages...")
else:
print("Stopping.")
return
lsamx_center = dev.lsamx.user_parameter.get("center")
if lsamx_center is None:
raise LamNIInitError(
"Could not find a lsamx center position. Please check your device config."
)
lsamy_center = dev.lsamy.user_parameter.get("center")
if lsamy_center is None:
raise LamNIInitError(
"Could not find a lsamy center position. Please check your device config."
)
umv(dev.lsamx, lsamx_center, dev.lsamy, lsamy_center, dev.loptx, -0.3, dev.lopty, 0)
umv(dev.losax, -1)
umv(dev.loptz, 82.25)
umv(dev.lsamrot, -1)
umv(dev.lsamrot, 0)
time.sleep(2)
dev.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
def check_all_axes_of_lamni_referenced(self):
if (
dev.losax.controller.axis_is_referenced(0)
& dev.losax.controller.axis_is_referenced(1)
& dev.losax.controller.axis_is_referenced(2)
& dev.lsamx.controller.all_axes_referenced()
):
print("All axes of LamNI are referenced.")
return True
else:
return False
class LamNIOpticsMixin:
@staticmethod
def _get_user_param_safe(device, var):

View File

@@ -17,6 +17,7 @@ from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
from .lamni_optics_mixin import LamNIOpticsMixin
from .lamni_optics_mixin import LaMNIInitStagesMixin
logger = bec_logger.logger
bec = builtins.__dict__.get("bec")
@@ -32,7 +33,7 @@ class XrayEyeAlign:
self.lamni = lamni
self.device_manager = client.device_manager
self.scans = client.scans
self.xeye = self.device_manager.devices.xeye
# self.xeye = self.device_manager.devices.xeye
self.alignment_values = defaultdict(list)
self._reset_init_values()
self.corr_pos_x = []
@@ -511,7 +512,7 @@ class LamNI(LamNIOpticsMixin):
super().__init__()
self.client = client
self.align = XrayEyeAlign(client, self)
self.init = LaMNIInitStagesMixin()
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
@@ -524,6 +525,7 @@ class LamNI(LamNIOpticsMixin):
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.progress = {}
def get_beamline_checks_enabled(self):
print(
@@ -586,6 +588,27 @@ class LamNI(LamNIOpticsMixin):
def tomo_circfov(self, val: float):
self.client.set_global_var("tomo_circfov", val)
@property
def tomo_type(self):
val = self.client.get_global_var("tomo_type")
if val is None:
return 1
return val
@tomo_type.setter
def tomo_type(self, val: float):
if val == 1:
# equally spaced tomography with 8 sub tomograms
self.client.set_global_var("tomo_type", val)
#elif val == 2:
# # golden ratio tomography (sorted bunches)
# self.client.set_global_var("tomo_type", val)
#elif val == 3:
# # equally spaced tomography with starting angles shifted by golden ratio
# self.client.set_global_var("tomo_type", val)
else:
raise ValueError("Unknown tomo_type.")
@property
def tomo_countingtime(self):
val = self.client.get_global_var("tomo_countingtime")
@@ -948,47 +971,73 @@ class LamNI(LamNIOpticsMixin):
# _tomo_shift_angles (potential global variable)
_tomo_shift_angles = 0
angle_end = start_angle + 360
for angle in np.linspace(
angles = np.linspace(
start_angle + _tomo_shift_angles,
angle_end,
num=int(360 / self.tomo_angle_stepsize) + 1,
endpoint=True,
):
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle}")
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
)
# reverse even sub-tomograms
if not (subtomo_number % 2):
angles = np.flip(angles)
for angle in angles:
self.progress["subtomo"] = subtomo_number
self.progress["subtomo_projection"] = angles.index(angle)
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
self.progress["angle"] = angle
self._tomo_scan_at_angle(angle, subtomo_number)
if self._was_beam_okay() and not error_caught:
successful = True
def _print_progress(self):
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
def _tomo_scan_at_angle(self, angle, subtomo_number):
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
raise exc
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
@@ -1006,22 +1055,29 @@ class LamNI(LamNIOpticsMixin):
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
if subtomo_start == 1 and start_angle is None:
if (self.tomo_type == 1 and subtomo_start == 1 and start_angle is None):
# pylint: disable=undefined-variable
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
self.write_pdf_report()
if bec.active_account != "":
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
self.write_pdf_report()
else:
self.tomo_id = 0
with scans.dataset_id_on_hold:
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
if self.tomo_type == 1:
# 8 equally spaced sub-tomograms
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
def tomo_parameters(self):
"""print and update the tomo parameters"""
@@ -1044,8 +1100,10 @@ class LamNI(LamNIOpticsMixin):
print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}")
print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms")
print(f"Total number of projections: {360/self.tomo_angle_stepsize*8}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Sample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? ")

View File

@@ -4,7 +4,7 @@ from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
from prettytable import FRAME, PrettyTable
from prettytable import PrettyTable, FRAME
class OMNYSampleStorageError(Exception):
@@ -28,6 +28,7 @@ class OMNYSampleStorage(Device):
"unset_shuttle_slot",
"get_shuttle_name_slot",
"is_shuttle_slot_used",
"search_shuttle_in_slot",
"show_all",
]
SUB_VALUE = "value"
@@ -59,7 +60,7 @@ class OMNYSampleStorage(Device):
parking_placed = Dcpt(parking_placed)
sample_placed = {
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
}
sample_placed = Dcpt(sample_placed)
@@ -227,11 +228,25 @@ class OMNYSampleStorage(Device):
val = self.sample_in_samplestage_name.get()
return str(val)
def search_shuttle_in_slot(self, shuttle: str) -> int:
returnvalue = 0
for i in range(1, 7):
if self.get_shuttle_name_slot(i) == shuttle:
returnvalue = i
return returnvalue
def show_all(self):
t = PrettyTable()
red = "\x1b[91m"
green = "\x1b[92m"
white = "\x1b[0m"
for ch in ["A", "B", "C"]:
t.clear()
t.title = "Shuttle " + ch
shuttle_in_slot = self.search_shuttle_in_slot(ch)
if shuttle_in_slot > 0:
t.title = green + "Shuttle " + ch + " in OMNY slot " + str(shuttle_in_slot) + white
else:
t.title = red + "Shuttle " + ch + white
field_names = [""]
for ax in [1, 3, 5]:
row = []

View File

@@ -94,7 +94,7 @@ class GalilController(Controller):
if axis_Id is None and axis_Id_numeric is not None:
axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric)
is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0)
backlash_is_active = bool(float(self.socket_put_and_receive(f"MGbcklact[axis]")) != 0)
backlash_is_active = bool(float(self.socket_put_and_receive(f"MGbcklact[{axis_Id_numeric}]")) != 0)
return bool(
is_moving or backlash_is_active or self.is_thread_active(0) or self.is_thread_active(2)
)
@@ -118,6 +118,9 @@ class GalilController(Controller):
rt_not_blocked_by_galil = bool(self.socket_put_and_receive(f"MG@OUT[9]"))
air_off = bool(self.socket_put_and_receive(f"MG@OUT[13]"))
return rt_not_blocked_by_galil and air_off
def get_digital_input(self,channel):
return bool(float(self.socket_put_and_receive(f"MG @IN[{channel}]").strip()))
def axis_is_referenced(self, axis_Id_numeric) -> bool:
return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip()))
@@ -146,9 +149,9 @@ class GalilController(Controller):
self.socket_put_confirmed(f"naxis={axis_Id_numeric}")
self.socket_put_confirmed(f"ndir={direction_flag}")
self.socket_put_confirmed("XQ#NEWPAR")
time.sleep(0.005)
time.sleep(0.1)
self.socket_put_confirmed("XQ#FES")
time.sleep(0.01)
time.sleep(0.1)
while self.is_axis_moving(None, axis_Id_numeric):
time.sleep(0.01)
@@ -169,8 +172,10 @@ class GalilController(Controller):
Args:
axis_Id_numeric (int): Axis number
"""
time.sleep(0.1)
self.socket_put_confirmed(f"naxis={axis_Id_numeric}")
self.socket_put_and_receive("XQ#NEWPAR")
time.sleep(0.1)
self.socket_put_confirmed("XQ#FRM")
time.sleep(0.1)
while self.is_axis_moving(None, axis_Id_numeric):
@@ -241,12 +246,42 @@ class GalilController(Controller):
print(t)
self.show_running_threads()
self.show_status_other()
def show_status_other(self):
#Todo: move to lgalil specific section
if(self.get_digital_input(5)):
print("Emergency stop is not pushed.")
else:
print("Emergency stop is pushed.")
if(self.get_digital_input(6)):
print("Driver axis 2 error.")
if(self.get_digital_input(13)):
print("No air pressure at inner rotation.")
else:
print("There is air pressure at the inner rotation.")
if(self.get_digital_input(14)):
print("No air pressure at outer rotation axial.")
else:
print("There is air pressure at the outer rotation axial.")
if(self.get_digital_input(15)):
print("No air pressure at outer rotation radial.")
else:
print("There is air pressure at the outer rotation radial.")
swver = float(self.socket_put_and_receive("MGswver"))
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
def galil_show_all(self) -> None:
for controller in self._controller_instances.values():
if isinstance(controller, GalilController):
controller.describe()
def lamni_lights_off(self):
self.socket_put_confirmed("SB1")
def lamni_lights_on(self):
self.socket_put_confirmed("CB1")
@staticmethod
def axis_Id_to_numeric(axis_Id: str) -> int:
return ord(axis_Id.lower()) - 97
@@ -334,12 +369,16 @@ class GalilSetpointSignal(GalilSignalBase):
time.sleep(0.1)
if self.parent.axis_Id_numeric == 2:
angle_status = self.parent.device_manager.devices[
self.parent.rt
].obj.controller.feedback_status_angle_lamni()
if angle_status:
self.controller.socket_put_confirmed("angintf=1")
try:
rt = self.parent.device_manager.devices[self.parent.rt]
if rt.enabled:
angle_status = self.parent.device_manager.devices[
self.parent.rt
].obj.controller.feedback_status_angle_lamni()
if angle_status:
self.controller.socket_put_confirmed("angintf=1")
except KeyError:
logger.warning("RT is disabled. Failed to update RT angle interferometer status to galil.")
self.controller.socket_put_confirmed(f"naxis={self.parent.axis_Id_numeric}")
self.controller.socket_put_confirmed(f"ntarget={target_val:.3f}")

View File

@@ -77,7 +77,7 @@ def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
b"XQ#NEWPAR\r",
b"XQ#FES\r",
b"MG_BGA\r",
b"MGbcklact[axis]\r",
b"MGbcklact[0]\r",
b"MG_XQ0\r",
b"MG_XQ2\r",
b"MG _LRA, _LFA\r",
@@ -93,7 +93,7 @@ def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
b"XQ#NEWPAR\r",
b"XQ#FES\r",
b"MG_BGB\r",
b"MGbcklact[axis]\r",
b"MGbcklact[1]\r",
b"MG_XQ0\r",
b"MG_XQ2\r",
b"MG _LRB, _LFB\r",
@@ -119,7 +119,7 @@ def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, soc
b"XQ#NEWPAR\r",
b"XQ#FRM\r",
b"MG_BGA\r",
b"MGbcklact[axis]\r",
b"MGbcklact[0]\r",
b"MG_XQ0\r",
b"MG_XQ2\r",
b"MG axisref[0]\r",
@@ -133,7 +133,7 @@ def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, soc
b"XQ#NEWPAR\r",
b"XQ#FRM\r",
b"MG_BGB\r",
b"MGbcklact[axis]\r",
b"MGbcklact[1]\r",
b"MG_XQ0\r",
b"MG_XQ2\r",
b"MG axisref[1]\r",

View File

@@ -172,7 +172,11 @@ def test_on_mca_data(mock_det, values, expected_nothing):
[
(
{"scan_id": 123},
{"mca1": [100, 120, 140], "mca3": [200, 220, 240], "mca4": [300, 320, 340]},
{
"mca1": {"value": [100, 120, 140]},
"mca3": {"value": [200, 220, 240]},
"mca4": {"value": [300, 320, 340]},
},
)
],
)