#!/usr/bin/env python import sys import os from loguru import logger # at the moment this allows us to group the subdirectories as modules easily # TODO: a more general way would be to have this cristallina as a installed package sys.path.insert(0, os.path.expanduser("/sf/cristallina/applications/slic/cristallina")) def setup_general_logging(): """Setup logging to console and files in both the snapshots and the respective pgroup. """ logger.remove() logger.add( sys.stderr, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", level="INFO", ) logger.info("Loading started.") # create file handler which logs try: logger.add( "/sf/cristallina/applications/beamline/snapshots/slic_logs/cristallina_mx.log", format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", level="DEBUG", rotation="1 week", ) logger.info("Logging to snapshots.") except PermissionError as e: logger.warning("Cannot write log file to snapshots.") logger.warning(e) def setup_logging_pgroup(pgroup, level="INFO"): try: logger.add( f"/sf/cristallina/data/{pgroup}/scratch/slic.log", format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", level=level, rotation="1 week", ) logger.info(f"Logging to pgroup {pgroup}.") except PermissionError as e: logger.warning(f"Cannot write log file to pgroup {pgroup}.") # We setup the logging before going further so that # other modules can write startup messages into the log file. setup_general_logging() from slic.gui import GUI from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable from slic.core.acquisition import SFAcquisition, PVAcquisition from slic.core.condition import PVCondition from slic.core.scanner import Scanner from slic.devices.general.motor import Motor from slic.utils import devices, Marker, as_shortcut, snapshot from slic.utils import Channels, Config, Elog, Screenshot, PV from slic.core.acquisition.fakeacquisition import FakeAcquisition print( os.getcwd() ) from channels.bs_channels import ( detectors_MX, bs_channels, camera_channels, ) from channels.pv_channels import pv_channels ################# DEVICES ################# dummy = DummyAdjustable(units="au") from devices.knife_edge import KnifeEdge # from devices.standa import standa # from devices.newport import newport from beamline.components import ( upstream_attenuator, attenuator, pp_shutter, pulsepicker, alignment_laser, pbps113, pbps149, ) from systems.components import cta # MX adajustables from slic.core.device.simpledevice import SimpleDevice from slic.devices.general.motor import Motor # hve positioners hve_mot_v = Motor("SARES30-MOBI1:MOT_1") hve_mot_h1 = Motor("SARES30-MOBI1:MOT_2") hve_mot_h2 = Motor("SARES30-MOBI1:MOT_3") # collimator coll_x = Motor("SARES30-SMX:MCS1") coll_y = Motor("SARES30-SMX:MCS2") # post-tube pt_x1 = Motor("SARES30-SMX:MCS4") pt_x2 = Motor("SARES30-SMX:MCS5") pt_y1 = Motor("SARES30-SMX:MCS6") pt_y2 = Motor("SARES30-SMX:MCS7") pt_z = Motor("SARES30-SMX:MCS8") # post-tube detector_z = Motor("SAR-EXPMX:MOT_DET_Z") # post-tube backlight = Motor("SAR-EXPMX:MOT_BLGT") # fast stage mx_fast_x = Motor("SAR-EXPMX:MOT_FX") mx_fast_y = Motor("SAR-EXPMX:MOT_FY") ################# DAQ Setup ################# instrument = "cristallina" #pgroup = "p21734" pgroup = "p22215" # setup pgroup specific logger setup_logging_pgroup(pgroup) # Moved from main cristallina.py here temporarily mxdaq = SFAcquisition( instrument, pgroup, default_channels=bs_channels, default_pvs=pv_channels, default_detectors=detectors_MX, rate_multiplicator=1, append_user_tag_to_data_dir=True ) # There is a new EPICS buffer, so the archiver is no longer used. This makes sure we are taking PVs from the right place. try: mxdaq.update_config_pvs() except Exception as e: logger.warning(f"error updating config pvs for mxdaq: {e}") check_intensity_gas_monitor = PVCondition( "SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US", vmin=4, vmax=2000, wait_time=0.5, required_fraction=0.8, ) mxscan = Scanner(default_acquisitions=[mxdaq], condition=check_intensity_gas_monitor) mxgui = GUI(mxscan, show_goto=True, show_spec=False, show_scan=True, show_scan2D=False, show_run=False, show_static=False, show_sfx=True, start_tab="sfx") ############################################# ################# DAQ Setup ################# ############## in positions ############## coll_in_pos_x, coll_in_pos_y = 9.50, 1.38 backlight_in = -30000 detector_in_pos = 116 pt_in_pos_x1 = -4.301 pt_in_pos_x2 = -4.501 pt_in_pos_y1 = 10.996 pt_in_pos_y2 = 10.664 pt_in_pos_z = 0.5 ############## out positions ############## coll_out_pos_x, coll_out_pos_y = -12, 1.38 backlight_out = 1000 detector_out_pos = 220 pt_out_pos_x1 = 4.8 pt_out_pos_x2 = 4.6 pt_out_pos_y1 = -12.8 pt_out_pos_y2 = -13.1 pt_out_pos_z = -8 @as_shortcut def a_data_collection(): # move backlight up backlight.set( backlight_out ).wait() # post-tube in pt_x1_in = pt_x1.set( pt_in_pos_x1 ) # this runs in parallel pt_x2_in = pt_x2.set( pt_in_pos_x2 ) # this runs in parallel pt_y1_in = pt_y1.set( pt_in_pos_y1 ) # this runs in parallel pt_y2_in = pt_y2.set( pt_in_pos_y2 ) # this runs in parallel for t in (pt_x1_in, pt_x2_in, pt_y1_in, pt_y2_in): # this waits for all of them to be done! logger.info(f"waiting for post-tube to move in") t.wait() pt_z_in = pt_z.set( pt_in_pos_z ).wait() # this no longer runs in parallel # collimator in cx_in = coll_x.set( coll_in_pos_x ) # this runs in parallel cy_in = coll_y.set( coll_in_pos_y ) # this runs in parallel for t in (cx_in, cy_in): # this waits for all of them to be done! logger.info(f"waiting for collimator to move in") t.wait() # detector in logger.info(f"waiting for detector to move in") detector_z.set( detector_in_pos ).wait() # this runs in parallel @as_shortcut def b_sample_alignment(): # detector out logger.info(f"waiting for detector to move out") detector_z.set( detector_out_pos ).wait() # collimator out cx_out = coll_x.set( coll_out_pos_x ) # this runs in parallel cy_out = coll_y.set( coll_out_pos_y ) # this runs in parallel for t in (cx_out, cy_out): # this waits for all of them to be done! logger.info(f"waiting for collimator to move out") t.wait() # post-tube out pt_z_out = pt_z.set( pt_out_pos_z ).wait() # this runs in parallel pt_x1_out = pt_x1.set( pt_out_pos_x1 ) # this runs in parallel pt_x2_out = pt_x2.set( pt_out_pos_x2 ) # this runs in parallel pt_y1_out = pt_y1.set( pt_out_pos_y1 ) # this runs in parallel pt_y2_out = pt_y2.set( pt_out_pos_y2 ) # this runs in parallel for t in (pt_x1_out, pt_x2_out, pt_y1_out, pt_y2_out, pt_z_out): # this waits for all of them to be done! logger.info(f"waiting for post_tube to move out") t.wait() @as_shortcut def c_backlight_in(): # safety logic for backlight in if round( coll_x.get(), 2 ) == coll_out_pos_x and round( pt_y1.get(), 2 ) == pt_out_pos_y1 and round( detector_z.get(), 2 ) > detector_in_pos: backlight.set( backlight_in ).wait() else: logger.warning( "some devices are in the way" ) @as_shortcut def ca_backlight_out(): backlight.set( backlight_out ).wait() @as_shortcut def post_tube_in(): # safety logic for backlight in if backlight.get() > 0 and detector_z.get() > 20: pt_x1_in = pt_x1.set( pt_in_pos_x1 ) # this runs in parallel pt_x2_in = pt_x2.set( pt_in_pos_x2 ) # this runs in parallel pt_y1_in = pt_y1.set( pt_in_pos_y1 ) # this runs in parallel pt_y2_in = pt_y2.set( pt_in_pos_y2 ) # this runs in parallel pt_z_in = pt_z.set( pt_in_pos_z ) # this runs in parallel for t in (pt_x1_in, pt_x2_in, pt_y1_in, pt_y2_in, pt_z_in): # this waits for all of them to be done! t.wait() else: logger.warning( "devices are in the way" ) @as_shortcut def post_tube_out(): # safety logic for post-tube out if detector_z.get() > 20: pt_x1_out = pt_x1.set( pt_out_pos_x1 ) # this runs in parallel pt_x2_out = pt_x2.set( pt_out_pos_x2 ) # this runs in parallel pt_y1_out = pt_y1.set( pt_out_pos_y1 ) # this runs in parallel pt_y2_out = pt_y2.set( pt_out_pos_y2 ) # this runs in parallel pt_z_out = pt_z.set( pt_out_pos_z ) # this runs in parallel for t in (pt_x1_out, pt_x2_out, pt_y1_out, pt_y2_out, pt_z_out): # this waits for all of them to be done! t.wait() else: logger.warning( "detector needs to move" ) @as_shortcut def coll_in(): cx_in = coll_x.set( coll_in_pos_x ) # this runs in parallel cy_in = coll_y.set( coll_in_pos_y ) # this runs in parallel for t in (cx_in, cy_in): # this waits for all of them to be done! logger.info(f"waiting for collimator to move in") t.wait() @as_shortcut def coll_out(): cx_out = coll_x.set( coll_out_pos_x ) # this runs in parallel cy_out = coll_y.set( coll_out_pos_y ) # this runs in parallel for t in (cx_out, cy_out): # this waits for all of them to be done! logger.info(f"waiting for collimator to move out") t.wait() @as_shortcut def detector_in(): # safety logic for detector in if backlight.get() > 0 and pt_y1.get() > 8: logger.info(f"waiting for detector to move in") detector_z.set( detector_in_pos ).wait() # this runs in parallel else: logger.warning( "devices are in the way" ) @as_shortcut def detector_out(): logger.info(f"waiting for detector to move out") detector_z.set( detector_out_pos ).wait() # this runs in parallel