Scans from Fede

This commit is contained in:
2024-10-21 16:08:19 +02:00
parent 4f3e31205f
commit d4470ed613
2 changed files with 272 additions and 0 deletions

View File

@@ -0,0 +1,193 @@
import time
import numpy as np
from bec_lib.device import DeviceBase
from bec_server.scan_server.scans import Acquire, AsyncFlyScanBase
class AcquireDark(Acquire):
scan_name = "acquire_dark"
def __init__(self, num: int, **kwargs):
"""
Acquire a dark image. This scan is used to acquire a dark image. The dark image is an image taken with the shutter
closed and no beam on the sample. The dark image is used to correct the data images for dark current.
Args:
num (int): number of dark images to acquire
Returns:
ScanReport
Examples:
>>> scans.acquire_dark(5)
"""
super().__init__(**kwargs)
self.burst_at_each_point = num
self.shutter = "hx" # change to the correct shutter device
def scan_core(self):
# close the shutter
yield from self.stubs.set_and_wait(device=[self.shutter], positions=[0])
yield from super().scan_core()
class AcquireFlat(Acquire):
scan_name = "acquire_flat"
def __init__(self, num: int, out_position: float, **kwargs):
"""
Acquire a flat field image. This scan is used to acquire a flat field image. The flat field image is an image taken
with the shutter open but the sample out of the beam. The flat field image is used to correct the data images for
non-uniformity in the detector.
Args:
num (int): number of flat field images to acquire
out_position (float): position to move the sample stage to take the flat field image
Returns:
ScanReport
Examples:
>>> scans.acquire_flat(5, 20)
"""
super().__init__(**kwargs)
self.burst_at_each_point = num
self.out_position = out_position
self.sample_stage = "samy" # change to the correct sample stage device
self.shutter = "hx" # change to the correct shutter device
def scan_core(self):
# open the shutter and move the sample stage to the out position
yield from self.stubs.set_and_wait(
device=[self.shutter, self.sample_stage], positions=[1, self.out_position]
)
yield from super().scan_core()
class TutorialFlyScanContLine(AsyncFlyScanBase):
scan_name = "tutorial_cont_line_fly_scan"
def __init__(
self,
motor: DeviceBase,
start: float,
stop: float,
sample_in: float,
sample_out: float,
num_darks: int = 0,
num_flats: int = 0,
exp_time: float = 0,
relative: bool = False,
**kwargs,
):
"""
A continuous line fly scan. Use this scan if you want to move a motor continuously from start to stop position whilst
acquiring data as fast as possible (respecting the exposure time). The scan will stop automatically when the motor
reaches the end position.
Args:
motor (DeviceBase): motor to move continuously from start to stop position
start (float): start position
stop (float): stop position
sample_in (float): position to move the sample stage to take the data image
sample_out (float): position to move the sample stage to take the flat field image
exp_time (float): exposure time in seconds. Default is 0.
relative (bool): if True, the motor will be moved relative to its current position. Default is False.
Returns:
ScanReport
Examples:
>>> scans.tutorial_cont_line_fly_scan(dev.sam_rot, 0, 360, 20, -200, num_darks=5, num_flats=5, exp_time=0.1)
"""
super().__init__(**kwargs)
self.motor = motor
self.start = start
self.stop = stop
self.sample_in = sample_in
self.sample_out = sample_out
self.num_darks = num_darks
self.num_flats = num_flats
self.exp_time = exp_time
self.relative = relative
self.sample_stage = "samy" # change to the correct sample stage device
self.shutter = "hx" # change to the correct shutter device
def prepare_positions(self):
self.positions = np.array([[self.start], [self.stop]])
self.num_pos = None
yield from self._set_position_offset()
def scan_core(self):
# move the motor to the start position
if self.num_darks:
self.connector.send_client_info(
f"Acquiring {self.num_darks} dark images",
show_asap=True,
rid=self.metadata.get("RID"),
)
darks = AcquireDark(
num=self.num_darks,
device_manager=self.device_manager,
metadata=self.metadata,
exp_time=self.exp_time,
)
yield from darks.scan_core()
self.point_id = darks.point_id
if self.num_flats:
self.connector.send_client_info(
f"Acquiring {self.num_flats} flat field images",
show_asap=True,
rid=self.metadata.get("RID"),
)
flats = AcquireFlat(
num=self.num_flats,
exp_time=self.exp_time,
out_position=self.sample_out,
device_manager=self.device_manager,
metadata=self.metadata,
)
flats.point_id = self.point_id
yield from flats.scan_core()
self.point_id = flats.point_id
# move to in position and open the shutter
yield from self.stubs.set_and_wait(
device=[self.sample_stage, self.shutter], positions=[self.sample_in, 1]
)
yield from self.stubs.set_and_wait(device=[self.motor], positions=self.positions[0])
# start the flyer
flyer_request = yield from self.stubs.set_with_response(
device=self.motor, value=self.positions[1][0]
)
self.connector.send_client_info(
"Starting the scan", show_asap=True, rid=self.metadata.get("RID")
)
# send a trigger
yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
while True:
# read the data
yield from self.stubs.read_and_wait(
group="primary", wait_group="readout_primary", point_id=self.point_id
)
time.sleep(1)
if self.stubs.request_is_completed(flyer_request):
# stop the scan if the motor has reached the stop position
break
# increase the point id
self.point_id += 1
def finalize(self):
yield from super().finalize()
self.num_pos = self.point_id + 1

View File

@@ -0,0 +1,79 @@
import numpy as np
from bec_lib.device import DeviceBase
from bec_server.scan_server.scans import AsyncFlyScanBase
class TutorialFlyScanContLine(AsyncFlyScanBase):
scan_name = "tutorial_cont_line_fly_scan"
def __init__( # Input arguments
self,
motor: DeviceBase,
start: float,
stop: float,
exp_time: float = 0,
relative: bool = False,
**kwargs, ### For more metadata
):
"""
A continuous line fly scan. Use this scan if you want to move a motor continuously from start to stop position whilst
acquiring data as fast as possible (respecting the exposure time). The scan will stop automatically when the motor
reaches the end position.
Args:
motor (DeviceBase): motor to move continuously from start to stop position
start (float): start position
stop (float): stop position
exp_time (float): exposure time in seconds. Default is 0.
relative (bool): if True, the motor will be moved relative to its current position. Default is False.
Returns:
ScanReport
Examples:
>>> scans.tutorial_cont_line_fly_scan(dev.sam_rot, 0, 180, exp_time=0.1)
"""
super().__init__(**kwargs) # I need to just give weiter these
self.motor = motor # Can filter the config for the active rotation stage
self.start = start
self.stop = stop
self.exp_time = exp_time
self.relative = relative
def prepare_positions(self):
self.positions = np.array([[self.start], [self.stop]])
self.num_pos = None # If I do not want to read out anything, this could be 1 -> If it is known it is easy for the progress bar
yield from self._set_position_offset()
def scan_core(self):
# move the motor to the start position = move command in the scan world
yield from self.stubs.set_and_wait(device=[self.motor], positions=self.positions[0]) # go to zero
# start the flyer -> start rotation
flyer_request = yield from self.stubs.set_with_response(device=self.motor, value=self.positions[1][0])
while True: # loop for what happens during the scan
# send a trigger
yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
# wait for the trigger to complete
yield from self.stubs.wait(
wait_type="trigger", group="trigger", wait_time=self.exp_time
)
# read the data -> asynchrone devices large detector (bekomme die daten erst am ende vom scan )
yield from self.stubs.read_and_wait(
group="primary", wait_group="readout_primary", point_id=self.point_id
)
if self.stubs.request_is_completed(flyer_request):
# stop the scan if the motor has reached the stop position
break
# increase the point id
self.point_id += 1
def finalize(self): # finalize erweitern
yield from super().finalize()
self.num_pos = self.point_id + 1