320 lines
9.8 KiB
Python
Executable File
320 lines
9.8 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import sys
|
|
import os
|
|
from loguru import logger
|
|
|
|
# at the moment this allows us to group the subdirectories as modules easily
|
|
# TODO: a more general way would be to have this cristallina as a installed package
|
|
sys.path.insert(0, os.path.expanduser("/sf/cristallina/applications/slic/cristallina"))
|
|
|
|
def setup_general_logging():
|
|
"""Setup logging to console and files in both the snapshots and
|
|
the respective pgroup.
|
|
"""
|
|
|
|
logger.remove()
|
|
logger.add(
|
|
sys.stderr,
|
|
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
|
|
level="INFO",
|
|
)
|
|
logger.info("Loading started.")
|
|
|
|
# create file handler which logs
|
|
try:
|
|
logger.add(
|
|
"/sf/cristallina/applications/beamline/snapshots/slic_logs/cristallina_mx.log",
|
|
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
|
|
level="DEBUG",
|
|
rotation="1 week",
|
|
)
|
|
logger.info("Logging to snapshots.")
|
|
except PermissionError as e:
|
|
logger.warning("Cannot write log file to snapshots.")
|
|
logger.warning(e)
|
|
|
|
|
|
def setup_logging_pgroup(pgroup, level="INFO"):
|
|
try:
|
|
logger.add(
|
|
f"/sf/cristallina/data/{pgroup}/scratch/slic.log",
|
|
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
|
|
level=level,
|
|
rotation="1 week",
|
|
)
|
|
logger.info(f"Logging to pgroup {pgroup}.")
|
|
except PermissionError as e:
|
|
logger.warning(f"Cannot write log file to pgroup {pgroup}.")
|
|
|
|
|
|
# We setup the logging before going further so that
|
|
# other modules can write startup messages into the log file.
|
|
setup_general_logging()
|
|
|
|
from slic.gui import GUI
|
|
from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable
|
|
from slic.core.acquisition import SFAcquisition, PVAcquisition
|
|
from slic.core.condition import PVCondition
|
|
from slic.core.scanner import Scanner
|
|
|
|
|
|
from slic.devices.general.motor import Motor
|
|
|
|
from slic.utils import devices, Marker, as_shortcut, snapshot
|
|
from slic.utils import Channels, Config, Elog, Screenshot, PV
|
|
from slic.core.acquisition.fakeacquisition import FakeAcquisition
|
|
|
|
|
|
print( os.getcwd() )
|
|
|
|
from channels.bs_channels import (
|
|
detectors_MX,
|
|
bs_channels,
|
|
camera_channels,
|
|
)
|
|
|
|
from channels.pv_channels import pv_channels
|
|
|
|
|
|
################# DEVICES #################
|
|
dummy = DummyAdjustable(units="au")
|
|
|
|
from devices.knife_edge import KnifeEdge
|
|
# from devices.standa import standa
|
|
# from devices.newport import newport
|
|
|
|
from beamline.components import (
|
|
upstream_attenuator,
|
|
attenuator,
|
|
pp_shutter,
|
|
pulsepicker,
|
|
alignment_laser,
|
|
pbps113,
|
|
pbps149,
|
|
)
|
|
|
|
from systems.components import cta
|
|
|
|
# MX adajustables
|
|
from slic.core.device.simpledevice import SimpleDevice
|
|
from slic.devices.general.motor import Motor
|
|
|
|
# hve positioners
|
|
hve_mot_v = Motor("SARES30-MOBI1:MOT_1")
|
|
hve_mot_h1 = Motor("SARES30-MOBI1:MOT_2")
|
|
hve_mot_h2 = Motor("SARES30-MOBI1:MOT_3")
|
|
|
|
# collimator
|
|
coll_x = Motor("SARES30-SMX:MCS1")
|
|
coll_y = Motor("SARES30-SMX:MCS2")
|
|
|
|
# post-tube
|
|
pt_x1 = Motor("SARES30-SMX:MCS4")
|
|
pt_x2 = Motor("SARES30-SMX:MCS5")
|
|
pt_y1 = Motor("SARES30-SMX:MCS6")
|
|
pt_y2 = Motor("SARES30-SMX:MCS7")
|
|
pt_z = Motor("SARES30-SMX:MCS8")
|
|
|
|
# post-tube
|
|
detector_z = Motor("SAR-EXPMX:MOT_DET_Z")
|
|
|
|
# post-tube
|
|
backlight = Motor("SAR-EXPMX:MOT_BLGT")
|
|
|
|
# fast stage
|
|
mx_fast_x = Motor("SAR-EXPMX:MOT_FX")
|
|
mx_fast_y = Motor("SAR-EXPMX:MOT_FY")
|
|
|
|
|
|
################# DAQ Setup #################
|
|
instrument = "cristallina"
|
|
#pgroup = "p21734"
|
|
pgroup = "p22215"
|
|
|
|
# setup pgroup specific logger
|
|
setup_logging_pgroup(pgroup)
|
|
|
|
# Moved from main cristallina.py here temporarily
|
|
mxdaq = SFAcquisition(
|
|
instrument,
|
|
pgroup,
|
|
default_channels=bs_channels,
|
|
default_pvs=pv_channels,
|
|
default_detectors=detectors_MX,
|
|
rate_multiplicator=1,
|
|
append_user_tag_to_data_dir=True
|
|
)
|
|
|
|
# There is a new EPICS buffer, so the archiver is no longer used. This makes sure we are taking PVs from the right place.
|
|
try:
|
|
mxdaq.update_config_pvs()
|
|
except Exception as e:
|
|
logger.warning(f"error updating config pvs for mxdaq: {e}")
|
|
|
|
check_intensity_gas_monitor = PVCondition(
|
|
"SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US",
|
|
vmin=4,
|
|
vmax=2000,
|
|
wait_time=0.5,
|
|
required_fraction=0.8,
|
|
)
|
|
|
|
mxscan = Scanner(default_acquisitions=[mxdaq], condition=check_intensity_gas_monitor)
|
|
|
|
mxgui = GUI(mxscan, show_goto=True, show_spec=False, show_scan=True, show_scan2D=False, show_run=False, show_static=False, show_sfx=True, start_tab="sfx")
|
|
|
|
#############################################
|
|
################# DAQ Setup #################
|
|
|
|
############## in positions ##############
|
|
coll_in_pos_x, coll_in_pos_y = 9.50, 1.38
|
|
backlight_in = -30000
|
|
detector_in_pos = 116
|
|
pt_in_pos_x1 = -4.301
|
|
pt_in_pos_x2 = -4.501
|
|
pt_in_pos_y1 = 10.996
|
|
pt_in_pos_y2 = 10.664
|
|
pt_in_pos_z = 0.5
|
|
|
|
|
|
############## out positions ##############
|
|
coll_out_pos_x, coll_out_pos_y = -12, 1.38
|
|
backlight_out = 1000
|
|
detector_out_pos = 220
|
|
pt_out_pos_x1 = 4.8
|
|
pt_out_pos_x2 = 4.6
|
|
pt_out_pos_y1 = -12.8
|
|
pt_out_pos_y2 = -13.1
|
|
pt_out_pos_z = -8
|
|
|
|
@as_shortcut
|
|
def a_data_collection():
|
|
|
|
# move backlight up
|
|
backlight.set( backlight_out ).wait()
|
|
|
|
# post-tube in
|
|
|
|
pt_x1_in = pt_x1.set( pt_in_pos_x1 ) # this runs in parallel
|
|
pt_x2_in = pt_x2.set( pt_in_pos_x2 ) # this runs in parallel
|
|
pt_y1_in = pt_y1.set( pt_in_pos_y1 ) # this runs in parallel
|
|
pt_y2_in = pt_y2.set( pt_in_pos_y2 ) # this runs in parallel
|
|
|
|
for t in (pt_x1_in, pt_x2_in, pt_y1_in, pt_y2_in): # this waits for all of them to be done!
|
|
logger.info(f"waiting for post-tube to move in")
|
|
t.wait()
|
|
|
|
pt_z_in = pt_z.set( pt_in_pos_z ).wait() # this no longer runs in parallel
|
|
|
|
# collimator in
|
|
cx_in = coll_x.set( coll_in_pos_x ) # this runs in parallel
|
|
cy_in = coll_y.set( coll_in_pos_y ) # this runs in parallel
|
|
for t in (cx_in, cy_in): # this waits for all of them to be done!
|
|
logger.info(f"waiting for collimator to move in")
|
|
t.wait()
|
|
|
|
# detector in
|
|
logger.info(f"waiting for detector to move in")
|
|
detector_z.set( detector_in_pos ).wait() # this runs in parallel
|
|
|
|
@as_shortcut
|
|
def b_sample_alignment():
|
|
|
|
# detector out
|
|
logger.info(f"waiting for detector to move out")
|
|
detector_z.set( detector_out_pos ).wait()
|
|
|
|
# collimator out
|
|
cx_out = coll_x.set( coll_out_pos_x ) # this runs in parallel
|
|
cy_out = coll_y.set( coll_out_pos_y ) # this runs in parallel
|
|
for t in (cx_out, cy_out): # this waits for all of them to be done!
|
|
logger.info(f"waiting for collimator to move out")
|
|
t.wait()
|
|
|
|
# post-tube out
|
|
pt_z_out = pt_z.set( pt_out_pos_z ).wait() # this runs in parallel
|
|
pt_x1_out = pt_x1.set( pt_out_pos_x1 ) # this runs in parallel
|
|
pt_x2_out = pt_x2.set( pt_out_pos_x2 ) # this runs in parallel
|
|
pt_y1_out = pt_y1.set( pt_out_pos_y1 ) # this runs in parallel
|
|
pt_y2_out = pt_y2.set( pt_out_pos_y2 ) # this runs in parallel
|
|
for t in (pt_x1_out, pt_x2_out, pt_y1_out, pt_y2_out, pt_z_out): # this waits for all of them to be done!
|
|
logger.info(f"waiting for post_tube to move out")
|
|
t.wait()
|
|
|
|
@as_shortcut
|
|
def c_backlight_in():
|
|
|
|
# safety logic for backlight in
|
|
if round( coll_x.get(), 2 ) == coll_out_pos_x and round( pt_y1.get(), 2 ) == pt_out_pos_y1 and round( detector_z.get(), 2 ) > detector_in_pos:
|
|
backlight.set( backlight_in ).wait()
|
|
else:
|
|
logger.warning( "some devices are in the way" )
|
|
|
|
@as_shortcut
|
|
def ca_backlight_out():
|
|
backlight.set( backlight_out ).wait()
|
|
|
|
@as_shortcut
|
|
def post_tube_in():
|
|
|
|
# safety logic for backlight in
|
|
if backlight.get() > 0 and detector_z.get() > 20:
|
|
pt_x1_in = pt_x1.set( pt_in_pos_x1 ) # this runs in parallel
|
|
pt_x2_in = pt_x2.set( pt_in_pos_x2 ) # this runs in parallel
|
|
pt_y1_in = pt_y1.set( pt_in_pos_y1 ) # this runs in parallel
|
|
pt_y2_in = pt_y2.set( pt_in_pos_y2 ) # this runs in parallel
|
|
pt_z_in = pt_z.set( pt_in_pos_z ) # this runs in parallel
|
|
for t in (pt_x1_in, pt_x2_in, pt_y1_in, pt_y2_in, pt_z_in): # this waits for all of them to be done!
|
|
t.wait()
|
|
else:
|
|
logger.warning( "devices are in the way" )
|
|
|
|
@as_shortcut
|
|
def post_tube_out():
|
|
|
|
# safety logic for post-tube out
|
|
if detector_z.get() > 20:
|
|
pt_x1_out = pt_x1.set( pt_out_pos_x1 ) # this runs in parallel
|
|
pt_x2_out = pt_x2.set( pt_out_pos_x2 ) # this runs in parallel
|
|
pt_y1_out = pt_y1.set( pt_out_pos_y1 ) # this runs in parallel
|
|
pt_y2_out = pt_y2.set( pt_out_pos_y2 ) # this runs in parallel
|
|
pt_z_out = pt_z.set( pt_out_pos_z ) # this runs in parallel
|
|
for t in (pt_x1_out, pt_x2_out, pt_y1_out, pt_y2_out, pt_z_out): # this waits for all of them to be done!
|
|
t.wait()
|
|
else:
|
|
logger.warning( "detector needs to move" )
|
|
|
|
@as_shortcut
|
|
def coll_in():
|
|
cx_in = coll_x.set( coll_in_pos_x ) # this runs in parallel
|
|
cy_in = coll_y.set( coll_in_pos_y ) # this runs in parallel
|
|
for t in (cx_in, cy_in): # this waits for all of them to be done!
|
|
logger.info(f"waiting for collimator to move in")
|
|
t.wait()
|
|
|
|
@as_shortcut
|
|
def coll_out():
|
|
cx_out = coll_x.set( coll_out_pos_x ) # this runs in parallel
|
|
cy_out = coll_y.set( coll_out_pos_y ) # this runs in parallel
|
|
for t in (cx_out, cy_out): # this waits for all of them to be done!
|
|
logger.info(f"waiting for collimator to move out")
|
|
t.wait()
|
|
|
|
@as_shortcut
|
|
def detector_in():
|
|
|
|
# safety logic for detector in
|
|
if backlight.get() > 0 and pt_y1.get() > 8:
|
|
logger.info(f"waiting for detector to move in")
|
|
detector_z.set( detector_in_pos ).wait() # this runs in parallel
|
|
else:
|
|
logger.warning( "devices are in the way" )
|
|
|
|
@as_shortcut
|
|
def detector_out():
|
|
logger.info(f"waiting for detector to move out")
|
|
detector_z.set( detector_out_pos ).wait() # this runs in parallel
|
|
|
|
|
|
|