Compare commits

...

14 Commits

Author SHA1 Message Date
b07ac52371 test: add tests for bpm and bpm_control
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
CI for csaxs_bec / test (pull_request) Successful in 1m58s
2026-03-27 20:07:44 +01:00
9557d98f30 fix(pseudo_devices): fix pseudo devices, bpm and bpm_control 2026-03-27 20:05:43 +01:00
x12sa
fecd4b84a4 feat: Add BPM and BPMControl pseudo devices 2026-03-27 20:05:42 +01:00
x12sa
b67e1c012c renamed rt flyer to rt positions
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m54s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 16:05:35 +01:00
x12sa
cbbec12d9b option to upload to a php interface
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 15:44:49 +01:00
x12sa
8f4a9f025e some adjustments
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 13:43:28 +01:00
x12sa
1b9b983ab2 wip optics config for energy device
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m56s
2026-03-27 12:48:51 +01:00
x12sa
d7b442969a added motors to endstation config 2026-03-27 12:48:51 +01:00
x12sa
f92db3f169 movable cards and more
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 3s
CI for csaxs_bec / test (push) Successful in 1m55s
2026-03-26 16:38:51 +01:00
x12sa
55531c8a65 next version 2026-03-26 16:38:51 +01:00
x12sa
1d408818cc next iteration, seems a first good and usable ver. 2026-03-26 16:38:51 +01:00
x12sa
ae2045dd10 fixes in contrast and audio confirm logif 2026-03-26 16:38:51 +01:00
x12sa
fd4d455a5b webpage version2 2026-03-26 16:38:51 +01:00
x12sa
3411aaaeb4 first version of webpage 2026-03-26 16:38:51 +01:00
18 changed files with 3331 additions and 112 deletions

View File

@@ -0,0 +1,89 @@
"""
LamNI/webpage_generator.py
===========================
LamNI-specific webpage generator subclass.
Integration (inside the LamNI __init__ / startup):
---------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.LamNI.webpage_generator import (
LamniWebpageGenerator,
)
self._webpage_gen = LamniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "lamni"):
----------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
lamni._webpage_gen.status()
lamni._webpage_gen.verbosity = 2
lamni._webpage_gen.stop()
lamni._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class LamniWebpageGenerator(WebpageGeneratorBase):
"""
LamNI-specific webpage generator.
Logo: LamNI.png from the same directory as this file.
Override _collect_setup_data() to add LamNI-specific temperatures,
sample name, and measurement settings.
"""
# TODO: fill in LamNI-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample": "lamni_temphum.temperature_sample",
# "OSA": "lamni_temphum.temperature_osa",
}
def _logo_path(self):
return Path(__file__).parent / "LamNI.png"
def _collect_setup_data(self) -> dict:
# ── LamNI-specific data goes here ─────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "lamni_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "lamni",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "lamni",
# LamNI-specific data here
}

View File

@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3

View File

@@ -21,6 +21,14 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import (
TomoIDManager,
)
# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
# FlomniWebpageGenerator,
# VERBOSITY_SILENT, # 0 — no output
# VERBOSITY_NORMAL, # 1 — startup / stop messages only (default)
# VERBOSITY_VERBOSE, # 2 — one-line summary per cycle
# VERBOSITY_DEBUG, # 3 — full JSON payload per cycle
# )
logger = bec_logger.logger
if builtins.__dict__.get("bec") is not None:
@@ -1303,6 +1311,16 @@ class Flomni(
self.corr_angle_y_2 = []
self._progress_proxy = _ProgressProxy(self.client)
self._progress_proxy.reset()
from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import (
FlomniWebpageGenerator,
)
self._webpage_gen = FlomniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
upload_url="http://s1090968537.online.de/upload.php", # optional
)
self._webpage_gen.start()
self.OMNYTools = OMNYTools(self.client)
self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername)
self.tomo_id_manager = TomoIDManager()
@@ -2249,8 +2267,8 @@ class Flomni(
+ ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n'
)
padding = 20
fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}"
stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}"
fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}"
stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}"
dataset_id = str(self.client.queue.next_dataset_number)
account = bec.active_account
content = [
@@ -2267,7 +2285,7 @@ class Flomni(
f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n",
f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n",
f"{'FOV:':<{padding}}{fovxy:>{padding}}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}.0f}\n",
f"{'Stitching:':<{padding}}{stitching:>{padding}}\n",
f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n",
f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n",
]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,96 @@
"""
omny/webpage_generator.py
==========================
OMNY-specific webpage generator subclass.
Integration (inside the OMNY __init__ / startup):
--------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.omny.webpage_generator import (
OmnyWebpageGenerator,
)
self._webpage_gen = OmnyWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
)
self._webpage_gen.start()
Or use the factory (auto-selects by session name "omny"):
---------------------------------------------------------
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
make_webpage_generator,
)
self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/")
self._webpage_gen.start()
Interactive helpers:
--------------------
omny._webpage_gen.status()
omny._webpage_gen.verbosity = 2
omny._webpage_gen.stop()
omny._webpage_gen.start()
"""
from pathlib import Path
from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import (
WebpageGeneratorBase,
_safe_get,
_safe_float,
_gvar,
)
class OmnyWebpageGenerator(WebpageGeneratorBase):
"""
OMNY-specific webpage generator.
Logo: OMNY.png from the same directory as this file.
Override _collect_setup_data() to add OMNY-specific temperatures,
sample name, and measurement settings.
The old OMNY spec webpage showed:
- Cryo temperatures (XOMNY-TEMP-CRYO-A/B)
- Per-channel temperatures (XOMNY-TEMP1..48)
- Dewar pressure / LN2 flow
- Interferometer strengths (OINTERF)
Map these to BEC device paths below once available.
"""
# TODO: fill in OMNY-specific device paths
# label -> dotpath under device_manager.devices
_TEMP_MAP = {
# "Sample (cryo A)": "omny_temp.cryo_a",
# "Cryo head (B)": "omny_temp.cryo_b",
}
def _logo_path(self):
return Path(__file__).parent / "OMNY.png"
def _collect_setup_data(self) -> dict:
# ── OMNY-specific data goes here ──────────────────────────────
# Uncomment and adapt when device names are known:
#
# dm = self._bec.device_manager
# sample_name = _safe_get(dm, "omny_samples.sample_names.sample0") or "N/A"
# temperatures = {
# label: _safe_float(_safe_get(dm, path))
# for label, path in self._TEMP_MAP.items()
# }
# settings = {
# "Sample name": sample_name,
# "FOV x / y": ...,
# "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"),
# "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"),
# }
# return {
# "type": "omny",
# "sample_name": sample_name,
# "temperatures": temperatures,
# "settings": settings,
# }
# Placeholder — returns minimal info until implemented
return {
"type": "omny",
# OMNY-specific data here
}

View File

@@ -72,7 +72,7 @@ xbpm3x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -95,7 +95,7 @@ xbpm3y:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -118,7 +118,7 @@ sl3trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -141,7 +141,7 @@ sl3trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -164,7 +164,7 @@ sl3trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -187,7 +187,7 @@ sl3trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -210,7 +210,7 @@ fast_shutter_n1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -234,7 +234,7 @@ fast_shutter_o1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -257,7 +257,7 @@ fast_shutter_o2_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -280,7 +280,7 @@ filter_array_1_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -303,7 +303,7 @@ filter_array_2_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -326,7 +326,7 @@ filter_array_3_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -349,7 +349,7 @@ filter_array_4_x:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -372,7 +372,7 @@ sl4trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -395,7 +395,7 @@ sl4trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -418,7 +418,7 @@ sl4trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -441,7 +441,7 @@ sl4trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -466,7 +466,7 @@ sl5trxi:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -489,7 +489,7 @@ sl5trxo:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -512,7 +512,7 @@ sl5trxb:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -535,7 +535,7 @@ sl5trxt:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -544,6 +544,66 @@ sl5trxt:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 5
sl5ch:
description: ESbox1 slit 5 center horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5wh:
description: ESbox1 slit 5 width horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5cv:
description: ESbox1 slit 5 center vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
sl5wv:
description: ESbox1 slit 5 width vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
xbimtrx:
description: ESbox2 beam intensity monitor x movement
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
@@ -558,7 +618,7 @@ xbimtrx:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -581,7 +641,7 @@ xbimtry:
# precision: 3
# tolerance: 0.005
enabled: true
onFailure: buffer
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
@@ -589,3 +649,270 @@ xbimtry:
init_position: 0
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
################### XBOX related ###################
# we assue the epics settings for resolution, velocity etc. are correct
# we do not overwrite from here
aptrx:
description: Aperture pinhole X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-PIN1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
aptry:
description: Aperture pinhole Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-PIN1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtrx:
description: Exposure box aperture X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtry:
description: Exposure box aperture Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebtrz:
description: Exposure box aperture Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-EB:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
ebsupport:
description: Exposure box granite support Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-EH1-EB:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrx1:
description: FTS1 translation X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttry1:
description: FTS1 translation Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrx2:
description: FTS2 translation X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS2:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttry2:
description: FTS2 translation Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS2:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
fttrz:
description: FTS1 translation Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-FTS1:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs1x:
description: Beamstop 1 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs1y:
description: Beamstop 1 Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS1:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs2x:
description: Beamstop 2 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS2:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
bs2y:
description: Beamstop 2 Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-BS2:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttrx:
description: Detector table X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttry:
description: Detector table Y
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRY1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dttrz:
description: Detector table Z
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DETT:TRZ1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
dettrx:
description: Detector 1 X
deviceClass: ophyd_devices.devices.psi_motor.EpicsUserMotorVME
deviceConfig:
prefix: X12SA-ES1-DET1:TRX1
deviceTags:
- cSAXS_ES
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
####################
### Beamstop control for flight tube
####################
beamstop_control:
description: Gain control for beamstop flightube
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
deviceConfig:
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft
galilrioesft:
description: Galil RIO for remote gain switching and slow reading FlightTube
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20

View File

@@ -199,6 +199,25 @@ xbpm1c4:
readOnly: true
softwareTrigger: false
bpm1:
description: 'XBPM1 (frontend)'
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
left_top: xbpm1c1
right_top: xbpm1c2
right_bot: xbpm1c3
left_bot: xbpm1c4
onFailure: raise
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
needs:
- xbpm1c1
- xbpm1c2
- xbpm1c3
- xbpm1c4
############################################
######### End of xbpm sub devices ##########
############################################

View File

@@ -68,91 +68,110 @@ ccmx:
- cSAXS
- optics
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS!
ccm_energy:
description: 'test'
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
deviceConfig:
prefix: 'X12SA-OP-CCM1:'
override_suffixes:
user_readback: "ENERGY-GET"
user_setpoint: "ENERGY-SET"
velocity: "ROTY.VELO"
motor_done_move: "ROTY.DMOV"
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false
##########################################################################
######################## SMARACT STAGES ##################################
##########################################################################
xbpm2x:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: A
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 0
# xbpm2x:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: A
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 0
xbpm2y:
description: X-ray beam position monitor 1 in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: B
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 1
# xbpm2y:
# description: X-ray beam position monitor 1 in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: B
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 1
# cu_foilx:
# description: Cu foil in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: C
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 2
# scinx:
# description: scintillator in OPbox
# deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
# deviceConfig:
# axis_Id: D
# host: x12sa-eb-smaract-mcs-03.psi.ch
# limits:
# - -200
# - 200
# port: 5000
# sign: 1
# enabled: true
# onFailure: buffer
# readOnly: false
# readoutPriority: baseline
# connectionTimeout: 20
# userParameter:
# # bl_smar_stage to use csaxs reference method. assign number according to axis channel
# bl_smar_stage: 3
cu_foilx:
description: Cu foil in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: C
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 2
scinx:
description: scintillator in OPbox
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
deviceConfig:
axis_Id: D
host: x12sa-eb-smaract-mcs-03.psi.ch
limits:
- -200
- 200
port: 5000
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
userParameter:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 3
# dmm1_trx_readback_example: # This is the same template as for i.e. bpm4i

View File

@@ -10,8 +10,8 @@
endstation:
- !include ./bl_endstation.yaml
detectors:
- !include ./bl_detectors.yaml
# detectors:
# - !include ./bl_detectors.yaml
#sastt:
# - !include ./sastt.yaml

View File

@@ -395,7 +395,7 @@ rtz:
readoutPriority: on_request
connectionTimeout: 20
rt_flyer:
rt_positions:
deviceClass: csaxs_bec.devices.omny.rt.rt_flomni_ophyd.RtFlomniFlyer
deviceConfig:
host: mpc2844.psi.ch
@@ -522,6 +522,18 @@ omny_panda:
FMC_IN.VAL2.Min: cap_voltage_fzp_x_min
FMC_IN.VAL2.Max: cap_voltage_fzp_x_max
FMC_IN.VAL2.Mean: cap_voltage_fzp_x_mean
INENC1.VAL.Max: interf_st_fzp_y_max
INENC1.VAL.Mean: interf_st_fzp_y_mean
INENC1.VAL.Min: interf_st_fzp_y_min
INENC2.VAL.Max: interf_st_fzp_x_max
INENC2.VAL.Mean: interf_st_fzp_x_mean
INENC2.VAL.Min: interf_st_fzp_x_min
INENC3.VAL.Max: interf_st_rotz_max
INENC3.VAL.Mean: interf_st_rotz_mean
INENC3.VAL.Min: interf_st_rotz_min
INENC4.VAL.Max: interf_st_rotx_max
INENC4.VAL.Mean: interf_st_rotx_mean
INENC4.VAL.Min: interf_st_rotx_min
deviceTags:
- detector
enabled: true

View File

@@ -0,0 +1,24 @@
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
bpm1:
readoutPriority: baseline
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: galilrioesxbox.analog_in.ch0
blade_r: galilrioesxbox.analog_in.ch1
blade_b: galilrioesxbox.analog_in.ch2
blade_l: galilrioesxbox.analog_in.ch3
enabled: true
readOnly: false
softwareTrigger: true
needs:
- galilrioesxbox

View File

@@ -48,7 +48,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshopen(self):
"""Open the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(1)
return self.device_manager.devices["fsh"].fshopen()
else:
self.shutter.put(1)
@@ -56,7 +55,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshclose(self):
"""Close the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(0)
return self.device_manager.devices["fsh"].fshclose()
else:
self.shutter.put(0)

View File

@@ -0,0 +1,172 @@
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
class BPM(PSIPseudoDeviceBase):
"""BPM positioner pseudo device."""
# Blade signals, a,b,c,d
left_top = Cpt(
BECProcessedSignal,
name="left_top",
model_config=None,
kind=Kind.config,
doc="BPM left_top blade",
)
right_top = Cpt(
BECProcessedSignal,
name="right_top",
model_config=None,
kind=Kind.config,
doc="BPM right_top blade",
)
right_bot = Cpt(
BECProcessedSignal,
name="right_bot",
model_config=None,
kind=Kind.config,
doc="BPM right_bottom blade",
)
left_bot = Cpt(
BECProcessedSignal,
name="left_bot",
model_config=None,
kind=Kind.config,
doc="BPM left_bot blade",
)
# Virtual signals
pos_x = Cpt(
BECProcessedSignal,
name="pos_x",
model_config=None,
kind=Kind.config,
doc="BPM X position, -1 fully left, 1 fully right",
)
pos_y = Cpt(
BECProcessedSignal,
name="pos_y",
model_config=None,
kind=Kind.config,
doc="BPM Y position, -1 fully bottom, 1 fully top",
)
diagonal = Cpt(
BECProcessedSignal,
name="diagonal",
model_config=None,
kind=Kind.config,
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
)
intensity = Cpt(
BECProcessedSignal,
name="intensity",
model_config=None,
kind=Kind.config,
doc="BPM intensity",
)
def __init__(
self,
name,
left_top: str,
right_top: str,
right_bot: str,
left_bot: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all blade signal objects from utility method
signal_t = self.left_top.get_device_object_from_bec(
object_name=left_top, signal_name=self.name, device_manager=device_manager
)
signal_r = self.right_top.get_device_object_from_bec(
object_name=right_top, signal_name=self.name, device_manager=device_manager
)
signal_b = self.right_bot.get_device_object_from_bec(
object_name=right_bot, signal_name=self.name, device_manager=device_manager
)
signal_l = self.left_bot.get_device_object_from_bec(
object_name=left_bot, signal_name=self.name, device_manager=device_manager
)
# Set compute methods for blade signals and virtual signals
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
self.intensity.set_compute_method(
self._compute_intensity,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
left_bot=self.left_bot,
)
self.pos_x.set_compute_method(
self._compute_pos_x,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.pos_y.set_compute_method(
self._compute_pos_y,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.diagonal.set_compute_method(
self._compute_diagonal,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
def _compute_blade_signal(self, signal: Signal) -> float:
return signal.get()
def _compute_intensity(
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
) -> float:
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
return intensity
def _compute_pos_x(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
sum_left = left_bot.get() + left_top.get()
sum_right = right_top.get() + right_bot.get()
sum_total = sum_left + sum_right
if sum_total == 0:
return 0.0
return (sum_right - sum_left) / sum_total
def _compute_pos_y(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
sum_top = left_top.get() + right_top.get()
sum_bot = right_bot.get() + left_bot.get()
sum_total = sum_top + sum_bot
if sum_total == 0:
return 0.0
return (sum_top - sum_bot) / sum_total
def _compute_diagonal(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
sum_diag1 = left_bot.get() + right_top.get()
sum_diag2 = left_top.get() + right_bot.get()
sum_total = sum_diag1 + sum_diag2
if sum_total == 0:
return 0.0
return (sum_diag1 - sum_diag2) / sum_total

View File

@@ -0,0 +1,189 @@
"""
Module for controlling the BPM amplifier settings, such as gain and coupling.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from ophyd import Component as Cpt
from ophyd import Kind
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
from ophyd import Signal
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class BPMControl(PSIPseudoDeviceBase):
"""
BPM amplifier control pseudo device. It is responsible for controlling the
gain and coupling for the BPM amplifier. It relies on signals from a device
in BEC to be available. For cSAXS, these are most liikely to be from the
GalilRIO device that controls the BPM amplifier.
Args:
name (str): Name of the pseudo device.
gain_lsb (str): Name of the signal in BEC that controls the LSB
of the gain setting.
gain_mid (str): Name of the signal in BEC that controls the MID
bit of the gain setting.
gain_msb (str): Name of the signal in BEC that controls the MSB
of the gain setting.
coupling (str): Name of the signal in BEC that controls the coupling
setting.
speed_mode (str): Name of the signal in BEC that controls the speed mode
(low-noise vs high-speed) of the amplifier.
"""
USER_ACCESS = ["set_gain", "set_coupling"]
gain = Cpt(
BECProcessedSignal,
name="gain",
model_config=None,
kind=Kind.config,
doc="Gain of the amplifier",
)
coupling = Cpt(
BECProcessedSignal,
name="coupling",
model_config=None,
kind=Kind.config,
doc="Coupling of the amplifier",
)
speed = Cpt(
BECProcessedSignal,
name="speed",
model_config=None,
kind=Kind.config,
doc="Speed of the amplifier",
)
def __init__(
self,
name: str,
gain_lsb: str,
gain_mid: str,
gain_msb: str,
coupling: str,
speed_mode: str,
device_manager: DeviceManagerDS | None = None,
scan_info: ScanInfo | None = None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
self._gain_lsb = self.gain.get_device_object_from_bec(
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
)
self._gain_mid = self.gain.get_device_object_from_bec(
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
)
self._gain_msb = self.gain.get_device_object_from_bec(
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
)
self._coupling = self.gain.get_device_object_from_bec(
object_name=coupling, signal_name=self.name, device_manager=device_manager
)
self._speed_mode = self.gain.get_device_object_from_bec(
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
)
# Set the compute methods for the virtual signals.
self.gain.set_compute_method(
self._compute_gain,
msb=self._gain_msb,
mid=self._gain_mid,
lsb=self._gain_lsb,
speed_mode=self._speed_mode,
)
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
def set_gain(
self,
gain: Literal[
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
],
) -> None:
"""
Set the gain of the amplifier.
Args:
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
"""
gain_int = int(gain)
if gain_int not in VALID_GAINS:
raise ValueError(
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
)
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
self._gain_msb.set(bool(msb)).wait(timeout=2)
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
self._gain_mid.set(bool(mid)).wait(timeout=2)
self._speed_mode.set(bool(use_low_noise))
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
"""
Set the coupling of the amplifier.
Args:
coupling (Literal): Must be either "AC" or "DC".
"""
if coupling not in ["AC", "DC"]:
raise ValueError(
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
)
self._coupling.set(coupling == "DC").wait(timeout=2)
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
"""Compute the gain based on the bits and speed mode."""
bits = (msb.get(), mid.get(), lsb.get())
speed_mode = speed_mode.get()
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
def _compute_coupling(self, coupling: Signal) -> str:
"""Compute the coupling based on the signal."""
return "DC" if coupling.get() else "AC"
def _compute_speed(self, speed: Signal) -> str:
"""Compute the speed based on the signal."""
return "low_speed" if speed.get() else "high_speed"

View File

@@ -0,0 +1 @@
# from ophyd

View File

@@ -105,11 +105,11 @@ class FlomniFermatScan(AsyncFlyScanBase):
def scan_report_instructions(self):
"""Scan report instructions for the progress bar"""
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_flyer"]})
yield from self.stubs.scan_report_instruction({"device_progress": ["rt_positions"]})
@property
def monitor_sync(self) -> str:
return "rt_flyer"
return "rt_positions"
def initialize(self):
self.scan_motors = []
@@ -294,10 +294,10 @@ class FlomniFermatScan(AsyncFlyScanBase):
def scan_core(self):
# send off the flyer
yield from self.stubs.kickoff(device="rt_flyer")
yield from self.stubs.kickoff(device="rt_positions")
# start the readout loop of the flyer
status = yield from self.stubs.complete(device="rt_flyer", wait=False)
status = yield from self.stubs.complete(device="rt_positions", wait=False)
# read the monitors until the flyer is done
while not status.done:

View File

@@ -0,0 +1,241 @@
"""Module to test the pseudo_device module."""
import pytest
from bec_lib.atlas_models import Device
from ophyd_devices.sim.sim_signals import SetableSignal
from csaxs_bec.devices.pseudo_devices.bpm import BPM
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
@pytest.fixture
def patched_dm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm_control(patched_dm):
name = "bpm_control"
control_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"gain_lsb": "gain_lsb",
"gain_mid": "gain_mid",
"gain_msb": "gain_msb",
"coupling": "coupling",
"speed_mode": "speed_mode",
},
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
)
patched_dm._session["devices"].append(control_config.model_dump())
try:
control = BPMControl(
name=name,
gain_lsb="gain_lsb",
gain_mid="gain_mid",
gain_msb="gain_msb",
coupling="coupling",
speed_mode="speed_mode",
device_manager=patched_dm,
)
patched_dm.devices._add_device(control.name, control)
control.wait_for_connection()
yield control
finally:
control.destroy()
def test_bpm_control_set_gain(bpm_control):
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
gain_mid = bpm_control.device_manager.devices["gain_mid"]
gain_msb = bpm_control.device_manager.devices["gain_msb"]
coupling = bpm_control.device_manager.devices["coupling"]
speed_mode = bpm_control.device_manager.devices["speed_mode"]
gain_lsb.put(0)
gain_mid.put(0)
gain_msb.put(0)
coupling.put(0)
speed_mode.put(1)
gain = bpm_control.gain.get()
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
gain_val = 10000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
gain_val = 100000000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
with pytest.raises(ValueError):
bpm_control.set_gain(1005.0)
def test_bpm_control_set_coupling(bpm_control):
coupling = bpm_control.device_manager.devices["coupling"]
coupling.put(0)
bpm_control.coupling.get() == "AC"
coupling.put(1)
bpm_control.coupling.get() == "DC"
bpm_control.set_coupling("AC")
assert coupling.get() == 0
with pytest.raises(ValueError):
bpm_control.set_coupling("wrong")
@pytest.fixture
def patched_dm_bpm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
left_top = SetableSignal(name="left_top", value=0, kind="config")
right_top = SetableSignal(name="right_top", value=0, kind="config")
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
for signal in [left_top, right_top, right_bot, left_bot]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm(patched_dm_bpm):
name = "bpm"
bpm_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"left_top": "left_top",
"right_top": "right_top",
"right_bot": "right_bot",
"left_bot": "left_bot",
},
needs=["left_top", "right_top", "right_bot", "left_bot"],
)
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
try:
bpm = BPM(
name=name,
left_top="left_top",
right_top="right_top",
right_bot="right_bot",
left_bot="left_bot",
device_manager=patched_dm_bpm,
)
patched_dm_bpm.devices._add_device(bpm.name, bpm)
bpm.wait_for_connection()
yield bpm
finally:
bpm.destroy()
def test_bpm_positions(bpm):
left_top = bpm.device_manager.devices["left_top"]
right_top = bpm.device_manager.devices["right_top"]
right_bot = bpm.device_manager.devices["right_bot"]
left_bot = bpm.device_manager.devices["left_bot"]
# Test center position
for signal in [left_top, right_top, right_bot, left_bot]:
signal.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
# Test fully left
left_top.put(1)
right_top.put(0)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == -1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
assert bpm.intensity.get() == 2
# Test fully right
left_top.put(0)
right_top.put(1)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
# Test fully top
left_top.put(1)
right_top.put(1)
right_bot.put(0)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 1
assert bpm.diagonal.get() == 0
# Test fully bottom
left_top.put(0)
right_top.put(0)
right_bot.put(1)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == -1
assert bpm.diagonal.get() == 0
# Diagonal beam
left_top.put(1)
right_top.put(0)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == -1
left_top.put(0)
right_top.put(1)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 1