Compare commits

...

19 Commits

Author SHA1 Message Date
gac-x12sa
39491e3189 feat: first draft for jungfraujoch ophyd 2024-10-09 16:38:25 +02:00
gac-x12sa
9dd96c9f6e feat: add jfj test scan, refactor ddg csaxs 2024-10-08 15:01:58 +02:00
gac-x12sa
c16642b848 refactor: simple refactoring of ddg for test purposes 2024-10-07 17:22:54 +02:00
ci_update_bot
b5a5082919 docs: Update device list 2024-10-03 13:42:26 +00:00
3a40b4a37e refactor(npoint): minor improvements to the npoint config template 2024-10-02 18:31:53 +02:00
baafa982e3 refactor(npoint): cleanup 2024-10-02 18:27:02 +02:00
8c2d705a89 docs: improved npoint test docs 2024-10-02 18:21:11 +02:00
59db1c067b test: fixed npoint tests after axis refactoring 2024-10-02 18:11:51 +02:00
gac-x12sa
15bdbe2e03 feat: added npoint support to ophyd 2024-10-02 11:50:39 +02:00
0d2b4c4423 refactor(npoint): cleanup 2024-09-18 21:49:49 +02:00
59e0755e14 test: fixed test for unique rids 2024-06-25 10:06:26 +02:00
ci_update_bot
2e6d1ad343 docs: Update device list 2024-06-23 09:04:52 +00:00
fdfb6db84b refactor: publish file location with successful and done 2024-06-06 15:43:32 +02:00
4e4ca325ab fix: use on_complete instead of on_unstage and adapt changes for base class 2024-06-04 09:02:06 +02:00
ci_update_bot
319771e2aa docs: Update device list 2024-05-27 10:48:14 +00:00
26b8ac997d refactor: cleanup imports, tests and classes 2024-05-27 08:49:53 +02:00
3f21090441 refactor: moved patch_dual_pvs to devices.tests_utils 2024-05-27 08:34:11 +02:00
34fdc39ebc refactor: moved patch pvs to conftest.py 2024-05-27 08:34:11 +02:00
f3d7f1ba64 feat: adapt detector classes and test for psi_detector_base refactoring 2024-05-27 08:34:11 +02:00
23 changed files with 1540 additions and 1038 deletions

View File

@@ -0,0 +1,46 @@
ddg:
description: DelayGenerator for detector triggering
deviceClass: csaxs_bec.devices.epics.delay_generator_csaxs.DelayGeneratorcSAXS
deviceConfig:
prefix: 'X12SA-CPCL-DDG3:'
ddg_config:
delay_burst: 40.e-3
delta_width: 0
additional_triggers: 0
polarity:
- 1 # T0
- 0 # eiger and LeCroy4
- 1
- 1
- 1
amplitude: 4.5
offset: 0
thres_trig_level: 2.5
set_high_on_exposure: False
set_high_on_stage: False
deviceTags:
- cSAXS
- ddg_detectors
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: True
eiger_jfjoch:
description: DelayGenerator for detector triggering
deviceClass: csaxs_bec.devices.jungfraujoch.eiger_jungfrau_joch.Eiger9McSAXS
deviceConfig:
deviceTags:
- cSAXS
- eiger9m
onFailure: buffer
enabled: true
readoutPriority: async
softwareTrigger: False
simulated_monitor:
readoutPriority: monitored
deviceClass: ophyd_devices.SimMonitor
deviceConfig:
deviceTags:
- beamline
enabled: true
readOnly: false

View File

@@ -0,0 +1,38 @@
############################################################
#################### npoint motors #########################
############################################################
npx:
description: nPoint x axis on the big npoint controller
deviceClass: csaxs_bec.devices.npoint.npoint.NPointAxis
deviceConfig:
axis_Id: A
host: "nPoint000003.psi.ch"
limits:
- -50
- 50
port: 23
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
deviceTags:
- npoint
npy:
description: nPoint y axis on the big npoint controller
deviceClass: csaxs_bec.devices.npoint.npoint.NPointAxis
deviceConfig:
axis_Id: B
host: "nPoint000003.psi.ch"
limits:
- -50
- 50
port: 23
sign: 1
enabled: true
onFailure: buffer
readOnly: false
readoutPriority: baseline
deviceTags:
- npoint

View File

@@ -3,35 +3,36 @@
### csaxs_bec
| Device | Documentation | Module |
| :----- | :------------- | :------ |
| InsertionDevice | Python wrapper for the CSAXS insertion device control<br><br> This wrapper provides a positioner interface for the ID control.<br> is completely custom XBPM with templates directly in the<br> VME repo. Thus it needs a custom ophyd template as well...<br><br> WARN: The x and y are not updated by the IOC<br> | [csaxs_bec.devices.epics.InsertionDevice](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/InsertionDevice.py) |
| XbpmBase | Python wrapper for X-ray Beam Position Monitors<br><br> XBPM's consist of a metal-coated diamond window that ejects<br> photoelectrons from the incoming X-ray beam. These electons<br> are collected and their current is measured. Effectively<br> they act as four quadrant photodiodes and are used as BPMs<br> at the undulator beamlines of SLS.<br><br> Note: EPICS provided signals are read only, but the user can<br> change the beam position offset.<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| XbpmCsaxsOp | Python wrapper for custom XBPMs in the cSAXS optics hutch<br><br> This is completely custom XBPM with templates directly in the<br> VME repo. Thus it needs a custom ophyd template as well...<br><br> WARN: The x and y are not updated by the IOC<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| XbpmSim | Python wrapper for simulated X-ray Beam Position Monitors<br><br> XBPM's consist of a metal-coated diamond window that ejects<br> photoelectrons from the incoming X-ray beam. These electons<br> are collected and their current is measured. Effectively<br> they act as four quadrant photodiodes and are used as BPMs<br> at the undulator beamlines of SLS.<br><br> Note: EPICS provided signals are read only, but the user can<br> change the beam position offset.<br><br> This simulation device extends the basic proxy with a script that<br> fills signals with quasi-randomized values.<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| DelayGeneratorcSAXS | <br> DG645 delay generator at cSAXS (multiple can be in use depending on the setup)<br><br> Default values for setting up DDG.<br> Note: checks of set calues are not (only partially) included, check manual for details on possible settings.<br> https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf<br><br> - delay_burst : (float >=0) Delay between trigger and first pulse in burst mode<br> - delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition<br> - additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line)<br> - polarity : (list of 0/1) polarity for different channels<br> - amplitude : (float) amplitude voltage of TTLs<br> - offset : (float) offset for ampltitude<br> - thres_trig_level : (float) threshold of trigger amplitude<br><br> Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):<br><br> - set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.<br> # TODO trigger_width and fixed_ttl could be combined into single list.<br> - fixed_ttl_width : (list of either 1 or 0), one for each channel.<br> - trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.<br> - set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.<br> - premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan).<br> - set_high_on_stage : (bool) if True, then TTL signal should go high already on stage.<br> | [csaxs_bec.devices.epics.delay_generator_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/delay_generator_csaxs.py) |
| Eiger9McSAXS | <br> Eiger9M detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector<br> Various EpicsPVs for controlling the detector<br> | [csaxs_bec.devices.epics.eiger9m_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/eiger9m_csaxs.py) |
| SLSDetectorCam | SLS Detector Camera - Pilatus<br><br> Base class to map EPICS PVs to ophyd signals.<br> | [csaxs_bec.devices.epics.pilatus_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/pilatus_csaxs.py) |
| EpicsDXPFalcon | <br> DXP parameters for Falcon detector<br><br> Base class to map EPICS PVs from DXP parameters to ophyd signals.<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| FalconHDF5Plugins | <br> HDF5 parameters for Falcon detector<br><br> Base class to map EPICS PVs from HDF5 Plugin to ophyd signals.<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| FalconcSAXS | <br> Falcon Sitoro detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector<br> dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector<br> mca (EpicsMCARecord) : MCA parameters for Falcon detector<br> hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector<br> MIN_READOUT (float) : Minimum readout time for the detector<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| MCScSAXS | MCS card for cSAXS for implementation at cSAXS beamline | [csaxs_bec.devices.epics.mcs_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/mcs_csaxs.py) |
| SIS38XX | SIS38XX card for access to EPICs PVs at cSAXS beamline | [csaxs_bec.devices.epics.mcs_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/mcs_csaxs.py) |
| PilatuscSAXS | Pilatus_2 300k detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (Eiger9MSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> cam (SLSDetectorCam) : Detector camera<br> MIN_READOUT (float) : Minimum readout time for the detector<br><br> | [csaxs_bec.devices.epics.pilatus_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/pilatus_csaxs.py) |
| Bpm4i | | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| DelayGeneratorcSAXS | <br> DG645 delay generator at cSAXS (multiple can be in use depending on the setup)<br><br> Default values for setting up DDG.<br> Note: checks of set calues are not (only partially) included, check manual for details on possible settings.<br> https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf<br><br> - delay_burst : (float >=0) Delay between trigger and first pulse in burst mode<br> - delta_width : (float >= 0) Add width to fast shutter signal to make sure its open during acquisition<br> - additional_triggers : (int) add additional triggers to burst mode (mcs card needs +1 triggers per line)<br> - polarity : (list of 0/1) polarity for different channels<br> - amplitude : (float) amplitude voltage of TTLs<br> - offset : (float) offset for ampltitude<br> - thres_trig_level : (float) threshold of trigger amplitude<br><br> Custom signals for logic in different DDGs during scans (for custom_prepare.prepare_ddg):<br><br> - set_high_on_exposure : (bool): if True, then TTL signal should go high during the full acquisition time of a scan.<br> # TODO trigger_width and fixed_ttl could be combined into single list.<br> - fixed_ttl_width : (list of either 1 or 0), one for each channel.<br> - trigger_width : (float) if fixed_ttl_width is True, then the width of the TTL pulse is set to this value.<br> - set_trigger_source : (TriggerSource) specifies the default trigger source for the DDG.<br> - premove_trigger : (bool) if True, then a trigger should be executed before the scan starts (to be implemented in on_pre_scan).<br> - set_high_on_stage : (bool) if True, then TTL signal should go high already on stage.<br> | [csaxs_bec.devices.epics.delay_generator_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/delay_generator_csaxs.py) |
| Eiger1p5MDetector | | [csaxs_bec.devices.omny.eiger1p5m](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/eiger1p5m.py) |
| Eiger9McSAXS | <br> Eiger9M detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector<br> Various EpicsPVs for controlling the detector<br> | [csaxs_bec.devices.epics.eiger9m_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/eiger9m_csaxs.py) |
| EpicsDXPFalcon | <br> DXP parameters for Falcon detector<br><br> Base class to map EPICS PVs from DXP parameters to ophyd signals.<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| FalconcSAXS | <br> Falcon Sitoro detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector<br> dxp (EpicsDXPFalcon) : DXP parameters for Falcon detector<br> mca (EpicsMCARecord) : MCA parameters for Falcon detector<br> hdf5 (FalconHDF5Plugins) : HDF5 parameters for Falcon detector<br> MIN_READOUT (float) : Minimum readout time for the detector<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| FalconHDF5Plugins | <br> HDF5 parameters for Falcon detector<br><br> Base class to map EPICS PVs from HDF5 Plugin to ophyd signals.<br> | [csaxs_bec.devices.epics.falcon_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/falcon_csaxs.py) |
| FlomniGalilMotor | | [csaxs_bec.devices.omny.galil.fgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/fgalil_ophyd.py) |
| FlomniSampleStorage | | [csaxs_bec.devices.omny.flomni_sample_storage](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/flomni_sample_storage.py) |
| FuprGalilMotor | | [csaxs_bec.devices.omny.galil.fupr_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/fupr_ophyd.py) |
| GirderMotorPITCH | Girder YAW pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorROLL | Girder ROLL pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorX1 | Girder X translation pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorY1 | Girder Y translation pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| GirderMotorYAW | Girder YAW pseudo motor | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| InsertionDevice | Python wrapper for the CSAXS insertion device control<br><br> This wrapper provides a positioner interface for the ID control.<br> is completely custom XBPM with templates directly in the<br> VME repo. Thus it needs a custom ophyd template as well...<br><br> WARN: The x and y are not updated by the IOC<br> | [csaxs_bec.devices.epics.InsertionDevice](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/InsertionDevice.py) |
| LamniGalilMotor | | [csaxs_bec.devices.omny.galil.lgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/lgalil_ophyd.py) |
| MCScSAXS | MCS card for cSAXS for implementation at cSAXS beamline | [csaxs_bec.devices.epics.mcs_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/mcs_csaxs.py) |
| NPointAxis | <br> NPointAxis class, which inherits from Device and PositionerBase. This class<br> represents an axis of an nPoint piezo stage and provides the necessary<br> functionality to move the axis and read its current position.<br> | [csaxs_bec.devices.npoint.npoint](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/npoint/npoint.py) |
| OMNYSampleStorage | | [csaxs_bec.devices.omny.omny_sample_storage](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/omny_sample_storage.py) |
| PilatuscSAXS | Pilatus_2 300k detector for CSAXS<br><br> Parent class: PSIDetectorBase<br><br> class attributes:<br> custom_prepare_cls (Eiger9MSetup) : Custom detector setup class for cSAXS,<br> inherits from CustomDetectorMixin<br> cam (SLSDetectorCam) : Detector camera<br> MIN_READOUT (float) : Minimum readout time for the detector<br><br> | [csaxs_bec.devices.epics.pilatus_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/pilatus_csaxs.py) |
| PmDetectorRotation | Detector rotation pseudo motor<br><br> Small wrapper to convert detector pusher position to rotation angle.<br> | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| PmMonoBender | Monochromator bender<br><br> Small wrapper to combine the four monochromator bender motors.<br> | [csaxs_bec.devices.epics.specMotors](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/specMotors.py) |
| Xeye | | [csaxs_bec.devices.sls_devices.cSAXS.xeye](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/sls_devices/cSAXS/xeye.py) |
| SmaractMotor | | [csaxs_bec.devices.smaract.smaract_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/smaract/smaract_ophyd.py) |
| Eiger1p5MDetector | | [csaxs_bec.devices.omny.eiger1p5m](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/eiger1p5m.py) |
| FlomniSampleStorage | | [csaxs_bec.devices.omny.flomni_sample_storage](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/flomni_sample_storage.py) |
| OMNYSampleStorage | | [csaxs_bec.devices.omny.omny_sample_storage](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/omny_sample_storage.py) |
| FlomniGalilMotor | | [csaxs_bec.devices.omny.galil.fgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/fgalil_ophyd.py) |
| FuprGalilMotor | | [csaxs_bec.devices.omny.galil.fupr_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/fupr_ophyd.py) |
| LamniGalilMotor | | [csaxs_bec.devices.omny.galil.lgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/lgalil_ophyd.py) |
| SGalilMotor | "SGalil Motors at cSAXS have a<br> DC motor (y axis - vertical) - implemented as C<br> and a step motor (x-axis horizontal) - implemented as E<br> that require different communication for control<br> | [csaxs_bec.devices.omny.galil.sgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/sgalil_ophyd.py) |
| RtFlomniMotor | | [csaxs_bec.devices.omny.rt.rt_flomni_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/rt/rt_flomni_ophyd.py) |
| RtLamniMotor | | [csaxs_bec.devices.omny.rt.rt_lamni_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/rt/rt_lamni_ophyd.py) |
| SGalilMotor | "SGalil Motors at cSAXS have a<br> DC motor (y axis - vertical) - implemented as C<br> and a step motor (x-axis horizontal) - implemented as E<br> that require different communication for control<br> | [csaxs_bec.devices.omny.galil.sgalil_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/omny/galil/sgalil_ophyd.py) |
| SIS38XX | SIS38XX card for access to EPICs PVs at cSAXS beamline | [csaxs_bec.devices.epics.mcs_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/mcs_csaxs.py) |
| SLSDetectorCam | SLS Detector Camera - Pilatus<br><br> Base class to map EPICS PVs to ophyd signals.<br> | [csaxs_bec.devices.epics.pilatus_csaxs](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/pilatus_csaxs.py) |
| SmaractMotor | | [csaxs_bec.devices.smaract.smaract_ophyd](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/smaract/smaract_ophyd.py) |
| XbpmBase | Python wrapper for X-ray Beam Position Monitors<br><br> XBPM's consist of a metal-coated diamond window that ejects<br> photoelectrons from the incoming X-ray beam. These electons<br> are collected and their current is measured. Effectively<br> they act as four quadrant photodiodes and are used as BPMs<br> at the undulator beamlines of SLS.<br><br> Note: EPICS provided signals are read only, but the user can<br> change the beam position offset.<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| XbpmCsaxsOp | Python wrapper for custom XBPMs in the cSAXS optics hutch<br><br> This is completely custom XBPM with templates directly in the<br> VME repo. Thus it needs a custom ophyd template as well...<br><br> WARN: The x and y are not updated by the IOC<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| XbpmSim | Python wrapper for simulated X-ray Beam Position Monitors<br><br> XBPM's consist of a metal-coated diamond window that ejects<br> photoelectrons from the incoming X-ray beam. These electons<br> are collected and their current is measured. Effectively<br> they act as four quadrant photodiodes and are used as BPMs<br> at the undulator beamlines of SLS.<br><br> Note: EPICS provided signals are read only, but the user can<br> change the beam position offset.<br><br> This simulation device extends the basic proxy with a script that<br> fills signals with quasi-randomized values.<br> | [csaxs_bec.devices.epics.XbpmBase](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/epics/XbpmBase.py) |
| Xeye | | [csaxs_bec.devices.sls_devices.cSAXS.xeye](https://gitlab.psi.ch/bec/csaxs_bec/-/blob/main/csaxs_bec/devices/sls_devices/cSAXS/xeye.py) |

View File

@@ -1,5 +1,8 @@
import time
from bec_lib import bec_logger
from ophyd import Component
from ophyd import Component, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import (
DDGCustomMixin,
PSIDelayGeneratorBase,
@@ -14,6 +17,165 @@ class DelayGeneratorError(Exception):
"""Exception raised for errors."""
# class DDGSetup(DDGCustomMixin):
# """
# Mixin class for DelayGenerator logic at cSAXS.
# At cSAXS, multiple DDGs were operated at the same time. There different behaviour is
# implemented in the ddg_config signals that are passed via the device config.
# """
# def initialize_default_parameter(self) -> None:
# """Method to initialize default parameters."""
# for ii, channel in enumerate(self.parent.all_channels):
# self.parent.set_channels("polarity", self.parent.polarity.get()[ii], [channel])
# self.parent.set_channels("amplitude", self.parent.amplitude.get())
# self.parent.set_channels("offset", self.parent.offset.get())
# # Setup reference
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch1" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_channels(
# "reference", 0, [f"channel{pair}.ch2" for pair in self.parent.all_delay_pairs]
# )
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # Set threshold level for ext. pulses
# self.parent.level.put(self.parent.thres_trig_level.get())
# def prepare_ddg(self) -> None:
# """
# Method to prepare scan logic of cSAXS
# Two scantypes are supported: "step" and "fly":
# - step: Scan is performed by stepping the motor and acquiring data at each step
# - fly: Scan is performed by moving the motor with a constant velocity and acquiring data
# Custom logic for different DDG behaviour during scans.
# - set_high_on_exposure : If True, then TTL signal is high during
# the full exposure time of the scan (all frames).
# E.g. Keep shutter open for the full scan.
# - fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
# If the value is 0, then the width of the TTL pulse is determined,
# no matter which parameters are passed from the scaninfo for exposure time
# - set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
# were: SINGLE_SHOT, EXT_RISING_EDGE
# """
# self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# # scantype "step"
# if self.parent.scaninfo.scan_type == "step":
# # High on exposure means that the signal
# if self.parent.set_high_on_exposure.get():
# # caluculate parameters
# num_burst_cycle = 1 + self.parent.additional_triggers.get()
# exp_time = (
# self.parent.delta_width.get()
# + self.parent.scaninfo.frames_per_trigger
# * (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
# )
# total_exposure = exp_time
# delay_burst = self.parent.delay_burst.get()
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# for value, channel in zip(
# self.parent.fixed_ttl_width.get(), self.parent.all_channels
# ):
# logger.debug(f"Trying to set DDG {channel} to {value}")
# if value != 0:
# self.parent.set_channels("width", value, channels=[channel])
# else:
# # caluculate parameters
# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
# total_exposure = exp_time + self.parent.scaninfo.readout_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = (
# self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
# )
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# # scantype "fly"
# elif self.parent.scaninfo.scan_type == "fly":
# if self.parent.set_high_on_exposure.get():
# # caluculate parameters
# exp_time = (
# self.parent.delta_width.get()
# + self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
# + self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
# )
# total_exposure = exp_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = 1 + self.parent.additional_triggers.get()
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# for value, channel in zip(
# self.parent.fixed_ttl_width.get(), self.parent.all_channels
# ):
# logger.debug(f"Trying to set DDG {channel} to {value}")
# if value != 0:
# self.parent.set_channels("width", value, channels=[channel])
# else:
# # caluculate parameters
# exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
# total_exposure = exp_time + self.parent.scaninfo.readout_time
# delay_burst = self.parent.delay_burst.get()
# num_burst_cycle = (
# self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
# )
# # Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
# if not self.parent.trigger_width.get():
# self.parent.set_channels("width", exp_time)
# else:
# self.parent.set_channels("width", self.parent.trigger_width.get())
# else:
# raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# # Set common DDG parameters
# self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
# self.parent.set_channels("delay", 0.0)
# def on_trigger(self) -> None:
# """Method to be executed upon trigger"""
# if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
# self.parent.trigger_shot.put(1)
# def check_scan_id(self) -> None:
# """
# Method to check if scan_id has changed.
# If yes, then it changes parent.stopped to True, which will stop further actions.
# """
# old_scan_id = self.parent.scaninfo.scan_id
# self.parent.scaninfo.load_scan_metadata()
# if self.parent.scaninfo.scan_id != old_scan_id:
# self.parent.stopped = True
# def finished(self) -> None:
# """Method checks if DDG finished acquisition"""
# def on_pre_scan(self) -> None:
# """
# Method called by pre_scan hook in parent class.
# Executes trigger if premove_trigger is Trus.
# """
# if self.parent.premove_trigger.get() is True:
# self.parent.trigger_shot.put(1)
class DDGSetup(DDGCustomMixin):
"""
Mixin class for DelayGenerator logic at cSAXS.
@@ -41,114 +203,93 @@ class DDGSetup(DDGCustomMixin):
self.parent.level.put(self.parent.thres_trig_level.get())
def prepare_ddg(self) -> None:
"""
Method to prepare scan logic of cSAXS
Two scantypes are supported: "step" and "fly":
- step: Scan is performed by stepping the motor and acquiring data at each step
- fly: Scan is performed by moving the motor with a constant velocity and acquiring data
Custom logic for different DDG behaviour during scans.
- set_high_on_exposure : If True, then TTL signal is high during
the full exposure time of the scan (all frames).
E.g. Keep shutter open for the full scan.
- fixed_ttl_width : fixed_ttl_width is a list of 5 values, one for each channel.
If the value is 0, then the width of the TTL pulse is determined,
no matter which parameters are passed from the scaninfo for exposure time
- set_trigger_source : Specifies the default trigger source for the DDG. For cSAXS, relevant ones
were: SINGLE_SHOT, EXT_RISING_EDGE
"""
self.parent.set_trigger(getattr(TriggerSource, self.parent.set_trigger_source.get()))
# scantype "step"
if self.parent.scaninfo.scan_type == "step":
# High on exposure means that the signal
if self.parent.set_high_on_exposure.get():
# caluculate parameters
num_burst_cycle = 1 + self.parent.additional_triggers.get()
# scantype "jjf_test"
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
# exp_time = self.parent.scaninfo.exp_time
# readout = self.parent.scaninfo.readout_time
# num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
# total_exposure = exp_time+readout
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
delay = 0
delay_burst = self.parent.delay_burst.get()
self.parent.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
self.parent.set_channels(signal='width', value=exp_time)
self.parent.set_channels(signal='delay', value=delay)
self.parent.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.frames_per_trigger
* (self.parent.scaninfo.exp_time + self.parent.scaninfo.readout_time)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.frames_per_trigger + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
# scantype "fly"
elif self.parent.scaninfo.scan_type == "fly":
if self.parent.set_high_on_exposure.get():
# caluculate parameters
exp_time = (
self.parent.delta_width.get()
+ self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points
+ self.parent.scaninfo.readout_time * (self.parent.scaninfo.num_points - 1)
)
total_exposure = exp_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = 1 + self.parent.additional_triggers.get()
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
for value, channel in zip(
self.parent.fixed_ttl_width.get(), self.parent.all_channels
):
logger.debug(f"Trying to set DDG {channel} to {value}")
if value != 0:
self.parent.set_channels("width", value, channels=[channel])
else:
# caluculate parameters
exp_time = self.parent.delta_width.get() + self.parent.scaninfo.exp_time
total_exposure = exp_time + self.parent.scaninfo.readout_time
delay_burst = self.parent.delay_burst.get()
num_burst_cycle = (
self.parent.scaninfo.num_points + self.parent.additional_triggers.get()
)
# Set individual channel widths, if fixed_ttl_width and trigger_width are combined, this can be a common call too
if not self.parent.trigger_width.get():
self.parent.set_channels("width", exp_time)
else:
self.parent.set_channels("width", self.parent.trigger_width.get())
else:
raise Exception(f"Unknown scan type {self.parent.scaninfo.scan_type}")
# Set common DDG parameters
self.parent.burst_enable(num_burst_cycle, delay_burst, total_exposure, config="first")
self.parent.set_channels("delay", 0.0)
def on_stage(self) -> None:
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
self.parent.set_channels("width", exp_time)
self.parent.set_channels("delay", 0.0)
logger.info(f"{self.parent.name}: On stage with n_burst: {num_burst_cycle} and total_exp {total_exposure}")
self.parent.burst_enable(num_burst_cycle, 0, total_exposure, config="first")
def on_trigger(self) -> None:
"""Method to be executed upon trigger"""
if self.parent.source.read()[self.parent.source.name]["value"] == TriggerSource.SINGLE_SHOT:
self.parent.trigger_shot.put(1)
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name == "jjf_test":
exp_time = 480e-6#self.parent.scaninfo.exp_time
readout = 20e-6#self.parent.scaninfo.readout_time
total_exposure = exp_time+readout
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
num_burst_cycle = int(num_burst_cycle * self.parent.scaninfo.exp_time/total_exposure)
cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
#time.sleep(num_burst_cycle*total_exposure)
def check_ddg()->int:
self.parent.trigger_burst_readout.put(1)
return self.parent.burst_cycle_finished.get()
status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
timeout=num_burst_cycle*total_exposure+1,
check_stopped=True,
exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.")
)
logger.info(f"Return status {self.parent.name}")
return status
# timer = 0
# while True:
# self.parent.trigger_burst_readout.put(1)
# state = self.parent.burst_cycle_finished.get()
# if state == 1:
# break
# time.sleep(0.05)
# timer +=0.05
# if timer>3:
# raise TimeoutError(f"{self.parent.name} did not return. Bit state for end_burst_cycle is {state} for state")
def on_complete(self) -> DeviceStatus:
pass
# logger.info(f"On complete started for {self.parent.name}")
# scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
# if scan_name != "jjf_test":
# return None
# def check_ddg()->int:
# lambda r : self.parent.trigger_burst_readout.put(1)
# return lambda r: self.parent.burst_cycle_finished.get()
# status = self.wait_with_status(signal_conditions=[(check_ddg, 1)],
# timeout=3,
# check_stopped=True,
# exception_on_timeout=DelayGeneratorError(f"{self.parent.name} run into timeout in complete call.")
# )
# logger.info(f"Return status {self.parent.name}")
# return status
def check_scan_id(self) -> None:
"""
@@ -203,6 +344,7 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
custom_prepare_cls = DDGSetup
delay_burst = Component(
bec_utils.ConfigSignal, name="delay_burst", kind="config", config_storage_name="ddg_config"
)
@@ -288,7 +430,6 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
ddg_config=None,
**kwargs,
):
@@ -334,7 +475,6 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
@@ -342,4 +482,48 @@ class DelayGeneratorcSAXS(PSIDelayGeneratorBase):
if __name__ == "__main__":
# Start delay generator in simulation mode.
# Note: To run, access to Epics must be available.
dgen = DelayGeneratorcSAXS("delaygen:DG1:", name="dgen", sim_mode=True)
import time
config = {
"delay_burst": 40.0e-3,
"delta_width": 0,
"additional_triggers": 0,
"polarity": [1, 0, 1, 1, 1], # T0 # to eiger and lecroy4
"amplitude": 4.5,
"offset": 0,
"thres_trig_level": 2.5,
"set_high_on_exposure": False,
"set_high_on_stage": False,
}
start = time.time()
print(f"Start with init of DDG3 with config: {config}")
dgen = DelayGeneratorcSAXS("X12SA-CPCL-DDG3:", name="dgen", ddg_config=config)
print(f"Finished init after: {time.time()-start}s")
start = time.time()
print(f"Start setting up DDG3")
exp_time = 1/(2e3) # 2 kHz
readout = exp_time/10
delay = 0
num_burst_cycle = 1e4 # N triggers
total_exposure = exp_time+readout
delay_burst = dgen.delay_burst.get()
dgen.set_trigger(trigger_source=TriggerSource.SINGLE_SHOT)
dgen.set_channels(signal='width', value=exp_time)
dgen.set_channels(signal='delay', value=0)
dgen.burst_enable(count=num_burst_cycle, delay=delay_burst, period=total_exposure, config="first")
print(f"Start sending {num_burst_cycle} triggers after {time.time()-start}s, ETA {num_burst_cycle*total_exposure}s")
break_time = time.time()
dgen.trigger()
# Wait here briefly for status to finish, whether this is realiable has to be tested
time.sleep(num_burst_cycle*total_exposure)
timer = 0
while True:
dgen.trigger_burst_readout.put(1)
state = dgen.burst_cycle_finished.get()
if state == 1:
break
time.sleep(0.05)
timer +=0.05
if timer>3:
raise TimeoutError(f"dgen.name did not return with value {state} for state")
print(f"Finished trigger cascade of {num_burst_cycle} with {exp_time}s -> {num_burst_cycle*exp_time}s after {time.time()-start}s in total, {break_time} for sending triggers.")

View File

@@ -5,8 +5,6 @@ import time
from typing import Any
import numpy as np
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
@@ -42,6 +40,17 @@ class Eiger9MSetup(CustomDetectorMixin):
self.std_client = None
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize the detector"""
self.initialize_default_parameter()
self.initialize_detector()
self.initialize_detector_backend()
def initialize_detector(self) -> None:
"""Initialize detector"""
self.stop_detector()
self.parent.cam.trigger_mode.put(TriggerSource.GATING)
def initialize_default_parameter(self) -> None:
"""Set default parameters for Eiger9M detector"""
self.update_readout_time()
@@ -55,29 +64,19 @@ class Eiger9MSetup(CustomDetectorMixin):
)
self.parent.readout_time = max(readout_time, self.parent.MIN_READOUT)
def initialize_detector(self) -> None:
"""Initialize detector"""
# Stops the detector
self.stop_detector()
# Sets the trigger source to GATING
self.parent.set_trigger(TriggerSource.GATING)
def initialize_detector_backend(self) -> None:
"""Initialize detector backend"""
# Std client
self.std_client = StdDaqClient(url_base=self.std_rest_server_url)
# Stop writer
self.std_client.stop_writer()
# Change e-account
eacc = self.parent.scaninfo.username
self.update_std_cfg("writer_user_id", int(eacc.strip(" e")))
signal_conditions = [(lambda: self.std_client.get_status()["state"], "READY")]
if not self.wait_for_signals(
signal_conditions=signal_conditions, timeout=self.parent.timeout, all_signals=True
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
all_signals=True,
):
raise EigerTimeoutError(
f"Std client not in READY state, returns: {self.std_client.get_status()}"
@@ -98,7 +97,6 @@ class Eiger9MSetup(CustomDetectorMixin):
"""
# Load config from client and check old value
cfg = self.std_client.get_config()
old_value = cfg.get(cfg_key)
if old_value is None:
@@ -111,19 +109,112 @@ class Eiger9MSetup(CustomDetectorMixin):
f" {type(old_value)}:{old_value}"
)
# Update config with new value and send back to client
cfg.update({cfg_key: value})
logger.debug(cfg)
self.std_client.set_config(cfg)
logger.debug(f"Updated std_daq config for key {cfg_key} from {old_value} to {value}")
def on_stage(self) -> None:
"""Prepare the detector for scan"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
def prepare_detector(self) -> None:
"""Prepare detector for scan"""
self.set_detector_threshold()
self.set_acquisition_params()
self.parent.cam.trigger_mode.put(TriggerSource.GATING)
def set_detector_threshold(self) -> None:
"""
Set the detector threshold
The function sets the detector threshold automatically to 1/2 of the beam energy.
"""
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
setpoint = int(mokev * factor)
energy = self.parent.cam.beam_energy.read()[self.parent.cam.beam_energy.name]["value"]
if setpoint != energy:
self.parent.cam.beam_energy.set(setpoint)
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
self.update_readout_time()
def prepare_data_backend(self) -> None:
"""Prepare the data backend for the scan"""
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
).wait()
self.filepath_exists(self.parent.filepath.get())
self.stop_detector_backend()
try:
self.std_client.start_writer_async(
{
"output_file": self.parent.filepath.get(),
"n_images": int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
),
}
)
except Exception as exc:
time.sleep(5)
if self.std_client.get_status()["state"] == "READY":
raise EigerTimeoutError(f"Timeout of start_writer_async with {exc}") from exc
signal_conditions = [
(lambda: self.std_client.get_status()["acquisition"]["state"], "WAITING_IMAGES")
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=False,
all_signals=True,
):
raise EigerTimeoutError(
"Timeout of 5s reached for std_daq start_writer_async with std_daq client status"
f" {self.std_client.get_status()}"
)
def on_unstage(self) -> None:
"""Unstage the detector"""
pass
def on_complete(self) -> None:
"""Complete the detector"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
self.publish_file_location(done=True, successful=True)
def on_stop(self) -> None:
"""Stop the detector"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stop the detector"""
# Stop detector
self.parent.cam.acquire.put(0)
# Check if detector returned in idle state
signal_conditions = [
(
lambda: self.parent.cam.detector_state.read()[self.parent.cam.detector_state.name][
@@ -135,7 +226,7 @@ class Eiger9MSetup(CustomDetectorMixin):
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout - self.parent.timeout // 2,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
check_stopped=True,
all_signals=False,
):
@@ -143,7 +234,7 @@ class Eiger9MSetup(CustomDetectorMixin):
self.parent.cam.acquire.put(0)
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout - self.parent.timeout // 2,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
check_stopped=True,
all_signals=False,
):
@@ -155,97 +246,12 @@ class Eiger9MSetup(CustomDetectorMixin):
"""Close file writer"""
self.std_client.stop_writer()
def prepare_detector(self) -> None:
"""Prepare detector for scan"""
self.set_detector_threshold()
self.set_acquisition_params()
self.parent.set_trigger(TriggerSource.GATING)
def set_detector_threshold(self) -> None:
"""
Set the detector threshold
The function sets the detector threshold automatically to 1/2 of the beam energy.
"""
# get current beam energy from device manageer
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
# Check if energies are eV or keV, assume keV as the default
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
# set energy on detector
setpoint = int(mokev * factor)
energy = self.parent.cam.beam_energy.read()[self.parent.cam.beam_energy.name]["value"]
if setpoint != energy:
self.parent.cam.beam_energy.set(setpoint)
# set threshold on detector
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
# Set number of images and frames (frames is for internal burst of detector)
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
# Update the readout time of the detector
self.update_readout_time()
def prepare_data_backend(self) -> None:
"""Prepare the data backend for the scan"""
self.parent.filepath = self.parent.filewriter.compile_full_filename(
f"{self.parent.name}.h5"
)
self.filepath_exists(self.parent.filepath)
self.stop_detector_backend()
try:
self.std_client.start_writer_async(
{
"output_file": self.parent.filepath,
"n_images": int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
),
}
)
except Exception as exc:
time.sleep(5)
if self.std_client.get_status()["state"] == "READY":
raise EigerTimeoutError(f"Timeout of start_writer_async with {exc}") from exc
# Check status of std_daq
signal_conditions = [
(lambda: self.std_client.get_status()["acquisition"]["state"], "WAITING_IMAGES")
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
check_stopped=False,
all_signals=True,
):
raise EigerTimeoutError(
"Timeout of 5s reached for std_daq start_writer_async with std_daq client status"
f" {self.std_client.get_status()}"
)
def filepath_exists(self, filepath: str) -> None:
"""Check if filepath exists"""
signal_conditions = [(lambda: os.path.exists(os.path.dirname(filepath)), True)]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=False,
all_signals=True,
):
@@ -264,7 +270,7 @@ class Eiger9MSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=True,
all_signals=False,
):
@@ -272,43 +278,7 @@ class Eiger9MSetup(CustomDetectorMixin):
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Publish the filepath to REDIS.
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
pipe = self.parent.connector.pipeline()
if successful is None:
msg = messages.FileMessage(file_path=self.parent.filepath, done=done)
else:
msg = messages.FileMessage(
file_path=self.parent.filepath, done=done, successful=successful
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
def finished(self):
def finished(self, timeout: int = 5) -> None:
"""Check if acquisition is finished."""
with self._lock:
signal_conditions = [
@@ -326,7 +296,7 @@ class Eiger9MSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
@@ -357,7 +327,7 @@ class SLSDetectorCam(Device):
detector_state = ADCpt(EpicsSignalRO, "DetectorState_RBV")
class TriggerSource(enum.IntEnum):
class TriggerSource(int, enum.Enum):
"""Trigger signals for Eiger9M detector"""
AUTO = 0
@@ -366,7 +336,7 @@ class TriggerSource(enum.IntEnum):
BURST_TRIGGER = 3
class DetectorState(enum.IntEnum):
class DetectorState(int, enum.Enum):
"""Detector states for Eiger9M detector"""
IDLE = 0
@@ -396,37 +366,16 @@ class Eiger9McSAXS(PSIDetectorBase):
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = ["describe"]
USER_ACCESS = []
# specify Setup class
custom_prepare_cls = Eiger9MSetup
# specify minimum readout time for detector
# specify minimum readout time for detector and timeout for checks after unstage
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
cam = ADCpt(SLSDetectorCam, "cam1:")
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector.
Check the TriggerSource enum for possible values
Args:
trigger_source (TriggerSource): Trigger source for the detector
"""
value = trigger_source
self.cam.trigger_mode.put(value)
def stage(self) -> list[object]:
"""
Add functionality to stage, and arm the detector
Additional call to:
- custom_prepare.arm_acquisition()
"""
rtr = super().stage()
self.custom_prepare.arm_acquisition()
return rtr
if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:", sim_mode=True)

View File

@@ -1,8 +1,7 @@
import enum
import os
import threading
from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
@@ -99,6 +98,16 @@ class FalconSetup(CustomDetectorMixin):
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize Falcon detector"""
self.initialize_default_parameter()
self.initialize_detector()
self.initialize_detector_backend()
def initialize_default_parameter(self) -> None:
"""
Set default parameters for Falcon
@@ -124,7 +133,7 @@ class FalconSetup(CustomDetectorMixin):
"""Initialize Falcon detector"""
self.stop_detector()
self.stop_detector_backend()
self.parent.set_trigger(
self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
# 1 Realtime
@@ -136,30 +145,6 @@ class FalconSetup(CustomDetectorMixin):
# Sets the number of pixels/spectra in the buffer
self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer)
def stop_detector(self) -> None:
"""Stops detector"""
self.parent.stop_all.put(1)
self.parent.erase_all.put(1)
signal_conditions = [
(lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout - self.parent.timeout // 2,
all_signals=False,
):
# Retry stop detector and wait for remaining time
raise FalconTimeoutError(
f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
)
def stop_detector_backend(self) -> None:
"""Stop the detector backend"""
self.parent.hdf5.capture.put(0)
def initialize_detector_backend(self) -> None:
"""Initialize the detector backend for Falcon."""
self.parent.hdf5.enable.put(1)
@@ -173,9 +158,16 @@ class FalconSetup(CustomDetectorMixin):
# Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
self.parent.nd_array_mode.put(1)
def on_stage(self) -> None:
"""Prepare detector and backend for acquisition"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
def prepare_detector(self) -> None:
"""Prepare detector for acquisition"""
self.parent.set_trigger(
self.set_trigger(
mapping_mode=MappingSource.MAPPING, trigger_source=TriggerSource.GATE, ignore_gate=0
)
self.parent.preset_real.put(self.parent.scaninfo.exp_time)
@@ -185,10 +177,10 @@ class FalconSetup(CustomDetectorMixin):
def prepare_data_backend(self) -> None:
"""Prepare data backend for acquisition"""
self.parent.filepath = self.parent.filewriter.compile_full_filename(
f"{self.parent.name}.h5"
)
file_path, file_name = os.path.split(self.parent.filepath)
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
).wait()
file_path, file_name = os.path.split(self.parent.filepath.get())
self.parent.hdf5.file_path.put(file_path)
self.parent.hdf5.file_name.put(file_name)
self.parent.hdf5.file_template.put("%s%s")
@@ -212,7 +204,7 @@ class FalconSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
timeout=self.parent.TIMEOUT_FOR_SIGNALS,
check_stopped=True,
all_signals=False,
):
@@ -220,65 +212,86 @@ class FalconSetup(CustomDetectorMixin):
f"Failed to arm the acquisition. Detector state {signal_conditions[0][0]}"
)
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def on_unstage(self) -> None:
"""Unstage detector and backend"""
pass
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Publish the filepath to REDIS.
def on_complete(self) -> None:
"""Complete detector and backend"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
self.publish_file_location(done=True, successful=True)
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
pipe = self.parent.connector.pipeline()
if successful is None:
msg = messages.FileMessage(file_path=self.parent.filepath, done=done)
else:
msg = messages.FileMessage(
file_path=self.parent.filepath, done=done, successful=successful
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
def finished(self) -> None:
"""Check if scan finished succesfully"""
total_frames = int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
signal_conditions = [
(self.parent.dxp.current_pixel.get, total_frames),
(self.parent.hdf5.array_counter.get, total_frames),
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
check_stopped=True,
all_signals=True,
):
logger.debug(
f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()},"
f" send data {self.parent.hdf5.array_counter.get()} from total_frames"
f" {total_frames}"
)
def on_stop(self) -> None:
"""Stop detector and backend"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stops detector"""
self.parent.stop_all.put(1)
self.parent.erase_all.put(1)
signal_conditions = [
(lambda: self.parent.state.read()[self.parent.state.name]["value"], DetectorState.DONE)
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
all_signals=False,
):
# Retry stop detector and wait for remaining time
raise FalconTimeoutError(
f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
)
def stop_detector_backend(self) -> None:
"""Stop the detector backend"""
self.parent.hdf5.capture.put(0)
def finished(self, timeout: int = 5) -> None:
"""Check if scan finished succesfully"""
with self._lock:
total_frames = int(
self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger
)
signal_conditions = [
(self.parent.dxp.current_pixel.get, total_frames),
(self.parent.hdf5.array_counter.get, total_frames),
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
logger.debug(
f"Falcon missed a trigger: received trigger {self.parent.dxp.current_pixel.get()},"
f" send data {self.parent.hdf5.array_counter.get()} from total_frames"
f" {total_frames}"
)
self.stop_detector()
self.stop_detector_backend()
def set_trigger(
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
) -> None:
"""
Set triggering mode for detector
Args:
mapping_mode (MappingSource): Mapping mode for the detector
trigger_source (TriggerSource): Trigger source for the detector, pixel_advance_signal
ignore_gate (int): Ignore gate from TTL signal; defaults to 0
"""
mapping = int(mapping_mode)
trigger = trigger_source
self.parent.collect_mode.put(mapping)
self.parent.pixel_advance_mode.put(trigger)
self.parent.ignore_gate.put(ignore_gate)
class FalconcSAXS(PSIDetectorBase):
"""
@@ -303,6 +316,7 @@ class FalconcSAXS(PSIDetectorBase):
custom_prepare_cls = FalconSetup
# specify minimum readout time for detector
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
dxp = Cpt(EpicsDXPFalcon, "dxp1:")
@@ -330,30 +344,6 @@ class FalconcSAXS(PSIDetectorBase):
pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun")
nd_array_mode = Cpt(EpicsSignal, "NDArrayMode")
def set_trigger(
self, mapping_mode: MappingSource, trigger_source: TriggerSource, ignore_gate: int = 0
) -> None:
"""
Set triggering mode for detector
Args:
mapping_mode (MappingSource): Mapping mode for the detector
trigger_source (TriggerSource): Trigger source for the detector, pixel_advance_signal
ignore_gate (int): Ignore gate from TTL signal; defaults to 0
"""
mapping = int(mapping_mode)
trigger = trigger_source
self.collect_mode.put(mapping)
self.pixel_advance_mode.put(trigger)
self.ignore_gate.put(ignore_gate)
def stage(self) -> list[object]:
"""Stage"""
rtr = super().stage()
self.custom_prepare.arm_acquisition()
return rtr
if __name__ == "__main__":
falcon = FalconcSAXS(name="falcon", prefix="X12SA-SITORO:", sim_mode=True)

View File

@@ -74,6 +74,11 @@ class MCSSetup(CustomDetectorMixin):
]
self.mca_data = defaultdict(lambda: [])
def on_init(self) -> None:
"""Init sequence for the detector"""
self.initialize_detector()
self.initialize_detector_backend()
def initialize_detector(self) -> None:
"""Initialize detector"""
# External trigger for pixel advance
@@ -84,7 +89,7 @@ class MCSSetup(CustomDetectorMixin):
# Set number of channels to 5
self.parent.mux_output.set(5)
# Trigger Mode used for cSAXS
self.parent.set_trigger(TriggerSource.MODE3)
self.parent.input_mode.set(TriggerSource.MODE3)
# specify polarity of trigger signals
self.parent.input_polarity.set(0)
self.parent.output_polarity.set(1)
@@ -142,10 +147,15 @@ class MCSSetup(CustomDetectorMixin):
expire=self._stream_ttl,
)
def on_stage(self) -> None:
"""Stage detector"""
self.prepare_detector()
self.prepare_detector_backend()
def prepare_detector(self) -> None:
"""Prepare detector for scan"""
self.set_acquisition_params()
self.parent.set_trigger(TriggerSource.MODE3)
self.parent.input_mode.set(TriggerSource.MODE3)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for scan"""
@@ -175,7 +185,15 @@ class MCSSetup(CustomDetectorMixin):
self.counter = 0
self.parent.erase_start.set(1)
def finished(self) -> None:
def on_unstage(self) -> None:
"""Unstage detector"""
pass
def on_complete(self) -> None:
"""Complete detector"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
def finished(self, timeout: int = 5) -> None:
"""Check if acquisition is finished, if not successful, rais MCSTimeoutError"""
signal_conditions = [
(lambda: self.acquisition_done, True),
@@ -183,7 +201,7 @@ class MCSSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
@@ -195,12 +213,15 @@ class MCSSetup(CustomDetectorMixin):
f" {total_frames} frames arriving at the mcs card"
)
def on_stop(self) -> None:
"""Stop detector"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stop detector"""
self.parent.stop_all.set(1)
return super().stop_detector()
def stop_detector_backend(self) -> None:
"""Stop acquisition of data"""
self.acquisition_done = True
@@ -213,7 +234,7 @@ class SIS38XX(Device):
class MCScSAXS(PSIDetectorBase):
"""MCS card for cSAXS for implementation at cSAXS beamline"""
USER_ACCESS = ["describe", "_init_mcs"]
USER_ACCESS = []
SUB_PROGRESS = "progress"
SUB_VALUE = "value"
_default_sub = SUB_VALUE
@@ -222,6 +243,7 @@ class MCScSAXS(PSIDetectorBase):
custom_prepare_cls = MCSSetup
# specify minimum readout time for detector
MIN_READOUT = 0
TIMEOUT_FOR_SIGNALS = 5
# PV access to SISS38XX card
# Acquisition
@@ -272,11 +294,8 @@ class MCScSAXS(PSIDetectorBase):
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
mcs_config=None,
**kwargs,
):
@@ -289,25 +308,11 @@ class MCScSAXS(PSIDetectorBase):
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
device_manager=device_manager,
sim_mode=sim_mode,
**kwargs,
)
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger mode from TriggerSource"""
value = int(trigger_source)
self.input_mode.set(value)
def stage(self) -> list[object]:
"""stage the detector for upcoming acquisition"""
rtr = super().stage()
self.custom_prepare.arm_acquisition()
return rtr
# Automatically connect to test environmenr if directly invoked
if __name__ == "__main__":

View File

@@ -1,12 +1,12 @@
import enum
import json
import os
import threading
import time
import numpy as np
import requests
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib import bec_logger
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV, Staged
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
@@ -16,8 +16,6 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
logger = bec_logger.logger
MIN_READOUT = 3e-3
class PilatusError(Exception):
"""Base class for exceptions in this module."""
@@ -72,6 +70,15 @@ class PilatusSetup(CustomDetectorMixin):
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
self._lock = threading.RLock()
def on_init(self) -> None:
"""Initialize the detector"""
self.initialize_default_parameter()
self.initialize_detector()
def initialize_default_parameter(self) -> None:
"""Set default parameters for Eiger9M detector"""
self.update_readout_time()
@@ -90,7 +97,15 @@ class PilatusSetup(CustomDetectorMixin):
# Stops the detector
self.stop_detector()
# Sets the trigger source to GATING
self.parent.set_trigger(TriggerSource.EXT_ENABLE)
self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE)
def on_stage(self) -> None:
"""Stage the detector for scan"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(
done=False, successful=False, metadata={"input_path": self.parent.filepath_raw}
)
def prepare_detector(self) -> None:
"""
@@ -101,84 +116,7 @@ class PilatusSetup(CustomDetectorMixin):
"""
self.set_detector_threshold()
self.set_acquisition_params()
self.parent.set_trigger(TriggerSource.EXT_ENABLE)
def set_detector_threshold(self) -> None:
"""
Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance
Threshold might be in ev or keV
"""
# get current beam energy from device manageer
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
# Check if energies are eV or keV, assume keV as the default
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
# set energy on detector
setpoint = int(mokev * factor)
# set threshold on detector
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
# Set number of images and frames (frames is for internal burst of detector)
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
# Update the readout time of the detector
self.update_readout_time()
def create_directory(self, filepath: str) -> None:
"""Create directory if it does not exist"""
os.makedirs(filepath, exist_ok=True)
def stop_detector_backend(self) -> None:
"""Stop the file writer zmq service for pilatus_2"""
self.close_file_writer()
time.sleep(0.1)
self.stop_file_writer()
time.sleep(0.1)
def close_file_writer(self) -> None:
"""
Close the file writer for pilatus_2
Delete the data from x12sa-pd-2
"""
url = "http://x12sa-pd-2:8080/stream/pilatus_2"
try:
res = self.send_requests_delete(url=url)
if not res.ok:
res.raise_for_status()
except Exception as exc:
logger.info(f"Pilatus2 close threw Exception: {exc}")
def stop_file_writer(self) -> None:
"""
Stop the file writer for pilatus_2
Runs on xbl-daq-34
"""
url = "http://xbl-daq-34:8091/pilatus_2/stop"
res = self.send_requests_put(url=url)
if not res.ok:
res.raise_for_status()
self.parent.cam.trigger_mode.put(TriggerSource.EXT_ENABLE)
def prepare_data_backend(self) -> None:
"""
@@ -191,7 +129,9 @@ class PilatusSetup(CustomDetectorMixin):
self.stop_detector_backend()
self.parent.filepath = self.parent.filewriter.compile_full_filename("pilatus_2.h5")
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename("pilatus_2.h5")
).wait()
self.parent.cam.file_path.put("/dev/shm/zmq/")
self.parent.cam.file_name.put(
f"{self.parent.scaninfo.username}_2_{self.parent.scaninfo.scan_number:05d}"
@@ -275,6 +215,76 @@ class PilatusSetup(CustomDetectorMixin):
except Exception as exc:
logger.info(f"Pilatus2 wait threw Exception: {exc}")
def set_detector_threshold(self) -> None:
"""
Set correct detector threshold to 1/2 of current X-ray energy, allow 5% tolerance
Threshold might be in ev or keV
"""
# get current beam energy from device manageer
mokev = self.parent.device_manager.devices.mokev.obj.read()[
self.parent.device_manager.devices.mokev.name
]["value"]
factor = 1
# Check if energies are eV or keV, assume keV as the default
unit = getattr(self.parent.cam.threshold_energy, "units", None)
if unit is not None and unit == "eV":
factor = 1000
# set energy on detector
setpoint = int(mokev * factor)
# set threshold on detector
threshold = self.parent.cam.threshold_energy.read()[self.parent.cam.threshold_energy.name][
"value"
]
if not np.isclose(setpoint / 2, threshold, rtol=0.05):
self.parent.cam.threshold_energy.set(setpoint / 2)
def set_acquisition_params(self) -> None:
"""Set acquisition parameters for the detector"""
# Set number of images and frames (frames is for internal burst of detector)
self.parent.cam.num_images.put(
int(self.parent.scaninfo.num_points * self.parent.scaninfo.frames_per_trigger)
)
self.parent.cam.num_frames.put(1)
# Update the readout time of the detector
self.update_readout_time()
def create_directory(self, filepath: str) -> None:
"""Create directory if it does not exist"""
os.makedirs(filepath, exist_ok=True)
def close_file_writer(self) -> None:
"""
Close the file writer for pilatus_2
Delete the data from x12sa-pd-2
"""
url = "http://x12sa-pd-2:8080/stream/pilatus_2"
try:
res = self.send_requests_delete(url=url)
if not res.ok:
res.raise_for_status()
except Exception as exc:
logger.info(f"Pilatus2 close threw Exception: {exc}")
def stop_file_writer(self) -> None:
"""
Stop the file writer for pilatus_2
Runs on xbl-daq-34
"""
url = "http://xbl-daq-34:8091/pilatus_2/stop"
res = self.send_requests_put(url=url)
if not res.ok:
res.raise_for_status()
def send_requests_put(self, url: str, data: list = None, headers: dict = None) -> object:
"""
Send a put request to the given url
@@ -302,14 +312,8 @@ class PilatusSetup(CustomDetectorMixin):
"""
return requests.delete(url=url, headers=headers, timeout=5)
def pre_scan(self) -> None:
"""
Pre_scan function call
This function is called just before the scan core.
Here it is used to arm the detector for the acquisition
"""
def on_pre_scan(self) -> None:
"""Prepare detector for scan"""
self.arm_acquisition()
def arm_acquisition(self) -> None:
@@ -318,43 +322,18 @@ class PilatusSetup(CustomDetectorMixin):
# TODO is this sleep needed? to be tested with detector and for how long
time.sleep(0.5)
def publish_file_location(self, done: bool = False, successful: bool = None) -> None:
"""
Publish the filepath to REDIS and publish the event for the h5_converter
def on_unstage(self) -> None:
"""Unstage the detector"""
pass
We publish two events here:
- file_event: event for the filewriter
- public_file: event for any secondary service (e.g. radial integ code)
Args:
done (bool): True if scan is finished
successful (bool): True if scan was successful
"""
pipe = self.parent.connector.pipeline()
if successful is None:
msg = messages.FileMessage(
file_path=self.parent.filepath,
done=done,
metadata={"input_path": self.parent.filepath_raw},
)
else:
msg = messages.FileMessage(
file_path=self.parent.filepath,
done=done,
successful=successful,
metadata={"input_path": self.parent.filepath_raw},
)
self.parent.connector.set_and_publish(
MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name),
msg,
pipe=pipe,
def on_complete(self) -> None:
"""Complete the scan"""
self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
self.publish_file_location(
done=True, successful=True, metadata={"input_path": self.parent.filepath_raw}
)
self.parent.connector.set_and_publish(
MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe
)
pipe.execute()
def finished(self) -> None:
def finished(self, timeout: int = 5) -> None:
"""Check if acquisition is finished."""
# pylint: disable=protected-access
# TODO: at the moment this relies on device.mcs.obj._staged attribute
@@ -363,7 +342,7 @@ class PilatusSetup(CustomDetectorMixin):
]
if not self.wait_for_signals(
signal_conditions=signal_conditions,
timeout=self.parent.timeout,
timeout=timeout,
check_stopped=True,
all_signals=True,
):
@@ -375,16 +354,21 @@ class PilatusSetup(CustomDetectorMixin):
self.stop_detector()
self.stop_detector_backend()
def on_stop(self) -> None:
"""Stop detector"""
self.stop_detector()
self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stop detector"""
self.parent.cam.acquire.put(0)
def check_scan_id(self) -> None:
"""Checks if scan_id has changed and stops the scan if it has"""
old_scan_id = self.parent.scaninfo.scan_id
self.parent.scaninfo.load_scan_metadata()
if self.parent.scaninfo.scan_id != old_scan_id:
self.parent.stopped = True
def stop_detector_backend(self) -> None:
"""Stop the file writer zmq service for pilatus_2"""
self.close_file_writer()
time.sleep(0.1)
self.stop_file_writer()
time.sleep(0.1)
class PilatuscSAXS(PSIDetectorBase):
@@ -401,20 +385,16 @@ class PilatuscSAXS(PSIDetectorBase):
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = ["describe"]
USER_ACCESS = []
# specify Setup class
custom_prepare_cls = PilatusSetup
# specify minimum readout time for detector
MIN_READOUT = 3e-3
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
cam = ADCpt(SLSDetectorCam, "cam1:")
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source for the detector"""
value = trigger_source
self.cam.trigger_mode.put(value)
if __name__ == "__main__":
pilatus_2 = PilatuscSAXS(name="pilatus_2", prefix="X12SA-ES-PILATUS300K:", sim_mode=True)

View File

@@ -0,0 +1,166 @@
import enum
import os
import threading
import time
from typing import Any
import numpy as np
import openapi_client
from bec_lib.logger import bec_logger
from openapi_client.models.dataset_settings import DatasetSettings
from ophyd import ADComponent as ADCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, EpicsSignalWithRBV
from std_daq_client import StdDaqClient
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
logger = bec_logger.logger
class EigerError(Exception):
"""Base class for exceptions in this module."""
class EigerTimeoutError(EigerError):
"""Raised when the Eiger does not respond in time."""
class DetectorState(str, enum.Enum):
"""Detector states for Eiger9M detector"""
IDLE = "idle"
class ErrorState(int, enum.Enum):
""" Error State in the detector"""
ERROR_STATUS_WAITING = 504
class Eiger9MJungfrauJochSetup(CustomDetectorMixin):
"""Eiger setup class
Parent class: CustomDetectorMixin
"""
def __init__(self, *args, parent: Device = None, **kwargs) -> None:
super().__init__(*args, parent=parent, **kwargs)
# kwargs["host"] =
# kwargs["port"] =
configuration = openapi_client.Configuration(host="http://sls-jfjoch-001:8080")
api_client = openapi_client.ApiClient(configuration)
self.api = openapi_client.DefaultApi(api_client)
self.actually_init()
def actually_init(self):
status = self.get_daq_status()
if status != DetectorState.IDLE:
self.api.initialize_post()
self.actually_wait_till_done()
def actually_wait_till_done(self):
while True:
try:
done = self.api.wait_till_done_post()
except Exception as e: #TODO: be more specific
if e.status != ErrorState.ERROR_STATUS_WAITING:
print(e)
raise e
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return
time.sleep(0.1) #TODO
def get_daq_status(self):
return self.api.status_get().state
def on_stage(self):
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name != "jjf_test":
return
exp_time = self.parent.scaninfo.exp_time
readout = self.parent.scaninfo.readout_time
num_burst_cycle = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["num_points"]
cycles = self.parent.scaninfo.scan_msg.content["info"]["kwargs"]["cycles"]
total_points = num_burst_cycle * cycles
dataset_settings = DatasetSettings(
image_time_us = int(exp_time*1e6),
ntrigger=total_points,
beam_x_pxl = 0,
beam_y_pxl = 0,
detector_distance_mm = 100,
incident_energy_ke_v = 10.00,
)
self.api.start_post(dataset_settings=dataset_settings)
def on_unstage(self):
pass
def on_complete(self):
logger.info("Starting complete for {self.parent.name}")
scan_name = self.parent.scaninfo.scan_msg.content["info"].get("scan_name", "")
if scan_name != "jjf_test":
return
def wait_till_done_post():
try:
done = self.api.wait_till_done_post(timeout=1)
except Exception as e: #TODO: be more specific
return False
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return True
return True
status = self.wait_with_status(signal_conditions=[(wait_till_done_post, True)], check_stopped=True,timeout=10)
return status
def on_stop(self):
self.api.cancel_post()
class TriggerSource(int, enum.Enum):
"""Trigger signals for Eiger9M detector"""
AUTO = 0
TRIGGER = 1
GATING = 2
BURST_TRIGGER = 3
class Eiger9McSAXS(PSIDetectorBase):
"""
Eiger9M detector for CSAXS
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (FalconSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
Various EpicsPVs for controlling the detector
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = []
# specify Setup class
custom_prepare_cls = Eiger9MJungfrauJochSetup
# specify minimum readout time for detector and timeout for checks after unstage
MIN_READOUT = 20e-6
TIMEOUT_FOR_SIGNALS = 5
# specify class attributes
if __name__ == "__main__":
eiger = Eiger9McSAXS(name="eiger", prefix="X12SA-ES-EIGER9M:")
eiger.custom_prepare.client.init()

View File

@@ -0,0 +1,84 @@
import string
from time import sleep
import openapi_client
ERROR_STATUS_WAITING = 504
STATUS_IDLE = "Idle"
# allow / to enable creation of subfolders
ALLOWED_CHARS = set(
string.ascii_letters + string.digits + "_-+/"
)
def character_cleanup(s, default="_", allowed=ALLOWED_CHARS):
return "".join(i if i in allowed else default for i in s)
class JFJClient:
def __init__(self, host):
configuration = openapi_client.Configuration(host=host)
api_client = openapi_client.ApiClient(configuration)
self.api = openapi_client.DefaultApi(api_client)
self.actually_init()
def actually_init(self):
status = self.get_daq_status()
if status != STATUS_IDLE:
self.api.initialize_post()
self.actually_wait_till_done()
status = self.get_daq_status()
if status != STATUS_IDLE:
raise RuntimeError(f"status is not {STATUS_IDLE} but: {status}")
def actually_wait_till_done(self):
while True:
try:
done = self.api.wait_till_done_post()
except Exception as e: #TODO: be more specific
if e.status != ERROR_STATUS_WAITING:
print(e)
raise e
else:
#TODO: use number_of_triggers_left for progress...
if done is None: # seems we get None instead of: status == 200
return
sleep(0.1) #TODO
def get_daq_status(self):
return self.api.status_get().state
def get_detector_status(self):
return self.api.detector_status_get()#.state #TODO
def get_detectors(self):
return self.api.config_select_detector_get()
def get_detector_config(self):
return self.api.config_detector_get()
def acquire(self, file_prefix, **kwargs):
status = self.get_daq_status()
if status != STATUS_IDLE:
raise RuntimeError(f"status is not {STATUS_IDLE} but: {status}")
file_prefix = character_cleanup(file_prefix)
dataset_settings = openapi_client.DatasetSettings(file_prefix=file_prefix, **kwargs)
self.api.start_post(dataset_settings=dataset_settings)
self.actually_wait_till_done()
def take_pedestal(self):
self.api.pedestal_post()
self.actually_wait_till_done()

View File

@@ -1,12 +1,15 @@
import functools
import socket
import threading
import time
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.socket import raise_if_disconnected
import numpy as np
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal, SignalRO
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError, ReadOnlyError
from ophyd_devices.utils.controller import Controller, threadlocked
from ophyd_devices.utils.socket import SocketIO, SocketSignal, raise_if_disconnected
from prettytable import PrettyTable
from typeguard import typechecked
def channel_checked(fcn):
@@ -14,81 +17,32 @@ def channel_checked(fcn):
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
# pylint: disable=protected-access
self._check_channel(args[0])
return fcn(self, *args, **kwargs)
return wrapper
class SocketIO:
"""SocketIO helper class for TCP IP connections"""
def __init__(self, sock=None):
self.is_open = False
if sock is None:
self.open()
else:
self.sock = sock
def connect(self, host, port):
print(f"connecting to {host} port {port}")
# self.sock.create_connection((host, port))
self.sock.connect((host, port))
def _put(self, msg_bytes):
return self.sock.send(msg_bytes)
def _recv(self, buffer_length=1024):
return self.sock.recv(buffer_length)
def _initialize_socket(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(5)
def put(self, msg):
return self._put(msg)
def receive(self, buffer_length=1024):
return self._recv(buffer_length=buffer_length)
def open(self):
self._initialize_socket()
self.is_open = True
def close(self):
self.sock.close()
self.sock = None
self.is_open = False
class NpointError(Exception):
"""
Base class for Npoint errors.
"""
class NPointController:
_controller_instance = None
class NPointController(Controller):
"""
Controller for nPoint piezo stages. This class inherits from the Controller class
and provides a singleton interface to the nPoint controller.
"""
NUM_CHANNELS = 3
_axes_per_controller = 3
_read_single_loc_bit = "A0"
_write_single_loc_bit = "A2"
_trailing_bit = "55"
_range_offset = "78"
_channel_base = ["11", "83"]
def __init__(
self, comm_socket: SocketIO, server_ip: str = "129.129.99.87", server_port: int = 23
) -> None:
self._lock = threading.RLock()
super().__init__()
self._server_and_port_name = (server_ip, server_port)
self.socket = comm_socket
self.connected = False
def __new__(cls, *args, **kwargs):
if not NPointController._controller_instance:
NPointController._controller_instance = object.__new__(cls)
return NPointController._controller_instance
@classmethod
def create(cls):
return cls(SocketIO())
def show_all(self) -> None:
"""Display current status of all channels
@@ -98,54 +52,13 @@ class NPointController:
if not self.connected:
print("npoint controller is currently disabled.")
return
print(f"Connected to controller at {self._server_and_port_name}")
print(f"Connected to controller at {self._socket_host}:{self._socket_port}")
t = PrettyTable()
t.field_names = ["Channel", "Range", "Position", "Target"]
for ii in range(self.NUM_CHANNELS):
t.add_row(
[ii, self._get_range(ii), self._get_current_pos(ii), self._get_target_pos(ii)]
)
for ii in range(self._axes_per_controller):
t.add_row([ii, self._get_range(ii), self.get_current_pos(ii), self.get_target_pos(ii)])
print(t)
@threadlocked
def on(self) -> None:
"""Enable the NPoint controller and open a new socket.
Raises:
TimeoutError: Raised if the socket connection raises a timeout.
Returns:
None
"""
if self.connected:
print("You are already connected to the NPoint controller.")
return
if not self.socket.is_open:
self.socket.open()
try:
self.socket.connect(self._server_and_port_name[0], self._server_and_port_name[1])
except socket.timeout:
raise TimeoutError(
f"Failed to connect to the specified server and port {self._server_and_port_name}."
)
except OSError:
print("ERROR while connecting. Let's try again")
self.socket.close()
time.sleep(0.5)
self.socket.open()
self.socket.connect(self._server_and_port_name[0], self._server_and_port_name[1])
self.connected = True
@threadlocked
def off(self) -> None:
"""Disable the controller and close the socket.
Returns:
None
"""
self.socket.close()
self.connected = False
@channel_checked
def _get_range(self, channel: int) -> int:
"""Get the range of the specified channel axis.
@@ -174,7 +87,7 @@ class NPointController:
return device_range
@channel_checked
def _get_current_pos(self, channel: int) -> float:
def get_current_pos(self, channel: int) -> float:
# for first channel: 0x11 83 13 34
addr = self._channel_base.copy()
addr.extend([f"{19 + 16 * channel:x}", "34"])
@@ -187,7 +100,7 @@ class NPointController:
return pos
@channel_checked
def _set_target_pos(self, channel: int, pos: float) -> None:
def set_target_pos(self, channel: int, pos: float) -> None:
# for first channel: 0x11 83 12 18 00 00 00 00
addr = self._channel_base.copy()
addr.extend([f"{18 + channel * 16:x}", "18"])
@@ -199,7 +112,7 @@ class NPointController:
self._put(send_buffer)
@channel_checked
def _get_target_pos(self, channel: int) -> float:
def get_target_pos(self, channel: int) -> float:
# for first channel: 0x11 83 12 18
addr = self._channel_base.copy()
addr.extend([f"{18 + channel * 16:x}", "18"])
@@ -214,17 +127,17 @@ class NPointController:
def _set_servo(self, channel: int, enable: bool) -> None:
print("Not tested")
return
# for first channel: 0x11 83 10 84 00 00 00 00
addr = self._channel_base.copy()
addr.extend([f"{16 + channel * 16:x}", "84"])
# # for first channel: 0x11 83 10 84 00 00 00 00
# addr = self._channel_base.copy()
# addr.extend([f"{16 + channel * 16:x}", "84"])
if enable:
data = ["00"] * 3 + ["01"]
else:
data = ["00"] * 4
send_buffer = self.__write_single_location_buffer(addr, data)
# if enable:
# data = ["00"] * 3 + ["01"]
# else:
# data = ["00"] * 4
# send_buffer = self.__write_single_location_buffer(addr, data)
self._put(send_buffer)
# self._put(send_buffer)
@channel_checked
def _get_servo(self, channel: int) -> int:
@@ -250,7 +163,7 @@ class NPointController:
"""
buffer = b"".join([bytes.fromhex(m) for m in buffer])
self.socket.put(buffer)
self.sock.put(buffer)
@threadlocked
def _put_and_receive(self, msg_hex_list: list) -> list:
@@ -264,8 +177,8 @@ class NPointController:
"""
buffer = b"".join([bytes.fromhex(m) for m in msg_hex_list])
self.socket.put(buffer)
recv_msg = self.socket.receive()
self.sock.put(buffer)
recv_msg = self.sock.receive()
recv_hex_list = [hex(m) for m in recv_msg]
self._verify_received_msg(msg_hex_list, recv_hex_list)
return recv_hex_list
@@ -293,9 +206,9 @@ class NPointController:
raise RuntimeError("Connection failure. Please restart the controller.")
def _check_channel(self, channel: int) -> None:
if channel >= self.NUM_CHANNELS:
if channel >= self._axes_per_controller:
raise ValueError(
f"Channel {channel+1} exceeds the available number of channels ({self.NUM_CHANNELS})"
f"Channel {channel+1} exceeds the available number of channels ({self._axes_per_controller})"
)
@staticmethod
@@ -391,155 +304,285 @@ class NPointController:
self.off()
class NPointAxis:
def __init__(self, controller: NPointController, channel: int, name: str) -> None:
super().__init__()
self._axis_range = 100
self.controller = controller
self.channel = channel
self.name = name
self.controller._check_channel(channel)
self._settling_time = 0.1
class NpointSignalBase(SocketSignal):
"""
Base class for nPoint signals.
"""
if self.settling_time == 0:
self.settling_time = 0.1
print(f"Setting the npoint settling time to {self.settling_time:.2f} s.")
print(
"You can set the settling time depending on the stage tuning\nusing the settling_time property."
)
print("This is the waiting time before the counting is done.")
def __init__(self, signal_name, **kwargs):
self.signal_name = signal_name
super().__init__(**kwargs)
self.controller: NPointController = self.parent.controller
self.sock = self.parent.controller.sock
def show_all(self) -> None:
self.controller.show_all()
@raise_if_disconnected
def get(self) -> float:
"""Get current position for this channel.
class NpointSignalRO(NpointSignalBase):
"""
Base class for read-only signals.
"""
Raises:
RuntimeError: Raised if channel is not connected.
def __init__(self, signal_name, **kwargs):
super().__init__(signal_name, **kwargs)
self._metadata["write_access"] = False
Returns:
float: position
@threadlocked
def _socket_set(self, val):
raise ReadOnlyError("Read-only signals cannot be set")
class NpointReadbackSignal(NpointSignalRO):
"""
Signal to read the current position of an nPoint piezo stage.
"""
@threadlocked
def _socket_get(self):
return self.controller.get_current_pos(self.parent.axis_Id_numeric) * self.parent.sign
class NpointSetpointSignal(NpointSignalBase):
"""
Signal to set the target position of an nPoint piezo stage.
"""
setpoint = 0
@threadlocked
def _socket_get(self):
return self.controller.get_target_pos(self.parent.axis_Id_numeric) * self.parent.sign
@threadlocked
def _socket_set(self, val):
target_val = val * self.parent.sign
self.setpoint = target_val
return self.controller.set_target_pos(
self.parent.axis_Id_numeric, target_val * self.parent.sign
)
class NpointMotorIsMoving(SignalRO):
"""
Signal to indicate whether the motor is currently moving or not.
"""
def set_motor_is_moving(self, value: int) -> None:
"""
return self.controller._get_current_pos(self.channel)
@raise_if_disconnected
def get_target_pos(self) -> float:
"""Get target position for this channel.
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
float: position
"""
return self.controller._get_target_pos(self.channel)
@raise_if_disconnected
@typechecked
def set(self, pos: float) -> None:
"""Set a new target position and wait until settled (settling_time).
Set the motor_is_moving signal to the specified value.
Args:
pos (float): New target position
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
None
value (int): 1 if the motor is moving, 0 otherwise.
"""
self.controller._set_target_pos(self.channel, pos)
time.sleep(self.settling_time)
self._readback = value
class NPointAxis(Device, PositionerBase):
"""
NPointAxis class, which inherits from Device and PositionerBase. This class
represents an axis of an nPoint piezo stage and provides the necessary
functionality to move the axis and read its current position.
"""
USER_ACCESS = ["controller"]
readback = Cpt(NpointReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(NpointSetpointSignal, signal_name="setpoint")
motor_is_moving = Cpt(NpointMotorIsMoving, value=0, kind="normal")
settling_time = Cpt(Signal, value=0.1, kind="config")
high_limit_travel = Cpt(Signal, value=0, kind="omitted")
low_limit_travel = Cpt(Signal, value=0, kind="omitted")
SUB_READBACK = "readback"
SUB_CONNECTION_CHANGE = "connection_change"
_default_sub = SUB_READBACK
def __init__(
self,
axis_Id,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
host="mpc2680.psi.ch",
port=8085,
limits=None,
sign=1,
socket_cls=SocketIO,
tolerance: float = 0.05,
**kwargs,
):
self.controller = NPointController(
socket_cls=socket_cls, socket_host=host, socket_port=port
)
self.axis_Id = axis_Id
self.sign = sign
self.controller.set_axis(axis=self, axis_nr=self.axis_Id_numeric)
self.tolerance = tolerance
super().__init__(
prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.readback.name = self.name
self.controller.subscribe(
self._update_connection_state, event_type=self.SUB_CONNECTION_CHANGE
)
self._update_connection_state()
if limits is not None:
assert len(limits) == 2
self.low_limit_travel.put(limits[0])
self.high_limit_travel.put(limits[1])
@property
def connected(self) -> bool:
return self.controller.connected
def limits(self):
return (self.low_limit_travel.get(), self.high_limit_travel.get())
@property
def low_limit(self):
return self.limits[0]
@property
def high_limit(self):
return self.limits[1]
def check_value(self, pos):
"""Check that the position is within the soft limits"""
low_limit, high_limit = self.limits
if low_limit < high_limit and not (low_limit <= pos <= high_limit):
raise LimitError(f"position={pos} not within limits {self.limits}")
def _update_connection_state(self, **kwargs):
for walk in self.walk_signals():
walk.item._metadata["connected"] = self.controller.connected
@raise_if_disconnected
def servo(self) -> int:
"""Get servo status
def move(self, position, wait=True, **kwargs):
"""Move to a specified position, optionally waiting for motion to
complete.
Raises:
RuntimeError: Raised if channel is not connected.
Parameters
----------
position
Position to move to
moved_cb : callable
Call this callback when movement has finished. This callback must
accept one keyword argument: 'obj' which will be set to this
positioner instance.
timeout : float, optional
Maximum time to wait for the motion. If None, the default timeout
for this positioner is used.
Returns:
int: Servo status
Returns
-------
status : MoveStatus
Raises
------
TimeoutError
When motion takes longer than `timeout`
ValueError
On invalid positions
RuntimeError
If motion fails other than timing out
"""
return self.controller._get_servo(self.channel)
self._started_moving = False
timeout = kwargs.pop("timeout", 10)
status = super().move(position, timeout=timeout, **kwargs)
self.user_setpoint.put(position, wait=False)
@servo.setter
@raise_if_disconnected
@typechecked
def servo(self, val: bool) -> None:
"""Set servo status
def move_and_finish():
self.motor_is_moving.set_motor_is_moving(1)
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
time.sleep(self.settling_time.get())
self.motor_is_moving.set_motor_is_moving(0)
val = self.readback.read()
self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time())
success = np.isclose(val[self.name]["value"], position, atol=self.tolerance)
self._done_moving(success=success)
threading.Thread(target=move_and_finish, daemon=True).start()
try:
if wait:
status_wait(status)
except KeyboardInterrupt:
self.stop()
raise
return status
@property
def axis_Id(self):
"""
Return the axis_Id_alpha.
"""
return self._axis_Id_alpha
@axis_Id.setter
def axis_Id(self, val: str):
"""
Set the axis_Id_alpha and axis_Id_numeric based on the alpha value.
Args:
val (bool): Servo status
Raises:
RuntimeError: Raised if channel is not connected.
Returns:
None
val (str): Single-character axis identifier.
"""
self.controller._set_servo(self.channel, val)
@property
def settling_time(self) -> float:
return self._settling_time
@settling_time.setter
@typechecked
def settling_time(self, val: float) -> None:
self._settling_time = val
print(f"Setting the npoint settling time to {val:.2f} s.")
class NPointEpics(NPointAxis):
def __init__(self, controller: NPointController, channel: int, name: str) -> None:
super().__init__(controller, channel, name)
self.low_limit = -50
self.high_limit = 50
self._prefix = name
def get_pv(self) -> str:
return self.name
def get_position(self, readback=True) -> float:
if readback:
return self.get()
if isinstance(val, str):
if len(val) != 1:
raise ValueError("Only single-character axis_Ids are supported.")
self._axis_Id_alpha = val
self._axis_Id_numeric = ord(val.lower()) - 97
else:
return self.get_target_pos()
raise TypeError(f"Expected value of type str but received {type(val)}")
def within_limits(self, pos: float) -> bool:
return pos > self.low_limit and pos < self.high_limit
@property
def axis_Id_numeric(self):
"""
Return the numeric value of the axis_Id.
"""
return self._axis_Id_numeric
def move(self, position: float, wait=True) -> None:
self.set(position)
@axis_Id_numeric.setter
def axis_Id_numeric(self, val: int):
"""
Set the axis_Id_numeric and axis_Id_alpha based on the numeric value.
Args:
val (int): Numeric axis identifier.
"""
if isinstance(val, int):
if val > 26:
raise ValueError("Numeric value exceeds supported range.")
self._axis_Id_alpha = val
self._axis_Id_numeric = (chr(val + 97)).capitalize()
else:
raise TypeError(f"Expected value of type int but received {type(val)}")
@property
def egu(self):
"""The engineering units (EGU) for positions"""
return "um"
def stage(self) -> list[object]:
return super().stage()
def unstage(self) -> list[object]:
return super().unstage()
if __name__ == "__main__":
## EXAMPLES ##
#
# Create controller and socket instance explicitly:
# controller = NPointController(SocketIO())
# npointx = NPointAxis(controller, 0, "nx")
# npointy = NPointAxis(controller, 1, "ny")
# Create controller instance explicitly
# controller = NPointController.create()
# npointx = NPointAxis(controller, 0, "nx")
# npointy = NPointAxis(controller, 1, "ny")
# Single-line axis:
# npointx = NPointAxis(NPointController.create(), 0, "nx")
#
# EPICS wrapper:
# nx = NPointEpics(NPointController.create(), 0, "nx")
controller = NPointController.create()
npointx = NPointAxis(NPointController.create(), 0, "nx")
npointy = NPointAxis(NPointController.create(), 0, "ny")
npx = NPointAxis(axis_Id="A", name="npx", host="nPoint000003.psi.ch", port=23)
npy = NPointAxis(axis_Id="B", name="npy", host="nPoint000003.psi.ch", port=23)
npx.controller.on()
print("socket is open, axis is ready!")
npx.move(10)
print(npx.read())
npx.controller.off()

View File

@@ -0,0 +1,9 @@
def patch_dual_pvs(device):
device.wait_for_connection(all_signals=True)
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv

View File

@@ -1,4 +1,5 @@
from .flomni_fermat_scan import FlomniFermatScan
from .jungfrau_joch_scan import JungfrauJochTest
from .LamNIFermatScan import LamNIFermatScan, LamNIMoveToScanCenter
from .owis_grid import OwisGrid
from .sgalil_grid import SgalilGrid

View File

@@ -0,0 +1,70 @@
import time
from bec_lib import bec_logger
from bec_lib.device import DeviceBase
from bec_lib.endpoints import MessageEndpoints
from bec_server.scan_server.scans import AsyncFlyScanBase, ScanAbortion
logger = bec_logger.logger
class JungfrauJochTest(AsyncFlyScanBase):
"""Owis-based grid scan."""
scan_name = "jjf_test"
#scan_report_hint = "device_progress"
required_kwargs = ["points", "exp_time", "readout_time"]
arg_input = {}
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
gui_config = {
"Acquisition Parameters": ["num_points", "cycles"],
"Exposure Parameters" : ["exp_time", "readout_time"]
}
def __init__(
self,
num_points:int,
exp_time: float,
readout_time: float,
cycles:int =1,
**kwargs,
):
"""
JungfrauJoch Test scan.
Args:
device (DeviceBase) : The device to be triggered, currently only for delaygenerator csaxs
num_points (int) : Number of points per burst
exp_time (float) : exposure time.
readout_time (float): readout time of detector
cycles (int) : number of cycles, default is 1
Example:
scans.jjf_test(points = 100, exp_time= 1e-3, readout_time=1e-3, cycles = 2)
"""
if readout_time <=0:
raise ScanAbortion(f"Readout time must be larger than 0, provided value {readout_time}")
super().__init__(exp_time=exp_time, readout_time=readout_time,**kwargs)
self.device = "ddg"
self.num_points = num_points
self.cycles = cycles
self.primary_readout_cycle = 0.2
def scan_core(self):
logger.info(f"Starting with Scan Core")
total_exposure = self.num_points * (self.exp_time + self.readout_time)
for i in range(self.cycles):
logger.info(f"Beginning cycle {i} of {self.cycles}")
yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
yield from self.stubs.read_and_wait(
group="primary",
wait_group="readout_primary",
point_id=self.point_id,
)
yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=total_exposure)
self.point_id +=1
logger.info(f"Finished cycle {i} of {self.cycles}")
logger.info(f"Finished scan")
self.num_pos = self.point_id + 1

View File

@@ -7,16 +7,6 @@ from ophyd_devices.interfaces.base_classes.psi_delay_generator_base import Trigg
from csaxs_bec.devices.epics.delay_generator_csaxs import DDGSetup
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
@pytest.fixture(scope="function")
def mock_DDGSetup():
mock_ddg = mock.MagicMock()

View File

@@ -10,23 +10,13 @@ from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV
from csaxs_bec.devices.epics.eiger9m_csaxs import Eiger9McSAXS
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def mock_det():
name = "eiger"
prefix = "X12SA-ES-EIGER9M:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -39,10 +29,9 @@ def mock_det():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(Eiger9McSAXS, "_init"):
det = Eiger9McSAXS(
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
)
det = Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
@@ -50,7 +39,6 @@ def test_init():
"""Test the _init function:"""
name = "eiger"
prefix = "X12SA-ES-EIGER9M:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -72,7 +60,7 @@ def test_init():
"csaxs_bec.devices.epics.eiger9m_csaxs.Eiger9MSetup.initialize_detector_backend"
) as mock_init_backend,
):
Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
Eiger9McSAXS(name=name, prefix=prefix, device_manager=dm)
mock_default.assert_called_once()
mock_init_det.assert_called_once()
mock_init_backend.assert_called_once()
@@ -236,8 +224,8 @@ def test_stage(
mock_det.cam.beam_energy.put(scaninfo["mokev"])
mock_det.stopped = stopped
mock_det.cam.detector_state._read_pv.mock_data = detector_state
with mock.patch.object(mock_det.custom_prepare, "prepare_detector_backend") as mock_prep_fw:
mock_det.filepath = scaninfo["filepath"]
with mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_prep_fw:
mock_det.filepath.set(scaninfo["filepath"]).wait()
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
@@ -251,7 +239,7 @@ def test_stage(
)
assert mock_det.cam.num_frames.get() == 1
mock_publish_file_location.assert_called_with(done=False)
mock_publish_file_location.assert_called_with(done=False, successful=False)
assert mock_det.cam.acquire.get() == 1
@@ -326,7 +314,7 @@ def test_prepare_detector_backend(mock_det, scaninfo, daq_status, expected_excep
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
def test_complete(mock_det, stopped, expected_exception):
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
@@ -335,10 +323,10 @@ def test_unstage(mock_det, stopped, expected_exception):
):
mock_det.stopped = stopped
if expected_exception:
mock_det.unstage()
mock_det.complete()
assert mock_det.stopped is True
else:
mock_det.unstage()
mock_det.complete()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is False
@@ -357,12 +345,11 @@ def test_stop_detector_backend(mock_det):
[
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
({"filepath": "test.h5", "successful": None, "done": True, "scan_id": "123"}),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath = scaninfo["filepath"]
mock_det.filepath.set(scaninfo["filepath"]).wait()
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
)

View File

@@ -11,23 +11,13 @@ from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV
from csaxs_bec.devices.epics.falcon_csaxs import FalconcSAXS, FalconTimeoutError
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def mock_det():
name = "falcon"
prefix = "X12SA-SITORO:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -42,10 +32,9 @@ def mock_det():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(FalconcSAXS, "_init"):
det = FalconcSAXS(
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
)
det = FalconcSAXS(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
@@ -139,9 +128,9 @@ def test_stage(mock_det, scaninfo):
This includes testing _prep_det
"""
with (
mock.patch.object(mock_det, "set_trigger") as mock_set_trigger,
mock.patch.object(mock_det.custom_prepare, "set_trigger") as mock_set_trigger,
mock.patch.object(
mock_det.custom_prepare, "prepare_detector_backend"
mock_det.custom_prepare, "prepare_data_backend"
) as mock_prep_data_backend,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
@@ -158,7 +147,7 @@ def test_stage(mock_det, scaninfo):
scaninfo["num_points"] * scaninfo["frames_per_trigger"]
)
mock_prep_data_backend.assert_called_once()
mock_publish_file_location.assert_called_once_with(done=False)
mock_publish_file_location.assert_called_once_with(done=False, successful=False)
mock_arm_acquisition.assert_called_once()
@@ -204,12 +193,11 @@ def test_prepare_data_backend(mock_det, scaninfo):
[
({"filepath": "test.h5", "successful": True, "done": False, "scan_id": "123"}),
({"filepath": "test.h5", "successful": False, "done": True, "scan_id": "123"}),
({"filepath": "test.h5", "successful": None, "done": True, "scan_id": "123"}),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath = scaninfo["filepath"]
mock_det.filepath.set(scaninfo["filepath"]).wait()
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
)
@@ -254,24 +242,18 @@ def test_trigger(mock_det):
mock_on_trigger.assert_called_once()
@pytest.mark.parametrize("stopped, expected_abort", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_abort):
def test_complete(mock_det):
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_abort:
mock_det.unstage()
assert mock_det.stopped is stopped
assert mock_publish_file_location.call_count == 0
else:
mock_det.unstage()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is stopped
mock_det.stopped = False
mock_det.complete()
assert mock_finished.call_count == 1
call = mock.call(done=True, successful=True)
assert mock_publish_file_location.call_args == call
def test_stop(mock_det):

View File

@@ -16,23 +16,13 @@ from csaxs_bec.devices.epics.mcs_csaxs import (
ReadoutMode,
TriggerSource,
)
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def mock_det():
name = "mcs"
prefix = "X12SA-MCS:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -47,8 +37,9 @@ def mock_det():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(MCScSAXS, "_init"):
det = MCScSAXS(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
det = MCScSAXS(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(det)
det.TIMEOUT_FOR_SIGNALS = 0.1
yield det
@@ -56,7 +47,6 @@ def test_init():
"""Test the _init function:"""
name = "eiger"
prefix = "X12SA-ES-EIGER9M:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -68,9 +58,6 @@ def test_init():
with mock.patch.object(ophyd, "cl") as mock_cl:
mock_cl.get_pv = MockPV
with (
mock.patch(
"csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_default_parameter"
) as mock_default,
mock.patch(
"csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector"
) as mock_init_det,
@@ -78,8 +65,7 @@ def test_init():
"csaxs_bec.devices.epics.mcs_csaxs.MCSSetup.initialize_detector_backend"
) as mock_init_backend,
):
MCScSAXS(name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode)
mock_default.assert_called_once()
MCScSAXS(name=name, prefix=prefix, device_manager=dm)
mock_init_det.assert_called_once()
mock_init_backend.assert_called_once()
@@ -275,23 +261,10 @@ def test_prepare_detector_backend(mock_det):
assert mock_det.read_mode.get() == ReadoutMode.EVENT
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_exception:
mock_det.unstage()
assert mock_det.stopped is True
else:
mock_det.unstage()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is False
def test_complete(mock_det):
with (mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,):
mock_det.complete()
assert mock_finished.call_count == 1
def test_stop_detector_backend(mock_det):

View File

@@ -1,49 +1,43 @@
import copy
from unittest import mock
import pytest
from csaxs_bec.devices.npoint import NPointAxis, NPointController
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
class SocketMock:
def __init__(self, sock=None):
self.buffer_put = ""
self.buffer_recv = ""
self.is_open = False
if sock is None:
self.open()
else:
self.sock = sock
def connect(self, host, port):
print(f"connecting to {host} port {port}")
# self.sock.create_connection((host, port))
# self.sock.connect((host, port))
@pytest.fixture
def controller():
"""
Fixture to create a NPointController object.
"""
with mock.patch("ophyd_devices.utils.socket.SocketIO") as socket_cls:
controller = NPointController(
socket_cls=socket_cls, socket_host="localhost", socket_port=1234
)
controller.on()
controller.sock.reset_mock()
yield controller
controller.off()
def _put(self, msg_bytes):
self.buffer_put = msg_bytes
print(self.buffer_put)
def _recv(self, buffer_length=1024):
print(self.buffer_recv)
return self.buffer_recv
def _initialize_socket(self):
pass
def put(self, msg):
return self._put(msg)
def receive(self, buffer_length=1024):
return self._recv(buffer_length=buffer_length)
def open(self):
self._initialize_socket()
self.is_open = True
def close(self):
self.sock = None
self.is_open = False
@pytest.fixture
def npointx():
"""
Fixture to create a NPointAxis object.
"""
controller = mock.MagicMock()
npointx = NPointAxis(
axis_Id="A", name="npointx", host="localhost", port=1234, socket_cls=controller
)
npointx.controller.on()
npointx.controller.sock.reset_mock()
npointx.controller.sock.receive.reset_mock()
yield npointx
npointx.controller.off()
@pytest.mark.parametrize(
@@ -54,12 +48,29 @@ class SocketMock:
(-5, b"\xa2\x18\x12\x83\x1133\xff\xffU"),
],
)
def test_axis_put(pos, msg):
controller = NPointController(SocketMock())
npointx = NPointAxis(controller, 0, "nx")
controller.on()
npointx.set(pos)
assert npointx.controller.socket.buffer_put == msg
def test_axis_put(npointx, pos, msg):
"""
Test that the set target position sends the correct message to the controller.
"""
npointx.controller.set_target_pos(npointx.axis_Id_numeric, pos)
npointx.controller.sock.put.assert_called_with(msg)
def test_npoint_axis_move(npointx):
"""
Test that the move method sends the correct messages to the controller.
It should send the set target position, followed by 2 get current position messages.
"""
npointx.controller.sock.receive.side_effect = [
b"\xa0\x34\x13\x83\x11\x00\x00\x00\x00U", # pos 0
b"\xa0\x34\x13\x83\x11\xcd\xcc\x00\x00U", # pos 5
]
npointx.move(5)
assert (
mock.call(b"\xa2\x18\x12\x83\x11\xcd\xcc\x00\x00U")
in npointx.controller.sock.put.mock_calls
)
assert len(npointx.controller.sock.put.mock_calls) == 3
@pytest.mark.parametrize(
@@ -70,13 +81,12 @@ def test_axis_put(pos, msg):
(-5, b"\xa04\x13\x83\x11U", b"\xa0\x34\x13\x83\x1133\xff\xffU"),
],
)
def test_axis_get_out(pos, msg_in, msg_out):
controller = NPointController(SocketMock())
npointx = NPointAxis(controller, 0, "nx")
controller.on()
npointx.controller.socket.buffer_recv = msg_out
assert pytest.approx(npointx.get(), rel=0.01) == pos
# assert controller.socket.buffer_put == msg_in
def test_axis_get_out(npointx, pos, msg_in, msg_out):
"""
Test that the readback value is correctly read from the controller.
"""
npointx.controller.sock.receive.side_effect = [msg_out]
assert pytest.approx(npointx.readback.get(), rel=0.01) == pos
@pytest.mark.parametrize(
@@ -87,31 +97,40 @@ def test_axis_get_out(pos, msg_in, msg_out):
(2, b"\xa043\x83\x11U", b"\xa0\x34\x13\x83\x1133\xff\xffU"),
],
)
def test_axis_get_in(axis, msg_in, msg_out):
controller = NPointController(SocketMock())
npointx = NPointAxis(controller, 0, "nx")
controller.on()
controller.socket.buffer_recv = msg_out
controller._get_current_pos(axis)
assert controller.socket.buffer_put == msg_in
def test_axis_get_in(npointx, axis, msg_in, msg_out):
"""
Test that the readback value is correctly read from the controller by directly calling the
controller's method.
"""
npointx.controller.sock.receive.side_effect = [msg_out]
npointx.controller.get_current_pos(axis)
npointx.controller.sock.put.assert_called_once_with(msg_in)
def test_axis_out_of_range():
controller = NPointController(SocketMock())
def test_axis_out_of_range(controller):
"""
Test that an error is raised when trying to create an NPointAxis object with an invalid axis ID.
"""
with pytest.raises(ValueError):
npointx = NPointAxis(controller, 3, "nx")
npointx = NPointAxis(
axis_Id="G", name="npointx", host="localhost", port=1234, socket_cls=mock.MagicMock()
)
def test_get_axis_out_of_range():
controller = NPointController(SocketMock())
def test_get_axis_out_of_range(controller):
"""
Test that an error is raised when trying to get the current position of an invalid axis.
"""
with pytest.raises(ValueError):
controller._get_current_pos(3)
controller.get_current_pos(3)
def test_set_axis_out_of_range():
controller = NPointController(SocketMock())
def test_set_axis_out_of_range(controller):
"""
Test that an error is raised when trying to set the target position of an invalid axis.
"""
with pytest.raises(ValueError):
controller._set_target_pos(3, 5)
controller.set_target_pos(3, 5)
@pytest.mark.parametrize(
@@ -123,6 +142,9 @@ def test_set_axis_out_of_range():
],
)
def test_hex_list_to_int(in_buffer, byteorder, signed, val):
"""
Test that the hex list is correctly converted to an integer
"""
assert (
NPointController._hex_list_to_int(
copy.deepcopy(in_buffer), byteorder=byteorder, signed=signed
@@ -139,10 +161,12 @@ def test_hex_list_to_int(in_buffer, byteorder, signed, val):
(2, b"\xa0x0\x83\x11U", b"\xa0\x78\x13\x83\x11\x64\x00\x00\x00U"),
],
)
def test_get_range(axis, msg_in, msg_out):
controller = NPointController(SocketMock())
npointx = NPointAxis(controller, 0, "nx")
controller.on()
controller.socket.buffer_recv = msg_out
val = controller._get_range(axis)
assert controller.socket.buffer_put == msg_in and val == 100
def test_get_range(npointx, axis, msg_in, msg_out):
"""
Test that the range is correctly read from the controller by directly calling the
controller's method.
"""
npointx.controller.sock.receive.side_effect = [msg_out]
val = npointx.controller._get_range(axis)
npointx.controller.sock.put.assert_called_once_with(msg_in)
assert val == 100

View File

@@ -11,23 +11,13 @@ from bec_server.device_server.tests.utils import DMMock
from ophyd_devices.tests.utils import MockPV
from csaxs_bec.devices.epics.pilatus_csaxs import PilatuscSAXS
def patch_dual_pvs(device):
for walk in device.walk_signals():
if not hasattr(walk.item, "_read_pv"):
continue
if not hasattr(walk.item, "_write_pv"):
continue
if walk.item._read_pv.pvname.endswith("_RBV"):
walk.item._read_pv = walk.item._write_pv
from csaxs_bec.devices.tests_utils.utils import patch_dual_pvs
@pytest.fixture(scope="function")
def mock_det():
name = "pilatus"
prefix = "X12SA-ES-PILATUS300K:"
sim_mode = False
dm = DMMock()
with mock.patch.object(dm, "connector"):
with (
@@ -40,9 +30,7 @@ def mock_det():
mock_cl.get_pv = MockPV
mock_cl.thread_class = threading.Thread
with mock.patch.object(PilatuscSAXS, "_init"):
det = PilatuscSAXS(
name=name, prefix=prefix, device_manager=dm, sim_mode=sim_mode
)
det = PilatuscSAXS(name=name, prefix=prefix, device_manager=dm)
patch_dual_pvs(det)
yield det
@@ -61,7 +49,7 @@ def test_init_detector(mock_det, trigger_source, detector_state):
Validation upon setting the correct PVs
"""
mock_det.custom_prepare.initialize_detector() # call the method you want to test
mock_det.custom_prepare.on_init() # call the method you want to test
assert mock_det.cam.acquire.get() == detector_state
assert mock_det.cam.trigger_mode.get() == trigger_source
@@ -96,6 +84,8 @@ def test_init_detector(mock_det, trigger_source, detector_state):
],
)
def test_stage(mock_det, scaninfo, stopped, expected_exception):
path = "tmp"
mock_det.filepath_raw = path
with mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location:
@@ -105,14 +95,12 @@ def test_stage(mock_det, scaninfo, stopped, expected_exception):
mock_det.device_manager.add_device("mokev", value=12.4)
mock_det.stopped = stopped
with (
mock.patch.object(
mock_det.custom_prepare, "prepare_detector_backend"
) as mock_data_backend,
mock.patch.object(mock_det.custom_prepare, "prepare_data_backend") as mock_data_backend,
mock.patch.object(
mock_det.custom_prepare, "update_readout_time"
) as mock_update_readout_time,
):
mock_det.filepath = scaninfo["filepath"]
mock_det.filepath.set(scaninfo["filepath"]).wait()
if expected_exception:
with pytest.raises(Exception):
mock_det.timeout = 0.1
@@ -127,11 +115,13 @@ def test_stage(mock_det, scaninfo, stopped, expected_exception):
)
assert mock_det.cam.num_frames.get() == 1
mock_publish_file_location.assert_called_with(done=False)
mock_publish_file_location.assert_called_once_with(
done=False, successful=False, metadata={"input_path": path}
)
def test_pre_scan(mock_det):
mock_det.custom_prepare.pre_scan()
mock_det.custom_prepare.on_pre_scan()
assert mock_det.cam.acquire.get() == 1
@@ -169,23 +159,16 @@ def test_update_readout_time(mock_det, readout_time, expected_value):
"scan_id": "123",
}
),
(
{
"filepath": "test.h5",
"filepath_raw": "test5_raw.h5",
"successful": None,
"done": True,
"scan_id": "123",
}
),
],
)
def test_publish_file_location(mock_det, scaninfo):
mock_det.scaninfo.scan_id = scaninfo["scan_id"]
mock_det.filepath = scaninfo["filepath"]
mock_det.filepath.set(scaninfo["filepath"]).wait()
mock_det.filepath_raw = scaninfo["filepath_raw"]
mock_det.custom_prepare.publish_file_location(
done=scaninfo["done"], successful=scaninfo["successful"]
done=scaninfo["done"],
successful=scaninfo["successful"],
metadata={"input_path": scaninfo["filepath_raw"]},
)
if scaninfo["successful"] is None:
msg = messages.FileMessage(
@@ -403,23 +386,19 @@ def test_prep_file_writer(mock_det, scaninfo, data_msgs, urls, requests_state, e
assert call == mock_call
@pytest.mark.parametrize("stopped, expected_exception", [(False, False), (True, True)])
def test_unstage(mock_det, stopped, expected_exception):
def test_complete(mock_det):
path = "tmp"
mock_det.filepath_raw = path
with (
mock.patch.object(mock_det.custom_prepare, "finished") as mock_finished,
mock.patch.object(
mock_det.custom_prepare, "publish_file_location"
) as mock_publish_file_location,
):
mock_det.stopped = stopped
if expected_exception:
mock_det.unstage()
assert mock_det.stopped is True
else:
mock_det.unstage()
mock_finished.assert_called_once()
mock_publish_file_location.assert_called_with(done=True, successful=True)
assert mock_det.stopped is False
mock_det.complete()
assert mock_finished.call_count == 1
call = mock.call(done=True, successful=True, metadata={"input_path": path})
assert mock_publish_file_location.call_args == call
def test_stop(mock_det):

View File

@@ -373,6 +373,7 @@ def test_LamNIFermatScan(scan_msg, reference_scan_list):
reference_scan_list[ii].content["parameter"]["rpc_id"] = scan_instructions[
ii
].content["parameter"]["rpc_id"]
reference_scan_list[ii].metadata["RID"] = scan_instructions[ii].metadata.get("RID")
if instr.content["parameter"].get("value"):
assert np.isclose(
instr.content["parameter"].get("value"),