PSO distance works with manual triggers

This commit is contained in:
gac-x05la
2024-10-31 17:49:09 +01:00
committed by mohacsi_i
parent 931d1601b5
commit 8c982f4be9
5 changed files with 119 additions and 141 deletions

View File

@@ -29,49 +29,37 @@ class AerotechDriveDataCollectionMixin(CustomDeviceMixin):
def on_stage(self) -> None:
""" Configuration and staging
"""
logger.warning(self.parent.scaninfo.__dict__)
npoints = int(d["ntotal"])
trigger = d.get("trigger", DriveDataCaptureTrigger.PsoOutput)
source0 = d.get("source0", DriveDataCaptureInput.PrimaryFeedback)
source1 = d.get("source1", DriveDataCaptureInput.PositionCommand)
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scanparam = self.parent.scaninfo.scan_msg.info
prefix = self.parent.parent.name if self.parent.parent is not None else self.parent.name
d = {}
if hasattr(self.parent.scaninfo, prefix + '_num_points'):
val = str(getattr(self.parent.scaninfo, prefix + '_num_points'))
d['num_points'] = val
if hasattr(self.parent.scaninfo, prefix + '_ddc_trigger'):
val = str(getattr(self.parent.scaninfo, prefix + '_ddc_trigger'))
d['ddc_trigger'] = val
if hasattr(self.parent.scaninfo, prefix + '_ddc_source0'):
val = getattr(self.parent.scaninfo, prefix + '_ddc_source0')
d['ddc_source0'] = val
if hasattr(self.parent.scaninfo, prefix + '_ddc_source1'):
val = getattr(self.parent.scaninfo, prefix + '_ddc_source1')
d['ddc_source1'] = val
if hasattr(self.parent.scaninfo, 'num_points'):
val = str(getattr(self.parent.scaninfo, 'num_points'))
d['num_points'] = val
if hasattr(self.parent.scaninfo, 'ddc_trigger'):
val = str(getattr(self.parent.scaninfo, 'ddc_trigger'))
d['ddc_trigger'] = val
if hasattr(self.parent.scaninfo, 'ddc_source0'):
val = getattr(self.parent.scaninfo, 'ddc_source0')
d['ddc_source0'] = val
if hasattr(self.parent.scaninfo, 'ddc_source1'):
val = getattr(self.parent.scaninfo, 'ddc_source1')
d['ddc_source1'] = val
if 'kwargs' in scanparam:
scanargs = scanparam['kwargs']
if f'{prefix}_num_points_total' in scanargs:
d['num_points'] = scanargs[f'{prefix}_num_points_total']
if f'{prefix}_ddc_trigger' in scanargs:
d['ddc_trigger'] = scanargs[f'{prefix}_ddc_trigger']
if f'{prefix}_ddc_source0' in scanargs:
d['ddc_source0'] = scanargs[f'{prefix}_ddc_source0']
if f'{prefix}_ddc_source1' in scanargs:
d['ddc_source1'] = scanargs[f'{prefix}_ddc_source1']
if 'num_points_total' in scanargs:
d['num_points'] = scanargs['num_points_total']
if 'ddc_trigger' in scanargs:
d['ddc_trigger'] = scanargs['ddc_trigger']
if 'ddc_source0' in scanargs:
d['ddc_source0'] = scanargs['ddc_source0']
if 'ddc_source1' in scanargs:
d['ddc_source1'] = scanargs['ddc_source1']
# Perform bluesky-style configuration
if len(d)>0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# Stage the DDC distance module
@@ -90,7 +78,7 @@ class AerotechDriveDataCollectionMixin(CustomDeviceMixin):
class aa1AxisDriveDataCollection(Device):
class aa1AxisDriveDataCollection(PSIDeviceBase):
"""Axis data collection
This class provides convenience wrappers around the Aerotech API's axis
@@ -160,13 +148,6 @@ class aa1AxisDriveDataCollection(Device):
"""Reset incremental readback"""
self._switch.set("ResetRB", settle_time=0.1).wait()
def complete(self, settle_time=0.1) -> DeviceStatus:
"""DDC just reads back whatever is available in the buffers"""
sleep(settle_time)
status = DeviceStatus(self)
status.set_finished()
return status
def _collect(self, index=0):
"""Force a readback of the data buffer

View File

@@ -28,74 +28,77 @@ class AerotechPsoDistanceMixin(CustomDeviceMixin):
def on_stage(self) -> None:
""" Configuration and staging
"""
# logger.warning(self.parent.scaninfo.scan_msg.info.keys())
logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scan = self.parent.scaninfo.scan_msg
scanparam = self.parent.scaninfo.scan_msg.info
prefix = self.parent.parent.name if self.parent.parent is not None else self.parent.name
d = {}
if hasattr(self.parent.scaninfo, prefix + '_pso_distance'):
val = str(getattr(self.parent.scaninfo, prefix + '_pso_distance'))
d['pso_distance'] = val
if hasattr(self.parent.scaninfo, prefix + '_pso_wavemode'):
val = str(getattr(self.parent.scaninfo, prefix + '_pso_wavemode'))
d['pso_wavemode'] = val
if hasattr(self.parent.scaninfo, prefix + '_pso_w_pulse'):
val = getattr(self.parent.scaninfo, prefix + '_pso_w_pulse')
d['pso_w_pulse'] = val
if hasattr(self.parent.scaninfo, prefix + '_pso_t_pulse'):
val = getattr(self.parent.scaninfo, prefix + '_pso_t_pulse')
d['pso_t_pulse'] = val
if hasattr(self.parent.scaninfo, prefix + '_pso_n_pulse'):
val = getattr(self.parent.scaninfo, prefix + '_pso_n_pulse')
d['pso_n_pulse'] = val
if hasattr(self.parent.scaninfo, 'pso_distance'):
val = str(getattr(self.parent.scaninfo, 'pso_distance'))
d['pso_distance'] = val
if hasattr(self.parent.scaninfo, 'pso_wavemode'):
val = str(getattr(self.parent.scaninfo, 'pso_wavemode'))
d['pso_wavemode'] = val
if hasattr(self.parent.scaninfo, 'pso_w_pulse'):
val = getattr(self.parent.scaninfo, 'pso_w_pulse')
d['pso_w_pulse'] = val
if hasattr(self.parent.scaninfo, 'pso_t_pulse'):
val = getattr(self.parent.scaninfo, 'pso_t_pulse')
d['pso_t_pulse'] = val
if hasattr(self.parent.scaninfo, 'pso_n_pulse'):
val = getattr(self.parent.scaninfo, 'pso_n_pulse')
d['pso_n_pulse'] = val
if 'kwargs' in scanparam:
scanargs = scanparam['kwargs']
if f'{prefix}_pso_distance' in scanargs:
d['pso_distance'] = scanargs[f'{prefix}_pso_distance']
if f'{prefix}_pso_wavemode' in scanargs:
d['pso_wavemode'] = scanargs[f'{prefix}_pso_wavemode']
if f'{prefix}_pso_w_pulse' in scanargs:
d['pso_w_pulse'] = scanargs[f'{prefix}_pso_w_pulse']
if f'{prefix}_pso_t_pulse' in scanargs:
d['pso_t_pulse'] = scanargs[f'{prefix}_pso_t_pulse']
if f'{prefix}_pso_n_pulse' in scanargs:
d['pso_n_pulse'] = scanargs[f'{prefix}_pso_n_pulse']
if 'psod_distance' in scanargs:
d['pso_distance'] = scanargs['psod_distance']
if 'psod_wavemode' in scanargs:
d['pso_wavemode'] = scanargs['psod_wavemode']
if 'psod_w_pulse' in scanargs:
d['pso_w_pulse'] = scanargs['psod_w_pulse']
if 'psod_t_pulse' in scanargs:
d['pso_t_pulse'] = scanargs['psod_t_pulse']
if 'psod_n_pulse' in scanargs:
d['pso_n_pulse'] = scanargs['psod_n_pulse']
# Perform bluesky-style configuration
if len(d)>0:
logger.info(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# Stage the PSO distance module
if isinstance(self.parent._distance_value, (np.ndarray, list, tuple)):
self.dstArrayRearm.set(1).wait()
# Start monitoring the counters
self.parent.dstEventsEna.set("On").wait()
self.parent.dstCounterEna.set("On").wait()
# Wait for polling
sleep(0.5)
# Start monitoring the counters if distance is valid
if self.parent.dstDistanceVal.get() > 0:
self.parent.dstEventsEna.set("On").wait()
self.parent.dstCounterEna.set("On").wait()
def on_unstage(self):
""" Standard bluesky unstage"""
# Ensure output is set to low
if self.parent.output.value:
self.parent.toggle()
# if self.parent.output.value:
# self.parent.toggle()
# Turn off window mode
self.parent.winOutput.set("Off").wait()
self.parent.winEvents.set("Off").wait()
# Turn off distance mode
self.parent.dstEventsEna.set("Off").wait()
self.parent.dstCounterEna.set("Off").wait()
# Disable output
self.parent.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)
def on_trigger(self) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
if self.parent.dstDistanceVal.get() == 0:
status = self.parent._eventSingle.set(1, settle_time=0.1)
return status
@@ -163,14 +166,16 @@ class AerotechPsoWindowMixin(CustomDeviceMixin):
def on_unstage(self):
""" Standard bluesky unstage"""
# Ensure output is set to low
if self.parent.output.value:
self.parent.toggle()
# if self.parent.output.value:
# self.parent.toggle()
# Turn off window mode
self.parent.winOutput.set("Off").wait()
self.parent.winEvents.set("Off").wait()
# Turn off distance mode
self.parent.dstEventsEna.set("Off").wait()
self.parent.dstCounterEna.set("Off").wait()
# Disable output
self.parent.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)
@@ -205,6 +210,7 @@ class aa1AxisPsoBase(PSIDeviceBase):
dstEventsEna = Component(EpicsSignal, "DIST:EVENTS", put_complete=True, kind=Kind.config)
dstCounterEna = Component(EpicsSignal, "DIST:COUNTER", put_complete=True, kind=Kind.omitted)
dstCounterVal = Component(EpicsSignalRO, "DIST:CTR0_RBV", auto_monitor=True, kind=Kind.normal)
dstDistanceVal = Component(EpicsSignalRO, "DIST:DISTANCE_RBV", auto_monitor=True, kind=Kind.normal)
dstArrayIdx = Component(EpicsSignalRO, "DIST:IDX_RBV", auto_monitor=True, kind=Kind.normal)
dstArrayDepleted = Component(EpicsSignalRO, "DIST:DEPLETED-RBV", auto_monitor=True, kind=Kind.normal)
@@ -340,7 +346,7 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
:param pso_n_pulse : trigger number of pulses in pulsed mode (default: 1)
"""
pso_distance = d.get("pso_distance", None)
pso_distance = d.get("pso_distance", 0)
pso_wavemode = d.get("pso_wavemode", "pulsed")
# Validate input parameters
@@ -368,26 +374,26 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
logger.info(f"[{self.name}] PSO configured to {pso_wavemode} mode")
return (old, new)
# ########################################################################
# Bluesky flyer interface
def prepare(self, distance=None) -> DeviceStatus:
""" Arm trigger for a synchronous or asynchronous acquisition"""
if distance is not None:
# Write a new array
if isinstance(distance, (float, int)):
self.dstDistance.set(distance).wait()
elif isinstance(distance, (np.ndarray, list, tuple)):
self.dstDistanceArr.set(distance).wait()
else:
# Rearm the already configured array
if isinstance(self._distance_value, (np.ndarray, list, tuple)):
self.dstArrayRearm.set(1).wait()
# Start monitoring the counters
self.dstEventsEna.set("On").wait()
self.dstCounterEna.set("On").wait()
status = DeviceStatus(self)
status.set_finished()
return status
# # ########################################################################
# # Bluesky flyer interface
# def prepare(self, distance=None) -> DeviceStatus:
# """ Arm trigger for a synchronous or asynchronous acquisition"""
# if distance is not None:
# # Write a new array
# if isinstance(distance, (float, int)):
# self.dstDistance.set(distance).wait()
# elif isinstance(distance, (np.ndarray, list, tuple)):
# self.dstDistanceArr.set(distance).wait()
# else:
# # Rearm the already configured array
# if isinstance(self._distance_value, (np.ndarray, list, tuple)):
# self.dstArrayRearm.set(1).wait()
# # Start monitoring the counters
# self.dstEventsEna.set("On").wait()
# self.dstCounterEna.set("On").wait()
# status = DeviceStatus(self)
# status.set_finished()
# return status

View File

@@ -1,20 +1,9 @@
import time
from collections import OrderedDict
from time import sleep
import numpy as np
from ophyd import Component, Device, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus
try:
from AerotechAutomation1Enums import (
DriveDataCaptureInput,
DriveDataCaptureTrigger,
)
except ModuleNotFoundError:
from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import DriveDataCaptureInput
from tomcat_bec.devices.aerotech.AerotechAutomation1Enums import DriveDataCaptureTrigger
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin
from bec_lib import bec_logger
@@ -31,41 +20,39 @@ class AerotechTasksMixin(CustomDeviceMixin):
NOTE: Tomcat might use multiple cameras with their own separate DAQ instances.
"""
logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# logger.warning(self.parent.scaninfo.scan_msg.info['kwargs'].keys())
# Fish out our configuration from scaninfo (via explicit or generic addressing)
scanparam = self.parent.scaninfo.scan_msg.info
prefix = self.parent.parent.name if self.parent.parent is not None else self.parent.name
d = {}
if hasattr(self.parent.scaninfo, prefix + '_script_text'):
val = str(getattr(self.parent.scaninfo, prefix + '_script_text'))
d['script_text'] = val
if hasattr(self.parent.scaninfo, prefix + '_script_file'):
val = str(getattr(self.parent.scaninfo, prefix + '_script_file'))
d['script_file'] = val
if hasattr(self.parent.scaninfo, prefix + '_script_mode'):
val = getattr(self.parent.scaninfo, prefix + '_script_mode')
d['script_mode'] = val
if hasattr(self.parent.scaninfo, prefix + '_script_task'):
val = str(getattr(self.parent.scaninfo, prefix + '_script_task'))
d['script_task'] = val
if hasattr(self.parent.scaninfo, 'script_text'):
val = str(getattr(self.parent.scaninfo, 'script_text'))
d['script_text'] = val
if hasattr(self.parent.scaninfo, 'script_file'):
val = str(getattr(self.parent.scaninfo, 'script_file'))
d['script_file'] = val
if hasattr(self.parent.scaninfo, 'script_mode'):
val = getattr(self.parent.scaninfo, 'script_mode')
d['script_mode'] = val
if hasattr(self.parent.scaninfo, 'script_task'):
val = str(getattr(self.parent.scaninfo, 'script_task'))
d['script_task'] = val
if 'kwargs' in scanparam:
scanargs = scanparam['kwargs']
if f'{prefix}_script_text' in scanargs:
d['script_text'] = scanargs[f'{prefix}_script_text']
if f'{prefix}_script_file' in scanargs:
d['script_file'] = scanargs[f'{prefix}_script_file']
if f'{prefix}_script_mode' in scanargs:
d['script_mode'] = scanargs[f'{prefix}_script_mode']
if f'{prefix}_script_task' in scanargs:
d['script_task'] = scanargs[f'{prefix}_script_task']
if 'script_text' in scanargs:
d['script_text'] = scanargs['script_text']
if 'script_file' in scanargs:
d['script_file'] = scanargs['script_file']
if 'script_mode' in scanargs:
d['script_mode'] = scanargs['script_mode']
if 'script_task' in scanargs:
d['script_task'] = scanargs['script_task']
# Perform bluesky-style configuration
if len(d)>0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# The actual staging

View File

@@ -190,10 +190,12 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
d['trigger_mode'] = val
# Perform bluesky-style configuration
if len(d)>0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# Sync if out of sync

View File

@@ -66,10 +66,12 @@ class StdDaqMixin(CustomDeviceMixin):
d['file_path'] = val
# Perform bluesky-style configuration
if len(d)>0:
logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Only start acquisition if there was config
if len(d) == 0:
logger.warning(f"[{self.parent.name}] No configuration to stage.")
return
# Try to start a new run