import time

import numpy as np
from loguru import logger

# TODO
# Most of these are outdated and workarounds for previous issues.
# Do not use without review as for example the CTA is now much better integrated.
#
# NOTE(review): several names used below (scan, daq, cta, PV, overview,
# stand_client, undulators, smaract_mini_XYZ, OWIS, pp_normal, pp_cta, and the
# free variables in simple_scan) are neither defined nor imported in this file.
# Presumably they are provided by the interactive session namespace this
# script is loaded into -- confirm before using it as a regular module.


def simple_scan():
    """
    Run a 1D scan via ``scan.scan1D``.

    The values below are NOT parameters of this function; they are read as
    free (session-level) names:

    adjustable: Adjustable names to scan.
    start_pos, end_pos, step_size: forwarded unchanged to ``scan.scan1D``.
    n_pulses (int): Number of pulses per step.
    filename (str): Name of output file.
    """
    # NOTE(review): raises NameError unless all of these names exist globally.
    scan.scan1D(adjustable, start_pos, end_pos, step_size, n_pulses, filename)


# TODO: make stand work with stand as well, what's the easiest way here?
def simple_acquisition(daq, filename, n_pulses=10, sample="--", comment="", parameters="", is_scan_step=False, stand_client=None):
    """
    Performs an acquisition and records the given comments and parameters.

    daq: object providing ``acquire(...)`` and ``client.run_number``.
    filename (str): Name passed to ``daq.acquire``.
    n_pulses (int): Number of pulses to acquire (a single repetition).
    sample, comment, parameters (str): Free-text metadata stored in the row.
    is_scan_step = True if we want to continue the current run (which needs
    to have been acquired in the same way).
    stand_client: optional object with ``add_row(**kwargs)``; when given, the
    metadata row is sent to it.

    Returns the run number read from ``daq.client.run_number`` after the
    acquisition has finished.
    """
    logger.info("Simple acquisition started.")

    # wait=True: the call blocks until the acquisition is done, so the run
    # number read afterwards belongs to this acquisition.
    daq.acquire(filename, n_pulses=n_pulses, n_repeat=1, is_scan_step=is_scan_step, wait=True)
    run_number = daq.client.run_number

    all_things = {
        "run_number": run_number,
        "time": time.asctime(),
        "topic": "Beamline commissioning",
        "sample": sample,
        "comment": comment,
        "parameters": parameters,
    }

    # TODO
    # combine with other parameters (spreadsheet overview)
    # overviews = {thing.name: thing.get() for thing in overview}
    # combined = all_things | overviews
    combined = all_things

    if stand_client is not None:
        stand_client.add_row(**combined)

    logger.debug(f"Simple acquisition complete: {combined}")
    return run_number


def scan_positions(positions_x, positions_y):
    """
    Acquire 100 pulses at every (x, y) combination of the given positions.

    positions_x, positions_y: iterables of target positions; each value is
    passed to ``smaract_mini_XYZ.x.mv`` / ``smaract_mini_XYZ.y.mv`` and
    formatted as a float in the recorded parameters.

    Uses the session-level ``daq`` (as ``acquire_and_pulse`` does), because
    ``simple_acquisition`` requires the DAQ as its first argument.
    """
    for x in positions_x:
        for y in positions_y:
            smaract_mini_XYZ.x.mv(x)
            smaract_mini_XYZ.y.mv(y)
            # Fixed settling time instead of polling the stage.
            time.sleep(1)
            # while smaract_mini_XYZ.x.is_moving():
            #     time.sleep(1)
            simple_acquisition(
                daq,
                "Speckle scan grid 5",
                n_pulses=100,
                comment="Grid scan 5",
                parameters=f"at x={x:.5f}mm, y={y:.5f}mm, OWIS:{OWIS.get():.3f}mm",
            )


def minimized_acquisition(daq, run_number=839, n_pulses=100):
    """
    Run the CTA sequence ``n_pulses`` times and retrieve the data into a run.

    daq: object providing ``retrieve(name, pulse_ids, run_number=...)``.
    run_number (int): Run the retrieved data is assigned to.
    n_pulses (int): Number of CTA repetitions; requests above 6000 are
    refused with a warning and nothing is started.

    Returns None.
    """
    if n_pulses > 6000:
        logger.warning("DAQ cannot provide data of so many pulses in one request. Aborting.")
        return

    import cta_lib

    repetitions = n_pulses

    # this is such an ugly interface...
    # anyway: we set the number of repetitions for the CTA
    cta.cta_client.set_repetition_config(config={"mode": cta_lib.CtaLib.RepetitionMode.NTIMES, "n": repetitions})
    # logger.info(f"CTA start sequence, total runtime approximately {repetitions/10:.1f}s.")
    cta.cta_client.start()

    time.sleep(1)
    start = time.time()
    while cta.cta_client.is_running():
        print(f"Wait for CTA sequence to finish, {time.time() - start:.0f}s elapsed.")
        time.sleep(5)

    # Without the wait time the DAQ is returning fewer images than we asked it for
    print("Waiting 60s for DAQ pipeline")
    time.sleep(60)

    CTA_sequence_start_PID = int(PV("SAR-CCTA-ESC:seq0Ctrl-StartedAt-O").get())
    logger.info(f"Retrieve images starting from pid {CTA_sequence_start_PID}.")

    # n_pulses*10 because of current rate_multiplicator
    res = daq.retrieve(
        "CTA_pulses",
        np.arange(CTA_sequence_start_PID, CTA_sequence_start_PID + n_pulses * 10),
        run_number=run_number,
    )
    logger.debug(res)


def acquire_measured(daq, filename, sample="", comment="", parameters="", repetitions=1, no_CTA=False):
    """
    Combined acquisition: necessary to create a new run and then start the CTA sequence
    to collect data for repetitions x 10 minutes at 10 Hz.

    This is all rather brittle and rough but runs OK at the moment.

    As written, the function records a metadata row for the NEXT run number
    (current + 1) via the session-level ``stand_client`` and then asks the
    DAQ to retrieve data with ``run_number=None``.

    NOTE(review): ``filename``, ``repetitions`` and ``no_CTA`` are accepted
    but not used, and ``CTA_sequence_start_PID`` / ``n_pulses`` are not
    defined inside this function. Unless they exist as session globals the
    retrieval part raises NameError *after* the row has been written.
    Confirm the intended source of these values before use.
    """
    run_number = daq.client.run_number  # get current run number

    all_things = {}
    all_things["run_number"] = run_number + 1  # but store next one
    all_things["time"] = time.asctime()
    all_things["topic"] = "Li magnetism"
    all_things["sample"] = sample
    all_things["comment"] = comment
    all_things["parameters"] = parameters

    overviews = {thing.name: thing.get() for thing in overview}
    combined = all_things | overviews

    stand_client.add_row(**combined)
    logger.debug(f"Simple acquisition complete: {combined}")

    run_number = None  # should automatically increase in broker client

    logger.info(f"Retrieve images starting from pid {CTA_sequence_start_PID}.")

    # n_pulses*10 because of current rate_multiplicator
    res = daq.retrieve(
        "CTA_pulses",
        np.arange(CTA_sequence_start_PID, CTA_sequence_start_PID + n_pulses * 10),
        run_number=run_number,
    )
    logger.debug(res)


def acquire_long(daq, filename, sample="", comment="", parameters="", repetitions=1, no_CTA=False):
    """
    Combined acquisition: necessary to create a new run and then start the CTA sequence
    to collect data for repetitions x 10 minutes at 10 Hz.

    This is all rather brittle and rough but runs OK at the moment.

    daq: DAQ object, forwarded to ``simple_acquisition`` and
    ``minimized_acquisition``.
    filename (str): Name of the run created by ``simple_acquisition``.
    sample, comment, parameters (str): Metadata forwarded to
    ``simple_acquisition``.
    repetitions (int): Number of 6000-pulse CTA blocks to append to the run.
    no_CTA (bool): If True, do one plain 6000-pulse acquisition and skip the
    CTA entirely.

    Returns None.
    """
    n_pulses = 6000  # 10 minutes, longer is not possible because the DAQ complains. So we split this up.

    if no_CTA:
        # daq must be the first positional argument of simple_acquisition;
        # passing filename first and daq= by keyword raises TypeError.
        simple_acquisition(daq, filename, n_pulses=n_pulses, sample=sample, comment=comment, parameters=parameters)
        return

    # only 10 pulses as a baseline
    run_number = simple_acquisition(daq, filename, n_pulses=10, sample=sample, comment=comment, parameters=parameters)

    for _ in range(repetitions):
        minimized_acquisition(run_number=run_number, n_pulses=n_pulses, daq=daq)


def acquire_long_energy(daq, filename, sample="LiHoF4 sample", comment="", parameters="", repetitions=1):
    """
    Use acquire long to make an energy scan, every energy is a new run.

    Steps the session-level ``undulators`` through 13 evenly spaced values
    from 8044 to 8092 and calls ``acquire_long(..., no_CTA=True)`` at each.

    NOTE(review): the ``filename`` and ``parameters`` arguments are accepted
    but not used; both are generated per energy step below.
    """
    energies = np.linspace(8044, 8092, 13)
    # energies = np.linspace(8044, 8092, 7)

    for i, energy in enumerate(energies):
        undulators.set_target_value(energy)
        # Fixed wait after setting the new target value.
        time.sleep(10)

        step_parameters = f'Energy: {energy}eV, multiplicator:{daq.client.config.rate_multiplicator}, steps: {i} of {len(energies)-1}'
        acquire_long(
            filename=f'LiTbF4,energy_scan_energy_{energy}_multiplicator_{daq.client.config.rate_multiplicator}',
            sample=sample,
            comment=comment,
            parameters=step_parameters,
            daq=daq,
            repetitions=repetitions,
            no_CTA=True,
        )


# def acquire_long_energy(filename, sample="", comment="", parameters="", daq=slow_daq, repetitions=1):
#     """ a brittle energy scan combined with the CTA
#     """
#     target_values = np.arange(7490, 7540, 2)
#     for target in target_values:
#         undulators.set_target_value(target)
#         time.sleep(10)
#         run_number = simple_acquisition(filename, n_pulses=10, sample=sample, comment=comment, parameters=parameters, daq=slow_daq)
#         n_pulses = 60  # 1 minute.
#         minimized_acquisition(run_number=run_number, n_pulses=n_pulses, daq=slow_daq)


def acquisition_pms(daq, run_number=200):
    """
    Run the CTA sequence once and retrieve all of its pulses into a run.

    daq: object providing ``retrieve(name, pulse_ids, run_number=...)``.
    run_number (int): Run the retrieved data is assigned to.

    The pulse-ID range is taken from the start PID and sequence length
    reported by the CTA PVs; the end PID is inclusive.

    Returns None.
    """
    import cta_lib

    cta.cta_client.set_repetition_config(config={"mode": cta_lib.CtaLib.RepetitionMode.NTIMES, "n": 1})
    cta.cta_client.start()

    time.sleep(1)
    start = time.time()
    while cta.cta_client.is_running():
        print(f"Wait for CTA sequence to finish, {time.time() - start:.0f}s elapsed.")
        time.sleep(5)

    CTA_sequence_start_PID = int(PV("SAR-CCTA-ESC:seq0Ctrl-StartedAt-O").get())
    CTA_sequence_length = int(PV("SAR-CCTA-ESC:seq0Ctrl-Length-I").get())
    CTA_sequence_end_PID = CTA_sequence_start_PID + CTA_sequence_length - 1
    print("Start PID = ", CTA_sequence_start_PID)
    print("End PID = ", CTA_sequence_end_PID)

    # Without the wait time the DAQ is returning fewer images than we asked it for
    print("Waiting 30s for DAQ pipeline")
    time.sleep(30)

    logger.info(f"Retrieve images starting from pid {CTA_sequence_start_PID}.")

    # np.arange excludes its stop value, so use end + 1 to include the last
    # pulse. The same array is printed and requested, so the two cannot
    # disagree.
    pulse_ids = np.arange(CTA_sequence_start_PID, CTA_sequence_end_PID + 1)
    print(pulse_ids)
    res = daq.retrieve(
        "PuMa_run",
        pulse_ids,
        run_number=run_number,
    )
    logger.debug(res)


def acquire_and_pulse(tag="test"):
    """
    Create a new run with a 1-pulse acquisition, then run ``acquisition_pms``
    into that run.

    tag (str): Name passed to ``daq.acquire``.

    Uses the session-level ``daq``.
    """
    # empty images because pulse picker is closed here, only for the generation of a new run
    task = daq.acquire(tag, n_pulses=1, n_repeat=1, is_scan_step=False, wait=True)

    # The run number is parsed from the first output file path: 7th
    # "/"-separated component with its first three characters stripped.
    # NOTE(review): depends on the DAQ's directory layout -- confirm.
    run_number = int(task.filenames[0].split("/")[6][3:])
    print(f"new run {run_number} generated.")

    # acquisition_pms requires the DAQ as its first argument.
    acquisition_pms(daq, run_number=run_number)


def acquire_and_pulse_long(tag="test"):
    """
    Like ``acquire_and_pulse`` but with a 1000-pulse acquisition taken after
    ``pp_normal(opening='on')``, switching to ``pp_cta()`` before the CTA
    sequence is run.

    tag (str): Name passed to ``daq.acquire``.

    Uses the session-level ``daq``, ``pp_normal`` and ``pp_cta``.
    """
    pp_normal(opening='on')
    time.sleep(0.1)

    # empty images because pulse picker is closed here, only for the generation of a new run
    # NOTE(review): the comment above is identical to the one in
    # acquire_and_pulse although pp_normal(opening='on') is called first here
    # -- confirm it still applies.
    task = daq.acquire(tag, n_pulses=1000, n_repeat=1, is_scan_step=False, wait=True)

    # Same path-based run-number parsing as in acquire_and_pulse.
    run_number = int(task.filenames[0].split("/")[6][3:])
    print(f"new run {run_number} generated.")

    pp_cta()
    time.sleep(0.1)

    # acquisition_pms requires the DAQ as its first argument.
    acquisition_pms(daq, run_number=run_number)