Compare commits

..

1 Commits

Author SHA1 Message Date
0849d093f8 ci: fixed security detection job 2024-04-25 15:58:05 +02:00
7 changed files with 174 additions and 352 deletions

View File

@@ -1,7 +1,105 @@
include:
- project: bec/awi_utils
file: /templates/plugin-repo-template.yml
inputs:
name: "csaxs"
target: "csaxs_bec"
# This file is a template, and might need editing before it works on your project.
# Official language image. Look for the different tagged releases at:
# https://hub.docker.com/r/library/python/tags/
image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.10
workflow:
rules:
- if: $CI_PIPELINE_SOURCE == "schedule"
- if: $CI_PIPELINE_SOURCE == "web"
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
- if: $CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS
when: never
- if: $CI_COMMIT_BRANCH
include:
- template: Jobs/Secret-Detection.gitlab-ci.yml
#commands to run in the Docker container before starting each job.
before_script:
- pip install -e .[dev]
# different stages in the pipeline
stages:
- Formatter
- test # must be called test for security/secret-detection to work
- AdditionalTests
- Deploy
pylint:
stage: Formatter
script:
- pip install pylint pylint-exit anybadge
- pip install -e .[dev]
- mkdir ./pylint
- pylint ./csaxs_bec --output-format=text --output=./pylint/pylint.log | tee ./pylint/pylint.log || pylint-exit $?
- PYLINT_SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint.log)
- anybadge --label=Pylint --file=pylint/pylint.svg --value=$PYLINT_SCORE 2=red 4=orange 8=yellow 10=green
- echo "Pylint score is $PYLINT_SCORE"
artifacts:
paths:
- ./pylint/
expire_in: 1 week
pylint-check:
stage: Formatter
needs: []
allow_failure: true
before_script:
- pip install pylint pylint-exit anybadge
- apt-get update
- apt-get install -y bc
script:
# Identify changed Python files
- if [ "$CI_PIPELINE_SOURCE" == "merge_request_event" ]; then
TARGET_BRANCH_COMMIT_SHA=$(git rev-parse $CI_MERGE_REQUEST_TARGET_BRANCH_NAME);
CHANGED_FILES=$(git diff --name-only $SOURCE_BRANCH_COMMIT_SHA $TARGET_BRANCH_COMMIT_SHA | grep '\.py$' || true);
else
CHANGED_FILES=$(git diff --name-only $CI_COMMIT_BEFORE_SHA $CI_COMMIT_SHA | grep '\.py$' || true);
fi
- if [ -z "$CHANGED_FILES" ]; then echo "No Python files changed."; exit 0; fi
# Run pylint only on changed files
- mkdir ./pylint
- pylint $CHANGED_FILES --output-format=text . | tee ./pylint/pylint_changed_files.log || pylint-exit $?
- PYLINT_SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint_changed_files.log)
- echo "Pylint score is $PYLINT_SCORE"
# Fail the job if the pylint score is below 9
- if [ "$(echo "$PYLINT_SCORE < 9" | bc)" -eq 1 ]; then echo "Your pylint score is below the acceptable threshold (9)."; exit 1; fi
artifacts:
paths:
- ./pylint/
expire_in: 1 week
secret_detection:
before_script:
- ''
config_test:
stage: test
script:
- ophyd_test --config ./csaxs_bec/device_configs/ --output ./config_tests
artifacts:
paths:
- ./config_tests
when: on_failure
expire_in: "30 days"
allow_failure: true
pytest:
stage: test
script:
- pip install coverage
- coverage run --source=./csaxs_bec -m pytest -v --junitxml=report.xml --random-order --full-trace ./tests
- coverage report
- coverage xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml

View File

@@ -13,166 +13,6 @@ umv = builtins.__dict__.get("umv")
bec = builtins.__dict__.get("bec")
class LamNIInitError(Exception):
pass
class LaMNIInitStagesMixin:
def lamni_init_stages(self):
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
if user_input == "y":
print("staring...")
dev.lsamrot.enabled=True
else:
return
if self.check_all_axes_of_lamni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
print("ok then...")
else:
return
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
user_input = input("The rotation stage will be moved to one limit [y/n]")
if user_input == "y":
print("starting...")
else:
return
self.drive_axis_to_limit(dev.lsamrot, "forward")
dev.lsamrot.enabled=False
print("Now hard reboot the controller and run the initialization routine again.")
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
return
user_input = input(
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
)
if user_input == "y":
print("ok then...")
else:
return
print("Referencing loptz")
self.drive_axis_to_limit(dev.loptz, "forward")
self.find_reference_mark(dev.loptz)
print("Referencing loptx")
self.drive_axis_to_limit(dev.loptx, "reverse")
self.find_reference_mark(dev.loptx)
print("Referencing lopty")
self.drive_axis_to_limit(dev.lopty, "forward")
self.find_reference_mark(dev.lopty)
print("Referencing lsamx")
self.drive_axis_to_limit(dev.lsamx, "forward")
self.find_reference_mark(dev.lsamx)
print("Referencing lsamy")
self.drive_axis_to_limit(dev.lsamy, "reverse")
self.find_reference_mark(dev.lsamy)
# the dual encoder requires the reference mark to pass on both encoders
print("Referencing lsamrot")
self.drive_axis_to_limit(dev.lsamrot, "reverse")
time.sleep(0.1)
self.find_reference_mark(dev.lsamrot)
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
if user_input == "y":
print("starting...")
else:
return
print("Referencing leyex")
self.drive_axis_to_limit(dev.leyex, "forward")
print("Referencing leyey")
self.drive_axis_to_limit(dev.leyey, "forward")
# set_lm lsamx 6 14
# set_lm lsamy 6 14
# set_lm lsamrot -3 362
# set_lm loptx -1 -0.2
# set_lm lopty 3.0 3.6
# set_lm loptz 82 87
# set_lm leyex 0 25
# set_lm leyey 0.5 50
print("Init of Smaract stages")
dev.losax.controller.find_reference_mark(2, 0, 1000, 1)
time.sleep(1)
dev.losax.controller.find_reference_mark(0, 0, 1000, 1)
time.sleep(1)
dev.losax.controller.find_reference_mark(1, 0, 1000, 1)
time.sleep(1)
#dev.losax.controller.find_reference_mark(3, 1, 1000, 1)
#time.sleep(1)
#dev.losax.controller.find_reference_mark(4, 1, 1000, 1)
#time.sleep(1)
# set_lm losax -1.5 0.25
# set_lm losay -2.5 4.1
# set_lm losaz -4.1 -0.5
# set_lm lcsy -1.5 5
self._align_setup()
def find_reference_mark(self, device):
axis_id = device._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
device.controller.find_reference(axis_id_numeric)
def drive_axis_to_limit(self, device, direction):
axis_id = device._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
device.controller.drive_axis_to_limit(axis_id_numeric, direction)
def axis_id_to_numeric(self, axis_id) -> int:
return ord(axis_id.lower()) - 97
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
print("Start moving stages...")
else:
print("Stopping.")
return
lsamx_center = dev.lsamx.user_parameter.get("center")
if lsamx_center is None:
raise LamNIInitError(
"Could not find a lsamx center position. Please check your device config."
)
lsamy_center = dev.lsamy.user_parameter.get("center")
if lsamy_center is None:
raise LamNIInitError(
"Could not find a lsamy center position. Please check your device config."
)
umv(dev.lsamx, lsamx_center, dev.lsamy, lsamy_center, dev.loptx, -0.3, dev.lopty, 0)
umv(dev.losax, -1)
umv(dev.loptz, 82.25)
umv(dev.lsamrot, -1)
umv(dev.lsamrot, 0)
time.sleep(2)
dev.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
def check_all_axes_of_lamni_referenced(self):
if (
dev.losax.controller.axis_is_referenced(0)
& dev.losax.controller.axis_is_referenced(1)
& dev.losax.controller.axis_is_referenced(2)
& dev.lsamx.controller.all_axes_referenced()
):
print("All axes of LamNI are referenced.")
return True
else:
return False
class LamNIOpticsMixin:
@staticmethod
def _get_user_param_safe(device, var):

View File

@@ -17,7 +17,6 @@ from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
from .lamni_optics_mixin import LamNIOpticsMixin
from .lamni_optics_mixin import LaMNIInitStagesMixin
logger = bec_logger.logger
bec = builtins.__dict__.get("bec")
@@ -33,7 +32,7 @@ class XrayEyeAlign:
self.lamni = lamni
self.device_manager = client.device_manager
self.scans = client.scans
# self.xeye = self.device_manager.devices.xeye
self.xeye = self.device_manager.devices.xeye
self.alignment_values = defaultdict(list)
self._reset_init_values()
self.corr_pos_x = []
@@ -512,7 +511,7 @@ class LamNI(LamNIOpticsMixin):
super().__init__()
self.client = client
self.align = XrayEyeAlign(client, self)
self.init = LaMNIInitStagesMixin()
self.check_shutter = True
self.check_light_available = True
self.check_fofb = True
@@ -525,7 +524,6 @@ class LamNI(LamNIOpticsMixin):
self._beam_is_okay = True
self._stop_beam_check_event = None
self.beam_check_thread = None
self.progress = {}
def get_beamline_checks_enabled(self):
print(
@@ -588,27 +586,6 @@ class LamNI(LamNIOpticsMixin):
def tomo_circfov(self, val: float):
self.client.set_global_var("tomo_circfov", val)
@property
def tomo_type(self):
val = self.client.get_global_var("tomo_type")
if val is None:
return 1
return val
@tomo_type.setter
def tomo_type(self, val: float):
if val == 1:
# equally spaced tomography with 8 sub tomograms
self.client.set_global_var("tomo_type", val)
#elif val == 2:
# # golden ratio tomography (sorted bunches)
# self.client.set_global_var("tomo_type", val)
#elif val == 3:
# # equally spaced tomography with starting angles shifted by golden ratio
# self.client.set_global_var("tomo_type", val)
else:
raise ValueError("Unknown tomo_type.")
@property
def tomo_countingtime(self):
val = self.client.get_global_var("tomo_countingtime")
@@ -971,73 +948,47 @@ class LamNI(LamNIOpticsMixin):
# _tomo_shift_angles (potential global variable)
_tomo_shift_angles = 0
angle_end = start_angle + 360
angles = np.linspace(
for angle in np.linspace(
start_angle + _tomo_shift_angles,
angle_end,
num=int(360 / self.tomo_angle_stepsize) + 1,
endpoint=True,
)
# reverse even sub-tomograms
if not (subtomo_number % 2):
angles = np.flip(angles)
for angle in angles:
self.progress["subtomo"] = subtomo_number
self.progress["subtomo_projection"] = angles.index(angle)
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize
self.progress["projection"] = (subtomo_number - 1) * self.progress[
"subtomo_total_projections"
] + self.progress["subtomo_projection"]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8
self.progress["angle"] = angle
self._tomo_scan_at_angle(angle, subtomo_number)
def _print_progress(self):
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
def _tomo_scan_at_angle(self, angle, subtomo_number):
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
):
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle}")
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
raise exc
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
if self._was_beam_okay() and not error_caught:
successful = True
else:
self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser(
@@ -1055,29 +1006,22 @@ class LamNI(LamNIOpticsMixin):
scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy()
if (self.tomo_type == 1 and subtomo_start == 1 and start_angle is None):
if subtomo_start == 1 and start_angle is None:
# pylint: disable=undefined-variable
if bec.active_account != "":
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
self.write_pdf_report()
else:
self.tomo_id = 0
self.tomo_id = self.add_sample_database(
self.sample_name,
str(datetime.date.today()),
bec.active_account.decode(),
bec.queue.next_scan_number,
"lamni",
"test additional info",
"BEC",
)
self.write_pdf_report()
with scans.dataset_id_on_hold:
if self.tomo_type == 1:
# 8 equally spaced sub-tomograms
self.progress["tomo_type"] = "Equally spaced sub-tomograms"
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
def tomo_parameters(self):
"""print and update the tomo parameters"""
@@ -1100,10 +1044,8 @@ class LamNI(LamNIOpticsMixin):
print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}")
print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
if self.tomo_type == 1:
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms")
print(f"Total number of projections: {360/self.tomo_angle_stepsize*8}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
print(f"Sample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? ")

View File

@@ -4,7 +4,7 @@ from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal
from prettytable import PrettyTable, FRAME
from prettytable import FRAME, PrettyTable
class OMNYSampleStorageError(Exception):
@@ -28,7 +28,6 @@ class OMNYSampleStorage(Device):
"unset_shuttle_slot",
"get_shuttle_name_slot",
"is_shuttle_slot_used",
"search_shuttle_in_slot",
"show_all",
]
SUB_VALUE = "value"
@@ -60,7 +59,7 @@ class OMNYSampleStorage(Device):
parking_placed = Dcpt(parking_placed)
sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
}
sample_placed = Dcpt(sample_placed)
@@ -228,25 +227,11 @@ class OMNYSampleStorage(Device):
val = self.sample_in_samplestage_name.get()
return str(val)
def search_shuttle_in_slot(self, shuttle: str) -> int:
returnvalue = 0
for i in range(1, 7):
if self.get_shuttle_name_slot(i) == shuttle:
returnvalue = i
return returnvalue
def show_all(self):
t = PrettyTable()
red = "\x1b[91m"
green = "\x1b[92m"
white = "\x1b[0m"
for ch in ["A", "B", "C"]:
t.clear()
shuttle_in_slot = self.search_shuttle_in_slot(ch)
if shuttle_in_slot > 0:
t.title = green + "Shuttle " + ch + " in OMNY slot " + str(shuttle_in_slot) + white
else:
t.title = red + "Shuttle " + ch + white
t.title = "Shuttle " + ch
field_names = [""]
for ax in [1, 3, 5]:
row = []

View File

@@ -94,7 +94,7 @@ class GalilController(Controller):
if axis_Id is None and axis_Id_numeric is not None:
axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric)
is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0)
backlash_is_active = bool(float(self.socket_put_and_receive(f"MGbcklact[{axis_Id_numeric}]")) != 0)
backlash_is_active = bool(float(self.socket_put_and_receive(f"MGbcklact[axis]")) != 0)
return bool(
is_moving or backlash_is_active or self.is_thread_active(0) or self.is_thread_active(2)
)
@@ -118,9 +118,6 @@ class GalilController(Controller):
rt_not_blocked_by_galil = bool(self.socket_put_and_receive(f"MG@OUT[9]"))
air_off = bool(self.socket_put_and_receive(f"MG@OUT[13]"))
return rt_not_blocked_by_galil and air_off
def get_digital_input(self,channel):
return bool(float(self.socket_put_and_receive(f"MG @IN[{channel}]").strip()))
def axis_is_referenced(self, axis_Id_numeric) -> bool:
return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip()))
@@ -149,9 +146,9 @@ class GalilController(Controller):
self.socket_put_confirmed(f"naxis={axis_Id_numeric}")
self.socket_put_confirmed(f"ndir={direction_flag}")
self.socket_put_confirmed("XQ#NEWPAR")
time.sleep(0.1)
time.sleep(0.005)
self.socket_put_confirmed("XQ#FES")
time.sleep(0.1)
time.sleep(0.01)
while self.is_axis_moving(None, axis_Id_numeric):
time.sleep(0.01)
@@ -172,10 +169,8 @@ class GalilController(Controller):
Args:
axis_Id_numeric (int): Axis number
"""
time.sleep(0.1)
self.socket_put_confirmed(f"naxis={axis_Id_numeric}")
self.socket_put_and_receive("XQ#NEWPAR")
time.sleep(0.1)
self.socket_put_confirmed("XQ#FRM")
time.sleep(0.1)
while self.is_axis_moving(None, axis_Id_numeric):
@@ -246,42 +241,12 @@ class GalilController(Controller):
print(t)
self.show_running_threads()
self.show_status_other()
def show_status_other(self):
#Todo: move to lgalil specific section
if(self.get_digital_input(5)):
print("Emergency stop is not pushed.")
else:
print("Emergency stop is pushed.")
if(self.get_digital_input(6)):
print("Driver axis 2 error.")
if(self.get_digital_input(13)):
print("No air pressure at inner rotation.")
else:
print("There is air pressure at the inner rotation.")
if(self.get_digital_input(14)):
print("No air pressure at outer rotation axial.")
else:
print("There is air pressure at the outer rotation axial.")
if(self.get_digital_input(15)):
print("No air pressure at outer rotation radial.")
else:
print("There is air pressure at the outer rotation radial.")
swver = float(self.socket_put_and_receive("MGswver"))
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
def galil_show_all(self) -> None:
for controller in self._controller_instances.values():
if isinstance(controller, GalilController):
controller.describe()
def lamni_lights_off(self):
self.socket_put_confirmed("SB1")
def lamni_lights_on(self):
self.socket_put_confirmed("CB1")
@staticmethod
def axis_Id_to_numeric(axis_Id: str) -> int:
return ord(axis_Id.lower()) - 97
@@ -369,16 +334,12 @@ class GalilSetpointSignal(GalilSignalBase):
time.sleep(0.1)
if self.parent.axis_Id_numeric == 2:
try:
rt = self.parent.device_manager.devices[self.parent.rt]
if rt.enabled:
angle_status = self.parent.device_manager.devices[
self.parent.rt
].obj.controller.feedback_status_angle_lamni()
if angle_status:
self.controller.socket_put_confirmed("angintf=1")
except KeyError:
logger.warning("RT is disabled. Failed to update RT angle interferometer status to galil.")
angle_status = self.parent.device_manager.devices[
self.parent.rt
].obj.controller.feedback_status_angle_lamni()
if angle_status:
self.controller.socket_put_confirmed("angintf=1")
self.controller.socket_put_confirmed(f"naxis={self.parent.axis_Id_numeric}")
self.controller.socket_put_confirmed(f"ntarget={target_val:.3f}")

View File

@@ -77,7 +77,7 @@ def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
b"XQ#NEWPAR\r",
b"XQ#FES\r",
b"MG_BGA\r",
b"MGbcklact[0]\r",
b"MGbcklact[axis]\r",
b"MG_XQ0\r",
b"MG_XQ2\r",
b"MG _LRA, _LFA\r",
@@ -93,7 +93,7 @@ def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
b"XQ#NEWPAR\r",
b"XQ#FES\r",
b"MG_BGB\r",
b"MGbcklact[1]\r",
b"MGbcklact[axis]\r",
b"MG_XQ0\r",
b"MG_XQ2\r",
b"MG _LRB, _LFB\r",
@@ -119,7 +119,7 @@ def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, soc
b"XQ#NEWPAR\r",
b"XQ#FRM\r",
b"MG_BGA\r",
b"MGbcklact[0]\r",
b"MGbcklact[axis]\r",
b"MG_XQ0\r",
b"MG_XQ2\r",
b"MG axisref[0]\r",
@@ -133,7 +133,7 @@ def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, soc
b"XQ#NEWPAR\r",
b"XQ#FRM\r",
b"MG_BGB\r",
b"MGbcklact[1]\r",
b"MGbcklact[axis]\r",
b"MG_XQ0\r",
b"MG_XQ2\r",
b"MG axisref[1]\r",

View File

@@ -172,11 +172,7 @@ def test_on_mca_data(mock_det, values, expected_nothing):
[
(
{"scan_id": 123},
{
"mca1": {"value": [100, 120, 140]},
"mca3": {"value": [200, 220, 240]},
"mca4": {"value": [300, 320, 340]},
},
{"mca1": [100, 120, 140], "mca3": [200, 220, 240], "mca4": [300, 320, 340]},
)
],
)