Compare commits

..

1 Commits

Author SHA1 Message Date
0849d093f8 ci: fixed security detection job 2024-04-25 15:58:05 +02:00
7 changed files with 174 additions and 352 deletions

View File

@@ -1,7 +1,105 @@
include: # This file is a template, and might need editing before it works on your project.
- project: bec/awi_utils # Official language image. Look for the different tagged releases at:
file: /templates/plugin-repo-template.yml # https://hub.docker.com/r/library/python/tags/
inputs: image: $CI_DEPENDENCY_PROXY_GROUP_IMAGE_PREFIX/python:3.10
name: "csaxs"
target: "csaxs_bec"
workflow:
rules:
- if: $CI_PIPELINE_SOURCE == "schedule"
- if: $CI_PIPELINE_SOURCE == "web"
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
- if: $CI_COMMIT_BRANCH && $CI_OPEN_MERGE_REQUESTS
when: never
- if: $CI_COMMIT_BRANCH
include:
- template: Jobs/Secret-Detection.gitlab-ci.yml
#commands to run in the Docker container before starting each job.
before_script:
- pip install -e .[dev]
# different stages in the pipeline
stages:
- Formatter
- test # must be called test for security/secret-detection to work
- AdditionalTests
- Deploy
pylint:
stage: Formatter
script:
- pip install pylint pylint-exit anybadge
- pip install -e .[dev]
- mkdir ./pylint
- pylint ./csaxs_bec --output-format=text --output=./pylint/pylint.log | tee ./pylint/pylint.log || pylint-exit $?
- PYLINT_SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint.log)
- anybadge --label=Pylint --file=pylint/pylint.svg --value=$PYLINT_SCORE 2=red 4=orange 8=yellow 10=green
- echo "Pylint score is $PYLINT_SCORE"
artifacts:
paths:
- ./pylint/
expire_in: 1 week
pylint-check:
stage: Formatter
needs: []
allow_failure: true
before_script:
- pip install pylint pylint-exit anybadge
- apt-get update
- apt-get install -y bc
script:
# Identify changed Python files
- if [ "$CI_PIPELINE_SOURCE" == "merge_request_event" ]; then
TARGET_BRANCH_COMMIT_SHA=$(git rev-parse $CI_MERGE_REQUEST_TARGET_BRANCH_NAME);
CHANGED_FILES=$(git diff --name-only $SOURCE_BRANCH_COMMIT_SHA $TARGET_BRANCH_COMMIT_SHA | grep '\.py$' || true);
else
CHANGED_FILES=$(git diff --name-only $CI_COMMIT_BEFORE_SHA $CI_COMMIT_SHA | grep '\.py$' || true);
fi
- if [ -z "$CHANGED_FILES" ]; then echo "No Python files changed."; exit 0; fi
# Run pylint only on changed files
- mkdir ./pylint
- pylint $CHANGED_FILES --output-format=text . | tee ./pylint/pylint_changed_files.log || pylint-exit $?
- PYLINT_SCORE=$(sed -n 's/^Your code has been rated at \([-0-9.]*\)\/.*/\1/p' ./pylint/pylint_changed_files.log)
- echo "Pylint score is $PYLINT_SCORE"
# Fail the job if the pylint score is below 9
- if [ "$(echo "$PYLINT_SCORE < 9" | bc)" -eq 1 ]; then echo "Your pylint score is below the acceptable threshold (9)."; exit 1; fi
artifacts:
paths:
- ./pylint/
expire_in: 1 week
secret_detection:
before_script:
- ''
config_test:
stage: test
script:
- ophyd_test --config ./csaxs_bec/device_configs/ --output ./config_tests
artifacts:
paths:
- ./config_tests
when: on_failure
expire_in: "30 days"
allow_failure: true
pytest:
stage: test
script:
- pip install coverage
- coverage run --source=./csaxs_bec -m pytest -v --junitxml=report.xml --random-order --full-trace ./tests
- coverage report
- coverage xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
junit: report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml

View File

@@ -13,166 +13,6 @@ umv = builtins.__dict__.get("umv")
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
class LamNIInitError(Exception):
pass
class LaMNIInitStagesMixin:
def lamni_init_stages(self):
user_input = input("Starting initialization of LamNI stages. OK? [y/n]")
if user_input == "y":
print("staring...")
dev.lsamrot.enabled=True
else:
return
if self.check_all_axes_of_lamni_referenced():
user_input = input("Continue anyways? [y/n]")
if user_input == "y":
print("ok then...")
else:
return
axis_id_lsamrot = dev.lsamrot._config["deviceConfig"].get("axis_Id")
if dev.lsamrot.controller.get_motor_limit_switch(axis_id_lsamrot)[1] == False:
user_input = input("The rotation stage will be moved to one limit [y/n]")
if user_input == "y":
print("starting...")
else:
return
self.drive_axis_to_limit(dev.lsamrot, "forward")
dev.lsamrot.enabled=False
print("Now hard reboot the controller and run the initialization routine again.")
print("The controller will be disabled in bec. To enable dev.lsamrot.enabled=True")
return
user_input = input(
"Init of loptz. Can the stage move to the upstream limit without collision?? [y/n]"
)
if user_input == "y":
print("ok then...")
else:
return
print("Referencing loptz")
self.drive_axis_to_limit(dev.loptz, "forward")
self.find_reference_mark(dev.loptz)
print("Referencing loptx")
self.drive_axis_to_limit(dev.loptx, "reverse")
self.find_reference_mark(dev.loptx)
print("Referencing lopty")
self.drive_axis_to_limit(dev.lopty, "forward")
self.find_reference_mark(dev.lopty)
print("Referencing lsamx")
self.drive_axis_to_limit(dev.lsamx, "forward")
self.find_reference_mark(dev.lsamx)
print("Referencing lsamy")
self.drive_axis_to_limit(dev.lsamy, "reverse")
self.find_reference_mark(dev.lsamy)
# the dual encoder requires the reference mark to pass on both encoders
print("Referencing lsamrot")
self.drive_axis_to_limit(dev.lsamrot, "reverse")
time.sleep(0.1)
self.find_reference_mark(dev.lsamrot)
user_input = input("Init of leye. Can the stage move to -x limit without collision? [y/n]")
if user_input == "y":
print("starting...")
else:
return
print("Referencing leyex")
self.drive_axis_to_limit(dev.leyex, "forward")
print("Referencing leyey")
self.drive_axis_to_limit(dev.leyey, "forward")
# set_lm lsamx 6 14
# set_lm lsamy 6 14
# set_lm lsamrot -3 362
# set_lm loptx -1 -0.2
# set_lm lopty 3.0 3.6
# set_lm loptz 82 87
# set_lm leyex 0 25
# set_lm leyey 0.5 50
print("Init of Smaract stages")
dev.losax.controller.find_reference_mark(2, 0, 1000, 1)
time.sleep(1)
dev.losax.controller.find_reference_mark(0, 0, 1000, 1)
time.sleep(1)
dev.losax.controller.find_reference_mark(1, 0, 1000, 1)
time.sleep(1)
#dev.losax.controller.find_reference_mark(3, 1, 1000, 1)
#time.sleep(1)
#dev.losax.controller.find_reference_mark(4, 1, 1000, 1)
#time.sleep(1)
# set_lm losax -1.5 0.25
# set_lm losay -2.5 4.1
# set_lm losaz -4.1 -0.5
# set_lm lcsy -1.5 5
self._align_setup()
def find_reference_mark(self, device):
axis_id = device._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
device.controller.find_reference(axis_id_numeric)
def drive_axis_to_limit(self, device, direction):
axis_id = device._config["deviceConfig"].get("axis_Id")
axis_id_numeric = self.axis_id_to_numeric(axis_id)
device.controller.drive_axis_to_limit(axis_id_numeric, direction)
def axis_id_to_numeric(self, axis_id) -> int:
return ord(axis_id.lower()) - 97
def _align_setup(self):
user_input = input("Start moving stages to default initial positions? [y/n]")
if user_input == "y":
print("Start moving stages...")
else:
print("Stopping.")
return
lsamx_center = dev.lsamx.user_parameter.get("center")
if lsamx_center is None:
raise LamNIInitError(
"Could not find a lsamx center position. Please check your device config."
)
lsamy_center = dev.lsamy.user_parameter.get("center")
if lsamy_center is None:
raise LamNIInitError(
"Could not find a lsamy center position. Please check your device config."
)
umv(dev.lsamx, lsamx_center, dev.lsamy, lsamy_center, dev.loptx, -0.3, dev.lopty, 0)
umv(dev.losax, -1)
umv(dev.loptz, 82.25)
umv(dev.lsamrot, -1)
umv(dev.lsamrot, 0)
time.sleep(2)
dev.rtx.controller.feedback_disable_and_even_reset_lamni_angle_interferometer()
def check_all_axes_of_lamni_referenced(self):
if (
dev.losax.controller.axis_is_referenced(0)
& dev.losax.controller.axis_is_referenced(1)
& dev.losax.controller.axis_is_referenced(2)
& dev.lsamx.controller.all_axes_referenced()
):
print("All axes of LamNI are referenced.")
return True
else:
return False
class LamNIOpticsMixin: class LamNIOpticsMixin:
@staticmethod @staticmethod
def _get_user_param_safe(device, var): def _get_user_param_safe(device, var):

View File

@@ -17,7 +17,6 @@ from typeguard import typechecked
from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen from csaxs_bec.bec_ipython_client.plugins.cSAXS import epics_get, epics_put, fshopen
from .lamni_optics_mixin import LamNIOpticsMixin from .lamni_optics_mixin import LamNIOpticsMixin
from .lamni_optics_mixin import LaMNIInitStagesMixin
logger = bec_logger.logger logger = bec_logger.logger
bec = builtins.__dict__.get("bec") bec = builtins.__dict__.get("bec")
@@ -33,7 +32,7 @@ class XrayEyeAlign:
self.lamni = lamni self.lamni = lamni
self.device_manager = client.device_manager self.device_manager = client.device_manager
self.scans = client.scans self.scans = client.scans
# self.xeye = self.device_manager.devices.xeye self.xeye = self.device_manager.devices.xeye
self.alignment_values = defaultdict(list) self.alignment_values = defaultdict(list)
self._reset_init_values() self._reset_init_values()
self.corr_pos_x = [] self.corr_pos_x = []
@@ -512,7 +511,7 @@ class LamNI(LamNIOpticsMixin):
super().__init__() super().__init__()
self.client = client self.client = client
self.align = XrayEyeAlign(client, self) self.align = XrayEyeAlign(client, self)
self.init = LaMNIInitStagesMixin()
self.check_shutter = True self.check_shutter = True
self.check_light_available = True self.check_light_available = True
self.check_fofb = True self.check_fofb = True
@@ -525,7 +524,6 @@ class LamNI(LamNIOpticsMixin):
self._beam_is_okay = True self._beam_is_okay = True
self._stop_beam_check_event = None self._stop_beam_check_event = None
self.beam_check_thread = None self.beam_check_thread = None
self.progress = {}
def get_beamline_checks_enabled(self): def get_beamline_checks_enabled(self):
print( print(
@@ -588,27 +586,6 @@ class LamNI(LamNIOpticsMixin):
def tomo_circfov(self, val: float): def tomo_circfov(self, val: float):
self.client.set_global_var("tomo_circfov", val) self.client.set_global_var("tomo_circfov", val)
@property
def tomo_type(self):
val = self.client.get_global_var("tomo_type")
if val is None:
return 1
return val
@tomo_type.setter
def tomo_type(self, val: float):
if val == 1:
# equally spaced tomography with 8 sub tomograms
self.client.set_global_var("tomo_type", val)
#elif val == 2:
# # golden ratio tomography (sorted bunches)
# self.client.set_global_var("tomo_type", val)
#elif val == 3:
# # equally spaced tomography with starting angles shifted by golden ratio
# self.client.set_global_var("tomo_type", val)
else:
raise ValueError("Unknown tomo_type.")
@property @property
def tomo_countingtime(self): def tomo_countingtime(self):
val = self.client.get_global_var("tomo_countingtime") val = self.client.get_global_var("tomo_countingtime")
@@ -971,73 +948,47 @@ class LamNI(LamNIOpticsMixin):
# _tomo_shift_angles (potential global variable) # _tomo_shift_angles (potential global variable)
_tomo_shift_angles = 0 _tomo_shift_angles = 0
angle_end = start_angle + 360 angle_end = start_angle + 360
angles = np.linspace( for angle in np.linspace(
start_angle + _tomo_shift_angles, start_angle + _tomo_shift_angles,
angle_end, angle_end,
num=int(360 / self.tomo_angle_stepsize) + 1, num=int(360 / self.tomo_angle_stepsize) + 1,
endpoint=True, endpoint=True,
) ):
# reverse even sub-tomograms successful = False
if not (subtomo_number % 2): error_caught = False
angles = np.flip(angles) if 0 <= angle < 360.05:
for angle in angles: print(f"Starting LamNI scan for angle {angle}")
self.progress["subtomo"] = subtomo_number while not successful:
self.progress["subtomo_projection"] = angles.index(angle) self._start_beam_check()
self.progress["subtomo_total_projections"] = 180 / self.tomo_angle_stepsize if not self.special_angles:
self.progress["projection"] = (subtomo_number - 1) * self.progress[ self._current_special_angles = []
"subtomo_total_projections" if self._current_special_angles:
] + self.progress["subtomo_projection"] next_special_angle = self._current_special_angles[0]
self.progress["total_projections"] = 180 / self.tomo_angle_stepsize * 8 if np.isclose(angle, next_special_angle, atol=0.5):
self.progress["angle"] = angle self._current_special_angles.pop(0)
self._tomo_scan_at_angle(angle, subtomo_number) num_repeats = self.special_angle_repeats
def _print_progress(self):
print("\x1b[95mProgress report:")
print(f"Tomo type: ....................... {self.progress['tomo_type']}")
print(f"Projection: ...................... {self.progress['projection']}")
print(f"Total projections expected ....... {self.progress['total_projections']}")
print(f"Angle: ........................... {self.progress['angle']}")
print(f"Current subtomo: ................. {self.progress['subtomo']}")
print(f"Current projection within subtomo: {self.progress['subtomo_projection']}\x1b[0m")
def _tomo_scan_at_angle(self, angle, subtomo_number):
successful = False
error_caught = False
if 0 <= angle < 360.05:
print(f"Starting LamNI scan for angle {angle} in subtomo {subtomo_number}")
self._print_progress()
while not successful:
self._start_beam_check()
if not self.special_angles:
self._current_special_angles = []
if self._current_special_angles:
next_special_angle = self._current_special_angles[0]
if np.isclose(angle, next_special_angle, atol=0.5):
self._current_special_angles.pop(0)
num_repeats = self.special_angle_repeats
else:
num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else: else:
raise exc num_repeats = 1
try:
start_scan_number = bec.queue.next_scan_number
for i in range(num_repeats):
self._at_each_angle(angle)
error_caught = False
except AlarmBase as exc:
if exc.alarm_type == "TimeoutError":
bec.queue.request_queue_reset()
time.sleep(2)
error_caught = True
else:
raise exc
if self._was_beam_okay() and not error_caught: if self._was_beam_okay() and not error_caught:
successful = True successful = True
else: else:
self._wait_for_beamline_checks() self._wait_for_beamline_checks()
end_scan_number = bec.queue.next_scan_number end_scan_number = bec.queue.next_scan_number
for scan_nr in range(start_scan_number, end_scan_number): for scan_nr in range(start_scan_number, end_scan_number):
self._write_tomo_scan_number(scan_nr, angle, subtomo_number) self._write_tomo_scan_number(scan_nr, angle, subtomo_number)
def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None: def _write_tomo_scan_number(self, scan_number: int, angle: float, subtomo_number: int) -> None:
tomo_scan_numbers_file = os.path.expanduser( tomo_scan_numbers_file = os.path.expanduser(
@@ -1055,29 +1006,22 @@ class LamNI(LamNIOpticsMixin):
scans = builtins.__dict__.get("scans") scans = builtins.__dict__.get("scans")
self._current_special_angles = self.special_angles.copy() self._current_special_angles = self.special_angles.copy()
if (self.tomo_type == 1 and subtomo_start == 1 and start_angle is None): if subtomo_start == 1 and start_angle is None:
# pylint: disable=undefined-variable # pylint: disable=undefined-variable
if bec.active_account != "": self.tomo_id = self.add_sample_database(
self.tomo_id = self.add_sample_database( self.sample_name,
self.sample_name, str(datetime.date.today()),
str(datetime.date.today()), bec.active_account.decode(),
bec.active_account.decode(), bec.queue.next_scan_number,
bec.queue.next_scan_number, "lamni",
"lamni", "test additional info",
"test additional info", "BEC",
"BEC", )
) self.write_pdf_report()
self.write_pdf_report()
else:
self.tomo_id = 0
with scans.dataset_id_on_hold: with scans.dataset_id_on_hold:
if self.tomo_type == 1: for ii in range(subtomo_start, 9):
# 8 equally spaced sub-tomograms self.sub_tomo_scan(ii, start_angle=start_angle)
self.progress["tomo_type"] = "Equally spaced sub-tomograms" start_angle = None
for ii in range(subtomo_start, 9):
self.sub_tomo_scan(ii, start_angle=start_angle)
start_angle = None
def tomo_parameters(self): def tomo_parameters(self):
"""print and update the tomo parameters""" """print and update the tomo parameters"""
@@ -1100,10 +1044,8 @@ class LamNI(LamNIOpticsMixin):
print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}") print(f" _tomo_fovy_offset <mm> = {self.align.tomo_fovy_offset}")
print(f" _manual_shift_x <mm> = {self.manual_shift_x}") print(f" _manual_shift_x <mm> = {self.manual_shift_x}")
print(f" _manual_shift_y <mm> = {self.manual_shift_y}") print(f" _manual_shift_y <mm> = {self.manual_shift_y}")
if self.tomo_type == 1: print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print("\x1b[1mTomo type 1:\x1b[0m 8 equally spaced sub-tomograms") print(f"Resulting in number of projections: {360/self.tomo_angle_stepsize*8}")
print(f"Total number of projections: {360/self.tomo_angle_stepsize*8}")
print(f"Angular step within sub-tomogram: {self.tomo_angle_stepsize} degrees")
print(f"Sample name: {self.sample_name}\n") print(f"Sample name: {self.sample_name}\n")
user_input = input("Are these parameters correctly set for your scan? ") user_input = input("Are these parameters correctly set for your scan? ")

View File

@@ -4,7 +4,7 @@ from ophyd import Component as Cpt
from ophyd import Device from ophyd import Device
from ophyd import DynamicDeviceComponent as Dcpt from ophyd import DynamicDeviceComponent as Dcpt
from ophyd import EpicsSignal from ophyd import EpicsSignal
from prettytable import PrettyTable, FRAME from prettytable import FRAME, PrettyTable
class OMNYSampleStorageError(Exception): class OMNYSampleStorageError(Exception):
@@ -28,7 +28,6 @@ class OMNYSampleStorage(Device):
"unset_shuttle_slot", "unset_shuttle_slot",
"get_shuttle_name_slot", "get_shuttle_name_slot",
"is_shuttle_slot_used", "is_shuttle_slot_used",
"search_shuttle_in_slot",
"show_all", "show_all",
] ]
SUB_VALUE = "value" SUB_VALUE = "value"
@@ -60,7 +59,7 @@ class OMNYSampleStorage(Device):
parking_placed = Dcpt(parking_placed) parking_placed = Dcpt(parking_placed)
sample_placed = { sample_placed = {
f"sample{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {}) f"parking{i}": (EpicsSignal, f"XOMNY-SAMPLE_DB_omny:{i}", {})
for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101] for i in [10, 11, 12, 13, 14, 32, 33, 34, 100, 101]
} }
sample_placed = Dcpt(sample_placed) sample_placed = Dcpt(sample_placed)
@@ -228,25 +227,11 @@ class OMNYSampleStorage(Device):
val = self.sample_in_samplestage_name.get() val = self.sample_in_samplestage_name.get()
return str(val) return str(val)
def search_shuttle_in_slot(self, shuttle: str) -> int:
returnvalue = 0
for i in range(1, 7):
if self.get_shuttle_name_slot(i) == shuttle:
returnvalue = i
return returnvalue
def show_all(self): def show_all(self):
t = PrettyTable() t = PrettyTable()
red = "\x1b[91m"
green = "\x1b[92m"
white = "\x1b[0m"
for ch in ["A", "B", "C"]: for ch in ["A", "B", "C"]:
t.clear() t.clear()
shuttle_in_slot = self.search_shuttle_in_slot(ch) t.title = "Shuttle " + ch
if shuttle_in_slot > 0:
t.title = green + "Shuttle " + ch + " in OMNY slot " + str(shuttle_in_slot) + white
else:
t.title = red + "Shuttle " + ch + white
field_names = [""] field_names = [""]
for ax in [1, 3, 5]: for ax in [1, 3, 5]:
row = [] row = []

View File

@@ -94,7 +94,7 @@ class GalilController(Controller):
if axis_Id is None and axis_Id_numeric is not None: if axis_Id is None and axis_Id_numeric is not None:
axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric) axis_Id = self.axis_Id_numeric_to_alpha(axis_Id_numeric)
is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0) is_moving = bool(float(self.socket_put_and_receive(f"MG_BG{axis_Id}")) != 0)
backlash_is_active = bool(float(self.socket_put_and_receive(f"MGbcklact[{axis_Id_numeric}]")) != 0) backlash_is_active = bool(float(self.socket_put_and_receive(f"MGbcklact[axis]")) != 0)
return bool( return bool(
is_moving or backlash_is_active or self.is_thread_active(0) or self.is_thread_active(2) is_moving or backlash_is_active or self.is_thread_active(0) or self.is_thread_active(2)
) )
@@ -119,9 +119,6 @@ class GalilController(Controller):
air_off = bool(self.socket_put_and_receive(f"MG@OUT[13]")) air_off = bool(self.socket_put_and_receive(f"MG@OUT[13]"))
return rt_not_blocked_by_galil and air_off return rt_not_blocked_by_galil and air_off
def get_digital_input(self,channel):
return bool(float(self.socket_put_and_receive(f"MG @IN[{channel}]").strip()))
def axis_is_referenced(self, axis_Id_numeric) -> bool: def axis_is_referenced(self, axis_Id_numeric) -> bool:
return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip())) return bool(float(self.socket_put_and_receive(f"MG axisref[{axis_Id_numeric}]").strip()))
@@ -149,9 +146,9 @@ class GalilController(Controller):
self.socket_put_confirmed(f"naxis={axis_Id_numeric}") self.socket_put_confirmed(f"naxis={axis_Id_numeric}")
self.socket_put_confirmed(f"ndir={direction_flag}") self.socket_put_confirmed(f"ndir={direction_flag}")
self.socket_put_confirmed("XQ#NEWPAR") self.socket_put_confirmed("XQ#NEWPAR")
time.sleep(0.1) time.sleep(0.005)
self.socket_put_confirmed("XQ#FES") self.socket_put_confirmed("XQ#FES")
time.sleep(0.1) time.sleep(0.01)
while self.is_axis_moving(None, axis_Id_numeric): while self.is_axis_moving(None, axis_Id_numeric):
time.sleep(0.01) time.sleep(0.01)
@@ -172,10 +169,8 @@ class GalilController(Controller):
Args: Args:
axis_Id_numeric (int): Axis number axis_Id_numeric (int): Axis number
""" """
time.sleep(0.1)
self.socket_put_confirmed(f"naxis={axis_Id_numeric}") self.socket_put_confirmed(f"naxis={axis_Id_numeric}")
self.socket_put_and_receive("XQ#NEWPAR") self.socket_put_and_receive("XQ#NEWPAR")
time.sleep(0.1)
self.socket_put_confirmed("XQ#FRM") self.socket_put_confirmed("XQ#FRM")
time.sleep(0.1) time.sleep(0.1)
while self.is_axis_moving(None, axis_Id_numeric): while self.is_axis_moving(None, axis_Id_numeric):
@@ -246,42 +241,12 @@ class GalilController(Controller):
print(t) print(t)
self.show_running_threads() self.show_running_threads()
self.show_status_other()
def show_status_other(self):
#Todo: move to lgalil specific section
if(self.get_digital_input(5)):
print("Emergency stop is not pushed.")
else:
print("Emergency stop is pushed.")
if(self.get_digital_input(6)):
print("Driver axis 2 error.")
if(self.get_digital_input(13)):
print("No air pressure at inner rotation.")
else:
print("There is air pressure at the inner rotation.")
if(self.get_digital_input(14)):
print("No air pressure at outer rotation axial.")
else:
print("There is air pressure at the outer rotation axial.")
if(self.get_digital_input(15)):
print("No air pressure at outer rotation radial.")
else:
print("There is air pressure at the outer rotation radial.")
swver = float(self.socket_put_and_receive("MGswver"))
print(f"Lgalil LAMNI firmware version {swver:2.0f}.")
def galil_show_all(self) -> None: def galil_show_all(self) -> None:
for controller in self._controller_instances.values(): for controller in self._controller_instances.values():
if isinstance(controller, GalilController): if isinstance(controller, GalilController):
controller.describe() controller.describe()
def lamni_lights_off(self):
self.socket_put_confirmed("SB1")
def lamni_lights_on(self):
self.socket_put_confirmed("CB1")
@staticmethod @staticmethod
def axis_Id_to_numeric(axis_Id: str) -> int: def axis_Id_to_numeric(axis_Id: str) -> int:
return ord(axis_Id.lower()) - 97 return ord(axis_Id.lower()) - 97
@@ -369,16 +334,12 @@ class GalilSetpointSignal(GalilSignalBase):
time.sleep(0.1) time.sleep(0.1)
if self.parent.axis_Id_numeric == 2: if self.parent.axis_Id_numeric == 2:
try: angle_status = self.parent.device_manager.devices[
rt = self.parent.device_manager.devices[self.parent.rt] self.parent.rt
if rt.enabled: ].obj.controller.feedback_status_angle_lamni()
angle_status = self.parent.device_manager.devices[
self.parent.rt if angle_status:
].obj.controller.feedback_status_angle_lamni() self.controller.socket_put_confirmed("angintf=1")
if angle_status:
self.controller.socket_put_confirmed("angintf=1")
except KeyError:
logger.warning("RT is disabled. Failed to update RT angle interferometer status to galil.")
self.controller.socket_put_confirmed(f"naxis={self.parent.axis_Id_numeric}") self.controller.socket_put_confirmed(f"naxis={self.parent.axis_Id_numeric}")
self.controller.socket_put_confirmed(f"ntarget={target_val:.3f}") self.controller.socket_put_confirmed(f"ntarget={target_val:.3f}")

View File

@@ -77,7 +77,7 @@ def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
b"XQ#NEWPAR\r", b"XQ#NEWPAR\r",
b"XQ#FES\r", b"XQ#FES\r",
b"MG_BGA\r", b"MG_BGA\r",
b"MGbcklact[0]\r", b"MGbcklact[axis]\r",
b"MG_XQ0\r", b"MG_XQ0\r",
b"MG_XQ2\r", b"MG_XQ2\r",
b"MG _LRA, _LFA\r", b"MG _LRA, _LFA\r",
@@ -93,7 +93,7 @@ def test_axis_put(leyey, target_pos, socket_put_messages, socket_get_messages):
b"XQ#NEWPAR\r", b"XQ#NEWPAR\r",
b"XQ#FES\r", b"XQ#FES\r",
b"MG_BGB\r", b"MG_BGB\r",
b"MGbcklact[1]\r", b"MGbcklact[axis]\r",
b"MG_XQ0\r", b"MG_XQ0\r",
b"MG_XQ2\r", b"MG_XQ2\r",
b"MG _LRB, _LFB\r", b"MG _LRB, _LFB\r",
@@ -119,7 +119,7 @@ def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, soc
b"XQ#NEWPAR\r", b"XQ#NEWPAR\r",
b"XQ#FRM\r", b"XQ#FRM\r",
b"MG_BGA\r", b"MG_BGA\r",
b"MGbcklact[0]\r", b"MGbcklact[axis]\r",
b"MG_XQ0\r", b"MG_XQ0\r",
b"MG_XQ2\r", b"MG_XQ2\r",
b"MG axisref[0]\r", b"MG axisref[0]\r",
@@ -133,7 +133,7 @@ def test_drive_axis_to_limit(leyex, axis_nr, direction, socket_put_messages, soc
b"XQ#NEWPAR\r", b"XQ#NEWPAR\r",
b"XQ#FRM\r", b"XQ#FRM\r",
b"MG_BGB\r", b"MG_BGB\r",
b"MGbcklact[1]\r", b"MGbcklact[axis]\r",
b"MG_XQ0\r", b"MG_XQ0\r",
b"MG_XQ2\r", b"MG_XQ2\r",
b"MG axisref[1]\r", b"MG axisref[1]\r",

View File

@@ -172,11 +172,7 @@ def test_on_mca_data(mock_det, values, expected_nothing):
[ [
( (
{"scan_id": 123}, {"scan_id": 123},
{ {"mca1": [100, 120, 140], "mca3": [200, 220, 240], "mca4": [300, 320, 340]},
"mca1": {"value": [100, 120, 140]},
"mca3": {"value": [200, 220, 240]},
"mca4": {"value": [300, 320, 340]},
},
) )
], ],
) )