325 lines
9.2 KiB
Python
Executable File
325 lines
9.2 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import sys
|
|
import os
|
|
from loguru import logger
|
|
|
|
# at the moment this allows us to group the subdirectories as modules easily
|
|
# TODO: a more general way would be to have this cristallina as an installed package
|
|
os.chdir("/sf/cristallina/applications/slic/cristallina")
|
|
|
|
|
|
def setup_general_logging():
    """Configure the loguru sinks for the console and the snapshots log file.

    All handlers registered so far are removed. Then an INFO-level sink on
    ``sys.stderr`` is added, followed by a DEBUG-level file sink (rotation
    "1 week") in the beamline snapshots directory. If adding the file sink
    raises ``PermissionError``, two warnings are logged (a fixed message and
    the error itself) and the function returns normally.

    Logging into a pgroup directory is done by ``setup_logging_pgroup``.
    """
    line_format = "{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}"
    snapshots_logfile = (
        "/sf/cristallina/applications/beamline/snapshots/slic_logs/cristallina.log"
    )

    logger.remove()
    logger.add(sys.stderr, format=line_format, level="INFO")
    logger.info("Loading started.")

    # Adding the file sink may fail with PermissionError; this is reported
    # as a warning instead of aborting the startup.
    try:
        logger.add(
            snapshots_logfile,
            format=line_format,
            level="DEBUG",
            rotation="1 week",
        )
        logger.info("Logging to snapshots.")
    except PermissionError as error:
        logger.warning("Cannot write log file to snapshots.")
        logger.warning(error)
|
|
|
|
|
|
def setup_logging_pgroup(pgroup, level="INFO"):
    """Add a log file sink in the scratch directory of the given pgroup.

    Args:
        pgroup: Name of the pgroup; used to build the log file path
            ``/sf/cristallina/data/{pgroup}/scratch/slic.log``.
        level: Minimum level of the messages written to this file.

    If adding the sink raises ``PermissionError``, a warning and the error
    itself are logged and the function returns normally.
    """
    try:
        logger.add(
            f"/sf/cristallina/data/{pgroup}/scratch/slic.log",
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            level=level,
            rotation="1 week",
        )
        logger.info(f"Logging to pgroup {pgroup}.")
    except PermissionError as e:
        logger.warning(f"Cannot write log file to pgroup {pgroup}.")
        # Also report the underlying error so the cause is not lost.
        logger.warning(e)
|
|
|
|
|
|
# We setup the logging before going further so that
|
|
# other modules can write startup messages into the log file.
|
|
setup_general_logging()
|
|
|
|
from beamline.alignment_laser import AlignmentLaser
|
|
|
|
from slic.gui import GUI
|
|
from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable
|
|
from slic.core.acquisition import SFAcquisition, PVAcquisition
|
|
from slic.core.condition import PVCondition
|
|
from slic.core.scanner import Scanner
|
|
|
|
|
|
from slic.devices.general.motor import Motor
|
|
|
|
from slic.utils import devices, Marker, as_shortcut, snapshot
|
|
from slic.utils import Channels, Config, Elog, Screenshot, PV
|
|
from slic.core.acquisition.fakeacquisition import FakeAcquisition
|
|
|
|
|
|
from channels.bs_channels import (
|
|
detectors,
|
|
detectors_MX,
|
|
bs_channels,
|
|
camera_channels,
|
|
)
|
|
|
|
from channels.pv_channels import pv_channels
|
|
from spreadsheet import overview
|
|
|
|
from channels.bs_channels import bs_channels_bernina_DCM
|
|
from channels.pv_channels import pv_channels_bernina_DCM
|
|
|
|
from channels.bs_channels import bs_channels_cristallina_beamline
|
|
from channels.pv_channels import pv_channels_cristallina_beamline
|
|
|
|
|
|
# from channels_minimal import detectors_min, channels_min, pvs_min
|
|
|
|
|
|
################# DEVICES #################
|
|
dummy = DummyAdjustable(units="au")
|
|
dummy2 = DummyAdjustable(ID='DUMMY2', name='Dummy2', units="au")
|
|
|
|
|
|
# from devices.knife_edge import KnifeEdge
|
|
# from devices.standa import standa
|
|
# from devices.newport import newport
|
|
|
|
|
|
from beamline.components import (
|
|
upstream_attenuator,
|
|
attenuator,
|
|
pp_shutter,
|
|
pulsepicker,
|
|
alignment_laser,
|
|
pbps113,
|
|
slits107,slits149,slitsi0,
|
|
mono,
|
|
m3,
|
|
i0,
|
|
)
|
|
|
|
from beamline.components import kbHor, kbVer
|
|
|
|
from systems.components import cta
|
|
|
|
from gp_exp.components import Newport_large, OWIS, slitsjj
|
|
|
|
|
|
|
|
|
|
# Undulators
from beamline import undulator

undulators = undulator.Undulators()

logger.info(
    f"Using undulator (Aramis) offset to PSSS energy of {undulator.energy_offset} eV."
)

# Photon energy (undulators & mono)
from beamline import photon_energy

cr_photon_energy = photon_energy.PhotonEnergy()

logger.info(
    f"Photon energy offsets: PSSS {photon_energy.energy_offset_PSSS} eV , "
    f"DCCM {photon_energy.energy_offset_DCCM} eV, "
    f"undulator {photon_energy.energy_offset_undulators} eV."
)
|
|
|
|
|
|
|
|
|
|
# Pulse picker shutter
|
|
#pp_shutter = PP_Shutter(
|
|
# "SARES30-LTIM01-EVR0:RearUniv0-Ena-SP", name="Cristallina pulse picker shutter"
|
|
#) # Shutter button when using the pulse picker
|
|
|
|
## Slits
|
|
from slic.devices.xoptics import slits
|
|
|
|
## Smaract & attocube stages
|
|
from gp_exp.smaract_device_def import smaract_Juraj, smaract_mini_XYZ
|
|
# from gp_exp.attocube_device_def import attocube # disabled because of issues with Adjustables
|
|
from gp_exp.jj_device_def import jjslits
|
|
|
|
# Bernina monochromator
|
|
from beamline.bernina_mono import BerninaMono
|
|
|
|
BerninaDCM = BerninaMono("SAROP21-ODCM098")
|
|
|
|
# Diffractometer
|
|
from crq_exp.diffractometer import Diffractometer
|
|
dm1 = Diffractometer("SARES31-GPS")
|
|
dm2 = Diffractometer("SARES32-GPS")
|
|
|
|
# Set according to which diffractometer is being used
|
|
diffractometer = dm1
|
|
|
|
# Dilution fridge
# NOTE: the import below is currently commented out, so ``Dilution`` is not
# defined in this file. Re-enable it to actually connect to the fridge.
#from crq_exp.dilsc import Dilution

try:
    dilution = Dilution()
except NameError:
    # Dilution was never imported (see above): report the disabled state
    # instead of a misleading connection error.
    logger.warning("Dilution fridge support is disabled (Dilution is not imported).")
    dilution = None
except Exception as e:
    logger.warning(f"Error: Could not connect to dilution fridge. {e}")
    dilution = None
|
|
|
|
|
|
# MX adjustables
|
|
# import mx.mx_adjustables
|
|
|
|
# Temporary quick hack thermometer addition for stand
|
|
#TODO: make in a thermometer class ...
|
|
T_reg = PVAdjustable('SARES31-DIL-LS1:A_KELVIN')
|
|
T_plato = PVAdjustable('SARES31-DIL-LS1:7_KELVIN')
|
|
T_chip = PVAdjustable('SARES31-DIL-LS1:8_KELVIN')
|
|
|
|
|
|
################# Stand setup ##################
|
|
|
|
# TODO: requires the stand client, need small howto how to start and configure or let it run all the time
|
|
from slic.core.acquisition.spreadsheet import Spreadsheet
|
|
from stand.time import Time
|
|
|
|
|
|
# Entries handed to the Spreadsheet below, keyed by name.
adjs_for_spreadsheet = {
    # NOTE(review): this stores the string built once while this script
    # loads, not the Time object itself -- confirm that this is intended.
    "Time": str(Time()),
    # Thermometer PVs
    "T_reg": T_reg,
    "T_plato": T_plato,
    "T_chip": T_chip,
    # Attenuators
    "Transmission": attenuator.trans1st,
    "Upstream Transmission": upstream_attenuator.trans1st,
    # Photon energy
    "Energy_setpoint": cr_photon_energy,
    "Energy_offset undulator": photon_energy.energy_offset_undulators,
    # Currently selected diffractometer
    "TD": diffractometer.td,
    "TRX": diffractometer.tr_x,
    "TRY": diffractometer.tr_y,
    "TRXBASE": diffractometer.trx_base,
    "TRYBASE": diffractometer.try_base,
    "THETA": diffractometer.theta,
    "TWOTHETA": diffractometer.twotheta,
}

if dilution is not None:
    # These entries reuse the thermometer PVs from above; the attributes of
    # the dilution object itself are currently disabled:
    #   "Magnet_X": dilution.x,
    #   "Magnet_Y": dilution.y,
    #   "Magnet_Z": dilution.z,
    #   "DilSc_T_chip": dilution.T_chip,
    adjs_dilsc = {
        "DilSc_T_chip": T_chip,
        "DilSc_T_plato": T_plato,
        "DilSc_T_pucksensor": T_reg,
    }
    adjs_for_spreadsheet.update(adjs_dilsc)
|
|
|
|
|
|
# temporary mono
|
|
from exp_temp.mono import Mono_rotation
|
|
|
|
|
|
# Set up the spreadsheet for transmission to the stand server.
# Host and port are defined once, before the try block, so that they are
# available both to the Spreadsheet and to the warning in the except branch
# (even when the import of the stand client itself fails).
stand_host = "saresc-vcons-02.psi.ch"
stand_port = 9090

spreadsheet = Spreadsheet(
    adjs_for_spreadsheet,
    placeholders=("comment", "sample", "run_usable"),
    host=stand_host,
    port=stand_port,
)

# Query the stand server once; if that fails the spreadsheet is disabled.
try:
    from stand.client import Client

    stand_client = Client(host=stand_host, port=stand_port)
    response = stand_client.get()
    logger.info("Connected to stand server")
except Exception as error:
    # Catching exceptions with a broad net because different connection errors can occur.
    logger.warning(f"Cannot connect to stand server on {stand_host}. Disabling spreadsheet.")
    logger.warning(error)
    spreadsheet = None
|
|
|
|
# spreadsheet = Spreadsheet(adjs, placeholders=PLACEHOLDERS, host="127.0.0.1", port=8080))
|
|
|
|
################# DAQ Setup #################
|
|
instrument = "cristallina"
|
|
# experiment_type = "MX" # "MX" or "Q" for the different setups and detector configurations
|
|
experiment_type = "Q" # "MX" or "Q" for the different setups and detector configurations
|
|
|
|
from pgroups import pgroup, pgroup_scratch
|
|
|
|
# setup pgroup specific logger
|
|
setup_logging_pgroup(pgroup)
|
|
|
|
logger.info(f"Using Cristallina{experiment_type} setup for detectors.")
|
|
|
|
# Default detectors for each supported experiment type.
_detectors_by_experiment_type = {
    "MX": detectors_MX,
    "Q": detectors,
}

# Stop right away when the configured experiment type is unknown.
if experiment_type not in _detectors_by_experiment_type:
    logger.error(f"Experiment type {experiment_type} not supported. Exiting.")
    sys.exit(1)

# Apart from the default detectors the acquisition is set up identically
# for all experiment types.
daq = SFAcquisition(
    instrument,
    pgroup,
    default_channels=bs_channels,
    default_pvs=pv_channels,
    default_detectors=_detectors_by_experiment_type[experiment_type],
    rate_multiplicator=1,
    spreadsheet=spreadsheet,
)
|
|
|
|
# default_channels=bs_channels,
|
|
# default_pvs=pv_channels,
|
|
|
|
|
|
# There is a new EPICS buffer, so the archiver is no longer used. This makes sure we are taking PVs from the right place.
# The call is made exactly once and only inside the guard: a failure is
# reported through the logger and the startup continues.
try:
    daq.update_config_pvs()
except Exception as e:
    logger.warning(f"Could not update the config PVs: {e}")
|
|
|
|
from acquisition import multiple_daqs
|
|
|
|
DAQS = multiple_daqs.generate_DAQS(instrument, pgroup, bs_channels, pv_channels, detectors, spreadsheet)
|
|
|
|
# daq_fake = FakeAcquisition(instrument, pgroup)
|
|
# daq_PV_only = PVAcquisition(instrument, pgroup, default_channels=channels_ks) # workaround for KS not going to DB
|
|
|
|
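# Condition to run the scan only when the gas monitor value is larger than vmin=100 uJ (and smaller than vmax=2000 uJ).
# Note: it is not applied by default, the Scanner below is created with condition=None.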
|
|
# required fraction defines the amount of data recorded to save the step and move on to the next one
|
|
check_intensity_gas_monitor = PVCondition(
|
|
"SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US",
|
|
vmin=100,
|
|
vmax=2000,
|
|
wait_time=0.5,
|
|
required_fraction=0.8,
|
|
)
|
|
|
|
scan = Scanner(default_acquisitions=[daq], condition=None)
|
|
gui = GUI(scan, show_goto=True, show_spec=True)
|
|
|
|
from exp_temp import SAT
|
|
|
|
|
|
logger.info(f"Running at {instrument} with pgroup {pgroup}. Experiment type: {experiment_type}.")
|
|
logger.info("Loading finished.")
|