Helical scan with proggress bar

This commit is contained in:
gac-x06da
2025-02-07 17:06:12 +01:00
committed by mohacsi_i
parent 963b775200
commit e79a3f785a
6 changed files with 149 additions and 18 deletions

View File

@@ -493,10 +493,10 @@ bstop_z:
bstop_pneum:
description: Beamstop pneumatic
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', auto_monitor: true, put_complete: true}
deviceConfig: {read_pv: 'X06DA-ES-BS:GET-POS', write_pv: 'X06DA-ES-BS:SET-POS', kind: 'config', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readoutPriority: baseline
readOnly: false
softwareTrigger: false
bstop_diode:
@@ -511,7 +511,7 @@ bstop_diode:
frontlight:
description: Microscope frontlight
deviceClass: ophyd.EpicsSignal
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', kind: 'normal', put_complete: true}
deviceConfig: {read_pv: 'X06DA-ES-FL:SET-BRGHT', kind: 'config', put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
@@ -520,10 +520,10 @@ frontlight:
backlight:
description: Backlight reflector
deviceClass: pxiii_bec.devices.PneumaticValve
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', auto_monitor: true, put_complete: true}
deviceConfig: {read_pv: 'X06DA-ES-BL:GET-POS', write_pv: 'X06DA-ES-BL:SET-POS', kind: 'config', auto_monitor: true, put_complete: true}
onFailure: buffer
enabled: true
readoutPriority: monitored
readoutPriority: baseline
readOnly: false
softwareTrigger: false
det_y:
@@ -595,7 +595,7 @@ abr:
softwareTrigger: false
shx:
description: SmarGon X axis
deviceClass: pxiii_bec.devices.SmarGonAxisA
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: -2, high_limit: 2, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
@@ -604,7 +604,7 @@ shx:
softwareTrigger: false
shy:
description: SmarGon Y axis
deviceClass: pxiii_bec.devices.SmarGonAxisA
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: -2, high_limit: 2, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
@@ -613,7 +613,7 @@ shy:
softwareTrigger: false
shz:
description: SmarGon Z axis
deviceClass: pxiii_bec.devices.SmarGonAxisA
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: 10, high_limit: 22, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
@@ -622,7 +622,7 @@ shz:
softwareTrigger: false
chi:
description: SmarGon CHI axis
deviceClass: pxiii_bec.devices.SmarGonAxisA
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', low_limit: 0, high_limit: 40, sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true
@@ -631,7 +631,7 @@ chi:
softwareTrigger: false
phi:
description: SmarGon PHI axis
deviceClass: pxiii_bec.devices.SmarGonAxisA
deviceClass: pxiii_bec.devices.SmarGonAxisB
deviceConfig: {prefix: 'SCS', sg_url: 'http://x06da-smargopolo.psi.ch:3000'}
onFailure: buffer
enabled: true

View File

@@ -84,7 +84,7 @@ class AerotechAbrMixin(CustomPrepare):
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
scanname = self.parent.scaninfo.scan_msg.info["scan_name"]
if scanname in ("standardscan", "helicalscan"):
if scanname in ("standardscan", "helicalscan", "helicalscan1", "helicalscan2", "helicalscan3"):
d["scan_command"] = AbrCmd.MEASURE_STANDARD
d["var_1"] = scanargs["start"]
d["var_2"] = scanargs["range"]

View File

@@ -3,26 +3,30 @@ from ophyd.status import SubscriptionStatus
class PneumaticValve(EpicsSignal):
"""Wrapper around EpicsSignal to wait until reaching target
"""Wrapper around EpicsSignal to wait until reaching target. Use the
status returned by set() to wait until movement is finished. Do NOT
use put if you want to wait, that's a low-level PV write op.
NOTE: The SET and GET states do not match exactly
"""
def set(self, value, *, timeout=5, settle_time=0.1):
"""Overloaded settet that waits for target state"""
"""Overloaded setter that waits for target state
NOTE: The SubscriptionStatus callback does not run in put()
"""
# Lazy hardcoded state lookup
target = 1 if value in (1, "Measure") else 2
# Define wait until the busy flag goes high
# Define wait until an end state is reached
def on_target(*, value, **_):
return bool(value == target)
# Subscribe in advance and wait for update
# Subscribe a monitor in advance and wait for update
status = SubscriptionStatus(self, on_target, timeout=timeout, settle_time=0.1)
# Set value to start movement
super().set(value, settle_time=settle_time).wait()
# Return the monitor
return status
def check_value(self, value):

View File

@@ -198,7 +198,8 @@ class SmarGonAxis(PVPositioner):
def omove(self, position, wait=True, timeout=2.0, moved_cb=None):
"""Original move command without the BEC wrappers"""
status = self.setpoint.set(position, settle_time=0.1).wait()
status = self.setpoint.set(position, settle_time=0.1)
status.wait()
if not wait:
return status

View File

@@ -4,4 +4,6 @@ from .mx_measurements import (
MeasureRasterSimple,
MeasureScreening,
MeasureHelical,
MeasureHelical2,
)

View File

@@ -353,3 +353,127 @@ class MeasureHelical(AerotechFlyscanBase):
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()
class MeasureHelical2(AerotechFlyscanBase):
"""Helical scan using the OMEGA motor
Measure an absolute continous line scan from `start` to `start` + `range`
during `move_time` on the Omega axis with PSO output.
The scan itself is executed by the scan service running on the Aerotech
controller. Ophyd just configures, launches it and waits for completion.
Example
-------
>>> scans.standard_wedge(start=42, range=10, move_time=20)
Parameters
----------
start : float
Scan start position of the axis.
range : float
Scan range of the axis.
move_time : float
Total travel time for the movement [s].
ready_rate : float, optional
No clue what is this... (default=500)
sg_start : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_end : (float, float, float, float, float)
Complete SmarGon coordinate in tuple form.
sg_steps : int
Number of steps with SmarGon.
"""
scan_name = "helicalscan2"
required_kwargs = ["start", "range", "move_time", "sg_start", "sg_end", "sg_steps"]
point_id = 0
# def __init__(self, *args, parameter: dict = None, **kwargs):
# """Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
# parameters.
# """
# self.num_pos = kwargs["sg_steps"]
# super().__init__(*args, parameter=parameter, **kwargs)
def prepare_positions(self):
# Smargon has no velocity control
self.smargon_start = np.array(self.caller_kwargs.get("sg_start"))
self.smargon_end = np.array(self.caller_kwargs.get("sg_end"))
self.smargon_steps = self.caller_kwargs.get("sg_steps")
self.smargon_range = self.smargon_end - self.smargon_start
self.smargon_step_size = self.smargon_range / self.smargon_steps
self.smargon_step_time = self.caller_kwargs.get("move_time") / self.smargon_steps
logger.info(f"Start:\t{self.smargon_start}")
logger.info(f"End:\t{self.smargon_end}")
logger.info(f"Steps:\t{self.smargon_steps}")
logger.info(f"Range:\t{self.smargon_range}")
logger.info(f"StepSize:\t{self.smargon_step_size}")
logger.info(f"StepTime:\t{self.smargon_step_time}")
self.num_pos = self.smargon_steps
self.positions = np.linspace(self.smargon_start, self.smargon_end, self.smargon_steps)
self.start_pos = self.positions[0, :]
# Call super
yield from super().prepare_positions()
# def update_scan_motors(self):
# """ Update step scan motors"""
# self.scan_motors = ['shx', 'shy', 'shz', 'chi', 'phi']
def pre_scan(self):
"""Mostly just checking if ABR stage is ok..."""
# Move roughly to start position
st0 = yield from self.stubs.send_rpc("shx", "omove", self.start_pos[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", self.start_pos[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", self.start_pos[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", self.start_pos[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", self.start_pos[4])
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
# print(f"\n\n{self.readout_priority}\n\n")
# Call super
yield from super().pre_scan()
def scan_core(self):
"""The actual scan logic comes here."""
# Kick off the run
yield from self.stubs.send_rpc_and_wait("abr", "kickoff")
logger.info("Measurement launched on the ABR stage...")
logger.info("Performing SmarGon stepping...")
for _, sg_pos in enumerate(self.positions):
# sg_pos = self.smargon_start + ss * self.smargon_step_size
# Move to position but don't care
st0 = yield from self.stubs.send_rpc("shx", "omove", sg_pos[0])
st1 = yield from self.stubs.send_rpc("shy", "omove", sg_pos[1])
st2 = yield from self.stubs.send_rpc("shz", "omove", sg_pos[2])
st3 = yield from self.stubs.send_rpc("chi", "omove", sg_pos[3])
st4 = yield from self.stubs.send_rpc("phi", "omove", sg_pos[4])
t_start = time.time()
st0.wait()
st1.wait()
st2.wait()
st3.wait()
st4.wait()
t_end = time.time()
t_elapsed = t_end-t_start
time.sleep(max(self.smargon_step_time-t_elapsed, 0))
yield from self.stubs.read(group="monitored", point_id=self.point_id)
self.point_id += 1
# Wait for scan task to finish
if self.abr_complete:
if self.abr_timeout is not None:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
st.wait()
else:
st = yield from self.stubs.send_rpc_and_wait("abr", "complete")
st.wait()