Files
cristallina/mx/mx_experiment.py

320 lines
9.8 KiB
Python
Executable File

#!/usr/bin/env python
import sys
import os
from loguru import logger
# At the moment this allows us to import the subdirectories as modules easily.
# TODO: a more general way would be to have cristallina as an installed package.
sys.path.insert(0, os.path.expanduser("/sf/cristallina/applications/slic/cristallina"))
def setup_general_logging():
    """Set up logging to the console and to the shared snapshots log file.

    Existing loguru handlers are removed first. Messages of level INFO and
    above then go to stderr, and messages of level DEBUG and above go to a
    log file in the beamline snapshots directory (rotation "1 week").

    If adding the snapshots log file raises a PermissionError, a warning
    with the error is logged and the function returns normally.
    """
    line_format = "{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}"
    snapshots_log = "/sf/cristallina/applications/beamline/snapshots/slic_logs/cristallina_mx.log"

    # start from a clean slate, then attach the console sink
    logger.remove()
    logger.add(sys.stderr, format=line_format, level="INFO")
    logger.info("Loading started.")

    # file sink in the snapshots directory; not every user may write there
    try:
        logger.add(
            snapshots_log,
            format=line_format,
            level="DEBUG",
            rotation="1 week",
        )
        logger.info("Logging to snapshots.")
    except PermissionError as err:
        logger.warning("Cannot write log file to snapshots.")
        logger.warning(err)
def setup_logging_pgroup(pgroup, level="INFO"):
    """Add a log file in the scratch directory of the given pgroup.

    Args:
        pgroup: Name of the pgroup (e.g. "p22215"). The log file is
            /sf/cristallina/data/<pgroup>/scratch/slic.log.
        level: Minimum level written to this file. Defaults to "INFO".

    If adding the log file raises a PermissionError, a warning and the
    error itself are logged and the function returns normally.
    """
    try:
        logger.add(
            f"/sf/cristallina/data/{pgroup}/scratch/slic.log",
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            level=level,
            rotation="1 week",
        )
        logger.info(f"Logging to pgroup {pgroup}.")
    except PermissionError as e:
        logger.warning(f"Cannot write log file to pgroup {pgroup}.")
        # Also log the reason, as setup_general_logging does; previously the
        # caught exception was silently dropped.
        logger.warning(e)
# We set up the logging before going further so that
# other modules can write startup messages into the log file.
setup_general_logging()
from slic.gui import GUI
from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable
from slic.core.acquisition import SFAcquisition, PVAcquisition
from slic.core.condition import PVCondition
from slic.core.scanner import Scanner
from slic.devices.general.motor import Motor
from slic.utils import devices, Marker, as_shortcut, snapshot
from slic.utils import Channels, Config, Elog, Screenshot, PV
from slic.core.acquisition.fakeacquisition import FakeAcquisition
# Print the current working directory to stdout while this module is loading.
# NOTE(review): looks like leftover debugging output for the imports below — confirm it is still needed.
print( os.getcwd() )
from channels.bs_channels import (
detectors_MX,
bs_channels,
camera_channels,
)
from channels.pv_channels import pv_channels
################# DEVICES #################
# Dummy adjustable with units "au".
# NOTE(review): presumably a stand-in for testing scans without real hardware — confirm.
dummy = DummyAdjustable(units="au")
from devices.knife_edge import KnifeEdge
# from devices.standa import standa
# from devices.newport import newport
from beamline.components import (
upstream_attenuator,
attenuator,
pp_shutter,
pulsepicker,
alignment_laser,
pbps113,
pbps149,
)
from systems.components import cta
# MX adjustables
from slic.core.device.simpledevice import SimpleDevice
from slic.devices.general.motor import Motor
# Motors of the MX setup. Each Motor is created from an identifier string
# (presumably the EPICS PV name of the motor record — confirm).
# The shortcut functions further down move these motors.

# hve positioners
hve_mot_v = Motor("SARES30-MOBI1:MOT_1")
hve_mot_h1 = Motor("SARES30-MOBI1:MOT_2")
hve_mot_h2 = Motor("SARES30-MOBI1:MOT_3")
# collimator
coll_x = Motor("SARES30-SMX:MCS1")
coll_y = Motor("SARES30-SMX:MCS2")
# post-tube
pt_x1 = Motor("SARES30-SMX:MCS4")
pt_x2 = Motor("SARES30-SMX:MCS5")
pt_y1 = Motor("SARES30-SMX:MCS6")
pt_y2 = Motor("SARES30-SMX:MCS7")
pt_z = Motor("SARES30-SMX:MCS8")
# detector
detector_z = Motor("SAR-EXPMX:MOT_DET_Z")
# backlight
backlight = Motor("SAR-EXPMX:MOT_BLGT")
# fast stage
mx_fast_x = Motor("SAR-EXPMX:MOT_FX")
mx_fast_y = Motor("SAR-EXPMX:MOT_FY")
################# DAQ Setup #################
instrument = "cristallina"
#pgroup = "p21734"
pgroup = "p22215"

# attach the pgroup-specific log file
setup_logging_pgroup(pgroup)

# Moved from main cristallina.py here temporarily
# Acquisition for the MX setup; the channel, PV and detector lists imported
# above are passed as its defaults.
mxdaq = SFAcquisition(
    instrument,
    pgroup,
    default_channels=bs_channels,
    default_pvs=pv_channels,
    default_detectors=detectors_MX,
    rate_multiplicator=1,
    append_user_tag_to_data_dir=True,
)

# There is a new EPICS buffer, so the archiver is no longer used.
# This makes sure we are taking PVs from the right place.
# A failure here is only logged as a warning; loading continues.
try:
    mxdaq.update_config_pvs()
except Exception as err:
    logger.warning(f"error updating config pvs for mxdaq: {err}")

# Condition on the gas monitor pulse energy PV, handed to the scanner below.
check_intensity_gas_monitor = PVCondition(
    "SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US",
    vmin=4,
    vmax=2000,
    wait_time=0.5,
    required_fraction=0.8,
)

mxscan = Scanner(
    default_acquisitions=[mxdaq],
    condition=check_intensity_gas_monitor,
)

# GUI on top of the scanner; it starts on the "sfx" tab.
mxgui = GUI(
    mxscan,
    show_goto=True,
    show_spec=False,
    show_scan=True,
    show_scan2D=False,
    show_run=False,
    show_static=False,
    show_sfx=True,
    start_tab="sfx",
)
#############################################
################# DAQ Setup #################
############## in positions ##############
# Target values for the "in" moves of the shortcut functions below.
# NOTE(review): units are not stated in this file; presumably the user units of each motor — confirm.
coll_in_pos_x, coll_in_pos_y = 9.50, 1.38
backlight_in = -30000
# detector_in_pos is also the threshold in c_backlight_in: the detector
# readback must be larger than this value before the backlight may move in.
detector_in_pos = 116
pt_in_pos_x1 = -4.301
pt_in_pos_x2 = -4.501
pt_in_pos_y1 = 10.996
pt_in_pos_y2 = 10.664
pt_in_pos_z = 0.5
############## out positions ##############
# Target values for the "out" moves of the shortcut functions below.
# c_backlight_in compares motor readbacks rounded to two decimals for
# equality with coll_out_pos_x and pt_out_pos_y1, so these two values must
# not carry more than two decimals.
coll_out_pos_x, coll_out_pos_y = -12, 1.38
backlight_out = 1000
detector_out_pos = 220
pt_out_pos_x1 = 4.8
pt_out_pos_x2 = 4.6
pt_out_pos_y1 = -12.8
pt_out_pos_y2 = -13.1
pt_out_pos_z = -8
@as_shortcut
def a_data_collection():
    """Move the MX components into their data-collection positions.

    Steps, each finished before the next one starts:

    1. backlight to ``backlight_out``
    2. post-tube x1/x2/y1/y2 to their in positions (moves started together)
    3. post-tube z to ``pt_in_pos_z``
    4. collimator x/y to their in positions (moves started together)
    5. detector to ``detector_in_pos``

    NOTE(review): unlike ``post_tube_in`` and ``detector_in``, no readback
    check is done here; presumably the fixed order is relied on instead —
    confirm.
    """
    # move backlight up
    backlight.set(backlight_out).wait()

    # post-tube in: start the four lateral moves first, then wait for all
    post_tube_moves = (
        pt_x1.set(pt_in_pos_x1),
        pt_x2.set(pt_in_pos_x2),
        pt_y1.set(pt_in_pos_y1),
        pt_y2.set(pt_in_pos_y2),
    )
    # logged once per group instead of once per motor
    logger.info("waiting for post-tube to move in")
    for move in post_tube_moves:
        move.wait()
    # z is moved only after the lateral axes are done; this call blocks
    pt_z.set(pt_in_pos_z).wait()

    # collimator in: both axes are started before waiting
    collimator_moves = (
        coll_x.set(coll_in_pos_x),
        coll_y.set(coll_in_pos_y),
    )
    logger.info("waiting for collimator to move in")
    for move in collimator_moves:
        move.wait()

    # detector in (blocking)
    logger.info("waiting for detector to move in")
    detector_z.set(detector_in_pos).wait()
@as_shortcut
def b_sample_alignment():
    """Move the MX components into their sample-alignment positions.

    Steps, each finished before the next one starts:

    1. detector to ``detector_out_pos``
    2. collimator x/y to their out positions (moves started together)
    3. post-tube z to ``pt_out_pos_z``
    4. post-tube x1/x2/y1/y2 to their out positions (moves started together)
    """
    # detector out (blocking)
    logger.info("waiting for detector to move out")
    detector_z.set(detector_out_pos).wait()

    # collimator out: both axes are started before waiting
    collimator_moves = (
        coll_x.set(coll_out_pos_x),
        coll_y.set(coll_out_pos_y),
    )
    logger.info("waiting for collimator to move out")
    for move in collimator_moves:
        move.wait()

    # post-tube out: z goes first and is waited for right away.
    # The result of .wait() is not a task, so it must not be waited on
    # again together with the lateral moves (this used to be attempted and
    # failed once the lateral moves were done).
    logger.info("waiting for post_tube to move out")
    pt_z.set(pt_out_pos_z).wait()
    post_tube_moves = (
        pt_x1.set(pt_out_pos_x1),
        pt_x2.set(pt_out_pos_x2),
        pt_y1.set(pt_out_pos_y1),
        pt_y2.set(pt_out_pos_y2),
    )
    for move in post_tube_moves:
        move.wait()
@as_shortcut
def c_backlight_in():
    """Move the backlight to ``backlight_in`` if nothing is in the way.

    The move only happens when, with readbacks rounded to two decimals,
    the collimator x equals ``coll_out_pos_x``, the post-tube y1 equals
    ``pt_out_pos_y1`` and the detector z is larger than ``detector_in_pos``.
    Otherwise a warning is logged and nothing is moved.
    """
    # safety logic for backlight in; later readbacks are only queried if
    # the earlier checks passed
    path_is_clear = (
        round(coll_x.get(), 2) == coll_out_pos_x
        and round(pt_y1.get(), 2) == pt_out_pos_y1
        and round(detector_z.get(), 2) > detector_in_pos
    )
    if not path_is_clear:
        logger.warning("some devices are in the way")
        return
    backlight.set(backlight_in).wait()
@as_shortcut
def ca_backlight_out():
    """Move the backlight to ``backlight_out`` and wait for the move."""
    move = backlight.set(backlight_out)
    move.wait()
@as_shortcut
def post_tube_in():
    """Move all five post-tube axes to their in positions.

    The move only happens when the backlight readback is larger than 0 and
    the detector z readback is larger than 20; otherwise a warning is logged
    and nothing is moved.
    """
    # safety logic for post-tube in
    if not (backlight.get() > 0 and detector_z.get() > 20):
        logger.warning("devices are in the way")
        return

    # start all moves first so that they run in parallel ...
    moves = [
        pt_x1.set(pt_in_pos_x1),
        pt_x2.set(pt_in_pos_x2),
        pt_y1.set(pt_in_pos_y1),
        pt_y2.set(pt_in_pos_y2),
        pt_z.set(pt_in_pos_z),
    ]
    # ... then wait for every one of them
    for move in moves:
        move.wait()
@as_shortcut
def post_tube_out():
    """Move all five post-tube axes to their out positions.

    The move only happens when the detector z readback is larger than 20;
    otherwise a warning is logged and nothing is moved.
    """
    # safety logic for post-tube out
    if not detector_z.get() > 20:
        logger.warning("detector needs to move")
        return

    # start all moves first so that they run in parallel ...
    moves = [
        pt_x1.set(pt_out_pos_x1),
        pt_x2.set(pt_out_pos_x2),
        pt_y1.set(pt_out_pos_y1),
        pt_y2.set(pt_out_pos_y2),
        pt_z.set(pt_out_pos_z),
    ]
    # ... then wait for every one of them
    for move in moves:
        move.wait()
@as_shortcut
def coll_in():
    """Move both collimator axes to their in positions and wait for them."""
    # both axes are started before waiting, so they move in parallel
    moves = (
        coll_x.set(coll_in_pos_x),
        coll_y.set(coll_in_pos_y),
    )
    # logged once for the group instead of once per axis
    logger.info("waiting for collimator to move in")
    for move in moves:
        move.wait()
@as_shortcut
def coll_out():
    """Move both collimator axes to their out positions and wait for them."""
    # both axes are started before waiting, so they move in parallel
    moves = (
        coll_x.set(coll_out_pos_x),
        coll_y.set(coll_out_pos_y),
    )
    # logged once for the group instead of once per axis
    logger.info("waiting for collimator to move out")
    for move in moves:
        move.wait()
@as_shortcut
def detector_in():
    """Move the detector to ``detector_in_pos`` and wait for the move.

    The move only happens when the backlight readback is larger than 0 and
    the post-tube y1 readback is larger than 8; otherwise a warning is
    logged and nothing is moved.
    """
    # safety logic for detector in
    if not (backlight.get() > 0 and pt_y1.get() > 8):
        logger.warning("devices are in the way")
        return

    logger.info("waiting for detector to move in")
    move = detector_z.set(detector_in_pos)
    move.wait()  # blocks until the move is done
@as_shortcut
def detector_out():
    """Move the detector to ``detector_out_pos`` and wait for the move."""
    logger.info("waiting for detector to move out")
    move = detector_z.set(detector_out_pos)
    move.wait()  # blocks until the move is done