Compare commits

..

2 Commits

Author SHA1 Message Date
ca40a8eb3d fix: added missing init file to omny subdir 2024-05-23 16:13:11 +02:00
f476b210f1 docs: added csaxs docs templates 2024-05-23 16:03:11 +02:00
34 changed files with 1342 additions and 1535 deletions

View File

@@ -1,46 +0,0 @@
ddg:
description: DelayGenerator for detector triggering
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
deviceConfig:
prefix: 'X12SA-CPCL-DDG3:'
ddg_config:
delay_burst: 40.e-3
delta_width: 0
additional_triggers: 0
polarity:
- 1 # T0
- 0 # eiger and LeCroy4
- 1
- 1
- 1
amplitude: 4.5
offset: 0
thres_trig_level: 2.5
set_high_on_exposure: False
set_high_on_stage: False
deviceTags:
- cSAXS
- ddg_detectors
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: True
eiger_jfjoch:
description: DelayGenerator for detector triggering
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_jungfrau_joch.Eiger9McSAXS
deviceConfig:
deviceTags:
- cSAXS
- eiger9m
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: False
simulated_monitor:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false

View File

@@ -1,38 +0,0 @@
############################################################
#################### npoint motors #########################
############################################################
npx:
description: nPoint x axis on the big npoint controller
deviceClass: csaxs_bec.devices.npoint.npoint.NPointAxis
deviceConfig:
axis_Id: A
host: "nPoint000003.psi.ch"
limits:
- -50
- 50
port: 23
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
deviceTags:
- npoint
npy:
description: nPoint y axis on the big npoint controller
deviceClass: csaxs_bec.devices.npoint.npoint.NPointAxis
deviceConfig:
axis_Id: B
host: "nPoint000003.psi.ch"
limits:
- -50
- 50
port: 23
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
deviceTags:
- npoint

View File

@@ -3,36 +3,35 @@
### csaxs_bec
| Device | Documentation | Module |
| :----- | :------------- | :------ |
| Bpm4i | | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| InsertionDevice | Python wrapper for the CSAXS insertion device control<br><br> This wrapper provides a positioner interface for the ID control.<br> is completely custom XBPM with templates directly in the<br> VME repo. Thus it needs a custom ophyd template as well...<br><br> WARN: The x and y are not updated by the IOC<br> | [csaxs_bec.devices.epics.InsertionDevice](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/InsertionDevice.py) |
| XbpmBase | Python wrapper for X-ray Beam Position Monitors<br><br> XBPM's consist of a metal-coated diamond window that ejects<br> photoelectrons from the incoming X-ray beam. These electons<br> are collected and their current is measured. Effectively<br> they act as four quadrant photodiodes and are used as BPMs<br> at the undulator beamlines of SLS.<br><br> Note: EPICS provided signals are read only, but the user can<br> change the beam position offset.<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| XbpmCsaxsOp | Python wrapper for custom XBPMs in the cSAXS optics hutch<br><br> This is completely custom XBPM with templates directly in the<br> VME repo. Thus it needs a custom ophyd template as well...<br><br> WARN: The x and y are not updated by the IOC<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| XbpmSim | Python wrapper for simulated X-ray Beam Position Monitors<br><br> XBPM's consist of a metal-coated diamond window that ejects<br> photoelectrons from the incoming X-ray beam. These electons<br> are collected and their current is measured. Effectively<br> they act as four quadrant photodiodes and are used as BPMs<br> at the undulator beamlines of SLS.<br><br> Note: EPICS provided signals are read only, but the user can<br> change the beam position offset.<br><br> This simulation device extends the basic proxy with a script that<br> fills signals with quasi-randomized values.<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| DelayGeneratorcSAXS | <br> DG645 delay generator at cSAXS (multiple can be in use depending on the setup)<br><br> Default values for setting up DDG.<br> Note: checks of set calues are not (only partially) included, check manual for details on possible settings.<br> https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf<br><br> - delay_burst : (float >=0) Delay between trigger and first pulse in burst mode<br> - delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition<br> - additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line)<br> - polarity : (list of 0/1) polarity for different channels<br> - amplitude : (float) amplitude voltage of TTLs<br> - offset : (float) offset for ampltitude<br> - thres_trig_level : (float) threshold of trigger amplitude<br><br> Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):<br><br> - set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.<br> # TODO trigger_width and fixed_ttl could be combined into single list.<br> - fixed_ttl_width : (list of either 1 or 0), one for each channel.<br> - trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.<br> - set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.<br> - premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan).<br> - set_high_on_stage : (bool) if True, then TTL signal should go high already on stage.<br> | [csaxs_bec.devices.epics.delay_generator_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/delay_generator_csaxs.py) |
| Eiger1p5MDetector | | [csaxs_bec.devices.omny.eiger1p5m](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/eiger1p5m.py) |
| Eiger9McSAXS | <br> Eiger9M detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector<br> Various EpicsPVs for controlling the detector<br> | [csaxs_bec.devices.epics.eiger9m_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/eiger9m_csaxs.py) |
| SLSDetectorCam | SLS Detector Camera - Pilatus<br><br> Base class to map EPICS PVs to ophyd signals.<br> | [csaxs_bec.devices.epics.pilatus_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/pilatus_csaxs.py) |
| EpicsDXPFalcon | <br> DXP parameters for Falcon detector<br><br> Base class to map EPICS PVs from DXP parameters to ophyd signals.<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| FalconcSAXS | <br> Falcon Sitoro detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector<br> dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector<br> mca (EpicsMCARecord) : MCA parameters for Falcon detector<br> hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector<br> MIN_READOUT (float) : Minimum readout time for the detector<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| FalconHDF5Plugins | <br> HDF5 parameters for Falcon detector<br><br> Base class to map EPICS PVs from HDF5 Plugin to ophyd signals.<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| FlomniGalilMotor | | [csaxs_bec.devices.omny.galil.fgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/fgalil_ophyd.py) |
| FlomniSampleStorage | | [csaxs_bec.devices.omny.flomni_sample_storage](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/flomni_sample_storage.py) |
| FuprGalilMotor | | [csaxs_bec.devices.omny.galil.fupr_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/fupr_ophyd.py) |
| FalconcSAXS | <br> Falcon Sitoro detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector<br> dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector<br> mca (EpicsMCARecord) : MCA parameters for Falcon detector<br> hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector<br> MIN_READOUT (float) : Minimum readout time for the detector<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| MCScSAXS | MCS card for cSAXS for implementation at cSAXS beamline | [csaxs_bec.devices.epics.mcs_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/mcs_csaxs.py) |
| SIS38XX | SIS38XX card for access to EPICs PVs at cSAXS beamline | [csaxs_bec.devices.epics.mcs_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/mcs_csaxs.py) |
| PilatuscSAXS | Pilatus_2 300k detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (Eiger9MSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> cam (SLSDetectorCam) : Detector camera<br> MIN_READOUT (float) : Minimum readout time for the detector<br><br> | [csaxs_bec.devices.epics.pilatus_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/pilatus_csaxs.py) |
| Bpm4i | | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorPITCH | Girder YAW pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorROLL | Girder ROLL pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorX1 | Girder X translation pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorY1 | Girder Y translation pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorYAW | Girder YAW pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| InsertionDevice | Python wrapper for the CSAXS insertion device control<br><br> This wrapper provides a positioner interface for the ID control.<br> is completely custom XBPM with templates directly in the<br> VME repo. Thus it needs a custom ophyd template as well...<br><br> WARN: The x and y are not updated by the IOC<br> | [csaxs_bec.devices.epics.InsertionDevice](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/InsertionDevice.py) |
| LamniGalilMotor | | [csaxs_bec.devices.omny.galil.lgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/lgalil_ophyd.py) |
| MCScSAXS | MCS card for cSAXS for implementation at cSAXS beamline | [csaxs_bec.devices.epics.mcs_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/mcs_csaxs.py) |
| NPointAxis | <br> NPointAxis class, which inherits from Device and PositionerBase. This class<br> represents an axis of an nPoint piezo stage and provides the necessary<br> functionality to move the axis and read its current position.<br> | [csaxs_bec.devices.npoint.npoint](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/npoint/npoint.py) |
| OMNYSampleStorage | | [csaxs_bec.devices.omny.omny_sample_storage](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/omny_sample_storage.py) |
| PilatuscSAXS | Pilatus_2 300k detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (Eiger9MSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> cam (SLSDetectorCam) : Detector camera<br> MIN_READOUT (float) : Minimum readout time for the detector<br><br> | [csaxs_bec.devices.epics.pilatus_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/pilatus_csaxs.py) |
| PmDetectorRotation | Detector rotation pseudo motor<br><br> Small wrapper to convert detector pusher position to rotation angle.<br> | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| PmMonoBender | Monochromator bender<br><br> Small wrapper to combine the four monochromator bender motors.<br> | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| Xeye | | [csaxs_bec.devices.sls_devices.cSAXS.xeye](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/sls_devices/cSAXS/xeye.py) |
| SmaractMotor | | [csaxs_bec.devices.smaract.smaract_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/smaract/smaract_ophyd.py) |
| Eiger1p5MDetector | | [csaxs_bec.devices.omny.eiger1p5m](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/eiger1p5m.py) |
| FlomniSampleStorage | | [csaxs_bec.devices.omny.flomni_sample_storage](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/flomni_sample_storage.py) |
| OMNYSampleStorage | | [csaxs_bec.devices.omny.omny_sample_storage](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/omny_sample_storage.py) |
| FlomniGalilMotor | | [csaxs_bec.devices.omny.galil.fgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/fgalil_ophyd.py) |
| FuprGalilMotor | | [csaxs_bec.devices.omny.galil.fupr_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/fupr_ophyd.py) |
| LamniGalilMotor | | [csaxs_bec.devices.omny.galil.lgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/lgalil_ophyd.py) |
| SGalilMotor | "SGalil Motors at cSAXS have a<br> DC motor (y axis - vertical) - implemented as C<br> and a step motor (x-axis horizontal) - implemented as E<br> that require different communication for control<br> | [csaxs_bec.devices.omny.galil.sgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/sgalil_ophyd.py) |
| RtFlomniMotor | | [csaxs_bec.devices.omny.rt.rt_flomni_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py) |
| RtLamniMotor | | [csaxs_bec.devices.omny.rt.rt_lamni_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py) |
| SGalilMotor | "SGalil Motors at cSAXS have a<br> DC motor (y axis - vertical) - implemented as C<br> and a step motor (x-axis horizontal) - implemented as E<br> that require different communication for control<br> | [csaxs_bec.devices.omny.galil.sgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/sgalil_ophyd.py) |
| SIS38XX | SIS38XX card for access to EPICs PVs at cSAXS beamline | [csaxs_bec.devices.epics.mcs_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/mcs_csaxs.py) |
| SLSDetectorCam | SLS Detector Camera - Pilatus<br><br> Base class to map EPICS PVs to ophyd signals.<br> | [csaxs_bec.devices.epics.pilatus_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/pilatus_csaxs.py) |
| SmaractMotor | | [csaxs_bec.devices.smaract.smaract_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/smaract/smaract_ophyd.py) |
| XbpmBase | Python wrapper for X-ray Beam Position Monitors<br><br> XBPM's consist of a metal-coated diamond window that ejects<br> photoelectrons from the incoming X-ray beam. These electons<br> are collected and their current is measured. Effectively<br> they act as four quadrant photodiodes and are used as BPMs<br> at the undulator beamlines of SLS.<br><br> Note: EPICS provided signals are read only, but the user can<br> change the beam position offset.<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| XbpmCsaxsOp | Python wrapper for custom XBPMs in the cSAXS optics hutch<br><br> This is completely custom XBPM with templates directly in the<br> VME repo. Thus it needs a custom ophyd template as well...<br><br> WARN: The x and y are not updated by the IOC<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| XbpmSim | Python wrapper for simulated X-ray Beam Position Monitors<br><br> XBPM's consist of a metal-coated diamond window that ejects<br> photoelectrons from the incoming X-ray beam. These electons<br> are collected and their current is measured. Effectively<br> they act as four quadrant photodiodes and are used as BPMs<br> at the undulator beamlines of SLS.<br><br> Note: EPICS provided signals are read only, but the user can<br> change the beam position offset.<br><br> This simulation device extends the basic proxy with a script that<br> fills signals with quasi-randomized values.<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| Xeye | | [csaxs_bec.devices.sls_devices.cSAXS.xeye](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/sls_devices/cSAXS/xeye.py) |

View File

@@ -1,8 +1,5 @@
import time
from bec_lib import bec_logger
from ophyd import Component, DeviceStatus
from ophyd import Component
from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import (
DDGCustomMixin,
PSIDelayGeneratorBase,
@@ -17,165 +14,6 @@ class DelayGeneratorError(Exception):
"""Exception raised for errors."""
# class DDGSetup(DDGCustomMixin):
# """
# Mixin class for DelayGenerator logic at cSAXS.
# At cSAXS, multiple DDGs were operated at the same time. There different behaviour is
# implemented in the ddg_config signals that are passed via the device config.
# """
# def initialize_default_parameter(self) -> None:
# """Method to initialize default parameters."""
# for ii, channel in enumerate(self.parent.all_channels):
# self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
# self.parent.set_channels("amplitude", self.parent.amplitude.get())
# self.parent.set_channels("offset", self.parent.offset.get())
# # Setup reference
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # Set threshold level for ext. pulses
# self.parent.level.put(self.parent.thres_trig_level.get())
# def prepare_ddg(self) -> None:
# """
# Method to prepare scan logic of cSAXS
# Two scantypes are supported: "step" and "fly":
# - step: Scan is performed by stepping the motor and acquiring data at each step
# - fly: Scan is performed by moving the motor with a constant velocity and acquiring data
# Custom logic for different DDG behaviour during scans.
# - set_high_on_exposure : If True, then TTL signal is high during
# the full exposure time of the scan (all frames).
# E.g. Keep shutter open for the full scan.
# - fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
# If the value is 0, then the width of the TTL pulse is determined,
# no matter which parameters are passed from the scaninfo for exposure time
# - set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
# were: SINGLE_SHOT, EXT_RISING_EDGE
# """
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # scantype "step"
# if self.parent.scaninfo.scan_type == "step":
# # High on exposure means that the signal
# if self.parent.set_high_on_exposure.get():
# # caluculate parameters
# num_burst_cycle = 1 + self.parent.additional_triggers.get()
# exp_time = (
# self.parent.delta_width.get()
# + self.parent.scaninfo.frames_per_trigger
# * (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
# )
# total_exposure = exp_time
# delay_burst = self.parent.delay_burst.get()
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# for value, channel in zip(
# self.parent.fixed_ttl_width.get(), self.parent.all_channels
# ):
# logger.debug(f"Trying to set DDG {channel} to {value}")
# if value != 0:
# self.parent.set_channels("width", value, channels=[channel])
# else:
# # caluculate parameters
# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
# total_exposure = exp_time + self.parent.scaninfo.readout_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = (
# self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
# )
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# # scantype "fly"
# elif self.parent.scaninfo.scan_type == "fly":
# if self.parent.set_high_on_exposure.get():
# # caluculate parameters
# exp_time = (
# self.parent.delta_width.get()
# + self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
# + self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
# )
# total_exposure = exp_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = 1 + self.parent.additional_triggers.get()
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# for value, channel in zip(
# self.parent.fixed_ttl_width.get(), self.parent.all_channels
# ):
# logger.debug(f"Trying to set DDG {channel} to {value}")
# if value != 0:
# self.parent.set_channels("width", value, channels=[channel])
# else:
# # caluculate parameters
# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
# total_exposure = exp_time + self.parent.scaninfo.readout_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = (
# self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
# )
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# else:
# raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# # Set common DDG parameters
# self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
# self.parent.set_channels("delay", 0.0)
# def on_trigger(self) -> None:
# """Method to be executed upon trigger"""
# if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
# self.parent.trigger_shot.put(1)
# def check_scan_id(self) -> None:
# """
# Method to check if scan_id has changed.
# If yes, then it changes parent.stopped to True, which will stop further actions.
# """
# old_scan_id = self.parent.scaninfo.scan_id
# self.parent.scaninfo.load_scan_metadata()
# if self.parent.scaninfo.scan_id != old_scan_id:
# self.parent.stopped = True
# def finished(self) -> None:
# """Method checks if DDG finished acquisition"""
# def on_pre_scan(self) -> None:
# """
# Method called by pre_scan hook in parent class.
# Executes trigger if premove_trigger is Trus.
# """
# if self.parent.premove_trigger.get() is True:
# self.parent.trigger_shot.put(1)
class DDGSetup(DDGCustomMixin):
"""
Mixin class for DelayGenerator logic at cSAXS.
@@ -203,93 +41,114 @@ class DDGSetup(DDGCustomMixin):
self.parent.level.put(self.parent.thres_trig_level.get())
def prepare_ddg(self) -> None:
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "jjf_test"
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
# exp_time = self.parent.scaninfo.exp_time
# readout = self.parent.scaninfo.readout_time
# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
# total_exposure = exp_time+readout
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
delay = 0
delay_burst = self.parent.delay_burst.get()
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal='width', value=exp_time)
self.parent.set_channels(signal='delay', value=delay)
self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
"""
Method to prepare scan logic of cSAXS
def on_stage(self) -> None:
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
self.parent.set_channels("width", exp_time)
self.parent.set_channels("delay", 0.0)
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
self.parent.burst_enable(num_burst_cycle, 0, total_exposure, config="first")
Two scantypes are supported: "step" and "fly":
- step: Scan is performed by stepping the motor and acquiring data at each step
- fly: Scan is performed by moving the motor with a constant velocity and acquiring data
Custom logic for different DDG behaviour during scans.
- set_high_on_exposure : If True, then TTL signal is high during
the full exposure time of the scan (all frames).
E.g. Keep shutter open for the full scan.
- fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
If the value is 0, then the width of the TTL pulse is determined,
no matter which parameters are passed from the scaninfo for exposure time
- set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
were: SINGLE_SHOT, EXT_RISING_EDGE
"""
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "step"
if self.parent.scaninfo.scan_type == "step":
# High on exposure means that the signal
if self.parent.set_high_on_exposure.get():
# caluculate parameters
num_burst_cycle = 1 + self.parent.additional_triggers.get()
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.frames_per_trigger
* (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
# scantype "fly"
elif self.parent.scaninfo.scan_type == "fly":
if self.parent.set_high_on_exposure.get():
# caluculate parameters
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
+ self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = 1 + self.parent.additional_triggers.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
else:
raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# Set common DDG parameters
self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
self.parent.set_channels("delay", 0.0)
def on_trigger(self) -> None:
"""Method to be executed upon trigger"""
if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
self.parent.trigger_shot.put(1)
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
#time.sleep(num_burst_cycle*total_exposure)
def check_ddg()->int:
self.parent.trigger_burst_readout.put(1)
return self.parent.burst_cycle_finished.get()
status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
timeout=num_burst_cycle*total_exposure+1,
check_stopped=True,
exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.")
)
logger.info(f"Return status {self.parent.name}")
return status
# timer = 0
# while True:
# self.parent.trigger_burst_readout.put(1)
# state = self.parent.burst_cycle_finished.get()
# if state == 1:
# break
# time.sleep(0.05)
# timer +=0.05
# if timer>3:
# raise TimeoutError(f"{self.parent.name} did not return. Bit state for end_burst_cycle is {state} for state")
def on_complete(self) -> DeviceStatus:
pass
# logger.info(f"On complete started for {self.parent.name}")
# scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
# if scan_name != "jjf_test":
# return None
# def check_ddg()->int:
# lambda r : self.parent.trigger_burst_readout.put(1)
# return lambda r: self.parent.burst_cycle_finished.get()
# status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
# timeout=3,
# check_stopped=True,
# exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.")
# )
# logger.info(f"Return status {self.parent.name}")
# return status
def check_scan_id(self) -> None:
"""
@@ -344,7 +203,6 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
custom_prepare_cls = DDGSetup
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
@@ -430,6 +288,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
ddg_config=None,
**kwargs,
):
@@ -475,6 +334,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
@@ -482,48 +342,4 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
if __name__ == "__main__":
# Start delay generator in simulation mode.
# Note: To run, access to Epics must be available.
import time
config = {
"delay_burst": 40.0e-3,
"delta_width": 0,
"additional_triggers": 0,
"polarity": [1, 0, 1, 1, 1], # T0 # to eiger and lecroy4
"amplitude": 4.5,
"offset": 0,
"thres_trig_level": 2.5,
"set_high_on_exposure": False,
"set_high_on_stage": False,
}
start = time.time()
print(f"Start with init of DDG3 with config: {config}")
dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="dgen", ddg_config=config)
print(f"Finished init after: {time.time()-start}s")
start = time.time()
print(f"Start setting up DDG3")
exp_time = 1/(2e3) # 2 kHz
readout = exp_time/10
delay = 0
num_burst_cycle = 1e4 # N triggers
total_exposure = exp_time+readout
delay_burst = dgen.delay_burst.get()
dgen.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
dgen.set_channels(signal='width', value=exp_time)
dgen.set_channels(signal='delay', value=0)
dgen.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
print(f"Start sending {num_burst_cycle} triggers after {time.time()-start}s, ETA {num_burst_cycle*total_exposure}s")
break_time = time.time()
dgen.trigger()
# Wait here briefly for status to finish, whether this is realiable has to be tested
time.sleep(num_burst_cycle*total_exposure)
timer = 0
while True:
dgen.trigger_burst_readout.put(1)
state = dgen.burst_cycle_finished.get()
if state == 1:
break
time.sleep(0.05)
timer +=0.05
if timer>3:
raise TimeoutError(f"dgen.name did not return with value {state} for state")
print(f"Finished trigger cascade of {num_burst_cycle} with {exp_time}s -> {num_burst_cycle*exp_time}s after {time.time()-start}s in total, {break_time} for sending triggers.")
dgen = DelayGeneratorcSAXS("delaygen:DG1:", name="dgen", sim_mode=True)

View File

@@ -5,6 +5,8 @@ import time
from typing import Any
import numpy as np
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
@@ -40,17 +42,6 @@ class Eiger9MSetup(CustomDetectorMixin):
self.std_client = None
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize the detector"""
self.initialize_default_parameter()
self.initialize_detector()
self.initialize_detector_backend()
def initialize_detector(self) -> None:
"""Initialize detector"""
self.stop_detector()
self.parent.cam.trigger_mode.put(TriggerSource.GATING)
def initialize_default_parameter(self) -> None:
"""Set default parameters for Eiger9M detector"""
self.update_readout_time()
@@ -64,19 +55,29 @@ class Eiger9MSetup(CustomDetectorMixin):
)
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
def initialize_detector(self) -> None:
"""Initialize detector"""
# Stops the detector
self.stop_detector()
# Sets the trigger source to GATING
self.parent.set_trigger(TriggerSource.GATING)
def initialize_detector_backend(self) -> None:
"""Initialize detector backend"""
# Std client
self.std_client = StdDaqClient(url_base=self.std_rest_server_url)
# Stop writer
self.std_client.stop_writer()
# Change e-account
eacc = self.parent.scaninfo.username
self.update_std_cfg("writer_user_id", int(eacc.strip(" e")))
signal_conditions = [(lambda: self.std_client.get_status()["state"], "READY")]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
all_signals=True,
signal_conditions=signal_conditions, timeout=self.parent.timeout, all_signals=True
):
raise EigerTimeoutError(
f"Std client not in READY state, returns: {self.std_client.get_status()}"
@@ -97,6 +98,7 @@ class Eiger9MSetup(CustomDetectorMixin):
"""
# Load config from client and check old value
cfg = self.std_client.get_config()
old_value = cfg.get(cfg_key)
if old_value is None:
@@ -109,112 +111,19 @@ class Eiger9MSetup(CustomDetectorMixin):
f" {type(old_value)}:{old_value}"
)
# Update config with new value and send back to client
cfg.update({cfg_key: value})
logger.debug(cfg)
self.std_client.set_config(cfg)
logger.debug(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}")
def on_stage(self) -> None:
"""Prepare the detector for scan"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
def prepare_detector(self) -> None:
"""Prepare detector for scan"""
self.set_detector_threshold()
self.set_acquisition_params()
self.parent.cam.trigger_mode.put(TriggerSource.GATING)
def set_detector_threshold(self) -> None:
"""
Set the detector threshold
The function sets the detector threshold automatically to 1/2 of the beam energy.
"""
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
setpoint = int(mokev * factor)
energy = self.parent.cam.beam_energy.read()[self.parent.cam.beam_energy.name]["value"]
if setpoint != energy:
self.parent.cam.beam_energy.set(setpoint)
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
self.update_readout_time()
def prepare_data_backend(self) -> None:
"""Prepare the data backend for the scan"""
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
).wait()
self.filepath_exists(self.parent.filepath.get())
self.stop_detector_backend()
try:
self.std_client.start_writer_async(
{
"output_file": self.parent.filepath.get(),
"n_images": int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
),
}
)
except Exception as exc:
time.sleep(5)
if self.std_client.get_status()["state"] == "READY":
raise EigerTimeoutError(f"Timeout of start_writer_async with {exc}") from exc
signal_conditions = [
(lambda: self.std_client.get_status()["acquisition"]["state"], "WAITING_IMAGES")
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=False,
all_signals=True,
):
raise EigerTimeoutError(
"Timeout of 5s reached for std_daq start_writer_async with std_daq client status"
f" {self.std_client.get_status()}"
)
def on_unstage(self) -> None:
"""Unstage the detector"""
pass
def on_complete(self) -> None:
"""Complete the detector"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
self.publish_file_location(done=True, successful=True)
def on_stop(self) -> None:
"""Stop the detector"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stop the detector"""
# Stop detector
self.parent.cam.acquire.put(0)
# Check if detector returned in idle state
signal_conditions = [
(
lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][
@@ -226,7 +135,7 @@ class Eiger9MSetup(CustomDetectorMixin):
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
timeout=self.parent.timeout - self.parent.timeout // 2,
check_stopped=True,
all_signals=False,
):
@@ -234,7 +143,7 @@ class Eiger9MSetup(CustomDetectorMixin):
self.parent.cam.acquire.put(0)
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
timeout=self.parent.timeout - self.parent.timeout // 2,
check_stopped=True,
all_signals=False,
):
@@ -246,12 +155,97 @@ class Eiger9MSetup(CustomDetectorMixin):
"""Close file writer"""
self.std_client.stop_writer()
def prepare_detector(self) -> None:
"""Prepare detector for scan"""
self.set_detector_threshold()
self.set_acquisition_params()
self.parent.set_trigger(TriggerSource.GATING)
def set_detector_threshold(self) -> None:
"""
Set the detector threshold
The function sets the detector threshold automatically to 1/2 of the beam energy.
"""
# get current beam energy from device manageer
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
# Check if energies are eV or keV, assume keV as the default
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
# set energy on detector
setpoint = int(mokev * factor)
energy = self.parent.cam.beam_energy.read()[self.parent.cam.beam_energy.name]["value"]
if setpoint != energy:
self.parent.cam.beam_energy.set(setpoint)
# set threshold on detector
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
# Set number of images and frames (frames is for internal burst of detector)
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
# Update the readout time of the detector
self.update_readout_time()
def prepare_data_backend(self) -> None:
"""Prepare the data backend for the scan"""
self.parent.filepath = self.parent.filewriter.compile_full_filename(
f"{self.parent.name}.h5"
)
self.filepath_exists(self.parent.filepath)
self.stop_detector_backend()
try:
self.std_client.start_writer_async(
{
"output_file": self.parent.filepath,
"n_images": int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
),
}
)
except Exception as exc:
time.sleep(5)
if self.std_client.get_status()["state"] == "READY":
raise EigerTimeoutError(f"Timeout of start_writer_async with {exc}") from exc
# Check status of std_daq
signal_conditions = [
(lambda: self.std_client.get_status()["acquisition"]["state"], "WAITING_IMAGES")
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
check_stopped=False,
all_signals=True,
):
raise EigerTimeoutError(
"Timeout of 5s reached for std_daq start_writer_async with std_daq client status"
f" {self.std_client.get_status()}"
)
def filepath_exists(self, filepath: str) -> None:
"""Check if filepath exists"""
signal_conditions = [(lambda: os.path.exists(os.path.dirname(filepath)), True)]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
timeout=self.parent.timeout,
check_stopped=False,
all_signals=True,
):
@@ -270,7 +264,7 @@ class Eiger9MSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
timeout=self.parent.timeout,
check_stopped=True,
all_signals=False,
):
@@ -278,7 +272,43 @@ class Eiger9MSetup(CustomDetectorMixin):
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def finished(self, timeout: int = 5) -> None:
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Publish the filepath to REDIS.
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
pipe = self.parent.connector.pipeline()
if successful is None:
msg = messages.FileMessage(file_path=self.parent.filepath, done=done)
else:
msg = messages.FileMessage(
file_path=self.parent.filepath, done=done, successful=successful
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
def finished(self):
"""Check if acquisition is finished."""
with self._lock:
signal_conditions = [
@@ -296,7 +326,7 @@ class Eiger9MSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
timeout=self.parent.timeout,
check_stopped=True,
all_signals=True,
):
@@ -327,7 +357,7 @@ class SLSDetectorCam(Device):
detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV")
class TriggerSource(int, enum.Enum):
class TriggerSource(enum.IntEnum):
"""Trigger signals for Eiger9M detector"""
AUTO = 0
@@ -336,7 +366,7 @@ class TriggerSource(int, enum.Enum):
BURST_TRIGGER = 3
class DetectorState(int, enum.Enum):
class DetectorState(enum.IntEnum):
"""Detector states for Eiger9M detector"""
IDLE = 0
@@ -366,16 +396,37 @@ class Eiger9McSAXS(PSIDetectorBase):
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = []
USER_ACCESS = ["describe"]
# specify Setup class
custom_prepare_cls = Eiger9MSetup
# specify minimum readout time for detector and timeout for checks after unstage
# specify minimum readout time for detector
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
cam = ADCpt(SLSDetectorCam, "cam1:")
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector.
Check the TriggerSource enum for possible values
Args:
trigger_source (TriggerSource): Trigger source for the detector
"""
value = trigger_source
self.cam.trigger_mode.put(value)
def stage(self) -> list[object]:
"""
Add functionality to stage, and arm the detector
Additional call to:
- custom_prepare.arm_acquisition()
"""
rtr = super().stage()
self.custom_prepare.arm_acquisition()
return rtr
if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True)

View File

@@ -1,7 +1,8 @@
import enum
import os
import threading
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
@@ -98,16 +99,6 @@ class FalconSetup(CustomDetectorMixin):
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize Falcon detector"""
self.initialize_default_parameter()
self.initialize_detector()
self.initialize_detector_backend()
def initialize_default_parameter(self) -> None:
"""
Set default parameters for Falcon
@@ -133,7 +124,7 @@ class FalconSetup(CustomDetectorMixin):
"""Initialize Falcon detector"""
self.stop_detector()
self.stop_detector_backend()
self.set_trigger(
self.parent.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
# 1 Realtime
@@ -145,6 +136,30 @@ class FalconSetup(CustomDetectorMixin):
# Sets the number of pixels/spectra in the buffer
self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer)
def stop_detector(self) -> None:
"""Stops detector"""
self.parent.stop_all.put(1)
self.parent.erase_all.put(1)
signal_conditions = [
(lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout - self.parent.timeout // 2,
all_signals=False,
):
# Retry stop detector and wait for remaining time
raise FalconTimeoutError(
f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
)
def stop_detector_backend(self) -> None:
"""Stop the detector backend"""
self.parent.hdf5.capture.put(0)
def initialize_detector_backend(self) -> None:
"""Initialize the detector backend for Falcon."""
self.parent.hdf5.enable.put(1)
@@ -158,16 +173,9 @@ class FalconSetup(CustomDetectorMixin):
# Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
self.parent.nd_array_mode.put(1)
def on_stage(self) -> None:
"""Prepare detector and backend for acquisition"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
def prepare_detector(self) -> None:
"""Prepare detector for acquisition"""
self.set_trigger(
self.parent.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
self.parent.preset_real.put(self.parent.scaninfo.exp_time)
@@ -177,10 +185,10 @@ class FalconSetup(CustomDetectorMixin):
def prepare_data_backend(self) -> None:
"""Prepare data backend for acquisition"""
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
).wait()
file_path, file_name = os.path.split(self.parent.filepath.get())
self.parent.filepath = self.parent.filewriter.compile_full_filename(
f"{self.parent.name}.h5"
)
file_path, file_name = os.path.split(self.parent.filepath)
self.parent.hdf5.file_path.put(file_path)
self.parent.hdf5.file_name.put(file_name)
self.parent.hdf5.file_template.put("%s%s")
@@ -204,7 +212,7 @@ class FalconSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
timeout=self.parent.timeout,
check_stopped=True,
all_signals=False,
):
@@ -212,85 +220,64 @@ class FalconSetup(CustomDetectorMixin):
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def on_unstage(self) -> None:
"""Unstage detector and backend"""
pass
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def on_complete(self) -> None:
"""Complete detector and backend"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
self.publish_file_location(done=True, successful=True)
def on_stop(self) -> None:
"""Stop detector and backend"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stops detector"""
self.parent.stop_all.put(1)
self.parent.erase_all.put(1)
signal_conditions = [
(lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
all_signals=False,
):
# Retry stop detector and wait for remaining time
raise FalconTimeoutError(
f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
)
def stop_detector_backend(self) -> None:
"""Stop the detector backend"""
self.parent.hdf5.capture.put(0)
def finished(self, timeout: int = 5) -> None:
"""Check if scan finished succesfully"""
with self._lock:
total_frames = int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
signal_conditions = [
(self.parent.dxp.current_pixel.get, total_frames),
(self.parent.hdf5.array_counter.get, total_frames),
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
logger.debug(
f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()},"
f" send data {self.parent.hdf5.array_counter.get()} from total_frames"
f" {total_frames}"
)
self.stop_detector()
self.stop_detector_backend()
def set_trigger(
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
) -> None:
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Set triggering mode for detector
Publish the filepath to REDIS.
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
mapping_mode (MappingSource): Mapping mode for the detector
trigger_source (TriggerSource): Trigger source for the detector, pixel_advance_signal
ignore_gate (int): Ignore gate from TTL signal; defaults to 0
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
mapping = int(mapping_mode)
trigger = trigger_source
self.parent.collect_mode.put(mapping)
self.parent.pixel_advance_mode.put(trigger)
self.parent.ignore_gate.put(ignore_gate)
pipe = self.parent.connector.pipeline()
if successful is None:
msg = messages.FileMessage(file_path=self.parent.filepath, done=done)
else:
msg = messages.FileMessage(
file_path=self.parent.filepath, done=done, successful=successful
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
def finished(self) -> None:
"""Check if scan finished succesfully"""
total_frames = int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
signal_conditions = [
(self.parent.dxp.current_pixel.get, total_frames),
(self.parent.hdf5.array_counter.get, total_frames),
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
check_stopped=True,
all_signals=True,
):
logger.debug(
f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()},"
f" send data {self.parent.hdf5.array_counter.get()} from total_frames"
f" {total_frames}"
)
self.stop_detector()
self.stop_detector_backend()
class FalconcSAXS(PSIDetectorBase):
@@ -316,7 +303,6 @@ class FalconcSAXS(PSIDetectorBase):
custom_prepare_cls = FalconSetup
# specify minimum readout time for detector
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
dxp = Cpt(EpicsDXPFalcon, "dxp1:")
@@ -344,6 +330,30 @@ class FalconcSAXS(PSIDetectorBase):
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
def set_trigger(
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
) -> None:
"""
Set triggering mode for detector
Args:
mapping_mode (MappingSource): Mapping mode for the detector
trigger_source (TriggerSource): Trigger source for the detector, pixel_advance_signal
ignore_gate (int): Ignore gate from TTL signal; defaults to 0
"""
mapping = int(mapping_mode)
trigger = trigger_source
self.collect_mode.put(mapping)
self.pixel_advance_mode.put(trigger)
self.ignore_gate.put(ignore_gate)
def stage(self) -> list[object]:
"""Stage"""
rtr = super().stage()
self.custom_prepare.arm_acquisition()
return rtr
if __name__ == "__main__":
falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True)

View File

@@ -74,11 +74,6 @@ class MCSSetup(CustomDetectorMixin):
]
self.mca_data = defaultdict(lambda: [])
def on_init(self) -> None:
"""Init sequence for the detector"""
self.initialize_detector()
self.initialize_detector_backend()
def initialize_detector(self) -> None:
"""Initialize detector"""
# External trigger for pixel advance
@@ -89,7 +84,7 @@ class MCSSetup(CustomDetectorMixin):
# Set number of channels to 5
self.parent.mux_output.set(5)
# Trigger Mode used for cSAXS
self.parent.input_mode.set(TriggerSource.MODE3)
self.parent.set_trigger(TriggerSource.MODE3)
# specify polarity of trigger signals
self.parent.input_polarity.set(0)
self.parent.output_polarity.set(1)
@@ -147,15 +142,10 @@ class MCSSetup(CustomDetectorMixin):
expire=self._stream_ttl,
)
def on_stage(self) -> None:
"""Stage detector"""
self.prepare_detector()
self.prepare_detector_backend()
def prepare_detector(self) -> None:
"""Prepare detector for scan"""
self.set_acquisition_params()
self.parent.input_mode.set(TriggerSource.MODE3)
self.parent.set_trigger(TriggerSource.MODE3)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for scan"""
@@ -185,15 +175,7 @@ class MCSSetup(CustomDetectorMixin):
self.counter = 0
self.parent.erase_start.set(1)
def on_unstage(self) -> None:
"""Unstage detector"""
pass
def on_complete(self) -> None:
"""Complete detector"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
def finished(self, timeout: int = 5) -> None:
def finished(self) -> None:
"""Check if acquisition is finished, if not successful, rais MCSTimeoutError"""
signal_conditions = [
(lambda: self.acquisition_done, True),
@@ -201,7 +183,7 @@ class MCSSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
timeout=self.parent.timeout,
check_stopped=True,
all_signals=True,
):
@@ -213,15 +195,12 @@ class MCSSetup(CustomDetectorMixin):
f" {total_frames} frames arriving at the mcs card"
)
def on_stop(self) -> None:
"""Stop detector"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stop detector"""
self.parent.stop_all.set(1)
return super().stop_detector()
def stop_detector_backend(self) -> None:
"""Stop acquisition of data"""
self.acquisition_done = True
@@ -234,7 +213,7 @@ class SIS38XX(Device):
class MCScSAXS(PSIDetectorBase):
"""MCS card for cSAXS for implementation at cSAXS beamline"""
USER_ACCESS = []
USER_ACCESS = ["describe", "_init_mcs"]
SUB_PROGRESS = "progress"
SUB_VALUE = "value"
_default_sub = SUB_VALUE
@@ -243,7 +222,6 @@ class MCScSAXS(PSIDetectorBase):
custom_prepare_cls = MCSSetup
# specify minimum readout time for detector
MIN_READOUT = 0
TIMEOUT_FOR_SIGNALS = 5
# PV access to SISS38XX card
# Acquisition
@@ -294,8 +272,11 @@ class MCScSAXS(PSIDetectorBase):
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
mcs_config=None,
**kwargs,
):
@@ -308,11 +289,25 @@ class MCScSAXS(PSIDetectorBase):
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger mode from TriggerSource"""
value = int(trigger_source)
self.input_mode.set(value)
def stage(self) -> list[object]:
"""stage the detector for upcoming acquisition"""
rtr = super().stage()
self.custom_prepare.arm_acquisition()
return rtr
# Automatically connect to test environmenr if directly invoked
if __name__ == "__main__":

View File

@@ -1,12 +1,12 @@
import enum
import json
import os
import threading
import time
import numpy as np
import requests
from bec_lib import bec_logger
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Staged
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
@@ -16,6 +16,8 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
logger = bec_logger.logger
MIN_READOUT = 3e-3
class PilatusError(Exception):
"""Base class for exceptions in this module."""
@@ -70,15 +72,6 @@ class PilatusSetup(CustomDetectorMixin):
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize the detector"""
self.initialize_default_parameter()
self.initialize_detector()
def initialize_default_parameter(self) -> None:
"""Set default parameters for Eiger9M detector"""
self.update_readout_time()
@@ -97,15 +90,7 @@ class PilatusSetup(CustomDetectorMixin):
# Stops the detector
self.stop_detector()
# Sets the trigger source to GATING
self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE)
def on_stage(self) -> None:
"""Stage the detector for scan"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(
done=False, successful=False, metadata={"input_path": self.parent.filepath_raw}
)
self.parent.set_trigger(TriggerSource.EXT_ENABLE)
def prepare_detector(self) -> None:
"""
@@ -116,7 +101,84 @@ class PilatusSetup(CustomDetectorMixin):
"""
self.set_detector_threshold()
self.set_acquisition_params()
self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE)
self.parent.set_trigger(TriggerSource.EXT_ENABLE)
def set_detector_threshold(self) -> None:
"""
Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance
Threshold might be in ev or keV
"""
# get current beam energy from device manageer
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
# Check if energies are eV or keV, assume keV as the default
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
# set energy on detector
setpoint = int(mokev * factor)
# set threshold on detector
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
# Set number of images and frames (frames is for internal burst of detector)
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
# Update the readout time of the detector
self.update_readout_time()
def create_directory(self, filepath: str) -> None:
"""Create directory if it does not exist"""
os.makedirs(filepath, exist_ok=True)
def stop_detector_backend(self) -> None:
"""Stop the file writer zmq service for pilatus_2"""
self.close_file_writer()
time.sleep(0.1)
self.stop_file_writer()
time.sleep(0.1)
def close_file_writer(self) -> None:
"""
Close the file writer for pilatus_2
Delete the data from x12sa-pd-2
"""
url = "http://x12sa-pd-2:8080/stream/pilatus_2"
try:
res = self.send_requests_delete(url=url)
if not res.ok:
res.raise_for_status()
except Exception as exc:
logger.info(f"Pilatus2 close threw Exception: {exc}")
def stop_file_writer(self) -> None:
"""
Stop the file writer for pilatus_2
Runs on xbl-daq-34
"""
url = "http://xbl-daq-34:8091/pilatus_2/stop"
res = self.send_requests_put(url=url)
if not res.ok:
res.raise_for_status()
def prepare_data_backend(self) -> None:
"""
@@ -129,9 +191,7 @@ class PilatusSetup(CustomDetectorMixin):
self.stop_detector_backend()
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename("pilatus_2.h5")
).wait()
self.parent.filepath = self.parent.filewriter.compile_full_filename("pilatus_2.h5")
self.parent.cam.file_path.put("/dev/shm/zmq/")
self.parent.cam.file_name.put(
f"{self.parent.scaninfo.username}_2_{self.parent.scaninfo.scan_number:05d}"
@@ -215,76 +275,6 @@ class PilatusSetup(CustomDetectorMixin):
except Exception as exc:
logger.info(f"Pilatus2 wait threw Exception: {exc}")
def set_detector_threshold(self) -> None:
"""
Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance
Threshold might be in ev or keV
"""
# get current beam energy from device manageer
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
# Check if energies are eV or keV, assume keV as the default
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
# set energy on detector
setpoint = int(mokev * factor)
# set threshold on detector
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
# Set number of images and frames (frames is for internal burst of detector)
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
# Update the readout time of the detector
self.update_readout_time()
def create_directory(self, filepath: str) -> None:
"""Create directory if it does not exist"""
os.makedirs(filepath, exist_ok=True)
def close_file_writer(self) -> None:
"""
Close the file writer for pilatus_2
Delete the data from x12sa-pd-2
"""
url = "http://x12sa-pd-2:8080/stream/pilatus_2"
try:
res = self.send_requests_delete(url=url)
if not res.ok:
res.raise_for_status()
except Exception as exc:
logger.info(f"Pilatus2 close threw Exception: {exc}")
def stop_file_writer(self) -> None:
"""
Stop the file writer for pilatus_2
Runs on xbl-daq-34
"""
url = "http://xbl-daq-34:8091/pilatus_2/stop"
res = self.send_requests_put(url=url)
if not res.ok:
res.raise_for_status()
def send_requests_put(self, url: str, data: list = None, headers: dict = None) -> object:
"""
Send a put request to the given url
@@ -312,8 +302,14 @@ class PilatusSetup(CustomDetectorMixin):
"""
return requests.delete(url=url, headers=headers, timeout=5)
def on_pre_scan(self) -> None:
"""Prepare detector for scan"""
def pre_scan(self) -> None:
"""
Pre_scan function call
This function is called just before the scan core.
Here it is used to arm the detector for the acquisition
"""
self.arm_acquisition()
def arm_acquisition(self) -> None:
@@ -322,18 +318,43 @@ class PilatusSetup(CustomDetectorMixin):
# TODO is this sleep needed? to be tested with detector and for how long
time.sleep(0.5)
def on_unstage(self) -> None:
"""Unstage the detector"""
pass
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Publish the filepath to REDIS and publish the event for the h5_converter
def on_complete(self) -> None:
"""Complete the scan"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
self.publish_file_location(
done=True, successful=True, metadata={"input_path": self.parent.filepath_raw}
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
pipe = self.parent.connector.pipeline()
if successful is None:
msg = messages.FileMessage(
file_path=self.parent.filepath,
done=done,
metadata={"input_path": self.parent.filepath_raw},
)
else:
msg = messages.FileMessage(
file_path=self.parent.filepath,
done=done,
successful=successful,
metadata={"input_path": self.parent.filepath_raw},
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
def finished(self, timeout: int = 5) -> None:
def finished(self) -> None:
"""Check if acquisition is finished."""
# pylint: disable=protected-access
# TODO: at the moment this relies on device.mcs.obj._staged attribute
@@ -342,7 +363,7 @@ class PilatusSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
timeout=self.parent.timeout,
check_stopped=True,
all_signals=True,
):
@@ -354,21 +375,16 @@ class PilatusSetup(CustomDetectorMixin):
self.stop_detector()
self.stop_detector_backend()
def on_stop(self) -> None:
"""Stop detector"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stop detector"""
self.parent.cam.acquire.put(0)
def stop_detector_backend(self) -> None:
"""Stop the file writer zmq service for pilatus_2"""
self.close_file_writer()
time.sleep(0.1)
self.stop_file_writer()
time.sleep(0.1)
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
class PilatuscSAXS(PSIDetectorBase):
@@ -385,16 +401,20 @@ class PilatuscSAXS(PSIDetectorBase):
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = []
USER_ACCESS = ["describe"]
# specify Setup class
custom_prepare_cls = PilatusSetup
# specify minimum readout time for detector
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
cam = ADCpt(SLSDetectorCam, "cam1:")
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector"""
value = trigger_source
self.cam.trigger_mode.put(value)
if __name__ == "__main__":
pilatus_2 = PilatuscSAXS(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True)

View File

@@ -1,166 +0,0 @@
import enum
import os
import threading
import time
from typing import Any
import numpy as np
import openapi_client
from bec_lib.logger import bec_logger
from openapi_client.models.dataset_settings import DatasetSettings
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from std_daq_client import StdDaqClient
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
logger = bec_logger.logger
class EigerError(Exception):
"""Base class for exceptions in this module."""
class EigerTimeoutError(EigerError):
"""Raised when the Eiger does not respond in time."""
class DetectorState(str, enum.Enum):
"""Detector states for Eiger9M detector"""
IDLE = "idle"
class ErrorState(int, enum.Enum):
""" Error State in the detector"""
ERROR_STATUS_WAITING = 504
class Eiger9MJungfrauJochSetup(CustomDetectorMixin):
"""Eiger setup class
Parent class: CustomDetectorMixin
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
# kwargs["host"] =
# kwargs["port"] =
configuration = openapi_client.Configuration(host="http://sls-jfjoch-001:8080")
api_client = openapi_client.ApiClient(configuration)
self.api = openapi_client.DefaultApi(api_client)
self.actually_init()
def actually_init(self):
status = self.get_daq_status()
if status != DetectorState.IDLE:
self.api.initialize_post()
self.actually_wait_till_done()
def actually_wait_till_done(self):
while True:
try:
done = self.api.wait_till_done_post()
except Exception as e: #TODO: be more specific
if e.status != ErrorState.ERROR_STATUS_WAITING:
print(e)
raise e
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return
time.sleep(0.1) #TODO
def get_daq_status(self):
return self.api.status_get().state
def on_stage(self):
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name != "jjf_test":
return
exp_time = self.parent.scaninfo.exp_time
readout = self.parent.scaninfo.readout_time
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
cycles = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
total_points = num_burst_cycle * cycles
dataset_settings = DatasetSettings(
image_time_us = int(exp_time*1e6),
ntrigger=total_points,
beam_x_pxl = 0,
beam_y_pxl = 0,
detector_distance_mm = 100,
incident_energy_ke_v = 10.00,
)
self.api.start_post(dataset_settings=dataset_settings)
def on_unstage(self):
pass
def on_complete(self):
logger.info("Starting complete for {self.parent.name}")
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name != "jjf_test":
return
def wait_till_done_post():
try:
done = self.api.wait_till_done_post(timeout=1)
except Exception as e: #TODO: be more specific
return False
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return True
return True
status = self.wait_with_status(signal_conditions=[(wait_till_done_post, True)], check_stopped=True,timeout=10)
return status
def on_stop(self):
self.api.cancel_post()
class TriggerSource(int, enum.Enum):
"""Trigger signals for Eiger9M detector"""
AUTO = 0
TRIGGER = 1
GATING = 2
BURST_TRIGGER = 3
class Eiger9McSAXS(PSIDetectorBase):
"""
Eiger9M detector for CSAXS
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
Various EpicsPVs for controlling the detector
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = []
# specify Setup class
custom_prepare_cls = Eiger9MJungfrauJochSetup
# specify minimum readout time for detector and timeout for checks after unstage
MIN_READOUT = 20e-6
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:")
eiger.custom_prepare.client.init()

View File

@@ -1,84 +0,0 @@
import string
from time import sleep
import openapi_client
ERROR_STATUS_WAITING = 504
STATUS_IDLE = "Idle"
# allow / to enable creation of subfolders
ALLOWED_CHARS = set(
string.ascii_letters + string.digits + "_-+/"
)
def character_cleanup(s, default="_", allowed=ALLOWED_CHARS):
return "".join(i if i in allowed else default for i in s)
class JFJClient:
def __init__(self, host):
configuration = openapi_client.Configuration(host=host)
api_client = openapi_client.ApiClient(configuration)
self.api = openapi_client.DefaultApi(api_client)
self.actually_init()
def actually_init(self):
status = self.get_daq_status()
if status != STATUS_IDLE:
self.api.initialize_post()
self.actually_wait_till_done()
status = self.get_daq_status()
if status != STATUS_IDLE:
raise RuntimeError(f"status is not {STATUS_IDLE} but: {status}")
def actually_wait_till_done(self):
while True:
try:
done = self.api.wait_till_done_post()
except Exception as e: #TODO: be more specific
if e.status != ERROR_STATUS_WAITING:
print(e)
raise e
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return
sleep(0.1) #TODO
def get_daq_status(self):
return self.api.status_get().state
def get_detector_status(self):
return self.api.detector_status_get()#.state #TODO
def get_detectors(self):
return self.api.config_select_detector_get()
def get_detector_config(self):
return self.api.config_detector_get()
def acquire(self, file_prefix, **kwargs):
status = self.get_daq_status()
if status != STATUS_IDLE:
raise RuntimeError(f"status is not {STATUS_IDLE} but: {status}")
file_prefix = character_cleanup(file_prefix)
dataset_settings = openapi_client.DatasetSettings(file_prefix=file_prefix, **kwargs)
self.api.start_post(dataset_settings=dataset_settings)
self.actually_wait_till_done()
def take_pedestal(self):
self.api.pedestal_post()
self.actually_wait_till_done()

View File

@@ -1,15 +1,12 @@
import functools
import socket
import threading
import time
import numpy as np
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal, SignalRO
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.socket import raise_if_disconnected
from prettytable import PrettyTable
from typeguard import typechecked
def channel_checked(fcn):
@@ -17,32 +14,81 @@ def channel_checked(fcn):
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
# pylint: disable=protected-access
self._check_channel(args[0])
return fcn(self, *args, **kwargs)
return wrapper
class NpointError(Exception):
"""
Base class for Npoint errors.
"""
class SocketIO:
"""SocketIO helper class for TCP IP connections"""
def __init__(self, sock=None):
self.is_open = False
if sock is None:
self.open()
else:
self.sock = sock
def connect(self, host, port):
print(f"connecting to {host} port {port}")
# self.sock.create_connection((host, port))
self.sock.connect((host, port))
def _put(self, msg_bytes):
return self.sock.send(msg_bytes)
def _recv(self, buffer_length=1024):
return self.sock.recv(buffer_length)
def _initialize_socket(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(5)
def put(self, msg):
return self._put(msg)
def receive(self, buffer_length=1024):
return self._recv(buffer_length=buffer_length)
def open(self):
self._initialize_socket()
self.is_open = True
def close(self):
self.sock.close()
self.sock = None
self.is_open = False
class NPointController(Controller):
"""
Controller for nPoint piezo stages. This class inherits from the Controller class
and provides a singleton interface to the nPoint controller.
"""
class NPointController:
_controller_instance = None
_axes_per_controller = 3
NUM_CHANNELS = 3
_read_single_loc_bit = "A0"
_write_single_loc_bit = "A2"
_trailing_bit = "55"
_range_offset = "78"
_channel_base = ["11", "83"]
def __init__(
self, comm_socket: SocketIO, server_ip: str = "129.129.99.87", server_port: int = 23
) -> None:
self._lock = threading.RLock()
super().__init__()
self._server_and_port_name = (server_ip, server_port)
self.socket = comm_socket
self.connected = False
def __new__(cls, *args, **kwargs):
if not NPointController._controller_instance:
NPointController._controller_instance = object.__new__(cls)
return NPointController._controller_instance
@classmethod
def create(cls):
return cls(SocketIO())
def show_all(self) -> None:
"""Display current status of all channels
@@ -52,13 +98,54 @@ class NPointController(Controller):
if not self.connected:
print("npoint controller is currently disabled.")
return
print(f"Connected to controller at {self._socket_host}:{self._socket_port}")
print(f"Connected to controller at {self._server_and_port_name}")
t = PrettyTable()
t.field_names = ["Channel", "Range", "Position", "Target"]
for ii in range(self._axes_per_controller):
t.add_row([ii, self._get_range(ii), self.get_current_pos(ii), self.get_target_pos(ii)])
for ii in range(self.NUM_CHANNELS):
t.add_row(
[ii, self._get_range(ii), self._get_current_pos(ii), self._get_target_pos(ii)]
)
print(t)
@threadlocked
def on(self) -> None:
"""Enable the NPoint controller and open a new socket.
Raises:
TimeoutError: Raised if the socket connection raises a timeout.
Returns:
None
"""
if self.connected:
print("You are already connected to the NPoint controller.")
return
if not self.socket.is_open:
self.socket.open()
try:
self.socket.connect(self._server_and_port_name[0], self._server_and_port_name[1])
except socket.timeout:
raise TimeoutError(
f"Failed to connect to the specified server and port {self._server_and_port_name}."
)
except OSError:
print("ERROR while connecting. Let's try again")
self.socket.close()
time.sleep(0.5)
self.socket.open()
self.socket.connect(self._server_and_port_name[0], self._server_and_port_name[1])
self.connected = True
@threadlocked
def off(self) -> None:
"""Disable the controller and close the socket.
Returns:
None
"""
self.socket.close()
self.connected = False
@channel_checked
def _get_range(self, channel: int) -> int:
"""Get the range of the specified channel axis.
@@ -87,7 +174,7 @@ class NPointController(Controller):
return device_range
@channel_checked
def get_current_pos(self, channel: int) -> float:
def _get_current_pos(self, channel: int) -> float:
# for first channel: 0x11 83 13 34
addr = self._channel_base.copy()
addr.extend([f"{19 + 16 * channel:x}", "34"])
@@ -100,7 +187,7 @@ class NPointController(Controller):
return pos
@channel_checked
def set_target_pos(self, channel: int, pos: float) -> None:
def _set_target_pos(self, channel: int, pos: float) -> None:
# for first channel: 0x11 83 12 18 00 00 00 00
addr = self._channel_base.copy()
addr.extend([f"{18 + channel * 16:x}", "18"])
@@ -112,7 +199,7 @@ class NPointController(Controller):
self._put(send_buffer)
@channel_checked
def get_target_pos(self, channel: int) -> float:
def _get_target_pos(self, channel: int) -> float:
# for first channel: 0x11 83 12 18
addr = self._channel_base.copy()
addr.extend([f"{18 + channel * 16:x}", "18"])
@@ -127,17 +214,17 @@ class NPointController(Controller):
def _set_servo(self, channel: int, enable: bool) -> None:
print("Not tested")
return
# # for first channel: 0x11 83 10 84 00 00 00 00
# addr = self._channel_base.copy()
# addr.extend([f"{16 + channel * 16:x}", "84"])
# for first channel: 0x11 83 10 84 00 00 00 00
addr = self._channel_base.copy()
addr.extend([f"{16 + channel * 16:x}", "84"])
# if enable:
# data = ["00"] * 3 + ["01"]
# else:
# data = ["00"] * 4
# send_buffer = self.__write_single_location_buffer(addr, data)
if enable:
data = ["00"] * 3 + ["01"]
else:
data = ["00"] * 4
send_buffer = self.__write_single_location_buffer(addr, data)
# self._put(send_buffer)
self._put(send_buffer)
@channel_checked
def _get_servo(self, channel: int) -> int:
@@ -163,7 +250,7 @@ class NPointController(Controller):
"""
buffer = b"".join([bytes.fromhex(m) for m in buffer])
self.sock.put(buffer)
self.socket.put(buffer)
@threadlocked
def _put_and_receive(self, msg_hex_list: list) -> list:
@@ -177,8 +264,8 @@ class NPointController(Controller):
"""
buffer = b"".join([bytes.fromhex(m) for m in msg_hex_list])
self.sock.put(buffer)
recv_msg = self.sock.receive()
self.socket.put(buffer)
recv_msg = self.socket.receive()
recv_hex_list = [hex(m) for m in recv_msg]
self._verify_received_msg(msg_hex_list, recv_hex_list)
return recv_hex_list
@@ -206,9 +293,9 @@ class NPointController(Controller):
raise RuntimeError("Connection failure. Please restart the controller.")
def _check_channel(self, channel: int) -> None:
if channel >= self._axes_per_controller:
if channel >= self.NUM_CHANNELS:
raise ValueError(
f"Channel {channel+1} exceeds the available number of channels ({self._axes_per_controller})"
f"Channel {channel+1} exceeds the available number of channels ({self.NUM_CHANNELS})"
)
@staticmethod
@@ -304,285 +391,155 @@ class NPointController(Controller):
self.off()
class NpointSignalBase(SocketSignal):
"""
Base class for nPoint signals.
"""
class NPointAxis:
def __init__(self, controller: NPointController, channel: int, name: str) -> None:
super().__init__()
self._axis_range = 100
self.controller = controller
self.channel = channel
self.name = name
self.controller._check_channel(channel)
self._settling_time = 0.1
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller: NPointController = self.parent.controller
self.sock = self.parent.controller.sock
if self.settling_time == 0:
self.settling_time = 0.1
print(f"Setting the npoint settling time to {self.settling_time:.2f} s.")
print(
"You can set the settling time depending on the stage tuning\nusing the settling_time property."
)
print("This is the waiting time before the counting is done.")
class NpointSignalRO(NpointSignalBase):
"""
Base class for read-only signals.
"""
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self._metadata["write_access"] = False
@threadlocked
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
class NpointReadbackSignal(NpointSignalRO):
"""
Signal to read the current position of an nPoint piezo stage.
"""
@threadlocked
def _socket_get(self):
return self.controller.get_current_pos(self.parent.axis_Id_numeric) * self.parent.sign
class NpointSetpointSignal(NpointSignalBase):
"""
Signal to set the target position of an nPoint piezo stage.
"""
setpoint = 0
@threadlocked
def _socket_get(self):
return self.controller.get_target_pos(self.parent.axis_Id_numeric) * self.parent.sign
@threadlocked
def _socket_set(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
return self.controller.set_target_pos(
self.parent.axis_Id_numeric, target_val * self.parent.sign
)
class NpointMotorIsMoving(SignalRO):
"""
Signal to indicate whether the motor is currently moving or not.
"""
def set_motor_is_moving(self, value: int) -> None:
"""
Set the motor_is_moving signal to the specified value.
Args:
value (int): 1 if the motor is moving, 0 otherwise.
"""
self._readback = value
class NPointAxis(Device, PositionerBase):
"""
NPointAxis class, which inherits from Device and PositionerBase. This class
represents an axis of an nPoint piezo stage and provides the necessary
functionality to move the axis and read its current position.
"""
USER_ACCESS = ["controller"]
readback = Cpt(NpointReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(NpointSetpointSignal, signal_name="setpoint")
motor_is_moving = Cpt(NpointMotorIsMoving, value=0, kind="normal")
settling_time = Cpt(Signal, value=0.1, kind="config")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2680.psi.ch",
port=8085,
limits=None,
sign=1,
socket_cls=SocketIO,
tolerance: float = 0.05,
**kwargs,
):
self.controller = NPointController(
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.sign = sign
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = tolerance
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
@property
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
def check_value(self, pos):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
def show_all(self) -> None:
self.controller.show_all()
@raise_if_disconnected
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
def get(self) -> float:
"""Get current position for this channel.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Raises:
RuntimeError: Raised if channel is not connected.
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
Returns:
float: position
"""
self._started_moving = False
timeout = kwargs.pop("timeout", 10)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
return self.controller._get_current_pos(self.channel)
def move_and_finish():
self.motor_is_moving.set_motor_is_moving(1)
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(self.settling_time.get())
self.motor_is_moving.set_motor_is_moving(0)
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
success = np.isclose(val[self.name]["value"], position, atol=self.tolerance)
self._done_moving(success=success)
@raise_if_disconnected
def get_target_pos(self) -> float:
"""Get target position for this channel.
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
Raises:
RuntimeError: Raised if channel is not connected.
return status
@property
def axis_Id(self):
Returns:
float: position
"""
Return the axis_Id_alpha.
"""
return self._axis_Id_alpha
return self.controller._get_target_pos(self.channel)
@axis_Id.setter
def axis_Id(self, val: str):
"""
Set the axis_Id_alpha and axis_Id_numeric based on the alpha value.
@raise_if_disconnected
@typechecked
def set(self, pos: float) -> None:
"""Set a new target position and wait until settled (settling_time).
Args:
val (str): Single-character axis identifier.
pos (float): New target position
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
None
"""
if isinstance(val, str):
if len(val) != 1:
raise ValueError("Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
raise TypeError(f"Expected value of type str but received {type(val)}")
self.controller._set_target_pos(self.channel, pos)
time.sleep(self.settling_time)
@property
def axis_Id_numeric(self):
"""
Return the numeric value of the axis_Id.
"""
return self._axis_Id_numeric
def connected(self) -> bool:
return self.controller.connected
@axis_Id_numeric.setter
def axis_Id_numeric(self, val: int):
@property
@raise_if_disconnected
def servo(self) -> int:
"""Get servo status
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
int: Servo status
"""
Set the axis_Id_numeric and axis_Id_alpha based on the numeric value.
return self.controller._get_servo(self.channel)
@servo.setter
@raise_if_disconnected
@typechecked
def servo(self, val: bool) -> None:
"""Set servo status
Args:
val (int): Numeric axis identifier.
val (bool): Servo status
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
None
"""
if isinstance(val, int):
if val > 26:
raise ValueError("Numeric value exceeds supported range.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
self.controller._set_servo(self.channel, val)
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "um"
def settling_time(self) -> float:
return self._settling_time
def stage(self) -> list[object]:
return super().stage()
@settling_time.setter
@typechecked
def settling_time(self, val: float) -> None:
self._settling_time = val
print(f"Setting the npoint settling time to {val:.2f} s.")
def unstage(self) -> list[object]:
return super().unstage()
class NPointEpics(NPointAxis):
def __init__(self, controller: NPointController, channel: int, name: str) -> None:
super().__init__(controller, channel, name)
self.low_limit = -50
self.high_limit = 50
self._prefix = name
def get_pv(self) -> str:
return self.name
def get_position(self, readback=True) -> float:
if readback:
return self.get()
else:
return self.get_target_pos()
def within_limits(self, pos: float) -> bool:
return pos > self.low_limit and pos < self.high_limit
def move(self, position: float, wait=True) -> None:
self.set(position)
if __name__ == "__main__":
npx = NPointAxis(axis_Id="A", name="npx", host="nPoint000003.psi.ch", port=23)
npy = NPointAxis(axis_Id="B", name="npy", host="nPoint000003.psi.ch", port=23)
npx.controller.on()
print("socket is open, axis is ready!")
npx.move(10)
print(npx.read())
npx.controller.off()
## EXAMPLES ##
#
# Create controller and socket instance explicitly:
# controller = NPointController(SocketIO())
# npointx = NPointAxis(controller, 0, "nx")
# npointy = NPointAxis(controller, 1, "ny")
# Create controller instance explicitly
# controller = NPointController.create()
# npointx = NPointAxis(controller, 0, "nx")
# npointy = NPointAxis(controller, 1, "ny")
# Single-line axis:
# npointx = NPointAxis(NPointController.create(), 0, "nx")
#
# EPICS wrapper:
# nx = NPointEpics(NPointController.create(), 0, "nx")
controller = NPointController.create()
npointx = NPointAxis(NPointController.create(), 0, "nx")
npointy = NPointAxis(NPointController.create(), 0, "ny")

View File

@@ -1,9 +0,0 @@
def patch_dual_pvs(device):
device.wait_for_connection(all_signals=True)
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv

View File

@@ -1,5 +1,4 @@
from .flomni_fermat_scan import FlomniFermatScan
from .jungfrau_joch_scan import JungfrauJochTest
from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter
from .owis_grid import OwisGrid
from .sgalil_grid import SgalilGrid

View File

@@ -1,70 +0,0 @@
import time
from bec_lib import bec_logger
from bec_lib.device import DeviceBase
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion
logger = bec_logger.logger
class JungfrauJochTest(AsyncFlyScanBase):
"""Owis-based grid scan."""
scan_name = "jjf_test"
#scan_report_hint = "device_progress"
required_kwargs = ["points", "exp_time", "readout_time"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
gui_config = {
"Acquisition Parameters": ["num_points", "cycles"],
"Exposure Parameters" : ["exp_time", "readout_time"]
}
def __init__(
self,
num_points:int,
exp_time: float,
readout_time: float,
cycles:int =1,
**kwargs,
):
"""
JungfrauJoch Test scan.
Args:
device (DeviceBase) : The device to be triggered, currently only for delaygenerator csaxs
num_points (int) : Number of points per burst
exp_time (float) : exposure time.
readout_time (float): readout time of detector
cycles (int) : number of cycles, default is 1
Example:
scans.jjf_test(points = 100, exp_time= 1e-3, readout_time=1e-3, cycles = 2)
"""
if readout_time <=0:
raise ScanAbortion(f"Readout time must be larger than 0, provided value {readout_time}")
super().__init__(exp_time=exp_time, readout_time=readout_time,**kwargs)
self.device = "ddg"
self.num_points = num_points
self.cycles = cycles
self.primary_readout_cycle = 0.2
def scan_core(self):
logger.info(f"Starting with Scan Core")
total_exposure = self.num_points * (self.exp_time + self.readout_time)
for i in range(self.cycles):
logger.info(f"Beginning cycle {i} of {self.cycles}")
yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
yield from self.stubs.read_and_wait(
group="primary",
wait_group="readout_primary",
point_id=self.point_id,
)
yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=total_exposure)
self.point_id +=1
logger.info(f"Finished cycle {i} of {self.cycles}")
logger.info(f"Finished scan")
self.num_pos = self.point_id + 1

20
docs/Makefile Normal file
View File

@@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

View File

@@ -0,0 +1,34 @@
{{ fullname | escape | underline}}
.. currentmodule:: {{ module }}
.. autoclass:: {{ objname }}
:members:
:show-inheritance:
:inherited-members:
:special-members: __call__, __add__, __mul__
{% block methods %}
{% if methods %}
.. rubric:: {{ _('Methods') }}
.. autosummary::
:nosignatures:
{% for item in methods %}
{%- if not item.startswith('_') %}
~{{ name }}.{{ item }}
{%- endif -%}
{%- endfor %}
{% endif %}
{% endblock %}
{% block attributes %}
{% if attributes %}
.. rubric:: {{ _('Attributes') }}
.. autosummary::
{% for item in attributes %}
~{{ name }}.{{ item }}
{%- endfor %}
{% endif %}
{% endblock %}

View File

@@ -0,0 +1,66 @@
{{ fullname | escape | underline}}
.. automodule:: {{ fullname }}
{% block attributes %}
{% if attributes %}
.. rubric:: Module attributes
.. autosummary::
:toctree:
{% for item in attributes %}
{{ item }}
{%- endfor %}
{% endif %}
{% endblock %}
{% block functions %}
{% if functions %}
.. rubric:: {{ _('Functions') }}
.. autosummary::
:toctree:
:nosignatures:
{% for item in functions %}
{{ item }}
{%- endfor %}
{% endif %}
{% endblock %}
{% block classes %}
{% if classes %}
.. rubric:: {{ _('Classes') }}
.. autosummary::
:toctree:
:template: custom-class-template.rst
:nosignatures:
{% for item in classes %}
{{ item }}
{%- endfor %}
{% endif %}
{% endblock %}
{% block exceptions %}
{% if exceptions %}
.. rubric:: {{ _('Exceptions') }}
.. autosummary::
:toctree:
{% for item in exceptions %}
{{ item }}
{%- endfor %}
{% endif %}
{% endblock %}
{% block modules %}
{% if modules %}
.. autosummary::
:toctree:
:template: custom-module-template.rst
:recursive:
{% for item in modules %}
{{ item }}
{%- endfor %}
{% endif %}
{% endblock %}

81
docs/conf.py Normal file
View File

@@ -0,0 +1,81 @@
# Configuration file for the Sphinx documentation builder.
#
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
import os
import pathlib
import tomli
project = "cSAXS"
copyright = "2023, Paul Scherrer Institute"
author = "Paul Scherrer Institute"
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
current_path = pathlib.Path(__file__).parent.parent.resolve()
version_path = f"{current_path}/pyproject.toml"
def get_version():
"""load the version from the version file"""
with open(version_path, "r", encoding="utf-8") as file:
res = tomli.loads(file.read())
return res["project"]["version"]
release = get_version()
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.autosummary",
# "sphinx.ext.coverage",
"sphinx.ext.viewcode",
"sphinx.ext.napoleon",
"sphinx_toolbox.collapse",
"sphinx_copybutton",
"myst_parser",
"sphinx_design",
]
myst_enable_extensions = [
"amsmath",
"attrs_inline",
"colon_fence",
"deflist",
"dollarmath",
"fieldlist",
"html_admonition",
"html_image",
"replacements",
"smartquotes",
"strikethrough",
"substitution",
"tasklist",
]
autosummary_generate = True # Turn on sphinx.ext.autosummary
add_module_names = False # Remove namespaces from class/method signatures
autodoc_inherit_docstrings = True # If no docstring, inherit from base class
set_type_checking_flag = True # Enable 'expensive' imports for sphinx_autodoc_typehints
autoclass_content = "both" # Include both class docstring and __init__
# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
language = "Python"
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = "pydata_sphinx_theme"
html_static_path = ["_static"]
html_css_files = ["css/custom.css"]
html_logo = "_static/bec.png"
html_theme_options = {"show_nav_level": 1, "navbar_align": "content"}

View File

@@ -0,0 +1,11 @@
(developer)=
# Development
```{toctree}
---
maxdepth: 1
hidden: true
---
reference/
```

View File

@@ -0,0 +1,10 @@
## API Reference
```{eval-rst}
.. autosummary::
:toctree: _autosummary
:template: custom-module-template.rst
:recursive:
csaxs_bec
```

39
docs/index.md Normal file
View File

@@ -0,0 +1,39 @@
# cSAXS documentation
<br><br>
````{grid} 3
:gutter: 5
```{grid-item-card} Introduction
:link: introduction
:link-type: ref
General information.
```
```{grid-item-card} User
:link: user
:link-type: ref
Information for users.
```
```{grid-item-card} Developer
:link: developer
:link-type: ref
Information for developers.
```
````
```{toctree}
---
numbered: true
maxdepth: 1
---
introduction/introduction
user/user
developer/developer

View File

@@ -0,0 +1,2 @@
(introduction)=
# Introduction

35
docs/make.bat Normal file
View File

@@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
if "%1" == "" goto help
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

9
docs/requirements.txt Normal file
View File

@@ -0,0 +1,9 @@
sphinx
sphinx_copybutton
recommonmark
sphinx-toolbox
pydata-sphinx-theme
sphinx-copybutton
myst-parser
sphinx-design
tomli

2
docs/user/user.md Normal file
View File

@@ -0,0 +1,2 @@
(user)=
# User

View File

@@ -7,6 +7,16 @@ from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import Trigg
from csaxs_bec.devices.epics.delay_generator_csaxs import DDGSetup
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
@pytest.fixture(scope="function")
def mock_DDGSetup():
mock_ddg = mock.MagicMock()

View File

@@ -10,13 +10,23 @@ from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV
from csaxs_bec.devices.epics.eiger9m_csaxs import Eiger9McSAXS
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
@pytest.fixture(scope="function")
def mock_det():
name = "eiger"
prefix = "X12SA-ES-EIGER9M:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -29,9 +39,10 @@ def mock_det():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(Eiger9McSAXS, "_init"):
det = Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
det = Eiger9McSAXS(
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
@@ -39,6 +50,7 @@ def test_init():
"""Test the _init function:"""
name = "eiger"
prefix = "X12SA-ES-EIGER9M:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -60,7 +72,7 @@ def test_init():
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend"
) as mock_init_backend,
):
Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
mock_default.assert_called_once()
mock_init_det.assert_called_once()
mock_init_backend.assert_called_once()
@@ -224,8 +236,8 @@ def test_stage(
mock_det.cam.beam_energy.put(scaninfo["mokev"])
mock_det.stopped = stopped
mock_det.cam.detector_state._read_pv.mock_data = detector_state
with mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_prep_fw:
mock_det.filepath.set(scaninfo["filepath"]).wait()
with mock.patch.object(mock_det.custom_prepare, "prepare_detector_backend") as mock_prep_fw:
mock_det.filepath = scaninfo["filepath"]
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
@@ -239,7 +251,7 @@ def test_stage(
)
assert mock_det.cam.num_frames.get() == 1
mock_publish_file_location.assert_called_with(done=False, successful=False)
mock_publish_file_location.assert_called_with(done=False)
assert mock_det.cam.acquire.get() == 1
@@ -314,7 +326,7 @@ def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_excep
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_complete(mock_det, stopped, expected_exception):
def test_unstage(mock_det, stopped, expected_exception):
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
@@ -323,10 +335,10 @@ def test_complete(mock_det, stopped, expected_exception):
):
mock_det.stopped = stopped
if expected_exception:
mock_det.complete()
mock_det.unstage()
assert mock_det.stopped is True
else:
mock_det.complete()
mock_det.unstage()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is False
@@ -345,11 +357,12 @@ def test_stop_detector_backend(mock_det):
[
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
({"filepath": "test.h5", "successful": None, "done": True, "scan_id": "123"}),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath.set(scaninfo["filepath"]).wait()
mock_det.filepath = scaninfo["filepath"]
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
)

View File

@@ -11,13 +11,23 @@ from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV
from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS, FalconTimeoutError
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
@pytest.fixture(scope="function")
def mock_det():
name = "falcon"
prefix = "X12SA-SITORO:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -32,9 +42,10 @@ def mock_det():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(FalconcSAXS, "_init"):
det = FalconcSAXS(name=name, prefix=prefix, device_manager=dm)
det = FalconcSAXS(
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
@@ -128,9 +139,9 @@ def test_stage(mock_det, scaninfo):
This includes testing _prep_det
"""
with (
mock.patch.object(mock_det.custom_prepare, "set_trigger") as mock_set_trigger,
mock.patch.object(mock_det, "set_trigger") as mock_set_trigger,
mock.patch.object(
mock_det.custom_prepare, "prepare_data_backend"
mock_det.custom_prepare, "prepare_detector_backend"
) as mock_prep_data_backend,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
@@ -147,7 +158,7 @@ def test_stage(mock_det, scaninfo):
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
)
mock_prep_data_backend.assert_called_once()
mock_publish_file_location.assert_called_once_with(done=False, successful=False)
mock_publish_file_location.assert_called_once_with(done=False)
mock_arm_acquisition.assert_called_once()
@@ -193,11 +204,12 @@ def test_prepare_data_backend(mock_det, scaninfo):
[
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
({"filepath": "test.h5", "successful": None, "done": True, "scan_id": "123"}),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath.set(scaninfo["filepath"]).wait()
mock_det.filepath = scaninfo["filepath"]
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
)
@@ -242,18 +254,24 @@ def test_trigger(mock_det):
mock_on_trigger.assert_called_once()
def test_complete(mock_det):
@pytest.mark.parametrize("stopped, expected_abort", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_abort):
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = False
mock_det.complete()
assert mock_finished.call_count == 1
call = mock.call(done=True, successful=True)
assert mock_publish_file_location.call_args == call
mock_det.stopped = stopped
if expected_abort:
mock_det.unstage()
assert mock_det.stopped is stopped
assert mock_publish_file_location.call_count == 0
else:
mock_det.unstage()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is stopped
def test_stop(mock_det):

View File

@@ -16,13 +16,23 @@ from csaxs_bec.devices.epics.mcs_csaxs import (
ReadoutMode,
TriggerSource,
)
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
@pytest.fixture(scope="function")
def mock_det():
name = "mcs"
prefix = "X12SA-MCS:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -37,9 +47,8 @@ def mock_det():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(MCScSAXS, "_init"):
det = MCScSAXS(name=name, prefix=prefix, device_manager=dm)
det = MCScSAXS(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
@@ -47,6 +56,7 @@ def test_init():
"""Test the _init function:"""
name = "eiger"
prefix = "X12SA-ES-EIGER9M:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -58,6 +68,9 @@ def test_init():
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
with (
mock.patch(
"csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_default_parameter"
) as mock_default,
mock.patch(
"csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector"
) as mock_init_det,
@@ -65,7 +78,8 @@ def test_init():
"csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector_backend"
) as mock_init_backend,
):
MCScSAXS(name=name, prefix=prefix, device_manager=dm)
MCScSAXS(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
mock_default.assert_called_once()
mock_init_det.assert_called_once()
mock_init_backend.assert_called_once()
@@ -261,10 +275,23 @@ def test_prepare_detector_backend(mock_det):
assert mock_det.read_mode.get() == ReadoutMode.EVENT
def test_complete(mock_det):
with (mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,):
mock_det.complete()
assert mock_finished.call_count == 1
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_exception:
mock_det.unstage()
assert mock_det.stopped is True
else:
mock_det.unstage()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is False
def test_stop_detector_backend(mock_det):

View File

@@ -1,43 +1,49 @@
import copy
from unittest import mock
import pytest
from csaxs_bec.devices.npoint import NPointAxis, NPointController
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
class SocketMock:
def __init__(self, sock=None):
self.buffer_put = ""
self.buffer_recv = ""
self.is_open = False
if sock is None:
self.open()
else:
self.sock = sock
@pytest.fixture
def controller():
"""
Fixture to create a NPointController object.
"""
with mock.patch("ophyd_devices.utils.socket.SocketIO") as socket_cls:
controller = NPointController(
socket_cls=socket_cls, socket_host="localhost", socket_port=1234
)
controller.on()
controller.sock.reset_mock()
yield controller
controller.off()
def connect(self, host, port):
print(f"connecting to {host} port {port}")
# self.sock.create_connection((host, port))
# self.sock.connect((host, port))
def _put(self, msg_bytes):
self.buffer_put = msg_bytes
print(self.buffer_put)
@pytest.fixture
def npointx():
"""
Fixture to create a NPointAxis object.
"""
controller = mock.MagicMock()
npointx = NPointAxis(
axis_Id="A", name="npointx", host="localhost", port=1234, socket_cls=controller
)
npointx.controller.on()
npointx.controller.sock.reset_mock()
npointx.controller.sock.receive.reset_mock()
yield npointx
npointx.controller.off()
def _recv(self, buffer_length=1024):
print(self.buffer_recv)
return self.buffer_recv
def _initialize_socket(self):
pass
def put(self, msg):
return self._put(msg)
def receive(self, buffer_length=1024):
return self._recv(buffer_length=buffer_length)
def open(self):
self._initialize_socket()
self.is_open = True
def close(self):
self.sock = None
self.is_open = False
@pytest.mark.parametrize(
@@ -48,29 +54,12 @@ def npointx():
(-5, b"\xa2\x18\x12\x83\x1133\xff\xffU"),
],
)
def test_axis_put(npointx, pos, msg):
"""
Test that the set target position sends the correct message to the controller.
"""
npointx.controller.set_target_pos(npointx.axis_Id_numeric, pos)
npointx.controller.sock.put.assert_called_with(msg)
def test_npoint_axis_move(npointx):
"""
Test that the move method sends the correct messages to the controller.
It should send the set target position, followed by 2 get current position messages.
"""
npointx.controller.sock.receive.side_effect = [
b"\xa0\x34\x13\x83\x11\x00\x00\x00\x00U", # pos 0
b"\xa0\x34\x13\x83\x11\xcd\xcc\x00\x00U", # pos 5
]
npointx.move(5)
assert (
mock.call(b"\xa2\x18\x12\x83\x11\xcd\xcc\x00\x00U")
in npointx.controller.sock.put.mock_calls
)
assert len(npointx.controller.sock.put.mock_calls) == 3
def test_axis_put(pos, msg):
controller = NPointController(SocketMock())
npointx = NPointAxis(controller, 0, "nx")
controller.on()
npointx.set(pos)
assert npointx.controller.socket.buffer_put == msg
@pytest.mark.parametrize(
@@ -81,12 +70,13 @@ def test_npoint_axis_move(npointx):
(-5, b"\xa04\x13\x83\x11U", b"\xa0\x34\x13\x83\x1133\xff\xffU"),
],
)
def test_axis_get_out(npointx, pos, msg_in, msg_out):
"""
Test that the readback value is correctly read from the controller.
"""
npointx.controller.sock.receive.side_effect = [msg_out]
assert pytest.approx(npointx.readback.get(), rel=0.01) == pos
def test_axis_get_out(pos, msg_in, msg_out):
controller = NPointController(SocketMock())
npointx = NPointAxis(controller, 0, "nx")
controller.on()
npointx.controller.socket.buffer_recv = msg_out
assert pytest.approx(npointx.get(), rel=0.01) == pos
# assert controller.socket.buffer_put == msg_in
@pytest.mark.parametrize(
@@ -97,40 +87,31 @@ def test_axis_get_out(npointx, pos, msg_in, msg_out):
(2, b"\xa043\x83\x11U", b"\xa0\x34\x13\x83\x1133\xff\xffU"),
],
)
def test_axis_get_in(npointx, axis, msg_in, msg_out):
"""
Test that the readback value is correctly read from the controller by directly calling the
controller's method.
"""
npointx.controller.sock.receive.side_effect = [msg_out]
npointx.controller.get_current_pos(axis)
npointx.controller.sock.put.assert_called_once_with(msg_in)
def test_axis_get_in(axis, msg_in, msg_out):
controller = NPointController(SocketMock())
npointx = NPointAxis(controller, 0, "nx")
controller.on()
controller.socket.buffer_recv = msg_out
controller._get_current_pos(axis)
assert controller.socket.buffer_put == msg_in
def test_axis_out_of_range(controller):
"""
Test that an error is raised when trying to create an NPointAxis object with an invalid axis ID.
"""
def test_axis_out_of_range():
controller = NPointController(SocketMock())
with pytest.raises(ValueError):
npointx = NPointAxis(
axis_Id="G", name="npointx", host="localhost", port=1234, socket_cls=mock.MagicMock()
)
npointx = NPointAxis(controller, 3, "nx")
def test_get_axis_out_of_range(controller):
"""
Test that an error is raised when trying to get the current position of an invalid axis.
"""
def test_get_axis_out_of_range():
controller = NPointController(SocketMock())
with pytest.raises(ValueError):
controller.get_current_pos(3)
controller._get_current_pos(3)
def test_set_axis_out_of_range(controller):
"""
Test that an error is raised when trying to set the target position of an invalid axis.
"""
def test_set_axis_out_of_range():
controller = NPointController(SocketMock())
with pytest.raises(ValueError):
controller.set_target_pos(3, 5)
controller._set_target_pos(3, 5)
@pytest.mark.parametrize(
@@ -142,9 +123,6 @@ def test_set_axis_out_of_range(controller):
],
)
def test_hex_list_to_int(in_buffer, byteorder, signed, val):
"""
Test that the hex list is correctly converted to an integer
"""
assert (
NPointController._hex_list_to_int(
copy.deepcopy(in_buffer), byteorder=byteorder, signed=signed
@@ -161,12 +139,10 @@ def test_hex_list_to_int(in_buffer, byteorder, signed, val):
(2, b"\xa0x0\x83\x11U", b"\xa0\x78\x13\x83\x11\x64\x00\x00\x00U"),
],
)
def test_get_range(npointx, axis, msg_in, msg_out):
"""
Test that the range is correctly read from the controller by directly calling the
controller's method.
"""
npointx.controller.sock.receive.side_effect = [msg_out]
val = npointx.controller._get_range(axis)
npointx.controller.sock.put.assert_called_once_with(msg_in)
assert val == 100
def test_get_range(axis, msg_in, msg_out):
controller = NPointController(SocketMock())
npointx = NPointAxis(controller, 0, "nx")
controller.on()
controller.socket.buffer_recv = msg_out
val = controller._get_range(axis)
assert controller.socket.buffer_put == msg_in and val == 100

View File

@@ -11,13 +11,23 @@ from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV
from csaxs_bec.devices.epics.pilatus_csaxs import PilatuscSAXS
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
@pytest.fixture(scope="function")
def mock_det():
name = "pilatus"
prefix = "X12SA-ES-PILATUS300K:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -30,7 +40,9 @@ def mock_det():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(PilatuscSAXS, "_init"):
det = PilatuscSAXS(name=name, prefix=prefix, device_manager=dm)
det = PilatuscSAXS(
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
)
patch_dual_pvs(det)
yield det
@@ -49,7 +61,7 @@ def test_init_detector(mock_det, trigger_source, detector_state):
Validation upon setting the correct PVs
"""
mock_det.custom_prepare.on_init() # call the method you want to test
mock_det.custom_prepare.initialize_detector() # call the method you want to test
assert mock_det.cam.acquire.get() == detector_state
assert mock_det.cam.trigger_mode.get() == trigger_source
@@ -84,8 +96,6 @@ def test_init_detector(mock_det, trigger_source, detector_state):
],
)
def test_stage(mock_det, scaninfo, stopped, expected_exception):
path = "tmp"
mock_det.filepath_raw = path
with mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
@@ -95,12 +105,14 @@ def test_stage(mock_det, scaninfo, stopped, expected_exception):
mock_det.device_manager.add_device("mokev", value=12.4)
mock_det.stopped = stopped
with (
mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_data_backend,
mock.patch.object(
mock_det.custom_prepare, "prepare_detector_backend"
) as mock_data_backend,
mock.patch.object(
mock_det.custom_prepare, "update_readout_time"
) as mock_update_readout_time,
):
mock_det.filepath.set(scaninfo["filepath"]).wait()
mock_det.filepath = scaninfo["filepath"]
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
@@ -115,13 +127,11 @@ def test_stage(mock_det, scaninfo, stopped, expected_exception):
)
assert mock_det.cam.num_frames.get() == 1
mock_publish_file_location.assert_called_once_with(
done=False, successful=False, metadata={"input_path": path}
)
mock_publish_file_location.assert_called_with(done=False)
def test_pre_scan(mock_det):
mock_det.custom_prepare.on_pre_scan()
mock_det.custom_prepare.pre_scan()
assert mock_det.cam.acquire.get() == 1
@@ -159,16 +169,23 @@ def test_update_readout_time(mock_det, readout_time, expected_value):
"scan_id": "123",
}
),
(
{
"filepath": "test.h5",
"filepath_raw": "test5_raw.h5",
"successful": None,
"done": True,
"scan_id": "123",
}
),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath.set(scaninfo["filepath"]).wait()
mock_det.filepath = scaninfo["filepath"]
mock_det.filepath_raw = scaninfo["filepath_raw"]
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"],
successful=scaninfo["successful"],
metadata={"input_path": scaninfo["filepath_raw"]},
done=scaninfo["done"], successful=scaninfo["successful"]
)
if scaninfo["successful"] is None:
msg = messages.FileMessage(
@@ -386,19 +403,23 @@ def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, e
assert call == mock_call
def test_complete(mock_det):
path = "tmp"
mock_det.filepath_raw = path
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.complete()
assert mock_finished.call_count == 1
call = mock.call(done=True, successful=True, metadata={"input_path": path})
assert mock_publish_file_location.call_args == call
mock_det.stopped = stopped
if expected_exception:
mock_det.unstage()
assert mock_det.stopped is True
else:
mock_det.unstage()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is False
def test_stop(mock_det):

View File

@@ -373,7 +373,6 @@ def test_LamNIFermatScan(scan_msg, reference_scan_list):
reference_scan_list[ii].content["parameter"]["rpc_id"] = scan_instructions[
ii
].content["parameter"]["rpc_id"]
reference_scan_list[ii].metadata["RID"] = scan_instructions[ii].metadata.get("RID")
if instr.content["parameter"].get("value"):
assert np.isclose(
instr.content["parameter"].get("value"),