Files
cristallina/cristallina.py
2025-11-25 15:32:52 +01:00

413 lines
13 KiB
Python
Executable File

#!/usr/bin/env python
import sys
import os
from loguru import logger
# At the moment this allows us to group the subdirectories as modules easily.
# TODO: a more general way would be to have cristallina as an installed package.
os.chdir("/sf/cristallina/applications/slic/cristallina")
def setup_general_logging():
    """Configure the console log sink and the shared snapshots log file.

    Every handler registered so far is removed first. Messages of level
    INFO and above then go to stderr, and messages of level DEBUG and above
    go to a weekly rotated file in the beamline snapshots directory.
    If that file cannot be opened because of missing permissions, a warning
    is logged and the function returns with only the console sink added.
    """
    log_format = "{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}"
    snapshots_log = "/sf/cristallina/applications/beamline/snapshots/slic_logs/cristallina.log"

    # Drop every handler registered so far before adding our own sinks.
    logger.remove()
    logger.add(sys.stderr, format=log_format, level="INFO")
    logger.info("Loading started.")

    # File sink: best effort, a missing write permission is only reported.
    try:
        logger.add(snapshots_log, format=log_format, level="DEBUG", rotation="1 week")
        logger.info("Logging to snapshots.")
    except PermissionError as err:
        logger.warning("Cannot write log file to snapshots.")
        logger.warning(err)
def setup_logging_pgroup(pgroup, level="INFO"):
    """Add a log file sink inside the result directory of a pgroup.

    Args:
        pgroup: Name of the pgroup; the log is written to
            ``/sf/cristallina/data/<pgroup>/res/slic.log``.
        level: Minimum level of the messages written to that file.

    A ``PermissionError`` raised while opening the file is not fatal: a
    warning followed by the error itself is logged and the function
    returns normally.
    """
    try:
        logger.add(
            f"/sf/cristallina/data/{pgroup}/res/slic.log",
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            level=level,
            rotation="1 week",
        )
        logger.info(f"Logging to pgroup {pgroup}.")
    except PermissionError as e:
        logger.warning(f"Cannot write log file to pgroup {pgroup}.")
        # Report the underlying error as well, as setup_general_logging
        # does for the snapshots log; otherwise the cause is lost.
        logger.warning(e)
# We setup the logging before going further so that
# other modules can write startup messages into the log file.
setup_general_logging()
from beamline.alignment_laser import AlignmentLaser
from slic.gui import GUI
from slic.core.adjustable import Adjustable, PVAdjustable, DummyAdjustable
from slic.core.acquisition import SFAcquisition, PVAcquisition
from slic.core.condition import PVCondition
from slic.core.scanner import Scanner
from slic.devices.general.motor import Motor
from slic.utils import devices, Marker, as_shortcut, snapshot
from slic.utils import Channels, Config, Elog, Screenshot, PV
from slic.core.acquisition.fakeacquisition import FakeAcquisition
from channels.bs_channels import (
detectors,
detectors_MX,
bs_channels,
camera_channels,
)
from channels.pv_channels import pv_channels
from spreadsheet import overview
from channels.bs_channels import bs_channels_bernina_DCM
from channels.pv_channels import pv_channels_bernina_DCM
from channels.bs_channels import bs_channels_cristallina_beamline
from channels.pv_channels import pv_channels_cristallina_beamline
# from channels_minimal import detectors_min, channels_min, pvs_min
################# DEVICES #################
dummy = DummyAdjustable(units="au")
dummy2 = DummyAdjustable(ID='DUMMY2', name='Dummy2', units="au")
# from devices.knife_edge import KnifeEdge
# from devices.standa import standa
# from devices.newport import newport
from beamline.components import (
upstream_attenuator,
attenuator,
pp_shutter,
pulsepicker,
alignment_laser,
pbps113,
slits107,slits149,slitsi0,
mono,
m3,
i0,
)
from beamline.components import kbHor, kbVer
from systems.components import cta
from gp_exp.components import Newport_large, OWIS, slitsjj
# Undulators
from beamline import undulator
undulators = undulator.Undulators()
# Report the energy offset defined in the undulator module at startup.
logger.info(f"Using undulator (Aramis) offset to PSSS energy of {undulator.energy_offset} eV.")
# Undulators & mono
from beamline import photon_energy
cr_photon_energy = photon_energy.PhotonEnergy()
# Limit the photon energy setpoint to the range 5000 to 13000
# (presumably eV, like the offsets logged below -- TODO confirm).
cr_photon_energy.set_limits(5000, 13000)
logger.info(f"Photon energy offsets: PSSS {photon_energy.energy_offset_PSSS} eV, DCCM {photon_energy.energy_offset_DCCM} eV, undulator {photon_energy.energy_offset_undulators} eV.")
# Pulse picker shutter
#pp_shutter = PP_Shutter(
# "SARES30-LTIM01-EVR0:RearUniv0-Ena-SP", name="Cristallina pulse picker shutter"
#) # Shutter button when using the pulse picker
## Slits
from slic.devices.xoptics import slits
## Smaract & attocube stages
from gp_exp.smaract_device_def import smaract_Juraj, smaract_mini_XYZ
# from gp_exp.attocube_device_def import attocube # disabled because of issues with Adjustables
from gp_exp.jj_device_def import jjslits
# Bernina monochromator
from beamline.bernina_mono import BerninaMono
BerninaDCM = BerninaMono("SAROP21-ODCM098")
# Diffractometer
from crq_exp.diffractometer import Diffractometer
# Two diffractometer objects are created, one for each of the two names below.
dm1 = Diffractometer("SARES31-GPS")
dm2 = Diffractometer("SARES32-GPS")
# Set according to which diffractometer is being used.
# The spreadsheet setup further down reads its motor entries from `diffractometer`.
diffractometer = dm1
# Dilution fridge (optional): if the device object cannot be created,
# `dilution` is set to None and the rest of the setup continues without it.
from crq_exp.dilsc import Dilution

try:
    dilution = Dilution("SARES31-DIL-LS1", "SARES31-MAG-IPS1")
except Exception as err:
    # Deliberately broad: any failure here only disables the fridge.
    logger.warning(f"Error: Could not connect to dilution fridge. {err}")
    dilution = None
# Temporary quick hack thermometer addition for stand
# TODO: turn these into a thermometer class ...
# NOTE: the matching entries in adjs_for_spreadsheet are currently commented out.
T_reg = PVAdjustable('SARES31-DIL-LS1:A_KELVIN')
T_plato = PVAdjustable('SARES31-DIL-LS1:7_KELVIN')
T_chip = PVAdjustable('SARES31-DIL-LS1:8_KELVIN')
# TODO: better attenuators
# Both transmission values are entered into adjs_for_spreadsheet further down.
upstream_transmission = PVAdjustable("SAROP31-OATA150:UsrRec.TC1")
downstream_transmission = PVAdjustable("SARFE10-OATT053:UsrRec.TC1")
# from crq_exp.puma import Puma
# puma = Puma()
# temporary mono
# from exp_temp.mono import Mono_rotation
################# Stand setup ##################
from slic.core.acquisition.detcfg import DetectorConfig
from slic.core.acquisition.spreadsheet import Spreadsheet
from stand.time import Time

time_adjustable = Time()

# Labelled objects that are later passed to Spreadsheet.
# Commented-out entries are kept so they can be re-enabled quickly.
adjs_for_spreadsheet = {
    "Time": time_adjustable,
    # "T_reg": T_reg,
    # "T_plato": T_plato,
    # "T_chip":T_chip,
    "Transmission": downstream_transmission,
    "Upstream Transmission": upstream_transmission,
    "Energy_setpoint": cr_photon_energy,
    "Energy_offset undulator": photon_energy.energy_offset_undulators,
    "TD": diffractometer.td,
    "TDY": diffractometer.td_y,
    "TRX": diffractometer.tr_x,
    "TRY": diffractometer.tr_y,
    "TRXBASE": diffractometer.trx_base,
    "TRYBASE": diffractometer.try_base,
    "THETA": diffractometer.theta,
    "TWOTHETA": diffractometer.twotheta,
    # PuMa positions if available:
    # "Sample_X": puma.sample_x,
    # "Sample_Y": puma.sample_y,
    # " Sample_Z": puma.sample_z,
    # "Sample_R": puma.sample_r,
    # "Magnet_X": puma.magnet_x,
    # "Magnet_Y": puma.magnet_y,
    # "Magnet_Z": puma.magnet_z,
}

# The dilution fridge entries are only added when `dilution` is not None,
# i.e. when creating the device object earlier did not fail.
if dilution is not None:
    adjs_dilsc = {
        "Magnet_X": dilution.x,
        "Magnet_Y": dilution.y,
        "Magnet_Z": dilution.z,
        "DilSc_T_reg": dilution.T_CHA,
        "DilSc_T_CH7": dilution.T_CH7,
        "DilSc_T_CH8": dilution.T_CH8,
    }
    adjs_for_spreadsheet.update(adjs_dilsc)
# Set up the spreadsheet for transmission to the stand server.
# Host and port are defined once, before the try block, so that the warning
# in the except branch can always name the host, even when importing the
# stand client itself fails (previously that case raised a NameError).
stand_host = "saresc-vcons-02.psi.ch"
stand_port = 9090

spreadsheet = Spreadsheet(
    adjs_for_spreadsheet,
    placeholders=("comment", "sample", "run_usable"),
    host=stand_host,
    port=stand_port,
)

# Probe the stand server once; if that fails the spreadsheet is disabled
# by setting it to None.
try:
    from stand.client import Client

    stand_client = Client(host=stand_host, port=stand_port)
    # The reply itself is not used further in this section.
    response = stand_client.get()
    logger.info("Connected to stand server.")
except Exception as error:
    # Catching exceptions with a broad net because different connection errors can occur.
    logger.warning(f"Cannot connect to stand server on {stand_host}. Disabling spreadsheet.")
    # Log the cause as well so that the failure can be diagnosed.
    logger.warning(error)
    spreadsheet = None
################# DAQ Setup #################
instrument = "cristallina"

# "MX" or "Q" for the different setups and detector configurations
# experiment_type = "MX"
experiment_type = "Q"

from pgroups import pgroup, pgroup_scratch

# setup pgroup specific logger
setup_logging_pgroup(pgroup)

logger.info(f"Using Cristallina{experiment_type} setup for detectors.")

# The supported experiment types differ only in their default detectors.
_detectors_by_experiment_type = {
    "MX": detectors_MX,
    "Q": detectors,
}

# Unknown experiment type: report it and stop loading.
if experiment_type not in _detectors_by_experiment_type:
    logger.error(f"Experiment type {experiment_type} not supported. Exiting.")
    sys.exit(1)

daq = SFAcquisition(
    instrument,
    pgroup,
    default_channels=bs_channels,
    default_pvs=pv_channels,
    default_detectors=_detectors_by_experiment_type[experiment_type],
    rate_multiplicator=1,
    spreadsheet=spreadsheet,
)
# default_channels=bs_channels,
# default_pvs=pv_channels,
# There is a new EPICS buffer, so the archiver is no longer used. This makes sure we are taking PVs from the right place.
# The call is made exactly once and inside the try block: an additional
# unguarded call in front of it would raise before the handler could catch
# anything. A failure is reported through the logger and does not abort
# the rest of the setup.
try:
    daq.update_config_pvs()
except Exception as e:
    logger.warning(f"daq.update_config_pvs() failed: {e}")
from acquisition import multiple_daqs
DAQS = multiple_daqs.generate_DAQS(instrument, pgroup, bs_channels, pv_channels, detectors, spreadsheet)
# daq_fake = FakeAcquisition(instrument, pgroup)
# daq_PV_only = PVAcquisition(instrument, pgroup, default_channels=channels_ks) # workaround for KS not going to DB
# Run the scan only when the gas monitor value is larger than 500 uJ (and smaller than 2000 uJ):
# required fraction defines amount of data recorded to save the step and move on to the next one
# NOTE(review): this comment used to state a lower limit of 10 uJ while vmin is 500 -- confirm which value is intended.
check_intensity_gas_monitor = PVCondition(
    "SARFE10-PBPG050:PHOTON-ENERGY-PER-PULSE-US",
    vmin=500,
    vmax=2000,
    wait_time=0.5,
    required_fraction=0.8,
)
# Default scanner: acquires with `daq` and uses the gas monitor condition.
scan = Scanner(default_acquisitions=[daq], condition=check_intensity_gas_monitor)
# no condition
scan_without_beam = Scanner(default_acquisitions=[daq], condition=None)
# The GUI is built on the scanner that uses the condition.
gui = GUI(scan, show_goto=True, show_spec=True)
logger.info(f"Running at {instrument} with pgroup {pgroup}. Experiment type: {experiment_type}.")
logger.info("Loading finished.")
###################################################################
###### Temporary place for the p22266 beamtime ####################
###################################################################
from slic.core.adjustable.linked import LinkedInterpolated
from slic.core.scanner.scaninfo import ScanInfo
from slic.utils import nice_arange
from crq_exp import synchronization
# Temperature setpoint
temperature_setpoint=PVAdjustable('SARES31-DIL-LS1:H0_SETP_SET')
# d fixed to 3.0 mm, x_0 = 4.574 mm and theta_0 = -67.853 deg
# d = 3
# theta_0 = -67.853
# x_0 = 4.574
# thetas = np.linspace(-2, 25)
# TRX = d * np.sin(np.deg2rad(thetas + theta_0)) + x_0
# linked_theta = LinkedInterpolated("ThetaLinked", diffractometer.theta, diffractometer.tr_x, thetas, TRX)
# Sample 3
# d = 2.5000000
# x_0 = 4.04466204
# theta_0 = -64.1376413
# thetas = np.linspace(-10, 15)
# TRX = d * np.sin(np.deg2rad(thetas + theta_0)) + x_0
# linked_theta_s3 = LinkedInterpolated("ThetaLinked", diffractometer.theta, diffractometer.tr_x, thetas, TRX)
# Beware: double_pixels_action interpolate does not seem to work, results in empty data
# Detector configuration with JF17T16V01 only. Every index from 0 to 15
# except 6 and 7 is listed in disabled_modules.
detector_8M = DetectorConfig()
detector_8M.add(
    "JF17T16V01",
    adc_to_energy=True,
    compression=True,
    crystfel_lists_laser=False,
    factor=0.1,
    remove_raw_files=True,
    save_dap_results=False,
    geometry=True,
    disabled_modules=[0, 1, 2, 3, 4, 5, 8, 9, 10, 11, 12, 13, 14, 15],
    double_pixels_action="keep",
)
# Detector configuration with JF17T16V01 (same settings as detector_8M)
# plus JF20T01V01, and an acquisition object that uses it by default.
detectors_8M_I0 = DetectorConfig()
detectors_8M_I0.add(
    "JF17T16V01",
    adc_to_energy=True,
    compression=True,
    crystfel_lists_laser=False,
    factor=0.1,
    remove_raw_files=True,
    save_dap_results=False,
    geometry=True,
    disabled_modules=[0, 1, 2, 3, 4, 5, 8, 9, 10, 11, 12, 13, 14, 15],
    double_pixels_action="keep",
)
detectors_8M_I0.add(
    "JF20T01V01",
    adc_to_energy=True,
    compression=True,
    crystfel_lists_laser=False,
    factor=1,
    remove_raw_files=True,
    save_dap_results=False,
    geometry=False,
    double_pixels_action="keep",
)

daq_8M_I0 = SFAcquisition(
    instrument,
    pgroup,
    default_channels=bs_channels,
    default_pvs=pv_channels,
    default_detectors=detectors_8M_I0,
    rate_multiplicator=1,
    spreadsheet=spreadsheet,
)
# Detector configuration with JF16T03V02 plus JF20T01V01, and an
# acquisition object that uses it by default.
detectors_1p5M_I0 = DetectorConfig()
detectors_1p5M_I0.add(
    "JF16T03V02",
    adc_to_energy=True,
    compression=True,
    crystfel_lists_laser=False,
    factor=0.1,
    remove_raw_files=True,
    save_dap_results=False,
    geometry=True,
    double_pixels_action="keep",
)
detectors_1p5M_I0.add(
    "JF20T01V01",
    adc_to_energy=True,
    compression=True,
    crystfel_lists_laser=False,
    factor=1,
    remove_raw_files=True,
    save_dap_results=False,
    geometry=False,
    double_pixels_action="keep",
)

daq_1p5M_I0 = SFAcquisition(
    instrument,
    pgroup,
    default_channels=bs_channels,
    default_pvs=pv_channels,
    default_detectors=detectors_1p5M_I0,
    rate_multiplicator=1,
    spreadsheet=spreadsheet,
)