diff --git a/tomcat_bec/scans/tutorial_fly_scan.py b/tomcat_bec/scans/tutorial_fly_scan.py new file mode 100644 index 0000000..ead9ae4 --- /dev/null +++ b/tomcat_bec/scans/tutorial_fly_scan.py @@ -0,0 +1,193 @@ +import time + +import numpy as np +from bec_lib.device import DeviceBase +from bec_server.scan_server.scans import Acquire, AsyncFlyScanBase + + +class AcquireDark(Acquire): + scan_name = "acquire_dark" + + def __init__(self, num: int, **kwargs): + """ + Acquire a dark image. This scan is used to acquire a dark image. The dark image is an image taken with the shutter + closed and no beam on the sample. The dark image is used to correct the data images for dark current. + + Args: + num (int): number of dark images to acquire + + Returns: + ScanReport + + Examples: + >>> scans.acquire_dark(5) + + """ + super().__init__(**kwargs) + self.burst_at_each_point = num + self.shutter = "hx" # change to the correct shutter device + + def scan_core(self): + # close the shutter + yield from self.stubs.set_and_wait(device=[self.shutter], positions=[0]) + yield from super().scan_core() + + +class AcquireFlat(Acquire): + scan_name = "acquire_flat" + + def __init__(self, num: int, out_position: float, **kwargs): + """ + Acquire a flat field image. This scan is used to acquire a flat field image. The flat field image is an image taken + with the shutter open but the sample out of the beam. The flat field image is used to correct the data images for + non-uniformity in the detector. + + Args: + num (int): number of flat field images to acquire + out_position (float): position to move the sample stage to take the flat field image + + Returns: + ScanReport + + Examples: + >>> scans.acquire_flat(5, 20) + + """ + super().__init__(**kwargs) + self.burst_at_each_point = num + self.out_position = out_position + self.sample_stage = "samy" # change to the correct sample stage device + self.shutter = "hx" # change to the correct shutter device + + def scan_core(self): + # open the shutter and move the sample stage to the out position + yield from self.stubs.set_and_wait( + device=[self.shutter, self.sample_stage], positions=[1, self.out_position] + ) + yield from super().scan_core() + + +class TutorialFlyScanContLine(AsyncFlyScanBase): + scan_name = "tutorial_cont_line_fly_scan" + + def __init__( + self, + motor: DeviceBase, + start: float, + stop: float, + sample_in: float, + sample_out: float, + num_darks: int = 0, + num_flats: int = 0, + exp_time: float = 0, + relative: bool = False, + **kwargs, + ): + """ + A continuous line fly scan. Use this scan if you want to move a motor continuously from start to stop position whilst + acquiring data as fast as possible (respecting the exposure time). The scan will stop automatically when the motor + reaches the end position. + + Args: + motor (DeviceBase): motor to move continuously from start to stop position + start (float): start position + stop (float): stop position + sample_in (float): position to move the sample stage to take the data image + sample_out (float): position to move the sample stage to take the flat field image + exp_time (float): exposure time in seconds. Default is 0. + relative (bool): if True, the motor will be moved relative to its current position. Default is False. + + Returns: + ScanReport + + Examples: + >>> scans.tutorial_cont_line_fly_scan(dev.sam_rot, 0, 360, 20, -200, num_darks=5, num_flats=5, exp_time=0.1) + + """ + super().__init__(**kwargs) + self.motor = motor + self.start = start + self.stop = stop + self.sample_in = sample_in + self.sample_out = sample_out + self.num_darks = num_darks + self.num_flats = num_flats + self.exp_time = exp_time + self.relative = relative + self.sample_stage = "samy" # change to the correct sample stage device + self.shutter = "hx" # change to the correct shutter device + + def prepare_positions(self): + self.positions = np.array([[self.start], [self.stop]]) + self.num_pos = None + yield from self._set_position_offset() + + def scan_core(self): + # move the motor to the start position + + if self.num_darks: + self.connector.send_client_info( + f"Acquiring {self.num_darks} dark images", + show_asap=True, + rid=self.metadata.get("RID"), + ) + darks = AcquireDark( + num=self.num_darks, + device_manager=self.device_manager, + metadata=self.metadata, + exp_time=self.exp_time, + ) + yield from darks.scan_core() + self.point_id = darks.point_id + + if self.num_flats: + self.connector.send_client_info( + f"Acquiring {self.num_flats} flat field images", + show_asap=True, + rid=self.metadata.get("RID"), + ) + flats = AcquireFlat( + num=self.num_flats, + exp_time=self.exp_time, + out_position=self.sample_out, + device_manager=self.device_manager, + metadata=self.metadata, + ) + flats.point_id = self.point_id + yield from flats.scan_core() + self.point_id = flats.point_id + + # move to in position and open the shutter + yield from self.stubs.set_and_wait( + device=[self.sample_stage, self.shutter], positions=[self.sample_in, 1] + ) + + yield from self.stubs.set_and_wait(device=[self.motor], positions=self.positions[0]) + + # start the flyer + flyer_request = yield from self.stubs.set_with_response( + device=self.motor, value=self.positions[1][0] + ) + + self.connector.send_client_info( + "Starting the scan", show_asap=True, rid=self.metadata.get("RID") + ) + # send a trigger + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + while True: + # read the data + yield from self.stubs.read_and_wait( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + time.sleep(1) + + if self.stubs.request_is_completed(flyer_request): + # stop the scan if the motor has reached the stop position + break + + # increase the point id + self.point_id += 1 + + def finalize(self): + yield from super().finalize() + self.num_pos = self.point_id + 1 diff --git a/tomcat_bec/scans/tutorial_fly_scan_cont_line.py b/tomcat_bec/scans/tutorial_fly_scan_cont_line.py new file mode 100644 index 0000000..e333ff8 --- /dev/null +++ b/tomcat_bec/scans/tutorial_fly_scan_cont_line.py @@ -0,0 +1,79 @@ +import numpy as np + +from bec_lib.device import DeviceBase +from bec_server.scan_server.scans import AsyncFlyScanBase + + +class TutorialFlyScanContLine(AsyncFlyScanBase): + scan_name = "tutorial_cont_line_fly_scan" + + def __init__( # Input arguments + self, + motor: DeviceBase, + start: float, + stop: float, + exp_time: float = 0, + relative: bool = False, + **kwargs, ### For more metadata + ): + """ + A continuous line fly scan. Use this scan if you want to move a motor continuously from start to stop position whilst + acquiring data as fast as possible (respecting the exposure time). The scan will stop automatically when the motor + reaches the end position. + + Args: + motor (DeviceBase): motor to move continuously from start to stop position + start (float): start position + stop (float): stop position + exp_time (float): exposure time in seconds. Default is 0. + relative (bool): if True, the motor will be moved relative to its current position. Default is False. + + Returns: + ScanReport + + Examples: + >>> scans.tutorial_cont_line_fly_scan(dev.sam_rot, 0, 180, exp_time=0.1) + + """ + super().__init__(**kwargs) # I need to just give weiter these + self.motor = motor # Can filter the config for the active rotation stage + self.start = start + self.stop = stop + self.exp_time = exp_time + self.relative = relative + + def prepare_positions(self): + self.positions = np.array([[self.start], [self.stop]]) + self.num_pos = None # If I do not want to read out anything, this could be 1 -> If it is known it is easy for the progress bar + yield from self._set_position_offset() + + def scan_core(self): + # move the motor to the start position = move command in the scan world + yield from self.stubs.set_and_wait(device=[self.motor], positions=self.positions[0]) # go to zero + + # start the flyer -> start rotation + flyer_request = yield from self.stubs.set_with_response(device=self.motor, value=self.positions[1][0]) + + + while True: # loop for what happens during the scan + # send a trigger + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + # wait for the trigger to complete + yield from self.stubs.wait( + wait_type="trigger", group="trigger", wait_time=self.exp_time + ) + # read the data -> asynchrone devices large detector (bekomme die daten erst am ende vom scan ) + yield from self.stubs.read_and_wait( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + + if self.stubs.request_is_completed(flyer_request): + # stop the scan if the motor has reached the stop position + break + + # increase the point id + self.point_id += 1 + + def finalize(self): # finalize erweitern + yield from super().finalize() + self.num_pos = self.point_id + 1 \ No newline at end of file